Skip to main content

actrpc_interceptor/interceptors/policy/
interceptor.rs

1use crate::interceptors::policy::{
2    config::{PolicyConfig, PolicyEffect, PolicyReviewSeverity},
3    effect::effect_to_action,
4    engine::{MatchedPolicyRule, PolicyEngine},
5    error::PolicyError,
6};
7use actrpc_core::{
8    InterceptorInitialization,
9    action::{
10        ActionDescriptor, ActionKind, ActionSpec, RequestedAction, RequestedActionRecord,
11        ResolvedActionRecord,
12    },
13    interception::{InterceptionRequest, InterceptionResponse, InterceptorContinuation},
14};
15use actrpc_orchestrator::{
16    action::actions::{
17        exclude_interceptors::ExcludeInterceptors,
18        reject_call::RejectCall,
19        request_review::{
20            REVIEW_DECISION_APPROVED, REVIEW_DECISION_DENIED, RequestReview, RequestReviewParams,
21            RequestReviewResult,
22        },
23    },
24    interceptor::{Interceptor, InterceptorFuture},
25};
26use std::collections::HashMap;
27
28#[derive(Debug, Clone)]
29pub struct PolicyInterceptor {
30    engine: PolicyEngine,
31}
32
33impl PolicyInterceptor {
34    pub fn new(config: PolicyConfig) -> Result<Self, PolicyError> {
35        Ok(Self {
36            engine: PolicyEngine::compile(config)?,
37        })
38    }
39}
40
41impl Interceptor for PolicyInterceptor {
42    fn initialize<'a>(
43        &'a self,
44    ) -> InterceptorFuture<
45        'a,
46        Result<InterceptorInitialization, actrpc_orchestrator::error::InterceptorRuntimeError>,
47    >
48    where
49        Self: 'a,
50    {
51        Box::pin(async move {
52            Ok(InterceptorInitialization {
53                supports_outbound: true,
54                supports_inbound: true,
55                actions: action_descriptors(),
56            })
57        })
58    }
59
60    fn intercept<'a>(
61        &'a self,
62        request: &'a InterceptionRequest,
63    ) -> InterceptorFuture<
64        'a,
65        Result<InterceptionResponse, actrpc_orchestrator::error::InterceptorRuntimeError>,
66    >
67    where
68        Self: 'a,
69    {
70        Box::pin(async move {
71            self.intercept_inner(request).map_err(|error| {
72                actrpc_orchestrator::error::InterceptorRuntimeError::Request {
73                    message: error.to_string(),
74                }
75            })
76        })
77    }
78}
79
80impl PolicyInterceptor {
81    fn intercept_inner(
82        &self,
83        request: &InterceptionRequest,
84    ) -> Result<InterceptionResponse, PolicyError> {
85        let decision = self.engine.evaluate(request);
86        let review_results = collect_request_review_results(request)?;
87
88        let missing_review_actions =
89            missing_review_actions(&decision.matched_rules, &review_results)?;
90
91        if !missing_review_actions.is_empty() {
92            return Ok(InterceptionResponse {
93                actions: missing_review_actions,
94                continuation: InterceptorContinuation::Reinvoke,
95            });
96        }
97
98        let finalized_effects = finalized_effects(&decision.matched_rules, &review_results)?;
99
100        let mut actions = Vec::with_capacity(finalized_effects.len());
101
102        for effect in &finalized_effects {
103            actions.push(effect_to_action(effect)?);
104        }
105
106        Ok(InterceptionResponse {
107            actions,
108            continuation: InterceptorContinuation::Stop,
109        })
110    }
111}
112
113fn missing_review_actions(
114    matched_rules: &[MatchedPolicyRule],
115    review_results: &HashMap<String, RequestReviewResult>,
116) -> Result<Vec<RequestedActionRecord>, PolicyError> {
117    let mut actions = Vec::new();
118
119    for rule in matched_rules {
120        let Some(review) = &rule.apply.review else {
121            continue;
122        };
123
124        if review_results.contains_key(&rule.name) {
125            continue;
126        }
127
128        actions.push(request_review_action(RequestReviewParams {
129            rule_name: rule.name.clone(),
130            title: review.title.clone(),
131            reason: review.reason.clone(),
132            severity: review_severity_to_string(review.severity),
133        })?);
134    }
135
136    Ok(actions)
137}
138
139fn finalized_effects(
140    matched_rules: &[MatchedPolicyRule],
141    review_results: &HashMap<String, RequestReviewResult>,
142) -> Result<Vec<PolicyEffect>, PolicyError> {
143    let mut effects = Vec::new();
144
145    for rule in matched_rules {
146        effects.extend(rule.apply.immediate.clone());
147
148        let Some(review) = &rule.apply.review else {
149            continue;
150        };
151
152        let review_result =
153            review_results
154                .get(&rule.name)
155                .ok_or_else(|| PolicyError::InvalidReviewResult {
156                    message: format!(
157                        "policy rule {} requires review, but no review result was found",
158                        rule.name
159                    ),
160                })?;
161
162        match review_result.decision.as_str() {
163            decision if decision == REVIEW_DECISION_APPROVED => {
164                effects.extend(review.on_approve.clone());
165            }
166
167            decision if decision == REVIEW_DECISION_DENIED => {
168                effects.extend(review.effective_on_deny());
169            }
170
171            other => {
172                return Err(PolicyError::InvalidReviewResult {
173                    message: format!("unknown review decision for rule {}: {other}", rule.name),
174                });
175            }
176        }
177    }
178
179    Ok(effects)
180}
181
182fn collect_request_review_results(
183    request: &InterceptionRequest,
184) -> Result<HashMap<String, RequestReviewResult>, PolicyError> {
185    let mut results = HashMap::new();
186
187    for actions in request.iter_resolved_action_rounds() {
188        for action in actions {
189            if action.kind != RequestReview::action_kind() {
190                continue;
191            }
192
193            let params = decode_review_params(action)?;
194            let result = decode_review_result(action)?;
195
196            results.insert(params.rule_name, result);
197        }
198    }
199
200    Ok(results)
201}
202
203fn request_review_action(
204    params: RequestReviewParams,
205) -> Result<RequestedActionRecord, PolicyError> {
206    RequestedAction::<RequestReview> { params }
207        .try_into()
208        .map_err(|source| PolicyError::ActionEncoding { source })
209}
210
211fn decode_review_params(action: &ResolvedActionRecord) -> Result<RequestReviewParams, PolicyError> {
212    let Some(value) = &action.params else {
213        return Err(PolicyError::InvalidReviewResult {
214            message: "request_review action is missing params".to_owned(),
215        });
216    };
217
218    serde_json::from_value(value.clone()).map_err(|source| PolicyError::InvalidReviewResult {
219        message: format!("failed to decode request_review params: {source}"),
220    })
221}
222
223fn decode_review_result(action: &ResolvedActionRecord) -> Result<RequestReviewResult, PolicyError> {
224    let Ok(Some(value)) = &action.result else {
225        return Err(PolicyError::InvalidReviewResult {
226            message: "request_review action did not resolve successfully".to_owned(),
227        });
228    };
229
230    serde_json::from_value(value.clone()).map_err(|source| PolicyError::InvalidReviewResult {
231        message: format!("failed to decode request_review result: {source}"),
232    })
233}
234
235fn review_severity_to_string(severity: PolicyReviewSeverity) -> String {
236    match severity {
237        PolicyReviewSeverity::Low => "low".to_owned(),
238        PolicyReviewSeverity::Medium => "medium".to_owned(),
239        PolicyReviewSeverity::High => "high".to_owned(),
240    }
241}
242
243fn action_descriptors() -> HashMap<ActionKind, ActionDescriptor> {
244    actrpc_core::action::action_descriptor_map!(RejectCall, ExcludeInterceptors, RequestReview)
245}