use std::{sync::Arc};
use crate::{
runtime::async_processor::{
AsyncProcessor, ProcessorError, TaskProcessor, TaskResult,
},
config::ProcessorConfig,
debug::debug,
types::{ProcessorResult, TaskParams, TransactionStatus},
ForgeResult,
};
use async_trait::async_trait;
pub struct TransactionProcessor;
#[async_trait]
impl TaskProcessor<TaskParams, ProcessorResult> for TransactionProcessor {
async fn process(
&self,
(state, tr): TaskParams,
) -> std::result::Result<ProcessorResult, ProcessorError> {
match state.apply(tr).await {
Ok(result) => Ok(ProcessorResult {
status: TransactionStatus::Completed,
error: None,
result: Some(result),
}),
Err(e) => Ok(ProcessorResult {
status: TransactionStatus::Failed(e.to_string()),
error: None,
result: None,
}),
}
}
}
#[derive(Clone)]
pub struct FlowEngine {
processor:
Arc<AsyncProcessor<TaskParams, ProcessorResult, TransactionProcessor>>,
}
impl FlowEngine {
pub async fn new() -> ForgeResult<Self> {
let config = ProcessorConfig::default();
let mut processor = AsyncProcessor::new(config, TransactionProcessor);
processor.start().await.map_err(|e| {
crate::error::error_utils::engine_error(format!(
"启动异步处理器失败: {e}"
))
})?;
Ok(Self { processor: Arc::new(processor) })
}
pub async fn submit_transaction(
&self,
params: TaskParams,
) -> ForgeResult<(
u64,
tokio::sync::mpsc::Receiver<TaskResult<TaskParams, ProcessorResult>>,
)> {
self.processor.submit_task(params, 0).await
}
pub async fn submit_transactions(
&self,
paramss: Vec<TaskParams>,
) -> ForgeResult<
Vec<(
u64,
tokio::sync::mpsc::Receiver<
TaskResult<TaskParams, ProcessorResult>,
>,
)>,
> {
let mut results = Vec::new();
for transaction in paramss {
let result = self.submit_transaction(transaction).await?;
results.push(result);
}
Ok(results)
}
pub async fn shutdown(&self) -> ForgeResult<()> {
debug!("FlowEngine shutdown 被调用,实际关闭将在 Drop 时发生");
Ok(())
}
}