tap_node/event/mod.rs
1//! # Event System for TAP Node
2//!
3//! This module provides a comprehensive event handling and subscription system for TAP Node.
4//! The event system allows components to publish and subscribe to various events that occur
5//! within the node, enabling loose coupling between components and reactive programming patterns.
6//!
7//! ## Event Types
8//!
9//! The `NodeEvent` enum defines all the possible events that can be emitted by the TAP Node:
10//!
11//! - **PlainMessageReceived**: When a message is received by an agent
12//! - **PlainMessageSent**: When a message is sent from an agent to another
13//! - **AgentRegistered**: When a new agent is registered with the node
14//! - **AgentUnregistered**: When an agent is removed from the node
15//! - **DidResolved**: When a DID is resolved (successfully or not)
16//! - **AgentPlainMessage**: Raw message data intended for an agent
17//!
18//! ## Subscription Models
19//!
20//! The event system supports two subscription models:
21//!
22//! 1. **Callback-based**: Implementing the `EventSubscriber` trait to receive events via callbacks
23//! 2. **Channel-based**: Using `tokio::sync::broadcast` channels to receive events asynchronously
24//!
25//! ## Built-in Event Handlers
26//!
27//! The event system includes several built-in event handlers:
28//!
29//! - **EventLogger**: Logs all events to a configurable destination (console, file, or custom handler)
30//!
31//! ## Usage Examples
32//!
33//! ### Callback-based Subscription
34//!
35//! ```
36//! use std::sync::Arc;
37//! use async_trait::async_trait;
38//! use tap_node::event::{EventBus, EventSubscriber, NodeEvent};
39//!
40//! // Create a custom event handler
41//! struct LoggingEventHandler;
42//!
43//! #[async_trait]
44//! impl EventSubscriber for LoggingEventHandler {
45//! async fn handle_event(&self, event: NodeEvent) {
46//! match event {
47//! NodeEvent::PlainMessageReceived { message } => {
48//! println!("PlainMessage received: {:?}", message);
49//! },
50//! NodeEvent::AgentRegistered { did } => {
51//! println!("Agent registered: {}", did);
52//! },
53//! // Handle other event types...
54//! _ => {}
55//! }
56//! }
57//! }
58//!
59//! // Later, subscribe to events
60//! async fn subscribe_events(event_bus: &EventBus) {
61//! let handler = Arc::new(LoggingEventHandler);
62//! event_bus.subscribe(handler).await;
63//! }
64//! ```
65//!
66//! ### Channel-based Subscription
67//!
68//! ```
69//! use tap_node::event::{EventBus, NodeEvent};
70//! use tokio::spawn;
71//!
72//! async fn monitor_events(event_bus: &EventBus) {
73//! // Get a receiver for the events
74//! let mut receiver = event_bus.subscribe_channel();
75//!
76//! // Process events in a separate task
77//! spawn(async move {
78//! while let Ok(event) = receiver.recv().await {
79//! match event {
80//! NodeEvent::PlainMessageSent { message, from, to } => {
81//! println!("PlainMessage sent from {} to {}", from, to);
82//! },
83//! // Handle other events...
84//! _ => {}
85//! }
86//! }
87//! });
88//! }
89//! ```
90//!
91//! ### Using the Event Logger
92//!
93//! ```no_run
94//! use std::sync::Arc;
95//! use tap_node::{NodeConfig, TapNode};
96//! use tap_node::event::logger::{EventLogger, EventLoggerConfig, LogDestination};
97//!
98//! async fn example() {
99//! // Create a new TAP node
100//! let node = TapNode::new(NodeConfig::default());
101//!
102//! // Configure the event logger
103//! let logger_config = EventLoggerConfig {
104//! destination: LogDestination::File {
105//! path: "/var/log/tap-node/events.log".to_string(),
106//! max_size: Some(10 * 1024 * 1024), // 10 MB
107//! rotate: true,
108//! },
109//! structured: true, // Use JSON format
110//! log_level: log::Level::Info,
111//! };
112//!
113//! // Create and subscribe the event logger
114//! let event_logger = Arc::new(EventLogger::new(logger_config));
115//! node.event_bus().subscribe(event_logger).await;
116//! }
117//! ```
118//!
119//! ## Thread Safety
120//!
121//! The event system is designed to be thread-safe, with all mutable state protected
122//! by appropriate synchronization primitives. The `EventBus` can be safely shared
123//! across threads using `Arc<EventBus>`.
124
125#[cfg(feature = "storage")]
126pub mod customer_handler;
127#[cfg(feature = "storage")]
128pub mod decision_expiration_handler;
129#[cfg(feature = "storage")]
130pub mod decision_log_handler;
131#[cfg(feature = "storage")]
132pub mod decision_state_handler;
133pub mod handlers;
134pub mod logger;
135pub mod trust_ping_handler;
136
137use async_trait::async_trait;
138use serde_json::Value;
139use std::sync::Arc;
140use tap_msg::didcomm::PlainMessage;
141use tokio::sync::{broadcast, RwLock};
142
143/// Event types that can be emitted by the TAP Node
144///
145/// The `NodeEvent` enum represents all the possible events that can occur
146/// within a TAP Node. These events can be subscribed to using the `EventBus`
147/// to enable reactive programming patterns and loose coupling between components.
148///
149/// # Event Categories
150///
151/// Events are broadly categorized into:
152///
153/// - **PlainMessage Events**: Related to message processing and delivery (PlainMessageReceived, PlainMessageSent)
154/// - **Agent Events**: Related to agent lifecycle management (AgentRegistered, AgentUnregistered)
155/// - **Resolution Events**: Related to DID resolution (DidResolved)
156/// - **Raw PlainMessage Events**: Raw binary messages for agents (AgentPlainMessage)
157///
158/// # Usage
159///
160/// Events are typically consumed by matching on the event type and taking appropriate action:
161///
162/// ```
163/// use tap_node::event::NodeEvent;
164///
165/// fn process_event(event: NodeEvent) {
166/// match event {
167/// NodeEvent::PlainMessageReceived { message } => {
168/// println!("PlainMessage received: {:?}", message);
169/// },
170/// NodeEvent::AgentRegistered { did } => {
171/// println!("Agent registered: {}", did);
172/// },
173/// // Handle other event types...
174/// _ => {}
175/// }
176/// }
177/// ```
178#[derive(Debug, Clone)]
179pub enum NodeEvent {
180 /// A DIDComm message was received by the node
181 ///
182 /// This event is triggered after a message has been successfully processed by
183 /// the node's incoming message processors. It contains the deserialized message
184 /// content as a JSON Value.
185 ///
186 /// # Parameters
187 ///
188 /// - `message`: The received message as a JSON Value
189 ///
190 /// # Example Use Cases
191 ///
192 /// - Monitoring and logging received messages
193 /// - Triggering follow-up actions based on message content
194 /// - Auditing message flow through the system
195 PlainMessageReceived {
196 /// The received message as a JSON Value
197 message: Value,
198 },
199
200 /// A DIDComm message was sent from one agent to another
201 ///
202 /// This event is triggered after a message has been successfully processed by
203 /// the node's outgoing message processors and prepared for delivery.
204 ///
205 /// # Parameters
206 ///
207 /// - `message`: The sent message as a JSON Value
208 /// - `from`: The DID of the sending agent
209 /// - `to`: The DID of the receiving agent
210 ///
211 /// # Example Use Cases
212 ///
213 /// - Tracking message delivery
214 /// - Analyzing communication patterns
215 /// - Generating message delivery receipts
216 PlainMessageSent {
217 /// The sent message as a JSON Value
218 message: Value,
219 /// The DID of the sending agent
220 from: String,
221 /// The DID of the receiving agent
222 to: String,
223 },
224
225 /// A new agent was registered with the node
226 ///
227 /// This event is triggered when an agent is successfully registered with the
228 /// node's agent registry. It contains the DID of the registered agent.
229 ///
230 /// # Parameters
231 ///
232 /// - `did`: The DID of the registered agent
233 ///
234 /// # Example Use Cases
235 ///
236 /// - Tracking agent lifecycle
237 /// - Initializing resources for new agents
238 /// - Notifying other components of new agent availability
239 AgentRegistered {
240 /// The DID of the registered agent
241 did: String,
242 },
243
244 /// An agent was unregistered from the node
245 ///
246 /// This event is triggered when an agent is removed from the node's agent
247 /// registry. It contains the DID of the unregistered agent.
248 ///
249 /// # Parameters
250 ///
251 /// - `did`: The DID of the unregistered agent
252 ///
253 /// # Example Use Cases
254 ///
255 /// - Cleanup of resources associated with the agent
256 /// - Notifying other components of agent removal
257 /// - Updating routing tables
258 AgentUnregistered {
259 /// The DID of the unregistered agent
260 did: String,
261 },
262
263 /// A DID was resolved by the node's resolver
264 ///
265 /// This event is triggered when the node attempts to resolve a DID. It includes
266 /// both the DID being resolved and whether the resolution was successful.
267 ///
268 /// # Parameters
269 ///
270 /// - `did`: The DID that was resolved
271 /// - `success`: Whether the resolution was successful
272 ///
273 /// # Example Use Cases
274 ///
275 /// - Monitoring resolution failures
276 /// - Caching resolution results
277 /// - Diagnostics and debugging
278 DidResolved {
279 /// The DID that was resolved
280 did: String,
281 /// Whether the resolution was successful
282 success: bool,
283 },
284
285 /// A raw message event for an agent
286 ///
287 /// This event contains raw binary message data intended for a specific agent.
288 /// It is typically used for low-level message delivery mechanisms.
289 ///
290 /// # Parameters
291 ///
292 /// - `did`: The DID of the target agent
293 /// - `message`: The raw binary message data
294 ///
295 /// # Example Use Cases
296 ///
297 /// - Direct message delivery to agents
298 /// - Integration with transport-specific mechanisms
299 /// - Binary protocol support
300 AgentPlainMessage {
301 /// The DID of the target agent
302 did: String,
303 /// The raw binary message data
304 message: Vec<u8>,
305 },
306
307 /// A message was rejected by validation
308 ///
309 /// This event is triggered when a message fails validation checks and is rejected.
310 /// It contains information about why the message was rejected.
311 ///
312 /// # Parameters
313 ///
314 /// - `message_id`: The ID of the rejected message
315 /// - `reason`: The reason for rejection
316 /// - `from`: The DID of the sender
317 /// - `to`: The DID of the intended recipient
318 ///
319 /// # Example Use Cases
320 ///
321 /// - Monitoring validation failures
322 /// - Alerting on suspicious activity
323 /// - Debugging message flow issues
324 MessageRejected {
325 /// The ID of the rejected message
326 message_id: String,
327 /// The reason for rejection
328 reason: String,
329 /// The DID of the sender
330 from: String,
331 /// The DID of the intended recipient
332 to: String,
333 },
334
335 /// A message was accepted and processed
336 ///
337 /// This event is triggered when a message passes all validation checks and is accepted
338 /// for processing. It indicates successful message reception and validation.
339 ///
340 /// # Parameters
341 ///
342 /// - `message_id`: The ID of the accepted message
343 /// - `message_type`: The type of the message
344 /// - `from`: The DID of the sender
345 /// - `to`: The DID of the recipient
346 ///
347 /// # Example Use Cases
348 ///
349 /// - Tracking successful message flow
350 /// - Updating message status in database
351 /// - Triggering downstream processing
352 MessageAccepted {
353 /// The ID of the accepted message
354 message_id: String,
355 /// The type of the message
356 message_type: String,
357 /// The DID of the sender
358 from: String,
359 /// The DID of the recipient
360 to: String,
361 },
362
363 /// A reply was received for a previous message
364 ///
365 /// This event is triggered when a message is received that is a reply to a previously
366 /// sent message. It includes both the original message and the reply for context.
367 ///
368 /// # Parameters
369 ///
370 /// - `original_message_id`: The ID of the original message
371 /// - `reply_message`: The reply message
372 /// - `original_message`: The original message being replied to
373 ///
374 /// # Example Use Cases
375 ///
376 /// - Correlating request/response pairs
377 /// - Tracking conversation flow
378 /// - Implementing timeout handling
379 ReplyReceived {
380 /// The ID of the original message
381 original_message_id: String,
382 /// The reply message
383 reply_message: PlainMessage,
384 /// The original message being replied to
385 original_message: Box<PlainMessage>,
386 },
387
388 /// A transaction's state has changed
389 ///
390 /// This event is triggered when a transaction transitions from one state to another.
391 /// It includes information about the state transition and optionally the agent that
392 /// triggered the change.
393 ///
394 /// # Parameters
395 ///
396 /// - `transaction_id`: The ID of the transaction
397 /// - `old_state`: The previous state
398 /// - `new_state`: The new state
399 /// - `agent_did`: The DID of the agent that triggered the change (if applicable)
400 ///
401 /// # Example Use Cases
402 ///
403 /// - Monitoring transaction lifecycle
404 /// - Triggering state-specific actions
405 /// - Auditing state transitions
406 TransactionStateChanged {
407 /// The ID of the transaction
408 transaction_id: String,
409 /// The previous state
410 old_state: String,
411 /// The new state
412 new_state: String,
413 /// The DID of the agent that triggered the change
414 agent_did: Option<String>,
415 },
416
417 /// New events for customer extraction and compliance
418
419 /// A message was received from a source
420 MessageReceived {
421 /// The received message
422 message: PlainMessage,
423 /// The source of the message
424 source: String,
425 },
426
427 /// A message was sent to a destination
428 MessageSent {
429 /// The sent message
430 message: PlainMessage,
431 /// The destination of the message
432 destination: String,
433 },
434
435 /// A new transaction was created
436 TransactionCreated {
437 /// The transaction data
438 transaction: crate::storage::Transaction,
439 /// The agent that created the transaction
440 agent_did: String,
441 },
442
443 /// A customer record was created or updated
444 CustomerUpdated {
445 /// The customer ID
446 customer_id: String,
447 /// The agent that owns the customer
448 agent_did: String,
449 /// The type of update (created, updated, verified)
450 update_type: String,
451 },
452
453 /// A transaction reached a decision point that requires external input.
454 ///
455 /// This event is published when the node's `DecisionMode` is set to
456 /// `EventBus`. External systems should subscribe to this event and
457 /// call back into the node (via sending appropriate TAP messages) to
458 /// advance the transaction.
459 ///
460 /// # Parameters
461 ///
462 /// - `transaction_id`: The transaction requiring a decision
463 /// - `transaction_state`: Current FSM state as a string
464 /// - `decision`: The decision type as a serialized JSON value
465 /// - `pending_agents`: DIDs of agents that still need to act
466 DecisionRequired {
467 /// The transaction requiring a decision
468 transaction_id: String,
469 /// Current FSM state
470 transaction_state: String,
471 /// The decision type and details (serialized Decision)
472 decision: Value,
473 /// DIDs of agents that still need to act
474 pending_agents: Vec<String>,
475 },
476}
477
478/// Event subscriber trait for receiving node events
479///
480/// This trait defines the interface for components that want to receive
481/// node events via callbacks. Implementers must define the `handle_event`
482/// method to process events as they occur.
483///
484/// # Thread Safety
485///
486/// All implementations must be `Send + Sync` to ensure they can be safely
487/// used in multithreaded environments.
488///
489/// # Usage
490///
491/// ```
492/// use std::sync::Arc;
493/// use async_trait::async_trait;
494/// use tap_node::event::{EventSubscriber, NodeEvent, EventBus};
495///
496/// #[derive(Debug)]
497/// struct MyEventHandler {
498/// name: String,
499/// }
500///
501/// #[async_trait]
502/// impl EventSubscriber for MyEventHandler {
503/// async fn handle_event(&self, event: NodeEvent) {
504/// println!("Handler '{}' received event: {:?}", self.name, event);
505/// }
506/// }
507///
508/// async fn example(event_bus: &EventBus) {
509/// let handler = Arc::new(MyEventHandler { name: "Logger".to_string() });
510/// event_bus.subscribe(handler).await;
511/// }
512/// ```
513#[async_trait]
514pub trait EventSubscriber: Send + Sync {
515 /// Handle a node event
516 ///
517 /// This method is called whenever an event is published to the event bus.
518 /// Implementations should process the event appropriately based on its type.
519 ///
520 /// # Parameters
521 ///
522 /// - `event`: The NodeEvent to handle
523 ///
524 /// # Note
525 ///
526 /// - This method should return quickly to avoid blocking the event bus
527 /// - For long-running operations, spawn a separate task
528 /// - Handle errors gracefully, as exceptions may disrupt the event system
529 async fn handle_event(&self, event: NodeEvent);
530}
531
532/// Event bus for publishing and subscribing to node events
533///
534/// The `EventBus` is the central coordination point for the event system. It allows
535/// components to publish events and provides two mechanisms for subscribing to events:
536///
537/// 1. Callback-based: Register an `EventSubscriber` to receive events via callbacks
538/// 2. Channel-based: Get a `broadcast::Receiver<NodeEvent>` for async event processing
539///
540/// # Thread Safety
541///
542/// The `EventBus` is designed to be thread-safe, with all mutable state protected
543/// by appropriate synchronization primitives. It can be safely shared across threads
544/// using `Arc<EventBus>`.
545///
546/// # Example Usage
547///
548/// ```rust,no_run
549/// use std::sync::Arc;
550/// use tap_node::event::{EventBus, NodeEvent};
551///
552/// async fn example() {
553/// // Create a new event bus
554/// let event_bus = Arc::new(EventBus::new());
555///
556/// // Subscribe to events using a channel
557/// let mut receiver = event_bus.subscribe_channel();
558///
559/// // Publish an event using public methods
560/// let did = "did:example:123".to_string();
561/// event_bus.publish_agent_registered(did).await;
562///
563/// // Process events from the channel
564/// tokio::spawn(async move {
565/// while let Ok(event) = receiver.recv().await {
566/// println!("Received event: {:?}", event);
567/// }
568/// });
569/// }
570/// ```
571pub struct EventBus {
572 /// Sender for events
573 sender: broadcast::Sender<NodeEvent>,
574 /// Subscribers
575 subscribers: RwLock<Vec<Arc<dyn EventSubscriber>>>,
576}
577
578impl Default for EventBus {
579 fn default() -> Self {
580 Self::new()
581 }
582}
583
584impl Clone for EventBus {
585 fn clone(&self) -> Self {
586 Self {
587 sender: self.sender.clone(),
588 subscribers: RwLock::new(Vec::new()),
589 }
590 }
591}
592
593impl EventBus {
594 /// Create a new event bus
595 pub fn new() -> Self {
596 // Create a channel with capacity for 100 events
597 let (sender, _) = broadcast::channel(100);
598
599 Self {
600 sender,
601 subscribers: RwLock::new(Vec::new()),
602 }
603 }
604
605 /// Subscribe to node events
606 pub async fn subscribe(&self, subscriber: Arc<dyn EventSubscriber>) {
607 let mut subscribers = self.subscribers.write().await;
608 subscribers.push(subscriber);
609 }
610
611 /// Get a receiver for node events
612 pub fn subscribe_channel(&self) -> broadcast::Receiver<NodeEvent> {
613 self.sender.subscribe()
614 }
615
616 /// Remove a subscriber from the event bus
617 pub async fn unsubscribe(&self, subscriber: &Arc<dyn EventSubscriber>) {
618 let mut subscribers = self.subscribers.write().await;
619 subscribers.retain(|s| !Arc::ptr_eq(s, subscriber));
620 }
621
622 /// Publish a message received event
623 pub async fn publish_message_received(&self, message: PlainMessage) {
624 let event = NodeEvent::PlainMessageReceived {
625 message: serde_json::to_value(message).unwrap(),
626 };
627 self.publish_event(event).await;
628 }
629
630 /// Publish a message sent event
631 pub async fn publish_message_sent(&self, message: PlainMessage, from: String, to: String) {
632 let event = NodeEvent::PlainMessageSent {
633 message: serde_json::to_value(message).unwrap(),
634 from,
635 to,
636 };
637 self.publish_event(event).await;
638 }
639
640 /// Publish an agent registered event
641 pub async fn publish_agent_registered(&self, did: String) {
642 let event = NodeEvent::AgentRegistered { did };
643 self.publish_event(event).await;
644 }
645
646 /// Publish an agent unregistered event
647 pub async fn publish_agent_unregistered(&self, did: String) {
648 let event = NodeEvent::AgentUnregistered { did };
649 self.publish_event(event).await;
650 }
651
652 /// Publish an agent message event
653 pub async fn publish_agent_message(&self, did: String, message: Vec<u8>) {
654 let event = NodeEvent::AgentPlainMessage { did, message };
655 self.publish_event(event).await;
656 }
657
658 /// Publish a DID resolved event
659 pub async fn publish_did_resolved(&self, did: String, success: bool) {
660 let event = NodeEvent::DidResolved { did, success };
661 self.publish_event(event).await;
662 }
663
664 /// Publish a message rejected event
665 pub async fn publish_message_rejected(
666 &self,
667 message_id: String,
668 reason: String,
669 from: String,
670 to: String,
671 ) {
672 let event = NodeEvent::MessageRejected {
673 message_id,
674 reason,
675 from,
676 to,
677 };
678 self.publish_event(event).await;
679 }
680
681 /// Publish a message accepted event
682 pub async fn publish_message_accepted(
683 &self,
684 message_id: String,
685 message_type: String,
686 from: String,
687 to: String,
688 ) {
689 let event = NodeEvent::MessageAccepted {
690 message_id,
691 message_type,
692 from,
693 to,
694 };
695 self.publish_event(event).await;
696 }
697
698 /// Publish a reply received event
699 pub async fn publish_reply_received(
700 &self,
701 original_message_id: String,
702 reply_message: PlainMessage,
703 original_message: PlainMessage,
704 ) {
705 let event = NodeEvent::ReplyReceived {
706 original_message_id,
707 reply_message,
708 original_message: Box::new(original_message),
709 };
710 self.publish_event(event).await;
711 }
712
713 /// Publish a transaction state changed event
714 pub async fn publish_transaction_state_changed(
715 &self,
716 transaction_id: String,
717 old_state: String,
718 new_state: String,
719 agent_did: Option<String>,
720 ) {
721 let event = NodeEvent::TransactionStateChanged {
722 transaction_id,
723 old_state,
724 new_state,
725 agent_did,
726 };
727 self.publish_event(event).await;
728 }
729
730 /// Publish a decision required event
731 pub async fn publish_decision_required(
732 &self,
733 transaction_id: String,
734 transaction_state: String,
735 decision: serde_json::Value,
736 pending_agents: Vec<String>,
737 ) {
738 let event = NodeEvent::DecisionRequired {
739 transaction_id,
740 transaction_state,
741 decision,
742 pending_agents,
743 };
744 self.publish_event(event).await;
745 }
746
747 /// Publish an event to all subscribers
748 pub async fn publish_event(&self, event: NodeEvent) {
749 // Send to channel
750 let _ = self.sender.send(event.clone());
751
752 // Notify subscribers
753 for subscriber in self.subscribers.read().await.iter() {
754 subscriber.handle_event(event.clone()).await;
755 }
756 }
757}