sql_splitter/redactor/
mod.rs

1//! Redactor module for anonymizing sensitive data in SQL dumps.
2//!
3//! This module provides:
4//! - YAML configuration parsing for redaction rules
5//! - Column pattern matching with glob support
6//! - Multiple redaction strategies (null, constant, hash, mask, shuffle, fake, skip)
7//! - Streaming redaction of INSERT and COPY statements
8//! - Config auto-generation from schema analysis
9
10mod config;
11mod config_generator;
12mod matcher;
13pub mod strategy;
14
15pub use config::RedactConfig;
16// These will be used when additional features are implemented
17#[allow(unused_imports)]
18pub use config::{RedactConfigBuilder, RedactYamlConfig, Rule};
19pub use config_generator::generate_config;
20pub use matcher::ColumnMatcher;
21pub use strategy::StrategyKind;
22
23use crate::parser::{Parser, SqlDialect, StatementType};
24use crate::schema::{Schema, SchemaBuilder};
25use ahash::AHashMap;
26use std::fs::File;
27use std::io::{BufWriter, Write};
28use std::path::Path;
29
30/// Statistics from redaction operation
31#[derive(Debug, Default, serde::Serialize)]
32pub struct RedactStats {
33    /// Number of tables processed
34    pub tables_processed: usize,
35    /// Number of rows redacted
36    pub rows_redacted: u64,
37    /// Number of columns redacted
38    pub columns_redacted: u64,
39    /// Per-table statistics
40    pub table_stats: Vec<TableRedactStats>,
41    /// Warning messages
42    pub warnings: Vec<String>,
43}
44
45/// Per-table redaction statistics
46#[derive(Debug, Clone, serde::Serialize)]
47pub struct TableRedactStats {
48    pub name: String,
49    pub rows_processed: u64,
50    pub columns_redacted: u64,
51}
52
53/// Main redactor struct
54pub struct Redactor {
55    config: RedactConfig,
56    schema: Schema,
57    matcher: ColumnMatcher,
58    stats: RedactStats,
59}
60
61impl Redactor {
62    /// Create a new Redactor with the given configuration
63    pub fn new(config: RedactConfig) -> anyhow::Result<Self> {
64        // Build schema from input file (Pass 1)
65        let schema = Self::build_schema(&config.input, config.dialect)?;
66
67        // Build column matcher from config rules
68        let matcher = ColumnMatcher::from_config(&config)?;
69
70        Ok(Self {
71            config,
72            schema,
73            matcher,
74            stats: RedactStats::default(),
75        })
76    }
77
78    /// Build schema from input file
79    fn build_schema(input: &Path, dialect: SqlDialect) -> anyhow::Result<Schema> {
80        let file = File::open(input)?;
81        let mut parser = Parser::with_dialect(file, 64 * 1024, dialect);
82        let mut builder = SchemaBuilder::new();
83
84        while let Some(stmt) = parser.read_statement()? {
85            let (stmt_type, _table_name) =
86                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, dialect);
87
88            if stmt_type == StatementType::CreateTable {
89                let stmt_str = String::from_utf8_lossy(&stmt);
90                builder.parse_create_table(&stmt_str);
91            }
92        }
93
94        Ok(builder.build())
95    }
96
97    /// Run the redaction process
98    pub fn run(&mut self) -> anyhow::Result<RedactStats> {
99        if self.config.dry_run {
100            return self.dry_run();
101        }
102
103        // Open output
104        let output: Box<dyn Write> = if let Some(ref path) = self.config.output {
105            Box::new(BufWriter::new(File::create(path)?))
106        } else {
107            Box::new(std::io::stdout())
108        };
109
110        self.process_file(output)?;
111
112        Ok(std::mem::take(&mut self.stats))
113    }
114
115    /// Dry run - analyze without writing
116    fn dry_run(&mut self) -> anyhow::Result<RedactStats> {
117        let file = File::open(&self.config.input)?;
118        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
119
120        let mut tables_seen: AHashMap<String, u64> = AHashMap::new();
121
122        while let Some(stmt) = parser.read_statement()? {
123            let (stmt_type, table_name) =
124                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
125
126            if !table_name.is_empty()
127                && (stmt_type == StatementType::Insert || stmt_type == StatementType::Copy)
128            {
129                *tables_seen.entry(table_name).or_insert(0) += 1;
130            }
131        }
132
133        // Build stats from dry run
134        for (name, count) in tables_seen {
135            if let Some(table) = self.schema.get_table(&name) {
136                let columns_matched = self.matcher.count_matches(&name, table);
137                if columns_matched > 0 {
138                    self.stats.tables_processed += 1;
139                    self.stats.rows_redacted += count;
140                    self.stats.columns_redacted += columns_matched as u64 * count;
141                    self.stats.table_stats.push(TableRedactStats {
142                        name,
143                        rows_processed: count,
144                        columns_redacted: columns_matched as u64,
145                    });
146                }
147            }
148        }
149
150        Ok(std::mem::take(&mut self.stats))
151    }
152
153    /// Process the file and write redacted output
154    fn process_file(&mut self, mut output: Box<dyn Write>) -> anyhow::Result<()> {
155        let file = File::open(&self.config.input)?;
156        let mut parser = Parser::with_dialect(file, 64 * 1024, self.config.dialect);
157
158        while let Some(stmt) = parser.read_statement()? {
159            let (stmt_type, table_name) =
160                Parser::<&[u8]>::parse_statement_with_dialect(&stmt, self.config.dialect);
161
162            let redacted = match stmt_type {
163                StatementType::Insert if !table_name.is_empty() => {
164                    self.redact_insert(&stmt, &table_name)?
165                }
166                StatementType::Copy if !table_name.is_empty() => {
167                    self.redact_copy(&stmt, &table_name)?
168                }
169                _ => stmt,
170            };
171
172            output.write_all(&redacted)?;
173        }
174
175        output.flush()?;
176        Ok(())
177    }
178
179    /// Redact an INSERT statement
180    fn redact_insert(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
181        // Skip if table should be excluded
182        if self.should_skip_table(table_name) {
183            return Ok(stmt.to_vec());
184        }
185
186        // Get table schema
187        let Some(table) = self.schema.get_table(table_name) else {
188            self.stats.warnings.push(format!(
189                "No schema for table '{}', passing through unchanged",
190                table_name
191            ));
192            return Ok(stmt.to_vec());
193        };
194
195        // Get strategies for each column
196        let strategies = self.matcher.get_strategies(table_name, table);
197
198        // If no columns need redaction, pass through
199        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
200            return Ok(stmt.to_vec());
201        }
202
203        // TODO: Implement actual INSERT rewriting in Phase 3
204        // For now, pass through
205        Ok(stmt.to_vec())
206    }
207
208    /// Redact a COPY statement (PostgreSQL)
209    fn redact_copy(&mut self, stmt: &[u8], table_name: &str) -> anyhow::Result<Vec<u8>> {
210        // Skip if table should be excluded
211        if self.should_skip_table(table_name) {
212            return Ok(stmt.to_vec());
213        }
214
215        // Get table schema
216        let Some(table) = self.schema.get_table(table_name) else {
217            self.stats.warnings.push(format!(
218                "No schema for table '{}', passing through unchanged",
219                table_name
220            ));
221            return Ok(stmt.to_vec());
222        };
223
224        // Get strategies for each column
225        let strategies = self.matcher.get_strategies(table_name, table);
226
227        // If no columns need redaction, pass through
228        if strategies.iter().all(|s| matches!(s, StrategyKind::Skip)) {
229            return Ok(stmt.to_vec());
230        }
231
232        // TODO: Implement actual COPY rewriting in Phase 3
233        // For now, pass through
234        Ok(stmt.to_vec())
235    }
236
237    /// Check if a table should be skipped
238    fn should_skip_table(&self, name: &str) -> bool {
239        // Check exclude list
240        if self
241            .config
242            .exclude
243            .iter()
244            .any(|e| e.eq_ignore_ascii_case(name))
245        {
246            return true;
247        }
248
249        // Check include list (if specified)
250        if let Some(ref tables) = self.config.tables_filter {
251            if !tables.iter().any(|t| t.eq_ignore_ascii_case(name)) {
252                return true;
253            }
254        }
255
256        false
257    }
258}