moduforge_core/
flow.rs

1use std::{
2    fmt::{Display, Formatter},
3    sync::Arc,
4};
5
6use moduforge_state::{
7    state::{State, TransactionResult},
8    transaction::Transaction,
9};
10use crate::async_processor::{
11    TaskProcessor, AsyncProcessor, ProcessorConfig, ProcessorError, TaskResult,
12};
13use async_trait::async_trait;
14
15pub type Result<T> = std::result::Result<T, FlowError>;
16
/// Lifecycle state reported for a transaction.
///
/// Within this file only `Completed` and `Failed` are produced (by the
/// transaction processor); the remaining variants are available to other
/// parts of the crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionStatus {
    /// The transaction has not been processed yet.
    Pending,
    /// The transaction is being processed.
    Processing,
    /// The transaction was applied successfully.
    Completed,
    /// The transaction failed; the payload is the error message.
    Failed(String),
    /// NOTE(review): presumably "rolled back" — confirm against callers.
    Rolled,
    /// No transaction matched the lookup.
    NotFound,
}
26
/// Errors surfaced by the transaction flow engine.
///
/// Every payload is a plain message string, so the type can be cloned and
/// compared cheaply (useful for assertions and for storing errors alongside
/// results).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FlowError {
    /// The transaction queue is full.
    QueueFull,
    /// The requested transaction does not exist.
    TransactionNotFound,
    /// The transaction timed out.
    TransactionTimeout,
    /// The transaction failed; the payload describes why.
    TransactionFailed(String),
    /// A plugin reported an error.
    PluginError(String),
    /// A state-related error occurred.
    StateError(String),
    /// The transaction was rejected as invalid.
    InvalidTransaction(String),
    /// An internal error occurred.
    InternalError(String),
}
38
39impl Display for FlowError {
40    fn fmt(
41        &self,
42        f: &mut Formatter<'_>,
43    ) -> std::fmt::Result {
44        match self {
45            FlowError::QueueFull => write!(f, "Transaction queue is full"),
46            FlowError::TransactionNotFound => {
47                write!(f, "Transaction not found")
48            },
49            FlowError::TransactionTimeout => write!(f, "Transaction timed out"),
50            FlowError::TransactionFailed(msg) => {
51                write!(f, "Transaction failed: {}", msg)
52            },
53            FlowError::PluginError(msg) => write!(f, "Plugin error: {}", msg),
54            FlowError::StateError(msg) => write!(f, "State error: {}", msg),
55            FlowError::InvalidTransaction(msg) => {
56                write!(f, "Invalid transaction: {}", msg)
57            },
58            FlowError::InternalError(msg) => {
59                write!(f, "Internal error: {}", msg)
60            },
61        }
62    }
63}
64
65impl std::error::Error for FlowError {}
66
67impl From<ProcessorError> for FlowError {
68    fn from(error: ProcessorError) -> Self {
69        match error {
70            ProcessorError::QueueFull => FlowError::QueueFull,
71            ProcessorError::TaskFailed(msg) => {
72                FlowError::TransactionFailed(msg)
73            },
74            ProcessorError::InternalError(msg) => FlowError::InternalError(msg),
75            ProcessorError::TaskTimeout => FlowError::TransactionTimeout,
76            ProcessorError::TaskCancelled => {
77                FlowError::TransactionFailed("Task was cancelled".to_string())
78            },
79            ProcessorError::RetryExhausted(msg) => {
80                FlowError::TransactionFailed(format!(
81                    "Retry attempts exhausted: {}",
82                    msg
83                ))
84            },
85        }
86    }
87}
88
89#[derive(Debug, Clone)]
90pub struct ProcessorResult {
91    pub status: TransactionStatus,
92    pub error: Option<String>,
93    pub result: Option<TransactionResult>,
94}
95/// 事务处理器
96pub struct TransactionProcessor;
97pub type TaskParams = (Arc<State>, Transaction);
98
99#[async_trait]
100impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
101    async fn process(
102        &self,
103        (state, tr): TaskParams,
104    ) -> std::result::Result<ProcessorResult, ProcessorError> {
105        match state.apply(tr).await {
106            Ok(result) => Ok(ProcessorResult {
107                status: TransactionStatus::Completed,
108                error: None,
109                result: Some(result),
110            }),
111            Err(e) => Ok(ProcessorResult {
112                status: TransactionStatus::Failed(e.to_string()),
113                error: None,
114                result: None,
115            }),
116        }
117    }
118}
119
120#[derive(Clone)]
121pub struct FlowEngine {
122    processor:
123        Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
124}
125
126impl FlowEngine {
127    pub fn new() -> Result<Self> {
128        let config = ProcessorConfig::default();
129        let mut processor = AsyncProcessor::new(config, TransactionProcessor);
130        processor.start();
131
132        Ok(Self { processor: Arc::new(processor) })
133    }
134
135    pub async fn submit_transaction(
136        &self,
137        params: TaskParams,
138    ) -> Result<(
139        u64,
140        tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
141    )> {
142        self.processor.submit_task(params, 0).await.map_err(Into::into)
143    }
144
145    pub async fn submit_transactions(
146        &self,
147        paramss: Vec<TaskParams>,
148    ) -> Result<
149        Vec<(
150            u64,
151            tokio::sync::mpsc::Receiver<
152                TaskResult<TaskParams, ProcessorResult>,
153            >,
154        )>,
155    > {
156        let mut results = Vec::new();
157        for transaction in paramss {
158            let result = self.submit_transaction(transaction).await?;
159            results.push(result);
160        }
161        Ok(results)
162    }
163}