mf_core/runtime/
actor_runtime.rs

1//! Actor运行时 - 提供与现有API兼容的Actor实现
2//!
3//! 这个模块作为新Actor系统的Facade,保持与现有ForgeRuntime API的完全兼容性。
4
5use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11    actors::{
12        system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13        transaction_processor::TransactionMessage,
14        state_actor::StateMessage,
15        event_bus::EventBusMessage,
16    },
17    config::ForgeConfig,
18    debug::debug,
19    error::{error_utils, ForgeResult},
20    event::Event,
21    runtime::runtime_trait::RuntimeTrait,
22    types::RuntimeOptions,
23    metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28    state::State,
29    transaction::{Command, Transaction},
30};
31
32/// Actor运行时 - 新的基于Actor的实现
33///
34/// 提供与原始ForgeRuntime完全相同的API,但内部使用Actor系统实现。
35/// 这确保了现有代码无需修改即可使用新的架构。
36pub struct ForgeActorRuntime {
37    /// Actor系统句柄
38    actor_system: Option<ForgeActorSystemHandle>,
39    /// 配置
40    config: ForgeConfig,
41    /// 是否已启动
42    started: bool,
43}
44
45impl ForgeActorRuntime {
46    /// 获取Actor系统句柄引用
47    fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48        self.actor_system.as_ref().ok_or_else(|| {
49            error_utils::engine_error("Actor系统未初始化".to_string())
50        })
51    }
52
53    /// 创建新的Actor运行时实例
54    ///
55    /// # 参数
56    /// * `options` - 运行时选项
57    ///
58    /// # 返回值
59    /// * `ForgeResult<Self>` - Actor运行时实例或错误
60    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options), fields(
61        crate_name = "core",
62        runtime_type = "actor"
63    )))]
64    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
65        Self::create_with_config(options, ForgeConfig::default()).await
66    }
67
68    /// 使用指定配置创建Actor运行时实例
69    ///
70    /// # 参数
71    /// * `options` - 运行时选项
72    /// * `config` - Forge配置
73    ///
74    /// # 返回值
75    /// * `ForgeResult<Self>` - Actor运行时实例或错误
76    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
77        crate_name = "core",
78        runtime_type = "actor",
79        has_middleware = !options.get_middleware_stack().is_empty()
80    )))]
81    pub async fn create_with_config(
82        options: RuntimeOptions,
83        config: ForgeConfig,
84    ) -> ForgeResult<Self> {
85        let start_time = Instant::now();
86        debug!("正在创建Actor运行时实例");
87
88        // 启动Actor系统
89        let actor_system = ForgeActorSystem::start(
90            options,
91            config.clone(),
92            ActorSystemConfig::default(),
93        )
94        .await
95        .map_err(|e| {
96            error_utils::engine_error(format!("启动Actor系统失败: {e}"))
97        })?;
98
99        debug!("Actor运行时实例创建成功");
100        metrics::editor_creation_duration(start_time.elapsed());
101
102        Ok(ForgeActorRuntime {
103            actor_system: Some(actor_system),
104            config,
105            started: true,
106        })
107    }
108
109    /// 🎯 处理事务 - 与原始dispatch完全相同的API
110    ///
111    /// 保持与runtime.rs:662-672行完全相同的接口
112    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
113        crate_name = "core",
114        tr_id = %transaction.id,
115        runtime_type = "actor"
116    )))]
117    pub async fn dispatch(
118        &mut self,
119        transaction: Transaction,
120    ) -> ForgeResult<()> {
121        self.dispatch_with_meta(
122            transaction,
123            "".to_string(),
124            serde_json::Value::Null,
125        )
126        .await
127    }
128
129    /// 🎯 处理事务(包含元信息)- 与原始dispatch_with_meta完全相同的API
130    ///
131    /// 保持与runtime.rs:674-721行完全相同的接口和语义
132    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
133        crate_name = "core",
134        tr_id = %transaction.id,
135        description = %description,
136        runtime_type = "actor"
137    )))]
138    pub async fn dispatch_with_meta(
139        &mut self,
140        transaction: Transaction,
141        description: String,
142        meta: serde_json::Value,
143    ) -> ForgeResult<()> {
144        if !self.started {
145            return Err(error_utils::engine_error("运行时未启动".to_string()));
146        }
147
148        // 通过Actor系统处理事务,但保持完全相同的语义
149        let (tx, rx) = oneshot::channel();
150
151        self.actor_system()?
152            .transaction_processor
153            .send_message(TransactionMessage::ProcessTransaction {
154                transaction,
155                description,
156                meta,
157                reply: tx,
158            })
159            .map_err(|e| {
160                error_utils::engine_error(format!("发送事务消息失败: {e}"))
161            })?;
162
163        rx.await.map_err(|e| {
164            error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
165        })?
166    }
167
168    /// 🎯 执行命令 - 与原始command完全相同的API
169    ///
170    /// 保持与runtime.rs:629-639行完全相同的接口
171    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
172        crate_name = "core",
173        command_name = %command.name(),
174        runtime_type = "actor"
175    )))]
176    pub async fn command(
177        &mut self,
178        command: Arc<dyn Command>,
179    ) -> ForgeResult<()> {
180        debug!("正在执行命令: {}", command.name());
181        metrics::command_executed(command.name().as_str());
182
183        let mut tr = self.get_tr().await?;
184        command.execute(&mut tr).await?;
185        tr.commit()?;
186        self.dispatch(tr).await
187    }
188
189    /// 🎯 执行命令(包含元信息)- 与原始command_with_meta完全相同的API
190    ///
191    /// 保持与runtime.rs:641-653行完全相同的接口
192    pub async fn command_with_meta(
193        &mut self,
194        command: Arc<dyn Command>,
195        description: String,
196        meta: serde_json::Value,
197    ) -> ForgeResult<()> {
198        debug!("正在执行命令: {}", command.name());
199        metrics::command_executed(command.name().as_str());
200
201        let mut tr = self.get_tr().await?;
202        command.execute(&mut tr).await?;
203        tr.commit()?;
204        self.dispatch_with_meta(tr, description, meta).await
205    }
206
207    /// 🎯 获取当前状态 - 与原始get_state完全相同的API
208    ///
209    /// 保持与runtime.rs:821-823行完全相同的接口
210    pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
211        let (tx, rx) = oneshot::channel();
212
213        self.actor_system()?
214            .state_actor
215            .send_message(StateMessage::GetState { reply: tx })
216            .map_err(|e| {
217                error_utils::state_error(format!("发送获取状态消息失败: {e}"))
218            })?;
219
220        rx.await.map_err(|e| {
221            error_utils::state_error(format!("接收状态响应失败: {e}"))
222        })
223    }
224
225    /// 🎯 获取事务对象 - 与原始get_tr完全相同的API
226    ///
227    /// 保持与runtime.rs:833-836行完全相同的接口
228    pub async fn get_tr(&self) -> ForgeResult<Transaction> {
229        let state = self.get_state().await?;
230        Ok(state.tr())
231    }
232
233    /// 🎯 撤销操作 - 与原始undo完全相同的API
234    ///
235    /// 保持与runtime.rs:838-842行完全相同的接口
236    pub async fn undo(&mut self) -> ForgeResult<()> {
237        let (tx, rx) = oneshot::channel();
238
239        self.actor_system()?
240            .state_actor
241            .send_message(StateMessage::Undo { reply: tx })
242            .map_err(|e| {
243                error_utils::state_error(format!("发送撤销消息失败: {e}"))
244            })?;
245
246        rx.await
247            .map_err(|e| {
248                error_utils::state_error(format!("接收撤销响应失败: {e}"))
249            })?
250            .map(|_| ())
251    }
252
253    /// 🎯 重做操作 - 与原始redo完全相同的API
254    ///
255    /// 保持与runtime.rs:844-848行完全相同的接口
256    pub async fn redo(&mut self) -> ForgeResult<()> {
257        let (tx, rx) = oneshot::channel();
258
259        self.actor_system()?
260            .state_actor
261            .send_message(StateMessage::Redo { reply: tx })
262            .map_err(|e| {
263                error_utils::state_error(format!("发送重做消息失败: {e}"))
264            })?;
265
266        rx.await
267            .map_err(|e| {
268                error_utils::state_error(format!("接收重做响应失败: {e}"))
269            })?
270            .map(|_| ())
271    }
272
273    /// 🎯 跳转到指定历史位置 - 与原始jump完全相同的API
274    ///
275    /// 保持与runtime.rs:850-856行完全相同的接口
276    pub async fn jump(
277        &mut self,
278        steps: isize,
279    ) -> ForgeResult<()> {
280        let (tx, rx) = oneshot::channel();
281
282        self.actor_system()?
283            .state_actor
284            .send_message(StateMessage::Jump { steps, reply: tx })
285            .map_err(|e| {
286                error_utils::state_error(format!("发送跳转消息失败: {e}"))
287            })?;
288
289        rx.await
290            .map_err(|e| {
291                error_utils::state_error(format!("接收跳转响应失败: {e}"))
292            })?
293            .map(|_| ())
294    }
295
296    /// 🎯 发送事件 - 与原始emit_event完全相同的API
297    ///
298    /// 保持与runtime.rs:521-528行完全相同的接口
299    pub async fn emit_event(
300        &mut self,
301        event: Event,
302    ) -> ForgeResult<()> {
303        metrics::event_emitted(event.name());
304
305        self.actor_system()?
306            .event_bus
307            .send_message(EventBusMessage::PublishEvent { event })
308            .map_err(|e| {
309                error_utils::event_error(format!("发送事件消息失败: {e}"))
310            })?;
311
312        Ok(())
313    }
314
315    /// 🎯 获取配置 - 与原始get_config完全相同的API
316    ///
317    /// 保持与runtime.rs:809-811行完全相同的接口
318    pub fn get_config(&self) -> &ForgeConfig {
319        &self.config
320    }
321
322    /// 🎯 更新配置 - 与原始update_config完全相同的API
323    ///
324    /// 保持与runtime.rs:814-819行完全相同的接口
325    pub fn update_config(
326        &mut self,
327        config: ForgeConfig,
328    ) {
329        self.config = config;
330        // 这里可以向各个Actor发送配置更新消息
331    }
332
333    /// 🎯 销毁运行时 - 与原始destroy完全相同的API
334    ///
335    /// 保持与runtime.rs:511-519行完全相同的接口
336    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self), fields(
337        crate_name = "core",
338        runtime_type = "actor"
339    )))]
340    pub async fn destroy(&mut self) -> ForgeResult<()> {
341        debug!("正在销毁Actor运行时实例");
342
343        if self.started {
344            // 广播销毁事件
345            let _ = self.emit_event(Event::Destroy).await;
346
347            // 关闭Actor系统
348            if let Some(actor_system) = self.actor_system.take() {
349                ForgeActorSystem::shutdown(actor_system).await.map_err(
350                    |e| {
351                        error_utils::engine_error(format!(
352                            "关闭Actor系统失败: {e}"
353                        ))
354                    },
355                )?;
356            }
357
358            self.started = false;
359        }
360
361        debug!("Actor运行时实例销毁成功");
362        Ok(())
363    }
364
365    /// 检查运行时是否已启动
366    pub fn is_started(&self) -> bool {
367        self.started
368    }
369
370    /// 获取schema
371    pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
372        let state = self.get_state().await?;
373        Ok(state.schema())
374    }
375
376    /// 获取运行时选项 (占位方法,Actor运行时不直接持有options)
377    pub fn get_options(&self) -> RuntimeOptions {
378        RuntimeOptions::default()
379    }
380}
381
382/// 确保在Drop时清理资源
383impl Drop for ForgeActorRuntime {
384    fn drop(&mut self) {
385        if self.started {
386            debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
387            // 在Drop中只能做同步操作
388            // 异步清理应该通过显式调用destroy()来完成
389        }
390    }
391}
392
393// ==================== RuntimeTrait 实现 ====================
394
395#[async_trait]
396impl RuntimeTrait for ForgeActorRuntime {
397    async fn dispatch(
398        &mut self,
399        transaction: Transaction,
400    ) -> ForgeResult<()> {
401        self.dispatch(transaction).await
402    }
403
404    async fn dispatch_with_meta(
405        &mut self,
406        transaction: Transaction,
407        description: String,
408        meta: serde_json::Value,
409    ) -> ForgeResult<()> {
410        self.dispatch_with_meta(transaction, description, meta).await
411    }
412
413    async fn command(
414        &mut self,
415        command: Arc<dyn Command>,
416    ) -> ForgeResult<()> {
417        self.command(command).await
418    }
419
420    async fn command_with_meta(
421        &mut self,
422        command: Arc<dyn Command>,
423        description: String,
424        meta: serde_json::Value,
425    ) -> ForgeResult<()> {
426        self.command_with_meta(command, description, meta).await
427    }
428
429    async fn get_state(&self) -> ForgeResult<Arc<State>> {
430        self.get_state().await
431    }
432
433    async fn get_tr(&self) -> ForgeResult<Transaction> {
434        self.get_tr().await
435    }
436
437    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
438        self.get_schema().await
439    }
440
441    async fn undo(&mut self) -> ForgeResult<()> {
442        self.undo().await
443    }
444
445    async fn redo(&mut self) -> ForgeResult<()> {
446        self.redo().await
447    }
448
449    async fn jump(
450        &mut self,
451        steps: isize,
452    ) -> ForgeResult<()> {
453        self.jump(steps).await
454    }
455
456    fn get_config(&self) -> &ForgeConfig {
457        self.get_config()
458    }
459
460    fn update_config(
461        &mut self,
462        config: ForgeConfig,
463    ) {
464        self.update_config(config);
465    }
466
467    fn get_options(&self) -> &RuntimeOptions {
468        // Actor运行时不直接持有options,返回一个静态引用
469        // 这是一个权衡,因为RuntimeTrait需要返回引用
470        thread_local! {
471            static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
472        }
473        DEFAULT_OPTIONS.with(|opts| unsafe {
474            // SAFETY: 这是一个只读的thread_local变量,生命周期与线程绑定
475            std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
476                opts,
477            )
478        })
479    }
480
481    async fn destroy(&mut self) -> ForgeResult<()> {
482        self.destroy().await
483    }
484}
485
486#[cfg(test)]
487mod tests {
488    use super::*;
489
490    #[tokio::test]
491    async fn test_actor_runtime_creation() {
492        let options = RuntimeOptions::default();
493        let result = ForgeActorRuntime::create(options).await;
494
495        // 基本创建测试 - 完整测试在集成测试中进行
496        // 这里只验证编译和基本结构
497        assert!(result.is_ok() || result.is_err()); // 确保返回了某种结果
498    }
499
500    #[tokio::test]
501    async fn test_actor_runtime_api_compatibility() {
502        // 测试API签名是否与原始ForgeRuntime兼容
503        // 这确保了API层面的兼容性
504
505        let options = RuntimeOptions::default();
506        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
507            // 这些调用应该编译通过,验证API兼容性
508            let _ = runtime.get_config();
509            let _ = runtime.is_started();
510
511            // 清理
512            let _ = runtime.destroy().await;
513        }
514    }
515}