Skip to main content

tap_node/state_machine/
mod.rs

1//! Transaction state machine for TAP Node
2//!
3//! This module implements a state machine for managing transaction lifecycle,
4//! including automatic state transitions and Settle message generation.
5//!
6//! ## Sub-modules
7//!
8//! - [`fsm`]: Formal finite state machine with explicit states, transitions,
9//!   and decision points for the full transaction lifecycle.
10
11pub mod fsm;
12
13use crate::agent::AgentRegistry;
14use crate::error::{Error, Result};
15use crate::event::EventBus;
16use crate::storage::Storage;
17use async_trait::async_trait;
18use dashmap::DashMap;
19use fsm::{
20    AutoApproveHandler, Decision, DecisionHandler, DecisionMode, FsmEvent, LogOnlyHandler,
21    TransactionContext, TransactionFsm,
22};
23use std::sync::Arc;
24use tap_agent::Agent;
25use tap_msg::didcomm::PlainMessage;
26use tap_msg::message::TapMessage;
27
/// Trait for processing transaction state changes.
///
/// Implementors must be `Send + Sync` so a processor can be shared across
/// threads and tasks.
#[async_trait]
pub trait TransactionStateProcessor: Send + Sync {
    /// Process an incoming message and update transaction state.
    ///
    /// # Errors
    ///
    /// Returns the crate-level error type when the implementor cannot
    /// process `message`; what counts as a failure is implementor-defined.
    async fn process_message(&self, message: &PlainMessage) -> Result<()>;
}
34
/// Standard transaction state processor
///
/// Routes incoming TAP messages through the FSM and delegates decisions
/// to the configured [`DecisionHandler`].
pub struct StandardTransactionProcessor {
    /// Storage used to record transactions, their agents and status updates.
    storage: Arc<Storage>,
    /// Event bus on which state changes and required decisions are published.
    event_bus: Arc<EventBus>,
    /// Registry of agents; used to list agent DIDs and to obtain the agent
    /// that sends Authorize/Settle messages.
    agents: Arc<AgentRegistry>,
    /// In-memory FSM contexts keyed by transaction ID.
    contexts: DashMap<String, TransactionContext>,
    /// Handler for FSM decision points.
    decision_handler: Arc<dyn DecisionHandler>,
    /// Whether to auto-act on decisions (send Authorize/Settle messages).
    /// Set to `true` only when constructed with `DecisionMode::AutoApprove`.
    auto_act: bool,
}
50
51impl StandardTransactionProcessor {
52    /// Create a new standard transaction processor with the given decision mode.
53    pub fn new(
54        storage: Arc<Storage>,
55        event_bus: Arc<EventBus>,
56        agents: Arc<AgentRegistry>,
57        decision_mode: DecisionMode,
58    ) -> Self {
59        let (decision_handler, auto_act): (Arc<dyn DecisionHandler>, bool) = match decision_mode {
60            DecisionMode::AutoApprove => (Arc::new(AutoApproveHandler), true),
61            DecisionMode::EventBus => (Arc::new(LogOnlyHandler), false),
62            DecisionMode::Custom(handler) => (handler, false),
63        };
64
65        Self {
66            storage,
67            event_bus,
68            agents,
69            contexts: DashMap::new(),
70            decision_handler,
71            auto_act,
72        }
73    }
74
75    /// Extract agents from a Transfer or Payment message.
76    /// Returns (agent_did, role) pairs for agents only (not primary parties).
77    fn extract_agents_from_tap_message(tap_message: &TapMessage) -> Vec<(String, String)> {
78        let agents_list = match tap_message {
79            TapMessage::Transfer(t) => &t.agents,
80            TapMessage::Payment(p) => &p.agents,
81            _ => return Vec::new(),
82        };
83        agents_list
84            .iter()
85            .map(|a| {
86                let role = match a.role.as_deref() {
87                    Some("compliance") => "compliance",
88                    _ => "other",
89                };
90                (a.id.clone(), role.to_string())
91            })
92            .collect()
93    }
94
95    /// Get or create the FSM context for a transaction.
96    fn get_or_create_context(
97        &self,
98        transaction_id: &str,
99        agent_dids: Vec<String>,
100    ) -> TransactionContext {
101        self.contexts
102            .entry(transaction_id.to_string())
103            .or_insert_with(|| TransactionContext::new(transaction_id.to_string(), agent_dids))
104            .clone()
105    }
106
107    /// Persist the FSM context back to the in-memory map.
108    fn save_context(&self, ctx: &TransactionContext) {
109        self.contexts
110            .insert(ctx.transaction_id.clone(), ctx.clone());
111    }
112
113    /// Convert a TapMessage + PlainMessage into an FsmEvent.
114    fn to_fsm_event(tap_message: &TapMessage, plain: &PlainMessage) -> Option<FsmEvent> {
115        match tap_message {
116            TapMessage::Transfer(_) | TapMessage::Payment(_) => {
117                let agent_dids: Vec<String> = Self::extract_agents_from_tap_message(tap_message)
118                    .into_iter()
119                    .map(|(did, _)| did)
120                    .collect();
121                Some(FsmEvent::TransactionReceived { agent_dids })
122            }
123            TapMessage::Authorize(auth) => Some(FsmEvent::AuthorizeReceived {
124                agent_did: plain.from.clone(),
125                settlement_address: auth.settlement_address.clone(),
126                expiry: auth.expiry.clone(),
127            }),
128            TapMessage::Reject(reject) => Some(FsmEvent::RejectReceived {
129                agent_did: plain.from.clone(),
130                reason: reject.reason.clone(),
131            }),
132            TapMessage::Cancel(cancel) => Some(FsmEvent::CancelReceived {
133                by_did: plain.from.clone(),
134                reason: cancel.reason.clone(),
135            }),
136            TapMessage::Settle(settle) => Some(FsmEvent::SettleReceived {
137                settlement_id: settle.settlement_id.clone(),
138                amount: settle.amount.clone(),
139            }),
140            TapMessage::Revert(revert) => Some(FsmEvent::RevertReceived {
141                by_did: plain.from.clone(),
142                reason: revert.reason.clone(),
143            }),
144            TapMessage::AddAgents(add) => Some(FsmEvent::AgentsAdded {
145                agent_dids: add.agents.iter().map(|a| a.id.clone()).collect(),
146            }),
147            TapMessage::UpdatePolicies(_) => Some(FsmEvent::PoliciesReceived {
148                from_did: plain.from.clone(),
149            }),
150            TapMessage::Presentation(_) => Some(FsmEvent::PresentationReceived {
151                from_did: plain.from.clone(),
152            }),
153            _ => None,
154        }
155    }
156
157    /// Get the transaction_id that a TAP message references.
158    fn transaction_id_for(tap_message: &TapMessage, plain: &PlainMessage) -> String {
159        match tap_message {
160            TapMessage::Transfer(_) | TapMessage::Payment(_) => plain.id.clone(),
161            TapMessage::Authorize(a) => a.transaction_id.clone(),
162            TapMessage::Reject(r) => r.transaction_id.clone(),
163            TapMessage::Cancel(c) => c.transaction_id.clone(),
164            TapMessage::Settle(s) => s.transaction_id.clone(),
165            TapMessage::Revert(r) => r.transaction_id.clone(),
166            TapMessage::AddAgents(a) => a.transaction_id.clone(),
167            TapMessage::UpdatePolicies(u) => u.transaction_id.clone(),
168            // Presentation uses pthid/thid for threading
169            _ => plain.thid.clone().unwrap_or_default(),
170        }
171    }
172
173    /// Publish a decision to the event bus.
174    async fn publish_decision(&self, ctx: &TransactionContext, decision: &Decision) {
175        let decision_json = serde_json::to_value(decision).unwrap_or_default();
176        self.event_bus
177            .publish_decision_required(
178                ctx.transaction_id.clone(),
179                ctx.state.to_string(),
180                decision_json,
181                ctx.pending_agents(),
182            )
183            .await;
184    }
185
186    // ---- Auto-act methods (only called in AutoApprove mode) ----
187
188    /// Automatically send Authorize for our registered agents.
189    async fn auto_authorize_transaction(&self, message: &PlainMessage) -> Result<()> {
190        let tap_message = TapMessage::from_plain_message(message)
191            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
192
193        let transaction_id = match &tap_message {
194            TapMessage::Transfer(transfer) => &transfer.transaction_id,
195            TapMessage::Payment(payment) => &payment.transaction_id,
196            _ => return Ok(()),
197        };
198
199        let our_agents = self.agents.get_all_dids();
200        let transaction_agents = Self::extract_agents_from_tap_message(&tap_message);
201
202        for (agent_did, _role) in transaction_agents {
203            if our_agents.contains(&agent_did) {
204                if let Ok(agent) = self.agents.get_agent(&agent_did).await {
205                    use tap_msg::message::tap_message_trait::Authorizable;
206                    let authorize_message = match &tap_message {
207                        TapMessage::Transfer(transfer) => {
208                            transfer.authorize(&agent_did, None, None)
209                        }
210                        TapMessage::Payment(payment) => payment.authorize(&agent_did, None, None),
211                        _ => continue,
212                    };
213
214                    let auth_body = authorize_message.body;
215                    let recipients_list = vec![message.from.as_str()];
216
217                    log::info!(
218                        "Auto-authorizing transaction {:?} from agent {}",
219                        transaction_id,
220                        agent_did
221                    );
222
223                    if let Err(e) = agent.send_message(&auth_body, recipients_list, true).await {
224                        log::warn!(
225                            "Failed to send auto-Authorize for transaction {:?} from agent {}: {}",
226                            transaction_id,
227                            agent_did,
228                            e
229                        );
230                    }
231                }
232            }
233        }
234
235        Ok(())
236    }
237
238    /// Check if all agents authorized and send Settle if so.
239    async fn check_and_send_settle(&self, transaction_id: &str) -> Result<()> {
240        let transaction = self
241            .storage
242            .get_transaction_by_id(transaction_id)
243            .await
244            .map_err(|e| Error::Storage(e.to_string()))?
245            .ok_or_else(|| Error::Storage(format!("Transaction {} not found", transaction_id)))?;
246
247        let our_agents = self.agents.get_all_dids();
248        let is_sender = transaction
249            .from_did
250            .as_ref()
251            .map(|did| our_agents.contains(did))
252            .unwrap_or(false);
253
254        if !is_sender {
255            return Ok(());
256        }
257
258        let all_authorized = self
259            .storage
260            .are_all_agents_authorized(transaction_id)
261            .await
262            .map_err(|e| Error::Storage(e.to_string()))?;
263
264        if !all_authorized {
265            return Ok(());
266        }
267
268        if transaction.status.to_string() == "confirmed" {
269            return Ok(());
270        }
271
272        log::info!(
273            "All agents authorized for transaction {}, sending Settle message",
274            transaction_id
275        );
276
277        let sender_did = transaction
278            .from_did
279            .as_ref()
280            .ok_or_else(|| Error::Processing("Transaction missing from_did".to_string()))?;
281
282        let agent = self
283            .agents
284            .get_agent(sender_did)
285            .await
286            .map_err(|e| Error::Agent(e.to_string()))?;
287
288        let settlement_id = format!("settle_{}", transaction_id);
289
290        let transaction_message: PlainMessage =
291            serde_json::from_value(transaction.message_json.clone()).map_err(|e| {
292                Error::Serialization(format!("Failed to parse transaction message: {}", e))
293            })?;
294
295        let tap_message = TapMessage::from_plain_message(&transaction_message)
296            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
297
298        use tap_msg::message::tap_message_trait::Transaction;
299
300        let agents = Self::extract_agents_from_tap_message(&tap_message);
301
302        if agents.is_empty() {
303            log::debug!(
304                "No agents to send Settle message to for transaction {}",
305                transaction_id
306            );
307            return Ok(());
308        }
309
310        for (agent_did, _role) in agents {
311            if agent_did != *sender_did {
312                let settle_message = match &tap_message {
313                    TapMessage::Transfer(transfer) => {
314                        transfer.settle(sender_did, &settlement_id, None)
315                    }
316                    TapMessage::Payment(payment) => {
317                        payment.settle(sender_did, &settlement_id, None)
318                    }
319                    _ => {
320                        log::warn!(
321                            "Unexpected message type for settlement: {}",
322                            transaction.message_type
323                        );
324                        continue;
325                    }
326                };
327
328                let settle_body = settle_message.body;
329                let recipients_list = vec![agent_did.as_str()];
330                let _ = agent
331                    .send_message(&settle_body, recipients_list, true)
332                    .await
333                    .map_err(|e| {
334                        log::warn!("Failed to send Settle to {}: {}", agent_did, e);
335                        e
336                    });
337            }
338        }
339
340        self.storage
341            .update_transaction_status(transaction_id, "confirmed")
342            .await
343            .map_err(|e| Error::Storage(e.to_string()))?;
344
345        self.event_bus
346            .publish_transaction_state_changed(
347                transaction_id.to_string(),
348                "pending".to_string(),
349                "confirmed".to_string(),
350                Some(sender_did.clone()),
351            )
352            .await;
353
354        Ok(())
355    }
356}
357
358#[async_trait]
359impl TransactionStateProcessor for StandardTransactionProcessor {
360    async fn process_message(&self, message: &PlainMessage) -> Result<()> {
361        let tap_message = TapMessage::from_plain_message(message)
362            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
363
364        let transaction_id = Self::transaction_id_for(&tap_message, message);
365
366        // Convert message to FSM event
367        let fsm_event = Self::to_fsm_event(&tap_message, message);
368
369        // --- Storage operations (always run regardless of decision mode) ---
370        match &tap_message {
371            TapMessage::Transfer(_) | TapMessage::Payment(_) => {
372                if let Err(e) = self.storage.insert_transaction(message).await {
373                    log::warn!("Failed to insert transaction {}: {}", transaction_id, e);
374                }
375                let agents = Self::extract_agents_from_tap_message(&tap_message);
376                for (agent_did, role) in &agents {
377                    if let Err(e) = self
378                        .storage
379                        .insert_transaction_agent(&transaction_id, agent_did, role)
380                        .await
381                    {
382                        log::warn!(
383                            "Failed to insert agent {} for transaction {}: {}",
384                            agent_did,
385                            transaction_id,
386                            e
387                        );
388                    }
389                }
390            }
391            TapMessage::Authorize(_) => {
392                if let Err(e) = self
393                    .storage
394                    .update_transaction_agent_status(&transaction_id, &message.from, "authorized")
395                    .await
396                {
397                    log::warn!(
398                        "Failed to update agent {} status for transaction {}: {}",
399                        message.from,
400                        transaction_id,
401                        e
402                    );
403                }
404            }
405            TapMessage::Reject(_) => {
406                let _ = self
407                    .storage
408                    .update_transaction_agent_status(&transaction_id, &message.from, "rejected")
409                    .await;
410                let _ = self
411                    .storage
412                    .update_transaction_status(&transaction_id, "failed")
413                    .await;
414            }
415            TapMessage::Cancel(_) => {
416                let _ = self
417                    .storage
418                    .update_transaction_agent_status(&transaction_id, &message.from, "cancelled")
419                    .await;
420                let _ = self
421                    .storage
422                    .update_transaction_status(&transaction_id, "cancelled")
423                    .await;
424            }
425            TapMessage::Settle(_) => {
426                let _ = self
427                    .storage
428                    .update_transaction_status(&transaction_id, "confirmed")
429                    .await;
430            }
431            TapMessage::Revert(_) => {
432                let _ = self
433                    .storage
434                    .update_transaction_status(&transaction_id, "reverted")
435                    .await;
436            }
437            TapMessage::AddAgents(add) => {
438                for agent in &add.agents {
439                    let role_str = match agent.role.as_deref() {
440                        Some("compliance") => "compliance",
441                        _ => "other",
442                    };
443                    let _ = self
444                        .storage
445                        .insert_transaction_agent(&transaction_id, &agent.id, role_str)
446                        .await;
447                }
448            }
449            _ => {}
450        }
451
452        // --- FSM transition ---
453        if let Some(event) = fsm_event {
454            let agent_dids: Vec<String> = Self::extract_agents_from_tap_message(&tap_message)
455                .into_iter()
456                .map(|(did, _)| did)
457                .collect();
458
459            let mut ctx = self.get_or_create_context(&transaction_id, agent_dids);
460            let old_state = ctx.state.to_string();
461
462            match TransactionFsm::apply(&mut ctx, event) {
463                Ok(transition) => {
464                    let new_state = transition.to_state.to_string();
465
466                    // Persist FSM context
467                    self.save_context(&ctx);
468
469                    // Publish state change if it actually changed
470                    if old_state != new_state {
471                        self.event_bus
472                            .publish_transaction_state_changed(
473                                transaction_id.clone(),
474                                old_state,
475                                new_state,
476                                Some(message.from.clone()),
477                            )
478                            .await;
479                    }
480
481                    // Handle decision if one was produced
482                    if let Some(ref decision) = transition.decision {
483                        // Always notify the decision handler
484                        self.decision_handler.handle_decision(&ctx, decision).await;
485
486                        // Always publish to event bus for observability
487                        self.publish_decision(&ctx, decision).await;
488
489                        // Auto-act if configured
490                        if self.auto_act {
491                            match decision {
492                                Decision::AuthorizationRequired { .. } => {
493                                    if let Err(e) = self.auto_authorize_transaction(message).await {
494                                        log::warn!(
495                                            "Failed to auto-authorize transaction {}: {}",
496                                            transaction_id,
497                                            e
498                                        );
499                                    }
500                                }
501                                Decision::SettlementRequired { transaction_id } => {
502                                    if let Err(e) = self.check_and_send_settle(transaction_id).await
503                                    {
504                                        log::warn!(
505                                            "Failed to check/send settle for transaction {}: {}",
506                                            transaction_id,
507                                            e
508                                        );
509                                    }
510                                }
511                                Decision::PolicySatisfactionRequired { .. } => {
512                                    log::debug!(
513                                        "Policy satisfaction required for transaction {} — no auto-action available",
514                                        transaction_id
515                                    );
516                                }
517                            }
518                        }
519                    }
520                }
521                Err(e) => {
522                    log::warn!(
523                        "FSM transition error for transaction {}: {}",
524                        transaction_id,
525                        e
526                    );
527                }
528            }
529        }
530
531        Ok(())
532    }
533}