1use std::sync::Arc;
2use std::time::Instant;
3
4use async_trait::async_trait;
5
6use crate::{
7 config::ForgeConfig,
8 debug::{debug, info},
9 error::{error_utils, ForgeResult},
10 event::{Event, EventBus},
11 extension_manager::ExtensionManager,
12 helpers::{
13 create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
14 middleware_helper::MiddlewareHelper,
15 },
16 history_manager::HistoryManager,
17 metrics,
18 runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
19 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
20};
21
22use mf_model::{node_pool::NodePool, schema::Schema};
23use mf_state::{
24 ops::GlobalResourceManager,
25 state::{State, StateConfig},
26 transaction::{Command, Transaction},
27};
28
29pub struct ForgeRuntime {
32 event_bus: EventBus<Event>,
33 state: Arc<State>,
34 flow_engine: Arc<FlowEngine>,
35 extension_manager: ExtensionManager,
36 history_manager: HistoryManager<HistoryEntryWithMeta>,
37 options: RuntimeOptions,
38 config: ForgeConfig,
39}
40impl ForgeRuntime {
41 #[cfg_attr(
54 feature = "dev-tracing",
55 tracing::instrument(
56 skip(options),
57 fields(crate_name = "core", runtime_type = "sync")
58 )
59 )]
60 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
61 Self::create_with_config(options, ForgeConfig::default()).await
62 }
63
64 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
86 crate_name = "core",
87 schema_path = xml_schema_path
88 )))]
89 pub async fn from_xml_schema_path(
90 xml_schema_path: &str,
91 options: Option<RuntimeOptions>,
92 config: Option<ForgeConfig>,
93 ) -> ForgeResult<Self> {
94 let mut config = config.unwrap_or_default();
95 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
96 Self::create_with_config(options.unwrap_or_default(), config).await
97 }
98
99 fn merge_options_with_extensions(
108 options: Option<RuntimeOptions>,
109 extension_manager: ExtensionManager,
110 ) -> RuntimeOptions {
111 match options {
112 Some(opts) => {
113 let schema = extension_manager.get_schema();
115 let mut xml_extensions = Vec::new();
116 let factory = schema.factory();
117 let (nodes, marks) = factory.definitions();
118 for (name, node_type) in nodes {
120 let node =
121 crate::node::Node::create(name, node_type.spec.clone());
122 xml_extensions.push(crate::types::Extensions::N(node));
123 }
124
125 for (name, mark_type) in marks {
127 let mark =
128 crate::mark::Mark::new(name, mark_type.spec.clone());
129 xml_extensions.push(crate::types::Extensions::M(mark));
130 }
131
132 let existing_extensions = opts.get_extensions();
134 xml_extensions.extend(existing_extensions);
135 opts.set_extensions(xml_extensions)
136 },
137 None => {
138 RuntimeOptions::from_extension_manager(extension_manager)
140 },
141 }
142 }
143
144 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
154 crate_name = "core",
155 schema_count = xml_schema_paths.len(),
156 runtime_type = "sync"
157 )))]
158 pub async fn from_xml_schemas(
159 xml_schema_paths: &[&str],
160 options: Option<RuntimeOptions>,
161 config: Option<ForgeConfig>,
162 ) -> ForgeResult<Self> {
163 let mut config = config.unwrap_or_default();
164 config.extension.xml_schema_paths =
165 xml_schema_paths.iter().map(|s| s.to_string()).collect();
166 Self::create_with_config(options.unwrap_or_default(), config).await
167 }
168
169 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
194 crate_name = "core",
195 content_size = xml_content.len(),
196 runtime_type = "sync"
197 )))]
198 pub async fn from_xml_content(
199 xml_content: &str,
200 options: Option<RuntimeOptions>,
201 config: Option<ForgeConfig>,
202 ) -> ForgeResult<Self> {
203 let extension_manager = ExtensionManager::from_xml_string(xml_content)?;
204 let final_options =
205 Self::merge_options_with_extensions(options, extension_manager);
206
207 Self::create_with_config(final_options, config.unwrap_or_default())
208 .await
209 }
210
211 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
225 crate_name = "core",
226 runtime_type = "sync",
227 has_middleware = !options.get_middleware_stack().is_empty()
228 )))]
229 pub async fn create_with_config(
230 options: RuntimeOptions,
231 config: ForgeConfig,
232 ) -> ForgeResult<Self> {
233 let start_time = Instant::now();
234 info!("正在创建新的编辑器实例");
235
236 let extension_manager =
238 Self::create_extension_manager(&options, &config)?;
239
240 debug!("已初始化扩展管理器");
241
242 let op_state = GlobalResourceManager::new();
243 for op_fn in extension_manager.get_op_fns() {
244 op_fn(&op_state)?;
245 }
246
247 let mut state_config = StateConfig {
248 schema: Some(extension_manager.get_schema()),
249 doc: None,
250 stored_marks: None,
251 plugins: Some(extension_manager.get_plugins().clone()),
252 resource_manager: Some(Arc::new(op_state)),
253 };
254 create_doc::create_doc(&options.get_content(), &mut state_config)
255 .await?;
256 let state: State = State::create(state_config).await?;
257
258 let state: Arc<State> = Arc::new(state);
259 debug!("已创建编辑器状态");
260
261 let event_bus = EventHelper::create_and_init_event_bus(
263 &config,
264 &options,
265 state.clone(),
266 )
267 .await?;
268
269 let runtime = ForgeRuntime {
270 event_bus,
271 state: state.clone(),
272 flow_engine: Arc::new(FlowEngine::new()?),
273 extension_manager,
274 history_manager: HistoryManager::with_config(
275 HistoryEntryWithMeta::new(
276 state.clone(),
277 "创建工程项目".to_string(),
278 serde_json::Value::Null,
279 ),
280 config.history.clone(),
281 ),
282 options,
283 config,
284 };
285 info!("编辑器实例创建成功");
286 metrics::editor_creation_duration(start_time.elapsed());
287 Ok(runtime)
288 }
289
290 fn create_extension_manager(
299 options: &RuntimeOptions,
300 config: &ForgeConfig,
301 ) -> ForgeResult<ExtensionManager> {
302 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
303 options, config,
304 )
305 }
306
307 #[cfg_attr(
309 feature = "dev-tracing",
310 tracing::instrument(skip(self), fields(crate_name = "core"))
311 )]
312 pub async fn destroy(&mut self) -> ForgeResult<()> {
313 debug!("正在销毁编辑器实例");
314 EventHelper::destroy_event_bus(&mut self.event_bus).await?;
315 debug!("编辑器实例销毁成功");
316 Ok(())
317 }
318
319 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, event), fields(
320 crate_name = "core",
321 event_type = std::any::type_name_of_val(&event)
322 )))]
323 pub async fn emit_event(
324 &mut self,
325 event: Event,
326 ) -> ForgeResult<()> {
327 EventHelper::emit_event(&mut self.event_bus, event).await
328 }
329 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
330 crate_name = "core",
331 tr_id = %transaction.id,
332 middleware_count = self.options.get_middleware_stack().middlewares.len()
333 )))]
334 pub async fn run_before_middleware(
335 &mut self,
336 transaction: &mut Transaction,
337 ) -> ForgeResult<()> {
338 MiddlewareHelper::run_before_middleware(
339 transaction,
340 &self.options.get_middleware_stack(),
341 &self.config,
342 )
343 .await
344 }
345 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, state, transactions), fields(
346 crate_name = "core",
347 has_state = state.is_some(),
348 tr_count = transactions.len(),
349 middleware_count = self.options.get_middleware_stack().middlewares.len()
350 )))]
351 pub async fn run_after_middleware(
352 &mut self,
353 state: &mut Option<Arc<State>>,
354 transactions: &mut Vec<Arc<Transaction>>,
355 ) -> ForgeResult<()> {
356 debug!("执行后置中间件链");
357 for middleware in &self.options.get_middleware_stack().middlewares {
358 let start_time = Instant::now();
359 let timeout = std::time::Duration::from_millis(
360 self.config.performance.middleware_timeout_ms,
361 );
362 let middleware_result = match tokio::time::timeout(
363 timeout,
364 middleware.after_dispatch(state.clone(), transactions),
365 )
366 .await
367 {
368 Ok(Ok(result)) => {
369 metrics::middleware_execution_duration(
370 start_time.elapsed(),
371 "after",
372 middleware.name().as_str(),
373 );
374 result
375 },
376 Ok(Err(e)) => {
377 return Err(error_utils::middleware_error(format!(
378 "后置中间件执行失败: {e}"
379 )));
380 },
381 Err(_) => {
382 return Err(error_utils::middleware_error(format!(
383 "后置中间件执行超时({}ms)",
384 self.config.performance.middleware_timeout_ms
385 )));
386 },
387 };
388
389 if let Some(mut transaction) = middleware_result {
390 transaction.commit()?;
392 let task_result = self
393 .flow_engine
394 .submit((self.state.clone(), transaction))
395 .await;
396 let Some(ProcessorResult { result: Some(result), .. }) =
397 task_result.output
398 else {
399 return Err(error_utils::state_error(
400 "附加事务处理结果无效".to_string(),
401 ));
402 };
403 *state = Some(result.state);
404 transactions.extend(result.transactions);
405 }
406 }
407 Ok(())
408 }
409 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
410 crate_name = "core",
411 command_name = %command.name()
412 )))]
413 pub async fn command(
414 &mut self,
415 command: Arc<dyn Command>,
416 ) -> ForgeResult<()> {
417 debug!("正在执行命令: {}", command.name());
418 metrics::command_executed(command.name().as_str());
419 let mut tr = self.get_tr();
420 command.execute(&mut tr).await?;
421 tr.commit()?;
422 self.dispatch(tr).await
423 }
424
425 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
426 crate_name = "core",
427 command_name = %command.name(),
428 description = %description
429 )))]
430 pub async fn command_with_meta(
431 &mut self,
432 command: Arc<dyn Command>,
433 description: String,
434 meta: serde_json::Value,
435 ) -> ForgeResult<()> {
436 debug!("正在执行命令: {}", command.name());
437 metrics::command_executed(command.name().as_str());
438 let mut tr = self.get_tr();
439 command.execute(&mut tr).await?;
440 tr.commit()?;
441 self.dispatch_with_meta(tr, description, meta).await
442 }
443
444 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
452 crate_name = "core",
453 tr_id = %transaction.id,
454 runtime_type = "sync"
455 )))]
456 pub async fn dispatch(
457 &mut self,
458 transaction: Transaction,
459 ) -> ForgeResult<()> {
460 self.dispatch_with_meta(
461 transaction,
462 "".to_string(),
463 serde_json::Value::Null,
464 )
465 .await
466 }
467 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
469 crate_name = "core",
470 tr_id = %transaction.id,
471 description = %description,
472 runtime_type = "sync"
473 )))]
474 pub async fn dispatch_with_meta(
475 &mut self,
476 transaction: Transaction,
477 description: String,
478 meta: serde_json::Value,
479 ) -> ForgeResult<()> {
480 metrics::transaction_dispatched();
481 let old_id = self.get_state().version;
482 let mut current_transaction = transaction;
484 self.run_before_middleware(&mut current_transaction).await?;
485
486 let task_result = self
488 .flow_engine
489 .submit((self.state.clone(), current_transaction.clone()))
490 .await;
491 let Some(ProcessorResult { result: Some(result), .. }) =
492 task_result.output
493 else {
494 return Err(error_utils::state_error(
495 "任务处理结果无效".to_string(),
496 ));
497 };
498 let mut state_update = None;
500 let mut transactions = Vec::new();
501 transactions.extend(result.transactions);
502 if transactions.last().is_some() {
504 state_update = Some(result.state);
505 }
506 self.run_after_middleware(&mut state_update, &mut transactions).await?;
508
509 if let Some(state) = state_update {
511 self.update_state_with_meta(state.clone(), description, meta)
512 .await?;
513 self.emit_event(Event::TrApply(old_id, transactions, state))
514 .await?;
515 }
516 Ok(())
517 }
518 pub async fn update_state(
520 &mut self,
521 state: Arc<State>,
522 ) -> ForgeResult<()> {
523 self.update_state_with_meta(
524 state,
525 "".to_string(),
526 serde_json::Value::Null,
527 )
528 .await
529 }
530 pub async fn update_state_with_meta(
532 &mut self,
533 state: Arc<State>,
534 description: String,
535 meta: serde_json::Value,
536 ) -> ForgeResult<()> {
537 self.state = state.clone();
538 HistoryHelper::insert(
539 &mut self.history_manager,
540 state,
541 description,
542 meta,
543 );
544 Ok(())
545 }
546
547 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
548 info!("正在注册新插件");
549 let state = self
550 .get_state()
551 .reconfigure(StateConfig {
552 schema: Some(self.get_schema()),
553 doc: Some(self.get_state().doc()),
554 stored_marks: None,
555 plugins: Some(self.get_state().plugins().await),
556 resource_manager: Some(
557 self.get_state().resource_manager().clone(),
558 ),
559 })
560 .await?;
561 self.update_state(Arc::new(state)).await?;
562 info!("插件注册成功");
563 Ok(())
564 }
565
566 pub async fn unregister_plugin(
567 &mut self,
568 plugin_key: String,
569 ) -> ForgeResult<()> {
570 info!("正在注销插件: {}", plugin_key);
571 let ps = self
572 .get_state()
573 .plugins()
574 .await
575 .iter()
576 .filter(|p| p.key != plugin_key)
577 .cloned()
578 .collect();
579 let state = self
580 .get_state()
581 .reconfigure(StateConfig {
582 schema: Some(self.get_schema().clone()),
583 doc: Some(self.get_state().doc()),
584 stored_marks: None,
585 plugins: Some(ps),
586 resource_manager: Some(
587 self.get_state().resource_manager().clone(),
588 ),
589 })
590 .await?;
591 self.update_state(Arc::new(state)).await?;
592 info!("插件注销成功");
593 Ok(())
594 }
595
596 pub fn doc(&self) -> Arc<NodePool> {
598 self.state.doc()
599 }
600
601 pub fn get_options(&self) -> &RuntimeOptions {
602 &self.options
603 }
604
605 pub fn get_config(&self) -> &ForgeConfig {
607 &self.config
608 }
609
610 pub fn update_config(
612 &mut self,
613 config: ForgeConfig,
614 ) {
615 self.config = config;
616 }
617
618 pub fn get_state(&self) -> &Arc<State> {
619 &self.state
620 }
621
622 pub fn get_schema(&self) -> Arc<Schema> {
623 self.extension_manager.get_schema()
624 }
625
626 pub fn get_event_bus(&self) -> &EventBus<Event> {
627 &self.event_bus
628 }
629
630 pub fn get_tr(&self) -> Transaction {
631 self.get_state().tr()
632 }
633
634 pub fn undo(&mut self) {
635 self.state = HistoryHelper::undo(&mut self.history_manager);
636 }
637
638 pub fn redo(&mut self) {
639 self.state = HistoryHelper::redo(&mut self.history_manager);
640 }
641
642 pub fn jump(
643 &mut self,
644 n: isize,
645 ) {
646 self.state = HistoryHelper::jump(&mut self.history_manager, n);
647 }
648 pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
649 &self.history_manager
650 }
651}
652impl Drop for ForgeRuntime {
653 fn drop(&mut self) {
654 EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
656 }
657}
658
659#[async_trait]
662impl RuntimeTrait for ForgeRuntime {
663 async fn dispatch(
664 &mut self,
665 transaction: Transaction,
666 ) -> ForgeResult<()> {
667 self.dispatch(transaction).await
668 }
669
670 async fn dispatch_with_meta(
671 &mut self,
672 transaction: Transaction,
673 description: String,
674 meta: serde_json::Value,
675 ) -> ForgeResult<()> {
676 self.dispatch_with_meta(transaction, description, meta).await
677 }
678
679 async fn command(
680 &mut self,
681 command: Arc<dyn Command>,
682 ) -> ForgeResult<()> {
683 self.command(command).await
684 }
685
686 async fn command_with_meta(
687 &mut self,
688 command: Arc<dyn Command>,
689 description: String,
690 meta: serde_json::Value,
691 ) -> ForgeResult<()> {
692 self.command_with_meta(command, description, meta).await
693 }
694
695 async fn get_state(&self) -> ForgeResult<Arc<State>> {
696 Ok(self.get_state().clone())
697 }
698
699 async fn get_tr(&self) -> ForgeResult<Transaction> {
700 Ok(self.get_tr())
701 }
702
703 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
704 Ok(self.get_schema())
705 }
706
707 async fn undo(&mut self) -> ForgeResult<()> {
708 self.undo();
709 Ok(())
710 }
711
712 async fn redo(&mut self) -> ForgeResult<()> {
713 self.redo();
714 Ok(())
715 }
716
717 async fn jump(
718 &mut self,
719 steps: isize,
720 ) -> ForgeResult<()> {
721 self.jump(steps);
722 Ok(())
723 }
724
725 fn get_config(&self) -> &ForgeConfig {
726 self.get_config()
727 }
728
729 fn update_config(
730 &mut self,
731 config: ForgeConfig,
732 ) {
733 self.update_config(config);
734 }
735
736 fn get_options(&self) -> &RuntimeOptions {
737 self.get_options()
738 }
739
740 async fn destroy(&mut self) -> ForgeResult<()> {
741 self.destroy().await
742 }
743}