1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48 config::{ForgeConfig, PerformanceConfig},
49 debug::debug,
50 error::error_utils,
51 event::Event,
52 runtime::async_flow::{FlowEngine},
53 types::RuntimeOptions,
54 ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58 state::TransactionResult,
59 transaction::{Command, Transaction},
60 State,
61};
62
/// Asynchronous runtime that pairs a [`ForgeRuntime`] with a [`FlowEngine`].
///
/// The methods of this type submit transactions to `flow_engine` and write the
/// resulting state back into `base`. Everything else is reachable on `base`
/// through the `Deref`/`DerefMut` implementations for this type.
pub struct ForgeAsyncRuntime {
    /// Wrapped base runtime that holds state, configuration and options.
    base: ForgeRuntime,
    /// Engine that `(state, transaction)` pairs are submitted to for processing.
    flow_engine: FlowEngine,
}
76
77impl Deref for ForgeAsyncRuntime {
78 type Target = ForgeRuntime;
79
80 fn deref(&self) -> &Self::Target {
81 &self.base
82 }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 &mut self.base
88 }
89}
90impl ForgeAsyncRuntime {
91 #[cfg_attr(
104 feature = "dev-tracing",
105 tracing::instrument(
106 skip(options),
107 fields(crate_name = "core", runtime_type = "async")
108 )
109 )]
110 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
111 Self::create_with_config(options, ForgeConfig::default()).await
112 }
113
114 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
136 crate_name = "core",
137 schema_path = xml_schema_path,
138 runtime_type = "async"
139 )))]
140 pub async fn from_xml_schema_path(
141 xml_schema_path: &str,
142 options: Option<RuntimeOptions>,
143 config: Option<ForgeConfig>,
144 ) -> ForgeResult<Self> {
145 let mut config = config.unwrap_or_default();
146 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
147 Self::create_with_config(options.unwrap_or_default(), config).await
148 }
149
150 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_schema_paths, options, config), fields(
160 crate_name = "core",
161 schema_count = xml_schema_paths.len(),
162 runtime_type = "async"
163 )))]
164 pub async fn from_xml_schemas(
165 xml_schema_paths: &[&str],
166 options: Option<RuntimeOptions>,
167 config: Option<ForgeConfig>,
168 ) -> ForgeResult<Self> {
169 let mut config = config.unwrap_or_default();
170 config.extension.xml_schema_paths =
171 xml_schema_paths.iter().map(|s| s.to_string()).collect();
172 Self::create_with_config(options.unwrap_or_default(), config).await
173 }
174
175 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(xml_content, options, config), fields(
185 crate_name = "core",
186 content_size = xml_content.len(),
187 runtime_type = "async"
188 )))]
189 pub async fn from_xml_content(
190 xml_content: &str,
191 options: Option<RuntimeOptions>,
192 config: Option<ForgeConfig>,
193 ) -> ForgeResult<Self> {
194 let base = ForgeRuntime::from_xml_content(xml_content, options, config)
195 .await?;
196 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
197 }
198
199 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(options, config), fields(
213 crate_name = "core",
214 runtime_type = "async",
215 has_middleware = !options.get_middleware_stack().is_empty()
216 )))]
217 pub async fn create_with_config(
218 options: RuntimeOptions,
219 config: ForgeConfig,
220 ) -> ForgeResult<Self> {
221 let base = ForgeRuntime::create_with_config(options, config).await?;
222 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
223 }
224
225 pub fn set_performance_config(
227 &mut self,
228 perf_config: PerformanceConfig,
229 ) {
230 self.base.update_config({
231 let mut config = self.base.get_config().clone();
232 config.performance = perf_config;
233 config
234 });
235 }
236
237 pub fn get_config(&self) -> &ForgeConfig {
239 self.base.get_config()
240 }
241
242 pub fn update_config(
244 &mut self,
245 config: ForgeConfig,
246 ) {
247 self.base.update_config(config);
248 }
249
250 fn log_performance(
252 &self,
253 operation: &str,
254 duration: Duration,
255 ) {
256 if self.base.get_config().performance.enable_monitoring
257 && duration.as_millis()
258 > self.base.get_config().performance.log_threshold_ms as u128
259 {
260 debug!("{} 耗时: {}ms", operation, duration.as_millis());
261 }
262 }
263 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command), fields(
264 crate_name = "core",
265 command_name = %command.name(),
266 runtime_type = "async"
267 )))]
268 pub async fn command(
269 &mut self,
270 command: Arc<dyn Command>,
271 ) -> ForgeResult<()> {
272 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
273 .await
274 }
275
276 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, command, meta), fields(
287 crate_name = "core",
288 command_name = %command.name(),
289 description = %description,
290 runtime_type = "async"
291 )))]
292 pub async fn command_with_meta(
293 &mut self,
294 command: Arc<dyn Command>,
295 description: String,
296 meta: serde_json::Value,
297 ) -> ForgeResult<()> {
298 let cmd_name = command.name();
299 debug!("正在执行命令: {}", cmd_name);
300
301 let mut tr = self.base.get_tr();
303 command.execute(&mut tr).await?;
304 tr.commit()?;
305 match self.dispatch_flow_with_meta(tr, description, meta).await {
307 Ok(_) => {
308 debug!("命令 '{}' 执行成功", cmd_name);
309 Ok(())
310 },
311 Err(e) => {
312 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
313 Err(e)
314 },
315 }
316 }
317 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
318 crate_name = "core",
319 tr_id = %transaction.id,
320 runtime_type = "async"
321 )))]
322 pub async fn dispatch_flow(
323 &mut self,
324 transaction: Transaction,
325 ) -> ForgeResult<()> {
326 self.dispatch_flow_with_meta(
327 transaction,
328 "".to_string(),
329 serde_json::Value::Null,
330 )
331 .await
332 }
333 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction, meta), fields(
347 crate_name = "core",
348 tr_id = %transaction.id,
349 description = %description,
350 runtime_type = "async"
351 )))]
352 pub async fn dispatch_flow_with_meta(
353 &mut self,
354 transaction: Transaction,
355 description: String,
356 meta: serde_json::Value,
357 ) -> ForgeResult<()> {
358 let start_time = std::time::Instant::now();
359 let mut current_transaction = transaction;
360 let _old_id = self.get_state().await?.version;
361 let middleware_start = std::time::Instant::now();
363 self.run_before_middleware(&mut current_transaction).await?;
364 self.log_performance("前置中间件处理", middleware_start.elapsed());
365
366 let (_id, mut rx) = self
368 .flow_engine
369 .submit_transaction((
370 self.base.get_state().clone(),
371 current_transaction,
372 ))
373 .await?;
374
375 let recv_start = std::time::Instant::now();
377 let task_receive_timeout = Duration::from_millis(
378 self.base.get_config().performance.task_receive_timeout_ms,
379 );
380 let task_result =
381 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
382 Ok(Some(result)) => result,
383 Ok(None) => {
384 return Err(error_utils::state_error(
385 "任务接收通道已关闭".to_string(),
386 ));
387 },
388 Err(_) => {
389 return Err(error_utils::state_error(format!(
390 "任务接收超时({}ms)",
391 self.base
392 .get_config()
393 .performance
394 .task_receive_timeout_ms
395 )));
396 },
397 };
398 self.log_performance("接收任务结果", recv_start.elapsed());
399
400 let Some(ProcessorResult { result: Some(result), .. }) =
402 task_result.output
403 else {
404 return Err(error_utils::state_error(
405 "任务处理结果无效".to_string(),
406 ));
407 };
408
409 let mut current_state = None;
411 let mut transactions = Vec::new();
412 transactions.extend(result.transactions);
413
414 if transactions.last().is_some() {
416 current_state = Some(result.state);
417 }
418
419 let after_start = std::time::Instant::now();
421 self.run_after_middleware(&mut current_state, &mut transactions)
422 .await?;
423 self.log_performance("后置中间件处理", after_start.elapsed());
424
425 if let Some(new_state) = current_state {
427 let old_state = self.base.get_state().clone();
428 self.base
429 .update_state_with_meta(new_state.clone(), transactions.clone(), description, meta)
430 .await?;
431
432 let event_start = std::time::Instant::now();
433 self.base
434 .emit_event(Event::TrApply {
435 old_state,
436 new_state,
437 transactions,
438 })
439 .await?;
440 self.log_performance("事件广播", event_start.elapsed());
441 }
442
443 self.log_performance("事务处理总耗时", start_time.elapsed());
444 Ok(())
445 }
446
447 #[cfg_attr(feature = "dev-tracing", tracing::instrument(skip(self, transaction), fields(
448 crate_name = "core",
449 tr_id = %transaction.id,
450 middleware_count = self.base.get_options().get_middleware_stack().middlewares.len(),
451 runtime_type = "async"
452 )))]
453 pub async fn run_before_middleware(
454 &mut self,
455 transaction: &mut Transaction,
456 ) -> ForgeResult<()> {
457 use crate::helpers::middleware_helper::MiddlewareHelper;
458
459 MiddlewareHelper::run_before_middleware(
460 transaction,
461 &self.base.get_options().get_middleware_stack(),
462 self.base.get_config(),
463 )
464 .await?;
465
466 transaction.commit()?;
467 Ok(())
468 }
469 pub async fn run_after_middleware(
470 &mut self,
471 state: &mut Option<Arc<State>>,
472 transactions: &mut Vec<Arc<Transaction>>,
473 ) -> ForgeResult<()> {
474 debug!("执行后置中间件链");
475 for middleware in
476 &self.base.get_options().get_middleware_stack().middlewares
477 {
478 let timeout = std::time::Duration::from_millis(
481 self.base.get_config().performance.middleware_timeout_ms,
482 );
483
484 let start_time = std::time::Instant::now();
486
487 let middleware_result = match tokio::time::timeout(
488 timeout,
489 middleware.after_dispatch(state.clone(), transactions),
490 )
491 .await
492 {
493 Ok(result) => match result {
494 Ok(r) => r,
495 Err(e) => {
496 debug!("中间件执行失败: {}", e);
498 return Err(error_utils::middleware_error(format!(
499 "中间件执行失败: {e}"
500 )));
501 },
502 },
503 Err(e) => {
504 debug!("中间件执行超时: {}", e);
505 return Err(error_utils::middleware_error(format!(
506 "中间件执行超时: {e}"
507 )));
508 },
509 };
510
511 let elapsed = start_time.elapsed();
513 if elapsed.as_millis() > 100 {
514 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
515 }
516
517 if let Some(mut transaction) = middleware_result {
518 transaction.commit()?;
519 let tx_start_time = std::time::Instant::now();
521
522 let result = match self
523 .flow_engine
524 .submit_transaction((
525 self.base.get_state().clone(),
526 transaction,
527 ))
528 .await
529 {
530 Ok(result) => result,
531 Err(e) => {
532 debug!("附加事务提交失败: {}", e);
533 return Err(error_utils::state_error(format!(
534 "附加事务提交失败: {e}"
535 )));
536 },
537 };
538
539 let (_id, mut rx) = result;
540
541 let task_receive_timeout = Duration::from_millis(
543 self.base.get_config().performance.task_receive_timeout_ms,
544 );
545 let task_result =
546 match tokio::time::timeout(task_receive_timeout, rx.recv())
547 .await
548 {
549 Ok(Some(result)) => result,
550 Ok(None) => {
551 debug!("附加事务接收通道已关闭");
552 return Ok(());
553 },
554 Err(_) => {
555 debug!("附加事务接收超时");
556 return Err(error_utils::state_error(format!(
557 "附加事务接收超时({}ms)",
558 self.base
559 .get_config()
560 .performance
561 .task_receive_timeout_ms
562 )));
563 },
564 };
565
566 let Some(ProcessorResult { result: Some(result), .. }) =
567 task_result.output
568 else {
569 debug!("附加事务处理结果无效");
570 return Ok(());
571 };
572
573 let TransactionResult { state: new_state, transactions: trs } =
574 result;
575 *state = Some(new_state);
576 transactions.extend(trs);
577
578 let tx_elapsed = tx_start_time.elapsed();
580 if tx_elapsed.as_millis() > 50 {
581 debug!(
582 "附加事务处理时间较长: {}ms",
583 tx_elapsed.as_millis()
584 );
585 }
586 }
587 }
588 Ok(())
589 }
590
/// Shuts the async runtime down by destroying the wrapped base runtime.
///
/// NOTE(review): the body logs that the flow engine is being closed but makes
/// no call on `self.flow_engine`; presumably the engine is released when the
/// runtime is dropped — confirm, or add an explicit shutdown call here.
///
/// # Errors
/// Returns the error from `self.base.destroy()`.
#[cfg_attr(
    feature = "dev-tracing",
    tracing::instrument(
        skip(self),
        fields(crate_name = "core", runtime_type = "async")
    )
)]
pub async fn shutdown(&mut self) -> ForgeResult<()> {
    debug!("开始关闭异步运行时");

    // Tear down the base runtime first; a failure here aborts the shutdown.
    self.base.destroy().await?;

    // Log line only: nothing is invoked on the flow engine at this point.
    debug!("正在关闭流引擎...");

    debug!("异步运行时已成功关闭");
    Ok(())
}
619}
620
621#[async_trait]
624impl RuntimeTrait for ForgeAsyncRuntime {
625 async fn dispatch(
626 &mut self,
627 transaction: Transaction,
628 ) -> ForgeResult<()> {
629 self.dispatch_flow(transaction).await
631 }
632
633 async fn dispatch_with_meta(
634 &mut self,
635 transaction: Transaction,
636 description: String,
637 meta: serde_json::Value,
638 ) -> ForgeResult<()> {
639 self.dispatch_flow_with_meta(transaction, description, meta).await
641 }
642
643 async fn command(
644 &mut self,
645 command: Arc<dyn Command>,
646 ) -> ForgeResult<()> {
647 self.command(command).await
648 }
649
650 async fn command_with_meta(
651 &mut self,
652 command: Arc<dyn Command>,
653 description: String,
654 meta: serde_json::Value,
655 ) -> ForgeResult<()> {
656 self.command_with_meta(command, description, meta).await
657 }
658
659 async fn get_state(&self) -> ForgeResult<Arc<State>> {
660 Ok(self.base.get_state().clone())
661 }
662
663 async fn get_tr(&self) -> ForgeResult<Transaction> {
664 Ok(self.base.get_tr())
665 }
666
667 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
668 Ok(self.base.get_schema())
669 }
670
671 async fn undo(&mut self) -> ForgeResult<()> {
672 self.base.undo();
673 Ok(())
674 }
675
676 async fn redo(&mut self) -> ForgeResult<()> {
677 self.base.redo();
678 Ok(())
679 }
680
681 async fn jump(
682 &mut self,
683 steps: isize,
684 ) -> ForgeResult<()> {
685 self.base.jump(steps);
686 Ok(())
687 }
688
689 fn get_config(&self) -> &ForgeConfig {
690 self.base.get_config()
691 }
692
693 fn update_config(
694 &mut self,
695 config: ForgeConfig,
696 ) {
697 self.base.update_config(config);
698 }
699
700 fn get_options(&self) -> &RuntimeOptions {
701 self.base.get_options()
702 }
703
704 async fn destroy(&mut self) -> ForgeResult<()> {
705 self.shutdown().await
706 }
707}