use rusqlite::{Connection, Result as SqlResult};
/// Schema version number exported by this module (currently `2`).
// NOTE(review): nothing in the visible code reads this constant. Presumably
// callers compare it with `schema_version` and persist it through
// `set_schema_version` (the `user_version` pragma) — confirm against callers.
pub const SCHEMA_VERSION: u32 = 2;
/// SQL batch that `open_state_db` runs on every freshly opened connection.
///
/// In order, the batch:
/// - sets three pragmas: `journal_mode = WAL`, `foreign_keys = ON`,
///   `synchronous = NORMAL`;
/// - creates the plain tables `machines`, `generations`, `resources`, `events`,
///   `run_logs`, `destroy_log`, `drift_findings` and `ingest_cursor`;
/// - creates the FTS5 virtual tables `resources_fts`, `run_logs_fts` and
///   `events_fts`;
/// - creates secondary indexes on `resources`, `events`, `run_logs`,
///   `drift_findings` and `destroy_log`.
///
/// Every `CREATE` statement uses `IF NOT EXISTS`.
///
/// `resources_fts` and `run_logs_fts` are declared with `content=` /
/// `content_rowid='id'`, i.e. they index rows of `resources` / `run_logs`.
/// `events_fts` is declared without a `content=` option, and its `error` and
/// `action` columns have no same-named column in `events`.
// NOTE(review): this batch defines no triggers that keep the `content=` FTS
// tables in sync with their source tables — presumably the index rows are
// written elsewhere; confirm.
const SCHEMA_SQL: &str = r#"
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
PRAGMA synchronous = NORMAL;
CREATE TABLE IF NOT EXISTS machines (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
hostname TEXT,
transport TEXT NOT NULL DEFAULT 'local',
ssh_host TEXT,
ssh_user TEXT,
ssh_port INTEGER DEFAULT 22,
first_seen TEXT NOT NULL,
last_seen TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'active'
);
CREATE TABLE IF NOT EXISTS generations (
id INTEGER PRIMARY KEY,
generation_num INTEGER NOT NULL UNIQUE,
run_id TEXT NOT NULL,
config_hash TEXT NOT NULL,
git_ref TEXT,
config_snapshot TEXT,
operator TEXT,
created_at TEXT NOT NULL,
parent_gen INTEGER REFERENCES generations(id),
action TEXT NOT NULL DEFAULT 'apply'
);
CREATE TABLE IF NOT EXISTS resources (
id INTEGER PRIMARY KEY,
resource_id TEXT NOT NULL,
machine_id INTEGER NOT NULL REFERENCES machines(id),
generation_id INTEGER NOT NULL REFERENCES generations(id),
resource_type TEXT NOT NULL,
status TEXT NOT NULL,
state_hash TEXT,
content_hash TEXT,
live_hash TEXT,
applied_at TEXT NOT NULL,
duration_secs REAL NOT NULL DEFAULT 0.0,
details_json TEXT NOT NULL DEFAULT '{}',
path TEXT,
packages TEXT,
content_preview TEXT,
reversibility TEXT NOT NULL DEFAULT 'reversible',
UNIQUE(resource_id, machine_id, generation_id)
);
CREATE VIRTUAL TABLE IF NOT EXISTS resources_fts USING fts5(
resource_id, resource_type, path, packages, content_preview,
content='resources',
content_rowid='id',
tokenize='porter unicode61 remove_diacritics 2'
);
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY,
run_id TEXT NOT NULL,
resource_id TEXT NOT NULL,
machine TEXT NOT NULL,
event_type TEXT NOT NULL,
timestamp TEXT NOT NULL,
duration_ms INTEGER,
exit_code INTEGER,
stdout_tail TEXT,
stderr_tail TEXT,
details TEXT
);
CREATE TABLE IF NOT EXISTS run_logs (
id INTEGER PRIMARY KEY,
run_id TEXT NOT NULL,
machine TEXT NOT NULL,
resource_id TEXT,
log_level TEXT NOT NULL DEFAULT 'info',
message TEXT NOT NULL,
timestamp TEXT NOT NULL
);
CREATE VIRTUAL TABLE IF NOT EXISTS run_logs_fts USING fts5(
message, resource_id, machine,
content='run_logs',
content_rowid='id'
);
CREATE TABLE IF NOT EXISTS destroy_log (
id INTEGER PRIMARY KEY,
machine_id INTEGER NOT NULL REFERENCES machines(id),
generation_id INTEGER NOT NULL REFERENCES generations(id),
resource_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
pre_destroy_hash TEXT,
pre_destroy_details TEXT,
destroyed_at TEXT NOT NULL,
success INTEGER NOT NULL DEFAULT 1,
error TEXT
);
CREATE TABLE IF NOT EXISTS drift_findings (
id INTEGER PRIMARY KEY,
machine_id INTEGER NOT NULL REFERENCES machines(id),
resource_id TEXT NOT NULL,
resource_type TEXT NOT NULL,
expected_hash TEXT NOT NULL,
actual_hash TEXT NOT NULL,
detail TEXT NOT NULL,
detected_at TEXT NOT NULL,
resolved_at TEXT,
resolved_by TEXT
);
CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
event_type, resource_id, error, action,
tokenize='porter unicode61 remove_diacritics 2'
);
CREATE TABLE IF NOT EXISTS ingest_cursor (
machine_id INTEGER PRIMARY KEY REFERENCES machines(id),
last_event_offset INTEGER NOT NULL DEFAULT 0,
last_lock_hash TEXT
);
CREATE INDEX IF NOT EXISTS idx_resources_machine ON resources(machine_id);
CREATE INDEX IF NOT EXISTS idx_resources_gen ON resources(generation_id);
CREATE INDEX IF NOT EXISTS idx_resources_type ON resources(resource_type);
CREATE INDEX IF NOT EXISTS idx_resources_status ON resources(status);
CREATE INDEX IF NOT EXISTS idx_resources_path ON resources(path);
CREATE INDEX IF NOT EXISTS idx_events_run ON events(run_id);
CREATE INDEX IF NOT EXISTS idx_events_resource ON events(resource_id);
CREATE INDEX IF NOT EXISTS idx_run_logs_run ON run_logs(run_id);
CREATE INDEX IF NOT EXISTS idx_drift_machine ON drift_findings(machine_id);
CREATE INDEX IF NOT EXISTS idx_drift_resolved ON drift_findings(resolved_at);
CREATE INDEX IF NOT EXISTS idx_destroy_machine ON destroy_log(machine_id);
CREATE INDEX IF NOT EXISTS idx_destroy_resource ON destroy_log(resource_id);
"#;
/// Opens the SQLite database at `path` and applies the schema batch to it.
///
/// The connection is obtained with `Connection::open`, after which
/// `SCHEMA_SQL` is executed in one `execute_batch` call. Every `CREATE` in that
/// batch uses `IF NOT EXISTS`.
///
/// # Errors
///
/// Returns a message prefixed with `sqlite open: ` when the connection cannot
/// be opened, or with `sqlite schema: ` when the schema batch fails.
pub fn open_state_db(path: &std::path::Path) -> Result<Connection, String> {
    let db = Connection::open(path).map_err(|err| format!("sqlite open: {err}"))?;

    // The connection is only handed back once the whole batch has succeeded.
    if let Err(err) = db.execute_batch(SCHEMA_SQL) {
        return Err(format!("sqlite schema: {err}"));
    }
    Ok(db)
}
/// Reads the database's `user_version` pragma as a `u32`.
///
/// # Errors
///
/// Propagates the rusqlite error when the pragma query fails or its value
/// cannot be converted to `u32`.
pub fn schema_version(conn: &Connection) -> SqlResult<u32> {
    // The pragma yields a single row; column 0 holds the version.
    let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
    Ok(version)
}
/// Writes `version` to the database's `user_version` pragma.
///
/// # Errors
///
/// Propagates the rusqlite error when the pragma update fails.
pub fn set_schema_version(conn: &Connection, version: u32) -> SqlResult<()> {
    conn.pragma_update(None, "user_version", version)?;
    Ok(())
}
/// Full-text search over `resources_fts`, joined back to `resources`.
///
/// `query` is trimmed first; a blank query returns an empty vector without
/// touching the database. Otherwise every `"` in the text is doubled and the
/// whole text is wrapped in double quotes before it is bound to `MATCH`, so the
/// user's input reaches FTS5 as one quoted string rather than as query syntax.
///
/// Rows are ordered by the FTS `rank` column and capped at `limit`.
///
/// # Errors
///
/// Returns a message prefixed with `prepare: `, `query: ` or `collect: `
/// depending on whether statement preparation, query start, or row decoding
/// failed.
pub fn fts5_search(conn: &Connection, query: &str, limit: u32) -> Result<Vec<FtsResult>, String> {
    const SEARCH_SQL: &str =
        "SELECT r.resource_id, r.resource_type, r.status, r.path, fts.rank \
         FROM resources_fts fts \
         JOIN resources r ON r.id = fts.rowid \
         WHERE resources_fts MATCH ?1 \
         ORDER BY fts.rank LIMIT ?2";

    let needle = query.trim();
    if needle.is_empty() {
        return Ok(Vec::new());
    }

    // Double embedded quotes, then quote the whole needle.
    let phrase = format!("\"{}\"", needle.replace('"', "\"\""));

    let mut statement = conn
        .prepare(SEARCH_SQL)
        .map_err(|err| format!("prepare: {err}"))?;
    let mapped = statement
        .query_map(rusqlite::params![phrase, limit], |row| {
            let resource_id: String = row.get(0)?;
            let resource_type: String = row.get(1)?;
            let status: String = row.get(2)?;
            let path: Option<String> = row.get(3)?;
            let rank: f64 = row.get(4)?;
            Ok(FtsResult {
                resource_id,
                resource_type,
                status,
                path,
                rank,
            })
        })
        .map_err(|err| format!("query: {err}"))?;

    // Stop at the first row that fails to decode.
    let mut hits = Vec::new();
    for hit in mapped {
        hits.push(hit.map_err(|err| format!("collect: {err}"))?);
    }
    Ok(hits)
}
/// Lists rows of the `resources` table ordered by `resource_id`, capped at
/// `limit`.
///
/// No full-text query is involved, so every returned [`FtsResult`] carries a
/// `rank` of `0.0`.
///
/// # Errors
///
/// Returns a message prefixed with `prepare: `, `query: ` or `collect: `
/// depending on whether statement preparation, query start, or row decoding
/// failed.
pub fn list_all_resources(conn: &Connection, limit: u32) -> Result<Vec<FtsResult>, String> {
    const LIST_SQL: &str = "SELECT resource_id, resource_type, status, path \
         FROM resources ORDER BY resource_id LIMIT ?1";

    let mut statement = conn
        .prepare(LIST_SQL)
        .map_err(|err| format!("prepare: {err}"))?;
    let mapped = statement
        .query_map(rusqlite::params![limit], |row| {
            let resource_id: String = row.get(0)?;
            let resource_type: String = row.get(1)?;
            let status: String = row.get(2)?;
            let path: Option<String> = row.get(3)?;
            Ok(FtsResult {
                resource_id,
                resource_type,
                status,
                path,
                rank: 0.0,
            })
        })
        .map_err(|err| format!("query: {err}"))?;

    // Stop at the first row that fails to decode.
    let mut listed = Vec::new();
    for entry in mapped {
        listed.push(entry.map_err(|err| format!("collect: {err}"))?);
    }
    Ok(listed)
}
/// One resource row as returned by `fts5_search` and `list_all_resources`.
///
/// Derives `PartialEq` so results can be compared directly (for example with
/// `assert_eq!`). `Eq` is not derived because `rank` is an `f64`.
#[derive(Debug, Clone, PartialEq)]
pub struct FtsResult {
    /// Value of the `resource_id` column of `resources`.
    pub resource_id: String,
    /// Value of the `resource_type` column of `resources`.
    pub resource_type: String,
    /// Value of the `status` column of `resources`.
    pub status: String,
    /// Value of the nullable `path` column of `resources`.
    pub path: Option<String>,
    /// FTS `rank` of the match when produced by `fts5_search` (results are
    /// sorted ascending by it); always `0.0` when produced by
    /// `list_all_resources`.
    pub rank: f64,
}