import json
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Any
from bench.config import DB_PATH
# DDL script that BenchDB._ensure_schema runs through executescript() every
# time a BenchDB is constructed. Every statement uses IF NOT EXISTS, so
# re-running it against an already-initialised database is a no-op.
#
# CWE benchmark tables:
#   runs               - one row per benchmark run, keyed by a text run_id
#   cwe_scans          - one row per (run_id, cwe_dir_name)
#   violations         - individual findings attached to a cwe_scans row
#   cwe_metrics        - at most one metrics row per cwe_scans row (UNIQUE)
#   rule_cwe_breakdown - per-rule counters, unique per (cwe_scan_id, rule_id)
# Real-world scan tables:
#   realworld_runs       - one row per ingested scan, integer id
#   realworld_results    - unique per (run_id, project, tool)
#   realworld_violations - individual findings attached to a result row
# The trailing CREATE INDEX statements cover the foreign-key and filter
# columns used by the queries in BenchDB.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS runs (
run_id TEXT PRIMARY KEY,
sqc_version TEXT NOT NULL,
commit_sha TEXT NOT NULL,
mode TEXT NOT NULL DEFAULT 'fast',
status TEXT NOT NULL DEFAULT 'running',
started_at TEXT NOT NULL,
finished_at TEXT,
pid INTEGER,
jobs INTEGER,
total_cwes INTEGER,
hostname TEXT,
cpu_model TEXT,
cpu_cores INTEGER,
ram_gb REAL,
os_version TEXT
);
CREATE TABLE IF NOT EXISTS cwe_scans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL REFERENCES runs(run_id),
cwe_id TEXT NOT NULL,
cwe_dir_name TEXT NOT NULL,
file_count INTEGER DEFAULT 0,
violation_count INTEGER DEFAULT 0,
duration_s REAL,
status TEXT NOT NULL DEFAULT 'pending',
UNIQUE(run_id, cwe_dir_name)
);
CREATE TABLE IF NOT EXISTS violations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cwe_scan_id INTEGER NOT NULL REFERENCES cwe_scans(id),
rule_id TEXT NOT NULL,
file_path TEXT NOT NULL,
line INTEGER NOT NULL,
classification TEXT,
in_bad_section INTEGER DEFAULT 0,
in_good_section INTEGER DEFAULT 0,
hits_flaw_line INTEGER DEFAULT 0,
is_cwe_matched INTEGER DEFAULT 0
);
CREATE TABLE IF NOT EXISTS cwe_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cwe_scan_id INTEGER NOT NULL UNIQUE REFERENCES cwe_scans(id),
tp_count INTEGER DEFAULT 0,
fp_count INTEGER DEFAULT 0,
tp_rate_pct REAL DEFAULT 0,
flaw_lines_total INTEGER DEFAULT 0,
flaw_lines_detected INTEGER DEFAULT 0,
flaw_detection_rate_pct REAL DEFAULT 0,
cwe_matched_tp INTEGER DEFAULT 0,
cwe_matched_fp INTEGER DEFAULT 0,
noise_count INTEGER DEFAULT 0,
noise_ratio REAL DEFAULT 0,
per_file_detected INTEGER DEFAULT 0,
per_file_total INTEGER DEFAULT 0,
per_file_rate REAL DEFAULT 0,
flaw_hit_detected INTEGER DEFAULT 0,
flaw_hit_total INTEGER DEFAULT 0,
flaw_hit_rate REAL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS rule_cwe_breakdown (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cwe_scan_id INTEGER NOT NULL REFERENCES cwe_scans(id),
rule_id TEXT NOT NULL,
tp_count INTEGER DEFAULT 0,
fp_count INTEGER DEFAULT 0,
flaw_line_count INTEGER DEFAULT 0,
is_cwe_matched INTEGER DEFAULT 0,
UNIQUE(cwe_scan_id, rule_id)
);
CREATE TABLE IF NOT EXISTS realworld_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
sqc_version TEXT NOT NULL,
commit_sha TEXT,
scanned_at TEXT,
hostname TEXT,
cpu_model TEXT,
cpu_cores INTEGER,
notes TEXT
);
CREATE TABLE IF NOT EXISTS realworld_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id INTEGER NOT NULL REFERENCES realworld_runs(id),
project TEXT NOT NULL,
tool TEXT NOT NULL,
c_files INTEGER DEFAULT 0,
loc INTEGER DEFAULT 0,
violation_count INTEGER DEFAULT 0,
duration_s REAL,
UNIQUE(run_id, project, tool)
);
CREATE TABLE IF NOT EXISTS realworld_violations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
result_id INTEGER NOT NULL REFERENCES realworld_results(id),
rule_id TEXT NOT NULL,
file_path TEXT NOT NULL,
line INTEGER NOT NULL,
column_num INTEGER DEFAULT 0,
severity TEXT,
message TEXT,
suggestion TEXT
);
CREATE INDEX IF NOT EXISTS idx_violations_cwe_scan ON violations(cwe_scan_id);
CREATE INDEX IF NOT EXISTS idx_violations_rule ON violations(rule_id);
CREATE INDEX IF NOT EXISTS idx_violations_class ON violations(classification);
CREATE INDEX IF NOT EXISTS idx_cwe_scans_run ON cwe_scans(run_id);
CREATE INDEX IF NOT EXISTS idx_cwe_scans_cwe ON cwe_scans(cwe_id);
CREATE INDEX IF NOT EXISTS idx_runs_status ON runs(status);
CREATE INDEX IF NOT EXISTS idx_rw_results_run ON realworld_results(run_id);
CREATE INDEX IF NOT EXISTS idx_rw_results_project ON realworld_results(project);
CREATE INDEX IF NOT EXISTS idx_rw_violations_result ON realworld_violations(result_id);
CREATE INDEX IF NOT EXISTS idx_rw_violations_rule ON realworld_violations(rule_id);
"""
class BenchDB:
def __init__(self, db_path: Path | str | None = None):
self._db_path = str(db_path or DB_PATH)
self._ensure_schema()
def _connect(self) -> sqlite3.Connection:
conn = sqlite3.connect(self._db_path, timeout=30)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.row_factory = sqlite3.Row
return conn
@contextmanager
def _cursor(self):
conn = self._connect()
try:
yield conn.cursor()
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
def _ensure_schema(self):
conn = self._connect()
try:
conn.executescript(_SCHEMA)
conn.commit()
finally:
conn.close()
def create_run(self, run_id: str, sqc_version: str, commit_sha: str,
mode: str, started_at: str, pid: int, jobs: int,
total_cwes: int, machine: dict) -> None:
with self._cursor() as cur:
cur.execute("""
INSERT INTO runs (run_id, sqc_version, commit_sha, mode, status,
started_at, pid, jobs, total_cwes,
hostname, cpu_model, cpu_cores, ram_gb, os_version)
VALUES (?, ?, ?, ?, 'running', ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (run_id, sqc_version, commit_sha, mode, started_at,
pid, jobs, total_cwes,
machine.get("hostname"), machine.get("cpu_model"),
machine.get("cpu_cores"), machine.get("ram_gb"),
machine.get("os_version")))
def finish_run(self, run_id: str, status: str, finished_at: str) -> None:
with self._cursor() as cur:
cur.execute("""
UPDATE runs SET status = ?, finished_at = ? WHERE run_id = ?
""", (status, finished_at, run_id))
def update_run_status(self, run_id: str, status: str) -> None:
with self._cursor() as cur:
cur.execute("UPDATE runs SET status = ? WHERE run_id = ?",
(status, run_id))
def get_run(self, run_id: str) -> dict | None:
with self._cursor() as cur:
cur.execute("SELECT * FROM runs WHERE run_id = ?", (run_id,))
row = cur.fetchone()
return dict(row) if row else None
def list_runs(self) -> list[dict]:
with self._cursor() as cur:
cur.execute("SELECT * FROM runs ORDER BY started_at DESC")
return [dict(r) for r in cur.fetchall()]
def create_cwe_scan(self, run_id: str, cwe_id: str,
cwe_dir_name: str, file_count: int = 0) -> int:
with self._cursor() as cur:
cur.execute("""
INSERT INTO cwe_scans (run_id, cwe_id, cwe_dir_name, file_count, status)
VALUES (?, ?, ?, ?, 'pending')
""", (run_id, cwe_id, cwe_dir_name, file_count))
return cur.lastrowid
def update_cwe_scan(self, scan_id: int, **kwargs) -> None:
if not kwargs:
return
cols = ", ".join(f"{k} = ?" for k in kwargs)
vals = list(kwargs.values()) + [scan_id]
with self._cursor() as cur:
cur.execute(f"UPDATE cwe_scans SET {cols} WHERE id = ?", vals)
def get_cwe_scan(self, scan_id: int) -> dict | None:
with self._cursor() as cur:
cur.execute("SELECT * FROM cwe_scans WHERE id = ?", (scan_id,))
row = cur.fetchone()
return dict(row) if row else None
def get_completed_cwes(self, run_id: str) -> set[str]:
with self._cursor() as cur:
cur.execute("""
SELECT cwe_dir_name FROM cwe_scans
WHERE run_id = ? AND status = 'completed'
""", (run_id,))
return {row["cwe_dir_name"] for row in cur.fetchall()}
def insert_violations(self, violations: list[dict]) -> None:
if not violations:
return
with self._cursor() as cur:
cur.executemany("""
INSERT INTO violations
(cwe_scan_id, rule_id, file_path, line, classification,
in_bad_section, in_good_section, hits_flaw_line, is_cwe_matched)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [(v["cwe_scan_id"], v["rule_id"], v["file_path"], v["line"],
v["classification"], v["in_bad_section"], v["in_good_section"],
v["hits_flaw_line"], v["is_cwe_matched"])
for v in violations])
def insert_cwe_metrics(self, metrics: dict) -> None:
with self._cursor() as cur:
cur.execute("""
INSERT OR REPLACE INTO cwe_metrics
(cwe_scan_id, tp_count, fp_count, tp_rate_pct,
flaw_lines_total, flaw_lines_detected, flaw_detection_rate_pct,
cwe_matched_tp, cwe_matched_fp, noise_count, noise_ratio,
per_file_detected, per_file_total, per_file_rate,
flaw_hit_detected, flaw_hit_total, flaw_hit_rate)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (metrics["cwe_scan_id"],
metrics.get("tp_count", 0), metrics.get("fp_count", 0),
metrics.get("tp_rate_pct", 0),
metrics.get("flaw_lines_total", 0),
metrics.get("flaw_lines_detected", 0),
metrics.get("flaw_detection_rate_pct", 0),
metrics.get("cwe_matched_tp", 0),
metrics.get("cwe_matched_fp", 0),
metrics.get("noise_count", 0),
metrics.get("noise_ratio", 0),
metrics.get("per_file_detected", 0),
metrics.get("per_file_total", 0),
metrics.get("per_file_rate", 0),
metrics.get("flaw_hit_detected", 0),
metrics.get("flaw_hit_total", 0),
metrics.get("flaw_hit_rate", 0)))
def insert_rule_breakdown(self, rows: list[dict]) -> None:
if not rows:
return
with self._cursor() as cur:
cur.executemany("""
INSERT OR REPLACE INTO rule_cwe_breakdown
(cwe_scan_id, rule_id, tp_count, fp_count,
flaw_line_count, is_cwe_matched)
VALUES (?, ?, ?, ?, ?, ?)
""", [(r["cwe_scan_id"], r["rule_id"],
r.get("tp_count", 0), r.get("fp_count", 0),
r.get("flaw_line_count", 0), r.get("is_cwe_matched", 0))
for r in rows])
def get_progress(self, run_id: str) -> dict:
with self._cursor() as cur:
cur.execute("""
SELECT status, COUNT(*) as cnt
FROM cwe_scans WHERE run_id = ?
GROUP BY status
""", (run_id,))
status_counts = {row["status"]: row["cnt"] for row in cur.fetchall()}
cur.execute("SELECT * FROM runs WHERE run_id = ?", (run_id,))
run_row = cur.fetchone()
run = dict(run_row) if run_row else {}
cur.execute("""
SELECT cwe_dir_name, cwe_id, file_count, violation_count, duration_s
FROM cwe_scans
WHERE run_id = ? AND status = 'completed'
ORDER BY id DESC LIMIT 5
""", (run_id,))
recent = [dict(r) for r in cur.fetchall()]
total = sum(status_counts.values())
done = status_counts.get("completed", 0)
progress_pct = round(done / total * 100, 1) if total else 0
return {
"run": run,
"status_counts": status_counts,
"total_cwes": total,
"done_cwes": done,
"progress_pct": progress_pct,
"recently_completed": recent,
}
def get_run_summary(self, run_id: str) -> dict:
with self._cursor() as cur:
cur.execute("""
SELECT m.* FROM cwe_metrics m
JOIN cwe_scans s ON s.id = m.cwe_scan_id
WHERE s.run_id = ? AND s.cwe_id = 'ALL'
""", (run_id,))
agg_row = cur.fetchone()
cur.execute("""
SELECT COUNT(*) as cnt FROM cwe_scans
WHERE run_id = ? AND cwe_id != 'ALL'
""", (run_id,))
per_cwe_count = cur.fetchone()["cnt"]
if agg_row:
totals = {
"total_tp": agg_row["tp_count"],
"total_fp": agg_row["fp_count"],
"total_flaw_lines": agg_row["flaw_lines_total"],
"total_flaw_detected": agg_row["flaw_lines_detected"],
"total_cwe_matched_tp": agg_row["cwe_matched_tp"],
"total_cwe_matched_fp": agg_row["cwe_matched_fp"],
"total_noise": agg_row["noise_count"],
"total_per_file_detected": agg_row["per_file_detected"],
"total_per_file_total": agg_row["per_file_total"],
"total_flaw_hit_detected": agg_row["flaw_hit_detected"],
"total_flaw_hit_total": agg_row["flaw_hit_total"],
"cwes_analyzed": per_cwe_count or 1,
}
else:
cur.execute("""
SELECT
SUM(m.tp_count) as total_tp,
SUM(m.fp_count) as total_fp,
SUM(m.flaw_lines_total) as total_flaw_lines,
SUM(m.flaw_lines_detected) as total_flaw_detected,
SUM(m.cwe_matched_tp) as total_cwe_matched_tp,
SUM(m.cwe_matched_fp) as total_cwe_matched_fp,
SUM(m.noise_count) as total_noise,
SUM(m.per_file_detected) as total_per_file_detected,
SUM(m.per_file_total) as total_per_file_total,
SUM(m.flaw_hit_detected) as total_flaw_hit_detected,
SUM(m.flaw_hit_total) as total_flaw_hit_total,
COUNT(*) as cwes_analyzed
FROM cwe_metrics m
JOIN cwe_scans s ON s.id = m.cwe_scan_id
WHERE s.run_id = ?
""", (run_id,))
totals = dict(cur.fetchone())
cur.execute(f"""
SELECT
s.cwe_id, s.cwe_dir_name, s.file_count, s.duration_s,
m.tp_count, m.fp_count, m.tp_rate_pct,
m.flaw_lines_detected, m.flaw_lines_total,
m.flaw_detection_rate_pct,
m.cwe_matched_tp, m.cwe_matched_fp,
m.per_file_rate, m.flaw_hit_rate
FROM cwe_metrics m
JOIN cwe_scans s ON s.id = m.cwe_scan_id
WHERE s.run_id = ? AND s.cwe_id != 'ALL'
ORDER BY m.fp_count DESC
""", (run_id,))
per_cwe = [dict(r) for r in cur.fetchall()]
cur.execute("""
SELECT
rb.rule_id,
SUM(rb.tp_count) as tp,
SUM(rb.fp_count) as fp,
SUM(rb.tp_count) + SUM(rb.fp_count) as total
FROM rule_cwe_breakdown rb
JOIN cwe_scans s ON s.id = rb.cwe_scan_id
WHERE s.run_id = ? AND s.cwe_id != 'ALL'
GROUP BY rb.rule_id
ORDER BY fp DESC
LIMIT 20
""", (run_id,))
top_rules = [dict(r) for r in cur.fetchall()]
for r in top_rules:
r["fp_pct"] = round(r["fp"] / r["total"] * 100, 1) if r["total"] else 0
cur.execute("SELECT * FROM runs WHERE run_id = ?", (run_id,))
run_row = cur.fetchone()
run = dict(run_row) if run_row else {}
total_tp = totals["total_tp"] or 0
total_fp = totals["total_fp"] or 0
grand_total = total_tp + total_fp
summary = {
"total_violations": grand_total,
"total_tp": total_tp,
"total_fp": total_fp,
"tp_rate_pct": round(total_tp / grand_total * 100, 1) if grand_total else 0,
"fp_rate_pct": round(total_fp / grand_total * 100, 1) if grand_total else 0,
"cwes_analyzed": totals["cwes_analyzed"] or 0,
"run_name": run_id,
"version": run.get("sqc_version"),
"commit_sha": run.get("commit_sha"),
}
if run.get("started_at") and run.get("finished_at"):
summary["started_at"] = run["started_at"]
summary["finished_at"] = run["finished_at"]
cwe_matched_tp = totals["total_cwe_matched_tp"] or 0
cwe_matched_fp = totals["total_cwe_matched_fp"] or 0
cwe_matched_total = cwe_matched_tp + cwe_matched_fp
total_noise = totals["total_noise"] or 0
all_findings = cwe_matched_total + total_noise
per_file_detected = totals["total_per_file_detected"] or 0
per_file_total = totals["total_per_file_total"] or 0
flaw_hit_detected = totals["total_flaw_hit_detected"] or 0
flaw_hit_total = totals["total_flaw_hit_total"] or 0
cwe_aware = None
if cwe_matched_total > 0:
cwe_aware = {
"cwe_matched_tp": cwe_matched_tp,
"cwe_matched_fp": cwe_matched_fp,
"cwe_matched_total": cwe_matched_total,
"cwe_matched_tp_rate_pct": round(cwe_matched_tp / cwe_matched_total * 100, 1) if cwe_matched_total else 0,
"noise_total": total_noise,
"noise_ratio_pct": round(total_noise / all_findings * 100, 1) if all_findings else 0,
"per_file_detected": per_file_detected,
"per_file_total": per_file_total,
"per_file_rate_pct": round(per_file_detected / per_file_total * 100, 1) if per_file_total else 0,
"flaw_hit_detected": flaw_hit_detected,
"flaw_hit_total": flaw_hit_total,
"flaw_hit_rate_pct": round(flaw_hit_detected / flaw_hit_total * 100, 1) if flaw_hit_total else 0,
}
result = {
"summary": summary,
"top_rules": top_rules,
"per_cwe": per_cwe,
}
if cwe_aware:
result["cwe_aware"] = cwe_aware
return result
def get_cwe_detail(self, run_id: str, cwe_id: str) -> dict | None:
needle = cwe_id.upper()
if not needle.startswith("CWE-"):
if needle.startswith("CWE"):
needle = "CWE-" + needle[3:]
elif needle.isdigit():
needle = "CWE-" + needle
with self._cursor() as cur:
cur.execute("""
SELECT s.*, m.*
FROM cwe_scans s
LEFT JOIN cwe_metrics m ON m.cwe_scan_id = s.id
WHERE s.run_id = ? AND s.cwe_id = ?
""", (run_id, needle))
row = cur.fetchone()
if not row:
return None
data = dict(row)
scan_id = data["id"]
cur.execute("""
SELECT rule_id, tp_count, fp_count, flaw_line_count, is_cwe_matched
FROM rule_cwe_breakdown
WHERE cwe_scan_id = ?
ORDER BY fp_count DESC
""", (scan_id,))
rules = [dict(r) for r in cur.fetchall()]
tp = data.get("tp_count", 0) or 0
fp = data.get("fp_count", 0) or 0
total = tp + fp
detail = {
"cwe": data["cwe_dir_name"],
"cwe_id": data["cwe_id"],
"files_analyzed": data["file_count"],
"run_name": run_id,
"duration_s": data.get("duration_s"),
"summary": {
"total_violations": total,
"tp": tp,
"fp": fp,
"tp_rate_pct": round(tp / total * 100, 1) if total else 0,
"fp_rate_pct": round(fp / total * 100, 1) if total else 0,
"flaw_lines_detected": data.get("flaw_lines_detected", 0) or 0,
"flaw_lines_total": data.get("flaw_lines_total", 0) or 0,
"flaw_detection_rate_pct": data.get("flaw_detection_rate_pct", 0) or 0,
},
"top_tp_rules": [r for r in rules if r["tp_count"] > 0],
"top_fp_rules": [r for r in rules if r["fp_count"] > 0],
}
cwe_matched_tp = data.get("cwe_matched_tp", 0) or 0
cwe_matched_fp = data.get("cwe_matched_fp", 0) or 0
cwe_matched_total = cwe_matched_tp + cwe_matched_fp
if cwe_matched_total > 0:
detail["cwe_aware"] = {
"cwe_matched_tp": cwe_matched_tp,
"cwe_matched_fp": cwe_matched_fp,
"cwe_matched_total": cwe_matched_total,
"cwe_matched_tp_rate_pct": round(cwe_matched_tp / cwe_matched_total * 100, 1) if cwe_matched_total else 0,
"noise_count": data.get("noise_count", 0) or 0,
"noise_ratio_pct": data.get("noise_ratio", 0) or 0,
"per_file_detected": data.get("per_file_detected", 0) or 0,
"per_file_total": data.get("per_file_total", 0) or 0,
"per_file_rate_pct": data.get("per_file_rate", 0) or 0,
"flaw_hit_detected": data.get("flaw_hit_detected", 0) or 0,
"flaw_hit_total": data.get("flaw_hit_total", 0) or 0,
"flaw_hit_rate_pct": data.get("flaw_hit_rate", 0) or 0,
"cwe_matched_tp_rules": [r for r in rules if r["is_cwe_matched"] and r["tp_count"] > 0],
"cwe_matched_fp_rules": [r for r in rules if r["is_cwe_matched"] and r["fp_count"] > 0],
}
return detail
def compare_runs(self, base_id: str, target_id: str) -> dict:
base = self.get_run_summary(base_id)
target = self.get_run_summary(target_id)
if not base["summary"]["cwes_analyzed"] or not target["summary"]["cwes_analyzed"]:
return {"error": "One or both runs have no analyzed CWEs."}
bs, ts = base["summary"], target["summary"]
summary = {
"base_run": base_id,
"target_run": target_id,
"base": {
"tp": bs["total_tp"], "fp": bs["total_fp"],
"total": bs["total_violations"],
"tp_rate_pct": bs["tp_rate_pct"],
"cwes": bs["cwes_analyzed"],
},
"target": {
"tp": ts["total_tp"], "fp": ts["total_fp"],
"total": ts["total_violations"],
"tp_rate_pct": ts["tp_rate_pct"],
"cwes": ts["cwes_analyzed"],
},
"delta": {
"tp": ts["total_tp"] - bs["total_tp"],
"fp": ts["total_fp"] - bs["total_fp"],
"total": ts["total_violations"] - bs["total_violations"],
"tp_rate_pp": round(ts["tp_rate_pct"] - bs["tp_rate_pct"], 2),
},
}
base_cwes = {c["cwe_id"]: c for c in base["per_cwe"]}
target_cwes = {c["cwe_id"]: c for c in target["per_cwe"]}
all_cwe_ids = sorted(set(base_cwes) | set(target_cwes))
cwe_deltas = []
for cid in all_cwe_ids:
b = base_cwes.get(cid, {"tp_count": 0, "fp_count": 0, "tp_rate_pct": 0})
t = target_cwes.get(cid, {"tp_count": 0, "fp_count": 0, "tp_rate_pct": 0})
b_tp = b.get("tp_count", 0) or 0
b_fp = b.get("fp_count", 0) or 0
t_tp = t.get("tp_count", 0) or 0
t_fp = t.get("fp_count", 0) or 0
b_rate = b.get("tp_rate_pct", 0) or 0
t_rate = t.get("tp_rate_pct", 0) or 0
cwe_deltas.append({
"cwe_id": cid,
"base_tp": b_tp, "base_fp": b_fp,
"target_tp": t_tp, "target_fp": t_fp,
"delta_tp": t_tp - b_tp, "delta_fp": t_fp - b_fp,
"base_tp_pct": b_rate, "target_tp_pct": t_rate,
"delta_tp_rate_pp": round(t_rate - b_rate, 2),
})
cwe_deltas.sort(key=lambda x: x["delta_fp"])
improvements = [d for d in cwe_deltas if d["delta_fp"] < 0][:15]
regressions = sorted([d for d in cwe_deltas if d["delta_fp"] > 0],
key=lambda x: -x["delta_fp"])[:15]
base_rules = {r["rule_id"]: r for r in base["top_rules"]}
target_rules = {r["rule_id"]: r for r in target["top_rules"]}
all_rule_ids = sorted(set(base_rules) | set(target_rules))
rule_deltas = []
for rid in all_rule_ids:
b = base_rules.get(rid, {"tp": 0, "fp": 0})
t = target_rules.get(rid, {"tp": 0, "fp": 0})
rule_deltas.append({
"rule": rid,
"base_tp": b["tp"], "base_fp": b["fp"],
"target_tp": t["tp"], "target_fp": t["fp"],
"delta_tp": t["tp"] - b["tp"],
"delta_fp": t["fp"] - b["fp"],
})
rule_deltas.sort(key=lambda x: x["delta_fp"])
rule_improvements = [d for d in rule_deltas if d["delta_fp"] < 0][:10]
rule_regressions = sorted([d for d in rule_deltas if d["delta_fp"] > 0],
key=lambda x: -x["delta_fp"])[:10]
result = {
"summary": summary,
"cwe_improvements": improvements,
"cwe_regressions": regressions,
"rule_improvements": rule_improvements,
"rule_regressions": rule_regressions,
"all_cwe_deltas": cwe_deltas,
}
if base.get("cwe_aware") and target.get("cwe_aware"):
ba, ta = base["cwe_aware"], target["cwe_aware"]
result["cwe_aware"] = {
"base": ba,
"target": ta,
"delta": {
"cwe_matched_tp": ta["cwe_matched_tp"] - ba["cwe_matched_tp"],
"cwe_matched_fp": ta["cwe_matched_fp"] - ba["cwe_matched_fp"],
"cwe_matched_tp_rate_pp": round(
ta["cwe_matched_tp_rate_pct"] - ba["cwe_matched_tp_rate_pct"], 2),
"per_file_rate_pp": round(
ta["per_file_rate_pct"] - ba["per_file_rate_pct"], 2),
"flaw_hit_rate_pp": round(
ta["flaw_hit_rate_pct"] - ba["flaw_hit_rate_pct"], 2),
},
}
return result
def create_realworld_run(self, sqc_version: str, commit_sha: str = None,
scanned_at: str = None, hostname: str = None,
cpu_model: str = None, cpu_cores: int = None,
notes: str = None) -> int:
with self._cursor() as cur:
cur.execute("""
INSERT INTO realworld_runs
(sqc_version, commit_sha, scanned_at, hostname, cpu_model, cpu_cores, notes)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (sqc_version, commit_sha, scanned_at, hostname, cpu_model, cpu_cores, notes))
return cur.lastrowid
def insert_realworld_result(self, run_id: int, project: str, tool: str,
c_files: int = 0, loc: int = 0,
violation_count: int = 0,
duration_s: float = None) -> None:
with self._cursor() as cur:
cur.execute("""
INSERT OR REPLACE INTO realworld_results
(run_id, project, tool, c_files, loc, violation_count, duration_s)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (run_id, project, tool, c_files, loc, violation_count, duration_s))
def insert_realworld_violations(self, result_id: int,
violations: list[dict]) -> None:
if not violations:
return
with self._cursor() as cur:
cur.executemany("""
INSERT INTO realworld_violations
(result_id, rule_id, file_path, line, column_num,
severity, message, suggestion)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", [(result_id, v["rule_id"], v["file"], v["line"],
v.get("column", 0), v.get("severity"),
v.get("message"), v.get("suggestion"))
for v in violations])
def ingest_realworld_run(self, version_dir: str, results_path: str,
machine: dict = None,
durations: dict[str, float] = None) -> int:
import os
from datetime import datetime
parts = version_dir.split("-", 2)
version = parts[1] if len(parts) > 1 else version_dir
commit = parts[2] if len(parts) > 2 else None
machine = machine or {}
durations = durations or {}
run_id = self.create_realworld_run(
sqc_version=version,
commit_sha=commit,
scanned_at=datetime.now().isoformat(),
hostname=machine.get("hostname"),
cpu_model=machine.get("cpu_model"),
cpu_cores=machine.get("cpu_cores"),
notes=f"ingested from {version_dir}",
)
results_dir = Path(results_path)
for json_file in sorted(results_dir.glob("*.json")):
stem = json_file.stem
name_parts = stem.split("-")
if len(name_parts) >= 3:
project = name_parts[1]
else:
continue
violations = json.load(open(json_file))
violation_count = len(violations)
rule_counts = {}
for v in violations:
rid = v.get("rule_id", "unknown")
rule_counts[rid] = rule_counts.get(rid, 0) + 1
duration = durations.get(project)
result_id = None
with self._cursor() as cur:
cur.execute("""
INSERT OR REPLACE INTO realworld_results
(run_id, project, tool, c_files, loc, violation_count, duration_s)
VALUES (?, ?, 'sqc', 0, 0, ?, ?)
""", (run_id, project, violation_count, duration))
result_id = cur.lastrowid
if result_id and violations:
self.insert_realworld_violations(result_id, violations)
return run_id
def get_realworld_rule_summary(self, run_id: int, project: str = None) -> list[dict]:
with self._cursor() as cur:
if project:
cur.execute("""
SELECT rv.rule_id, COUNT(*) as count
FROM realworld_violations rv
JOIN realworld_results rr ON rr.id = rv.result_id
WHERE rr.run_id = ? AND rr.project = ?
GROUP BY rv.rule_id
ORDER BY count DESC
""", (run_id, project))
else:
cur.execute("""
SELECT rv.rule_id, COUNT(*) as count
FROM realworld_violations rv
JOIN realworld_results rr ON rr.id = rv.result_id
WHERE rr.run_id = ?
GROUP BY rv.rule_id
ORDER BY count DESC
""", (run_id,))
return [dict(r) for r in cur.fetchall()]
def compare_realworld_runs(self, base_run_id: int, target_run_id: int,
project: str = None) -> dict:
base_rules = {r["rule_id"]: r["count"]
for r in self.get_realworld_rule_summary(base_run_id, project)}
target_rules = {r["rule_id"]: r["count"]
for r in self.get_realworld_rule_summary(target_run_id, project)}
all_rules = sorted(set(base_rules) | set(target_rules))
deltas = []
for rid in all_rules:
b = base_rules.get(rid, 0)
t = target_rules.get(rid, 0)
if b != t:
deltas.append({"rule_id": rid, "base": b, "target": t, "delta": t - b})
deltas.sort(key=lambda x: x["delta"])
base_total = sum(base_rules.values())
target_total = sum(target_rules.values())
return {
"base_total": base_total,
"target_total": target_total,
"delta_total": target_total - base_total,
"rule_deltas": deltas,
}
def list_realworld_runs(self) -> list[dict]:
with self._cursor() as cur:
cur.execute("SELECT * FROM realworld_runs ORDER BY id DESC")
return [dict(r) for r in cur.fetchall()]
def get_realworld_results(self, run_id: int) -> list[dict]:
with self._cursor() as cur:
cur.execute("""
SELECT * FROM realworld_results
WHERE run_id = ? ORDER BY project, tool
""", (run_id,))
return [dict(r) for r in cur.fetchall()]
def get_realworld_project_history(self, project: str) -> list[dict]:
with self._cursor() as cur:
cur.execute("""
SELECT r.sqc_version, r.notes, rr.tool, rr.violation_count, rr.c_files, rr.loc
FROM realworld_results rr
JOIN realworld_runs r ON r.id = rr.run_id
WHERE rr.project = ?
ORDER BY r.sqc_version, rr.tool
""", (project,))
return [dict(r) for r in cur.fetchall()]
def resolve_realworld_run(self, identifier: str) -> int | None:
ident = identifier.strip()
with self._cursor() as cur:
if ident.lower() in ("latest", "current"):
cur.execute("SELECT id FROM realworld_runs ORDER BY id DESC LIMIT 1")
row = cur.fetchone()
return row["id"] if row else None
try:
int_id = int(ident)
cur.execute("SELECT id FROM realworld_runs WHERE id = ?", (int_id,))
row = cur.fetchone()
if row:
return row["id"]
except ValueError:
pass
cur.execute("""
SELECT id FROM realworld_runs
WHERE sqc_version = ?
ORDER BY id DESC LIMIT 1
""", (ident,))
row = cur.fetchone()
if row:
return row["id"]
cur.execute("""
SELECT id FROM realworld_runs
WHERE commit_sha = ?
ORDER BY id DESC LIMIT 1
""", (ident,))
row = cur.fetchone()
if row:
return row["id"]
cur.execute("""
SELECT id FROM realworld_runs
WHERE notes LIKE ?
ORDER BY id DESC LIMIT 1
""", (f"%{ident}%",))
row = cur.fetchone()
return row["id"] if row else None
def get_realworld_run(self, run_id: int) -> dict | None:
with self._cursor() as cur:
cur.execute("SELECT * FROM realworld_runs WHERE id = ?", (run_id,))
row = cur.fetchone()
return dict(row) if row else None
def get_realworld_rule_trend(self, rule_id: str,
project: str = None) -> list[dict]:
with self._cursor() as cur:
if project:
cur.execute("""
SELECT r.sqc_version, r.id as run_id, rr.project,
COUNT(*) as count
FROM realworld_violations rv
JOIN realworld_results rr ON rr.id = rv.result_id
JOIN realworld_runs r ON r.id = rr.run_id
WHERE rv.rule_id = ? AND rr.project = ?
GROUP BY r.id, rr.project
ORDER BY r.id
""", (rule_id, project))
else:
cur.execute("""
SELECT r.sqc_version, r.id as run_id, rr.project,
COUNT(*) as count
FROM realworld_violations rv
JOIN realworld_results rr ON rr.id = rv.result_id
JOIN realworld_runs r ON r.id = rr.run_id
WHERE rv.rule_id = ?
GROUP BY r.id, rr.project
ORDER BY r.id, rr.project
""", (rule_id,))
return [dict(r) for r in cur.fetchall()]
def get_realworld_run_summary(self, run_id: int) -> dict:
run = self.get_realworld_run(run_id)
if not run:
return {"error": f"Run {run_id} not found"}
results = self.get_realworld_results(run_id)
rule_summary = self.get_realworld_rule_summary(run_id)
per_project_rules = {}
with self._cursor() as cur:
for result in results:
if result["tool"] != "sqc":
continue
cur.execute("""
SELECT rv.rule_id, COUNT(*) as count
FROM realworld_violations rv
WHERE rv.result_id = ?
GROUP BY rv.rule_id
ORDER BY count DESC
""", (result["id"],))
per_project_rules[result["project"]] = [
dict(r) for r in cur.fetchall()
]
total_violations = sum(r["violation_count"] for r in results
if r["tool"] == "sqc")
return {
"run": run,
"total_violations": total_violations,
"projects": results,
"rule_summary": rule_summary,
"per_project_rules": per_project_rules,
}
def get_realworld_dashboard(self, run_id: int,
base_run_id: int | None = None,
top_n: int = 25) -> dict:
run = self.get_realworld_run(run_id)
if not run:
return {"error": f"Run {run_id} not found"}
results = self.get_realworld_results(run_id)
sqc_results = [r for r in results if r["tool"] == "sqc"]
rule_summary = self.get_realworld_rule_summary(run_id)
total_violations = sum(r["violation_count"] for r in sqc_results)
per_project = []
for r in sqc_results:
proj_rules = self.get_realworld_rule_summary(run_id, r["project"])
entry = {
"project": r["project"],
"violation_count": r["violation_count"],
}
if r.get("duration_s") is not None:
entry["duration_s"] = r["duration_s"]
entry["top_rules"] = proj_rules[:10]
per_project.append(entry)
top_rules = []
base_rules = {}
if base_run_id:
base_rules = {r["rule_id"]: r["count"]
for r in self.get_realworld_rule_summary(base_run_id)}
for r in rule_summary[:top_n]:
entry = {"rule_id": r["rule_id"], "count": r["count"]}
if base_rules:
base_count = base_rules.get(r["rule_id"], 0)
entry["base_count"] = base_count
entry["delta"] = r["count"] - base_count
top_rules.append(entry)
if base_rules:
target_rule_ids = {r["rule_id"] for r in rule_summary}
for rule_id, base_count in sorted(base_rules.items(),
key=lambda x: -x[1]):
if rule_id not in target_rule_ids and base_count > 0:
top_rules.append({
"rule_id": rule_id,
"count": 0,
"base_count": base_count,
"delta": -base_count,
})
dashboard = {
"run": run,
"total_violations": total_violations,
"top_rules": top_rules,
"per_project": per_project,
}
if base_run_id:
base_run = self.get_realworld_run(base_run_id)
base_total = sum(r["violation_count"]
for r in self.get_realworld_results(base_run_id)
if r["tool"] == "sqc")
dashboard["base_run"] = base_run
dashboard["base_total"] = base_total
dashboard["total_delta"] = total_violations - base_total
return dashboard
def resolve_run(self, identifier: str) -> str | None:
ident = identifier.strip()
if ident.lower() in ("latest", "current"):
with self._cursor() as cur:
cur.execute("SELECT run_id FROM runs ORDER BY started_at DESC LIMIT 1")
row = cur.fetchone()
return row["run_id"] if row else None
if self.get_run(ident):
return ident
with self._cursor() as cur:
cur.execute("SELECT run_id FROM runs WHERE commit_sha = ? ORDER BY started_at DESC LIMIT 1",
(ident,))
row = cur.fetchone()
if row:
return row["run_id"]
cur.execute("SELECT run_id FROM runs WHERE run_id LIKE ? ORDER BY started_at DESC LIMIT 1",
(f"%{ident}%",))
row = cur.fetchone()
return row["run_id"] if row else None