mf_core/runtime/
async_flow.rs

1use std::{sync::Arc};
2
3use crate::{
4    runtime::async_processor::{
5        AsyncProcessor, ProcessorError, TaskProcessor, TaskResult,
6    },
7    config::ProcessorConfig,
8    debug::debug,
9    types::{ProcessorResult, TaskParams, TransactionStatus},
10    ForgeResult,
11};
12use async_trait::async_trait;
13
14/// 事务处理器
15pub struct TransactionProcessor;
16
17#[async_trait]
18impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
19    async fn process(
20        &self,
21        (state, tr): TaskParams,
22    ) -> std::result::Result<ProcessorResult, ProcessorError> {
23        match state.apply(tr).await {
24            Ok(result) => Ok(ProcessorResult {
25                status: TransactionStatus::Completed,
26                error: None,
27                result: Some(result),
28            }),
29            Err(e) => Ok(ProcessorResult {
30                status: TransactionStatus::Failed(e.to_string()),
31                error: None,
32                result: None,
33            }),
34        }
35    }
36}
37
38#[derive(Clone)]
39pub struct FlowEngine {
40    processor:
41        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
42}
43
44impl FlowEngine {
45    pub async fn new() -> ForgeResult<Self> {
46        let config = ProcessorConfig::default();
47        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
48        processor.start().await.map_err(|e| {
49            crate::error::error_utils::engine_error(format!(
50                "启动异步处理器失败: {e}"
51            ))
52        })?;
53
54        Ok(Self { processor: Arc::new(processor) })
55    }
56
57    pub async fn submit_transaction(
58        &self,
59        params: TaskParams,
60    ) -> ForgeResult<(
61        u64,
62        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
63    )> {
64        self.processor.submit_task(params, 0).await
65    }
66
67    pub async fn submit_transactions(
68        &self,
69        paramss: Vec<TaskParams>,
70    ) -> ForgeResult<
71        Vec<(
72            u64,
73            tokio::sync::mpsc::Receiver<
74                TaskResult<TaskParams, ProcessorResult>,
75            >,
76        )>,
77    > {
78        let mut results = Vec::new();
79        for transaction in paramss {
80            let result = self.submit_transaction(transaction).await?;
81            results.push(result);
82        }
83        Ok(results)
84    }
85
86    /// 关闭流引擎
87    ///
88    /// 注意:由于 processor 被包装在 Arc 中,这个方法只能发送关闭信号
89    /// 实际的关闭需要等到所有 Arc 引用都被释放
90    pub async fn shutdown(&self) -> ForgeResult<()> {
91        // 由于 processor 在 Arc 中,我们无法获取可变引用来调用 shutdown
92        // 这是设计上的限制,实际的关闭会在 Drop 时自动发生
93        debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
94        Ok(())
95    }
96}