1use crate::build::{BuildOutcome, BuildResult};
34use anyhow::{Context, Result};
35use pkgsrc::{PkgName, PkgPath, ScanIndex};
36use rusqlite::{Connection, params};
37use std::collections::HashSet;
38use std::path::Path;
39use std::time::Duration;
40use tracing::{debug, info, warn};
41
/// Schema version that this code creates and migrates existing databases up to.
const SCHEMA_VERSION: i32 = 2;
44
/// Summary of one row of the `packages` table.
///
/// Holds only the columns that the package lookup queries select; the
/// serialized scan data is not part of this view.
#[derive(Debug, Clone)]
pub struct PackageRow {
    /// Value of the `id` column.
    pub id: i64,
    /// Value of the `pkgname` column.
    pub pkgname: String,
    /// Value of the `pkgpath` column.
    pub pkgpath: String,
    /// Value of the `skip_reason` column, `None` when it is NULL.
    pub skip_reason: Option<String>,
    /// Value of the `fail_reason` column, `None` when it is NULL.
    pub fail_reason: Option<String>,
    /// `true` when the stored `is_bootstrap` integer is non-zero.
    pub is_bootstrap: bool,
    /// Value of the `pbulk_weight` column.
    pub pbulk_weight: i32,
}
56
57pub struct Database {
59 conn: Connection,
60}
61
62impl Database {
63 pub fn open(path: &Path) -> Result<Self> {
65 if let Some(parent) = path.parent() {
66 std::fs::create_dir_all(parent)
67 .context("Failed to create database directory")?;
68 }
69 let conn = Connection::open(path).context("Failed to open database")?;
70 let db = Self { conn };
71 db.configure_pragmas()?;
72 db.init_or_migrate()?;
73 Ok(db)
74 }
75
76 pub fn begin_transaction(&self) -> Result<()> {
78 self.conn.execute("BEGIN TRANSACTION", [])?;
79 Ok(())
80 }
81
82 pub fn commit(&self) -> Result<()> {
84 self.conn.execute("COMMIT", [])?;
85 Ok(())
86 }
87
88 pub fn rollback(&self) -> Result<()> {
90 self.conn.execute("ROLLBACK", [])?;
91 Ok(())
92 }
93
94 fn configure_pragmas(&self) -> Result<()> {
96 self.conn.execute_batch(
97 "PRAGMA journal_mode = WAL;
98 PRAGMA synchronous = NORMAL;
99 PRAGMA cache_size = -64000;
100 PRAGMA temp_store = MEMORY;
101 PRAGMA mmap_size = 268435456;
102 PRAGMA foreign_keys = ON;",
103 )?;
104 Ok(())
105 }
106
107 fn init_or_migrate(&self) -> Result<()> {
109 let has_schema_version: bool = self.conn.query_row(
111 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='schema_version'",
112 [],
113 |row| row.get::<_, i32>(0).map(|c| c > 0),
114 )?;
115
116 if !has_schema_version {
117 let has_old_scan: bool = self.conn.query_row(
119 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='scan'",
120 [],
121 |row| row.get::<_, i32>(0).map(|c| c > 0),
122 )?;
123
124 if has_old_scan {
125 info!("Migrating from schema v1 to v{}", SCHEMA_VERSION);
126 self.migrate_v1_to_v2()?;
127 } else {
128 self.create_schema_v2()?;
129 }
130 } else {
131 let version: i32 = self.conn.query_row(
132 "SELECT version FROM schema_version ORDER BY version DESC LIMIT 1",
133 [],
134 |row| row.get(0),
135 ).unwrap_or(1);
136
137 if version < SCHEMA_VERSION {
138 info!(
139 "Migrating from schema v{} to v{}",
140 version, SCHEMA_VERSION
141 );
142 if version == 1 {
143 self.migrate_v1_to_v2()?;
144 }
145 }
146 }
147
148 Ok(())
149 }
150
151 fn create_schema_v2(&self) -> Result<()> {
153 self.conn.execute_batch(
154 "CREATE TABLE IF NOT EXISTS schema_version (
155 version INTEGER PRIMARY KEY
156 );
157
158 INSERT OR REPLACE INTO schema_version (version) VALUES (2);
159
160 CREATE TABLE IF NOT EXISTS packages (
161 id INTEGER PRIMARY KEY AUTOINCREMENT,
162 pkgname TEXT UNIQUE NOT NULL,
163 pkgpath TEXT NOT NULL,
164 pkgname_base TEXT NOT NULL,
165 version TEXT NOT NULL,
166 skip_reason TEXT,
167 fail_reason TEXT,
168 is_bootstrap INTEGER DEFAULT 0,
169 pbulk_weight INTEGER DEFAULT 100,
170 scan_data TEXT,
171 scanned_at INTEGER NOT NULL
172 );
173
174 CREATE INDEX IF NOT EXISTS idx_packages_pkgpath ON packages(pkgpath);
175 CREATE INDEX IF NOT EXISTS idx_packages_pkgname_base ON packages(pkgname_base);
176 CREATE INDEX IF NOT EXISTS idx_packages_status ON packages(skip_reason, fail_reason);
177
178 CREATE TABLE IF NOT EXISTS depends (
179 id INTEGER PRIMARY KEY AUTOINCREMENT,
180 package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
181 depend_pattern TEXT NOT NULL,
182 depend_pkgpath TEXT NOT NULL,
183 UNIQUE(package_id, depend_pattern)
184 );
185
186 CREATE INDEX IF NOT EXISTS idx_depends_package ON depends(package_id);
187 CREATE INDEX IF NOT EXISTS idx_depends_pkgpath ON depends(depend_pkgpath);
188
189 CREATE TABLE IF NOT EXISTS resolved_depends (
190 id INTEGER PRIMARY KEY AUTOINCREMENT,
191 package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
192 depends_on_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
193 UNIQUE(package_id, depends_on_id)
194 );
195
196 CREATE INDEX IF NOT EXISTS idx_resolved_depends_package ON resolved_depends(package_id);
197 CREATE INDEX IF NOT EXISTS idx_resolved_depends_depends_on ON resolved_depends(depends_on_id);
198
199 CREATE TABLE IF NOT EXISTS builds (
200 id INTEGER PRIMARY KEY AUTOINCREMENT,
201 package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
202 outcome TEXT NOT NULL,
203 outcome_detail TEXT,
204 duration_ms INTEGER NOT NULL DEFAULT 0,
205 built_at INTEGER NOT NULL,
206 log_dir TEXT,
207 UNIQUE(package_id)
208 );
209
210 CREATE INDEX IF NOT EXISTS idx_builds_outcome ON builds(outcome);
211 CREATE INDEX IF NOT EXISTS idx_builds_package ON builds(package_id);
212
213 CREATE TABLE IF NOT EXISTS metadata (
214 key TEXT PRIMARY KEY,
215 value TEXT NOT NULL
216 );"
217 )?;
218
219 debug!("Created schema v2");
220 Ok(())
221 }
222
223 fn migrate_v1_to_v2(&self) -> Result<()> {
225 self.create_schema_v2()?;
227
228 let mut stmt = self
230 .conn
231 .prepare("SELECT pkgpath, data FROM scan ORDER BY rowid")?;
232
233 let rows: Vec<(String, String)> = stmt
234 .query_map([], |row| {
235 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
236 })?
237 .filter_map(|r| r.ok())
238 .collect();
239
240 let mut migrated_count = 0;
241 for (pkgpath, json) in rows {
242 let indexes: Vec<ScanIndex> = match serde_json::from_str(&json) {
243 Ok(idx) => idx,
244 Err(e) => {
245 warn!(pkgpath = %pkgpath, error = %e, "Failed to parse scan data during migration");
246 continue;
247 }
248 };
249
250 for idx in indexes {
251 if let Err(e) = self.store_package(&pkgpath, &idx) {
252 warn!(pkgpath = %pkgpath, error = %e, "Failed to migrate package");
253 }
254 }
255 migrated_count += 1;
256 }
257
258 let mut stmt = self
260 .conn
261 .prepare("SELECT pkgname, data FROM build ORDER BY rowid")?;
262
263 let rows: Vec<(String, String)> = stmt
264 .query_map([], |row| {
265 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
266 })?
267 .filter_map(|r| r.ok())
268 .collect();
269
270 let mut build_count = 0;
271 for (pkgname, json) in rows {
272 let result: BuildResult = match serde_json::from_str(&json) {
273 Ok(r) => r,
274 Err(e) => {
275 warn!(pkgname = %pkgname, error = %e, "Failed to parse build data during migration");
276 continue;
277 }
278 };
279
280 if let Ok(Some(pkg)) = self.get_package_by_name(&pkgname) {
282 if let Err(e) = self.store_build_result(pkg.id, &result) {
283 warn!(pkgname = %pkgname, error = %e, "Failed to migrate build result");
284 }
285 build_count += 1;
286 }
287 }
288
289 self.conn.execute_batch(
291 "DROP TABLE IF EXISTS scan;
292 DROP TABLE IF EXISTS build;",
293 )?;
294
295 info!(
296 packages = migrated_count,
297 builds = build_count,
298 "Migration to v2 complete"
299 );
300 Ok(())
301 }
302
303 pub fn store_package(
309 &self,
310 pkgpath: &str,
311 index: &ScanIndex,
312 ) -> Result<i64> {
313 let pkgname = index.pkgname.pkgname();
314 let (base, version) = split_pkgname(pkgname);
315
316 let skip_reason =
317 index.pkg_skip_reason.as_ref().filter(|s| !s.is_empty());
318 let fail_reason =
319 index.pkg_fail_reason.as_ref().filter(|s| !s.is_empty());
320 let is_bootstrap = index.bootstrap_pkg.as_deref() == Some("yes");
321 let pbulk_weight: i32 = index
322 .pbulk_weight
323 .as_ref()
324 .and_then(|s| s.parse().ok())
325 .unwrap_or(100);
326
327 let scan_data = serde_json::to_string(index)?;
328 let now = std::time::SystemTime::now()
329 .duration_since(std::time::UNIX_EPOCH)?
330 .as_secs() as i64;
331
332 {
333 let mut stmt = self.conn.prepare_cached(
334 "INSERT OR REPLACE INTO packages
335 (pkgname, pkgpath, pkgname_base, version, skip_reason, fail_reason,
336 is_bootstrap, pbulk_weight, scan_data, scanned_at)
337 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
338 )?;
339 stmt.execute(params![
340 pkgname,
341 pkgpath,
342 base,
343 version,
344 skip_reason,
345 fail_reason,
346 is_bootstrap,
347 pbulk_weight,
348 scan_data,
349 now
350 ])?;
351 }
352
353 let package_id = self.conn.last_insert_rowid();
354
355 if let Some(ref deps) = index.all_depends {
357 let mut stmt = self.conn.prepare_cached(
358 "INSERT OR IGNORE INTO depends (package_id, depend_pattern, depend_pkgpath)
359 VALUES (?1, ?2, ?3)",
360 )?;
361 for dep in deps {
362 stmt.execute(params![
363 package_id,
364 dep.pattern().pattern(),
365 dep.pkgpath().to_string()
366 ])?;
367 }
368 }
369
370 debug!(pkgname = pkgname, package_id = package_id, "Stored package");
371 Ok(package_id)
372 }
373
374 pub fn store_scan_pkgpath(
376 &self,
377 pkgpath: &str,
378 indexes: &[ScanIndex],
379 ) -> Result<()> {
380 for index in indexes {
381 self.store_package(pkgpath, index)?;
382 }
383 Ok(())
384 }
385
386 pub fn get_package_by_name(
388 &self,
389 pkgname: &str,
390 ) -> Result<Option<PackageRow>> {
391 let result = self.conn.query_row(
392 "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
393 FROM packages WHERE pkgname = ?1",
394 [pkgname],
395 |row| Ok(PackageRow {
396 id: row.get(0)?,
397 pkgname: row.get(1)?,
398 pkgpath: row.get(2)?,
399 skip_reason: row.get(3)?,
400 fail_reason: row.get(4)?,
401 is_bootstrap: row.get::<_, i32>(5)? != 0,
402 pbulk_weight: row.get(6)?,
403 }),
404 );
405
406 match result {
407 Ok(pkg) => Ok(Some(pkg)),
408 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
409 Err(e) => Err(e.into()),
410 }
411 }
412
413 pub fn get_package_id(&self, pkgname: &str) -> Result<Option<i64>> {
415 let result = self.conn.query_row(
416 "SELECT id FROM packages WHERE pkgname = ?1",
417 [pkgname],
418 |row| row.get(0),
419 );
420
421 match result {
422 Ok(id) => Ok(Some(id)),
423 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
424 Err(e) => Err(e.into()),
425 }
426 }
427
428 pub fn get_pkgname(&self, package_id: i64) -> Result<String> {
430 self.conn
431 .query_row(
432 "SELECT pkgname FROM packages WHERE id = ?1",
433 [package_id],
434 |row| row.get(0),
435 )
436 .context("Package not found")
437 }
438
439 pub fn get_packages_by_path(
441 &self,
442 pkgpath: &str,
443 ) -> Result<Vec<PackageRow>> {
444 let mut stmt = self.conn.prepare(
445 "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
446 FROM packages WHERE pkgpath = ?1"
447 )?;
448
449 let rows = stmt.query_map([pkgpath], |row| {
450 Ok(PackageRow {
451 id: row.get(0)?,
452 pkgname: row.get(1)?,
453 pkgpath: row.get(2)?,
454 skip_reason: row.get(3)?,
455 fail_reason: row.get(4)?,
456 is_bootstrap: row.get::<_, i32>(5)? != 0,
457 pbulk_weight: row.get(6)?,
458 })
459 })?;
460
461 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
462 }
463
464 pub fn is_pkgpath_scanned(&self, pkgpath: &str) -> Result<bool> {
466 let count: i32 = self.conn.query_row(
467 "SELECT COUNT(*) FROM packages WHERE pkgpath = ?1",
468 [pkgpath],
469 |row| row.get(0),
470 )?;
471 Ok(count > 0)
472 }
473
474 pub fn get_scanned_pkgpaths(&self) -> Result<HashSet<String>> {
476 let mut stmt =
477 self.conn.prepare("SELECT DISTINCT pkgpath FROM packages")?;
478 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
479 rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
480 }
481
482 pub fn get_unscanned_dependencies(&self) -> Result<HashSet<String>> {
486 let mut stmt = self.conn.prepare(
487 "SELECT DISTINCT d.depend_pkgpath
488 FROM depends d
489 WHERE d.depend_pkgpath NOT IN (SELECT pkgpath FROM packages)",
490 )?;
491 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
492 rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
493 }
494
495 pub fn count_packages(&self) -> Result<i64> {
497 self.conn
498 .query_row("SELECT COUNT(*) FROM packages", [], |row| row.get(0))
499 .context("Failed to count packages")
500 }
501
502 pub fn count_scan(&self) -> Result<i64> {
504 self.conn
505 .query_row(
506 "SELECT COUNT(DISTINCT pkgpath) FROM packages",
507 [],
508 |row| row.get(0),
509 )
510 .context("Failed to count scan")
511 }
512
513 pub fn get_all_packages(&self) -> Result<Vec<PackageRow>> {
515 let mut stmt = self.conn.prepare(
516 "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
517 FROM packages ORDER BY id"
518 )?;
519
520 let rows = stmt.query_map([], |row| {
521 Ok(PackageRow {
522 id: row.get(0)?,
523 pkgname: row.get(1)?,
524 pkgpath: row.get(2)?,
525 skip_reason: row.get(3)?,
526 fail_reason: row.get(4)?,
527 is_bootstrap: row.get::<_, i32>(5)? != 0,
528 pbulk_weight: row.get(6)?,
529 })
530 })?;
531
532 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
533 }
534
535 pub fn get_buildable_packages(&self) -> Result<Vec<PackageRow>> {
537 let mut stmt = self.conn.prepare(
538 "SELECT id, pkgname, pkgpath, skip_reason, fail_reason, is_bootstrap, pbulk_weight
539 FROM packages WHERE skip_reason IS NULL AND fail_reason IS NULL"
540 )?;
541
542 let rows = stmt.query_map([], |row| {
543 Ok(PackageRow {
544 id: row.get(0)?,
545 pkgname: row.get(1)?,
546 pkgpath: row.get(2)?,
547 skip_reason: row.get(3)?,
548 fail_reason: row.get(4)?,
549 is_bootstrap: row.get::<_, i32>(5)? != 0,
550 pbulk_weight: row.get(6)?,
551 })
552 })?;
553
554 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
555 }
556
557 pub fn get_full_scan_index(&self, package_id: i64) -> Result<ScanIndex> {
559 let json: String = self.conn.query_row(
560 "SELECT scan_data FROM packages WHERE id = ?1",
561 [package_id],
562 |row| row.get(0),
563 )?;
564 serde_json::from_str(&json).context("Failed to deserialize scan data")
565 }
566
567 pub fn get_all_scan_indexes(&self) -> Result<Vec<(i64, ScanIndex)>> {
569 let mut stmt = self
570 .conn
571 .prepare("SELECT id, scan_data FROM packages ORDER BY id")?;
572 let rows = stmt.query_map([], |row| {
573 let id: i64 = row.get(0)?;
574 let json: String = row.get(1)?;
575 Ok((id, json))
576 })?;
577 let mut results = Vec::new();
578 for row in rows {
579 let (id, json) = row?;
580 let index: ScanIndex =
581 serde_json::from_str(&json).with_context(|| {
582 format!(
583 "Failed to deserialize scan data for package {}",
584 id
585 )
586 })?;
587 results.push((id, index));
588 }
589 Ok(results)
590 }
591
592 pub fn get_scan_index_by_name(
594 &self,
595 pkgname: &str,
596 ) -> Result<Option<ScanIndex>> {
597 let result = self.conn.query_row(
598 "SELECT scan_data FROM packages WHERE pkgname = ?1",
599 [pkgname],
600 |row| row.get::<_, String>(0),
601 );
602
603 match result {
604 Ok(json) => {
605 let index: ScanIndex = serde_json::from_str(&json)
606 .context("Failed to deserialize scan data")?;
607 Ok(Some(index))
608 }
609 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
610 Err(e) => Err(e.into()),
611 }
612 }
613
614 pub fn clear_scan(&self) -> Result<()> {
616 self.conn.execute("DELETE FROM packages", [])?;
617 self.clear_full_scan_complete()?;
618 self.clear_resolved_depends()?;
619 Ok(())
620 }
621
622 pub fn store_resolved_dependency(
628 &self,
629 package_id: i64,
630 depends_on_id: i64,
631 ) -> Result<()> {
632 self.conn.execute(
633 "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
634 params![package_id, depends_on_id],
635 )?;
636 Ok(())
637 }
638
639 pub fn store_resolved_dependencies_batch(
641 &self,
642 deps: &[(i64, i64)],
643 ) -> Result<()> {
644 self.conn.execute("BEGIN TRANSACTION", [])?;
645 let mut stmt = self.conn.prepare(
646 "INSERT OR IGNORE INTO resolved_depends (package_id, depends_on_id) VALUES (?1, ?2)",
647 )?;
648 for (package_id, depends_on_id) in deps {
649 stmt.execute(params![package_id, depends_on_id])?;
650 }
651 drop(stmt);
652 self.conn.execute("COMMIT", [])?;
653 Ok(())
654 }
655
656 pub fn get_dependencies(&self, package_id: i64) -> Result<Vec<i64>> {
658 let mut stmt = self.conn.prepare(
659 "SELECT depends_on_id FROM resolved_depends WHERE package_id = ?1",
660 )?;
661 let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
662 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
663 }
664
665 pub fn get_reverse_dependencies(
667 &self,
668 package_id: i64,
669 ) -> Result<Vec<i64>> {
670 let mut stmt = self.conn.prepare(
671 "SELECT package_id FROM resolved_depends WHERE depends_on_id = ?1",
672 )?;
673 let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
674 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
675 }
676
677 pub fn get_transitive_reverse_deps(
679 &self,
680 package_id: i64,
681 ) -> Result<Vec<i64>> {
682 let mut stmt = self.conn.prepare(
683 "WITH RECURSIVE affected(id) AS (
684 SELECT ?1
685 UNION
686 SELECT rd.package_id
687 FROM resolved_depends rd
688 JOIN affected a ON rd.depends_on_id = a.id
689 )
690 SELECT id FROM affected WHERE id != ?1",
691 )?;
692 let rows = stmt.query_map([package_id], |row| row.get::<_, i64>(0))?;
693 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
694 }
695
696 pub fn is_resolved(&self) -> Result<bool> {
698 let count: i64 = self.conn.query_row(
699 "SELECT COUNT(*) FROM resolved_depends",
700 [],
701 |row| row.get(0),
702 )?;
703 Ok(count > 0)
704 }
705
706 pub fn clear_resolved_depends(&self) -> Result<()> {
708 self.conn.execute("DELETE FROM resolved_depends", [])?;
709 Ok(())
710 }
711
712 pub fn get_raw_dependencies(
714 &self,
715 package_id: i64,
716 ) -> Result<Vec<(String, String)>> {
717 let mut stmt = self.conn.prepare(
718 "SELECT depend_pattern, depend_pkgpath FROM depends WHERE package_id = ?1"
719 )?;
720 let rows = stmt.query_map([package_id], |row| {
721 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
722 })?;
723 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
724 }
725
726 pub fn store_build_result(
732 &self,
733 package_id: i64,
734 result: &BuildResult,
735 ) -> Result<()> {
736 let (outcome, detail) = build_outcome_to_db(&result.outcome);
737 let duration_ms = result.duration.as_millis() as i64;
738 let now = std::time::SystemTime::now()
739 .duration_since(std::time::UNIX_EPOCH)?
740 .as_secs() as i64;
741 let log_dir = result.log_dir.as_ref().map(|p| p.display().to_string());
742
743 self.conn.execute(
744 "INSERT OR REPLACE INTO builds
745 (package_id, outcome, outcome_detail, duration_ms, built_at, log_dir)
746 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
747 params![package_id, outcome, detail, duration_ms, now, log_dir],
748 )?;
749
750 debug!(
751 package_id = package_id,
752 outcome = outcome,
753 "Stored build result"
754 );
755 Ok(())
756 }
757
758 pub fn store_build_by_name(&self, result: &BuildResult) -> Result<()> {
760 if let Some(pkg) = self.get_package_by_name(result.pkgname.pkgname())? {
761 self.store_build_result(pkg.id, result)
762 } else {
763 warn!(pkgname = %result.pkgname.pkgname(), "Package not found in database for build result");
764 Ok(())
765 }
766 }
767
768 pub fn store_build_batch(&self, results: &[BuildResult]) -> Result<()> {
770 self.conn.execute("BEGIN TRANSACTION", [])?;
771 for result in results {
772 if let Err(e) = self.store_build_by_name(result) {
773 let _ = self.conn.execute("ROLLBACK", []);
774 return Err(e);
775 }
776 }
777 self.conn.execute("COMMIT", [])?;
778 debug!(count = results.len(), "Stored build results batch");
779 Ok(())
780 }
781
782 pub fn get_build_result(
784 &self,
785 package_id: i64,
786 ) -> Result<Option<BuildResult>> {
787 let result = self.conn.query_row(
788 "SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
789 FROM builds b
790 JOIN packages p ON b.package_id = p.id
791 WHERE b.package_id = ?1",
792 [package_id],
793 |row| {
794 let pkgname: String = row.get(0)?;
795 let pkgpath: Option<String> = row.get(1)?;
796 let outcome: String = row.get(2)?;
797 let detail: Option<String> = row.get(3)?;
798 let duration_ms: i64 = row.get(4)?;
799 let log_dir: Option<String> = row.get(5)?;
800 Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
801 },
802 );
803
804 match result {
805 Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir)) => {
806 let build_outcome = db_outcome_to_build(&outcome, detail);
807 Ok(Some(BuildResult {
808 pkgname: PkgName::new(&pkgname),
809 pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
810 outcome: build_outcome,
811 duration: Duration::from_millis(duration_ms as u64),
812 log_dir: log_dir.map(std::path::PathBuf::from),
813 }))
814 }
815 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
816 Err(e) => Err(e.into()),
817 }
818 }
819
820 pub fn is_package_complete(&self, package_id: i64) -> Result<bool> {
822 let result = self.conn.query_row(
823 "SELECT outcome FROM builds WHERE package_id = ?1",
824 [package_id],
825 |row| row.get::<_, String>(0),
826 );
827
828 match result {
829 Ok(outcome) => Ok(outcome == "success" || outcome == "up_to_date"),
830 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
831 Err(e) => Err(e.into()),
832 }
833 }
834
835 pub fn is_package_failed(&self, package_id: i64) -> Result<bool> {
837 let result = self.conn.query_row(
838 "SELECT outcome FROM builds WHERE package_id = ?1",
839 [package_id],
840 |row| row.get::<_, String>(0),
841 );
842
843 match result {
844 Ok(outcome) => Ok(outcome != "success" && outcome != "up_to_date"),
845 Err(rusqlite::Error::QueryReturnedNoRows) => Ok(false),
846 Err(e) => Err(e.into()),
847 }
848 }
849
850 pub fn get_completed_package_ids(&self) -> Result<HashSet<i64>> {
852 let mut stmt = self.conn.prepare(
853 "SELECT package_id FROM builds WHERE outcome IN ('success', 'up_to_date')"
854 )?;
855 let rows = stmt.query_map([], |row| row.get::<_, i64>(0))?;
856 rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
857 }
858
859 pub fn get_failed_package_ids(&self) -> Result<HashSet<i64>> {
861 let mut stmt = self.conn.prepare(
862 "SELECT package_id FROM builds WHERE outcome NOT IN ('success', 'up_to_date')"
863 )?;
864 let rows = stmt.query_map([], |row| row.get::<_, i64>(0))?;
865 rows.collect::<Result<HashSet<_>, _>>().map_err(Into::into)
866 }
867
868 pub fn count_build(&self) -> Result<i64> {
870 self.conn
871 .query_row("SELECT COUNT(*) FROM builds", [], |row| row.get(0))
872 .context("Failed to count builds")
873 }
874
875 pub fn clear_build(&self) -> Result<()> {
877 self.conn.execute("DELETE FROM builds", [])?;
878 Ok(())
879 }
880
881 pub fn delete_build_by_name(&self, pkgname: &str) -> Result<bool> {
883 let rows = self.conn.execute(
884 "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgname = ?1)",
885 params![pkgname],
886 )?;
887 Ok(rows > 0)
888 }
889
890 pub fn delete_build_by_pkgpath(&self, pkgpath: &str) -> Result<usize> {
892 let rows = self.conn.execute(
893 "DELETE FROM builds WHERE package_id IN (SELECT id FROM packages WHERE pkgpath = ?1)",
894 params![pkgpath],
895 )?;
896 Ok(rows)
897 }
898
899 pub fn get_all_build_results(&self) -> Result<Vec<BuildResult>> {
901 let mut stmt = self.conn.prepare(
902 "SELECT p.pkgname, p.pkgpath, b.outcome, b.outcome_detail, b.duration_ms, b.log_dir
903 FROM builds b
904 JOIN packages p ON b.package_id = p.id
905 ORDER BY p.pkgname"
906 )?;
907
908 let rows = stmt.query_map([], |row| {
909 let pkgname: String = row.get(0)?;
910 let pkgpath: Option<String> = row.get(1)?;
911 let outcome: String = row.get(2)?;
912 let detail: Option<String> = row.get(3)?;
913 let duration_ms: i64 = row.get(4)?;
914 let log_dir: Option<String> = row.get(5)?;
915 Ok((pkgname, pkgpath, outcome, detail, duration_ms, log_dir))
916 })?;
917
918 let mut results = Vec::new();
919 for row in rows {
920 let (pkgname, pkgpath, outcome, detail, duration_ms, log_dir) =
921 row?;
922 let build_outcome = db_outcome_to_build(&outcome, detail);
923 results.push(BuildResult {
924 pkgname: PkgName::new(&pkgname),
925 pkgpath: pkgpath.and_then(|p| PkgPath::new(&p).ok()),
926 outcome: build_outcome,
927 duration: Duration::from_millis(duration_ms as u64),
928 log_dir: log_dir.map(std::path::PathBuf::from),
929 });
930 }
931
932 Ok(results)
933 }
934
935 pub fn count_breaks_for_failed(
938 &self,
939 ) -> Result<std::collections::HashMap<String, usize>> {
940 use std::collections::HashMap;
941
942 let mut counts: HashMap<String, usize> = HashMap::new();
943
944 let mut stmt = self.conn.prepare(
946 "SELECT p.id, p.pkgname FROM builds b
947 JOIN packages p ON b.package_id = p.id
948 WHERE b.outcome = 'failed'",
949 )?;
950
951 let failed: Vec<(i64, String)> = stmt
952 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
953 .filter_map(|r| r.ok())
954 .collect();
955
956 for (_pkg_id, pkgname) in failed {
958 let count: i64 = self.conn.query_row(
959 "SELECT COUNT(*) FROM builds b
960 JOIN packages p ON b.package_id = p.id
961 WHERE b.outcome = 'indirect_failed'
962 AND b.outcome_detail LIKE ?1",
963 params![format!("%{}", pkgname)],
964 |row| row.get(0),
965 )?;
966 counts.insert(pkgname, count as usize);
967 }
968
969 Ok(counts)
970 }
971
972 pub fn get_total_build_duration(&self) -> Result<Duration> {
974 let total_ms: i64 = self.conn.query_row(
975 "SELECT COALESCE(SUM(duration_ms), 0) FROM builds",
976 [],
977 |row| row.get(0),
978 )?;
979 Ok(Duration::from_millis(total_ms as u64))
980 }
981
982 pub fn get_prefailed_packages(
985 &self,
986 ) -> Result<Vec<(String, Option<String>, String)>> {
987 let mut stmt = self.conn.prepare(
988 "SELECT p.pkgname, p.pkgpath,
989 COALESCE(p.fail_reason, p.skip_reason) as reason
990 FROM packages p
991 WHERE (p.skip_reason IS NOT NULL OR p.fail_reason IS NOT NULL)
992 AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = p.id)
993 ORDER BY p.pkgname",
994 )?;
995
996 let rows = stmt
997 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
998
999 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1000 }
1001
1002 pub fn get_indirect_failures(
1007 &self,
1008 ) -> Result<Vec<(String, Option<String>, String)>> {
1009 let mut stmt = self.conn.prepare(
1016 "WITH RECURSIVE
1017 -- Only direct failures are root causes
1018 failed_pkgs(id) AS (
1019 SELECT package_id FROM builds
1020 WHERE outcome IN ('failed', 'prefailed')
1021 ),
1022 -- Packages affected by failures (transitive closure)
1023 affected(id, root_id) AS (
1024 SELECT id, id FROM failed_pkgs
1025 UNION
1026 SELECT rd.package_id, a.root_id
1027 FROM resolved_depends rd
1028 JOIN affected a ON rd.depends_on_id = a.id
1029 WHERE rd.package_id NOT IN (SELECT id FROM failed_pkgs)
1030 )
1031 SELECT p.pkgname, p.pkgpath, GROUP_CONCAT(DISTINCT fp.pkgname) as failed_deps
1032 FROM affected a
1033 JOIN packages p ON a.id = p.id
1034 JOIN packages fp ON a.root_id = fp.id
1035 WHERE a.id != a.root_id
1036 AND NOT EXISTS (SELECT 1 FROM builds b WHERE b.package_id = a.id)
1037 AND p.skip_reason IS NULL
1038 AND p.fail_reason IS NULL
1039 GROUP BY p.id, p.pkgname, p.pkgpath
1040 ORDER BY p.pkgname",
1041 )?;
1042
1043 let rows = stmt
1044 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?;
1045
1046 rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
1047 }
1048
1049 pub fn mark_failure_cascade(
1052 &self,
1053 package_id: i64,
1054 reason: &str,
1055 duration: Duration,
1056 ) -> Result<usize> {
1057 let now = std::time::SystemTime::now()
1058 .duration_since(std::time::UNIX_EPOCH)?
1059 .as_secs() as i64;
1060
1061 let pkgname = self.get_pkgname(package_id)?;
1062
1063 let mut stmt = self.conn.prepare(
1065 "WITH RECURSIVE affected(id, depth) AS (
1066 SELECT ?1, 0
1067 UNION
1068 SELECT rd.package_id, a.depth + 1
1069 FROM resolved_depends rd
1070 JOIN affected a ON rd.depends_on_id = a.id
1071 )
1072 SELECT id, depth FROM affected ORDER BY depth",
1073 )?;
1074
1075 let affected: Vec<(i64, i32)> = stmt
1076 .query_map([package_id], |row| {
1077 Ok((row.get::<_, i64>(0)?, row.get::<_, i32>(1)?))
1078 })?
1079 .filter_map(|r| r.ok())
1080 .collect();
1081
1082 self.conn.execute("BEGIN TRANSACTION", [])?;
1084
1085 for (id, depth) in &affected {
1086 let (outcome, detail, dur) = if *depth == 0 {
1087 ("failed", reason.to_string(), duration.as_millis() as i64)
1088 } else {
1089 ("indirect_failed", format!("depends on failed {}", pkgname), 0)
1090 };
1091
1092 self.conn.execute(
1093 "INSERT OR REPLACE INTO builds
1094 (package_id, outcome, outcome_detail, duration_ms, built_at)
1095 VALUES (?1, ?2, ?3, ?4, ?5)",
1096 params![id, outcome, detail, dur, now],
1097 )?;
1098 }
1099
1100 self.conn.execute("COMMIT", [])?;
1101
1102 debug!(
1103 package_id = package_id,
1104 affected_count = affected.len(),
1105 "Marked failure cascade"
1106 );
1107 Ok(affected.len())
1108 }
1109
1110 pub fn full_scan_complete(&self) -> bool {
1116 self.conn
1117 .query_row(
1118 "SELECT value FROM metadata WHERE key = 'full_scan_complete'",
1119 [],
1120 |row| row.get::<_, String>(0),
1121 )
1122 .map(|v| v == "true")
1123 .unwrap_or(false)
1124 }
1125
1126 pub fn set_full_scan_complete(&self) -> Result<()> {
1128 self.conn.execute(
1129 "INSERT OR REPLACE INTO metadata (key, value) VALUES ('full_scan_complete', 'true')",
1130 [],
1131 )?;
1132 Ok(())
1133 }
1134
1135 pub fn clear_full_scan_complete(&self) -> Result<()> {
1137 self.conn.execute(
1138 "DELETE FROM metadata WHERE key = 'full_scan_complete'",
1139 [],
1140 )?;
1141 Ok(())
1142 }
1143
1144 pub fn get_buildable_count(&self) -> Result<i64> {
1146 self.conn.query_row(
1147 "SELECT COUNT(*) FROM packages WHERE skip_reason IS NULL AND fail_reason IS NULL",
1148 [],
1149 |row| row.get(0),
1150 ).context("Failed to count buildable packages")
1151 }
1152
1153 pub fn compare_pkgpath_lists(
1159 &self,
1160 requested: &[&str],
1161 ) -> Result<(Vec<String>, Vec<String>, Vec<String>)> {
1162 let scanned = self.get_scanned_pkgpaths()?;
1163 let requested_set: HashSet<_> =
1164 requested.iter().map(|s| s.to_string()).collect();
1165
1166 let to_add: Vec<_> =
1167 requested_set.difference(&scanned).cloned().collect();
1168 let to_remove: Vec<_> =
1169 scanned.difference(&requested_set).cloned().collect();
1170 let unchanged: Vec<_> =
1171 scanned.intersection(&requested_set).cloned().collect();
1172
1173 Ok((to_add, to_remove, unchanged))
1174 }
1175
1176 pub fn delete_pkgpaths(&self, pkgpaths: &[&str]) -> Result<usize> {
1178 if pkgpaths.is_empty() {
1179 return Ok(0);
1180 }
1181
1182 let mut count = 0;
1183 for pkgpath in pkgpaths {
1184 count += self.conn.execute(
1185 "DELETE FROM packages WHERE pkgpath = ?1",
1186 [pkgpath],
1187 )?;
1188 }
1189 Ok(count)
1190 }
1191
1192 pub fn execute_raw(&self, sql: &str) -> Result<()> {
1194 let mut stmt = self.conn.prepare(sql)?;
1195 let column_count = stmt.column_count();
1196
1197 if column_count == 0 {
1198 let affected = stmt.execute([])?;
1200 if affected > 0 {
1201 println!("{} row(s) affected", affected);
1202 }
1203 } else {
1204 let column_names: Vec<String> =
1206 stmt.column_names().iter().map(|s| s.to_string()).collect();
1207
1208 let mut rows = stmt.query([])?;
1209 let mut first = true;
1210
1211 while let Some(row) = rows.next()? {
1212 if first {
1213 println!("{}", column_names.join("|"));
1214 first = false;
1215 }
1216
1217 let values: Vec<String> = (0..column_count)
1218 .map(|i| {
1219 row.get_ref(i)
1220 .map(|v| match v {
1221 rusqlite::types::ValueRef::Null => {
1222 String::new()
1223 }
1224 rusqlite::types::ValueRef::Integer(i) => {
1225 i.to_string()
1226 }
1227 rusqlite::types::ValueRef::Real(f) => {
1228 f.to_string()
1229 }
1230 rusqlite::types::ValueRef::Text(s) => {
1231 String::from_utf8_lossy(s).to_string()
1232 }
1233 rusqlite::types::ValueRef::Blob(b) => {
1234 format!("<blob:{} bytes>", b.len())
1235 }
1236 })
1237 .unwrap_or_default()
1238 })
1239 .collect();
1240 println!("{}", values.join("|"));
1241 }
1242 }
1243
1244 Ok(())
1245 }
1246}
1247
/// Splits a package name into `(base, version)` at the last `-` that is
/// immediately followed by an ASCII digit.
///
/// When no such hyphen exists the whole name is returned as the base and
/// the version is empty.
fn split_pkgname(pkgname: &str) -> (String, String) {
    let split_at = pkgname
        .as_bytes()
        .windows(2)
        .rposition(|pair| pair[0] == b'-' && pair[1].is_ascii_digit());

    match split_at {
        // `-` is ASCII, so both slice boundaries fall on char boundaries.
        Some(hyphen) => (
            pkgname[..hyphen].to_string(),
            pkgname[hyphen + 1..].to_string(),
        ),
        None => (pkgname.to_string(), String::new()),
    }
}
1267
1268fn build_outcome_to_db(
1270 outcome: &BuildOutcome,
1271) -> (&'static str, Option<String>) {
1272 match outcome {
1273 BuildOutcome::Success => ("success", None),
1274 BuildOutcome::UpToDate => ("up_to_date", None),
1275 BuildOutcome::Failed(s) => ("failed", Some(s.clone())),
1276 BuildOutcome::PreFailed(s) => ("pre_failed", Some(s.clone())),
1277 BuildOutcome::IndirectFailed(s) => ("indirect_failed", Some(s.clone())),
1278 BuildOutcome::IndirectPreFailed(s) => {
1279 ("indirect_pre_failed", Some(s.clone()))
1280 }
1281 }
1282}
1283
1284fn db_outcome_to_build(outcome: &str, detail: Option<String>) -> BuildOutcome {
1286 match outcome {
1287 "success" => BuildOutcome::Success,
1288 "up_to_date" => BuildOutcome::UpToDate,
1289 "failed" => BuildOutcome::Failed(detail.unwrap_or_default()),
1290 "pre_failed" => BuildOutcome::PreFailed(detail.unwrap_or_default()),
1291 "indirect_failed" => {
1292 BuildOutcome::IndirectFailed(detail.unwrap_or_default())
1293 }
1294 "indirect_pre_failed" => {
1295 BuildOutcome::IndirectPreFailed(detail.unwrap_or_default())
1296 }
1297 _ => BuildOutcome::Failed(format!("Unknown outcome: {}", outcome)),
1298 }
1299}