Skip to main content

tap_node/
lib.rs

1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, stores transactions, and provides a scalable architecture for TAP deployments.
5//!
6//! # Architecture Overview
7//!
8//! The TAP Node acts as a message router and coordinator for multiple TAP Agents. It provides:
9//!
10//! - **Efficient Message Processing**: Different handling for plain, signed, and encrypted messages
11//! - **Centralized Verification**: Signed messages are verified once for all agents
12//! - **Smart Routing**: Encrypted messages are routed to appropriate recipient agents
13//! - **Scalable Design**: Supports multiple agents with concurrent message processing
14//!
15//! # Key Components
16//!
17//! - **Agent Registry**: Manages multiple TAP Agents
18//! - **Event Bus**: Publishes and distributes events throughout the system
19//! - **Message Processors**: Process incoming and outgoing messages
20//! - **Message Router**: Routes messages to the appropriate agent
21//! - **DID Resolver**: Resolves DIDs for signature verification
22//! - **Storage**: Persistent SQLite storage with transaction tracking and audit trails
23//!
24//! # Message Processing Flow
25//!
26//! The TAP Node uses an optimized message processing flow based on message type:
27//!
28//! ## Signed Messages (JWS)
29//! 1. **Single Verification**: Signature verified once using DID resolver
30//! 2. **Routing**: Verified PlainMessage routed to appropriate agent
31//! 3. **Processing**: Agent receives verified message via `receive_plain_message()`
32//!
33//! ## Encrypted Messages (JWE)  
34//! 1. **Recipient Identification**: Extract recipient DIDs from JWE headers
35//! 2. **Agent Routing**: Send encrypted message to each matching agent
36//! 3. **Decryption**: Each agent attempts decryption via `receive_encrypted_message()`
37//! 4. **Processing**: Successfully decrypted messages are processed by the agent
38//!
39//! ## Plain Messages
40//! 1. **Direct Processing**: Plain messages processed through the pipeline
41//! 2. **Routing**: Routed to appropriate agent
42//! 3. **Delivery**: Agent receives via `receive_plain_message()`
43//!
44//! # Benefits of This Architecture
45//!
46//! - **Efficiency**: Signed messages verified once, not per-agent
47//! - **Scalability**: Encrypted messages naturally distributed to recipients
48//! - **Flexibility**: Agents remain fully functional standalone
49//! - **Security**: Centralized verification with distributed decryption
50//!
51//! # Thread Safety and Concurrency
52//!
53//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
54//! async/await patterns and synchronization primitives to safely handle multiple operations
55//! simultaneously. Most components within the node are either immutable or use interior
56//! mutability with appropriate synchronization.
57//!
58//! # Example Usage
59//!
60//! ```rust,no_run
61//! use tap_node::{TapNode, NodeConfig};
62//! use tap_agent::TapAgent;
63//! use std::sync::Arc;
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//!     // Create node
68//!     let config = NodeConfig::default();
69//!     let node = Arc::new(TapNode::new(config));
70//!     
71//!     // Create and register agent
72//!     let (agent, _did) = TapAgent::from_ephemeral_key().await?;
73//!     node.register_agent(Arc::new(agent)).await?;
74//!     
75//!     // Process incoming message (JSON Value)
76//!     let message_value = serde_json::json!({
77//!         "id": "msg-123",
78//!         "type": "test-message",
79//!         "body": {"content": "Hello"}
80//!     });
81//!     
82//!     node.receive_message(message_value).await?;
83//!     Ok(())
84//! }
85//! ```
86
87pub mod agent;
88#[cfg(feature = "storage")]
89pub mod customer;
90pub mod error;
91pub mod event;
92pub mod message;
93#[cfg(feature = "storage")]
94pub mod state_machine;
95pub mod storage;
96#[cfg(feature = "storage")]
97pub mod validation;
98
99pub use error::{Error, Result};
100pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
101pub use event::{EventSubscriber, NodeEvent};
102pub use message::sender::{
103    HttpPlainMessageSender, HttpPlainMessageSenderWithTracking, NodePlainMessageSender,
104    PlainMessageSender, WebSocketPlainMessageSender,
105};
106#[cfg(feature = "storage")]
107pub use storage::{
108    models::{DeliveryStatus, DeliveryType},
109    Storage,
110};
111
112use std::sync::Arc;
113
114use tap_agent::{Agent, TapAgent};
115// use tap_agent::message_packing::PackOptions;
116use tap_msg::didcomm::PlainMessage;
117
118use crate::message::processor::PlainMessageProcessor;
119use crate::message::{
120    CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
121    PlainMessageRouterType,
122};
123use agent::AgentRegistry;
124use event::EventBus;
125use tap_agent::did::MultiResolver;
126
127use async_trait::async_trait;
128
129// Extension trait for TapAgent to add serialization methods
130///
131/// This trait extends the TapAgent with methods for serializing and packing
132/// DIDComm messages for transmission. It provides functionality for converting
133/// in-memory message objects to secure, serialized formats that follow the
134/// DIDComm messaging protocol standards.
135#[async_trait]
136pub trait TapAgentExt {
137    /// Pack and serialize a DIDComm message for transmission
138    ///
139    /// This method takes a DIDComm message and recipient DID, then:
140    /// 1. Uses the agent's PlainMessagePacker to properly sign and encrypt the message
141    /// 2. Serializes the message to a string format
142    ///
143    /// # Parameters
144    /// * `message` - The DIDComm message to serialize
145    /// * `to_did` - The DID of the recipient
146    ///
147    /// # Returns
148    /// The packed message as a string, ready for transmission
149    async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String>;
150}
151
152#[async_trait]
153impl TapAgentExt for TapAgent {
154    async fn send_serialized_message(&self, message: &PlainMessage) -> Result<String> {
155        // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
156        let json_value =
157            serde_json::to_value(message).map_err(|e| Error::Serialization(e.to_string()))?;
158
159        // Use JSON string for transportation instead of direct message passing
160        // This bypasses the need for PlainMessage to implement TapMessageBody
161        let serialized =
162            serde_json::to_string(&json_value).map_err(|e| Error::Serialization(e.to_string()))?;
163
164        Ok(serialized)
165    }
166}
167
168/// Configuration for a TAP Node
169#[derive(Debug, Clone, Default)]
170pub struct NodeConfig {
171    /// Debug mode
172    pub debug: bool,
173    /// Maximum number of agents
174    pub max_agents: Option<usize>,
175    /// Whether to enable message logging
176    pub enable_message_logging: bool,
177    /// Whether to log full message content
178    pub log_message_content: bool,
179    /// Configuration for the processor pool
180    pub processor_pool: Option<ProcessorPoolConfig>,
181    /// Configuration for the event logger
182    pub event_logger: Option<EventLoggerConfig>,
183    /// Path to the storage database (None for default)
184    #[cfg(feature = "storage")]
185    pub storage_path: Option<std::path::PathBuf>,
186    /// Agent DID for storage organization
187    #[cfg(feature = "storage")]
188    pub agent_did: Option<String>,
189    /// Custom TAP root directory (defaults to ~/.tap)
190    #[cfg(feature = "storage")]
191    pub tap_root: Option<std::path::PathBuf>,
192    /// How the node handles FSM decision points.
193    ///
194    /// - `AutoApprove` (default): Automatically authorize and settle —
195    ///   preserves the existing behavior.
196    /// - `EventBus`: Publish `NodeEvent::DecisionRequired` events for
197    ///   external systems to handle. No automatic action is taken.
198    /// - `Custom(handler)`: Delegate to a caller-provided
199    ///   [`DecisionHandler`](state_machine::fsm::DecisionHandler).
200    pub decision_mode: state_machine::fsm::DecisionMode,
201}
202
203/// # The TAP Node
204///
205/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
206/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
207///
208/// ## Core Responsibilities
209///
210/// - **Agent Management**: Registration and deregistration of TAP Agents
211/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
212/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
213/// - **Event Publishing**: Broadcasting node events to subscribers
214/// - **Scalability**: Managing concurrent message processing through worker pools
215///
216/// ## Lifecycle
217///
218/// 1. Create a node with appropriate configuration
219/// 2. Register one or more agents with the node
220/// 3. Start the processor pool (if high throughput is required)
221/// 4. Process incoming/outgoing messages
222/// 5. Publish and respond to events
223///
224/// ## Thread Safety
225///
226/// The `TapNode` is designed to be thread-safe and can be shared across multiple
227/// threads using an `Arc<TapNode>`. All internal mutability is handled through
228/// appropriate synchronization primitives.
229#[derive(Clone)]
230pub struct TapNode {
231    /// Agent registry
232    agents: Arc<AgentRegistry>,
233    /// Event bus
234    event_bus: Arc<EventBus>,
235    /// Incoming message processor
236    incoming_processor: CompositePlainMessageProcessor,
237    /// Outgoing message processor
238    outgoing_processor: CompositePlainMessageProcessor,
239    /// PlainMessage router
240    router: CompositePlainMessageRouter,
241    /// Resolver for DIDs
242    resolver: Arc<MultiResolver>,
243    /// Worker pool for handling messages
244    processor_pool: Option<ProcessorPool>,
245    /// Node configuration
246    config: NodeConfig,
247    /// Storage for transactions (legacy centralized storage)
248    #[cfg(feature = "storage")]
249    storage: Option<Arc<storage::Storage>>,
250    /// Agent-specific storage manager
251    #[cfg(feature = "storage")]
252    agent_storage_manager: Option<Arc<storage::AgentStorageManager>>,
253    /// Transaction state processor
254    #[cfg(feature = "storage")]
255    state_processor: Option<Arc<state_machine::StandardTransactionProcessor>>,
256}
257
258impl TapNode {
259    /// Create a new TAP node with the given configuration
260    pub fn new(config: NodeConfig) -> Self {
261        // Create the agent registry
262        let agents = Arc::new(AgentRegistry::new(config.max_agents));
263
264        // Create the event bus
265        let event_bus = Arc::new(EventBus::new());
266
267        // Create the message router
268        let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
269
270        let router = CompositePlainMessageRouter::new(vec![default_router]);
271
272        // Create the message processors
273        let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
274        let validation_processor =
275            PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
276        let trust_ping_processor = PlainMessageProcessorType::TrustPing(
277            TrustPingProcessor::with_event_bus(event_bus.clone()),
278        );
279        let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
280
281        let incoming_processor = CompositePlainMessageProcessor::new(vec![
282            logging_processor.clone(),
283            validation_processor.clone(),
284            trust_ping_processor.clone(),
285            default_processor.clone(),
286        ]);
287
288        let outgoing_processor = CompositePlainMessageProcessor::new(vec![
289            logging_processor,
290            validation_processor,
291            trust_ping_processor,
292            default_processor,
293        ]);
294
295        // Create the resolver
296        let resolver = Arc::new(MultiResolver::default());
297
298        // Storage will be initialized on first use
299        #[cfg(feature = "storage")]
300        let storage = None;
301        #[cfg(feature = "storage")]
302        let agent_storage_manager = Some(Arc::new(storage::AgentStorageManager::new(
303            config.tap_root.clone(),
304        )));
305        #[cfg(feature = "storage")]
306        let state_processor = None;
307
308        let node = Self {
309            agents,
310            event_bus,
311            incoming_processor,
312            outgoing_processor,
313            router,
314            resolver,
315            processor_pool: None,
316            config,
317            #[cfg(feature = "storage")]
318            storage,
319            #[cfg(feature = "storage")]
320            agent_storage_manager,
321            #[cfg(feature = "storage")]
322            state_processor,
323        };
324
325        // Set up the event logger if configured
326        if let Some(logger_config) = &node.config.event_logger {
327            let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
328
329            // We need to handle the async subscribe in a blocking context
330            // This is safe because EventBus methods are designed to be called in this way
331            let event_bus = node.event_bus.clone();
332            tokio::task::block_in_place(|| {
333                tokio::runtime::Handle::current().block_on(async {
334                    event_bus.subscribe(event_logger).await;
335                })
336            });
337        }
338
339        node
340    }
341
342    /// Initialize storage asynchronously
343    #[cfg(feature = "storage")]
344    pub async fn init_storage(&mut self) -> Result<()> {
345        let storage = if let Some(agent_did) = &self.config.agent_did {
346            // Use new DID-based storage structure
347            match storage::Storage::new_with_did(agent_did, self.config.tap_root.clone()).await {
348                Ok(s) => s,
349                Err(e) => {
350                    log::error!("Failed to initialize storage with DID: {}", e);
351                    return Err(Error::Storage(e.to_string()));
352                }
353            }
354        } else if let Some(storage_path) = self.config.storage_path.clone() {
355            // Use explicit path
356            match storage::Storage::new(Some(storage_path)).await {
357                Ok(s) => s,
358                Err(e) => {
359                    log::error!("Failed to initialize storage: {}", e);
360                    return Err(Error::Storage(e.to_string()));
361                }
362            }
363        } else {
364            // Initialize with default path
365            match storage::Storage::new(None).await {
366                Ok(s) => s,
367                Err(e) => {
368                    log::error!("Failed to initialize storage: {}", e);
369                    return Err(Error::Storage(e.to_string()));
370                }
371            }
372        };
373
374        let storage_arc = Arc::new(storage);
375
376        // Subscribe event handlers
377        let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
378            storage_arc.clone(),
379        ));
380        self.event_bus.subscribe(message_status_handler).await;
381
382        let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
383            storage_arc.clone(),
384        ));
385        self.event_bus.subscribe(transaction_state_handler).await;
386
387        let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
388        self.event_bus.subscribe(transaction_audit_handler).await;
389
390        // Create state processor with configured decision mode
391        let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
392            storage_arc.clone(),
393            self.event_bus.clone(),
394            self.agents.clone(),
395            self.config.decision_mode.clone(),
396        ));
397
398        self.storage = Some(storage_arc);
399        self.state_processor = Some(state_processor);
400        Ok(())
401    }
402
403    /// Start the node
404    pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
405        let processor_pool = ProcessorPool::new(config);
406        self.processor_pool = Some(processor_pool);
407        Ok(())
408    }
409
410    /// Receive and process an incoming message
411    ///
412    /// This method handles the complete lifecycle of an incoming message:
413    ///
414    /// 1. Determining the message type (plain, signed, or encrypted)
415    /// 2. Verifying signatures or routing to agents for decryption
416    /// 3. Processing the resulting plain messages through the pipeline
417    /// 4. Routing and dispatching to the appropriate agents
418    ///
419    /// # Parameters
420    ///
421    /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
422    ///
423    /// # Returns
424    ///
425    /// * `Ok(())` if the message was successfully processed
426    /// * `Err(Error)` if there was an error during processing
427    pub async fn receive_message(&self, message: serde_json::Value) -> Result<()> {
428        // Default to internal source when no source is specified
429        self.receive_message_from_source(message, storage::SourceType::Internal, None)
430            .await
431    }
432
433    /// Receive and process an incoming message with source information
434    ///
435    /// This method handles the complete lifecycle of an incoming message with
436    /// tracking of where the message came from.
437    ///
438    /// # Parameters
439    ///
440    /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
441    /// * `source_type` - The type of source (https, internal, websocket, etc.)
442    /// * `source_identifier` - Optional identifier for the source (URL, agent DID, etc.)
443    ///
444    /// # Returns
445    ///
446    /// * `Ok(())` if the message was successfully processed
447    /// * `Err(Error)` if there was an error during processing
448    pub async fn receive_message_from_source(
449        &self,
450        message: serde_json::Value,
451        source_type: storage::SourceType,
452        source_identifier: Option<&str>,
453    ) -> Result<()> {
454        // Store the raw message for logging
455        let raw_message = serde_json::to_string(&message).ok();
456
457        // Store the raw message in the received table
458        #[cfg(feature = "storage")]
459        let mut received_ids: Vec<(String, i64)> = Vec::new();
460
461        use tap_agent::{verify_jws, Jwe, Jws};
462
463        // Determine message type
464        let is_encrypted =
465            message.get("protected").is_some() && message.get("recipients").is_some();
466        let is_signed = message.get("payload").is_some()
467            && (message.get("signatures").is_some() || message.get("signature").is_some());
468
469        // Helper function to store received message in agent storage
470        #[cfg(feature = "storage")]
471        async fn store_received_for_agent(
472            storage_manager: &storage::AgentStorageManager,
473            agent_did: &str,
474            raw_message: &str,
475            source_type: &storage::SourceType,
476            source_identifier: Option<&str>,
477        ) -> Option<i64> {
478            match storage_manager.get_agent_storage(agent_did).await {
479                Ok(agent_storage) => {
480                    match agent_storage
481                        .create_received(raw_message, source_type.clone(), source_identifier)
482                        .await
483                    {
484                        Ok(id) => {
485                            log::debug!(
486                                "Created received record {} for agent {} for incoming message",
487                                id,
488                                agent_did
489                            );
490                            Some(id)
491                        }
492                        Err(e) => {
493                            log::warn!(
494                                "Failed to create received record for agent {}: {}",
495                                agent_did,
496                                e
497                            );
498                            None
499                        }
500                    }
501                }
502                Err(e) => {
503                    log::warn!("Failed to get storage for agent {}: {}", agent_did, e);
504                    None
505                }
506            }
507        }
508
509        if is_signed {
510            // Verify signature once using resolver
511            let jws: Jws = serde_json::from_value(message)
512                .map_err(|e| Error::Serialization(format!("Failed to parse JWS: {}", e)))?;
513
514            let plain_message = verify_jws(&jws, &*self.resolver)
515                .await
516                .map_err(|e| Error::Verification(format!("JWS verification failed: {}", e)))?;
517
518            // Store in recipient agents' storage
519            #[cfg(feature = "storage")]
520            {
521                if let Some(ref storage_manager) = self.agent_storage_manager {
522                    let empty_msg = "{}".to_string();
523                    let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
524
525                    // Store for each recipient agent
526                    for recipient_did in &plain_message.to {
527                        if self.agents.has_agent(recipient_did) {
528                            if let Some(id) = store_received_for_agent(
529                                storage_manager,
530                                recipient_did,
531                                raw_msg,
532                                &source_type,
533                                source_identifier,
534                            )
535                            .await
536                            {
537                                received_ids.push((recipient_did.clone(), id));
538                            }
539                        }
540                    }
541
542                    // If no recipients are registered agents, store for sender if they're an agent
543                    if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
544                        if let Some(id) = store_received_for_agent(
545                            storage_manager,
546                            &plain_message.from,
547                            raw_msg,
548                            &source_type,
549                            source_identifier,
550                        )
551                        .await
552                        {
553                            received_ids.push((plain_message.from.clone(), id));
554                        }
555                    }
556                }
557            }
558
559            // Process the verified plain message
560            let result = self.process_plain_message(plain_message).await;
561
562            // Update the received records
563            #[cfg(feature = "storage")]
564            {
565                if let Some(ref storage_manager) = self.agent_storage_manager {
566                    let status = if result.is_ok() {
567                        storage::ReceivedStatus::Processed
568                    } else {
569                        storage::ReceivedStatus::Failed
570                    };
571                    let error_msg = result.as_ref().err().map(|e| e.to_string());
572
573                    for (agent_did, received_id) in &received_ids {
574                        if let Ok(agent_storage) =
575                            storage_manager.get_agent_storage(agent_did).await
576                        {
577                            let _ = agent_storage
578                                .update_received_status(
579                                    *received_id,
580                                    status.clone(),
581                                    None, // We'll set this after processing
582                                    error_msg.as_deref(),
583                                )
584                                .await;
585                        }
586                    }
587                }
588            }
589
590            result
591        } else if is_encrypted {
592            // Route encrypted message to each matching agent
593            let jwe: Jwe = serde_json::from_value(message.clone())
594                .map_err(|e| Error::Serialization(format!("Failed to parse JWE: {}", e)))?;
595
596            // Store in recipient agents' storage
597            #[cfg(feature = "storage")]
598            {
599                if let Some(ref storage_manager) = self.agent_storage_manager {
600                    let empty_msg = "{}".to_string();
601                    let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
602
603                    // Store for each recipient agent based on JWE recipients
604                    for recipient in &jwe.recipients {
605                        if let Some(did) = recipient.header.kid.split('#').next() {
606                            if self.agents.has_agent(did) {
607                                if let Some(id) = store_received_for_agent(
608                                    storage_manager,
609                                    did,
610                                    raw_msg,
611                                    &source_type,
612                                    source_identifier,
613                                )
614                                .await
615                                {
616                                    received_ids.push((did.to_string(), id));
617                                }
618                            }
619                        }
620                    }
621                }
622            }
623
624            // Find agents that match recipients
625            let mut processed = false;
626            for recipient in &jwe.recipients {
627                if let Some(did) = recipient.header.kid.split('#').next() {
628                    if let Ok(agent) = self.agents.get_agent(did).await {
629                        // Let the agent handle decryption and processing
630                        match agent.receive_encrypted_message(&message).await {
631                            Ok(_) => {
632                                processed = true;
633                                log::debug!(
634                                    "Agent {} successfully processed encrypted message",
635                                    did
636                                );
637                            }
638                            Err(e) => {
639                                log::debug!(
640                                    "Agent {} couldn't process encrypted message: {}",
641                                    did,
642                                    e
643                                );
644                            }
645                        }
646                    }
647                }
648            }
649
650            let result = if !processed {
651                Err(Error::Processing(
652                    "No agent could process the encrypted message".to_string(),
653                ))
654            } else {
655                Ok(())
656            };
657
658            // Update the received records for encrypted messages
659            #[cfg(feature = "storage")]
660            {
661                if let Some(ref storage_manager) = self.agent_storage_manager {
662                    let status = if result.is_ok() {
663                        storage::ReceivedStatus::Processed
664                    } else {
665                        storage::ReceivedStatus::Failed
666                    };
667                    let error_msg = result.as_ref().err().map(|e| e.to_string());
668
669                    for (agent_did, received_id) in &received_ids {
670                        if let Ok(agent_storage) =
671                            storage_manager.get_agent_storage(agent_did).await
672                        {
673                            let _ = agent_storage
674                                .update_received_status(
675                                    *received_id,
676                                    status.clone(),
677                                    None, // We don't have access to the decrypted message ID
678                                    error_msg.as_deref(),
679                                )
680                                .await;
681                        }
682                    }
683                }
684            }
685
686            result
687        } else {
688            // Plain message - parse and process
689            let plain_message: PlainMessage = serde_json::from_value(message).map_err(|e| {
690                Error::Serialization(format!("Failed to parse PlainMessage: {}", e))
691            })?;
692
693            // Store in recipient agents' storage
694            #[cfg(feature = "storage")]
695            {
696                if let Some(ref storage_manager) = self.agent_storage_manager {
697                    let empty_msg = "{}".to_string();
698                    let raw_msg = raw_message.as_ref().unwrap_or(&empty_msg);
699
700                    // Store for each recipient agent
701                    for recipient_did in &plain_message.to {
702                        if self.agents.has_agent(recipient_did) {
703                            if let Some(id) = store_received_for_agent(
704                                storage_manager,
705                                recipient_did,
706                                raw_msg,
707                                &source_type,
708                                source_identifier,
709                            )
710                            .await
711                            {
712                                received_ids.push((recipient_did.clone(), id));
713                            }
714                        }
715                    }
716
717                    // If no recipients are registered agents, store for sender if they're an agent
718                    if received_ids.is_empty() && self.agents.has_agent(&plain_message.from) {
719                        if let Some(id) = store_received_for_agent(
720                            storage_manager,
721                            &plain_message.from,
722                            raw_msg,
723                            &source_type,
724                            source_identifier,
725                        )
726                        .await
727                        {
728                            received_ids.push((plain_message.from.clone(), id));
729                        }
730                    }
731                }
732            }
733
734            let result = self.process_plain_message(plain_message).await;
735
736            // Update the received records
737            #[cfg(feature = "storage")]
738            {
739                if let Some(ref storage_manager) = self.agent_storage_manager {
740                    let status = if result.is_ok() {
741                        storage::ReceivedStatus::Processed
742                    } else {
743                        storage::ReceivedStatus::Failed
744                    };
745                    let error_msg = result.as_ref().err().map(|e| e.to_string());
746
747                    for (agent_did, received_id) in &received_ids {
748                        if let Ok(agent_storage) =
749                            storage_manager.get_agent_storage(agent_did).await
750                        {
751                            let _ = agent_storage
752                                .update_received_status(
753                                    *received_id,
754                                    status.clone(),
755                                    None, // We'll set this after processing
756                                    error_msg.as_deref(),
757                                )
758                                .await;
759                        }
760                    }
761                }
762            }
763
764            result
765        }
766    }
767
768    /// Process a plain message through the pipeline
769    async fn process_plain_message(&self, message: PlainMessage) -> Result<()> {
770        // Validate the message if storage/validation is available
771        #[cfg(feature = "storage")]
772        {
773            if let Some(ref storage) = self.storage {
774                // Create validator
775                let validator_config = validation::StandardValidatorConfig {
776                    max_timestamp_drift_secs: 60,
777                    storage: storage.clone(),
778                };
779                let validator = validation::create_standard_validator(validator_config).await;
780
781                // Validate the message
782                use crate::validation::{MessageValidator, ValidationResult};
783                match validator.validate(&message).await {
784                    ValidationResult::Accept => {
785                        // Publish accepted event
786                        self.event_bus
787                            .publish_message_accepted(
788                                message.id.clone(),
789                                message.type_.clone(),
790                                message.from.clone(),
791                                message.to.first().cloned().unwrap_or_default(),
792                            )
793                            .await;
794                    }
795                    ValidationResult::Reject(reason) => {
796                        // Publish rejected event
797                        self.event_bus
798                            .publish_message_rejected(
799                                message.id.clone(),
800                                reason.clone(),
801                                message.from.clone(),
802                                message.to.first().cloned().unwrap_or_default(),
803                            )
804                            .await;
805
806                        // Return error to stop processing
807                        return Err(Error::Validation(reason));
808                    }
809                }
810            }
811        }
812
813        // Process message through state machine if available
814        #[cfg(feature = "storage")]
815        {
816            if let Some(ref state_processor) = self.state_processor {
817                use crate::state_machine::TransactionStateProcessor;
818                if let Err(e) = state_processor.process_message(&message).await {
819                    log::warn!("State processor error: {}", e);
820                    // Don't fail the entire message processing, just log the error
821                }
822            }
823        }
824        // Log incoming messages to agent-specific storage
825        #[cfg(feature = "storage")]
826        {
827            if let Some(ref storage_manager) = self.agent_storage_manager {
828                // Check if this is a transaction message
829                let message_type_lower = message.type_.to_lowercase();
830                let is_transaction = message_type_lower.contains("transfer")
831                    || message_type_lower.contains("payment");
832                log::debug!(
833                    "Message type: {}, is_transaction: {}",
834                    message.type_,
835                    is_transaction
836                );
837
838                if is_transaction {
839                    // For transactions, store in ALL involved agents' databases
840                    let involved_agents = self.extract_transaction_agents(&message);
841
842                    if involved_agents.is_empty() {
843                        log::warn!("No registered agents found for transaction: {}", message.id);
844                    } else {
845                        log::debug!(
846                            "Storing transaction {} in {} agent databases",
847                            message.id,
848                            involved_agents.len()
849                        );
850
851                        // Store transaction in each involved agent's database
852                        for agent_did in &involved_agents {
853                            if let Ok(agent_storage) =
854                                storage_manager.get_agent_storage(agent_did).await
855                            {
856                                // Log the message
857                                match agent_storage
858                                    .log_message(&message, storage::MessageDirection::Incoming)
859                                    .await
860                                {
861                                    Ok(_) => log::debug!(
862                                        "Logged incoming message to agent {}: {}",
863                                        agent_did,
864                                        message.id
865                                    ),
866                                    Err(e) => log::warn!(
867                                        "Failed to log incoming message for agent {}: {}",
868                                        agent_did,
869                                        e
870                                    ),
871                                }
872
873                                // Store as transaction
874                                match agent_storage.insert_transaction(&message).await {
875                                    Ok(()) => {
876                                        log::debug!(
877                                            "Stored transaction for agent {}: {}",
878                                            agent_did,
879                                            message.id
880                                        );
881
882                                        // Publish TransactionCreated event
883                                        // Use the message.id as the reference_id to fetch the transaction
884                                        if let Ok(Some(tx)) =
885                                            agent_storage.get_transaction_by_id(&message.id).await
886                                        {
887                                            self.event_bus
888                                                .publish_event(NodeEvent::TransactionCreated {
889                                                    transaction: tx,
890                                                    agent_did: agent_did.clone(),
891                                                })
892                                                .await;
893                                        }
894                                    }
895                                    Err(e) => log::warn!(
896                                        "Failed to store transaction for agent {}: {}",
897                                        agent_did,
898                                        e
899                                    ),
900                                }
901                            } else {
902                                log::warn!("Failed to get storage for agent: {}", agent_did);
903                            }
904                        }
905                    }
906                } else {
907                    // For non-transaction messages, log to all recipient agents' storage
908                    let mut logged_to_any = false;
909
910                    for recipient_did in &message.to {
911                        // Check if this recipient is a registered agent
912                        if self.agents.has_agent(recipient_did) {
913                            if let Ok(agent_storage) =
914                                storage_manager.get_agent_storage(recipient_did).await
915                            {
916                                // Log the message to this recipient's storage
917                                match agent_storage
918                                    .log_message(&message, storage::MessageDirection::Incoming)
919                                    .await
920                                {
921                                    Ok(_) => {
922                                        log::debug!(
923                                            "Logged incoming message to recipient {}: {}",
924                                            recipient_did,
925                                            message.id
926                                        );
927                                        logged_to_any = true;
928                                    }
929                                    Err(e) => log::warn!(
930                                        "Failed to log incoming message for recipient {}: {}",
931                                        recipient_did,
932                                        e
933                                    ),
934                                }
935                            } else {
936                                log::warn!(
937                                    "Failed to get storage for recipient: {}",
938                                    recipient_did
939                                );
940                            }
941                        }
942                    }
943
944                    // If no recipients were logged, fall back to sender or router-based storage
945                    if !logged_to_any {
946                        match self.determine_message_agent(&message) {
947                            Ok(agent_did) => {
948                                if let Ok(agent_storage) =
949                                    storage_manager.get_agent_storage(&agent_did).await
950                                {
951                                    // Log the message to the agent's storage
952                                    match agent_storage
953                                        .log_message(
954                                            &message,
955                                            storage::MessageDirection::Incoming,
956                                        )
957                                        .await
958                                    {
959                                        Ok(_) => log::debug!(
960                                            "Logged incoming message to fallback agent {}: {}",
961                                            agent_did,
962                                            message.id
963                                        ),
964                                        Err(e) => log::warn!(
965                                            "Failed to log incoming message for fallback agent {}: {}",
966                                            agent_did,
967                                            e
968                                        ),
969                                    }
970                                } else {
971                                    log::warn!(
972                                        "Failed to get storage for fallback agent: {}",
973                                        agent_did
974                                    );
975                                }
976                            }
977                            Err(e) => {
978                                log::warn!(
979                                    "Failed to determine fallback agent for message storage: {}",
980                                    e
981                                );
982                                // Fall back to centralized storage if available
983                                if let Some(ref storage) = self.storage {
984                                    let _ = storage
985                                        .log_message(&message, storage::MessageDirection::Incoming)
986                                        .await;
987                                }
988                            }
989                        }
990                    }
991                }
992            }
993        }
994
995        // Process the incoming message
996        let processed_message = match self.incoming_processor.process_incoming(message).await? {
997            Some(msg) => msg,
998            None => return Ok(()), // PlainMessage was dropped during processing
999        };
1000
1001        // Deliver the message to all recipients in the 'to' field
1002        let mut delivery_success = false;
1003
1004        for recipient_did in &processed_message.to {
1005            // Check if we have a registered agent for this recipient
1006            match self.agents.get_agent(recipient_did).await {
1007                Ok(agent) => {
1008                    // Create delivery record for internal delivery tracking
1009                    #[cfg(feature = "storage")]
1010                    let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager
1011                    {
1012                        if let Ok(agent_storage) =
1013                            storage_manager.get_agent_storage(recipient_did).await
1014                        {
1015                            // Serialize message for storage
1016                            let message_text = serde_json::to_string(&processed_message)
1017                                .unwrap_or_else(|_| "Failed to serialize message".to_string());
1018
1019                            match agent_storage
1020                                .create_delivery(
1021                                    &processed_message.id,
1022                                    &message_text,
1023                                    recipient_did,
1024                                    None, // No URL for internal delivery
1025                                    storage::models::DeliveryType::Internal,
1026                                )
1027                                .await
1028                            {
1029                                Ok(id) => {
1030                                    log::debug!(
1031                                        "Created internal delivery record {} for message {} to {}",
1032                                        id,
1033                                        processed_message.id,
1034                                        recipient_did
1035                                    );
1036                                    Some(id)
1037                                }
1038                                Err(e) => {
1039                                    log::warn!("Failed to create internal delivery record: {}", e);
1040                                    None
1041                                }
1042                            }
1043                        } else {
1044                            None
1045                        }
1046                    } else {
1047                        None
1048                    };
1049
1050                    // Let the agent process the plain message
1051                    match agent.receive_plain_message(processed_message.clone()).await {
1052                        Ok(_) => {
1053                            log::debug!(
1054                                "Successfully delivered message to agent: {}",
1055                                recipient_did
1056                            );
1057                            delivery_success = true;
1058
1059                            // Update delivery record to success
1060                            #[cfg(feature = "storage")]
1061                            if let (Some(delivery_id), Some(ref storage_manager)) =
1062                                (delivery_id, &self.agent_storage_manager)
1063                            {
1064                                if let Ok(agent_storage) =
1065                                    storage_manager.get_agent_storage(recipient_did).await
1066                                {
1067                                    if let Err(e) = agent_storage
1068                                        .update_delivery_status(
1069                                            delivery_id,
1070                                            storage::models::DeliveryStatus::Success,
1071                                            None, // No HTTP status for internal delivery
1072                                            None, // No error message
1073                                        )
1074                                        .await
1075                                    {
1076                                        log::warn!("Failed to update internal delivery record to success: {}", e);
1077                                    }
1078                                }
1079                            }
1080                        }
1081                        Err(e) => {
1082                            log::warn!("Agent {} failed to process message: {}", recipient_did, e);
1083
1084                            // Update delivery record to failed
1085                            #[cfg(feature = "storage")]
1086                            if let (Some(delivery_id), Some(ref storage_manager)) =
1087                                (delivery_id, &self.agent_storage_manager)
1088                            {
1089                                if let Ok(agent_storage) =
1090                                    storage_manager.get_agent_storage(recipient_did).await
1091                                {
1092                                    if let Err(e2) = agent_storage
1093                                        .update_delivery_status(
1094                                            delivery_id,
1095                                            storage::models::DeliveryStatus::Failed,
1096                                            None, // No HTTP status for internal delivery
1097                                            Some(&e.to_string()), // Include error message
1098                                        )
1099                                        .await
1100                                    {
1101                                        log::warn!("Failed to update internal delivery record to failed: {}", e2);
1102                                    }
1103                                }
1104                            }
1105                        }
1106                    }
1107                }
1108                Err(e) => {
1109                    log::debug!(
1110                        "No registered agent found for recipient {}: {}",
1111                        recipient_did,
1112                        e
1113                    );
1114                    // This is not an error - the recipient might be external to this node
1115                }
1116            }
1117        }
1118
1119        // If no recipients were successfully processed, try the router as fallback
1120        if !delivery_success {
1121            let target_did = match self.router.route_message(&processed_message).await {
1122                Ok(did) => did,
1123                Err(e) => {
1124                    log::warn!("Unable to route message and no recipients processed: {}", e);
1125                    return Ok(());
1126                }
1127            };
1128
1129            // Get the agent
1130            let agent = match self.agents.get_agent(&target_did).await {
1131                Ok(a) => a,
1132                Err(e) => {
1133                    log::warn!("Failed to get agent for dispatch: {}", e);
1134                    return Ok(());
1135                }
1136            };
1137
1138            // Create delivery record for internal delivery tracking
1139            #[cfg(feature = "storage")]
1140            let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1141                if let Ok(agent_storage) = storage_manager.get_agent_storage(&target_did).await {
1142                    // Serialize message for storage
1143                    let message_text = serde_json::to_string(&processed_message)
1144                        .unwrap_or_else(|_| "Failed to serialize message".to_string());
1145
1146                    match agent_storage
1147                        .create_delivery(
1148                            &processed_message.id,
1149                            &message_text,
1150                            &target_did,
1151                            None, // No URL for internal delivery
1152                            storage::models::DeliveryType::Internal,
1153                        )
1154                        .await
1155                    {
1156                        Ok(id) => {
1157                            log::debug!(
1158                                "Created internal delivery record {} for routed message {} to {}",
1159                                id,
1160                                processed_message.id,
1161                                target_did
1162                            );
1163                            Some(id)
1164                        }
1165                        Err(e) => {
1166                            log::warn!(
1167                                "Failed to create internal delivery record for routing: {}",
1168                                e
1169                            );
1170                            None
1171                        }
1172                    }
1173                } else {
1174                    None
1175                }
1176            } else {
1177                None
1178            };
1179
1180            // Let the agent process the plain message
1181            match agent.receive_plain_message(processed_message).await {
1182                Ok(_) => {
1183                    log::debug!("Successfully routed message to agent: {}", target_did);
1184
1185                    // Update delivery record to success
1186                    #[cfg(feature = "storage")]
1187                    if let (Some(delivery_id), Some(ref storage_manager)) =
1188                        (delivery_id, &self.agent_storage_manager)
1189                    {
1190                        if let Ok(agent_storage) =
1191                            storage_manager.get_agent_storage(&target_did).await
1192                        {
1193                            if let Err(e) = agent_storage
1194                                .update_delivery_status(
1195                                    delivery_id,
1196                                    storage::models::DeliveryStatus::Success,
1197                                    None, // No HTTP status for internal delivery
1198                                    None, // No error message
1199                                )
1200                                .await
1201                            {
1202                                log::warn!(
1203                                    "Failed to update routed delivery record to success: {}",
1204                                    e
1205                                );
1206                            }
1207                        }
1208                    }
1209                }
1210                Err(e) => {
1211                    log::warn!("Agent failed to process message: {}", e);
1212
1213                    // Update delivery record to failed
1214                    #[cfg(feature = "storage")]
1215                    if let (Some(delivery_id), Some(ref storage_manager)) =
1216                        (delivery_id, &self.agent_storage_manager)
1217                    {
1218                        if let Ok(agent_storage) =
1219                            storage_manager.get_agent_storage(&target_did).await
1220                        {
1221                            if let Err(e2) = agent_storage
1222                                .update_delivery_status(
1223                                    delivery_id,
1224                                    storage::models::DeliveryStatus::Failed,
1225                                    None,                 // No HTTP status for internal delivery
1226                                    Some(&e.to_string()), // Include error message
1227                                )
1228                                .await
1229                            {
1230                                log::warn!(
1231                                    "Failed to update routed delivery record to failed: {}",
1232                                    e2
1233                                );
1234                            }
1235                        }
1236                    }
1237                }
1238            }
1239        }
1240
1241        Ok(())
1242    }
1243
1244    /// Dispatch a message to an agent by DID
1245    pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
1246        let agent = self.agents.get_agent(&target_did).await?;
1247
1248        // Convert the message to a packed format for transport
1249        let packed = agent.send_serialized_message(&message).await?;
1250
1251        // Publish an event for the dispatched message
1252        self.event_bus
1253            .publish_agent_message(target_did, packed.into_bytes())
1254            .await;
1255
1256        Ok(())
1257    }
1258
1259    /// Send a message to an agent
1260    ///
1261    /// This method now includes comprehensive delivery tracking and actual message delivery.
1262    /// For internal recipients (registered agents), messages are delivered directly.
1263    /// For external recipients, messages are delivered via HTTP with tracking.
1264    pub async fn send_message(&self, sender_did: String, message: PlainMessage) -> Result<String> {
1265        // Log outgoing messages to agent-specific storage
1266        #[cfg(feature = "storage")]
1267        {
1268            if let Some(ref storage_manager) = self.agent_storage_manager {
1269                // Check if this is a transaction message
1270                let message_type_lower = message.type_.to_lowercase();
1271                let is_transaction = message_type_lower.contains("transfer")
1272                    || message_type_lower.contains("payment");
1273                log::debug!(
1274                    "Message type: {}, is_transaction: {}",
1275                    message.type_,
1276                    is_transaction
1277                );
1278
1279                if is_transaction {
1280                    // For transactions, store in ALL involved agents' databases
1281                    let involved_agents = self.extract_transaction_agents(&message);
1282
1283                    if involved_agents.is_empty() {
1284                        log::warn!(
1285                            "No registered agents found for outgoing transaction: {}",
1286                            message.id
1287                        );
1288                    } else {
1289                        log::debug!(
1290                            "Storing outgoing transaction {} in {} agent databases",
1291                            message.id,
1292                            involved_agents.len()
1293                        );
1294
1295                        // Store transaction in each involved agent's database
1296                        for agent_did in &involved_agents {
1297                            if let Ok(agent_storage) =
1298                                storage_manager.get_agent_storage(agent_did).await
1299                            {
1300                                // Log the message
1301                                match agent_storage
1302                                    .log_message(&message, storage::MessageDirection::Outgoing)
1303                                    .await
1304                                {
1305                                    Ok(_) => log::debug!(
1306                                        "Logged outgoing message to agent {}: {}",
1307                                        agent_did,
1308                                        message.id
1309                                    ),
1310                                    Err(e) => log::warn!(
1311                                        "Failed to log outgoing message for agent {}: {}",
1312                                        agent_did,
1313                                        e
1314                                    ),
1315                                }
1316
1317                                // Store as transaction
1318                                match agent_storage.insert_transaction(&message).await {
1319                                    Ok(()) => {
1320                                        log::debug!(
1321                                            "Stored outgoing transaction for agent {}: {}",
1322                                            agent_did,
1323                                            message.id
1324                                        );
1325
1326                                        // Publish TransactionCreated event
1327                                        // Use the message.id as the reference_id to fetch the transaction
1328                                        if let Ok(Some(tx)) =
1329                                            agent_storage.get_transaction_by_id(&message.id).await
1330                                        {
1331                                            self.event_bus
1332                                                .publish_event(NodeEvent::TransactionCreated {
1333                                                    transaction: tx,
1334                                                    agent_did: agent_did.clone(),
1335                                                })
1336                                                .await;
1337                                        }
1338                                    }
1339                                    Err(e) => log::warn!(
1340                                        "Failed to store outgoing transaction for agent {}: {}",
1341                                        agent_did,
1342                                        e
1343                                    ),
1344                                }
1345                            } else {
1346                                log::warn!("Failed to get storage for agent: {}", agent_did);
1347                            }
1348                        }
1349                    }
1350                } else {
1351                    // For non-transaction messages, just store in sender's storage
1352                    if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1353                    {
1354                        // Log the message to the sender's storage
1355                        match sender_storage
1356                            .log_message(&message, storage::MessageDirection::Outgoing)
1357                            .await
1358                        {
1359                            Ok(_) => log::debug!(
1360                                "Logged outgoing message for agent {}: {}",
1361                                sender_did,
1362                                message.id
1363                            ),
1364                            Err(e) => log::warn!(
1365                                "Failed to log outgoing message for agent {}: {}",
1366                                sender_did,
1367                                e
1368                            ),
1369                        }
1370                    } else {
1371                        log::warn!("Failed to get storage for sender agent: {}", sender_did);
1372                        // Fall back to centralized storage if available
1373                        if let Some(ref storage) = self.storage {
1374                            let _ = storage
1375                                .log_message(&message, storage::MessageDirection::Outgoing)
1376                                .await;
1377                        }
1378                    }
1379                }
1380            }
1381        }
1382
1383        // Process the outgoing message
1384        let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
1385            Some(msg) => msg,
1386            None => {
1387                return Err(Error::MessageDropped(
1388                    "PlainMessage dropped during processing".to_string(),
1389                ))
1390            }
1391        };
1392
1393        // Get the sender agent and its key manager
1394        let agent = self.agents.get_agent(&sender_did).await?;
1395        let key_manager = agent.key_manager();
1396
1397        // Determine security mode based on message type
1398        use tap_agent::message::SecurityMode;
1399        let security_mode = if processed_message.type_.contains("credential")
1400            || processed_message.type_.contains("transfer")
1401            || processed_message.type_.contains("payment")
1402        {
1403            SecurityMode::AuthCrypt
1404        } else {
1405            SecurityMode::Signed
1406        };
1407
1408        // Get sender key ID
1409        let sender_kid = agent.get_signing_kid().await?;
1410
1411        // For single recipient auth-crypt, get recipient key
1412        let recipient_kid =
1413            if processed_message.to.len() == 1 && security_mode == SecurityMode::AuthCrypt {
1414                Some(agent.get_encryption_kid(&processed_message.to[0]).await?)
1415            } else {
1416                None
1417            };
1418
1419        // Create pack options
1420        use tap_agent::message_packing::{PackOptions, Packable};
1421        let pack_options = PackOptions {
1422            security_mode,
1423            sender_kid: Some(sender_kid),
1424            recipient_kid,
1425        };
1426
1427        // Pack/sign the message properly
1428        let packed = processed_message.pack(&**key_manager, pack_options).await?;
1429
1430        // Deliver to all recipients in the message
1431        let mut delivery_errors = Vec::new();
1432
1433        for recipient_did in &processed_message.to {
1434            log::debug!("Processing delivery to recipient: {}", recipient_did);
1435
1436            // Check if recipient is a local agent (internal delivery) or external
1437            let is_internal_recipient = self.agents.get_agent(recipient_did).await.is_ok();
1438
1439            if is_internal_recipient {
1440                // Internal delivery - deliver to registered agent with tracking
1441                log::debug!("Delivering message internally to agent: {}", recipient_did);
1442
1443                #[cfg(feature = "storage")]
1444                let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1445                    if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1446                    {
1447                        // Create delivery record for internal delivery
1448                        match sender_storage
1449                            .create_delivery(
1450                                &processed_message.id,
1451                                &packed, // Store the signed/packed message
1452                                recipient_did,
1453                                None, // No URL for internal delivery
1454                                storage::models::DeliveryType::Internal,
1455                            )
1456                            .await
1457                        {
1458                            Ok(id) => {
1459                                log::debug!(
1460                                    "Created internal delivery record {} for message {} to {}",
1461                                    id,
1462                                    processed_message.id,
1463                                    recipient_did
1464                                );
1465                                Some(id)
1466                            }
1467                            Err(e) => {
1468                                log::warn!("Failed to create internal delivery record: {}", e);
1469                                None
1470                            }
1471                        }
1472                    } else {
1473                        None
1474                    }
1475                } else {
1476                    None
1477                };
1478
1479                // Process the message internally through receive_message_from_source
1480                // to ensure it gets recorded in the received table
1481                // Pass the packed (signed) message just like external messages
1482                let message_value = match serde_json::from_str::<serde_json::Value>(&packed) {
1483                    Ok(val) => val,
1484                    Err(e) => {
1485                        log::error!("Failed to parse packed message as JSON: {}", e);
1486                        continue;
1487                    }
1488                };
1489
1490                match self
1491                    .receive_message_from_source(
1492                        message_value,
1493                        storage::SourceType::Internal,
1494                        Some(&sender_did),
1495                    )
1496                    .await
1497                {
1498                    Ok(_) => {
1499                        log::debug!(
1500                            "Successfully delivered message internally to: {}",
1501                            recipient_did
1502                        );
1503
1504                        // Update delivery record to success
1505                        #[cfg(feature = "storage")]
1506                        if let (Some(delivery_id), Some(ref storage_manager)) =
1507                            (delivery_id, &self.agent_storage_manager)
1508                        {
1509                            if let Ok(sender_storage) =
1510                                storage_manager.get_agent_storage(&sender_did).await
1511                            {
1512                                if let Err(e) = sender_storage
1513                                    .update_delivery_status(
1514                                        delivery_id,
1515                                        storage::models::DeliveryStatus::Success,
1516                                        None, // No HTTP status for internal delivery
1517                                        None, // No error message
1518                                    )
1519                                    .await
1520                                {
1521                                    log::warn!("Failed to update internal delivery status: {}", e);
1522                                }
1523                            }
1524                        }
1525                    }
1526                    Err(e) => {
1527                        log::error!(
1528                            "Failed to deliver message internally to {}: {}",
1529                            recipient_did,
1530                            e
1531                        );
1532
1533                        // Update delivery record to failed
1534                        #[cfg(feature = "storage")]
1535                        if let (Some(delivery_id), Some(ref storage_manager)) =
1536                            (delivery_id, &self.agent_storage_manager)
1537                        {
1538                            if let Ok(sender_storage) =
1539                                storage_manager.get_agent_storage(&sender_did).await
1540                            {
1541                                if let Err(e2) = sender_storage
1542                                    .update_delivery_status(
1543                                        delivery_id,
1544                                        storage::models::DeliveryStatus::Failed,
1545                                        None, // No HTTP status for internal delivery
1546                                        Some(&format!("Internal delivery failed: {}", e)),
1547                                    )
1548                                    .await
1549                                {
1550                                    log::warn!("Failed to update internal delivery status: {}", e2);
1551                                }
1552                            }
1553                        }
1554
1555                        delivery_errors.push((recipient_did.clone(), e));
1556                        continue; // Continue to next recipient
1557                    }
1558                }
1559            } else {
1560                // External delivery - use TapAgent's built-in HTTP delivery with tracking
1561                log::debug!("Attempting external delivery to: {}", recipient_did);
1562
1563                // Get the sender agent for HTTP delivery
1564                let sender_agent = self.agents.get_agent(&sender_did).await?;
1565
1566                // Resolve the service endpoint for the recipient
1567                let endpoint = match sender_agent.get_service_endpoint(recipient_did).await? {
1568                    Some(ep) => ep,
1569                    None => {
1570                        log::warn!(
1571                            "No service endpoint found for {}, delivery failed",
1572                            recipient_did
1573                        );
1574
1575                        // Create failed delivery record
1576                        #[cfg(feature = "storage")]
1577                        if let Some(ref storage_manager) = self.agent_storage_manager {
1578                            if let Ok(sender_storage) =
1579                                storage_manager.get_agent_storage(&sender_did).await
1580                            {
1581                                if let Ok(delivery_id) = sender_storage
1582                                    .create_delivery(
1583                                        &processed_message.id,
1584                                        &packed,
1585                                        recipient_did,
1586                                        None,
1587                                        storage::models::DeliveryType::Https,
1588                                    )
1589                                    .await
1590                                {
1591                                    let _ = sender_storage
1592                                        .update_delivery_status(
1593                                            delivery_id,
1594                                            storage::models::DeliveryStatus::Failed,
1595                                            None,
1596                                            Some("No service endpoint found for recipient"),
1597                                        )
1598                                        .await;
1599                                }
1600                            }
1601                        }
1602
1603                        delivery_errors.push((
1604                            recipient_did.clone(),
1605                            Error::Dispatch(format!(
1606                                "No service endpoint found for recipient: {}",
1607                                recipient_did
1608                            )),
1609                        ));
1610                        continue; // Continue to next recipient
1611                    }
1612                };
1613
1614                // Create delivery record before attempting delivery
1615                #[cfg(feature = "storage")]
1616                let delivery_id = if let Some(ref storage_manager) = self.agent_storage_manager {
1617                    if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
1618                    {
1619                        match sender_storage
1620                            .create_delivery(
1621                                &processed_message.id,
1622                                &packed, // Store the signed/packed message
1623                                recipient_did,
1624                                Some(&endpoint),
1625                                storage::models::DeliveryType::Https,
1626                            )
1627                            .await
1628                        {
1629                            Ok(id) => {
1630                                log::debug!(
1631                                    "Created external delivery record {} for message {} to {} at {}",
1632                                    id,
1633                                    processed_message.id,
1634                                    recipient_did,
1635                                    endpoint
1636                                );
1637                                Some(id)
1638                            }
1639                            Err(e) => {
1640                                log::warn!("Failed to create external delivery record: {}", e);
1641                                None
1642                            }
1643                        }
1644                    } else {
1645                        None
1646                    }
1647                } else {
1648                    None
1649                };
1650
1651                // Attempt HTTP delivery using TapAgent's built-in functionality
1652                match sender_agent.send_to_endpoint(&packed, &endpoint).await {
1653                    Ok(status_code) => {
1654                        log::debug!(
1655                            "Successfully delivered message {} to {} at {} (HTTP {})",
1656                            processed_message.id,
1657                            recipient_did,
1658                            endpoint,
1659                            status_code
1660                        );
1661
1662                        // Update delivery record to success
1663                        #[cfg(feature = "storage")]
1664                        if let (Some(delivery_id), Some(ref storage_manager)) =
1665                            (delivery_id, &self.agent_storage_manager)
1666                        {
1667                            if let Ok(sender_storage) =
1668                                storage_manager.get_agent_storage(&sender_did).await
1669                            {
1670                                if let Err(e) = sender_storage
1671                                    .update_delivery_status(
1672                                        delivery_id,
1673                                        storage::models::DeliveryStatus::Success,
1674                                        Some(status_code as i32),
1675                                        None,
1676                                    )
1677                                    .await
1678                                {
1679                                    log::warn!(
1680                                        "Failed to update external delivery status to success: {}",
1681                                        e
1682                                    );
1683                                }
1684                            }
1685                        }
1686                    }
1687                    Err(e) => {
1688                        log::error!(
1689                            "Failed to deliver message {} to {} at {}: {}",
1690                            processed_message.id,
1691                            recipient_did,
1692                            endpoint,
1693                            e
1694                        );
1695
1696                        // Update delivery record to failed
1697                        #[cfg(feature = "storage")]
1698                        if let (Some(delivery_id), Some(ref storage_manager)) =
1699                            (delivery_id, &self.agent_storage_manager)
1700                        {
1701                            if let Ok(sender_storage) =
1702                                storage_manager.get_agent_storage(&sender_did).await
1703                            {
1704                                // Extract HTTP status code from error if possible
1705                                let error_msg = e.to_string();
1706                                let http_status_code = if error_msg.contains("status:") {
1707                                    error_msg
1708                                        .split("status:")
1709                                        .nth(1)
1710                                        .and_then(|s| s.split_whitespace().next())
1711                                        .and_then(|s| s.parse::<i32>().ok())
1712                                } else {
1713                                    None
1714                                };
1715
1716                                if let Err(e2) = sender_storage
1717                                    .update_delivery_status(
1718                                        delivery_id,
1719                                        storage::models::DeliveryStatus::Failed,
1720                                        http_status_code,
1721                                        Some(&error_msg),
1722                                    )
1723                                    .await
1724                                {
1725                                    log::warn!(
1726                                        "Failed to update external delivery status to failed: {}",
1727                                        e2
1728                                    );
1729                                }
1730
1731                                // Increment retry count for future retry processing
1732                                if let Err(e2) = sender_storage
1733                                    .increment_delivery_retry_count(delivery_id)
1734                                    .await
1735                                {
1736                                    log::warn!("Failed to increment retry count: {}", e2);
1737                                }
1738                            }
1739                        }
1740
1741                        delivery_errors.push((
1742                            recipient_did.clone(),
1743                            Error::Dispatch(format!(
1744                                "HTTP delivery failed for {}: {}",
1745                                recipient_did, e
1746                            )),
1747                        ));
1748                        continue; // Continue to next recipient
1749                    }
1750                }
1751            }
1752        }
1753
1754        // Check if all deliveries failed
1755        if !delivery_errors.is_empty() && delivery_errors.len() == processed_message.to.len() {
1756            return Err(Error::Dispatch(format!(
1757                "Failed to deliver message to all recipients: {:?}",
1758                delivery_errors
1759            )));
1760        }
1761
1762        // Log partial failures
1763        if !delivery_errors.is_empty() {
1764            log::warn!(
1765                "Message delivered to {}/{} recipients. Failures: {:?}",
1766                processed_message.to.len() - delivery_errors.len(),
1767                processed_message.to.len(),
1768                delivery_errors
1769            );
1770        }
1771
1772        // Publish an event for the message
1773        self.event_bus
1774            .publish_agent_message(sender_did, packed.clone().into_bytes())
1775            .await;
1776
1777        Ok(packed)
1778    }
1779
1780    /// Register a new agent with the node
1781    ///
1782    /// This method registers an agent with the TAP Node and automatically initializes
1783    /// DID-specific storage for the agent. The storage directory structure follows:
1784    /// - `~/.tap/{sanitized_did}/transactions.db` (default)
1785    /// - `{tap_root}/{sanitized_did}/transactions.db` (if custom TAP root is configured)
1786    ///
1787    /// # Storage Initialization
1788    ///
1789    /// When an agent is registered, a dedicated SQLite database is created for that agent's DID.
1790    /// This ensures transaction isolation between different agents while maintaining a consistent
1791    /// storage structure. If storage initialization fails, the agent registration continues but
1792    /// a warning is logged.
1793    ///
1794    /// # Arguments
1795    ///
1796    /// * `agent` - The TapAgent to register with the node
1797    ///
1798    /// # Returns
1799    ///
1800    /// * `Ok(())` if the agent was successfully registered
1801    /// * `Err(Error)` if agent registration fails
1802    pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
1803        let agent_did = agent.get_agent_did().to_string();
1804
1805        // Initialize storage for this agent if storage is enabled
1806        #[cfg(feature = "storage")]
1807        {
1808            if let Some(ref storage_manager) = self.agent_storage_manager {
1809                match storage_manager.ensure_agent_storage(&agent_did).await {
1810                    Ok(_) => {
1811                        log::info!("Initialized storage for agent: {}", agent_did);
1812
1813                        // Get the agent storage and register customer event handler
1814                        if let Ok(agent_storage) =
1815                            storage_manager.get_agent_storage(&agent_did).await
1816                        {
1817                            let customer_handler =
1818                                Arc::new(event::customer_handler::CustomerEventHandler::new(
1819                                    agent_storage,
1820                                    agent_did.clone(),
1821                                ));
1822                            self.event_bus.subscribe(customer_handler).await;
1823                            log::debug!(
1824                                "Registered customer event handler for agent: {}",
1825                                agent_did
1826                            );
1827                        }
1828                    }
1829                    Err(e) => {
1830                        log::warn!(
1831                            "Failed to initialize storage for agent {}: {}",
1832                            agent_did,
1833                            e
1834                        );
1835                        // Don't fail the registration, just log the warning
1836                    }
1837                }
1838            }
1839        }
1840
1841        self.agents.register_agent(agent_did.clone(), agent).await?;
1842
1843        // Publish event about agent registration
1844        self.event_bus.publish_agent_registered(agent_did).await;
1845
1846        Ok(())
1847    }
1848
1849    /// Unregister an agent from the node
1850    pub async fn unregister_agent(&self, did: &str) -> Result<()> {
1851        self.agents.unregister_agent(did).await?;
1852
1853        // Publish event about agent registration
1854        self.event_bus
1855            .publish_agent_unregistered(did.to_string())
1856            .await;
1857
1858        Ok(())
1859    }
1860
1861    /// Get a list of registered agent DIDs
1862    pub fn list_agents(&self) -> Vec<String> {
1863        self.agents.get_all_dids()
1864    }
1865
1866    /// Get a reference to the agent registry
1867    pub fn agents(&self) -> &Arc<AgentRegistry> {
1868        &self.agents
1869    }
1870
1871    /// Get a reference to the event bus
1872    pub fn event_bus(&self) -> &Arc<EventBus> {
1873        &self.event_bus
1874    }
1875
1876    /// Get a reference to the resolver
1877    pub fn resolver(&self) -> &Arc<MultiResolver> {
1878        &self.resolver
1879    }
1880
1881    /// Get a mutable reference to the processor pool
1882    /// This is a reference to `Option<ProcessorPool>` to allow starting the pool after node creation
1883    pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
1884        &mut self.processor_pool
1885    }
1886
1887    /// Get the node configuration
1888    pub fn config(&self) -> &NodeConfig {
1889        &self.config
1890    }
1891
1892    /// Set the decision mode at runtime.
1893    ///
1894    /// Call this after `init_storage()` but before processing any messages
1895    /// to configure how the FSM handles decision points.
1896    pub fn set_decision_mode(&mut self, mode: state_machine::fsm::DecisionMode) {
1897        self.config.decision_mode = mode;
1898    }
1899
1900    /// Get a reference to the storage (if available)
1901    #[cfg(feature = "storage")]
1902    pub fn storage(&self) -> Option<&Arc<storage::Storage>> {
1903        self.storage.as_ref()
1904    }
1905
1906    /// Get a reference to the agent storage manager (if available)
1907    #[cfg(feature = "storage")]
1908    pub fn agent_storage_manager(&self) -> Option<&Arc<storage::AgentStorageManager>> {
1909        self.agent_storage_manager.as_ref()
1910    }
1911
1912    /// Set storage for testing purposes
1913    /// This allows injecting in-memory databases for complete test isolation
1914    #[cfg(feature = "storage")]
1915    pub async fn set_storage(&mut self, storage: storage::Storage) -> Result<()> {
1916        let storage_arc = Arc::new(storage);
1917
1918        // Subscribe event handlers
1919        let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
1920            storage_arc.clone(),
1921        ));
1922        self.event_bus.subscribe(message_status_handler).await;
1923
1924        let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
1925            storage_arc.clone(),
1926        ));
1927        self.event_bus.subscribe(transaction_state_handler).await;
1928
1929        let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
1930        self.event_bus.subscribe(transaction_audit_handler).await;
1931
1932        // Create state processor with configured decision mode
1933        let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
1934            storage_arc.clone(),
1935            self.event_bus.clone(),
1936            self.agents.clone(),
1937            self.config.decision_mode.clone(),
1938        ));
1939
1940        self.storage = Some(storage_arc);
1941        self.state_processor = Some(state_processor);
1942        Ok(())
1943    }
1944
1945    /// Determine which agent's storage should be used for a message
1946    ///
1947    /// This method uses the following strategy:
1948    /// 1. Use the first recipient DID if it's one of our registered agents
1949    /// 2. Use the sender DID if it's one of our registered agents  
1950    /// 3. If no local agents are involved, fall back to the first recipient
1951    #[cfg(feature = "storage")]
1952    fn determine_message_agent(&self, message: &PlainMessage) -> Result<String> {
1953        // Strategy 1: Use the first recipient DID if it's one of our agents
1954        for recipient in &message.to {
1955            if self.agents.has_agent(recipient) {
1956                return Ok(recipient.clone());
1957            }
1958        }
1959
1960        // Strategy 2: Use the sender DID if it's one of our agents
1961        if self.agents.has_agent(&message.from) {
1962            return Ok(message.from.clone());
1963        }
1964
1965        // Strategy 3: If no local agents involved, fall back to first recipient
1966        if !message.to.is_empty() {
1967            return Ok(message.to[0].clone());
1968        }
1969
1970        Err(Error::Storage(
1971            "Cannot determine agent for message storage".to_string(),
1972        ))
1973    }
1974
1975    /// Extract all agent DIDs involved in a transaction
1976    ///
1977    /// For Transfer and Payment messages, this includes:
1978    /// - Originator/Customer from the message body
1979    /// - Beneficiary/Merchant from the message body
1980    /// - All agents in the agents array
1981    /// - Sender (from) and recipients (to) from the message envelope
1982    #[cfg(feature = "storage")]
1983    fn extract_transaction_agents(&self, message: &PlainMessage) -> Vec<String> {
1984        use std::collections::HashSet;
1985        let mut agent_dids = HashSet::new();
1986
1987        log::debug!("Extracting transaction agents for message: {}", message.id);
1988
1989        // Add sender and recipients from message envelope
1990        agent_dids.insert(message.from.clone());
1991        log::debug!("Added sender: {}", message.from);
1992
1993        for recipient in &message.to {
1994            agent_dids.insert(recipient.clone());
1995            log::debug!("Added recipient: {}", recipient);
1996        }
1997
1998        // Extract agents from message body based on type
1999        let message_type_lower = message.type_.to_lowercase();
2000        log::debug!("Message type: {}", message_type_lower);
2001
2002        if message_type_lower.contains("transfer") {
2003            // Parse Transfer message body
2004            if let Ok(transfer) =
2005                serde_json::from_value::<tap_msg::message::Transfer>(message.body.clone())
2006            {
2007                // Add originator if present
2008                if let Some(originator) = &transfer.originator {
2009                    agent_dids.insert(originator.id.clone());
2010                    log::debug!("Added originator: {}", originator.id);
2011                }
2012
2013                // Add beneficiary if present
2014                if let Some(beneficiary) = &transfer.beneficiary {
2015                    agent_dids.insert(beneficiary.id.clone());
2016                    log::debug!("Added beneficiary: {}", beneficiary.id);
2017                }
2018
2019                // Add all agents
2020                for agent in &transfer.agents {
2021                    agent_dids.insert(agent.id.clone());
2022                    log::debug!("Added agent: {}", agent.id);
2023                }
2024            } else {
2025                log::warn!("Failed to parse Transfer message body");
2026            }
2027        } else if message_type_lower.contains("payment") {
2028            // Parse Payment message body
2029            if let Ok(payment) =
2030                serde_json::from_value::<tap_msg::message::Payment>(message.body.clone())
2031            {
2032                // Add merchant
2033                agent_dids.insert(payment.merchant.id.clone());
2034                log::debug!("Added merchant: {}", payment.merchant.id);
2035
2036                // Add customer if present
2037                if let Some(customer) = &payment.customer {
2038                    agent_dids.insert(customer.id.clone());
2039                    log::debug!("Added customer: {}", customer.id);
2040                }
2041
2042                // Add all agents
2043                for agent in &payment.agents {
2044                    agent_dids.insert(agent.id.clone());
2045                    log::debug!("Added agent: {}", agent.id);
2046                }
2047            } else {
2048                log::warn!("Failed to parse Payment message body");
2049            }
2050        }
2051
2052        log::debug!("Total unique agents found: {}", agent_dids.len());
2053
2054        // Convert to Vec and filter to only include registered agents
2055        let registered_agents: Vec<String> = agent_dids
2056            .into_iter()
2057            .filter(|did| {
2058                let is_registered = self.agents.has_agent(did);
2059                log::debug!("Agent {} registered: {}", did, is_registered);
2060                is_registered
2061            })
2062            .collect();
2063
2064        log::debug!(
2065            "Registered agents involved in transaction: {:?}",
2066            registered_agents
2067        );
2068        registered_agents
2069    }
2070}
2071
2072// Namespace imports
2073// These imports make the implementation cleaner, but should be hidden from public API
2074use message::processor::DefaultPlainMessageProcessor;
2075use message::processor::LoggingPlainMessageProcessor;
2076use message::processor::ValidationPlainMessageProcessor;
2077use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
2078use message::router::DefaultPlainMessageRouter;
2079use message::trust_ping_processor::TrustPingProcessor;
2080use message::RouterAsyncExt;