tap_node/event/mod.rs
1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **PlainMessageReceived**: When a message is received by an agent
12//! - **PlainMessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentPlainMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//! async fn handle_event(&self, event: NodeEvent) {
46//! match event {
47//! NodeEvent::PlainMessageReceived { message } => {
48//! println!("PlainMessage received: {:?}", message);
49//! },
50//! NodeEvent::AgentRegistered { did } => {
51//! println!("Agent registered: {}", did);
52//! },
53//! // Handle other event types...
54//! _ => {}
55//! }
56//! }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//! let handler = Arc::new(LoggingEventHandler);
62//! event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//! // Get a receiver for the events
74//! let mut receiver = event_bus.subscribe_channel();
75//!
76//! // Process events in a separate task
77//! spawn(async move {
78//! while let Ok(event) = receiver.recv().await {
79//! match event {
80//! NodeEvent::PlainMessageSent { message, from, to } => {
81//! println!("PlainMessage sent from {} to {}", from, to);
82//! },
83//! // Handle other events...
84//! _ => {}
85//! }
86//! }
87//! });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//! // Create a new TAP node
100//! let node = TapNode::new(NodeConfig::default());
101//!
102//! // Configure the event logger
103//! let logger_config = EventLoggerConfig {
104//! destination: LogDestination::File {
105//! path: "/var/log/tap-node/events.log".to_string(),
106//! max_size: Some(10 * 1024 * 1024), // 10 MB
107//! rotate: true,
108//! },
109//! structured: true, // Use JSON format
110//! log_level: log::Level::Info,
111//! };
112//!
113//! // Create and subscribe the event logger
114//! let event_logger = Arc::new(EventLogger::new(logger_config));
115//! node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125pub mod logger;
126
127use async_trait::async_trait;
128use serde_json::Value;
129use std::sync::Arc;
130use tap_msg::didcomm::PlainMessage;
131use tokio::sync::{broadcast, RwLock};
132
133/// Event types that can be emitted by the TAP Node
134///
135/// The `NodeEvent` enum represents all the possible events that can occur
136/// within a TAP Node. These events can be subscribed to using the `EventBus`
137/// to enable reactive programming patterns and loose coupling between components.
138///
139/// # Event Categories
140///
141/// Events are broadly categorized into:
142///
143/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
144/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
145/// - **Resolution Events**: Related to DID resolution (DidResolved)
146/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
147///
148/// # Usage
149///
150/// Events are typically consumed by matching on the event type and taking appropriate action:
151///
152/// ```
153/// use tap_node::event::NodeEvent;
154///
155/// fn process_event(event: NodeEvent) {
156/// match event {
157/// NodeEvent::PlainMessageReceived { message } => {
158/// println!("PlainMessage received: {:?}", message);
159/// },
160/// NodeEvent::AgentRegistered { did } => {
161/// println!("Agent registered: {}", did);
162/// },
163/// // Handle other event types...
164/// _ => {}
165/// }
166/// }
167/// ```
168#[derive(Debug, Clone)]
169pub enum NodeEvent {
170 /// A DIDComm message was received by the node
171 ///
172 /// This event is triggered after a message has been successfully processed by
173 /// the node's incoming message processors. It contains the deserialized message
174 /// content as a JSON Value.
175 ///
176 /// # Parameters
177 ///
178 /// - `message`: The received message as a JSON Value
179 ///
180 /// # Example Use Cases
181 ///
182 /// - Monitoring and logging received messages
183 /// - Triggering follow-up actions based on message content
184 /// - Auditing message flow through the system
185 PlainMessageReceived {
186 /// The received message as a JSON Value
187 message: Value,
188 },
189
190 /// A DIDComm message was sent from one agent to another
191 ///
192 /// This event is triggered after a message has been successfully processed by
193 /// the node's outgoing message processors and prepared for delivery.
194 ///
195 /// # Parameters
196 ///
197 /// - `message`: The sent message as a JSON Value
198 /// - `from`: The DID of the sending agent
199 /// - `to`: The DID of the receiving agent
200 ///
201 /// # Example Use Cases
202 ///
203 /// - Tracking message delivery
204 /// - Analyzing communication patterns
205 /// - Generating message delivery receipts
206 PlainMessageSent {
207 /// The sent message as a JSON Value
208 message: Value,
209 /// The DID of the sending agent
210 from: String,
211 /// The DID of the receiving agent
212 to: String,
213 },
214
215 /// A new agent was registered with the node
216 ///
217 /// This event is triggered when an agent is successfully registered with the
218 /// node's agent registry. It contains the DID of the registered agent.
219 ///
220 /// # Parameters
221 ///
222 /// - `did`: The DID of the registered agent
223 ///
224 /// # Example Use Cases
225 ///
226 /// - Tracking agent lifecycle
227 /// - Initializing resources for new agents
228 /// - Notifying other components of new agent availability
229 AgentRegistered {
230 /// The DID of the registered agent
231 did: String,
232 },
233
234 /// An agent was unregistered from the node
235 ///
236 /// This event is triggered when an agent is removed from the node's agent
237 /// registry. It contains the DID of the unregistered agent.
238 ///
239 /// # Parameters
240 ///
241 /// - `did`: The DID of the unregistered agent
242 ///
243 /// # Example Use Cases
244 ///
245 /// - Cleanup of resources associated with the agent
246 /// - Notifying other components of agent removal
247 /// - Updating routing tables
248 AgentUnregistered {
249 /// The DID of the unregistered agent
250 did: String,
251 },
252
253 /// A DID was resolved by the node's resolver
254 ///
255 /// This event is triggered when the node attempts to resolve a DID. It includes
256 /// both the DID being resolved and whether the resolution was successful.
257 ///
258 /// # Parameters
259 ///
260 /// - `did`: The DID that was resolved
261 /// - `success`: Whether the resolution was successful
262 ///
263 /// # Example Use Cases
264 ///
265 /// - Monitoring resolution failures
266 /// - Caching resolution results
267 /// - Diagnostics and debugging
268 DidResolved {
269 /// The DID that was resolved
270 did: String,
271 /// Whether the resolution was successful
272 success: bool,
273 },
274
275 /// A raw message event for an agent
276 ///
277 /// This event contains raw binary message data intended for a specific agent.
278 /// It is typically used for low-level message delivery mechanisms.
279 ///
280 /// # Parameters
281 ///
282 /// - `did`: The DID of the target agent
283 /// - `message`: The raw binary message data
284 ///
285 /// # Example Use Cases
286 ///
287 /// - Direct message delivery to agents
288 /// - Integration with transport-specific mechanisms
289 /// - Binary protocol support
290 AgentPlainMessage {
291 /// The DID of the target agent
292 did: String,
293 /// The raw binary message data
294 message: Vec<u8>,
295 },
296}
297
298/// Event subscriber trait for receiving node events
299///
300/// This trait defines the interface for components that want to receive
301/// node events via callbacks. Implementers must define the `handle_event`
302/// method to process events as they occur.
303///
304/// # Thread Safety
305///
306/// All implementations must be `Send + Sync` to ensure they can be safely
307/// used in multithreaded environments.
308///
309/// # Usage
310///
311/// ```
312/// use std::sync::Arc;
313/// use async_trait::async_trait;
314/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
315///
316/// #[derive(Debug)]
317/// struct MyEventHandler {
318/// name: String,
319/// }
320///
321/// #[async_trait]
322/// impl EventSubscriber for MyEventHandler {
323/// async fn handle_event(&self, event: NodeEvent) {
324/// println!("Handler '{}' received event: {:?}", self.name, event);
325/// }
326/// }
327///
328/// async fn example(event_bus: &EventBus) {
329/// let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
330/// event_bus.subscribe(handler).await;
331/// }
332/// ```
333#[async_trait]
334pub trait EventSubscriber: Send + Sync {
335 /// Handle a node event
336 ///
337 /// This method is called whenever an event is published to the event bus.
338 /// Implementations should process the event appropriately based on its type.
339 ///
340 /// # Parameters
341 ///
342 /// - `event`: The NodeEvent to handle
343 ///
344 /// # Note
345 ///
346 /// - This method should return quickly to avoid blocking the event bus
347 /// - For long-running operations, spawn a separate task
348 /// - Handle errors gracefully, as exceptions may disrupt the event system
349 async fn handle_event(&self, event: NodeEvent);
350}
351
352/// Event bus for publishing and subscribing to node events
353///
354/// The `EventBus` is the central coordination point for the event system. It allows
355/// components to publish events and provides two mechanisms for subscribing to events:
356///
357/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
358/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
359///
360/// # Thread Safety
361///
362/// The `EventBus` is designed to be thread-safe, with all mutable state protected
363/// by appropriate synchronization primitives. It can be safely shared across threads
364/// using `Arc<EventBus>`.
365///
366/// # Example Usage
367///
368/// ```rust,no_run
369/// use std::sync::Arc;
370/// use tap_node::event::{EventBus, NodeEvent};
371///
372/// async fn example() {
373/// // Create a new event bus
374/// let event_bus = Arc::new(EventBus::new());
375///
376/// // Subscribe to events using a channel
377/// let mut receiver = event_bus.subscribe_channel();
378///
379/// // Publish an event using public methods
380/// let did = "did:example:123".to_string();
381/// event_bus.publish_agent_registered(did).await;
382///
383/// // Process events from the channel
384/// tokio::spawn(async move {
385/// while let Ok(event) = receiver.recv().await {
386/// println!("Received event: {:?}", event);
387/// }
388/// });
389/// }
390/// ```
391pub struct EventBus {
392 /// Sender for events
393 sender: broadcast::Sender<NodeEvent>,
394 /// Subscribers
395 subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
396}
397
398impl Default for EventBus {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404impl Clone for EventBus {
405 fn clone(&self) -> Self {
406 Self {
407 sender: self.sender.clone(),
408 subscribers: RwLock::new(Vec::new()),
409 }
410 }
411}
412
413impl EventBus {
414 /// Create a new event bus
415 pub fn new() -> Self {
416 // Create a channel with capacity for 100 events
417 let (sender, _) = broadcast::channel(100);
418
419 Self {
420 sender,
421 subscribers: RwLock::new(Vec::new()),
422 }
423 }
424
425 /// Subscribe to node events
426 pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
427 let mut subscribers = self.subscribers.write().await;
428 subscribers.push(subscriber);
429 }
430
431 /// Get a receiver for node events
432 pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
433 self.sender.subscribe()
434 }
435
436 /// Remove a subscriber from the event bus
437 pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
438 let mut subscribers = self.subscribers.write().await;
439 subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
440 }
441
442 /// Publish a message received event
443 pub async fn publish_message_received(&self, message: PlainMessage) {
444 let event = NodeEvent::PlainMessageReceived {
445 message: serde_json::to_value(message).unwrap(),
446 };
447 self.publish_event(event).await;
448 }
449
450 /// Publish a message sent event
451 pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
452 let event = NodeEvent::PlainMessageSent {
453 message: serde_json::to_value(message).unwrap(),
454 from,
455 to,
456 };
457 self.publish_event(event).await;
458 }
459
460 /// Publish an agent registered event
461 pub async fn publish_agent_registered(&self, did: String) {
462 let event = NodeEvent::AgentRegistered { did };
463 self.publish_event(event).await;
464 }
465
466 /// Publish an agent unregistered event
467 pub async fn publish_agent_unregistered(&self, did: String) {
468 let event = NodeEvent::AgentUnregistered { did };
469 self.publish_event(event).await;
470 }
471
472 /// Publish an agent message event
473 pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
474 let event = NodeEvent::AgentPlainMessage { did, message };
475 self.publish_event(event).await;
476 }
477
478 /// Publish a DID resolved event
479 pub async fn publish_did_resolved(&self, did: String, success: bool) {
480 let event = NodeEvent::DidResolved { did, success };
481 self.publish_event(event).await;
482 }
483
484 /// Publish an event to all subscribers
485 async fn publish_event(&self, event: NodeEvent) {
486 // Send to channel
487 let _ = self.sender.send(event.clone());
488
489 // Notify subscribers
490 for subscriber in self.subscribers.read().await.iter() {
491 subscriber.handle_event(event.clone()).await;
492 }
493 }
494}