moduforge_core/
async_runtime.rs

1//! # 异步运行时超时机制改进
2//!
3//! 本模块为 ModuForge 异步运行时添加了全面的超时保护机制,解决了以下问题:
4//!
5//! ## 主要改进
6//!
7//! 1. **任务接收超时**:防止 `rx.recv().await` 无限等待
8//! 2. **中间件超时配置化**:统一使用配置而非硬编码超时时间
9//!
10//! ## 配置说明
11//!
12//! 通过 `PerformanceConfig` 可以配置各种超时时间:
13//!
14//! ```rust
15//! use moduforge_core::async_runtime::PerformanceConfig;
16//!
17//! let config = PerformanceConfig {
18//!     enable_monitoring: true,
19//!     middleware_timeout_ms: 1000,         // 中间件超时 1秒
20//!     task_receive_timeout_ms: 5000,       // 任务接收超时 5秒
21//!     ..Default::default()
22//! };
23//! ```
24//!
25//! ## 使用建议
26//!
27//! - **开发环境**:使用较长的超时时间(如 10-30 秒)便于调试
28//! - **生产环境**:使用较短的超时时间(如 1-5 秒)保证响应性
29//! - **高负载环境**:根据实际性能测试调整超时时间
30//!
31//! ## 错误处理
32//!
33//! 所有超时都会产生详细的错误信息,包含:
34//! - 超时的具体操作类型
35//! - 配置的超时时间
36//! - 便于调试的上下文信息
37
38use std::{
39    ops::{Deref, DerefMut},
40    sync::Arc,
41    time::Duration,
42};
43
44use crate::runtime::Editor;
45use crate::{
46    error_utils,
47    event::Event,
48    flow::{FlowEngine, ProcessorResult},
49    types::EditorOptions,
50    EditorResult,
51};
52use moduforge_state::{
53    debug,
54    state::TransactionResult,
55    transaction::{Command, Transaction},
56    State,
57};
58
59/// 性能监控配置
60#[derive(Debug, Clone)]
61pub struct PerformanceConfig {
62    /// 是否启用性能监控
63    pub enable_monitoring: bool,
64    /// 中间件执行超时时间(毫秒)
65    /// 推荐值:500-2000ms,取决于中间件复杂度
66    pub middleware_timeout_ms: u64,
67    /// 性能日志记录阈值(毫秒)
68    /// 超过此时间的操作将被记录到日志
69    pub log_threshold_ms: u64,
70    /// 任务接收超时时间(毫秒)
71    /// 等待异步任务结果的最大时间
72    /// 推荐值:3000-10000ms,取决于任务复杂度
73    pub task_receive_timeout_ms: u64,
74
75}
76
77impl Default for PerformanceConfig {
78    fn default() -> Self {
79        Self {
80            enable_monitoring: false,
81            middleware_timeout_ms: 500,
82            log_threshold_ms: 50,
83            task_receive_timeout_ms: 5000,     // 5秒
84        }
85    }
86}
87
88/// Editor 结构体代表编辑器的核心功能实现
89/// 负责管理文档状态、事件处理、插件系统和存储等核心功能
90pub struct AsyncEditor {
91    base: Editor,
92    flow_engine: FlowEngine,
93    perf_config: PerformanceConfig,
94}
95unsafe impl Send for AsyncEditor {}
96unsafe impl Sync for AsyncEditor {}
97
98impl Deref for AsyncEditor {
99    type Target = Editor;
100
101    fn deref(&self) -> &Self::Target {
102        &self.base
103    }
104}
105
106impl DerefMut for AsyncEditor {
107    fn deref_mut(&mut self) -> &mut Self::Target {
108        &mut self.base
109    }
110}
111impl AsyncEditor {
112    /// 创建新的编辑器实例
113    /// options: 编辑器配置选项
114    pub async fn create(options: EditorOptions) -> EditorResult<Self> {
115        let base = Editor::create(options).await?;
116        Ok(AsyncEditor {
117            base,
118            flow_engine: FlowEngine::new()?,
119            perf_config: PerformanceConfig::default(),
120        })
121    }
122
123    /// 设置性能监控配置
124    pub fn set_performance_config(
125        &mut self,
126        config: PerformanceConfig,
127    ) {
128        self.perf_config = config;
129    }
130
131    /// 记录性能指标
132    fn log_performance(
133        &self,
134        operation: &str,
135        duration: Duration,
136    ) {
137        if self.perf_config.enable_monitoring
138            && duration.as_millis() > self.perf_config.log_threshold_ms as u128
139        {
140            debug!("{} 耗时: {}ms", operation, duration.as_millis());
141        }
142    }
143    pub async fn command(
144        &mut self,
145        command: Arc<dyn Command>,
146    ) -> EditorResult<()> {
147        self.command_with_meta(command, "".to_string(), serde_json::Value::Null).await
148    }
149
150    /// 执行命令并生成相应的事务
151    ///
152    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
153    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
154    ///
155    /// # 参数
156    /// * `command` - 要执行的命令
157    ///
158    /// # 返回值
159    /// * `EditorResult<()>` - 命令执行结果
160    pub async fn command_with_meta(
161        &mut self,
162        command: Arc<dyn Command>,
163        description: String,
164        meta: serde_json::Value,
165    ) -> EditorResult<()> {
166        let cmd_name = command.name();
167        debug!("正在执行命令: {}", cmd_name);
168
169        // 创建事务并应用命令
170        let mut tr = self.get_tr();
171        command.execute(&mut tr).await?;
172        tr.commit();
173        // 使用高性能处理引擎处理事务
174        match self.dispatch_flow_with_meta(tr, description, meta).await {
175            Ok(_) => {
176                debug!("命令 '{}' 执行成功", cmd_name);
177                Ok(())
178            },
179            Err(e) => {
180                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
181                Err(e)
182            },
183        }
184    }
185    pub async fn dispatch_flow(
186        &mut self,
187        transaction: Transaction,
188    ) -> EditorResult<()> {
189        self.dispatch_flow_with_meta(transaction, "".to_string(), serde_json::Value::Null).await
190    }
191    /// 高性能事务处理方法,使用FlowEngine处理事务
192    ///
193    /// 与标准的dispatch方法相比,此方法具有以下优势:
194    /// 1. 利用FlowEngine提供的并行处理能力
195    /// 2. 通过异步流水线处理提高性能
196    /// 3. 减少阻塞操作,提升UI响应性
197    /// 4. 更好地处理大型文档的编辑操作
198    ///
199    /// # 参数
200    /// * `transaction` - 要处理的事务对象
201    ///
202    /// # 返回值
203    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
204    pub async fn dispatch_flow_with_meta(
205        &mut self,
206        transaction: Transaction,
207        description: String,
208        meta: serde_json::Value,
209    ) -> EditorResult<()> {
210        let start_time = std::time::Instant::now();
211        let mut current_transaction = transaction;
212        let old_id = self.get_state().version;
213        // 前置中间件处理
214        let middleware_start = std::time::Instant::now();
215        self.run_before_middleware(&mut current_transaction).await?;
216        self.log_performance("前置中间件处理", middleware_start.elapsed());
217
218        // 使用 flow_engine 提交事务
219        let (_id, mut rx) = self
220            .flow_engine
221            .submit_transaction((
222                self.base.get_state().clone(),
223                current_transaction,
224            ))
225            .await?;
226
227        // 等待任务结果(添加超时保护)
228        let recv_start = std::time::Instant::now();
229        let task_receive_timeout = Duration::from_millis(self.perf_config.task_receive_timeout_ms);
230        let task_result = match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
231            Ok(Some(result)) => result,
232            Ok(None) => {
233                return Err(error_utils::state_error(
234                    "任务接收通道已关闭".to_string(),
235                ));
236            },
237            Err(_) => {
238                return Err(error_utils::state_error(
239                    format!("任务接收超时({}ms)", self.perf_config.task_receive_timeout_ms),
240                ));
241            },
242        };
243        self.log_performance("接收任务结果", recv_start.elapsed());
244
245        // 获取处理结果
246        let Some(ProcessorResult { result: Some(result), .. }) =
247            task_result.output
248        else {
249            return Err(error_utils::state_error(
250                "任务处理结果无效".to_string(),
251            ));
252        };
253
254        // 更新编辑器状态
255        let mut current_state = None;
256        let mut transactions = Vec::new();
257        transactions.extend(result.transactions);
258
259        // 检查最后一个事务是否改变了文档
260        if let Some(_) = transactions.last() {
261            current_state = Some(Arc::new(result.state));
262        }
263
264        // 执行后置中间件链
265        let after_start = std::time::Instant::now();
266        self.run_after_middleware(&mut current_state, &mut transactions)
267            .await?;
268        self.log_performance("后置中间件处理", after_start.elapsed());
269
270        // 更新状态并广播事件(状态更新无需超时保护,事件广播需要)
271        if let Some(state) = current_state {
272            self.base.update_state_with_meta(state.clone(), description, meta).await?;
273
274            let event_start = std::time::Instant::now();
275            self.base
276                .emit_event(Event::TrApply(
277                    old_id,
278                    Arc::new(transactions),
279                    state,
280                ))
281                .await?;
282            self.log_performance("事件广播", event_start.elapsed());
283        }
284
285        self.log_performance("事务处理总耗时", start_time.elapsed());
286        Ok(())
287    }
288
289    pub async fn run_before_middleware(
290        &mut self,
291        transaction: &mut Transaction,
292    ) -> EditorResult<()> {
293        debug!("执行前置中间件链");
294        for middleware in
295            &self.base.get_options().get_middleware_stack().middlewares
296        {
297            let timeout = Duration::from_millis(self.perf_config.middleware_timeout_ms);
298            match tokio::time::timeout(
299                timeout,
300                middleware.before_dispatch(transaction),
301            )
302            .await
303            {
304                Ok(Ok(())) => {
305                    // 中间件执行成功
306                    continue;
307                },
308                Ok(Err(e)) => {
309                    return Err(error_utils::middleware_error(format!(
310                        "前置中间件执行失败: {}",
311                        e
312                    )));
313                },
314                Err(_) => {
315                    return Err(error_utils::middleware_error(format!(
316                        "前置中间件执行超时({}ms)",
317                        self.perf_config.middleware_timeout_ms
318                    )));
319                },
320            }
321        }
322        transaction.commit();
323        Ok(())
324    }
325    pub async fn run_after_middleware(
326        &mut self,
327        state: &mut Option<Arc<State>>,
328        transactions: &mut Vec<Transaction>,
329    ) -> EditorResult<()> {
330        debug!("执行后置中间件链");
331        for middleware in
332            &self.base.get_options().get_middleware_stack().middlewares
333        {
334            // 使用常量定义超时时间,便于配置调整
335
336            let timeout = std::time::Duration::from_millis(
337                self.perf_config.middleware_timeout_ms,
338            );
339
340            // 记录中间件执行开始时间,用于性能监控
341            let start_time = std::time::Instant::now();
342
343            let middleware_result = match tokio::time::timeout(
344                timeout,
345                middleware.after_dispatch(state.clone(), transactions),
346            )
347            .await
348            {
349                Ok(result) => match result {
350                    Ok(r) => r,
351                    Err(e) => {
352                        // 记录更详细的错误信息
353                        debug!("中间件执行失败: {}", e);
354                        return Err(error_utils::middleware_error(format!(
355                            "中间件执行失败: {}",
356                            e
357                        )));
358                    },
359                },
360                Err(e) => {
361                    debug!("中间件执行超时: {}", e);
362                    return Err(error_utils::middleware_error(format!(
363                        "中间件执行超时: {}",
364                        e
365                    )));
366                },
367            };
368
369            // 记录中间件执行时间,用于性能监控
370            let elapsed = start_time.elapsed();
371            if elapsed.as_millis() > 100 {
372                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
373            }
374
375            if let Some(mut transaction) = middleware_result.additional_transaction
376            {
377                transaction.commit();
378                // 记录额外事务处理开始时间
379                let tx_start_time = std::time::Instant::now();
380
381                let result = match self
382                    .flow_engine
383                    .submit_transaction((
384                        self.base.get_state().clone(),
385                        transaction,
386                    ))
387                    .await
388                {
389                    Ok(result) => result,
390                    Err(e) => {
391                        debug!("附加事务提交失败: {}", e);
392                        return Err(error_utils::state_error(format!(
393                            "附加事务提交失败: {}",
394                            e
395                        )));
396                    },
397                };
398
399                let (_id, mut rx) = result;
400
401                // 添加任务接收超时保护
402                let task_receive_timeout = Duration::from_millis(self.perf_config.task_receive_timeout_ms);
403                let task_result = match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
404                    Ok(Some(result)) => result,
405                    Ok(None) => {
406                        debug!("附加事务接收通道已关闭");
407                        return Ok(());
408                    },
409                    Err(_) => {
410                        debug!("附加事务接收超时");
411                        return Err(error_utils::state_error(format!(
412                            "附加事务接收超时({}ms)",
413                            self.perf_config.task_receive_timeout_ms
414                        )));
415                    },
416                };
417
418                let Some(ProcessorResult { result: Some(result), .. }) =
419                    task_result.output
420                else {
421                    debug!("附加事务处理结果无效");
422                    return Ok(());
423                };
424
425                let TransactionResult { state: new_state, transactions: trs } =
426                    result;
427                *state = Some(Arc::new(new_state));
428                transactions.extend(trs);
429
430                // 记录额外事务处理时间
431                let tx_elapsed = tx_start_time.elapsed();
432                if tx_elapsed.as_millis() > 50 {
433                    debug!(
434                        "附加事务处理时间较长: {}ms",
435                        tx_elapsed.as_millis()
436                    );
437                }
438            }
439        }
440        Ok(())
441    }
442}