mf_core/runtime/
async_flow.rs

1use std::{sync::Arc};
2
3use crate::{
4    runtime::async_processor::{
5        AsyncProcessor, ProcessorError, TaskProcessor,
6        TaskResult,
7    },
8    config::ProcessorConfig,
9    types::{ProcessorResult, TaskParams, TransactionStatus},
10    ForgeResult,
11};
12use async_trait::async_trait;
13use mf_state::debug;
14
/// Transaction processor.
///
/// A stateless unit type that plugs into `AsyncProcessor` through its
/// `TaskProcessor` implementation; it carries no data of its own.
pub struct TransactionProcessor;
17
18#[async_trait]
19impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
20    async fn process(
21        &self,
22        (state, tr): TaskParams,
23    ) -> std::result::Result<ProcessorResult, ProcessorError> {
24        match state.apply(tr).await {
25            Ok(result) => Ok(ProcessorResult {
26                status: TransactionStatus::Completed,
27                error: None,
28                result: Some(result),
29            }),
30            Err(e) => Ok(ProcessorResult {
31                status: TransactionStatus::Failed(e.to_string()),
32                error: None,
33                result: None,
34            }),
35        }
36    }
37}
38
39#[derive(Clone)]
40pub struct FlowEngine {
41    processor:
42        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
43}
44
45impl FlowEngine {
46    pub async fn new() -> ForgeResult<Self> {
47        let config = ProcessorConfig::default();
48        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
49        processor.start().await.map_err(|e| {
50            crate::error::error_utils::engine_error(format!(
51                "启动异步处理器失败: {}",
52                e
53            ))
54        })?;
55
56        Ok(Self { processor: Arc::new(processor) })
57    }
58
59    pub async fn submit_transaction(
60        &self,
61        params: TaskParams,
62    ) -> ForgeResult<(
63        u64,
64        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
65    )> {
66        self.processor.submit_task(params, 0).await.map_err(Into::into)
67    }
68
69    pub async fn submit_transactions(
70        &self,
71        paramss: Vec<TaskParams>,
72    ) -> ForgeResult<
73        Vec<(
74            u64,
75            tokio::sync::mpsc::Receiver<
76                TaskResult<TaskParams, ProcessorResult>,
77            >,
78        )>,
79    > {
80        let mut results = Vec::new();
81        for transaction in paramss {
82            let result = self.submit_transaction(transaction).await?;
83            results.push(result);
84        }
85        Ok(results)
86    }
87
88    /// 关闭流引擎
89    ///
90    /// 注意:由于 processor 被包装在 Arc 中,这个方法只能发送关闭信号
91    /// 实际的关闭需要等到所有 Arc 引用都被释放
92    pub async fn shutdown(&self) -> ForgeResult<()> {
93        // 由于 processor 在 Arc 中,我们无法获取可变引用来调用 shutdown
94        // 这是设计上的限制,实际的关闭会在 Drop 时自动发生
95        debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
96        Ok(())
97    }
98}