pub(crate) mod escalation;
pub(crate) mod handoff;
pub(crate) mod memory_ops;
pub(crate) mod migration;
pub(crate) mod projection;
pub(crate) mod recovery;
pub(crate) mod schema;
pub(crate) mod session;
pub(crate) mod session_activity;
pub(crate) mod session_gates;
pub(crate) mod telemetry_cost;
use std::path::{Component, Path};
use anyhow::{Context, Result};
use rusqlite::{Connection, Error as SqliteError, ErrorCode};
use tracing::debug;
use crate::paths::state::StateLayout;
pub(crate) struct StateDb {
conn: Connection,
}
impl StateDb {
pub(crate) fn open(path: &Path) -> Result<Self> {
let needs_init = !path.exists();
debug!(path = %path.display(), needs_init, "opening state.db");
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("create state.db parent: {}", parent.display()))?;
}
let conn = Connection::open(path)
.map_err(|error| wrap_state_db_open_error(path, needs_init, error))?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
if needs_init {
debug!("initializing fresh state.db schema");
schema::initialize(&conn)?;
} else {
debug!("running state.db schema migration check");
schema::migrate(&conn)?;
}
debug!(path = %path.display(), "state.db ready");
Ok(Self { conn })
}
pub(crate) fn open_with_migration(path: &Path, layout: &StateLayout) -> Result<Self> {
let db = Self::open(path)?;
let report = migration::import_legacy_json(&db, layout)?;
if report.imported_count > 0 {
eprintln!(
"ccd: migrated {} legacy state file(s) to state.db",
report.imported_count
);
}
Ok(db)
}
pub(crate) fn open_for_layout(layout: &StateLayout) -> Result<Self> {
Self::open_with_migration(&layout.state_db_path(), layout)
}
pub(crate) fn conn(&self) -> &Connection {
&self.conn
}
}
fn wrap_state_db_open_error(path: &Path, needs_init: bool, error: SqliteError) -> anyhow::Error {
let is_cannot_open =
matches!(&error, SqliteError::SqliteFailure(err, _) if err.code == ErrorCode::CannotOpen);
let base = anyhow::Error::new(error).context(format!("open state.db: {}", path.display()));
if needs_init && is_linked_worktree_clone_state_path(path) && is_cannot_open {
return base.context(format!(
"first-create failed for workspace-local state under the shared linked-worktree gitdir at {}; some sandboxed environments can read the checkout but cannot create new files under `.git/worktrees/...`. Re-run this command once with broader filesystem access or pre-create `state.db` outside the sandbox",
path.display()
));
}
base
}
fn is_linked_worktree_clone_state_path(path: &Path) -> bool {
let mut saw_dot_git = false;
for component in path.components() {
let Component::Normal(segment) = component else {
continue;
};
if !saw_dot_git {
if segment == ".git" {
saw_dot_git = true;
}
continue;
}
return segment == "worktrees";
}
false
}
#[cfg(test)]
mod integration_tests {
use std::fs;
use tempfile::tempdir;
use super::*;
use crate::profile::ProfileName;
use crate::state::escalation::{EscalationEntry, EscalationKind};
use crate::state::projection_metadata::ProjectionObservation;
use crate::state::runtime::{
RecoveryOrigin, RuntimeCheckpointState, RuntimeHandoffItem, RuntimeHandoffState,
RuntimeLifecycle, RuntimeWorkingBufferState,
};
use crate::state::session::SessionStateFile;
fn test_layout(temp: &Path) -> StateLayout {
StateLayout::new(
temp.join(".ccd"),
temp.join("repo/.git/ccd"),
ProfileName::new("main").expect("profile"),
)
}
fn setup_dirs(layout: &StateLayout) {
fs::create_dir_all(layout.clone_profile_root()).expect("clone profile root");
fs::create_dir_all(layout.clone_runtime_state_root()).expect("clone runtime root");
}
fn interactive_session(
schema_version: u32,
started_at_epoch_s: u64,
last_started_at_epoch_s: u64,
start_count: u32,
session_id: Option<&str>,
mode: crate::state::session::SessionMode,
) -> SessionStateFile {
SessionStateFile {
schema_version,
started_at_epoch_s,
last_started_at_epoch_s,
start_count,
session_id: session_id.map(str::to_owned),
mode,
owner_kind: crate::state::session::SessionOwnerKind::Interactive,
owner_id: session_id.map(|_| "interactive".to_owned()),
supervisor_id: None,
lease_ttl_secs: None,
last_heartbeat_at_epoch_s: None,
revision: u64::from(session_id.is_some()),
}
}
#[test]
fn full_lifecycle_across_all_tables() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
let db = StateDb::open_for_layout(&layout).unwrap();
let sess = interactive_session(
3,
1_000_000,
1_000_050,
2,
Some("ses_LIFECYCLE"),
crate::state::session::SessionMode::Research,
);
session::write(db.conn(), &sess).unwrap();
let loaded_sess = session::read(db.conn()).unwrap().expect("session");
assert_eq!(loaded_sess.session_id.as_deref(), Some("ses_LIFECYCLE"));
assert_eq!(loaded_sess.start_count, 2);
assert_eq!(
loaded_sess.mode,
crate::state::session::SessionMode::Research
);
let ho = RuntimeHandoffState {
title: "Integration test".to_owned(),
immediate_actions: vec![RuntimeHandoffItem {
text: "Run full lifecycle test.".to_owned(),
lifecycle: RuntimeLifecycle::Active,
}],
completed_state: vec![RuntimeHandoffItem {
text: "Schema created.".to_owned(),
lifecycle: RuntimeLifecycle::Active,
}],
..RuntimeHandoffState::default()
};
handoff::write(db.conn(), &ho).unwrap();
let loaded_ho = handoff::read(db.conn()).unwrap().expect("handoff");
assert_eq!(loaded_ho.title, "Integration test");
assert_eq!(loaded_ho.immediate_actions.len(), 1);
assert_eq!(loaded_ho.completed_state.len(), 1);
let esc = EscalationEntry {
id: "esc_INT_1".to_owned(),
kind: EscalationKind::Blocking,
reason: "integration test".to_owned(),
created_at_epoch_s: 1_000_000,
session_id: Some("ses_LIFECYCLE".to_owned()),
};
escalation::insert(db.conn(), &esc).unwrap();
assert!(escalation::has_blocking(db.conn()).unwrap());
let entries = escalation::list(db.conn()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, "esc_INT_1");
let cp = RuntimeCheckpointState {
origin: RecoveryOrigin::Compaction,
captured_at_epoch_s: 1_000_100,
session_started_at_epoch_s: 1_000_000,
summary: "Full lifecycle checkpoint".to_owned(),
immediate_actions: vec!["Next step".to_owned()],
key_files: vec!["src/db/mod.rs".to_owned()],
};
recovery::write_checkpoint(db.conn(), &cp).unwrap();
let loaded_cp = recovery::read_checkpoint(db.conn())
.unwrap()
.expect("checkpoint");
assert_eq!(loaded_cp.summary, "Full lifecycle checkpoint");
let buf = RuntimeWorkingBufferState {
origin: RecoveryOrigin::RiskyPause,
captured_at_epoch_s: 1_000_200,
session_started_at_epoch_s: 1_000_000,
summary_lines: vec!["Line 1".to_owned(), "Line 2".to_owned()],
};
recovery::write_working_buffer(db.conn(), &buf).unwrap();
let loaded_buf = recovery::read_working_buffer(db.conn())
.unwrap()
.expect("buffer");
assert_eq!(loaded_buf.summary_lines.len(), 2);
let obs = ProjectionObservation {
observed_at_epoch_s: 1_000_300,
source_fingerprint: "fp_lifecycle".to_owned(),
projection_digests: None,
tool_surface_fingerprint: Some("tool_v1".to_owned()),
session_id: None,
};
projection::record(db.conn(), &obs).unwrap();
let projections = projection::list(db.conn()).unwrap();
assert_eq!(projections.len(), 1);
assert_eq!(projections[0].source_fingerprint, "fp_lifecycle");
}
#[test]
fn open_for_layout_migrates_legacy_json() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.session_state_path(),
r#"{"schema_version":3,"started_at_epoch_s":5000,"last_started_at_epoch_s":6000,"start_count":4,"session_id":"ses_LEGACY"}"#,
).unwrap();
fs::write(
layout.clone_runtime_state_path(),
r#"{"schema_version":1,"handoff":{"title":"Legacy handoff","immediate_actions":[{"text":"Migrated action.","lifecycle":"active"}],"completed_state":[],"operational_guardrails":[],"key_files":[],"definition_of_done":[]}}"#,
).unwrap();
fs::write(
layout.escalation_state_path(),
r#"{"schema_version":1,"entries":[{"id":"esc_LEGACY","kind":"non_blocking","reason":"legacy escalation","created_at_epoch_s":7000}]}"#,
).unwrap();
let db = StateDb::open_for_layout(&layout).unwrap();
let sess = session::read(db.conn()).unwrap().expect("migrated session");
assert_eq!(sess.session_id.as_deref(), Some("ses_LEGACY"));
assert_eq!(sess.start_count, 4);
let ho = handoff::read(db.conn()).unwrap().expect("migrated handoff");
assert_eq!(ho.title, "Legacy handoff");
assert_eq!(ho.immediate_actions.len(), 1);
let entries = escalation::list(db.conn()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, "esc_LEGACY");
assert_eq!(entries[0].kind, EscalationKind::NonBlocking);
assert!(!layout.session_state_path().exists());
assert!(!layout.clone_runtime_state_path().exists());
assert!(!layout.escalation_state_path().exists());
let mut migrated = layout.session_state_path().as_os_str().to_owned();
migrated.push(".migrated");
assert!(Path::new(&migrated).exists());
}
#[test]
fn second_open_skips_migration() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
{
let db = StateDb::open_for_layout(&layout).unwrap();
let sess = interactive_session(
3,
1000,
2000,
1,
Some("ses_FIRST"),
crate::state::session::SessionMode::General,
);
session::write(db.conn(), &sess).unwrap();
}
let db = StateDb::open_for_layout(&layout).unwrap();
let loaded = session::read(db.conn())
.unwrap()
.expect("session persisted");
assert_eq!(loaded.session_id.as_deref(), Some("ses_FIRST"));
}
#[test]
fn deleted_state_db_regenerates_cleanly() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
{
let db = StateDb::open_for_layout(&layout).unwrap();
session::write(
db.conn(),
&interactive_session(
3,
1000,
2000,
1,
Some("ses_GONE"),
crate::state::session::SessionMode::General,
),
)
.unwrap();
}
fs::remove_file(layout.state_db_path()).unwrap();
let db = StateDb::open_for_layout(&layout).unwrap();
assert!(session::read(db.conn()).unwrap().is_none());
session::write(
db.conn(),
&interactive_session(
3,
3000,
4000,
1,
Some("ses_NEW"),
crate::state::session::SessionMode::General,
),
)
.unwrap();
let loaded = session::read(db.conn()).unwrap().expect("fresh session");
assert_eq!(loaded.session_id.as_deref(), Some("ses_NEW"));
}
#[test]
fn session_lifecycle_start_increment_stale_reset() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
let db = StateDb::open_for_layout(&layout).unwrap();
let now = 1_000_000u64;
let state1 = interactive_session(
3,
now,
now,
1,
Some("ses_START"),
crate::state::session::SessionMode::General,
);
session::write(db.conn(), &state1).unwrap();
let sid = session::load_session_id(db.conn(), now + 100).unwrap();
assert_eq!(sid.as_deref(), Some("ses_START"));
let stale_sid = session::load_session_id(db.conn(), now + 8 * 3600 + 1).unwrap();
assert!(stale_sid.is_none());
}
#[test]
fn escalation_lifecycle() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
let db = StateDb::open_for_layout(&layout).unwrap();
escalation::insert(
db.conn(),
&EscalationEntry {
id: "esc_B1".to_owned(),
kind: EscalationKind::Blocking,
reason: "critical".to_owned(),
created_at_epoch_s: 1000,
session_id: Some("ses_1".to_owned()),
},
)
.unwrap();
escalation::insert(
db.conn(),
&EscalationEntry {
id: "esc_NB1".to_owned(),
kind: EscalationKind::NonBlocking,
reason: "info".to_owned(),
created_at_epoch_s: 2000,
session_id: Some("ses_1".to_owned()),
},
)
.unwrap();
escalation::insert(
db.conn(),
&EscalationEntry {
id: "esc_NB2".to_owned(),
kind: EscalationKind::NonBlocking,
reason: "info 2".to_owned(),
created_at_epoch_s: 3000,
session_id: None,
},
)
.unwrap();
assert!(escalation::has_blocking(db.conn()).unwrap());
assert_eq!(escalation::list(db.conn()).unwrap().len(), 3);
let cleared = escalation::clear_non_blocking(db.conn()).unwrap();
assert_eq!(cleared, 2);
assert!(escalation::has_blocking(db.conn()).unwrap());
assert_eq!(escalation::list(db.conn()).unwrap().len(), 1);
escalation::clear_all(db.conn()).unwrap();
assert!(!escalation::has_blocking(db.conn()).unwrap());
assert!(escalation::list(db.conn()).unwrap().is_empty());
}
#[test]
fn handoff_preserves_inactive_items() {
let temp = tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
let db = StateDb::open_for_layout(&layout).unwrap();
let ho = RuntimeHandoffState {
title: "Mixed lifecycle".to_owned(),
immediate_actions: vec![
RuntimeHandoffItem {
text: "Active action.".to_owned(),
lifecycle: RuntimeLifecycle::Active,
},
RuntimeHandoffItem {
text: "Archived action.".to_owned(),
lifecycle: RuntimeLifecycle::Inactive,
},
],
..RuntimeHandoffState::default()
};
handoff::write(db.conn(), &ho).unwrap();
let loaded = handoff::read(db.conn()).unwrap().expect("handoff");
assert_eq!(loaded.immediate_actions.len(), 2);
assert_eq!(
loaded.immediate_actions[0].lifecycle,
RuntimeLifecycle::Active
);
assert_eq!(
loaded.immediate_actions[1].lifecycle,
RuntimeLifecycle::Inactive
);
}
#[test]
fn open_fails_cleanly_on_unwritable_path() {
let result = StateDb::open(Path::new("/proc/nonexistent/state.db"));
let err_msg = match result {
Err(e) => e.to_string(),
Ok(_) => panic!("expected error for unwritable path"),
};
assert!(
err_msg.contains("state.db"),
"error should reference state.db: {err_msg}"
);
}
}