Skip to main content

dm_database_sqllog2db/features/
filters.rs

1use serde::Deserialize;
2use std::collections::HashSet;
3
4/// 过滤器配置 (重构后)
5#[derive(Debug, Deserialize, Clone, Default)]
6pub struct FiltersFeature {
7    /// 是否启用过滤器
8    pub enable: bool,
9    /// 元数据过滤器 (记录级: 只要命中其中一个就保留该记录 - OR 逻辑)
10    #[serde(flatten)]
11    pub meta: MetaFilters,
12    /// 指标过滤器 (事务级: 命中即保留整笔事务 - 需要预扫描)
13    #[serde(default)]
14    pub indicators: IndicatorFilters,
15    /// SQL 内容过滤器 (事务级: 未来扩展)
16    #[serde(default)]
17    pub sql: SqlFilters,
18}
19
20/// 元数据过滤器 (Record-level)
21#[derive(Debug, Deserialize, Clone, Default)]
22pub struct MetaFilters {
23    pub start_ts: Option<String>,
24    pub end_ts: Option<String>,
25    pub sess_ids: Option<Vec<String>>,
26    pub thrd_ids: Option<Vec<String>>,
27    pub usernames: Option<Vec<String>>,
28    pub trxids: Option<Vec<String>>,
29    pub statements: Option<Vec<String>>,
30    pub appnames: Option<Vec<String>>,
31    pub client_ips: Option<Vec<String>>,
32    pub tags: Option<Vec<String>>,
33}
34
35/// 指标过滤器 (Transaction-level)
36#[derive(Debug, Deserialize, Clone, Default)]
37pub struct IndicatorFilters {
38    pub exec_ids: Option<Vec<i64>>,
39    pub min_runtime_ms: Option<i64>,
40    pub min_row_count: Option<i64>,
41}
42
43/// SQL 过滤器 (未来扩展)
44#[derive(Debug, Deserialize, Clone, Default)]
45pub struct SqlFilters {
46    pub include_patterns: Option<Vec<String>>,
47    pub exclude_patterns: Option<Vec<String>>,
48}
49
50impl FiltersFeature {
51    pub fn validate() {}
52
53    /// 检查是否配置了任何过滤器
54    #[must_use]
55    pub fn has_filters(&self) -> bool {
56        self.meta.start_ts.is_some()
57            || self.meta.end_ts.is_some()
58            || self.meta.has_filters()
59            || self.indicators.has_filters()
60            || self.sql.has_filters()
61    }
62
63    /// 检查是否提供了需要预扫描的过滤器 (Transaction-level)
64    #[must_use]
65    pub fn has_transaction_filters(&self) -> bool {
66        // 如果未开启过滤器功能,则不执行预扫描
67        if !self.enable && !self.has_filters() {
68            return false;
69        }
70        self.indicators.has_filters() || self.sql.has_filters()
71    }
72
73    /// 检查记录是否应该被保留
74    /// 逻辑:(满足时间过滤) AND ( (没有任何其他过滤) OR (满足任一元数据过滤) OR (属于被选中的事务) )
75    #[allow(clippy::too_many_arguments)]
76    #[must_use]
77    pub fn should_keep(
78        &self,
79        ts: &str,
80        trxid: &str,
81        ip: &str,
82        sess: &str,
83        thrd: &str,
84        user: &str,
85        stmt: &str,
86        app: &str,
87        tag: Option<&str>,
88    ) -> bool {
89        // 如果未配置任何过滤器,保留所有记录
90        if !self.has_filters() {
91            return true;
92        }
93
94        // 如果配置了过滤器但未明确启用,且有配置项,我们就执行过滤
95        if !self.enable && !self.has_filters() {
96            return true;
97        }
98
99        // 1. 时间范围过滤 (AND 逻辑: 如果配置了时间,必须通过时间检查)
100        if let Some(start) = &self.meta.start_ts {
101            if ts < start.as_str() && !ts.starts_with(start.as_str()) {
102                return false;
103            }
104        }
105        if let Some(end) = &self.meta.end_ts {
106            if ts > end.as_str() && !ts.starts_with(end.as_str()) {
107                return false;
108            }
109        }
110
111        // 2. 元数据过滤 (OR 逻辑: 在通过时间过滤的前提下,如果配置了元数据过滤,需命中其中之一)
112        // 注意:如果 meta.has_filters() 为 false,说明只有时间过滤器或指标过滤器,
113        // 既然已经通过了时间过滤(如果有的话),且没有其他元数据要求,则暂且保留(指标过滤由后续逻辑或 merge_found_trxids 处理)
114        if !self.meta.has_filters() {
115            return true;
116        }
117
118        self.meta
119            .should_keep(ts, trxid, ip, sess, thrd, user, stmt, app, tag)
120    }
121
122    /// 合并预扫描发现的事务 ID 到 `MetaFilters` 中,以便在正式扫描时直接通过 trxid 匹配保留整笔事务
123    pub fn merge_found_trxids(&mut self, trxids: Vec<String>) {
124        if (!self.enable && !self.has_filters()) || trxids.is_empty() {
125            return;
126        }
127        let current = self.meta.trxids.take().unwrap_or_default();
128        let mut set: HashSet<_> = current.into_iter().collect();
129        for id in trxids {
130            set.insert(id);
131        }
132        self.meta.trxids = Some(set.into_iter().collect());
133    }
134}
135
136impl MetaFilters {
137    #[must_use]
138    pub fn has_filters(&self) -> bool {
139        self.trxids.as_ref().is_some_and(|v| !v.is_empty())
140            || self.client_ips.as_ref().is_some_and(|v| !v.is_empty())
141            || self.sess_ids.as_ref().is_some_and(|v| !v.is_empty())
142            || self.thrd_ids.as_ref().is_some_and(|v| !v.is_empty())
143            || self.usernames.as_ref().is_some_and(|v| !v.is_empty())
144            || self.statements.as_ref().is_some_and(|v| !v.is_empty())
145            || self.appnames.as_ref().is_some_and(|v| !v.is_empty())
146            || self.tags.as_ref().is_some_and(|v| !v.is_empty())
147    }
148
149    #[allow(clippy::too_many_arguments)]
150    #[must_use]
151    pub fn should_keep(
152        &self,
153        _ts: &str,
154        trxid: &str,
155        ip: &str,
156        sess: &str,
157        thrd: &str,
158        user: &str,
159        stmt: &str,
160        app: &str,
161        tag: Option<&str>,
162    ) -> bool {
163        // OR 逻辑:命中任何一个已定义的列表即保留 (前提是已通过时间过滤)
164        Self::match_list(self.trxids.as_ref(), trxid, true)
165            || Self::match_list(self.client_ips.as_ref(), ip, false)
166            || Self::match_list(self.sess_ids.as_ref(), sess, false)
167            || Self::match_list(self.thrd_ids.as_ref(), thrd, false)
168            || Self::match_list(self.usernames.as_ref(), user, false)
169            || Self::match_list(self.statements.as_ref(), stmt, false)
170            || Self::match_list(self.appnames.as_ref(), app, false)
171            || tag.is_some_and(|t| Self::match_list(self.tags.as_ref(), t, false))
172    }
173
174    fn match_list(list: Option<&Vec<String>>, val: &str, exact: bool) -> bool {
175        if let Some(items) = list {
176            if items.is_empty() {
177                return false;
178            }
179            if exact {
180                items.iter().any(|i| i == val)
181            } else {
182                items.iter().any(|i| val.contains(i))
183            }
184        } else {
185            false
186        }
187    }
188}
189
190impl IndicatorFilters {
191    #[must_use]
192    pub fn has_filters(&self) -> bool {
193        self.exec_ids.as_ref().is_some_and(|v| !v.is_empty())
194            || self.min_runtime_ms.is_some()
195            || self.min_row_count.is_some()
196    }
197
198    #[must_use]
199    pub fn matches(&self, exec_id: i64, runtime_ms: i64, row_count: i64) -> bool {
200        if !self.has_filters() {
201            return false;
202        }
203
204        if let Some(ids) = &self.exec_ids {
205            if ids.contains(&exec_id) {
206                return true;
207            }
208        }
209        if let Some(min_t) = self.min_runtime_ms {
210            if runtime_ms >= min_t {
211                return true;
212            }
213        }
214        if let Some(min_r) = self.min_row_count {
215            if row_count >= min_r {
216                return true;
217            }
218        }
219        false
220    }
221}
222
223impl SqlFilters {
224    #[must_use]
225    pub fn has_filters(&self) -> bool {
226        self.include_patterns
227            .as_ref()
228            .is_some_and(|v| !v.is_empty())
229            || self
230                .exclude_patterns
231                .as_ref()
232                .is_some_and(|v| !v.is_empty())
233    }
234
235    #[must_use]
236    pub fn matches(&self, sql: &str) -> bool {
237        if !self.has_filters() {
238            return false;
239        }
240
241        // 如果指定了包含模式,必须命中其中之一
242        let include_match = if let Some(patterns) = &self.include_patterns {
243            if patterns.is_empty() {
244                true
245            } else {
246                patterns.iter().any(|p| sql.contains(p))
247            }
248        } else {
249            true
250        };
251
252        if !include_match {
253            return false;
254        }
255
256        // 如果指定了排除模式,不能命中任何一个
257        if let Some(patterns) = &self.exclude_patterns {
258            if patterns.iter().any(|p| sql.contains(p)) {
259                return false;
260            }
261        }
262
263        true
264    }
265}