Skip to main content

dm_database_sqllog2db/cli/
run.rs

1use crate::config::Config;
2use crate::error::ParserError;
3use crate::error::{Error, Result};
4use crate::error_logger::ErrorLogger;
5use crate::exporter::ExporterManager;
6use crate::features::Pipeline;
7use crate::parser::SqllogParser;
8use dm_database_parser_sqllog::LogParser;
9use log::{info, warn};
10use std::time::Instant;
11
12#[cfg(feature = "filters")]
13use std::collections::HashSet;
14
15#[cfg(feature = "filters")]
16use crate::features::LogProcessor;
17
18/// 构建处理器管线
19fn build_pipeline(cfg: &Config) -> Pipeline {
20    #[allow(unused_mut)]
21    let mut pipeline = Pipeline::new();
22
23    #[cfg(feature = "filters")]
24    if let Some(f) = &cfg.features.filters {
25        if f.has_filters() {
26            pipeline.add(Box::new(FilterProcessor { filter: f.clone() }));
27        }
28    }
29
30    #[cfg(not(feature = "filters"))]
31    let _ = cfg;
32
33    pipeline
34}
35
36#[cfg(feature = "filters")]
37#[derive(Debug)]
38struct FilterProcessor {
39    filter: crate::features::FiltersFeature,
40}
41
42#[cfg(feature = "filters")]
43impl LogProcessor for FilterProcessor {
44    fn process(&self, record: &dm_database_parser_sqllog::Sqllog) -> bool {
45        let meta = record.parse_meta();
46        self.filter.should_keep(
47            record.ts.as_ref(),
48            &meta.trxid,
49            &meta.client_ip,
50            &meta.sess_id,
51            &meta.thrd_id,
52            &meta.username,
53            &meta.statement,
54            &meta.appname,
55            record.tag.as_deref(),
56        )
57    }
58}
59
60fn process_log_file(
61    file_path: &str,
62    file_index: usize,
63    total_files: usize,
64    exporter_manager: &mut ExporterManager,
65    error_logger: &mut ErrorLogger,
66    pipeline: &Pipeline,
67) -> Result<()> {
68    let file_start = Instant::now();
69    eprintln!("[{file_index}/{total_files}] Processing: {file_path}");
70
71    let parser = LogParser::from_path(file_path).map_err(|e| {
72        Error::Parser(ParserError::InvalidPath {
73            path: file_path.into(),
74            reason: format!("{e}"),
75        })
76    })?;
77
78    let mut records_in_file = 0usize;
79    let mut errors_in_file = 0usize;
80    let mut batch = Vec::with_capacity(5000);
81
82    if pipeline.is_empty() {
83        // 快速路径:无处理器,零开销循环
84        for result in parser.iter() {
85            match result {
86                Ok(record) => {
87                    batch.push(record);
88                    if batch.len() >= 5000 {
89                        records_in_file += batch.len();
90                        exporter_manager.export_batch(&batch)?;
91                        batch.clear();
92                    }
93                }
94                Err(e) => {
95                    errors_in_file += 1;
96                    if !batch.is_empty() {
97                        records_in_file += batch.len();
98                        exporter_manager.export_batch(&batch)?;
99                        batch.clear();
100                    }
101                    if let Err(log_err) = error_logger.log_parse_error(file_path, &e) {
102                        warn!("Failed to record parse error: {log_err}");
103                    }
104                }
105            }
106        }
107    } else {
108        // 管线路径:每条记录经过处理器过滤
109        for result in parser.iter() {
110            match result {
111                Ok(record) => {
112                    if pipeline.run(&record) {
113                        batch.push(record);
114                        if batch.len() >= 5000 {
115                            records_in_file += batch.len();
116                            exporter_manager.export_batch(&batch)?;
117                            batch.clear();
118                        }
119                    }
120                }
121                Err(e) => {
122                    errors_in_file += 1;
123                    if !batch.is_empty() {
124                        records_in_file += batch.len();
125                        exporter_manager.export_batch(&batch)?;
126                        batch.clear();
127                    }
128                    if let Err(log_err) = error_logger.log_parse_error(file_path, &e) {
129                        warn!("Failed to record parse error: {log_err}");
130                    }
131                }
132            }
133        }
134    }
135
136    if !batch.is_empty() {
137        records_in_file += batch.len();
138        exporter_manager.export_batch(&batch)?;
139    }
140
141    info!(
142        "File {}: {} records, {} errors, total {:.2}s",
143        file_path,
144        records_in_file,
145        errors_in_file,
146        file_start.elapsed().as_secs_f64()
147    );
148
149    Ok(())
150}
151
152#[cfg(feature = "filters")]
153fn scan_log_file_for_trxids(
154    file_path: &str,
155    cfg: &Config,
156    remaining_exec_ids: &mut HashSet<i64>,
157    found_trxids: &mut HashSet<String>,
158) {
159    let Ok(parser) = LogParser::from_path(file_path) else {
160        return;
161    };
162    let filters = match &cfg.features.filters {
163        Some(f) if f.has_transaction_filters() => f,
164        _ => return,
165    };
166
167    for result in parser.iter().flatten() {
168        let mut matched = false;
169        if let Some(ind) = result.parse_indicators() {
170            #[allow(clippy::cast_possible_truncation)]
171            let runtime_ms = ind.exectime.round() as i64;
172            if filters
173                .indicators
174                .matches(ind.exec_id, runtime_ms, i64::from(ind.rowcount))
175            {
176                matched = true;
177                remaining_exec_ids.remove(&ind.exec_id);
178            }
179        }
180        if !matched && filters.sql.has_filters() && filters.sql.matches(result.body().as_ref()) {
181            matched = true;
182        }
183        if matched {
184            let meta = result.parse_meta();
185            found_trxids.insert(meta.trxid.to_string());
186            if remaining_exec_ids.is_empty()
187                && filters.indicators.min_runtime_ms.is_none()
188                && filters.indicators.min_row_count.is_none()
189                && !filters.sql.has_filters()
190            {
191                break;
192            }
193        }
194    }
195}
196
197#[cfg(feature = "filters")]
198fn scan_for_trxids_by_transaction_filters(
199    log_files: &[std::path::PathBuf],
200    cfg: &Config,
201) -> HashSet<String> {
202    let mut found_trxids = HashSet::new();
203    let mut remaining_exec_ids: HashSet<i64> = cfg
204        .features
205        .filters
206        .as_ref()
207        .and_then(|f| f.indicators.exec_ids.clone())
208        .unwrap_or_default()
209        .into_iter()
210        .collect();
211
212    eprintln!(
213        "Pre-scanning {} files for transaction-level filters...",
214        log_files.len()
215    );
216
217    for log_file in log_files {
218        scan_log_file_for_trxids(
219            &log_file.to_string_lossy(),
220            cfg,
221            &mut remaining_exec_ids,
222            &mut found_trxids,
223        );
224        if remaining_exec_ids.is_empty()
225            && !cfg.features.filters.as_ref().is_some_and(|f| {
226                f.indicators.min_runtime_ms.is_some()
227                    || f.indicators.min_row_count.is_some()
228                    || f.sql.has_filters()
229            })
230        {
231            break;
232        }
233    }
234    found_trxids
235}
236
237pub fn handle_run(cfg: &Config) -> Result<()> {
238    let total_start = Instant::now();
239    let log_files = SqllogParser::new(&cfg.sqllog.directory).log_files()?;
240    if log_files.is_empty() {
241        warn!("No log files found");
242        return Ok(());
243    }
244
245    #[cfg(feature = "filters")]
246    let mut final_cfg = cfg.clone();
247    #[cfg(feature = "filters")]
248    if cfg
249        .features
250        .filters
251        .as_ref()
252        .is_some_and(crate::features::FiltersFeature::has_transaction_filters)
253    {
254        let extra_trxids = scan_for_trxids_by_transaction_filters(&log_files, cfg);
255        if let Some(f) = &mut final_cfg.features.filters {
256            f.merge_found_trxids(extra_trxids.into_iter().collect());
257        }
258    }
259    #[cfg(feature = "filters")]
260    let final_cfg = &final_cfg;
261    #[cfg(not(feature = "filters"))]
262    let final_cfg = cfg;
263
264    let pipeline = build_pipeline(final_cfg);
265    let mut exporter_manager = ExporterManager::from_config(final_cfg)?;
266    let mut error_logger = ErrorLogger::new(&final_cfg.error.file)?;
267    exporter_manager.initialize()?;
268
269    info!("Parsing and exporting SQL logs...");
270    for (idx, log_file) in log_files.iter().enumerate() {
271        process_log_file(
272            &log_file.to_string_lossy(),
273            idx + 1,
274            log_files.len(),
275            &mut exporter_manager,
276            &mut error_logger,
277            &pipeline,
278        )?;
279    }
280
281    exporter_manager.finalize()?;
282    error_logger.finalize()?;
283
284    eprintln!(
285        "\n✓ SQL Log Export Task Completed in {:.2}s",
286        total_start.elapsed().as_secs_f64()
287    );
288    exporter_manager.log_stats();
289    Ok(())
290}