1use std::{
2 fmt::{Display, Formatter},
3 sync::Arc,
4};
5
6use moduforge_state::{
7 state::{State, TransactionResult},
8 transaction::Transaction,
9};
10use crate::async_processor::{
11 TaskProcessor, AsyncProcessor, ProcessorConfig, ProcessorError, TaskResult,
12};
13use async_trait::async_trait;
14
15pub type Result<T> = std::result::Result<T, FlowError>;
16
17#[derive(Debug, Clone, PartialEq)]
18pub enum TransactionStatus {
19 Pending,
20 Processing,
21 Completed,
22 Failed(String),
23 Rolled,
24 NotFound,
25}
26
27#[derive(Debug)]
28pub enum FlowError {
29 QueueFull,
30 TransactionNotFound,
31 TransactionTimeout,
32 TransactionFailed(String),
33 PluginError(String),
34 StateError(String),
35 InvalidTransaction(String),
36 InternalError(String),
37}
38
39impl Display for FlowError {
40 fn fmt(
41 &self,
42 f: &mut Formatter<'_>,
43 ) -> std::fmt::Result {
44 match self {
45 FlowError::QueueFull => write!(f, "Transaction queue is full"),
46 FlowError::TransactionNotFound => {
47 write!(f, "Transaction not found")
48 },
49 FlowError::TransactionTimeout => write!(f, "Transaction timed out"),
50 FlowError::TransactionFailed(msg) => {
51 write!(f, "Transaction failed: {}", msg)
52 },
53 FlowError::PluginError(msg) => write!(f, "Plugin error: {}", msg),
54 FlowError::StateError(msg) => write!(f, "State error: {}", msg),
55 FlowError::InvalidTransaction(msg) => {
56 write!(f, "Invalid transaction: {}", msg)
57 },
58 FlowError::InternalError(msg) => {
59 write!(f, "Internal error: {}", msg)
60 },
61 }
62 }
63}
64
65impl std::error::Error for FlowError {}
66
67impl From<ProcessorError> for FlowError {
68 fn from(error: ProcessorError) -> Self {
69 match error {
70 ProcessorError::QueueFull => FlowError::QueueFull,
71 ProcessorError::TaskFailed(msg) => {
72 FlowError::TransactionFailed(msg)
73 },
74 ProcessorError::InternalError(msg) => FlowError::InternalError(msg),
75 ProcessorError::TaskTimeout => FlowError::TransactionTimeout,
76 ProcessorError::TaskCancelled => {
77 FlowError::TransactionFailed("Task was cancelled".to_string())
78 },
79 ProcessorError::RetryExhausted(msg) => {
80 FlowError::TransactionFailed(format!(
81 "Retry attempts exhausted: {}",
82 msg
83 ))
84 },
85 }
86 }
87}
88
89#[derive(Debug, Clone)]
90pub struct ProcessorResult {
91 pub status: TransactionStatus,
92 pub error: Option<String>,
93 pub result: Option<TransactionResult>,
94}
95pub struct TransactionProcessor;
97pub type TaskParams = (Arc<State>, Transaction);
98
99#[async_trait]
100impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
101 async fn process(
102 &self,
103 (state, tr): TaskParams,
104 ) -> std::result::Result<ProcessorResult, ProcessorError> {
105 match state.apply(tr).await {
106 Ok(result) => Ok(ProcessorResult {
107 status: TransactionStatus::Completed,
108 error: None,
109 result: Some(result),
110 }),
111 Err(e) => Ok(ProcessorResult {
112 status: TransactionStatus::Failed(e.to_string()),
113 error: None,
114 result: None,
115 }),
116 }
117 }
118}
119
120#[derive(Clone)]
121pub struct FlowEngine {
122 processor:
123 Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
124}
125
126impl FlowEngine {
127 pub fn new() -> Result<Self> {
128 let config = ProcessorConfig::default();
129 let mut processor = AsyncProcessor::new(config, TransactionProcessor);
130 processor.start();
131
132 Ok(Self { processor: Arc::new(processor) })
133 }
134
135 pub async fn submit_transaction(
136 &self,
137 params: TaskParams,
138 ) -> Result<(
139 u64,
140 tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
141 )> {
142 self.processor.submit_task(params, 0).await.map_err(Into::into)
143 }
144
145 pub async fn submit_transactions(
146 &self,
147 paramss: Vec<TaskParams>,
148 ) -> Result<
149 Vec<(
150 u64,
151 tokio::sync::mpsc::Receiver<
152 TaskResult<TaskParams, ProcessorResult>,
153 >,
154 )>,
155 > {
156 let mut results = Vec::new();
157 for transaction in paramss {
158 let result = self.submit_transaction(transaction).await?;
159 results.push(result);
160 }
161 Ok(results)
162 }
163}