Skip to main content

acp_runtime/
agent.rs

1// Copyright 2026 ACP Project
2// Licensed under the Apache License, Version 2.0
3// See LICENSE file for details.
4
5use std::collections::{HashMap, HashSet};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::Serialize;
11use serde_json::{Map, Value};
12use uuid::Uuid;
13
14use crate::amqp_transport::AmqpTransportClient;
15use crate::capabilities::AgentCapabilities;
16use crate::constants::{ACP_VERSION, DEFAULT_CRYPTO_SUITE};
17use crate::crypto;
18use crate::discovery::DiscoveryClient;
19use crate::errors::{AcpError, AcpResult, FailReason};
20use crate::http_security::{HttpSecurityPolicy, validate_http_client_policy, validate_http_url};
21use crate::identity::{
22    AgentIdentity, parse_agent_id, read_identity, verify_identity_document, write_identity,
23};
24use crate::key_provider::{
25    IdentityKeyMaterial, KeyProvider, KeyProviderInfo, LocalKeyProvider, VaultKeyProvider,
26};
27use crate::messages::{
28    AcpMessage, CompensateInstruction, DeliveryMode, DeliveryOutcome, DeliveryState, Envelope,
29    MessageClass, SendResult, build_ack_payload, build_fail_payload,
30};
31use crate::mqtt_transport::MqttTransportClient;
32use crate::options::AcpAgentOptions;
33use crate::transport::{TransportClient, TransportResponse};
34use crate::well_known::build_well_known_document;
35
36#[derive(Debug, Clone, Serialize)]
37pub struct InboundResult {
38    pub state: DeliveryState,
39    pub reason_code: Option<String>,
40    pub detail: Option<String>,
41    pub decrypted_payload: Option<Map<String, Value>>,
42    pub response_message: Option<Map<String, Value>>,
43}
44
45#[derive(Debug, Clone)]
46pub struct DecryptedMessage {
47    pub message: AcpMessage,
48    pub payload: Map<String, Value>,
49}
50
51pub type InboundHandlerFn =
52    dyn for<'a> Fn(&'a Map<String, Value>, &'a Envelope) -> Option<Map<String, Value>>;
53
54#[derive(Debug, Clone)]
55pub struct CapabilityRequestResult {
56    pub result: SendResult,
57    pub capabilities: Option<Map<String, Value>>,
58}
59
60#[derive(Debug, Clone)]
61pub struct AcpAgent {
62    pub identity: AgentIdentity,
63    pub identity_document: Map<String, Value>,
64    pub discovery: DiscoveryClient,
65    pub transport: TransportClient,
66    pub amqp_transport: Option<AmqpTransportClient>,
67    pub mqtt_transport: Option<MqttTransportClient>,
68    pub capabilities: AgentCapabilities,
69    pub storage_dir: PathBuf,
70    pub trust_profile: String,
71    pub relay_url: String,
72    pub default_delivery_mode: DeliveryMode,
73    pub key_provider_info: KeyProviderInfo,
74    delivery_states: HashMap<String, HashMap<String, String>>,
75    dedup: DedupStore,
76}
77
78impl AcpAgent {
79    pub fn load_or_create(agent_id: &str, options: Option<AcpAgentOptions>) -> AcpResult<Self> {
80        parse_agent_id(agent_id)?;
81        let options = options.unwrap_or_default();
82        std::fs::create_dir_all(&options.storage_dir)?;
83
84        let key_provider = resolve_key_provider(&options)?;
85        let key_provider_info = key_provider.describe();
86
87        let provider_tls_material = key_provider.load_tls_material(agent_id).ok();
88        let provider_ca_bundle = key_provider.load_ca_bundle(agent_id).ok().flatten();
89
90        let effective_ca_file = first_non_blank(&[
91            options.ca_file.clone(),
92            provider_tls_material
93                .as_ref()
94                .and_then(|m| m.ca_file.clone()),
95            provider_ca_bundle,
96        ]);
97        let effective_cert_file = first_non_blank(&[
98            options.cert_file.clone(),
99            provider_tls_material
100                .as_ref()
101                .and_then(|m| m.cert_file.clone()),
102        ]);
103        let effective_key_file = first_non_blank(&[
104            options.key_file.clone(),
105            provider_tls_material
106                .as_ref()
107                .and_then(|m| m.key_file.clone()),
108        ]);
109
110        let policy = HttpSecurityPolicy {
111            allow_insecure_http: options.allow_insecure_http,
112            allow_insecure_tls: options.allow_insecure_tls,
113            mtls_enabled: options.mtls_enabled,
114            ca_file: effective_ca_file.clone(),
115            cert_file: effective_cert_file.clone(),
116            key_file: effective_key_file.clone(),
117        };
118        validate_http_client_policy(&policy, "Agent HTTP security configuration")?;
119        if let Some(endpoint) = options.endpoint.as_deref() {
120            validate_http_url(
121                endpoint,
122                policy.allow_insecure_http,
123                policy.mtls_enabled,
124                "Agent direct endpoint configuration",
125            )?;
126        }
127        validate_http_url(
128            &options.relay_url,
129            policy.allow_insecure_http,
130            policy.mtls_enabled,
131            "Agent relay URL configuration",
132        )?;
133        for relay_hint in &options.relay_hints {
134            validate_http_url(
135                relay_hint,
136                policy.allow_insecure_http,
137                policy.mtls_enabled,
138                "Agent relay hint configuration",
139            )?;
140        }
141        for directory_hint in &options.enterprise_directory_hints {
142            validate_http_url(
143                directory_hint,
144                policy.allow_insecure_http,
145                policy.mtls_enabled,
146                "Agent enterprise directory hint configuration",
147            )?;
148        }
149
150        let local_amqp_service = build_local_amqp_service(agent_id, &options)?;
151        let local_mqtt_service = build_local_mqtt_service(agent_id, &options)?;
152
153        let provider_identity_keys = key_provider.load_identity_keys(agent_id).ok();
154        let external_key_provider = !matches_provider_local(&key_provider_info);
155
156        let (identity, identity_document, capabilities) =
157            match read_identity(&options.storage_dir, agent_id)? {
158                None => {
159                    let identity = if let Some(keys) = &provider_identity_keys {
160                        identity_from_provider(agent_id, keys)?
161                    } else if external_key_provider {
162                        return Err(AcpError::KeyProvider(
163                            "Unable to load identity keys from key provider".to_string(),
164                        ));
165                    } else {
166                        AgentIdentity::create(agent_id)?
167                    };
168                    let capabilities = options
169                        .capabilities
170                        .clone()
171                        .unwrap_or_else(|| AgentCapabilities::new(agent_id.to_string()));
172                    let mut identity_document = identity.build_identity_document(
173                        options.endpoint.as_deref(),
174                        &options.relay_hints,
175                        &options.trust_profile,
176                        Some(&capabilities.to_map()),
177                        365,
178                        local_amqp_service.as_ref(),
179                        local_mqtt_service.as_ref(),
180                        if options.mtls_enabled {
181                            Some("mtls")
182                        } else {
183                            None
184                        },
185                        if options.mtls_enabled {
186                            Some("mtls")
187                        } else {
188                            None
189                        },
190                    )?;
191                    apply_http_security_profile(&mut identity_document, options.mtls_enabled);
192                    write_identity(&options.storage_dir, &identity, &identity_document)?;
193                    (identity, identity_document, capabilities)
194                }
195                Some(bundle) => {
196                    let mut identity = bundle.identity;
197                    let mut identity_document = bundle.identity_document;
198                    if let Some(keys) = &provider_identity_keys {
199                        identity = apply_provider_keys(&identity, keys)?;
200                    } else if external_key_provider {
201                        return Err(AcpError::KeyProvider(
202                            "Unable to load identity keys from key provider".to_string(),
203                        ));
204                    }
205                    let valid_document = verify_identity_document(&identity_document);
206                    let capabilities = options.capabilities.clone().unwrap_or_else(|| {
207                        AgentCapabilities::from_map(
208                            identity_document
209                                .get("capabilities")
210                                .and_then(Value::as_object),
211                            agent_id,
212                        )
213                    });
214                    let should_rewrite = !valid_document
215                        || options.endpoint.is_some()
216                        || !options.relay_hints.is_empty()
217                        || options.capabilities.is_some()
218                        || local_amqp_service.is_some()
219                        || local_mqtt_service.is_some();
220                    if should_rewrite {
221                        let existing_service = identity_document
222                            .get("service")
223                            .and_then(Value::as_object)
224                            .cloned()
225                            .unwrap_or_default();
226                        let existing_endpoint = existing_service
227                            .get("direct_endpoint")
228                            .and_then(Value::as_str)
229                            .map(str::to_string);
230                        let existing_hints = existing_service
231                            .get("relay_hints")
232                            .and_then(Value::as_array)
233                            .map(|items| {
234                                items
235                                    .iter()
236                                    .filter_map(Value::as_str)
237                                    .map(str::to_string)
238                                    .collect::<Vec<_>>()
239                            })
240                            .unwrap_or_default();
241                        let existing_amqp_service = existing_service
242                            .get("amqp")
243                            .and_then(Value::as_object)
244                            .cloned();
245                        let existing_mqtt_service = existing_service
246                            .get("mqtt")
247                            .and_then(Value::as_object)
248                            .cloned();
249                        identity_document = identity.build_identity_document(
250                            options.endpoint.as_deref().or(existing_endpoint.as_deref()),
251                            if options.relay_hints.is_empty() {
252                                &existing_hints
253                            } else {
254                                &options.relay_hints
255                            },
256                            &options.trust_profile,
257                            Some(&capabilities.to_map()),
258                            365,
259                            local_amqp_service
260                                .as_ref()
261                                .or(existing_amqp_service.as_ref()),
262                            local_mqtt_service
263                                .as_ref()
264                                .or(existing_mqtt_service.as_ref()),
265                            if options.mtls_enabled {
266                                Some("mtls")
267                            } else {
268                                None
269                            },
270                            if options.mtls_enabled {
271                                Some("mtls")
272                            } else {
273                                None
274                            },
275                        )?;
276                        apply_http_security_profile(&mut identity_document, options.mtls_enabled);
277                        write_identity(&options.storage_dir, &identity, &identity_document)?;
278                    }
279                    (identity, identity_document, capabilities)
280                }
281            };
282
283        let mut effective_relay_hints = if !options.relay_hints.is_empty() {
284            options.relay_hints.clone()
285        } else {
286            identity_document
287                .get("service")
288                .and_then(Value::as_object)
289                .and_then(|service| service.get("relay_hints"))
290                .and_then(Value::as_array)
291                .map(|items| {
292                    items
293                        .iter()
294                        .filter_map(Value::as_str)
295                        .map(str::to_string)
296                        .collect::<Vec<_>>()
297                })
298                .unwrap_or_default()
299        };
300        if !effective_relay_hints.contains(&options.relay_url) {
301            effective_relay_hints.push(options.relay_url.clone());
302        }
303
304        let mut discovery = DiscoveryClient::new(
305            Some(options.storage_dir.join("discovery_cache.json")),
306            Some(options.discovery_scheme.clone()),
307            Some(effective_relay_hints),
308            Some(options.enterprise_directory_hints.clone()),
309            options.http_timeout_seconds,
310            policy.allow_insecure_http,
311            policy.allow_insecure_tls,
312            policy.ca_file.clone(),
313            policy.mtls_enabled,
314            policy.cert_file.clone(),
315            policy.key_file.clone(),
316        )?;
317        discovery.seed(identity_document.clone())?;
318
319        let amqp_transport = if let Some(transport) = options.amqp_transport.clone() {
320            Some(transport)
321        } else if let Some(broker_url) = options.amqp_broker_url.as_deref() {
322            Some(AmqpTransportClient::new(
323                broker_url.to_string(),
324                Some(options.amqp_exchange.clone()),
325                Some(options.amqp_exchange_type.clone()),
326                options.http_timeout_seconds,
327            )?)
328        } else {
329            None
330        };
331
332        let mqtt_transport = if let Some(transport) = options.mqtt_transport.clone() {
333            Some(transport)
334        } else if let Some(broker_url) = options.mqtt_broker_url.as_deref() {
335            Some(MqttTransportClient::new(
336                broker_url.to_string(),
337                Some(options.mqtt_qos),
338                Some(options.mqtt_topic_prefix.clone()),
339                options.http_timeout_seconds,
340                30,
341            )?)
342        } else {
343            None
344        };
345
346        Ok(Self {
347            identity,
348            identity_document,
349            discovery,
350            transport: TransportClient::new(options.http_timeout_seconds, &policy)?,
351            amqp_transport,
352            mqtt_transport,
353            capabilities,
354            storage_dir: options.storage_dir,
355            trust_profile: options.trust_profile,
356            relay_url: options.relay_url,
357            default_delivery_mode: options.default_delivery_mode,
358            key_provider_info,
359            delivery_states: HashMap::new(),
360            dedup: DedupStore::new(Duration::from_secs(3600)),
361        })
362    }
363
364    pub fn agent_id(&self) -> &str {
365        &self.identity.agent_id
366    }
367
368    pub fn get_delivery_states(&self) -> &HashMap<String, HashMap<String, String>> {
369        &self.delivery_states
370    }
371
372    pub fn build_well_known_document(
373        &self,
374        base_url: Option<&str>,
375        identity_document_url: Option<&str>,
376    ) -> AcpResult<Map<String, Value>> {
377        let resolved_base_url = base_url
378            .map(str::to_string)
379            .or_else(|| {
380                self.identity_document
381                    .get("service")
382                    .and_then(Value::as_object)
383                    .and_then(|service| service.get("direct_endpoint"))
384                    .and_then(Value::as_str)
385                    .and_then(base_url_from_endpoint)
386            })
387            .ok_or_else(|| {
388                AcpError::Validation(
389                    "Unable to build /.well-known/acp metadata without base_url or direct_endpoint"
390                        .to_string(),
391                )
392            })?;
393        build_well_known_document(
394            &self.identity_document,
395            &resolved_base_url,
396            identity_document_url,
397            Some(ACP_VERSION),
398        )
399    }
400
401    pub fn register_identity_document(
402        &mut self,
403        identity_document: Map<String, Value>,
404    ) -> AcpResult<()> {
405        self.discovery.register_identity_document(identity_document)
406    }
407
408    pub fn resolve_well_known(
409        &mut self,
410        base_url: &str,
411        expected_agent_id: Option<&str>,
412    ) -> AcpResult<Map<String, Value>> {
413        self.discovery
414            .resolve_well_known(base_url, expected_agent_id)
415    }
416
417    pub fn send(
418        &mut self,
419        recipients: Vec<String>,
420        payload: Map<String, Value>,
421        context: Option<String>,
422        message_class: MessageClass,
423        expires_in_seconds: i64,
424        correlation_id: Option<String>,
425        in_reply_to: Option<String>,
426        delivery_mode: Option<DeliveryMode>,
427    ) -> AcpResult<SendResult> {
428        if recipients.is_empty() {
429            return Err(AcpError::InvalidArgument(
430                "send() requires at least one recipient".to_string(),
431            ));
432        }
433        let mode = delivery_mode.unwrap_or(self.default_delivery_mode);
434        let operation_id = Uuid::new_v4().to_string();
435        let context_id = context.unwrap_or_else(|| operation_id.clone());
436
437        let resolved = self.resolve_recipients(&recipients, mode)?;
438        if resolved.deliverable.is_empty() {
439            let result = SendResult {
440                operation_id: operation_id.clone(),
441                message_id: Uuid::new_v4().to_string(),
442                message_ids: Vec::new(),
443                outcomes: resolved.preflight_outcomes.clone(),
444            };
445            self.sync_delivery_states(&operation_id, &result.outcomes);
446            return Ok(result);
447        }
448
449        let mut outcomes = resolved.preflight_outcomes;
450        let mut message_ids = Vec::new();
451        let direct_targets = resolved
452            .deliverable
453            .iter()
454            .filter(|target| target.channel == "direct")
455            .cloned()
456            .collect::<Vec<_>>();
457        let relay_targets = resolved
458            .deliverable
459            .iter()
460            .filter(|target| target.channel == "relay")
461            .cloned()
462            .collect::<Vec<_>>();
463        let amqp_targets = resolved
464            .deliverable
465            .iter()
466            .filter(|target| target.channel == "amqp")
467            .cloned()
468            .collect::<Vec<_>>();
469        let mqtt_targets = resolved
470            .deliverable
471            .iter()
472            .filter(|target| target.channel == "mqtt")
473            .cloned()
474            .collect::<Vec<_>>();
475
476        if !direct_targets.is_empty() {
477            let message = self.build_message(
478                direct_targets.iter().map(|r| r.recipient.clone()).collect(),
479                &payload,
480                &to_public_key_map(&direct_targets),
481                message_class,
482                &context_id,
483                Some(operation_id.clone()),
484                expires_in_seconds,
485                correlation_id.clone(),
486                in_reply_to.clone(),
487            )?;
488            message_ids.push(message.envelope.message_id.clone());
489            outcomes.extend(self.deliver_direct(&message, &direct_targets));
490        }
491        if !relay_targets.is_empty() {
492            let message = self.build_message(
493                relay_targets.iter().map(|r| r.recipient.clone()).collect(),
494                &payload,
495                &to_public_key_map(&relay_targets),
496                message_class,
497                &context_id,
498                Some(operation_id.clone()),
499                expires_in_seconds,
500                correlation_id.clone(),
501                in_reply_to.clone(),
502            )?;
503            message_ids.push(message.envelope.message_id.clone());
504            outcomes.extend(self.deliver_via_relay(&message, &relay_targets));
505        }
506        for target in &amqp_targets {
507            let recipient_public_keys =
508                HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
509            let message = self.build_message(
510                vec![target.recipient.clone()],
511                &payload,
512                &recipient_public_keys,
513                message_class,
514                &context_id,
515                Some(operation_id.clone()),
516                expires_in_seconds,
517                correlation_id.clone(),
518                in_reply_to.clone(),
519            )?;
520            message_ids.push(message.envelope.message_id.clone());
521            outcomes.push(self.deliver_via_amqp(&message, target));
522        }
523        for target in &mqtt_targets {
524            let recipient_public_keys =
525                HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
526            let message = self.build_message(
527                vec![target.recipient.clone()],
528                &payload,
529                &recipient_public_keys,
530                message_class,
531                &context_id,
532                Some(operation_id.clone()),
533                expires_in_seconds,
534                correlation_id.clone(),
535                in_reply_to.clone(),
536            )?;
537            message_ids.push(message.envelope.message_id.clone());
538            outcomes.push(self.deliver_via_mqtt(&message, target));
539        }
540        if message_ids.is_empty() {
541            message_ids.push(Uuid::new_v4().to_string());
542        }
543        let result = SendResult {
544            operation_id: operation_id.clone(),
545            message_id: message_ids[0].clone(),
546            message_ids,
547            outcomes,
548        };
549        self.sync_delivery_states(&operation_id, &result.outcomes);
550        Ok(result)
551    }
552
553    pub fn send_basic(
554        &mut self,
555        recipients: Vec<String>,
556        payload: Map<String, Value>,
557        context: Option<String>,
558    ) -> AcpResult<SendResult> {
559        self.send(
560            recipients,
561            payload,
562            context,
563            MessageClass::Send,
564            300,
565            None,
566            None,
567            Some(self.default_delivery_mode),
568        )
569    }
570
571    pub fn send_compensate(
572        &mut self,
573        recipients: Vec<String>,
574        original_operation_id: &str,
575        reason: &str,
576        actions: Vec<Map<String, Value>>,
577        context: Option<String>,
578        delivery_mode: Option<DeliveryMode>,
579    ) -> AcpResult<SendResult> {
580        let instruction = CompensateInstruction {
581            operation_id: original_operation_id.to_string(),
582            reason: reason.to_string(),
583            actions,
584        };
585        let mut payload = Map::new();
586        payload.insert(
587            "compensation".to_string(),
588            serde_json::to_value(instruction).unwrap_or(Value::Null),
589        );
590        self.send(
591            recipients,
592            payload,
593            context.or_else(|| Some(format!("compensate:{original_operation_id}"))),
594            MessageClass::Compensate,
595            300,
596            Some(original_operation_id.to_string()),
597            None,
598            delivery_mode,
599        )
600    }
601
602    pub fn decrypt_message_for_self(
603        &mut self,
604        raw_message: &Map<String, Value>,
605    ) -> AcpResult<DecryptedMessage> {
606        let message = AcpMessage::from_map(raw_message)?;
607        self.validate_envelope_for_inbound(&message.envelope)?;
608        if !message
609            .envelope
610            .recipients
611            .iter()
612            .any(|recipient| recipient == self.agent_id())
613        {
614            return Err(AcpError::Processing {
615                reason: FailReason::PolicyRejected,
616                detail: "Message is not addressed to this agent".to_string(),
617            });
618        }
619        let sender_doc =
620            self.resolve_sender_identity_document(raw_message, &message.envelope.sender)?;
621        let sender_signing_key = sender_doc
622            .get("keys")
623            .and_then(Value::as_object)
624            .and_then(|keys| keys.get("signing"))
625            .and_then(Value::as_object)
626            .and_then(|signing| signing.get("public_key"))
627            .and_then(Value::as_str)
628            .ok_or_else(|| AcpError::Processing {
629                reason: FailReason::InvalidSignature,
630                detail: "Sender signing public key missing".to_string(),
631            })?;
632        if !crypto::verify_protected_payload_signature(
633            &message.envelope,
634            &message.protected_payload,
635            sender_signing_key,
636        ) {
637            return Err(AcpError::Processing {
638                reason: FailReason::InvalidSignature,
639                detail: "Message signature verification failed".to_string(),
640            });
641        }
642        let payload = crypto::decrypt_for_recipient(
643            &message.envelope,
644            &message.protected_payload,
645            self.agent_id(),
646            &self.identity.encryption_private_key,
647        )?;
648        Ok(DecryptedMessage { message, payload })
649    }
650
651    pub fn receive(
652        &mut self,
653        raw_message: &Map<String, Value>,
654        handler: Option<&InboundHandlerFn>,
655    ) -> InboundResult {
656        let mut result = InboundResult {
657            state: DeliveryState::Failed,
658            reason_code: None,
659            detail: None,
660            decrypted_payload: None,
661            response_message: None,
662        };
663
664        let request_message = match AcpMessage::from_map(raw_message) {
665            Ok(message) => message,
666            Err(exc) => {
667                result.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
668                result.detail = Some(format!("Invalid ACP message structure: {exc}"));
669                return result;
670            }
671        };
672
673        let mut sender_identity_document: Option<Map<String, Value>> = None;
674
675        let processing_result = (|| -> AcpResult<()> {
676            self.validate_envelope_for_inbound(&request_message.envelope)?;
677            if !request_message
678                .envelope
679                .recipients
680                .iter()
681                .any(|recipient| recipient == self.agent_id())
682            {
683                return Err(AcpError::Processing {
684                    reason: FailReason::PolicyRejected,
685                    detail: format!("Recipient {} not in message recipients", self.agent_id()),
686                });
687            }
688
689            let sender_doc = self
690                .resolve_sender_identity_document(raw_message, &request_message.envelope.sender)?;
691            sender_identity_document = Some(sender_doc.clone());
692            let sender_signing_key = sender_doc
693                .get("keys")
694                .and_then(Value::as_object)
695                .and_then(|keys| keys.get("signing"))
696                .and_then(Value::as_object)
697                .and_then(|signing| signing.get("public_key"))
698                .and_then(Value::as_str)
699                .ok_or_else(|| AcpError::Processing {
700                    reason: FailReason::InvalidSignature,
701                    detail: "Sender signing key missing from identity document".to_string(),
702                })?;
703            if !crypto::verify_protected_payload_signature(
704                &request_message.envelope,
705                &request_message.protected_payload,
706                sender_signing_key,
707            ) {
708                return Err(AcpError::Processing {
709                    reason: FailReason::InvalidSignature,
710                    detail: "Signature verification failed".to_string(),
711                });
712            }
713
714            if self
715                .dedup
716                .is_duplicate(&request_message.envelope.message_id)
717            {
718                result.state = DeliveryState::Acknowledged;
719                result.detail = Some("Duplicate message acknowledged".to_string());
720                if !matches!(
721                    request_message.envelope.message_class,
722                    MessageClass::Ack | MessageClass::Fail
723                ) {
724                    let duplicate_ack = self.create_response_message(
725                        &sender_doc,
726                        &request_message.envelope,
727                        MessageClass::Ack,
728                        build_ack_payload(&request_message.envelope.message_id, "duplicate"),
729                    )?;
730                    result.response_message = Some(duplicate_ack.to_map()?);
731                }
732                return Ok(());
733            }
734
735            let decrypted_payload = crypto::decrypt_for_recipient(
736                &request_message.envelope,
737                &request_message.protected_payload,
738                self.agent_id(),
739                &self.identity.encryption_private_key,
740            )?;
741            result.decrypted_payload = Some(decrypted_payload.clone());
742
743            let response_message =
744                if request_message.envelope.message_class == MessageClass::Capabilities {
745                    Some(self.create_response_message(
746                        &sender_doc,
747                        &request_message.envelope,
748                        MessageClass::Capabilities,
749                        self.capabilities.to_map(),
750                    )?)
751                } else {
752                    let mut ack_payload =
753                        build_ack_payload(&request_message.envelope.message_id, "accepted");
754                    if let Some(handler) = handler
755                        && let Some(handler_payload) =
756                            handler(&decrypted_payload, &request_message.envelope)
757                        && !handler_payload.is_empty()
758                    {
759                        ack_payload.insert("handler".to_string(), Value::Object(handler_payload));
760                    }
761                    if matches!(
762                        request_message.envelope.message_class,
763                        MessageClass::Ack | MessageClass::Fail
764                    ) {
765                        None
766                    } else {
767                        Some(self.create_response_message(
768                            &sender_doc,
769                            &request_message.envelope,
770                            MessageClass::Ack,
771                            ack_payload,
772                        )?)
773                    }
774                };
775
776            self.dedup
777                .mark_processed(&request_message.envelope.message_id);
778            result.state = DeliveryState::Acknowledged;
779            result.response_message = response_message
780                .map(|message| message.to_map())
781                .transpose()?;
782            Ok(())
783        })();
784
785        if let Err(exc) = processing_result {
786            let (reason_code, detail) = match exc {
787                AcpError::Processing { reason, detail } => (reason.as_str().to_string(), detail),
788                _ => (
789                    FailReason::PolicyRejected.as_str().to_string(),
790                    exc.to_string(),
791                ),
792            };
793            result.reason_code = Some(reason_code.clone());
794            result.detail = Some(detail.clone());
795            if let Some(sender_doc) = sender_identity_document {
796                let fail_response = self.create_response_message(
797                    &sender_doc,
798                    &request_message.envelope,
799                    MessageClass::Fail,
800                    build_fail_payload(reason_code, detail, false),
801                );
802                result.response_message =
803                    fail_response.ok().and_then(|message| message.to_map().ok());
804            }
805        }
806        result
807    }
808
809    pub fn request_capabilities(&mut self, recipient: &str) -> AcpResult<CapabilityRequestResult> {
810        let mut payload = Map::new();
811        payload.insert(
812            "request".to_string(),
813            Value::String("capabilities".to_string()),
814        );
815        let result = self.send(
816            vec![recipient.to_string()],
817            payload,
818            Some(format!("capabilities:{}", Uuid::new_v4())),
819            MessageClass::Capabilities,
820            300,
821            None,
822            None,
823            Some(self.default_delivery_mode),
824        )?;
825        let mut response_payload = None;
826        for outcome in &result.outcomes {
827            let Some(response_message) = &outcome.response_message else {
828                continue;
829            };
830            if let Ok(decrypted) = self.decrypt_message_for_self(response_message)
831                && decrypted.message.envelope.message_class == MessageClass::Capabilities
832            {
833                response_payload = Some(decrypted.payload);
834                break;
835            }
836        }
837        Ok(CapabilityRequestResult {
838            result,
839            capabilities: response_payload,
840        })
841    }
842
843    pub fn consume_from_amqp<F>(
844        &mut self,
845        max_messages: usize,
846        handler: Option<F>,
847    ) -> AcpResult<usize>
848    where
849        F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
850    {
851        let amqp_transport = self.amqp_transport.clone().ok_or_else(|| {
852            AcpError::Transport("consume_from_amqp requires an AMQP-configured agent".to_string())
853        })?;
854        let amqp_service = self
855            .identity_document
856            .get("service")
857            .and_then(Value::as_object)
858            .and_then(|service| service.get("amqp"))
859            .and_then(Value::as_object)
860            .cloned()
861            .ok_or_else(|| {
862                AcpError::Transport(
863                    "Identity document is missing service.amqp configuration".to_string(),
864                )
865            })?;
866        let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
867        let agent_for_handler = Arc::clone(&agent_ptr);
868        let processed = amqp_transport.consume(
869            self.agent_id(),
870            move |raw_message| {
871                let Ok(mut guard) = agent_for_handler.lock() else {
872                    return false;
873                };
874                let inbound = guard.receive(
875                    raw_message,
876                    handler.as_ref().map(|inner| inner as &InboundHandlerFn),
877                );
878                if let Some(response_message) = inbound.response_message.clone() {
879                    if guard
880                        .publish_amqp_response_message(raw_message, &response_message)
881                        .is_err()
882                    {
883                        return false;
884                    }
885                }
886                matches!(
887                    inbound.state,
888                    DeliveryState::Acknowledged
889                        | DeliveryState::Failed
890                        | DeliveryState::Declined
891                        | DeliveryState::Expired
892                )
893            },
894            Some(&amqp_service),
895            max_messages,
896        )?;
897        if let Ok(guard) = agent_ptr.lock() {
898            *self = guard.clone();
899        }
900        Ok(processed)
901    }
902
903    pub fn consume_from_mqtt<F>(
904        &mut self,
905        max_messages: usize,
906        handler: Option<F>,
907    ) -> AcpResult<usize>
908    where
909        F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
910    {
911        let mqtt_transport = self.mqtt_transport.clone().ok_or_else(|| {
912            AcpError::Transport("consume_from_mqtt requires an MQTT-configured agent".to_string())
913        })?;
914        let mqtt_service = self
915            .identity_document
916            .get("service")
917            .and_then(Value::as_object)
918            .and_then(|service| service.get("mqtt"))
919            .and_then(Value::as_object)
920            .cloned()
921            .ok_or_else(|| {
922                AcpError::Transport(
923                    "Identity document is missing service.mqtt configuration".to_string(),
924                )
925            })?;
926        let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
927        let agent_for_handler = Arc::clone(&agent_ptr);
928        let processed = mqtt_transport.consume(
929            self.agent_id(),
930            move |raw_message| {
931                let Ok(mut guard) = agent_for_handler.lock() else {
932                    return false;
933                };
934                let inbound = guard.receive(
935                    raw_message,
936                    handler.as_ref().map(|inner| inner as &InboundHandlerFn),
937                );
938                if let Some(response_message) = inbound.response_message.clone() {
939                    if guard
940                        .publish_mqtt_response_message(raw_message, &response_message)
941                        .is_err()
942                    {
943                        return false;
944                    }
945                }
946                matches!(
947                    inbound.state,
948                    DeliveryState::Acknowledged
949                        | DeliveryState::Failed
950                        | DeliveryState::Declined
951                        | DeliveryState::Expired
952                )
953            },
954            Some(&mqtt_service),
955            max_messages,
956            Duration::from_secs(1),
957        )?;
958        if let Ok(guard) = agent_ptr.lock() {
959            *self = guard.clone();
960        }
961        Ok(processed)
962    }
963
964    fn resolve_recipients(
965        &mut self,
966        recipients: &[String],
967        mode: DeliveryMode,
968    ) -> AcpResult<ResolvedRecipients> {
969        let mut deliverable = Vec::new();
970        let mut preflight_outcomes = Vec::new();
971        for recipient in recipients {
972            let identity_doc = match self.discovery.resolve(recipient) {
973                Ok(identity_doc) => identity_doc,
974                Err(exc) => {
975                    preflight_outcomes.push(failed_outcome(
976                        recipient,
977                        FailReason::PolicyRejected.as_str(),
978                        &exc.to_string(),
979                    ));
980                    continue;
981                }
982            };
983            let remote_capabilities = AgentCapabilities::from_map(
984                identity_doc.get("capabilities").and_then(Value::as_object),
985                recipient,
986            );
987            let capability_match = self.capabilities.choose_compatible(&remote_capabilities);
988            if !capability_match.compatible {
989                preflight_outcomes.push(failed_outcome(
990                    recipient,
991                    reason_for_capability_mismatch(capability_match.reason.as_deref()).as_str(),
992                    capability_match
993                        .reason
994                        .as_deref()
995                        .unwrap_or("No compatible capabilities"),
996                ));
997                continue;
998            }
999            let choice = self.choose_delivery_channel(&remote_capabilities, &identity_doc, mode)?;
1000            let Some(channel) = choice.channel else {
1001                preflight_outcomes.push(failed_outcome(
1002                    recipient,
1003                    FailReason::PolicyRejected.as_str(),
1004                    choice
1005                        .detail
1006                        .as_deref()
1007                        .unwrap_or("Delivery channel unavailable"),
1008                ));
1009                continue;
1010            };
1011            let recipient_public_key = identity_doc
1012                .get("keys")
1013                .and_then(Value::as_object)
1014                .and_then(|keys| keys.get("encryption"))
1015                .and_then(Value::as_object)
1016                .and_then(|enc| enc.get("public_key"))
1017                .and_then(Value::as_str)
1018                .map(str::to_string);
1019            let Some(recipient_public_key) = recipient_public_key else {
1020                preflight_outcomes.push(failed_outcome(
1021                    recipient,
1022                    FailReason::PolicyRejected.as_str(),
1023                    "Recipient identity document missing encryption public key",
1024                ));
1025                continue;
1026            };
1027            deliverable.push(ResolvedRecipient {
1028                recipient: recipient.to_string(),
1029                public_key: recipient_public_key,
1030                channel,
1031                endpoint: choice.endpoint,
1032                amqp_service: choice.amqp_service,
1033                mqtt_service: choice.mqtt_service,
1034            });
1035        }
1036        Ok(ResolvedRecipients {
1037            deliverable,
1038            preflight_outcomes,
1039        })
1040    }
1041
1042    fn choose_delivery_channel(
1043        &self,
1044        remote_capabilities: &AgentCapabilities,
1045        identity_document: &Map<String, Value>,
1046        mode: DeliveryMode,
1047    ) -> AcpResult<ChannelChoice> {
1048        let remote_transports: HashSet<String> = remote_capabilities
1049            .transports
1050            .iter()
1051            .map(|t| t.to_lowercase())
1052            .collect();
1053        let shared = self
1054            .capabilities
1055            .transports
1056            .iter()
1057            .filter(|transport| remote_transports.contains(&transport.to_lowercase()))
1058            .map(|transport| transport.to_lowercase())
1059            .collect::<Vec<_>>();
1060
1061        let service = identity_document
1062            .get("service")
1063            .and_then(Value::as_object)
1064            .cloned()
1065            .unwrap_or_default();
1066        let direct_endpoint = service
1067            .get("direct_endpoint")
1068            .and_then(Value::as_str)
1069            .map(str::to_string);
1070        let has_direct = direct_endpoint
1071            .as_deref()
1072            .map(|endpoint| !endpoint.trim().is_empty())
1073            .unwrap_or(false);
1074        let direct_available = has_direct
1075            && shared
1076                .iter()
1077                .any(|transport| matches!(transport.as_str(), "https" | "http" | "direct"));
1078        let relay_available =
1079            !self.relay_url.trim().is_empty() && shared.iter().any(|t| t == "relay");
1080        let amqp_service = service.get("amqp").and_then(Value::as_object).cloned();
1081        let amqp_available = shared.iter().any(|t| t == "amqp")
1082            && amqp_service
1083                .as_ref()
1084                .and_then(|service| service.get("broker_url"))
1085                .and_then(Value::as_str)
1086                .map(str::trim)
1087                .filter(|v| !v.is_empty())
1088                .is_some();
1089        let mqtt_service = service.get("mqtt").and_then(Value::as_object).cloned();
1090        let mqtt_available = shared.iter().any(|t| t == "mqtt")
1091            && mqtt_service
1092                .as_ref()
1093                .and_then(|service| service.get("broker_url"))
1094                .and_then(Value::as_str)
1095                .map(str::trim)
1096                .filter(|v| !v.is_empty())
1097                .is_some()
1098            && mqtt_service
1099                .as_ref()
1100                .and_then(|service| service.get("topic"))
1101                .and_then(Value::as_str)
1102                .map(str::trim)
1103                .filter(|v| !v.is_empty())
1104                .is_some();
1105
1106        match mode {
1107            DeliveryMode::Direct => {
1108                if direct_available {
1109                    return Ok(ChannelChoice::new(
1110                        Some("direct".to_string()),
1111                        direct_endpoint,
1112                        None,
1113                        None,
1114                        None,
1115                    ));
1116                }
1117                return Ok(ChannelChoice::new(
1118                    None,
1119                    None,
1120                    None,
1121                    None,
1122                    Some("No compatible direct transport and endpoint available".to_string()),
1123                ));
1124            }
1125            DeliveryMode::Relay => {
1126                if relay_available {
1127                    return Ok(ChannelChoice::new(
1128                        Some("relay".to_string()),
1129                        None,
1130                        None,
1131                        None,
1132                        None,
1133                    ));
1134                }
1135                return Ok(ChannelChoice::new(
1136                    None,
1137                    None,
1138                    None,
1139                    None,
1140                    Some("No compatible relay transport available".to_string()),
1141                ));
1142            }
1143            DeliveryMode::Amqp => {
1144                if amqp_available {
1145                    return Ok(ChannelChoice::new(
1146                        Some("amqp".to_string()),
1147                        None,
1148                        amqp_service,
1149                        None,
1150                        None,
1151                    ));
1152                }
1153                return Ok(ChannelChoice::new(
1154                    None,
1155                    None,
1156                    None,
1157                    None,
1158                    Some("No compatible AMQP transport available".to_string()),
1159                ));
1160            }
1161            DeliveryMode::Mqtt => {
1162                if mqtt_available {
1163                    return Ok(ChannelChoice::new(
1164                        Some("mqtt".to_string()),
1165                        None,
1166                        None,
1167                        mqtt_service,
1168                        None,
1169                    ));
1170                }
1171                return Ok(ChannelChoice::new(
1172                    None,
1173                    None,
1174                    None,
1175                    None,
1176                    Some("No compatible MQTT transport available".to_string()),
1177                ));
1178            }
1179            DeliveryMode::Auto => {}
1180        }
1181
1182        if direct_available {
1183            return Ok(ChannelChoice::new(
1184                Some("direct".to_string()),
1185                direct_endpoint,
1186                None,
1187                None,
1188                None,
1189            ));
1190        }
1191        if relay_available {
1192            return Ok(ChannelChoice::new(
1193                Some("relay".to_string()),
1194                None,
1195                None,
1196                None,
1197                None,
1198            ));
1199        }
1200        if amqp_available {
1201            return Ok(ChannelChoice::new(
1202                Some("amqp".to_string()),
1203                None,
1204                amqp_service,
1205                None,
1206                None,
1207            ));
1208        }
1209        if mqtt_available {
1210            return Ok(ChannelChoice::new(
1211                Some("mqtt".to_string()),
1212                None,
1213                None,
1214                mqtt_service,
1215                None,
1216            ));
1217        }
1218        if has_direct {
1219            return Ok(ChannelChoice::new(
1220                None,
1221                None,
1222                None,
1223                None,
1224                Some(
1225                    "No compatible transport implementation available for this recipient"
1226                        .to_string(),
1227                ),
1228            ));
1229        }
1230        if amqp_service.is_some() {
1231            return Ok(ChannelChoice::new(
1232                None,
1233                None,
1234                None,
1235                None,
1236                Some(
1237                    "AMQP transport is advertised but not compatible with sender capabilities"
1238                        .to_string(),
1239                ),
1240            ));
1241        }
1242        if mqtt_service.is_some() {
1243            return Ok(ChannelChoice::new(
1244                None,
1245                None,
1246                None,
1247                None,
1248                Some(
1249                    "MQTT transport is advertised but not compatible with sender capabilities"
1250                        .to_string(),
1251                ),
1252            ));
1253        }
1254        Ok(ChannelChoice::new(
1255            None,
1256            None,
1257            None,
1258            None,
1259            Some(
1260                "Recipient identity document is missing direct_endpoint/amqp/mqtt and no relay fallback is compatible"
1261                    .to_string(),
1262            ),
1263        ))
1264    }
1265
1266    #[allow(clippy::too_many_arguments)]
1267    fn build_message(
1268        &self,
1269        recipients: Vec<String>,
1270        payload: &Map<String, Value>,
1271        recipient_public_keys: &HashMap<String, String>,
1272        message_class: MessageClass,
1273        context_id: &str,
1274        operation_id: Option<String>,
1275        expires_in_seconds: i64,
1276        correlation_id: Option<String>,
1277        in_reply_to: Option<String>,
1278    ) -> AcpResult<AcpMessage> {
1279        let envelope = Envelope::build(
1280            self.agent_id().to_string(),
1281            recipients,
1282            message_class,
1283            context_id.to_string(),
1284            expires_in_seconds,
1285            operation_id,
1286            correlation_id,
1287            in_reply_to,
1288            Some(DEFAULT_CRYPTO_SUITE.to_string()),
1289        )?;
1290        let mut protected_payload =
1291            crypto::encrypt_for_recipients(payload, &envelope, recipient_public_keys)?;
1292        crypto::sign_protected_payload(
1293            &envelope,
1294            &mut protected_payload,
1295            &self.identity.signing_private_key,
1296            &self.identity.signing_kid,
1297        )?;
1298        Ok(AcpMessage {
1299            envelope,
1300            protected_payload,
1301            sender_identity_document: Some(self.identity_document.clone()),
1302        })
1303    }
1304
1305    fn deliver_direct(
1306        &self,
1307        message: &AcpMessage,
1308        targets: &[ResolvedRecipient],
1309    ) -> Vec<DeliveryOutcome> {
1310        let mut outcomes = Vec::new();
1311        let message_map = match message.to_map() {
1312            Ok(map) => map,
1313            Err(exc) => {
1314                for target in targets {
1315                    outcomes.push(failed_outcome(
1316                        &target.recipient,
1317                        FailReason::PolicyRejected.as_str(),
1318                        &format!("Message serialization failure: {exc}"),
1319                    ));
1320                }
1321                return outcomes;
1322            }
1323        };
1324        for target in targets {
1325            let Some(endpoint) = target.endpoint.as_deref() else {
1326                outcomes.push(failed_outcome(
1327                    &target.recipient,
1328                    FailReason::PolicyRejected.as_str(),
1329                    "Missing direct endpoint for direct delivery",
1330                ));
1331                continue;
1332            };
1333            match self.transport.post_json(endpoint, &message_map) {
1334                Ok(response) => {
1335                    outcomes.push(outcome_from_http_response(&target.recipient, &response))
1336                }
1337                Err(exc) => outcomes.push(failed_outcome(
1338                    &target.recipient,
1339                    FailReason::PolicyRejected.as_str(),
1340                    &format!("Direct transport failure: {exc}"),
1341                )),
1342            }
1343        }
1344        outcomes
1345    }
1346
1347    fn deliver_via_relay(
1348        &self,
1349        message: &AcpMessage,
1350        targets: &[ResolvedRecipient],
1351    ) -> Vec<DeliveryOutcome> {
1352        let mut outcomes = Vec::new();
1353        match self.transport.send_to_relay(&self.relay_url, message) {
1354            Ok(relay_response) => {
1355                let mut delivered = HashSet::new();
1356                if let Some(raw_outcomes) = relay_response.get("outcomes").and_then(Value::as_array)
1357                {
1358                    for item in raw_outcomes {
1359                        if let Ok(outcome) = serde_json::from_value::<DeliveryOutcome>(item.clone())
1360                        {
1361                            delivered.insert(outcome.recipient.clone());
1362                            outcomes.push(outcome);
1363                        }
1364                    }
1365                }
1366                for target in targets {
1367                    if !delivered.contains(&target.recipient) {
1368                        outcomes.push(failed_outcome(
1369                            &target.recipient,
1370                            FailReason::PolicyRejected.as_str(),
1371                            "Relay did not return an outcome for recipient",
1372                        ));
1373                    }
1374                }
1375            }
1376            Err(exc) => {
1377                for target in targets {
1378                    outcomes.push(failed_outcome(
1379                        &target.recipient,
1380                        FailReason::PolicyRejected.as_str(),
1381                        &format!("Relay transport failure: {exc}"),
1382                    ));
1383                }
1384            }
1385        }
1386        outcomes
1387    }
1388
1389    fn deliver_via_amqp(
1390        &self,
1391        message: &AcpMessage,
1392        target: &ResolvedRecipient,
1393    ) -> DeliveryOutcome {
1394        let mut outcome = DeliveryOutcome {
1395            recipient: target.recipient.clone(),
1396            state: DeliveryState::Pending,
1397            status_code: None,
1398            response_class: None,
1399            reason_code: None,
1400            detail: None,
1401            response_message: None,
1402        };
1403        let message_map = match message.to_map() {
1404            Ok(message_map) => message_map,
1405            Err(exc) => {
1406                outcome.state = DeliveryState::Failed;
1407                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1408                outcome.detail = Some(format!("AMQP message serialization failure: {exc}"));
1409                return outcome;
1410            }
1411        };
1412        let result = (|| -> AcpResult<()> {
1413            let client = if let Some(amqp_transport) = &self.amqp_transport {
1414                amqp_transport.clone()
1415            } else {
1416                let broker_url = target
1417                    .amqp_service
1418                    .as_ref()
1419                    .and_then(|service| service.get("broker_url"))
1420                    .and_then(Value::as_str)
1421                    .ok_or_else(|| {
1422                        AcpError::Transport(
1423                            "AMQP delivery selected but sender is not configured with an AMQP broker"
1424                                .to_string(),
1425                        )
1426                    })?;
1427                AmqpTransportClient::new(
1428                    broker_url.to_string(),
1429                    target
1430                        .amqp_service
1431                        .as_ref()
1432                        .and_then(|service| service.get("exchange"))
1433                        .and_then(Value::as_str)
1434                        .map(str::to_string),
1435                    None,
1436                    10,
1437                )?
1438            };
1439            client.publish(
1440                &message_map,
1441                &target.recipient,
1442                target.amqp_service.as_ref(),
1443            )
1444        })();
1445        match result {
1446            Ok(()) => outcome.state = DeliveryState::Delivered,
1447            Err(exc) => {
1448                outcome.state = DeliveryState::Failed;
1449                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1450                outcome.detail = Some(format!("AMQP transport failure: {exc}"));
1451            }
1452        }
1453        outcome
1454    }
1455
1456    fn deliver_via_mqtt(
1457        &self,
1458        message: &AcpMessage,
1459        target: &ResolvedRecipient,
1460    ) -> DeliveryOutcome {
1461        let mut outcome = DeliveryOutcome {
1462            recipient: target.recipient.clone(),
1463            state: DeliveryState::Pending,
1464            status_code: None,
1465            response_class: None,
1466            reason_code: None,
1467            detail: None,
1468            response_message: None,
1469        };
1470        let message_map = match message.to_map() {
1471            Ok(message_map) => message_map,
1472            Err(exc) => {
1473                outcome.state = DeliveryState::Failed;
1474                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1475                outcome.detail = Some(format!("MQTT message serialization failure: {exc}"));
1476                return outcome;
1477            }
1478        };
1479        let result = (|| -> AcpResult<()> {
1480            let client = if let Some(mqtt_transport) = &self.mqtt_transport {
1481                mqtt_transport.clone()
1482            } else {
1483                let service = target.mqtt_service.as_ref().ok_or_else(|| {
1484                    AcpError::Transport(
1485                        "MQTT delivery selected but sender is not configured with an MQTT broker"
1486                            .to_string(),
1487                    )
1488                })?;
1489                let broker_url = service
1490                    .get("broker_url")
1491                    .and_then(Value::as_str)
1492                    .ok_or_else(|| {
1493                        AcpError::Transport(
1494                            "MQTT delivery selected but sender is not configured with an MQTT broker"
1495                                .to_string(),
1496                        )
1497                    })?;
1498                MqttTransportClient::new(
1499                    broker_url.to_string(),
1500                    service.get("qos").and_then(|v| v.as_u64()).map(|v| v as u8),
1501                    None,
1502                    10,
1503                    30,
1504                )?
1505            };
1506            client.publish(
1507                &message_map,
1508                &target.recipient,
1509                target.mqtt_service.as_ref(),
1510            )
1511        })();
1512        match result {
1513            Ok(()) => outcome.state = DeliveryState::Delivered,
1514            Err(exc) => {
1515                outcome.state = DeliveryState::Failed;
1516                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1517                outcome.detail = Some(format!("MQTT transport failure: {exc}"));
1518            }
1519        }
1520        outcome
1521    }
1522
1523    fn publish_amqp_response_message(
1524        &mut self,
1525        raw_message: &Map<String, Value>,
1526        response_message: &Map<String, Value>,
1527    ) -> AcpResult<()> {
1528        let amqp_transport = self
1529            .amqp_transport
1530            .clone()
1531            .ok_or_else(|| AcpError::Transport("AMQP transport is not configured".to_string()))?;
1532        let sender_id = raw_message
1533            .get("envelope")
1534            .and_then(Value::as_object)
1535            .and_then(|envelope| envelope.get("sender"))
1536            .and_then(Value::as_str)
1537            .ok_or_else(|| {
1538                AcpError::Transport(
1539                    "Inbound message sender is missing for AMQP response routing".to_string(),
1540                )
1541            })?;
1542        let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1543        let sender_amqp_service = sender_identity
1544            .get("service")
1545            .and_then(Value::as_object)
1546            .and_then(|service| service.get("amqp"))
1547            .and_then(Value::as_object)
1548            .cloned()
1549            .ok_or_else(|| {
1550                AcpError::Transport(format!(
1551                    "Sender {sender_id} does not advertise service.amqp for AMQP response delivery"
1552                ))
1553            })?;
1554        amqp_transport.publish(response_message, sender_id, Some(&sender_amqp_service))
1555    }
1556
1557    fn publish_mqtt_response_message(
1558        &mut self,
1559        raw_message: &Map<String, Value>,
1560        response_message: &Map<String, Value>,
1561    ) -> AcpResult<()> {
1562        let mqtt_transport = self
1563            .mqtt_transport
1564            .clone()
1565            .ok_or_else(|| AcpError::Transport("MQTT transport is not configured".to_string()))?;
1566        let sender_id = raw_message
1567            .get("envelope")
1568            .and_then(Value::as_object)
1569            .and_then(|envelope| envelope.get("sender"))
1570            .and_then(Value::as_str)
1571            .ok_or_else(|| {
1572                AcpError::Transport(
1573                    "Inbound message sender is missing for MQTT response routing".to_string(),
1574                )
1575            })?;
1576        let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1577        let sender_mqtt_service = sender_identity
1578            .get("service")
1579            .and_then(Value::as_object)
1580            .and_then(|service| service.get("mqtt"))
1581            .and_then(Value::as_object)
1582            .cloned()
1583            .ok_or_else(|| {
1584                AcpError::Transport(format!(
1585                    "Sender {sender_id} does not advertise service.mqtt for MQTT response delivery"
1586                ))
1587            })?;
1588        mqtt_transport.publish(response_message, sender_id, Some(&sender_mqtt_service))
1589    }
1590
1591    fn resolve_sender_identity_document(
1592        &mut self,
1593        raw_message: &Map<String, Value>,
1594        sender_id: &str,
1595    ) -> AcpResult<Map<String, Value>> {
1596        if let Some(embedded) = raw_message
1597            .get("sender_identity_document")
1598            .and_then(Value::as_object)
1599            .cloned()
1600            && embedded.get("agent_id").and_then(Value::as_str) == Some(sender_id)
1601            && verify_identity_document(&embedded)
1602        {
1603            return Ok(embedded);
1604        }
1605        self.discovery.resolve(sender_id)
1606    }
1607
1608    fn validate_envelope_for_inbound(&self, envelope: &Envelope) -> AcpResult<()> {
1609        if envelope.acp_version != ACP_VERSION {
1610            return Err(AcpError::Processing {
1611                reason: FailReason::UnsupportedVersion,
1612                detail: format!("Unsupported ACP version: {}", envelope.acp_version),
1613            });
1614        }
1615        if envelope.crypto_suite != DEFAULT_CRYPTO_SUITE {
1616            return Err(AcpError::Processing {
1617                reason: FailReason::UnsupportedCryptoSuite,
1618                detail: format!("Unsupported crypto suite: {}", envelope.crypto_suite),
1619            });
1620        }
1621        if envelope.is_expired() {
1622            return Err(AcpError::Processing {
1623                reason: FailReason::ExpiredMessage,
1624                detail: "Message is expired".to_string(),
1625            });
1626        }
1627        Ok(())
1628    }
1629
1630    fn create_response_message(
1631        &self,
1632        sender_identity_document: &Map<String, Value>,
1633        request_envelope: &Envelope,
1634        response_class: MessageClass,
1635        response_payload: Map<String, Value>,
1636    ) -> AcpResult<AcpMessage> {
1637        let sender_id = &request_envelope.sender;
1638        let sender_encryption_public_key = sender_identity_document
1639            .get("keys")
1640            .and_then(Value::as_object)
1641            .and_then(|keys| keys.get("encryption"))
1642            .and_then(Value::as_object)
1643            .and_then(|encryption| encryption.get("public_key"))
1644            .and_then(Value::as_str)
1645            .ok_or_else(|| AcpError::Processing {
1646                reason: FailReason::PolicyRejected,
1647                detail: "Sender identity document missing encryption key".to_string(),
1648            })?;
1649        self.build_message(
1650            vec![sender_id.to_string()],
1651            &response_payload,
1652            &HashMap::from([(
1653                sender_id.to_string(),
1654                sender_encryption_public_key.to_string(),
1655            )]),
1656            response_class,
1657            &request_envelope.context_id,
1658            Some(request_envelope.operation_id.clone()),
1659            300,
1660            request_envelope
1661                .correlation_id
1662                .clone()
1663                .or_else(|| Some(request_envelope.operation_id.clone())),
1664            Some(request_envelope.message_id.clone()),
1665        )
1666    }
1667
1668    fn sync_delivery_states(&mut self, operation_id: &str, outcomes: &[DeliveryOutcome]) {
1669        let mut states = HashMap::new();
1670        for outcome in outcomes {
1671            states.insert(
1672                outcome.recipient.clone(),
1673                format!("{:?}", outcome.state).to_uppercase(),
1674            );
1675        }
1676        self.delivery_states
1677            .insert(operation_id.to_string(), states);
1678    }
1679}
1680
1681#[derive(Debug, Clone)]
1682struct ResolvedRecipient {
1683    recipient: String,
1684    public_key: String,
1685    channel: String,
1686    endpoint: Option<String>,
1687    amqp_service: Option<Map<String, Value>>,
1688    mqtt_service: Option<Map<String, Value>>,
1689}
1690
1691#[derive(Debug, Clone)]
1692struct ResolvedRecipients {
1693    deliverable: Vec<ResolvedRecipient>,
1694    preflight_outcomes: Vec<DeliveryOutcome>,
1695}
1696
1697#[derive(Debug, Clone)]
1698struct ChannelChoice {
1699    channel: Option<String>,
1700    endpoint: Option<String>,
1701    amqp_service: Option<Map<String, Value>>,
1702    mqtt_service: Option<Map<String, Value>>,
1703    detail: Option<String>,
1704}
1705
1706impl ChannelChoice {
1707    fn new(
1708        channel: Option<String>,
1709        endpoint: Option<String>,
1710        amqp_service: Option<Map<String, Value>>,
1711        mqtt_service: Option<Map<String, Value>>,
1712        detail: Option<String>,
1713    ) -> Self {
1714        Self {
1715            channel,
1716            endpoint,
1717            amqp_service,
1718            mqtt_service,
1719            detail,
1720        }
1721    }
1722}
1723
1724#[derive(Debug, Clone)]
1725struct DedupStore {
1726    ttl: Duration,
1727    processed: HashMap<String, Instant>,
1728}
1729
1730impl DedupStore {
1731    fn new(ttl: Duration) -> Self {
1732        Self {
1733            ttl,
1734            processed: HashMap::new(),
1735        }
1736    }
1737
1738    fn is_duplicate(&mut self, message_id: &str) -> bool {
1739        self.cleanup_expired();
1740        self.processed.contains_key(message_id)
1741    }
1742
1743    fn mark_processed(&mut self, message_id: &str) {
1744        self.processed
1745            .insert(message_id.to_string(), Instant::now());
1746    }
1747
1748    fn cleanup_expired(&mut self) {
1749        let ttl = self.ttl;
1750        self.processed
1751            .retain(|_, timestamp| timestamp.elapsed() < ttl);
1752    }
1753}
1754
1755fn outcome_from_http_response(recipient: &str, response: &TransportResponse) -> DeliveryOutcome {
1756    let mut response_class = None;
1757    let mut response_message = None;
1758    let mut reason_code = None;
1759    let mut detail = None;
1760
1761    if let Some(body) = &response.body {
1762        if let Some(raw_response_message) = body.get("response_message").and_then(Value::as_object)
1763        {
1764            response_message = Some(raw_response_message.clone());
1765            response_class = raw_response_message
1766                .get("envelope")
1767                .and_then(Value::as_object)
1768                .and_then(|envelope| envelope.get("message_class"))
1769                .and_then(Value::as_str)
1770                .and_then(parse_message_class);
1771        }
1772        reason_code = body
1773            .get("reason_code")
1774            .and_then(Value::as_str)
1775            .map(str::to_string);
1776        detail = body
1777            .get("detail")
1778            .and_then(Value::as_str)
1779            .map(str::to_string);
1780    }
1781    if detail.is_none() && response.status_code >= 400 {
1782        detail = Some(format!("Recipient HTTP {}", response.status_code));
1783    }
1784
1785    DeliveryOutcome {
1786        recipient: recipient.to_string(),
1787        state: delivery_state_from_response(
1788            response.status_code,
1789            response_class,
1790            reason_code.as_deref(),
1791        ),
1792        status_code: Some(response.status_code),
1793        response_class,
1794        reason_code,
1795        detail,
1796        response_message,
1797    }
1798}
1799
1800fn delivery_state_from_response(
1801    status_code: u16,
1802    response_class: Option<MessageClass>,
1803    reason_code: Option<&str>,
1804) -> DeliveryState {
1805    if (200..300).contains(&status_code) {
1806        if response_class == Some(MessageClass::Fail) {
1807            if reason_code == Some(FailReason::ExpiredMessage.as_str()) {
1808                return DeliveryState::Expired;
1809            }
1810            if reason_code == Some(FailReason::PolicyRejected.as_str()) {
1811                return DeliveryState::Declined;
1812            }
1813            return DeliveryState::Failed;
1814        }
1815        if matches!(
1816            response_class,
1817            Some(MessageClass::Ack | MessageClass::Capabilities)
1818        ) {
1819            return DeliveryState::Acknowledged;
1820        }
1821        return DeliveryState::Delivered;
1822    }
1823    if status_code == 410 {
1824        return DeliveryState::Expired;
1825    }
1826    if [401, 403, 409, 422].contains(&status_code) {
1827        return DeliveryState::Declined;
1828    }
1829    DeliveryState::Failed
1830}
1831
1832fn parse_message_class(value: &str) -> Option<MessageClass> {
1833    match value {
1834        "SEND" => Some(MessageClass::Send),
1835        "ACK" => Some(MessageClass::Ack),
1836        "FAIL" => Some(MessageClass::Fail),
1837        "CAPABILITIES" => Some(MessageClass::Capabilities),
1838        "COMPENSATE" => Some(MessageClass::Compensate),
1839        _ => None,
1840    }
1841}
1842
1843fn failed_outcome(recipient: &str, reason_code: &str, detail: &str) -> DeliveryOutcome {
1844    DeliveryOutcome {
1845        recipient: recipient.to_string(),
1846        state: DeliveryState::Failed,
1847        status_code: None,
1848        response_class: None,
1849        reason_code: Some(reason_code.to_string()),
1850        detail: Some(detail.to_string()),
1851        response_message: None,
1852    }
1853}
1854
1855fn to_public_key_map(targets: &[ResolvedRecipient]) -> HashMap<String, String> {
1856    targets
1857        .iter()
1858        .map(|target| (target.recipient.clone(), target.public_key.clone()))
1859        .collect()
1860}
1861
1862fn reason_for_capability_mismatch(reason: Option<&str>) -> FailReason {
1863    let normalized = reason.unwrap_or_default().to_lowercase();
1864    if normalized.contains("protocol") {
1865        return FailReason::UnsupportedVersion;
1866    }
1867    if normalized.contains("crypto") {
1868        return FailReason::UnsupportedCryptoSuite;
1869    }
1870    if normalized.contains("profile") {
1871        return FailReason::UnsupportedProfile;
1872    }
1873    FailReason::PolicyRejected
1874}
1875
1876fn build_local_amqp_service(
1877    agent_id: &str,
1878    options: &AcpAgentOptions,
1879) -> AcpResult<Option<Map<String, Value>>> {
1880    let Some(broker_url) = options.amqp_broker_url.as_deref() else {
1881        return Ok(None);
1882    };
1883    Ok(Some(AmqpTransportClient::build_service_hint(
1884        agent_id,
1885        broker_url,
1886        Some(&options.amqp_exchange),
1887    )?))
1888}
1889
1890fn build_local_mqtt_service(
1891    agent_id: &str,
1892    options: &AcpAgentOptions,
1893) -> AcpResult<Option<Map<String, Value>>> {
1894    let Some(broker_url) = options.mqtt_broker_url.as_deref() else {
1895        return Ok(None);
1896    };
1897    Ok(Some(MqttTransportClient::build_service_hint(
1898        agent_id,
1899        broker_url,
1900        None,
1901        Some(options.mqtt_qos),
1902        Some(&options.mqtt_topic_prefix),
1903    )?))
1904}
1905
1906fn apply_http_security_profile(identity_document: &mut Map<String, Value>, mtls_enabled: bool) {
1907    if !mtls_enabled {
1908        return;
1909    }
1910    let mut service = identity_document
1911        .get("service")
1912        .and_then(Value::as_object)
1913        .cloned()
1914        .unwrap_or_default();
1915    let direct_endpoint = service
1916        .get("direct_endpoint")
1917        .and_then(Value::as_str)
1918        .map(str::to_string);
1919    let relay_hints = service
1920        .get("relay_hints")
1921        .and_then(Value::as_array)
1922        .map(|items| {
1923            items
1924                .iter()
1925                .filter_map(Value::as_str)
1926                .map(str::to_string)
1927                .collect::<Vec<_>>()
1928        })
1929        .unwrap_or_default();
1930    if let Some(endpoint) = direct_endpoint
1931        && !endpoint.trim().is_empty()
1932    {
1933        service.insert(
1934            "http".to_string(),
1935            serde_json::json!({
1936                "endpoint": endpoint,
1937                "security_profile": "mtls",
1938            }),
1939        );
1940    }
1941    if let Some(relay_endpoint) = relay_hints.first()
1942        && !relay_endpoint.trim().is_empty()
1943    {
1944        service.insert(
1945            "relay".to_string(),
1946            serde_json::json!({
1947                "endpoint": relay_endpoint,
1948                "security_profile": "mtls",
1949            }),
1950        );
1951    }
1952    identity_document.insert("service".to_string(), Value::Object(service));
1953}
1954
1955fn resolve_key_provider(options: &AcpAgentOptions) -> AcpResult<Box<dyn KeyProvider>> {
1956    if let Some(provider) = options.key_provider_instance.clone() {
1957        return Ok(Box::new(ArcKeyProvider(provider)));
1958    }
1959    let provider_name = normalize_key_provider_name(&options.key_provider);
1960    match provider_name.as_str() {
1961        "local" => Ok(Box::new(LocalKeyProvider::new(
1962            options.storage_dir.clone(),
1963            options.cert_file.clone(),
1964            options.key_file.clone(),
1965            options.ca_file.clone(),
1966        ))),
1967        "vault" => {
1968            let vault_url = options.vault_url.clone().ok_or_else(|| {
1969                AcpError::Validation("vault_url is required when key_provider=vault".to_string())
1970            })?;
1971            let vault_path = options.vault_path.clone().ok_or_else(|| {
1972                AcpError::Validation("vault_path is required when key_provider=vault".to_string())
1973            })?;
1974            Ok(Box::new(VaultKeyProvider::new(
1975                vault_url,
1976                vault_path,
1977                Some(options.vault_token_env.clone()),
1978                options.vault_token.clone(),
1979                options.http_timeout_seconds,
1980                options.ca_file.clone(),
1981                options.allow_insecure_tls,
1982                options.allow_insecure_http,
1983            )?))
1984        }
1985        _ => Err(AcpError::Validation(format!(
1986            "Unsupported key_provider: {}",
1987            options.key_provider
1988        ))),
1989    }
1990}
1991
1992fn normalize_key_provider_name(value: &str) -> String {
1993    let normalized = value.trim().to_lowercase();
1994    if normalized.is_empty() {
1995        "local".to_string()
1996    } else {
1997        normalized
1998    }
1999}
2000
2001fn matches_provider_local(key_provider_info: &KeyProviderInfo) -> bool {
2002    key_provider_info
2003        .get("provider")
2004        .and_then(Value::as_str)
2005        .map(|provider| provider == "local")
2006        .unwrap_or(false)
2007}
2008
2009fn identity_from_provider(agent_id: &str, keys: &IdentityKeyMaterial) -> AcpResult<AgentIdentity> {
2010    let mut missing = Vec::new();
2011    if keys
2012        .signing_public_key
2013        .as_deref()
2014        .map(str::trim)
2015        .filter(|v| !v.is_empty())
2016        .is_none()
2017    {
2018        missing.push("signing_public_key");
2019    }
2020    if keys
2021        .encryption_public_key
2022        .as_deref()
2023        .map(str::trim)
2024        .filter(|v| !v.is_empty())
2025        .is_none()
2026    {
2027        missing.push("encryption_public_key");
2028    }
2029    if keys
2030        .signing_kid
2031        .as_deref()
2032        .map(str::trim)
2033        .filter(|v| !v.is_empty())
2034        .is_none()
2035    {
2036        missing.push("signing_kid");
2037    }
2038    if keys
2039        .encryption_kid
2040        .as_deref()
2041        .map(str::trim)
2042        .filter(|v| !v.is_empty())
2043        .is_none()
2044    {
2045        missing.push("encryption_kid");
2046    }
2047    if !missing.is_empty() {
2048        return Err(AcpError::Validation(format!(
2049            "External key provider requires identity public metadata for first-time bootstrap: {}",
2050            missing.join(", ")
2051        )));
2052    }
2053    Ok(AgentIdentity {
2054        agent_id: agent_id.to_string(),
2055        signing_private_key: keys.signing_private_key.clone(),
2056        signing_public_key: keys.signing_public_key.clone().unwrap_or_default(),
2057        encryption_private_key: keys.encryption_private_key.clone(),
2058        encryption_public_key: keys.encryption_public_key.clone().unwrap_or_default(),
2059        signing_kid: keys.signing_kid.clone().unwrap_or_default(),
2060        encryption_kid: keys.encryption_kid.clone().unwrap_or_default(),
2061    })
2062}
2063
2064fn apply_provider_keys(
2065    identity: &AgentIdentity,
2066    keys: &IdentityKeyMaterial,
2067) -> AcpResult<AgentIdentity> {
2068    if let Some(signing_public_key) = keys.signing_public_key.as_deref()
2069        && signing_public_key != identity.signing_public_key
2070    {
2071        return Err(AcpError::Validation(
2072            "Key provider signing_public_key does not match local identity metadata".to_string(),
2073        ));
2074    }
2075    if let Some(encryption_public_key) = keys.encryption_public_key.as_deref()
2076        && encryption_public_key != identity.encryption_public_key
2077    {
2078        return Err(AcpError::Validation(
2079            "Key provider encryption_public_key does not match local identity metadata".to_string(),
2080        ));
2081    }
2082    if let Some(signing_kid) = keys.signing_kid.as_deref()
2083        && signing_kid != identity.signing_kid
2084    {
2085        return Err(AcpError::Validation(
2086            "Key provider signing_kid does not match local identity metadata".to_string(),
2087        ));
2088    }
2089    if let Some(encryption_kid) = keys.encryption_kid.as_deref()
2090        && encryption_kid != identity.encryption_kid
2091    {
2092        return Err(AcpError::Validation(
2093            "Key provider encryption_kid does not match local identity metadata".to_string(),
2094        ));
2095    }
2096    Ok(AgentIdentity {
2097        agent_id: identity.agent_id.clone(),
2098        signing_private_key: keys.signing_private_key.clone(),
2099        signing_public_key: identity.signing_public_key.clone(),
2100        encryption_private_key: keys.encryption_private_key.clone(),
2101        encryption_public_key: identity.encryption_public_key.clone(),
2102        signing_kid: identity.signing_kid.clone(),
2103        encryption_kid: identity.encryption_kid.clone(),
2104    })
2105}
2106
2107fn base_url_from_endpoint(endpoint: &str) -> Option<String> {
2108    let endpoint = endpoint.trim();
2109    if endpoint.is_empty() {
2110        return None;
2111    }
2112    let parsed = url::Url::parse(endpoint).ok()?;
2113    let scheme = parsed.scheme();
2114    let host = parsed.host_str()?;
2115    let authority = if let Some(port) = parsed.port() {
2116        format!("{host}:{port}")
2117    } else {
2118        host.to_string()
2119    };
2120    Some(format!("{scheme}://{authority}"))
2121}
2122
2123fn first_non_blank(values: &[Option<String>]) -> Option<String> {
2124    values
2125        .iter()
2126        .flatten()
2127        .map(|value| value.trim())
2128        .find(|value| !value.is_empty())
2129        .map(str::to_string)
2130}
2131
2132#[derive(Clone)]
2133struct ArcKeyProvider(Arc<dyn KeyProvider>);
2134
2135impl KeyProvider for ArcKeyProvider {
2136    fn load_identity_keys(&self, agent_id: &str) -> AcpResult<IdentityKeyMaterial> {
2137        self.0.load_identity_keys(agent_id)
2138    }
2139
2140    fn load_tls_material(&self, agent_id: &str) -> AcpResult<crate::key_provider::TlsMaterial> {
2141        self.0.load_tls_material(agent_id)
2142    }
2143
2144    fn load_ca_bundle(&self, agent_id: &str) -> AcpResult<Option<String>> {
2145        self.0.load_ca_bundle(agent_id)
2146    }
2147
2148    fn describe(&self) -> KeyProviderInfo {
2149        self.0.describe()
2150    }
2151}