1use std::{sync::Arc};
2
3use moduforge_state::{
4 state::{State, TransactionResult},
5 transaction::Transaction,
6};
7use crate::{
8 async_processor::{
9 AsyncProcessor, ProcessorConfig, ProcessorError, TaskProcessor,
10 TaskResult,
11 },
12 ForgeResult,
13};
14use async_trait::async_trait;
15
16#[derive(Debug, Clone, PartialEq)]
17pub enum TransactionStatus {
18 Pending,
19 Processing,
20 Completed,
21 Failed(String),
22 Rolled,
23 NotFound,
24}
25
26#[derive(Debug, Clone)]
27pub struct ProcessorResult {
28 pub status: TransactionStatus,
29 pub error: Option<String>,
30 pub result: Option<TransactionResult>,
31}
32pub struct TransactionProcessor;
34pub type TaskParams = (Arc<State>, Transaction);
35
36#[async_trait]
37impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
38 async fn process(
39 &self,
40 (state, tr): TaskParams,
41 ) -> std::result::Result<ProcessorResult, ProcessorError> {
42 match state.apply(tr).await {
43 Ok(result) => Ok(ProcessorResult {
44 status: TransactionStatus::Completed,
45 error: None,
46 result: Some(result),
47 }),
48 Err(e) => Ok(ProcessorResult {
49 status: TransactionStatus::Failed(e.to_string()),
50 error: None,
51 result: None,
52 }),
53 }
54 }
55}
56
57#[derive(Clone)]
58pub struct FlowEngine {
59 processor:
60 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
61}
62
63impl FlowEngine {
64 pub fn new() -> ForgeResult<Self> {
65 let config = ProcessorConfig::default();
66 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
67 processor.start();
68
69 Ok(Self { processor: Arc::new(processor) })
70 }
71
72 pub async fn submit_transaction(
73 &self,
74 params: TaskParams,
75 ) -> ForgeResult<(
76 u64,
77 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
78 )> {
79 self.processor.submit_task(params, 0).await.map_err(Into::into)
80 }
81
82 pub async fn submit_transactions(
83 &self,
84 paramss: Vec<TaskParams>,
85 ) -> ForgeResult<
86 Vec<(
87 u64,
88 tokio::sync::mpsc::Receiver<
89 TaskResult<TaskParams, ProcessorResult>,
90 >,
91 )>,
92 > {
93 let mut results = Vec::new();
94 for transaction in paramss {
95 let result = self.submit_transaction(transaction).await?;
96 results.push(result);
97 }
98 Ok(results)
99 }
100}