1use std::collections::{BTreeMap, HashMap};
2use std::sync::{LazyLock, Mutex};
3
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use serde_json::{Map, Value};
7use tokio::sync::broadcast;
8
9use crate::a2a::{A2aClientError, ResolvedA2aEndpoint};
10use crate::orchestration::{
11 extract_handoffs_from_json_value, handoff_artifact_record, ArtifactRecord, HandoffArtifact,
12 HandoffBudgetRemainingRecord, HandoffTargetRecord,
13};
14
/// Schema identifier for the external-agent delegation protocol. It is sent as
/// `schema` in plan/dispatch params and is the default `schema` of every
/// [`ExternalAgentPlanCheckpoint`].
pub const EXTERNAL_AGENT_SCHEMA_ID: &str = "harn.external_agent.v1";
/// Schema identifier stamped on delegation envelopes and synthesized handoffs.
pub const EXTERNAL_AGENT_HANDOFF_SCHEMA_ID: &str = "harn.external_agent.handoff.v1";
/// JSON-RPC method that asks an A2A peer for a pre-dispatch plan checkpoint.
pub const A2A_PLAN_METHOD: &str = "_harn/externalAgent.plan";
/// JSON-RPC method that sends approved work to an A2A peer.
pub const A2A_DISPATCH_METHOD: &str = "_harn/externalAgent.dispatch";

/// Process-wide cache of delegation envelopes, keyed by idempotency key.
///
/// Within this module, entries are only removed by
/// [`reset_external_agent_state`], which clears the whole map.
static IDEMPOTENCY_CACHE: LazyLock<Mutex<HashMap<String, ExternalAgentDelegationEnvelope>>> =
    LazyLock::new(|| Mutex::new(HashMap::new()));
22
/// Transport-specific implementation of the three delegation phases:
/// capability discovery, optional remote planning, and dispatch.
///
/// Each method receives the caller's cancellation receiver so that
/// implementations can observe cancellation while awaiting network work.
#[async_trait]
pub trait ExternalAgent: Send + Sync {
    /// Resolves `request.target` and reports the peer together with the
    /// capabilities it advertises.
    async fn capabilities(
        &self,
        request: &ExternalAgentDelegationRequest,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<ExternalAgentPeer, ExternalAgentError>;

    /// Asks the peer for a pre-dispatch checkpoint.
    ///
    /// `Ok(None)` means the peer answered without a usable plan; the
    /// delegation is then refused by the caller.
    async fn plan(
        &self,
        peer: &ExternalAgentPeer,
        request: &ExternalAgentDelegationRequest,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<Option<ExternalAgentPlanCheckpoint>, ExternalAgentError>;

    /// Sends the approved task along with its checkpoint and returns the
    /// peer's raw result payload.
    async fn dispatch(
        &self,
        peer: &ExternalAgentPeer,
        request: &ExternalAgentDelegationRequest,
        checkpoint: &ExternalAgentPlanCheckpoint,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<Value, ExternalAgentError>;
}
46
/// Wire transport used to reach an external agent; serialized in snake_case
/// (the only variant is `"a2a"`).
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ExternalAgentTransport {
    /// Agent-to-Agent JSON-RPC transport (the default).
    #[default]
    A2a,
}
53
/// Spending caps for one delegation.
///
/// Validation requires at least one cap to be present and positive. A
/// dispatch result whose reported usage exceeds any present cap is marked
/// `budget_exceeded`.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ExternalAgentBudget {
    /// Maximum spend in US dollars.
    pub max_usd: Option<f64>,
    /// Maximum number of tokens.
    pub max_tokens: Option<u64>,
    /// Maximum duration in seconds.
    pub max_seconds: Option<u64>,
    /// Maximum number of tool calls.
    pub max_tool_calls: Option<u64>,
}
62
/// Usage reported by the external agent after dispatch.
///
/// Each field accepts several aliases, including the `max_*` budget names, so
/// a peer may echo the budget's shape or use its own naming. All fields except
/// `usd` are unsigned integers.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ExternalAgentBudgetUsage {
    /// Dollars spent.
    #[serde(alias = "max_usd", alias = "dollars", alias = "cost_usd")]
    pub usd: Option<f64>,
    /// Tokens consumed.
    #[serde(alias = "max_tokens", alias = "token_count")]
    pub tokens: Option<u64>,
    /// Seconds elapsed.
    #[serde(alias = "max_seconds", alias = "duration_seconds")]
    pub seconds: Option<u64>,
    /// Tool calls made.
    #[serde(alias = "max_tool_calls")]
    pub tool_calls: Option<u64>,
}
75
/// Caller-side approval state for the pre-dispatch checkpoint.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ExternalAgentCheckpointPolicy {
    /// When false, delegation stops after producing a checkpoint and returns
    /// status `checkpoint_required`.
    pub approved: bool,
    /// Identity of the approver; carried along but not consulted by the
    /// delegation logic in this module.
    pub approved_by: Option<String>,
    /// When the peer does not advertise checkpoint support, synthesize a local
    /// checkpoint instead of refusing.
    pub allow_local_fallback: bool,
    /// Plan text for a synthesized local checkpoint; a generic plan is built
    /// from the task and scope when absent.
    pub local_plan: Option<String>,
}
84
/// Caller's description of one delegation to an external agent.
///
/// Validation requires a non-blank `target`, `task`, and `idempotency_key`,
/// plus at least one positive budget cap.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ExternalAgentDelegationRequest {
    /// Transport used to reach the peer.
    pub transport: ExternalAgentTransport,
    /// Peer address as given by the caller. For A2A, an optional `a2a://`
    /// prefix is stripped before resolution.
    pub target: String,
    /// Forwarded to A2A agent resolution. Presumably permits non-TLS
    /// endpoints — TODO confirm against `crate::a2a::resolve_agent`.
    pub allow_cleartext: bool,
    /// Task description sent to the peer.
    pub task: String,
    /// Caps forwarded to the peer and checked against reported usage.
    pub budget: ExternalAgentBudget,
    /// Approval state and local-fallback options for the checkpoint.
    pub checkpoint: ExternalAgentCheckpointPolicy,
    /// Key for the process-wide idempotency cache. It is also forwarded to the
    /// peer and used to build JSON-RPC request ids.
    pub idempotency_key: Option<String>,
    /// Files or entities the peer is expected to stay within. It is forwarded
    /// to the peer and used as the default checkpoint/handoff scope.
    pub expected_scope: Vec<String>,
    /// Free-form context forwarded to the peer.
    pub context: Value,
    /// Caller metadata, forwarded to the peer and copied into the envelope.
    pub metadata: BTreeMap<String, Value>,
}
99
100impl Default for ExternalAgentDelegationRequest {
101 fn default() -> Self {
102 Self {
103 transport: ExternalAgentTransport::A2a,
104 target: String::new(),
105 allow_cleartext: false,
106 task: String::new(),
107 budget: ExternalAgentBudget::default(),
108 checkpoint: ExternalAgentCheckpointPolicy::default(),
109 idempotency_key: None,
110 expected_scope: Vec::new(),
111 context: Value::Null,
112 metadata: BTreeMap::new(),
113 }
114 }
115}
116
/// Delegation features a peer advertises in its agent card.
///
/// Dispatch requires `dispatch`, `budget_cap`, `idempotency`, and
/// `reviewable_handoff` to be set; otherwise the delegation is refused.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ExternalAgentCapabilities {
    /// Schema id reported by the peer. It is pinned to
    /// `EXTERNAL_AGENT_SCHEMA_ID` when metadata omits it or when the card
    /// mentions that id anywhere.
    pub schema: Option<String>,
    /// Peer can produce a plan checkpoint before dispatch.
    pub pre_dispatch_checkpoint: bool,
    /// Peer honors budget caps.
    pub budget_cap: bool,
    /// Peer honors idempotency keys.
    pub idempotency: bool,
    /// Peer returns reviewable handoffs.
    pub reviewable_handoff: bool,
    /// Peer accepts the dispatch method.
    pub dispatch: bool,
    /// Extension/RPC method names the peer advertises.
    pub operations: Vec<String>,
    /// The external-agent metadata object found on the card, if any.
    pub raw: Option<Value>,
}
129
/// A resolved external agent plus the capabilities read from its card.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ExternalAgentPeer {
    /// Transport the peer was resolved over.
    pub transport: ExternalAgentTransport,
    /// The caller's original (un-normalized) target string.
    pub target: String,
    /// Agent card URL reported by resolution.
    pub card_url: String,
    /// JSON-RPC endpoint; plan and dispatch requests are posted here.
    pub rpc_url: String,
    /// Agent id reported by resolution, if any.
    pub agent_id: Option<String>,
    /// Sent as `target_agent` in plan/dispatch params.
    pub target_agent: String,
    /// Capabilities derived from `card`.
    pub capabilities: ExternalAgentCapabilities,
    /// The raw agent card JSON.
    pub card: Value,
}
142
143impl Default for ExternalAgentPeer {
144 fn default() -> Self {
145 Self {
146 transport: ExternalAgentTransport::A2a,
147 target: String::new(),
148 card_url: String::new(),
149 rpc_url: String::new(),
150 agent_id: None,
151 target_agent: String::new(),
152 capabilities: ExternalAgentCapabilities::default(),
153 card: Value::Null,
154 }
155 }
156}
157
/// Plan the peer (or a local fallback) commits to before dispatch. A
/// non-approved request stops here for review.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ExternalAgentPlanCheckpoint {
    /// Record type tag, serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Protocol schema id.
    pub schema: String,
    /// Id from the peer, or a freshly minted one.
    pub checkpoint_id: String,
    /// `"remote"` for peer-produced plans, `"local_fallback"` for synthesized ones.
    pub source: String,
    /// Plan text shown for approval.
    pub plan: String,
    /// Files or entities the plan says it will touch.
    pub expected_scope: Vec<String>,
    /// Budget copied from the request.
    pub budget: ExternalAgentBudget,
    /// Evidence references supplied with the plan.
    pub evidence_refs: Vec<Value>,
    /// Extra metadata supplied with the plan.
    pub metadata: BTreeMap<String, Value>,
}
172
173impl Default for ExternalAgentPlanCheckpoint {
174 fn default() -> Self {
175 Self {
176 type_name: "external_agent_checkpoint".to_string(),
177 schema: EXTERNAL_AGENT_SCHEMA_ID.to_string(),
178 checkpoint_id: new_id("external_checkpoint"),
179 source: "remote".to_string(),
180 plan: String::new(),
181 expected_scope: Vec::new(),
182 budget: ExternalAgentBudget::default(),
183 evidence_refs: Vec::new(),
184 metadata: BTreeMap::new(),
185 }
186 }
187}
188
/// Outcome record of one delegation. This is also what the idempotency cache
/// stores and replays.
///
/// `status` is one of:
/// - `"created"`: the default.
/// - `"ready"`: set by the base envelope builder.
/// - `"refused"`.
/// - `"checkpoint_required"`.
/// - `"completed"`.
/// - `"budget_exceeded"`.
/// - `"replayed"`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct ExternalAgentDelegationEnvelope {
    /// Record type tag, serialized as `_type`.
    #[serde(rename = "_type")]
    pub type_name: String,
    /// Envelope schema id.
    pub schema: String,
    /// Unique id of this envelope; replays get a new one.
    pub id: String,
    /// Lifecycle status (see the type docs).
    pub status: String,
    /// Human-readable reason for a refusal or budget overrun.
    pub error: Option<String>,
    /// Transport copied from the request.
    pub transport: ExternalAgentTransport,
    /// Target copied from the request.
    pub target: String,
    /// Cleartext flag copied from the request.
    pub allow_cleartext: bool,
    /// Task copied from the request.
    pub task: String,
    /// Budget copied from the request.
    pub budget: ExternalAgentBudget,
    /// Usage parsed from the dispatch result, when reported.
    pub budget_used: Option<ExternalAgentBudgetUsage>,
    /// Idempotency key copied from the request.
    pub idempotency_key: Option<String>,
    /// Capabilities advertised by the peer.
    pub capabilities: Option<ExternalAgentCapabilities>,
    /// Checkpoint awaiting approval or used for dispatch.
    pub checkpoint: Option<ExternalAgentPlanCheckpoint>,
    /// Serialized handoff artifact from the dispatch result.
    pub handoff: Option<Value>,
    /// Reviewable artifacts: the handoff record, the peer's artifacts, and an
    /// optional diff.
    pub artifacts: Vec<Value>,
    /// Receipts extracted from the dispatch result.
    pub receipts: Vec<Value>,
    /// Evidence references extracted from the dispatch result.
    pub evidence_refs: Vec<Value>,
    /// The peer's raw dispatch result.
    pub result: Option<Value>,
    /// True when this envelope was served from the idempotency cache.
    pub replayed: bool,
    /// Id of the cached envelope this one replays.
    pub replay_of: Option<String>,
    /// Request metadata plus `card_url`, `rpc_url`, and optional `agent_id`.
    pub metadata: BTreeMap<String, Value>,
}
216
217impl Default for ExternalAgentDelegationEnvelope {
218 fn default() -> Self {
219 Self {
220 type_name: "external_agent_delegation".to_string(),
221 schema: EXTERNAL_AGENT_HANDOFF_SCHEMA_ID.to_string(),
222 id: new_id("external_delegate"),
223 status: "created".to_string(),
224 error: None,
225 transport: ExternalAgentTransport::A2a,
226 target: String::new(),
227 allow_cleartext: false,
228 task: String::new(),
229 budget: ExternalAgentBudget::default(),
230 budget_used: None,
231 idempotency_key: None,
232 capabilities: None,
233 checkpoint: None,
234 handoff: None,
235 artifacts: Vec::new(),
236 receipts: Vec::new(),
237 evidence_refs: Vec::new(),
238 result: None,
239 replayed: false,
240 replay_of: None,
241 metadata: BTreeMap::new(),
242 }
243 }
244}
245
/// Failures of a delegation call.
///
/// Policy refusals (missing capabilities, no checkpoint) are not errors. They
/// come back as `Ok` envelopes with status `"refused"`.
#[derive(Debug)]
pub enum ExternalAgentError {
    /// The request failed local validation or named an invalid target.
    InvalidRequest(String),
    /// Mapped from A2A discovery, denial, timeout, or cancellation failures.
    Transport(String),
    /// The peer replied with a JSON-RPC error, a response without `result`,
    /// or an A2A protocol error.
    Protocol(String),
}
252
253impl std::fmt::Display for ExternalAgentError {
254 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
255 match self {
256 Self::InvalidRequest(message) | Self::Transport(message) | Self::Protocol(message) => {
257 f.write_str(message)
258 }
259 }
260 }
261}
262
263impl std::error::Error for ExternalAgentError {}
264
265impl From<A2aClientError> for ExternalAgentError {
266 fn from(error: A2aClientError) -> Self {
267 match error {
268 A2aClientError::InvalidTarget(message) => Self::InvalidRequest(message),
269 A2aClientError::Discovery(message)
270 | A2aClientError::Denied(message)
271 | A2aClientError::Timeout(message)
272 | A2aClientError::Cancelled(message) => Self::Transport(message),
273 A2aClientError::Protocol(message) => Self::Protocol(message),
274 }
275 }
276}
277
278pub fn reset_external_agent_state() {
279 idempotency_cache().clear();
280}
281
282pub async fn delegate_external_agent(
283 request: ExternalAgentDelegationRequest,
284 cancel_rx: &mut broadcast::Receiver<()>,
285) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
286 validate_request(&request)?;
287 let idempotency_key = request.idempotency_key.as_deref().unwrap_or_default();
288 if let Some(cached) = cached_response(&request, idempotency_key) {
289 return Ok(cached);
290 }
291
292 match request.transport {
293 ExternalAgentTransport::A2a => {
294 let agent = A2aExternalAgent;
295 delegate_with_agent(&agent, request, cancel_rx).await
296 }
297 }
298}
299
300async fn delegate_with_agent<A: ExternalAgent>(
301 agent: &A,
302 request: ExternalAgentDelegationRequest,
303 cancel_rx: &mut broadcast::Receiver<()>,
304) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
305 let idempotency_key = request.idempotency_key.as_deref().unwrap_or_default();
306 let cached_checkpoint = cached_checkpoint(idempotency_key);
307 let peer = agent.capabilities(&request, cancel_rx).await?;
308 let mut base = base_envelope(&request, &peer);
309
310 let missing = missing_dispatch_capabilities(&peer.capabilities);
311 if !missing.is_empty() {
312 return refuse_and_cache(
313 base,
314 &format!(
315 "external agent is missing required delegation capabilities: {}",
316 missing.join(", ")
317 ),
318 );
319 }
320
321 let checkpoint = if let Some(checkpoint) = cached_checkpoint {
322 checkpoint
323 } else if peer.capabilities.pre_dispatch_checkpoint {
324 match agent.plan(&peer, &request, cancel_rx).await? {
325 Some(checkpoint) => checkpoint,
326 None => {
327 return refuse_and_cache(
328 base,
329 "external agent did not return a pre-dispatch checkpoint",
330 );
331 }
332 }
333 } else if request.checkpoint.allow_local_fallback {
334 synthesize_local_checkpoint(&request)
335 } else {
336 return refuse_and_cache(
337 base,
338 "external agent does not advertise pre-dispatch checkpoint support",
339 );
340 };
341
342 base.checkpoint = Some(checkpoint.clone());
343 if !request.checkpoint.approved {
344 base.status = "checkpoint_required".to_string();
345 cache_envelope(idempotency_key, &base);
346 return Ok(base);
347 }
348
349 let result = agent
350 .dispatch(&peer, &request, &checkpoint, cancel_rx)
351 .await?;
352 let budget_used = parse_budget_usage(&result);
353 let exceeded = budget_used
354 .as_ref()
355 .is_some_and(|used| budget_exceeded(&request.budget, used));
356 base.status = if exceeded {
357 "budget_exceeded".to_string()
358 } else {
359 "completed".to_string()
360 };
361 if exceeded {
362 base.error = Some("external agent reported usage over the approved budget cap".to_string());
363 }
364 base.budget_used = budget_used;
365 base.result = Some(result.clone());
366 base.receipts = extract_value_array(&result, &["receipts", "receipt_links"]);
367 base.evidence_refs = extract_value_array(&result, &["evidence_refs", "evidence"]);
368 let handoff = first_handoff_or_synthesize(&request, &peer, &checkpoint, &result);
369 let handoff_json = serde_json::to_value(&handoff).unwrap_or(Value::Null);
370 base.handoff = Some(handoff_json);
371 base.artifacts = reviewable_artifacts(&handoff, &result);
372 cache_envelope(idempotency_key, &base);
373 Ok(base)
374}
375
376pub struct A2aExternalAgent;
377
378#[async_trait]
379impl ExternalAgent for A2aExternalAgent {
380 async fn capabilities(
381 &self,
382 request: &ExternalAgentDelegationRequest,
383 cancel_rx: &mut broadcast::Receiver<()>,
384 ) -> Result<ExternalAgentPeer, ExternalAgentError> {
385 let target = normalize_a2a_target(&request.target);
386 let resolved =
387 crate::a2a::resolve_agent(&target, request.allow_cleartext, cancel_rx).await?;
388 Ok(peer_from_a2a(
389 &request.target,
390 resolved.endpoint,
391 resolved.card,
392 ))
393 }
394
395 async fn plan(
396 &self,
397 peer: &ExternalAgentPeer,
398 request: &ExternalAgentDelegationRequest,
399 cancel_rx: &mut broadcast::Receiver<()>,
400 ) -> Result<Option<ExternalAgentPlanCheckpoint>, ExternalAgentError> {
401 let request_id = format!(
402 "{}.plan",
403 request
404 .idempotency_key
405 .as_deref()
406 .unwrap_or("external-agent")
407 );
408 let response = send_a2a_rpc(
409 peer,
410 request_id,
411 A2A_PLAN_METHOD,
412 serde_json::json!({
413 "schema": EXTERNAL_AGENT_SCHEMA_ID,
414 "target_agent": peer.target_agent,
415 "task": request.task,
416 "budget": request.budget,
417 "idempotency_key": request.idempotency_key,
418 "expected_scope": request.expected_scope,
419 "context": request.context,
420 "metadata": request.metadata,
421 }),
422 cancel_rx,
423 )
424 .await?;
425 Ok(parse_remote_checkpoint(&response, request))
426 }
427
428 async fn dispatch(
429 &self,
430 peer: &ExternalAgentPeer,
431 request: &ExternalAgentDelegationRequest,
432 checkpoint: &ExternalAgentPlanCheckpoint,
433 cancel_rx: &mut broadcast::Receiver<()>,
434 ) -> Result<Value, ExternalAgentError> {
435 let request_id = format!(
436 "{}.dispatch",
437 request
438 .idempotency_key
439 .as_deref()
440 .unwrap_or("external-agent")
441 );
442 send_a2a_rpc(
443 peer,
444 request_id,
445 A2A_DISPATCH_METHOD,
446 serde_json::json!({
447 "schema": EXTERNAL_AGENT_SCHEMA_ID,
448 "target_agent": peer.target_agent,
449 "task": request.task,
450 "budget": request.budget,
451 "idempotency_key": request.idempotency_key,
452 "checkpoint": checkpoint,
453 "expected_scope": request.expected_scope,
454 "context": request.context,
455 "metadata": request.metadata,
456 }),
457 cancel_rx,
458 )
459 .await
460 }
461}
462
463fn validate_request(request: &ExternalAgentDelegationRequest) -> Result<(), ExternalAgentError> {
464 if request.target.trim().is_empty() {
465 return Err(ExternalAgentError::InvalidRequest(
466 "external_agent_delegate: target is required".to_string(),
467 ));
468 }
469 if request.task.trim().is_empty() {
470 return Err(ExternalAgentError::InvalidRequest(
471 "external_agent_delegate: task is required".to_string(),
472 ));
473 }
474 let Some(idempotency_key) = request.idempotency_key.as_deref() else {
475 return Err(ExternalAgentError::InvalidRequest(
476 "external_agent_delegate: idempotency_key is required".to_string(),
477 ));
478 };
479 if idempotency_key.trim().is_empty() {
480 return Err(ExternalAgentError::InvalidRequest(
481 "external_agent_delegate: idempotency_key is required".to_string(),
482 ));
483 }
484 if !budget_has_cap(&request.budget) {
485 return Err(ExternalAgentError::InvalidRequest(
486 "external_agent_delegate: budget must include at least one positive cap".to_string(),
487 ));
488 }
489 Ok(())
490}
491
492fn budget_has_cap(budget: &ExternalAgentBudget) -> bool {
493 budget.max_usd.is_some_and(|value| value > 0.0)
494 || budget.max_tokens.is_some_and(|value| value > 0)
495 || budget.max_seconds.is_some_and(|value| value > 0)
496 || budget.max_tool_calls.is_some_and(|value| value > 0)
497}
498
499fn cached_response(
500 request: &ExternalAgentDelegationRequest,
501 idempotency_key: &str,
502) -> Option<ExternalAgentDelegationEnvelope> {
503 let cached = idempotency_cache().get(idempotency_key).cloned()?;
504 if cached.status == "checkpoint_required" && request.checkpoint.approved {
505 return None;
506 }
507 if cached.status == "checkpoint_required" {
508 return Some(cached);
509 }
510 Some(replay_envelope(cached))
511}
512
513fn cached_checkpoint(idempotency_key: &str) -> Option<ExternalAgentPlanCheckpoint> {
514 idempotency_cache()
515 .get(idempotency_key)
516 .filter(|envelope| envelope.status == "checkpoint_required")
517 .and_then(|envelope| envelope.checkpoint.clone())
518}
519
520fn cache_envelope(idempotency_key: &str, envelope: &ExternalAgentDelegationEnvelope) {
521 idempotency_cache().insert(idempotency_key.to_string(), envelope.clone());
522}
523
524fn idempotency_cache(
525) -> std::sync::MutexGuard<'static, HashMap<String, ExternalAgentDelegationEnvelope>> {
526 IDEMPOTENCY_CACHE
527 .lock()
528 .unwrap_or_else(|poisoned| poisoned.into_inner())
529}
530
531fn replay_envelope(original: ExternalAgentDelegationEnvelope) -> ExternalAgentDelegationEnvelope {
532 let mut replayed = original.clone();
533 replayed.id = new_id("external_delegate");
534 replayed.status = "replayed".to_string();
535 replayed.replayed = true;
536 replayed.replay_of = Some(original.id);
537 replayed
538}
539
540fn base_envelope(
541 request: &ExternalAgentDelegationRequest,
542 peer: &ExternalAgentPeer,
543) -> ExternalAgentDelegationEnvelope {
544 let mut metadata = request.metadata.clone();
545 metadata.insert("card_url".to_string(), Value::String(peer.card_url.clone()));
546 metadata.insert("rpc_url".to_string(), Value::String(peer.rpc_url.clone()));
547 if let Some(agent_id) = peer.agent_id.as_ref() {
548 metadata.insert("agent_id".to_string(), Value::String(agent_id.clone()));
549 }
550 ExternalAgentDelegationEnvelope {
551 status: "ready".to_string(),
552 transport: request.transport.clone(),
553 target: request.target.clone(),
554 allow_cleartext: request.allow_cleartext,
555 task: request.task.clone(),
556 budget: request.budget.clone(),
557 idempotency_key: request.idempotency_key.clone(),
558 capabilities: Some(peer.capabilities.clone()),
559 metadata,
560 ..ExternalAgentDelegationEnvelope::default()
561 }
562}
563
564fn refuse_and_cache(
565 mut envelope: ExternalAgentDelegationEnvelope,
566 reason: &str,
567) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
568 envelope.status = "refused".to_string();
569 envelope.error = Some(reason.to_string());
570 if let Some(key) = envelope.idempotency_key.clone() {
571 cache_envelope(&key, &envelope);
572 }
573 Ok(envelope)
574}
575
576fn missing_dispatch_capabilities(capabilities: &ExternalAgentCapabilities) -> Vec<&'static str> {
577 let mut missing = Vec::new();
578 if !capabilities.dispatch {
579 missing.push("dispatch");
580 }
581 if !capabilities.budget_cap {
582 missing.push("budget_cap");
583 }
584 if !capabilities.idempotency {
585 missing.push("idempotency");
586 }
587 if !capabilities.reviewable_handoff {
588 missing.push("reviewable_handoff");
589 }
590 missing
591}
592
593fn peer_from_a2a(target: &str, endpoint: ResolvedA2aEndpoint, card: Value) -> ExternalAgentPeer {
594 ExternalAgentPeer {
595 transport: ExternalAgentTransport::A2a,
596 target: target.to_string(),
597 card_url: endpoint.card_url,
598 rpc_url: endpoint.rpc_url,
599 agent_id: endpoint.agent_id,
600 target_agent: endpoint.target_agent,
601 capabilities: capabilities_from_card(&card),
602 card,
603 }
604}
605
606fn capabilities_from_card(card: &Value) -> ExternalAgentCapabilities {
607 let mut capabilities = ExternalAgentCapabilities::default();
608 let raw = external_agent_metadata(card);
609 if let Some(raw) = raw {
610 capabilities.raw = Some(Value::Object(raw.clone()));
611 capabilities.schema = string_field(raw, &["schema", "schema_id"])
612 .or_else(|| Some(EXTERNAL_AGENT_SCHEMA_ID.to_string()));
613 capabilities.pre_dispatch_checkpoint = bool_field(
614 raw,
615 &[
616 "pre_dispatch_checkpoint",
617 "preDispatchCheckpoint",
618 "checkpoint",
619 ],
620 );
621 capabilities.budget_cap = bool_field(raw, &["budget_cap", "budgetCap"]);
622 capabilities.idempotency = bool_field(raw, &["idempotency", "idempotency_key"]);
623 capabilities.reviewable_handoff =
624 bool_field(raw, &["reviewable_handoff", "reviewableHandoff", "handoff"]);
625 capabilities.dispatch = bool_field(raw, &["dispatch"]);
626 capabilities.operations =
627 strings_field(raw, &["operations", "methods", "extensionMethods"]);
628 }
629
630 for operation in extension_operations(card) {
631 if !capabilities.operations.contains(&operation) {
632 capabilities.operations.push(operation);
633 }
634 }
635 if capabilities
636 .operations
637 .iter()
638 .any(|op| op == A2A_PLAN_METHOD)
639 {
640 capabilities.pre_dispatch_checkpoint = true;
641 }
642 if capabilities
643 .operations
644 .iter()
645 .any(|op| op == A2A_DISPATCH_METHOD)
646 {
647 capabilities.dispatch = true;
648 }
649 if card_contains_schema(card, EXTERNAL_AGENT_SCHEMA_ID) {
650 capabilities.schema = Some(EXTERNAL_AGENT_SCHEMA_ID.to_string());
651 }
652 capabilities
653}
654
655fn external_agent_metadata(card: &Value) -> Option<&Map<String, Value>> {
656 object_at(card, &["_meta", "harn", "externalAgent"])
657 .or_else(|| object_at(card, &["_meta", "harn", "external_agent"]))
658 .or_else(|| object_at(card, &["capabilities", "_meta", "harn", "externalAgent"]))
659 .or_else(|| object_at(card, &["capabilities", "_meta", "harn", "external_agent"]))
660 .or_else(|| object_at(card, &["capabilities", "externalAgent"]))
661 .or_else(|| object_at(card, &["capabilities", "external_agent"]))
662}
663
664fn object_at<'a>(value: &'a Value, path: &[&str]) -> Option<&'a Map<String, Value>> {
665 let mut cursor = value;
666 for key in path {
667 cursor = cursor.get(*key)?;
668 }
669 cursor.as_object()
670}
671
672fn bool_field(object: &Map<String, Value>, keys: &[&str]) -> bool {
673 keys.iter()
674 .any(|key| object.get(*key).and_then(Value::as_bool).unwrap_or(false))
675}
676
677fn string_field(object: &Map<String, Value>, keys: &[&str]) -> Option<String> {
678 keys.iter().find_map(|key| {
679 object
680 .get(*key)
681 .and_then(Value::as_str)
682 .map(str::to_string)
683 .filter(|value| !value.is_empty())
684 })
685}
686
687fn strings_field(object: &Map<String, Value>, keys: &[&str]) -> Vec<String> {
688 keys.iter()
689 .find_map(|key| value_to_strings(object.get(*key)?))
690 .unwrap_or_default()
691}
692
693fn value_to_strings(value: &Value) -> Option<Vec<String>> {
694 match value {
695 Value::Array(items) => Some(
696 items
697 .iter()
698 .filter_map(Value::as_str)
699 .map(str::to_string)
700 .filter(|value| !value.is_empty())
701 .collect(),
702 ),
703 Value::String(value) if !value.is_empty() => Some(vec![value.clone()]),
704 _ => None,
705 }
706}
707
708fn extension_operations(card: &Value) -> Vec<String> {
709 let mut operations = Vec::new();
710 for path in [
711 &["capabilities", "extensions"][..],
712 &["_meta", "harn", "extensions"][..],
713 &["extensions"][..],
714 ] {
715 let Some(items) = value_at(card, path).and_then(Value::as_array) else {
716 continue;
717 };
718 for item in items {
719 match item {
720 Value::String(value) if value == EXTERNAL_AGENT_SCHEMA_ID => {
721 operations.push(A2A_PLAN_METHOD.to_string());
722 operations.push(A2A_DISPATCH_METHOD.to_string());
723 }
724 Value::String(value)
725 if value == A2A_PLAN_METHOD || value == A2A_DISPATCH_METHOD =>
726 {
727 operations.push(value.clone());
728 }
729 Value::Object(object) => {
730 if object
731 .values()
732 .filter_map(Value::as_str)
733 .any(|value| value == EXTERNAL_AGENT_SCHEMA_ID)
734 {
735 operations.push(A2A_PLAN_METHOD.to_string());
736 operations.push(A2A_DISPATCH_METHOD.to_string());
737 }
738 operations.extend(strings_field(object, &["operations", "methods"]));
739 }
740 _ => {}
741 }
742 }
743 }
744 operations.sort();
745 operations.dedup();
746 operations
747}
748
749fn value_at<'a>(value: &'a Value, path: &[&str]) -> Option<&'a Value> {
750 let mut cursor = value;
751 for key in path {
752 cursor = cursor.get(*key)?;
753 }
754 Some(cursor)
755}
756
757fn card_contains_schema(card: &Value, schema: &str) -> bool {
758 match card {
759 Value::String(value) => value == schema,
760 Value::Array(items) => items
761 .iter()
762 .any(|value| card_contains_schema(value, schema)),
763 Value::Object(object) => object
764 .values()
765 .any(|value| card_contains_schema(value, schema)),
766 _ => false,
767 }
768}
769
/// Turns a user-supplied A2A target into the address passed to resolution.
///
/// Surrounding whitespace is trimmed. An optional `a2a://` scheme prefix is
/// removed, matched case-insensitively because URI schemes are
/// case-insensitive (RFC 3986 §3.1). Any whitespace between the scheme and
/// the address is dropped too. Targets without the scheme are returned
/// trimmed but otherwise unchanged.
fn normalize_a2a_target(target: &str) -> String {
    const SCHEME: &str = "a2a://";
    let trimmed = target.trim();
    // `get` returns None when the target is too short or the cut would split
    // a multi-byte character, so the slice below is always on a char boundary.
    let address = match trimmed.get(..SCHEME.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(SCHEME) => {
            trimmed[SCHEME.len()..].trim_start()
        }
        _ => trimmed,
    };
    address.to_string()
}
777
778async fn send_a2a_rpc(
779 peer: &ExternalAgentPeer,
780 request_id: String,
781 method: &str,
782 params: Value,
783 cancel_rx: &mut broadcast::Receiver<()>,
784) -> Result<Value, ExternalAgentError> {
785 let request = crate::jsonrpc::request(request_id.clone(), method, params);
786 let body =
787 crate::a2a::send_jsonrpc_request(&peer.rpc_url, &request, &request_id, cancel_rx).await?;
788 if let Some(error) = body.get("error") {
789 let message = error
790 .get("message")
791 .and_then(Value::as_str)
792 .unwrap_or("unknown external agent error");
793 return Err(ExternalAgentError::Protocol(format!(
794 "{method} failed: {message}"
795 )));
796 }
797 body.get("result")
798 .cloned()
799 .ok_or_else(|| ExternalAgentError::Protocol(format!("{method} response missing result")))
800}
801
802fn parse_remote_checkpoint(
803 result: &Value,
804 request: &ExternalAgentDelegationRequest,
805) -> Option<ExternalAgentPlanCheckpoint> {
806 let checkpoint_value = result.get("checkpoint").unwrap_or(result);
807 let plan = checkpoint_value
808 .get("plan")
809 .or_else(|| checkpoint_value.get("summary"))
810 .and_then(Value::as_str)
811 .map(str::trim)
812 .filter(|value| !value.is_empty())?;
813 let expected_scope = string_array_field(checkpoint_value, &["expected_scope", "scope"])
814 .filter(|items| !items.is_empty())
815 .unwrap_or_else(|| request.expected_scope.clone());
816 let checkpoint_id = checkpoint_value
817 .get("checkpoint_id")
818 .or_else(|| checkpoint_value.get("id"))
819 .and_then(Value::as_str)
820 .map(str::to_string)
821 .filter(|value| !value.is_empty())
822 .unwrap_or_else(|| new_id("external_checkpoint"));
823 let evidence_refs = extract_value_array(checkpoint_value, &["evidence_refs", "evidence"]);
824 let metadata = checkpoint_value
825 .get("metadata")
826 .and_then(Value::as_object)
827 .map(|object| {
828 object
829 .iter()
830 .map(|(key, value)| (key.clone(), value.clone()))
831 .collect()
832 })
833 .unwrap_or_default();
834 Some(ExternalAgentPlanCheckpoint {
835 checkpoint_id,
836 plan: plan.to_string(),
837 expected_scope,
838 budget: request.budget.clone(),
839 evidence_refs,
840 metadata,
841 ..ExternalAgentPlanCheckpoint::default()
842 })
843}
844
845fn synthesize_local_checkpoint(
846 request: &ExternalAgentDelegationRequest,
847) -> ExternalAgentPlanCheckpoint {
848 let plan = request.checkpoint.local_plan.clone().unwrap_or_else(|| {
849 let scope = if request.expected_scope.is_empty() {
850 "Remote agent must state any files or entities before mutating them.".to_string()
851 } else {
852 format!(
853 "Remote agent may work only within: {}.",
854 request.expected_scope.join(", ")
855 )
856 };
857 format!(
858 "Delegate task after local approval. Task: {} {scope}",
859 request.task
860 )
861 });
862 ExternalAgentPlanCheckpoint {
863 source: "local_fallback".to_string(),
864 plan,
865 expected_scope: request.expected_scope.clone(),
866 budget: request.budget.clone(),
867 ..ExternalAgentPlanCheckpoint::default()
868 }
869}
870
871fn parse_budget_usage(result: &Value) -> Option<ExternalAgentBudgetUsage> {
872 let value = result
873 .get("budget_used")
874 .or_else(|| result.pointer("/budget/used"))
875 .or_else(|| result.get("usage"))?;
876 serde_json::from_value(value.clone()).ok()
877}
878
879fn budget_exceeded(budget: &ExternalAgentBudget, used: &ExternalAgentBudgetUsage) -> bool {
880 budget
881 .max_usd
882 .zip(used.usd)
883 .is_some_and(|(cap, used)| used > cap)
884 || budget
885 .max_tokens
886 .zip(used.tokens)
887 .is_some_and(|(cap, used)| used > cap)
888 || budget
889 .max_seconds
890 .zip(used.seconds)
891 .is_some_and(|(cap, used)| used > cap)
892 || budget
893 .max_tool_calls
894 .zip(used.tool_calls)
895 .is_some_and(|(cap, used)| used > cap)
896}
897
898fn first_handoff_or_synthesize(
899 request: &ExternalAgentDelegationRequest,
900 peer: &ExternalAgentPeer,
901 checkpoint: &ExternalAgentPlanCheckpoint,
902 result: &Value,
903) -> HandoffArtifact {
904 extract_handoffs_from_json_value(result)
905 .into_iter()
906 .next()
907 .unwrap_or_else(|| synthesize_handoff(request, peer, checkpoint, result))
908}
909
910fn synthesize_handoff(
911 request: &ExternalAgentDelegationRequest,
912 peer: &ExternalAgentPeer,
913 checkpoint: &ExternalAgentPlanCheckpoint,
914 result: &Value,
915) -> HandoffArtifact {
916 let files = string_array_field(result, &["files_or_entities_touched", "files", "paths"])
917 .filter(|items| !items.is_empty())
918 .unwrap_or_else(|| checkpoint.expected_scope.clone());
919 let confidence = result.get("confidence").and_then(Value::as_f64);
920 let mut metadata = BTreeMap::new();
921 metadata.insert(
922 "schema".to_string(),
923 Value::String(EXTERNAL_AGENT_HANDOFF_SCHEMA_ID.to_string()),
924 );
925 metadata.insert("card_url".to_string(), Value::String(peer.card_url.clone()));
926 metadata.insert("rpc_url".to_string(), Value::String(peer.rpc_url.clone()));
927 metadata.insert(
928 "checkpoint_id".to_string(),
929 Value::String(checkpoint.checkpoint_id.clone()),
930 );
931 if let Some(key) = request.idempotency_key.as_ref() {
932 metadata.insert("idempotency_key".to_string(), Value::String(key.clone()));
933 }
934 HandoffArtifact {
935 type_name: "handoff_artifact".to_string(),
936 kind: "external_agent_delegation".to_string(),
937 id: new_id("handoff"),
938 source_persona: "external_agent".to_string(),
939 target_persona_or_human: HandoffTargetRecord {
940 kind: "a2a".to_string(),
941 id: peer.agent_id.clone().or_else(|| Some(peer.target.clone())),
942 label: Some(peer.target_agent.clone()).filter(|value| !value.is_empty()),
943 uri: Some(peer.card_url.clone()),
944 },
945 task: request.task.clone(),
946 reason: "External agent returned delegated work for review.".to_string(),
947 files_or_entities_touched: files,
948 requested_capabilities: peer.capabilities.operations.clone(),
949 allowed_side_effects: vec![
950 "reviewable_handoff".to_string(),
951 "reviewable_diff".to_string(),
952 ],
953 budget_remaining: budget_remaining(&request.budget, parse_budget_usage(result).as_ref()),
954 confidence,
955 metadata,
956 ..HandoffArtifact::default()
957 }
958 .normalize()
959}
960
961fn budget_remaining(
962 budget: &ExternalAgentBudget,
963 used: Option<&ExternalAgentBudgetUsage>,
964) -> Option<HandoffBudgetRemainingRecord> {
965 let used = used?;
966 Some(HandoffBudgetRemainingRecord {
967 tokens: budget
968 .max_tokens
969 .zip(used.tokens)
970 .map(|(cap, used)| cap as i64 - used as i64),
971 tool_calls: budget
972 .max_tool_calls
973 .zip(used.tool_calls)
974 .map(|(cap, used)| cap as i64 - used as i64),
975 dollars: budget.max_usd.zip(used.usd).map(|(cap, used)| cap - used),
976 })
977}
978
979fn reviewable_artifacts(handoff: &HandoffArtifact, result: &Value) -> Vec<Value> {
980 let mut artifacts = Vec::new();
981 artifacts
982 .push(serde_json::to_value(handoff_artifact_record(handoff, None)).unwrap_or(Value::Null));
983 artifacts.extend(
984 result
985 .get("artifacts")
986 .and_then(Value::as_array)
987 .into_iter()
988 .flatten()
989 .cloned(),
990 );
991 if let Some(diff) = result
992 .get("diff")
993 .or_else(|| result.get("patch"))
994 .and_then(Value::as_str)
995 .filter(|value| !value.is_empty())
996 {
997 let artifact = ArtifactRecord {
998 kind: "diff".to_string(),
999 title: Some("External agent diff".to_string()),
1000 text: Some(diff.to_string()),
1001 data: Some(serde_json::json!({
1002 "format": "unified",
1003 "diff": diff,
1004 })),
1005 source: Some("external_agent".to_string()),
1006 freshness: Some("fresh".to_string()),
1007 relevance: handoff.confidence,
1008 ..ArtifactRecord::default()
1009 }
1010 .normalize();
1011 artifacts.push(serde_json::to_value(artifact).unwrap_or(Value::Null));
1012 }
1013 artifacts
1014}
1015
1016fn extract_value_array(value: &Value, keys: &[&str]) -> Vec<Value> {
1017 keys.iter()
1018 .find_map(|key| value.get(*key).and_then(Value::as_array))
1019 .cloned()
1020 .unwrap_or_default()
1021}
1022
1023fn string_array_field(value: &Value, keys: &[&str]) -> Option<Vec<String>> {
1024 keys.iter()
1025 .find_map(|key| value.get(*key).and_then(value_to_strings))
1026}
1027
1028fn new_id(prefix: &str) -> String {
1029 format!("{prefix}_{}", uuid::Uuid::now_v7())
1030}
1031
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    /// Serializes the tests in this module.
    ///
    /// Each test resets the external-agent state via `reset_external_agent_state()`.
    /// That state is process-global; the module keeps a static `IDEMPOTENCY_CACHE`,
    /// and the reset presumably clears it (TODO confirm). The test harness runs
    /// tests on parallel threads, so without this lock one test's reset could wipe
    /// entries another test relies on between its checkpoint, dispatch and replay
    /// steps. A `tokio` mutex is used because the guard is held across `.await`.
    static EXTERNAL_AGENT_STATE_LOCK: std::sync::LazyLock<tokio::sync::Mutex<()>> =
        std::sync::LazyLock::new(|| tokio::sync::Mutex::new(()));

    /// How long the mock server waits for request bytes on a connection. This
    /// bound stops a client that connects but never sends from blocking the
    /// accept thread and, through `Drop`'s `join`, the test itself.
    const MOCK_READ_TIMEOUT: Duration = Duration::from_secs(10);

    /// Takes exclusive ownership of the global external-agent state and resets it.
    /// Keep the returned guard alive for the whole test.
    async fn isolated_external_agent_state() -> tokio::sync::MutexGuard<'static, ()> {
        let guard = EXTERNAL_AGENT_STATE_LOCK.lock().await;
        reset_external_agent_state();
        guard
    }

    /// Minimal blocking HTTP/1.1 server that serves an A2A agent card and answers
    /// the external-agent JSON-RPC methods with canned responses.
    struct MockA2aServer {
        /// `127.0.0.1:<port>` the listener is bound to.
        addr: String,
        /// Canned responses, call counters and the request log.
        state: Arc<MockState>,
        /// Set on drop to stop the accept loop.
        shutdown: Arc<AtomicBool>,
        /// Accept-loop thread, joined on drop.
        handle: Option<thread::JoinHandle<()>>,
    }

    /// State shared between the test and the mock server thread.
    struct MockState {
        /// Agent card returned from `/.well-known/agent-card.json`.
        card: Value,
        /// JSON-RPC `result` for `A2A_PLAN_METHOD`.
        plan_response: Value,
        /// JSON-RPC `result` for `A2A_DISPATCH_METHOD`.
        dispatch_response: Value,
        plan_count: AtomicUsize,
        dispatch_count: AtomicUsize,
        /// Every JSON-RPC request body received on `/rpc`, in arrival order.
        requests: Mutex<Vec<Value>>,
    }

    impl MockA2aServer {
        /// Binds an ephemeral localhost port and starts the accept loop.
        ///
        /// The card advertises `card_capabilities`. The plan and dispatch methods
        /// answer with the given values. If a canned value has an `error` key, it
        /// is returned as a JSON-RPC error instead of a result.
        fn new(card_capabilities: Value, plan_response: Value, dispatch_response: Value) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock A2A server");
            let addr = listener.local_addr().expect("mock A2A addr");
            let url = format!("http://localhost:{}/rpc", addr.port());
            let card = serde_json::json!({
                "name": "mock external agent",
                "id": "mock-agent",
                "protocolVersion": "0.3.0",
                "url": url,
                "preferredTransport": "JSONRPC",
                "capabilities": card_capabilities,
            });
            let state = Arc::new(MockState {
                card,
                plan_response,
                dispatch_response,
                plan_count: AtomicUsize::new(0),
                dispatch_count: AtomicUsize::new(0),
                requests: Mutex::new(Vec::new()),
            });
            let shutdown = Arc::new(AtomicBool::new(false));
            let thread_state = Arc::clone(&state);
            let thread_shutdown = Arc::clone(&shutdown);
            let handle = thread::spawn(move || {
                while !thread_shutdown.load(Ordering::SeqCst) {
                    let Ok((mut stream, _)) = listener.accept() else {
                        break;
                    };
                    // `Drop` wakes a blocked `accept` with a throwaway connection
                    // after setting the flag. Re-check before serving it.
                    if thread_shutdown.load(Ordering::SeqCst) {
                        break;
                    }
                    handle_connection(&mut stream, &thread_state);
                }
            });
            Self {
                addr: format!("127.0.0.1:{}", addr.port()),
                state,
                shutdown,
                handle: Some(handle),
            }
        }

        /// Delegation target string (`host:port`) pointing at this server.
        fn target(&self) -> String {
            self.addr.clone()
        }

        fn plan_count(&self) -> usize {
            self.state.plan_count.load(Ordering::SeqCst)
        }

        fn dispatch_count(&self) -> usize {
            self.state.dispatch_count.load(Ordering::SeqCst)
        }

        /// Snapshot of every JSON-RPC request received so far.
        fn requests(&self) -> Vec<Value> {
            self.state
                .requests
                .lock()
                .expect("mock request log poisoned")
                .clone()
        }
    }

    impl Drop for MockA2aServer {
        fn drop(&mut self) {
            self.shutdown.store(true, Ordering::SeqCst);
            // Unblock `accept` so the loop can observe the shutdown flag.
            let _ = TcpStream::connect(&self.addr);
            if let Some(handle) = self.handle.take() {
                let _ = handle.join();
            }
        }
    }

    /// Serves a single request on `stream`. The connection is closed afterwards.
    fn handle_connection(stream: &mut TcpStream, state: &MockState) {
        // Best effort: if the timeout cannot be set, fall back to blocking reads.
        let _ = stream.set_read_timeout(Some(MOCK_READ_TIMEOUT));
        let Some((method, path, body)) = read_request(stream) else {
            return;
        };
        if method == "GET" && path == "/.well-known/agent-card.json" {
            write_response(stream, 200, &state.card);
            return;
        }
        if method == "POST" && path == "/rpc" {
            let request: Value =
                serde_json::from_slice(&body).expect("parse mock JSON-RPC request");
            state
                .requests
                .lock()
                .expect("mock request log poisoned")
                .push(request.clone());
            let response = match request.get("method").and_then(Value::as_str) {
                Some(A2A_PLAN_METHOD) => {
                    state.plan_count.fetch_add(1, Ordering::SeqCst);
                    state.plan_response.clone()
                }
                Some(A2A_DISPATCH_METHOD) => {
                    state.dispatch_count.fetch_add(1, Ordering::SeqCst);
                    state.dispatch_response.clone()
                }
                Some(method) => serde_json::json!({
                    "error": {"code": -32601, "message": format!("unknown method {method}")},
                }),
                None => serde_json::json!({
                    "error": {"code": -32600, "message": "missing method"},
                }),
            };
            let body = if response.get("error").is_some() {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").cloned().unwrap_or(Value::Null),
                    "error": response["error"].clone(),
                })
            } else {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").cloned().unwrap_or(Value::Null),
                    "result": response,
                })
            };
            write_response(stream, 200, &body);
            return;
        }
        write_response(stream, 404, &serde_json::json!({"error": "not found"}));
    }

    /// Reads one HTTP request and returns `(method, path, body)`.
    ///
    /// Returns `None` on I/O error or timeout, or when the first byte is neither
    /// `G` nor `P` (this mock only accepts GET/POST). The body length comes from
    /// `Content-Length`, defaulting to 0.
    fn read_request(stream: &mut TcpStream) -> Option<(String, String, Vec<u8>)> {
        let mut first = [0_u8; 1];
        stream.read_exact(&mut first).ok()?;
        if first[0] != b'G' && first[0] != b'P' {
            return None;
        }
        let mut header = vec![first[0]];
        while !header.ends_with(b"\r\n\r\n") {
            let mut byte = [0_u8; 1];
            stream.read_exact(&mut byte).ok()?;
            header.push(byte[0]);
        }
        let header_text = String::from_utf8_lossy(&header);
        let mut lines = header_text.lines();
        let request_line = lines.next()?;
        let mut parts = request_line.split_whitespace();
        let method = parts.next()?.to_string();
        let path = parts.next()?.to_string();
        let content_length = lines
            .filter_map(|line| line.split_once(':'))
            .find(|(name, _)| name.eq_ignore_ascii_case("content-length"))
            .and_then(|(_, value)| value.trim().parse::<usize>().ok())
            .unwrap_or(0);
        let mut body = vec![0_u8; content_length];
        if content_length > 0 {
            stream.read_exact(&mut body).ok()?;
        }
        Some((method, path, body))
    }

    /// Writes `body` as a JSON HTTP/1.1 response with `connection: close`.
    fn write_response(stream: &mut TcpStream, status: u16, body: &Value) {
        let body = serde_json::to_vec(body).expect("serialize mock response");
        let status_text = if status == 200 { "OK" } else { "Not Found" };
        let header = format!(
            "HTTP/1.1 {status} {status_text}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
            body.len()
        );
        stream
            .write_all(header.as_bytes())
            .expect("write response header");
        stream.write_all(&body).expect("write response body");
        stream.flush().expect("flush response");
    }

    /// Card capabilities advertising every external-agent feature, including the
    /// remote pre-dispatch checkpoint (plan) method.
    fn full_capabilities() -> Value {
        serde_json::json!({
            "externalAgent": {
                "schema": EXTERNAL_AGENT_SCHEMA_ID,
                "pre_dispatch_checkpoint": true,
                "budget_cap": true,
                "idempotency": true,
                "reviewable_handoff": true,
                "dispatch": true,
                "operations": [A2A_PLAN_METHOD, A2A_DISPATCH_METHOD],
            },
        })
    }

    /// Card capabilities without `pre_dispatch_checkpoint` or the plan method.
    fn dispatch_only_capabilities() -> Value {
        serde_json::json!({
            "externalAgent": {
                "schema": EXTERNAL_AGENT_SCHEMA_ID,
                "budget_cap": true,
                "idempotency": true,
                "reviewable_handoff": true,
                "dispatch": true,
                "operations": [A2A_DISPATCH_METHOD],
            },
        })
    }

    /// Builds a delegation request against `server` with a small budget, the given
    /// approval flag and idempotency `key`, and local fallback disabled.
    fn request(
        server: &MockA2aServer,
        approved: bool,
        key: &str,
    ) -> ExternalAgentDelegationRequest {
        ExternalAgentDelegationRequest {
            target: server.target(),
            allow_cleartext: true,
            task: "edit src/lib.rs".to_string(),
            budget: ExternalAgentBudget {
                max_usd: Some(0.05),
                max_tokens: Some(500),
                ..ExternalAgentBudget::default()
            },
            checkpoint: ExternalAgentCheckpointPolicy {
                approved,
                allow_local_fallback: false,
                ..ExternalAgentCheckpointPolicy::default()
            },
            idempotency_key: Some(key.to_string()),
            expected_scope: vec!["src/lib.rs".to_string()],
            ..ExternalAgentDelegationRequest::default()
        }
    }

    fn cancel_channel() -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
        broadcast::channel(1)
    }

    #[tokio::test]
    async fn checkpoint_is_required_before_dispatch_then_replayed() {
        let _state = isolated_external_agent_state().await;
        let server = MockA2aServer::new(
            full_capabilities(),
            serde_json::json!({
                "checkpoint_id": "chk_1",
                "plan": "Inspect src/lib.rs, edit only the requested area, and return a reviewable diff.",
                "expected_scope": ["src/lib.rs"],
            }),
            serde_json::json!({
                "status": "completed",
                "diff": "--- a/src/lib.rs\n+++ b/src/lib.rs\n@@\n-ok\n+better\n",
                "budget_used": {"usd": 0.02, "tokens": 120},
                "confidence": 0.91,
            }),
        );

        let (_tx, mut rx) = cancel_channel();
        let checkpoint = delegate_external_agent(request(&server, false, "idem-1"), &mut rx)
            .await
            .expect("checkpoint response");
        assert_eq!(checkpoint.status, "checkpoint_required");
        assert!(checkpoint.allow_cleartext);
        assert_eq!(server.plan_count(), 1);
        assert_eq!(server.dispatch_count(), 0);

        let (_tx, mut rx) = cancel_channel();
        let completed = delegate_external_agent(request(&server, true, "idem-1"), &mut rx)
            .await
            .expect("approved dispatch");
        assert_eq!(completed.status, "completed");
        assert_eq!(
            completed.budget_used.as_ref().and_then(|used| used.tokens),
            Some(120)
        );
        assert_eq!(server.plan_count(), 1);
        assert_eq!(server.dispatch_count(), 1);
        assert!(completed.handoff.is_some());
        assert!(completed
            .artifacts
            .iter()
            .any(|artifact| { artifact.get("kind").and_then(Value::as_str) == Some("diff") }));

        let (_tx, mut rx) = cancel_channel();
        let replayed = delegate_external_agent(request(&server, true, "idem-1"), &mut rx)
            .await
            .expect("idempotent replay");
        assert_eq!(replayed.status, "replayed");
        assert!(replayed.replayed);
        assert_eq!(server.dispatch_count(), 1);

        let dispatch_request = server
            .requests()
            .into_iter()
            .find(|request| {
                request.get("method").and_then(Value::as_str) == Some(A2A_DISPATCH_METHOD)
            })
            .expect("dispatch request recorded");
        assert_eq!(
            dispatch_request
                .pointer("/params/budget/max_usd")
                .and_then(Value::as_f64),
            Some(0.05)
        );
        assert_eq!(
            dispatch_request
                .pointer("/params/idempotency_key")
                .and_then(Value::as_str),
            Some("idem-1")
        );
    }

    #[tokio::test]
    async fn refuses_when_checkpoint_capability_is_missing() {
        let _state = isolated_external_agent_state().await;
        let server = MockA2aServer::new(
            dispatch_only_capabilities(),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({"status": "completed"}),
        );

        let (_tx, mut rx) = cancel_channel();
        let envelope = delegate_external_agent(request(&server, false, "idem-2"), &mut rx)
            .await
            .expect("refusal envelope");
        assert_eq!(envelope.status, "refused");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("pre-dispatch checkpoint"));
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);
    }

    #[tokio::test]
    async fn refuses_before_plan_when_dispatch_capabilities_are_missing() {
        let _state = isolated_external_agent_state().await;
        let server = MockA2aServer::new(
            serde_json::json!({
                "externalAgent": {
                    "pre_dispatch_checkpoint": true,
                    "dispatch": true,
                    "operations": [A2A_PLAN_METHOD, A2A_DISPATCH_METHOD],
                },
            }),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({"status": "completed"}),
        );

        let (_tx, mut rx) = cancel_channel();
        let envelope =
            delegate_external_agent(request(&server, false, "idem-missing-caps"), &mut rx)
                .await
                .expect("refusal envelope");
        assert_eq!(envelope.status, "refused");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("budget_cap"));
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);
    }

    #[tokio::test]
    async fn local_checkpoint_fallback_still_requires_explicit_approval() {
        let _state = isolated_external_agent_state().await;
        let server = MockA2aServer::new(
            dispatch_only_capabilities(),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({
                "status": "completed",
                "budget_used": {"usd": 0.01},
                "files": ["src/lib.rs"],
            }),
        );
        let mut unapproved = request(&server, false, "idem-3");
        unapproved.checkpoint.allow_local_fallback = true;
        unapproved.checkpoint.local_plan = Some("Local plan approved by host.".to_string());

        let (_tx, mut rx) = cancel_channel();
        let checkpoint = delegate_external_agent(unapproved.clone(), &mut rx)
            .await
            .expect("local checkpoint");
        assert_eq!(checkpoint.status, "checkpoint_required");
        assert_eq!(
            checkpoint
                .checkpoint
                .as_ref()
                .map(|value| value.source.as_str()),
            Some("local_fallback")
        );
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);

        let mut approved = unapproved;
        approved.checkpoint.approved = true;
        let (_tx, mut rx) = cancel_channel();
        let completed = delegate_external_agent(approved, &mut rx)
            .await
            .expect("approved local fallback dispatch");
        assert_eq!(completed.status, "completed");
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 1);
    }

    #[tokio::test]
    async fn over_budget_result_is_reviewable_but_marked() {
        let _state = isolated_external_agent_state().await;
        let server = MockA2aServer::new(
            full_capabilities(),
            serde_json::json!({"checkpoint_id": "chk_budget", "plan": "Spend less than cap."}),
            serde_json::json!({
                "status": "completed",
                "diff": "diff --git a/file b/file\n",
                "budget_used": {"usd": 0.10, "tokens": 100},
            }),
        );
        let mut first = request(&server, false, "idem-4");
        first.budget.max_usd = Some(0.05);
        let (_tx, mut rx) = cancel_channel();
        delegate_external_agent(first, &mut rx)
            .await
            .expect("checkpoint");

        let mut approved = request(&server, true, "idem-4");
        approved.budget.max_usd = Some(0.05);
        let (_tx, mut rx) = cancel_channel();
        let envelope = delegate_external_agent(approved, &mut rx)
            .await
            .expect("budget-marked envelope");
        assert_eq!(envelope.status, "budget_exceeded");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("budget"));
        assert!(envelope.handoff.is_some());
        assert_eq!(server.dispatch_count(), 1);
    }
}