use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::oneshot;
use crate::{
actors::{
system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
transaction_processor::TransactionMessage,
state_actor::StateMessage,
event_bus::EventBusMessage,
},
config::ForgeConfig,
debug::debug,
error::{error_utils, ForgeResult},
event::Event,
runtime::runtime_trait::RuntimeTrait,
types::RuntimeOptions,
metrics,
};
use mf_model::schema::Schema;
use mf_state::{
state::State,
transaction::{Command, Transaction},
};
/// Actor-backed Forge runtime.
///
/// A thin facade: every operation is turned into a message and sent to one of
/// the actors reachable through the stored `ForgeActorSystemHandle`.
pub struct ForgeActorRuntime {
/// Handle of the running actor system; becomes `None` once `destroy` has
/// taken it in order to shut the system down.
actor_system: Option<ForgeActorSystemHandle>,
/// Locally stored configuration. A clone of it is passed to the actor
/// system at creation; `update_config` replaces only this local copy.
config: ForgeConfig,
/// Set to `true` by a successful creation and cleared by `destroy`.
started: bool,
}
impl ForgeActorRuntime {
    /// Borrows the handle of the running actor system.
    ///
    /// # Errors
    /// Returns an engine error when no handle is stored, which is the case
    /// after `destroy` has taken it.
    fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
        self.actor_system.as_ref().ok_or_else(|| {
            error_utils::engine_error("Actor系统未初始化".to_string())
        })
    }

    /// Creates a runtime from `options` using `ForgeConfig::default()`.
    ///
    /// # Errors
    /// Propagates any error returned by `create_with_config`.
    #[cfg_attr(
        feature = "dev-tracing",
        tracing::instrument(
            skip(options),
            fields(crate_name = "core", runtime_type = "actor")
        )
    )]
    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
        Self::create_with_config(options, ForgeConfig::default()).await
    }

    /// Starts the actor system with `options` and `config` and wraps the
    /// resulting handle in a started runtime.
    ///
    /// `options` is moved into the actor system and is not retained by the
    /// runtime; `config` is cloned for the actor system and the original is
    /// kept locally. The elapsed creation time is reported to `metrics`.
    ///
    /// # Errors
    /// Returns an engine error when the actor system fails to start.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
        crate_name = "core",
        runtime_type = "actor",
        has_middleware = !options.get_middleware_stack().is_empty()
    )))]
    pub async fn create_with_config(
        options: RuntimeOptions,
        config: ForgeConfig,
    ) -> ForgeResult<Self> {
        let start_time = Instant::now();
        debug!("正在创建Actor运行时实例");
        let actor_system = ForgeActorSystem::start(
            options,
            config.clone(),
            ActorSystemConfig::default(),
        )
        .await
        .map_err(|e| {
            error_utils::engine_error(format!("启动Actor系统失败: {e}"))
        })?;
        debug!("Actor运行时实例创建成功");
        metrics::editor_creation_duration(start_time.elapsed());
        Ok(ForgeActorRuntime {
            actor_system: Some(actor_system),
            config,
            started: true,
        })
    }

    /// Dispatches `transaction` with an empty description and `Null`
    /// metadata.
    ///
    /// # Errors
    /// Same as `dispatch_with_meta`.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
        crate_name = "core",
        tr_id = %transaction.id,
        runtime_type = "actor"
    )))]
    pub async fn dispatch(
        &mut self,
        transaction: Transaction,
    ) -> ForgeResult<()> {
        self.dispatch_with_meta(
            transaction,
            String::new(),
            serde_json::Value::Null,
        )
        .await
    }

    /// Sends `transaction`, together with `description` and `meta`, to the
    /// transaction processor actor and waits for its reply.
    ///
    /// # Errors
    /// Returns an engine error when the runtime is not started, when the
    /// actor system handle is gone, when the message cannot be sent, or when
    /// the reply channel is closed without an answer. Otherwise returns
    /// whatever result the transaction processor replied with.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
        crate_name = "core",
        tr_id = %transaction.id,
        description = %description,
        runtime_type = "actor"
    )))]
    pub async fn dispatch_with_meta(
        &mut self,
        transaction: Transaction,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        if !self.started {
            return Err(error_utils::engine_error("运行时未启动".to_string()));
        }
        let (tx, rx) = oneshot::channel();
        self.actor_system()?
            .transaction_processor
            .send_message(TransactionMessage::ProcessTransaction {
                transaction,
                description,
                meta,
                reply: tx,
            })
            .map_err(|e| {
                error_utils::engine_error(format!("发送事务消息失败: {e}"))
            })?;
        // The outer error is a closed reply channel; the inner result is the
        // processor's own verdict and is returned unchanged.
        rx.await.map_err(|e| {
            error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
        })?
    }

    /// Runs `command` against a fresh transaction and returns that
    /// transaction after `commit` has been called on it.
    ///
    /// Shared by `command` and `command_with_meta` so both emit the same
    /// debug line and `command_executed` metric.
    ///
    /// # Errors
    /// Propagates failures from fetching the transaction, from
    /// `Command::execute`, and from `commit`.
    async fn prepare_command(
        &self,
        command: Arc<dyn Command>,
    ) -> ForgeResult<Transaction> {
        debug!("正在执行命令: {}", command.name());
        metrics::command_executed(command.name().as_str());
        let mut tr = self.get_tr().await?;
        command.execute(&mut tr).await?;
        tr.commit()?;
        Ok(tr)
    }

    /// Executes `command` on a fresh transaction and dispatches the result
    /// via `dispatch`.
    ///
    /// # Errors
    /// Propagates command preparation errors and dispatch errors.
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
        crate_name = "core",
        command_name = %command.name(),
        runtime_type = "actor"
    )))]
    pub async fn command(
        &mut self,
        command: Arc<dyn Command>,
    ) -> ForgeResult<()> {
        let tr = self.prepare_command(command).await?;
        self.dispatch(tr).await
    }

    /// Executes `command` on a fresh transaction and dispatches the result
    /// together with `description` and `meta`.
    ///
    /// # Errors
    /// Propagates command preparation errors and dispatch errors.
    pub async fn command_with_meta(
        &mut self,
        command: Arc<dyn Command>,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        let tr = self.prepare_command(command).await?;
        self.dispatch_with_meta(tr, description, meta).await
    }

    /// Asks the state actor for the current state.
    ///
    /// # Errors
    /// Returns an engine error when the actor system handle is gone, and a
    /// state error when the request cannot be sent or the reply channel is
    /// closed without an answer.
    pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
        let (tx, rx) = oneshot::channel();
        self.actor_system()?
            .state_actor
            .send_message(StateMessage::GetState { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送获取状态消息失败: {e}"))
            })?;
        rx.await.map_err(|e| {
            error_utils::state_error(format!("接收状态响应失败: {e}"))
        })
    }

    /// Fetches the current state and returns the transaction produced by its
    /// `tr()` method.
    ///
    /// # Errors
    /// Same as `get_state`.
    pub async fn get_tr(&self) -> ForgeResult<Transaction> {
        let state = self.get_state().await?;
        Ok(state.tr())
    }

    /// Sends an undo request to the state actor and waits for the reply.
    /// The success payload of the reply is discarded.
    ///
    /// # Errors
    /// Returns an engine error when the actor system handle is gone, a state
    /// error when the request cannot be sent or no reply arrives, or the
    /// error carried by the reply itself.
    pub async fn undo(&mut self) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();
        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Undo { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送撤销消息失败: {e}"))
            })?;
        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收撤销响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// Sends a redo request to the state actor and waits for the reply.
    /// The success payload of the reply is discarded.
    ///
    /// # Errors
    /// Returns an engine error when the actor system handle is gone, a state
    /// error when the request cannot be sent or no reply arrives, or the
    /// error carried by the reply itself.
    pub async fn redo(&mut self) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();
        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Redo { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送重做消息失败: {e}"))
            })?;
        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收重做响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// Sends a jump request with `steps` to the state actor and waits for
    /// the reply. The success payload of the reply is discarded.
    ///
    /// NOTE(review): the meaning of the sign of `steps` is defined by the
    /// state actor and is not visible here — confirm before relying on it.
    ///
    /// # Errors
    /// Returns an engine error when the actor system handle is gone, a state
    /// error when the request cannot be sent or no reply arrives, or the
    /// error carried by the reply itself.
    pub async fn jump(
        &mut self,
        steps: isize,
    ) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();
        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Jump { steps, reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送跳转消息失败: {e}"))
            })?;
        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收跳转响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// Records the `event_emitted` metric and sends `event` to the event bus
    /// actor. No reply is awaited.
    ///
    /// # Errors
    /// Returns an engine error when the actor system handle is gone and an
    /// event error when the message cannot be sent.
    pub async fn emit_event(
        &mut self,
        event: Event,
    ) -> ForgeResult<()> {
        metrics::event_emitted(event.name());
        self.actor_system()?
            .event_bus
            .send_message(EventBusMessage::PublishEvent { event })
            .map_err(|e| {
                error_utils::event_error(format!("发送事件消息失败: {e}"))
            })?;
        Ok(())
    }

    /// Returns the locally stored configuration.
    pub fn get_config(&self) -> &ForgeConfig {
        &self.config
    }

    /// Replaces the locally stored configuration.
    ///
    /// Only the runtime's own copy changes; this method sends nothing to the
    /// actor system.
    pub fn update_config(
        &mut self,
        config: ForgeConfig,
    ) {
        self.config = config;
    }

    /// Shuts the runtime down. Calling it on a runtime that is not started
    /// is a no-op that returns `Ok(())`.
    ///
    /// A `Destroy` event is published first on a best-effort basis, then the
    /// actor system handle is taken and shut down.
    ///
    /// # Errors
    /// Returns an engine error when the actor system fails to shut down. The
    /// runtime is reported as stopped even then, because the handle has
    /// already been consumed and the runtime can no longer be used.
    #[cfg_attr(
        feature = "dev-tracing",
        tracing::instrument(
            skip(self),
            fields(crate_name = "core", runtime_type = "actor")
        )
    )]
    pub async fn destroy(&mut self) -> ForgeResult<()> {
        debug!("正在销毁Actor运行时实例");
        if self.started {
            // Deliberately best-effort: a failed notification must not
            // prevent the shutdown itself.
            let _ = self.emit_event(Event::Destroy).await;
            // Clear the flag before awaiting shutdown. The handle is moved
            // out below, so if shutdown fails (or this future is dropped at
            // the await) the runtime must not keep claiming to be started.
            self.started = false;
            if let Some(actor_system) = self.actor_system.take() {
                ForgeActorSystem::shutdown(actor_system).await.map_err(
                    |e| {
                        error_utils::engine_error(format!(
                            "关闭Actor系统失败: {e}"
                        ))
                    },
                )?;
            }
        }
        debug!("Actor运行时实例销毁成功");
        Ok(())
    }

    /// Reports whether the runtime is currently marked as started.
    pub fn is_started(&self) -> bool {
        self.started
    }

    /// Fetches the current state and returns the schema produced by its
    /// `schema()` method.
    ///
    /// # Errors
    /// Same as `get_state`.
    pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
        let state = self.get_state().await?;
        Ok(state.schema())
    }

    /// Returns a freshly built `RuntimeOptions::default()`.
    ///
    /// The options passed at creation are moved into the actor system and
    /// are not retained by the runtime, so the returned value does not
    /// reflect them.
    pub fn get_options(&self) -> RuntimeOptions {
        RuntimeOptions::default()
    }
}
impl Drop for ForgeActorRuntime {
    /// Logs a debug line when the runtime is dropped while still marked as
    /// started (the flag is cleared only by `destroy`).
    ///
    /// This only logs; it does not itself shut the actor system down.
    fn drop(&mut self) {
        if !self.started {
            // Already destroyed: nothing to report.
            return;
        }
        debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
    }
}
/// `RuntimeTrait` implementation that forwards to the inherent methods of
/// `ForgeActorRuntime`.
///
/// Inherent methods take precedence over trait methods during method
/// resolution, so each `self.method(..)` call below invokes the inherent
/// method of the same name and does not recurse into this impl.
#[async_trait]
impl RuntimeTrait for ForgeActorRuntime {
    /// Forwards to the inherent `dispatch`.
    async fn dispatch(
        &mut self,
        transaction: Transaction,
    ) -> ForgeResult<()> {
        self.dispatch(transaction).await
    }

    /// Forwards to the inherent `dispatch_with_meta`.
    async fn dispatch_with_meta(
        &mut self,
        transaction: Transaction,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        self.dispatch_with_meta(transaction, description, meta).await
    }

    /// Forwards to the inherent `command`.
    async fn command(
        &mut self,
        command: Arc<dyn Command>,
    ) -> ForgeResult<()> {
        self.command(command).await
    }

    /// Forwards to the inherent `command_with_meta`.
    async fn command_with_meta(
        &mut self,
        command: Arc<dyn Command>,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        self.command_with_meta(command, description, meta).await
    }

    /// Forwards to the inherent `get_state`.
    async fn get_state(&self) -> ForgeResult<Arc<State>> {
        self.get_state().await
    }

    /// Forwards to the inherent `get_tr`.
    async fn get_tr(&self) -> ForgeResult<Transaction> {
        self.get_tr().await
    }

    /// Forwards to the inherent `get_schema`.
    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
        self.get_schema().await
    }

    /// Forwards to the inherent `undo`.
    async fn undo(&mut self) -> ForgeResult<()> {
        self.undo().await
    }

    /// Forwards to the inherent `redo`.
    async fn redo(&mut self) -> ForgeResult<()> {
        self.redo().await
    }

    /// Forwards to the inherent `jump`.
    async fn jump(
        &mut self,
        steps: isize,
    ) -> ForgeResult<()> {
        self.jump(steps).await
    }

    /// Forwards to the inherent `get_config`.
    fn get_config(&self) -> &ForgeConfig {
        self.get_config()
    }

    /// Forwards to the inherent `update_config`.
    fn update_config(
        &mut self,
        config: ForgeConfig,
    ) {
        self.update_config(config);
    }

    /// Returns a reference to a default `RuntimeOptions`.
    ///
    /// The runtime does not retain the options it was created with, so a
    /// default instance is handed out instead. Each thread lazily allocates
    /// one instance and intentionally leaks it, which makes the reference
    /// genuinely `'static`: it stays valid after the calling thread exits,
    /// and no `unsafe` lifetime extension is needed. The cost is one leaked
    /// `RuntimeOptions` per thread that calls this method.
    fn get_options(&self) -> &RuntimeOptions {
        thread_local! {
            // A per-thread slot avoids requiring `RuntimeOptions: Sync`,
            // which a process-wide static would need.
            static DEFAULT_OPTIONS: &'static RuntimeOptions =
                Box::leak(Box::new(RuntimeOptions::default()));
        }
        DEFAULT_OPTIONS.with(|opts| *opts)
    }

    /// Forwards to the inherent `destroy`.
    async fn destroy(&mut self) -> ForgeResult<()> {
        self.destroy().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A successfully created runtime reports itself as started, and stops
    /// reporting so once `destroy` has succeeded.
    ///
    /// A failure to start is tolerated, as it was by the original version of
    /// this test, which accepted either outcome.
    #[tokio::test]
    async fn test_actor_runtime_creation() {
        let options = RuntimeOptions::default();
        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            assert!(runtime.is_started());
            // Always tear the runtime down so the actor system does not
            // outlive the test.
            if runtime.destroy().await.is_ok() {
                assert!(!runtime.is_started());
            }
        }
    }

    /// Smoke test: the basic accessors and `destroy` can be called on a
    /// freshly created runtime.
    #[tokio::test]
    async fn test_actor_runtime_api_compatibility() {
        let options = RuntimeOptions::default();
        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            let _ = runtime.get_config();
            assert!(runtime.is_started());
            let _ = runtime.destroy().await;
        }
    }
}