tap_node/event/
mod.rs

1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **PlainMessageReceived**: When a message is received by an agent
12//! - **PlainMessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentPlainMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//!     async fn handle_event(&self, event: NodeEvent) {
46//!         match event {
47//!             NodeEvent::PlainMessageReceived { message } => {
48//!                 println!("PlainMessage received: {:?}", message);
49//!             },
50//!             NodeEvent::AgentRegistered { did } => {
51//!                 println!("Agent registered: {}", did);
52//!             },
53//!             // Handle other event types...
54//!             _ => {}
55//!         }
56//!     }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//!     let handler = Arc::new(LoggingEventHandler);
62//!     event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//!     // Get a receiver for the events
74//!     let mut receiver = event_bus.subscribe_channel();
75//!
76//!     // Process events in a separate task
77//!     spawn(async move {
78//!         while let Ok(event) = receiver.recv().await {
79//!             match event {
80//!                 NodeEvent::PlainMessageSent { message, from, to } => {
81//!                     println!("PlainMessage sent from {} to {}", from, to);
82//!                 },
83//!                 // Handle other events...
84//!                 _ => {}
85//!             }
86//!         }
87//!     });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//!     // Create a new TAP node
100//!     let node = TapNode::new(NodeConfig::default());
101//!     
102//!     // Configure the event logger
103//!     let logger_config = EventLoggerConfig {
104//!         destination: LogDestination::File {
105//!             path: "/var/log/tap-node/events.log".to_string(),
106//!             max_size: Some(10 * 1024 * 1024), // 10 MB
107//!             rotate: true,
108//!         },
109//!         structured: true, // Use JSON format
110//!         log_level: log::Level::Info,
111//!     };
112//!     
113//!     // Create and subscribe the event logger
114//!     let event_logger = Arc::new(EventLogger::new(logger_config));
115//!     node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125pub mod handlers;
126pub mod logger;
127
128use async_trait::async_trait;
129use serde_json::Value;
130use std::sync::Arc;
131use tap_msg::didcomm::PlainMessage;
132use tokio::sync::{broadcast, RwLock};
133
134/// Event types that can be emitted by the TAP Node
135///
136/// The `NodeEvent` enum represents all the possible events that can occur
137/// within a TAP Node. These events can be subscribed to using the `EventBus`
138/// to enable reactive programming patterns and loose coupling between components.
139///
140/// # Event Categories
141///
142/// Events are broadly categorized into:
143///
144/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
145/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
146/// - **Resolution Events**: Related to DID resolution (DidResolved)
147/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
148///
149/// # Usage
150///
151/// Events are typically consumed by matching on the event type and taking appropriate action:
152///
153/// ```
154/// use tap_node::event::NodeEvent;
155///
156/// fn process_event(event: NodeEvent) {
157///     match event {
158///         NodeEvent::PlainMessageReceived { message } => {
159///             println!("PlainMessage received: {:?}", message);
160///         },
161///         NodeEvent::AgentRegistered { did } => {
162///             println!("Agent registered: {}", did);
163///         },
164///         // Handle other event types...
165///         _ => {}
166///     }
167/// }
168/// ```
169#[derive(Debug, Clone)]
170pub enum NodeEvent {
171    /// A DIDComm message was received by the node
172    ///
173    /// This event is triggered after a message has been successfully processed by
174    /// the node's incoming message processors. It contains the deserialized message
175    /// content as a JSON Value.
176    ///
177    /// # Parameters
178    ///
179    /// - `message`: The received message as a JSON Value
180    ///
181    /// # Example Use Cases
182    ///
183    /// - Monitoring and logging received messages
184    /// - Triggering follow-up actions based on message content
185    /// - Auditing message flow through the system
186    PlainMessageReceived {
187        /// The received message as a JSON Value
188        message: Value,
189    },
190
191    /// A DIDComm message was sent from one agent to another
192    ///
193    /// This event is triggered after a message has been successfully processed by
194    /// the node's outgoing message processors and prepared for delivery.
195    ///
196    /// # Parameters
197    ///
198    /// - `message`: The sent message as a JSON Value
199    /// - `from`: The DID of the sending agent
200    /// - `to`: The DID of the receiving agent
201    ///
202    /// # Example Use Cases
203    ///
204    /// - Tracking message delivery
205    /// - Analyzing communication patterns
206    /// - Generating message delivery receipts
207    PlainMessageSent {
208        /// The sent message as a JSON Value
209        message: Value,
210        /// The DID of the sending agent
211        from: String,
212        /// The DID of the receiving agent
213        to: String,
214    },
215
216    /// A new agent was registered with the node
217    ///
218    /// This event is triggered when an agent is successfully registered with the
219    /// node's agent registry. It contains the DID of the registered agent.
220    ///
221    /// # Parameters
222    ///
223    /// - `did`: The DID of the registered agent
224    ///
225    /// # Example Use Cases
226    ///
227    /// - Tracking agent lifecycle
228    /// - Initializing resources for new agents
229    /// - Notifying other components of new agent availability
230    AgentRegistered {
231        /// The DID of the registered agent
232        did: String,
233    },
234
235    /// An agent was unregistered from the node
236    ///
237    /// This event is triggered when an agent is removed from the node's agent
238    /// registry. It contains the DID of the unregistered agent.
239    ///
240    /// # Parameters
241    ///
242    /// - `did`: The DID of the unregistered agent
243    ///
244    /// # Example Use Cases
245    ///
246    /// - Cleanup of resources associated with the agent
247    /// - Notifying other components of agent removal
248    /// - Updating routing tables
249    AgentUnregistered {
250        /// The DID of the unregistered agent
251        did: String,
252    },
253
254    /// A DID was resolved by the node's resolver
255    ///
256    /// This event is triggered when the node attempts to resolve a DID. It includes
257    /// both the DID being resolved and whether the resolution was successful.
258    ///
259    /// # Parameters
260    ///
261    /// - `did`: The DID that was resolved
262    /// - `success`: Whether the resolution was successful
263    ///
264    /// # Example Use Cases
265    ///
266    /// - Monitoring resolution failures
267    /// - Caching resolution results
268    /// - Diagnostics and debugging
269    DidResolved {
270        /// The DID that was resolved
271        did: String,
272        /// Whether the resolution was successful
273        success: bool,
274    },
275
276    /// A raw message event for an agent
277    ///
278    /// This event contains raw binary message data intended for a specific agent.
279    /// It is typically used for low-level message delivery mechanisms.
280    ///
281    /// # Parameters
282    ///
283    /// - `did`: The DID of the target agent
284    /// - `message`: The raw binary message data
285    ///
286    /// # Example Use Cases
287    ///
288    /// - Direct message delivery to agents
289    /// - Integration with transport-specific mechanisms
290    /// - Binary protocol support
291    AgentPlainMessage {
292        /// The DID of the target agent
293        did: String,
294        /// The raw binary message data
295        message: Vec<u8>,
296    },
297
298    /// A message was rejected by validation
299    ///
300    /// This event is triggered when a message fails validation checks and is rejected.
301    /// It contains information about why the message was rejected.
302    ///
303    /// # Parameters
304    ///
305    /// - `message_id`: The ID of the rejected message
306    /// - `reason`: The reason for rejection
307    /// - `from`: The DID of the sender
308    /// - `to`: The DID of the intended recipient
309    ///
310    /// # Example Use Cases
311    ///
312    /// - Monitoring validation failures
313    /// - Alerting on suspicious activity
314    /// - Debugging message flow issues
315    MessageRejected {
316        /// The ID of the rejected message
317        message_id: String,
318        /// The reason for rejection
319        reason: String,
320        /// The DID of the sender
321        from: String,
322        /// The DID of the intended recipient
323        to: String,
324    },
325
326    /// A message was accepted and processed
327    ///
328    /// This event is triggered when a message passes all validation checks and is accepted
329    /// for processing. It indicates successful message reception and validation.
330    ///
331    /// # Parameters
332    ///
333    /// - `message_id`: The ID of the accepted message
334    /// - `message_type`: The type of the message
335    /// - `from`: The DID of the sender
336    /// - `to`: The DID of the recipient
337    ///
338    /// # Example Use Cases
339    ///
340    /// - Tracking successful message flow
341    /// - Updating message status in database
342    /// - Triggering downstream processing
343    MessageAccepted {
344        /// The ID of the accepted message
345        message_id: String,
346        /// The type of the message
347        message_type: String,
348        /// The DID of the sender
349        from: String,
350        /// The DID of the recipient
351        to: String,
352    },
353
354    /// A reply was received for a previous message
355    ///
356    /// This event is triggered when a message is received that is a reply to a previously
357    /// sent message. It includes both the original message and the reply for context.
358    ///
359    /// # Parameters
360    ///
361    /// - `original_message_id`: The ID of the original message
362    /// - `reply_message`: The reply message
363    /// - `original_message`: The original message being replied to
364    ///
365    /// # Example Use Cases
366    ///
367    /// - Correlating request/response pairs
368    /// - Tracking conversation flow
369    /// - Implementing timeout handling
370    ReplyReceived {
371        /// The ID of the original message
372        original_message_id: String,
373        /// The reply message
374        reply_message: PlainMessage,
375        /// The original message being replied to
376        original_message: PlainMessage,
377    },
378
379    /// A transaction's state has changed
380    ///
381    /// This event is triggered when a transaction transitions from one state to another.
382    /// It includes information about the state transition and optionally the agent that
383    /// triggered the change.
384    ///
385    /// # Parameters
386    ///
387    /// - `transaction_id`: The ID of the transaction
388    /// - `old_state`: The previous state
389    /// - `new_state`: The new state
390    /// - `agent_did`: The DID of the agent that triggered the change (if applicable)
391    ///
392    /// # Example Use Cases
393    ///
394    /// - Monitoring transaction lifecycle
395    /// - Triggering state-specific actions
396    /// - Auditing state transitions
397    TransactionStateChanged {
398        /// The ID of the transaction
399        transaction_id: String,
400        /// The previous state
401        old_state: String,
402        /// The new state
403        new_state: String,
404        /// The DID of the agent that triggered the change
405        agent_did: Option<String>,
406    },
407}
408
409/// Event subscriber trait for receiving node events
410///
411/// This trait defines the interface for components that want to receive
412/// node events via callbacks. Implementers must define the `handle_event`
413/// method to process events as they occur.
414///
415/// # Thread Safety
416///
417/// All implementations must be `Send + Sync` to ensure they can be safely
418/// used in multithreaded environments.
419///
420/// # Usage
421///
422/// ```
423/// use std::sync::Arc;
424/// use async_trait::async_trait;
425/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
426///
427/// #[derive(Debug)]
428/// struct MyEventHandler {
429///     name: String,
430/// }
431///
432/// #[async_trait]
433/// impl EventSubscriber for MyEventHandler {
434///     async fn handle_event(&self, event: NodeEvent) {
435///         println!("Handler '{}' received event: {:?}", self.name, event);
436///     }
437/// }
438///
439/// async fn example(event_bus: &EventBus) {
440///     let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
441///     event_bus.subscribe(handler).await;
442/// }
443/// ```
444#[async_trait]
445pub trait EventSubscriber: Send + Sync {
446    /// Handle a node event
447    ///
448    /// This method is called whenever an event is published to the event bus.
449    /// Implementations should process the event appropriately based on its type.
450    ///
451    /// # Parameters
452    ///
453    /// - `event`: The NodeEvent to handle
454    ///
455    /// # Note
456    ///
457    /// - This method should return quickly to avoid blocking the event bus
458    /// - For long-running operations, spawn a separate task
459    /// - Handle errors gracefully, as exceptions may disrupt the event system
460    async fn handle_event(&self, event: NodeEvent);
461}
462
463/// Event bus for publishing and subscribing to node events
464///
465/// The `EventBus` is the central coordination point for the event system. It allows
466/// components to publish events and provides two mechanisms for subscribing to events:
467///
468/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
469/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
470///
471/// # Thread Safety
472///
473/// The `EventBus` is designed to be thread-safe, with all mutable state protected
474/// by appropriate synchronization primitives. It can be safely shared across threads
475/// using `Arc<EventBus>`.
476///
477/// # Example Usage
478///
479/// ```rust,no_run
480/// use std::sync::Arc;
481/// use tap_node::event::{EventBus, NodeEvent};
482///
483/// async fn example() {
484///     // Create a new event bus
485///     let event_bus = Arc::new(EventBus::new());
486///
487///     // Subscribe to events using a channel
488///     let mut receiver = event_bus.subscribe_channel();
489///
490///     // Publish an event using public methods
491///     let did = "did:example:123".to_string();
492///     event_bus.publish_agent_registered(did).await;
493///
494///     // Process events from the channel
495///     tokio::spawn(async move {
496///         while let Ok(event) = receiver.recv().await {
497///             println!("Received event: {:?}", event);
498///         }
499///     });
500/// }
501/// ```
502pub struct EventBus {
503    /// Sender for events
504    sender: broadcast::Sender<NodeEvent>,
505    /// Subscribers
506    subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
507}
508
509impl Default for EventBus {
510    fn default() -> Self {
511        Self::new()
512    }
513}
514
515impl Clone for EventBus {
516    fn clone(&self) -> Self {
517        Self {
518            sender: self.sender.clone(),
519            subscribers: RwLock::new(Vec::new()),
520        }
521    }
522}
523
524impl EventBus {
525    /// Create a new event bus
526    pub fn new() -> Self {
527        // Create a channel with capacity for 100 events
528        let (sender, _) = broadcast::channel(100);
529
530        Self {
531            sender,
532            subscribers: RwLock::new(Vec::new()),
533        }
534    }
535
536    /// Subscribe to node events
537    pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
538        let mut subscribers = self.subscribers.write().await;
539        subscribers.push(subscriber);
540    }
541
542    /// Get a receiver for node events
543    pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
544        self.sender.subscribe()
545    }
546
547    /// Remove a subscriber from the event bus
548    pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
549        let mut subscribers = self.subscribers.write().await;
550        subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
551    }
552
553    /// Publish a message received event
554    pub async fn publish_message_received(&self, message: PlainMessage) {
555        let event = NodeEvent::PlainMessageReceived {
556            message: serde_json::to_value(message).unwrap(),
557        };
558        self.publish_event(event).await;
559    }
560
561    /// Publish a message sent event
562    pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
563        let event = NodeEvent::PlainMessageSent {
564            message: serde_json::to_value(message).unwrap(),
565            from,
566            to,
567        };
568        self.publish_event(event).await;
569    }
570
571    /// Publish an agent registered event
572    pub async fn publish_agent_registered(&self, did: String) {
573        let event = NodeEvent::AgentRegistered { did };
574        self.publish_event(event).await;
575    }
576
577    /// Publish an agent unregistered event
578    pub async fn publish_agent_unregistered(&self, did: String) {
579        let event = NodeEvent::AgentUnregistered { did };
580        self.publish_event(event).await;
581    }
582
583    /// Publish an agent message event
584    pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
585        let event = NodeEvent::AgentPlainMessage { did, message };
586        self.publish_event(event).await;
587    }
588
589    /// Publish a DID resolved event
590    pub async fn publish_did_resolved(&self, did: String, success: bool) {
591        let event = NodeEvent::DidResolved { did, success };
592        self.publish_event(event).await;
593    }
594
595    /// Publish a message rejected event
596    pub async fn publish_message_rejected(
597        &self,
598        message_id: String,
599        reason: String,
600        from: String,
601        to: String,
602    ) {
603        let event = NodeEvent::MessageRejected {
604            message_id,
605            reason,
606            from,
607            to,
608        };
609        self.publish_event(event).await;
610    }
611
612    /// Publish a message accepted event
613    pub async fn publish_message_accepted(
614        &self,
615        message_id: String,
616        message_type: String,
617        from: String,
618        to: String,
619    ) {
620        let event = NodeEvent::MessageAccepted {
621            message_id,
622            message_type,
623            from,
624            to,
625        };
626        self.publish_event(event).await;
627    }
628
629    /// Publish a reply received event
630    pub async fn publish_reply_received(
631        &self,
632        original_message_id: String,
633        reply_message: PlainMessage,
634        original_message: PlainMessage,
635    ) {
636        let event = NodeEvent::ReplyReceived {
637            original_message_id,
638            reply_message,
639            original_message,
640        };
641        self.publish_event(event).await;
642    }
643
644    /// Publish a transaction state changed event
645    pub async fn publish_transaction_state_changed(
646        &self,
647        transaction_id: String,
648        old_state: String,
649        new_state: String,
650        agent_did: Option<String>,
651    ) {
652        let event = NodeEvent::TransactionStateChanged {
653            transaction_id,
654            old_state,
655            new_state,
656            agent_did,
657        };
658        self.publish_event(event).await;
659    }
660
661    /// Publish an event to all subscribers
662    async fn publish_event(&self, event: NodeEvent) {
663        // Send to channel
664        let _ = self.sender.send(event.clone());
665
666        // Notify subscribers
667        for subscriber in self.subscribers.read().await.iter() {
668            subscriber.handle_event(event.clone()).await;
669        }
670    }
671}