1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43use crate::runtime::runtime::ForgeRuntime;
44use crate::types::ProcessorResult;
45use crate::{
46 config::{ForgeConfig, PerformanceConfig},
47 error::error_utils,
48 event::Event,
49 runtime::async_flow::{FlowEngine},
50 types::RuntimeOptions,
51 ForgeResult,
52};
53use mf_state::{
54 debug,
55 state::TransactionResult,
56 transaction::{Command, Transaction},
57 State,
58};
59
60pub struct ForgeAsyncRuntime {
70 base: ForgeRuntime,
71 flow_engine: FlowEngine,
72}
73
74impl Deref for ForgeAsyncRuntime {
75 type Target = ForgeRuntime;
76
77 fn deref(&self) -> &Self::Target {
78 &self.base
79 }
80}
81
82impl DerefMut for ForgeAsyncRuntime {
83 fn deref_mut(&mut self) -> &mut Self::Target {
84 &mut self.base
85 }
86}
87impl ForgeAsyncRuntime {
88 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
101 Self::create_with_config(options, ForgeConfig::default()).await
102 }
103
104 pub async fn from_xml_schema_path(
126 xml_schema_path: &str,
127 options: Option<RuntimeOptions>,
128 config: Option<ForgeConfig>,
129 ) -> ForgeResult<Self> {
130 let mut config = config.unwrap_or_default();
131 config.extension.xml_schema_paths = vec![xml_schema_path.to_string()];
132 Self::create_with_config(options.unwrap_or_default(), config).await
133 }
134
135 pub async fn from_xml_schemas(
145 xml_schema_paths: &[&str],
146 options: Option<RuntimeOptions>,
147 config: Option<ForgeConfig>,
148 ) -> ForgeResult<Self> {
149 let mut config = config.unwrap_or_default();
150 config.extension.xml_schema_paths =
151 xml_schema_paths.iter().map(|s| s.to_string()).collect();
152 Self::create_with_config(options.unwrap_or_default(), config).await
153 }
154
155 pub async fn from_xml_content(
165 xml_content: &str,
166 options: Option<RuntimeOptions>,
167 config: Option<ForgeConfig>,
168 ) -> ForgeResult<Self> {
169 let base = ForgeRuntime::from_xml_content(xml_content, options, config)
170 .await?;
171 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
172 }
173
174 pub async fn create_with_config(
188 options: RuntimeOptions,
189 config: ForgeConfig,
190 ) -> ForgeResult<Self> {
191 let base = ForgeRuntime::create_with_config(options, config).await?;
192 Ok(ForgeAsyncRuntime { base, flow_engine: FlowEngine::new().await? })
193 }
194
195 pub fn set_performance_config(
197 &mut self,
198 perf_config: PerformanceConfig,
199 ) {
200 self.base.update_config({
201 let mut config = self.base.get_config().clone();
202 config.performance = perf_config;
203 config
204 });
205 }
206
207 pub fn get_config(&self) -> &ForgeConfig {
209 self.base.get_config()
210 }
211
212 pub fn update_config(
214 &mut self,
215 config: ForgeConfig,
216 ) {
217 self.base.update_config(config);
218 }
219
220 fn log_performance(
222 &self,
223 operation: &str,
224 duration: Duration,
225 ) {
226 if self.base.get_config().performance.enable_monitoring
227 && duration.as_millis()
228 > self.base.get_config().performance.log_threshold_ms as u128
229 {
230 debug!("{} 耗时: {}ms", operation, duration.as_millis());
231 }
232 }
233 pub async fn command(
234 &mut self,
235 command: Arc<dyn Command>,
236 ) -> ForgeResult<()> {
237 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
238 .await
239 }
240
241 pub async fn command_with_meta(
252 &mut self,
253 command: Arc<dyn Command>,
254 description: String,
255 meta: serde_json::Value,
256 ) -> ForgeResult<()> {
257 let cmd_name = command.name();
258 debug!("正在执行命令: {}", cmd_name);
259
260 let mut tr = self.get_tr();
262 command.execute(&mut tr).await?;
263 tr.commit();
264 match self.dispatch_flow_with_meta(tr, description, meta).await {
266 Ok(_) => {
267 debug!("命令 '{}' 执行成功", cmd_name);
268 Ok(())
269 },
270 Err(e) => {
271 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
272 Err(e)
273 },
274 }
275 }
276 pub async fn dispatch_flow(
277 &mut self,
278 transaction: Transaction,
279 ) -> ForgeResult<()> {
280 self.dispatch_flow_with_meta(
281 transaction,
282 "".to_string(),
283 serde_json::Value::Null,
284 )
285 .await
286 }
287 pub async fn dispatch_flow_with_meta(
301 &mut self,
302 transaction: Transaction,
303 description: String,
304 meta: serde_json::Value,
305 ) -> ForgeResult<()> {
306 let start_time = std::time::Instant::now();
307 let mut current_transaction = transaction;
308 let old_id = self.get_state().version;
309 let middleware_start = std::time::Instant::now();
311 self.run_before_middleware(&mut current_transaction).await?;
312 self.log_performance("前置中间件处理", middleware_start.elapsed());
313
314 let (_id, mut rx) = self
316 .flow_engine
317 .submit_transaction((
318 self.base.get_state().clone(),
319 current_transaction,
320 ))
321 .await?;
322
323 let recv_start = std::time::Instant::now();
325 let task_receive_timeout = Duration::from_millis(
326 self.base.get_config().performance.task_receive_timeout_ms,
327 );
328 let task_result =
329 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
330 Ok(Some(result)) => result,
331 Ok(None) => {
332 return Err(error_utils::state_error(
333 "任务接收通道已关闭".to_string(),
334 ));
335 },
336 Err(_) => {
337 return Err(error_utils::state_error(format!(
338 "任务接收超时({}ms)",
339 self.base
340 .get_config()
341 .performance
342 .task_receive_timeout_ms
343 )));
344 },
345 };
346 self.log_performance("接收任务结果", recv_start.elapsed());
347
348 let Some(ProcessorResult { result: Some(result), .. }) =
350 task_result.output
351 else {
352 return Err(error_utils::state_error(
353 "任务处理结果无效".to_string(),
354 ));
355 };
356
357 let mut current_state = None;
359 let mut transactions = Vec::new();
360 transactions.extend(result.transactions);
361
362 if let Some(_) = transactions.last() {
364 current_state = Some(Arc::new(result.state));
365 }
366
367 let after_start = std::time::Instant::now();
369 self.run_after_middleware(&mut current_state, &mut transactions)
370 .await?;
371 self.log_performance("后置中间件处理", after_start.elapsed());
372
373 if let Some(state) = current_state {
375 self.base
376 .update_state_with_meta(state.clone(), description, meta)
377 .await?;
378
379 let event_start = std::time::Instant::now();
380 self.base
381 .emit_event(Event::TrApply(
382 old_id,
383 Arc::new(transactions),
384 state,
385 ))
386 .await?;
387 self.log_performance("事件广播", event_start.elapsed());
388 }
389
390 self.log_performance("事务处理总耗时", start_time.elapsed());
391 Ok(())
392 }
393
394 pub async fn run_before_middleware(
395 &mut self,
396 transaction: &mut Transaction,
397 ) -> ForgeResult<()> {
398 debug!("执行前置中间件链");
399 for middleware in
400 &self.base.get_options().get_middleware_stack().middlewares
401 {
402 let timeout = Duration::from_millis(
403 self.base.get_config().performance.middleware_timeout_ms,
404 );
405 match tokio::time::timeout(
406 timeout,
407 middleware.before_dispatch(transaction),
408 )
409 .await
410 {
411 Ok(Ok(())) => {
412 continue;
414 },
415 Ok(Err(e)) => {
416 return Err(error_utils::middleware_error(format!(
417 "前置中间件执行失败: {}",
418 e
419 )));
420 },
421 Err(_) => {
422 return Err(error_utils::middleware_error(format!(
423 "前置中间件执行超时({}ms)",
424 self.base
425 .get_config()
426 .performance
427 .middleware_timeout_ms
428 )));
429 },
430 }
431 }
432 transaction.commit();
433 Ok(())
434 }
435 pub async fn run_after_middleware(
436 &mut self,
437 state: &mut Option<Arc<State>>,
438 transactions: &mut Vec<Transaction>,
439 ) -> ForgeResult<()> {
440 debug!("执行后置中间件链");
441 for middleware in
442 &self.base.get_options().get_middleware_stack().middlewares
443 {
444 let timeout = std::time::Duration::from_millis(
447 self.base.get_config().performance.middleware_timeout_ms,
448 );
449
450 let start_time = std::time::Instant::now();
452
453 let middleware_result = match tokio::time::timeout(
454 timeout,
455 middleware.after_dispatch(state.clone(), transactions),
456 )
457 .await
458 {
459 Ok(result) => match result {
460 Ok(r) => r,
461 Err(e) => {
462 debug!("中间件执行失败: {}", e);
464 return Err(error_utils::middleware_error(format!(
465 "中间件执行失败: {}",
466 e
467 )));
468 },
469 },
470 Err(e) => {
471 debug!("中间件执行超时: {}", e);
472 return Err(error_utils::middleware_error(format!(
473 "中间件执行超时: {}",
474 e
475 )));
476 },
477 };
478
479 let elapsed = start_time.elapsed();
481 if elapsed.as_millis() > 100 {
482 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
483 }
484
485 if let Some(mut transaction) = middleware_result {
486 transaction.commit();
487 let tx_start_time = std::time::Instant::now();
489
490 let result = match self
491 .flow_engine
492 .submit_transaction((
493 self.base.get_state().clone(),
494 transaction,
495 ))
496 .await
497 {
498 Ok(result) => result,
499 Err(e) => {
500 debug!("附加事务提交失败: {}", e);
501 return Err(error_utils::state_error(format!(
502 "附加事务提交失败: {}",
503 e
504 )));
505 },
506 };
507
508 let (_id, mut rx) = result;
509
510 let task_receive_timeout = Duration::from_millis(
512 self.base.get_config().performance.task_receive_timeout_ms,
513 );
514 let task_result =
515 match tokio::time::timeout(task_receive_timeout, rx.recv())
516 .await
517 {
518 Ok(Some(result)) => result,
519 Ok(None) => {
520 debug!("附加事务接收通道已关闭");
521 return Ok(());
522 },
523 Err(_) => {
524 debug!("附加事务接收超时");
525 return Err(error_utils::state_error(format!(
526 "附加事务接收超时({}ms)",
527 self.base
528 .get_config()
529 .performance
530 .task_receive_timeout_ms
531 )));
532 },
533 };
534
535 let Some(ProcessorResult { result: Some(result), .. }) =
536 task_result.output
537 else {
538 debug!("附加事务处理结果无效");
539 return Ok(());
540 };
541
542 let TransactionResult { state: new_state, transactions: trs } =
543 result;
544 *state = Some(Arc::new(new_state));
545 transactions.extend(trs);
546
547 let tx_elapsed = tx_start_time.elapsed();
549 if tx_elapsed.as_millis() > 50 {
550 debug!(
551 "附加事务处理时间较长: {}ms",
552 tx_elapsed.as_millis()
553 );
554 }
555 }
556 }
557 Ok(())
558 }
559
560 pub async fn shutdown(&mut self) -> ForgeResult<()> {
568 debug!("开始关闭异步运行时");
569
570 self.base.destroy().await?;
572
573 debug!("正在关闭流引擎...");
577
578 debug!("异步运行时已成功关闭");
579 Ok(())
580 }
581}