Skip to main content

dm_database_sqllog2db/features/
filters.rs

1use serde::Deserialize;
2use std::collections::HashSet;
3
4/// 过滤器配置 (重构后)
5#[derive(Debug, Deserialize, Clone, Default)]
6pub struct FiltersFeature {
7    /// 是否启用过滤器
8    pub enable: bool,
9    /// 元数据过滤器 (记录级: 只要命中其中一个就保留该记录 - OR 逻辑)
10    #[serde(flatten)]
11    pub meta: MetaFilters,
12    /// 指标过滤器 (事务级: 命中即保留整笔事务 - 需要预扫描)
13    #[serde(default)]
14    pub indicators: IndicatorFilters,
15    /// SQL 内容过滤器 (事务级: 未来扩展)
16    #[serde(default)]
17    pub sql: SqlFilters,
18}
19
20/// 元数据过滤器 (Record-level)
21#[derive(Debug, Deserialize, Clone, Default)]
22pub struct MetaFilters {
23    pub start_ts: Option<String>,
24    pub end_ts: Option<String>,
25    pub sess_ids: Option<Vec<String>>,
26    pub thrd_ids: Option<Vec<String>>,
27    pub usernames: Option<Vec<String>>,
28    pub trxids: Option<Vec<String>>,
29    pub statements: Option<Vec<String>>,
30    pub appnames: Option<Vec<String>>,
31    pub client_ips: Option<Vec<String>>,
32    pub tags: Option<Vec<String>>,
33}
34
35/// 指标过滤器 (Transaction-level)
36#[derive(Debug, Deserialize, Clone, Default)]
37pub struct IndicatorFilters {
38    pub exec_ids: Option<Vec<i64>>,
39    pub min_runtime_ms: Option<i64>,
40    pub min_row_count: Option<i64>,
41}
42
43/// SQL 过滤器 (未来扩展)
44#[derive(Debug, Deserialize, Clone, Default)]
45pub struct SqlFilters {
46    pub include_patterns: Option<Vec<String>>,
47    pub exclude_patterns: Option<Vec<String>>,
48}
49
50impl FiltersFeature {
51    pub fn validate() {}
52
53    /// 检查是否配置了任何过滤器
54    #[must_use]
55    pub fn has_filters(&self) -> bool {
56        if !self.enable {
57            return false;
58        }
59        self.meta.start_ts.is_some()
60            || self.meta.end_ts.is_some()
61            || self.meta.has_filters()
62            || self.indicators.has_filters()
63            || self.sql.has_filters()
64    }
65
66    /// 检查是否提供了需要预扫描的过滤器 (Transaction-level)
67    #[must_use]
68    pub fn has_transaction_filters(&self) -> bool {
69        // 如果未开启过滤器功能,则不执行预扫描
70        if !self.enable {
71            return false;
72        }
73        self.indicators.has_filters() || self.sql.has_filters()
74    }
75
76    /// 检查记录是否应该被保留
77    /// 逻辑:(满足时间过滤) AND ( (没有任何其他过滤) OR (满足任一元数据过滤) OR (属于被选中的事务) )
78    #[allow(clippy::too_many_arguments)]
79    #[must_use]
80    pub fn should_keep(
81        &self,
82        ts: &str,
83        trxid: &str,
84        ip: &str,
85        sess: &str,
86        thrd: &str,
87        user: &str,
88        stmt: &str,
89        app: &str,
90        tag: Option<&str>,
91    ) -> bool {
92        // 1. 时间范围过滤 (AND 逻辑: 如果配置了时间,必须通过时间检查)
93        if let Some(start) = &self.meta.start_ts {
94            if ts < start.as_str() && !ts.starts_with(start.as_str()) {
95                return false;
96            }
97        }
98        if let Some(end) = &self.meta.end_ts {
99            if ts > end.as_str() && !ts.starts_with(end.as_str()) {
100                return false;
101            }
102        }
103
104        // 2. 元数据过滤 (OR 逻辑: 在通过时间过滤的前提下,如果配置了元数据过滤,需命中其中之一)
105        // 如果 meta.has_filters() 为 false,且通过了时间过滤,则保留。
106        if !self.meta.has_filters() {
107            return true;
108        }
109
110        self.meta
111            .should_keep(ts, trxid, ip, sess, thrd, user, stmt, app, tag)
112    }
113
114    /// 合并预扫描发现的事务 ID 到 `MetaFilters` 中,以便在正式扫描时直接通过 trxid 匹配保留整笔事务
115    pub fn merge_found_trxids(&mut self, trxids: Vec<String>) {
116        if (!self.enable && !self.has_filters()) || trxids.is_empty() {
117            return;
118        }
119        let current = self.meta.trxids.take().unwrap_or_default();
120        let mut set: HashSet<_> = current.into_iter().collect();
121        for id in trxids {
122            set.insert(id);
123        }
124        self.meta.trxids = Some(set.into_iter().collect());
125    }
126}
127
128impl MetaFilters {
129    #[must_use]
130    pub fn has_filters(&self) -> bool {
131        self.trxids.as_ref().is_some_and(|v| !v.is_empty())
132            || self.client_ips.as_ref().is_some_and(|v| !v.is_empty())
133            || self.sess_ids.as_ref().is_some_and(|v| !v.is_empty())
134            || self.thrd_ids.as_ref().is_some_and(|v| !v.is_empty())
135            || self.usernames.as_ref().is_some_and(|v| !v.is_empty())
136            || self.statements.as_ref().is_some_and(|v| !v.is_empty())
137            || self.appnames.as_ref().is_some_and(|v| !v.is_empty())
138            || self.tags.as_ref().is_some_and(|v| !v.is_empty())
139    }
140
141    #[allow(clippy::too_many_arguments)]
142    #[must_use]
143    pub fn should_keep(
144        &self,
145        _ts: &str,
146        trxid: &str,
147        ip: &str,
148        sess: &str,
149        thrd: &str,
150        user: &str,
151        stmt: &str,
152        app: &str,
153        tag: Option<&str>,
154    ) -> bool {
155        // OR 逻辑:命中任何一个已定义的列表即保留 (前提是已通过时间过滤)
156        Self::match_list(self.trxids.as_ref(), trxid, true)
157            || Self::match_list(self.client_ips.as_ref(), ip, false)
158            || Self::match_list(self.sess_ids.as_ref(), sess, false)
159            || Self::match_list(self.thrd_ids.as_ref(), thrd, false)
160            || Self::match_list(self.usernames.as_ref(), user, false)
161            || Self::match_list(self.statements.as_ref(), stmt, false)
162            || Self::match_list(self.appnames.as_ref(), app, false)
163            || tag.is_some_and(|t| Self::match_list(self.tags.as_ref(), t, false))
164    }
165
166    fn match_list(list: Option<&Vec<String>>, val: &str, exact: bool) -> bool {
167        if let Some(items) = list {
168            if items.is_empty() {
169                return false;
170            }
171            if exact {
172                items.iter().any(|i| i == val)
173            } else {
174                items.iter().any(|i| val.contains(i))
175            }
176        } else {
177            false
178        }
179    }
180}
181
182impl IndicatorFilters {
183    #[must_use]
184    pub fn has_filters(&self) -> bool {
185        self.exec_ids.as_ref().is_some_and(|v| !v.is_empty())
186            || self.min_runtime_ms.is_some()
187            || self.min_row_count.is_some()
188    }
189
190    #[must_use]
191    pub fn matches(&self, exec_id: i64, runtime_ms: i64, row_count: i64) -> bool {
192        if !self.has_filters() {
193            return false;
194        }
195
196        if let Some(ids) = &self.exec_ids {
197            if ids.contains(&exec_id) {
198                return true;
199            }
200        }
201        if let Some(min_t) = self.min_runtime_ms {
202            if runtime_ms >= min_t {
203                return true;
204            }
205        }
206        if let Some(min_r) = self.min_row_count {
207            if row_count >= min_r {
208                return true;
209            }
210        }
211        false
212    }
213}
214
215impl SqlFilters {
216    #[must_use]
217    pub fn has_filters(&self) -> bool {
218        self.include_patterns
219            .as_ref()
220            .is_some_and(|v| !v.is_empty())
221            || self
222                .exclude_patterns
223                .as_ref()
224                .is_some_and(|v| !v.is_empty())
225    }
226
227    #[must_use]
228    pub fn matches(&self, sql: &str) -> bool {
229        if !self.has_filters() {
230            return false;
231        }
232
233        // 如果指定了包含模式,必须命中其中之一
234        let include_match = if let Some(patterns) = &self.include_patterns {
235            if patterns.is_empty() {
236                true
237            } else {
238                patterns.iter().any(|p| sql.contains(p))
239            }
240        } else {
241            true
242        };
243
244        if !include_match {
245            return false;
246        }
247
248        // 如果指定了排除模式,不能命中任何一个
249        if let Some(patterns) = &self.exclude_patterns {
250            if patterns.iter().any(|p| sql.contains(p)) {
251                return false;
252            }
253        }
254
255        true
256    }
257}