use anyhow::{bail, Result};
use rusqlite::Connection;
/// Schema version that [`initialize`] and a successful [`migrate`] store in
/// `PRAGMA user_version`.
pub(crate) const CURRENT_VERSION: u32 = 11;
/// Creates the current schema and records its version.
///
/// Runs the [`SCHEMA_CURRENT`] batch, then sets `PRAGMA user_version` to
/// [`CURRENT_VERSION`]. The version is only written when the batch succeeded;
/// a SQLite error from either call is returned to the caller.
pub(crate) fn initialize(conn: &Connection) -> Result<()> {
    conn.execute_batch(SCHEMA_CURRENT)
        .and_then(|()| conn.pragma_update(None, "user_version", CURRENT_VERSION))?;
    Ok(())
}
pub(crate) fn migrate(conn: &Connection) -> Result<()> {
let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
if version == CURRENT_VERSION {
return Ok(());
}
if version > CURRENT_VERSION {
bail!(
"state.db schema version {version} is newer than supported ({CURRENT_VERSION}); \
upgrade the ccd CLI"
);
}
if version == 0 {
conn.execute_batch(SCHEMA_CURRENT)?;
repair_zero_version_schema(conn)?;
conn.pragma_update(None, "user_version", CURRENT_VERSION)?;
return Ok(());
} else if version < 2 {
migrate_v1_to_v2(conn)?;
}
if version < 3 {
migrate_v2_to_v3(conn)?;
}
if version < 4 {
migrate_v3_to_v4(conn)?;
}
if version < 5 {
migrate_v4_to_v5(conn)?;
}
if version < 6 {
migrate_v5_to_v6(conn)?;
}
if version < 7 {
migrate_v6_to_v7(conn)?;
}
if version < 8 {
migrate_v7_to_v8(conn)?;
}
if version < 9 {
migrate_v8_to_v9(conn)?;
}
if version < 10 {
migrate_v9_to_v10(conn)?;
}
if version < 11 {
migrate_v10_to_v11(conn)?;
}
conn.pragma_update(None, "user_version", CURRENT_VERSION)?;
Ok(())
}
/// Patches tables that already existed in an older shape when a version-0
/// database was brought up via [`SCHEMA_CURRENT`].
///
/// `CREATE TABLE IF NOT EXISTS` leaves an existing table untouched, so each
/// probe below looks for a column or table and replays the matching migration
/// step depending on the result. The probes are evaluated one after another,
/// never up front, because an earlier replay changes what a later probe sees.
fn repair_zero_version_schema(conn: &Connection) -> Result<()> {
    let session_has_mode = table_has_column(conn, "session", "mode")?;
    if !session_has_mode {
        migrate_v3_to_v4(conn)?;
    }

    let telemetry_has_next_step_key = table_has_column(conn, "telemetry_cost", "next_step_key")?;
    if !telemetry_has_next_step_key {
        migrate_v4_to_v5(conn)?;
        migrate_v9_to_v10(conn)?;
    }

    let session_has_revision = table_has_column(conn, "session", "revision")?;
    if !session_has_revision {
        migrate_v5_to_v6(conn)?;
    }

    // Both surfaces need the column; the second probe is skipped as soon as
    // the first one already shows the migration is required.
    let surfaces_have_revision = table_has_column(conn, "handoff", "revision")?
        && table_has_column(conn, "execution_gates", "revision")?;
    if !surfaces_have_revision {
        migrate_v6_to_v7(conn)?;
    }

    let queue_present = table_exists(conn, "memory_op_queue")?;
    if !queue_present {
        migrate_v8_to_v9(conn)?;
    }

    let telemetry_has_focus_key = table_has_column(conn, "telemetry_cost", "focus_key")?;
    if telemetry_has_focus_key {
        migrate_v9_to_v10(conn)?;
    }

    let projection_has_session_id = table_has_column(conn, "projection_metadata", "session_id")?;
    if !projection_has_session_id {
        migrate_v10_to_v11(conn)?;
    }

    Ok(())
}
/// Reports whether `table` currently has a column named exactly `column`
/// (the comparison is case-sensitive).
///
/// Returns `Ok(false)` when the table does not exist, since
/// `pragma_table_info` yields no rows for an unknown table.
///
/// Both names are bound as parameters of the `pragma_table_info`
/// table-valued function instead of being spliced into the SQL text, so a
/// name is never parsed as SQL.
///
/// # Errors
///
/// Returns any SQLite error raised while preparing or running the query.
fn table_has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
    let mut stmt =
        conn.prepare("SELECT 1 FROM pragma_table_info(?1) WHERE name = ?2 LIMIT 1")?;
    Ok(stmt.exists([table, column])?)
}
/// Reports whether `sqlite_master` lists a table named `table`.
///
/// Only entries of type `'table'` are counted; the name is passed as a bound
/// parameter.
fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
    const LOOKUP_SQL: &str =
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?1";
    let matches = conn.query_row(LOOKUP_SQL, [table], |row| row.get::<_, i64>(0))?;
    Ok(matches > 0)
}
/// SQL batch describing the schema at [`CURRENT_VERSION`].
///
/// Every statement uses `IF NOT EXISTS`, so the batch creates whatever is
/// missing and leaves objects that already exist exactly as they are. It does
/// not alter an existing table that has an older column set.
const SCHEMA_CURRENT: &str = r#"
CREATE TABLE IF NOT EXISTS handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general' CHECK (mode IN ('general', 'research', 'implement')),
owner_kind TEXT CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker')),
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS session_activity (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
actor_id TEXT NOT NULL,
current_activity TEXT NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
session_revision INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE IF NOT EXISTS recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE IF NOT EXISTS projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT,
session_id TEXT
);
CREATE TABLE IF NOT EXISTS execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 2,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX IF NOT EXISTS telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
CREATE TABLE IF NOT EXISTS memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
"#;
fn migrate_v1_to_v2(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
DROP TABLE IF EXISTS handoff_v2;
CREATE TABLE handoff_v2 (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
INSERT INTO handoff_v2
(id, schema_version, title, immediate_actions, completed_state,
operational_guardrails, key_files, definition_of_done)
SELECT
id, schema_version, title, immediate_actions, completed_state,
operational_guardrails, key_files, definition_of_done
FROM handoff;
DROP TABLE handoff;
ALTER TABLE handoff_v2 RENAME TO handoff;
"#,
)?;
Ok(())
}
/// v2 → v3: creates the `execution_gates` table in its v3 shape (no
/// `revision` column) unless a table of that name already exists.
fn migrate_v2_to_v3(conn: &Connection) -> Result<()> {
    const CREATE_EXECUTION_GATES: &str = r#"
CREATE TABLE IF NOT EXISTS execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
"#;
    Ok(conn.execute_batch(CREATE_EXECUTION_GATES)?)
}
fn migrate_v3_to_v4(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
ALTER TABLE session
ADD COLUMN mode TEXT NOT NULL DEFAULT 'general'
CHECK (mode IN ('general', 'research', 'implement'));
"#,
)?;
Ok(())
}
fn migrate_v4_to_v5(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
focus_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX IF NOT EXISTS telemetry_cost_focus_key_idx
ON telemetry_cost (focus_key);
"#,
)?;
Ok(())
}
fn migrate_v5_to_v6(conn: &Connection) -> Result<()> {
conn.execute_batch(
r#"
ALTER TABLE session
ADD COLUMN owner_kind TEXT
CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker'));
ALTER TABLE session
ADD COLUMN owner_id TEXT;
ALTER TABLE session
ADD COLUMN supervisor_id TEXT;
ALTER TABLE session
ADD COLUMN lease_ttl_secs INTEGER;
ALTER TABLE session
ADD COLUMN last_heartbeat_at_epoch_s INTEGER;
ALTER TABLE session
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE session
SET owner_kind = 'interactive'
WHERE owner_kind IS NULL
AND session_id IS NOT NULL;
UPDATE session
SET owner_id = 'interactive'
WHERE owner_id IS NULL
AND session_id IS NOT NULL;
UPDATE session
SET revision = 1
WHERE session_id IS NOT NULL
AND revision = 0;
"#,
)?;
Ok(())
}
/// v6 → v7: gives `handoff` and `execution_gates` a `revision` column.
///
/// Each table is handled on its own and skipped when the column is already
/// there. For `handoff`, a row with any non-default content gets revision
/// `1`. For `execution_gates`, `schema_version` is raised to at least `2` and
/// a row with non-empty `gates` gets revision `1`.
fn migrate_v6_to_v7(conn: &Connection) -> Result<()> {
    const ADD_HANDOFF_REVISION: &str = r#"
ALTER TABLE handoff
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE handoff
SET revision = 1
WHERE revision = 0
AND (
title != ''
OR immediate_actions != '[]'
OR completed_state != '[]'
OR operational_guardrails != '[]'
OR key_files != '[]'
OR definition_of_done != '[]'
);
"#;
    const ADD_GATES_REVISION: &str = r#"
ALTER TABLE execution_gates
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE execution_gates
SET schema_version = CASE
WHEN schema_version < 2 THEN 2
ELSE schema_version
END,
revision = CASE
WHEN revision = 0 AND gates != '[]' THEN 1
ELSE revision
END;
"#;

    let upgrades = [
        ("handoff", ADD_HANDOFF_REVISION),
        ("execution_gates", ADD_GATES_REVISION),
    ];
    for &(table, sql) in &upgrades {
        let already_upgraded = table_has_column(conn, table, "revision")?;
        if !already_upgraded {
            conn.execute_batch(sql)?;
        }
    }
    Ok(())
}
/// v7 → v8: creates the `session_activity` table unless it already exists.
fn migrate_v7_to_v8(conn: &Connection) -> Result<()> {
    const CREATE_SESSION_ACTIVITY: &str = r#"
CREATE TABLE IF NOT EXISTS session_activity (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
actor_id TEXT NOT NULL,
current_activity TEXT NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
session_revision INTEGER NOT NULL
);
"#;
    Ok(conn.execute_batch(CREATE_SESSION_ACTIVITY)?)
}
/// v8 → v9: creates the `memory_op_queue` table unless it already exists.
fn migrate_v8_to_v9(conn: &Connection) -> Result<()> {
    const CREATE_MEMORY_OP_QUEUE: &str = r#"
CREATE TABLE IF NOT EXISTS memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
"#;
    Ok(conn.execute_batch(CREATE_MEMORY_OP_QUEUE)?)
}
/// v9 → v10: renames the `telemetry_cost` columns `focus_key` /
/// `focus_title` to `next_step_key` / `next_step_title` and swaps the index
/// over to the new key column.
///
/// A rename is only attempted while the old column still exists. The index
/// statements always run and are written with `IF EXISTS` / `IF NOT EXISTS`.
fn migrate_v9_to_v10(conn: &Connection) -> Result<()> {
    let renames = [
        ("focus_key", "next_step_key"),
        ("focus_title", "next_step_title"),
    ];
    for &(old_name, new_name) in &renames {
        if table_has_column(conn, "telemetry_cost", old_name)? {
            let rename_sql =
                format!("ALTER TABLE telemetry_cost RENAME COLUMN {old_name} TO {new_name};");
            conn.execute_batch(&rename_sql)?;
        }
    }

    const SWAP_INDEX: &str = r#"
DROP INDEX IF EXISTS telemetry_cost_focus_key_idx;
CREATE INDEX IF NOT EXISTS telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
"#;
    Ok(conn.execute_batch(SWAP_INDEX)?)
}
/// v10 → v11: adds a nullable `session_id` column to `projection_metadata`.
///
/// Does nothing when the column is already present.
fn migrate_v10_to_v11(conn: &Connection) -> Result<()> {
    let already_present = table_has_column(conn, "projection_metadata", "session_id")?;
    if already_present {
        return Ok(());
    }
    Ok(conn.execute_batch("ALTER TABLE projection_metadata ADD COLUMN session_id TEXT;")?)
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
/// A fresh database gets the current version stamp and every table, empty.
#[test]
fn initialize_creates_all_tables() {
    const EXPECTED_TABLES: [&str; 9] = [
        "handoff",
        "session",
        "session_activity",
        "escalation",
        "recovery",
        "projection_metadata",
        "execution_gates",
        "telemetry_cost",
        "memory_op_queue",
    ];

    let conn = Connection::open_in_memory().unwrap();
    initialize(&conn).unwrap();

    let stored_version = conn
        .pragma_query_value(None, "user_version", |row| row.get::<_, u32>(0))
        .unwrap();
    assert_eq!(stored_version, CURRENT_VERSION);

    for table in EXPECTED_TABLES {
        let count_sql = format!("SELECT COUNT(*) FROM {table}");
        let rows = conn
            .query_row(&count_sql, [], |row| row.get::<_, i64>(0))
            .unwrap();
        assert_eq!(rows, 0, "table {table} should exist and be empty");
    }
}
/// Migrating a database that is already current succeeds and keeps the
/// version stamp.
#[test]
fn migrate_is_idempotent() {
    let conn = Connection::open_in_memory().unwrap();
    initialize(&conn).unwrap();

    migrate(&conn).unwrap();

    let stored_version = conn
        .pragma_query_value(None, "user_version", |row| row.get::<_, u32>(0))
        .unwrap();
    assert_eq!(stored_version, CURRENT_VERSION);
}
/// A database stamped with a version above `CURRENT_VERSION` is refused with
/// the "newer than supported" message.
#[test]
fn migrate_rejects_future_version() {
    let conn = Connection::open_in_memory().unwrap();
    initialize(&conn).unwrap();
    let future_version = CURRENT_VERSION + 1;
    conn.pragma_update(None, "user_version", future_version)
        .unwrap();

    let message = match migrate(&conn) {
        Ok(()) => panic!("migrate accepted a schema version newer than CURRENT_VERSION"),
        Err(err) => err.to_string(),
    };
    assert!(message.contains("newer than supported"));
}
/// An empty database (version 0, no tables) ends up with the current version
/// stamp and every table of the current schema, all empty.
#[test]
fn migrate_recovers_version_zero_db() {
    let conn = Connection::open_in_memory().unwrap();
    migrate(&conn).unwrap();
    let version: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, CURRENT_VERSION);
    // Same list as `initialize_creates_all_tables`: recovery from version 0
    // must yield the complete schema, including `memory_op_queue`.
    for table in [
        "handoff",
        "session",
        "session_activity",
        "escalation",
        "recovery",
        "projection_metadata",
        "execution_gates",
        "telemetry_cost",
        "memory_op_queue",
    ] {
        let count: i64 = conn
            .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(count, 0, "table {table} should exist after v0 recovery");
    }
}
/// A version-0 database that already has a pre-v4 `session` table gets the
/// later columns added and its existing row backfilled.
#[test]
fn migrate_repairs_version_zero_partial_session_schema() {
    const PRE_V4_SESSION: &str = r#"
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(PRE_V4_SESSION).unwrap();
    conn.execute(
        "INSERT INTO session
(id, schema_version, started_at_epoch_s, last_started_at_epoch_s, start_count, session_id)
VALUES (1, 3, 10, 10, 1, 'ses_partial')",
        [],
    )
    .unwrap();

    migrate(&conn).unwrap();

    for column in ["mode", "owner_kind", "revision"] {
        assert!(table_has_column(&conn, "session", column).unwrap());
    }

    let (mode, owner_kind, revision): (String, String, u64) = conn
        .query_row(
            "SELECT mode, owner_kind, revision FROM session WHERE id = 1",
            [],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .unwrap();
    assert_eq!(mode, "general");
    assert_eq!(owner_kind, "interactive");
    assert_eq!(revision, 1);
}
/// Migrating from v1 reaches the current version and removes the v1-only
/// `current_system_state` column from `handoff`.
#[test]
fn migrate_v1_to_v2_drops_current_system_state_column() {
    const V1_SCHEMA: &str = r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
current_system_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V1_SCHEMA).unwrap();
    conn.pragma_update(None, "user_version", 1).unwrap();
    conn.execute(
        "INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state,
current_system_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Test', '[]', '[]', '[\"stale\"]', '[]', '[]', '[]')",
        [],
    )
    .unwrap();

    migrate(&conn).unwrap();

    let stored_version = conn
        .pragma_query_value(None, "user_version", |row| row.get::<_, u32>(0))
        .unwrap();
    assert_eq!(stored_version, CURRENT_VERSION);

    let still_has_stale_column =
        table_has_column(&conn, "handoff", "current_system_state").unwrap();
    assert!(!still_has_stale_column);
}
/// A stale, empty `handoff_v2` left next to the v1 `handoff` table does not
/// break the migration, and the `handoff` row survives it.
#[test]
fn migrate_v1_to_v2_recovers_from_leftover_temp_table() {
    const V1_SCHEMA_WITH_LEFTOVER: &str = r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
current_system_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE handoff_v2 (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V1_SCHEMA_WITH_LEFTOVER).unwrap();
    conn.pragma_update(None, "user_version", 1).unwrap();
    conn.execute(
        "INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state,
current_system_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Retry-safe', '[]', '[]', '[\"stale\"]', '[]', '[]', '[]')",
        [],
    )
    .unwrap();

    migrate(&conn).unwrap();

    let surviving_title = conn
        .query_row("SELECT title FROM handoff WHERE id = 1", [], |row| {
            row.get::<_, String>(0)
        })
        .unwrap();
    assert_eq!(surviving_title, "Retry-safe");
}
/// Migrating from v2 creates an empty `execution_gates` table.
#[test]
fn migrate_v2_to_v3_adds_execution_gates_table() {
    const V2_SCHEMA: &str = r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V2_SCHEMA).unwrap();
    conn.pragma_update(None, "user_version", 2).unwrap();

    migrate(&conn).unwrap();

    let gate_rows = conn
        .query_row("SELECT COUNT(*) FROM execution_gates", [], |row| {
            row.get::<_, i64>(0)
        })
        .unwrap();
    assert_eq!(gate_rows, 0);
}
/// Migrating from v3 gives the existing session row the default mode and the
/// later ownership/revision backfill.
#[test]
fn migrate_v3_to_v4_adds_session_mode_column() {
    const V3_SCHEMA: &str = r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V3_SCHEMA).unwrap();
    conn.pragma_update(None, "user_version", 3).unwrap();
    conn.execute(
        "INSERT INTO session
(id, schema_version, started_at_epoch_s, last_started_at_epoch_s, start_count, session_id)
VALUES (1, 3, 1000, 1000, 1, 'ses_V3')",
        [],
    )
    .unwrap();

    migrate(&conn).unwrap();

    let (mode, owner_kind, revision): (String, String, u64) = conn
        .query_row(
            "SELECT mode, owner_kind, revision FROM session WHERE id = 1",
            [],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .unwrap();
    assert_eq!(mode, "general");
    assert_eq!(owner_kind, "interactive");
    assert_eq!(revision, 1);
}
/// Migrating from v6 backfills the `handoff` / `execution_gates` revisions
/// for seeded rows and creates an empty `session_activity` table.
#[test]
fn migrate_v6_to_v8_adds_session_activity_table_without_disturbing_surface_revisions() {
    const V6_SCHEMA: &str = r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general' CHECK (mode IN ('general', 'research', 'implement')),
owner_kind TEXT CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker')),
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE telemetry_cost (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backlog_item_ref TEXT NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
observed_at_epoch_s INTEGER NOT NULL,
host TEXT,
model TEXT,
cost_usd REAL,
total_tokens INTEGER,
input_tokens INTEGER,
output_tokens INTEGER,
context_used_pct INTEGER,
context_window_tokens INTEGER,
compacted INTEGER
);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V6_SCHEMA).unwrap();
    conn.pragma_update(None, "user_version", 6).unwrap();
    conn.execute(
        "INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Seeded', '[\"one\"]', '[]', '[]', '[]', '[\"done\"]')",
        [],
    )
    .unwrap();
    conn.execute(
        "INSERT INTO execution_gates (id, schema_version, seeded_from, gates)
VALUES (1, 1, 'handoff:immediate_actions', '[{\"text\":\"gate\",\"status\":\"open\"}]')",
        [],
    )
    .unwrap();

    migrate(&conn).unwrap();

    let handoff_revision = conn
        .query_row("SELECT revision FROM handoff WHERE id = 1", [], |row| {
            row.get::<_, u64>(0)
        })
        .unwrap();
    assert_eq!(handoff_revision, 1);

    let (gate_schema_version, gate_revision): (u32, u64) = conn
        .query_row(
            "SELECT schema_version, revision FROM execution_gates WHERE id = 1",
            [],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .unwrap();
    assert_eq!(gate_schema_version, 2);
    assert_eq!(gate_revision, 1);

    let activity_rows = conn
        .query_row("SELECT COUNT(*) FROM session_activity", [], |row| {
            row.get::<_, i64>(0)
        })
        .unwrap();
    assert_eq!(activity_rows, 0);
}
/// The v9 → v10 step renames both `focus_*` columns and keeps the row data.
#[test]
fn migrate_v9_to_v10_renames_focus_columns() {
    const V9_TELEMETRY: &str = r#"
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
focus_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX telemetry_cost_focus_key_idx
ON telemetry_cost (focus_key);
INSERT INTO telemetry_cost
(session_id, recorded_at_epoch_s, focus_key, focus_title,
session_cost_usd)
VALUES
('ses_1', 1000, 'ccd:42', 'Widget refactor', 1.50);
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V9_TELEMETRY).unwrap();

    migrate_v9_to_v10(&conn).unwrap();

    let has_column = |column: &str| table_has_column(&conn, "telemetry_cost", column).unwrap();
    assert!(!has_column("focus_key"));
    assert!(!has_column("focus_title"));
    assert!(has_column("next_step_key"));
    assert!(has_column("next_step_title"));

    let stored_key = conn
        .query_row(
            "SELECT next_step_key FROM telemetry_cost WHERE session_id = 'ses_1'",
            [],
            |row| row.get::<_, String>(0),
        )
        .unwrap();
    assert_eq!(stored_key, "ccd:42");

    let stored_title = conn
        .query_row(
            "SELECT next_step_title FROM telemetry_cost WHERE session_id = 'ses_1'",
            [],
            |row| row.get::<_, String>(0),
        )
        .unwrap();
    assert_eq!(stored_title, "Widget refactor");
}
/// The v10 → v11 step adds a nullable `session_id` column: old rows read as
/// NULL, new rows can store a value, and running the step twice is fine.
#[test]
fn migrate_v10_to_v11_adds_session_id_column() {
    const V10_PROJECTION_METADATA: &str = r#"
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests, tool_surface_fingerprint)
VALUES
(1000, 'fp_1', NULL, 'tool_abc');
"#;

    let conn = Connection::open_in_memory().unwrap();
    conn.execute_batch(V10_PROJECTION_METADATA).unwrap();

    migrate_v10_to_v11(&conn).unwrap();
    assert!(table_has_column(&conn, "projection_metadata", "session_id").unwrap());

    let old_row_session = conn
        .query_row(
            "SELECT session_id FROM projection_metadata WHERE source_fingerprint = 'fp_1'",
            [],
            |row| row.get::<_, Option<String>>(0),
        )
        .unwrap();
    assert!(old_row_session.is_none());

    conn.execute(
        "INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id)
VALUES (2000, 'fp_2', NULL, NULL, 'ses_test')",
        [],
    )
    .unwrap();
    let new_row_session = conn
        .query_row(
            "SELECT session_id FROM projection_metadata WHERE source_fingerprint = 'fp_2'",
            [],
            |row| row.get::<_, Option<String>>(0),
        )
        .unwrap();
    assert_eq!(new_row_session.as_deref(), Some("ses_test"));

    // Second run must be a no-op rather than a duplicate-column error.
    migrate_v10_to_v11(&conn).unwrap();
}
}