1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48 config::{ForgeConfig, PerformanceConfig},
49 debug::debug,
50 error::error_utils,
51 event::Event,
52 runtime::async_flow::{FlowEngine},
53 types::RuntimeOptions,
54 ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58 state::TransactionResult,
59 transaction::{Command, Transaction},
60 State,
61};
62
63pub struct ForgeAsyncRuntime {
73 base: ForgeRuntime,
74 flow_engine: FlowEngine,
75}
76
77impl Deref for ForgeAsyncRuntime {
78 type Target = ForgeRuntime;
79
80 fn deref(&self) -> &Self::Target {
81 &self.base
82 }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 &mut self.base
88 }
89}
90impl ForgeAsyncRuntime {
91 #[cfg_attr(
104 feature = "dev-tracing",
105 tracing::instrument(
106 skip(options),
107 fields(crate_name = "core", runtime_type = "async")
108 )
109 )]
110 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
111 Self::create_with_config(options, ForgeConfig::default()).await
112 }
113
114 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
136 crate_name = "core",
137 schema_path = xml_schema_path,
138 runtime_type = "async"
139 )))]
140 pub async fn from_xml_schema_path(
141 xml_schema_path: &str,
142 options: Option<RuntimeOptions>,
143 config: Option<ForgeConfig>,
144 ) -> ForgeResult<Self> {
145 let mut config = config.unwrap_or_default();
146 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
147 Self::create_with_config(options.unwrap_or_default(), config).await
148 }
149
150 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
160 crate_name = "core",
161 schema_count = xml_schema_paths.len(),
162 runtime_type = "async"
163 )))]
164 pub async fn from_xml_schemas(
165 xml_schema_paths: &[&str],
166 options: Option<RuntimeOptions>,
167 config: Option<ForgeConfig>,
168 ) -> ForgeResult<Self> {
169 let mut config = config.unwrap_or_default();
170 config.extension.xml_schema_paths =
171 xml_schema_paths.iter().map(|s| s.to_string()).collect();
172 Self::create_with_config(options.unwrap_or_default(), config).await
173 }
174
175 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
185 crate_name = "core",
186 content_size = xml_content.len(),
187 runtime_type = "async"
188 )))]
189 pub async fn from_xml_content(
190 xml_content: &str,
191 options: Option<RuntimeOptions>,
192 config: Option<ForgeConfig>,
193 ) -> ForgeResult<Self> {
194 let base = ForgeRuntime::from_xml_content(xml_content, options, config)
195 .await?;
196 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
197 }
198
199 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
213 crate_name = "core",
214 runtime_type = "async",
215 has_middleware = !options.get_middleware_stack().is_empty()
216 )))]
217 pub async fn create_with_config(
218 options: RuntimeOptions,
219 config: ForgeConfig,
220 ) -> ForgeResult<Self> {
221 let base = ForgeRuntime::create_with_config(options, config).await?;
222 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
223 }
224
225 pub fn set_performance_config(
227 &mut self,
228 perf_config: PerformanceConfig,
229 ) {
230 self.base.update_config({
231 let mut config = self.base.get_config().clone();
232 config.performance = perf_config;
233 config
234 });
235 }
236
237 pub fn get_config(&self) -> &ForgeConfig {
239 self.base.get_config()
240 }
241
242 pub fn update_config(
244 &mut self,
245 config: ForgeConfig,
246 ) {
247 self.base.update_config(config);
248 }
249
250 fn log_performance(
252 &self,
253 operation: &str,
254 duration: Duration,
255 ) {
256 if self.base.get_config().performance.enable_monitoring
257 && duration.as_millis()
258 > self.base.get_config().performance.log_threshold_ms as u128
259 {
260 debug!("{} 耗时: {}ms", operation, duration.as_millis());
261 }
262 }
263 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
264 crate_name = "core",
265 command_name = %command.name(),
266 runtime_type = "async"
267 )))]
268 pub async fn command(
269 &mut self,
270 command: Arc<dyn Command>,
271 ) -> ForgeResult<()> {
272 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
273 .await
274 }
275
276 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
287 crate_name = "core",
288 command_name = %command.name(),
289 description = %description,
290 runtime_type = "async"
291 )))]
292 pub async fn command_with_meta(
293 &mut self,
294 command: Arc<dyn Command>,
295 description: String,
296 meta: serde_json::Value,
297 ) -> ForgeResult<()> {
298 let cmd_name = command.name();
299 debug!("正在执行命令: {}", cmd_name);
300
301 let mut tr = self.base.get_tr();
303 command.execute(&mut tr).await?;
304 tr.commit()?;
305 match self.dispatch_flow_with_meta(tr, description, meta).await {
307 Ok(_) => {
308 debug!("命令 '{}' 执行成功", cmd_name);
309 Ok(())
310 },
311 Err(e) => {
312 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
313 Err(e)
314 },
315 }
316 }
317 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
318 crate_name = "core",
319 tr_id = %transaction.id,
320 runtime_type = "async"
321 )))]
322 pub async fn dispatch_flow(
323 &mut self,
324 transaction: Transaction,
325 ) -> ForgeResult<()> {
326 self.dispatch_flow_with_meta(
327 transaction,
328 "".to_string(),
329 serde_json::Value::Null,
330 )
331 .await
332 }
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
347 crate_name = "core",
348 tr_id = %transaction.id,
349 description = %description,
350 runtime_type = "async"
351 )))]
352 pub async fn dispatch_flow_with_meta(
353 &mut self,
354 transaction: Transaction,
355 description: String,
356 meta: serde_json::Value,
357 ) -> ForgeResult<()> {
358 let start_time = std::time::Instant::now();
359 let mut current_transaction = transaction;
360 let old_id = self.get_state().await?.version;
361 let middleware_start = std::time::Instant::now();
363 self.run_before_middleware(&mut current_transaction).await?;
364 self.log_performance("前置中间件处理", middleware_start.elapsed());
365
366 let (_id, mut rx) = self
368 .flow_engine
369 .submit_transaction((
370 self.base.get_state().clone(),
371 current_transaction,
372 ))
373 .await?;
374
375 let recv_start = std::time::Instant::now();
377 let task_receive_timeout = Duration::from_millis(
378 self.base.get_config().performance.task_receive_timeout_ms,
379 );
380 let task_result =
381 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
382 Ok(Some(result)) => result,
383 Ok(None) => {
384 return Err(error_utils::state_error(
385 "任务接收通道已关闭".to_string(),
386 ));
387 },
388 Err(_) => {
389 return Err(error_utils::state_error(format!(
390 "任务接收超时({}ms)",
391 self.base
392 .get_config()
393 .performance
394 .task_receive_timeout_ms
395 )));
396 },
397 };
398 self.log_performance("接收任务结果", recv_start.elapsed());
399
400 let Some(ProcessorResult { result: Some(result), .. }) =
402 task_result.output
403 else {
404 return Err(error_utils::state_error(
405 "任务处理结果无效".to_string(),
406 ));
407 };
408
409 let mut current_state = None;
411 let mut transactions = Vec::new();
412 transactions.extend(result.transactions);
413
414 if transactions.last().is_some() {
416 current_state = Some(result.state);
417 }
418
419 let after_start = std::time::Instant::now();
421 self.run_after_middleware(&mut current_state, &mut transactions)
422 .await?;
423 self.log_performance("后置中间件处理", after_start.elapsed());
424
425 if let Some(state) = current_state {
427 self.base
428 .update_state_with_meta(state.clone(), description, meta)
429 .await?;
430
431 let event_start = std::time::Instant::now();
432 self.base
433 .emit_event(Event::TrApply(old_id, transactions, state))
434 .await?;
435 self.log_performance("事件广播", event_start.elapsed());
436 }
437
438 self.log_performance("事务处理总耗时", start_time.elapsed());
439 Ok(())
440 }
441
442 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
443 crate_name = "core",
444 tr_id = %transaction.id,
445 middleware_count = self.base.get_options().get_middleware_stack().middlewares.len(),
446 runtime_type = "async"
447 )))]
448 pub async fn run_before_middleware(
449 &mut self,
450 transaction: &mut Transaction,
451 ) -> ForgeResult<()> {
452 use crate::helpers::middleware_helper::MiddlewareHelper;
453
454 MiddlewareHelper::run_before_middleware(
455 transaction,
456 &self.base.get_options().get_middleware_stack(),
457 self.base.get_config(),
458 )
459 .await?;
460
461 transaction.commit()?;
462 Ok(())
463 }
464 pub async fn run_after_middleware(
465 &mut self,
466 state: &mut Option<Arc<State>>,
467 transactions: &mut Vec<Arc<Transaction>>,
468 ) -> ForgeResult<()> {
469 debug!("执行后置中间件链");
470 for middleware in
471 &self.base.get_options().get_middleware_stack().middlewares
472 {
473 let timeout = std::time::Duration::from_millis(
476 self.base.get_config().performance.middleware_timeout_ms,
477 );
478
479 let start_time = std::time::Instant::now();
481
482 let middleware_result = match tokio::time::timeout(
483 timeout,
484 middleware.after_dispatch(state.clone(), transactions),
485 )
486 .await
487 {
488 Ok(result) => match result {
489 Ok(r) => r,
490 Err(e) => {
491 debug!("中间件执行失败: {}", e);
493 return Err(error_utils::middleware_error(format!(
494 "中间件执行失败: {e}"
495 )));
496 },
497 },
498 Err(e) => {
499 debug!("中间件执行超时: {}", e);
500 return Err(error_utils::middleware_error(format!(
501 "中间件执行超时: {e}"
502 )));
503 },
504 };
505
506 let elapsed = start_time.elapsed();
508 if elapsed.as_millis() > 100 {
509 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
510 }
511
512 if let Some(mut transaction) = middleware_result {
513 transaction.commit()?;
514 let tx_start_time = std::time::Instant::now();
516
517 let result = match self
518 .flow_engine
519 .submit_transaction((
520 self.base.get_state().clone(),
521 transaction,
522 ))
523 .await
524 {
525 Ok(result) => result,
526 Err(e) => {
527 debug!("附加事务提交失败: {}", e);
528 return Err(error_utils::state_error(format!(
529 "附加事务提交失败: {e}"
530 )));
531 },
532 };
533
534 let (_id, mut rx) = result;
535
536 let task_receive_timeout = Duration::from_millis(
538 self.base.get_config().performance.task_receive_timeout_ms,
539 );
540 let task_result =
541 match tokio::time::timeout(task_receive_timeout, rx.recv())
542 .await
543 {
544 Ok(Some(result)) => result,
545 Ok(None) => {
546 debug!("附加事务接收通道已关闭");
547 return Ok(());
548 },
549 Err(_) => {
550 debug!("附加事务接收超时");
551 return Err(error_utils::state_error(format!(
552 "附加事务接收超时({}ms)",
553 self.base
554 .get_config()
555 .performance
556 .task_receive_timeout_ms
557 )));
558 },
559 };
560
561 let Some(ProcessorResult { result: Some(result), .. }) =
562 task_result.output
563 else {
564 debug!("附加事务处理结果无效");
565 return Ok(());
566 };
567
568 let TransactionResult { state: new_state, transactions: trs } =
569 result;
570 *state = Some(new_state);
571 transactions.extend(trs);
572
573 let tx_elapsed = tx_start_time.elapsed();
575 if tx_elapsed.as_millis() > 50 {
576 debug!(
577 "附加事务处理时间较长: {}ms",
578 tx_elapsed.as_millis()
579 );
580 }
581 }
582 }
583 Ok(())
584 }
585
586 #[cfg_attr(
594 feature = "dev-tracing",
595 tracing::instrument(
596 skip(self),
597 fields(crate_name = "core", runtime_type = "async")
598 )
599 )]
600 pub async fn shutdown(&mut self) -> ForgeResult<()> {
601 debug!("开始关闭异步运行时");
602
603 self.base.destroy().await?;
605
606 debug!("正在关闭流引擎...");
610
611 debug!("异步运行时已成功关闭");
612 Ok(())
613 }
614}
615
616#[async_trait]
619impl RuntimeTrait for ForgeAsyncRuntime {
620 async fn dispatch(
621 &mut self,
622 transaction: Transaction,
623 ) -> ForgeResult<()> {
624 self.dispatch_flow(transaction).await
626 }
627
628 async fn dispatch_with_meta(
629 &mut self,
630 transaction: Transaction,
631 description: String,
632 meta: serde_json::Value,
633 ) -> ForgeResult<()> {
634 self.dispatch_flow_with_meta(transaction, description, meta).await
636 }
637
638 async fn command(
639 &mut self,
640 command: Arc<dyn Command>,
641 ) -> ForgeResult<()> {
642 self.command(command).await
643 }
644
645 async fn command_with_meta(
646 &mut self,
647 command: Arc<dyn Command>,
648 description: String,
649 meta: serde_json::Value,
650 ) -> ForgeResult<()> {
651 self.command_with_meta(command, description, meta).await
652 }
653
654 async fn get_state(&self) -> ForgeResult<Arc<State>> {
655 Ok(self.base.get_state().clone())
656 }
657
658 async fn get_tr(&self) -> ForgeResult<Transaction> {
659 Ok(self.base.get_tr())
660 }
661
662 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
663 Ok(self.base.get_schema())
664 }
665
666 async fn undo(&mut self) -> ForgeResult<()> {
667 self.base.undo();
668 Ok(())
669 }
670
671 async fn redo(&mut self) -> ForgeResult<()> {
672 self.base.redo();
673 Ok(())
674 }
675
676 async fn jump(
677 &mut self,
678 steps: isize,
679 ) -> ForgeResult<()> {
680 self.base.jump(steps);
681 Ok(())
682 }
683
684 fn get_config(&self) -> &ForgeConfig {
685 self.base.get_config()
686 }
687
688 fn update_config(
689 &mut self,
690 config: ForgeConfig,
691 ) {
692 self.base.update_config(config);
693 }
694
695 fn get_options(&self) -> &RuntimeOptions {
696 self.base.get_options()
697 }
698
699 async fn destroy(&mut self) -> ForgeResult<()> {
700 self.shutdown().await
701 }
702}