moduforge_core/
async_flow.rs1use std::{sync::Arc};
2
3use crate::{
4 async_processor::{
5 AsyncProcessor, ProcessorConfig, ProcessorError, TaskProcessor,
6 TaskResult,
7 },
8 types::{ProcessorResult, TaskParams, TransactionStatus},
9 ForgeResult,
10};
11use async_trait::async_trait;
12
13pub struct TransactionProcessor;
15
16#[async_trait]
17impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
18 async fn process(
19 &self,
20 (state, tr): TaskParams,
21 ) -> std::result::Result<ProcessorResult, ProcessorError> {
22 match state.apply(tr).await {
23 Ok(result) => Ok(ProcessorResult {
24 status: TransactionStatus::Completed,
25 error: None,
26 result: Some(result),
27 }),
28 Err(e) => Ok(ProcessorResult {
29 status: TransactionStatus::Failed(e.to_string()),
30 error: None,
31 result: None,
32 }),
33 }
34 }
35}
36
37#[derive(Clone)]
38pub struct FlowEngine {
39 processor:
40 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
41}
42
43impl FlowEngine {
44 pub fn new() -> ForgeResult<Self> {
45 let config = ProcessorConfig::default();
46 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
47 processor.start();
48
49 Ok(Self { processor: Arc::new(processor) })
50 }
51
52 pub async fn submit_transaction(
53 &self,
54 params: TaskParams,
55 ) -> ForgeResult<(
56 u64,
57 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
58 )> {
59 self.processor.submit_task(params, 0).await.map_err(Into::into)
60 }
61
62 pub async fn submit_transactions(
63 &self,
64 paramss: Vec<TaskParams>,
65 ) -> ForgeResult<
66 Vec<(
67 u64,
68 tokio::sync::mpsc::Receiver<
69 TaskResult<TaskParams, ProcessorResult>,
70 >,
71 )>,
72 > {
73 let mut results = Vec::new();
74 for transaction in paramss {
75 let result = self.submit_transaction(transaction).await?;
76 results.push(result);
77 }
78 Ok(results)
79 }
80}