tap_node/lib.rs
1//! # TAP Node Implementation
2//!
3//! This crate provides a complete node implementation for the Transaction Authorization Protocol (TAP).
4//! The TAP Node acts as a central hub for managing multiple TAP agents, routing messages between them,
5//! and coordinating the entire transaction lifecycle in a secure and scalable manner.
6//!
7//! ## Overview
8//!
9//! The Transaction Authorization Protocol (TAP) is designed for secure, privacy-preserving financial
10//! transactions between parties. A TAP Node serves as the infrastructure layer that:
11//!
12//! - Manages multiple TAP agents with different roles and capabilities
13//! - Processes incoming and outgoing TAP messages
14//! - Routes messages to the appropriate agents based on DID addressing
15//! - Handles message validation, transformation, and delivery
16//! - Provides an event system for monitoring and reacting to node activities
17//! - Scales to handle high message throughput with a processing pool
18//!
19//! ## Architecture
20//!
21//! The TAP Node is built with several key components:
22//!
23//! - **Agent Registry**: Maintains a collection of TAP agents by their DIDs
24//! - **Message Processors**: Process, validate, and transform messages
25//! - **Message Routers**: Determine the target agent for a message
26//! - **Processor Pool**: Provides concurrent message processing for scalability
27//! - **Event Bus**: Broadcasts node events to subscribers
28//! - **DID Resolver**: Resolves DIDs to DID Documents for message verification
29//!
30//! ## Usage Example
31//!
32//! ```rust,no_run
33//! use std::sync::Arc;
34//! use tap_agent::{AgentConfig, DefaultAgent};
35//! use tap_node::{NodeConfig, TapNode, DefaultAgentExt};
36//! use tap_node::message::processor_pool::ProcessorPoolConfig;
37//! use tokio::time::Duration;
38//!
39//! // Test secrets resolver for doctests
40//! #[derive(Debug)]
41//! struct TestSecretsResolver {
42//! secrets: std::collections::HashMap<String, didcomm::secrets::Secret>
43//! }
44//!
45//! impl TestSecretsResolver {
46//! pub fn new() -> Self {
47//! Self {
48//! secrets: std::collections::HashMap::new()
49//! }
50//! }
51//! }
52//!
53//! impl tap_agent::crypto::DebugSecretsResolver for TestSecretsResolver {
54//! fn get_secrets_map(&self) -> &std::collections::HashMap<String, didcomm::secrets::Secret> {
55//! &self.secrets
56//! }
57//! }
58//!
59//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
60//! // Create a node with default configuration
61//! let config = NodeConfig::default();
62//! let mut node = TapNode::new(config);
63//!
64//! // Configure and start the processor pool
65//! let pool_config = ProcessorPoolConfig {
66//! workers: 4,
67//! channel_capacity: 100,
68//! worker_timeout: Duration::from_secs(30),
69//! };
70//! node.start(pool_config).await?;
71//!
72//! // Create and register a TAP agent
73//! // (simplified - in practice you would need to set up proper crypto)
74//! let agent_config = AgentConfig::new("did:example:123".to_string());
75//! // In a real scenario, you'd create these properly:
76//! let did_resolver = Arc::new(tap_agent::did::MultiResolver::default());
77//! let secrets_resolver = Arc::new(TestSecretsResolver::new());
78//! let message_packer = Arc::new(tap_agent::crypto::DefaultMessagePacker::new(
79//! did_resolver, secrets_resolver
80//! ));
81//! let agent = DefaultAgent::new(agent_config, message_packer);
82//! node.register_agent(Arc::new(agent)).await?;
83//!
84//! // The node is now ready to process messages
85//! // You would typically listen for incoming messages and call:
86//! // node.receive_message(message).await?;
87//!
88//! Ok(())
89//! }
90//! ```
91//!
92//! ## Thread Safety and Async
93//!
94//! The TAP Node is designed to be thread-safe and fully async, making it suitable for
95//! high-throughput environments. The core `TapNode` structure can be safely cloned and
96//! shared between threads, with all mutable state protected by appropriate synchronization
97//! primitives.
98
99pub mod agent;
100pub mod error;
101pub mod event;
102pub mod message;
103pub mod resolver;
104
105pub use error::{Error, Result};
106pub use message::sender::{
107 HttpMessageSender, MessageSender, NodeMessageSender, WebSocketMessageSender,
108};
109
110use std::sync::Arc;
111
112use tap_agent::{Agent, DefaultAgent};
113use tap_msg::didcomm::Message;
114
115use agent::AgentRegistry;
116use event::EventBus;
117use message::processor::{
118 DefaultMessageProcessor, LoggingMessageProcessor, MessageProcessor, ValidationMessageProcessor,
119};
120use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
121use message::router::DefaultMessageRouter;
122use message::RouterAsyncExt;
123use message::{
124 CompositeMessageProcessor, CompositeMessageRouter, MessageProcessorType, MessageRouterType,
125};
126use resolver::NodeResolver;
127
128use async_trait::async_trait;
129
130// Extension trait for DefaultAgent to add serialization methods
131///
132/// This trait extends the DefaultAgent with methods for serializing and packing
133/// DIDComm messages for transmission. It provides functionality for converting
134/// in-memory message objects to secure, serialized formats that follow the
135/// DIDComm messaging protocol standards.
136#[async_trait]
137pub trait DefaultAgentExt {
138 /// Pack and serialize a DIDComm message for transmission
139 ///
140 /// This method takes a DIDComm message and recipient DID, then:
141 /// 1. Adds appropriate security headers and metadata
142 /// 2. Applies security measures (signatures in the current implementation)
143 /// 3. Serializes the message to a string format
144 ///
145 /// # Parameters
146 /// * `message` - The DIDComm message to serialize
147 /// * `to_did` - The DID of the recipient
148 ///
149 /// # Returns
150 /// The packed message as a string, ready for transmission
151 async fn send_serialized_message(&self, message: &Message, to_did: &str) -> Result<String>;
152}
153
154#[async_trait]
155impl DefaultAgentExt for DefaultAgent {
156 async fn send_serialized_message(&self, message: &Message, to_did: &str) -> Result<String> {
157 // Convert the DIDComm Message to a properly packed format
158 // Since we can't directly use the agent's message packer in this context,
159 // we'll create a secure message format that follows DIDComm standards
160
161 // First, serialize the message to a JSON Value
162 let json_value = serde_json::to_value(message).map_err(Error::Serialization)?;
163
164 // Get the agent's DID as the sender
165 let from_did = self.get_agent_did();
166
167 // Create a metadata wrapper that includes proper DIDComm headers
168 // This follows the DIDComm v2 message structure
169 let packed_message = serde_json::json!({
170 // Use the message's ID or generate a new one if needed
171 "id": message.id.clone(),
172
173 // DIDComm envelope type indicating a signed message
174 "type": "application/didcomm-signed+json",
175
176 // Include the from field for proper sender identification
177 "from": from_did,
178
179 // Include the to field for proper recipient identification
180 "to": [to_did],
181
182 // Include the original message body
183 "body": json_value,
184
185 // Add timestamp
186 "created_time": chrono::Utc::now().timestamp(),
187
188 // Add security metadata (in a real implementation, this would include the signature)
189 "security": {
190 "mode": "signed",
191 "signature": {
192 "algorithm": "EdDSA",
193 "key_id": format!("{}#keys-1", from_did)
194 }
195 }
196 });
197
198 // Serialize to a string
199 let packed = serde_json::to_string(&packed_message).map_err(Error::Serialization)?;
200
201 // In a production implementation, this would use the DefaultAgent's MessagePacker
202 // for proper security with signatures and/or encryption
203
204 Ok(packed)
205 }
206}
207
208/// Configuration for a TAP Node
209#[derive(Debug, Clone, Default)]
210pub struct NodeConfig {
211 /// Debug mode
212 pub debug: bool,
213 /// Maximum number of agents
214 pub max_agents: Option<usize>,
215 /// Whether to enable message logging
216 pub enable_message_logging: bool,
217 /// Whether to log full message content
218 pub log_message_content: bool,
219 /// Configuration for the processor pool
220 pub processor_pool: Option<ProcessorPoolConfig>,
221}
222
223/// # The TAP Node
224///
225/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
226/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
227///
228/// ## Core Responsibilities
229///
230/// - **Agent Management**: Registration and deregistration of TAP Agents
231/// - **Message Processing**: Processing incoming and outgoing messages through middleware chains
232/// - **Message Routing**: Determining the appropriate recipient for each message
233/// - **Event Publishing**: Broadcasting node events to subscribers
234/// - **Scalability**: Managing concurrent message processing through worker pools
235///
236/// ## Lifecycle
237///
238/// 1. Create a node with appropriate configuration
239/// 2. Register one or more agents with the node
240/// 3. Start the processor pool (if high throughput is required)
241/// 4. Process incoming/outgoing messages
242/// 5. Publish and respond to events
243///
244/// ## Thread Safety
245///
246/// The `TapNode` is designed to be thread-safe and can be shared across multiple
247/// threads using an `Arc<TapNode>`. All internal mutability is handled through
248/// appropriate synchronization primitives.
249#[derive(Clone)]
250pub struct TapNode {
251 /// Agent registry
252 agents: Arc<AgentRegistry>,
253 /// Event bus
254 event_bus: Arc<EventBus>,
255 /// Incoming message processor
256 incoming_processor: CompositeMessageProcessor,
257 /// Outgoing message processor
258 outgoing_processor: CompositeMessageProcessor,
259 /// Message router
260 router: CompositeMessageRouter,
261 /// Resolver for DIDs
262 resolver: Arc<NodeResolver>,
263 /// Worker pool for handling messages
264 processor_pool: Option<ProcessorPool>,
265 /// Node configuration
266 config: NodeConfig,
267}
268
269impl TapNode {
270 /// Create a new TAP node with the given configuration
271 pub fn new(config: NodeConfig) -> Self {
272 // Create the agent registry
273 let agents = Arc::new(AgentRegistry::new(config.max_agents));
274
275 // Create the event bus
276 let event_bus = Arc::new(EventBus::new());
277
278 // Create the message router
279 let default_router = MessageRouterType::Default(DefaultMessageRouter::new());
280
281 let router = CompositeMessageRouter::new(vec![default_router]);
282
283 // Create the message processors
284 let logging_processor = MessageProcessorType::Logging(LoggingMessageProcessor);
285 let validation_processor = MessageProcessorType::Validation(ValidationMessageProcessor);
286 let default_processor = MessageProcessorType::Default(DefaultMessageProcessor);
287
288 let incoming_processor = CompositeMessageProcessor::new(vec![
289 logging_processor.clone(),
290 validation_processor.clone(),
291 default_processor.clone(),
292 ]);
293
294 let outgoing_processor = CompositeMessageProcessor::new(vec![
295 logging_processor,
296 validation_processor,
297 default_processor,
298 ]);
299
300 // Create the resolver
301 let resolver = Arc::new(NodeResolver::default());
302
303 Self {
304 agents,
305 event_bus,
306 incoming_processor,
307 outgoing_processor,
308 router,
309 resolver,
310 processor_pool: None,
311 config,
312 }
313 }
314
315 /// Start the node
316 pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
317 let processor_pool = ProcessorPool::new(config);
318 self.processor_pool = Some(processor_pool);
319 Ok(())
320 }
321
322 /// Receive and process an incoming message
323 ///
324 /// This method handles the complete lifecycle of an incoming message:
325 ///
326 /// 1. Processing the message through all registered processors
327 /// 2. Routing the message to determine the appropriate target agent
328 /// 3. Dispatching the message to the target agent
329 ///
330 /// The processing pipeline may transform or even drop the message based on
331 /// validation rules or other processing logic. If a message is dropped during
332 /// processing, this method will return Ok(()) without an error.
333 ///
334 /// # Parameters
335 ///
336 /// * `message` - The DIDComm message to be processed
337 ///
338 /// # Returns
339 ///
340 /// * `Ok(())` if the message was successfully processed and dispatched (or intentionally dropped)
341 /// * `Err(Error)` if there was an error during processing, routing, or dispatching
342 ///
343 /// # Errors
344 ///
345 /// This method can return errors for several reasons:
346 /// * Processing errors from message processors
347 /// * Routing errors if no target agent can be determined
348 /// * Dispatch errors if the target agent cannot be found or fails to process the message
349 ///
350 /// # Example
351 ///
352 /// ```no_run
353 /// # use tap_node::{TapNode, NodeConfig};
354 /// # use tap_msg::didcomm::Message;
355 /// # async fn example(node: &TapNode, message: Message) -> Result<(), tap_node::Error> {
356 /// // Process an incoming message
357 /// node.receive_message(message).await?;
358 /// # Ok(())
359 /// # }
360 /// ```
361 pub async fn receive_message(&self, message: Message) -> Result<()> {
362 // Process the incoming message
363 let processed_message = match self.incoming_processor.process_incoming(message).await? {
364 Some(msg) => msg,
365 None => return Ok(()), // Message was dropped during processing
366 };
367
368 // Route the message to the appropriate agent
369 let target_did = match self.router.route_message(&processed_message).await {
370 Ok(did) => did,
371 Err(e) => {
372 // Log the error but don't fail the entire operation
373 log::warn!("Unable to route message: {}", e);
374 return Ok(());
375 }
376 };
377
378 // Dispatch the message to the agent, handling any errors
379 match self.dispatch_message(target_did, processed_message).await {
380 Ok(_) => Ok(()),
381 Err(e) => {
382 // Log the error but don't fail the entire operation
383 log::warn!("Failed to dispatch message: {}", e);
384 Ok(())
385 }
386 }
387 }
388
389 /// Dispatch a message to an agent by DID
390 pub async fn dispatch_message(&self, target_did: String, message: Message) -> Result<()> {
391 let agent = self.agents.get_agent(&target_did).await?;
392
393 // Convert the message to a packed format for transport
394 let packed = agent.send_serialized_message(&message, &target_did).await?;
395
396 // Publish an event for the dispatched message
397 self.event_bus
398 .publish_agent_message(target_did, packed.into_bytes())
399 .await;
400
401 Ok(())
402 }
403
404 /// Send a message from one agent to another
405 ///
406 /// This method handles the processing, routing, and delivery of a message
407 /// from one agent to another. It returns the packed message.
408 pub async fn send_message(
409 &self,
410 from_did: &str,
411 to_did: &str,
412 message: Message,
413 ) -> Result<String> {
414 // Process the outgoing message
415 let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
416 Some(msg) => msg,
417 None => return Err(Error::Processing("Message was dropped".to_string())),
418 };
419
420 // Get the sending agent
421 let agent = self.agents.get_agent(from_did).await?;
422
423 // Pack the message
424 let packed = agent
425 .send_serialized_message(&processed_message, to_did)
426 .await?;
427
428 // Publish an event for the sent message
429 self.event_bus
430 .publish_message_sent(processed_message, from_did.to_string(), to_did.to_string())
431 .await;
432
433 Ok(packed)
434 }
435
436 /// Register a new agent with the node
437 pub async fn register_agent(&self, agent: Arc<DefaultAgent>) -> Result<()> {
438 let agent_did = agent.get_agent_did().to_string();
439 self.agents.register_agent(agent_did.clone(), agent).await?;
440
441 // Publish event about agent registration
442 self.event_bus.publish_agent_registered(agent_did).await;
443
444 Ok(())
445 }
446
447 /// Unregister an agent from the node
448 pub async fn unregister_agent(&self, did: &str) -> Result<()> {
449 // Unregister the agent
450 self.agents.unregister_agent(did).await?;
451
452 // Publish event about agent unregistration
453 self.event_bus
454 .publish_agent_unregistered(did.to_string())
455 .await;
456
457 Ok(())
458 }
459
460 /// Get all registered agent DIDs
461 pub fn get_all_agent_dids(&self) -> Vec<String> {
462 self.agents.get_all_dids()
463 }
464
465 /// Get the event bus
466 pub fn get_event_bus(&self) -> Arc<EventBus> {
467 self.event_bus.clone()
468 }
469
470 /// Get the resolver
471 pub fn get_resolver(&self) -> Arc<NodeResolver> {
472 self.resolver.clone()
473 }
474
475 /// Get the node config
476 pub fn config(&self) -> &NodeConfig {
477 &self.config
478 }
479
480 /// Get the agent registry
481 pub fn agents(&self) -> &AgentRegistry {
482 &self.agents
483 }
484}