Skip to main content

tap_node/event/
decision_state_handler.rs

1//! Decision state handler
2//!
3//! Listens for `TransactionStateChanged` events and manages decision lifecycle:
4//! - Expires pending/delivered decisions when a transaction reaches a terminal state
5//!   (Rejected, Cancelled, Reverted)
6//! - Resolves pending/delivered decisions when the corresponding action is observed
7//!   (e.g., authorization_required decisions resolved when state becomes Authorized)
8
9use super::{EventSubscriber, NodeEvent};
10use crate::state_machine::fsm::TransactionState;
11use crate::storage::{DecisionType, Storage};
12use async_trait::async_trait;
13use std::sync::Arc;
14use tracing::{debug, error};
15
16/// Manages decision lifecycle based on transaction state changes
17pub struct DecisionStateHandler {
18    storage: Arc<Storage>,
19}
20
21impl DecisionStateHandler {
22    /// Create a new decision state handler
23    pub fn new(storage: Arc<Storage>) -> Self {
24        Self { storage }
25    }
26}
27
28#[async_trait]
29impl EventSubscriber for DecisionStateHandler {
30    async fn handle_event(&self, event: NodeEvent) {
31        if let NodeEvent::TransactionStateChanged {
32            transaction_id,
33            new_state,
34            ..
35        } = event
36        {
37            let state = match new_state.parse::<TransactionState>() {
38                Ok(s) => s,
39                Err(_) => return,
40            };
41
42            if state.is_terminal() {
43                // Terminal states: expire all pending/delivered decisions
44                debug!(
45                    "Transaction {} reached terminal state {}, expiring pending decisions",
46                    transaction_id, new_state
47                );
48                match self
49                    .storage
50                    .expire_decisions_for_transaction(&transaction_id)
51                    .await
52                {
53                    Ok(count) => {
54                        if count > 0 {
55                            debug!(
56                                "Expired {} decisions for transaction {}",
57                                count, transaction_id
58                            );
59                        }
60                    }
61                    Err(e) => {
62                        error!(
63                            "Failed to expire decisions for transaction {}: {}",
64                            transaction_id, e
65                        );
66                    }
67                }
68            } else {
69                // Non-terminal state changes: resolve matching decisions
70                let resolution = match state {
71                    TransactionState::PartiallyAuthorized | TransactionState::ReadyToSettle => {
72                        Some(("authorize", Some(DecisionType::AuthorizationRequired)))
73                    }
74                    TransactionState::Settled => {
75                        Some(("settle", Some(DecisionType::SettlementRequired)))
76                    }
77                    _ => None,
78                };
79
80                if let Some((action, decision_type)) = resolution {
81                    debug!(
82                        "Transaction {} reached state {}, resolving {} decisions",
83                        transaction_id, new_state, action
84                    );
85                    match self
86                        .storage
87                        .resolve_decisions_for_transaction(&transaction_id, action, decision_type)
88                        .await
89                    {
90                        Ok(count) => {
91                            if count > 0 {
92                                debug!(
93                                    "Resolved {} decisions for transaction {} with action: {}",
94                                    count, transaction_id, action
95                                );
96                            }
97                        }
98                        Err(e) => {
99                            error!(
100                                "Failed to resolve decisions for transaction {}: {}",
101                                transaction_id, e
102                            );
103                        }
104                    }
105                }
106            }
107        }
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use super::*;
114    use crate::storage::DecisionStatus;
115    use serde_json::json;
116
117    #[tokio::test]
118    async fn test_expires_pending_decisions_on_terminal_state() {
119        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
120        let handler = DecisionStateHandler::new(storage.clone());
121
122        let context = json!({"info": "test"});
123
124        let id = storage
125            .insert_decision(
126                "txn-100",
127                "did:key:z6MkAgent1",
128                DecisionType::AuthorizationRequired,
129                &context,
130            )
131            .await
132            .unwrap();
133
134        handler
135            .handle_event(NodeEvent::TransactionStateChanged {
136                transaction_id: "txn-100".to_string(),
137                old_state: "received".to_string(),
138                new_state: "rejected".to_string(),
139                agent_did: Some("did:key:z6MkOther".to_string()),
140            })
141            .await;
142
143        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
144        assert_eq!(entry.status, DecisionStatus::Expired);
145    }
146
147    #[tokio::test]
148    async fn test_expires_delivered_decisions_on_terminal_state() {
149        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
150        let handler = DecisionStateHandler::new(storage.clone());
151
152        let context = json!({"info": "test"});
153
154        let id = storage
155            .insert_decision(
156                "txn-101",
157                "did:key:z6MkAgent1",
158                DecisionType::AuthorizationRequired,
159                &context,
160            )
161            .await
162            .unwrap();
163
164        storage
165            .update_decision_status(id, DecisionStatus::Delivered, None, None)
166            .await
167            .unwrap();
168
169        handler
170            .handle_event(NodeEvent::TransactionStateChanged {
171                transaction_id: "txn-101".to_string(),
172                old_state: "received".to_string(),
173                new_state: "cancelled".to_string(),
174                agent_did: None,
175            })
176            .await;
177
178        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
179        assert_eq!(entry.status, DecisionStatus::Expired);
180    }
181
182    #[tokio::test]
183    async fn test_does_not_expire_resolved_decisions() {
184        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
185        let handler = DecisionStateHandler::new(storage.clone());
186
187        let context = json!({"info": "test"});
188
189        let id = storage
190            .insert_decision(
191                "txn-102",
192                "did:key:z6MkAgent1",
193                DecisionType::AuthorizationRequired,
194                &context,
195            )
196            .await
197            .unwrap();
198
199        storage
200            .update_decision_status(id, DecisionStatus::Resolved, Some("authorize"), None)
201            .await
202            .unwrap();
203
204        handler
205            .handle_event(NodeEvent::TransactionStateChanged {
206                transaction_id: "txn-102".to_string(),
207                old_state: "settled".to_string(),
208                new_state: "reverted".to_string(),
209                agent_did: None,
210            })
211            .await;
212
213        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
214        assert_eq!(entry.status, DecisionStatus::Resolved);
215    }
216
217    #[tokio::test]
218    async fn test_resolves_authorization_on_ready_to_settle_state() {
219        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
220        let handler = DecisionStateHandler::new(storage.clone());
221
222        let context = json!({"info": "test"});
223
224        let id = storage
225            .insert_decision(
226                "txn-200",
227                "did:key:z6MkAgent1",
228                DecisionType::AuthorizationRequired,
229                &context,
230            )
231            .await
232            .unwrap();
233
234        handler
235            .handle_event(NodeEvent::TransactionStateChanged {
236                transaction_id: "txn-200".to_string(),
237                old_state: "received".to_string(),
238                new_state: "ready_to_settle".to_string(),
239                agent_did: Some("did:key:z6MkAgent1".to_string()),
240            })
241            .await;
242
243        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
244        assert_eq!(entry.status, DecisionStatus::Resolved);
245        assert_eq!(entry.resolution.as_deref(), Some("authorize"));
246    }
247
248    #[tokio::test]
249    async fn test_resolves_settlement_on_settled_state() {
250        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
251        let handler = DecisionStateHandler::new(storage.clone());
252
253        let context = json!({"info": "test"});
254
255        let id = storage
256            .insert_decision(
257                "txn-201",
258                "did:key:z6MkAgent1",
259                DecisionType::SettlementRequired,
260                &context,
261            )
262            .await
263            .unwrap();
264
265        handler
266            .handle_event(NodeEvent::TransactionStateChanged {
267                transaction_id: "txn-201".to_string(),
268                old_state: "ready_to_settle".to_string(),
269                new_state: "settled".to_string(),
270                agent_did: Some("did:key:z6MkAgent1".to_string()),
271            })
272            .await;
273
274        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
275        assert_eq!(entry.status, DecisionStatus::Resolved);
276        assert_eq!(entry.resolution.as_deref(), Some("settle"));
277    }
278
279    #[tokio::test]
280    async fn test_does_not_resolve_unrelated_decision_types() {
281        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
282        let handler = DecisionStateHandler::new(storage.clone());
283
284        let context = json!({"info": "test"});
285
286        // Insert a settlement decision, but trigger a ready_to_settle state
287        let id = storage
288            .insert_decision(
289                "txn-202",
290                "did:key:z6MkAgent1",
291                DecisionType::SettlementRequired,
292                &context,
293            )
294            .await
295            .unwrap();
296
297        handler
298            .handle_event(NodeEvent::TransactionStateChanged {
299                transaction_id: "txn-202".to_string(),
300                old_state: "received".to_string(),
301                new_state: "ready_to_settle".to_string(),
302                agent_did: Some("did:key:z6MkAgent1".to_string()),
303            })
304            .await;
305
306        // Settlement decision should still be pending (authorize resolves auth decisions, not settlement)
307        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
308        assert_eq!(entry.status, DecisionStatus::Pending);
309    }
310
311    #[tokio::test]
312    async fn test_ignores_non_state_change_events() {
313        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
314        let handler = DecisionStateHandler::new(storage.clone());
315
316        let context = json!({"info": "test"});
317
318        let id = storage
319            .insert_decision(
320                "txn-203",
321                "did:key:z6MkAgent1",
322                DecisionType::AuthorizationRequired,
323                &context,
324            )
325            .await
326            .unwrap();
327
328        handler
329            .handle_event(NodeEvent::AgentRegistered {
330                did: "did:key:z6MkNew".to_string(),
331            })
332            .await;
333
334        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
335        assert_eq!(entry.status, DecisionStatus::Pending);
336    }
337}