use std::{
ops::{Deref, DerefMut},
sync::Arc,
time::Duration,
};
use async_trait::async_trait;
use crate::runtime::runtime::ForgeRuntime;
use crate::runtime::runtime_trait::RuntimeTrait;
use crate::types::ProcessorResult;
use crate::{
config::{ForgeConfig, PerformanceConfig},
debug::debug,
error::error_utils,
event::Event,
runtime::async_flow::{FlowEngine},
types::RuntimeOptions,
ForgeResult,
};
use mf_model::schema::Schema;
use mf_state::{
state::TransactionResult,
transaction::{Command, Transaction},
State,
};
pub struct ForgeAsyncRuntime {
base: ForgeRuntime,
flow_engine: FlowEngine,
}
impl Deref for ForgeAsyncRuntime {
type Target = ForgeRuntime;
fn deref(&self) -> &Self::Target {
&self.base
}
}
impl DerefMut for ForgeAsyncRuntime {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.base
}
}
impl ForgeAsyncRuntime {
#[cfg_attr(
feature = "dev-tracing",
tracing::instrument(
skip(options),
fields(crate_name = "core", runtime_type = "async")
)
)]
pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
Self::create_with_config(options, ForgeConfig::default()).await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
crate_name = "core",
schema_path = xml_schema_path,
runtime_type = "async"
)))]
pub async fn from_xml_schema_path(
xml_schema_path: &str,
options: Option<RuntimeOptions>,
config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
let mut config = config.unwrap_or_default();
config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
Self::create_with_config(options.unwrap_or_default(), config).await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
crate_name = "core",
schema_count = xml_schema_paths.len(),
runtime_type = "async"
)))]
pub async fn from_xml_schemas(
xml_schema_paths: &[&str],
options: Option<RuntimeOptions>,
config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
let mut config = config.unwrap_or_default();
config.extension.xml_schema_paths =
xml_schema_paths.iter().map(|s| s.to_string()).collect();
Self::create_with_config(options.unwrap_or_default(), config).await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
crate_name = "core",
content_size = xml_content.len(),
runtime_type = "async"
)))]
pub async fn from_xml_content(
xml_content: &str,
options: Option<RuntimeOptions>,
config: Option<ForgeConfig>,
) -> ForgeResult<Self> {
let base = ForgeRuntime::from_xml_content(xml_content, options, config)
.await?;
Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
crate_name = "core",
runtime_type = "async",
has_middleware = !options.get_middleware_stack().is_empty()
)))]
pub async fn create_with_config(
options: RuntimeOptions,
config: ForgeConfig,
) -> ForgeResult<Self> {
let base = ForgeRuntime::create_with_config(options, config).await?;
Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
}
pub fn set_performance_config(
&mut self,
perf_config: PerformanceConfig,
) {
self.base.update_config({
let mut config = self.base.get_config().clone();
config.performance = perf_config;
config
});
}
pub fn get_config(&self) -> &ForgeConfig {
self.base.get_config()
}
pub fn update_config(
&mut self,
config: ForgeConfig,
) {
self.base.update_config(config);
}
fn log_performance(
&self,
operation: &str,
duration: Duration,
) {
if self.base.get_config().performance.enable_monitoring
&& duration.as_millis()
> self.base.get_config().performance.log_threshold_ms as u128
{
debug!("{} 耗时: {}ms", operation, duration.as_millis());
}
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
crate_name = "core",
command_name = %command.name(),
runtime_type = "async"
)))]
pub async fn command(
&mut self,
command: Arc<dyn Command>,
) -> ForgeResult<()> {
self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
.await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
crate_name = "core",
command_name = %command.name(),
description = %description,
runtime_type = "async"
)))]
pub async fn command_with_meta(
&mut self,
command: Arc<dyn Command>,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
let cmd_name = command.name();
debug!("正在执行命令: {}", cmd_name);
let mut tr = self.base.get_tr();
command.execute(&mut tr).await?;
tr.commit()?;
match self.dispatch_flow_with_meta(tr, description, meta).await {
Ok(_) => {
debug!("命令 '{}' 执行成功", cmd_name);
Ok(())
},
Err(e) => {
debug!("命令 '{}' 执行失败: {}", cmd_name, e);
Err(e)
},
}
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
crate_name = "core",
tr_id = %transaction.id,
runtime_type = "async"
)))]
pub async fn dispatch_flow(
&mut self,
transaction: Transaction,
) -> ForgeResult<()> {
self.dispatch_flow_with_meta(
transaction,
"".to_string(),
serde_json::Value::Null,
)
.await
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
crate_name = "core",
tr_id = %transaction.id,
description = %description,
runtime_type = "async"
)))]
pub async fn dispatch_flow_with_meta(
&mut self,
transaction: Transaction,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
let start_time = std::time::Instant::now();
let mut current_transaction = transaction;
let _old_id = self.get_state().await?.version;
let middleware_start = std::time::Instant::now();
self.run_before_middleware(&mut current_transaction).await?;
self.log_performance("前置中间件处理", middleware_start.elapsed());
let (_id, mut rx) = self
.flow_engine
.submit_transaction((
self.base.get_state().clone(),
current_transaction,
))
.await?;
let recv_start = std::time::Instant::now();
let task_receive_timeout = Duration::from_millis(
self.base.get_config().performance.task_receive_timeout_ms,
);
let task_result =
match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
Ok(Some(result)) => result,
Ok(None) => {
return Err(error_utils::state_error(
"任务接收通道已关闭".to_string(),
));
},
Err(_) => {
return Err(error_utils::state_error(format!(
"任务接收超时({}ms)",
self.base
.get_config()
.performance
.task_receive_timeout_ms
)));
},
};
self.log_performance("接收任务结果", recv_start.elapsed());
let Some(ProcessorResult { result: Some(result), .. }) =
task_result.output
else {
return Err(error_utils::state_error(
"任务处理结果无效".to_string(),
));
};
let mut current_state = None;
let mut transactions = Vec::new();
transactions.extend(result.transactions);
if transactions.last().is_some() {
current_state = Some(result.state);
}
let after_start = std::time::Instant::now();
self.run_after_middleware(&mut current_state, &mut transactions)
.await?;
self.log_performance("后置中间件处理", after_start.elapsed());
if let Some(new_state) = current_state {
let old_state = self.base.get_state().clone();
self.base
.update_state_with_meta(
new_state.clone(),
transactions.clone(),
description,
meta,
)
.await?;
let event_start = std::time::Instant::now();
self.base
.emit_event(Event::TrApply {
old_state,
new_state,
transactions,
})
.await?;
self.log_performance("事件广播", event_start.elapsed());
}
self.log_performance("事务处理总耗时", start_time.elapsed());
Ok(())
}
#[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
crate_name = "core",
tr_id = %transaction.id,
middleware_count = self.base.get_options().get_middleware_stack().middlewares.len(),
runtime_type = "async"
)))]
pub async fn run_before_middleware(
&mut self,
transaction: &mut Transaction,
) -> ForgeResult<()> {
use crate::helpers::middleware_helper::MiddlewareHelper;
MiddlewareHelper::run_before_middleware(
transaction,
&self.base.get_options().get_middleware_stack(),
self.base.get_config(),
)
.await?;
transaction.commit()?;
Ok(())
}
pub async fn run_after_middleware(
&mut self,
state: &mut Option<Arc<State>>,
transactions: &mut Vec<Arc<Transaction>>,
) -> ForgeResult<()> {
debug!("执行后置中间件链");
for middleware in
&self.base.get_options().get_middleware_stack().middlewares
{
let timeout = std::time::Duration::from_millis(
self.base.get_config().performance.middleware_timeout_ms,
);
let start_time = std::time::Instant::now();
let middleware_result = match tokio::time::timeout(
timeout,
middleware.after_dispatch(state.clone(), transactions),
)
.await
{
Ok(result) => match result {
Ok(r) => r,
Err(e) => {
debug!("中间件执行失败: {}", e);
return Err(error_utils::middleware_error(format!(
"中间件执行失败: {e}"
)));
},
},
Err(e) => {
debug!("中间件执行超时: {}", e);
return Err(error_utils::middleware_error(format!(
"中间件执行超时: {e}"
)));
},
};
let elapsed = start_time.elapsed();
if elapsed.as_millis() > 100 {
debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
}
if let Some(mut transaction) = middleware_result {
transaction.commit()?;
let tx_start_time = std::time::Instant::now();
let result = match self
.flow_engine
.submit_transaction((
self.base.get_state().clone(),
transaction,
))
.await
{
Ok(result) => result,
Err(e) => {
debug!("附加事务提交失败: {}", e);
return Err(error_utils::state_error(format!(
"附加事务提交失败: {e}"
)));
},
};
let (_id, mut rx) = result;
let task_receive_timeout = Duration::from_millis(
self.base.get_config().performance.task_receive_timeout_ms,
);
let task_result =
match tokio::time::timeout(task_receive_timeout, rx.recv())
.await
{
Ok(Some(result)) => result,
Ok(None) => {
debug!("附加事务接收通道已关闭");
return Ok(());
},
Err(_) => {
debug!("附加事务接收超时");
return Err(error_utils::state_error(format!(
"附加事务接收超时({}ms)",
self.base
.get_config()
.performance
.task_receive_timeout_ms
)));
},
};
let Some(ProcessorResult { result: Some(result), .. }) =
task_result.output
else {
debug!("附加事务处理结果无效");
return Ok(());
};
let TransactionResult { state: new_state, transactions: trs } =
result;
*state = Some(new_state);
transactions.extend(trs);
let tx_elapsed = tx_start_time.elapsed();
if tx_elapsed.as_millis() > 50 {
debug!(
"附加事务处理时间较长: {}ms",
tx_elapsed.as_millis()
);
}
}
}
Ok(())
}
#[cfg_attr(
feature = "dev-tracing",
tracing::instrument(
skip(self),
fields(crate_name = "core", runtime_type = "async")
)
)]
pub async fn shutdown(&mut self) -> ForgeResult<()> {
debug!("开始关闭异步运行时");
self.base.destroy().await?;
debug!("正在关闭流引擎...");
debug!("异步运行时已成功关闭");
Ok(())
}
}
#[async_trait]
impl RuntimeTrait for ForgeAsyncRuntime {
async fn dispatch(
&mut self,
transaction: Transaction,
) -> ForgeResult<()> {
self.dispatch_flow(transaction).await
}
async fn dispatch_with_meta(
&mut self,
transaction: Transaction,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
self.dispatch_flow_with_meta(transaction, description, meta).await
}
async fn command(
&mut self,
command: Arc<dyn Command>,
) -> ForgeResult<()> {
self.command(command).await
}
async fn command_with_meta(
&mut self,
command: Arc<dyn Command>,
description: String,
meta: serde_json::Value,
) -> ForgeResult<()> {
self.command_with_meta(command, description, meta).await
}
async fn get_state(&self) -> ForgeResult<Arc<State>> {
Ok(self.base.get_state().clone())
}
async fn get_tr(&self) -> ForgeResult<Transaction> {
Ok(self.base.get_tr())
}
async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
Ok(self.base.get_schema())
}
async fn undo(&mut self) -> ForgeResult<()> {
self.base.undo();
Ok(())
}
async fn redo(&mut self) -> ForgeResult<()> {
self.base.redo();
Ok(())
}
async fn jump(
&mut self,
steps: isize,
) -> ForgeResult<()> {
self.base.jump(steps);
Ok(())
}
fn get_config(&self) -> &ForgeConfig {
self.base.get_config()
}
fn update_config(
&mut self,
config: ForgeConfig,
) {
self.base.update_config(config);
}
fn get_options(&self) -> &RuntimeOptions {
self.base.get_options()
}
async fn destroy(&mut self) -> ForgeResult<()> {
self.shutdown().await
}
}