mf_core/runtime/
async_flow.rs1use std::{sync::Arc};
2
3use crate::{
4 runtime::async_processor::{
5 AsyncProcessor, ProcessorError, TaskProcessor, TaskResult,
6 },
7 config::ProcessorConfig,
8 debug::debug,
9 types::{ProcessorResult, TaskParams, TransactionStatus},
10 ForgeResult,
11};
12use async_trait::async_trait;
13
/// Stateless task processor that applies a transaction to a state.
///
/// The type carries no data, so it derives the common value traits: it can be
/// copied freely, built via `Default`, and printed in diagnostics.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransactionProcessor;
16
17#[async_trait]
18impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
19 async fn process(
20 &self,
21 (state, tr): TaskParams,
22 ) -> std::result::Result<ProcessorResult, ProcessorError> {
23 match state.apply(tr).await {
24 Ok(result) => Ok(ProcessorResult {
25 status: TransactionStatus::Completed,
26 error: None,
27 result: Some(result),
28 }),
29 Err(e) => Ok(ProcessorResult {
30 status: TransactionStatus::Failed(e.to_string()),
31 error: None,
32 result: None,
33 }),
34 }
35 }
36}
37
38#[derive(Clone)]
39pub struct FlowEngine {
40 processor:
41 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
42}
43
44impl FlowEngine {
45 pub async fn new() -> ForgeResult<Self> {
46 let config = ProcessorConfig::default();
47 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
48 processor.start().await.map_err(|e| {
49 crate::error::error_utils::engine_error(format!(
50 "启动异步处理器失败: {e}"
51 ))
52 })?;
53
54 Ok(Self { processor: Arc::new(processor) })
55 }
56
57 pub async fn submit_transaction(
58 &self,
59 params: TaskParams,
60 ) -> ForgeResult<(
61 u64,
62 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
63 )> {
64 self.processor.submit_task(params, 0).await
65 }
66
67 pub async fn submit_transactions(
68 &self,
69 paramss: Vec<TaskParams>,
70 ) -> ForgeResult<
71 Vec<(
72 u64,
73 tokio::sync::mpsc::Receiver<
74 TaskResult<TaskParams, ProcessorResult>,
75 >,
76 )>,
77 > {
78 let mut results = Vec::new();
79 for transaction in paramss {
80 let result = self.submit_transaction(transaction).await?;
81 results.push(result);
82 }
83 Ok(results)
84 }
85
86 pub async fn shutdown(&self) -> ForgeResult<()> {
91 debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
94 Ok(())
95 }
96}