//! One-shot migration that imports legacy on-disk state files into the SQLite state database.
use std::fs;
use std::path::{Path, PathBuf};
use anyhow::{bail, Context, Result};
use crate::db::StateDb;
use crate::paths::state::StateLayout;
use crate::state::escalation::EscalationEntry;
use crate::state::runtime::{RecoveryOrigin, RuntimeCheckpointState, RuntimeHandoffState};
use crate::state::session::SessionStateFile;
use super::{handoff, projection, recovery, session};
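/// Outcome of a legacy import run: which stores were imported, which were
/// skipped (with the reason), and a running count of successful imports.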
#[derive(Debug, Default)]
pub(crate) struct MigrationReport {
pub(crate) imported_count: usize,
pub(crate) skipped: Vec<String>,
pub(crate) imported: Vec<String>,
}
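/// Import every known legacy state file into the SQLite database.
///
/// All imports run inside a single transaction. On success the transaction is
/// committed and each consumed file is renamed with a `.migrated` suffix; on
/// failure the transaction is rolled back and no files are renamed, so the
/// import can be retried once the problem is fixed.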
pub(crate) fn import_legacy_json(db: &StateDb, layout: &StateLayout) -> Result<MigrationReport> {
let mut report = MigrationReport::default();
let mut to_rename: Vec<PathBuf> = Vec::new();
db.conn().execute_batch("BEGIN")?;
let result = (|| -> Result<()> {
import_handoff(db, layout, &mut report, &mut to_rename)?;
import_session(db, layout, &mut report, &mut to_rename)?;
import_escalation(db, layout, &mut report, &mut to_rename)?;
import_checkpoint(db, layout, &mut report, &mut to_rename)?;
import_working_buffer(db, layout, &mut report, &mut to_rename)?;
import_projection_metadata(db, layout, &mut report, &mut to_rename)?;
Ok(())
})();
match result {
Ok(()) => {
db.conn().execute_batch("COMMIT")?;
for path in &to_rename {
rename_migrated(path)?;
}
Ok(report)
}
Err(e) => {
let _ = db.conn().execute_batch("ROLLBACK");
Err(e)
}
}
}
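/// Import the handoff portion of the legacy clone runtime state file.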
fn import_handoff(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
let path = layout.clone_runtime_state_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("handoff (missing)".into());
return Ok(());
};
let native: NativeCloneRuntimeState = serde_json::from_str(&contents)
.with_context(|| format!("migration: failed to parse {}", path.display()))?;
handoff::write(db.conn(), &native.handoff)?;
to_rename.push(path);
report.imported.push("handoff".into());
report.imported_count += 1;
Ok(())
}
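/// Import the legacy session state file.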
fn import_session(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
let path = layout.session_state_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("session (missing)".into());
return Ok(());
};
let state: SessionStateFile = serde_json::from_str(&contents)
.with_context(|| format!("migration: failed to parse {}", path.display()))?;
session::write(db.conn(), &state)?;
to_rename.push(path);
report.imported.push("session".into());
report.imported_count += 1;
Ok(())
}
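/// Import legacy escalation entries. Uses `INSERT OR IGNORE` so a retry after
/// a partial rename does not create duplicate rows.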
fn import_escalation(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
let path = layout.escalation_state_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("escalation (missing)".into());
return Ok(());
};
let state: LegacyEscalationStateFile = serde_json::from_str(&contents)
.with_context(|| format!("migration: failed to parse {}", path.display()))?;
for entry in &state.entries {
db.conn().execute(
"INSERT OR IGNORE INTO escalation (id, kind, reason, created_at_epoch_s, session_id)
VALUES (?1, ?2, ?3, ?4, ?5)",
rusqlite::params![
entry.id,
match entry.kind {
crate::state::escalation::EscalationKind::Blocking => "blocking",
crate::state::escalation::EscalationKind::NonBlocking => "non_blocking",
},
entry.reason,
entry.created_at_epoch_s,
entry.session_id,
],
)?;
}
to_rename.push(path);
report.imported.push("escalation".into());
report.imported_count += 1;
Ok(())
}
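/// Import the legacy clone checkpoint file; an empty file is treated as
/// having nothing to import and is simply renamed.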
fn import_checkpoint(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
let path = layout.clone_checkpoint_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("checkpoint (missing)".into());
return Ok(());
};
if contents.trim().is_empty() {
to_rename.push(path);
report.skipped.push("checkpoint (empty)".into());
return Ok(());
}
let checkpoint: LegacyCheckpointFile = serde_json::from_str(&contents)
.with_context(|| format!("migration: failed to parse {}", path.display()))?;
recovery::write_checkpoint(
db.conn(),
&RuntimeCheckpointState {
origin: checkpoint.origin,
captured_at_epoch_s: checkpoint.captured_at_epoch_s,
session_started_at_epoch_s: checkpoint.session_started_at_epoch_s,
summary: checkpoint.summary,
immediate_actions: checkpoint.immediate_actions,
key_files: checkpoint.key_files,
},
)?;
to_rename.push(path);
report.imported.push("checkpoint".into());
report.imported_count += 1;
Ok(())
}
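/// Import the legacy working-buffer file. The file must contain a
/// `## Provenance` section (origin plus capture/session epochs) and at least
/// one bullet under `## Recent Exchange Summary`; otherwise the import fails
/// so the user can fix or remove the file.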
fn import_working_buffer(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
use crate::handoff::extract_bulleted_section;
use crate::state::runtime::RuntimeWorkingBufferState;
let path = layout.clone_working_buffer_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("working_buffer (missing)".into());
return Ok(());
};
if contents.trim().is_empty() {
to_rename.push(path);
report.skipped.push("working_buffer (empty)".into());
return Ok(());
}
let provenance = extract_bulleted_section(&contents, "Provenance");
let summary_lines = extract_bulleted_section(&contents, "Recent Exchange Summary");
if summary_lines.is_empty() {
bail!(
"migration: {} has content but no `## Recent Exchange Summary` bullet items; \
fix or remove the file before retrying",
path.display()
);
}
let origin = parse_provenance_origin(&provenance).ok_or_else(|| {
anyhow::anyhow!(
"migration: {} is missing or has invalid `Origin` in `## Provenance`; \
fix or remove the file before retrying",
path.display()
)
})?;
let captured_at_epoch_s =
parse_provenance_epoch(&provenance, "captured at epoch").ok_or_else(|| {
anyhow::anyhow!(
"migration: {} is missing or has invalid `Captured At Epoch` in `## Provenance`; \
fix or remove the file before retrying",
path.display()
)
})?;
let session_started_at_epoch_s =
parse_provenance_epoch(&provenance, "session started at epoch").ok_or_else(|| {
anyhow::anyhow!(
"migration: {} is missing or has invalid `Session Started At Epoch` in `## Provenance`; \
fix or remove the file before retrying",
path.display()
)
})?;
recovery::write_working_buffer(
db.conn(),
&RuntimeWorkingBufferState {
origin,
captured_at_epoch_s,
session_started_at_epoch_s,
summary_lines,
},
)?;
to_rename.push(path);
report.imported.push("working_buffer".into());
report.imported_count += 1;
Ok(())
}
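/// Import legacy projection metadata observations, replacing whatever is
/// already in the `projection_metadata` table before pruning the result.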
fn import_projection_metadata(
db: &StateDb,
layout: &StateLayout,
report: &mut MigrationReport,
to_rename: &mut Vec<PathBuf>,
) -> Result<()> {
use crate::state::projection_metadata::ProjectionMetadataFile;
let path = layout.clone_projection_metadata_path();
let Some(contents) = read_if_exists(&path)? else {
report.skipped.push("projection_metadata (missing)".into());
return Ok(());
};
let metadata: ProjectionMetadataFile = serde_json::from_str(&contents)
.with_context(|| format!("migration: failed to parse {}", path.display()))?;
db.conn().execute("DELETE FROM projection_metadata", [])?;
for obs in &metadata.observations {
let digests_json = obs
.projection_digests
.as_ref()
.map(serde_json::to_string)
.transpose()?;
db.conn().execute(
"INSERT INTO projection_metadata
(observed_at_epoch_s, source_fingerprint, projection_digests,
tool_surface_fingerprint)
VALUES (?1, ?2, ?3, ?4)",
rusqlite::params![
obs.observed_at_epoch_s,
obs.source_fingerprint,
digests_json,
obs.tool_surface_fingerprint,
],
)?;
}
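// Bound the imported observation history (see `projection::prune`).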
projection::prune(db.conn(), 32)?;
to_rename.push(path);
report.imported.push("projection_metadata".into());
report.imported_count += 1;
Ok(())
}
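/// Read a file, mapping "not found" to `Ok(None)` so callers can treat a
/// missing legacy file as "nothing to import".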
fn read_if_exists(path: &Path) -> Result<Option<String>> {
match fs::read_to_string(path) {
Ok(contents) => Ok(Some(contents)),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e).with_context(|| format!("migration: failed to read {}", path.display())),
}
}
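/// Rename a consumed legacy file to `<path>.migrated` so it is not imported again.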
fn rename_migrated(path: &Path) -> Result<()> {
let mut migrated = path.as_os_str().to_owned();
migrated.push(".migrated");
fs::rename(path, &migrated)
.with_context(|| format!("migration: failed to rename {}", path.display()))?;
Ok(())
}
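/// Find the `Origin` bullet in the provenance items and map it to a `RecoveryOrigin`.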
fn parse_provenance_origin(items: &[String]) -> Option<RecoveryOrigin> {
for item in items {
if let Some((key, value)) = item.split_once(':') {
if key.trim().eq_ignore_ascii_case("origin") {
return match value.trim() {
"compaction" => Some(RecoveryOrigin::Compaction),
"risky_pause" => Some(RecoveryOrigin::RiskyPause),
"manual" => Some(RecoveryOrigin::Manual),
_ => None,
};
}
}
}
None
}
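/// Find the provenance bullet whose key matches `key_lower` (case-insensitive)
/// and parse its value as an epoch in seconds.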
fn parse_provenance_epoch(items: &[String], key_lower: &str) -> Option<u64> {
for item in items {
if let Some((key, value)) = item.split_once(':') {
if key.trim().to_ascii_lowercase() == key_lower {
return value.trim().parse().ok();
}
}
}
None
}
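/// Shape of the legacy clone runtime state file; only the handoff is imported.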
#[derive(serde::Deserialize)]
struct NativeCloneRuntimeState {
#[serde(rename = "schema_version")]
_schema_version: u32,
handoff: RuntimeHandoffState,
}
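/// Shape of the legacy escalation state file.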
#[derive(serde::Deserialize)]
struct LegacyEscalationStateFile {
#[serde(rename = "schema_version")]
_schema_version: u32,
entries: Vec<EscalationEntry>,
}
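/// Shape of the legacy checkpoint file.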
#[derive(serde::Deserialize)]
struct LegacyCheckpointFile {
#[serde(rename = "schema_version")]
_schema_version: u32,
origin: RecoveryOrigin,
captured_at_epoch_s: u64,
session_started_at_epoch_s: u64,
summary: String,
#[serde(default)]
immediate_actions: Vec<String>,
#[serde(default)]
key_files: Vec<String>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::{escalation, StateDb};
use crate::profile::ProfileName;
use crate::state::escalation::EscalationKind;
fn test_layout(temp: &std::path::Path) -> StateLayout {
StateLayout::new(
temp.join(".ccd"),
temp.join("repo/.git/ccd"),
ProfileName::new("main").expect("profile"),
)
}
fn setup_dirs(layout: &StateLayout) {
fs::create_dir_all(layout.clone_profile_root()).expect("clone profile root");
fs::create_dir_all(layout.clone_runtime_state_root()).expect("clone runtime root");
}
#[test]
fn import_with_no_legacy_files_is_noop() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report.imported_count, 0);
assert_eq!(report.skipped.len(), 6);
}
#[test]
fn import_session_state() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.session_state_path(),
r#"{"schema_version":3,"started_at_epoch_s":1000,"last_started_at_epoch_s":2000,"start_count":3,"session_id":"ses_TEST"}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report.imported_count, 1);
assert!(report.imported.contains(&"session".to_owned()));
let loaded = session::read(db.conn())
.unwrap()
.expect("session should exist");
assert_eq!(loaded.start_count, 3);
assert_eq!(loaded.session_id.as_deref(), Some("ses_TEST"));
assert!(!layout.session_state_path().exists());
let mut migrated = layout.session_state_path().as_os_str().to_owned();
migrated.push(".migrated");
assert!(std::path::Path::new(&migrated).exists());
}
#[test]
fn import_handoff_state() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.clone_runtime_state_path(),
r#"{"schema_version":1,"handoff":{"title":"Test","immediate_actions":[],"completed_state":[],"operational_guardrails":[],"key_files":[],"definition_of_done":[]}}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert!(report.imported.contains(&"handoff".to_owned()));
let loaded = handoff::read(db.conn())
.unwrap()
.expect("handoff should exist");
assert_eq!(loaded.title, "Test");
}
#[test]
fn import_escalation_state() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.escalation_state_path(),
r#"{"schema_version":1,"entries":[{"id":"esc_1","kind":"blocking","reason":"needs review","created_at_epoch_s":1000}]}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert!(report.imported.contains(&"escalation".to_owned()));
let entries = escalation::list(db.conn()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, "esc_1");
assert_eq!(entries[0].kind, EscalationKind::Blocking);
}
#[test]
fn import_is_idempotent_after_rename() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.session_state_path(),
r#"{"schema_version":3,"started_at_epoch_s":1000,"last_started_at_epoch_s":2000,"start_count":1,"session_id":"ses_1"}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
import_legacy_json(&db, &layout).unwrap();
let report2 = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report2.imported_count, 0);
}
#[test]
fn import_partial_files() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.session_state_path(),
r#"{"schema_version":3,"started_at_epoch_s":1000,"last_started_at_epoch_s":2000,"start_count":1}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report.imported_count, 1);
assert_eq!(report.skipped.len(), 5);
}
#[test]
fn malformed_working_buffer_is_rejected() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.clone_working_buffer_path(),
"## Recent Exchange Summary\n\n- Did some work\n",
)
.unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let result = import_legacy_json(&db, &layout);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("Origin"), "should mention Origin: {err}");
assert!(layout.clone_working_buffer_path().exists());
}
#[test]
fn failed_import_does_not_rename_any_files() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.session_state_path(),
r#"{"schema_version":3,"started_at_epoch_s":1000,"last_started_at_epoch_s":2000,"start_count":1}"#,
).unwrap();
fs::write(
layout.clone_working_buffer_path(),
"## Recent Exchange Summary\n\n- Did some work\n",
)
.unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let result = import_legacy_json(&db, &layout);
assert!(result.is_err());
assert!(layout.session_state_path().exists());
assert!(layout.clone_working_buffer_path().exists());
}
#[test]
fn failed_import_rolls_back_db_and_retry_succeeds() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.escalation_state_path(),
r#"{"schema_version":1,"entries":[{"id":"esc_TX","kind":"blocking","reason":"test","created_at_epoch_s":1000}]}"#,
).unwrap();
fs::write(
layout.clone_working_buffer_path(),
"## Recent Exchange Summary\n\n- Did some work\n",
)
.unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let result = import_legacy_json(&db, &layout);
assert!(result.is_err());
let entries = escalation::list(db.conn()).unwrap();
assert!(
entries.is_empty(),
"transaction should have rolled back escalation inserts"
);
assert!(layout.escalation_state_path().exists());
assert!(layout.clone_working_buffer_path().exists());
fs::write(
layout.clone_working_buffer_path(),
concat!(
"## Provenance\n\n",
"- Origin: compaction\n",
"- Captured At Epoch: 1000\n",
"- Session Started At Epoch: 900\n\n",
"## Recent Exchange Summary\n\n",
"- Did some work\n",
),
)
.unwrap();
let report = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report.imported_count, 2);
let entries = escalation::list(db.conn()).unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, "esc_TX");
let buf = recovery::read_working_buffer(db.conn())
.unwrap()
.expect("working buffer");
assert_eq!(buf.summary_lines, vec!["Did some work"]);
assert!(!layout.escalation_state_path().exists());
assert!(!layout.clone_working_buffer_path().exists());
}
#[test]
fn reimport_after_partial_rename_is_idempotent() {
let temp = tempfile::tempdir().unwrap();
let layout = test_layout(temp.path());
setup_dirs(&layout);
fs::write(
layout.escalation_state_path(),
r#"{"schema_version":1,"entries":[{"id":"esc_IDEM","kind":"blocking","reason":"test","created_at_epoch_s":1000}]}"#,
).unwrap();
fs::write(
layout.clone_projection_metadata_path(),
r#"{"schema_version":1,"observations":[{"observed_at_epoch_s":2000,"source_fingerprint":"fp_IDEM"}]}"#,
).unwrap();
let db = StateDb::open(&layout.state_db_path()).unwrap();
let report1 = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report1.imported_count, 2);
fs::write(
layout.escalation_state_path(),
r#"{"schema_version":1,"entries":[{"id":"esc_IDEM","kind":"blocking","reason":"test","created_at_epoch_s":1000}]}"#,
).unwrap();
fs::write(
layout.clone_projection_metadata_path(),
r#"{"schema_version":1,"observations":[{"observed_at_epoch_s":2000,"source_fingerprint":"fp_IDEM"}]}"#,
).unwrap();
let report2 = import_legacy_json(&db, &layout).unwrap();
assert_eq!(report2.imported_count, 2);
let entries = escalation::list(db.conn()).unwrap();
assert_eq!(entries.len(), 1, "escalation should not duplicate on retry");
assert_eq!(entries[0].id, "esc_IDEM");
let projections = projection::list(db.conn()).unwrap();
assert_eq!(
projections.len(),
1,
"projection should not duplicate on retry"
);
assert_eq!(projections[0].source_fingerprint, "fp_IDEM");
}
}