use anyhow::Result;
use rusqlite::Connection;
use crate::state::runtime::{RecoveryOrigin, RuntimeCheckpointState, RuntimeWorkingBufferState};
pub(crate) fn read_checkpoint(conn: &Connection) -> Result<Option<RuntimeCheckpointState>> {
let mut stmt = conn.prepare(
"SELECT checkpoint_origin, checkpoint_captured_at_epoch_s,
checkpoint_session_started_at_epoch_s, checkpoint_summary,
checkpoint_immediate_actions, checkpoint_key_files
FROM recovery WHERE id = 1",
)?;
let result = stmt.query_row([], |row| {
let origin: Option<String> = row.get(0)?;
let captured: Option<u64> = row.get(1)?;
let session_started: Option<u64> = row.get(2)?;
let summary: Option<String> = row.get(3)?;
let actions: Option<String> = row.get(4)?;
let files: Option<String> = row.get(5)?;
Ok((origin, captured, session_started, summary, actions, files))
});
match result {
Ok((
Some(origin),
Some(captured),
Some(session_started),
Some(summary),
actions,
files,
)) => Ok(Some(RuntimeCheckpointState {
origin: parse_origin(&origin)?,
captured_at_epoch_s: captured,
session_started_at_epoch_s: session_started,
summary,
immediate_actions: parse_json_vec(actions.as_deref())?,
key_files: parse_json_vec(files.as_deref())?,
})),
Ok(_) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
pub(crate) fn read_working_buffer(conn: &Connection) -> Result<Option<RuntimeWorkingBufferState>> {
let mut stmt = conn.prepare(
"SELECT buffer_origin, buffer_captured_at_epoch_s,
buffer_session_started_at_epoch_s, buffer_summary_lines
FROM recovery WHERE id = 1",
)?;
let result = stmt.query_row([], |row| {
let origin: Option<String> = row.get(0)?;
let captured: Option<u64> = row.get(1)?;
let session_started: Option<u64> = row.get(2)?;
let summary_lines: Option<String> = row.get(3)?;
Ok((origin, captured, session_started, summary_lines))
});
match result {
Ok((Some(origin), Some(captured), Some(session_started), Some(summary_lines))) => {
Ok(Some(RuntimeWorkingBufferState {
origin: parse_origin(&origin)?,
captured_at_epoch_s: captured,
session_started_at_epoch_s: session_started,
summary_lines: serde_json::from_str(&summary_lines)?,
}))
}
Ok(_) => Ok(None),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e.into()),
}
}
pub(crate) fn write_checkpoint(conn: &Connection, state: &RuntimeCheckpointState) -> Result<()> {
ensure_row(conn)?;
conn.execute(
"UPDATE recovery SET
checkpoint_origin = ?1,
checkpoint_captured_at_epoch_s = ?2,
checkpoint_session_started_at_epoch_s = ?3,
checkpoint_summary = ?4,
checkpoint_immediate_actions = ?5,
checkpoint_key_files = ?6
WHERE id = 1",
rusqlite::params![
origin_to_str(state.origin),
state.captured_at_epoch_s,
state.session_started_at_epoch_s,
state.summary,
serde_json::to_string(&state.immediate_actions)?,
serde_json::to_string(&state.key_files)?,
],
)?;
Ok(())
}
pub(crate) fn write_working_buffer(
conn: &Connection,
state: &RuntimeWorkingBufferState,
) -> Result<()> {
ensure_row(conn)?;
conn.execute(
"UPDATE recovery SET
buffer_origin = ?1,
buffer_captured_at_epoch_s = ?2,
buffer_session_started_at_epoch_s = ?3,
buffer_summary_lines = ?4
WHERE id = 1",
rusqlite::params![
origin_to_str(state.origin),
state.captured_at_epoch_s,
state.session_started_at_epoch_s,
serde_json::to_string(&state.summary_lines)?,
],
)?;
Ok(())
}
fn ensure_row(conn: &Connection) -> Result<()> {
conn.execute("INSERT OR IGNORE INTO recovery (id) VALUES (1)", [])?;
Ok(())
}
fn origin_to_str(origin: RecoveryOrigin) -> &'static str {
match origin {
RecoveryOrigin::Compaction => "compaction",
RecoveryOrigin::RiskyPause => "risky_pause",
RecoveryOrigin::Manual => "manual",
}
}
fn parse_origin(s: &str) -> Result<RecoveryOrigin> {
match s {
"compaction" => Ok(RecoveryOrigin::Compaction),
"risky_pause" => Ok(RecoveryOrigin::RiskyPause),
"manual" => Ok(RecoveryOrigin::Manual),
other => anyhow::bail!("unknown recovery origin: {other}"),
}
}
fn parse_json_vec(json: Option<&str>) -> Result<Vec<String>> {
match json {
Some(s) => Ok(serde_json::from_str(s)?),
None => Ok(Vec::new()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::schema;
fn test_conn() -> Connection {
let conn = Connection::open_in_memory().unwrap();
schema::initialize(&conn).unwrap();
conn
}
#[test]
fn read_returns_none_when_empty() {
let conn = test_conn();
assert!(read_checkpoint(&conn).unwrap().is_none());
assert!(read_working_buffer(&conn).unwrap().is_none());
}
#[test]
fn checkpoint_roundtrip() {
let conn = test_conn();
let state = RuntimeCheckpointState {
origin: RecoveryOrigin::Compaction,
captured_at_epoch_s: 1_000_000,
session_started_at_epoch_s: 999_000,
summary: "Implementing SQLite migration".to_owned(),
immediate_actions: vec!["Continue with table ops".to_owned()],
key_files: vec!["src/db/mod.rs".to_owned(), "src/db/schema.rs".to_owned()],
};
write_checkpoint(&conn, &state).unwrap();
let loaded = read_checkpoint(&conn).unwrap().expect("should exist");
assert_eq!(loaded.origin, RecoveryOrigin::Compaction);
assert_eq!(loaded.captured_at_epoch_s, 1_000_000);
assert_eq!(loaded.summary, "Implementing SQLite migration");
assert_eq!(loaded.immediate_actions, vec!["Continue with table ops"]);
assert_eq!(loaded.key_files.len(), 2);
}
#[test]
fn working_buffer_roundtrip() {
let conn = test_conn();
let state = RuntimeWorkingBufferState {
origin: RecoveryOrigin::RiskyPause,
captured_at_epoch_s: 1_000_000,
session_started_at_epoch_s: 999_000,
summary_lines: vec!["Line 1".to_owned(), "Line 2".to_owned()],
};
write_working_buffer(&conn, &state).unwrap();
let loaded = read_working_buffer(&conn).unwrap().expect("should exist");
assert_eq!(loaded.origin, RecoveryOrigin::RiskyPause);
assert_eq!(loaded.summary_lines, vec!["Line 1", "Line 2"]);
}
#[test]
fn checkpoint_and_buffer_coexist() {
let conn = test_conn();
let checkpoint = RuntimeCheckpointState {
origin: RecoveryOrigin::Manual,
captured_at_epoch_s: 1_000_000,
session_started_at_epoch_s: 999_000,
summary: "Test".to_owned(),
immediate_actions: vec![],
key_files: vec![],
};
let buffer = RuntimeWorkingBufferState {
origin: RecoveryOrigin::Compaction,
captured_at_epoch_s: 1_000_100,
session_started_at_epoch_s: 999_000,
summary_lines: vec!["Recent work".to_owned()],
};
write_checkpoint(&conn, &checkpoint).unwrap();
write_working_buffer(&conn, &buffer).unwrap();
let loaded_cp = read_checkpoint(&conn).unwrap().expect("checkpoint");
let loaded_buf = read_working_buffer(&conn).unwrap().expect("buffer");
assert_eq!(loaded_cp.origin, RecoveryOrigin::Manual);
assert_eq!(loaded_buf.origin, RecoveryOrigin::Compaction);
}
fn delete(conn: &Connection) -> Result<()> {
conn.execute("DELETE FROM recovery WHERE id = 1", [])?;
Ok(())
}
#[test]
fn delete_removes_both() {
let conn = test_conn();
write_checkpoint(
&conn,
&RuntimeCheckpointState {
origin: RecoveryOrigin::Manual,
captured_at_epoch_s: 1,
session_started_at_epoch_s: 1,
summary: "x".to_owned(),
immediate_actions: vec![],
key_files: vec![],
},
)
.unwrap();
delete(&conn).unwrap();
assert!(read_checkpoint(&conn).unwrap().is_none());
assert!(read_working_buffer(&conn).unwrap().is_none());
}
}