tap_node/event/mod.rs
1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **MessageReceived**: When a message is received by an agent
12//! - **MessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Usage Examples
26//!
27//! ### Callback-based Subscription
28//!
29//! ```
30//! use std::sync::Arc;
31//! use async_trait::async_trait;
32//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
33//!
34//! // Create a custom event handler
35//! struct LoggingEventHandler;
36//!
37//! #[async_trait]
38//! impl EventSubscriber for LoggingEventHandler {
39//! async fn handle_event(&self, event: NodeEvent) {
40//! match event {
41//! NodeEvent::MessageReceived { message } => {
42//! println!("Message received: {:?}", message);
43//! },
44//! NodeEvent::AgentRegistered { did } => {
45//! println!("Agent registered: {}", did);
46//! },
47//! // Handle other event types...
48//! _ => {}
49//! }
50//! }
51//! }
52//!
53//! // Later, subscribe to events
54//! async fn subscribe_events(event_bus: &EventBus) {
55//! let handler = Arc::new(LoggingEventHandler);
56//! event_bus.subscribe(handler).await;
57//! }
58//! ```
59//!
60//! ### Channel-based Subscription
61//!
62//! ```
63//! use tap_node::event::{EventBus, NodeEvent};
64//! use tokio::spawn;
65//!
66//! async fn monitor_events(event_bus: &EventBus) {
67//! // Get a receiver for the events
68//! let mut receiver = event_bus.subscribe_channel();
69//!
70//! // Process events in a separate task
71//! spawn(async move {
72//! while let Ok(event) = receiver.recv().await {
73//! match event {
74//! NodeEvent::MessageSent { message, from, to } => {
75//! println!("Message sent from {} to {}", from, to);
76//! },
77//! // Handle other events...
78//! _ => {}
79//! }
80//! }
81//! });
82//! }
83//! ```
84//!
85//! ## Thread Safety
86//!
87//! The event system is designed to be thread-safe, with all mutable state protected
88//! by appropriate synchronization primitives. The `EventBus` can be safely shared
89//! across threads using `Arc<EventBus>`.
90
91use async_trait::async_trait;
92use serde_json::Value;
93use std::sync::Arc;
94use tap_msg::didcomm::Message;
95use tokio::sync::{broadcast, RwLock};
96
97/// Event types that can be emitted by the TAP Node
98///
99/// The `NodeEvent` enum represents all the possible events that can occur
100/// within a TAP Node. These events can be subscribed to using the `EventBus`
101/// to enable reactive programming patterns and loose coupling between components.
102///
103/// # Event Categories
104///
105/// Events are broadly categorized into:
106///
107/// - **Message Events**: Related to message processing and delivery (MessageReceived, MessageSent)
108/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
109/// - **Resolution Events**: Related to DID resolution (DidResolved)
110/// - **Raw Message Events**: Raw binary messages for agents (AgentMessage)
111///
112/// # Usage
113///
114/// Events are typically consumed by matching on the event type and taking appropriate action:
115///
116/// ```
117/// use tap_node::event::NodeEvent;
118///
119/// fn process_event(event: NodeEvent) {
120/// match event {
121/// NodeEvent::MessageReceived { message } => {
122/// println!("Message received: {:?}", message);
123/// },
124/// NodeEvent::AgentRegistered { did } => {
125/// println!("Agent registered: {}", did);
126/// },
127/// // Handle other event types...
128/// _ => {}
129/// }
130/// }
131/// ```
132#[derive(Debug, Clone)]
133pub enum NodeEvent {
134 /// A DIDComm message was received by the node
135 ///
136 /// This event is triggered after a message has been successfully processed by
137 /// the node's incoming message processors. It contains the deserialized message
138 /// content as a JSON Value.
139 ///
140 /// # Parameters
141 ///
142 /// - `message`: The received message as a JSON Value
143 ///
144 /// # Example Use Cases
145 ///
146 /// - Monitoring and logging received messages
147 /// - Triggering follow-up actions based on message content
148 /// - Auditing message flow through the system
149 MessageReceived {
150 /// The received message as a JSON Value
151 message: Value,
152 },
153
154 /// A DIDComm message was sent from one agent to another
155 ///
156 /// This event is triggered after a message has been successfully processed by
157 /// the node's outgoing message processors and prepared for delivery.
158 ///
159 /// # Parameters
160 ///
161 /// - `message`: The sent message as a JSON Value
162 /// - `from`: The DID of the sending agent
163 /// - `to`: The DID of the receiving agent
164 ///
165 /// # Example Use Cases
166 ///
167 /// - Tracking message delivery
168 /// - Analyzing communication patterns
169 /// - Generating message delivery receipts
170 MessageSent {
171 /// The sent message as a JSON Value
172 message: Value,
173 /// The DID of the sending agent
174 from: String,
175 /// The DID of the receiving agent
176 to: String,
177 },
178
179 /// A new agent was registered with the node
180 ///
181 /// This event is triggered when an agent is successfully registered with the
182 /// node's agent registry. It contains the DID of the registered agent.
183 ///
184 /// # Parameters
185 ///
186 /// - `did`: The DID of the registered agent
187 ///
188 /// # Example Use Cases
189 ///
190 /// - Tracking agent lifecycle
191 /// - Initializing resources for new agents
192 /// - Notifying other components of new agent availability
193 AgentRegistered {
194 /// The DID of the registered agent
195 did: String,
196 },
197
198 /// An agent was unregistered from the node
199 ///
200 /// This event is triggered when an agent is removed from the node's agent
201 /// registry. It contains the DID of the unregistered agent.
202 ///
203 /// # Parameters
204 ///
205 /// - `did`: The DID of the unregistered agent
206 ///
207 /// # Example Use Cases
208 ///
209 /// - Cleanup of resources associated with the agent
210 /// - Notifying other components of agent removal
211 /// - Updating routing tables
212 AgentUnregistered {
213 /// The DID of the unregistered agent
214 did: String,
215 },
216
217 /// A DID was resolved by the node's resolver
218 ///
219 /// This event is triggered when the node attempts to resolve a DID. It includes
220 /// both the DID being resolved and whether the resolution was successful.
221 ///
222 /// # Parameters
223 ///
224 /// - `did`: The DID that was resolved
225 /// - `success`: Whether the resolution was successful
226 ///
227 /// # Example Use Cases
228 ///
229 /// - Monitoring resolution failures
230 /// - Caching resolution results
231 /// - Diagnostics and debugging
232 DidResolved {
233 /// The DID that was resolved
234 did: String,
235 /// Whether the resolution was successful
236 success: bool,
237 },
238
239 /// A raw message event for an agent
240 ///
241 /// This event contains raw binary message data intended for a specific agent.
242 /// It is typically used for low-level message delivery mechanisms.
243 ///
244 /// # Parameters
245 ///
246 /// - `did`: The DID of the target agent
247 /// - `message`: The raw binary message data
248 ///
249 /// # Example Use Cases
250 ///
251 /// - Direct message delivery to agents
252 /// - Integration with transport-specific mechanisms
253 /// - Binary protocol support
254 AgentMessage {
255 /// The DID of the target agent
256 did: String,
257 /// The raw binary message data
258 message: Vec<u8>,
259 },
260}
261
262/// Event subscriber trait for receiving node events
263///
264/// This trait defines the interface for components that want to receive
265/// node events via callbacks. Implementers must define the `handle_event`
266/// method to process events as they occur.
267///
268/// # Thread Safety
269///
270/// All implementations must be `Send + Sync` to ensure they can be safely
271/// used in multithreaded environments.
272///
273/// # Usage
274///
275/// ```
276/// use std::sync::Arc;
277/// use async_trait::async_trait;
278/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
279///
280/// #[derive(Debug)]
281/// struct MyEventHandler {
282/// name: String,
283/// }
284///
285/// #[async_trait]
286/// impl EventSubscriber for MyEventHandler {
287/// async fn handle_event(&self, event: NodeEvent) {
288/// println!("Handler '{}' received event: {:?}", self.name, event);
289/// }
290/// }
291///
292/// async fn example(event_bus: &EventBus) {
293/// let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
294/// event_bus.subscribe(handler).await;
295/// }
296/// ```
297#[async_trait]
298pub trait EventSubscriber: Send + Sync {
299 /// Handle a node event
300 ///
301 /// This method is called whenever an event is published to the event bus.
302 /// Implementations should process the event appropriately based on its type.
303 ///
304 /// # Parameters
305 ///
306 /// - `event`: The NodeEvent to handle
307 ///
308 /// # Note
309 ///
310 /// - This method should return quickly to avoid blocking the event bus
311 /// - For long-running operations, spawn a separate task
312 /// - Handle errors gracefully, as exceptions may disrupt the event system
313 async fn handle_event(&self, event: NodeEvent);
314}
315
316/// Event bus for publishing and subscribing to node events
317///
318/// The `EventBus` is the central coordination point for the event system. It allows
319/// components to publish events and provides two mechanisms for subscribing to events:
320///
321/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
322/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
323///
324/// # Thread Safety
325///
326/// The `EventBus` is designed to be thread-safe, with all mutable state protected
327/// by appropriate synchronization primitives. It can be safely shared across threads
328/// using `Arc<EventBus>`.
329///
330/// # Example Usage
331///
332/// ```rust,no_run
333/// use std::sync::Arc;
334/// use tap_node::event::{EventBus, NodeEvent};
335///
336/// async fn example() {
337/// // Create a new event bus
338/// let event_bus = Arc::new(EventBus::new());
339///
340/// // Subscribe to events using a channel
341/// let mut receiver = event_bus.subscribe_channel();
342///
343/// // Publish an event using public methods
344/// let did = "did:example:123".to_string();
345/// event_bus.publish_agent_registered(did).await;
346///
347/// // Process events from the channel
348/// tokio::spawn(async move {
349/// while let Ok(event) = receiver.recv().await {
350/// println!("Received event: {:?}", event);
351/// }
352/// });
353/// }
354/// ```
355pub struct EventBus {
356 /// Sender for events
357 sender: broadcast::Sender<NodeEvent>,
358 /// Subscribers
359 subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
360}
361
362impl Default for EventBus {
363 fn default() -> Self {
364 Self::new()
365 }
366}
367
368impl Clone for EventBus {
369 fn clone(&self) -> Self {
370 Self {
371 sender: self.sender.clone(),
372 subscribers: RwLock::new(Vec::new()),
373 }
374 }
375}
376
377impl EventBus {
378 /// Create a new event bus
379 pub fn new() -> Self {
380 // Create a channel with capacity for 100 events
381 let (sender, _) = broadcast::channel(100);
382
383 Self {
384 sender,
385 subscribers: RwLock::new(Vec::new()),
386 }
387 }
388
389 /// Subscribe to node events
390 pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
391 let mut subscribers = self.subscribers.write().await;
392 subscribers.push(subscriber);
393 }
394
395 /// Get a receiver for node events
396 pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
397 self.sender.subscribe()
398 }
399
400 /// Remove a subscriber from the event bus
401 pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
402 let mut subscribers = self.subscribers.write().await;
403 subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
404 }
405
406 /// Publish a message received event
407 pub async fn publish_message_received(&self, message: Message) {
408 let event = NodeEvent::MessageReceived {
409 message: serde_json::to_value(message).unwrap(),
410 };
411 self.publish_event(event).await;
412 }
413
414 /// Publish a message sent event
415 pub async fn publish_message_sent(&self, message: Message, from: String, to: String) {
416 let event = NodeEvent::MessageSent {
417 message: serde_json::to_value(message).unwrap(),
418 from,
419 to,
420 };
421 self.publish_event(event).await;
422 }
423
424 /// Publish an agent registered event
425 pub async fn publish_agent_registered(&self, did: String) {
426 let event = NodeEvent::AgentRegistered { did };
427 self.publish_event(event).await;
428 }
429
430 /// Publish an agent unregistered event
431 pub async fn publish_agent_unregistered(&self, did: String) {
432 let event = NodeEvent::AgentUnregistered { did };
433 self.publish_event(event).await;
434 }
435
436 /// Publish an agent message event
437 pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
438 let event = NodeEvent::AgentMessage { did, message };
439 self.publish_event(event).await;
440 }
441
442 /// Publish a DID resolved event
443 pub async fn publish_did_resolved(&self, did: String, success: bool) {
444 let event = NodeEvent::DidResolved { did, success };
445 self.publish_event(event).await;
446 }
447
448 /// Publish an event to all subscribers
449 async fn publish_event(&self, event: NodeEvent) {
450 // Send to channel
451 let _ = self.sender.send(event.clone());
452
453 // Notify subscribers
454 for subscriber in self.subscribers.read().await.iter() {
455 subscriber.handle_event(event.clone()).await;
456 }
457 }
458}