mf_core/runtime/
async_flow.rs1use std::{sync::Arc};
2
3use crate::{
4 runtime::async_processor::{
5 AsyncProcessor, ProcessorError, TaskProcessor, TaskResult,
6 },
7 config::ProcessorConfig,
8 types::{ProcessorResult, TaskParams, TransactionStatus},
9 ForgeResult,
10};
11use async_trait::async_trait;
12use mf_state::debug;
13
14pub struct TransactionProcessor;
16
17#[async_trait]
18impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
19 async fn process(
20 &self,
21 (state, tr): TaskParams,
22 ) -> std::result::Result<ProcessorResult, ProcessorError> {
23 match state.apply(tr).await {
24 Ok(result) => Ok(ProcessorResult {
25 status: TransactionStatus::Completed,
26 error: None,
27 result: Some(result),
28 }),
29 Err(e) => Ok(ProcessorResult {
30 status: TransactionStatus::Failed(e.to_string()),
31 error: None,
32 result: None,
33 }),
34 }
35 }
36}
37
38#[derive(Clone)]
39pub struct FlowEngine {
40 processor:
41 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
42}
43
44impl FlowEngine {
45 pub async fn new() -> ForgeResult<Self> {
46 let config = ProcessorConfig::default();
47 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
48 processor.start().await.map_err(|e| {
49 crate::error::error_utils::engine_error(format!(
50 "启动异步处理器失败: {}",
51 e
52 ))
53 })?;
54
55 Ok(Self { processor: Arc::new(processor) })
56 }
57
58 pub async fn submit_transaction(
59 &self,
60 params: TaskParams,
61 ) -> ForgeResult<(
62 u64,
63 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
64 )> {
65 self.processor.submit_task(params, 0).await.map_err(Into::into)
66 }
67
68 pub async fn submit_transactions(
69 &self,
70 paramss: Vec<TaskParams>,
71 ) -> ForgeResult<
72 Vec<(
73 u64,
74 tokio::sync::mpsc::Receiver<
75 TaskResult<TaskParams, ProcessorResult>,
76 >,
77 )>,
78 > {
79 let mut results = Vec::new();
80 for transaction in paramss {
81 let result = self.submit_transaction(transaction).await?;
82 results.push(result);
83 }
84 Ok(results)
85 }
86
87 pub async fn shutdown(&self) -> ForgeResult<()> {
92 debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
95 Ok(())
96 }
97}