use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use pkgsrc::{PkgName, PkgPath, ScanIndex};
use rusqlite::{Connection, params};
use tracing::{debug, warn};
use crate::build::{BuildOutcome, BuildResult};
use crate::config::PkgsrcEnv;
use crate::scan::SkipReason;
/// Schema version written into the `schema_version` table when the schema is
/// created, and compared against the stored value each time a database is opened.
const SCHEMA_VERSION: i32 = 3;
/// The lightweight columns of one `packages` row (everything except `scan_data`).
#[derive(Clone, Debug)]
pub struct PackageRow {
/// Value of the `id` column.
pub id: i64,
/// Value of the `pkgname` column (declared `UNIQUE NOT NULL` in the schema).
pub pkgname: String,
/// Value of the `pkgpath` column.
pub pkgpath: String,
/// Value of the nullable `skip_reason` column.
pub skip_reason: Option<String>,
/// Value of the nullable `fail_reason` column.
pub fail_reason: Option<String>,
/// `true` when the stored `is_bootstrap` integer is non-zero.
pub is_bootstrap: bool,
/// Value of the `pbulk_weight` column (schema default is 100).
pub pbulk_weight: i32,
}
/// Handle to the SQLite database holding package, dependency, build and
/// metadata tables.
pub struct Database {
// The single rusqlite connection every method operates on.
conn: Connection,
}
impl Database {
pub fn open(path: &Path) -> Result<Self> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).context("Failed to create database directory")?;
}
let conn = Connection::open(path).context("Failed to open database")?;
let db = Self { conn };
db.configure_pragmas()?;
db.init()?;
Ok(db)
}
/// Issues `BEGIN TRANSACTION` on the connection.
///
/// # Errors
/// Returns the SQLite error if the statement fails.
pub fn begin_transaction(&self) -> Result<()> {
    self.conn
        .execute("BEGIN TRANSACTION", [])
        .map(|_| ())
        .map_err(Into::into)
}
/// Issues `COMMIT` on the connection.
///
/// # Errors
/// Returns the SQLite error if the statement fails.
pub fn commit(&self) -> Result<()> {
    self.conn
        .execute("COMMIT", [])
        .map(|_| ())
        .map_err(Into::into)
}
/// Applies the connection-level pragmas: journal mode, synchronous level,
/// cache size, temp store, mmap size, and foreign-key enforcement.
fn configure_pragmas(&self) -> Result<()> {
    const PRAGMAS: &str = "PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = -64000;
PRAGMA temp_store = MEMORY;
PRAGMA mmap_size = 268435456;
PRAGMA foreign_keys = ON;";
    self.conn.execute_batch(PRAGMAS).map_err(Into::into)
}
/// Creates the schema on a fresh database, or verifies the stored version.
///
/// A database without a `schema_version` table is treated as new and gets the
/// full schema. Otherwise the stored version must equal [`SCHEMA_VERSION`].
///
/// # Errors
/// Fails on any SQLite error, or with a "Schema mismatch" error when the
/// stored version differs.
fn init(&self) -> Result<()> {
    let version_tables: i32 = self.conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if version_tables <= 0 {
        return self.create_schema();
    }

    let found: i32 = self
        .conn
        .query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
            row.get(0)
        })?;
    if found == SCHEMA_VERSION {
        return Ok(());
    }
    anyhow::bail!(
        "Schema mismatch: found v{}, expected v{}. \
         Run 'bob clean' to restart.",
        found,
        SCHEMA_VERSION
    );
}
/// Creates every table and index and records [`SCHEMA_VERSION`].
///
/// The whole batch runs inside one transaction. `schema_version` is the first
/// table created, so without a transaction a failure part-way through would
/// leave a database that passes the version check on the next open while
/// missing the remaining tables. The transaction is rolled back when dropped
/// without being committed.
///
/// # Errors
/// Returns the SQLite error if the transaction cannot be started, any
/// statement fails, or the commit fails.
fn create_schema(&self) -> Result<()> {
    let ddl = format!(
        "CREATE TABLE schema_version (version INTEGER NOT NULL);
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pkgname TEXT UNIQUE NOT NULL,
pkgpath TEXT NOT NULL,
skip_reason TEXT,
fail_reason TEXT,
is_bootstrap INTEGER DEFAULT 0,
pbulk_weight INTEGER DEFAULT 100,
scan_data TEXT
);
CREATE INDEX idx_packages_pkgpath ON packages(pkgpath);
CREATE INDEX idx_packages_status ON packages(skip_reason, fail_reason);
CREATE TABLE depends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
depend_pattern TEXT NOT NULL,
depend_pkgpath TEXT NOT NULL,
UNIQUE(package_id, depend_pattern)
);
CREATE INDEX idx_depends_package ON depends(package_id);
CREATE INDEX idx_depends_pkgpath ON depends(depend_pkgpath);
CREATE TABLE resolved_depends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
depends_on_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
UNIQUE(package_id, depends_on_id)
);
CREATE INDEX idx_resolved_depends_package ON resolved_depends(package_id);
CREATE INDEX idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
CREATE TABLE builds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
outcome TEXT NOT NULL,
outcome_detail TEXT,
duration_ms INTEGER NOT NULL DEFAULT 0,
log_dir TEXT,
UNIQUE(package_id)
);
CREATE INDEX idx_builds_outcome ON builds(outcome);
CREATE INDEX idx_builds_package ON builds(package_id);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);",
        SCHEMA_VERSION
    );

    // Dropping `tx` without committing (e.g. on an early `?` return) rolls back.
    let tx = self.conn.unchecked_transaction()?;
    tx.execute_batch(&ddl)?;
    tx.commit()?;

    debug!(version = SCHEMA_VERSION, "Created schema");
    Ok(())
}
/// Inserts (or replaces) one scanned package and its raw dependency patterns.
///
/// The full `index` is serialized to JSON into `scan_data`; a handful of
/// fields are additionally copied into dedicated columns. Empty skip/fail
/// reasons are stored as NULL, `is_bootstrap` is set only when
/// `bootstrap_pkg` is exactly `"yes"`, and an absent or unparsable
/// `pbulk_weight` is stored as 100.
///
/// # Returns
/// The rowid reported by the connection after the package insert.
///
/// # Errors
/// Fails if serialization or any SQL statement fails.
pub fn store_package(&self, pkgpath: &str, index: &ScanIndex) -> Result<i64> {
    let pkgname = index.pkgname.pkgname();
    let skip_reason = index
        .pkg_skip_reason
        .as_ref()
        .filter(|reason| !reason.is_empty());
    let fail_reason = index
        .pkg_fail_reason
        .as_ref()
        .filter(|reason| !reason.is_empty());
    let is_bootstrap = matches!(index.bootstrap_pkg.as_deref(), Some("yes"));
    let pbulk_weight: i32 = match index.pbulk_weight.as_ref().map(|raw| raw.parse::<i32>()) {
        Some(Ok(weight)) => weight,
        _ => 100,
    };
    let scan_data = serde_json::to_string(index)?;

    {
        let mut insert_pkg = self.conn.prepare_cached(
            "INSERT OR REPLACE INTO packages
(pkgname, pkgpath, skip_reason, fail_reason,
is_bootstrap, pbulk_weight, scan_data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
        )?;
        insert_pkg.execute(params![
            pkgname,
            pkgpath,
            skip_reason,
            fail_reason,
            is_bootstrap,
            pbulk_weight,
            scan_data
        ])?;
    }
    // Read the rowid only after the package insert has run.
    let package_id = self.conn.last_insert_rowid();

    if let Some(deps) = index.all_depends.as_ref() {
        let mut insert_dep = self.conn.prepare_cached(
            "INSERT OR IGNORE INTO depends (package_id, depend_pattern, depend_pkgpath)
VALUES (?1, ?2, ?3)",
        )?;
        for dep in deps {
            let pattern = dep.pattern().pattern();
            let dep_path = dep.pkgpath().to_string();
            insert_dep.execute(params![package_id, pattern, dep_path])?;
        }
    }

    debug!(pkgname = pkgname, package_id = package_id, "Stored package");
    Ok(package_id)
}
/// Stores every scan index produced for `pkgpath`, stopping at the first error.
pub fn store_scan_pkgpath(&self, pkgpath: &str, indexes: &[ScanIndex]) -> Result<()> {
    indexes
        .iter()
        .try_for_each(|index| self.store_package(pkgpath, index).map(|_| ()))
}
/// Looks up a package by its exact `pkgname`.
///
/// # Returns
/// `Ok(None)` when no row matches, `Ok(Some(row))` otherwise.
///
/// # Errors
/// Any SQLite error other than "no rows" is propagated.
pub fn get_package_by_name(&self, pkgname: &str) -> Result<Option<PackageRow>> {
    let looked_up = self.conn.query_row(
        "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
FROM packages WHERE pkgname = ?1",
        [pkgname],
        |row| {
            let bootstrap_flag: i32 = row.get(5)?;
            Ok(PackageRow {
                id: row.get(0)?,
                pkgname: row.get(1)?,
                pkgpath: row.get(2)?,
                skip_reason: row.get(3)?,
                fail_reason: row.get(4)?,
                is_bootstrap: bootstrap_flag != 0,
                pbulk_weight: row.get(6)?,
            })
        },
    );
    match looked_up {
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => Ok(Some(other?)),
    }
}
/// Returns the row id of the package named `pkgname`, or `None` if absent.
///
/// # Errors
/// Any SQLite error other than "no rows" is propagated.
pub fn get_package_id(&self, pkgname: &str) -> Result<Option<i64>> {
    let looked_up = self.conn.query_row(
        "SELECT id FROM packages WHERE pkgname = ?1",
        [pkgname],
        |row| row.get::<_, i64>(0),
    );
    match looked_up {
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => Ok(Some(other?)),
    }
}
/// Returns the `pkgname` stored for `package_id`.
///
/// # Errors
/// Any query failure, including a missing row, is reported with the
/// context "Package not found".
pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
    let name = self.conn.query_row(
        "SELECT pkgname FROM packages WHERE id = ?1",
        [package_id],
        |row| row.get::<_, String>(0),
    );
    name.context("Package not found")
}
/// Returns every package row whose `pkgpath` equals the given path.
///
/// # Errors
/// Propagates statement preparation, query, and row-decoding errors.
pub fn get_packages_by_path(&self, pkgpath: &str) -> Result<Vec<PackageRow>> {
    let mut stmt = self.conn.prepare(
        "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
FROM packages WHERE pkgpath = ?1",
    )?;
    let mapped = stmt.query_map([pkgpath], |row| {
        let bootstrap_flag: i32 = row.get(5)?;
        Ok(PackageRow {
            id: row.get(0)?,
            pkgname: row.get(1)?,
            pkgpath: row.get(2)?,
            skip_reason: row.get(3)?,
            fail_reason: row.get(4)?,
            is_bootstrap: bootstrap_flag != 0,
            pbulk_weight: row.get(6)?,
        })
    })?;
    let mut packages = Vec::new();
    for row in mapped {
        packages.push(row?);
    }
    Ok(packages)
}
/// Returns the set of distinct `pkgpath` values present in `packages`.
pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
    let mut stmt = self.conn.prepare("SELECT DISTINCT pkgpath FROM packages")?;
    let mapped = stmt.query_map([], |row| row.get::<_, String>(0))?;
    let mut paths = HashSet::new();
    for path in mapped {
        paths.insert(path?);
    }
    Ok(paths)
}
/// Returns dependency pkgpaths recorded in `depends` that have no
/// corresponding row in `packages`.
pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT DISTINCT d.depend_pkgpath
FROM depends d
WHERE d.depend_pkgpath NOT IN (SELECT pkgpath FROM packages)",
    )?;
    let mapped = stmt.query_map([], |row| row.get::<_, String>(0))?;
    let mut missing = HashSet::new();
    for path in mapped {
        missing.insert(path?);
    }
    Ok(missing)
}
/// Returns the number of rows in `packages`.
///
/// # Errors
/// Query failures are reported with the context "Failed to count packages".
pub fn count_packages(&self) -> Result<i64> {
    let total = self
        .conn
        .query_row("SELECT COUNT(*) FROM packages", [], |row| {
            row.get::<_, i64>(0)
        });
    total.context("Failed to count packages")
}
/// Returns every package row, ordered by row id.
///
/// # Errors
/// Propagates statement preparation, query, and row-decoding errors.
pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
    let mut stmt = self.conn.prepare(
        "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
FROM packages ORDER BY id",
    )?;
    let mapped = stmt.query_map([], |row| {
        let bootstrap_flag: i32 = row.get(5)?;
        Ok(PackageRow {
            id: row.get(0)?,
            pkgname: row.get(1)?,
            pkgpath: row.get(2)?,
            skip_reason: row.get(3)?,
            fail_reason: row.get(4)?,
            is_bootstrap: bootstrap_flag != 0,
            pbulk_weight: row.get(6)?,
        })
    })?;
    let mut packages = Vec::new();
    for row in mapped {
        packages.push(row?);
    }
    Ok(packages)
}
/// Returns the package rows that have neither a skip reason nor a fail reason.
///
/// # Errors
/// Propagates statement preparation, query, and row-decoding errors.
pub fn get_buildable_packages(&self) -> Result<Vec<PackageRow>> {
    let mut stmt = self.conn.prepare(
        "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
FROM packages WHERE skip_reason IS NULL AND fail_reason IS NULL",
    )?;
    let mapped = stmt.query_map([], |row| {
        let bootstrap_flag: i32 = row.get(5)?;
        Ok(PackageRow {
            id: row.get(0)?,
            pkgname: row.get(1)?,
            pkgpath: row.get(2)?,
            skip_reason: row.get(3)?,
            fail_reason: row.get(4)?,
            is_bootstrap: bootstrap_flag != 0,
            pbulk_weight: row.get(6)?,
        })
    })?;
    let mut packages = Vec::new();
    for row in mapped {
        packages.push(row?);
    }
    Ok(packages)
}
/// Loads and deserializes the JSON `scan_data` stored for `package_id`.
///
/// # Errors
/// Propagates the query error (including a missing row); a JSON failure is
/// reported with the context "Failed to deserialize scan data".
pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
    let raw = self.conn.query_row(
        "SELECT scan_data FROM packages WHERE id = ?1",
        [package_id],
        |row| row.get::<_, String>(0),
    )?;
    let parsed = serde_json::from_str(&raw);
    parsed.context("Failed to deserialize scan data")
}
/// Deserializes the `scan_data` of every package, ordered by row id.
///
/// # Errors
/// Stops at the first row that cannot be read or whose JSON cannot be
/// deserialized; the latter error names the offending package id.
pub fn get_all_scan_data(&self) -> Result<Vec<ScanIndex>> {
    let mut stmt = self
        .conn
        .prepare("SELECT id, scan_data FROM packages ORDER BY id")?;
    let raw_rows = stmt.query_map([], |row| {
        Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
    })?;
    let indexes = raw_rows
        .map(|row| -> Result<ScanIndex> {
            let (id, json) = row?;
            serde_json::from_str(&json)
                .with_context(|| format!("Failed to deserialize scan data for package {}", id))
        })
        .collect::<Result<Vec<_>>>()?;
    Ok(indexes)
}
/// Deletes every row from `packages` and then clears the
/// `full_scan_complete` metadata flag.
pub fn clear_scan(&self) -> Result<()> {
    self.conn.execute("DELETE FROM packages", [])?;
    self.clear_full_scan_complete()
}
/// Records the resolved dependency edges of a scan summary.
///
/// For every buildable or skipped package in `summary`, each resolved
/// dependency is translated to a `(package_id, depends_on_id)` pair. Packages
/// or dependencies not present in the database are silently ignored, as are
/// skipped entries without an index and scan failures. The pairs are then
/// written in one batch.
///
/// # Errors
/// Propagates lookup and insert errors.
pub fn store_resolved_deps(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
    use crate::scan::ScanResult;

    let mut edges: Vec<(i64, i64)> = Vec::new();
    for entry in &summary.packages {
        match entry {
            ScanResult::ScanFail { .. } => {}
            ScanResult::Buildable(resolved) => {
                let Some(owner_id) = self.get_package_id(resolved.pkgname().pkgname())? else {
                    continue;
                };
                for dep in resolved.depends() {
                    if let Some(target_id) = self.get_package_id(dep.pkgname())? {
                        edges.push((owner_id, target_id));
                    }
                }
            }
            ScanResult::Skipped {
                index,
                resolved_depends,
                ..
            } => {
                let Some(idx) = index else { continue };
                let Some(owner_id) = self.get_package_id(idx.pkgname.pkgname())? else {
                    continue;
                };
                for dep in resolved_depends {
                    if let Some(target_id) = self.get_package_id(dep.pkgname())? {
                        edges.push((owner_id, target_id));
                    }
                }
            }
        }
    }

    if edges.is_empty() {
        return Ok(());
    }
    self.store_resolved_dependencies_batch(&edges)?;
    debug!(count = edges.len(), "Stored resolved dependencies");
    Ok(())
}
/// Records a `builds` row for every package the scan marked as skipped.
///
/// Skipped entries without an index, or whose package is not in the
/// database, are ignored. Existing `builds` rows are left untouched
/// (`INSERT OR IGNORE`). All inserts happen in one transaction, which is
/// rolled back automatically if any step fails. Previously an error inside
/// the loop returned with the manual `BEGIN` still open on the connection.
///
/// # Errors
/// Propagates lookup, insert, and commit errors.
pub fn store_scan_skipped(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
    use crate::scan::{ScanResult, SkipReason};

    // Dropping `tx` on an early `?` return rolls the transaction back.
    let tx = self.conn.unchecked_transaction()?;
    for pkg in &summary.packages {
        let ScanResult::Skipped { reason, index, .. } = pkg else {
            continue;
        };
        let Some(idx) = index else { continue };
        let Some(pkg_row) = self.get_package_by_name(idx.pkgname.pkgname())? else {
            continue;
        };
        // NOTE(review): IndirectFail is stored as "indirect_skip" here, whereas
        // build_outcome_to_db stores it as "indirect_fail". Kept as-is; confirm
        // whether the difference is intentional.
        let (outcome, detail) = match reason {
            SkipReason::PkgSkip(s) => ("pkg_skip", Some(s.clone())),
            SkipReason::PkgFail(s) => ("pkg_fail", Some(s.clone())),
            SkipReason::IndirectSkip(s) | SkipReason::IndirectFail(s) => {
                ("indirect_skip", Some(s.clone()))
            }
            SkipReason::UnresolvedDep(s) => ("unresolved_dep", Some(s.clone())),
        };
        tx.execute(
            "INSERT OR IGNORE INTO builds
(package_id, outcome, outcome_detail, duration_ms)
VALUES (?1, ?2, ?3, 0)",
            params![pkg_row.id, outcome, detail],
        )?;
    }
    tx.commit()?;
    Ok(())
}
/// Inserts `(package_id, depends_on_id)` pairs into `resolved_depends`.
///
/// Pairs that already exist are ignored. The inserts run in one transaction,
/// which is rolled back automatically if any insert fails. Previously a
/// failing insert returned with the manual `BEGIN` still open.
///
/// # Errors
/// Propagates prepare, insert, and commit errors.
fn store_resolved_dependencies_batch(&self, deps: &[(i64, i64)]) -> Result<()> {
    let tx = self.conn.unchecked_transaction()?;
    {
        // Scoped so the statement is released before the commit consumes `tx`.
        let mut stmt = tx.prepare(
            "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
        )?;
        for (package_id, depends_on_id) in deps {
            stmt.execute(params![package_id, depends_on_id])?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Returns the ids of all packages that depend, directly or transitively,
/// on `package_id` according to `resolved_depends`. The package itself is
/// excluded from the result.
pub fn get_transitive_reverse_deps(&self, package_id: i64) -> Result<Vec<i64>> {
    let mut stmt = self.conn.prepare(
        "WITH RECURSIVE affected(id) AS (
SELECT ?1
UNION
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id FROM affected WHERE id != ?1",
    )?;
    let mapped = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
    let mut dependents = Vec::new();
    for id in mapped {
        dependents.push(id?);
    }
    Ok(dependents)
}
/// Deletes every row from `resolved_depends`.
pub fn clear_resolved_depends(&self) -> Result<()> {
    self.conn
        .execute("DELETE FROM resolved_depends", [])
        .map(|_| ())
        .map_err(Into::into)
}
/// Inserts or replaces the `builds` row for `package_id` from `result`.
///
/// The outcome is encoded via `build_outcome_to_db`, the duration is stored
/// in milliseconds, and the log directory (if any) as its display string.
///
/// # Errors
/// Propagates the insert error.
pub fn store_build_result(&self, package_id: i64, result: &BuildResult) -> Result<()> {
    let (outcome, detail) = build_outcome_to_db(&result.outcome);
    let log_dir: Option<String> = match result.log_dir.as_ref() {
        Some(dir) => Some(dir.display().to_string()),
        None => None,
    };
    let duration_ms = result.duration.as_millis() as i64;

    self.conn.execute(
        "INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, duration_ms, log_dir)
VALUES (?1, ?2, ?3, ?4, ?5)",
        params![package_id, outcome, detail, duration_ms, log_dir],
    )?;

    debug!(
        package_id = package_id,
        outcome = outcome,
        "Stored build result"
    );
    Ok(())
}
/// Stores `result` for the package named in it.
///
/// If the package is not in the database a warning is logged and `Ok(())`
/// is returned without storing anything.
pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
    let Some(pkg) = self.get_package_by_name(result.pkgname.pkgname())? else {
        warn!(pkgname = %result.pkgname.pkgname(), "Package not found in database for build result");
        return Ok(());
    };
    self.store_build_result(pkg.id, result)
}
pub fn get_build_result(&self, package_id: i64) -> Result<Option<BuildResult>> {
let result = self.conn.query_row(
"SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.package_id = ?1",
[package_id],
|row| {
let pkgname: String = row.get(0)?;
let pkgpath: Option<String> = row.get(1)?;
let outcome: String = row.get(2)?;
let detail: Option<String> = row.get(3)?;
let duration_ms: i64 = row.get(4)?;
let log_dir: Option<String> = row.get(5)?;
Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
},
);
match result {
Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir)) => {
let build_outcome = db_outcome_to_build(&outcome, detail);
Ok(Some(BuildResult {
pkgname: PkgName::new(&pkgname),
pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
outcome: build_outcome,
duration: Duration::from_millis(duration_ms as u64),
log_dir: log_dir.map(std::path::PathBuf::from),
}))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Deletes the build record of the package named `pkgname`.
///
/// # Returns
/// `true` if at least one `builds` row was removed.
pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
    let removed = self.conn.execute(
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgname = ?1)",
        params![pkgname],
    )?;
    Ok(removed != 0)
}
/// Deletes the build records of every package at `pkgpath`.
///
/// # Returns
/// The number of `builds` rows removed.
pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
    self.conn
        .execute(
            "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgpath = ?1)",
            params![pkgpath],
        )
        .map_err(Into::into)
}
/// Deletes every row from `builds` and returns how many were removed.
pub fn clear_builds(&self) -> Result<usize> {
    self.conn
        .execute("DELETE FROM builds", [])
        .map_err(Into::into)
}
pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
FROM builds b
JOIN packages p ON b.package_id = p.id
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([], |row| {
let pkgname: String = row.get(0)?;
let pkgpath: Option<String> = row.get(1)?;
let outcome: String = row.get(2)?;
let detail: Option<String> = row.get(3)?;
let duration_ms: i64 = row.get(4)?;
let log_dir: Option<String> = row.get(5)?;
Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
})?;
let mut results = Vec::new();
for row in rows {
let (pkgname, pkgpath, outcome, detail, duration_ms, log_dir) = row?;
let build_outcome = db_outcome_to_build(&outcome, detail);
results.push(BuildResult {
pkgname: PkgName::new(&pkgname),
pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
outcome: build_outcome,
duration: Duration::from_millis(duration_ms as u64),
log_dir: log_dir.map(std::path::PathBuf::from),
});
}
Ok(results)
}
pub fn count_breaks_for_failed(&self) -> Result<std::collections::HashMap<String, usize>> {
use std::collections::HashMap;
let mut counts: HashMap<String, usize> = HashMap::new();
let mut stmt = self.conn.prepare(
"SELECT p.id, p.pkgname FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.outcome = 'failed'",
)?;
let failed: Vec<(i64, String)> = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.filter_map(|r| r.ok())
.collect();
for (_pkg_id, pkgname) in failed {
let count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.outcome = 'indirect_failed'
AND b.outcome_detail LIKE ?1",
params![format!("%{}", pkgname)],
|row| row.get(0),
)?;
counts.insert(pkgname, count as usize);
}
Ok(counts)
}
/// Returns the sum of `duration_ms` over all `builds` rows (zero when the
/// table is empty).
pub fn get_total_build_duration(&self) -> Result<Duration> {
    let summed = self.conn.query_row(
        "SELECT COALESCE(SUM(duration_ms), 0) FROM builds",
        [],
        |row| row.get::<_, i64>(0),
    );
    let total_ms = summed?;
    Ok(Duration::from_millis(total_ms as u64))
}
/// Lists the root-cause packages that block `package` from building.
///
/// `package` is treated as a pkgpath when it contains `/` (the first package
/// found at that path is used), otherwise as a pkgname.
///
/// The query walks `resolved_depends` from the package through dependencies
/// whose build outcome is anything other than `success`/`up_to_date`, maps
/// each outcome to a display reason, and keeps only rows whose reason is
/// `failed`, `prefailed` or `unresolved`.
///
/// # Returns
/// `(pkgname, pkgpath, reason)` tuples ordered by pkgname.
///
/// # Errors
/// Fails if the package is not in the database or on any SQLite error.
pub fn get_blockers(&self, package: &str) -> Result<Vec<(String, String, String)>> {
// Resolve the argument to a single package row.
let pkg = if package.contains('/') {
let pkgs = self.get_packages_by_path(package)?;
pkgs.into_iter().next()
} else {
self.get_package_by_name(package)?
};
let Some(pkg) = pkg else {
anyhow::bail!("Package '{}' not found in database", package);
};
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
blocking(id, reason) AS (
-- Direct dependencies that have failed/skipped builds
SELECT rd.depends_on_id,
CASE b.outcome
WHEN 'failed' THEN 'failed'
WHEN 'pkg_skip' THEN 'prefailed'
WHEN 'pkg_fail' THEN 'prefailed'
WHEN 'indirect_skip' THEN 'indirect-prefailed'
WHEN 'indirect_fail' THEN 'indirect-failed'
WHEN 'indirect_failed' THEN 'indirect-failed'
WHEN 'unresolved_dep' THEN 'unresolved'
ELSE b.outcome
END
FROM resolved_depends rd
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE rd.package_id = ?1
AND b.outcome NOT IN ('success', 'up_to_date')
UNION
-- Transitive: deps of deps that are blocked
SELECT rd.depends_on_id,
CASE b.outcome
WHEN 'failed' THEN 'failed'
WHEN 'pkg_skip' THEN 'prefailed'
WHEN 'pkg_fail' THEN 'prefailed'
WHEN 'indirect_skip' THEN 'indirect-prefailed'
WHEN 'indirect_fail' THEN 'indirect-failed'
WHEN 'indirect_failed' THEN 'indirect-failed'
WHEN 'unresolved_dep' THEN 'unresolved'
ELSE b.outcome
END
FROM resolved_depends rd
JOIN blocking bl ON rd.package_id = bl.id
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE b.outcome NOT IN ('success', 'up_to_date')
)
SELECT DISTINCT p.pkgname, p.pkgpath, bl.reason
FROM blocking bl
JOIN packages p ON bl.id = p.id
-- Only show root causes (failed or prefailed), not indirect
WHERE bl.reason IN ('failed', 'prefailed', 'unresolved')
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([pkg.id], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Lists every package that depends, directly or transitively, on `package`.
///
/// `package` is treated as a pkgpath when it contains `/` (the first package
/// found at that path is used), otherwise as a pkgname.
///
/// # Returns
/// `(pkgname, pkgpath)` tuples ordered by pkgname.
///
/// # Errors
/// Fails if the package is not in the database or on any SQLite error.
pub fn get_blocked_by(&self, package: &str) -> Result<Vec<(String, String)>> {
    let target = if package.contains('/') {
        self.get_packages_by_path(package)?.into_iter().next()
    } else {
        self.get_package_by_name(package)?
    };
    let target_id = match target {
        Some(row) => row.id,
        None => anyhow::bail!("Package '{}' not found in database", package),
    };

    let mut stmt = self.conn.prepare(
        "WITH RECURSIVE
affected(id) AS (
-- Direct reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
WHERE rd.depends_on_id = ?1
UNION
-- Transitive reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT p.pkgname, p.pkgpath
FROM affected a
JOIN packages p ON a.id = p.id
ORDER BY p.pkgname",
    )?;
    let mapped = stmt.query_map([target_id], |row| Ok((row.get(0)?, row.get(1)?)))?;
    let mut dependents: Vec<(String, String)> = Vec::new();
    for row in mapped {
        dependents.push(row?);
    }
    Ok(dependents)
}
/// Lists packages that carry a skip or fail reason and have no `builds` row.
///
/// # Returns
/// `(pkgname, pkgpath, reason)` tuples ordered by pkgname, where `reason` is
/// the fail reason if present, otherwise the skip reason.
pub fn get_prefailed_packages(&self) -> Result<Vec<(String, Option<String>, String)>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkgpath,
COALESCE(p.fail_reason, p.skip_reason) as reason
FROM packages p
WHERE (p.skip_reason IS NOT NULL OR p.fail_reason IS NOT NULL)
AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = p.id)
ORDER BY p.pkgname",
    )?;
    let mapped = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
    let mut prefailed: Vec<(String, Option<String>, String)> = Vec::new();
    for row in mapped {
        prefailed.push(row?);
    }
    Ok(prefailed)
}
/// Lists packages that have no build record and no skip/fail reason of their
/// own, but depend (directly or transitively) on a package whose build
/// outcome marks it as a root failure.
///
/// # Returns
/// `(pkgname, pkgpath, failed_deps)` tuples ordered by pkgname, where
/// `failed_deps` is the `GROUP_CONCAT` of the distinct root-failure package
/// names the package is reachable from.
///
/// NOTE(review): the root set filters on outcomes `'failed'` and
/// `'prefailed'`, but no code in this file writes `'prefailed'` as an
/// outcome (pre-failed packages are stored as `pkg_skip`/`pkg_fail`).
/// Confirm whether `'prefailed'` is written elsewhere or is a leftover.
pub fn get_indirect_failures(&self) -> Result<Vec<(String, Option<String>, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
-- Only direct failures are root causes
failed_pkgs(id) AS (
SELECT package_id FROM builds
WHERE outcome IN ('failed', 'prefailed')
),
-- Packages affected by failures (transitive closure)
affected(id, root_id) AS (
SELECT id, id FROM failed_pkgs
UNION
SELECT rd.package_id, a.root_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
WHERE rd.package_id NOT IN (SELECT id FROM failed_pkgs)
)
SELECT p.pkgname, p.pkgpath, GROUP_CONCAT(DISTINCT fp.pkgname) as failed_deps
FROM affected a
JOIN packages p ON a.id = p.id
JOIN packages fp ON a.root_id = fp.id
WHERE a.id != a.root_id
AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = a.id)
AND p.skip_reason IS NULL
AND p.fail_reason IS NULL
GROUP BY p.id, p.pkgname, p.pkgpath
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
pub fn mark_failure_cascade(
&self,
package_id: i64,
reason: &str,
duration: Duration,
) -> Result<usize> {
let pkgname = self.get_pkgname(package_id)?;
let mut stmt = self.conn.prepare(
"WITH RECURSIVE affected(id, depth) AS (
SELECT ?1, 0
UNION
SELECT rd.package_id, a.depth + 1
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id, depth FROM affected ORDER BY depth",
)?;
let affected: Vec<(i64, i32)> = stmt
.query_map([package_id], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?))
})?
.filter_map(|r| r.ok())
.collect();
self.conn.execute("BEGIN TRANSACTION", [])?;
for (id, depth) in &affected {
let (outcome, detail, dur) = if *depth == 0 {
("failed", reason.to_string(), duration.as_millis() as i64)
} else {
(
"indirect_failed",
format!("depends on failed {}", pkgname),
0,
)
};
self.conn.execute(
"INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, duration_ms)
VALUES (?1, ?2, ?3, ?4)",
params![id, outcome, detail, dur],
)?;
}
self.conn.execute("COMMIT", [])?;
debug!(
package_id = package_id,
affected_count = affected.len(),
"Marked failure cascade"
);
Ok(affected.len())
}
/// Reports whether the `full_scan_complete` metadata value is `"true"`.
///
/// A missing key, a different value, or a query error all yield `false`.
pub fn full_scan_complete(&self) -> bool {
    let flag = self.conn.query_row(
        "SELECT value FROM metadata WHERE key = 'full_scan_complete'",
        [],
        |row| row.get::<_, String>(0),
    );
    matches!(flag.as_deref(), Ok("true"))
}
/// Sets the `full_scan_complete` metadata value to `"true"`, replacing any
/// existing value.
pub fn set_full_scan_complete(&self) -> Result<()> {
    self.conn
        .execute(
            "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')",
            [],
        )
        .map(|_| ())
        .map_err(Into::into)
}
/// Removes the `full_scan_complete` key from `metadata`.
pub fn clear_full_scan_complete(&self) -> Result<()> {
    self.conn
        .execute("DELETE FROM metadata WHERE key = 'full_scan_complete'", [])
        .map(|_| ())
        .map_err(Into::into)
}
/// Serializes `env` to JSON and stores it under the `pkgsrc_env` metadata key.
///
/// An existing value is replaced. `metadata.key` is a primary key, so the
/// previous plain `INSERT` failed with a constraint violation on a second
/// store. This now matches how `full_scan_complete` is written.
///
/// # Errors
/// Propagates the insert error.
pub fn store_pkgsrc_env(&self, env: &PkgsrcEnv) -> Result<()> {
    let json = serde_json::json!({
        "packages": env.packages,
        "pkgtools": env.pkgtools,
        "prefix": env.prefix,
        "pkg_dbdir": env.pkg_dbdir,
        "pkg_refcount_dbdir": env.pkg_refcount_dbdir,
        "cachevars": env.cachevars,
    });
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('pkgsrc_env', ?1)",
        params![json.to_string()],
    )?;
    Ok(())
}
/// Loads the pkgsrc environment stored under the `pkgsrc_env` metadata key.
///
/// The five path fields must be present as JSON strings. `cachevars` falls
/// back to an empty map when missing or not deserializable.
///
/// # Errors
/// Fails when the key is absent, the value is not valid JSON, or a path
/// field is missing.
pub fn load_pkgsrc_env(&self) -> Result<PkgsrcEnv> {
    let stored = self.conn.query_row(
        "SELECT value FROM metadata WHERE key = 'pkgsrc_env'",
        [],
        |row| row.get::<_, String>(0),
    );
    let json_str = stored.context("pkgsrc environment not found in database")?;
    let json: serde_json::Value =
        serde_json::from_str(&json_str).context("Invalid pkgsrc_env JSON")?;

    let path_field = |key: &str| -> Result<PathBuf> {
        match json.get(key).and_then(serde_json::Value::as_str) {
            Some(text) => Ok(PathBuf::from(text)),
            None => Err(anyhow::anyhow!("Missing {} in pkgsrc_env", key)),
        }
    };

    let cachevars: HashMap<String, String> = match json.get("cachevars") {
        Some(raw) => serde_json::from_value(raw.clone()).unwrap_or_default(),
        None => HashMap::new(),
    };

    let packages = path_field("packages")?;
    let pkgtools = path_field("pkgtools")?;
    let prefix = path_field("prefix")?;
    let pkg_dbdir = path_field("pkg_dbdir")?;
    let pkg_refcount_dbdir = path_field("pkg_refcount_dbdir")?;
    Ok(PkgsrcEnv {
        packages,
        pkgtools,
        prefix,
        pkg_dbdir,
        pkg_refcount_dbdir,
        cachevars,
    })
}
/// Returns the names of packages whose build outcome is `success` or
/// `up_to_date`, ordered by name.
pub fn get_successful_packages(&self) -> Result<Vec<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.outcome IN ('success', 'up_to_date')
ORDER BY p.pkgname",
    )?;
    let mapped = stmt.query_map([], |row| row.get::<_, String>(0))?;
    let mut pkgnames = Vec::new();
    for name in mapped {
        pkgnames.push(name?);
    }
    Ok(pkgnames)
}
/// Runs an arbitrary SQL statement and prints its outcome to stdout.
///
/// Statements that produce no columns are executed and, when rows were
/// affected, the count is printed. Otherwise each result row is printed as
/// its column values joined by `|`: NULL (and unreadable columns) as an empty
/// string, blobs as `<blob:N bytes>`.
///
/// # Errors
/// Propagates prepare, execute, and row-stepping errors.
pub fn execute_raw(&self, sql: &str) -> Result<()> {
    use rusqlite::types::ValueRef;

    let mut stmt = self.conn.prepare(sql)?;
    let column_count = stmt.column_count();

    if column_count == 0 {
        let affected = stmt.execute([])?;
        if affected > 0 {
            println!("{} row(s) affected", affected);
        }
        return Ok(());
    }

    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let mut cells: Vec<String> = Vec::with_capacity(column_count);
        for column in 0..column_count {
            let cell = match row.get_ref(column) {
                Ok(ValueRef::Null) | Err(_) => String::new(),
                Ok(ValueRef::Integer(n)) => n.to_string(),
                Ok(ValueRef::Real(x)) => x.to_string(),
                Ok(ValueRef::Text(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
                Ok(ValueRef::Blob(bytes)) => format!("<blob:{} bytes>", bytes.len()),
            };
            cells.push(cell);
        }
        println!("{}", cells.join("|"));
    }
    Ok(())
}
}
fn build_outcome_to_db(outcome: &BuildOutcome) -> (&'static str, Option<String>) {
match outcome {
BuildOutcome::Success => ("success", None),
BuildOutcome::UpToDate => ("up_to_date", None),
BuildOutcome::Failed(s) => ("failed", Some(s.clone())),
BuildOutcome::Skipped(reason) => match reason {
SkipReason::PkgSkip(s) => ("pkg_skip", Some(s.clone())),
SkipReason::PkgFail(s) => ("pkg_fail", Some(s.clone())),
SkipReason::IndirectSkip(s) => ("indirect_skip", Some(s.clone())),
SkipReason::IndirectFail(s) => ("indirect_fail", Some(s.clone())),
SkipReason::UnresolvedDep(s) => ("unresolved_dep", Some(s.clone())),
},
}
}
fn db_outcome_to_build(outcome: &str, detail: Option<String>) -> BuildOutcome {
match outcome {
"success" => BuildOutcome::Success,
"up_to_date" => BuildOutcome::UpToDate,
"failed" => BuildOutcome::Failed(detail.unwrap_or_default()),
"pkg_skip" => BuildOutcome::Skipped(SkipReason::PkgSkip(detail.unwrap_or_default())),
"pkg_fail" => BuildOutcome::Skipped(SkipReason::PkgFail(detail.unwrap_or_default())),
"indirect_skip" => {
BuildOutcome::Skipped(SkipReason::IndirectSkip(detail.unwrap_or_default()))
}
"indirect_fail" | "indirect_failed" => {
BuildOutcome::Skipped(SkipReason::IndirectFail(detail.unwrap_or_default()))
}
"unresolved_dep" => {
BuildOutcome::Skipped(SkipReason::UnresolvedDep(detail.unwrap_or_default()))
}
_ => BuildOutcome::Failed(format!("Unknown outcome: {}", outcome)),
}
}