Skip to main content

uls_db/
repository.rs

1//! Database repository for ULS data operations.
2//!
3//! Provides high-level methods for inserting, updating, and querying ULS data.
4
5use std::fmt;
6use std::path::Path;
7
8use r2d2::{CustomizeConnection, Pool, PooledConnection};
9use r2d2_sqlite::SqliteConnectionManager;
10use rusqlite::{params, Connection};
11use tracing::{debug, info};
12
13use uls_core::codes::{EntityType, LicenseStatus, OperatorClass, RadioService};
14use uls_core::records::{
15    AmateurRecord, CommentRecord, EntityRecord, HeaderRecord, HistoryRecord,
16    SpecialConditionRecord, UlsRecord,
17};
18
19use crate::config::DatabaseConfig;
20use crate::enum_adapters::{read_license_status, read_operator_class, read_radio_service};
21use crate::error::Result;
22use crate::models::{License, LicenseStats};
23use crate::schema::Schema;
24
/// Per-connection settings installed on every connection the pool creates.
///
/// Keeping the values here lets `on_acquire` issue the same PRAGMAs for each
/// new connection instead of only the first one that is checked out.
#[derive(Clone)]
struct SqliteConnectionCustomizer {
    /// Value substituted into `PRAGMA cache_size`.
    cache_size: i32,
    /// Whether `PRAGMA foreign_keys = ON` is issued.
    foreign_keys: bool,
}

impl fmt::Debug for SqliteConnectionCustomizer {
    /// Render the customizer as a struct listing both settings.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            cache_size,
            foreign_keys,
        } = self;
        let mut out = f.debug_struct("SqliteConnectionCustomizer");
        out.field("cache_size", cache_size);
        out.field("foreign_keys", foreign_keys);
        out.finish()
    }
}
43
44impl CustomizeConnection<Connection, rusqlite::Error> for SqliteConnectionCustomizer {
45    fn on_acquire(&self, conn: &mut Connection) -> std::result::Result<(), rusqlite::Error> {
46        // Set cache size
47        conn.execute_batch(&format!("PRAGMA cache_size = {};", self.cache_size))?;
48
49        // Enable foreign keys if configured
50        if self.foreign_keys {
51            conn.execute_batch("PRAGMA foreign_keys = ON;")?;
52        }
53
54        // Other per-connection optimizations
55        conn.execute_batch(
56            r#"
57            PRAGMA busy_timeout = 5000;
58            PRAGMA synchronous = NORMAL;
59            PRAGMA temp_store = MEMORY;
60            PRAGMA mmap_size = 268435456;
61            "#,
62        )?;
63
64        Ok(())
65    }
66}
67
/// Database connection pool and operations.
pub struct Database {
    /// Pool that hands out SQLite connections; see `Database::conn`.
    pool: Pool<SqliteConnectionManager>,
    /// Configuration the pool was built from (`with_config` reads
    /// `enable_wal` from it after the pool exists).
    config: DatabaseConfig,
}
73
74impl Database {
75    /// Open a database at the given path.
76    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
77        let config = DatabaseConfig::with_path(path.as_ref());
78        Self::with_config(config)
79    }
80
81    /// Open a database with the given configuration.
82    pub fn with_config(config: DatabaseConfig) -> Result<Self> {
83        // Create parent directory if needed
84        if let Some(parent) = config.path.parent() {
85            if !parent.exists() && config.path.to_str() != Some(":memory:") {
86                std::fs::create_dir_all(parent)?;
87            }
88        }
89
90        let manager = SqliteConnectionManager::file(&config.path);
91
92        // Create customizer to apply PRAGMAs on every pooled connection
93        let customizer = SqliteConnectionCustomizer {
94            cache_size: config.cache_size,
95            foreign_keys: config.foreign_keys,
96        };
97
98        let pool = Pool::builder()
99            .max_size(config.max_connections)
100            .min_idle(Some(0))
101            .connection_timeout(config.connection_timeout)
102            .connection_customizer(Box::new(customizer))
103            .build(manager)?;
104
105        let db = Self { pool, config };
106
107        // Set WAL mode once (it's a database-wide setting, not per-connection)
108        if db.config.enable_wal {
109            let conn = db.conn()?;
110            conn.execute_batch("PRAGMA journal_mode = WAL;")?;
111        }
112
113        Ok(db)
114    }
115
116    /// Get a connection from the pool.
117    pub fn conn(&self) -> Result<PooledConnection<SqliteConnectionManager>> {
118        Ok(self.pool.get()?)
119    }
120
121    /// Initialize the database schema.
122    pub fn initialize(&self) -> Result<()> {
123        let conn = self.conn()?;
124        Schema::initialize(&conn)?;
125        info!(
126            "Database initialized with schema version {}",
127            crate::schema::SCHEMA_VERSION
128        );
129        Ok(())
130    }
131
132    /// Check if the database is initialized.
133    pub fn is_initialized(&self) -> Result<bool> {
134        let conn = self.conn()?;
135        Ok(Schema::get_version(&conn)?.is_some())
136    }
137
138    /// Migrate the database schema if needed.
139    ///
140    /// Call this on an existing database to upgrade it to the current schema version.
141    pub fn migrate_if_needed(&self) -> Result<()> {
142        let conn = self.conn()?;
143        Schema::migrate_if_needed(&conn)
144    }
145
146    /// Begin a transaction for bulk operations.
147    pub fn begin_transaction(&self) -> Result<Transaction> {
148        let conn = self.pool.get()?;
149        conn.execute("BEGIN TRANSACTION", [])?;
150        Ok(Transaction { conn })
151    }
152
153    /// Insert a ULS record into the database.
154    pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
155        let conn = self.conn()?;
156        Self::insert_record_conn(&conn, record)
157    }
158
159    /// Insert a record using an existing connection.
160    fn insert_record_conn(conn: &Connection, record: &UlsRecord) -> Result<()> {
161        match record {
162            UlsRecord::Header(hd) => Self::insert_header(conn, hd),
163            UlsRecord::Entity(en) => Self::insert_entity(conn, en),
164            UlsRecord::Amateur(am) => Self::insert_amateur(conn, am),
165            UlsRecord::History(hs) => Self::insert_history(conn, hs),
166            UlsRecord::Comment(co) => Self::insert_comment(conn, co),
167            UlsRecord::SpecialCondition(sc) => Self::insert_special_condition(conn, sc),
168            _ => {
169                debug!(
170                    "Skipping unsupported record type: {:?}",
171                    record.record_type()
172                );
173                Ok(())
174            }
175        }
176    }
177
178    /// Insert a header record.
179    fn insert_header(conn: &Connection, hd: &HeaderRecord) -> Result<()> {
180        // Convert license_status char to integer code
181        let license_status_code: Option<u8> = hd.license_status.and_then(|c| {
182            c.to_string()
183                .parse::<LicenseStatus>()
184                .ok()
185                .map(|s| s.to_u8())
186        });
187        // Convert radio_service_code string to integer code
188        let radio_service_code: Option<u8> = hd
189            .radio_service_code
190            .as_ref()
191            .and_then(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()));
192
193        let mut stmt = conn.prepare_cached(
194            r#"INSERT OR REPLACE INTO licenses (
195                unique_system_identifier, uls_file_number, ebf_number, call_sign,
196                license_status, radio_service_code, grant_date, expired_date,
197                cancellation_date, effective_date, last_action_date
198            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)"#,
199        )?;
200        stmt.execute(params![
201            hd.unique_system_identifier,
202            hd.uls_file_number,
203            hd.ebf_number,
204            hd.call_sign,
205            license_status_code,
206            radio_service_code,
207            hd.grant_date.map(|d| d.to_string()),
208            hd.expired_date.map(|d| d.to_string()),
209            hd.cancellation_date.map(|d| d.to_string()),
210            hd.effective_date.map(|d| d.to_string()),
211            hd.last_action_date.map(|d| d.to_string()),
212        ])?;
213        Ok(())
214    }
215
216    /// Insert an entity record.
217    fn insert_entity(conn: &Connection, en: &EntityRecord) -> Result<()> {
218        // Convert entity_type string to integer code
219        let entity_type_code: Option<u8> = en
220            .entity_type
221            .as_ref()
222            .and_then(|s| s.parse::<EntityType>().ok().map(|e| e.to_u8()));
223
224        let mut stmt = conn.prepare_cached(
225            r#"INSERT OR REPLACE INTO entities (
226                unique_system_identifier, uls_file_number, ebf_number, call_sign,
227                entity_type, licensee_id, entity_name, first_name, middle_initial,
228                last_name, suffix, phone, fax, email, street_address, city, state,
229                zip_code, po_box, attention_line, sgin, frn, applicant_type_code,
230                status_code, status_date
231            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25)"#,
232        )?;
233        stmt.execute(params![
234            en.unique_system_identifier,
235            en.uls_file_number,
236            en.ebf_number,
237            en.call_sign,
238            entity_type_code,
239            en.licensee_id,
240            en.entity_name,
241            en.first_name,
242            en.mi.map(|c| c.to_string()),
243            en.last_name,
244            en.suffix,
245            en.phone,
246            en.fax,
247            en.email,
248            en.street_address,
249            en.city,
250            en.state,
251            en.zip_code,
252            en.po_box,
253            en.attention_line,
254            en.sgin,
255            en.frn,
256            en.applicant_type_code.map(|c| c.to_string()),
257            en.status_code.map(|c| c.to_string()),
258            en.status_date,
259        ])?;
260        Ok(())
261    }
262
263    /// Insert an amateur record.
264    fn insert_amateur(conn: &Connection, am: &AmateurRecord) -> Result<()> {
265        // Convert operator_class char to integer code
266        let operator_class_code: Option<u8> = am.operator_class.and_then(|c| {
267            c.to_string()
268                .parse::<OperatorClass>()
269                .ok()
270                .map(|o| o.to_u8())
271        });
272        let prev_operator_class_code: Option<u8> = am.previous_operator_class.and_then(|c| {
273            c.to_string()
274                .parse::<OperatorClass>()
275                .ok()
276                .map(|o| o.to_u8())
277        });
278
279        let mut stmt = conn.prepare_cached(
280            r#"INSERT OR REPLACE INTO amateur_operators (
281                unique_system_identifier, uls_file_number, ebf_number, call_sign,
282                operator_class, group_code, region_code, trustee_call_sign,
283                trustee_indicator, physician_certification, ve_signature,
284                systematic_call_sign_change, vanity_call_sign_change,
285                vanity_relationship, previous_call_sign, previous_operator_class,
286                trustee_name
287            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)"#,
288        )?;
289        stmt.execute(params![
290            am.unique_system_identifier,
291            am.uls_file_num,
292            am.ebf_number,
293            am.callsign,
294            operator_class_code,
295            am.group_code.map(|c| c.to_string()),
296            am.region_code,
297            am.trustee_callsign,
298            am.trustee_indicator.map(|c| c.to_string()),
299            am.physician_certification.map(|c| c.to_string()),
300            am.ve_signature.map(|c| c.to_string()),
301            am.systematic_callsign_change.map(|c| c.to_string()),
302            am.vanity_callsign_change.map(|c| c.to_string()),
303            am.vanity_relationship,
304            am.previous_callsign,
305            prev_operator_class_code,
306            am.trustee_name,
307        ])?;
308        Ok(())
309    }
310
311    /// Insert a history record.
312    fn insert_history(conn: &Connection, hs: &HistoryRecord) -> Result<()> {
313        let mut stmt = conn.prepare_cached(
314            r#"INSERT OR REPLACE INTO history (
315                unique_system_identifier, uls_file_number, callsign, log_date, code
316            ) VALUES (?1, ?2, ?3, ?4, ?5)"#,
317        )?;
318        stmt.execute(params![
319            hs.unique_system_identifier,
320            hs.uls_file_number,
321            hs.callsign,
322            hs.log_date,
323            hs.code,
324        ])?;
325        Ok(())
326    }
327
328    /// Insert a comment record.
329    fn insert_comment(conn: &Connection, co: &CommentRecord) -> Result<()> {
330        let mut stmt = conn.prepare_cached(
331            r#"INSERT OR REPLACE INTO comments (
332                unique_system_identifier, uls_file_number, callsign, comment_date,
333                description, status_code, status_date
334            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)"#,
335        )?;
336        stmt.execute(params![
337            co.unique_system_identifier,
338            co.uls_file_num,
339            co.callsign,
340            co.comment_date,
341            co.description,
342            co.status_code.map(|c| c.to_string()),
343            co.status_date,
344        ])?;
345        Ok(())
346    }
347
348    /// Insert a special condition record.
349    fn insert_special_condition(conn: &Connection, sc: &SpecialConditionRecord) -> Result<()> {
350        let mut stmt = conn.prepare_cached(
351            r#"INSERT OR REPLACE INTO special_conditions (
352                unique_system_identifier, uls_file_number, ebf_number, callsign,
353                special_condition_type, special_condition_code, status_code, status_date
354            ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)"#,
355        )?;
356        stmt.execute(params![
357            sc.unique_system_identifier,
358            sc.uls_file_number,
359            sc.ebf_number,
360            sc.callsign,
361            sc.special_condition_type.map(|c| c.to_string()),
362            sc.special_condition_code,
363            sc.status_code.map(|c| c.to_string()),
364            sc.status_date,
365        ])?;
366        Ok(())
367    }
368
369    /// Look up a license by call sign.
370    pub fn get_license_by_callsign(&self, callsign: &str) -> Result<Option<License>> {
371        let conn = self.conn()?;
372        let callsign = callsign.to_uppercase();
373
374        let result = conn.query_row(
375            r#"
376            SELECT
377                l.unique_system_identifier, l.call_sign,
378                e.entity_name, e.first_name, e.middle_initial, e.last_name,
379                l.license_status, l.radio_service_code,
380                l.grant_date, l.expired_date, l.cancellation_date,
381                e.frn, NULL as previous_call_sign,
382                e.street_address, e.city, e.state, e.zip_code, e.po_box,
383                a.operator_class
384            FROM licenses l
385            LEFT JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
386            LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
387            WHERE l.call_sign = ?1
388            ORDER BY l.license_status ASC, l.grant_date DESC
389            LIMIT 1
390            "#,
391            [&callsign],
392            |row| {
393                // Use centralized enum adapter helpers
394                let status = read_license_status(row, 6)?;
395                let radio_service = read_radio_service(row, 7)?;
396                let operator_class = read_operator_class(row, 18)?;
397
398                Ok(License {
399                    unique_system_identifier: row.get(0)?,
400                    call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
401                    licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
402                    first_name: row.get(3)?,
403                    middle_initial: row.get(4)?,
404                    last_name: row.get(5)?,
405                    status,
406                    radio_service,
407                    grant_date: row
408                        .get::<_, Option<String>>(8)?
409                        .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
410                    expired_date: row
411                        .get::<_, Option<String>>(9)?
412                        .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
413                    cancellation_date: row
414                        .get::<_, Option<String>>(10)?
415                        .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
416                    frn: row.get(11)?,
417                    previous_call_sign: row.get(12)?,
418                    street_address: row.get(13)?,
419                    city: row.get(14)?,
420                    state: row.get(15)?,
421                    zip_code: row.get(16)?,
422                    po_box: row.get(17)?,
423                    operator_class,
424                })
425            },
426        );
427
428        match result {
429            Ok(license) => Ok(Some(license)),
430            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
431            Err(e) => Err(e.into()),
432        }
433    }
434
435    /// Look up all licenses by FRN (FCC Registration Number).
436    pub fn get_licenses_by_frn(&self, frn: &str) -> Result<Vec<License>> {
437        let conn = self.conn()?;
438        // Normalize FRN - strip leading zeros for comparison or pad to 10 digits
439        let frn = frn.trim();
440
441        let mut stmt = conn.prepare(
442            r#"
443            SELECT
444                l.unique_system_identifier, l.call_sign,
445                e.entity_name, e.first_name, e.middle_initial, e.last_name,
446                l.license_status, l.radio_service_code,
447                l.grant_date, l.expired_date, l.cancellation_date,
448                e.frn, NULL as previous_call_sign,
449                e.street_address, e.city, e.state, e.zip_code, e.po_box,
450                a.operator_class
451            FROM licenses l
452            INNER JOIN entities e ON l.unique_system_identifier = e.unique_system_identifier
453            LEFT JOIN amateur_operators a ON l.unique_system_identifier = a.unique_system_identifier
454            WHERE e.frn = ?1
455            GROUP BY l.unique_system_identifier
456            ORDER BY l.radio_service_code, l.call_sign
457            "#,
458        )?;
459
460        let licenses = stmt.query_map([frn], |row| {
461            // Use centralized enum adapter helpers
462            let status = read_license_status(row, 6)?;
463            let radio_service = read_radio_service(row, 7)?;
464            let operator_class = read_operator_class(row, 18)?;
465
466            Ok(License {
467                unique_system_identifier: row.get(0)?,
468                call_sign: row.get::<_, Option<String>>(1)?.unwrap_or_default(),
469                licensee_name: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
470                first_name: row.get(3)?,
471                middle_initial: row.get(4)?,
472                last_name: row.get(5)?,
473                status,
474                radio_service,
475                grant_date: row
476                    .get::<_, Option<String>>(8)?
477                    .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
478                expired_date: row
479                    .get::<_, Option<String>>(9)?
480                    .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
481                cancellation_date: row
482                    .get::<_, Option<String>>(10)?
483                    .and_then(|s| chrono::NaiveDate::parse_from_str(&s, "%Y-%m-%d").ok()),
484                frn: row.get(11)?,
485                previous_call_sign: row.get(12)?,
486                street_address: row.get(13)?,
487                city: row.get(14)?,
488                state: row.get(15)?,
489                zip_code: row.get(16)?,
490                po_box: row.get(17)?,
491                operator_class,
492            })
493        })?;
494
495        let mut result = Vec::new();
496        for license in licenses {
497            result.push(license?);
498        }
499        Ok(result)
500    }
501
502    /// Get database statistics.
503    pub fn get_stats(&self) -> Result<LicenseStats> {
504        let conn = self.conn()?;
505
506        let total_licenses: u64 = conn.query_row("SELECT COUNT(*) FROM licenses", [], |row| {
507            row.get::<_, i64>(0)
508        })? as u64;
509
510        // Use integer codes for status comparisons
511        let active_code = LicenseStatus::Active.to_u8();
512        let expired_code = LicenseStatus::Expired.to_u8();
513        let cancelled_code = LicenseStatus::Cancelled.to_u8();
514
515        let active_licenses: u64 = conn.query_row(
516            "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
517            [active_code],
518            |row| row.get::<_, i64>(0),
519        )? as u64;
520
521        let expired_licenses: u64 = conn.query_row(
522            "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
523            [expired_code],
524            |row| row.get::<_, i64>(0),
525        )? as u64;
526
527        let cancelled_licenses: u64 = conn.query_row(
528            "SELECT COUNT(*) FROM licenses WHERE license_status = ?1",
529            [cancelled_code],
530            |row| row.get::<_, i64>(0),
531        )? as u64;
532
533        let schema_version = Schema::get_version(&conn)?.unwrap_or(0);
534        let last_updated = Schema::get_metadata(&conn, "last_updated")?;
535
536        Ok(LicenseStats {
537            total_licenses,
538            active_licenses,
539            expired_licenses,
540            cancelled_licenses,
541            by_service: Vec::new(),
542            by_operator_class: Vec::new(),
543            last_updated,
544            schema_version,
545        })
546    }
547
548    /// Count licenses by radio service code(s).
549    /// Pass service codes like ["HA", "HV"] for amateur or ["ZA"] for GMRS.
550    pub fn count_by_service(&self, service_codes: &[&str]) -> Result<u64> {
551        if service_codes.is_empty() {
552            return Ok(0);
553        }
554
555        // Convert string service codes to integer codes
556        let int_codes: Vec<u8> = service_codes
557            .iter()
558            .filter_map(|s| s.parse::<RadioService>().ok().map(|r| r.to_u8()))
559            .collect();
560
561        if int_codes.is_empty() {
562            return Ok(0);
563        }
564
565        let conn = self.conn()?;
566        let placeholders: String = int_codes.iter().map(|_| "?").collect::<Vec<_>>().join(",");
567        let sql = format!(
568            "SELECT COUNT(*) FROM licenses WHERE radio_service_code IN ({})",
569            placeholders
570        );
571
572        let mut stmt = conn.prepare(&sql)?;
573        let count: u64 = stmt.query_row(rusqlite::params_from_iter(int_codes.iter()), |row| {
574            row.get::<_, i64>(0)
575        })? as u64;
576
577        Ok(count)
578    }
579
580    /// Set the last updated timestamp.
581    pub fn set_last_updated(&self, timestamp: &str) -> Result<()> {
582        let conn = self.conn()?;
583        Schema::set_metadata(&conn, "last_updated", timestamp)?;
584        Ok(())
585    }
586
587    /// Get the ETag of the last imported file for a service.
588    pub fn get_imported_etag(&self, service: &str) -> Result<Option<String>> {
589        let conn = self.conn()?;
590        let key = format!("imported_etag_{}", service);
591        Schema::get_metadata(&conn, &key)
592    }
593
594    /// Set the ETag of the last imported file for a service.
595    pub fn set_imported_etag(&self, service: &str, etag: &str) -> Result<()> {
596        let conn = self.conn()?;
597        let key = format!("imported_etag_{}", service);
598        Schema::set_metadata(&conn, &key, etag)?;
599        Ok(())
600    }
601
602    // ========================================================================
603    // Import Status Tracking
604    // ========================================================================
605
606    /// Check if a record type has been imported for a service.
607    pub fn has_record_type(&self, service: &str, record_type: &str) -> Result<bool> {
608        let conn = self.conn()?;
609        let count: i64 = conn.query_row(
610            "SELECT COUNT(*) FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
611            params![service, record_type],
612            |row| row.get(0),
613        )?;
614        Ok(count > 0)
615    }
616
617    /// Get list of imported record types for a service.
618    pub fn get_imported_types(&self, service: &str) -> Result<Vec<String>> {
619        let conn = self.conn()?;
620        let mut stmt = conn.prepare(
621            "SELECT record_type FROM import_status WHERE radio_service_code = ?1 ORDER BY record_type"
622        )?;
623        let iter = stmt.query_map(params![service], |row| row.get(0))?;
624        let mut types = Vec::new();
625        for record_type in iter {
626            types.push(record_type?);
627        }
628        Ok(types)
629    }
630
631    /// Mark record type as imported for a service.
632    pub fn mark_imported(&self, service: &str, record_type: &str, count: usize) -> Result<()> {
633        let conn = self.conn()?;
634        let now = chrono::Utc::now().to_rfc3339();
635        conn.execute(
636            "INSERT OR REPLACE INTO import_status (radio_service_code, record_type, imported_at, record_count) 
637             VALUES (?1, ?2, ?3, ?4)",
638            params![service, record_type, now, count as i64],
639        )?;
640        Ok(())
641    }
642
643    /// Clear import status for a service (used when doing full re-import).
644    pub fn clear_import_status(&self, service: &str) -> Result<()> {
645        let conn = self.conn()?;
646        conn.execute(
647            "DELETE FROM import_status WHERE radio_service_code = ?1",
648            params![service],
649        )?;
650        Ok(())
651    }
652
653    /// Get record count for an imported record type.
654    pub fn get_imported_count(&self, service: &str, record_type: &str) -> Result<Option<usize>> {
655        let conn = self.conn()?;
656        let result: rusqlite::Result<i64> = conn.query_row(
657            "SELECT record_count FROM import_status WHERE radio_service_code = ?1 AND record_type = ?2",
658            params![service, record_type],
659            |row| row.get(0),
660        );
661        match result {
662            Ok(count) => Ok(Some(count as usize)),
663            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
664            Err(e) => Err(e.into()),
665        }
666    }
667
668    // ========================================================================
669    // Data Freshness and Patch Tracking
670    // ========================================================================
671
672    /// Get the last updated timestamp for a service.
673    pub fn get_last_updated(&self) -> Result<Option<String>> {
674        let conn = self.conn()?;
675        Schema::get_metadata(&conn, "last_updated")
676    }
677
678    /// Get data freshness information for a service.
679    pub fn get_freshness(
680        &self,
681        service: &str,
682        threshold_days: i64,
683    ) -> Result<crate::freshness::DataFreshness> {
684        let last_updated = self.get_last_updated()?;
685        let mut freshness = crate::freshness::DataFreshness::from_timestamp(
686            service,
687            last_updated.as_deref(),
688            threshold_days,
689        );
690
691        // Get last weekly date from metadata
692        let weekly_key = format!("last_weekly_date_{}", service);
693        let conn = self.conn()?;
694        if let Some(date_str) = Schema::get_metadata(&conn, &weekly_key)? {
695            freshness.last_weekly_date =
696                chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok();
697        }
698
699        // Get applied patches
700        freshness.applied_patch_dates = self
701            .get_applied_patches(service)?
702            .into_iter()
703            .map(|p| p.patch_date)
704            .collect();
705
706        Ok(freshness)
707    }
708
709    /// Check if data for a service is stale.
710    pub fn is_stale(&self, service: &str, threshold_days: i64) -> Result<bool> {
711        let freshness = self.get_freshness(service, threshold_days)?;
712        Ok(freshness.is_stale)
713    }
714
715    /// Record that a daily patch has been applied.
716    pub fn record_applied_patch(
717        &self,
718        service: &str,
719        patch_date: chrono::NaiveDate,
720        weekday: &str,
721        etag: Option<&str>,
722        record_count: Option<usize>,
723    ) -> Result<()> {
724        let conn = self.conn()?;
725        let now = chrono::Utc::now().to_rfc3339();
726        let date_str = patch_date.format("%Y-%m-%d").to_string();
727
728        conn.execute(
729            "INSERT OR REPLACE INTO applied_patches 
730             (radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count)
731             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
732            params![
733                service,
734                date_str,
735                weekday,
736                now,
737                etag,
738                record_count.map(|c| c as i64)
739            ],
740        )?;
741        Ok(())
742    }
743
744    /// Get all applied patches for a service since last weekly.
745    pub fn get_applied_patches(
746        &self,
747        service: &str,
748    ) -> Result<Vec<crate::freshness::AppliedPatch>> {
749        let conn = self.conn()?;
750        let mut stmt = conn.prepare(
751            "SELECT radio_service_code, patch_date, patch_weekday, applied_at, etag, record_count 
752             FROM applied_patches 
753             WHERE radio_service_code = ?1 
754             ORDER BY patch_date",
755        )?;
756
757        let iter = stmt.query_map(params![service], |row| {
758            let date_str: String = row.get(1)?;
759            let applied_at_str: String = row.get(3)?;
760
761            Ok(crate::freshness::AppliedPatch {
762                service: row.get(0)?,
763                patch_date: chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d")
764                    .unwrap_or_else(|_| chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap()),
765                weekday: row.get(2)?,
766                applied_at: chrono::DateTime::parse_from_rfc3339(&applied_at_str)
767                    .map(|dt| dt.with_timezone(&chrono::Utc))
768                    .unwrap_or_else(|_| chrono::Utc::now()),
769                etag: row.get(4)?,
770                record_count: row.get::<_, Option<i64>>(5)?.map(|c| c as usize),
771            })
772        })?;
773
774        let mut patches = Vec::new();
775        for patch in iter {
776            patches.push(patch?);
777        }
778        Ok(patches)
779    }
780
781    /// Clear applied patches for a service (called when new weekly is imported).
782    pub fn clear_applied_patches(&self, service: &str) -> Result<()> {
783        let conn = self.conn()?;
784        conn.execute(
785            "DELETE FROM applied_patches WHERE radio_service_code = ?1",
786            params![service],
787        )?;
788        Ok(())
789    }
790
791    /// Set the date of the last weekly import for a service.
792    pub fn set_last_weekly_date(&self, service: &str, date: chrono::NaiveDate) -> Result<()> {
793        let conn = self.conn()?;
794        let key = format!("last_weekly_date_{}", service);
795        let date_str = date.format("%Y-%m-%d").to_string();
796        Schema::set_metadata(&conn, &key, &date_str)?;
797        Ok(())
798    }
799
800    /// Get the date of the last weekly import for a service.
801    pub fn get_last_weekly_date(&self, service: &str) -> Result<Option<chrono::NaiveDate>> {
802        let conn = self.conn()?;
803        let key = format!("last_weekly_date_{}", service);
804        if let Some(date_str) = Schema::get_metadata(&conn, &key)? {
805            Ok(chrono::NaiveDate::parse_from_str(&date_str, "%Y-%m-%d").ok())
806        } else {
807            Ok(None)
808        }
809    }
810}
811
812/// A database transaction for bulk operations.
813pub struct Transaction {
814    conn: PooledConnection<SqliteConnectionManager>,
815}
816
817impl Transaction {
818    /// Insert a record within this transaction.
819    pub fn insert_record(&self, record: &UlsRecord) -> Result<()> {
820        Database::insert_record_conn(&self.conn, record)
821    }
822
823    /// Commit the transaction.
824    pub fn commit(self) -> Result<()> {
825        self.conn.execute("COMMIT", [])?;
826        Ok(())
827    }
828
829    /// Rollback the transaction.
830    pub fn rollback(self) -> Result<()> {
831        self.conn.execute("ROLLBACK", [])?;
832        Ok(())
833    }
834}
835
836#[cfg(test)]
837mod tests {
838    use super::*;
839    use uls_core::records::HeaderRecord;
840
841    fn create_test_db() -> Database {
842        let config = DatabaseConfig::in_memory();
843        let db = Database::with_config(config).unwrap();
844        db.initialize().unwrap();
845        db
846    }
847
848    fn create_test_header() -> HeaderRecord {
849        let mut hd = HeaderRecord::from_fields(&["HD", "12345"]);
850        hd.unique_system_identifier = 12345;
851        hd.call_sign = Some("W1TEST".to_string());
852        hd.license_status = Some('A');
853        hd.radio_service_code = Some("HA".to_string());
854        hd
855    }
856
857    #[test]
858    fn test_open_database() {
859        let db = create_test_db();
860        assert!(db.is_initialized().unwrap());
861    }
862
863    #[test]
864    fn test_insert_and_query() {
865        let db = create_test_db();
866
867        let header = create_test_header();
868        db.insert_record(&UlsRecord::Header(header)).unwrap();
869
870        let license = db.get_license_by_callsign("W1TEST").unwrap();
871        assert!(license.is_some());
872
873        let license = license.unwrap();
874        assert_eq!(license.call_sign, "W1TEST");
875        assert_eq!(license.status, 'A');
876        assert!(license.is_active());
877    }
878
879    #[test]
880    fn test_case_insensitive_lookup() {
881        let db = create_test_db();
882
883        let header = create_test_header();
884        db.insert_record(&UlsRecord::Header(header)).unwrap();
885
886        // Should find with different case
887        let license = db.get_license_by_callsign("w1test").unwrap();
888        assert!(license.is_some());
889    }
890
891    #[test]
892    fn test_stats() {
893        let db = create_test_db();
894
895        let header = create_test_header();
896        db.insert_record(&UlsRecord::Header(header)).unwrap();
897
898        let stats = db.get_stats().unwrap();
899        assert_eq!(stats.total_licenses, 1);
900        assert_eq!(stats.active_licenses, 1);
901    }
902
903    #[test]
904    fn test_transaction() {
905        let db = create_test_db();
906
907        let tx = db.begin_transaction().unwrap();
908
909        let header = create_test_header();
910        tx.insert_record(&UlsRecord::Header(header)).unwrap();
911        tx.commit().unwrap();
912
913        let license = db.get_license_by_callsign("W1TEST").unwrap();
914        assert!(license.is_some());
915    }
916
917    #[test]
918    fn test_insert_entity() {
919        use uls_core::records::EntityRecord;
920
921        let db = create_test_db();
922
923        // Insert header first for FK constraint
924        let header = create_test_header();
925        db.insert_record(&UlsRecord::Header(header)).unwrap();
926
927        // Insert entity
928        let entity = EntityRecord::from_fields(&[
929            "EN",
930            "12345",
931            "",
932            "",
933            "W1TEST",
934            "L",
935            "L00100001",
936            "DOE, JOHN A",
937            "JOHN",
938            "A",
939            "DOE",
940            "",
941            "555-555-1234",
942            "",
943            "test@example.com",
944            "123 Main St",
945            "ANYTOWN",
946            "CA",
947            "90210",
948            "",
949            "",
950            "000",
951            "0001234567",
952            "I",
953            "",
954            "",
955            "",
956            "",
957            "",
958            "",
959        ]);
960        db.insert_record(&UlsRecord::Entity(entity)).unwrap();
961    }
962
963    #[test]
964    fn test_insert_amateur() {
965        use uls_core::records::AmateurRecord;
966
967        let db = create_test_db();
968
969        let header = create_test_header();
970        db.insert_record(&UlsRecord::Header(header)).unwrap();
971
972        let amateur = AmateurRecord::from_fields(&[
973            "AM", "12345", "", "", "W1TEST", "E", "D", "6", "", "", "", "", "", "", "", "", "", "",
974        ]);
975        db.insert_record(&UlsRecord::Amateur(amateur)).unwrap();
976    }
977
978    #[test]
979    fn test_insert_history() {
980        use uls_core::records::HistoryRecord;
981
982        let db = create_test_db();
983
984        let header = create_test_header();
985        db.insert_record(&UlsRecord::Header(header)).unwrap();
986
987        let history =
988            HistoryRecord::from_fields(&["HS", "12345", "", "W1TEST", "01/01/2020", "LIISS"]);
989        db.insert_record(&UlsRecord::History(history)).unwrap();
990    }
991
992    #[test]
993    fn test_insert_comment() {
994        use uls_core::records::CommentRecord;
995
996        let db = create_test_db();
997
998        let header = create_test_header();
999        db.insert_record(&UlsRecord::Header(header)).unwrap();
1000
1001        let comment = CommentRecord::from_fields(&[
1002            "CO",
1003            "12345",
1004            "",
1005            "W1TEST",
1006            "01/01/2020",
1007            "Test comment",
1008        ]);
1009        db.insert_record(&UlsRecord::Comment(comment)).unwrap();
1010    }
1011
1012    #[test]
1013    fn test_insert_special_condition() {
1014        use uls_core::records::SpecialConditionRecord;
1015
1016        let db = create_test_db();
1017
1018        let header = create_test_header();
1019        db.insert_record(&UlsRecord::Header(header)).unwrap();
1020
1021        let sc = SpecialConditionRecord::from_fields(&[
1022            "SC", "12345", "", "", "W1TEST", "P", "999", "", "",
1023        ]);
1024        db.insert_record(&UlsRecord::SpecialCondition(sc)).unwrap();
1025    }
1026
1027    #[test]
1028    fn test_get_licenses_by_frn() {
1029        use uls_core::records::EntityRecord;
1030
1031        let db = create_test_db();
1032
1033        let header = create_test_header();
1034        db.insert_record(&UlsRecord::Header(header)).unwrap();
1035
1036        // Insert entity with FRN
1037        let entity = EntityRecord::from_fields(&[
1038            "EN",
1039            "12345",
1040            "",
1041            "",
1042            "W1TEST",
1043            "L",
1044            "L00100001",
1045            "DOE, JOHN A",
1046            "JOHN",
1047            "A",
1048            "DOE",
1049            "",
1050            "",
1051            "",
1052            "",
1053            "",
1054            "",
1055            "",
1056            "",
1057            "",
1058            "",
1059            "000",
1060            "0001234567",
1061            "I",
1062            "",
1063            "",
1064            "",
1065            "",
1066            "",
1067            "",
1068        ]);
1069        db.insert_record(&UlsRecord::Entity(entity)).unwrap();
1070
1071        let licenses = db.get_licenses_by_frn("0001234567").unwrap();
1072        assert_eq!(licenses.len(), 1);
1073        assert_eq!(licenses[0].call_sign, "W1TEST");
1074    }
1075
1076    #[test]
1077    fn test_get_licenses_by_frn_not_found() {
1078        let db = create_test_db();
1079
1080        let licenses = db.get_licenses_by_frn("9999999999").unwrap();
1081        assert!(licenses.is_empty());
1082    }
1083
1084    #[test]
1085    fn test_lookup_prefers_active_over_cancelled() {
1086        let db = create_test_db();
1087
1088        // Insert a cancelled license first (lower USI = inserted first)
1089        let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1090        cancelled.unique_system_identifier = 10001;
1091        cancelled.call_sign = Some("K2QA".to_string());
1092        cancelled.license_status = Some('C');
1093        cancelled.radio_service_code = Some("HA".to_string());
1094        db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1095
1096        // Insert an active license with the same callsign (higher USI)
1097        let mut active = HeaderRecord::from_fields(&["HD", "20002"]);
1098        active.unique_system_identifier = 20002;
1099        active.call_sign = Some("K2QA".to_string());
1100        active.license_status = Some('A');
1101        active.radio_service_code = Some("HA".to_string());
1102        db.insert_record(&UlsRecord::Header(active)).unwrap();
1103
1104        // Lookup should return the active record, not the cancelled one
1105        let license = db.get_license_by_callsign("K2QA").unwrap();
1106        assert!(license.is_some(), "Should find license for K2QA");
1107        let license = license.unwrap();
1108        assert_eq!(
1109            license.status, 'A',
1110            "Should return active license, not cancelled (got status='{}')",
1111            license.status
1112        );
1113        assert_eq!(license.unique_system_identifier, 20002);
1114    }
1115
1116    #[test]
1117    fn test_lookup_returns_cancelled_when_no_active() {
1118        let db = create_test_db();
1119
1120        // Insert only a cancelled license
1121        let mut cancelled = HeaderRecord::from_fields(&["HD", "10001"]);
1122        cancelled.unique_system_identifier = 10001;
1123        cancelled.call_sign = Some("W9OLD".to_string());
1124        cancelled.license_status = Some('C');
1125        cancelled.radio_service_code = Some("HA".to_string());
1126        db.insert_record(&UlsRecord::Header(cancelled)).unwrap();
1127
1128        // Should still return the cancelled record when it's the only one
1129        let license = db.get_license_by_callsign("W9OLD").unwrap();
1130        assert!(license.is_some(), "Should find cancelled-only license");
1131        assert_eq!(license.unwrap().status, 'C');
1132    }
1133
1134    #[test]
1135    fn test_lookup_prefers_most_recent_inactive_record() {
1136        let db = create_test_db();
1137
1138        // Insert an older expired license (granted 2015)
1139        let mut older = HeaderRecord::from_fields(&["HD", "10001"]);
1140        older.unique_system_identifier = 10001;
1141        older.call_sign = Some("W3OLD".to_string());
1142        older.license_status = Some('E');
1143        older.radio_service_code = Some("HA".to_string());
1144        older.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2015, 3, 1).unwrap());
1145        older.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2025, 3, 1).unwrap());
1146        db.insert_record(&UlsRecord::Header(older)).unwrap();
1147
1148        // Insert a newer expired license (granted 2020)
1149        let mut newer = HeaderRecord::from_fields(&["HD", "20002"]);
1150        newer.unique_system_identifier = 20002;
1151        newer.call_sign = Some("W3OLD".to_string());
1152        newer.license_status = Some('E');
1153        newer.radio_service_code = Some("HA".to_string());
1154        newer.grant_date = Some(chrono::NaiveDate::from_ymd_opt(2020, 6, 15).unwrap());
1155        newer.expired_date = Some(chrono::NaiveDate::from_ymd_opt(2030, 6, 15).unwrap());
1156        db.insert_record(&UlsRecord::Header(newer)).unwrap();
1157
1158        // Should return the more recently granted record
1159        let license = db.get_license_by_callsign("W3OLD").unwrap();
1160        assert!(license.is_some(), "Should find expired license for W3OLD");
1161        let license = license.unwrap();
1162        assert_eq!(
1163            license.unique_system_identifier, 20002,
1164            "Should return the most recently granted expired record"
1165        );
1166        assert_eq!(license.status, 'E');
1167    }
1168
1169    #[test]
1170    fn test_count_by_service() {
1171        let db = create_test_db();
1172
1173        let header = create_test_header(); // Has radio_service_code = "HA"
1174        db.insert_record(&UlsRecord::Header(header)).unwrap();
1175
1176        let count = db.count_by_service(&["HA"]).unwrap();
1177        assert_eq!(count, 1);
1178
1179        let count = db.count_by_service(&["ZA"]).unwrap(); // GMRS, shouldn't match
1180        assert_eq!(count, 0);
1181    }
1182
1183    #[test]
1184    fn test_etag_operations() {
1185        let db = create_test_db();
1186
1187        // Initially no etag
1188        let etag = db.get_imported_etag("l_amat").unwrap();
1189        assert!(etag.is_none());
1190
1191        // Set etag
1192        db.set_imported_etag("l_amat", "abc123").unwrap();
1193
1194        // Should retrieve it
1195        let etag = db.get_imported_etag("l_amat").unwrap();
1196        assert_eq!(etag, Some("abc123".to_string()));
1197
1198        // Update etag
1199        db.set_imported_etag("l_amat", "xyz789").unwrap();
1200        let etag = db.get_imported_etag("l_amat").unwrap();
1201        assert_eq!(etag, Some("xyz789".to_string()));
1202    }
1203
1204    #[test]
1205    fn test_set_last_updated() {
1206        let db = create_test_db();
1207
1208        db.set_last_updated("2025-01-17T12:00:00Z").unwrap();
1209        // Just verify it doesn't error - metadata retrieval would need Schema::get_metadata
1210    }
1211
1212    #[test]
1213    fn test_license_not_found() {
1214        let db = create_test_db();
1215
1216        let license = db.get_license_by_callsign("NOTEXIST").unwrap();
1217        assert!(license.is_none());
1218    }
1219
1220    #[test]
1221    fn test_transaction_rollback() {
1222        let db = create_test_db();
1223
1224        let tx = db.begin_transaction().unwrap();
1225        let header = create_test_header();
1226        tx.insert_record(&UlsRecord::Header(header)).unwrap();
1227        tx.rollback().unwrap();
1228
1229        // Should not be found after rollback
1230        let license = db.get_license_by_callsign("W1TEST").unwrap();
1231        assert!(license.is_none());
1232    }
1233
1234    #[test]
1235    fn test_open_database_with_path() {
1236        use tempfile::TempDir;
1237
1238        let temp_dir = TempDir::new().unwrap();
1239        let db_path = temp_dir.path().join("subdir").join("test.db");
1240
1241        // Database::open should work and create parent directory
1242        let db = Database::open(&db_path).unwrap();
1243        db.initialize().unwrap();
1244        assert!(db.is_initialized().unwrap());
1245        assert!(db_path.parent().unwrap().exists());
1246    }
1247
1248    #[test]
1249    fn test_insert_unsupported_record_type() {
1250        use uls_core::records::LocationRecord;
1251
1252        let db = create_test_db();
1253
1254        // Location is not supported in repository insert_record
1255        let location = LocationRecord::from_fields(&["LO", "12345", "", "", "W1TEST"]);
1256        // Should not error, just skip
1257        db.insert_record(&UlsRecord::Location(location)).unwrap();
1258    }
1259
1260    #[test]
1261    fn test_count_by_service_empty() {
1262        let db = create_test_db();
1263
1264        // Empty service codes should return 0
1265        let count = db.count_by_service(&[]).unwrap();
1266        assert_eq!(count, 0);
1267    }
1268
1269    #[test]
1270    fn test_get_imported_types() {
1271        let db = create_test_db();
1272
1273        // Initially empty
1274        let types = db.get_imported_types("HA").unwrap();
1275        assert!(types.is_empty());
1276
1277        // Mark some record types as imported
1278        db.mark_imported("HA", "HD", 100).unwrap();
1279        db.mark_imported("HA", "EN", 50).unwrap();
1280        db.mark_imported("HA", "AM", 25).unwrap();
1281
1282        // Should retrieve them sorted
1283        let types = db.get_imported_types("HA").unwrap();
1284        assert_eq!(types, vec!["AM", "EN", "HD"]); // Sorted alphabetically
1285
1286        // Different service should be empty
1287        let types = db.get_imported_types("ZA").unwrap();
1288        assert!(types.is_empty());
1289    }
1290
1291    #[test]
1292    fn test_get_imported_count() {
1293        let db = create_test_db();
1294
1295        // Initially not found
1296        let count = db.get_imported_count("HA", "HD").unwrap();
1297        assert!(count.is_none());
1298
1299        // Mark as imported with count
1300        db.mark_imported("HA", "HD", 500).unwrap();
1301
1302        // Should retrieve the count
1303        let count = db.get_imported_count("HA", "HD").unwrap();
1304        assert_eq!(count, Some(500));
1305
1306        // Non-existent record type should return None
1307        let count = db.get_imported_count("HA", "XX").unwrap();
1308        assert!(count.is_none());
1309
1310        // Non-existent service should return None
1311        let count = db.get_imported_count("ZZ", "HD").unwrap();
1312        assert!(count.is_none());
1313    }
1314
1315    #[test]
1316    fn test_import_status_lifecycle() {
1317        let db = create_test_db();
1318
1319        // Mark several types as imported
1320        db.mark_imported("HA", "HD", 100).unwrap();
1321        db.mark_imported("HA", "EN", 200).unwrap();
1322
1323        // Verify they're tracked
1324        assert!(db.has_record_type("HA", "HD").unwrap());
1325        assert!(db.has_record_type("HA", "EN").unwrap());
1326        assert!(!db.has_record_type("HA", "AM").unwrap());
1327
1328        // Clear import status
1329        db.clear_import_status("HA").unwrap();
1330
1331        // All should be cleared
1332        assert!(!db.has_record_type("HA", "HD").unwrap());
1333        assert!(!db.has_record_type("HA", "EN").unwrap());
1334
1335        // get_imported_types should return empty
1336        let types = db.get_imported_types("HA").unwrap();
1337        assert!(types.is_empty());
1338    }
1339
1340    #[test]
1341    fn test_pool_pragma_settings_on_all_connections() {
1342        // Use a path-based database with multiple connections to test pool behavior
1343        let temp_dir = tempfile::tempdir().unwrap();
1344        let db_path = temp_dir.path().join("test_pool.db");
1345
1346        let config = crate::config::DatabaseConfig {
1347            path: db_path.clone(),
1348            max_connections: 3,
1349            foreign_keys: true,
1350            enable_wal: true,
1351            ..Default::default()
1352        };
1353
1354        let db = Database::with_config(config).unwrap();
1355        db.initialize().unwrap();
1356
1357        // Get multiple connections and verify each has foreign_keys enabled
1358        let mut connections = Vec::new();
1359        for i in 0..3 {
1360            let conn = db.conn().unwrap();
1361            let fk_enabled: i32 = conn
1362                .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1363                .unwrap();
1364            assert_eq!(fk_enabled, 1, "Connection {i} should have foreign_keys ON");
1365            connections.push(conn);
1366        }
1367
1368        // Return connections and get new ones to verify they still work
1369        drop(connections);
1370
1371        for i in 0..2 {
1372            let conn = db.conn().unwrap();
1373            let fk_enabled: i32 = conn
1374                .query_row("PRAGMA foreign_keys", [], |row| row.get(0))
1375                .unwrap();
1376            assert_eq!(
1377                fk_enabled, 1,
1378                "Re-acquired connection {i} should have foreign_keys ON"
1379            );
1380        }
1381    }
1382}