moduforge_core/
runtime.rs

1use std::sync::Arc;
2use std::time::Instant;
3
4use crate::{
5    error::{error_utils, ForgeResult},
6    event::{Event, EventBus},
7    extension_manager::ExtensionManager,
8    helpers::create_doc,
9    history_manager::HistoryManager,
10    metrics,
11    sync_flow::FlowEngine,
12    types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
13};
14
15use moduforge_model::{node_pool::NodePool, schema::Schema};
16use moduforge_state::{
17    debug, error, info,
18    ops::GlobalResourceManager,
19    state::{State, StateConfig, TransactionResult},
20    transaction::{Command, Transaction},
21};
22
23/// 默认中间件超时时间(毫秒)
24const DEFAULT_MIDDLEWARE_TIMEOUT_MS: u64 = 500;
25
26/// Editor 结构体代表编辑器的核心功能实现
27/// 负责管理文档状态、事件处理、插件系统和存储等核心功能
28pub struct ForgeRuntime {
29    event_bus: EventBus<Event>,
30    state: Arc<State>,
31    flow_engine: Arc<FlowEngine>,
32    extension_manager: ExtensionManager,
33    history_manager: HistoryManager<HistoryEntryWithMeta>,
34    options: RuntimeOptions,
35}
36unsafe impl Send for ForgeRuntime {}
37unsafe impl Sync for ForgeRuntime {}
38impl ForgeRuntime {
39    /// 创建新的编辑器实例
40    /// options: 编辑器配置选项
41    pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
42        let start_time = Instant::now();
43        info!("正在创建新的编辑器实例");
44        let extension_manager =
45            ExtensionManager::new(&options.get_extensions())?;
46        debug!("已初始化扩展管理器");
47
48        let event_bus = EventBus::new();
49        debug!("已创建文档和事件总线");
50        let op_state = GlobalResourceManager::new();
51        for op_fn in extension_manager.get_op_fns() {
52            op_fn(&op_state)?;
53        }
54
55        let mut config = StateConfig {
56            schema: Some(extension_manager.get_schema()),
57            doc: None,
58            stored_marks: None,
59            plugins: Some(extension_manager.get_plugins().clone()),
60            resource_manager: Some(Arc::new(op_state)),
61        };
62        create_doc::create_doc(&options.get_content(), &mut config).await?;
63        let state: State = State::create(config).await?;
64
65        let state: Arc<State> = Arc::new(state);
66        debug!("已创建编辑器状态");
67
68        let mut runtime = ForgeRuntime {
69            event_bus,
70            state: state.clone(),
71            flow_engine: Arc::new(FlowEngine::new()?),
72            extension_manager,
73            history_manager: HistoryManager::new(
74                HistoryEntryWithMeta::new(
75                    state.clone(),
76                    "创建工程项目".to_string(),
77                    serde_json::Value::Null,
78                ),
79                options.get_history_limit(),
80            ),
81            options,
82        };
83        runtime.init().await?;
84        info!("编辑器实例创建成功");
85        metrics::editor_creation_duration(start_time.elapsed());
86        Ok(runtime)
87    }
88
89    /// 初始化编辑器,设置事件处理器并启动事件循环
90    async fn init(&mut self) -> ForgeResult<()> {
91        debug!("正在初始化编辑器");
92        self.event_bus.add_event_handlers(self.options.get_event_handlers())?;
93        self.event_bus.start_event_loop();
94        debug!("事件总线已启动");
95
96        self.event_bus
97            .broadcast_blocking(Event::Create(self.state.clone()))
98            .map_err(|e| {
99                error!("广播创建事件失败: {}", e);
100                error_utils::event_error(format!(
101                    "Failed to broadcast create event: {}",
102                    e
103                ))
104            })?;
105        debug!("已广播创建事件");
106        Ok(())
107    }
108
109    /// 销毁编辑器实例
110    pub async fn destroy(&mut self) -> ForgeResult<()> {
111        debug!("正在销毁编辑器实例");
112        // 广播销毁事件
113        self.event_bus.broadcast(Event::Destroy).await?;
114        // 停止事件循环
115        self.event_bus.broadcast(Event::Stop).await?;
116        debug!("编辑器实例销毁成功");
117        Ok(())
118    }
119
120    pub async fn emit_event(
121        &mut self,
122        event: Event,
123    ) -> ForgeResult<()> {
124        metrics::event_emitted(event.name());
125        self.event_bus.broadcast(event).await?;
126        Ok(())
127    }
128    pub async fn run_before_middleware(
129        &mut self,
130        transaction: &mut Transaction,
131    ) -> ForgeResult<()> {
132        debug!("执行前置中间件链");
133        for middleware in &self.options.get_middleware_stack().middlewares {
134            let start_time = Instant::now();
135            let timeout =
136                std::time::Duration::from_millis(DEFAULT_MIDDLEWARE_TIMEOUT_MS);
137            match tokio::time::timeout(
138                timeout,
139                middleware.before_dispatch(transaction),
140            )
141            .await
142            {
143                Ok(Ok(())) => {
144                    // 中间件执行成功
145                    metrics::middleware_execution_duration(
146                        start_time.elapsed(),
147                        "before",
148                        middleware.name().as_str(),
149                    );
150                    continue;
151                },
152                Ok(Err(e)) => {
153                    return Err(error_utils::middleware_error(format!(
154                        "前置中间件执行失败: {}",
155                        e
156                    )));
157                },
158                Err(_) => {
159                    return Err(error_utils::middleware_error(format!(
160                        "前置中间件执行超时({}ms)",
161                        DEFAULT_MIDDLEWARE_TIMEOUT_MS
162                    )));
163                },
164            }
165        }
166        transaction.commit();
167        Ok(())
168    }
169    pub async fn run_after_middleware(
170        &mut self,
171        state: &mut Option<Arc<State>>,
172        transactions: &mut Vec<Transaction>,
173    ) -> ForgeResult<()> {
174        debug!("执行后置中间件链");
175        for middleware in &self.options.get_middleware_stack().middlewares {
176            let start_time = Instant::now();
177            let timeout =
178                std::time::Duration::from_millis(DEFAULT_MIDDLEWARE_TIMEOUT_MS);
179            let middleware_result = match tokio::time::timeout(
180                timeout,
181                middleware.after_dispatch(state.clone(), transactions),
182            )
183            .await
184            {
185                Ok(Ok(result)) => {
186                    metrics::middleware_execution_duration(
187                        start_time.elapsed(),
188                        "after",
189                        middleware.name().as_str(),
190                    );
191                    result
192                },
193                Ok(Err(e)) => {
194                    return Err(error_utils::middleware_error(format!(
195                        "后置中间件执行失败: {}",
196                        e
197                    )));
198                },
199                Err(_) => {
200                    return Err(error_utils::middleware_error(format!(
201                        "后置中间件执行超时({}ms)",
202                        DEFAULT_MIDDLEWARE_TIMEOUT_MS
203                    )));
204                },
205            };
206
207            if let Some(mut transaction) = middleware_result {
208                transaction.commit();
209                let TransactionResult { state: new_state, transactions: trs } =
210                    self.state.apply(transaction).await.map_err(|e| {
211                        error_utils::state_error(format!(
212                            "附加事务应用失败: {}",
213                            e
214                        ))
215                    })?;
216                *state = Some(Arc::new(new_state));
217                transactions.extend(trs);
218            }
219        }
220        Ok(())
221    }
222    pub async fn command(
223        &mut self,
224        command: Arc<dyn Command>,
225    ) -> ForgeResult<()> {
226        debug!("正在执行命令: {}", command.name());
227        metrics::command_executed(command.name().as_str());
228        let mut tr = self.get_tr();
229        command.execute(&mut tr).await?;
230        tr.commit();
231        self.dispatch(tr).await
232    }
233
234    pub async fn command_with_meta(
235        &mut self,
236        command: Arc<dyn Command>,
237        description: String,
238        meta: serde_json::Value,
239    ) -> ForgeResult<()> {
240        debug!("正在执行命令: {}", command.name());
241        metrics::command_executed(command.name().as_str());
242        let mut tr = self.get_tr();
243        command.execute(&mut tr).await?;
244        tr.commit();
245        self.dispatch_with_meta(tr, description, meta).await
246    }
247
248    /// 处理编辑器事务的核心方法
249    ///
250    /// # 参数
251    /// * `transaction` - 要处理的事务对象
252    ///
253    /// # 返回值
254    /// * `EditorResult<()>` - 处理结果,成功返回 Ok(()), 失败返回错误
255    pub async fn dispatch(
256        &mut self,
257        transaction: Transaction,
258    ) -> ForgeResult<()> {
259        self.dispatch_with_meta(
260            transaction,
261            "".to_string(),
262            serde_json::Value::Null,
263        )
264        .await
265    }
266    /// 更新编辑器状态并记录到历史记录 包含描述和元信息
267    pub async fn dispatch_with_meta(
268        &mut self,
269        transaction: Transaction,
270        description: String,
271        meta: serde_json::Value,
272    ) -> ForgeResult<()> {
273        metrics::transaction_dispatched();
274        let old_id = self.get_state().version;
275        // 保存当前事务的副本,用于中间件处理
276        let mut current_transaction = transaction;
277        self.run_before_middleware(&mut current_transaction).await?;
278
279        // 应用事务到编辑器状态,获取新的状态和产生的事务列表
280        let task_result = self
281            .flow_engine
282            .submit((self.state.clone(), current_transaction.clone()))
283            .await;
284        let Some(ProcessorResult { result: Some(result), .. }) =
285            task_result.output
286        else {
287            return Err(error_utils::state_error(
288                "任务处理结果无效".to_string(),
289            ));
290        };
291        // 使用 Option 来避免不必要的克隆
292        let mut state_update = None;
293        let mut transactions = Vec::new();
294        transactions.extend(result.transactions);
295        // 检查最后一个事务是否改变了文档
296        if let Some(_) = transactions.last() {
297            state_update = Some(Arc::new(result.state));
298        }
299        // 执行后置中间件链,允许中间件在事务应用后执行额外操作
300        self.run_after_middleware(&mut state_update, &mut transactions).await?;
301
302        // 如果有新的状态,更新编辑器状态并记录到历史记录
303        if let Some(state) = state_update {
304            self.update_state_with_meta(state.clone(), description, meta)
305                .await?;
306            self.emit_event(Event::TrApply(
307                old_id,
308                Arc::new(transactions),
309                state,
310            ))
311            .await?;
312        }
313        Ok(())
314    }
315    /// 更新编辑器状态并记录到历史记录 不包含描述和元信息
316    pub async fn update_state(
317        &mut self,
318        state: Arc<State>,
319    ) -> ForgeResult<()> {
320        self.update_state_with_meta(
321            state,
322            "".to_string(),
323            serde_json::Value::Null,
324        )
325        .await
326    }
327    /// 更新编辑器状态并记录到历史记录 包含描述和元信息
328    pub async fn update_state_with_meta(
329        &mut self,
330        state: Arc<State>,
331        description: String,
332        meta: serde_json::Value,
333    ) -> ForgeResult<()> {
334        self.state = state.clone();
335        self.history_manager.insert(HistoryEntryWithMeta::new(
336            state,
337            description,
338            meta,
339        ));
340        Ok(())
341    }
342
343    pub async fn register_plugin(&mut self) -> ForgeResult<()> {
344        info!("正在注册新插件");
345        let state = self
346            .get_state()
347            .reconfigure(StateConfig {
348                schema: Some(self.get_schema()),
349                doc: Some(self.get_state().doc()),
350                stored_marks: None,
351                plugins: Some(self.get_state().plugins().clone()),
352                resource_manager: Some(
353                    self.get_state().resource_manager().clone(),
354                ),
355            })
356            .await?;
357        self.update_state(Arc::new(state)).await?;
358        info!("插件注册成功");
359        Ok(())
360    }
361
362    pub async fn unregister_plugin(
363        &mut self,
364        plugin_key: String,
365    ) -> ForgeResult<()> {
366        info!("正在注销插件: {}", plugin_key);
367        let ps = self
368            .get_state()
369            .plugins()
370            .iter()
371            .filter(|p| p.key != plugin_key)
372            .cloned()
373            .collect();
374        let state = self
375            .get_state()
376            .reconfigure(StateConfig {
377                schema: Some(self.get_schema().clone()),
378                doc: Some(self.get_state().doc()),
379                stored_marks: None,
380                plugins: Some(ps),
381                resource_manager: Some(
382                    self.get_state().resource_manager().clone(),
383                ),
384            })
385            .await?;
386        self.update_state(Arc::new(state)).await?;
387        info!("插件注销成功");
388        Ok(())
389    }
390
391    /// 共享的基础实现方法
392    pub fn doc(&self) -> Arc<NodePool> {
393        self.state.doc()
394    }
395
396    pub fn get_options(&self) -> &RuntimeOptions {
397        &self.options
398    }
399
400    pub fn get_state(&self) -> &Arc<State> {
401        &self.state
402    }
403
404    pub fn get_schema(&self) -> Arc<Schema> {
405        self.extension_manager.get_schema()
406    }
407
408    pub fn get_event_bus(&self) -> &EventBus<Event> {
409        &self.event_bus
410    }
411
412    pub fn get_tr(&self) -> Transaction {
413        let tr = self.get_state().tr();
414        tr
415    }
416
417    pub fn undo(&mut self) {
418        self.history_manager.jump(-1);
419        self.state = self.history_manager.get_present().state;
420        metrics::history_operation("undo");
421    }
422
423    pub fn redo(&mut self) {
424        self.history_manager.jump(1);
425        self.state = self.history_manager.get_present().state;
426        metrics::history_operation("redo");
427    }
428
429    pub fn jump(
430        &mut self,
431        n: isize,
432    ) {
433        self.history_manager.jump(n);
434        self.state = self.history_manager.get_present().state;
435        metrics::history_operation("jump");
436    }
437    pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
438        &self.history_manager
439    }
440}
441impl Drop for ForgeRuntime {
442    fn drop(&mut self) {
443        self.event_bus.destroy();
444    }
445}