1use crate::agent::AgentRegistry;
7use crate::error::{Error, Result};
8use crate::event::EventBus;
9use crate::storage::Storage;
10use async_trait::async_trait;
11use std::sync::Arc;
12use tap_agent::Agent;
13use tap_msg::didcomm::PlainMessage;
14use tap_msg::message::TapMessage;
15
16#[async_trait]
18pub trait TransactionStateProcessor: Send + Sync {
19 async fn process_message(&self, message: &PlainMessage) -> Result<()>;
21}
22
23pub struct StandardTransactionProcessor {
25 storage: Arc<Storage>,
26 event_bus: Arc<EventBus>,
27 agents: Arc<AgentRegistry>,
28}
29
30impl StandardTransactionProcessor {
31 pub fn new(
33 storage: Arc<Storage>,
34 event_bus: Arc<EventBus>,
35 agents: Arc<AgentRegistry>,
36 ) -> Self {
37 Self {
38 storage,
39 event_bus,
40 agents,
41 }
42 }
43
44 async fn extract_agents_from_message(
48 &self,
49 message: &PlainMessage,
50 ) -> Result<Vec<(String, String)>> {
51 let tap_message = TapMessage::from_plain_message(message)
52 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
53
54 let mut agents = Vec::new();
55
56 match tap_message {
57 TapMessage::Transfer(transfer) => {
58 for agent in &transfer.agents {
60 let role_str = match agent.role.as_str() {
61 "compliance" => "compliance",
62 _ => "other",
63 };
64 agents.push((agent.id.clone(), role_str.to_string()));
65 }
66 }
67 TapMessage::Payment(payment) => {
68 for agent in &payment.agents {
70 let role_str = match agent.role.as_str() {
71 "compliance" => "compliance",
72 _ => "other",
73 };
74 agents.push((agent.id.clone(), role_str.to_string()));
75 }
76 }
77 _ => {
78 return Ok(agents);
80 }
81 }
82
83 Ok(agents)
84 }
85
86 async fn auto_authorize_transaction(&self, message: &PlainMessage) -> Result<()> {
88 let tap_message = TapMessage::from_plain_message(message)
89 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
90
91 let transaction_id = match &tap_message {
93 TapMessage::Transfer(transfer) => &transfer.transaction_id,
94 TapMessage::Payment(payment) => &payment.transaction_id,
95 _ => return Ok(()), };
97
98 let our_agents = self.agents.get_all_dids();
100 let transaction_agents = self.extract_agents_from_message(message).await?;
101
102 for (agent_did, _role) in transaction_agents {
104 if our_agents.contains(&agent_did) {
105 if let Ok(agent) = self.agents.get_agent(&agent_did).await {
107 use tap_msg::message::tap_message_trait::Authorizable;
109 let authorize_message = match &tap_message {
110 TapMessage::Transfer(transfer) => {
111 transfer.authorize(&agent_did, None, None)
112 }
113 TapMessage::Payment(payment) => payment.authorize(&agent_did, None, None),
114 _ => continue, };
116
117 let auth_body = authorize_message.body;
119
120 let recipients_list = vec![message.from.as_str()];
122
123 log::info!(
124 "Auto-authorizing transaction {} from agent {}",
125 transaction_id,
126 agent_did
127 );
128
129 if let Err(e) = agent.send_message(&auth_body, recipients_list, true).await {
130 log::warn!(
131 "Failed to send auto-Authorize for transaction {} from agent {}: {}",
132 transaction_id,
133 agent_did,
134 e
135 );
136 }
137 }
138 }
139 }
140
141 Ok(())
142 }
143
144 async fn check_and_send_settle(&self, transaction_id: &str) -> Result<()> {
146 let transaction = self
148 .storage
149 .get_transaction_by_id(transaction_id)
150 .await
151 .map_err(|e| Error::Storage(e.to_string()))?
152 .ok_or_else(|| Error::Storage(format!("Transaction {} not found", transaction_id)))?;
153
154 let our_agents = self.agents.get_all_dids();
156 let is_sender = transaction
157 .from_did
158 .as_ref()
159 .map(|did| our_agents.contains(did))
160 .unwrap_or(false);
161
162 if !is_sender {
163 return Ok(()); }
165
166 let all_authorized = self
168 .storage
169 .are_all_agents_authorized(transaction_id)
170 .await
171 .map_err(|e| Error::Storage(e.to_string()))?;
172
173 if !all_authorized {
174 return Ok(()); }
176
177 if transaction.status.to_string() == "confirmed" {
179 return Ok(()); }
181
182 log::info!(
184 "All agents authorized for transaction {}, sending Settle message",
185 transaction_id
186 );
187
188 let sender_did = transaction
190 .from_did
191 .as_ref()
192 .ok_or_else(|| Error::Processing("Transaction missing from_did".to_string()))?;
193
194 let agent = self
195 .agents
196 .get_agent(sender_did)
197 .await
198 .map_err(|e| Error::Agent(e.to_string()))?;
199
200 let settlement_id = format!("settle_{}", transaction_id);
203
204 let transaction_message: PlainMessage =
206 serde_json::from_value(transaction.message_json.clone()).map_err(|e| {
207 Error::Serialization(format!("Failed to parse transaction message: {}", e))
208 })?;
209
210 let tap_message = TapMessage::from_plain_message(&transaction_message)
211 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
212
213 use tap_msg::message::tap_message_trait::Transaction;
215
216 let agents = self
219 .extract_agents_from_message(&transaction_message)
220 .await?;
221
222 if agents.is_empty() {
223 log::debug!(
224 "No agents to send Settle message to for transaction {}",
225 transaction_id
226 );
227 return Ok(());
228 }
229
230 for (agent_did, _role) in agents {
232 if agent_did != *sender_did {
233 let settle_message = match &tap_message {
235 TapMessage::Transfer(transfer) => {
236 transfer.settle(sender_did, &settlement_id, None)
237 }
238 TapMessage::Payment(payment) => {
239 payment.settle(sender_did, &settlement_id, None)
240 }
241 _ => {
242 log::warn!(
243 "Unexpected message type for settlement: {}",
244 transaction.message_type
245 );
246 continue;
247 }
248 };
249
250 let settle_body = settle_message.body;
252
253 let recipients_list = vec![agent_did.as_str()];
255 let _ = agent
256 .send_message(&settle_body, recipients_list, true)
257 .await
258 .map_err(|e| {
259 log::warn!("Failed to send Settle to {}: {}", agent_did, e);
260 e
261 });
262 }
263 }
264
265 self.storage
267 .update_transaction_status(transaction_id, "confirmed")
268 .await
269 .map_err(|e| Error::Storage(e.to_string()))?;
270
271 self.event_bus
273 .publish_transaction_state_changed(
274 transaction_id.to_string(),
275 "pending".to_string(),
276 "confirmed".to_string(),
277 Some(sender_did.clone()),
278 )
279 .await;
280
281 Ok(())
282 }
283}
284
285#[async_trait]
286impl TransactionStateProcessor for StandardTransactionProcessor {
287 async fn process_message(&self, message: &PlainMessage) -> Result<()> {
288 let tap_message = TapMessage::from_plain_message(message)
289 .map_err(|e| Error::InvalidPlainMessage(e.to_string()))?;
290
291 match tap_message {
292 TapMessage::Transfer(_) | TapMessage::Payment(_) => {
293 let transaction_id = &message.id;
295 if let Err(e) = self.storage.insert_transaction(message).await {
296 log::warn!("Failed to insert transaction {}: {}", transaction_id, e);
297 }
298
299 let agents = self.extract_agents_from_message(message).await?;
301
302 for (agent_did, role) in agents {
303 if let Err(e) = self
304 .storage
305 .insert_transaction_agent(transaction_id, &agent_did, &role)
306 .await
307 {
308 log::warn!(
309 "Failed to insert agent {} for transaction {}: {}",
310 agent_did,
311 transaction_id,
312 e
313 );
314 }
315 }
316
317 if let Err(e) = self.auto_authorize_transaction(message).await {
319 log::warn!(
320 "Failed to auto-authorize transaction {}: {}",
321 transaction_id,
322 e
323 );
324 }
325 }
326 TapMessage::Authorize(auth) => {
327 let transaction_id = &auth.transaction_id;
328 let agent_did = &message.from;
329
330 if let Err(e) = self
332 .storage
333 .update_transaction_agent_status(transaction_id, agent_did, "authorized")
334 .await
335 {
336 log::warn!(
337 "Failed to update agent {} status for transaction {}: {}",
338 agent_did,
339 transaction_id,
340 e
341 );
342 } else {
343 self.event_bus
345 .publish_transaction_state_changed(
346 transaction_id.clone(),
347 "pending".to_string(),
348 "pending".to_string(), Some(agent_did.clone()),
350 )
351 .await;
352
353 if let Err(e) = self.check_and_send_settle(transaction_id).await {
355 log::warn!(
356 "Failed to check/send settle for transaction {}: {}",
357 transaction_id,
358 e
359 );
360 }
361 }
362 }
363 TapMessage::Cancel(cancel) => {
364 let transaction_id = &cancel.transaction_id;
365 let agent_did = &message.from;
366
367 if let Err(e) = self
369 .storage
370 .update_transaction_agent_status(transaction_id, agent_did, "cancelled")
371 .await
372 {
373 log::warn!(
374 "Failed to update agent {} status for transaction {}: {}",
375 agent_did,
376 transaction_id,
377 e
378 );
379 }
380
381 if let Err(e) = self
383 .storage
384 .update_transaction_status(transaction_id, "cancelled")
385 .await
386 {
387 log::warn!(
388 "Failed to update transaction {} status: {}",
389 transaction_id,
390 e
391 );
392 } else {
393 self.event_bus
395 .publish_transaction_state_changed(
396 transaction_id.clone(),
397 "pending".to_string(),
398 "cancelled".to_string(),
399 Some(agent_did.clone()),
400 )
401 .await;
402 }
403 }
404 TapMessage::Reject(reject) => {
405 let transaction_id = &reject.transaction_id;
406 let agent_did = &message.from;
407
408 if let Err(e) = self
410 .storage
411 .update_transaction_agent_status(transaction_id, agent_did, "rejected")
412 .await
413 {
414 log::warn!(
415 "Failed to update agent {} status for transaction {}: {}",
416 agent_did,
417 transaction_id,
418 e
419 );
420 }
421
422 if let Err(e) = self
424 .storage
425 .update_transaction_status(transaction_id, "failed")
426 .await
427 {
428 log::warn!(
429 "Failed to update transaction {} status: {}",
430 transaction_id,
431 e
432 );
433 } else {
434 self.event_bus
436 .publish_transaction_state_changed(
437 transaction_id.clone(),
438 "pending".to_string(),
439 "failed".to_string(),
440 Some(agent_did.clone()),
441 )
442 .await;
443 }
444 }
445 TapMessage::Settle(settle) => {
446 let transaction_id = &settle.transaction_id;
447
448 if let Err(e) = self
450 .storage
451 .update_transaction_status(transaction_id, "confirmed")
452 .await
453 {
454 log::warn!(
455 "Failed to update transaction {} status: {}",
456 transaction_id,
457 e
458 );
459 } else {
460 self.event_bus
462 .publish_transaction_state_changed(
463 transaction_id.clone(),
464 "pending".to_string(),
465 "confirmed".to_string(),
466 Some(message.from.clone()),
467 )
468 .await;
469 }
470 }
471 TapMessage::Revert(revert) => {
472 let transaction_id = &revert.transaction_id;
473
474 if let Err(e) = self
476 .storage
477 .update_transaction_status(transaction_id, "reverted")
478 .await
479 {
480 log::warn!(
481 "Failed to update transaction {} status: {}",
482 transaction_id,
483 e
484 );
485 } else {
486 self.event_bus
488 .publish_transaction_state_changed(
489 transaction_id.clone(),
490 "confirmed".to_string(),
491 "reverted".to_string(),
492 Some(message.from.clone()),
493 )
494 .await;
495 }
496 }
497 TapMessage::AddAgents(add) => {
498 let transaction_id = &add.transaction_id;
500
501 for agent in &add.agents {
502 let role_str = match agent.role.as_str() {
503 "compliance" => "compliance",
504 _ => "other",
505 };
506
507 if let Err(e) = self
508 .storage
509 .insert_transaction_agent(transaction_id, &agent.id, role_str)
510 .await
511 {
512 log::warn!(
513 "Failed to add agent {} to transaction {}: {}",
514 agent.id,
515 transaction_id,
516 e
517 );
518 }
519 }
520 }
521 TapMessage::UpdatePolicies(_) => {
522 log::debug!("UpdatePolicies message received, but policy storage not implemented");
526 }
527 _ => {
528 }
530 }
531
532 Ok(())
533 }
534}