tap_node/event/
mod.rs

1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **MessageReceived**: When a message is received by an agent
12//! - **MessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Usage Examples
26//!
27//! ### Callback-based Subscription
28//!
29//! ```
30//! use std::sync::Arc;
31//! use async_trait::async_trait;
32//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
33//!
34//! // Create a custom event handler
35//! struct LoggingEventHandler;
36//!
37//! #[async_trait]
38//! impl EventSubscriber for LoggingEventHandler {
39//!     async fn handle_event(&self, event: NodeEvent) {
40//!         match event {
41//!             NodeEvent::MessageReceived { message } => {
42//!                 println!("Message received: {:?}", message);
43//!             },
44//!             NodeEvent::AgentRegistered { did } => {
45//!                 println!("Agent registered: {}", did);
46//!             },
47//!             // Handle other event types...
48//!             _ => {}
49//!         }
50//!     }
51//! }
52//!
53//! // Later, subscribe to events
54//! async fn subscribe_events(event_bus: &EventBus) {
55//!     let handler = Arc::new(LoggingEventHandler);
56//!     event_bus.subscribe(handler).await;
57//! }
58//! ```
59//!
60//! ### Channel-based Subscription
61//!
62//! ```
63//! use tap_node::event::{EventBus, NodeEvent};
64//! use tokio::spawn;
65//!
66//! async fn monitor_events(event_bus: &EventBus) {
67//!     // Get a receiver for the events
68//!     let mut receiver = event_bus.subscribe_channel();
69//!
70//!     // Process events in a separate task
71//!     spawn(async move {
72//!         while let Ok(event) = receiver.recv().await {
73//!             match event {
74//!                 NodeEvent::MessageSent { message, from, to } => {
75//!                     println!("Message sent from {} to {}", from, to);
76//!                 },
77//!                 // Handle other events...
78//!                 _ => {}
79//!             }
80//!         }
81//!     });
82//! }
83//! ```
84//!
85//! ## Thread Safety
86//!
87//! The event system is designed to be thread-safe, with all mutable state protected
88//! by appropriate synchronization primitives. The `EventBus` can be safely shared
89//! across threads using `Arc<EventBus>`.
90
91use async_trait::async_trait;
92use serde_json::Value;
93use std::sync::Arc;
94use tap_msg::didcomm::Message;
95use tokio::sync::{broadcast, RwLock};
96
97/// Event types that can be emitted by the TAP Node
98///
99/// The `NodeEvent` enum represents all the possible events that can occur
100/// within a TAP Node. These events can be subscribed to using the `EventBus`
101/// to enable reactive programming patterns and loose coupling between components.
102///
103/// # Event Categories
104///
105/// Events are broadly categorized into:
106///
107/// - **Message Events**: Related to message processing and delivery (MessageReceived, MessageSent)
108/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
109/// - **Resolution Events**: Related to DID resolution (DidResolved)
110/// - **Raw Message Events**: Raw binary messages for agents (AgentMessage)
111///
112/// # Usage
113///
114/// Events are typically consumed by matching on the event type and taking appropriate action:
115///
116/// ```
117/// use tap_node::event::NodeEvent;
118///
119/// fn process_event(event: NodeEvent) {
120///     match event {
121///         NodeEvent::MessageReceived { message } => {
122///             println!("Message received: {:?}", message);
123///         },
124///         NodeEvent::AgentRegistered { did } => {
125///             println!("Agent registered: {}", did);
126///         },
127///         // Handle other event types...
128///         _ => {}
129///     }
130/// }
131/// ```
132#[derive(Debug, Clone)]
133pub enum NodeEvent {
134    /// A DIDComm message was received by the node
135    ///
136    /// This event is triggered after a message has been successfully processed by
137    /// the node's incoming message processors. It contains the deserialized message
138    /// content as a JSON Value.
139    ///
140    /// # Parameters
141    ///
142    /// - `message`: The received message as a JSON Value
143    ///
144    /// # Example Use Cases
145    ///
146    /// - Monitoring and logging received messages
147    /// - Triggering follow-up actions based on message content
148    /// - Auditing message flow through the system
149    MessageReceived {
150        /// The received message as a JSON Value
151        message: Value,
152    },
153
154    /// A DIDComm message was sent from one agent to another
155    ///
156    /// This event is triggered after a message has been successfully processed by
157    /// the node's outgoing message processors and prepared for delivery.
158    ///
159    /// # Parameters
160    ///
161    /// - `message`: The sent message as a JSON Value
162    /// - `from`: The DID of the sending agent
163    /// - `to`: The DID of the receiving agent
164    ///
165    /// # Example Use Cases
166    ///
167    /// - Tracking message delivery
168    /// - Analyzing communication patterns
169    /// - Generating message delivery receipts
170    MessageSent {
171        /// The sent message as a JSON Value
172        message: Value,
173        /// The DID of the sending agent
174        from: String,
175        /// The DID of the receiving agent
176        to: String,
177    },
178
179    /// A new agent was registered with the node
180    ///
181    /// This event is triggered when an agent is successfully registered with the
182    /// node's agent registry. It contains the DID of the registered agent.
183    ///
184    /// # Parameters
185    ///
186    /// - `did`: The DID of the registered agent
187    ///
188    /// # Example Use Cases
189    ///
190    /// - Tracking agent lifecycle
191    /// - Initializing resources for new agents
192    /// - Notifying other components of new agent availability
193    AgentRegistered {
194        /// The DID of the registered agent
195        did: String,
196    },
197
198    /// An agent was unregistered from the node
199    ///
200    /// This event is triggered when an agent is removed from the node's agent
201    /// registry. It contains the DID of the unregistered agent.
202    ///
203    /// # Parameters
204    ///
205    /// - `did`: The DID of the unregistered agent
206    ///
207    /// # Example Use Cases
208    ///
209    /// - Cleanup of resources associated with the agent
210    /// - Notifying other components of agent removal
211    /// - Updating routing tables
212    AgentUnregistered {
213        /// The DID of the unregistered agent
214        did: String,
215    },
216
217    /// A DID was resolved by the node's resolver
218    ///
219    /// This event is triggered when the node attempts to resolve a DID. It includes
220    /// both the DID being resolved and whether the resolution was successful.
221    ///
222    /// # Parameters
223    ///
224    /// - `did`: The DID that was resolved
225    /// - `success`: Whether the resolution was successful
226    ///
227    /// # Example Use Cases
228    ///
229    /// - Monitoring resolution failures
230    /// - Caching resolution results
231    /// - Diagnostics and debugging
232    DidResolved {
233        /// The DID that was resolved
234        did: String,
235        /// Whether the resolution was successful
236        success: bool,
237    },
238
239    /// A raw message event for an agent
240    ///
241    /// This event contains raw binary message data intended for a specific agent.
242    /// It is typically used for low-level message delivery mechanisms.
243    ///
244    /// # Parameters
245    ///
246    /// - `did`: The DID of the target agent
247    /// - `message`: The raw binary message data
248    ///
249    /// # Example Use Cases
250    ///
251    /// - Direct message delivery to agents
252    /// - Integration with transport-specific mechanisms
253    /// - Binary protocol support
254    AgentMessage {
255        /// The DID of the target agent
256        did: String,
257        /// The raw binary message data
258        message: Vec<u8>,
259    },
260}
261
262/// Event subscriber trait for receiving node events
263///
264/// This trait defines the interface for components that want to receive
265/// node events via callbacks. Implementers must define the `handle_event`
266/// method to process events as they occur.
267///
268/// # Thread Safety
269///
270/// All implementations must be `Send + Sync` to ensure they can be safely
271/// used in multithreaded environments.
272///
273/// # Usage
274///
275/// ```
276/// use std::sync::Arc;
277/// use async_trait::async_trait;
278/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
279///
280/// #[derive(Debug)]
281/// struct MyEventHandler {
282///     name: String,
283/// }
284///
285/// #[async_trait]
286/// impl EventSubscriber for MyEventHandler {
287///     async fn handle_event(&self, event: NodeEvent) {
288///         println!("Handler '{}' received event: {:?}", self.name, event);
289///     }
290/// }
291///
292/// async fn example(event_bus: &EventBus) {
293///     let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
294///     event_bus.subscribe(handler).await;
295/// }
296/// ```
297#[async_trait]
298pub trait EventSubscriber: Send + Sync {
299    /// Handle a node event
300    ///
301    /// This method is called whenever an event is published to the event bus.
302    /// Implementations should process the event appropriately based on its type.
303    ///
304    /// # Parameters
305    ///
306    /// - `event`: The NodeEvent to handle
307    ///
308    /// # Note
309    ///
310    /// - This method should return quickly to avoid blocking the event bus
311    /// - For long-running operations, spawn a separate task
312    /// - Handle errors gracefully, as exceptions may disrupt the event system
313    async fn handle_event(&self, event: NodeEvent);
314}
315
316/// Event bus for publishing and subscribing to node events
317///
318/// The `EventBus` is the central coordination point for the event system. It allows
319/// components to publish events and provides two mechanisms for subscribing to events:
320///
321/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
322/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
323///
324/// # Thread Safety
325///
326/// The `EventBus` is designed to be thread-safe, with all mutable state protected
327/// by appropriate synchronization primitives. It can be safely shared across threads
328/// using `Arc<EventBus>`.
329///
330/// # Example Usage
331///
332/// ```rust,no_run
333/// use std::sync::Arc;
334/// use tap_node::event::{EventBus, NodeEvent};
335///
336/// async fn example() {
337///     // Create a new event bus
338///     let event_bus = Arc::new(EventBus::new());
339///
340///     // Subscribe to events using a channel
341///     let mut receiver = event_bus.subscribe_channel();
342///
343///     // Publish an event using public methods
344///     let did = "did:example:123".to_string();
345///     event_bus.publish_agent_registered(did).await;
346///
347///     // Process events from the channel
348///     tokio::spawn(async move {
349///         while let Ok(event) = receiver.recv().await {
350///             println!("Received event: {:?}", event);
351///         }
352///     });
353/// }
354/// ```
355pub struct EventBus {
356    /// Sender for events
357    sender: broadcast::Sender<NodeEvent>,
358    /// Subscribers
359    subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
360}
361
362impl Default for EventBus {
363    fn default() -> Self {
364        Self::new()
365    }
366}
367
368impl Clone for EventBus {
369    fn clone(&self) -> Self {
370        Self {
371            sender: self.sender.clone(),
372            subscribers: RwLock::new(Vec::new()),
373        }
374    }
375}
376
377impl EventBus {
378    /// Create a new event bus
379    pub fn new() -> Self {
380        // Create a channel with capacity for 100 events
381        let (sender, _) = broadcast::channel(100);
382
383        Self {
384            sender,
385            subscribers: RwLock::new(Vec::new()),
386        }
387    }
388
389    /// Subscribe to node events
390    pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
391        let mut subscribers = self.subscribers.write().await;
392        subscribers.push(subscriber);
393    }
394
395    /// Get a receiver for node events
396    pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
397        self.sender.subscribe()
398    }
399
400    /// Remove a subscriber from the event bus
401    pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
402        let mut subscribers = self.subscribers.write().await;
403        subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
404    }
405
406    /// Publish a message received event
407    pub async fn publish_message_received(&self, message: Message) {
408        let event = NodeEvent::MessageReceived {
409            message: serde_json::to_value(message).unwrap(),
410        };
411        self.publish_event(event).await;
412    }
413
414    /// Publish a message sent event
415    pub async fn publish_message_sent(&self, message: Message, from: String, to: String) {
416        let event = NodeEvent::MessageSent {
417            message: serde_json::to_value(message).unwrap(),
418            from,
419            to,
420        };
421        self.publish_event(event).await;
422    }
423
424    /// Publish an agent registered event
425    pub async fn publish_agent_registered(&self, did: String) {
426        let event = NodeEvent::AgentRegistered { did };
427        self.publish_event(event).await;
428    }
429
430    /// Publish an agent unregistered event
431    pub async fn publish_agent_unregistered(&self, did: String) {
432        let event = NodeEvent::AgentUnregistered { did };
433        self.publish_event(event).await;
434    }
435
436    /// Publish an agent message event
437    pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
438        let event = NodeEvent::AgentMessage { did, message };
439        self.publish_event(event).await;
440    }
441
442    /// Publish a DID resolved event
443    pub async fn publish_did_resolved(&self, did: String, success: bool) {
444        let event = NodeEvent::DidResolved { did, success };
445        self.publish_event(event).await;
446    }
447
448    /// Publish an event to all subscribers
449    async fn publish_event(&self, event: NodeEvent) {
450        // Send to channel
451        let _ = self.sender.send(event.clone());
452
453        // Notify subscribers
454        for subscriber in self.subscribers.read().await.iter() {
455            subscriber.handle_event(event.clone()).await;
456        }
457    }
458}