moduforge_core/
async_runtime.rs

1//! # 异步运行时超时机制改进
2//!
3//! 本模块为 ModuForge 异步运行时添加了全面的超时保护机制,解决了以下问题:
4//!
5//! ## 主要改进
6//!
7//! 1. **任务接收超时**:防止 `rx.recv().await` 无限等待
8//! 2. **中间件超时配置化**:统一使用配置而非硬编码超时时间
9//!
10//! ## 配置说明
11//!
12//! 通过 `PerformanceConfig` 可以配置各种超时时间:
13//!
14//! ```rust
15//! use moduforge_core::async_runtime::PerformanceConfig;
16//!
17//! let config = PerformanceConfig {
18//!     enable_monitoring: true,
19//!     middleware_timeout_ms: 1000,         // 中间件超时 1秒
20//!     task_receive_timeout_ms: 5000,       // 任务接收超时 5秒
21//!     ..Default::default()
22//! };
23//! ```
24//!
25//! ## 使用建议
26//!
27//! - **开发环境**:使用较长的超时时间(如 10-30 秒)便于调试
28//! - **生产环境**:使用较短的超时时间(如 1-5 秒)保证响应性
29//! - **高负载环境**:根据实际性能测试调整超时时间
30//!
31//! ## 错误处理
32//!
33//! 所有超时都会产生详细的错误信息,包含:
34//! - 超时的具体操作类型
35//! - 配置的超时时间
36//! - 便于调试的上下文信息
37
38use std::{
39    ops::{Deref, DerefMut},
40    sync::Arc,
41    time::Duration,
42};
43
44use crate::runtime::ForgeRuntime;
45use crate::{
46    error_utils,
47    event::Event,
48    flow::{FlowEngine, ProcessorResult},
49    types::RuntimeOptions,
50    ForgeResult,
51};
52use moduforge_state::{
53    debug,
54    state::TransactionResult,
55    transaction::{Command, Transaction},
56    State,
57};
58
59/// 性能监控配置
60#[derive(Debug, Clone)]
61pub struct PerformanceConfig {
62    /// 是否启用性能监控
63    pub enable_monitoring: bool,
64    /// 中间件执行超时时间(毫秒)
65    /// 推荐值:500-2000ms,取决于中间件复杂度
66    pub middleware_timeout_ms: u64,
67    /// 性能日志记录阈值(毫秒)
68    /// 超过此时间的操作将被记录到日志
69    pub log_threshold_ms: u64,
70    /// 任务接收超时时间(毫秒)
71    /// 等待异步任务结果的最大时间
72    /// 推荐值:3000-10000ms,取决于任务复杂度
73    pub task_receive_timeout_ms: u64,
74}
75
76impl Default for PerformanceConfig {
77    fn default() -> Self {
78        Self {
79            enable_monitoring: false,
80            middleware_timeout_ms: 500,
81            log_threshold_ms: 50,
82            task_receive_timeout_ms: 5000, // 5秒
83        }
84    }
85}
86
87/// Editor 结构体代表编辑器的核心功能实现
88/// 负责管理文档状态、事件处理、插件系统和存储等核心功能
89pub struct ForgeAsyncRuntime {
90    base: ForgeRuntime,
91    flow_engine: FlowEngine,
92    perf_config: PerformanceConfig,
93}
94unsafe impl Send for ForgeAsyncRuntime {}
95unsafe impl Sync for ForgeAsyncRuntime {}
96
97impl Deref for ForgeAsyncRuntime {
98    type Target = ForgeRuntime;
99
100    fn deref(&self) -> &Self::Target {
101        &self.base
102    }
103}
104
105impl DerefMut for ForgeAsyncRuntime {
106    fn deref_mut(&mut self) -> &mut Self::Target {
107        &mut self.base
108    }
109}
110impl ForgeAsyncRuntime {
111    /// 创建新的编辑器实例
112    /// options: 编辑器配置选项
113    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
114        let base = ForgeRuntime::create(options).await?;
115        Ok(ForgeAsyncRuntime {
116            base,
117            flow_engine: FlowEngine::new()?,
118            perf_config: PerformanceConfig::default(),
119        })
120    }
121
122    /// 设置性能监控配置
123    pub fn set_performance_config(
124        &mut self,
125        config: PerformanceConfig,
126    ) {
127        self.perf_config = config;
128    }
129
130    /// 记录性能指标
131    fn log_performance(
132        &self,
133        operation: &str,
134        duration: Duration,
135    ) {
136        if self.perf_config.enable_monitoring
137            && duration.as_millis() > self.perf_config.log_threshold_ms as u128
138        {
139            debug!("{} 耗时: {}ms", operation, duration.as_millis());
140        }
141    }
142    pub async fn command(
143        &mut self,
144        command: Arc<dyn Command>,
145    ) -> ForgeResult<()> {
146        self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
147            .await
148    }
149
150    /// 执行命令并生成相应的事务
151    ///
152    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
153    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
154    ///
155    /// # 参数
156    /// * `command` - 要执行的命令
157    ///
158    /// # 返回值
159    /// * `EditorResult<()>` - 命令执行结果
160    pub async fn command_with_meta(
161        &mut self,
162        command: Arc<dyn Command>,
163        description: String,
164        meta: serde_json::Value,
165    ) -> ForgeResult<()> {
166        let cmd_name = command.name();
167        debug!("正在执行命令: {}", cmd_name);
168
169        // 创建事务并应用命令
170        let mut tr = self.get_tr();
171        command.execute(&mut tr).await?;
172        tr.commit();
173        // 使用高性能处理引擎处理事务
174        match self.dispatch_flow_with_meta(tr, description, meta).await {
175            Ok(_) => {
176                debug!("命令 '{}' 执行成功", cmd_name);
177                Ok(())
178            },
179            Err(e) => {
180                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
181                Err(e)
182            },
183        }
184    }
185    pub async fn dispatch_flow(
186        &mut self,
187        transaction: Transaction,
188    ) -> ForgeResult<()> {
189        self.dispatch_flow_with_meta(
190            transaction,
191            "".to_string(),
192            serde_json::Value::Null,
193        )
194        .await
195    }
196    /// 高性能事务处理方法,使用FlowEngine处理事务
197    ///
198    /// 与标准的dispatch方法相比,此方法具有以下优势:
199    /// 1. 利用FlowEngine提供的并行处理能力
200    /// 2. 通过异步流水线处理提高性能
201    /// 3. 减少阻塞操作,提升UI响应性
202    /// 4. 更好地处理大型文档的编辑操作
203    ///
204    /// # 参数
205    /// * `transaction` - 要处理的事务对象
206    ///
207    /// # 返回值
208    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
209    pub async fn dispatch_flow_with_meta(
210        &mut self,
211        transaction: Transaction,
212        description: String,
213        meta: serde_json::Value,
214    ) -> ForgeResult<()> {
215        let start_time = std::time::Instant::now();
216        let mut current_transaction = transaction;
217        let old_id = self.get_state().version;
218        // 前置中间件处理
219        let middleware_start = std::time::Instant::now();
220        self.run_before_middleware(&mut current_transaction).await?;
221        self.log_performance("前置中间件处理", middleware_start.elapsed());
222
223        // 使用 flow_engine 提交事务
224        let (_id, mut rx) = self
225            .flow_engine
226            .submit_transaction((
227                self.base.get_state().clone(),
228                current_transaction,
229            ))
230            .await?;
231
232        // 等待任务结果(添加超时保护)
233        let recv_start = std::time::Instant::now();
234        let task_receive_timeout =
235            Duration::from_millis(self.perf_config.task_receive_timeout_ms);
236        let task_result =
237            match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
238                Ok(Some(result)) => result,
239                Ok(None) => {
240                    return Err(error_utils::state_error(
241                        "任务接收通道已关闭".to_string(),
242                    ));
243                },
244                Err(_) => {
245                    return Err(error_utils::state_error(format!(
246                        "任务接收超时({}ms)",
247                        self.perf_config.task_receive_timeout_ms
248                    )));
249                },
250            };
251        self.log_performance("接收任务结果", recv_start.elapsed());
252
253        // 获取处理结果
254        let Some(ProcessorResult { result: Some(result), .. }) =
255            task_result.output
256        else {
257            return Err(error_utils::state_error(
258                "任务处理结果无效".to_string(),
259            ));
260        };
261
262        // 更新编辑器状态
263        let mut current_state = None;
264        let mut transactions = Vec::new();
265        transactions.extend(result.transactions);
266
267        // 检查最后一个事务是否改变了文档
268        if let Some(_) = transactions.last() {
269            current_state = Some(Arc::new(result.state));
270        }
271
272        // 执行后置中间件链
273        let after_start = std::time::Instant::now();
274        self.run_after_middleware(&mut current_state, &mut transactions)
275            .await?;
276        self.log_performance("后置中间件处理", after_start.elapsed());
277
278        // 更新状态并广播事件(状态更新无需超时保护,事件广播需要)
279        if let Some(state) = current_state {
280            self.base
281                .update_state_with_meta(state.clone(), description, meta)
282                .await?;
283
284            let event_start = std::time::Instant::now();
285            self.base
286                .emit_event(Event::TrApply(
287                    old_id,
288                    Arc::new(transactions),
289                    state,
290                ))
291                .await?;
292            self.log_performance("事件广播", event_start.elapsed());
293        }
294
295        self.log_performance("事务处理总耗时", start_time.elapsed());
296        Ok(())
297    }
298
299    pub async fn run_before_middleware(
300        &mut self,
301        transaction: &mut Transaction,
302    ) -> ForgeResult<()> {
303        debug!("执行前置中间件链");
304        for middleware in
305            &self.base.get_options().get_middleware_stack().middlewares
306        {
307            let timeout =
308                Duration::from_millis(self.perf_config.middleware_timeout_ms);
309            match tokio::time::timeout(
310                timeout,
311                middleware.before_dispatch(transaction),
312            )
313            .await
314            {
315                Ok(Ok(())) => {
316                    // 中间件执行成功
317                    continue;
318                },
319                Ok(Err(e)) => {
320                    return Err(error_utils::middleware_error(format!(
321                        "前置中间件执行失败: {}",
322                        e
323                    )));
324                },
325                Err(_) => {
326                    return Err(error_utils::middleware_error(format!(
327                        "前置中间件执行超时({}ms)",
328                        self.perf_config.middleware_timeout_ms
329                    )));
330                },
331            }
332        }
333        transaction.commit();
334        Ok(())
335    }
336    pub async fn run_after_middleware(
337        &mut self,
338        state: &mut Option<Arc<State>>,
339        transactions: &mut Vec<Transaction>,
340        ) -> ForgeResult<()> {
341        debug!("执行后置中间件链");
342        for middleware in
343            &self.base.get_options().get_middleware_stack().middlewares
344        {
345            // 使用常量定义超时时间,便于配置调整
346
347            let timeout = std::time::Duration::from_millis(
348                self.perf_config.middleware_timeout_ms,
349            );
350
351            // 记录中间件执行开始时间,用于性能监控
352            let start_time = std::time::Instant::now();
353
354            let middleware_result = match tokio::time::timeout(
355                timeout,
356                middleware.after_dispatch(state.clone(), transactions),
357            )
358            .await
359            {
360                Ok(result) => match result {
361                    Ok(r) => r,
362                    Err(e) => {
363                        // 记录更详细的错误信息
364                        debug!("中间件执行失败: {}", e);
365                        return Err(error_utils::middleware_error(format!(
366                            "中间件执行失败: {}",
367                            e
368                        )));
369                    },
370                },
371                Err(e) => {
372                    debug!("中间件执行超时: {}", e);
373                    return Err(error_utils::middleware_error(format!(
374                        "中间件执行超时: {}",
375                        e
376                    )));
377                },
378            };
379
380            // 记录中间件执行时间,用于性能监控
381            let elapsed = start_time.elapsed();
382            if elapsed.as_millis() > 100 {
383                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
384            }
385
386            if let Some(mut transaction) =
387                middleware_result
388            {
389                transaction.commit();
390                // 记录额外事务处理开始时间
391                let tx_start_time = std::time::Instant::now();
392
393                let result = match self
394                    .flow_engine
395                    .submit_transaction((
396                        self.base.get_state().clone(),
397                        transaction,
398                    ))
399                    .await
400                {
401                    Ok(result) => result,
402                    Err(e) => {
403                        debug!("附加事务提交失败: {}", e);
404                        return Err(error_utils::state_error(format!(
405                            "附加事务提交失败: {}",
406                            e
407                        )));
408                    },
409                };
410
411                let (_id, mut rx) = result;
412
413                // 添加任务接收超时保护
414                let task_receive_timeout = Duration::from_millis(
415                    self.perf_config.task_receive_timeout_ms,
416                );
417                let task_result =
418                    match tokio::time::timeout(task_receive_timeout, rx.recv())
419                        .await
420                    {
421                        Ok(Some(result)) => result,
422                        Ok(None) => {
423                            debug!("附加事务接收通道已关闭");
424                            return Ok(());
425                        },
426                        Err(_) => {
427                            debug!("附加事务接收超时");
428                            return Err(error_utils::state_error(format!(
429                                "附加事务接收超时({}ms)",
430                                self.perf_config.task_receive_timeout_ms
431                            )));
432                        },
433                    };
434
435                let Some(ProcessorResult { result: Some(result), .. }) =
436                    task_result.output
437                else {
438                    debug!("附加事务处理结果无效");
439                    return Ok(());
440                };
441
442                let TransactionResult { state: new_state, transactions: trs } =
443                    result;
444                *state = Some(Arc::new(new_state));
445                transactions.extend(trs);
446
447                // 记录额外事务处理时间
448                let tx_elapsed = tx_start_time.elapsed();
449                if tx_elapsed.as_millis() > 50 {
450                    debug!(
451                        "附加事务处理时间较长: {}ms",
452                        tx_elapsed.as_millis()
453                    );
454                }
455            }
456        }
457        Ok(())
458    }
459}