tap_node/lib.rs
1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, stores transactions, and provides a scalable architecture for TAP deployments.
5//!
6//! # Architecture Overview
7//!
8//! The TAP Node acts as a message router and coordinator for multiple TAP Agents. It provides:
9//!
10//! - **Efficient Message Processing**: Different handling for plain, signed, and encrypted messages
11//! - **Centralized Verification**: Signed messages are verified once for all agents
12//! - **Smart Routing**: Encrypted messages are routed to appropriate recipient agents
13//! - **Scalable Design**: Supports multiple agents with concurrent message processing
14//!
15//! # Key Components
16//!
17//! - **Agent Registry**: Manages multiple TAP Agents
18//! - **Event Bus**: Publishes and distributes events throughout the system
19//! - **Message Processors**: Process incoming and outgoing messages
20//! - **Message Router**: Routes messages to the appropriate agent
21//! - **DID Resolver**: Resolves DIDs for signature verification
22//! - **Storage**: Persistent SQLite storage with transaction tracking and audit trails
23//!
24//! # Message Processing Flow
25//!
26//! The TAP Node uses an optimized message processing flow based on message type:
27//!
28//! ## Signed Messages (JWS)
29//! 1. **Single Verification**: Signature verified once using DID resolver
30//! 2. **Routing**: Verified PlainMessage routed to appropriate agent
31//! 3. **Processing**: Agent receives verified message via `receive_plain_message()`
32//!
33//! ## Encrypted Messages (JWE)
34//! 1. **Recipient Identification**: Extract recipient DIDs from JWE headers
35//! 2. **Agent Routing**: Send encrypted message to each matching agent
36//! 3. **Decryption**: Each agent attempts decryption via `receive_encrypted_message()`
37//! 4. **Processing**: Successfully decrypted messages are processed by the agent
38//!
39//! ## Plain Messages
40//! 1. **Direct Processing**: Plain messages processed through the pipeline
41//! 2. **Routing**: Routed to appropriate agent
42//! 3. **Delivery**: Agent receives via `receive_plain_message()`
43//!
44//! # Benefits of This Architecture
45//!
46//! - **Efficiency**: Signed messages verified once, not per-agent
47//! - **Scalability**: Encrypted messages naturally distributed to recipients
48//! - **Flexibility**: Agents remain fully functional standalone
49//! - **Security**: Centralized verification with distributed decryption
50//!
51//! # Thread Safety and Concurrency
52//!
53//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
54//! async/await patterns and synchronization primitives to safely handle multiple operations
55//! simultaneously. Most components within the node are either immutable or use interior
56//! mutability with appropriate synchronization.
57//!
58//! # Example Usage
59//!
60//! ```rust,no_run
61//! use tap_node::{TapNode, NodeConfig};
62//! use tap_agent::TapAgent;
63//! use std::sync::Arc;
64//!
65//! #[tokio::main]
66//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
67//! // Create node
68//! let config = NodeConfig::default();
69//! let node = Arc::new(TapNode::new(config));
70//!
71//! // Create and register agent
72//! let (agent, _did) = TapAgent::from_ephemeral_key().await?;
73//! node.register_agent(Arc::new(agent)).await?;
74//!
75//! // Process incoming message (JSON Value)
76//! let message_value = serde_json::json!({
77//! "id": "msg-123",
78//! "type": "test-message",
79//! "body": {"content": "Hello"}
80//! });
81//!
82//! node.receive_message(message_value).await?;
83//! Ok(())
84//! }
85//! ```
86
87pub mod agent;
88pub mod error;
89pub mod event;
90pub mod message;
91#[cfg(feature = "storage")]
92pub mod state_machine;
93pub mod storage;
94#[cfg(feature = "storage")]
95pub mod validation;
96
97pub use error::{Error, Result};
98pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
99pub use event::{EventSubscriber, NodeEvent};
100pub use message::sender::{
101 HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender, WebSocketPlainMessageSender,
102};
103
104use std::sync::Arc;
105
106use tap_agent::{Agent, TapAgent};
107// use tap_agent::message_packing::PackOptions;
108use tap_msg::didcomm::PlainMessage;
109
110use crate::message::processor::PlainMessageProcessor;
111use crate::message::{
112 CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
113 PlainMessageRouterType,
114};
115use agent::AgentRegistry;
116use event::EventBus;
117use tap_agent::did::MultiResolver;
118
119use async_trait::async_trait;
120
121// Extension trait for TapAgent to add serialization methods
122///
123/// This trait extends the TapAgent with methods for serializing and packing
124/// DIDComm messages for transmission. It provides functionality for converting
125/// in-memory message objects to secure, serialized formats that follow the
126/// DIDComm messaging protocol standards.
127#[async_trait]
128pub trait TapAgentExt {
129 /// Pack and serialize a DIDComm message for transmission
130 ///
131 /// This method takes a DIDComm message and recipient DID, then:
132 /// 1. Uses the agent's PlainMessagePacker to properly sign and encrypt the message
133 /// 2. Serializes the message to a string format
134 ///
135 /// # Parameters
136 /// * `message` - The DIDComm message to serialize
137 /// * `to_did` - The DID of the recipient
138 ///
139 /// # Returns
140 /// The packed message as a string, ready for transmission
141 async fn send_serialized_message(&self, message: &PlainMessage, to_did: &str)
142 -> Result<String>;
143}
144
145#[async_trait]
146impl TapAgentExt for TapAgent {
147 async fn send_serialized_message(
148 &self,
149 message: &PlainMessage,
150 _to_did: &str,
151 ) -> Result<String> {
152 // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
153 let json_value =
154 serde_json::to_value(message).map_err(|e| Error::Serialization(e.to_string()))?;
155
156 // Use JSON string for transportation instead of direct message passing
157 // This bypasses the need for PlainMessage to implement TapMessageBody
158 let serialized =
159 serde_json::to_string(&json_value).map_err(|e| Error::Serialization(e.to_string()))?;
160
161 Ok(serialized)
162 }
163}
164
165/// Configuration for a TAP Node
166#[derive(Debug, Clone, Default)]
167pub struct NodeConfig {
168 /// Debug mode
169 pub debug: bool,
170 /// Maximum number of agents
171 pub max_agents: Option<usize>,
172 /// Whether to enable message logging
173 pub enable_message_logging: bool,
174 /// Whether to log full message content
175 pub log_message_content: bool,
176 /// Configuration for the processor pool
177 pub processor_pool: Option<ProcessorPoolConfig>,
178 /// Configuration for the event logger
179 pub event_logger: Option<EventLoggerConfig>,
180 /// Path to the storage database (None for default)
181 #[cfg(feature = "storage")]
182 pub storage_path: Option<std::path::PathBuf>,
183 /// Agent DID for storage organization
184 #[cfg(feature = "storage")]
185 pub agent_did: Option<String>,
186 /// Custom TAP root directory (defaults to ~/.tap)
187 #[cfg(feature = "storage")]
188 pub tap_root: Option<std::path::PathBuf>,
189}
190
191/// # The TAP Node
192///
193/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
194/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
195///
196/// ## Core Responsibilities
197///
198/// - **Agent Management**: Registration and deregistration of TAP Agents
199/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
200/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
201/// - **Event Publishing**: Broadcasting node events to subscribers
202/// - **Scalability**: Managing concurrent message processing through worker pools
203///
204/// ## Lifecycle
205///
206/// 1. Create a node with appropriate configuration
207/// 2. Register one or more agents with the node
208/// 3. Start the processor pool (if high throughput is required)
209/// 4. Process incoming/outgoing messages
210/// 5. Publish and respond to events
211///
212/// ## Thread Safety
213///
214/// The `TapNode` is designed to be thread-safe and can be shared across multiple
215/// threads using an `Arc<TapNode>`. All internal mutability is handled through
216/// appropriate synchronization primitives.
217#[derive(Clone)]
218pub struct TapNode {
219 /// Agent registry
220 agents: Arc<AgentRegistry>,
221 /// Event bus
222 event_bus: Arc<EventBus>,
223 /// Incoming message processor
224 incoming_processor: CompositePlainMessageProcessor,
225 /// Outgoing message processor
226 outgoing_processor: CompositePlainMessageProcessor,
227 /// PlainMessage router
228 router: CompositePlainMessageRouter,
229 /// Resolver for DIDs
230 resolver: Arc<MultiResolver>,
231 /// Worker pool for handling messages
232 processor_pool: Option<ProcessorPool>,
233 /// Node configuration
234 config: NodeConfig,
235 /// Storage for transactions (legacy centralized storage)
236 #[cfg(feature = "storage")]
237 storage: Option<Arc<storage::Storage>>,
238 /// Agent-specific storage manager
239 #[cfg(feature = "storage")]
240 agent_storage_manager: Option<Arc<storage::AgentStorageManager>>,
241 /// Transaction state processor
242 #[cfg(feature = "storage")]
243 state_processor: Option<Arc<state_machine::StandardTransactionProcessor>>,
244}
245
246impl TapNode {
247 /// Create a new TAP node with the given configuration
248 pub fn new(config: NodeConfig) -> Self {
249 // Create the agent registry
250 let agents = Arc::new(AgentRegistry::new(config.max_agents));
251
252 // Create the event bus
253 let event_bus = Arc::new(EventBus::new());
254
255 // Create the message router
256 let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
257
258 let router = CompositePlainMessageRouter::new(vec![default_router]);
259
260 // Create the message processors
261 let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
262 let validation_processor =
263 PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
264 let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
265
266 let incoming_processor = CompositePlainMessageProcessor::new(vec![
267 logging_processor.clone(),
268 validation_processor.clone(),
269 default_processor.clone(),
270 ]);
271
272 let outgoing_processor = CompositePlainMessageProcessor::new(vec![
273 logging_processor,
274 validation_processor,
275 default_processor,
276 ]);
277
278 // Create the resolver
279 let resolver = Arc::new(MultiResolver::default());
280
281 // Storage will be initialized on first use
282 #[cfg(feature = "storage")]
283 let storage = None;
284 #[cfg(feature = "storage")]
285 let agent_storage_manager = Some(Arc::new(storage::AgentStorageManager::new(
286 config.tap_root.clone(),
287 )));
288 #[cfg(feature = "storage")]
289 let state_processor = None;
290
291 let node = Self {
292 agents,
293 event_bus,
294 incoming_processor,
295 outgoing_processor,
296 router,
297 resolver,
298 processor_pool: None,
299 config,
300 #[cfg(feature = "storage")]
301 storage,
302 #[cfg(feature = "storage")]
303 agent_storage_manager,
304 #[cfg(feature = "storage")]
305 state_processor,
306 };
307
308 // Set up the event logger if configured
309 if let Some(logger_config) = &node.config.event_logger {
310 let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
311
312 // We need to handle the async subscribe in a blocking context
313 // This is safe because EventBus methods are designed to be called in this way
314 let event_bus = node.event_bus.clone();
315 tokio::task::block_in_place(|| {
316 tokio::runtime::Handle::current().block_on(async {
317 event_bus.subscribe(event_logger).await;
318 })
319 });
320 }
321
322 node
323 }
324
325 /// Initialize storage asynchronously
326 #[cfg(feature = "storage")]
327 pub async fn init_storage(&mut self) -> Result<()> {
328 let storage = if let Some(agent_did) = &self.config.agent_did {
329 // Use new DID-based storage structure
330 match storage::Storage::new_with_did(agent_did, self.config.tap_root.clone()).await {
331 Ok(s) => s,
332 Err(e) => {
333 log::error!("Failed to initialize storage with DID: {}", e);
334 return Err(Error::Storage(e.to_string()));
335 }
336 }
337 } else if let Some(storage_path) = self.config.storage_path.clone() {
338 // Use explicit path
339 match storage::Storage::new(Some(storage_path)).await {
340 Ok(s) => s,
341 Err(e) => {
342 log::error!("Failed to initialize storage: {}", e);
343 return Err(Error::Storage(e.to_string()));
344 }
345 }
346 } else {
347 // Initialize with default path
348 match storage::Storage::new(None).await {
349 Ok(s) => s,
350 Err(e) => {
351 log::error!("Failed to initialize storage: {}", e);
352 return Err(Error::Storage(e.to_string()));
353 }
354 }
355 };
356
357 let storage_arc = Arc::new(storage);
358
359 // Subscribe event handlers
360 let message_status_handler = Arc::new(event::handlers::MessageStatusHandler::new(
361 storage_arc.clone(),
362 ));
363 self.event_bus.subscribe(message_status_handler).await;
364
365 let transaction_state_handler = Arc::new(event::handlers::TransactionStateHandler::new(
366 storage_arc.clone(),
367 ));
368 self.event_bus.subscribe(transaction_state_handler).await;
369
370 let transaction_audit_handler = Arc::new(event::handlers::TransactionAuditHandler::new());
371 self.event_bus.subscribe(transaction_audit_handler).await;
372
373 // Create state processor
374 let state_processor = Arc::new(state_machine::StandardTransactionProcessor::new(
375 storage_arc.clone(),
376 self.event_bus.clone(),
377 self.agents.clone(),
378 ));
379
380 self.storage = Some(storage_arc);
381 self.state_processor = Some(state_processor);
382 Ok(())
383 }
384
385 /// Start the node
386 pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
387 let processor_pool = ProcessorPool::new(config);
388 self.processor_pool = Some(processor_pool);
389 Ok(())
390 }
391
392 /// Receive and process an incoming message
393 ///
394 /// This method handles the complete lifecycle of an incoming message:
395 ///
396 /// 1. Determining the message type (plain, signed, or encrypted)
397 /// 2. Verifying signatures or routing to agents for decryption
398 /// 3. Processing the resulting plain messages through the pipeline
399 /// 4. Routing and dispatching to the appropriate agents
400 ///
401 /// # Parameters
402 ///
403 /// * `message` - The message as a JSON Value (can be plain, JWS, or JWE)
404 ///
405 /// # Returns
406 ///
407 /// * `Ok(())` if the message was successfully processed
408 /// * `Err(Error)` if there was an error during processing
409 pub async fn receive_message(&self, message: serde_json::Value) -> Result<()> {
410 // Store the raw message for logging
411 let raw_message = serde_json::to_string(&message).ok();
412 use tap_agent::{verify_jws, Jwe, Jws};
413
414 // Determine message type
415 let is_encrypted =
416 message.get("protected").is_some() && message.get("recipients").is_some();
417 let is_signed = message.get("payload").is_some() && message.get("signatures").is_some();
418
419 if is_signed {
420 // Verify signature once using resolver
421 let jws: Jws = serde_json::from_value(message)
422 .map_err(|e| Error::Serialization(format!("Failed to parse JWS: {}", e)))?;
423
424 let plain_message = verify_jws(&jws, &*self.resolver)
425 .await
426 .map_err(|e| Error::Verification(format!("JWS verification failed: {}", e)))?;
427
428 // Process the verified plain message
429 self.process_plain_message(plain_message, raw_message.as_deref())
430 .await
431 } else if is_encrypted {
432 // Route encrypted message to each matching agent
433 let jwe: Jwe = serde_json::from_value(message.clone())
434 .map_err(|e| Error::Serialization(format!("Failed to parse JWE: {}", e)))?;
435
436 // Find agents that match recipients
437 let mut processed = false;
438 for recipient in &jwe.recipients {
439 if let Some(did) = recipient.header.kid.split('#').next() {
440 if let Ok(agent) = self.agents.get_agent(did).await {
441 // Let the agent handle decryption and processing
442 match agent.receive_encrypted_message(&message).await {
443 Ok(_) => {
444 processed = true;
445 log::debug!(
446 "Agent {} successfully processed encrypted message",
447 did
448 );
449 }
450 Err(e) => {
451 log::debug!(
452 "Agent {} couldn't process encrypted message: {}",
453 did,
454 e
455 );
456 }
457 }
458 }
459 }
460 }
461
462 if !processed {
463 return Err(Error::Processing(
464 "No agent could process the encrypted message".to_string(),
465 ));
466 }
467 Ok(())
468 } else {
469 // Plain message - parse and process
470 let plain_message: PlainMessage = serde_json::from_value(message).map_err(|e| {
471 Error::Serialization(format!("Failed to parse PlainMessage: {}", e))
472 })?;
473
474 self.process_plain_message(plain_message, raw_message.as_deref())
475 .await
476 }
477 }
478
479 /// Process a plain message through the pipeline
480 async fn process_plain_message(
481 &self,
482 message: PlainMessage,
483 raw_message: Option<&str>,
484 ) -> Result<()> {
485 // Validate the message if storage/validation is available
486 #[cfg(feature = "storage")]
487 {
488 if let Some(ref storage) = self.storage {
489 // Create validator
490 let validator_config = validation::StandardValidatorConfig {
491 max_timestamp_drift_secs: 60,
492 storage: storage.clone(),
493 };
494 let validator = validation::create_standard_validator(validator_config).await;
495
496 // Validate the message
497 use crate::validation::{MessageValidator, ValidationResult};
498 match validator.validate(&message).await {
499 ValidationResult::Accept => {
500 // Publish accepted event
501 self.event_bus
502 .publish_message_accepted(
503 message.id.clone(),
504 message.type_.clone(),
505 message.from.clone(),
506 message.to.first().cloned().unwrap_or_default(),
507 )
508 .await;
509 }
510 ValidationResult::Reject(reason) => {
511 // Publish rejected event
512 self.event_bus
513 .publish_message_rejected(
514 message.id.clone(),
515 reason.clone(),
516 message.from.clone(),
517 message.to.first().cloned().unwrap_or_default(),
518 )
519 .await;
520
521 // Return error to stop processing
522 return Err(Error::Validation(reason));
523 }
524 }
525 }
526 }
527
528 // Process message through state machine if available
529 #[cfg(feature = "storage")]
530 {
531 if let Some(ref state_processor) = self.state_processor {
532 use crate::state_machine::TransactionStateProcessor;
533 if let Err(e) = state_processor.process_message(&message).await {
534 log::warn!("State processor error: {}", e);
535 // Don't fail the entire message processing, just log the error
536 }
537 }
538 }
539 // Log incoming messages to agent-specific storage
540 #[cfg(feature = "storage")]
541 {
542 if let Some(ref storage_manager) = self.agent_storage_manager {
543 // Check if this is a transaction message
544 let message_type_lower = message.type_.to_lowercase();
545 let is_transaction = message_type_lower.contains("transfer")
546 || message_type_lower.contains("payment");
547 log::debug!(
548 "Message type: {}, is_transaction: {}",
549 message.type_,
550 is_transaction
551 );
552
553 if is_transaction {
554 // For transactions, store in ALL involved agents' databases
555 let involved_agents = self.extract_transaction_agents(&message);
556
557 if involved_agents.is_empty() {
558 log::warn!("No registered agents found for transaction: {}", message.id);
559 } else {
560 log::debug!(
561 "Storing transaction {} in {} agent databases",
562 message.id,
563 involved_agents.len()
564 );
565
566 // Store transaction in each involved agent's database
567 for agent_did in &involved_agents {
568 if let Ok(agent_storage) =
569 storage_manager.get_agent_storage(agent_did).await
570 {
571 // Log the message
572 match agent_storage
573 .log_message(
574 &message,
575 storage::MessageDirection::Incoming,
576 raw_message,
577 )
578 .await
579 {
580 Ok(_) => log::debug!(
581 "Logged incoming message to agent {}: {}",
582 agent_did,
583 message.id
584 ),
585 Err(e) => log::warn!(
586 "Failed to log incoming message for agent {}: {}",
587 agent_did,
588 e
589 ),
590 }
591
592 // Store as transaction
593 match agent_storage.insert_transaction(&message).await {
594 Ok(_) => log::debug!(
595 "Stored transaction for agent {}: {}",
596 agent_did,
597 message.id
598 ),
599 Err(e) => log::warn!(
600 "Failed to store transaction for agent {}: {}",
601 agent_did,
602 e
603 ),
604 }
605 } else {
606 log::warn!("Failed to get storage for agent: {}", agent_did);
607 }
608 }
609 }
610 } else {
611 // For non-transaction messages, log to all recipient agents' storage
612 let mut logged_to_any = false;
613
614 for recipient_did in &message.to {
615 // Check if this recipient is a registered agent
616 if self.agents.has_agent(recipient_did) {
617 if let Ok(agent_storage) =
618 storage_manager.get_agent_storage(recipient_did).await
619 {
620 // Log the message to this recipient's storage
621 match agent_storage
622 .log_message(
623 &message,
624 storage::MessageDirection::Incoming,
625 raw_message,
626 )
627 .await
628 {
629 Ok(_) => {
630 log::debug!(
631 "Logged incoming message to recipient {}: {}",
632 recipient_did,
633 message.id
634 );
635 logged_to_any = true;
636 }
637 Err(e) => log::warn!(
638 "Failed to log incoming message for recipient {}: {}",
639 recipient_did,
640 e
641 ),
642 }
643 } else {
644 log::warn!(
645 "Failed to get storage for recipient: {}",
646 recipient_did
647 );
648 }
649 }
650 }
651
652 // If no recipients were logged, fall back to sender or router-based storage
653 if !logged_to_any {
654 match self.determine_message_agent(&message) {
655 Ok(agent_did) => {
656 if let Ok(agent_storage) =
657 storage_manager.get_agent_storage(&agent_did).await
658 {
659 // Log the message to the agent's storage
660 match agent_storage
661 .log_message(
662 &message,
663 storage::MessageDirection::Incoming,
664 raw_message,
665 )
666 .await
667 {
668 Ok(_) => log::debug!(
669 "Logged incoming message to fallback agent {}: {}",
670 agent_did,
671 message.id
672 ),
673 Err(e) => log::warn!(
674 "Failed to log incoming message for fallback agent {}: {}",
675 agent_did,
676 e
677 ),
678 }
679 } else {
680 log::warn!(
681 "Failed to get storage for fallback agent: {}",
682 agent_did
683 );
684 }
685 }
686 Err(e) => {
687 log::warn!(
688 "Failed to determine fallback agent for message storage: {}",
689 e
690 );
691 // Fall back to centralized storage if available
692 if let Some(ref storage) = self.storage {
693 let _ = storage
694 .log_message(
695 &message,
696 storage::MessageDirection::Incoming,
697 raw_message,
698 )
699 .await;
700 }
701 }
702 }
703 }
704 }
705 }
706 }
707
708 // Process the incoming message
709 let processed_message = match self.incoming_processor.process_incoming(message).await? {
710 Some(msg) => msg,
711 None => return Ok(()), // PlainMessage was dropped during processing
712 };
713
714 // Deliver the message to all recipients in the 'to' field
715 let mut delivery_success = false;
716
717 for recipient_did in &processed_message.to {
718 // Check if we have a registered agent for this recipient
719 match self.agents.get_agent(recipient_did).await {
720 Ok(agent) => {
721 // Let the agent process the plain message
722 match agent.receive_plain_message(processed_message.clone()).await {
723 Ok(_) => {
724 log::debug!(
725 "Successfully delivered message to agent: {}",
726 recipient_did
727 );
728 delivery_success = true;
729 }
730 Err(e) => {
731 log::warn!("Agent {} failed to process message: {}", recipient_did, e);
732 }
733 }
734 }
735 Err(e) => {
736 log::debug!(
737 "No registered agent found for recipient {}: {}",
738 recipient_did,
739 e
740 );
741 // This is not an error - the recipient might be external to this node
742 }
743 }
744 }
745
746 // If no recipients were successfully processed, try the router as fallback
747 if !delivery_success {
748 let target_did = match self.router.route_message(&processed_message).await {
749 Ok(did) => did,
750 Err(e) => {
751 log::warn!("Unable to route message and no recipients processed: {}", e);
752 return Ok(());
753 }
754 };
755
756 // Get the agent
757 let agent = match self.agents.get_agent(&target_did).await {
758 Ok(a) => a,
759 Err(e) => {
760 log::warn!("Failed to get agent for dispatch: {}", e);
761 return Ok(());
762 }
763 };
764
765 // Let the agent process the plain message
766 match agent.receive_plain_message(processed_message).await {
767 Ok(_) => {
768 log::debug!("Successfully routed message to agent: {}", target_did);
769 }
770 Err(e) => {
771 log::warn!("Agent failed to process message: {}", e);
772 }
773 }
774 }
775
776 Ok(())
777 }
778
779 /// Dispatch a message to an agent by DID
780 pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
781 let agent = self.agents.get_agent(&target_did).await?;
782
783 // Convert the message to a packed format for transport
784 let packed = agent.send_serialized_message(&message, &target_did).await?;
785
786 // Publish an event for the dispatched message
787 self.event_bus
788 .publish_agent_message(target_did, packed.into_bytes())
789 .await;
790
791 Ok(())
792 }
793
794 /// Send a message to an agent
795 pub async fn send_message(
796 &self,
797 sender_did: String,
798 to_did: String,
799 message: PlainMessage,
800 ) -> Result<String> {
801 // Log outgoing messages to agent-specific storage
802 #[cfg(feature = "storage")]
803 {
804 if let Some(ref storage_manager) = self.agent_storage_manager {
805 // Check if this is a transaction message
806 let message_type_lower = message.type_.to_lowercase();
807 let is_transaction = message_type_lower.contains("transfer")
808 || message_type_lower.contains("payment");
809 log::debug!(
810 "Message type: {}, is_transaction: {}",
811 message.type_,
812 is_transaction
813 );
814
815 if is_transaction {
816 // For transactions, store in ALL involved agents' databases
817 let involved_agents = self.extract_transaction_agents(&message);
818
819 if involved_agents.is_empty() {
820 log::warn!(
821 "No registered agents found for outgoing transaction: {}",
822 message.id
823 );
824 } else {
825 log::debug!(
826 "Storing outgoing transaction {} in {} agent databases",
827 message.id,
828 involved_agents.len()
829 );
830
831 // Store transaction in each involved agent's database
832 for agent_did in &involved_agents {
833 if let Ok(agent_storage) =
834 storage_manager.get_agent_storage(agent_did).await
835 {
836 // Log the message
837 match agent_storage
838 .log_message(
839 &message,
840 storage::MessageDirection::Outgoing,
841 None,
842 )
843 .await
844 {
845 Ok(_) => log::debug!(
846 "Logged outgoing message to agent {}: {}",
847 agent_did,
848 message.id
849 ),
850 Err(e) => log::warn!(
851 "Failed to log outgoing message for agent {}: {}",
852 agent_did,
853 e
854 ),
855 }
856
857 // Store as transaction
858 match agent_storage.insert_transaction(&message).await {
859 Ok(_) => log::debug!(
860 "Stored outgoing transaction for agent {}: {}",
861 agent_did,
862 message.id
863 ),
864 Err(e) => log::warn!(
865 "Failed to store outgoing transaction for agent {}: {}",
866 agent_did,
867 e
868 ),
869 }
870 } else {
871 log::warn!("Failed to get storage for agent: {}", agent_did);
872 }
873 }
874 }
875 } else {
876 // For non-transaction messages, just store in sender's storage
877 if let Ok(sender_storage) = storage_manager.get_agent_storage(&sender_did).await
878 {
879 // Log the message to the sender's storage
880 match sender_storage
881 .log_message(&message, storage::MessageDirection::Outgoing, None)
882 .await
883 {
884 Ok(_) => log::debug!(
885 "Logged outgoing message for agent {}: {}",
886 sender_did,
887 message.id
888 ),
889 Err(e) => log::warn!(
890 "Failed to log outgoing message for agent {}: {}",
891 sender_did,
892 e
893 ),
894 }
895 } else {
896 log::warn!("Failed to get storage for sender agent: {}", sender_did);
897 // Fall back to centralized storage if available
898 if let Some(ref storage) = self.storage {
899 let _ = storage
900 .log_message(&message, storage::MessageDirection::Outgoing, None)
901 .await;
902 }
903 }
904 }
905 }
906 }
907
908 // Process the outgoing message
909 let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
910 Some(msg) => msg,
911 None => {
912 return Err(Error::MessageDropped(
913 "PlainMessage dropped during processing".to_string(),
914 ))
915 }
916 };
917
918 // Get the sender agent
919 let agent = self.agents.get_agent(&sender_did).await?;
920
921 // Pack the message
922 let packed = agent
923 .send_serialized_message(&processed_message, to_did.as_str())
924 .await?;
925
926 // Publish an event for the message
927 self.event_bus
928 .publish_agent_message(sender_did, packed.clone().into_bytes())
929 .await;
930
931 Ok(packed)
932 }
933
934 /// Register a new agent with the node
935 ///
936 /// This method registers an agent with the TAP Node and automatically initializes
937 /// DID-specific storage for the agent. The storage directory structure follows:
938 /// - `~/.tap/{sanitized_did}/transactions.db` (default)
939 /// - `{tap_root}/{sanitized_did}/transactions.db` (if custom TAP root is configured)
940 ///
941 /// # Storage Initialization
942 ///
943 /// When an agent is registered, a dedicated SQLite database is created for that agent's DID.
944 /// This ensures transaction isolation between different agents while maintaining a consistent
945 /// storage structure. If storage initialization fails, the agent registration continues but
946 /// a warning is logged.
947 ///
948 /// # Arguments
949 ///
950 /// * `agent` - The TapAgent to register with the node
951 ///
952 /// # Returns
953 ///
954 /// * `Ok(())` if the agent was successfully registered
955 /// * `Err(Error)` if agent registration fails
956 pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
957 let agent_did = agent.get_agent_did().to_string();
958
959 // Initialize storage for this agent if storage is enabled
960 #[cfg(feature = "storage")]
961 {
962 if let Some(ref storage_manager) = self.agent_storage_manager {
963 match storage_manager.ensure_agent_storage(&agent_did).await {
964 Ok(_) => {
965 log::info!("Initialized storage for agent: {}", agent_did);
966 }
967 Err(e) => {
968 log::warn!(
969 "Failed to initialize storage for agent {}: {}",
970 agent_did,
971 e
972 );
973 // Don't fail the registration, just log the warning
974 }
975 }
976 }
977 }
978
979 self.agents.register_agent(agent_did.clone(), agent).await?;
980
981 // Publish event about agent registration
982 self.event_bus.publish_agent_registered(agent_did).await;
983
984 Ok(())
985 }
986
987 /// Unregister an agent from the node
988 pub async fn unregister_agent(&self, did: &str) -> Result<()> {
989 self.agents.unregister_agent(did).await?;
990
991 // Publish event about agent registration
992 self.event_bus
993 .publish_agent_unregistered(did.to_string())
994 .await;
995
996 Ok(())
997 }
998
999 /// Get a list of registered agent DIDs
1000 pub fn list_agents(&self) -> Vec<String> {
1001 self.agents.get_all_dids()
1002 }
1003
1004 /// Get a reference to the agent registry
1005 pub fn agents(&self) -> &Arc<AgentRegistry> {
1006 &self.agents
1007 }
1008
1009 /// Get a reference to the event bus
1010 pub fn event_bus(&self) -> &Arc<EventBus> {
1011 &self.event_bus
1012 }
1013
1014 /// Get a reference to the resolver
1015 pub fn resolver(&self) -> &Arc<MultiResolver> {
1016 &self.resolver
1017 }
1018
1019 /// Get a mutable reference to the processor pool
1020 /// This is a reference to `Option<ProcessorPool>` to allow starting the pool after node creation
1021 pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
1022 &mut self.processor_pool
1023 }
1024
1025 /// Get the node configuration
1026 pub fn config(&self) -> &NodeConfig {
1027 &self.config
1028 }
1029
/// Borrow the node's storage handle, if one is set
///
/// Returns `None` when the node holds no storage. Only compiled when the
/// `storage` feature is enabled.
#[cfg(feature = "storage")]
pub fn storage(&self) -> Option<&Arc<storage::Storage>> {
    Option::as_ref(&self.storage)
}
1035
/// Borrow the node's per-agent storage manager, if one is set
///
/// Returns `None` when the node holds no agent storage manager. Only
/// compiled when the `storage` feature is enabled.
#[cfg(feature = "storage")]
pub fn agent_storage_manager(&self) -> Option<&Arc<storage::AgentStorageManager>> {
    Option::as_ref(&self.agent_storage_manager)
}
1041
/// Pick the agent DID whose storage should hold a message
///
/// Candidates are tried in this order and the first hit wins:
/// 1. The first DID in `message.to` that is a registered agent
///    (recipients are scanned in order, not just the first entry)
/// 2. `message.from`, if it is a registered agent
/// 3. The first DID in `message.to`, even though it is not registered
///
/// # Returns
///
/// * `Ok(did)` with the selected DID
/// * `Err(Error::Storage)` if no rule matched, which only happens when
///   `message.to` is empty and the sender is not a registered agent
#[cfg(feature = "storage")]
fn determine_message_agent(&self, message: &PlainMessage) -> Result<String> {
    // Rule 1: a recipient that is one of our agents.
    let local_recipient = message
        .to
        .iter()
        .find(|&did| self.agents.has_agent(did));
    if let Some(did) = local_recipient {
        return Ok(did.clone());
    }

    // Rule 2: the sender is one of our agents.
    if self.agents.has_agent(&message.from) {
        return Ok(message.from.clone());
    }

    // Rule 3: no local agent is involved, so fall back to the first
    // recipient; with no recipients at all there is nothing to choose.
    message.to.first().cloned().ok_or_else(|| {
        Error::Storage("Cannot determine agent for message storage".to_string())
    })
}
1071
/// Collect the registered agent DIDs that take part in a transaction message
///
/// Candidate DIDs are gathered from:
/// - The message envelope: sender (`from`) and every recipient (`to`)
/// - If the lower-cased message type contains `"transfer"`: the body parsed
///   as a Transfer, contributing the originator, the beneficiary (when
///   present), and every entry of its `agents` list
/// - Otherwise, if the lower-cased message type contains `"payment"`: the
///   body parsed as a Payment, contributing the merchant, the customer (when
///   present), and every entry of its `agents` list
///
/// A body that fails to parse only produces a warning; the envelope DIDs are
/// still considered. Candidates are de-duplicated through a `HashSet`, so the
/// order of the returned DIDs is unspecified.
///
/// # Returns
///
/// The candidate DIDs for which the agent registry reports a registered agent.
#[cfg(feature = "storage")]
fn extract_transaction_agents(&self, message: &PlainMessage) -> Vec<String> {
    use std::collections::HashSet;

    log::debug!("Extracting transaction agents for message: {}", message.id);

    // A set collapses DIDs that show up in several roles.
    let mut candidates: HashSet<String> = HashSet::new();

    // Envelope participants: sender first, then each recipient.
    candidates.insert(message.from.clone());
    log::debug!("Added sender: {}", message.from);

    message.to.iter().for_each(|recipient| {
        candidates.insert(recipient.clone());
        log::debug!("Added recipient: {}", recipient);
    });

    // Body participants depend on the (case-insensitive) message type.
    let kind = message.type_.to_lowercase();
    log::debug!("Message type: {}", kind);

    if kind.contains("transfer") {
        match serde_json::from_value::<tap_msg::message::Transfer>(message.body.clone()) {
            Ok(transfer) => {
                candidates.insert(transfer.originator.id.clone());
                log::debug!("Added originator: {}", transfer.originator.id);

                // The beneficiary is optional on a Transfer.
                if let Some(beneficiary) = transfer.beneficiary.as_ref() {
                    candidates.insert(beneficiary.id.clone());
                    log::debug!("Added beneficiary: {}", beneficiary.id);
                }

                for participant in &transfer.agents {
                    candidates.insert(participant.id.clone());
                    log::debug!("Added agent: {}", participant.id);
                }
            }
            Err(_) => {
                log::warn!("Failed to parse Transfer message body");
            }
        }
    } else if kind.contains("payment") {
        match serde_json::from_value::<tap_msg::message::Payment>(message.body.clone()) {
            Ok(payment) => {
                candidates.insert(payment.merchant.id.clone());
                log::debug!("Added merchant: {}", payment.merchant.id);

                // The customer is optional on a Payment.
                if let Some(customer) = payment.customer.as_ref() {
                    candidates.insert(customer.id.clone());
                    log::debug!("Added customer: {}", customer.id);
                }

                for participant in &payment.agents {
                    candidates.insert(participant.id.clone());
                    log::debug!("Added agent: {}", participant.id);
                }
            }
            Err(_) => {
                log::warn!("Failed to parse Payment message body");
            }
        }
    }

    log::debug!("Total unique agents found: {}", candidates.len());

    // Keep only the DIDs that belong to agents registered on this node.
    let mut registered_agents: Vec<String> = Vec::new();
    for did in candidates {
        let is_registered = self.agents.has_agent(&did);
        log::debug!("Agent {} registered: {}", did, is_registered);
        if is_registered {
            registered_agents.push(did);
        }
    }

    log::debug!(
        "Registered agents involved in transaction: {:?}",
        registered_agents
    );
    registered_agents
}
1165}
1166
1167// Namespace imports
1168// These imports make the implementation cleaner, but should be hidden from public API
1169use message::processor::DefaultPlainMessageProcessor;
1170use message::processor::LoggingPlainMessageProcessor;
1171use message::processor::ValidationPlainMessageProcessor;
1172use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
1173use message::router::DefaultPlainMessageRouter;
1174use message::RouterAsyncExt;