mf_core/runtime/
async_flow.rs1use std::{sync::Arc};
2
3use crate::{
4 runtime::async_processor::{
5 AsyncProcessor, ProcessorError, TaskProcessor,
6 TaskResult,
7 },
8 config::ProcessorConfig,
9 types::{ProcessorResult, TaskParams, TransactionStatus},
10 ForgeResult,
11};
12use async_trait::async_trait;
13use mf_state::debug;
14
15pub struct TransactionProcessor;
17
18#[async_trait]
19impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
20 async fn process(
21 &self,
22 (state, tr): TaskParams,
23 ) -> std::result::Result<ProcessorResult, ProcessorError> {
24 match state.apply(tr).await {
25 Ok(result) => Ok(ProcessorResult {
26 status: TransactionStatus::Completed,
27 error: None,
28 result: Some(result),
29 }),
30 Err(e) => Ok(ProcessorResult {
31 status: TransactionStatus::Failed(e.to_string()),
32 error: None,
33 result: None,
34 }),
35 }
36 }
37}
38
39#[derive(Clone)]
40pub struct FlowEngine {
41 processor:
42 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
43}
44
45impl FlowEngine {
46 pub async fn new() -> ForgeResult<Self> {
47 let config = ProcessorConfig::default();
48 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
49 processor.start().await.map_err(|e| {
50 crate::error::error_utils::engine_error(format!(
51 "启动异步处理器失败: {}",
52 e
53 ))
54 })?;
55
56 Ok(Self { processor: Arc::new(processor) })
57 }
58
59 pub async fn submit_transaction(
60 &self,
61 params: TaskParams,
62 ) -> ForgeResult<(
63 u64,
64 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
65 )> {
66 self.processor.submit_task(params, 0).await.map_err(Into::into)
67 }
68
69 pub async fn submit_transactions(
70 &self,
71 paramss: Vec<TaskParams>,
72 ) -> ForgeResult<
73 Vec<(
74 u64,
75 tokio::sync::mpsc::Receiver<
76 TaskResult<TaskParams, ProcessorResult>,
77 >,
78 )>,
79 > {
80 let mut results = Vec::new();
81 for transaction in paramss {
82 let result = self.submit_transaction(transaction).await?;
83 results.push(result);
84 }
85 Ok(results)
86 }
87
88 pub async fn shutdown(&self) -> ForgeResult<()> {
93 debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
96 Ok(())
97 }
98}