1use std::path::Path;
7use std::time::Instant;
8
9use rusqlite::Connection;
10use tracing::{debug, info, warn};
11use uls_parser::archive::ZipExtractor;
12
13use crate::bulk_inserter::BulkInserter;
14use crate::schema::Schema;
15use crate::{Database, Result};
16
/// RAII-style cleanup helper for bulk imports.
///
/// Bulk import temporarily drops indexes and relaxes SQLite PRAGMAs for
/// speed. If the import aborts early (an error propagated via `?`, or a
/// panic), this guard's `Drop` impl restores whatever was changed so the
/// database connection is left in a usable state.
struct ImportGuard<'a> {
    // Connection whose indexes/PRAGMAs may need restoring on drop.
    conn: &'a Connection,
    // True while indexes are dropped and must be recreated on cleanup.
    indexes_dropped: bool,
    // True while fast-import PRAGMAs are active and must be reset on cleanup.
    pragmas_modified: bool,
}
25
26impl<'a> ImportGuard<'a> {
27 fn new(conn: &'a Connection) -> Self {
28 Self {
29 conn,
30 indexes_dropped: false,
31 pragmas_modified: false,
32 }
33 }
34
35 fn mark_indexes_dropped(&mut self) {
37 self.indexes_dropped = true;
38 }
39
40 fn mark_pragmas_modified(&mut self) {
42 self.pragmas_modified = true;
43 }
44
45 fn mark_indexes_restored(&mut self) {
47 self.indexes_dropped = false;
48 }
49
50 fn mark_pragmas_restored(&mut self) {
52 self.pragmas_modified = false;
53 }
54}
55
impl Drop for ImportGuard<'_> {
    fn drop(&mut self) {
        // Best-effort cleanup: failures are logged rather than propagated,
        // since erroring (or panicking) inside Drop would mask whatever
        // error aborted the import in the first place.
        if self.indexes_dropped {
            if let Err(e) = Schema::create_indexes(self.conn) {
                warn!("Failed to restore indexes during cleanup: {}", e);
            }
        }

        if self.pragmas_modified {
            // Restore the durable defaults used outside bulk import.
            // NOTE(review): if a transaction is still open when this runs,
            // the journal_mode change may not take effect — confirm callers
            // always COMMIT/ROLLBACK before the guard drops.
            if let Err(e) = self.conn.execute_batch(
                "PRAGMA synchronous = NORMAL;
                 PRAGMA journal_mode = WAL;",
            ) {
                warn!("Failed to restore PRAGMAs during cleanup: {}", e);
            }
        }
    }
}
76
/// Summary statistics for a completed (or attempted) import run.
#[derive(Debug, Clone, Default)]
pub struct ImportStats {
    /// Number of records successfully inserted.
    pub records: usize,
    /// Number of DAT files processed.
    pub files: usize,
    /// Lines that could not be parsed into records.
    pub parse_errors: usize,
    /// Records that parsed but failed to insert.
    pub insert_errors: usize,
    /// Wall-clock duration of the whole import, in seconds.
    pub duration_secs: f64,
}

impl ImportStats {
    /// Average insertion throughput in records per second.
    /// Returns 0.0 when no time has elapsed, to avoid division by zero.
    pub fn rate(&self) -> f64 {
        match self.duration_secs {
            d if d > 0.0 => self.records as f64 / d,
            _ => 0.0,
        }
    }

    /// True when the import produced no insert errors. Parse errors are
    /// tolerated (malformed source lines do not fail the run).
    pub fn is_successful(&self) -> bool {
        self.insert_errors == 0
    }
}
107
/// Snapshot of import progress handed to a [`ProgressCallback`].
pub struct ImportProgress {
    /// 1-based index of the file currently being processed.
    pub current_file: usize,
    /// Total number of DAT files selected for this import.
    pub total_files: usize,
    /// Name of the DAT file currently being processed.
    pub file_name: String,
    /// Cumulative records inserted so far, across all files.
    pub records: usize,
    /// Cumulative parse + insert errors so far, across all files.
    pub errors: usize,
}

/// Callback invoked periodically during an import (roughly every few
/// thousand lines, and once after each file completes).
pub type ProgressCallback = Box<dyn Fn(&ImportProgress) + Send + Sync>;
124
/// Controls which ULS record types are imported from an archive.
#[derive(Debug, Clone, Default)]
pub enum ImportMode {
    /// Import only the core license tables (see [`ImportMode::MINIMAL_TYPES`]).
    Minimal,
    /// Import every record type found in the archive (the default).
    #[default]
    Full,
    /// Import only the listed record types (compared case-insensitively).
    Selective(Vec<String>),
}

impl ImportMode {
    /// Record types included by [`ImportMode::Minimal`]: license header,
    /// entity, and amateur records.
    pub const MINIMAL_TYPES: &'static [&'static str] = &["HD", "EN", "AM"];

    /// Returns true if `record_type` should be imported under this mode.
    /// The comparison is case-insensitive in all modes.
    pub fn should_import(&self, record_type: &str) -> bool {
        match self {
            // Compare case-insensitively without allocating, matching the
            // Selective arm below (record type codes are ASCII).
            ImportMode::Minimal => Self::MINIMAL_TYPES
                .iter()
                .any(|t| t.eq_ignore_ascii_case(record_type)),
            ImportMode::Full => true,
            ImportMode::Selective(types) => {
                types.iter().any(|t| t.eq_ignore_ascii_case(record_type))
            }
        }
    }

    /// Returns true if a DAT file (e.g. "HD.dat") should be imported,
    /// based on the record type encoded before the first '.'.
    pub fn should_import_file(&self, filename: &str) -> bool {
        // `split` always yields at least one piece, so the fallback is
        // purely defensive; case handling is done by `should_import`.
        self.should_import(filename.split('.').next().unwrap_or(""))
    }
}
161
/// Imports FCC ULS DAT archives into the database via bulk insertion.
pub struct Importer<'a> {
    // Target database; borrowed for the importer's lifetime.
    db: &'a Database,
}
166
167impl<'a> Importer<'a> {
168 pub fn new(db: &'a Database) -> Self {
170 Self { db }
171 }
172
173 pub fn import_zip(
177 &self,
178 zip_path: &Path,
179 progress: Option<ProgressCallback>,
180 ) -> Result<ImportStats> {
181 self.import_zip_with_mode(zip_path, ImportMode::Full, progress)
182 }
183
184 pub fn import_zip_with_mode(
201 &self,
202 zip_path: &Path,
203 mode: ImportMode,
204 progress: Option<ProgressCallback>,
205 ) -> Result<ImportStats> {
206 let start = Instant::now();
207
208 let mut extractor = ZipExtractor::open(zip_path)?;
209 let all_dat_files = extractor.list_dat_files();
210
211 let mut dat_files: Vec<String> = all_dat_files
213 .into_iter()
214 .filter(|f| mode.should_import_file(f))
215 .collect();
216
217 dat_files.sort_by(|a, b| {
219 let priority = |s: &str| -> u8 {
220 let upper = s.to_uppercase();
221 if upper.contains("HD") {
222 0
223 } else if upper.contains("EN") {
224 1
225 } else if upper.contains("AM") {
226 2
227 } else {
228 3
229 }
230 };
231 priority(a).cmp(&priority(b))
232 });
233
234 info!(
235 "Processing {} DAT files (mode={:?}): {:?}",
236 dat_files.len(),
237 mode,
238 dat_files
239 );
240
241 let conn = self.db.conn()?;
243
244 let mut guard = ImportGuard::new(&conn);
246
247 conn.execute_batch(
248 "PRAGMA synchronous = OFF;
249 PRAGMA journal_mode = MEMORY;
250 PRAGMA temp_store = MEMORY;
251 PRAGMA cache_size = -64000;",
252 )?;
253 guard.mark_pragmas_modified();
254
255 debug!("Dropping indexes for bulk import performance");
257 Schema::drop_indexes(&conn)?;
258 guard.mark_indexes_dropped();
259
260 conn.execute("BEGIN TRANSACTION", [])?;
262
263 let mut inserter = BulkInserter::new(&conn)?;
265
266 let mut stats = ImportStats {
267 files: dat_files.len(),
268 ..Default::default()
269 };
270
271 let mut records_per_type: std::collections::HashMap<String, usize> =
273 std::collections::HashMap::new();
274
275 for (idx, dat_file) in dat_files.iter().enumerate() {
276 let mut file_records = 0usize;
277 let mut file_parse_errors = 0usize;
278 let mut file_insert_errors = 0usize;
279
280 extractor.process_dat_streaming(dat_file, |line| {
281 match line.to_record() {
282 Ok(record) => {
283 if let Err(e) = inserter.insert(&record) {
284 file_insert_errors += 1;
285 if file_insert_errors <= 5 {
286 warn!("Insert error in {}: {}", dat_file, e);
287 }
288 } else {
289 file_records += 1;
290 }
291 }
292 Err(e) => {
293 file_parse_errors += 1;
294 if file_parse_errors <= 5 {
295 warn!("Parse error in {}: {}", dat_file, e);
296 }
297 }
298 }
299
300 if let Some(ref cb) = progress {
302 #[allow(clippy::manual_is_multiple_of)]
303 if (file_records + file_parse_errors) % 10_000 == 0 {
304 cb(&ImportProgress {
305 current_file: idx + 1,
306 total_files: dat_files.len(),
307 file_name: dat_file.clone(),
308 records: stats.records + file_records,
309 errors: stats.parse_errors
310 + stats.insert_errors
311 + file_parse_errors
312 + file_insert_errors,
313 });
314 }
315 }
316 true
317 })?;
318
319 stats.records += file_records;
320 stats.parse_errors += file_parse_errors;
321 stats.insert_errors += file_insert_errors;
322
323 let record_type = dat_file.split('.').next().unwrap_or("").to_uppercase();
325 *records_per_type.entry(record_type).or_insert(0) += file_records;
326
327 if file_parse_errors > 0 || file_insert_errors > 0 {
328 warn!(
329 "{}: {} records, {} parse errors, {} insert errors",
330 dat_file, file_records, file_parse_errors, file_insert_errors
331 );
332 }
333
334 if let Some(ref cb) = progress {
336 cb(&ImportProgress {
337 current_file: idx + 1,
338 total_files: dat_files.len(),
339 file_name: dat_file.clone(),
340 records: stats.records,
341 errors: stats.parse_errors + stats.insert_errors,
342 });
343 }
344 }
345
346 drop(inserter);
348
349 conn.execute("COMMIT", [])?;
351
352 let index_start = Instant::now();
354 debug!("Rebuilding indexes after bulk import");
355 Schema::create_indexes(&conn)?;
356 guard.mark_indexes_restored();
357 let index_duration = index_start.elapsed();
358 debug!(
359 "Index rebuild completed in {:.2}s",
360 index_duration.as_secs_f64()
361 );
362
363 conn.execute_batch(
365 "PRAGMA synchronous = NORMAL;
366 PRAGMA journal_mode = WAL;",
367 )?;
368 guard.mark_pragmas_restored();
369
370 stats.duration_secs = start.elapsed().as_secs_f64();
371
372 info!(
373 "Import complete: {} records in {:.1}s ({:.0}/sec), index rebuild: {:.2}s",
374 stats.records,
375 stats.duration_secs,
376 stats.rate(),
377 index_duration.as_secs_f64()
378 );
379
380 Ok(stats)
381 }
382
383 pub fn import_for_service(
388 &self,
389 zip_path: &Path,
390 service: &str,
391 mode: ImportMode,
392 progress: Option<ProgressCallback>,
393 ) -> Result<ImportStats> {
394 self.db.clear_import_status(service)?;
396
397 let mut extractor = uls_parser::archive::ZipExtractor::open(zip_path)?;
398 let all_dat_files = extractor.list_dat_files();
399
400 let imported_types: Vec<String> = all_dat_files
402 .iter()
403 .filter(|f| mode.should_import_file(f))
404 .map(|f| f.split('.').next().unwrap_or("").to_uppercase())
405 .collect();
406
407 let stats = self.import_zip_with_mode(zip_path, mode, progress)?;
409
410 for record_type in imported_types {
414 self.db.mark_imported(service, &record_type, 0)?;
415 }
416
417 Ok(stats)
418 }
419
420 pub fn import_patch(
430 &self,
431 zip_path: &Path,
432 mode: ImportMode,
433 progress: Option<ProgressCallback>,
434 ) -> Result<ImportStats> {
435 let start = Instant::now();
436
437 let mut extractor = ZipExtractor::open(zip_path)?;
438 let all_dat_files = extractor.list_dat_files();
439
440 let mut dat_files: Vec<String> = all_dat_files
442 .into_iter()
443 .filter(|f| mode.should_import_file(f))
444 .collect();
445
446 dat_files.sort_by(|a, b| {
448 let priority = |s: &str| -> u8 {
449 let upper = s.to_uppercase();
450 if upper.contains("HD") {
451 0
452 } else if upper.contains("EN") {
453 1
454 } else if upper.contains("AM") {
455 2
456 } else {
457 3
458 }
459 };
460 priority(a).cmp(&priority(b))
461 });
462
463 info!(
464 "Applying patch with {} DAT files (mode={:?}): {:?}",
465 dat_files.len(),
466 mode,
467 dat_files
468 );
469
470 let conn = self.db.conn()?;
472 conn.execute_batch(
473 "PRAGMA synchronous = OFF;
474 PRAGMA journal_mode = MEMORY;
475 PRAGMA temp_store = MEMORY;
476 PRAGMA cache_size = -64000;",
477 )?;
478
479 conn.execute("BEGIN TRANSACTION", [])?;
481
482 let mut inserter = BulkInserter::new(&conn)?;
484
485 let mut stats = ImportStats {
486 files: dat_files.len(),
487 ..Default::default()
488 };
489
490 for (idx, dat_file) in dat_files.iter().enumerate() {
491 let mut file_records = 0usize;
492 let mut file_parse_errors = 0usize;
493 let mut file_insert_errors = 0usize;
494
495 extractor.process_dat_streaming(dat_file, |line| {
496 match line.to_record() {
497 Ok(record) => {
498 if let Err(e) = inserter.insert(&record) {
499 file_insert_errors += 1;
500 if file_insert_errors <= 5 {
501 warn!("Insert error in {}: {}", dat_file, e);
502 }
503 } else {
504 file_records += 1;
505 }
506 }
507 Err(e) => {
508 file_parse_errors += 1;
509 if file_parse_errors <= 5 {
510 warn!("Parse error in {}: {}", dat_file, e);
511 }
512 }
513 }
514
515 if let Some(ref cb) = progress {
517 #[allow(clippy::manual_is_multiple_of)]
518 if (file_records + file_parse_errors) % 1_000 == 0 {
519 cb(&ImportProgress {
520 current_file: idx + 1,
521 total_files: dat_files.len(),
522 file_name: dat_file.clone(),
523 records: stats.records + file_records,
524 errors: stats.parse_errors
525 + stats.insert_errors
526 + file_parse_errors
527 + file_insert_errors,
528 });
529 }
530 }
531 true
532 })?;
533
534 stats.records += file_records;
535 stats.parse_errors += file_parse_errors;
536 stats.insert_errors += file_insert_errors;
537
538 if file_parse_errors > 0 || file_insert_errors > 0 {
539 warn!(
540 "{}: {} records, {} parse errors, {} insert errors",
541 dat_file, file_records, file_parse_errors, file_insert_errors
542 );
543 }
544
545 if let Some(ref cb) = progress {
547 cb(&ImportProgress {
548 current_file: idx + 1,
549 total_files: dat_files.len(),
550 file_name: dat_file.clone(),
551 records: stats.records,
552 errors: stats.parse_errors + stats.insert_errors,
553 });
554 }
555 }
556
557 drop(inserter);
559
560 conn.execute("COMMIT", [])?;
562
563 conn.execute_batch(
565 "PRAGMA synchronous = NORMAL;
566 PRAGMA journal_mode = WAL;",
567 )?;
568
569 stats.duration_secs = start.elapsed().as_secs_f64();
570
571 info!(
572 "Patch applied: {} records in {:.2}s ({:.0}/sec)",
573 stats.records,
574 stats.duration_secs,
575 stats.rate()
576 );
577
578 Ok(stats)
579 }
580}
581
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DatabaseConfig;
    use std::io::Write;
    use tempfile::TempDir;
    use zip::write::FileOptions;
    use zip::ZipWriter;

    // Creates a temporary, schema-initialized database for a test.
    fn create_test_db() -> (TempDir, Database) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let config = DatabaseConfig::with_path(db_path);
        let db = Database::with_config(config).unwrap();
        db.initialize().unwrap();
        (temp_dir, db)
    }

    // Builds a minimal ULS archive with one HD, EN, and AM record each,
    // all for callsign W1TEST.
    fn create_test_zip(temp_dir: &TempDir) -> std::path::PathBuf {
        let zip_path = temp_dir.path().join("test.zip");
        let file = std::fs::File::create(&zip_path).unwrap();
        let mut zip = ZipWriter::new(file);
        let options: FileOptions<()> = FileOptions::default();

        zip.start_file("HD.dat", options).unwrap();
        zip.write_all(b"HD|100001|0000000001||W1TEST|A|HA|01/15/2020|01/15/2030|||||||||||||||||||||||||||||||||||N|||||||||||01/15/2020|01/15/2020|||||||||||||||\n").unwrap();

        zip.start_file("EN.dat", options).unwrap();
        zip.write_all(b"EN|100001|||W1TEST|L|L00100001|DOE, JOHN A|JOHN|A|DOE||555-555-1234||test@example.com|123 Main St|ANYTOWN|CA|90210||||000|0001234567|I||||||\n").unwrap();

        zip.start_file("AM.dat", options).unwrap();
        zip.write_all(b"AM|100001|||W1TEST|E|D|6||||||||||\n")
            .unwrap();

        zip.finish().unwrap();
        zip_path
    }

    #[test]
    fn test_import_stats_default() {
        let stats = ImportStats::default();
        assert_eq!(stats.records, 0);
        assert_eq!(stats.files, 0);
        assert_eq!(stats.parse_errors, 0);
        assert_eq!(stats.insert_errors, 0);
        assert_eq!(stats.duration_secs, 0.0);
    }

    #[test]
    fn test_import_stats_rate() {
        let stats = ImportStats {
            records: 1000,
            duration_secs: 2.0,
            ..Default::default()
        };
        assert_eq!(stats.rate(), 500.0);
    }

    // rate() must not divide by zero when no time has elapsed.
    #[test]
    fn test_import_stats_rate_zero_duration() {
        let stats = ImportStats {
            records: 1000,
            duration_secs: 0.0,
            ..Default::default()
        };
        assert_eq!(stats.rate(), 0.0);
    }

    // Only insert errors make an import unsuccessful.
    #[test]
    fn test_import_stats_is_successful() {
        let stats = ImportStats {
            insert_errors: 0,
            ..Default::default()
        };
        assert!(stats.is_successful());

        let stats_with_errors = ImportStats {
            insert_errors: 1,
            ..Default::default()
        };
        assert!(!stats_with_errors.is_successful());
    }

    #[test]
    fn test_importer_new() {
        let (_temp_dir, db) = create_test_db();
        let importer = Importer::new(&db);
        // The importer borrows the database rather than copying it.
        assert!(std::ptr::eq(importer.db, &db));
    }

    #[test]
    fn test_import_zip_basic() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_test_zip(&temp_dir);

        let importer = Importer::new(&db);
        let stats = importer.import_zip(&zip_path, None).unwrap();

        // One file and one record per type (HD, EN, AM).
        assert_eq!(stats.files, 3); assert_eq!(stats.records, 3);
        assert_eq!(stats.parse_errors, 0);
        assert_eq!(stats.insert_errors, 0);
        assert!(stats.is_successful());
    }

    #[test]
    fn test_import_zip_with_progress_callback() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_test_zip(&temp_dir);

        let progress_called = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
        let progress_called_clone = progress_called.clone();

        let callback: ProgressCallback = Box::new(move |_progress| {
            progress_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
        });

        let importer = Importer::new(&db);
        let stats = importer.import_zip(&zip_path, Some(callback)).unwrap();

        assert!(stats.is_successful());
    }

    #[test]
    fn test_import_zip_nonexistent_file() {
        let (_temp_dir, db) = create_test_db();
        let importer = Importer::new(&db);

        let result = importer.import_zip(Path::new("/nonexistent/file.zip"), None);
        assert!(result.is_err());
    }

    // End-to-end check: imported rows are actually queryable.
    #[test]
    fn test_import_zip_verifies_data() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_test_zip(&temp_dir);

        let importer = Importer::new(&db);
        importer.import_zip(&zip_path, None).unwrap();

        if let Some(license) = db.get_license_by_callsign("W1TEST").unwrap() {
            assert_eq!(license.call_sign, "W1TEST");
            assert_eq!(license.radio_service.as_str(), "HA");
        } else {
            panic!("License W1TEST should have been imported");
        }
    }

    #[test]
    fn test_import_mode_should_import_minimal() {
        let mode = ImportMode::Minimal;

        assert!(mode.should_import("HD"));
        assert!(mode.should_import("EN"));
        assert!(mode.should_import("AM"));
        // Lowercase input is matched case-insensitively.
        assert!(mode.should_import("hd"));
        assert!(!mode.should_import("HS"));
        assert!(!mode.should_import("CO"));
        assert!(!mode.should_import("SC"));
        assert!(!mode.should_import("LA"));
        assert!(!mode.should_import("SF"));
    }

    #[test]
    fn test_import_mode_should_import_full() {
        let mode = ImportMode::Full;

        assert!(mode.should_import("HD"));
        assert!(mode.should_import("EN"));
        assert!(mode.should_import("AM"));
        assert!(mode.should_import("HS"));
        assert!(mode.should_import("CO"));
        assert!(mode.should_import("SC"));
        assert!(mode.should_import("LA"));
        assert!(mode.should_import("SF"));
        // Full mode accepts any type, even unknown ones.
        assert!(mode.should_import("ANYTYPE"));
    }

    #[test]
    fn test_import_mode_should_import_selective() {
        let mode = ImportMode::Selective(vec!["HD".to_string(), "CO".to_string()]);

        assert!(mode.should_import("HD"));
        assert!(mode.should_import("CO"));
        // Selective comparison is case-insensitive.
        assert!(mode.should_import("hd"));
        assert!(!mode.should_import("EN"));
        assert!(!mode.should_import("AM"));
        assert!(!mode.should_import("HS"));
    }

    #[test]
    fn test_import_mode_should_import_file() {
        let mode = ImportMode::Minimal;

        assert!(mode.should_import_file("HD.dat"));
        assert!(mode.should_import_file("EN.dat"));
        assert!(mode.should_import_file("AM.dat"));

        assert!(!mode.should_import_file("HS.dat"));
        assert!(!mode.should_import_file("CO.dat"));
    }

    #[test]
    fn test_import_mode_default_is_full() {
        let mode = ImportMode::default();
        assert!(matches!(mode, ImportMode::Full));
    }

    // Like create_test_zip, but also includes HS and CO files so mode
    // filtering can be observed.
    fn create_multi_type_test_zip(temp_dir: &TempDir) -> std::path::PathBuf {
        let zip_path = temp_dir.path().join("multi.zip");
        let file = std::fs::File::create(&zip_path).unwrap();
        let mut zip = ZipWriter::new(file);
        let options: FileOptions<()> = FileOptions::default();

        zip.start_file("HD.dat", options).unwrap();
        zip.write_all(b"HD|100001|0000000001||W1TEST|A|HA|01/15/2020|01/15/2030|||||||||||||||||||||||||||||||||||N|||||||||||01/15/2020|01/15/2020|||||||||||||||\n").unwrap();

        zip.start_file("EN.dat", options).unwrap();
        zip.write_all(b"EN|100001|||W1TEST|L|L00100001|DOE, JOHN|JOHN||DOE||||||||||||000|0001234567|I||||||\n").unwrap();

        zip.start_file("AM.dat", options).unwrap();
        zip.write_all(b"AM|100001|||W1TEST|E|D|6||||||||||\n")
            .unwrap();

        zip.start_file("HS.dat", options).unwrap();
        zip.write_all(b"HS|100001||W1TEST|01/15/2020|LIISS\n")
            .unwrap();

        zip.start_file("CO.dat", options).unwrap();
        zip.write_all(b"CO|100001||W1TEST|01/15/2020|Test comment||\n")
            .unwrap();

        zip.finish().unwrap();
        zip_path
    }

    // Minimal mode should skip the HS and CO files entirely.
    #[test]
    fn test_import_zip_with_mode_minimal() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_multi_type_test_zip(&temp_dir);

        let importer = Importer::new(&db);
        let stats = importer
            .import_zip_with_mode(&zip_path, ImportMode::Minimal, None)
            .unwrap();

        assert_eq!(stats.files, 3);
        assert_eq!(stats.records, 3);
        assert!(stats.is_successful());

        assert!(db.get_license_by_callsign("W1TEST").unwrap().is_some());
    }

    #[test]
    fn test_import_zip_with_mode_full() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_multi_type_test_zip(&temp_dir);

        let importer = Importer::new(&db);
        let stats = importer
            .import_zip_with_mode(&zip_path, ImportMode::Full, None)
            .unwrap();

        assert_eq!(stats.files, 5);
        assert_eq!(stats.records, 5);
        assert!(stats.is_successful());
    }

    #[test]
    fn test_import_zip_with_mode_selective() {
        let (temp_dir, db) = create_test_db();
        let zip_path = create_multi_type_test_zip(&temp_dir);

        let importer = Importer::new(&db);
        let mode = ImportMode::Selective(vec!["HD".to_string(), "EN".to_string()]);
        let stats = importer
            .import_zip_with_mode(&zip_path, mode, None)
            .unwrap();

        assert_eq!(stats.files, 2);
        assert_eq!(stats.records, 2);
        assert!(stats.is_successful());
    }
}