bob/
db.rs

1/*
2 * Copyright (c) 2025 Jonathan Perkin <jonathan@perkin.org.uk>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17//! SQLite database for caching scan and build results.
18//!
19//! This module provides optimized database access with:
20//! - Lazy loading to minimize memory usage
21//! - Indexed reverse dependency lookups for fast failure cascades
22//! - Normalized dependency tables for efficient queries
23//! - Hybrid storage: columns for hot fields, JSON for cold data
24//!
25//! # Schema
26//!
27//! - `packages` - Core package identity and status
28//! - `depends` - Raw dependency patterns from scans
29//! - `resolved_depends` - Resolved dependencies after pattern matching
30//! - `builds` - Build results with indexed outcome
31//! - `metadata` - Key-value store for flags and cached data
32
33use crate::build::{BuildOutcome, BuildResult};
34use anyhow::{Context, Result};
35use pkgsrc::{PkgName, PkgPath, ScanIndex};
36use rusqlite::{Connection, params};
37use std::collections::HashSet;
38use std::path::Path;
39use std::time::Duration;
40use tracing::{debug, info, warn};
41
/// Current schema version for migrations.
///
/// Compared against the highest `version` stored in the `schema_version`
/// table when a database is opened; a lower stored value triggers migration.
const SCHEMA_VERSION: i32 = 2;
44
/// Lightweight package row without full scan data.
///
/// Mirrors the frequently used columns of the `packages` table; the
/// serialized `scan_data` JSON is not part of this struct.
#[derive(Clone, Debug)]
pub struct PackageRow {
    /// Row id (`packages.id`).
    pub id: i64,
    /// Package name (`packages.pkgname`, declared `UNIQUE` in the schema).
    pub pkgname: String,
    /// Package path the package was stored under (`packages.pkgpath`).
    pub pkgpath: String,
    /// Skip reason; `None` when the column is NULL.
    pub skip_reason: Option<String>,
    /// Fail reason; `None` when the column is NULL.
    pub fail_reason: Option<String>,
    /// True when the stored `is_bootstrap` column is non-zero.
    pub is_bootstrap: bool,
    /// Stored `pbulk_weight` column (the schema default is 100).
    pub pbulk_weight: i32,
}
56
/// SQLite database for scan and build caching.
///
/// Wraps a single `rusqlite` connection; obtain one via [`Database::open`].
pub struct Database {
    /// Underlying SQLite connection used by the methods on this type.
    conn: Connection,
}
61
62impl Database {
63    /// Open or create a database at the given path.
64    pub fn open(path: &Path) -> Result<Self> {
65        if let Some(parent) = path.parent() {
66            std::fs::create_dir_all(parent)
67                .context("Failed to create database directory")?;
68        }
69        let conn = Connection::open(path).context("Failed to open database")?;
70        let db = Self { conn };
71        db.configure_pragmas()?;
72        db.init_or_migrate()?;
73        Ok(db)
74    }
75
76    /// Begin a transaction.
77    pub fn begin_transaction(&self) -> Result<()> {
78        self.conn.execute("BEGIN TRANSACTION", [])?;
79        Ok(())
80    }
81
82    /// Commit the current transaction.
83    pub fn commit(&self) -> Result<()> {
84        self.conn.execute("COMMIT", [])?;
85        Ok(())
86    }
87
88    /// Rollback the current transaction.
89    pub fn rollback(&self) -> Result<()> {
90        self.conn.execute("ROLLBACK", [])?;
91        Ok(())
92    }
93
94    /// Configure SQLite for performance.
95    fn configure_pragmas(&self) -> Result<()> {
96        self.conn.execute_batch(
97            "PRAGMA journal_mode = WAL;
98             PRAGMA synchronous = NORMAL;
99             PRAGMA cache_size = -64000;
100             PRAGMA temp_store = MEMORY;
101             PRAGMA mmap_size = 268435456;
102             PRAGMA foreign_keys = ON;",
103        )?;
104        Ok(())
105    }
106
107    /// Initialize schema or migrate from older versions.
108    fn init_or_migrate(&self) -> Result<()> {
109        // Check if schema_version table exists
110        let has_schema_version: bool = self.conn.query_row(
111            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
112            [],
113            |row| row.get::<_, i32>(0).map(|c| c > 0),
114        )?;
115
116        if !has_schema_version {
117            // Check for old schema (v1)
118            let has_old_scan: bool = self.conn.query_row(
119                "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='scan'",
120                [],
121                |row| row.get::<_, i32>(0).map(|c| c > 0),
122            )?;
123
124            if has_old_scan {
125                info!("Migrating from schema v1 to v{}", SCHEMA_VERSION);
126                self.migrate_v1_to_v2()?;
127            } else {
128                self.create_schema_v2()?;
129            }
130        } else {
131            let version: i32 = self.conn.query_row(
132                "SELECT version FROM schema_version ORDER BY version DESC LIMIT 1",
133                [],
134                |row| row.get(0),
135            ).unwrap_or(1);
136
137            if version < SCHEMA_VERSION {
138                info!(
139                    "Migrating from schema v{} to v{}",
140                    version, SCHEMA_VERSION
141                );
142                if version == 1 {
143                    self.migrate_v1_to_v2()?;
144                }
145            }
146        }
147
148        Ok(())
149    }
150
151    /// Create the v2 schema from scratch.
152    fn create_schema_v2(&self) -> Result<()> {
153        self.conn.execute_batch(
154            "CREATE TABLE IF NOT EXISTS schema_version (
155                version INTEGER PRIMARY KEY
156            );
157
158            INSERT OR REPLACE INTO schema_version (version) VALUES (2);
159
160            CREATE TABLE IF NOT EXISTS packages (
161                id INTEGER PRIMARY KEY AUTOINCREMENT,
162                pkgname TEXT UNIQUE NOT NULL,
163                pkgpath TEXT NOT NULL,
164                pkgname_base TEXT NOT NULL,
165                version TEXT NOT NULL,
166                skip_reason TEXT,
167                fail_reason TEXT,
168                is_bootstrap INTEGER DEFAULT 0,
169                pbulk_weight INTEGER DEFAULT 100,
170                scan_data TEXT,
171                scanned_at INTEGER NOT NULL
172            );
173
174            CREATE INDEX IF NOT EXISTS idx_packages_pkgpath ON packages(pkgpath);
175            CREATE INDEX IF NOT EXISTS idx_packages_pkgname_base ON packages(pkgname_base);
176            CREATE INDEX IF NOT EXISTS idx_packages_status ON packages(skip_reason, fail_reason);
177
178            CREATE TABLE IF NOT EXISTS depends (
179                id INTEGER PRIMARY KEY AUTOINCREMENT,
180                package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
181                depend_pattern TEXT NOT NULL,
182                depend_pkgpath TEXT NOT NULL,
183                UNIQUE(package_id, depend_pattern)
184            );
185
186            CREATE INDEX IF NOT EXISTS idx_depends_package ON depends(package_id);
187            CREATE INDEX IF NOT EXISTS idx_depends_pkgpath ON depends(depend_pkgpath);
188
189            CREATE TABLE IF NOT EXISTS resolved_depends (
190                id INTEGER PRIMARY KEY AUTOINCREMENT,
191                package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
192                depends_on_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
193                UNIQUE(package_id, depends_on_id)
194            );
195
196            CREATE INDEX IF NOT EXISTS idx_resolved_depends_package ON resolved_depends(package_id);
197            CREATE INDEX IF NOT EXISTS idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
198
199            CREATE TABLE IF NOT EXISTS builds (
200                id INTEGER PRIMARY KEY AUTOINCREMENT,
201                package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
202                outcome TEXT NOT NULL,
203                outcome_detail TEXT,
204                duration_ms INTEGER NOT NULL DEFAULT 0,
205                built_at INTEGER NOT NULL,
206                log_dir TEXT,
207                UNIQUE(package_id)
208            );
209
210            CREATE INDEX IF NOT EXISTS idx_builds_outcome ON builds(outcome);
211            CREATE INDEX IF NOT EXISTS idx_builds_package ON builds(package_id);
212
213            CREATE TABLE IF NOT EXISTS metadata (
214                key TEXT PRIMARY KEY,
215                value TEXT NOT NULL
216            );"
217        )?;
218
219        debug!("Created schema v2");
220        Ok(())
221    }
222
223    /// Migrate from v1 (old scan/build tables) to v2.
224    fn migrate_v1_to_v2(&self) -> Result<()> {
225        // Create new schema first
226        self.create_schema_v2()?;
227
228        // Migrate scan data
229        let mut stmt = self
230            .conn
231            .prepare("SELECT pkgpath, data FROM scan ORDER BY rowid")?;
232
233        let rows: Vec<(String, String)> = stmt
234            .query_map([], |row| {
235                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
236            })?
237            .filter_map(|r| r.ok())
238            .collect();
239
240        let mut migrated_count = 0;
241        for (pkgpath, json) in rows {
242            let indexes: Vec<ScanIndex> = match serde_json::from_str(&json) {
243                Ok(idx) => idx,
244                Err(e) => {
245                    warn!(pkgpath = %pkgpath, error = %e, "Failed to parse scan data during migration");
246                    continue;
247                }
248            };
249
250            for idx in indexes {
251                if let Err(e) = self.store_package(&pkgpath, &idx) {
252                    warn!(pkgpath = %pkgpath, error = %e, "Failed to migrate package");
253                }
254            }
255            migrated_count += 1;
256        }
257
258        // Migrate build data
259        let mut stmt = self
260            .conn
261            .prepare("SELECT pkgname, data FROM build ORDER BY rowid")?;
262
263        let rows: Vec<(String, String)> = stmt
264            .query_map([], |row| {
265                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
266            })?
267            .filter_map(|r| r.ok())
268            .collect();
269
270        let mut build_count = 0;
271        for (pkgname, json) in rows {
272            let result: BuildResult = match serde_json::from_str(&json) {
273                Ok(r) => r,
274                Err(e) => {
275                    warn!(pkgname = %pkgname, error = %e, "Failed to parse build data during migration");
276                    continue;
277                }
278            };
279
280            // Look up package_id
281            if let Ok(Some(pkg)) = self.get_package_by_name(&pkgname) {
282                if let Err(e) = self.store_build_result(pkg.id, &result) {
283                    warn!(pkgname = %pkgname, error = %e, "Failed to migrate build result");
284                }
285                build_count += 1;
286            }
287        }
288
289        // Drop old tables
290        self.conn.execute_batch(
291            "DROP TABLE IF EXISTS scan;
292             DROP TABLE IF EXISTS build;",
293        )?;
294
295        info!(
296            packages = migrated_count,
297            builds = build_count,
298            "Migration to v2 complete"
299        );
300        Ok(())
301    }
302
303    // ========================================================================
304    // PACKAGE QUERIES
305    // ========================================================================
306
307    /// Store a package from scan results.
308    pub fn store_package(
309        &self,
310        pkgpath: &str,
311        index: &ScanIndex,
312    ) -> Result<i64> {
313        let pkgname = index.pkgname.pkgname();
314        let (base, version) = split_pkgname(pkgname);
315
316        let skip_reason =
317            index.pkg_skip_reason.as_ref().filter(|s| !s.is_empty());
318        let fail_reason =
319            index.pkg_fail_reason.as_ref().filter(|s| !s.is_empty());
320        let is_bootstrap = index.bootstrap_pkg.as_deref() == Some("yes");
321        let pbulk_weight: i32 = index
322            .pbulk_weight
323            .as_ref()
324            .and_then(|s| s.parse().ok())
325            .unwrap_or(100);
326
327        let scan_data = serde_json::to_string(index)?;
328        let now = std::time::SystemTime::now()
329            .duration_since(std::time::UNIX_EPOCH)?
330            .as_secs() as i64;
331
332        {
333            let mut stmt = self.conn.prepare_cached(
334                "INSERT OR REPLACE INTO packages
335                 (pkgname, pkgpath, pkgname_base, version, skip_reason, fail_reason,
336                  is_bootstrap, pbulk_weight, scan_data, scanned_at)
337                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
338            )?;
339            stmt.execute(params![
340                pkgname,
341                pkgpath,
342                base,
343                version,
344                skip_reason,
345                fail_reason,
346                is_bootstrap,
347                pbulk_weight,
348                scan_data,
349                now
350            ])?;
351        }
352
353        let package_id = self.conn.last_insert_rowid();
354
355        // Store raw dependencies
356        if let Some(ref deps) = index.all_depends {
357            let mut stmt = self.conn.prepare_cached(
358                "INSERT OR IGNORE INTO depends (package_id, depend_pattern, depend_pkgpath)
359                 VALUES (?1, ?2, ?3)",
360            )?;
361            for dep in deps {
362                stmt.execute(params![
363                    package_id,
364                    dep.pattern().pattern(),
365                    dep.pkgpath().to_string()
366                ])?;
367            }
368        }
369
370        debug!(pkgname = pkgname, package_id = package_id, "Stored package");
371        Ok(package_id)
372    }
373
374    /// Store scan results for a pkgpath.
375    pub fn store_scan_pkgpath(
376        &self,
377        pkgpath: &str,
378        indexes: &[ScanIndex],
379    ) -> Result<()> {
380        for index in indexes {
381            self.store_package(pkgpath, index)?;
382        }
383        Ok(())
384    }
385
386    /// Get package by name.
387    pub fn get_package_by_name(
388        &self,
389        pkgname: &str,
390    ) -> Result<Option<PackageRow>> {
391        let result = self.conn.query_row(
392            "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
393             FROM packages WHERE pkgname = ?1",
394            [pkgname],
395            |row| Ok(PackageRow {
396                id: row.get(0)?,
397                pkgname: row.get(1)?,
398                pkgpath: row.get(2)?,
399                skip_reason: row.get(3)?,
400                fail_reason: row.get(4)?,
401                is_bootstrap: row.get::<_, i32>(5)? != 0,
402                pbulk_weight: row.get(6)?,
403            }),
404        );
405
406        match result {
407            Ok(pkg) => Ok(Some(pkg)),
408            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
409            Err(e) => Err(e.into()),
410        }
411    }
412
413    /// Get package ID by name.
414    pub fn get_package_id(&self, pkgname: &str) -> Result<Option<i64>> {
415        let result = self.conn.query_row(
416            "SELECT id FROM packages WHERE pkgname = ?1",
417            [pkgname],
418            |row| row.get(0),
419        );
420
421        match result {
422            Ok(id) => Ok(Some(id)),
423            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
424            Err(e) => Err(e.into()),
425        }
426    }
427
428    /// Get pkgname by package ID.
429    pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
430        self.conn
431            .query_row(
432                "SELECT pkgname FROM packages WHERE id = ?1",
433                [package_id],
434                |row| row.get(0),
435            )
436            .context("Package not found")
437    }
438
439    /// Get packages by pkgpath.
440    pub fn get_packages_by_path(
441        &self,
442        pkgpath: &str,
443    ) -> Result<Vec<PackageRow>> {
444        let mut stmt = self.conn.prepare(
445            "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
446             FROM packages WHERE pkgpath = ?1"
447        )?;
448
449        let rows = stmt.query_map([pkgpath], |row| {
450            Ok(PackageRow {
451                id: row.get(0)?,
452                pkgname: row.get(1)?,
453                pkgpath: row.get(2)?,
454                skip_reason: row.get(3)?,
455                fail_reason: row.get(4)?,
456                is_bootstrap: row.get::<_, i32>(5)? != 0,
457                pbulk_weight: row.get(6)?,
458            })
459        })?;
460
461        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
462    }
463
464    /// Check if pkgpath is scanned.
465    pub fn is_pkgpath_scanned(&self, pkgpath: &str) -> Result<bool> {
466        let count: i32 = self.conn.query_row(
467            "SELECT COUNT(*) FROM packages WHERE pkgpath = ?1",
468            [pkgpath],
469            |row| row.get(0),
470        )?;
471        Ok(count > 0)
472    }
473
474    /// Get all scanned pkgpaths.
475    pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
476        let mut stmt =
477            self.conn.prepare("SELECT DISTINCT pkgpath FROM packages")?;
478        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
479        rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
480    }
481
482    /// Get pkgpaths that are referenced as dependencies but haven't been scanned yet.
483    /// These are dependencies that were discovered during scanning but the scan was
484    /// interrupted before they could be processed.
485    pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
486        let mut stmt = self.conn.prepare(
487            "SELECT DISTINCT d.depend_pkgpath
488             FROM depends d
489             WHERE d.depend_pkgpath NOT IN (SELECT pkgpath FROM packages)",
490        )?;
491        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
492        rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
493    }
494
495    /// Count of scanned packages.
496    pub fn count_packages(&self) -> Result<i64> {
497        self.conn
498            .query_row("SELECT COUNT(*) FROM packages", [], |row| row.get(0))
499            .context("Failed to count packages")
500    }
501
502    /// Count of scanned pkgpaths.
503    pub fn count_scan(&self) -> Result<i64> {
504        self.conn
505            .query_row(
506                "SELECT COUNT(DISTINCT pkgpath) FROM packages",
507                [],
508                |row| row.get(0),
509            )
510            .context("Failed to count scan")
511    }
512
513    /// Get all packages (lightweight).
514    pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
515        let mut stmt = self.conn.prepare(
516            "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
517             FROM packages ORDER BY id"
518        )?;
519
520        let rows = stmt.query_map([], |row| {
521            Ok(PackageRow {
522                id: row.get(0)?,
523                pkgname: row.get(1)?,
524                pkgpath: row.get(2)?,
525                skip_reason: row.get(3)?,
526                fail_reason: row.get(4)?,
527                is_bootstrap: row.get::<_, i32>(5)? != 0,
528                pbulk_weight: row.get(6)?,
529            })
530        })?;
531
532        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
533    }
534
535    /// Get all buildable packages (no skip/fail reason).
536    pub fn get_buildable_packages(&self) -> Result<Vec<PackageRow>> {
537        let mut stmt = self.conn.prepare(
538            "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
539             FROM packages WHERE skip_reason IS NULL AND fail_reason IS NULL"
540        )?;
541
542        let rows = stmt.query_map([], |row| {
543            Ok(PackageRow {
544                id: row.get(0)?,
545                pkgname: row.get(1)?,
546                pkgpath: row.get(2)?,
547                skip_reason: row.get(3)?,
548                fail_reason: row.get(4)?,
549                is_bootstrap: row.get::<_, i32>(5)? != 0,
550                pbulk_weight: row.get(6)?,
551            })
552        })?;
553
554        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
555    }
556
557    /// Load full ScanIndex for a package.
558    pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
559        let json: String = self.conn.query_row(
560            "SELECT scan_data FROM packages WHERE id = ?1",
561            [package_id],
562            |row| row.get(0),
563        )?;
564        serde_json::from_str(&json).context("Failed to deserialize scan data")
565    }
566
567    /// Load all ScanIndex data in one query.
568    pub fn get_all_scan_indexes(&self) -> Result<Vec<(i64, ScanIndex)>> {
569        let mut stmt = self
570            .conn
571            .prepare("SELECT id, scan_data FROM packages ORDER BY id")?;
572        let rows = stmt.query_map([], |row| {
573            let id: i64 = row.get(0)?;
574            let json: String = row.get(1)?;
575            Ok((id, json))
576        })?;
577        let mut results = Vec::new();
578        for row in rows {
579            let (id, json) = row?;
580            let index: ScanIndex =
581                serde_json::from_str(&json).with_context(|| {
582                    format!(
583                        "Failed to deserialize scan data for package {}",
584                        id
585                    )
586                })?;
587            results.push((id, index));
588        }
589        Ok(results)
590    }
591
592    /// Load full ScanIndex by pkgname.
593    pub fn get_scan_index_by_name(
594        &self,
595        pkgname: &str,
596    ) -> Result<Option<ScanIndex>> {
597        let result = self.conn.query_row(
598            "SELECT scan_data FROM packages WHERE pkgname = ?1",
599            [pkgname],
600            |row| row.get::<_, String>(0),
601        );
602
603        match result {
604            Ok(json) => {
605                let index: ScanIndex = serde_json::from_str(&json)
606                    .context("Failed to deserialize scan data")?;
607                Ok(Some(index))
608            }
609            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
610            Err(e) => Err(e.into()),
611        }
612    }
613
614    /// Clear all scan data.
615    pub fn clear_scan(&self) -> Result<()> {
616        self.conn.execute("DELETE FROM packages", [])?;
617        self.clear_full_scan_complete()?;
618        self.clear_resolved_depends()?;
619        Ok(())
620    }
621
622    // ========================================================================
623    // DEPENDENCY QUERIES
624    // ========================================================================
625
626    /// Store a resolved dependency.
627    pub fn store_resolved_dependency(
628        &self,
629        package_id: i64,
630        depends_on_id: i64,
631    ) -> Result<()> {
632        self.conn.execute(
633            "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
634            params![package_id, depends_on_id],
635        )?;
636        Ok(())
637    }
638
639    /// Store resolved dependencies in batch.
640    pub fn store_resolved_dependencies_batch(
641        &self,
642        deps: &[(i64, i64)],
643    ) -> Result<()> {
644        self.conn.execute("BEGIN TRANSACTION", [])?;
645        let mut stmt = self.conn.prepare(
646            "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
647        )?;
648        for (package_id, depends_on_id) in deps {
649            stmt.execute(params![package_id, depends_on_id])?;
650        }
651        drop(stmt);
652        self.conn.execute("COMMIT", [])?;
653        Ok(())
654    }
655
656    /// Get direct dependencies of a package.
657    pub fn get_dependencies(&self, package_id: i64) -> Result<Vec<i64>> {
658        let mut stmt = self.conn.prepare(
659            "SELECT depends_on_id FROM resolved_depends WHERE package_id = ?1",
660        )?;
661        let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
662        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
663    }
664
665    /// Get reverse dependencies (packages that depend on this one).
666    pub fn get_reverse_dependencies(
667        &self,
668        package_id: i64,
669    ) -> Result<Vec<i64>> {
670        let mut stmt = self.conn.prepare(
671            "SELECT package_id FROM resolved_depends WHERE depends_on_id = ?1",
672        )?;
673        let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
674        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
675    }
676
677    /// Get all transitive reverse dependencies using recursive CTE.
678    pub fn get_transitive_reverse_deps(
679        &self,
680        package_id: i64,
681    ) -> Result<Vec<i64>> {
682        let mut stmt = self.conn.prepare(
683            "WITH RECURSIVE affected(id) AS (
684                SELECT ?1
685                UNION
686                SELECT rd.package_id
687                FROM resolved_depends rd
688                JOIN affected a ON rd.depends_on_id = a.id
689            )
690            SELECT id FROM affected WHERE id != ?1",
691        )?;
692        let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
693        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
694    }
695
696    /// Check if dependencies are resolved.
697    pub fn is_resolved(&self) -> Result<bool> {
698        let count: i64 = self.conn.query_row(
699            "SELECT COUNT(*) FROM resolved_depends",
700            [],
701            |row| row.get(0),
702        )?;
703        Ok(count > 0)
704    }
705
706    /// Clear all resolved dependencies.
707    pub fn clear_resolved_depends(&self) -> Result<()> {
708        self.conn.execute("DELETE FROM resolved_depends", [])?;
709        Ok(())
710    }
711
712    /// Get raw dependencies for pattern matching.
713    pub fn get_raw_dependencies(
714        &self,
715        package_id: i64,
716    ) -> Result<Vec<(String, String)>> {
717        let mut stmt = self.conn.prepare(
718            "SELECT depend_pattern, depend_pkgpath FROM depends WHERE package_id = ?1"
719        )?;
720        let rows = stmt.query_map([package_id], |row| {
721            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
722        })?;
723        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
724    }
725
726    // ========================================================================
727    // BUILD QUERIES
728    // ========================================================================
729
730    /// Store a build result by package ID.
731    pub fn store_build_result(
732        &self,
733        package_id: i64,
734        result: &BuildResult,
735    ) -> Result<()> {
736        let (outcome, detail) = build_outcome_to_db(&result.outcome);
737        let duration_ms = result.duration.as_millis() as i64;
738        let now = std::time::SystemTime::now()
739            .duration_since(std::time::UNIX_EPOCH)?
740            .as_secs() as i64;
741        let log_dir = result.log_dir.as_ref().map(|p| p.display().to_string());
742
743        self.conn.execute(
744            "INSERT OR REPLACE INTO builds
745             (package_id, outcome, outcome_detail, duration_ms, built_at, log_dir)
746             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
747            params![package_id, outcome, detail, duration_ms, now, log_dir],
748        )?;
749
750        debug!(
751            package_id = package_id,
752            outcome = outcome,
753            "Stored build result"
754        );
755        Ok(())
756    }
757
758    /// Store a build result by pkgname.
759    pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
760        if let Some(pkg) = self.get_package_by_name(result.pkgname.pkgname())? {
761            self.store_build_result(pkg.id, result)
762        } else {
763            warn!(pkgname = %result.pkgname.pkgname(), "Package not found in database for build result");
764            Ok(())
765        }
766    }
767
768    /// Store multiple build results in a transaction.
769    pub fn store_build_batch(&self, results: &[BuildResult]) -> Result<()> {
770        self.conn.execute("BEGIN TRANSACTION", [])?;
771        for result in results {
772            if let Err(e) = self.store_build_by_name(result) {
773                let _ = self.conn.execute("ROLLBACK", []);
774                return Err(e);
775            }
776        }
777        self.conn.execute("COMMIT", [])?;
778        debug!(count = results.len(), "Stored build results batch");
779        Ok(())
780    }
781
782    /// Get build result for a package.
783    pub fn get_build_result(
784        &self,
785        package_id: i64,
786    ) -> Result<Option<BuildResult>> {
787        let result = self.conn.query_row(
788            "SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
789             FROM builds b
790             JOIN packages p ON b.package_id = p.id
791             WHERE b.package_id = ?1",
792            [package_id],
793            |row| {
794                let pkgname: String = row.get(0)?;
795                let pkgpath: Option<String> = row.get(1)?;
796                let outcome: String = row.get(2)?;
797                let detail: Option<String> = row.get(3)?;
798                let duration_ms: i64 = row.get(4)?;
799                let log_dir: Option<String> = row.get(5)?;
800                Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
801            },
802        );
803
804        match result {
805            Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir)) => {
806                let build_outcome = db_outcome_to_build(&outcome, detail);
807                Ok(Some(BuildResult {
808                    pkgname: PkgName::new(&pkgname),
809                    pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
810                    outcome: build_outcome,
811                    duration: Duration::from_millis(duration_ms as u64),
812                    log_dir: log_dir.map(std::path::PathBuf::from),
813                }))
814            }
815            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
816            Err(e) => Err(e.into()),
817        }
818    }
819
820    /// Check if a package is already built (success or up_to_date).
821    pub fn is_package_complete(&self, package_id: i64) -> Result<bool> {
822        let result = self.conn.query_row(
823            "SELECT outcome FROM builds WHERE package_id = ?1",
824            [package_id],
825            |row| row.get::<_, String>(0),
826        );
827
828        match result {
829            Ok(outcome) => Ok(outcome == "success" || outcome == "up_to_date"),
830            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
831            Err(e) => Err(e.into()),
832        }
833    }
834
835    /// Check if a package build has failed.
836    pub fn is_package_failed(&self, package_id: i64) -> Result<bool> {
837        let result = self.conn.query_row(
838            "SELECT outcome FROM builds WHERE package_id = ?1",
839            [package_id],
840            |row| row.get::<_, String>(0),
841        );
842
843        match result {
844            Ok(outcome) => Ok(outcome != "success" && outcome != "up_to_date"),
845            Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
846            Err(e) => Err(e.into()),
847        }
848    }
849
850    /// Get all completed package IDs.
851    pub fn get_completed_package_ids(&self) -> Result<HashSet<i64>> {
852        let mut stmt = self.conn.prepare(
853            "SELECT package_id FROM builds WHERE outcome IN ('success', 'up_to_date')"
854        )?;
855        let rows = stmt.query_map([], |row| row.get::<_, i64>(0))?;
856        rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
857    }
858
859    /// Get all failed package IDs.
860    pub fn get_failed_package_ids(&self) -> Result<HashSet<i64>> {
861        let mut stmt = self.conn.prepare(
862            "SELECT package_id FROM builds WHERE outcome NOT IN ('success', 'up_to_date')"
863        )?;
864        let rows = stmt.query_map([], |row| row.get::<_, i64>(0))?;
865        rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
866    }
867
868    /// Count of build results.
869    pub fn count_build(&self) -> Result<i64> {
870        self.conn
871            .query_row("SELECT COUNT(*) FROM builds", [], |row| row.get(0))
872            .context("Failed to count builds")
873    }
874
875    /// Clear all build data.
876    pub fn clear_build(&self) -> Result<()> {
877        self.conn.execute("DELETE FROM builds", [])?;
878        Ok(())
879    }
880
881    /// Delete build result for a pkgname.
882    pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
883        let rows = self.conn.execute(
884            "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgname = ?1)",
885            params![pkgname],
886        )?;
887        Ok(rows > 0)
888    }
889
890    /// Delete build results by pkgpath.
891    pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
892        let rows = self.conn.execute(
893            "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgpath = ?1)",
894            params![pkgpath],
895        )?;
896        Ok(rows)
897    }
898
899    /// Get all build results from the database.
900    pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
901        let mut stmt = self.conn.prepare(
902            "SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
903             FROM builds b
904             JOIN packages p ON b.package_id = p.id
905             ORDER BY p.pkgname"
906        )?;
907
908        let rows = stmt.query_map([], |row| {
909            let pkgname: String = row.get(0)?;
910            let pkgpath: Option<String> = row.get(1)?;
911            let outcome: String = row.get(2)?;
912            let detail: Option<String> = row.get(3)?;
913            let duration_ms: i64 = row.get(4)?;
914            let log_dir: Option<String> = row.get(5)?;
915            Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
916        })?;
917
918        let mut results = Vec::new();
919        for row in rows {
920            let (pkgname, pkgpath, outcome, detail, duration_ms, log_dir) =
921                row?;
922            let build_outcome = db_outcome_to_build(&outcome, detail);
923            results.push(BuildResult {
924                pkgname: PkgName::new(&pkgname),
925                pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
926                outcome: build_outcome,
927                duration: Duration::from_millis(duration_ms as u64),
928                log_dir: log_dir.map(std::path::PathBuf::from),
929            });
930        }
931
932        Ok(results)
933    }
934
935    /// Count how many packages are broken by each failed package.
936    /// Returns a map from pkgname to the count of packages that depend on it.
937    pub fn count_breaks_for_failed(
938        &self,
939    ) -> Result<std::collections::HashMap<String, usize>> {
940        use std::collections::HashMap;
941
942        let mut counts: HashMap<String, usize> = HashMap::new();
943
944        // Get all failed package IDs and their names
945        let mut stmt = self.conn.prepare(
946            "SELECT p.id, p.pkgname FROM builds b
947             JOIN packages p ON b.package_id = p.id
948             WHERE b.outcome = 'failed'",
949        )?;
950
951        let failed: Vec<(i64, String)> = stmt
952            .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
953            .filter_map(|r| r.ok())
954            .collect();
955
956        // For each failed package, count indirect failures that reference it
957        for (_pkg_id, pkgname) in failed {
958            let count: i64 = self.conn.query_row(
959                "SELECT COUNT(*) FROM builds b
960                 JOIN packages p ON b.package_id = p.id
961                 WHERE b.outcome = 'indirect_failed'
962                 AND b.outcome_detail LIKE ?1",
963                params![format!("%{}", pkgname)],
964                |row| row.get(0),
965            )?;
966            counts.insert(pkgname, count as usize);
967        }
968
969        Ok(counts)
970    }
971
972    /// Get total build duration from all builds.
973    pub fn get_total_build_duration(&self) -> Result<Duration> {
974        let total_ms: i64 = self.conn.query_row(
975            "SELECT COALESCE(SUM(duration_ms), 0) FROM builds",
976            [],
977            |row| row.get(0),
978        )?;
979        Ok(Duration::from_millis(total_ms as u64))
980    }
981
982    /// Get pre-failed packages (those with skip_reason or fail_reason but no build result).
983    /// Returns (pkgname, pkgpath, reason).
984    pub fn get_prefailed_packages(
985        &self,
986    ) -> Result<Vec<(String, Option<String>, String)>> {
987        let mut stmt = self.conn.prepare(
988            "SELECT p.pkgname, p.pkgpath,
989                    COALESCE(p.fail_reason, p.skip_reason) as reason
990             FROM packages p
991             WHERE (p.skip_reason IS NOT NULL OR p.fail_reason IS NOT NULL)
992               AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = p.id)
993             ORDER BY p.pkgname",
994        )?;
995
996        let rows = stmt
997            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
998
999        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1000    }
1001
1002    /// Get packages without build results that depend on failed packages.
1003    /// Returns (pkgname, pkgpath, failed_deps) where failed_deps is comma-separated.
1004    /// Excludes packages that have skip_reason or fail_reason (they're pre-failed).
1005    /// Only lists root failures (direct failures), not indirect failures.
1006    pub fn get_indirect_failures(
1007        &self,
1008    ) -> Result<Vec<(String, Option<String>, String)>> {
1009        // Find packages that:
1010        // 1. Have no build result
1011        // 2. Have no skip_reason or fail_reason (not pre-failed)
1012        // 3. Depend (transitively) on a package with a direct failure
1013        // Group by package and aggregate failed deps into comma-separated string
1014        // Only 'failed' and 'prefailed' are root causes, not 'indirect_*'
1015        let mut stmt = self.conn.prepare(
1016            "WITH RECURSIVE
1017             -- Only direct failures are root causes
1018             failed_pkgs(id) AS (
1019                 SELECT package_id FROM builds
1020                 WHERE outcome IN ('failed', 'prefailed')
1021             ),
1022             -- Packages affected by failures (transitive closure)
1023             affected(id, root_id) AS (
1024                 SELECT id, id FROM failed_pkgs
1025                 UNION
1026                 SELECT rd.package_id, a.root_id
1027                 FROM resolved_depends rd
1028                 JOIN affected a ON rd.depends_on_id = a.id
1029                 WHERE rd.package_id NOT IN (SELECT id FROM failed_pkgs)
1030             )
1031             SELECT p.pkgname, p.pkgpath, GROUP_CONCAT(DISTINCT fp.pkgname) as failed_deps
1032             FROM affected a
1033             JOIN packages p ON a.id = p.id
1034             JOIN packages fp ON a.root_id = fp.id
1035             WHERE a.id != a.root_id
1036               AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = a.id)
1037               AND p.skip_reason IS NULL
1038               AND p.fail_reason IS NULL
1039             GROUP BY p.id, p.pkgname, p.pkgpath
1040             ORDER BY p.pkgname",
1041        )?;
1042
1043        let rows = stmt
1044            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
1045
1046        rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1047    }
1048
1049    /// Mark a package and all its transitive reverse dependencies as failed.
1050    /// Returns the count of packages marked.
1051    pub fn mark_failure_cascade(
1052        &self,
1053        package_id: i64,
1054        reason: &str,
1055        duration: Duration,
1056    ) -> Result<usize> {
1057        let now = std::time::SystemTime::now()
1058            .duration_since(std::time::UNIX_EPOCH)?
1059            .as_secs() as i64;
1060
1061        let pkgname = self.get_pkgname(package_id)?;
1062
1063        // Get all affected packages using recursive CTE
1064        let mut stmt = self.conn.prepare(
1065            "WITH RECURSIVE affected(id, depth) AS (
1066                SELECT ?1, 0
1067                UNION
1068                SELECT rd.package_id, a.depth + 1
1069                FROM resolved_depends rd
1070                JOIN affected a ON rd.depends_on_id = a.id
1071            )
1072            SELECT id, depth FROM affected ORDER BY depth",
1073        )?;
1074
1075        let affected: Vec<(i64, i32)> = stmt
1076            .query_map([package_id], |row| {
1077                Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?))
1078            })?
1079            .filter_map(|r| r.ok())
1080            .collect();
1081
1082        // Batch insert failures
1083        self.conn.execute("BEGIN TRANSACTION", [])?;
1084
1085        for (id, depth) in &affected {
1086            let (outcome, detail, dur) = if *depth == 0 {
1087                ("failed", reason.to_string(), duration.as_millis() as i64)
1088            } else {
1089                ("indirect_failed", format!("depends on failed {}", pkgname), 0)
1090            };
1091
1092            self.conn.execute(
1093                "INSERT OR REPLACE INTO builds
1094                 (package_id, outcome, outcome_detail, duration_ms, built_at)
1095                 VALUES (?1, ?2, ?3, ?4, ?5)",
1096                params![id, outcome, detail, dur, now],
1097            )?;
1098        }
1099
1100        self.conn.execute("COMMIT", [])?;
1101
1102        debug!(
1103            package_id = package_id,
1104            affected_count = affected.len(),
1105            "Marked failure cascade"
1106        );
1107        Ok(affected.len())
1108    }
1109
1110    // ========================================================================
1111    // METADATA
1112    // ========================================================================
1113
1114    /// Check if a full tree scan has been completed.
1115    pub fn full_scan_complete(&self) -> bool {
1116        self.conn
1117            .query_row(
1118                "SELECT value FROM metadata WHERE key = 'full_scan_complete'",
1119                [],
1120                |row| row.get::<_, String>(0),
1121            )
1122            .map(|v| v == "true")
1123            .unwrap_or(false)
1124    }
1125
1126    /// Mark a full tree scan as complete.
1127    pub fn set_full_scan_complete(&self) -> Result<()> {
1128        self.conn.execute(
1129            "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')",
1130            [],
1131        )?;
1132        Ok(())
1133    }
1134
1135    /// Clear the full tree scan complete marker.
1136    pub fn clear_full_scan_complete(&self) -> Result<()> {
1137        self.conn.execute(
1138            "DELETE FROM metadata WHERE key = 'full_scan_complete'",
1139            [],
1140        )?;
1141        Ok(())
1142    }
1143
1144    /// Get the buildable package count.
1145    pub fn get_buildable_count(&self) -> Result<i64> {
1146        self.conn.query_row(
1147            "SELECT COUNT(*) FROM packages WHERE skip_reason IS NULL AND fail_reason IS NULL",
1148            [],
1149            |row| row.get(0),
1150        ).context("Failed to count buildable packages")
1151    }
1152
1153    // ========================================================================
1154    // CHANGE DETECTION
1155    // ========================================================================
1156
1157    /// Compare requested pkgpaths against cached ones.
1158    pub fn compare_pkgpath_lists(
1159        &self,
1160        requested: &[&str],
1161    ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
1162        let scanned = self.get_scanned_pkgpaths()?;
1163        let requested_set: HashSet<_> =
1164            requested.iter().map(|s| s.to_string()).collect();
1165
1166        let to_add: Vec<_> =
1167            requested_set.difference(&scanned).cloned().collect();
1168        let to_remove: Vec<_> =
1169            scanned.difference(&requested_set).cloned().collect();
1170        let unchanged: Vec<_> =
1171            scanned.intersection(&requested_set).cloned().collect();
1172
1173        Ok((to_add, to_remove, unchanged))
1174    }
1175
1176    /// Delete packages for pkgpaths no longer in the list.
1177    pub fn delete_pkgpaths(&self, pkgpaths: &[&str]) -> Result<usize> {
1178        if pkgpaths.is_empty() {
1179            return Ok(0);
1180        }
1181
1182        let mut count = 0;
1183        for pkgpath in pkgpaths {
1184            count += self.conn.execute(
1185                "DELETE FROM packages WHERE pkgpath = ?1",
1186                [pkgpath],
1187            )?;
1188        }
1189        Ok(count)
1190    }
1191
1192    /// Execute arbitrary SQL and print results.
1193    pub fn execute_raw(&self, sql: &str) -> Result<()> {
1194        let mut stmt = self.conn.prepare(sql)?;
1195        let column_count = stmt.column_count();
1196
1197        if column_count == 0 {
1198            // Non-query statement (INSERT, UPDATE, DELETE, etc.)
1199            let affected = stmt.execute([])?;
1200            if affected > 0 {
1201                println!("{} row(s) affected", affected);
1202            }
1203        } else {
1204            // Query statement (SELECT, PRAGMA, etc.)
1205            let column_names: Vec<String> =
1206                stmt.column_names().iter().map(|s| s.to_string()).collect();
1207
1208            let mut rows = stmt.query([])?;
1209            let mut first = true;
1210
1211            while let Some(row) = rows.next()? {
1212                if first {
1213                    println!("{}", column_names.join("|"));
1214                    first = false;
1215                }
1216
1217                let values: Vec<String> = (0..column_count)
1218                    .map(|i| {
1219                        row.get_ref(i)
1220                            .map(|v| match v {
1221                                rusqlite::types::ValueRef::Null => {
1222                                    String::new()
1223                                }
1224                                rusqlite::types::ValueRef::Integer(i) => {
1225                                    i.to_string()
1226                                }
1227                                rusqlite::types::ValueRef::Real(f) => {
1228                                    f.to_string()
1229                                }
1230                                rusqlite::types::ValueRef::Text(s) => {
1231                                    String::from_utf8_lossy(s).to_string()
1232                                }
1233                                rusqlite::types::ValueRef::Blob(b) => {
1234                                    format!("<blob:{} bytes>", b.len())
1235                                }
1236                            })
1237                            .unwrap_or_default()
1238                    })
1239                    .collect();
1240                println!("{}", values.join("|"));
1241            }
1242        }
1243
1244        Ok(())
1245    }
1246}
1247
1248// ============================================================================
1249// HELPER FUNCTIONS
1250// ============================================================================
1251
/// Split a pkgname into `(base, version)`.
///
/// The split point is the last `-` that is immediately followed by an ASCII
/// digit; the dash itself is dropped. When no such dash exists, the whole
/// input is returned as the base with an empty version.
fn split_pkgname(pkgname: &str) -> (String, String) {
    let raw = pkgname.as_bytes();

    // Walk the dashes from the right and stop at the first one that
    // introduces a digit. A dash is ASCII, so its byte offset is always a
    // valid slice boundary.
    let split_at = pkgname
        .rmatch_indices('-')
        .map(|(idx, _)| idx)
        .find(|&idx| raw.get(idx + 1).is_some_and(|b| b.is_ascii_digit()));

    match split_at {
        Some(idx) => {
            (pkgname[..idx].to_owned(), pkgname[idx + 1..].to_owned())
        }
        None => (pkgname.to_owned(), String::new()),
    }
}
1267
1268/// Convert BuildOutcome to database format.
1269fn build_outcome_to_db(
1270    outcome: &BuildOutcome,
1271) -> (&'static str, Option<String>) {
1272    match outcome {
1273        BuildOutcome::Success => ("success", None),
1274        BuildOutcome::UpToDate => ("up_to_date", None),
1275        BuildOutcome::Failed(s) => ("failed", Some(s.clone())),
1276        BuildOutcome::PreFailed(s) => ("pre_failed", Some(s.clone())),
1277        BuildOutcome::IndirectFailed(s) => ("indirect_failed", Some(s.clone())),
1278        BuildOutcome::IndirectPreFailed(s) => {
1279            ("indirect_pre_failed", Some(s.clone()))
1280        }
1281    }
1282}
1283
1284/// Convert database format to BuildOutcome.
1285fn db_outcome_to_build(outcome: &str, detail: Option<String>) -> BuildOutcome {
1286    match outcome {
1287        "success" => BuildOutcome::Success,
1288        "up_to_date" => BuildOutcome::UpToDate,
1289        "failed" => BuildOutcome::Failed(detail.unwrap_or_default()),
1290        "pre_failed" => BuildOutcome::PreFailed(detail.unwrap_or_default()),
1291        "indirect_failed" => {
1292            BuildOutcome::IndirectFailed(detail.unwrap_or_default())
1293        }
1294        "indirect_pre_failed" => {
1295            BuildOutcome::IndirectPreFailed(detail.unwrap_or_default())
1296        }
1297        _ => BuildOutcome::Failed(format!("Unknown outcome: {}", outcome)),
1298    }
1299}