Skip to main content

acp_runtime/
agent.rs

1// Copyright 2026 ACP Project
2// Licensed under the Apache License, Version 2.0
3// See LICENSE file for details.
4
5use std::collections::{HashMap, HashSet};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::Serialize;
11use serde_json::{Map, Value};
12use uuid::Uuid;
13
14use crate::amqp_transport::AmqpTransportClient;
15use crate::capabilities::AgentCapabilities;
16use crate::constants::{ACP_VERSION, DEFAULT_CRYPTO_SUITE};
17use crate::crypto;
18use crate::discovery::DiscoveryClient;
19use crate::errors::{AcpError, AcpResult, FailReason};
20use crate::http_security::{HttpSecurityPolicy, validate_http_client_policy, validate_http_url};
21use crate::identity::{
22    AgentIdentity, parse_agent_id, read_identity, verify_identity_document, write_identity,
23};
24use crate::key_provider::{
25    IdentityKeyMaterial, KeyProvider, KeyProviderInfo, LocalKeyProvider, VaultKeyProvider,
26};
27use crate::messages::{
28    AcpMessage, CompensateInstruction, DeliveryMode, DeliveryOutcome, DeliveryState, Envelope,
29    MessageClass, SendResult, build_ack_payload, build_fail_payload,
30};
31use crate::mqtt_transport::MqttTransportClient;
32use crate::options::AcpAgentOptions;
33use crate::transport::{TransportClient, TransportResponse};
34use crate::well_known::build_well_known_document;
35
36#[derive(Debug, Clone, Serialize)]
37pub struct InboundResult {
38    pub state: DeliveryState,
39    pub reason_code: Option<String>,
40    pub detail: Option<String>,
41    pub decrypted_payload: Option<Map<String, Value>>,
42    pub response_message: Option<Map<String, Value>>,
43}
44
45#[derive(Debug, Clone)]
46pub struct DecryptedMessage {
47    pub message: AcpMessage,
48    pub payload: Map<String, Value>,
49}
50
51pub type InboundHandlerFn =
52    dyn for<'a> Fn(&'a Map<String, Value>, &'a Envelope) -> Option<Map<String, Value>>;
53
54#[derive(Debug, Clone)]
55pub struct CapabilityRequestResult {
56    pub result: SendResult,
57    pub capabilities: Option<Map<String, Value>>,
58}
59
60#[derive(Debug, Clone)]
61pub struct AcpAgent {
62    pub identity: AgentIdentity,
63    pub identity_document: Map<String, Value>,
64    pub discovery: DiscoveryClient,
65    pub transport: TransportClient,
66    pub amqp_transport: Option<AmqpTransportClient>,
67    pub mqtt_transport: Option<MqttTransportClient>,
68    pub capabilities: AgentCapabilities,
69    pub storage_dir: PathBuf,
70    pub trust_profile: String,
71    pub relay_url: String,
72    pub default_delivery_mode: DeliveryMode,
73    pub key_provider_info: KeyProviderInfo,
74    delivery_states: HashMap<String, HashMap<String, String>>,
75    dedup: DedupStore,
76}
77
78impl AcpAgent {
79    pub fn load_or_create(agent_id: &str, options: Option<AcpAgentOptions>) -> AcpResult<Self> {
80        parse_agent_id(agent_id)?;
81        let options = options.unwrap_or_default();
82        std::fs::create_dir_all(&options.storage_dir)?;
83
84        let key_provider = resolve_key_provider(&options)?;
85        let key_provider_info = key_provider.describe();
86
87        let provider_tls_material = key_provider.load_tls_material(agent_id).ok();
88        let provider_ca_bundle = key_provider.load_ca_bundle(agent_id).ok().flatten();
89
90        let effective_ca_file = first_non_blank(&[
91            options.ca_file.clone(),
92            provider_tls_material
93                .as_ref()
94                .and_then(|m| m.ca_file.clone()),
95            provider_ca_bundle,
96        ]);
97        let effective_cert_file = first_non_blank(&[
98            options.cert_file.clone(),
99            provider_tls_material
100                .as_ref()
101                .and_then(|m| m.cert_file.clone()),
102        ]);
103        let effective_key_file = first_non_blank(&[
104            options.key_file.clone(),
105            provider_tls_material
106                .as_ref()
107                .and_then(|m| m.key_file.clone()),
108        ]);
109
110        let policy = HttpSecurityPolicy {
111            allow_insecure_http: options.allow_insecure_http,
112            allow_insecure_tls: options.allow_insecure_tls,
113            mtls_enabled: options.mtls_enabled,
114            ca_file: effective_ca_file.clone(),
115            cert_file: effective_cert_file.clone(),
116            key_file: effective_key_file.clone(),
117        };
118        validate_http_client_policy(&policy, "Agent HTTP security configuration")?;
119        if let Some(endpoint) = options.endpoint.as_deref() {
120            validate_http_url(
121                endpoint,
122                policy.allow_insecure_http,
123                policy.mtls_enabled,
124                "Agent direct endpoint configuration",
125            )?;
126        }
127        validate_http_url(
128            &options.relay_url,
129            policy.allow_insecure_http,
130            policy.mtls_enabled,
131            "Agent relay URL configuration",
132        )?;
133        for relay_hint in &options.relay_hints {
134            validate_http_url(
135                relay_hint,
136                policy.allow_insecure_http,
137                policy.mtls_enabled,
138                "Agent relay hint configuration",
139            )?;
140        }
141        for directory_hint in &options.enterprise_directory_hints {
142            validate_http_url(
143                directory_hint,
144                policy.allow_insecure_http,
145                policy.mtls_enabled,
146                "Agent enterprise directory hint configuration",
147            )?;
148        }
149
150        let local_amqp_service = build_local_amqp_service(agent_id, &options)?;
151        let local_mqtt_service = build_local_mqtt_service(agent_id, &options)?;
152
153        let provider_identity_keys = key_provider.load_identity_keys(agent_id).ok();
154        let external_key_provider = !matches_provider_local(&key_provider_info);
155
156        let (identity, identity_document, capabilities) =
157            match read_identity(&options.storage_dir, agent_id)? {
158                None => {
159                    let identity = if let Some(keys) = &provider_identity_keys {
160                        identity_from_provider(agent_id, keys)?
161                    } else if external_key_provider {
162                        return Err(AcpError::KeyProvider(
163                            "Unable to load identity keys from key provider".to_string(),
164                        ));
165                    } else {
166                        AgentIdentity::create(agent_id)?
167                    };
168                    let capabilities = options
169                        .capabilities
170                        .clone()
171                        .unwrap_or_else(|| AgentCapabilities::new(agent_id.to_string()));
172                    let mut identity_document = identity.build_identity_document(
173                        options.endpoint.as_deref(),
174                        &options.relay_hints,
175                        &options.trust_profile,
176                        Some(&capabilities.to_map()),
177                        365,
178                        local_amqp_service.as_ref(),
179                        local_mqtt_service.as_ref(),
180                        if options.mtls_enabled {
181                            Some("mtls")
182                        } else {
183                            None
184                        },
185                        if options.mtls_enabled {
186                            Some("mtls")
187                        } else {
188                            None
189                        },
190                    )?;
191                    apply_http_security_profile(&mut identity_document, options.mtls_enabled);
192                    write_identity(&options.storage_dir, &identity, &identity_document)?;
193                    (identity, identity_document, capabilities)
194                }
195                Some(bundle) => {
196                    let mut identity = bundle.identity;
197                    let mut identity_document = bundle.identity_document;
198                    if let Some(keys) = &provider_identity_keys {
199                        identity = apply_provider_keys(&identity, keys)?;
200                    } else if external_key_provider {
201                        return Err(AcpError::KeyProvider(
202                            "Unable to load identity keys from key provider".to_string(),
203                        ));
204                    }
205                    let valid_document = verify_identity_document(&identity_document);
206                    let capabilities = options.capabilities.clone().unwrap_or_else(|| {
207                        AgentCapabilities::from_map(
208                            identity_document
209                                .get("capabilities")
210                                .and_then(Value::as_object),
211                            agent_id,
212                        )
213                    });
214                    let should_rewrite = !valid_document
215                        || options.endpoint.is_some()
216                        || !options.relay_hints.is_empty()
217                        || options.capabilities.is_some()
218                        || local_amqp_service.is_some()
219                        || local_mqtt_service.is_some();
220                    if should_rewrite {
221                        let existing_service = identity_document
222                            .get("service")
223                            .and_then(Value::as_object)
224                            .cloned()
225                            .unwrap_or_default();
226                        let existing_endpoint = existing_service
227                            .get("direct_endpoint")
228                            .and_then(Value::as_str)
229                            .map(str::to_string);
230                        let existing_hints = existing_service
231                            .get("relay_hints")
232                            .and_then(Value::as_array)
233                            .map(|items| {
234                                items
235                                    .iter()
236                                    .filter_map(Value::as_str)
237                                    .map(str::to_string)
238                                    .collect::<Vec<_>>()
239                            })
240                            .unwrap_or_default();
241                        let existing_amqp_service = existing_service
242                            .get("amqp")
243                            .and_then(Value::as_object)
244                            .cloned();
245                        let existing_mqtt_service = existing_service
246                            .get("mqtt")
247                            .and_then(Value::as_object)
248                            .cloned();
249                        identity_document = identity.build_identity_document(
250                            options.endpoint.as_deref().or(existing_endpoint.as_deref()),
251                            if options.relay_hints.is_empty() {
252                                &existing_hints
253                            } else {
254                                &options.relay_hints
255                            },
256                            &options.trust_profile,
257                            Some(&capabilities.to_map()),
258                            365,
259                            local_amqp_service
260                                .as_ref()
261                                .or(existing_amqp_service.as_ref()),
262                            local_mqtt_service
263                                .as_ref()
264                                .or(existing_mqtt_service.as_ref()),
265                            if options.mtls_enabled {
266                                Some("mtls")
267                            } else {
268                                None
269                            },
270                            if options.mtls_enabled {
271                                Some("mtls")
272                            } else {
273                                None
274                            },
275                        )?;
276                        apply_http_security_profile(&mut identity_document, options.mtls_enabled);
277                        write_identity(&options.storage_dir, &identity, &identity_document)?;
278                    }
279                    (identity, identity_document, capabilities)
280                }
281            };
282
283        let mut effective_relay_hints = if !options.relay_hints.is_empty() {
284            options.relay_hints.clone()
285        } else {
286            identity_document
287                .get("service")
288                .and_then(Value::as_object)
289                .and_then(|service| service.get("relay_hints"))
290                .and_then(Value::as_array)
291                .map(|items| {
292                    items
293                        .iter()
294                        .filter_map(Value::as_str)
295                        .map(str::to_string)
296                        .collect::<Vec<_>>()
297                })
298                .unwrap_or_default()
299        };
300        if !effective_relay_hints.contains(&options.relay_url) {
301            effective_relay_hints.push(options.relay_url.clone());
302        }
303
304        let mut discovery = DiscoveryClient::new(
305            Some(options.storage_dir.join("discovery_cache.json")),
306            Some(options.discovery_scheme.clone()),
307            Some(effective_relay_hints),
308            Some(options.enterprise_directory_hints.clone()),
309            options.http_timeout_seconds,
310            policy.allow_insecure_http,
311            policy.allow_insecure_tls,
312            policy.ca_file.clone(),
313            policy.mtls_enabled,
314            policy.cert_file.clone(),
315            policy.key_file.clone(),
316        )?;
317        discovery.seed(identity_document.clone())?;
318
319        let amqp_transport = if let Some(transport) = options.amqp_transport.clone() {
320            Some(transport)
321        } else if let Some(broker_url) = options.amqp_broker_url.as_deref() {
322            Some(AmqpTransportClient::new(
323                broker_url.to_string(),
324                Some(options.amqp_exchange.clone()),
325                Some(options.amqp_exchange_type.clone()),
326                options.http_timeout_seconds,
327            )?)
328        } else {
329            None
330        };
331
332        let mqtt_transport = if let Some(transport) = options.mqtt_transport.clone() {
333            Some(transport)
334        } else if let Some(broker_url) = options.mqtt_broker_url.as_deref() {
335            Some(MqttTransportClient::new(
336                broker_url.to_string(),
337                Some(options.mqtt_qos),
338                Some(options.mqtt_topic_prefix.clone()),
339                options.http_timeout_seconds,
340                30,
341            )?)
342        } else {
343            None
344        };
345
346        Ok(Self {
347            identity,
348            identity_document,
349            discovery,
350            transport: TransportClient::new(options.http_timeout_seconds, &policy)?,
351            amqp_transport,
352            mqtt_transport,
353            capabilities,
354            storage_dir: options.storage_dir,
355            trust_profile: options.trust_profile,
356            relay_url: options.relay_url,
357            default_delivery_mode: options.default_delivery_mode,
358            key_provider_info,
359            delivery_states: HashMap::new(),
360            dedup: DedupStore::new(Duration::from_secs(3600)),
361        })
362    }
363
364    pub fn agent_id(&self) -> &str {
365        &self.identity.agent_id
366    }
367
368    pub fn get_delivery_states(&self) -> &HashMap<String, HashMap<String, String>> {
369        &self.delivery_states
370    }
371
372    pub fn build_well_known_document(
373        &self,
374        base_url: Option<&str>,
375        identity_document_url: Option<&str>,
376    ) -> AcpResult<Map<String, Value>> {
377        let resolved_base_url = base_url
378            .map(str::to_string)
379            .or_else(|| {
380                self.identity_document
381                    .get("service")
382                    .and_then(Value::as_object)
383                    .and_then(|service| service.get("direct_endpoint"))
384                    .and_then(Value::as_str)
385                    .and_then(base_url_from_endpoint)
386            })
387            .ok_or_else(|| {
388                AcpError::Validation(
389                    "Unable to build /.well-known/acp metadata without base_url or direct_endpoint"
390                        .to_string(),
391                )
392            })?;
393        build_well_known_document(
394            &self.identity_document,
395            &resolved_base_url,
396            identity_document_url,
397            Some(ACP_VERSION),
398        )
399    }
400
401    pub fn register_identity_document(
402        &mut self,
403        identity_document: Map<String, Value>,
404    ) -> AcpResult<()> {
405        self.discovery.register_identity_document(identity_document)
406    }
407
408    pub fn resolve_well_known(
409        &mut self,
410        base_url: &str,
411        expected_agent_id: Option<&str>,
412    ) -> AcpResult<Map<String, Value>> {
413        self.discovery
414            .resolve_well_known(base_url, expected_agent_id)
415    }
416
417    pub fn send(
418        &mut self,
419        recipients: Vec<String>,
420        payload: Map<String, Value>,
421        context: Option<String>,
422        message_class: MessageClass,
423        expires_in_seconds: i64,
424        correlation_id: Option<String>,
425        in_reply_to: Option<String>,
426        delivery_mode: Option<DeliveryMode>,
427    ) -> AcpResult<SendResult> {
428        if recipients.is_empty() {
429            return Err(AcpError::InvalidArgument(
430                "send() requires at least one recipient".to_string(),
431            ));
432        }
433        let mode = delivery_mode.unwrap_or(self.default_delivery_mode);
434        let operation_id = Uuid::new_v4().to_string();
435        let context_id = context.unwrap_or_else(|| operation_id.clone());
436
437        let resolved = self.resolve_recipients(&recipients, mode)?;
438        if resolved.deliverable.is_empty() {
439            let result = SendResult {
440                operation_id: operation_id.clone(),
441                message_id: Uuid::new_v4().to_string(),
442                message_ids: Vec::new(),
443                outcomes: resolved.preflight_outcomes.clone(),
444            };
445            self.sync_delivery_states(&operation_id, &result.outcomes);
446            return Ok(result);
447        }
448
449        let mut outcomes = resolved.preflight_outcomes;
450        let mut message_ids = Vec::new();
451        let direct_targets = resolved
452            .deliverable
453            .iter()
454            .filter(|target| target.channel == "direct")
455            .cloned()
456            .collect::<Vec<_>>();
457        let relay_targets = resolved
458            .deliverable
459            .iter()
460            .filter(|target| target.channel == "relay")
461            .cloned()
462            .collect::<Vec<_>>();
463        let amqp_targets = resolved
464            .deliverable
465            .iter()
466            .filter(|target| target.channel == "amqp")
467            .cloned()
468            .collect::<Vec<_>>();
469        let mqtt_targets = resolved
470            .deliverable
471            .iter()
472            .filter(|target| target.channel == "mqtt")
473            .cloned()
474            .collect::<Vec<_>>();
475
476        if !direct_targets.is_empty() {
477            let message = self.build_message(
478                direct_targets.iter().map(|r| r.recipient.clone()).collect(),
479                &payload,
480                &to_public_key_map(&direct_targets),
481                message_class,
482                &context_id,
483                Some(operation_id.clone()),
484                expires_in_seconds,
485                correlation_id.clone(),
486                in_reply_to.clone(),
487            )?;
488            message_ids.push(message.envelope.message_id.clone());
489            outcomes.extend(self.deliver_direct(&message, &direct_targets));
490        }
491        if !relay_targets.is_empty() {
492            let message = self.build_message(
493                relay_targets.iter().map(|r| r.recipient.clone()).collect(),
494                &payload,
495                &to_public_key_map(&relay_targets),
496                message_class,
497                &context_id,
498                Some(operation_id.clone()),
499                expires_in_seconds,
500                correlation_id.clone(),
501                in_reply_to.clone(),
502            )?;
503            message_ids.push(message.envelope.message_id.clone());
504            outcomes.extend(self.deliver_via_relay(&message, &relay_targets));
505        }
506        for target in &amqp_targets {
507            let recipient_public_keys =
508                HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
509            let message = self.build_message(
510                vec![target.recipient.clone()],
511                &payload,
512                &recipient_public_keys,
513                message_class,
514                &context_id,
515                Some(operation_id.clone()),
516                expires_in_seconds,
517                correlation_id.clone(),
518                in_reply_to.clone(),
519            )?;
520            message_ids.push(message.envelope.message_id.clone());
521            outcomes.push(self.deliver_via_amqp(&message, target));
522        }
523        for target in &mqtt_targets {
524            let recipient_public_keys =
525                HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
526            let message = self.build_message(
527                vec![target.recipient.clone()],
528                &payload,
529                &recipient_public_keys,
530                message_class,
531                &context_id,
532                Some(operation_id.clone()),
533                expires_in_seconds,
534                correlation_id.clone(),
535                in_reply_to.clone(),
536            )?;
537            message_ids.push(message.envelope.message_id.clone());
538            outcomes.push(self.deliver_via_mqtt(&message, target));
539        }
540        if message_ids.is_empty() {
541            message_ids.push(Uuid::new_v4().to_string());
542        }
543        let result = SendResult {
544            operation_id: operation_id.clone(),
545            message_id: message_ids[0].clone(),
546            message_ids,
547            outcomes,
548        };
549        self.sync_delivery_states(&operation_id, &result.outcomes);
550        Ok(result)
551    }
552
553    pub fn send_basic(
554        &mut self,
555        recipients: Vec<String>,
556        payload: Map<String, Value>,
557        context: Option<String>,
558    ) -> AcpResult<SendResult> {
559        self.send(
560            recipients,
561            payload,
562            context,
563            MessageClass::Send,
564            300,
565            None,
566            None,
567            Some(self.default_delivery_mode),
568        )
569    }
570
571    pub fn send_compensate(
572        &mut self,
573        recipients: Vec<String>,
574        original_operation_id: &str,
575        reason: &str,
576        actions: Vec<Map<String, Value>>,
577        context: Option<String>,
578        delivery_mode: Option<DeliveryMode>,
579    ) -> AcpResult<SendResult> {
580        let instruction = CompensateInstruction {
581            operation_id: original_operation_id.to_string(),
582            reason: reason.to_string(),
583            actions,
584        };
585        let mut payload = Map::new();
586        payload.insert(
587            "compensation".to_string(),
588            serde_json::to_value(instruction).unwrap_or(Value::Null),
589        );
590        self.send(
591            recipients,
592            payload,
593            context.or_else(|| Some(format!("compensate:{original_operation_id}"))),
594            MessageClass::Compensate,
595            300,
596            Some(original_operation_id.to_string()),
597            None,
598            delivery_mode,
599        )
600    }
601
602    pub fn decrypt_message_for_self(
603        &mut self,
604        raw_message: &Map<String, Value>,
605    ) -> AcpResult<DecryptedMessage> {
606        let message = AcpMessage::from_map(raw_message)?;
607        self.validate_envelope_for_inbound(&message.envelope)?;
608        if !message
609            .envelope
610            .recipients
611            .iter()
612            .any(|recipient| recipient == self.agent_id())
613        {
614            return Err(AcpError::Processing {
615                reason: FailReason::PolicyRejected,
616                detail: "Message is not addressed to this agent".to_string(),
617            });
618        }
619        let sender_doc =
620            self.resolve_sender_identity_document(raw_message, &message.envelope.sender)?;
621        let sender_signing_key = sender_doc
622            .get("keys")
623            .and_then(Value::as_object)
624            .and_then(|keys| keys.get("signing"))
625            .and_then(Value::as_object)
626            .and_then(|signing| signing.get("public_key"))
627            .and_then(Value::as_str)
628            .ok_or_else(|| AcpError::Processing {
629                reason: FailReason::InvalidSignature,
630                detail: "Sender signing public key missing".to_string(),
631            })?;
632        if !crypto::verify_protected_payload_signature(
633            &message.envelope,
634            &message.protected_payload,
635            sender_signing_key,
636        ) {
637            return Err(AcpError::Processing {
638                reason: FailReason::InvalidSignature,
639                detail: "Message signature verification failed".to_string(),
640            });
641        }
642        let payload = crypto::decrypt_for_recipient(
643            &message.envelope,
644            &message.protected_payload,
645            self.agent_id(),
646            &self.identity.encryption_private_key,
647        )?;
648        Ok(DecryptedMessage { message, payload })
649    }
650
651    pub fn receive(
652        &mut self,
653        raw_message: &Map<String, Value>,
654        handler: Option<&InboundHandlerFn>,
655    ) -> InboundResult {
656        let mut result = InboundResult {
657            state: DeliveryState::Failed,
658            reason_code: None,
659            detail: None,
660            decrypted_payload: None,
661            response_message: None,
662        };
663
664        let request_message = match AcpMessage::from_map(raw_message) {
665            Ok(message) => message,
666            Err(exc) => {
667                result.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
668                result.detail = Some(format!("Invalid ACP message structure: {exc}"));
669                return result;
670            }
671        };
672
673        let mut sender_identity_document: Option<Map<String, Value>> = None;
674
675        let processing_result = (|| -> AcpResult<()> {
676            self.validate_envelope_for_inbound(&request_message.envelope)?;
677            if !request_message
678                .envelope
679                .recipients
680                .iter()
681                .any(|recipient| recipient == self.agent_id())
682            {
683                return Err(AcpError::Processing {
684                    reason: FailReason::PolicyRejected,
685                    detail: format!("Recipient {} not in message recipients", self.agent_id()),
686                });
687            }
688
689            let sender_doc = self
690                .resolve_sender_identity_document(raw_message, &request_message.envelope.sender)?;
691            sender_identity_document = Some(sender_doc.clone());
692            let sender_signing_key = sender_doc
693                .get("keys")
694                .and_then(Value::as_object)
695                .and_then(|keys| keys.get("signing"))
696                .and_then(Value::as_object)
697                .and_then(|signing| signing.get("public_key"))
698                .and_then(Value::as_str)
699                .ok_or_else(|| AcpError::Processing {
700                    reason: FailReason::InvalidSignature,
701                    detail: "Sender signing key missing from identity document".to_string(),
702                })?;
703            if !crypto::verify_protected_payload_signature(
704                &request_message.envelope,
705                &request_message.protected_payload,
706                sender_signing_key,
707            ) {
708                return Err(AcpError::Processing {
709                    reason: FailReason::InvalidSignature,
710                    detail: "Signature verification failed".to_string(),
711                });
712            }
713
714            if self
715                .dedup
716                .is_duplicate(&request_message.envelope.message_id)
717            {
718                result.state = DeliveryState::Acknowledged;
719                result.detail = Some("Duplicate message acknowledged".to_string());
720                if !matches!(
721                    request_message.envelope.message_class,
722                    MessageClass::Ack | MessageClass::Fail
723                ) {
724                    let duplicate_ack = self.create_response_message(
725                        &sender_doc,
726                        &request_message.envelope,
727                        MessageClass::Ack,
728                        build_ack_payload(&request_message.envelope.message_id, "duplicate"),
729                    )?;
730                    result.response_message = Some(duplicate_ack.to_map()?);
731                }
732                return Ok(());
733            }
734
735            let decrypted_payload = crypto::decrypt_for_recipient(
736                &request_message.envelope,
737                &request_message.protected_payload,
738                self.agent_id(),
739                &self.identity.encryption_private_key,
740            )?;
741            result.decrypted_payload = Some(decrypted_payload.clone());
742
743            let response_message =
744                if request_message.envelope.message_class == MessageClass::Capabilities {
745                    Some(self.create_response_message(
746                        &sender_doc,
747                        &request_message.envelope,
748                        MessageClass::Capabilities,
749                        self.capabilities.to_map(),
750                    )?)
751                } else {
752                    let mut ack_payload =
753                        build_ack_payload(&request_message.envelope.message_id, "accepted");
754                    if let Some(handler) = handler
755                        && let Some(handler_payload) =
756                            handler(&decrypted_payload, &request_message.envelope)
757                        && !handler_payload.is_empty()
758                    {
759                        ack_payload.insert("handler".to_string(), Value::Object(handler_payload));
760                    }
761                    if matches!(
762                        request_message.envelope.message_class,
763                        MessageClass::Ack | MessageClass::Fail
764                    ) {
765                        None
766                    } else {
767                        Some(self.create_response_message(
768                            &sender_doc,
769                            &request_message.envelope,
770                            MessageClass::Ack,
771                            ack_payload,
772                        )?)
773                    }
774                };
775
776            self.dedup
777                .mark_processed(&request_message.envelope.message_id);
778            result.state = DeliveryState::Acknowledged;
779            result.response_message = response_message
780                .map(|message| message.to_map())
781                .transpose()?;
782            Ok(())
783        })();
784
785        if let Err(exc) = processing_result {
786            let (reason_code, detail) = match exc {
787                AcpError::Processing { reason, detail } => (reason.as_str().to_string(), detail),
788                _ => (
789                    FailReason::PolicyRejected.as_str().to_string(),
790                    exc.to_string(),
791                ),
792            };
793            result.reason_code = Some(reason_code.clone());
794            result.detail = Some(detail.clone());
795            if let Some(sender_doc) = sender_identity_document {
796                let fail_response = self.create_response_message(
797                    &sender_doc,
798                    &request_message.envelope,
799                    MessageClass::Fail,
800                    build_fail_payload(reason_code, detail, false),
801                );
802                result.response_message =
803                    fail_response.ok().and_then(|message| message.to_map().ok());
804            }
805        }
806        result
807    }
808
809    pub fn request_capabilities(&mut self, recipient: &str) -> AcpResult<CapabilityRequestResult> {
810        let mut payload = Map::new();
811        payload.insert(
812            "request".to_string(),
813            Value::String("capabilities".to_string()),
814        );
815        let result = self.send(
816            vec![recipient.to_string()],
817            payload,
818            Some(format!("capabilities:{}", Uuid::new_v4())),
819            MessageClass::Capabilities,
820            300,
821            None,
822            None,
823            Some(self.default_delivery_mode),
824        )?;
825        let mut response_payload = None;
826        for outcome in &result.outcomes {
827            let Some(response_message) = &outcome.response_message else {
828                continue;
829            };
830            if let Ok(decrypted) = self.decrypt_message_for_self(response_message)
831                && decrypted.message.envelope.message_class == MessageClass::Capabilities
832            {
833                response_payload = Some(decrypted.payload);
834                break;
835            }
836        }
837        Ok(CapabilityRequestResult {
838            result,
839            capabilities: response_payload,
840        })
841    }
842
843    pub fn consume_from_amqp<F>(
844        &mut self,
845        max_messages: usize,
846        handler: Option<F>,
847    ) -> AcpResult<usize>
848    where
849        F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
850    {
851        let amqp_transport = self.amqp_transport.clone().ok_or_else(|| {
852            AcpError::Transport("consume_from_amqp requires an AMQP-configured agent".to_string())
853        })?;
854        let amqp_service = self
855            .identity_document
856            .get("service")
857            .and_then(Value::as_object)
858            .and_then(|service| service.get("amqp"))
859            .and_then(Value::as_object)
860            .cloned()
861            .ok_or_else(|| {
862                AcpError::Transport(
863                    "Identity document is missing service.amqp configuration".to_string(),
864                )
865            })?;
866        let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
867        let agent_for_handler = Arc::clone(&agent_ptr);
868        let processed = amqp_transport.consume(
869            self.agent_id(),
870            move |raw_message| {
871                let Ok(mut guard) = agent_for_handler.lock() else {
872                    return false;
873                };
874                let inbound = guard.receive(
875                    raw_message,
876                    handler.as_ref().map(|inner| inner as &InboundHandlerFn),
877                );
878                if let Some(response_message) = inbound.response_message.clone() {
879                    if guard
880                        .publish_amqp_response_message(raw_message, &response_message)
881                        .is_err()
882                    {
883                        return false;
884                    }
885                }
886                matches!(
887                    inbound.state,
888                    DeliveryState::Acknowledged
889                        | DeliveryState::Failed
890                        | DeliveryState::Declined
891                        | DeliveryState::Expired
892                )
893            },
894            Some(&amqp_service),
895            max_messages,
896        )?;
897        if let Ok(guard) = agent_ptr.lock() {
898            *self = guard.clone();
899        }
900        Ok(processed)
901    }
902
903    pub fn consume_from_mqtt<F>(
904        &mut self,
905        max_messages: usize,
906        handler: Option<F>,
907    ) -> AcpResult<usize>
908    where
909        F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
910    {
911        let mqtt_transport = self.mqtt_transport.clone().ok_or_else(|| {
912            AcpError::Transport("consume_from_mqtt requires an MQTT-configured agent".to_string())
913        })?;
914        let mqtt_service = self
915            .identity_document
916            .get("service")
917            .and_then(Value::as_object)
918            .and_then(|service| service.get("mqtt"))
919            .and_then(Value::as_object)
920            .cloned()
921            .ok_or_else(|| {
922                AcpError::Transport(
923                    "Identity document is missing service.mqtt configuration".to_string(),
924                )
925            })?;
926        let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
927        let agent_for_handler = Arc::clone(&agent_ptr);
928        let processed = mqtt_transport.consume(
929            self.agent_id(),
930            move |raw_message| {
931                let Ok(mut guard) = agent_for_handler.lock() else {
932                    return false;
933                };
934                let inbound = guard.receive(
935                    raw_message,
936                    handler.as_ref().map(|inner| inner as &InboundHandlerFn),
937                );
938                if let Some(response_message) = inbound.response_message.clone() {
939                    if guard
940                        .publish_mqtt_response_message(raw_message, &response_message)
941                        .is_err()
942                    {
943                        return false;
944                    }
945                }
946                matches!(
947                    inbound.state,
948                    DeliveryState::Acknowledged
949                        | DeliveryState::Failed
950                        | DeliveryState::Declined
951                        | DeliveryState::Expired
952                )
953            },
954            Some(&mqtt_service),
955            max_messages,
956            Duration::from_secs(1),
957        )?;
958        if let Ok(guard) = agent_ptr.lock() {
959            *self = guard.clone();
960        }
961        Ok(processed)
962    }
963
964    fn resolve_recipients(
965        &mut self,
966        recipients: &[String],
967        mode: DeliveryMode,
968    ) -> AcpResult<ResolvedRecipients> {
969        let mut deliverable = Vec::new();
970        let mut preflight_outcomes = Vec::new();
971        for recipient in recipients {
972            let identity_doc = match self.discovery.resolve(recipient) {
973                Ok(identity_doc) => identity_doc,
974                Err(exc) => {
975                    preflight_outcomes.push(failed_outcome(
976                        recipient,
977                        FailReason::PolicyRejected.as_str(),
978                        &exc.to_string(),
979                    ));
980                    continue;
981                }
982            };
983            let remote_capabilities = AgentCapabilities::from_map(
984                identity_doc.get("capabilities").and_then(Value::as_object),
985                recipient,
986            );
987            let capability_match = self.capabilities.choose_compatible(&remote_capabilities);
988            if !capability_match.compatible {
989                preflight_outcomes.push(failed_outcome(
990                    recipient,
991                    reason_for_capability_mismatch(capability_match.reason.as_deref()).as_str(),
992                    capability_match
993                        .reason
994                        .as_deref()
995                        .unwrap_or("No compatible capabilities"),
996                ));
997                continue;
998            }
999            let choice = self.choose_delivery_channel(&remote_capabilities, &identity_doc, mode)?;
1000            let Some(channel) = choice.channel else {
1001                preflight_outcomes.push(failed_outcome(
1002                    recipient,
1003                    FailReason::PolicyRejected.as_str(),
1004                    choice
1005                        .detail
1006                        .as_deref()
1007                        .unwrap_or("Delivery channel unavailable"),
1008                ));
1009                continue;
1010            };
1011            let recipient_public_key = identity_doc
1012                .get("keys")
1013                .and_then(Value::as_object)
1014                .and_then(|keys| keys.get("encryption"))
1015                .and_then(Value::as_object)
1016                .and_then(|enc| enc.get("public_key"))
1017                .and_then(Value::as_str)
1018                .map(str::to_string);
1019            let Some(recipient_public_key) = recipient_public_key else {
1020                preflight_outcomes.push(failed_outcome(
1021                    recipient,
1022                    FailReason::PolicyRejected.as_str(),
1023                    "Recipient identity document missing encryption public key",
1024                ));
1025                continue;
1026            };
1027            deliverable.push(ResolvedRecipient {
1028                recipient: recipient.to_string(),
1029                public_key: recipient_public_key,
1030                channel,
1031                endpoint: choice.endpoint,
1032                amqp_service: choice.amqp_service,
1033                mqtt_service: choice.mqtt_service,
1034            });
1035        }
1036        Ok(ResolvedRecipients {
1037            deliverable,
1038            preflight_outcomes,
1039        })
1040    }
1041
1042    fn choose_delivery_channel(
1043        &self,
1044        remote_capabilities: &AgentCapabilities,
1045        identity_document: &Map<String, Value>,
1046        mode: DeliveryMode,
1047    ) -> AcpResult<ChannelChoice> {
1048        let remote_transports: HashSet<String> = remote_capabilities
1049            .transports
1050            .iter()
1051            .map(|t| t.to_lowercase())
1052            .collect();
1053        let shared = self
1054            .capabilities
1055            .transports
1056            .iter()
1057            .filter(|transport| remote_transports.contains(&transport.to_lowercase()))
1058            .map(|transport| transport.to_lowercase())
1059            .collect::<Vec<_>>();
1060
1061        let service = identity_document
1062            .get("service")
1063            .and_then(Value::as_object)
1064            .cloned()
1065            .unwrap_or_default();
1066        let direct_endpoint = service
1067            .get("direct_endpoint")
1068            .and_then(Value::as_str)
1069            .map(str::to_string);
1070        let has_direct = direct_endpoint
1071            .as_deref()
1072            .map(|endpoint| !endpoint.trim().is_empty())
1073            .unwrap_or(false);
1074        let direct_available = has_direct
1075            && shared
1076                .iter()
1077                .any(|transport| matches!(transport.as_str(), "https" | "http" | "direct"));
1078        let relay_available =
1079            !self.relay_url.trim().is_empty() && shared.iter().any(|t| t == "relay");
1080        let amqp_service = service.get("amqp").and_then(Value::as_object).cloned();
1081        let amqp_available = shared.iter().any(|t| t == "amqp")
1082            && amqp_service
1083                .as_ref()
1084                .and_then(|service| service.get("broker_url"))
1085                .and_then(Value::as_str)
1086                .map(str::trim)
1087                .filter(|v| !v.is_empty())
1088                .is_some();
1089        let mqtt_service = service.get("mqtt").and_then(Value::as_object).cloned();
1090        let mqtt_available = shared.iter().any(|t| t == "mqtt")
1091            && mqtt_service
1092                .as_ref()
1093                .and_then(|service| service.get("broker_url"))
1094                .and_then(Value::as_str)
1095                .map(str::trim)
1096                .filter(|v| !v.is_empty())
1097                .is_some()
1098            && mqtt_service
1099                .as_ref()
1100                .and_then(|service| service.get("topic"))
1101                .and_then(Value::as_str)
1102                .map(str::trim)
1103                .filter(|v| !v.is_empty())
1104                .is_some();
1105
1106        match mode {
1107            DeliveryMode::Direct => {
1108                if direct_available {
1109                    return Ok(ChannelChoice::new(
1110                        Some("direct".to_string()),
1111                        direct_endpoint,
1112                        None,
1113                        None,
1114                        None,
1115                    ));
1116                }
1117                return Ok(ChannelChoice::new(
1118                    None,
1119                    None,
1120                    None,
1121                    None,
1122                    Some("No compatible direct transport and endpoint available".to_string()),
1123                ));
1124            }
1125            DeliveryMode::Relay => {
1126                if relay_available {
1127                    return Ok(ChannelChoice::new(
1128                        Some("relay".to_string()),
1129                        None,
1130                        None,
1131                        None,
1132                        None,
1133                    ));
1134                }
1135                return Ok(ChannelChoice::new(
1136                    None,
1137                    None,
1138                    None,
1139                    None,
1140                    Some("No compatible relay transport available".to_string()),
1141                ));
1142            }
1143            DeliveryMode::Amqp => {
1144                if amqp_available {
1145                    return Ok(ChannelChoice::new(
1146                        Some("amqp".to_string()),
1147                        None,
1148                        amqp_service,
1149                        None,
1150                        None,
1151                    ));
1152                }
1153                return Ok(ChannelChoice::new(
1154                    None,
1155                    None,
1156                    None,
1157                    None,
1158                    Some("No compatible AMQP transport available".to_string()),
1159                ));
1160            }
1161            DeliveryMode::Mqtt => {
1162                if mqtt_available {
1163                    return Ok(ChannelChoice::new(
1164                        Some("mqtt".to_string()),
1165                        None,
1166                        None,
1167                        mqtt_service,
1168                        None,
1169                    ));
1170                }
1171                return Ok(ChannelChoice::new(
1172                    None,
1173                    None,
1174                    None,
1175                    None,
1176                    Some("No compatible MQTT transport available".to_string()),
1177                ));
1178            }
1179            DeliveryMode::Auto => {}
1180        }
1181
1182        if direct_available {
1183            return Ok(ChannelChoice::new(
1184                Some("direct".to_string()),
1185                direct_endpoint,
1186                None,
1187                None,
1188                None,
1189            ));
1190        }
1191        if relay_available {
1192            return Ok(ChannelChoice::new(
1193                Some("relay".to_string()),
1194                None,
1195                None,
1196                None,
1197                None,
1198            ));
1199        }
1200        if amqp_available {
1201            return Ok(ChannelChoice::new(
1202                Some("amqp".to_string()),
1203                None,
1204                amqp_service,
1205                None,
1206                None,
1207            ));
1208        }
1209        if mqtt_available {
1210            return Ok(ChannelChoice::new(
1211                Some("mqtt".to_string()),
1212                None,
1213                None,
1214                mqtt_service,
1215                None,
1216            ));
1217        }
1218        if has_direct {
1219            return Ok(ChannelChoice::new(
1220                None,
1221                None,
1222                None,
1223                None,
1224                Some(
1225                    "No compatible transport implementation available for this recipient"
1226                        .to_string(),
1227                ),
1228            ));
1229        }
1230        if amqp_service.is_some() {
1231            return Ok(ChannelChoice::new(
1232                None,
1233                None,
1234                None,
1235                None,
1236                Some(
1237                    "AMQP transport is advertised but not compatible with sender capabilities"
1238                        .to_string(),
1239                ),
1240            ));
1241        }
1242        if mqtt_service.is_some() {
1243            return Ok(ChannelChoice::new(
1244                None,
1245                None,
1246                None,
1247                None,
1248                Some(
1249                    "MQTT transport is advertised but not compatible with sender capabilities"
1250                        .to_string(),
1251                ),
1252            ));
1253        }
1254        Ok(ChannelChoice::new(
1255            None,
1256            None,
1257            None,
1258            None,
1259            Some(
1260                "Recipient identity document is missing direct_endpoint/amqp/mqtt and no relay fallback is compatible"
1261                    .to_string(),
1262            ),
1263        ))
1264    }
1265
1266    #[allow(clippy::too_many_arguments)]
1267    fn build_message(
1268        &self,
1269        recipients: Vec<String>,
1270        payload: &Map<String, Value>,
1271        recipient_public_keys: &HashMap<String, String>,
1272        message_class: MessageClass,
1273        context_id: &str,
1274        operation_id: Option<String>,
1275        expires_in_seconds: i64,
1276        correlation_id: Option<String>,
1277        in_reply_to: Option<String>,
1278    ) -> AcpResult<AcpMessage> {
1279        let envelope = Envelope::build(
1280            self.agent_id().to_string(),
1281            recipients,
1282            message_class,
1283            context_id.to_string(),
1284            expires_in_seconds,
1285            operation_id,
1286            None,
1287            correlation_id,
1288            in_reply_to,
1289            Some(DEFAULT_CRYPTO_SUITE.to_string()),
1290        )?;
1291        let mut protected_payload =
1292            crypto::encrypt_for_recipients(payload, &envelope, recipient_public_keys)?;
1293        crypto::sign_protected_payload(
1294            &envelope,
1295            &mut protected_payload,
1296            &self.identity.signing_private_key,
1297            &self.identity.signing_kid,
1298        )?;
1299        Ok(AcpMessage {
1300            envelope,
1301            protected_payload,
1302            sender_identity_document: Some(self.identity_document.clone()),
1303        })
1304    }
1305
1306    fn deliver_direct(
1307        &self,
1308        message: &AcpMessage,
1309        targets: &[ResolvedRecipient],
1310    ) -> Vec<DeliveryOutcome> {
1311        let mut outcomes = Vec::new();
1312        let message_map = match message.to_map() {
1313            Ok(map) => map,
1314            Err(exc) => {
1315                for target in targets {
1316                    outcomes.push(failed_outcome(
1317                        &target.recipient,
1318                        FailReason::PolicyRejected.as_str(),
1319                        &format!("Message serialization failure: {exc}"),
1320                    ));
1321                }
1322                return outcomes;
1323            }
1324        };
1325        for target in targets {
1326            let Some(endpoint) = target.endpoint.as_deref() else {
1327                outcomes.push(failed_outcome(
1328                    &target.recipient,
1329                    FailReason::PolicyRejected.as_str(),
1330                    "Missing direct endpoint for direct delivery",
1331                ));
1332                continue;
1333            };
1334            match self.transport.post_json(endpoint, &message_map) {
1335                Ok(response) => {
1336                    outcomes.push(outcome_from_http_response(&target.recipient, &response))
1337                }
1338                Err(exc) => outcomes.push(failed_outcome(
1339                    &target.recipient,
1340                    FailReason::PolicyRejected.as_str(),
1341                    &format!("Direct transport failure: {exc}"),
1342                )),
1343            }
1344        }
1345        outcomes
1346    }
1347
1348    fn deliver_via_relay(
1349        &self,
1350        message: &AcpMessage,
1351        targets: &[ResolvedRecipient],
1352    ) -> Vec<DeliveryOutcome> {
1353        let mut outcomes = Vec::new();
1354        match self.transport.send_to_relay(&self.relay_url, message) {
1355            Ok(relay_response) => {
1356                let mut delivered = HashSet::new();
1357                if let Some(raw_outcomes) = relay_response.get("outcomes").and_then(Value::as_array)
1358                {
1359                    for item in raw_outcomes {
1360                        if let Ok(outcome) = serde_json::from_value::<DeliveryOutcome>(item.clone())
1361                        {
1362                            delivered.insert(outcome.recipient.clone());
1363                            outcomes.push(outcome);
1364                        }
1365                    }
1366                }
1367                for target in targets {
1368                    if !delivered.contains(&target.recipient) {
1369                        outcomes.push(failed_outcome(
1370                            &target.recipient,
1371                            FailReason::PolicyRejected.as_str(),
1372                            "Relay did not return an outcome for recipient",
1373                        ));
1374                    }
1375                }
1376            }
1377            Err(exc) => {
1378                for target in targets {
1379                    outcomes.push(failed_outcome(
1380                        &target.recipient,
1381                        FailReason::PolicyRejected.as_str(),
1382                        &format!("Relay transport failure: {exc}"),
1383                    ));
1384                }
1385            }
1386        }
1387        outcomes
1388    }
1389
1390    fn deliver_via_amqp(
1391        &self,
1392        message: &AcpMessage,
1393        target: &ResolvedRecipient,
1394    ) -> DeliveryOutcome {
1395        let mut outcome = DeliveryOutcome {
1396            recipient: target.recipient.clone(),
1397            state: DeliveryState::Pending,
1398            status_code: None,
1399            response_class: None,
1400            reason_code: None,
1401            detail: None,
1402            response_message: None,
1403        };
1404        let message_map = match message.to_map() {
1405            Ok(message_map) => message_map,
1406            Err(exc) => {
1407                outcome.state = DeliveryState::Failed;
1408                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1409                outcome.detail = Some(format!("AMQP message serialization failure: {exc}"));
1410                return outcome;
1411            }
1412        };
1413        let result = (|| -> AcpResult<()> {
1414            let client = if let Some(amqp_transport) = &self.amqp_transport {
1415                amqp_transport.clone()
1416            } else {
1417                let broker_url = target
1418                    .amqp_service
1419                    .as_ref()
1420                    .and_then(|service| service.get("broker_url"))
1421                    .and_then(Value::as_str)
1422                    .ok_or_else(|| {
1423                        AcpError::Transport(
1424                            "AMQP delivery selected but sender is not configured with an AMQP broker"
1425                                .to_string(),
1426                        )
1427                    })?;
1428                AmqpTransportClient::new(
1429                    broker_url.to_string(),
1430                    target
1431                        .amqp_service
1432                        .as_ref()
1433                        .and_then(|service| service.get("exchange"))
1434                        .and_then(Value::as_str)
1435                        .map(str::to_string),
1436                    None,
1437                    10,
1438                )?
1439            };
1440            client.publish(
1441                &message_map,
1442                &target.recipient,
1443                target.amqp_service.as_ref(),
1444            )
1445        })();
1446        match result {
1447            Ok(()) => outcome.state = DeliveryState::Delivered,
1448            Err(exc) => {
1449                outcome.state = DeliveryState::Failed;
1450                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1451                outcome.detail = Some(format!("AMQP transport failure: {exc}"));
1452            }
1453        }
1454        outcome
1455    }
1456
1457    fn deliver_via_mqtt(
1458        &self,
1459        message: &AcpMessage,
1460        target: &ResolvedRecipient,
1461    ) -> DeliveryOutcome {
1462        let mut outcome = DeliveryOutcome {
1463            recipient: target.recipient.clone(),
1464            state: DeliveryState::Pending,
1465            status_code: None,
1466            response_class: None,
1467            reason_code: None,
1468            detail: None,
1469            response_message: None,
1470        };
1471        let message_map = match message.to_map() {
1472            Ok(message_map) => message_map,
1473            Err(exc) => {
1474                outcome.state = DeliveryState::Failed;
1475                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1476                outcome.detail = Some(format!("MQTT message serialization failure: {exc}"));
1477                return outcome;
1478            }
1479        };
1480        let result = (|| -> AcpResult<()> {
1481            let client = if let Some(mqtt_transport) = &self.mqtt_transport {
1482                mqtt_transport.clone()
1483            } else {
1484                let service = target.mqtt_service.as_ref().ok_or_else(|| {
1485                    AcpError::Transport(
1486                        "MQTT delivery selected but sender is not configured with an MQTT broker"
1487                            .to_string(),
1488                    )
1489                })?;
1490                let broker_url = service
1491                    .get("broker_url")
1492                    .and_then(Value::as_str)
1493                    .ok_or_else(|| {
1494                        AcpError::Transport(
1495                            "MQTT delivery selected but sender is not configured with an MQTT broker"
1496                                .to_string(),
1497                        )
1498                    })?;
1499                MqttTransportClient::new(
1500                    broker_url.to_string(),
1501                    service.get("qos").and_then(|v| v.as_u64()).map(|v| v as u8),
1502                    None,
1503                    10,
1504                    30,
1505                )?
1506            };
1507            client.publish(
1508                &message_map,
1509                &target.recipient,
1510                target.mqtt_service.as_ref(),
1511            )
1512        })();
1513        match result {
1514            Ok(()) => outcome.state = DeliveryState::Delivered,
1515            Err(exc) => {
1516                outcome.state = DeliveryState::Failed;
1517                outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1518                outcome.detail = Some(format!("MQTT transport failure: {exc}"));
1519            }
1520        }
1521        outcome
1522    }
1523
1524    fn publish_amqp_response_message(
1525        &mut self,
1526        raw_message: &Map<String, Value>,
1527        response_message: &Map<String, Value>,
1528    ) -> AcpResult<()> {
1529        let amqp_transport = self
1530            .amqp_transport
1531            .clone()
1532            .ok_or_else(|| AcpError::Transport("AMQP transport is not configured".to_string()))?;
1533        let sender_id = raw_message
1534            .get("envelope")
1535            .and_then(Value::as_object)
1536            .and_then(|envelope| envelope.get("sender"))
1537            .and_then(Value::as_str)
1538            .ok_or_else(|| {
1539                AcpError::Transport(
1540                    "Inbound message sender is missing for AMQP response routing".to_string(),
1541                )
1542            })?;
1543        let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1544        let sender_amqp_service = sender_identity
1545            .get("service")
1546            .and_then(Value::as_object)
1547            .and_then(|service| service.get("amqp"))
1548            .and_then(Value::as_object)
1549            .cloned()
1550            .ok_or_else(|| {
1551                AcpError::Transport(format!(
1552                    "Sender {sender_id} does not advertise service.amqp for AMQP response delivery"
1553                ))
1554            })?;
1555        amqp_transport.publish(response_message, sender_id, Some(&sender_amqp_service))
1556    }
1557
1558    fn publish_mqtt_response_message(
1559        &mut self,
1560        raw_message: &Map<String, Value>,
1561        response_message: &Map<String, Value>,
1562    ) -> AcpResult<()> {
1563        let mqtt_transport = self
1564            .mqtt_transport
1565            .clone()
1566            .ok_or_else(|| AcpError::Transport("MQTT transport is not configured".to_string()))?;
1567        let sender_id = raw_message
1568            .get("envelope")
1569            .and_then(Value::as_object)
1570            .and_then(|envelope| envelope.get("sender"))
1571            .and_then(Value::as_str)
1572            .ok_or_else(|| {
1573                AcpError::Transport(
1574                    "Inbound message sender is missing for MQTT response routing".to_string(),
1575                )
1576            })?;
1577        let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1578        let sender_mqtt_service = sender_identity
1579            .get("service")
1580            .and_then(Value::as_object)
1581            .and_then(|service| service.get("mqtt"))
1582            .and_then(Value::as_object)
1583            .cloned()
1584            .ok_or_else(|| {
1585                AcpError::Transport(format!(
1586                    "Sender {sender_id} does not advertise service.mqtt for MQTT response delivery"
1587                ))
1588            })?;
1589        mqtt_transport.publish(response_message, sender_id, Some(&sender_mqtt_service))
1590    }
1591
1592    fn resolve_sender_identity_document(
1593        &mut self,
1594        raw_message: &Map<String, Value>,
1595        sender_id: &str,
1596    ) -> AcpResult<Map<String, Value>> {
1597        if let Some(embedded) = raw_message
1598            .get("sender_identity_document")
1599            .and_then(Value::as_object)
1600            .cloned()
1601            && embedded.get("agent_id").and_then(Value::as_str) == Some(sender_id)
1602            && verify_identity_document(&embedded)
1603        {
1604            // Cache verified embedded identity documents to avoid blocking discovery lookups
1605            // when replying to the same sender in the current request cycle.
1606            let _ = self.discovery.register_identity_document(embedded.clone());
1607            return Ok(embedded);
1608        }
1609        self.discovery.resolve(sender_id)
1610    }
1611
1612    fn validate_envelope_for_inbound(&self, envelope: &Envelope) -> AcpResult<()> {
1613        if envelope.acp_version != ACP_VERSION {
1614            return Err(AcpError::Processing {
1615                reason: FailReason::UnsupportedVersion,
1616                detail: format!("Unsupported ACP version: {}", envelope.acp_version),
1617            });
1618        }
1619        if envelope.crypto_suite != DEFAULT_CRYPTO_SUITE {
1620            return Err(AcpError::Processing {
1621                reason: FailReason::UnsupportedCryptoSuite,
1622                detail: format!("Unsupported crypto suite: {}", envelope.crypto_suite),
1623            });
1624        }
1625        if envelope.is_expired() {
1626            return Err(AcpError::Processing {
1627                reason: FailReason::ExpiredMessage,
1628                detail: "Message is expired".to_string(),
1629            });
1630        }
1631        Ok(())
1632    }
1633
1634    fn create_response_message(
1635        &self,
1636        sender_identity_document: &Map<String, Value>,
1637        request_envelope: &Envelope,
1638        response_class: MessageClass,
1639        response_payload: Map<String, Value>,
1640    ) -> AcpResult<AcpMessage> {
1641        let sender_id = &request_envelope.sender;
1642        let sender_encryption_public_key = sender_identity_document
1643            .get("keys")
1644            .and_then(Value::as_object)
1645            .and_then(|keys| keys.get("encryption"))
1646            .and_then(Value::as_object)
1647            .and_then(|encryption| encryption.get("public_key"))
1648            .and_then(Value::as_str)
1649            .ok_or_else(|| AcpError::Processing {
1650                reason: FailReason::PolicyRejected,
1651                detail: "Sender identity document missing encryption key".to_string(),
1652            })?;
1653        self.build_message(
1654            vec![sender_id.to_string()],
1655            &response_payload,
1656            &HashMap::from([(
1657                sender_id.to_string(),
1658                sender_encryption_public_key.to_string(),
1659            )]),
1660            response_class,
1661            &request_envelope.context_id,
1662            Some(request_envelope.operation_id.clone()),
1663            300,
1664            request_envelope
1665                .correlation_id
1666                .clone()
1667                .or_else(|| Some(request_envelope.operation_id.clone())),
1668            Some(request_envelope.message_id.clone()),
1669        )
1670    }
1671
1672    fn sync_delivery_states(&mut self, operation_id: &str, outcomes: &[DeliveryOutcome]) {
1673        let mut states = HashMap::new();
1674        for outcome in outcomes {
1675            states.insert(
1676                outcome.recipient.clone(),
1677                format!("{:?}", outcome.state).to_uppercase(),
1678            );
1679        }
1680        self.delivery_states
1681            .insert(operation_id.to_string(), states);
1682    }
1683}
1684
1685#[derive(Debug, Clone)]
1686struct ResolvedRecipient {
1687    recipient: String,
1688    public_key: String,
1689    channel: String,
1690    endpoint: Option<String>,
1691    amqp_service: Option<Map<String, Value>>,
1692    mqtt_service: Option<Map<String, Value>>,
1693}
1694
1695#[derive(Debug, Clone)]
1696struct ResolvedRecipients {
1697    deliverable: Vec<ResolvedRecipient>,
1698    preflight_outcomes: Vec<DeliveryOutcome>,
1699}
1700
1701#[derive(Debug, Clone)]
1702struct ChannelChoice {
1703    channel: Option<String>,
1704    endpoint: Option<String>,
1705    amqp_service: Option<Map<String, Value>>,
1706    mqtt_service: Option<Map<String, Value>>,
1707    detail: Option<String>,
1708}
1709
1710impl ChannelChoice {
1711    fn new(
1712        channel: Option<String>,
1713        endpoint: Option<String>,
1714        amqp_service: Option<Map<String, Value>>,
1715        mqtt_service: Option<Map<String, Value>>,
1716        detail: Option<String>,
1717    ) -> Self {
1718        Self {
1719            channel,
1720            endpoint,
1721            amqp_service,
1722            mqtt_service,
1723            detail,
1724        }
1725    }
1726}
1727
1728#[derive(Debug, Clone)]
1729struct DedupStore {
1730    ttl: Duration,
1731    processed: HashMap<String, Instant>,
1732}
1733
1734impl DedupStore {
1735    fn new(ttl: Duration) -> Self {
1736        Self {
1737            ttl,
1738            processed: HashMap::new(),
1739        }
1740    }
1741
1742    fn is_duplicate(&mut self, message_id: &str) -> bool {
1743        self.cleanup_expired();
1744        self.processed.contains_key(message_id)
1745    }
1746
1747    fn mark_processed(&mut self, message_id: &str) {
1748        self.processed
1749            .insert(message_id.to_string(), Instant::now());
1750    }
1751
1752    fn cleanup_expired(&mut self) {
1753        let ttl = self.ttl;
1754        self.processed
1755            .retain(|_, timestamp| timestamp.elapsed() < ttl);
1756    }
1757}
1758
1759fn outcome_from_http_response(recipient: &str, response: &TransportResponse) -> DeliveryOutcome {
1760    let mut response_class = None;
1761    let mut response_message = None;
1762    let mut reason_code = None;
1763    let mut detail = None;
1764
1765    if let Some(body) = &response.body {
1766        if let Some(raw_response_message) = body.get("response_message").and_then(Value::as_object)
1767        {
1768            response_message = Some(raw_response_message.clone());
1769            response_class = raw_response_message
1770                .get("envelope")
1771                .and_then(Value::as_object)
1772                .and_then(|envelope| envelope.get("message_class"))
1773                .and_then(Value::as_str)
1774                .and_then(parse_message_class);
1775        }
1776        reason_code = body
1777            .get("reason_code")
1778            .and_then(Value::as_str)
1779            .map(str::to_string);
1780        detail = body
1781            .get("detail")
1782            .and_then(Value::as_str)
1783            .map(str::to_string);
1784    }
1785    if detail.is_none() && response.status_code >= 400 {
1786        detail = Some(format!("Recipient HTTP {}", response.status_code));
1787    }
1788
1789    DeliveryOutcome {
1790        recipient: recipient.to_string(),
1791        state: delivery_state_from_response(
1792            response.status_code,
1793            response_class,
1794            reason_code.as_deref(),
1795        ),
1796        status_code: Some(response.status_code),
1797        response_class,
1798        reason_code,
1799        detail,
1800        response_message,
1801    }
1802}
1803
1804fn delivery_state_from_response(
1805    status_code: u16,
1806    response_class: Option<MessageClass>,
1807    reason_code: Option<&str>,
1808) -> DeliveryState {
1809    if (200..300).contains(&status_code) {
1810        if response_class == Some(MessageClass::Fail) {
1811            if reason_code == Some(FailReason::ExpiredMessage.as_str()) {
1812                return DeliveryState::Expired;
1813            }
1814            if reason_code == Some(FailReason::PolicyRejected.as_str()) {
1815                return DeliveryState::Declined;
1816            }
1817            return DeliveryState::Failed;
1818        }
1819        if matches!(
1820            response_class,
1821            Some(MessageClass::Ack | MessageClass::Capabilities)
1822        ) {
1823            return DeliveryState::Acknowledged;
1824        }
1825        return DeliveryState::Delivered;
1826    }
1827    if status_code == 410 {
1828        return DeliveryState::Expired;
1829    }
1830    if [401, 403, 409, 422].contains(&status_code) {
1831        return DeliveryState::Declined;
1832    }
1833    DeliveryState::Failed
1834}
1835
1836fn parse_message_class(value: &str) -> Option<MessageClass> {
1837    match value {
1838        "SEND" => Some(MessageClass::Send),
1839        "ACK" => Some(MessageClass::Ack),
1840        "FAIL" => Some(MessageClass::Fail),
1841        "CAPABILITIES" => Some(MessageClass::Capabilities),
1842        "COMPENSATE" => Some(MessageClass::Compensate),
1843        _ => None,
1844    }
1845}
1846
1847fn failed_outcome(recipient: &str, reason_code: &str, detail: &str) -> DeliveryOutcome {
1848    DeliveryOutcome {
1849        recipient: recipient.to_string(),
1850        state: DeliveryState::Failed,
1851        status_code: None,
1852        response_class: None,
1853        reason_code: Some(reason_code.to_string()),
1854        detail: Some(detail.to_string()),
1855        response_message: None,
1856    }
1857}
1858
1859fn to_public_key_map(targets: &[ResolvedRecipient]) -> HashMap<String, String> {
1860    targets
1861        .iter()
1862        .map(|target| (target.recipient.clone(), target.public_key.clone()))
1863        .collect()
1864}
1865
1866fn reason_for_capability_mismatch(reason: Option<&str>) -> FailReason {
1867    let normalized = reason.unwrap_or_default().to_lowercase();
1868    if normalized.contains("protocol") {
1869        return FailReason::UnsupportedVersion;
1870    }
1871    if normalized.contains("crypto") {
1872        return FailReason::UnsupportedCryptoSuite;
1873    }
1874    if normalized.contains("profile") {
1875        return FailReason::UnsupportedProfile;
1876    }
1877    FailReason::PolicyRejected
1878}
1879
1880fn build_local_amqp_service(
1881    agent_id: &str,
1882    options: &AcpAgentOptions,
1883) -> AcpResult<Option<Map<String, Value>>> {
1884    let Some(broker_url) = options.amqp_broker_url.as_deref() else {
1885        return Ok(None);
1886    };
1887    Ok(Some(AmqpTransportClient::build_service_hint(
1888        agent_id,
1889        broker_url,
1890        Some(&options.amqp_exchange),
1891    )?))
1892}
1893
1894fn build_local_mqtt_service(
1895    agent_id: &str,
1896    options: &AcpAgentOptions,
1897) -> AcpResult<Option<Map<String, Value>>> {
1898    let Some(broker_url) = options.mqtt_broker_url.as_deref() else {
1899        return Ok(None);
1900    };
1901    Ok(Some(MqttTransportClient::build_service_hint(
1902        agent_id,
1903        broker_url,
1904        None,
1905        Some(options.mqtt_qos),
1906        Some(&options.mqtt_topic_prefix),
1907    )?))
1908}
1909
1910fn apply_http_security_profile(identity_document: &mut Map<String, Value>, mtls_enabled: bool) {
1911    if !mtls_enabled {
1912        return;
1913    }
1914    let mut service = identity_document
1915        .get("service")
1916        .and_then(Value::as_object)
1917        .cloned()
1918        .unwrap_or_default();
1919    let direct_endpoint = service
1920        .get("direct_endpoint")
1921        .and_then(Value::as_str)
1922        .map(str::to_string);
1923    let relay_hints = service
1924        .get("relay_hints")
1925        .and_then(Value::as_array)
1926        .map(|items| {
1927            items
1928                .iter()
1929                .filter_map(Value::as_str)
1930                .map(str::to_string)
1931                .collect::<Vec<_>>()
1932        })
1933        .unwrap_or_default();
1934    if let Some(endpoint) = direct_endpoint
1935        && !endpoint.trim().is_empty()
1936    {
1937        service.insert(
1938            "http".to_string(),
1939            serde_json::json!({
1940                "endpoint": endpoint,
1941                "security_profile": "mtls",
1942            }),
1943        );
1944    }
1945    if let Some(relay_endpoint) = relay_hints.first()
1946        && !relay_endpoint.trim().is_empty()
1947    {
1948        service.insert(
1949            "relay".to_string(),
1950            serde_json::json!({
1951                "endpoint": relay_endpoint,
1952                "security_profile": "mtls",
1953            }),
1954        );
1955    }
1956    identity_document.insert("service".to_string(), Value::Object(service));
1957}
1958
1959fn resolve_key_provider(options: &AcpAgentOptions) -> AcpResult<Box<dyn KeyProvider>> {
1960    if let Some(provider) = options.key_provider_instance.clone() {
1961        return Ok(Box::new(ArcKeyProvider(provider)));
1962    }
1963    let provider_name = normalize_key_provider_name(&options.key_provider);
1964    match provider_name.as_str() {
1965        "local" => Ok(Box::new(LocalKeyProvider::new(
1966            options.storage_dir.clone(),
1967            options.cert_file.clone(),
1968            options.key_file.clone(),
1969            options.ca_file.clone(),
1970        ))),
1971        "vault" => {
1972            let vault_url = options.vault_url.clone().ok_or_else(|| {
1973                AcpError::Validation("vault_url is required when key_provider=vault".to_string())
1974            })?;
1975            let vault_path = options.vault_path.clone().ok_or_else(|| {
1976                AcpError::Validation("vault_path is required when key_provider=vault".to_string())
1977            })?;
1978            Ok(Box::new(VaultKeyProvider::new(
1979                vault_url,
1980                vault_path,
1981                Some(options.vault_token_env.clone()),
1982                options.vault_token.clone(),
1983                options.http_timeout_seconds,
1984                options.ca_file.clone(),
1985                options.allow_insecure_tls,
1986                options.allow_insecure_http,
1987            )?))
1988        }
1989        _ => Err(AcpError::Validation(format!(
1990            "Unsupported key_provider: {}",
1991            options.key_provider
1992        ))),
1993    }
1994}
1995
1996fn normalize_key_provider_name(value: &str) -> String {
1997    let normalized = value.trim().to_lowercase();
1998    if normalized.is_empty() {
1999        "local".to_string()
2000    } else {
2001        normalized
2002    }
2003}
2004
2005fn matches_provider_local(key_provider_info: &KeyProviderInfo) -> bool {
2006    key_provider_info
2007        .get("provider")
2008        .and_then(Value::as_str)
2009        .map(|provider| provider == "local")
2010        .unwrap_or(false)
2011}
2012
2013fn identity_from_provider(agent_id: &str, keys: &IdentityKeyMaterial) -> AcpResult<AgentIdentity> {
2014    let mut missing = Vec::new();
2015    if keys
2016        .signing_public_key
2017        .as_deref()
2018        .map(str::trim)
2019        .filter(|v| !v.is_empty())
2020        .is_none()
2021    {
2022        missing.push("signing_public_key");
2023    }
2024    if keys
2025        .encryption_public_key
2026        .as_deref()
2027        .map(str::trim)
2028        .filter(|v| !v.is_empty())
2029        .is_none()
2030    {
2031        missing.push("encryption_public_key");
2032    }
2033    if keys
2034        .signing_kid
2035        .as_deref()
2036        .map(str::trim)
2037        .filter(|v| !v.is_empty())
2038        .is_none()
2039    {
2040        missing.push("signing_kid");
2041    }
2042    if keys
2043        .encryption_kid
2044        .as_deref()
2045        .map(str::trim)
2046        .filter(|v| !v.is_empty())
2047        .is_none()
2048    {
2049        missing.push("encryption_kid");
2050    }
2051    if !missing.is_empty() {
2052        return Err(AcpError::Validation(format!(
2053            "External key provider requires identity public metadata for first-time bootstrap: {}",
2054            missing.join(", ")
2055        )));
2056    }
2057    Ok(AgentIdentity {
2058        agent_id: agent_id.to_string(),
2059        signing_private_key: keys.signing_private_key.clone(),
2060        signing_public_key: keys.signing_public_key.clone().unwrap_or_default(),
2061        encryption_private_key: keys.encryption_private_key.clone(),
2062        encryption_public_key: keys.encryption_public_key.clone().unwrap_or_default(),
2063        signing_kid: keys.signing_kid.clone().unwrap_or_default(),
2064        encryption_kid: keys.encryption_kid.clone().unwrap_or_default(),
2065    })
2066}
2067
2068fn apply_provider_keys(
2069    identity: &AgentIdentity,
2070    keys: &IdentityKeyMaterial,
2071) -> AcpResult<AgentIdentity> {
2072    if let Some(signing_public_key) = keys.signing_public_key.as_deref()
2073        && signing_public_key != identity.signing_public_key
2074    {
2075        return Err(AcpError::Validation(
2076            "Key provider signing_public_key does not match local identity metadata".to_string(),
2077        ));
2078    }
2079    if let Some(encryption_public_key) = keys.encryption_public_key.as_deref()
2080        && encryption_public_key != identity.encryption_public_key
2081    {
2082        return Err(AcpError::Validation(
2083            "Key provider encryption_public_key does not match local identity metadata".to_string(),
2084        ));
2085    }
2086    if let Some(signing_kid) = keys.signing_kid.as_deref()
2087        && signing_kid != identity.signing_kid
2088    {
2089        return Err(AcpError::Validation(
2090            "Key provider signing_kid does not match local identity metadata".to_string(),
2091        ));
2092    }
2093    if let Some(encryption_kid) = keys.encryption_kid.as_deref()
2094        && encryption_kid != identity.encryption_kid
2095    {
2096        return Err(AcpError::Validation(
2097            "Key provider encryption_kid does not match local identity metadata".to_string(),
2098        ));
2099    }
2100    Ok(AgentIdentity {
2101        agent_id: identity.agent_id.clone(),
2102        signing_private_key: keys.signing_private_key.clone(),
2103        signing_public_key: identity.signing_public_key.clone(),
2104        encryption_private_key: keys.encryption_private_key.clone(),
2105        encryption_public_key: identity.encryption_public_key.clone(),
2106        signing_kid: identity.signing_kid.clone(),
2107        encryption_kid: identity.encryption_kid.clone(),
2108    })
2109}
2110
2111fn base_url_from_endpoint(endpoint: &str) -> Option<String> {
2112    let endpoint = endpoint.trim();
2113    if endpoint.is_empty() {
2114        return None;
2115    }
2116    let parsed = url::Url::parse(endpoint).ok()?;
2117    let scheme = parsed.scheme();
2118    let host = parsed.host_str()?;
2119    let authority = if let Some(port) = parsed.port() {
2120        format!("{host}:{port}")
2121    } else {
2122        host.to_string()
2123    };
2124    Some(format!("{scheme}://{authority}"))
2125}
2126
2127fn first_non_blank(values: &[Option<String>]) -> Option<String> {
2128    values
2129        .iter()
2130        .flatten()
2131        .map(|value| value.trim())
2132        .find(|value| !value.is_empty())
2133        .map(str::to_string)
2134}
2135
2136#[derive(Clone)]
2137struct ArcKeyProvider(Arc<dyn KeyProvider>);
2138
2139impl KeyProvider for ArcKeyProvider {
2140    fn load_identity_keys(&self, agent_id: &str) -> AcpResult<IdentityKeyMaterial> {
2141        self.0.load_identity_keys(agent_id)
2142    }
2143
2144    fn load_tls_material(&self, agent_id: &str) -> AcpResult<crate::key_provider::TlsMaterial> {
2145        self.0.load_tls_material(agent_id)
2146    }
2147
2148    fn load_ca_bundle(&self, agent_id: &str) -> AcpResult<Option<String>> {
2149        self.0.load_ca_bundle(agent_id)
2150    }
2151
2152    fn describe(&self) -> KeyProviderInfo {
2153        self.0.describe()
2154    }
2155}