mf_core/runtime/
async_flow.rs

1use std::{sync::Arc};
2
3use crate::{
4    runtime::async_processor::{
5        AsyncProcessor, ProcessorError, TaskProcessor, TaskResult,
6    },
7    config::ProcessorConfig,
8    types::{ProcessorResult, TaskParams, TransactionStatus},
9    ForgeResult,
10};
11use async_trait::async_trait;
12use mf_state::debug;
13
14/// 事务处理器
15pub struct TransactionProcessor;
16
17#[async_trait]
18impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
19    async fn process(
20        &self,
21        (state, tr): TaskParams,
22    ) -> std::result::Result<ProcessorResult, ProcessorError> {
23        match state.apply(tr).await {
24            Ok(result) => Ok(ProcessorResult {
25                status: TransactionStatus::Completed,
26                error: None,
27                result: Some(result),
28            }),
29            Err(e) => Ok(ProcessorResult {
30                status: TransactionStatus::Failed(e.to_string()),
31                error: None,
32                result: None,
33            }),
34        }
35    }
36}
37
38#[derive(Clone)]
39pub struct FlowEngine {
40    processor:
41        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
42}
43
44impl FlowEngine {
45    pub async fn new() -> ForgeResult<Self> {
46        let config = ProcessorConfig::default();
47        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
48        processor.start().await.map_err(|e| {
49            crate::error::error_utils::engine_error(format!(
50                "启动异步处理器失败: {}",
51                e
52            ))
53        })?;
54
55        Ok(Self { processor: Arc::new(processor) })
56    }
57
58    pub async fn submit_transaction(
59        &self,
60        params: TaskParams,
61    ) -> ForgeResult<(
62        u64,
63        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
64    )> {
65        self.processor.submit_task(params, 0).await.map_err(Into::into)
66    }
67
68    pub async fn submit_transactions(
69        &self,
70        paramss: Vec<TaskParams>,
71    ) -> ForgeResult<
72        Vec<(
73            u64,
74            tokio::sync::mpsc::Receiver<
75                TaskResult<TaskParams, ProcessorResult>,
76            >,
77        )>,
78    > {
79        let mut results = Vec::new();
80        for transaction in paramss {
81            let result = self.submit_transaction(transaction).await?;
82            results.push(result);
83        }
84        Ok(results)
85    }
86
87    /// 关闭流引擎
88    ///
89    /// 注意:由于 processor 被包装在 Arc 中,这个方法只能发送关闭信号
90    /// 实际的关闭需要等到所有 Arc 引用都被释放
91    pub async fn shutdown(&self) -> ForgeResult<()> {
92        // 由于 processor 在 Arc 中,我们无法获取可变引用来调用 shutdown
93        // 这是设计上的限制,实际的关闭会在 Drop 时自动发生
94        debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
95        Ok(())
96    }
97}