tap_node/lib.rs
1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, stores transactions, and provides a scalable architecture for TAP deployments.
5//!
6//! # Architecture Overview
7//!
8//! The TAP Node acts as a message router and coordinator for multiple TAP Agents. It provides:
9//!
10//! - **Efficient Message Processing**: Different handling for plain, signed, and encrypted messages
11//! - **Centralized Verification**: Signed messages are verified once for all agents
12//! - **Smart Routing**: Encrypted messages are routed to appropriate recipient agents
13//! - **Scalable Design**: Supports multiple agents with concurrent message processing
14//!
15//! # Key Components
16//!
17//! - **Agent Registry**: Manages multiple TAP Agents
18//! - **Event Bus**: Publishes and distributes events throughout the system
19//! - **Message Processors**: Process incoming and outgoing messages
20//! - **Message Router**: Routes messages to the appropriate agent
21//! - **DID Resolver**: Resolves DIDs for signature verification
22//! - **Storage**: Persistent SQLite storage with transaction tracking and audit trails
23//!
24//! # Message Processing Flow
25//!
26//! The TAP Node uses an optimized message processing flow based on message type:
27//!
28//! ## Signed Messages (JWS)
29//! 1. **Single Verification**: Signature verified once using DID resolver
30//! 2. **Routing**: Verified PlainMessage routed to appropriate agent
31//! 3. **Processing**: Agent receives verified message via `receive_plain_message()`
32//!
33//! ## Encrypted Messages (JWE)
34//! 1. **Recipient Identification**: Extract recipient DIDs from JWE headers
35//! 2. **Agent Routing**: Send encrypted message to each matching agent
36//! 3. **Decryption**: Each agent attempts decryption via `receive_encrypted_message()`
37//! 4. **Processing**: Successfully decrypted messages are processed by the agent
38//!
39//! ## Plain Messages
40//! 1. **Direct Processing**: Plain messages processed through the pipeline
41//! 2. **Routing**: Routed to appropriate agent
42//! 3. **Delivery**: Agent receives via `receive_plain_message()`
43//!
44//! # Benefits of This Architecture
45//!
46//! - **Efficiency**: Signed messages verified once, not per-agent
47//! - **Scalability**: Encrypted messages naturally distributed to recipients
48//! - **Flexibility**: Agents remain fully functional standalone
49//! - **Security**: Centralized verification with distributed decryption
50//!
51//! # Thread Safety and Concurrency
52//!
53//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
54//! async/await patterns and synchronization primitives to safely handle multiple operations
55//! simultaneously. Most components within the node are either immutable or use interior
56//! mutability with appropriate synchronization.
57//!
58//! # Example Usage
59//!
60//! ```rust,no_run
61//! use tap_node::{TapNode, NodeConfig};
62//! use tap_agent::TapAgent;
63//! use std::sync::Arc;
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//! // Create node
68//! let config = NodeConfig::default();
69//! let node = Arc::new(TapNode::new(config));
70//!
71//! // Create and register agent
72//! let (agent, _did) = TapAgent::from_ephemeral_key().await?;
73//! node.register_agent(Arc::new(agent)).await?;
74//!
75//! // Process incoming message (JSON Value)
76//! let message_value = serde_json::json!({
77//! "id": "msg-123",
78//! "type": "test-message",
79//! "body": {"content": "Hello"}
80//! });
81//!
82//! node.receive_message(message_value).await?;
83//! Ok(())
84//! }
85//! ```
86
87pub mod agent;
88#[cfg(feature = "storage")]
89pub mod customer;
90pub mod error;
91pub mod event;
92pub mod message;
93#[cfg(feature = "storage")]
94pub mod state_machine;
95pub mod storage;
96#[cfg(feature = "storage")]
97pub mod validation;
98
99pub use error::{Error, Result};
100pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
101pub use event::{EventSubscriber, NodeEvent};
102pub use message::sender::{
103 HttpPlainMessageSender, HttpPlainMessageSenderWithTracking, NodePlainMessageSender,
104 PlainMessageSender, WebSocketPlainMessageSender,
105};
106#[cfg(feature = "storage")]
107pub use storage::{
108 models::{DeliveryStatus, DeliveryType},
109 Storage,
110};
111
112use std::sync::Arc;
113
114use tap_agent::{Agent, TapAgent};
115// use tap_agent::message_packing::PackOptions;
116use tap_msg::didcomm::PlainMessage;
117
118use crate::message::processor::PlainMessageProcessor;
119use crate::message::{
120 CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
121 PlainMessageRouterType,
122};
123use agent::AgentRegistry;
124use event::EventBus;
125use tap_agent::did::MultiResolver;
126
127use async_trait::async_trait;
128
129// Extension trait for TapAgent to add serialization methods
130///
131/// This trait extends the TapAgent with methods for serializing and packing
132/// DIDComm messages for transmission. It provides functionality for converting
133/// in-memory message objects to secure, serialized formats that follow the
134/// DIDComm messaging protocol standards.
135#[async_trait]
136pub trait TapAgentExt {
137 /// Pack and serialize a DIDComm message for transmission
138 ///
139 /// This method takes a DIDComm message and recipient DID, then:
140 /// 1. Uses the agent's PlainMessagePacker to properly sign and encrypt the message
141 /// 2. Serializes the message to a string format
142 ///
143 /// # Parameters
144 /// * `message` - The DIDComm message to serialize
145 /// * `to_did` - The DID of the recipient
146 ///
147 /// # Returns
148 /// The packed message as a string, ready for transmission
149 async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String>;
150}
151
152#[async_trait]
153impl TapAgentExt for TapAgent {
154 async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String> {
155 // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
156 let json_value =
157 serde_json::to_value(message).map_err(|e| Error::Serialization(e.to_string()))?;
158
159 // Use JSON string for transportation instead of direct message passing
160 // This bypasses the need for PlainMessage to implement TapMessageBody
161 let serialized =
162 serde_json::to_string(&json_value).map_err(|e| Error::Serialization(e.to_string()))?;
163
164 Ok(serialized)
165 }
166}
167
168/// Configuration for a TAP Node
169#[derive(Debug, Clone, Default)]
170pub struct NodeConfig {
171 /// Debug mode
172 pub debug: bool,
173 /// Maximum number of agents
174 pub max_agents: Option<usize>,
175 /// Whether to enable message logging
176 pub enable_message_logging: bool,
177 /// Whether to log full message content
178 pub log_message_content: bool,
179 /// Configuration for the processor pool
180 pub processor_pool: Option<ProcessorPoolConfig>,
181 /// Configuration for the event logger
182 pub event_logger: Option<EventLoggerConfig>,
183 /// Path to the storage database (None for default)
184 #[cfg(feature = "storage")]
185 pub storage_path: Option<std::path::PathBuf>,
186 /// Agent DID for storage organization
187 #[cfg(feature = "storage")]
188 pub agent_did: Option<String>,
189 /// Custom TAP root directory (defaults to ~/.tap)
190 #[cfg(feature = "storage")]
191 pub tap_root: Option<std::path::PathBuf>,
192 /// How the node handles FSM decision points.
193 ///
194 /// - `AutoApprove` (default): Automatically authorize and settle —
195 /// preserves the existing behavior.
196 /// - `EventBus`: Publish `NodeEvent::DecisionRequired` events for
197 /// external systems to handle. No automatic action is taken.
198 /// - `Custom(handler)`: Delegate to a caller-provided
199 /// [`DecisionHandler`](state_machine::fsm::DecisionHandler).
200 pub decision_mode: state_machine::fsm::DecisionMode,
201}
202
203/// # The TAP Node
204///
205/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
206/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
207///
208/// ## Core Responsibilities
209///
210/// - **Agent Management**: Registration and deregistration of TAP Agents
211/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
212/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
213/// - **Event Publishing**: Broadcasting node events to subscribers
214/// - **Scalability**: Managing concurrent message processing through worker pools
215///
216/// ## Lifecycle
217///
218/// 1. Create a node with appropriate configuration
219/// 2. Register one or more agents with the node
220/// 3. Start the processor pool (if high throughput is required)
221/// 4. Process incoming/outgoing messages
222/// 5. Publish and respond to events
223///
224/// ## Thread Safety
225///
226/// The `TapNode` is designed to be thread-safe and can be shared across multiple
227/// threads using an `Arc<TapNode>`. All internal mutability is handled through
228/// appropriate synchronization primitives.
229#[derive(Clone)]
230pub struct TapNode {
231 /// Agent registry
232 agents: Arc<AgentRegistry>,
233 /// Event bus
234 event_bus: Arc<EventBus>,
235 /// Incoming message processor
236 incoming_processor: CompositePlainMessageProcessor,
237 /// Outgoing message processor
238 outgoing_processor: CompositePlainMessageProcessor,
239 /// PlainMessage router
240 router: CompositePlainMessageRouter,
241 /// Resolver for DIDs
242 resolver: Arc<MultiResolver>,
243 /// Worker pool for handling messages
244 processor_pool: Option<ProcessorPool>,
245 /// Node configuration
246 config: NodeConfig,
247 /// Storage for transactions (legacy centralized storage)
248 #[cfg(feature = "storage")]
249 storage: Option<Arc<storage::Storage>>,
250 /// Agent-specific storage manager
251 #[cfg(feature = "storage")]
252 agent_storage_manager: Option<Arc<storage::AgentStorageManager>>,
253 /// Transaction state processor
254 #[cfg(feature = "storage")]
255 state_processor: Option<Arc<state_machine::StandardTransactionProcessor>>,
256}
257
258impl TapNode {
259 /// Create a new TAP node with the given configuration
260 pub fn new(config: NodeConfig) -> Self {
261 // Create the agent registry
262 let agents = Arc::new(AgentRegistry::new(config.max_agents));
263
264 // Create the event bus
265 let event_bus = Arc::new(EventBus::new());
266
267 // Create the message router
268 let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
269
270 let router = CompositePlainMessageRouter::new(vec![default_router]);
271
272 // Create the message processors
273 let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
274 let validation_processor =
275 PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
276 let trust_ping_processor = PlainMessageProcessorType::TrustPing(
277 TrustPingProcessor::with_event_bus(event_bus.clone()),
278 );
279 let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
280
281 let incoming_processor = CompositePlainMessageProcessor::new(vec![
282 logging_processor.clone(),
283 validation_processor.clone(),
284 trust_ping_processor.clone(),
285 default_processor.clone(),
286 ]);
287
288 let outgoing_processor = CompositePlainMessageProcessor::new(vec![
289 logging_processor,
290 validation_processor,
291 trust_ping_processor,
292 default_processor,
293 ]);
294
295 // Create the resolver
296 let resolver = Arc::new(MultiResolver::default());
297
298 // Storage will be initialized on first use
299 #[cfg(feature = "storage")]
300 let storage = None;
301 #[cfg(feature = "storage")]
302 let agent_storage_manager = Some(Arc::new(storage::AgentStorageManager::new(
303 config.tap_root.clone(),
304 )));
305 #[cfg(feature = "storage")]
306 let state_processor = None;
307
308 let node = Self {
309 agents,
310 event_bus,
311 incoming_processor,
312 outgoing_processor,
313 router,
314 resolver,
315 processor_pool: None,
316 config,
317 #[cfg(feature = "storage")]
318 storage,
319 #[cfg(feature = "storage")]
320 agent_storage_manager,
321 #[cfg(feature = "storage")]
322 state_processor,
323 };
324
325 // Set up the event logger if configured
326 if let Some(logger_config) = &node.config.event_logger {
327 let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
328
329 // We need to handle the async subscribe in a blocking context
330 // This is safe because EventBus methods are designed to be called in this way
331 let event_bus = node.event_bus.clone();
332 tokio::task::block_in_place(|| {
333 tokio::runtime::Handle::current().block_on(async {
334 event_bus.subscribe(event_logger).await;
335 })
336 });
337 }
338
339 node
340 }
341
342 /// Initialize storage asynchronously
343 #[cfg(feature = "storage")]
344 pub async fn init_storage(&mut self) -> Result<()> {
345 let storage = if let Some(agent_did) = &self.config.agent_did {
346 // Use new DID-based storage structure
347 match storage::Storage::new_with_did(agent_did, self.config.tap_root.clone()).await {
348 Ok(s) => s,
349 Err(e) => {
350 log::error!("Failed to initialize storage with DID: {}", e);
351 return Err(Error::Storage(e.to_string()));
352 }
353 }
354 } else if let Some(storage_path) = self.config.storage_path.clone() {
355 // Use explicit path
356 match storage::Storage::new(Some(storage_path)).await {
357 Ok(s) => s,
358 Err(e) => {
359 log::error!("Failed to initialize storage: {}", e);
360 return Err(Error::Storage(e.to_string()));
361 }
362 }
363 } else {
364 // Initialize with default path
365 match storage::Storage::new(None).await {
366 Ok(s) => s,
367 Err(e) => {
368 log::error!("Failed to initialize storage: {}", e);
369 return Err(Error::Storage(e.to_string()));
370 }
371 }
372 };
373
374 let storage_arc = Arc::new(storage);
375
376 // Subscribe event handlers
377 let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
378 storage_arc.clone(),
379 ));
380 self.event_bus.subscribe(message_status_handler).await;
381
382 let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
383 storage_arc.clone(),
384 ));
385 self.event_bus.subscribe(transaction_state_handler).await;
386
387 let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
388 self.event_bus.subscribe(transaction_audit_handler).await;
389
390 // Create state processor with configured decision mode
391 let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
392 storage_arc.clone(),
393 self.event_bus.clone(),
394 self.agents.clone(),
395 self.config.decision_mode.clone(),
396 ));
397
398 self.storage = Some(storage_arc);
399 self.state_processor = Some(state_processor);
400 Ok(())
401 }
402
403 /// Start the node
404 pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
405 let processor_pool = ProcessorPool::new(config);
406 self.processor_pool = Some(processor_pool);
407 Ok(())
408 }
409
410 /// Receive and process an incoming message
411 ///
412 /// This method handles the complete lifecycle of an incoming message:
413 ///
414 /// 1. Determining the message type (plain, signed, or encrypted)
415 /// 2. Verifying signatures or routing to agents for decryption
416 /// 3. Processing the resulting plain messages through the pipeline
417 /// 4. Routing and dispatching to the appropriate agents
418 ///
419 /// # Parameters
420 ///
421 /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
422 ///
423 /// # Returns
424 ///
425 /// * `Ok(())` if the message was successfully processed
426 /// * `Err(Error)` if there was an error during processing
427 pub async fn receive_message(&self, message: serde_json::Value) -> Result<()> {
428 // Default to internal source when no source is specified
429 self.receive_message_from_source(message, storage::SourceType::Internal, None)
430 .await
431 }
432
433 /// Receive and process an incoming message with source information
434 ///
435 /// This method handles the complete lifecycle of an incoming message with
436 /// tracking of where the message came from.
437 ///
438 /// # Parameters
439 ///
440 /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
441 /// * `source_type` - The type of source (https, internal, websocket, etc.)
442 /// * `source_identifier` - Optional identifier for the source (URL, agent DID, etc.)
443 ///
444 /// # Returns
445 ///
446 /// * `Ok(())` if the message was successfully processed
447 /// * `Err(Error)` if there was an error during processing
448 pub async fn receive_message_from_source(
449 &self,
450 message: serde_json::Value,
451 source_type: storage::SourceType,
452 source_identifier: Option<&str>,
453 ) -> Result<()> {
454 // Store the raw message for logging
455 let raw_message = serde_json::to_string(&message).ok();
456
457 // Store the raw message in the received table
458 #[cfg(feature = "storage")]
459 let mut received_ids: Vec<(String, i64)> = Vec::new();
460
461 use tap_agent::{verify_jws, Jwe, Jws};
462
463 // Determine message type
464 let is_encrypted =
465 message.get("protected").is_some() && message.get("recipients").is_some();
466 let is_signed = message.get("payload").is_some()
467 && (message.get("signatures").is_some() || message.get("signature").is_some());
468
469 // Helper function to store received message in agent storage
470 #[cfg(feature = "storage")]
471 async fn store_received_for_agent(
472 storage_manager: &storage::AgentStorageManager,
473 agent_did: &str,
474 raw_message: &str,
475 source_type: &storage::SourceType,
476 source_identifier: Option<&str>,
477 ) -> Option<i64> {
478 match storage_manager.get_agent_storage(agent_did).await {
479 Ok(agent_storage) => {
480 match agent_storage
481 .create_received(raw_message, source_type.clone(), source_identifier)
482 .await
483 {
484 Ok(id) => {
485 log::debug!(
486 "Created received record {} for agent {} for incoming message",
487 id,
488 agent_did
489 );
490 Some(id)
491 }
492 Err(e) => {
493 log::warn!(
494 "Failed to create received record for agent {}: {}",
495 agent_did,
496 e
497 );
498 None
499 }
500 }
501 }
502 Err(e) => {
503 log::warn!("Failed to get storage for agent {}: {}", agent_did, e);
504 None
505 }
506 }
507 }
508
509 if is_signed {
510 // Verify signature once using resolver
511 let jws: Jws = serde_json::from_value(message)
512 .map_err(|e| Error::Serialization(format!("Failed to parse JWS: {}", e)))?;
513
514 let plain_message = verify_jws(&jws, &*self.resolver)
515 .await
516 .map_err(|e| Error::Verification(format!("JWS verification failed: {}", e)))?;
517
518 // Store in recipient agents' storage
519 #[cfg(feature = "storage")]
520 {
521 if let Some(ref storage_manager) = self.agent_storage_manager {
522 let empty_msg = "{}".to_string();
523 let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
524
525 // Store for each recipient agent
526 for recipient_did in &plain_message.to {
527 if self.agents.has_agent(recipient_did) {
528 if let Some(id) = store_received_for_agent(
529 storage_manager,
530 recipient_did,
531 raw_msg,
532 &source_type,
533 source_identifier,
534 )
535 .await
536 {
537 received_ids.push((recipient_did.clone(), id));
538 }
539 }
540 }
541
542 // If no recipients are registered agents, store for sender if they're an agent
543 if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
544 if let Some(id) = store_received_for_agent(
545 storage_manager,
546 &plain_message.from,
547 raw_msg,
548 &source_type,
549 source_identifier,
550 )
551 .await
552 {
553 received_ids.push((plain_message.from.clone(), id));
554 }
555 }
556 }
557 }
558
559 // Process the verified plain message
560 let result = self.process_plain_message(plain_message).await;
561
562 // Update the received records
563 #[cfg(feature = "storage")]
564 {
565 if let Some(ref storage_manager) = self.agent_storage_manager {
566 let status = if result.is_ok() {
567 storage::ReceivedStatus::Processed
568 } else {
569 storage::ReceivedStatus::Failed
570 };
571 let error_msg = result.as_ref().err().map(|e| e.to_string());
572
573 for (agent_did, received_id) in &received_ids {
574 if let Ok(agent_storage) =
575 storage_manager.get_agent_storage(agent_did).await
576 {
577 let _ = agent_storage
578 .update_received_status(
579 *received_id,
580 status.clone(),
581 None, // We'll set this after processing
582 error_msg.as_deref(),
583 )
584 .await;
585 }
586 }
587 }
588 }
589
590 result
591 } else if is_encrypted {
592 // Route encrypted message to each matching agent
593 let jwe: Jwe = serde_json::from_value(message.clone())
594 .map_err(|e| Error::Serialization(format!("Failed to parse JWE: {}", e)))?;
595
596 // Store in recipient agents' storage
597 #[cfg(feature = "storage")]
598 {
599 if let Some(ref storage_manager) = self.agent_storage_manager {
600 let empty_msg = "{}".to_string();
601 let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
602
603 // Store for each recipient agent based on JWE recipients
604 for recipient in &jwe.recipients {
605 if let Some(did) = recipient.header.kid.split('#').next() {
606 if self.agents.has_agent(did) {
607 if let Some(id) = store_received_for_agent(
608 storage_manager,
609 did,
610 raw_msg,
611 &source_type,
612 source_identifier,
613 )
614 .await
615 {
616 received_ids.push((did.to_string(), id));
617 }
618 }
619 }
620 }
621 }
622 }
623
624 // Find agents that match recipients
625 let mut processed = false;
626 for recipient in &jwe.recipients {
627 if let Some(did) = recipient.header.kid.split('#').next() {
628 if let Ok(agent) = self.agents.get_agent(did).await {
629 // Let the agent handle decryption and processing
630 match agent.receive_encrypted_message(&message).await {
631 Ok(_) => {
632 processed = true;
633 log::debug!(
634 "Agent {} successfully processed encrypted message",
635 did
636 );
637 }
638 Err(e) => {
639 log::debug!(
640 "Agent {} couldn't process encrypted message: {}",
641 did,
642 e
643 );
644 }
645 }
646 }
647 }
648 }
649
650 let result = if !processed {
651 Err(Error::Processing(
652 "No agent could process the encrypted message".to_string(),
653 ))
654 } else {
655 Ok(())
656 };
657
658 // Update the received records for encrypted messages
659 #[cfg(feature = "storage")]
660 {
661 if let Some(ref storage_manager) = self.agent_storage_manager {
662 let status = if result.is_ok() {
663 storage::ReceivedStatus::Processed
664 } else {
665 storage::ReceivedStatus::Failed
666 };
667 let error_msg = result.as_ref().err().map(|e| e.to_string());
668
669 for (agent_did, received_id) in &received_ids {
670 if let Ok(agent_storage) =
671 storage_manager.get_agent_storage(agent_did).await
672 {
673 let _ = agent_storage
674 .update_received_status(
675 *received_id,
676 status.clone(),
677 None, // We don't have access to the decrypted message ID
678 error_msg.as_deref(),
679 )
680 .await;
681 }
682 }
683 }
684 }
685
686 result
687 } else {
688 // Plain message - parse and process
689 let plain_message: PlainMessage = serde_json::from_value(message).map_err(|e| {
690 Error::Serialization(format!("Failed to parse PlainMessage: {}", e))
691 })?;
692
693 // Store in recipient agents' storage
694 #[cfg(feature = "storage")]
695 {
696 if let Some(ref storage_manager) = self.agent_storage_manager {
697 let empty_msg = "{}".to_string();
698 let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
699
700 // Store for each recipient agent
701 for recipient_did in &plain_message.to {
702 if self.agents.has_agent(recipient_did) {
703 if let Some(id) = store_received_for_agent(
704 storage_manager,
705 recipient_did,
706 raw_msg,
707 &source_type,
708 source_identifier,
709 )
710 .await
711 {
712 received_ids.push((recipient_did.clone(), id));
713 }
714 }
715 }
716
717 // If no recipients are registered agents, store for sender if they're an agent
718 if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
719 if let Some(id) = store_received_for_agent(
720 storage_manager,
721 &plain_message.from,
722 raw_msg,
723 &source_type,
724 source_identifier,
725 )
726 .await
727 {
728 received_ids.push((plain_message.from.clone(), id));
729 }
730 }
731 }
732 }
733
734 let result = self.process_plain_message(plain_message).await;
735
736 // Update the received records
737 #[cfg(feature = "storage")]
738 {
739 if let Some(ref storage_manager) = self.agent_storage_manager {
740 let status = if result.is_ok() {
741 storage::ReceivedStatus::Processed
742 } else {
743 storage::ReceivedStatus::Failed
744 };
745 let error_msg = result.as_ref().err().map(|e| e.to_string());
746
747 for (agent_did, received_id) in &received_ids {
748 if let Ok(agent_storage) =
749 storage_manager.get_agent_storage(agent_did).await
750 {
751 let _ = agent_storage
752 .update_received_status(
753 *received_id,
754 status.clone(),
755 None, // We'll set this after processing
756 error_msg.as_deref(),
757 )
758 .await;
759 }
760 }
761 }
762 }
763
764 result
765 }
766 }
767
768 /// Process a plain message through the pipeline
769 async fn process_plain_message(&self, message: PlainMessage) -> Result<()> {
770 // Validate the message if storage/validation is available
771 #[cfg(feature = "storage")]
772 {
773 if let Some(ref storage) = self.storage {
774 // Create validator
775 let validator_config = validation::StandardValidatorConfig {
776 max_timestamp_drift_secs: 60,
777 storage: storage.clone(),
778 };
779 let validator = validation::create_standard_validator(validator_config).await;
780
781 // Validate the message
782 use crate::validation::{MessageValidator, ValidationResult};
783 match validator.validate(&message).await {
784 ValidationResult::Accept => {
785 // Publish accepted event
786 self.event_bus
787 .publish_message_accepted(
788 message.id.clone(),
789 message.type_.clone(),
790 message.from.clone(),
791 message.to.first().cloned().unwrap_or_default(),
792 )
793 .await;
794 }
795 ValidationResult::Reject(reason) => {
796 // Publish rejected event
797 self.event_bus
798 .publish_message_rejected(
799 message.id.clone(),
800 reason.clone(),
801 message.from.clone(),
802 message.to.first().cloned().unwrap_or_default(),
803 )
804 .await;
805
806 // Return error to stop processing
807 return Err(Error::Validation(reason));
808 }
809 }
810 }
811 }
812
813 // Process message through state machine if available
814 #[cfg(feature = "storage")]
815 {
816 if let Some(ref state_processor) = self.state_processor {
817 use crate::state_machine::TransactionStateProcessor;
818 if let Err(e) = state_processor.process_message(&message).await {
819 log::warn!("State processor error: {}", e);
820 // Don't fail the entire message processing, just log the error
821 }
822 }
823 }
824 // Log incoming messages to agent-specific storage
825 #[cfg(feature = "storage")]
826 {
827 if let Some(ref storage_manager) = self.agent_storage_manager {
828 // Check if this is a transaction message
829 let message_type_lower = message.type_.to_lowercase();
830 let is_transaction = message_type_lower.contains("transfer")
831 || message_type_lower.contains("payment");
832 log::debug!(
833 "Message type: {}, is_transaction: {}",
834 message.type_,
835 is_transaction
836 );
837
838 if is_transaction {
839 // For transactions, store in ALL involved agents' databases
840 let involved_agents = self.extract_transaction_agents(&message);
841
842 if involved_agents.is_empty() {
843 log::warn!("No registered agents found for transaction: {}", message.id);
844 } else {
845 log::debug!(
846 "Storing transaction {} in {} agent databases",
847 message.id,
848 involved_agents.len()
849 );
850
851 // Store transaction in each involved agent's database
852 for agent_did in &involved_agents {
853 if let Ok(agent_storage) =
854 storage_manager.get_agent_storage(agent_did).await
855 {
856 // Log the message
857 match agent_storage
858 .log_message(&message, storage::MessageDirection::Incoming)
859 .await
860 {
861 Ok(_) => log::debug!(
862 "Logged incoming message to agent {}: {}",
863 agent_did,
864 message.id
865 ),
866 Err(e) => log::warn!(
867 "Failed to log incoming message for agent {}: {}",
868 agent_did,
869 e
870 ),
871 }
872
873 // Store as transaction
874 match agent_storage.insert_transaction(&message).await {
875 Ok(()) => {
876 log::debug!(
877 "Stored transaction for agent {}: {}",
878 agent_did,
879 message.id
880 );
881
882 // Publish TransactionCreated event
883 // Use the message.id as the reference_id to fetch the transaction
884 if let Ok(Some(tx)) =
885 agent_storage.get_transaction_by_id(&message.id).await
886 {
887 self.event_bus
888 .publish_event(NodeEvent::TransactionCreated {
889 transaction: tx,
890 agent_did: agent_did.clone(),
891 })
892 .await;
893 }
894 }
895 Err(e) => log::warn!(
896 "Failed to store transaction for agent {}: {}",
897 agent_did,
898 e
899 ),
900 }
901 } else {
902 log::warn!("Failed to get storage for agent: {}", agent_did);
903 }
904 }
905 }
906 } else {
907 // For non-transaction messages, log to all recipient agents' storage
908 let mut logged_to_any = false;
909
910 for recipient_did in &message.to {
911 // Check if this recipient is a registered agent
912 if self.agents.has_agent(recipient_did) {
913 if let Ok(agent_storage) =
914 storage_manager.get_agent_storage(recipient_did).await
915 {
916 // Log the message to this recipient's storage
917 match agent_storage
918 .log_message(&message, storage::MessageDirection::Incoming)
919 .await
920 {
921 Ok(_) => {
922 log::debug!(
923 "Logged incoming message to recipient {}: {}",
924 recipient_did,
925 message.id
926 );
927 logged_to_any = true;
928 }
929 Err(e) => log::warn!(
930 "Failed to log incoming message for recipient {}: {}",
931 recipient_did,
932 e
933 ),
934 }
935 } else {
936 log::warn!(
937 "Failed to get storage for recipient: {}",
938 recipient_did
939 );
940 }
941 }
942 }
943
944 // If no recipients were logged, fall back to sender or router-based storage
945 if !logged_to_any {
946 match self.determine_message_agent(&message) {
947 Ok(agent_did) => {
948 if let Ok(agent_storage) =
949 storage_manager.get_agent_storage(&agent_did).await
950 {
951 // Log the message to the agent's storage
952 match agent_storage
953 .log_message(
954 &message,
955 storage::MessageDirection::Incoming,
956 )
957 .await
958 {
959 Ok(_) => log::debug!(
960 "Logged incoming message to fallback agent {}: {}",
961 agent_did,
962 message.id
963 ),
964 Err(e) => log::warn!(
965 "Failed to log incoming message for fallback agent {}: {}",
966 agent_did,
967 e
968 ),
969 }
970 } else {
971 log::warn!(
972 "Failed to get storage for fallback agent: {}",
973 agent_did
974 );
975 }
976 }
977 Err(e) => {
978 log::warn!(
979 "Failed to determine fallback agent for message storage: {}",
980 e
981 );
982 // Fall back to centralized storage if available
983 if let Some(ref storage) = self.storage {
984 let _ = storage
985 .log_message(&message, storage::MessageDirection::Incoming)
986 .await;
987 }
988 }
989 }
990 }
991 }
992 }
993 }
994
995 // Process the incoming message
996 let processed_message = match self.incoming_processor.process_incoming(message).await? {
997 Some(msg) => msg,
998 None => return Ok(()), // PlainMessage was dropped during processing
999 };
1000
1001 // Deliver the message to all recipients in the 'to' field
1002 let mut delivery_success = false;
1003
1004 for recipient_did in &processed_message.to {
1005 // Check if we have a registered agent for this recipient
1006 match self.agents.get_agent(recipient_did).await {
1007 Ok(agent) => {
1008 // Create delivery record for internal delivery tracking
1009 #[cfg(feature = "storage")]
1010 let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager
1011 {
1012 if let Ok(agent_storage) =
1013 storage_manager.get_agent_storage(recipient_did).await
1014 {
1015 // Serialize message for storage
1016 let message_text = serde_json::to_string(&processed_message)
1017 .unwrap_or_else(|_| "Failed to serialize message".to_string());
1018
1019 match agent_storage
1020 .create_delivery(
1021 &processed_message.id,
1022 &message_text,
1023 recipient_did,
1024 None, // No URL for internal delivery
1025 storage::models::DeliveryType::Internal,
1026 )
1027 .await
1028 {
1029 Ok(id) => {
1030 log::debug!(
1031 "Created internal delivery record {} for message {} to {}",
1032 id,
1033 processed_message.id,
1034 recipient_did
1035 );
1036 Some(id)
1037 }
1038 Err(e) => {
1039 log::warn!("Failed to create internal delivery record: {}", e);
1040 None
1041 }
1042 }
1043 } else {
1044 None
1045 }
1046 } else {
1047 None
1048 };
1049
1050 // Let the agent process the plain message
1051 match agent.receive_plain_message(processed_message.clone()).await {
1052 Ok(_) => {
1053 log::debug!(
1054 "Successfully delivered message to agent: {}",
1055 recipient_did
1056 );
1057 delivery_success = true;
1058
1059 // Update delivery record to success
1060 #[cfg(feature = "storage")]
1061 if let (Some(delivery_id), Some(ref storage_manager)) =
1062 (delivery_id, &self.agent_storage_manager)
1063 {
1064 if let Ok(agent_storage) =
1065 storage_manager.get_agent_storage(recipient_did).await
1066 {
1067 if let Err(e) = agent_storage
1068 .update_delivery_status(
1069 delivery_id,
1070 storage::models::DeliveryStatus::Success,
1071 None, // No HTTP status for internal delivery
1072 None, // No error message
1073 )
1074 .await
1075 {
1076 log::warn!("Failed to update internal delivery record to success: {}", e);
1077 }
1078 }
1079 }
1080 }
1081 Err(e) => {
1082 log::warn!("Agent {} failed to process message: {}", recipient_did, e);
1083
1084 // Update delivery record to failed
1085 #[cfg(feature = "storage")]
1086 if let (Some(delivery_id), Some(ref storage_manager)) =
1087 (delivery_id, &self.agent_storage_manager)
1088 {
1089 if let Ok(agent_storage) =
1090 storage_manager.get_agent_storage(recipient_did).await
1091 {
1092 if let Err(e2) = agent_storage
1093 .update_delivery_status(
1094 delivery_id,
1095 storage::models::DeliveryStatus::Failed,
1096 None, // No HTTP status for internal delivery
1097 Some(&e.to_string()), // Include error message
1098 )
1099 .await
1100 {
1101 log::warn!("Failed to update internal delivery record to failed: {}", e2);
1102 }
1103 }
1104 }
1105 }
1106 }
1107 }
1108 Err(e) => {
1109 log::debug!(
1110 "No registered agent found for recipient {}: {}",
1111 recipient_did,
1112 e
1113 );
1114 // This is not an error - the recipient might be external to this node
1115 }
1116 }
1117 }
1118
1119 // If no recipients were successfully processed, try the router as fallback
1120 if !delivery_success {
1121 let target_did = match self.router.route_message(&processed_message).await {
1122 Ok(did) => did,
1123 Err(e) => {
1124 log::warn!("Unable to route message and no recipients processed: {}", e);
1125 return Ok(());
1126 }
1127 };
1128
1129 // Get the agent
1130 let agent = match self.agents.get_agent(&target_did).await {
1131 Ok(a) => a,
1132 Err(e) => {
1133 log::warn!("Failed to get agent for dispatch: {}", e);
1134 return Ok(());
1135 }
1136 };
1137
1138 // Create delivery record for internal delivery tracking
1139 #[cfg(feature = "storage")]
1140 let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1141 if let Ok(agent_storage) = storage_manager.get_agent_storage(&target_did).await {
1142 // Serialize message for storage
1143 let message_text = serde_json::to_string(&processed_message)
1144 .unwrap_or_else(|_| "Failed to serialize message".to_string());
1145
1146 match agent_storage
1147 .create_delivery(
1148 &processed_message.id,
1149 &message_text,
1150 &target_did,
1151 None, // No URL for internal delivery
1152 storage::models::DeliveryType::Internal,
1153 )
1154 .await
1155 {
1156 Ok(id) => {
1157 log::debug!(
1158 "Created internal delivery record {} for routed message {} to {}",
1159 id,
1160 processed_message.id,
1161 target_did
1162 );
1163 Some(id)
1164 }
1165 Err(e) => {
1166 log::warn!(
1167 "Failed to create internal delivery record for routing: {}",
1168 e
1169 );
1170 None
1171 }
1172 }
1173 } else {
1174 None
1175 }
1176 } else {
1177 None
1178 };
1179
1180 // Let the agent process the plain message
1181 match agent.receive_plain_message(processed_message).await {
1182 Ok(_) => {
1183 log::debug!("Successfully routed message to agent: {}", target_did);
1184
1185 // Update delivery record to success
1186 #[cfg(feature = "storage")]
1187 if let (Some(delivery_id), Some(ref storage_manager)) =
1188 (delivery_id, &self.agent_storage_manager)
1189 {
1190 if let Ok(agent_storage) =
1191 storage_manager.get_agent_storage(&target_did).await
1192 {
1193 if let Err(e) = agent_storage
1194 .update_delivery_status(
1195 delivery_id,
1196 storage::models::DeliveryStatus::Success,
1197 None, // No HTTP status for internal delivery
1198 None, // No error message
1199 )
1200 .await
1201 {
1202 log::warn!(
1203 "Failed to update routed delivery record to success: {}",
1204 e
1205 );
1206 }
1207 }
1208 }
1209 }
1210 Err(e) => {
1211 log::warn!("Agent failed to process message: {}", e);
1212
1213 // Update delivery record to failed
1214 #[cfg(feature = "storage")]
1215 if let (Some(delivery_id), Some(ref storage_manager)) =
1216 (delivery_id, &self.agent_storage_manager)
1217 {
1218 if let Ok(agent_storage) =
1219 storage_manager.get_agent_storage(&target_did).await
1220 {
1221 if let Err(e2) = agent_storage
1222 .update_delivery_status(
1223 delivery_id,
1224 storage::models::DeliveryStatus::Failed,
1225 None, // No HTTP status for internal delivery
1226 Some(&e.to_string()), // Include error message
1227 )
1228 .await
1229 {
1230 log::warn!(
1231 "Failed to update routed delivery record to failed: {}",
1232 e2
1233 );
1234 }
1235 }
1236 }
1237 }
1238 }
1239 }
1240
1241 Ok(())
1242 }
1243
1244 /// Dispatch a message to an agent by DID
1245 pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
1246 let agent = self.agents.get_agent(&target_did).await?;
1247
1248 // Convert the message to a packed format for transport
1249 let packed = agent.send_serialized_message(&message).await?;
1250
1251 // Publish an event for the dispatched message
1252 self.event_bus
1253 .publish_agent_message(target_did, packed.into_bytes())
1254 .await;
1255
1256 Ok(())
1257 }
1258
1259 /// Send a message to an agent
1260 ///
1261 /// This method now includes comprehensive delivery tracking and actual message delivery.
1262 /// For internal recipients (registered agents), messages are delivered directly.
1263 /// For external recipients, messages are delivered via HTTP with tracking.
1264 pub async fn send_message(&self, sender_did: String, message: PlainMessage) -> Result<String> {
1265 // Log outgoing messages to agent-specific storage
1266 #[cfg(feature = "storage")]
1267 {
1268 if let Some(ref storage_manager) = self.agent_storage_manager {
1269 // Check if this is a transaction message
1270 let message_type_lower = message.type_.to_lowercase();
1271 let is_transaction = message_type_lower.contains("transfer")
1272 || message_type_lower.contains("payment");
1273 log::debug!(
1274 "Message type: {}, is_transaction: {}",
1275 message.type_,
1276 is_transaction
1277 );
1278
1279 if is_transaction {
1280 // For transactions, store in ALL involved agents' databases
1281 let involved_agents = self.extract_transaction_agents(&message);
1282
1283 if involved_agents.is_empty() {
1284 log::warn!(
1285 "No registered agents found for outgoing transaction: {}",
1286 message.id
1287 );
1288 } else {
1289 log::debug!(
1290 "Storing outgoing transaction {} in {} agent databases",
1291 message.id,
1292 involved_agents.len()
1293 );
1294
1295 // Store transaction in each involved agent's database
1296 for agent_did in &involved_agents {
1297 if let Ok(agent_storage) =
1298 storage_manager.get_agent_storage(agent_did).await
1299 {
1300 // Log the message
1301 match agent_storage
1302 .log_message(&message, storage::MessageDirection::Outgoing)
1303 .await
1304 {
1305 Ok(_) => log::debug!(
1306 "Logged outgoing message to agent {}: {}",
1307 agent_did,
1308 message.id
1309 ),
1310 Err(e) => log::warn!(
1311 "Failed to log outgoing message for agent {}: {}",
1312 agent_did,
1313 e
1314 ),
1315 }
1316
1317 // Store as transaction
1318 match agent_storage.insert_transaction(&message).await {
1319 Ok(()) => {
1320 log::debug!(
1321 "Stored outgoing transaction for agent {}: {}",
1322 agent_did,
1323 message.id
1324 );
1325
1326 // Publish TransactionCreated event
1327 // Use the message.id as the reference_id to fetch the transaction
1328 if let Ok(Some(tx)) =
1329 agent_storage.get_transaction_by_id(&message.id).await
1330 {
1331 self.event_bus
1332 .publish_event(NodeEvent::TransactionCreated {
1333 transaction: tx,
1334 agent_did: agent_did.clone(),
1335 })
1336 .await;
1337 }
1338 }
1339 Err(e) => log::warn!(
1340 "Failed to store outgoing transaction for agent {}: {}",
1341 agent_did,
1342 e
1343 ),
1344 }
1345 } else {
1346 log::warn!("Failed to get storage for agent: {}", agent_did);
1347 }
1348 }
1349 }
1350 } else {
1351 // For non-transaction messages, just store in sender's storage
1352 if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1353 {
1354 // Log the message to the sender's storage
1355 match sender_storage
1356 .log_message(&message, storage::MessageDirection::Outgoing)
1357 .await
1358 {
1359 Ok(_) => log::debug!(
1360 "Logged outgoing message for agent {}: {}",
1361 sender_did,
1362 message.id
1363 ),
1364 Err(e) => log::warn!(
1365 "Failed to log outgoing message for agent {}: {}",
1366 sender_did,
1367 e
1368 ),
1369 }
1370 } else {
1371 log::warn!("Failed to get storage for sender agent: {}", sender_did);
1372 // Fall back to centralized storage if available
1373 if let Some(ref storage) = self.storage {
1374 let _ = storage
1375 .log_message(&message, storage::MessageDirection::Outgoing)
1376 .await;
1377 }
1378 }
1379 }
1380 }
1381 }
1382
1383 // Process the outgoing message
1384 let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
1385 Some(msg) => msg,
1386 None => {
1387 return Err(Error::MessageDropped(
1388 "PlainMessage dropped during processing".to_string(),
1389 ))
1390 }
1391 };
1392
1393 // Get the sender agent and its key manager
1394 let agent = self.agents.get_agent(&sender_did).await?;
1395 let key_manager = agent.key_manager();
1396
1397 // Determine security mode based on message type
1398 use tap_agent::message::SecurityMode;
1399 let security_mode = if processed_message.type_.contains("credential")
1400 || processed_message.type_.contains("transfer")
1401 || processed_message.type_.contains("payment")
1402 {
1403 SecurityMode::AuthCrypt
1404 } else {
1405 SecurityMode::Signed
1406 };
1407
1408 // Get sender key ID
1409 let sender_kid = agent.get_signing_kid().await?;
1410
1411 // For single recipient auth-crypt, get recipient key
1412 let recipient_kid =
1413 if processed_message.to.len() == 1 && security_mode == SecurityMode::AuthCrypt {
1414 Some(agent.get_encryption_kid(&processed_message.to[0]).await?)
1415 } else {
1416 None
1417 };
1418
1419 // Create pack options
1420 use tap_agent::message_packing::{PackOptions, Packable};
1421 let pack_options = PackOptions {
1422 security_mode,
1423 sender_kid: Some(sender_kid),
1424 recipient_kid,
1425 };
1426
1427 // Pack/sign the message properly
1428 let packed = processed_message.pack(&**key_manager, pack_options).await?;
1429
1430 // Deliver to all recipients in the message
1431 let mut delivery_errors = Vec::new();
1432
1433 for recipient_did in &processed_message.to {
1434 log::debug!("Processing delivery to recipient: {}", recipient_did);
1435
1436 // Check if recipient is a local agent (internal delivery) or external
1437 let is_internal_recipient = self.agents.get_agent(recipient_did).await.is_ok();
1438
1439 if is_internal_recipient {
1440 // Internal delivery - deliver to registered agent with tracking
1441 log::debug!("Delivering message internally to agent: {}", recipient_did);
1442
1443 #[cfg(feature = "storage")]
1444 let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1445 if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1446 {
1447 // Create delivery record for internal delivery
1448 match sender_storage
1449 .create_delivery(
1450 &processed_message.id,
1451 &packed, // Store the signed/packed message
1452 recipient_did,
1453 None, // No URL for internal delivery
1454 storage::models::DeliveryType::Internal,
1455 )
1456 .await
1457 {
1458 Ok(id) => {
1459 log::debug!(
1460 "Created internal delivery record {} for message {} to {}",
1461 id,
1462 processed_message.id,
1463 recipient_did
1464 );
1465 Some(id)
1466 }
1467 Err(e) => {
1468 log::warn!("Failed to create internal delivery record: {}", e);
1469 None
1470 }
1471 }
1472 } else {
1473 None
1474 }
1475 } else {
1476 None
1477 };
1478
1479 // Process the message internally through receive_message_from_source
1480 // to ensure it gets recorded in the received table
1481 // Pass the packed (signed) message just like external messages
1482 let message_value = match serde_json::from_str::<serde_json::Value>(&packed) {
1483 Ok(val) => val,
1484 Err(e) => {
1485 log::error!("Failed to parse packed message as JSON: {}", e);
1486 continue;
1487 }
1488 };
1489
1490 match self
1491 .receive_message_from_source(
1492 message_value,
1493 storage::SourceType::Internal,
1494 Some(&sender_did),
1495 )
1496 .await
1497 {
1498 Ok(_) => {
1499 log::debug!(
1500 "Successfully delivered message internally to: {}",
1501 recipient_did
1502 );
1503
1504 // Update delivery record to success
1505 #[cfg(feature = "storage")]
1506 if let (Some(delivery_id), Some(ref storage_manager)) =
1507 (delivery_id, &self.agent_storage_manager)
1508 {
1509 if let Ok(sender_storage) =
1510 storage_manager.get_agent_storage(&sender_did).await
1511 {
1512 if let Err(e) = sender_storage
1513 .update_delivery_status(
1514 delivery_id,
1515 storage::models::DeliveryStatus::Success,
1516 None, // No HTTP status for internal delivery
1517 None, // No error message
1518 )
1519 .await
1520 {
1521 log::warn!("Failed to update internal delivery status: {}", e);
1522 }
1523 }
1524 }
1525 }
1526 Err(e) => {
1527 log::error!(
1528 "Failed to deliver message internally to {}: {}",
1529 recipient_did,
1530 e
1531 );
1532
1533 // Update delivery record to failed
1534 #[cfg(feature = "storage")]
1535 if let (Some(delivery_id), Some(ref storage_manager)) =
1536 (delivery_id, &self.agent_storage_manager)
1537 {
1538 if let Ok(sender_storage) =
1539 storage_manager.get_agent_storage(&sender_did).await
1540 {
1541 if let Err(e2) = sender_storage
1542 .update_delivery_status(
1543 delivery_id,
1544 storage::models::DeliveryStatus::Failed,
1545 None, // No HTTP status for internal delivery
1546 Some(&format!("Internal delivery failed: {}", e)),
1547 )
1548 .await
1549 {
1550 log::warn!("Failed to update internal delivery status: {}", e2);
1551 }
1552 }
1553 }
1554
1555 delivery_errors.push((recipient_did.clone(), e));
1556 continue; // Continue to next recipient
1557 }
1558 }
1559 } else {
1560 // External delivery - use TapAgent's built-in HTTP delivery with tracking
1561 log::debug!("Attempting external delivery to: {}", recipient_did);
1562
1563 // Get the sender agent for HTTP delivery
1564 let sender_agent = self.agents.get_agent(&sender_did).await?;
1565
1566 // Resolve the service endpoint for the recipient
1567 let endpoint = match sender_agent.get_service_endpoint(recipient_did).await? {
1568 Some(ep) => ep,
1569 None => {
1570 log::warn!(
1571 "No service endpoint found for {}, delivery failed",
1572 recipient_did
1573 );
1574
1575 // Create failed delivery record
1576 #[cfg(feature = "storage")]
1577 if let Some(ref storage_manager) = self.agent_storage_manager {
1578 if let Ok(sender_storage) =
1579 storage_manager.get_agent_storage(&sender_did).await
1580 {
1581 if let Ok(delivery_id) = sender_storage
1582 .create_delivery(
1583 &processed_message.id,
1584 &packed,
1585 recipient_did,
1586 None,
1587 storage::models::DeliveryType::Https,
1588 )
1589 .await
1590 {
1591 let _ = sender_storage
1592 .update_delivery_status(
1593 delivery_id,
1594 storage::models::DeliveryStatus::Failed,
1595 None,
1596 Some("No service endpoint found for recipient"),
1597 )
1598 .await;
1599 }
1600 }
1601 }
1602
1603 delivery_errors.push((
1604 recipient_did.clone(),
1605 Error::Dispatch(format!(
1606 "No service endpoint found for recipient: {}",
1607 recipient_did
1608 )),
1609 ));
1610 continue; // Continue to next recipient
1611 }
1612 };
1613
1614 // Create delivery record before attempting delivery
1615 #[cfg(feature = "storage")]
1616 let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1617 if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1618 {
1619 match sender_storage
1620 .create_delivery(
1621 &processed_message.id,
1622 &packed, // Store the signed/packed message
1623 recipient_did,
1624 Some(&endpoint),
1625 storage::models::DeliveryType::Https,
1626 )
1627 .await
1628 {
1629 Ok(id) => {
1630 log::debug!(
1631 "Created external delivery record {} for message {} to {} at {}",
1632 id,
1633 processed_message.id,
1634 recipient_did,
1635 endpoint
1636 );
1637 Some(id)
1638 }
1639 Err(e) => {
1640 log::warn!("Failed to create external delivery record: {}", e);
1641 None
1642 }
1643 }
1644 } else {
1645 None
1646 }
1647 } else {
1648 None
1649 };
1650
1651 // Attempt HTTP delivery using TapAgent's built-in functionality
1652 match sender_agent.send_to_endpoint(&packed, &endpoint).await {
1653 Ok(status_code) => {
1654 log::debug!(
1655 "Successfully delivered message {} to {} at {} (HTTP {})",
1656 processed_message.id,
1657 recipient_did,
1658 endpoint,
1659 status_code
1660 );
1661
1662 // Update delivery record to success
1663 #[cfg(feature = "storage")]
1664 if let (Some(delivery_id), Some(ref storage_manager)) =
1665 (delivery_id, &self.agent_storage_manager)
1666 {
1667 if let Ok(sender_storage) =
1668 storage_manager.get_agent_storage(&sender_did).await
1669 {
1670 if let Err(e) = sender_storage
1671 .update_delivery_status(
1672 delivery_id,
1673 storage::models::DeliveryStatus::Success,
1674 Some(status_code as i32),
1675 None,
1676 )
1677 .await
1678 {
1679 log::warn!(
1680 "Failed to update external delivery status to success: {}",
1681 e
1682 );
1683 }
1684 }
1685 }
1686 }
1687 Err(e) => {
1688 log::error!(
1689 "Failed to deliver message {} to {} at {}: {}",
1690 processed_message.id,
1691 recipient_did,
1692 endpoint,
1693 e
1694 );
1695
1696 // Update delivery record to failed
1697 #[cfg(feature = "storage")]
1698 if let (Some(delivery_id), Some(ref storage_manager)) =
1699 (delivery_id, &self.agent_storage_manager)
1700 {
1701 if let Ok(sender_storage) =
1702 storage_manager.get_agent_storage(&sender_did).await
1703 {
1704 // Extract HTTP status code from error if possible
1705 let error_msg = e.to_string();
1706 let http_status_code = if error_msg.contains("status:") {
1707 error_msg
1708 .split("status:")
1709 .nth(1)
1710 .and_then(|s| s.split_whitespace().next())
1711 .and_then(|s| s.parse::<i32>().ok())
1712 } else {
1713 None
1714 };
1715
1716 if let Err(e2) = sender_storage
1717 .update_delivery_status(
1718 delivery_id,
1719 storage::models::DeliveryStatus::Failed,
1720 http_status_code,
1721 Some(&error_msg),
1722 )
1723 .await
1724 {
1725 log::warn!(
1726 "Failed to update external delivery status to failed: {}",
1727 e2
1728 );
1729 }
1730
1731 // Increment retry count for future retry processing
1732 if let Err(e2) = sender_storage
1733 .increment_delivery_retry_count(delivery_id)
1734 .await
1735 {
1736 log::warn!("Failed to increment retry count: {}", e2);
1737 }
1738 }
1739 }
1740
1741 delivery_errors.push((
1742 recipient_did.clone(),
1743 Error::Dispatch(format!(
1744 "HTTP delivery failed for {}: {}",
1745 recipient_did, e
1746 )),
1747 ));
1748 continue; // Continue to next recipient
1749 }
1750 }
1751 }
1752 }
1753
1754 // Check if all deliveries failed
1755 if !delivery_errors.is_empty() && delivery_errors.len() == processed_message.to.len() {
1756 return Err(Error::Dispatch(format!(
1757 "Failed to deliver message to all recipients: {:?}",
1758 delivery_errors
1759 )));
1760 }
1761
1762 // Log partial failures
1763 if !delivery_errors.is_empty() {
1764 log::warn!(
1765 "Message delivered to {}/{} recipients. Failures: {:?}",
1766 processed_message.to.len() - delivery_errors.len(),
1767 processed_message.to.len(),
1768 delivery_errors
1769 );
1770 }
1771
1772 // Publish an event for the message
1773 self.event_bus
1774 .publish_agent_message(sender_did, packed.clone().into_bytes())
1775 .await;
1776
1777 Ok(packed)
1778 }
1779
1780 /// Register a new agent with the node
1781 ///
1782 /// This method registers an agent with the TAP Node and automatically initializes
1783 /// DID-specific storage for the agent. The storage directory structure follows:
1784 /// - `~/.tap/{sanitized_did}/transactions.db` (default)
1785 /// - `{tap_root}/{sanitized_did}/transactions.db` (if custom TAP root is configured)
1786 ///
1787 /// # Storage Initialization
1788 ///
1789 /// When an agent is registered, a dedicated SQLite database is created for that agent's DID.
1790 /// This ensures transaction isolation between different agents while maintaining a consistent
1791 /// storage structure. If storage initialization fails, the agent registration continues but
1792 /// a warning is logged.
1793 ///
1794 /// # Arguments
1795 ///
1796 /// * `agent` - The TapAgent to register with the node
1797 ///
1798 /// # Returns
1799 ///
1800 /// * `Ok(())` if the agent was successfully registered
1801 /// * `Err(Error)` if agent registration fails
1802 pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
1803 let agent_did = agent.get_agent_did().to_string();
1804
1805 // Initialize storage for this agent if storage is enabled
1806 #[cfg(feature = "storage")]
1807 {
1808 if let Some(ref storage_manager) = self.agent_storage_manager {
1809 match storage_manager.ensure_agent_storage(&agent_did).await {
1810 Ok(_) => {
1811 log::info!("Initialized storage for agent: {}", agent_did);
1812
1813 // Get the agent storage and register customer event handler
1814 if let Ok(agent_storage) =
1815 storage_manager.get_agent_storage(&agent_did).await
1816 {
1817 let customer_handler =
1818 Arc::new(event::customer_handler::CustomerEventHandler::new(
1819 agent_storage,
1820 agent_did.clone(),
1821 ));
1822 self.event_bus.subscribe(customer_handler).await;
1823 log::debug!(
1824 "Registered customer event handler for agent: {}",
1825 agent_did
1826 );
1827 }
1828 }
1829 Err(e) => {
1830 log::warn!(
1831 "Failed to initialize storage for agent {}: {}",
1832 agent_did,
1833 e
1834 );
1835 // Don't fail the registration, just log the warning
1836 }
1837 }
1838 }
1839 }
1840
1841 self.agents.register_agent(agent_did.clone(), agent).await?;
1842
1843 // Publish event about agent registration
1844 self.event_bus.publish_agent_registered(agent_did).await;
1845
1846 Ok(())
1847 }
1848
1849 /// Unregister an agent from the node
1850 pub async fn unregister_agent(&self, did: &str) -> Result<()> {
1851 self.agents.unregister_agent(did).await?;
1852
1853 // Publish event about agent registration
1854 self.event_bus
1855 .publish_agent_unregistered(did.to_string())
1856 .await;
1857
1858 Ok(())
1859 }
1860
1861 /// Get a list of registered agent DIDs
1862 pub fn list_agents(&self) -> Vec<String> {
1863 self.agents.get_all_dids()
1864 }
1865
1866 /// Get a reference to the agent registry
1867 pub fn agents(&self) -> &Arc<AgentRegistry> {
1868 &self.agents
1869 }
1870
1871 /// Get a reference to the event bus
1872 pub fn event_bus(&self) -> &Arc<EventBus> {
1873 &self.event_bus
1874 }
1875
1876 /// Get a reference to the resolver
1877 pub fn resolver(&self) -> &Arc<MultiResolver> {
1878 &self.resolver
1879 }
1880
1881 /// Get a mutable reference to the processor pool
1882 /// This is a reference to `Option<ProcessorPool>` to allow starting the pool after node creation
1883 pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
1884 &mut self.processor_pool
1885 }
1886
1887 /// Get the node configuration
1888 pub fn config(&self) -> &NodeConfig {
1889 &self.config
1890 }
1891
1892 /// Set the decision mode at runtime.
1893 ///
1894 /// Call this after `init_storage()` but before processing any messages
1895 /// to configure how the FSM handles decision points.
1896 pub fn set_decision_mode(&mut self, mode: state_machine::fsm::DecisionMode) {
1897 self.config.decision_mode = mode;
1898 }
1899
1900 /// Get a reference to the storage (if available)
1901 #[cfg(feature = "storage")]
1902 pub fn storage(&self) -> Option<&Arc<storage::Storage>> {
1903 self.storage.as_ref()
1904 }
1905
1906 /// Get a reference to the agent storage manager (if available)
1907 #[cfg(feature = "storage")]
1908 pub fn agent_storage_manager(&self) -> Option<&Arc<storage::AgentStorageManager>> {
1909 self.agent_storage_manager.as_ref()
1910 }
1911
1912 /// Set storage for testing purposes
1913 /// This allows injecting in-memory databases for complete test isolation
1914 #[cfg(feature = "storage")]
1915 pub async fn set_storage(&mut self, storage: storage::Storage) -> Result<()> {
1916 let storage_arc = Arc::new(storage);
1917
1918 // Subscribe event handlers
1919 let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
1920 storage_arc.clone(),
1921 ));
1922 self.event_bus.subscribe(message_status_handler).await;
1923
1924 let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
1925 storage_arc.clone(),
1926 ));
1927 self.event_bus.subscribe(transaction_state_handler).await;
1928
1929 let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
1930 self.event_bus.subscribe(transaction_audit_handler).await;
1931
1932 // Create state processor with configured decision mode
1933 let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
1934 storage_arc.clone(),
1935 self.event_bus.clone(),
1936 self.agents.clone(),
1937 self.config.decision_mode.clone(),
1938 ));
1939
1940 self.storage = Some(storage_arc);
1941 self.state_processor = Some(state_processor);
1942 Ok(())
1943 }
1944
1945 /// Determine which agent's storage should be used for a message
1946 ///
1947 /// This method uses the following strategy:
1948 /// 1. Use the first recipient DID if it's one of our registered agents
1949 /// 2. Use the sender DID if it's one of our registered agents
1950 /// 3. If no local agents are involved, fall back to the first recipient
1951 #[cfg(feature = "storage")]
1952 fn determine_message_agent(&self, message: &PlainMessage) -> Result<String> {
1953 // Strategy 1: Use the first recipient DID if it's one of our agents
1954 for recipient in &message.to {
1955 if self.agents.has_agent(recipient) {
1956 return Ok(recipient.clone());
1957 }
1958 }
1959
1960 // Strategy 2: Use the sender DID if it's one of our agents
1961 if self.agents.has_agent(&message.from) {
1962 return Ok(message.from.clone());
1963 }
1964
1965 // Strategy 3: If no local agents involved, fall back to first recipient
1966 if !message.to.is_empty() {
1967 return Ok(message.to[0].clone());
1968 }
1969
1970 Err(Error::Storage(
1971 "Cannot determine agent for message storage".to_string(),
1972 ))
1973 }
1974
1975 /// Extract all agent DIDs involved in a transaction
1976 ///
1977 /// For Transfer and Payment messages, this includes:
1978 /// - Originator/Customer from the message body
1979 /// - Beneficiary/Merchant from the message body
1980 /// - All agents in the agents array
1981 /// - Sender (from) and recipients (to) from the message envelope
1982 #[cfg(feature = "storage")]
1983 fn extract_transaction_agents(&self, message: &PlainMessage) -> Vec<String> {
1984 use std::collections::HashSet;
1985 let mut agent_dids = HashSet::new();
1986
1987 log::debug!("Extracting transaction agents for message: {}", message.id);
1988
1989 // Add sender and recipients from message envelope
1990 agent_dids.insert(message.from.clone());
1991 log::debug!("Added sender: {}", message.from);
1992
1993 for recipient in &message.to {
1994 agent_dids.insert(recipient.clone());
1995 log::debug!("Added recipient: {}", recipient);
1996 }
1997
1998 // Extract agents from message body based on type
1999 let message_type_lower = message.type_.to_lowercase();
2000 log::debug!("Message type: {}", message_type_lower);
2001
2002 if message_type_lower.contains("transfer") {
2003 // Parse Transfer message body
2004 if let Ok(transfer) =
2005 serde_json::from_value::<tap_msg::message::Transfer>(message.body.clone())
2006 {
2007 // Add originator if present
2008 if let Some(originator) = &transfer.originator {
2009 agent_dids.insert(originator.id.clone());
2010 log::debug!("Added originator: {}", originator.id);
2011 }
2012
2013 // Add beneficiary if present
2014 if let Some(beneficiary) = &transfer.beneficiary {
2015 agent_dids.insert(beneficiary.id.clone());
2016 log::debug!("Added beneficiary: {}", beneficiary.id);
2017 }
2018
2019 // Add all agents
2020 for agent in &transfer.agents {
2021 agent_dids.insert(agent.id.clone());
2022 log::debug!("Added agent: {}", agent.id);
2023 }
2024 } else {
2025 log::warn!("Failed to parse Transfer message body");
2026 }
2027 } else if message_type_lower.contains("payment") {
2028 // Parse Payment message body
2029 if let Ok(payment) =
2030 serde_json::from_value::<tap_msg::message::Payment>(message.body.clone())
2031 {
2032 // Add merchant
2033 agent_dids.insert(payment.merchant.id.clone());
2034 log::debug!("Added merchant: {}", payment.merchant.id);
2035
2036 // Add customer if present
2037 if let Some(customer) = &payment.customer {
2038 agent_dids.insert(customer.id.clone());
2039 log::debug!("Added customer: {}", customer.id);
2040 }
2041
2042 // Add all agents
2043 for agent in &payment.agents {
2044 agent_dids.insert(agent.id.clone());
2045 log::debug!("Added agent: {}", agent.id);
2046 }
2047 } else {
2048 log::warn!("Failed to parse Payment message body");
2049 }
2050 }
2051
2052 log::debug!("Total unique agents found: {}", agent_dids.len());
2053
2054 // Convert to Vec and filter to only include registered agents
2055 let registered_agents: Vec<String> = agent_dids
2056 .into_iter()
2057 .filter(|did| {
2058 let is_registered = self.agents.has_agent(did);
2059 log::debug!("Agent {} registered: {}", did, is_registered);
2060 is_registered
2061 })
2062 .collect();
2063
2064 log::debug!(
2065 "Registered agents involved in transaction: {:?}",
2066 registered_agents
2067 );
2068 registered_agents
2069 }
2070}
2071
2072// Namespace imports
2073// These imports make the implementation cleaner, but should be hidden from public API
2074use message::processor::DefaultPlainMessageProcessor;
2075use message::processor::LoggingPlainMessageProcessor;
2076use message::processor::ValidationPlainMessageProcessor;
2077use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
2078use message::router::DefaultPlainMessageRouter;
2079use message::trust_ping_processor::TrustPingProcessor;
2080use message::RouterAsyncExt;