use ractor::ActorRef;
use std::sync::Arc;
use tokio::sync::oneshot;
use crate::{
config::ForgeConfig,
debug::debug,
extension_manager::ExtensionManager,
history_manager::HistoryManager,
runtime::sync_flow::FlowEngine,
types::{RuntimeOptions, HistoryEntryWithMeta},
};
use mf_state::state::State;
use super::{
event_bus::{EventBusActorManager, EventBusMessage},
extension_manager::{ExtensionManagerActorManager, ExtensionMessage},
state_actor::{StateActorManager, StateMessage},
transaction_processor::{TransactionProcessorManager, TransactionMessage},
ActorSystemError, ActorSystemResult,
};
/// Settings that apply to the actor system as a whole.
#[derive(Debug, Clone)]
pub struct ActorSystemConfig {
    /// Human-readable system name; it appears in the start/shutdown log lines.
    pub system_name: String,
    /// Supervision toggle.
    /// NOTE(review): not read anywhere in the visible code; confirm where it is consumed.
    pub enable_supervision: bool,
    /// Timeout, in milliseconds, used while shutting the actors down.
    pub shutdown_timeout_ms: u64,
    /// Metrics toggle.
    /// NOTE(review): not read anywhere in the visible code; confirm where it is consumed.
    pub enable_metrics: bool,
}

impl Default for ActorSystemConfig {
    /// Builds the stock configuration: name `"ForgeActorSystem"`, supervision and
    /// metrics switched on, and a five second shutdown timeout.
    fn default() -> Self {
        let system_name = String::from("ForgeActorSystem");
        ActorSystemConfig {
            system_name,
            enable_supervision: true,
            shutdown_timeout_ms: 5_000,
            enable_metrics: true,
        }
    }
}
/// Handle to a started actor system, as produced by `ForgeActorSystem::start`.
///
/// It bundles the references to the four actors started there together with the
/// `ActorSystemConfig` the system was started with. `ForgeActorSystem::shutdown`
/// takes this handle by value.
pub struct ForgeActorSystemHandle {
/// Reference to the actor started through `TransactionProcessorManager::start`.
pub transaction_processor: ActorRef<TransactionMessage>,
/// Reference to the actor started through `StateActorManager::start`.
pub state_actor: ActorRef<StateMessage>,
/// Reference to the actor started through `EventBusActorManager::start`.
pub event_bus: ActorRef<EventBusMessage>,
/// Reference to the actor started through `ExtensionManagerActorManager::start`.
pub extension_manager: ActorRef<ExtensionMessage>,
/// Configuration passed to `ForgeActorSystem::start`; `shutdown` reads the system
/// name and shutdown timeout from it.
pub config: ActorSystemConfig,
}
/// Namespace type for starting and shutting down the group of actors
/// (extension manager, state, event bus, transaction processor) used by the runtime.
pub struct ForgeActorSystem;

impl ForgeActorSystem {
    /// Starts every actor and returns a handle holding their references.
    ///
    /// Actors are started in dependency order: extension manager, state actor,
    /// event bus, transaction processor.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any start-up step. Actors that were
    /// already started before the failing step are asked to stop (in reverse start
    /// order) so that a failed start does not leave orphaned actors running.
    pub async fn start(
        runtime_options: RuntimeOptions,
        forge_config: ForgeConfig,
        system_config: ActorSystemConfig,
    ) -> ActorSystemResult<ForgeActorSystemHandle> {
        debug!("启动ForgeActorSystem: {}", system_config.system_name);

        // Cells of the actors that are already running, in start order, so they can
        // be rolled back if a later step fails.
        let mut started: Vec<ractor::ActorCell> = Vec::with_capacity(3);
        let result = Self::start_actors(
            &runtime_options,
            forge_config,
            system_config,
            &mut started,
        )
        .await;

        if result.is_err() {
            debug!(
                "ForgeActorSystem启动失败, 停止已启动的Actor数量: {}",
                started.len()
            );
            for actor in started.iter().rev() {
                actor.stop(Some("ForgeActorSystem startup failed".to_string()));
            }
        }
        result
    }

    /// Performs the actual start-up sequence for [`ForgeActorSystem::start`].
    ///
    /// Every actor that starts successfully (except the last one, after which
    /// nothing can fail) is recorded in `started` so the caller can stop it again
    /// when a later step returns an error.
    async fn start_actors(
        runtime_options: &RuntimeOptions,
        forge_config: ForgeConfig,
        system_config: ActorSystemConfig,
        started: &mut Vec<ractor::ActorCell>,
    ) -> ActorSystemResult<ForgeActorSystemHandle> {
        let extension_manager =
            Self::create_extension_manager(runtime_options, &forge_config)?;
        let extension_manager_actor =
            ExtensionManagerActorManager::start(extension_manager).await?;
        started.push(extension_manager_actor.get_cell());

        // The initial state is built from data served by the extension manager
        // actor, so that actor has to be running first.
        let (initial_state, history_manager) = Self::create_state_and_history(
            runtime_options,
            &forge_config,
            &extension_manager_actor,
        )
        .await?;
        let state_actor =
            StateActorManager::start(initial_state, history_manager).await?;
        started.push(state_actor.get_cell());

        let event_bus =
            EventBusActorManager::start(forge_config.event.clone()).await?;
        started.push(event_bus.get_cell());

        // Fetch the handlers once instead of once for the check and once for the call.
        let event_handlers = runtime_options.get_event_handlers();
        if !event_handlers.is_empty() {
            EventBusActorManager::add_handlers(&event_bus, event_handlers)
                .await
                .map_err(|e| ActorSystemError::ConfigurationError {
                    message: format!("添加事件处理器失败: {e}"),
                })?;
        }

        let flow_engine = Arc::new(FlowEngine::new().map_err(|e| {
            ActorSystemError::ConfigurationError {
                message: format!("创建流引擎失败: {e}"),
            }
        })?);

        let transaction_processor = TransactionProcessorManager::start(
            state_actor.clone(),
            event_bus.clone(),
            runtime_options.get_middleware_stack(),
            flow_engine,
            forge_config,
        )
        .await?;

        debug!("ForgeActorSystem启动完成");
        Ok(ForgeActorSystemHandle {
            transaction_processor,
            state_actor,
            event_bus,
            extension_manager: extension_manager_actor,
            config: system_config,
        })
    }

    /// Stops all actors referenced by `handle`.
    ///
    /// Actors are stopped one after another in the order transaction processor,
    /// event bus, state actor, extension manager (the reverse of the start order).
    /// For each actor the call waits until it has actually stopped, for at most
    /// `handle.config.shutdown_timeout_ms` milliseconds.
    ///
    /// Shutdown is best-effort: an actor that fails to stop in time is logged and
    /// the remaining actors are still stopped. The function always returns `Ok(())`.
    pub async fn shutdown(
        handle: ForgeActorSystemHandle,
    ) -> ActorSystemResult<()> {
        debug!("关闭ForgeActorSystem: {}", handle.config.system_name);

        let shutdown_timeout = tokio::time::Duration::from_millis(
            handle.config.shutdown_timeout_ms,
        );

        let actors = [
            ("transaction_processor", handle.transaction_processor.get_cell()),
            ("event_bus", handle.event_bus.get_cell()),
            ("state_actor", handle.state_actor.get_cell()),
            ("extension_manager", handle.extension_manager.get_cell()),
        ];

        for (name, actor) in &actors {
            // `stop` alone only sends the stop signal and returns immediately;
            // `stop_and_wait` is what makes the timeout meaningful.
            if let Err(e) =
                actor.stop_and_wait(None, Some(shutdown_timeout)).await
            {
                debug!("停止Actor超时或失败 {}: {}", name, e);
            }
        }

        debug!("ForgeActorSystem关闭完成");
        Ok(())
    }

    /// Builds the `ExtensionManager` through the shared runtime helper, mapping any
    /// failure to `ActorSystemError::ConfigurationError`.
    fn create_extension_manager(
        runtime_options: &RuntimeOptions,
        forge_config: &ForgeConfig,
    ) -> ActorSystemResult<ExtensionManager> {
        crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
            runtime_options,
            forge_config,
        )
        .map_err(|e| ActorSystemError::ConfigurationError {
            message: format!("创建扩展管理器失败: {e}"),
        })
    }

    /// Sends one request carrying a oneshot reply channel to the extension manager
    /// actor and waits for the answer.
    ///
    /// `make_message` wraps the reply sender into the concrete request variant.
    /// `send_context` / `recv_context` are the prefixes of the error message used
    /// when sending the request or receiving the reply fails; both failures are
    /// reported as `ActorSystemError::CommunicationFailed`.
    async fn query_extension_manager<T>(
        extension_manager_actor: &ActorRef<ExtensionMessage>,
        make_message: impl FnOnce(oneshot::Sender<T>) -> ExtensionMessage,
        send_context: &str,
        recv_context: &str,
    ) -> ActorSystemResult<T> {
        let (tx, rx) = oneshot::channel();
        extension_manager_actor
            .send_message(make_message(tx))
            .map_err(|e| ActorSystemError::CommunicationFailed {
                message: format!("{send_context}: {e}"),
            })?;
        rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
            message: format!("{recv_context}: {e}"),
        })
    }

    /// Creates the initial `State` and the `HistoryManager` seeded with it.
    ///
    /// The schema, plugins and operation functions are requested from the
    /// extension manager actor. The operation functions are run against a fresh
    /// `GlobalResourceManager`, the document is created from the runtime content,
    /// and the resulting state becomes the first history entry.
    ///
    /// # Errors
    ///
    /// * `ActorSystemError::CommunicationFailed` when a request to the extension
    ///   manager actor cannot be sent or its reply is not received.
    /// * `ActorSystemError::ConfigurationError` when an operation function, the
    ///   document creation or the state creation fails.
    async fn create_state_and_history(
        runtime_options: &RuntimeOptions,
        forge_config: &ForgeConfig,
        extension_manager_actor: &ActorRef<ExtensionMessage>,
    ) -> ActorSystemResult<(Arc<State>, HistoryManager<HistoryEntryWithMeta>)>
    {
        let schema = Self::query_extension_manager(
            extension_manager_actor,
            |reply| ExtensionMessage::GetSchema { reply },
            "获取Schema失败",
            "接收Schema失败",
        )
        .await?;

        let plugins = Self::query_extension_manager(
            extension_manager_actor,
            |reply| ExtensionMessage::GetPlugins { reply },
            "获取插件失败",
            "接收插件失败",
        )
        .await?;
        // Diagnostic output goes through the debug macro rather than stdout.
        debug!("获取插件: {:?}", plugins.len());

        let op_fns = Self::query_extension_manager(
            extension_manager_actor,
            |reply| ExtensionMessage::GetOpFns { reply },
            "获取操作函数失败",
            "接收操作函数失败",
        )
        .await?;

        // Let every operation function register itself on the resource manager
        // before it is frozen behind an `Arc`.
        let op_state = mf_state::ops::GlobalResourceManager::new();
        for op_fn in &op_fns {
            op_fn(&op_state).map_err(|e| {
                ActorSystemError::ConfigurationError {
                    message: format!("执行操作函数失败: {e}"),
                }
            })?;
        }

        let mut state_config = mf_state::state::StateConfig {
            schema: Some(schema),
            doc: None,
            stored_marks: None,
            plugins: Some(plugins),
            resource_manager: Some(Arc::new(op_state)),
        };
        // `create_doc` receives the config mutably and is run before the state is
        // created from it.
        crate::helpers::create_doc::create_doc(
            &runtime_options.get_content(),
            &mut state_config,
        )
        .await
        .map_err(|e| ActorSystemError::ConfigurationError {
            message: format!("创建文档失败: {e}"),
        })?;

        let state = State::create(state_config).await.map_err(|e| {
            ActorSystemError::ConfigurationError {
                message: format!("创建状态失败: {e}"),
            }
        })?;
        let state = Arc::new(state);

        // The history starts with an entry wrapping a fresh transaction of the
        // initial state.
        let initial_transaction = state.tr();
        let history_manager = HistoryManager::with_config(
            HistoryEntryWithMeta::new(
                Arc::new(initial_transaction),
                state.clone(),
                "创建工程项目".to_string(),
                serde_json::Value::Null,
            ),
            forge_config.history.clone(),
        );
        Ok((state, history_manager))
    }
}