Skip to main content

sql_splitter/analyzer/
mod.rs

1//! SQL dump analyzer for gathering per-table statistics.
2
3use crate::parser::{determine_buffer_size, Parser, SqlDialect, StatementType};
4use crate::progress::ProgressReader;
5use crate::splitter::Compression;
6use ahash::AHashMap;
7use serde::Serialize;
8use std::fs::File;
9use std::io::Read;
10use std::path::PathBuf;
11
/// Per-table statistics gathered during analysis.
///
/// One value exists per distinct table name seen in the dump. The type derives
/// `Serialize` so results can be emitted through serde.
#[derive(Debug, Clone, Serialize)]
pub struct TableStats {
    /// Name of the table, as reported by the statement parser.
    pub table_name: String,
    /// Number of INSERT (or COPY) statements for this table.
    pub insert_count: u64,
    /// Number of CREATE TABLE statements for this table.
    pub create_count: u64,
    /// Total bytes of all statements for this table (sum of each counted
    /// statement's length).
    pub total_bytes: u64,
    /// Total number of statements for this table, of any recognised type
    /// (not only INSERT/COPY/CREATE TABLE).
    pub statement_count: u64,
}
26
27impl TableStats {
28    fn new(table_name: String) -> Self {
29        Self {
30            table_name,
31            insert_count: 0,
32            create_count: 0,
33            total_bytes: 0,
34            statement_count: 0,
35        }
36    }
37}
38
/// Streaming SQL dump analyzer that gathers per-table statistics.
///
/// Built with [`Analyzer::new`], optionally configured with
/// [`Analyzer::with_dialect`], and consumed by one of the `analyze*` methods.
pub struct Analyzer {
    /// Path of the dump file to open and analyze.
    input_file: PathBuf,
    /// SQL dialect handed to the parser for statement splitting and classification.
    dialect: SqlDialect,
    /// Accumulated statistics, keyed by table name.
    stats: AHashMap<String, TableStats>,
}
45
46impl Analyzer {
47    /// Create a new analyzer for the given input file.
48    pub fn new(input_file: PathBuf) -> Self {
49        Self {
50            input_file,
51            dialect: SqlDialect::default(),
52            stats: AHashMap::new(),
53        }
54    }
55
56    /// Set the SQL dialect for parsing.
57    pub fn with_dialect(mut self, dialect: SqlDialect) -> Self {
58        self.dialect = dialect;
59        self
60    }
61
62    /// Run the analysis, returning sorted table statistics.
63    pub fn analyze(mut self) -> anyhow::Result<Vec<TableStats>> {
64        let file = File::open(&self.input_file)?;
65        let file_size = file.metadata()?.len();
66        let buffer_size = determine_buffer_size(file_size);
67        let dialect = self.dialect;
68
69        // Detect and apply decompression
70        let compression = Compression::from_path(&self.input_file);
71        let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file))?;
72
73        let mut parser = Parser::with_dialect(reader, buffer_size, dialect);
74
75        while let Some(stmt) = parser.read_statement()? {
76            let (stmt_type, table_name) =
77                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
78
79            if stmt_type == StatementType::Unknown || table_name.is_empty() {
80                continue;
81            }
82
83            self.update_stats(&table_name, stmt_type, stmt.len() as u64);
84        }
85
86        Ok(self.get_sorted_stats())
87    }
88
89    /// Run the analysis with a progress callback, returning sorted table statistics.
90    pub fn analyze_with_progress<F: Fn(u64) + 'static>(
91        mut self,
92        progress_fn: F,
93    ) -> anyhow::Result<Vec<TableStats>> {
94        let file = File::open(&self.input_file)?;
95        let file_size = file.metadata()?.len();
96        let buffer_size = determine_buffer_size(file_size);
97        let dialect = self.dialect;
98
99        // Detect and apply decompression
100        let compression = Compression::from_path(&self.input_file);
101        let progress_reader = ProgressReader::new(file, progress_fn);
102        let reader: Box<dyn Read> = compression.wrap_reader(Box::new(progress_reader))?;
103
104        let mut parser = Parser::with_dialect(reader, buffer_size, dialect);
105
106        while let Some(stmt) = parser.read_statement()? {
107            let (stmt_type, table_name) =
108                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
109
110            if stmt_type == StatementType::Unknown || table_name.is_empty() {
111                continue;
112            }
113
114            self.update_stats(&table_name, stmt_type, stmt.len() as u64);
115        }
116
117        Ok(self.get_sorted_stats())
118    }
119
120    fn update_stats(&mut self, table_name: &str, stmt_type: StatementType, bytes: u64) {
121        let stats = self
122            .stats
123            .entry(table_name.to_string())
124            .or_insert_with(|| TableStats::new(table_name.to_string()));
125
126        stats.statement_count += 1;
127        stats.total_bytes += bytes;
128
129        match stmt_type {
130            StatementType::CreateTable => stats.create_count += 1,
131            StatementType::Insert | StatementType::Copy => stats.insert_count += 1,
132            _ => {}
133        }
134    }
135
136    fn get_sorted_stats(&self) -> Vec<TableStats> {
137        let mut result: Vec<TableStats> = self.stats.values().cloned().collect();
138        result.sort_by(|a, b| b.insert_count.cmp(&a.insert_count));
139        result
140    }
141}