use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use indexmap::IndexMap;
use pkgsrc::{AllDepends, PkgName, PkgPath, ScanDepends, ScanIndex};
use rusqlite::{Connection, params};
use tracing::{debug, warn};
use strum::VariantArray;
use crate::build::{BuildResult, PkgBuildStats, Stage};
use crate::config::PkgsrcEnv;
use crate::scan::ScanResult;
use crate::try_println;
use crate::{HistoryKind, PackageState, PackageStateKind};
// One row of report output — presumably (pkgname, scan index, outcome id,
// outcome detail) as assembled by get_report_data; confirm against callers.
pub type ReportRow = (String, ScanIndex, Option<i32>, Option<String>);
fn stage_values() -> String {
Stage::VARIANTS
.iter()
.map(|v| format!("({}, '{}')", *v as i32, v.into_str()))
.collect::<Vec<_>>()
.join(", ")
}
fn history_schema() -> String {
use strum::VariantArray;
HistoryKind::VARIANTS
.iter()
.map(|v| {
let name: &str = v.into();
let sql = match v {
HistoryKind::Pkgpath | HistoryKind::Pkgname | HistoryKind::Pkgbase => {
"TEXT NOT NULL"
}
HistoryKind::Outcome => "INTEGER NOT NULL REFERENCES outcome_types(id)",
HistoryKind::Stage => "INTEGER REFERENCES stage_types(id)",
HistoryKind::Duration | HistoryKind::Timestamp => "INTEGER NOT NULL",
HistoryKind::MakeJobs => "INTEGER",
HistoryKind::DiskUsage => "INTEGER",
HistoryKind::Wrkobjdir => "TEXT",
HistoryKind::BuildId => "TEXT",
};
format!("{name} {sql}")
})
.collect::<Vec<_>>()
.join(",\n ")
}
// Bump when the main bob.db schema changes; a mismatch at open time makes
// init() bail and ask the user to run 'bob clean'.
const SCHEMA_VERSION: i32 = 20260425;
// Version stamp for the separate build-history database schema
// (checked via check_history_schema).
const HISTORY_SCHEMA_VERSION: i32 = 20260406;
/// Per-package data drawn from the build-history database.
#[derive(Clone, Debug)]
pub struct PkgBuildHistory {
// Recorded build outcome, if one was stored.
pub outcome: Option<PackageStateKind>,
// Disk usage recorded for the build — units defined by the history
// writer; TODO confirm (bytes vs KiB).
pub disk_usage: Option<u64>,
// MAKE_JOBS value used for the build, when recorded.
pub make_jobs: Option<u32>,
// WRKOBJDIR used for the build, when recorded.
pub wrkobjdir: Option<String>,
}
/// A subset of `scan_index` columns, deserialized by column name via
/// serde_rusqlite from `SELECT *` queries.
#[derive(Clone, Debug, serde::Deserialize)]
pub struct PackageRow {
// scan_index.id (primary key).
pub id: i64,
pub pkgname: String,
// PKGPATH of the package (scan_index.pkg_location).
pub pkg_location: String,
pub pkg_skip_reason: Option<String>,
pub pkg_fail_reason: Option<String>,
pub bootstrap_pkg: Option<String>,
pub pbulk_weight: Option<String>,
// Space-separated list of versions (stored joined in store_package).
pub multi_version: Option<String>,
}
/// Joined status view of a selected package: scan_index columns plus the
/// build_reason from package_state and any build outcome from builds
/// (see get_all_package_status).
#[derive(Clone, Debug, serde::Deserialize)]
pub struct PackageStatusRow {
pub id: i64,
pub pkgname: String,
pub pkg_location: String,
pub pkg_skip_reason: Option<String>,
pub pkg_fail_reason: Option<String>,
pub build_reason: Option<String>,
pub multi_version: Option<String>,
// NULL when the package has not been built (LEFT JOIN on builds).
pub build_outcome: Option<i32>,
pub outcome_detail: Option<String>,
}
/// Convert a joined builds/scan_index row into a `BuildResult`.
///
/// Returns `Ok(None)` when the stored outcome id does not map to a known
/// `PackageState`. Expects columns: outcome, outcome_detail, pkgname,
/// pkgpath, log_dir, stage, duration_ms.
fn build_result_from_row(row: &rusqlite::Row) -> rusqlite::Result<Option<BuildResult>> {
    let outcome = row.get("outcome")?;
    let detail = row.get("outcome_detail")?;
    let Some(state) = PackageState::from_db(outcome, detail) else {
        return Ok(None);
    };
    let pkgname = PkgName::new(&row.get::<_, String>("pkgname")?);
    let pkgpath = row
        .get::<_, Option<String>>("pkgpath")?
        .and_then(|p| PkgPath::new(&p).ok());
    let log_dir = row.get::<_, Option<String>>("log_dir")?.map(PathBuf::from);
    let stage = row
        .get::<_, Option<i32>>("stage")?
        .and_then(Stage::from_repr);
    let duration = Duration::from_millis(row.get::<_, i64>("duration_ms")? as u64);
    Ok(Some(BuildResult {
        pkgname,
        pkgpath,
        state,
        log_dir,
        build_stats: PkgBuildStats {
            stage,
            duration,
            ..PkgBuildStats::default()
        },
    }))
}
/// Handle to the on-disk bob SQLite database.
pub struct Database {
// Main connection to <dbdir>/bob.db.
conn: Connection,
// Directory containing the database files.
dbdir: PathBuf,
// Lazily-opened connection to the separate build-history database.
history_conn: OnceCell<Connection>,
}
/// RAII guard for an explicit SQL transaction: rolls back on drop unless
/// `commit()` was called.
pub struct TransactionGuard<'a> {
conn: &'a Connection,
// Set by commit(); suppresses the ROLLBACK in Drop.
committed: bool,
}
impl<'a> TransactionGuard<'a> {
    /// Begin a transaction on `conn`. The transaction is rolled back when
    /// the guard is dropped unless `commit` is called first.
    fn new(conn: &'a Connection) -> Result<Self> {
        conn.execute("BEGIN TRANSACTION", [])?;
        let guard = Self {
            conn,
            committed: false,
        };
        Ok(guard)
    }

    /// Commit the transaction and disarm the rollback-on-drop behaviour.
    pub fn commit(mut self) -> Result<()> {
        self.conn.execute("COMMIT", [])?;
        self.committed = true;
        Ok(())
    }
}
impl Drop for TransactionGuard<'_> {
    fn drop(&mut self) {
        // Nothing to do once committed.
        if self.committed {
            return;
        }
        // Best-effort rollback; Drop cannot report errors, so the result
        // is deliberately ignored.
        let _ = self.conn.execute("ROLLBACK", []);
    }
}
impl Database {
/// Open (creating if necessary) the bob database under `dbdir`, apply
/// connection pragmas, and initialize or validate the schema.
pub fn open(dbdir: &Path) -> Result<Self> {
    std::fs::create_dir_all(dbdir).context("Failed to create database directory")?;
    let db_path = dbdir.join("bob.db");
    let conn = Connection::open(db_path).context("Failed to open database")?;
    let db = Self {
        conn,
        dbdir: dbdir.to_path_buf(),
        history_conn: OnceCell::new(),
    };
    db.configure_pragmas()?;
    db.init()?;
    Ok(db)
}
/// Directory containing the database files.
pub fn dbdir(&self) -> &Path {
&self.dbdir
}
/// Borrow the underlying SQLite connection (crate-internal use).
pub(crate) fn conn(&self) -> &Connection {
&self.conn
}
/// Run `sql` with `params` and deserialize every row into `T` by column
/// name via serde_rusqlite, failing on the first bad row.
fn query_rows<T, P>(&self, sql: &str, params: P) -> Result<Vec<T>>
where
    T: for<'de> serde::Deserialize<'de>,
    P: rusqlite::Params,
{
    let mut stmt = self.conn.prepare(sql)?;
    let rows = stmt.query(params)?;
    let items: Vec<T> = serde_rusqlite::from_rows::<T>(rows).collect::<Result<_, _>>()?;
    Ok(items)
}
/// Run `sql` with `params` and deserialize at most one row into `T`;
/// `Ok(None)` when the query matches nothing.
fn query_one<T, P>(&self, sql: &str, params: P) -> Result<Option<T>>
where
    T: for<'de> serde::Deserialize<'de>,
    P: rusqlite::Params,
{
    let mut stmt = self.conn.prepare(sql)?;
    let rows = stmt.query(params)?;
    let mut iter = serde_rusqlite::from_rows::<T>(rows);
    match iter.next() {
        None => Ok(None),
        Some(item) => Ok(Some(item?)),
    }
}
/// Begin an explicit transaction; rolled back on drop unless committed.
pub fn transaction(&self) -> Result<TransactionGuard<'_>> {
TransactionGuard::new(&self.conn)
}
/// Apply per-connection SQLite pragmas: WAL journaling, relaxed fsync,
/// in-memory temp storage, and enforced foreign keys.
fn configure_pragmas(&self) -> Result<()> {
    let pragmas = "PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA temp_store = MEMORY;
PRAGMA foreign_keys = ON;";
    self.conn.execute_batch(pragmas)?;
    Ok(())
}
/// Create the schema on first open, or verify the stored schema version
/// matches this binary's SCHEMA_VERSION; also checks the history database
/// schema. Bails with a "run 'bob clean'" message on version mismatch.
fn init(&self) -> Result<()> {
// Presence of the schema_version table distinguishes a fresh database
// from an existing one.
let has_schema_version: bool = self.conn.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
[],
|row| row.get::<_, i32>(0).map(|c| c > 0),
)?;
if !has_schema_version {
self.create_schema()?;
} else {
let version: i32 =
self.conn
.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
row.get(0)
})?;
// No migration support: any mismatch requires a clean start.
if version != SCHEMA_VERSION {
anyhow::bail!(
"Schema mismatch: found v{}, expected v{}. \
Run 'bob clean' to restart.",
version,
SCHEMA_VERSION
);
}
}
check_history_schema(&self.dbdir)?;
Ok(())
}
/// Create all tables and indexes for a fresh database, seed the lookup
/// tables (outcome_types, stage_types) from the corresponding Rust enums,
/// and record a timestamp-based build_id in metadata.
fn create_schema(&self) -> Result<()> {
// The whole schema is one batch; outcome/stage values are generated
// from PackageState/Stage so ids stay in sync with the enums.
self.conn.execute_batch(&format!(
"CREATE TABLE schema_version (version INTEGER NOT NULL);
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE outcome_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO outcome_types (id, name) VALUES {outcome_types};
CREATE TABLE stage_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO stage_types (id, name) VALUES {stages};
CREATE TABLE scan_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pkgname TEXT UNIQUE NOT NULL,
pkg_location TEXT NOT NULL,
all_depends TEXT,
pkg_skip_reason TEXT,
pkg_fail_reason TEXT,
no_bin_on_ftp TEXT,
restricted TEXT,
categories TEXT,
maintainer TEXT,
use_destdir TEXT,
bootstrap_pkg TEXT,
usergroup_phase TEXT,
scan_depends TEXT,
make_jobs_safe TEXT,
pbulk_weight TEXT,
multi_version TEXT,
scan_outcome INTEGER REFERENCES outcome_types(id),
scan_outcome_detail TEXT
);
CREATE INDEX idx_scan_index_pkg_location ON scan_index(pkg_location);
CREATE INDEX idx_scan_index_status
ON scan_index(pkg_skip_reason, pkg_fail_reason);
CREATE TABLE package_state (
package_id INTEGER PRIMARY KEY
REFERENCES scan_index(id) ON DELETE CASCADE,
selected INTEGER NOT NULL DEFAULT 0,
build_reason TEXT
);
CREATE TABLE resolved_depends (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
depends_on_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
UNIQUE(package_id, depends_on_id)
);
CREATE INDEX idx_resolved_depends_package ON resolved_depends(package_id);
CREATE INDEX idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
CREATE TABLE builds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
outcome INTEGER NOT NULL REFERENCES outcome_types(id),
outcome_detail TEXT,
stage INTEGER REFERENCES stage_types(id),
duration_ms INTEGER NOT NULL DEFAULT 0,
log_dir TEXT,
failed_dep_id INTEGER REFERENCES scan_index(id),
UNIQUE(package_id)
);
CREATE INDEX idx_builds_outcome ON builds(outcome);
CREATE INDEX idx_builds_package ON builds(package_id);
CREATE INDEX idx_builds_failed_dep ON builds(failed_dep_id);
CREATE TABLE scan_failures (
pkgpath TEXT PRIMARY KEY,
error TEXT NOT NULL
);
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);",
SCHEMA_VERSION,
outcome_types = PackageState::db_values(),
stages = stage_values(),
))?;
// Identify this database instance with a UTC timestamp.
let build_id = chrono::Utc::now().format("%Y%m%dT%H%M%SZ").to_string();
self.conn.execute(
"INSERT INTO metadata (key, value) VALUES ('build_id', ?1)",
params![build_id],
)?;
debug!(version = SCHEMA_VERSION, build_id = %build_id, "Created schema");
Ok(())
}
/// Insert or replace the scan_index row for one package and ensure a
/// matching package_state row exists. Returns the (possibly new) row id.
///
/// Note: INSERT OR REPLACE on an existing pkgname deletes and re-inserts
/// the row, so the id may change and ON DELETE CASCADE clears dependent
/// rows; the package_state row is then recreated below.
pub fn store_package(&self, pkgpath: &str, index: &ScanIndex) -> Result<i64> {
let pkgname = index.pkgname.pkgname();
// Normalize empty reason strings to NULL.
let skip_reason = index.pkg_skip_reason.as_deref().filter(|s| !s.is_empty());
let fail_reason = index.pkg_fail_reason.as_deref().filter(|s| !s.is_empty());
let all_depends = index.all_depends.as_ref().map(|d| d.as_str());
// Versions are stored space-joined; split back on read.
let multi_version = index.multi_version.as_ref().map(|v| v.join(" "));
let mut stmt = self.conn.prepare_cached(
"INSERT OR REPLACE INTO scan_index
(pkgname, pkg_location, all_depends,
pkg_skip_reason, pkg_fail_reason, no_bin_on_ftp,
restricted, categories, maintainer, use_destdir,
bootstrap_pkg, usergroup_phase, scan_depends,
make_jobs_safe, pbulk_weight, multi_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
?11, ?12, ?13, ?14, ?15, ?16)",
)?;
stmt.execute(params![
pkgname,
pkgpath,
all_depends,
skip_reason,
fail_reason,
index.no_bin_on_ftp,
index.restricted,
index.categories,
index.maintainer,
index.use_destdir,
index.bootstrap_pkg,
index.usergroup_phase,
index.scan_depends.as_ref().map(|d| d.as_str()),
index.make_jobs_safe,
index.pbulk_weight,
multi_version,
])?;
drop(stmt);
let package_id = self.conn.last_insert_rowid();
self.conn.execute(
"INSERT OR IGNORE INTO package_state (package_id) VALUES (?1)",
params![package_id],
)?;
debug!(pkgname = pkgname, package_id = package_id, "Stored package");
Ok(package_id)
}
/// Store every scanned index for one PKGPATH (multi-version packages can
/// yield several indexes per path).
pub fn store_scan_pkgpath(&self, pkgpath: &str, indexes: &[ScanIndex]) -> Result<()> {
    indexes
        .iter()
        .try_for_each(|index| self.store_package(pkgpath, index).map(|_| ()))
}
/// Look up a single scan_index row by exact pkgname.
pub fn get_package_by_name(&self, pkgname: &str) -> Result<Option<PackageRow>> {
self.query_one("SELECT * FROM scan_index WHERE pkgname = ?1", [pkgname])
}
/// Fetch the pkgname for a scan_index row id; errors if the id is unknown.
pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
self.conn
.query_row(
"SELECT pkgname FROM scan_index WHERE id = ?1",
[package_id],
|row| row.get(0),
)
.context("Package not found")
}
/// All scan_index rows recorded for one PKGPATH (several for
/// multi-version packages).
pub fn get_packages_by_path(&self, pkgpath: &str) -> Result<Vec<PackageRow>> {
self.query_rows(
"SELECT * FROM scan_index WHERE pkg_location = ?1",
[pkgpath],
)
}
/// Distinct set of PKGPATHs that have at least one scan_index row.
pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
    let mut stmt = self
        .conn
        .prepare("SELECT DISTINCT pkg_location FROM scan_index")?;
    let mut paths = HashSet::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        paths.insert(row.get::<_, String>(0)?);
    }
    Ok(paths)
}
/// PKGPATHs referenced by any stored ALL_DEPENDS entry that have not yet
/// been scanned themselves.
pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
    let scanned = self.get_scanned_pkgpaths()?;
    let mut stmt = self
        .conn
        .prepare("SELECT all_depends FROM scan_index WHERE all_depends IS NOT NULL")?;
    let mut unscanned = HashSet::new();
    let raw_rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
    for raw in raw_rows {
        let raw = raw?;
        let deps = AllDepends::from(raw.as_str());
        for entry in deps.iter().flatten() {
            let pkgpath = entry.pkgpath().to_string();
            if !scanned.contains(&pkgpath) {
                unscanned.insert(pkgpath);
            }
        }
    }
    Ok(unscanned)
}
/// Total number of scan_index rows.
pub fn count_packages(&self) -> Result<i64> {
self.conn
.query_row("SELECT COUNT(*) FROM scan_index", [], |row| row.get(0))
.context("Failed to count packages")
}
/// Every scan_index row, in insertion (id) order.
pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
self.query_rows("SELECT * FROM scan_index ORDER BY id", [])
}
/// Status of every selected package: scan reasons, build_reason, and any
/// build outcome (NULL build_outcome means not built yet).
pub fn get_all_package_status(&self) -> Result<Vec<PackageStatusRow>> {
self.query_rows(
"SELECT p.id, p.pkgname, p.pkg_location,
p.pkg_skip_reason, p.pkg_fail_reason, p.multi_version,
s.build_reason,
b.outcome AS build_outcome, b.outcome_detail
FROM scan_index p
JOIN package_state s ON s.package_id = p.id
LEFT JOIN builds b ON b.package_id = p.id
WHERE s.selected = 1",
[],
)
}
/// Selected packages with no scan-time skip/fail reason, i.e. candidates
/// for building.
pub fn get_buildable_packages(&self) -> Result<Vec<PackageRow>> {
self.query_rows(
"SELECT p.* FROM scan_index p
JOIN package_state s ON s.package_id = p.id
WHERE s.selected = 1
AND p.pkg_skip_reason IS NULL
AND p.pkg_fail_reason IS NULL",
[],
)
}
/// Rebuild a full `ScanIndex` from a `SELECT * FROM scan_index` row.
/// Column names must match the schema in create_schema.
fn scan_index_from_row(row: &rusqlite::Row) -> rusqlite::Result<ScanIndex> {
Ok(ScanIndex {
pkgname: row.get::<_, String>("pkgname")?.into(),
// Unparseable locations are silently dropped to None.
pkg_location: row
.get::<_, Option<String>>("pkg_location")?
.and_then(|s| s.parse().ok()),
all_depends: row
.get::<_, Option<String>>("all_depends")?
.map(|s| AllDepends::from(s.as_str())),
pkg_skip_reason: row.get("pkg_skip_reason")?,
pkg_fail_reason: row.get("pkg_fail_reason")?,
no_bin_on_ftp: row.get("no_bin_on_ftp")?,
restricted: row.get("restricted")?,
categories: row.get("categories")?,
maintainer: row.get("maintainer")?,
use_destdir: row.get("use_destdir")?,
bootstrap_pkg: row.get("bootstrap_pkg")?,
usergroup_phase: row.get("usergroup_phase")?,
scan_depends: row
.get::<_, Option<String>>("scan_depends")?
.map(|s| ScanDepends::from(s.as_str())),
make_jobs_safe: row.get("make_jobs_safe")?,
pbulk_weight: row.get("pbulk_weight")?,
// Stored space-joined (see store_package); split back here.
multi_version: row
.get::<_, Option<String>>("multi_version")?
.map(|s| s.split_ascii_whitespace().map(str::to_string).collect()),
// Resolved dependencies live in resolved_depends, not this table.
resolved_depends: None,
})
}
/// Load the complete `ScanIndex` for one scan_index row id; errors when
/// the id does not exist.
pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
    let mut stmt = self
        .conn
        .prepare_cached("SELECT * FROM scan_index WHERE id = ?1")?;
    let mut rows = stmt.query([package_id])?;
    match rows.next()? {
        Some(row) => Self::scan_index_from_row(row).map_err(Into::into),
        None => Err(anyhow::anyhow!("Package {package_id} not found")),
    }
}
/// Stream every scan_index row to `f` without materializing the whole
/// table: `f` receives a pull closure that yields the next `ScanIndex`
/// (or `None` at end). The statement stays alive for the duration of `f`.
pub fn with_scan_data<F, R>(&self, f: F) -> Result<R>
where
F: FnOnce(&mut dyn FnMut() -> Result<Option<ScanIndex>>) -> Result<R>,
{
let mut stmt = self.conn.prepare("SELECT * FROM scan_index ORDER BY id")?;
let mut rows = stmt.query([])?;
// The pull closure borrows `rows` mutably; rusqlite advances the
// cursor on each call.
let mut pull = || -> Result<Option<ScanIndex>> {
let Some(row) = rows.next()? else {
return Ok(None);
};
let id: i64 = row.get("id")?;
let index = Self::scan_index_from_row(row)
.with_context(|| format!("Failed to read package {id}"))?;
Ok(Some(index))
};
f(&mut pull)
}
/// Delete all scan results (dependent rows cascade) and reset the
/// full-scan-complete marker.
pub fn clear_scan(&self) -> Result<()> {
    self.conn.execute("DELETE FROM scan_index", [])?;
    self.clear_full_scan_complete()
}
/// Record why `pkgname` is being (re)built; no-op if the pkgname is
/// unknown (the subquery yields NULL and no row matches).
pub fn store_build_reason(&self, pkgname: &str, reason: &str) -> Result<()> {
self.conn.execute(
"UPDATE package_state SET build_reason = ?1
WHERE package_id = (SELECT id FROM scan_index WHERE pkgname = ?2)",
params![reason, pkgname],
)?;
Ok(())
}
/// Clear the build_reason for every package.
pub fn clear_build_reasons(&self) -> Result<()> {
self.conn
.execute("UPDATE package_state SET build_reason = NULL", [])?;
Ok(())
}
/// The stored build_reason for `pkgname`; `Ok(None)` both when the
/// package is unknown and when no reason was recorded.
pub fn get_build_reason(&self, pkgname: &str) -> Result<Option<String>> {
    let result: rusqlite::Result<Option<String>> = self.conn.query_row(
        "SELECT s.build_reason FROM package_state s
JOIN scan_index p ON p.id = s.package_id
WHERE p.pkgname = ?1",
        [pkgname],
        |row| row.get(0),
    );
    // A missing row is not an error here — it just means "no reason".
    match result {
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        other => other.map_err(Into::into),
    }
}
pub fn get_all_build_reasons(&self) -> Result<HashMap<String, String>> {
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, s.build_reason FROM scan_index p
JOIN package_state s ON s.package_id = p.id
WHERE s.build_reason IS NOT NULL",
)?;
let rows = stmt.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
})?;
Ok(rows.collect::<rusqlite::Result<_>>()?)
}
/// Persist the resolved dependency edges from a scan summary into
/// resolved_depends, mapping pkgnames to row ids; edges whose endpoints
/// are not in scan_index are silently skipped. Runs in one transaction.
pub fn store_resolved_deps(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
// Cache pkgname -> id up front to avoid a lookup query per edge.
let id_map: HashMap<String, i64> = self
.conn
.prepare("SELECT pkgname, id FROM scan_index")?
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<rusqlite::Result<_>>()?;
let tx = self.transaction()?;
let mut stmt = self.conn.prepare(
"INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
)?;
let mut count: usize = 0;
for pkg in &summary.packages {
let Some(pkgname) = pkg.pkgname() else {
continue;
};
let Some(&pkg_id) = id_map.get(pkgname.pkgname()) else {
continue;
};
for dep in pkg.depends() {
if let Some(&dep_id) = id_map.get(dep.pkgname()) {
stmt.execute(params![pkg_id, dep_id])?;
count += 1;
}
}
}
// stmt borrows the connection; drop it before committing.
drop(stmt);
tx.commit()?;
if count > 0 {
debug!(count, "Stored resolved dependencies");
}
Ok(())
}
/// Record the scan outcome for every skipped package in the summary,
/// writing scan_outcome/scan_outcome_detail in a single transaction.
pub fn store_scan_skipped(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
    let tx = self.transaction()?;
    {
        let mut stmt = self.conn.prepare(
            "UPDATE scan_index
SET scan_outcome = ?1, scan_outcome_detail = ?2
WHERE pkgname = ?3",
        )?;
        for pkg in &summary.packages {
            let ScanResult::Skipped { state, index, .. } = pkg else {
                continue;
            };
            // Skipped entries without an index carry nothing to record.
            let Some(idx) = index else { continue };
            stmt.execute(params![
                state.db_id(),
                state.to_string(),
                idx.pkgname.pkgname(),
            ])?;
        }
    }
    tx.commit()
}
/// Replace the recorded scan failures with those from `summary`.
///
/// The delete and re-insert now run inside one transaction so the table is
/// never observed half-populated and a mid-loop error cannot wipe the old
/// data without writing the new (matches the other store_* methods).
pub fn store_scan_failures(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
    let tx = self.transaction()?;
    self.conn.execute("DELETE FROM scan_failures", [])?;
    {
        let mut stmt = self
            .conn
            .prepare("INSERT INTO scan_failures (pkgpath, error) VALUES (?1, ?2)")?;
        for pkg in &summary.packages {
            if let ScanResult::ScanFail { pkgpath, error } = pkg {
                stmt.execute(params![pkgpath.as_path().display().to_string(), error])?;
            }
        }
    }
    tx.commit()
}
/// All recorded (pkgpath, error) scan failures, ordered by pkgpath.
pub fn get_scan_failures(&self) -> Result<Vec<(String, String)>> {
    let mut stmt = self
        .conn
        .prepare("SELECT pkgpath, error FROM scan_failures ORDER BY pkgpath")?;
    let mut failures = Vec::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        failures.push((row.get::<_, String>(0)?, row.get::<_, String>(1)?));
    }
    Ok(failures)
}
/// Mark exactly the packages in `summary` as selected (everything else is
/// deselected first), in one transaction.
pub fn store_resolved_selection(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
let tx = self.transaction()?;
// Reset, then re-select only the summary's packages.
self.conn
.execute("UPDATE package_state SET selected = 0", [])?;
let mut stmt = self.conn.prepare(
"UPDATE package_state SET selected = 1
WHERE package_id = (SELECT id FROM scan_index WHERE pkgname = ?1)",
)?;
for pkg in &summary.packages {
let Some(pkgname) = pkg.pkgname() else {
continue;
};
stmt.execute([pkgname.pkgname()])?;
}
// stmt borrows the connection; drop before committing.
drop(stmt);
tx.commit()
}
/// Ids of every package that transitively depends on `package_id`
/// (the package itself is excluded), via a recursive CTE over
/// resolved_depends.
pub fn get_transitive_reverse_deps(&self, package_id: i64) -> Result<Vec<i64>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE affected(id) AS (
SELECT ?1
UNION
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id FROM affected WHERE id != ?1",
)?;
let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Remove every resolved dependency edge.
pub fn clear_resolved_depends(&self) -> Result<()> {
    self.conn.execute("DELETE FROM resolved_depends", [])?;
    Ok(())
}
/// Map of pkgname -> sorted list of dependency pkgnames, built from the
/// resolved_depends edge table.
pub fn get_all_resolved_deps(&self) -> Result<HashMap<String, Vec<String>>> {
    let mut stmt = self.conn.prepare(
        "SELECT p1.pkgname, p2.pkgname
FROM resolved_depends rd
JOIN scan_index p1 ON rd.package_id = p1.id
JOIN scan_index p2 ON rd.depends_on_id = p2.id
ORDER BY p1.pkgname, p2.pkgname",
    )?;
    let mut deps: HashMap<String, Vec<String>> = HashMap::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let pkg: String = row.get(0)?;
        let dep: String = row.get(1)?;
        deps.entry(pkg).or_default().push(dep);
    }
    Ok(deps)
}
/// Reconstruct the resolved package set for the build phase: every
/// selected package without a scan-time outcome, in id order, with its
/// resolved dependency list attached. Only the fields needed for building
/// are populated; the rest of the ScanIndex is defaulted.
pub fn load_buildable_packages(
&self,
) -> Result<IndexMap<PkgName, crate::scan::ResolvedPackage>> {
// One upfront query for all edges avoids a per-package lookup.
let all_deps = self.get_all_resolved_deps()?;
let mut stmt = self.conn.prepare(
"SELECT p.pkgname, p.pkg_location, p.bootstrap_pkg,
p.usergroup_phase, p.multi_version
FROM scan_index p
JOIN package_state s ON s.package_id = p.id
WHERE s.selected = 1
AND p.scan_outcome IS NULL
ORDER BY p.id",
)?;
let mut out = IndexMap::new();
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let pkgname: String = row.get("pkgname")?;
let pkg_location: String = row.get("pkg_location")?;
// Stored space-joined (see store_package).
let multi_version = row
.get::<_, Option<String>>("multi_version")?
.map(|s| s.split_ascii_whitespace().map(str::to_string).collect());
// Parse each dependency pkgname; a parse failure aborts the load.
let resolved_depends: Vec<PkgName> = all_deps
.get(&pkgname)
.map(|ds| ds.iter().map(|d| d.parse()).collect::<Result<Vec<_>, _>>())
.transpose()?
.unwrap_or_default();
let key: PkgName = pkgname.clone().into();
out.insert(
key,
crate::scan::ResolvedPackage {
pkgpath: pkg_location.parse()?,
index: ScanIndex {
pkgname: pkgname.into(),
bootstrap_pkg: row.get("bootstrap_pkg")?,
usergroup_phase: row.get("usergroup_phase")?,
multi_version,
resolved_depends: Some(resolved_depends),
..Default::default()
},
},
);
}
Ok(out)
}
/// All resolved dependency edges as raw (package_id, depends_on_id) pairs.
pub fn get_resolved_dep_ids(&self) -> Result<Vec<(i64, i64)>> {
    let mut stmt = self
        .conn
        .prepare("SELECT package_id, depends_on_id FROM resolved_depends")?;
    let mut edges = Vec::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        edges.push((row.get(0)?, row.get(1)?));
    }
    Ok(edges)
}
/// Insert or replace the builds row for `package_id` from a BuildResult.
/// UNIQUE(package_id) on builds means a re-build overwrites the old row.
pub fn store_build_result(&self, package_id: i64, result: &BuildResult) -> Result<()> {
let outcome = result.state.db_id();
let detail = result.state.detail().map(String::from);
let stage = result.build_stats.stage.map(|s| s as i32);
let duration_ms = result.build_stats.duration.as_millis() as i64;
let log_dir = result.log_dir.as_ref().map(|p| p.display().to_string());
self.conn.execute(
"INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, stage, duration_ms, log_dir)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
params![package_id, outcome, detail, stage, duration_ms, log_dir],
)?;
debug!(
package_id = package_id,
outcome = outcome,
"Stored build result"
);
Ok(())
}
/// Whether `pkgname` has a recorded build with outcome Success.
pub fn is_successful(&self, pkgname: &str) -> Result<bool> {
    let success = PackageStateKind::Success as i32;
    let count: i64 = self.conn.query_row(
        "SELECT COUNT(*) FROM builds b
JOIN scan_index p ON b.package_id = p.id
WHERE p.pkgname = ?1 AND b.outcome = ?2",
        params![pkgname, success],
        |row| row.get(0),
    )?;
    Ok(count > 0)
}
/// Store a build result keyed by pkgname; logs a warning and succeeds
/// quietly when the package is not in scan_index.
pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
    match self.get_package_by_name(result.pkgname.pkgname())? {
        Some(pkg) => self.store_build_result(pkg.id, result),
        None => {
            warn!(pkgname = %result.pkgname.pkgname(), "Package not found in database for build result");
            Ok(())
        }
    }
}
/// Load the stored build result for one package id; `Ok(None)` when no
/// build row exists or the stored outcome id is unknown.
pub fn get_build_result(&self, package_id: i64) -> Result<Option<BuildResult>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkg_location AS pkgpath,
b.outcome, b.outcome_detail,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN scan_index p ON b.package_id = p.id
WHERE b.package_id = ?1",
    )?;
    let mut rows = stmt.query([package_id])?;
    let Some(row) = rows.next()? else {
        return Ok(None);
    };
    build_result_from_row(row).map_err(Into::into)
}
/// Delete the build row(s) for `pkgname`; true if anything was removed.
pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
    let deleted = self.conn.execute(
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM scan_index WHERE pkgname = ?1)",
        params![pkgname],
    )?;
    Ok(deleted > 0)
}
/// Delete build rows for every package at `pkgpath`; returns the count.
pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
    let deleted = self.conn.execute(
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM scan_index WHERE pkg_location = ?1)",
        params![pkgpath],
    )?;
    Ok(deleted)
}
/// Delete every build row; returns how many were removed.
pub fn clear_builds(&self) -> Result<usize> {
    let deleted = self.conn.execute("DELETE FROM builds", [])?;
    Ok(deleted)
}
/// Every stored build result, ordered by pkgname; rows with an unknown
/// outcome id are skipped (build_result_from_row returns None for them).
pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkg_location AS pkgpath,
b.outcome, b.outcome_detail,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN scan_index p ON b.package_id = p.id
ORDER BY p.pkgname",
    )?;
    let mut results = Vec::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let Some(result) = build_result_from_row(row)? else {
            continue;
        };
        results.push(result);
    }
    Ok(results)
}
/// For each failed package, count how many builds it indirectly broke
/// (builds whose failed_dep_id points at it with outcome IndirectFailed).
pub fn count_breaks_for_failed(&self) -> Result<HashMap<String, usize>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, COUNT(*)
FROM builds b
JOIN scan_index p ON b.failed_dep_id = p.id
WHERE b.outcome = ?1
GROUP BY b.failed_dep_id",
    )?;
    let mut counts: HashMap<String, usize> = HashMap::new();
    let mut rows = stmt.query([PackageStateKind::IndirectFailed as i32])?;
    while let Some(row) = rows.next()? {
        let pkgname: String = row.get(0)?;
        let count: i64 = row.get(1)?;
        counts.insert(pkgname, count as usize);
    }
    Ok(counts)
}
/// Add `duration` to the cumulative build time stored in metadata.
pub fn add_build_duration(&self, duration: Duration) -> Result<()> {
    let total = self.get_build_duration()? + duration;
    let total_ms = total.as_millis() as i64;
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('build_duration_ms', ?1)",
        params![total_ms.to_string()],
    )?;
    Ok(())
}
/// Cumulative build duration from metadata, or zero when unset.
// NOTE(review): the trailing `.unwrap_or(0)` also masks genuine database
// errors, not just a missing key (COALESCE already guarantees a row) —
// confirm this best-effort behaviour is intended.
pub fn get_build_duration(&self) -> Result<Duration> {
let ms: i64 = self
.conn
.query_row(
"SELECT COALESCE(
(SELECT value FROM metadata WHERE key = 'build_duration_ms'),
'0'
)",
[],
|row| {
let s: String = row.get(0)?;
// Unparseable stored values fall back to zero.
Ok(s.parse::<i64>().unwrap_or(0))
},
)
.unwrap_or(0);
Ok(Duration::from_millis(ms as u64))
}
/// Root-cause blockers for `package_id`: walk its dependency closure and
/// return (pkgname, pkgpath, status-name) for every dependency whose build
/// outcome is a direct failure/skip (Failed, PreFailed, PreSkipped,
/// Unresolved) — indirect casualties are filtered out.
pub fn get_blockers(&self, package_id: i64) -> Result<Vec<(String, String, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
blocking(id, outcome) AS (
-- Direct dependencies that have failed/skipped builds
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE rd.package_id = ?1
AND b.outcome NOT IN (?2, ?3)
UNION
-- Transitive: deps of deps that are blocked
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN blocking bl ON rd.package_id = bl.id
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE b.outcome NOT IN (?2, ?3)
)
SELECT DISTINCT p.pkgname, p.pkg_location, bl.outcome
FROM blocking bl
JOIN scan_index p ON bl.id = p.id
-- Only show root causes (failed or prefailed/preskipped), not indirect
WHERE bl.outcome IN (?4, ?5, ?6, ?7)
ORDER BY p.pkgname",
)?;
// ?2/?3 are the "OK" outcomes to ignore; ?4..?7 are the root causes.
let rows = stmt.query_map(
params![
package_id,
PackageStateKind::Success as i32,
PackageStateKind::UpToDate as i32,
PackageStateKind::Failed as i32,
PackageStateKind::PreFailed as i32,
PackageStateKind::PreSkipped as i32,
PackageStateKind::Unresolved as i32,
],
|row| {
let pkgname: String = row.get(0)?;
let pkgpath: String = row.get(1)?;
let outcome_id: i32 = row.get(2)?;
// Unknown outcome ids are a data error, surfaced as a
// conversion failure on column 2.
let kind = PackageStateKind::from_repr(outcome_id).ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Integer,
format!("unknown outcome type id: {}", outcome_id).into(),
)
})?;
let status: &str = kind.into();
Ok((pkgname, pkgpath, status.to_string()))
},
)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// (pkgname, pkgpath) of every package that transitively depends on
/// `package_id`, i.e. everything a failure of this package would block.
pub fn get_blocked_by(&self, package_id: i64) -> Result<Vec<(String, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
affected(id) AS (
-- Direct reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
WHERE rd.depends_on_id = ?1
UNION
-- Transitive reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT p.pkgname, p.pkg_location
FROM affected a
JOIN scan_index p ON a.id = p.id
ORDER BY p.pkgname",
)?;
let rows = stmt.query_map([package_id], |row| Ok((row.get(0)?, row.get(1)?)))?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
/// Packages that were failed/skipped at scan time (scan_outcome set),
/// as (pkgname, pkg_location, reconstructed PackageState) tuples.
pub fn get_prefailskip_packages(&self) -> Result<Vec<(String, Option<String>, PackageState)>> {
let mut stmt = self.conn.prepare(
"SELECT pkgname, pkg_location, scan_outcome, scan_outcome_detail
FROM scan_index
WHERE scan_outcome IS NOT NULL
ORDER BY pkgname",
)?;
let rows = stmt.query_map([], |row| {
let outcome: i32 = row.get("scan_outcome")?;
let detail: Option<String> = row.get("scan_outcome_detail")?;
// Unknown outcome ids are a data error, surfaced as a conversion
// failure on the scan_outcome column.
let state = PackageState::from_db(outcome, detail).ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Integer,
format!("unknown scan_outcome id: {outcome}").into(),
)
})?;
Ok((row.get("pkgname")?, row.get("pkg_location")?, state))
})?;
Ok(rows.collect::<Result<_, _>>()?)
}
/// Packages that were never built because a (transitive) dependency
/// failed: (pkgname, pkg_location, comma-joined failed root causes).
/// Excludes packages with their own builds row or scan-time outcome.
pub fn get_indirect_failures(&self) -> Result<Vec<(String, Option<String>, String)>> {
let mut stmt = self.conn.prepare(
"WITH RECURSIVE
-- Only direct failures are root causes
failed_pkgs(id) AS (
SELECT package_id FROM builds
WHERE outcome IN (?1, ?2)
),
-- Packages affected by failures (transitive closure)
affected(id, root_id) AS (
SELECT id, id FROM failed_pkgs
UNION
SELECT rd.package_id, a.root_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
WHERE rd.package_id NOT IN (SELECT id FROM failed_pkgs)
)
SELECT p.pkgname, p.pkg_location, GROUP_CONCAT(DISTINCT fp.pkgname) as failed_deps
FROM affected a
JOIN scan_index p ON a.id = p.id
JOIN scan_index fp ON a.root_id = fp.id
WHERE a.id != a.root_id
AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = a.id)
AND p.scan_outcome IS NULL
GROUP BY p.id, p.pkgname, p.pkg_location
ORDER BY p.pkgname",
)?;
// Root causes are Failed and PreFailed build outcomes.
let rows = stmt.query_map(
params![
PackageStateKind::Failed as i32,
PackageStateKind::PreFailed as i32,
],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)?;
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
pub fn mark_failure_cascade(
&self,
package_id: i64,
reason: &str,
duration: Duration,
) -> Result<usize> {
let pkgname = self.get_pkgname(package_id)?;
let mut stmt = self.conn.prepare(
"WITH RECURSIVE affected(id, depth) AS (
SELECT ?1, 0
UNION
SELECT rd.package_id, a.depth + 1
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id, depth FROM affected ORDER BY depth",
)?;
let affected: Vec<(i64, i32)> = stmt
.query_map([package_id], |row| {
Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?))
})?
.filter_map(|r| r.ok())
.collect();
let tx = self.transaction()?;
for (id, depth) in &affected {
let (outcome, detail, dur, dep_id) = if *depth == 0 {
(
PackageStateKind::Failed as i32,
reason.to_string(),
duration.as_millis() as i64,
None,
)
} else {
(
PackageStateKind::IndirectFailed as i32,
format!("depends on failed {}", pkgname),
0,
Some(package_id),
)
};
self.conn.execute(
"INSERT OR REPLACE INTO builds
(package_id, outcome, outcome_detail, duration_ms,
failed_dep_id)
VALUES (?1, ?2, ?3, ?4, ?5)",
params![id, outcome, detail, dur, dep_id],
)?;
}
tx.commit()?;
debug!(
package_id = package_id,
affected_count = affected.len(),
"Marked failure cascade"
);
Ok(affected.len())
}
/// The build_id stamped into metadata when the schema was created.
pub fn build_id(&self) -> Result<String> {
    let id = self.conn.query_row(
        "SELECT value FROM metadata WHERE key = 'build_id'",
        [],
        |row| row.get(0),
    );
    id.context("build_id not found in database")
}
pub fn full_scan_complete(&self) -> bool {
self.conn
.query_row(
"SELECT value FROM metadata WHERE key = 'full_scan_complete'",
[],
|row| row.get::<_, String>(0),
)
.map(|v| v == "true")
.unwrap_or(false)
}
/// Mark a full tree scan as complete in metadata.
pub fn set_full_scan_complete(&self) -> Result<()> {
    let sql = "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Remove the full-scan-complete marker from metadata.
pub fn clear_full_scan_complete(&self) -> Result<()> {
    self.conn
        .execute("DELETE FROM metadata WHERE key = 'full_scan_complete'", [])?;
    Ok(())
}
/// Persist a JSON snapshot of the pkgsrc environment in metadata.
///
/// Uses INSERT OR REPLACE (consistent with store_vcs_info) so a second
/// call updates the row instead of violating the PRIMARY KEY on
/// metadata.key — the previous plain INSERT failed on re-store.
pub fn store_pkgsrc_env(&self, env: &PkgsrcEnv) -> Result<()> {
    let json = serde_json::json!({
        "packages": env.packages,
        "pkgtools": env.pkgtools,
        "prefix": env.prefix,
        "pkg_dbdir": env.pkg_dbdir,
        "pkg_refcount_dbdir": env.pkg_refcount_dbdir,
        "metadata": env.metadata,
        "cachevars": env.cachevars,
    });
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('pkgsrc_env', ?1)",
        params![json.to_string()],
    )?;
    Ok(())
}
/// Reconstruct the PkgsrcEnv stored by store_pkgsrc_env. The path fields
/// are required; metadata/cachevars maps default to empty when missing or
/// malformed.
pub fn load_pkgsrc_env(&self) -> Result<PkgsrcEnv> {
let json_str: String = self
.conn
.query_row(
"SELECT value FROM metadata WHERE key = 'pkgsrc_env'",
[],
|row| row.get(0),
)
.context("pkgsrc environment not found in database")?;
let json: serde_json::Value =
serde_json::from_str(&json_str).context("Invalid pkgsrc_env JSON")?;
// Required string-valued path field; error when absent or non-string.
let get_path = |key: &str| -> Result<PathBuf> {
json.get(key)
.and_then(|v| v.as_str())
.map(PathBuf::from)
.ok_or_else(|| anyhow::anyhow!("Missing {} in pkgsrc_env", key))
};
let metadata: HashMap<String, String> = json
.get("metadata")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
let cachevars: HashMap<String, String> = json
.get("cachevars")
.and_then(|v| serde_json::from_value(v.clone()).ok())
.unwrap_or_default();
Ok(PkgsrcEnv {
packages: get_path("packages")?,
pkgtools: get_path("pkgtools")?,
prefix: get_path("prefix")?,
pkg_dbdir: get_path("pkg_dbdir")?,
pkg_refcount_dbdir: get_path("pkg_refcount_dbdir")?,
metadata,
cachevars,
})
}
/// Persist VCS information as JSON in metadata (replacing any prior
/// value).
pub fn store_vcs_info(&self, info: &crate::vcs::VcsInfo) -> Result<()> {
    let payload = serde_json::to_string(info)?;
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('vcs_info', ?1)",
        params![payload],
    )?;
    Ok(())
}
/// Load the VCS information stored by store_vcs_info.
pub fn load_vcs_info(&self) -> Result<crate::vcs::VcsInfo> {
    let json_str = self
        .conn
        .query_row(
            "SELECT value FROM metadata WHERE key = 'vcs_info'",
            [],
            |row| row.get::<_, String>(0),
        )
        .context("vcs_info not found in database")?;
    serde_json::from_str(&json_str).context("Invalid vcs_info JSON")
}
/// Names of all packages whose build outcome is Failed, in pkgname order.
pub fn get_failed_packages(&self) -> Result<Vec<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname FROM builds b
         JOIN scan_index p ON b.package_id = p.id
         WHERE b.outcome = ?1
         ORDER BY p.pkgname",
    )?;
    let mut names = Vec::new();
    let mut rows = stmt.query([PackageStateKind::Failed as i32])?;
    while let Some(row) = rows.next()? {
        names.push(row.get::<_, String>(0)?);
    }
    Ok(names)
}
/// Names of all packages that built successfully or were already up to
/// date, in pkgname order.
pub fn get_successful_packages(&self) -> Result<Vec<String>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname FROM builds b
         JOIN scan_index p ON b.package_id = p.id
         WHERE b.outcome IN (?1, ?2)
         ORDER BY p.pkgname",
    )?;
    let mut names = Vec::new();
    let mut rows = stmt.query(params![
        PackageStateKind::Success as i32,
        PackageStateKind::UpToDate as i32,
    ])?;
    while let Some(row) = rows.next()? {
        names.push(row.get::<_, String>(0)?);
    }
    Ok(names)
}
/// Map of pkgname -> restriction reason for every package carrying a
/// non-empty no_bin_on_ftp value.
pub fn get_restricted_packages(&self) -> Result<HashMap<String, String>> {
    let mut stmt = self.conn.prepare(
        "SELECT pkgname, no_bin_on_ftp FROM scan_index
         WHERE no_bin_on_ftp IS NOT NULL AND no_bin_on_ftp != ''",
    )?;
    let mut restricted = HashMap::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        restricted.insert(row.get::<_, String>(0)?, row.get::<_, String>(1)?);
    }
    Ok(restricted)
}
/// Assemble one report row per package: pkgname, its scan index (with
/// resolved dependencies attached when known), and the effective outcome.
///
/// Build outcomes take precedence over scan outcomes via COALESCE.
pub fn get_report_data(&self) -> Result<Vec<ReportRow>> {
    let resolved = self.get_all_resolved_deps()?;
    let mut stmt = self.conn.prepare(
        "SELECT p.*,
         COALESCE(b.outcome, p.scan_outcome) AS outcome,
         COALESCE(b.outcome_detail, p.scan_outcome_detail) AS outcome_detail
         FROM scan_index p
         LEFT JOIN builds b ON b.package_id = p.id
         ORDER BY p.pkgname",
    )?;
    let mut rows = stmt.query([])?;
    let mut report = Vec::new();
    while let Some(row) = rows.next()? {
        let pkgname: String = row.get("pkgname")?;
        let outcome: Option<i32> = row.get("outcome")?;
        let detail: Option<String> = row.get("outcome_detail")?;
        let mut index = Self::scan_index_from_row(row)?;
        if let Some(deps) = resolved.get(&pkgname) {
            // Any unparsable dependency name aborts the whole report.
            let parsed: Result<Vec<PkgName>, _> =
                deps.iter().map(|d| d.parse()).collect();
            index.resolved_depends = Some(parsed?);
        }
        report.push((pkgname, index, outcome, detail));
    }
    Ok(report)
}
/// Execute an arbitrary SQL statement against the main database.
///
/// Statements that return no columns run via `execute` and report the
/// affected row count; statements with result columns run as a query and
/// each row is printed pipe-separated. NULL prints as an empty string and
/// BLOBs as a size placeholder.
///
/// All output goes through `try_println` so a closed stdout (e.g. the user
/// piping into `head`) ends output cleanly; previously the affected-rows
/// message used `println!`, which panics on a broken pipe.
pub fn execute_raw(&self, sql: &str) -> Result<()> {
    let mut stmt = self.conn.prepare(sql)?;
    let column_count = stmt.column_count();
    if column_count == 0 {
        let affected = stmt.execute([])?;
        if affected > 0 {
            let _ = try_println(&format!("{} row(s) affected", affected));
        }
    } else {
        let mut rows = stmt.query([])?;
        while let Some(row) = rows.next()? {
            let values: Vec<String> = (0..column_count)
                .map(|i| {
                    row.get_ref(i)
                        .map(|v| match v {
                            rusqlite::types::ValueRef::Null => String::new(),
                            rusqlite::types::ValueRef::Integer(i) => i.to_string(),
                            rusqlite::types::ValueRef::Real(f) => f.to_string(),
                            rusqlite::types::ValueRef::Text(s) => {
                                String::from_utf8_lossy(s).to_string()
                            }
                            rusqlite::types::ValueRef::Blob(b) => {
                                format!("<blob:{} bytes>", b.len())
                            }
                        })
                        .unwrap_or_default()
                })
                .collect();
            // Stop printing as soon as the consumer goes away.
            if !try_println(&values.join("|")) {
                break;
            }
        }
    }
    Ok(())
}
/// Return the lazily-opened history database connection, opening (and
/// caching it in the OnceCell) on first use.
pub(crate) fn history_conn(&self) -> Result<&Connection> {
    match self.history_conn.get() {
        Some(conn) => Ok(conn),
        None => {
            let conn = open_history_conn(&self.dbdir)?;
            // A racing set() simply loses; we re-read below either way.
            let _ = self.history_conn.set(conn);
            self.history_conn
                .get()
                .ok_or_else(|| anyhow::anyhow!("history connection not initialized"))
        }
    }
}
/// Append a single build record to the history database.
pub fn record_history(&self, rec: &crate::History) -> Result<()> {
    record_history_to(self.history_conn()?, rec)
}
pub fn query_history(&self, pattern: Option<®ex::Regex>) -> Result<Vec<crate::History>> {
let conn = self.history_conn()?;
let cols: String = HistoryKind::VARIANTS
.iter()
.map(|v| format!("bh.{}", <&str>::from(v)))
.collect::<Vec<_>>()
.join(", ");
let sql = format!(
"SELECT bh.id, {cols}, \
wt.stage, wt.duration, ct.duration \
FROM build_history bh \
LEFT JOIN wall_times wt ON wt.history_id = bh.id \
LEFT JOIN cpu_times ct ON ct.history_id = bh.id \
AND ct.stage = wt.stage \
WHERE bh.outcome IN ({success}, {failed}) \
ORDER BY bh.timestamp DESC, bh.id DESC",
success = PackageStateKind::Success as i32,
failed = PackageStateKind::Failed as i32,
);
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, String>(4)?,
row.get::<_, i32>(5)?,
row.get::<_, Option<i32>>(6)?,
row.get::<_, Option<i64>>(7)?.map(|v| v as usize),
row.get::<_, i64>(8)?,
row.get::<_, Option<i64>>(9)?,
row.get::<_, Option<String>>(10)?,
row.get::<_, Option<String>>(11)?,
row.get::<_, Option<i32>>(12)?,
row.get::<_, Option<i64>>(13)?,
row.get::<_, Option<i64>>(14)?,
))
})?;
let mut results: Vec<crate::History> = Vec::new();
let mut current_id: Option<i64> = None;
let mut current_accepted = false;
for row in rows {
let (
id,
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome_id,
stage_id,
make_jobs,
duration,
disk_usage,
wrkobjdir,
build_id,
wt_stage,
wt_duration,
ct_duration,
) = row?;
if Some(id) == current_id {
if current_accepted {
if let Some(last) = results.last_mut() {
if let Some(stage_id) = wt_stage {
let stage = Stage::from_repr(stage_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", stage_id))?;
if let Some(ms) = wt_duration {
last.stage_durations
.push((stage, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
last.stage_cpu_times
.push((stage, Duration::from_millis(ms as u64)));
}
}
}
}
continue;
}
current_id = Some(id);
current_accepted = false;
if let Some(re) = pattern {
if !re.is_match(&pkgpath) && !re.is_match(&pkgname) {
continue;
}
}
current_accepted = true;
let outcome = PackageState::from_db(outcome_id, None)
.ok_or_else(|| anyhow::anyhow!("Unknown outcome type id: {}", outcome_id))?;
let stage = stage_id
.map(|id| {
Stage::from_repr(id).ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", id))
})
.transpose()?;
let mut stage_durations = Vec::new();
let mut stage_cpu_times = Vec::new();
if let Some(st_id) = wt_stage {
let st = Stage::from_repr(st_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", st_id))?;
if let Some(ms) = wt_duration {
stage_durations.push((st, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
stage_cpu_times.push((st, Duration::from_millis(ms as u64)));
}
}
results.push(crate::History {
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome,
stage,
make_jobs,
duration: Duration::from_millis(duration as u64),
disk_usage: disk_usage.map(|s| s as u64),
wrkobjdir: wrkobjdir.and_then(|s| s.parse().ok()),
stage_durations,
stage_cpu_times,
build_id,
});
}
Ok(results)
}
/// Best-effort lookup of the most recent build record for every package.
///
/// Keyed by pkgbase; the value carries the latest outcome, disk usage,
/// MAKE_JOBS and WRKOBJDIR from build_history. "Latest" is the highest-id
/// row per (pkgpath, pkgbase) pair, optionally restricted to a single
/// build via `build_id`.
///
/// NOTE(review): the returned map is keyed by pkgbase alone, so when two
/// pkgpaths share a pkgbase one entry overwrites the other — confirm the
/// callers expect that collapse.
///
/// Any failure (opening the history db, preparing or running the query)
/// is logged via `warn!` and yields an empty map instead of an error.
pub fn build_history_by_pkg_all(
&self,
build_id: Option<&str>,
) -> HashMap<String, PkgBuildHistory> {
let conn = match self.history_conn() {
Ok(c) => c,
Err(e) => {
warn!(
error = format!("{e:#}"),
"build_history_by_pkg_all: failed to open history db"
);
return HashMap::new();
}
};
// Column names come from HistoryKind so the SQL tracks the generated
// schema.
let du: &str = HistoryKind::DiskUsage.into();
let out: &str = HistoryKind::Outcome.into();
let mj: &str = HistoryKind::MakeJobs.into();
let wo: &str = HistoryKind::Wrkobjdir.into();
let pkgbase_col: &str = HistoryKind::Pkgbase.into();
let pkgpath_col: &str = HistoryKind::Pkgpath.into();
let where_clause = if build_id.is_some() {
"WHERE h.build_id = ?1"
} else {
""
};
// ROW_NUMBER() picks the newest (highest-id) record per
// (pkgpath, pkgbase); the outer SELECT keeps only those rows.
let sql = format!(
"WITH latest AS ( \
SELECT h.{pkgbase_col}, h.{du}, h.{out}, h.{mj}, h.{wo}, \
ROW_NUMBER() OVER ( \
PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
ORDER BY h.id DESC \
) AS rn \
FROM build_history h \
{where_clause} \
) \
SELECT {pkgbase_col}, {out}, {du}, {mj}, {wo} FROM latest \
WHERE rn = 1",
);
let mut stmt = match conn.prepare(&sql) {
Ok(s) => s,
Err(e) => {
warn!(
error = format!("{e:#}"),
"build_history_by_pkg_all: failed to prepare query"
);
return HashMap::new();
}
};
let map_row = |row: &rusqlite::Row<'_>| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, i32>(1)?,
row.get::<_, Option<i64>>(2)?,
row.get::<_, Option<i64>>(3)?,
row.get::<_, Option<String>>(4)?,
))
};
// Bind ?1 only when a build filter is present.
let rows = match build_id {
Some(id) => stmt.query_map(params![id], map_row),
None => stmt.query_map([], map_row),
};
let rows = match rows {
Ok(r) => r,
Err(e) => {
warn!(
error = format!("{e:#}"),
"build_history_by_pkg_all: query failed"
);
return HashMap::new();
}
};
let mut result = HashMap::new();
// flatten() silently drops rows that failed to decode, keeping the
// lookup best-effort.
for row in rows.flatten() {
let (pkgbase, outcome, du, mj, wo) = row;
result.insert(
pkgbase,
PkgBuildHistory {
outcome: PackageStateKind::from_repr(outcome),
disk_usage: du.map(|v| v as u64),
make_jobs: mj.map(|v| v as u32),
wrkobjdir: wo,
},
);
}
result
}
/// Best-effort estimate of the build cost per pkgbase, in milliseconds.
///
/// For the most recent successful record per (pkgpath, pkgbase), the cost
/// is the Build-stage wall time (falling back to the record's total
/// duration) multiplied by MAKE_JOBS (default 1), approximating the total
/// CPU demand of a rebuild.
///
/// Any failure is logged via `warn!` and yields an empty map instead of
/// an error.
pub fn duration_by_pkg_all(&self) -> HashMap<String, u64> {
let conn = match self.history_conn() {
Ok(c) => c,
Err(e) => {
warn!(
error = format!("{e:#}"),
"duration_by_pkg_all: failed to open history db"
);
return HashMap::new();
}
};
// Column names come from HistoryKind so the SQL tracks the generated
// schema.
let dur: &str = HistoryKind::Duration.into();
let out: &str = HistoryKind::Outcome.into();
let pkgbase_col: &str = HistoryKind::Pkgbase.into();
let pkgpath_col: &str = HistoryKind::Pkgpath.into();
let success_outcome = PackageStateKind::Success as i32;
let build_stage = Stage::Build as i32;
let jobs: &str = HistoryKind::MakeJobs.into();
// "matched" keeps only successful records; rn = 1 selects the newest
// per (pkgpath, pkgbase). The LEFT JOIN pulls in the Build-stage wall
// time when one was recorded.
let sql = format!(
"WITH matched AS ( \
SELECT h.{pkgbase_col}, h.{dur}, h.{jobs}, h.id, \
ROW_NUMBER() OVER ( \
PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
ORDER BY h.id DESC \
) AS rn \
FROM build_history h \
WHERE h.{out} = {success_outcome} \
) \
SELECT m.{pkgbase_col}, \
COALESCE(wt.duration, m.{dur}) \
* COALESCE(m.{jobs}, 1) \
FROM matched m \
LEFT JOIN wall_times wt ON wt.history_id = m.id \
AND wt.stage = {build_stage} \
WHERE m.rn = 1",
);
let mut stmt = match conn.prepare(&sql) {
Ok(s) => s,
Err(e) => {
warn!(
error = format!("{e:#}"),
"duration_by_pkg_all: failed to prepare query"
);
return HashMap::new();
}
};
let rows = match stmt.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
}) {
Ok(r) => r,
Err(e) => {
warn!(
error = format!("{e:#}"),
"duration_by_pkg_all: query failed"
);
return HashMap::new();
}
};
let mut result = HashMap::new();
// flatten() silently drops rows that failed to decode.
for row in rows.flatten() {
let (pkgbase, ms) = row;
result.insert(pkgbase, ms as u64);
}
result
}
/// Bulk-insert CPU usage samples into the history database inside a
/// single transaction. A no-op when `samples` is empty.
pub fn store_cpu_usage(&self, samples: &[crate::cpu::CpuSample]) -> Result<()> {
    if samples.is_empty() {
        return Ok(());
    }
    let conn = self.history_conn()?;
    let tx = conn.unchecked_transaction()?;
    {
        // Scope the cached statement so it is dropped before commit.
        let mut insert = tx.prepare_cached(
            "INSERT INTO cpu_usage (timestamp, user_pct, sys_pct) \
             VALUES (?1, ?2, ?3)",
        )?;
        for sample in samples {
            insert.execute(params![
                sample.timestamp,
                sample.user_pct as i32,
                sample.sys_pct as i32
            ])?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Summarize every identified build recorded in the history database.
///
/// For each build_id, only the latest record per pkgpath (highest id)
/// counts. Returns, per build: total packages and how many succeeded,
/// were up to date, failed, or had any other outcome ("masked"), newest
/// build first.
pub fn list_history_builds(&self) -> Result<Vec<BuildListEntry>> {
let conn = self.history_conn()?;
let success = PackageStateKind::Success as i32;
let up_to_date = PackageStateKind::UpToDate as i32;
let failed = PackageStateKind::Failed as i32;
// ROW_NUMBER() dedupes retries: only the newest record per
// (build_id, pkgpath) contributes to the counts.
let mut stmt = conn.prepare(
"WITH latest AS ( \
SELECT build_id, outcome, \
ROW_NUMBER() OVER ( \
PARTITION BY build_id, pkgpath \
ORDER BY id DESC \
) AS rn \
FROM build_history \
WHERE build_id IS NOT NULL \
) \
SELECT build_id, COUNT(*) AS packages, \
SUM(CASE WHEN outcome = ?1 THEN 1 ELSE 0 END), \
SUM(CASE WHEN outcome = ?2 THEN 1 ELSE 0 END), \
SUM(CASE WHEN outcome = ?3 THEN 1 ELSE 0 END), \
SUM(CASE WHEN outcome NOT IN (?1, ?2, ?3) THEN 1 ELSE 0 END) \
FROM latest \
WHERE rn = 1 \
GROUP BY build_id \
ORDER BY build_id DESC",
)?;
let rows = stmt.query_map(params![success, up_to_date, failed], |row| {
Ok(BuildListEntry {
build_id: row.get(0)?,
package_count: row.get::<_, i64>(1)? as usize,
succeeded: row.get::<_, i64>(2)? as usize,
up_to_date: row.get::<_, i64>(3)? as usize,
failed: row.get::<_, i64>(4)? as usize,
masked: row.get::<_, i64>(5)? as usize,
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.context("Failed to list history builds")
}
/// Record (or overwrite) the VCS revision associated with `build_id`.
pub fn store_build_revision(&self, build_id: &str, revision: &str) -> Result<()> {
    let sql = "INSERT OR REPLACE INTO build_metadata (build_id, revision) VALUES (?1, ?2)";
    self.history_conn()?.execute(sql, params![build_id, revision])?;
    Ok(())
}
/// Look up the VCS revision recorded for `build_id`.
///
/// Returns Ok(None) when no revision has been stored for that build.
pub fn get_build_revision(&self, build_id: &str) -> Result<Option<String>> {
    let conn = self.history_conn()?;
    let lookup = conn.query_row(
        "SELECT revision FROM build_metadata WHERE build_id = ?1",
        [build_id],
        |row| row.get(0),
    );
    match lookup {
        Ok(rev) => Ok(Some(rev)),
        // A missing row is an expected outcome, not an error.
        Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
        Err(e) => Err(e.into()),
    }
}
/// Compare the latest per-pkgpath results of two builds and classify
/// every difference.
///
/// Classification:
/// - in both, not-failed -> failed: new_failures
/// - in both, failed -> success/up-to-date: fixes
/// - in both, failed -> failed (pkgname/stage changed): version_changes
/// - only in build2 and failed: new_failures
/// - only in build1 and failed: fixes (no longer failing)
/// - everything else that differs: other_changes
pub fn compute_build_diff(&self, build1_id: &str, build2_id: &str) -> Result<BuildDiff> {
let conn = self.history_conn()?;
// (pkgname, outcome id, optional stage id)
type PkgRecord = (String, i32, Option<i32>);
// Latest (highest-id) record per pkgpath within one build.
let query_build = |bid: &str| -> Result<HashMap<String, PkgRecord>> {
let mut stmt = conn.prepare(
"SELECT pkgpath, pkgname, outcome, stage FROM ( \
SELECT pkgpath, pkgname, outcome, stage, \
ROW_NUMBER() OVER (PARTITION BY pkgpath ORDER BY id DESC) AS rn \
FROM build_history WHERE build_id = ?1 \
) WHERE rn = 1",
)?;
let mut map = HashMap::new();
let rows = stmt.query_map([bid], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i32>(2)?,
row.get::<_, Option<i32>>(3)?,
))
})?;
for row in rows {
let (pkgpath, pkgname, outcome, stage) = row?;
map.insert(pkgpath, (pkgname, outcome, stage));
}
Ok(map)
};
let b1 = query_build(build1_id)?;
let b2 = query_build(build2_id)?;
let mut diff = BuildDiff {
build1_id: build1_id.to_string(),
build2_id: build2_id.to_string(),
new_failures: Vec::new(),
fixes: Vec::new(),
version_changes: Vec::new(),
other_changes: Vec::new(),
};
use PackageStateKind::*;
// Pass 1: every package present in build2, compared against build1.
for (pkgpath, (b2_pkgname, b2_outcome_id, b2_stage_id)) in &b2 {
let b2_outcome = PackageStateKind::from_repr(*b2_outcome_id);
let b2_stage = b2_stage_id.and_then(Stage::from_repr);
// Build a DiffEntry from an optional build1 record for this pkgpath.
let entry_from = |b1_pkg: Option<&PkgRecord>| DiffEntry {
pkgpath: pkgpath.clone(),
build1_pkgname: b1_pkg.map(|(n, _, _)| n.clone()),
build2_pkgname: Some(b2_pkgname.clone()),
build1_outcome: b1_pkg.and_then(|(_, o, _)| PackageStateKind::from_repr(*o)),
build2_outcome: b2_outcome,
build1_stage: b1_pkg.and_then(|(_, _, s)| s.and_then(Stage::from_repr)),
build2_stage: b2_stage,
};
match b1.get(pkgpath) {
Some(b1_rec) => {
let b1_outcome = PackageStateKind::from_repr(b1_rec.1);
let b1_stage = b1_rec.2.and_then(Stage::from_repr);
// Identical outcome, pkgname and stage: not a difference.
if b1_outcome == b2_outcome && b1_rec.0 == *b2_pkgname && b1_stage == b2_stage {
continue;
}
let entry = entry_from(Some(b1_rec));
match (b1_outcome, b2_outcome) {
(Some(b1o), Some(b2o)) => {
let was_failed = matches!(b1o, Failed);
let now_failed = matches!(b2o, Failed);
let now_ok = matches!(b2o, Success | UpToDate);
if !was_failed && now_failed {
diff.new_failures.push(entry);
} else if was_failed && now_ok {
diff.fixes.push(entry);
} else if was_failed && now_failed {
// Still failing, but pkgname or stage changed.
diff.version_changes.push(entry);
} else {
diff.other_changes.push(entry);
}
}
// Unknown outcome id on either side: bucket as "other".
_ => diff.other_changes.push(entry),
}
}
None => {
// Package is new in build2.
let entry = entry_from(None);
if matches!(b2_outcome, Some(Failed)) {
diff.new_failures.push(entry);
} else {
diff.other_changes.push(entry);
}
}
}
}
// Pass 2: packages that disappeared between build1 and build2.
for (pkgpath, (b1_pkgname, b1_outcome_id, b1_stage_id)) in &b1 {
if b2.contains_key(pkgpath) {
continue;
}
let b1_outcome = PackageStateKind::from_repr(*b1_outcome_id);
let entry = DiffEntry {
pkgpath: pkgpath.clone(),
build1_pkgname: Some(b1_pkgname.clone()),
build2_pkgname: None,
build1_outcome: b1_outcome,
build2_outcome: None,
build1_stage: b1_stage_id.and_then(Stage::from_repr),
build2_stage: None,
};
// A previously-failing package that vanished counts as fixed.
if matches!(b1_outcome, Some(Failed)) {
diff.fixes.push(entry);
} else {
diff.other_changes.push(entry);
}
}
Ok(diff)
}
}
/// One package-level difference between two builds.
///
/// `build1_*` fields are None when the package only appears in build2,
/// and `build2_*` fields are None when it only appears in build1.
pub struct DiffEntry {
pub pkgpath: String,
pub build1_pkgname: Option<String>,
pub build2_pkgname: Option<String>,
pub build1_outcome: Option<PackageStateKind>,
pub build2_outcome: Option<PackageStateKind>,
pub build1_stage: Option<Stage>,
pub build2_stage: Option<Stage>,
}
/// Result of `compute_build_diff`: differences between two identified
/// builds, bucketed by how the outcome changed.
pub struct BuildDiff {
pub build1_id: String,
pub build2_id: String,
// Packages failing in build2 that were not failing (or absent) in build1.
pub new_failures: Vec<DiffEntry>,
// Packages failing in build1 that now succeed or disappeared.
pub fixes: Vec<DiffEntry>,
// Packages failing in both builds but with a changed pkgname or stage.
pub version_changes: Vec<DiffEntry>,
// Any other difference in outcome, pkgname or stage.
pub other_changes: Vec<DiffEntry>,
}
/// Per-build summary row returned by `list_history_builds`.
pub struct BuildListEntry {
pub build_id: String,
// Total packages recorded for this build (latest record per pkgpath).
pub package_count: usize,
pub succeeded: usize,
pub up_to_date: usize,
pub failed: usize,
// Packages with any outcome other than success/up-to-date/failed.
pub masked: usize,
}
/// Verify that an existing history.db in `dbdir` matches
/// HISTORY_SCHEMA_VERSION.
///
/// A missing database file, or one without a schema_version table, is
/// accepted (the schema will be created on first open); only an explicit
/// version mismatch is an error.
fn check_history_schema(dbdir: &Path) -> Result<()> {
    let path = dbdir.join("history.db");
    if !path.exists() {
        return Ok(());
    }
    let conn = Connection::open(&path).context("Failed to open history database")?;
    let has_schema: bool = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
         WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get::<_, i32>(0).map(|c| c > 0),
    )?;
    if !has_schema {
        return Ok(());
    }
    let version: i32 =
        conn.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
            row.get(0)
        })?;
    if version == HISTORY_SCHEMA_VERSION {
        return Ok(());
    }
    anyhow::bail!(
        "History schema mismatch: found v{version}, expected v{expected}. \
         Remove {} to reset.",
        path.display(),
        expected = HISTORY_SCHEMA_VERSION
    )
}
/// Open (creating if necessary) the history database in `dbdir`, apply
/// the standard connection pragmas, and bootstrap the schema on a
/// brand-new database.
fn open_history_conn(dbdir: &Path) -> Result<Connection> {
    let path = dbdir.join("history.db");
    let conn = Connection::open(path).context("Failed to open history database")?;
    conn.execute_batch(
        "PRAGMA journal_mode = WAL;
         PRAGMA synchronous = NORMAL;
         PRAGMA cache_size = -8000;
         PRAGMA temp_store = MEMORY;
         PRAGMA foreign_keys = ON;",
    )?;
    // An existing schema_version table means the schema is in place;
    // otherwise this is a fresh database and we create everything.
    let schema_tables: i32 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
         WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if schema_tables == 0 {
        create_history_schema(&conn)?;
    }
    Ok(conn)
}
/// Create the history database schema from scratch on `conn`.
///
/// The outcome/stage lookup tables and the build_history column list are
/// generated from the corresponding Rust enums (PackageState, Stage,
/// HistoryKind) so the database stays in lock-step with the code, and
/// schema_version is stamped with HISTORY_SCHEMA_VERSION for the check in
/// `check_history_schema`.
pub(crate) fn create_history_schema(conn: &Connection) -> Result<()> {
conn.execute_batch(&format!(
"CREATE TABLE schema_version (version INTEGER NOT NULL);
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE outcome_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO outcome_types (id, name) VALUES {outcome_types};
CREATE TABLE stage_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO stage_types (id, name) VALUES {stages};
CREATE TABLE build_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{history_columns}
);
CREATE INDEX idx_history_pkgpath
ON build_history(pkgpath);
CREATE INDEX idx_history_pkgpath_pkgbase
ON build_history(pkgpath, pkgbase);
CREATE INDEX idx_history_pkgpath_outcome
ON build_history(pkgpath, outcome);
CREATE INDEX idx_history_timestamp
ON build_history(timestamp);
CREATE INDEX idx_history_build_id
ON build_history(build_id, pkgpath, id);
CREATE TABLE wall_times (
history_id INTEGER NOT NULL
REFERENCES build_history(id) ON DELETE CASCADE,
stage INTEGER NOT NULL REFERENCES stage_types(id),
duration INTEGER NOT NULL,
PRIMARY KEY (history_id, stage)
);
CREATE TABLE cpu_times (
history_id INTEGER NOT NULL
REFERENCES build_history(id) ON DELETE CASCADE,
stage INTEGER NOT NULL REFERENCES stage_types(id),
duration INTEGER NOT NULL,
PRIMARY KEY (history_id, stage)
);
CREATE TABLE cpu_usage (
timestamp INTEGER NOT NULL,
user_pct INTEGER NOT NULL,
sys_pct INTEGER NOT NULL
);
CREATE INDEX idx_cpu_timestamp ON cpu_usage(timestamp);
CREATE TABLE build_metadata (
build_id TEXT PRIMARY KEY,
revision TEXT
);",
HISTORY_SCHEMA_VERSION,
outcome_types = PackageState::db_values(),
stages = stage_values(),
history_columns = history_schema(),
))?;
Ok(())
}
/// Insert one `History` record plus its per-stage wall/CPU timings into
/// the history database.
///
/// The INSERT column list is generated from `HistoryKind::VARIANTS`, so
/// the `params![]` arguments below must stay in the declared variant
/// order: timestamp, pkgpath, pkgname, pkgbase, outcome, stage, make_jobs,
/// duration, disk_usage, wrkobjdir, build_id.
pub(crate) fn record_history_to(conn: &Connection, rec: &crate::History) -> Result<()> {
let cols: Vec<&str> = HistoryKind::VARIANTS.iter().map(<&str>::from).collect();
let placeholders: String = (1..=cols.len())
.map(|i| format!("?{}", i))
.collect::<Vec<_>>()
.join(", ");
let sql = format!(
"INSERT INTO build_history ({}) VALUES ({})",
cols.join(", "),
placeholders,
);
// Durations are stored as integer milliseconds.
let dur_ms = |d: Duration| d.as_millis() as i64;
conn.execute(
&sql,
params![
rec.timestamp,
rec.pkgpath,
rec.pkgname,
rec.pkgbase,
rec.outcome.db_id(),
rec.stage.map(|s| s as i32),
rec.make_jobs.map(|j| j as i64),
dur_ms(rec.duration),
rec.disk_usage.map(|s| s as i64),
rec.wrkobjdir.as_ref().map(|k| k.to_string()),
rec.build_id.as_deref(),
],
)?;
// Foreign key for the per-stage timing tables below.
let history_id = conn.last_insert_rowid();
if !rec.stage_durations.is_empty() {
let mut stmt = conn.prepare_cached(
"INSERT INTO wall_times \
(history_id, stage, duration) \
VALUES (?1, ?2, ?3)",
)?;
for &(stage, duration) in &rec.stage_durations {
stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
}
}
if !rec.stage_cpu_times.is_empty() {
let mut stmt = conn.prepare_cached(
"INSERT INTO cpu_times \
(history_id, stage, duration) \
VALUES (?1, ?2, ?3)",
)?;
for &(stage, duration) in &rec.stage_cpu_times {
stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
}
}
Ok(())
}
/// A package selected for building, as loaded by `query_selected_packages`.
pub(crate) struct SelectedPackage {
pub id: i64,
pub pkgname: PkgName,
pub pkg_location: String,
// Scheduling weight; defaults to 100 when unset or unparsable.
pub pbulk_weight: usize,
// False only when make_jobs_safe is explicitly "no" (case-insensitive).
pub make_jobs_safe: bool,
}
/// Load every package currently marked selected in package_state,
/// together with the scheduling attributes the build planner needs.
pub(crate) fn query_selected_packages(conn: &Connection) -> Result<Vec<SelectedPackage>> {
    let mut stmt = conn.prepare(
        "SELECT p.id, p.pkgname, p.pkg_location, p.pbulk_weight, p.make_jobs_safe \
         FROM scan_index p \
         JOIN package_state s ON s.package_id = p.id \
         WHERE s.selected = 1",
    )?;
    let mut selected = Vec::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        // Weight defaults to 100 when missing or unparsable.
        let weight = row
            .get::<_, Option<String>>(3)?
            .and_then(|s| s.parse().ok())
            .unwrap_or(100);
        // MAKE_JOBS is considered safe unless explicitly marked "no".
        let jobs_safe = !matches!(
            row.get::<_, Option<String>>(4)?.as_deref(),
            Some(s) if s.eq_ignore_ascii_case("no")
        );
        selected.push(SelectedPackage {
            id: row.get(0)?,
            pkgname: PkgName::from(row.get::<_, String>(1)?),
            pkg_location: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
            pbulk_weight: weight,
            make_jobs_safe: jobs_safe,
        });
    }
    Ok(selected)
}
/// All (package_id, depends_on_id) edges from the resolved_depends table.
pub(crate) fn query_resolved_deps(conn: &Connection) -> Result<Vec<(i64, i64)>> {
    let mut stmt = conn.prepare("SELECT package_id, depends_on_id FROM resolved_depends")?;
    let mut edges = Vec::new();
    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        edges.push((row.get(0)?, row.get(1)?));
    }
    Ok(edges)
}
/// Timing data for one package's Build stage, taken from the history db.
pub(crate) struct BuildStageTiming {
// CPU time spent in the Build stage, in milliseconds.
pub cpu_ms: u64,
}
/// Best-effort lookup of the Build-stage CPU time per package, keyed by
/// (pkgpath, pkgbase).
///
/// Only the most recent successful record per (pkgpath, pkgbase) is
/// considered, and only records that have a cpu_times entry for the Build
/// stage appear in the result (inner JOIN). Failures are logged via
/// `warn!` and yield an empty map instead of an error.
pub(crate) fn query_build_stage_timings(
conn: &Connection,
) -> HashMap<(String, String), BuildStageTiming> {
// Column names come from HistoryKind so the SQL tracks the generated
// schema.
let out: &str = HistoryKind::Outcome.into();
let pkgbase_col: &str = HistoryKind::Pkgbase.into();
let pkgpath_col: &str = HistoryKind::Pkgpath.into();
let success_outcome = PackageStateKind::Success as i32;
let build_stage = Stage::Build as i32;
// "matched" keeps successful records; rn = 1 selects the newest per
// (pkgpath, pkgbase).
let sql = format!(
"WITH matched AS ( \
SELECT h.{pkgpath_col}, h.{pkgbase_col}, h.id, \
ROW_NUMBER() OVER ( \
PARTITION BY h.{pkgpath_col}, h.{pkgbase_col} \
ORDER BY h.id DESC \
) AS rn \
FROM build_history h \
WHERE h.{out} = {success_outcome} \
) \
SELECT m.{pkgpath_col}, m.{pkgbase_col}, ct.duration \
FROM matched m \
JOIN cpu_times ct ON ct.history_id = m.id \
AND ct.stage = {build_stage} \
WHERE m.rn = 1",
);
let mut stmt = match conn.prepare(&sql) {
Ok(s) => s,
Err(e) => {
warn!(
error = format!("{e:#}"),
"query_build_stage_timings: failed to prepare query"
);
return HashMap::new();
}
};
let rows = match stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, i64>(2)?,
))
}) {
Ok(r) => r,
Err(e) => {
warn!(
error = format!("{e:#}"),
"query_build_stage_timings: query failed"
);
return HashMap::new();
}
};
let mut result = HashMap::new();
// flatten() silently drops rows that failed to decode.
for row in rows.flatten() {
let (pkgpath, pkgbase, cpu_ms) = row;
result.insert(
(pkgpath, pkgbase),
BuildStageTiming {
cpu_ms: cpu_ms as u64,
},
);
}
result
}
#[cfg(test)]
mod tests {
use super::*;
use crate::build::Stage;
// Fresh in-memory history database with the full schema applied.
fn test_history_db() -> Connection {
let conn = Connection::open_in_memory().expect("failed to open in-memory db");
create_history_schema(&conn).expect("failed to create schema");
conn
}
// Insert one successful build record for pkgpath/pkgname carrying a
// single Build-stage wall-time and CPU-time sample (in milliseconds).
fn insert_build(conn: &Connection, pkgpath: &str, pkgname: &str, wall_ms: u64, cpu_ms: u64) {
use crate::{History, PackageState};
use std::time::Duration;
let rec = History {
timestamp: 0,
pkgpath: pkgpath.to_string(),
pkgname: pkgname.to_string(),
pkgbase: PkgName::new(pkgname).pkgbase().to_string(),
outcome: PackageState::Success,
stage: None,
make_jobs: Some(1),
duration: Duration::ZERO,
disk_usage: None,
wrkobjdir: None,
stage_durations: vec![(Stage::Build, Duration::from_millis(wall_ms))],
stage_cpu_times: vec![(Stage::Build, Duration::from_millis(cpu_ms))],
build_id: None,
};
record_history_to(conn, &rec).expect("failed to insert build");
}
// Convenience constructor for the (pkgpath, pkgbase) map key.
fn key(pkgpath: &str, pkgbase: &str) -> (String, String) {
(pkgpath.to_string(), pkgbase.to_string())
}
// A single record is returned with its CPU time.
#[test]
fn build_stage_timings_basic() {
let conn = test_history_db();
insert_build(&conn, "devel/cmake", "cmake-3.28.0", 60000, 180000);
let timings = query_build_stage_timings(&conn);
assert_eq!(timings.len(), 1);
let t = &timings[&key("devel/cmake", "cmake")];
assert_eq!(t.cpu_ms, 180000);
}
// Only the newest record per (pkgpath, pkgbase) is reported.
#[test]
fn build_stage_timings_latest() {
let conn = test_history_db();
insert_build(&conn, "devel/cmake", "cmake-3.27.0", 1000, 2000);
insert_build(&conn, "devel/cmake", "cmake-3.28.0", 5000, 15000);
let timings = query_build_stage_timings(&conn);
assert_eq!(timings.len(), 1);
let t = &timings[&key("devel/cmake", "cmake")];
assert_eq!(t.cpu_ms, 15000);
}
// Two pkgpaths sharing a pkgbase stay distinct because the key includes
// the pkgpath.
#[test]
fn build_stage_timings_pkgpath_disambiguates() {
let conn = test_history_db();
insert_build(
&conn,
"databases/mysql80-client",
"mysql-client-8.0.1",
3000,
9000,
);
insert_build(
&conn,
"databases/mysql57-client",
"mysql-client-5.7.42",
2000,
4000,
);
let timings = query_build_stage_timings(&conn);
assert_eq!(timings.len(), 2);
let t80 = &timings[&key("databases/mysql80-client", "mysql-client")];
assert_eq!(t80.cpu_ms, 9000);
let t57 = &timings[&key("databases/mysql57-client", "mysql-client")];
assert_eq!(t57.cpu_ms, 4000);
}
// An empty database yields an empty map, not an error.
#[test]
fn build_stage_timings_empty() {
let conn = test_history_db();
let timings = query_build_stage_timings(&conn);
assert!(timings.is_empty());
}
}