1use crate::errors::{AuthError, Result};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::RwLock;
28use url::Url;
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct OpaConfig {
35 pub base_url: String,
37 pub default_policy_path: String,
39 pub timeout_secs: u64,
41 pub auth_token: Option<String>,
43 pub enable_cache: bool,
45 pub cache_ttl_secs: u64,
47}
48
49impl Default for OpaConfig {
50 fn default() -> Self {
51 Self {
52 base_url: "http://localhost:8181".to_string(),
53 default_policy_path: "authz/allow".to_string(),
54 timeout_secs: 5,
55 auth_token: None,
56 enable_cache: false,
57 cache_ttl_secs: 60,
58 }
59 }
60}
61
/// JSON request body sent to OPA: the caller's document wrapped under an
/// `input` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpaInput {
    /// Arbitrary JSON document handed to the policy.
    pub input: serde_json::Value,
}
70
/// Body of an OPA query response.
///
/// Both fields use `#[serde(default)]`, so a response that omits them still
/// deserializes (`result` becomes JSON `null`, `decision_id` becomes `None`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OpaResponse {
    /// The policy's result document; its shape depends on the queried policy.
    #[serde(default)]
    pub result: serde_json::Value,
    /// Decision identifier, when the response carries one.
    #[serde(default)]
    pub decision_id: Option<String>,
}
81
82impl OpaResponse {
83 pub fn is_allowed(&self) -> bool {
85 self.result.as_bool().unwrap_or(false)
86 }
87
88 pub fn get_bool(&self, path: &str) -> Option<bool> {
90 let mut current = &self.result;
91 for segment in path.split('.') {
92 current = current.get(segment)?;
93 }
94 current.as_bool()
95 }
96
97 pub fn get_str(&self, path: &str) -> Option<&str> {
99 let mut current = &self.result;
100 for segment in path.split('.') {
101 current = current.get(segment)?;
102 }
103 current.as_str()
104 }
105}
106
/// A cached OPA response together with its expiry deadline.
struct CacheEntry {
    /// The response returned to callers on a cache hit.
    response: OpaResponse,
    /// Expiry deadline in seconds since the Unix epoch.
    expires_at: u64,
}
113
/// HTTP client for an OPA server with an optional in-memory response cache.
pub struct OpaClient {
    /// Settings the client was created with.
    config: OpaConfig,
    /// Validated base URL; its path always ends with `/`.
    base_url: Url,
    /// HTTP client configured with the request timeout from `config`.
    http: reqwest::Client,
    /// Cached query responses keyed by `"<policy_path>:<input JSON>"`.
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
}
123
124impl OpaClient {
125 pub fn new(config: OpaConfig) -> Result<Self> {
127 let base_url = normalize_opa_base_url(&config.base_url)?;
128
129 let http = reqwest::Client::builder()
130 .timeout(Duration::from_secs(config.timeout_secs))
131 .build()
132 .map_err(|e| AuthError::internal(&format!("HTTP client init failed: {e}")))?;
133
134 Ok(Self {
135 config,
136 base_url,
137 http,
138 cache: Arc::new(RwLock::new(HashMap::new())),
139 })
140 }
141
142 pub async fn query(&self, policy_path: &str, input: serde_json::Value) -> Result<OpaResponse> {
168 if self.config.enable_cache {
170 let cache_key = format!("{}:{}", policy_path, input);
171 let cache = self.cache.read().await;
172 if let Some(entry) = cache.get(&cache_key) {
173 let now = now_secs();
174 if self.config.cache_ttl_secs == 0 || entry.expires_at > now {
175 return Ok(entry.response.clone());
176 }
177 }
178 drop(cache);
179 }
180
181 let url = self.build_api_url("v1/data", policy_path)?;
182 let payload = OpaInput {
183 input: input.clone(),
184 };
185
186 let mut request = self.http.post(url).json(&payload);
187 if let Some(ref token) = self.config.auth_token {
188 request = request.bearer_auth(token);
189 }
190
191 let resp = request
192 .send()
193 .await
194 .map_err(|e| AuthError::internal(&format!("OPA request failed: {e}")))?;
195
196 if !resp.status().is_success() {
197 let status = resp.status();
198 let body = read_error_body(resp).await;
199 return Err(AuthError::internal(&format!(
200 "OPA returned HTTP {status}: {body}"
201 )));
202 }
203
204 let opa_response: OpaResponse = resp
205 .json()
206 .await
207 .map_err(|e| AuthError::internal(&format!("Invalid OPA response: {e}")))?;
208
209 if self.config.enable_cache {
211 let cache_key = format!("{}:{}", policy_path, input);
212 let entry = CacheEntry {
213 response: opa_response.clone(),
214 expires_at: now_secs() + self.config.cache_ttl_secs,
215 };
216 self.cache.write().await.insert(cache_key, entry);
217 }
218
219 Ok(opa_response)
220 }
221
222 pub async fn evaluate(&self, input: serde_json::Value) -> Result<OpaResponse> {
224 self.query(&self.config.default_policy_path, input).await
225 }
226
227 pub async fn is_allowed(&self, input: serde_json::Value) -> Result<bool> {
229 let resp = self.evaluate(input).await?;
230 Ok(resp.is_allowed())
231 }
232
233 pub async fn health_check(&self) -> Result<bool> {
235 let url = self.build_static_url("health")?;
236 let mut request = self.http.get(url);
237 if let Some(ref token) = self.config.auth_token {
238 request = request.bearer_auth(token);
239 }
240 let resp = request
241 .send()
242 .await
243 .map_err(|e| AuthError::internal(&format!("OPA health check failed: {e}")))?;
244 Ok(resp.status().is_success())
245 }
246
247 pub async fn put_policy(&self, policy_id: &str, rego: &str) -> Result<()> {
249 let url = self.build_api_url("v1/policies", policy_id)?;
250 let mut request = self
251 .http
252 .put(url)
253 .header("Content-Type", "text/plain")
254 .body(rego.to_string());
255 if let Some(ref token) = self.config.auth_token {
256 request = request.bearer_auth(token);
257 }
258
259 let resp = request
260 .send()
261 .await
262 .map_err(|e| AuthError::internal(&format!("OPA policy upload failed: {e}")))?;
263
264 if !resp.status().is_success() {
265 let body = read_error_body(resp).await;
266 return Err(AuthError::internal(&format!(
267 "OPA policy upload returned error: {body}"
268 )));
269 }
270 Ok(())
271 }
272
273 pub async fn delete_policy(&self, policy_id: &str) -> Result<()> {
275 let url = self.build_api_url("v1/policies", policy_id)?;
276 let mut request = self.http.delete(url);
277 if let Some(ref token) = self.config.auth_token {
278 request = request.bearer_auth(token);
279 }
280
281 let resp = request
282 .send()
283 .await
284 .map_err(|e| AuthError::internal(&format!("OPA policy delete failed: {e}")))?;
285
286 if !resp.status().is_success() {
287 let body = read_error_body(resp).await;
288 return Err(AuthError::internal(&format!(
289 "OPA policy delete returned error: {body}"
290 )));
291 }
292 Ok(())
293 }
294
295 pub async fn put_data(&self, data_path: &str, data: serde_json::Value) -> Result<()> {
297 let url = self.build_api_url("v1/data", data_path)?;
298 let mut request = self.http.put(url).json(&data);
299 if let Some(ref token) = self.config.auth_token {
300 request = request.bearer_auth(token);
301 }
302
303 let resp = request
304 .send()
305 .await
306 .map_err(|e| AuthError::internal(&format!("OPA data upload failed: {e}")))?;
307
308 if !resp.status().is_success() {
309 let body = read_error_body(resp).await;
310 return Err(AuthError::internal(&format!(
311 "OPA data upload error: {body}"
312 )));
313 }
314 Ok(())
315 }
316
317 fn build_static_url(&self, path: &str) -> Result<Url> {
318 self.base_url
319 .join(path)
320 .map_err(|e| AuthError::internal(&format!("Failed to build OPA URL: {e}")))
321 }
322
323 fn build_api_url(&self, prefix: &str, path: &str) -> Result<Url> {
324 let sanitized_path = sanitize_opa_path(path)?;
325 let joined = if sanitized_path.is_empty() {
326 prefix.to_string()
327 } else {
328 format!("{}/{}", prefix.trim_end_matches('/'), sanitized_path)
329 };
330 self.build_static_url(&joined)
331 }
332
333 pub async fn clear_cache(&self) {
335 self.cache.write().await.clear();
336 }
337
338 pub async fn cache_size(&self) -> usize {
340 self.cache.read().await.len()
341 }
342}
343
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn now_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
350
351fn normalize_opa_base_url(base_url: &str) -> Result<Url> {
352 if base_url.is_empty() {
353 return Err(AuthError::validation("OPA base URL cannot be empty"));
354 }
355
356 let mut parsed = Url::parse(base_url)
357 .map_err(|e| AuthError::validation(format!("Invalid OPA base URL: {e}")))?;
358
359 if !matches!(parsed.scheme(), "http" | "https") {
360 return Err(AuthError::validation("OPA base URL must use http or https"));
361 }
362
363 if parsed.host_str().is_none() {
364 return Err(AuthError::validation("OPA base URL must include a host"));
365 }
366
367 if !parsed.username().is_empty() || parsed.password().is_some() {
368 return Err(AuthError::validation(
369 "OPA base URL must not embed credentials",
370 ));
371 }
372
373 if parsed.query().is_some() || parsed.fragment().is_some() {
374 return Err(AuthError::validation(
375 "OPA base URL must not include query parameters or fragments",
376 ));
377 }
378
379 if !parsed.path().ends_with('/') {
380 let new_path = format!("{}/", parsed.path().trim_end_matches('/'));
381 parsed.set_path(&new_path);
382 }
383
384 Ok(parsed)
385}
386
387fn sanitize_opa_path(path: &str) -> Result<String> {
388 let segments: Vec<&str> = path
389 .split('/')
390 .filter(|segment| !segment.is_empty())
391 .collect();
392
393 if segments.is_empty() {
394 return Err(AuthError::validation("OPA path cannot be empty"));
395 }
396
397 for segment in &segments {
398 if matches!(*segment, "." | "..")
399 || segment.contains('\\')
400 || segment.contains('?')
401 || segment.contains('#')
402 {
403 return Err(AuthError::validation("OPA path contains invalid segments"));
404 }
405 }
406
407 Ok(segments.join("/"))
408}
409
410async fn read_error_body(response: reqwest::Response) -> String {
411 match response.text().await {
412 Ok(body) if !body.is_empty() => body,
413 Ok(_) => "<empty response body>".to_string(),
414 Err(error) => format!("<failed to read response body: {error}>"),
415 }
416}
417
/// In-process policy evaluator driven by a list of [`PolicyRule`]s.
///
/// A matching `Deny` rule overrides any matching `Allow` rule, and the
/// outcome is `Deny` when no rule matches.
pub struct LocalPolicyEvaluator {
    /// Rules in the order they were added.
    rules: Vec<PolicyRule>,
}
427
/// A named rule: when every condition holds for the input, the rule's
/// effect applies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyRule {
    /// Human-readable rule name; not consulted during evaluation.
    pub name: String,
    /// Conditions that must all hold for the rule to match. A rule with no
    /// conditions matches every input.
    pub conditions: Vec<PolicyCondition>,
    /// Effect contributed when the rule matches.
    pub effect: PolicyEffect,
}
438
/// A single comparison between an attribute of the input and a fixed value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PolicyCondition {
    /// Dot-separated key path into the input document, e.g. `user.role`.
    pub attribute: String,
    /// How the resolved attribute is compared with `value`.
    pub operator: ConditionOperator,
    /// Comparison operand; ignored by [`ConditionOperator::Exists`].
    pub value: serde_json::Value,
}
449
/// Comparison applied by a [`PolicyCondition`]; serialized in `snake_case`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ConditionOperator {
    /// The attribute exists and equals the condition value.
    Equals,
    /// The attribute is missing or differs from the condition value.
    NotEquals,
    /// The attribute is an array containing the string condition value, or a
    /// string containing it as a substring.
    Contains,
    /// The condition value is an array that contains the attribute.
    In,
    /// The attribute path resolves to some value.
    Exists,
}
460
/// Outcome of a rule or of a whole evaluation; serialized in lowercase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum PolicyEffect {
    /// Access is granted.
    Allow,
    /// Access is refused.
    Deny,
}
468
469impl LocalPolicyEvaluator {
470 pub fn new() -> Self {
472 Self { rules: Vec::new() }
473 }
474
475 pub fn add_rule(&mut self, rule: PolicyRule) {
477 self.rules.push(rule);
478 }
479
480 pub fn evaluate(&self, input: &serde_json::Value) -> PolicyEffect {
485 let mut any_allow = false;
486
487 for rule in &self.rules {
488 if self.evaluate_rule(rule, input) {
489 match rule.effect {
490 PolicyEffect::Deny => return PolicyEffect::Deny,
491 PolicyEffect::Allow => any_allow = true,
492 }
493 }
494 }
495
496 if any_allow {
497 PolicyEffect::Allow
498 } else {
499 PolicyEffect::Deny
500 }
501 }
502
503 fn evaluate_rule(&self, rule: &PolicyRule, input: &serde_json::Value) -> bool {
504 rule.conditions
505 .iter()
506 .all(|cond| self.evaluate_condition(cond, input))
507 }
508
509 fn evaluate_condition(&self, cond: &PolicyCondition, input: &serde_json::Value) -> bool {
510 let actual = resolve_path(input, &cond.attribute);
511
512 match cond.operator {
513 ConditionOperator::Equals => match actual {
514 Some(v) => *v == cond.value,
515 None => false,
516 },
517 ConditionOperator::NotEquals => match actual {
518 Some(v) => *v != cond.value,
519 None => true,
520 },
521 ConditionOperator::Contains => match actual {
522 Some(v) => {
523 if let (Some(arr), Some(needle)) = (v.as_array(), cond.value.as_str()) {
524 arr.iter().any(|e| e.as_str() == Some(needle))
525 } else if let (Some(s), Some(needle)) = (v.as_str(), cond.value.as_str()) {
526 s.contains(needle)
527 } else {
528 false
529 }
530 }
531 None => false,
532 },
533 ConditionOperator::In => match actual {
534 Some(v) => {
535 if let Some(arr) = cond.value.as_array() {
536 arr.contains(v)
537 } else {
538 false
539 }
540 }
541 None => false,
542 },
543 ConditionOperator::Exists => actual.is_some(),
544 }
545 }
546}
547
548impl Default for LocalPolicyEvaluator {
549 fn default() -> Self {
550 Self::new()
551 }
552}
553
554fn resolve_path<'a>(value: &'a serde_json::Value, path: &str) -> Option<&'a serde_json::Value> {
556 let mut current = value;
557 for segment in path.split('.') {
558 current = current.get(segment)?;
559 }
560 Some(current)
561}
562
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::{json, Value};

    // ---- fixtures -------------------------------------------------------

    /// Response without a decision id.
    fn bare_response(result: Value) -> OpaResponse {
        OpaResponse {
            result,
            decision_id: None,
        }
    }

    /// Default configuration with only the base URL replaced.
    fn config_with_base_url(base_url: &str) -> OpaConfig {
        OpaConfig {
            base_url: base_url.to_string(),
            ..Default::default()
        }
    }

    fn condition(attribute: &str, operator: ConditionOperator, value: Value) -> PolicyCondition {
        PolicyCondition {
            attribute: attribute.to_string(),
            operator,
            value,
        }
    }

    fn rule(name: &str, conditions: Vec<PolicyCondition>, effect: PolicyEffect) -> PolicyRule {
        PolicyRule {
            name: name.to_string(),
            conditions,
            effect,
        }
    }

    /// Evaluator loaded with `rules` in the given order.
    fn evaluator_with(rules: Vec<PolicyRule>) -> LocalPolicyEvaluator {
        let mut evaluator = LocalPolicyEvaluator::new();
        for entry in rules {
            evaluator.add_rule(entry);
        }
        evaluator
    }

    /// Evaluator holding a single Allow rule with a single condition.
    fn allow_when(
        name: &str,
        attribute: &str,
        operator: ConditionOperator,
        value: Value,
    ) -> LocalPolicyEvaluator {
        evaluator_with(vec![rule(
            name,
            vec![condition(attribute, operator, value)],
            PolicyEffect::Allow,
        )])
    }

    // ---- configuration --------------------------------------------------

    #[test]
    fn test_config_defaults() {
        let defaults = OpaConfig::default();
        assert_eq!(defaults.base_url, "http://localhost:8181");
        assert_eq!(defaults.default_policy_path, "authz/allow");
        assert_eq!(defaults.timeout_secs, 5);
        assert!(defaults.auth_token.is_none());
        assert!(!defaults.enable_cache);
    }

    // ---- response accessors ---------------------------------------------

    #[test]
    fn test_response_is_allowed_true() {
        assert!(bare_response(json!(true)).is_allowed());
    }

    #[test]
    fn test_response_is_allowed_false() {
        assert!(!bare_response(json!(false)).is_allowed());
    }

    #[test]
    fn test_response_is_allowed_non_bool() {
        assert!(!bare_response(json!({"allow": true})).is_allowed());
    }

    #[test]
    fn test_response_get_bool() {
        let response = OpaResponse {
            result: json!({"authz": {"allow": true, "admin": false}}),
            decision_id: Some("dec-1".to_string()),
        };
        assert_eq!(response.get_bool("authz.allow"), Some(true));
        assert_eq!(response.get_bool("authz.admin"), Some(false));
        assert_eq!(response.get_bool("authz.missing"), None);
    }

    #[test]
    fn test_response_get_str() {
        let response = bare_response(json!({"reason": "policy XYZ"}));
        assert_eq!(response.get_str("reason"), Some("policy XYZ"));
    }

    // ---- client construction --------------------------------------------

    #[test]
    fn test_client_creation_valid() {
        assert!(OpaClient::new(OpaConfig::default()).is_ok());
    }

    #[test]
    fn test_client_creation_empty_url() {
        assert!(OpaClient::new(config_with_base_url("")).is_err());
    }

    #[test]
    fn test_client_creation_rejects_embedded_credentials() {
        let cfg = config_with_base_url("https://user:pass@opa.example.com");
        assert!(OpaClient::new(cfg).is_err());
    }

    #[test]
    fn test_client_creation_rejects_query_string_base_url() {
        let cfg = config_with_base_url("https://opa.example.com?target=internal");
        assert!(OpaClient::new(cfg).is_err());
    }

    #[test]
    fn test_sanitize_opa_path_rejects_traversal() {
        for hostile in ["../system/main", "authz/../../admin"] {
            assert!(sanitize_opa_path(hostile).is_err());
        }
    }

    // ---- local evaluator ------------------------------------------------

    #[test]
    fn test_local_evaluator_default_deny() {
        let evaluator = LocalPolicyEvaluator::new();
        assert_eq!(
            evaluator.evaluate(&json!({"user": "alice"})),
            PolicyEffect::Deny
        );
    }

    #[test]
    fn test_local_evaluator_allow_rule() {
        let evaluator = allow_when(
            "allow admins",
            "user.role",
            ConditionOperator::Equals,
            json!("admin"),
        );

        let admin = json!({"user": {"role": "admin"}});
        assert_eq!(evaluator.evaluate(&admin), PolicyEffect::Allow);

        let viewer = json!({"user": {"role": "viewer"}});
        assert_eq!(evaluator.evaluate(&viewer), PolicyEffect::Deny);
    }

    #[test]
    fn test_local_evaluator_deny_overrides_allow() {
        let evaluator = evaluator_with(vec![
            rule(
                "allow all",
                vec![condition(
                    "user.active",
                    ConditionOperator::Equals,
                    json!(true),
                )],
                PolicyEffect::Allow,
            ),
            rule(
                "deny blocked",
                vec![condition(
                    "user.blocked",
                    ConditionOperator::Equals,
                    json!(true),
                )],
                PolicyEffect::Deny,
            ),
        ]);

        let active_but_blocked = json!({"user": {"active": true, "blocked": true}});
        assert_eq!(evaluator.evaluate(&active_but_blocked), PolicyEffect::Deny);
    }

    #[test]
    fn test_local_evaluator_contains_operator() {
        let evaluator = allow_when(
            "role check",
            "user.roles",
            ConditionOperator::Contains,
            json!("editor"),
        );

        let with_editor = json!({"user": {"roles": ["viewer", "editor"]}});
        assert_eq!(evaluator.evaluate(&with_editor), PolicyEffect::Allow);

        let without_editor = json!({"user": {"roles": ["viewer"]}});
        assert_eq!(evaluator.evaluate(&without_editor), PolicyEffect::Deny);
    }

    #[test]
    fn test_local_evaluator_in_operator() {
        let evaluator = allow_when(
            "allowed actions",
            "action",
            ConditionOperator::In,
            json!(["read", "list"]),
        );

        assert_eq!(
            evaluator.evaluate(&json!({"action": "read"})),
            PolicyEffect::Allow
        );
        assert_eq!(
            evaluator.evaluate(&json!({"action": "delete"})),
            PolicyEffect::Deny
        );
    }

    #[test]
    fn test_local_evaluator_exists_operator() {
        let evaluator = allow_when(
            "has token",
            "auth.token",
            ConditionOperator::Exists,
            json!(null),
        );

        let with_token = json!({"auth": {"token": "abc"}});
        assert_eq!(evaluator.evaluate(&with_token), PolicyEffect::Allow);

        let without_token = json!({"auth": {}});
        assert_eq!(evaluator.evaluate(&without_token), PolicyEffect::Deny);
    }

    #[test]
    fn test_local_evaluator_not_equals() {
        let evaluator = allow_when(
            "not guest",
            "user.role",
            ConditionOperator::NotEquals,
            json!("guest"),
        );

        let admin = json!({"user": {"role": "admin"}});
        assert_eq!(evaluator.evaluate(&admin), PolicyEffect::Allow);

        let guest = json!({"user": {"role": "guest"}});
        assert_eq!(evaluator.evaluate(&guest), PolicyEffect::Deny);
    }

    #[test]
    fn test_local_evaluator_multiple_conditions() {
        let evaluator = evaluator_with(vec![rule(
            "admin write",
            vec![
                condition("user.role", ConditionOperator::Equals, json!("admin")),
                condition("action", ConditionOperator::Equals, json!("write")),
            ],
            PolicyEffect::Allow,
        )]);

        let admin_write = json!({"user": {"role": "admin"}, "action": "write"});
        assert_eq!(evaluator.evaluate(&admin_write), PolicyEffect::Allow);

        let admin_read = json!({"user": {"role": "admin"}, "action": "read"});
        assert_eq!(evaluator.evaluate(&admin_read), PolicyEffect::Deny);
    }

    // ---- helpers and serialization --------------------------------------

    #[test]
    fn test_resolve_path() {
        let doc = json!({"a": {"b": {"c": 42}}});
        assert_eq!(resolve_path(&doc, "a.b.c"), Some(&json!(42)));
        assert_eq!(resolve_path(&doc, "a.b"), Some(&json!({"c": 42})));
        assert_eq!(resolve_path(&doc, "x.y"), None);
    }

    #[test]
    fn test_policy_rule_serialization() {
        let original = rule(
            "test",
            vec![condition(
                "user.role",
                ConditionOperator::Equals,
                json!("admin"),
            )],
            PolicyEffect::Allow,
        );

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PolicyRule = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.name, "test");
        assert_eq!(decoded.effect, PolicyEffect::Allow);
    }

    #[test]
    fn test_opa_input_serialization() {
        let wrapped = OpaInput {
            input: json!({"user": "alice"}),
        };
        let encoded = serde_json::to_value(&wrapped).unwrap();
        assert_eq!(encoded["input"]["user"], "alice");
    }
}