use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use pkgsrc::{PkgName, PkgPath, ScanIndex};
use rusqlite::{Connection, params};
use tracing::{debug, warn};
use strum::VariantArray;
use crate::build::{BuildResult, PkgBuildStats, Stage};
use crate::config::PkgsrcEnv;
use crate::scan::ScanResult;
use crate::try_println;
use crate::{HistoryKind, PackageState, PackageStateKind};
/// Tuple row used by report queries.
/// NOTE(review): field meanings are not visible in this chunk; presumably
/// (pkgname, pkgpath?, outcome?, stage id?, detail?) — confirm against the
/// queries that produce a ReportRow.
pub type ReportRow = (
String,
Option<String>,
Option<String>,
Option<i32>,
Option<String>,
);
fn stage_values() -> String {
Stage::VARIANTS
.iter()
.map(|v| format!("({}, '{}')", *v as i32, v.into_str()))
.collect::<Vec<_>>()
.join(", ")
}
fn history_schema() -> String {
use strum::VariantArray;
HistoryKind::VARIANTS
.iter()
.map(|v| {
let name: &str = v.into();
let sql = match v {
HistoryKind::Pkgpath | HistoryKind::Pkgname | HistoryKind::Pkgbase => {
"TEXT NOT NULL"
}
HistoryKind::Outcome => "INTEGER NOT NULL REFERENCES outcome_types(id)",
HistoryKind::Stage => "INTEGER REFERENCES stage_types(id)",
HistoryKind::Duration | HistoryKind::Timestamp => "INTEGER NOT NULL",
HistoryKind::MakeJobs => "INTEGER",
HistoryKind::DiskUsage => "INTEGER",
HistoryKind::Wrkobjdir => "TEXT",
HistoryKind::BuildId => "TEXT",
};
format!("{name} {sql}")
})
.collect::<Vec<_>>()
.join(",\n ")
}
// Date-coded version stamp for the main bob.db schema; a mismatch at open
// time is a hard error telling the user to run 'bob clean' (see init()).
const SCHEMA_VERSION: i32 = 20260317;
// Version stamp for the separate history database — presumably consumed by
// check_history_schema(), which is defined outside this chunk.
const HISTORY_SCHEMA_VERSION: i32 = 20260406;
/// Per-package summary presumably drawn from the history database
/// (its schema has matching outcome/disk_usage columns — see history_schema()).
#[derive(Clone, Debug)]
pub struct PkgBuildHistory {
// Recorded build outcome, if any.
pub outcome: Option<PackageStateKind>,
// Recorded disk usage — NOTE(review): units not visible here; confirm.
pub disk_usage: Option<u64>,
}
/// One row of the `packages` table in the column order used by the
/// SELECT helpers (see PackageRow::from_row).
#[derive(Clone, Debug)]
pub struct PackageRow {
// Rowid (packages.id).
pub id: i64,
// Unique package name (packages.pkgname).
pub pkgname: String,
// Package directory path (packages.pkgpath).
pub pkgpath: String,
// PKG_SKIP_REASON, if the scan recorded one.
pub skip_reason: Option<String>,
// PKG_FAIL_REASON, if the scan recorded one.
pub fail_reason: Option<String>,
// True when BOOTSTRAP_PKG was "yes" at scan time.
pub is_bootstrap: bool,
// PBULK_WEIGHT scheduling weight (defaults to 100 in the schema).
pub pbulk_weight: i32,
// json_extract(scan_data, '$.MULTI_VERSION'), when present.
pub multi_version: Option<String>,
}
/// Per-package status row returned by get_all_package_status(): package
/// metadata LEFT JOINed with any build outcome.
#[derive(Clone, Debug)]
pub struct PackageStatusRow {
// Rowid (packages.id).
pub id: i64,
pub pkgname: String,
pub pkgpath: String,
pub skip_reason: Option<String>,
pub fail_reason: Option<String>,
// Why this package was (re)built, as stored via store_build_reason().
pub build_reason: Option<String>,
// json_extract(scan_data, '$.MULTI_VERSION'); NULL when not requested.
pub multi_version: Option<String>,
// builds.outcome id; None when the package has no builds row.
pub build_outcome: Option<i32>,
pub outcome_detail: Option<String>,
}
impl PackageRow {
    /// Build a PackageRow from a query whose columns are, in order:
    /// id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap,
    /// pbulk_weight, multi_version.
    fn from_row(row: &rusqlite::Row) -> rusqlite::Result<Self> {
        let id = row.get(0)?;
        let pkgname = row.get(1)?;
        let pkgpath = row.get(2)?;
        let skip_reason = row.get(3)?;
        let fail_reason = row.get(4)?;
        // SQLite has no boolean type; any non-zero integer counts as true.
        let is_bootstrap = row.get::<_, i32>(5)? != 0;
        let pbulk_weight = row.get(6)?;
        let multi_version = row.get(7)?;
        Ok(Self {
            id,
            pkgname,
            pkgpath,
            skip_reason,
            fail_reason,
            is_bootstrap,
            pbulk_weight,
            multi_version,
        })
    }
}
/// Handle to the on-disk bob.db SQLite database.
pub struct Database {
conn: Connection,
// Directory that holds bob.db (and, per check_history_schema, the history DB).
dbdir: PathBuf,
// Lazily-opened connection to the history database; initialized on first use.
history_conn: OnceCell<Connection>,
}
/// RAII transaction: BEGIN on creation, ROLLBACK on drop unless commit()
/// was called.
pub struct TransactionGuard<'a> {
conn: &'a Connection,
// Set once COMMIT has succeeded, suppressing the rollback in Drop.
committed: bool,
}
impl<'a> TransactionGuard<'a> {
    /// Open a transaction on `conn`; it rolls back on drop unless committed.
    fn new(conn: &'a Connection) -> Result<Self> {
        conn.execute("BEGIN TRANSACTION", [])?;
        let guard = Self {
            conn,
            committed: false,
        };
        Ok(guard)
    }

    /// Commit the transaction, consuming the guard.
    pub fn commit(mut self) -> Result<()> {
        self.conn.execute("COMMIT", [])?;
        // Only marked after COMMIT succeeds, so a failed commit still rolls
        // back in Drop.
        self.committed = true;
        Ok(())
    }
}
impl Drop for TransactionGuard<'_> {
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        // Best-effort rollback; errors cannot be propagated out of drop().
        let _ = self.conn.execute("ROLLBACK", []);
    }
}
impl Database {
/// Open (creating if necessary) the bob.db database under `dbdir`,
/// configure pragmas, and create or validate the schema.
pub fn open(dbdir: &Path) -> Result<Self> {
    std::fs::create_dir_all(dbdir).context("Failed to create database directory")?;
    let db_path = dbdir.join("bob.db");
    let conn = Connection::open(db_path).context("Failed to open database")?;
    let db = Self {
        conn,
        dbdir: dbdir.to_path_buf(),
        history_conn: OnceCell::new(),
    };
    db.configure_pragmas()?;
    db.init()?;
    Ok(db)
}
/// Directory containing bob.db.
pub fn dbdir(&self) -> &Path {
    self.dbdir.as_path()
}
/// Borrow the raw SQLite connection for crate-internal direct SQL access.
pub(crate) fn conn(&self) -> &Connection {
&self.conn
}
/// Begin a guarded transaction; it rolls back on drop unless committed.
pub fn transaction(&self) -> Result<TransactionGuard<'_>> {
TransactionGuard::new(&self.conn)
}
/// Apply connection-level pragmas: WAL journaling, NORMAL fsync, 64 MB page
/// cache (negative value = KiB), in-memory temp tables, 256 MB mmap window,
/// and enforced foreign keys (required for the ON DELETE CASCADE clauses).
fn configure_pragmas(&self) -> Result<()> {
self.conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA cache_size = -64000;
PRAGMA temp_store = MEMORY;
PRAGMA mmap_size = 268435456;
PRAGMA foreign_keys = ON;",
)?;
Ok(())
}
/// Create the schema on a fresh database, or verify the stored version on an
/// existing one; finally validates the separate history database.
fn init(&self) -> Result<()> {
// A missing schema_version table means this database was just created.
let has_schema_version: bool = self.conn.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
[],
|row| row.get::<_, i32>(0).map(|c| c > 0),
)?;
if !has_schema_version {
self.create_schema()?;
} else {
let version: i32 =
self.conn
.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
row.get(0)
})?;
// There are no migrations: any mismatch is fatal and requires 'bob clean'.
if version != SCHEMA_VERSION {
anyhow::bail!(
"Schema mismatch: found v{}, expected v{}. \
Run 'bob clean' to restart.",
version,
SCHEMA_VERSION
);
}
}
check_history_schema(&self.dbdir)?;
Ok(())
}
/// Create all tables and indexes for a fresh database, seed the outcome/stage
/// lookup tables from the corresponding enums, and record a timestamp-based
/// build_id in metadata.
fn create_schema(&self) -> Result<()> {
self.conn.execute_batch(&format!(
"CREATE TABLE schema_version (version INTEGER NOT NULL);
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE outcome_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO outcome_types (id, name) VALUES {outcome_types};
CREATE TABLE stage_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO stage_types (id, name) VALUES {stages};
CREATE TABLE packages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pkgname TEXT UNIQUE NOT NULL,
pkgpath TEXT NOT NULL,
skip_reason TEXT,
fail_reason TEXT,
build_reason TEXT,
selected INTEGER NOT NULL DEFAULT 0,
is_bootstrap INTEGER DEFAULT 0,
pbulk_weight INTEGER DEFAULT 100,
make_jobs_safe INTEGER NOT NULL DEFAULT 1,
scan_data TEXT
);
CREATE INDEX idx_packages_pkgpath ON packages(pkgpath);
CREATE INDEX idx_packages_status ON packages(skip_reason, fail_reason);
CREATE TABLE depends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
depend_pattern TEXT NOT NULL,
depend_pkgpath TEXT NOT NULL,
UNIQUE(package_id, depend_pattern)
);
CREATE INDEX idx_depends_package ON depends(package_id);
CREATE INDEX idx_depends_pkgpath ON depends(depend_pkgpath);
CREATE TABLE resolved_depends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
depends_on_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
UNIQUE(package_id, depends_on_id)
);
CREATE INDEX idx_resolved_depends_package ON resolved_depends(package_id);
CREATE INDEX idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
CREATE TABLE builds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
outcome INTEGER NOT NULL REFERENCES outcome_types(id),
outcome_detail TEXT,
stage INTEGER REFERENCES stage_types(id),
duration_ms INTEGER NOT NULL DEFAULT 0,
log_dir TEXT,
failed_dep_id INTEGER REFERENCES packages(id),
UNIQUE(package_id)
);
CREATE INDEX idx_builds_outcome ON builds(outcome);
CREATE INDEX idx_builds_package ON builds(package_id);
CREATE INDEX idx_builds_failed_dep ON builds(failed_dep_id);
CREATE TABLE scan_failures (
pkgpath TEXT PRIMARY KEY,
error TEXT NOT NULL
);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);",
SCHEMA_VERSION,
outcome_types = PackageState::db_values(),
stages = stage_values(),
))?;
// Identify this database instance by its creation time (UTC, second precision).
let build_id = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
self.conn.execute(
"INSERT INTO metadata (key, value) VALUES ('build_id', ?1)",
params![build_id],
)?;
debug!(version = SCHEMA_VERSION, build_id = %build_id, "Created schema");
Ok(())
}
/// Upsert one scanned package (keyed on pkgname) plus its raw dependency
/// patterns, and return the package rowid. The full ScanIndex is kept as
/// JSON in scan_data.
pub fn store_package(&self, pkgpath: &str, index: &ScanIndex) -> Result<i64> {
let pkgname = index.pkgname.pkgname();
// Empty reason strings are treated as "no reason".
let skip_reason = index.pkg_skip_reason.as_ref().filter(|s| !s.is_empty());
let fail_reason = index.pkg_fail_reason.as_ref().filter(|s| !s.is_empty());
let is_bootstrap = index.bootstrap_pkg.as_deref() == Some("yes");
// Unparseable or missing PBULK_WEIGHT falls back to the schema default of 100.
let pbulk_weight: i32 = index
.pbulk_weight
.as_ref()
.and_then(|s| s.parse().ok())
.unwrap_or(100);
// Only an explicit "no" (any case) disables parallel make jobs.
let make_jobs_safe = index
.make_jobs_safe
.as_ref()
.map(|v| !v.eq_ignore_ascii_case("no"))
.unwrap_or(true);
let scan_data = serde_json::to_string(index)?;
{
let mut stmt = self.conn.prepare_cached(
"INSERT OR REPLACE INTO packages
(pkgname, pkgpath, skip_reason, fail_reason,
is_bootstrap, pbulk_weight, make_jobs_safe, scan_data)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
)?;
stmt.execute(params![
pkgname,
pkgpath,
skip_reason,
fail_reason,
is_bootstrap,
pbulk_weight,
make_jobs_safe,
scan_data
])?;
}
// INSERT OR REPLACE always inserts a row, so this is the package's rowid.
let package_id = self.conn.last_insert_rowid();
if let Some(ref deps) = index.all_depends {
let mut stmt = self.conn.prepare_cached(
"INSERT OR IGNORE INTO depends (package_id, depend_pattern, depend_pkgpath)
VALUES (?1, ?2, ?3)",
)?;
for dep in deps.depends().flatten() {
stmt.execute(params![
package_id,
dep.pattern().pattern(),
dep.pkgpath().as_str()
])?;
}
}
debug!(pkgname = pkgname, package_id = package_id, "Stored package");
Ok(package_id)
}
/// Store every scanned index for one pkgpath (multi-version pkgpaths yield
/// several indexes), stopping at the first error.
pub fn store_scan_pkgpath(&self, pkgpath: &str, indexes: &[ScanIndex]) -> Result<()> {
    indexes
        .iter()
        .try_for_each(|index| self.store_package(pkgpath, index).map(|_| ()))
}
/// Fetch the packages row for `pkgname`, or None when it is not in the
/// database.
pub fn get_package_by_name(&self, pkgname: &str) -> Result<Option<PackageRow>> {
let result = self.conn.query_row(
"SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight,
json_extract(scan_data, '$.MULTI_VERSION')
FROM packages WHERE pkgname = ?1",
[pkgname],
PackageRow::from_row,
);
// Translate "no rows" into None; propagate any other database error.
match result {
Ok(pkg) => Ok(Some(pkg)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Look up the rowid for `pkgname`, or None if it has not been scanned.
pub fn get_package_id(&self, pkgname: &str) -> Result<Option<i64>> {
    let lookup = self.conn.query_row(
        "SELECT id FROM packages WHERE pkgname = ?1",
        [pkgname],
        |row| row.get(0),
    );
    match lookup {
        Ok(id) => Ok(Some(id)),
        // An absent package is not an error.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}
/// Fetch the pkgname for a package rowid; errors if the id is unknown.
pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
    let name = self.conn.query_row(
        "SELECT pkgname FROM packages WHERE id = ?1",
        [package_id],
        |row| row.get(0),
    );
    name.context("Package not found")
}
/// Fetch every packages row under `pkgpath` (a multi-version pkgpath can
/// hold several pkgnames).
pub fn get_packages_by_path(&self, pkgpath: &str) -> Result<Vec<PackageRow>> {
let mut stmt = self.conn.prepare(
"SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight,
json_extract(scan_data, '$.MULTI_VERSION')
FROM packages WHERE pkgpath = ?1",
)?;
let rows = stmt.query_map([pkgpath], PackageRow::from_row)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Set of every pkgpath that has at least one scanned package.
pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
    let mut stmt = self.conn.prepare("SELECT DISTINCT pkgpath FROM packages")?;
    let mut paths = HashSet::new();
    for row in stmt.query_map([], |row| row.get::<_, String>(0))? {
        paths.insert(row?);
    }
    Ok(paths)
}
/// Pkgpaths referenced as dependencies that have not been scanned yet
/// (present in depends but absent from packages).
pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
let mut stmt = self.conn.prepare(
"SELECT DISTINCT d.depend_pkgpath
FROM depends d
WHERE d.depend_pkgpath NOT IN (SELECT pkgpath FROM packages)",
)?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
}
/// Total number of scanned packages.
pub fn count_packages(&self) -> Result<i64> {
    let count = self
        .conn
        .query_row("SELECT COUNT(*) FROM packages", [], |row| row.get(0));
    count.context("Failed to count packages")
}
/// Every packages row, ordered by rowid (i.e. insertion order).
pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
let mut stmt = self.conn.prepare(
"SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight,
json_extract(scan_data, '$.MULTI_VERSION')
FROM packages ORDER BY id",
)?;
let rows = stmt.query_map([], PackageRow::from_row)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Status of every selected package joined with its build outcome (if any).
/// `need_multi` controls whether MULTI_VERSION is extracted from the JSON
/// scan_data — the extraction has a cost, so it is skipped when not needed.
pub fn get_all_package_status(&self, need_multi: bool) -> Result<Vec<PackageStatusRow>> {
// Both variants return the same column layout so one row mapper works.
let sql = if need_multi {
"SELECT p.id, p.pkgname, p.pkgpath, p.skip_reason,
p.fail_reason, p.build_reason,
json_extract(p.scan_data, '$.MULTI_VERSION'),
b.outcome, b.outcome_detail
FROM packages p
LEFT JOIN builds b ON b.package_id = p.id
WHERE p.selected = 1"
} else {
"SELECT p.id, p.pkgname, p.pkgpath, p.skip_reason,
p.fail_reason, p.build_reason,
NULL,
b.outcome, b.outcome_detail
FROM packages p
LEFT JOIN builds b ON b.package_id = p.id
WHERE p.selected = 1"
};
let mut stmt = self.conn.prepare(sql)?;
let mapped = stmt.query_map([], |row| {
Ok(PackageStatusRow {
id: row.get(0)?,
pkgname: row.get(1)?,
pkgpath: row.get(2)?,
skip_reason: row.get(3)?,
fail_reason: row.get(4)?,
build_reason: row.get(5)?,
multi_version: row.get(6)?,
build_outcome: row.get(7)?,
outcome_detail: row.get(8)?,
})
})?;
mapped.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Selected packages with no skip/fail reason, i.e. candidates for building.
pub fn get_buildable_packages(&self) -> Result<Vec<PackageRow>> {
let mut stmt = self.conn.prepare(
"SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight,
json_extract(scan_data, '$.MULTI_VERSION')
FROM packages
WHERE selected = 1 AND skip_reason IS NULL AND fail_reason IS NULL",
)?;
let rows = stmt.query_map([], PackageRow::from_row)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Deserialize the full ScanIndex stored as JSON for one package.
pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
    let json = self.conn.query_row(
        "SELECT scan_data FROM packages WHERE id = ?1",
        [package_id],
        |row| row.get::<_, String>(0),
    )?;
    serde_json::from_str(&json).context("Failed to deserialize scan data")
}
/// Deserialize every stored ScanIndex, in rowid order. The package id is
/// carried along only to make deserialization errors identifiable.
pub fn get_all_scan_data(&self) -> Result<Vec<ScanIndex>> {
    let mut stmt = self
        .conn
        .prepare("SELECT id, scan_data FROM packages ORDER BY id")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
    })?;
    let mut indexes = Vec::new();
    for row in rows {
        let (id, json) = row?;
        let parsed: ScanIndex = serde_json::from_str(&json)
            .with_context(|| format!("Failed to deserialize scan data for package {}", id))?;
        indexes.push(parsed);
    }
    Ok(indexes)
}
/// Drop all scanned packages (depends/resolved_depends/builds rows go with
/// them via ON DELETE CASCADE) and reset the full-scan-complete flag.
pub fn clear_scan(&self) -> Result<()> {
    self.conn.execute("DELETE FROM packages", [])?;
    self.clear_full_scan_complete()
}
/// Record why `pkgname` is being (re)built.
pub fn store_build_reason(&self, pkgname: &str, reason: &str) -> Result<()> {
    let sql = "UPDATE packages SET build_reason = ?1 WHERE pkgname = ?2";
    self.conn.execute(sql, params![reason, pkgname])?;
    Ok(())
}
/// Reset the build_reason column for every package.
pub fn clear_build_reasons(&self) -> Result<()> {
    let sql = "UPDATE packages SET build_reason = NULL";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Stored build reason for `pkgname`; None when the package is unknown or
/// has no reason recorded.
pub fn get_build_reason(&self, pkgname: &str) -> Result<Option<String>> {
    let lookup = self.conn.query_row(
        "SELECT build_reason FROM packages WHERE pkgname = ?1",
        [pkgname],
        |row| row.get(0),
    );
    match lookup {
        Ok(reason) => Ok(reason),
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}
/// Map of pkgname -> build_reason for every package that has one recorded.
pub fn get_all_build_reasons(&self) -> Result<HashMap<String, String>> {
    let mut stmt = self
        .conn
        .prepare("SELECT pkgname, build_reason FROM packages WHERE build_reason IS NOT NULL")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
    })?;
    rows.collect::<Result<HashMap<_, _>, _>>().map_err(Into::into)
}
/// Persist resolved (pkgname-level) dependency edges for every buildable or
/// skipped package in the scan summary. Edges whose endpoints are not in the
/// packages table are silently dropped.
pub fn store_resolved_deps(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
// Collect (package_id, depends_on_id) pairs first, then insert in one batch.
let mut resolved_deps: Vec<(i64, i64)> = Vec::new();
for pkg in &summary.packages {
match pkg {
ScanResult::Buildable(resolved) => {
if let Some(pkg_id) = self.get_package_id(resolved.pkgname().pkgname())? {
for dep in resolved.depends() {
if let Some(dep_id) = self.get_package_id(dep.pkgname())? {
resolved_deps.push((pkg_id, dep_id));
}
}
}
}
ScanResult::Skipped {
index,
resolved_depends,
..
} => {
// Skipped results may lack a scan index; nothing to record then.
let Some(idx) = index else { continue };
if let Some(pkg_id) = self.get_package_id(idx.pkgname.pkgname())? {
for dep in resolved_depends {
if let Some(dep_id) = self.get_package_id(dep.pkgname())? {
resolved_deps.push((pkg_id, dep_id));
}
}
}
}
// Scan failures have no package row and thus no edges.
ScanResult::ScanFail { .. } => {}
}
}
if !resolved_deps.is_empty() {
self.store_resolved_dependencies_batch(&resolved_deps)?;
debug!(count = resolved_deps.len(), "Stored resolved dependencies");
}
Ok(())
}
/// Record a builds row for every package the scan skipped, using the skip
/// state as the outcome. INSERT OR IGNORE preserves any existing build row
/// for the package (builds has UNIQUE(package_id)).
pub fn store_scan_skipped(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
let tx = self.transaction()?;
for pkg in &summary.packages {
if let ScanResult::Skipped { state, index, .. } = pkg {
// Skipped results without a scan index cannot be matched to a package.
let Some(idx) = index else { continue };
let pkgname = idx.pkgname.pkgname();
let Some(pkg_row) = self.get_package_by_name(pkgname)? else {
continue;
};
let outcome = state.db_id();
let detail = state.to_string();
self.conn.execute(
"INSERT OR IGNORE INTO builds
(package_id, outcome, outcome_detail, duration_ms)
VALUES (?1, ?2, ?3, 0)",
params![pkg_row.id, outcome, detail],
)?;
}
}
tx.commit()
}
pub fn store_scan_failures(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
self.conn.execute("DELETE FROM scan_failures", [])?;
let mut stmt = self
.conn
.prepare("INSERT INTO scan_failures (pkgpath, error) VALUES (?1, ?2)")?;
for pkg in &summary.packages {
if let ScanResult::ScanFail { pkgpath, error } = pkg {
stmt.execute(params![pkgpath.as_path().display().to_string(), error])?;
}
}
Ok(())
}
/// All recorded scan failures as (pkgpath, error), sorted by pkgpath.
pub fn get_scan_failures(&self) -> Result<Vec<(String, String)>> {
    let mut stmt = self
        .conn
        .prepare("SELECT pkgpath, error FROM scan_failures ORDER BY pkgpath")?;
    let failures = stmt
        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    Ok(failures)
}
/// Atomically mark exactly the packages in this summary as selected,
/// clearing every previous selection first.
pub fn store_resolved_selection(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
    let tx = self.transaction()?;
    self.conn.execute("UPDATE packages SET selected = 0", [])?;
    {
        let mut select = self
            .conn
            .prepare("UPDATE packages SET selected = 1 WHERE pkgname = ?1")?;
        for pkg in &summary.packages {
            // Entries without a pkgname (e.g. scan failures) are not selectable.
            if let Some(pkgname) = pkg.pkgname() {
                select.execute([pkgname.pkgname()])?;
            }
        }
    }
    tx.commit()
}
/// Insert resolved dependency edges in one transaction; duplicates are
/// ignored via the table's UNIQUE constraint.
fn store_resolved_dependencies_batch(&self, deps: &[(i64, i64)]) -> Result<()> {
    let tx = self.transaction()?;
    {
        let mut insert = self.conn.prepare(
            "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
        )?;
        for &(package_id, depends_on_id) in deps {
            insert.execute(params![package_id, depends_on_id])?;
        }
    }
    tx.commit()
}
/// All package ids that directly or transitively depend on `package_id`
/// (the package itself is excluded).
pub fn get_transitive_reverse_deps(&self, package_id: i64) -> Result<Vec<i64>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE affected(id) AS (
SELECT ?1
UNION
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id FROM affected WHERE id != ?1",
)?;
let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Drop every resolved dependency edge.
pub fn clear_resolved_depends(&self) -> Result<()> {
    self.conn.execute("DELETE FROM resolved_depends", [])?;
    Ok(())
}
/// Map of pkgname -> sorted list of dependency pkgnames, built from the
/// resolved_depends edges.
pub fn get_all_resolved_deps(&self) -> Result<HashMap<String, Vec<String>>> {
let mut stmt = self.conn.prepare(
"SELECT p1.pkgname, p2.pkgname
FROM resolved_depends rd
JOIN packages p1 ON rd.package_id = p1.id
JOIN packages p2 ON rd.depends_on_id = p2.id
ORDER BY p1.pkgname, p2.pkgname",
)?;
let rows = stmt.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})?;
let mut deps: HashMap<String, Vec<String>> = HashMap::new();
for row in rows {
let (pkg, dep) = row?;
// ORDER BY above keeps each package's dependency list sorted.
deps.entry(pkg).or_default().push(dep);
}
Ok(deps)
}
/// Reconstruct the resolved package set from the database: buildable,
/// selected packages minus any whose recorded build outcome is a skip state,
/// each with its resolved dependency list re-attached.
pub fn load_resolved_packages(&self) -> Result<Vec<crate::scan::ResolvedPackage>> {
// Identify packages whose builds row records a skip-type outcome.
let skipped_ids: HashSet<i64> = {
let mut stmt = self
.conn
.prepare("SELECT package_id, outcome FROM builds")?;
let rows =
stmt.query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?)))?;
let mut ids = HashSet::new();
for row in rows {
let (id, outcome_id) = row?;
// Unknown outcome ids are ignored rather than treated as skips.
if let Some(state) = PackageState::from_db(outcome_id, None) {
if state.is_skip() {
ids.insert(id);
}
}
}
ids
};
let buildable = self.get_buildable_packages()?;
let all_deps = self.get_all_resolved_deps()?;
let mut packages = Vec::with_capacity(buildable.len());
for row in &buildable {
if skipped_ids.contains(&row.id) {
continue;
}
let mut index = self.get_full_scan_index(row.id)?;
let deps = all_deps.get(&row.pkgname).cloned().unwrap_or_default();
// Re-parse the stored pkgname strings back into PkgName values.
index.resolved_depends = Some(
deps.into_iter()
.map(|d| d.parse())
.collect::<Result<Vec<PkgName>, _>>()?,
);
packages.push(crate::scan::ResolvedPackage {
pkgpath: row.pkgpath.parse()?,
index,
});
}
Ok(packages)
}
/// All resolved dependency edges as (package_id, depends_on_id) pairs.
pub fn get_resolved_dep_ids(&self) -> Result<Vec<(i64, i64)>> {
    let mut stmt = self
        .conn
        .prepare("SELECT package_id, depends_on_id FROM resolved_depends")?;
    let pairs = stmt
        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(pairs)
}
/// Upsert the builds row for `package_id` from a finished BuildResult
/// (builds has UNIQUE(package_id), so INSERT OR REPLACE overwrites any
/// previous outcome).
pub fn store_build_result(&self, package_id: i64, result: &BuildResult) -> Result<()> {
let outcome = result.state.db_id();
let detail = result.state.detail().map(String::from);
// Stage enum discriminants double as stage_types ids.
let stage = result.build_stats.stage.map(|s| s as i32);
let duration_ms = result.build_stats.duration.as_millis() as i64;
let log_dir = result.log_dir.as_ref().map(|p| p.display().to_string());
self.conn.execute(
"INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, stage, duration_ms, log_dir)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![package_id, outcome, detail, stage, duration_ms, log_dir],
)?;
debug!(
package_id = package_id,
outcome = outcome,
"Stored build result"
);
Ok(())
}
/// True when `pkgname` has a recorded build with the Success outcome.
pub fn is_successful(&self, pkgname: &str) -> Result<bool> {
let success = PackageStateKind::Success as i32;
Ok(self.conn.query_row(
"SELECT COUNT(*) FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE p.pkgname = ?1 AND b.outcome = ?2",
params![pkgname, success],
|row| row.get::<_, i64>(0),
)? > 0)
}
/// Store a build result looked up by pkgname; an unknown package is logged
/// and otherwise ignored rather than treated as an error.
pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
    let name = result.pkgname.pkgname();
    match self.get_package_by_name(name)? {
        Some(pkg) => self.store_build_result(pkg.id, result),
        None => {
            warn!(pkgname = %name, "Package not found in database for build result");
            Ok(())
        }
    }
}
/// Reconstruct a BuildResult from the builds row for `package_id`; None
/// when no build has been recorded. Unknown outcome or stage ids are errors
/// here (contrast with get_all_build_results, which skips them).
pub fn get_build_result(&self, package_id: i64) -> Result<Option<BuildResult>> {
let result = self.conn.query_row(
"SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.package_id = ?1",
[package_id],
|row| {
let pkgname: String = row.get(0)?;
let pkgpath: Option<String> = row.get(1)?;
let outcome_id: i32 = row.get(2)?;
let detail: Option<String> = row.get(3)?;
let stage_id: Option<i32> = row.get(4)?;
let duration_ms: i64 = row.get(5)?;
let log_dir: Option<String> = row.get(6)?;
Ok((
pkgname,
pkgpath,
outcome_id,
detail,
stage_id,
duration_ms,
log_dir,
))
},
);
match result {
Ok((pkgname, pkgpath, outcome_id, detail, stage_id, duration_ms, log_dir)) => {
let outcome = PackageState::from_db(outcome_id, detail)
.ok_or_else(|| anyhow::anyhow!("Unknown outcome type id: {}", outcome_id))?;
let stage = stage_id
.map(|id| {
Stage::from_repr(id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", id))
})
.transpose()?;
Ok(Some(BuildResult {
pkgname: PkgName::new(&pkgname),
// Unparseable pkgpaths degrade to None rather than failing the load.
pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
state: outcome,
log_dir: log_dir.map(std::path::PathBuf::from),
build_stats: PkgBuildStats {
stage,
duration: Duration::from_millis(duration_ms as u64),
..PkgBuildStats::default()
},
}))
}
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Delete the build record for `pkgname`; returns whether anything was
/// removed.
pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
    let deleted = self.conn.execute(
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgname = ?1)",
        params![pkgname],
    )?;
    Ok(deleted > 0)
}
/// Delete build records for every package under `pkgpath`; returns the
/// number of rows removed.
pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
    let deleted = self.conn.execute(
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgpath = ?1)",
        params![pkgpath],
    )?;
    Ok(deleted)
}
/// Remove every build record; returns the number of rows deleted.
pub fn clear_builds(&self) -> Result<usize> {
    let deleted = self.conn.execute("DELETE FROM builds", [])?;
    Ok(deleted)
}
/// Reconstruct every recorded BuildResult, ordered by pkgname. Rows with an
/// unrecognized outcome id are skipped (contrast with get_build_result,
/// which errors); unknown stage ids degrade to no stage.
pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN packages p ON b.package_id = p.id
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([], |row| {
let pkgname: String = row.get(0)?;
let pkgpath: Option<String> = row.get(1)?;
let outcome_id: i32 = row.get(2)?;
let detail: Option<String> = row.get(3)?;
let stage_id: Option<i32> = row.get(4)?;
let duration_ms: i64 = row.get(5)?;
let log_dir: Option<String> = row.get(6)?;
Ok((
pkgname,
pkgpath,
outcome_id,
detail,
stage_id,
duration_ms,
log_dir,
))
})?;
let mut results = Vec::new();
for row in rows {
let (pkgname, pkgpath, outcome_id, detail, stage_id, duration_ms, log_dir) = row?;
let Some(outcome) = PackageState::from_db(outcome_id, detail) else {
continue;
};
let stage = stage_id.and_then(Stage::from_repr);
results.push(BuildResult {
pkgname: PkgName::new(&pkgname),
// Unparseable pkgpaths degrade to None rather than failing the load.
pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
state: outcome,
log_dir: log_dir.map(std::path::PathBuf::from),
build_stats: PkgBuildStats {
stage,
duration: Duration::from_millis(duration_ms as u64),
..PkgBuildStats::default()
},
});
}
Ok(results)
}
/// For each failed package recorded as a failed_dep, count how many packages
/// it indirectly broke. Keys are the failed package's pkgname.
pub fn count_breaks_for_failed(&self) -> Result<HashMap<String, usize>> {
let mut counts: HashMap<String, usize> = HashMap::new();
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, COUNT(*)
FROM builds b
JOIN packages p ON b.failed_dep_id = p.id
WHERE b.outcome = ?1
GROUP BY b.failed_dep_id",
)?;
// Only IndirectFailed rows carry a meaningful failed_dep_id.
let rows = stmt.query_map([PackageStateKind::IndirectFailed as i32], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
})?;
for row in rows {
let (pkgname, count) = row?;
counts.insert(pkgname, count as usize);
}
Ok(counts)
}
/// Add `duration` to the running total stored under the
/// 'build_duration_ms' metadata key.
pub fn add_build_duration(&self, duration: Duration) -> Result<()> {
    let total = self.get_build_duration()? + duration;
    let total_ms = total.as_millis() as i64;
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('build_duration_ms', ?1)",
        params![total_ms.to_string()],
    )?;
    Ok(())
}
/// Accumulated build duration from metadata; zero when never recorded.
/// NOTE(review): both the trailing unwrap_or and the inner parse fallback
/// swallow database/parse errors into 0 — looks intentional (best-effort
/// counter), but confirm.
pub fn get_build_duration(&self) -> Result<Duration> {
let ms: i64 = self
.conn
.query_row(
"SELECT COALESCE(
(SELECT value FROM metadata WHERE key = 'build_duration_ms'),
'0'
)",
[],
|row| {
let s: String = row.get(0)?;
Ok(s.parse::<i64>().unwrap_or(0))
},
)
.unwrap_or(0);
Ok(Duration::from_millis(ms as u64))
}
/// Root-cause packages blocking `package_id`: walk its (transitive)
/// dependencies that have a non-success build outcome and return only those
/// whose outcome is a direct failure/pre-fail/pre-skip/unresolved state, as
/// (pkgname, pkgpath, outcome name) sorted by pkgname.
pub fn get_blockers(&self, package_id: i64) -> Result<Vec<(String, String, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
blocking(id, outcome) AS (
-- Direct dependencies that have failed/skipped builds
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE rd.package_id = ?1
AND b.outcome NOT IN (?2, ?3)
UNION
-- Transitive: deps of deps that are blocked
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN blocking bl ON rd.package_id = bl.id
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE b.outcome NOT IN (?2, ?3)
)
SELECT DISTINCT p.pkgname, p.pkgpath, bl.outcome
FROM blocking bl
JOIN packages p ON bl.id = p.id
-- Only show root causes (failed or prefailed/preskipped), not indirect
WHERE bl.outcome IN (?4, ?5, ?6, ?7)
ORDER BY p.pkgname",
)?;
// ?2/?3 are the "not blocking" outcomes; ?4-?7 the root-cause outcomes.
let rows = stmt.query_map(
params![
package_id,
PackageStateKind::Success as i32,
PackageStateKind::UpToDate as i32,
PackageStateKind::Failed as i32,
PackageStateKind::PreFailed as i32,
PackageStateKind::PreSkipped as i32,
PackageStateKind::Unresolved as i32,
],
|row| {
let pkgname: String = row.get(0)?;
let pkgpath: String = row.get(1)?;
let outcome_id: i32 = row.get(2)?;
// Map unknown outcome ids to a column-conversion error (column 2).
let kind = PackageStateKind::from_repr(outcome_id).ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Integer,
format!("unknown outcome type id: {}", outcome_id).into(),
)
})?;
let status: &str = kind.into();
Ok((pkgname, pkgpath, status.to_string()))
},
)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// All packages that (transitively) depend on `package_id`, as
/// (pkgname, pkgpath) sorted by pkgname.
pub fn get_blocked_by(&self, package_id: i64) -> Result<Vec<(String, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
affected(id) AS (
-- Direct reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
WHERE rd.depends_on_id = ?1
UNION
-- Transitive reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT p.pkgname, p.pkgpath
FROM affected a
JOIN packages p ON a.id = p.id
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([package_id], |row| Ok((row.get(0)?, row.get(1)?)))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Packages declared failed/skipped at scan time (skip_reason/fail_reason
/// set) that never reached the build phase (no builds row), mapped to
/// PreFailed/PreSkipped states. fail_reason wins when both are set.
pub fn get_prefailskip_packages(&self) -> Result<Vec<(String, Option<String>, PackageState)>> {
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, p.pkgpath, p.skip_reason, p.fail_reason
FROM packages p
WHERE (p.skip_reason IS NOT NULL OR p.fail_reason IS NOT NULL)
AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = p.id)
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([], |row| {
let pkgname: String = row.get(0)?;
let pkgpath: Option<String> = row.get(1)?;
let skip: Option<String> = row.get(2)?;
let fail: Option<String> = row.get(3)?;
let state = match (fail, skip) {
(Some(r), _) => PackageState::PreFailed(r),
(_, Some(r)) => PackageState::PreSkipped(r),
// Unreachable given the WHERE clause, but kept total for safety.
_ => PackageState::PreFailed(String::new()),
};
Ok((pkgname, pkgpath, state))
})?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Packages that would fail only because a dependency failed: in the
/// transitive closure of the failed set, not themselves failed, with no
/// builds row and no scan-time skip/fail reason. Returns
/// (pkgname, pkgpath, comma-joined root-cause pkgnames) sorted by pkgname.
pub fn get_indirect_failures(&self) -> Result<Vec<(String, Option<String>, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
-- Only direct failures are root causes
failed_pkgs(id) AS (
SELECT package_id FROM builds
WHERE outcome IN (?1, ?2)
),
-- Packages affected by failures (transitive closure)
affected(id, root_id) AS (
SELECT id, id FROM failed_pkgs
UNION
SELECT rd.package_id, a.root_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
WHERE rd.package_id NOT IN (SELECT id FROM failed_pkgs)
)
SELECT p.pkgname, p.pkgpath, GROUP_CONCAT(DISTINCT fp.pkgname) as failed_deps
FROM affected a
JOIN packages p ON a.id = p.id
JOIN packages fp ON a.root_id = fp.id
WHERE a.id != a.root_id
AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = a.id)
AND p.skip_reason IS NULL
AND p.fail_reason IS NULL
GROUP BY p.id, p.pkgname, p.pkgpath
ORDER BY p.pkgname",
)?;
// Root causes are direct failures and scan-time pre-failures.
let rows = stmt.query_map(
params![
PackageStateKind::Failed as i32,
PackageStateKind::PreFailed as i32,
],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
pub fn mark_failure_cascade(
&self,
package_id: i64,
reason: &str,
duration: Duration,
) -> Result<usize> {
let pkgname = self.get_pkgname(package_id)?;
let mut stmt = self.conn.prepare(
"WITH RECURSIVE affected(id, depth) AS (
SELECT ?1, 0
UNION
SELECT rd.package_id, a.depth + 1
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id, depth FROM affected ORDER BY depth",
)?;
let affected: Vec<(i64, i32)> = stmt
.query_map([package_id], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?))
})?
.filter_map(|r| r.ok())
.collect();
let tx = self.transaction()?;
for (id, depth) in &affected {
let (outcome, detail, dur, dep_id) = if *depth == 0 {
(
PackageStateKind::Failed as i32,
reason.to_string(),
duration.as_millis() as i64,
None,
)
} else {
(
PackageStateKind::IndirectFailed as i32,
format!("depends on failed {}", pkgname),
0,
Some(package_id),
)
};
self.conn.execute(
"INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, duration_ms,
failed_dep_id)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![id, outcome, detail, dur, dep_id],
)?;
}
tx.commit()?;
debug!(
package_id = package_id,
affected_count = affected.len(),
"Marked failure cascade"
);
Ok(affected.len())
}
/// The build_id recorded in metadata when the schema was created.
pub fn build_id(&self) -> Result<String> {
    let id = self.conn.query_row(
        "SELECT value FROM metadata WHERE key = 'build_id'",
        [],
        |row| row.get(0),
    );
    id.context("build_id not found in database")
}
pub fn full_scan_complete(&self) -> bool {
self.conn
.query_row(
"SELECT value FROM metadata WHERE key = 'full_scan_complete'",
[],
|row| row.get::<_, String>(0),
)
.map(|v| v == "true")
.unwrap_or(false)
}
/// Mark the full scan as complete in metadata (idempotent).
pub fn set_full_scan_complete(&self) -> Result<()> {
    let sql =
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Remove the full-scan-complete marker from metadata.
pub fn clear_full_scan_complete(&self) -> Result<()> {
    let sql = "DELETE FROM metadata WHERE key = 'full_scan_complete'";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Persist the pkgsrc environment snapshot as JSON under the 'pkgsrc_env'
/// metadata key.
pub fn store_pkgsrc_env(&self, env: &PkgsrcEnv) -> Result<()> {
    let json = serde_json::json!({
        "packages": env.packages,
        "pkgtools": env.pkgtools,
        "prefix": env.prefix,
        "pkg_dbdir": env.pkg_dbdir,
        "pkg_refcount_dbdir": env.pkg_refcount_dbdir,
        "metadata": env.metadata,
        "cachevars": env.cachevars,
    });
    // Use INSERT OR REPLACE (like store_vcs_info/add_build_duration): a plain
    // INSERT would hit the metadata primary-key constraint and fail whenever
    // the environment is stored a second time.
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('pkgsrc_env', ?1)",
        params![json.to_string()],
    )?;
    Ok(())
}
/// Reconstruct a `PkgsrcEnv` from the JSON blob written by
/// `store_pkgsrc_env`.
///
/// Path-valued keys are mandatory; the two map-valued keys fall back
/// to empty maps when absent or malformed.
pub fn load_pkgsrc_env(&self) -> Result<PkgsrcEnv> {
    let raw: String = self
        .conn
        .query_row(
            "SELECT value FROM metadata WHERE key = 'pkgsrc_env'",
            [],
            |row| row.get(0),
        )
        .context("pkgsrc environment not found in database")?;
    let doc: serde_json::Value =
        serde_json::from_str(&raw).context("Invalid pkgsrc_env JSON")?;
    // Required path-valued entry; a missing or non-string key is an
    // error.
    let path_field = |key: &str| -> Result<PathBuf> {
        match doc.get(key).and_then(|v| v.as_str()) {
            Some(s) => Ok(PathBuf::from(s)),
            None => Err(anyhow::anyhow!("Missing {} in pkgsrc_env", key)),
        }
    };
    // Optional string-map entry; degrades to an empty map.
    let map_field = |key: &str| -> HashMap<String, String> {
        doc.get(key)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
            .unwrap_or_default()
    };
    Ok(PkgsrcEnv {
        packages: path_field("packages")?,
        pkgtools: path_field("pkgtools")?,
        prefix: path_field("prefix")?,
        pkg_dbdir: path_field("pkg_dbdir")?,
        pkg_refcount_dbdir: path_field("pkg_refcount_dbdir")?,
        metadata: map_field("metadata"),
        cachevars: map_field("cachevars"),
    })
}
/// Persist VCS information (serialized as JSON) under the 'vcs_info'
/// metadata key, replacing any previous value.
pub fn store_vcs_info(&self, info: &crate::vcs::VcsInfo) -> Result<()> {
    let serialized = serde_json::to_string(info)?;
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('vcs_info', ?1)",
        params![serialized],
    )?;
    Ok(())
}
/// Load the VCS information written by `store_vcs_info`.
pub fn load_vcs_info(&self) -> Result<crate::vcs::VcsInfo> {
    let raw = self
        .conn
        .query_row(
            "SELECT value FROM metadata WHERE key = 'vcs_info'",
            [],
            |row| row.get::<_, String>(0),
        )
        .context("vcs_info not found in database")?;
    serde_json::from_str(&raw).context("Invalid vcs_info JSON")
}
pub fn get_failed_packages(&self) -> Result<Vec<String>> {
let mut stmt = self.conn.prepare(
"SELECT p.pkgname FROM builds b
JOIN packages p ON b.package_id = p.id
WHERE b.outcome = ?1
ORDER BY p.pkgname",
)?;
let pkgnames = stmt
.query_map([PackageStateKind::Failed as i32], |row| {
row.get::<_, String>(0)
})?
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(pkgnames)
}
/// Pkgnames of packages that either built successfully or were
/// already up to date, sorted alphabetically.
pub fn get_successful_packages(&self) -> Result<Vec<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname FROM builds b
        JOIN packages p ON b.package_id = p.id
        WHERE b.outcome IN (?1, ?2)
        ORDER BY p.pkgname",
    )?;
    let rows = stmt.query_map(
        params![
            PackageStateKind::Success as i32,
            PackageStateKind::UpToDate as i32,
        ],
        |row| row.get::<_, String>(0),
    )?;
    let mut pkgnames = Vec::new();
    for name in rows {
        pkgnames.push(name?);
    }
    Ok(pkgnames)
}
/// Pkgnames whose scan data carries a non-empty NO_BIN_ON_FTP value,
/// i.e. packages whose binaries are restricted from distribution.
pub fn get_restricted_packages(&self) -> Result<HashSet<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT pkgname FROM packages
        WHERE json_extract(scan_data, '$.NO_BIN_ON_FTP') IS NOT NULL
        AND json_extract(scan_data, '$.NO_BIN_ON_FTP') != ''",
    )?;
    let mut restricted = HashSet::new();
    for name in stmt.query_map([], |row| row.get::<_, String>(0))? {
        restricted.insert(name?);
    }
    Ok(restricted)
}
/// All packages with their (optional) build outcome, one `ReportRow`
/// per package, sorted by pkgname. Packages never built appear with
/// NULL outcome columns via the LEFT JOIN.
pub fn get_report_data(&self) -> Result<Vec<ReportRow>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkgpath, p.scan_data, b.outcome, b.outcome_detail
        FROM packages p
        LEFT JOIN builds b ON b.package_id = p.id
        ORDER BY p.pkgname",
    )?;
    let mapped = stmt.query_map([], |row| {
        let pkgname: String = row.get(0)?;
        let pkgpath: Option<String> = row.get(1)?;
        let scan_data: Option<String> = row.get(2)?;
        let outcome: Option<i32> = row.get(3)?;
        let detail: Option<String> = row.get(4)?;
        Ok((pkgname, pkgpath, scan_data, outcome, detail))
    })?;
    mapped
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(Into::into)
}
/// Execute an arbitrary user-supplied SQL statement.
///
/// Statements returning no columns (INSERT/UPDATE/DELETE/DDL) print
/// the affected-row count; statements with result columns print each
/// row as '|'-separated values to stdout.
pub fn execute_raw(&self, sql: &str) -> Result<()> {
    let mut stmt = self.conn.prepare(sql)?;
    let column_count = stmt.column_count();
    if column_count == 0 {
        // No result set: run the statement and report row changes.
        let affected = stmt.execute([])?;
        if affected > 0 {
            println!("{} row(s) affected", affected);
        }
    } else {
        let mut rows = stmt.query([])?;
        while let Some(row) = rows.next()? {
            // Render every column as text: NULL becomes an empty
            // string and blobs are summarized rather than dumped; a
            // failed get_ref also renders as empty.
            let values: Vec<String> = (0..column_count)
                .map(|i| {
                    row.get_ref(i)
                        .map(|v| match v {
                            rusqlite::types::ValueRef::Null => String::new(),
                            rusqlite::types::ValueRef::Integer(i) => i.to_string(),
                            rusqlite::types::ValueRef::Real(f) => f.to_string(),
                            rusqlite::types::ValueRef::Text(s) => {
                                String::from_utf8_lossy(s).to_string()
                            }
                            rusqlite::types::ValueRef::Blob(b) => {
                                format!("<blob:{} bytes>", b.len())
                            }
                        })
                        .unwrap_or_default()
                })
                .collect();
            // try_println reports false when stdout is unavailable
            // (e.g. a closed pipe); stop printing instead of erroring.
            if !try_println(&values.join("|")) {
                break;
            }
        }
    }
    Ok(())
}
/// Lazily open (and cache) the connection to the history database.
pub(crate) fn history_conn(&self) -> Result<&Connection> {
    if let Some(conn) = self.history_conn.get() {
        return Ok(conn);
    }
    let conn = open_history_conn(&self.dbdir)?;
    // The cell is still empty here (std::cell::OnceCell is
    // single-threaded and we just checked), so get_or_init stores
    // `conn` and returns a reference to it.
    Ok(self.history_conn.get_or_init(|| conn))
}
/// Append one build record to the history database.
pub fn record_history(&self, rec: &crate::History) -> Result<()> {
    record_history_to(self.history_conn()?, rec)
}
pub fn query_history(&self, pattern: Option<®ex::Regex>) -> Result<Vec<crate::History>> {
let conn = self.history_conn()?;
let cols: String = HistoryKind::VARIANTS
.iter()
.map(|v| format!("bh.{}", <&str>::from(v)))
.collect::<Vec<_>>()
.join(", ");
let sql = format!(
"SELECT bh.id, {cols}, \
wt.stage, wt.duration, ct.duration \
FROM build_history bh \
LEFT JOIN wall_times wt ON wt.history_id = bh.id \
LEFT JOIN cpu_times ct ON ct.history_id = bh.id \
AND ct.stage = wt.stage \
WHERE bh.outcome IN ({success}, {failed}) \
ORDER BY bh.timestamp DESC, bh.id DESC",
success = PackageStateKind::Success as i32,
failed = PackageStateKind::Failed as i32,
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, String>(4)?,
row.get::<_, i32>(5)?,
row.get::<_, Option<i32>>(6)?,
row.get::<_, Option<i64>>(7)?.map(|v| v as usize),
row.get::<_, i64>(8)?,
row.get::<_, Option<i64>>(9)?,
row.get::<_, Option<String>>(10)?,
row.get::<_, Option<String>>(11)?,
row.get::<_, Option<i32>>(12)?,
row.get::<_, Option<i64>>(13)?,
row.get::<_, Option<i64>>(14)?,
))
})?;
let mut results: Vec<crate::History> = Vec::new();
let mut current_id: Option<i64> = None;
let mut current_accepted = false;
for row in rows {
let (
id,
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome_id,
stage_id,
make_jobs,
duration,
disk_usage,
wrkobjdir,
build_id,
wt_stage,
wt_duration,
ct_duration,
) = row?;
if Some(id) == current_id {
if current_accepted {
if let Some(last) = results.last_mut() {
if let Some(stage_id) = wt_stage {
let stage = Stage::from_repr(stage_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", stage_id))?;
if let Some(ms) = wt_duration {
last.stage_durations
.push((stage, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
last.stage_cpu_times
.push((stage, Duration::from_millis(ms as u64)));
}
}
}
}
continue;
}
current_id = Some(id);
current_accepted = false;
if let Some(re) = pattern {
if !re.is_match(&pkgpath) && !re.is_match(&pkgname) {
continue;
}
}
current_accepted = true;
let outcome = PackageState::from_db(outcome_id, None)
.ok_or_else(|| anyhow::anyhow!("Unknown outcome type id: {}", outcome_id))?;
let stage = stage_id
.map(|id| {
Stage::from_repr(id).ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", id))
})
.transpose()?;
let mut stage_durations = Vec::new();
let mut stage_cpu_times = Vec::new();
if let Some(st_id) = wt_stage {
let st = Stage::from_repr(st_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", st_id))?;
if let Some(ms) = wt_duration {
stage_durations.push((st, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
stage_cpu_times.push((st, Duration::from_millis(ms as u64)));
}
}
results.push(crate::History {
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome,
stage,
make_jobs,
duration: Duration::from_millis(duration as u64),
disk_usage: disk_usage.map(|s| s as u64),
wrkobjdir: wrkobjdir.and_then(|s| s.parse().ok()),
stage_durations,
stage_cpu_times,
build_id,
});
}
Ok(results)
}
/// Latest recorded outcome and disk usage for every package, keyed by
/// pkgbase.
///
/// Best-effort: any database error is logged via `warn!` and yields
/// an empty map rather than failing the caller.
pub fn build_history_by_pkg_all(&self) -> HashMap<String, PkgBuildHistory> {
    let conn = match self.history_conn() {
        Ok(c) => c,
        Err(e) => {
            warn!(error = %e, "build_history_by_pkg_all: failed to open history db");
            return HashMap::new();
        }
    };
    // Column names are derived from HistoryKind so the SQL tracks the
    // generated schema.
    let du: &str = HistoryKind::DiskUsage.into();
    let out: &str = HistoryKind::Outcome.into();
    let pkgbase_col: &str = HistoryKind::Pkgbase.into();
    let pkgpath_col: &str = HistoryKind::Pkgpath.into();
    // rn = 1 selects the most recent row (highest id) per
    // (pkgpath, pkgbase) pair.
    let sql = format!(
        "WITH latest AS ( \
        SELECT h.{pkgbase_col}, h.{du}, h.{out}, \
        ROW_NUMBER() OVER ( \
        PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
        ORDER BY h.id DESC \
        ) AS rn \
        FROM build_history h \
        ) \
        SELECT {pkgbase_col}, {out}, {du} FROM latest \
        WHERE rn = 1",
    );
    let mut stmt = match conn.prepare(&sql) {
        Ok(s) => s,
        Err(e) => {
            warn!(error = %e, "build_history_by_pkg_all: failed to prepare query");
            return HashMap::new();
        }
    };
    let rows = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, i32>(1)?,
            row.get::<_, Option<i64>>(2)?,
        ))
    }) {
        Ok(r) => r,
        Err(e) => {
            warn!(error = %e, "build_history_by_pkg_all: query failed");
            return HashMap::new();
        }
    };
    // NOTE(review): if the same pkgbase was built from multiple
    // pkgpaths, later rows overwrite earlier ones here, so which
    // pkgpath's record survives is arbitrary — confirm callers accept
    // that.
    let mut result = HashMap::new();
    for row in rows.flatten() {
        let (pkgbase, outcome, du) = row;
        result.insert(
            pkgbase,
            PkgBuildHistory {
                // Unknown outcome ids map to None rather than erroring.
                outcome: PackageStateKind::from_repr(outcome),
                disk_usage: du.map(|v| v as u64),
            },
        );
    }
    result
}
/// Estimated build cost per pkgbase, in milliseconds, taken from the
/// most recent successful build of each (pkgpath, pkgbase) pair.
///
/// Prefers the recorded Build-stage wall time, falling back to the
/// total duration, then multiplies by make_jobs (default 1).
/// NOTE(review): the multiplication presumably normalizes parallel
/// builds to an approximate single-job cost — confirm with callers.
/// Best-effort: errors are logged and an empty map is returned.
pub fn duration_by_pkg_all(&self) -> HashMap<String, u64> {
    let conn = match self.history_conn() {
        Ok(c) => c,
        Err(e) => {
            warn!(error = %e, "duration_by_pkg_all: failed to open history db");
            return HashMap::new();
        }
    };
    // Column names derived from HistoryKind to track the schema.
    let dur: &str = HistoryKind::Duration.into();
    let out: &str = HistoryKind::Outcome.into();
    let pkgbase_col: &str = HistoryKind::Pkgbase.into();
    let pkgpath_col: &str = HistoryKind::Pkgpath.into();
    let success_outcome = PackageStateKind::Success as i32;
    let build_stage = Stage::Build as i32;
    let jobs: &str = HistoryKind::MakeJobs.into();
    // rn = 1 keeps the newest successful row per (pkgpath, pkgbase).
    let sql = format!(
        "WITH matched AS ( \
        SELECT h.{pkgbase_col}, h.{dur}, h.{jobs}, h.id, \
        ROW_NUMBER() OVER ( \
        PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
        ORDER BY h.id DESC \
        ) AS rn \
        FROM build_history h \
        WHERE h.{out} = {success_outcome} \
        ) \
        SELECT m.{pkgbase_col}, \
        COALESCE(wt.duration, m.{dur}) \
        * COALESCE(m.{jobs}, 1) \
        FROM matched m \
        LEFT JOIN wall_times wt ON wt.history_id = m.id \
        AND wt.stage = {build_stage} \
        WHERE m.rn = 1",
    );
    let mut stmt = match conn.prepare(&sql) {
        Ok(s) => s,
        Err(e) => {
            warn!(error = %e, "duration_by_pkg_all: failed to prepare query");
            return HashMap::new();
        }
    };
    let rows = match stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
    }) {
        Ok(r) => r,
        Err(e) => {
            warn!(error = %e, "duration_by_pkg_all: query failed");
            return HashMap::new();
        }
    };
    let mut result = HashMap::new();
    for row in rows.flatten() {
        let (pkgbase, ms) = row;
        result.insert(pkgbase, ms as u64);
    }
    result
}
/// Persist a batch of CPU usage samples into the history database in
/// a single transaction; an empty batch is a no-op.
pub fn store_cpu_usage(&self, samples: &[crate::cpu::CpuSample]) -> Result<()> {
    if samples.is_empty() {
        return Ok(());
    }
    let conn = self.history_conn()?;
    let tx = conn.unchecked_transaction()?;
    {
        // Scope the cached statement so it is dropped before commit.
        let mut insert = tx.prepare_cached(
            "INSERT INTO cpu_usage (timestamp, user_pct, sys_pct) \
            VALUES (?1, ?2, ?3)",
        )?;
        for sample in samples {
            insert.execute(params![
                sample.timestamp,
                sample.user_pct as i32,
                sample.sys_pct as i32
            ])?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Summarize every build recorded in the history database: package
/// count plus succeeded/failed totals, newest build id first.
pub fn list_history_builds(&self) -> Result<Vec<BuildListEntry>> {
    let conn = self.history_conn()?;
    let success = PackageStateKind::Success as i32;
    let up_to_date = PackageStateKind::UpToDate as i32;
    let failed = PackageStateKind::Failed as i32;
    // For each (build_id, pkgpath) only the newest record (rn = 1)
    // counts; rows with no build_id are ignored.
    let mut stmt = conn.prepare(
        "WITH latest AS ( \
        SELECT build_id, outcome, \
        ROW_NUMBER() OVER ( \
        PARTITION BY build_id, pkgpath \
        ORDER BY id DESC \
        ) AS rn \
        FROM build_history \
        WHERE build_id IS NOT NULL \
        ) \
        SELECT build_id, COUNT(*) AS packages, \
        SUM(CASE WHEN outcome IN (?1, ?2) THEN 1 ELSE 0 END), \
        SUM(CASE WHEN outcome = ?3 THEN 1 ELSE 0 END) \
        FROM latest \
        WHERE rn = 1 \
        GROUP BY build_id \
        ORDER BY build_id DESC",
    )?;
    let rows = stmt.query_map(params![success, up_to_date, failed], |row| {
        Ok(BuildListEntry {
            build_id: row.get(0)?,
            package_count: row.get::<_, i64>(1)? as usize,
            // Success and UpToDate both count as succeeded.
            succeeded: row.get::<_, i64>(2)? as usize,
            failed: row.get::<_, i64>(3)? as usize,
        })
    })?;
    rows.collect::<Result<Vec<_>, _>>()
        .context("Failed to list history builds")
}
/// Record the VCS revision associated with a build id, overwriting
/// any earlier value for the same build.
pub fn store_build_revision(&self, build_id: &str, revision: &str) -> Result<()> {
    self.history_conn()?.execute(
        "INSERT OR REPLACE INTO build_metadata (build_id, revision) VALUES (?1, ?2)",
        params![build_id, revision],
    )?;
    Ok(())
}
/// Look up the VCS revision recorded for `build_id`; `None` when no
/// row exists for that build.
pub fn get_build_revision(&self, build_id: &str) -> Result<Option<String>> {
    let conn = self.history_conn()?;
    let lookup = conn.query_row(
        "SELECT revision FROM build_metadata WHERE build_id = ?1",
        [build_id],
        |row| row.get::<_, String>(0),
    );
    match lookup {
        Ok(rev) => Ok(Some(rev)),
        // A missing row is not an error, just an absent revision.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}
/// Compare the latest per-pkgpath outcomes of two builds and classify
/// every difference as a new failure, a fix, a version change, or an
/// other change.
pub fn compute_build_diff(&self, build1_id: &str, build2_id: &str) -> Result<BuildDiff> {
    let conn = self.history_conn()?;
    // (pkgname, outcome id, optional stage id)
    type PkgRecord = (String, i32, Option<i32>);
    // Latest record per pkgpath for one build; rn = 1 keeps the row
    // with the highest id.
    let query_build = |bid: &str| -> Result<HashMap<String, PkgRecord>> {
        let mut stmt = conn.prepare(
            "SELECT pkgpath, pkgname, outcome, stage FROM ( \
            SELECT pkgpath, pkgname, outcome, stage, \
            ROW_NUMBER() OVER (PARTITION BY pkgpath ORDER BY id DESC) AS rn \
            FROM build_history WHERE build_id = ?1 \
            ) WHERE rn = 1",
        )?;
        let mut map = HashMap::new();
        let rows = stmt.query_map([bid], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, i32>(2)?,
                row.get::<_, Option<i32>>(3)?,
            ))
        })?;
        for row in rows {
            let (pkgpath, pkgname, outcome, stage) = row?;
            map.insert(pkgpath, (pkgname, outcome, stage));
        }
        Ok(map)
    };
    let b1 = query_build(build1_id)?;
    let b2 = query_build(build2_id)?;
    let mut diff = BuildDiff {
        build1_id: build1_id.to_string(),
        build2_id: build2_id.to_string(),
        new_failures: Vec::new(),
        fixes: Vec::new(),
        version_changes: Vec::new(),
        other_changes: Vec::new(),
    };
    use PackageStateKind::*;
    // Pass 1: packages present in build 2.
    for (pkgpath, (b2_pkgname, b2_outcome_id, b2_stage_id)) in &b2 {
        let b2_outcome = PackageStateKind::from_repr(*b2_outcome_id);
        let b2_stage = b2_stage_id.and_then(Stage::from_repr);
        // Build a DiffEntry from this package's build-2 record plus an
        // optional build-1 record.
        let entry_from = |b1_pkg: Option<&PkgRecord>| DiffEntry {
            pkgpath: pkgpath.clone(),
            build1_pkgname: b1_pkg.map(|(n, _, _)| n.clone()),
            build2_pkgname: Some(b2_pkgname.clone()),
            build1_outcome: b1_pkg.and_then(|(_, o, _)| PackageStateKind::from_repr(*o)),
            build2_outcome: b2_outcome,
            build1_stage: b1_pkg.and_then(|(_, _, s)| s.and_then(Stage::from_repr)),
            build2_stage: b2_stage,
        };
        match b1.get(pkgpath) {
            Some(b1_rec) => {
                let b1_outcome = PackageStateKind::from_repr(b1_rec.1);
                let b1_stage = b1_rec.2.and_then(Stage::from_repr);
                // Identical records are not part of the diff.
                if b1_outcome == b2_outcome && b1_rec.0 == *b2_pkgname && b1_stage == b2_stage {
                    continue;
                }
                let entry = entry_from(Some(b1_rec));
                match (b1_outcome, b2_outcome) {
                    (Some(b1o), Some(b2o)) => {
                        let was_failed = matches!(b1o, Failed);
                        let now_failed = matches!(b2o, Failed);
                        let now_ok = matches!(b2o, Success | UpToDate);
                        if !was_failed && now_failed {
                            diff.new_failures.push(entry);
                        } else if was_failed && now_ok {
                            diff.fixes.push(entry);
                        } else if was_failed && now_failed {
                            // Failing in both builds but the record
                            // changed (pkgname or stage): classified
                            // as a version change.
                            diff.version_changes.push(entry);
                        } else {
                            diff.other_changes.push(entry);
                        }
                    }
                    // Unrecognized outcome id on either side.
                    _ => diff.other_changes.push(entry),
                }
            }
            None => {
                // Package is new in build 2.
                let entry = entry_from(None);
                if matches!(b2_outcome, Some(Failed)) {
                    diff.new_failures.push(entry);
                } else {
                    diff.other_changes.push(entry);
                }
            }
        }
    }
    // Pass 2: packages present in build 1 but absent from build 2.
    for (pkgpath, (b1_pkgname, b1_outcome_id, b1_stage_id)) in &b1 {
        if b2.contains_key(pkgpath) {
            continue;
        }
        let b1_outcome = PackageStateKind::from_repr(*b1_outcome_id);
        let entry = DiffEntry {
            pkgpath: pkgpath.clone(),
            build1_pkgname: Some(b1_pkgname.clone()),
            build2_pkgname: None,
            build1_outcome: b1_outcome,
            build2_outcome: None,
            build1_stage: b1_stage_id.and_then(Stage::from_repr),
            build2_stage: None,
        };
        // A failed package that vanished counts as fixed (it is no
        // longer failing); anything else is just a removal.
        if matches!(b1_outcome, Some(Failed)) {
            diff.fixes.push(entry);
        } else {
            diff.other_changes.push(entry);
        }
    }
    Ok(diff)
}
}
/// One package's difference between two builds, as computed by
/// `compute_build_diff`. `build1_*` fields are `None` when the
/// package is new in build 2; `build2_*` fields are `None` when the
/// package disappeared after build 1.
pub struct DiffEntry {
    pub pkgpath: String,
    pub build1_pkgname: Option<String>,
    pub build2_pkgname: Option<String>,
    pub build1_outcome: Option<PackageStateKind>,
    pub build2_outcome: Option<PackageStateKind>,
    pub build1_stage: Option<Stage>,
    pub build2_stage: Option<Stage>,
}
/// Result of comparing two builds: entries bucketed into new
/// failures, fixes, version changes (still failing but changed), and
/// everything else.
pub struct BuildDiff {
    pub build1_id: String,
    pub build2_id: String,
    pub new_failures: Vec<DiffEntry>,
    pub fixes: Vec<DiffEntry>,
    pub version_changes: Vec<DiffEntry>,
    pub other_changes: Vec<DiffEntry>,
}
/// Per-build summary row produced by `list_history_builds`.
pub struct BuildListEntry {
    pub build_id: String,
    /// Distinct packages recorded for this build.
    pub package_count: usize,
    /// Packages whose latest outcome was Success or UpToDate.
    pub succeeded: usize,
    /// Packages whose latest outcome was Failed.
    pub failed: usize,
}
/// Verify that an existing history.db matches the expected schema
/// version. A missing file, or a database without a schema_version
/// table, passes; a version mismatch is an error telling the user to
/// remove the file.
fn check_history_schema(dbdir: &Path) -> Result<()> {
    let db_path = dbdir.join("history.db");
    if !db_path.exists() {
        return Ok(());
    }
    let conn = Connection::open(&db_path).context("Failed to open history database")?;
    let table_count: i32 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
        WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if table_count == 0 {
        // No schema yet: nothing to validate.
        return Ok(());
    }
    let version: i32 =
        conn.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
            row.get(0)
        })?;
    if version == HISTORY_SCHEMA_VERSION {
        return Ok(());
    }
    anyhow::bail!(
        "History schema mismatch: found v{version}, expected v{expected}. \
        Remove {} to reset.",
        db_path.display(),
        expected = HISTORY_SCHEMA_VERSION
    )
}
/// Open history.db inside `dbdir`, apply the connection pragmas, and
/// create the schema on first use.
fn open_history_conn(dbdir: &Path) -> Result<Connection> {
    let conn =
        Connection::open(dbdir.join("history.db")).context("Failed to open history database")?;
    conn.execute_batch(
        "PRAGMA journal_mode = WAL;
        PRAGMA synchronous = NORMAL;
        PRAGMA cache_size = -8000;
        PRAGMA temp_store = MEMORY;
        PRAGMA foreign_keys = ON;",
    )?;
    // The presence of the schema_version table tells us whether the
    // schema has been created yet.
    let schema_tables: i32 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
        WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if schema_tables == 0 {
        create_history_schema(&conn)?;
    }
    Ok(conn)
}
/// Create the full history database schema on a fresh connection.
///
/// The outcome_types / stage_types lookup tables and the
/// build_history column list are generated from the PackageState,
/// Stage and HistoryKind enums, so the schema always matches the
/// code. The schema_version row is checked by check_history_schema.
pub(crate) fn create_history_schema(conn: &Connection) -> Result<()> {
    conn.execute_batch(&format!(
        "CREATE TABLE schema_version (version INTEGER NOT NULL);
        INSERT INTO schema_version (version) VALUES ({});
        CREATE TABLE outcome_types (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE NOT NULL
        );
        INSERT INTO outcome_types (id, name) VALUES {outcome_types};
        CREATE TABLE stage_types (
        id INTEGER PRIMARY KEY,
        name TEXT UNIQUE NOT NULL
        );
        INSERT INTO stage_types (id, name) VALUES {stages};
        CREATE TABLE build_history (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        {history_columns}
        );
        CREATE INDEX idx_history_pkgpath
        ON build_history(pkgpath);
        CREATE INDEX idx_history_pkgpath_pkgbase
        ON build_history(pkgpath, pkgbase);
        CREATE INDEX idx_history_pkgpath_outcome
        ON build_history(pkgpath, outcome);
        CREATE INDEX idx_history_timestamp
        ON build_history(timestamp);
        CREATE INDEX idx_history_build_id
        ON build_history(build_id, pkgpath, id);
        CREATE TABLE wall_times (
        history_id INTEGER NOT NULL
        REFERENCES build_history(id) ON DELETE CASCADE,
        stage INTEGER NOT NULL REFERENCES stage_types(id),
        duration INTEGER NOT NULL,
        PRIMARY KEY (history_id, stage)
        );
        CREATE TABLE cpu_times (
        history_id INTEGER NOT NULL
        REFERENCES build_history(id) ON DELETE CASCADE,
        stage INTEGER NOT NULL REFERENCES stage_types(id),
        duration INTEGER NOT NULL,
        PRIMARY KEY (history_id, stage)
        );
        CREATE TABLE cpu_usage (
        timestamp INTEGER NOT NULL,
        user_pct INTEGER NOT NULL,
        sys_pct INTEGER NOT NULL
        );
        CREATE INDEX idx_cpu_timestamp ON cpu_usage(timestamp);
        CREATE TABLE build_metadata (
        build_id TEXT PRIMARY KEY,
        revision TEXT
        );",
        HISTORY_SCHEMA_VERSION,
        outcome_types = PackageState::db_values(),
        stages = stage_values(),
        history_columns = history_schema(),
    ))?;
    Ok(())
}
/// Insert one `History` record into `conn`, including its per-stage
/// wall-clock and CPU timings.
pub(crate) fn record_history_to(conn: &Connection, rec: &crate::History) -> Result<()> {
    // Column names come from HistoryKind::VARIANTS; the params! list
    // below must stay in the same order as the enum declaration
    // (timestamp, pkgpath, pkgname, pkgbase, outcome, stage,
    // make_jobs, duration, disk_usage, wrkobjdir, build_id) —
    // keep them in sync when adding variants.
    let cols: Vec<&str> = HistoryKind::VARIANTS.iter().map(<&str>::from).collect();
    let placeholders: String = (1..=cols.len())
        .map(|i| format!("?{}", i))
        .collect::<Vec<_>>()
        .join(", ");
    let sql = format!(
        "INSERT INTO build_history ({}) VALUES ({})",
        cols.join(", "),
        placeholders,
    );
    // Durations are stored as integer milliseconds.
    let dur_ms = |d: Duration| d.as_millis() as i64;
    conn.execute(
        &sql,
        params![
            rec.timestamp,
            rec.pkgpath,
            rec.pkgname,
            rec.pkgbase,
            rec.outcome.db_id(),
            rec.stage.map(|s| s as i32),
            rec.make_jobs.map(|j| j as i64),
            dur_ms(rec.duration),
            rec.disk_usage.map(|s| s as i64),
            rec.wrkobjdir.as_ref().map(|k| k.to_string()),
            rec.build_id.as_deref(),
        ],
    )?;
    // Stage timing rows reference the freshly inserted history row.
    let history_id = conn.last_insert_rowid();
    if !rec.stage_durations.is_empty() {
        let mut stmt = conn.prepare_cached(
            "INSERT INTO wall_times \
            (history_id, stage, duration) \
            VALUES (?1, ?2, ?3)",
        )?;
        for &(stage, duration) in &rec.stage_durations {
            stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
        }
    }
    if !rec.stage_cpu_times.is_empty() {
        let mut stmt = conn.prepare_cached(
            "INSERT INTO cpu_times \
            (history_id, stage, duration) \
            VALUES (?1, ?2, ?3)",
        )?;
        for &(stage, duration) in &rec.stage_cpu_times {
            stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
        }
    }
    Ok(())
}
/// A package marked selected = 1 in the packages table, as returned
/// by `query_selected_packages`.
pub(crate) struct SelectedPackage {
    pub id: i64,
    pub pkgname: PkgName,
    pub pkgpath: String,
    /// Scheduling weight (pbulk_weight column).
    pub pbulk_weight: usize,
    /// Whether the package can safely be built with MAKE_JOBS > 1.
    pub make_jobs_safe: bool,
}
/// Load every package with selected = 1 from the packages table.
pub(crate) fn query_selected_packages(conn: &Connection) -> Result<Vec<SelectedPackage>> {
    let mut stmt = conn.prepare(
        "SELECT id, pkgname, pkgpath, pbulk_weight, make_jobs_safe \
        FROM packages WHERE selected = 1",
    )?;
    let mapped = stmt.query_map([], |row| {
        let pkgname: String = row.get(1)?;
        let weight: i32 = row.get(3)?;
        Ok(SelectedPackage {
            id: row.get(0)?,
            pkgname: PkgName::from(pkgname),
            pkgpath: row.get(2)?,
            pbulk_weight: weight as usize,
            make_jobs_safe: row.get(4)?,
        })
    })?;
    let mut selected = Vec::new();
    for pkg in mapped {
        selected.push(pkg?);
    }
    Ok(selected)
}
/// All (package_id, depends_on_id) edges of the resolved dependency
/// graph.
pub(crate) fn query_resolved_deps(conn: &Connection) -> Result<Vec<(i64, i64)>> {
    let mut stmt = conn.prepare("SELECT package_id, depends_on_id FROM resolved_depends")?;
    let rows = stmt.query_map([], |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)))?;
    let mut edges = Vec::new();
    for edge in rows {
        edges.push(edge?);
    }
    Ok(edges)
}
/// CPU time (milliseconds) spent in the Build stage of the most
/// recent successful build, keyed by (pkgpath, pkgbase) in
/// `query_build_stage_timings`.
pub(crate) struct BuildStageTiming {
    pub cpu_ms: u64,
}
/// Build-stage CPU time of the most recent successful build for every
/// (pkgpath, pkgbase) pair.
///
/// Best-effort: database errors are logged via `warn!` and an empty
/// map is returned. Pairs with no recorded cpu_times row for the
/// Build stage are absent from the result (inner JOIN).
pub(crate) fn query_build_stage_timings(
    conn: &Connection,
) -> HashMap<(String, String), BuildStageTiming> {
    // Column names derived from HistoryKind so the SQL tracks the
    // generated schema.
    let out: &str = HistoryKind::Outcome.into();
    let pkgbase_col: &str = HistoryKind::Pkgbase.into();
    let pkgpath_col: &str = HistoryKind::Pkgpath.into();
    let success_outcome = PackageStateKind::Success as i32;
    let build_stage = Stage::Build as i32;
    // rn = 1 keeps the newest successful row per (pkgpath, pkgbase).
    let sql = format!(
        "WITH matched AS ( \
        SELECT h.{pkgpath_col}, h.{pkgbase_col}, h.id, \
        ROW_NUMBER() OVER ( \
        PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
        ORDER BY h.id DESC \
        ) AS rn \
        FROM build_history h \
        WHERE h.{out} = {success_outcome} \
        ) \
        SELECT m.{pkgpath_col}, m.{pkgbase_col}, ct.duration \
        FROM matched m \
        JOIN cpu_times ct ON ct.history_id = m.id \
        AND ct.stage = {build_stage} \
        WHERE m.rn = 1",
    );
    let mut stmt = match conn.prepare(&sql) {
        Ok(s) => s,
        Err(e) => {
            warn!(error = %e, "query_build_stage_timings: failed to prepare query");
            return HashMap::new();
        }
    };
    let rows = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, i64>(2)?,
        ))
    }) {
        Ok(r) => r,
        Err(e) => {
            warn!(error = %e, "query_build_stage_timings: query failed");
            return HashMap::new();
        }
    };
    let mut result = HashMap::new();
    for row in rows.flatten() {
        let (pkgpath, pkgbase, cpu_ms) = row;
        result.insert(
            (pkgpath, pkgbase),
            BuildStageTiming {
                cpu_ms: cpu_ms as u64,
            },
        );
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::build::Stage;
    // Fresh in-memory history database with the full schema applied.
    fn test_history_db() -> Connection {
        let conn = Connection::open_in_memory().expect("failed to open in-memory db");
        create_history_schema(&conn).expect("failed to create schema");
        conn
    }
    // Record one successful build with a single Build-stage
    // wall-clock/CPU timing pair.
    fn insert_build(conn: &Connection, pkgpath: &str, pkgname: &str, wall_ms: u64, cpu_ms: u64) {
        use crate::{History, PackageState};
        use std::time::Duration;
        let rec = History {
            timestamp: 0,
            pkgpath: pkgpath.to_string(),
            pkgname: pkgname.to_string(),
            pkgbase: PkgName::new(pkgname).pkgbase().to_string(),
            outcome: PackageState::Success,
            stage: None,
            make_jobs: Some(1),
            duration: Duration::ZERO,
            disk_usage: None,
            wrkobjdir: None,
            stage_durations: vec![(Stage::Build, Duration::from_millis(wall_ms))],
            stage_cpu_times: vec![(Stage::Build, Duration::from_millis(cpu_ms))],
            build_id: None,
        };
        record_history_to(conn, &rec).expect("failed to insert build");
    }
    // Convenience constructor for the (pkgpath, pkgbase) map key.
    fn key(pkgpath: &str, pkgbase: &str) -> (String, String) {
        (pkgpath.to_string(), pkgbase.to_string())
    }
    #[test]
    fn build_stage_timings_basic() {
        let conn = test_history_db();
        insert_build(&conn, "devel/cmake", "cmake-3.28.0", 60000, 180000);
        let timings = query_build_stage_timings(&conn);
        assert_eq!(timings.len(), 1);
        let t = &timings[&key("devel/cmake", "cmake")];
        assert_eq!(t.cpu_ms, 180000);
    }
    // Only the most recent build per (pkgpath, pkgbase) should be
    // reported.
    #[test]
    fn build_stage_timings_latest() {
        let conn = test_history_db();
        insert_build(&conn, "devel/cmake", "cmake-3.27.0", 1000, 2000);
        insert_build(&conn, "devel/cmake", "cmake-3.28.0", 5000, 15000);
        let timings = query_build_stage_timings(&conn);
        assert_eq!(timings.len(), 1);
        let t = &timings[&key("devel/cmake", "cmake")];
        assert_eq!(t.cpu_ms, 15000);
    }
    // Two pkgpaths sharing the same pkgbase must stay separate keys.
    #[test]
    fn build_stage_timings_pkgpath_disambiguates() {
        let conn = test_history_db();
        insert_build(
            &conn,
            "databases/mysql80-client",
            "mysql-client-8.0.1",
            3000,
            9000,
        );
        insert_build(
            &conn,
            "databases/mysql57-client",
            "mysql-client-5.7.42",
            2000,
            4000,
        );
        let timings = query_build_stage_timings(&conn);
        assert_eq!(timings.len(), 2);
        let t80 = &timings[&key("databases/mysql80-client", "mysql-client")];
        assert_eq!(t80.cpu_ms, 9000);
        let t57 = &timings[&key("databases/mysql57-client", "mysql-client")];
        assert_eq!(t57.cpu_ms, 4000);
    }
    #[test]
    fn build_stage_timings_empty() {
        let conn = test_history_db();
        let timings = query_build_stage_timings(&conn);
        assert!(timings.is_empty());
    }
}