mf_core/runtime/
actor_runtime.rs

1//! Actor运行时 - 提供与现有API兼容的Actor实现
2//!
3//! 这个模块作为新Actor系统的Facade,保持与现有ForgeRuntime API的完全兼容性。
4
5use std::sync::Arc;
6use std::time::Instant;
7use async_trait::async_trait;
8use tokio::sync::oneshot;
9
10use crate::{
11    actors::{
12        system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
13        transaction_processor::TransactionMessage,
14        state_actor::StateMessage,
15        event_bus::EventBusMessage,
16    },
17    config::ForgeConfig,
18    debug::debug,
19    error::{error_utils, ForgeResult},
20    event::Event,
21    runtime::runtime_trait::RuntimeTrait,
22    types::RuntimeOptions,
23    metrics,
24};
25
26use mf_model::schema::Schema;
27use mf_state::{
28    state::State,
29    transaction::{Command, Transaction},
30};
31
32/// Actor运行时 - 新的基于Actor的实现
33///
34/// 提供与原始ForgeRuntime完全相同的API,但内部使用Actor系统实现。
35/// 这确保了现有代码无需修改即可使用新的架构。
36pub struct ForgeActorRuntime {
37    /// Actor系统句柄
38    actor_system: Option<ForgeActorSystemHandle>,
39    /// 配置
40    config: ForgeConfig,
41    /// 是否已启动
42    started: bool,
43}
44
45impl ForgeActorRuntime {
46    /// 获取Actor系统句柄引用
47    fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
48        self.actor_system.as_ref().ok_or_else(|| {
49            error_utils::engine_error("Actor系统未初始化".to_string())
50        })
51    }
52
53    /// 创建新的Actor运行时实例
54    ///
55    /// # 参数
56    /// * `options` - 运行时选项
57    ///
58    /// # 返回值
59    /// * `ForgeResult<Self>` - Actor运行时实例或错误
60    #[cfg_attr(
61        feature = "dev-tracing",
62        tracing::instrument(
63            skip(options),
64            fields(crate_name = "core", runtime_type = "actor")
65        )
66    )]
67    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
68        Self::create_with_config(options, ForgeConfig::default()).await
69    }
70
71    /// 使用指定配置创建Actor运行时实例
72    ///
73    /// # 参数
74    /// * `options` - 运行时选项
75    /// * `config` - Forge配置
76    ///
77    /// # 返回值
78    /// * `ForgeResult<Self>` - Actor运行时实例或错误
79    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
80        crate_name = "core",
81        runtime_type = "actor",
82        has_middleware = !options.get_middleware_stack().is_empty()
83    )))]
84    pub async fn create_with_config(
85        options: RuntimeOptions,
86        config: ForgeConfig,
87    ) -> ForgeResult<Self> {
88        let start_time = Instant::now();
89        debug!("正在创建Actor运行时实例");
90
91        // 启动Actor系统
92        let actor_system = ForgeActorSystem::start(
93            options,
94            config.clone(),
95            ActorSystemConfig::default(),
96        )
97        .await
98        .map_err(|e| {
99            error_utils::engine_error(format!("启动Actor系统失败: {e}"))
100        })?;
101
102        debug!("Actor运行时实例创建成功");
103        metrics::editor_creation_duration(start_time.elapsed());
104
105        Ok(ForgeActorRuntime {
106            actor_system: Some(actor_system),
107            config,
108            started: true,
109        })
110    }
111
112    /// 🎯 处理事务 - 与原始dispatch完全相同的API
113    ///
114    /// 保持与runtime.rs:662-672行完全相同的接口
115    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
116        crate_name = "core",
117        tr_id = %transaction.id,
118        runtime_type = "actor"
119    )))]
120    pub async fn dispatch(
121        &mut self,
122        transaction: Transaction,
123    ) -> ForgeResult<()> {
124        self.dispatch_with_meta(
125            transaction,
126            "".to_string(),
127            serde_json::Value::Null,
128        )
129        .await
130    }
131
132    /// 🎯 处理事务(包含元信息)- 与原始dispatch_with_meta完全相同的API
133    ///
134    /// 保持与runtime.rs:674-721行完全相同的接口和语义
135    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
136        crate_name = "core",
137        tr_id = %transaction.id,
138        description = %description,
139        runtime_type = "actor"
140    )))]
141    pub async fn dispatch_with_meta(
142        &mut self,
143        transaction: Transaction,
144        description: String,
145        meta: serde_json::Value,
146    ) -> ForgeResult<()> {
147        if !self.started {
148            return Err(error_utils::engine_error("运行时未启动".to_string()));
149        }
150
151        // 通过Actor系统处理事务,但保持完全相同的语义
152        let (tx, rx) = oneshot::channel();
153
154        self.actor_system()?
155            .transaction_processor
156            .send_message(TransactionMessage::ProcessTransaction {
157                transaction,
158                description,
159                meta,
160                reply: tx,
161            })
162            .map_err(|e| {
163                error_utils::engine_error(format!("发送事务消息失败: {e}"))
164            })?;
165
166        rx.await.map_err(|e| {
167            error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
168        })?
169    }
170
171    /// 🎯 执行命令 - 与原始command完全相同的API
172    ///
173    /// 保持与runtime.rs:629-639行完全相同的接口
174    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
175        crate_name = "core",
176        command_name = %command.name(),
177        runtime_type = "actor"
178    )))]
179    pub async fn command(
180        &mut self,
181        command: Arc<dyn Command>,
182    ) -> ForgeResult<()> {
183        debug!("正在执行命令: {}", command.name());
184        metrics::command_executed(command.name().as_str());
185
186        let mut tr = self.get_tr().await?;
187        command.execute(&mut tr).await?;
188        tr.commit()?;
189        self.dispatch(tr).await
190    }
191
192    /// 🎯 执行命令(包含元信息)- 与原始command_with_meta完全相同的API
193    ///
194    /// 保持与runtime.rs:641-653行完全相同的接口
195    pub async fn command_with_meta(
196        &mut self,
197        command: Arc<dyn Command>,
198        description: String,
199        meta: serde_json::Value,
200    ) -> ForgeResult<()> {
201        debug!("正在执行命令: {}", command.name());
202        metrics::command_executed(command.name().as_str());
203
204        let mut tr = self.get_tr().await?;
205        command.execute(&mut tr).await?;
206        tr.commit()?;
207        self.dispatch_with_meta(tr, description, meta).await
208    }
209
210    /// 🎯 获取当前状态 - 与原始get_state完全相同的API
211    ///
212    /// 保持与runtime.rs:821-823行完全相同的接口
213    pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
214        let (tx, rx) = oneshot::channel();
215
216        self.actor_system()?
217            .state_actor
218            .send_message(StateMessage::GetState { reply: tx })
219            .map_err(|e| {
220                error_utils::state_error(format!("发送获取状态消息失败: {e}"))
221            })?;
222
223        rx.await.map_err(|e| {
224            error_utils::state_error(format!("接收状态响应失败: {e}"))
225        })
226    }
227
228    /// 🎯 获取事务对象 - 与原始get_tr完全相同的API
229    ///
230    /// 保持与runtime.rs:833-836行完全相同的接口
231    pub async fn get_tr(&self) -> ForgeResult<Transaction> {
232        let state = self.get_state().await?;
233        Ok(state.tr())
234    }
235
236    /// 🎯 撤销操作 - 与原始undo完全相同的API
237    ///
238    /// 保持与runtime.rs:838-842行完全相同的接口
239    pub async fn undo(&mut self) -> ForgeResult<()> {
240        let (tx, rx) = oneshot::channel();
241
242        self.actor_system()?
243            .state_actor
244            .send_message(StateMessage::Undo { reply: tx })
245            .map_err(|e| {
246                error_utils::state_error(format!("发送撤销消息失败: {e}"))
247            })?;
248
249        rx.await
250            .map_err(|e| {
251                error_utils::state_error(format!("接收撤销响应失败: {e}"))
252            })?
253            .map(|_| ())
254    }
255
256    /// 🎯 重做操作 - 与原始redo完全相同的API
257    ///
258    /// 保持与runtime.rs:844-848行完全相同的接口
259    pub async fn redo(&mut self) -> ForgeResult<()> {
260        let (tx, rx) = oneshot::channel();
261
262        self.actor_system()?
263            .state_actor
264            .send_message(StateMessage::Redo { reply: tx })
265            .map_err(|e| {
266                error_utils::state_error(format!("发送重做消息失败: {e}"))
267            })?;
268
269        rx.await
270            .map_err(|e| {
271                error_utils::state_error(format!("接收重做响应失败: {e}"))
272            })?
273            .map(|_| ())
274    }
275
276    /// 🎯 跳转到指定历史位置 - 与原始jump完全相同的API
277    ///
278    /// 保持与runtime.rs:850-856行完全相同的接口
279    pub async fn jump(
280        &mut self,
281        steps: isize,
282    ) -> ForgeResult<()> {
283        let (tx, rx) = oneshot::channel();
284
285        self.actor_system()?
286            .state_actor
287            .send_message(StateMessage::Jump { steps, reply: tx })
288            .map_err(|e| {
289                error_utils::state_error(format!("发送跳转消息失败: {e}"))
290            })?;
291
292        rx.await
293            .map_err(|e| {
294                error_utils::state_error(format!("接收跳转响应失败: {e}"))
295            })?
296            .map(|_| ())
297    }
298
299    /// 🎯 发送事件 - 与原始emit_event完全相同的API
300    ///
301    /// 保持与runtime.rs:521-528行完全相同的接口
302    pub async fn emit_event(
303        &mut self,
304        event: Event,
305    ) -> ForgeResult<()> {
306        metrics::event_emitted(event.name());
307
308        self.actor_system()?
309            .event_bus
310            .send_message(EventBusMessage::PublishEvent { event })
311            .map_err(|e| {
312                error_utils::event_error(format!("发送事件消息失败: {e}"))
313            })?;
314
315        Ok(())
316    }
317
318    /// 🎯 获取配置 - 与原始get_config完全相同的API
319    ///
320    /// 保持与runtime.rs:809-811行完全相同的接口
321    pub fn get_config(&self) -> &ForgeConfig {
322        &self.config
323    }
324
325    /// 🎯 更新配置 - 与原始update_config完全相同的API
326    ///
327    /// 保持与runtime.rs:814-819行完全相同的接口
328    pub fn update_config(
329        &mut self,
330        config: ForgeConfig,
331    ) {
332        self.config = config;
333        // 这里可以向各个Actor发送配置更新消息
334    }
335
336    /// 🎯 销毁运行时 - 与原始destroy完全相同的API
337    ///
338    /// 保持与runtime.rs:511-519行完全相同的接口
339    #[cfg_attr(
340        feature = "dev-tracing",
341        tracing::instrument(
342            skip(self),
343            fields(crate_name = "core", runtime_type = "actor")
344        )
345    )]
346    pub async fn destroy(&mut self) -> ForgeResult<()> {
347        debug!("正在销毁Actor运行时实例");
348
349        if self.started {
350            // 广播销毁事件
351            let _ = self.emit_event(Event::Destroy).await;
352
353            // 关闭Actor系统
354            if let Some(actor_system) = self.actor_system.take() {
355                ForgeActorSystem::shutdown(actor_system).await.map_err(
356                    |e| {
357                        error_utils::engine_error(format!(
358                            "关闭Actor系统失败: {e}"
359                        ))
360                    },
361                )?;
362            }
363
364            self.started = false;
365        }
366
367        debug!("Actor运行时实例销毁成功");
368        Ok(())
369    }
370
371    /// 检查运行时是否已启动
372    pub fn is_started(&self) -> bool {
373        self.started
374    }
375
376    /// 获取schema
377    pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
378        let state = self.get_state().await?;
379        Ok(state.schema())
380    }
381
382    /// 获取运行时选项 (占位方法,Actor运行时不直接持有options)
383    pub fn get_options(&self) -> RuntimeOptions {
384        RuntimeOptions::default()
385    }
386}
387
388/// 确保在Drop时清理资源
389impl Drop for ForgeActorRuntime {
390    fn drop(&mut self) {
391        if self.started {
392            debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
393            // 在Drop中只能做同步操作
394            // 异步清理应该通过显式调用destroy()来完成
395        }
396    }
397}
398
399// ==================== RuntimeTrait 实现 ====================
400
401#[async_trait]
402impl RuntimeTrait for ForgeActorRuntime {
403    async fn dispatch(
404        &mut self,
405        transaction: Transaction,
406    ) -> ForgeResult<()> {
407        self.dispatch(transaction).await
408    }
409
410    async fn dispatch_with_meta(
411        &mut self,
412        transaction: Transaction,
413        description: String,
414        meta: serde_json::Value,
415    ) -> ForgeResult<()> {
416        self.dispatch_with_meta(transaction, description, meta).await
417    }
418
419    async fn command(
420        &mut self,
421        command: Arc<dyn Command>,
422    ) -> ForgeResult<()> {
423        self.command(command).await
424    }
425
426    async fn command_with_meta(
427        &mut self,
428        command: Arc<dyn Command>,
429        description: String,
430        meta: serde_json::Value,
431    ) -> ForgeResult<()> {
432        self.command_with_meta(command, description, meta).await
433    }
434
435    async fn get_state(&self) -> ForgeResult<Arc<State>> {
436        self.get_state().await
437    }
438
439    async fn get_tr(&self) -> ForgeResult<Transaction> {
440        self.get_tr().await
441    }
442
443    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
444        self.get_schema().await
445    }
446
447    async fn undo(&mut self) -> ForgeResult<()> {
448        self.undo().await
449    }
450
451    async fn redo(&mut self) -> ForgeResult<()> {
452        self.redo().await
453    }
454
455    async fn jump(
456        &mut self,
457        steps: isize,
458    ) -> ForgeResult<()> {
459        self.jump(steps).await
460    }
461
462    fn get_config(&self) -> &ForgeConfig {
463        self.get_config()
464    }
465
466    fn update_config(
467        &mut self,
468        config: ForgeConfig,
469    ) {
470        self.update_config(config);
471    }
472
473    fn get_options(&self) -> &RuntimeOptions {
474        // Actor运行时不直接持有options,返回一个静态引用
475        // 这是一个权衡,因为RuntimeTrait需要返回引用
476        thread_local! {
477            static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
478        }
479        DEFAULT_OPTIONS.with(|opts| unsafe {
480            // SAFETY: 这是一个只读的thread_local变量,生命周期与线程绑定
481            std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
482                opts,
483            )
484        })
485    }
486
487    async fn destroy(&mut self) -> ForgeResult<()> {
488        self.destroy().await
489    }
490}
491
492#[cfg(test)]
493mod tests {
494    use super::*;
495
496    #[tokio::test]
497    async fn test_actor_runtime_creation() {
498        let options = RuntimeOptions::default();
499        let result = ForgeActorRuntime::create(options).await;
500
501        // 基本创建测试 - 完整测试在集成测试中进行
502        // 这里只验证编译和基本结构
503        assert!(result.is_ok() || result.is_err()); // 确保返回了某种结果
504    }
505
506    #[tokio::test]
507    async fn test_actor_runtime_api_compatibility() {
508        // 测试API签名是否与原始ForgeRuntime兼容
509        // 这确保了API层面的兼容性
510
511        let options = RuntimeOptions::default();
512        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
513            // 这些调用应该编译通过,验证API兼容性
514            let _ = runtime.get_config();
515            let _ = runtime.is_started();
516
517            // 清理
518            let _ = runtime.destroy().await;
519        }
520    }
521}