use std::io::Read as _;
use std::path::Path;
use std::process::ExitCode;
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use crate::db;
use crate::output::CommandReport;
use crate::paths::state::StateLayout;
use crate::profile;
use crate::repo::marker as repo_marker;
use crate::state::{
protected_write::{self, AppendWriteAuthority, AppendWriteOutcome},
runtime::{RecoveryOrigin, RuntimeCheckpointState, RuntimeWorkingBufferState},
session as session_state,
};
/// JSON payload for the optional `checkpoint` section of a recovery write
/// request, as read from stdin.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct CheckpointInput {
    // Required one-paragraph description of where the work stands.
    summary: String,
    // Optional ordered list of next steps; absent in JSON → empty vec.
    #[serde(default)]
    immediate_actions: Vec<String>,
    // Optional list of files central to the task; absent in JSON → empty vec.
    #[serde(default)]
    key_files: Vec<String>,
}
/// JSON payload for the optional `working_buffer` section of a recovery write
/// request. `summary_lines` is required by deserialization (no default).
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct WorkingBufferInput {
    // Recent-context lines to persist; validated non-empty downstream.
    summary_lines: Vec<String>,
}
/// Top-level stdin JSON for `recovery-write`. All fields are optional at the
/// parsing layer; `write_with_input` rejects inputs where both `checkpoint`
/// and `working_buffer` are absent.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct RecoveryWriteInput {
    // Origin tag; parsed by `parse_origin` ("compaction" | "risky_pause" |
    // "manual"), defaulting to manual when omitted.
    #[serde(default)]
    pub(crate) origin: Option<String>,
    #[serde(default)]
    pub(crate) checkpoint: Option<CheckpointInput>,
    #[serde(default)]
    pub(crate) working_buffer: Option<WorkingBufferInput>,
}
/// Serializable result of a `recovery-write` invocation, rendered either as
/// JSON (via `Serialize`) or as text (via `CommandReport::render_text`).
#[derive(Serialize)]
pub struct RecoveryWriteReport {
    // Fixed command name for machine-readable output.
    command: &'static str,
    // False when the protected-write authorization rejected the write.
    ok: bool,
    // Combined outcome across checkpoint and working-buffer writes.
    outcome: AppendWriteOutcome,
    // Repo root path as given by the caller.
    path: String,
    profile: String,
    // Location of the state database that was (or would have been) written.
    native_state_path: String,
    wrote_checkpoint: bool,
    wrote_working_buffer: bool,
    // Label from `origin_label` ("compaction" | "risky_pause" | "manual").
    origin: String,
    // Stable ids present only when the corresponding section was supplied.
    #[serde(skip_serializing_if = "Option::is_none")]
    checkpoint_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    checkpoint_outcome: Option<AppendWriteOutcome>,
    #[serde(skip_serializing_if = "Option::is_none")]
    working_buffer_id: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    working_buffer_outcome: Option<AppendWriteOutcome>,
    // Human-readable rejection reason; only set when `ok` is false.
    #[serde(skip_serializing_if = "Option::is_none")]
    message: Option<String>,
}
impl CommandReport for RecoveryWriteReport {
fn exit_code(&self) -> ExitCode {
if self.ok {
ExitCode::SUCCESS
} else {
ExitCode::from(1)
}
}
fn render_text(&self) {
if !self.ok {
println!(
"{}",
self.message
.as_deref()
.unwrap_or("Recovery write was rejected.")
);
return;
}
println!("Wrote recovery state to {}", self.native_state_path);
println!("Origin: {}", self.origin);
if self.wrote_checkpoint {
println!("Checkpoint: written");
} else if self.checkpoint_outcome == Some(AppendWriteOutcome::IdempotentNoop) {
println!("Checkpoint: idempotent noop");
}
if self.wrote_working_buffer {
println!("Working buffer: written");
} else if self.working_buffer_outcome == Some(AppendWriteOutcome::IdempotentNoop) {
println!("Working buffer: idempotent noop");
}
}
}
/// Fail with a bootstrap hint unless the profile directory for `layout`
/// already exists on disk.
fn ensure_profile_exists(layout: &StateLayout, repo_root: &Path) -> Result<()> {
    let profile_root = layout.profile_root();
    if !profile_root.is_dir() {
        bail!(
            "profile `{}` does not exist at {}; bootstrap it with `ccd attach --path {}` before writing recovery state",
            layout.profile(),
            profile_root.display(),
            repo_root.display()
        );
    }
    Ok(())
}
/// Require the repo marker file to exist; returns the linked locality id.
fn ensure_repo_linked(repo_root: &Path) -> Result<String> {
    match repo_marker::load(repo_root)? {
        Some(marker) => Ok(marker.locality_id),
        None => bail!(
            "repo is not linked: {} is missing; run `ccd attach --path {}` or `ccd link --path {}` first",
            repo_root.join(repo_marker::MARKER_FILE).display(),
            repo_root.display(),
            repo_root.display()
        ),
    }
}
/// Parse an optional origin string; an absent value defaults to manual.
fn parse_origin(input: Option<&str>) -> Result<RecoveryOrigin> {
    let Some(text) = input else {
        return Ok(RecoveryOrigin::Manual);
    };
    match text {
        "compaction" => Ok(RecoveryOrigin::Compaction),
        "risky_pause" => Ok(RecoveryOrigin::RiskyPause),
        "manual" => Ok(RecoveryOrigin::Manual),
        other => bail!(
            "unknown recovery origin `{other}`; expected one of: compaction, risky_pause, manual"
        ),
    }
}
/// Stable string label for an origin; the inverse of `parse_origin` for
/// known values.
fn origin_label(origin: RecoveryOrigin) -> &'static str {
    match origin {
        RecoveryOrigin::Manual => "manual",
        RecoveryOrigin::Compaction => "compaction",
        RecoveryOrigin::RiskyPause => "risky_pause",
    }
}
/// Current wall-clock time as whole seconds since the UNIX epoch.
fn now_epoch_s() -> Result<u64> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .context("system clock before UNIX epoch")?;
    Ok(since_epoch.as_secs())
}
fn require_active_session(layout: &StateLayout) -> Result<session_state::SessionStateFile> {
let db_path = layout.state_db_path();
if !db_path.exists() {
bail!(
"no active session at {}; run `ccd session-state start --path .` before writing recovery state",
db_path.display()
);
}
match session_state::load_for_layout(layout)? {
Some(state) if state.started_at_epoch_s > 0 => Ok(state),
_ => bail!(
"no active session at {}; run `ccd session-state start --path .` before writing recovery state",
db_path.display()
),
}
}
// Input size limits enforced by `validate_checkpoint` / `validate_working_buffer`.
const MAX_CHECKPOINT_SUMMARY_CHARS: usize = 280;
const MAX_RECOVERY_IMMEDIATE_ACTIONS: usize = 8;
const MAX_RECOVERY_KEY_FILES: usize = 8;
const MAX_WORKING_BUFFER_SUMMARY_LINES: usize = 12;
// Budget for the combined byte length of all working-buffer lines.
const MAX_WORKING_BUFFER_BYTES: usize = 4096;
/// Validate a checkpoint payload against the recovery input limits.
///
/// # Errors
/// Fails when the trimmed summary is empty or too long, or when either list
/// exceeds its cap or contains blank entries.
fn validate_checkpoint(cp: &CheckpointInput) -> Result<()> {
    let summary = cp.summary.trim();
    if summary.is_empty() {
        bail!("checkpoint summary must not be empty");
    }
    // Count characters, not bytes: the limit and the error message are both
    // phrased in characters, and `str::len()` over-counts multi-byte UTF-8.
    let summary_chars = summary.chars().count();
    if summary_chars > MAX_CHECKPOINT_SUMMARY_CHARS {
        bail!(
            "checkpoint summary is {} characters; the limit is {}",
            summary_chars,
            MAX_CHECKPOINT_SUMMARY_CHARS
        );
    }
    if cp.immediate_actions.len() > MAX_RECOVERY_IMMEDIATE_ACTIONS {
        bail!(
            "checkpoint has {} immediate_actions; the limit is {}",
            cp.immediate_actions.len(),
            MAX_RECOVERY_IMMEDIATE_ACTIONS
        );
    }
    if cp.immediate_actions.iter().any(|a| a.trim().is_empty()) {
        bail!("checkpoint immediate_actions must not contain empty entries");
    }
    if cp.key_files.len() > MAX_RECOVERY_KEY_FILES {
        bail!(
            "checkpoint has {} key_files; the limit is {}",
            cp.key_files.len(),
            MAX_RECOVERY_KEY_FILES
        );
    }
    if cp.key_files.iter().any(|f| f.trim().is_empty()) {
        bail!("checkpoint key_files must not contain empty entries");
    }
    Ok(())
}
/// Validate a working-buffer payload: non-empty, bounded line count, no blank
/// lines, and a total byte budget across all lines.
fn validate_working_buffer(wb: &WorkingBufferInput) -> Result<()> {
    let lines = &wb.summary_lines;
    if lines.is_empty() {
        bail!("working_buffer summary_lines must not be empty");
    }
    if lines.len() > MAX_WORKING_BUFFER_SUMMARY_LINES {
        bail!(
            "working_buffer has {} summary_lines; the limit is {}",
            lines.len(),
            MAX_WORKING_BUFFER_SUMMARY_LINES
        );
    }
    if lines.iter().any(|line| line.trim().is_empty()) {
        bail!("working_buffer summary_lines must not contain empty entries");
    }
    // The byte budget is deliberate here (unlike the checkpoint summary,
    // which is limited in characters): it bounds on-disk size.
    let total_bytes: usize = lines.iter().map(String::len).sum();
    if total_bytes > MAX_WORKING_BUFFER_BYTES {
        bail!(
            "working_buffer summary_lines total {total_bytes} bytes; the limit is {MAX_WORKING_BUFFER_BYTES} bytes"
        );
    }
    Ok(())
}
/// Entry point for `recovery-write`: reads the JSON payload from stdin and
/// delegates to `write_with_input`.
pub fn write(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    write_options: protected_write::ExclusiveWriteOptions,
) -> Result<RecoveryWriteReport> {
    let input = read_write_input_from_stdin()?;
    write_with_input(repo_root, explicit_profile, input, write_options)
}
/// Read all of stdin and parse it as a `RecoveryWriteInput` JSON document.
pub(crate) fn read_write_input_from_stdin() -> Result<RecoveryWriteInput> {
    let mut raw = String::new();
    let mut stdin = std::io::stdin();
    stdin
        .read_to_string(&mut raw)
        .context("failed to read recovery JSON from stdin")?;
    serde_json::from_str(&raw).context("failed to parse recovery JSON from stdin")
}
/// Core of `recovery-write`: validate the input, acquire write authority for
/// the `recovery` surface, then write the checkpoint and/or working buffer to
/// the state database, treating byte-identical re-writes as idempotent noops.
///
/// Ordering matters here: profile/link/session preconditions and the
/// authorization check run BEFORE payload validation, so an unauthorized
/// caller gets a conflict report (ok = false) rather than a validation error.
pub(crate) fn write_with_input(
    repo_root: &Path,
    explicit_profile: Option<&str>,
    input: RecoveryWriteInput,
    write_options: protected_write::ExclusiveWriteOptions,
) -> Result<RecoveryWriteReport> {
    let profile = profile::resolve(explicit_profile)?;
    let layout = StateLayout::resolve(repo_root, profile.clone())?;
    ensure_profile_exists(&layout, repo_root)?;
    // Linkage is required, but the locality id itself is unused here.
    let _locality_id = ensure_repo_linked(repo_root)?;
    if input.checkpoint.is_none() && input.working_buffer.is_none() {
        bail!("at least one of `checkpoint` or `working_buffer` must be present in the input");
    }
    let origin = parse_origin(input.origin.as_deref())?;
    let captured_at = now_epoch_s()?;
    let active_session = require_active_session(&layout)?;
    let session_started_at = active_session.started_at_epoch_s;
    // Owner-only surface: a conflict short-circuits into a rejection report
    // instead of an Err, so callers still get structured output.
    if let Some(conflict) = protected_write::authorize_append_surface_write(
        &layout,
        "recovery",
        &write_options,
        AppendWriteAuthority::OwnerOnly,
    )? {
        return Ok(RecoveryWriteReport {
            command: "recovery-write",
            ok: false,
            outcome: conflict.outcome,
            path: repo_root.display().to_string(),
            profile: profile.to_string(),
            native_state_path: layout.state_db_path().display().to_string(),
            wrote_checkpoint: false,
            wrote_working_buffer: false,
            origin: origin_label(origin).to_owned(),
            // Ids are still reported for the sections the caller supplied,
            // even though nothing was written.
            checkpoint_id: input
                .checkpoint
                .as_ref()
                .map(|_| stable_recovery_id("checkpoint", origin)),
            checkpoint_outcome: None,
            working_buffer_id: input
                .working_buffer
                .as_ref()
                .map(|_| stable_recovery_id("working_buffer", origin)),
            working_buffer_outcome: None,
            message: Some(conflict.message),
        });
    }
    if let Some(ref cp) = input.checkpoint {
        validate_checkpoint(cp)?;
    }
    if let Some(ref wb) = input.working_buffer {
        validate_working_buffer(wb)?;
    }
    let db = db::StateDb::open(&layout.state_db_path())?;
    // Read existing rows first so identical re-writes can be detected.
    let existing_checkpoint = db::recovery::read_checkpoint(db.conn())?;
    let existing_working_buffer = db::recovery::read_working_buffer(db.conn())?;
    // Ids are computed before `input.checkpoint` / `input.working_buffer`
    // are moved out of `input` below.
    let checkpoint_id = input
        .checkpoint
        .as_ref()
        .map(|_| stable_recovery_id("checkpoint", origin));
    let working_buffer_id = input
        .working_buffer
        .as_ref()
        .map(|_| stable_recovery_id("working_buffer", origin));
    let checkpoint_outcome = if let Some(cp) = input.checkpoint {
        if existing_checkpoint
            .as_ref()
            .is_some_and(|existing| checkpoint_matches(existing, origin, session_started_at, &cp))
        {
            // Same origin/session/content already stored: skip the write.
            Some(AppendWriteOutcome::IdempotentNoop)
        } else {
            db::recovery::write_checkpoint(
                db.conn(),
                &RuntimeCheckpointState {
                    origin,
                    captured_at_epoch_s: captured_at,
                    session_started_at_epoch_s: session_started_at,
                    summary: cp.summary,
                    immediate_actions: cp.immediate_actions,
                    key_files: cp.key_files,
                },
            )?;
            Some(AppendWriteOutcome::Applied)
        }
    } else {
        None
    };
    let working_buffer_outcome = if let Some(wb) = input.working_buffer {
        if existing_working_buffer.as_ref().is_some_and(|existing| {
            working_buffer_matches(existing, origin, session_started_at, &wb)
        }) {
            Some(AppendWriteOutcome::IdempotentNoop)
        } else {
            db::recovery::write_working_buffer(
                db.conn(),
                &RuntimeWorkingBufferState {
                    origin,
                    captured_at_epoch_s: captured_at,
                    session_started_at_epoch_s: session_started_at,
                    summary_lines: wb.summary_lines,
                },
            )?;
            Some(AppendWriteOutcome::Applied)
        }
    } else {
        None
    };
    let wrote_checkpoint = checkpoint_outcome == Some(AppendWriteOutcome::Applied);
    let wrote_working_buffer = working_buffer_outcome == Some(AppendWriteOutcome::Applied);
    // Overall outcome is Applied if anything was written, noop otherwise.
    let outcome = combine_recovery_outcomes(checkpoint_outcome, working_buffer_outcome);
    Ok(RecoveryWriteReport {
        command: "recovery-write",
        ok: true,
        outcome,
        path: repo_root.display().to_string(),
        profile: profile.to_string(),
        native_state_path: layout.state_db_path().display().to_string(),
        wrote_checkpoint,
        wrote_working_buffer,
        origin: origin_label(origin).to_owned(),
        checkpoint_id,
        checkpoint_outcome,
        working_buffer_id,
        working_buffer_outcome,
        message: None,
    })
}
/// Deterministic id for a recovery surface write: `recovery_<kind>:<origin>`.
fn stable_recovery_id(kind: &str, origin: RecoveryOrigin) -> String {
    let label = origin_label(origin);
    format!("recovery_{kind}:{label}")
}
/// True when the stored checkpoint already holds exactly this input for the
/// same origin and session (used to detect idempotent re-writes).
fn checkpoint_matches(
    existing: &RuntimeCheckpointState,
    origin: RecoveryOrigin,
    session_started_at_epoch_s: u64,
    input: &CheckpointInput,
) -> bool {
    if existing.origin != origin {
        return false;
    }
    if existing.session_started_at_epoch_s != session_started_at_epoch_s {
        return false;
    }
    existing.summary == input.summary
        && existing.immediate_actions == input.immediate_actions
        && existing.key_files == input.key_files
}
/// True when the stored working buffer already holds exactly this input for
/// the same origin and session.
fn working_buffer_matches(
    existing: &RuntimeWorkingBufferState,
    origin: RecoveryOrigin,
    session_started_at_epoch_s: u64,
    input: &WorkingBufferInput,
) -> bool {
    if existing.origin != origin {
        return false;
    }
    existing.session_started_at_epoch_s == session_started_at_epoch_s
        && existing.summary_lines == input.summary_lines
}
/// Fold the two optional per-surface outcomes into one: Applied if either
/// surface was written, IdempotentNoop otherwise.
fn combine_recovery_outcomes(
    checkpoint_outcome: Option<AppendWriteOutcome>,
    working_buffer_outcome: Option<AppendWriteOutcome>,
) -> AppendWriteOutcome {
    let any_applied = matches!(checkpoint_outcome, Some(AppendWriteOutcome::Applied))
        || matches!(working_buffer_outcome, Some(AppendWriteOutcome::Applied));
    if any_applied {
        AppendWriteOutcome::Applied
    } else {
        AppendWriteOutcome::IdempotentNoop
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // --- parse_origin -----------------------------------------------------

    #[test]
    fn parse_origin_defaults_to_manual() {
        // Absent origin and explicit "manual" both map to Manual.
        assert_eq!(parse_origin(None).unwrap(), RecoveryOrigin::Manual);
        assert_eq!(
            parse_origin(Some("manual")).unwrap(),
            RecoveryOrigin::Manual
        );
    }

    #[test]
    fn parse_origin_accepts_known_values() {
        assert_eq!(
            parse_origin(Some("compaction")).unwrap(),
            RecoveryOrigin::Compaction
        );
        assert_eq!(
            parse_origin(Some("risky_pause")).unwrap(),
            RecoveryOrigin::RiskyPause
        );
    }

    #[test]
    fn parse_origin_rejects_unknown() {
        assert!(parse_origin(Some("bogus")).is_err());
    }

    // --- stdin JSON deserialization ---------------------------------------

    #[test]
    fn parse_minimal_checkpoint_input() {
        // A summary alone is enough; the list fields default to empty.
        let json = r#"{"checkpoint": {"summary": "Working on X"}}"#;
        let input: RecoveryWriteInput = serde_json::from_str(json).unwrap();
        assert!(input.checkpoint.is_some());
        assert!(input.working_buffer.is_none());
        assert_eq!(input.checkpoint.unwrap().summary, "Working on X");
    }

    #[test]
    fn parse_working_buffer_input() {
        let json = r#"{"working_buffer": {"summary_lines": ["line1", "line2"]}}"#;
        let input: RecoveryWriteInput = serde_json::from_str(json).unwrap();
        assert!(input.checkpoint.is_none());
        assert!(input.working_buffer.is_some());
        assert_eq!(input.working_buffer.unwrap().summary_lines.len(), 2);
    }

    #[test]
    fn parse_both_checkpoint_and_buffer() {
        let json = r#"{
        "origin": "compaction",
        "checkpoint": {"summary": "mid-task", "immediate_actions": ["finish it"], "key_files": ["src/foo.rs"]},
        "working_buffer": {"summary_lines": ["recent context"]}
        }"#;
        let input: RecoveryWriteInput = serde_json::from_str(json).unwrap();
        assert!(input.checkpoint.is_some());
        assert!(input.working_buffer.is_some());
        assert_eq!(input.origin.as_deref(), Some("compaction"));
    }

    #[test]
    fn rejects_empty_input() {
        // `{}` parses fine; the both-absent case is rejected later by
        // `write_with_input`, not by deserialization.
        let json = r#"{}"#;
        let input: RecoveryWriteInput = serde_json::from_str(json).unwrap();
        assert!(input.checkpoint.is_none());
        assert!(input.working_buffer.is_none());
    }

    // --- validation limits -------------------------------------------------

    #[test]
    fn validate_checkpoint_rejects_empty_summary() {
        let cp = CheckpointInput {
            summary: "".to_owned(),
            immediate_actions: vec![],
            key_files: vec![],
        };
        assert!(validate_checkpoint(&cp)
            .unwrap_err()
            .to_string()
            .contains("summary must not be empty"));
    }

    #[test]
    fn validate_checkpoint_rejects_oversized_summary() {
        // One past the limit (ASCII, so chars == bytes here).
        let cp = CheckpointInput {
            summary: "x".repeat(MAX_CHECKPOINT_SUMMARY_CHARS + 1),
            immediate_actions: vec![],
            key_files: vec![],
        };
        assert!(validate_checkpoint(&cp)
            .unwrap_err()
            .to_string()
            .contains("limit is"));
    }

    #[test]
    fn validate_checkpoint_rejects_too_many_actions() {
        let cp = CheckpointInput {
            summary: "ok".to_owned(),
            immediate_actions: (0..MAX_RECOVERY_IMMEDIATE_ACTIONS + 1)
                .map(|i| format!("action {i}"))
                .collect(),
            key_files: vec![],
        };
        assert!(validate_checkpoint(&cp)
            .unwrap_err()
            .to_string()
            .contains("immediate_actions"));
    }

    #[test]
    fn validate_working_buffer_rejects_empty_lines() {
        let wb = WorkingBufferInput {
            summary_lines: vec![],
        };
        assert!(validate_working_buffer(&wb)
            .unwrap_err()
            .to_string()
            .contains("must not be empty"));
    }

    #[test]
    fn validate_working_buffer_rejects_too_many_lines() {
        let wb = WorkingBufferInput {
            summary_lines: (0..MAX_WORKING_BUFFER_SUMMARY_LINES + 1)
                .map(|i| format!("line {i}"))
                .collect(),
        };
        assert!(validate_working_buffer(&wb)
            .unwrap_err()
            .to_string()
            .contains("limit is"));
    }

    #[test]
    fn validate_working_buffer_rejects_oversized_bytes() {
        // A single line exceeding the total byte budget triggers the error.
        let wb = WorkingBufferInput {
            summary_lines: vec!["x".repeat(MAX_WORKING_BUFFER_BYTES + 1)],
        };
        let err = validate_working_buffer(&wb).unwrap_err().to_string();
        assert!(
            err.contains("bytes"),
            "expected byte limit error, got: {err}"
        );
    }
}