Skip to main content

sql_splitter/redactor/
mod.rs

1//! Redactor module for anonymizing sensitive data in SQL dumps.
2//!
3//! This module provides:
4//! - YAML configuration parsing for redaction rules
5//! - Column pattern matching with glob support
6//! - Multiple redaction strategies (null, constant, hash, mask, shuffle, fake, skip)
7//! - Streaming redaction of INSERT and COPY statements
8//! - Config auto-generation from schema analysis
9
10mod config;
11mod config_generator;
12mod matcher;
13mod rewriter;
14pub mod strategy;
15
16pub use config::RedactConfig;
17#[allow(unused_imports)] // Public API re-exports used by external consumers
18pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
19pub use config_generator::generate_config;
20pub use matcher::ColumnMatcher;
21pub use rewriter::ValueRewriter;
22pub use strategy::StrategyKind;
23
24use crate::parser::postgres_copy::parse_copy_columns;
25use crate::parser::{Parser, SqlDialect, StatementType};
26use crate::schema::{Schema, SchemaBuilder};
27use ahash::AHashMap;
28use schemars::JsonSchema;
29use std::fs::File;
30use std::io::{BufWriter, Write};
31use std::path::Path;
32
/// Statistics from redaction operation.
///
/// The counters are accumulated on the `Redactor` while statements are
/// processed and are handed back to the caller by `Redactor::run`.
#[derive(Debug, Default, serde::Serialize, JsonSchema)]
pub struct RedactStats {
    /// Number of tables processed.
    ///
    /// Incremented once per table, at the moment the table first receives an
    /// entry in `table_stats`.
    pub tables_processed: usize,
    /// Number of rows redacted.
    ///
    /// In a dry run this is the number of INSERT/COPY statements seen for
    /// tables with at least one matching column, not an exact row count.
    pub rows_redacted: u64,
    /// Number of columns redacted, summed over everything processed so far.
    pub columns_redacted: u64,
    /// Per-table statistics (one entry per table that had something redacted)
    pub table_stats: Vec<TableRedactStats>,
    /// Warning messages, e.g. a table whose schema could not be found
    pub warnings: Vec<String>,
}
47
/// Per-table redaction statistics
#[derive(Debug, Clone, serde::Serialize, JsonSchema)]
pub struct TableRedactStats {
    /// Table name as reported by the statement parser
    pub name: String,
    /// Rows processed for this table (dry run: number of INSERT/COPY statements seen)
    pub rows_processed: u64,
    /// Columns redacted for this table (dry run: number of columns matching a rule)
    pub columns_redacted: u64,
}
55
/// Main redactor struct.
///
/// Built by `Redactor::new`, which reads the input once to collect table
/// schemas; `Redactor::run` then performs the redaction (or a dry run).
pub struct Redactor {
    /// Run configuration (input/output paths, dialect, filters, seed, ...)
    config: RedactConfig,
    /// Table definitions collected from the CREATE TABLE statements of the input
    schema: Schema,
    /// Maps columns to redaction strategies, built from the config
    matcher: ColumnMatcher,
    /// Rewrites INSERT / COPY payloads according to the chosen strategies
    rewriter: ValueRewriter,
    /// Statistics accumulated during the current run
    stats: RedactStats,
    /// Pending COPY header for PostgreSQL (header comes before data block)
    pending_copy: Option<PendingCopy>,
}
66
/// Pending COPY statement awaiting data block
struct PendingCopy {
    /// Raw bytes of the COPY header statement, exactly as read from the input
    header: Vec<u8>,
    /// Target table named by the header
    table_name: String,
    /// Column list extracted from the header by `parse_copy_columns`
    columns: Vec<String>,
}
73
74impl Redactor {
75    /// Create a new Redactor with the given configuration
76    pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
77        // Build schema from input file (Pass 1)
78        let schema = Self::build_schema(&config.input, config.dialect)?;
79
80        // Build column matcher from config rules
81        let matcher = ColumnMatcher::from_config(&config)?;
82
83        // Create value rewriter with seed for reproducibility
84        let rewriter = ValueRewriter::new(config.seed, config.dialect, config.locale.clone());
85
86        Ok(Self {
87            config,
88            schema,
89            matcher,
90            rewriter,
91            stats: RedactStats::default(),
92            pending_copy: None,
93        })
94    }
95
96    /// Build schema from input file
97    fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
98        let file = File::open(input)?;
99        let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
100        let mut builder = SchemaBuilder::new();
101
102        while let Some(stmt) = parser.read_statement()? {
103            let (stmt_type, _table_name) =
104                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
105
106            if stmt_type == StatementType::CreateTable {
107                let stmt_str = String::from_utf8_lossy(&stmt);
108                builder.parse_create_table(&stmt_str);
109            }
110        }
111
112        Ok(builder.build())
113    }
114
115    /// Run the redaction process
116    pub fn run(&mut self) -> anyhow::Result<RedactStats> {
117        if self.config.dry_run {
118            return self.dry_run();
119        }
120
121        // Open output
122        let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
123            Box::new(BufWriter::new(File::create(path)?))
124        } else {
125            Box::new(std::io::stdout())
126        };
127
128        self.process_file(output)?;
129
130        Ok(std::mem::take(&mut self.stats))
131    }
132
133    /// Dry run - analyze without writing
134    fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
135        let file = File::open(&self.config.input)?;
136        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
137
138        let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
139
140        while let Some(stmt) = parser.read_statement()? {
141            let (stmt_type, table_name) =
142                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
143
144            if !table_name.is_empty()
145                && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
146            {
147                *tables_seen.entry(table_name).or_insert(0) += 1;
148            }
149        }
150
151        // Build stats from dry run
152        for (name, count) in tables_seen {
153            if let Some(table) = self.schema.get_table(&name) {
154                let columns_matched = self.matcher.count_matches(&name, table);
155                if columns_matched > 0 {
156                    self.stats.tables_processed += 1;
157                    self.stats.rows_redacted += count;
158                    self.stats.columns_redacted += columns_matched as u64 * count;
159                    self.stats.table_stats.push(TableRedactStats {
160                        name,
161                        rows_processed: count,
162                        columns_redacted: columns_matched as u64,
163                    });
164                }
165            }
166        }
167
168        Ok(std::mem::take(&mut self.stats))
169    }
170
171    /// Process the file and write redacted output
172    fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
173        let file = File::open(&self.config.input)?;
174        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
175
176        while let Some(stmt) = parser.read_statement()? {
177            let (stmt_type, table_name) =
178                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
179
180            let redacted = match stmt_type {
181                StatementType::Insert if !table_name.is_empty() => {
182                    self.redact_insert(&stmt, &table_name)?
183                }
184                StatementType::Copy if !table_name.is_empty() => {
185                    // PostgreSQL COPY: store header, wait for data block
186                    if self.config.dialect == SqlDialect::Postgres {
187                        let header_str = String::from_utf8_lossy(&stmt);
188                        let columns = parse_copy_columns(&header_str);
189                        self.pending_copy = Some(PendingCopy {
190                            header: stmt.clone(),
191                            table_name: table_name.clone(),
192                            columns,
193                        });
194                        // Don't output yet - wait for data block
195                        continue;
196                    }
197                    self.redact_copy(&stmt, &table_name)?
198                }
199                StatementType::Unknown
200                    if self.config.dialect == SqlDialect::Postgres
201                        && self.pending_copy.is_some()
202                        && (stmt.ends_with(b"\\.\n") || stmt.ends_with(b"\\.\r\n")) =>
203                {
204                    // This is the COPY data block
205                    self.redact_copy_data(&stmt)?
206                }
207                _ => {
208                    // If we have a pending COPY that wasn't followed by a data block,
209                    // output it as-is
210                    if let Some(pending) = self.pending_copy.take() {
211                        output.write_all(&pending.header)?;
212                    }
213                    stmt
214                }
215            };
216
217            output.write_all(&redacted)?;
218        }
219
220        // Handle any remaining pending COPY header at EOF
221        if let Some(pending) = self.pending_copy.take() {
222            output.write_all(&pending.header)?;
223        }
224
225        output.flush()?;
226        Ok(())
227    }
228
229    /// Redact an INSERT statement
230    fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
231        // Skip if table should be excluded
232        if self.should_skip_table(table_name) {
233            return Ok(stmt.to_vec());
234        }
235
236        // Get table schema
237        let Some(table) = self.schema.get_table(table_name) else {
238            self.stats.warnings.push(format!(
239                "No schema for table '{}', passing through unchanged",
240                table_name
241            ));
242            return Ok(stmt.to_vec());
243        };
244
245        // Get strategies for each column
246        let strategies = self.matcher.get_strategies(table_name, table);
247
248        // If no columns need redaction, pass through
249        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
250            return Ok(stmt.to_vec());
251        }
252
253        // Rewrite the INSERT statement with redacted values
254        let (redacted, rows_redacted, cols_redacted) =
255            self.rewriter
256                .rewrite_insert(stmt, table_name, table, &strategies)?;
257
258        // Update stats
259        if rows_redacted > 0 {
260            self.stats.rows_redacted += rows_redacted;
261            self.stats.columns_redacted += cols_redacted;
262
263            // Find or create table stats entry
264            if let Some(ts) = self
265                .stats
266                .table_stats
267                .iter_mut()
268                .find(|t| t.name == table_name)
269            {
270                ts.rows_processed += rows_redacted;
271                ts.columns_redacted += cols_redacted;
272            } else {
273                self.stats.tables_processed += 1;
274                self.stats.table_stats.push(TableRedactStats {
275                    name: table_name.to_string(),
276                    rows_processed: rows_redacted,
277                    columns_redacted: cols_redacted,
278                });
279            }
280        }
281
282        Ok(redacted)
283    }
284
285    /// Redact a COPY statement (PostgreSQL)
286    fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
287        // Skip if table should be excluded
288        if self.should_skip_table(table_name) {
289            return Ok(stmt.to_vec());
290        }
291
292        // Get table schema
293        let Some(table) = self.schema.get_table(table_name) else {
294            self.stats.warnings.push(format!(
295                "No schema for table '{}', passing through unchanged",
296                table_name
297            ));
298            return Ok(stmt.to_vec());
299        };
300
301        // Get strategies for each column
302        let strategies = self.matcher.get_strategies(table_name, table);
303
304        // If no columns need redaction, pass through
305        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
306            return Ok(stmt.to_vec());
307        }
308
309        // Rewrite the COPY statement with redacted values
310        let (redacted, rows_redacted, cols_redacted) =
311            self.rewriter
312                .rewrite_copy(stmt, table_name, table, &strategies)?;
313
314        // Update stats
315        if rows_redacted > 0 {
316            self.stats.rows_redacted += rows_redacted;
317            self.stats.columns_redacted += cols_redacted;
318
319            // Find or create table stats entry
320            if let Some(ts) = self
321                .stats
322                .table_stats
323                .iter_mut()
324                .find(|t| t.name == table_name)
325            {
326                ts.rows_processed += rows_redacted;
327                ts.columns_redacted += cols_redacted;
328            } else {
329                self.stats.tables_processed += 1;
330                self.stats.table_stats.push(TableRedactStats {
331                    name: table_name.to_string(),
332                    rows_processed: rows_redacted,
333                    columns_redacted: cols_redacted,
334                });
335            }
336        }
337
338        Ok(redacted)
339    }
340
341    /// Redact a PostgreSQL COPY data block (comes after the header)
342    fn redact_copy_data(&mut self, data_block: &[u8]) -> anyhow::Result<Vec<u8>> {
343        let pending = self
344            .pending_copy
345            .take()
346            .ok_or_else(|| anyhow::anyhow!("COPY data block without pending header"))?;
347
348        let table_name = &pending.table_name;
349
350        // Skip if table should be excluded
351        if self.should_skip_table(table_name) {
352            // Output header + data unchanged
353            let mut result = pending.header;
354            result.extend_from_slice(data_block);
355            return Ok(result);
356        }
357
358        // Get table schema
359        let Some(table) = self.schema.get_table(table_name) else {
360            self.stats.warnings.push(format!(
361                "No schema for table '{}', passing through unchanged",
362                table_name
363            ));
364            let mut result = pending.header;
365            result.extend_from_slice(data_block);
366            return Ok(result);
367        };
368
369        // Get strategies for each column
370        let strategies = self.matcher.get_strategies(table_name, table);
371
372        // If no columns need redaction, pass through
373        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
374            let mut result = pending.header;
375            result.extend_from_slice(data_block);
376            return Ok(result);
377        }
378
379        // Rewrite the COPY data block with redacted values
380        let (redacted_data, rows_redacted, cols_redacted) =
381            self.rewriter
382                .rewrite_copy_data(data_block, table, &strategies, &pending.columns)?;
383
384        // Update stats
385        if rows_redacted > 0 {
386            self.stats.rows_redacted += rows_redacted;
387            self.stats.columns_redacted += cols_redacted;
388
389            if let Some(ts) = self
390                .stats
391                .table_stats
392                .iter_mut()
393                .find(|t| t.name == *table_name)
394            {
395                ts.rows_processed += rows_redacted;
396                ts.columns_redacted += cols_redacted;
397            } else {
398                self.stats.tables_processed += 1;
399                self.stats.table_stats.push(TableRedactStats {
400                    name: table_name.to_string(),
401                    rows_processed: rows_redacted,
402                    columns_redacted: cols_redacted,
403                });
404            }
405        }
406
407        // Combine header + redacted data
408        // The header typically doesn't end with newline, so add one
409        let mut result = pending.header;
410        if !result.ends_with(b"\n") {
411            result.push(b'\n');
412        }
413        result.extend_from_slice(&redacted_data);
414        Ok(result)
415    }
416
417    /// Check if a table should be skipped
418    fn should_skip_table(&self, name: &str) -> bool {
419        // Check exclude list
420        if self
421            .config
422            .exclude
423            .iter()
424            .any(|e| e.eq_ignore_ascii_case(name))
425        {
426            return true;
427        }
428
429        // Check include list (if specified)
430        if let Some(ref tables) = self.config.tables_filter {
431            if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
432                return true;
433            }
434        }
435
436        false
437    }
438}