tap_node/event/
mod.rs

1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **PlainMessageReceived**: When a message is received by an agent
12//! - **PlainMessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentPlainMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//!     async fn handle_event(&self, event: NodeEvent) {
46//!         match event {
47//!             NodeEvent::PlainMessageReceived { message } => {
48//!                 println!("PlainMessage received: {:?}", message);
49//!             },
50//!             NodeEvent::AgentRegistered { did } => {
51//!                 println!("Agent registered: {}", did);
52//!             },
53//!             // Handle other event types...
54//!             _ => {}
55//!         }
56//!     }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//!     let handler = Arc::new(LoggingEventHandler);
62//!     event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//!     // Get a receiver for the events
74//!     let mut receiver = event_bus.subscribe_channel();
75//!
76//!     // Process events in a separate task
77//!     spawn(async move {
78//!         while let Ok(event) = receiver.recv().await {
79//!             match event {
80//!                 NodeEvent::PlainMessageSent { message, from, to } => {
81//!                     println!("PlainMessage sent from {} to {}", from, to);
82//!                 },
83//!                 // Handle other events...
84//!                 _ => {}
85//!             }
86//!         }
87//!     });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//!     // Create a new TAP node
100//!     let node = TapNode::new(NodeConfig::default());
101//!     
102//!     // Configure the event logger
103//!     let logger_config = EventLoggerConfig {
104//!         destination: LogDestination::File {
105//!             path: "/var/log/tap-node/events.log".to_string(),
106//!             max_size: Some(10 * 1024 * 1024), // 10 MB
107//!             rotate: true,
108//!         },
109//!         structured: true, // Use JSON format
110//!         log_level: log::Level::Info,
111//!     };
112//!     
113//!     // Create and subscribe the event logger
114//!     let event_logger = Arc::new(EventLogger::new(logger_config));
115//!     node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125pub mod logger;
126
127use async_trait::async_trait;
128use serde_json::Value;
129use std::sync::Arc;
130use tap_msg::didcomm::PlainMessage;
131use tokio::sync::{broadcast, RwLock};
132
133/// Event types that can be emitted by the TAP Node
134///
135/// The `NodeEvent` enum represents all the possible events that can occur
136/// within a TAP Node. These events can be subscribed to using the `EventBus`
137/// to enable reactive programming patterns and loose coupling between components.
138///
139/// # Event Categories
140///
141/// Events are broadly categorized into:
142///
143/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
144/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
145/// - **Resolution Events**: Related to DID resolution (DidResolved)
146/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
147///
148/// # Usage
149///
150/// Events are typically consumed by matching on the event type and taking appropriate action:
151///
152/// ```
153/// use tap_node::event::NodeEvent;
154///
155/// fn process_event(event: NodeEvent) {
156///     match event {
157///         NodeEvent::PlainMessageReceived { message } => {
158///             println!("PlainMessage received: {:?}", message);
159///         },
160///         NodeEvent::AgentRegistered { did } => {
161///             println!("Agent registered: {}", did);
162///         },
163///         // Handle other event types...
164///         _ => {}
165///     }
166/// }
167/// ```
168#[derive(Debug, Clone)]
169pub enum NodeEvent {
170    /// A DIDComm message was received by the node
171    ///
172    /// This event is triggered after a message has been successfully processed by
173    /// the node's incoming message processors. It contains the deserialized message
174    /// content as a JSON Value.
175    ///
176    /// # Parameters
177    ///
178    /// - `message`: The received message as a JSON Value
179    ///
180    /// # Example Use Cases
181    ///
182    /// - Monitoring and logging received messages
183    /// - Triggering follow-up actions based on message content
184    /// - Auditing message flow through the system
185    PlainMessageReceived {
186        /// The received message as a JSON Value
187        message: Value,
188    },
189
190    /// A DIDComm message was sent from one agent to another
191    ///
192    /// This event is triggered after a message has been successfully processed by
193    /// the node's outgoing message processors and prepared for delivery.
194    ///
195    /// # Parameters
196    ///
197    /// - `message`: The sent message as a JSON Value
198    /// - `from`: The DID of the sending agent
199    /// - `to`: The DID of the receiving agent
200    ///
201    /// # Example Use Cases
202    ///
203    /// - Tracking message delivery
204    /// - Analyzing communication patterns
205    /// - Generating message delivery receipts
206    PlainMessageSent {
207        /// The sent message as a JSON Value
208        message: Value,
209        /// The DID of the sending agent
210        from: String,
211        /// The DID of the receiving agent
212        to: String,
213    },
214
215    /// A new agent was registered with the node
216    ///
217    /// This event is triggered when an agent is successfully registered with the
218    /// node's agent registry. It contains the DID of the registered agent.
219    ///
220    /// # Parameters
221    ///
222    /// - `did`: The DID of the registered agent
223    ///
224    /// # Example Use Cases
225    ///
226    /// - Tracking agent lifecycle
227    /// - Initializing resources for new agents
228    /// - Notifying other components of new agent availability
229    AgentRegistered {
230        /// The DID of the registered agent
231        did: String,
232    },
233
234    /// An agent was unregistered from the node
235    ///
236    /// This event is triggered when an agent is removed from the node's agent
237    /// registry. It contains the DID of the unregistered agent.
238    ///
239    /// # Parameters
240    ///
241    /// - `did`: The DID of the unregistered agent
242    ///
243    /// # Example Use Cases
244    ///
245    /// - Cleanup of resources associated with the agent
246    /// - Notifying other components of agent removal
247    /// - Updating routing tables
248    AgentUnregistered {
249        /// The DID of the unregistered agent
250        did: String,
251    },
252
253    /// A DID was resolved by the node's resolver
254    ///
255    /// This event is triggered when the node attempts to resolve a DID. It includes
256    /// both the DID being resolved and whether the resolution was successful.
257    ///
258    /// # Parameters
259    ///
260    /// - `did`: The DID that was resolved
261    /// - `success`: Whether the resolution was successful
262    ///
263    /// # Example Use Cases
264    ///
265    /// - Monitoring resolution failures
266    /// - Caching resolution results
267    /// - Diagnostics and debugging
268    DidResolved {
269        /// The DID that was resolved
270        did: String,
271        /// Whether the resolution was successful
272        success: bool,
273    },
274
275    /// A raw message event for an agent
276    ///
277    /// This event contains raw binary message data intended for a specific agent.
278    /// It is typically used for low-level message delivery mechanisms.
279    ///
280    /// # Parameters
281    ///
282    /// - `did`: The DID of the target agent
283    /// - `message`: The raw binary message data
284    ///
285    /// # Example Use Cases
286    ///
287    /// - Direct message delivery to agents
288    /// - Integration with transport-specific mechanisms
289    /// - Binary protocol support
290    AgentPlainMessage {
291        /// The DID of the target agent
292        did: String,
293        /// The raw binary message data
294        message: Vec<u8>,
295    },
296}
297
298/// Event subscriber trait for receiving node events
299///
300/// This trait defines the interface for components that want to receive
301/// node events via callbacks. Implementers must define the `handle_event`
302/// method to process events as they occur.
303///
304/// # Thread Safety
305///
306/// All implementations must be `Send + Sync` to ensure they can be safely
307/// used in multithreaded environments.
308///
309/// # Usage
310///
311/// ```
312/// use std::sync::Arc;
313/// use async_trait::async_trait;
314/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
315///
316/// #[derive(Debug)]
317/// struct MyEventHandler {
318///     name: String,
319/// }
320///
321/// #[async_trait]
322/// impl EventSubscriber for MyEventHandler {
323///     async fn handle_event(&self, event: NodeEvent) {
324///         println!("Handler '{}' received event: {:?}", self.name, event);
325///     }
326/// }
327///
328/// async fn example(event_bus: &EventBus) {
329///     let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
330///     event_bus.subscribe(handler).await;
331/// }
332/// ```
333#[async_trait]
334pub trait EventSubscriber: Send + Sync {
335    /// Handle a node event
336    ///
337    /// This method is called whenever an event is published to the event bus.
338    /// Implementations should process the event appropriately based on its type.
339    ///
340    /// # Parameters
341    ///
342    /// - `event`: The NodeEvent to handle
343    ///
344    /// # Note
345    ///
346    /// - This method should return quickly to avoid blocking the event bus
347    /// - For long-running operations, spawn a separate task
348    /// - Handle errors gracefully, as exceptions may disrupt the event system
349    async fn handle_event(&self, event: NodeEvent);
350}
351
352/// Event bus for publishing and subscribing to node events
353///
354/// The `EventBus` is the central coordination point for the event system. It allows
355/// components to publish events and provides two mechanisms for subscribing to events:
356///
357/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
358/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
359///
360/// # Thread Safety
361///
362/// The `EventBus` is designed to be thread-safe, with all mutable state protected
363/// by appropriate synchronization primitives. It can be safely shared across threads
364/// using `Arc<EventBus>`.
365///
366/// # Example Usage
367///
368/// ```rust,no_run
369/// use std::sync::Arc;
370/// use tap_node::event::{EventBus, NodeEvent};
371///
372/// async fn example() {
373///     // Create a new event bus
374///     let event_bus = Arc::new(EventBus::new());
375///
376///     // Subscribe to events using a channel
377///     let mut receiver = event_bus.subscribe_channel();
378///
379///     // Publish an event using public methods
380///     let did = "did:example:123".to_string();
381///     event_bus.publish_agent_registered(did).await;
382///
383///     // Process events from the channel
384///     tokio::spawn(async move {
385///         while let Ok(event) = receiver.recv().await {
386///             println!("Received event: {:?}", event);
387///         }
388///     });
389/// }
390/// ```
391pub struct EventBus {
392    /// Sender for events
393    sender: broadcast::Sender<NodeEvent>,
394    /// Subscribers
395    subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
396}
397
398impl Default for EventBus {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404impl Clone for EventBus {
405    fn clone(&self) -> Self {
406        Self {
407            sender: self.sender.clone(),
408            subscribers: RwLock::new(Vec::new()),
409        }
410    }
411}
412
413impl EventBus {
414    /// Create a new event bus
415    pub fn new() -> Self {
416        // Create a channel with capacity for 100 events
417        let (sender, _) = broadcast::channel(100);
418
419        Self {
420            sender,
421            subscribers: RwLock::new(Vec::new()),
422        }
423    }
424
425    /// Subscribe to node events
426    pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
427        let mut subscribers = self.subscribers.write().await;
428        subscribers.push(subscriber);
429    }
430
431    /// Get a receiver for node events
432    pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
433        self.sender.subscribe()
434    }
435
436    /// Remove a subscriber from the event bus
437    pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
438        let mut subscribers = self.subscribers.write().await;
439        subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
440    }
441
442    /// Publish a message received event
443    pub async fn publish_message_received(&self, message: PlainMessage) {
444        let event = NodeEvent::PlainMessageReceived {
445            message: serde_json::to_value(message).unwrap(),
446        };
447        self.publish_event(event).await;
448    }
449
450    /// Publish a message sent event
451    pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
452        let event = NodeEvent::PlainMessageSent {
453            message: serde_json::to_value(message).unwrap(),
454            from,
455            to,
456        };
457        self.publish_event(event).await;
458    }
459
460    /// Publish an agent registered event
461    pub async fn publish_agent_registered(&self, did: String) {
462        let event = NodeEvent::AgentRegistered { did };
463        self.publish_event(event).await;
464    }
465
466    /// Publish an agent unregistered event
467    pub async fn publish_agent_unregistered(&self, did: String) {
468        let event = NodeEvent::AgentUnregistered { did };
469        self.publish_event(event).await;
470    }
471
472    /// Publish an agent message event
473    pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
474        let event = NodeEvent::AgentPlainMessage { did, message };
475        self.publish_event(event).await;
476    }
477
478    /// Publish a DID resolved event
479    pub async fn publish_did_resolved(&self, did: String, success: bool) {
480        let event = NodeEvent::DidResolved { did, success };
481        self.publish_event(event).await;
482    }
483
484    /// Publish an event to all subscribers
485    async fn publish_event(&self, event: NodeEvent) {
486        // Send to channel
487        let _ = self.sender.send(event.clone());
488
489        // Notify subscribers
490        for subscriber in self.subscribers.read().await.iter() {
491            subscriber.handle_event(event.clone()).await;
492        }
493    }
494}