1use std::fmt;
6use std::path::Path;
7
8use r2d2::{CustomizeConnection, Pool, PooledConnection};
9use r2d2_sqlite::SqliteConnectionManager;
10use rusqlite::{params, Connection};
11use tracing::{debug, info};
12
13use uls_core::codes::{EntityType, LicenseStatus, OperatorClass, RadioService};
14use uls_core::records::{
15 AmateurRecord, CommentRecord, EntityRecord, HeaderRecord, HistoryRecord,
16 SpecialConditionRecord, UlsRecord,
17};
18
19use crate::config::DatabaseConfig;
20use crate::enum_adapters::{read_license_status, read_operator_class, read_radio_service};
21use crate::error::Result;
22use crate::models::{License, LicenseStats};
23use crate::schema::Schema;
24
25#[derive(Clone)]
30struct SqliteConnectionCustomizer {
31 cache_size: i32,
32 foreign_keys: bool,
33}
34
35impl fmt::Debug for SqliteConnectionCustomizer {
36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
37 f.debug_struct("SqliteConnectionCustomizer")
38 .field("cache_size", &self.cache_size)
39 .field("foreign_keys", &self.foreign_keys)
40 .finish()
41 }
42}
43
44impl CustomizeConnection<Connection, rusqlite::Error> for SqliteConnectionCustomizer {
45 fn on_acquire(&self, conn: &mut Connection) -> std::result::Result<(), rusqlite::Error> {
46 conn.execute_batch(&format!("PRAGMA cache_size = {};", self.cache_size))?;
48
49 if self.foreign_keys {
51 conn.execute_batch("PRAGMA foreign_keys = ON;")?;
52 }
53
54 conn.execute_batch(
56 r#"
57 PRAGMA busy_timeout = 5000;
58 PRAGMA synchronous = NORMAL;
59 PRAGMA temp_store = MEMORY;
60 PRAGMA mmap_size = 268435456;
61 "#,
62 )?;
63
64 Ok(())
65 }
66}
67
68pub struct Database {
70 pool: Pool<SqliteConnectionManager>,
71 config: DatabaseConfig,
72}
73
74impl Database {
75 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
77 let config = DatabaseConfig::with_path(path.as_ref());
78 Self::with_config(config)
79 }
80
81 pub fn with_config(config: DatabaseConfig) -> Result<Self> {
83 if let Some(parent) = config.path.parent() {
85 if !parent.exists() && config.path.to_str() != Some(":memory:") {
86 std::fs::create_dir_all(parent)?;
87 }
88 }
89
90 let manager = SqliteConnectionManager::file(&config.path);
91
92 let customizer = SqliteConnectionCustomizer {
94 cache_size: config.cache_size,
95 foreign_keys: config.foreign_keys,
96 };
97
98 let pool = Pool::builder()
99 .max_size(config.max_connections)
100 .min_idle(Some(0))
101 .connection_timeout(config.connection_timeout)
102 .connection_customizer(Box::new(customizer))
103 .build(manager)?;
104
105 let db = Self { pool, config };
106
107 if db.config.enable_wal {
109 let conn = db.conn()?;
110 conn.execute_batch("PRAGMA journal_mode = WAL;")?;
111 }
112
113 Ok(db)
114 }
115
116 pub fn conn(&self) -> Result<PooledConnection<SqliteConnectionManager>> {
118 Ok(self.pool.get()?)
119 }
120
121 pub fn initialize(&self) -> Result<()> {
123 let conn = self.conn()?;
124 Schema::initialize(&conn)?;
125 info!(
126 "Database initialized with schema version {}",
127 crate::schema::SCHEMA_VERSION
128 );
129 Ok(())
130 }
131
132 pub fn is_initialized(&self) -> Result<bool> {
134 let conn = self.conn()?;
135 Ok(Schema::get_version(&conn)?.is_some())
136 }
137
138 pub fn migrate_if_needed(&self) -> Result<()> {
142 let conn = self.conn()?;
143 Schema::migrate_if_needed(&conn)
144 }
145
146 pub fn begin_transaction(&self) -> Result<Transaction> {
148 let conn = self.pool.get()?;
149 conn.execute("BEGIN TRANSACTION", [])?;
150 Ok(Transaction { conn })
151 }
152
153 pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
155 let conn = self.conn()?;
156 Self::insert_record_conn(&conn, record)
157 }
158
159 fn insert_record_conn(conn: &Connection, record: &UlsRecord) -> Result<()> {
161 match record {
162 UlsRecord::Header(hd) => Self::insert_header(conn, hd),
163 UlsRecord::Entity(en) => Self::insert_entity(conn, en),
164 UlsRecord::Amateur(am) => Self::insert_amateur(conn, am),
165 UlsRecord::History(hs) => Self::insert_history(conn, hs),
166 UlsRecord::Comment(co) => Self::insert_comment(conn, co),
167 UlsRecord::SpecialCondition(sc) => Self::insert_special_condition(conn, sc),
168 _ => {
169 debug!(
170 "Skipping unsupported record type: {:?}",
171 record.record_type()
172 );
173 Ok(())
174 }
175 }
176 }
177
178 fn insert_header(conn: &Connection, hd: &HeaderRecord) -> Result<()> {
180 let license_status_code: Option<u8> = hd.license_status.and_then(|c| {
182 c.to_string()
183 .parse::<LicenseStatus>()
184 .ok()
185 .map(|s| s.to_u8())
186 });
187 let radio_service_code: Option<u8> = hd
189 .radio_service_code
190 .as_ref()
191 .and_then(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()));
192
193 let mut stmt = conn.prepare_cached(
194 r#"INSERT OR REPLACE INTO licenses (
195 unique_system_identifier, uls_file_number, ebf_number, call_sign,
196 license_status, radio_service_code, grant_date, expired_date,
197 cancellation_date, effective_date, last_action_date
198 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"#,
199 )?;
200 stmt.execute(params![
201 hd.unique_system_identifier,
202 hd.uls_file_number,
203 hd.ebf_number,
204 hd.call_sign,
205 license_status_code,
206 radio_service_code,
207 hd.grant_date.map(|d| d.to_string()),
208 hd.expired_date.map(|d| d.to_string()),
209 hd.cancellation_date.map(|d| d.to_string()),
210 hd.effective_date.map(|d| d.to_string()),
211 hd.last_action_date.map(|d| d.to_string()),
212 ])?;
213 Ok(())
214 }
215
216 fn insert_entity(conn: &Connection, en: &EntityRecord) -> Result<()> {
218 let entity_type_code: Option<u8> = en
220 .entity_type
221 .as_ref()
222 .and_then(|s| s.parse::<EntityType>().ok().map(|e| e.to_u8()));
223
224 let mut stmt = conn.prepare_cached(
225 r#"INSERT OR REPLACE INTO entities (
226 unique_system_identifier, uls_file_number, ebf_number, call_sign,
227 entity_type, licensee_id, entity_name, first_name, middle_initial,
228 last_name, suffix, phone, fax, email, street_address, city, state,
229 zip_code, po_box, attention_line, sgin, frn, applicant_type_code,
230 status_code, status_date
231 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25)"#,
232 )?;
233 stmt.execute(params![
234 en.unique_system_identifier,
235 en.uls_file_number,
236 en.ebf_number,
237 en.call_sign,
238 entity_type_code,
239 en.licensee_id,
240 en.entity_name,
241 en.first_name,
242 en.mi.map(|c| c.to_string()),
243 en.last_name,
244 en.suffix,
245 en.phone,
246 en.fax,
247 en.email,
248 en.street_address,
249 en.city,
250 en.state,
251 en.zip_code,
252 en.po_box,
253 en.attention_line,
254 en.sgin,
255 en.frn,
256 en.applicant_type_code.map(|c| c.to_string()),
257 en.status_code.map(|c| c.to_string()),
258 en.status_date,
259 ])?;
260 Ok(())
261 }
262
263 fn insert_amateur(conn: &Connection, am: &AmateurRecord) -> Result<()> {
265 let operator_class_code: Option<u8> = am.operator_class.and_then(|c| {
267 c.to_string()
268 .parse::<OperatorClass>()
269 .ok()
270 .map(|o| o.to_u8())
271 });
272 let prev_operator_class_code: Option<u8> = am.previous_operator_class.and_then(|c| {
273 c.to_string()
274 .parse::<OperatorClass>()
275 .ok()
276 .map(|o| o.to_u8())
277 });
278
279 let mut stmt = conn.prepare_cached(
280 r#"INSERT OR REPLACE INTO amateur_operators (
281 unique_system_identifier, uls_file_number, ebf_number, call_sign,
282 operator_class, group_code, region_code, trustee_call_sign,
283 trustee_indicator, physician_certification, ve_signature,
284 systematic_call_sign_change, vanity_call_sign_change,
285 vanity_relationship, previous_call_sign, previous_operator_class,
286 trustee_name
287 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"#,
288 )?;
289 stmt.execute(params![
290 am.unique_system_identifier,
291 am.uls_file_num,
292 am.ebf_number,
293 am.callsign,
294 operator_class_code,
295 am.group_code.map(|c| c.to_string()),
296 am.region_code,
297 am.trustee_callsign,
298 am.trustee_indicator.map(|c| c.to_string()),
299 am.physician_certification.map(|c| c.to_string()),
300 am.ve_signature.map(|c| c.to_string()),
301 am.systematic_callsign_change.map(|c| c.to_string()),
302 am.vanity_callsign_change.map(|c| c.to_string()),
303 am.vanity_relationship,
304 am.previous_callsign,
305 prev_operator_class_code,
306 am.trustee_name,
307 ])?;
308 Ok(())
309 }
310
311 fn insert_history(conn: &Connection, hs: &HistoryRecord) -> Result<()> {
313 let mut stmt = conn.prepare_cached(
314 r#"INSERT OR REPLACE INTO history (
315 unique_system_identifier, uls_file_number, callsign, log_date, code
316 ) VALUES (?1, ?2, ?3, ?4, ?5)"#,
317 )?;
318 stmt.execute(params![
319 hs.unique_system_identifier,
320 hs.uls_file_number,
321 hs.callsign,
322 hs.log_date,
323 hs.code,
324 ])?;
325 Ok(())
326 }
327
328 fn insert_comment(conn: &Connection, co: &CommentRecord) -> Result<()> {
330 let mut stmt = conn.prepare_cached(
331 r#"INSERT OR REPLACE INTO comments (
332 unique_system_identifier, uls_file_number, callsign, comment_date,
333 description, status_code, status_date
334 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
335 )?;
336 stmt.execute(params![
337 co.unique_system_identifier,
338 co.uls_file_num,
339 co.callsign,
340 co.comment_date,
341 co.description,
342 co.status_code.map(|c| c.to_string()),
343 co.status_date,
344 ])?;
345 Ok(())
346 }
347
348 fn insert_special_condition(conn: &Connection, sc: &SpecialConditionRecord) -> Result<()> {
350 let mut stmt = conn.prepare_cached(
351 r#"INSERT OR REPLACE INTO special_conditions (
352 unique_system_identifier, uls_file_number, ebf_number, callsign,
353 special_condition_type, special_condition_code, status_code, status_date
354 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"#,
355 )?;
356 stmt.execute(params![
357 sc.unique_system_identifier,
358 sc.uls_file_number,
359 sc.ebf_number,
360 sc.callsign,
361 sc.special_condition_type.map(|c| c.to_string()),
362 sc.special_condition_code,
363 sc.status_code.map(|c| c.to_string()),
364 sc.status_date,
365 ])?;
366 Ok(())
367 }
368
369 pub fn get_license_by_callsign(&self, callsign: &str) -> Result<Option<License>> {
371 let conn = self.conn()?;
372 let callsign = callsign.to_uppercase();
373
374 let result = conn.query_row(
375 r#"
376 SELECT
377 l.unique_system_identifier, l.call_sign,
378 e.entity_name, e.first_name, e.middle_initial, e.last_name,
379 l.license_status, l.radio_service_code,
380 l.grant_date, l.expired_date, l.cancellation_date,
381 e.frn, NULL as previous_call_sign,
382 e.street_address, e.city, e.state, e.zip_code, e.po_box,
383 a.operator_class
384 FROM licenses l
385 LEFT JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
386 LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
387 WHERE l.call_sign = ?1
388 ORDER BY l.license_status ASC, l.grant_date DESC
389 LIMIT 1
390 "#,
391 [&callsign],
392 |row| {
393 let status = read_license_status(row, 6)?;
395 let radio_service = read_radio_service(row, 7)?;
396 let operator_class = read_operator_class(row, 18)?;
397
398 Ok(License {
399 unique_system_identifier: row.get(0)?,
400 call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
401 licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
402 first_name: row.get(3)?,
403 middle_initial: row.get(4)?,
404 last_name: row.get(5)?,
405 status,
406 radio_service,
407 grant_date: row
408 .get::<_, Option<String>>(8)?
409 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
410 expired_date: row
411 .get::<_, Option<String>>(9)?
412 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
413 cancellation_date: row
414 .get::<_, Option<String>>(10)?
415 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
416 frn: row.get(11)?,
417 previous_call_sign: row.get(12)?,
418 street_address: row.get(13)?,
419 city: row.get(14)?,
420 state: row.get(15)?,
421 zip_code: row.get(16)?,
422 po_box: row.get(17)?,
423 operator_class,
424 })
425 },
426 );
427
428 match result {
429 Ok(license) => Ok(Some(license)),
430 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
431 Err(e) => Err(e.into()),
432 }
433 }
434
435 pub fn get_licenses_by_frn(&self, frn: &str) -> Result<Vec<License>> {
437 let conn = self.conn()?;
438 let frn = frn.trim();
440
441 let mut stmt = conn.prepare(
442 r#"
443 SELECT
444 l.unique_system_identifier, l.call_sign,
445 e.entity_name, e.first_name, e.middle_initial, e.last_name,
446 l.license_status, l.radio_service_code,
447 l.grant_date, l.expired_date, l.cancellation_date,
448 e.frn, NULL as previous_call_sign,
449 e.street_address, e.city, e.state, e.zip_code, e.po_box,
450 a.operator_class
451 FROM licenses l
452 INNER JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
453 LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
454 WHERE e.frn = ?1
455 GROUP BY l.unique_system_identifier
456 ORDER BY l.radio_service_code, l.call_sign
457 "#,
458 )?;
459
460 let licenses = stmt.query_map([frn], |row| {
461 let status = read_license_status(row, 6)?;
463 let radio_service = read_radio_service(row, 7)?;
464 let operator_class = read_operator_class(row, 18)?;
465
466 Ok(License {
467 unique_system_identifier: row.get(0)?,
468 call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
469 licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
470 first_name: row.get(3)?,
471 middle_initial: row.get(4)?,
472 last_name: row.get(5)?,
473 status,
474 radio_service,
475 grant_date: row
476 .get::<_, Option<String>>(8)?
477 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
478 expired_date: row
479 .get::<_, Option<String>>(9)?
480 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
481 cancellation_date: row
482 .get::<_, Option<String>>(10)?
483 .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
484 frn: row.get(11)?,
485 previous_call_sign: row.get(12)?,
486 street_address: row.get(13)?,
487 city: row.get(14)?,
488 state: row.get(15)?,
489 zip_code: row.get(16)?,
490 po_box: row.get(17)?,
491 operator_class,
492 })
493 })?;
494
495 let mut result = Vec::new();
496 for license in licenses {
497 result.push(license?);
498 }
499 Ok(result)
500 }
501
502 pub fn get_stats(&self) -> Result<LicenseStats> {
504 let conn = self.conn()?;
505
506 let total_licenses: u64 = conn.query_row("SELECT COUNT(*) FROM licenses", [], |row| {
507 row.get::<_, i64>(0)
508 })? as u64;
509
510 let active_code = LicenseStatus::Active.to_u8();
512 let expired_code = LicenseStatus::Expired.to_u8();
513 let cancelled_code = LicenseStatus::Cancelled.to_u8();
514
515 let active_licenses: u64 = conn.query_row(
516 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
517 [active_code],
518 |row| row.get::<_, i64>(0),
519 )? as u64;
520
521 let expired_licenses: u64 = conn.query_row(
522 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
523 [expired_code],
524 |row| row.get::<_, i64>(0),
525 )? as u64;
526
527 let cancelled_licenses: u64 = conn.query_row(
528 "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
529 [cancelled_code],
530 |row| row.get::<_, i64>(0),
531 )? as u64;
532
533 let schema_version = Schema::get_version(&conn)?.unwrap_or(0);
534 let last_updated = Schema::get_metadata(&conn, "last_updated")?;
535
536 Ok(LicenseStats {
537 total_licenses,
538 active_licenses,
539 expired_licenses,
540 cancelled_licenses,
541 by_service: Vec::new(),
542 by_operator_class: Vec::new(),
543 last_updated,
544 schema_version,
545 })
546 }
547
548 pub fn count_by_service(&self, service_codes: &[&str]) -> Result<u64> {
551 if service_codes.is_empty() {
552 return Ok(0);
553 }
554
555 let int_codes: Vec<u8> = service_codes
557 .iter()
558 .filter_map(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()))
559 .collect();
560
561 if int_codes.is_empty() {
562 return Ok(0);
563 }
564
565 let conn = self.conn()?;
566 let placeholders: String = int_codes.iter().map(|_| "?").collect::<Vec<_>>().join(",");
567 let sql = format!(
568 "SELECT COUNT(*) FROM licenses WHERE radio_service_code IN ({})",
569 placeholders
570 );
571
572 let mut stmt = conn.prepare(&sql)?;
573 let count: u64 = stmt.query_row(rusqlite::params_from_iter(int_codes.iter()), |row| {
574 row.get::<_, i64>(0)
575 })? as u64;
576
577 Ok(count)
578 }
579
580 pub fn set_last_updated(&self, timestamp: &str) -> Result<()> {
582 let conn = self.conn()?;
583 Schema::set_metadata(&conn, "last_updated", timestamp)?;
584 Ok(())
585 }
586
587 pub fn get_imported_etag(&self, service: &str) -> Result<Option<String>> {
589 let conn = self.conn()?;
590 let key = format!("imported_etag_{}", service);
591 Schema::get_metadata(&conn, &key)
592 }
593
594 pub fn set_imported_etag(&self, service: &str, etag: &str) -> Result<()> {
596 let conn = self.conn()?;
597 let key = format!("imported_etag_{}", service);
598 Schema::set_metadata(&conn, &key, etag)?;
599 Ok(())
600 }
601
602 pub fn has_record_type(&self, service: &str, record_type: &str) -> Result<bool> {
608 let conn = self.conn()?;
609 let count: i64 = conn.query_row(
610 "SELECT COUNT(*) FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
611 params![service, record_type],
612 |row| row.get(0),
613 )?;
614 Ok(count > 0)
615 }
616
617 pub fn get_imported_types(&self, service: &str) -> Result<Vec<String>> {
619 let conn = self.conn()?;
620 let mut stmt = conn.prepare(
621 "SELECT record_type FROM import_status WHERE radio_service_code = ?1 ORDER BY record_type"
622 )?;
623 let iter = stmt.query_map(params![service], |row| row.get(0))?;
624 let mut types = Vec::new();
625 for record_type in iter {
626 types.push(record_type?);
627 }
628 Ok(types)
629 }
630
631 pub fn mark_imported(&self, service: &str, record_type: &str, count: usize) -> Result<()> {
633 let conn = self.conn()?;
634 let now = chrono::Utc::now().to_rfc3339();
635 conn.execute(
636 "INSERT OR REPLACE INTO import_status (radio_service_code, record_type, imported_at, record_count)
637 VALUES (?1, ?2, ?3, ?4)",
638 params![service, record_type, now, count as i64],
639 )?;
640 Ok(())
641 }
642
643 pub fn clear_import_status(&self, service: &str) -> Result<()> {
645 let conn = self.conn()?;
646 conn.execute(
647 "DELETE FROM import_status WHERE radio_service_code = ?1",
648 params![service],
649 )?;
650 Ok(())
651 }
652
653 pub fn get_imported_count(&self, service: &str, record_type: &str) -> Result<Option<usize>> {
655 let conn = self.conn()?;
656 let result: rusqlite::Result<i64> = conn.query_row(
657 "SELECT record_count FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
658 params![service, record_type],
659 |row| row.get(0),
660 );
661 match result {
662 Ok(count) => Ok(Some(count as usize)),
663 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
664 Err(e) => Err(e.into()),
665 }
666 }
667
668 pub fn get_last_updated(&self) -> Result<Option<String>> {
674 let conn = self.conn()?;
675 Schema::get_metadata(&conn, "last_updated")
676 }
677
678 pub fn get_freshness(
680 &self,
681 service: &str,
682 threshold_days: i64,
683 ) -> Result<crate::freshness::DataFreshness> {
684 let last_updated = self.get_last_updated()?;
685 let mut freshness = crate::freshness::DataFreshness::from_timestamp(
686 service,
687 last_updated.as_deref(),
688 threshold_days,
689 );
690
691 let weekly_key = format!("last_weekly_date_{}", service);
693 let conn = self.conn()?;
694 if let Some(date_str) = Schema::get_metadata(&conn, &weekly_key)? {
695 freshness.last_weekly_date =
696 chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok();
697 }
698
699 freshness.applied_patch_dates = self
701 .get_applied_patches(service)?
702 .into_iter()
703 .map(|p| p.patch_date)
704 .collect();
705
706 Ok(freshness)
707 }
708
709 pub fn is_stale(&self, service: &str, threshold_days: i64) -> Result<bool> {
711 let freshness = self.get_freshness(service, threshold_days)?;
712 Ok(freshness.is_stale)
713 }
714
715 pub fn record_applied_patch(
717 &self,
718 service: &str,
719 patch_date: chrono::NaiveDate,
720 weekday: &str,
721 etag: Option<&str>,
722 record_count: Option<usize>,
723 ) -> Result<()> {
724 let conn = self.conn()?;
725 let now = chrono::Utc::now().to_rfc3339();
726 let date_str = patch_date.format("%Y-%m-%d").to_string();
727
728 conn.execute(
729 "INSERT OR REPLACE INTO applied_patches
730 (radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count)
731 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
732 params![
733 service,
734 date_str,
735 weekday,
736 now,
737 etag,
738 record_count.map(|c| c as i64)
739 ],
740 )?;
741 Ok(())
742 }
743
744 pub fn get_applied_patches(
746 &self,
747 service: &str,
748 ) -> Result<Vec<crate::freshness::AppliedPatch>> {
749 let conn = self.conn()?;
750 let mut stmt = conn.prepare(
751 "SELECT radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count
752 FROM applied_patches
753 WHERE radio_service_code = ?1
754 ORDER BY patch_date",
755 )?;
756
757 let iter = stmt.query_map(params![service], |row| {
758 let date_str: String = row.get(1)?;
759 let applied_at_str: String = row.get(3)?;
760
761 Ok(crate::freshness::AppliedPatch {
762 service: row.get(0)?,
763 patch_date: chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")
764 .unwrap_or_else(|_| chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()),
765 weekday: row.get(2)?,
766 applied_at: chrono::DateTime::parse_from_rfc3339(&applied_at_str)
767 .map(|dt| dt.with_timezone(&chrono::Utc))
768 .unwrap_or_else(|_| chrono::Utc::now()),
769 etag: row.get(4)?,
770 record_count: row.get::<_, Option<i64>>(5)?.map(|c| c as usize),
771 })
772 })?;
773
774 let mut patches = Vec::new();
775 for patch in iter {
776 patches.push(patch?);
777 }
778 Ok(patches)
779 }
780
781 pub fn clear_applied_patches(&self, service: &str) -> Result<()> {
783 let conn = self.conn()?;
784 conn.execute(
785 "DELETE FROM applied_patches WHERE radio_service_code = ?1",
786 params![service],
787 )?;
788 Ok(())
789 }
790
791 pub fn set_last_weekly_date(&self, service: &str, date: chrono::NaiveDate) -> Result<()> {
793 let conn = self.conn()?;
794 let key = format!("last_weekly_date_{}", service);
795 let date_str = date.format("%Y-%m-%d").to_string();
796 Schema::set_metadata(&conn, &key, &date_str)?;
797 Ok(())
798 }
799
800 pub fn get_last_weekly_date(&self, service: &str) -> Result<Option<chrono::NaiveDate>> {
802 let conn = self.conn()?;
803 let key = format!("last_weekly_date_{}", service);
804 if let Some(date_str) = Schema::get_metadata(&conn, &key)? {
805 Ok(chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok())
806 } else {
807 Ok(None)
808 }
809 }
810}
811
812pub struct Transaction {
814 conn: PooledConnection<SqliteConnectionManager>,
815}
816
817impl Transaction {
818 pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
820 Database::insert_record_conn(&self.conn, record)
821 }
822
823 pub fn commit(self) -> Result<()> {
825 self.conn.execute("COMMIT", [])?;
826 Ok(())
827 }
828
829 pub fn rollback(self) -> Result<()> {
831 self.conn.execute("ROLLBACK", [])?;
832 Ok(())
833 }
834}
835
836#[cfg(test)]
837mod tests {
838 use super::*;
839 use uls_core::records::HeaderRecord;
840
841 fn create_test_db() -> Database {
842 let config = DatabaseConfig::in_memory();
843 let db = Database::with_config(config).unwrap();
844 db.initialize().unwrap();
845 db
846 }
847
848 fn create_test_header() -> HeaderRecord {
849 let mut hd = HeaderRecord::from_fields(&["HD", "12345"]);
850 hd.unique_system_identifier = 12345;
851 hd.call_sign = Some("W1TEST".to_string());
852 hd.license_status = Some('A');
853 hd.radio_service_code = Some("HA".to_string());
854 hd
855 }
856
857 #[test]
858 fn test_open_database() {
859 let db = create_test_db();
860 assert!(db.is_initialized().unwrap());
861 }
862
863 #[test]
864 fn test_insert_and_query() {
865 let db = create_test_db();
866
867 let header = create_test_header();
868 db.insert_record(&UlsRecord::Header(header)).unwrap();
869
870 let license = db.get_license_by_callsign("W1TEST").unwrap();
871 assert!(license.is_some());
872
873 let license = license.unwrap();
874 assert_eq!(license.call_sign, "W1TEST");
875 assert_eq!(license.status, 'A');
876 assert!(license.is_active());
877 }
878
879 #[test]
880 fn test_case_insensitive_lookup() {
881 let db = create_test_db();
882
883 let header = create_test_header();
884 db.insert_record(&UlsRecord::Header(header)).unwrap();
885
886 let license = db.get_license_by_callsign("w1test").unwrap();
888 assert!(license.is_some());
889 }
890
891 #[test]
892 fn test_stats() {
893 let db = create_test_db();
894
895 let header = create_test_header();
896 db.insert_record(&UlsRecord::Header(header)).unwrap();
897
898 let stats = db.get_stats().unwrap();
899 assert_eq!(stats.total_licenses, 1);
900 assert_eq!(stats.active_licenses, 1);
901 }
902
903 #[test]
904 fn test_transaction() {
905 let db = create_test_db();
906
907 let tx = db.begin_transaction().unwrap();
908
909 let header = create_test_header();
910 tx.insert_record(&UlsRecord::Header(header)).unwrap();
911 tx.commit().unwrap();
912
913 let license = db.get_license_by_callsign("W1TEST").unwrap();
914 assert!(license.is_some());
915 }
916
917 #[test]
918 fn test_insert_entity() {
919 use uls_core::records::EntityRecord;
920
921 let db = create_test_db();
922
923 let header = create_test_header();
925 db.insert_record(&UlsRecord::Header(header)).unwrap();
926
927 let entity = EntityRecord::from_fields(&[
929 "EN",
930 "12345",
931 "",
932 "",
933 "W1TEST",
934 "L",
935 "L00100001",
936 "DOE, JOHN A",
937 "JOHN",
938 "A",
939 "DOE",
940 "",
941 "555-555-1234",
942 "",
943 "test@example.com",
944 "123 Main St",
945 "ANYTOWN",
946 "CA",
947 "90210",
948 "",
949 "",
950 "000",
951 "0001234567",
952 "I",
953 "",
954 "",
955 "",
956 "",
957 "",
958 "",
959 ]);
960 db.insert_record(&UlsRecord::Entity(entity)).unwrap();
961 }
962
963 #[test]
964 fn test_insert_amateur() {
965 use uls_core::records::AmateurRecord;
966
967 let db = create_test_db();
968
969 let header = create_test_header();
970 db.insert_record(&UlsRecord::Header(header)).unwrap();
971
972 let amateur = AmateurRecord::from_fields(&[
973 "AM", "12345", "", "", "W1TEST", "E", "D", "6", "", "", "", "", "", "", "", "", "", "",
974 ]);
975 db.insert_record(&UlsRecord::Amateur(amateur)).unwrap();
976 }
977
978 #[test]
979 fn test_insert_history() {
980 use uls_core::records::HistoryRecord;
981
982 let db = create_test_db();
983
984 let header = create_test_header();
985 db.insert_record(&UlsRecord::Header(header)).unwrap();
986
987 let history =
988 HistoryRecord::from_fields(&["HS", "12345", "", "W1TEST", "01/01/2020", "LIISS"]);
989 db.insert_record(&UlsRecord::History(history)).unwrap();
990 }
991
992 #[test]
993 fn test_insert_comment() {
994 use uls_core::records::CommentRecord;
995
996 let db = create_test_db();
997
998 let header = create_test_header();
999 db.insert_record(&UlsRecord::Header(header)).unwrap();
1000
1001 let comment = CommentRecord::from_fields(&[
1002 "CO",
1003 "12345",
1004 "",
1005 "W1TEST",
1006 "01/01/2020",
1007 "Test comment",
1008 ]);
1009 db.insert_record(&UlsRecord::Comment(comment)).unwrap();
1010 }
1011
1012 #[test]
1013 fn test_insert_special_condition() {
1014 use uls_core::records::SpecialConditionRecord;
1015
1016 let db = create_test_db();
1017
1018 let header = create_test_header();
1019 db.insert_record(&UlsRecord::Header(header)).unwrap();
1020
1021 let sc = SpecialConditionRecord::from_fields(&[
1022 "SC", "12345", "", "", "W1TEST", "P", "999", "", "",
1023 ]);
1024 db.insert_record(&UlsRecord::SpecialCondition(sc)).unwrap();
1025 }
1026
1027 #[test]
1028 fn test_get_licenses_by_frn() {
1029 use uls_core::records::EntityRecord;
1030
1031 let db = create_test_db();
1032
1033 let header = create_test_header();
1034 db.insert_record(&UlsRecord::Header(header)).unwrap();
1035
1036 let entity = EntityRecord::from_fields(&[
1038 "EN",
1039 "12345",
1040 "",
1041 "",
1042 "W1TEST",
1043 "L",
1044 "L00100001",
1045 "DOE, JOHN A",
1046 "JOHN",
1047 "A",
1048 "DOE",
1049 "",
1050 "",
1051 "",
1052 "",
1053 "",
1054 "",
1055 "",
1056 "",
1057 "",
1058 "",
1059 "000",
1060 "0001234567",
1061 "I",
1062 "",
1063 "",
1064 "",
1065 "",
1066 "",
1067 "",
1068 ]);
1069 db.insert_record(&UlsRecord::Entity(entity)).unwrap();
1070
1071 let licenses = db.get_licenses_by_frn("0001234567").unwrap();
1072 assert_eq!(licenses.len(), 1);
1073 assert_eq!(licenses[0].call_sign, "W1TEST");
1074 }
1075
1076 #[test]
1077 fn test_get_licenses_by_frn_not_found() {
1078 let db = create_test_db();
1079
1080 let licenses = db.get_licenses_by_frn("9999999999").unwrap();
1081 assert!(licenses.is_empty());
1082 }
1083
1084 #[test]
1085 fn test_lookup_prefers_active_over_cancelled() {
1086 let db = create_test_db();
1087
1088 let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1090 cancelled.unique_system_identifier = 10001;
1091 cancelled.call_sign = Some("K2QA".to_string());
1092 cancelled.license_status = Some('C');
1093 cancelled.radio_service_code = Some("HA".to_string());
1094 db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1095
1096 let mut active = HeaderRecord::from_fields(&["HD", "20002"]);
1098 active.unique_system_identifier = 20002;
1099 active.call_sign = Some("K2QA".to_string());
1100 active.license_status = Some('A');
1101 active.radio_service_code = Some("HA".to_string());
1102 db.insert_record(&UlsRecord::Header(active)).unwrap();
1103
1104 let license = db.get_license_by_callsign("K2QA").unwrap();
1106 assert!(license.is_some(), "Should find license for K2QA");
1107 let license = license.unwrap();
1108 assert_eq!(
1109 license.status, 'A',
1110 "Should return active license, not cancelled (got status='{}')",
1111 license.status
1112 );
1113 assert_eq!(license.unique_system_identifier, 20002);
1114 }
1115
1116 #[test]
1117 fn test_lookup_returns_cancelled_when_no_active() {
1118 let db = create_test_db();
1119
1120 let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1122 cancelled.unique_system_identifier = 10001;
1123 cancelled.call_sign = Some("W9OLD".to_string());
1124 cancelled.license_status = Some('C');
1125 cancelled.radio_service_code = Some("HA".to_string());
1126 db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1127
1128 let license = db.get_license_by_callsign("W9OLD").unwrap();
1130 assert!(license.is_some(), "Should find cancelled-only license");
1131 assert_eq!(license.unwrap().status, 'C');
1132 }
1133
1134 #[test]
1135 fn test_lookup_prefers_most_recent_inactive_record() {
1136 let db = create_test_db();
1137
1138 let mut older = HeaderRecord::from_fields(&["HD", "10001"]);
1140 older.unique_system_identifier = 10001;
1141 older.call_sign = Some("W3OLD".to_string());
1142 older.license_status = Some('E');
1143 older.radio_service_code = Some("HA".to_string());
1144 older.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2015, 3, 1).unwrap());
1145 older.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2025, 3, 1).unwrap());
1146 db.insert_record(&UlsRecord::Header(older)).unwrap();
1147
1148 let mut newer = HeaderRecord::from_fields(&["HD", "20002"]);
1150 newer.unique_system_identifier = 20002;
1151 newer.call_sign = Some("W3OLD".to_string());
1152 newer.license_status = Some('E');
1153 newer.radio_service_code = Some("HA".to_string());
1154 newer.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2020, 6, 15).unwrap());
1155 newer.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2030, 6, 15).unwrap());
1156 db.insert_record(&UlsRecord::Header(newer)).unwrap();
1157
1158 let license = db.get_license_by_callsign("W3OLD").unwrap();
1160 assert!(license.is_some(), "Should find expired license for W3OLD");
1161 let license = license.unwrap();
1162 assert_eq!(
1163 license.unique_system_identifier, 20002,
1164 "Should return the most recently granted expired record"
1165 );
1166 assert_eq!(license.status, 'E');
1167 }
1168
1169 #[test]
1170 fn test_count_by_service() {
1171 let db = create_test_db();
1172
1173 let header = create_test_header(); db.insert_record(&UlsRecord::Header(header)).unwrap();
1175
1176 let count = db.count_by_service(&["HA"]).unwrap();
1177 assert_eq!(count, 1);
1178
1179 let count = db.count_by_service(&["ZA"]).unwrap(); assert_eq!(count, 0);
1181 }
1182
1183 #[test]
1184 fn test_etag_operations() {
1185 let db = create_test_db();
1186
1187 let etag = db.get_imported_etag("l_amat").unwrap();
1189 assert!(etag.is_none());
1190
1191 db.set_imported_etag("l_amat", "abc123").unwrap();
1193
1194 let etag = db.get_imported_etag("l_amat").unwrap();
1196 assert_eq!(etag, Some("abc123".to_string()));
1197
1198 db.set_imported_etag("l_amat", "xyz789").unwrap();
1200 let etag = db.get_imported_etag("l_amat").unwrap();
1201 assert_eq!(etag, Some("xyz789".to_string()));
1202 }
1203
1204 #[test]
1205 fn test_set_last_updated() {
1206 let db = create_test_db();
1207
1208 db.set_last_updated("2025-01-17T12:00:00Z").unwrap();
1209 }
1211
1212 #[test]
1213 fn test_license_not_found() {
1214 let db = create_test_db();
1215
1216 let license = db.get_license_by_callsign("NOTEXIST").unwrap();
1217 assert!(license.is_none());
1218 }
1219
1220 #[test]
1221 fn test_transaction_rollback() {
1222 let db = create_test_db();
1223
1224 let tx = db.begin_transaction().unwrap();
1225 let header = create_test_header();
1226 tx.insert_record(&UlsRecord::Header(header)).unwrap();
1227 tx.rollback().unwrap();
1228
1229 let license = db.get_license_by_callsign("W1TEST").unwrap();
1231 assert!(license.is_none());
1232 }
1233
1234 #[test]
1235 fn test_open_database_with_path() {
1236 use tempfile::TempDir;
1237
1238 let temp_dir = TempDir::new().unwrap();
1239 let db_path = temp_dir.path().join("subdir").join("test.db");
1240
1241 let db = Database::open(&db_path).unwrap();
1243 db.initialize().unwrap();
1244 assert!(db.is_initialized().unwrap());
1245 assert!(db_path.parent().unwrap().exists());
1246 }
1247
1248 #[test]
1249 fn test_insert_unsupported_record_type() {
1250 use uls_core::records::LocationRecord;
1251
1252 let db = create_test_db();
1253
1254 let location = LocationRecord::from_fields(&["LO", "12345", "", "", "W1TEST"]);
1256 db.insert_record(&UlsRecord::Location(location)).unwrap();
1258 }
1259
1260 #[test]
1261 fn test_count_by_service_empty() {
1262 let db = create_test_db();
1263
1264 let count = db.count_by_service(&[]).unwrap();
1266 assert_eq!(count, 0);
1267 }
1268
1269 #[test]
1270 fn test_get_imported_types() {
1271 let db = create_test_db();
1272
1273 let types = db.get_imported_types("HA").unwrap();
1275 assert!(types.is_empty());
1276
1277 db.mark_imported("HA", "HD", 100).unwrap();
1279 db.mark_imported("HA", "EN", 50).unwrap();
1280 db.mark_imported("HA", "AM", 25).unwrap();
1281
1282 let types = db.get_imported_types("HA").unwrap();
1284 assert_eq!(types, vec!["AM", "EN", "HD"]); let types = db.get_imported_types("ZA").unwrap();
1288 assert!(types.is_empty());
1289 }
1290
1291 #[test]
1292 fn test_get_imported_count() {
1293 let db = create_test_db();
1294
1295 let count = db.get_imported_count("HA", "HD").unwrap();
1297 assert!(count.is_none());
1298
1299 db.mark_imported("HA", "HD", 500).unwrap();
1301
1302 let count = db.get_imported_count("HA", "HD").unwrap();
1304 assert_eq!(count, Some(500));
1305
1306 let count = db.get_imported_count("HA", "XX").unwrap();
1308 assert!(count.is_none());
1309
1310 let count = db.get_imported_count("ZZ", "HD").unwrap();
1312 assert!(count.is_none());
1313 }
1314
1315 #[test]
1316 fn test_import_status_lifecycle() {
1317 let db = create_test_db();
1318
1319 db.mark_imported("HA", "HD", 100).unwrap();
1321 db.mark_imported("HA", "EN", 200).unwrap();
1322
1323 assert!(db.has_record_type("HA", "HD").unwrap());
1325 assert!(db.has_record_type("HA", "EN").unwrap());
1326 assert!(!db.has_record_type("HA", "AM").unwrap());
1327
1328 db.clear_import_status("HA").unwrap();
1330
1331 assert!(!db.has_record_type("HA", "HD").unwrap());
1333 assert!(!db.has_record_type("HA", "EN").unwrap());
1334
1335 let types = db.get_imported_types("HA").unwrap();
1337 assert!(types.is_empty());
1338 }
1339
1340 #[test]
1341 fn test_pool_pragma_settings_on_all_connections() {
1342 let temp_dir = tempfile::tempdir().unwrap();
1344 let db_path = temp_dir.path().join("test_pool.db");
1345
1346 let config = crate::config::DatabaseConfig {
1347 path: db_path.clone(),
1348 max_connections: 3,
1349 foreign_keys: true,
1350 enable_wal: true,
1351 ..Default::default()
1352 };
1353
1354 let db = Database::with_config(config).unwrap();
1355 db.initialize().unwrap();
1356
1357 let mut connections = Vec::new();
1359 for i in 0..3 {
1360 let conn = db.conn().unwrap();
1361 let fk_enabled: i32 = conn
1362 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1363 .unwrap();
1364 assert_eq!(fk_enabled, 1, "Connection {i} should have foreign_keys ON");
1365 connections.push(conn);
1366 }
1367
1368 drop(connections);
1370
1371 for i in 0..2 {
1372 let conn = db.conn().unwrap();
1373 let fk_enabled: i32 = conn
1374 .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1375 .unwrap();
1376 assert_eq!(
1377 fk_enabled, 1,
1378 "Re-acquired connection {i} should have foreign_keys ON"
1379 );
1380 }
1381 }
1382}