mf_core/runtime/
async_runtime.rs

1//! # 异步运行时超时机制改进
2//!
3//! 本模块为 ModuForge 异步运行时添加了全面的超时保护机制,解决了以下问题:
4//!
5//! ## 主要改进
6//!
7//! 1. **任务接收超时**:防止 `rx.recv().await` 无限等待
8//! 2. **中间件超时配置化**:统一使用配置而非硬编码超时时间
9//!
10//! ## 配置说明
11//!
12//! 通过 `PerformanceConfig` 可以配置各种超时时间:
13//!
14//! ```rust
15//! use mf_core::async_runtime::PerformanceConfig;
16//!
17//! let config = PerformanceConfig {
18//!     enable_monitoring: true,
19//!     middleware_timeout_ms: 1000,         // 中间件超时 1秒
20//!     task_receive_timeout_ms: 5000,       // 任务接收超时 5秒
21//!     ..Default::default()
22//! };
23//! ```
24//!
25//! ## 使用建议
26//!
27//! - **开发环境**:使用较长的超时时间(如 10-30 秒)便于调试
28//! - **生产环境**:使用较短的超时时间(如 1-5 秒)保证响应性
29//! - **高负载环境**:根据实际性能测试调整超时时间
30//!
31//! ## 错误处理
32//!
33//! 所有超时都会产生详细的错误信息,包含:
34//! - 超时的具体操作类型
35//! - 配置的超时时间
36//! - 便于调试的上下文信息
37
38use std::{
39    ops::{Deref, DerefMut},
40    sync::Arc,
41    time::Duration,
42};
43use crate::runtime::runtime::ForgeRuntime;
44use crate::types::ProcessorResult;
45use crate::{
46    config::{ForgeConfig, PerformanceConfig},
47    error::error_utils,
48    event::Event,
49    runtime::async_flow::{FlowEngine},
50    types::RuntimeOptions,
51    ForgeResult,
52};
53use mf_state::{
54    debug,
55    state::TransactionResult,
56    transaction::{Command, Transaction},
57    State,
58};
59
60// PerformanceConfig 现在从 crate::config 模块导入
61
62/// 异步编�器运行时
63///
64/// 提供异步操作支持的编辑器运行时,包含:
65/// - 基础编辑器功能(通过 ForgeRuntime)
66/// - 异步流引擎(用于处理复杂的异步操作流)
67///
68/// 配置通过基础 ForgeRuntime 访问,避免重复持有
69pub struct ForgeAsyncRuntime {
70    base: ForgeRuntime,
71    flow_engine: FlowEngine,
72}
73
74impl Deref for ForgeAsyncRuntime {
75    type Target = ForgeRuntime;
76
77    fn deref(&self) -> &Self::Target {
78        &self.base
79    }
80}
81
82impl DerefMut for ForgeAsyncRuntime {
83    fn deref_mut(&mut self) -> &mut Self::Target {
84        &mut self.base
85    }
86}
87impl ForgeAsyncRuntime {
88    /// 创建新的编辑器实例
89    ///
90    /// 此方法会自动从以下位置加载XML schema配置:
91    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
92    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
93    /// 3. 如果都没有,则使用默认配置
94    ///
95    /// # 参数
96    /// * `options` - 编辑器配置选项
97    ///
98    /// # 返回值
99    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
100    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
101        Self::create_with_config(options, ForgeConfig::default()).await
102    }
103
104    /// 从指定路径的XML schema文件创建异步编辑器实例
105    ///
106    /// # 参数
107    /// * `xml_schema_path` - XML schema文件路径
108    /// * `options` - 可选的RuntimeOptions配置
109    /// * `config` - 可选的ForgeConfig配置
110    ///
111    /// # 返回值
112    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
113    ///
114    /// # 示例
115    /// ```rust
116    /// use mf_core::ForgeAsyncRuntime;
117    ///
118    /// // 从指定路径加载schema
119    /// let runtime = ForgeAsyncRuntime::from_xml_schema_path(
120    ///     "./schemas/document.xml",
121    ///     None,
122    ///     None
123    /// ).await?;
124    /// ```
125    pub async fn from_xml_schema_path(
126        xml_schema_path: &str,
127        options: Option<RuntimeOptions>,
128        config: Option<ForgeConfig>,
129    ) -> ForgeResult<Self> {
130        let mut config = config.unwrap_or_default();
131        config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
132        Self::create_with_config(options.unwrap_or_default(), config).await
133    }
134
135    /// 从多个XML schema文件创建异步编辑器实例
136    ///
137    /// # 参数
138    /// * `xml_schema_paths` - XML schema文件路径列表
139    /// * `options` - 可选的RuntimeOptions配置
140    /// * `config` - 可选的ForgeConfig配置
141    ///
142    /// # 返回值
143    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
144    pub async fn from_xml_schemas(
145        xml_schema_paths: &[&str],
146        options: Option<RuntimeOptions>,
147        config: Option<ForgeConfig>,
148    ) -> ForgeResult<Self> {
149        let mut config = config.unwrap_or_default();
150        config.extension.xml_schema_paths =
151            xml_schema_paths.iter().map(|s| s.to_string()).collect();
152        Self::create_with_config(options.unwrap_or_default(), config).await
153    }
154
155    /// 从XML内容字符串创建异步编辑器实例
156    ///
157    /// # 参数
158    /// * `xml_content` - XML schema内容
159    /// * `options` - 可选的RuntimeOptions配置
160    /// * `config` - 可选的ForgeConfig配置
161    ///
162    /// # 返回值
163    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
164    pub async fn from_xml_content(
165        xml_content: &str,
166        options: Option<RuntimeOptions>,
167        config: Option<ForgeConfig>,
168    ) -> ForgeResult<Self> {
169        let base = ForgeRuntime::from_xml_content(xml_content, options, config)
170            .await?;
171        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
172    }
173
174    /// 使用指定配置创建异步编辑器实例
175    ///
176    /// 此方法会自动从以下位置加载XML schema配置:
177    /// 1. 优先使用 `config.extension.xml_schema_paths` 中配置的路径
178    /// 2. 如果没有配置,则尝试加载默认的 `schema/main.xml`
179    /// 3. 如果都没有,则使用默认配置
180    ///
181    /// # 参数
182    /// * `options` - 编辑器配置选项
183    /// * `config` - 编辑器配置
184    ///
185    /// # 返回值
186    /// * `ForgeResult<Self>` - 异步编辑器实例或错误
187    pub async fn create_with_config(
188        options: RuntimeOptions,
189        config: ForgeConfig,
190    ) -> ForgeResult<Self> {
191        let base = ForgeRuntime::create_with_config(options, config).await?;
192        Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
193    }
194
195    /// 设置性能监控配置
196    pub fn set_performance_config(
197        &mut self,
198        perf_config: PerformanceConfig,
199    ) {
200        self.base.update_config({
201            let mut config = self.base.get_config().clone();
202            config.performance = perf_config;
203            config
204        });
205    }
206
207    /// 获取当前配置
208    pub fn get_config(&self) -> &ForgeConfig {
209        self.base.get_config()
210    }
211
212    /// 更新配置
213    pub fn update_config(
214        &mut self,
215        config: ForgeConfig,
216    ) {
217        self.base.update_config(config);
218    }
219
220    /// 记录性能指标
221    fn log_performance(
222        &self,
223        operation: &str,
224        duration: Duration,
225    ) {
226        if self.base.get_config().performance.enable_monitoring
227            && duration.as_millis()
228                > self.base.get_config().performance.log_threshold_ms as u128
229        {
230            debug!("{} 耗时: {}ms", operation, duration.as_millis());
231        }
232    }
233    pub async fn command(
234        &mut self,
235        command: Arc<dyn Command>,
236    ) -> ForgeResult<()> {
237        self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
238            .await
239    }
240
241    /// 执行命令并生成相应的事务
242    ///
243    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
244    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
245    ///
246    /// # 参数
247    /// * `command` - 要执行的命令
248    ///
249    /// # 返回值
250    /// * `EditorResult<()>` - 命令执行结果
251    pub async fn command_with_meta(
252        &mut self,
253        command: Arc<dyn Command>,
254        description: String,
255        meta: serde_json::Value,
256    ) -> ForgeResult<()> {
257        let cmd_name = command.name();
258        debug!("正在执行命令: {}", cmd_name);
259
260        // 创建事务并应用命令
261        let mut tr = self.get_tr();
262        command.execute(&mut tr).await?;
263        tr.commit();
264        // 使用高性能处理引擎处理事务
265        match self.dispatch_flow_with_meta(tr, description, meta).await {
266            Ok(_) => {
267                debug!("命令 '{}' 执行成功", cmd_name);
268                Ok(())
269            },
270            Err(e) => {
271                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
272                Err(e)
273            },
274        }
275    }
276    pub async fn dispatch_flow(
277        &mut self,
278        transaction: Transaction,
279    ) -> ForgeResult<()> {
280        self.dispatch_flow_with_meta(
281            transaction,
282            "".to_string(),
283            serde_json::Value::Null,
284        )
285        .await
286    }
287    /// 高性能事务处理方法,使用FlowEngine处理事务
288    ///
289    /// 与标准的dispatch方法相比,此方法具有以下优势:
290    /// 1. 利用FlowEngine提供的并行处理能力
291    /// 2. 通过异步流水线处理提高性能
292    /// 3. 减少阻塞操作,提升UI响应性
293    /// 4. 更好地处理大型文档的编辑操作
294    ///
295    /// # 参数
296    /// * `transaction` - 要处理的事务对象
297    ///
298    /// # 返回值
299    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
300    pub async fn dispatch_flow_with_meta(
301        &mut self,
302        transaction: Transaction,
303        description: String,
304        meta: serde_json::Value,
305    ) -> ForgeResult<()> {
306        let start_time = std::time::Instant::now();
307        let mut current_transaction = transaction;
308        let old_id = self.get_state().version;
309        // 前置中间件处理
310        let middleware_start = std::time::Instant::now();
311        self.run_before_middleware(&mut current_transaction).await?;
312        self.log_performance("前置中间件处理", middleware_start.elapsed());
313
314        // 使用 flow_engine 提交事务
315        let (_id, mut rx) = self
316            .flow_engine
317            .submit_transaction((
318                self.base.get_state().clone(),
319                current_transaction,
320            ))
321            .await?;
322
323        // 等待任务结果(添加超时保护)
324        let recv_start = std::time::Instant::now();
325        let task_receive_timeout = Duration::from_millis(
326            self.base.get_config().performance.task_receive_timeout_ms,
327        );
328        let task_result =
329            match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
330                Ok(Some(result)) => result,
331                Ok(None) => {
332                    return Err(error_utils::state_error(
333                        "任务接收通道已关闭".to_string(),
334                    ));
335                },
336                Err(_) => {
337                    return Err(error_utils::state_error(format!(
338                        "任务接收超时({}ms)",
339                        self.base
340                            .get_config()
341                            .performance
342                            .task_receive_timeout_ms
343                    )));
344                },
345            };
346        self.log_performance("接收任务结果", recv_start.elapsed());
347
348        // 获取处理结果
349        let Some(ProcessorResult { result: Some(result), .. }) =
350            task_result.output
351        else {
352            return Err(error_utils::state_error(
353                "任务处理结果无效".to_string(),
354            ));
355        };
356
357        // 更新编辑器状态
358        let mut current_state = None;
359        let mut transactions = Vec::new();
360        transactions.extend(result.transactions);
361
362        // 检查最后一个事务是否改变了文档
363        if let Some(_) = transactions.last() {
364            current_state = Some(Arc::new(result.state));
365        }
366
367        // 执行后置中间件链
368        let after_start = std::time::Instant::now();
369        self.run_after_middleware(&mut current_state, &mut transactions)
370            .await?;
371        self.log_performance("后置中间件处理", after_start.elapsed());
372
373        // 更新状态并广播事件(状态更新无需超时保护,事件广播需要)
374        if let Some(state) = current_state {
375            self.base
376                .update_state_with_meta(state.clone(), description, meta)
377                .await?;
378
379            let event_start = std::time::Instant::now();
380            self.base
381                .emit_event(Event::TrApply(
382                    old_id,
383                    Arc::new(transactions),
384                    state,
385                ))
386                .await?;
387            self.log_performance("事件广播", event_start.elapsed());
388        }
389
390        self.log_performance("事务处理总耗时", start_time.elapsed());
391        Ok(())
392    }
393
394    pub async fn run_before_middleware(
395        &mut self,
396        transaction: &mut Transaction,
397    ) -> ForgeResult<()> {
398        debug!("执行前置中间件链");
399        for middleware in
400            &self.base.get_options().get_middleware_stack().middlewares
401        {
402            let timeout = Duration::from_millis(
403                self.base.get_config().performance.middleware_timeout_ms,
404            );
405            match tokio::time::timeout(
406                timeout,
407                middleware.before_dispatch(transaction),
408            )
409            .await
410            {
411                Ok(Ok(())) => {
412                    // 中间件执行成功
413                    continue;
414                },
415                Ok(Err(e)) => {
416                    return Err(error_utils::middleware_error(format!(
417                        "前置中间件执行失败: {}",
418                        e
419                    )));
420                },
421                Err(_) => {
422                    return Err(error_utils::middleware_error(format!(
423                        "前置中间件执行超时({}ms)",
424                        self.base
425                            .get_config()
426                            .performance
427                            .middleware_timeout_ms
428                    )));
429                },
430            }
431        }
432        transaction.commit();
433        Ok(())
434    }
435    pub async fn run_after_middleware(
436        &mut self,
437        state: &mut Option<Arc<State>>,
438        transactions: &mut Vec<Transaction>,
439    ) -> ForgeResult<()> {
440        debug!("执行后置中间件链");
441        for middleware in
442            &self.base.get_options().get_middleware_stack().middlewares
443        {
444            // 使用常量定义超时时间,便于配置调整
445
446            let timeout = std::time::Duration::from_millis(
447                self.base.get_config().performance.middleware_timeout_ms,
448            );
449
450            // 记录中间件执行开始时间,用于性能监控
451            let start_time = std::time::Instant::now();
452
453            let middleware_result = match tokio::time::timeout(
454                timeout,
455                middleware.after_dispatch(state.clone(), transactions),
456            )
457            .await
458            {
459                Ok(result) => match result {
460                    Ok(r) => r,
461                    Err(e) => {
462                        // 记录更详细的错误信息
463                        debug!("中间件执行失败: {}", e);
464                        return Err(error_utils::middleware_error(format!(
465                            "中间件执行失败: {}",
466                            e
467                        )));
468                    },
469                },
470                Err(e) => {
471                    debug!("中间件执行超时: {}", e);
472                    return Err(error_utils::middleware_error(format!(
473                        "中间件执行超时: {}",
474                        e
475                    )));
476                },
477            };
478
479            // 记录中间件执行时间,用于性能监控
480            let elapsed = start_time.elapsed();
481            if elapsed.as_millis() > 100 {
482                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
483            }
484
485            if let Some(mut transaction) = middleware_result {
486                transaction.commit();
487                // 记录额外事务处理开始时间
488                let tx_start_time = std::time::Instant::now();
489
490                let result = match self
491                    .flow_engine
492                    .submit_transaction((
493                        self.base.get_state().clone(),
494                        transaction,
495                    ))
496                    .await
497                {
498                    Ok(result) => result,
499                    Err(e) => {
500                        debug!("附加事务提交失败: {}", e);
501                        return Err(error_utils::state_error(format!(
502                            "附加事务提交失败: {}",
503                            e
504                        )));
505                    },
506                };
507
508                let (_id, mut rx) = result;
509
510                // 添加任务接收超时保护
511                let task_receive_timeout = Duration::from_millis(
512                    self.base.get_config().performance.task_receive_timeout_ms,
513                );
514                let task_result =
515                    match tokio::time::timeout(task_receive_timeout, rx.recv())
516                        .await
517                    {
518                        Ok(Some(result)) => result,
519                        Ok(None) => {
520                            debug!("附加事务接收通道已关闭");
521                            return Ok(());
522                        },
523                        Err(_) => {
524                            debug!("附加事务接收超时");
525                            return Err(error_utils::state_error(format!(
526                                "附加事务接收超时({}ms)",
527                                self.base
528                                    .get_config()
529                                    .performance
530                                    .task_receive_timeout_ms
531                            )));
532                        },
533                    };
534
535                let Some(ProcessorResult { result: Some(result), .. }) =
536                    task_result.output
537                else {
538                    debug!("附加事务处理结果无效");
539                    return Ok(());
540                };
541
542                let TransactionResult { state: new_state, transactions: trs } =
543                    result;
544                *state = Some(Arc::new(new_state));
545                transactions.extend(trs);
546
547                // 记录额外事务处理时间
548                let tx_elapsed = tx_start_time.elapsed();
549                if tx_elapsed.as_millis() > 50 {
550                    debug!(
551                        "附加事务处理时间较长: {}ms",
552                        tx_elapsed.as_millis()
553                    );
554                }
555            }
556        }
557        Ok(())
558    }
559
560    /// 优雅关闭异步运行时
561    ///
562    /// 这个方法会:
563    /// 1. 停止接受新任务
564    /// 2. 等待所有正在处理的任务完成
565    /// 3. 关闭底层的异步处理器
566    /// 4. 清理所有资源
567    pub async fn shutdown(&mut self) -> ForgeResult<()> {
568        debug!("开始关闭异步运行时");
569
570        // 首先关闭底层运行时
571        self.base.destroy().await?;
572
573        // 然后关闭流引擎(这会等待所有任务完成)
574        // 注意:由于 FlowEngine 包含 Arc<AsyncProcessor>,我们需要获取可变引用
575        // 这里我们使用 Arc::try_unwrap 来获取所有权,如果失败说明还有其他引用
576        debug!("正在关闭流引擎...");
577
578        debug!("异步运行时已成功关闭");
579        Ok(())
580    }
581}