1use std::sync::Arc;
2use std::time::Instant;
3
4use async_trait::async_trait;
5
6use crate::{
7 config::ForgeConfig,
8 debug::{debug, info},
9 error::{error_utils, ForgeResult},
10 event::{Event, EventBus},
11 extension_manager::ExtensionManager,
12 helpers::{
13 create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
14 middleware_helper::MiddlewareHelper,
15 },
16 history_manager::HistoryManager,
17 metrics,
18 runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
19 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
20};
21
22use mf_model::{node_pool::NodePool, schema::Schema};
23use mf_state::{
24 ops::GlobalResourceManager,
25 state::{State, StateConfig},
26 transaction::{Command, Transaction},
27};
28
29pub struct ForgeRuntime {
32 event_bus: EventBus<Event>,
33 state: Arc<State>,
34 flow_engine: Arc<FlowEngine>,
35 extension_manager: ExtensionManager,
36 history_manager: HistoryManager<HistoryEntryWithMeta>,
37 options: RuntimeOptions,
38 config: ForgeConfig,
39}
40impl ForgeRuntime {
41 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
54 Self::create_with_config(options, ForgeConfig::default()).await
55 }
56
57 pub async fn from_xml_schema_path(
79 xml_schema_path: &str,
80 options: Option<RuntimeOptions>,
81 config: Option<ForgeConfig>,
82 ) -> ForgeResult<Self> {
83 let mut config = config.unwrap_or_default();
84 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
85 Self::create_with_config(options.unwrap_or_default(), config).await
86 }
87
88 fn merge_options_with_extensions(
97 options: Option<RuntimeOptions>,
98 extension_manager: ExtensionManager,
99 ) -> RuntimeOptions {
100 match options {
101 Some(opts) => {
102 let schema = extension_manager.get_schema();
104 let mut xml_extensions = Vec::new();
105
106 for (name, node_type) in &schema.nodes {
108 let node =
109 crate::node::Node::create(name, node_type.spec.clone());
110 xml_extensions.push(crate::types::Extensions::N(node));
111 }
112
113 for (name, mark_type) in &schema.marks {
115 let mark =
116 crate::mark::Mark::new(name, mark_type.spec.clone());
117 xml_extensions.push(crate::types::Extensions::M(mark));
118 }
119
120 let existing_extensions = opts.get_extensions();
122 xml_extensions.extend(existing_extensions);
123 opts.set_extensions(xml_extensions)
124 },
125 None => {
126 RuntimeOptions::from_extension_manager(extension_manager)
128 },
129 }
130 }
131
132 pub async fn from_xml_schemas(
142 xml_schema_paths: &[&str],
143 options: Option<RuntimeOptions>,
144 config: Option<ForgeConfig>,
145 ) -> ForgeResult<Self> {
146 let mut config = config.unwrap_or_default();
147 config.extension.xml_schema_paths =
148 xml_schema_paths.iter().map(|s| s.to_string()).collect();
149 Self::create_with_config(options.unwrap_or_default(), config).await
150 }
151
152 pub async fn from_xml_content(
177 xml_content: &str,
178 options: Option<RuntimeOptions>,
179 config: Option<ForgeConfig>,
180 ) -> ForgeResult<Self> {
181 let extension_manager = ExtensionManager::from_xml_string(xml_content)?;
182 let final_options =
183 Self::merge_options_with_extensions(options, extension_manager);
184
185 Self::create_with_config(final_options, config.unwrap_or_default())
186 .await
187 }
188
189 pub async fn create_with_config(
203 options: RuntimeOptions,
204 config: ForgeConfig,
205 ) -> ForgeResult<Self> {
206 let start_time = Instant::now();
207 info!("正在创建新的编辑器实例");
208
209 let extension_manager =
211 Self::create_extension_manager(&options, &config)?;
212
213 debug!("已初始化扩展管理器");
214
215 let op_state = GlobalResourceManager::new();
216 for op_fn in extension_manager.get_op_fns() {
217 op_fn(&op_state)?;
218 }
219
220 let mut state_config = StateConfig {
221 schema: Some(extension_manager.get_schema()),
222 doc: None,
223 stored_marks: None,
224 plugins: Some(extension_manager.get_plugins().clone()),
225 resource_manager: Some(Arc::new(op_state)),
226 };
227 create_doc::create_doc(&options.get_content(), &mut state_config)
228 .await?;
229 let state: State = State::create(state_config).await?;
230
231 let state: Arc<State> = Arc::new(state);
232 debug!("已创建编辑器状态");
233
234 let event_bus = EventHelper::create_and_init_event_bus(
236 &config,
237 &options,
238 state.clone(),
239 )
240 .await?;
241
242 let runtime = ForgeRuntime {
243 event_bus,
244 state: state.clone(),
245 flow_engine: Arc::new(FlowEngine::new()?),
246 extension_manager,
247 history_manager: HistoryManager::with_config(
248 HistoryEntryWithMeta::new(
249 state.clone(),
250 "创建工程项目".to_string(),
251 serde_json::Value::Null,
252 ),
253 config.history.clone(),
254 ),
255 options,
256 config,
257 };
258 info!("编辑器实例创建成功");
259 metrics::editor_creation_duration(start_time.elapsed());
260 Ok(runtime)
261 }
262
263 fn create_extension_manager(
272 options: &RuntimeOptions,
273 config: &ForgeConfig,
274 ) -> ForgeResult<ExtensionManager> {
275 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
276 options, config,
277 )
278 }
279
280 pub async fn destroy(&mut self) -> ForgeResult<()> {
282 debug!("正在销毁编辑器实例");
283 EventHelper::destroy_event_bus(&mut self.event_bus).await?;
284 debug!("编辑器实例销毁成功");
285 Ok(())
286 }
287
288 pub async fn emit_event(
289 &mut self,
290 event: Event,
291 ) -> ForgeResult<()> {
292 EventHelper::emit_event(&mut self.event_bus, event).await
293 }
294 pub async fn run_before_middleware(
295 &mut self,
296 transaction: &mut Transaction,
297 ) -> ForgeResult<()> {
298 MiddlewareHelper::run_before_middleware(
299 transaction,
300 &self.options.get_middleware_stack(),
301 &self.config,
302 )
303 .await
304 }
305 pub async fn run_after_middleware(
306 &mut self,
307 state: &mut Option<Arc<State>>,
308 transactions: &mut Vec<Arc<Transaction>>,
309 ) -> ForgeResult<()> {
310 debug!("执行后置中间件链");
311 for middleware in &self.options.get_middleware_stack().middlewares {
312 let start_time = Instant::now();
313 let timeout = std::time::Duration::from_millis(
314 self.config.performance.middleware_timeout_ms,
315 );
316 let middleware_result = match tokio::time::timeout(
317 timeout,
318 middleware.after_dispatch(state.clone(), transactions),
319 )
320 .await
321 {
322 Ok(Ok(result)) => {
323 metrics::middleware_execution_duration(
324 start_time.elapsed(),
325 "after",
326 middleware.name().as_str(),
327 );
328 result
329 },
330 Ok(Err(e)) => {
331 return Err(error_utils::middleware_error(format!(
332 "后置中间件执行失败: {e}"
333 )));
334 },
335 Err(_) => {
336 return Err(error_utils::middleware_error(format!(
337 "后置中间件执行超时({}ms)",
338 self.config.performance.middleware_timeout_ms
339 )));
340 },
341 };
342
343 if let Some(mut transaction) = middleware_result {
344 transaction.commit()?;
346 let task_result = self
347 .flow_engine
348 .submit((self.state.clone(), transaction))
349 .await;
350 let Some(ProcessorResult { result: Some(result), .. }) =
351 task_result.output
352 else {
353 return Err(error_utils::state_error(
354 "附加事务处理结果无效".to_string(),
355 ));
356 };
357 *state = Some(result.state);
358 transactions.extend(result.transactions);
359 }
360 }
361 Ok(())
362 }
363 pub async fn command(
364 &mut self,
365 command: Arc<dyn Command>,
366 ) -> ForgeResult<()> {
367 debug!("正在执行命令: {}", command.name());
368 metrics::command_executed(command.name().as_str());
369 let mut tr = self.get_tr();
370 command.execute(&mut tr).await?;
371 tr.commit()?;
372 self.dispatch(tr).await
373 }
374
375 pub async fn command_with_meta(
376 &mut self,
377 command: Arc<dyn Command>,
378 description: String,
379 meta: serde_json::Value,
380 ) -> ForgeResult<()> {
381 debug!("正在执行命令: {}", command.name());
382 metrics::command_executed(command.name().as_str());
383 let mut tr = self.get_tr();
384 command.execute(&mut tr).await?;
385 tr.commit()?;
386 self.dispatch_with_meta(tr, description, meta).await
387 }
388
389 pub async fn dispatch(
397 &mut self,
398 transaction: Transaction,
399 ) -> ForgeResult<()> {
400 self.dispatch_with_meta(
401 transaction,
402 "".to_string(),
403 serde_json::Value::Null,
404 )
405 .await
406 }
407 pub async fn dispatch_with_meta(
409 &mut self,
410 transaction: Transaction,
411 description: String,
412 meta: serde_json::Value,
413 ) -> ForgeResult<()> {
414 metrics::transaction_dispatched();
415 let old_id = self.get_state().version;
416 let mut current_transaction = transaction;
418 self.run_before_middleware(&mut current_transaction).await?;
419
420 let task_result = self
422 .flow_engine
423 .submit((self.state.clone(), current_transaction.clone()))
424 .await;
425 let Some(ProcessorResult { result: Some(result), .. }) =
426 task_result.output
427 else {
428 return Err(error_utils::state_error(
429 "任务处理结果无效".to_string(),
430 ));
431 };
432 let mut state_update = None;
434 let mut transactions = Vec::new();
435 transactions.extend(result.transactions);
436 if transactions.last().is_some() {
438 state_update = Some(result.state);
439 }
440 self.run_after_middleware(&mut state_update, &mut transactions).await?;
442
443 if let Some(state) = state_update {
445 self.update_state_with_meta(state.clone(), description, meta)
446 .await?;
447 self.emit_event(Event::TrApply(old_id, transactions, state))
448 .await?;
449 }
450 Ok(())
451 }
452 pub async fn update_state(
454 &mut self,
455 state: Arc<State>,
456 ) -> ForgeResult<()> {
457 self.update_state_with_meta(
458 state,
459 "".to_string(),
460 serde_json::Value::Null,
461 )
462 .await
463 }
464 pub async fn update_state_with_meta(
466 &mut self,
467 state: Arc<State>,
468 description: String,
469 meta: serde_json::Value,
470 ) -> ForgeResult<()> {
471 self.state = state.clone();
472 HistoryHelper::insert(
473 &mut self.history_manager,
474 state,
475 description,
476 meta,
477 );
478 Ok(())
479 }
480
481 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
482 info!("正在注册新插件");
483 let state = self
484 .get_state()
485 .reconfigure(StateConfig {
486 schema: Some(self.get_schema()),
487 doc: Some(self.get_state().doc()),
488 stored_marks: None,
489 plugins: Some(self.get_state().plugins().await),
490 resource_manager: Some(
491 self.get_state().resource_manager().clone(),
492 ),
493 })
494 .await?;
495 self.update_state(Arc::new(state)).await?;
496 info!("插件注册成功");
497 Ok(())
498 }
499
500 pub async fn unregister_plugin(
501 &mut self,
502 plugin_key: String,
503 ) -> ForgeResult<()> {
504 info!("正在注销插件: {}", plugin_key);
505 let ps = self
506 .get_state()
507 .plugins()
508 .await
509 .iter()
510 .filter(|p| p.key != plugin_key)
511 .cloned()
512 .collect();
513 let state = self
514 .get_state()
515 .reconfigure(StateConfig {
516 schema: Some(self.get_schema().clone()),
517 doc: Some(self.get_state().doc()),
518 stored_marks: None,
519 plugins: Some(ps),
520 resource_manager: Some(
521 self.get_state().resource_manager().clone(),
522 ),
523 })
524 .await?;
525 self.update_state(Arc::new(state)).await?;
526 info!("插件注销成功");
527 Ok(())
528 }
529
530 pub fn doc(&self) -> Arc<NodePool> {
532 self.state.doc()
533 }
534
535 pub fn get_options(&self) -> &RuntimeOptions {
536 &self.options
537 }
538
539 pub fn get_config(&self) -> &ForgeConfig {
541 &self.config
542 }
543
544 pub fn update_config(
546 &mut self,
547 config: ForgeConfig,
548 ) {
549 self.config = config;
550 }
551
552 pub fn get_state(&self) -> &Arc<State> {
553 &self.state
554 }
555
556 pub fn get_schema(&self) -> Arc<Schema> {
557 self.extension_manager.get_schema()
558 }
559
560 pub fn get_event_bus(&self) -> &EventBus<Event> {
561 &self.event_bus
562 }
563
564 pub fn get_tr(&self) -> Transaction {
565 self.get_state().tr()
566 }
567
568 pub fn undo(&mut self) {
569 self.state = HistoryHelper::undo(&mut self.history_manager);
570 }
571
572 pub fn redo(&mut self) {
573 self.state = HistoryHelper::redo(&mut self.history_manager);
574 }
575
576 pub fn jump(
577 &mut self,
578 n: isize,
579 ) {
580 self.state = HistoryHelper::jump(&mut self.history_manager, n);
581 }
582 pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
583 &self.history_manager
584 }
585}
586impl Drop for ForgeRuntime {
587 fn drop(&mut self) {
588 EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
590 }
591}
592
593#[async_trait]
596impl RuntimeTrait for ForgeRuntime {
597 async fn dispatch(
598 &mut self,
599 transaction: Transaction,
600 ) -> ForgeResult<()> {
601 self.dispatch(transaction).await
602 }
603
604 async fn dispatch_with_meta(
605 &mut self,
606 transaction: Transaction,
607 description: String,
608 meta: serde_json::Value,
609 ) -> ForgeResult<()> {
610 self.dispatch_with_meta(transaction, description, meta).await
611 }
612
613 async fn command(
614 &mut self,
615 command: Arc<dyn Command>,
616 ) -> ForgeResult<()> {
617 self.command(command).await
618 }
619
620 async fn command_with_meta(
621 &mut self,
622 command: Arc<dyn Command>,
623 description: String,
624 meta: serde_json::Value,
625 ) -> ForgeResult<()> {
626 self.command_with_meta(command, description, meta).await
627 }
628
629 async fn get_state(&self) -> ForgeResult<Arc<State>> {
630 Ok(self.get_state().clone())
631 }
632
633 async fn get_tr(&self) -> ForgeResult<Transaction> {
634 Ok(self.get_tr())
635 }
636
637 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
638 Ok(self.get_schema())
639 }
640
641 async fn undo(&mut self) -> ForgeResult<()> {
642 self.undo();
643 Ok(())
644 }
645
646 async fn redo(&mut self) -> ForgeResult<()> {
647 self.redo();
648 Ok(())
649 }
650
651 async fn jump(
652 &mut self,
653 steps: isize,
654 ) -> ForgeResult<()> {
655 self.jump(steps);
656 Ok(())
657 }
658
659 fn get_config(&self) -> &ForgeConfig {
660 self.get_config()
661 }
662
663 fn update_config(
664 &mut self,
665 config: ForgeConfig,
666 ) {
667 self.update_config(config);
668 }
669
670 fn get_options(&self) -> &RuntimeOptions {
671 self.get_options()
672 }
673
674 async fn destroy(&mut self) -> ForgeResult<()> {
675 self.destroy().await
676 }
677}