1use std::collections::{HashMap, HashSet};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::Serialize;
11use serde_json::{Map, Value};
12use uuid::Uuid;
13
14use crate::amqp_transport::AmqpTransportClient;
15use crate::capabilities::AgentCapabilities;
16use crate::constants::{ACP_VERSION, DEFAULT_CRYPTO_SUITE};
17use crate::crypto;
18use crate::discovery::DiscoveryClient;
19use crate::errors::{AcpError, AcpResult, FailReason};
20use crate::http_security::{HttpSecurityPolicy, validate_http_client_policy, validate_http_url};
21use crate::identity::{
22 AgentIdentity, parse_agent_id, read_identity, verify_identity_document, write_identity,
23};
24use crate::key_provider::{
25 IdentityKeyMaterial, KeyProvider, KeyProviderInfo, LocalKeyProvider, VaultKeyProvider,
26};
27use crate::messages::{
28 AcpMessage, CompensateInstruction, DeliveryMode, DeliveryOutcome, DeliveryState, Envelope,
29 MessageClass, SendResult, build_ack_payload, build_fail_payload,
30};
31use crate::mqtt_transport::MqttTransportClient;
32use crate::options::AcpAgentOptions;
33use crate::transport::{TransportClient, TransportResponse};
34use crate::well_known::build_well_known_document;
35
36#[derive(Debug, Clone, Serialize)]
37pub struct InboundResult {
38 pub state: DeliveryState,
39 pub reason_code: Option<String>,
40 pub detail: Option<String>,
41 pub decrypted_payload: Option<Map<String, Value>>,
42 pub response_message: Option<Map<String, Value>>,
43}
44
45#[derive(Debug, Clone)]
46pub struct DecryptedMessage {
47 pub message: AcpMessage,
48 pub payload: Map<String, Value>,
49}
50
51pub type InboundHandlerFn =
52 dyn for<'a> Fn(&'a Map<String, Value>, &'a Envelope) -> Option<Map<String, Value>>;
53
54#[derive(Debug, Clone)]
55pub struct CapabilityRequestResult {
56 pub result: SendResult,
57 pub capabilities: Option<Map<String, Value>>,
58}
59
60#[derive(Debug, Clone)]
61pub struct AcpAgent {
62 pub identity: AgentIdentity,
63 pub identity_document: Map<String, Value>,
64 pub discovery: DiscoveryClient,
65 pub transport: TransportClient,
66 pub amqp_transport: Option<AmqpTransportClient>,
67 pub mqtt_transport: Option<MqttTransportClient>,
68 pub capabilities: AgentCapabilities,
69 pub storage_dir: PathBuf,
70 pub trust_profile: String,
71 pub relay_url: String,
72 pub default_delivery_mode: DeliveryMode,
73 pub key_provider_info: KeyProviderInfo,
74 delivery_states: HashMap<String, HashMap<String, String>>,
75 dedup: DedupStore,
76}
77
78impl AcpAgent {
79 pub fn load_or_create(agent_id: &str, options: Option<AcpAgentOptions>) -> AcpResult<Self> {
80 parse_agent_id(agent_id)?;
81 let options = options.unwrap_or_default();
82 std::fs::create_dir_all(&options.storage_dir)?;
83
84 let key_provider = resolve_key_provider(&options)?;
85 let key_provider_info = key_provider.describe();
86
87 let provider_tls_material = key_provider.load_tls_material(agent_id).ok();
88 let provider_ca_bundle = key_provider.load_ca_bundle(agent_id).ok().flatten();
89
90 let effective_ca_file = first_non_blank(&[
91 options.ca_file.clone(),
92 provider_tls_material
93 .as_ref()
94 .and_then(|m| m.ca_file.clone()),
95 provider_ca_bundle,
96 ]);
97 let effective_cert_file = first_non_blank(&[
98 options.cert_file.clone(),
99 provider_tls_material
100 .as_ref()
101 .and_then(|m| m.cert_file.clone()),
102 ]);
103 let effective_key_file = first_non_blank(&[
104 options.key_file.clone(),
105 provider_tls_material
106 .as_ref()
107 .and_then(|m| m.key_file.clone()),
108 ]);
109
110 let policy = HttpSecurityPolicy {
111 allow_insecure_http: options.allow_insecure_http,
112 allow_insecure_tls: options.allow_insecure_tls,
113 mtls_enabled: options.mtls_enabled,
114 ca_file: effective_ca_file.clone(),
115 cert_file: effective_cert_file.clone(),
116 key_file: effective_key_file.clone(),
117 };
118 validate_http_client_policy(&policy, "Agent HTTP security configuration")?;
119 if let Some(endpoint) = options.endpoint.as_deref() {
120 validate_http_url(
121 endpoint,
122 policy.allow_insecure_http,
123 policy.mtls_enabled,
124 "Agent direct endpoint configuration",
125 )?;
126 }
127 validate_http_url(
128 &options.relay_url,
129 policy.allow_insecure_http,
130 policy.mtls_enabled,
131 "Agent relay URL configuration",
132 )?;
133 for relay_hint in &options.relay_hints {
134 validate_http_url(
135 relay_hint,
136 policy.allow_insecure_http,
137 policy.mtls_enabled,
138 "Agent relay hint configuration",
139 )?;
140 }
141 for directory_hint in &options.enterprise_directory_hints {
142 validate_http_url(
143 directory_hint,
144 policy.allow_insecure_http,
145 policy.mtls_enabled,
146 "Agent enterprise directory hint configuration",
147 )?;
148 }
149
150 let local_amqp_service = build_local_amqp_service(agent_id, &options)?;
151 let local_mqtt_service = build_local_mqtt_service(agent_id, &options)?;
152
153 let provider_identity_keys = key_provider.load_identity_keys(agent_id).ok();
154 let external_key_provider = !matches_provider_local(&key_provider_info);
155
156 let (identity, identity_document, capabilities) =
157 match read_identity(&options.storage_dir, agent_id)? {
158 None => {
159 let identity = if let Some(keys) = &provider_identity_keys {
160 identity_from_provider(agent_id, keys)?
161 } else if external_key_provider {
162 return Err(AcpError::KeyProvider(
163 "Unable to load identity keys from key provider".to_string(),
164 ));
165 } else {
166 AgentIdentity::create(agent_id)?
167 };
168 let capabilities = options
169 .capabilities
170 .clone()
171 .unwrap_or_else(|| AgentCapabilities::new(agent_id.to_string()));
172 let mut identity_document = identity.build_identity_document(
173 options.endpoint.as_deref(),
174 &options.relay_hints,
175 &options.trust_profile,
176 Some(&capabilities.to_map()),
177 365,
178 local_amqp_service.as_ref(),
179 local_mqtt_service.as_ref(),
180 if options.mtls_enabled {
181 Some("mtls")
182 } else {
183 None
184 },
185 if options.mtls_enabled {
186 Some("mtls")
187 } else {
188 None
189 },
190 )?;
191 apply_http_security_profile(&mut identity_document, options.mtls_enabled);
192 write_identity(&options.storage_dir, &identity, &identity_document)?;
193 (identity, identity_document, capabilities)
194 }
195 Some(bundle) => {
196 let mut identity = bundle.identity;
197 let mut identity_document = bundle.identity_document;
198 if let Some(keys) = &provider_identity_keys {
199 identity = apply_provider_keys(&identity, keys)?;
200 } else if external_key_provider {
201 return Err(AcpError::KeyProvider(
202 "Unable to load identity keys from key provider".to_string(),
203 ));
204 }
205 let valid_document = verify_identity_document(&identity_document);
206 let capabilities = options.capabilities.clone().unwrap_or_else(|| {
207 AgentCapabilities::from_map(
208 identity_document
209 .get("capabilities")
210 .and_then(Value::as_object),
211 agent_id,
212 )
213 });
214 let should_rewrite = !valid_document
215 || options.endpoint.is_some()
216 || !options.relay_hints.is_empty()
217 || options.capabilities.is_some()
218 || local_amqp_service.is_some()
219 || local_mqtt_service.is_some();
220 if should_rewrite {
221 let existing_service = identity_document
222 .get("service")
223 .and_then(Value::as_object)
224 .cloned()
225 .unwrap_or_default();
226 let existing_endpoint = existing_service
227 .get("direct_endpoint")
228 .and_then(Value::as_str)
229 .map(str::to_string);
230 let existing_hints = existing_service
231 .get("relay_hints")
232 .and_then(Value::as_array)
233 .map(|items| {
234 items
235 .iter()
236 .filter_map(Value::as_str)
237 .map(str::to_string)
238 .collect::<Vec<_>>()
239 })
240 .unwrap_or_default();
241 let existing_amqp_service = existing_service
242 .get("amqp")
243 .and_then(Value::as_object)
244 .cloned();
245 let existing_mqtt_service = existing_service
246 .get("mqtt")
247 .and_then(Value::as_object)
248 .cloned();
249 identity_document = identity.build_identity_document(
250 options.endpoint.as_deref().or(existing_endpoint.as_deref()),
251 if options.relay_hints.is_empty() {
252 &existing_hints
253 } else {
254 &options.relay_hints
255 },
256 &options.trust_profile,
257 Some(&capabilities.to_map()),
258 365,
259 local_amqp_service
260 .as_ref()
261 .or(existing_amqp_service.as_ref()),
262 local_mqtt_service
263 .as_ref()
264 .or(existing_mqtt_service.as_ref()),
265 if options.mtls_enabled {
266 Some("mtls")
267 } else {
268 None
269 },
270 if options.mtls_enabled {
271 Some("mtls")
272 } else {
273 None
274 },
275 )?;
276 apply_http_security_profile(&mut identity_document, options.mtls_enabled);
277 write_identity(&options.storage_dir, &identity, &identity_document)?;
278 }
279 (identity, identity_document, capabilities)
280 }
281 };
282
283 let mut effective_relay_hints = if !options.relay_hints.is_empty() {
284 options.relay_hints.clone()
285 } else {
286 identity_document
287 .get("service")
288 .and_then(Value::as_object)
289 .and_then(|service| service.get("relay_hints"))
290 .and_then(Value::as_array)
291 .map(|items| {
292 items
293 .iter()
294 .filter_map(Value::as_str)
295 .map(str::to_string)
296 .collect::<Vec<_>>()
297 })
298 .unwrap_or_default()
299 };
300 if !effective_relay_hints.contains(&options.relay_url) {
301 effective_relay_hints.push(options.relay_url.clone());
302 }
303
304 let mut discovery = DiscoveryClient::new(
305 Some(options.storage_dir.join("discovery_cache.json")),
306 Some(options.discovery_scheme.clone()),
307 Some(effective_relay_hints),
308 Some(options.enterprise_directory_hints.clone()),
309 options.http_timeout_seconds,
310 policy.allow_insecure_http,
311 policy.allow_insecure_tls,
312 policy.ca_file.clone(),
313 policy.mtls_enabled,
314 policy.cert_file.clone(),
315 policy.key_file.clone(),
316 )?;
317 discovery.seed(identity_document.clone())?;
318
319 let amqp_transport = if let Some(transport) = options.amqp_transport.clone() {
320 Some(transport)
321 } else if let Some(broker_url) = options.amqp_broker_url.as_deref() {
322 Some(AmqpTransportClient::new(
323 broker_url.to_string(),
324 Some(options.amqp_exchange.clone()),
325 Some(options.amqp_exchange_type.clone()),
326 options.http_timeout_seconds,
327 )?)
328 } else {
329 None
330 };
331
332 let mqtt_transport = if let Some(transport) = options.mqtt_transport.clone() {
333 Some(transport)
334 } else if let Some(broker_url) = options.mqtt_broker_url.as_deref() {
335 Some(MqttTransportClient::new(
336 broker_url.to_string(),
337 Some(options.mqtt_qos),
338 Some(options.mqtt_topic_prefix.clone()),
339 options.http_timeout_seconds,
340 30,
341 )?)
342 } else {
343 None
344 };
345
346 Ok(Self {
347 identity,
348 identity_document,
349 discovery,
350 transport: TransportClient::new(options.http_timeout_seconds, &policy)?,
351 amqp_transport,
352 mqtt_transport,
353 capabilities,
354 storage_dir: options.storage_dir,
355 trust_profile: options.trust_profile,
356 relay_url: options.relay_url,
357 default_delivery_mode: options.default_delivery_mode,
358 key_provider_info,
359 delivery_states: HashMap::new(),
360 dedup: DedupStore::new(Duration::from_secs(3600)),
361 })
362 }
363
364 pub fn agent_id(&self) -> &str {
365 &self.identity.agent_id
366 }
367
368 pub fn get_delivery_states(&self) -> &HashMap<String, HashMap<String, String>> {
369 &self.delivery_states
370 }
371
372 pub fn build_well_known_document(
373 &self,
374 base_url: Option<&str>,
375 identity_document_url: Option<&str>,
376 ) -> AcpResult<Map<String, Value>> {
377 let resolved_base_url = base_url
378 .map(str::to_string)
379 .or_else(|| {
380 self.identity_document
381 .get("service")
382 .and_then(Value::as_object)
383 .and_then(|service| service.get("direct_endpoint"))
384 .and_then(Value::as_str)
385 .and_then(base_url_from_endpoint)
386 })
387 .ok_or_else(|| {
388 AcpError::Validation(
389 "Unable to build /.well-known/acp metadata without base_url or direct_endpoint"
390 .to_string(),
391 )
392 })?;
393 build_well_known_document(
394 &self.identity_document,
395 &resolved_base_url,
396 identity_document_url,
397 Some(ACP_VERSION),
398 )
399 }
400
401 pub fn register_identity_document(
402 &mut self,
403 identity_document: Map<String, Value>,
404 ) -> AcpResult<()> {
405 self.discovery.register_identity_document(identity_document)
406 }
407
408 pub fn resolve_well_known(
409 &mut self,
410 base_url: &str,
411 expected_agent_id: Option<&str>,
412 ) -> AcpResult<Map<String, Value>> {
413 self.discovery
414 .resolve_well_known(base_url, expected_agent_id)
415 }
416
417 pub fn send(
418 &mut self,
419 recipients: Vec<String>,
420 payload: Map<String, Value>,
421 context: Option<String>,
422 message_class: MessageClass,
423 expires_in_seconds: i64,
424 correlation_id: Option<String>,
425 in_reply_to: Option<String>,
426 delivery_mode: Option<DeliveryMode>,
427 ) -> AcpResult<SendResult> {
428 if recipients.is_empty() {
429 return Err(AcpError::InvalidArgument(
430 "send() requires at least one recipient".to_string(),
431 ));
432 }
433 let mode = delivery_mode.unwrap_or(self.default_delivery_mode);
434 let operation_id = Uuid::new_v4().to_string();
435 let context_id = context.unwrap_or_else(|| operation_id.clone());
436
437 let resolved = self.resolve_recipients(&recipients, mode)?;
438 if resolved.deliverable.is_empty() {
439 let result = SendResult {
440 operation_id: operation_id.clone(),
441 message_id: Uuid::new_v4().to_string(),
442 message_ids: Vec::new(),
443 outcomes: resolved.preflight_outcomes.clone(),
444 };
445 self.sync_delivery_states(&operation_id, &result.outcomes);
446 return Ok(result);
447 }
448
449 let mut outcomes = resolved.preflight_outcomes;
450 let mut message_ids = Vec::new();
451 let direct_targets = resolved
452 .deliverable
453 .iter()
454 .filter(|target| target.channel == "direct")
455 .cloned()
456 .collect::<Vec<_>>();
457 let relay_targets = resolved
458 .deliverable
459 .iter()
460 .filter(|target| target.channel == "relay")
461 .cloned()
462 .collect::<Vec<_>>();
463 let amqp_targets = resolved
464 .deliverable
465 .iter()
466 .filter(|target| target.channel == "amqp")
467 .cloned()
468 .collect::<Vec<_>>();
469 let mqtt_targets = resolved
470 .deliverable
471 .iter()
472 .filter(|target| target.channel == "mqtt")
473 .cloned()
474 .collect::<Vec<_>>();
475
476 if !direct_targets.is_empty() {
477 let message = self.build_message(
478 direct_targets.iter().map(|r| r.recipient.clone()).collect(),
479 &payload,
480 &to_public_key_map(&direct_targets),
481 message_class,
482 &context_id,
483 Some(operation_id.clone()),
484 expires_in_seconds,
485 correlation_id.clone(),
486 in_reply_to.clone(),
487 )?;
488 message_ids.push(message.envelope.message_id.clone());
489 outcomes.extend(self.deliver_direct(&message, &direct_targets));
490 }
491 if !relay_targets.is_empty() {
492 let message = self.build_message(
493 relay_targets.iter().map(|r| r.recipient.clone()).collect(),
494 &payload,
495 &to_public_key_map(&relay_targets),
496 message_class,
497 &context_id,
498 Some(operation_id.clone()),
499 expires_in_seconds,
500 correlation_id.clone(),
501 in_reply_to.clone(),
502 )?;
503 message_ids.push(message.envelope.message_id.clone());
504 outcomes.extend(self.deliver_via_relay(&message, &relay_targets));
505 }
506 for target in &amqp_targets {
507 let recipient_public_keys =
508 HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
509 let message = self.build_message(
510 vec![target.recipient.clone()],
511 &payload,
512 &recipient_public_keys,
513 message_class,
514 &context_id,
515 Some(operation_id.clone()),
516 expires_in_seconds,
517 correlation_id.clone(),
518 in_reply_to.clone(),
519 )?;
520 message_ids.push(message.envelope.message_id.clone());
521 outcomes.push(self.deliver_via_amqp(&message, target));
522 }
523 for target in &mqtt_targets {
524 let recipient_public_keys =
525 HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
526 let message = self.build_message(
527 vec![target.recipient.clone()],
528 &payload,
529 &recipient_public_keys,
530 message_class,
531 &context_id,
532 Some(operation_id.clone()),
533 expires_in_seconds,
534 correlation_id.clone(),
535 in_reply_to.clone(),
536 )?;
537 message_ids.push(message.envelope.message_id.clone());
538 outcomes.push(self.deliver_via_mqtt(&message, target));
539 }
540 if message_ids.is_empty() {
541 message_ids.push(Uuid::new_v4().to_string());
542 }
543 let result = SendResult {
544 operation_id: operation_id.clone(),
545 message_id: message_ids[0].clone(),
546 message_ids,
547 outcomes,
548 };
549 self.sync_delivery_states(&operation_id, &result.outcomes);
550 Ok(result)
551 }
552
553 pub fn send_basic(
554 &mut self,
555 recipients: Vec<String>,
556 payload: Map<String, Value>,
557 context: Option<String>,
558 ) -> AcpResult<SendResult> {
559 self.send(
560 recipients,
561 payload,
562 context,
563 MessageClass::Send,
564 300,
565 None,
566 None,
567 Some(self.default_delivery_mode),
568 )
569 }
570
571 pub fn send_compensate(
572 &mut self,
573 recipients: Vec<String>,
574 original_operation_id: &str,
575 reason: &str,
576 actions: Vec<Map<String, Value>>,
577 context: Option<String>,
578 delivery_mode: Option<DeliveryMode>,
579 ) -> AcpResult<SendResult> {
580 let instruction = CompensateInstruction {
581 operation_id: original_operation_id.to_string(),
582 reason: reason.to_string(),
583 actions,
584 };
585 let mut payload = Map::new();
586 payload.insert(
587 "compensation".to_string(),
588 serde_json::to_value(instruction).unwrap_or(Value::Null),
589 );
590 self.send(
591 recipients,
592 payload,
593 context.or_else(|| Some(format!("compensate:{original_operation_id}"))),
594 MessageClass::Compensate,
595 300,
596 Some(original_operation_id.to_string()),
597 None,
598 delivery_mode,
599 )
600 }
601
602 pub fn decrypt_message_for_self(
603 &mut self,
604 raw_message: &Map<String, Value>,
605 ) -> AcpResult<DecryptedMessage> {
606 let message = AcpMessage::from_map(raw_message)?;
607 self.validate_envelope_for_inbound(&message.envelope)?;
608 if !message
609 .envelope
610 .recipients
611 .iter()
612 .any(|recipient| recipient == self.agent_id())
613 {
614 return Err(AcpError::Processing {
615 reason: FailReason::PolicyRejected,
616 detail: "Message is not addressed to this agent".to_string(),
617 });
618 }
619 let sender_doc =
620 self.resolve_sender_identity_document(raw_message, &message.envelope.sender)?;
621 let sender_signing_key = sender_doc
622 .get("keys")
623 .and_then(Value::as_object)
624 .and_then(|keys| keys.get("signing"))
625 .and_then(Value::as_object)
626 .and_then(|signing| signing.get("public_key"))
627 .and_then(Value::as_str)
628 .ok_or_else(|| AcpError::Processing {
629 reason: FailReason::InvalidSignature,
630 detail: "Sender signing public key missing".to_string(),
631 })?;
632 if !crypto::verify_protected_payload_signature(
633 &message.envelope,
634 &message.protected_payload,
635 sender_signing_key,
636 ) {
637 return Err(AcpError::Processing {
638 reason: FailReason::InvalidSignature,
639 detail: "Message signature verification failed".to_string(),
640 });
641 }
642 let payload = crypto::decrypt_for_recipient(
643 &message.envelope,
644 &message.protected_payload,
645 self.agent_id(),
646 &self.identity.encryption_private_key,
647 )?;
648 Ok(DecryptedMessage { message, payload })
649 }
650
651 pub fn receive(
652 &mut self,
653 raw_message: &Map<String, Value>,
654 handler: Option<&InboundHandlerFn>,
655 ) -> InboundResult {
656 let mut result = InboundResult {
657 state: DeliveryState::Failed,
658 reason_code: None,
659 detail: None,
660 decrypted_payload: None,
661 response_message: None,
662 };
663
664 let request_message = match AcpMessage::from_map(raw_message) {
665 Ok(message) => message,
666 Err(exc) => {
667 result.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
668 result.detail = Some(format!("Invalid ACP message structure: {exc}"));
669 return result;
670 }
671 };
672
673 let mut sender_identity_document: Option<Map<String, Value>> = None;
674
675 let processing_result = (|| -> AcpResult<()> {
676 self.validate_envelope_for_inbound(&request_message.envelope)?;
677 if !request_message
678 .envelope
679 .recipients
680 .iter()
681 .any(|recipient| recipient == self.agent_id())
682 {
683 return Err(AcpError::Processing {
684 reason: FailReason::PolicyRejected,
685 detail: format!("Recipient {} not in message recipients", self.agent_id()),
686 });
687 }
688
689 let sender_doc = self
690 .resolve_sender_identity_document(raw_message, &request_message.envelope.sender)?;
691 sender_identity_document = Some(sender_doc.clone());
692 let sender_signing_key = sender_doc
693 .get("keys")
694 .and_then(Value::as_object)
695 .and_then(|keys| keys.get("signing"))
696 .and_then(Value::as_object)
697 .and_then(|signing| signing.get("public_key"))
698 .and_then(Value::as_str)
699 .ok_or_else(|| AcpError::Processing {
700 reason: FailReason::InvalidSignature,
701 detail: "Sender signing key missing from identity document".to_string(),
702 })?;
703 if !crypto::verify_protected_payload_signature(
704 &request_message.envelope,
705 &request_message.protected_payload,
706 sender_signing_key,
707 ) {
708 return Err(AcpError::Processing {
709 reason: FailReason::InvalidSignature,
710 detail: "Signature verification failed".to_string(),
711 });
712 }
713
714 if self
715 .dedup
716 .is_duplicate(&request_message.envelope.message_id)
717 {
718 result.state = DeliveryState::Acknowledged;
719 result.detail = Some("Duplicate message acknowledged".to_string());
720 if !matches!(
721 request_message.envelope.message_class,
722 MessageClass::Ack | MessageClass::Fail
723 ) {
724 let duplicate_ack = self.create_response_message(
725 &sender_doc,
726 &request_message.envelope,
727 MessageClass::Ack,
728 build_ack_payload(&request_message.envelope.message_id, "duplicate"),
729 )?;
730 result.response_message = Some(duplicate_ack.to_map()?);
731 }
732 return Ok(());
733 }
734
735 let decrypted_payload = crypto::decrypt_for_recipient(
736 &request_message.envelope,
737 &request_message.protected_payload,
738 self.agent_id(),
739 &self.identity.encryption_private_key,
740 )?;
741 result.decrypted_payload = Some(decrypted_payload.clone());
742
743 let response_message =
744 if request_message.envelope.message_class == MessageClass::Capabilities {
745 Some(self.create_response_message(
746 &sender_doc,
747 &request_message.envelope,
748 MessageClass::Capabilities,
749 self.capabilities.to_map(),
750 )?)
751 } else {
752 let mut ack_payload =
753 build_ack_payload(&request_message.envelope.message_id, "accepted");
754 if let Some(handler) = handler
755 && let Some(handler_payload) =
756 handler(&decrypted_payload, &request_message.envelope)
757 && !handler_payload.is_empty()
758 {
759 ack_payload.insert("handler".to_string(), Value::Object(handler_payload));
760 }
761 if matches!(
762 request_message.envelope.message_class,
763 MessageClass::Ack | MessageClass::Fail
764 ) {
765 None
766 } else {
767 Some(self.create_response_message(
768 &sender_doc,
769 &request_message.envelope,
770 MessageClass::Ack,
771 ack_payload,
772 )?)
773 }
774 };
775
776 self.dedup
777 .mark_processed(&request_message.envelope.message_id);
778 result.state = DeliveryState::Acknowledged;
779 result.response_message = response_message
780 .map(|message| message.to_map())
781 .transpose()?;
782 Ok(())
783 })();
784
785 if let Err(exc) = processing_result {
786 let (reason_code, detail) = match exc {
787 AcpError::Processing { reason, detail } => (reason.as_str().to_string(), detail),
788 _ => (
789 FailReason::PolicyRejected.as_str().to_string(),
790 exc.to_string(),
791 ),
792 };
793 result.reason_code = Some(reason_code.clone());
794 result.detail = Some(detail.clone());
795 if let Some(sender_doc) = sender_identity_document {
796 let fail_response = self.create_response_message(
797 &sender_doc,
798 &request_message.envelope,
799 MessageClass::Fail,
800 build_fail_payload(reason_code, detail, false),
801 );
802 result.response_message =
803 fail_response.ok().and_then(|message| message.to_map().ok());
804 }
805 }
806 result
807 }
808
809 pub fn request_capabilities(&mut self, recipient: &str) -> AcpResult<CapabilityRequestResult> {
810 let mut payload = Map::new();
811 payload.insert(
812 "request".to_string(),
813 Value::String("capabilities".to_string()),
814 );
815 let result = self.send(
816 vec![recipient.to_string()],
817 payload,
818 Some(format!("capabilities:{}", Uuid::new_v4())),
819 MessageClass::Capabilities,
820 300,
821 None,
822 None,
823 Some(self.default_delivery_mode),
824 )?;
825 let mut response_payload = None;
826 for outcome in &result.outcomes {
827 let Some(response_message) = &outcome.response_message else {
828 continue;
829 };
830 if let Ok(decrypted) = self.decrypt_message_for_self(response_message)
831 && decrypted.message.envelope.message_class == MessageClass::Capabilities
832 {
833 response_payload = Some(decrypted.payload);
834 break;
835 }
836 }
837 Ok(CapabilityRequestResult {
838 result,
839 capabilities: response_payload,
840 })
841 }
842
843 pub fn consume_from_amqp<F>(
844 &mut self,
845 max_messages: usize,
846 handler: Option<F>,
847 ) -> AcpResult<usize>
848 where
849 F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
850 {
851 let amqp_transport = self.amqp_transport.clone().ok_or_else(|| {
852 AcpError::Transport("consume_from_amqp requires an AMQP-configured agent".to_string())
853 })?;
854 let amqp_service = self
855 .identity_document
856 .get("service")
857 .and_then(Value::as_object)
858 .and_then(|service| service.get("amqp"))
859 .and_then(Value::as_object)
860 .cloned()
861 .ok_or_else(|| {
862 AcpError::Transport(
863 "Identity document is missing service.amqp configuration".to_string(),
864 )
865 })?;
866 let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
867 let agent_for_handler = Arc::clone(&agent_ptr);
868 let processed = amqp_transport.consume(
869 self.agent_id(),
870 move |raw_message| {
871 let Ok(mut guard) = agent_for_handler.lock() else {
872 return false;
873 };
874 let inbound = guard.receive(
875 raw_message,
876 handler.as_ref().map(|inner| inner as &InboundHandlerFn),
877 );
878 if let Some(response_message) = inbound.response_message.clone() {
879 if guard
880 .publish_amqp_response_message(raw_message, &response_message)
881 .is_err()
882 {
883 return false;
884 }
885 }
886 matches!(
887 inbound.state,
888 DeliveryState::Acknowledged
889 | DeliveryState::Failed
890 | DeliveryState::Declined
891 | DeliveryState::Expired
892 )
893 },
894 Some(&amqp_service),
895 max_messages,
896 )?;
897 if let Ok(guard) = agent_ptr.lock() {
898 *self = guard.clone();
899 }
900 Ok(processed)
901 }
902
903 pub fn consume_from_mqtt<F>(
904 &mut self,
905 max_messages: usize,
906 handler: Option<F>,
907 ) -> AcpResult<usize>
908 where
909 F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
910 {
911 let mqtt_transport = self.mqtt_transport.clone().ok_or_else(|| {
912 AcpError::Transport("consume_from_mqtt requires an MQTT-configured agent".to_string())
913 })?;
914 let mqtt_service = self
915 .identity_document
916 .get("service")
917 .and_then(Value::as_object)
918 .and_then(|service| service.get("mqtt"))
919 .and_then(Value::as_object)
920 .cloned()
921 .ok_or_else(|| {
922 AcpError::Transport(
923 "Identity document is missing service.mqtt configuration".to_string(),
924 )
925 })?;
926 let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
927 let agent_for_handler = Arc::clone(&agent_ptr);
928 let processed = mqtt_transport.consume(
929 self.agent_id(),
930 move |raw_message| {
931 let Ok(mut guard) = agent_for_handler.lock() else {
932 return false;
933 };
934 let inbound = guard.receive(
935 raw_message,
936 handler.as_ref().map(|inner| inner as &InboundHandlerFn),
937 );
938 if let Some(response_message) = inbound.response_message.clone() {
939 if guard
940 .publish_mqtt_response_message(raw_message, &response_message)
941 .is_err()
942 {
943 return false;
944 }
945 }
946 matches!(
947 inbound.state,
948 DeliveryState::Acknowledged
949 | DeliveryState::Failed
950 | DeliveryState::Declined
951 | DeliveryState::Expired
952 )
953 },
954 Some(&mqtt_service),
955 max_messages,
956 Duration::from_secs(1),
957 )?;
958 if let Ok(guard) = agent_ptr.lock() {
959 *self = guard.clone();
960 }
961 Ok(processed)
962 }
963
964 fn resolve_recipients(
965 &mut self,
966 recipients: &[String],
967 mode: DeliveryMode,
968 ) -> AcpResult<ResolvedRecipients> {
969 let mut deliverable = Vec::new();
970 let mut preflight_outcomes = Vec::new();
971 for recipient in recipients {
972 let identity_doc = match self.discovery.resolve(recipient) {
973 Ok(identity_doc) => identity_doc,
974 Err(exc) => {
975 preflight_outcomes.push(failed_outcome(
976 recipient,
977 FailReason::PolicyRejected.as_str(),
978 &exc.to_string(),
979 ));
980 continue;
981 }
982 };
983 let remote_capabilities = AgentCapabilities::from_map(
984 identity_doc.get("capabilities").and_then(Value::as_object),
985 recipient,
986 );
987 let capability_match = self.capabilities.choose_compatible(&remote_capabilities);
988 if !capability_match.compatible {
989 preflight_outcomes.push(failed_outcome(
990 recipient,
991 reason_for_capability_mismatch(capability_match.reason.as_deref()).as_str(),
992 capability_match
993 .reason
994 .as_deref()
995 .unwrap_or("No compatible capabilities"),
996 ));
997 continue;
998 }
999 let choice = self.choose_delivery_channel(&remote_capabilities, &identity_doc, mode)?;
1000 let Some(channel) = choice.channel else {
1001 preflight_outcomes.push(failed_outcome(
1002 recipient,
1003 FailReason::PolicyRejected.as_str(),
1004 choice
1005 .detail
1006 .as_deref()
1007 .unwrap_or("Delivery channel unavailable"),
1008 ));
1009 continue;
1010 };
1011 let recipient_public_key = identity_doc
1012 .get("keys")
1013 .and_then(Value::as_object)
1014 .and_then(|keys| keys.get("encryption"))
1015 .and_then(Value::as_object)
1016 .and_then(|enc| enc.get("public_key"))
1017 .and_then(Value::as_str)
1018 .map(str::to_string);
1019 let Some(recipient_public_key) = recipient_public_key else {
1020 preflight_outcomes.push(failed_outcome(
1021 recipient,
1022 FailReason::PolicyRejected.as_str(),
1023 "Recipient identity document missing encryption public key",
1024 ));
1025 continue;
1026 };
1027 deliverable.push(ResolvedRecipient {
1028 recipient: recipient.to_string(),
1029 public_key: recipient_public_key,
1030 channel,
1031 endpoint: choice.endpoint,
1032 amqp_service: choice.amqp_service,
1033 mqtt_service: choice.mqtt_service,
1034 });
1035 }
1036 Ok(ResolvedRecipients {
1037 deliverable,
1038 preflight_outcomes,
1039 })
1040 }
1041
1042 fn choose_delivery_channel(
1043 &self,
1044 remote_capabilities: &AgentCapabilities,
1045 identity_document: &Map<String, Value>,
1046 mode: DeliveryMode,
1047 ) -> AcpResult<ChannelChoice> {
1048 let remote_transports: HashSet<String> = remote_capabilities
1049 .transports
1050 .iter()
1051 .map(|t| t.to_lowercase())
1052 .collect();
1053 let shared = self
1054 .capabilities
1055 .transports
1056 .iter()
1057 .filter(|transport| remote_transports.contains(&transport.to_lowercase()))
1058 .map(|transport| transport.to_lowercase())
1059 .collect::<Vec<_>>();
1060
1061 let service = identity_document
1062 .get("service")
1063 .and_then(Value::as_object)
1064 .cloned()
1065 .unwrap_or_default();
1066 let direct_endpoint = service
1067 .get("direct_endpoint")
1068 .and_then(Value::as_str)
1069 .map(str::to_string);
1070 let has_direct = direct_endpoint
1071 .as_deref()
1072 .map(|endpoint| !endpoint.trim().is_empty())
1073 .unwrap_or(false);
1074 let direct_available = has_direct
1075 && shared
1076 .iter()
1077 .any(|transport| matches!(transport.as_str(), "https" | "http" | "direct"));
1078 let relay_available =
1079 !self.relay_url.trim().is_empty() && shared.iter().any(|t| t == "relay");
1080 let amqp_service = service.get("amqp").and_then(Value::as_object).cloned();
1081 let amqp_available = shared.iter().any(|t| t == "amqp")
1082 && amqp_service
1083 .as_ref()
1084 .and_then(|service| service.get("broker_url"))
1085 .and_then(Value::as_str)
1086 .map(str::trim)
1087 .filter(|v| !v.is_empty())
1088 .is_some();
1089 let mqtt_service = service.get("mqtt").and_then(Value::as_object).cloned();
1090 let mqtt_available = shared.iter().any(|t| t == "mqtt")
1091 && mqtt_service
1092 .as_ref()
1093 .and_then(|service| service.get("broker_url"))
1094 .and_then(Value::as_str)
1095 .map(str::trim)
1096 .filter(|v| !v.is_empty())
1097 .is_some()
1098 && mqtt_service
1099 .as_ref()
1100 .and_then(|service| service.get("topic"))
1101 .and_then(Value::as_str)
1102 .map(str::trim)
1103 .filter(|v| !v.is_empty())
1104 .is_some();
1105
1106 match mode {
1107 DeliveryMode::Direct => {
1108 if direct_available {
1109 return Ok(ChannelChoice::new(
1110 Some("direct".to_string()),
1111 direct_endpoint,
1112 None,
1113 None,
1114 None,
1115 ));
1116 }
1117 return Ok(ChannelChoice::new(
1118 None,
1119 None,
1120 None,
1121 None,
1122 Some("No compatible direct transport and endpoint available".to_string()),
1123 ));
1124 }
1125 DeliveryMode::Relay => {
1126 if relay_available {
1127 return Ok(ChannelChoice::new(
1128 Some("relay".to_string()),
1129 None,
1130 None,
1131 None,
1132 None,
1133 ));
1134 }
1135 return Ok(ChannelChoice::new(
1136 None,
1137 None,
1138 None,
1139 None,
1140 Some("No compatible relay transport available".to_string()),
1141 ));
1142 }
1143 DeliveryMode::Amqp => {
1144 if amqp_available {
1145 return Ok(ChannelChoice::new(
1146 Some("amqp".to_string()),
1147 None,
1148 amqp_service,
1149 None,
1150 None,
1151 ));
1152 }
1153 return Ok(ChannelChoice::new(
1154 None,
1155 None,
1156 None,
1157 None,
1158 Some("No compatible AMQP transport available".to_string()),
1159 ));
1160 }
1161 DeliveryMode::Mqtt => {
1162 if mqtt_available {
1163 return Ok(ChannelChoice::new(
1164 Some("mqtt".to_string()),
1165 None,
1166 None,
1167 mqtt_service,
1168 None,
1169 ));
1170 }
1171 return Ok(ChannelChoice::new(
1172 None,
1173 None,
1174 None,
1175 None,
1176 Some("No compatible MQTT transport available".to_string()),
1177 ));
1178 }
1179 DeliveryMode::Auto => {}
1180 }
1181
1182 if direct_available {
1183 return Ok(ChannelChoice::new(
1184 Some("direct".to_string()),
1185 direct_endpoint,
1186 None,
1187 None,
1188 None,
1189 ));
1190 }
1191 if relay_available {
1192 return Ok(ChannelChoice::new(
1193 Some("relay".to_string()),
1194 None,
1195 None,
1196 None,
1197 None,
1198 ));
1199 }
1200 if amqp_available {
1201 return Ok(ChannelChoice::new(
1202 Some("amqp".to_string()),
1203 None,
1204 amqp_service,
1205 None,
1206 None,
1207 ));
1208 }
1209 if mqtt_available {
1210 return Ok(ChannelChoice::new(
1211 Some("mqtt".to_string()),
1212 None,
1213 None,
1214 mqtt_service,
1215 None,
1216 ));
1217 }
1218 if has_direct {
1219 return Ok(ChannelChoice::new(
1220 None,
1221 None,
1222 None,
1223 None,
1224 Some(
1225 "No compatible transport implementation available for this recipient"
1226 .to_string(),
1227 ),
1228 ));
1229 }
1230 if amqp_service.is_some() {
1231 return Ok(ChannelChoice::new(
1232 None,
1233 None,
1234 None,
1235 None,
1236 Some(
1237 "AMQP transport is advertised but not compatible with sender capabilities"
1238 .to_string(),
1239 ),
1240 ));
1241 }
1242 if mqtt_service.is_some() {
1243 return Ok(ChannelChoice::new(
1244 None,
1245 None,
1246 None,
1247 None,
1248 Some(
1249 "MQTT transport is advertised but not compatible with sender capabilities"
1250 .to_string(),
1251 ),
1252 ));
1253 }
1254 Ok(ChannelChoice::new(
1255 None,
1256 None,
1257 None,
1258 None,
1259 Some(
1260 "Recipient identity document is missing direct_endpoint/amqp/mqtt and no relay fallback is compatible"
1261 .to_string(),
1262 ),
1263 ))
1264 }
1265
1266 #[allow(clippy::too_many_arguments)]
1267 fn build_message(
1268 &self,
1269 recipients: Vec<String>,
1270 payload: &Map<String, Value>,
1271 recipient_public_keys: &HashMap<String, String>,
1272 message_class: MessageClass,
1273 context_id: &str,
1274 operation_id: Option<String>,
1275 expires_in_seconds: i64,
1276 correlation_id: Option<String>,
1277 in_reply_to: Option<String>,
1278 ) -> AcpResult<AcpMessage> {
1279 let envelope = Envelope::build(
1280 self.agent_id().to_string(),
1281 recipients,
1282 message_class,
1283 context_id.to_string(),
1284 expires_in_seconds,
1285 operation_id,
1286 correlation_id,
1287 in_reply_to,
1288 Some(DEFAULT_CRYPTO_SUITE.to_string()),
1289 )?;
1290 let mut protected_payload =
1291 crypto::encrypt_for_recipients(payload, &envelope, recipient_public_keys)?;
1292 crypto::sign_protected_payload(
1293 &envelope,
1294 &mut protected_payload,
1295 &self.identity.signing_private_key,
1296 &self.identity.signing_kid,
1297 )?;
1298 Ok(AcpMessage {
1299 envelope,
1300 protected_payload,
1301 sender_identity_document: Some(self.identity_document.clone()),
1302 })
1303 }
1304
1305 fn deliver_direct(
1306 &self,
1307 message: &AcpMessage,
1308 targets: &[ResolvedRecipient],
1309 ) -> Vec<DeliveryOutcome> {
1310 let mut outcomes = Vec::new();
1311 let message_map = match message.to_map() {
1312 Ok(map) => map,
1313 Err(exc) => {
1314 for target in targets {
1315 outcomes.push(failed_outcome(
1316 &target.recipient,
1317 FailReason::PolicyRejected.as_str(),
1318 &format!("Message serialization failure: {exc}"),
1319 ));
1320 }
1321 return outcomes;
1322 }
1323 };
1324 for target in targets {
1325 let Some(endpoint) = target.endpoint.as_deref() else {
1326 outcomes.push(failed_outcome(
1327 &target.recipient,
1328 FailReason::PolicyRejected.as_str(),
1329 "Missing direct endpoint for direct delivery",
1330 ));
1331 continue;
1332 };
1333 match self.transport.post_json(endpoint, &message_map) {
1334 Ok(response) => {
1335 outcomes.push(outcome_from_http_response(&target.recipient, &response))
1336 }
1337 Err(exc) => outcomes.push(failed_outcome(
1338 &target.recipient,
1339 FailReason::PolicyRejected.as_str(),
1340 &format!("Direct transport failure: {exc}"),
1341 )),
1342 }
1343 }
1344 outcomes
1345 }
1346
1347 fn deliver_via_relay(
1348 &self,
1349 message: &AcpMessage,
1350 targets: &[ResolvedRecipient],
1351 ) -> Vec<DeliveryOutcome> {
1352 let mut outcomes = Vec::new();
1353 match self.transport.send_to_relay(&self.relay_url, message) {
1354 Ok(relay_response) => {
1355 let mut delivered = HashSet::new();
1356 if let Some(raw_outcomes) = relay_response.get("outcomes").and_then(Value::as_array)
1357 {
1358 for item in raw_outcomes {
1359 if let Ok(outcome) = serde_json::from_value::<DeliveryOutcome>(item.clone())
1360 {
1361 delivered.insert(outcome.recipient.clone());
1362 outcomes.push(outcome);
1363 }
1364 }
1365 }
1366 for target in targets {
1367 if !delivered.contains(&target.recipient) {
1368 outcomes.push(failed_outcome(
1369 &target.recipient,
1370 FailReason::PolicyRejected.as_str(),
1371 "Relay did not return an outcome for recipient",
1372 ));
1373 }
1374 }
1375 }
1376 Err(exc) => {
1377 for target in targets {
1378 outcomes.push(failed_outcome(
1379 &target.recipient,
1380 FailReason::PolicyRejected.as_str(),
1381 &format!("Relay transport failure: {exc}"),
1382 ));
1383 }
1384 }
1385 }
1386 outcomes
1387 }
1388
1389 fn deliver_via_amqp(
1390 &self,
1391 message: &AcpMessage,
1392 target: &ResolvedRecipient,
1393 ) -> DeliveryOutcome {
1394 let mut outcome = DeliveryOutcome {
1395 recipient: target.recipient.clone(),
1396 state: DeliveryState::Pending,
1397 status_code: None,
1398 response_class: None,
1399 reason_code: None,
1400 detail: None,
1401 response_message: None,
1402 };
1403 let message_map = match message.to_map() {
1404 Ok(message_map) => message_map,
1405 Err(exc) => {
1406 outcome.state = DeliveryState::Failed;
1407 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1408 outcome.detail = Some(format!("AMQP message serialization failure: {exc}"));
1409 return outcome;
1410 }
1411 };
1412 let result = (|| -> AcpResult<()> {
1413 let client = if let Some(amqp_transport) = &self.amqp_transport {
1414 amqp_transport.clone()
1415 } else {
1416 let broker_url = target
1417 .amqp_service
1418 .as_ref()
1419 .and_then(|service| service.get("broker_url"))
1420 .and_then(Value::as_str)
1421 .ok_or_else(|| {
1422 AcpError::Transport(
1423 "AMQP delivery selected but sender is not configured with an AMQP broker"
1424 .to_string(),
1425 )
1426 })?;
1427 AmqpTransportClient::new(
1428 broker_url.to_string(),
1429 target
1430 .amqp_service
1431 .as_ref()
1432 .and_then(|service| service.get("exchange"))
1433 .and_then(Value::as_str)
1434 .map(str::to_string),
1435 None,
1436 10,
1437 )?
1438 };
1439 client.publish(
1440 &message_map,
1441 &target.recipient,
1442 target.amqp_service.as_ref(),
1443 )
1444 })();
1445 match result {
1446 Ok(()) => outcome.state = DeliveryState::Delivered,
1447 Err(exc) => {
1448 outcome.state = DeliveryState::Failed;
1449 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1450 outcome.detail = Some(format!("AMQP transport failure: {exc}"));
1451 }
1452 }
1453 outcome
1454 }
1455
1456 fn deliver_via_mqtt(
1457 &self,
1458 message: &AcpMessage,
1459 target: &ResolvedRecipient,
1460 ) -> DeliveryOutcome {
1461 let mut outcome = DeliveryOutcome {
1462 recipient: target.recipient.clone(),
1463 state: DeliveryState::Pending,
1464 status_code: None,
1465 response_class: None,
1466 reason_code: None,
1467 detail: None,
1468 response_message: None,
1469 };
1470 let message_map = match message.to_map() {
1471 Ok(message_map) => message_map,
1472 Err(exc) => {
1473 outcome.state = DeliveryState::Failed;
1474 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1475 outcome.detail = Some(format!("MQTT message serialization failure: {exc}"));
1476 return outcome;
1477 }
1478 };
1479 let result = (|| -> AcpResult<()> {
1480 let client = if let Some(mqtt_transport) = &self.mqtt_transport {
1481 mqtt_transport.clone()
1482 } else {
1483 let service = target.mqtt_service.as_ref().ok_or_else(|| {
1484 AcpError::Transport(
1485 "MQTT delivery selected but sender is not configured with an MQTT broker"
1486 .to_string(),
1487 )
1488 })?;
1489 let broker_url = service
1490 .get("broker_url")
1491 .and_then(Value::as_str)
1492 .ok_or_else(|| {
1493 AcpError::Transport(
1494 "MQTT delivery selected but sender is not configured with an MQTT broker"
1495 .to_string(),
1496 )
1497 })?;
1498 MqttTransportClient::new(
1499 broker_url.to_string(),
1500 service.get("qos").and_then(|v| v.as_u64()).map(|v| v as u8),
1501 None,
1502 10,
1503 30,
1504 )?
1505 };
1506 client.publish(
1507 &message_map,
1508 &target.recipient,
1509 target.mqtt_service.as_ref(),
1510 )
1511 })();
1512 match result {
1513 Ok(()) => outcome.state = DeliveryState::Delivered,
1514 Err(exc) => {
1515 outcome.state = DeliveryState::Failed;
1516 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1517 outcome.detail = Some(format!("MQTT transport failure: {exc}"));
1518 }
1519 }
1520 outcome
1521 }
1522
1523 fn publish_amqp_response_message(
1524 &mut self,
1525 raw_message: &Map<String, Value>,
1526 response_message: &Map<String, Value>,
1527 ) -> AcpResult<()> {
1528 let amqp_transport = self
1529 .amqp_transport
1530 .clone()
1531 .ok_or_else(|| AcpError::Transport("AMQP transport is not configured".to_string()))?;
1532 let sender_id = raw_message
1533 .get("envelope")
1534 .and_then(Value::as_object)
1535 .and_then(|envelope| envelope.get("sender"))
1536 .and_then(Value::as_str)
1537 .ok_or_else(|| {
1538 AcpError::Transport(
1539 "Inbound message sender is missing for AMQP response routing".to_string(),
1540 )
1541 })?;
1542 let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1543 let sender_amqp_service = sender_identity
1544 .get("service")
1545 .and_then(Value::as_object)
1546 .and_then(|service| service.get("amqp"))
1547 .and_then(Value::as_object)
1548 .cloned()
1549 .ok_or_else(|| {
1550 AcpError::Transport(format!(
1551 "Sender {sender_id} does not advertise service.amqp for AMQP response delivery"
1552 ))
1553 })?;
1554 amqp_transport.publish(response_message, sender_id, Some(&sender_amqp_service))
1555 }
1556
1557 fn publish_mqtt_response_message(
1558 &mut self,
1559 raw_message: &Map<String, Value>,
1560 response_message: &Map<String, Value>,
1561 ) -> AcpResult<()> {
1562 let mqtt_transport = self
1563 .mqtt_transport
1564 .clone()
1565 .ok_or_else(|| AcpError::Transport("MQTT transport is not configured".to_string()))?;
1566 let sender_id = raw_message
1567 .get("envelope")
1568 .and_then(Value::as_object)
1569 .and_then(|envelope| envelope.get("sender"))
1570 .and_then(Value::as_str)
1571 .ok_or_else(|| {
1572 AcpError::Transport(
1573 "Inbound message sender is missing for MQTT response routing".to_string(),
1574 )
1575 })?;
1576 let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1577 let sender_mqtt_service = sender_identity
1578 .get("service")
1579 .and_then(Value::as_object)
1580 .and_then(|service| service.get("mqtt"))
1581 .and_then(Value::as_object)
1582 .cloned()
1583 .ok_or_else(|| {
1584 AcpError::Transport(format!(
1585 "Sender {sender_id} does not advertise service.mqtt for MQTT response delivery"
1586 ))
1587 })?;
1588 mqtt_transport.publish(response_message, sender_id, Some(&sender_mqtt_service))
1589 }
1590
1591 fn resolve_sender_identity_document(
1592 &mut self,
1593 raw_message: &Map<String, Value>,
1594 sender_id: &str,
1595 ) -> AcpResult<Map<String, Value>> {
1596 if let Some(embedded) = raw_message
1597 .get("sender_identity_document")
1598 .and_then(Value::as_object)
1599 .cloned()
1600 && embedded.get("agent_id").and_then(Value::as_str) == Some(sender_id)
1601 && verify_identity_document(&embedded)
1602 {
1603 return Ok(embedded);
1604 }
1605 self.discovery.resolve(sender_id)
1606 }
1607
1608 fn validate_envelope_for_inbound(&self, envelope: &Envelope) -> AcpResult<()> {
1609 if envelope.acp_version != ACP_VERSION {
1610 return Err(AcpError::Processing {
1611 reason: FailReason::UnsupportedVersion,
1612 detail: format!("Unsupported ACP version: {}", envelope.acp_version),
1613 });
1614 }
1615 if envelope.crypto_suite != DEFAULT_CRYPTO_SUITE {
1616 return Err(AcpError::Processing {
1617 reason: FailReason::UnsupportedCryptoSuite,
1618 detail: format!("Unsupported crypto suite: {}", envelope.crypto_suite),
1619 });
1620 }
1621 if envelope.is_expired() {
1622 return Err(AcpError::Processing {
1623 reason: FailReason::ExpiredMessage,
1624 detail: "Message is expired".to_string(),
1625 });
1626 }
1627 Ok(())
1628 }
1629
1630 fn create_response_message(
1631 &self,
1632 sender_identity_document: &Map<String, Value>,
1633 request_envelope: &Envelope,
1634 response_class: MessageClass,
1635 response_payload: Map<String, Value>,
1636 ) -> AcpResult<AcpMessage> {
1637 let sender_id = &request_envelope.sender;
1638 let sender_encryption_public_key = sender_identity_document
1639 .get("keys")
1640 .and_then(Value::as_object)
1641 .and_then(|keys| keys.get("encryption"))
1642 .and_then(Value::as_object)
1643 .and_then(|encryption| encryption.get("public_key"))
1644 .and_then(Value::as_str)
1645 .ok_or_else(|| AcpError::Processing {
1646 reason: FailReason::PolicyRejected,
1647 detail: "Sender identity document missing encryption key".to_string(),
1648 })?;
1649 self.build_message(
1650 vec![sender_id.to_string()],
1651 &response_payload,
1652 &HashMap::from([(
1653 sender_id.to_string(),
1654 sender_encryption_public_key.to_string(),
1655 )]),
1656 response_class,
1657 &request_envelope.context_id,
1658 Some(request_envelope.operation_id.clone()),
1659 300,
1660 request_envelope
1661 .correlation_id
1662 .clone()
1663 .or_else(|| Some(request_envelope.operation_id.clone())),
1664 Some(request_envelope.message_id.clone()),
1665 )
1666 }
1667
1668 fn sync_delivery_states(&mut self, operation_id: &str, outcomes: &[DeliveryOutcome]) {
1669 let mut states = HashMap::new();
1670 for outcome in outcomes {
1671 states.insert(
1672 outcome.recipient.clone(),
1673 format!("{:?}", outcome.state).to_uppercase(),
1674 );
1675 }
1676 self.delivery_states
1677 .insert(operation_id.to_string(), states);
1678 }
1679}
1680
/// A recipient that has been resolved far enough to attempt delivery.
#[derive(Debug, Clone)]
struct ResolvedRecipient {
    /// Recipient agent id.
    recipient: String,
    /// Encryption public key taken from the recipient's identity document.
    public_key: String,
    /// Name of the delivery channel selected for this recipient
    /// (presumably one of "direct", "relay", "amqp", "mqtt" — confirm at the call site).
    channel: String,
    /// Endpoint carried over from the channel choice, if any.
    endpoint: Option<String>,
    /// AMQP service hint carried over from the channel choice, if any.
    amqp_service: Option<Map<String, Value>>,
    /// MQTT service hint carried over from the channel choice, if any.
    mqtt_service: Option<Map<String, Value>>,
}
1690
/// Result of resolving a list of recipients before delivery.
#[derive(Debug, Clone)]
struct ResolvedRecipients {
    /// Recipients for which a delivery attempt can be made.
    deliverable: Vec<ResolvedRecipient>,
    /// Failed outcomes recorded for recipients rejected during resolution.
    preflight_outcomes: Vec<DeliveryOutcome>,
}
1696
/// Outcome of channel selection for one recipient.
///
/// As produced by `choose_delivery_channel`, either `channel` is set (with the
/// routing data that channel needs) or `channel` is `None` and `detail`
/// explains why no channel could be chosen.
#[derive(Debug, Clone)]
struct ChannelChoice {
    /// Selected channel name, or `None` when nothing is usable.
    channel: Option<String>,
    /// Direct endpoint, set for the "direct" channel.
    endpoint: Option<String>,
    /// Recipient's advertised AMQP service, set for the "amqp" channel.
    amqp_service: Option<Map<String, Value>>,
    /// Recipient's advertised MQTT service, set for the "mqtt" channel.
    mqtt_service: Option<Map<String, Value>>,
    /// Human-readable reason when no channel was selected.
    detail: Option<String>,
}

impl ChannelChoice {
    /// Creates a choice from its five fields, stored as given.
    fn new(
        channel: Option<String>,
        endpoint: Option<String>,
        amqp_service: Option<Map<String, Value>>,
        mqtt_service: Option<Map<String, Value>>,
        detail: Option<String>,
    ) -> Self {
        Self {
            channel,
            endpoint,
            amqp_service,
            mqtt_service,
            detail,
        }
    }
}
1723
/// Remembers recently processed message ids so repeats can be detected.
#[derive(Debug, Clone)]
struct DedupStore {
    /// How long an id stays remembered after it was marked processed.
    ttl: Duration,
    /// Message id mapped to the instant it was marked processed.
    processed: HashMap<String, Instant>,
}

impl DedupStore {
    /// Creates an empty store whose entries live for `ttl`.
    fn new(ttl: Duration) -> Self {
        let processed = HashMap::new();
        Self { ttl, processed }
    }

    /// Returns `true` when `message_id` was marked processed less than `ttl` ago.
    ///
    /// Expired entries are purged before the lookup.
    fn is_duplicate(&mut self, message_id: &str) -> bool {
        self.cleanup_expired();
        self.processed.get(message_id).is_some()
    }

    /// Records `message_id` as processed now, refreshing any existing entry.
    fn mark_processed(&mut self, message_id: &str) {
        let seen_at = Instant::now();
        self.processed.insert(message_id.to_owned(), seen_at);
    }

    /// Drops every entry whose age has reached `ttl`.
    fn cleanup_expired(&mut self) {
        let Self { ttl, processed } = self;
        processed.retain(|_, seen_at| seen_at.elapsed() < *ttl);
    }
}
1754
1755fn outcome_from_http_response(recipient: &str, response: &TransportResponse) -> DeliveryOutcome {
1756 let mut response_class = None;
1757 let mut response_message = None;
1758 let mut reason_code = None;
1759 let mut detail = None;
1760
1761 if let Some(body) = &response.body {
1762 if let Some(raw_response_message) = body.get("response_message").and_then(Value::as_object)
1763 {
1764 response_message = Some(raw_response_message.clone());
1765 response_class = raw_response_message
1766 .get("envelope")
1767 .and_then(Value::as_object)
1768 .and_then(|envelope| envelope.get("message_class"))
1769 .and_then(Value::as_str)
1770 .and_then(parse_message_class);
1771 }
1772 reason_code = body
1773 .get("reason_code")
1774 .and_then(Value::as_str)
1775 .map(str::to_string);
1776 detail = body
1777 .get("detail")
1778 .and_then(Value::as_str)
1779 .map(str::to_string);
1780 }
1781 if detail.is_none() && response.status_code >= 400 {
1782 detail = Some(format!("Recipient HTTP {}", response.status_code));
1783 }
1784
1785 DeliveryOutcome {
1786 recipient: recipient.to_string(),
1787 state: delivery_state_from_response(
1788 response.status_code,
1789 response_class,
1790 reason_code.as_deref(),
1791 ),
1792 status_code: Some(response.status_code),
1793 response_class,
1794 reason_code,
1795 detail,
1796 response_message,
1797 }
1798}
1799
1800fn delivery_state_from_response(
1801 status_code: u16,
1802 response_class: Option<MessageClass>,
1803 reason_code: Option<&str>,
1804) -> DeliveryState {
1805 if (200..300).contains(&status_code) {
1806 if response_class == Some(MessageClass::Fail) {
1807 if reason_code == Some(FailReason::ExpiredMessage.as_str()) {
1808 return DeliveryState::Expired;
1809 }
1810 if reason_code == Some(FailReason::PolicyRejected.as_str()) {
1811 return DeliveryState::Declined;
1812 }
1813 return DeliveryState::Failed;
1814 }
1815 if matches!(
1816 response_class,
1817 Some(MessageClass::Ack | MessageClass::Capabilities)
1818 ) {
1819 return DeliveryState::Acknowledged;
1820 }
1821 return DeliveryState::Delivered;
1822 }
1823 if status_code == 410 {
1824 return DeliveryState::Expired;
1825 }
1826 if [401, 403, 409, 422].contains(&status_code) {
1827 return DeliveryState::Declined;
1828 }
1829 DeliveryState::Failed
1830}
1831
1832fn parse_message_class(value: &str) -> Option<MessageClass> {
1833 match value {
1834 "SEND" => Some(MessageClass::Send),
1835 "ACK" => Some(MessageClass::Ack),
1836 "FAIL" => Some(MessageClass::Fail),
1837 "CAPABILITIES" => Some(MessageClass::Capabilities),
1838 "COMPENSATE" => Some(MessageClass::Compensate),
1839 _ => None,
1840 }
1841}
1842
1843fn failed_outcome(recipient: &str, reason_code: &str, detail: &str) -> DeliveryOutcome {
1844 DeliveryOutcome {
1845 recipient: recipient.to_string(),
1846 state: DeliveryState::Failed,
1847 status_code: None,
1848 response_class: None,
1849 reason_code: Some(reason_code.to_string()),
1850 detail: Some(detail.to_string()),
1851 response_message: None,
1852 }
1853}
1854
1855fn to_public_key_map(targets: &[ResolvedRecipient]) -> HashMap<String, String> {
1856 targets
1857 .iter()
1858 .map(|target| (target.recipient.clone(), target.public_key.clone()))
1859 .collect()
1860}
1861
1862fn reason_for_capability_mismatch(reason: Option<&str>) -> FailReason {
1863 let normalized = reason.unwrap_or_default().to_lowercase();
1864 if normalized.contains("protocol") {
1865 return FailReason::UnsupportedVersion;
1866 }
1867 if normalized.contains("crypto") {
1868 return FailReason::UnsupportedCryptoSuite;
1869 }
1870 if normalized.contains("profile") {
1871 return FailReason::UnsupportedProfile;
1872 }
1873 FailReason::PolicyRejected
1874}
1875
1876fn build_local_amqp_service(
1877 agent_id: &str,
1878 options: &AcpAgentOptions,
1879) -> AcpResult<Option<Map<String, Value>>> {
1880 let Some(broker_url) = options.amqp_broker_url.as_deref() else {
1881 return Ok(None);
1882 };
1883 Ok(Some(AmqpTransportClient::build_service_hint(
1884 agent_id,
1885 broker_url,
1886 Some(&options.amqp_exchange),
1887 )?))
1888}
1889
1890fn build_local_mqtt_service(
1891 agent_id: &str,
1892 options: &AcpAgentOptions,
1893) -> AcpResult<Option<Map<String, Value>>> {
1894 let Some(broker_url) = options.mqtt_broker_url.as_deref() else {
1895 return Ok(None);
1896 };
1897 Ok(Some(MqttTransportClient::build_service_hint(
1898 agent_id,
1899 broker_url,
1900 None,
1901 Some(options.mqtt_qos),
1902 Some(&options.mqtt_topic_prefix),
1903 )?))
1904}
1905
1906fn apply_http_security_profile(identity_document: &mut Map<String, Value>, mtls_enabled: bool) {
1907 if !mtls_enabled {
1908 return;
1909 }
1910 let mut service = identity_document
1911 .get("service")
1912 .and_then(Value::as_object)
1913 .cloned()
1914 .unwrap_or_default();
1915 let direct_endpoint = service
1916 .get("direct_endpoint")
1917 .and_then(Value::as_str)
1918 .map(str::to_string);
1919 let relay_hints = service
1920 .get("relay_hints")
1921 .and_then(Value::as_array)
1922 .map(|items| {
1923 items
1924 .iter()
1925 .filter_map(Value::as_str)
1926 .map(str::to_string)
1927 .collect::<Vec<_>>()
1928 })
1929 .unwrap_or_default();
1930 if let Some(endpoint) = direct_endpoint
1931 && !endpoint.trim().is_empty()
1932 {
1933 service.insert(
1934 "http".to_string(),
1935 serde_json::json!({
1936 "endpoint": endpoint,
1937 "security_profile": "mtls",
1938 }),
1939 );
1940 }
1941 if let Some(relay_endpoint) = relay_hints.first()
1942 && !relay_endpoint.trim().is_empty()
1943 {
1944 service.insert(
1945 "relay".to_string(),
1946 serde_json::json!({
1947 "endpoint": relay_endpoint,
1948 "security_profile": "mtls",
1949 }),
1950 );
1951 }
1952 identity_document.insert("service".to_string(), Value::Object(service));
1953}
1954
1955fn resolve_key_provider(options: &AcpAgentOptions) -> AcpResult<Box<dyn KeyProvider>> {
1956 if let Some(provider) = options.key_provider_instance.clone() {
1957 return Ok(Box::new(ArcKeyProvider(provider)));
1958 }
1959 let provider_name = normalize_key_provider_name(&options.key_provider);
1960 match provider_name.as_str() {
1961 "local" => Ok(Box::new(LocalKeyProvider::new(
1962 options.storage_dir.clone(),
1963 options.cert_file.clone(),
1964 options.key_file.clone(),
1965 options.ca_file.clone(),
1966 ))),
1967 "vault" => {
1968 let vault_url = options.vault_url.clone().ok_or_else(|| {
1969 AcpError::Validation("vault_url is required when key_provider=vault".to_string())
1970 })?;
1971 let vault_path = options.vault_path.clone().ok_or_else(|| {
1972 AcpError::Validation("vault_path is required when key_provider=vault".to_string())
1973 })?;
1974 Ok(Box::new(VaultKeyProvider::new(
1975 vault_url,
1976 vault_path,
1977 Some(options.vault_token_env.clone()),
1978 options.vault_token.clone(),
1979 options.http_timeout_seconds,
1980 options.ca_file.clone(),
1981 options.allow_insecure_tls,
1982 options.allow_insecure_http,
1983 )?))
1984 }
1985 _ => Err(AcpError::Validation(format!(
1986 "Unsupported key_provider: {}",
1987 options.key_provider
1988 ))),
1989 }
1990}
1991
/// Normalizes a key provider name: surrounding whitespace is removed and the
/// rest is lower-cased. A blank name defaults to `"local"`.
fn normalize_key_provider_name(value: &str) -> String {
    match value.trim() {
        "" => String::from("local"),
        name => name.to_lowercase(),
    }
}
2000
2001fn matches_provider_local(key_provider_info: &KeyProviderInfo) -> bool {
2002 key_provider_info
2003 .get("provider")
2004 .and_then(Value::as_str)
2005 .map(|provider| provider == "local")
2006 .unwrap_or(false)
2007}
2008
2009fn identity_from_provider(agent_id: &str, keys: &IdentityKeyMaterial) -> AcpResult<AgentIdentity> {
2010 let mut missing = Vec::new();
2011 if keys
2012 .signing_public_key
2013 .as_deref()
2014 .map(str::trim)
2015 .filter(|v| !v.is_empty())
2016 .is_none()
2017 {
2018 missing.push("signing_public_key");
2019 }
2020 if keys
2021 .encryption_public_key
2022 .as_deref()
2023 .map(str::trim)
2024 .filter(|v| !v.is_empty())
2025 .is_none()
2026 {
2027 missing.push("encryption_public_key");
2028 }
2029 if keys
2030 .signing_kid
2031 .as_deref()
2032 .map(str::trim)
2033 .filter(|v| !v.is_empty())
2034 .is_none()
2035 {
2036 missing.push("signing_kid");
2037 }
2038 if keys
2039 .encryption_kid
2040 .as_deref()
2041 .map(str::trim)
2042 .filter(|v| !v.is_empty())
2043 .is_none()
2044 {
2045 missing.push("encryption_kid");
2046 }
2047 if !missing.is_empty() {
2048 return Err(AcpError::Validation(format!(
2049 "External key provider requires identity public metadata for first-time bootstrap: {}",
2050 missing.join(", ")
2051 )));
2052 }
2053 Ok(AgentIdentity {
2054 agent_id: agent_id.to_string(),
2055 signing_private_key: keys.signing_private_key.clone(),
2056 signing_public_key: keys.signing_public_key.clone().unwrap_or_default(),
2057 encryption_private_key: keys.encryption_private_key.clone(),
2058 encryption_public_key: keys.encryption_public_key.clone().unwrap_or_default(),
2059 signing_kid: keys.signing_kid.clone().unwrap_or_default(),
2060 encryption_kid: keys.encryption_kid.clone().unwrap_or_default(),
2061 })
2062}
2063
2064fn apply_provider_keys(
2065 identity: &AgentIdentity,
2066 keys: &IdentityKeyMaterial,
2067) -> AcpResult<AgentIdentity> {
2068 if let Some(signing_public_key) = keys.signing_public_key.as_deref()
2069 && signing_public_key != identity.signing_public_key
2070 {
2071 return Err(AcpError::Validation(
2072 "Key provider signing_public_key does not match local identity metadata".to_string(),
2073 ));
2074 }
2075 if let Some(encryption_public_key) = keys.encryption_public_key.as_deref()
2076 && encryption_public_key != identity.encryption_public_key
2077 {
2078 return Err(AcpError::Validation(
2079 "Key provider encryption_public_key does not match local identity metadata".to_string(),
2080 ));
2081 }
2082 if let Some(signing_kid) = keys.signing_kid.as_deref()
2083 && signing_kid != identity.signing_kid
2084 {
2085 return Err(AcpError::Validation(
2086 "Key provider signing_kid does not match local identity metadata".to_string(),
2087 ));
2088 }
2089 if let Some(encryption_kid) = keys.encryption_kid.as_deref()
2090 && encryption_kid != identity.encryption_kid
2091 {
2092 return Err(AcpError::Validation(
2093 "Key provider encryption_kid does not match local identity metadata".to_string(),
2094 ));
2095 }
2096 Ok(AgentIdentity {
2097 agent_id: identity.agent_id.clone(),
2098 signing_private_key: keys.signing_private_key.clone(),
2099 signing_public_key: identity.signing_public_key.clone(),
2100 encryption_private_key: keys.encryption_private_key.clone(),
2101 encryption_public_key: identity.encryption_public_key.clone(),
2102 signing_kid: identity.signing_kid.clone(),
2103 encryption_kid: identity.encryption_kid.clone(),
2104 })
2105}
2106
2107fn base_url_from_endpoint(endpoint: &str) -> Option<String> {
2108 let endpoint = endpoint.trim();
2109 if endpoint.is_empty() {
2110 return None;
2111 }
2112 let parsed = url::Url::parse(endpoint).ok()?;
2113 let scheme = parsed.scheme();
2114 let host = parsed.host_str()?;
2115 let authority = if let Some(port) = parsed.port() {
2116 format!("{host}:{port}")
2117 } else {
2118 host.to_string()
2119 };
2120 Some(format!("{scheme}://{authority}"))
2121}
2122
/// Returns the first value that is present and not blank, trimmed of
/// surrounding whitespace, or `None` when there is no such value.
fn first_non_blank(values: &[Option<String>]) -> Option<String> {
    for candidate in values {
        let Some(text) = candidate else {
            continue;
        };
        let trimmed = text.trim();
        if !trimmed.is_empty() {
            return Some(trimmed.to_owned());
        }
    }
    None
}
2131
/// Adapter that lets a shared `Arc<dyn KeyProvider>` be used where an owned
/// `KeyProvider` value is required (see `resolve_key_provider`, which boxes it).
#[derive(Clone)]
struct ArcKeyProvider(Arc<dyn KeyProvider>);

// Every method forwards unchanged to the wrapped provider.
impl KeyProvider for ArcKeyProvider {
    /// Forwards to the wrapped provider's `load_identity_keys`.
    fn load_identity_keys(&self, agent_id: &str) -> AcpResult<IdentityKeyMaterial> {
        self.0.load_identity_keys(agent_id)
    }

    /// Forwards to the wrapped provider's `load_tls_material`.
    fn load_tls_material(&self, agent_id: &str) -> AcpResult<crate::key_provider::TlsMaterial> {
        self.0.load_tls_material(agent_id)
    }

    /// Forwards to the wrapped provider's `load_ca_bundle`.
    fn load_ca_bundle(&self, agent_id: &str) -> AcpResult<Option<String>> {
        self.0.load_ca_bundle(agent_id)
    }

    /// Forwards to the wrapped provider's `describe`.
    fn describe(&self) -> KeyProviderInfo {
        self.0.describe()
    }
}