1use std::{
39 ops::{Deref, DerefMut},
40 sync::Arc,
41 time::Duration,
42};
43
44use crate::runtime::ForgeRuntime;
45use crate::{
46 error_utils,
47 event::Event,
48 flow::{FlowEngine, ProcessorResult},
49 types::RuntimeOptions,
50 ForgeResult,
51};
52use moduforge_state::{
53 debug,
54 state::TransactionResult,
55 transaction::{Command, Transaction},
56 State,
57};
58
/// Timeout and timing-log settings used by [`ForgeAsyncRuntime`].
#[derive(Debug, Clone)]
pub struct PerformanceConfig {
    /// Master switch for timing logs: when `false`, `log_performance` emits nothing.
    pub enable_monitoring: bool,
    /// Time limit, in milliseconds, applied to each individual middleware call
    /// (`before_dispatch` / `after_dispatch`).
    pub middleware_timeout_ms: u64,
    /// A measured duration must exceed this many milliseconds before
    /// `log_performance` writes a log line for it.
    pub log_threshold_ms: u64,
    /// Maximum time, in milliseconds, to wait on the result channel returned by
    /// the flow engine after a transaction has been submitted.
    pub task_receive_timeout_ms: u64,
}
75
76impl Default for PerformanceConfig {
77 fn default() -> Self {
78 Self {
79 enable_monitoring: false,
80 middleware_timeout_ms: 500,
81 log_threshold_ms: 50,
82 task_receive_timeout_ms: 5000, }
84 }
85}
86
/// Asynchronous runtime: wraps a [`ForgeRuntime`] and adds a [`FlowEngine`]
/// for transaction processing plus timeout / timing-log settings.
pub struct ForgeAsyncRuntime {
    /// The wrapped runtime; reachable through the `Deref` / `DerefMut` impls.
    base: ForgeRuntime,
    /// Engine that transactions are submitted to; each submission yields a
    /// receiver on which the processing result is awaited.
    flow_engine: FlowEngine,
    /// Timeouts and logging thresholds applied by the dispatch methods.
    perf_config: PerformanceConfig,
}
// SAFETY: NOTE(review): no invariant is stated here and nothing visible in this
// file shows that `ForgeRuntime` and `FlowEngine` may be moved to or shared
// between threads. These impls override the compiler's auto-trait analysis, so
// they are only sound if every field really is thread-safe — TODO confirm, and
// prefer deleting them if the auto traits already apply.
unsafe impl Send for ForgeAsyncRuntime {}
unsafe impl Sync for ForgeAsyncRuntime {}
96
97impl Deref for ForgeAsyncRuntime {
98 type Target = ForgeRuntime;
99
100 fn deref(&self) -> &Self::Target {
101 &self.base
102 }
103}
104
105impl DerefMut for ForgeAsyncRuntime {
106 fn deref_mut(&mut self) -> &mut Self::Target {
107 &mut self.base
108 }
109}
110impl ForgeAsyncRuntime {
111 pub async fn create(options: RuntimeOptions) -> ForgeResult<Self> {
114 let base = ForgeRuntime::create(options).await?;
115 Ok(ForgeAsyncRuntime {
116 base,
117 flow_engine: FlowEngine::new()?,
118 perf_config: PerformanceConfig::default(),
119 })
120 }
121
/// Replaces the runtime's performance settings with `config`.
///
/// The new timeouts and log threshold are read by the dispatch and
/// middleware methods the next time they run.
pub fn set_performance_config(
    &mut self,
    config: PerformanceConfig,
) {
    self.perf_config = config;
}
129
130 fn log_performance(
132 &self,
133 operation: &str,
134 duration: Duration,
135 ) {
136 if self.perf_config.enable_monitoring
137 && duration.as_millis() > self.perf_config.log_threshold_ms as u128
138 {
139 debug!("{} 耗时: {}ms", operation, duration.as_millis());
140 }
141 }
142 pub async fn command(
143 &mut self,
144 command: Arc<dyn Command>,
145 ) -> ForgeResult<()> {
146 self.command_with_meta(command, "".to_string(), serde_json::Value::Null)
147 .await
148 }
149
150 pub async fn command_with_meta(
161 &mut self,
162 command: Arc<dyn Command>,
163 description: String,
164 meta: serde_json::Value,
165 ) -> ForgeResult<()> {
166 let cmd_name = command.name();
167 debug!("正在执行命令: {}", cmd_name);
168
169 let mut tr = self.get_tr();
171 command.execute(&mut tr).await?;
172 tr.commit();
173 match self.dispatch_flow_with_meta(tr, description, meta).await {
175 Ok(_) => {
176 debug!("命令 '{}' 执行成功", cmd_name);
177 Ok(())
178 },
179 Err(e) => {
180 debug!("命令 '{}' 执行失败: {}", cmd_name, e);
181 Err(e)
182 },
183 }
184 }
185 pub async fn dispatch_flow(
186 &mut self,
187 transaction: Transaction,
188 ) -> ForgeResult<()> {
189 self.dispatch_flow_with_meta(
190 transaction,
191 "".to_string(),
192 serde_json::Value::Null,
193 )
194 .await
195 }
196 pub async fn dispatch_flow_with_meta(
210 &mut self,
211 transaction: Transaction,
212 description: String,
213 meta: serde_json::Value,
214 ) -> ForgeResult<()> {
215 let start_time = std::time::Instant::now();
216 let mut current_transaction = transaction;
217 let old_id = self.get_state().version;
218 let middleware_start = std::time::Instant::now();
220 self.run_before_middleware(&mut current_transaction).await?;
221 self.log_performance("前置中间件处理", middleware_start.elapsed());
222
223 let (_id, mut rx) = self
225 .flow_engine
226 .submit_transaction((
227 self.base.get_state().clone(),
228 current_transaction,
229 ))
230 .await?;
231
232 let recv_start = std::time::Instant::now();
234 let task_receive_timeout =
235 Duration::from_millis(self.perf_config.task_receive_timeout_ms);
236 let task_result =
237 match tokio::time::timeout(task_receive_timeout, rx.recv()).await {
238 Ok(Some(result)) => result,
239 Ok(None) => {
240 return Err(error_utils::state_error(
241 "任务接收通道已关闭".to_string(),
242 ));
243 },
244 Err(_) => {
245 return Err(error_utils::state_error(format!(
246 "任务接收超时({}ms)",
247 self.perf_config.task_receive_timeout_ms
248 )));
249 },
250 };
251 self.log_performance("接收任务结果", recv_start.elapsed());
252
253 let Some(ProcessorResult { result: Some(result), .. }) =
255 task_result.output
256 else {
257 return Err(error_utils::state_error(
258 "任务处理结果无效".to_string(),
259 ));
260 };
261
262 let mut current_state = None;
264 let mut transactions = Vec::new();
265 transactions.extend(result.transactions);
266
267 if let Some(_) = transactions.last() {
269 current_state = Some(Arc::new(result.state));
270 }
271
272 let after_start = std::time::Instant::now();
274 self.run_after_middleware(&mut current_state, &mut transactions)
275 .await?;
276 self.log_performance("后置中间件处理", after_start.elapsed());
277
278 if let Some(state) = current_state {
280 self.base
281 .update_state_with_meta(state.clone(), description, meta)
282 .await?;
283
284 let event_start = std::time::Instant::now();
285 self.base
286 .emit_event(Event::TrApply(
287 old_id,
288 Arc::new(transactions),
289 state,
290 ))
291 .await?;
292 self.log_performance("事件广播", event_start.elapsed());
293 }
294
295 self.log_performance("事务处理总耗时", start_time.elapsed());
296 Ok(())
297 }
298
299 pub async fn run_before_middleware(
300 &mut self,
301 transaction: &mut Transaction,
302 ) -> ForgeResult<()> {
303 debug!("执行前置中间件链");
304 for middleware in
305 &self.base.get_options().get_middleware_stack().middlewares
306 {
307 let timeout =
308 Duration::from_millis(self.perf_config.middleware_timeout_ms);
309 match tokio::time::timeout(
310 timeout,
311 middleware.before_dispatch(transaction),
312 )
313 .await
314 {
315 Ok(Ok(())) => {
316 continue;
318 },
319 Ok(Err(e)) => {
320 return Err(error_utils::middleware_error(format!(
321 "前置中间件执行失败: {}",
322 e
323 )));
324 },
325 Err(_) => {
326 return Err(error_utils::middleware_error(format!(
327 "前置中间件执行超时({}ms)",
328 self.perf_config.middleware_timeout_ms
329 )));
330 },
331 }
332 }
333 transaction.commit();
334 Ok(())
335 }
336 pub async fn run_after_middleware(
337 &mut self,
338 state: &mut Option<Arc<State>>,
339 transactions: &mut Vec<Transaction>,
340 ) -> ForgeResult<()> {
341 debug!("执行后置中间件链");
342 for middleware in
343 &self.base.get_options().get_middleware_stack().middlewares
344 {
345 let timeout = std::time::Duration::from_millis(
348 self.perf_config.middleware_timeout_ms,
349 );
350
351 let start_time = std::time::Instant::now();
353
354 let middleware_result = match tokio::time::timeout(
355 timeout,
356 middleware.after_dispatch(state.clone(), transactions),
357 )
358 .await
359 {
360 Ok(result) => match result {
361 Ok(r) => r,
362 Err(e) => {
363 debug!("中间件执行失败: {}", e);
365 return Err(error_utils::middleware_error(format!(
366 "中间件执行失败: {}",
367 e
368 )));
369 },
370 },
371 Err(e) => {
372 debug!("中间件执行超时: {}", e);
373 return Err(error_utils::middleware_error(format!(
374 "中间件执行超时: {}",
375 e
376 )));
377 },
378 };
379
380 let elapsed = start_time.elapsed();
382 if elapsed.as_millis() > 100 {
383 debug!("中间件执行时间较长: {}ms", elapsed.as_millis());
384 }
385
386 if let Some(mut transaction) =
387 middleware_result
388 {
389 transaction.commit();
390 let tx_start_time = std::time::Instant::now();
392
393 let result = match self
394 .flow_engine
395 .submit_transaction((
396 self.base.get_state().clone(),
397 transaction,
398 ))
399 .await
400 {
401 Ok(result) => result,
402 Err(e) => {
403 debug!("附加事务提交失败: {}", e);
404 return Err(error_utils::state_error(format!(
405 "附加事务提交失败: {}",
406 e
407 )));
408 },
409 };
410
411 let (_id, mut rx) = result;
412
413 let task_receive_timeout = Duration::from_millis(
415 self.perf_config.task_receive_timeout_ms,
416 );
417 let task_result =
418 match tokio::time::timeout(task_receive_timeout, rx.recv())
419 .await
420 {
421 Ok(Some(result)) => result,
422 Ok(None) => {
423 debug!("附加事务接收通道已关闭");
424 return Ok(());
425 },
426 Err(_) => {
427 debug!("附加事务接收超时");
428 return Err(error_utils::state_error(format!(
429 "附加事务接收超时({}ms)",
430 self.perf_config.task_receive_timeout_ms
431 )));
432 },
433 };
434
435 let Some(ProcessorResult { result: Some(result), .. }) =
436 task_result.output
437 else {
438 debug!("附加事务处理结果无效");
439 return Ok(());
440 };
441
442 let TransactionResult { state: new_state, transactions: trs } =
443 result;
444 *state = Some(Arc::new(new_state));
445 transactions.extend(trs);
446
447 let tx_elapsed = tx_start_time.elapsed();
449 if tx_elapsed.as_millis() > 50 {
450 debug!(
451 "附加事务处理时间较长: {}ms",
452 tx_elapsed.as_millis()
453 );
454 }
455 }
456 }
457 Ok(())
458 }
459}