Skip to main content

dm_database_sqllog2db/features/
mod.rs

1#[cfg(feature = "filters")]
2pub mod filters;
3#[cfg(feature = "filters")]
4#[allow(unused_imports)]
5pub use filters::{FiltersFeature, IndicatorFilters, MetaFilters, SqlFilters};
6
7use dm_database_parser_sqllog::Sqllog;
8use serde::Deserialize;
9
10/// 功能开关配置
11#[derive(Debug, Deserialize, Clone, Default)]
12pub struct FeaturesConfig {
13    #[cfg(feature = "filters")]
14    pub filters: Option<FiltersFeature>,
15}
16
17impl FeaturesConfig {
18    pub fn validate() {
19        #[cfg(feature = "filters")]
20        FiltersFeature::validate();
21    }
22}
23
24/// 记录处理器接口:实现此接口即可加入处理管线
25/// 返回 true 表示保留该记录,false 表示丢弃
26pub trait LogProcessor: Send + Sync + std::fmt::Debug {
27    fn process(&self, record: &Sqllog) -> bool;
28}
29
30/// 处理管线:按顺序执行处理器,任一返回 false 则丢弃记录
31#[derive(Debug, Default)]
32pub struct Pipeline {
33    processors: Vec<Box<dyn LogProcessor>>,
34}
35
36impl Pipeline {
37    #[must_use]
38    pub fn new() -> Self {
39        Self::default()
40    }
41
42    /// 添加处理器到管线末尾
43    #[cfg(feature = "filters")]
44    pub fn add(&mut self, processor: Box<dyn LogProcessor>) {
45        self.processors.push(processor);
46    }
47
48    /// 管线是否为空(空管线可以走零开销快速路径)
49    #[must_use]
50    pub fn is_empty(&self) -> bool {
51        self.processors.is_empty()
52    }
53
54    /// 顺序执行所有处理器
55    #[inline]
56    #[must_use]
57    pub fn run(&self, record: &Sqllog) -> bool {
58        self.processors.iter().all(|p| p.process(record))
59    }
60}