1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48 config::{ForgeConfig, PerformanceConfig},
49 debug::debug,
50 error::error_utils,
51 event::Event,
52 runtime::async_flow::{FlowEngine},
53 types::RuntimeOptions,
54 ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58 state::TransactionResult,
59 transaction::{Command, Transaction},
60 State,
61};
62
63pub struct ForgeAsyncRuntime {
73 base: ForgeRuntime,
74 flow_engine: FlowEngine,
75}
76
77impl Deref for ForgeAsyncRuntime {
78 type Target = ForgeRuntime;
79
80 fn deref(&self) -> &Self::Target {
81 &self.base
82 }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 &mut self.base
88 }
89}
90impl ForgeAsyncRuntime {
91 #[cfg_attr(
104 feature = "dev-tracing",
105 tracing::instrument(
106 skip(options),
107 fields(crate_name = "core", runtime_type = "async")
108 )
109 )]
110 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
111 Self::create_with_config(options, ForgeConfig::default()).await
112 }
113
114 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
136 crate_name = "core",
137 schema_path = xml_schema_path,
138 runtime_type = "async"
139 )))]
140 pub async fn from_xml_schema_path(
141 xml_schema_path: &str,
142 options: Option<RuntimeOptions>,
143 config: Option<ForgeConfig>,
144 ) -> ForgeResult<Self> {
145 let mut config = config.unwrap_or_default();
146 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
147 Self::create_with_config(options.unwrap_or_default(), config).await
148 }
149
150 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
160 crate_name = "core",
161 schema_count = xml_schema_paths.len(),
162 runtime_type = "async"
163 )))]
164 pub async fn from_xml_schemas(
165 xml_schema_paths: &[&str],
166 options: Option<RuntimeOptions>,
167 config: Option<ForgeConfig>,
168 ) -> ForgeResult<Self> {
169 let mut config = config.unwrap_or_default();
170 config.extension.xml_schema_paths =
171 xml_schema_paths.iter().map(|s| s.to_string()).collect();
172 Self::create_with_config(options.unwrap_or_default(), config).await
173 }
174
175 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
185 crate_name = "core",
186 content_size = xml_content.len(),
187 runtime_type = "async"
188 )))]
189 pub async fn from_xml_content(
190 xml_content: &str,
191 options: Option<RuntimeOptions>,
192 config: Option<ForgeConfig>,
193 ) -> ForgeResult<Self> {
194 let base = ForgeRuntime::from_xml_content(xml_content, options, config)
195 .await?;
196 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
197 }
198
199 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
213 crate_name = "core",
214 runtime_type = "async",
215 has_middleware = !options.get_middleware_stack().is_empty()
216 )))]
217 pub async fn create_with_config(
218 options: RuntimeOptions,
219 config: ForgeConfig,
220 ) -> ForgeResult<Self> {
221 let base = ForgeRuntime::create_with_config(options, config).await?;
222 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
223 }
224
225 pub fn set_performance_config(
227 &mut self,
228 perf_config: PerformanceConfig,
229 ) {
230 self.base.update_config({
231 let mut config = self.base.get_config().clone();
232 config.performance = perf_config;
233 config
234 });
235 }
236
237 pub fn get_config(&self) -> &ForgeConfig {
239 self.base.get_config()
240 }
241
242 pub fn update_config(
244 &mut self,
245 config: ForgeConfig,
246 ) {
247 self.base.update_config(config);
248 }
249
250 fn log_performance(
252 &self,
253 operation: &str,
254 duration: Duration,
255 ) {
256 if self.base.get_config().performance.enable_monitoring
257 && duration.as_millis()
258 > self.base.get_config().performance.log_threshold_ms as u128
259 {
260 debug!("{} 耗时: {}ms", operation, duration.as_millis());
261 }
262 }
263 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
264 crate_name = "core",
265 command_name = %command.name(),
266 runtime_type = "async"
267 )))]
268 pub async fn command(
269 &mut self,
270 command: Arc<dyn Command>,
271 ) -> ForgeResult<()> {
272 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
273 .await
274 }
275
276 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
287 crate_name = "core",
288 command_name = %command.name(),
289 description = %description,
290 runtime_type = "async"
291 )))]
292 pub async fn command_with_meta(
293 &mut self,
294 command: Arc<dyn Command>,
295 description: String,
296 meta: serde_json::Value,
297 ) -> ForgeResult<()> {
298 let cmd_name = command.name();
299 debug!("正在执行命令: {}", cmd_name);
300
301 let mut tr = self.base.get_tr();
303 command.execute(&mut tr).await?;
304 tr.commit()?;
305 match self.dispatch_flow_with_meta(tr, description, meta).await {
307 Ok(_) => {
308 debug!("命令 '{}' 执行成功", cmd_name);
309 Ok(())
310 },
311 Err(e) => {
312 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
313 Err(e)
314 },
315 }
316 }
317 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
318 crate_name = "core",
319 tr_id = %transaction.id,
320 runtime_type = "async"
321 )))]
322 pub async fn dispatch_flow(
323 &mut self,
324 transaction: Transaction,
325 ) -> ForgeResult<()> {
326 self.dispatch_flow_with_meta(
327 transaction,
328 "".to_string(),
329 serde_json::Value::Null,
330 )
331 .await
332 }
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
347 crate_name = "core",
348 tr_id = %transaction.id,
349 description = %description,
350 runtime_type = "async"
351 )))]
352 pub async fn dispatch_flow_with_meta(
353 &mut self,
354 transaction: Transaction,
355 description: String,
356 meta: serde_json::Value,
357 ) -> ForgeResult<()> {
358 let start_time = std::time::Instant::now();
359 let mut current_transaction = transaction;
360 let _old_id = self.get_state().await?.version;
361 let middleware_start = std::time::Instant::now();
363 self.run_before_middleware(&mut current_transaction).await?;
364 self.log_performance("前置中间件处理", middleware_start.elapsed());
365
366 let (_id, mut rx) = self
368 .flow_engine
369 .submit_transaction((
370 self.base.get_state().clone(),
371 current_transaction,
372 ))
373 .await?;
374
375 let recv_start = std::time::Instant::now();
377 let task_receive_timeout = Duration::from_millis(
378 self.base.get_config().performance.task_receive_timeout_ms,
379 );
380 let task_result =
381 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
382 Ok(Some(result)) => result,
383 Ok(None) => {
384 return Err(error_utils::state_error(
385 "任务接收通道已关闭".to_string(),
386 ));
387 },
388 Err(_) => {
389 return Err(error_utils::state_error(format!(
390 "任务接收超时({}ms)",
391 self.base
392 .get_config()
393 .performance
394 .task_receive_timeout_ms
395 )));
396 },
397 };
398 self.log_performance("接收任务结果", recv_start.elapsed());
399
400 let Some(ProcessorResult { result: Some(result), .. }) =
402 task_result.output
403 else {
404 return Err(error_utils::state_error(
405 "任务处理结果无效".to_string(),
406 ));
407 };
408
409 let mut current_state = None;
411 let mut transactions = Vec::new();
412 transactions.extend(result.transactions);
413
414 if transactions.last().is_some() {
416 current_state = Some(result.state);
417 }
418
419 let after_start = std::time::Instant::now();
421 self.run_after_middleware(&mut current_state, &mut transactions)
422 .await?;
423 self.log_performance("后置中间件处理", after_start.elapsed());
424
425 if let Some(new_state) = current_state {
427 let old_state = self.base.get_state().clone();
428 self.base
429 .update_state_with_meta(
430 new_state.clone(),
431 transactions.clone(),
432 description,
433 meta,
434 )
435 .await?;
436
437 let event_start = std::time::Instant::now();
438 self.base
439 .emit_event(Event::TrApply {
440 old_state,
441 new_state,
442 transactions,
443 })
444 .await?;
445 self.log_performance("事件广播", event_start.elapsed());
446 }
447
448 self.log_performance("事务处理总耗时", start_time.elapsed());
449 Ok(())
450 }
451
452 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
453 crate_name = "core",
454 tr_id = %transaction.id,
455 middleware_count = self.base.get_options().get_middleware_stack().middlewares.len(),
456 runtime_type = "async"
457 )))]
458 pub async fn run_before_middleware(
459 &mut self,
460 transaction: &mut Transaction,
461 ) -> ForgeResult<()> {
462 use crate::helpers::middleware_helper::MiddlewareHelper;
463
464 MiddlewareHelper::run_before_middleware(
465 transaction,
466 &self.base.get_options().get_middleware_stack(),
467 self.base.get_config(),
468 )
469 .await?;
470
471 transaction.commit()?;
472 Ok(())
473 }
474 pub async fn run_after_middleware(
475 &mut self,
476 state: &mut Option<Arc<State>>,
477 transactions: &mut Vec<Arc<Transaction>>,
478 ) -> ForgeResult<()> {
479 debug!("执行后置中间件链");
480 for middleware in
481 &self.base.get_options().get_middleware_stack().middlewares
482 {
483 let timeout = std::time::Duration::from_millis(
486 self.base.get_config().performance.middleware_timeout_ms,
487 );
488
489 let start_time = std::time::Instant::now();
491
492 let middleware_result = match tokio::time::timeout(
493 timeout,
494 middleware.after_dispatch(state.clone(), transactions),
495 )
496 .await
497 {
498 Ok(result) => match result {
499 Ok(r) => r,
500 Err(e) => {
501 debug!("中间件执行失败: {}", e);
503 return Err(error_utils::middleware_error(format!(
504 "中间件执行失败: {e}"
505 )));
506 },
507 },
508 Err(e) => {
509 debug!("中间件执行超时: {}", e);
510 return Err(error_utils::middleware_error(format!(
511 "中间件执行超时: {e}"
512 )));
513 },
514 };
515
516 let elapsed = start_time.elapsed();
518 if elapsed.as_millis() > 100 {
519 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
520 }
521
522 if let Some(mut transaction) = middleware_result {
523 transaction.commit()?;
524 let tx_start_time = std::time::Instant::now();
526
527 let result = match self
528 .flow_engine
529 .submit_transaction((
530 self.base.get_state().clone(),
531 transaction,
532 ))
533 .await
534 {
535 Ok(result) => result,
536 Err(e) => {
537 debug!("附加事务提交失败: {}", e);
538 return Err(error_utils::state_error(format!(
539 "附加事务提交失败: {e}"
540 )));
541 },
542 };
543
544 let (_id, mut rx) = result;
545
546 let task_receive_timeout = Duration::from_millis(
548 self.base.get_config().performance.task_receive_timeout_ms,
549 );
550 let task_result =
551 match tokio::time::timeout(task_receive_timeout, rx.recv())
552 .await
553 {
554 Ok(Some(result)) => result,
555 Ok(None) => {
556 debug!("附加事务接收通道已关闭");
557 return Ok(());
558 },
559 Err(_) => {
560 debug!("附加事务接收超时");
561 return Err(error_utils::state_error(format!(
562 "附加事务接收超时({}ms)",
563 self.base
564 .get_config()
565 .performance
566 .task_receive_timeout_ms
567 )));
568 },
569 };
570
571 let Some(ProcessorResult { result: Some(result), .. }) =
572 task_result.output
573 else {
574 debug!("附加事务处理结果无效");
575 return Ok(());
576 };
577
578 let TransactionResult { state: new_state, transactions: trs } =
579 result;
580 *state = Some(new_state);
581 transactions.extend(trs);
582
583 let tx_elapsed = tx_start_time.elapsed();
585 if tx_elapsed.as_millis() > 50 {
586 debug!(
587 "附加事务处理时间较长: {}ms",
588 tx_elapsed.as_millis()
589 );
590 }
591 }
592 }
593 Ok(())
594 }
595
596 #[cfg_attr(
604 feature = "dev-tracing",
605 tracing::instrument(
606 skip(self),
607 fields(crate_name = "core", runtime_type = "async")
608 )
609 )]
610 pub async fn shutdown(&mut self) -> ForgeResult<()> {
611 debug!("开始关闭异步运行时");
612
613 self.base.destroy().await?;
615
616 debug!("正在关闭流引擎...");
620
621 debug!("异步运行时已成功关闭");
622 Ok(())
623 }
624}
625
626#[async_trait]
629impl RuntimeTrait for ForgeAsyncRuntime {
630 async fn dispatch(
631 &mut self,
632 transaction: Transaction,
633 ) -> ForgeResult<()> {
634 self.dispatch_flow(transaction).await
636 }
637
638 async fn dispatch_with_meta(
639 &mut self,
640 transaction: Transaction,
641 description: String,
642 meta: serde_json::Value,
643 ) -> ForgeResult<()> {
644 self.dispatch_flow_with_meta(transaction, description, meta).await
646 }
647
648 async fn command(
649 &mut self,
650 command: Arc<dyn Command>,
651 ) -> ForgeResult<()> {
652 self.command(command).await
653 }
654
655 async fn command_with_meta(
656 &mut self,
657 command: Arc<dyn Command>,
658 description: String,
659 meta: serde_json::Value,
660 ) -> ForgeResult<()> {
661 self.command_with_meta(command, description, meta).await
662 }
663
664 async fn get_state(&self) -> ForgeResult<Arc<State>> {
665 Ok(self.base.get_state().clone())
666 }
667
668 async fn get_tr(&self) -> ForgeResult<Transaction> {
669 Ok(self.base.get_tr())
670 }
671
672 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
673 Ok(self.base.get_schema())
674 }
675
676 async fn undo(&mut self) -> ForgeResult<()> {
677 self.base.undo();
678 Ok(())
679 }
680
681 async fn redo(&mut self) -> ForgeResult<()> {
682 self.base.redo();
683 Ok(())
684 }
685
686 async fn jump(
687 &mut self,
688 steps: isize,
689 ) -> ForgeResult<()> {
690 self.base.jump(steps);
691 Ok(())
692 }
693
694 fn get_config(&self) -> &ForgeConfig {
695 self.base.get_config()
696 }
697
698 fn update_config(
699 &mut self,
700 config: ForgeConfig,
701 ) {
702 self.base.update_config(config);
703 }
704
705 fn get_options(&self) -> &RuntimeOptions {
706 self.base.get_options()
707 }
708
709 async fn destroy(&mut self) -> ForgeResult<()> {
710 self.shutdown().await
711 }
712}