tap_node/state_machine/
mod.rs

1//! Transaction state machine for TAP Node
2//!
3//! This module implements a state machine for managing transaction lifecycle,
4//! including automatic state transitions and Settle message generation.
5
6use crate::agent::AgentRegistry;
7use crate::error::{Error, Result};
8use crate::event::EventBus;
9use crate::storage::Storage;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tap_agent::Agent;
13use tap_msg::didcomm::PlainMessage;
14use tap_msg::message::TapMessage;
15
16/// Trait for processing transaction state changes
17#[async_trait]
18pub trait TransactionStateProcessor: Send + Sync {
19    /// Process an incoming message and update transaction state
20    async fn process_message(&self, message: &PlainMessage) -> Result<()>;
21}
22
23/// Standard transaction state processor
24pub struct StandardTransactionProcessor {
25    storage: Arc<Storage>,
26    event_bus: Arc<EventBus>,
27    agents: Arc<AgentRegistry>,
28}
29
30impl StandardTransactionProcessor {
31    /// Create a new standard transaction processor
32    pub fn new(
33        storage: Arc<Storage>,
34        event_bus: Arc<EventBus>,
35        agents: Arc<AgentRegistry>,
36    ) -> Self {
37        Self {
38            storage,
39            event_bus,
40            agents,
41        }
42    }
43
44    /// Extract agents from a Transfer or Payment message
45    /// Note: This only extracts actual agents (compliance, etc.), not the primary parties
46    /// (originator/beneficiary for Transfer, customer/merchant for Payment)
47    async fn extract_agents_from_message(
48        &self,
49        message: &PlainMessage,
50    ) -> Result<Vec<(String, String)>> {
51        let tap_message = TapMessage::from_plain_message(message)
52            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
53
54        let mut agents = Vec::new();
55
56        match tap_message {
57            TapMessage::Transfer(transfer) => {
58                // Only add agents from the agents field, not the primary parties
59                for agent in &transfer.agents {
60                    let role_str = match agent.role.as_str() {
61                        "compliance" => "compliance",
62                        _ => "other",
63                    };
64                    agents.push((agent.id.clone(), role_str.to_string()));
65                }
66            }
67            TapMessage::Payment(payment) => {
68                // Only add agents from the agents field, not the primary parties
69                for agent in &payment.agents {
70                    let role_str = match agent.role.as_str() {
71                        "compliance" => "compliance",
72                        _ => "other",
73                    };
74                    agents.push((agent.id.clone(), role_str.to_string()));
75                }
76            }
77            _ => {
78                // Not a Transfer or Payment
79                return Ok(agents);
80            }
81        }
82
83        Ok(agents)
84    }
85
86    /// Automatically send Authorize message for incoming Transfer or Payment messages
87    async fn auto_authorize_transaction(&self, message: &PlainMessage) -> Result<()> {
88        let tap_message = TapMessage::from_plain_message(message)
89            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
90
91        // Only auto-authorize Transfer and Payment messages
92        let transaction_id = match &tap_message {
93            TapMessage::Transfer(transfer) => &transfer.transaction_id,
94            TapMessage::Payment(payment) => &payment.transaction_id,
95            _ => return Ok(()), // Not a transaction message
96        };
97
98        // Find agents in our registry that are involved in this transaction
99        let our_agents = self.agents.get_all_dids();
100        let transaction_agents = self.extract_agents_from_message(message).await?;
101
102        // Check if any of our agents are involved in this transaction
103        for (agent_did, _role) in transaction_agents {
104            if our_agents.contains(&agent_did) {
105                // Get the agent from registry
106                if let Ok(agent) = self.agents.get_agent(&agent_did).await {
107                    // Create an Authorize message using the Authorizable trait
108                    use tap_msg::message::tap_message_trait::Authorizable;
109                    let authorize_message = match &tap_message {
110                        TapMessage::Transfer(transfer) => {
111                            transfer.authorize(&agent_did, None, None)
112                        }
113                        TapMessage::Payment(payment) => payment.authorize(&agent_did, None, None),
114                        _ => continue, // Should not happen due to earlier check
115                    };
116
117                    // Convert to body for sending
118                    let auth_body = authorize_message.body;
119
120                    // Send to the original sender
121                    let recipients_list = vec![message.from.as_str()];
122
123                    log::info!(
124                        "Auto-authorizing transaction {} from agent {}",
125                        transaction_id,
126                        agent_did
127                    );
128
129                    if let Err(e) = agent.send_message(&auth_body, recipients_list, true).await {
130                        log::warn!(
131                            "Failed to send auto-Authorize for transaction {} from agent {}: {}",
132                            transaction_id,
133                            agent_did,
134                            e
135                        );
136                    }
137                }
138            }
139        }
140
141        Ok(())
142    }
143
144    /// Check if we should send a Settle message
145    async fn check_and_send_settle(&self, transaction_id: &str) -> Result<()> {
146        // Get the transaction
147        let transaction = self
148            .storage
149            .get_transaction_by_id(transaction_id)
150            .await
151            .map_err(|e| Error::Storage(e.to_string()))?
152            .ok_or_else(|| Error::Storage(format!("Transaction {} not found", transaction_id)))?;
153
154        // Check if we're the sender (originator)
155        let our_agents = self.agents.get_all_dids();
156        let is_sender = transaction
157            .from_did
158            .as_ref()
159            .map(|did| our_agents.contains(did))
160            .unwrap_or(false);
161
162        if !is_sender {
163            return Ok(()); // Only sender sends Settle
164        }
165
166        // Check if all agents have authorized
167        let all_authorized = self
168            .storage
169            .are_all_agents_authorized(transaction_id)
170            .await
171            .map_err(|e| Error::Storage(e.to_string()))?;
172
173        if !all_authorized {
174            return Ok(()); // Not all agents have authorized yet
175        }
176
177        // Check if transaction is already in 'confirmed' status
178        if transaction.status.to_string() == "confirmed" {
179            return Ok(()); // Already settled
180        }
181
182        // Create and send Settle message
183        log::info!(
184            "All agents authorized for transaction {}, sending Settle message",
185            transaction_id
186        );
187
188        // Get the sender agent
189        let sender_did = transaction
190            .from_did
191            .as_ref()
192            .ok_or_else(|| Error::Processing("Transaction missing from_did".to_string()))?;
193
194        let agent = self
195            .agents
196            .get_agent(sender_did)
197            .await
198            .map_err(|e| Error::Agent(e.to_string()))?;
199
200        // Create Settle message using the Transaction trait's settle() method
201        // This is the proper way to create settlement messages using the TAP framework
202        let settlement_id = format!("settle_{}", transaction_id);
203
204        // Parse the original transaction message to use the Transaction trait
205        let transaction_message: PlainMessage =
206            serde_json::from_value(transaction.message_json.clone()).map_err(|e| {
207                Error::Serialization(format!("Failed to parse transaction message: {}", e))
208            })?;
209
210        let tap_message = TapMessage::from_plain_message(&transaction_message)
211            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
212
213        // Use the Transaction trait to create the Settle message
214        use tap_msg::message::tap_message_trait::Transaction;
215
216        // Send Settle message only to agents (not primary parties)
217        // Get all agents for this transaction
218        let agents = self
219            .extract_agents_from_message(&transaction_message)
220            .await?;
221
222        if agents.is_empty() {
223            log::debug!(
224                "No agents to send Settle message to for transaction {}",
225                transaction_id
226            );
227            return Ok(());
228        }
229
230        // Send settle message to all agents
231        for (agent_did, _role) in agents {
232            if agent_did != *sender_did {
233                // Use the Transaction trait to create a proper Settle message for this agent
234                let settle_message = match &tap_message {
235                    TapMessage::Transfer(transfer) => {
236                        transfer.settle(sender_did, &settlement_id, None)
237                    }
238                    TapMessage::Payment(payment) => {
239                        payment.settle(sender_did, &settlement_id, None)
240                    }
241                    _ => {
242                        log::warn!(
243                            "Unexpected message type for settlement: {}",
244                            transaction.message_type
245                        );
246                        continue;
247                    }
248                };
249
250                // Convert to body for sending
251                let settle_body = settle_message.body;
252
253                // Send message using the Agent trait's send_message method for proper signing
254                let recipients_list = vec![agent_did.as_str()];
255                let _ = agent
256                    .send_message(&settle_body, recipients_list, true)
257                    .await
258                    .map_err(|e| {
259                        log::warn!("Failed to send Settle to {}: {}", agent_did, e);
260                        e
261                    });
262            }
263        }
264
265        // Update transaction status to 'confirmed'
266        self.storage
267            .update_transaction_status(transaction_id, "confirmed")
268            .await
269            .map_err(|e| Error::Storage(e.to_string()))?;
270
271        // Emit state change event
272        self.event_bus
273            .publish_transaction_state_changed(
274                transaction_id.to_string(),
275                "pending".to_string(),
276                "confirmed".to_string(),
277                Some(sender_did.clone()),
278            )
279            .await;
280
281        Ok(())
282    }
283}
284
285#[async_trait]
286impl TransactionStateProcessor for StandardTransactionProcessor {
287    async fn process_message(&self, message: &PlainMessage) -> Result<()> {
288        let tap_message = TapMessage::from_plain_message(message)
289            .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
290
291        match tap_message {
292            TapMessage::Transfer(_) | TapMessage::Payment(_) => {
293                // First, store the transaction itself
294                let transaction_id = &message.id;
295                if let Err(e) = self.storage.insert_transaction(message).await {
296                    log::warn!("Failed to insert transaction {}: {}", transaction_id, e);
297                }
298
299                // Extract and store agents for new transaction
300                let agents = self.extract_agents_from_message(message).await?;
301
302                for (agent_did, role) in agents {
303                    if let Err(e) = self
304                        .storage
305                        .insert_transaction_agent(transaction_id, &agent_did, &role)
306                        .await
307                    {
308                        log::warn!(
309                            "Failed to insert agent {} for transaction {}: {}",
310                            agent_did,
311                            transaction_id,
312                            e
313                        );
314                    }
315                }
316
317                // Automatically send Authorize messages for our agents involved in this transaction
318                if let Err(e) = self.auto_authorize_transaction(message).await {
319                    log::warn!(
320                        "Failed to auto-authorize transaction {}: {}",
321                        transaction_id,
322                        e
323                    );
324                }
325            }
326            TapMessage::Authorize(auth) => {
327                let transaction_id = &auth.transaction_id;
328                let agent_did = &message.from;
329
330                // Update agent status to 'authorized'
331                if let Err(e) = self
332                    .storage
333                    .update_transaction_agent_status(transaction_id, agent_did, "authorized")
334                    .await
335                {
336                    log::warn!(
337                        "Failed to update agent {} status for transaction {}: {}",
338                        agent_did,
339                        transaction_id,
340                        e
341                    );
342                } else {
343                    // Emit state change event
344                    self.event_bus
345                        .publish_transaction_state_changed(
346                            transaction_id.clone(),
347                            "pending".to_string(),
348                            "pending".to_string(), // Individual agent authorized, but transaction still pending
349                            Some(agent_did.clone()),
350                        )
351                        .await;
352
353                    // Check if we should send Settle
354                    if let Err(e) = self.check_and_send_settle(transaction_id).await {
355                        log::warn!(
356                            "Failed to check/send settle for transaction {}: {}",
357                            transaction_id,
358                            e
359                        );
360                    }
361                }
362            }
363            TapMessage::Cancel(cancel) => {
364                let transaction_id = &cancel.transaction_id;
365                let agent_did = &message.from;
366
367                // Update agent status to 'cancelled'
368                if let Err(e) = self
369                    .storage
370                    .update_transaction_agent_status(transaction_id, agent_did, "cancelled")
371                    .await
372                {
373                    log::warn!(
374                        "Failed to update agent {} status for transaction {}: {}",
375                        agent_did,
376                        transaction_id,
377                        e
378                    );
379                }
380
381                // Update transaction status to 'cancelled'
382                if let Err(e) = self
383                    .storage
384                    .update_transaction_status(transaction_id, "cancelled")
385                    .await
386                {
387                    log::warn!(
388                        "Failed to update transaction {} status: {}",
389                        transaction_id,
390                        e
391                    );
392                } else {
393                    // Emit state change event
394                    self.event_bus
395                        .publish_transaction_state_changed(
396                            transaction_id.clone(),
397                            "pending".to_string(),
398                            "cancelled".to_string(),
399                            Some(agent_did.clone()),
400                        )
401                        .await;
402                }
403            }
404            TapMessage::Reject(reject) => {
405                let transaction_id = &reject.transaction_id;
406                let agent_did = &message.from;
407
408                // Update agent status to 'rejected'
409                if let Err(e) = self
410                    .storage
411                    .update_transaction_agent_status(transaction_id, agent_did, "rejected")
412                    .await
413                {
414                    log::warn!(
415                        "Failed to update agent {} status for transaction {}: {}",
416                        agent_did,
417                        transaction_id,
418                        e
419                    );
420                }
421
422                // Update transaction status to 'failed'
423                if let Err(e) = self
424                    .storage
425                    .update_transaction_status(transaction_id, "failed")
426                    .await
427                {
428                    log::warn!(
429                        "Failed to update transaction {} status: {}",
430                        transaction_id,
431                        e
432                    );
433                } else {
434                    // Emit state change event
435                    self.event_bus
436                        .publish_transaction_state_changed(
437                            transaction_id.clone(),
438                            "pending".to_string(),
439                            "failed".to_string(),
440                            Some(agent_did.clone()),
441                        )
442                        .await;
443                }
444            }
445            TapMessage::Settle(settle) => {
446                let transaction_id = &settle.transaction_id;
447
448                // Update transaction status to 'confirmed'
449                if let Err(e) = self
450                    .storage
451                    .update_transaction_status(transaction_id, "confirmed")
452                    .await
453                {
454                    log::warn!(
455                        "Failed to update transaction {} status: {}",
456                        transaction_id,
457                        e
458                    );
459                } else {
460                    // Emit state change event
461                    self.event_bus
462                        .publish_transaction_state_changed(
463                            transaction_id.clone(),
464                            "pending".to_string(),
465                            "confirmed".to_string(),
466                            Some(message.from.clone()),
467                        )
468                        .await;
469                }
470            }
471            TapMessage::Revert(revert) => {
472                let transaction_id = &revert.transaction_id;
473
474                // Update transaction status to 'reverted'
475                if let Err(e) = self
476                    .storage
477                    .update_transaction_status(transaction_id, "reverted")
478                    .await
479                {
480                    log::warn!(
481                        "Failed to update transaction {} status: {}",
482                        transaction_id,
483                        e
484                    );
485                } else {
486                    // Emit state change event
487                    self.event_bus
488                        .publish_transaction_state_changed(
489                            transaction_id.clone(),
490                            "confirmed".to_string(),
491                            "reverted".to_string(),
492                            Some(message.from.clone()),
493                        )
494                        .await;
495                }
496            }
497            TapMessage::AddAgents(add) => {
498                // Update agents based on TAIP-5
499                let transaction_id = &add.transaction_id;
500
501                for agent in &add.agents {
502                    let role_str = match agent.role.as_str() {
503                        "compliance" => "compliance",
504                        _ => "other",
505                    };
506
507                    if let Err(e) = self
508                        .storage
509                        .insert_transaction_agent(transaction_id, &agent.id, role_str)
510                        .await
511                    {
512                        log::warn!(
513                            "Failed to add agent {} to transaction {}: {}",
514                            agent.id,
515                            transaction_id,
516                            e
517                        );
518                    }
519                }
520            }
521            TapMessage::UpdatePolicies(_) => {
522                // Update policies based on TAIP-7
523                // This would update transaction metadata, but we don't have a specific
524                // field for policies in our current schema, so we'll skip for now
525                log::debug!("UpdatePolicies message received, but policy storage not implemented");
526            }
527            _ => {
528                // Other message types don't affect transaction state
529            }
530        }
531
532        Ok(())
533    }
534}