moduforge-core 0.7.0

moduforge 核心模块
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
//! Actor运行时 - 提供与现有API兼容的Actor实现
//!
//! 这个模块作为新Actor系统的Facade,保持与现有ForgeRuntime API的完全兼容性。

use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::oneshot;

use crate::{
    actors::{
        system::{ForgeActorSystem, ForgeActorSystemHandle, ActorSystemConfig},
        transaction_processor::TransactionMessage,
        state_actor::StateMessage,
        event_bus::EventBusMessage,
    },
    config::ForgeConfig,
    debug::debug,
    error::{error_utils, ForgeResult},
    event::Event,
    runtime::runtime_trait::RuntimeTrait,
    types::RuntimeOptions,
    metrics,
};

use mf_model::schema::Schema;
use mf_state::{
    state::State,
    transaction::{Command, Transaction},
};

/// Actor运行时 - 新的基于Actor的实现
///
/// 提供与原始ForgeRuntime完全相同的API,但内部使用Actor系统实现。
/// 这确保了现有代码无需修改即可使用新的架构。
pub struct ForgeActorRuntime {
    /// Actor系统句柄
    actor_system: Option<ForgeActorSystemHandle>,
    /// 配置
    config: ForgeConfig,
    /// 是否已启动
    started: bool,
}

impl ForgeActorRuntime {
    /// 获取Actor系统句柄引用
    fn actor_system(&self) -> ForgeResult<&ForgeActorSystemHandle> {
        self.actor_system.as_ref().ok_or_else(|| {
            error_utils::engine_error("Actor系统未初始化".to_string())
        })
    }

    /// 创建新的Actor运行时实例
    ///
    /// # 参数
    /// * `options` - 运行时选项
    ///
    /// # 返回值
    /// * `ForgeResult<Self>` - Actor运行时实例或错误
    #[cfg_attr(
        feature = "dev-tracing",
        tracing::instrument(
            skip(options),
            fields(crate_name = "core", runtime_type = "actor")
        )
    )]
    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
        Self::create_with_config(options, ForgeConfig::default()).await
    }

    /// 使用指定配置创建Actor运行时实例
    ///
    /// # 参数
    /// * `options` - 运行时选项
    /// * `config` - Forge配置
    ///
    /// # 返回值
    /// * `ForgeResult<Self>` - Actor运行时实例或错误
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
        crate_name = "core",
        runtime_type = "actor",
        has_middleware = !options.get_middleware_stack().is_empty()
    )))]
    pub async fn create_with_config(
        options: RuntimeOptions,
        config: ForgeConfig,
    ) -> ForgeResult<Self> {
        let start_time = Instant::now();
        debug!("正在创建Actor运行时实例");

        // 启动Actor系统
        let actor_system = ForgeActorSystem::start(
            options,
            config.clone(),
            ActorSystemConfig::default(),
        )
        .await
        .map_err(|e| {
            error_utils::engine_error(format!("启动Actor系统失败: {e}"))
        })?;

        debug!("Actor运行时实例创建成功");
        metrics::editor_creation_duration(start_time.elapsed());

        Ok(ForgeActorRuntime {
            actor_system: Some(actor_system),
            config,
            started: true,
        })
    }

    /// 🎯 处理事务 - 与原始dispatch完全相同的API
    ///
    /// 保持与runtime.rs:662-672行完全相同的接口
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
        crate_name = "core",
        tr_id = %transaction.id,
        runtime_type = "actor"
    )))]
    pub async fn dispatch(
        &mut self,
        transaction: Transaction,
    ) -> ForgeResult<()> {
        self.dispatch_with_meta(
            transaction,
            "".to_string(),
            serde_json::Value::Null,
        )
        .await
    }

    /// 🎯 处理事务(包含元信息)- 与原始dispatch_with_meta完全相同的API
    ///
    /// 保持与runtime.rs:674-721行完全相同的接口和语义
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
        crate_name = "core",
        tr_id = %transaction.id,
        description = %description,
        runtime_type = "actor"
    )))]
    pub async fn dispatch_with_meta(
        &mut self,
        transaction: Transaction,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        if !self.started {
            return Err(error_utils::engine_error("运行时未启动".to_string()));
        }

        // 通过Actor系统处理事务,但保持完全相同的语义
        let (tx, rx) = oneshot::channel();

        self.actor_system()?
            .transaction_processor
            .send_message(TransactionMessage::ProcessTransaction {
                transaction,
                description,
                meta,
                reply: tx,
            })
            .map_err(|e| {
                error_utils::engine_error(format!("发送事务消息失败: {e}"))
            })?;

        rx.await.map_err(|e| {
            error_utils::engine_error(format!("等待事务处理结果失败: {e}"))
        })?
    }

    /// 🎯 执行命令 - 与原始command完全相同的API
    ///
    /// 保持与runtime.rs:629-639行完全相同的接口
    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
        crate_name = "core",
        command_name = %command.name(),
        runtime_type = "actor"
    )))]
    pub async fn command(
        &mut self,
        command: Arc<dyn Command>,
    ) -> ForgeResult<()> {
        debug!("正在执行命令: {}", command.name());
        metrics::command_executed(command.name().as_str());

        let mut tr = self.get_tr().await?;
        command.execute(&mut tr).await?;
        tr.commit()?;
        self.dispatch(tr).await
    }

    /// 🎯 执行命令(包含元信息)- 与原始command_with_meta完全相同的API
    ///
    /// 保持与runtime.rs:641-653行完全相同的接口
    pub async fn command_with_meta(
        &mut self,
        command: Arc<dyn Command>,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        debug!("正在执行命令: {}", command.name());
        metrics::command_executed(command.name().as_str());

        let mut tr = self.get_tr().await?;
        command.execute(&mut tr).await?;
        tr.commit()?;
        self.dispatch_with_meta(tr, description, meta).await
    }

    /// 🎯 获取当前状态 - 与原始get_state完全相同的API
    ///
    /// 保持与runtime.rs:821-823行完全相同的接口
    pub async fn get_state(&self) -> ForgeResult<Arc<State>> {
        let (tx, rx) = oneshot::channel();

        self.actor_system()?
            .state_actor
            .send_message(StateMessage::GetState { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送获取状态消息失败: {e}"))
            })?;

        rx.await.map_err(|e| {
            error_utils::state_error(format!("接收状态响应失败: {e}"))
        })
    }

    /// 🎯 获取事务对象 - 与原始get_tr完全相同的API
    ///
    /// 保持与runtime.rs:833-836行完全相同的接口
    pub async fn get_tr(&self) -> ForgeResult<Transaction> {
        let state = self.get_state().await?;
        Ok(state.tr())
    }

    /// 🎯 撤销操作 - 与原始undo完全相同的API
    ///
    /// 保持与runtime.rs:838-842行完全相同的接口
    pub async fn undo(&mut self) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();

        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Undo { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送撤销消息失败: {e}"))
            })?;

        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收撤销响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// 🎯 重做操作 - 与原始redo完全相同的API
    ///
    /// 保持与runtime.rs:844-848行完全相同的接口
    pub async fn redo(&mut self) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();

        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Redo { reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送重做消息失败: {e}"))
            })?;

        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收重做响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// 🎯 跳转到指定历史位置 - 与原始jump完全相同的API
    ///
    /// 保持与runtime.rs:850-856行完全相同的接口
    pub async fn jump(
        &mut self,
        steps: isize,
    ) -> ForgeResult<()> {
        let (tx, rx) = oneshot::channel();

        self.actor_system()?
            .state_actor
            .send_message(StateMessage::Jump { steps, reply: tx })
            .map_err(|e| {
                error_utils::state_error(format!("发送跳转消息失败: {e}"))
            })?;

        rx.await
            .map_err(|e| {
                error_utils::state_error(format!("接收跳转响应失败: {e}"))
            })?
            .map(|_| ())
    }

    /// 🎯 发送事件 - 与原始emit_event完全相同的API
    ///
    /// 保持与runtime.rs:521-528行完全相同的接口
    pub async fn emit_event(
        &mut self,
        event: Event,
    ) -> ForgeResult<()> {
        metrics::event_emitted(event.name());

        self.actor_system()?
            .event_bus
            .send_message(EventBusMessage::PublishEvent { event })
            .map_err(|e| {
                error_utils::event_error(format!("发送事件消息失败: {e}"))
            })?;

        Ok(())
    }

    /// 🎯 获取配置 - 与原始get_config完全相同的API
    ///
    /// 保持与runtime.rs:809-811行完全相同的接口
    pub fn get_config(&self) -> &ForgeConfig {
        &self.config
    }

    /// 🎯 更新配置 - 与原始update_config完全相同的API
    ///
    /// 保持与runtime.rs:814-819行完全相同的接口
    pub fn update_config(
        &mut self,
        config: ForgeConfig,
    ) {
        self.config = config;
        // 这里可以向各个Actor发送配置更新消息
    }

    /// 🎯 销毁运行时 - 与原始destroy完全相同的API
    ///
    /// 保持与runtime.rs:511-519行完全相同的接口
    #[cfg_attr(
        feature = "dev-tracing",
        tracing::instrument(
            skip(self),
            fields(crate_name = "core", runtime_type = "actor")
        )
    )]
    pub async fn destroy(&mut self) -> ForgeResult<()> {
        debug!("正在销毁Actor运行时实例");

        if self.started {
            // 广播销毁事件
            let _ = self.emit_event(Event::Destroy).await;

            // 关闭Actor系统
            if let Some(actor_system) = self.actor_system.take() {
                ForgeActorSystem::shutdown(actor_system).await.map_err(
                    |e| {
                        error_utils::engine_error(format!(
                            "关闭Actor系统失败: {e}"
                        ))
                    },
                )?;
            }

            self.started = false;
        }

        debug!("Actor运行时实例销毁成功");
        Ok(())
    }

    /// 检查运行时是否已启动
    pub fn is_started(&self) -> bool {
        self.started
    }

    /// 获取schema
    pub async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
        let state = self.get_state().await?;
        Ok(state.schema())
    }

    /// 获取运行时选项 (占位方法,Actor运行时不直接持有options)
    pub fn get_options(&self) -> RuntimeOptions {
        RuntimeOptions::default()
    }
}

/// 确保在Drop时清理资源
impl Drop for ForgeActorRuntime {
    fn drop(&mut self) {
        if self.started {
            debug!("ForgeActorRuntime Drop: 检测到未正确关闭的运行时");
            // 在Drop中只能做同步操作
            // 异步清理应该通过显式调用destroy()来完成
        }
    }
}

// ==================== RuntimeTrait 实现 ====================

#[async_trait]
impl RuntimeTrait for ForgeActorRuntime {
    async fn dispatch(
        &mut self,
        transaction: Transaction,
    ) -> ForgeResult<()> {
        self.dispatch(transaction).await
    }

    async fn dispatch_with_meta(
        &mut self,
        transaction: Transaction,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        self.dispatch_with_meta(transaction, description, meta).await
    }

    async fn command(
        &mut self,
        command: Arc<dyn Command>,
    ) -> ForgeResult<()> {
        self.command(command).await
    }

    async fn command_with_meta(
        &mut self,
        command: Arc<dyn Command>,
        description: String,
        meta: serde_json::Value,
    ) -> ForgeResult<()> {
        self.command_with_meta(command, description, meta).await
    }

    async fn get_state(&self) -> ForgeResult<Arc<State>> {
        self.get_state().await
    }

    async fn get_tr(&self) -> ForgeResult<Transaction> {
        self.get_tr().await
    }

    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
        self.get_schema().await
    }

    async fn undo(&mut self) -> ForgeResult<()> {
        self.undo().await
    }

    async fn redo(&mut self) -> ForgeResult<()> {
        self.redo().await
    }

    async fn jump(
        &mut self,
        steps: isize,
    ) -> ForgeResult<()> {
        self.jump(steps).await
    }

    fn get_config(&self) -> &ForgeConfig {
        self.get_config()
    }

    fn update_config(
        &mut self,
        config: ForgeConfig,
    ) {
        self.update_config(config);
    }

    fn get_options(&self) -> &RuntimeOptions {
        // Actor运行时不直接持有options,返回一个静态引用
        // 这是一个权衡,因为RuntimeTrait需要返回引用
        thread_local! {
            static DEFAULT_OPTIONS: RuntimeOptions = RuntimeOptions::default();
        }
        DEFAULT_OPTIONS.with(|opts| unsafe {
            // SAFETY: 这是一个只读的thread_local变量,生命周期与线程绑定
            std::mem::transmute::<&RuntimeOptions, &'static RuntimeOptions>(
                opts,
            )
        })
    }

    async fn destroy(&mut self) -> ForgeResult<()> {
        self.destroy().await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_actor_runtime_creation() {
        let options = RuntimeOptions::default();
        let result = ForgeActorRuntime::create(options).await;

        // 基本创建测试 - 完整测试在集成测试中进行
        // 这里只验证编译和基本结构
        assert!(result.is_ok() || result.is_err()); // 确保返回了某种结果
    }

    #[tokio::test]
    async fn test_actor_runtime_api_compatibility() {
        // 测试API签名是否与原始ForgeRuntime兼容
        // 这确保了API层面的兼容性

        let options = RuntimeOptions::default();
        if let Ok(mut runtime) = ForgeActorRuntime::create(options).await {
            // 这些调用应该编译通过,验证API兼容性
            let _ = runtime.get_config();
            let _ = runtime.is_started();

            // 清理
            let _ = runtime.destroy().await;
        }
    }
}