dm_database_sqllog2db/cli/
run.rs1use crate::config::Config;
2use crate::error::ParserError;
3use crate::error::{Error, Result};
4use crate::error_logger::ErrorLogger;
5use crate::exporter::ExporterManager;
6use crate::features::Pipeline;
7use crate::parser::SqllogParser;
8use dm_database_parser_sqllog::LogParser;
9use log::{info, warn};
10use std::time::Instant;
11
12#[cfg(feature = "filters")]
13use std::collections::HashSet;
14
15#[cfg(feature = "filters")]
16use crate::features::LogProcessor;
17
18fn build_pipeline(cfg: &Config) -> Pipeline {
20 #[allow(unused_mut)]
21 let mut pipeline = Pipeline::new();
22
23 #[cfg(feature = "filters")]
24 if let Some(f) = &cfg.features.filters {
25 if f.has_filters() {
26 pipeline.add(Box::new(FilterProcessor { filter: f.clone() }));
27 }
28 }
29
30 #[cfg(not(feature = "filters"))]
31 let _ = cfg;
32
33 pipeline
34}
35
36#[cfg(feature = "filters")]
37#[derive(Debug)]
38struct FilterProcessor {
39 filter: crate::features::FiltersFeature,
40}
41
42#[cfg(feature = "filters")]
43impl LogProcessor for FilterProcessor {
44 fn process(&self, record: &dm_database_parser_sqllog::Sqllog) -> bool {
45 let meta = record.parse_meta();
46 self.filter.should_keep(
47 record.ts.as_ref(),
48 &meta.trxid,
49 &meta.client_ip,
50 &meta.sess_id,
51 &meta.thrd_id,
52 &meta.username,
53 &meta.statement,
54 &meta.appname,
55 record.tag.as_deref(),
56 )
57 }
58}
59
60fn process_log_file(
61 file_path: &str,
62 file_index: usize,
63 total_files: usize,
64 exporter_manager: &mut ExporterManager,
65 error_logger: &mut ErrorLogger,
66 pipeline: &Pipeline,
67) -> Result<()> {
68 let file_start = Instant::now();
69 eprintln!("[{file_index}/{total_files}] Processing: {file_path}");
70
71 let parser = LogParser::from_path(file_path).map_err(|e| {
72 Error::Parser(ParserError::InvalidPath {
73 path: file_path.into(),
74 reason: format!("{e}"),
75 })
76 })?;
77
78 let mut records_in_file = 0usize;
79 let mut errors_in_file = 0usize;
80 let mut batch = Vec::with_capacity(5000);
81
82 if pipeline.is_empty() {
83 for result in parser.iter() {
85 match result {
86 Ok(record) => {
87 batch.push(record);
88 if batch.len() >= 5000 {
89 records_in_file += batch.len();
90 exporter_manager.export_batch(&batch)?;
91 batch.clear();
92 }
93 }
94 Err(e) => {
95 errors_in_file += 1;
96 if !batch.is_empty() {
97 records_in_file += batch.len();
98 exporter_manager.export_batch(&batch)?;
99 batch.clear();
100 }
101 if let Err(log_err) = error_logger.log_parse_error(file_path, &e) {
102 warn!("Failed to record parse error: {log_err}");
103 }
104 }
105 }
106 }
107 } else {
108 for result in parser.iter() {
110 match result {
111 Ok(record) => {
112 if pipeline.run(&record) {
113 batch.push(record);
114 if batch.len() >= 5000 {
115 records_in_file += batch.len();
116 exporter_manager.export_batch(&batch)?;
117 batch.clear();
118 }
119 }
120 }
121 Err(e) => {
122 errors_in_file += 1;
123 if !batch.is_empty() {
124 records_in_file += batch.len();
125 exporter_manager.export_batch(&batch)?;
126 batch.clear();
127 }
128 if let Err(log_err) = error_logger.log_parse_error(file_path, &e) {
129 warn!("Failed to record parse error: {log_err}");
130 }
131 }
132 }
133 }
134 }
135
136 if !batch.is_empty() {
137 records_in_file += batch.len();
138 exporter_manager.export_batch(&batch)?;
139 }
140
141 info!(
142 "File {}: {} records, {} errors, total {:.2}s",
143 file_path,
144 records_in_file,
145 errors_in_file,
146 file_start.elapsed().as_secs_f64()
147 );
148
149 Ok(())
150}
151
152#[cfg(feature = "filters")]
153fn scan_log_file_for_trxids(
154 file_path: &str,
155 cfg: &Config,
156 remaining_exec_ids: &mut HashSet<i64>,
157 found_trxids: &mut HashSet<String>,
158) {
159 let Ok(parser) = LogParser::from_path(file_path) else {
160 return;
161 };
162 let filters = match &cfg.features.filters {
163 Some(f) if f.has_transaction_filters() => f,
164 _ => return,
165 };
166
167 for result in parser.iter().flatten() {
168 let mut matched = false;
169 if let Some(ind) = result.parse_indicators() {
170 #[allow(clippy::cast_possible_truncation)]
171 let runtime_ms = ind.exectime.round() as i64;
172 if filters
173 .indicators
174 .matches(ind.exec_id, runtime_ms, i64::from(ind.rowcount))
175 {
176 matched = true;
177 remaining_exec_ids.remove(&ind.exec_id);
178 }
179 }
180 if !matched && filters.sql.has_filters() && filters.sql.matches(result.body().as_ref()) {
181 matched = true;
182 }
183 if matched {
184 let meta = result.parse_meta();
185 found_trxids.insert(meta.trxid.to_string());
186 if remaining_exec_ids.is_empty()
187 && filters.indicators.min_runtime_ms.is_none()
188 && filters.indicators.min_row_count.is_none()
189 && !filters.sql.has_filters()
190 {
191 break;
192 }
193 }
194 }
195}
196
197#[cfg(feature = "filters")]
198fn scan_for_trxids_by_transaction_filters(
199 log_files: &[std::path::PathBuf],
200 cfg: &Config,
201) -> HashSet<String> {
202 let mut found_trxids = HashSet::new();
203 let mut remaining_exec_ids: HashSet<i64> = cfg
204 .features
205 .filters
206 .as_ref()
207 .and_then(|f| f.indicators.exec_ids.clone())
208 .unwrap_or_default()
209 .into_iter()
210 .collect();
211
212 eprintln!(
213 "Pre-scanning {} files for transaction-level filters...",
214 log_files.len()
215 );
216
217 for log_file in log_files {
218 scan_log_file_for_trxids(
219 &log_file.to_string_lossy(),
220 cfg,
221 &mut remaining_exec_ids,
222 &mut found_trxids,
223 );
224 if remaining_exec_ids.is_empty()
225 && !cfg.features.filters.as_ref().is_some_and(|f| {
226 f.indicators.min_runtime_ms.is_some()
227 || f.indicators.min_row_count.is_some()
228 || f.sql.has_filters()
229 })
230 {
231 break;
232 }
233 }
234 found_trxids
235}
236
237pub fn handle_run(cfg: &Config) -> Result<()> {
238 let total_start = Instant::now();
239 let log_files = SqllogParser::new(&cfg.sqllog.directory).log_files()?;
240 if log_files.is_empty() {
241 warn!("No log files found");
242 return Ok(());
243 }
244
245 #[cfg(feature = "filters")]
246 let mut final_cfg = cfg.clone();
247 #[cfg(feature = "filters")]
248 if cfg
249 .features
250 .filters
251 .as_ref()
252 .is_some_and(crate::features::FiltersFeature::has_transaction_filters)
253 {
254 let extra_trxids = scan_for_trxids_by_transaction_filters(&log_files, cfg);
255 if let Some(f) = &mut final_cfg.features.filters {
256 f.merge_found_trxids(extra_trxids.into_iter().collect());
257 }
258 }
259 #[cfg(feature = "filters")]
260 let final_cfg = &final_cfg;
261 #[cfg(not(feature = "filters"))]
262 let final_cfg = cfg;
263
264 let pipeline = build_pipeline(final_cfg);
265 let mut exporter_manager = ExporterManager::from_config(final_cfg)?;
266 let mut error_logger = ErrorLogger::new(&final_cfg.error.file)?;
267 exporter_manager.initialize()?;
268
269 info!("Parsing and exporting SQL logs...");
270 for (idx, log_file) in log_files.iter().enumerate() {
271 process_log_file(
272 &log_file.to_string_lossy(),
273 idx + 1,
274 log_files.len(),
275 &mut exporter_manager,
276 &mut error_logger,
277 &pipeline,
278 )?;
279 }
280
281 exporter_manager.finalize()?;
282 error_logger.finalize()?;
283
284 eprintln!(
285 "\n✓ SQL Log Export Task Completed in {:.2}s",
286 total_start.elapsed().as_secs_f64()
287 );
288 exporter_manager.log_stats();
289 Ok(())
290}