mf_core/helpers/
middleware_helper.rs

1//! 中间件执行框架辅助模块
2//!
3//! 提供统一的中间件执行逻辑,包括:
4//! - before_dispatch 中间件链执行
5//! - after_dispatch 中间件链执行
6//! - 超时处理
7//! - 错误处理
8//! - 性能指标记录
9
10use crate::{
11    config::ForgeConfig,
12    debug::debug,
13    error::{error_utils, ForgeResult},
14    metrics,
15    middleware::MiddlewareStack,
16};
17use mf_state::{state::State, transaction::Transaction};
18use std::sync::Arc;
19use std::time::Instant;
20
21/// 中间件执行辅助器
22pub struct MiddlewareHelper;
23
24impl MiddlewareHelper {
25    /// 执行前置中间件链
26    ///
27    /// # 参数
28    /// * `transaction` - 可变的事务引用
29    /// * `middleware_stack` - 中间件栈
30    /// * `config` - Forge配置(用于获取超时设置)
31    ///
32    /// # 返回值
33    /// * `ForgeResult<()>` - 成功或错误
34    pub async fn run_before_middleware(
35        transaction: &mut Transaction,
36        middleware_stack: &MiddlewareStack,
37        config: &ForgeConfig,
38    ) -> ForgeResult<()> {
39        debug!("执行前置中间件链");
40
41        let timeout = std::time::Duration::from_millis(
42            config.performance.middleware_timeout_ms,
43        );
44
45        for middleware in &middleware_stack.middlewares {
46            let start_time = Instant::now();
47
48            match tokio::time::timeout(
49                timeout,
50                middleware.before_dispatch(transaction),
51            )
52            .await
53            {
54                Ok(Ok(())) => {
55                    // 中间件执行成功
56                    metrics::middleware_execution_duration(
57                        start_time.elapsed(),
58                        "before",
59                        middleware.name().as_str(),
60                    );
61                    continue;
62                },
63                Ok(Err(e)) => {
64                    return Err(error_utils::middleware_error(format!(
65                        "前置中间件执行失败: {e}"
66                    )));
67                },
68                Err(_) => {
69                    return Err(error_utils::middleware_error(format!(
70                        "前置中间件执行超时({}ms)",
71                        config.performance.middleware_timeout_ms
72                    )));
73                },
74            }
75        }
76
77        Ok(())
78    }
79
80    /// 执行后置中间件链
81    ///
82    /// # 参数
83    /// * `state` - 可变的状态选项引用
84    /// * `transactions` - 可变的事务列表引用
85    /// * `middleware_stack` - 中间件栈
86    /// * `config` - Forge配置(用于获取超时设置)
87    ///
88    /// # 返回值
89    /// * `ForgeResult<()>` - 成功或错误
90    pub async fn run_after_middleware(
91        state: &mut Option<Arc<State>>,
92        transactions: &mut [Arc<Transaction>],
93        middleware_stack: &MiddlewareStack,
94        config: &ForgeConfig,
95    ) -> ForgeResult<()> {
96        debug!("执行后置中间件链");
97
98        let timeout = std::time::Duration::from_millis(
99            config.performance.middleware_timeout_ms,
100        );
101
102        for middleware in &middleware_stack.middlewares {
103            let start_time = Instant::now();
104
105            let middleware_result = match tokio::time::timeout(
106                timeout,
107                middleware.after_dispatch(state.clone(), transactions),
108            )
109            .await
110            {
111                Ok(Ok(result)) => {
112                    metrics::middleware_execution_duration(
113                        start_time.elapsed(),
114                        "after",
115                        middleware.name().as_str(),
116                    );
117                    result
118                },
119                Ok(Err(e)) => {
120                    return Err(error_utils::middleware_error(format!(
121                        "后置中间件执行失败: {e}"
122                    )));
123                },
124                Err(_) => {
125                    return Err(error_utils::middleware_error(format!(
126                        "后置中间件执行超时({}ms)",
127                        config.performance.middleware_timeout_ms
128                    )));
129                },
130            };
131
132            // 处理中间件返回的额外事务
133            // 注意:原始实现中,middleware_result 是 Option<Transaction>
134            // 如果返回了 Transaction,需要由调用者进一步处理
135            // 这里我们暂时保留这个返回值,由调用者决定如何处理
136            if middleware_result.is_some() {
137                // 中间件返回了额外的事务,但这里无法直接处理
138                // 因为需要通过 flow_engine 提交,这是运行时特定的逻辑
139                // 所以这部分逻辑保留在 ForgeRuntime 中
140            }
141        }
142
143        Ok(())
144    }
145}