1pub mod fsm;
12
13use crate::agent::AgentRegistry;
14use crate::error::{Error, Result};
15use crate::event::EventBus;
16use crate::storage::Storage;
17use async_trait::async_trait;
18use dashmap::DashMap;
19use fsm::{
20 AutoApproveHandler, Decision, DecisionHandler, DecisionMode, FsmEvent, LogOnlyHandler,
21 TransactionContext, TransactionFsm,
22};
23use std::sync::Arc;
24use tap_agent::Agent;
25use tap_msg::didcomm::PlainMessage;
26use tap_msg::message::TapMessage;
27
28#[async_trait]
30pub trait TransactionStateProcessor: Send + Sync {
31 async fn process_message(&self, message: &PlainMessage) -> Result<()>;
33}
34
35pub struct StandardTransactionProcessor {
40 storage: Arc<Storage>,
41 event_bus: Arc<EventBus>,
42 agents: Arc<AgentRegistry>,
43 contexts: DashMap<String, TransactionContext>,
45 decision_handler: Arc<dyn DecisionHandler>,
47 auto_act: bool,
49}
50
51impl StandardTransactionProcessor {
52 pub fn new(
54 storage: Arc<Storage>,
55 event_bus: Arc<EventBus>,
56 agents: Arc<AgentRegistry>,
57 decision_mode: DecisionMode,
58 ) -> Self {
59 let (decision_handler, auto_act): (Arc<dyn DecisionHandler>, bool) = match decision_mode {
60 DecisionMode::AutoApprove => (Arc::new(AutoApproveHandler), true),
61 DecisionMode::EventBus => (Arc::new(LogOnlyHandler), false),
62 DecisionMode::Custom(handler) => (handler, false),
63 };
64
65 Self {
66 storage,
67 event_bus,
68 agents,
69 contexts: DashMap::new(),
70 decision_handler,
71 auto_act,
72 }
73 }
74
75 fn extract_agents_from_tap_message(tap_message: &TapMessage) -> Vec<(String, String)> {
78 let agents_list = match tap_message {
79 TapMessage::Transfer(t) => &t.agents,
80 TapMessage::Payment(p) => &p.agents,
81 _ => return Vec::new(),
82 };
83 agents_list
84 .iter()
85 .map(|a| {
86 let role = match a.role.as_deref() {
87 Some("compliance") => "compliance",
88 _ => "other",
89 };
90 (a.id.clone(), role.to_string())
91 })
92 .collect()
93 }
94
95 fn get_or_create_context(
97 &self,
98 transaction_id: &str,
99 agent_dids: Vec<String>,
100 ) -> TransactionContext {
101 self.contexts
102 .entry(transaction_id.to_string())
103 .or_insert_with(|| TransactionContext::new(transaction_id.to_string(), agent_dids))
104 .clone()
105 }
106
107 fn save_context(&self, ctx: &TransactionContext) {
109 self.contexts
110 .insert(ctx.transaction_id.clone(), ctx.clone());
111 }
112
113 fn to_fsm_event(tap_message: &TapMessage, plain: &PlainMessage) -> Option<FsmEvent> {
115 match tap_message {
116 TapMessage::Transfer(_) | TapMessage::Payment(_) => {
117 let agent_dids: Vec<String> = Self::extract_agents_from_tap_message(tap_message)
118 .into_iter()
119 .map(|(did, _)| did)
120 .collect();
121 Some(FsmEvent::TransactionReceived { agent_dids })
122 }
123 TapMessage::Authorize(auth) => Some(FsmEvent::AuthorizeReceived {
124 agent_did: plain.from.clone(),
125 settlement_address: auth.settlement_address.clone(),
126 expiry: auth.expiry.clone(),
127 }),
128 TapMessage::Reject(reject) => Some(FsmEvent::RejectReceived {
129 agent_did: plain.from.clone(),
130 reason: reject.reason.clone(),
131 }),
132 TapMessage::Cancel(cancel) => Some(FsmEvent::CancelReceived {
133 by_did: plain.from.clone(),
134 reason: cancel.reason.clone(),
135 }),
136 TapMessage::Settle(settle) => Some(FsmEvent::SettleReceived {
137 settlement_id: settle.settlement_id.clone(),
138 amount: settle.amount.clone(),
139 }),
140 TapMessage::Revert(revert) => Some(FsmEvent::RevertReceived {
141 by_did: plain.from.clone(),
142 reason: revert.reason.clone(),
143 }),
144 TapMessage::AddAgents(add) => Some(FsmEvent::AgentsAdded {
145 agent_dids: add.agents.iter().map(|a| a.id.clone()).collect(),
146 }),
147 TapMessage::UpdatePolicies(_) => Some(FsmEvent::PoliciesReceived {
148 from_did: plain.from.clone(),
149 }),
150 TapMessage::Presentation(_) => Some(FsmEvent::PresentationReceived {
151 from_did: plain.from.clone(),
152 }),
153 _ => None,
154 }
155 }
156
157 fn transaction_id_for(tap_message: &TapMessage, plain: &PlainMessage) -> String {
159 match tap_message {
160 TapMessage::Transfer(_) | TapMessage::Payment(_) => plain.id.clone(),
161 TapMessage::Authorize(a) => a.transaction_id.clone(),
162 TapMessage::Reject(r) => r.transaction_id.clone(),
163 TapMessage::Cancel(c) => c.transaction_id.clone(),
164 TapMessage::Settle(s) => s.transaction_id.clone(),
165 TapMessage::Revert(r) => r.transaction_id.clone(),
166 TapMessage::AddAgents(a) => a.transaction_id.clone(),
167 TapMessage::UpdatePolicies(u) => u.transaction_id.clone(),
168 _ => plain.thid.clone().unwrap_or_default(),
170 }
171 }
172
173 async fn publish_decision(&self, ctx: &TransactionContext, decision: &Decision) {
175 let decision_json = serde_json::to_value(decision).unwrap_or_default();
176 self.event_bus
177 .publish_decision_required(
178 ctx.transaction_id.clone(),
179 ctx.state.to_string(),
180 decision_json,
181 ctx.pending_agents(),
182 )
183 .await;
184 }
185
186 async fn auto_authorize_transaction(&self, message: &PlainMessage) -> Result<()> {
190 let tap_message = TapMessage::from_plain_message(message)
191 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
192
193 let transaction_id = match &tap_message {
194 TapMessage::Transfer(transfer) => &transfer.transaction_id,
195 TapMessage::Payment(payment) => &payment.transaction_id,
196 _ => return Ok(()),
197 };
198
199 let our_agents = self.agents.get_all_dids();
200 let transaction_agents = Self::extract_agents_from_tap_message(&tap_message);
201
202 for (agent_did, _role) in transaction_agents {
203 if our_agents.contains(&agent_did) {
204 if let Ok(agent) = self.agents.get_agent(&agent_did).await {
205 use tap_msg::message::tap_message_trait::Authorizable;
206 let authorize_message = match &tap_message {
207 TapMessage::Transfer(transfer) => {
208 transfer.authorize(&agent_did, None, None)
209 }
210 TapMessage::Payment(payment) => payment.authorize(&agent_did, None, None),
211 _ => continue,
212 };
213
214 let auth_body = authorize_message.body;
215 let recipients_list = vec![message.from.as_str()];
216
217 log::info!(
218 "Auto-authorizing transaction {:?} from agent {}",
219 transaction_id,
220 agent_did
221 );
222
223 if let Err(e) = agent.send_message(&auth_body, recipients_list, true).await {
224 log::warn!(
225 "Failed to send auto-Authorize for transaction {:?} from agent {}: {}",
226 transaction_id,
227 agent_did,
228 e
229 );
230 }
231 }
232 }
233 }
234
235 Ok(())
236 }
237
238 async fn check_and_send_settle(&self, transaction_id: &str) -> Result<()> {
240 let transaction = self
241 .storage
242 .get_transaction_by_id(transaction_id)
243 .await
244 .map_err(|e| Error::Storage(e.to_string()))?
245 .ok_or_else(|| Error::Storage(format!("Transaction {} not found", transaction_id)))?;
246
247 let our_agents = self.agents.get_all_dids();
248 let is_sender = transaction
249 .from_did
250 .as_ref()
251 .map(|did| our_agents.contains(did))
252 .unwrap_or(false);
253
254 if !is_sender {
255 return Ok(());
256 }
257
258 let all_authorized = self
259 .storage
260 .are_all_agents_authorized(transaction_id)
261 .await
262 .map_err(|e| Error::Storage(e.to_string()))?;
263
264 if !all_authorized {
265 return Ok(());
266 }
267
268 if transaction.status.to_string() == "confirmed" {
269 return Ok(());
270 }
271
272 log::info!(
273 "All agents authorized for transaction {}, sending Settle message",
274 transaction_id
275 );
276
277 let sender_did = transaction
278 .from_did
279 .as_ref()
280 .ok_or_else(|| Error::Processing("Transaction missing from_did".to_string()))?;
281
282 let agent = self
283 .agents
284 .get_agent(sender_did)
285 .await
286 .map_err(|e| Error::Agent(e.to_string()))?;
287
288 let settlement_id = format!("settle_{}", transaction_id);
289
290 let transaction_message: PlainMessage =
291 serde_json::from_value(transaction.message_json.clone()).map_err(|e| {
292 Error::Serialization(format!("Failed to parse transaction message: {}", e))
293 })?;
294
295 let tap_message = TapMessage::from_plain_message(&transaction_message)
296 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
297
298 use tap_msg::message::tap_message_trait::Transaction;
299
300 let agents = Self::extract_agents_from_tap_message(&tap_message);
301
302 if agents.is_empty() {
303 log::debug!(
304 "No agents to send Settle message to for transaction {}",
305 transaction_id
306 );
307 return Ok(());
308 }
309
310 for (agent_did, _role) in agents {
311 if agent_did != *sender_did {
312 let settle_message = match &tap_message {
313 TapMessage::Transfer(transfer) => {
314 transfer.settle(sender_did, &settlement_id, None)
315 }
316 TapMessage::Payment(payment) => {
317 payment.settle(sender_did, &settlement_id, None)
318 }
319 _ => {
320 log::warn!(
321 "Unexpected message type for settlement: {}",
322 transaction.message_type
323 );
324 continue;
325 }
326 };
327
328 let settle_body = settle_message.body;
329 let recipients_list = vec![agent_did.as_str()];
330 let _ = agent
331 .send_message(&settle_body, recipients_list, true)
332 .await
333 .map_err(|e| {
334 log::warn!("Failed to send Settle to {}: {}", agent_did, e);
335 e
336 });
337 }
338 }
339
340 self.storage
341 .update_transaction_status(transaction_id, "confirmed")
342 .await
343 .map_err(|e| Error::Storage(e.to_string()))?;
344
345 self.event_bus
346 .publish_transaction_state_changed(
347 transaction_id.to_string(),
348 "pending".to_string(),
349 "confirmed".to_string(),
350 Some(sender_did.clone()),
351 )
352 .await;
353
354 Ok(())
355 }
356}
357
358#[async_trait]
359impl TransactionStateProcessor for StandardTransactionProcessor {
360 async fn process_message(&self, message: &PlainMessage) -> Result<()> {
361 let tap_message = TapMessage::from_plain_message(message)
362 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
363
364 let transaction_id = Self::transaction_id_for(&tap_message, message);
365
366 let fsm_event = Self::to_fsm_event(&tap_message, message);
368
369 match &tap_message {
371 TapMessage::Transfer(_) | TapMessage::Payment(_) => {
372 if let Err(e) = self.storage.insert_transaction(message).await {
373 log::warn!("Failed to insert transaction {}: {}", transaction_id, e);
374 }
375 let agents = Self::extract_agents_from_tap_message(&tap_message);
376 for (agent_did, role) in &agents {
377 if let Err(e) = self
378 .storage
379 .insert_transaction_agent(&transaction_id, agent_did, role)
380 .await
381 {
382 log::warn!(
383 "Failed to insert agent {} for transaction {}: {}",
384 agent_did,
385 transaction_id,
386 e
387 );
388 }
389 }
390 }
391 TapMessage::Authorize(_) => {
392 if let Err(e) = self
393 .storage
394 .update_transaction_agent_status(&transaction_id, &message.from, "authorized")
395 .await
396 {
397 log::warn!(
398 "Failed to update agent {} status for transaction {}: {}",
399 message.from,
400 transaction_id,
401 e
402 );
403 }
404 }
405 TapMessage::Reject(_) => {
406 let _ = self
407 .storage
408 .update_transaction_agent_status(&transaction_id, &message.from, "rejected")
409 .await;
410 let _ = self
411 .storage
412 .update_transaction_status(&transaction_id, "failed")
413 .await;
414 }
415 TapMessage::Cancel(_) => {
416 let _ = self
417 .storage
418 .update_transaction_agent_status(&transaction_id, &message.from, "cancelled")
419 .await;
420 let _ = self
421 .storage
422 .update_transaction_status(&transaction_id, "cancelled")
423 .await;
424 }
425 TapMessage::Settle(_) => {
426 let _ = self
427 .storage
428 .update_transaction_status(&transaction_id, "confirmed")
429 .await;
430 }
431 TapMessage::Revert(_) => {
432 let _ = self
433 .storage
434 .update_transaction_status(&transaction_id, "reverted")
435 .await;
436 }
437 TapMessage::AddAgents(add) => {
438 for agent in &add.agents {
439 let role_str = match agent.role.as_deref() {
440 Some("compliance") => "compliance",
441 _ => "other",
442 };
443 let _ = self
444 .storage
445 .insert_transaction_agent(&transaction_id, &agent.id, role_str)
446 .await;
447 }
448 }
449 _ => {}
450 }
451
452 if let Some(event) = fsm_event {
454 let agent_dids: Vec<String> = Self::extract_agents_from_tap_message(&tap_message)
455 .into_iter()
456 .map(|(did, _)| did)
457 .collect();
458
459 let mut ctx = self.get_or_create_context(&transaction_id, agent_dids);
460 let old_state = ctx.state.to_string();
461
462 match TransactionFsm::apply(&mut ctx, event) {
463 Ok(transition) => {
464 let new_state = transition.to_state.to_string();
465
466 self.save_context(&ctx);
468
469 if old_state != new_state {
471 self.event_bus
472 .publish_transaction_state_changed(
473 transaction_id.clone(),
474 old_state,
475 new_state,
476 Some(message.from.clone()),
477 )
478 .await;
479 }
480
481 if let Some(ref decision) = transition.decision {
483 self.decision_handler.handle_decision(&ctx, decision).await;
485
486 self.publish_decision(&ctx, decision).await;
488
489 if self.auto_act {
491 match decision {
492 Decision::AuthorizationRequired { .. } => {
493 if let Err(e) = self.auto_authorize_transaction(message).await {
494 log::warn!(
495 "Failed to auto-authorize transaction {}: {}",
496 transaction_id,
497 e
498 );
499 }
500 }
501 Decision::SettlementRequired { transaction_id } => {
502 if let Err(e) = self.check_and_send_settle(transaction_id).await
503 {
504 log::warn!(
505 "Failed to check/send settle for transaction {}: {}",
506 transaction_id,
507 e
508 );
509 }
510 }
511 Decision::PolicySatisfactionRequired { .. } => {
512 log::debug!(
513 "Policy satisfaction required for transaction {} — no auto-action available",
514 transaction_id
515 );
516 }
517 }
518 }
519 }
520 }
521 Err(e) => {
522 log::warn!(
523 "FSM transition error for transaction {}: {}",
524 transaction_id,
525 e
526 );
527 }
528 }
529 }
530
531 Ok(())
532 }
533}