use ractor::{Actor, ActorRef, ActorProcessingErr};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::oneshot;
use crate::{
config::ForgeConfig,
debug::debug,
error::{error_utils, ForgeResult},
event::Event,
middleware::MiddlewareStack,
runtime::sync_flow::FlowEngine,
types::ProcessorResult,
metrics,
};
use mf_state::{state::State, transaction::Transaction};
use super::{ActorMetrics, ActorSystemResult};
#[derive(Debug)]
pub enum TransactionMessage {
ProcessTransaction {
transaction: Transaction,
description: String,
meta: serde_json::Value,
reply: oneshot::Sender<ForgeResult<()>>,
},
GetStats { reply: oneshot::Sender<TransactionStats> },
UpdateConfig {
config: ForgeConfig,
reply: oneshot::Sender<ForgeResult<()>>,
},
}
#[derive(Debug, Clone)]
pub struct TransactionStats {
pub transactions_processed: u64,
pub transaction_failures: u64,
pub avg_processing_time_ms: u64,
pub middleware_timeouts: u64,
}
pub struct TransactionProcessorState {
state_actor: ActorRef<super::StateMessage>,
event_bus: ActorRef<super::EventBusMessage>,
middleware_stack: MiddlewareStack,
flow_engine: Arc<FlowEngine>,
config: ForgeConfig,
metrics: ActorMetrics,
stats: TransactionStats,
}
pub struct TransactionProcessorActor;
#[ractor::async_trait]
impl Actor for TransactionProcessorActor {
type Msg = TransactionMessage;
type State = TransactionProcessorState;
type Arguments = (
ActorRef<super::StateMessage>,
ActorRef<super::EventBusMessage>,
MiddlewareStack,
Arc<FlowEngine>,
ForgeConfig,
);
async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let (state_actor, event_bus, middleware_stack, flow_engine, config) =
args;
debug!("启动事务处理Actor");
Ok(TransactionProcessorState {
state_actor,
event_bus,
middleware_stack,
flow_engine,
config,
metrics: ActorMetrics::default(),
stats: TransactionStats {
transactions_processed: 0,
transaction_failures: 0,
avg_processing_time_ms: 0,
middleware_timeouts: 0,
},
})
}
async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
TransactionMessage::ProcessTransaction {
transaction,
description,
meta,
reply,
} => {
let start_time = Instant::now();
let result = self
.dispatch_with_meta_exact_logic(
state,
transaction,
description,
meta,
)
.await;
let processing_time = start_time.elapsed();
state.stats.transactions_processed += 1;
if result.is_err() {
state.stats.transaction_failures += 1;
state.metrics.increment_errors();
}
state.stats.avg_processing_time_ms =
processing_time.as_millis() as u64;
state
.metrics
.update_processing_time(processing_time.as_millis() as u64);
state.metrics.increment_messages();
let _ = reply.send(result);
},
TransactionMessage::GetStats { reply } => {
let _ = reply.send(state.stats.clone());
},
TransactionMessage::UpdateConfig { config, reply } => {
state.config = config;
let _ = reply.send(Ok(()));
},
}
Ok(())
}
async fn post_stop(
&self,
_myself: ActorRef<Self::Msg>,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
debug!("停止事务处理Actor");
Ok(())
}
}
impl TransactionProcessorActor {
async fn dispatch_with_meta_exact_logic(
&self,
state: &mut TransactionProcessorState,
transaction: Transaction,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
metrics::transaction_dispatched();
let current_state = self.get_current_state(&state.state_actor).await?;
let old_state = current_state.clone();
let mut current_transaction = transaction;
self.run_before_middleware(state, &mut current_transaction).await?;
let task_result = state
.flow_engine
.submit((current_state, current_transaction.clone()))
.await;
let Some(ProcessorResult { result: Some(result), .. }) =
task_result.output
else {
return Err(error_utils::state_error(
"任务处理结果无效".to_string(),
));
};
let mut state_update = None;
let mut transactions = Vec::new();
transactions.extend(result.transactions);
if transactions.last().is_some() {
state_update = Some(result.state);
}
self.run_after_middleware(state, &mut state_update, &mut transactions)
.await?;
if let Some(new_state) = state_update {
self.record_transactions(
&state.state_actor,
new_state.clone(),
transactions.clone(),
description,
meta,
)
.await?;
self.emit_event(
&state.event_bus,
Event::TrApply { old_state, new_state, transactions },
)
.await?;
}
Ok(())
}
async fn get_current_state(
&self,
state_actor: &ActorRef<super::StateMessage>,
) -> ForgeResult<Arc<State>> {
let (tx, rx) = oneshot::channel();
state_actor
.send_message(super::StateMessage::GetState { reply: tx })
.map_err(|e| {
error_utils::state_error(format!("发送获取状态消息失败: {e}"))
})?;
rx.await.map_err(|e| {
error_utils::state_error(format!("接收状态响应失败: {e}"))
})
}
async fn run_before_middleware(
&self,
actor_state: &mut TransactionProcessorState,
transaction: &mut Transaction,
) -> ForgeResult<()> {
debug!("执行前置中间件链");
for middleware in &actor_state.middleware_stack.middlewares {
let start_time = Instant::now();
let timeout = std::time::Duration::from_millis(
actor_state.config.performance.middleware_timeout_ms,
);
match tokio::time::timeout(
timeout,
middleware.before_dispatch(transaction),
)
.await
{
Ok(Ok(())) => {
metrics::middleware_execution_duration(
start_time.elapsed(),
"before",
middleware.name().as_str(),
);
continue;
},
Ok(Err(e)) => {
return Err(error_utils::middleware_error(format!(
"前置中间件执行失败: {e}"
)));
},
Err(_) => {
actor_state.stats.middleware_timeouts += 1;
return Err(error_utils::middleware_error(format!(
"前置中间件执行超时({}ms)",
actor_state.config.performance.middleware_timeout_ms
)));
},
}
}
Ok(())
}
async fn run_after_middleware(
&self,
actor_state: &mut TransactionProcessorState,
state_update: &mut Option<Arc<State>>,
transactions: &mut Vec<Arc<Transaction>>,
) -> ForgeResult<()> {
debug!("执行后置中间件链");
for middleware in &actor_state.middleware_stack.middlewares {
let start_time = Instant::now();
let timeout = std::time::Duration::from_millis(
actor_state.config.performance.middleware_timeout_ms,
);
let middleware_result = match tokio::time::timeout(
timeout,
middleware.after_dispatch(state_update.clone(), transactions),
)
.await
{
Ok(Ok(result)) => {
metrics::middleware_execution_duration(
start_time.elapsed(),
"after",
middleware.name().as_str(),
);
result
},
Ok(Err(e)) => {
return Err(error_utils::middleware_error(format!(
"后置中间件执行失败: {e}"
)));
},
Err(_) => {
actor_state.stats.middleware_timeouts += 1;
return Err(error_utils::middleware_error(format!(
"后置中间件执行超时({}ms)",
actor_state.config.performance.middleware_timeout_ms
)));
},
};
if let Some(mut additional_transaction) = middleware_result {
additional_transaction.commit()?;
let current_state = state_update
.as_ref()
.ok_or_else(|| {
error_utils::state_error(
"处理附加事务时状态为空".to_string(),
)
})?
.clone();
let task_result = actor_state
.flow_engine
.submit((current_state, additional_transaction))
.await;
let Some(ProcessorResult { result: Some(result), .. }) =
task_result.output
else {
return Err(error_utils::state_error(
"附加事务处理结果无效".to_string(),
));
};
*state_update = Some(result.state);
transactions.extend(result.transactions);
}
}
Ok(())
}
async fn record_transactions(
&self,
state_actor: &ActorRef<super::StateMessage>,
state: Arc<State>,
transactions: Vec<Arc<mf_state::Transaction>>,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
let (tx, rx) = oneshot::channel();
state_actor
.send_message(super::StateMessage::RecordTransactions {
state,
transactions,
description,
meta,
reply: tx,
})
.map_err(|e| {
error_utils::state_error(format!("发送记录事务消息失败: {e}"))
})?;
rx.await.map_err(|e| {
error_utils::state_error(format!("接收记录事务响应失败: {e}"))
})?
}
async fn emit_event(
&self,
event_bus: &ActorRef<super::EventBusMessage>,
event: Event,
) -> ForgeResult<()> {
event_bus
.send_message(super::EventBusMessage::PublishEvent { event })
.map_err(|e| {
error_utils::event_error(format!("发送事件消息失败: {e}"))
})?;
Ok(())
}
}
pub struct TransactionProcessorManager;
impl TransactionProcessorManager {
pub async fn start(
state_actor: ActorRef<super::StateMessage>,
event_bus: ActorRef<super::EventBusMessage>,
middleware_stack: MiddlewareStack,
flow_engine: Arc<FlowEngine>,
config: ForgeConfig,
) -> ActorSystemResult<ActorRef<TransactionMessage>> {
let (actor_ref, _handle) = Actor::spawn(
Some("TransactionProcessor".to_string()),
TransactionProcessorActor,
(state_actor, event_bus, middleware_stack, flow_engine, config),
)
.await
.map_err(|e| super::ActorSystemError::ActorStartupFailed {
actor_name: "TransactionProcessor".to_string(),
source: e,
})?;
debug!("事务处理Actor启动成功");
Ok(actor_ref)
}
}
#[cfg(test)]
mod tests {
#[tokio::test]
async fn test_transaction_processor_actor_creation() {
}
}