1mod config;
11mod config_generator;
12mod matcher;
13mod rewriter;
14pub mod strategy;
15
16pub use config::RedactConfig;
17#[allow(unused_imports)] pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
19pub use config_generator::generate_config;
20pub use matcher::ColumnMatcher;
21pub use rewriter::ValueRewriter;
22pub use strategy::StrategyKind;
23
24use crate::parser::postgres_copy::parse_copy_columns;
25use crate::parser::{Parser, SqlDialect, StatementType};
26use crate::schema::{Schema, SchemaBuilder};
27use ahash::AHashMap;
28use schemars::JsonSchema;
29use std::fs::File;
30use std::io::{BufWriter, Write};
31use std::path::Path;
32
/// Aggregate counters and warnings produced by one redaction run (or dry run).
#[derive(Debug, Default, serde::Serialize, JsonSchema)]
pub struct RedactStats {
    /// Number of tables that received an entry in `table_stats`.
    pub tables_processed: usize,
    /// Running total of rows reported as redacted. In dry-run mode this is
    /// increased by the number of INSERT/COPY statements seen per matching table.
    pub rows_redacted: u64,
    /// Running total of redacted columns as reported by the rewriter. In dry-run
    /// mode this is the matched-column count multiplied by the statement count.
    pub columns_redacted: u64,
    /// Per-table breakdown of the totals above.
    pub table_stats: Vec<TableRedactStats>,
    /// Non-fatal messages, e.g. a table whose schema was not found.
    pub warnings: Vec<String>,
}
47
/// Redaction counters for a single table.
#[derive(Debug, Clone, serde::Serialize, JsonSchema)]
pub struct TableRedactStats {
    /// Table name as reported by the statement parser.
    pub name: String,
    /// Rows rewritten for this table; in dry-run mode, the number of INSERT/COPY
    /// statements seen for it.
    pub rows_processed: u64,
    /// Redacted-column count reported by the rewriter, accumulated across
    /// statements; in dry-run mode, the number of columns the matcher selected.
    pub columns_redacted: u64,
}
55
/// Drives redaction of a SQL dump according to a [`RedactConfig`].
pub struct Redactor {
    /// Settings consulted throughout: input/output paths, dialect, dry-run flag,
    /// and the table exclude/filter lists.
    config: RedactConfig,
    /// Table definitions collected from the input's CREATE TABLE statements.
    schema: Schema,
    /// Resolves the redaction strategies that apply to a table's columns.
    matcher: ColumnMatcher,
    /// Rewrites INSERT / COPY payloads using the resolved strategies.
    rewriter: ValueRewriter,
    /// Counters and warnings for the current run; handed to the caller (and
    /// reset to default) when `run` finishes.
    stats: RedactStats,
    /// PostgreSQL COPY header that is held back until its data block is read.
    pending_copy: Option<PendingCopy>,
}
66
/// A PostgreSQL COPY header whose data block has not been processed yet.
struct PendingCopy {
    /// Raw bytes of the COPY header statement, written back to the output verbatim.
    header: Vec<u8>,
    /// Target table named by the COPY statement.
    table_name: String,
    /// Column list extracted from the header by `parse_copy_columns`.
    columns: Vec<String>,
}
73
74impl Redactor {
75 pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
77 let schema = Self::build_schema(&config.input, config.dialect)?;
79
80 let matcher = ColumnMatcher::from_config(&config)?;
82
83 let rewriter = ValueRewriter::new(config.seed, config.dialect, config.locale.clone());
85
86 Ok(Self {
87 config,
88 schema,
89 matcher,
90 rewriter,
91 stats: RedactStats::default(),
92 pending_copy: None,
93 })
94 }
95
96 fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
98 let file = File::open(input)?;
99 let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
100 let mut builder = SchemaBuilder::new();
101
102 while let Some(stmt) = parser.read_statement()? {
103 let (stmt_type, _table_name) =
104 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
105
106 if stmt_type == StatementType::CreateTable {
107 let stmt_str = String::from_utf8_lossy(&stmt);
108 builder.parse_create_table(&stmt_str);
109 }
110 }
111
112 Ok(builder.build())
113 }
114
115 pub fn run(&mut self) -> anyhow::Result<RedactStats> {
117 if self.config.dry_run {
118 return self.dry_run();
119 }
120
121 let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
123 Box::new(BufWriter::new(File::create(path)?))
124 } else {
125 Box::new(std::io::stdout())
126 };
127
128 self.process_file(output)?;
129
130 Ok(std::mem::take(&mut self.stats))
131 }
132
133 fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
135 let file = File::open(&self.config.input)?;
136 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
137
138 let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
139
140 while let Some(stmt) = parser.read_statement()? {
141 let (stmt_type, table_name) =
142 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
143
144 if !table_name.is_empty()
145 && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
146 {
147 *tables_seen.entry(table_name).or_insert(0) += 1;
148 }
149 }
150
151 for (name, count) in tables_seen {
153 if let Some(table) = self.schema.get_table(&name) {
154 let columns_matched = self.matcher.count_matches(&name, table);
155 if columns_matched > 0 {
156 self.stats.tables_processed += 1;
157 self.stats.rows_redacted += count;
158 self.stats.columns_redacted += columns_matched as u64 * count;
159 self.stats.table_stats.push(TableRedactStats {
160 name,
161 rows_processed: count,
162 columns_redacted: columns_matched as u64,
163 });
164 }
165 }
166 }
167
168 Ok(std::mem::take(&mut self.stats))
169 }
170
171 fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
173 let file = File::open(&self.config.input)?;
174 let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
175
176 while let Some(stmt) = parser.read_statement()? {
177 let (stmt_type, table_name) =
178 Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
179
180 let redacted = match stmt_type {
181 StatementType::Insert if !table_name.is_empty() => {
182 self.redact_insert(&stmt, &table_name)?
183 }
184 StatementType::Copy if !table_name.is_empty() => {
185 if self.config.dialect == SqlDialect::Postgres {
187 let header_str = String::from_utf8_lossy(&stmt);
188 let columns = parse_copy_columns(&header_str);
189 self.pending_copy = Some(PendingCopy {
190 header: stmt.clone(),
191 table_name: table_name.clone(),
192 columns,
193 });
194 continue;
196 }
197 self.redact_copy(&stmt, &table_name)?
198 }
199 StatementType::Unknown
200 if self.config.dialect == SqlDialect::Postgres
201 && self.pending_copy.is_some()
202 && (stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n")) =>
203 {
204 self.redact_copy_data(&stmt)?
206 }
207 _ => {
208 if let Some(pending) = self.pending_copy.take() {
211 output.write_all(&pending.header)?;
212 }
213 stmt
214 }
215 };
216
217 output.write_all(&redacted)?;
218 }
219
220 if let Some(pending) = self.pending_copy.take() {
222 output.write_all(&pending.header)?;
223 }
224
225 output.flush()?;
226 Ok(())
227 }
228
229 fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
231 if self.should_skip_table(table_name) {
233 return Ok(stmt.to_vec());
234 }
235
236 let Some(table) = self.schema.get_table(table_name) else {
238 self.stats.warnings.push(format!(
239 "No schema for table '{}', passing through unchanged",
240 table_name
241 ));
242 return Ok(stmt.to_vec());
243 };
244
245 let strategies = self.matcher.get_strategies(table_name, table);
247
248 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
250 return Ok(stmt.to_vec());
251 }
252
253 let (redacted, rows_redacted, cols_redacted) =
255 self.rewriter
256 .rewrite_insert(stmt, table_name, table, &strategies)?;
257
258 if rows_redacted > 0 {
260 self.stats.rows_redacted += rows_redacted;
261 self.stats.columns_redacted += cols_redacted;
262
263 if let Some(ts) = self
265 .stats
266 .table_stats
267 .iter_mut()
268 .find(|t| t.name == table_name)
269 {
270 ts.rows_processed += rows_redacted;
271 ts.columns_redacted += cols_redacted;
272 } else {
273 self.stats.tables_processed += 1;
274 self.stats.table_stats.push(TableRedactStats {
275 name: table_name.to_string(),
276 rows_processed: rows_redacted,
277 columns_redacted: cols_redacted,
278 });
279 }
280 }
281
282 Ok(redacted)
283 }
284
285 fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
287 if self.should_skip_table(table_name) {
289 return Ok(stmt.to_vec());
290 }
291
292 let Some(table) = self.schema.get_table(table_name) else {
294 self.stats.warnings.push(format!(
295 "No schema for table '{}', passing through unchanged",
296 table_name
297 ));
298 return Ok(stmt.to_vec());
299 };
300
301 let strategies = self.matcher.get_strategies(table_name, table);
303
304 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
306 return Ok(stmt.to_vec());
307 }
308
309 let (redacted, rows_redacted, cols_redacted) =
311 self.rewriter
312 .rewrite_copy(stmt, table_name, table, &strategies)?;
313
314 if rows_redacted > 0 {
316 self.stats.rows_redacted += rows_redacted;
317 self.stats.columns_redacted += cols_redacted;
318
319 if let Some(ts) = self
321 .stats
322 .table_stats
323 .iter_mut()
324 .find(|t| t.name == table_name)
325 {
326 ts.rows_processed += rows_redacted;
327 ts.columns_redacted += cols_redacted;
328 } else {
329 self.stats.tables_processed += 1;
330 self.stats.table_stats.push(TableRedactStats {
331 name: table_name.to_string(),
332 rows_processed: rows_redacted,
333 columns_redacted: cols_redacted,
334 });
335 }
336 }
337
338 Ok(redacted)
339 }
340
341 fn redact_copy_data(&mut self, data_block: &[u8]) -> anyhow::Result<Vec<u8>> {
343 let pending = self
344 .pending_copy
345 .take()
346 .ok_or_else(|| anyhow::anyhow!("COPY data block without pending header"))?;
347
348 let table_name = &pending.table_name;
349
350 if self.should_skip_table(table_name) {
352 let mut result = pending.header;
354 result.extend_from_slice(data_block);
355 return Ok(result);
356 }
357
358 let Some(table) = self.schema.get_table(table_name) else {
360 self.stats.warnings.push(format!(
361 "No schema for table '{}', passing through unchanged",
362 table_name
363 ));
364 let mut result = pending.header;
365 result.extend_from_slice(data_block);
366 return Ok(result);
367 };
368
369 let strategies = self.matcher.get_strategies(table_name, table);
371
372 if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
374 let mut result = pending.header;
375 result.extend_from_slice(data_block);
376 return Ok(result);
377 }
378
379 let (redacted_data, rows_redacted, cols_redacted) =
381 self.rewriter
382 .rewrite_copy_data(data_block, table, &strategies, &pending.columns)?;
383
384 if rows_redacted > 0 {
386 self.stats.rows_redacted += rows_redacted;
387 self.stats.columns_redacted += cols_redacted;
388
389 if let Some(ts) = self
390 .stats
391 .table_stats
392 .iter_mut()
393 .find(|t| t.name == *table_name)
394 {
395 ts.rows_processed += rows_redacted;
396 ts.columns_redacted += cols_redacted;
397 } else {
398 self.stats.tables_processed += 1;
399 self.stats.table_stats.push(TableRedactStats {
400 name: table_name.to_string(),
401 rows_processed: rows_redacted,
402 columns_redacted: cols_redacted,
403 });
404 }
405 }
406
407 let mut result = pending.header;
410 if !result.ends_with(b"\n") {
411 result.push(b'\n');
412 }
413 result.extend_from_slice(&redacted_data);
414 Ok(result)
415 }
416
417 fn should_skip_table(&self, name: &str) -> bool {
419 if self
421 .config
422 .exclude
423 .iter()
424 .any(|e| e.eq_ignore_ascii_case(name))
425 {
426 return true;
427 }
428
429 if let Some(ref tables) = self.config.tables_filter {
431 if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
432 return true;
433 }
434 }
435
436 false
437 }
438}