moduforge_core/
async_flow.rs

1use std::{sync::Arc};
2
3use crate::{
4    async_processor::{
5        AsyncProcessor, ProcessorConfig, ProcessorError, TaskProcessor,
6        TaskResult,
7    },
8    types::{ProcessorResult, TaskParams, TransactionStatus},
9    ForgeResult,
10};
11use async_trait::async_trait;
12
/// Transaction processor.
///
/// A stateless marker type: it carries no data and exists only to host the
/// `TaskProcessor` implementation that applies a transaction to a state.
/// Because it has no fields, it is trivially copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransactionProcessor;
15
16#[async_trait]
17impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
18    async fn process(
19        &self,
20        (state, tr): TaskParams,
21    ) -> std::result::Result<ProcessorResult, ProcessorError> {
22        match state.apply(tr).await {
23            Ok(result) => Ok(ProcessorResult {
24                status: TransactionStatus::Completed,
25                error: None,
26                result: Some(result),
27            }),
28            Err(e) => Ok(ProcessorResult {
29                status: TransactionStatus::Failed(e.to_string()),
30                error: None,
31                result: None,
32            }),
33        }
34    }
35}
36
37#[derive(Clone)]
38pub struct FlowEngine {
39    processor:
40        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
41}
42
43impl FlowEngine {
44    pub fn new() -> ForgeResult<Self> {
45        let config = ProcessorConfig::default();
46        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
47        processor.start();
48
49        Ok(Self { processor: Arc::new(processor) })
50    }
51
52    pub async fn submit_transaction(
53        &self,
54        params: TaskParams,
55    ) -> ForgeResult<(
56        u64,
57        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
58    )> {
59        self.processor.submit_task(params, 0).await.map_err(Into::into)
60    }
61
62    pub async fn submit_transactions(
63        &self,
64        paramss: Vec<TaskParams>,
65    ) -> ForgeResult<
66        Vec<(
67            u64,
68            tokio::sync::mpsc::Receiver<
69                TaskResult<TaskParams, ProcessorResult>,
70            >,
71        )>,
72    > {
73        let mut results = Vec::new();
74        for transaction in paramss {
75            let result = self.submit_transaction(transaction).await?;
76            results.push(result);
77        }
78        Ok(results)
79    }
80}