// sql_splitter/analyzer/mod.rs
use crate::parser::{determine_buffer_size, Parser, SqlDialect, StatementType};
4use crate::progress::ProgressReader;
5use crate::splitter::Compression;
6use ahash::AHashMap;
7use serde::Serialize;
8use std::fs::File;
9use std::io::Read;
10use std::path::PathBuf;
11
/// Aggregated statistics for a single table encountered while scanning a SQL dump.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Table name as extracted from the parsed statements.
    pub table_name: String,
    /// Number of INSERT (and PostgreSQL COPY) statements targeting this table.
    pub insert_count: u64,
    /// Number of CREATE TABLE statements seen for this table.
    pub create_count: u64,
    /// Total size in bytes of all statements attributed to this table.
    pub total_bytes: u64,
    /// Total number of statements counted for this table, regardless of type.
    pub statement_count: u64,
}
26
27impl TableStats {
28 fn new(table_name: String) -> Self {
29 Self {
30 table_name,
31 insert_count: 0,
32 create_count: 0,
33 total_bytes: 0,
34 statement_count: 0,
35 }
36 }
37}
38
/// Scans a (possibly compressed) SQL dump file and accumulates per-table
/// statement statistics.
///
/// Construct with [`Analyzer::new`], optionally set a dialect via
/// [`Analyzer::with_dialect`], then consume with `analyze` or
/// `analyze_with_progress`.
pub struct Analyzer {
    // Path to the SQL dump; compression is detected from this path.
    input_file: PathBuf,
    // Dialect used when parsing statements (defaults to `SqlDialect::default()`).
    dialect: SqlDialect,
    // Per-table stats accumulated during the scan, keyed by table name.
    stats: AHashMap<String, TableStats>,
}
45
46impl Analyzer {
47 pub fn new(input_file: PathBuf) -> Self {
49 Self {
50 input_file,
51 dialect: SqlDialect::default(),
52 stats: AHashMap::new(),
53 }
54 }
55
56 pub fn with_dialect(mut self, dialect: SqlDialect) -> Self {
58 self.dialect = dialect;
59 self
60 }
61
62 pub fn analyze(mut self) -> anyhow::Result<Vec<TableStats>> {
64 let file = File::open(&self.input_file)?;
65 let file_size = file.metadata()?.len();
66 let buffer_size = determine_buffer_size(file_size);
67 let dialect = self.dialect;
68
69 let compression = Compression::from_path(&self.input_file);
71 let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file))?;
72
73 let mut parser = Parser::with_dialect(reader, buffer_size, dialect);
74
75 while let Some(stmt) = parser.read_statement()? {
76 let (stmt_type, table_name) =
77 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
78
79 if stmt_type == StatementType::Unknown || table_name.is_empty() {
80 continue;
81 }
82
83 self.update_stats(&table_name, stmt_type, stmt.len() as u64);
84 }
85
86 Ok(self.get_sorted_stats())
87 }
88
89 pub fn analyze_with_progress<F: Fn(u64) + 'static>(
91 mut self,
92 progress_fn: F,
93 ) -> anyhow::Result<Vec<TableStats>> {
94 let file = File::open(&self.input_file)?;
95 let file_size = file.metadata()?.len();
96 let buffer_size = determine_buffer_size(file_size);
97 let dialect = self.dialect;
98
99 let compression = Compression::from_path(&self.input_file);
101 let progress_reader = ProgressReader::new(file, progress_fn);
102 let reader: Box<dyn Read> = compression.wrap_reader(Box::new(progress_reader))?;
103
104 let mut parser = Parser::with_dialect(reader, buffer_size, dialect);
105
106 while let Some(stmt) = parser.read_statement()? {
107 let (stmt_type, table_name) =
108 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
109
110 if stmt_type == StatementType::Unknown || table_name.is_empty() {
111 continue;
112 }
113
114 self.update_stats(&table_name, stmt_type, stmt.len() as u64);
115 }
116
117 Ok(self.get_sorted_stats())
118 }
119
120 fn update_stats(&mut self, table_name: &str, stmt_type: StatementType, bytes: u64) {
121 let stats = self
122 .stats
123 .entry(table_name.to_string())
124 .or_insert_with(|| TableStats::new(table_name.to_string()));
125
126 stats.statement_count += 1;
127 stats.total_bytes += bytes;
128
129 match stmt_type {
130 StatementType::CreateTable => stats.create_count += 1,
131 StatementType::Insert | StatementType::Copy => stats.insert_count += 1,
132 _ => {}
133 }
134 }
135
136 fn get_sorted_stats(&self) -> Vec<TableStats> {
137 let mut result: Vec<TableStats> = self.stats.values().cloned().collect();
138 result.sort_by(|a, b| b.insert_count.cmp(&a.insert_count));
139 result
140 }
141}