mf_core/runtime/
async_runtime.rs

1//! # 异步运行时超时机制改进
2//!
3//! 本模块为 ModuForge 异步运行时添加了全面的超时保护机制,解决了以下问题:
4//!
5//! ## 主要改进
6//!
7//! 1. **任务接收超时**:防止 `rx.recv().await` 无限等待
8//! 2. **中间件超时配置化**:统一使用配置而非硬编码超时时间
9//!
10//! ## 配置说明
11//!
12//! 通过 `PerformanceConfig` 可以配置各种超时时间:
13//!
14//! ```rust
15//! use mf_core::async_runtime::PerformanceConfig;
16//!
17//! let config = PerformanceConfig {
18//!     enable_monitoring: true,
19//!     middleware_timeout_ms: 1000,         // 中间件超时 1秒
20//!     task_receive_timeout_ms: 5000,       // 任务接收超时 5秒
21//!     ..Default::default()
22//! };
23//! ```
24//!
25//! ## 使用建议
26//!
27//! - **开发环境**:使用较长的超时时间(如 10-30 秒)便于调试
28//! - **生产环境**:使用较短的超时时间(如 1-5 秒)保证响应性
29//! - **高负载环境**:根据实际性能测试调整超时时间
30//!
31//! ## 错误处理
32//!
33//! 所有超时都会产生详细的错误信息,包含:
34//! - 超时的具体操作类型
35//! - 配置的超时时间
36//! - 便于调试的上下文信息
37
38use std::{
39    ops::{Deref, DerefMut},
40    sync::Arc,
41    time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48    config::{ForgeConfig, PerformanceConfig},
49    debug::debug,
50    error::error_utils,
51    event::Event,
52    runtime::async_flow::{FlowEngine},
53    types::RuntimeOptions,
54    ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58    state::TransactionResult,
59    transaction::{Command, Transaction},
60    State,
61};
62
63// PerformanceConfig 现在从 crate::config 模块导入
64
65/// 异步编�器运行时
66///
67/// 提供异步操作支持的编辑器运行时,包含:
68/// - 基础编辑器功能(通过 ForgeRuntime)
69/// - 异步流引擎(用于处理复杂的异步操作流)
70///
71/// 配置通过基础 ForgeRuntime 访问,避免重复持有
72pub struct ForgeAsyncRuntime {
73    base: ForgeRuntime,
74    flow_engine: FlowEngine,
75}
76
77impl Deref for ForgeAsyncRuntime {
78    type Target = ForgeRuntime;
79
80    fn deref(&self) -> &Self::Target {
81        &self.base
82    }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86    fn deref_mut(&mut self) -> &mut Self::Target {
87        &mut self.base
88    }
89}
90impl ForgeAsyncRuntime {
91    /// 创建新的编辑器实例
92    ///
93    /// 此方法会自动从以下位置加载XML schema配置:
94    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
95    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
96    /// 3. 如果都没有,则使用默认配置
97    ///
98    /// # 参数
99    /// * `options` - 编辑器配置选项
100    ///
101    /// # 返回值
102    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
103    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
104        Self::create_with_config(options, ForgeConfig::default()).await
105    }
106
107    /// 从指定路径的XML schema文件创建异步编辑器实例
108    ///
109    /// # 参数
110    /// * `xml_schema_path` - XML schema文件路径
111    /// * `options` - 可选的RuntimeOptions配置
112    /// * `config` - 可选的ForgeConfig配置
113    ///
114    /// # 返回值
115    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
116    ///
117    /// # 示例
118    /// ```rust
119    /// use mf_core::ForgeAsyncRuntime;
120    ///
121    /// // 从指定路径加载schema
122    /// let runtime = ForgeAsyncRuntime::from_xml_schema_path(
123    ///     "./schemas/document.xml",
124    ///     None,
125    ///     None
126    /// ).await?;
127    /// ```
128    pub async fn from_xml_schema_path(
129        xml_schema_path: &str,
130        options: Option<RuntimeOptions>,
131        config: Option<ForgeConfig>,
132    ) -> ForgeResult<Self> {
133        let mut config = config.unwrap_or_default();
134        config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
135        Self::create_with_config(options.unwrap_or_default(), config).await
136    }
137
138    /// 从多个XML schema文件创建异步编辑器实例
139    ///
140    /// # 参数
141    /// * `xml_schema_paths` - XML schema文件路径列表
142    /// * `options` - 可选的RuntimeOptions配置
143    /// * `config` - 可选的ForgeConfig配置
144    ///
145    /// # 返回值
146    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
147    pub async fn from_xml_schemas(
148        xml_schema_paths: &[&str],
149        options: Option<RuntimeOptions>,
150        config: Option<ForgeConfig>,
151    ) -> ForgeResult<Self> {
152        let mut config = config.unwrap_or_default();
153        config.extension.xml_schema_paths =
154            xml_schema_paths.iter().map(|s| s.to_string()).collect();
155        Self::create_with_config(options.unwrap_or_default(), config).await
156    }
157
158    /// 从XML内容字符串创建异步编辑器实例
159    ///
160    /// # 参数
161    /// * `xml_content` - XML schema内容
162    /// * `options` - 可选的RuntimeOptions配置
163    /// * `config` - 可选的ForgeConfig配置
164    ///
165    /// # 返回值
166    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
167    pub async fn from_xml_content(
168        xml_content: &str,
169        options: Option<RuntimeOptions>,
170        config: Option<ForgeConfig>,
171    ) -> ForgeResult<Self> {
172        let base = ForgeRuntime::from_xml_content(xml_content, options, config)
173            .await?;
174        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
175    }
176
177    /// 使用指定配置创建异步编辑器实例
178    ///
179    /// 此方法会自动从以下位置加载XML schema配置:
180    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
181    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
182    /// 3. 如果都没有,则使用默认配置
183    ///
184    /// # 参数
185    /// * `options` - 编辑器配置选项
186    /// * `config` - 编辑器配置
187    ///
188    /// # 返回值
189    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
190    pub async fn create_with_config(
191        options: RuntimeOptions,
192        config: ForgeConfig,
193    ) -> ForgeResult<Self> {
194        let base = ForgeRuntime::create_with_config(options, config).await?;
195        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
196    }
197
198    /// 设置性能监控配置
199    pub fn set_performance_config(
200        &mut self,
201        perf_config: PerformanceConfig,
202    ) {
203        self.base.update_config({
204            let mut config = self.base.get_config().clone();
205            config.performance = perf_config;
206            config
207        });
208    }
209
210    /// 获取当前配置
211    pub fn get_config(&self) -> &ForgeConfig {
212        self.base.get_config()
213    }
214
215    /// 更新配置
216    pub fn update_config(
217        &mut self,
218        config: ForgeConfig,
219    ) {
220        self.base.update_config(config);
221    }
222
223    /// 记录性能指标
224    fn log_performance(
225        &self,
226        operation: &str,
227        duration: Duration,
228    ) {
229        if self.base.get_config().performance.enable_monitoring
230            && duration.as_millis()
231                > self.base.get_config().performance.log_threshold_ms as u128
232        {
233            debug!("{} 耗时: {}ms", operation, duration.as_millis());
234        }
235    }
236    pub async fn command(
237        &mut self,
238        command: Arc<dyn Command>,
239    ) -> ForgeResult<()> {
240        self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
241            .await
242    }
243
244    /// 执行命令并生成相应的事务
245    ///
246    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
247    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
248    ///
249    /// # 参数
250    /// * `command` - 要执行的命令
251    ///
252    /// # 返回值
253    /// * `EditorResult<()>` - 命令执行结果
254    pub async fn command_with_meta(
255        &mut self,
256        command: Arc<dyn Command>,
257        description: String,
258        meta: serde_json::Value,
259    ) -> ForgeResult<()> {
260        let cmd_name = command.name();
261        debug!("正在执行命令: {}", cmd_name);
262
263        // 创建事务并应用命令
264        let mut tr = self.base.get_tr();
265        command.execute(&mut tr).await?;
266        tr.commit()?;
267        // 使用高性能处理引擎处理事务
268        match self.dispatch_flow_with_meta(tr, description, meta).await {
269            Ok(_) => {
270                debug!("命令 '{}' 执行成功", cmd_name);
271                Ok(())
272            },
273            Err(e) => {
274                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
275                Err(e)
276            },
277        }
278    }
279    pub async fn dispatch_flow(
280        &mut self,
281        transaction: Transaction,
282    ) -> ForgeResult<()> {
283        self.dispatch_flow_with_meta(
284            transaction,
285            "".to_string(),
286            serde_json::Value::Null,
287        )
288        .await
289    }
290    /// 高性能事务处理方法,使用FlowEngine处理事务
291    ///
292    /// 与标准的dispatch方法相比,此方法具有以下优势:
293    /// 1. 利用FlowEngine提供的并行处理能力
294    /// 2. 通过异步流水线处理提高性能
295    /// 3. 减少阻塞操作,提升UI响应性
296    /// 4. 更好地处理大型文档的编辑操作
297    ///
298    /// # 参数
299    /// * `transaction` - 要处理的事务对象
300    ///
301    /// # 返回值
302    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
303    pub async fn dispatch_flow_with_meta(
304        &mut self,
305        transaction: Transaction,
306        description: String,
307        meta: serde_json::Value,
308    ) -> ForgeResult<()> {
309        let start_time = std::time::Instant::now();
310        let mut current_transaction = transaction;
311        let old_id = self.get_state().await?.version;
312        // 前置中间件处理
313        let middleware_start = std::time::Instant::now();
314        self.run_before_middleware(&mut current_transaction).await?;
315        self.log_performance("前置中间件处理", middleware_start.elapsed());
316
317        // 使用 flow_engine 提交事务
318        let (_id, mut rx) = self
319            .flow_engine
320            .submit_transaction((
321                self.base.get_state().clone(),
322                current_transaction,
323            ))
324            .await?;
325
326        // 等待任务结果(添加超时保护)
327        let recv_start = std::time::Instant::now();
328        let task_receive_timeout = Duration::from_millis(
329            self.base.get_config().performance.task_receive_timeout_ms,
330        );
331        let task_result =
332            match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
333                Ok(Some(result)) => result,
334                Ok(None) => {
335                    return Err(error_utils::state_error(
336                        "任务接收通道已关闭".to_string(),
337                    ));
338                },
339                Err(_) => {
340                    return Err(error_utils::state_error(format!(
341                        "任务接收超时({}ms)",
342                        self.base
343                            .get_config()
344                            .performance
345                            .task_receive_timeout_ms
346                    )));
347                },
348            };
349        self.log_performance("接收任务结果", recv_start.elapsed());
350
351        // 获取处理结果
352        let Some(ProcessorResult { result: Some(result), .. }) =
353            task_result.output
354        else {
355            return Err(error_utils::state_error(
356                "任务处理结果无效".to_string(),
357            ));
358        };
359
360        // 更新编辑器状态
361        let mut current_state = None;
362        let mut transactions = Vec::new();
363        transactions.extend(result.transactions);
364
365        // 检查最后一个事务是否改变了文档
366        if transactions.last().is_some() {
367            current_state = Some(result.state);
368        }
369
370        // 执行后置中间件链
371        let after_start = std::time::Instant::now();
372        self.run_after_middleware(&mut current_state, &mut transactions)
373            .await?;
374        self.log_performance("后置中间件处理", after_start.elapsed());
375
376        // 更新状态并广播事件(状态更新无需超时保护,事件广播需要)
377        if let Some(state) = current_state {
378            self.base
379                .update_state_with_meta(state.clone(), description, meta)
380                .await?;
381
382            let event_start = std::time::Instant::now();
383            self.base
384                .emit_event(Event::TrApply(old_id, transactions, state))
385                .await?;
386            self.log_performance("事件广播", event_start.elapsed());
387        }
388
389        self.log_performance("事务处理总耗时", start_time.elapsed());
390        Ok(())
391    }
392
393    pub async fn run_before_middleware(
394        &mut self,
395        transaction: &mut Transaction,
396    ) -> ForgeResult<()> {
397        use crate::helpers::middleware_helper::MiddlewareHelper;
398
399        MiddlewareHelper::run_before_middleware(
400            transaction,
401            &self.base.get_options().get_middleware_stack(),
402            self.base.get_config(),
403        )
404        .await?;
405
406        transaction.commit()?;
407        Ok(())
408    }
409    pub async fn run_after_middleware(
410        &mut self,
411        state: &mut Option<Arc<State>>,
412        transactions: &mut Vec<Arc<Transaction>>,
413    ) -> ForgeResult<()> {
414        debug!("执行后置中间件链");
415        for middleware in
416            &self.base.get_options().get_middleware_stack().middlewares
417        {
418            // 使用常量定义超时时间,便于配置调整
419
420            let timeout = std::time::Duration::from_millis(
421                self.base.get_config().performance.middleware_timeout_ms,
422            );
423
424            // 记录中间件执行开始时间,用于性能监控
425            let start_time = std::time::Instant::now();
426
427            let middleware_result = match tokio::time::timeout(
428                timeout,
429                middleware.after_dispatch(state.clone(), transactions),
430            )
431            .await
432            {
433                Ok(result) => match result {
434                    Ok(r) => r,
435                    Err(e) => {
436                        // 记录更详细的错误信息
437                        debug!("中间件执行失败: {}", e);
438                        return Err(error_utils::middleware_error(format!(
439                            "中间件执行失败: {e}"
440                        )));
441                    },
442                },
443                Err(e) => {
444                    debug!("中间件执行超时: {}", e);
445                    return Err(error_utils::middleware_error(format!(
446                        "中间件执行超时: {e}"
447                    )));
448                },
449            };
450
451            // 记录中间件执行时间,用于性能监控
452            let elapsed = start_time.elapsed();
453            if elapsed.as_millis() > 100 {
454                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
455            }
456
457            if let Some(mut transaction) = middleware_result {
458                transaction.commit()?;
459                // 记录额外事务处理开始时间
460                let tx_start_time = std::time::Instant::now();
461
462                let result = match self
463                    .flow_engine
464                    .submit_transaction((
465                        self.base.get_state().clone(),
466                        transaction,
467                    ))
468                    .await
469                {
470                    Ok(result) => result,
471                    Err(e) => {
472                        debug!("附加事务提交失败: {}", e);
473                        return Err(error_utils::state_error(format!(
474                            "附加事务提交失败: {e}"
475                        )));
476                    },
477                };
478
479                let (_id, mut rx) = result;
480
481                // 添加任务接收超时保护
482                let task_receive_timeout = Duration::from_millis(
483                    self.base.get_config().performance.task_receive_timeout_ms,
484                );
485                let task_result =
486                    match tokio::time::timeout(task_receive_timeout, rx.recv())
487                        .await
488                    {
489                        Ok(Some(result)) => result,
490                        Ok(None) => {
491                            debug!("附加事务接收通道已关闭");
492                            return Ok(());
493                        },
494                        Err(_) => {
495                            debug!("附加事务接收超时");
496                            return Err(error_utils::state_error(format!(
497                                "附加事务接收超时({}ms)",
498                                self.base
499                                    .get_config()
500                                    .performance
501                                    .task_receive_timeout_ms
502                            )));
503                        },
504                    };
505
506                let Some(ProcessorResult { result: Some(result), .. }) =
507                    task_result.output
508                else {
509                    debug!("附加事务处理结果无效");
510                    return Ok(());
511                };
512
513                let TransactionResult { state: new_state, transactions: trs } =
514                    result;
515                *state = Some(new_state);
516                transactions.extend(trs);
517
518                // 记录额外事务处理时间
519                let tx_elapsed = tx_start_time.elapsed();
520                if tx_elapsed.as_millis() > 50 {
521                    debug!(
522                        "附加事务处理时间较长: {}ms",
523                        tx_elapsed.as_millis()
524                    );
525                }
526            }
527        }
528        Ok(())
529    }
530
531    /// 优雅关闭异步运行时
532    ///
533    /// 这个方法会:
534    /// 1. 停止接受新任务
535    /// 2. 等待所有正在处理的任务完成
536    /// 3. 关闭底层的异步处理器
537    /// 4. 清理所有资源
538    pub async fn shutdown(&mut self) -> ForgeResult<()> {
539        debug!("开始关闭异步运行时");
540
541        // 首先关闭底层运行时
542        self.base.destroy().await?;
543
544        // 然后关闭流引擎(这会等待所有任务完成)
545        // 注意:由于 FlowEngine 包含 Arc<AsyncProcessor>,我们需要获取可变引用
546        // 这里我们使用 Arc::try_unwrap 来获取所有权,如果失败说明还有其他引用
547        debug!("正在关闭流引擎...");
548
549        debug!("异步运行时已成功关闭");
550        Ok(())
551    }
552}
553
554// ==================== RuntimeTrait 实现 ====================
555
556#[async_trait]
557impl RuntimeTrait for ForgeAsyncRuntime {
558    async fn dispatch(
559        &mut self,
560        transaction: Transaction,
561    ) -> ForgeResult<()> {
562        // 使用高性能的 dispatch_flow 而不是基类的 dispatch
563        self.dispatch_flow(transaction).await
564    }
565
566    async fn dispatch_with_meta(
567        &mut self,
568        transaction: Transaction,
569        description: String,
570        meta: serde_json::Value,
571    ) -> ForgeResult<()> {
572        // 使用高性能的 dispatch_flow_with_meta
573        self.dispatch_flow_with_meta(transaction, description, meta).await
574    }
575
576    async fn command(
577        &mut self,
578        command: Arc<dyn Command>,
579    ) -> ForgeResult<()> {
580        self.command(command).await
581    }
582
583    async fn command_with_meta(
584        &mut self,
585        command: Arc<dyn Command>,
586        description: String,
587        meta: serde_json::Value,
588    ) -> ForgeResult<()> {
589        self.command_with_meta(command, description, meta).await
590    }
591
592    async fn get_state(&self) -> ForgeResult<Arc<State>> {
593        Ok(self.base.get_state().clone())
594    }
595
596    async fn get_tr(&self) -> ForgeResult<Transaction> {
597        Ok(self.base.get_tr())
598    }
599
600    async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
601        Ok(self.base.get_schema())
602    }
603
604    async fn undo(&mut self) -> ForgeResult<()> {
605        self.base.undo();
606        Ok(())
607    }
608
609    async fn redo(&mut self) -> ForgeResult<()> {
610        self.base.redo();
611        Ok(())
612    }
613
614    async fn jump(
615        &mut self,
616        steps: isize,
617    ) -> ForgeResult<()> {
618        self.base.jump(steps);
619        Ok(())
620    }
621
622    fn get_config(&self) -> &ForgeConfig {
623        self.base.get_config()
624    }
625
626    fn update_config(
627        &mut self,
628        config: ForgeConfig,
629    ) {
630        self.base.update_config(config);
631    }
632
633    fn get_options(&self) -> &RuntimeOptions {
634        self.base.get_options()
635    }
636
637    async fn destroy(&mut self) -> ForgeResult<()> {
638        self.shutdown().await
639    }
640}