mf_core/actors/
system.rs

1//! ForgeActorSystem - Actor系统管理器
2//!
3//! 负责协调所有Actor的生命周期和通信。
4
5use ractor::ActorRef;
6use std::sync::Arc;
7use tokio::sync::oneshot;
8
9use crate::{
10    config::ForgeConfig,
11    debug::debug,
12    extension_manager::ExtensionManager,
13    history_manager::HistoryManager,
14    runtime::sync_flow::FlowEngine,
15    types::{RuntimeOptions, HistoryEntryWithMeta},
16};
17
18use mf_state::state::State;
19
20use super::{
21    event_bus::{EventBusActorManager, EventBusMessage},
22    extension_manager::{ExtensionManagerActorManager, ExtensionMessage},
23    state_actor::{StateActorManager, StateMessage},
24    transaction_processor::{TransactionProcessorManager, TransactionMessage},
25    ActorSystemError, ActorSystemResult,
26};
27
28/// Actor系统配置
29#[derive(Debug, Clone)]
30pub struct ActorSystemConfig {
31    /// 系统名称
32    pub system_name: String,
33    /// 是否启用监督
34    pub enable_supervision: bool,
35    /// Actor关闭超时时间(毫秒)
36    pub shutdown_timeout_ms: u64,
37    /// 是否启用指标收集
38    pub enable_metrics: bool,
39}
40
41impl Default for ActorSystemConfig {
42    fn default() -> Self {
43        Self {
44            system_name: "ForgeActorSystem".to_string(),
45            enable_supervision: true,
46            shutdown_timeout_ms: 5000,
47            enable_metrics: true,
48        }
49    }
50}
51
52/// Actor系统句柄
53pub struct ForgeActorSystemHandle {
54    /// 事务处理Actor
55    pub transaction_processor: ActorRef<TransactionMessage>,
56    /// 状态管理Actor
57    pub state_actor: ActorRef<StateMessage>,
58    /// 事件总线Actor
59    pub event_bus: ActorRef<EventBusMessage>,
60    /// 扩展管理Actor
61    pub extension_manager: ActorRef<ExtensionMessage>,
62    /// 系统配置
63    pub config: ActorSystemConfig,
64}
65
66/// Forge Actor系统
67pub struct ForgeActorSystem;
68
69impl ForgeActorSystem {
70    /// 创建并启动完整的Actor系统
71    ///
72    /// # 参数
73    /// * `runtime_options` - 运行时选项
74    /// * `forge_config` - Forge配置
75    /// * `system_config` - Actor系统配置
76    ///
77    /// # 返回值
78    /// * `ActorSystemResult<ForgeActorSystemHandle>` - Actor系统句柄
79    pub async fn start(
80        runtime_options: RuntimeOptions,
81        forge_config: ForgeConfig,
82        system_config: ActorSystemConfig,
83    ) -> ActorSystemResult<ForgeActorSystemHandle> {
84        debug!("启动ForgeActorSystem: {}", system_config.system_name);
85
86        // 1. 创建扩展管理器
87        let extension_manager =
88            Self::create_extension_manager(&runtime_options, &forge_config)?;
89        let extension_manager_actor =
90            ExtensionManagerActorManager::start(extension_manager).await?;
91
92        // 2. 创建初始状态和历史管理器
93        let (initial_state, history_manager) = Self::create_state_and_history(
94            &runtime_options,
95            &forge_config,
96            &extension_manager_actor,
97        )
98        .await?;
99
100        // 3. 启动状态Actor
101        let state_actor =
102            StateActorManager::start(initial_state, history_manager).await?;
103
104        // 4. 启动事件总线Actor
105        let event_bus =
106            EventBusActorManager::start(forge_config.event.clone()).await?;
107
108        // 5. 设置事件处理器
109        if !runtime_options.get_event_handlers().is_empty() {
110            EventBusActorManager::add_handlers(
111                &event_bus,
112                runtime_options.get_event_handlers(),
113            )
114            .await
115            .map_err(|e| ActorSystemError::ConfigurationError {
116                message: format!("添加事件处理器失败: {e}"),
117            })?;
118        }
119
120        // 6. 创建流引擎
121        let flow_engine = Arc::new(FlowEngine::new().map_err(|e| {
122            ActorSystemError::ConfigurationError {
123                message: format!("创建流引擎失败: {e}"),
124            }
125        })?);
126
127        // 7. 启动事务处理Actor
128        let transaction_processor = TransactionProcessorManager::start(
129            state_actor.clone(),
130            event_bus.clone(),
131            runtime_options.get_middleware_stack(),
132            flow_engine,
133            forge_config,
134        )
135        .await?;
136
137        debug!("ForgeActorSystem启动完成");
138
139        Ok(ForgeActorSystemHandle {
140            transaction_processor,
141            state_actor,
142            event_bus,
143            extension_manager: extension_manager_actor,
144            config: system_config,
145        })
146    }
147
148    /// 优雅关闭Actor系统
149    ///
150    /// # 参数
151    /// * `handle` - Actor系统句柄
152    ///
153    /// # 返回值
154    /// * `ActorSystemResult<()>` - 关闭结果
155    pub async fn shutdown(
156        handle: ForgeActorSystemHandle
157    ) -> ActorSystemResult<()> {
158        debug!("关闭ForgeActorSystem: {}", handle.config.system_name);
159
160        let shutdown_timeout = tokio::time::Duration::from_millis(
161            handle.config.shutdown_timeout_ms,
162        );
163
164        // 按依赖关系顺序关闭Actor
165        // 1. 首先关闭事务处理器(停止接受新事务)
166        let _ = tokio::time::timeout(shutdown_timeout, async {
167            handle.transaction_processor.stop(None);
168        })
169        .await;
170
171        // 2. 关闭事件总线
172        let _ = tokio::time::timeout(shutdown_timeout, async {
173            handle.event_bus.stop(None);
174        })
175        .await;
176
177        // 3. 关闭状态Actor
178        let _ = tokio::time::timeout(shutdown_timeout, async {
179            handle.state_actor.stop(None);
180        })
181        .await;
182
183        // 4. 最后关闭扩展管理器
184        let _ = tokio::time::timeout(shutdown_timeout, async {
185            handle.extension_manager.stop(None);
186        })
187        .await;
188
189        debug!("ForgeActorSystem关闭完成");
190        Ok(())
191    }
192
193    /// 创建扩展管理器 - 自动处理XML schema配置并合并代码扩展
194    fn create_extension_manager(
195        runtime_options: &RuntimeOptions,
196        forge_config: &ForgeConfig,
197    ) -> ActorSystemResult<ExtensionManager> {
198        crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
199            runtime_options,
200            forge_config,
201        )
202        .map_err(|e| ActorSystemError::ConfigurationError {
203            message: format!("创建扩展管理器失败: {e}"),
204        })
205    }
206
207    /// 创建初始状态和历史管理器
208    async fn create_state_and_history(
209        runtime_options: &RuntimeOptions,
210        forge_config: &ForgeConfig,
211        extension_manager_actor: &ActorRef<ExtensionMessage>,
212    ) -> ActorSystemResult<(Arc<State>, HistoryManager<HistoryEntryWithMeta>)>
213    {
214        // 获取Schema
215        let (tx, rx) = oneshot::channel();
216        extension_manager_actor
217            .send_message(ExtensionMessage::GetSchema { reply: tx })
218            .map_err(|e| ActorSystemError::CommunicationFailed {
219                message: format!("获取Schema失败: {e}"),
220            })?;
221
222        let schema =
223            rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
224                message: format!("接收Schema失败: {e}"),
225            })?;
226
227        // 获取插件
228        let (tx, rx) = oneshot::channel();
229        extension_manager_actor
230            .send_message(ExtensionMessage::GetPlugins { reply: tx })
231            .map_err(|e| ActorSystemError::CommunicationFailed {
232                message: format!("获取插件失败: {e}"),
233            })?;
234
235        let plugins =
236            rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
237                message: format!("接收插件失败: {e}"),
238            })?;
239        println!("获取插件: {:?}", plugins.len());
240        // 获取操作函数
241        let (tx, rx) = oneshot::channel();
242        extension_manager_actor
243            .send_message(ExtensionMessage::GetOpFns { reply: tx })
244            .map_err(|e| ActorSystemError::CommunicationFailed {
245                message: format!("获取操作函数失败: {e}"),
246            })?;
247
248        let op_fns =
249            rx.await.map_err(|e| ActorSystemError::CommunicationFailed {
250                message: format!("接收操作函数失败: {e}"),
251            })?;
252
253        // 创建全局资源管理器
254        let op_state = mf_state::ops::GlobalResourceManager::new();
255        for op_fn in &op_fns {
256            op_fn(&op_state).map_err(|e| {
257                ActorSystemError::ConfigurationError {
258                    message: format!("执行操作函数失败: {e}"),
259                }
260            })?;
261        }
262
263        // 创建状态配置
264        let mut state_config = mf_state::state::StateConfig {
265            schema: Some(schema),
266            doc: None,
267            stored_marks: None,
268            plugins: Some(plugins),
269            resource_manager: Some(Arc::new(op_state)),
270        };
271
272        // 创建文档
273        crate::helpers::create_doc::create_doc(
274            &runtime_options.get_content(),
275            &mut state_config,
276        )
277        .await
278        .map_err(|e| ActorSystemError::ConfigurationError {
279            message: format!("创建文档失败: {e}"),
280        })?;
281
282        // 创建状态
283        let state = State::create(state_config).await.map_err(|e| {
284            ActorSystemError::ConfigurationError {
285                message: format!("创建状态失败: {e}"),
286            }
287        })?;
288
289        let state = Arc::new(state);
290
291        // 创建历史管理器
292        let history_manager = HistoryManager::with_config(
293            HistoryEntryWithMeta::new(
294                state.clone(),
295                "创建工程项目".to_string(),
296                serde_json::Value::Null,
297            ),
298            forge_config.history.clone(),
299        );
300
301        Ok((state, history_manager))
302    }
303}