Skip to main content

agent_sdk_core/application/
approval.rs

1//! Approval broker coordination. Use this module to turn policy decisions into
2//! host-dispatched approval requests. Broker methods may call a dispatcher and record
3//! cancellation or denial outcomes, but UI transport remains host-owned.
4//!
5use std::sync::{
6    Mutex,
7    atomic::{AtomicU64, Ordering},
8};
9
10use crate::{
11    approval_ports::{ApprovalDispatchResponse, ApprovalDispatcher},
12    approval_records::{
13        ApprovalBrokerOutcome, ApprovalDecision, ApprovalLifecycleStatus, ApprovalRequest,
14        ApprovalTerminalStatus, approval_dispatch_intent_record, approval_dispatch_result_record,
15    },
16    domain::{AgentError, AgentErrorKind, ApprovalRequestId, RetryClassification, SourceKind},
17    effect::EffectTerminalStatus,
18    journal::{JournalRecord, JournalRecordBase, PendingSideEffect, RecoveryMarker},
19    journal_ports::RunJournal,
20};
21
22#[derive(Debug)]
23/// Holds approval broker application-layer state or configuration.
24/// Use it with the documented coordinator methods; run, journal, event, provider, or port effects are called out on those methods rather than on construction.
25pub struct ApprovalBroker {
26    next_journal_seq: AtomicU64,
27    cancelled_before_dispatch: Mutex<Vec<(ApprovalRequestId, String)>>,
28}
29
30impl Default for ApprovalBroker {
31    fn default() -> Self {
32        Self {
33            next_journal_seq: AtomicU64::new(1),
34            cancelled_before_dispatch: Mutex::new(Vec::new()),
35        }
36    }
37}
38
39impl ApprovalBroker {
40    /// Coordinates cancel before dispatch for the application::approval
41    /// contract. This may call configured ports and update
42    /// runtime/journal/event state according to the surrounding module,
43    /// without introducing a parallel behavior path.
44    pub fn cancel_before_dispatch(
45        &self,
46        approval_request_id: ApprovalRequestId,
47        reason: impl Into<String>,
48    ) {
49        self.cancelled_before_dispatch
50            .lock()
51            .expect("approval cancellation lock")
52            .push((approval_request_id, reason.into()));
53    }
54
55    /// Coordinates request approval for the application::approval contract.
56    /// This may call configured ports and update runtime/journal/event state
57    /// according to the surrounding module, without introducing a parallel
58    /// behavior path.
59    pub fn request_approval(
60        &self,
61        request: ApprovalRequest,
62        dispatcher: Option<&dyn ApprovalDispatcher>,
63        journal: &dyn RunJournal,
64    ) -> Result<ApprovalBrokerOutcome, AgentError> {
65        request.validate()?;
66
67        let intent_record =
68            approval_dispatch_intent_record(self.base_for_request(&request, "intent"), &request);
69        journal.append(intent_record).map_err(journal_failure)?;
70
71        if self
72            .take_cancelled_reason(&request.approval_request_id)
73            .is_some()
74        {
75            return self.append_terminal_result(
76                &request,
77                journal,
78                EffectTerminalStatus::Cancelled,
79                ApprovalTerminalStatus::Cancelled,
80                false,
81                None,
82                "approval.cancelled",
83                "approval cancelled",
84            );
85        }
86
87        let Some(dispatcher) = dispatcher else {
88            return self.append_terminal_result(
89                &request,
90                journal,
91                EffectTerminalStatus::Failed,
92                ApprovalTerminalStatus::Denied,
93                false,
94                None,
95                "missing.approval_dispatcher",
96                "approval dispatcher unavailable",
97            );
98        };
99
100        match dispatcher.dispatch(request.clone()) {
101            Ok(ApprovalDispatchResponse::Decision(decision)) => {
102                self.handle_decision(request, decision, journal)
103            }
104            Ok(ApprovalDispatchResponse::TimedOut) => self.append_terminal_result(
105                &request,
106                journal,
107                EffectTerminalStatus::TimedOut,
108                ApprovalTerminalStatus::TimedOut,
109                true,
110                None,
111                "approval.timeout",
112                "approval timed out",
113            ),
114            Ok(ApprovalDispatchResponse::Cancelled) => self.append_terminal_result(
115                &request,
116                journal,
117                EffectTerminalStatus::Cancelled,
118                ApprovalTerminalStatus::Cancelled,
119                true,
120                None,
121                "approval.cancelled",
122                "approval cancelled",
123            ),
124            Ok(ApprovalDispatchResponse::Unavailable { reason_code }) => self
125                .append_terminal_result(
126                    &request,
127                    journal,
128                    EffectTerminalStatus::Failed,
129                    ApprovalTerminalStatus::DispatcherUnavailable,
130                    true,
131                    None,
132                    reason_code,
133                    "approval dispatcher unavailable",
134                ),
135            Err(error) => self.append_terminal_result(
136                &request,
137                journal,
138                EffectTerminalStatus::Failed,
139                ApprovalTerminalStatus::DispatcherUnavailable,
140                true,
141                None,
142                "approval.dispatcher_error",
143                format!("approval dispatcher failed: {}", error.context().message),
144            ),
145        }
146    }
147
148    fn handle_decision(
149        &self,
150        request: ApprovalRequest,
151        decision: ApprovalDecision,
152        journal: &dyn RunJournal,
153    ) -> Result<ApprovalBrokerOutcome, AgentError> {
154        if is_extension_self_response(&request, &decision) {
155            return self.append_terminal_result(
156                &request,
157                journal,
158                EffectTerminalStatus::DeniedBeforeExecution,
159                ApprovalTerminalStatus::Denied,
160                true,
161                Some(ApprovalDecision::denied("approval.extension_self_response")),
162                "approval.extension_self_response",
163                "extension cannot approve its own action",
164            );
165        }
166
167        if !request.allows_decision(decision.kind()) {
168            return self.append_terminal_result(
169                &request,
170                journal,
171                EffectTerminalStatus::DeniedBeforeExecution,
172                ApprovalTerminalStatus::Denied,
173                true,
174                Some(ApprovalDecision::denied("approval.decision_not_allowed")),
175                "approval.decision_not_allowed",
176                "approval decision not allowed",
177            );
178        }
179
180        match decision {
181            ApprovalDecision::Approved { .. } => self.append_terminal_result(
182                &request,
183                journal,
184                EffectTerminalStatus::Completed,
185                ApprovalTerminalStatus::Approved,
186                true,
187                Some(decision),
188                "approval.approved",
189                "approval approved",
190            ),
191            ApprovalDecision::ApprovedForSession { .. } => self.append_terminal_result(
192                &request,
193                journal,
194                EffectTerminalStatus::Completed,
195                ApprovalTerminalStatus::ApprovedForSession,
196                true,
197                Some(decision),
198                "approval.approved_for_session",
199                "approval approved for session",
200            ),
201            ApprovalDecision::Denied { .. } => {
202                let reason_code = match &decision {
203                    ApprovalDecision::Denied { reason_code, .. } => reason_code.clone(),
204                    _ => unreachable!("matched denied decision"),
205                };
206                self.append_terminal_result(
207                    &request,
208                    journal,
209                    EffectTerminalStatus::DeniedBeforeExecution,
210                    ApprovalTerminalStatus::Denied,
211                    true,
212                    Some(decision),
213                    reason_code,
214                    "approval denied",
215                )
216            }
217        }
218    }
219
220    #[expect(
221        clippy::too_many_arguments,
222        reason = "approval terminal journaling needs explicit status, decision, and summary fields until this becomes a terminal-result command object"
223    )]
224    fn append_terminal_result(
225        &self,
226        request: &ApprovalRequest,
227        journal: &dyn RunJournal,
228        effect_status: EffectTerminalStatus,
229        terminal_status: ApprovalTerminalStatus,
230        dispatcher_contacted: bool,
231        decision: Option<ApprovalDecision>,
232        reason_code: impl Into<String>,
233        redacted_summary: impl Into<String>,
234    ) -> Result<ApprovalBrokerOutcome, AgentError> {
235        let result_base = self.base_for_request(request, "result");
236        let result_record = approval_dispatch_result_record(
237            result_base.clone(),
238            request,
239            approval_lifecycle_status(&terminal_status),
240            effect_status,
241            redacted_summary,
242        );
243        if let Err(error) = journal.append(result_record) {
244            if dispatcher_contacted {
245                self.append_recovery_after_result_failure(request, journal, result_base, error)?;
246            }
247            return Err(AgentError::new(
248                if dispatcher_contacted {
249                    AgentErrorKind::RecoveryRepairNeeded
250                } else {
251                    AgentErrorKind::JournalFailure
252                },
253                RetryClassification::RepairNeeded,
254                "approval dispatch terminal result append failed; tool execution remains blocked",
255            )
256            .with_subject(request.subject_ref()));
257        }
258        Ok(ApprovalBrokerOutcome {
259            approval_request_id: request.approval_request_id.clone(),
260            status: terminal_status,
261            decision,
262            reason_code: reason_code.into(),
263        })
264    }
265
266    fn base_for_request(&self, request: &ApprovalRequest, suffix: &str) -> JournalRecordBase {
267        let journal_seq = self.next_journal_seq.fetch_add(1, Ordering::SeqCst);
268        let mut base = JournalRecordBase::new(
269            journal_seq,
270            format!(
271                "journal.record.approval.{}.{}",
272                request.approval_request_id.as_str(),
273                suffix
274            ),
275            request.run_id.clone(),
276            request.agent_id.clone(),
277            request.source.clone(),
278        );
279        base.turn_id = Some(request.turn_id.clone());
280        base.destination = Some(request.destination.clone());
281        base.timestamp_millis = request.created_at_millis + journal_seq;
282        base.runtime_package_fingerprint = request.runtime_package_fingerprint.as_str().to_string();
283        base
284    }
285
286    fn append_recovery_after_result_failure(
287        &self,
288        request: &ApprovalRequest,
289        journal: &dyn RunJournal,
290        mut base: JournalRecordBase,
291        result_error: AgentError,
292    ) -> Result<(), AgentError> {
293        base.record_id = format!(
294            "journal.record.approval.{}.recovery",
295            request.approval_request_id.as_str()
296        );
297        let recovery = RecoveryMarker {
298            unsafe_pending: vec![PendingSideEffect {
299                effect_id: request.approval_dispatch_effect_id.clone(),
300                intent_record_id: format!(
301                    "journal.record.approval.{}.intent",
302                    request.approval_request_id.as_str()
303                ),
304                idempotency_key: None,
305                dedupe_key: None,
306                unsafe_pending_reason: format!(
307                    "approval dispatcher was contacted but terminal result append failed: {}",
308                    result_error.context().message
309                ),
310            }],
311            recovery_reason: "approval dispatch terminal result append failed".to_string(),
312            policy_refs: request.policy_refs.clone(),
313        };
314        let recovery_record = JournalRecord::recovery(base, recovery);
315        journal
316            .append(recovery_record)
317            .map(|_| ())
318            .map_err(|error| {
319                AgentError::new(
320                    AgentErrorKind::RecoveryRepairNeeded,
321                    RetryClassification::RepairNeeded,
322                    format!(
323                        "approval result append failed and recovery append failed: {}",
324                        error.context().message
325                    ),
326                )
327                .with_subject(request.subject_ref())
328            })
329    }
330
331    fn take_cancelled_reason(&self, approval_request_id: &ApprovalRequestId) -> Option<String> {
332        let mut cancelled = self
333            .cancelled_before_dispatch
334            .lock()
335            .expect("approval cancellation lock");
336        let index = cancelled
337            .iter()
338            .position(|(candidate, _)| candidate == approval_request_id)?;
339        Some(cancelled.remove(index).1)
340    }
341}
342
343fn approval_lifecycle_status(status: &ApprovalTerminalStatus) -> ApprovalLifecycleStatus {
344    match status {
345        ApprovalTerminalStatus::Approved => ApprovalLifecycleStatus::Approved,
346        ApprovalTerminalStatus::ApprovedForSession => ApprovalLifecycleStatus::ApprovedForSession,
347        ApprovalTerminalStatus::Denied => ApprovalLifecycleStatus::Denied,
348        ApprovalTerminalStatus::TimedOut => ApprovalLifecycleStatus::TimedOut,
349        ApprovalTerminalStatus::Cancelled => ApprovalLifecycleStatus::Cancelled,
350        ApprovalTerminalStatus::DispatcherUnavailable => {
351            ApprovalLifecycleStatus::DispatcherUnavailable
352        }
353        ApprovalTerminalStatus::RecoveryRequired => ApprovalLifecycleStatus::RecoveryRequired,
354    }
355}
356
357fn journal_failure(error: AgentError) -> AgentError {
358    AgentError::new(
359        AgentErrorKind::JournalFailure,
360        RetryClassification::RepairNeeded,
361        error.context().message,
362    )
363}
364
365fn is_extension_self_response(request: &ApprovalRequest, decision: &ApprovalDecision) -> bool {
366    if request.source.kind != SourceKind::Extension {
367        return false;
368    }
369    match decision {
370        ApprovalDecision::Approved { actor_ref }
371        | ApprovalDecision::ApprovedForSession { actor_ref } => actor_ref == &request.source,
372        ApprovalDecision::Denied {
373            actor_ref: Some(actor_ref),
374            ..
375        } => actor_ref == &request.source,
376        ApprovalDecision::Denied {
377            actor_ref: None, ..
378        } => false,
379    }
380}