sql_splitter/redactor/
mod.rs

1//! Redactor module for anonymizing sensitive data in SQL dumps.
2//!
3//! This module provides:
4//! - YAML configuration parsing for redaction rules
5//! - Column pattern matching with glob support
6//! - Multiple redaction strategies (null, constant, hash, mask, shuffle, fake, skip)
7//! - Streaming redaction of INSERT and COPY statements
8//! - Config auto-generation from schema analysis
9
10mod config;
11mod config_generator;
12mod matcher;
13mod rewriter;
14pub mod strategy;
15
16pub use config::RedactConfig;
17// These will be used when additional features are implemented
18#[allow(unused_imports)]
19pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
20pub use config_generator::generate_config;
21pub use matcher::ColumnMatcher;
22pub use rewriter::ValueRewriter;
23pub use strategy::StrategyKind;
24
25use crate::parser::postgres_copy::parse_copy_columns;
26use crate::parser::{Parser, SqlDialect, StatementType};
27use crate::schema::{Schema, SchemaBuilder};
28use ahash::AHashMap;
29use std::fs::File;
30use std::io::{BufWriter, Write};
31use std::path::Path;
32
/// Summary statistics produced by a redaction run (returned by `Redactor::run`).
///
/// In dry-run mode nothing is rewritten, so the row/column figures are estimates
/// derived from statement counts rather than values reported by the rewriter.
#[derive(Debug, Default, serde::Serialize)]
pub struct RedactStats {
    /// Number of tables that contributed to the totals: tables with at least one
    /// redacted row (or, in dry-run mode, at least one column matched by a rule)
    pub tables_processed: usize,
    /// Total rows reported as redacted by the value rewriter; in dry-run mode,
    /// the number of INSERT/COPY statements seen for each matching table
    pub rows_redacted: u64,
    /// Total redacted-column count reported by the value rewriter, summed over
    /// statements; in dry-run mode, matched columns multiplied by statement count
    pub columns_redacted: u64,
    /// Per-table breakdown, one entry per table counted in `tables_processed`
    pub table_stats: Vec<TableRedactStats>,
    /// Non-fatal problems, e.g. tables with no `CREATE TABLE` in the input,
    /// which are passed through unchanged
    pub warnings: Vec<String>,
}
47
/// Per-table redaction statistics
#[derive(Debug, Clone, serde::Serialize)]
pub struct TableRedactStats {
    /// Table name as returned by the statement parser
    pub name: String,
    /// Rows redacted in this table; in dry-run mode, the number of INSERT/COPY
    /// statements seen for it
    pub rows_processed: u64,
    /// Redacted-column count summed over statements; in dry-run mode, the number
    /// of columns matched by the rules (NOT multiplied by the statement count,
    /// unlike `RedactStats::columns_redacted`)
    pub columns_redacted: u64,
}
55
/// Streams a SQL dump and rewrites INSERT/COPY values according to redaction rules.
///
/// `Redactor::new` reads the input once to build the schema; `Redactor::run`
/// reads it a second time to produce the redacted output (or a dry-run report).
pub struct Redactor {
    /// Input/output paths, dialect, rules, table filters and flags
    config: RedactConfig,
    /// Table definitions collected from the input's `CREATE TABLE` statements
    schema: Schema,
    /// Resolves the redaction strategy for each column of a table
    matcher: ColumnMatcher,
    /// Applies strategies to statement values; seeded from `config.seed`
    rewriter: ValueRewriter,
    /// Running statistics; moved out (and reset) when a run finishes
    stats: RedactStats,
    /// Pending COPY header for PostgreSQL (header comes before data block,
    /// which the parser returns as a separate statement)
    pending_copy: Option<PendingCopy>,
}
66
/// A PostgreSQL `COPY ... FROM stdin` header held back until its data block arrives.
struct PendingCopy {
    /// Target table of the COPY, as reported by the statement parser.
    table_name: String,
    /// Column list parsed out of the header text.
    columns: Vec<String>,
    /// Raw header bytes, emitted verbatim ahead of the (possibly redacted) data.
    header: Vec<u8>,
}
73
74impl Redactor {
75    /// Create a new Redactor with the given configuration
76    pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
77        // Build schema from input file (Pass 1)
78        let schema = Self::build_schema(&config.input, config.dialect)?;
79
80        // Build column matcher from config rules
81        let matcher = ColumnMatcher::from_config(&config)?;
82
83        // Create value rewriter with seed for reproducibility
84        let rewriter = ValueRewriter::new(config.seed, config.dialect, config.locale.clone());
85
86        Ok(Self {
87            config,
88            schema,
89            matcher,
90            rewriter,
91            stats: RedactStats::default(),
92            pending_copy: None,
93        })
94    }
95
96    /// Build schema from input file
97    fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
98        let file = File::open(input)?;
99        let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
100        let mut builder = SchemaBuilder::new();
101
102        while let Some(stmt) = parser.read_statement()? {
103            let (stmt_type, _table_name) =
104                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
105
106            if stmt_type == StatementType::CreateTable {
107                let stmt_str = String::from_utf8_lossy(&stmt);
108                builder.parse_create_table(&stmt_str);
109            }
110        }
111
112        Ok(builder.build())
113    }
114
115    /// Run the redaction process
116    pub fn run(&mut self) -> anyhow::Result<RedactStats> {
117        if self.config.dry_run {
118            return self.dry_run();
119        }
120
121        // Open output
122        let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
123            Box::new(BufWriter::new(File::create(path)?))
124        } else {
125            Box::new(std::io::stdout())
126        };
127
128        self.process_file(output)?;
129
130        Ok(std::mem::take(&mut self.stats))
131    }
132
133    /// Dry run - analyze without writing
134    fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
135        let file = File::open(&self.config.input)?;
136        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
137
138        let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
139
140        while let Some(stmt) = parser.read_statement()? {
141            let (stmt_type, table_name) =
142                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
143
144            if !table_name.is_empty()
145                && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
146            {
147                *tables_seen.entry(table_name).or_insert(0) += 1;
148            }
149        }
150
151        // Build stats from dry run
152        for (name, count) in tables_seen {
153            if let Some(table) = self.schema.get_table(&name) {
154                let columns_matched = self.matcher.count_matches(&name, table);
155                if columns_matched > 0 {
156                    self.stats.tables_processed += 1;
157                    self.stats.rows_redacted += count;
158                    self.stats.columns_redacted += columns_matched as u64 * count;
159                    self.stats.table_stats.push(TableRedactStats {
160                        name,
161                        rows_processed: count,
162                        columns_redacted: columns_matched as u64,
163                    });
164                }
165            }
166        }
167
168        Ok(std::mem::take(&mut self.stats))
169    }
170
171    /// Process the file and write redacted output
172    fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
173        let file = File::open(&self.config.input)?;
174        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
175
176        while let Some(stmt) = parser.read_statement()? {
177            let (stmt_type, table_name) =
178                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
179
180            let redacted = match stmt_type {
181                StatementType::Insert if !table_name.is_empty() => {
182                    self.redact_insert(&stmt, &table_name)?
183                }
184                StatementType::Copy if !table_name.is_empty() => {
185                    // PostgreSQL COPY: store header, wait for data block
186                    if self.config.dialect == SqlDialect::Postgres {
187                        let header_str = String::from_utf8_lossy(&stmt);
188                        let columns = parse_copy_columns(&header_str);
189                        self.pending_copy = Some(PendingCopy {
190                            header: stmt.clone(),
191                            table_name: table_name.clone(),
192                            columns,
193                        });
194                        // Don't output yet - wait for data block
195                        continue;
196                    }
197                    self.redact_copy(&stmt, &table_name)?
198                }
199                StatementType::Unknown
200                    if self.config.dialect == SqlDialect::Postgres
201                        && self.pending_copy.is_some()
202                        && (stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n")) =>
203                {
204                    // This is the COPY data block
205                    self.redact_copy_data(&stmt)?
206                }
207                _ => {
208                    // If we have a pending COPY that wasn't followed by a data block,
209                    // output it as-is
210                    if let Some(pending) = self.pending_copy.take() {
211                        output.write_all(&pending.header)?;
212                    }
213                    stmt
214                }
215            };
216
217            output.write_all(&redacted)?;
218        }
219
220        // Handle any remaining pending COPY header at EOF
221        if let Some(pending) = self.pending_copy.take() {
222            output.write_all(&pending.header)?;
223        }
224
225        output.flush()?;
226        Ok(())
227    }
228
229    /// Redact an INSERT statement
230    fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
231        // Skip if table should be excluded
232        if self.should_skip_table(table_name) {
233            return Ok(stmt.to_vec());
234        }
235
236        // Get table schema
237        let Some(table) = self.schema.get_table(table_name) else {
238            self.stats.warnings.push(format!(
239                "No schema for table '{}', passing through unchanged",
240                table_name
241            ));
242            return Ok(stmt.to_vec());
243        };
244
245        // Get strategies for each column
246        let strategies = self.matcher.get_strategies(table_name, table);
247
248        // If no columns need redaction, pass through
249        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
250            return Ok(stmt.to_vec());
251        }
252
253        // Rewrite the INSERT statement with redacted values
254        let (redacted, rows_redacted, cols_redacted) =
255            self.rewriter.rewrite_insert(stmt, table_name, table, &strategies)?;
256
257        // Update stats
258        if rows_redacted > 0 {
259            self.stats.rows_redacted += rows_redacted;
260            self.stats.columns_redacted += cols_redacted;
261
262            // Find or create table stats entry
263            if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == table_name) {
264                ts.rows_processed += rows_redacted;
265                ts.columns_redacted += cols_redacted;
266            } else {
267                self.stats.tables_processed += 1;
268                self.stats.table_stats.push(TableRedactStats {
269                    name: table_name.to_string(),
270                    rows_processed: rows_redacted,
271                    columns_redacted: cols_redacted,
272                });
273            }
274        }
275
276        Ok(redacted)
277    }
278
279    /// Redact a COPY statement (PostgreSQL)
280    fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
281        // Skip if table should be excluded
282        if self.should_skip_table(table_name) {
283            return Ok(stmt.to_vec());
284        }
285
286        // Get table schema
287        let Some(table) = self.schema.get_table(table_name) else {
288            self.stats.warnings.push(format!(
289                "No schema for table '{}', passing through unchanged",
290                table_name
291            ));
292            return Ok(stmt.to_vec());
293        };
294
295        // Get strategies for each column
296        let strategies = self.matcher.get_strategies(table_name, table);
297
298        // If no columns need redaction, pass through
299        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
300            return Ok(stmt.to_vec());
301        }
302
303        // Rewrite the COPY statement with redacted values
304        let (redacted, rows_redacted, cols_redacted) =
305            self.rewriter.rewrite_copy(stmt, table_name, table, &strategies)?;
306
307        // Update stats
308        if rows_redacted > 0 {
309            self.stats.rows_redacted += rows_redacted;
310            self.stats.columns_redacted += cols_redacted;
311
312            // Find or create table stats entry
313            if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == table_name) {
314                ts.rows_processed += rows_redacted;
315                ts.columns_redacted += cols_redacted;
316            } else {
317                self.stats.tables_processed += 1;
318                self.stats.table_stats.push(TableRedactStats {
319                    name: table_name.to_string(),
320                    rows_processed: rows_redacted,
321                    columns_redacted: cols_redacted,
322                });
323            }
324        }
325
326        Ok(redacted)
327    }
328
329    /// Redact a PostgreSQL COPY data block (comes after the header)
330    fn redact_copy_data(&mut self, data_block: &[u8]) -> anyhow::Result<Vec<u8>> {
331        let pending = self.pending_copy.take().ok_or_else(|| {
332            anyhow::anyhow!("COPY data block without pending header")
333        })?;
334
335        let table_name = &pending.table_name;
336
337        // Skip if table should be excluded
338        if self.should_skip_table(table_name) {
339            // Output header + data unchanged
340            let mut result = pending.header;
341            result.extend_from_slice(data_block);
342            return Ok(result);
343        }
344
345        // Get table schema
346        let Some(table) = self.schema.get_table(table_name) else {
347            self.stats.warnings.push(format!(
348                "No schema for table '{}', passing through unchanged",
349                table_name
350            ));
351            let mut result = pending.header;
352            result.extend_from_slice(data_block);
353            return Ok(result);
354        };
355
356        // Get strategies for each column
357        let strategies = self.matcher.get_strategies(table_name, table);
358
359        // If no columns need redaction, pass through
360        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
361            let mut result = pending.header;
362            result.extend_from_slice(data_block);
363            return Ok(result);
364        }
365
366        // Rewrite the COPY data block with redacted values
367        let (redacted_data, rows_redacted, cols_redacted) =
368            self.rewriter.rewrite_copy_data(data_block, table, &strategies, &pending.columns)?;
369
370        // Update stats
371        if rows_redacted > 0 {
372            self.stats.rows_redacted += rows_redacted;
373            self.stats.columns_redacted += cols_redacted;
374
375            if let Some(ts) = self.stats.table_stats.iter_mut().find(|t| t.name == *table_name) {
376                ts.rows_processed += rows_redacted;
377                ts.columns_redacted += cols_redacted;
378            } else {
379                self.stats.tables_processed += 1;
380                self.stats.table_stats.push(TableRedactStats {
381                    name: table_name.to_string(),
382                    rows_processed: rows_redacted,
383                    columns_redacted: cols_redacted,
384                });
385            }
386        }
387
388        // Combine header + redacted data
389        // The header typically doesn't end with newline, so add one
390        let mut result = pending.header;
391        if !result.ends_with(b"\n") {
392            result.push(b'\n');
393        }
394        result.extend_from_slice(&redacted_data);
395        Ok(result)
396    }
397
398    /// Check if a table should be skipped
399    fn should_skip_table(&self, name: &str) -> bool {
400        // Check exclude list
401        if self
402            .config
403            .exclude
404            .iter()
405            .any(|e| e.eq_ignore_ascii_case(name))
406        {
407            return true;
408        }
409
410        // Check include list (if specified)
411        if let Some(ref tables) = self.config.tables_filter {
412            if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
413                return true;
414            }
415        }
416
417        false
418    }
419}