moduforge_core/
async_runtime.rs

1use std::{
2    ops::{Deref, DerefMut},
3    sync::Arc,
4    time::Duration,
5};
6
7use crate::runtime::Editor;
8use crate::{
9    error_utils,
10    event::Event,
11    flow::{FlowEngine, ProcessorResult},
12    types::EditorOptions,
13    EditorResult,
14};
15use moduforge_state::{
16    debug,
17    state::TransactionResult,
18    transaction::{Command, Transaction},
19    State,
20};
21
22/// 性能监控配置
23#[derive(Debug, Clone)]
24pub struct PerformanceConfig {
25    pub enable_monitoring: bool,
26    pub middleware_timeout_ms: u64,
27    pub log_threshold_ms: u64,
28}
29
30impl Default for PerformanceConfig {
31    fn default() -> Self {
32        Self {
33            enable_monitoring: false,
34            middleware_timeout_ms: 500,
35            log_threshold_ms: 50,
36        }
37    }
38}
39
40/// Editor 结构体代表编辑器的核心功能实现
41/// 负责管理文档状态、事件处理、插件系统和存储等核心功能
42pub struct AsyncEditor {
43    base: Editor,
44    flow_engine: FlowEngine,
45    perf_config: PerformanceConfig,
46}
47unsafe impl Send for AsyncEditor {}
48unsafe impl Sync for AsyncEditor {}
49
50impl Deref for AsyncEditor {
51    type Target = Editor;
52
53    fn deref(&self) -> &Self::Target {
54        &self.base
55    }
56}
57
58impl DerefMut for AsyncEditor {
59    fn deref_mut(&mut self) -> &mut Self::Target {
60        &mut self.base
61    }
62}
63impl AsyncEditor {
64    /// 创建新的编辑器实例
65    /// options: 编辑器配置选项
66    pub async fn create(
67        options: EditorOptions
68    ) -> Result<Self, Box<dyn std::error::Error>> {
69        let base = Editor::create(options).await?;
70        Ok(AsyncEditor {
71            base,
72            flow_engine: FlowEngine::new()?,
73            perf_config: PerformanceConfig::default(),
74        })
75    }
76
77    /// 设置性能监控配置
78    pub fn set_performance_config(
79        &mut self,
80        config: PerformanceConfig,
81    ) {
82        self.perf_config = config;
83    }
84
85    /// 记录性能指标
86    fn log_performance(
87        &self,
88        operation: &str,
89        duration: Duration,
90    ) {
91        if self.perf_config.enable_monitoring
92            && duration.as_millis() > self.perf_config.log_threshold_ms as u128
93        {
94            debug!("{} 耗时: {}ms", operation, duration.as_millis());
95        }
96    }
97
98    /// 执行命令并生成相应的事务
99    ///
100    /// 此方法封装了命令到事务的转换过程,并使用高性能的`dispatch_flow`来处理生成的事务。
101    /// 适用于需要执行编辑器命令而不直接构建事务的场景。
102    ///
103    /// # 参数
104    /// * `command` - 要执行的命令
105    ///
106    /// # 返回值
107    /// * `EditorResult<()>` - 命令执行结果
108    pub async fn command(
109        &mut self,
110        command: Arc<dyn Command>,
111    ) -> EditorResult<()> {
112        let cmd_name = command.name();
113        debug!("正在执行命令: {}", cmd_name);
114
115        // 创建事务并应用命令
116        let mut tr = self.get_tr();
117        command.execute(&mut tr).await.map_err(|e| {
118            error_utils::state_error(format!("命令执行失败: {}", e))
119        })?;
120
121        // 使用高性能处理引擎处理事务
122        match self.dispatch_flow(tr).await {
123            Ok(_) => {
124                debug!("命令 '{}' 执行成功", cmd_name);
125                Ok(())
126            },
127            Err(e) => {
128                debug!("命令 '{}' 执行失败: {}", cmd_name, e);
129                Err(e)
130            },
131        }
132    }
133
134    /// 高性能事务处理方法,使用FlowEngine处理事务
135    ///
136    /// 与标准的dispatch方法相比,此方法具有以下优势:
137    /// 1. 利用FlowEngine提供的并行处理能力
138    /// 2. 通过异步流水线处理提高性能
139    /// 3. 减少阻塞操作,提升UI响应性
140    /// 4. 更好地处理大型文档的编辑操作
141    ///
142    /// # 参数
143    /// * `transaction` - 要处理的事务对象
144    ///
145    /// # 返回值
146    /// * `EditorResult<()>` - 处理结果,成功返回Ok(()), 失败返回错误
147    pub async fn dispatch_flow(
148        &mut self,
149        transaction: Transaction,
150    ) -> EditorResult<()> {
151        let start_time = std::time::Instant::now();
152        let mut current_transaction = transaction;
153        let old_id = self.get_state().version;
154        // 前置中间件处理
155        let middleware_start = std::time::Instant::now();
156        self.run_before_middleware(&mut current_transaction).await?;
157        self.log_performance("前置中间件处理", middleware_start.elapsed());
158
159        // 使用 flow_engine 提交事务
160        let flow_start = std::time::Instant::now();
161        let (_id, mut rx) = self
162            .flow_engine
163            .submit_transaction((
164                self.base.get_state().clone(),
165                current_transaction,
166            ))
167            .await?;
168        self.log_performance("提交事务", flow_start.elapsed());
169
170        // 等待任务结果
171        let recv_start = std::time::Instant::now();
172        let Some(task_result) = rx.recv().await else {
173            return Err(error_utils::state_error(
174                "无法接收任务结果".to_string(),
175            ));
176        };
177        self.log_performance("接收任务结果", recv_start.elapsed());
178
179        // 获取处理结果
180        let Some(ProcessorResult { result: Some(result), .. }) =
181            task_result.output
182        else {
183            return Err(error_utils::state_error(
184                "任务处理结果无效".to_string(),
185            ));
186        };
187
188        // 更新编辑器状态
189        let mut current_state = None;
190        let mut transactions = Vec::new();
191        transactions.extend(result.transactions);
192
193        // 检查最后一个事务是否改变了文档
194        if let Some(tr) = transactions.last() {
195            if tr.doc_changed() {
196                current_state = Some(Arc::new(result.state));
197            }
198        }
199
200        // 执行后置中间件链
201        let after_start = std::time::Instant::now();
202        self.run_after_middleware(&mut current_state, &mut transactions)
203            .await?;
204        self.log_performance("后置中间件处理", after_start.elapsed());
205
206        // 更新状态并广播事件
207        if let Some(state) = current_state {
208            let update_start = std::time::Instant::now();
209            self.base.update_state(state.clone()).await?;
210            self.log_performance("状态更新", update_start.elapsed());
211
212            let event_start = std::time::Instant::now();
213            self.base
214                .emit_event(Event::TrApply(
215                    old_id,
216                    Arc::new(transactions),
217                    state,
218                ))
219                .await?;
220            self.log_performance("事件广播", event_start.elapsed());
221        }
222
223        self.log_performance("事务处理总耗时", start_time.elapsed());
224        Ok(())
225    }
226
227    pub async fn run_before_middleware(
228        &mut self,
229        transaction: &mut Transaction,
230    ) -> EditorResult<()> {
231        debug!("执行前置中间件链");
232        for middleware in
233            &self.base.get_options().get_middleware_stack().middlewares
234        {
235            let timeout = std::time::Duration::from_millis(500);
236            if let Err(e) = tokio::time::timeout(
237                timeout,
238                middleware.before_dispatch(transaction),
239            )
240            .await
241            {
242                return Err(error_utils::middleware_error(format!(
243                    "中间件执行超时: {}",
244                    e
245                )));
246            }
247        }
248        Ok(())
249    }
250    pub async fn run_after_middleware(
251        &mut self,
252        state: &mut Option<Arc<State>>,
253        transactions: &mut Vec<Transaction>,
254    ) -> EditorResult<()> {
255        debug!("执行后置中间件链");
256        for middleware in
257            &self.base.get_options().get_middleware_stack().middlewares
258        {
259            // 使用常量定义超时时间,便于配置调整
260
261            let timeout = std::time::Duration::from_millis(
262                self.perf_config.middleware_timeout_ms,
263            );
264
265            // 记录中间件执行开始时间,用于性能监控
266            let start_time = std::time::Instant::now();
267
268            let middleware_result = match tokio::time::timeout(
269                timeout,
270                middleware.after_dispatch(state.clone(), transactions),
271            )
272            .await
273            {
274                Ok(result) => match result {
275                    Ok(r) => r,
276                    Err(e) => {
277                        // 记录更详细的错误信息
278                        debug!("中间件执行失败: {}", e);
279                        return Err(error_utils::middleware_error(format!(
280                            "中间件执行失败: {}",
281                            e
282                        )));
283                    },
284                },
285                Err(e) => {
286                    debug!("中间件执行超时: {}", e);
287                    return Err(error_utils::middleware_error(format!(
288                        "中间件执行超时: {}",
289                        e
290                    )));
291                },
292            };
293
294            // 记录中间件执行时间,用于性能监控
295            let elapsed = start_time.elapsed();
296            if elapsed.as_millis() > 100 {
297                debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
298            }
299
300            if let Some(transaction) = middleware_result.additional_transaction
301            {
302                // 记录额外事务处理开始时间
303                let tx_start_time = std::time::Instant::now();
304
305                let result = match self
306                    .flow_engine
307                    .submit_transaction((
308                        self.base.get_state().clone(),
309                        transaction,
310                    ))
311                    .await
312                {
313                    Ok(result) => result,
314                    Err(e) => {
315                        debug!("附加事务提交失败: {}", e);
316                        return Err(error_utils::state_error(format!(
317                            "附加事务提交失败: {}",
318                            e
319                        )));
320                    },
321                };
322
323                let (_id, mut rx) = result;
324
325                let Some(task_result) = rx.recv().await else {
326                    debug!("接收事务处理结果失败");
327                    return Ok(());
328                };
329
330                let Some(ProcessorResult { result: Some(result), .. }) =
331                    task_result.output
332                else {
333                    debug!("处理结果无效");
334                    return Ok(());
335                };
336
337                let TransactionResult { state: new_state, transactions: trs } =
338                    result;
339                *state = Some(Arc::new(new_state));
340                transactions.extend(trs);
341
342                // 记录额外事务处理时间
343                let tx_elapsed = tx_start_time.elapsed();
344                if tx_elapsed.as_millis() > 50 {
345                    debug!(
346                        "附加事务处理时间较长: {}ms",
347                        tx_elapsed.as_millis()
348                    );
349                }
350            }
351        }
352        Ok(())
353    }
354}