1use std::sync::Arc;
2use std::time::Instant;
3
4use async_trait::async_trait;
5
6use crate::{
7 config::ForgeConfig,
8 debug::{debug, info},
9 error::{error_utils, ForgeResult},
10 event::{Event, EventBus},
11 extension_manager::ExtensionManager,
12 helpers::{
13 create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
14 middleware_helper::MiddlewareHelper,
15 },
16 history_manager::HistoryManager,
17 metrics,
18 runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
19 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
20};
21
22use mf_model::{node_pool::NodePool, schema::Schema};
23use mf_state::{
24 ops::GlobalResourceManager,
25 state::{State, StateConfig},
26 transaction::{Command, Transaction},
27};
28
29pub struct ForgeRuntime {
32 event_bus: EventBus<Event>,
33 state: Arc<State>,
34 flow_engine: Arc<FlowEngine>,
35 extension_manager: ExtensionManager,
36 history_manager: HistoryManager<HistoryEntryWithMeta>,
37 options: RuntimeOptions,
38 config: ForgeConfig,
39}
40impl ForgeRuntime {
41 #[cfg_attr(
54 feature = "dev-tracing",
55 tracing::instrument(
56 skip(options),
57 fields(crate_name = "core", runtime_type = "sync")
58 )
59 )]
60 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
61 Self::create_with_config(options, ForgeConfig::default()).await
62 }
63
64 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
86 crate_name = "core",
87 schema_path = xml_schema_path
88 )))]
89 pub async fn from_xml_schema_path(
90 xml_schema_path: &str,
91 options: Option<RuntimeOptions>,
92 config: Option<ForgeConfig>,
93 ) -> ForgeResult<Self> {
94 let mut config = config.unwrap_or_default();
95 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
96 Self::create_with_config(options.unwrap_or_default(), config).await
97 }
98
99 fn merge_options_with_extensions(
108 options: Option<RuntimeOptions>,
109 extension_manager: ExtensionManager,
110 ) -> RuntimeOptions {
111 match options {
112 Some(opts) => {
113 let schema = extension_manager.get_schema();
115 let mut xml_extensions = Vec::new();
116 let factory = schema.factory();
117 let (nodes, marks) = factory.definitions();
118 for (name, node_type) in nodes {
120 let node =
121 crate::node::Node::create(name, node_type.spec.clone());
122 xml_extensions.push(crate::types::Extensions::N(node));
123 }
124
125 for (name, mark_type) in marks {
127 let mark =
128 crate::mark::Mark::new(name, mark_type.spec.clone());
129 xml_extensions.push(crate::types::Extensions::M(mark));
130 }
131
132 let existing_extensions = opts.get_extensions();
134 xml_extensions.extend(existing_extensions);
135 opts.set_extensions(xml_extensions)
136 },
137 None => {
138 RuntimeOptions::from_extension_manager(extension_manager)
140 },
141 }
142 }
143
144 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
154 crate_name = "core",
155 schema_count = xml_schema_paths.len(),
156 runtime_type = "sync"
157 )))]
158 pub async fn from_xml_schemas(
159 xml_schema_paths: &[&str],
160 options: Option<RuntimeOptions>,
161 config: Option<ForgeConfig>,
162 ) -> ForgeResult<Self> {
163 let mut config = config.unwrap_or_default();
164 config.extension.xml_schema_paths =
165 xml_schema_paths.iter().map(|s| s.to_string()).collect();
166 Self::create_with_config(options.unwrap_or_default(), config).await
167 }
168
169 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
194 crate_name = "core",
195 content_size = xml_content.len(),
196 runtime_type = "sync"
197 )))]
198 pub async fn from_xml_content(
199 xml_content: &str,
200 options: Option<RuntimeOptions>,
201 config: Option<ForgeConfig>,
202 ) -> ForgeResult<Self> {
203 let extension_manager = ExtensionManager::from_xml_string(xml_content)?;
204 let final_options =
205 Self::merge_options_with_extensions(options, extension_manager);
206
207 Self::create_with_config(final_options, config.unwrap_or_default())
208 .await
209 }
210
211 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
225 crate_name = "core",
226 runtime_type = "sync",
227 has_middleware = !options.get_middleware_stack().is_empty()
228 )))]
229 pub async fn create_with_config(
230 options: RuntimeOptions,
231 config: ForgeConfig,
232 ) -> ForgeResult<Self> {
233 let start_time = Instant::now();
234 info!("正在创建新的编辑器实例");
235
236 let extension_manager =
238 Self::create_extension_manager(&options, &config)?;
239
240 debug!("已初始化扩展管理器");
241
242 let op_state = GlobalResourceManager::new();
243 for op_fn in extension_manager.get_op_fns() {
244 op_fn(&op_state)?;
245 }
246
247 let mut state_config = StateConfig {
248 schema: Some(extension_manager.get_schema()),
249 doc: None,
250 stored_marks: None,
251 plugins: Some(extension_manager.get_plugins().clone()),
252 resource_manager: Some(Arc::new(op_state)),
253 };
254 create_doc::create_doc(&options.get_content(), &mut state_config)
255 .await?;
256 let state: State = State::create(state_config).await?;
257
258 let state: Arc<State> = Arc::new(state);
259 debug!("已创建编辑器状态");
260
261 let event_bus = EventHelper::create_and_init_event_bus(
263 &config,
264 &options,
265 state.clone(),
266 )
267 .await?;
268
269 let initial_transaction = state.tr();
271
272 let runtime = ForgeRuntime {
273 event_bus,
274 state: state.clone(),
275 flow_engine: Arc::new(FlowEngine::new()?),
276 extension_manager,
277 history_manager: HistoryManager::with_config(
278 HistoryEntryWithMeta::new(
279 Arc::new(initial_transaction),
280 state.clone(),
281 "创建工程项目".to_string(),
282 serde_json::Value::Null,
283 ),
284 config.history.clone(),
285 ),
286 options,
287 config,
288 };
289 info!("编辑器实例创建成功");
290 metrics::editor_creation_duration(start_time.elapsed());
291 Ok(runtime)
292 }
293
294 fn create_extension_manager(
303 options: &RuntimeOptions,
304 config: &ForgeConfig,
305 ) -> ForgeResult<ExtensionManager> {
306 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
307 options, config,
308 )
309 }
310
311 #[cfg_attr(
313 feature = "dev-tracing",
314 tracing::instrument(skip(self), fields(crate_name = "core"))
315 )]
316 pub async fn destroy(&mut self) -> ForgeResult<()> {
317 debug!("正在销毁编辑器实例");
318 EventHelper::destroy_event_bus(&mut self.event_bus).await?;
319 debug!("编辑器实例销毁成功");
320 Ok(())
321 }
322
323 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, event), fields(
324 crate_name = "core",
325 event_type = std::any::type_name_of_val(&event)
326 )))]
327 pub async fn emit_event(
328 &mut self,
329 event: Event,
330 ) -> ForgeResult<()> {
331 EventHelper::emit_event(&mut self.event_bus, event).await
332 }
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
334 crate_name = "core",
335 tr_id = %transaction.id,
336 middleware_count = self.options.get_middleware_stack().middlewares.len()
337 )))]
338 pub async fn run_before_middleware(
339 &mut self,
340 transaction: &mut Transaction,
341 ) -> ForgeResult<()> {
342 MiddlewareHelper::run_before_middleware(
343 transaction,
344 &self.options.get_middleware_stack(),
345 &self.config,
346 )
347 .await
348 }
349 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state, transactions), fields(
350 crate_name = "core",
351 has_state = state.is_some(),
352 tr_count = transactions.len(),
353 middleware_count = self.options.get_middleware_stack().middlewares.len()
354 )))]
355 pub async fn run_after_middleware(
356 &mut self,
357 state: &mut Option<Arc<State>>,
358 transactions: &mut Vec<Arc<Transaction>>,
359 ) -> ForgeResult<()> {
360 debug!("执行后置中间件链");
361 for middleware in &self.options.get_middleware_stack().middlewares {
362 let start_time = Instant::now();
363 let timeout = std::time::Duration::from_millis(
364 self.config.performance.middleware_timeout_ms,
365 );
366 let middleware_result = match tokio::time::timeout(
367 timeout,
368 middleware.after_dispatch(state.clone(), transactions),
369 )
370 .await
371 {
372 Ok(Ok(result)) => {
373 metrics::middleware_execution_duration(
374 start_time.elapsed(),
375 "after",
376 middleware.name().as_str(),
377 );
378 result
379 },
380 Ok(Err(e)) => {
381 return Err(error_utils::middleware_error(format!(
382 "后置中间件执行失败: {e}"
383 )));
384 },
385 Err(_) => {
386 return Err(error_utils::middleware_error(format!(
387 "后置中间件执行超时({}ms)",
388 self.config.performance.middleware_timeout_ms
389 )));
390 },
391 };
392
393 if let Some(mut transaction) = middleware_result {
394 transaction.commit()?;
396 let task_result = self
397 .flow_engine
398 .submit((self.state.clone(), transaction))
399 .await;
400 let Some(ProcessorResult { result: Some(result), .. }) =
401 task_result.output
402 else {
403 return Err(error_utils::state_error(
404 "附加事务处理结果无效".to_string(),
405 ));
406 };
407 *state = Some(result.state);
408 transactions.extend(result.transactions);
409 }
410 }
411 Ok(())
412 }
413 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
414 crate_name = "core",
415 command_name = %command.name()
416 )))]
417 pub async fn command(
418 &mut self,
419 command: Arc<dyn Command>,
420 ) -> ForgeResult<()> {
421 debug!("正在执行命令: {}", command.name());
422 metrics::command_executed(command.name().as_str());
423 let mut tr = self.get_tr();
424 command.execute(&mut tr).await?;
425 tr.commit()?;
426 self.dispatch(tr).await
427 }
428
429 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
430 crate_name = "core",
431 command_name = %command.name(),
432 description = %description
433 )))]
434 pub async fn command_with_meta(
435 &mut self,
436 command: Arc<dyn Command>,
437 description: String,
438 meta: serde_json::Value,
439 ) -> ForgeResult<()> {
440 debug!("正在执行命令: {}", command.name());
441 metrics::command_executed(command.name().as_str());
442 let mut tr = self.get_tr();
443 command.execute(&mut tr).await?;
444 tr.commit()?;
445 self.dispatch_with_meta(tr, description, meta).await
446 }
447
448 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
456 crate_name = "core",
457 tr_id = %transaction.id,
458 runtime_type = "sync"
459 )))]
460 pub async fn dispatch(
461 &mut self,
462 transaction: Transaction,
463 ) -> ForgeResult<()> {
464 self.dispatch_with_meta(
465 transaction,
466 "".to_string(),
467 serde_json::Value::Null,
468 )
469 .await
470 }
471 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
473 crate_name = "core",
474 tr_id = %transaction.id,
475 description = %description,
476 runtime_type = "sync"
477 )))]
478 pub async fn dispatch_with_meta(
479 &mut self,
480 transaction: Transaction,
481 description: String,
482 meta: serde_json::Value,
483 ) -> ForgeResult<()> {
484 metrics::transaction_dispatched();
485 let _old_id = self.get_state().version;
486 let mut current_transaction = transaction;
488 self.run_before_middleware(&mut current_transaction).await?;
489
490 let task_result = self
492 .flow_engine
493 .submit((self.state.clone(), current_transaction.clone()))
494 .await;
495 let Some(ProcessorResult { result: Some(result), .. }) =
496 task_result.output
497 else {
498 return Err(error_utils::state_error(
499 "任务处理结果无效".to_string(),
500 ));
501 };
502 let mut state_update = None;
504 let mut transactions = Vec::new();
505 transactions.extend(result.transactions);
506 if transactions.last().is_some() {
508 state_update = Some(result.state);
509 }
510 self.run_after_middleware(&mut state_update, &mut transactions).await?;
512
513 if let Some(new_state) = state_update {
515 let old_state = self.state.clone();
516 self.update_state_with_meta(new_state.clone(), transactions.clone(), description, meta)
517 .await?;
518 self.emit_event(Event::TrApply {
519 old_state,
520 new_state,
521 transactions,
522 })
523 .await?;
524 }
525 Ok(())
526 }
527 pub async fn update_state(
529 &mut self,
530 state: Arc<State>,
531 ) -> ForgeResult<()> {
532 self.update_state_with_meta(
533 state,
534 vec![],
535 "".to_string(),
536 serde_json::Value::Null,
537 )
538 .await
539 }
540 pub async fn update_state_with_meta(
542 &mut self,
543 state: Arc<State>,
544 transactions: Vec<Arc<mf_state::Transaction>>,
545 description: String,
546 meta: serde_json::Value,
547 ) -> ForgeResult<()> {
548 self.state = state.clone();
549 HistoryHelper::insert(
550 &mut self.history_manager,
551 state,
552 transactions,
553 description,
554 meta,
555 );
556 Ok(())
557 }
558
559 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
560 info!("正在注册新插件");
561 let state = self
562 .get_state()
563 .reconfigure(StateConfig {
564 schema: Some(self.get_schema()),
565 doc: Some(self.get_state().doc()),
566 stored_marks: None,
567 plugins: Some(self.get_state().plugins().await),
568 resource_manager: Some(
569 self.get_state().resource_manager().clone(),
570 ),
571 })
572 .await?;
573 self.update_state(Arc::new(state)).await?;
574 info!("插件注册成功");
575 Ok(())
576 }
577
578 pub async fn unregister_plugin(
579 &mut self,
580 plugin_key: String,
581 ) -> ForgeResult<()> {
582 info!("正在注销插件: {}", plugin_key);
583 let ps = self
584 .get_state()
585 .plugins()
586 .await
587 .iter()
588 .filter(|p| p.key != plugin_key)
589 .cloned()
590 .collect();
591 let state = self
592 .get_state()
593 .reconfigure(StateConfig {
594 schema: Some(self.get_schema().clone()),
595 doc: Some(self.get_state().doc()),
596 stored_marks: None,
597 plugins: Some(ps),
598 resource_manager: Some(
599 self.get_state().resource_manager().clone(),
600 ),
601 })
602 .await?;
603 self.update_state(Arc::new(state)).await?;
604 info!("插件注销成功");
605 Ok(())
606 }
607
608 pub fn doc(&self) -> Arc<NodePool> {
610 self.state.doc()
611 }
612
613 pub fn get_options(&self) -> &RuntimeOptions {
614 &self.options
615 }
616
617 pub fn get_config(&self) -> &ForgeConfig {
619 &self.config
620 }
621
622 pub fn update_config(
624 &mut self,
625 config: ForgeConfig,
626 ) {
627 self.config = config;
628 }
629
630 pub fn get_state(&self) -> &Arc<State> {
631 &self.state
632 }
633
634 pub fn get_schema(&self) -> Arc<Schema> {
635 self.extension_manager.get_schema()
636 }
637
638 pub fn get_event_bus(&self) -> &EventBus<Event> {
639 &self.event_bus
640 }
641
642 pub fn get_tr(&self) -> Transaction {
643 self.get_state().tr()
644 }
645
646 pub fn undo(&mut self) {
647 if let Some(result) = HistoryHelper::undo(&mut self.history_manager, self.state.clone()) {
648 self.state = result.new_state.clone();
649
650 let _ = self.event_bus.broadcast_blocking(Event::Undo {
652 old_state: result.old_state,
653 new_state: result.new_state,
654 transactions: result.transactions,
655 });
656 }
657 }
658
659 pub fn redo(&mut self) {
660 if let Some(result) = HistoryHelper::redo(&mut self.history_manager, self.state.clone()) {
661 self.state = result.new_state.clone();
662
663 let _ = self.event_bus.broadcast_blocking(Event::Redo {
665 old_state: result.old_state,
666 new_state: result.new_state,
667 transactions: result.transactions,
668 });
669 }
670 }
671
672 pub fn jump(
673 &mut self,
674 n: isize,
675 ) {
676 if let Some(result) = HistoryHelper::jump(&mut self.history_manager, self.state.clone(), n) {
677 self.state = result.new_state.clone();
678
679 let _ = self.event_bus.broadcast_blocking(Event::Jump {
681 old_state: result.old_state,
682 new_state: result.new_state,
683 transactions: result.transactions,
684 steps: n,
685 });
686 }
687 }
688 pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
689 &self.history_manager
690 }
691}
692impl Drop for ForgeRuntime {
693 fn drop(&mut self) {
694 EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
696 }
697}
698
699#[async_trait]
702impl RuntimeTrait for ForgeRuntime {
703 async fn dispatch(
704 &mut self,
705 transaction: Transaction,
706 ) -> ForgeResult<()> {
707 self.dispatch(transaction).await
708 }
709
710 async fn dispatch_with_meta(
711 &mut self,
712 transaction: Transaction,
713 description: String,
714 meta: serde_json::Value,
715 ) -> ForgeResult<()> {
716 self.dispatch_with_meta(transaction, description, meta).await
717 }
718
719 async fn command(
720 &mut self,
721 command: Arc<dyn Command>,
722 ) -> ForgeResult<()> {
723 self.command(command).await
724 }
725
726 async fn command_with_meta(
727 &mut self,
728 command: Arc<dyn Command>,
729 description: String,
730 meta: serde_json::Value,
731 ) -> ForgeResult<()> {
732 self.command_with_meta(command, description, meta).await
733 }
734
735 async fn get_state(&self) -> ForgeResult<Arc<State>> {
736 Ok(self.get_state().clone())
737 }
738
739 async fn get_tr(&self) -> ForgeResult<Transaction> {
740 Ok(self.get_tr())
741 }
742
743 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
744 Ok(self.get_schema())
745 }
746
747 async fn undo(&mut self) -> ForgeResult<()> {
748 self.undo();
749 Ok(())
750 }
751
752 async fn redo(&mut self) -> ForgeResult<()> {
753 self.redo();
754 Ok(())
755 }
756
757 async fn jump(
758 &mut self,
759 steps: isize,
760 ) -> ForgeResult<()> {
761 self.jump(steps);
762 Ok(())
763 }
764
765 fn get_config(&self) -> &ForgeConfig {
766 self.get_config()
767 }
768
769 fn update_config(
770 &mut self,
771 config: ForgeConfig,
772 ) {
773 self.update_config(config);
774 }
775
776 fn get_options(&self) -> &RuntimeOptions {
777 self.get_options()
778 }
779
780 async fn destroy(&mut self) -> ForgeResult<()> {
781 self.destroy().await
782 }
783}