1use std::fmt;
6use std::path::Path;
7
8use r2d2::{CustomizeConnection, Pool, PooledConnection};
9use r2d2_sqlite::SqliteConnectionManager;
10use rusqlite::{params, Connection};
11use tracing::{debug, info};
12
13use uls_core::codes::{EntityType, LicenseStatus, OperatorClass, RadioService};
14use uls_core::records::{
15 AmateurRecord, CommentRecord, EntityRecord, HeaderRecord, HistoryRecord,
16 SpecialConditionRecord, UlsRecord,
17};
18
19use crate::config::DatabaseConfig;
20use crate::enum_adapters::{read_license_status, read_operator_class, read_radio_service};
21use crate::error::Result;
22use crate::models::{License, LicenseStats};
23use crate::schema::Schema;
24
25#[derive(Clone)]
30struct SqliteConnectionCustomizer {
31 cache_size: i32,
32 foreign_keys: bool,
33}
34
35impl fmt::Debug for SqliteConnectionCustomizer {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("SqliteConnectionCustomizer")
38 .field("cache_size", &self.cache_size)
39 .field("foreign_keys", &self.foreign_keys)
40 .finish()
41 }
42}
43
44impl CustomizeConnection<Connection, rusqlite::Error> for SqliteConnectionCustomizer {
45 fn on_acquire(&self, conn: &mut Connection) -> std::result::Result<(), rusqlite::Error> {
46 conn.execute_batch(&format!("PRAGMA cache_size = {};", self.cache_size))?;
48
49 if self.foreign_keys {
51 conn.execute_batch("PRAGMA foreign_keys = ON;")?;
52 }
53
54 conn.execute_batch(
56 r#"
57 PRAGMA busy_timeout = 5000;
58 PRAGMA synchronous = NORMAL;
59 PRAGMA temp_store = MEMORY;
60 PRAGMA mmap_size = 268435456;
61 "#,
62 )?;
63
64 Ok(())
65 }
66}
67
68pub struct Database {
70 pool: Pool<SqliteConnectionManager>,
71 config: DatabaseConfig,
72}
73
74impl Database {
75 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
77 let config = DatabaseConfig::with_path(path.as_ref());
78 Self::with_config(config)
79 }
80
81 pub fn with_config(config: DatabaseConfig) -> Result<Self> {
83 if let Some(parent) = config.path.parent() {
85 if !parent.exists() && config.path.to_str() != Some(":memory:") {
86 std::fs::create_dir_all(parent)?;
87 }
88 }
89
90 let manager = SqliteConnectionManager::file(&config.path);
91
92 let customizer = SqliteConnectionCustomizer {
94 cache_size: config.cache_size,
95 foreign_keys: config.foreign_keys,
96 };
97
98 let pool = Pool::builder()
99 .max_size(config.max_connections)
100 .min_idle(Some(0))
101 .connection_timeout(config.connection_timeout)
102 .connection_customizer(Box::new(customizer))
103 .build(manager)?;
104
105 let db = Self { pool, config };
106
107 if db.config.enable_wal {
109 let conn = db.conn()?;
110 conn.execute_batch("PRAGMA journal_mode = WAL;")?;
111 }
112
113 Ok(db)
114 }
115
116 pub fn conn(&self) -> Result<PooledConnection<SqliteConnectionManager>> {
118 Ok(self.pool.get()?)
119 }
120
121 pub fn initialize(&self) -> Result<()> {
123 let conn = self.conn()?;
124 Schema::initialize(&conn)?;
125 info!(
126 "Database initialized with schema version {}",
127 crate::schema::SCHEMA_VERSION
128 );
129 Ok(())
130 }
131
132 pub fn is_initialized(&self) -> Result<bool> {
134 let conn = self.conn()?;
135 Ok(Schema::get_version(&conn)?.is_some())
136 }
137
138 pub fn migrate_if_needed(&self) -> Result<()> {
142 let conn = self.conn()?;
143 Schema::migrate_if_needed(&conn)
144 }
145
146 pub fn begin_transaction(&self) -> Result<Transaction> {
148 let conn = self.pool.get()?;
149 conn.execute("BEGIN TRANSACTION", [])?;
150 Ok(Transaction { conn })
151 }
152
153 pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
155 let conn = self.conn()?;
156 Self::insert_record_conn(&conn, record)
157 }
158
159 fn insert_record_conn(conn: &Connection, record: &UlsRecord) -> Result<()> {
161 match record {
162 UlsRecord::Header(hd) => Self::insert_header(conn, hd),
163 UlsRecord::Entity(en) => Self::insert_entity(conn, en),
164 UlsRecord::Amateur(am) => Self::insert_amateur(conn, am),
165 UlsRecord::History(hs) => Self::insert_history(conn, hs),
166 UlsRecord::Comment(co) => Self::insert_comment(conn, co),
167 UlsRecord::SpecialCondition(sc) => Self::insert_special_condition(conn, sc),
168 _ => {
169 debug!(
170 "Skipping unsupported record type: {:?}",
171 record.record_type()
172 );
173 Ok(())
174 }
175 }
176 }
177
178 fn insert_header(conn: &Connection, hd: &HeaderRecord) -> Result<()> {
180 let license_status_code: Option<u8> = hd.license_status.and_then(|c| {
182 c.to_string()
183 .parse::<LicenseStatus>()
184 .ok()
185 .map(|s| s.to_u8())
186 });
187 let radio_service_code: Option<u8> = hd
189 .radio_service_code
190 .as_ref()
191 .and_then(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()));
192
193 let mut stmt = conn.prepare_cached(
194 r#"INSERT OR REPLACE INTO licenses (
195 unique_system_identifier, uls_file_number, ebf_number, call_sign,
196 license_status, radio_service_code, grant_date, expired_date,
197 cancellation_date, effective_date, last_action_date
198 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"#,
199 )?;
200 stmt.execute(params![
201 hd.unique_system_identifier,
202 hd.uls_file_number,
203 hd.ebf_number,
204 hd.call_sign,
205 license_status_code,
206 radio_service_code,
207 hd.grant_date.map(|d| d.to_string()),
208 hd.expired_date.map(|d| d.to_string()),
209 hd.cancellation_date.map(|d| d.to_string()),
210 hd.effective_date.map(|d| d.to_string()),
211 hd.last_action_date.map(|d| d.to_string()),
212 ])?;
213 Ok(())
214 }
215
216 fn insert_entity(conn: &Connection, en: &EntityRecord) -> Result<()> {
218 let entity_type_code: Option<u8> = en
220 .entity_type
221 .as_ref()
222 .and_then(|s| s.parse::<EntityType>().ok().map(|e| e.to_u8()));
223
224 let mut stmt = conn.prepare_cached(
225 r#"INSERT OR REPLACE INTO entities (
226 unique_system_identifier, uls_file_number, ebf_number, call_sign,
227 entity_type, licensee_id, entity_name, first_name, middle_initial,
228 last_name, suffix, phone, fax, email, street_address, city, state,
229 zip_code, po_box, attention_line, sgin, frn, applicant_type_code,
230 status_code, status_date
231 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25)"#,
232 )?;
233 stmt.execute(params![
234 en.unique_system_identifier,
235 en.uls_file_number,
236 en.ebf_number,
237 en.call_sign,
238 entity_type_code,
239 en.licensee_id,
240 en.entity_name,
241 en.first_name,
242 en.mi.map(|c| c.to_string()),
243 en.last_name,
244 en.suffix,
245 en.phone,
246 en.fax,
247 en.email,
248 en.street_address,
249 en.city,
250 en.state,
251 en.zip_code,
252 en.po_box,
253 en.attention_line,
254 en.sgin,
255 en.frn,
256 en.applicant_type_code.map(|c| c.to_string()),
257 en.status_code.map(|c| c.to_string()),
258 en.status_date,
259 ])?;
260 Ok(())
261 }
262
263 fn insert_amateur(conn: &Connection, am: &AmateurRecord) -> Result<()> {
265 let operator_class_code: Option<u8> = am.operator_class.and_then(|c| {
267 c.to_string()
268 .parse::<OperatorClass>()
269 .ok()
270 .map(|o| o.to_u8())
271 });
272 let prev_operator_class_code: Option<u8> = am.previous_operator_class.and_then(|c| {
273 c.to_string()
274 .parse::<OperatorClass>()
275 .ok()
276 .map(|o| o.to_u8())
277 });
278
279 let mut stmt = conn.prepare_cached(
280 r#"INSERT OR REPLACE INTO amateur_operators (
281 unique_system_identifier, uls_file_number, ebf_number, call_sign,
282 operator_class, group_code, region_code, trustee_call_sign,
283 trustee_indicator, physician_certification, ve_signature,
284 systematic_call_sign_change, vanity_call_sign_change,
285 vanity_relationship, previous_call_sign, previous_operator_class,
286 trustee_name
287 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"#,
288 )?;
289 stmt.execute(params![
290 am.unique_system_identifier,
291 am.uls_file_num,
292 am.ebf_number,
293 am.callsign,
294 operator_class_code,
295 am.group_code.map(|c| c.to_string()),
296 am.region_code,
297 am.trustee_callsign,
298 am.trustee_indicator.map(|c| c.to_string()),
299 am.physician_certification.map(|c| c.to_string()),
300 am.ve_signature.map(|c| c.to_string()),
301 am.systematic_callsign_change.map(|c| c.to_string()),
302 am.vanity_callsign_change.map(|c| c.to_string()),
303 am.vanity_relationship,
304 am.previous_callsign,
305 prev_operator_class_code,
306 am.trustee_name,
307 ])?;
308 Ok(())
309 }
310
311 fn insert_history(conn: &Connection, hs: &HistoryRecord) -> Result<()> {
313 let mut stmt = conn.prepare_cached(
314 r#"INSERT OR REPLACE INTO history (
315 unique_system_identifier, uls_file_number, callsign, log_date, code
316 ) VALUES (?1, ?2, ?3, ?4, ?5)"#,
317 )?;
318 stmt.execute(params![
319 hs.unique_system_identifier,
320 hs.uls_file_number,
321 hs.callsign,
322 hs.log_date,
323 hs.code,
324 ])?;
325 Ok(())
326 }
327
328 fn insert_comment(conn: &Connection, co: &CommentRecord) -> Result<()> {
330 let mut stmt = conn.prepare_cached(
331 r#"INSERT OR REPLACE INTO comments (
332 unique_system_identifier, uls_file_number, callsign, comment_date,
333 description, status_code, status_date
334 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
335 )?;
336 stmt.execute(params![
337 co.unique_system_identifier,
338 co.uls_file_num,
339 co.callsign,
340 co.comment_date,
341 co.description,
342 co.status_code.map(|c| c.to_string()),
343 co.status_date,
344 ])?;
345 Ok(())
346 }
347
348 fn insert_special_condition(conn: &Connection, sc: &SpecialConditionRecord) -> Result<()> {
350 let mut stmt = conn.prepare_cached(
351 r#"INSERT OR REPLACE INTO special_conditions (
352 unique_system_identifier, uls_file_number, ebf_number, callsign,
353 special_condition_type, special_condition_code, status_code, status_date
354 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"#,
355 )?;
356 stmt.execute(params![
357 sc.unique_system_identifier,
358 sc.uls_file_number,
359 sc.ebf_number,
360 sc.callsign,
361 sc.special_condition_type.map(|c| c.to_string()),
362 sc.special_condition_code,
363 sc.status_code.map(|c| c.to_string()),
364 sc.status_date,
365 ])?;
366 Ok(())
367 }
368
369 pub fn get_license_by_callsign(&self, callsign: &str) -> Result<Option<License>> {
371 let conn = self.conn()?;
372 let callsign = callsign.to_uppercase();
373
374 let result = conn.query_row(
375 r#"
376 SELECT
377 l.unique_system_identifier, l.call_sign,
378 e.entity_name, e.first_name, e.middle_initial, e.last_name,
379 l.license_status, l.radio_service_code,
380 l.grant_date, l.expired_date, l.cancellation_date,
381 e.frn, NULL as previous_call_sign,
382 e.street_address, e.city, e.state, e.zip_code,
383 a.operator_class
384 FROM licenses l
385 LEFT JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
386 LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
387 WHERE l.call_sign = ?1
388 ORDER BY l.license_status ASC, l.grant_date DESC
389 LIMIT 1
390 "#,
391 [&callsign],
392 |row| {
393 let status = read_license_status(row, 6)?;
395 let radio_service = read_radio_service(row, 7)?;
396 let operator_class = read_operator_class(row, 17)?;
397
398 Ok(License {
399 unique_system_identifier: row.get(0)?,
400 call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
401 licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
402 first_name: row.get(3)?,
403 middle_initial: row.get(4)?,
404 last_name: row.get(5)?,
405 status,
406 radio_service,
407 grant_date: row
408 .get::<_, Option<String>>(8)?
409 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
410 expired_date: row
411 .get::<_, Option<String>>(9)?
412 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
413 cancellation_date: row
414 .get::<_, Option<String>>(10)?
415 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
416 frn: row.get(11)?,
417 previous_call_sign: row.get(12)?,
418 street_address: row.get(13)?,
419 city: row.get(14)?,
420 state: row.get(15)?,
421 zip_code: row.get(16)?,
422 operator_class,
423 })
424 },
425 );
426
427 match result {
428 Ok(license) => Ok(Some(license)),
429 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
430 Err(e) => Err(e.into()),
431 }
432 }
433
434 pub fn get_licenses_by_frn(&self, frn: &str) -> Result<Vec<License>> {
436 let conn = self.conn()?;
437 let frn = frn.trim();
439
440 let mut stmt = conn.prepare(
441 r#"
442 SELECT
443 l.unique_system_identifier, l.call_sign,
444 e.entity_name, e.first_name, e.middle_initial, e.last_name,
445 l.license_status, l.radio_service_code,
446 l.grant_date, l.expired_date, l.cancellation_date,
447 e.frn, NULL as previous_call_sign,
448 e.street_address, e.city, e.state, e.zip_code,
449 a.operator_class
450 FROM licenses l
451 INNER JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
452 LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
453 WHERE e.frn = ?1
454 GROUP BY l.unique_system_identifier
455 ORDER BY l.radio_service_code, l.call_sign
456 "#,
457 )?;
458
459 let licenses = stmt.query_map([frn], |row| {
460 let status = read_license_status(row, 6)?;
462 let radio_service = read_radio_service(row, 7)?;
463 let operator_class = read_operator_class(row, 17)?;
464
465 Ok(License {
466 unique_system_identifier: row.get(0)?,
467 call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
468 licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
469 first_name: row.get(3)?,
470 middle_initial: row.get(4)?,
471 last_name: row.get(5)?,
472 status,
473 radio_service,
474 grant_date: row
475 .get::<_, Option<String>>(8)?
476 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
477 expired_date: row
478 .get::<_, Option<String>>(9)?
479 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
480 cancellation_date: row
481 .get::<_, Option<String>>(10)?
482 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
483 frn: row.get(11)?,
484 previous_call_sign: row.get(12)?,
485 street_address: row.get(13)?,
486 city: row.get(14)?,
487 state: row.get(15)?,
488 zip_code: row.get(16)?,
489 operator_class,
490 })
491 })?;
492
493 let mut result = Vec::new();
494 for license in licenses {
495 result.push(license?);
496 }
497 Ok(result)
498 }
499
500 pub fn get_stats(&self) -> Result<LicenseStats> {
502 let conn = self.conn()?;
503
504 let total_licenses: u64 = conn.query_row("SELECT COUNT(*) FROM licenses", [], |row| {
505 row.get::<_, i64>(0)
506 })? as u64;
507
508 let active_code = LicenseStatus::Active.to_u8();
510 let expired_code = LicenseStatus::Expired.to_u8();
511 let cancelled_code = LicenseStatus::Cancelled.to_u8();
512
513 let active_licenses: u64 = conn.query_row(
514 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
515 [active_code],
516 |row| row.get::<_, i64>(0),
517 )? as u64;
518
519 let expired_licenses: u64 = conn.query_row(
520 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
521 [expired_code],
522 |row| row.get::<_, i64>(0),
523 )? as u64;
524
525 let cancelled_licenses: u64 = conn.query_row(
526 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
527 [cancelled_code],
528 |row| row.get::<_, i64>(0),
529 )? as u64;
530
531 let schema_version = Schema::get_version(&conn)?.unwrap_or(0);
532 let last_updated = Schema::get_metadata(&conn, "last_updated")?;
533
534 Ok(LicenseStats {
535 total_licenses,
536 active_licenses,
537 expired_licenses,
538 cancelled_licenses,
539 by_service: Vec::new(),
540 by_operator_class: Vec::new(),
541 last_updated,
542 schema_version,
543 })
544 }
545
546 pub fn count_by_service(&self, service_codes: &[&str]) -> Result<u64> {
549 if service_codes.is_empty() {
550 return Ok(0);
551 }
552
553 let int_codes: Vec<u8> = service_codes
555 .iter()
556 .filter_map(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()))
557 .collect();
558
559 if int_codes.is_empty() {
560 return Ok(0);
561 }
562
563 let conn = self.conn()?;
564 let placeholders: String = int_codes.iter().map(|_| "?").collect::<Vec<_>>().join(",");
565 let sql = format!(
566 "SELECT COUNT(*) FROM licenses WHERE radio_service_code IN ({})",
567 placeholders
568 );
569
570 let mut stmt = conn.prepare(&sql)?;
571 let count: u64 = stmt.query_row(rusqlite::params_from_iter(int_codes.iter()), |row| {
572 row.get::<_, i64>(0)
573 })? as u64;
574
575 Ok(count)
576 }
577
578 pub fn set_last_updated(&self, timestamp: &str) -> Result<()> {
580 let conn = self.conn()?;
581 Schema::set_metadata(&conn, "last_updated", timestamp)?;
582 Ok(())
583 }
584
585 pub fn get_imported_etag(&self, service: &str) -> Result<Option<String>> {
587 let conn = self.conn()?;
588 let key = format!("imported_etag_{}", service);
589 Schema::get_metadata(&conn, &key)
590 }
591
592 pub fn set_imported_etag(&self, service: &str, etag: &str) -> Result<()> {
594 let conn = self.conn()?;
595 let key = format!("imported_etag_{}", service);
596 Schema::set_metadata(&conn, &key, etag)?;
597 Ok(())
598 }
599
600 pub fn has_record_type(&self, service: &str, record_type: &str) -> Result<bool> {
606 let conn = self.conn()?;
607 let count: i64 = conn.query_row(
608 "SELECT COUNT(*) FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
609 params![service, record_type],
610 |row| row.get(0),
611 )?;
612 Ok(count > 0)
613 }
614
615 pub fn get_imported_types(&self, service: &str) -> Result<Vec<String>> {
617 let conn = self.conn()?;
618 let mut stmt = conn.prepare(
619 "SELECT record_type FROM import_status WHERE radio_service_code = ?1 ORDER BY record_type"
620 )?;
621 let iter = stmt.query_map(params![service], |row| row.get(0))?;
622 let mut types = Vec::new();
623 for record_type in iter {
624 types.push(record_type?);
625 }
626 Ok(types)
627 }
628
629 pub fn mark_imported(&self, service: &str, record_type: &str, count: usize) -> Result<()> {
631 let conn = self.conn()?;
632 let now = chrono::Utc::now().to_rfc3339();
633 conn.execute(
634 "INSERT OR REPLACE INTO import_status (radio_service_code, record_type, imported_at, record_count)
635 VALUES (?1, ?2, ?3, ?4)",
636 params![service, record_type, now, count as i64],
637 )?;
638 Ok(())
639 }
640
641 pub fn clear_import_status(&self, service: &str) -> Result<()> {
643 let conn = self.conn()?;
644 conn.execute(
645 "DELETE FROM import_status WHERE radio_service_code = ?1",
646 params![service],
647 )?;
648 Ok(())
649 }
650
651 pub fn get_imported_count(&self, service: &str, record_type: &str) -> Result<Option<usize>> {
653 let conn = self.conn()?;
654 let result: rusqlite::Result<i64> = conn.query_row(
655 "SELECT record_count FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
656 params![service, record_type],
657 |row| row.get(0),
658 );
659 match result {
660 Ok(count) => Ok(Some(count as usize)),
661 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
662 Err(e) => Err(e.into()),
663 }
664 }
665
666 pub fn get_last_updated(&self) -> Result<Option<String>> {
672 let conn = self.conn()?;
673 Schema::get_metadata(&conn, "last_updated")
674 }
675
676 pub fn get_freshness(
678 &self,
679 service: &str,
680 threshold_days: i64,
681 ) -> Result<crate::freshness::DataFreshness> {
682 let last_updated = self.get_last_updated()?;
683 let mut freshness = crate::freshness::DataFreshness::from_timestamp(
684 service,
685 last_updated.as_deref(),
686 threshold_days,
687 );
688
689 let weekly_key = format!("last_weekly_date_{}", service);
691 let conn = self.conn()?;
692 if let Some(date_str) = Schema::get_metadata(&conn, &weekly_key)? {
693 freshness.last_weekly_date =
694 chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok();
695 }
696
697 freshness.applied_patch_dates = self
699 .get_applied_patches(service)?
700 .into_iter()
701 .map(|p| p.patch_date)
702 .collect();
703
704 Ok(freshness)
705 }
706
707 pub fn is_stale(&self, service: &str, threshold_days: i64) -> Result<bool> {
709 let freshness = self.get_freshness(service, threshold_days)?;
710 Ok(freshness.is_stale)
711 }
712
713 pub fn record_applied_patch(
715 &self,
716 service: &str,
717 patch_date: chrono::NaiveDate,
718 weekday: &str,
719 etag: Option<&str>,
720 record_count: Option<usize>,
721 ) -> Result<()> {
722 let conn = self.conn()?;
723 let now = chrono::Utc::now().to_rfc3339();
724 let date_str = patch_date.format("%Y-%m-%d").to_string();
725
726 conn.execute(
727 "INSERT OR REPLACE INTO applied_patches
728 (radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count)
729 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
730 params![
731 service,
732 date_str,
733 weekday,
734 now,
735 etag,
736 record_count.map(|c| c as i64)
737 ],
738 )?;
739 Ok(())
740 }
741
742 pub fn get_applied_patches(
744 &self,
745 service: &str,
746 ) -> Result<Vec<crate::freshness::AppliedPatch>> {
747 let conn = self.conn()?;
748 let mut stmt = conn.prepare(
749 "SELECT radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count
750 FROM applied_patches
751 WHERE radio_service_code = ?1
752 ORDER BY patch_date",
753 )?;
754
755 let iter = stmt.query_map(params![service], |row| {
756 let date_str: String = row.get(1)?;
757 let applied_at_str: String = row.get(3)?;
758
759 Ok(crate::freshness::AppliedPatch {
760 service: row.get(0)?,
761 patch_date: chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")
762 .unwrap_or_else(|_| chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()),
763 weekday: row.get(2)?,
764 applied_at: chrono::DateTime::parse_from_rfc3339(&applied_at_str)
765 .map(|dt| dt.with_timezone(&chrono::Utc))
766 .unwrap_or_else(|_| chrono::Utc::now()),
767 etag: row.get(4)?,
768 record_count: row.get::<_, Option<i64>>(5)?.map(|c| c as usize),
769 })
770 })?;
771
772 let mut patches = Vec::new();
773 for patch in iter {
774 patches.push(patch?);
775 }
776 Ok(patches)
777 }
778
779 pub fn clear_applied_patches(&self, service: &str) -> Result<()> {
781 let conn = self.conn()?;
782 conn.execute(
783 "DELETE FROM applied_patches WHERE radio_service_code = ?1",
784 params![service],
785 )?;
786 Ok(())
787 }
788
789 pub fn set_last_weekly_date(&self, service: &str, date: chrono::NaiveDate) -> Result<()> {
791 let conn = self.conn()?;
792 let key = format!("last_weekly_date_{}", service);
793 let date_str = date.format("%Y-%m-%d").to_string();
794 Schema::set_metadata(&conn, &key, &date_str)?;
795 Ok(())
796 }
797
798 pub fn get_last_weekly_date(&self, service: &str) -> Result<Option<chrono::NaiveDate>> {
800 let conn = self.conn()?;
801 let key = format!("last_weekly_date_{}", service);
802 if let Some(date_str) = Schema::get_metadata(&conn, &key)? {
803 Ok(chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok())
804 } else {
805 Ok(None)
806 }
807 }
808}
809
810pub struct Transaction {
812 conn: PooledConnection<SqliteConnectionManager>,
813}
814
815impl Transaction {
816 pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
818 Database::insert_record_conn(&self.conn, record)
819 }
820
821 pub fn commit(self) -> Result<()> {
823 self.conn.execute("COMMIT", [])?;
824 Ok(())
825 }
826
827 pub fn rollback(self) -> Result<()> {
829 self.conn.execute("ROLLBACK", [])?;
830 Ok(())
831 }
832}
833
834#[cfg(test)]
835mod tests {
836 use super::*;
837 use uls_core::records::HeaderRecord;
838
839 fn create_test_db() -> Database {
840 let config = DatabaseConfig::in_memory();
841 let db = Database::with_config(config).unwrap();
842 db.initialize().unwrap();
843 db
844 }
845
846 fn create_test_header() -> HeaderRecord {
847 let mut hd = HeaderRecord::from_fields(&["HD", "12345"]);
848 hd.unique_system_identifier = 12345;
849 hd.call_sign = Some("W1TEST".to_string());
850 hd.license_status = Some('A');
851 hd.radio_service_code = Some("HA".to_string());
852 hd
853 }
854
855 #[test]
856 fn test_open_database() {
857 let db = create_test_db();
858 assert!(db.is_initialized().unwrap());
859 }
860
861 #[test]
862 fn test_insert_and_query() {
863 let db = create_test_db();
864
865 let header = create_test_header();
866 db.insert_record(&UlsRecord::Header(header)).unwrap();
867
868 let license = db.get_license_by_callsign("W1TEST").unwrap();
869 assert!(license.is_some());
870
871 let license = license.unwrap();
872 assert_eq!(license.call_sign, "W1TEST");
873 assert_eq!(license.status, 'A');
874 assert!(license.is_active());
875 }
876
877 #[test]
878 fn test_case_insensitive_lookup() {
879 let db = create_test_db();
880
881 let header = create_test_header();
882 db.insert_record(&UlsRecord::Header(header)).unwrap();
883
884 let license = db.get_license_by_callsign("w1test").unwrap();
886 assert!(license.is_some());
887 }
888
889 #[test]
890 fn test_stats() {
891 let db = create_test_db();
892
893 let header = create_test_header();
894 db.insert_record(&UlsRecord::Header(header)).unwrap();
895
896 let stats = db.get_stats().unwrap();
897 assert_eq!(stats.total_licenses, 1);
898 assert_eq!(stats.active_licenses, 1);
899 }
900
901 #[test]
902 fn test_transaction() {
903 let db = create_test_db();
904
905 let tx = db.begin_transaction().unwrap();
906
907 let header = create_test_header();
908 tx.insert_record(&UlsRecord::Header(header)).unwrap();
909 tx.commit().unwrap();
910
911 let license = db.get_license_by_callsign("W1TEST").unwrap();
912 assert!(license.is_some());
913 }
914
915 #[test]
916 fn test_insert_entity() {
917 use uls_core::records::EntityRecord;
918
919 let db = create_test_db();
920
921 let header = create_test_header();
923 db.insert_record(&UlsRecord::Header(header)).unwrap();
924
925 let entity = EntityRecord::from_fields(&[
927 "EN",
928 "12345",
929 "",
930 "",
931 "W1TEST",
932 "L",
933 "L00100001",
934 "DOE, JOHN A",
935 "JOHN",
936 "A",
937 "DOE",
938 "",
939 "555-555-1234",
940 "",
941 "test@example.com",
942 "123 Main St",
943 "ANYTOWN",
944 "CA",
945 "90210",
946 "",
947 "",
948 "000",
949 "0001234567",
950 "I",
951 "",
952 "",
953 "",
954 "",
955 "",
956 "",
957 ]);
958 db.insert_record(&UlsRecord::Entity(entity)).unwrap();
959 }
960
961 #[test]
962 fn test_insert_amateur() {
963 use uls_core::records::AmateurRecord;
964
965 let db = create_test_db();
966
967 let header = create_test_header();
968 db.insert_record(&UlsRecord::Header(header)).unwrap();
969
970 let amateur = AmateurRecord::from_fields(&[
971 "AM", "12345", "", "", "W1TEST", "E", "D", "6", "", "", "", "", "", "", "", "", "", "",
972 ]);
973 db.insert_record(&UlsRecord::Amateur(amateur)).unwrap();
974 }
975
976 #[test]
977 fn test_insert_history() {
978 use uls_core::records::HistoryRecord;
979
980 let db = create_test_db();
981
982 let header = create_test_header();
983 db.insert_record(&UlsRecord::Header(header)).unwrap();
984
985 let history =
986 HistoryRecord::from_fields(&["HS", "12345", "", "W1TEST", "01/01/2020", "LIISS"]);
987 db.insert_record(&UlsRecord::History(history)).unwrap();
988 }
989
990 #[test]
991 fn test_insert_comment() {
992 use uls_core::records::CommentRecord;
993
994 let db = create_test_db();
995
996 let header = create_test_header();
997 db.insert_record(&UlsRecord::Header(header)).unwrap();
998
999 let comment = CommentRecord::from_fields(&[
1000 "CO",
1001 "12345",
1002 "",
1003 "W1TEST",
1004 "01/01/2020",
1005 "Test comment",
1006 ]);
1007 db.insert_record(&UlsRecord::Comment(comment)).unwrap();
1008 }
1009
1010 #[test]
1011 fn test_insert_special_condition() {
1012 use uls_core::records::SpecialConditionRecord;
1013
1014 let db = create_test_db();
1015
1016 let header = create_test_header();
1017 db.insert_record(&UlsRecord::Header(header)).unwrap();
1018
1019 let sc = SpecialConditionRecord::from_fields(&[
1020 "SC", "12345", "", "", "W1TEST", "P", "999", "", "",
1021 ]);
1022 db.insert_record(&UlsRecord::SpecialCondition(sc)).unwrap();
1023 }
1024
1025 #[test]
1026 fn test_get_licenses_by_frn() {
1027 use uls_core::records::EntityRecord;
1028
1029 let db = create_test_db();
1030
1031 let header = create_test_header();
1032 db.insert_record(&UlsRecord::Header(header)).unwrap();
1033
1034 let entity = EntityRecord::from_fields(&[
1036 "EN",
1037 "12345",
1038 "",
1039 "",
1040 "W1TEST",
1041 "L",
1042 "L00100001",
1043 "DOE, JOHN A",
1044 "JOHN",
1045 "A",
1046 "DOE",
1047 "",
1048 "",
1049 "",
1050 "",
1051 "",
1052 "",
1053 "",
1054 "",
1055 "",
1056 "",
1057 "000",
1058 "0001234567",
1059 "I",
1060 "",
1061 "",
1062 "",
1063 "",
1064 "",
1065 "",
1066 ]);
1067 db.insert_record(&UlsRecord::Entity(entity)).unwrap();
1068
1069 let licenses = db.get_licenses_by_frn("0001234567").unwrap();
1070 assert_eq!(licenses.len(), 1);
1071 assert_eq!(licenses[0].call_sign, "W1TEST");
1072 }
1073
1074 #[test]
1075 fn test_get_licenses_by_frn_not_found() {
1076 let db = create_test_db();
1077
1078 let licenses = db.get_licenses_by_frn("9999999999").unwrap();
1079 assert!(licenses.is_empty());
1080 }
1081
1082 #[test]
1083 fn test_lookup_prefers_active_over_cancelled() {
1084 let db = create_test_db();
1085
1086 let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1088 cancelled.unique_system_identifier = 10001;
1089 cancelled.call_sign = Some("K2QA".to_string());
1090 cancelled.license_status = Some('C');
1091 cancelled.radio_service_code = Some("HA".to_string());
1092 db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1093
1094 let mut active = HeaderRecord::from_fields(&["HD", "20002"]);
1096 active.unique_system_identifier = 20002;
1097 active.call_sign = Some("K2QA".to_string());
1098 active.license_status = Some('A');
1099 active.radio_service_code = Some("HA".to_string());
1100 db.insert_record(&UlsRecord::Header(active)).unwrap();
1101
1102 let license = db.get_license_by_callsign("K2QA").unwrap();
1104 assert!(license.is_some(), "Should find license for K2QA");
1105 let license = license.unwrap();
1106 assert_eq!(
1107 license.status, 'A',
1108 "Should return active license, not cancelled (got status='{}')",
1109 license.status
1110 );
1111 assert_eq!(license.unique_system_identifier, 20002);
1112 }
1113
1114 #[test]
1115 fn test_lookup_returns_cancelled_when_no_active() {
1116 let db = create_test_db();
1117
1118 let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1120 cancelled.unique_system_identifier = 10001;
1121 cancelled.call_sign = Some("W9OLD".to_string());
1122 cancelled.license_status = Some('C');
1123 cancelled.radio_service_code = Some("HA".to_string());
1124 db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1125
1126 let license = db.get_license_by_callsign("W9OLD").unwrap();
1128 assert!(license.is_some(), "Should find cancelled-only license");
1129 assert_eq!(license.unwrap().status, 'C');
1130 }
1131
1132 #[test]
1133 fn test_lookup_prefers_most_recent_inactive_record() {
1134 let db = create_test_db();
1135
1136 let mut older = HeaderRecord::from_fields(&["HD", "10001"]);
1138 older.unique_system_identifier = 10001;
1139 older.call_sign = Some("W3OLD".to_string());
1140 older.license_status = Some('E');
1141 older.radio_service_code = Some("HA".to_string());
1142 older.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2015, 3, 1).unwrap());
1143 older.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2025, 3, 1).unwrap());
1144 db.insert_record(&UlsRecord::Header(older)).unwrap();
1145
1146 let mut newer = HeaderRecord::from_fields(&["HD", "20002"]);
1148 newer.unique_system_identifier = 20002;
1149 newer.call_sign = Some("W3OLD".to_string());
1150 newer.license_status = Some('E');
1151 newer.radio_service_code = Some("HA".to_string());
1152 newer.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2020, 6, 15).unwrap());
1153 newer.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2030, 6, 15).unwrap());
1154 db.insert_record(&UlsRecord::Header(newer)).unwrap();
1155
1156 let license = db.get_license_by_callsign("W3OLD").unwrap();
1158 assert!(license.is_some(), "Should find expired license for W3OLD");
1159 let license = license.unwrap();
1160 assert_eq!(
1161 license.unique_system_identifier, 20002,
1162 "Should return the most recently granted expired record"
1163 );
1164 assert_eq!(license.status, 'E');
1165 }
1166
1167 #[test]
1168 fn test_count_by_service() {
1169 let db = create_test_db();
1170
1171 let header = create_test_header(); db.insert_record(&UlsRecord::Header(header)).unwrap();
1173
1174 let count = db.count_by_service(&["HA"]).unwrap();
1175 assert_eq!(count, 1);
1176
1177 let count = db.count_by_service(&["ZA"]).unwrap(); assert_eq!(count, 0);
1179 }
1180
1181 #[test]
1182 fn test_etag_operations() {
1183 let db = create_test_db();
1184
1185 let etag = db.get_imported_etag("l_amat").unwrap();
1187 assert!(etag.is_none());
1188
1189 db.set_imported_etag("l_amat", "abc123").unwrap();
1191
1192 let etag = db.get_imported_etag("l_amat").unwrap();
1194 assert_eq!(etag, Some("abc123".to_string()));
1195
1196 db.set_imported_etag("l_amat", "xyz789").unwrap();
1198 let etag = db.get_imported_etag("l_amat").unwrap();
1199 assert_eq!(etag, Some("xyz789".to_string()));
1200 }
1201
1202 #[test]
1203 fn test_set_last_updated() {
1204 let db = create_test_db();
1205
1206 db.set_last_updated("2025-01-17T12:00:00Z").unwrap();
1207 }
1209
1210 #[test]
1211 fn test_license_not_found() {
1212 let db = create_test_db();
1213
1214 let license = db.get_license_by_callsign("NOTEXIST").unwrap();
1215 assert!(license.is_none());
1216 }
1217
1218 #[test]
1219 fn test_transaction_rollback() {
1220 let db = create_test_db();
1221
1222 let tx = db.begin_transaction().unwrap();
1223 let header = create_test_header();
1224 tx.insert_record(&UlsRecord::Header(header)).unwrap();
1225 tx.rollback().unwrap();
1226
1227 let license = db.get_license_by_callsign("W1TEST").unwrap();
1229 assert!(license.is_none());
1230 }
1231
1232 #[test]
1233 fn test_open_database_with_path() {
1234 use tempfile::TempDir;
1235
1236 let temp_dir = TempDir::new().unwrap();
1237 let db_path = temp_dir.path().join("subdir").join("test.db");
1238
1239 let db = Database::open(&db_path).unwrap();
1241 db.initialize().unwrap();
1242 assert!(db.is_initialized().unwrap());
1243 assert!(db_path.parent().unwrap().exists());
1244 }
1245
1246 #[test]
1247 fn test_insert_unsupported_record_type() {
1248 use uls_core::records::LocationRecord;
1249
1250 let db = create_test_db();
1251
1252 let location = LocationRecord::from_fields(&["LO", "12345", "", "", "W1TEST"]);
1254 db.insert_record(&UlsRecord::Location(location)).unwrap();
1256 }
1257
1258 #[test]
1259 fn test_count_by_service_empty() {
1260 let db = create_test_db();
1261
1262 let count = db.count_by_service(&[]).unwrap();
1264 assert_eq!(count, 0);
1265 }
1266
1267 #[test]
1268 fn test_get_imported_types() {
1269 let db = create_test_db();
1270
1271 let types = db.get_imported_types("HA").unwrap();
1273 assert!(types.is_empty());
1274
1275 db.mark_imported("HA", "HD", 100).unwrap();
1277 db.mark_imported("HA", "EN", 50).unwrap();
1278 db.mark_imported("HA", "AM", 25).unwrap();
1279
1280 let types = db.get_imported_types("HA").unwrap();
1282 assert_eq!(types, vec!["AM", "EN", "HD"]); let types = db.get_imported_types("ZA").unwrap();
1286 assert!(types.is_empty());
1287 }
1288
1289 #[test]
1290 fn test_get_imported_count() {
1291 let db = create_test_db();
1292
1293 let count = db.get_imported_count("HA", "HD").unwrap();
1295 assert!(count.is_none());
1296
1297 db.mark_imported("HA", "HD", 500).unwrap();
1299
1300 let count = db.get_imported_count("HA", "HD").unwrap();
1302 assert_eq!(count, Some(500));
1303
1304 let count = db.get_imported_count("HA", "XX").unwrap();
1306 assert!(count.is_none());
1307
1308 let count = db.get_imported_count("ZZ", "HD").unwrap();
1310 assert!(count.is_none());
1311 }
1312
1313 #[test]
1314 fn test_import_status_lifecycle() {
1315 let db = create_test_db();
1316
1317 db.mark_imported("HA", "HD", 100).unwrap();
1319 db.mark_imported("HA", "EN", 200).unwrap();
1320
1321 assert!(db.has_record_type("HA", "HD").unwrap());
1323 assert!(db.has_record_type("HA", "EN").unwrap());
1324 assert!(!db.has_record_type("HA", "AM").unwrap());
1325
1326 db.clear_import_status("HA").unwrap();
1328
1329 assert!(!db.has_record_type("HA", "HD").unwrap());
1331 assert!(!db.has_record_type("HA", "EN").unwrap());
1332
1333 let types = db.get_imported_types("HA").unwrap();
1335 assert!(types.is_empty());
1336 }
1337
1338 #[test]
1339 fn test_pool_pragma_settings_on_all_connections() {
1340 let temp_dir = tempfile::tempdir().unwrap();
1342 let db_path = temp_dir.path().join("test_pool.db");
1343
1344 let config = crate::config::DatabaseConfig {
1345 path: db_path.clone(),
1346 max_connections: 3,
1347 foreign_keys: true,
1348 enable_wal: true,
1349 ..Default::default()
1350 };
1351
1352 let db = Database::with_config(config).unwrap();
1353 db.initialize().unwrap();
1354
1355 let mut connections = Vec::new();
1357 for i in 0..3 {
1358 let conn = db.conn().unwrap();
1359 let fk_enabled: i32 = conn
1360 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1361 .unwrap();
1362 assert_eq!(fk_enabled, 1, "Connection {i} should have foreign_keys ON");
1363 connections.push(conn);
1364 }
1365
1366 drop(connections);
1368
1369 for i in 0..2 {
1370 let conn = db.conn().unwrap();
1371 let fk_enabled: i32 = conn
1372 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1373 .unwrap();
1374 assert_eq!(
1375 fk_enabled, 1,
1376 "Re-acquired connection {i} should have foreign_keys ON"
1377 );
1378 }
1379 }
1380}