1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43use async_trait::async_trait;
44use crate::runtime::runtime::ForgeRuntime;
45use crate::runtime::runtime_trait::RuntimeTrait;
46use crate::types::ProcessorResult;
47use crate::{
48 config::{ForgeConfig, PerformanceConfig},
49 debug::debug,
50 error::error_utils,
51 event::Event,
52 runtime::async_flow::{FlowEngine},
53 types::RuntimeOptions,
54 ForgeResult,
55};
56use mf_model::schema::Schema;
57use mf_state::{
58 state::TransactionResult,
59 transaction::{Command, Transaction},
60 State,
61};
62
63pub struct ForgeAsyncRuntime {
73 base: ForgeRuntime,
74 flow_engine: FlowEngine,
75}
76
77impl Deref for ForgeAsyncRuntime {
78 type Target = ForgeRuntime;
79
80 fn deref(&self) -> &Self::Target {
81 &self.base
82 }
83}
84
85impl DerefMut for ForgeAsyncRuntime {
86 fn deref_mut(&mut self) -> &mut Self::Target {
87 &mut self.base
88 }
89}
90impl ForgeAsyncRuntime {
91 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
104 Self::create_with_config(options, ForgeConfig::default()).await
105 }
106
107 pub async fn from_xml_schema_path(
129 xml_schema_path: &str,
130 options: Option<RuntimeOptions>,
131 config: Option<ForgeConfig>,
132 ) -> ForgeResult<Self> {
133 let mut config = config.unwrap_or_default();
134 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
135 Self::create_with_config(options.unwrap_or_default(), config).await
136 }
137
138 pub async fn from_xml_schemas(
148 xml_schema_paths: &[&str],
149 options: Option<RuntimeOptions>,
150 config: Option<ForgeConfig>,
151 ) -> ForgeResult<Self> {
152 let mut config = config.unwrap_or_default();
153 config.extension.xml_schema_paths =
154 xml_schema_paths.iter().map(|s| s.to_string()).collect();
155 Self::create_with_config(options.unwrap_or_default(), config).await
156 }
157
158 pub async fn from_xml_content(
168 xml_content: &str,
169 options: Option<RuntimeOptions>,
170 config: Option<ForgeConfig>,
171 ) -> ForgeResult<Self> {
172 let base = ForgeRuntime::from_xml_content(xml_content, options, config)
173 .await?;
174 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
175 }
176
177 pub async fn create_with_config(
191 options: RuntimeOptions,
192 config: ForgeConfig,
193 ) -> ForgeResult<Self> {
194 let base = ForgeRuntime::create_with_config(options, config).await?;
195 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
196 }
197
198 pub fn set_performance_config(
200 &mut self,
201 perf_config: PerformanceConfig,
202 ) {
203 self.base.update_config({
204 let mut config = self.base.get_config().clone();
205 config.performance = perf_config;
206 config
207 });
208 }
209
210 pub fn get_config(&self) -> &ForgeConfig {
212 self.base.get_config()
213 }
214
215 pub fn update_config(
217 &mut self,
218 config: ForgeConfig,
219 ) {
220 self.base.update_config(config);
221 }
222
223 fn log_performance(
225 &self,
226 operation: &str,
227 duration: Duration,
228 ) {
229 if self.base.get_config().performance.enable_monitoring
230 && duration.as_millis()
231 > self.base.get_config().performance.log_threshold_ms as u128
232 {
233 debug!("{} 耗时: {}ms", operation, duration.as_millis());
234 }
235 }
236 pub async fn command(
237 &mut self,
238 command: Arc<dyn Command>,
239 ) -> ForgeResult<()> {
240 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
241 .await
242 }
243
244 pub async fn command_with_meta(
255 &mut self,
256 command: Arc<dyn Command>,
257 description: String,
258 meta: serde_json::Value,
259 ) -> ForgeResult<()> {
260 let cmd_name = command.name();
261 debug!("正在执行命令: {}", cmd_name);
262
263 let mut tr = self.base.get_tr();
265 command.execute(&mut tr).await?;
266 tr.commit()?;
267 match self.dispatch_flow_with_meta(tr, description, meta).await {
269 Ok(_) => {
270 debug!("命令 '{}' 执行成功", cmd_name);
271 Ok(())
272 },
273 Err(e) => {
274 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
275 Err(e)
276 },
277 }
278 }
279 pub async fn dispatch_flow(
280 &mut self,
281 transaction: Transaction,
282 ) -> ForgeResult<()> {
283 self.dispatch_flow_with_meta(
284 transaction,
285 "".to_string(),
286 serde_json::Value::Null,
287 )
288 .await
289 }
290 pub async fn dispatch_flow_with_meta(
304 &mut self,
305 transaction: Transaction,
306 description: String,
307 meta: serde_json::Value,
308 ) -> ForgeResult<()> {
309 let start_time = std::time::Instant::now();
310 let mut current_transaction = transaction;
311 let old_id = self.get_state().await?.version;
312 let middleware_start = std::time::Instant::now();
314 self.run_before_middleware(&mut current_transaction).await?;
315 self.log_performance("前置中间件处理", middleware_start.elapsed());
316
317 let (_id, mut rx) = self
319 .flow_engine
320 .submit_transaction((
321 self.base.get_state().clone(),
322 current_transaction,
323 ))
324 .await?;
325
326 let recv_start = std::time::Instant::now();
328 let task_receive_timeout = Duration::from_millis(
329 self.base.get_config().performance.task_receive_timeout_ms,
330 );
331 let task_result =
332 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
333 Ok(Some(result)) => result,
334 Ok(None) => {
335 return Err(error_utils::state_error(
336 "任务接收通道已关闭".to_string(),
337 ));
338 },
339 Err(_) => {
340 return Err(error_utils::state_error(format!(
341 "任务接收超时({}ms)",
342 self.base
343 .get_config()
344 .performance
345 .task_receive_timeout_ms
346 )));
347 },
348 };
349 self.log_performance("接收任务结果", recv_start.elapsed());
350
351 let Some(ProcessorResult { result: Some(result), .. }) =
353 task_result.output
354 else {
355 return Err(error_utils::state_error(
356 "任务处理结果无效".to_string(),
357 ));
358 };
359
360 let mut current_state = None;
362 let mut transactions = Vec::new();
363 transactions.extend(result.transactions);
364
365 if transactions.last().is_some() {
367 current_state = Some(result.state);
368 }
369
370 let after_start = std::time::Instant::now();
372 self.run_after_middleware(&mut current_state, &mut transactions)
373 .await?;
374 self.log_performance("后置中间件处理", after_start.elapsed());
375
376 if let Some(state) = current_state {
378 self.base
379 .update_state_with_meta(state.clone(), description, meta)
380 .await?;
381
382 let event_start = std::time::Instant::now();
383 self.base
384 .emit_event(Event::TrApply(old_id, transactions, state))
385 .await?;
386 self.log_performance("事件广播", event_start.elapsed());
387 }
388
389 self.log_performance("事务处理总耗时", start_time.elapsed());
390 Ok(())
391 }
392
393 pub async fn run_before_middleware(
394 &mut self,
395 transaction: &mut Transaction,
396 ) -> ForgeResult<()> {
397 use crate::helpers::middleware_helper::MiddlewareHelper;
398
399 MiddlewareHelper::run_before_middleware(
400 transaction,
401 &self.base.get_options().get_middleware_stack(),
402 self.base.get_config(),
403 )
404 .await?;
405
406 transaction.commit()?;
407 Ok(())
408 }
409 pub async fn run_after_middleware(
410 &mut self,
411 state: &mut Option<Arc<State>>,
412 transactions: &mut Vec<Arc<Transaction>>,
413 ) -> ForgeResult<()> {
414 debug!("执行后置中间件链");
415 for middleware in
416 &self.base.get_options().get_middleware_stack().middlewares
417 {
418 let timeout = std::time::Duration::from_millis(
421 self.base.get_config().performance.middleware_timeout_ms,
422 );
423
424 let start_time = std::time::Instant::now();
426
427 let middleware_result = match tokio::time::timeout(
428 timeout,
429 middleware.after_dispatch(state.clone(), transactions),
430 )
431 .await
432 {
433 Ok(result) => match result {
434 Ok(r) => r,
435 Err(e) => {
436 debug!("中间件执行失败: {}", e);
438 return Err(error_utils::middleware_error(format!(
439 "中间件执行失败: {e}"
440 )));
441 },
442 },
443 Err(e) => {
444 debug!("中间件执行超时: {}", e);
445 return Err(error_utils::middleware_error(format!(
446 "中间件执行超时: {e}"
447 )));
448 },
449 };
450
451 let elapsed = start_time.elapsed();
453 if elapsed.as_millis() > 100 {
454 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
455 }
456
457 if let Some(mut transaction) = middleware_result {
458 transaction.commit()?;
459 let tx_start_time = std::time::Instant::now();
461
462 let result = match self
463 .flow_engine
464 .submit_transaction((
465 self.base.get_state().clone(),
466 transaction,
467 ))
468 .await
469 {
470 Ok(result) => result,
471 Err(e) => {
472 debug!("附加事务提交失败: {}", e);
473 return Err(error_utils::state_error(format!(
474 "附加事务提交失败: {e}"
475 )));
476 },
477 };
478
479 let (_id, mut rx) = result;
480
481 let task_receive_timeout = Duration::from_millis(
483 self.base.get_config().performance.task_receive_timeout_ms,
484 );
485 let task_result =
486 match tokio::time::timeout(task_receive_timeout, rx.recv())
487 .await
488 {
489 Ok(Some(result)) => result,
490 Ok(None) => {
491 debug!("附加事务接收通道已关闭");
492 return Ok(());
493 },
494 Err(_) => {
495 debug!("附加事务接收超时");
496 return Err(error_utils::state_error(format!(
497 "附加事务接收超时({}ms)",
498 self.base
499 .get_config()
500 .performance
501 .task_receive_timeout_ms
502 )));
503 },
504 };
505
506 let Some(ProcessorResult { result: Some(result), .. }) =
507 task_result.output
508 else {
509 debug!("附加事务处理结果无效");
510 return Ok(());
511 };
512
513 let TransactionResult { state: new_state, transactions: trs } =
514 result;
515 *state = Some(new_state);
516 transactions.extend(trs);
517
518 let tx_elapsed = tx_start_time.elapsed();
520 if tx_elapsed.as_millis() > 50 {
521 debug!(
522 "附加事务处理时间较长: {}ms",
523 tx_elapsed.as_millis()
524 );
525 }
526 }
527 }
528 Ok(())
529 }
530
531 pub async fn shutdown(&mut self) -> ForgeResult<()> {
539 debug!("开始关闭异步运行时");
540
541 self.base.destroy().await?;
543
544 debug!("正在关闭流引擎...");
548
549 debug!("异步运行时已成功关闭");
550 Ok(())
551 }
552}
553
554#[async_trait]
557impl RuntimeTrait for ForgeAsyncRuntime {
558 async fn dispatch(
559 &mut self,
560 transaction: Transaction,
561 ) -> ForgeResult<()> {
562 self.dispatch_flow(transaction).await
564 }
565
566 async fn dispatch_with_meta(
567 &mut self,
568 transaction: Transaction,
569 description: String,
570 meta: serde_json::Value,
571 ) -> ForgeResult<()> {
572 self.dispatch_flow_with_meta(transaction, description, meta).await
574 }
575
576 async fn command(
577 &mut self,
578 command: Arc<dyn Command>,
579 ) -> ForgeResult<()> {
580 self.command(command).await
581 }
582
583 async fn command_with_meta(
584 &mut self,
585 command: Arc<dyn Command>,
586 description: String,
587 meta: serde_json::Value,
588 ) -> ForgeResult<()> {
589 self.command_with_meta(command, description, meta).await
590 }
591
592 async fn get_state(&self) -> ForgeResult<Arc<State>> {
593 Ok(self.base.get_state().clone())
594 }
595
596 async fn get_tr(&self) -> ForgeResult<Transaction> {
597 Ok(self.base.get_tr())
598 }
599
600 async fn get_schema(&self) -> ForgeResult<Arc<Schema>> {
601 Ok(self.base.get_schema())
602 }
603
604 async fn undo(&mut self) -> ForgeResult<()> {
605 self.base.undo();
606 Ok(())
607 }
608
609 async fn redo(&mut self) -> ForgeResult<()> {
610 self.base.redo();
611 Ok(())
612 }
613
614 async fn jump(
615 &mut self,
616 steps: isize,
617 ) -> ForgeResult<()> {
618 self.base.jump(steps);
619 Ok(())
620 }
621
622 fn get_config(&self) -> &ForgeConfig {
623 self.base.get_config()
624 }
625
626 fn update_config(
627 &mut self,
628 config: ForgeConfig,
629 ) {
630 self.base.update_config(config);
631 }
632
633 fn get_options(&self) -> &RuntimeOptions {
634 self.base.get_options()
635 }
636
637 async fn destroy(&mut self) -> ForgeResult<()> {
638 self.shutdown().await
639 }
640}