1use std::collections::{HashMap, HashSet};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::Serialize;
11use serde_json::{Map, Value};
12use uuid::Uuid;
13
14use crate::amqp_transport::AmqpTransportClient;
15use crate::capabilities::AgentCapabilities;
16use crate::constants::{ACP_VERSION, DEFAULT_CRYPTO_SUITE};
17use crate::crypto;
18use crate::discovery::DiscoveryClient;
19use crate::errors::{AcpError, AcpResult, FailReason};
20use crate::http_security::{HttpSecurityPolicy, validate_http_client_policy, validate_http_url};
21use crate::identity::{
22 AgentIdentity, parse_agent_id, read_identity, verify_identity_document, write_identity,
23};
24use crate::key_provider::{
25 IdentityKeyMaterial, KeyProvider, KeyProviderInfo, LocalKeyProvider, VaultKeyProvider,
26};
27use crate::messages::{
28 AcpMessage, CompensateInstruction, DeliveryMode, DeliveryOutcome, DeliveryState, Envelope,
29 MessageClass, SendResult, build_ack_payload, build_fail_payload,
30};
31use crate::mqtt_transport::MqttTransportClient;
32use crate::options::AcpAgentOptions;
33use crate::transport::{TransportClient, TransportResponse};
34use crate::well_known::build_well_known_document;
35
36#[derive(Debug, Clone, Serialize)]
37pub struct InboundResult {
38 pub state: DeliveryState,
39 pub reason_code: Option<String>,
40 pub detail: Option<String>,
41 pub decrypted_payload: Option<Map<String, Value>>,
42 pub response_message: Option<Map<String, Value>>,
43}
44
45#[derive(Debug, Clone)]
46pub struct DecryptedMessage {
47 pub message: AcpMessage,
48 pub payload: Map<String, Value>,
49}
50
51pub type InboundHandlerFn =
52 dyn for<'a> Fn(&'a Map<String, Value>, &'a Envelope) -> Option<Map<String, Value>>;
53
54#[derive(Debug, Clone)]
55pub struct CapabilityRequestResult {
56 pub result: SendResult,
57 pub capabilities: Option<Map<String, Value>>,
58}
59
60#[derive(Debug, Clone)]
61pub struct AcpAgent {
62 pub identity: AgentIdentity,
63 pub identity_document: Map<String, Value>,
64 pub discovery: DiscoveryClient,
65 pub transport: TransportClient,
66 pub amqp_transport: Option<AmqpTransportClient>,
67 pub mqtt_transport: Option<MqttTransportClient>,
68 pub capabilities: AgentCapabilities,
69 pub storage_dir: PathBuf,
70 pub trust_profile: String,
71 pub relay_url: String,
72 pub default_delivery_mode: DeliveryMode,
73 pub key_provider_info: KeyProviderInfo,
74 delivery_states: HashMap<String, HashMap<String, String>>,
75 dedup: DedupStore,
76}
77
78impl AcpAgent {
79 pub fn load_or_create(agent_id: &str, options: Option<AcpAgentOptions>) -> AcpResult<Self> {
80 parse_agent_id(agent_id)?;
81 let options = options.unwrap_or_default();
82 std::fs::create_dir_all(&options.storage_dir)?;
83
84 let key_provider = resolve_key_provider(&options)?;
85 let key_provider_info = key_provider.describe();
86
87 let provider_tls_material = key_provider.load_tls_material(agent_id).ok();
88 let provider_ca_bundle = key_provider.load_ca_bundle(agent_id).ok().flatten();
89
90 let effective_ca_file = first_non_blank(&[
91 options.ca_file.clone(),
92 provider_tls_material
93 .as_ref()
94 .and_then(|m| m.ca_file.clone()),
95 provider_ca_bundle,
96 ]);
97 let effective_cert_file = first_non_blank(&[
98 options.cert_file.clone(),
99 provider_tls_material
100 .as_ref()
101 .and_then(|m| m.cert_file.clone()),
102 ]);
103 let effective_key_file = first_non_blank(&[
104 options.key_file.clone(),
105 provider_tls_material
106 .as_ref()
107 .and_then(|m| m.key_file.clone()),
108 ]);
109
110 let policy = HttpSecurityPolicy {
111 allow_insecure_http: options.allow_insecure_http,
112 allow_insecure_tls: options.allow_insecure_tls,
113 mtls_enabled: options.mtls_enabled,
114 ca_file: effective_ca_file.clone(),
115 cert_file: effective_cert_file.clone(),
116 key_file: effective_key_file.clone(),
117 };
118 validate_http_client_policy(&policy, "Agent HTTP security configuration")?;
119 if let Some(endpoint) = options.endpoint.as_deref() {
120 validate_http_url(
121 endpoint,
122 policy.allow_insecure_http,
123 policy.mtls_enabled,
124 "Agent direct endpoint configuration",
125 )?;
126 }
127 validate_http_url(
128 &options.relay_url,
129 policy.allow_insecure_http,
130 policy.mtls_enabled,
131 "Agent relay URL configuration",
132 )?;
133 for relay_hint in &options.relay_hints {
134 validate_http_url(
135 relay_hint,
136 policy.allow_insecure_http,
137 policy.mtls_enabled,
138 "Agent relay hint configuration",
139 )?;
140 }
141 for directory_hint in &options.enterprise_directory_hints {
142 validate_http_url(
143 directory_hint,
144 policy.allow_insecure_http,
145 policy.mtls_enabled,
146 "Agent enterprise directory hint configuration",
147 )?;
148 }
149
150 let local_amqp_service = build_local_amqp_service(agent_id, &options)?;
151 let local_mqtt_service = build_local_mqtt_service(agent_id, &options)?;
152
153 let provider_identity_keys = key_provider.load_identity_keys(agent_id).ok();
154 let external_key_provider = !matches_provider_local(&key_provider_info);
155
156 let (identity, identity_document, capabilities) =
157 match read_identity(&options.storage_dir, agent_id)? {
158 None => {
159 let identity = if let Some(keys) = &provider_identity_keys {
160 identity_from_provider(agent_id, keys)?
161 } else if external_key_provider {
162 return Err(AcpError::KeyProvider(
163 "Unable to load identity keys from key provider".to_string(),
164 ));
165 } else {
166 AgentIdentity::create(agent_id)?
167 };
168 let capabilities = options
169 .capabilities
170 .clone()
171 .unwrap_or_else(|| AgentCapabilities::new(agent_id.to_string()));
172 let mut identity_document = identity.build_identity_document(
173 options.endpoint.as_deref(),
174 &options.relay_hints,
175 &options.trust_profile,
176 Some(&capabilities.to_map()),
177 365,
178 local_amqp_service.as_ref(),
179 local_mqtt_service.as_ref(),
180 if options.mtls_enabled {
181 Some("mtls")
182 } else {
183 None
184 },
185 if options.mtls_enabled {
186 Some("mtls")
187 } else {
188 None
189 },
190 )?;
191 apply_http_security_profile(&mut identity_document, options.mtls_enabled);
192 write_identity(&options.storage_dir, &identity, &identity_document)?;
193 (identity, identity_document, capabilities)
194 }
195 Some(bundle) => {
196 let mut identity = bundle.identity;
197 let mut identity_document = bundle.identity_document;
198 if let Some(keys) = &provider_identity_keys {
199 identity = apply_provider_keys(&identity, keys)?;
200 } else if external_key_provider {
201 return Err(AcpError::KeyProvider(
202 "Unable to load identity keys from key provider".to_string(),
203 ));
204 }
205 let valid_document = verify_identity_document(&identity_document);
206 let capabilities = options.capabilities.clone().unwrap_or_else(|| {
207 AgentCapabilities::from_map(
208 identity_document
209 .get("capabilities")
210 .and_then(Value::as_object),
211 agent_id,
212 )
213 });
214 let should_rewrite = !valid_document
215 || options.endpoint.is_some()
216 || !options.relay_hints.is_empty()
217 || options.capabilities.is_some()
218 || local_amqp_service.is_some()
219 || local_mqtt_service.is_some();
220 if should_rewrite {
221 let existing_service = identity_document
222 .get("service")
223 .and_then(Value::as_object)
224 .cloned()
225 .unwrap_or_default();
226 let existing_endpoint = existing_service
227 .get("direct_endpoint")
228 .and_then(Value::as_str)
229 .map(str::to_string);
230 let existing_hints = existing_service
231 .get("relay_hints")
232 .and_then(Value::as_array)
233 .map(|items| {
234 items
235 .iter()
236 .filter_map(Value::as_str)
237 .map(str::to_string)
238 .collect::<Vec<_>>()
239 })
240 .unwrap_or_default();
241 let existing_amqp_service = existing_service
242 .get("amqp")
243 .and_then(Value::as_object)
244 .cloned();
245 let existing_mqtt_service = existing_service
246 .get("mqtt")
247 .and_then(Value::as_object)
248 .cloned();
249 identity_document = identity.build_identity_document(
250 options.endpoint.as_deref().or(existing_endpoint.as_deref()),
251 if options.relay_hints.is_empty() {
252 &existing_hints
253 } else {
254 &options.relay_hints
255 },
256 &options.trust_profile,
257 Some(&capabilities.to_map()),
258 365,
259 local_amqp_service
260 .as_ref()
261 .or(existing_amqp_service.as_ref()),
262 local_mqtt_service
263 .as_ref()
264 .or(existing_mqtt_service.as_ref()),
265 if options.mtls_enabled {
266 Some("mtls")
267 } else {
268 None
269 },
270 if options.mtls_enabled {
271 Some("mtls")
272 } else {
273 None
274 },
275 )?;
276 apply_http_security_profile(&mut identity_document, options.mtls_enabled);
277 write_identity(&options.storage_dir, &identity, &identity_document)?;
278 }
279 (identity, identity_document, capabilities)
280 }
281 };
282
283 let mut effective_relay_hints = if !options.relay_hints.is_empty() {
284 options.relay_hints.clone()
285 } else {
286 identity_document
287 .get("service")
288 .and_then(Value::as_object)
289 .and_then(|service| service.get("relay_hints"))
290 .and_then(Value::as_array)
291 .map(|items| {
292 items
293 .iter()
294 .filter_map(Value::as_str)
295 .map(str::to_string)
296 .collect::<Vec<_>>()
297 })
298 .unwrap_or_default()
299 };
300 if !effective_relay_hints.contains(&options.relay_url) {
301 effective_relay_hints.push(options.relay_url.clone());
302 }
303
304 let mut discovery = DiscoveryClient::new(
305 Some(options.storage_dir.join("discovery_cache.json")),
306 Some(options.discovery_scheme.clone()),
307 Some(effective_relay_hints),
308 Some(options.enterprise_directory_hints.clone()),
309 options.http_timeout_seconds,
310 policy.allow_insecure_http,
311 policy.allow_insecure_tls,
312 policy.ca_file.clone(),
313 policy.mtls_enabled,
314 policy.cert_file.clone(),
315 policy.key_file.clone(),
316 )?;
317 discovery.seed(identity_document.clone())?;
318
319 let amqp_transport = if let Some(transport) = options.amqp_transport.clone() {
320 Some(transport)
321 } else if let Some(broker_url) = options.amqp_broker_url.as_deref() {
322 Some(AmqpTransportClient::new(
323 broker_url.to_string(),
324 Some(options.amqp_exchange.clone()),
325 Some(options.amqp_exchange_type.clone()),
326 options.http_timeout_seconds,
327 )?)
328 } else {
329 None
330 };
331
332 let mqtt_transport = if let Some(transport) = options.mqtt_transport.clone() {
333 Some(transport)
334 } else if let Some(broker_url) = options.mqtt_broker_url.as_deref() {
335 Some(MqttTransportClient::new(
336 broker_url.to_string(),
337 Some(options.mqtt_qos),
338 Some(options.mqtt_topic_prefix.clone()),
339 options.http_timeout_seconds,
340 30,
341 )?)
342 } else {
343 None
344 };
345
346 Ok(Self {
347 identity,
348 identity_document,
349 discovery,
350 transport: TransportClient::new(options.http_timeout_seconds, &policy)?,
351 amqp_transport,
352 mqtt_transport,
353 capabilities,
354 storage_dir: options.storage_dir,
355 trust_profile: options.trust_profile,
356 relay_url: options.relay_url,
357 default_delivery_mode: options.default_delivery_mode,
358 key_provider_info,
359 delivery_states: HashMap::new(),
360 dedup: DedupStore::new(Duration::from_secs(3600)),
361 })
362 }
363
364 pub fn agent_id(&self) -> &str {
365 &self.identity.agent_id
366 }
367
368 pub fn get_delivery_states(&self) -> &HashMap<String, HashMap<String, String>> {
369 &self.delivery_states
370 }
371
372 pub fn build_well_known_document(
373 &self,
374 base_url: Option<&str>,
375 identity_document_url: Option<&str>,
376 ) -> AcpResult<Map<String, Value>> {
377 let resolved_base_url = base_url
378 .map(str::to_string)
379 .or_else(|| {
380 self.identity_document
381 .get("service")
382 .and_then(Value::as_object)
383 .and_then(|service| service.get("direct_endpoint"))
384 .and_then(Value::as_str)
385 .and_then(base_url_from_endpoint)
386 })
387 .ok_or_else(|| {
388 AcpError::Validation(
389 "Unable to build /.well-known/acp metadata without base_url or direct_endpoint"
390 .to_string(),
391 )
392 })?;
393 build_well_known_document(
394 &self.identity_document,
395 &resolved_base_url,
396 identity_document_url,
397 Some(ACP_VERSION),
398 )
399 }
400
401 pub fn register_identity_document(
402 &mut self,
403 identity_document: Map<String, Value>,
404 ) -> AcpResult<()> {
405 self.discovery.register_identity_document(identity_document)
406 }
407
408 pub fn resolve_well_known(
409 &mut self,
410 base_url: &str,
411 expected_agent_id: Option<&str>,
412 ) -> AcpResult<Map<String, Value>> {
413 self.discovery
414 .resolve_well_known(base_url, expected_agent_id)
415 }
416
417 pub fn send(
418 &mut self,
419 recipients: Vec<String>,
420 payload: Map<String, Value>,
421 context: Option<String>,
422 message_class: MessageClass,
423 expires_in_seconds: i64,
424 correlation_id: Option<String>,
425 in_reply_to: Option<String>,
426 delivery_mode: Option<DeliveryMode>,
427 ) -> AcpResult<SendResult> {
428 if recipients.is_empty() {
429 return Err(AcpError::InvalidArgument(
430 "send() requires at least one recipient".to_string(),
431 ));
432 }
433 let mode = delivery_mode.unwrap_or(self.default_delivery_mode);
434 let operation_id = Uuid::new_v4().to_string();
435 let context_id = context.unwrap_or_else(|| operation_id.clone());
436
437 let resolved = self.resolve_recipients(&recipients, mode)?;
438 if resolved.deliverable.is_empty() {
439 let result = SendResult {
440 operation_id: operation_id.clone(),
441 message_id: Uuid::new_v4().to_string(),
442 message_ids: Vec::new(),
443 outcomes: resolved.preflight_outcomes.clone(),
444 };
445 self.sync_delivery_states(&operation_id, &result.outcomes);
446 return Ok(result);
447 }
448
449 let mut outcomes = resolved.preflight_outcomes;
450 let mut message_ids = Vec::new();
451 let direct_targets = resolved
452 .deliverable
453 .iter()
454 .filter(|target| target.channel == "direct")
455 .cloned()
456 .collect::<Vec<_>>();
457 let relay_targets = resolved
458 .deliverable
459 .iter()
460 .filter(|target| target.channel == "relay")
461 .cloned()
462 .collect::<Vec<_>>();
463 let amqp_targets = resolved
464 .deliverable
465 .iter()
466 .filter(|target| target.channel == "amqp")
467 .cloned()
468 .collect::<Vec<_>>();
469 let mqtt_targets = resolved
470 .deliverable
471 .iter()
472 .filter(|target| target.channel == "mqtt")
473 .cloned()
474 .collect::<Vec<_>>();
475
476 if !direct_targets.is_empty() {
477 let message = self.build_message(
478 direct_targets.iter().map(|r| r.recipient.clone()).collect(),
479 &payload,
480 &to_public_key_map(&direct_targets),
481 message_class,
482 &context_id,
483 Some(operation_id.clone()),
484 expires_in_seconds,
485 correlation_id.clone(),
486 in_reply_to.clone(),
487 )?;
488 message_ids.push(message.envelope.message_id.clone());
489 outcomes.extend(self.deliver_direct(&message, &direct_targets));
490 }
491 if !relay_targets.is_empty() {
492 let message = self.build_message(
493 relay_targets.iter().map(|r| r.recipient.clone()).collect(),
494 &payload,
495 &to_public_key_map(&relay_targets),
496 message_class,
497 &context_id,
498 Some(operation_id.clone()),
499 expires_in_seconds,
500 correlation_id.clone(),
501 in_reply_to.clone(),
502 )?;
503 message_ids.push(message.envelope.message_id.clone());
504 outcomes.extend(self.deliver_via_relay(&message, &relay_targets));
505 }
506 for target in &amqp_targets {
507 let recipient_public_keys =
508 HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
509 let message = self.build_message(
510 vec![target.recipient.clone()],
511 &payload,
512 &recipient_public_keys,
513 message_class,
514 &context_id,
515 Some(operation_id.clone()),
516 expires_in_seconds,
517 correlation_id.clone(),
518 in_reply_to.clone(),
519 )?;
520 message_ids.push(message.envelope.message_id.clone());
521 outcomes.push(self.deliver_via_amqp(&message, target));
522 }
523 for target in &mqtt_targets {
524 let recipient_public_keys =
525 HashMap::from([(target.recipient.clone(), target.public_key.clone())]);
526 let message = self.build_message(
527 vec![target.recipient.clone()],
528 &payload,
529 &recipient_public_keys,
530 message_class,
531 &context_id,
532 Some(operation_id.clone()),
533 expires_in_seconds,
534 correlation_id.clone(),
535 in_reply_to.clone(),
536 )?;
537 message_ids.push(message.envelope.message_id.clone());
538 outcomes.push(self.deliver_via_mqtt(&message, target));
539 }
540 if message_ids.is_empty() {
541 message_ids.push(Uuid::new_v4().to_string());
542 }
543 let result = SendResult {
544 operation_id: operation_id.clone(),
545 message_id: message_ids[0].clone(),
546 message_ids,
547 outcomes,
548 };
549 self.sync_delivery_states(&operation_id, &result.outcomes);
550 Ok(result)
551 }
552
553 pub fn send_basic(
554 &mut self,
555 recipients: Vec<String>,
556 payload: Map<String, Value>,
557 context: Option<String>,
558 ) -> AcpResult<SendResult> {
559 self.send(
560 recipients,
561 payload,
562 context,
563 MessageClass::Send,
564 300,
565 None,
566 None,
567 Some(self.default_delivery_mode),
568 )
569 }
570
571 pub fn send_compensate(
572 &mut self,
573 recipients: Vec<String>,
574 original_operation_id: &str,
575 reason: &str,
576 actions: Vec<Map<String, Value>>,
577 context: Option<String>,
578 delivery_mode: Option<DeliveryMode>,
579 ) -> AcpResult<SendResult> {
580 let instruction = CompensateInstruction {
581 operation_id: original_operation_id.to_string(),
582 reason: reason.to_string(),
583 actions,
584 };
585 let mut payload = Map::new();
586 payload.insert(
587 "compensation".to_string(),
588 serde_json::to_value(instruction).unwrap_or(Value::Null),
589 );
590 self.send(
591 recipients,
592 payload,
593 context.or_else(|| Some(format!("compensate:{original_operation_id}"))),
594 MessageClass::Compensate,
595 300,
596 Some(original_operation_id.to_string()),
597 None,
598 delivery_mode,
599 )
600 }
601
602 pub fn decrypt_message_for_self(
603 &mut self,
604 raw_message: &Map<String, Value>,
605 ) -> AcpResult<DecryptedMessage> {
606 let message = AcpMessage::from_map(raw_message)?;
607 self.validate_envelope_for_inbound(&message.envelope)?;
608 if !message
609 .envelope
610 .recipients
611 .iter()
612 .any(|recipient| recipient == self.agent_id())
613 {
614 return Err(AcpError::Processing {
615 reason: FailReason::PolicyRejected,
616 detail: "Message is not addressed to this agent".to_string(),
617 });
618 }
619 let sender_doc =
620 self.resolve_sender_identity_document(raw_message, &message.envelope.sender)?;
621 let sender_signing_key = sender_doc
622 .get("keys")
623 .and_then(Value::as_object)
624 .and_then(|keys| keys.get("signing"))
625 .and_then(Value::as_object)
626 .and_then(|signing| signing.get("public_key"))
627 .and_then(Value::as_str)
628 .ok_or_else(|| AcpError::Processing {
629 reason: FailReason::InvalidSignature,
630 detail: "Sender signing public key missing".to_string(),
631 })?;
632 if !crypto::verify_protected_payload_signature(
633 &message.envelope,
634 &message.protected_payload,
635 sender_signing_key,
636 ) {
637 return Err(AcpError::Processing {
638 reason: FailReason::InvalidSignature,
639 detail: "Message signature verification failed".to_string(),
640 });
641 }
642 let payload = crypto::decrypt_for_recipient(
643 &message.envelope,
644 &message.protected_payload,
645 self.agent_id(),
646 &self.identity.encryption_private_key,
647 )?;
648 Ok(DecryptedMessage { message, payload })
649 }
650
651 pub fn receive(
652 &mut self,
653 raw_message: &Map<String, Value>,
654 handler: Option<&InboundHandlerFn>,
655 ) -> InboundResult {
656 let mut result = InboundResult {
657 state: DeliveryState::Failed,
658 reason_code: None,
659 detail: None,
660 decrypted_payload: None,
661 response_message: None,
662 };
663
664 let request_message = match AcpMessage::from_map(raw_message) {
665 Ok(message) => message,
666 Err(exc) => {
667 result.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
668 result.detail = Some(format!("Invalid ACP message structure: {exc}"));
669 return result;
670 }
671 };
672
673 let mut sender_identity_document: Option<Map<String, Value>> = None;
674
675 let processing_result = (|| -> AcpResult<()> {
676 self.validate_envelope_for_inbound(&request_message.envelope)?;
677 if !request_message
678 .envelope
679 .recipients
680 .iter()
681 .any(|recipient| recipient == self.agent_id())
682 {
683 return Err(AcpError::Processing {
684 reason: FailReason::PolicyRejected,
685 detail: format!("Recipient {} not in message recipients", self.agent_id()),
686 });
687 }
688
689 let sender_doc = self
690 .resolve_sender_identity_document(raw_message, &request_message.envelope.sender)?;
691 sender_identity_document = Some(sender_doc.clone());
692 let sender_signing_key = sender_doc
693 .get("keys")
694 .and_then(Value::as_object)
695 .and_then(|keys| keys.get("signing"))
696 .and_then(Value::as_object)
697 .and_then(|signing| signing.get("public_key"))
698 .and_then(Value::as_str)
699 .ok_or_else(|| AcpError::Processing {
700 reason: FailReason::InvalidSignature,
701 detail: "Sender signing key missing from identity document".to_string(),
702 })?;
703 if !crypto::verify_protected_payload_signature(
704 &request_message.envelope,
705 &request_message.protected_payload,
706 sender_signing_key,
707 ) {
708 return Err(AcpError::Processing {
709 reason: FailReason::InvalidSignature,
710 detail: "Signature verification failed".to_string(),
711 });
712 }
713
714 if self
715 .dedup
716 .is_duplicate(&request_message.envelope.message_id)
717 {
718 result.state = DeliveryState::Acknowledged;
719 result.detail = Some("Duplicate message acknowledged".to_string());
720 if !matches!(
721 request_message.envelope.message_class,
722 MessageClass::Ack | MessageClass::Fail
723 ) {
724 let duplicate_ack = self.create_response_message(
725 &sender_doc,
726 &request_message.envelope,
727 MessageClass::Ack,
728 build_ack_payload(&request_message.envelope.message_id, "duplicate"),
729 )?;
730 result.response_message = Some(duplicate_ack.to_map()?);
731 }
732 return Ok(());
733 }
734
735 let decrypted_payload = crypto::decrypt_for_recipient(
736 &request_message.envelope,
737 &request_message.protected_payload,
738 self.agent_id(),
739 &self.identity.encryption_private_key,
740 )?;
741 result.decrypted_payload = Some(decrypted_payload.clone());
742
743 let response_message =
744 if request_message.envelope.message_class == MessageClass::Capabilities {
745 Some(self.create_response_message(
746 &sender_doc,
747 &request_message.envelope,
748 MessageClass::Capabilities,
749 self.capabilities.to_map(),
750 )?)
751 } else {
752 let mut ack_payload =
753 build_ack_payload(&request_message.envelope.message_id, "accepted");
754 if let Some(handler) = handler
755 && let Some(handler_payload) =
756 handler(&decrypted_payload, &request_message.envelope)
757 && !handler_payload.is_empty()
758 {
759 ack_payload.insert("handler".to_string(), Value::Object(handler_payload));
760 }
761 if matches!(
762 request_message.envelope.message_class,
763 MessageClass::Ack | MessageClass::Fail
764 ) {
765 None
766 } else {
767 Some(self.create_response_message(
768 &sender_doc,
769 &request_message.envelope,
770 MessageClass::Ack,
771 ack_payload,
772 )?)
773 }
774 };
775
776 self.dedup
777 .mark_processed(&request_message.envelope.message_id);
778 result.state = DeliveryState::Acknowledged;
779 result.response_message = response_message
780 .map(|message| message.to_map())
781 .transpose()?;
782 Ok(())
783 })();
784
785 if let Err(exc) = processing_result {
786 let (reason_code, detail) = match exc {
787 AcpError::Processing { reason, detail } => (reason.as_str().to_string(), detail),
788 _ => (
789 FailReason::PolicyRejected.as_str().to_string(),
790 exc.to_string(),
791 ),
792 };
793 result.reason_code = Some(reason_code.clone());
794 result.detail = Some(detail.clone());
795 if let Some(sender_doc) = sender_identity_document {
796 let fail_response = self.create_response_message(
797 &sender_doc,
798 &request_message.envelope,
799 MessageClass::Fail,
800 build_fail_payload(reason_code, detail, false),
801 );
802 result.response_message =
803 fail_response.ok().and_then(|message| message.to_map().ok());
804 }
805 }
806 result
807 }
808
809 pub fn request_capabilities(&mut self, recipient: &str) -> AcpResult<CapabilityRequestResult> {
810 let mut payload = Map::new();
811 payload.insert(
812 "request".to_string(),
813 Value::String("capabilities".to_string()),
814 );
815 let result = self.send(
816 vec![recipient.to_string()],
817 payload,
818 Some(format!("capabilities:{}", Uuid::new_v4())),
819 MessageClass::Capabilities,
820 300,
821 None,
822 None,
823 Some(self.default_delivery_mode),
824 )?;
825 let mut response_payload = None;
826 for outcome in &result.outcomes {
827 let Some(response_message) = &outcome.response_message else {
828 continue;
829 };
830 if let Ok(decrypted) = self.decrypt_message_for_self(response_message)
831 && decrypted.message.envelope.message_class == MessageClass::Capabilities
832 {
833 response_payload = Some(decrypted.payload);
834 break;
835 }
836 }
837 Ok(CapabilityRequestResult {
838 result,
839 capabilities: response_payload,
840 })
841 }
842
843 pub fn consume_from_amqp<F>(
844 &mut self,
845 max_messages: usize,
846 handler: Option<F>,
847 ) -> AcpResult<usize>
848 where
849 F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
850 {
851 let amqp_transport = self.amqp_transport.clone().ok_or_else(|| {
852 AcpError::Transport("consume_from_amqp requires an AMQP-configured agent".to_string())
853 })?;
854 let amqp_service = self
855 .identity_document
856 .get("service")
857 .and_then(Value::as_object)
858 .and_then(|service| service.get("amqp"))
859 .and_then(Value::as_object)
860 .cloned()
861 .ok_or_else(|| {
862 AcpError::Transport(
863 "Identity document is missing service.amqp configuration".to_string(),
864 )
865 })?;
866 let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
867 let agent_for_handler = Arc::clone(&agent_ptr);
868 let processed = amqp_transport.consume(
869 self.agent_id(),
870 move |raw_message| {
871 let Ok(mut guard) = agent_for_handler.lock() else {
872 return false;
873 };
874 let inbound = guard.receive(
875 raw_message,
876 handler.as_ref().map(|inner| inner as &InboundHandlerFn),
877 );
878 if let Some(response_message) = inbound.response_message.clone() {
879 if guard
880 .publish_amqp_response_message(raw_message, &response_message)
881 .is_err()
882 {
883 return false;
884 }
885 }
886 matches!(
887 inbound.state,
888 DeliveryState::Acknowledged
889 | DeliveryState::Failed
890 | DeliveryState::Declined
891 | DeliveryState::Expired
892 )
893 },
894 Some(&amqp_service),
895 max_messages,
896 )?;
897 if let Ok(guard) = agent_ptr.lock() {
898 *self = guard.clone();
899 }
900 Ok(processed)
901 }
902
903 pub fn consume_from_mqtt<F>(
904 &mut self,
905 max_messages: usize,
906 handler: Option<F>,
907 ) -> AcpResult<usize>
908 where
909 F: Fn(&Map<String, Value>, &Envelope) -> Option<Map<String, Value>> + Send + 'static,
910 {
911 let mqtt_transport = self.mqtt_transport.clone().ok_or_else(|| {
912 AcpError::Transport("consume_from_mqtt requires an MQTT-configured agent".to_string())
913 })?;
914 let mqtt_service = self
915 .identity_document
916 .get("service")
917 .and_then(Value::as_object)
918 .and_then(|service| service.get("mqtt"))
919 .and_then(Value::as_object)
920 .cloned()
921 .ok_or_else(|| {
922 AcpError::Transport(
923 "Identity document is missing service.mqtt configuration".to_string(),
924 )
925 })?;
926 let agent_ptr = Arc::new(std::sync::Mutex::new(self.clone()));
927 let agent_for_handler = Arc::clone(&agent_ptr);
928 let processed = mqtt_transport.consume(
929 self.agent_id(),
930 move |raw_message| {
931 let Ok(mut guard) = agent_for_handler.lock() else {
932 return false;
933 };
934 let inbound = guard.receive(
935 raw_message,
936 handler.as_ref().map(|inner| inner as &InboundHandlerFn),
937 );
938 if let Some(response_message) = inbound.response_message.clone() {
939 if guard
940 .publish_mqtt_response_message(raw_message, &response_message)
941 .is_err()
942 {
943 return false;
944 }
945 }
946 matches!(
947 inbound.state,
948 DeliveryState::Acknowledged
949 | DeliveryState::Failed
950 | DeliveryState::Declined
951 | DeliveryState::Expired
952 )
953 },
954 Some(&mqtt_service),
955 max_messages,
956 Duration::from_secs(1),
957 )?;
958 if let Ok(guard) = agent_ptr.lock() {
959 *self = guard.clone();
960 }
961 Ok(processed)
962 }
963
964 fn resolve_recipients(
965 &mut self,
966 recipients: &[String],
967 mode: DeliveryMode,
968 ) -> AcpResult<ResolvedRecipients> {
969 let mut deliverable = Vec::new();
970 let mut preflight_outcomes = Vec::new();
971 for recipient in recipients {
972 let identity_doc = match self.discovery.resolve(recipient) {
973 Ok(identity_doc) => identity_doc,
974 Err(exc) => {
975 preflight_outcomes.push(failed_outcome(
976 recipient,
977 FailReason::PolicyRejected.as_str(),
978 &exc.to_string(),
979 ));
980 continue;
981 }
982 };
983 let remote_capabilities = AgentCapabilities::from_map(
984 identity_doc.get("capabilities").and_then(Value::as_object),
985 recipient,
986 );
987 let capability_match = self.capabilities.choose_compatible(&remote_capabilities);
988 if !capability_match.compatible {
989 preflight_outcomes.push(failed_outcome(
990 recipient,
991 reason_for_capability_mismatch(capability_match.reason.as_deref()).as_str(),
992 capability_match
993 .reason
994 .as_deref()
995 .unwrap_or("No compatible capabilities"),
996 ));
997 continue;
998 }
999 let choice = self.choose_delivery_channel(&remote_capabilities, &identity_doc, mode)?;
1000 let Some(channel) = choice.channel else {
1001 preflight_outcomes.push(failed_outcome(
1002 recipient,
1003 FailReason::PolicyRejected.as_str(),
1004 choice
1005 .detail
1006 .as_deref()
1007 .unwrap_or("Delivery channel unavailable"),
1008 ));
1009 continue;
1010 };
1011 let recipient_public_key = identity_doc
1012 .get("keys")
1013 .and_then(Value::as_object)
1014 .and_then(|keys| keys.get("encryption"))
1015 .and_then(Value::as_object)
1016 .and_then(|enc| enc.get("public_key"))
1017 .and_then(Value::as_str)
1018 .map(str::to_string);
1019 let Some(recipient_public_key) = recipient_public_key else {
1020 preflight_outcomes.push(failed_outcome(
1021 recipient,
1022 FailReason::PolicyRejected.as_str(),
1023 "Recipient identity document missing encryption public key",
1024 ));
1025 continue;
1026 };
1027 deliverable.push(ResolvedRecipient {
1028 recipient: recipient.to_string(),
1029 public_key: recipient_public_key,
1030 channel,
1031 endpoint: choice.endpoint,
1032 amqp_service: choice.amqp_service,
1033 mqtt_service: choice.mqtt_service,
1034 });
1035 }
1036 Ok(ResolvedRecipients {
1037 deliverable,
1038 preflight_outcomes,
1039 })
1040 }
1041
1042 fn choose_delivery_channel(
1043 &self,
1044 remote_capabilities: &AgentCapabilities,
1045 identity_document: &Map<String, Value>,
1046 mode: DeliveryMode,
1047 ) -> AcpResult<ChannelChoice> {
1048 let remote_transports: HashSet<String> = remote_capabilities
1049 .transports
1050 .iter()
1051 .map(|t| t.to_lowercase())
1052 .collect();
1053 let shared = self
1054 .capabilities
1055 .transports
1056 .iter()
1057 .filter(|transport| remote_transports.contains(&transport.to_lowercase()))
1058 .map(|transport| transport.to_lowercase())
1059 .collect::<Vec<_>>();
1060
1061 let service = identity_document
1062 .get("service")
1063 .and_then(Value::as_object)
1064 .cloned()
1065 .unwrap_or_default();
1066 let direct_endpoint = service
1067 .get("direct_endpoint")
1068 .and_then(Value::as_str)
1069 .map(str::to_string);
1070 let has_direct = direct_endpoint
1071 .as_deref()
1072 .map(|endpoint| !endpoint.trim().is_empty())
1073 .unwrap_or(false);
1074 let direct_available = has_direct
1075 && shared
1076 .iter()
1077 .any(|transport| matches!(transport.as_str(), "https" | "http" | "direct"));
1078 let relay_available =
1079 !self.relay_url.trim().is_empty() && shared.iter().any(|t| t == "relay");
1080 let amqp_service = service.get("amqp").and_then(Value::as_object).cloned();
1081 let amqp_available = shared.iter().any(|t| t == "amqp")
1082 && amqp_service
1083 .as_ref()
1084 .and_then(|service| service.get("broker_url"))
1085 .and_then(Value::as_str)
1086 .map(str::trim)
1087 .filter(|v| !v.is_empty())
1088 .is_some();
1089 let mqtt_service = service.get("mqtt").and_then(Value::as_object).cloned();
1090 let mqtt_available = shared.iter().any(|t| t == "mqtt")
1091 && mqtt_service
1092 .as_ref()
1093 .and_then(|service| service.get("broker_url"))
1094 .and_then(Value::as_str)
1095 .map(str::trim)
1096 .filter(|v| !v.is_empty())
1097 .is_some()
1098 && mqtt_service
1099 .as_ref()
1100 .and_then(|service| service.get("topic"))
1101 .and_then(Value::as_str)
1102 .map(str::trim)
1103 .filter(|v| !v.is_empty())
1104 .is_some();
1105
1106 match mode {
1107 DeliveryMode::Direct => {
1108 if direct_available {
1109 return Ok(ChannelChoice::new(
1110 Some("direct".to_string()),
1111 direct_endpoint,
1112 None,
1113 None,
1114 None,
1115 ));
1116 }
1117 return Ok(ChannelChoice::new(
1118 None,
1119 None,
1120 None,
1121 None,
1122 Some("No compatible direct transport and endpoint available".to_string()),
1123 ));
1124 }
1125 DeliveryMode::Relay => {
1126 if relay_available {
1127 return Ok(ChannelChoice::new(
1128 Some("relay".to_string()),
1129 None,
1130 None,
1131 None,
1132 None,
1133 ));
1134 }
1135 return Ok(ChannelChoice::new(
1136 None,
1137 None,
1138 None,
1139 None,
1140 Some("No compatible relay transport available".to_string()),
1141 ));
1142 }
1143 DeliveryMode::Amqp => {
1144 if amqp_available {
1145 return Ok(ChannelChoice::new(
1146 Some("amqp".to_string()),
1147 None,
1148 amqp_service,
1149 None,
1150 None,
1151 ));
1152 }
1153 return Ok(ChannelChoice::new(
1154 None,
1155 None,
1156 None,
1157 None,
1158 Some("No compatible AMQP transport available".to_string()),
1159 ));
1160 }
1161 DeliveryMode::Mqtt => {
1162 if mqtt_available {
1163 return Ok(ChannelChoice::new(
1164 Some("mqtt".to_string()),
1165 None,
1166 None,
1167 mqtt_service,
1168 None,
1169 ));
1170 }
1171 return Ok(ChannelChoice::new(
1172 None,
1173 None,
1174 None,
1175 None,
1176 Some("No compatible MQTT transport available".to_string()),
1177 ));
1178 }
1179 DeliveryMode::Auto => {}
1180 }
1181
1182 if direct_available {
1183 return Ok(ChannelChoice::new(
1184 Some("direct".to_string()),
1185 direct_endpoint,
1186 None,
1187 None,
1188 None,
1189 ));
1190 }
1191 if relay_available {
1192 return Ok(ChannelChoice::new(
1193 Some("relay".to_string()),
1194 None,
1195 None,
1196 None,
1197 None,
1198 ));
1199 }
1200 if amqp_available {
1201 return Ok(ChannelChoice::new(
1202 Some("amqp".to_string()),
1203 None,
1204 amqp_service,
1205 None,
1206 None,
1207 ));
1208 }
1209 if mqtt_available {
1210 return Ok(ChannelChoice::new(
1211 Some("mqtt".to_string()),
1212 None,
1213 None,
1214 mqtt_service,
1215 None,
1216 ));
1217 }
1218 if has_direct {
1219 return Ok(ChannelChoice::new(
1220 None,
1221 None,
1222 None,
1223 None,
1224 Some(
1225 "No compatible transport implementation available for this recipient"
1226 .to_string(),
1227 ),
1228 ));
1229 }
1230 if amqp_service.is_some() {
1231 return Ok(ChannelChoice::new(
1232 None,
1233 None,
1234 None,
1235 None,
1236 Some(
1237 "AMQP transport is advertised but not compatible with sender capabilities"
1238 .to_string(),
1239 ),
1240 ));
1241 }
1242 if mqtt_service.is_some() {
1243 return Ok(ChannelChoice::new(
1244 None,
1245 None,
1246 None,
1247 None,
1248 Some(
1249 "MQTT transport is advertised but not compatible with sender capabilities"
1250 .to_string(),
1251 ),
1252 ));
1253 }
1254 Ok(ChannelChoice::new(
1255 None,
1256 None,
1257 None,
1258 None,
1259 Some(
1260 "Recipient identity document is missing direct_endpoint/amqp/mqtt and no relay fallback is compatible"
1261 .to_string(),
1262 ),
1263 ))
1264 }
1265
1266 #[allow(clippy::too_many_arguments)]
1267 fn build_message(
1268 &self,
1269 recipients: Vec<String>,
1270 payload: &Map<String, Value>,
1271 recipient_public_keys: &HashMap<String, String>,
1272 message_class: MessageClass,
1273 context_id: &str,
1274 operation_id: Option<String>,
1275 expires_in_seconds: i64,
1276 correlation_id: Option<String>,
1277 in_reply_to: Option<String>,
1278 ) -> AcpResult<AcpMessage> {
1279 let envelope = Envelope::build(
1280 self.agent_id().to_string(),
1281 recipients,
1282 message_class,
1283 context_id.to_string(),
1284 expires_in_seconds,
1285 operation_id,
1286 None,
1287 correlation_id,
1288 in_reply_to,
1289 Some(DEFAULT_CRYPTO_SUITE.to_string()),
1290 )?;
1291 let mut protected_payload =
1292 crypto::encrypt_for_recipients(payload, &envelope, recipient_public_keys)?;
1293 crypto::sign_protected_payload(
1294 &envelope,
1295 &mut protected_payload,
1296 &self.identity.signing_private_key,
1297 &self.identity.signing_kid,
1298 )?;
1299 Ok(AcpMessage {
1300 envelope,
1301 protected_payload,
1302 sender_identity_document: Some(self.identity_document.clone()),
1303 })
1304 }
1305
1306 fn deliver_direct(
1307 &self,
1308 message: &AcpMessage,
1309 targets: &[ResolvedRecipient],
1310 ) -> Vec<DeliveryOutcome> {
1311 let mut outcomes = Vec::new();
1312 let message_map = match message.to_map() {
1313 Ok(map) => map,
1314 Err(exc) => {
1315 for target in targets {
1316 outcomes.push(failed_outcome(
1317 &target.recipient,
1318 FailReason::PolicyRejected.as_str(),
1319 &format!("Message serialization failure: {exc}"),
1320 ));
1321 }
1322 return outcomes;
1323 }
1324 };
1325 for target in targets {
1326 let Some(endpoint) = target.endpoint.as_deref() else {
1327 outcomes.push(failed_outcome(
1328 &target.recipient,
1329 FailReason::PolicyRejected.as_str(),
1330 "Missing direct endpoint for direct delivery",
1331 ));
1332 continue;
1333 };
1334 match self.transport.post_json(endpoint, &message_map) {
1335 Ok(response) => {
1336 outcomes.push(outcome_from_http_response(&target.recipient, &response))
1337 }
1338 Err(exc) => outcomes.push(failed_outcome(
1339 &target.recipient,
1340 FailReason::PolicyRejected.as_str(),
1341 &format!("Direct transport failure: {exc}"),
1342 )),
1343 }
1344 }
1345 outcomes
1346 }
1347
1348 fn deliver_via_relay(
1349 &self,
1350 message: &AcpMessage,
1351 targets: &[ResolvedRecipient],
1352 ) -> Vec<DeliveryOutcome> {
1353 let mut outcomes = Vec::new();
1354 match self.transport.send_to_relay(&self.relay_url, message) {
1355 Ok(relay_response) => {
1356 let mut delivered = HashSet::new();
1357 if let Some(raw_outcomes) = relay_response.get("outcomes").and_then(Value::as_array)
1358 {
1359 for item in raw_outcomes {
1360 if let Ok(outcome) = serde_json::from_value::<DeliveryOutcome>(item.clone())
1361 {
1362 delivered.insert(outcome.recipient.clone());
1363 outcomes.push(outcome);
1364 }
1365 }
1366 }
1367 for target in targets {
1368 if !delivered.contains(&target.recipient) {
1369 outcomes.push(failed_outcome(
1370 &target.recipient,
1371 FailReason::PolicyRejected.as_str(),
1372 "Relay did not return an outcome for recipient",
1373 ));
1374 }
1375 }
1376 }
1377 Err(exc) => {
1378 for target in targets {
1379 outcomes.push(failed_outcome(
1380 &target.recipient,
1381 FailReason::PolicyRejected.as_str(),
1382 &format!("Relay transport failure: {exc}"),
1383 ));
1384 }
1385 }
1386 }
1387 outcomes
1388 }
1389
1390 fn deliver_via_amqp(
1391 &self,
1392 message: &AcpMessage,
1393 target: &ResolvedRecipient,
1394 ) -> DeliveryOutcome {
1395 let mut outcome = DeliveryOutcome {
1396 recipient: target.recipient.clone(),
1397 state: DeliveryState::Pending,
1398 status_code: None,
1399 response_class: None,
1400 reason_code: None,
1401 detail: None,
1402 response_message: None,
1403 };
1404 let message_map = match message.to_map() {
1405 Ok(message_map) => message_map,
1406 Err(exc) => {
1407 outcome.state = DeliveryState::Failed;
1408 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1409 outcome.detail = Some(format!("AMQP message serialization failure: {exc}"));
1410 return outcome;
1411 }
1412 };
1413 let result = (|| -> AcpResult<()> {
1414 let client = if let Some(amqp_transport) = &self.amqp_transport {
1415 amqp_transport.clone()
1416 } else {
1417 let broker_url = target
1418 .amqp_service
1419 .as_ref()
1420 .and_then(|service| service.get("broker_url"))
1421 .and_then(Value::as_str)
1422 .ok_or_else(|| {
1423 AcpError::Transport(
1424 "AMQP delivery selected but sender is not configured with an AMQP broker"
1425 .to_string(),
1426 )
1427 })?;
1428 AmqpTransportClient::new(
1429 broker_url.to_string(),
1430 target
1431 .amqp_service
1432 .as_ref()
1433 .and_then(|service| service.get("exchange"))
1434 .and_then(Value::as_str)
1435 .map(str::to_string),
1436 None,
1437 10,
1438 )?
1439 };
1440 client.publish(
1441 &message_map,
1442 &target.recipient,
1443 target.amqp_service.as_ref(),
1444 )
1445 })();
1446 match result {
1447 Ok(()) => outcome.state = DeliveryState::Delivered,
1448 Err(exc) => {
1449 outcome.state = DeliveryState::Failed;
1450 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1451 outcome.detail = Some(format!("AMQP transport failure: {exc}"));
1452 }
1453 }
1454 outcome
1455 }
1456
1457 fn deliver_via_mqtt(
1458 &self,
1459 message: &AcpMessage,
1460 target: &ResolvedRecipient,
1461 ) -> DeliveryOutcome {
1462 let mut outcome = DeliveryOutcome {
1463 recipient: target.recipient.clone(),
1464 state: DeliveryState::Pending,
1465 status_code: None,
1466 response_class: None,
1467 reason_code: None,
1468 detail: None,
1469 response_message: None,
1470 };
1471 let message_map = match message.to_map() {
1472 Ok(message_map) => message_map,
1473 Err(exc) => {
1474 outcome.state = DeliveryState::Failed;
1475 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1476 outcome.detail = Some(format!("MQTT message serialization failure: {exc}"));
1477 return outcome;
1478 }
1479 };
1480 let result = (|| -> AcpResult<()> {
1481 let client = if let Some(mqtt_transport) = &self.mqtt_transport {
1482 mqtt_transport.clone()
1483 } else {
1484 let service = target.mqtt_service.as_ref().ok_or_else(|| {
1485 AcpError::Transport(
1486 "MQTT delivery selected but sender is not configured with an MQTT broker"
1487 .to_string(),
1488 )
1489 })?;
1490 let broker_url = service
1491 .get("broker_url")
1492 .and_then(Value::as_str)
1493 .ok_or_else(|| {
1494 AcpError::Transport(
1495 "MQTT delivery selected but sender is not configured with an MQTT broker"
1496 .to_string(),
1497 )
1498 })?;
1499 MqttTransportClient::new(
1500 broker_url.to_string(),
1501 service.get("qos").and_then(|v| v.as_u64()).map(|v| v as u8),
1502 None,
1503 10,
1504 30,
1505 )?
1506 };
1507 client.publish(
1508 &message_map,
1509 &target.recipient,
1510 target.mqtt_service.as_ref(),
1511 )
1512 })();
1513 match result {
1514 Ok(()) => outcome.state = DeliveryState::Delivered,
1515 Err(exc) => {
1516 outcome.state = DeliveryState::Failed;
1517 outcome.reason_code = Some(FailReason::PolicyRejected.as_str().to_string());
1518 outcome.detail = Some(format!("MQTT transport failure: {exc}"));
1519 }
1520 }
1521 outcome
1522 }
1523
1524 fn publish_amqp_response_message(
1525 &mut self,
1526 raw_message: &Map<String, Value>,
1527 response_message: &Map<String, Value>,
1528 ) -> AcpResult<()> {
1529 let amqp_transport = self
1530 .amqp_transport
1531 .clone()
1532 .ok_or_else(|| AcpError::Transport("AMQP transport is not configured".to_string()))?;
1533 let sender_id = raw_message
1534 .get("envelope")
1535 .and_then(Value::as_object)
1536 .and_then(|envelope| envelope.get("sender"))
1537 .and_then(Value::as_str)
1538 .ok_or_else(|| {
1539 AcpError::Transport(
1540 "Inbound message sender is missing for AMQP response routing".to_string(),
1541 )
1542 })?;
1543 let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1544 let sender_amqp_service = sender_identity
1545 .get("service")
1546 .and_then(Value::as_object)
1547 .and_then(|service| service.get("amqp"))
1548 .and_then(Value::as_object)
1549 .cloned()
1550 .ok_or_else(|| {
1551 AcpError::Transport(format!(
1552 "Sender {sender_id} does not advertise service.amqp for AMQP response delivery"
1553 ))
1554 })?;
1555 amqp_transport.publish(response_message, sender_id, Some(&sender_amqp_service))
1556 }
1557
1558 fn publish_mqtt_response_message(
1559 &mut self,
1560 raw_message: &Map<String, Value>,
1561 response_message: &Map<String, Value>,
1562 ) -> AcpResult<()> {
1563 let mqtt_transport = self
1564 .mqtt_transport
1565 .clone()
1566 .ok_or_else(|| AcpError::Transport("MQTT transport is not configured".to_string()))?;
1567 let sender_id = raw_message
1568 .get("envelope")
1569 .and_then(Value::as_object)
1570 .and_then(|envelope| envelope.get("sender"))
1571 .and_then(Value::as_str)
1572 .ok_or_else(|| {
1573 AcpError::Transport(
1574 "Inbound message sender is missing for MQTT response routing".to_string(),
1575 )
1576 })?;
1577 let sender_identity = self.resolve_sender_identity_document(raw_message, sender_id)?;
1578 let sender_mqtt_service = sender_identity
1579 .get("service")
1580 .and_then(Value::as_object)
1581 .and_then(|service| service.get("mqtt"))
1582 .and_then(Value::as_object)
1583 .cloned()
1584 .ok_or_else(|| {
1585 AcpError::Transport(format!(
1586 "Sender {sender_id} does not advertise service.mqtt for MQTT response delivery"
1587 ))
1588 })?;
1589 mqtt_transport.publish(response_message, sender_id, Some(&sender_mqtt_service))
1590 }
1591
1592 fn resolve_sender_identity_document(
1593 &mut self,
1594 raw_message: &Map<String, Value>,
1595 sender_id: &str,
1596 ) -> AcpResult<Map<String, Value>> {
1597 if let Some(embedded) = raw_message
1598 .get("sender_identity_document")
1599 .and_then(Value::as_object)
1600 .cloned()
1601 && embedded.get("agent_id").and_then(Value::as_str) == Some(sender_id)
1602 && verify_identity_document(&embedded)
1603 {
1604 let _ = self.discovery.register_identity_document(embedded.clone());
1607 return Ok(embedded);
1608 }
1609 self.discovery.resolve(sender_id)
1610 }
1611
1612 fn validate_envelope_for_inbound(&self, envelope: &Envelope) -> AcpResult<()> {
1613 if envelope.acp_version != ACP_VERSION {
1614 return Err(AcpError::Processing {
1615 reason: FailReason::UnsupportedVersion,
1616 detail: format!("Unsupported ACP version: {}", envelope.acp_version),
1617 });
1618 }
1619 if envelope.crypto_suite != DEFAULT_CRYPTO_SUITE {
1620 return Err(AcpError::Processing {
1621 reason: FailReason::UnsupportedCryptoSuite,
1622 detail: format!("Unsupported crypto suite: {}", envelope.crypto_suite),
1623 });
1624 }
1625 if envelope.is_expired() {
1626 return Err(AcpError::Processing {
1627 reason: FailReason::ExpiredMessage,
1628 detail: "Message is expired".to_string(),
1629 });
1630 }
1631 Ok(())
1632 }
1633
1634 fn create_response_message(
1635 &self,
1636 sender_identity_document: &Map<String, Value>,
1637 request_envelope: &Envelope,
1638 response_class: MessageClass,
1639 response_payload: Map<String, Value>,
1640 ) -> AcpResult<AcpMessage> {
1641 let sender_id = &request_envelope.sender;
1642 let sender_encryption_public_key = sender_identity_document
1643 .get("keys")
1644 .and_then(Value::as_object)
1645 .and_then(|keys| keys.get("encryption"))
1646 .and_then(Value::as_object)
1647 .and_then(|encryption| encryption.get("public_key"))
1648 .and_then(Value::as_str)
1649 .ok_or_else(|| AcpError::Processing {
1650 reason: FailReason::PolicyRejected,
1651 detail: "Sender identity document missing encryption key".to_string(),
1652 })?;
1653 self.build_message(
1654 vec![sender_id.to_string()],
1655 &response_payload,
1656 &HashMap::from([(
1657 sender_id.to_string(),
1658 sender_encryption_public_key.to_string(),
1659 )]),
1660 response_class,
1661 &request_envelope.context_id,
1662 Some(request_envelope.operation_id.clone()),
1663 300,
1664 request_envelope
1665 .correlation_id
1666 .clone()
1667 .or_else(|| Some(request_envelope.operation_id.clone())),
1668 Some(request_envelope.message_id.clone()),
1669 )
1670 }
1671
1672 fn sync_delivery_states(&mut self, operation_id: &str, outcomes: &[DeliveryOutcome]) {
1673 let mut states = HashMap::new();
1674 for outcome in outcomes {
1675 states.insert(
1676 outcome.recipient.clone(),
1677 format!("{:?}", outcome.state).to_uppercase(),
1678 );
1679 }
1680 self.delivery_states
1681 .insert(operation_id.to_string(), states);
1682 }
1683}
1684
1685#[derive(Debug, Clone)]
1686struct ResolvedRecipient {
1687 recipient: String,
1688 public_key: String,
1689 channel: String,
1690 endpoint: Option<String>,
1691 amqp_service: Option<Map<String, Value>>,
1692 mqtt_service: Option<Map<String, Value>>,
1693}
1694
1695#[derive(Debug, Clone)]
1696struct ResolvedRecipients {
1697 deliverable: Vec<ResolvedRecipient>,
1698 preflight_outcomes: Vec<DeliveryOutcome>,
1699}
1700
1701#[derive(Debug, Clone)]
1702struct ChannelChoice {
1703 channel: Option<String>,
1704 endpoint: Option<String>,
1705 amqp_service: Option<Map<String, Value>>,
1706 mqtt_service: Option<Map<String, Value>>,
1707 detail: Option<String>,
1708}
1709
1710impl ChannelChoice {
1711 fn new(
1712 channel: Option<String>,
1713 endpoint: Option<String>,
1714 amqp_service: Option<Map<String, Value>>,
1715 mqtt_service: Option<Map<String, Value>>,
1716 detail: Option<String>,
1717 ) -> Self {
1718 Self {
1719 channel,
1720 endpoint,
1721 amqp_service,
1722 mqtt_service,
1723 detail,
1724 }
1725 }
1726}
1727
1728#[derive(Debug, Clone)]
1729struct DedupStore {
1730 ttl: Duration,
1731 processed: HashMap<String, Instant>,
1732}
1733
1734impl DedupStore {
1735 fn new(ttl: Duration) -> Self {
1736 Self {
1737 ttl,
1738 processed: HashMap::new(),
1739 }
1740 }
1741
1742 fn is_duplicate(&mut self, message_id: &str) -> bool {
1743 self.cleanup_expired();
1744 self.processed.contains_key(message_id)
1745 }
1746
1747 fn mark_processed(&mut self, message_id: &str) {
1748 self.processed
1749 .insert(message_id.to_string(), Instant::now());
1750 }
1751
1752 fn cleanup_expired(&mut self) {
1753 let ttl = self.ttl;
1754 self.processed
1755 .retain(|_, timestamp| timestamp.elapsed() < ttl);
1756 }
1757}
1758
1759fn outcome_from_http_response(recipient: &str, response: &TransportResponse) -> DeliveryOutcome {
1760 let mut response_class = None;
1761 let mut response_message = None;
1762 let mut reason_code = None;
1763 let mut detail = None;
1764
1765 if let Some(body) = &response.body {
1766 if let Some(raw_response_message) = body.get("response_message").and_then(Value::as_object)
1767 {
1768 response_message = Some(raw_response_message.clone());
1769 response_class = raw_response_message
1770 .get("envelope")
1771 .and_then(Value::as_object)
1772 .and_then(|envelope| envelope.get("message_class"))
1773 .and_then(Value::as_str)
1774 .and_then(parse_message_class);
1775 }
1776 reason_code = body
1777 .get("reason_code")
1778 .and_then(Value::as_str)
1779 .map(str::to_string);
1780 detail = body
1781 .get("detail")
1782 .and_then(Value::as_str)
1783 .map(str::to_string);
1784 }
1785 if detail.is_none() && response.status_code >= 400 {
1786 detail = Some(format!("Recipient HTTP {}", response.status_code));
1787 }
1788
1789 DeliveryOutcome {
1790 recipient: recipient.to_string(),
1791 state: delivery_state_from_response(
1792 response.status_code,
1793 response_class,
1794 reason_code.as_deref(),
1795 ),
1796 status_code: Some(response.status_code),
1797 response_class,
1798 reason_code,
1799 detail,
1800 response_message,
1801 }
1802}
1803
1804fn delivery_state_from_response(
1805 status_code: u16,
1806 response_class: Option<MessageClass>,
1807 reason_code: Option<&str>,
1808) -> DeliveryState {
1809 if (200..300).contains(&status_code) {
1810 if response_class == Some(MessageClass::Fail) {
1811 if reason_code == Some(FailReason::ExpiredMessage.as_str()) {
1812 return DeliveryState::Expired;
1813 }
1814 if reason_code == Some(FailReason::PolicyRejected.as_str()) {
1815 return DeliveryState::Declined;
1816 }
1817 return DeliveryState::Failed;
1818 }
1819 if matches!(
1820 response_class,
1821 Some(MessageClass::Ack | MessageClass::Capabilities)
1822 ) {
1823 return DeliveryState::Acknowledged;
1824 }
1825 return DeliveryState::Delivered;
1826 }
1827 if status_code == 410 {
1828 return DeliveryState::Expired;
1829 }
1830 if [401, 403, 409, 422].contains(&status_code) {
1831 return DeliveryState::Declined;
1832 }
1833 DeliveryState::Failed
1834}
1835
1836fn parse_message_class(value: &str) -> Option<MessageClass> {
1837 match value {
1838 "SEND" => Some(MessageClass::Send),
1839 "ACK" => Some(MessageClass::Ack),
1840 "FAIL" => Some(MessageClass::Fail),
1841 "CAPABILITIES" => Some(MessageClass::Capabilities),
1842 "COMPENSATE" => Some(MessageClass::Compensate),
1843 _ => None,
1844 }
1845}
1846
1847fn failed_outcome(recipient: &str, reason_code: &str, detail: &str) -> DeliveryOutcome {
1848 DeliveryOutcome {
1849 recipient: recipient.to_string(),
1850 state: DeliveryState::Failed,
1851 status_code: None,
1852 response_class: None,
1853 reason_code: Some(reason_code.to_string()),
1854 detail: Some(detail.to_string()),
1855 response_message: None,
1856 }
1857}
1858
1859fn to_public_key_map(targets: &[ResolvedRecipient]) -> HashMap<String, String> {
1860 targets
1861 .iter()
1862 .map(|target| (target.recipient.clone(), target.public_key.clone()))
1863 .collect()
1864}
1865
1866fn reason_for_capability_mismatch(reason: Option<&str>) -> FailReason {
1867 let normalized = reason.unwrap_or_default().to_lowercase();
1868 if normalized.contains("protocol") {
1869 return FailReason::UnsupportedVersion;
1870 }
1871 if normalized.contains("crypto") {
1872 return FailReason::UnsupportedCryptoSuite;
1873 }
1874 if normalized.contains("profile") {
1875 return FailReason::UnsupportedProfile;
1876 }
1877 FailReason::PolicyRejected
1878}
1879
1880fn build_local_amqp_service(
1881 agent_id: &str,
1882 options: &AcpAgentOptions,
1883) -> AcpResult<Option<Map<String, Value>>> {
1884 let Some(broker_url) = options.amqp_broker_url.as_deref() else {
1885 return Ok(None);
1886 };
1887 Ok(Some(AmqpTransportClient::build_service_hint(
1888 agent_id,
1889 broker_url,
1890 Some(&options.amqp_exchange),
1891 )?))
1892}
1893
1894fn build_local_mqtt_service(
1895 agent_id: &str,
1896 options: &AcpAgentOptions,
1897) -> AcpResult<Option<Map<String, Value>>> {
1898 let Some(broker_url) = options.mqtt_broker_url.as_deref() else {
1899 return Ok(None);
1900 };
1901 Ok(Some(MqttTransportClient::build_service_hint(
1902 agent_id,
1903 broker_url,
1904 None,
1905 Some(options.mqtt_qos),
1906 Some(&options.mqtt_topic_prefix),
1907 )?))
1908}
1909
1910fn apply_http_security_profile(identity_document: &mut Map<String, Value>, mtls_enabled: bool) {
1911 if !mtls_enabled {
1912 return;
1913 }
1914 let mut service = identity_document
1915 .get("service")
1916 .and_then(Value::as_object)
1917 .cloned()
1918 .unwrap_or_default();
1919 let direct_endpoint = service
1920 .get("direct_endpoint")
1921 .and_then(Value::as_str)
1922 .map(str::to_string);
1923 let relay_hints = service
1924 .get("relay_hints")
1925 .and_then(Value::as_array)
1926 .map(|items| {
1927 items
1928 .iter()
1929 .filter_map(Value::as_str)
1930 .map(str::to_string)
1931 .collect::<Vec<_>>()
1932 })
1933 .unwrap_or_default();
1934 if let Some(endpoint) = direct_endpoint
1935 && !endpoint.trim().is_empty()
1936 {
1937 service.insert(
1938 "http".to_string(),
1939 serde_json::json!({
1940 "endpoint": endpoint,
1941 "security_profile": "mtls",
1942 }),
1943 );
1944 }
1945 if let Some(relay_endpoint) = relay_hints.first()
1946 && !relay_endpoint.trim().is_empty()
1947 {
1948 service.insert(
1949 "relay".to_string(),
1950 serde_json::json!({
1951 "endpoint": relay_endpoint,
1952 "security_profile": "mtls",
1953 }),
1954 );
1955 }
1956 identity_document.insert("service".to_string(), Value::Object(service));
1957}
1958
1959fn resolve_key_provider(options: &AcpAgentOptions) -> AcpResult<Box<dyn KeyProvider>> {
1960 if let Some(provider) = options.key_provider_instance.clone() {
1961 return Ok(Box::new(ArcKeyProvider(provider)));
1962 }
1963 let provider_name = normalize_key_provider_name(&options.key_provider);
1964 match provider_name.as_str() {
1965 "local" => Ok(Box::new(LocalKeyProvider::new(
1966 options.storage_dir.clone(),
1967 options.cert_file.clone(),
1968 options.key_file.clone(),
1969 options.ca_file.clone(),
1970 ))),
1971 "vault" => {
1972 let vault_url = options.vault_url.clone().ok_or_else(|| {
1973 AcpError::Validation("vault_url is required when key_provider=vault".to_string())
1974 })?;
1975 let vault_path = options.vault_path.clone().ok_or_else(|| {
1976 AcpError::Validation("vault_path is required when key_provider=vault".to_string())
1977 })?;
1978 Ok(Box::new(VaultKeyProvider::new(
1979 vault_url,
1980 vault_path,
1981 Some(options.vault_token_env.clone()),
1982 options.vault_token.clone(),
1983 options.http_timeout_seconds,
1984 options.ca_file.clone(),
1985 options.allow_insecure_tls,
1986 options.allow_insecure_http,
1987 )?))
1988 }
1989 _ => Err(AcpError::Validation(format!(
1990 "Unsupported key_provider: {}",
1991 options.key_provider
1992 ))),
1993 }
1994}
1995
1996fn normalize_key_provider_name(value: &str) -> String {
1997 let normalized = value.trim().to_lowercase();
1998 if normalized.is_empty() {
1999 "local".to_string()
2000 } else {
2001 normalized
2002 }
2003}
2004
2005fn matches_provider_local(key_provider_info: &KeyProviderInfo) -> bool {
2006 key_provider_info
2007 .get("provider")
2008 .and_then(Value::as_str)
2009 .map(|provider| provider == "local")
2010 .unwrap_or(false)
2011}
2012
2013fn identity_from_provider(agent_id: &str, keys: &IdentityKeyMaterial) -> AcpResult<AgentIdentity> {
2014 let mut missing = Vec::new();
2015 if keys
2016 .signing_public_key
2017 .as_deref()
2018 .map(str::trim)
2019 .filter(|v| !v.is_empty())
2020 .is_none()
2021 {
2022 missing.push("signing_public_key");
2023 }
2024 if keys
2025 .encryption_public_key
2026 .as_deref()
2027 .map(str::trim)
2028 .filter(|v| !v.is_empty())
2029 .is_none()
2030 {
2031 missing.push("encryption_public_key");
2032 }
2033 if keys
2034 .signing_kid
2035 .as_deref()
2036 .map(str::trim)
2037 .filter(|v| !v.is_empty())
2038 .is_none()
2039 {
2040 missing.push("signing_kid");
2041 }
2042 if keys
2043 .encryption_kid
2044 .as_deref()
2045 .map(str::trim)
2046 .filter(|v| !v.is_empty())
2047 .is_none()
2048 {
2049 missing.push("encryption_kid");
2050 }
2051 if !missing.is_empty() {
2052 return Err(AcpError::Validation(format!(
2053 "External key provider requires identity public metadata for first-time bootstrap: {}",
2054 missing.join(", ")
2055 )));
2056 }
2057 Ok(AgentIdentity {
2058 agent_id: agent_id.to_string(),
2059 signing_private_key: keys.signing_private_key.clone(),
2060 signing_public_key: keys.signing_public_key.clone().unwrap_or_default(),
2061 encryption_private_key: keys.encryption_private_key.clone(),
2062 encryption_public_key: keys.encryption_public_key.clone().unwrap_or_default(),
2063 signing_kid: keys.signing_kid.clone().unwrap_or_default(),
2064 encryption_kid: keys.encryption_kid.clone().unwrap_or_default(),
2065 })
2066}
2067
2068fn apply_provider_keys(
2069 identity: &AgentIdentity,
2070 keys: &IdentityKeyMaterial,
2071) -> AcpResult<AgentIdentity> {
2072 if let Some(signing_public_key) = keys.signing_public_key.as_deref()
2073 && signing_public_key != identity.signing_public_key
2074 {
2075 return Err(AcpError::Validation(
2076 "Key provider signing_public_key does not match local identity metadata".to_string(),
2077 ));
2078 }
2079 if let Some(encryption_public_key) = keys.encryption_public_key.as_deref()
2080 && encryption_public_key != identity.encryption_public_key
2081 {
2082 return Err(AcpError::Validation(
2083 "Key provider encryption_public_key does not match local identity metadata".to_string(),
2084 ));
2085 }
2086 if let Some(signing_kid) = keys.signing_kid.as_deref()
2087 && signing_kid != identity.signing_kid
2088 {
2089 return Err(AcpError::Validation(
2090 "Key provider signing_kid does not match local identity metadata".to_string(),
2091 ));
2092 }
2093 if let Some(encryption_kid) = keys.encryption_kid.as_deref()
2094 && encryption_kid != identity.encryption_kid
2095 {
2096 return Err(AcpError::Validation(
2097 "Key provider encryption_kid does not match local identity metadata".to_string(),
2098 ));
2099 }
2100 Ok(AgentIdentity {
2101 agent_id: identity.agent_id.clone(),
2102 signing_private_key: keys.signing_private_key.clone(),
2103 signing_public_key: identity.signing_public_key.clone(),
2104 encryption_private_key: keys.encryption_private_key.clone(),
2105 encryption_public_key: identity.encryption_public_key.clone(),
2106 signing_kid: identity.signing_kid.clone(),
2107 encryption_kid: identity.encryption_kid.clone(),
2108 })
2109}
2110
2111fn base_url_from_endpoint(endpoint: &str) -> Option<String> {
2112 let endpoint = endpoint.trim();
2113 if endpoint.is_empty() {
2114 return None;
2115 }
2116 let parsed = url::Url::parse(endpoint).ok()?;
2117 let scheme = parsed.scheme();
2118 let host = parsed.host_str()?;
2119 let authority = if let Some(port) = parsed.port() {
2120 format!("{host}:{port}")
2121 } else {
2122 host.to_string()
2123 };
2124 Some(format!("{scheme}://{authority}"))
2125}
2126
2127fn first_non_blank(values: &[Option<String>]) -> Option<String> {
2128 values
2129 .iter()
2130 .flatten()
2131 .map(|value| value.trim())
2132 .find(|value| !value.is_empty())
2133 .map(str::to_string)
2134}
2135
2136#[derive(Clone)]
2137struct ArcKeyProvider(Arc<dyn KeyProvider>);
2138
2139impl KeyProvider for ArcKeyProvider {
2140 fn load_identity_keys(&self, agent_id: &str) -> AcpResult<IdentityKeyMaterial> {
2141 self.0.load_identity_keys(agent_id)
2142 }
2143
2144 fn load_tls_material(&self, agent_id: &str) -> AcpResult<crate::key_provider::TlsMaterial> {
2145 self.0.load_tls_material(agent_id)
2146 }
2147
2148 fn load_ca_bundle(&self, agent_id: &str) -> AcpResult<Option<String>> {
2149 self.0.load_ca_bundle(agent_id)
2150 }
2151
2152 fn describe(&self) -> KeyProviderInfo {
2153 self.0.describe()
2154 }
2155}