use std::path::Path;
use anyhow::{bail, Result};
use rusqlite::Connection;
/// Schema version stamped into SQLite's `user_version` pragma. Bump this when
/// the schema changes and add a matching `migrate_vN_to_vN+1` step below.
pub(crate) const CURRENT_VERSION: u32 = 16;
/// Name of the per-session cost telemetry table, shared by several migrations.
const TELEMETRY_COST_TABLE: &str = "telemetry_cost";
/// Name fragments of the legacy identity columns added in v4->v5 and dropped
/// again by the v14->v15 rebuild. Each entry is joined into a full column name
/// by `legacy_telemetry_column` — presumably stored split to keep the literal
/// column names out of plain-text searches (TODO confirm). Ordering matters:
/// `legacy_telemetry_identity_column_defs` gives the first two entries INTEGER
/// types and the rest TEXT.
const LEGACY_TELEMETRY_IDENTITY_COLUMNS: &[&[&str]] = &[
    &["ccd", "_id"],
    &["github", "_issue_number"],
    &["backlog", "_provider"],
    &["backlog", "_kind"],
    &["backlog", "_id"],
    &["backlog", "_url"],
];
/// Creates a fresh database: applies the full current schema and stamps the
/// version pragma. `_pods_root` is accepted for signature parity with
/// `migrate` but unused here.
pub(crate) fn initialize(conn: &Connection, _pods_root: Option<&Path>) -> Result<()> {
    conn.execute_batch(SCHEMA_CURRENT)
        .and_then(|()| conn.pragma_update(None, "user_version", CURRENT_VERSION))?;
    Ok(())
}
pub(crate) fn migrate(conn: &Connection, pods_root: Option<&Path>) -> Result<()> {
let version: u32 = conn.pragma_query_value(None, "user_version", |row| row.get(0))?;
if version == CURRENT_VERSION {
return Ok(());
}
if version > CURRENT_VERSION {
bail!(
"state.db schema version {version} is newer than supported ({CURRENT_VERSION}); \
upgrade the ccd CLI"
);
}
if version == 0 {
if let Some(pods_root) = pods_root {
check_pod_layer_collapse_gate_b(pods_root)?;
}
conn.execute_batch(SCHEMA_CURRENT)?;
repair_zero_version_schema(conn, pods_root)?;
conn.pragma_update(None, "user_version", CURRENT_VERSION)?;
return Ok(());
} else if version < 2 {
migrate_v1_to_v2(conn)?;
}
if version < 3 {
migrate_v2_to_v3(conn)?;
}
if version < 4 {
migrate_v3_to_v4(conn)?;
}
if version < 5 {
migrate_v4_to_v5(conn)?;
}
if version < 6 {
migrate_v5_to_v6(conn)?;
}
if version < 7 {
migrate_v6_to_v7(conn)?;
}
if version < 8 {
migrate_v7_to_v8(conn)?;
}
if version < 9 {
migrate_v8_to_v9(conn)?;
}
if version < 10 {
migrate_v9_to_v10(conn)?;
}
if version < 11 {
migrate_v10_to_v11(conn)?;
}
if version < 12 {
migrate_v11_to_v12(conn)?;
}
if version < 13 {
migrate_v12_to_v13(conn)?;
}
if version < 14 {
migrate_v13_to_v14(conn)?;
}
if version < 15 {
migrate_v14_to_v15(conn)?;
}
if version < 16 {
migrate_v15_to_v16_with_pods_root(conn, pods_root)?;
}
conn.pragma_update(None, "user_version", CURRENT_VERSION)?;
Ok(())
}
/// Repairs a database whose `user_version` was 0 but which already contains
/// (some of) the schema — e.g. one created before version stamping existed or
/// left by an interrupted migration. By the time this runs, `SCHEMA_CURRENT`
/// has already been applied with `IF NOT EXISTS`, so only the
/// ALTER/rename/rebuild steps the idempotent batch cannot express are
/// replayed, each keyed off a structural probe.
fn repair_zero_version_schema(conn: &Connection, pods_root: Option<&Path>) -> Result<()> {
    if !table_has_column(conn, "session", "mode")? {
        migrate_v3_to_v4(conn)?;
    }
    if !table_has_column(conn, TELEMETRY_COST_TABLE, "next_step_key")? {
        // v4->v5 creates the table with the legacy focus_* names; v9->v10
        // immediately renames them to next_step_*.
        migrate_v4_to_v5(conn)?;
        migrate_v9_to_v10(conn)?;
    }
    if !table_has_column(conn, "session", "revision")? {
        migrate_v5_to_v6(conn)?;
    }
    if !table_has_column(conn, "handoff", "revision")?
        || !table_has_column(conn, "execution_gates", "revision")?
    {
        migrate_v6_to_v7(conn)?;
    }
    if !table_exists(conn, "memory_op_queue")? {
        migrate_v8_to_v9(conn)?;
    }
    // Positive probe (unlike the others): a surviving legacy `focus_key`
    // column means the rename to next_step_* has not happened yet.
    if table_has_column(conn, TELEMETRY_COST_TABLE, "focus_key")? {
        migrate_v9_to_v10(conn)?;
    }
    if !table_has_column(conn, "projection_metadata", "session_id")? {
        migrate_v10_to_v11(conn)?;
    }
    if !table_exists(conn, "memory_evidence")? {
        migrate_v11_to_v12(conn)?;
    }
    if !table_exists(conn, "host_loop_events")? {
        migrate_v12_to_v13(conn)?;
    }
    if !table_exists(conn, "projection_cache_entries")?
        || !table_exists(conn, "projection_cache_events")?
        || !table_exists(conn, "work_stream_decay")?
    {
        migrate_v13_to_v14(conn)?;
    }
    if has_legacy_telemetry_identity(conn)? {
        migrate_v14_to_v15(conn)?;
    }
    if memory_evidence_check_allows_pod(conn)? {
        // Old table shape still allows scope='pod': run the full v16 rebuild
        // (which enforces both collapse gates itself).
        migrate_v15_to_v16_with_pods_root(conn, pods_root)?;
    } else if let Some(pods_root) = pods_root {
        // Table is already in the v16 shape; still enforce the filesystem gate.
        check_pod_layer_collapse_gate_b(pods_root)?;
    }
    Ok(())
}
/// Returns true when `ident` is safe to splice into SQL unquoted: non-empty,
/// first character `_` or ASCII letter, remainder `_` or ASCII alphanumerics.
/// Used to guard identifiers interpolated into PRAGMA statements.
fn is_safe_sql_identifier(ident: &str) -> bool {
    let Some(first) = ident.chars().next() else {
        return false;
    };
    if first != '_' && !first.is_ascii_alphabetic() {
        return false;
    }
    ident
        .chars()
        .skip(1)
        .all(|c| c == '_' || c.is_ascii_alphanumeric())
}
/// Reports whether `table` has a column named `column`, via
/// `PRAGMA table_info`. The table name is interpolated into the PRAGMA (it
/// cannot be bound as a parameter), so it is validated first.
fn table_has_column(conn: &Connection, table: &str, column: &str) -> Result<bool> {
    if !is_safe_sql_identifier(table) {
        bail!("invalid SQL identifier for table: {table:?}");
    }
    // Column 1 of table_info is the column name.
    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
    let names = stmt.query_map([], |row| row.get::<_, String>(1))?;
    for name in names {
        if name? == column {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Reports whether a table named `table` exists, by counting matching
/// `sqlite_master` rows. The name is bound as a parameter, so no identifier
/// validation is needed.
fn table_exists(conn: &Connection, table: &str) -> Result<bool> {
    let count = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = ?1",
        [table],
        |row| row.get::<_, i64>(0),
    )?;
    Ok(count > 0)
}
/// Joins stored name fragments into the actual legacy column name
/// (e.g. `["ccd", "_id"]` -> `"ccd_id"`).
fn legacy_telemetry_column(parts: &[&str]) -> String {
    let mut name = String::with_capacity(parts.iter().map(|p| p.len()).sum());
    for part in parts {
        name.push_str(part);
    }
    name
}
/// True when the telemetry table still carries any of the legacy identity
/// columns removed by the v14 -> v15 rebuild. Short-circuits on first match.
fn has_legacy_telemetry_identity(conn: &Connection) -> Result<bool> {
    for parts in LEGACY_TELEMETRY_IDENTITY_COLUMNS {
        let column = legacy_telemetry_column(parts);
        if table_has_column(conn, TELEMETRY_COST_TABLE, &column)? {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Renders the column definitions for the legacy telemetry identity columns,
/// spliced into the v4 -> v5 CREATE TABLE so a freshly-migrated table matches
/// what old builds created.
///
/// Previously this hand-indexed entries 0..=5 of
/// `LEGACY_TELEMETRY_IDENTITY_COLUMNS` in six separate `format!` calls, which
/// would silently desync (or panic on a shorter list) if the const changed.
/// The definitions are now derived from the list itself: the first
/// `INTEGER_COLUMN_COUNT` entries are integer counters, the rest TEXT —
/// identical output for the current list.
fn legacy_telemetry_identity_column_defs() -> String {
    const INTEGER_COLUMN_COUNT: usize = 2;
    LEGACY_TELEMETRY_IDENTITY_COLUMNS
        .iter()
        .enumerate()
        .map(|(index, parts)| {
            let sql_type = if index < INTEGER_COLUMN_COUNT {
                "INTEGER NOT NULL DEFAULT 0"
            } else {
                "TEXT"
            };
            format!(" {} {sql_type},", legacy_telemetry_column(parts))
        })
        .collect::<Vec<_>>()
        .join("\n")
}
/// Full schema at `CURRENT_VERSION` (16), applied wholesale by `initialize`
/// and by the `user_version == 0` recovery path in `migrate`. Every statement
/// uses `IF NOT EXISTS`, so the batch is idempotent and safe to run against a
/// partially-created database; column-level repairs are handled separately by
/// `repair_zero_version_schema`. Note the `memory_evidence.scope` CHECK no
/// longer accepts 'pod' (collapsed in v15 -> v16).
const SCHEMA_CURRENT: &str = r#"
CREATE TABLE IF NOT EXISTS handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general' CHECK (mode IN ('general', 'research', 'implement')),
owner_kind TEXT CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker')),
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS session_activity (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
actor_id TEXT NOT NULL,
current_activity TEXT NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
session_revision INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE IF NOT EXISTS recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE IF NOT EXISTS projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT,
session_id TEXT
);
CREATE TABLE IF NOT EXISTS execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 2,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX IF NOT EXISTS telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
CREATE TABLE IF NOT EXISTS host_loop_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
session_id TEXT,
host TEXT NOT NULL,
hook TEXT NOT NULL,
status TEXT NOT NULL,
session_boundary_action TEXT,
source_fingerprint TEXT,
normalized_payload_hash TEXT,
payload_chars INTEGER,
payload_estimated_tokens INTEGER,
host_total_context_chars INTEGER,
overhead_ratio REAL,
session_started_at_epoch_s INTEGER,
session_last_started_at_epoch_s INTEGER,
session_start_count INTEGER,
section_metrics_json TEXT
);
CREATE INDEX IF NOT EXISTS host_loop_events_observed_idx
ON host_loop_events (observed_at_epoch_s DESC);
CREATE INDEX IF NOT EXISTS host_loop_events_host_hook_idx
ON host_loop_events (host, hook, observed_at_epoch_s DESC);
CREATE TABLE IF NOT EXISTS projection_cache_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
target TEXT NOT NULL CHECK (target IN ('default', 'planning', 'session')),
format TEXT NOT NULL CHECK (format IN ('narrative', 'symbolic', 'bundle')),
source_fingerprint TEXT NOT NULL,
tool_surface_fingerprint TEXT NOT NULL DEFAULT '',
payload_json TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS projection_cache_entries_key_idx
ON projection_cache_entries (target, format, source_fingerprint, tool_surface_fingerprint);
CREATE INDEX IF NOT EXISTS projection_cache_entries_observed_idx
ON projection_cache_entries (observed_at_epoch_s DESC);
CREATE TABLE IF NOT EXISTS projection_cache_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
session_id TEXT,
target TEXT NOT NULL CHECK (target IN ('default', 'planning', 'session')),
format TEXT NOT NULL CHECK (format IN ('narrative', 'symbolic', 'bundle')),
source_fingerprint TEXT NOT NULL,
tool_surface_fingerprint TEXT NOT NULL DEFAULT '',
cache_status TEXT NOT NULL CHECK (cache_status IN ('hit', 'miss'))
);
CREATE INDEX IF NOT EXISTS projection_cache_events_target_format_idx
ON projection_cache_events (target, format, observed_at_epoch_s DESC);
CREATE INDEX IF NOT EXISTS projection_cache_events_session_idx
ON projection_cache_events (session_id, observed_at_epoch_s DESC);
CREATE TABLE IF NOT EXISTS work_stream_decay (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
consecutive_no_progress INTEGER NOT NULL DEFAULT 0,
last_outcome TEXT NOT NULL CHECK (last_outcome IN ('progress', 'no_progress', 'neutral')),
updated_at_epoch_s INTEGER NOT NULL,
last_progress_at_epoch_s INTEGER
);
CREATE TABLE IF NOT EXISTS memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
CREATE TABLE IF NOT EXISTS memory_evidence (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL CHECK (scope IN ('workspace', 'work_stream', 'project', 'profile', 'project_truth')),
entry_type TEXT NOT NULL CHECK (entry_type IN ('rule', 'constraint', 'heuristic', 'observation', 'attempt')),
source_kind TEXT NOT NULL CHECK (source_kind IN ('transcript', 'session', 'event_stream', 'hook_output', 'log', 'document')),
summary TEXT NOT NULL,
summary_digest TEXT NOT NULL,
observed_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
source_ref TEXT,
host TEXT,
host_hook TEXT,
host_session_id TEXT,
host_run_id TEXT,
host_task_id TEXT,
provider_name TEXT,
provider_ref TEXT,
extracted INTEGER NOT NULL DEFAULT 0,
extracted_candidate_id TEXT,
extracted_at_epoch_s INTEGER
);
CREATE INDEX IF NOT EXISTS memory_evidence_extracted_idx
ON memory_evidence (extracted, observed_at_epoch_s);
"#;
/// v1 -> v2: rebuild `handoff` via a temp-table copy.
///
/// NOTE(review): the copied column list matches the v2 shape; the rebuild
/// presumably shed some v1-only column or constraint — the v1 shape is not
/// visible in this file, so confirm against history. The leading
/// `DROP TABLE IF EXISTS handoff_v2` makes an interrupted run restartable.
fn migrate_v1_to_v2(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
DROP TABLE IF EXISTS handoff_v2;
CREATE TABLE handoff_v2 (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
INSERT INTO handoff_v2
(id, schema_version, title, immediate_actions, completed_state,
operational_guardrails, key_files, definition_of_done)
SELECT
id, schema_version, title, immediate_actions, completed_state,
operational_guardrails, key_files, definition_of_done
FROM handoff;
DROP TABLE handoff;
ALTER TABLE handoff_v2 RENAME TO handoff;
"#,
    )?;
    Ok(())
}
/// v2 -> v3: introduce the singleton `execution_gates` table (schema_version
/// defaults to 1 here; bumped to 2 by the v6 -> v7 step).
fn migrate_v2_to_v3(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
"#,
    )?;
    Ok(())
}
/// v3 -> v4: add `session.mode` (defaulting existing rows to 'general') with
/// a CHECK restricting it to the three known modes.
fn migrate_v3_to_v4(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
ALTER TABLE session
ADD COLUMN mode TEXT NOT NULL DEFAULT 'general'
CHECK (mode IN ('general', 'research', 'implement'));
"#,
    )?;
    Ok(())
}
/// v4 -> v5: introduce `telemetry_cost`, still in its legacy shape: the
/// `focus_*` columns (renamed by v9 -> v10) and the legacy identity columns
/// (dropped by v14 -> v15). The identity column DDL is spliced in from
/// `legacy_telemetry_identity_column_defs` so a freshly-created v5 table
/// matches what old builds wrote.
fn migrate_v4_to_v5(conn: &Connection) -> Result<()> {
    conn.execute_batch(&format!(
        r#"
CREATE TABLE IF NOT EXISTS telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
focus_title TEXT,
{legacy_identity_columns}
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX IF NOT EXISTS telemetry_cost_focus_key_idx
ON telemetry_cost (focus_key);
"#,
        legacy_identity_columns = legacy_telemetry_identity_column_defs()
    ))?;
    Ok(())
}
/// v5 -> v6: add ownership/lease columns and `revision` to `session`, then
/// backfill: a pre-existing session is assumed to belong to an interactive
/// owner and gets revision 1 (so it reads as "written at least once";
/// presumably consumed by revision-based concurrency checks elsewhere —
/// confirm against callers).
fn migrate_v5_to_v6(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
ALTER TABLE session
ADD COLUMN owner_kind TEXT
CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker'));
ALTER TABLE session
ADD COLUMN owner_id TEXT;
ALTER TABLE session
ADD COLUMN supervisor_id TEXT;
ALTER TABLE session
ADD COLUMN lease_ttl_secs INTEGER;
ALTER TABLE session
ADD COLUMN last_heartbeat_at_epoch_s INTEGER;
ALTER TABLE session
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE session
SET owner_kind = 'interactive'
WHERE owner_kind IS NULL
AND session_id IS NOT NULL;
UPDATE session
SET owner_id = 'interactive'
WHERE owner_id IS NULL
AND session_id IS NOT NULL;
UPDATE session
SET revision = 1
WHERE session_id IS NOT NULL
AND revision = 0;
"#,
    )?;
    Ok(())
}
/// v6 -> v7: add `revision` columns to `handoff` and `execution_gates`.
///
/// Each ALTER is guarded by a column-presence probe so the step can also be
/// replayed safely from `repair_zero_version_schema`. Rows already holding
/// non-default content are backfilled to revision 1, and
/// `execution_gates.schema_version` is raised to at least 2.
fn migrate_v6_to_v7(conn: &Connection) -> Result<()> {
    if !table_has_column(conn, "handoff", "revision")? {
        conn.execute_batch(
            r#"
ALTER TABLE handoff
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE handoff
SET revision = 1
WHERE revision = 0
AND (
title != ''
OR immediate_actions != '[]'
OR completed_state != '[]'
OR operational_guardrails != '[]'
OR key_files != '[]'
OR definition_of_done != '[]'
);
"#,
        )?;
    }
    if !table_has_column(conn, "execution_gates", "revision")? {
        conn.execute_batch(
            r#"
ALTER TABLE execution_gates
ADD COLUMN revision INTEGER NOT NULL DEFAULT 0;
UPDATE execution_gates
SET schema_version = CASE
WHEN schema_version < 2 THEN 2
ELSE schema_version
END,
revision = CASE
WHEN revision = 0 AND gates != '[]' THEN 1
ELSE revision
END;
"#,
        )?;
    }
    Ok(())
}
/// v7 -> v8: introduce the singleton `session_activity` table.
fn migrate_v7_to_v8(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS session_activity (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
actor_id TEXT NOT NULL,
current_activity TEXT NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
session_revision INTEGER NOT NULL
);
"#,
    )?;
    Ok(())
}
/// v8 -> v9: introduce the `memory_op_queue` staging table.
fn migrate_v8_to_v9(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
"#,
    )?;
    Ok(())
}
/// v9 -> v10: rename the telemetry `focus_*` columns to `next_step_*` and
/// swap the index accordingly. Each rename is probe-guarded so the step is
/// idempotent (it is also invoked twice from `repair_zero_version_schema`).
fn migrate_v9_to_v10(conn: &Connection) -> Result<()> {
    if table_has_column(conn, "telemetry_cost", "focus_key")? {
        conn.execute_batch("ALTER TABLE telemetry_cost RENAME COLUMN focus_key TO next_step_key;")?;
    }
    if table_has_column(conn, "telemetry_cost", "focus_title")? {
        conn.execute_batch(
            "ALTER TABLE telemetry_cost RENAME COLUMN focus_title TO next_step_title;",
        )?;
    }
    // Index recreation is unconditional: both statements are no-ops when the
    // rename already happened.
    conn.execute_batch(
        r#"
DROP INDEX IF EXISTS telemetry_cost_focus_key_idx;
CREATE INDEX IF NOT EXISTS telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
"#,
    )?;
    Ok(())
}
/// v10 -> v11: attach a nullable `session_id` to `projection_metadata`.
/// Probe-guarded so it is safe to replay from the v0 repair path.
fn migrate_v10_to_v11(conn: &Connection) -> Result<()> {
    if table_has_column(conn, "projection_metadata", "session_id")? {
        return Ok(());
    }
    conn.execute_batch("ALTER TABLE projection_metadata ADD COLUMN session_id TEXT;")?;
    Ok(())
}
/// v11 -> v12: introduce `memory_evidence`. Note the scope CHECK here still
/// accepts 'pod' — that value is retired by the v15 -> v16 rebuild, and
/// `SCHEMA_CURRENT` creates the table without it.
fn migrate_v11_to_v12(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS memory_evidence (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL CHECK (scope IN ('workspace', 'work_stream', 'project', 'profile', 'pod', 'project_truth')),
entry_type TEXT NOT NULL CHECK (entry_type IN ('rule', 'constraint', 'heuristic', 'observation', 'attempt')),
source_kind TEXT NOT NULL CHECK (source_kind IN ('transcript', 'session', 'event_stream', 'hook_output', 'log', 'document')),
summary TEXT NOT NULL,
summary_digest TEXT NOT NULL,
observed_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
source_ref TEXT,
host TEXT,
host_hook TEXT,
host_session_id TEXT,
host_run_id TEXT,
host_task_id TEXT,
provider_name TEXT,
provider_ref TEXT,
extracted INTEGER NOT NULL DEFAULT 0,
extracted_candidate_id TEXT,
extracted_at_epoch_s INTEGER
);
CREATE INDEX IF NOT EXISTS memory_evidence_extracted_idx
ON memory_evidence (extracted, observed_at_epoch_s);
"#,
    )?;
    Ok(())
}
/// v12 -> v13: introduce `host_loop_events` plus its two query indexes
/// (recent-first by time, and by host/hook).
fn migrate_v12_to_v13(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS host_loop_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
session_id TEXT,
host TEXT NOT NULL,
hook TEXT NOT NULL,
status TEXT NOT NULL,
session_boundary_action TEXT,
source_fingerprint TEXT,
normalized_payload_hash TEXT,
payload_chars INTEGER,
payload_estimated_tokens INTEGER,
host_total_context_chars INTEGER,
overhead_ratio REAL,
session_started_at_epoch_s INTEGER,
session_last_started_at_epoch_s INTEGER,
session_start_count INTEGER,
section_metrics_json TEXT
);
CREATE INDEX IF NOT EXISTS host_loop_events_observed_idx
ON host_loop_events (observed_at_epoch_s DESC);
CREATE INDEX IF NOT EXISTS host_loop_events_host_hook_idx
ON host_loop_events (host, hook, observed_at_epoch_s DESC);
"#,
    )?;
    Ok(())
}
/// v13 -> v14: introduce the projection cache tables (`projection_cache_entries`
/// keyed uniquely by target/format/fingerprints, plus the hit/miss event log)
/// and the singleton `work_stream_decay` tracker.
fn migrate_v13_to_v14(conn: &Connection) -> Result<()> {
    conn.execute_batch(
        r#"
CREATE TABLE IF NOT EXISTS projection_cache_entries (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
target TEXT NOT NULL CHECK (target IN ('default', 'planning', 'session')),
format TEXT NOT NULL CHECK (format IN ('narrative', 'symbolic', 'bundle')),
source_fingerprint TEXT NOT NULL,
tool_surface_fingerprint TEXT NOT NULL DEFAULT '',
payload_json TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS projection_cache_entries_key_idx
ON projection_cache_entries (target, format, source_fingerprint, tool_surface_fingerprint);
CREATE INDEX IF NOT EXISTS projection_cache_entries_observed_idx
ON projection_cache_entries (observed_at_epoch_s DESC);
CREATE TABLE IF NOT EXISTS projection_cache_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
session_id TEXT,
target TEXT NOT NULL CHECK (target IN ('default', 'planning', 'session')),
format TEXT NOT NULL CHECK (format IN ('narrative', 'symbolic', 'bundle')),
source_fingerprint TEXT NOT NULL,
tool_surface_fingerprint TEXT NOT NULL DEFAULT '',
cache_status TEXT NOT NULL CHECK (cache_status IN ('hit', 'miss'))
);
CREATE INDEX IF NOT EXISTS projection_cache_events_target_format_idx
ON projection_cache_events (target, format, observed_at_epoch_s DESC);
CREATE INDEX IF NOT EXISTS projection_cache_events_session_idx
ON projection_cache_events (session_id, observed_at_epoch_s DESC);
CREATE TABLE IF NOT EXISTS work_stream_decay (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
consecutive_no_progress INTEGER NOT NULL DEFAULT 0,
last_outcome TEXT NOT NULL CHECK (last_outcome IN ('progress', 'no_progress', 'neutral')),
updated_at_epoch_s INTEGER NOT NULL,
last_progress_at_epoch_s INTEGER
);
"#,
    )?;
    Ok(())
}
/// v14 -> v15: drop the legacy identity columns from `telemetry_cost` by
/// rebuilding the table (rename aside, copy the kept columns, drop the old
/// copy, recreate the index). NOTE(review): a rebuild is presumably used
/// instead of `ALTER TABLE ... DROP COLUMN` for compatibility with older
/// SQLite — confirm the minimum supported SQLite version.
///
/// No-ops when the table is missing or already clean, so the v0 repair path
/// can call it unconditionally.
fn migrate_v14_to_v15(conn: &Connection) -> Result<()> {
    if !table_exists(conn, TELEMETRY_COST_TABLE)? {
        return Ok(());
    }
    if !has_legacy_telemetry_identity(conn)? {
        return Ok(());
    }
    conn.execute_batch(
        r#"
DROP INDEX IF EXISTS telemetry_cost_next_step_key_idx;
ALTER TABLE telemetry_cost RENAME TO telemetry_cost_v14;
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
INSERT INTO telemetry_cost
(session_id, recorded_at_epoch_s, next_step_key, next_step_title,
model, session_cost_usd, input_tokens, output_tokens,
cache_creation_input_tokens, cache_read_input_tokens, blended_total_tokens)
SELECT
session_id, recorded_at_epoch_s, next_step_key, next_step_title,
model, session_cost_usd, input_tokens, output_tokens,
cache_creation_input_tokens, cache_read_input_tokens, blended_total_tokens
FROM telemetry_cost_v14;
DROP TABLE telemetry_cost_v14;
CREATE INDEX IF NOT EXISTS telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
"#,
    )?;
    Ok(())
}
/// v15 -> v16: collapse the pod layer — rebuild `memory_evidence` with a
/// scope CHECK that no longer accepts 'pod'.
///
/// Refuses to run (leaving the prior schema untouched) when:
/// - a `memory_evidence_v15` table is left over from an interrupted run,
/// - any evidence row still has scope='pod' (gate A), or
/// - `pods_root` still contains unmigrated pod directories (gate B).
///
/// The rebuild runs inside an explicit BEGIN/COMMIT with a best-effort
/// ROLLBACK on failure. NOTE(review): the manual transaction statements are
/// presumably used because rusqlite's `Connection::transaction` requires
/// `&mut Connection` — confirm.
fn migrate_v15_to_v16_with_pods_root(conn: &Connection, pods_root: Option<&Path>) -> Result<()> {
    let has_current = table_exists(conn, "memory_evidence")?;
    let has_leftover = table_exists(conn, "memory_evidence_v15")?;
    if has_leftover {
        bail!(
            "state.db schema bump to v16 refused: leftover `memory_evidence_v15` table detected \
            from an interrupted earlier migration. Restore the state.db from backup or drop \
            the leftover table manually before retrying; the prior schema is untouched."
        );
    }
    if !has_current {
        // Nothing to rebuild; the table will be created in its v16 shape.
        return Ok(());
    }
    check_pod_layer_collapse_gate_a(conn)?;
    if let Some(pods_root) = pods_root {
        check_pod_layer_collapse_gate_b(pods_root)?;
    }
    conn.execute_batch("BEGIN")?;
    let ddl = conn.execute_batch(
        r#"
DROP INDEX IF EXISTS memory_evidence_extracted_idx;
ALTER TABLE memory_evidence RENAME TO memory_evidence_v15;
CREATE TABLE memory_evidence (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL CHECK (scope IN ('workspace', 'work_stream', 'project', 'profile', 'project_truth')),
entry_type TEXT NOT NULL CHECK (entry_type IN ('rule', 'constraint', 'heuristic', 'observation', 'attempt')),
source_kind TEXT NOT NULL CHECK (source_kind IN ('transcript', 'session', 'event_stream', 'hook_output', 'log', 'document')),
summary TEXT NOT NULL,
summary_digest TEXT NOT NULL,
observed_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
source_ref TEXT,
host TEXT,
host_hook TEXT,
host_session_id TEXT,
host_run_id TEXT,
host_task_id TEXT,
provider_name TEXT,
provider_ref TEXT,
extracted INTEGER NOT NULL DEFAULT 0,
extracted_candidate_id TEXT,
extracted_at_epoch_s INTEGER
);
INSERT INTO memory_evidence
(id, scope, entry_type, source_kind, summary, summary_digest,
observed_at_epoch_s, actor_id, session_id, source_ref, host, host_hook,
host_session_id, host_run_id, host_task_id, provider_name, provider_ref,
extracted, extracted_candidate_id, extracted_at_epoch_s)
SELECT
id, scope, entry_type, source_kind, summary, summary_digest,
observed_at_epoch_s, actor_id, session_id, source_ref, host, host_hook,
host_session_id, host_run_id, host_task_id, provider_name, provider_ref,
extracted, extracted_candidate_id, extracted_at_epoch_s
FROM memory_evidence_v15;
DROP TABLE memory_evidence_v15;
CREATE INDEX IF NOT EXISTS memory_evidence_extracted_idx
ON memory_evidence (extracted, observed_at_epoch_s);
"#,
    );
    match ddl {
        Ok(()) => {
            conn.execute_batch("COMMIT")?;
            Ok(())
        }
        Err(error) => {
            // Best-effort rollback; the DDL error is the one worth surfacing.
            let _ = conn.execute_batch("ROLLBACK");
            Err(error.into())
        }
    }
}
/// Gate A for the pod-layer collapse: refuses the v16 bump while any
/// `memory_evidence` row still carries scope='pod'.
fn check_pod_layer_collapse_gate_a(conn: &Connection) -> Result<()> {
    let pod_row_count: i64 = conn.query_row(
        "SELECT COUNT(*) FROM memory_evidence WHERE scope = 'pod'",
        [],
        |row| row.get(0),
    )?;
    if pod_row_count == 0 {
        return Ok(());
    }
    bail!(
        "state.db schema bump to v16 refused: {pod_row_count} memory_evidence row(s) \
        still have scope='pod'. Run `ccd migrate from-pod-layout` and retire residual \
        pod-scoped evidence before retrying; the prior schema is untouched."
    );
}
/// Gate B for the pod-layer collapse: scans `pods_root` and refuses the v16
/// schema bump while any pod directory is unmigrated, listing offenders
/// grouped into "no MIGRATED.md marker" and "marker present but pod-scoped
/// sources remain".
fn check_pod_layer_collapse_gate_b(pods_root: &Path) -> Result<()> {
    let offenders = find_unmigrated_pods(pods_root)?;
    if offenders.is_empty() {
        return Ok(());
    }
    // Split offenders into the two failure categories for the error message.
    let mut missing_marker: Vec<&str> = Vec::new();
    let mut stale_marker: Vec<String> = Vec::new();
    for offender in &offenders {
        match offender {
            PodOffender::MissingMarker { name } => missing_marker.push(name),
            PodOffender::StaleMarkerOverSources { name, sources } => {
                stale_marker.push(format!("{name} (pending: {})", sources.join(", ")));
            }
        }
    }
    let mut detail_parts: Vec<String> = Vec::new();
    if !missing_marker.is_empty() {
        // Singular/plural agreement for the error text.
        let (noun, verb) = if missing_marker.len() == 1 {
            ("directory", "lacks")
        } else {
            ("directories", "lack")
        };
        detail_parts.push(format!(
            "{} pod {noun} {verb} MIGRATED.md ({})",
            missing_marker.len(),
            missing_marker.join(", ")
        ));
    }
    if !stale_marker.is_empty() {
        let (noun, verb) = if stale_marker.len() == 1 {
            ("directory", "carries")
        } else {
            ("directories", "carry")
        };
        detail_parts.push(format!(
            "{} pod {noun} {verb} MIGRATED.md but still hold pod-scoped sources [{}]",
            stale_marker.len(),
            stale_marker.join("; ")
        ));
    }
    bail!(
        "state.db schema bump to v16 refused under {pods_root}: {detail}. Run \
        `ccd migrate from-pod-layout` before retrying; the prior schema is untouched.",
        pods_root = pods_root.display(),
        detail = detail_parts.join("; "),
    );
}
/// File/directory names whose presence marks a directory as a pod.
const POD_MARKER_NAMES: &[&str] = &[
    "pod.toml",
    "machine.toml",
    "memory.md",
    "policy.md",
    "presence",
    "repos",
];
/// Pod-local source files that must be migrated away before the v16 bump.
const PENDING_SOURCE_NAMES: &[&str] = &["machine.toml"];
/// Pod subdirectory whose `*.json` files also count as pending sources.
const PENDING_PRESENCE_DIR: &str = "presence";
/// True when `path` contains at least one known pod marker entry.
fn is_pod_dir(path: &Path) -> bool {
    for marker in POD_MARKER_NAMES {
        if path.join(marker).exists() {
            return true;
        }
    }
    false
}
/// A pod directory that blocks the v15 -> v16 schema bump.
enum PodOffender {
    /// Pod directory with no `MIGRATED.md` marker: migration never ran.
    MissingMarker { name: String },
    /// Pod directory carrying a `MIGRATED.md` marker but still holding
    /// pod-scoped source files (listed in `sources`): a stale/partial migration.
    StaleMarkerOverSources { name: String, sources: Vec<String> },
}
/// Scans the immediate children of `pods_root` for pod directories that have
/// not completed migration out of the pod layout.
///
/// A child directory counts as a pod when it holds any `POD_MARKER_NAMES`
/// entry; it offends when its `MIGRATED.md` marker file is missing, or when
/// the marker exists but `pending_pod_sources` still finds pod-scoped files.
/// A missing `pods_root` is treated as "no pods". Results are sorted by pod
/// name so error messages are deterministic.
fn find_unmigrated_pods(pods_root: &Path) -> Result<Vec<PodOffender>> {
    let iter = match std::fs::read_dir(pods_root) {
        Ok(iter) => iter,
        Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
        Err(error) => bail!("failed to read {}: {error}", pods_root.display()),
    };
    let mut offenders = Vec::new();
    for entry in iter {
        let entry = entry?;
        if !entry.file_type()?.is_dir() {
            continue;
        }
        let path = entry.path();
        if !is_pod_dir(&path) {
            continue;
        }
        // Fall back to the lossy full path when the name is not valid UTF-8.
        let name = entry
            .file_name()
            .to_str()
            .map(String::from)
            .unwrap_or_else(|| path.display().to_string());
        let marker = path.join("MIGRATED.md");
        if !marker.is_file() {
            offenders.push(PodOffender::MissingMarker { name });
            continue;
        }
        let sources = pending_pod_sources(&path)?;
        if !sources.is_empty() {
            offenders.push(PodOffender::StaleMarkerOverSources { name, sources });
        }
    }
    offenders.sort_by(|a, b| offender_name(a).cmp(offender_name(b)));
    Ok(offenders)
}
/// The pod directory name carried by either offender variant; used as the
/// sort key for deterministic reporting.
fn offender_name(offender: &PodOffender) -> &str {
    match offender {
        PodOffender::MissingMarker { name }
        | PodOffender::StaleMarkerOverSources { name, .. } => name,
    }
}
/// Lists pod-scoped source files still present inside `pod_dir`: every
/// `PENDING_SOURCE_NAMES` file, plus every `*.json` file directly inside the
/// `presence/` subdirectory. Non-UTF-8 presence file names are skipped.
/// The result is sorted for deterministic error messages.
fn pending_pod_sources(pod_dir: &Path) -> Result<Vec<String>> {
    let mut pending = Vec::new();
    for leaf in PENDING_SOURCE_NAMES {
        let candidate = pod_dir.join(leaf);
        if candidate.is_file() {
            pending.push((*leaf).to_string());
        }
    }
    let presence_root = pod_dir.join(PENDING_PRESENCE_DIR);
    if presence_root.is_dir() {
        let entries = match std::fs::read_dir(&presence_root) {
            Ok(entries) => entries,
            Err(error) => bail!("failed to read {}: {error}", presence_root.display()),
        };
        for entry in entries {
            let entry = entry?;
            // Only plain files count; nested directories are ignored.
            if !entry.file_type()?.is_file() {
                continue;
            }
            let file_name = entry.file_name();
            let name = match file_name.to_str() {
                Some(name) => name,
                None => continue,
            };
            if name.ends_with(".json") {
                pending.push(format!("{PENDING_PRESENCE_DIR}/{name}"));
            }
        }
    }
    pending.sort();
    Ok(pending)
}
fn memory_evidence_check_allows_pod(conn: &Connection) -> Result<bool> {
let sql: Option<String> = conn
.query_row(
"SELECT sql FROM sqlite_master WHERE type = 'table' AND name = 'memory_evidence'",
[],
|row| row.get(0),
)
.ok();
Ok(sql.is_some_and(|s| s.contains("'pod'")))
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
#[test]
fn table_has_column_rejects_empty_table_name() {
let conn = Connection::open_in_memory().unwrap();
let err = table_has_column(&conn, "", "x").unwrap_err();
assert!(
err.to_string().contains("invalid SQL identifier"),
"got: {err}"
);
}
#[test]
fn table_has_column_rejects_digit_leading_table_name() {
let conn = Connection::open_in_memory().unwrap();
let err = table_has_column(&conn, "123foo", "x").unwrap_err();
assert!(
err.to_string().contains("invalid SQL identifier"),
"got: {err}"
);
}
#[test]
fn table_has_column_rejects_table_name_with_semicolon() {
let conn = Connection::open_in_memory().unwrap();
let err = table_has_column(&conn, "foo; DROP TABLE bar", "x").unwrap_err();
assert!(
err.to_string().contains("invalid SQL identifier"),
"got: {err}"
);
}
#[test]
fn table_has_column_rejects_table_name_with_space() {
let conn = Connection::open_in_memory().unwrap();
let err = table_has_column(&conn, "foo bar", "x").unwrap_err();
assert!(
err.to_string().contains("invalid SQL identifier"),
"got: {err}"
);
}
#[test]
fn table_has_column_rejects_table_name_with_quote() {
let conn = Connection::open_in_memory().unwrap();
let err = table_has_column(&conn, "foo\"bar", "x").unwrap_err();
assert!(
err.to_string().contains("invalid SQL identifier"),
"got: {err}"
);
}
#[test]
fn table_has_column_accepts_valid_identifier() {
let conn = Connection::open_in_memory().unwrap();
initialize(&conn, None).unwrap();
assert!(table_has_column(&conn, "session", "mode").unwrap());
assert!(!table_has_column(&conn, "session", "no_such_column").unwrap());
}
#[test]
fn initialize_creates_all_tables() {
let conn = Connection::open_in_memory().unwrap();
initialize(&conn, None).unwrap();
let version: u32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, CURRENT_VERSION);
for table in [
"handoff",
"session",
"session_activity",
"escalation",
"recovery",
"projection_metadata",
"execution_gates",
"telemetry_cost",
"host_loop_events",
"projection_cache_entries",
"projection_cache_events",
"work_stream_decay",
"memory_op_queue",
"memory_evidence",
] {
let count: i64 = conn
.query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(count, 0, "table {table} should exist and be empty");
}
}
#[test]
fn migrate_is_idempotent() {
let conn = Connection::open_in_memory().unwrap();
initialize(&conn, None).unwrap();
migrate(&conn, None).unwrap();
let version: u32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, CURRENT_VERSION);
}
#[test]
fn migrate_rejects_future_version() {
let conn = Connection::open_in_memory().unwrap();
initialize(&conn, None).unwrap();
conn.pragma_update(None, "user_version", CURRENT_VERSION + 1)
.unwrap();
let result = migrate(&conn, None);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("newer than supported"));
}
#[test]
fn migrate_recovers_version_zero_db() {
    // user_version 0 means an empty/unknown database: migrate() must build
    // the full current schema and stamp CURRENT_VERSION.
    let conn = Connection::open_in_memory().unwrap();
    migrate(&conn, None).unwrap();
    let version: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, CURRENT_VERSION);
    let recovered_tables = [
        "handoff",
        "session",
        "session_activity",
        "escalation",
        "recovery",
        "projection_metadata",
        "execution_gates",
        "telemetry_cost",
        "host_loop_events",
        "memory_op_queue",
        "memory_evidence",
    ];
    for table in recovered_tables {
        let sql = format!("SELECT COUNT(*) FROM {table}");
        let count: i64 = conn.query_row(&sql, [], |row| row.get(0)).unwrap();
        assert_eq!(count, 0, "table {table} should exist after v0 recovery");
    }
}
// A version-0 database that already carries an older, partial `session`
// table must be repaired in place: missing columns (mode, owner_kind,
// revision) get added with their defaults while the existing row survives.
#[test]
fn migrate_repairs_version_zero_partial_session_schema() {
let conn = Connection::open_in_memory().unwrap();
// Seed only an old-shape session table, leaving user_version at 0.
conn.execute_batch(
r#"
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
"#,
)
.unwrap();
conn.execute(
"INSERT INTO session
(id, schema_version, started_at_epoch_s, last_started_at_epoch_s, start_count, session_id)
VALUES (1, 3, 10, 10, 1, 'ses_partial')",
[],
)
.unwrap();
migrate(&conn, None).unwrap();
// Column index 1 of PRAGMA table_info is the column name.
let columns = conn
.prepare("PRAGMA table_info(session)")
.unwrap()
.query_map([], |row| row.get::<_, String>(1))
.unwrap()
.collect::<rusqlite::Result<Vec<_>>>()
.unwrap();
assert!(columns.iter().any(|column| column == "mode"));
assert!(columns.iter().any(|column| column == "owner_kind"));
assert!(columns.iter().any(|column| column == "revision"));
// The pre-existing row must have been backfilled with defaults.
let mode: String = conn
.query_row("SELECT mode FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(mode, "general");
let owner_kind: String = conn
.query_row("SELECT owner_kind FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(owner_kind, "interactive");
let revision: u64 = conn
.query_row("SELECT revision FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(revision, 1);
}
// Full-chain test from v1: the v1->v2 step must drop handoff's
// current_system_state column, and the chain must land on CURRENT_VERSION.
#[test]
fn migrate_v1_to_v2_drops_current_system_state_column() {
let conn = Connection::open_in_memory().unwrap();
// Seed the complete v1 schema by hand (including the doomed column).
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
current_system_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 1).unwrap();
// One row with data in the column that the migration removes.
conn.execute(
"INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state,
current_system_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Test', '[]', '[]', '[\"stale\"]', '[]', '[]', '[]')",
[],
)
.unwrap();
migrate(&conn, None).unwrap();
let version: u32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, CURRENT_VERSION);
// Column index 1 of PRAGMA table_info is the column name.
let columns = conn
.prepare("PRAGMA table_info(handoff)")
.unwrap()
.query_map([], |row| row.get::<_, String>(1))
.unwrap()
.collect::<rusqlite::Result<Vec<_>>>()
.unwrap();
assert!(!columns
.iter()
.any(|column| column == "current_system_state"));
}
// Retry-safety of the v1->v2 step: if a previous migration attempt died
// after creating its temporary handoff_v2 table, a re-run must still
// succeed and preserve the handoff row's data.
#[test]
fn migrate_v1_to_v2_recovers_from_leftover_temp_table() {
let conn = Connection::open_in_memory().unwrap();
// v1 schema plus an orphaned handoff_v2 simulating an interrupted run.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
current_system_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE handoff_v2 (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 1).unwrap();
conn.execute(
"INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state,
current_system_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Retry-safe', '[]', '[]', '[\"stale\"]', '[]', '[]', '[]')",
[],
)
.unwrap();
migrate(&conn, None).unwrap();
// Row data must survive the retried table-swap.
let title: String = conn
.query_row("SELECT title FROM handoff WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(title, "Retry-safe");
}
// From a hand-built v2 schema (no execution_gates yet), the chain must
// create the execution_gates table, initially empty.
#[test]
fn migrate_v2_to_v3_adds_execution_gates_table() {
let conn = Connection::open_in_memory().unwrap();
// v2 schema: handoff already lacks current_system_state.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 2).unwrap();
migrate(&conn, None).unwrap();
// The COUNT query doubles as an existence check: it errors if missing.
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM execution_gates", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 0);
}
// From a hand-built v3 schema, the chain must add mode/owner_kind/revision
// to session and backfill the existing row with their defaults.
#[test]
fn migrate_v3_to_v4_adds_session_mode_column() {
let conn = Connection::open_in_memory().unwrap();
// v3 schema: execution_gates exists, session still lacks mode & friends.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 3,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 3).unwrap();
conn.execute(
"INSERT INTO session
(id, schema_version, started_at_epoch_s, last_started_at_epoch_s, start_count, session_id)
VALUES (1, 3, 1000, 1000, 1, 'ses_V3')",
[],
)
.unwrap();
migrate(&conn, None).unwrap();
// The pre-existing session row must pick up the column defaults.
let mode: String = conn
.query_row("SELECT mode FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(mode, "general");
let owner_kind: String = conn
.query_row("SELECT owner_kind FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(owner_kind, "interactive");
let revision: u64 = conn
.query_row("SELECT revision FROM session WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(revision, 1);
}
// From a hand-built v6 schema, the chain must create session_activity,
// bump execution_gates' schema_version to 2, and stamp revision=1 on the
// seeded handoff/execution_gates rows without otherwise disturbing them.
#[test]
fn migrate_v6_to_v8_adds_session_activity_table_without_disturbing_surface_revisions() {
let conn = Connection::open_in_memory().unwrap();
// v6 schema: session already has mode/owner/lease columns; telemetry_cost
// is in its original (pre-v9) column layout.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general' CHECK (mode IN ('general', 'research', 'implement')),
owner_kind TEXT CHECK (owner_kind IN ('interactive', 'runtime_supervisor', 'runtime_worker')),
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]'
);
CREATE TABLE telemetry_cost (
id INTEGER PRIMARY KEY AUTOINCREMENT,
backlog_item_ref TEXT NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
observed_at_epoch_s INTEGER NOT NULL,
host TEXT,
model TEXT,
cost_usd REAL,
total_tokens INTEGER,
input_tokens INTEGER,
output_tokens INTEGER,
context_used_pct INTEGER,
context_window_tokens INTEGER,
compacted INTEGER
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 6).unwrap();
// Seed surface rows whose content must survive the migration untouched.
conn.execute(
"INSERT INTO handoff
(id, schema_version, title, immediate_actions, completed_state, operational_guardrails, key_files, definition_of_done)
VALUES (1, 1, 'Seeded', '[\"one\"]', '[]', '[]', '[]', '[\"done\"]')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO execution_gates (id, schema_version, seeded_from, gates)
VALUES (1, 1, 'handoff:immediate_actions', '[{\"text\":\"gate\",\"status\":\"open\"}]')",
[],
)
.unwrap();
migrate(&conn, None).unwrap();
// Existing rows gain revision=1; gates also advance to schema_version 2.
let handoff_revision: u64 = conn
.query_row("SELECT revision FROM handoff WHERE id = 1", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(handoff_revision, 1);
let gate_schema_version: u32 = conn
.query_row(
"SELECT schema_version FROM execution_gates WHERE id = 1",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(gate_schema_version, 2);
let gate_revision: u64 = conn
.query_row(
"SELECT revision FROM execution_gates WHERE id = 1",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(gate_revision, 1);
// session_activity must now exist (and be empty).
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM session_activity", [], |row| {
row.get(0)
})
.unwrap();
assert_eq!(count, 0);
}
// Unit test for the v9->v10 step in isolation: telemetry_cost's focus_*
// columns are renamed to next_step_* and existing row data is preserved.
#[test]
fn migrate_v9_to_v10_renames_focus_columns() {
let conn = Connection::open_in_memory().unwrap();
// Seed the exact v9 shape of telemetry_cost, its focus_key index,
// and one row to carry across the rename.
conn.execute_batch(
r#"
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
focus_key TEXT NOT NULL DEFAULT '',
focus_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX telemetry_cost_focus_key_idx
ON telemetry_cost (focus_key);
INSERT INTO telemetry_cost
(session_id, recorded_at_epoch_s, focus_key, focus_title,
session_cost_usd)
VALUES
('ses_1', 1000, 'ccd:42', 'Widget refactor', 1.50);
"#,
)
.unwrap();
// Call the step directly rather than through migrate().
migrate_v9_to_v10(&conn).unwrap();
// Old names gone, new names present.
assert!(!table_has_column(&conn, "telemetry_cost", "focus_key").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "focus_title").unwrap());
assert!(table_has_column(&conn, "telemetry_cost", "next_step_key").unwrap());
assert!(table_has_column(&conn, "telemetry_cost", "next_step_title").unwrap());
// Row values carried over into the renamed columns.
let key: String = conn
.query_row(
"SELECT next_step_key FROM telemetry_cost WHERE session_id = 'ses_1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(key, "ccd:42");
let title: String = conn
.query_row(
"SELECT next_step_title FROM telemetry_cost WHERE session_id = 'ses_1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(title, "Widget refactor");
}
// Unit test for the v10->v11 step: projection_metadata gains a nullable
// session_id column; old rows read NULL, new rows can set it, and the
// step is safe to run twice.
#[test]
fn migrate_v10_to_v11_adds_session_id_column() {
let conn = Connection::open_in_memory().unwrap();
// v10 shape of projection_metadata with one pre-existing row.
conn.execute_batch(
r#"
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT
);
INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests, tool_surface_fingerprint)
VALUES
(1000, 'fp_1', NULL, 'tool_abc');
"#,
)
.unwrap();
migrate_v10_to_v11(&conn).unwrap();
assert!(table_has_column(&conn, "projection_metadata", "session_id").unwrap());
// The pre-existing row defaults to NULL for the new column.
let session_id: Option<String> = conn
.query_row(
"SELECT session_id FROM projection_metadata WHERE source_fingerprint = 'fp_1'",
[],
|row| row.get(0),
)
.unwrap();
assert!(session_id.is_none());
// New inserts can populate the column.
conn.execute(
"INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint, session_id)
VALUES (2000, 'fp_2', NULL, NULL, 'ses_test')",
[],
)
.unwrap();
let session_id: Option<String> = conn
.query_row(
"SELECT session_id FROM projection_metadata WHERE source_fingerprint = 'fp_2'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(session_id.as_deref(), Some("ses_test"));
// Second run must be a no-op, not an error (idempotency check).
migrate_v10_to_v11(&conn).unwrap();
}
// From a hand-built v11 schema, the chain must create memory_evidence
// (empty) and land on CURRENT_VERSION.
#[test]
fn migrate_v11_to_v12_adds_memory_evidence_table() {
let conn = Connection::open_in_memory().unwrap();
// Full v11 schema: revision columns present, telemetry_cost already in
// next_step_* shape, memory_op_queue present, memory_evidence absent.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general',
owner_kind TEXT,
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE session_activity (
id INTEGER PRIMARY KEY CHECK (id = 1),
session_id TEXT NOT NULL,
actor_id TEXT NOT NULL,
current_activity TEXT NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
session_revision INTEGER NOT NULL
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL CHECK (kind IN ('blocking', 'non_blocking')),
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE recovery (
id INTEGER PRIMARY KEY CHECK (id = 1),
checkpoint_origin TEXT,
checkpoint_captured_at_epoch_s INTEGER,
checkpoint_session_started_at_epoch_s INTEGER,
checkpoint_summary TEXT,
checkpoint_immediate_actions TEXT,
checkpoint_key_files TEXT,
buffer_origin TEXT,
buffer_captured_at_epoch_s INTEGER,
buffer_session_started_at_epoch_s INTEGER,
buffer_summary_lines TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT,
session_id TEXT
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 2,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
CREATE TABLE memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 11).unwrap();
migrate(&conn, None).unwrap();
let version: u32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, CURRENT_VERSION);
// The COUNT query doubles as an existence check for memory_evidence.
let count: i64 = conn
.query_row("SELECT COUNT(*) FROM memory_evidence", [], |row| row.get(0))
.unwrap();
assert_eq!(count, 0);
}
#[test]
fn migrate_v12_to_v13_adds_host_loop_events_table() {
    // Build a current DB, surgically remove the v13 artifacts, rewind the
    // version to 12, and verify the chain recreates host_loop_events.
    let conn = Connection::open_in_memory().unwrap();
    initialize(&conn, None).unwrap();
    conn.execute_batch(
        r#"
DROP INDEX IF EXISTS host_loop_events_host_hook_idx;
DROP INDEX IF EXISTS host_loop_events_observed_idx;
DROP TABLE IF EXISTS host_loop_events;
"#,
    )
    .unwrap();
    conn.pragma_update(None, "user_version", 12).unwrap();
    migrate(&conn, None).unwrap();
    let stamped: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(stamped, CURRENT_VERSION);
    // COUNT doubles as an existence check: it errors if the table is absent.
    let rows: i64 = conn
        .query_row("SELECT COUNT(*) FROM host_loop_events", [], |row| {
            row.get(0)
        })
        .unwrap();
    assert_eq!(rows, 0);
}
#[test]
fn migrate_v13_to_v14_adds_projection_cache_and_decay_tables() {
    // Build a current DB, drop the v14 artifacts, rewind to 13, and verify
    // the chain recreates all three tables empty.
    let conn = Connection::open_in_memory().unwrap();
    initialize(&conn, None).unwrap();
    conn.execute_batch(
        r#"
DROP INDEX IF EXISTS projection_cache_events_session_idx;
DROP INDEX IF EXISTS projection_cache_events_target_format_idx;
DROP TABLE IF EXISTS projection_cache_events;
DROP INDEX IF EXISTS projection_cache_entries_observed_idx;
DROP INDEX IF EXISTS projection_cache_entries_key_idx;
DROP TABLE IF EXISTS projection_cache_entries;
DROP TABLE IF EXISTS work_stream_decay;
"#,
    )
    .unwrap();
    conn.pragma_update(None, "user_version", 13).unwrap();
    migrate(&conn, None).unwrap();
    let stamped: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(stamped, CURRENT_VERSION);
    for table in [
        "projection_cache_entries",
        "projection_cache_events",
        "work_stream_decay",
    ] {
        let rows: i64 = conn
            .query_row(&format!("SELECT COUNT(*) FROM {table}"), [], |row| {
                row.get(0)
            })
            .unwrap();
        assert_eq!(rows, 0);
    }
}
// From a v14-shaped telemetry_cost (still carrying ccd_id/github_* and
// backlog_* identity columns), the chain must drop those columns while
// preserving the remaining row data. The dropped set mirrors
// LEGACY_TELEMETRY_IDENTITY_COLUMNS at the top of the file.
#[test]
fn migrate_v14_to_v15_drops_legacy_telemetry_identity_columns() {
let conn = Connection::open_in_memory().unwrap();
initialize(&conn, None).unwrap();
// Replace the current telemetry_cost with the v14 shape plus one row.
conn.execute_batch(
r#"
DROP INDEX IF EXISTS telemetry_cost_next_step_key_idx;
DROP TABLE IF EXISTS telemetry_cost;
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
ccd_id INTEGER NOT NULL DEFAULT 0,
github_issue_number INTEGER NOT NULL DEFAULT 0,
backlog_provider TEXT,
backlog_kind TEXT,
backlog_id TEXT,
backlog_url TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
INSERT INTO telemetry_cost
(session_id, recorded_at_epoch_s, next_step_key, next_step_title,
ccd_id, github_issue_number, backlog_provider, backlog_kind, backlog_id,
backlog_url, model, session_cost_usd, input_tokens, output_tokens,
cache_creation_input_tokens, cache_read_input_tokens, blended_total_tokens)
VALUES
('ses_1', 1000, 'handoff_title:seeded', 'Next Session: Runtime cleanup',
42, 142, 'github-issues', 'issue', '142',
'https://example.test/issues/142', 'gpt-5', 1.50, 100, 50, 25, 10, 185);
"#,
)
.unwrap();
conn.pragma_update(None, "user_version", 14).unwrap();
migrate(&conn, None).unwrap();
// All six legacy identity columns are gone.
assert!(!table_has_column(&conn, "telemetry_cost", "ccd_id").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "github_issue_number").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "backlog_provider").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "backlog_kind").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "backlog_id").unwrap());
assert!(!table_has_column(&conn, "telemetry_cost", "backlog_url").unwrap());
// The surviving columns keep their values through the table rebuild.
let row: (String, String, String, f64) = conn
.query_row(
"SELECT next_step_key, next_step_title, model, session_cost_usd
FROM telemetry_cost WHERE session_id = 'ses_1'",
[],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?, row.get(3)?)),
)
.unwrap();
assert_eq!(row.0, "handoff_title:seeded");
assert_eq!(row.1, "Next Session: Runtime cleanup");
assert_eq!(row.2, "gpt-5");
assert!((row.3 - 1.5).abs() < f64::EPSILON);
}
// Test fixture: creates the v15-shaped memory_evidence table — whose scope
// CHECK still admits 'pod' — together with its extracted-rows index.
// Used by the v15->v16 gate tests below.
fn seed_v15_memory_evidence(conn: &Connection) {
conn.execute_batch(
r#"
CREATE TABLE memory_evidence (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL CHECK (scope IN ('workspace', 'work_stream', 'project', 'profile', 'pod', 'project_truth')),
entry_type TEXT NOT NULL CHECK (entry_type IN ('rule', 'constraint', 'heuristic', 'observation', 'attempt')),
source_kind TEXT NOT NULL CHECK (source_kind IN ('transcript', 'session', 'event_stream', 'hook_output', 'log', 'document')),
summary TEXT NOT NULL,
summary_digest TEXT NOT NULL,
observed_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
source_ref TEXT,
host TEXT,
host_hook TEXT,
host_session_id TEXT,
host_run_id TEXT,
host_task_id TEXT,
provider_name TEXT,
provider_ref TEXT,
extracted INTEGER NOT NULL DEFAULT 0,
extracted_candidate_id TEXT,
extracted_at_epoch_s INTEGER
);
CREATE INDEX memory_evidence_extracted_idx
ON memory_evidence (extracted, observed_at_epoch_s);
"#,
)
.unwrap();
}
// Test fixture: inserts a minimal evidence row with the given id and scope;
// all remaining nullable columns stay at their defaults.
fn insert_evidence(conn: &Connection, id: &str, scope: &str) {
    let inserted = conn.execute(
        "INSERT INTO memory_evidence
(id, scope, entry_type, source_kind, summary, summary_digest,
observed_at_epoch_s)
VALUES (?1, ?2, 'rule', 'transcript', 'x', 'd', 1000)",
        rusqlite::params![id, scope],
    );
    inserted.unwrap();
}
#[test]
fn migrate_v15_to_v16_refuses_when_pod_rows_exist() {
    // With a scope='pod' evidence row present, the gate must refuse and
    // leave the table DDL completely untouched.
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    insert_evidence(&conn, "ev_1", "pod");
    // Helper to snapshot the table's DDL from sqlite_master.
    let table_sql = |conn: &Connection| -> String {
        conn.query_row(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='memory_evidence'",
            [],
            |row| row.get(0),
        )
        .unwrap()
    };
    let before_sql = table_sql(&conn);
    let err = migrate_v15_to_v16_with_pods_root(&conn, None).expect_err("gate must refuse");
    let msg = err.to_string();
    assert!(msg.contains("scope='pod'"), "got: {msg}");
    assert!(
        msg.contains("ccd migrate from-pod-layout"),
        "error must name the migration tool, got: {msg}"
    );
    assert_eq!(before_sql, table_sql(&conn));
}
#[test]
fn migrate_v15_to_v16_refuses_when_pod_dirs_lack_migrated_md() {
    // Two pod dirs exist on disk; only alpha carries the MIGRATED.md
    // marker, so beta must block the gate (and alpha must not be named).
    let temp = tempfile::tempdir().unwrap();
    let pods_root = temp.path().join("pods");
    for pod in ["alpha", "beta"] {
        let dir = pods_root.join(pod);
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("pod.toml"), "").unwrap();
    }
    std::fs::write(pods_root.join("alpha/MIGRATED.md"), "ok").unwrap();
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    let msg = migrate_v15_to_v16_with_pods_root(&conn, Some(&pods_root))
        .expect_err("gate must refuse")
        .to_string();
    assert!(
        msg.contains("beta"),
        "error must name the offending pod: {msg}"
    );
    assert!(
        !msg.contains("alpha"),
        "migrated pod must not be listed: {msg}"
    );
    assert!(msg.contains("ccd migrate from-pod-layout"), "got: {msg}");
    // Table untouched: the legacy CHECK still admits scope='pod'.
    assert!(memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_succeeds_when_gate_passes() {
    // No pod rows and no pod dirs: the rebuild runs, non-pod rows survive,
    // and the tightened CHECK now rejects scope='pod'.
    let temp = tempfile::tempdir().unwrap();
    let pods_root = temp.path().join("pods");
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    insert_evidence(&conn, "ev_keep", "workspace");
    migrate_v15_to_v16_with_pods_root(&conn, Some(&pods_root)).unwrap();
    let surviving: String = conn
        .query_row(
            "SELECT scope FROM memory_evidence WHERE id = 'ev_keep'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(surviving, "workspace");
    assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
    let rejected = conn.execute(
        "INSERT INTO memory_evidence
(id, scope, entry_type, source_kind, summary, summary_digest,
observed_at_epoch_s)
VALUES ('ev_bad', 'pod', 'rule', 'transcript', 'x', 'd', 1000)",
        [],
    );
    assert!(
        rejected.is_err(),
        "post-migration CHECK must reject scope='pod'"
    );
    // The extracted index must be recreated alongside the rebuilt table.
    let idx_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master
WHERE type='index' AND name='memory_evidence_extracted_idx'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(idx_count, 1);
}
#[test]
fn migrate_v15_to_v16_succeeds_when_all_pods_migrated() {
    // Every pod dir carries MIGRATED.md, so the gate passes and the
    // rebuild drops 'pod' from the scope CHECK.
    let temp = tempfile::tempdir().unwrap();
    let pods_root = temp.path().join("pods");
    for pod in ["alpha", "beta"] {
        let dir = pods_root.join(pod);
        std::fs::create_dir_all(&dir).unwrap();
        std::fs::write(dir.join("pod.toml"), "").unwrap();
        std::fs::write(dir.join("MIGRATED.md"), "ok").unwrap();
    }
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    migrate_v15_to_v16_with_pods_root(&conn, Some(&pods_root)).unwrap();
    assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_ignores_foreign_dirs_without_pod_markers() {
    // Directories under pods_root with no pod markers must not trip the gate.
    let temp = tempfile::tempdir().unwrap();
    let pods_root = temp.path().join("pods");
    std::fs::create_dir_all(pods_root.join("legacy-backup")).unwrap();
    std::fs::write(pods_root.join("legacy-backup/notes.txt"), "junk").unwrap();
    std::fs::create_dir_all(pods_root.join("scratch/nested")).unwrap();
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    migrate_v15_to_v16_with_pods_root(&conn, Some(&pods_root)).unwrap();
    assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_flags_real_pods_alongside_foreign_dirs() {
    // One genuine pod (machine.toml marker, no MIGRATED.md) next to a junk
    // dir: only the genuine pod may appear in the refusal message.
    let temp = tempfile::tempdir().unwrap();
    let pods_root = temp.path().join("pods");
    std::fs::create_dir_all(pods_root.join("realpod")).unwrap();
    std::fs::write(pods_root.join("realpod/machine.toml"), "").unwrap();
    std::fs::create_dir_all(pods_root.join("random-backup")).unwrap();
    std::fs::write(pods_root.join("random-backup/data.bin"), "junk").unwrap();
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    let msg = migrate_v15_to_v16_with_pods_root(&conn, Some(&pods_root))
        .expect_err("real pod without MIGRATED.md must still block")
        .to_string();
    assert!(msg.contains("realpod"), "got: {msg}");
    assert!(
        !msg.contains("random-backup"),
        "foreign dir must not be reported: {msg}"
    );
}
#[test]
fn is_pod_dir_recognizes_each_marker() {
    // Each known marker on its own must classify a directory as a pod;
    // "presence" and "repos" are directory markers, the rest are files.
    for marker in POD_MARKER_NAMES {
        let temp = tempfile::tempdir().unwrap();
        let pod = temp.path().join("candidate");
        std::fs::create_dir_all(&pod).unwrap();
        let marker_path = pod.join(marker);
        match *marker {
            "presence" | "repos" => std::fs::create_dir_all(&marker_path).unwrap(),
            _ => std::fs::write(&marker_path, "").unwrap(),
        }
        assert!(is_pod_dir(&pod), "marker {marker} must count as a pod");
    }
    // Control: a directory with only unrelated files is not a pod.
    let temp = tempfile::tempdir().unwrap();
    let pod = temp.path().join("not-a-pod");
    std::fs::create_dir_all(&pod).unwrap();
    std::fs::write(pod.join("README.txt"), "").unwrap();
    assert!(
        !is_pod_dir(&pod),
        "dir without markers must not count as a pod"
    );
}
#[test]
fn migrate_v15_to_v16_is_noop_when_memory_evidence_absent() {
    // A database with no memory_evidence table has nothing to rewrite; the
    // step must succeed even when no pods root is supplied.
    let conn = Connection::open_in_memory().unwrap();
    migrate_v15_to_v16_with_pods_root(&conn, None).unwrap();
}
#[test]
// End-to-end chain test: hand-builds the table set a v11-era database would
// contain, stamps user_version = 11, then runs the public migrate() entry
// point and verifies it lands on CURRENT_VERSION with pod-scoped memory
// evidence no longer allowed by the schema check.
fn full_migrate_chain_from_v11_drops_pod_from_check() {
let conn = Connection::open_in_memory().unwrap();
// Recreate the historical v11 schema verbatim so every v11 -> v16 step in
// the chain has real table shapes to operate on.
conn.execute_batch(
r#"
CREATE TABLE handoff (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
title TEXT NOT NULL DEFAULT '',
immediate_actions TEXT NOT NULL DEFAULT '[]',
completed_state TEXT NOT NULL DEFAULT '[]',
operational_guardrails TEXT NOT NULL DEFAULT '[]',
key_files TEXT NOT NULL DEFAULT '[]',
definition_of_done TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE session (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 4,
started_at_epoch_s INTEGER NOT NULL,
last_started_at_epoch_s INTEGER NOT NULL,
start_count INTEGER NOT NULL DEFAULT 1,
session_id TEXT,
mode TEXT NOT NULL DEFAULT 'general',
owner_kind TEXT,
owner_id TEXT,
supervisor_id TEXT,
lease_ttl_secs INTEGER,
last_heartbeat_at_epoch_s INTEGER,
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE execution_gates (
id INTEGER PRIMARY KEY CHECK (id = 1),
schema_version INTEGER NOT NULL DEFAULT 1,
seeded_from TEXT,
gates TEXT NOT NULL DEFAULT '[]',
revision INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE escalation (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
reason TEXT NOT NULL,
created_at_epoch_s INTEGER NOT NULL,
session_id TEXT
);
CREATE TABLE projection_metadata (
id INTEGER PRIMARY KEY AUTOINCREMENT,
observed_at_epoch_s INTEGER NOT NULL,
source_fingerprint TEXT NOT NULL,
projection_digests TEXT,
tool_surface_fingerprint TEXT,
session_id TEXT
);
CREATE TABLE session_activity (
session_id TEXT NOT NULL,
started_at_epoch_s INTEGER NOT NULL,
last_active_at_epoch_s INTEGER NOT NULL
);
CREATE TABLE telemetry_cost (
session_id TEXT PRIMARY KEY,
recorded_at_epoch_s INTEGER NOT NULL,
next_step_key TEXT NOT NULL DEFAULT '',
next_step_title TEXT,
model TEXT,
session_cost_usd REAL NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
cache_creation_input_tokens INTEGER NOT NULL DEFAULT 0,
cache_read_input_tokens INTEGER NOT NULL DEFAULT 0,
blended_total_tokens INTEGER
);
CREATE INDEX telemetry_cost_next_step_key_idx
ON telemetry_cost (next_step_key);
CREATE TABLE memory_op_queue (
id TEXT PRIMARY KEY,
command TEXT NOT NULL,
request_fingerprint TEXT NOT NULL,
plan_json TEXT NOT NULL,
staged_at_epoch_s INTEGER NOT NULL,
updated_at_epoch_s INTEGER NOT NULL,
actor_id TEXT,
session_id TEXT,
reconciled INTEGER NOT NULL DEFAULT 0,
outcome TEXT,
authored_entry_ids TEXT NOT NULL DEFAULT '[]',
authored_target_path TEXT,
authored_source_path TEXT,
snapshot_json TEXT
);
"#,
)
.unwrap();
// Mark the database as v11 so migrate() runs the v11..v16 steps only.
conn.pragma_update(None, "user_version", 11).unwrap();
// No pods root is supplied; the chain must still complete.
migrate(&conn, None).unwrap();
let version: u32 = conn
.pragma_query_value(None, "user_version", |row| row.get(0))
.unwrap();
assert_eq!(version, CURRENT_VERSION);
// The migrated schema must reject pod-scoped memory evidence.
assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_refuses_stale_marker_over_live_machine_toml() {
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().join("pods");
    let pod = root.join("stale");
    std::fs::create_dir_all(&pod).unwrap();
    // MIGRATED.md claims completion, but machine.toml is still present.
    std::fs::write(pod.join("pod.toml"), "").unwrap();
    std::fs::write(pod.join("machine.toml"), "").unwrap();
    std::fs::write(pod.join("MIGRATED.md"), "ok").unwrap();
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    let err = migrate_v15_to_v16_with_pods_root(&conn, Some(&root))
        .expect_err("stale marker must block the bump");
    // The error names the pod, the pending source file, and the remedy.
    let msg = err.to_string();
    assert!(msg.contains("stale"), "pod name must appear: {msg}");
    assert!(
        msg.contains("machine.toml"),
        "pending source must be named: {msg}"
    );
    assert!(msg.contains("ccd migrate from-pod-layout"), "got: {msg}");
    // The refusal leaves the v15 schema intact, so 'pod' is still allowed.
    assert!(memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_refuses_stale_marker_over_pending_presence_json() {
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().join("pods");
    let pod = root.join("stale");
    std::fs::create_dir_all(pod.join("presence")).unwrap();
    // A presence/*.json file contradicts the MIGRATED.md marker.
    std::fs::write(pod.join("pod.toml"), "").unwrap();
    std::fs::write(pod.join("presence/laptop.json"), "{}").unwrap();
    std::fs::write(pod.join("MIGRATED.md"), "ok").unwrap();
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    let err = migrate_v15_to_v16_with_pods_root(&conn, Some(&root))
        .expect_err("presence/*.json must block a stale marker");
    // The error must point at the pending presence file.
    let msg = err.to_string();
    assert!(msg.contains("presence/laptop.json"), "got: {msg}");
}
#[test]
fn migrate_v15_to_v16_accepts_marker_over_inert_legacy_files() {
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().join("pods");
    let pod = root.join("migrated");
    std::fs::create_dir_all(&pod).unwrap();
    // Leftover memory.md / policy.md files are inert once MIGRATED.md exists.
    for (name, body) in [
        ("pod.toml", ""),
        ("memory.md", "stale memory"),
        ("policy.md", "stale policy"),
        ("MIGRATED.md", "ok"),
    ] {
        std::fs::write(pod.join(name), body).unwrap();
    }
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    migrate_v15_to_v16_with_pods_root(&conn, Some(&root)).unwrap();
    assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
}
#[test]
fn migrate_v15_to_v16_refuses_leftover_v15_table_from_prior_attempt() {
    let conn = Connection::open_in_memory().unwrap();
    seed_v15_memory_evidence(&conn);
    // Simulate an earlier run that died after renaming the staging table.
    conn.execute_batch("ALTER TABLE memory_evidence RENAME TO memory_evidence_v15;")
        .unwrap();
    let err = migrate_v15_to_v16_with_pods_root(&conn, None)
        .expect_err("leftover v15 table must block the bump");
    let msg = err.to_string();
    assert!(
        msg.contains("memory_evidence_v15"),
        "error must name the leftover table: {msg}"
    );
}
#[test]
// Forces a mid-migration failure (a row whose entry_type the migration
// rejects — see the assert below) and verifies the DDL is rolled back: the
// pre-migration memory_evidence table survives with its row and no
// memory_evidence_v15 staging table is left behind.
fn migrate_v15_to_v16_rolls_back_ddl_on_mid_migration_failure() {
let conn = Connection::open_in_memory().unwrap();
seed_v15_memory_evidence(&conn);
// Replace the seeded table with a v15-shaped one that contains a poison
// row ('not_a_real_entry_type') to make the v16 rewrite fail partway.
conn.execute_batch(
r#"
DROP INDEX IF EXISTS memory_evidence_extracted_idx;
DROP TABLE memory_evidence;
CREATE TABLE memory_evidence (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL,
entry_type TEXT NOT NULL,
source_kind TEXT NOT NULL,
summary TEXT NOT NULL,
summary_digest TEXT NOT NULL,
observed_at_epoch_s INTEGER NOT NULL,
actor_id TEXT, session_id TEXT, source_ref TEXT, host TEXT,
host_hook TEXT, host_session_id TEXT, host_run_id TEXT,
host_task_id TEXT, provider_name TEXT, provider_ref TEXT,
extracted INTEGER NOT NULL DEFAULT 0,
extracted_candidate_id TEXT, extracted_at_epoch_s INTEGER
);
INSERT INTO memory_evidence
(id, scope, entry_type, source_kind, summary, summary_digest, observed_at_epoch_s)
VALUES ('ev_bad', 'workspace', 'not_a_real_entry_type', 'transcript', 'x', 'd', 1);
"#,
)
.unwrap();
let result = migrate_v15_to_v16_with_pods_root(&conn, None);
assert!(result.is_err(), "migration must fail on invalid entry_type");
// Rollback checks: original table present, staging table gone, row intact.
assert!(table_exists(&conn, "memory_evidence").unwrap());
assert!(!table_exists(&conn, "memory_evidence_v15").unwrap());
let row_count: i64 = conn
.query_row("SELECT COUNT(*) FROM memory_evidence", [], |row| row.get(0))
.unwrap();
assert_eq!(row_count, 1, "pre-migration row must survive rollback");
}
#[test]
fn migrate_v0_repair_runs_gate_b_against_unmigrated_pods() {
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().join("pods");
    // One pending pod (pod.toml present, no MIGRATED.md) must trip gate (b).
    let pending = root.join("pending");
    std::fs::create_dir_all(&pending).unwrap();
    std::fs::write(pending.join("pod.toml"), "").unwrap();
    let conn = Connection::open_in_memory().unwrap();
    let err = migrate(&conn, Some(&root)).expect_err("v0 repair path must honor gate (b)");
    assert!(err.to_string().contains("pending"), "got: {err}");
    // The version must not have been bumped ...
    let version: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(
        version, 0,
        "bump must be rolled back to pre-migration state"
    );
    // ... and no user tables may have been created.
    let table_count: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'",
            [],
            |row| row.get(0),
        )
        .unwrap();
    assert_eq!(
        table_count, 0,
        "refused v0 migration must leave the DB byte-for-byte untouched"
    );
}
#[test]
fn migrate_v0_repair_allows_clean_environment() {
    // The pods root is never created here, so gate (b) has nothing to flag;
    // migration from v0 must land on CURRENT_VERSION with 'pod' disallowed.
    let temp = tempfile::tempdir().unwrap();
    let root = temp.path().join("pods");
    let conn = Connection::open_in_memory().unwrap();
    migrate(&conn, Some(&root)).unwrap();
    let version: u32 = conn
        .pragma_query_value(None, "user_version", |row| row.get(0))
        .unwrap();
    assert_eq!(version, CURRENT_VERSION);
    assert!(!memory_evidence_check_allows_pod(&conn).unwrap());
}
}