use std::fs;
use std::path::Path;
use std::process::ExitCode;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use crate::db;
use crate::display_safe::display_safe;
use crate::handoff;
use crate::output::CommandReport;
use crate::paths::state::StateLayout;
use crate::profile::{self, ProfileName};
use crate::repo::marker as repo_marker;
use crate::state::machine_presence::{self, MachineRuntimeHealth, DEFAULT_IDLE_LEASE_SECS};
use crate::state::{
compiled as compiled_state, escalation as escalation_state, projection_metadata,
runtime as runtime_state, session_gates,
};
use crate::telemetry::cost as telemetry_cost;
/// Schema version stamped into every `SessionStateFile` built by this module.
const SESSION_SCHEMA_VERSION: u32 = 4;
/// Eight hours, in seconds.
/// NOTE(review): presumably the age after which a session counts as stale; the
/// consumer (`is_stale`) is outside this view — confirm.
pub(crate) const STALE_AFTER_SECS: u64 = 8 * 60 * 60;
/// NOTE(review): presumably the upper bound applied to heartbeat activity text
/// by `normalize_activity`, which is outside this view — confirm.
const MAX_ACTIVITY_CHARS: usize = 280;
/// Working mode recorded on a session row.
///
/// Serialized with snake_case variant names; `General` is the `Default` variant.
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SessionMode {
/// The default mode.
#[default]
General,
/// Research mode.
Research,
/// Implementation mode.
Implement,
}
impl SessionMode {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::General => "general",
Self::Research => "research",
Self::Implement => "implement",
}
}
pub(crate) fn from_str(value: &str) -> Option<Self> {
match value {
"general" => Some(Self::General),
"research" => Some(Self::Research),
"implement" => Some(Self::Implement),
_ => None,
}
}
}
/// Coarse lifecycle class of a session, derived from its owner kind.
///
/// Serialized with snake_case variant names.
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SessionLifecycle {
/// Session owned by `SessionOwnerKind::Interactive`.
Interactive,
/// Session owned by a runtime supervisor or runtime worker.
Autonomous,
}
/// Kind of actor that owns a session row.
///
/// Serialized with snake_case variant names; `Interactive` is the `Default`
/// variant, which is also what a row without an `owner_kind` field
/// deserializes to (see `#[serde(default)]` on `SessionStateFile::owner_kind`).
#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub(crate) enum SessionOwnerKind {
/// Interactive owner; maps to `SessionLifecycle::Interactive`.
#[default]
Interactive,
/// Runtime supervisor; maps to `SessionLifecycle::Autonomous`.
RuntimeSupervisor,
/// Runtime worker; maps to `SessionLifecycle::Autonomous`.
RuntimeWorker,
}
impl SessionOwnerKind {
pub(crate) fn as_str(self) -> &'static str {
match self {
Self::Interactive => "interactive",
Self::RuntimeSupervisor => "runtime_supervisor",
Self::RuntimeWorker => "runtime_worker",
}
}
pub(crate) fn lifecycle(self) -> SessionLifecycle {
match self {
Self::Interactive => SessionLifecycle::Interactive,
Self::RuntimeSupervisor | Self::RuntimeWorker => SessionLifecycle::Autonomous,
}
}
pub(crate) fn from_str(value: &str) -> Option<Self> {
match value {
"interactive" => Some(Self::Interactive),
"runtime_supervisor" => Some(Self::RuntimeSupervisor),
"runtime_worker" => Some(Self::RuntimeWorker),
_ => None,
}
}
}
/// Persisted session row: identity, ownership, lease and revision bookkeeping.
///
/// Optional fields are omitted from serialized output when `None`; `mode`,
/// `owner_kind` and `revision` fall back to their `Default` when absent on input.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub(crate) struct SessionStateFile {
/// Schema version of the row; new rows are written with `SESSION_SCHEMA_VERSION`.
pub(crate) schema_version: u32,
/// Epoch seconds at which this session was first started.
pub(crate) started_at_epoch_s: u64,
/// Epoch seconds of the most recent start (updated on every refresh).
pub(crate) last_started_at_epoch_s: u64,
/// Number of starts recorded for this session; `1` for a freshly minted session.
pub(crate) start_count: u32,
/// Session identifier; `start` treats a non-stale row without one as legacy state.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) session_id: Option<String>,
/// Working mode of the session.
#[serde(default)]
pub(crate) mode: SessionMode,
/// Kind of actor owning the session; determines the lifecycle class.
#[serde(default)]
pub(crate) owner_kind: SessionOwnerKind,
/// Identifier of the owning actor, when recorded.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) owner_id: Option<String>,
/// Identifier of the supervising actor, when recorded.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) supervisor_id: Option<String>,
/// Lease duration in seconds; set for autonomous sessions.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) lease_ttl_secs: Option<u64>,
/// Epoch seconds of the last heartbeat; set for autonomous sessions.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) last_heartbeat_at_epoch_s: Option<u64>,
/// Revision counter used as the expected value for compare-and-swap writes.
#[serde(default)]
pub(crate) revision: u64,
}
impl SessionStateFile {
    /// Lifecycle class of this session, derived from its owner kind.
    pub(crate) fn lifecycle(&self) -> SessionLifecycle {
        let owner_kind = self.owner_kind;
        owner_kind.lifecycle()
    }

    /// Epoch second at which the lease expires: last heartbeat plus TTL,
    /// saturating at `u64::MAX`.
    ///
    /// Returns `None` unless both the heartbeat timestamp and the TTL are set.
    pub(crate) fn lease_expires_at_epoch_s(&self) -> Option<u64> {
        let heartbeat_at = self.last_heartbeat_at_epoch_s?;
        let ttl_secs = self.lease_ttl_secs?;
        Some(heartbeat_at.saturating_add(ttl_secs))
    }
}
/// Activity note attached to a session by a heartbeat.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct SessionActivityState {
/// Session the activity belongs to.
pub(crate) session_id: String,
/// Actor that reported the activity.
pub(crate) actor_id: String,
/// Activity text reported by the actor.
pub(crate) current_activity: String,
/// Epoch seconds at which the activity was recorded.
pub(crate) updated_at_epoch_s: u64,
/// Revision of the session row written together with this activity.
pub(crate) session_revision: u64,
}
/// Caller-supplied options for `start`.
///
/// `resolve_start_ownership` rejects `owner_kind`, `actor_id`, `supervisor_id`
/// and `lease_ttl_secs` for interactive starts, and requires `actor_id` and
/// `lease_ttl_secs` for autonomous starts.
#[derive(Debug, Clone)]
pub(crate) struct SessionStartOptions {
/// Requested mode; when `None`, a refreshed session keeps its mode and a fresh one uses the default.
pub(crate) mode: Option<SessionMode>,
/// Requested lifecycle class.
pub(crate) lifecycle: SessionLifecycle,
/// Requested owner kind (autonomous only; defaults to `RuntimeWorker`).
pub(crate) owner_kind: Option<SessionOwnerKind>,
/// Identifier of the actor starting the session (autonomous only).
pub(crate) actor_id: Option<String>,
/// Identifier of the supervising actor (autonomous only).
pub(crate) supervisor_id: Option<String>,
/// Lease duration in seconds (autonomous only).
pub(crate) lease_ttl_secs: Option<u64>,
}
impl SessionStartOptions {
pub(crate) fn interactive(mode: Option<SessionMode>) -> Self {
Self {
mode,
lifecycle: SessionLifecycle::Interactive,
owner_kind: None,
actor_id: None,
supervisor_id: None,
lease_ttl_secs: None,
}
}
}
impl Default for SessionStartOptions {
fn default() -> Self {
Self::interactive(None)
}
}
/// Caller-supplied options for `clear`.
#[derive(Debug, Clone, Default)]
pub(crate) struct SessionClearOptions {
/// Actor requesting the clear; passed to `authorize_clear`.
pub(crate) actor_id: Option<String>,
/// Free-form reason, echoed back in the clear report.
pub(crate) reason: Option<String>,
}
/// Caller-supplied options for `heartbeat`.
#[derive(Debug, Clone)]
pub(crate) struct SessionHeartbeatOptions {
/// Actor sending the heartbeat; must match the session's recorded owner.
pub(crate) actor_id: String,
/// Optional activity text; passed through `normalize_activity` before being stored.
pub(crate) activity: Option<String>,
}
/// Caller-supplied options for `takeover`.
#[derive(Debug, Clone)]
pub(crate) struct SessionTakeoverOptions {
/// Actor adopting the stale session; must differ from the stale owner.
pub(crate) actor_id: String,
/// Supervisor to record; when `None`, the prior row's supervisor is kept.
pub(crate) supervisor_id: Option<String>,
/// Reason for the takeover, echoed back in the takeover report.
pub(crate) reason: String,
}
/// Serializable view of a session's lifecycle, ownership and lease state.
///
/// Every field is optional and omitted from serialized output when `None`.
#[derive(Debug, Clone, Serialize)]
pub(crate) struct SessionLifecycleProjection {
/// Lifecycle class of the session.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) lifecycle: Option<SessionLifecycle>,
/// Kind of actor owning the session.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) owner_kind: Option<SessionOwnerKind>,
/// Identifier of the owning actor.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) actor_id: Option<String>,
/// Identifier of the supervising actor.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) supervisor_id: Option<String>,
/// Lease duration in seconds.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) lease_ttl_secs: Option<u64>,
/// Epoch seconds of the last heartbeat.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) last_heartbeat_at_epoch_s: Option<u64>,
/// Epoch seconds at which the lease expires.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) lease_expires_at_epoch_s: Option<u64>,
/// Whether the session is considered stale.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) stale: Option<bool>,
/// Revision of the session row.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) revision: Option<u64>,
/// Most recently reported activity text.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) current_activity: Option<String>,
/// Epoch seconds at which the activity was last updated.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) activity_updated_at_epoch_s: Option<u64>,
/// NOTE(review): presumably flags disagreement about who owns the session;
/// the producer is outside this view — confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub(crate) ownership_conflict: Option<bool>,
}
impl SessionLifecycleProjection {
pub(crate) fn missing() -> Self {
Self {
lifecycle: None,
owner_kind: None,
actor_id: None,
supervisor_id: None,
lease_ttl_secs: None,
last_heartbeat_at_epoch_s: None,
lease_expires_at_epoch_s: None,
stale: None,
revision: None,
current_activity: None,
activity_updated_at_epoch_s: None,
ownership_conflict: None,
}
}
}
/// Report returned by `start`, serialized for machine output and rendered as
/// text through its `CommandReport` implementation.
#[derive(Serialize)]
pub struct SessionStateStartReport {
/// Command identifier; `start` sets it to `"session-state-start"`.
command: &'static str,
/// Success flag; `start` always sets it to `true`.
ok: bool,
/// Resolved profile name.
profile: String,
/// Display form of the state database path.
path: String,
/// Epoch seconds at which the session was first started.
started_at_epoch_s: u64,
/// Epoch seconds of the most recent start.
last_started_at_epoch_s: u64,
/// Number of starts recorded for the session.
start_count: u32,
/// Session identifier of the written row.
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
/// Mode of the written row.
mode: SessionMode,
/// Lifecycle class of the written row.
lifecycle: SessionLifecycle,
/// Owner kind of the written row.
owner_kind: SessionOwnerKind,
/// Owner identifier of the written row.
#[serde(skip_serializing_if = "Option::is_none")]
actor_id: Option<String>,
/// Supervisor identifier of the written row.
#[serde(skip_serializing_if = "Option::is_none")]
supervisor_id: Option<String>,
/// Lease duration in seconds.
#[serde(skip_serializing_if = "Option::is_none")]
lease_ttl_secs: Option<u64>,
/// Epoch seconds of the last heartbeat.
#[serde(skip_serializing_if = "Option::is_none")]
last_heartbeat_at_epoch_s: Option<u64>,
/// Epoch seconds at which the lease expires.
#[serde(skip_serializing_if = "Option::is_none")]
lease_expires_at_epoch_s: Option<u64>,
/// Revision of the written row.
revision: u64,
/// Activity text carried over for a refreshed autonomous session.
#[serde(skip_serializing_if = "Option::is_none")]
current_activity: Option<String>,
/// Epoch seconds at which that activity was last updated.
#[serde(skip_serializing_if = "Option::is_none")]
activity_updated_at_epoch_s: Option<u64>,
/// `true` when the written row has `start_count == 1` (a fresh session).
reset: bool,
/// Optional warning; has no `skip_serializing_if`, so `None` is still serialized.
warning: Option<String>,
}
impl SessionStateStartReport {
    /// One-line human-readable summary: mode, start count, first start time
    /// and whether the session was reset.
    pub(crate) fn summary_line(&self) -> String {
        let mode = self.mode.as_str();
        let start_count = self.start_count;
        let started_at = self.started_at_epoch_s;
        let reset = self.reset;
        format!(
            "mode={} start_count={} started_at={} reset={}",
            mode, start_count, started_at, reset
        )
    }

    /// Warning attached to the report, if any.
    pub(crate) fn warning_message(&self) -> Option<&str> {
        self.warning.as_deref()
    }

    /// Lifecycle class of the started session.
    pub(crate) fn lifecycle(&self) -> SessionLifecycle {
        self.lifecycle
    }

    /// Identifier of the started session, if one was recorded.
    pub(crate) fn session_id(&self) -> Option<&str> {
        self.session_id.as_deref()
    }

    /// Epoch seconds at which the session was first started.
    pub(crate) fn started_at_epoch_s(&self) -> u64 {
        self.started_at_epoch_s
    }

    /// Epoch seconds of the most recent start.
    pub(crate) fn last_started_at_epoch_s(&self) -> u64 {
        self.last_started_at_epoch_s
    }

    /// Number of starts recorded for the session.
    pub(crate) fn start_count(&self) -> u32 {
        self.start_count
    }
}
/// Report returned by `clear`.
#[derive(Serialize)]
pub struct SessionStateClearReport {
/// Command identifier; `clear` sets it to `"session-state-clear"`.
command: &'static str,
/// Success flag; `clear` always sets it to `true`.
ok: bool,
/// Resolved profile name.
profile: String,
/// Display form of the state database path.
path: String,
/// `true` when a session row was actually deleted.
cleared: bool,
/// Reason supplied by the caller, if any.
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
/// Lifecycle class of the session row seen before clearing.
#[serde(skip_serializing_if = "Option::is_none")]
lifecycle: Option<SessionLifecycle>,
/// Owner kind of the session row seen before clearing.
#[serde(skip_serializing_if = "Option::is_none")]
owner_kind: Option<SessionOwnerKind>,
/// Owner identifier of the session row seen before clearing.
#[serde(skip_serializing_if = "Option::is_none")]
actor_id: Option<String>,
/// Supervisor identifier of the session row seen before clearing.
#[serde(skip_serializing_if = "Option::is_none")]
supervisor_id: Option<String>,
/// `next_revision` of the session row seen before clearing.
#[serde(skip_serializing_if = "Option::is_none")]
cleared_revision: Option<u64>,
}
/// Report returned by `heartbeat`.
#[derive(Serialize)]
pub struct SessionStateHeartbeatReport {
/// Command identifier; `heartbeat` sets it to `"session-state-heartbeat"`.
command: &'static str,
/// Success flag; `heartbeat` always sets it to `true`.
ok: bool,
/// Resolved profile name.
profile: String,
/// Display form of the state database path.
path: String,
/// Identifier of the session that received the heartbeat.
session_id: String,
/// Actor that sent the heartbeat.
actor_id: String,
/// Activity text; `heartbeat` fills this with the same value as `current_activity`.
#[serde(skip_serializing_if = "Option::is_none")]
activity: Option<String>,
/// Activity text written by this heartbeat, or the stored one when none was supplied.
#[serde(skip_serializing_if = "Option::is_none")]
current_activity: Option<String>,
/// Epoch seconds recorded for this heartbeat.
last_heartbeat_at_epoch_s: u64,
/// Epoch seconds at which the refreshed lease expires.
lease_expires_at_epoch_s: u64,
/// Revision of the session row after the heartbeat.
revision: u64,
/// Epoch seconds at which the activity was last updated.
#[serde(skip_serializing_if = "Option::is_none")]
activity_updated_at_epoch_s: Option<u64>,
}
/// Report returned by `takeover`.
#[derive(Serialize)]
pub struct SessionStateTakeoverReport {
/// Command identifier; `takeover` sets it to `"session-state-takeover"`.
command: &'static str,
/// Success flag; `takeover` always sets it to `true`.
ok: bool,
/// Resolved profile name.
profile: String,
/// Display form of the state database path.
path: String,
/// Identifier of the stale session that was replaced.
prior_session_id: String,
/// Owner of the stale session (`"unknown"` when none was recorded).
prior_actor_id: String,
/// Identifier minted for the new session.
session_id: String,
/// Actor that took the session over.
actor_id: String,
/// Supervisor recorded on the new session.
#[serde(skip_serializing_if = "Option::is_none")]
supervisor_id: Option<String>,
/// Lease duration in seconds, inherited from the stale session.
lease_ttl_secs: u64,
/// Epoch seconds recorded as the first heartbeat of the new session.
last_heartbeat_at_epoch_s: u64,
/// Epoch seconds at which the new lease expires.
lease_expires_at_epoch_s: u64,
/// Revision of the session row after the takeover.
revision: u64,
/// Always `None` from `takeover`, which deletes the stored activity.
#[serde(skip_serializing_if = "Option::is_none")]
current_activity: Option<String>,
/// Always `None` from `takeover`.
#[serde(skip_serializing_if = "Option::is_none")]
activity_updated_at_epoch_s: Option<u64>,
/// Reason supplied by the caller.
reason: String,
}
impl CommandReport for SessionStateStartReport {
    /// A start report is only produced on success.
    fn exit_code(&self) -> ExitCode {
        ExitCode::SUCCESS
    }

    /// Prints the optional warning first, then the one-line session summary.
    fn render_text(&self) {
        if let Some(warning) = self.warning.as_deref() {
            println!("Warning: {warning}");
        }
        let summary = self.summary_line();
        println!("Session state: {}", summary);
    }
}
impl CommandReport for SessionStateClearReport {
    /// A clear report is only produced on success.
    fn exit_code(&self) -> ExitCode {
        ExitCode::SUCCESS
    }

    /// Prints whether a session row was deleted or none was found.
    fn render_text(&self) {
        if !self.cleared {
            println!("No session state found at {}", self.path);
            return;
        }
        println!("Cleared session state at {}", self.path);
    }
}
impl CommandReport for SessionStateHeartbeatReport {
fn exit_code(&self) -> ExitCode {
ExitCode::SUCCESS
}
fn render_text(&self) {
println!(
"Heartbeat recorded for `{}` on session `{}` (revision={}, lease_expires_at={}).",
self.actor_id, self.session_id, self.revision, self.lease_expires_at_epoch_s
);
}
}
impl CommandReport for SessionStateTakeoverReport {
fn exit_code(&self) -> ExitCode {
ExitCode::SUCCESS
}
fn render_text(&self) {
println!(
"Session takeover recorded: `{}` replaced `{}` on `{}` (revision={}).",
self.actor_id, self.prior_actor_id, self.path, self.revision
);
}
}
/// Starts a session for the resolved profile, or refreshes the active one,
/// and returns a report describing the row that was written.
///
/// Flow:
/// 1. Resolve profile, layout and requested ownership; open the state database.
/// 2. Outside the transaction, predict the next state from the current row. If
///    that prediction is a fresh session and the old row carries a session id
///    (and a `locality_id` was given), record the old session's cost on a
///    best-effort basis.
/// 3. Migrate legacy escalation state.
/// 4. Inside the lifecycle transaction, re-read the row, rebuild the next state,
///    derive the warning, clear escalations/gates on reset, write with a revision
///    check, and either load the stored activity or delete activity and
///    work-stream-decay rows.
/// 5. After commit, record machine presence (best-effort) and the projection
///    baseline.
///
/// # Errors
/// Fails when profile/layout resolution, option validation, any database
/// operation, or the clock read fails; when `build_next_state` rejects the
/// request; or when the revision check detects a concurrent mutation.
///
/// # Panics
/// Panics if the transaction commits without the body having stored an outcome.
pub fn start(
repo_root: &Path,
explicit_profile: Option<&str>,
locality_id: Option<&str>,
options: SessionStartOptions,
) -> Result<SessionStateStartReport> {
let profile = profile::resolve(explicit_profile)?;
let layout = required_layout(repo_root, &profile)?;
let db = open_session_db(&layout)?;
let path = layout.state_db_path();
let ownership = resolve_start_ownership(&options)?;
// Pre-transaction "hint" pass: only used to decide whether the old session's
// cost should be recorded before the row is replaced.
let existing_hint = db::session::read(db.conn())?;
let now_hint = now_epoch_s()?;
let hint_next = build_next_state(existing_hint.clone(), now_hint, &options, &ownership)?;
let hint_reset = hint_next.start_count == 1;
if hint_reset {
if let (Some(ref old_state), Some(locality_id)) = (&existing_hint, locality_id) {
if let Some(old_id) = &old_state.session_id {
record_session_cost_best_effort(
db.conn(),
repo_root,
&layout,
locality_id,
old_id,
"before stale reset",
);
}
}
}
escalation_state::migrate_legacy_on_shared_db(&db, &layout)
.context("failed to migrate legacy escalation state before start")?;
// Filled by the transaction body; read back only after a successful commit.
let mut tx_outcome: Option<(
SessionStateFile,
Option<SessionActivityState>,
bool,
Option<String>,
)> = None;
run_session_lifecycle_transaction(&db, |db| {
// Re-read under the transaction: the hint above may already be outdated.
let fresh = db::session::read(db.conn())?;
let now_in_tx = now_epoch_s()?;
let next_state = build_next_state(fresh.clone(), now_in_tx, &options, &ownership)?;
let reset = next_state.start_count == 1;
let stale_reset = reset && locality_id.is_some();
// No warning for a stale (or absent) row; otherwise distinguish an active
// session from a row that has no session id (legacy state).
let warning = fresh.as_ref().and_then(|state| {
if is_stale(state, now_in_tx) {
None
} else if state.session_id.is_some() {
match next_state.lifecycle() {
SessionLifecycle::Interactive => Some(format!(
"active session already exists for profile `{profile}` in this clone; incrementing start_count"
)),
SessionLifecycle::Autonomous => Some(format!(
"active autonomous session owned by `{}` already exists for profile `{profile}` in this workspace; refreshing lease and incrementing start_count",
display_safe(next_state.owner_id.as_deref().unwrap_or("unknown")),
)),
}
} else {
Some(format!(
"legacy session state detected for profile `{profile}`; starting a fresh session and refreshing session telemetry"
))
}
});
if stale_reset {
escalation_state::clear_non_blocking_on_shared_db(db)
.context("failed to clear non-blocking escalations on stale reset")?;
session_gates::clear_on_shared_db(db)
.context("failed to clear execution gates on stale reset")?;
}
// Compare-and-swap on the revision read in this transaction (0 when no row exists).
let expected_revision = fresh.as_ref().map(|state| state.revision).unwrap_or(0);
match db::session::write_if_revision_matches(db.conn(), &next_state, expected_revision)? {
db::session::ExclusiveWriteResult::Applied { .. } => {}
db::session::ExclusiveWriteResult::RevisionConflict { current_revision } => {
bail!(
"start detected concurrent session mutation: expected revision {expected_revision}, current is {current_revision}; retry `ccd session-state start`"
);
}
}
// Only a refreshed autonomous session keeps its stored activity; every other
// outcome deletes the activity and work-stream-decay rows.
let current_activity = if next_state.lifecycle() == SessionLifecycle::Autonomous && !reset {
load_session_activity_for_state(db.conn(), &next_state)?
} else {
db::session_activity::delete(db.conn())?;
db::work_stream_decay::delete(db.conn())?;
None
};
tx_outcome = Some((next_state, current_activity, reset, warning));
Ok(())
})?;
// NOTE(review): the handle is released before the follow-up writes below,
// presumably because they open the database themselves — confirm.
drop(db);
let (next_state, current_activity, reset, warning) =
tx_outcome.expect("lifecycle transaction committed successfully but produced no outcome");
let now = next_state.last_started_at_epoch_s;
record_machine_presence_best_effort(
&layout,
locality_id,
MachineRuntimeHealth::Ready,
next_state.lease_ttl_secs,
now,
"after session state update",
);
if let Some(locality_id) = locality_id {
if let Some(ref session_id) = next_state.session_id {
record_projection_baseline(repo_root, &layout, locality_id, session_id);
}
}
Ok(SessionStateStartReport {
command: "session-state-start",
ok: true,
profile: profile.to_string(),
path: path.display().to_string(),
started_at_epoch_s: next_state.started_at_epoch_s,
last_started_at_epoch_s: next_state.last_started_at_epoch_s,
start_count: next_state.start_count,
session_id: next_state.session_id.clone(),
mode: next_state.mode,
lifecycle: next_state.lifecycle(),
owner_kind: next_state.owner_kind,
actor_id: next_state.owner_id.clone(),
supervisor_id: next_state.supervisor_id.clone(),
lease_ttl_secs: next_state.lease_ttl_secs,
last_heartbeat_at_epoch_s: next_state.last_heartbeat_at_epoch_s,
lease_expires_at_epoch_s: next_state.lease_expires_at_epoch_s(),
revision: next_state.revision,
current_activity: current_activity
.as_ref()
.map(|value| value.current_activity.clone()),
activity_updated_at_epoch_s: current_activity
.as_ref()
.map(|value| value.updated_at_epoch_s),
reset,
warning,
})
}
/// Mints a new session identifier: a freshly generated ULID prefixed with `ses_`.
fn mint_session_id() -> String {
    let ulid = ulid::Ulid::new();
    format!("ses_{}", ulid)
}
/// Guard for a manually managed database transaction.
///
/// Rolls the transaction back when dropped unless `commit` succeeded first.
struct LifecycleTransactionGuard<'a> {
/// Database whose connection carries the open transaction.
db: &'a db::StateDb,
/// Set once `COMMIT` has succeeded; suppresses the rollback in `Drop`.
finalized: bool,
}
impl<'a> LifecycleTransactionGuard<'a> {
/// Opens the transaction with `BEGIN IMMEDIATE` and returns a guard for it.
///
/// # Errors
/// Returns the database error if the statement fails.
fn begin(db: &'a db::StateDb) -> Result<Self> {
db.conn().execute_batch("BEGIN IMMEDIATE")?;
Ok(Self {
db,
finalized: false,
})
}
/// Commits the transaction, consuming the guard.
///
/// # Errors
/// Returns the database error if `COMMIT` fails; the guard is then dropped
/// unfinalized, so `Drop` still attempts a rollback.
fn commit(mut self) -> Result<()> {
self.db.conn().execute_batch("COMMIT")?;
// Only mark the guard finalized after COMMIT has succeeded.
self.finalized = true;
Ok(())
}
}
impl Drop for LifecycleTransactionGuard<'_> {
/// Rolls the transaction back unless it was committed.
///
/// A rollback failure cannot be propagated from `drop`, so it is logged
/// through `tracing::error!` instead.
fn drop(&mut self) {
if self.finalized {
return;
}
if let Err(rollback_err) = self.db.conn().execute_batch("ROLLBACK") {
tracing::error!(
error = %rollback_err,
"lifecycle transaction ROLLBACK failed after body error or panic; \
connection may be inconsistent"
);
}
}
}
/// Runs `body` inside a lifecycle transaction on `db`.
///
/// The transaction is committed when `body` returns `Ok(())`. When `body`
/// fails (or panics), the guard is dropped without a commit, which rolls the
/// transaction back.
///
/// # Errors
/// Returns the error from opening the transaction, from `body`, or from the
/// commit.
fn run_session_lifecycle_transaction<F>(db: &db::StateDb, body: F) -> Result<()>
where
    F: FnOnce(&db::StateDb) -> Result<()>,
{
    let guard = LifecycleTransactionGuard::begin(db)?;
    match body(db) {
        Ok(()) => guard.commit(),
        // `guard` is dropped on the way out, issuing the ROLLBACK.
        Err(body_err) => Err(body_err),
    }
}
/// Computes the session row a start request should write, given the row that
/// currently exists (if any).
///
/// * Active row (not stale and carrying a session id): the session is
///   refreshed when the requested and existing lifecycles match (and, for
///   autonomous sessions, the actor matches the recorded owner); mixing
///   lifecycles or actors is rejected.
/// * Any other existing row (stale, or without a session id): a fresh session
///   is minted on top of the old revision, unless the old row is autonomous
///   and the request comes from a different actor or from an interactive start.
/// * No row: a fresh session is minted.
///
/// Ownership is compared against the stored `owner_id` itself, never against
/// the `"unknown"` display placeholder, so an actor that happens to be named
/// `unknown` cannot match a row that has no recorded owner.
///
/// # Errors
/// Returns an error describing the ownership conflict when the request must
/// not replace or refresh the existing row.
fn build_next_state(
    existing: Option<SessionStateFile>,
    now: u64,
    options: &SessionStartOptions,
    ownership: &ResolvedStartOwnership,
) -> Result<SessionStateFile> {
    match existing {
        Some(state) if !is_stale(&state, now) && state.session_id.is_some() => {
            match (ownership.owner_kind.lifecycle(), state.lifecycle()) {
                (SessionLifecycle::Interactive, SessionLifecycle::Interactive) => {
                    Ok(refreshed_session_state(
                        &state,
                        now,
                        options.mode.unwrap_or(state.mode),
                        ownership,
                    ))
                }
                (SessionLifecycle::Autonomous, SessionLifecycle::Autonomous) => {
                    // Compare against the stored owner, not the display
                    // placeholder, mirroring the stale branch below.
                    if state.owner_id.as_deref() != Some(ownership.owner_id.as_str()) {
                        bail!(
                            "active autonomous session is owned by `{}`; explicit takeover or clear is required before a different actor can start",
                            display_safe(state.owner_id.as_deref().unwrap_or("unknown")),
                        );
                    }
                    Ok(refreshed_session_state(
                        &state,
                        now,
                        options.mode.unwrap_or(state.mode),
                        ownership,
                    ))
                }
                (SessionLifecycle::Interactive, SessionLifecycle::Autonomous) => bail!(
                    "active autonomous session is owned by `{}`; clear it or wait for it to become stale before switching back to interactive work",
                    display_safe(state.owner_id.as_deref().unwrap_or("unknown")),
                ),
                (SessionLifecycle::Autonomous, SessionLifecycle::Interactive) => bail!(
                    "active interactive session already exists in this workspace; clear it or wait for it to become stale before autonomous start"
                ),
            }
        }
        Some(state) => {
            // Stale row, or a row without a session id.
            if ownership.owner_kind.lifecycle() == SessionLifecycle::Autonomous
                && state.lifecycle() == SessionLifecycle::Autonomous
                && state.owner_id.as_deref() != Some(ownership.owner_id.as_str())
            {
                bail!(
                    "stale autonomous session is owned by `{}`; use `ccd session-state takeover` to adopt it",
                    display_safe(state.owner_id.as_deref().unwrap_or("unknown")),
                );
            }
            if ownership.owner_kind.lifecycle() == SessionLifecycle::Interactive
                && state.lifecycle() == SessionLifecycle::Autonomous
            {
                bail!(
                    "stale autonomous session is owned by `{}`; clear it or use explicit takeover before interactive start",
                    display_safe(state.owner_id.as_deref().unwrap_or("unknown")),
                );
            }
            // The fresh session continues the old row's revision sequence and
            // inherits its supervisor unless the request names one.
            Ok(fresh_session_state(
                now,
                options.mode.unwrap_or_default(),
                ownership,
                Some(next_revision(&state)),
                options
                    .supervisor_id
                    .clone()
                    .or_else(|| state.supervisor_id.clone()),
            ))
        }
        None => Ok(fresh_session_state(
            now,
            options.mode.unwrap_or_default(),
            ownership,
            None,
            options.supervisor_id.clone(),
        )),
    }
}
/// Prints a warning to stderr that persisting session cost failed.
///
/// `context` is interpolated after "session cost"; the error is rendered with
/// the alternate (`{:#}`) format.
fn warn_session_cost_persistence(context: &str, err: &anyhow::Error) {
eprintln!("warning: failed to persist session cost {context}: {err:#}");
}
/// Records the cost of `session_id` without ever failing the caller.
///
/// The continuity focus is derived from the runtime state when it can be
/// loaded; a load failure just means no focus is passed. A failure to record
/// the cost itself is reported as a warning on stderr, tagged with `context`.
fn record_session_cost_best_effort(
    conn: &rusqlite::Connection,
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    session_id: &str,
    context: &str,
) {
    let focus = match runtime_state::load_runtime_state(repo_root, layout, locality_id) {
        Ok(runtime) => {
            // Only immediate actions whose lifecycle is still active count.
            let mut continuity_actions = Vec::new();
            for item in runtime.state.handoff.immediate_actions.iter() {
                if item.lifecycle.is_active() {
                    continuity_actions.push(item.text.clone());
                }
            }
            telemetry_cost::continuity_target(
                runtime.execution_gates.view.attention_anchor.as_ref(),
                &runtime.state.handoff.title,
                &continuity_actions,
            )
        }
        Err(_) => None,
    };

    let outcome = telemetry_cost::record_session_cost(
        conn,
        repo_root,
        layout,
        locality_id,
        session_id,
        focus.as_ref(),
    );
    if let Err(err) = outcome {
        warn_session_cost_persistence(context, &err);
    }
}
/// Prints a warning to stderr that recording machine presence failed.
///
/// `context` is interpolated after "machine presence"; the error is rendered
/// with the alternate (`{:#}`) format.
fn warn_machine_presence_persistence(context: &str, err: &anyhow::Error) {
eprintln!("warning: failed to record machine presence {context}: {err:#}");
}
/// Records machine presence without ever failing the caller.
///
/// Any error from `machine_presence::record_machine_presence` is downgraded to
/// a warning on stderr, tagged with `context`.
fn record_machine_presence_best_effort(
    layout: &StateLayout,
    locality_id: Option<&str>,
    runtime_health: MachineRuntimeHealth,
    lease_ttl_secs: Option<u64>,
    observed_at_epoch_s: u64,
    context: &str,
) {
    let failure = machine_presence::record_machine_presence(
        layout,
        locality_id,
        runtime_health,
        lease_ttl_secs,
        observed_at_epoch_s,
    )
    .err();
    if let Some(err) = failure {
        warn_machine_presence_persistence(context, &err);
    }
}
/// Loads the current session id for `layout`.
///
/// Returns `Ok(None)` when `try_open_session_db` yields no database.
pub(crate) fn load_session_id(layout: &StateLayout) -> Result<Option<String>> {
    let maybe_db = try_open_session_db(layout)?;
    load_session_id_from_db(maybe_db.as_ref())
}
/// Loads the current session id from an optional, already opened database.
///
/// Returns `Ok(None)` without reading the clock when `db` is `None`; otherwise
/// delegates to `db::session::load_session_id` with the current epoch time.
pub(crate) fn load_session_id_from_db(db: Option<&db::StateDb>) -> Result<Option<String>> {
    match db {
        None => Ok(None),
        Some(state_db) => {
            let now = now_epoch_s()?;
            db::session::load_session_id(state_db.conn(), now)
        }
    }
}
/// Loads the current session id through a database handle shared with the
/// caller, running `migrate_session_json` on it first.
///
/// # Errors
/// Propagates a migration failure before any read is attempted.
pub(crate) fn load_session_id_from_shared_db(
    shared_db: &db::StateDb,
    layout: &StateLayout,
) -> Result<Option<String>> {
    migrate_session_json(shared_db, layout)?;
    let migrated_db = Some(shared_db);
    load_session_id_from_db(migrated_db)
}
/// Clears the session row for the resolved profile together with escalation
/// and execution-gate state, and returns a report describing what was cleared.
///
/// When a state database can be opened, the clear is authorized against the
/// current row, the session's cost is recorded (best-effort), legacy escalation
/// state is migrated, and the deletion runs inside the lifecycle transaction
/// after re-authorizing and verifying that the row's identity (session id and
/// revision) did not change in between. Without a database, only the
/// escalation and gate state are cleared. Machine presence is then recorded as
/// idle on a best-effort basis.
///
/// # Errors
/// Fails when profile/layout resolution, authorization, a database operation,
/// the identity check, or the final clock read fails.
pub fn clear(
repo_root: &Path,
explicit_profile: Option<&str>,
locality_id: Option<&str>,
options: SessionClearOptions,
) -> Result<SessionStateClearReport> {
let profile = profile::resolve(explicit_profile)?;
let layout = required_layout(repo_root, &profile)?;
let path = layout.state_db_path();
// Details of the row seen before clearing, echoed back in the report.
let mut cleared_lifecycle = None;
let mut cleared_owner_kind = None;
let mut cleared_actor_id = None;
let mut cleared_supervisor_id = None;
let mut cleared_revision = None;
let cleared = if let Some(db) = try_open_session_db(&layout)? {
let existing = db::session::read(db.conn())?;
// A clock failure is tolerated here: `authorize_clear` receives `None`.
let now = now_epoch_s().ok();
authorize_clear(existing.as_ref(), options.actor_id.as_deref(), now)?;
if let Some(state) = existing.as_ref() {
cleared_lifecycle = Some(state.lifecycle());
cleared_owner_kind = Some(state.owner_kind);
cleared_actor_id = state.owner_id.clone();
cleared_supervisor_id = state.supervisor_id.clone();
cleared_revision = Some(next_revision(state));
}
if let (Some(state), Some(locality_id)) = (&existing, locality_id) {
if let Some(session_id) = &state.session_id {
record_session_cost_best_effort(
db.conn(),
repo_root,
&layout,
locality_id,
session_id,
"before session clear",
);
}
}
escalation_state::migrate_legacy_on_shared_db(&db, &layout)
.context("failed to migrate legacy escalation state before session close-out")?;
// Identity of the row that was authorized above; re-checked under the transaction.
let pre_tx_identity = existing
.as_ref()
.map(|state| (state.session_id.clone(), state.revision));
let mut cleared_this_tx = false;
run_session_lifecycle_transaction(&db, |db| {
let fresh = db::session::read(db.conn())?;
let now_in_tx = now_epoch_s().ok();
// Re-authorize against the row as it is now; a failure here means the row
// changed after the first check, so the error says so.
authorize_clear(fresh.as_ref(), options.actor_id.as_deref(), now_in_tx).map_err(
|err| {
anyhow::anyhow!(
"{err}; the session row was modified between the pre-clear authorization check and the transaction lock (most likely a concurrent `ccd session-state takeover`)"
)
},
)?;
let fresh_identity = fresh
.as_ref()
.map(|state| (state.session_id.clone(), state.revision));
if fresh_identity != pre_tx_identity {
bail!(
"session identity changed between the pre-clear authorization check and the transaction lock (most likely a concurrent `ccd session-state takeover` or `session-state start`); retry `ccd session-state clear`"
);
}
if fresh.is_some() {
db::session_activity::delete(db.conn())?;
db::work_stream_decay::delete(db.conn())?;
db::session::delete(db.conn())?;
cleared_this_tx = true;
}
// Escalation and gate state are cleared even when no session row existed.
escalation_state::clear_all_on_shared_db(db)
.context("failed to clear escalation state on session close-out")?;
session_gates::clear_on_shared_db(db)
.context("failed to clear execution gates on session close-out")?;
Ok(())
})?;
cleared_this_tx
} else {
// No database to open: only the escalation and gate state are cleared.
escalation_state::clear_all_for_session_boundary(&layout)
.context("failed to clear escalation state on session close-out")?;
session_gates::clear_for_session_boundary(&layout)
.context("failed to clear execution gates on session close-out")?;
false
};
let cleared_at_epoch_s = now_epoch_s()?;
record_machine_presence_best_effort(
&layout,
locality_id,
MachineRuntimeHealth::Idle,
Some(DEFAULT_IDLE_LEASE_SECS),
cleared_at_epoch_s,
"after session clear",
);
Ok(SessionStateClearReport {
command: "session-state-clear",
ok: true,
profile: profile.to_string(),
path: path.display().to_string(),
cleared,
reason: options.reason,
lifecycle: cleared_lifecycle,
owner_kind: cleared_owner_kind,
actor_id: cleared_actor_id,
supervisor_id: cleared_supervisor_id,
cleared_revision,
})
}
/// Records a heartbeat for the active autonomous session owned by
/// `options.actor_id`, refreshing its lease and optionally its activity text.
///
/// The request is validated twice: once before the transaction (session exists,
/// is autonomous, is owned by the actor, has a session id, is not stale) and
/// again inside the lifecycle transaction against a freshly read row, where the
/// session id must also still match. The row is then written with a revision
/// check, the activity (if supplied) is stored, and machine presence is recorded
/// on a best-effort basis.
///
/// # Errors
/// Fails when no session exists, the session is not autonomous, the actor is
/// not the owner, the session id is missing, the lease is stale, the activity
/// cannot be normalized, ownership changed or the row disappeared before the
/// transaction, a concurrent mutation is detected, or any database or clock
/// operation fails.
///
/// # Panics
/// Panics if the transaction commits without the body having stored an outcome.
pub fn heartbeat(
repo_root: &Path,
explicit_profile: Option<&str>,
options: SessionHeartbeatOptions,
) -> Result<SessionStateHeartbeatReport> {
let profile = profile::resolve(explicit_profile)?;
let layout = required_layout(repo_root, &profile)?;
let path = layout.state_db_path();
let db = open_session_db(&layout)?;
let now = now_epoch_s()?;
// The locality comes from the repo marker, when one exists.
let locality_id = repo_marker::load(repo_root)?.map(|marker| marker.locality_id);
let Some(existing) = db::session::read(db.conn())? else {
bail!(
"no active session telemetry is available; run `ccd session-state start --path .` first"
);
};
if existing.lifecycle() != SessionLifecycle::Autonomous {
bail!("heartbeat requires an active autonomous session");
}
if existing.owner_id.as_deref() != Some(options.actor_id.as_str()) {
bail!(
"heartbeat requires actor `{}` to match the active autonomous owner `{}`",
display_safe(&options.actor_id),
display_safe(existing.owner_id.as_deref().unwrap_or("unknown")),
);
}
let session_id = existing
.session_id
.clone()
.ok_or_else(|| anyhow::anyhow!("active autonomous session is missing session_id"))?;
if is_stale(&existing, now) {
bail!(
"autonomous session lease for `{}` is stale; restart with the same actor or use takeover deliberately",
display_safe(&options.actor_id),
);
}
let normalized_activity = normalize_activity(options.activity)?;
// Filled by the transaction body; read back only after a successful commit.
let mut tx_outcome: Option<(SessionStateFile, Option<SessionActivityState>, u64)> = None;
run_session_lifecycle_transaction(&db, |db| {
// Re-validate against the row as it is inside the transaction.
let Some(fresh) = db::session::read(db.conn())? else {
bail!(
"session row disappeared between the pre-heartbeat check and the transaction; another caller cleared this workspace"
);
};
if fresh.lifecycle() != SessionLifecycle::Autonomous
|| fresh.owner_id.as_deref() != Some(options.actor_id.as_str())
|| fresh.session_id.as_deref() != Some(session_id.as_str())
{
bail!(
"session ownership changed between the pre-heartbeat check and the transaction; actor `{}` is no longer the autonomous owner",
display_safe(&options.actor_id),
);
}
let now_in_tx = now_epoch_s()?;
if is_stale(&fresh, now_in_tx) {
bail!(
"autonomous session lease for `{}` became stale while waiting for the heartbeat transaction lock; restart with the same actor or use takeover deliberately",
display_safe(&options.actor_id),
);
}
let expected_revision = fresh.revision;
// Only the heartbeat timestamp and the revision change on the session row.
let mut next = fresh.clone();
next.last_heartbeat_at_epoch_s = Some(now_in_tx);
next.revision = next_revision(&fresh);
let activity_to_write =
normalized_activity
.clone()
.map(|current_activity| SessionActivityState {
session_id: session_id.clone(),
actor_id: options.actor_id.clone(),
current_activity,
updated_at_epoch_s: now_in_tx,
session_revision: next.revision,
});
match db::session::write_if_revision_matches(db.conn(), &next, expected_revision)? {
db::session::ExclusiveWriteResult::Applied { .. } => {}
db::session::ExclusiveWriteResult::RevisionConflict { current_revision } => {
bail!(
"heartbeat detected concurrent session mutation: expected revision {expected_revision}, current is {current_revision}; retry `ccd session-state heartbeat`"
);
}
}
if let Some(ref activity) = activity_to_write {
db::session_activity::write(db.conn(), activity)?;
}
tx_outcome = Some((next, activity_to_write, now_in_tx));
Ok(())
})?;
// From here on `now` is the timestamp taken inside the transaction.
let (next, activity_from_tx, now) =
tx_outcome.expect("lifecycle transaction committed successfully but produced no outcome");
// Without new activity text, report whatever activity is already stored.
let activity = match activity_from_tx {
Some(current) => Some(current),
None => load_session_activity_for_state(db.conn(), &next)?,
};
record_machine_presence_best_effort(
&layout,
locality_id.as_deref(),
MachineRuntimeHealth::Ready,
next.lease_ttl_secs,
now,
"after persisting session heartbeat",
);
Ok(SessionStateHeartbeatReport {
command: "session-state-heartbeat",
ok: true,
profile: profile.to_string(),
path: path.display().to_string(),
session_id,
actor_id: options.actor_id,
activity: activity
.as_ref()
.map(|value| value.current_activity.clone()),
last_heartbeat_at_epoch_s: now,
lease_expires_at_epoch_s: next.lease_expires_at_epoch_s().unwrap_or(now),
revision: next.revision,
current_activity: activity
.as_ref()
.map(|value| value.current_activity.clone()),
activity_updated_at_epoch_s: activity.as_ref().map(|value| value.updated_at_epoch_s),
})
}
pub fn takeover(
repo_root: &Path,
explicit_profile: Option<&str>,
locality_id: Option<&str>,
options: SessionTakeoverOptions,
) -> Result<SessionStateTakeoverReport> {
let profile = profile::resolve(explicit_profile)?;
let layout = required_layout(repo_root, &profile)?;
let db = open_session_db(&layout)?;
let path = layout.state_db_path();
let now = now_epoch_s()?;
let Some(existing) = db::session::read(db.conn())? else {
bail!("no stale autonomous session exists to take over");
};
if existing.lifecycle() != SessionLifecycle::Autonomous {
bail!("takeover only applies to stale autonomous sessions");
}
if !is_stale(&existing, now) {
bail!("takeover is only allowed after the active autonomous lease becomes stale");
}
let prior_actor_id = existing
.owner_id
.clone()
.unwrap_or_else(|| "unknown".to_owned());
if prior_actor_id == options.actor_id {
bail!(
"takeover requires a different actor than the stale owner; use `ccd session-state start` to recover the same actor lease"
);
}
let lease_ttl_secs = existing.lease_ttl_secs.ok_or_else(|| {
anyhow::anyhow!(
"stale autonomous session is missing lease_ttl_secs and cannot be taken over safely"
)
})?;
let prior_session_id = existing
.session_id
.clone()
.ok_or_else(|| anyhow::anyhow!("stale autonomous session is missing session_id"))?;
if let Some(locality_id) = locality_id {
record_session_cost_best_effort(
db.conn(),
repo_root,
&layout,
locality_id,
&prior_session_id,
"before session takeover",
);
}
let next_state = SessionStateFile {
schema_version: SESSION_SCHEMA_VERSION,
started_at_epoch_s: now,
last_started_at_epoch_s: now,
start_count: 1,
session_id: Some(mint_session_id()),
mode: existing.mode,
owner_kind: SessionOwnerKind::RuntimeWorker,
owner_id: Some(options.actor_id.clone()),
supervisor_id: options
.supervisor_id
.clone()
.or_else(|| existing.supervisor_id.clone()),
lease_ttl_secs: Some(lease_ttl_secs),
last_heartbeat_at_epoch_s: Some(now),
revision: next_revision(&existing),
};
db::session::write(db.conn(), &next_state)?;
db::session_activity::delete(db.conn())?;
record_machine_presence_best_effort(
&layout,
locality_id,
MachineRuntimeHealth::Ready,
next_state.lease_ttl_secs,
now,
"during session takeover",
);
Ok(SessionStateTakeoverReport {
command: "session-state-takeover",
ok: true,
profile: profile.to_string(),
path: path.display().to_string(),
prior_session_id,
prior_actor_id,
session_id: next_state.session_id.clone().unwrap_or_default(),
actor_id: options.actor_id,
supervisor_id: next_state.supervisor_id.clone(),
lease_ttl_secs,
last_heartbeat_at_epoch_s: now,
lease_expires_at_epoch_s: next_state.lease_expires_at_epoch_s().unwrap_or(now),
revision: next_state.revision,
current_activity: None,
activity_updated_at_epoch_s: None,
reason: options.reason,
})
}
fn resolve_start_ownership(options: &SessionStartOptions) -> Result<ResolvedStartOwnership> {
match options.lifecycle {
SessionLifecycle::Interactive => {
if options.owner_kind.is_some() {
bail!("`--owner-kind` is only valid with `--lifecycle autonomous`");
}
if options.actor_id.is_some() {
bail!("`--actor-id` is only valid with `--lifecycle autonomous`");
}
if options.supervisor_id.is_some() {
bail!("`--supervisor-id` is only valid with `--lifecycle autonomous`");
}
if options.lease_ttl_secs.is_some() {
bail!("`--lease-seconds` is only valid with `--lifecycle autonomous`");
}
Ok(ResolvedStartOwnership {
owner_kind: SessionOwnerKind::Interactive,
owner_id: "interactive".to_owned(),
supervisor_id: None,
lease_ttl_secs: None,
})
}
SessionLifecycle::Autonomous => {
let actor_id = options.actor_id.clone().ok_or_else(|| {
anyhow::anyhow!("`--actor-id` is required with `--lifecycle autonomous`")
})?;
let lease_ttl_secs = options.lease_ttl_secs.ok_or_else(|| {
anyhow::anyhow!("`--lease-seconds` is required with `--lifecycle autonomous`")
})?;
let owner_kind = match options
.owner_kind
.unwrap_or(SessionOwnerKind::RuntimeWorker)
{
SessionOwnerKind::RuntimeWorker => SessionOwnerKind::RuntimeWorker,
SessionOwnerKind::RuntimeSupervisor => SessionOwnerKind::RuntimeSupervisor,
SessionOwnerKind::Interactive => {
bail!("`interactive` is not a valid `--owner-kind` for autonomous sessions")
}
};
Ok(ResolvedStartOwnership {
owner_kind,
owner_id: actor_id,
supervisor_id: options.supervisor_id.clone(),
lease_ttl_secs: Some(lease_ttl_secs),
})
}
}
}
/// Builds the next state for a restart that continues the `existing` session.
///
/// The original start time and session id are carried over; the start counter and the
/// revision are bumped; ownership is replaced with `ownership`. The supervisor id falls
/// back to the existing one when `ownership` does not provide one. A heartbeat of `now`
/// is recorded only when the new owner kind has the autonomous lifecycle.
fn refreshed_session_state(
    existing: &SessionStateFile,
    now: u64,
    mode: SessionMode,
    ownership: &ResolvedStartOwnership,
) -> SessionStateFile {
    let heartbeat = match ownership.owner_kind.lifecycle() {
        SessionLifecycle::Autonomous => Some(now),
        SessionLifecycle::Interactive => None,
    };
    // Prefer the supervisor from the new ownership, else keep the previous one.
    let supervisor_id = ownership
        .supervisor_id
        .as_ref()
        .or(existing.supervisor_id.as_ref())
        .cloned();
    let bumped_start_count = existing.start_count.saturating_add(1);

    SessionStateFile {
        schema_version: SESSION_SCHEMA_VERSION,
        session_id: existing.session_id.clone(),
        started_at_epoch_s: existing.started_at_epoch_s,
        last_started_at_epoch_s: now,
        start_count: bumped_start_count,
        mode,
        owner_kind: ownership.owner_kind,
        owner_id: Some(ownership.owner_id.clone()),
        supervisor_id,
        lease_ttl_secs: ownership.lease_ttl_secs,
        last_heartbeat_at_epoch_s: heartbeat,
        revision: next_revision(existing),
    }
}
/// Builds the state for a brand-new session starting at `now`.
///
/// A new session id is minted, the start counter is 1, and both start timestamps are
/// `now`. `next_revision` supplies the revision to store and defaults to 1 when absent.
/// `supervisor_id` is stored as given. A heartbeat of `now` is recorded only when the
/// owner kind has the autonomous lifecycle.
fn fresh_session_state(
    now: u64,
    mode: SessionMode,
    ownership: &ResolvedStartOwnership,
    next_revision: Option<u64>,
    supervisor_id: Option<String>,
) -> SessionStateFile {
    let heartbeat = match ownership.owner_kind.lifecycle() {
        SessionLifecycle::Autonomous => Some(now),
        SessionLifecycle::Interactive => None,
    };
    let revision = next_revision.unwrap_or(1);
    let minted_id = mint_session_id();

    SessionStateFile {
        schema_version: SESSION_SCHEMA_VERSION,
        session_id: Some(minted_id),
        started_at_epoch_s: now,
        last_started_at_epoch_s: now,
        start_count: 1,
        mode,
        owner_kind: ownership.owner_kind,
        owner_id: Some(ownership.owner_id.clone()),
        supervisor_id,
        lease_ttl_secs: ownership.lease_ttl_secs,
        last_heartbeat_at_epoch_s: heartbeat,
        revision,
    }
}
/// Returns the revision that follows `existing.revision`: one more than the current
/// value, saturating at `u64::MAX`, and never less than 1.
fn next_revision(existing: &SessionStateFile) -> u64 {
    let bumped = existing.revision.saturating_add(1);
    std::cmp::max(bumped, 1)
}
/// Decides whether `actor_id` may clear the `existing` session.
///
/// Clearing is allowed when there is no session, when the session is interactive, or
/// when `actor_id` equals the session's owner id or supervisor id. Any other caller is
/// refused.
///
/// `now_epoch_s` only affects the wording of the refusal: when a clock value is
/// available and the session is stale and has both a session id and a lease TTL, the
/// message points the caller at `ccd session-state takeover`. Otherwise the generic
/// "active autonomous session" refusal is used.
///
/// # Errors
///
/// Returns an error when an autonomous session is cleared without an actor id or by an
/// actor that is neither its owner nor its supervisor.
fn authorize_clear(
    existing: Option<&SessionStateFile>,
    actor_id: Option<&str>,
    now_epoch_s: Option<u64>,
) -> Result<()> {
    // Missing and interactive sessions need no authorization.
    let session = match existing {
        Some(session) if session.lifecycle() != SessionLifecycle::Interactive => session,
        _ => return Ok(()),
    };
    let caller = match actor_id {
        Some(caller) => caller,
        None => bail!(
            "clearing an autonomous session requires `--actor-id` matching the owner or supervisor"
        ),
    };

    let authorized_ids = [
        session.owner_id.as_deref(),
        session.supervisor_id.as_deref(),
    ];
    if authorized_ids.contains(&Some(caller)) {
        return Ok(());
    }

    // Only suggest takeover when the clock is known and the stale row carries the
    // session id and lease TTL checked here.
    let suggest_takeover = now_epoch_s.is_some_and(|now| {
        is_stale(session, now) && session.session_id.is_some() && session.lease_ttl_secs.is_some()
    });
    let owner = session.owner_id.as_deref().unwrap_or("unknown");

    if suggest_takeover {
        bail!(
            "actor `{}` is not authorized to clear the stale autonomous session owned by `{}`; use `ccd session-state takeover --actor-id <actor-id> --reason <reason>` to reclaim the lease for a different actor, or retry `clear` with the owner or supervisor actor-id",
            display_safe(caller),
            display_safe(owner),
        );
    }
    bail!(
        "actor `{}` is not authorized to clear the active autonomous session owned by `{}`",
        display_safe(caller),
        display_safe(owner),
    )
}
/// Ownership settings for a session start, as produced by `resolve_start_ownership`
/// after the start options have been validated against the lifecycle.
#[derive(Debug, Clone)]
struct ResolvedStartOwnership {
/// Kind of owner that will hold the session.
owner_kind: SessionOwnerKind,
/// Owner identifier: the literal `"interactive"` for interactive starts, otherwise
/// the supplied actor id.
owner_id: String,
/// Supervisor identifier taken from the start options; always `None` for
/// interactive starts.
supervisor_id: Option<String>,
/// Lease TTL in seconds; `None` for interactive starts, `Some` for autonomous ones.
lease_ttl_secs: Option<u64>,
}
/// Projects `state` into the lifecycle view reported to callers.
///
/// * `stale` is computed with [`is_stale`] at `now_epoch_s`.
/// * `ownership_conflict` is `None` when no `caller_actor_id` is given; otherwise it is
///   `true` only for a non-stale autonomous session whose owner id differs from the
///   caller.
/// * Activity fields are populated only when `activity` is present and
///   `activity_matches_state` accepts it for `state`.
pub(crate) fn lifecycle_projection(
    state: &SessionStateFile,
    now_epoch_s: u64,
    caller_actor_id: Option<&str>,
    activity: Option<&SessionActivityState>,
) -> SessionLifecycleProjection {
    let stale = is_stale(state, now_epoch_s);

    let ownership_conflict = match caller_actor_id {
        None => None,
        Some(caller) => {
            let autonomous = state.lifecycle() == SessionLifecycle::Autonomous;
            let owned_by_other = state.owner_id.as_deref() != Some(caller);
            Some(autonomous && owned_by_other && !stale)
        }
    };

    // Drop activity that does not belong to this session/owner pair.
    let visible_activity = match activity {
        Some(candidate) if activity_matches_state(state, candidate) => Some(candidate),
        _ => None,
    };

    SessionLifecycleProjection {
        lifecycle: Some(state.lifecycle()),
        owner_kind: Some(state.owner_kind),
        actor_id: state.owner_id.clone(),
        supervisor_id: state.supervisor_id.clone(),
        lease_ttl_secs: state.lease_ttl_secs,
        last_heartbeat_at_epoch_s: state.last_heartbeat_at_epoch_s,
        lease_expires_at_epoch_s: state.lease_expires_at_epoch_s(),
        stale: Some(stale),
        revision: Some(state.revision),
        current_activity: visible_activity.map(|entry| entry.current_activity.clone()),
        activity_updated_at_epoch_s: visible_activity.map(|entry| entry.updated_at_epoch_s),
        ownership_conflict,
    }
}
/// Loads the stored session state for `layout`, opening the state database through
/// `try_open_session_db` and reading it with [`load_from_db`].
pub(crate) fn load_for_layout(layout: &StateLayout) -> Result<Option<SessionStateFile>> {
    let handle = try_open_session_db(layout)?;
    load_from_db(handle.as_ref())
}
/// Reads the session state from `db`, or returns `Ok(None)` when no database handle
/// is supplied.
pub(crate) fn load_from_db(db: Option<&db::StateDb>) -> Result<Option<SessionStateFile>> {
    match db {
        Some(handle) => db::session::read(handle.conn()),
        None => Ok(None),
    }
}
/// Loads the stored session activity for `layout`, opening the state database through
/// `try_open_session_db` and reading it with [`load_activity_from_db`].
pub(crate) fn load_activity_for_layout(
    layout: &StateLayout,
) -> Result<Option<SessionActivityState>> {
    let handle = try_open_session_db(layout)?;
    load_activity_from_db(handle.as_ref())
}
/// Reads the session activity from `db`, or returns `Ok(None)` when no database handle
/// is supplied.
pub(crate) fn load_activity_from_db(
    db: Option<&db::StateDb>,
) -> Result<Option<SessionActivityState>> {
    match db {
        Some(handle) => db::session_activity::read(handle.conn()),
        None => Ok(None),
    }
}
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Errors
///
/// Fails when the system clock reports a time before `UNIX_EPOCH`.
pub(crate) fn now_epoch_s() -> Result<u64> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("system clock is before UNIX_EPOCH")?;
    Ok(since_epoch.as_secs())
}
/// Returns the whole minutes elapsed between the session's first start and
/// `now_epoch_s`, clamping to zero when `now_epoch_s` precedes the start time.
pub(crate) fn session_minutes(state: &SessionStateFile, now_epoch_s: u64) -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    let elapsed_secs = now_epoch_s.saturating_sub(state.started_at_epoch_s);
    elapsed_secs / SECS_PER_MINUTE
}
/// Reports whether `state` is stale at `now_epoch_s`.
///
/// A state without a session id is always stale. An interactive session is stale once
/// more than `STALE_AFTER_SECS` have passed since its last start. An autonomous session
/// is stale when it has no lease expiry or when that expiry is at or before
/// `now_epoch_s`.
pub(crate) fn is_stale(state: &SessionStateFile, now_epoch_s: u64) -> bool {
    if state.session_id.is_none() {
        return true;
    }
    match state.lifecycle() {
        SessionLifecycle::Autonomous => match state.lease_expires_at_epoch_s() {
            Some(expires_at) => expires_at <= now_epoch_s,
            // No computable expiry means the lease cannot be considered live.
            None => true,
        },
        SessionLifecycle::Interactive => {
            let idle_secs = now_epoch_s.saturating_sub(state.last_started_at_epoch_s);
            idle_secs > STALE_AFTER_SECS
        }
    }
}
/// Opens the state database for `layout` and runs the legacy session JSON migration
/// against it before handing the handle back.
fn open_session_db(layout: &StateLayout) -> Result<db::StateDb> {
    let handle = db::StateDb::open(&layout.state_db_path())?;
    match migrate_session_json(&handle, layout) {
        Ok(()) => Ok(handle),
        Err(error) => Err(error),
    }
}
/// Opens the state database for `layout` through
/// `db::StateDb::try_open_with_migration`, supplying two callbacks: one that reports
/// whether the legacy session-state file exists on disk, and one that runs
/// `migrate_session_json` against the opened database.
///
/// NOTE(review): presumably `Ok(None)` means there is no database to open and nothing
/// to migrate — confirm against `try_open_with_migration`.
fn try_open_session_db(layout: &StateLayout) -> Result<Option<db::StateDb>> {
db::StateDb::try_open_with_migration(
layout,
|| layout.session_state_path().exists(),
|db| migrate_session_json(db, layout),
)
}
fn migrate_session_json(db: &db::StateDb, layout: &StateLayout) -> Result<()> {
let path = layout.session_state_path();
let contents = match fs::read_to_string(&path) {
Ok(c) => c,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(e).with_context(|| format!("failed to read {}", path.display())),
};
if db::session::read(db.conn())?.is_none() {
let state: SessionStateFile = serde_json::from_str(&contents)
.with_context(|| format!("failed to parse {}", path.display()))?;
db::session::write(db.conn(), &state)?;
}
let mut migrated = path.as_os_str().to_owned();
migrated.push(".migrated");
fs::rename(&path, Path::new(&migrated))
.with_context(|| format!("failed to rename {} to .migrated", path.display()))?;
Ok(())
}
/// Resolves the state layout for `repo_root` and `profile`, failing when the profile
/// root directory does not exist.
fn required_layout(repo_root: &Path, profile: &ProfileName) -> Result<StateLayout> {
    let resolved = StateLayout::resolve(repo_root, profile.clone())?;
    ensure_profile_exists(&resolved).map(|()| resolved)
}
/// Verifies that the profile root of `layout` is an existing directory.
///
/// # Errors
///
/// Returns an error naming the profile and the expected path when the directory is
/// missing.
fn ensure_profile_exists(layout: &StateLayout) -> Result<()> {
    let root = layout.profile_root();
    if !root.is_dir() {
        bail!(
            "profile `{}` does not exist at {}; bootstrap it before using session-state",
            layout.profile(),
            root.display()
        );
    }
    Ok(())
}
/// Best-effort recording of the projection baseline for `session_id`.
///
/// Loads the runtime state and the session projection preview, serializes the
/// execution-gates, escalation, recovery, git and session-state views to JSON, folds
/// them into extended digests, and stores the result through
/// `projection_metadata::record_baseline`.
///
/// Nothing is returned: a failure to load the runtime state or the preview ends the
/// function silently, a view that fails to serialize contributes an empty string, and
/// a failure to record is passed to `projection_metadata::warn_record_error`.
fn record_projection_baseline(
    repo_root: &Path,
    layout: &StateLayout,
    locality_id: &str,
    session_id: &str,
) {
    let runtime = match runtime_state::load_runtime_state(repo_root, layout, locality_id) {
        Ok(runtime) => runtime,
        Err(_) => return,
    };
    let store = match compiled_state::preview_for_target_with_cache(
        layout,
        &runtime,
        compiled_state::ProjectionTarget::Session,
    ) {
        Ok(preview) => preview.value,
        Err(_) => return,
    };
    let base_digests = compiled_state::compute_projection_digests(&store);

    let gates_json =
        serialize_view_or_empty(|| serde_json::to_string(&runtime.execution_gates.view));
    let escalations_json = serialize_view_or_empty(|| {
        // A failed escalation load is treated as "no entries".
        let entries = escalation_state::load_for_layout(layout).unwrap_or_default();
        let view = escalation_state::build_view(layout, &entries);
        serde_json::to_string(&view)
    });
    let recovery_json = serialize_view_or_empty(|| {
        let view = runtime_state::recovery_view(&runtime.recovery);
        serde_json::to_string(&view)
    });
    let git_json = if layout.resolved_substrate().is_git() {
        // An unreadable git state contributes an empty string.
        match handoff::read_git_state(repo_root, handoff::BranchMode::AllowDetachedHead) {
            Ok(git) => serde_json::to_string(&Some(git)).unwrap_or_default(),
            Err(_) => String::new(),
        }
    } else {
        serialize_view_or_empty(|| serde_json::to_string(&None::<handoff::GitState>))
    };
    let session_json = serialize_view_or_empty(|| {
        let view = build_session_state_view_for_baseline(layout);
        serde_json::to_string(&view)
    });

    let extended = compiled_state::compute_extended_digests(
        &base_digests,
        &gates_json,
        &escalations_json,
        &recovery_json,
        &git_json,
        &session_json,
    );
    match projection_metadata::record_baseline(layout, &store, session_id, &extended) {
        Ok(_) => {}
        Err(error) => {
            projection_metadata::warn_record_error(layout, &error);
        }
    }
}
/// Runs the serializer `f` and returns its JSON text, or an empty string when
/// serialization fails.
fn serialize_view_or_empty<F: FnOnce() -> serde_json::Result<String>>(f: F) -> String {
    match f() {
        Ok(json) => json,
        Err(_) => String::new(),
    }
}
fn build_session_state_view_for_baseline(layout: &StateLayout) -> serde_json::Value {
let tracked_session = load_for_layout(layout).ok().flatten();
let tracked_activity = load_activity_for_layout(layout).ok().flatten();
let path = layout.state_db_path().display().to_string();
match tracked_session {
Some(ref state) => {
let status = if state.session_id.is_some() {
"active"
} else {
"stale"
};
let mut map = serde_json::Map::new();
map.insert("path".into(), serde_json::Value::String(path));
map.insert("status".into(), serde_json::Value::String(status.into()));
if let Some(ref sid) = state.session_id {
map.insert("session_id".into(), serde_json::Value::String(sid.clone()));
}
map.insert(
"mode".into(),
serde_json::to_value(state.mode).unwrap_or_default(),
);
map.insert(
"start_count".into(),
serde_json::Value::Number(state.start_count.into()),
);
let projection = lifecycle_projection(
state,
now_epoch_s().unwrap_or_default(),
None,
tracked_activity.as_ref(),
);
if let Ok(serde_json::Value::Object(proj_map)) = serde_json::to_value(&projection) {
for (k, v) in proj_map {
map.insert(k, v);
}
}
serde_json::Value::Object(map)
}
None => {
let mut map = serde_json::Map::new();
map.insert("path".into(), serde_json::Value::String(path));
map.insert("status".into(), serde_json::Value::String("missing".into()));
let missing = SessionLifecycleProjection::missing();
if let Ok(serde_json::Value::Object(proj_map)) = serde_json::to_value(&missing) {
for (k, v) in proj_map {
map.insert(k, v);
}
}
serde_json::Value::Object(map)
}
}
}
/// Normalizes optional activity text.
///
/// Surrounding whitespace is trimmed; missing or blank text becomes `Ok(None)`.
///
/// # Errors
///
/// Fails when the trimmed text has more than `MAX_ACTIVITY_CHARS` characters.
fn normalize_activity(activity: Option<String>) -> Result<Option<String>> {
    let text = match activity.as_deref().map(str::trim) {
        Some(text) if !text.is_empty() => text,
        _ => return Ok(None),
    };
    // Count characters rather than bytes so multi-byte text is not penalized.
    let length = text.chars().count();
    if length > MAX_ACTIVITY_CHARS {
        bail!("activity text is too long (max {MAX_ACTIVITY_CHARS} characters)");
    }
    Ok(Some(text.to_owned()))
}
/// Reads the stored session activity and returns it only when
/// `activity_matches_state` accepts it for `state`; otherwise returns `Ok(None)`.
fn load_session_activity_for_state(
    conn: &rusqlite::Connection,
    state: &SessionStateFile,
) -> Result<Option<SessionActivityState>> {
    let stored = db::session_activity::read(conn)?;
    Ok(stored.filter(|candidate| activity_matches_state(state, candidate)))
}
/// Reports whether `activity` belongs to `state`: the session must be autonomous, and
/// the activity's session id and actor id must equal the state's session id and owner
/// id.
fn activity_matches_state(state: &SessionStateFile, activity: &SessionActivityState) -> bool {
    if state.lifecycle() != SessionLifecycle::Autonomous {
        return false;
    }
    let same_session = state.session_id.as_deref() == Some(activity.session_id.as_str());
    let same_owner = state.owner_id.as_deref() == Some(activity.actor_id.as_str());
    same_session && same_owner
}
// Unit tests for session-state serialization, start-state construction, database
// round-trips, lifecycle transactions, and clear authorization.
#[cfg(test)]
mod tests {
use super::*;
// Builds an interactive `SessionStateFile` fixture. The owner id is "interactive"
// and the revision is 1 only when a session id is supplied; otherwise the owner id
// is `None` and the revision is 0.
fn interactive_state(
schema_version: u32,
started_at_epoch_s: u64,
last_started_at_epoch_s: u64,
start_count: u32,
session_id: Option<&str>,
mode: SessionMode,
) -> SessionStateFile {
SessionStateFile {
schema_version,
started_at_epoch_s,
last_started_at_epoch_s,
start_count,
session_id: session_id.map(str::to_owned),
mode,
owner_kind: SessionOwnerKind::Interactive,
owner_id: session_id.map(|_| "interactive".to_owned()),
supervisor_id: None,
lease_ttl_secs: None,
last_heartbeat_at_epoch_s: None,
revision: u64::from(session_id.is_some()),
}
}
// Returns interactive start options for `mode` together with their resolved
// ownership.
fn interactive_start(
mode: Option<SessionMode>,
) -> (SessionStartOptions, ResolvedStartOwnership) {
let options = SessionStartOptions::interactive(mode);
let ownership = resolve_start_ownership(&options).unwrap();
(options, ownership)
}
// A schema-version-2 document without `session_id` still deserializes, leaving the
// session id as `None`.
#[test]
fn deserialize_v2_without_session_id() {
let json = r#"{
"schema_version": 2,
"started_at_epoch_s": 1000,
"last_started_at_epoch_s": 2000,
"start_count": 3
}"#;
let state: SessionStateFile = serde_json::from_str(json).unwrap();
assert_eq!(state.schema_version, 2);
assert_eq!(state.start_count, 3);
assert!(state.session_id.is_none());
}
// A schema-version-3 document keeps its `session_id`.
#[test]
fn deserialize_v3_with_session_id() {
let json = r#"{
"schema_version": 3,
"started_at_epoch_s": 1000,
"last_started_at_epoch_s": 2000,
"start_count": 1,
"session_id": "ses_01ABC"
}"#;
let state: SessionStateFile = serde_json::from_str(json).unwrap();
assert_eq!(state.schema_version, 3);
assert_eq!(state.session_id.as_deref(), Some("ses_01ABC"));
}
// Malformed JSON is rejected rather than silently defaulted.
#[test]
fn deserialize_corrupted_json_fails() {
let json = r#"{ not valid json }"#;
let result = serde_json::from_str::<SessionStateFile>(json);
assert!(result.is_err());
}
// A `None` session id is omitted from the serialized output entirely.
#[test]
fn serialize_v3_omits_none_session_id() {
let state = interactive_state(3, 1000, 2000, 1, None, SessionMode::General);
let json = serde_json::to_string(&state).unwrap();
assert!(!json.contains("session_id"));
}
// A present session id appears in the serialized output.
#[test]
fn serialize_v3_includes_session_id_when_present() {
let state = interactive_state(3, 1000, 2000, 1, Some("ses_01ABC"), SessionMode::General);
let json = serde_json::to_string(&state).unwrap();
assert!(json.contains("ses_01ABC"));
}
// Starting with no existing state mints a `ses_`-prefixed id and initializes the
// counters, interactive ownership, and revision 1.
#[test]
fn start_mints_session_id_on_fresh_start() {
let now = 1_000_000;
let (options, ownership) = interactive_start(None);
let state = build_next_state(None, now, &options, &ownership).unwrap();
assert_eq!(state.start_count, 1);
assert_eq!(state.started_at_epoch_s, now);
assert_eq!(state.mode, SessionMode::General);
let sid = state.session_id.expect("should mint session_id");
assert!(
sid.starts_with("ses_"),
"session_id should have ses_ prefix, got: {sid}"
);
assert_eq!(state.owner_kind, SessionOwnerKind::Interactive);
assert_eq!(state.owner_id.as_deref(), Some("interactive"));
assert_eq!(state.revision, 1);
}
// Restarting a non-stale session keeps its id, start time, and mode while bumping
// the start count and revision.
#[test]
fn start_preserves_session_id_on_non_stale_increment() {
let now = 1_000_000;
let existing = interactive_state(
3,
now - 100,
now - 50,
2,
Some("ses_EXISTING"),
SessionMode::Research,
);
let (options, ownership) = interactive_start(None);
let state = build_next_state(Some(existing), now, &options, &ownership).unwrap();
assert_eq!(state.start_count, 3);
assert_eq!(state.session_id.as_deref(), Some("ses_EXISTING"));
assert_eq!(state.started_at_epoch_s, now - 100);
assert_eq!(state.mode, SessionMode::Research);
assert_eq!(state.revision, 2);
}
// Restarting a session older than `STALE_AFTER_SECS` resets the count and mode and
// mints a new id, while the revision still advances from the old row.
#[test]
fn start_mints_new_id_on_stale_reset() {
let now = 1_000_000;
let stale_time = now - STALE_AFTER_SECS - 1;
let existing = interactive_state(
3,
stale_time - 100,
stale_time,
5,
Some("ses_OLD"),
SessionMode::Research,
);
let (options, ownership) = interactive_start(None);
let state = build_next_state(Some(existing), now, &options, &ownership).unwrap();
assert_eq!(state.start_count, 1);
assert_eq!(state.mode, SessionMode::General);
let sid = state.session_id.expect("should mint new session_id");
assert!(sid.starts_with("ses_"));
assert_ne!(sid, "ses_OLD");
assert_eq!(state.revision, 2);
}
// An existing row without a session id is treated as a fresh start: a new id is
// minted and the start count resets to 1.
#[test]
fn start_mints_new_id_when_v2_file_has_no_session_id() {
let now = 1_000_000;
let existing = interactive_state(2, now - 100, now - 50, 3, None, SessionMode::General);
let (options, ownership) = interactive_start(None);
let state = build_next_state(Some(existing), now, &options, &ownership).unwrap();
assert_eq!(state.start_count, 1);
let sid = state
.session_id
.expect("should mint session_id for v2 upgrade");
assert!(sid.starts_with("ses_"));
assert_eq!(state.revision, 1);
}
// An explicitly requested mode replaces the mode stored on the existing session.
#[test]
fn start_accepts_explicit_mode_override() {
let now = 1_000_000;
let existing = interactive_state(
3,
now - 100,
now - 50,
2,
Some("ses_EXISTING"),
SessionMode::Research,
);
let (options, ownership) = interactive_start(Some(SessionMode::Implement));
let state = build_next_state(Some(existing), now, &options, &ownership).unwrap();
assert_eq!(state.start_count, 3);
assert_eq!(state.mode, SessionMode::Implement);
}
// A state written to an in-memory database reads back with the same id, count,
// and mode.
#[test]
fn db_roundtrip_session_state() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
crate::db::schema::initialize(&conn).unwrap();
let state = interactive_state(
3,
1_000_000,
1_000_050,
2,
Some("ses_DB_RT"),
SessionMode::Research,
);
db::session::write(&conn, &state).unwrap();
let loaded = db::session::read(&conn).unwrap().expect("should exist");
assert_eq!(loaded.session_id.as_deref(), Some("ses_DB_RT"));
assert_eq!(loaded.start_count, 2);
assert_eq!(loaded.mode, SessionMode::Research);
}
// `load_session_id` yields nothing for a session last started more than
// `STALE_AFTER_SECS` ago.
#[test]
fn db_load_session_id_returns_none_for_stale() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
crate::db::schema::initialize(&conn).unwrap();
let now = 1_000_000u64;
let state = interactive_state(
3,
now - STALE_AFTER_SECS - 200,
now - STALE_AFTER_SECS - 100,
1,
Some("ses_STALE"),
SessionMode::General,
);
db::session::write(&conn, &state).unwrap();
assert!(db::session::load_session_id(&conn, now).unwrap().is_none());
}
// `load_session_id` returns the id of a recently started session.
#[test]
fn db_load_session_id_returns_id_for_active() {
let conn = rusqlite::Connection::open_in_memory().unwrap();
crate::db::schema::initialize(&conn).unwrap();
let now = 1_000_000u64;
let state = interactive_state(
3,
now - 100,
now - 50,
1,
Some("ses_ACTIVE"),
SessionMode::General,
);
db::session::write(&conn, &state).unwrap();
assert_eq!(
db::session::load_session_id(&conn, now).unwrap().as_deref(),
Some("ses_ACTIVE")
);
}
// Builds a `StateLayout` rooted in the temporary directory `temp` for the
// lifecycle-transaction tests, using the "main" profile.
fn test_layout_for_lifecycle_tx(temp: &Path) -> StateLayout {
StateLayout::new(
temp.join(".ccd"),
temp.join("repo/.git/ccd"),
ProfileName::new("main").expect("profile"),
)
}
// Creates the clone profile and clone runtime-state directories of `layout`.
fn setup_dirs_for_lifecycle_tx(layout: &StateLayout) {
std::fs::create_dir_all(layout.clone_profile_root()).expect("clone profile root");
std::fs::create_dir_all(layout.clone_runtime_state_root()).expect("clone runtime root");
}
// When the transaction body returns an error after deleting the session and
// clearing escalations, both must still be present afterwards.
#[test]
fn lifecycle_transaction_rolls_back_on_error_preserves_session_and_escalation() {
use crate::state::escalation::{EscalationEntry, EscalationKind};
let temp = tempfile::tempdir().unwrap();
let layout = test_layout_for_lifecycle_tx(temp.path());
setup_dirs_for_lifecycle_tx(&layout);
let db = db::StateDb::open(&layout.state_db_path()).unwrap();
let now = now_epoch_s().unwrap();
let seeded_session = interactive_state(
SESSION_SCHEMA_VERSION,
now - 100,
now - 50,
1,
Some("ses_ROLLBACK_TEST"),
SessionMode::General,
);
db::session::write(db.conn(), &seeded_session).unwrap();
db::escalation::insert(
db.conn(),
&EscalationEntry {
id: "esc_B_keep_on_rollback".to_owned(),
kind: EscalationKind::Blocking,
reason: "rollback-regression".to_owned(),
created_at_epoch_s: now,
session_id: None,
},
)
.unwrap();
let outcome = run_session_lifecycle_transaction(&db, |db| {
db::session::delete(db.conn())?;
db::escalation::clear_all(db.conn())?;
anyhow::bail!("simulated mid-transaction failure");
});
assert!(
outcome.is_err(),
"body returned Err so transaction helper must propagate the error",
);
let session_after = db::session::read(db.conn()).unwrap();
assert_eq!(
session_after.as_ref().and_then(|s| s.session_id.as_deref()),
Some("ses_ROLLBACK_TEST"),
"session row must survive the rolled-back delete",
);
let escalations_after = db::escalation::list(db.conn()).unwrap();
assert_eq!(
escalations_after.len(),
1,
"escalation must survive the rolled-back clear",
);
assert_eq!(escalations_after[0].id, "esc_B_keep_on_rollback");
}
// A legacy escalation file migrated into the database before the transaction
// starts must survive a rollback of a transaction that cleared escalations.
#[test]
fn pre_migration_preserves_legacy_escalations_across_transaction_rollback() {
use crate::state::escalation::{self as escalation_state, EscalationStateFile};
let temp = tempfile::tempdir().unwrap();
let layout = test_layout_for_lifecycle_tx(temp.path());
setup_dirs_for_lifecycle_tx(&layout);
let legacy_file = EscalationStateFile {
schema_version: 1,
entries: vec![crate::state::escalation::EscalationEntry {
id: "esc_LEGACY_KEEP".to_owned(),
kind: crate::state::escalation::EscalationKind::Blocking,
reason: "must survive rollback".to_owned(),
created_at_epoch_s: 1_700_000_000,
session_id: None,
}],
};
let escalation_json_path = layout.escalation_state_path();
std::fs::create_dir_all(escalation_json_path.parent().unwrap()).unwrap();
std::fs::write(
&escalation_json_path,
serde_json::to_string(&legacy_file).unwrap(),
)
.unwrap();
let db = db::StateDb::open(&layout.state_db_path()).unwrap();
escalation_state::migrate_legacy_on_shared_db(&db, &layout).unwrap();
// Migration moves the JSON aside to `<name>.migrated` and loads its entry.
assert!(!escalation_json_path.exists());
let mut migrated_path = escalation_json_path.as_os_str().to_owned();
migrated_path.push(".migrated");
assert!(Path::new(&migrated_path).exists());
assert_eq!(db::escalation::list(db.conn()).unwrap().len(), 1);
let outcome = run_session_lifecycle_transaction(&db, |db| {
db::escalation::clear_all(db.conn())?;
anyhow::bail!("simulated post-clear failure");
});
assert!(outcome.is_err());
let surviving = db::escalation::list(db.conn()).unwrap();
assert_eq!(
surviving.len(),
1,
"legacy escalation must survive rollback because pre-migration put it in the DB outside the transaction"
);
assert_eq!(surviving[0].id, "esc_LEGACY_KEEP");
}
// A panic inside the transaction body must propagate, leave the seeded session in
// place, and leave the connection usable for a following transaction.
#[test]
fn lifecycle_transaction_rolls_back_on_body_panic() {
use std::panic::{self, AssertUnwindSafe};
let temp = tempfile::tempdir().unwrap();
let layout = test_layout_for_lifecycle_tx(temp.path());
setup_dirs_for_lifecycle_tx(&layout);
let db = db::StateDb::open(&layout.state_db_path()).unwrap();
let now = now_epoch_s().unwrap();
let seeded_session = interactive_state(
SESSION_SCHEMA_VERSION,
now - 100,
now - 50,
1,
Some("ses_PANIC_SURVIVOR"),
SessionMode::General,
);
db::session::write(db.conn(), &seeded_session).unwrap();
let panic_result = panic::catch_unwind(AssertUnwindSafe(|| {
let _ = run_session_lifecycle_transaction(&db, |db| {
db::session::delete(db.conn())?;
panic!("simulated panic inside lifecycle transaction body");
});
}));
assert!(panic_result.is_err(), "closure panic must propagate");
let after = db::session::read(db.conn()).unwrap();
assert_eq!(
after.as_ref().and_then(|s| s.session_id.as_deref()),
Some("ses_PANIC_SURVIVOR"),
"Drop guard must issue ROLLBACK even on panic so the pre-transaction state is preserved",
);
run_session_lifecycle_transaction(&db, |_| Ok(()))
.expect("connection must remain usable after rolled-back panic");
}
// When the transaction body succeeds, the session delete and escalation clear are
// both persisted.
#[test]
fn lifecycle_transaction_commits_when_body_succeeds() {
use crate::state::escalation::{EscalationEntry, EscalationKind};
let temp = tempfile::tempdir().unwrap();
let layout = test_layout_for_lifecycle_tx(temp.path());
setup_dirs_for_lifecycle_tx(&layout);
let db = db::StateDb::open(&layout.state_db_path()).unwrap();
let now = now_epoch_s().unwrap();
let seeded_session = interactive_state(
SESSION_SCHEMA_VERSION,
now - 100,
now - 50,
1,
Some("ses_COMMIT_TEST"),
SessionMode::General,
);
db::session::write(db.conn(), &seeded_session).unwrap();
db::escalation::insert(
db.conn(),
&EscalationEntry {
id: "esc_commit_then_clear".to_owned(),
kind: EscalationKind::NonBlocking,
reason: "commit-regression".to_owned(),
created_at_epoch_s: now,
session_id: None,
},
)
.unwrap();
run_session_lifecycle_transaction(&db, |db| {
db::session::delete(db.conn())?;
db::escalation::clear_all(db.conn())?;
Ok(())
})
.expect("commit path must succeed");
assert!(
db::session::read(db.conn()).unwrap().is_none(),
"session row must be gone after committed delete",
);
assert!(
db::escalation::list(db.conn()).unwrap().is_empty(),
"escalations must be gone after committed clear",
);
}
// Builds a runtime-worker session fixture owned by `owner`, supervised by
// "runtime/supervisor", with a 60-second lease and start/heartbeat timestamps
// `heartbeat_age_secs` before `now`.
fn autonomous_state(owner: &str, now: u64, heartbeat_age_secs: u64) -> SessionStateFile {
SessionStateFile {
schema_version: SESSION_SCHEMA_VERSION,
started_at_epoch_s: now.saturating_sub(heartbeat_age_secs),
last_started_at_epoch_s: now.saturating_sub(heartbeat_age_secs),
start_count: 1,
session_id: Some("ses_AUTO".to_owned()),
mode: SessionMode::General,
owner_kind: SessionOwnerKind::RuntimeWorker,
owner_id: Some(owner.to_owned()),
supervisor_id: Some("runtime/supervisor".to_owned()),
lease_ttl_secs: Some(60),
last_heartbeat_at_epoch_s: Some(now.saturating_sub(heartbeat_age_secs)),
revision: 1,
}
}
// A foreign actor clearing a session with a fresh heartbeat gets the generic
// "active" refusal with no takeover hint.
#[test]
fn authorize_clear_on_active_autonomous_mentions_active_not_takeover() {
let now = 1_000_000;
let existing = autonomous_state("runtime/worker-1", now, 0);
let err = authorize_clear(Some(&existing), Some("runtime/other"), Some(now))
.expect_err("clear must be refused for a different actor");
let message = err.to_string();
assert!(
message.contains("active autonomous session"),
"non-stale refusal should retain the original text, got: {message}"
);
assert!(
!message.contains("takeover"),
"non-stale refusal must not point at takeover, got: {message}"
);
}
// A foreign actor clearing a stale session is refused with a message that names
// the owner and spells out the takeover command.
#[test]
fn authorize_clear_on_stale_autonomous_points_at_takeover() {
let now = 1_000_000;
let existing = autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
let err = authorize_clear(Some(&existing), Some("runtime/other"), Some(now))
.expect_err("clear must be refused for a different actor");
let message = err.to_string();
assert!(
message.contains("stale autonomous session"),
"stale refusal must say the session is stale, got: {message}"
);
assert!(
message
.contains("ccd session-state takeover --actor-id <actor-id> --reason <reason>"),
"stale refusal must include both takeover required flags as placeholders so the guidance stays safe regardless of actor_id contents, got: {message}"
);
assert!(
message.contains("runtime/worker-1"),
"stale refusal must still name the current owner, got: {message}"
);
}
// Stale rows lacking a lease TTL or a session id get the generic refusal instead
// of the takeover hint.
#[test]
fn authorize_clear_stale_but_takeover_ineligible_falls_back_to_generic_text() {
let now = 1_000_000;
let mut missing_lease = autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
missing_lease.lease_ttl_secs = None;
let err = authorize_clear(Some(&missing_lease), Some("runtime/other"), Some(now))
.expect_err("clear must be refused for a different actor");
let message = err.to_string();
assert!(
!message.contains("takeover"),
"stale row with missing lease_ttl_secs must not recommend takeover (takeover itself requires lease_ttl_secs), got: {message}"
);
assert!(
message.contains("active autonomous session"),
"fallback must retain the generic refusal text, got: {message}"
);
let mut missing_session_id =
autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
missing_session_id.session_id = None;
let err = authorize_clear(Some(&missing_session_id), Some("runtime/other"), Some(now))
.expect_err("clear must be refused for a different actor");
let message = err.to_string();
assert!(
!message.contains("takeover"),
"stale row with missing session_id must not recommend takeover (takeover itself requires session_id), got: {message}"
);
}
// The takeover hint uses literal placeholders and never splices the caller's
// actor id into the suggested command.
#[test]
fn authorize_clear_stale_message_uses_placeholders_not_raw_actor_id() {
let now = 1_000_000;
let existing = autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
let hostile = "foo; rm -rf /";
let err = authorize_clear(Some(&existing), Some(hostile), Some(now))
.expect_err("clear must be refused for a different actor");
let message = err.to_string();
assert!(
message.contains("--actor-id <actor-id> --reason <reason>"),
"takeover hint must use placeholders, got: {message}"
);
assert!(
!message.contains("--actor-id foo; rm -rf /"),
"takeover hint must not interpolate the raw actor_id into shell syntax, got: {message}"
);
}
// The owner and the supervisor may clear a session even after it has gone stale.
#[test]
fn authorize_clear_owner_succeeds_regardless_of_staleness() {
let now = 1_000_000;
let existing = autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
authorize_clear(Some(&existing), Some("runtime/worker-1"), Some(now))
.expect("owner must clear even a stale session");
authorize_clear(Some(&existing), Some("runtime/supervisor"), Some(now))
.expect("supervisor must clear even a stale session");
}
// Without a clock value, owner and supervisor are still authorized, and a foreign
// actor gets the generic refusal without the takeover hint.
#[test]
fn authorize_clear_owner_succeeds_without_clock() {
let now = 1_000_000;
let existing = autonomous_state("runtime/worker-1", now, STALE_AFTER_SECS + 1);
authorize_clear(Some(&existing), Some("runtime/worker-1"), None)
.expect("owner must clear even when wall clock is unavailable");
authorize_clear(Some(&existing), Some("runtime/supervisor"), None)
.expect("supervisor must clear even when wall clock is unavailable");
let err = authorize_clear(Some(&existing), Some("runtime/other"), None)
.expect_err("non-owner must still be refused");
let message = err.to_string();
assert!(
message.contains("active autonomous session"),
"clock-failure refusal must fall back to the generic text, got: {message}"
);
assert!(
!message.contains("takeover"),
"clock-failure refusal must not emit the stale-specific takeover hint, got: {message}"
);
}
// Actor and owner ids containing newlines or ESC bytes must appear only in
// escaped form in the refusal message.
#[test]
fn authorize_clear_stale_refusal_escapes_newlines_in_actor_ids() {
let state = SessionStateFile {
schema_version: SESSION_SCHEMA_VERSION,
started_at_epoch_s: 0,
last_started_at_epoch_s: 0,
start_count: 1,
session_id: Some("ses_stale".to_owned()),
mode: SessionMode::General,
owner_kind: SessionOwnerKind::RuntimeWorker,
owner_id: Some("owner\nFAKE\x1b[2Jlog".to_owned()),
supervisor_id: None,
lease_ttl_secs: Some(1),
last_heartbeat_at_epoch_s: Some(0),
revision: 1,
};
let now = 10_000;
let err = authorize_clear(Some(&state), Some("attacker\nroot"), Some(now))
.expect_err("non-owner on a stale session must be refused");
let message = err.to_string();
assert!(
!message.contains('\n'),
"rendered refusal must not contain literal newlines; got: {message:?}"
);
assert!(
!message.contains('\x1b'),
"rendered refusal must not contain literal ESC bytes; got: {message:?}"
);
assert!(
message.contains(r"attacker\nroot"),
"attacker id must appear in escaped form; got: {message:?}"
);
assert!(
message.contains(r"owner\nFAKE\x1blog") || message.contains(r"owner\nFAKE\x1b[2Jlog"),
"owner id must appear in escaped form; got: {message:?}"
);
}
}