mf_core/runtime/
actor_runtime.rs

1//! Actor运行时 - 提供与现有API兼容的Actor实现
2//!
3//! 这个模块作为新Actor系统的Facade,保持与现有ForgeRuntime API的完全兼容性。
4
5use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11    actors::{
12        system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13        transaction_processor::TransactionMessage,
14        state_actor::StateMessage,
15        event_bus::EventBusMessage,
16    },
17    config::ForgeConfig,
18    debug::debug,
19    error::{error_utils, ForgeResult},
20    event::Event,
21    runtime::runtime_trait::RuntimeTrait,
22    types::RuntimeOptions,
23    metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28    state::State,
29    transaction::{Command, Transaction},
30};
31
32/// Actor运行时 - 新的基于Actor的实现
33///
34/// 提供与原始ForgeRuntime完全相同的API,但内部使用Actor系统实现。
35/// 这确保了现有代码无需修改即可使用新的架构。
36pub struct ForgeActorRuntime {
37    /// Actor系统句柄
38    actor_system: Option<ForgeActorSystemHandle>,
39    /// 配置
40    config: ForgeConfig,
41    /// 是否已启动
42    started: bool,
43}
44
45impl ForgeActorRuntime {
46    /// 获取Actor系统句柄引用
47    fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48        self.actor_system.as_ref().ok_or_else(|| {
49            error_utils::engine_error("Actor系统未初始化".to_string())
50        })
51    }
52
53    /// 创建新的Actor运行时实例
54    ///
55    /// # 参数
56    /// * `options` - 运行时选项
57    ///
58    /// # 返回值
59    /// * `ForgeResult<Self>` - Actor运行时实例或错误
60    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
61        Self::create_with_config(options, ForgeConfig::default()).await
62    }
63
64    /// 使用指定配置创建Actor运行时实例
65    ///
66    /// # 参数
67    /// * `options` - 运行时选项
68    /// * `config` - Forge配置
69    ///
70    /// # 返回值
71    /// * `ForgeResult<Self>` - Actor运行时实例或错误
72    pub async fn create_with_config(
73        options: RuntimeOptions,
74        config: ForgeConfig,
75    ) -> ForgeResult<Self> {
76        let start_time = Instant::now();
77        debug!("正在创建Actor运行时实例");
78
79        // 启动Actor系统
80        let actor_system = ForgeActorSystem::start(
81            options,
82            config.clone(),
83            ActorSystemConfig::default(),
84        )
85        .await
86        .map_err(|e| {
87            error_utils::engine_error(format!("启动Actor系统失败: {e}"))
88        })?;
89
90        debug!("Actor运行时实例创建成功");
91        metrics::editor_creation_duration(start_time.elapsed());
92
93        Ok(ForgeActorRuntime {
94            actor_system: Some(actor_system),
95            config,
96            started: true,
97        })
98    }
99
100    /// 🎯 处理事务 - 与原始dispatch完全相同的API
101    ///
102    /// 保持与runtime.rs:662-672行完全相同的接口
103    pub async fn dispatch(
104        &mut self,
105        transaction: Transaction,
106    ) -> ForgeResult<()> {
107        self.dispatch_with_meta(
108            transaction,
109            "".to_string(),
110            serde_json::Value::Null,
111        )
112        .await
113    }
114
115    /// 🎯 处理事务(包含元信息)- 与原始dispatch_with_meta完全相同的API
116    ///
117    /// 保持与runtime.rs:674-721行完全相同的接口和语义
118    pub async fn dispatch_with_meta(
119        &mut self,
120        transaction: Transaction,
121        description: String,
122        meta: serde_json::Value,
123    ) -> ForgeResult<()> {
124        if !self.started {
125            return Err(error_utils::engine_error("运行时未启动".to_string()));
126        }
127
128        // 通过Actor系统处理事务,但保持完全相同的语义
129        let (tx, rx) = oneshot::channel();
130
131        self.actor_system()?
132            .transaction_processor
133            .send_message(TransactionMessage::ProcessTransaction {
134                transaction,
135                description,
136                meta,
137                reply: tx,
138            })
139            .map_err(|e| {
140                error_utils::engine_error(format!("发送事务消息失败: {e}"))
141            })?;
142
143        rx.await.map_err(|e| {
144            error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
145        })?
146    }
147
148    /// 🎯 执行命令 - 与原始command完全相同的API
149    ///
150    /// 保持与runtime.rs:629-639行完全相同的接口
151    pub async fn command(
152        &mut self,
153        command: Arc<dyn Command>,
154    ) -> ForgeResult<()> {
155        debug!("正在执行命令: {}", command.name());
156        metrics::command_executed(command.name().as_str());
157
158        let mut tr = self.get_tr().await?;
159        command.execute(&mut tr).await?;
160        tr.commit()?;
161        self.dispatch(tr).await
162    }
163
164    /// 🎯 执行命令(包含元信息)- 与原始command_with_meta完全相同的API
165    ///
166    /// 保持与runtime.rs:641-653行完全相同的接口
167    pub async fn command_with_meta(
168        &mut self,
169        command: Arc<dyn Command>,
170        description: String,
171        meta: serde_json::Value,
172    ) -> ForgeResult<()> {
173        debug!("正在执行命令: {}", command.name());
174        metrics::command_executed(command.name().as_str());
175
176        let mut tr = self.get_tr().await?;
177        command.execute(&mut tr).await?;
178        tr.commit()?;
179        self.dispatch_with_meta(tr, description, meta).await
180    }
181
182    /// 🎯 获取当前状态 - 与原始get_state完全相同的API
183    ///
184    /// 保持与runtime.rs:821-823行完全相同的接口
185    pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
186        let (tx, rx) = oneshot::channel();
187
188        self.actor_system()?
189            .state_actor
190            .send_message(StateMessage::GetState { reply: tx })
191            .map_err(|e| {
192                error_utils::state_error(format!("发送获取状态消息失败: {e}"))
193            })?;
194
195        rx.await.map_err(|e| {
196            error_utils::state_error(format!("接收状态响应失败: {e}"))
197        })
198    }
199
200    /// 🎯 获取事务对象 - 与原始get_tr完全相同的API
201    ///
202    /// 保持与runtime.rs:833-836行完全相同的接口
203    pub async fn get_tr(&self) -> ForgeResult<Transaction> {
204        let state = self.get_state().await?;
205        Ok(state.tr())
206    }
207
208    /// 🎯 撤销操作 - 与原始undo完全相同的API
209    ///
210    /// 保持与runtime.rs:838-842行完全相同的接口
211    pub async fn undo(&mut self) -> ForgeResult<()> {
212        let (tx, rx) = oneshot::channel();
213
214        self.actor_system()?
215            .state_actor
216            .send_message(StateMessage::Undo { reply: tx })
217            .map_err(|e| {
218                error_utils::state_error(format!("发送撤销消息失败: {e}"))
219            })?;
220
221        rx.await
222            .map_err(|e| {
223                error_utils::state_error(format!("接收撤销响应失败: {e}"))
224            })?
225            .map(|_| ())
226    }
227
228    /// 🎯 重做操作 - 与原始redo完全相同的API
229    ///
230    /// 保持与runtime.rs:844-848行完全相同的接口
231    pub async fn redo(&mut self) -> ForgeResult<()> {
232        let (tx, rx) = oneshot::channel();
233
234        self.actor_system()?
235            .state_actor
236            .send_message(StateMessage::Redo { reply: tx })
237            .map_err(|e| {
238                error_utils::state_error(format!("发送重做消息失败: {e}"))
239            })?;
240
241        rx.await
242            .map_err(|e| {
243                error_utils::state_error(format!("接收重做响应失败: {e}"))
244            })?
245            .map(|_| ())
246    }
247
248    /// 🎯 跳转到指定历史位置 - 与原始jump完全相同的API
249    ///
250    /// 保持与runtime.rs:850-856行完全相同的接口
251    pub async fn jump(
252        &mut self,
253        steps: isize,
254    ) -> ForgeResult<()> {
255        let (tx, rx) = oneshot::channel();
256
257        self.actor_system()?
258            .state_actor
259            .send_message(StateMessage::Jump { steps, reply: tx })
260            .map_err(|e| {
261                error_utils::state_error(format!("发送跳转消息失败: {e}"))
262            })?;
263
264        rx.await
265            .map_err(|e| {
266                error_utils::state_error(format!("接收跳转响应失败: {e}"))
267            })?
268            .map(|_| ())
269    }
270
271    /// 🎯 发送事件 - 与原始emit_event完全相同的API
272    ///
273    /// 保持与runtime.rs:521-528行完全相同的接口
274    pub async fn emit_event(
275        &mut self,
276        event: Event,
277    ) -> ForgeResult<()> {
278        metrics::event_emitted(event.name());
279
280        self.actor_system()?
281            .event_bus
282            .send_message(EventBusMessage::PublishEvent { event })
283            .map_err(|e| {
284                error_utils::event_error(format!("发送事件消息失败: {e}"))
285            })?;
286
287        Ok(())
288    }
289
290    /// 🎯 获取配置 - 与原始get_config完全相同的API
291    ///
292    /// 保持与runtime.rs:809-811行完全相同的接口
293    pub fn get_config(&self) -> &ForgeConfig {
294        &self.config
295    }
296
297    /// 🎯 更新配置 - 与原始update_config完全相同的API
298    ///
299    /// 保持与runtime.rs:814-819行完全相同的接口
300    pub fn update_config(
301        &mut self,
302        config: ForgeConfig,
303    ) {
304        self.config = config;
305        // 这里可以向各个Actor发送配置更新消息
306    }
307
308    /// 🎯 销毁运行时 - 与原始destroy完全相同的API
309    ///
310    /// 保持与runtime.rs:511-519行完全相同的接口
311    pub async fn destroy(&mut self) -> ForgeResult<()> {
312        debug!("正在销毁Actor运行时实例");
313
314        if self.started {
315            // 广播销毁事件
316            let _ = self.emit_event(Event::Destroy).await;
317
318            // 关闭Actor系统
319            if let Some(actor_system) = self.actor_system.take() {
320                ForgeActorSystem::shutdown(actor_system).await.map_err(
321                    |e| {
322                        error_utils::engine_error(format!(
323                            "关闭Actor系统失败: {e}"
324                        ))
325                    },
326                )?;
327            }
328
329            self.started = false;
330        }
331
332        debug!("Actor运行时实例销毁成功");
333        Ok(())
334    }
335
336    /// 检查运行时是否已启动
337    pub fn is_started(&self) -> bool {
338        self.started
339    }
340
341    /// 获取schema
342    pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
343        let state = self.get_state().await?;
344        Ok(state.schema())
345    }
346
347    /// 获取运行时选项 (占位方法,Actor运行时不直接持有options)
348    pub fn get_options(&self) -> RuntimeOptions {
349        RuntimeOptions::default()
350    }
351}
352
353/// 确保在Drop时清理资源
354impl Drop for ForgeActorRuntime {
355    fn drop(&mut self) {
356        if self.started {
357            debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
358            // 在Drop中只能做同步操作
359            // 异步清理应该通过显式调用destroy()来完成
360        }
361    }
362}
363
364// ==================== RuntimeTrait 实现 ====================
365
366#[async_trait]
367impl RuntimeTrait for ForgeActorRuntime {
368    async fn dispatch(
369        &mut self,
370        transaction: Transaction,
371    ) -> ForgeResult<()> {
372        self.dispatch(transaction).await
373    }
374
375    async fn dispatch_with_meta(
376        &mut self,
377        transaction: Transaction,
378        description: String,
379        meta: serde_json::Value,
380    ) -> ForgeResult<()> {
381        self.dispatch_with_meta(transaction, description, meta).await
382    }
383
384    async fn command(
385        &mut self,
386        command: Arc<dyn Command>,
387    ) -> ForgeResult<()> {
388        self.command(command).await
389    }
390
391    async fn command_with_meta(
392        &mut self,
393        command: Arc<dyn Command>,
394        description: String,
395        meta: serde_json::Value,
396    ) -> ForgeResult<()> {
397        self.command_with_meta(command, description, meta).await
398    }
399
400    async fn get_state(&self) -> ForgeResult<Arc<State>> {
401        self.get_state().await
402    }
403
404    async fn get_tr(&self) -> ForgeResult<Transaction> {
405        self.get_tr().await
406    }
407
408    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
409        self.get_schema().await
410    }
411
412    async fn undo(&mut self) -> ForgeResult<()> {
413        self.undo().await
414    }
415
416    async fn redo(&mut self) -> ForgeResult<()> {
417        self.redo().await
418    }
419
420    async fn jump(
421        &mut self,
422        steps: isize,
423    ) -> ForgeResult<()> {
424        self.jump(steps).await
425    }
426
427    fn get_config(&self) -> &ForgeConfig {
428        self.get_config()
429    }
430
431    fn update_config(
432        &mut self,
433        config: ForgeConfig,
434    ) {
435        self.update_config(config);
436    }
437
438    fn get_options(&self) -> &RuntimeOptions {
439        // Actor运行时不直接持有options,返回一个静态引用
440        // 这是一个权衡,因为RuntimeTrait需要返回引用
441        thread_local! {
442            static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
443        }
444        DEFAULT_OPTIONS.with(|opts| unsafe {
445            // SAFETY: 这是一个只读的thread_local变量,生命周期与线程绑定
446            std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
447                opts,
448            )
449        })
450    }
451
452    async fn destroy(&mut self) -> ForgeResult<()> {
453        self.destroy().await
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460
461    #[tokio::test]
462    async fn test_actor_runtime_creation() {
463        let options = RuntimeOptions::default();
464        let result = ForgeActorRuntime::create(options).await;
465
466        // 基本创建测试 - 完整测试在集成测试中进行
467        // 这里只验证编译和基本结构
468        assert!(result.is_ok() || result.is_err()); // 确保返回了某种结果
469    }
470
471    #[tokio::test]
472    async fn test_actor_runtime_api_compatibility() {
473        // 测试API签名是否与原始ForgeRuntime兼容
474        // 这确保了API层面的兼容性
475
476        let options = RuntimeOptions::default();
477        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
478            // 这些调用应该编译通过,验证API兼容性
479            let _ = runtime.get_config();
480            let _ = runtime.is_started();
481
482            // 清理
483            let _ = runtime.destroy().await;
484        }
485    }
486}