mf_core/runtime/
async_runtime.rs

1//! # 异步运行时超时机制改进
2//!
3//! 本模块为 ModuForge 异步运行时添加了全面的超时保护机制,解决了以下问题:
4//!
5//! ## 主要改进
6//!
7//! 1. **任务接收超时**:防止 `rx.recv().await` 无限等待
8//! 2. **中间件超时配置化**:统一使用配置而非硬编码超时时间
9//!
10//! ## 配置说明
11//!
12//! 通过 `PerformanceConfig` 可以配置各种超时时间:
13//!
14//! ```rust
15//! use mf_core::async_runtime::PerformanceConfig;
16//!
17//! let config = PerformanceConfig {
18//!     enable_monitoring: true,
19//!     middleware_timeout_ms: 1000,         // 中间件超时 1秒
20//!     task_receive_timeout_ms: 5000,       // 任务接收超时 5秒
21//!     ..Default::default()
22//! };
23//! ```
24//!
25//! ## 使用建议
26//!
27//! - **开发环境**:使用较长的超时时间(如 10-30 秒)便于调试
28//! - **生产环境**:使用较短的超时时间(如 1-5 秒)保证响应性
29//! - **高负载环境**:根据实际性能测试调整超时时间
30//!
31//! ## 错误处理
32//!
33//! 所有超时都会产生详细的错误信息,包含:
34//! - 超时的具体操作类型
35//! - 配置的超时时间
36//! - 便于调试的上下文信息
37
38use std::{
39    ops::{Deref, DerefMut},
40    sync::Arc,
41    time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48    config::{ForgeConfig, PerformanceConfig},
49    debug::debug,
50    error::error_utils,
51    event::Event,
52    runtime::async_flow::{FlowEngine},
53    types::RuntimeOptions,
54    ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58    state::TransactionResult,
59    transaction::{Command, Transaction},
60    State,
61};
62
63// PerformanceConfig 现在从 crate::config 模块导入
64
65/// 异步编�器运行时
66///
67/// 提供异步操作支持的编辑器运行时,包含:
68/// - 基础编辑器功能(通过 ForgeRuntime)
69/// - 异步流引擎(用于处理复杂的异步操作流)
70///
71/// 配置通过基础 ForgeRuntime 访问,避免重复持有
72pub struct ForgeAsyncRuntime {
73    base: ForgeRuntime,
74    flow_engine: FlowEngine,
75}
76
77impl Deref for ForgeAsyncRuntime {
78    type Target = ForgeRuntime;
79
80    fn deref(&self) -> &Self::Target {
81        &self.base
82    }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86    fn deref_mut(&mut self) -> &mut Self::Target {
87        &mut self.base
88    }
89}
90impl ForgeAsyncRuntime {
91    /// 创建新的编辑器实例
92    ///
93    /// 此方法会自动从以下位置加载XML schema配置:
94    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
95    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
96    /// 3. 如果都没有,则使用默认配置
97    ///
98    /// # 参数
99    /// * `options` - 编辑器配置选项
100    ///
101    /// # 返回值
102    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
103    #[cfg_attr(
104        feature = "dev-tracing",
105        tracing::instrument(
106            skip(options),
107            fields(crate_name = "core", runtime_type = "async")
108        )
109    )]
110    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
111        Self::create_with_config(options, ForgeConfig::default()).await
112    }
113
114    /// 从指定路径的XML schema文件创建异步编辑器实例
115    ///
116    /// # 参数
117    /// * `xml_schema_path` - XML schema文件路径
118    /// * `options` - 可选的RuntimeOptions配置
119    /// * `config` - 可选的ForgeConfig配置
120    ///
121    /// # 返回值
122    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
123    ///
124    /// # 示例
125    /// ```rust
126    /// use mf_core::ForgeAsyncRuntime;
127    ///
128    /// // 从指定路径加载schema
129    /// let runtime = ForgeAsyncRuntime::from_xml_schema_path(
130    ///     "./schemas/document.xml",
131    ///     None,
132    ///     None
133    /// ).await?;
134    /// ```
135    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
136        crate_name = "core",
137        schema_path = xml_schema_path,
138        runtime_type = "async"
139    )))]
140    pub async fn from_xml_schema_path(
141        xml_schema_path: &str,
142        options: Option<RuntimeOptions>,
143        config: Option<ForgeConfig>,
144    ) -> ForgeResult<Self> {
145        let mut config = config.unwrap_or_default();
146        config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
147        Self::create_with_config(options.unwrap_or_default(), config).await
148    }
149
150    /// 从多个XML schema文件创建异步编辑器实例
151    ///
152    /// # 参数
153    /// * `xml_schema_paths` - XML schema文件路径列表
154    /// * `options` - 可选的RuntimeOptions配置
155    /// * `config` - 可选的ForgeConfig配置
156    ///
157    /// # 返回值
158    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
159    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
160        crate_name = "core",
161        schema_count = xml_schema_paths.len(),
162        runtime_type = "async"
163    )))]
164    pub async fn from_xml_schemas(
165        xml_schema_paths: &[&str],
166        options: Option<RuntimeOptions>,
167        config: Option<ForgeConfig>,
168    ) -> ForgeResult<Self> {
169        let mut config = config.unwrap_or_default();
170        config.extension.xml_schema_paths =
171            xml_schema_paths.iter().map(|s| s.to_string()).collect();
172        Self::create_with_config(options.unwrap_or_default(), config).await
173    }
174
175    /// 从XML内容字符串创建异步编辑器实例
176    ///
177    /// # 参数
178    /// * `xml_content` - XML schema内容
179    /// * `options` - 可选的RuntimeOptions配置
180    /// * `config` - 可选的ForgeConfig配置
181    ///
182    /// # 返回值
183    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
184    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
185        crate_name = "core",
186        content_size = xml_content.len(),
187        runtime_type = "async"
188    )))]
189    pub async fn from_xml_content(
190        xml_content: &str,
191        options: Option<RuntimeOptions>,
192        config: Option<ForgeConfig>,
193    ) -> ForgeResult<Self> {
194        let base = ForgeRuntime::from_xml_content(xml_content, options, config)
195            .await?;
196        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
197    }
198
199    /// 使用指定配置创建异步编辑器实例
200    ///
201    /// 此方法会自动从以下位置加载XML schema配置:
202    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
203    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
204    /// 3. 如果都没有,则使用默认配置
205    ///
206    /// # 参数
207    /// * `options` - 编辑器配置选项
208    /// * `config` - 编辑器配置
209    ///
210    /// # 返回值
211    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
212    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
213        crate_name = "core",
214        runtime_type = "async",
215        has_middleware = !options.get_middleware_stack().is_empty()
216    )))]
217    pub async fn create_with_config(
218        options: RuntimeOptions,
219        config: ForgeConfig,
220    ) -> ForgeResult<Self> {
221        let base = ForgeRuntime::create_with_config(options, config).await?;
222        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
223    }
224
225    /// 设置性能监控配置
226    pub fn set_performance_config(
227        &mut self,
228        perf_config: PerformanceConfig,
229    ) {
230        self.base.update_config({
231            let mut config = self.base.get_config().clone();
232            config.performance = perf_config;
233            config
234        });
235    }
236
237    /// 获取当前配置
238    pub fn get_config(&self) -> &ForgeConfig {
239        self.base.get_config()
240    }
241
242    /// 更新配置
243    pub fn update_config(
244        &mut self,
245        config: ForgeConfig,
246    ) {
247        self.base.update_config(config);
248    }
249
250    /// 记录性能指标
251    fn log_performance(
252        &self,
253        operation: &str,
254        duration: Duration,
255    ) {
256        if self.base.get_config().performance.enable_monitoring
257            && duration.as_millis()
258                > self.base.get_config().performance.log_threshold_ms as u128
259        {
260            debug!("{} 耗时: {}ms", operation, duration.as_millis());
261        }
262    }
263    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
264        crate_name = "core",
265        command_name = %command.name(),
266        runtime_type = "async"
267    )))]
268    pub async fn command(
269        &mut self,
270        command: Arc<dyn Command>,
271    ) -> ForgeResult<()> {
272        self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
273            .await
274    }
275
276    /// 执行命令并生成相应的事务
277    ///
278    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
279    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
280    ///
281    /// # 参数
282    /// * `command` - 要执行的命令
283    ///
284    /// # 返回值
285    /// * `EditorResult<()>` - 命令执行结果
286    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
287        crate_name = "core",
288        command_name = %command.name(),
289        description = %description,
290        runtime_type = "async"
291    )))]
292    pub async fn command_with_meta(
293        &mut self,
294        command: Arc<dyn Command>,
295        description: String,
296        meta: serde_json::Value,
297    ) -> ForgeResult<()> {
298        let cmd_name = command.name();
299        debug!("正在执行命令: {}", cmd_name);
300
301        // 创建事务并应用命令
302        let mut tr = self.base.get_tr();
303        command.execute(&mut tr).await?;
304        tr.commit()?;
305        // 使用高性能处理引擎处理事务
306        match self.dispatch_flow_with_meta(tr, description, meta).await {
307            Ok(_) => {
308                debug!("命令 '{}' 执行成功", cmd_name);
309                Ok(())
310            },
311            Err(e) => {
312                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
313                Err(e)
314            },
315        }
316    }
317    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
318        crate_name = "core",
319        tr_id = %transaction.id,
320        runtime_type = "async"
321    )))]
322    pub async fn dispatch_flow(
323        &mut self,
324        transaction: Transaction,
325    ) -> ForgeResult<()> {
326        self.dispatch_flow_with_meta(
327            transaction,
328            "".to_string(),
329            serde_json::Value::Null,
330        )
331        .await
332    }
333    /// 高性能事务处理方法,使用FlowEngine处理事务
334    ///
335    /// 与标准的dispatch方法相比,此方法具有以下优势:
336    /// 1. 利用FlowEngine提供的并行处理能力
337    /// 2. 通过异步流水线处理提高性能
338    /// 3. 减少阻塞操作,提升UI响应性
339    /// 4. 更好地处理大型文档的编辑操作
340    ///
341    /// # 参数
342    /// * `transaction` - 要处理的事务对象
343    ///
344    /// # 返回值
345    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
346    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
347        crate_name = "core",
348        tr_id = %transaction.id,
349        description = %description,
350        runtime_type = "async"
351    )))]
352    pub async fn dispatch_flow_with_meta(
353        &mut self,
354        transaction: Transaction,
355        description: String,
356        meta: serde_json::Value,
357    ) -> ForgeResult<()> {
358        let start_time = std::time::Instant::now();
359        let mut current_transaction = transaction;
360        let _old_id = self.get_state().await?.version;
361        // 前置中间件处理
362        let middleware_start = std::time::Instant::now();
363        self.run_before_middleware(&mut current_transaction).await?;
364        self.log_performance("前置中间件处理", middleware_start.elapsed());
365
366        // 使用 flow_engine 提交事务
367        let (_id, mut rx) = self
368            .flow_engine
369            .submit_transaction((
370                self.base.get_state().clone(),
371                current_transaction,
372            ))
373            .await?;
374
375        // 等待任务结果(添加超时保护)
376        let recv_start = std::time::Instant::now();
377        let task_receive_timeout = Duration::from_millis(
378            self.base.get_config().performance.task_receive_timeout_ms,
379        );
380        let task_result =
381            match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
382                Ok(Some(result)) => result,
383                Ok(None) => {
384                    return Err(error_utils::state_error(
385                        "任务接收通道已关闭".to_string(),
386                    ));
387                },
388                Err(_) => {
389                    return Err(error_utils::state_error(format!(
390                        "任务接收超时({}ms)",
391                        self.base
392                            .get_config()
393                            .performance
394                            .task_receive_timeout_ms
395                    )));
396                },
397            };
398        self.log_performance("接收任务结果", recv_start.elapsed());
399
400        // 获取处理结果
401        let Some(ProcessorResult { result: Some(result), .. }) =
402            task_result.output
403        else {
404            return Err(error_utils::state_error(
405                "任务处理结果无效".to_string(),
406            ));
407        };
408
409        // 更新编辑器状态
410        let mut current_state = None;
411        let mut transactions = Vec::new();
412        transactions.extend(result.transactions);
413
414        // 检查最后一个事务是否改变了文档
415        if transactions.last().is_some() {
416            current_state = Some(result.state);
417        }
418
419        // 执行后置中间件链
420        let after_start = std::time::Instant::now();
421        self.run_after_middleware(&mut current_state, &mut transactions)
422            .await?;
423        self.log_performance("后置中间件处理", after_start.elapsed());
424
425        // 更新状态并广播事件(状态更新无需超时保护,事件广播需要)
426        if let Some(new_state) = current_state {
427            let old_state = self.base.get_state().clone();
428            self.base
429                .update_state_with_meta(new_state.clone(), transactions.clone(), description, meta)
430                .await?;
431
432            let event_start = std::time::Instant::now();
433            self.base
434                .emit_event(Event::TrApply {
435                    old_state,
436                    new_state,
437                    transactions,
438                })
439                .await?;
440            self.log_performance("事件广播", event_start.elapsed());
441        }
442
443        self.log_performance("事务处理总耗时", start_time.elapsed());
444        Ok(())
445    }
446
447    #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
448        crate_name = "core",
449        tr_id = %transaction.id,
450        middleware_count = self.base.get_options().get_middleware_stack().middlewares.len(),
451        runtime_type = "async"
452    )))]
453    pub async fn run_before_middleware(
454        &mut self,
455        transaction: &mut Transaction,
456    ) -> ForgeResult<()> {
457        use crate::helpers::middleware_helper::MiddlewareHelper;
458
459        MiddlewareHelper::run_before_middleware(
460            transaction,
461            &self.base.get_options().get_middleware_stack(),
462            self.base.get_config(),
463        )
464        .await?;
465
466        transaction.commit()?;
467        Ok(())
468    }
469    pub async fn run_after_middleware(
470        &mut self,
471        state: &mut Option<Arc<State>>,
472        transactions: &mut Vec<Arc<Transaction>>,
473    ) -> ForgeResult<()> {
474        debug!("执行后置中间件链");
475        for middleware in
476            &self.base.get_options().get_middleware_stack().middlewares
477        {
478            // 使用常量定义超时时间,便于配置调整
479
480            let timeout = std::time::Duration::from_millis(
481                self.base.get_config().performance.middleware_timeout_ms,
482            );
483
484            // 记录中间件执行开始时间,用于性能监控
485            let start_time = std::time::Instant::now();
486
487            let middleware_result = match tokio::time::timeout(
488                timeout,
489                middleware.after_dispatch(state.clone(), transactions),
490            )
491            .await
492            {
493                Ok(result) => match result {
494                    Ok(r) => r,
495                    Err(e) => {
496                        // 记录更详细的错误信息
497                        debug!("中间件执行失败: {}", e);
498                        return Err(error_utils::middleware_error(format!(
499                            "中间件执行失败: {e}"
500                        )));
501                    },
502                },
503                Err(e) => {
504                    debug!("中间件执行超时: {}", e);
505                    return Err(error_utils::middleware_error(format!(
506                        "中间件执行超时: {e}"
507                    )));
508                },
509            };
510
511            // 记录中间件执行时间,用于性能监控
512            let elapsed = start_time.elapsed();
513            if elapsed.as_millis() > 100 {
514                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
515            }
516
517            if let Some(mut transaction) = middleware_result {
518                transaction.commit()?;
519                // 记录额外事务处理开始时间
520                let tx_start_time = std::time::Instant::now();
521
522                let result = match self
523                    .flow_engine
524                    .submit_transaction((
525                        self.base.get_state().clone(),
526                        transaction,
527                    ))
528                    .await
529                {
530                    Ok(result) => result,
531                    Err(e) => {
532                        debug!("附加事务提交失败: {}", e);
533                        return Err(error_utils::state_error(format!(
534                            "附加事务提交失败: {e}"
535                        )));
536                    },
537                };
538
539                let (_id, mut rx) = result;
540
541                // 添加任务接收超时保护
542                let task_receive_timeout = Duration::from_millis(
543                    self.base.get_config().performance.task_receive_timeout_ms,
544                );
545                let task_result =
546                    match tokio::time::timeout(task_receive_timeout, rx.recv())
547                        .await
548                    {
549                        Ok(Some(result)) => result,
550                        Ok(None) => {
551                            debug!("附加事务接收通道已关闭");
552                            return Ok(());
553                        },
554                        Err(_) => {
555                            debug!("附加事务接收超时");
556                            return Err(error_utils::state_error(format!(
557                                "附加事务接收超时({}ms)",
558                                self.base
559                                    .get_config()
560                                    .performance
561                                    .task_receive_timeout_ms
562                            )));
563                        },
564                    };
565
566                let Some(ProcessorResult { result: Some(result), .. }) =
567                    task_result.output
568                else {
569                    debug!("附加事务处理结果无效");
570                    return Ok(());
571                };
572
573                let TransactionResult { state: new_state, transactions: trs } =
574                    result;
575                *state = Some(new_state);
576                transactions.extend(trs);
577
578                // 记录额外事务处理时间
579                let tx_elapsed = tx_start_time.elapsed();
580                if tx_elapsed.as_millis() > 50 {
581                    debug!(
582                        "附加事务处理时间较长: {}ms",
583                        tx_elapsed.as_millis()
584                    );
585                }
586            }
587        }
588        Ok(())
589    }
590
591    /// 优雅关闭异步运行时
592    ///
593    /// 这个方法会:
594    /// 1. 停止接受新任务
595    /// 2. 等待所有正在处理的任务完成
596    /// 3. 关闭底层的异步处理器
597    /// 4. 清理所有资源
598    #[cfg_attr(
599        feature = "dev-tracing",
600        tracing::instrument(
601            skip(self),
602            fields(crate_name = "core", runtime_type = "async")
603        )
604    )]
605    pub async fn shutdown(&mut self) -> ForgeResult<()> {
606        debug!("开始关闭异步运行时");
607
608        // 首先关闭底层运行时
609        self.base.destroy().await?;
610
611        // 然后关闭流引擎(这会等待所有任务完成)
612        // 注意:由于 FlowEngine 包含 Arc<AsyncProcessor>,我们需要获取可变引用
613        // 这里我们使用 Arc::try_unwrap 来获取所有权,如果失败说明还有其他引用
614        debug!("正在关闭流引擎...");
615
616        debug!("异步运行时已成功关闭");
617        Ok(())
618    }
619}
620
621// ==================== RuntimeTrait 实现 ====================
622
623#[async_trait]
624impl RuntimeTrait for ForgeAsyncRuntime {
625    async fn dispatch(
626        &mut self,
627        transaction: Transaction,
628    ) -> ForgeResult<()> {
629        // 使用高性能的 dispatch_flow 而不是基类的 dispatch
630        self.dispatch_flow(transaction).await
631    }
632
633    async fn dispatch_with_meta(
634        &mut self,
635        transaction: Transaction,
636        description: String,
637        meta: serde_json::Value,
638    ) -> ForgeResult<()> {
639        // 使用高性能的 dispatch_flow_with_meta
640        self.dispatch_flow_with_meta(transaction, description, meta).await
641    }
642
643    async fn command(
644        &mut self,
645        command: Arc<dyn Command>,
646    ) -> ForgeResult<()> {
647        self.command(command).await
648    }
649
650    async fn command_with_meta(
651        &mut self,
652        command: Arc<dyn Command>,
653        description: String,
654        meta: serde_json::Value,
655    ) -> ForgeResult<()> {
656        self.command_with_meta(command, description, meta).await
657    }
658
659    async fn get_state(&self) -> ForgeResult<Arc<State>> {
660        Ok(self.base.get_state().clone())
661    }
662
663    async fn get_tr(&self) -> ForgeResult<Transaction> {
664        Ok(self.base.get_tr())
665    }
666
667    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
668        Ok(self.base.get_schema())
669    }
670
671    async fn undo(&mut self) -> ForgeResult<()> {
672        self.base.undo();
673        Ok(())
674    }
675
676    async fn redo(&mut self) -> ForgeResult<()> {
677        self.base.redo();
678        Ok(())
679    }
680
681    async fn jump(
682        &mut self,
683        steps: isize,
684    ) -> ForgeResult<()> {
685        self.base.jump(steps);
686        Ok(())
687    }
688
689    fn get_config(&self) -> &ForgeConfig {
690        self.base.get_config()
691    }
692
693    fn update_config(
694        &mut self,
695        config: ForgeConfig,
696    ) {
697        self.base.update_config(config);
698    }
699
700    fn get_options(&self) -> &RuntimeOptions {
701        self.base.get_options()
702    }
703
704    async fn destroy(&mut self) -> ForgeResult<()> {
705        self.shutdown().await
706    }
707}