1use super::{EventSubscriber, NodeEvent};
10use crate::state_machine::fsm::TransactionState;
11use crate::storage::{DecisionType, Storage};
12use async_trait::async_trait;
13use std::sync::Arc;
14use tracing::{debug, error};
15
/// Event subscriber that updates stored decisions in response to transaction
/// state changes.
pub struct DecisionStateHandler {
    /// Shared storage handle used to expire and resolve decisions.
    storage: Arc<Storage>,
}
20
21impl DecisionStateHandler {
22 pub fn new(storage: Arc<Storage>) -> Self {
24 Self { storage }
25 }
26}
27
28#[async_trait]
29impl EventSubscriber for DecisionStateHandler {
30 async fn handle_event(&self, event: NodeEvent) {
31 if let NodeEvent::TransactionStateChanged {
32 transaction_id,
33 new_state,
34 ..
35 } = event
36 {
37 let state = match new_state.parse::<TransactionState>() {
38 Ok(s) => s,
39 Err(_) => return,
40 };
41
42 if state.is_terminal() {
43 debug!(
45 "Transaction {} reached terminal state {}, expiring pending decisions",
46 transaction_id, new_state
47 );
48 match self
49 .storage
50 .expire_decisions_for_transaction(&transaction_id)
51 .await
52 {
53 Ok(count) => {
54 if count > 0 {
55 debug!(
56 "Expired {} decisions for transaction {}",
57 count, transaction_id
58 );
59 }
60 }
61 Err(e) => {
62 error!(
63 "Failed to expire decisions for transaction {}: {}",
64 transaction_id, e
65 );
66 }
67 }
68 } else {
69 let resolution = match state {
71 TransactionState::PartiallyAuthorized | TransactionState::ReadyToSettle => {
72 Some(("authorize", Some(DecisionType::AuthorizationRequired)))
73 }
74 TransactionState::Settled => {
75 Some(("settle", Some(DecisionType::SettlementRequired)))
76 }
77 _ => None,
78 };
79
80 if let Some((action, decision_type)) = resolution {
81 debug!(
82 "Transaction {} reached state {}, resolving {} decisions",
83 transaction_id, new_state, action
84 );
85 match self
86 .storage
87 .resolve_decisions_for_transaction(&transaction_id, action, decision_type)
88 .await
89 {
90 Ok(count) => {
91 if count > 0 {
92 debug!(
93 "Resolved {} decisions for transaction {} with action: {}",
94 count, transaction_id, action
95 );
96 }
97 }
98 Err(e) => {
99 error!(
100 "Failed to resolve decisions for transaction {}: {}",
101 transaction_id, e
102 );
103 }
104 }
105 }
106 }
107 }
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::DecisionStatus;
    use serde_json::json;

    /// Creates a fresh in-memory store and a handler that shares it.
    async fn setup() -> (Arc<Storage>, DecisionStateHandler) {
        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
        let handler = DecisionStateHandler::new(storage.clone());
        (storage, handler)
    }

    /// Builds a `TransactionStateChanged` event from borrowed strings.
    fn state_changed(
        transaction_id: &str,
        old_state: &str,
        new_state: &str,
        agent_did: Option<&str>,
    ) -> NodeEvent {
        NodeEvent::TransactionStateChanged {
            transaction_id: transaction_id.to_string(),
            old_state: old_state.to_string(),
            new_state: new_state.to_string(),
            agent_did: agent_did.map(String::from),
        }
    }

    /// A pending decision is expired once its transaction is rejected.
    #[tokio::test]
    async fn test_expires_pending_decisions_on_terminal_state() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-100",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(state_changed(
                "txn-100",
                "received",
                "rejected",
                Some("did:key:z6MkOther"),
            ))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Expired);
    }

    /// A delivered decision is expired once its transaction is cancelled.
    #[tokio::test]
    async fn test_expires_delivered_decisions_on_terminal_state() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-101",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        storage
            .update_decision_status(id, DecisionStatus::Delivered, None, None)
            .await
            .unwrap();

        handler
            .handle_event(state_changed("txn-101", "received", "cancelled", None))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Expired);
    }

    /// An already-resolved decision keeps its status when the transaction
    /// later reaches a terminal state.
    #[tokio::test]
    async fn test_does_not_expire_resolved_decisions() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-102",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        storage
            .update_decision_status(id, DecisionStatus::Resolved, Some("authorize"), None)
            .await
            .unwrap();

        handler
            .handle_event(state_changed("txn-102", "settled", "reverted", None))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Resolved);
    }

    /// Reaching `ready_to_settle` resolves authorization decisions with the
    /// "authorize" action.
    #[tokio::test]
    async fn test_resolves_authorization_on_ready_to_settle_state() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-200",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(state_changed(
                "txn-200",
                "received",
                "ready_to_settle",
                Some("did:key:z6MkAgent1"),
            ))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Resolved);
        assert_eq!(entry.resolution.as_deref(), Some("authorize"));
    }

    /// Reaching `settled` resolves settlement decisions with the "settle"
    /// action.
    #[tokio::test]
    async fn test_resolves_settlement_on_settled_state() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-201",
                "did:key:z6MkAgent1",
                DecisionType::SettlementRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(state_changed(
                "txn-201",
                "ready_to_settle",
                "settled",
                Some("did:key:z6MkAgent1"),
            ))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Resolved);
        assert_eq!(entry.resolution.as_deref(), Some("settle"));
    }

    /// A settlement decision is left pending when only the authorization
    /// milestone (`ready_to_settle`) has been reached.
    #[tokio::test]
    async fn test_does_not_resolve_unrelated_decision_types() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-202",
                "did:key:z6MkAgent1",
                DecisionType::SettlementRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(state_changed(
                "txn-202",
                "received",
                "ready_to_settle",
                Some("did:key:z6MkAgent1"),
            ))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Pending);
    }

    /// Events that are not transaction state changes leave decisions alone.
    #[tokio::test]
    async fn test_ignores_non_state_change_events() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-203",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(NodeEvent::AgentRegistered {
                did: "did:key:z6MkNew".to_string(),
            })
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Pending);
    }

    /// A state string that is not a known transaction state must neither
    /// panic nor touch the transaction's decisions.
    #[tokio::test]
    async fn test_ignores_unrecognized_state() {
        let (storage, handler) = setup().await;
        let context = json!({"info": "test"});

        let id = storage
            .insert_decision(
                "txn-204",
                "did:key:z6MkAgent1",
                DecisionType::AuthorizationRequired,
                &context,
            )
            .await
            .unwrap();

        handler
            .handle_event(state_changed(
                "txn-204",
                "received",
                "not_a_real_state",
                None,
            ))
            .await;

        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
        assert_eq!(entry.status, DecisionStatus::Pending);
    }
}