use relayburn_sdk::{
ingest_all, Ledger, LedgerHandle, LedgerOpenOptions, ResetSummary, StateStatus,
};
use crate::cli::{
ArchiveAction, GlobalArgs, StateArgs, StateRebuildArgs, StateRebuildTarget, StateSubcommand,
};
use crate::render::error::{report_advisory, report_error, report_ledger_error};
use crate::render::json::render_json;
use crate::render::progress::TaskProgress;
/// Entry point for `burn state`: dispatches to the selected subcommand,
/// defaulting to `status` when no subcommand was given.
pub fn run(globals: &GlobalArgs, args: StateArgs) -> i32 {
    match args
        .command
        .unwrap_or(StateSubcommand::Status(Default::default()))
    {
        StateSubcommand::Status(_) => run_status(globals),
        StateSubcommand::Rebuild(rebuild_args) => run_rebuild(globals, rebuild_args),
        StateSubcommand::Prune(prune_args) => run_prune(globals, prune_args),
        StateSubcommand::Reset(reset_args) => run_reset(globals, reset_args),
    }
}
/// `burn state status`: opens the ledger, reads the derived-state summary,
/// and renders it either as JSON or as a human-readable report.
fn run_status(globals: &GlobalArgs) -> i32 {
    let progress = TaskProgress::new(globals, "state");
    progress.set_task("opening ledger");
    let opened = Ledger::open(LedgerOpenOptions {
        home: globals.ledger_path.clone(),
        content_home: None,
    });
    let handle = match opened {
        Ok(handle) => handle,
        Err(err) => {
            progress.finish_and_clear();
            return report_anyhow(&err, globals);
        }
    };
    progress.set_task("reading derived state");
    let status = match handle.state_status() {
        Ok(status) => status,
        Err(err) => {
            progress.finish_and_clear();
            return report_anyhow(&err, globals);
        }
    };
    progress.finish_and_clear();
    if globals.json {
        return match render_json(&status) {
            Ok(_) => 0,
            Err(err) => report_error(&err, globals),
        };
    }
    print!("{}", format_status(&status));
    0
}
/// Renders a `StateStatus` as the human-readable `burn state status` report.
fn format_status(s: &StateStatus) -> String {
    use std::fmt::Write as _;
    let mut out = String::new();
    let _ = writeln!(out, "derived state at {}:", s.home);
    out.push_str("events DB (burn.sqlite):\n");
    let _ = writeln!(out, " path: {}", rel_to_home(&s.burn.path, &s.home));
    if !s.burn.exists {
        out.push_str(" status: not built yet\n");
    }
    let _ = writeln!(out, " tracked rows: {}", format_int(s.burn.tracked_rows));
    // Per-table row counts, in the same order the original report printed them.
    let row_counts = [
        ("turns", s.burn.rows.turns),
        ("user_turns", s.burn.rows.user_turns),
        ("compactions", s.burn.rows.compactions),
        ("relationships", s.burn.rows.relationships),
        ("tool_result_events", s.burn.rows.tool_result_events),
        ("sessions", s.burn.rows.sessions),
        ("stamps", s.burn.rows.stamps),
    ];
    for (table, count) in row_counts {
        let _ = writeln!(out, " {}: {}", table, format_int(count));
    }
    out.push_str("content DB (content.sqlite):\n");
    let _ = writeln!(out, " path: {}", rel_to_home(&s.content.path, &s.home));
    if !s.content.exists {
        out.push_str(" status: not built yet\n");
    }
    let _ = writeln!(out, " rows: {}", format_int(s.content.rows));
    out.push_str("archive state:\n");
    let _ = writeln!(out, " schema version: {}", s.archive.schema_version);
    let _ = writeln!(
        out,
        " last built: {}",
        s.archive.last_built_at.as_deref().unwrap_or("never")
    );
    let _ = writeln!(
        out,
        " last rebuild: {}",
        s.archive.last_rebuild_at.as_deref().unwrap_or("never")
    );
    out.push_str("config:\n");
    let _ = writeln!(out, " store: {}", s.config.store);
    // The explicit `retention_forever` flag wins over any configured day count;
    // an absent day count also means "keep forever".
    let retention = if s.config.retention_forever {
        "forever".to_string()
    } else if let Some(days) = s.config.retention_days {
        format!("{} days", format_retention_days(days))
    } else {
        "forever".to_string()
    };
    let _ = writeln!(out, " retention: {}", retention);
    out
}
/// Rewrites a path that lives inside `home` to use the literal
/// `${RELAYBURN_HOME}` placeholder; any path outside `home` (including
/// byte-prefix siblings like `/x/home2` vs `/x/home`) is returned unchanged.
fn rel_to_home(path: &str, home: &str) -> String {
    let base = home.trim_end_matches('/');
    // A degenerate home ("", "/", "//") would make every path "match".
    if base.is_empty() {
        return path.to_string();
    }
    match path.strip_prefix(base) {
        // The path is exactly the home directory itself.
        Some("") => "${RELAYBURN_HOME}/".to_string(),
        // Require a '/' right after the prefix so sibling directories whose
        // names merely start with `home`'s bytes are rejected.
        Some(rest) if rest.starts_with('/') => {
            format!("${{RELAYBURN_HOME}}/{}", rest.trim_start_matches('/'))
        }
        _ => path.to_string(),
    }
}
/// Formats an integer with ',' thousands separators, e.g. 1234567 -> "1,234,567".
fn format_int(n: u64) -> String {
    let digits = n.to_string();
    let len = digits.len();
    let mut out = String::with_capacity(len + len / 3);
    for (i, ch) in digits.chars().enumerate() {
        // Insert a separator before any position where the number of digits
        // still to come is a positive multiple of three.
        if i > 0 && (len - i) % 3 == 0 {
            out.push(',');
        }
        out.push(ch);
    }
    out
}
/// Human-readable byte count using 1024-based units labelled "KB".."TB".
/// Values below 1024 are printed as plain "N bytes".
fn format_bytes(n: u64) -> String {
    if n < 1024 {
        return format!("{} bytes", n);
    }
    const UNITS: [&str; 4] = ["KB", "MB", "GB", "TB"];
    let mut value = n as f64 / 1024.0;
    let mut unit = 0usize;
    while value >= 1024.0 && unit + 1 < UNITS.len() {
        value /= 1024.0;
        unit += 1;
    }
    // Keep roughly three significant digits regardless of magnitude.
    let precision = if value >= 100.0 {
        0
    } else if value >= 10.0 {
        1
    } else {
        2
    };
    format!("{:.*} {}", precision, value, UNITS[unit])
}
/// Formats a retention-days value: whole, non-negative, in-range numbers
/// print without a decimal part ("30"); everything else falls back to the
/// plain f64 `Display` form ("2.5", "-1", "NaN").
///
/// The previous cast-through-`u64` silently clamped negative whole values
/// (e.g. -1.0, whose `fract()` is -0.0 == 0.0) to "0" and saturated values
/// above `u64::MAX`; guard the cast so out-of-range inputs display honestly.
fn format_retention_days(d: f64) -> String {
    if d.is_finite() && d >= 0.0 && d.fract() == 0.0 && d <= u64::MAX as f64 {
        format!("{}", d as u64)
    } else {
        format!("{}", d)
    }
}
fn run_rebuild(globals: &GlobalArgs, args: StateRebuildArgs) -> i32 {
match args.target {
StateRebuildTarget::Index | StateRebuildTarget::Content => run_rebuild_derivable(globals),
StateRebuildTarget::All(all_args) => {
if all_args.force {
report_advisory(
"state rebuild all --force: standalone reclassify is not yet \
implemented in the Rust port (#240 follow-up); --force will \
apply once classify is wired",
globals,
);
}
run_rebuild_derivable(globals)
}
StateRebuildTarget::Archive(archive_args) => {
if archive_args.full {
report_advisory(
"state rebuild archive --full: in 2.0 every rebuild replays \
from zero, so --full is a no-op",
globals,
);
}
if archive_args.vacuum || matches!(archive_args.action, Some(ArchiveAction::Vacuum)) {
report_advisory(
"state rebuild archive vacuum: 2.0 collapses archive.sqlite \
into burn.sqlite, so there is nothing to vacuum",
globals,
);
}
run_rebuild_derivable(globals)
}
StateRebuildTarget::Classify(_) => {
let msg = "burn state rebuild classify: standalone reclassify is not yet \
implemented in the Rust port (filed as a follow-up under #240). \
Today the ingest pipeline classifies at append time; run \
`burn state rebuild all` to drop + replay derivable tables.";
if globals.json {
let envelope = serde_json::json!({ "error": msg });
let _ = render_json(&envelope);
} else {
eprintln!("burn: {msg}");
}
1
}
}
}
/// Drops and replays all derivable state tables, then reports how many event
/// and content rows were dropped (JSON or text).
fn run_rebuild_derivable(globals: &GlobalArgs) -> i32 {
    let progress = TaskProgress::new(globals, "state");
    progress.set_task("opening ledger");
    let mut handle = match Ledger::open(LedgerOpenOptions {
        home: globals.ledger_path.clone(),
        content_home: None,
    }) {
        Ok(handle) => handle,
        Err(err) => {
            progress.finish_and_clear();
            return report_anyhow(&err, globals);
        }
    };
    progress.set_task("rebuilding derivable state");
    let summary = match handle.raw_mut().rebuild_derivable() {
        Ok(summary) => summary,
        Err(err) => {
            progress.finish_and_clear();
            return report_ledger_error(&err, globals);
        }
    };
    progress.finish_and_clear();
    if globals.json {
        let payload = serde_json::json!({
            "rowsDropped": summary.rows_dropped,
            "contentRowsDropped": summary.content_rows_dropped,
        });
        return match render_json(&payload) {
            Ok(_) => 0,
            Err(err) => report_error(&err, globals),
        };
    }
    println!(
        "rebuilt derivable state: dropped {} event rows + {} content rows",
        format_int(summary.rows_dropped as u64),
        format_int(summary.content_rows_dropped as u64),
    );
    println!(" re-ingest from upstream session files via 'burn ingest' to repopulate.");
    0
}
/// `burn state prune [--days N|forever]`: deletes content rows older than the
/// retention cutoff and reports how many rows / bytes were reclaimed.
///
/// Exit codes: 0 on success (including "retention=forever, nothing to do"),
/// 2 on an unparsable --days value, otherwise the reporter's error code.
fn run_prune(globals: &GlobalArgs, args: crate::cli::StatePruneArgs) -> i32 {
    use relayburn_sdk::{load_config_with_home, Retention};
    let progress = TaskProgress::new(globals, "state");
    progress.set_task("loading retention config");
    let cfg = match load_config_with_home(globals.ledger_path.as_deref()) {
        Ok(c) => c,
        Err(err) => {
            progress.finish_and_clear();
            return report_ledger_error(&err, globals);
        }
    };
    // An explicit --days overrides the configured retention; a value that
    // doesn't parse is a usage error (exit 2).
    let retention = match args.days.as_deref() {
        Some(s) => match parse_retention(s) {
            Some(r) => r,
            None => {
                progress.finish_and_clear();
                let msg = format!(
                    "invalid --days value: {:?} (expected a number or \"forever\")",
                    s
                );
                if globals.json {
                    let envelope = serde_json::json!({ "error": msg });
                    let _ = render_json(&envelope);
                } else {
                    eprintln!("burn state prune: {msg}");
                }
                return 2;
            }
        },
        None => cfg.content.retention_days,
    };
    // Convert the retention window to milliseconds. Both the explicit Forever
    // variant and a Days value whose as_millis() comes back None are treated
    // as "keep everything" and short-circuit with a zero-row success report.
    let cutoff_ms = match retention {
        Retention::Forever => {
            progress.finish_and_clear();
            if globals.json {
                let payload =
                    serde_json::json!({ "rowsDeleted": 0, "bytesFreed": 0, "retention": "forever" });
                let _ = render_json(&payload);
            } else {
                println!("content retention=forever - nothing to prune");
            }
            return 0;
        }
        Retention::Days(_) => match retention.as_millis() {
            Some(ms) => ms,
            None => {
                progress.finish_and_clear();
                if globals.json {
                    let payload = serde_json::json!({
                        "rowsDeleted": 0, "bytesFreed": 0, "retention": "forever"
                    });
                    let _ = render_json(&payload);
                } else {
                    println!("content retention=forever - nothing to prune");
                }
                return 0;
            }
        },
    };
    // Cutoff = now - window (clamped at 0), rendered in the writer's
    // lexicographically-sortable "ts:" key format so SQL string comparison
    // against stored timestamps is correct.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let cutoff_ms = now_ms.saturating_sub(cutoff_ms);
    let cutoff = format_cutoff_ts(cutoff_ms);
    let opts = LedgerOpenOptions {
        home: globals.ledger_path.clone(),
        content_home: None,
    };
    progress.set_task("opening ledger");
    let mut handle = match Ledger::open(opts) {
        Ok(h) => h,
        Err(err) => {
            progress.finish_and_clear();
            return report_anyhow(&err, globals);
        }
    };
    progress.set_task("pruning content rows");
    let stats = match handle.raw_mut().prune_content_older_than(&cutoff) {
        Ok(s) => s,
        Err(err) => {
            progress.finish_and_clear();
            return report_ledger_error(&err, globals);
        }
    };
    progress.finish_and_clear();
    // --force is accepted for CLI compatibility but pruning never prompts.
    let _ = args.force;
    if globals.json {
        let payload = serde_json::json!({
            "rowsDeleted": stats.rows_deleted,
            "bytesFreed": stats.bytes_freed,
            "cutoff": cutoff,
        });
        if let Err(err) = render_json(&payload) {
            return report_error(&err, globals);
        }
    } else {
        println!(
            "pruned {} content row{} ({})",
            format_int(stats.rows_deleted as u64),
            if stats.rows_deleted == 1 { "" } else { "s" },
            // bytes_freed is clamped at 0 before the unsigned cast.
            format_bytes(stats.bytes_freed.max(0) as u64)
        );
    }
    0
}
/// Renders a millisecond timestamp in the ledger writer's sortable key
/// format: "ts:" + 20-digit zero-padded seconds + "." + 9-digit nanoseconds
/// (always 33 bytes, so plain string comparison orders by time).
fn format_cutoff_ts(ms: u64) -> String {
    let secs = ms / 1_000;
    let nanos = (ms % 1_000) * 1_000_000;
    format!("ts:{secs:020}.{nanos:09}")
}
fn parse_retention(s: &str) -> Option<relayburn_sdk::Retention> {
let trimmed = s.trim();
if trimmed.is_empty() {
return None;
}
if trimmed.eq_ignore_ascii_case("forever") {
return Some(relayburn_sdk::Retention::Forever);
}
let n: f64 = trimmed.parse().ok()?;
if !n.is_finite() {
return None;
}
if n < 0.0 {
return Some(relayburn_sdk::Retention::Forever);
}
Some(relayburn_sdk::Retention::Days(n))
}
/// `burn state reset [--force] [--reingest]`: without --force this is a dry
/// run that only counts what would be dropped; with --force it wipes the
/// derived state and, when --reingest is set, immediately replays ingest.
fn run_reset(globals: &GlobalArgs, args: crate::cli::StateResetArgs) -> i32 {
    let progress = TaskProgress::new(globals, "state");
    progress.set_task("opening ledger");
    let mut handle = match Ledger::open(LedgerOpenOptions {
        home: globals.ledger_path.clone(),
        content_home: None,
    }) {
        Ok(handle) => handle,
        Err(err) => {
            progress.finish_and_clear();
            return report_anyhow(&err, globals);
        }
    };
    if !args.force {
        // Dry run: count targets and report without touching anything.
        progress.set_task("counting reset targets");
        let summary = match handle.raw().count_reset_targets() {
            Ok(summary) => summary,
            Err(err) => {
                progress.finish_and_clear();
                return report_ledger_error(&err, globals);
            }
        };
        progress.finish_and_clear();
        return print_reset_report(globals, &summary, false, None);
    }
    progress.set_task("resetting derived state");
    let summary = match handle.raw_mut().reset() {
        Ok(summary) => summary,
        Err(err) => {
            progress.finish_and_clear();
            return report_ledger_error(&err, globals);
        }
    };
    let mut ingest_report = None;
    if args.reingest {
        match run_reset_reingest(&mut handle, globals.ledger_path.clone(), &progress) {
            Ok(report) => ingest_report = Some(report),
            Err(err) => {
                progress.finish_and_clear();
                return report_error(&err, globals);
            }
        }
    }
    progress.finish_and_clear();
    print_reset_report(globals, &summary, true, ingest_report.as_ref())
}
/// Drives the async `ingest_all` pipeline from this synchronous CLI by
/// spinning up a throwaway single-threaded tokio runtime.
fn run_reset_reingest(
    handle: &mut LedgerHandle,
    ledger_home: Option<std::path::PathBuf>,
    progress: &TaskProgress,
) -> anyhow::Result<relayburn_sdk::IngestReport> {
    // Build the runtime first so a build failure surfaces before the task
    // label changes.
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    progress.set_task("re-ingesting sessions");
    let ingest_opts = progress.ingest_options(ledger_home);
    runtime.block_on(ingest_all(handle.raw_mut(), &ingest_opts))
}
/// Prints the outcome of `burn state reset` (JSON or text) and returns the
/// process exit code.
///
/// `executed` false means dry run; `ingest_report` is present only when
/// --reingest ran after a real reset.
///
/// The inline `if n == 1 { "" } else { "s" }` pluralization was repeated ten
/// times; it is extracted into one nested helper with identical behavior.
fn print_reset_report(
    globals: &GlobalArgs,
    summary: &ResetSummary,
    executed: bool,
    ingest_report: Option<&relayburn_sdk::IngestReport>,
) -> i32 {
    // "1 row" vs "2 rows": shared pluralization suffix.
    fn plural(n: u64) -> &'static str {
        if n == 1 {
            ""
        } else {
            "s"
        }
    }
    if globals.json {
        let mut payload = serde_json::json!({
            "executed": executed,
            "rowsDropped": summary.rows_dropped,
            "stampsDropped": summary.stamps_dropped,
            "contentRowsDropped": summary.content_rows_dropped,
        });
        if let Some(report) = ingest_report {
            payload["reingest"] = serde_json::json!({
                "scannedSessions": report.scanned_sessions,
                "ingestedSessions": report.ingested_sessions,
                "appendedTurns": report.appended_turns,
                "appliedPendingStamps": report.applied_pending_stamps,
            });
        }
        if let Err(err) = render_json(&payload) {
            return report_error(&err, globals);
        }
        return 0;
    }
    let rows = summary.rows_dropped as u64;
    let stamps = summary.stamps_dropped as u64;
    let content_rows = summary.content_rows_dropped as u64;
    if executed {
        println!(
            "reset derived state: dropped {} event row{} + {} stamp{} + {} content row{}",
            format_int(rows),
            plural(rows),
            format_int(stamps),
            plural(stamps),
            format_int(content_rows),
            plural(content_rows),
        );
        match ingest_report {
            Some(report) => {
                let sessions = report.ingested_sessions as u64;
                let turns = report.appended_turns as u64;
                println!(
                    " re-ingested {} session{} (+{} turn{}).",
                    format_int(sessions),
                    plural(sessions),
                    format_int(turns),
                    plural(turns),
                );
            }
            None => {
                println!(" re-ingest from upstream session files via 'burn ingest' to repopulate (or re-run with --reingest).");
            }
        }
    } else {
        println!(
            "burn state reset (dry run): would drop {} event row{} + {} stamp{} + {} content row{}.",
            format_int(rows),
            plural(rows),
            format_int(stamps),
            plural(stamps),
            format_int(content_rows),
            plural(content_rows),
        );
        println!(" re-run with --force to actually wipe (add --reingest to repopulate).");
    }
    0
}
/// Routes an `anyhow::Error` to the richer ledger-error reporter when it
/// wraps a `LedgerError`, otherwise to the generic error reporter.
fn report_anyhow(err: &anyhow::Error, globals: &GlobalArgs) -> i32 {
    match err.downcast_ref::<relayburn_sdk::LedgerError>() {
        Some(ledger_err) => report_ledger_error(ledger_err, globals),
        None => report_error(err, globals),
    }
}
#[cfg(test)]
mod tests {
    use super::{format_cutoff_ts, rel_to_home};

    /// Mirrors the ledger writer's timestamp-key layout so the cutoff
    /// formatting can be compared byte-for-byte.
    fn writer_style_ts(secs: u64, nanos_part: u64) -> String {
        format!("ts:{:020}.{:09}", secs, nanos_part)
    }

    #[test]
    fn cutoff_matches_writer_format_byte_for_byte() {
        assert_eq!(
            format_cutoff_ts(1_234_567u64),
            writer_style_ts(1_234, 567_000_000)
        );
    }

    #[test]
    fn cutoff_is_lex_comparable_against_writer_rows() {
        let cutoff = format_cutoff_ts(2_000);
        let earlier_row = writer_style_ts(1, 500_000_000);
        let later_row = writer_style_ts(2, 500_000_000);
        assert!(earlier_row.as_str() < cutoff.as_str());
        assert!(later_row.as_str() > cutoff.as_str());
    }

    #[test]
    fn cutoff_padding_widths_are_stable() {
        // "ts:" + 20 second digits + "." + 9 nano digits = 33 bytes, always.
        assert_eq!(format_cutoff_ts(0).len(), 33);
        assert_eq!(format_cutoff_ts(u64::MAX).len(), 33);
    }

    #[test]
    fn rel_to_home_rewrites_paths_inside_home() {
        assert_eq!(
            rel_to_home("/x/home/burn.sqlite", "/x/home"),
            "${RELAYBURN_HOME}/burn.sqlite"
        );
        assert_eq!(
            rel_to_home("/x/home/sub/dir/file", "/x/home"),
            "${RELAYBURN_HOME}/sub/dir/file"
        );
    }

    #[test]
    fn rel_to_home_rejects_byte_prefix_siblings() {
        assert_eq!(
            rel_to_home("/x/home2/burn.sqlite", "/x/home"),
            "/x/home2/burn.sqlite"
        );
        assert_eq!(rel_to_home("/x/homer", "/x/home"), "/x/homer");
    }

    #[test]
    fn rel_to_home_normalizes_trailing_slash_on_home() {
        assert_eq!(
            rel_to_home("/x/home/burn.sqlite", "/x/home/"),
            "${RELAYBURN_HOME}/burn.sqlite"
        );
        assert_eq!(rel_to_home("/x/home2/foo", "/x/home/"), "/x/home2/foo");
    }

    #[test]
    fn rel_to_home_handles_degenerate_home_inputs() {
        assert_eq!(rel_to_home("/x/home/foo", ""), "/x/home/foo");
        assert_eq!(rel_to_home("/x/home/foo", "/"), "/x/home/foo");
        assert_eq!(rel_to_home("/x/home/foo", "//"), "/x/home/foo");
    }

    #[test]
    fn rel_to_home_path_equals_home() {
        assert_eq!(rel_to_home("/x/home", "/x/home"), "${RELAYBURN_HOME}/");
        assert_eq!(rel_to_home("/x/home/", "/x/home"), "${RELAYBURN_HOME}/");
    }
}