1use std::sync::Arc;
2use std::time::Instant;
3
4use async_trait::async_trait;
5
6use crate::{
7 config::ForgeConfig,
8 debug::{debug, info},
9 error::{error_utils, ForgeResult},
10 event::{Event, EventBus},
11 extension_manager::ExtensionManager,
12 helpers::{
13 create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
14 middleware_helper::MiddlewareHelper,
15 },
16 history_manager::HistoryManager,
17 metrics,
18 runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
19 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
20};
21
22use mf_model::{node_pool::NodePool, schema::Schema};
23use mf_state::{
24 ops::GlobalResourceManager,
25 state::{State, StateConfig},
26 transaction::{Command, Transaction},
27};
28
29pub struct ForgeRuntime {
32 event_bus: EventBus<Event>,
33 state: Arc<State>,
34 flow_engine: Arc<FlowEngine>,
35 extension_manager: ExtensionManager,
36 history_manager: HistoryManager<HistoryEntryWithMeta>,
37 options: RuntimeOptions,
38 config: ForgeConfig,
39}
40impl ForgeRuntime {
41 #[cfg_attr(
54 feature = "dev-tracing",
55 tracing::instrument(
56 skip(options),
57 fields(crate_name = "core", runtime_type = "sync")
58 )
59 )]
60 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
61 Self::create_with_config(options, ForgeConfig::default()).await
62 }
63
64 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
86 crate_name = "core",
87 schema_path = xml_schema_path
88 )))]
89 pub async fn from_xml_schema_path(
90 xml_schema_path: &str,
91 options: Option<RuntimeOptions>,
92 config: Option<ForgeConfig>,
93 ) -> ForgeResult<Self> {
94 let mut config = config.unwrap_or_default();
95 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
96 Self::create_with_config(options.unwrap_or_default(), config).await
97 }
98
99 fn merge_options_with_extensions(
108 options: Option<RuntimeOptions>,
109 extension_manager: ExtensionManager,
110 ) -> RuntimeOptions {
111 match options {
112 Some(opts) => {
113 let schema = extension_manager.get_schema();
115 let mut xml_extensions = Vec::new();
116 let factory = schema.factory();
117 let (nodes, marks) = factory.definitions();
118 for (name, node_type) in nodes {
120 let node =
121 crate::node::Node::create(name, node_type.spec.clone());
122 xml_extensions.push(crate::types::Extensions::N(node));
123 }
124
125 for (name, mark_type) in marks {
127 let mark =
128 crate::mark::Mark::new(name, mark_type.spec.clone());
129 xml_extensions.push(crate::types::Extensions::M(mark));
130 }
131
132 let existing_extensions = opts.get_extensions();
134 xml_extensions.extend(existing_extensions);
135 opts.set_extensions(xml_extensions)
136 },
137 None => {
138 RuntimeOptions::from_extension_manager(extension_manager)
140 },
141 }
142 }
143
144 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
154 crate_name = "core",
155 schema_count = xml_schema_paths.len(),
156 runtime_type = "sync"
157 )))]
158 pub async fn from_xml_schemas(
159 xml_schema_paths: &[&str],
160 options: Option<RuntimeOptions>,
161 config: Option<ForgeConfig>,
162 ) -> ForgeResult<Self> {
163 let mut config = config.unwrap_or_default();
164 config.extension.xml_schema_paths =
165 xml_schema_paths.iter().map(|s| s.to_string()).collect();
166 Self::create_with_config(options.unwrap_or_default(), config).await
167 }
168
169 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
194 crate_name = "core",
195 content_size = xml_content.len(),
196 runtime_type = "sync"
197 )))]
198 pub async fn from_xml_content(
199 xml_content: &str,
200 options: Option<RuntimeOptions>,
201 config: Option<ForgeConfig>,
202 ) -> ForgeResult<Self> {
203 let extension_manager = ExtensionManager::from_xml_string(xml_content)?;
204 let final_options =
205 Self::merge_options_with_extensions(options, extension_manager);
206
207 Self::create_with_config(final_options, config.unwrap_or_default())
208 .await
209 }
210
211 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
225 crate_name = "core",
226 runtime_type = "sync",
227 has_middleware = !options.get_middleware_stack().is_empty()
228 )))]
229 pub async fn create_with_config(
230 options: RuntimeOptions,
231 config: ForgeConfig,
232 ) -> ForgeResult<Self> {
233 let start_time = Instant::now();
234 info!("正在创建新的编辑器实例");
235
236 let extension_manager =
238 Self::create_extension_manager(&options, &config)?;
239
240 debug!("已初始化扩展管理器");
241
242 let op_state = GlobalResourceManager::new();
243 for op_fn in extension_manager.get_op_fns() {
244 op_fn(&op_state)?;
245 }
246
247 let mut state_config = StateConfig {
248 schema: Some(extension_manager.get_schema()),
249 doc: None,
250 stored_marks: None,
251 plugins: Some(extension_manager.get_plugins().clone()),
252 resource_manager: Some(Arc::new(op_state)),
253 };
254 create_doc::create_doc(&options.get_content(), &mut state_config)
255 .await?;
256 let state: State = State::create(state_config).await?;
257
258 let state: Arc<State> = Arc::new(state);
259 debug!("已创建编辑器状态");
260
261 let event_bus = EventHelper::create_and_init_event_bus(
263 &config,
264 &options,
265 state.clone(),
266 )
267 .await?;
268
269 let initial_transaction = state.tr();
271
272 let runtime = ForgeRuntime {
273 event_bus,
274 state: state.clone(),
275 flow_engine: Arc::new(FlowEngine::new()?),
276 extension_manager,
277 history_manager: HistoryManager::with_config(
278 HistoryEntryWithMeta::new(
279 Arc::new(initial_transaction),
280 state.clone(),
281 "创建工程项目".to_string(),
282 serde_json::Value::Null,
283 ),
284 config.history.clone(),
285 ),
286 options,
287 config,
288 };
289 info!("编辑器实例创建成功");
290 metrics::editor_creation_duration(start_time.elapsed());
291 Ok(runtime)
292 }
293
294 fn create_extension_manager(
303 options: &RuntimeOptions,
304 config: &ForgeConfig,
305 ) -> ForgeResult<ExtensionManager> {
306 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
307 options, config,
308 )
309 }
310
311 #[cfg_attr(
313 feature = "dev-tracing",
314 tracing::instrument(skip(self), fields(crate_name = "core"))
315 )]
316 pub async fn destroy(&mut self) -> ForgeResult<()> {
317 debug!("正在销毁编辑器实例");
318 EventHelper::destroy_event_bus(&mut self.event_bus).await?;
319 debug!("编辑器实例销毁成功");
320 Ok(())
321 }
322
323 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, event), fields(
324 crate_name = "core",
325 event_type = std::any::type_name_of_val(&event)
326 )))]
327 pub async fn emit_event(
328 &mut self,
329 event: Event,
330 ) -> ForgeResult<()> {
331 EventHelper::emit_event(&mut self.event_bus, event).await
332 }
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
334 crate_name = "core",
335 tr_id = %transaction.id,
336 middleware_count = self.options.get_middleware_stack().middlewares.len()
337 )))]
338 pub async fn run_before_middleware(
339 &mut self,
340 transaction: &mut Transaction,
341 ) -> ForgeResult<()> {
342 MiddlewareHelper::run_before_middleware(
343 transaction,
344 &self.options.get_middleware_stack(),
345 &self.config,
346 )
347 .await
348 }
349 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state, transactions), fields(
350 crate_name = "core",
351 has_state = state.is_some(),
352 tr_count = transactions.len(),
353 middleware_count = self.options.get_middleware_stack().middlewares.len()
354 )))]
355 pub async fn run_after_middleware(
356 &mut self,
357 state: &mut Option<Arc<State>>,
358 transactions: &mut Vec<Arc<Transaction>>,
359 ) -> ForgeResult<()> {
360 debug!("执行后置中间件链");
361 for middleware in &self.options.get_middleware_stack().middlewares {
362 let start_time = Instant::now();
363 let timeout = std::time::Duration::from_millis(
364 self.config.performance.middleware_timeout_ms,
365 );
366 let middleware_result = match tokio::time::timeout(
367 timeout,
368 middleware.after_dispatch(state.clone(), transactions),
369 )
370 .await
371 {
372 Ok(Ok(result)) => {
373 metrics::middleware_execution_duration(
374 start_time.elapsed(),
375 "after",
376 middleware.name().as_str(),
377 );
378 result
379 },
380 Ok(Err(e)) => {
381 return Err(error_utils::middleware_error(format!(
382 "后置中间件执行失败: {e}"
383 )));
384 },
385 Err(_) => {
386 return Err(error_utils::middleware_error(format!(
387 "后置中间件执行超时({}ms)",
388 self.config.performance.middleware_timeout_ms
389 )));
390 },
391 };
392
393 if let Some(mut transaction) = middleware_result {
394 transaction.commit()?;
396 let task_result = self
397 .flow_engine
398 .submit((self.state.clone(), transaction))
399 .await;
400 let Some(ProcessorResult { result: Some(result), .. }) =
401 task_result.output
402 else {
403 return Err(error_utils::state_error(
404 "附加事务处理结果无效".to_string(),
405 ));
406 };
407 *state = Some(result.state);
408 transactions.extend(result.transactions);
409 }
410 }
411 Ok(())
412 }
413 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
414 crate_name = "core",
415 command_name = %command.name()
416 )))]
417 pub async fn command(
418 &mut self,
419 command: Arc<dyn Command>,
420 ) -> ForgeResult<()> {
421 debug!("正在执行命令: {}", command.name());
422 metrics::command_executed(command.name().as_str());
423 let mut tr = self.get_tr();
424 command.execute(&mut tr).await?;
425 tr.commit()?;
426 self.dispatch(tr).await
427 }
428
429 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
430 crate_name = "core",
431 command_name = %command.name(),
432 description = %description
433 )))]
434 pub async fn command_with_meta(
435 &mut self,
436 command: Arc<dyn Command>,
437 description: String,
438 meta: serde_json::Value,
439 ) -> ForgeResult<()> {
440 debug!("正在执行命令: {}", command.name());
441 metrics::command_executed(command.name().as_str());
442 let mut tr = self.get_tr();
443 command.execute(&mut tr).await?;
444 tr.commit()?;
445 self.dispatch_with_meta(tr, description, meta).await
446 }
447
448 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
456 crate_name = "core",
457 tr_id = %transaction.id,
458 runtime_type = "sync"
459 )))]
460 pub async fn dispatch(
461 &mut self,
462 transaction: Transaction,
463 ) -> ForgeResult<()> {
464 self.dispatch_with_meta(
465 transaction,
466 "".to_string(),
467 serde_json::Value::Null,
468 )
469 .await
470 }
471 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
473 crate_name = "core",
474 tr_id = %transaction.id,
475 description = %description,
476 runtime_type = "sync"
477 )))]
478 pub async fn dispatch_with_meta(
479 &mut self,
480 transaction: Transaction,
481 description: String,
482 meta: serde_json::Value,
483 ) -> ForgeResult<()> {
484 metrics::transaction_dispatched();
485 let _old_id = self.get_state().version;
486 let mut current_transaction = transaction;
488 self.run_before_middleware(&mut current_transaction).await?;
489
490 let task_result = self
492 .flow_engine
493 .submit((self.state.clone(), current_transaction.clone()))
494 .await;
495 let Some(ProcessorResult { result: Some(result), .. }) =
496 task_result.output
497 else {
498 return Err(error_utils::state_error(
499 "任务处理结果无效".to_string(),
500 ));
501 };
502 let mut state_update = None;
504 let mut transactions = Vec::new();
505 transactions.extend(result.transactions);
506 if transactions.last().is_some() {
508 state_update = Some(result.state);
509 }
510 self.run_after_middleware(&mut state_update, &mut transactions).await?;
512
513 if let Some(new_state) = state_update {
515 let old_state = self.state.clone();
516 self.update_state_with_meta(
517 new_state.clone(),
518 transactions.clone(),
519 description,
520 meta,
521 )
522 .await?;
523 self.emit_event(Event::TrApply {
524 old_state,
525 new_state,
526 transactions,
527 })
528 .await?;
529 }
530 Ok(())
531 }
532 pub async fn update_state(
534 &mut self,
535 state: Arc<State>,
536 ) -> ForgeResult<()> {
537 self.update_state_with_meta(
538 state,
539 vec![],
540 "".to_string(),
541 serde_json::Value::Null,
542 )
543 .await
544 }
545 pub async fn update_state_with_meta(
547 &mut self,
548 state: Arc<State>,
549 transactions: Vec<Arc<mf_state::Transaction>>,
550 description: String,
551 meta: serde_json::Value,
552 ) -> ForgeResult<()> {
553 self.state = state.clone();
554 HistoryHelper::insert(
555 &mut self.history_manager,
556 state,
557 transactions,
558 description,
559 meta,
560 );
561 Ok(())
562 }
563
564 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
565 info!("正在注册新插件");
566 let state = self
567 .get_state()
568 .reconfigure(StateConfig {
569 schema: Some(self.get_schema()),
570 doc: Some(self.get_state().doc()),
571 stored_marks: None,
572 plugins: Some(self.get_state().plugins().await),
573 resource_manager: Some(
574 self.get_state().resource_manager().clone(),
575 ),
576 })
577 .await?;
578 self.update_state(Arc::new(state)).await?;
579 info!("插件注册成功");
580 Ok(())
581 }
582
583 pub async fn unregister_plugin(
584 &mut self,
585 plugin_key: String,
586 ) -> ForgeResult<()> {
587 info!("正在注销插件: {}", plugin_key);
588 let ps = self
589 .get_state()
590 .plugins()
591 .await
592 .iter()
593 .filter(|p| p.key != plugin_key)
594 .cloned()
595 .collect();
596 let state = self
597 .get_state()
598 .reconfigure(StateConfig {
599 schema: Some(self.get_schema().clone()),
600 doc: Some(self.get_state().doc()),
601 stored_marks: None,
602 plugins: Some(ps),
603 resource_manager: Some(
604 self.get_state().resource_manager().clone(),
605 ),
606 })
607 .await?;
608 self.update_state(Arc::new(state)).await?;
609 info!("插件注销成功");
610 Ok(())
611 }
612
613 pub fn doc(&self) -> Arc<NodePool> {
615 self.state.doc()
616 }
617
618 pub fn get_options(&self) -> &RuntimeOptions {
619 &self.options
620 }
621
622 pub fn get_config(&self) -> &ForgeConfig {
624 &self.config
625 }
626
627 pub fn update_config(
629 &mut self,
630 config: ForgeConfig,
631 ) {
632 self.config = config;
633 }
634
635 pub fn get_state(&self) -> &Arc<State> {
636 &self.state
637 }
638
639 pub fn get_schema(&self) -> Arc<Schema> {
640 self.extension_manager.get_schema()
641 }
642
643 pub fn get_event_bus(&self) -> &EventBus<Event> {
644 &self.event_bus
645 }
646
647 pub fn get_tr(&self) -> Transaction {
648 self.get_state().tr()
649 }
650
651 pub fn undo(&mut self) {
652 if let Some(result) =
653 HistoryHelper::undo(&mut self.history_manager, self.state.clone())
654 {
655 self.state = result.new_state.clone();
656
657 let _ = self.event_bus.broadcast_blocking(Event::Undo {
659 old_state: result.old_state,
660 new_state: result.new_state,
661 transactions: result.transactions,
662 });
663 }
664 }
665
666 pub fn redo(&mut self) {
667 if let Some(result) =
668 HistoryHelper::redo(&mut self.history_manager, self.state.clone())
669 {
670 self.state = result.new_state.clone();
671
672 let _ = self.event_bus.broadcast_blocking(Event::Redo {
674 old_state: result.old_state,
675 new_state: result.new_state,
676 transactions: result.transactions,
677 });
678 }
679 }
680
681 pub fn jump(
682 &mut self,
683 n: isize,
684 ) {
685 if let Some(result) = HistoryHelper::jump(
686 &mut self.history_manager,
687 self.state.clone(),
688 n,
689 ) {
690 self.state = result.new_state.clone();
691
692 let _ = self.event_bus.broadcast_blocking(Event::Jump {
694 old_state: result.old_state,
695 new_state: result.new_state,
696 transactions: result.transactions,
697 steps: n,
698 });
699 }
700 }
701 pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
702 &self.history_manager
703 }
704}
705impl Drop for ForgeRuntime {
706 fn drop(&mut self) {
707 EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
709 }
710}
711
712#[async_trait]
715impl RuntimeTrait for ForgeRuntime {
716 async fn dispatch(
717 &mut self,
718 transaction: Transaction,
719 ) -> ForgeResult<()> {
720 self.dispatch(transaction).await
721 }
722
723 async fn dispatch_with_meta(
724 &mut self,
725 transaction: Transaction,
726 description: String,
727 meta: serde_json::Value,
728 ) -> ForgeResult<()> {
729 self.dispatch_with_meta(transaction, description, meta).await
730 }
731
732 async fn command(
733 &mut self,
734 command: Arc<dyn Command>,
735 ) -> ForgeResult<()> {
736 self.command(command).await
737 }
738
739 async fn command_with_meta(
740 &mut self,
741 command: Arc<dyn Command>,
742 description: String,
743 meta: serde_json::Value,
744 ) -> ForgeResult<()> {
745 self.command_with_meta(command, description, meta).await
746 }
747
748 async fn get_state(&self) -> ForgeResult<Arc<State>> {
749 Ok(self.get_state().clone())
750 }
751
752 async fn get_tr(&self) -> ForgeResult<Transaction> {
753 Ok(self.get_tr())
754 }
755
756 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
757 Ok(self.get_schema())
758 }
759
760 async fn undo(&mut self) -> ForgeResult<()> {
761 self.undo();
762 Ok(())
763 }
764
765 async fn redo(&mut self) -> ForgeResult<()> {
766 self.redo();
767 Ok(())
768 }
769
770 async fn jump(
771 &mut self,
772 steps: isize,
773 ) -> ForgeResult<()> {
774 self.jump(steps);
775 Ok(())
776 }
777
778 fn get_config(&self) -> &ForgeConfig {
779 self.get_config()
780 }
781
782 fn update_config(
783 &mut self,
784 config: ForgeConfig,
785 ) {
786 self.update_config(config);
787 }
788
789 fn get_options(&self) -> &RuntimeOptions {
790 self.get_options()
791 }
792
793 async fn destroy(&mut self) -> ForgeResult<()> {
794 self.destroy().await
795 }
796}