tap_node/event/
handlers.rs

//! Event handlers for updating message and transaction statuses
//!
//! This module provides event handlers that respond to message acceptance/rejection
//! and transaction state-change events by updating the corresponding database records,
//! plus an audit handler that writes those events to the log.
5
6use super::{EventSubscriber, NodeEvent};
7use crate::storage::Storage;
8use async_trait::async_trait;
9use std::sync::Arc;
10
11/// Event handler for updating message status in the database
12///
13/// This handler listens for MessageAccepted and MessageRejected events
14/// and updates the corresponding message status in the database.
15pub struct MessageStatusHandler {
16    storage: Arc<Storage>,
17}
18
19impl MessageStatusHandler {
20    /// Create a new message status handler
21    pub fn new(storage: Arc<Storage>) -> Self {
22        Self { storage }
23    }
24
25    /// Update message status in the database
26    async fn update_message_status(&self, message_id: &str, status: &str) {
27        if let Err(e) = self.storage.update_message_status(message_id, status).await {
28            log::error!("Failed to update message status for {}: {}", message_id, e);
29        }
30    }
31}
32
33#[async_trait]
34impl EventSubscriber for MessageStatusHandler {
35    async fn handle_event(&self, event: NodeEvent) {
36        match event {
37            NodeEvent::MessageAccepted { message_id, .. } => {
38                self.update_message_status(&message_id, "accepted").await;
39            }
40            NodeEvent::MessageRejected { message_id, .. } => {
41                self.update_message_status(&message_id, "rejected").await;
42            }
43            _ => {} // Ignore other events
44        }
45    }
46}
47
48/// Event handler for updating transaction state in the database
49///
50/// This handler listens for TransactionStateChanged events
51/// and updates the corresponding transaction status in the database.
52pub struct TransactionStateHandler {
53    storage: Arc<Storage>,
54}
55
56impl TransactionStateHandler {
57    /// Create a new transaction state handler
58    pub fn new(storage: Arc<Storage>) -> Self {
59        Self { storage }
60    }
61
62    /// Update transaction status in the database
63    async fn update_transaction_status(&self, transaction_id: &str, status: &str) {
64        if let Err(e) = self
65            .storage
66            .update_transaction_status(transaction_id, status)
67            .await
68        {
69            log::error!(
70                "Failed to update transaction status for {}: {}",
71                transaction_id,
72                e
73            );
74        }
75    }
76}
77
78#[async_trait]
79impl EventSubscriber for TransactionStateHandler {
80    async fn handle_event(&self, event: NodeEvent) {
81        if let NodeEvent::TransactionStateChanged {
82            transaction_id,
83            new_state,
84            ..
85        } = event
86        {
87            self.update_transaction_status(&transaction_id, &new_state)
88                .await;
89        }
90    }
91}
92
/// Event handler for logging transaction state transitions
///
/// This handler provides detailed logging of transaction state changes
/// for debugging and auditing purposes. It carries no state of its own,
/// so the common traits are derived rather than hand-written.
#[derive(Debug, Clone, Default)]
pub struct TransactionAuditHandler;

impl TransactionAuditHandler {
    /// Create a new transaction audit handler
    ///
    /// Equivalent to [`TransactionAuditHandler::default`].
    #[must_use]
    pub const fn new() -> Self {
        Self
    }
}
111
112#[async_trait]
113impl EventSubscriber for TransactionAuditHandler {
114    async fn handle_event(&self, event: NodeEvent) {
115        match event {
116            NodeEvent::TransactionStateChanged {
117                transaction_id,
118                old_state,
119                new_state,
120                agent_did,
121            } => match agent_did {
122                Some(did) => {
123                    log::info!(
124                        "Transaction {} state changed from '{}' to '{}' by agent {}",
125                        transaction_id,
126                        old_state,
127                        new_state,
128                        did
129                    );
130                }
131                None => {
132                    log::info!(
133                        "Transaction {} state changed from '{}' to '{}'",
134                        transaction_id,
135                        old_state,
136                        new_state
137                    );
138                }
139            },
140            NodeEvent::MessageAccepted {
141                message_id,
142                message_type,
143                from,
144                to,
145            } => {
146                log::info!(
147                    "Message {} of type {} accepted from {} to {}",
148                    message_id,
149                    message_type,
150                    from,
151                    to
152                );
153            }
154            NodeEvent::MessageRejected {
155                message_id,
156                reason,
157                from,
158                to,
159            } => {
160                log::warn!(
161                    "Message {} rejected from {} to {}: {}",
162                    message_id,
163                    from,
164                    to,
165                    reason
166                );
167            }
168            NodeEvent::ReplyReceived {
169                original_message_id,
170                ..
171            } => {
172                log::info!("Reply received for message {}", original_message_id);
173            }
174            _ => {} // Ignore other events
175        }
176    }
177}