mod migrations;
use rusqlite::{params, Connection};
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::Duration,
};
use thiserror::Error;
#[derive(Debug, Error)]
pub enum StorageError {
#[error("io: {0}")]
Io(#[from] std::io::Error),
#[error("sqlite: {0}")]
Sqlite(#[from] rusqlite::Error),
#[error("migration: {0}")]
Migration(#[from] rusqlite_migration::Error),
#[error("ledger path: {0}")]
LedgerPath(String),
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LedgerRow {
pub session_id: String,
pub tool: String,
pub ts: i64,
pub role: String,
pub content: String,
pub tool_calls_json: Option<String>,
pub files_touched_json: Option<String>,
pub parent_id: Option<String>,
}
#[derive(Clone)]
pub struct Ledger {
conn: Arc<Mutex<Connection>>,
}
impl Ledger {
pub fn open(path: &Path) -> Result<Self, StorageError> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)?;
}
}
let mut conn = Connection::open(path)?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o600))?;
}
conn.busy_timeout(Duration::from_millis(5000))?;
conn.execute_batch(
"PRAGMA journal_mode=WAL;
PRAGMA synchronous=NORMAL;
PRAGMA foreign_keys=ON;",
)?;
migrations::migrations().to_latest(&mut conn)?;
conn.cache_flush()?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
pub fn default_path() -> Result<PathBuf, StorageError> {
let home = dirs::home_dir().ok_or_else(|| {
StorageError::LedgerPath("could not resolve home directory".to_string())
})?;
Self::default_path_in(&home)
}
fn default_path_in(home: &Path) -> Result<PathBuf, StorageError> {
let dir = home.join(".carryover");
#[cfg(unix)]
{
use std::os::unix::fs::DirBuilderExt;
std::fs::DirBuilder::new()
.recursive(true)
.mode(0o700)
.create(&dir)?;
}
#[cfg(not(unix))]
{
std::fs::create_dir_all(&dir)?;
}
Ok(dir.join("ledger.sqlite"))
}
pub fn insert(&self, row: &LedgerRow) -> Result<i64, StorageError> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO events
(session_id, tool, ts, role, content,
tool_calls_json, files_touched_json, parent_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
row.session_id,
row.tool,
row.ts,
row.role,
row.content,
row.tool_calls_json,
row.files_touched_json,
row.parent_id,
],
)?;
Ok(conn.last_insert_rowid())
}
pub fn insert_batch(&self, rows: &[LedgerRow]) -> Result<Vec<i64>, StorageError> {
let mut conn = self.conn.lock().unwrap();
let tx = conn.transaction()?;
let mut ids = Vec::with_capacity(rows.len());
for row in rows {
tx.execute(
"INSERT INTO events
(session_id, tool, ts, role, content,
tool_calls_json, files_touched_json, parent_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
row.session_id,
row.tool,
row.ts,
row.role,
row.content,
row.tool_calls_json,
row.files_touched_json,
row.parent_id,
],
)?;
ids.push(tx.last_insert_rowid());
}
tx.commit()?;
Ok(ids)
}
pub fn query_session(&self, session_id: &str) -> Result<Vec<LedgerRow>, StorageError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare_cached(
"SELECT session_id, tool, ts, role, content,
tool_calls_json, files_touched_json, parent_id
FROM events
WHERE session_id = ?1
ORDER BY ts ASC",
)?;
let rows = stmt
.query_map(params![session_id], map_row)?
.collect::<Result<Vec<_>, _>>()?;
Ok(rows)
}
pub fn query_recent(&self, tool: &str, limit: usize) -> Result<Vec<LedgerRow>, StorageError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare_cached(
"SELECT session_id, tool, ts, role, content,
tool_calls_json, files_touched_json, parent_id
FROM events
WHERE tool = ?1
ORDER BY ts DESC
LIMIT ?2",
)?;
let rows = stmt
.query_map(params![tool, limit as i64], map_row)?
.collect::<Result<Vec<_>, _>>()?;
Ok(rows)
}
pub fn save_cursor(
&self,
tool: &str,
session_id: &str,
cursor_json: &str,
) -> Result<(), StorageError> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO cursors (tool, session_id, cursor_json)
VALUES (?1, ?2, ?3)
ON CONFLICT(tool, session_id) DO UPDATE SET cursor_json = excluded.cursor_json",
params![tool, session_id, cursor_json],
)?;
Ok(())
}
pub fn load_cursor(
&self,
tool: &str,
session_id: &str,
) -> Result<Option<String>, StorageError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare_cached(
"SELECT cursor_json FROM cursors WHERE tool = ?1 AND session_id = ?2",
)?;
match stmt.query_row(params![tool, session_id], |row| row.get::<_, String>(0)) {
Ok(s) => Ok(Some(s)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(StorageError::Sqlite(e)),
}
}
}
fn map_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<LedgerRow> {
Ok(LedgerRow {
session_id: row.get(0)?,
tool: row.get(1)?,
ts: row.get(2)?,
role: row.get(3)?,
content: row.get(4)?,
tool_calls_json: row.get(5)?,
files_touched_json: row.get(6)?,
parent_id: row.get(7)?,
})
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
fn sample_row(session_id: &str, ts: i64) -> LedgerRow {
LedgerRow {
session_id: session_id.to_string(),
tool: "claude".to_string(),
ts,
role: "user".to_string(),
content: "hello world".to_string(),
tool_calls_json: None,
files_touched_json: None,
parent_id: None,
}
}
#[test]
fn open_creates_db() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("ledger.sqlite");
let ledger = Ledger::open(&path).expect("open should succeed");
let id = ledger.insert(&sample_row("s1", 1000)).unwrap();
assert!(id > 0);
}
#[test]
fn open_is_idempotent() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("ledger.sqlite");
Ledger::open(&path).expect("first open");
Ledger::open(&path).expect("second open should not fail or re-apply migrations");
}
#[test]
fn insert_and_query_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let ledger = Ledger::open(&dir.path().join("ledger.sqlite")).unwrap();
let original = LedgerRow {
session_id: "sess-abc".to_string(),
tool: "cursor".to_string(),
ts: 9999,
role: "assistant".to_string(),
content: "some response".to_string(),
tool_calls_json: Some(r#"["edit"]"#.to_string()),
files_touched_json: Some(r#"["src/main.rs"]"#.to_string()),
parent_id: Some("parent-uuid".to_string()),
};
ledger.insert(&original).unwrap();
let rows = ledger.query_session("sess-abc").unwrap();
assert_eq!(rows.len(), 1);
let got = &rows[0];
assert_eq!(got.session_id, original.session_id);
assert_eq!(got.tool, original.tool);
assert_eq!(got.ts, original.ts);
assert_eq!(got.role, original.role);
assert_eq!(got.content, original.content);
assert_eq!(got.tool_calls_json, original.tool_calls_json);
assert_eq!(got.files_touched_json, original.files_touched_json);
assert_eq!(got.parent_id, original.parent_id);
}
#[test]
fn insert_batch_atomic() {
let dir = tempfile::tempdir().unwrap();
let ledger = Ledger::open(&dir.path().join("ledger.sqlite")).unwrap();
let rows: Vec<LedgerRow> = (0..100).map(|i| sample_row("batch-sess", i)).collect();
let ids = ledger.insert_batch(&rows).unwrap();
assert_eq!(ids.len(), 100);
for w in ids.windows(2) {
assert_eq!(w[1], w[0] + 1, "rowids should be sequential");
}
let stored = ledger.query_session("batch-sess").unwrap();
assert_eq!(stored.len(), 100);
}
#[test]
fn concurrent_writes() {
let dir = tempfile::tempdir().unwrap();
let ledger = Arc::new(Ledger::open(&dir.path().join("ledger.sqlite")).unwrap());
let handles: Vec<_> = (0..2)
.map(|t| {
let l = Arc::clone(&ledger);
std::thread::spawn(move || {
let rows: Vec<LedgerRow> = (0..500)
.map(|i| {
let mut r = sample_row("concurrent-sess", i + t * 500);
r.content = format!("thread {} row {}", t, i);
r
})
.collect();
l.insert_batch(&rows).expect("no SQLITE_BUSY expected")
})
})
.collect();
for h in handles {
h.join().expect("thread panicked");
}
let total = ledger.query_session("concurrent-sess").unwrap();
assert_eq!(total.len(), 1000, "all 1000 rows should be present");
}
#[test]
fn wal_mode_active() {
let dir = tempfile::tempdir().unwrap();
let ledger = Ledger::open(&dir.path().join("ledger.sqlite")).unwrap();
let mode: String = {
let conn = ledger.conn.lock().unwrap();
conn.query_row("PRAGMA journal_mode", [], |r| r.get(0))
.unwrap()
};
assert_eq!(mode, "wal");
}
#[test]
fn query_session_orders_by_ts() {
let dir = tempfile::tempdir().unwrap();
let ledger = Ledger::open(&dir.path().join("ledger.sqlite")).unwrap();
let rows = vec![
sample_row("order-sess", 300),
sample_row("order-sess", 100),
sample_row("order-sess", 200),
];
ledger.insert_batch(&rows).unwrap();
let result = ledger.query_session("order-sess").unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].ts, 100);
assert_eq!(result[1].ts, 200);
assert_eq!(result[2].ts, 300);
}
#[test]
fn default_path_creates_carryover_dir() {
let dir = tempfile::tempdir().unwrap();
let path = Ledger::default_path_in(dir.path()).expect("default_path_in should succeed");
let expected_dir = dir.path().join(".carryover");
assert!(expected_dir.exists(), ".carryover dir should be created");
assert_eq!(path, expected_dir.join("ledger.sqlite"));
}
#[cfg(unix)]
#[test]
fn default_path_in_sets_owner_only_perms() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().unwrap();
Ledger::default_path_in(dir.path()).unwrap();
let mode = std::fs::metadata(dir.path().join(".carryover"))
.unwrap()
.permissions()
.mode();
assert_eq!(mode & 0o777, 0o700, "expected 0o700 on ~/.carryover/");
}
#[cfg(unix)]
#[test]
fn open_sets_ledger_file_to_0600() {
use std::os::unix::fs::PermissionsExt;
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("ledger.sqlite");
Ledger::open(&path).unwrap();
let mode = std::fs::metadata(&path).unwrap().permissions().mode();
assert_eq!(mode & 0o777, 0o600, "expected 0o600 on ledger.sqlite");
}
#[test]
fn query_recent_orders_filters_and_limits() {
let dir = tempfile::tempdir().unwrap();
let ledger = Ledger::open(&dir.path().join("ledger.sqlite")).unwrap();
let mut rows = Vec::new();
for i in 0..5 {
let mut r = sample_row("recent-sess", 1000 + i * 10);
r.tool = "claude".to_string();
r.content = format!("claude {}", i);
rows.push(r);
}
for i in 0..3 {
let mut r = sample_row("recent-sess", 2000 + i * 10);
r.tool = "cursor".to_string();
r.content = format!("cursor {}", i);
rows.push(r);
}
ledger.insert_batch(&rows).unwrap();
let recent = ledger.query_recent("claude", 3).unwrap();
assert_eq!(recent.len(), 3, "limit honored");
assert!(
recent.iter().all(|r| r.tool == "claude"),
"tool filter exclusive"
);
assert_eq!(recent[0].ts, 1040);
assert_eq!(recent[1].ts, 1030);
assert_eq!(recent[2].ts, 1020);
let all_cursor = ledger.query_recent("cursor", 100).unwrap();
assert_eq!(all_cursor.len(), 3);
let none = ledger.query_recent("codex", 10).unwrap();
assert!(none.is_empty());
}
}