tap_node/event/
decision_expiration_handler.rs1use super::{EventSubscriber, NodeEvent};
9use crate::state_machine::fsm::TransactionState;
10use crate::storage::Storage;
11use async_trait::async_trait;
12use std::sync::Arc;
13use tracing::{debug, error};
14
15pub struct DecisionExpirationHandler {
17 storage: Arc<Storage>,
18}
19
20impl DecisionExpirationHandler {
21 pub fn new(storage: Arc<Storage>) -> Self {
23 Self { storage }
24 }
25}
26
27#[async_trait]
28impl EventSubscriber for DecisionExpirationHandler {
29 async fn handle_event(&self, event: NodeEvent) {
30 if let NodeEvent::TransactionStateChanged {
31 transaction_id,
32 new_state,
33 ..
34 } = event
35 {
36 if let Ok(state) = new_state.parse::<TransactionState>() {
38 if state.is_terminal() {
39 debug!(
40 "Transaction {} reached terminal state {}, expiring pending decisions",
41 transaction_id, new_state
42 );
43 match self
44 .storage
45 .expire_decisions_for_transaction(&transaction_id)
46 .await
47 {
48 Ok(count) => {
49 if count > 0 {
50 debug!(
51 "Expired {} decisions for transaction {}",
52 count, transaction_id
53 );
54 }
55 }
56 Err(e) => {
57 error!(
58 "Failed to expire decisions for transaction {}: {}",
59 transaction_id, e
60 );
61 }
62 }
63 }
64 }
65 }
66 }
67}
68
69#[cfg(test)]
70mod tests {
71 use super::*;
72 use crate::storage::{DecisionStatus, DecisionType};
73 use serde_json::json;
74
75 #[tokio::test]
76 async fn test_expires_pending_decisions_on_terminal_state() {
77 let storage = Arc::new(Storage::new_in_memory().await.unwrap());
78 let handler = DecisionExpirationHandler::new(storage.clone());
79
80 let context = json!({"info": "test"});
81
82 let id = storage
84 .insert_decision(
85 "txn-100",
86 "did:key:z6MkAgent1",
87 DecisionType::AuthorizationRequired,
88 &context,
89 )
90 .await
91 .unwrap();
92
93 handler
95 .handle_event(NodeEvent::TransactionStateChanged {
96 transaction_id: "txn-100".to_string(),
97 old_state: "received".to_string(),
98 new_state: "rejected".to_string(),
99 agent_did: Some("did:key:z6MkOther".to_string()),
100 })
101 .await;
102
103 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
104 assert_eq!(entry.status, DecisionStatus::Expired);
105 }
106
107 #[tokio::test]
108 async fn test_expires_delivered_decisions_on_terminal_state() {
109 let storage = Arc::new(Storage::new_in_memory().await.unwrap());
110 let handler = DecisionExpirationHandler::new(storage.clone());
111
112 let context = json!({"info": "test"});
113
114 let id = storage
115 .insert_decision(
116 "txn-101",
117 "did:key:z6MkAgent1",
118 DecisionType::AuthorizationRequired,
119 &context,
120 )
121 .await
122 .unwrap();
123
124 storage
126 .update_decision_status(id, DecisionStatus::Delivered, None, None)
127 .await
128 .unwrap();
129
130 handler
132 .handle_event(NodeEvent::TransactionStateChanged {
133 transaction_id: "txn-101".to_string(),
134 old_state: "received".to_string(),
135 new_state: "cancelled".to_string(),
136 agent_did: None,
137 })
138 .await;
139
140 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
141 assert_eq!(entry.status, DecisionStatus::Expired);
142 }
143
144 #[tokio::test]
145 async fn test_does_not_expire_resolved_decisions() {
146 let storage = Arc::new(Storage::new_in_memory().await.unwrap());
147 let handler = DecisionExpirationHandler::new(storage.clone());
148
149 let context = json!({"info": "test"});
150
151 let id = storage
152 .insert_decision(
153 "txn-102",
154 "did:key:z6MkAgent1",
155 DecisionType::AuthorizationRequired,
156 &context,
157 )
158 .await
159 .unwrap();
160
161 storage
163 .update_decision_status(id, DecisionStatus::Resolved, Some("authorize"), None)
164 .await
165 .unwrap();
166
167 handler
169 .handle_event(NodeEvent::TransactionStateChanged {
170 transaction_id: "txn-102".to_string(),
171 old_state: "settled".to_string(),
172 new_state: "reverted".to_string(),
173 agent_did: None,
174 })
175 .await;
176
177 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
179 assert_eq!(entry.status, DecisionStatus::Resolved);
180 }
181
182 #[tokio::test]
183 async fn test_ignores_non_terminal_states() {
184 let storage = Arc::new(Storage::new_in_memory().await.unwrap());
185 let handler = DecisionExpirationHandler::new(storage.clone());
186
187 let context = json!({"info": "test"});
188
189 let id = storage
190 .insert_decision(
191 "txn-103",
192 "did:key:z6MkAgent1",
193 DecisionType::AuthorizationRequired,
194 &context,
195 )
196 .await
197 .unwrap();
198
199 handler
201 .handle_event(NodeEvent::TransactionStateChanged {
202 transaction_id: "txn-103".to_string(),
203 old_state: "received".to_string(),
204 new_state: "partially_authorized".to_string(),
205 agent_did: Some("did:key:z6MkAgent2".to_string()),
206 })
207 .await;
208
209 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
211 assert_eq!(entry.status, DecisionStatus::Pending);
212 }
213
214 #[tokio::test]
215 async fn test_ignores_non_state_change_events() {
216 let storage = Arc::new(Storage::new_in_memory().await.unwrap());
217 let handler = DecisionExpirationHandler::new(storage.clone());
218
219 let context = json!({"info": "test"});
220
221 let id = storage
222 .insert_decision(
223 "txn-104",
224 "did:key:z6MkAgent1",
225 DecisionType::AuthorizationRequired,
226 &context,
227 )
228 .await
229 .unwrap();
230
231 handler
233 .handle_event(NodeEvent::AgentRegistered {
234 did: "did:key:z6MkNew".to_string(),
235 })
236 .await;
237
238 let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
240 assert_eq!(entry.status, DecisionStatus::Pending);
241 }
242}