1mod config;
11mod config_generator;
12mod matcher;
13mod rewriter;
14pub mod strategy;
15
16pub use config::RedactConfig;
17#[allow(unused_imports)]
19pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
20pub use config_generator::generate_config;
21pub use matcher::ColumnMatcher;
22pub use rewriter::ValueRewriter;
23pub use strategy::StrategyKind;
24
25use crate::parser::postgres_copy::parse_copy_columns;
26use crate::parser::{Parser, SqlDialect, StatementType};
27use crate::schema::{Schema, SchemaBuilder};
28use ahash::AHashMap;
29use std::fs::File;
30use std::io::{BufWriter, Write};
31use std::path::Path;
32
/// Aggregate statistics produced by a [`Redactor`] run.
#[derive(Debug, Default, serde::Serialize)]
pub struct RedactStats {
    /// Number of distinct tables that contributed to the totals below.
    pub tables_processed: usize,
    /// Total rows redacted across all tables.
    ///
    /// In dry-run mode this is the number of INSERT/COPY statements seen for
    /// matching tables, not an actual row count.
    pub rows_redacted: u64,
    /// Total redacted-column count accumulated across all tables.
    pub columns_redacted: u64,
    /// Per-table breakdown of the totals above.
    pub table_stats: Vec<TableRedactStats>,
    /// Non-fatal problems, such as tables that have no known schema and were
    /// therefore passed through unchanged.
    pub warnings: Vec<String>,
}
47
/// Redaction statistics for a single table.
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableRedactStats {
    /// Table name as it appeared in the dump.
    pub name: String,
    /// Rows redacted for this table (statement count in dry-run mode).
    pub rows_processed: u64,
    /// Redacted-column count for this table: accumulated per statement in a
    /// real run, the number of matching columns in dry-run mode.
    pub columns_redacted: u64,
}
55
/// Streams a SQL dump and rewrites the values of columns selected by the
/// configuration.
pub struct Redactor {
    /// User-supplied configuration (input/output paths, dialect, filters, ...).
    config: RedactConfig,
    /// Table definitions collected from the `CREATE TABLE` statements of the input.
    schema: Schema,
    /// Decides which strategy applies to each column of a table.
    matcher: ColumnMatcher,
    /// Rewrites INSERT / COPY payloads according to the chosen strategies.
    rewriter: ValueRewriter,
    /// Statistics accumulated while processing; handed out (and reset) by `run`.
    stats: RedactStats,
    /// Postgres `COPY` header that is still waiting for its data block.
    pending_copy: Option<PendingCopy>,
}
66
/// A Postgres `COPY` header held back until the statement that follows it is seen.
struct PendingCopy {
    /// Raw bytes of the `COPY` statement, written out verbatim.
    header: Vec<u8>,
    /// Target table named by the header.
    table_name: String,
    /// Column list extracted from the header by `parse_copy_columns`.
    columns: Vec<String>,
}
73
74impl Redactor {
75 pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
77 let schema = Self::build_schema(&config.input, config.dialect)?;
79
80 let matcher = ColumnMatcher::from_config(&config)?;
82
83 let rewriter = ValueRewriter::new(config.seed, config.dialect, config.locale.clone());
85
86 Ok(Self {
87 config,
88 schema,
89 matcher,
90 rewriter,
91 stats: RedactStats::default(),
92 pending_copy: None,
93 })
94 }
95
96 fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
98 let file = File::open(input)?;
99 let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
100 let mut builder = SchemaBuilder::new();
101
102 while let Some(stmt) = parser.read_statement()? {
103 let (stmt_type, _table_name) =
104 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
105
106 if stmt_type == StatementType::CreateTable {
107 let stmt_str = String::from_utf8_lossy(&stmt);
108 builder.parse_create_table(&stmt_str);
109 }
110 }
111
112 Ok(builder.build())
113 }
114
115 pub fn run(&mut self) -> anyhow::Result<RedactStats> {
117 if self.config.dry_run {
118 return self.dry_run();
119 }
120
121 let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
123 Box::new(BufWriter::new(File::create(path)?))
124 } else {
125 Box::new(std::io::stdout())
126 };
127
128 self.process_file(output)?;
129
130 Ok(std::mem::take(&mut self.stats))
131 }
132
133 fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
135 let file = File::open(&self.config.input)?;
136 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
137
138 let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
139
140 while let Some(stmt) = parser.read_statement()? {
141 let (stmt_type, table_name) =
142 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
143
144 if !table_name.is_empty()
145 && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
146 {
147 *tables_seen.entry(table_name).or_insert(0) += 1;
148 }
149 }
150
151 for (name, count) in tables_seen {
153 if let Some(table) = self.schema.get_table(&name) {
154 let columns_matched = self.matcher.count_matches(&name, table);
155 if columns_matched > 0 {
156 self.stats.tables_processed += 1;
157 self.stats.rows_redacted += count;
158 self.stats.columns_redacted += columns_matched as u64 * count;
159 self.stats.table_stats.push(TableRedactStats {
160 name,
161 rows_processed: count,
162 columns_redacted: columns_matched as u64,
163 });
164 }
165 }
166 }
167
168 Ok(std::mem::take(&mut self.stats))
169 }
170
171 fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
173 let file = File::open(&self.config.input)?;
174 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
175
176 while let Some(stmt) = parser.read_statement()? {
177 let (stmt_type, table_name) =
178 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
179
180 let redacted = match stmt_type {
181 StatementType::Insert if !table_name.is_empty() => {
182 self.redact_insert(&stmt, &table_name)?
183 }
184 StatementType::Copy if !table_name.is_empty() => {
185 if self.config.dialect == SqlDialect::Postgres {
187 let header_str = String::from_utf8_lossy(&stmt);
188 let columns = parse_copy_columns(&header_str);
189 self.pending_copy = Some(PendingCopy {
190 header: stmt.clone(),
191 table_name: table_name.clone(),
192 columns,
193 });
194 continue;
196 }
197 self.redact_copy(&stmt, &table_name)?
198 }
199 StatementType::Unknown
200 if self.config.dialect == SqlDialect::Postgres
201 && self.pending_copy.is_some()
202 && (stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n")) =>
203 {
204 self.redact_copy_data(&stmt)?
206 }
207 _ => {
208 if let Some(pending) = self.pending_copy.take() {
211 output.write_all(&pending.header)?;
212 }
213 stmt
214 }
215 };
216
217 output.write_all(&redacted)?;
218 }
219
220 if let Some(pending) = self.pending_copy.take() {
222 output.write_all(&pending.header)?;
223 }
224
225 output.flush()?;
226 Ok(())
227 }
228
229 fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
231 if self.should_skip_table(table_name) {
233 return Ok(stmt.to_vec());
234 }
235
236 let Some(table) = self.schema.get_table(table_name) else {
238 self.stats.warnings.push(format!(
239 "No schema for table '{}', passing through unchanged",
240 table_name
241 ));
242 return Ok(stmt.to_vec());
243 };
244
245 let strategies = self.matcher.get_strategies(table_name, table);
247
248 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
250 return Ok(stmt.to_vec());
251 }
252
253 let (redacted, rows_redacted, cols_redacted) =
255 self.rewriter.rewrite_insert(stmt, table_name, table, &strategies)?;
256
257 if rows_redacted > 0 {
259 self.stats.rows_redacted += rows_redacted;
260 self.stats.columns_redacted += cols_redacted;
261
262 if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == table_name) {
264 ts.rows_processed += rows_redacted;
265 ts.columns_redacted += cols_redacted;
266 } else {
267 self.stats.tables_processed += 1;
268 self.stats.table_stats.push(TableRedactStats {
269 name: table_name.to_string(),
270 rows_processed: rows_redacted,
271 columns_redacted: cols_redacted,
272 });
273 }
274 }
275
276 Ok(redacted)
277 }
278
279 fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
281 if self.should_skip_table(table_name) {
283 return Ok(stmt.to_vec());
284 }
285
286 let Some(table) = self.schema.get_table(table_name) else {
288 self.stats.warnings.push(format!(
289 "No schema for table '{}', passing through unchanged",
290 table_name
291 ));
292 return Ok(stmt.to_vec());
293 };
294
295 let strategies = self.matcher.get_strategies(table_name, table);
297
298 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
300 return Ok(stmt.to_vec());
301 }
302
303 let (redacted, rows_redacted, cols_redacted) =
305 self.rewriter.rewrite_copy(stmt, table_name, table, &strategies)?;
306
307 if rows_redacted > 0 {
309 self.stats.rows_redacted += rows_redacted;
310 self.stats.columns_redacted += cols_redacted;
311
312 if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == table_name) {
314 ts.rows_processed += rows_redacted;
315 ts.columns_redacted += cols_redacted;
316 } else {
317 self.stats.tables_processed += 1;
318 self.stats.table_stats.push(TableRedactStats {
319 name: table_name.to_string(),
320 rows_processed: rows_redacted,
321 columns_redacted: cols_redacted,
322 });
323 }
324 }
325
326 Ok(redacted)
327 }
328
329 fn redact_copy_data(&mut self, data_block: &[u8]) -> anyhow::Result<Vec<u8>> {
331 let pending = self.pending_copy.take().ok_or_else(|| {
332 anyhow::anyhow!("COPY data block without pending header")
333 })?;
334
335 let table_name = &pending.table_name;
336
337 if self.should_skip_table(table_name) {
339 let mut result = pending.header;
341 result.extend_from_slice(data_block);
342 return Ok(result);
343 }
344
345 let Some(table) = self.schema.get_table(table_name) else {
347 self.stats.warnings.push(format!(
348 "No schema for table '{}', passing through unchanged",
349 table_name
350 ));
351 let mut result = pending.header;
352 result.extend_from_slice(data_block);
353 return Ok(result);
354 };
355
356 let strategies = self.matcher.get_strategies(table_name, table);
358
359 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
361 let mut result = pending.header;
362 result.extend_from_slice(data_block);
363 return Ok(result);
364 }
365
366 let (redacted_data, rows_redacted, cols_redacted) =
368 self.rewriter.rewrite_copy_data(data_block, table, &strategies, &pending.columns)?;
369
370 if rows_redacted > 0 {
372 self.stats.rows_redacted += rows_redacted;
373 self.stats.columns_redacted += cols_redacted;
374
375 if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == *table_name) {
376 ts.rows_processed += rows_redacted;
377 ts.columns_redacted += cols_redacted;
378 } else {
379 self.stats.tables_processed += 1;
380 self.stats.table_stats.push(TableRedactStats {
381 name: table_name.to_string(),
382 rows_processed: rows_redacted,
383 columns_redacted: cols_redacted,
384 });
385 }
386 }
387
388 let mut result = pending.header;
391 if !result.ends_with(b"\n") {
392 result.push(b'\n');
393 }
394 result.extend_from_slice(&redacted_data);
395 Ok(result)
396 }
397
398 fn should_skip_table(&self, name: &str) -> bool {
400 if self
402 .config
403 .exclude
404 .iter()
405 .any(|e| e.eq_ignore_ascii_case(name))
406 {
407 return true;
408 }
409
410 if let Some(ref tables) = self.config.tables_filter {
412 if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
413 return true;
414 }
415 }
416
417 false
418 }
419}