moduforge_core/
flow.rs

1use std::{sync::Arc};
2
3use moduforge_state::{
4    state::{State, TransactionResult},
5    transaction::Transaction,
6};
7use crate::{
8    async_processor::{
9        AsyncProcessor, ProcessorConfig, ProcessorError, TaskProcessor,
10        TaskResult,
11    },
12    ForgeResult,
13};
14use async_trait::async_trait;
15
16#[derive(Debug, Clone, PartialEq)]
17pub enum TransactionStatus {
18    Pending,
19    Processing,
20    Completed,
21    Failed(String),
22    Rolled,
23    NotFound,
24}
25
26#[derive(Debug, Clone)]
27pub struct ProcessorResult {
28    pub status: TransactionStatus,
29    pub error: Option<String>,
30    pub result: Option<TransactionResult>,
31}
32/// 事务处理器
33pub struct TransactionProcessor;
34pub type TaskParams = (Arc<State>, Transaction);
35
36#[async_trait]
37impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
38    async fn process(
39        &self,
40        (state, tr): TaskParams,
41    ) -> std::result::Result<ProcessorResult, ProcessorError> {
42        match state.apply(tr).await {
43            Ok(result) => Ok(ProcessorResult {
44                status: TransactionStatus::Completed,
45                error: None,
46                result: Some(result),
47            }),
48            Err(e) => Ok(ProcessorResult {
49                status: TransactionStatus::Failed(e.to_string()),
50                error: None,
51                result: None,
52            }),
53        }
54    }
55}
56
57#[derive(Clone)]
58pub struct FlowEngine {
59    processor:
60        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
61}
62
63impl FlowEngine {
64    pub fn new() -> ForgeResult<Self> {
65        let config = ProcessorConfig::default();
66        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
67        processor.start();
68
69        Ok(Self { processor: Arc::new(processor) })
70    }
71
72    pub async fn submit_transaction(
73        &self,
74        params: TaskParams,
75    ) -> ForgeResult<(
76        u64,
77        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
78    )> {
79        self.processor.submit_task(params, 0).await.map_err(Into::into)
80    }
81
82    pub async fn submit_transactions(
83        &self,
84        paramss: Vec<TaskParams>,
85    ) -> ForgeResult<
86        Vec<(
87            u64,
88            tokio::sync::mpsc::Receiver<
89                TaskResult<TaskParams, ProcessorResult>,
90            >,
91        )>,
92    > {
93        let mut results = Vec::new();
94        for transaction in paramss {
95            let result = self.submit_transaction(transaction).await?;
96            results.push(result);
97        }
98        Ok(results)
99    }
100}