tap_node/lib.rs
1//! TAP Node - A node implementation for the TAP protocol
2//!
3//! The TAP Node is the central component that manages TAP Agents, routes messages,
4//! processes events, and provides a scalable architecture for TAP deployments.
5//!
6//! # Key Components
7//!
8//! - **Agent Registry**: Manages multiple TAP Agents
9//! - **Event Bus**: Publishes and distributes events throughout the system
10//! - **Message Processors**: Process incoming and outgoing messages
11//! - **Message Router**: Routes messages to the appropriate agent
12//! - **Processor Pool**: Provides scalable concurrent message processing
13//!
14//! # Thread Safety and Concurrency
15//!
16//! The TAP Node is designed with concurrent operations in mind. It uses a combination of
17//! async/await patterns and synchronization primitives to safely handle multiple operations
18//! simultaneously. Most components within the node are either immutable or use interior
19//! mutability with appropriate synchronization.
20//!
21//! # Message Flow
22//!
23//! Messages in TAP Node follow a structured flow:
24//!
25//! 1. **Receipt**: Messages are received through the `receive_message` method
26//! 2. **Processing**: Each message is processed by the registered processors
27//! 3. **Routing**: The router determines which agent should handle the message
28//! 4. **Dispatch**: The message is delivered to the appropriate agent
29//! 5. **Response**: Responses are handled similarly in the reverse direction
30//!
31//! # Scalability
32//!
33//! The node supports scalable message processing through the optional processor pool,
34//! which uses a configurable number of worker threads to process messages concurrently.
//! This allows a single node to handle high message throughput. The node can also be
//! shared between threads, with all mutable state protected by appropriate synchronization
//! primitives.
38
39pub mod agent;
40pub mod error;
41pub mod event;
42pub mod message;
43pub mod resolver;
44
45pub use error::{Error, Result};
46pub use event::logger::{EventLogger, EventLoggerConfig, LogDestination};
47pub use event::{EventSubscriber, NodeEvent};
48pub use message::sender::{
49 HttpPlainMessageSender, NodePlainMessageSender, PlainMessageSender, WebSocketPlainMessageSender,
50};
51
52use std::sync::Arc;
53
54use tap_agent::{Agent, TapAgent};
55// use tap_agent::message_packing::PackOptions;
56use tap_msg::didcomm::PlainMessage;
57
58use crate::message::processor::PlainMessageProcessor;
59use crate::message::{
60 CompositePlainMessageProcessor, CompositePlainMessageRouter, PlainMessageProcessorType,
61 PlainMessageRouterType,
62};
63use agent::AgentRegistry;
64use event::EventBus;
65use resolver::NodeResolver;
66
67use async_trait::async_trait;
68
69// Extension trait for TapAgent to add serialization methods
70///
71/// This trait extends the TapAgent with methods for serializing and packing
72/// DIDComm messages for transmission. It provides functionality for converting
73/// in-memory message objects to secure, serialized formats that follow the
74/// DIDComm messaging protocol standards.
75#[async_trait]
76pub trait TapAgentExt {
77 /// Pack and serialize a DIDComm message for transmission
78 ///
79 /// This method takes a DIDComm message and recipient DID, then:
80 /// 1. Uses the agent's PlainMessagePacker to properly sign and encrypt the message
81 /// 2. Serializes the message to a string format
82 ///
83 /// # Parameters
84 /// * `message` - The DIDComm message to serialize
85 /// * `to_did` - The DID of the recipient
86 ///
87 /// # Returns
88 /// The packed message as a string, ready for transmission
89 async fn send_serialized_message(&self, message: &PlainMessage, to_did: &str)
90 -> Result<String>;
91}
92
93#[async_trait]
94impl TapAgentExt for TapAgent {
95 async fn send_serialized_message(
96 &self,
97 message: &PlainMessage,
98 _to_did: &str,
99 ) -> Result<String> {
100 // Serialize the PlainMessage to JSON first to work around the TapMessageBody trait constraint
101 let json_value = serde_json::to_value(message).map_err(Error::Serialization)?;
102
103 // Use JSON string for transportation instead of direct message passing
104 // This bypasses the need for PlainMessage to implement TapMessageBody
105 let serialized = serde_json::to_string(&json_value).map_err(Error::Serialization)?;
106
107 Ok(serialized)
108 }
109}
110
111/// Configuration for a TAP Node
112#[derive(Debug, Clone, Default)]
113pub struct NodeConfig {
114 /// Debug mode
115 pub debug: bool,
116 /// Maximum number of agents
117 pub max_agents: Option<usize>,
118 /// Whether to enable message logging
119 pub enable_message_logging: bool,
120 /// Whether to log full message content
121 pub log_message_content: bool,
122 /// Configuration for the processor pool
123 pub processor_pool: Option<ProcessorPoolConfig>,
124 /// Configuration for the event logger
125 pub event_logger: Option<EventLoggerConfig>,
126}
127
128/// # The TAP Node
129///
130/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
131/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
132///
133/// ## Core Responsibilities
134///
135/// - **Agent Management**: Registration and deregistration of TAP Agents
136/// - **PlainMessage Processing**: Processing incoming and outgoing messages through middleware chains
137/// - **PlainMessage Routing**: Determining the appropriate recipient for each message
138/// - **Event Publishing**: Broadcasting node events to subscribers
139/// - **Scalability**: Managing concurrent message processing through worker pools
140///
141/// ## Lifecycle
142///
143/// 1. Create a node with appropriate configuration
144/// 2. Register one or more agents with the node
145/// 3. Start the processor pool (if high throughput is required)
146/// 4. Process incoming/outgoing messages
147/// 5. Publish and respond to events
148///
149/// ## Thread Safety
150///
151/// The `TapNode` is designed to be thread-safe and can be shared across multiple
152/// threads using an `Arc<TapNode>`. All internal mutability is handled through
153/// appropriate synchronization primitives.
154#[derive(Clone)]
155pub struct TapNode {
156 /// Agent registry
157 agents: Arc<AgentRegistry>,
158 /// Event bus
159 event_bus: Arc<EventBus>,
160 /// Incoming message processor
161 incoming_processor: CompositePlainMessageProcessor,
162 /// Outgoing message processor
163 outgoing_processor: CompositePlainMessageProcessor,
164 /// PlainMessage router
165 router: CompositePlainMessageRouter,
166 /// Resolver for DIDs
167 resolver: Arc<NodeResolver>,
168 /// Worker pool for handling messages
169 processor_pool: Option<ProcessorPool>,
170 /// Node configuration
171 config: NodeConfig,
172}
173
174impl TapNode {
175 /// Create a new TAP node with the given configuration
176 pub fn new(config: NodeConfig) -> Self {
177 // Create the agent registry
178 let agents = Arc::new(AgentRegistry::new(config.max_agents));
179
180 // Create the event bus
181 let event_bus = Arc::new(EventBus::new());
182
183 // Create the message router
184 let default_router = PlainMessageRouterType::Default(DefaultPlainMessageRouter::new());
185
186 let router = CompositePlainMessageRouter::new(vec![default_router]);
187
188 // Create the message processors
189 let logging_processor = PlainMessageProcessorType::Logging(LoggingPlainMessageProcessor);
190 let validation_processor =
191 PlainMessageProcessorType::Validation(ValidationPlainMessageProcessor);
192 let default_processor = PlainMessageProcessorType::Default(DefaultPlainMessageProcessor);
193
194 let incoming_processor = CompositePlainMessageProcessor::new(vec![
195 logging_processor.clone(),
196 validation_processor.clone(),
197 default_processor.clone(),
198 ]);
199
200 let outgoing_processor = CompositePlainMessageProcessor::new(vec![
201 logging_processor,
202 validation_processor,
203 default_processor,
204 ]);
205
206 // Create the resolver
207 let resolver = Arc::new(NodeResolver::default());
208
209 let node = Self {
210 agents,
211 event_bus,
212 incoming_processor,
213 outgoing_processor,
214 router,
215 resolver,
216 processor_pool: None,
217 config,
218 };
219
220 // Set up the event logger if configured
221 if let Some(logger_config) = &node.config.event_logger {
222 let event_logger = Arc::new(EventLogger::new(logger_config.clone()));
223
224 // We need to handle the async subscribe in a blocking context
225 // This is safe because EventBus methods are designed to be called in this way
226 let event_bus = node.event_bus.clone();
227 tokio::task::block_in_place(|| {
228 tokio::runtime::Handle::current().block_on(async {
229 event_bus.subscribe(event_logger).await;
230 })
231 });
232 }
233
234 node
235 }
236
237 /// Start the node
238 pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
239 let processor_pool = ProcessorPool::new(config);
240 self.processor_pool = Some(processor_pool);
241 Ok(())
242 }
243
244 /// Receive and process an incoming message
245 ///
246 /// This method handles the complete lifecycle of an incoming message:
247 ///
248 /// 1. Processing the message through all registered processors
249 /// 2. Routing the message to determine the appropriate target agent
250 /// 3. Dispatching the message to the target agent
251 ///
252 /// The processing pipeline may transform or even drop the message based on
253 /// validation rules or other processing logic. If a message is dropped during
254 /// processing, this method will return Ok(()) without an error.
255 ///
256 /// # Parameters
257 ///
258 /// * `message` - The DIDComm message to be processed
259 ///
260 /// # Returns
261 ///
262 /// * `Ok(())` if the message was successfully processed and dispatched (or intentionally dropped)
263 /// * `Err(Error)` if there was an error during processing, routing, or dispatching
264 ///
265 /// # Errors
266 ///
267 /// This method can return errors for several reasons:
268 /// * Processing errors from message processors
269 /// * Routing errors if no target agent can be determined
270 /// * Dispatch errors if the target agent cannot be found or fails to process the message
271 ///
272 /// # Example
273 ///
274 /// ```no_run
275 /// # use tap_node::{TapNode, NodeConfig};
276 /// # use tap_msg::didcomm::PlainMessage;
277 /// # async fn example(node: &TapNode, message: PlainMessage) -> Result<(), tap_node::Error> {
278 /// // Process an incoming message
279 /// node.receive_message(message).await?;
280 /// # Ok(())
281 /// # }
282 /// ```
283 pub async fn receive_message(&self, message: PlainMessage) -> Result<()> {
284 // Process the incoming message
285 let processed_message = match self.incoming_processor.process_incoming(message).await? {
286 Some(msg) => msg,
287 None => return Ok(()), // PlainMessage was dropped during processing
288 };
289
290 // Route the message to the appropriate agent
291 let target_did = match self.router.route_message(&processed_message).await {
292 Ok(did) => did,
293 Err(e) => {
294 // Log the error but don't fail the entire operation
295 log::warn!("Unable to route message: {}", e);
296 return Ok(());
297 }
298 };
299
300 // Dispatch the message to the agent, handling any errors
301 match self.dispatch_message(target_did, processed_message).await {
302 Ok(_) => Ok(()),
303 Err(e) => {
304 // Log the error but don't fail the entire operation
305 log::warn!("Failed to dispatch message: {}", e);
306 Ok(())
307 }
308 }
309 }
310
311 /// Dispatch a message to an agent by DID
312 pub async fn dispatch_message(&self, target_did: String, message: PlainMessage) -> Result<()> {
313 let agent = self.agents.get_agent(&target_did).await?;
314
315 // Convert the message to a packed format for transport
316 let packed = agent.send_serialized_message(&message, &target_did).await?;
317
318 // Publish an event for the dispatched message
319 self.event_bus
320 .publish_agent_message(target_did, packed.into_bytes())
321 .await;
322
323 Ok(())
324 }
325
326 /// Send a message to an agent
327 pub async fn send_message(
328 &self,
329 sender_did: String,
330 to_did: String,
331 message: PlainMessage,
332 ) -> Result<String> {
333 // Process the outgoing message
334 let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
335 Some(msg) => msg,
336 None => {
337 return Err(Error::MessageDropped(
338 "PlainMessage dropped during processing".to_string(),
339 ))
340 }
341 };
342
343 // Get the sender agent
344 let agent = self.agents.get_agent(&sender_did).await?;
345
346 // Pack the message
347 let packed = agent
348 .send_serialized_message(&processed_message, to_did.as_str())
349 .await?;
350
351 // Publish an event for the message
352 self.event_bus
353 .publish_agent_message(sender_did, packed.clone().into_bytes())
354 .await;
355
356 Ok(packed)
357 }
358
359 /// Register a new agent with the node
360 pub async fn register_agent(&self, agent: Arc<TapAgent>) -> Result<()> {
361 let agent_did = agent.get_agent_did().to_string();
362 self.agents.register_agent(agent_did.clone(), agent).await?;
363
364 // Publish event about agent registration
365 self.event_bus.publish_agent_registered(agent_did).await;
366
367 Ok(())
368 }
369
370 /// Unregister an agent from the node
371 pub async fn unregister_agent(&self, did: &str) -> Result<()> {
372 self.agents.unregister_agent(did).await?;
373
374 // Publish event about agent registration
375 self.event_bus
376 .publish_agent_unregistered(did.to_string())
377 .await;
378
379 Ok(())
380 }
381
382 /// Get a list of registered agent DIDs
383 pub fn list_agents(&self) -> Vec<String> {
384 self.agents.get_all_dids()
385 }
386
387 /// Get a reference to the agent registry
388 pub fn agents(&self) -> &Arc<AgentRegistry> {
389 &self.agents
390 }
391
392 /// Get a reference to the event bus
393 pub fn event_bus(&self) -> &Arc<EventBus> {
394 &self.event_bus
395 }
396
397 /// Get a reference to the resolver
398 pub fn resolver(&self) -> &Arc<NodeResolver> {
399 &self.resolver
400 }
401
402 /// Get a mutable reference to the processor pool
403 /// This is a reference to Option<ProcessorPool> to allow starting the pool after node creation
404 pub fn processor_pool_mut(&mut self) -> &mut Option<ProcessorPool> {
405 &mut self.processor_pool
406 }
407
408 /// Get the node configuration
409 pub fn config(&self) -> &NodeConfig {
410 &self.config
411 }
412}
413
414// Namespace imports
415// These imports make the implementation cleaner, but should be hidden from public API
416use message::processor::DefaultPlainMessageProcessor;
417use message::processor::LoggingPlainMessageProcessor;
418use message::processor::ValidationPlainMessageProcessor;
419use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
420use message::router::DefaultPlainMessageRouter;
421use message::RouterAsyncExt;