use std::cell::OnceCell;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result, anyhow};
use indexmap::IndexMap;
use pkgsrc::{AllDepends, BootstrapPkg, MakeJobsSafe, PkgName, PkgPath, ScanDepends, ScanIndex};
use rusqlite::{Connection, OptionalExtension, params};
use tracing::{debug, warn};
use strum::VariantArray;
use crate::build::{BuildResult, PkgBuildStats, Stage};
use crate::config::PkgsrcEnv;
use crate::scan::ScanResult;
use crate::try_println;
use crate::{HistoryKind, PackageState};
/// One entry of the report data returned by `Database::get_report_data`:
/// the package name, its scan index, and the numeric outcome id (if any).
pub type ReportRow = (String, ScanIndex, Option<i32>);
/// A pair of strings used as a package key.
// NOTE(review): the meaning of each component is not visible in this part
// of the file — confirm against the users of this alias.
pub type PkgKey = (String, String);
fn stage_values() -> String {
Stage::VARIANTS
.iter()
.map(|v| format!("({}, '{}')", *v as i32, v.into_str()))
.collect::<Vec<_>>()
.join(", ")
}
/// Render every [`PackageState`] variant except `Pending` as an SQL
/// `(id, 'name')` tuple and join the tuples with `", "`.
fn outcome_values() -> String {
    let mut tuples: Vec<String> = Vec::new();
    for state in PackageState::VARIANTS {
        // `Pending` is deliberately left out of the lookup table.
        if *state == PackageState::Pending {
            continue;
        }
        tuples.push(format!("({}, '{}')", state.id(), state.as_str()));
    }
    tuples.join(", ")
}
/// Build the column list of the history table: one `name TYPE` declaration
/// per [`HistoryKind`] variant, joined with `",\n "`.
fn history_schema() -> String {
    use strum::VariantArray;
    let mut columns: Vec<String> = Vec::with_capacity(HistoryKind::VARIANTS.len());
    for kind in HistoryKind::VARIANTS {
        let name: &str = kind.into();
        // Column type and constraints for this kind of history field.
        let decl = match kind {
            HistoryKind::Outcome => "INTEGER NOT NULL REFERENCES outcome_types(id)",
            HistoryKind::Stage => "INTEGER REFERENCES stage_types(id)",
            HistoryKind::Pkgpath | HistoryKind::Pkgname | HistoryKind::Pkgbase => {
                "TEXT NOT NULL"
            }
            HistoryKind::Duration | HistoryKind::Timestamp => "INTEGER NOT NULL",
            HistoryKind::MakeJobs | HistoryKind::DiskUsage => "INTEGER",
            HistoryKind::Wrkobjdir | HistoryKind::BuildId => "TEXT",
        };
        columns.push(format!("{name} {decl}"));
    }
    columns.join(",\n ")
}
/// Version stamped into `schema_version` when the main database is created;
/// `Database::init` refuses to open a database whose stored version differs.
const SCHEMA_VERSION: i32 = 20260610;
/// Version number for the history database schema.
// NOTE(review): not referenced in the visible part of the file — presumably
// consumed by `check_history_schema`/`open_history_conn`; confirm.
const HISTORY_SCHEMA_VERSION: i32 = 20260610;
/// Summary of a historical build of one package.
// NOTE(review): this type is not constructed in the visible part of the
// file; the field notes below restate the declarations only.
#[derive(Clone, Debug)]
pub struct PkgBuildHistory {
/// Outcome recorded for the build.
pub outcome: PackageState,
/// Disk usage of the build, when recorded (units not visible here).
pub disk_usage: Option<u64>,
/// `make_jobs` value of the build, when recorded.
pub make_jobs: Option<u32>,
/// Work object directory of the build, when recorded.
pub wrkobjdir: Option<String>,
}
/// Lightweight view of a `scan_index` row, deserialized by column name from
/// `SELECT * FROM scan_index` queries.
#[derive(Clone, Debug, serde::Deserialize)]
pub struct PackageRow {
/// `scan_index.id`.
pub id: i64,
/// `scan_index.pkgname` (unique per row).
pub pkgname: String,
/// `scan_index.pkg_location`, the pkgpath the package was stored under.
pub pkg_location: String,
/// `scan_index.multi_version`, stored as a space-joined string.
pub multi_version: Option<String>,
}
/// Combined status of one selected package, as produced by
/// `Database::get_all_package_status` (a join of `scan_index`,
/// `package_state`, `builds` and `scan_outcomes`).
#[derive(Clone, Debug, serde::Deserialize)]
pub struct PackageStatusRow {
/// `scan_index.id`.
pub id: i64,
/// `scan_index.pkgname`.
pub pkgname: String,
/// `scan_index.pkg_location`.
pub pkg_location: String,
/// `scan_index.pkg_skip_reason`.
pub pkg_skip_reason: Option<String>,
/// `scan_index.pkg_fail_reason`.
pub pkg_fail_reason: Option<String>,
/// `package_state.build_reason`.
pub build_reason: Option<String>,
/// `scan_index.multi_version`, stored as a space-joined string.
pub multi_version: Option<String>,
/// `builds.outcome`; `None` when the package has no `builds` row.
pub build_outcome: Option<i32>,
/// `builds.stage`.
pub build_stage: Option<i32>,
/// `scan_outcomes.outcome`; `None` when there is no `scan_outcomes` row.
pub scan_outcome: Option<i32>,
/// `scan_outcomes.detail`.
pub scan_outcome_detail: Option<String>,
/// `package_state.total_pbulk_weight`.
pub total_pbulk_weight: i64,
/// `package_state.dep_count`.
pub dep_count: i64,
/// `scan_index.make_jobs_safe`.
pub make_jobs_safe: Option<bool>,
}
/// Convert a joined `builds`/`scan_index` row into a [`BuildResult`].
///
/// The row must expose the columns `outcome`, `pkgname`, `pkgpath`,
/// `log_dir`, `stage` and `duration_ms`.
///
/// Returns `Ok(None)` when the stored outcome id does not map to a
/// [`PackageState`]. A `pkgpath` that fails to parse and a `stage` id with no
/// matching variant both become `None`. A negative `duration_ms` is clamped
/// to zero rather than being sign-wrapped into an enormous duration.
///
/// # Errors
/// Propagates any `rusqlite` error raised while reading a column.
fn build_result_from_row(row: &rusqlite::Row) -> rusqlite::Result<Option<BuildResult>> {
    let Ok(state) = PackageState::try_from(row.get::<_, i32>("outcome")?) else {
        return Ok(None);
    };
    Ok(Some(BuildResult {
        pkgname: PkgName::new(&row.get::<_, String>("pkgname")?),
        pkgpath: row
            .get::<_, Option<String>>("pkgpath")?
            .and_then(|p| PkgPath::new(&p).ok()),
        state,
        log_dir: row.get::<_, Option<String>>("log_dir")?.map(PathBuf::from),
        build_stats: PkgBuildStats {
            stage: row
                .get::<_, Option<i32>>("stage")?
                .and_then(Stage::from_repr),
            // `as u64` would wrap a negative value; treat it as zero instead.
            duration: Duration::from_millis(
                u64::try_from(row.get::<_, i64>("duration_ms")?).unwrap_or(0),
            ),
            ..PkgBuildStats::default()
        },
    }))
}
/// Selects how much of a `scan_index` row `Database::scan_index_from_row`
/// decodes.
#[derive(Clone, Copy)]
pub enum ScanIndexFields {
/// Decode every column handled by `scan_index_from_row`.
Full,
/// Decode only `pkgname`, `pkg_location`, `all_depends`,
/// `pkg_skip_reason` and `pkg_fail_reason`; other fields keep defaults.
Resolve,
}
/// Handle to the on-disk state database.
pub struct Database {
/// Connection to `bob.db` inside `dbdir`.
conn: Connection,
/// Directory holding the database files.
dbdir: PathBuf,
/// History database connection, opened on first use by `history_conn`.
history_conn: OnceCell<Connection>,
}
/// Scope guard for an SQL transaction: issues `ROLLBACK` on drop unless
/// `commit` completed successfully.
pub struct TransactionGuard<'a> {
/// Connection the transaction was started on.
conn: &'a Connection,
/// Set once `COMMIT` has succeeded; suppresses the rollback in `Drop`.
committed: bool,
}
impl<'a> TransactionGuard<'a> {
    /// Start a transaction on `conn` and return a guard for it.
    fn new(conn: &'a Connection) -> Result<Self> {
        conn.execute("BEGIN TRANSACTION", [])?;
        let guard = Self {
            conn,
            committed: false,
        };
        Ok(guard)
    }

    /// Commit the transaction, consuming the guard.
    ///
    /// If `COMMIT` fails the guard is dropped uncommitted, so a `ROLLBACK`
    /// is attempted.
    pub fn commit(mut self) -> Result<()> {
        match self.conn.execute("COMMIT", []) {
            Ok(_) => {
                self.committed = true;
                Ok(())
            }
            Err(err) => Err(err.into()),
        }
    }
}
impl Drop for TransactionGuard<'_> {
    /// Roll the transaction back unless it was committed.
    fn drop(&mut self) {
        if self.committed {
            return;
        }
        // Best effort: a failed rollback cannot be reported from `drop`.
        let _ = self.conn.execute("ROLLBACK", []);
    }
}
impl Database {
/// Open (creating if necessary) the database stored in `dbdir`.
///
/// Creates the directory, opens `bob.db` inside it, applies the connection
/// pragmas and initialises or validates the schema.
pub fn open(dbdir: &Path) -> Result<Self> {
    std::fs::create_dir_all(dbdir).context("Failed to create database directory")?;
    let db_file = dbdir.join("bob.db");
    let conn = Connection::open(db_file).context("Failed to open database")?;
    let dbdir = dbdir.to_path_buf();
    let db = Self {
        conn,
        dbdir,
        history_conn: OnceCell::new(),
    };
    db.configure_pragmas()?;
    db.init()?;
    Ok(db)
}
/// Directory that holds the database files.
pub fn dbdir(&self) -> &Path {
    self.dbdir.as_path()
}
pub(crate) fn conn(&self) -> &Connection {
&self.conn
}
/// Run `sql` with `params` on the main connection and deserialize every
/// result row into a `T`.
fn query_rows<T, P>(&self, sql: &str, params: P) -> Result<Vec<T>>
where
    T: for<'de> serde::Deserialize<'de>,
    P: rusqlite::Params,
{
    let mut stmt = self.conn.prepare(sql)?;
    let mut decoded = Vec::new();
    for item in serde_rusqlite::from_rows::<T>(stmt.query(params)?) {
        decoded.push(item?);
    }
    Ok(decoded)
}
/// Run `sql` with `params` on the main connection and deserialize the first
/// result row into a `T`; `None` when the query yields no rows.
fn query_one<T, P>(&self, sql: &str, params: P) -> Result<Option<T>>
where
    T: for<'de> serde::Deserialize<'de>,
    P: rusqlite::Params,
{
    let mut stmt = self.conn.prepare(sql)?;
    let rows = stmt.query(params)?;
    let first = serde_rusqlite::from_rows::<T>(rows).next().transpose()?;
    Ok(first)
}
/// Begin a transaction on the main connection; it rolls back on drop
/// unless the returned guard is committed.
pub fn transaction(&self) -> Result<TransactionGuard<'_>> {
    let conn = &self.conn;
    TransactionGuard::new(conn)
}
fn configure_pragmas(&self) -> Result<()> {
self.conn.execute_batch(
"PRAGMA journal_mode = WAL;
PRAGMA synchronous = NORMAL;
PRAGMA temp_store = MEMORY;
PRAGMA foreign_keys = ON;",
)?;
Ok(())
}
/// Create the schema in a fresh database, or verify that an existing one
/// matches `SCHEMA_VERSION`; then check the history schema.
///
/// # Errors
/// Fails with a "Schema mismatch" error when the stored version differs.
fn init(&self) -> Result<()> {
    let version_tables: i32 = self.conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if version_tables > 0 {
        let found: i32 = self.conn.query_row(
            "SELECT version FROM schema_version LIMIT 1",
            [],
            |row| row.get(0),
        )?;
        if found != SCHEMA_VERSION {
            anyhow::bail!(
                "Schema mismatch: found v{}, expected v{}. \
                 Run 'bob clean' to restart.",
                found,
                SCHEMA_VERSION
            );
        }
    } else {
        // No version table yet: this is a brand new database.
        self.create_schema()?;
    }
    check_history_schema(&self.dbdir)?;
    Ok(())
}
/// Create every table, index and view of the main database and record a
/// fresh `build_id` in `metadata`.
///
/// The lookup tables `outcome_types` and `stage_types` are populated from
/// `outcome_values()` and `stage_values()`, and `schema_version` is stamped
/// with `SCHEMA_VERSION`.
fn create_schema(&self) -> Result<()> {
// The DDL below is executed as a single batch; its text ends up in
// `sqlite_master`, so it is kept exactly as written.
self.conn.execute_batch(&format!(
"CREATE TABLE schema_version (version INTEGER NOT NULL) STRICT;
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE outcome_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
) STRICT;
INSERT INTO outcome_types (id, name) VALUES {outcome_types};
CREATE TABLE stage_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
) STRICT;
INSERT INTO stage_types (id, name) VALUES {stages};
CREATE TABLE scan_index (
id INTEGER PRIMARY KEY AUTOINCREMENT,
pkgname TEXT UNIQUE NOT NULL,
pkg_location TEXT NOT NULL,
all_depends TEXT,
pkg_skip_reason TEXT,
pkg_fail_reason TEXT,
no_bin_on_ftp TEXT,
restricted TEXT,
categories TEXT,
maintainer TEXT,
use_destdir TEXT,
bootstrap_pkg INTEGER,
usergroup_phase TEXT,
scan_depends TEXT,
make_jobs_safe INTEGER,
pbulk_weight INTEGER,
multi_version TEXT
) STRICT;
CREATE INDEX idx_scan_index_pkg_location ON scan_index(pkg_location);
CREATE TABLE package_state (
package_id INTEGER PRIMARY KEY
REFERENCES scan_index(id) ON DELETE CASCADE,
selected INTEGER NOT NULL DEFAULT 0,
build_reason TEXT,
total_pbulk_weight INTEGER NOT NULL DEFAULT 0,
dep_count INTEGER NOT NULL DEFAULT 0
) STRICT;
CREATE TABLE resolved_depends (
package_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
depends_on_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
PRIMARY KEY (package_id, depends_on_id)
) STRICT, WITHOUT ROWID;
CREATE INDEX idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
CREATE TABLE builds (
id INTEGER PRIMARY KEY AUTOINCREMENT,
package_id INTEGER NOT NULL REFERENCES scan_index(id) ON DELETE CASCADE,
outcome INTEGER NOT NULL REFERENCES outcome_types(id),
stage INTEGER REFERENCES stage_types(id),
duration_ms INTEGER NOT NULL DEFAULT 0,
log_dir TEXT,
UNIQUE(package_id)
) STRICT;
CREATE INDEX idx_builds_outcome ON builds(outcome);
CREATE INDEX idx_builds_package ON builds(package_id);
CREATE TABLE scan_outcomes (
package_id INTEGER PRIMARY KEY
REFERENCES scan_index(id) ON DELETE CASCADE,
outcome INTEGER NOT NULL REFERENCES outcome_types(id),
detail TEXT
) STRICT;
CREATE INDEX idx_scan_outcomes_outcome ON scan_outcomes(outcome);
CREATE VIEW buildable AS
SELECT p.*
FROM scan_index p
JOIN package_state s
ON s.package_id = p.id AND s.selected = 1
WHERE NOT EXISTS (
SELECT 1 FROM scan_outcomes o WHERE o.package_id = p.id
);
CREATE TABLE scan_failures (
pkgpath TEXT PRIMARY KEY,
error TEXT NOT NULL
) STRICT;
CREATE TABLE metadata (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
) STRICT;",
SCHEMA_VERSION,
outcome_types = outcome_values(),
stages = stage_values(),
))?;
// The build id is the current UTC time rendered with `BUILD_ID_FORMAT`.
let build_id = chrono::Utc::now()
.format(crate::BUILD_ID_FORMAT)
.to_string();
self.conn.execute(
"INSERT INTO metadata (key, value) VALUES ('build_id', ?1)",
params![build_id],
)?;
debug!(version = SCHEMA_VERSION, build_id = %build_id, "Created schema");
Ok(())
}
/// Insert one scanned package into `scan_index` under `pkgpath` and create
/// its `package_state` row.
///
/// Empty skip/fail reasons are stored as NULL. If a row with the same
/// `pkgname` already exists nothing is written and `Ok(())` is returned.
pub fn store_package(&self, pkgpath: &str, index: &ScanIndex) -> Result<()> {
    const INSERT_SQL: &str = "INSERT INTO scan_index
(pkgname, pkg_location, all_depends,
pkg_skip_reason, pkg_fail_reason, no_bin_on_ftp,
restricted, categories, maintainer, use_destdir,
bootstrap_pkg, usergroup_phase, scan_depends,
make_jobs_safe, pbulk_weight, multi_version)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10,
?11, ?12, ?13, ?14, ?15, ?16)
ON CONFLICT(pkgname) DO NOTHING";

    let pkgname = index.pkgname.pkgname();
    let skip_reason = index.pkg_skip_reason.as_deref().filter(|s| !s.is_empty());
    let fail_reason = index.pkg_fail_reason.as_deref().filter(|s| !s.is_empty());
    let all_depends = index.all_depends.as_ref().map(|d| d.as_str());
    let multi_version = index.multi_version.as_ref().map(|v| v.join(" "));

    // Scope the cached statement so it is released before the next one.
    let inserted = {
        let mut insert = self.conn.prepare_cached(INSERT_SQL)?;
        let changed = insert.execute(params![
            pkgname,
            pkgpath,
            all_depends,
            skip_reason,
            fail_reason,
            index.no_bin_on_ftp,
            index.restricted,
            index.categories,
            index.maintainer,
            index.use_destdir,
            index.bootstrap_pkg.as_ref().map(BootstrapPkg::is_bootstrap),
            index.usergroup_phase,
            index.scan_depends.as_ref().map(|d| d.as_str()),
            index.make_jobs_safe.as_ref().map(MakeJobsSafe::is_safe),
            index.pbulk_weight,
            multi_version,
        ])?;
        changed
    };
    if inserted == 0 {
        debug!(pkgname = pkgname, "Skipping duplicate pkgname");
        return Ok(());
    }

    let package_id = self.conn.last_insert_rowid();
    let mut state_insert = self
        .conn
        .prepare_cached("INSERT OR IGNORE INTO package_state (package_id) VALUES (?1)")?;
    state_insert.execute(params![package_id])?;
    debug!(pkgname = pkgname, package_id = package_id, "Stored package");
    Ok(())
}
/// Store every scan index produced for `pkgpath` inside one transaction.
pub fn store_scan_pkgpath(&self, pkgpath: &str, indexes: &[ScanIndex]) -> Result<()> {
    let tx = self.transaction()?;
    indexes
        .iter()
        .try_for_each(|index| self.store_package(pkgpath, index))?;
    tx.commit()
}
/// Look up the `scan_index` row for `pkgname`, if one exists.
pub fn get_package_by_name(&self, pkgname: &str) -> Result<Option<PackageRow>> {
    let sql = "SELECT * FROM scan_index WHERE pkgname = ?1";
    self.query_one(sql, [pkgname])
}
/// Return the package name stored for `package_id`.
///
/// # Errors
/// Fails with "Package not found" when no such row exists.
pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
    let found: Option<String> =
        self.query_one("SELECT pkgname FROM scan_index WHERE id = ?1", [package_id])?;
    found.context("Package not found")
}
/// Return every `scan_index` row stored under `pkgpath`.
pub fn get_packages_by_path(&self, pkgpath: &str) -> Result<Vec<PackageRow>> {
    let sql = "SELECT * FROM scan_index WHERE pkg_location = ?1";
    self.query_rows(sql, [pkgpath])
}
/// Return the set of distinct `pkg_location` values present in `scan_index`.
pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
    let locations: Vec<String> =
        self.query_rows("SELECT DISTINCT pkg_location FROM scan_index", [])?;
    Ok(locations.into_iter().collect())
}
/// Return every pkgpath named in a stored `all_depends` value that has no
/// `scan_index` row of its own.
pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
    let scanned = self.get_scanned_pkgpaths()?;
    let mut stmt = self
        .conn
        .prepare("SELECT all_depends FROM scan_index WHERE all_depends IS NOT NULL")?;
    let mut rows = stmt.query([])?;
    let mut missing = HashSet::new();
    while let Some(row) = rows.next()? {
        let raw: String = row.get(0)?;
        let deps = AllDepends::from(raw.as_str());
        missing.extend(
            deps.iter()
                .flatten()
                .map(|entry| entry.pkgpath().to_string())
                .filter(|pkgpath| !scanned.contains(pkgpath)),
        );
    }
    Ok(missing)
}
/// Number of rows in `scan_index`.
pub fn count_packages(&self) -> Result<i64> {
    let total = self
        .conn
        .query_row("SELECT COUNT(*) FROM scan_index", [], |row| {
            row.get::<_, i64>(0)
        })
        .context("Failed to count packages")?;
    Ok(total)
}
/// Return every `scan_index` row, ordered by id.
pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
    let sql = "SELECT * FROM scan_index ORDER BY id";
    self.query_rows(sql, [])
}
/// Return the combined scan/build status of every selected package.
pub fn get_all_package_status(&self) -> Result<Vec<PackageStatusRow>> {
    const SQL: &str = "SELECT p.id, p.pkgname, p.pkg_location, p.make_jobs_safe,
p.pkg_skip_reason, p.pkg_fail_reason, p.multi_version,
s.build_reason, s.total_pbulk_weight, s.dep_count,
b.outcome AS build_outcome, b.stage AS build_stage,
o.outcome AS scan_outcome, o.detail AS scan_outcome_detail
FROM scan_index p
JOIN package_state s ON s.package_id = p.id
LEFT JOIN builds b ON b.package_id = p.id
LEFT JOIN scan_outcomes o ON o.package_id = p.id
WHERE s.selected = 1";
    self.query_rows(SQL, [])
}
/// Decode a `scan_index` row into a [`ScanIndex`].
///
/// With [`ScanIndexFields::Resolve`] only the name, location, dependency
/// list and skip/fail reasons are read; with [`ScanIndexFields::Full`] the
/// remaining columns are read as well. An unparseable `pkg_location`
/// becomes `None`.
fn scan_index_from_row(
    row: &rusqlite::Row,
    fields: ScanIndexFields,
) -> rusqlite::Result<ScanIndex> {
    let mut index = ScanIndex {
        pkgname: row.get::<_, String>("pkgname")?.into(),
        pkg_location: row
            .get::<_, Option<String>>("pkg_location")?
            .and_then(|s| s.parse().ok()),
        all_depends: row
            .get::<_, Option<String>>("all_depends")?
            .map(|s| AllDepends::from(s.as_str())),
        pkg_skip_reason: row.get("pkg_skip_reason")?,
        pkg_fail_reason: row.get("pkg_fail_reason")?,
        ..Default::default()
    };
    if matches!(fields, ScanIndexFields::Resolve) {
        return Ok(index);
    }

    index.no_bin_on_ftp = row.get("no_bin_on_ftp")?;
    index.restricted = row.get("restricted")?;
    index.categories = row.get("categories")?;
    index.maintainer = row.get("maintainer")?;
    index.use_destdir = row.get("use_destdir")?;

    let bootstrap: Option<bool> = row.get("bootstrap_pkg")?;
    index.bootstrap_pkg = bootstrap.map(BootstrapPkg::from);

    index.usergroup_phase = row.get("usergroup_phase")?;

    let scan_depends: Option<String> = row.get("scan_depends")?;
    index.scan_depends = scan_depends.map(|s| ScanDepends::from(s.as_str()));

    let jobs_safe: Option<bool> = row.get("make_jobs_safe")?;
    index.make_jobs_safe = jobs_safe.map(MakeJobsSafe::from);

    index.pbulk_weight = row.get("pbulk_weight")?;

    // Stored as a whitespace-separated string; split back into words.
    let multi_version: Option<String> = row.get("multi_version")?;
    index.multi_version = multi_version
        .map(|words| words.split_ascii_whitespace().map(str::to_string).collect());
    Ok(index)
}
pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
let mut stmt = self
.conn
.prepare_cached("SELECT * FROM scan_index WHERE id = ?1")?;
let mut rows = stmt.query([package_id])?;
let row = rows
.next()?
.ok_or_else(|| anyhow::anyhow!("Package {package_id} not found"))?;
Self::scan_index_from_row(row, ScanIndexFields::Full).map_err(Into::into)
}
/// Stream the contents of `scan_index` (ordered by id) to `f`.
///
/// `f` receives a pull function; each call decodes and returns the next
/// row as a [`ScanIndex`] (limited to `fields`), or `None` once the rows
/// are exhausted. The value returned by `f` is passed through.
pub fn with_scan_data<F, R>(&self, fields: ScanIndexFields, f: F) -> Result<R>
where
    F: FnOnce(&mut dyn FnMut() -> Result<Option<ScanIndex>>) -> Result<R>,
{
    let mut stmt = self.conn.prepare("SELECT * FROM scan_index ORDER BY id")?;
    let mut rows = stmt.query([])?;
    let mut next_index = || -> Result<Option<ScanIndex>> {
        match rows.next()? {
            None => Ok(None),
            Some(row) => {
                let id: i64 = row.get("id")?;
                Self::scan_index_from_row(row, fields)
                    .with_context(|| format!("Failed to read package {id}"))
                    .map(Some)
            }
        }
    };
    f(&mut next_index)
}
/// Delete every `scan_index` row and clear the "full scan complete" flag.
pub fn clear_scan(&self) -> Result<()> {
    self.conn.execute("DELETE FROM scan_index", [])?;
    self.clear_full_scan_complete()
}
/// Record why `package_id` needs to be built.
pub fn store_build_reason(&self, package_id: i64, reason: &str) -> Result<()> {
    let sql = "UPDATE package_state SET build_reason = ?1 WHERE package_id = ?2";
    self.conn.execute(sql, params![reason, package_id])?;
    Ok(())
}
/// Reset `build_reason` to NULL for every package.
pub fn clear_build_reasons(&self) -> Result<()> {
    let sql = "UPDATE package_state SET build_reason = NULL";
    self.conn.execute(sql, [])?;
    Ok(())
}
pub fn store_resolution(&self, summary: &crate::scan::ScanSummary) -> Result<()> {
let id_map: HashMap<String, i64> = self
.conn
.prepare("SELECT pkgname, id FROM scan_index")?
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<rusqlite::Result<_>>()?;
let tx = self.transaction()?;
self.conn
.execute("UPDATE package_state SET selected = 0", [])?;
self.conn.execute("DELETE FROM scan_failures", [])?;
let mut select_stmt = self
.conn
.prepare("UPDATE package_state SET selected = 1 WHERE package_id = ?1")?;
let mut dep_stmt = self.conn.prepare(
"INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
)?;
let mut skip_stmt = self.conn.prepare(
"INSERT OR REPLACE INTO scan_outcomes (package_id, outcome, detail)
VALUES (?1, ?2, ?3)",
)?;
let mut fail_stmt = self
.conn
.prepare("INSERT INTO scan_failures (pkgpath, error) VALUES (?1, ?2)")?;
let mut count: usize = 0;
for pkg in &summary.packages {
if let ScanResult::ScanFail { pkgpath, error } = pkg {
fail_stmt.execute(params![pkgpath.as_path().display().to_string(), error])?;
continue;
}
let Some(pkgname) = pkg.pkgname() else {
continue;
};
let Some(&pkg_id) = id_map.get(pkgname.pkgname()) else {
continue;
};
select_stmt.execute([pkg_id])?;
for dep in pkg.depends() {
if let Some(&dep_id) = id_map.get(dep.pkgname()) {
dep_stmt.execute(params![pkg_id, dep_id])?;
count += 1;
}
}
if let ScanResult::Skipped { state, reason, .. } = pkg {
skip_stmt.execute(params![pkg_id, state.id(), reason])?;
}
}
drop(select_stmt);
drop(dep_stmt);
drop(skip_stmt);
drop(fail_stmt);
tx.commit()?;
if count > 0 {
debug!(count, "Stored resolved dependencies");
}
Ok(())
}
/// Return every `(pkgpath, error)` pair in `scan_failures`, ordered by
/// pkgpath.
pub fn get_scan_failures(&self) -> Result<Vec<(String, String)>> {
    let sql = "SELECT pkgpath, error FROM scan_failures ORDER BY pkgpath";
    self.query_rows(sql, [])
}
/// Recompute `total_pbulk_weight` and `dep_count` for every selected
/// package and write them to `package_state` in one transaction.
///
/// The per-package weights and the reverse-dependency lists (indexed by
/// position in the selected list) are handed to
/// `scheduler::compute_total_pbulk_weights_by_position`.
pub fn store_pbulk_weights(&self) -> Result<()> {
    let selected = query_selected_packages(&self.conn)?;

    // Package id -> position in `selected`.
    let mut position: HashMap<i64, usize> = HashMap::with_capacity(selected.len());
    for (idx, pkg) in selected.iter().enumerate() {
        position.insert(pkg.id, idx);
    }

    let weights: Vec<usize> = selected.iter().map(|p| p.pbulk_weight).collect();
    let mut rdeps: Vec<Vec<usize>> = vec![Vec::new(); selected.len()];
    for (pkg, dep) in query_resolved_deps(&self.conn)? {
        // Edges touching a package that is not selected are ignored.
        let (Some(&p), Some(&d)) = (position.get(&pkg), position.get(&dep)) else {
            continue;
        };
        rdeps[d].push(p);
    }
    drop(position);

    let (total_weights, dep_counts) =
        crate::scheduler::compute_total_pbulk_weights_by_position(&weights, &rdeps);

    let tx = self.transaction()?;
    {
        let mut update = self.conn.prepare(
            "UPDATE package_state SET total_pbulk_weight = ?2, dep_count = ?3 \
             WHERE package_id = ?1",
        )?;
        for (i, pkg) in selected.iter().enumerate() {
            update.execute(params![
                pkg.id,
                total_weights[i] as i64,
                dep_counts[i] as i64
            ])?;
        }
    }
    tx.commit()
}
/// Return the ids of every package that depends, directly or transitively,
/// on `package_id` (the package itself is excluded).
pub fn get_transitive_reverse_deps(&self, package_id: i64) -> Result<Vec<i64>> {
    const SQL: &str = "WITH RECURSIVE affected(id) AS (
SELECT ?1
UNION
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT id FROM affected WHERE id != ?1";
    self.query_rows(SQL, [package_id])
}
/// Delete every row of `resolved_depends`.
pub fn clear_resolved_depends(&self) -> Result<()> {
    let sql = "DELETE FROM resolved_depends";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Return, for every package with resolved dependencies, the list of
/// package names it depends on (sorted by name within each list).
pub fn get_all_resolved_deps(&self) -> Result<HashMap<PkgName, Vec<PkgName>>> {
    let mut stmt = self.conn.prepare(
        "SELECT p1.pkgname, p2.pkgname
FROM resolved_depends rd
JOIN scan_index p1 ON rd.package_id = p1.id
JOIN scan_index p2 ON rd.depends_on_id = p2.id
ORDER BY p1.pkgname, p2.pkgname",
    )?;
    let mut rows = stmt.query([])?;
    let mut deps: HashMap<PkgName, Vec<PkgName>> = HashMap::new();
    while let Some(row) = rows.next()? {
        let pkg = PkgName::from(row.get::<_, String>(0)?);
        let dep = PkgName::from(row.get::<_, String>(1)?);
        deps.entry(pkg).or_insert_with(Vec::new).push(dep);
    }
    Ok(deps)
}
/// Return the stored `dep_count` of every selected package, keyed by
/// package name.
pub fn dep_counts(&self) -> Result<HashMap<String, usize>> {
    let rows: Vec<(String, i64)> = self.query_rows(
        "SELECT p.pkgname, s.dep_count
FROM scan_index p
JOIN package_state s ON s.package_id = p.id
WHERE s.selected = 1",
        [],
    )?;
    let mut counts = HashMap::with_capacity(rows.len());
    for (pkgname, n) in rows {
        counts.insert(pkgname, n as usize);
    }
    Ok(counts)
}
/// Return every row of the `buildable` view, in id order, keyed by package
/// name.
///
/// Each [`ScanIndex`] carries only the name, bootstrap flag, usergroup
/// phase and multi-version list; all other fields keep their defaults.
pub fn get_buildable_packages(
    &self,
) -> Result<IndexMap<PkgName, crate::scan::ResolvedPackage>> {
    let mut stmt = self.conn.prepare(
        "SELECT pkgname, pkg_location, bootstrap_pkg, usergroup_phase, multi_version
FROM buildable
ORDER BY id",
    )?;
    let mut rows = stmt.query([])?;
    let mut out = IndexMap::new();
    while let Some(row) = rows.next()? {
        let name: String = row.get("pkgname")?;
        let location: String = row.get("pkg_location")?;
        let multi_version = row
            .get::<_, Option<String>>("multi_version")?
            .map(|s| s.split_ascii_whitespace().map(str::to_string).collect());
        let pkgpath = location.parse()?;
        let bootstrap_pkg = row
            .get::<_, Option<bool>>("bootstrap_pkg")?
            .map(BootstrapPkg::from);
        let usergroup_phase = row.get("usergroup_phase")?;
        let index = ScanIndex {
            pkgname: name.clone().into(),
            bootstrap_pkg,
            usergroup_phase,
            multi_version,
            ..Default::default()
        };
        out.insert(name.into(), crate::scan::ResolvedPackage { pkgpath, index });
    }
    Ok(out)
}
pub fn get_buildable_pkgpaths(&self) -> Result<HashMap<PkgName, PkgPath>> {
let rows: Vec<(String, String)> =
self.query_rows("SELECT pkgname, pkg_location FROM buildable", [])?;
rows.into_iter()
.map(|(pkgname, pkg_location)| Ok((pkgname.into(), pkg_location.parse()?)))
.collect()
}
/// Return `(id, pkgname, pkg_location)` for every buildable package,
/// ordered by descending `dep_count` and then by name.
pub fn get_buildable_rows(&self) -> Result<Vec<(i64, String, String)>> {
    const SQL: &str = "SELECT p.id, p.pkgname, p.pkg_location
FROM buildable p
JOIN package_state s ON s.package_id = p.id
ORDER BY s.dep_count DESC, p.pkgname";
    self.query_rows(SQL, [])
}
/// Return every `(package_id, depends_on_id)` edge whose two ends are both
/// buildable, ordered by package id, then by the dependency's descending
/// `dep_count` and name.
pub fn get_buildable_depends(&self) -> Result<Vec<(i64, i64)>> {
    const SQL: &str = "SELECT rd.package_id, rd.depends_on_id
FROM resolved_depends rd
JOIN buildable p ON p.id = rd.package_id
JOIN buildable d ON d.id = rd.depends_on_id
JOIN package_state s ON s.package_id = rd.depends_on_id
ORDER BY rd.package_id, s.dep_count DESC, d.pkgname";
    self.query_rows(SQL, [])
}
/// Insert or replace the `builds` row of `package_id` with `result`.
pub fn store_build_result(&self, package_id: i64, result: &BuildResult) -> Result<()> {
    let stats = &result.build_stats;
    let outcome = result.state.id();
    let stage = stats.stage.map(|s| s as i32);
    let duration_ms = stats.duration.as_millis() as i64;
    let log_dir = result
        .log_dir
        .as_deref()
        .map(|dir| dir.display().to_string());

    let mut upsert = self.conn.prepare_cached(
        "INSERT OR REPLACE INTO builds
(package_id, outcome, stage, duration_ms, log_dir)
VALUES (?1, ?2, ?3, ?4, ?5)",
    )?;
    upsert.execute(params![package_id, outcome, stage, duration_ms, log_dir])?;
    debug!(
        package_id = package_id,
        outcome = outcome,
        "Stored build result"
    );
    Ok(())
}
/// Whether `package_id` has a `builds` row whose outcome is `Success`.
pub fn is_successful(&self, package_id: i64) -> Result<bool> {
    let found: i64 = self.conn.query_row(
        "SELECT EXISTS(
SELECT 1 FROM builds WHERE package_id = ?1 AND outcome = ?2)",
        params![package_id, PackageState::Success.id()],
        |row| row.get(0),
    )?;
    Ok(found != 0)
}
pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
let id: i64 = self
.conn
.query_row(
"SELECT id FROM scan_index WHERE pkgname = ?1",
[result.pkgname.pkgname()],
|row| row.get(0),
)
.optional()?
.with_context(|| {
format!(
"build result for package not in scan_index: {}",
result.pkgname.pkgname()
)
})?;
self.store_build_result(id, result)
}
/// Load the stored build result of `package_id`.
///
/// Returns `None` when there is no `builds` row, or when the stored
/// outcome id is not a known [`PackageState`].
pub fn get_build_result(&self, package_id: i64) -> Result<Option<BuildResult>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkg_location AS pkgpath,
b.outcome,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN scan_index p ON b.package_id = p.id
WHERE b.package_id = ?1",
    )?;
    let mut rows = stmt.query([package_id])?;
    let Some(row) = rows.next()? else {
        return Ok(None);
    };
    let result = build_result_from_row(row)?;
    Ok(result)
}
/// Delete the build result of the package named `pkgname`; returns whether
/// a row was removed.
pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
    let sql =
        "DELETE FROM builds WHERE package_id IN (SELECT id FROM scan_index WHERE pkgname = ?1)";
    let deleted = self.conn.execute(sql, params![pkgname])?;
    Ok(deleted != 0)
}
/// Delete the build results of every package stored under `pkgpath`;
/// returns the number of rows removed.
pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
    let sql = "DELETE FROM builds WHERE package_id IN (SELECT id FROM scan_index WHERE pkg_location = ?1)";
    let deleted = self.conn.execute(sql, params![pkgpath])?;
    Ok(deleted)
}
pub fn clear_builds(&self) -> Result<usize> {
let rows = self.conn.execute("DELETE FROM builds", [])?;
Ok(rows)
}
/// Return every stored build result, ordered by package name. Rows whose
/// outcome id is not a known [`PackageState`] are skipped.
pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
    let mut stmt = self.conn.prepare(
        "SELECT p.pkgname, p.pkg_location AS pkgpath,
b.outcome,
b.stage, b.duration_ms, b.log_dir
FROM builds b
JOIN scan_index p ON b.package_id = p.id
ORDER BY p.pkgname",
    )?;
    let mut rows = stmt.query([])?;
    let mut results = Vec::new();
    while let Some(row) = rows.next()? {
        // `None` (unknown outcome) contributes nothing.
        results.extend(build_result_from_row(row)?);
    }
    Ok(results)
}
/// Add `duration` to the accumulated build time stored in the history
/// database for the current build id, creating the row if needed.
pub fn add_build_duration(&self, duration: Duration) -> Result<()> {
    let build_id = self.build_id()?;
    let elapsed_ms = duration.as_millis() as i64;
    let history = self.history_conn()?;
    history.execute(
        "INSERT INTO build_metadata (build_id, duration_ms) VALUES (?1, ?2) \
         ON CONFLICT(build_id) DO UPDATE SET duration_ms = duration_ms + excluded.duration_ms",
        params![build_id, elapsed_ms],
    )?;
    Ok(())
}
/// Return the accumulated build time stored in the history database for
/// the current build id.
///
/// Yields a zero duration when no row exists for the build id. A negative
/// stored value is clamped to zero instead of being sign-wrapped into an
/// enormous duration.
pub fn get_build_duration(&self) -> Result<Duration> {
    let build_id = self.build_id()?;
    let conn = self.history_conn()?;
    let ms: i64 = conn.query_row(
        "SELECT COALESCE( \
         (SELECT duration_ms FROM build_metadata WHERE build_id = ?1), \
         0 \
         )",
        params![build_id],
        |row| row.get(0),
    )?;
    // `ms as u64` would wrap a negative value; treat it as zero instead.
    Ok(Duration::from_millis(u64::try_from(ms).unwrap_or(0)))
}
/// Return the root-cause blockers among the (transitive) dependencies of
/// `package_id` as `(pkgname, pkg_location, outcome name)` triples, ordered
/// by package name.
///
/// The walk follows dependencies whose build outcome is neither `Success`
/// nor `UpToDate`; only those with outcome `Failed`, `PreFailed`,
/// `PreSkipped` or `Unresolved` are reported.
///
/// # Errors
/// Fails if a stored outcome id is not a known [`PackageState`].
pub fn get_blockers(&self, package_id: i64) -> Result<Vec<(String, String, String)>> {
    const SQL: &str = "WITH RECURSIVE
blocking(id, outcome) AS (
-- Direct dependencies that have failed/skipped builds
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE rd.package_id = ?1
AND b.outcome NOT IN (?2, ?3)
UNION
-- Transitive: deps of deps that are blocked
SELECT rd.depends_on_id, b.outcome
FROM resolved_depends rd
JOIN blocking bl ON rd.package_id = bl.id
JOIN builds b ON b.package_id = rd.depends_on_id
WHERE b.outcome NOT IN (?2, ?3)
)
SELECT DISTINCT p.pkgname, p.pkg_location, bl.outcome
FROM blocking bl
JOIN scan_index p ON bl.id = p.id
-- Only show root causes (failed or prefailed/preskipped), not indirect
WHERE bl.outcome IN (?4, ?5, ?6, ?7)
ORDER BY p.pkgname";
    let rows: Vec<(String, String, i32)> = self.query_rows(
        SQL,
        params![
            package_id,
            PackageState::Success.id(),
            PackageState::UpToDate.id(),
            PackageState::Failed.id(),
            PackageState::PreFailed.id(),
            PackageState::PreSkipped.id(),
            PackageState::Unresolved.id(),
        ],
    )?;
    let mut blockers = Vec::with_capacity(rows.len());
    for (pkgname, pkgpath, outcome_id) in rows {
        let Ok(kind) = PackageState::try_from(outcome_id) else {
            return Err(anyhow!("unknown outcome type id: {}", outcome_id));
        };
        blockers.push((pkgname, pkgpath, kind.as_str().to_string()));
    }
    Ok(blockers)
}
/// Return `(pkgname, pkg_location)` of every package that depends,
/// directly or transitively, on `package_id`, ordered by name.
pub fn get_blocked_by(&self, package_id: i64) -> Result<Vec<(String, String)>> {
    const SQL: &str = "WITH RECURSIVE
affected(id) AS (
-- Direct reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
WHERE rd.depends_on_id = ?1
UNION
-- Transitive reverse dependencies
SELECT rd.package_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
)
SELECT p.pkgname, p.pkg_location
FROM affected a
JOIN scan_index p ON a.id = p.id
ORDER BY p.pkgname";
    self.query_rows(SQL, [package_id])
}
pub fn get_scan_outcomes(&self) -> Result<Vec<BuildResult>> {
let rows: Vec<(String, Option<String>, i32)> = self.query_rows(
"SELECT p.pkgname, p.pkg_location, o.outcome
FROM scan_outcomes o
JOIN scan_index p ON o.package_id = p.id
ORDER BY p.pkgname",
[],
)?;
rows.into_iter()
.map(|(pkgname, pkg_location, outcome)| {
let state = PackageState::try_from(outcome)
.map_err(|_| anyhow!("unknown outcome type id: {}", outcome))?;
Ok(BuildResult {
pkgname: PkgName::new(&pkgname),
pkgpath: pkg_location.and_then(|p| PkgPath::new(&p).ok()),
state,
log_dir: None,
build_stats: PkgBuildStats::default(),
})
})
.collect()
}
/// Return `(pkgpath, detail)` for every scan outcome of kind `Unresolved`
/// that has a detail message, ordered by package name.
///
/// # Errors
/// Fails if a stored `pkg_location` is not a valid [`PkgPath`].
pub fn get_unresolved_reasons(&self) -> Result<Vec<(PkgPath, String)>> {
    let rows: Vec<(String, String)> = self.query_rows(
        "SELECT p.pkg_location, o.detail
FROM scan_outcomes o
JOIN scan_index p ON o.package_id = p.id
WHERE o.outcome = ?1 AND o.detail IS NOT NULL
ORDER BY p.pkgname",
        [PackageState::Unresolved.id()],
    )?;
    let mut reasons = Vec::with_capacity(rows.len());
    for (location, reason) in rows {
        reasons.push((PkgPath::new(&location)?, reason));
    }
    Ok(reasons)
}
/// Map every affected package name to the names of the blockers it
/// depends on, directly or transitively.
///
/// A blocker is a package with build outcome `Failed` or scan outcome
/// `PreSkipped`, `PreFailed` or `Unresolved`. Rows arrive ordered by the
/// number of packages each blocker affects (descending), then by blocker
/// name, and each list keeps that order.
pub fn blockers(&self) -> Result<HashMap<String, Vec<String>>> {
    let mut stmt = self.conn.prepare(
        "WITH RECURSIVE
blockers(id) AS (
SELECT package_id FROM builds WHERE outcome = ?1
UNION
SELECT package_id FROM scan_outcomes WHERE outcome IN (?2, ?3, ?4)
),
affected(id, blocker_id) AS (
SELECT id, id FROM blockers
UNION
SELECT rd.package_id, a.blocker_id
FROM resolved_depends rd
JOIN affected a ON rd.depends_on_id = a.id
WHERE rd.package_id NOT IN (SELECT id FROM blockers)
),
impact(blocker_id, n) AS (
SELECT blocker_id, COUNT(*) FROM affected WHERE id != blocker_id GROUP BY blocker_id
)
SELECT p.pkgname, bp.pkgname
FROM affected a
JOIN scan_index p ON a.id = p.id
JOIN scan_index bp ON a.blocker_id = bp.id
JOIN impact i ON i.blocker_id = a.blocker_id
WHERE a.id != a.blocker_id
ORDER BY i.n DESC, bp.pkgname",
    )?;
    let mut rows = stmt.query(params![
        PackageState::Failed.id(),
        PackageState::PreSkipped.id(),
        PackageState::PreFailed.id(),
        PackageState::Unresolved.id(),
    ])?;
    let mut by_package: HashMap<String, Vec<String>> = HashMap::new();
    while let Some(row) = rows.next()? {
        let pkgname: String = row.get(0)?;
        let blocker: String = row.get(1)?;
        by_package
            .entry(pkgname)
            .or_insert_with(Vec::new)
            .push(blocker);
    }
    Ok(by_package)
}
/// Return the `build_id` value stored in `metadata`.
///
/// # Errors
/// Fails with "build_id not found in database" when it cannot be read.
pub fn build_id(&self) -> Result<String> {
    let sql = "SELECT value FROM metadata WHERE key = 'build_id'";
    let id: String = self
        .conn
        .query_row(sql, [], |row| row.get(0))
        .context("build_id not found in database")?;
    Ok(id)
}
pub fn full_scan_complete(&self) -> bool {
self.conn
.query_row(
"SELECT value FROM metadata WHERE key = 'full_scan_complete'",
[],
|row| row.get::<_, String>(0),
)
.map(|v| v == "true")
.unwrap_or(false)
}
/// Store the `full_scan_complete` metadata flag as `'true'`.
pub fn set_full_scan_complete(&self) -> Result<()> {
    let sql =
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Remove the `full_scan_complete` metadata flag.
pub fn clear_full_scan_complete(&self) -> Result<()> {
    let sql = "DELETE FROM metadata WHERE key = 'full_scan_complete'";
    self.conn.execute(sql, [])?;
    Ok(())
}
/// Serialize `env` to JSON and store it under the `pkgsrc_env` metadata
/// key.
///
/// `metadata.key` is a primary key, so a plain `INSERT` would fail when an
/// environment has already been stored. `INSERT OR REPLACE` is used, as in
/// the other metadata writers, so a later call overwrites the old value.
pub fn store_pkgsrc_env(&self, env: &PkgsrcEnv) -> Result<()> {
    let json = serde_json::json!({
        "packages": env.packages,
        "pkgtools": env.pkgtools,
        "prefix": env.prefix,
        "pkg_dbdir": env.pkg_dbdir,
        "pkg_refcount_dbdir": env.pkg_refcount_dbdir,
        "metadata": env.metadata,
        "cachevars": env.cachevars,
    });
    self.conn.execute(
        "INSERT OR REPLACE INTO metadata (key, value) VALUES ('pkgsrc_env', ?1)",
        params![json.to_string()],
    )?;
    Ok(())
}
/// Load the pkgsrc environment stored under the `pkgsrc_env` metadata key.
///
/// The five path entries are required. `metadata` and `cachevars` fall
/// back to empty maps when absent or not decodable as string maps.
///
/// # Errors
/// Fails when the key is missing, the JSON is invalid, or a required path
/// entry is missing or not a string.
pub fn load_pkgsrc_env(&self) -> Result<PkgsrcEnv> {
    let raw: String = self
        .conn
        .query_row(
            "SELECT value FROM metadata WHERE key = 'pkgsrc_env'",
            [],
            |row| row.get(0),
        )
        .context("pkgsrc environment not found in database")?;
    let json: serde_json::Value =
        serde_json::from_str(&raw).context("Invalid pkgsrc_env JSON")?;

    // Required string entry, returned as a path.
    let path_of = |key: &str| -> Result<PathBuf> {
        match json.get(key).and_then(|v| v.as_str()) {
            Some(text) => Ok(PathBuf::from(text)),
            None => Err(anyhow::anyhow!("Missing {} in pkgsrc_env", key)),
        }
    };
    // Optional string map; anything unusable becomes an empty map.
    let map_of = |key: &str| -> HashMap<String, String> {
        json.get(key)
            .and_then(|v| serde_json::from_value(v.clone()).ok())
            .unwrap_or_default()
    };

    let packages = path_of("packages")?;
    let pkgtools = path_of("pkgtools")?;
    let prefix = path_of("prefix")?;
    let pkg_dbdir = path_of("pkg_dbdir")?;
    let pkg_refcount_dbdir = path_of("pkg_refcount_dbdir")?;
    Ok(PkgsrcEnv {
        packages,
        pkgtools,
        prefix,
        pkg_dbdir,
        pkg_refcount_dbdir,
        metadata: map_of("metadata"),
        cachevars: map_of("cachevars"),
    })
}
/// Serialize `info` to JSON and store it under the `vcs_info` metadata
/// key, replacing any previous value.
pub fn store_vcs_info(&self, info: &crate::vcs::VcsInfo) -> Result<()> {
    let payload = serde_json::to_string(info)?;
    let sql = "INSERT OR REPLACE INTO metadata (key, value) VALUES ('vcs_info', ?1)";
    self.conn.execute(sql, params![payload])?;
    Ok(())
}
/// Load the VCS information stored under the `vcs_info` metadata key.
///
/// # Errors
/// Fails when the key is missing or its value is not valid JSON for
/// `VcsInfo`.
pub fn load_vcs_info(&self) -> Result<crate::vcs::VcsInfo> {
    let sql = "SELECT value FROM metadata WHERE key = 'vcs_info'";
    let raw: String = self
        .conn
        .query_row(sql, [], |row| row.get(0))
        .context("vcs_info not found in database")?;
    let info: crate::vcs::VcsInfo =
        serde_json::from_str(&raw).context("Invalid vcs_info JSON")?;
    Ok(info)
}
/// Names of every package whose build outcome is `Failed`, sorted by name.
pub fn get_failed_packages(&self) -> Result<Vec<String>> {
    const SQL: &str = "SELECT p.pkgname FROM builds b
JOIN scan_index p ON b.package_id = p.id
WHERE b.outcome = ?1
ORDER BY p.pkgname";
    self.query_rows(SQL, [PackageState::Failed.id()])
}
/// Names of every package whose build outcome is `Success` or `UpToDate`,
/// sorted by name.
pub fn get_successful_packages(&self) -> Result<Vec<String>> {
    const SQL: &str = "SELECT p.pkgname FROM builds b
JOIN scan_index p ON b.package_id = p.id
WHERE b.outcome IN (?1, ?2)
ORDER BY p.pkgname";
    self.query_rows(
        SQL,
        params![PackageState::Success.id(), PackageState::UpToDate.id()],
    )
}
/// Map every package with a non-empty `no_bin_on_ftp` value to that value.
pub fn get_restricted_packages(&self) -> Result<HashMap<String, String>> {
    let rows: Vec<(String, String)> = self.query_rows(
        "SELECT pkgname, no_bin_on_ftp FROM scan_index
WHERE no_bin_on_ftp IS NOT NULL AND no_bin_on_ftp != ''",
        [],
    )?;
    let mut restricted = HashMap::with_capacity(rows.len());
    restricted.extend(rows);
    Ok(restricted)
}
/// Collect the data for a report: for every scanned package, sorted by
/// name, its name, fully decoded scan index (with resolved dependencies
/// attached when present) and outcome id.
///
/// The outcome is the build outcome if there is one, otherwise the scan
/// outcome, otherwise `None`.
pub fn get_report_data(&self) -> Result<Vec<ReportRow>> {
    let resolved = self.get_all_resolved_deps()?;
    let mut stmt = self.conn.prepare(
        "SELECT p.*,
COALESCE(b.outcome, o.outcome) AS outcome
FROM scan_index p
LEFT JOIN builds b ON b.package_id = p.id
LEFT JOIN scan_outcomes o ON o.package_id = p.id
ORDER BY p.pkgname",
    )?;
    let mut rows = stmt.query([])?;
    let mut report = Vec::new();
    while let Some(row) = rows.next()? {
        let pkgname: String = row.get("pkgname")?;
        let outcome: Option<i32> = row.get("outcome")?;
        let mut index = Self::scan_index_from_row(row, ScanIndexFields::Full)?;
        match resolved.get(pkgname.as_str()) {
            Some(deps) => index.resolved_depends = Some(deps.clone()),
            None => {}
        }
        report.push((pkgname, index, outcome));
    }
    Ok(report)
}
/// Run an arbitrary SQL statement against the main database and print the
/// outcome to stdout.
///
/// Statements that produce no columns are executed and, when at least one
/// row changed, a `"N row(s) affected"` line is printed. Statements that
/// produce columns have every row printed as `|`-separated values: NULL as
/// an empty string, text decoded lossily as UTF-8, blobs as
/// `<blob:N bytes>`.
///
/// All output goes through `try_println`. Row printing stops as soon as it
/// returns `false`, and the "affected" line ignores its result, so a
/// failed write does not panic the way `println!` would.
///
/// # Errors
/// Fails if the statement cannot be prepared, executed or stepped.
pub fn execute_raw(&self, sql: &str) -> Result<()> {
    use rusqlite::types::ValueRef;

    let mut stmt = self.conn.prepare(sql)?;
    let column_count = stmt.column_count();

    if column_count == 0 {
        let affected = stmt.execute([])?;
        if affected > 0 {
            // Same output path as the row printer below; nothing to stop
            // here if the write fails, so the result is ignored.
            let _ = try_println(&format!("{} row(s) affected", affected));
        }
        return Ok(());
    }

    let mut rows = stmt.query([])?;
    while let Some(row) = rows.next()? {
        let values: Vec<String> = (0..column_count)
            .map(|idx| match row.get_ref(idx) {
                Ok(ValueRef::Null) => String::new(),
                Ok(ValueRef::Integer(n)) => n.to_string(),
                Ok(ValueRef::Real(x)) => x.to_string(),
                Ok(ValueRef::Text(bytes)) => String::from_utf8_lossy(bytes).to_string(),
                Ok(ValueRef::Blob(bytes)) => format!("<blob:{} bytes>", bytes.len()),
                // A column that cannot be read is shown as empty.
                Err(_) => String::new(),
            })
            .collect();
        if !try_println(&values.join("|")) {
            break;
        }
    }
    Ok(())
}
/// Return the history database connection, opening it via
/// `open_history_conn` and caching it in `self.history_conn` on first use.
///
/// # Errors
/// Fails if the connection has to be opened and opening fails.
pub(crate) fn history_conn(&self) -> Result<&Connection> {
    if self.history_conn.get().is_none() {
        let opened = open_history_conn(&self.dbdir)?;
        // The cell was just seen empty; a failed `set` is ignored either way.
        let _ = self.history_conn.set(opened);
    }
    self.history_conn
        .get()
        .ok_or_else(|| anyhow::anyhow!("history connection not initialized"))
}
/// Write a single history record to the history database.
pub fn record_history(&self, rec: &crate::History) -> Result<()> {
    record_history_to(self.history_conn()?, rec)
}
/// Write several history records inside one transaction.
///
/// An empty slice returns immediately without opening the history
/// database. If any record fails, the error is returned before the commit
/// is reached.
pub fn record_history_batch(&self, recs: &[crate::History]) -> Result<()> {
    if recs.is_empty() {
        return Ok(());
    }
    let tx = self.history_conn()?.unchecked_transaction()?;
    recs.iter()
        .try_for_each(|rec| record_history_to(&tx, rec))?;
    tx.commit()?;
    Ok(())
}
/// Load build history records, newest first, together with their per-stage
/// wall-clock and CPU timings.
///
/// * `patterns` - when non-empty, a record is kept only if at least one
///   regex matches its pkgpath or its pkgname.
/// * `all` - when `false`, only records whose outcome is `Success` or
///   `Failed` are selected; when `true`, no outcome filter is applied.
///
/// # Errors
/// Fails if the history database cannot be opened or queried, or if a row
/// carries an outcome or stage id that does not map to a known variant.
pub fn query_history(
&self,
patterns: &[regex::Regex],
all: bool,
) -> Result<Vec<crate::History>> {
let conn = self.history_conn()?;
// One `bh.<column>` per HistoryKind variant, in variant order.
let cols: String = HistoryKind::VARIANTS
.iter()
.map(|v| format!("bh.{}", <&str>::from(v)))
.collect::<Vec<_>>()
.join(", ");
let where_clause = if all {
String::new()
} else {
format!(
"WHERE bh.outcome IN ({success}, {failed})",
success = PackageState::Success.id(),
failed = PackageState::Failed.id(),
)
};
// The joins produce one result row per (history row, wall_times stage).
// cpu_times is joined on the wall_times stage, so a CPU time is only
// returned for a stage that also has a wall_times row.
let sql = format!(
"SELECT bh.id, {cols}, \
wt.stage, wt.duration, ct.duration \
FROM build_history bh \
LEFT JOIN wall_times wt ON wt.history_id = bh.id \
LEFT JOIN cpu_times ct ON ct.history_id = bh.id \
AND ct.stage = wt.stage \
{where_clause} \
ORDER BY bh.timestamp DESC, bh.id DESC",
);
let mut stmt = conn.prepare(&sql)?;
// Columns are decoded by position: 0 is bh.id, 1..=11 are the HistoryKind
// columns, 12..=14 are the joined stage, wall duration and CPU duration.
// NOTE(review): assumes HistoryKind::VARIANTS is ordered timestamp,
// pkgpath, pkgname, pkgbase, outcome, stage, make_jobs, duration,
// disk_usage, wrkobjdir, build_id - confirm against the enum.
let rows = stmt.query_map([], |row| {
Ok((
row.get::<_, i64>(0)?,
row.get::<_, i64>(1)?,
row.get::<_, String>(2)?,
row.get::<_, String>(3)?,
row.get::<_, String>(4)?,
row.get::<_, i32>(5)?,
row.get::<_, Option<i32>>(6)?,
row.get::<_, Option<i64>>(7)?.map(|v| v as usize),
row.get::<_, i64>(8)?,
row.get::<_, Option<i64>>(9)?,
row.get::<_, Option<String>>(10)?,
row.get::<_, Option<String>>(11)?,
row.get::<_, Option<i32>>(12)?,
row.get::<_, Option<i64>>(13)?,
row.get::<_, Option<i64>>(14)?,
))
})?;
let mut results: Vec<crate::History> = Vec::new();
// Id of the history row currently being folded, and whether it passed the
// pattern filter (its extra stage rows are dropped when it did not).
let mut current_id: Option<i64> = None;
let mut current_accepted = false;
for row in rows {
let (
id,
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome_id,
stage_id,
make_jobs,
duration,
disk_usage,
wrkobjdir,
build_id,
wt_stage,
wt_duration,
ct_duration,
) = row?;
// Another stage row for the record already seen: append its timings to
// the most recently pushed entry instead of creating a new one.
if Some(id) == current_id {
if current_accepted
&& let Some(last) = results.last_mut()
&& let Some(stage_id) = wt_stage
{
let stage = Stage::from_repr(stage_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", stage_id))?;
if let Some(ms) = wt_duration {
last.stage_durations
.push((stage, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
last.stage_cpu_times
.push((stage, Duration::from_millis(ms as u64)));
}
}
continue;
}
// First row of a new history record.
current_id = Some(id);
current_accepted = false;
if !patterns.is_empty()
&& !patterns
.iter()
.any(|re| re.is_match(&pkgpath) || re.is_match(&pkgname))
{
continue;
}
current_accepted = true;
let outcome = PackageState::try_from(outcome_id)
.map_err(|_| anyhow::anyhow!("Unknown outcome type id: {}", outcome_id))?;
let stage = stage_id
.map(|id| {
Stage::from_repr(id).ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", id))
})
.transpose()?;
// Seed the timing lists with the stage carried by this first row, if any.
let mut stage_durations = Vec::new();
let mut stage_cpu_times = Vec::new();
if let Some(st_id) = wt_stage {
let st = Stage::from_repr(st_id)
.ok_or_else(|| anyhow::anyhow!("Unknown stage id: {}", st_id))?;
if let Some(ms) = wt_duration {
stage_durations.push((st, Duration::from_millis(ms as u64)));
}
if let Some(ms) = ct_duration {
stage_cpu_times.push((st, Duration::from_millis(ms as u64)));
}
}
results.push(crate::History {
timestamp,
pkgpath,
pkgname,
pkgbase,
outcome,
stage,
make_jobs,
duration: Duration::from_millis(duration as u64),
disk_usage: disk_usage.map(|s| s as u64),
// A stored wrkobjdir value that fails to parse is treated as absent.
wrkobjdir: wrkobjdir.and_then(|s| s.parse().ok()),
stage_durations,
stage_cpu_times,
build_id,
});
}
Ok(results)
}
/// CPU milliseconds reported by `query_build_stage_timings`, keyed by
/// `(pkgpath, pkgbase)`.
///
/// Best-effort: if the history database cannot be opened, a warning is
/// logged and an empty map is returned.
pub fn build_stage_cpu_times(&self) -> HashMap<PkgKey, u64> {
    let conn = match self.history_conn() {
        Ok(conn) => conn,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "build_stage_cpu_times: failed to open history db"
            );
            return HashMap::new();
        }
    };
    let mut cpu_by_pkg = HashMap::new();
    for (key, timing) in query_build_stage_timings(conn) {
        cpu_by_pkg.insert(key, timing.cpu_ms);
    }
    cpu_by_pkg
}
/// Latest history entry per `(pkgpath, pkgbase)`, ignoring `UpToDate`
/// records.
///
/// "Latest" means the row with the highest `id` in each group. When
/// `build_id` is `Some`, only rows recorded for that build are considered.
///
/// Best-effort: failing to open the history database, prepare the query or
/// run it logs a warning and yields an empty map. A row that cannot be
/// decoded, or whose outcome id is unknown, is logged and skipped rather
/// than dropped silently.
pub fn build_history_by_pkg_all(
    &self,
    build_id: Option<&str>,
) -> HashMap<PkgKey, PkgBuildHistory> {
    let conn = match self.history_conn() {
        Ok(c) => c,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "build_history_by_pkg_all: failed to open history db"
            );
            return HashMap::new();
        }
    };

    // Column names come from HistoryKind so they follow the schema.
    let du: &str = HistoryKind::DiskUsage.into();
    let out: &str = HistoryKind::Outcome.into();
    let mj: &str = HistoryKind::MakeJobs.into();
    let wo: &str = HistoryKind::Wrkobjdir.into();
    let pkgbase_col: &str = HistoryKind::Pkgbase.into();
    let pkgpath_col: &str = HistoryKind::Pkgpath.into();
    let up_to_date = PackageState::UpToDate.id();

    let where_clause = if build_id.is_some() {
        format!("WHERE h.build_id = ?1 AND h.{out} != {up_to_date}")
    } else {
        format!("WHERE h.{out} != {up_to_date}")
    };
    let sql = format!(
        "SELECT b.{pkgpath_col}, b.{pkgbase_col}, b.{out}, b.{du}, b.{mj}, b.{wo} \
         FROM build_history b \
         JOIN (SELECT MAX(h.id) AS id FROM build_history h \
         {where_clause} \
         GROUP BY h.{pkgpath_col}, h.{pkgbase_col}) latest \
         ON b.id = latest.id",
    );

    let mut stmt = match conn.prepare(&sql) {
        Ok(s) => s,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "build_history_by_pkg_all: failed to prepare query"
            );
            return HashMap::new();
        }
    };

    let map_row = |row: &rusqlite::Row<'_>| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, i32>(2)?,
            row.get::<_, Option<i64>>(3)?,
            row.get::<_, Option<i64>>(4)?,
            row.get::<_, Option<String>>(5)?,
        ))
    };
    // The `?1` placeholder only exists when a build id filter is requested.
    let rows = match build_id {
        Some(id) => stmt.query_map(params![id], map_row),
        None => stmt.query_map([], map_row),
    };
    let rows = match rows {
        Ok(r) => r,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "build_history_by_pkg_all: query failed"
            );
            return HashMap::new();
        }
    };

    let mut result = HashMap::new();
    for row in rows {
        let (pkgpath, pkgbase, outcome, disk_usage, make_jobs, wrkobjdir) = match row {
            Ok(fields) => fields,
            Err(e) => {
                warn!(
                    error = format!("{e:#}"),
                    "build_history_by_pkg_all: skipping unreadable row"
                );
                continue;
            }
        };
        let outcome = match PackageState::try_from(outcome) {
            Ok(k) => k,
            Err(e) => {
                warn!(error = e, "build_history_by_pkg_all: skipping row");
                continue;
            }
        };
        result.insert(
            (pkgpath, pkgbase),
            PkgBuildHistory {
                outcome,
                disk_usage: disk_usage.map(|v| v as u64),
                make_jobs: make_jobs.map(|v| v as u32),
                wrkobjdir,
            },
        );
    }
    result
}
pub fn store_cpu_samples(&self, sampler: Option<crate::cpu::CpuSamplerHandle>, what: &str) {
let Some(sampler) = sampler else { return };
let samples = sampler.stop();
if samples.is_empty() {
return;
}
match self.store_cpu_usage(&samples) {
Ok(()) => debug!(count = samples.len(), "Saved {} samples", what),
Err(e) => warn!(error = format!("{e:#}"), "Failed to save {} samples", what),
}
}
/// Insert all `samples` into the `cpu_usage` table in one transaction.
///
/// An empty slice returns immediately without opening the history
/// database. The user and system percentages are stored as integers.
pub fn store_cpu_usage(&self, samples: &[crate::cpu::CpuSample]) -> Result<()> {
    if samples.is_empty() {
        return Ok(());
    }
    let tx = self.history_conn()?.unchecked_transaction()?;
    let mut insert = tx.prepare_cached(
        "INSERT INTO cpu_usage (timestamp, user_pct, sys_pct) \
         VALUES (?1, ?2, ?3)",
    )?;
    for sample in samples {
        insert.execute(params![
            sample.timestamp,
            sample.user_pct as i32,
            sample.sys_pct as i32
        ])?;
    }
    // The statement borrows the transaction; release it before committing.
    drop(insert);
    tx.commit()?;
    Ok(())
}
pub fn list_history_builds(&self) -> Result<Vec<BuildListEntry>> {
let conn = self.history_conn()?;
let success = PackageState::Success.id();
let failed = PackageState::Failed.id();
let up_to_date = PackageState::UpToDate.id();
let mut stmt = conn.prepare(
"WITH attempted AS ( \
SELECT build_id, \
SUM(CASE WHEN outcome = ?1 THEN 1 ELSE 0 END) AS succeeded, \
SUM(CASE WHEN outcome = ?2 THEN 1 ELSE 0 END) AS failed, \
SUM(CASE WHEN outcome NOT IN (?1, ?2, ?3) THEN 1 ELSE 0 END) AS masked \
FROM build_history \
WHERE build_id IS NOT NULL \
GROUP BY build_id \
), \
all_builds AS ( \
SELECT build_id FROM attempted \
UNION \
SELECT build_id FROM build_metadata \
) \
SELECT b.build_id, \
COALESCE(a.succeeded, 0) + COALESCE(a.failed, 0) \
+ COALESCE(a.masked, 0) + COALESCE(m.up_to_date, 0), \
COALESCE(a.succeeded, 0), \
COALESCE(m.up_to_date, 0), \
COALESCE(a.failed, 0), \
COALESCE(a.masked, 0), \
COALESCE(m.duration_ms, 0) \
FROM all_builds b \
LEFT JOIN attempted a ON a.build_id = b.build_id \
LEFT JOIN build_metadata m ON m.build_id = b.build_id \
ORDER BY b.build_id DESC",
)?;
let rows = stmt.query_map(params![success, failed, up_to_date], |row| {
Ok(BuildListEntry {
build_id: row.get(0)?,
package_count: row.get::<_, i64>(1)? as usize,
succeeded: row.get::<_, i64>(2)? as usize,
up_to_date: row.get::<_, i64>(3)? as usize,
failed: row.get::<_, i64>(4)? as usize,
masked: row.get::<_, i64>(5)? as usize,
duration_ms: row.get::<_, i64>(6)? as u64,
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.context("Failed to list history builds")
}
/// Delete the `build_history` and `build_metadata` rows of the given
/// builds in one transaction, then `VACUUM` the history database.
///
/// An empty slice returns immediately without opening the database.
pub fn prune_builds(&self, build_ids: &[String]) -> Result<()> {
    if build_ids.is_empty() {
        return Ok(());
    }
    let conn = self.history_conn()?;
    let tx = conn.unchecked_transaction()?;
    let mut hist = tx.prepare("DELETE FROM build_history WHERE build_id = ?1")?;
    let mut meta = tx.prepare("DELETE FROM build_metadata WHERE build_id = ?1")?;
    for build_id in build_ids.iter() {
        hist.execute([build_id])?;
        meta.execute([build_id])?;
    }
    // Both statements borrow the transaction; release them before commit.
    drop(meta);
    drop(hist);
    tx.commit()?;
    conn.execute_batch("VACUUM")?;
    Ok(())
}
/// Record `revision` for `build_id` in `build_metadata`, creating the row
/// if needed and overwriting only the revision when it already exists.
pub fn store_build_revision(&self, build_id: &str, revision: &str) -> Result<()> {
    let upsert = "INSERT INTO build_metadata (build_id, revision) VALUES (?1, ?2) \
                  ON CONFLICT(build_id) DO UPDATE SET revision = excluded.revision";
    self.history_conn()?
        .execute(upsert, params![build_id, revision])?;
    Ok(())
}
/// Record the number of up-to-date packages for `build_id` in
/// `build_metadata`, creating the row if needed and overwriting only the
/// count when it already exists.
pub fn record_up_to_date_count(&self, build_id: &str, count: usize) -> Result<()> {
    let upsert = "INSERT INTO build_metadata (build_id, up_to_date) VALUES (?1, ?2) \
                  ON CONFLICT(build_id) DO UPDATE SET up_to_date = excluded.up_to_date";
    let stored_count = count as i64;
    self.history_conn()?
        .execute(upsert, params![build_id, stored_count])?;
    Ok(())
}
pub fn get_build_revision(&self, build_id: &str) -> Result<Option<String>> {
let conn = self.history_conn()?;
match conn.query_row(
"SELECT revision FROM build_metadata WHERE build_id = ?1",
[build_id],
|row| row.get(0),
) {
Ok(rev) => Ok(Some(rev)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Compare the history rows of two builds and classify every package that
/// differs between them.
///
/// Packages are matched on `(pkgpath, pkgbase)`. A package present in both
/// builds is unchanged when its pkgname is the same and either both
/// outcomes are `Success`/`UpToDate`, or outcome and stage are identical.
/// Everything else is sorted into:
///
/// * `new_failures` - failed in build 2 and not failed in build 1, or
///   absent from build 1;
/// * `fixes` - failed in build 1 and `Success`/`UpToDate` in build 2, or
///   absent from build 2;
/// * `version_changes` - failed in both builds;
/// * `other_changes` - any remaining difference, including rows whose
///   outcome id is unknown.
///
/// # Errors
/// Fails if the history database cannot be opened or queried.
pub fn compute_build_diff(&self, build1_id: &str, build2_id: &str) -> Result<BuildDiff> {
    let conn = self.history_conn()?;

    struct PkgRecord {
        pkgname: String,
        outcome: i32,
        stage: Option<i32>,
    }

    let sql = "SELECT pkgpath, pkgbase, pkgname, outcome, stage \
               FROM build_history WHERE build_id = ?1";
    // Load all rows of one build, keyed by (pkgpath, pkgbase).
    let query_build = |bid: &str| -> Result<HashMap<PkgKey, PkgRecord>> {
        let mut stmt = conn.prepare(sql)?;
        let mut map = HashMap::new();
        let rows = stmt.query_map([bid], |row| {
            Ok((
                row.get::<_, String>(0)?,
                row.get::<_, String>(1)?,
                row.get::<_, String>(2)?,
                row.get::<_, i32>(3)?,
                row.get::<_, Option<i32>>(4)?,
            ))
        })?;
        for row in rows {
            let (pkgpath, pkgbase, pkgname, outcome, stage) = row?;
            map.insert(
                (pkgpath, pkgbase),
                PkgRecord {
                    pkgname,
                    outcome,
                    stage,
                },
            );
        }
        Ok(map)
    };

    let b1 = query_build(build1_id)?;
    let b2 = query_build(build2_id)?;
    let mut diff = BuildDiff {
        build1_id: build1_id.to_string(),
        build2_id: build2_id.to_string(),
        new_failures: Vec::new(),
        fixes: Vec::new(),
        version_changes: Vec::new(),
        other_changes: Vec::new(),
    };

    use PackageState::*;

    // Pass 1: every package of build 2, compared with build 1 when present.
    // The map key is borrowed for the lookup, so no strings are cloned
    // unless an entry is actually emitted.
    for (key, b2_rec) in &b2 {
        let pkgpath = &key.0;
        let b2_outcome = PackageState::try_from(b2_rec.outcome).ok();
        let b2_stage = b2_rec.stage.and_then(Stage::from_repr);
        let entry_from = |b1_rec: Option<&PkgRecord>| DiffEntry {
            pkgpath: pkgpath.clone(),
            build1_pkgname: b1_rec.map(|r| r.pkgname.clone()),
            build2_pkgname: Some(b2_rec.pkgname.clone()),
            build1_outcome: b1_rec.and_then(|r| PackageState::try_from(r.outcome).ok()),
            build2_outcome: b2_outcome,
            build1_stage: b1_rec.and_then(|r| r.stage.and_then(Stage::from_repr)),
            build2_stage: b2_stage,
        };

        let Some(b1_rec) = b1.get(key) else {
            // Only in build 2.
            let entry = entry_from(None);
            if matches!(b2_outcome, Some(Failed)) {
                diff.new_failures.push(entry);
            } else {
                diff.other_changes.push(entry);
            }
            continue;
        };

        let b1_outcome = PackageState::try_from(b1_rec.outcome).ok();
        let b1_stage = b1_rec.stage.and_then(Stage::from_repr);
        let pkgname_changed = b1_rec.pkgname != b2_rec.pkgname;
        let both_ok = matches!(
            (b1_outcome, b2_outcome),
            (Some(Success | UpToDate), Some(Success | UpToDate))
        );
        // Success and UpToDate are interchangeable here; only a pkgname
        // change makes two good builds differ.
        let no_change = if both_ok {
            !pkgname_changed
        } else {
            b1_outcome == b2_outcome && b1_stage == b2_stage && !pkgname_changed
        };
        if no_change {
            continue;
        }

        let entry = entry_from(Some(b1_rec));
        match (b1_outcome, b2_outcome) {
            (Some(b1o), Some(b2o)) => {
                let was_failed = matches!(b1o, Failed);
                let now_failed = matches!(b2o, Failed);
                let now_ok = matches!(b2o, Success | UpToDate);
                if !was_failed && now_failed {
                    diff.new_failures.push(entry);
                } else if was_failed && now_ok {
                    diff.fixes.push(entry);
                } else if was_failed && now_failed {
                    diff.version_changes.push(entry);
                } else {
                    diff.other_changes.push(entry);
                }
            }
            _ => diff.other_changes.push(entry),
        }
    }

    // Pass 2: packages that exist only in build 1.
    for (key, b1_rec) in &b1 {
        if b2.contains_key(key) {
            continue;
        }
        let b1_outcome = PackageState::try_from(b1_rec.outcome).ok();
        let entry = DiffEntry {
            pkgpath: key.0.clone(),
            build1_pkgname: Some(b1_rec.pkgname.clone()),
            build2_pkgname: None,
            build1_outcome: b1_outcome,
            build2_outcome: None,
            build1_stage: b1_rec.stage.and_then(Stage::from_repr),
            build2_stage: None,
        };
        if matches!(b1_outcome, Some(Failed)) {
            diff.fixes.push(entry);
        } else {
            diff.other_changes.push(entry);
        }
    }
    Ok(diff)
}
}
/// One package that differs between the two builds compared by
/// `compute_build_diff`.
///
/// The `build1_*` fields are `None` when the package has no row in the
/// first build, and the `build2_*` fields are `None` when it has no row in
/// the second. An outcome or stage is also `None` when the stored id does
/// not map to a known variant; a stage is additionally `None` when the row
/// stores no stage.
pub struct DiffEntry {
/// Package path shared by both sides of the comparison.
pub pkgpath: String,
/// Package name recorded in the first build.
pub build1_pkgname: Option<String>,
/// Package name recorded in the second build.
pub build2_pkgname: Option<String>,
/// Outcome recorded in the first build.
pub build1_outcome: Option<PackageState>,
/// Outcome recorded in the second build.
pub build2_outcome: Option<PackageState>,
/// Stage recorded in the first build.
pub build1_stage: Option<Stage>,
/// Stage recorded in the second build.
pub build2_stage: Option<Stage>,
}
/// Result of `compute_build_diff`: the packages that differ between two
/// builds, grouped by the kind of change.
pub struct BuildDiff {
/// Id of the first (baseline) build.
pub build1_id: String,
/// Id of the second build.
pub build2_id: String,
/// Failed in build 2 while not failed in, or absent from, build 1.
pub new_failures: Vec<DiffEntry>,
/// Failed in build 1 and either `Success`/`UpToDate` in, or absent from,
/// build 2.
pub fixes: Vec<DiffEntry>,
/// Failed in both builds, but with a different stage or pkgname.
pub version_changes: Vec<DiffEntry>,
/// Every other difference, including rows with an unknown outcome id.
pub other_changes: Vec<DiffEntry>,
}
/// Per-build summary row produced by `list_history_builds`.
pub struct BuildListEntry {
/// Identifier of the build.
pub build_id: String,
/// Sum of `succeeded`, `failed`, `masked` and `up_to_date`.
pub package_count: usize,
/// Number of history rows with the `Success` outcome.
pub succeeded: usize,
/// Up-to-date count stored in `build_metadata` (0 when there is no row).
pub up_to_date: usize,
/// Number of history rows with the `Failed` outcome.
pub failed: usize,
/// Number of history rows whose outcome is none of `Success`, `Failed`
/// or `UpToDate`.
pub masked: usize,
/// Duration stored in `build_metadata`, in milliseconds (0 when there is
/// no row).
pub duration_ms: u64,
}
/// A history schema migration step: the version it upgrades from, the
/// version it upgrades to, and the function that performs the upgrade.
type HistoryMigration = (i32, i32, fn(&Connection) -> Result<()>);
/// Known migration steps for the history database.
///
/// `check_history_schema` walks this list once, front to back, applying
/// each step whose "from" version equals the current version. Steps must
/// therefore be listed in upgrade order for a chain of upgrades to be
/// applied in a single pass.
const HISTORY_MIGRATIONS: &[HistoryMigration] = &[
(20260406, 20260513, migrate_history_20260406_to_20260513),
(20260513, 20260515, migrate_history_20260513_to_20260515),
(20260515, 20260609, migrate_history_20260515_to_20260609),
(20260609, 20260610, migrate_history_20260609_to_20260610),
];
/// Verify that `history.db` in `dbdir` is at the expected schema version,
/// applying any pending migrations first.
///
/// A missing file, or a database with no tables at all, passes.
///
/// # Errors
/// Fails if the database has tables but no `schema_version` table, if a
/// migration fails, or if the version still differs from
/// `HISTORY_SCHEMA_VERSION` after the migrations have been tried.
fn check_history_schema(dbdir: &Path) -> Result<()> {
    let path = dbdir.join("history.db");
    if !path.exists() {
        return Ok(());
    }
    let conn = Connection::open(&path).context("Failed to open history database")?;

    let schema_tables: i32 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
         WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if schema_tables < 1 {
        let table_count: i32 = conn.query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table'",
            [],
            |row| row.get(0),
        )?;
        if table_count > 0 {
            anyhow::bail!(
                "{} has tables but no schema_version. Remove the file to reset.",
                path.display(),
            );
        }
        return Ok(());
    }

    let read_version = || {
        conn.query_row("SELECT version FROM schema_version LIMIT 1", [], |row| {
            row.get::<_, i32>(0)
        })
    };
    let mut version: i32 = read_version()?;
    for (from, to, apply) in HISTORY_MIGRATIONS.iter().copied() {
        if version != from {
            continue;
        }
        apply(&conn)
            .with_context(|| format!("Failed to migrate history schema v{from} to v{to}"))?;
        // Trust what the migration actually wrote, not the table entry.
        version = read_version()?;
    }

    if version != HISTORY_SCHEMA_VERSION {
        anyhow::bail!(
            "History schema mismatch: found v{version}, expected v{expected}. \
             Remove {} to reset.",
            path.display(),
            expected = HISTORY_SCHEMA_VERSION
        );
    }
    Ok(())
}
/// Migration 20260406 -> 20260513: add the `up_to_date` and `duration_ms`
/// columns to `build_metadata` and bump the schema version, all in one
/// transaction.
fn migrate_history_20260406_to_20260513(conn: &Connection) -> Result<()> {
    const TARGET_VERSION: i32 = 20260513;
    let tx = conn.unchecked_transaction()?;
    tx.execute_batch(
        "ALTER TABLE build_metadata ADD COLUMN up_to_date INTEGER NOT NULL DEFAULT 0;\n\
         ALTER TABLE build_metadata ADD COLUMN duration_ms INTEGER NOT NULL DEFAULT 0;",
    )?;
    tx.execute(
        "UPDATE schema_version SET version = ?1",
        params![TARGET_VERSION],
    )?;
    tx.commit()?;
    Ok(())
}
/// Migration 20260513 -> 20260515: rebuild `build_history` with a
/// `UNIQUE (build_id, pkgpath, pkgbase)` constraint.
///
/// Rows with a NULL `build_id` are all kept. For every other
/// `(build_id, pkgpath, pkgbase)` group only the row with the highest `id`
/// survives, and `wall_times`/`cpu_times` rows that pointed at discarded
/// history rows are deleted. The indexes are recreated on the new table.
///
/// Foreign key enforcement is switched off around the table swap. It is
/// switched back on even when the migration fails, so the connection is
/// never left with enforcement disabled.
///
/// # Errors
/// Fails if any statement fails; the migration transaction is then rolled
/// back when it is dropped.
fn migrate_history_20260513_to_20260515(conn: &Connection) -> Result<()> {
    conn.execute_batch("PRAGMA foreign_keys=OFF")?;

    // Run the transactional part in a closure so that an early return drops
    // (and thereby rolls back) the transaction before the PRAGMA below; the
    // PRAGMA has no effect while a transaction is open.
    let migrated = (|| -> Result<()> {
        let tx = conn.unchecked_transaction()?;
        let history_cols = history_schema();
        tx.execute_batch(&format!(
            "CREATE TABLE build_history_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{history_cols},
UNIQUE (build_id, pkgpath, pkgbase)
);
INSERT INTO build_history_new
SELECT * FROM build_history
WHERE build_id IS NULL
OR id IN (
SELECT MAX(id) FROM build_history
WHERE build_id IS NOT NULL
GROUP BY build_id, pkgpath, pkgbase
);
DELETE FROM wall_times
WHERE history_id NOT IN (SELECT id FROM build_history_new);
DELETE FROM cpu_times
WHERE history_id NOT IN (SELECT id FROM build_history_new);
DROP TABLE build_history;
ALTER TABLE build_history_new RENAME TO build_history;
CREATE INDEX idx_history_pkgpath
ON build_history(pkgpath);
CREATE INDEX idx_history_pkgpath_pkgbase
ON build_history(pkgpath, pkgbase);
CREATE INDEX idx_history_pkgpath_outcome
ON build_history(pkgpath, outcome);
CREATE INDEX idx_history_timestamp
ON build_history(timestamp);
CREATE INDEX idx_history_build_id
ON build_history(build_id, pkgpath, id);",
        ))?;
        tx.execute("UPDATE schema_version SET version = ?1", params![20260515])?;
        tx.commit()?;
        Ok(())
    })();

    // Always restore enforcement; report the migration error first if both
    // steps failed.
    let restored = conn.execute_batch("PRAGMA foreign_keys=ON");
    migrated?;
    restored?;
    Ok(())
}
/// Migration 20260515 -> 20260609: add the `idx_history_outcome_pkg` index
/// (if missing) and bump the schema version, in one transaction.
fn migrate_history_20260515_to_20260609(conn: &Connection) -> Result<()> {
    const TARGET_VERSION: i32 = 20260609;
    let tx = conn.unchecked_transaction()?;
    tx.execute_batch(
        "CREATE INDEX IF NOT EXISTS idx_history_outcome_pkg\n\
         ON build_history(outcome, pkgpath, pkgbase);",
    )?;
    tx.execute(
        "UPDATE schema_version SET version = ?1",
        params![TARGET_VERSION],
    )?;
    tx.commit()?;
    Ok(())
}
/// Migration 20260609 -> 20260610: add the `idx_history_pkg_latest` index
/// (if missing) and bump the schema version, in one transaction.
fn migrate_history_20260609_to_20260610(conn: &Connection) -> Result<()> {
    const TARGET_VERSION: i32 = 20260610;
    let tx = conn.unchecked_transaction()?;
    tx.execute_batch(
        "CREATE INDEX IF NOT EXISTS idx_history_pkg_latest\n\
         ON build_history(pkgpath, pkgbase, id, outcome);",
    )?;
    tx.execute(
        "UPDATE schema_version SET version = ?1",
        params![TARGET_VERSION],
    )?;
    tx.commit()?;
    Ok(())
}
/// Open `history.db` inside `dbdir`, apply the connection PRAGMAs, and
/// create the history schema when no `schema_version` table exists yet.
fn open_history_conn(dbdir: &Path) -> Result<Connection> {
    let db_path = dbdir.join("history.db");
    let conn = Connection::open(db_path).context("Failed to open history database")?;
    conn.execute_batch(
        "PRAGMA journal_mode = WAL;\n\
         PRAGMA synchronous = NORMAL;\n\
         PRAGMA cache_size = -8000;\n\
         PRAGMA temp_store = MEMORY;\n\
         PRAGMA foreign_keys = ON;",
    )?;
    let schema_tables: i32 = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master \
         WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    if schema_tables < 1 {
        create_history_schema(&conn)?;
    }
    Ok(conn)
}
/// Create the complete history schema on `conn` in a single batch.
///
/// This writes `schema_version` (set to `HISTORY_SCHEMA_VERSION`), the
/// `outcome_types` and `stage_types` lookup tables (filled from
/// `outcome_values()` and `stage_values()`), `build_history` with its
/// indexes (columns from `history_schema()`), the per-stage `wall_times`
/// and `cpu_times` tables, `cpu_usage`, and `build_metadata`.
///
/// The statements are plain `CREATE TABLE`/`CREATE INDEX` without
/// `IF NOT EXISTS`, so the batch is meant for a database that does not
/// have these objects yet.
pub(crate) fn create_history_schema(conn: &Connection) -> Result<()> {
conn.execute_batch(&format!(
"CREATE TABLE schema_version (version INTEGER NOT NULL);
INSERT INTO schema_version (version) VALUES ({});
CREATE TABLE outcome_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO outcome_types (id, name) VALUES {outcome_types};
CREATE TABLE stage_types (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL
);
INSERT INTO stage_types (id, name) VALUES {stages};
/*
* build_history is keyed by (build_id, pkgpath, pkgbase) and
* holds the latest outcome per key, not an append-only event
* log: record_history_to upserts on conflict, replacing any
* prior row for the same key (e.g. a transient failure
* superseded by a successful rebuild within the same
* build_id). Within-build retries collapse to the final
* state; cross-build history is preserved because each build
* has its own build_id.
*/
CREATE TABLE build_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
{history_columns},
UNIQUE (build_id, pkgpath, pkgbase)
);
CREATE INDEX idx_history_pkgpath
ON build_history(pkgpath);
CREATE INDEX idx_history_pkgpath_pkgbase
ON build_history(pkgpath, pkgbase);
CREATE INDEX idx_history_pkgpath_outcome
ON build_history(pkgpath, outcome);
CREATE INDEX idx_history_timestamp
ON build_history(timestamp);
CREATE INDEX idx_history_build_id
ON build_history(build_id, pkgpath, id);
CREATE INDEX idx_history_outcome_pkg
ON build_history(outcome, pkgpath, pkgbase);
CREATE INDEX idx_history_pkg_latest
ON build_history(pkgpath, pkgbase, id, outcome);
CREATE TABLE wall_times (
history_id INTEGER NOT NULL
REFERENCES build_history(id) ON DELETE CASCADE,
stage INTEGER NOT NULL REFERENCES stage_types(id),
duration INTEGER NOT NULL,
PRIMARY KEY (history_id, stage)
);
CREATE TABLE cpu_times (
history_id INTEGER NOT NULL
REFERENCES build_history(id) ON DELETE CASCADE,
stage INTEGER NOT NULL REFERENCES stage_types(id),
duration INTEGER NOT NULL,
PRIMARY KEY (history_id, stage)
);
CREATE TABLE cpu_usage (
timestamp INTEGER NOT NULL,
user_pct INTEGER NOT NULL,
sys_pct INTEGER NOT NULL
);
CREATE INDEX idx_cpu_timestamp ON cpu_usage(timestamp);
CREATE TABLE build_metadata (
build_id TEXT PRIMARY KEY,
revision TEXT,
up_to_date INTEGER NOT NULL DEFAULT 0,
duration_ms INTEGER NOT NULL DEFAULT 0
);",
HISTORY_SCHEMA_VERSION,
outcome_types = outcome_values(),
stages = stage_values(),
history_columns = history_schema(),
))?;
Ok(())
}
/// Write one history record, with its per-stage timings, to `conn`.
///
/// * An `UpToDate` record is inserted only if no row exists yet for its
///   `(build_id, pkgpath, pkgbase)` key; an existing row is left untouched
///   and no stage timings are written.
/// * Any other record is upserted on that key. The `wall_times` and
///   `cpu_times` rows of the resulting history row are then replaced by
///   the timings carried in `rec`.
///
/// Every statement goes through the connection's prepared-statement cache,
/// so repeated calls (for example from `record_history_batch`) do not
/// re-prepare the same SQL each time.
///
/// The function does not open a transaction itself; callers that need the
/// statements applied atomically must wrap the call in one.
///
/// # Errors
/// Fails if any statement cannot be prepared or executed.
pub(crate) fn record_history_to(conn: &Connection, rec: &crate::History) -> Result<()> {
    let cols: Vec<&str> = HistoryKind::VARIANTS.iter().map(<&str>::from).collect();
    let column_list = cols.join(", ");
    let placeholders: String = (1..=cols.len())
        .map(|i| format!("?{}", i))
        .collect::<Vec<_>>()
        .join(", ");
    let dur_ms = |d: Duration| d.as_millis() as i64;
    // Must stay in the same order as the HistoryKind columns above.
    let values = params![
        rec.timestamp,
        rec.pkgpath,
        rec.pkgname,
        rec.pkgbase,
        rec.outcome.id(),
        rec.stage.map(|s| s as i32),
        rec.make_jobs.map(|j| j as i64),
        dur_ms(rec.duration),
        rec.disk_usage.map(|s| s as i64),
        rec.wrkobjdir.as_ref().map(|k| k.to_string()),
        rec.build_id.as_deref(),
    ];

    if rec.outcome == crate::PackageState::UpToDate {
        let sql = format!(
            "INSERT INTO build_history ({}) VALUES ({}) \
             ON CONFLICT(build_id, pkgpath, pkgbase) DO NOTHING",
            column_list, placeholders,
        );
        conn.prepare_cached(&sql)?.execute(values)?;
        return Ok(());
    }

    let update_assignments: String = cols
        .iter()
        .map(|c| format!("{c} = excluded.{c}"))
        .collect::<Vec<_>>()
        .join(", ");
    let sql = format!(
        "INSERT INTO build_history ({}) VALUES ({}) \
         ON CONFLICT(build_id, pkgpath, pkgbase) DO UPDATE SET {} \
         RETURNING id",
        column_list, placeholders, update_assignments,
    );
    let history_id: i64 = conn
        .prepare_cached(&sql)?
        .query_row(values, |row| row.get(0))?;

    // Drop timings left over from a previous attempt with the same key.
    conn.prepare_cached("DELETE FROM wall_times WHERE history_id = ?1")?
        .execute(params![history_id])?;
    conn.prepare_cached("DELETE FROM cpu_times WHERE history_id = ?1")?
        .execute(params![history_id])?;

    if !rec.stage_durations.is_empty() {
        let mut stmt = conn.prepare_cached(
            "INSERT INTO wall_times \
             (history_id, stage, duration) \
             VALUES (?1, ?2, ?3)",
        )?;
        for &(stage, duration) in &rec.stage_durations {
            stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
        }
    }
    if !rec.stage_cpu_times.is_empty() {
        let mut stmt = conn.prepare_cached(
            "INSERT INTO cpu_times \
             (history_id, stage, duration) \
             VALUES (?1, ?2, ?3)",
        )?;
        for &(stage, duration) in &rec.stage_cpu_times {
            stmt.execute(params![history_id, stage as i32, dur_ms(duration)])?;
        }
    }
    Ok(())
}
/// A package row returned by `query_selected_packages`: a `scan_index`
/// entry joined with its `package_state` row where `selected = 1`.
pub(crate) struct SelectedPackage {
/// `scan_index.id` of the package.
pub id: i64,
/// Package name.
pub pkgname: PkgName,
/// `scan_index.pkg_location`; empty when the column is NULL.
pub pkg_location: String,
/// `scan_index.pbulk_weight`; 100 when the column is NULL.
pub pbulk_weight: usize,
/// `scan_index.make_jobs_safe`; `true` when the column is NULL.
pub make_jobs_safe: bool,
/// `package_state.total_pbulk_weight`.
pub total_pbulk_weight: usize,
/// `package_state.dep_count`.
pub dep_count: usize,
}
pub(crate) fn query_selected_packages(conn: &Connection) -> Result<Vec<SelectedPackage>> {
let mut stmt = conn.prepare(
"SELECT p.id, p.pkgname, p.pkg_location, p.pbulk_weight, p.make_jobs_safe, \
s.total_pbulk_weight, s.dep_count \
FROM scan_index p \
JOIN package_state s ON s.package_id = p.id \
WHERE s.selected = 1 \
ORDER BY s.dep_count DESC, p.pkgname",
)?;
let rows = stmt.query_map([], |row| {
Ok(SelectedPackage {
id: row.get(0)?,
pkgname: PkgName::from(row.get::<_, String>(1)?),
pkg_location: row.get::<_, Option<String>>(2)?.unwrap_or_default(),
pbulk_weight: row.get::<_, Option<u32>>(3)?.map_or(100, |w| w as usize),
make_jobs_safe: row.get::<_, Option<bool>>(4)?.is_none_or(|s| s),
total_pbulk_weight: row.get::<_, i64>(5)? as usize,
dep_count: row.get::<_, i64>(6)? as usize,
})
})?;
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(Into::into)
}
/// Load every `(package_id, depends_on_id)` pair from `resolved_depends`.
pub(crate) fn query_resolved_deps(conn: &Connection) -> Result<Vec<(i64, i64)>> {
    let mut stmt = conn.prepare("SELECT package_id, depends_on_id FROM resolved_depends")?;
    let mut edges = Vec::new();
    let mut cursor = stmt.query([])?;
    while let Some(row) = cursor.next()? {
        let package_id: i64 = row.get(0)?;
        let depends_on_id: i64 = row.get(1)?;
        edges.push((package_id, depends_on_id));
    }
    Ok(edges)
}
/// Timing of the build stage for one package, as returned by
/// `query_build_stage_timings`.
pub(crate) struct BuildStageTiming {
/// CPU time of the build stage, in milliseconds.
pub cpu_ms: u64,
}
/// CPU time of the build stage from the most recent successful build of
/// each `(pkgpath, pkgbase)`.
///
/// "Most recent" is the `Success` history row with the highest `id`.
/// Packages whose latest successful row has no `cpu_times` entry for the
/// build stage are absent from the result.
///
/// Best-effort: a failure to prepare or run the query logs a warning and
/// yields an empty map. A row that cannot be decoded is logged and skipped
/// rather than dropped silently.
pub(crate) fn query_build_stage_timings(conn: &Connection) -> HashMap<PkgKey, BuildStageTiming> {
    let out: &str = HistoryKind::Outcome.into();
    let pkgbase_col: &str = HistoryKind::Pkgbase.into();
    let pkgpath_col: &str = HistoryKind::Pkgpath.into();
    let success_outcome = PackageState::Success.id();
    let build_stage = Stage::Build as i32;
    let sql = format!(
        "WITH latest AS ( \
         SELECT {pkgpath_col}, {pkgbase_col}, MAX(id) AS id \
         FROM build_history \
         WHERE {out} = {success_outcome} \
         GROUP BY {pkgpath_col}, {pkgbase_col} \
         ) \
         SELECT l.{pkgpath_col}, l.{pkgbase_col}, ct.duration \
         FROM latest l \
         JOIN cpu_times ct ON ct.history_id = l.id \
         AND ct.stage = {build_stage}",
    );

    let mut stmt = match conn.prepare(&sql) {
        Ok(s) => s,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "query_build_stage_timings: failed to prepare query"
            );
            return HashMap::new();
        }
    };
    let rows = match stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, i64>(2)?,
        ))
    }) {
        Ok(r) => r,
        Err(e) => {
            warn!(
                error = format!("{e:#}"),
                "query_build_stage_timings: query failed"
            );
            return HashMap::new();
        }
    };

    let mut result = HashMap::new();
    for row in rows {
        match row {
            Ok((pkgpath, pkgbase, cpu_ms)) => {
                result.insert(
                    (pkgpath, pkgbase),
                    BuildStageTiming {
                        cpu_ms: cpu_ms as u64,
                    },
                );
            }
            Err(e) => {
                warn!(
                    error = format!("{e:#}"),
                    "query_build_stage_timings: skipping unreadable row"
                );
            }
        }
    }
    result
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::build::Stage;

    /// In-memory database with the full history schema installed.
    fn test_history_db() -> Connection {
        let db = Connection::open_in_memory().expect("failed to open in-memory db");
        create_history_schema(&db).expect("failed to create schema");
        db
    }

    /// Record a successful build of `pkgname` at `pkgpath` whose only stage
    /// timing is the build stage, with the given wall and CPU milliseconds.
    fn insert_build(conn: &Connection, pkgpath: &str, pkgname: &str, wall_ms: u64, cpu_ms: u64) {
        use crate::{History, PackageState};
        use std::time::Duration;

        let wall = Duration::from_millis(wall_ms);
        let cpu = Duration::from_millis(cpu_ms);
        let pkgbase = PkgName::new(pkgname).pkgbase().to_string();
        let record = History {
            timestamp: 0,
            pkgpath: pkgpath.to_string(),
            pkgname: pkgname.to_string(),
            pkgbase,
            outcome: PackageState::Success,
            stage: None,
            make_jobs: Some(1),
            duration: Duration::ZERO,
            disk_usage: None,
            wrkobjdir: None,
            stage_durations: vec![(Stage::Build, wall)],
            stage_cpu_times: vec![(Stage::Build, cpu)],
            build_id: None,
        };
        record_history_to(conn, &record).expect("failed to insert build");
    }

    /// Build an owned map key from borrowed parts.
    fn key(pkgpath: &str, pkgbase: &str) -> (String, String) {
        (String::from(pkgpath), String::from(pkgbase))
    }

    #[test]
    fn build_stage_timings_basic() {
        let db = test_history_db();
        insert_build(&db, "devel/cmake", "cmake-3.28.0", 60000, 180000);

        let timings = query_build_stage_timings(&db);

        assert_eq!(timings.len(), 1);
        let cmake = timings.get(&key("devel/cmake", "cmake")).map(|t| t.cpu_ms);
        assert_eq!(cmake, Some(180000));
    }

    #[test]
    fn build_stage_timings_latest() {
        let db = test_history_db();
        for (pkgname, wall_ms, cpu_ms) in
            [("cmake-3.27.0", 1000, 2000), ("cmake-3.28.0", 5000, 15000)]
        {
            insert_build(&db, "devel/cmake", pkgname, wall_ms, cpu_ms);
        }

        let timings = query_build_stage_timings(&db);

        // Only the most recently recorded build counts.
        assert_eq!(timings.len(), 1);
        let cmake = timings.get(&key("devel/cmake", "cmake")).map(|t| t.cpu_ms);
        assert_eq!(cmake, Some(15000));
    }

    #[test]
    fn build_stage_timings_pkgpath_disambiguates() {
        let db = test_history_db();
        insert_build(
            &db,
            "databases/mysql80-client",
            "mysql-client-8.0.1",
            3000,
            9000,
        );
        insert_build(
            &db,
            "databases/mysql57-client",
            "mysql-client-5.7.42",
            2000,
            4000,
        );

        let timings = query_build_stage_timings(&db);

        // Same pkgbase, different pkgpath: two separate entries.
        assert_eq!(timings.len(), 2);
        let cpu_of = |pkgpath: &str| timings.get(&key(pkgpath, "mysql-client")).map(|t| t.cpu_ms);
        assert_eq!(cpu_of("databases/mysql80-client"), Some(9000));
        assert_eq!(cpu_of("databases/mysql57-client"), Some(4000));
    }

    #[test]
    fn build_stage_timings_empty() {
        let db = test_history_db();
        assert!(query_build_stage_timings(&db).is_empty());
    }
}