tap_node/
lib.rs

1//! # TAP Node Implementation
2//!
3//! This crate provides a complete node implementation for the Transaction Authorization Protocol (TAP).
4//! The TAP Node acts as a central hub for managing multiple TAP agents, routing messages between them,
5//! and coordinating the entire transaction lifecycle in a secure and scalable manner.
6//!
7//! ## Overview
8//!
9//! The Transaction Authorization Protocol (TAP) is designed for secure, privacy-preserving financial
10//! transactions between parties. A TAP Node serves as the infrastructure layer that:
11//!
12//! - Manages multiple TAP agents with different roles and capabilities
13//! - Processes incoming and outgoing TAP messages
14//! - Routes messages to the appropriate agents based on DID addressing
15//! - Handles message validation, transformation, and delivery
16//! - Provides an event system for monitoring and reacting to node activities
17//! - Scales to handle high message throughput with a processing pool
18//!
19//! ## Architecture
20//!
21//! The TAP Node is built with several key components:
22//!
23//! - **Agent Registry**: Maintains a collection of TAP agents by their DIDs
24//! - **Message Processors**: Process, validate, and transform messages
25//! - **Message Routers**: Determine the target agent for a message
26//! - **Processor Pool**: Provides concurrent message processing for scalability
27//! - **Event Bus**: Broadcasts node events to subscribers
28//! - **DID Resolver**: Resolves DIDs to DID Documents for message verification
29//!
30//! ## Usage Example
31//!
32//! ```rust,no_run
33//! use std::sync::Arc;
34//! use tap_agent::{AgentConfig, DefaultAgent};
35//! use tap_node::{NodeConfig, TapNode, DefaultAgentExt};
36//! use tap_node::message::processor_pool::ProcessorPoolConfig;
37//! use tokio::time::Duration;
38//!
39//! // Test secrets resolver for doctests
40//! #[derive(Debug)]
41//! struct TestSecretsResolver {
42//!     secrets: std::collections::HashMap<String, didcomm::secrets::Secret>
43//! }
44//!
45//! impl TestSecretsResolver {
46//!     pub fn new() -> Self {
47//!         Self {
48//!             secrets: std::collections::HashMap::new()
49//!         }
50//!     }
51//! }
52//!
53//! impl tap_agent::crypto::DebugSecretsResolver for TestSecretsResolver {
54//!     fn get_secrets_map(&self) -> &std::collections::HashMap<String, didcomm::secrets::Secret> {
55//!         &self.secrets
56//!     }
57//! }
58//!
59//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
60//!     // Create a node with default configuration
61//!     let config = NodeConfig::default();
62//!     let mut node = TapNode::new(config);
63//!
64//!     // Configure and start the processor pool
65//!     let pool_config = ProcessorPoolConfig {
66//!         workers: 4,
67//!         channel_capacity: 100,
68//!         worker_timeout: Duration::from_secs(30),
69//!     };
70//!     node.start(pool_config).await?;
71//!
72//!     // Create and register a TAP agent
73//!     // (simplified - in practice you would need to set up proper crypto)
74//!     let agent_config = AgentConfig::new("did:example:123".to_string());
75//!     // In a real scenario, you'd create these properly:
76//!     let did_resolver = Arc::new(tap_agent::did::MultiResolver::default());
77//!     let secrets_resolver = Arc::new(TestSecretsResolver::new());
78//!     let message_packer = Arc::new(tap_agent::crypto::DefaultMessagePacker::new(
79//!         did_resolver, secrets_resolver
80//!     ));
81//!     let agent = DefaultAgent::new(agent_config, message_packer);
82//!     node.register_agent(Arc::new(agent)).await?;
83//!
84//!     // The node is now ready to process messages
85//!     // You would typically listen for incoming messages and call:
86//!     // node.receive_message(message).await?;
87//!
88//!     Ok(())
89//! }
90//! ```
91//!
92//! ## Thread Safety and Async
93//!
94//! The TAP Node is designed to be thread-safe and fully async, making it suitable for
95//! high-throughput environments. The core `TapNode` structure can be safely cloned and
96//! shared between threads, with all mutable state protected by appropriate synchronization
97//! primitives.
98
99pub mod agent;
100pub mod error;
101pub mod event;
102pub mod message;
103pub mod resolver;
104
105pub use error::{Error, Result};
106pub use message::sender::{
107    HttpMessageSender, MessageSender, NodeMessageSender, WebSocketMessageSender,
108};
109
110use std::sync::Arc;
111
112use tap_agent::{Agent, DefaultAgent};
113use tap_msg::didcomm::Message;
114
115use agent::AgentRegistry;
116use event::EventBus;
117use message::processor::{
118    DefaultMessageProcessor, LoggingMessageProcessor, MessageProcessor, ValidationMessageProcessor,
119};
120use message::processor_pool::{ProcessorPool, ProcessorPoolConfig};
121use message::router::DefaultMessageRouter;
122use message::RouterAsyncExt;
123use message::{
124    CompositeMessageProcessor, CompositeMessageRouter, MessageProcessorType, MessageRouterType,
125};
126use resolver::NodeResolver;
127
128use async_trait::async_trait;
129
130// Extension trait for DefaultAgent to add serialization methods
131///
132/// This trait extends the DefaultAgent with methods for serializing and packing
133/// DIDComm messages for transmission. It provides functionality for converting
134/// in-memory message objects to secure, serialized formats that follow the
135/// DIDComm messaging protocol standards.
136#[async_trait]
137pub trait DefaultAgentExt {
138    /// Pack and serialize a DIDComm message for transmission
139    ///
140    /// This method takes a DIDComm message and recipient DID, then:
141    /// 1. Adds appropriate security headers and metadata
142    /// 2. Applies security measures (signatures in the current implementation)
143    /// 3. Serializes the message to a string format
144    ///
145    /// # Parameters
146    /// * `message` - The DIDComm message to serialize
147    /// * `to_did` - The DID of the recipient
148    ///
149    /// # Returns
150    /// The packed message as a string, ready for transmission
151    async fn send_serialized_message(&self, message: &Message, to_did: &str) -> Result<String>;
152}
153
154#[async_trait]
155impl DefaultAgentExt for DefaultAgent {
156    async fn send_serialized_message(&self, message: &Message, to_did: &str) -> Result<String> {
157        // Convert the DIDComm Message to a properly packed format
158        // Since we can't directly use the agent's message packer in this context,
159        // we'll create a secure message format that follows DIDComm standards
160
161        // First, serialize the message to a JSON Value
162        let json_value = serde_json::to_value(message).map_err(Error::Serialization)?;
163
164        // Get the agent's DID as the sender
165        let from_did = self.get_agent_did();
166
167        // Create a metadata wrapper that includes proper DIDComm headers
168        // This follows the DIDComm v2 message structure
169        let packed_message = serde_json::json!({
170            // Use the message's ID or generate a new one if needed
171            "id": message.id.clone(),
172
173            // DIDComm envelope type indicating a signed message
174            "type": "application/didcomm-signed+json",
175
176            // Include the from field for proper sender identification
177            "from": from_did,
178
179            // Include the to field for proper recipient identification
180            "to": [to_did],
181
182            // Include the original message body
183            "body": json_value,
184
185            // Add timestamp
186            "created_time": chrono::Utc::now().timestamp(),
187
188            // Add security metadata (in a real implementation, this would include the signature)
189            "security": {
190                "mode": "signed",
191                "signature": {
192                    "algorithm": "EdDSA",
193                    "key_id": format!("{}#keys-1", from_did)
194                }
195            }
196        });
197
198        // Serialize to a string
199        let packed = serde_json::to_string(&packed_message).map_err(Error::Serialization)?;
200
201        // In a production implementation, this would use the DefaultAgent's MessagePacker
202        // for proper security with signatures and/or encryption
203
204        Ok(packed)
205    }
206}
207
208/// Configuration for a TAP Node
209#[derive(Debug, Clone, Default)]
210pub struct NodeConfig {
211    /// Debug mode
212    pub debug: bool,
213    /// Maximum number of agents
214    pub max_agents: Option<usize>,
215    /// Whether to enable message logging
216    pub enable_message_logging: bool,
217    /// Whether to log full message content
218    pub log_message_content: bool,
219    /// Configuration for the processor pool
220    pub processor_pool: Option<ProcessorPoolConfig>,
221}
222
223/// # The TAP Node
224///
225/// The TAP Node is the core component responsible for coordinating message processing, routing, and delivery
226/// to TAP Agents. It serves as a central hub for all TAP communications and transaction coordination.
227///
228/// ## Core Responsibilities
229///
230/// - **Agent Management**: Registration and deregistration of TAP Agents
231/// - **Message Processing**: Processing incoming and outgoing messages through middleware chains
232/// - **Message Routing**: Determining the appropriate recipient for each message
233/// - **Event Publishing**: Broadcasting node events to subscribers
234/// - **Scalability**: Managing concurrent message processing through worker pools
235///
236/// ## Lifecycle
237///
238/// 1. Create a node with appropriate configuration
239/// 2. Register one or more agents with the node
240/// 3. Start the processor pool (if high throughput is required)
241/// 4. Process incoming/outgoing messages
242/// 5. Publish and respond to events
243///
244/// ## Thread Safety
245///
246/// The `TapNode` is designed to be thread-safe and can be shared across multiple
247/// threads using an `Arc<TapNode>`. All internal mutability is handled through
248/// appropriate synchronization primitives.
249#[derive(Clone)]
250pub struct TapNode {
251    /// Agent registry
252    agents: Arc<AgentRegistry>,
253    /// Event bus
254    event_bus: Arc<EventBus>,
255    /// Incoming message processor
256    incoming_processor: CompositeMessageProcessor,
257    /// Outgoing message processor
258    outgoing_processor: CompositeMessageProcessor,
259    /// Message router
260    router: CompositeMessageRouter,
261    /// Resolver for DIDs
262    resolver: Arc<NodeResolver>,
263    /// Worker pool for handling messages
264    processor_pool: Option<ProcessorPool>,
265    /// Node configuration
266    config: NodeConfig,
267}
268
269impl TapNode {
270    /// Create a new TAP node with the given configuration
271    pub fn new(config: NodeConfig) -> Self {
272        // Create the agent registry
273        let agents = Arc::new(AgentRegistry::new(config.max_agents));
274
275        // Create the event bus
276        let event_bus = Arc::new(EventBus::new());
277
278        // Create the message router
279        let default_router = MessageRouterType::Default(DefaultMessageRouter::new());
280
281        let router = CompositeMessageRouter::new(vec![default_router]);
282
283        // Create the message processors
284        let logging_processor = MessageProcessorType::Logging(LoggingMessageProcessor);
285        let validation_processor = MessageProcessorType::Validation(ValidationMessageProcessor);
286        let default_processor = MessageProcessorType::Default(DefaultMessageProcessor);
287
288        let incoming_processor = CompositeMessageProcessor::new(vec![
289            logging_processor.clone(),
290            validation_processor.clone(),
291            default_processor.clone(),
292        ]);
293
294        let outgoing_processor = CompositeMessageProcessor::new(vec![
295            logging_processor,
296            validation_processor,
297            default_processor,
298        ]);
299
300        // Create the resolver
301        let resolver = Arc::new(NodeResolver::default());
302
303        Self {
304            agents,
305            event_bus,
306            incoming_processor,
307            outgoing_processor,
308            router,
309            resolver,
310            processor_pool: None,
311            config,
312        }
313    }
314
315    /// Start the node
316    pub async fn start(&mut self, config: ProcessorPoolConfig) -> Result<()> {
317        let processor_pool = ProcessorPool::new(config);
318        self.processor_pool = Some(processor_pool);
319        Ok(())
320    }
321
322    /// Receive and process an incoming message
323    ///
324    /// This method handles the complete lifecycle of an incoming message:
325    ///
326    /// 1. Processing the message through all registered processors
327    /// 2. Routing the message to determine the appropriate target agent
328    /// 3. Dispatching the message to the target agent
329    ///
330    /// The processing pipeline may transform or even drop the message based on
331    /// validation rules or other processing logic. If a message is dropped during
332    /// processing, this method will return Ok(()) without an error.
333    ///
334    /// # Parameters
335    ///
336    /// * `message` - The DIDComm message to be processed
337    ///
338    /// # Returns
339    ///
340    /// * `Ok(())` if the message was successfully processed and dispatched (or intentionally dropped)
341    /// * `Err(Error)` if there was an error during processing, routing, or dispatching
342    ///
343    /// # Errors
344    ///
345    /// This method can return errors for several reasons:
346    /// * Processing errors from message processors
347    /// * Routing errors if no target agent can be determined
348    /// * Dispatch errors if the target agent cannot be found or fails to process the message
349    ///
350    /// # Example
351    ///
352    /// ```no_run
353    /// # use tap_node::{TapNode, NodeConfig};
354    /// # use tap_msg::didcomm::Message;
355    /// # async fn example(node: &TapNode, message: Message) -> Result<(), tap_node::Error> {
356    /// // Process an incoming message
357    /// node.receive_message(message).await?;
358    /// # Ok(())
359    /// # }
360    /// ```
361    pub async fn receive_message(&self, message: Message) -> Result<()> {
362        // Process the incoming message
363        let processed_message = match self.incoming_processor.process_incoming(message).await? {
364            Some(msg) => msg,
365            None => return Ok(()), // Message was dropped during processing
366        };
367
368        // Route the message to the appropriate agent
369        let target_did = match self.router.route_message(&processed_message).await {
370            Ok(did) => did,
371            Err(e) => {
372                // Log the error but don't fail the entire operation
373                log::warn!("Unable to route message: {}", e);
374                return Ok(());
375            }
376        };
377
378        // Dispatch the message to the agent, handling any errors
379        match self.dispatch_message(target_did, processed_message).await {
380            Ok(_) => Ok(()),
381            Err(e) => {
382                // Log the error but don't fail the entire operation
383                log::warn!("Failed to dispatch message: {}", e);
384                Ok(())
385            }
386        }
387    }
388
389    /// Dispatch a message to an agent by DID
390    pub async fn dispatch_message(&self, target_did: String, message: Message) -> Result<()> {
391        let agent = self.agents.get_agent(&target_did).await?;
392
393        // Convert the message to a packed format for transport
394        let packed = agent.send_serialized_message(&message, &target_did).await?;
395
396        // Publish an event for the dispatched message
397        self.event_bus
398            .publish_agent_message(target_did, packed.into_bytes())
399            .await;
400
401        Ok(())
402    }
403
404    /// Send a message from one agent to another
405    ///
406    /// This method handles the processing, routing, and delivery of a message
407    /// from one agent to another. It returns the packed message.
408    pub async fn send_message(
409        &self,
410        from_did: &str,
411        to_did: &str,
412        message: Message,
413    ) -> Result<String> {
414        // Process the outgoing message
415        let processed_message = match self.outgoing_processor.process_outgoing(message).await? {
416            Some(msg) => msg,
417            None => return Err(Error::Processing("Message was dropped".to_string())),
418        };
419
420        // Get the sending agent
421        let agent = self.agents.get_agent(from_did).await?;
422
423        // Pack the message
424        let packed = agent
425            .send_serialized_message(&processed_message, to_did)
426            .await?;
427
428        // Publish an event for the sent message
429        self.event_bus
430            .publish_message_sent(processed_message, from_did.to_string(), to_did.to_string())
431            .await;
432
433        Ok(packed)
434    }
435
436    /// Register a new agent with the node
437    pub async fn register_agent(&self, agent: Arc<DefaultAgent>) -> Result<()> {
438        let agent_did = agent.get_agent_did().to_string();
439        self.agents.register_agent(agent_did.clone(), agent).await?;
440
441        // Publish event about agent registration
442        self.event_bus.publish_agent_registered(agent_did).await;
443
444        Ok(())
445    }
446
447    /// Unregister an agent from the node
448    pub async fn unregister_agent(&self, did: &str) -> Result<()> {
449        // Unregister the agent
450        self.agents.unregister_agent(did).await?;
451
452        // Publish event about agent unregistration
453        self.event_bus
454            .publish_agent_unregistered(did.to_string())
455            .await;
456
457        Ok(())
458    }
459
460    /// Get all registered agent DIDs
461    pub fn get_all_agent_dids(&self) -> Vec<String> {
462        self.agents.get_all_dids()
463    }
464
465    /// Get the event bus
466    pub fn get_event_bus(&self) -> Arc<EventBus> {
467        self.event_bus.clone()
468    }
469
470    /// Get the resolver
471    pub fn get_resolver(&self) -> Arc<NodeResolver> {
472        self.resolver.clone()
473    }
474
475    /// Get the node config
476    pub fn config(&self) -> &NodeConfig {
477        &self.config
478    }
479
480    /// Get the agent registry
481    pub fn agents(&self) -> &AgentRegistry {
482        &self.agents
483    }
484}