moduforge_core/
sync_flow.rs

1use std::{sync::Arc, time::Duration};
2
3use crate::{
4    sync_processor::{
5        ProcessorError, SyncProcessor, TaskProcessor, TaskResult,
6    },
7    types::{ProcessorResult, TaskParams, TransactionStatus},
8    ForgeResult,
9};
10use async_trait::async_trait;
11
/// Transaction processor.
///
/// A field-less marker type: it carries no state of its own and exists so the
/// `TaskProcessor<TaskParams, ProcessorResult>` implementation in this file
/// has a concrete type to hang on.
///
/// The derives are free for a unit struct and make the type printable in
/// diagnostics (`Debug`), trivially duplicable (`Clone`/`Copy`) and
/// constructible through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct TransactionProcessor;
14
15#[async_trait]
16impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
17    async fn process(
18        &self,
19        (state, tr): TaskParams,
20    ) -> std::result::Result<ProcessorResult, ProcessorError> {
21        match state.apply(tr).await {
22            Ok(result) => Ok(ProcessorResult {
23                status: TransactionStatus::Completed,
24                error: None,
25                result: Some(result),
26            }),
27            Err(e) => Ok(ProcessorResult {
28                status: TransactionStatus::Failed(e.to_string()),
29                error: None,
30                result: None,
31            }),
32        }
33    }
34}
35
36#[derive(Clone)]
37pub struct FlowEngine {
38    processor:
39        Arc<SyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
40}
41
42impl FlowEngine {
43    pub fn new() -> ForgeResult<Self> {
44        let processor =
45            SyncProcessor::new(TransactionProcessor, 3, Duration::from_secs(1));
46        Ok(Self { processor: Arc::new(processor) })
47    }
48
49    pub async fn submit(
50        &self,
51        params: TaskParams,
52    ) -> TaskResult<TaskParams, ProcessorResult> {
53        self.processor.process_task(params).await
54    }
55}