Skip to main content

harn_vm/
external_agent.rs

1use std::collections::{BTreeMap, HashMap};
2use std::sync::{LazyLock, Mutex};
3
4use async_trait::async_trait;
5use serde::{Deserialize, Serialize};
6use serde_json::{Map, Value};
7use tokio::sync::broadcast;
8
9use crate::a2a::{A2aClientError, ResolvedA2aEndpoint};
10use crate::orchestration::{
11    extract_handoffs_from_json_value, handoff_artifact_record, ArtifactRecord, HandoffArtifact,
12    HandoffBudgetRemainingRecord, HandoffTargetRecord,
13};
14
/// Schema identifier for the external-agent delegation protocol; stamped on
/// plan checkpoints and recognised when it appears anywhere in an agent card.
pub const EXTERNAL_AGENT_SCHEMA_ID: &'static str = "harn.external_agent.v1";
/// Schema identifier stamped on delegation envelopes and synthesized handoffs.
pub const EXTERNAL_AGENT_HANDOFF_SCHEMA_ID: &'static str = "harn.external_agent.handoff.v1";
/// JSON-RPC method used to request a pre-dispatch plan checkpoint from a peer.
pub const A2A_PLAN_METHOD: &'static str = "_harn/externalAgent.plan";
/// JSON-RPC method used to dispatch the approved task to a peer.
pub const A2A_DISPATCH_METHOD: &'static str = "_harn/externalAgent.dispatch";
19
20static IDEMPOTENCY_CACHE: LazyLock<Mutex<HashMap<String, ExternalAgentDelegationEnvelope>>> =
21    LazyLock::new(|| Mutex::new(HashMap::new()));
22
/// Transport-specific client for a remote agent that can accept a delegated task.
///
/// The delegation flow in this module calls the methods in this order:
/// - `capabilities` to discover the peer;
/// - `plan`, only when the peer advertises pre-dispatch checkpoint support and no
///   checkpoint is already cached for the request's idempotency key;
/// - `dispatch`, only once the caller has approved the checkpoint.
///
/// Every method receives the caller's cancellation receiver.
#[async_trait]
pub trait ExternalAgent: Send + Sync {
    /// Resolves the peer addressed by `request` and reports its advertised capabilities.
    async fn capabilities(
        &self,
        request: &ExternalAgentDelegationRequest,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<ExternalAgentPeer, ExternalAgentError>;

    /// Asks `peer` for a pre-dispatch plan checkpoint.
    ///
    /// Returning `Ok(None)` makes the delegation flow refuse the request.
    async fn plan(
        &self,
        peer: &ExternalAgentPeer,
        request: &ExternalAgentDelegationRequest,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<Option<ExternalAgentPlanCheckpoint>, ExternalAgentError>;

    /// Dispatches the task under the approved `checkpoint` and returns the peer's raw
    /// result. The delegation flow reads budget usage, receipts, evidence and handoffs
    /// from that result.
    async fn dispatch(
        &self,
        peer: &ExternalAgentPeer,
        request: &ExternalAgentDelegationRequest,
        checkpoint: &ExternalAgentPlanCheckpoint,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<Value, ExternalAgentError>;
}
46
47#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
48#[serde(rename_all = "snake_case")]
49pub enum ExternalAgentTransport {
50    #[default]
51    A2a,
52}
53
54#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
55#[serde(default)]
56pub struct ExternalAgentBudget {
57    pub max_usd: Option<f64>,
58    pub max_tokens: Option<u64>,
59    pub max_seconds: Option<u64>,
60    pub max_tool_calls: Option<u64>,
61}
62
63#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
64#[serde(default)]
65pub struct ExternalAgentBudgetUsage {
66    #[serde(alias = "max_usd", alias = "dollars", alias = "cost_usd")]
67    pub usd: Option<f64>,
68    #[serde(alias = "max_tokens", alias = "token_count")]
69    pub tokens: Option<u64>,
70    #[serde(alias = "max_seconds", alias = "duration_seconds")]
71    pub seconds: Option<u64>,
72    #[serde(alias = "max_tool_calls")]
73    pub tool_calls: Option<u64>,
74}
75
76#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
77#[serde(default)]
78pub struct ExternalAgentCheckpointPolicy {
79    pub approved: bool,
80    pub approved_by: Option<String>,
81    pub allow_local_fallback: bool,
82    pub local_plan: Option<String>,
83}
84
85#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
86#[serde(default)]
87pub struct ExternalAgentDelegationRequest {
88    pub transport: ExternalAgentTransport,
89    pub target: String,
90    pub allow_cleartext: bool,
91    pub task: String,
92    pub budget: ExternalAgentBudget,
93    pub checkpoint: ExternalAgentCheckpointPolicy,
94    pub idempotency_key: Option<String>,
95    pub expected_scope: Vec<String>,
96    pub context: Value,
97    pub metadata: BTreeMap<String, Value>,
98}
99
100impl Default for ExternalAgentDelegationRequest {
101    fn default() -> Self {
102        Self {
103            transport: ExternalAgentTransport::A2a,
104            target: String::new(),
105            allow_cleartext: false,
106            task: String::new(),
107            budget: ExternalAgentBudget::default(),
108            checkpoint: ExternalAgentCheckpointPolicy::default(),
109            idempotency_key: None,
110            expected_scope: Vec::new(),
111            context: Value::Null,
112            metadata: BTreeMap::new(),
113        }
114    }
115}
116
117#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
118#[serde(default)]
119pub struct ExternalAgentCapabilities {
120    pub schema: Option<String>,
121    pub pre_dispatch_checkpoint: bool,
122    pub budget_cap: bool,
123    pub idempotency: bool,
124    pub reviewable_handoff: bool,
125    pub dispatch: bool,
126    pub operations: Vec<String>,
127    pub raw: Option<Value>,
128}
129
130#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
131#[serde(default)]
132pub struct ExternalAgentPeer {
133    pub transport: ExternalAgentTransport,
134    pub target: String,
135    pub card_url: String,
136    pub rpc_url: String,
137    pub agent_id: Option<String>,
138    pub target_agent: String,
139    pub capabilities: ExternalAgentCapabilities,
140    pub card: Value,
141}
142
143impl Default for ExternalAgentPeer {
144    fn default() -> Self {
145        Self {
146            transport: ExternalAgentTransport::A2a,
147            target: String::new(),
148            card_url: String::new(),
149            rpc_url: String::new(),
150            agent_id: None,
151            target_agent: String::new(),
152            capabilities: ExternalAgentCapabilities::default(),
153            card: Value::Null,
154        }
155    }
156}
157
158#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
159#[serde(default)]
160pub struct ExternalAgentPlanCheckpoint {
161    #[serde(rename = "_type")]
162    pub type_name: String,
163    pub schema: String,
164    pub checkpoint_id: String,
165    pub source: String,
166    pub plan: String,
167    pub expected_scope: Vec<String>,
168    pub budget: ExternalAgentBudget,
169    pub evidence_refs: Vec<Value>,
170    pub metadata: BTreeMap<String, Value>,
171}
172
173impl Default for ExternalAgentPlanCheckpoint {
174    fn default() -> Self {
175        Self {
176            type_name: "external_agent_checkpoint".to_string(),
177            schema: EXTERNAL_AGENT_SCHEMA_ID.to_string(),
178            checkpoint_id: new_id("external_checkpoint"),
179            source: "remote".to_string(),
180            plan: String::new(),
181            expected_scope: Vec::new(),
182            budget: ExternalAgentBudget::default(),
183            evidence_refs: Vec::new(),
184            metadata: BTreeMap::new(),
185        }
186    }
187}
188
189#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
190#[serde(default)]
191pub struct ExternalAgentDelegationEnvelope {
192    #[serde(rename = "_type")]
193    pub type_name: String,
194    pub schema: String,
195    pub id: String,
196    pub status: String,
197    pub error: Option<String>,
198    pub transport: ExternalAgentTransport,
199    pub target: String,
200    pub allow_cleartext: bool,
201    pub task: String,
202    pub budget: ExternalAgentBudget,
203    pub budget_used: Option<ExternalAgentBudgetUsage>,
204    pub idempotency_key: Option<String>,
205    pub capabilities: Option<ExternalAgentCapabilities>,
206    pub checkpoint: Option<ExternalAgentPlanCheckpoint>,
207    pub handoff: Option<Value>,
208    pub artifacts: Vec<Value>,
209    pub receipts: Vec<Value>,
210    pub evidence_refs: Vec<Value>,
211    pub result: Option<Value>,
212    pub replayed: bool,
213    pub replay_of: Option<String>,
214    pub metadata: BTreeMap<String, Value>,
215}
216
217impl Default for ExternalAgentDelegationEnvelope {
218    fn default() -> Self {
219        Self {
220            type_name: "external_agent_delegation".to_string(),
221            schema: EXTERNAL_AGENT_HANDOFF_SCHEMA_ID.to_string(),
222            id: new_id("external_delegate"),
223            status: "created".to_string(),
224            error: None,
225            transport: ExternalAgentTransport::A2a,
226            target: String::new(),
227            allow_cleartext: false,
228            task: String::new(),
229            budget: ExternalAgentBudget::default(),
230            budget_used: None,
231            idempotency_key: None,
232            capabilities: None,
233            checkpoint: None,
234            handoff: None,
235            artifacts: Vec::new(),
236            receipts: Vec::new(),
237            evidence_refs: Vec::new(),
238            result: None,
239            replayed: false,
240            replay_of: None,
241            metadata: BTreeMap::new(),
242        }
243    }
244}
245
/// Failure modes of an external-agent delegation.
#[derive(Debug)]
pub enum ExternalAgentError {
    /// The request was malformed or named an invalid target.
    InvalidRequest(String),
    /// Discovery, denial, timeout or cancellation while talking to the peer.
    Transport(String),
    /// The peer returned a JSON-RPC error or a malformed response.
    Protocol(String),
}

impl std::fmt::Display for ExternalAgentError {
    /// Writes the carried message verbatim, with no variant prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::InvalidRequest(text) => text,
            Self::Transport(text) => text,
            Self::Protocol(text) => text,
        };
        f.write_str(text)
    }
}

impl std::error::Error for ExternalAgentError {}
264
265impl From<A2aClientError> for ExternalAgentError {
266    fn from(error: A2aClientError) -> Self {
267        match error {
268            A2aClientError::InvalidTarget(message) => Self::InvalidRequest(message),
269            A2aClientError::Discovery(message)
270            | A2aClientError::Denied(message)
271            | A2aClientError::Timeout(message)
272            | A2aClientError::Cancelled(message) => Self::Transport(message),
273            A2aClientError::Protocol(message) => Self::Protocol(message),
274        }
275    }
276}
277
278pub fn reset_external_agent_state() {
279    idempotency_cache().clear();
280}
281
282pub async fn delegate_external_agent(
283    request: ExternalAgentDelegationRequest,
284    cancel_rx: &mut broadcast::Receiver<()>,
285) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
286    validate_request(&request)?;
287    let idempotency_key = request.idempotency_key.as_deref().unwrap_or_default();
288    if let Some(cached) = cached_response(&request, idempotency_key) {
289        return Ok(cached);
290    }
291
292    match request.transport {
293        ExternalAgentTransport::A2a => {
294            let agent = A2aExternalAgent;
295            delegate_with_agent(&agent, request, cancel_rx).await
296        }
297    }
298}
299
300async fn delegate_with_agent<A: ExternalAgent>(
301    agent: &A,
302    request: ExternalAgentDelegationRequest,
303    cancel_rx: &mut broadcast::Receiver<()>,
304) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
305    let idempotency_key = request.idempotency_key.as_deref().unwrap_or_default();
306    let cached_checkpoint = cached_checkpoint(idempotency_key);
307    let peer = agent.capabilities(&request, cancel_rx).await?;
308    let mut base = base_envelope(&request, &peer);
309
310    let missing = missing_dispatch_capabilities(&peer.capabilities);
311    if !missing.is_empty() {
312        return refuse_and_cache(
313            base,
314            &format!(
315                "external agent is missing required delegation capabilities: {}",
316                missing.join(", ")
317            ),
318        );
319    }
320
321    let checkpoint = if let Some(checkpoint) = cached_checkpoint {
322        checkpoint
323    } else if peer.capabilities.pre_dispatch_checkpoint {
324        match agent.plan(&peer, &request, cancel_rx).await? {
325            Some(checkpoint) => checkpoint,
326            None => {
327                return refuse_and_cache(
328                    base,
329                    "external agent did not return a pre-dispatch checkpoint",
330                );
331            }
332        }
333    } else if request.checkpoint.allow_local_fallback {
334        synthesize_local_checkpoint(&request)
335    } else {
336        return refuse_and_cache(
337            base,
338            "external agent does not advertise pre-dispatch checkpoint support",
339        );
340    };
341
342    base.checkpoint = Some(checkpoint.clone());
343    if !request.checkpoint.approved {
344        base.status = "checkpoint_required".to_string();
345        cache_envelope(idempotency_key, &base);
346        return Ok(base);
347    }
348
349    let result = agent
350        .dispatch(&peer, &request, &checkpoint, cancel_rx)
351        .await?;
352    let budget_used = parse_budget_usage(&result);
353    let exceeded = budget_used
354        .as_ref()
355        .is_some_and(|used| budget_exceeded(&request.budget, used));
356    base.status = if exceeded {
357        "budget_exceeded".to_string()
358    } else {
359        "completed".to_string()
360    };
361    if exceeded {
362        base.error = Some("external agent reported usage over the approved budget cap".to_string());
363    }
364    base.budget_used = budget_used;
365    base.result = Some(result.clone());
366    base.receipts = extract_value_array(&result, &["receipts", "receipt_links"]);
367    base.evidence_refs = extract_value_array(&result, &["evidence_refs", "evidence"]);
368    let handoff = first_handoff_or_synthesize(&request, &peer, &checkpoint, &result);
369    let handoff_json = serde_json::to_value(&handoff).unwrap_or(Value::Null);
370    base.handoff = Some(handoff_json);
371    base.artifacts = reviewable_artifacts(&handoff, &result);
372    cache_envelope(idempotency_key, &base);
373    Ok(base)
374}
375
376pub struct A2aExternalAgent;
377
378#[async_trait]
379impl ExternalAgent for A2aExternalAgent {
380    async fn capabilities(
381        &self,
382        request: &ExternalAgentDelegationRequest,
383        cancel_rx: &mut broadcast::Receiver<()>,
384    ) -> Result<ExternalAgentPeer, ExternalAgentError> {
385        let target = normalize_a2a_target(&request.target);
386        let resolved =
387            crate::a2a::resolve_agent(&target, request.allow_cleartext, cancel_rx).await?;
388        Ok(peer_from_a2a(
389            &request.target,
390            resolved.endpoint,
391            resolved.card,
392        ))
393    }
394
395    async fn plan(
396        &self,
397        peer: &ExternalAgentPeer,
398        request: &ExternalAgentDelegationRequest,
399        cancel_rx: &mut broadcast::Receiver<()>,
400    ) -> Result<Option<ExternalAgentPlanCheckpoint>, ExternalAgentError> {
401        let request_id = format!(
402            "{}.plan",
403            request
404                .idempotency_key
405                .as_deref()
406                .unwrap_or("external-agent")
407        );
408        let response = send_a2a_rpc(
409            peer,
410            request_id,
411            A2A_PLAN_METHOD,
412            serde_json::json!({
413                "schema": EXTERNAL_AGENT_SCHEMA_ID,
414                "target_agent": peer.target_agent,
415                "task": request.task,
416                "budget": request.budget,
417                "idempotency_key": request.idempotency_key,
418                "expected_scope": request.expected_scope,
419                "context": request.context,
420                "metadata": request.metadata,
421            }),
422            cancel_rx,
423        )
424        .await?;
425        Ok(parse_remote_checkpoint(&response, request))
426    }
427
428    async fn dispatch(
429        &self,
430        peer: &ExternalAgentPeer,
431        request: &ExternalAgentDelegationRequest,
432        checkpoint: &ExternalAgentPlanCheckpoint,
433        cancel_rx: &mut broadcast::Receiver<()>,
434    ) -> Result<Value, ExternalAgentError> {
435        let request_id = format!(
436            "{}.dispatch",
437            request
438                .idempotency_key
439                .as_deref()
440                .unwrap_or("external-agent")
441        );
442        send_a2a_rpc(
443            peer,
444            request_id,
445            A2A_DISPATCH_METHOD,
446            serde_json::json!({
447                "schema": EXTERNAL_AGENT_SCHEMA_ID,
448                "target_agent": peer.target_agent,
449                "task": request.task,
450                "budget": request.budget,
451                "idempotency_key": request.idempotency_key,
452                "checkpoint": checkpoint,
453                "expected_scope": request.expected_scope,
454                "context": request.context,
455                "metadata": request.metadata,
456            }),
457            cancel_rx,
458        )
459        .await
460    }
461}
462
463fn validate_request(request: &ExternalAgentDelegationRequest) -> Result<(), ExternalAgentError> {
464    if request.target.trim().is_empty() {
465        return Err(ExternalAgentError::InvalidRequest(
466            "external_agent_delegate: target is required".to_string(),
467        ));
468    }
469    if request.task.trim().is_empty() {
470        return Err(ExternalAgentError::InvalidRequest(
471            "external_agent_delegate: task is required".to_string(),
472        ));
473    }
474    let Some(idempotency_key) = request.idempotency_key.as_deref() else {
475        return Err(ExternalAgentError::InvalidRequest(
476            "external_agent_delegate: idempotency_key is required".to_string(),
477        ));
478    };
479    if idempotency_key.trim().is_empty() {
480        return Err(ExternalAgentError::InvalidRequest(
481            "external_agent_delegate: idempotency_key is required".to_string(),
482        ));
483    }
484    if !budget_has_cap(&request.budget) {
485        return Err(ExternalAgentError::InvalidRequest(
486            "external_agent_delegate: budget must include at least one positive cap".to_string(),
487        ));
488    }
489    Ok(())
490}
491
492fn budget_has_cap(budget: &ExternalAgentBudget) -> bool {
493    budget.max_usd.is_some_and(|value| value > 0.0)
494        || budget.max_tokens.is_some_and(|value| value > 0)
495        || budget.max_seconds.is_some_and(|value| value > 0)
496        || budget.max_tool_calls.is_some_and(|value| value > 0)
497}
498
499fn cached_response(
500    request: &ExternalAgentDelegationRequest,
501    idempotency_key: &str,
502) -> Option<ExternalAgentDelegationEnvelope> {
503    let cached = idempotency_cache().get(idempotency_key).cloned()?;
504    if cached.status == "checkpoint_required" && request.checkpoint.approved {
505        return None;
506    }
507    if cached.status == "checkpoint_required" {
508        return Some(cached);
509    }
510    Some(replay_envelope(cached))
511}
512
513fn cached_checkpoint(idempotency_key: &str) -> Option<ExternalAgentPlanCheckpoint> {
514    idempotency_cache()
515        .get(idempotency_key)
516        .filter(|envelope| envelope.status == "checkpoint_required")
517        .and_then(|envelope| envelope.checkpoint.clone())
518}
519
520fn cache_envelope(idempotency_key: &str, envelope: &ExternalAgentDelegationEnvelope) {
521    idempotency_cache().insert(idempotency_key.to_string(), envelope.clone());
522}
523
524fn idempotency_cache(
525) -> std::sync::MutexGuard<'static, HashMap<String, ExternalAgentDelegationEnvelope>> {
526    IDEMPOTENCY_CACHE
527        .lock()
528        .unwrap_or_else(|poisoned| poisoned.into_inner())
529}
530
531fn replay_envelope(original: ExternalAgentDelegationEnvelope) -> ExternalAgentDelegationEnvelope {
532    let mut replayed = original.clone();
533    replayed.id = new_id("external_delegate");
534    replayed.status = "replayed".to_string();
535    replayed.replayed = true;
536    replayed.replay_of = Some(original.id);
537    replayed
538}
539
540fn base_envelope(
541    request: &ExternalAgentDelegationRequest,
542    peer: &ExternalAgentPeer,
543) -> ExternalAgentDelegationEnvelope {
544    let mut metadata = request.metadata.clone();
545    metadata.insert("card_url".to_string(), Value::String(peer.card_url.clone()));
546    metadata.insert("rpc_url".to_string(), Value::String(peer.rpc_url.clone()));
547    if let Some(agent_id) = peer.agent_id.as_ref() {
548        metadata.insert("agent_id".to_string(), Value::String(agent_id.clone()));
549    }
550    ExternalAgentDelegationEnvelope {
551        status: "ready".to_string(),
552        transport: request.transport.clone(),
553        target: request.target.clone(),
554        allow_cleartext: request.allow_cleartext,
555        task: request.task.clone(),
556        budget: request.budget.clone(),
557        idempotency_key: request.idempotency_key.clone(),
558        capabilities: Some(peer.capabilities.clone()),
559        metadata,
560        ..ExternalAgentDelegationEnvelope::default()
561    }
562}
563
564fn refuse_and_cache(
565    mut envelope: ExternalAgentDelegationEnvelope,
566    reason: &str,
567) -> Result<ExternalAgentDelegationEnvelope, ExternalAgentError> {
568    envelope.status = "refused".to_string();
569    envelope.error = Some(reason.to_string());
570    if let Some(key) = envelope.idempotency_key.clone() {
571        cache_envelope(&key, &envelope);
572    }
573    Ok(envelope)
574}
575
576fn missing_dispatch_capabilities(capabilities: &ExternalAgentCapabilities) -> Vec<&'static str> {
577    let mut missing = Vec::new();
578    if !capabilities.dispatch {
579        missing.push("dispatch");
580    }
581    if !capabilities.budget_cap {
582        missing.push("budget_cap");
583    }
584    if !capabilities.idempotency {
585        missing.push("idempotency");
586    }
587    if !capabilities.reviewable_handoff {
588        missing.push("reviewable_handoff");
589    }
590    missing
591}
592
593fn peer_from_a2a(target: &str, endpoint: ResolvedA2aEndpoint, card: Value) -> ExternalAgentPeer {
594    ExternalAgentPeer {
595        transport: ExternalAgentTransport::A2a,
596        target: target.to_string(),
597        card_url: endpoint.card_url,
598        rpc_url: endpoint.rpc_url,
599        agent_id: endpoint.agent_id,
600        target_agent: endpoint.target_agent,
601        capabilities: capabilities_from_card(&card),
602        card,
603    }
604}
605
606fn capabilities_from_card(card: &Value) -> ExternalAgentCapabilities {
607    let mut capabilities = ExternalAgentCapabilities::default();
608    let raw = external_agent_metadata(card);
609    if let Some(raw) = raw {
610        capabilities.raw = Some(Value::Object(raw.clone()));
611        capabilities.schema = string_field(raw, &["schema", "schema_id"])
612            .or_else(|| Some(EXTERNAL_AGENT_SCHEMA_ID.to_string()));
613        capabilities.pre_dispatch_checkpoint = bool_field(
614            raw,
615            &[
616                "pre_dispatch_checkpoint",
617                "preDispatchCheckpoint",
618                "checkpoint",
619            ],
620        );
621        capabilities.budget_cap = bool_field(raw, &["budget_cap", "budgetCap"]);
622        capabilities.idempotency = bool_field(raw, &["idempotency", "idempotency_key"]);
623        capabilities.reviewable_handoff =
624            bool_field(raw, &["reviewable_handoff", "reviewableHandoff", "handoff"]);
625        capabilities.dispatch = bool_field(raw, &["dispatch"]);
626        capabilities.operations =
627            strings_field(raw, &["operations", "methods", "extensionMethods"]);
628    }
629
630    for operation in extension_operations(card) {
631        if !capabilities.operations.contains(&operation) {
632            capabilities.operations.push(operation);
633        }
634    }
635    if capabilities
636        .operations
637        .iter()
638        .any(|op| op == A2A_PLAN_METHOD)
639    {
640        capabilities.pre_dispatch_checkpoint = true;
641    }
642    if capabilities
643        .operations
644        .iter()
645        .any(|op| op == A2A_DISPATCH_METHOD)
646    {
647        capabilities.dispatch = true;
648    }
649    if card_contains_schema(card, EXTERNAL_AGENT_SCHEMA_ID) {
650        capabilities.schema = Some(EXTERNAL_AGENT_SCHEMA_ID.to_string());
651    }
652    capabilities
653}
654
655fn external_agent_metadata(card: &Value) -> Option<&Map<String, Value>> {
656    object_at(card, &["_meta", "harn", "externalAgent"])
657        .or_else(|| object_at(card, &["_meta", "harn", "external_agent"]))
658        .or_else(|| object_at(card, &["capabilities", "_meta", "harn", "externalAgent"]))
659        .or_else(|| object_at(card, &["capabilities", "_meta", "harn", "external_agent"]))
660        .or_else(|| object_at(card, &["capabilities", "externalAgent"]))
661        .or_else(|| object_at(card, &["capabilities", "external_agent"]))
662}
663
664fn object_at<'a>(value: &'a Value, path: &[&str]) -> Option<&'a Map<String, Value>> {
665    let mut cursor = value;
666    for key in path {
667        cursor = cursor.get(*key)?;
668    }
669    cursor.as_object()
670}
671
672fn bool_field(object: &Map<String, Value>, keys: &[&str]) -> bool {
673    keys.iter()
674        .any(|key| object.get(*key).and_then(Value::as_bool).unwrap_or(false))
675}
676
677fn string_field(object: &Map<String, Value>, keys: &[&str]) -> Option<String> {
678    keys.iter().find_map(|key| {
679        object
680            .get(*key)
681            .and_then(Value::as_str)
682            .map(str::to_string)
683            .filter(|value| !value.is_empty())
684    })
685}
686
687fn strings_field(object: &Map<String, Value>, keys: &[&str]) -> Vec<String> {
688    keys.iter()
689        .find_map(|key| value_to_strings(object.get(*key)?))
690        .unwrap_or_default()
691}
692
693fn value_to_strings(value: &Value) -> Option<Vec<String>> {
694    match value {
695        Value::Array(items) => Some(
696            items
697                .iter()
698                .filter_map(Value::as_str)
699                .map(str::to_string)
700                .filter(|value| !value.is_empty())
701                .collect(),
702        ),
703        Value::String(value) if !value.is_empty() => Some(vec![value.clone()]),
704        _ => None,
705    }
706}
707
708fn extension_operations(card: &Value) -> Vec<String> {
709    let mut operations = Vec::new();
710    for path in [
711        &["capabilities", "extensions"][..],
712        &["_meta", "harn", "extensions"][..],
713        &["extensions"][..],
714    ] {
715        let Some(items) = value_at(card, path).and_then(Value::as_array) else {
716            continue;
717        };
718        for item in items {
719            match item {
720                Value::String(value) if value == EXTERNAL_AGENT_SCHEMA_ID => {
721                    operations.push(A2A_PLAN_METHOD.to_string());
722                    operations.push(A2A_DISPATCH_METHOD.to_string());
723                }
724                Value::String(value)
725                    if value == A2A_PLAN_METHOD || value == A2A_DISPATCH_METHOD =>
726                {
727                    operations.push(value.clone());
728                }
729                Value::Object(object) => {
730                    if object
731                        .values()
732                        .filter_map(Value::as_str)
733                        .any(|value| value == EXTERNAL_AGENT_SCHEMA_ID)
734                    {
735                        operations.push(A2A_PLAN_METHOD.to_string());
736                        operations.push(A2A_DISPATCH_METHOD.to_string());
737                    }
738                    operations.extend(strings_field(object, &["operations", "methods"]));
739                }
740                _ => {}
741            }
742        }
743    }
744    operations.sort();
745    operations.dedup();
746    operations
747}
748
749fn value_at<'a>(value: &'a Value, path: &[&str]) -> Option<&'a Value> {
750    let mut cursor = value;
751    for key in path {
752        cursor = cursor.get(*key)?;
753    }
754    Some(cursor)
755}
756
757fn card_contains_schema(card: &Value, schema: &str) -> bool {
758    match card {
759        Value::String(value) => value == schema,
760        Value::Array(items) => items
761            .iter()
762            .any(|value| card_contains_schema(value, schema)),
763        Value::Object(object) => object
764            .values()
765            .any(|value| card_contains_schema(value, schema)),
766        _ => false,
767    }
768}
769
/// Trims `target` and strips a leading `a2a://` scheme so it can be handed to
/// A2A resolution.
///
/// URI schemes are case-insensitive (RFC 3986 §3.1), so `A2A://host` is treated the
/// same as `a2a://host`. Targets without the scheme are returned trimmed but
/// otherwise untouched.
fn normalize_a2a_target(target: &str) -> String {
    const SCHEME: &str = "a2a://";
    let trimmed = target.trim();
    // `get` returns None for short inputs or when SCHEME.len() is not a char
    // boundary, so multi-byte targets cannot panic here.
    match trimmed.get(..SCHEME.len()) {
        Some(prefix) if prefix.eq_ignore_ascii_case(SCHEME) => trimmed[SCHEME.len()..].to_string(),
        _ => trimmed.to_string(),
    }
}
777
778async fn send_a2a_rpc(
779    peer: &ExternalAgentPeer,
780    request_id: String,
781    method: &str,
782    params: Value,
783    cancel_rx: &mut broadcast::Receiver<()>,
784) -> Result<Value, ExternalAgentError> {
785    let request = crate::jsonrpc::request(request_id.clone(), method, params);
786    let body =
787        crate::a2a::send_jsonrpc_request(&peer.rpc_url, &request, &request_id, cancel_rx).await?;
788    if let Some(error) = body.get("error") {
789        let message = error
790            .get("message")
791            .and_then(Value::as_str)
792            .unwrap_or("unknown external agent error");
793        return Err(ExternalAgentError::Protocol(format!(
794            "{method} failed: {message}"
795        )));
796    }
797    body.get("result")
798        .cloned()
799        .ok_or_else(|| ExternalAgentError::Protocol(format!("{method} response missing result")))
800}
801
802fn parse_remote_checkpoint(
803    result: &Value,
804    request: &ExternalAgentDelegationRequest,
805) -> Option<ExternalAgentPlanCheckpoint> {
806    let checkpoint_value = result.get("checkpoint").unwrap_or(result);
807    let plan = checkpoint_value
808        .get("plan")
809        .or_else(|| checkpoint_value.get("summary"))
810        .and_then(Value::as_str)
811        .map(str::trim)
812        .filter(|value| !value.is_empty())?;
813    let expected_scope = string_array_field(checkpoint_value, &["expected_scope", "scope"])
814        .filter(|items| !items.is_empty())
815        .unwrap_or_else(|| request.expected_scope.clone());
816    let checkpoint_id = checkpoint_value
817        .get("checkpoint_id")
818        .or_else(|| checkpoint_value.get("id"))
819        .and_then(Value::as_str)
820        .map(str::to_string)
821        .filter(|value| !value.is_empty())
822        .unwrap_or_else(|| new_id("external_checkpoint"));
823    let evidence_refs = extract_value_array(checkpoint_value, &["evidence_refs", "evidence"]);
824    let metadata = checkpoint_value
825        .get("metadata")
826        .and_then(Value::as_object)
827        .map(|object| {
828            object
829                .iter()
830                .map(|(key, value)| (key.clone(), value.clone()))
831                .collect()
832        })
833        .unwrap_or_default();
834    Some(ExternalAgentPlanCheckpoint {
835        checkpoint_id,
836        plan: plan.to_string(),
837        expected_scope,
838        budget: request.budget.clone(),
839        evidence_refs,
840        metadata,
841        ..ExternalAgentPlanCheckpoint::default()
842    })
843}
844
845fn synthesize_local_checkpoint(
846    request: &ExternalAgentDelegationRequest,
847) -> ExternalAgentPlanCheckpoint {
848    let plan = request.checkpoint.local_plan.clone().unwrap_or_else(|| {
849        let scope = if request.expected_scope.is_empty() {
850            "Remote agent must state any files or entities before mutating them.".to_string()
851        } else {
852            format!(
853                "Remote agent may work only within: {}.",
854                request.expected_scope.join(", ")
855            )
856        };
857        format!(
858            "Delegate task after local approval. Task: {} {scope}",
859            request.task
860        )
861    });
862    ExternalAgentPlanCheckpoint {
863        source: "local_fallback".to_string(),
864        plan,
865        expected_scope: request.expected_scope.clone(),
866        budget: request.budget.clone(),
867        ..ExternalAgentPlanCheckpoint::default()
868    }
869}
870
871fn parse_budget_usage(result: &Value) -> Option<ExternalAgentBudgetUsage> {
872    let value = result
873        .get("budget_used")
874        .or_else(|| result.pointer("/budget/used"))
875        .or_else(|| result.get("usage"))?;
876    serde_json::from_value(value.clone()).ok()
877}
878
879fn budget_exceeded(budget: &ExternalAgentBudget, used: &ExternalAgentBudgetUsage) -> bool {
880    budget
881        .max_usd
882        .zip(used.usd)
883        .is_some_and(|(cap, used)| used > cap)
884        || budget
885            .max_tokens
886            .zip(used.tokens)
887            .is_some_and(|(cap, used)| used > cap)
888        || budget
889            .max_seconds
890            .zip(used.seconds)
891            .is_some_and(|(cap, used)| used > cap)
892        || budget
893            .max_tool_calls
894            .zip(used.tool_calls)
895            .is_some_and(|(cap, used)| used > cap)
896}
897
898fn first_handoff_or_synthesize(
899    request: &ExternalAgentDelegationRequest,
900    peer: &ExternalAgentPeer,
901    checkpoint: &ExternalAgentPlanCheckpoint,
902    result: &Value,
903) -> HandoffArtifact {
904    extract_handoffs_from_json_value(result)
905        .into_iter()
906        .next()
907        .unwrap_or_else(|| synthesize_handoff(request, peer, checkpoint, result))
908}
909
910fn synthesize_handoff(
911    request: &ExternalAgentDelegationRequest,
912    peer: &ExternalAgentPeer,
913    checkpoint: &ExternalAgentPlanCheckpoint,
914    result: &Value,
915) -> HandoffArtifact {
916    let files = string_array_field(result, &["files_or_entities_touched", "files", "paths"])
917        .filter(|items| !items.is_empty())
918        .unwrap_or_else(|| checkpoint.expected_scope.clone());
919    let confidence = result.get("confidence").and_then(Value::as_f64);
920    let mut metadata = BTreeMap::new();
921    metadata.insert(
922        "schema".to_string(),
923        Value::String(EXTERNAL_AGENT_HANDOFF_SCHEMA_ID.to_string()),
924    );
925    metadata.insert("card_url".to_string(), Value::String(peer.card_url.clone()));
926    metadata.insert("rpc_url".to_string(), Value::String(peer.rpc_url.clone()));
927    metadata.insert(
928        "checkpoint_id".to_string(),
929        Value::String(checkpoint.checkpoint_id.clone()),
930    );
931    if let Some(key) = request.idempotency_key.as_ref() {
932        metadata.insert("idempotency_key".to_string(), Value::String(key.clone()));
933    }
934    HandoffArtifact {
935        type_name: "handoff_artifact".to_string(),
936        kind: "external_agent_delegation".to_string(),
937        id: new_id("handoff"),
938        source_persona: "external_agent".to_string(),
939        target_persona_or_human: HandoffTargetRecord {
940            kind: "a2a".to_string(),
941            id: peer.agent_id.clone().or_else(|| Some(peer.target.clone())),
942            label: Some(peer.target_agent.clone()).filter(|value| !value.is_empty()),
943            uri: Some(peer.card_url.clone()),
944        },
945        task: request.task.clone(),
946        reason: "External agent returned delegated work for review.".to_string(),
947        files_or_entities_touched: files,
948        requested_capabilities: peer.capabilities.operations.clone(),
949        allowed_side_effects: vec![
950            "reviewable_handoff".to_string(),
951            "reviewable_diff".to_string(),
952        ],
953        budget_remaining: budget_remaining(&request.budget, parse_budget_usage(result).as_ref()),
954        confidence,
955        metadata,
956        ..HandoffArtifact::default()
957    }
958    .normalize()
959}
960
961fn budget_remaining(
962    budget: &ExternalAgentBudget,
963    used: Option<&ExternalAgentBudgetUsage>,
964) -> Option<HandoffBudgetRemainingRecord> {
965    let used = used?;
966    Some(HandoffBudgetRemainingRecord {
967        tokens: budget
968            .max_tokens
969            .zip(used.tokens)
970            .map(|(cap, used)| cap as i64 - used as i64),
971        tool_calls: budget
972            .max_tool_calls
973            .zip(used.tool_calls)
974            .map(|(cap, used)| cap as i64 - used as i64),
975        dollars: budget.max_usd.zip(used.usd).map(|(cap, used)| cap - used),
976    })
977}
978
979fn reviewable_artifacts(handoff: &HandoffArtifact, result: &Value) -> Vec<Value> {
980    let mut artifacts = Vec::new();
981    artifacts
982        .push(serde_json::to_value(handoff_artifact_record(handoff, None)).unwrap_or(Value::Null));
983    artifacts.extend(
984        result
985            .get("artifacts")
986            .and_then(Value::as_array)
987            .into_iter()
988            .flatten()
989            .cloned(),
990    );
991    if let Some(diff) = result
992        .get("diff")
993        .or_else(|| result.get("patch"))
994        .and_then(Value::as_str)
995        .filter(|value| !value.is_empty())
996    {
997        let artifact = ArtifactRecord {
998            kind: "diff".to_string(),
999            title: Some("External agent diff".to_string()),
1000            text: Some(diff.to_string()),
1001            data: Some(serde_json::json!({
1002                "format": "unified",
1003                "diff": diff,
1004            })),
1005            source: Some("external_agent".to_string()),
1006            freshness: Some("fresh".to_string()),
1007            relevance: handoff.confidence,
1008            ..ArtifactRecord::default()
1009        }
1010        .normalize();
1011        artifacts.push(serde_json::to_value(artifact).unwrap_or(Value::Null));
1012    }
1013    artifacts
1014}
1015
1016fn extract_value_array(value: &Value, keys: &[&str]) -> Vec<Value> {
1017    keys.iter()
1018        .find_map(|key| value.get(*key).and_then(Value::as_array))
1019        .cloned()
1020        .unwrap_or_default()
1021}
1022
1023fn string_array_field(value: &Value, keys: &[&str]) -> Option<Vec<String>> {
1024    keys.iter()
1025        .find_map(|key| value.get(*key).and_then(value_to_strings))
1026}
1027
1028fn new_id(prefix: &str) -> String {
1029    format!("{prefix}_{}", uuid::Uuid::now_v7())
1030}
1031
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    /// Serializes tests that touch the process-global delegation state.
    ///
    /// `cargo test` runs tests on parallel threads. Every test below resets the
    /// shared delegation state and then relies on state persisting across
    /// successive `delegate_external_agent` calls: the approved call must not
    /// re-plan, and the replay must be answered from the idempotency cache.
    /// Without this lock, one test's reset (which presumably clears the shared
    /// caches) can wipe another test's entries mid-flight and make it flaky.
    ///
    /// A `tokio` mutex is used for two reasons: the guard can be held across
    /// `.await`, and a panicking test does not poison the lock for the others.
    static GLOBAL_STATE_LOCK: LazyLock<tokio::sync::Mutex<()>> =
        LazyLock::new(|| tokio::sync::Mutex::new(()));

    /// Acquires the global-state lock, then resets the delegation state.
    ///
    /// Bind the returned guard to a named variable (not `_`) so it stays alive
    /// for the whole test body.
    #[must_use = "hold the guard for the duration of the test"]
    async fn isolate_global_state() -> tokio::sync::MutexGuard<'static, ()> {
        let guard = GLOBAL_STATE_LOCK.lock().await;
        reset_external_agent_state();
        guard
    }

    /// Minimal blocking HTTP server that serves an agent card and answers
    /// the external-agent plan/dispatch JSON-RPC methods with canned responses.
    struct MockA2aServer {
        addr: String,
        state: Arc<MockState>,
        shutdown: Arc<AtomicBool>,
        handle: Option<thread::JoinHandle<()>>,
    }

    /// Canned responses plus call counters and a log of received JSON-RPC requests.
    struct MockState {
        card: Value,
        plan_response: Value,
        dispatch_response: Value,
        plan_count: AtomicUsize,
        dispatch_count: AtomicUsize,
        requests: Mutex<Vec<Value>>,
    }

    impl MockA2aServer {
        /// Binds an ephemeral local port and serves connections on a background thread.
        fn new(card_capabilities: Value, plan_response: Value, dispatch_response: Value) -> Self {
            let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock A2A server");
            let addr = listener.local_addr().expect("mock A2A addr");
            let url = format!("http://localhost:{}/rpc", addr.port());
            let card = serde_json::json!({
                "name": "mock external agent",
                "id": "mock-agent",
                "protocolVersion": "0.3.0",
                "url": url,
                "preferredTransport": "JSONRPC",
                "capabilities": card_capabilities,
            });
            let state = Arc::new(MockState {
                card,
                plan_response,
                dispatch_response,
                plan_count: AtomicUsize::new(0),
                dispatch_count: AtomicUsize::new(0),
                requests: Mutex::new(Vec::new()),
            });
            let shutdown = Arc::new(AtomicBool::new(false));
            let thread_state = Arc::clone(&state);
            let thread_shutdown = Arc::clone(&shutdown);
            let handle = thread::spawn(move || {
                while !thread_shutdown.load(Ordering::SeqCst) {
                    let Ok((mut stream, _)) = listener.accept() else {
                        break;
                    };
                    // A connection arriving after shutdown is the wake-up poke from Drop.
                    if thread_shutdown.load(Ordering::SeqCst) {
                        break;
                    }
                    handle_connection(&mut stream, &thread_state);
                }
            });
            Self {
                addr: format!("127.0.0.1:{}", addr.port()),
                state,
                shutdown,
                handle: Some(handle),
            }
        }

        /// Delegation target (host:port) pointing at this server.
        fn target(&self) -> String {
            self.addr.clone()
        }

        /// Number of plan requests received so far.
        fn plan_count(&self) -> usize {
            self.state.plan_count.load(Ordering::SeqCst)
        }

        /// Number of dispatch requests received so far.
        fn dispatch_count(&self) -> usize {
            self.state.dispatch_count.load(Ordering::SeqCst)
        }

        /// Snapshot of every JSON-RPC request body received so far.
        fn requests(&self) -> Vec<Value> {
            self.state
                .requests
                .lock()
                .expect("mock request log poisoned")
                .clone()
        }
    }

    impl Drop for MockA2aServer {
        fn drop(&mut self) {
            self.shutdown.store(true, Ordering::SeqCst);
            // Unblock the accept loop so the thread can observe the shutdown flag.
            let _ = TcpStream::connect(&self.addr);
            if let Some(handle) = self.handle.take() {
                let _ = handle.join();
            }
        }
    }

    /// Routes one HTTP request: agent card on GET, JSON-RPC on POST /rpc, else 404.
    fn handle_connection(stream: &mut TcpStream, state: &MockState) {
        let Some((method, path, body)) = read_request(stream) else {
            return;
        };
        if method == "GET" && path == "/.well-known/agent-card.json" {
            write_response(stream, 200, &state.card);
            return;
        }
        if method == "POST" && path == "/rpc" {
            let request: Value =
                serde_json::from_slice(&body).expect("parse mock JSON-RPC request");
            state
                .requests
                .lock()
                .expect("mock request log poisoned")
                .push(request.clone());
            let response = match request.get("method").and_then(Value::as_str) {
                Some(A2A_PLAN_METHOD) => {
                    state.plan_count.fetch_add(1, Ordering::SeqCst);
                    state.plan_response.clone()
                }
                Some(A2A_DISPATCH_METHOD) => {
                    state.dispatch_count.fetch_add(1, Ordering::SeqCst);
                    state.dispatch_response.clone()
                }
                Some(method) => serde_json::json!({
                    "error": {"code": -32601, "message": format!("unknown method {method}")},
                }),
                None => serde_json::json!({
                    "error": {"code": -32600, "message": "missing method"},
                }),
            };
            let body = if response.get("error").is_some() {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").cloned().unwrap_or(Value::Null),
                    "error": response["error"].clone(),
                })
            } else {
                serde_json::json!({
                    "jsonrpc": "2.0",
                    "id": request.get("id").cloned().unwrap_or(Value::Null),
                    "result": response,
                })
            };
            write_response(stream, 200, &body);
            return;
        }
        write_response(stream, 404, &serde_json::json!({"error": "not found"}));
    }

    /// Reads one HTTP/1.1 request and returns its method, path and body.
    ///
    /// Returns `None` for connections that do not start like a GET/POST, such as
    /// the empty wake-up connection made by `Drop`.
    fn read_request(stream: &mut TcpStream) -> Option<(String, String, Vec<u8>)> {
        let mut first = [0_u8; 1];
        stream.read_exact(&mut first).ok()?;
        if first[0] != b'G' && first[0] != b'P' {
            return None;
        }
        let mut header = vec![first[0]];
        while !header.ends_with(b"\r\n\r\n") {
            let mut byte = [0_u8; 1];
            stream.read_exact(&mut byte).ok()?;
            header.push(byte[0]);
        }
        let header_text = String::from_utf8_lossy(&header);
        let mut lines = header_text.lines();
        let request_line = lines.next()?;
        let mut parts = request_line.split_whitespace();
        let method = parts.next()?.to_string();
        let path = parts.next()?.to_string();
        let content_length = lines
            .filter_map(|line| line.split_once(':'))
            .find(|(name, _)| name.eq_ignore_ascii_case("content-length"))
            .and_then(|(_, value)| value.trim().parse::<usize>().ok())
            .unwrap_or(0);
        let mut body = vec![0_u8; content_length];
        if content_length > 0 {
            stream.read_exact(&mut body).ok()?;
        }
        Some((method, path, body))
    }

    /// Writes a JSON response with `connection: close`.
    fn write_response(stream: &mut TcpStream, status: u16, body: &Value) {
        let body = serde_json::to_vec(body).expect("serialize mock response");
        let status_text = if status == 200 { "OK" } else { "Not Found" };
        let header = format!(
            "HTTP/1.1 {status} {status_text}\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n",
            body.len()
        );
        stream
            .write_all(header.as_bytes())
            .expect("write response header");
        stream.write_all(&body).expect("write response body");
        stream.flush().expect("flush response");
    }

    /// Agent-card capabilities advertising every external-agent feature.
    fn full_capabilities() -> Value {
        serde_json::json!({
            "externalAgent": {
                "schema": EXTERNAL_AGENT_SCHEMA_ID,
                "pre_dispatch_checkpoint": true,
                "budget_cap": true,
                "idempotency": true,
                "reviewable_handoff": true,
                "dispatch": true,
                "operations": [A2A_PLAN_METHOD, A2A_DISPATCH_METHOD],
            },
        })
    }

    /// Capabilities without the pre-dispatch checkpoint (plan) support.
    fn dispatch_only_capabilities() -> Value {
        serde_json::json!({
            "externalAgent": {
                "schema": EXTERNAL_AGENT_SCHEMA_ID,
                "budget_cap": true,
                "idempotency": true,
                "reviewable_handoff": true,
                "dispatch": true,
                "operations": [A2A_DISPATCH_METHOD],
            },
        })
    }

    /// Builds a delegation request against `server` with the given approval and key.
    fn request(
        server: &MockA2aServer,
        approved: bool,
        key: &str,
    ) -> ExternalAgentDelegationRequest {
        ExternalAgentDelegationRequest {
            target: server.target(),
            allow_cleartext: true,
            task: "edit src/lib.rs".to_string(),
            budget: ExternalAgentBudget {
                max_usd: Some(0.05),
                max_tokens: Some(500),
                ..ExternalAgentBudget::default()
            },
            checkpoint: ExternalAgentCheckpointPolicy {
                approved,
                allow_local_fallback: false,
                ..ExternalAgentCheckpointPolicy::default()
            },
            idempotency_key: Some(key.to_string()),
            expected_scope: vec!["src/lib.rs".to_string()],
            ..ExternalAgentDelegationRequest::default()
        }
    }

    /// Cancellation channel; callers keep the sender alive for the call's duration.
    fn cancel_channel() -> (broadcast::Sender<()>, broadcast::Receiver<()>) {
        broadcast::channel(1)
    }

    #[tokio::test]
    async fn checkpoint_is_required_before_dispatch_then_replayed() {
        let _state = isolate_global_state().await;
        let server = MockA2aServer::new(
            full_capabilities(),
            serde_json::json!({
                "checkpoint_id": "chk_1",
                "plan": "Inspect src/lib.rs, edit only the requested area, and return a reviewable diff.",
                "expected_scope": ["src/lib.rs"],
            }),
            serde_json::json!({
                "status": "completed",
                "diff": "--- a/src/lib.rs\n+++ b/src/lib.rs\n@@\n-ok\n+better\n",
                "budget_used": {"usd": 0.02, "tokens": 120},
                "confidence": 0.91,
            }),
        );

        let (_tx, mut rx) = cancel_channel();
        let checkpoint = delegate_external_agent(request(&server, false, "idem-1"), &mut rx)
            .await
            .expect("checkpoint response");
        assert_eq!(checkpoint.status, "checkpoint_required");
        assert!(checkpoint.allow_cleartext);
        assert_eq!(server.plan_count(), 1);
        assert_eq!(server.dispatch_count(), 0);

        let (_tx, mut rx) = cancel_channel();
        let completed = delegate_external_agent(request(&server, true, "idem-1"), &mut rx)
            .await
            .expect("approved dispatch");
        assert_eq!(completed.status, "completed");
        assert_eq!(
            completed.budget_used.as_ref().and_then(|used| used.tokens),
            Some(120)
        );
        assert_eq!(server.plan_count(), 1);
        assert_eq!(server.dispatch_count(), 1);
        assert!(completed.handoff.is_some());
        assert!(completed
            .artifacts
            .iter()
            .any(|artifact| { artifact.get("kind").and_then(Value::as_str) == Some("diff") }));

        let (_tx, mut rx) = cancel_channel();
        let replayed = delegate_external_agent(request(&server, true, "idem-1"), &mut rx)
            .await
            .expect("idempotent replay");
        assert_eq!(replayed.status, "replayed");
        assert!(replayed.replayed);
        assert_eq!(server.dispatch_count(), 1);

        let dispatch_request = server
            .requests()
            .into_iter()
            .find(|request| {
                request.get("method").and_then(Value::as_str) == Some(A2A_DISPATCH_METHOD)
            })
            .expect("dispatch request recorded");
        assert_eq!(
            dispatch_request
                .pointer("/params/budget/max_usd")
                .and_then(Value::as_f64),
            Some(0.05)
        );
        assert_eq!(
            dispatch_request
                .pointer("/params/idempotency_key")
                .and_then(Value::as_str),
            Some("idem-1")
        );
    }

    #[tokio::test]
    async fn refuses_when_checkpoint_capability_is_missing() {
        let _state = isolate_global_state().await;
        let server = MockA2aServer::new(
            dispatch_only_capabilities(),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({"status": "completed"}),
        );

        let (_tx, mut rx) = cancel_channel();
        let envelope = delegate_external_agent(request(&server, false, "idem-2"), &mut rx)
            .await
            .expect("refusal envelope");
        assert_eq!(envelope.status, "refused");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("pre-dispatch checkpoint"));
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);
    }

    #[tokio::test]
    async fn refuses_before_plan_when_dispatch_capabilities_are_missing() {
        let _state = isolate_global_state().await;
        let server = MockA2aServer::new(
            serde_json::json!({
                "externalAgent": {
                    "pre_dispatch_checkpoint": true,
                    "dispatch": true,
                    "operations": [A2A_PLAN_METHOD, A2A_DISPATCH_METHOD],
                },
            }),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({"status": "completed"}),
        );

        let (_tx, mut rx) = cancel_channel();
        let envelope =
            delegate_external_agent(request(&server, false, "idem-missing-caps"), &mut rx)
                .await
                .expect("refusal envelope");
        assert_eq!(envelope.status, "refused");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("budget_cap"));
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);
    }

    #[tokio::test]
    async fn local_checkpoint_fallback_still_requires_explicit_approval() {
        let _state = isolate_global_state().await;
        let server = MockA2aServer::new(
            dispatch_only_capabilities(),
            serde_json::json!({"plan": "should not be called"}),
            serde_json::json!({
                "status": "completed",
                "budget_used": {"usd": 0.01},
                "files": ["src/lib.rs"],
            }),
        );
        let mut unapproved = request(&server, false, "idem-3");
        unapproved.checkpoint.allow_local_fallback = true;
        unapproved.checkpoint.local_plan = Some("Local plan approved by host.".to_string());

        let (_tx, mut rx) = cancel_channel();
        let checkpoint = delegate_external_agent(unapproved.clone(), &mut rx)
            .await
            .expect("local checkpoint");
        assert_eq!(checkpoint.status, "checkpoint_required");
        assert_eq!(
            checkpoint
                .checkpoint
                .as_ref()
                .map(|value| value.source.as_str()),
            Some("local_fallback")
        );
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 0);

        let mut approved = unapproved;
        approved.checkpoint.approved = true;
        let (_tx, mut rx) = cancel_channel();
        let completed = delegate_external_agent(approved, &mut rx)
            .await
            .expect("approved local fallback dispatch");
        assert_eq!(completed.status, "completed");
        assert_eq!(server.plan_count(), 0);
        assert_eq!(server.dispatch_count(), 1);
    }

    #[tokio::test]
    async fn over_budget_result_is_reviewable_but_marked() {
        let _state = isolate_global_state().await;
        let server = MockA2aServer::new(
            full_capabilities(),
            serde_json::json!({"checkpoint_id": "chk_budget", "plan": "Spend less than cap."}),
            serde_json::json!({
                "status": "completed",
                "diff": "diff --git a/file b/file\n",
                "budget_used": {"usd": 0.10, "tokens": 100},
            }),
        );
        let mut first = request(&server, false, "idem-4");
        first.budget.max_usd = Some(0.05);
        let (_tx, mut rx) = cancel_channel();
        delegate_external_agent(first, &mut rx)
            .await
            .expect("checkpoint");

        let mut approved = request(&server, true, "idem-4");
        approved.budget.max_usd = Some(0.05);
        let (_tx, mut rx) = cancel_channel();
        let envelope = delegate_external_agent(approved, &mut rx)
            .await
            .expect("budget-marked envelope");
        assert_eq!(envelope.status, "budget_exceeded");
        assert!(envelope
            .error
            .as_deref()
            .unwrap_or_default()
            .contains("budget"));
        assert!(envelope.handoff.is_some());
        assert_eq!(server.dispatch_count(), 1);
    }
}