tap_node/event/mod.rs
1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
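//! - **PlainMessageReceived**: When a message is received by an agent
//! - **PlainMessageSent**: When a message is sent from an agent to another
//! - **AgentRegistered**: When a new agent is registered with the node
//! - **AgentUnregistered**: When an agent is removed from the node
//! - **DidResolved**: When a DID is resolved (successfully or not)
//! - **AgentPlainMessage**: Raw message data intended for an agent
//! - **MessageRejected**: When a message fails validation and is rejected
//! - **MessageAccepted**: When a message passes validation and is accepted for processing
//! - **ReplyReceived**: When a reply to a previously sent message is received
//! - **TransactionStateChanged**: When a transaction moves from one state to another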
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//! async fn handle_event(&self, event: NodeEvent) {
46//! match event {
47//! NodeEvent::PlainMessageReceived { message } => {
48//! println!("PlainMessage received: {:?}", message);
49//! },
50//! NodeEvent::AgentRegistered { did } => {
51//! println!("Agent registered: {}", did);
52//! },
53//! // Handle other event types...
54//! _ => {}
55//! }
56//! }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//! let handler = Arc::new(LoggingEventHandler);
62//! event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//! // Get a receiver for the events
74//! let mut receiver = event_bus.subscribe_channel();
75//!
76//! // Process events in a separate task
77//! spawn(async move {
78//! while let Ok(event) = receiver.recv().await {
79//! match event {
80//! NodeEvent::PlainMessageSent { message, from, to } => {
81//! println!("PlainMessage sent from {} to {}", from, to);
82//! },
83//! // Handle other events...
84//! _ => {}
85//! }
86//! }
87//! });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//! // Create a new TAP node
100//! let node = TapNode::new(NodeConfig::default());
101//!
102//! // Configure the event logger
103//! let logger_config = EventLoggerConfig {
104//! destination: LogDestination::File {
105//! path: "/var/log/tap-node/events.log".to_string(),
106//! max_size: Some(10 * 1024 * 1024), // 10 MB
107//! rotate: true,
108//! },
109//! structured: true, // Use JSON format
110//! log_level: log::Level::Info,
111//! };
112//!
113//! // Create and subscribe the event logger
114//! let event_logger = Arc::new(EventLogger::new(logger_config));
115//! node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125pub mod handlers;
126pub mod logger;
127
128use async_trait::async_trait;
129use serde_json::Value;
130use std::sync::Arc;
131use tap_msg::didcomm::PlainMessage;
132use tokio::sync::{broadcast, RwLock};
133
/// Event types that can be emitted by the TAP Node
///
/// The `NodeEvent` enum represents all the possible events that can occur
/// within a TAP Node. These events can be subscribed to using the `EventBus`
/// to enable reactive programming patterns and loose coupling between components.
///
/// Every variant owns its data and the enum is `Clone`, because the `EventBus`
/// hands a separate copy of each event to the broadcast channel and to every
/// callback subscriber.
///
/// # Event Categories
///
/// Events are broadly categorized into:
///
/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
/// - **Resolution Events**: Related to DID resolution (DidResolved)
/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
/// - **Validation Events**: Outcome of message validation (MessageRejected, MessageAccepted)
/// - **Reply Events**: Replies correlated with an earlier message (ReplyReceived)
/// - **Transaction Events**: Transaction lifecycle changes (TransactionStateChanged)
///
/// # Usage
///
/// Events are typically consumed by matching on the event type and taking appropriate action:
///
/// ```
/// use tap_node::event::NodeEvent;
///
/// fn process_event(event: NodeEvent) {
///     match event {
///         NodeEvent::PlainMessageReceived { message } => {
///             println!("PlainMessage received: {:?}", message);
///         },
///         NodeEvent::AgentRegistered { did } => {
///             println!("Agent registered: {}", did);
///         },
///         // Handle other event types...
///         _ => {}
///     }
/// }
/// ```
#[derive(Debug, Clone)]
pub enum NodeEvent {
    /// A DIDComm message was received by the node
    ///
    /// This event is triggered after a message has been successfully processed by
    /// the node's incoming message processors. It contains the message content as a
    /// JSON Value; `EventBus::publish_message_received` produces that value by
    /// serializing a `PlainMessage`.
    ///
    /// # Parameters
    ///
    /// - `message`: The received message as a JSON Value
    ///
    /// # Example Use Cases
    ///
    /// - Monitoring and logging received messages
    /// - Triggering follow-up actions based on message content
    /// - Auditing message flow through the system
    PlainMessageReceived {
        /// The received message as a JSON Value
        message: Value,
    },

    /// A DIDComm message was sent from one agent to another
    ///
    /// This event is triggered after a message has been successfully processed by
    /// the node's outgoing message processors and prepared for delivery.
    /// `EventBus::publish_message_sent` produces the JSON value by serializing a
    /// `PlainMessage`.
    ///
    /// # Parameters
    ///
    /// - `message`: The sent message as a JSON Value
    /// - `from`: The DID of the sending agent
    /// - `to`: The DID of the receiving agent
    ///
    /// # Example Use Cases
    ///
    /// - Tracking message delivery
    /// - Analyzing communication patterns
    /// - Generating message delivery receipts
    PlainMessageSent {
        /// The sent message as a JSON Value
        message: Value,
        /// The DID of the sending agent
        from: String,
        /// The DID of the receiving agent
        to: String,
    },

    /// A new agent was registered with the node
    ///
    /// This event is triggered when an agent is successfully registered with the
    /// node's agent registry. It contains the DID of the registered agent.
    ///
    /// # Parameters
    ///
    /// - `did`: The DID of the registered agent
    ///
    /// # Example Use Cases
    ///
    /// - Tracking agent lifecycle
    /// - Initializing resources for new agents
    /// - Notifying other components of new agent availability
    AgentRegistered {
        /// The DID of the registered agent
        did: String,
    },

    /// An agent was unregistered from the node
    ///
    /// This event is triggered when an agent is removed from the node's agent
    /// registry. It contains the DID of the unregistered agent.
    ///
    /// # Parameters
    ///
    /// - `did`: The DID of the unregistered agent
    ///
    /// # Example Use Cases
    ///
    /// - Cleanup of resources associated with the agent
    /// - Notifying other components of agent removal
    /// - Updating routing tables
    AgentUnregistered {
        /// The DID of the unregistered agent
        did: String,
    },

    /// A DID was resolved by the node's resolver
    ///
    /// This event is triggered when the node attempts to resolve a DID. It includes
    /// both the DID being resolved and whether the resolution was successful, so it
    /// is emitted for failed attempts as well as successful ones.
    ///
    /// # Parameters
    ///
    /// - `did`: The DID that was resolved
    /// - `success`: Whether the resolution was successful
    ///
    /// # Example Use Cases
    ///
    /// - Monitoring resolution failures
    /// - Caching resolution results
    /// - Diagnostics and debugging
    DidResolved {
        /// The DID that was resolved
        did: String,
        /// Whether the resolution was successful
        success: bool,
    },

    /// A raw message event for an agent
    ///
    /// This event contains raw binary message data intended for a specific agent.
    /// It is typically used for low-level message delivery mechanisms. The bytes
    /// are carried as-is; this module does not inspect or decode them.
    ///
    /// # Parameters
    ///
    /// - `did`: The DID of the target agent
    /// - `message`: The raw binary message data
    ///
    /// # Example Use Cases
    ///
    /// - Direct message delivery to agents
    /// - Integration with transport-specific mechanisms
    /// - Binary protocol support
    AgentPlainMessage {
        /// The DID of the target agent
        did: String,
        /// The raw binary message data
        message: Vec<u8>,
    },

    /// A message was rejected by validation
    ///
    /// This event is triggered when a message fails validation checks and is rejected.
    /// It contains information about why the message was rejected.
    ///
    /// # Parameters
    ///
    /// - `message_id`: The ID of the rejected message
    /// - `reason`: The reason for rejection
    /// - `from`: The DID of the sender
    /// - `to`: The DID of the intended recipient
    ///
    /// # Example Use Cases
    ///
    /// - Monitoring validation failures
    /// - Alerting on suspicious activity
    /// - Debugging message flow issues
    MessageRejected {
        /// The ID of the rejected message
        message_id: String,
        /// The reason for rejection
        reason: String,
        /// The DID of the sender
        from: String,
        /// The DID of the intended recipient
        to: String,
    },

    /// A message was accepted and processed
    ///
    /// This event is triggered when a message passes all validation checks and is accepted
    /// for processing. It indicates successful message reception and validation.
    ///
    /// # Parameters
    ///
    /// - `message_id`: The ID of the accepted message
    /// - `message_type`: The type of the message
    /// - `from`: The DID of the sender
    /// - `to`: The DID of the recipient
    ///
    /// # Example Use Cases
    ///
    /// - Tracking successful message flow
    /// - Updating message status in database
    /// - Triggering downstream processing
    MessageAccepted {
        /// The ID of the accepted message
        message_id: String,
        /// The type of the message
        message_type: String,
        /// The DID of the sender
        from: String,
        /// The DID of the recipient
        to: String,
    },

    /// A reply was received for a previous message
    ///
    /// This event is triggered when a message is received that is a reply to a previously
    /// sent message. It includes both the original message and the reply for context.
    /// Unlike the `PlainMessage*` variants, both messages are carried as typed
    /// `PlainMessage` values rather than as JSON.
    ///
    /// # Parameters
    ///
    /// - `original_message_id`: The ID of the original message
    /// - `reply_message`: The reply message
    /// - `original_message`: The original message being replied to
    ///
    /// # Example Use Cases
    ///
    /// - Correlating request/response pairs
    /// - Tracking conversation flow
    /// - Implementing timeout handling
    ReplyReceived {
        /// The ID of the original message
        original_message_id: String,
        /// The reply message
        reply_message: PlainMessage,
        /// The original message being replied to
        original_message: PlainMessage,
    },

    /// A transaction's state has changed
    ///
    /// This event is triggered when a transaction transitions from one state to another.
    /// It includes information about the state transition and optionally the agent that
    /// triggered the change. States are carried as plain strings.
    ///
    /// # Parameters
    ///
    /// - `transaction_id`: The ID of the transaction
    /// - `old_state`: The previous state
    /// - `new_state`: The new state
    /// - `agent_did`: The DID of the agent that triggered the change (if applicable)
    ///
    /// # Example Use Cases
    ///
    /// - Monitoring transaction lifecycle
    /// - Triggering state-specific actions
    /// - Auditing state transitions
    TransactionStateChanged {
        /// The ID of the transaction
        transaction_id: String,
        /// The previous state
        old_state: String,
        /// The new state
        new_state: String,
        /// The DID of the agent that triggered the change, or `None` when not applicable
        agent_did: Option<String>,
    },
}
408
/// Event subscriber trait for receiving node events
///
/// This trait defines the interface for components that want to receive
/// node events via callbacks. Implementers must define the `handle_event`
/// method to process events as they occur. Subscribers are registered with
/// `EventBus::subscribe` as `Arc<dyn EventSubscriber>`.
///
/// # Thread Safety
///
/// All implementations must be `Send + Sync` to ensure they can be safely
/// used in multithreaded environments. `handle_event` takes `&self`, so an
/// implementation that needs to mutate state must use interior mutability.
///
/// # Usage
///
/// ```
/// use std::sync::Arc;
/// use async_trait::async_trait;
/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
///
/// #[derive(Debug)]
/// struct MyEventHandler {
///     name: String,
/// }
///
/// #[async_trait]
/// impl EventSubscriber for MyEventHandler {
///     async fn handle_event(&self, event: NodeEvent) {
///         println!("Handler '{}' received event: {:?}", self.name, event);
///     }
/// }
///
/// async fn example(event_bus: &EventBus) {
///     let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
///     event_bus.subscribe(handler).await;
/// }
/// ```
#[async_trait]
pub trait EventSubscriber: Send + Sync {
    /// Handle a node event
    ///
    /// This method is called whenever an event is published to the event bus.
    /// Implementations should process the event appropriately based on its type.
    /// Each subscriber receives its own owned copy of the event.
    ///
    /// # Parameters
    ///
    /// - `event`: The NodeEvent to handle
    ///
    /// # Note
    ///
    /// - The event bus awaits each subscriber in turn inside the publishing call,
    ///   so this method should return quickly to avoid delaying the publisher and
    ///   the subscribers registered after this one
    /// - For long-running operations, spawn a separate task
    /// - Handle errors internally: the method has no return value to report them,
    ///   and a panic propagates into the task that is publishing the event
    async fn handle_event(&self, event: NodeEvent);
}
462
/// Event bus for publishing and subscribing to node events
///
/// The `EventBus` is the central coordination point for the event system. It allows
/// components to publish events and provides two mechanisms for subscribing to events:
///
/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
///
/// Every published event is delivered through both mechanisms.
///
/// # Thread Safety
///
/// The `EventBus` is designed to be thread-safe, with all mutable state protected
/// by appropriate synchronization primitives. It can be safely shared across threads
/// using `Arc<EventBus>`.
///
/// Prefer sharing one bus through `Arc<EventBus>` over cloning it: the `Clone`
/// implementation shares the broadcast channel but gives the copy its own, empty
/// list of callback subscribers.
///
/// # Example Usage
///
/// ```rust,no_run
/// use std::sync::Arc;
/// use tap_node::event::{EventBus, NodeEvent};
///
/// async fn example() {
///     // Create a new event bus
///     let event_bus = Arc::new(EventBus::new());
///
///     // Subscribe to events using a channel
///     let mut receiver = event_bus.subscribe_channel();
///
///     // Publish an event using public methods
///     let did = "did:example:123".to_string();
///     event_bus.publish_agent_registered(did).await;
///
///     // Process events from the channel
///     tokio::spawn(async move {
///         while let Ok(event) = receiver.recv().await {
///             println!("Received event: {:?}", event);
///         }
///     });
/// }
/// ```
pub struct EventBus {
    /// Sending half of the broadcast channel; `subscribe_channel` creates
    /// receivers from it
    sender: broadcast::Sender<NodeEvent>,
    /// Callback subscribers, guarded by an async read-write lock so they can be
    /// added and removed through `&self`
    subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
}
508
509impl Default for EventBus {
510 fn default() -> Self {
511 Self::new()
512 }
513}
514
515impl Clone for EventBus {
516 fn clone(&self) -> Self {
517 Self {
518 sender: self.sender.clone(),
519 subscribers: RwLock::new(Vec::new()),
520 }
521 }
522}
523
524impl EventBus {
525 /// Create a new event bus
526 pub fn new() -> Self {
527 // Create a channel with capacity for 100 events
528 let (sender, _) = broadcast::channel(100);
529
530 Self {
531 sender,
532 subscribers: RwLock::new(Vec::new()),
533 }
534 }
535
536 /// Subscribe to node events
537 pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
538 let mut subscribers = self.subscribers.write().await;
539 subscribers.push(subscriber);
540 }
541
542 /// Get a receiver for node events
543 pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
544 self.sender.subscribe()
545 }
546
547 /// Remove a subscriber from the event bus
548 pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
549 let mut subscribers = self.subscribers.write().await;
550 subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
551 }
552
553 /// Publish a message received event
554 pub async fn publish_message_received(&self, message: PlainMessage) {
555 let event = NodeEvent::PlainMessageReceived {
556 message: serde_json::to_value(message).unwrap(),
557 };
558 self.publish_event(event).await;
559 }
560
561 /// Publish a message sent event
562 pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
563 let event = NodeEvent::PlainMessageSent {
564 message: serde_json::to_value(message).unwrap(),
565 from,
566 to,
567 };
568 self.publish_event(event).await;
569 }
570
571 /// Publish an agent registered event
572 pub async fn publish_agent_registered(&self, did: String) {
573 let event = NodeEvent::AgentRegistered { did };
574 self.publish_event(event).await;
575 }
576
577 /// Publish an agent unregistered event
578 pub async fn publish_agent_unregistered(&self, did: String) {
579 let event = NodeEvent::AgentUnregistered { did };
580 self.publish_event(event).await;
581 }
582
583 /// Publish an agent message event
584 pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
585 let event = NodeEvent::AgentPlainMessage { did, message };
586 self.publish_event(event).await;
587 }
588
589 /// Publish a DID resolved event
590 pub async fn publish_did_resolved(&self, did: String, success: bool) {
591 let event = NodeEvent::DidResolved { did, success };
592 self.publish_event(event).await;
593 }
594
595 /// Publish a message rejected event
596 pub async fn publish_message_rejected(
597 &self,
598 message_id: String,
599 reason: String,
600 from: String,
601 to: String,
602 ) {
603 let event = NodeEvent::MessageRejected {
604 message_id,
605 reason,
606 from,
607 to,
608 };
609 self.publish_event(event).await;
610 }
611
612 /// Publish a message accepted event
613 pub async fn publish_message_accepted(
614 &self,
615 message_id: String,
616 message_type: String,
617 from: String,
618 to: String,
619 ) {
620 let event = NodeEvent::MessageAccepted {
621 message_id,
622 message_type,
623 from,
624 to,
625 };
626 self.publish_event(event).await;
627 }
628
629 /// Publish a reply received event
630 pub async fn publish_reply_received(
631 &self,
632 original_message_id: String,
633 reply_message: PlainMessage,
634 original_message: PlainMessage,
635 ) {
636 let event = NodeEvent::ReplyReceived {
637 original_message_id,
638 reply_message,
639 original_message,
640 };
641 self.publish_event(event).await;
642 }
643
644 /// Publish a transaction state changed event
645 pub async fn publish_transaction_state_changed(
646 &self,
647 transaction_id: String,
648 old_state: String,
649 new_state: String,
650 agent_did: Option<String>,
651 ) {
652 let event = NodeEvent::TransactionStateChanged {
653 transaction_id,
654 old_state,
655 new_state,
656 agent_did,
657 };
658 self.publish_event(event).await;
659 }
660
661 /// Publish an event to all subscribers
662 async fn publish_event(&self, event: NodeEvent) {
663 // Send to channel
664 let _ = self.sender.send(event.clone());
665
666 // Notify subscribers
667 for subscriber in self.subscribers.read().await.iter() {
668 subscriber.handle_event(event.clone()).await;
669 }
670 }
671}