tap_node/
lib.rs

1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, and provides a scalable architecture for TAP deployments.
5//!
6//! # Key Components
7//!
8//! - **Agent Registry**: Manages multiple TAP Agents
9//! - **Event Bus**: Publishes and distributes events throughout the system
10//! - **Message Processors**: Process incoming and outgoing messages
11//! - **Message Router**: Routes messages to the appropriate agent
12//! - **Processor Pool**: Provides scalable concurrent message processing
13//!
14//! # Thread Safety and Concurrency
15//!
16//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
17//! async/await patterns and synchronization primitives to safely handle multiple operations
18//! simultaneously. Most components within the node are either immutable or use interior
19//! mutability with appropriate synchronization.
20//!
21//! # Message Flow
22//!
23//! Messages in TAP Node follow a structured flow:
24//!
25//! 1. **Receipt**: Messages are received through the `receive_message` method
26//! 2. **Processing**: Each message is processed by the registered processors
27//! 3. **Routing**: The router determines which agent should handle the message
28//! 4. **Dispatch**: The message is delivered to the appropriate agent
29//! 5. **Response**: Responses are handled similarly in the reverse direction
30//!
31//! # Scalability
32//!
33//! The node supports scalable message processing through the optional processor pool,
34//! which uses a configurable number of worker threads to process messages concurrently.
//! This allows a single node to handle high message throughput while maintaining
//! thread safety: the node can be shared between threads, with all mutable state
//! protected by appropriate synchronization primitives.
38
39pub mod agent;
40pub mod error;
41pub mod event;
42pub mod message;
43pub mod resolver;
44
45pub use error::{Error, Result};
46pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
47pub use event::{EventSubscriber, NodeEvent};
48pub use message::sender::{
49    HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender, WebSocketPlainMessageSender,
50};
51
52use std::sync::Arc;
53
54use tap_agent::{Agent, TapAgent};
55// use tap_agent::message_packing::PackOptions;
56use tap_msg::didcomm::PlainMessage;
57
58use crate::message::processor::PlainMessageProcessor;
59use crate::message::{
60    CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
61    PlainMessageRouterType,
62};
63use agent::AgentRegistry;
64use event::EventBus;
65use resolver::NodeResolver;
66
67use async_trait::async_trait;
68
/// Extension trait for `TapAgent` that adds a message serialization method.
///
/// The trait declares a single async method that turns an in-memory
/// `PlainMessage` into a string suitable for transmission.
///
/// NOTE(review): earlier documentation described signing and encrypting the
/// message through the agent's packer. The `TapAgent` implementation in this
/// file only serializes the message to JSON and performs no signing or
/// encryption — confirm which contract is intended.
#[async_trait]
pub trait TapAgentExt {
    /// Serialize a DIDComm message for transmission.
    ///
    /// # Parameters
    /// * `message` - The DIDComm message to serialize
    /// * `to_did` - The DID of the recipient (the `TapAgent` implementation in
    ///   this file ignores it)
    ///
    /// # Returns
    /// The serialized message as a string.
    ///
    /// # Errors
    /// Returns an error if the message cannot be serialized.
    async fn send_serialized_message(&self, message: &PlainMessage, to_did: &str)
        -> Result<String>;
}
92
93#[async_trait]
94impl TapAgentExt for TapAgent {
95    async fn send_serialized_message(
96        &self,
97        message: &PlainMessage,
98        _to_did: &str,
99    ) -> Result<String> {
100        // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
101        let json_value = serde_json::to_value(message).map_err(Error::Serialization)?;
102
103        // Use JSON string for transportation instead of direct message passing
104        // This bypasses the need for PlainMessage to implement TapMessageBody
105        let serialized = serde_json::to_string(&json_value).map_err(Error::Serialization)?;
106
107        Ok(serialized)
108    }
109}
110
/// Configuration for a TAP Node.
///
/// All fields have defaults (`false` / `None`) through the derived `Default`,
/// so a node can be built from `NodeConfig::default()`.
#[derive(Debug, Clone, Default)]
pub struct NodeConfig {
    /// Debug mode.
    ///
    /// Not read anywhere in this file; presumably consumed by other modules —
    /// TODO confirm.
    pub debug: bool,
    /// Maximum number of agents; passed to `AgentRegistry::new` when the node
    /// is constructed.
    pub max_agents: Option<usize>,
    /// Whether to enable message logging (not read in this file).
    pub enable_message_logging: bool,
    /// Whether to log full message content (not read in this file).
    pub log_message_content: bool,
    /// Configuration for the processor pool.
    ///
    /// `TapNode::new` does not consult this field: the node starts without a
    /// pool, and `TapNode::start` builds one from the config passed to it.
    pub processor_pool: Option<ProcessorPoolConfig>,
    /// Configuration for the event logger. When set, `TapNode::new` creates an
    /// `EventLogger` from it and subscribes that logger to the node's event bus.
    pub event_logger: Option<EventLoggerConfig>,
}
127
/// # The TAP Node
///
/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
///
/// ## Core Responsibilities
///
/// - **Agent Management**: Registration and deregistration of TAP Agents
/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
/// - **Event Publishing**: Broadcasting node events to subscribers
/// - **Scalability**: Managing concurrent message processing through worker pools
///
/// ## Lifecycle
///
/// 1. Create a node with appropriate configuration
/// 2. Register one or more agents with the node
/// 3. Start the processor pool (if high throughput is required)
/// 4. Process incoming/outgoing messages
/// 5. Publish and respond to events
///
/// ## Thread Safety
///
/// The `TapNode` is designed to be thread-safe and can be shared across multiple
/// threads using an `Arc<TapNode>`. All internal mutability is handled through
/// appropriate synchronization primitives.
///
/// ## Cloning
///
/// `Clone` is derived. The registry, event bus and resolver are held in `Arc`s,
/// so clones of a node share those three components; the remaining fields are
/// cloned through their own `Clone` implementations.
#[derive(Clone)]
pub struct TapNode {
    /// Agent registry (shared between clones of the node)
    agents: Arc<AgentRegistry>,
    /// Event bus (shared between clones of the node)
    event_bus: Arc<EventBus>,
    /// Incoming message processor, applied by `receive_message`
    incoming_processor: CompositePlainMessageProcessor,
    /// Outgoing message processor, applied by `send_message`
    outgoing_processor: CompositePlainMessageProcessor,
    /// PlainMessage router used to pick the target DID for incoming messages
    router: CompositePlainMessageRouter,
    /// Resolver for DIDs (shared between clones of the node)
    resolver: Arc<NodeResolver>,
    /// Worker pool for handling messages; `None` until `start` is called
    processor_pool: Option<ProcessorPool>,
    /// Node configuration
    config: NodeConfig,
}
173
174impl TapNode {
175    /// Create a new TAP node with the given configuration
176    pub fn new(config: NodeConfig) -> Self {
177        // Create the agent registry
178        let agents = Arc::new(AgentRegistry::new(config.max_agents));
179
180        // Create the event bus
181        let event_bus = Arc::new(EventBus::new());
182
183        // Create the message router
184        let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
185
186        let router = CompositePlainMessageRouter::new(vec![default_router]);
187
188        // Create the message processors
189        let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
190        let validation_processor =
191            PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
192        let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
193
194        let incoming_processor = CompositePlainMessageProcessor::new(vec![
195            logging_processor.clone(),
196            validation_processor.clone(),
197            default_processor.clone(),
198        ]);
199
200        let outgoing_processor = CompositePlainMessageProcessor::new(vec![
201            logging_processor,
202            validation_processor,
203            default_processor,
204        ]);
205
206        // Create the resolver
207        let resolver = Arc::new(NodeResolver::default());
208
209        let node = Self {
210            agents,
211            event_bus,
212            incoming_processor,
213            outgoing_processor,
214            router,
215            resolver,
216            processor_pool: None,
217            config,
218        };
219
220        // Set up the event logger if configured
221        if let Some(logger_config) = &node.config.event_logger {
222            let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
223
224            // We need to handle the async subscribe in a blocking context
225            // This is safe because EventBus methods are designed to be called in this way
226            let event_bus = node.event_bus.clone();
227            tokio::task::block_in_place(|| {
228                tokio::runtime::Handle::current().block_on(async {
229                    event_bus.subscribe(event_logger).await;
230                })
231            });
232        }
233
234        node
235    }
236
237    /// Start the node
238    pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
239        let processor_pool = ProcessorPool::new(config);
240        self.processor_pool = Some(processor_pool);
241        Ok(())
242    }
243
244    /// Receive and process an incoming message
245    ///
246    /// This method handles the complete lifecycle of an incoming message:
247    ///
248    /// 1. Processing the message through all registered processors
249    /// 2. Routing the message to determine the appropriate target agent
250    /// 3. Dispatching the message to the target agent
251    ///
252    /// The processing pipeline may transform or even drop the message based on
253    /// validation rules or other processing logic. If a message is dropped during
254    /// processing, this method will return Ok(()) without an error.
255    ///
256    /// # Parameters
257    ///
258    /// * `message` - The DIDComm message to be processed
259    ///
260    /// # Returns
261    ///
262    /// * `Ok(())` if the message was successfully processed and dispatched (or intentionally dropped)
263    /// * `Err(Error)` if there was an error during processing, routing, or dispatching
264    ///
265    /// # Errors
266    ///
267    /// This method can return errors for several reasons:
268    /// * Processing errors from message processors
269    /// * Routing errors if no target agent can be determined
270    /// * Dispatch errors if the target agent cannot be found or fails to process the message
271    ///
272    /// # Example
273    ///
274    /// ```no_run
275    /// # use tap_node::{TapNode, NodeConfig};
276    /// # use tap_msg::didcomm::PlainMessage;
277    /// # async fn example(node: &TapNode, message: PlainMessage) -> Result<(), tap_node::Error> {
278    /// // Process an incoming message
279    /// node.receive_message(message).await?;
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub async fn receive_message(&self, message: PlainMessage) -> Result<()> {
284        // Process the incoming message
285        let processed_message = match self.incoming_processor.process_incoming(message).await? {
286            Some(msg) => msg,
287            None => return Ok(()), // PlainMessage was dropped during processing
288        };
289
290        // Route the message to the appropriate agent
291        let target_did = match self.router.route_message(&processed_message).await {
292            Ok(did) => did,
293            Err(e) => {
294                // Log the error but don't fail the entire operation
295                log::warn!("Unable to route message: {}", e);
296                return Ok(());
297            }
298        };
299
300        // Dispatch the message to the agent, handling any errors
301        match self.dispatch_message(target_did, processed_message).await {
302            Ok(_) => Ok(()),
303            Err(e) => {
304                // Log the error but don't fail the entire operation
305                log::warn!("Failed to dispatch message: {}", e);
306                Ok(())
307            }
308        }
309    }
310
311    /// Dispatch a message to an agent by DID
312    pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
313        let agent = self.agents.get_agent(&target_did).await?;
314
315        // Convert the message to a packed format for transport
316        let packed = agent.send_serialized_message(&message, &target_did).await?;
317
318        // Publish an event for the dispatched message
319        self.event_bus
320            .publish_agent_message(target_did, packed.into_bytes())
321            .await;
322
323        Ok(())
324    }
325
326    /// Send a message to an agent
327    pub async fn send_message(
328        &self,
329        sender_did: String,
330        to_did: String,
331        message: PlainMessage,
332    ) -> Result<String> {
333        // Process the outgoing message
334        let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
335            Some(msg) => msg,
336            None => {
337                return Err(Error::MessageDropped(
338                    "PlainMessage dropped during processing".to_string(),
339                ))
340            }
341        };
342
343        // Get the sender agent
344        let agent = self.agents.get_agent(&sender_did).await?;
345
346        // Pack the message
347        let packed = agent
348            .send_serialized_message(&processed_message, to_did.as_str())
349            .await?;
350
351        // Publish an event for the message
352        self.event_bus
353            .publish_agent_message(sender_did, packed.clone().into_bytes())
354            .await;
355
356        Ok(packed)
357    }
358
359    /// Register a new agent with the node
360    pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
361        let agent_did = agent.get_agent_did().to_string();
362        self.agents.register_agent(agent_did.clone(), agent).await?;
363
364        // Publish event about agent registration
365        self.event_bus.publish_agent_registered(agent_did).await;
366
367        Ok(())
368    }
369
370    /// Unregister an agent from the node
371    pub async fn unregister_agent(&self, did: &str) -> Result<()> {
372        self.agents.unregister_agent(did).await?;
373
374        // Publish event about agent registration
375        self.event_bus
376            .publish_agent_unregistered(did.to_string())
377            .await;
378
379        Ok(())
380    }
381
    /// Get a list of registered agent DIDs.
    ///
    /// Returns whatever the agent registry's `get_all_dids` reports, unchanged.
    pub fn list_agents(&self) -> Vec<String> {
        self.agents.get_all_dids()
    }
386
    /// Get a reference to the agent registry.
    ///
    /// The registry is held in an `Arc`, so callers can clone the returned
    /// handle to keep it beyond the borrow of the node.
    pub fn agents(&self) -> &Arc<AgentRegistry> {
        &self.agents
    }
391
    /// Get a reference to the event bus.
    ///
    /// The bus is held in an `Arc`, so callers can clone the returned handle to
    /// keep it beyond the borrow of the node.
    pub fn event_bus(&self) -> &Arc<EventBus> {
        &self.event_bus
    }
396
    /// Get a reference to the DID resolver.
    ///
    /// The resolver is held in an `Arc`, so callers can clone the returned
    /// handle to keep it beyond the borrow of the node.
    pub fn resolver(&self) -> &Arc<NodeResolver> {
        &self.resolver
    }
401
    /// Get a mutable reference to the processor pool slot.
    ///
    /// This is a reference to `Option<ProcessorPool>` to allow starting the pool
    /// after node creation: the slot is `None` on a freshly constructed node and
    /// is filled by `start`.
    pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
        &mut self.processor_pool
    }
407
    /// Get the node configuration that was passed to `new`.
    pub fn config(&self) -> &NodeConfig {
        &self.config
    }
412}
413
414// Namespace imports
415// These imports make the implementation cleaner, but should be hidden from public API
416use message::processor::DefaultPlainMessageProcessor;
417use message::processor::LoggingPlainMessageProcessor;
418use message::processor::ValidationPlainMessageProcessor;
419use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
420use message::router::DefaultPlainMessageRouter;
421use message::RouterAsyncExt;