Skip to main content

tap_node/event/
mod.rs

1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **PlainMessageReceived**: When a message is received by an agent
12//! - **PlainMessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentPlainMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//!     async fn handle_event(&self, event: NodeEvent) {
46//!         match event {
47//!             NodeEvent::PlainMessageReceived { message } => {
48//!                 println!("PlainMessage received: {:?}", message);
49//!             },
50//!             NodeEvent::AgentRegistered { did } => {
51//!                 println!("Agent registered: {}", did);
52//!             },
53//!             // Handle other event types...
54//!             _ => {}
55//!         }
56//!     }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//!     let handler = Arc::new(LoggingEventHandler);
62//!     event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//!     // Get a receiver for the events
74//!     let mut receiver = event_bus.subscribe_channel();
75//!
76//!     // Process events in a separate task
77//!     spawn(async move {
78//!         while let Ok(event) = receiver.recv().await {
79//!             match event {
80//!                 NodeEvent::PlainMessageSent { message, from, to } => {
81//!                     println!("PlainMessage sent from {} to {}", from, to);
82//!                 },
83//!                 // Handle other events...
84//!                 _ => {}
85//!             }
86//!         }
87//!     });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//!     // Create a new TAP node
100//!     let node = TapNode::new(NodeConfig::default());
101//!     
102//!     // Configure the event logger
103//!     let logger_config = EventLoggerConfig {
104//!         destination: LogDestination::File {
105//!             path: "/var/log/tap-node/events.log".to_string(),
106//!             max_size: Some(10 * 1024 * 1024), // 10 MB
107//!             rotate: true,
108//!         },
109//!         structured: true, // Use JSON format
110//!         log_level: log::Level::Info,
111//!     };
112//!     
113//!     // Create and subscribe the event logger
114//!     let event_logger = Arc::new(EventLogger::new(logger_config));
115//!     node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125#[cfg(feature = "storage")]
126pub mod customer_handler;
127#[cfg(feature = "storage")]
128pub mod decision_expiration_handler;
129#[cfg(feature = "storage")]
130pub mod decision_log_handler;
131#[cfg(feature = "storage")]
132pub mod decision_state_handler;
133pub mod handlers;
134pub mod logger;
135pub mod trust_ping_handler;
136
137use async_trait::async_trait;
138use serde_json::Value;
139use std::sync::Arc;
140use tap_msg::didcomm::PlainMessage;
141use tokio::sync::{broadcast, RwLock};
142
143/// Event types that can be emitted by the TAP Node
144///
145/// The `NodeEvent` enum represents all the possible events that can occur
146/// within a TAP Node. These events can be subscribed to using the `EventBus`
147/// to enable reactive programming patterns and loose coupling between components.
148///
149/// # Event Categories
150///
151/// Events are broadly categorized into:
152///
153/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
154/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
155/// - **Resolution Events**: Related to DID resolution (DidResolved)
156/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
157///
158/// # Usage
159///
160/// Events are typically consumed by matching on the event type and taking appropriate action:
161///
162/// ```
163/// use tap_node::event::NodeEvent;
164///
165/// fn process_event(event: NodeEvent) {
166///     match event {
167///         NodeEvent::PlainMessageReceived { message } => {
168///             println!("PlainMessage received: {:?}", message);
169///         },
170///         NodeEvent::AgentRegistered { did } => {
171///             println!("Agent registered: {}", did);
172///         },
173///         // Handle other event types...
174///         _ => {}
175///     }
176/// }
177/// ```
178#[derive(Debug, Clone)]
179pub enum NodeEvent {
180    /// A DIDComm message was received by the node
181    ///
182    /// This event is triggered after a message has been successfully processed by
183    /// the node's incoming message processors. It contains the deserialized message
184    /// content as a JSON Value.
185    ///
186    /// # Parameters
187    ///
188    /// - `message`: The received message as a JSON Value
189    ///
190    /// # Example Use Cases
191    ///
192    /// - Monitoring and logging received messages
193    /// - Triggering follow-up actions based on message content
194    /// - Auditing message flow through the system
195    PlainMessageReceived {
196        /// The received message as a JSON Value
197        message: Value,
198    },
199
200    /// A DIDComm message was sent from one agent to another
201    ///
202    /// This event is triggered after a message has been successfully processed by
203    /// the node's outgoing message processors and prepared for delivery.
204    ///
205    /// # Parameters
206    ///
207    /// - `message`: The sent message as a JSON Value
208    /// - `from`: The DID of the sending agent
209    /// - `to`: The DID of the receiving agent
210    ///
211    /// # Example Use Cases
212    ///
213    /// - Tracking message delivery
214    /// - Analyzing communication patterns
215    /// - Generating message delivery receipts
216    PlainMessageSent {
217        /// The sent message as a JSON Value
218        message: Value,
219        /// The DID of the sending agent
220        from: String,
221        /// The DID of the receiving agent
222        to: String,
223    },
224
225    /// A new agent was registered with the node
226    ///
227    /// This event is triggered when an agent is successfully registered with the
228    /// node's agent registry. It contains the DID of the registered agent.
229    ///
230    /// # Parameters
231    ///
232    /// - `did`: The DID of the registered agent
233    ///
234    /// # Example Use Cases
235    ///
236    /// - Tracking agent lifecycle
237    /// - Initializing resources for new agents
238    /// - Notifying other components of new agent availability
239    AgentRegistered {
240        /// The DID of the registered agent
241        did: String,
242    },
243
244    /// An agent was unregistered from the node
245    ///
246    /// This event is triggered when an agent is removed from the node's agent
247    /// registry. It contains the DID of the unregistered agent.
248    ///
249    /// # Parameters
250    ///
251    /// - `did`: The DID of the unregistered agent
252    ///
253    /// # Example Use Cases
254    ///
255    /// - Cleanup of resources associated with the agent
256    /// - Notifying other components of agent removal
257    /// - Updating routing tables
258    AgentUnregistered {
259        /// The DID of the unregistered agent
260        did: String,
261    },
262
263    /// A DID was resolved by the node's resolver
264    ///
265    /// This event is triggered when the node attempts to resolve a DID. It includes
266    /// both the DID being resolved and whether the resolution was successful.
267    ///
268    /// # Parameters
269    ///
270    /// - `did`: The DID that was resolved
271    /// - `success`: Whether the resolution was successful
272    ///
273    /// # Example Use Cases
274    ///
275    /// - Monitoring resolution failures
276    /// - Caching resolution results
277    /// - Diagnostics and debugging
278    DidResolved {
279        /// The DID that was resolved
280        did: String,
281        /// Whether the resolution was successful
282        success: bool,
283    },
284
285    /// A raw message event for an agent
286    ///
287    /// This event contains raw binary message data intended for a specific agent.
288    /// It is typically used for low-level message delivery mechanisms.
289    ///
290    /// # Parameters
291    ///
292    /// - `did`: The DID of the target agent
293    /// - `message`: The raw binary message data
294    ///
295    /// # Example Use Cases
296    ///
297    /// - Direct message delivery to agents
298    /// - Integration with transport-specific mechanisms
299    /// - Binary protocol support
300    AgentPlainMessage {
301        /// The DID of the target agent
302        did: String,
303        /// The raw binary message data
304        message: Vec<u8>,
305    },
306
307    /// A message was rejected by validation
308    ///
309    /// This event is triggered when a message fails validation checks and is rejected.
310    /// It contains information about why the message was rejected.
311    ///
312    /// # Parameters
313    ///
314    /// - `message_id`: The ID of the rejected message
315    /// - `reason`: The reason for rejection
316    /// - `from`: The DID of the sender
317    /// - `to`: The DID of the intended recipient
318    ///
319    /// # Example Use Cases
320    ///
321    /// - Monitoring validation failures
322    /// - Alerting on suspicious activity
323    /// - Debugging message flow issues
324    MessageRejected {
325        /// The ID of the rejected message
326        message_id: String,
327        /// The reason for rejection
328        reason: String,
329        /// The DID of the sender
330        from: String,
331        /// The DID of the intended recipient
332        to: String,
333    },
334
335    /// A message was accepted and processed
336    ///
337    /// This event is triggered when a message passes all validation checks and is accepted
338    /// for processing. It indicates successful message reception and validation.
339    ///
340    /// # Parameters
341    ///
342    /// - `message_id`: The ID of the accepted message
343    /// - `message_type`: The type of the message
344    /// - `from`: The DID of the sender
345    /// - `to`: The DID of the recipient
346    ///
347    /// # Example Use Cases
348    ///
349    /// - Tracking successful message flow
350    /// - Updating message status in database
351    /// - Triggering downstream processing
352    MessageAccepted {
353        /// The ID of the accepted message
354        message_id: String,
355        /// The type of the message
356        message_type: String,
357        /// The DID of the sender
358        from: String,
359        /// The DID of the recipient
360        to: String,
361    },
362
363    /// A reply was received for a previous message
364    ///
365    /// This event is triggered when a message is received that is a reply to a previously
366    /// sent message. It includes both the original message and the reply for context.
367    ///
368    /// # Parameters
369    ///
370    /// - `original_message_id`: The ID of the original message
371    /// - `reply_message`: The reply message
372    /// - `original_message`: The original message being replied to
373    ///
374    /// # Example Use Cases
375    ///
376    /// - Correlating request/response pairs
377    /// - Tracking conversation flow
378    /// - Implementing timeout handling
379    ReplyReceived {
380        /// The ID of the original message
381        original_message_id: String,
382        /// The reply message
383        reply_message: PlainMessage,
384        /// The original message being replied to
385        original_message: Box<PlainMessage>,
386    },
387
388    /// A transaction's state has changed
389    ///
390    /// This event is triggered when a transaction transitions from one state to another.
391    /// It includes information about the state transition and optionally the agent that
392    /// triggered the change.
393    ///
394    /// # Parameters
395    ///
396    /// - `transaction_id`: The ID of the transaction
397    /// - `old_state`: The previous state
398    /// - `new_state`: The new state
399    /// - `agent_did`: The DID of the agent that triggered the change (if applicable)
400    ///
401    /// # Example Use Cases
402    ///
403    /// - Monitoring transaction lifecycle
404    /// - Triggering state-specific actions
405    /// - Auditing state transitions
406    TransactionStateChanged {
407        /// The ID of the transaction
408        transaction_id: String,
409        /// The previous state
410        old_state: String,
411        /// The new state
412        new_state: String,
413        /// The DID of the agent that triggered the change
414        agent_did: Option<String>,
415    },
416
417    /// New events for customer extraction and compliance
418
419    /// A message was received from a source
420    MessageReceived {
421        /// The received message
422        message: PlainMessage,
423        /// The source of the message
424        source: String,
425    },
426
427    /// A message was sent to a destination
428    MessageSent {
429        /// The sent message
430        message: PlainMessage,
431        /// The destination of the message
432        destination: String,
433    },
434
435    /// A new transaction was created
436    TransactionCreated {
437        /// The transaction data
438        transaction: crate::storage::Transaction,
439        /// The agent that created the transaction
440        agent_did: String,
441    },
442
443    /// A customer record was created or updated
444    CustomerUpdated {
445        /// The customer ID
446        customer_id: String,
447        /// The agent that owns the customer
448        agent_did: String,
449        /// The type of update (created, updated, verified)
450        update_type: String,
451    },
452
453    /// A transaction reached a decision point that requires external input.
454    ///
455    /// This event is published when the node's `DecisionMode` is set to
456    /// `EventBus`. External systems should subscribe to this event and
457    /// call back into the node (via sending appropriate TAP messages) to
458    /// advance the transaction.
459    ///
460    /// # Parameters
461    ///
462    /// - `transaction_id`: The transaction requiring a decision
463    /// - `transaction_state`: Current FSM state as a string
464    /// - `decision`: The decision type as a serialized JSON value
465    /// - `pending_agents`: DIDs of agents that still need to act
466    DecisionRequired {
467        /// The transaction requiring a decision
468        transaction_id: String,
469        /// Current FSM state
470        transaction_state: String,
471        /// The decision type and details (serialized Decision)
472        decision: Value,
473        /// DIDs of agents that still need to act
474        pending_agents: Vec<String>,
475    },
476}
477
478/// Event subscriber trait for receiving node events
479///
480/// This trait defines the interface for components that want to receive
481/// node events via callbacks. Implementers must define the `handle_event`
482/// method to process events as they occur.
483///
484/// # Thread Safety
485///
486/// All implementations must be `Send + Sync` to ensure they can be safely
487/// used in multithreaded environments.
488///
489/// # Usage
490///
491/// ```
492/// use std::sync::Arc;
493/// use async_trait::async_trait;
494/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
495///
496/// #[derive(Debug)]
497/// struct MyEventHandler {
498///     name: String,
499/// }
500///
501/// #[async_trait]
502/// impl EventSubscriber for MyEventHandler {
503///     async fn handle_event(&self, event: NodeEvent) {
504///         println!("Handler '{}' received event: {:?}", self.name, event);
505///     }
506/// }
507///
508/// async fn example(event_bus: &EventBus) {
509///     let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
510///     event_bus.subscribe(handler).await;
511/// }
512/// ```
513#[async_trait]
514pub trait EventSubscriber: Send + Sync {
515    /// Handle a node event
516    ///
517    /// This method is called whenever an event is published to the event bus.
518    /// Implementations should process the event appropriately based on its type.
519    ///
520    /// # Parameters
521    ///
522    /// - `event`: The NodeEvent to handle
523    ///
524    /// # Note
525    ///
526    /// - This method should return quickly to avoid blocking the event bus
527    /// - For long-running operations, spawn a separate task
528    /// - Handle errors gracefully, as exceptions may disrupt the event system
529    async fn handle_event(&self, event: NodeEvent);
530}
531
532/// Event bus for publishing and subscribing to node events
533///
534/// The `EventBus` is the central coordination point for the event system. It allows
535/// components to publish events and provides two mechanisms for subscribing to events:
536///
537/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
538/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
539///
540/// # Thread Safety
541///
542/// The `EventBus` is designed to be thread-safe, with all mutable state protected
543/// by appropriate synchronization primitives. It can be safely shared across threads
544/// using `Arc<EventBus>`.
545///
546/// # Example Usage
547///
548/// ```rust,no_run
549/// use std::sync::Arc;
550/// use tap_node::event::{EventBus, NodeEvent};
551///
552/// async fn example() {
553///     // Create a new event bus
554///     let event_bus = Arc::new(EventBus::new());
555///
556///     // Subscribe to events using a channel
557///     let mut receiver = event_bus.subscribe_channel();
558///
559///     // Publish an event using public methods
560///     let did = "did:example:123".to_string();
561///     event_bus.publish_agent_registered(did).await;
562///
563///     // Process events from the channel
564///     tokio::spawn(async move {
565///         while let Ok(event) = receiver.recv().await {
566///             println!("Received event: {:?}", event);
567///         }
568///     });
569/// }
570/// ```
571pub struct EventBus {
572    /// Sender for events
573    sender: broadcast::Sender<NodeEvent>,
574    /// Subscribers
575    subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
576}
577
578impl Default for EventBus {
579    fn default() -> Self {
580        Self::new()
581    }
582}
583
584impl Clone for EventBus {
585    fn clone(&self) -> Self {
586        Self {
587            sender: self.sender.clone(),
588            subscribers: RwLock::new(Vec::new()),
589        }
590    }
591}
592
593impl EventBus {
594    /// Create a new event bus
595    pub fn new() -> Self {
596        // Create a channel with capacity for 100 events
597        let (sender, _) = broadcast::channel(100);
598
599        Self {
600            sender,
601            subscribers: RwLock::new(Vec::new()),
602        }
603    }
604
605    /// Subscribe to node events
606    pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
607        let mut subscribers = self.subscribers.write().await;
608        subscribers.push(subscriber);
609    }
610
611    /// Get a receiver for node events
612    pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
613        self.sender.subscribe()
614    }
615
616    /// Remove a subscriber from the event bus
617    pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
618        let mut subscribers = self.subscribers.write().await;
619        subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
620    }
621
622    /// Publish a message received event
623    pub async fn publish_message_received(&self, message: PlainMessage) {
624        let event = NodeEvent::PlainMessageReceived {
625            message: serde_json::to_value(message).unwrap(),
626        };
627        self.publish_event(event).await;
628    }
629
630    /// Publish a message sent event
631    pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
632        let event = NodeEvent::PlainMessageSent {
633            message: serde_json::to_value(message).unwrap(),
634            from,
635            to,
636        };
637        self.publish_event(event).await;
638    }
639
640    /// Publish an agent registered event
641    pub async fn publish_agent_registered(&self, did: String) {
642        let event = NodeEvent::AgentRegistered { did };
643        self.publish_event(event).await;
644    }
645
646    /// Publish an agent unregistered event
647    pub async fn publish_agent_unregistered(&self, did: String) {
648        let event = NodeEvent::AgentUnregistered { did };
649        self.publish_event(event).await;
650    }
651
652    /// Publish an agent message event
653    pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
654        let event = NodeEvent::AgentPlainMessage { did, message };
655        self.publish_event(event).await;
656    }
657
658    /// Publish a DID resolved event
659    pub async fn publish_did_resolved(&self, did: String, success: bool) {
660        let event = NodeEvent::DidResolved { did, success };
661        self.publish_event(event).await;
662    }
663
664    /// Publish a message rejected event
665    pub async fn publish_message_rejected(
666        &self,
667        message_id: String,
668        reason: String,
669        from: String,
670        to: String,
671    ) {
672        let event = NodeEvent::MessageRejected {
673            message_id,
674            reason,
675            from,
676            to,
677        };
678        self.publish_event(event).await;
679    }
680
681    /// Publish a message accepted event
682    pub async fn publish_message_accepted(
683        &self,
684        message_id: String,
685        message_type: String,
686        from: String,
687        to: String,
688    ) {
689        let event = NodeEvent::MessageAccepted {
690            message_id,
691            message_type,
692            from,
693            to,
694        };
695        self.publish_event(event).await;
696    }
697
698    /// Publish a reply received event
699    pub async fn publish_reply_received(
700        &self,
701        original_message_id: String,
702        reply_message: PlainMessage,
703        original_message: PlainMessage,
704    ) {
705        let event = NodeEvent::ReplyReceived {
706            original_message_id,
707            reply_message,
708            original_message: Box::new(original_message),
709        };
710        self.publish_event(event).await;
711    }
712
713    /// Publish a transaction state changed event
714    pub async fn publish_transaction_state_changed(
715        &self,
716        transaction_id: String,
717        old_state: String,
718        new_state: String,
719        agent_did: Option<String>,
720    ) {
721        let event = NodeEvent::TransactionStateChanged {
722            transaction_id,
723            old_state,
724            new_state,
725            agent_did,
726        };
727        self.publish_event(event).await;
728    }
729
730    /// Publish a decision required event
731    pub async fn publish_decision_required(
732        &self,
733        transaction_id: String,
734        transaction_state: String,
735        decision: serde_json::Value,
736        pending_agents: Vec<String>,
737    ) {
738        let event = NodeEvent::DecisionRequired {
739            transaction_id,
740            transaction_state,
741            decision,
742            pending_agents,
743        };
744        self.publish_event(event).await;
745    }
746
747    /// Publish an event to all subscribers
748    pub async fn publish_event(&self, event: NodeEvent) {
749        // Send to channel
750        let _ = self.sender.send(event.clone());
751
752        // Notify subscribers
753        for subscriber in self.subscribers.read().await.iter() {
754            subscriber.handle_event(event.clone()).await;
755        }
756    }
757}