1use std::sync::Arc;
2use std::time::Instant;
3
4use async_trait::async_trait;
5
6use crate::{
7 config::ForgeConfig,
8 debug::{debug, info},
9 error::{error_utils, ForgeResult},
10 event::{Event, EventBus},
11 extension_manager::ExtensionManager,
12 helpers::{
13 create_doc, event_helper::EventHelper, history_helper::HistoryHelper,
14 middleware_helper::MiddlewareHelper,
15 },
16 history_manager::HistoryManager,
17 metrics,
18 runtime::{runtime_trait::RuntimeTrait, sync_flow::FlowEngine},
19 types::{HistoryEntryWithMeta, ProcessorResult, RuntimeOptions},
20};
21
22use mf_model::{node_pool::NodePool, schema::Schema};
23use mf_state::{
24 ops::GlobalResourceManager,
25 state::{State, StateConfig},
26 transaction::{Command, Transaction},
27};
28
/// Editor runtime that owns the current state together with the event bus,
/// the transaction flow engine, the loaded extensions and the state history.
pub struct ForgeRuntime {
    /// Bus that `emit_event` publishes to; torn down by `destroy` and on drop.
    event_bus: EventBus<Event>,
    /// Current state; replaced by state updates and by `undo`/`redo`/`jump`.
    state: Arc<State>,
    /// Engine that `(state, transaction)` pairs are submitted to.
    flow_engine: Arc<FlowEngine>,
    /// Loaded extensions; source of the schema returned by `get_schema`.
    extension_manager: ExtensionManager,
    /// Recorded history entries (state plus description and metadata).
    history_manager: HistoryManager<HistoryEntryWithMeta>,
    /// Options the runtime was created with (content, middleware stack, ...).
    options: RuntimeOptions,
    /// Active configuration; can be swapped through `update_config`.
    config: ForgeConfig,
}
40impl ForgeRuntime {
41 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
54 Self::create_with_config(options, ForgeConfig::default()).await
55 }
56
57 pub async fn from_xml_schema_path(
79 xml_schema_path: &str,
80 options: Option<RuntimeOptions>,
81 config: Option<ForgeConfig>,
82 ) -> ForgeResult<Self> {
83 let mut config = config.unwrap_or_default();
84 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
85 Self::create_with_config(options.unwrap_or_default(), config).await
86 }
87
88 fn merge_options_with_extensions(
97 options: Option<RuntimeOptions>,
98 extension_manager: ExtensionManager,
99 ) -> RuntimeOptions {
100 match options {
101 Some(opts) => {
102 let schema = extension_manager.get_schema();
104 let mut xml_extensions = Vec::new();
105 let factory = schema.factory();
106 let (nodes, marks) = factory.definitions();
107 for (name, node_type) in nodes {
109 let node =
110 crate::node::Node::create(name, node_type.spec.clone());
111 xml_extensions.push(crate::types::Extensions::N(node));
112 }
113
114 for (name, mark_type) in marks {
116 let mark =
117 crate::mark::Mark::new(name, mark_type.spec.clone());
118 xml_extensions.push(crate::types::Extensions::M(mark));
119 }
120
121 let existing_extensions = opts.get_extensions();
123 xml_extensions.extend(existing_extensions);
124 opts.set_extensions(xml_extensions)
125 },
126 None => {
127 RuntimeOptions::from_extension_manager(extension_manager)
129 },
130 }
131 }
132
133 pub async fn from_xml_schemas(
143 xml_schema_paths: &[&str],
144 options: Option<RuntimeOptions>,
145 config: Option<ForgeConfig>,
146 ) -> ForgeResult<Self> {
147 let mut config = config.unwrap_or_default();
148 config.extension.xml_schema_paths =
149 xml_schema_paths.iter().map(|s| s.to_string()).collect();
150 Self::create_with_config(options.unwrap_or_default(), config).await
151 }
152
153 pub async fn from_xml_content(
178 xml_content: &str,
179 options: Option<RuntimeOptions>,
180 config: Option<ForgeConfig>,
181 ) -> ForgeResult<Self> {
182 let extension_manager = ExtensionManager::from_xml_string(xml_content)?;
183 let final_options =
184 Self::merge_options_with_extensions(options, extension_manager);
185
186 Self::create_with_config(final_options, config.unwrap_or_default())
187 .await
188 }
189
190 pub async fn create_with_config(
204 options: RuntimeOptions,
205 config: ForgeConfig,
206 ) -> ForgeResult<Self> {
207 let start_time = Instant::now();
208 info!("正在创建新的编辑器实例");
209
210 let extension_manager =
212 Self::create_extension_manager(&options, &config)?;
213
214 debug!("已初始化扩展管理器");
215
216 let op_state = GlobalResourceManager::new();
217 for op_fn in extension_manager.get_op_fns() {
218 op_fn(&op_state)?;
219 }
220
221 let mut state_config = StateConfig {
222 schema: Some(extension_manager.get_schema()),
223 doc: None,
224 stored_marks: None,
225 plugins: Some(extension_manager.get_plugins().clone()),
226 resource_manager: Some(Arc::new(op_state)),
227 };
228 create_doc::create_doc(&options.get_content(), &mut state_config)
229 .await?;
230 let state: State = State::create(state_config).await?;
231
232 let state: Arc<State> = Arc::new(state);
233 debug!("已创建编辑器状态");
234
235 let event_bus = EventHelper::create_and_init_event_bus(
237 &config,
238 &options,
239 state.clone(),
240 )
241 .await?;
242
243 let runtime = ForgeRuntime {
244 event_bus,
245 state: state.clone(),
246 flow_engine: Arc::new(FlowEngine::new()?),
247 extension_manager,
248 history_manager: HistoryManager::with_config(
249 HistoryEntryWithMeta::new(
250 state.clone(),
251 "创建工程项目".to_string(),
252 serde_json::Value::Null,
253 ),
254 config.history.clone(),
255 ),
256 options,
257 config,
258 };
259 info!("编辑器实例创建成功");
260 metrics::editor_creation_duration(start_time.elapsed());
261 Ok(runtime)
262 }
263
264 fn create_extension_manager(
273 options: &RuntimeOptions,
274 config: &ForgeConfig,
275 ) -> ForgeResult<ExtensionManager> {
276 crate::helpers::runtime_common::ExtensionManagerHelper::create_extension_manager(
277 options, config,
278 )
279 }
280
281 pub async fn destroy(&mut self) -> ForgeResult<()> {
283 debug!("正在销毁编辑器实例");
284 EventHelper::destroy_event_bus(&mut self.event_bus).await?;
285 debug!("编辑器实例销毁成功");
286 Ok(())
287 }
288
289 pub async fn emit_event(
290 &mut self,
291 event: Event,
292 ) -> ForgeResult<()> {
293 EventHelper::emit_event(&mut self.event_bus, event).await
294 }
295 pub async fn run_before_middleware(
296 &mut self,
297 transaction: &mut Transaction,
298 ) -> ForgeResult<()> {
299 MiddlewareHelper::run_before_middleware(
300 transaction,
301 &self.options.get_middleware_stack(),
302 &self.config,
303 )
304 .await
305 }
306 pub async fn run_after_middleware(
307 &mut self,
308 state: &mut Option<Arc<State>>,
309 transactions: &mut Vec<Arc<Transaction>>,
310 ) -> ForgeResult<()> {
311 debug!("执行后置中间件链");
312 for middleware in &self.options.get_middleware_stack().middlewares {
313 let start_time = Instant::now();
314 let timeout = std::time::Duration::from_millis(
315 self.config.performance.middleware_timeout_ms,
316 );
317 let middleware_result = match tokio::time::timeout(
318 timeout,
319 middleware.after_dispatch(state.clone(), transactions),
320 )
321 .await
322 {
323 Ok(Ok(result)) => {
324 metrics::middleware_execution_duration(
325 start_time.elapsed(),
326 "after",
327 middleware.name().as_str(),
328 );
329 result
330 },
331 Ok(Err(e)) => {
332 return Err(error_utils::middleware_error(format!(
333 "后置中间件执行失败: {e}"
334 )));
335 },
336 Err(_) => {
337 return Err(error_utils::middleware_error(format!(
338 "后置中间件执行超时({}ms)",
339 self.config.performance.middleware_timeout_ms
340 )));
341 },
342 };
343
344 if let Some(mut transaction) = middleware_result {
345 transaction.commit()?;
347 let task_result = self
348 .flow_engine
349 .submit((self.state.clone(), transaction))
350 .await;
351 let Some(ProcessorResult { result: Some(result), .. }) =
352 task_result.output
353 else {
354 return Err(error_utils::state_error(
355 "附加事务处理结果无效".to_string(),
356 ));
357 };
358 *state = Some(result.state);
359 transactions.extend(result.transactions);
360 }
361 }
362 Ok(())
363 }
364 pub async fn command(
365 &mut self,
366 command: Arc<dyn Command>,
367 ) -> ForgeResult<()> {
368 debug!("正在执行命令: {}", command.name());
369 metrics::command_executed(command.name().as_str());
370 let mut tr = self.get_tr();
371 command.execute(&mut tr).await?;
372 tr.commit()?;
373 self.dispatch(tr).await
374 }
375
376 pub async fn command_with_meta(
377 &mut self,
378 command: Arc<dyn Command>,
379 description: String,
380 meta: serde_json::Value,
381 ) -> ForgeResult<()> {
382 debug!("正在执行命令: {}", command.name());
383 metrics::command_executed(command.name().as_str());
384 let mut tr = self.get_tr();
385 command.execute(&mut tr).await?;
386 tr.commit()?;
387 self.dispatch_with_meta(tr, description, meta).await
388 }
389
390 pub async fn dispatch(
398 &mut self,
399 transaction: Transaction,
400 ) -> ForgeResult<()> {
401 self.dispatch_with_meta(
402 transaction,
403 "".to_string(),
404 serde_json::Value::Null,
405 )
406 .await
407 }
408 pub async fn dispatch_with_meta(
410 &mut self,
411 transaction: Transaction,
412 description: String,
413 meta: serde_json::Value,
414 ) -> ForgeResult<()> {
415 metrics::transaction_dispatched();
416 let old_id = self.get_state().version;
417 let mut current_transaction = transaction;
419 self.run_before_middleware(&mut current_transaction).await?;
420
421 let task_result = self
423 .flow_engine
424 .submit((self.state.clone(), current_transaction.clone()))
425 .await;
426 let Some(ProcessorResult { result: Some(result), .. }) =
427 task_result.output
428 else {
429 return Err(error_utils::state_error(
430 "任务处理结果无效".to_string(),
431 ));
432 };
433 let mut state_update = None;
435 let mut transactions = Vec::new();
436 transactions.extend(result.transactions);
437 if transactions.last().is_some() {
439 state_update = Some(result.state);
440 }
441 self.run_after_middleware(&mut state_update, &mut transactions).await?;
443
444 if let Some(state) = state_update {
446 self.update_state_with_meta(state.clone(), description, meta)
447 .await?;
448 self.emit_event(Event::TrApply(old_id, transactions, state))
449 .await?;
450 }
451 Ok(())
452 }
453 pub async fn update_state(
455 &mut self,
456 state: Arc<State>,
457 ) -> ForgeResult<()> {
458 self.update_state_with_meta(
459 state,
460 "".to_string(),
461 serde_json::Value::Null,
462 )
463 .await
464 }
465 pub async fn update_state_with_meta(
467 &mut self,
468 state: Arc<State>,
469 description: String,
470 meta: serde_json::Value,
471 ) -> ForgeResult<()> {
472 self.state = state.clone();
473 HistoryHelper::insert(
474 &mut self.history_manager,
475 state,
476 description,
477 meta,
478 );
479 Ok(())
480 }
481
482 pub async fn register_plugin(&mut self) -> ForgeResult<()> {
483 info!("正在注册新插件");
484 let state = self
485 .get_state()
486 .reconfigure(StateConfig {
487 schema: Some(self.get_schema()),
488 doc: Some(self.get_state().doc()),
489 stored_marks: None,
490 plugins: Some(self.get_state().plugins().await),
491 resource_manager: Some(
492 self.get_state().resource_manager().clone(),
493 ),
494 })
495 .await?;
496 self.update_state(Arc::new(state)).await?;
497 info!("插件注册成功");
498 Ok(())
499 }
500
501 pub async fn unregister_plugin(
502 &mut self,
503 plugin_key: String,
504 ) -> ForgeResult<()> {
505 info!("正在注销插件: {}", plugin_key);
506 let ps = self
507 .get_state()
508 .plugins()
509 .await
510 .iter()
511 .filter(|p| p.key != plugin_key)
512 .cloned()
513 .collect();
514 let state = self
515 .get_state()
516 .reconfigure(StateConfig {
517 schema: Some(self.get_schema().clone()),
518 doc: Some(self.get_state().doc()),
519 stored_marks: None,
520 plugins: Some(ps),
521 resource_manager: Some(
522 self.get_state().resource_manager().clone(),
523 ),
524 })
525 .await?;
526 self.update_state(Arc::new(state)).await?;
527 info!("插件注销成功");
528 Ok(())
529 }
530
    /// Returns the document (`NodePool`) of the current state.
    pub fn doc(&self) -> Arc<NodePool> {
        self.state.doc()
    }
535
    /// Returns the options this runtime was created with.
    pub fn get_options(&self) -> &RuntimeOptions {
        &self.options
    }
539
    /// Returns the runtime's current configuration.
    pub fn get_config(&self) -> &ForgeConfig {
        &self.config
    }
544
    /// Replaces the runtime's configuration with `config`.
    ///
    /// Only the stored value is swapped; nothing else is re-initialised here.
    pub fn update_config(
        &mut self,
        config: ForgeConfig,
    ) {
        self.config = config;
    }
552
    /// Returns a reference to the current state.
    pub fn get_state(&self) -> &Arc<State> {
        &self.state
    }
556
    /// Returns the schema held by the extension manager.
    pub fn get_schema(&self) -> Arc<Schema> {
        self.extension_manager.get_schema()
    }
560
    /// Returns a reference to the runtime's event bus.
    pub fn get_event_bus(&self) -> &EventBus<Event> {
        &self.event_bus
    }
564
    /// Creates a new transaction from the current state (`State::tr`).
    pub fn get_tr(&self) -> Transaction {
        self.get_state().tr()
    }
568
    /// Replaces the current state with the one returned by
    /// `HistoryHelper::undo`.
    pub fn undo(&mut self) {
        self.state = HistoryHelper::undo(&mut self.history_manager);
    }
572
    /// Replaces the current state with the one returned by
    /// `HistoryHelper::redo`.
    pub fn redo(&mut self) {
        self.state = HistoryHelper::redo(&mut self.history_manager);
    }
576
    /// Replaces the current state with the one returned by
    /// `HistoryHelper::jump` for offset `n`.
    ///
    /// NOTE(review): presumably a negative `n` moves back in history and a
    /// positive one forward — confirm against `HistoryHelper::jump`.
    pub fn jump(
        &mut self,
        n: isize,
    ) {
        self.state = HistoryHelper::jump(&mut self.history_manager, n);
    }
    /// Returns a reference to the runtime's history manager.
    pub fn get_history_manager(&self) -> &HistoryManager<HistoryEntryWithMeta> {
        &self.history_manager
    }
586}
/// Tears down the event bus when the runtime is dropped.
impl Drop for ForgeRuntime {
    fn drop(&mut self) {
        // `drop` cannot be async, so the blocking variant of the teardown
        // helper is used here.
        EventHelper::destroy_event_bus_blocking(&mut self.event_bus);
    }
}
593
594#[async_trait]
597impl RuntimeTrait for ForgeRuntime {
598 async fn dispatch(
599 &mut self,
600 transaction: Transaction,
601 ) -> ForgeResult<()> {
602 self.dispatch(transaction).await
603 }
604
605 async fn dispatch_with_meta(
606 &mut self,
607 transaction: Transaction,
608 description: String,
609 meta: serde_json::Value,
610 ) -> ForgeResult<()> {
611 self.dispatch_with_meta(transaction, description, meta).await
612 }
613
614 async fn command(
615 &mut self,
616 command: Arc<dyn Command>,
617 ) -> ForgeResult<()> {
618 self.command(command).await
619 }
620
621 async fn command_with_meta(
622 &mut self,
623 command: Arc<dyn Command>,
624 description: String,
625 meta: serde_json::Value,
626 ) -> ForgeResult<()> {
627 self.command_with_meta(command, description, meta).await
628 }
629
630 async fn get_state(&self) -> ForgeResult<Arc<State>> {
631 Ok(self.get_state().clone())
632 }
633
634 async fn get_tr(&self) -> ForgeResult<Transaction> {
635 Ok(self.get_tr())
636 }
637
638 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
639 Ok(self.get_schema())
640 }
641
642 async fn undo(&mut self) -> ForgeResult<()> {
643 self.undo();
644 Ok(())
645 }
646
647 async fn redo(&mut self) -> ForgeResult<()> {
648 self.redo();
649 Ok(())
650 }
651
652 async fn jump(
653 &mut self,
654 steps: isize,
655 ) -> ForgeResult<()> {
656 self.jump(steps);
657 Ok(())
658 }
659
660 fn get_config(&self) -> &ForgeConfig {
661 self.get_config()
662 }
663
664 fn update_config(
665 &mut self,
666 config: ForgeConfig,
667 ) {
668 self.update_config(config);
669 }
670
671 fn get_options(&self) -> &RuntimeOptions {
672 self.get_options()
673 }
674
675 async fn destroy(&mut self) -> ForgeResult<()> {
676 self.destroy().await
677 }
678}