tap_node/
lib.rs

1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, stores transactions, and provides a scalable architecture for TAP deployments.
5//!
6//! # Architecture Overview
7//!
8//! The TAP Node acts as a message router and coordinator for multiple TAP Agents. It provides:
9//!
10//! - **Efficient Message Processing**: Different handling for plain, signed, and encrypted messages
11//! - **Centralized Verification**: Signed messages are verified once for all agents
12//! - **Smart Routing**: Encrypted messages are routed to appropriate recipient agents
13//! - **Scalable Design**: Supports multiple agents with concurrent message processing
14//!
15//! # Key Components
16//!
17//! - **Agent Registry**: Manages multiple TAP Agents
18//! - **Event Bus**: Publishes and distributes events throughout the system
19//! - **Message Processors**: Process incoming and outgoing messages
20//! - **Message Router**: Routes messages to the appropriate agent
21//! - **DID Resolver**: Resolves DIDs for signature verification
22//! - **Storage**: Persistent SQLite storage with transaction tracking and audit trails
23//!
24//! # Message Processing Flow
25//!
26//! The TAP Node uses an optimized message processing flow based on message type:
27//!
28//! ## Signed Messages (JWS)
29//! 1. **Single Verification**: Signature verified once using DID resolver
30//! 2. **Routing**: Verified PlainMessage routed to appropriate agent
31//! 3. **Processing**: Agent receives verified message via `receive_plain_message()`
32//!
33//! ## Encrypted Messages (JWE)  
34//! 1. **Recipient Identification**: Extract recipient DIDs from JWE headers
35//! 2. **Agent Routing**: Send encrypted message to each matching agent
36//! 3. **Decryption**: Each agent attempts decryption via `receive_encrypted_message()`
37//! 4. **Processing**: Successfully decrypted messages are processed by the agent
38//!
39//! ## Plain Messages
40//! 1. **Direct Processing**: Plain messages processed through the pipeline
41//! 2. **Routing**: Routed to appropriate agent
42//! 3. **Delivery**: Agent receives via `receive_plain_message()`
43//!
44//! # Benefits of This Architecture
45//!
46//! - **Efficiency**: Signed messages verified once, not per-agent
47//! - **Scalability**: Encrypted messages naturally distributed to recipients
48//! - **Flexibility**: Agents remain fully functional standalone
49//! - **Security**: Centralized verification with distributed decryption
50//!
51//! # Thread Safety and Concurrency
52//!
53//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
54//! async/await patterns and synchronization primitives to safely handle multiple operations
55//! simultaneously. Most components within the node are either immutable or use interior
56//! mutability with appropriate synchronization.
57//!
58//! # Example Usage
59//!
60//! ```rust,no_run
61//! use tap_node::{TapNode, NodeConfig};
62//! use tap_agent::TapAgent;
63//! use std::sync::Arc;
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//!     // Create node
68//!     let config = NodeConfig::default();
69//!     let node = Arc::new(TapNode::new(config));
70//!     
71//!     // Create and register agent
72//!     let (agent, _did) = TapAgent::from_ephemeral_key().await?;
73//!     node.register_agent(Arc::new(agent)).await?;
74//!     
75//!     // Process incoming message (JSON Value)
76//!     let message_value = serde_json::json!({
77//!         "id": "msg-123",
78//!         "type": "test-message",
79//!         "body": {"content": "Hello"}
80//!     });
81//!     
82//!     node.receive_message(message_value).await?;
83//!     Ok(())
84//! }
85//! ```
86
87pub mod agent;
88pub mod error;
89pub mod event;
90pub mod message;
91#[cfg(feature = "storage")]
92pub mod state_machine;
93pub mod storage;
94#[cfg(feature = "storage")]
95pub mod validation;
96
97pub use error::{Error, Result};
98pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
99pub use event::{EventSubscriber, NodeEvent};
100pub use message::sender::{
101    HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender, WebSocketPlainMessageSender,
102};
103
104use std::sync::Arc;
105
106use tap_agent::{Agent, TapAgent};
107// use tap_agent::message_packing::PackOptions;
108use tap_msg::didcomm::PlainMessage;
109
110use crate::message::processor::PlainMessageProcessor;
111use crate::message::{
112    CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
113    PlainMessageRouterType,
114};
115use agent::AgentRegistry;
116use event::EventBus;
117use tap_agent::did::MultiResolver;
118
119use async_trait::async_trait;
120
/// Extension trait for TapAgent to add serialization methods
///
/// This trait extends the TapAgent with methods for serializing and packing
/// DIDComm messages for transmission. It provides functionality for converting
/// in-memory message objects to secure, serialized formats that follow the
/// DIDComm messaging protocol standards.
///
/// NOTE(review): the `TapAgent` implementation in this file only serializes the
/// message to a JSON string; it does not sign or encrypt it and ignores the
/// recipient DID. Confirm whether the packing described here is still intended.
#[async_trait]
pub trait TapAgentExt {
    /// Pack and serialize a DIDComm message for transmission
    ///
    /// This method takes a DIDComm message and recipient DID, then:
    /// 1. Uses the agent's PlainMessagePacker to properly sign and encrypt the message
    /// 2. Serializes the message to a string format
    ///
    /// (Step 1 describes the intended contract; see the note on the trait about
    /// what the in-file implementation actually does.)
    ///
    /// # Parameters
    /// * `message` - The DIDComm message to serialize
    /// * `to_did` - The DID of the recipient
    ///
    /// # Returns
    /// The packed message as a string, ready for transmission
    ///
    /// # Errors
    /// Returns the crate's `Error` when the message cannot be produced; the
    /// in-file implementation reports serialization failures as
    /// `Error::Serialization`.
    async fn send_serialized_message(&self, message: &PlainMessage, to_did: &str)
        -> Result<String>;
}
144
145#[async_trait]
146impl TapAgentExt for TapAgent {
147    async fn send_serialized_message(
148        &self,
149        message: &PlainMessage,
150        _to_did: &str,
151    ) -> Result<String> {
152        // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
153        let json_value =
154            serde_json::to_value(message).map_err(|e| Error::Serialization(e.to_string()))?;
155
156        // Use JSON string for transportation instead of direct message passing
157        // This bypasses the need for PlainMessage to implement TapMessageBody
158        let serialized =
159            serde_json::to_string(&json_value).map_err(|e| Error::Serialization(e.to_string()))?;
160
161        Ok(serialized)
162    }
163}
164
/// Configuration for a TAP Node
///
/// The struct derives `Default`, so `NodeConfig::default()` yields `false` for
/// every flag and `None` for every optional setting.
#[derive(Debug, Clone, Default)]
pub struct NodeConfig {
    /// Debug mode
    pub debug: bool,
    /// Maximum number of agents
    ///
    /// Handed to `AgentRegistry::new` when the node is constructed.
    /// Presumably `None` means "no limit" — TODO confirm against `AgentRegistry`.
    pub max_agents: Option<usize>,
    /// Whether to enable message logging
    pub enable_message_logging: bool,
    /// Whether to log full message content
    pub log_message_content: bool,
    /// Configuration for the processor pool
    ///
    /// NOTE(review): in the visible part of this file `TapNode::new` does not read
    /// this field; the pool is created by `TapNode::start` from its own argument.
    pub processor_pool: Option<ProcessorPoolConfig>,
    /// Configuration for the event logger
    ///
    /// When `Some`, `TapNode::new` builds an `EventLogger` from it and subscribes
    /// the logger to the node's event bus.
    pub event_logger: Option<EventLoggerConfig>,
    /// Path to the storage database (None for default)
    ///
    /// Only consulted by `TapNode::init_storage` when `agent_did` is `None`.
    #[cfg(feature = "storage")]
    pub storage_path: Option<std::path::PathBuf>,
    /// Agent DID for storage organization
    ///
    /// When set, `TapNode::init_storage` opens storage via
    /// `Storage::new_with_did` and `storage_path` is ignored.
    #[cfg(feature = "storage")]
    pub agent_did: Option<String>,
    /// Custom TAP root directory (defaults to ~/.tap)
    ///
    /// Passed to `AgentStorageManager::new` and to `Storage::new_with_did`.
    #[cfg(feature = "storage")]
    pub tap_root: Option<std::path::PathBuf>,
}
190
/// # The TAP Node
///
/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
///
/// ## Core Responsibilities
///
/// - **Agent Management**: Registration and deregistration of TAP Agents
/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
/// - **Event Publishing**: Broadcasting node events to subscribers
/// - **Scalability**: Managing concurrent message processing through worker pools
///
/// ## Lifecycle
///
/// 1. Create a node with appropriate configuration
/// 2. Register one or more agents with the node
/// 3. Start the processor pool (if high throughput is required)
/// 4. Process incoming/outgoing messages
/// 5. Publish and respond to events
///
/// ## Thread Safety
///
/// The `TapNode` is designed to be thread-safe and can be shared across multiple
/// threads using an `Arc<TapNode>`. All internal mutability is handled through
/// appropriate synchronization primitives.
///
/// `start` and (with the `storage` feature) `init_storage` take `&mut self`, so
/// they have to be called before the node is placed behind an `Arc`.
///
/// ## Cloning
///
/// The struct derives `Clone`. The agent registry, event bus and resolver are
/// held in `Arc`s, so clones share those instances; the remaining fields are
/// cloned by value.
#[derive(Clone)]
pub struct TapNode {
    /// Agent registry
    agents: Arc<AgentRegistry>,
    /// Event bus
    event_bus: Arc<EventBus>,
    /// Incoming message processor
    incoming_processor: CompositePlainMessageProcessor,
    /// Outgoing message processor
    outgoing_processor: CompositePlainMessageProcessor,
    /// PlainMessage router
    router: CompositePlainMessageRouter,
    /// Resolver for DIDs
    resolver: Arc<MultiResolver>,
    /// Worker pool for handling messages
    ///
    /// `None` until `start` is called.
    processor_pool: Option<ProcessorPool>,
    /// Node configuration
    config: NodeConfig,
    /// Storage for transactions (legacy centralized storage)
    ///
    /// `None` until `init_storage` succeeds.
    #[cfg(feature = "storage")]
    storage: Option<Arc<storage::Storage>>,
    /// Agent-specific storage manager
    ///
    /// Created in `new` from `config.tap_root`.
    #[cfg(feature = "storage")]
    agent_storage_manager: Option<Arc<storage::AgentStorageManager>>,
    /// Transaction state processor
    ///
    /// `None` until `init_storage` succeeds.
    #[cfg(feature = "storage")]
    state_processor: Option<Arc<state_machine::StandardTransactionProcessor>>,
}
245
246impl TapNode {
247    /// Create a new TAP node with the given configuration
248    pub fn new(config: NodeConfig) -> Self {
249        // Create the agent registry
250        let agents = Arc::new(AgentRegistry::new(config.max_agents));
251
252        // Create the event bus
253        let event_bus = Arc::new(EventBus::new());
254
255        // Create the message router
256        let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
257
258        let router = CompositePlainMessageRouter::new(vec![default_router]);
259
260        // Create the message processors
261        let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
262        let validation_processor =
263            PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
264        let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
265
266        let incoming_processor = CompositePlainMessageProcessor::new(vec![
267            logging_processor.clone(),
268            validation_processor.clone(),
269            default_processor.clone(),
270        ]);
271
272        let outgoing_processor = CompositePlainMessageProcessor::new(vec![
273            logging_processor,
274            validation_processor,
275            default_processor,
276        ]);
277
278        // Create the resolver
279        let resolver = Arc::new(MultiResolver::default());
280
281        // Storage will be initialized on first use
282        #[cfg(feature = "storage")]
283        let storage = None;
284        #[cfg(feature = "storage")]
285        let agent_storage_manager = Some(Arc::new(storage::AgentStorageManager::new(
286            config.tap_root.clone(),
287        )));
288        #[cfg(feature = "storage")]
289        let state_processor = None;
290
291        let node = Self {
292            agents,
293            event_bus,
294            incoming_processor,
295            outgoing_processor,
296            router,
297            resolver,
298            processor_pool: None,
299            config,
300            #[cfg(feature = "storage")]
301            storage,
302            #[cfg(feature = "storage")]
303            agent_storage_manager,
304            #[cfg(feature = "storage")]
305            state_processor,
306        };
307
308        // Set up the event logger if configured
309        if let Some(logger_config) = &node.config.event_logger {
310            let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
311
312            // We need to handle the async subscribe in a blocking context
313            // This is safe because EventBus methods are designed to be called in this way
314            let event_bus = node.event_bus.clone();
315            tokio::task::block_in_place(|| {
316                tokio::runtime::Handle::current().block_on(async {
317                    event_bus.subscribe(event_logger).await;
318                })
319            });
320        }
321
322        node
323    }
324
325    /// Initialize storage asynchronously
326    #[cfg(feature = "storage")]
327    pub async fn init_storage(&mut self) -> Result<()> {
328        let storage = if let Some(agent_did) = &self.config.agent_did {
329            // Use new DID-based storage structure
330            match storage::Storage::new_with_did(agent_did, self.config.tap_root.clone()).await {
331                Ok(s) => s,
332                Err(e) => {
333                    log::error!("Failed to initialize storage with DID: {}", e);
334                    return Err(Error::Storage(e.to_string()));
335                }
336            }
337        } else if let Some(storage_path) = self.config.storage_path.clone() {
338            // Use explicit path
339            match storage::Storage::new(Some(storage_path)).await {
340                Ok(s) => s,
341                Err(e) => {
342                    log::error!("Failed to initialize storage: {}", e);
343                    return Err(Error::Storage(e.to_string()));
344                }
345            }
346        } else {
347            // Initialize with default path
348            match storage::Storage::new(None).await {
349                Ok(s) => s,
350                Err(e) => {
351                    log::error!("Failed to initialize storage: {}", e);
352                    return Err(Error::Storage(e.to_string()));
353                }
354            }
355        };
356
357        let storage_arc = Arc::new(storage);
358
359        // Subscribe event handlers
360        let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
361            storage_arc.clone(),
362        ));
363        self.event_bus.subscribe(message_status_handler).await;
364
365        let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
366            storage_arc.clone(),
367        ));
368        self.event_bus.subscribe(transaction_state_handler).await;
369
370        let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
371        self.event_bus.subscribe(transaction_audit_handler).await;
372
373        // Create state processor
374        let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
375            storage_arc.clone(),
376            self.event_bus.clone(),
377            self.agents.clone(),
378        ));
379
380        self.storage = Some(storage_arc);
381        self.state_processor = Some(state_processor);
382        Ok(())
383    }
384
385    /// Start the node
386    pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
387        let processor_pool = ProcessorPool::new(config);
388        self.processor_pool = Some(processor_pool);
389        Ok(())
390    }
391
392    /// Receive and process an incoming message
393    ///
394    /// This method handles the complete lifecycle of an incoming message:
395    ///
396    /// 1. Determining the message type (plain, signed, or encrypted)
397    /// 2. Verifying signatures or routing to agents for decryption
398    /// 3. Processing the resulting plain messages through the pipeline
399    /// 4. Routing and dispatching to the appropriate agents
400    ///
401    /// # Parameters
402    ///
403    /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
404    ///
405    /// # Returns
406    ///
407    /// * `Ok(())` if the message was successfully processed
408    /// * `Err(Error)` if there was an error during processing
409    pub async fn receive_message(&self, message: serde_json::Value) -> Result<()> {
410        // Store the raw message for logging
411        let raw_message = serde_json::to_string(&message).ok();
412        use tap_agent::{verify_jws, Jwe, Jws};
413
414        // Determine message type
415        let is_encrypted =
416            message.get("protected").is_some() && message.get("recipients").is_some();
417        let is_signed = message.get("payload").is_some() && message.get("signatures").is_some();
418
419        if is_signed {
420            // Verify signature once using resolver
421            let jws: Jws = serde_json::from_value(message)
422                .map_err(|e| Error::Serialization(format!("Failed to parse JWS: {}", e)))?;
423
424            let plain_message = verify_jws(&jws, &*self.resolver)
425                .await
426                .map_err(|e| Error::Verification(format!("JWS verification failed: {}", e)))?;
427
428            // Process the verified plain message
429            self.process_plain_message(plain_message, raw_message.as_deref())
430                .await
431        } else if is_encrypted {
432            // Route encrypted message to each matching agent
433            let jwe: Jwe = serde_json::from_value(message.clone())
434                .map_err(|e| Error::Serialization(format!("Failed to parse JWE: {}", e)))?;
435
436            // Find agents that match recipients
437            let mut processed = false;
438            for recipient in &jwe.recipients {
439                if let Some(did) = recipient.header.kid.split('#').next() {
440                    if let Ok(agent) = self.agents.get_agent(did).await {
441                        // Let the agent handle decryption and processing
442                        match agent.receive_encrypted_message(&message).await {
443                            Ok(_) => {
444                                processed = true;
445                                log::debug!(
446                                    "Agent {} successfully processed encrypted message",
447                                    did
448                                );
449                            }
450                            Err(e) => {
451                                log::debug!(
452                                    "Agent {} couldn't process encrypted message: {}",
453                                    did,
454                                    e
455                                );
456                            }
457                        }
458                    }
459                }
460            }
461
462            if !processed {
463                return Err(Error::Processing(
464                    "No agent could process the encrypted message".to_string(),
465                ));
466            }
467            Ok(())
468        } else {
469            // Plain message - parse and process
470            let plain_message: PlainMessage = serde_json::from_value(message).map_err(|e| {
471                Error::Serialization(format!("Failed to parse PlainMessage: {}", e))
472            })?;
473
474            self.process_plain_message(plain_message, raw_message.as_deref())
475                .await
476        }
477    }
478
479    /// Process a plain message through the pipeline
480    async fn process_plain_message(
481        &self,
482        message: PlainMessage,
483        raw_message: Option<&str>,
484    ) -> Result<()> {
485        // Validate the message if storage/validation is available
486        #[cfg(feature = "storage")]
487        {
488            if let Some(ref storage) = self.storage {
489                // Create validator
490                let validator_config = validation::StandardValidatorConfig {
491                    max_timestamp_drift_secs: 60,
492                    storage: storage.clone(),
493                };
494                let validator = validation::create_standard_validator(validator_config).await;
495
496                // Validate the message
497                use crate::validation::{MessageValidator, ValidationResult};
498                match validator.validate(&message).await {
499                    ValidationResult::Accept => {
500                        // Publish accepted event
501                        self.event_bus
502                            .publish_message_accepted(
503                                message.id.clone(),
504                                message.type_.clone(),
505                                message.from.clone(),
506                                message.to.first().cloned().unwrap_or_default(),
507                            )
508                            .await;
509                    }
510                    ValidationResult::Reject(reason) => {
511                        // Publish rejected event
512                        self.event_bus
513                            .publish_message_rejected(
514                                message.id.clone(),
515                                reason.clone(),
516                                message.from.clone(),
517                                message.to.first().cloned().unwrap_or_default(),
518                            )
519                            .await;
520
521                        // Return error to stop processing
522                        return Err(Error::Validation(reason));
523                    }
524                }
525            }
526        }
527
528        // Process message through state machine if available
529        #[cfg(feature = "storage")]
530        {
531            if let Some(ref state_processor) = self.state_processor {
532                use crate::state_machine::TransactionStateProcessor;
533                if let Err(e) = state_processor.process_message(&message).await {
534                    log::warn!("State processor error: {}", e);
535                    // Don't fail the entire message processing, just log the error
536                }
537            }
538        }
539        // Log incoming messages to agent-specific storage
540        #[cfg(feature = "storage")]
541        {
542            if let Some(ref storage_manager) = self.agent_storage_manager {
543                // Check if this is a transaction message
544                let message_type_lower = message.type_.to_lowercase();
545                let is_transaction = message_type_lower.contains("transfer")
546                    || message_type_lower.contains("payment");
547                log::debug!(
548                    "Message type: {}, is_transaction: {}",
549                    message.type_,
550                    is_transaction
551                );
552
553                if is_transaction {
554                    // For transactions, store in ALL involved agents' databases
555                    let involved_agents = self.extract_transaction_agents(&message);
556
557                    if involved_agents.is_empty() {
558                        log::warn!("No registered agents found for transaction: {}", message.id);
559                    } else {
560                        log::debug!(
561                            "Storing transaction {} in {} agent databases",
562                            message.id,
563                            involved_agents.len()
564                        );
565
566                        // Store transaction in each involved agent's database
567                        for agent_did in &involved_agents {
568                            if let Ok(agent_storage) =
569                                storage_manager.get_agent_storage(agent_did).await
570                            {
571                                // Log the message
572                                match agent_storage
573                                    .log_message(
574                                        &message,
575                                        storage::MessageDirection::Incoming,
576                                        raw_message,
577                                    )
578                                    .await
579                                {
580                                    Ok(_) => log::debug!(
581                                        "Logged incoming message to agent {}: {}",
582                                        agent_did,
583                                        message.id
584                                    ),
585                                    Err(e) => log::warn!(
586                                        "Failed to log incoming message for agent {}: {}",
587                                        agent_did,
588                                        e
589                                    ),
590                                }
591
592                                // Store as transaction
593                                match agent_storage.insert_transaction(&message).await {
594                                    Ok(_) => log::debug!(
595                                        "Stored transaction for agent {}: {}",
596                                        agent_did,
597                                        message.id
598                                    ),
599                                    Err(e) => log::warn!(
600                                        "Failed to store transaction for agent {}: {}",
601                                        agent_did,
602                                        e
603                                    ),
604                                }
605                            } else {
606                                log::warn!("Failed to get storage for agent: {}", agent_did);
607                            }
608                        }
609                    }
610                } else {
611                    // For non-transaction messages, log to all recipient agents' storage
612                    let mut logged_to_any = false;
613
614                    for recipient_did in &message.to {
615                        // Check if this recipient is a registered agent
616                        if self.agents.has_agent(recipient_did) {
617                            if let Ok(agent_storage) =
618                                storage_manager.get_agent_storage(recipient_did).await
619                            {
620                                // Log the message to this recipient's storage
621                                match agent_storage
622                                    .log_message(
623                                        &message,
624                                        storage::MessageDirection::Incoming,
625                                        raw_message,
626                                    )
627                                    .await
628                                {
629                                    Ok(_) => {
630                                        log::debug!(
631                                            "Logged incoming message to recipient {}: {}",
632                                            recipient_did,
633                                            message.id
634                                        );
635                                        logged_to_any = true;
636                                    }
637                                    Err(e) => log::warn!(
638                                        "Failed to log incoming message for recipient {}: {}",
639                                        recipient_did,
640                                        e
641                                    ),
642                                }
643                            } else {
644                                log::warn!(
645                                    "Failed to get storage for recipient: {}",
646                                    recipient_did
647                                );
648                            }
649                        }
650                    }
651
652                    // If no recipients were logged, fall back to sender or router-based storage
653                    if !logged_to_any {
654                        match self.determine_message_agent(&message) {
655                            Ok(agent_did) => {
656                                if let Ok(agent_storage) =
657                                    storage_manager.get_agent_storage(&agent_did).await
658                                {
659                                    // Log the message to the agent's storage
660                                    match agent_storage
661                                        .log_message(
662                                            &message,
663                                            storage::MessageDirection::Incoming,
664                                            raw_message,
665                                        )
666                                        .await
667                                    {
668                                        Ok(_) => log::debug!(
669                                            "Logged incoming message to fallback agent {}: {}",
670                                            agent_did,
671                                            message.id
672                                        ),
673                                        Err(e) => log::warn!(
674                                            "Failed to log incoming message for fallback agent {}: {}",
675                                            agent_did,
676                                            e
677                                        ),
678                                    }
679                                } else {
680                                    log::warn!(
681                                        "Failed to get storage for fallback agent: {}",
682                                        agent_did
683                                    );
684                                }
685                            }
686                            Err(e) => {
687                                log::warn!(
688                                    "Failed to determine fallback agent for message storage: {}",
689                                    e
690                                );
691                                // Fall back to centralized storage if available
692                                if let Some(ref storage) = self.storage {
693                                    let _ = storage
694                                        .log_message(
695                                            &message,
696                                            storage::MessageDirection::Incoming,
697                                            raw_message,
698                                        )
699                                        .await;
700                                }
701                            }
702                        }
703                    }
704                }
705            }
706        }
707
708        // Process the incoming message
709        let processed_message = match self.incoming_processor.process_incoming(message).await? {
710            Some(msg) => msg,
711            None => return Ok(()), // PlainMessage was dropped during processing
712        };
713
714        // Deliver the message to all recipients in the 'to' field
715        let mut delivery_success = false;
716
717        for recipient_did in &processed_message.to {
718            // Check if we have a registered agent for this recipient
719            match self.agents.get_agent(recipient_did).await {
720                Ok(agent) => {
721                    // Let the agent process the plain message
722                    match agent.receive_plain_message(processed_message.clone()).await {
723                        Ok(_) => {
724                            log::debug!(
725                                "Successfully delivered message to agent: {}",
726                                recipient_did
727                            );
728                            delivery_success = true;
729                        }
730                        Err(e) => {
731                            log::warn!("Agent {} failed to process message: {}", recipient_did, e);
732                        }
733                    }
734                }
735                Err(e) => {
736                    log::debug!(
737                        "No registered agent found for recipient {}: {}",
738                        recipient_did,
739                        e
740                    );
741                    // This is not an error - the recipient might be external to this node
742                }
743            }
744        }
745
746        // If no recipients were successfully processed, try the router as fallback
747        if !delivery_success {
748            let target_did = match self.router.route_message(&processed_message).await {
749                Ok(did) => did,
750                Err(e) => {
751                    log::warn!("Unable to route message and no recipients processed: {}", e);
752                    return Ok(());
753                }
754            };
755
756            // Get the agent
757            let agent = match self.agents.get_agent(&target_did).await {
758                Ok(a) => a,
759                Err(e) => {
760                    log::warn!("Failed to get agent for dispatch: {}", e);
761                    return Ok(());
762                }
763            };
764
765            // Let the agent process the plain message
766            match agent.receive_plain_message(processed_message).await {
767                Ok(_) => {
768                    log::debug!("Successfully routed message to agent: {}", target_did);
769                }
770                Err(e) => {
771                    log::warn!("Agent failed to process message: {}", e);
772                }
773            }
774        }
775
776        Ok(())
777    }
778
779    /// Dispatch a message to an agent by DID
780    pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
781        let agent = self.agents.get_agent(&target_did).await?;
782
783        // Convert the message to a packed format for transport
784        let packed = agent.send_serialized_message(&message, &target_did).await?;
785
786        // Publish an event for the dispatched message
787        self.event_bus
788            .publish_agent_message(target_did, packed.into_bytes())
789            .await;
790
791        Ok(())
792    }
793
794    /// Send a message to an agent
795    pub async fn send_message(
796        &self,
797        sender_did: String,
798        to_did: String,
799        message: PlainMessage,
800    ) -> Result<String> {
801        // Log outgoing messages to agent-specific storage
802        #[cfg(feature = "storage")]
803        {
804            if let Some(ref storage_manager) = self.agent_storage_manager {
805                // Check if this is a transaction message
806                let message_type_lower = message.type_.to_lowercase();
807                let is_transaction = message_type_lower.contains("transfer")
808                    || message_type_lower.contains("payment");
809                log::debug!(
810                    "Message type: {}, is_transaction: {}",
811                    message.type_,
812                    is_transaction
813                );
814
815                if is_transaction {
816                    // For transactions, store in ALL involved agents' databases
817                    let involved_agents = self.extract_transaction_agents(&message);
818
819                    if involved_agents.is_empty() {
820                        log::warn!(
821                            "No registered agents found for outgoing transaction: {}",
822                            message.id
823                        );
824                    } else {
825                        log::debug!(
826                            "Storing outgoing transaction {} in {} agent databases",
827                            message.id,
828                            involved_agents.len()
829                        );
830
831                        // Store transaction in each involved agent's database
832                        for agent_did in &involved_agents {
833                            if let Ok(agent_storage) =
834                                storage_manager.get_agent_storage(agent_did).await
835                            {
836                                // Log the message
837                                match agent_storage
838                                    .log_message(
839                                        &message,
840                                        storage::MessageDirection::Outgoing,
841                                        None,
842                                    )
843                                    .await
844                                {
845                                    Ok(_) => log::debug!(
846                                        "Logged outgoing message to agent {}: {}",
847                                        agent_did,
848                                        message.id
849                                    ),
850                                    Err(e) => log::warn!(
851                                        "Failed to log outgoing message for agent {}: {}",
852                                        agent_did,
853                                        e
854                                    ),
855                                }
856
857                                // Store as transaction
858                                match agent_storage.insert_transaction(&message).await {
859                                    Ok(_) => log::debug!(
860                                        "Stored outgoing transaction for agent {}: {}",
861                                        agent_did,
862                                        message.id
863                                    ),
864                                    Err(e) => log::warn!(
865                                        "Failed to store outgoing transaction for agent {}: {}",
866                                        agent_did,
867                                        e
868                                    ),
869                                }
870                            } else {
871                                log::warn!("Failed to get storage for agent: {}", agent_did);
872                            }
873                        }
874                    }
875                } else {
876                    // For non-transaction messages, just store in sender's storage
877                    if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
878                    {
879                        // Log the message to the sender's storage
880                        match sender_storage
881                            .log_message(&message, storage::MessageDirection::Outgoing, None)
882                            .await
883                        {
884                            Ok(_) => log::debug!(
885                                "Logged outgoing message for agent {}: {}",
886                                sender_did,
887                                message.id
888                            ),
889                            Err(e) => log::warn!(
890                                "Failed to log outgoing message for agent {}: {}",
891                                sender_did,
892                                e
893                            ),
894                        }
895                    } else {
896                        log::warn!("Failed to get storage for sender agent: {}", sender_did);
897                        // Fall back to centralized storage if available
898                        if let Some(ref storage) = self.storage {
899                            let _ = storage
900                                .log_message(&message, storage::MessageDirection::Outgoing, None)
901                                .await;
902                        }
903                    }
904                }
905            }
906        }
907
908        // Process the outgoing message
909        let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
910            Some(msg) => msg,
911            None => {
912                return Err(Error::MessageDropped(
913                    "PlainMessage dropped during processing".to_string(),
914                ))
915            }
916        };
917
918        // Get the sender agent
919        let agent = self.agents.get_agent(&sender_did).await?;
920
921        // Pack the message
922        let packed = agent
923            .send_serialized_message(&processed_message, to_did.as_str())
924            .await?;
925
926        // Publish an event for the message
927        self.event_bus
928            .publish_agent_message(sender_did, packed.clone().into_bytes())
929            .await;
930
931        Ok(packed)
932    }
933
934    /// Register a new agent with the node
935    ///
936    /// This method registers an agent with the TAP Node and automatically initializes
937    /// DID-specific storage for the agent. The storage directory structure follows:
938    /// - `~/.tap/{sanitized_did}/transactions.db` (default)
939    /// - `{tap_root}/{sanitized_did}/transactions.db` (if custom TAP root is configured)
940    ///
941    /// # Storage Initialization
942    ///
943    /// When an agent is registered, a dedicated SQLite database is created for that agent's DID.
944    /// This ensures transaction isolation between different agents while maintaining a consistent
945    /// storage structure. If storage initialization fails, the agent registration continues but
946    /// a warning is logged.
947    ///
948    /// # Arguments
949    ///
950    /// * `agent` - The TapAgent to register with the node
951    ///
952    /// # Returns
953    ///
954    /// * `Ok(())` if the agent was successfully registered
955    /// * `Err(Error)` if agent registration fails
956    pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
957        let agent_did = agent.get_agent_did().to_string();
958
959        // Initialize storage for this agent if storage is enabled
960        #[cfg(feature = "storage")]
961        {
962            if let Some(ref storage_manager) = self.agent_storage_manager {
963                match storage_manager.ensure_agent_storage(&agent_did).await {
964                    Ok(_) => {
965                        log::info!("Initialized storage for agent: {}", agent_did);
966                    }
967                    Err(e) => {
968                        log::warn!(
969                            "Failed to initialize storage for agent {}: {}",
970                            agent_did,
971                            e
972                        );
973                        // Don't fail the registration, just log the warning
974                    }
975                }
976            }
977        }
978
979        self.agents.register_agent(agent_did.clone(), agent).await?;
980
981        // Publish event about agent registration
982        self.event_bus.publish_agent_registered(agent_did).await;
983
984        Ok(())
985    }
986
987    /// Unregister an agent from the node
988    pub async fn unregister_agent(&self, did: &str) -> Result<()> {
989        self.agents.unregister_agent(did).await?;
990
991        // Publish event about agent registration
992        self.event_bus
993            .publish_agent_unregistered(did.to_string())
994            .await;
995
996        Ok(())
997    }
998
999    /// Get a list of registered agent DIDs
1000    pub fn list_agents(&self) -> Vec<String> {
1001        self.agents.get_all_dids()
1002    }
1003
1004    /// Get a reference to the agent registry
1005    pub fn agents(&self) -> &Arc<AgentRegistry> {
1006        &self.agents
1007    }
1008
1009    /// Get a reference to the event bus
1010    pub fn event_bus(&self) -> &Arc<EventBus> {
1011        &self.event_bus
1012    }
1013
1014    /// Get a reference to the resolver
1015    pub fn resolver(&self) -> &Arc<MultiResolver> {
1016        &self.resolver
1017    }
1018
1019    /// Get a mutable reference to the processor pool
1020    /// This is a reference to `Option<ProcessorPool>` to allow starting the pool after node creation
1021    pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
1022        &mut self.processor_pool
1023    }
1024
1025    /// Get the node configuration
1026    pub fn config(&self) -> &NodeConfig {
1027        &self.config
1028    }
1029
1030    /// Get a reference to the storage (if available)
1031    #[cfg(feature = "storage")]
1032    pub fn storage(&self) -> Option<&Arc<storage::Storage>> {
1033        self.storage.as_ref()
1034    }
1035
1036    /// Get a reference to the agent storage manager (if available)
1037    #[cfg(feature = "storage")]
1038    pub fn agent_storage_manager(&self) -> Option<&Arc<storage::AgentStorageManager>> {
1039        self.agent_storage_manager.as_ref()
1040    }
1041
1042    /// Determine which agent's storage should be used for a message
1043    ///
1044    /// This method uses the following strategy:
1045    /// 1. Use the first recipient DID if it's one of our registered agents
1046    /// 2. Use the sender DID if it's one of our registered agents  
1047    /// 3. If no local agents are involved, fall back to the first recipient
1048    #[cfg(feature = "storage")]
1049    fn determine_message_agent(&self, message: &PlainMessage) -> Result<String> {
1050        // Strategy 1: Use the first recipient DID if it's one of our agents
1051        for recipient in &message.to {
1052            if self.agents.has_agent(recipient) {
1053                return Ok(recipient.clone());
1054            }
1055        }
1056
1057        // Strategy 2: Use the sender DID if it's one of our agents
1058        if self.agents.has_agent(&message.from) {
1059            return Ok(message.from.clone());
1060        }
1061
1062        // Strategy 3: If no local agents involved, fall back to first recipient
1063        if !message.to.is_empty() {
1064            return Ok(message.to[0].clone());
1065        }
1066
1067        Err(Error::Storage(
1068            "Cannot determine agent for message storage".to_string(),
1069        ))
1070    }
1071
1072    /// Extract all agent DIDs involved in a transaction
1073    ///
1074    /// For Transfer and Payment messages, this includes:
1075    /// - Originator/Customer from the message body
1076    /// - Beneficiary/Merchant from the message body
1077    /// - All agents in the agents array
1078    /// - Sender (from) and recipients (to) from the message envelope
1079    #[cfg(feature = "storage")]
1080    fn extract_transaction_agents(&self, message: &PlainMessage) -> Vec<String> {
1081        use std::collections::HashSet;
1082        let mut agent_dids = HashSet::new();
1083
1084        log::debug!("Extracting transaction agents for message: {}", message.id);
1085
1086        // Add sender and recipients from message envelope
1087        agent_dids.insert(message.from.clone());
1088        log::debug!("Added sender: {}", message.from);
1089
1090        for recipient in &message.to {
1091            agent_dids.insert(recipient.clone());
1092            log::debug!("Added recipient: {}", recipient);
1093        }
1094
1095        // Extract agents from message body based on type
1096        let message_type_lower = message.type_.to_lowercase();
1097        log::debug!("Message type: {}", message_type_lower);
1098
1099        if message_type_lower.contains("transfer") {
1100            // Parse Transfer message body
1101            if let Ok(transfer) =
1102                serde_json::from_value::<tap_msg::message::Transfer>(message.body.clone())
1103            {
1104                // Add originator
1105                agent_dids.insert(transfer.originator.id.clone());
1106                log::debug!("Added originator: {}", transfer.originator.id);
1107
1108                // Add beneficiary if present
1109                if let Some(beneficiary) = &transfer.beneficiary {
1110                    agent_dids.insert(beneficiary.id.clone());
1111                    log::debug!("Added beneficiary: {}", beneficiary.id);
1112                }
1113
1114                // Add all agents
1115                for agent in &transfer.agents {
1116                    agent_dids.insert(agent.id.clone());
1117                    log::debug!("Added agent: {}", agent.id);
1118                }
1119            } else {
1120                log::warn!("Failed to parse Transfer message body");
1121            }
1122        } else if message_type_lower.contains("payment") {
1123            // Parse Payment message body
1124            if let Ok(payment) =
1125                serde_json::from_value::<tap_msg::message::Payment>(message.body.clone())
1126            {
1127                // Add merchant
1128                agent_dids.insert(payment.merchant.id.clone());
1129                log::debug!("Added merchant: {}", payment.merchant.id);
1130
1131                // Add customer if present
1132                if let Some(customer) = &payment.customer {
1133                    agent_dids.insert(customer.id.clone());
1134                    log::debug!("Added customer: {}", customer.id);
1135                }
1136
1137                // Add all agents
1138                for agent in &payment.agents {
1139                    agent_dids.insert(agent.id.clone());
1140                    log::debug!("Added agent: {}", agent.id);
1141                }
1142            } else {
1143                log::warn!("Failed to parse Payment message body");
1144            }
1145        }
1146
1147        log::debug!("Total unique agents found: {}", agent_dids.len());
1148
1149        // Convert to Vec and filter to only include registered agents
1150        let registered_agents: Vec<String> = agent_dids
1151            .into_iter()
1152            .filter(|did| {
1153                let is_registered = self.agents.has_agent(did);
1154                log::debug!("Agent {} registered: {}", did, is_registered);
1155                is_registered
1156            })
1157            .collect();
1158
1159        log::debug!(
1160            "Registered agents involved in transaction: {:?}",
1161            registered_agents
1162        );
1163        registered_agents
1164    }
1165}
1166
1167// Namespace imports
1168// These imports make the implementation cleaner, but should be hidden from public API
1169use message::processor::DefaultPlainMessageProcessor;
1170use message::processor::LoggingPlainMessageProcessor;
1171use message::processor::ValidationPlainMessageProcessor;
1172use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
1173use message::router::DefaultPlainMessageRouter;
1174use message::RouterAsyncExt;