Skip to main content

tap_node/event/
decision_expiration_handler.rs

1//! Decision expiration handler
2//!
3//! Listens for `TransactionStateChanged` events and expires pending/delivered
4//! decisions when a transaction reaches a terminal state (Rejected, Cancelled,
5//! Reverted). This prevents external decision processes from acting on stale
6//! decisions after a transaction has already been resolved.
7
8use super::{EventSubscriber, NodeEvent};
9use crate::state_machine::fsm::TransactionState;
10use crate::storage::Storage;
11use async_trait::async_trait;
12use std::sync::Arc;
13use tracing::{debug, error};
14
15/// Expires pending decisions when transactions reach terminal states
16pub struct DecisionExpirationHandler {
17    storage: Arc<Storage>,
18}
19
20impl DecisionExpirationHandler {
21    /// Create a new decision expiration handler
22    pub fn new(storage: Arc<Storage>) -> Self {
23        Self { storage }
24    }
25}
26
27#[async_trait]
28impl EventSubscriber for DecisionExpirationHandler {
29    async fn handle_event(&self, event: NodeEvent) {
30        if let NodeEvent::TransactionStateChanged {
31            transaction_id,
32            new_state,
33            ..
34        } = event
35        {
36            // Parse the new state and check if it's terminal
37            if let Ok(state) = new_state.parse::<TransactionState>() {
38                if state.is_terminal() {
39                    debug!(
40                        "Transaction {} reached terminal state {}, expiring pending decisions",
41                        transaction_id, new_state
42                    );
43                    match self
44                        .storage
45                        .expire_decisions_for_transaction(&transaction_id)
46                        .await
47                    {
48                        Ok(count) => {
49                            if count > 0 {
50                                debug!(
51                                    "Expired {} decisions for transaction {}",
52                                    count, transaction_id
53                                );
54                            }
55                        }
56                        Err(e) => {
57                            error!(
58                                "Failed to expire decisions for transaction {}: {}",
59                                transaction_id, e
60                            );
61                        }
62                    }
63                }
64            }
65        }
66    }
67}
68
69#[cfg(test)]
70mod tests {
71    use super::*;
72    use crate::storage::{DecisionStatus, DecisionType};
73    use serde_json::json;
74
75    #[tokio::test]
76    async fn test_expires_pending_decisions_on_terminal_state() {
77        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
78        let handler = DecisionExpirationHandler::new(storage.clone());
79
80        let context = json!({"info": "test"});
81
82        // Insert a pending decision
83        let id = storage
84            .insert_decision(
85                "txn-100",
86                "did:key:z6MkAgent1",
87                DecisionType::AuthorizationRequired,
88                &context,
89            )
90            .await
91            .unwrap();
92
93        // Simulate transaction reaching Rejected state
94        handler
95            .handle_event(NodeEvent::TransactionStateChanged {
96                transaction_id: "txn-100".to_string(),
97                old_state: "received".to_string(),
98                new_state: "rejected".to_string(),
99                agent_did: Some("did:key:z6MkOther".to_string()),
100            })
101            .await;
102
103        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
104        assert_eq!(entry.status, DecisionStatus::Expired);
105    }
106
107    #[tokio::test]
108    async fn test_expires_delivered_decisions_on_terminal_state() {
109        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
110        let handler = DecisionExpirationHandler::new(storage.clone());
111
112        let context = json!({"info": "test"});
113
114        let id = storage
115            .insert_decision(
116                "txn-101",
117                "did:key:z6MkAgent1",
118                DecisionType::AuthorizationRequired,
119                &context,
120            )
121            .await
122            .unwrap();
123
124        // Mark as delivered
125        storage
126            .update_decision_status(id, DecisionStatus::Delivered, None, None)
127            .await
128            .unwrap();
129
130        // Transaction cancelled
131        handler
132            .handle_event(NodeEvent::TransactionStateChanged {
133                transaction_id: "txn-101".to_string(),
134                old_state: "received".to_string(),
135                new_state: "cancelled".to_string(),
136                agent_did: None,
137            })
138            .await;
139
140        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
141        assert_eq!(entry.status, DecisionStatus::Expired);
142    }
143
144    #[tokio::test]
145    async fn test_does_not_expire_resolved_decisions() {
146        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
147        let handler = DecisionExpirationHandler::new(storage.clone());
148
149        let context = json!({"info": "test"});
150
151        let id = storage
152            .insert_decision(
153                "txn-102",
154                "did:key:z6MkAgent1",
155                DecisionType::AuthorizationRequired,
156                &context,
157            )
158            .await
159            .unwrap();
160
161        // Resolve the decision
162        storage
163            .update_decision_status(id, DecisionStatus::Resolved, Some("authorize"), None)
164            .await
165            .unwrap();
166
167        // Transaction reverted
168        handler
169            .handle_event(NodeEvent::TransactionStateChanged {
170                transaction_id: "txn-102".to_string(),
171                old_state: "settled".to_string(),
172                new_state: "reverted".to_string(),
173                agent_did: None,
174            })
175            .await;
176
177        // Resolved decision should NOT be expired
178        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
179        assert_eq!(entry.status, DecisionStatus::Resolved);
180    }
181
182    #[tokio::test]
183    async fn test_ignores_non_terminal_states() {
184        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
185        let handler = DecisionExpirationHandler::new(storage.clone());
186
187        let context = json!({"info": "test"});
188
189        let id = storage
190            .insert_decision(
191                "txn-103",
192                "did:key:z6MkAgent1",
193                DecisionType::AuthorizationRequired,
194                &context,
195            )
196            .await
197            .unwrap();
198
199        // Transition to non-terminal state
200        handler
201            .handle_event(NodeEvent::TransactionStateChanged {
202                transaction_id: "txn-103".to_string(),
203                old_state: "received".to_string(),
204                new_state: "partially_authorized".to_string(),
205                agent_did: Some("did:key:z6MkAgent2".to_string()),
206            })
207            .await;
208
209        // Decision should still be pending
210        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
211        assert_eq!(entry.status, DecisionStatus::Pending);
212    }
213
214    #[tokio::test]
215    async fn test_ignores_non_state_change_events() {
216        let storage = Arc::new(Storage::new_in_memory().await.unwrap());
217        let handler = DecisionExpirationHandler::new(storage.clone());
218
219        let context = json!({"info": "test"});
220
221        let id = storage
222            .insert_decision(
223                "txn-104",
224                "did:key:z6MkAgent1",
225                DecisionType::AuthorizationRequired,
226                &context,
227            )
228            .await
229            .unwrap();
230
231        // Send a completely different event
232        handler
233            .handle_event(NodeEvent::AgentRegistered {
234                did: "did:key:z6MkNew".to_string(),
235            })
236            .await;
237
238        // Decision should still be pending
239        let entry = storage.get_decision_by_id(id).await.unwrap().unwrap();
240        assert_eq!(entry.status, DecisionStatus::Pending);
241    }
242}