1use std::sync::{
6 Mutex,
7 atomic::{AtomicU64, Ordering},
8};
9
10use crate::{
11 approval_ports::{ApprovalDispatchResponse, ApprovalDispatcher},
12 approval_records::{
13 ApprovalBrokerOutcome, ApprovalDecision, ApprovalLifecycleStatus, ApprovalRequest,
14 ApprovalTerminalStatus, approval_dispatch_intent_record, approval_dispatch_result_record,
15 },
16 domain::{AgentError, AgentErrorKind, ApprovalRequestId, RetryClassification, SourceKind},
17 effect::EffectTerminalStatus,
18 journal::{JournalRecord, JournalRecordBase, PendingSideEffect, RecoveryMarker},
19 journal_ports::RunJournal,
20};
21
22#[derive(Debug)]
23pub struct ApprovalBroker {
26 next_journal_seq: AtomicU64,
27 cancelled_before_dispatch: Mutex<Vec<(ApprovalRequestId, String)>>,
28}
29
30impl Default for ApprovalBroker {
31 fn default() -> Self {
32 Self {
33 next_journal_seq: AtomicU64::new(1),
34 cancelled_before_dispatch: Mutex::new(Vec::new()),
35 }
36 }
37}
38
39impl ApprovalBroker {
40 pub fn cancel_before_dispatch(
45 &self,
46 approval_request_id: ApprovalRequestId,
47 reason: impl Into<String>,
48 ) {
49 self.cancelled_before_dispatch
50 .lock()
51 .expect("approval cancellation lock")
52 .push((approval_request_id, reason.into()));
53 }
54
55 pub fn request_approval(
60 &self,
61 request: ApprovalRequest,
62 dispatcher: Option<&dyn ApprovalDispatcher>,
63 journal: &dyn RunJournal,
64 ) -> Result<ApprovalBrokerOutcome, AgentError> {
65 request.validate()?;
66
67 let intent_record =
68 approval_dispatch_intent_record(self.base_for_request(&request, "intent"), &request);
69 journal.append(intent_record).map_err(journal_failure)?;
70
71 if self
72 .take_cancelled_reason(&request.approval_request_id)
73 .is_some()
74 {
75 return self.append_terminal_result(
76 &request,
77 journal,
78 EffectTerminalStatus::Cancelled,
79 ApprovalTerminalStatus::Cancelled,
80 false,
81 None,
82 "approval.cancelled",
83 "approval cancelled",
84 );
85 }
86
87 let Some(dispatcher) = dispatcher else {
88 return self.append_terminal_result(
89 &request,
90 journal,
91 EffectTerminalStatus::Failed,
92 ApprovalTerminalStatus::Denied,
93 false,
94 None,
95 "missing.approval_dispatcher",
96 "approval dispatcher unavailable",
97 );
98 };
99
100 match dispatcher.dispatch(request.clone()) {
101 Ok(ApprovalDispatchResponse::Decision(decision)) => {
102 self.handle_decision(request, decision, journal)
103 }
104 Ok(ApprovalDispatchResponse::TimedOut) => self.append_terminal_result(
105 &request,
106 journal,
107 EffectTerminalStatus::TimedOut,
108 ApprovalTerminalStatus::TimedOut,
109 true,
110 None,
111 "approval.timeout",
112 "approval timed out",
113 ),
114 Ok(ApprovalDispatchResponse::Cancelled) => self.append_terminal_result(
115 &request,
116 journal,
117 EffectTerminalStatus::Cancelled,
118 ApprovalTerminalStatus::Cancelled,
119 true,
120 None,
121 "approval.cancelled",
122 "approval cancelled",
123 ),
124 Ok(ApprovalDispatchResponse::Unavailable { reason_code }) => self
125 .append_terminal_result(
126 &request,
127 journal,
128 EffectTerminalStatus::Failed,
129 ApprovalTerminalStatus::DispatcherUnavailable,
130 true,
131 None,
132 reason_code,
133 "approval dispatcher unavailable",
134 ),
135 Err(error) => self.append_terminal_result(
136 &request,
137 journal,
138 EffectTerminalStatus::Failed,
139 ApprovalTerminalStatus::DispatcherUnavailable,
140 true,
141 None,
142 "approval.dispatcher_error",
143 format!("approval dispatcher failed: {}", error.context().message),
144 ),
145 }
146 }
147
148 fn handle_decision(
149 &self,
150 request: ApprovalRequest,
151 decision: ApprovalDecision,
152 journal: &dyn RunJournal,
153 ) -> Result<ApprovalBrokerOutcome, AgentError> {
154 if is_extension_self_response(&request, &decision) {
155 return self.append_terminal_result(
156 &request,
157 journal,
158 EffectTerminalStatus::DeniedBeforeExecution,
159 ApprovalTerminalStatus::Denied,
160 true,
161 Some(ApprovalDecision::denied("approval.extension_self_response")),
162 "approval.extension_self_response",
163 "extension cannot approve its own action",
164 );
165 }
166
167 if !request.allows_decision(decision.kind()) {
168 return self.append_terminal_result(
169 &request,
170 journal,
171 EffectTerminalStatus::DeniedBeforeExecution,
172 ApprovalTerminalStatus::Denied,
173 true,
174 Some(ApprovalDecision::denied("approval.decision_not_allowed")),
175 "approval.decision_not_allowed",
176 "approval decision not allowed",
177 );
178 }
179
180 match decision {
181 ApprovalDecision::Approved { .. } => self.append_terminal_result(
182 &request,
183 journal,
184 EffectTerminalStatus::Completed,
185 ApprovalTerminalStatus::Approved,
186 true,
187 Some(decision),
188 "approval.approved",
189 "approval approved",
190 ),
191 ApprovalDecision::ApprovedForSession { .. } => self.append_terminal_result(
192 &request,
193 journal,
194 EffectTerminalStatus::Completed,
195 ApprovalTerminalStatus::ApprovedForSession,
196 true,
197 Some(decision),
198 "approval.approved_for_session",
199 "approval approved for session",
200 ),
201 ApprovalDecision::Denied { .. } => {
202 let reason_code = match &decision {
203 ApprovalDecision::Denied { reason_code, .. } => reason_code.clone(),
204 _ => unreachable!("matched denied decision"),
205 };
206 self.append_terminal_result(
207 &request,
208 journal,
209 EffectTerminalStatus::DeniedBeforeExecution,
210 ApprovalTerminalStatus::Denied,
211 true,
212 Some(decision),
213 reason_code,
214 "approval denied",
215 )
216 }
217 }
218 }
219
220 #[expect(
221 clippy::too_many_arguments,
222 reason = "approval terminal journaling needs explicit status, decision, and summary fields until this becomes a terminal-result command object"
223 )]
224 fn append_terminal_result(
225 &self,
226 request: &ApprovalRequest,
227 journal: &dyn RunJournal,
228 effect_status: EffectTerminalStatus,
229 terminal_status: ApprovalTerminalStatus,
230 dispatcher_contacted: bool,
231 decision: Option<ApprovalDecision>,
232 reason_code: impl Into<String>,
233 redacted_summary: impl Into<String>,
234 ) -> Result<ApprovalBrokerOutcome, AgentError> {
235 let result_base = self.base_for_request(request, "result");
236 let result_record = approval_dispatch_result_record(
237 result_base.clone(),
238 request,
239 approval_lifecycle_status(&terminal_status),
240 effect_status,
241 redacted_summary,
242 );
243 if let Err(error) = journal.append(result_record) {
244 if dispatcher_contacted {
245 self.append_recovery_after_result_failure(request, journal, result_base, error)?;
246 }
247 return Err(AgentError::new(
248 if dispatcher_contacted {
249 AgentErrorKind::RecoveryRepairNeeded
250 } else {
251 AgentErrorKind::JournalFailure
252 },
253 RetryClassification::RepairNeeded,
254 "approval dispatch terminal result append failed; tool execution remains blocked",
255 )
256 .with_subject(request.subject_ref()));
257 }
258 Ok(ApprovalBrokerOutcome {
259 approval_request_id: request.approval_request_id.clone(),
260 status: terminal_status,
261 decision,
262 reason_code: reason_code.into(),
263 })
264 }
265
266 fn base_for_request(&self, request: &ApprovalRequest, suffix: &str) -> JournalRecordBase {
267 let journal_seq = self.next_journal_seq.fetch_add(1, Ordering::SeqCst);
268 let mut base = JournalRecordBase::new(
269 journal_seq,
270 format!(
271 "journal.record.approval.{}.{}",
272 request.approval_request_id.as_str(),
273 suffix
274 ),
275 request.run_id.clone(),
276 request.agent_id.clone(),
277 request.source.clone(),
278 );
279 base.turn_id = Some(request.turn_id.clone());
280 base.destination = Some(request.destination.clone());
281 base.timestamp_millis = request.created_at_millis + journal_seq;
282 base.runtime_package_fingerprint = request.runtime_package_fingerprint.as_str().to_string();
283 base
284 }
285
286 fn append_recovery_after_result_failure(
287 &self,
288 request: &ApprovalRequest,
289 journal: &dyn RunJournal,
290 mut base: JournalRecordBase,
291 result_error: AgentError,
292 ) -> Result<(), AgentError> {
293 base.record_id = format!(
294 "journal.record.approval.{}.recovery",
295 request.approval_request_id.as_str()
296 );
297 let recovery = RecoveryMarker {
298 unsafe_pending: vec![PendingSideEffect {
299 effect_id: request.approval_dispatch_effect_id.clone(),
300 intent_record_id: format!(
301 "journal.record.approval.{}.intent",
302 request.approval_request_id.as_str()
303 ),
304 idempotency_key: None,
305 dedupe_key: None,
306 unsafe_pending_reason: format!(
307 "approval dispatcher was contacted but terminal result append failed: {}",
308 result_error.context().message
309 ),
310 }],
311 recovery_reason: "approval dispatch terminal result append failed".to_string(),
312 policy_refs: request.policy_refs.clone(),
313 };
314 let recovery_record = JournalRecord::recovery(base, recovery);
315 journal
316 .append(recovery_record)
317 .map(|_| ())
318 .map_err(|error| {
319 AgentError::new(
320 AgentErrorKind::RecoveryRepairNeeded,
321 RetryClassification::RepairNeeded,
322 format!(
323 "approval result append failed and recovery append failed: {}",
324 error.context().message
325 ),
326 )
327 .with_subject(request.subject_ref())
328 })
329 }
330
331 fn take_cancelled_reason(&self, approval_request_id: &ApprovalRequestId) -> Option<String> {
332 let mut cancelled = self
333 .cancelled_before_dispatch
334 .lock()
335 .expect("approval cancellation lock");
336 let index = cancelled
337 .iter()
338 .position(|(candidate, _)| candidate == approval_request_id)?;
339 Some(cancelled.remove(index).1)
340 }
341}
342
343fn approval_lifecycle_status(status: &ApprovalTerminalStatus) -> ApprovalLifecycleStatus {
344 match status {
345 ApprovalTerminalStatus::Approved => ApprovalLifecycleStatus::Approved,
346 ApprovalTerminalStatus::ApprovedForSession => ApprovalLifecycleStatus::ApprovedForSession,
347 ApprovalTerminalStatus::Denied => ApprovalLifecycleStatus::Denied,
348 ApprovalTerminalStatus::TimedOut => ApprovalLifecycleStatus::TimedOut,
349 ApprovalTerminalStatus::Cancelled => ApprovalLifecycleStatus::Cancelled,
350 ApprovalTerminalStatus::DispatcherUnavailable => {
351 ApprovalLifecycleStatus::DispatcherUnavailable
352 }
353 ApprovalTerminalStatus::RecoveryRequired => ApprovalLifecycleStatus::RecoveryRequired,
354 }
355}
356
357fn journal_failure(error: AgentError) -> AgentError {
358 AgentError::new(
359 AgentErrorKind::JournalFailure,
360 RetryClassification::RepairNeeded,
361 error.context().message,
362 )
363}
364
365fn is_extension_self_response(request: &ApprovalRequest, decision: &ApprovalDecision) -> bool {
366 if request.source.kind != SourceKind::Extension {
367 return false;
368 }
369 match decision {
370 ApprovalDecision::Approved { actor_ref }
371 | ApprovalDecision::ApprovedForSession { actor_ref } => actor_ref == &request.source,
372 ApprovalDecision::Denied {
373 actor_ref: Some(actor_ref),
374 ..
375 } => actor_ref == &request.source,
376 ApprovalDecision::Denied {
377 actor_ref: None, ..
378 } => false,
379 }
380}