Skip to main content

dynoxide/import/
mod.rs

//! Import CLI for DynamoDB Export data.
//!
//! Parses DynamoDB Export JSON Lines files, optionally applies anonymisation
//! rules, and imports the data into a Dynoxide SQLite database.
//!
//! ## Pipeline
//!
//! 1. Parse TOML config (validate all rules upfront)
//! 2. Load table schemas from `--schema <file>`
//! 3. Create tables in the output SQLite database
//! 4. For each table: read JSON Lines → parse → anonymise → batch insert
//! 5. VACUUM (compact the SQLite file; file output only)
//! 6. Optionally compress with zstd (file output only)
14
15pub(crate) mod anonymise;
16pub(crate) mod config;
17pub(crate) mod consistency;
18pub(crate) mod parser;
19pub(crate) mod schema;
20
21use crate::{Database, ImportOptions};
22use consistency::ConsistencyMap;
23use indicatif::{ProgressBar, ProgressStyle};
24use std::collections::HashSet;
25use std::path::Path;
26
/// Errors from the import pipeline.
///
/// Every variant carries a human-readable message. [`Display`](std::fmt::Display)
/// prints that message verbatim, with no variant prefix.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ImportError {
    /// Configuration or validation error (e.g., invalid TOML, missing schema).
    Config(String),
    /// I/O or parsing error during data import.
    Data(String),
    /// Database error during table creation or item insertion.
    Database(String),
}

impl std::fmt::Display for ImportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Config(message) | Self::Data(message) | Self::Database(message) => {
                f.write_str(message)
            }
        }
    }
}

impl std::error::Error for ImportError {}

/// Plain `String` errors propagated with `?` are classified as data errors.
impl From<String> for ImportError {
    fn from(message: String) -> Self {
        Self::Data(message)
    }
}
55
/// Configuration for the import operation.
///
/// Consumed by [`run`] (file output) and [`run_into`] (caller-provided database).
#[derive(Debug, Clone)]
pub struct ImportCommand {
    /// Source directory containing export files.
    pub source: std::path::PathBuf,
    /// Output SQLite database path (required for file-based import, None for in-memory).
    pub output: Option<std::path::PathBuf>,
    /// Schema file path (DescribeTable JSON format).
    pub schema: std::path::PathBuf,
    /// Optional anonymisation rules TOML file.
    pub rules: Option<std::path::PathBuf>,
    /// Optional list of table names to import, passed to export-file discovery
    /// as a filter (`None` means no filter). On the command line this is
    /// typically given as a comma-separated list.
    pub tables: Option<Vec<String>>,
    /// Optional zstd compression of output (only valid with file output).
    pub compress: bool,
    /// Overwrite existing output file without prompting.
    pub force: bool,
    /// Continue importing when a batch fails (default: fail-fast).
    /// When true, batch errors are recorded as warnings and import continues.
    /// When false (default), the first batch error aborts the import.
    pub continue_on_error: bool,
}
77
/// Outcome of an import run, aggregated across every imported table.
#[derive(Debug)]
pub struct ImportSummary {
    /// One entry per imported table, in the order the tables were imported.
    pub tables: Vec<TableImportResult>,
    /// Sum of `items_imported` over all tables.
    pub total_items: usize,
    /// Sum of `bytes_imported` over all tables.
    pub total_bytes: usize,
    /// Sum of `lines_skipped` (lines that failed to parse) over all tables.
    pub total_skipped: usize,
    /// Non-fatal problems reported while importing.
    pub warnings: Vec<String>,
    /// Where the imported database ended up. This is the compressed file when
    /// compression was requested, and `None` for in-memory imports.
    pub output_path: Option<std::path::PathBuf>,
}

/// Import statistics for a single table.
#[derive(Debug)]
pub struct TableImportResult {
    /// Name of the table the items were imported into.
    pub table_name: String,
    /// Number of items written to the table.
    pub items_imported: usize,
    /// Bytes written for those items, as reported by the database layer.
    pub bytes_imported: usize,
    /// Export lines skipped because they could not be parsed.
    pub lines_skipped: usize,
}
103
104/// Execute the import pipeline into a caller-provided database.
105///
106/// This is the core import logic — database-agnostic. The caller is
107/// responsible for creating the database and any post-import steps
108/// (VACUUM, compression). This makes import usable with both file-backed
109/// and in-memory databases.
110pub fn run_into(db: &Database, cmd: ImportCommand) -> Result<ImportSummary, ImportError> {
111    // 1. Load and validate anonymisation rules (if provided)
112    let (rules, consistency_config) = if let Some(ref rules_path) = cmd.rules {
113        let (rules, consistency) =
114            config::load_and_validate(rules_path).map_err(ImportError::Config)?;
115        eprintln!(
116            "Loaded {} anonymisation rules from {}",
117            rules.len(),
118            rules_path.display()
119        );
120        (rules, consistency)
121    } else {
122        (Vec::new(), None)
123    };
124
125    let consistency_fields: std::collections::HashSet<String> = consistency_config
126        .as_ref()
127        .map(|c| c.fields.iter().cloned().collect())
128        .unwrap_or_default();
129    let mut consistency_map = ConsistencyMap::new();
130
131    // 2. Load table schemas (returns both parsed schemas and raw JSON)
132    let (schemas, schema_json) = schema::load_schemas(&cmd.schema)?;
133    eprintln!(
134        "Loaded {} table schemas from {}",
135        schemas.len(),
136        cmd.schema.display()
137    );
138
139    // 3. Discover export files
140    let table_filter = cmd.tables.as_deref();
141    let export_files = parser::discover_export_files(&cmd.source, table_filter)?;
142
143    if export_files.is_empty() {
144        return Err(ImportError::Config(format!(
145            "No export files found in {}. Expected DynamoDB Export directory structure \
146             (<dir>/<TableName>/data/*.json.gz) or flat directory (<dir>/*.json[.gz]).",
147            cmd.source.display()
148        )));
149    }
150
151    // Build a schema lookup map
152    let schema_map: std::collections::HashMap<&str, &schema::TableSchema> =
153        schemas.iter().map(|s| (s.table_name.as_str(), s)).collect();
154
155    // 4. Create tables from schemas
156    for (table_name, _) in &export_files {
157        if !schema_map.contains_key(table_name.as_str()) {
158            return Err(ImportError::Config(format!(
159                "No schema found for table '{}'. Available schemas: {}",
160                table_name,
161                schemas
162                    .iter()
163                    .map(|s| s.table_name.as_str())
164                    .collect::<Vec<_>>()
165                    .join(", ")
166            )));
167        }
168
169        // Find the matching schema JSON and deserialize into a fresh CreateTableRequest
170        let table_json = find_table_json(&schema_json, table_name)
171            .ok_or_else(|| format!("Schema JSON not found for table '{table_name}'"))?;
172
173        let create_request: crate::actions::create_table::CreateTableRequest =
174            serde_json::from_value(table_json)
175                .map_err(|e| format!("Failed to deserialize schema for '{}': {e}", table_name))?;
176
177        db.create_table(create_request)
178            .map_err(|e| format!("Failed to create table '{}': {e}", table_name))?;
179    }
180
181    // 5. Enable bulk-loading PRAGMAs (safe: fresh DB, can re-import on crash)
182    db.enable_bulk_loading()
183        .map_err(|e| format!("Failed to enable bulk loading: {e}"))?;
184
185    // 6. Import data for each table
186    let mut summary = ImportSummary {
187        tables: Vec::new(),
188        total_items: 0,
189        total_bytes: 0,
190        total_skipped: 0,
191        warnings: Vec::new(),
192        output_path: cmd.output.clone(),
193    };
194
195    let mut seen_warnings: HashSet<String> = HashSet::new();
196
197    for (table_name, files) in &export_files {
198        let table_schema = schema_map.get(table_name.as_str()).unwrap();
199        let key_attrs = extract_key_attrs(&table_schema.create_request);
200
201        let file_count = files.len();
202        eprintln!("Importing table '{}' ({} files)...", table_name, file_count);
203
204        let pb = ProgressBar::new_spinner();
205        pb.set_style(
206            ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg}")
207                .unwrap()
208                .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
209        );
210        pb.set_message(format!("{}: parsing...", table_name));
211
212        let mut table_items = 0usize;
213        let mut table_bytes = 0usize;
214        let mut table_skipped = 0usize;
215        let mut batch_error: Option<String> = None;
216
217        const BATCH_SIZE: usize = 10_000;
218
219        for file_path in files {
220            let mut batch: Vec<crate::types::Item> = Vec::with_capacity(BATCH_SIZE);
221
222            let stats = parser::parse_export_file_streaming(file_path, |mut item| {
223                // Skip processing if we've already hit a fatal batch error
224                if batch_error.is_some() {
225                    return;
226                }
227
228                // Apply anonymisation rules
229                if !rules.is_empty() {
230                    let warnings = anonymise::apply_rules(
231                        &mut item,
232                        &rules,
233                        &mut consistency_map,
234                        &consistency_fields,
235                        &key_attrs,
236                    );
237                    for w in warnings {
238                        if !seen_warnings.contains(&w) {
239                            seen_warnings.insert(w.clone());
240                            summary.warnings.push(w);
241                        }
242                    }
243                }
244                batch.push(item);
245
246                // Flush batch when full
247                if batch.len() >= BATCH_SIZE {
248                    let chunk = std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE));
249                    match db.import_items_fresh(table_name, chunk, ImportOptions::default()) {
250                        Ok(result) => {
251                            table_items += result.items_imported;
252                            table_bytes += result.bytes_imported;
253                        }
254                        Err(e) => {
255                            let msg = format!("Batch import error for '{}': {e}", table_name);
256                            if cmd.continue_on_error {
257                                summary.warnings.push(msg);
258                            } else {
259                                batch_error = Some(msg);
260                                return;
261                            }
262                        }
263                    }
264                    pb.set_message(format!("{}: {} items", table_name, table_items));
265                    pb.tick();
266                }
267            })?;
268
269            // Propagate batch error after the streaming callback completes
270            if let Some(err) = batch_error.take() {
271                pb.abandon_with_message(format!("{}: FAILED", table_name));
272                return Err(ImportError::Database(err));
273            }
274
275            table_skipped += stats.skipped;
276            for warning in stats.warnings {
277                summary.warnings.push(warning);
278            }
279
280            // Flush remaining items
281            if !batch.is_empty() {
282                let import_result = db
283                    .import_items_fresh(table_name, batch, ImportOptions::default())
284                    .map_err(|e| format!("Failed to import items into '{}': {e}", table_name))?;
285                table_items += import_result.items_imported;
286                table_bytes += import_result.bytes_imported;
287                pb.set_message(format!("{}: {} items", table_name, table_items));
288                pb.tick();
289            }
290        }
291
292        pb.finish_with_message(format!(
293            "{}: {} items, {} bytes{}",
294            table_name,
295            table_items,
296            format_bytes(table_bytes),
297            if table_skipped > 0 {
298                format!(", {} skipped", table_skipped)
299            } else {
300                String::new()
301            }
302        ));
303
304        summary.tables.push(TableImportResult {
305            table_name: table_name.clone(),
306            items_imported: table_items,
307            bytes_imported: table_bytes,
308            lines_skipped: table_skipped,
309        });
310        summary.total_items += table_items;
311        summary.total_bytes += table_bytes;
312        summary.total_skipped += table_skipped;
313    }
314
315    // 7. Restore normal PRAGMAs (important if DB will be served after import)
316    db.disable_bulk_loading()
317        .map_err(|e| format!("Failed to disable bulk loading: {e}"))?;
318
319    // Report consistency map stats
320    if consistency_map.field_count() > 0 {
321        eprintln!(
322            "Consistency map: {} fields, {} total mappings",
323            consistency_map.field_count(),
324            consistency_map.total_mappings()
325        );
326    }
327
328    Ok(summary)
329}
330
331/// Execute the import pipeline with file-based output.
332///
333/// Creates a new database at a temporary path, imports data, VACUUMs,
334/// then atomically renames to the final output path. If the import fails
335/// at any point, the temp file is cleaned up automatically and any
336/// existing output file is preserved.
337pub fn run(cmd: ImportCommand) -> Result<ImportSummary, ImportError> {
338    let output = cmd
339        .output
340        .as_ref()
341        .ok_or_else(|| ImportError::Config("output path required for file-based import".into()))?;
342
343    // Check for existing output file
344    if output.exists() && !cmd.force {
345        return Err(ImportError::Config(format!(
346            "Output file '{}' already exists. Use --force to overwrite.",
347            output.display()
348        )));
349    }
350
351    let output_path = output.clone();
352    let compress = cmd.compress;
353
354    // Write to a temp file in the same directory as the output so that
355    // persist() can do an atomic rename (same filesystem). On failure,
356    // NamedTempFile's Drop cleans up automatically.
357    let output_dir = output_path.parent().unwrap_or(Path::new("."));
358    let tmp_file = tempfile::NamedTempFile::new_in(output_dir)
359        .map_err(|e| ImportError::Database(format!("Failed to create temp file: {e}")))?;
360    let tmp_path = tmp_file.path().to_path_buf();
361
362    // Close the temp file handle — Database::new will open it by path.
363    // Keep the NamedTempFile alive so it cleans up on error.
364    let tmp_file = tmp_file.into_temp_path();
365
366    let db = Database::new(
367        tmp_path
368            .to_str()
369            .ok_or_else(|| ImportError::Config("Invalid temp path".to_string()))?,
370    )
371    .map_err(|e| ImportError::Database(format!("Failed to create output database: {e}")))?;
372
373    let mut summary = run_into(&db, cmd)?;
374
375    // VACUUM for compact output.
376    // Drop the db and reopen to release any in-process state before compacting.
377    drop(db);
378    {
379        let db = Database::new(
380            tmp_path
381                .to_str()
382                .ok_or_else(|| ImportError::Config("Invalid temp path".to_string()))?,
383        )
384        .map_err(|e| ImportError::Database(format!("Failed to reopen database for VACUUM: {e}")))?;
385        db.vacuum()
386            .map_err(|e| ImportError::Database(format!("VACUUM failed: {e}")))?;
387    }
388    eprintln!("Database compacted.");
389
390    // Atomically move the temp file to the final output path.
391    // This overwrites any existing file (--force was already checked above).
392    tmp_file.persist(&output_path).map_err(|e| {
393        ImportError::Database(format!("Failed to move database to output path: {e}"))
394    })?;
395
396    summary.output_path = Some(output_path.clone());
397
398    // Optionally compress with zstd
399    if compress {
400        let compressed_path = compress_output(&output_path)?;
401        summary.output_path = Some(compressed_path);
402    }
403
404    Ok(summary)
405}
406
407/// Find the raw JSON for a specific table in the schema file.
408/// Converts from DescribeTable format (with "Table" wrapper) to CreateTableRequest format.
409fn find_table_json(schema_json: &serde_json::Value, table_name: &str) -> Option<serde_json::Value> {
410    let items: Vec<&serde_json::Value> = match schema_json {
411        serde_json::Value::Array(arr) => arr.iter().collect(),
412        obj @ serde_json::Value::Object(_) => vec![obj],
413        _ => return None,
414    };
415
416    for item in items {
417        let table = item.get("Table").unwrap_or(item);
418        if table.get("TableName").and_then(|v| v.as_str()) == Some(table_name) {
419            // Convert from DescribeTable format to CreateTableRequest format
420            // The field names are the same (PascalCase) — just strip the "Table" wrapper
421            return Some(table.clone());
422        }
423    }
424    None
425}
426
427/// Extract key attribute names from a CreateTableRequest.
428fn extract_key_attrs(request: &crate::actions::create_table::CreateTableRequest) -> Vec<String> {
429    request
430        .key_schema
431        .iter()
432        .map(|ks| ks.attribute_name.clone())
433        .collect()
434}
435
436/// Compress a file with zstd, removing the original.
437fn compress_output(path: &Path) -> Result<std::path::PathBuf, String> {
438    let compressed_path = path.with_extension("db.zst");
439    eprintln!("Compressing to {}...", compressed_path.display());
440
441    let input = std::fs::File::open(path)
442        .map_err(|e| format!("Failed to open {} for compression: {e}", path.display()))?;
443
444    let output = std::fs::File::create(&compressed_path)
445        .map_err(|e| format!("Failed to create {}: {e}", compressed_path.display()))?;
446
447    let mut encoder =
448        zstd::Encoder::new(output, 3).map_err(|e| format!("Failed to create zstd encoder: {e}"))?;
449
450    std::io::copy(&mut std::io::BufReader::new(input), &mut encoder)
451        .map_err(|e| format!("Compression failed: {e}"))?;
452
453    encoder
454        .finish()
455        .map_err(|e| format!("Failed to finalize compression: {e}"))?;
456
457    // Remove the uncompressed file
458    std::fs::remove_file(path).map_err(|e| format!("Failed to remove uncompressed file: {e}"))?;
459
460    let compressed_size = std::fs::metadata(&compressed_path)
461        .map(|m| m.len())
462        .unwrap_or(0);
463    eprintln!(
464        "Compressed output: {}",
465        format_bytes(compressed_size as usize)
466    );
467
468    Ok(compressed_path)
469}
470
/// Format a byte count as a human-readable string using binary (1024) steps.
///
/// Counts below 1024 are printed exactly (`"512 B"`). Larger counts use one
/// decimal place in the largest unit that keeps the value below 1024, up to
/// TB (`"1.5 KB"`, `"3.2 MB"`, `"4.0 GB"`).
fn format_bytes(bytes: usize) -> String {
    const UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];

    if bytes < 1024 {
        return format!("{bytes} B");
    }

    let mut value = bytes as f64 / 1024.0;
    let mut unit = UNITS[0];
    for &next in &UNITS[1..] {
        if value < 1024.0 {
            break;
        }
        value /= 1024.0;
        unit = next;
    }
    format!("{value:.1} {unit}")
}