sql_splitter/redactor/
mod.rs1mod config;
11mod config_generator;
12mod matcher;
13pub mod strategy;
14
15pub use config::RedactConfig;
16#[allow(unused_imports)]
18pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
19pub use config_generator::generate_config;
20pub use matcher::ColumnMatcher;
21pub use strategy::StrategyKind;
22
23use crate::parser::{Parser, SqlDialect, StatementType};
24use crate::schema::{Schema, SchemaBuilder};
25use ahash::AHashMap;
26use std::fs::File;
27use std::io::{BufWriter, Write};
28use std::path::Path;
29
30#[derive(Debug, Default, serde::Serialize)]
32pub struct RedactStats {
33 pub tables_processed: usize,
35 pub rows_redacted: u64,
37 pub columns_redacted: u64,
39 pub table_stats: Vec<TableRedactStats>,
41 pub warnings: Vec<String>,
43}
44
45#[derive(Debug, Clone, serde::Serialize)]
47pub struct TableRedactStats {
48 pub name: String,
49 pub rows_processed: u64,
50 pub columns_redacted: u64,
51}
52
53pub struct Redactor {
55 config: RedactConfig,
56 schema: Schema,
57 matcher: ColumnMatcher,
58 stats: RedactStats,
59}
60
61impl Redactor {
62 pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
64 let schema = Self::build_schema(&config.input, config.dialect)?;
66
67 let matcher = ColumnMatcher::from_config(&config)?;
69
70 Ok(Self {
71 config,
72 schema,
73 matcher,
74 stats: RedactStats::default(),
75 })
76 }
77
78 fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
80 let file = File::open(input)?;
81 let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
82 let mut builder = SchemaBuilder::new();
83
84 while let Some(stmt) = parser.read_statement()? {
85 let (stmt_type, _table_name) =
86 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
87
88 if stmt_type == StatementType::CreateTable {
89 let stmt_str = String::from_utf8_lossy(&stmt);
90 builder.parse_create_table(&stmt_str);
91 }
92 }
93
94 Ok(builder.build())
95 }
96
97 pub fn run(&mut self) -> anyhow::Result<RedactStats> {
99 if self.config.dry_run {
100 return self.dry_run();
101 }
102
103 let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
105 Box::new(BufWriter::new(File::create(path)?))
106 } else {
107 Box::new(std::io::stdout())
108 };
109
110 self.process_file(output)?;
111
112 Ok(std::mem::take(&mut self.stats))
113 }
114
115 fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
117 let file = File::open(&self.config.input)?;
118 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
119
120 let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
121
122 while let Some(stmt) = parser.read_statement()? {
123 let (stmt_type, table_name) =
124 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
125
126 if !table_name.is_empty()
127 && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
128 {
129 *tables_seen.entry(table_name).or_insert(0) += 1;
130 }
131 }
132
133 for (name, count) in tables_seen {
135 if let Some(table) = self.schema.get_table(&name) {
136 let columns_matched = self.matcher.count_matches(&name, table);
137 if columns_matched > 0 {
138 self.stats.tables_processed += 1;
139 self.stats.rows_redacted += count;
140 self.stats.columns_redacted += columns_matched as u64 * count;
141 self.stats.table_stats.push(TableRedactStats {
142 name,
143 rows_processed: count,
144 columns_redacted: columns_matched as u64,
145 });
146 }
147 }
148 }
149
150 Ok(std::mem::take(&mut self.stats))
151 }
152
153 fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
155 let file = File::open(&self.config.input)?;
156 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
157
158 while let Some(stmt) = parser.read_statement()? {
159 let (stmt_type, table_name) =
160 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
161
162 let redacted = match stmt_type {
163 StatementType::Insert if !table_name.is_empty() => {
164 self.redact_insert(&stmt, &table_name)?
165 }
166 StatementType::Copy if !table_name.is_empty() => {
167 self.redact_copy(&stmt, &table_name)?
168 }
169 _ => stmt,
170 };
171
172 output.write_all(&redacted)?;
173 }
174
175 output.flush()?;
176 Ok(())
177 }
178
179 fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
181 if self.should_skip_table(table_name) {
183 return Ok(stmt.to_vec());
184 }
185
186 let Some(table) = self.schema.get_table(table_name) else {
188 self.stats.warnings.push(format!(
189 "No schema for table '{}', passing through unchanged",
190 table_name
191 ));
192 return Ok(stmt.to_vec());
193 };
194
195 let strategies = self.matcher.get_strategies(table_name, table);
197
198 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
200 return Ok(stmt.to_vec());
201 }
202
203 Ok(stmt.to_vec())
206 }
207
208 fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
210 if self.should_skip_table(table_name) {
212 return Ok(stmt.to_vec());
213 }
214
215 let Some(table) = self.schema.get_table(table_name) else {
217 self.stats.warnings.push(format!(
218 "No schema for table '{}', passing through unchanged",
219 table_name
220 ));
221 return Ok(stmt.to_vec());
222 };
223
224 let strategies = self.matcher.get_strategies(table_name, table);
226
227 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
229 return Ok(stmt.to_vec());
230 }
231
232 Ok(stmt.to_vec())
235 }
236
237 fn should_skip_table(&self, name: &str) -> bool {
239 if self
241 .config
242 .exclude
243 .iter()
244 .any(|e| e.eq_ignore_ascii_case(name))
245 {
246 return true;
247 }
248
249 if let Some(ref tables) = self.config.tables_filter {
251 if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
252 return true;
253 }
254 }
255
256 false
257 }
258}