1pub(crate) mod anonymise;
16pub(crate) mod config;
17pub(crate) mod consistency;
18pub(crate) mod parser;
19pub(crate) mod schema;
20
21use crate::{Database, ImportOptions};
22use consistency::ConsistencyMap;
23use indicatif::{ProgressBar, ProgressStyle};
24use std::collections::HashSet;
25use std::path::Path;
26
/// Error returned by an import run.
///
/// Every variant carries a ready-to-print message, and `Display` renders exactly that
/// message with no category prefix.
#[derive(Debug)]
pub enum ImportError {
    /// Invalid or missing configuration: rules, schemas, input/output paths, or options.
    Config(String),
    /// Data error. `From<String>` also maps every plain `String` error to this variant, so
    /// helper failures that report a bare message end up here as well.
    Data(String),
    /// A failure reported while creating, writing, compacting, or moving the database.
    Database(String),
}

impl std::fmt::Display for ImportError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Config(message) | Self::Data(message) | Self::Database(message) => message,
        };
        f.write_str(message)
    }
}

impl std::error::Error for ImportError {}

impl From<String> for ImportError {
    /// Wraps an untyped string error as [`ImportError::Data`].
    fn from(message: String) -> Self {
        Self::Data(message)
    }
}
55
/// Options for a single import run (e.g. as parsed from command-line arguments).
#[derive(Debug, Clone)]
pub struct ImportCommand {
    /// Export location: either a DynamoDB Export directory (`<dir>/<TableName>/data/*.json.gz`)
    /// or a flat directory of `*.json` / `*.json.gz` files.
    pub source: std::path::PathBuf,
    /// Destination database file. Required by `run`; `run_into` only copies it into
    /// `ImportSummary::output_path`.
    pub output: Option<std::path::PathBuf>,
    /// File containing the table schemas used to create the tables.
    pub schema: std::path::PathBuf,
    /// Optional anonymisation rules file.
    pub rules: Option<std::path::PathBuf>,
    /// Optional table-name filter passed to export-file discovery.
    pub tables: Option<Vec<String>>,
    /// Compress the finished database with zstd (used by `run`).
    pub compress: bool,
    /// Overwrite existing output files instead of refusing (used by `run`).
    pub force: bool,
    /// Record failed batch inserts as warnings instead of aborting the import.
    pub continue_on_error: bool,
}
77
/// Aggregate outcome of an import run.
#[derive(Debug)]
pub struct ImportSummary {
    /// One entry per imported table, in the order the tables were processed.
    pub tables: Vec<TableImportResult>,
    /// Sum of `items_imported` across `tables`.
    pub total_items: usize,
    /// Sum of `bytes_imported` across `tables`.
    pub total_bytes: usize,
    /// Sum of `lines_skipped` across `tables`.
    pub total_skipped: usize,
    /// Non-fatal problems: anonymisation and parser warnings, plus failed batches when the
    /// run continues on error.
    pub warnings: Vec<String>,
    /// Final location of the produced database, when there is one.
    pub output_path: Option<std::path::PathBuf>,
}

/// Import counters for a single table.
#[derive(Debug)]
pub struct TableImportResult {
    /// Name of the imported table.
    pub table_name: String,
    /// Number of items written to the database.
    pub items_imported: usize,
    /// Byte count reported by the database for the written items.
    pub bytes_imported: usize,
    /// Number of input lines the parser skipped.
    pub lines_skipped: usize,
}
103
104pub fn run_into(db: &Database, cmd: ImportCommand) -> Result<ImportSummary, ImportError> {
111 let (rules, consistency_config) = if let Some(ref rules_path) = cmd.rules {
113 let (rules, consistency) =
114 config::load_and_validate(rules_path).map_err(ImportError::Config)?;
115 eprintln!(
116 "Loaded {} anonymisation rules from {}",
117 rules.len(),
118 rules_path.display()
119 );
120 (rules, consistency)
121 } else {
122 (Vec::new(), None)
123 };
124
125 let consistency_fields: std::collections::HashSet<String> = consistency_config
126 .as_ref()
127 .map(|c| c.fields.iter().cloned().collect())
128 .unwrap_or_default();
129 let mut consistency_map = ConsistencyMap::new();
130
131 let (schemas, schema_json) = schema::load_schemas(&cmd.schema)?;
133 eprintln!(
134 "Loaded {} table schemas from {}",
135 schemas.len(),
136 cmd.schema.display()
137 );
138
139 let table_filter = cmd.tables.as_deref();
141 let export_files = parser::discover_export_files(&cmd.source, table_filter)?;
142
143 if export_files.is_empty() {
144 return Err(ImportError::Config(format!(
145 "No export files found in {}. Expected DynamoDB Export directory structure \
146 (<dir>/<TableName>/data/*.json.gz) or flat directory (<dir>/*.json[.gz]).",
147 cmd.source.display()
148 )));
149 }
150
151 let schema_map: std::collections::HashMap<&str, &schema::TableSchema> =
153 schemas.iter().map(|s| (s.table_name.as_str(), s)).collect();
154
155 for (table_name, _) in &export_files {
157 if !schema_map.contains_key(table_name.as_str()) {
158 return Err(ImportError::Config(format!(
159 "No schema found for table '{}'. Available schemas: {}",
160 table_name,
161 schemas
162 .iter()
163 .map(|s| s.table_name.as_str())
164 .collect::<Vec<_>>()
165 .join(", ")
166 )));
167 }
168
169 let table_json = find_table_json(&schema_json, table_name)
171 .ok_or_else(|| format!("Schema JSON not found for table '{table_name}'"))?;
172
173 let create_request: crate::actions::create_table::CreateTableRequest =
174 serde_json::from_value(table_json)
175 .map_err(|e| format!("Failed to deserialize schema for '{}': {e}", table_name))?;
176
177 db.create_table(create_request)
178 .map_err(|e| format!("Failed to create table '{}': {e}", table_name))?;
179 }
180
181 db.enable_bulk_loading()
183 .map_err(|e| format!("Failed to enable bulk loading: {e}"))?;
184
185 let mut summary = ImportSummary {
187 tables: Vec::new(),
188 total_items: 0,
189 total_bytes: 0,
190 total_skipped: 0,
191 warnings: Vec::new(),
192 output_path: cmd.output.clone(),
193 };
194
195 let mut seen_warnings: HashSet<String> = HashSet::new();
196
197 for (table_name, files) in &export_files {
198 let table_schema = schema_map.get(table_name.as_str()).unwrap();
199 let key_attrs = extract_key_attrs(&table_schema.create_request);
200
201 let file_count = files.len();
202 eprintln!("Importing table '{}' ({} files)...", table_name, file_count);
203
204 let pb = ProgressBar::new_spinner();
205 pb.set_style(
206 ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg}")
207 .unwrap()
208 .tick_chars("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"),
209 );
210 pb.set_message(format!("{}: parsing...", table_name));
211
212 let mut table_items = 0usize;
213 let mut table_bytes = 0usize;
214 let mut table_skipped = 0usize;
215 let mut batch_error: Option<String> = None;
216
217 const BATCH_SIZE: usize = 10_000;
218
219 for file_path in files {
220 let mut batch: Vec<crate::types::Item> = Vec::with_capacity(BATCH_SIZE);
221
222 let stats = parser::parse_export_file_streaming(file_path, |mut item| {
223 if batch_error.is_some() {
225 return;
226 }
227
228 if !rules.is_empty() {
230 let warnings = anonymise::apply_rules(
231 &mut item,
232 &rules,
233 &mut consistency_map,
234 &consistency_fields,
235 &key_attrs,
236 );
237 for w in warnings {
238 if !seen_warnings.contains(&w) {
239 seen_warnings.insert(w.clone());
240 summary.warnings.push(w);
241 }
242 }
243 }
244 batch.push(item);
245
246 if batch.len() >= BATCH_SIZE {
248 let chunk = std::mem::replace(&mut batch, Vec::with_capacity(BATCH_SIZE));
249 match db.import_items_fresh(table_name, chunk, ImportOptions::default()) {
250 Ok(result) => {
251 table_items += result.items_imported;
252 table_bytes += result.bytes_imported;
253 }
254 Err(e) => {
255 let msg = format!("Batch import error for '{}': {e}", table_name);
256 if cmd.continue_on_error {
257 summary.warnings.push(msg);
258 } else {
259 batch_error = Some(msg);
260 return;
261 }
262 }
263 }
264 pb.set_message(format!("{}: {} items", table_name, table_items));
265 pb.tick();
266 }
267 })?;
268
269 if let Some(err) = batch_error.take() {
271 pb.abandon_with_message(format!("{}: FAILED", table_name));
272 return Err(ImportError::Database(err));
273 }
274
275 table_skipped += stats.skipped;
276 for warning in stats.warnings {
277 summary.warnings.push(warning);
278 }
279
280 if !batch.is_empty() {
282 let import_result = db
283 .import_items_fresh(table_name, batch, ImportOptions::default())
284 .map_err(|e| format!("Failed to import items into '{}': {e}", table_name))?;
285 table_items += import_result.items_imported;
286 table_bytes += import_result.bytes_imported;
287 pb.set_message(format!("{}: {} items", table_name, table_items));
288 pb.tick();
289 }
290 }
291
292 pb.finish_with_message(format!(
293 "{}: {} items, {} bytes{}",
294 table_name,
295 table_items,
296 format_bytes(table_bytes),
297 if table_skipped > 0 {
298 format!(", {} skipped", table_skipped)
299 } else {
300 String::new()
301 }
302 ));
303
304 summary.tables.push(TableImportResult {
305 table_name: table_name.clone(),
306 items_imported: table_items,
307 bytes_imported: table_bytes,
308 lines_skipped: table_skipped,
309 });
310 summary.total_items += table_items;
311 summary.total_bytes += table_bytes;
312 summary.total_skipped += table_skipped;
313 }
314
315 db.disable_bulk_loading()
317 .map_err(|e| format!("Failed to disable bulk loading: {e}"))?;
318
319 if consistency_map.field_count() > 0 {
321 eprintln!(
322 "Consistency map: {} fields, {} total mappings",
323 consistency_map.field_count(),
324 consistency_map.total_mappings()
325 );
326 }
327
328 Ok(summary)
329}
330
331pub fn run(cmd: ImportCommand) -> Result<ImportSummary, ImportError> {
338 let output = cmd
339 .output
340 .as_ref()
341 .ok_or_else(|| ImportError::Config("output path required for file-based import".into()))?;
342
343 if output.exists() && !cmd.force {
345 return Err(ImportError::Config(format!(
346 "Output file '{}' already exists. Use --force to overwrite.",
347 output.display()
348 )));
349 }
350
351 let output_path = output.clone();
352 let compress = cmd.compress;
353
354 let output_dir = output_path.parent().unwrap_or(Path::new("."));
358 let tmp_file = tempfile::NamedTempFile::new_in(output_dir)
359 .map_err(|e| ImportError::Database(format!("Failed to create temp file: {e}")))?;
360 let tmp_path = tmp_file.path().to_path_buf();
361
362 let tmp_file = tmp_file.into_temp_path();
365
366 let db = Database::new(
367 tmp_path
368 .to_str()
369 .ok_or_else(|| ImportError::Config("Invalid temp path".to_string()))?,
370 )
371 .map_err(|e| ImportError::Database(format!("Failed to create output database: {e}")))?;
372
373 let mut summary = run_into(&db, cmd)?;
374
375 drop(db);
378 {
379 let db = Database::new(
380 tmp_path
381 .to_str()
382 .ok_or_else(|| ImportError::Config("Invalid temp path".to_string()))?,
383 )
384 .map_err(|e| ImportError::Database(format!("Failed to reopen database for VACUUM: {e}")))?;
385 db.vacuum()
386 .map_err(|e| ImportError::Database(format!("VACUUM failed: {e}")))?;
387 }
388 eprintln!("Database compacted.");
389
390 tmp_file.persist(&output_path).map_err(|e| {
393 ImportError::Database(format!("Failed to move database to output path: {e}"))
394 })?;
395
396 summary.output_path = Some(output_path.clone());
397
398 if compress {
400 let compressed_path = compress_output(&output_path)?;
401 summary.output_path = Some(compressed_path);
402 }
403
404 Ok(summary)
405}
406
407fn find_table_json(schema_json: &serde_json::Value, table_name: &str) -> Option<serde_json::Value> {
410 let items: Vec<&serde_json::Value> = match schema_json {
411 serde_json::Value::Array(arr) => arr.iter().collect(),
412 obj @ serde_json::Value::Object(_) => vec![obj],
413 _ => return None,
414 };
415
416 for item in items {
417 let table = item.get("Table").unwrap_or(item);
418 if table.get("TableName").and_then(|v| v.as_str()) == Some(table_name) {
419 return Some(table.clone());
422 }
423 }
424 None
425}
426
427fn extract_key_attrs(request: &crate::actions::create_table::CreateTableRequest) -> Vec<String> {
429 request
430 .key_schema
431 .iter()
432 .map(|ks| ks.attribute_name.clone())
433 .collect()
434}
435
436fn compress_output(path: &Path) -> Result<std::path::PathBuf, String> {
438 let compressed_path = path.with_extension("db.zst");
439 eprintln!("Compressing to {}...", compressed_path.display());
440
441 let input = std::fs::File::open(path)
442 .map_err(|e| format!("Failed to open {} for compression: {e}", path.display()))?;
443
444 let output = std::fs::File::create(&compressed_path)
445 .map_err(|e| format!("Failed to create {}: {e}", compressed_path.display()))?;
446
447 let mut encoder =
448 zstd::Encoder::new(output, 3).map_err(|e| format!("Failed to create zstd encoder: {e}"))?;
449
450 std::io::copy(&mut std::io::BufReader::new(input), &mut encoder)
451 .map_err(|e| format!("Compression failed: {e}"))?;
452
453 encoder
454 .finish()
455 .map_err(|e| format!("Failed to finalize compression: {e}"))?;
456
457 std::fs::remove_file(path).map_err(|e| format!("Failed to remove uncompressed file: {e}"))?;
459
460 let compressed_size = std::fs::metadata(&compressed_path)
461 .map(|m| m.len())
462 .unwrap_or(0);
463 eprintln!(
464 "Compressed output: {}",
465 format_bytes(compressed_size as usize)
466 );
467
468 Ok(compressed_path)
469}
470
/// Formats a byte count with binary (1024-based) units: `B`, `KB`, `MB`, or `GB`.
///
/// Counts below 1 KiB are printed exactly; larger counts get one decimal place.
fn format_bytes(bytes: usize) -> String {
    const KIB: usize = 1024;
    const MIB: usize = KIB * 1024;
    const GIB: usize = MIB * 1024;

    if bytes < KIB {
        format!("{bytes} B")
    } else if bytes < MIB {
        format!("{:.1} KB", bytes as f64 / KIB as f64)
    } else if bytes < GIB {
        format!("{:.1} MB", bytes as f64 / MIB as f64)
    } else {
        // Large exports are common; "1.5 GB" reads better than "1536.0 MB".
        format!("{:.1} GB", bytes as f64 / GIB as f64)
    }
}