use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use crate::event::parser;
/// Summary of a single recovery operation.
///
/// Returned by [`recover_corrupt_shard`] (for one event shard) and by
/// [`recover_missing_db`] (for the SQLite projection, in which case `shard_path`
/// holds the database path rather than a shard path).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecoveryReport {
    /// File the recovery operated on.
    pub shard_path: PathBuf,
    /// Events kept after recovery. For a DB rebuild this is the event count
    /// reported by the rebuild.
    pub events_preserved: usize,
    /// Non-blank, non-comment lines moved out of the shard (always 0 for DB rebuilds).
    pub events_discarded: usize,
    /// Byte offset in the shard where the first unparseable line starts, if any.
    pub corruption_offset: Option<u64>,
    /// What the recovery actually did.
    pub action_taken: RecoveryAction,
}
/// Corrective action taken by a recovery routine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryAction {
    /// Trailing bytes were cut off a file.
    ///
    /// NOTE(review): none of the recovery functions in this module construct this
    /// variant (`recover_partial_write` returns a plain byte count); confirm whether
    /// callers elsewhere rely on it.
    Truncated {
        /// Number of bytes removed.
        bytes_removed: u64,
    },
    /// Bad data was copied aside before the original was repaired or rebuilt.
    Quarantined {
        /// Where the preserved copy of the bad data was written.
        backup_path: PathBuf,
    },
    /// Nothing had to be quarantined or removed. This is also reported when a
    /// missing projection database was rebuilt from scratch.
    NoActionNeeded,
}
/// Errors produced by the recovery routines in this module.
#[derive(Debug, thiserror::Error)]
pub enum RecoveryError {
    /// Underlying filesystem failure. `open_db_with_retry` also wraps non-lock
    /// SQLite errors here via `io::Error::other`.
    #[error("recovery I/O error: {0}")]
    Io(#[from] io::Error),
    /// The shard file passed to a shard-recovery function does not exist.
    #[error("shard file not found: {}", .0.display())]
    ShardNotFound(PathBuf),
    /// The `events/` directory needed to rebuild the projection does not exist.
    #[error("events directory not found: {}", .0.display())]
    EventsDirNotFound(PathBuf),
    /// NOTE(review): not constructed by the functions in this module; presumably
    /// used by callers validating a database path — verify.
    #[error("invalid database path: {}", .0.display())]
    InvalidDbPath(PathBuf),
    /// Rebuilding the projection from the event log failed; holds the rebuild
    /// error's message.
    #[error("rebuild failed: {0}")]
    RebuildFailed(String),
    /// The database stayed locked/busy for the whole retry window (the configured
    /// timeout is carried).
    #[error("database locked after {0:?} — another process may hold the lock")]
    LockTimeout(Duration),
}
/// Repair a torn write at the end of an event shard.
///
/// Shards are newline-terminated. If the file's last byte is not `\n`, everything
/// after the final newline is an incomplete record and is cut off in place. A file
/// containing no newline at all is truncated to zero length.
///
/// Returns the number of bytes removed. The result is `0` when the file is empty or
/// already ends with `\n`.
///
/// # Errors
/// - [`RecoveryError::ShardNotFound`] if `path` does not exist.
/// - [`RecoveryError::Io`] if the file cannot be read, reopened for writing, or truncated.
pub fn recover_partial_write(path: &Path) -> Result<u64, RecoveryError> {
    if !path.exists() {
        return Err(RecoveryError::ShardNotFound(path.to_path_buf()));
    }
    let bytes = fs::read(path)?;
    if matches!(bytes.last(), None | Some(b'\n')) {
        return Ok(0);
    }

    // Keep everything up to and including the final newline; the tail is the torn record.
    let keep = match bytes.iter().rposition(|&b| b == b'\n') {
        Some(idx) => idx + 1,
        None => 0,
    };
    let bytes_removed = bytes.len() - keep;

    fs::OpenOptions::new()
        .write(true)
        .open(path)?
        .set_len(keep as u64)?;

    tracing::warn!(
        path = %path.display(),
        bytes_removed,
        "torn write repaired: truncated incomplete trailing line"
    );
    Ok(bytes_removed as u64)
}
pub fn recover_corrupt_shard(path: &Path) -> Result<RecoveryReport, RecoveryError> {
if !path.exists() {
return Err(RecoveryError::ShardNotFound(path.to_path_buf()));
}
let content = fs::read_to_string(path).map_err(|e| {
tracing::error!(path = %path.display(), error = %e, "shard is not valid UTF-8");
RecoveryError::Io(e)
})?;
if content.is_empty() {
return Ok(RecoveryReport {
shard_path: path.to_path_buf(),
events_preserved: 0,
events_discarded: 0,
corruption_offset: None,
action_taken: RecoveryAction::NoActionNeeded,
});
}
let lines: Vec<&str> = content.lines().collect();
let mut events_preserved = 0;
let mut first_bad_line = None;
for (i, line) in lines.iter().enumerate() {
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with('#') {
continue;
}
if parser::parse_line(line).is_ok() {
events_preserved += 1;
} else {
first_bad_line = Some(i);
break;
}
}
if first_bad_line.is_none() {
return Ok(RecoveryReport {
shard_path: path.to_path_buf(),
events_preserved,
events_discarded: 0,
corruption_offset: None,
action_taken: RecoveryAction::NoActionNeeded,
});
}
let bad_idx = first_bad_line.expect("checked is_some above");
let events_discarded = lines[bad_idx..]
.iter()
.filter(|l| {
let t = l.trim();
!t.is_empty() && !t.starts_with('#')
})
.count();
let corruption_offset: u64 = content
.lines()
.take(bad_idx)
.map(|l| l.len() as u64 + 1) .sum();
let backup_path = path.with_extension("corrupt");
let corrupt_content: String = lines[bad_idx..].iter().fold(String::new(), |mut acc, l| {
use std::fmt::Write;
let _ = writeln!(acc, "{l}");
acc
});
fs::write(&backup_path, &corrupt_content)?;
let valid_content: String = lines[..bad_idx].iter().fold(String::new(), |mut acc, l| {
use std::fmt::Write;
let _ = writeln!(acc, "{l}");
acc
});
fs::write(path, &valid_content)?;
tracing::warn!(
path = %path.display(),
events_preserved,
events_discarded,
corruption_offset,
backup = %backup_path.display(),
"corrupt shard recovered: quarantined bad data to backup file"
);
Ok(RecoveryReport {
shard_path: path.to_path_buf(),
events_preserved,
events_discarded,
corruption_offset: Some(corruption_offset),
action_taken: RecoveryAction::Quarantined { backup_path },
})
}
/// Rebuild the SQLite projection at `db_path` from the event log in `events_dir`.
///
/// If a file already exists at `db_path`, it is assumed corrupt and is first copied
/// to `db_path.with_extension("db.corrupt")`. The copy is best-effort: a failed copy
/// is logged and the rebuild proceeds anyway.
///
/// The returned report's `action_taken` is:
/// - `Quarantined { backup_path }` only if the backup copy actually succeeded;
/// - `NoActionNeeded` otherwise, i.e. when the database was missing or the backup
///   could not be made.
///
/// `events_preserved` is the event count reported by the rebuild.
///
/// # Errors
/// - [`RecoveryError::EventsDirNotFound`] if `events_dir` does not exist.
/// - [`RecoveryError::RebuildFailed`] if the rebuild itself fails.
pub fn recover_missing_db(
    events_dir: &Path,
    db_path: &Path,
) -> Result<RecoveryReport, RecoveryError> {
    if !events_dir.exists() {
        return Err(RecoveryError::EventsDirNotFound(events_dir.to_path_buf()));
    }
    let db_existed = db_path.exists();

    // Only remember the backup path if the copy succeeded, so the report never
    // points at a file that was not written.
    let backup_path = if db_existed {
        let candidate = db_path.with_extension("db.corrupt");
        match fs::copy(db_path, &candidate) {
            Ok(_) => Some(candidate),
            Err(e) => {
                tracing::warn!(
                    path = %db_path.display(),
                    error = %e,
                    "could not back up corrupt DB before rebuild"
                );
                None
            }
        }
    } else {
        None
    };

    let rebuild_result = crate::db::rebuild::rebuild(events_dir, db_path)
        .map_err(|e| RecoveryError::RebuildFailed(e.to_string()))?;

    if db_existed {
        tracing::info!(
            events = rebuild_result.event_count,
            items = rebuild_result.item_count,
            elapsed_ms = rebuild_result.elapsed.as_millis(),
            "rebuilt corrupt projection from event log"
        );
    } else {
        tracing::info!(
            events = rebuild_result.event_count,
            items = rebuild_result.item_count,
            elapsed_ms = rebuild_result.elapsed.as_millis(),
            "rebuilt missing projection from event log"
        );
    }

    let action_taken = match backup_path {
        Some(backup_path) => RecoveryAction::Quarantined { backup_path },
        None => RecoveryAction::NoActionNeeded,
    };
    Ok(RecoveryReport {
        shard_path: db_path.to_path_buf(),
        events_preserved: rebuild_result.event_count,
        events_discarded: 0,
        corruption_offset: None,
        action_taken,
    })
}
pub fn recover_corrupt_cache(cache_path: &Path) -> Result<bool, RecoveryError> {
if !cache_path.exists() {
return Ok(false);
}
fs::remove_file(cache_path)?;
tracing::info!(
path = %cache_path.display(),
"deleted corrupt binary cache — will be rebuilt on next access"
);
Ok(true)
}
/// Open the projection database, retrying with exponential back-off while it is locked.
///
/// Each attempt opens the database and runs `SELECT 1`. Lock/busy failures are retried.
/// The delay starts at 50 ms, doubles after each attempt and is capped at 2 s. It is
/// also clamped to the time remaining before `timeout`, so the deadline is never
/// overshot by a whole back-off step.
///
/// # Errors
/// - [`RecoveryError::LockTimeout`] if the database is still locked once `timeout` has elapsed.
/// - [`RecoveryError::Io`] for any non-lock failure. The SQLite/open error text is
///   wrapped in an `io::Error`.
pub fn open_db_with_retry(
    db_path: &Path,
    timeout: Duration,
) -> Result<rusqlite::Connection, RecoveryError> {
    let start = Instant::now();
    let mut delay = Duration::from_millis(50);
    let max_delay = Duration::from_secs(2);
    loop {
        match crate::db::open_projection(db_path) {
            Ok(conn) => match conn.execute_batch("SELECT 1") {
                Ok(()) => return Ok(conn),
                Err(e) if is_locked_error(&e) => {
                    tracing::debug!(
                        elapsed_ms = start.elapsed().as_millis(),
                        "database locked, retrying..."
                    );
                }
                Err(e) => {
                    return Err(RecoveryError::Io(io::Error::other(e.to_string())));
                }
            },
            Err(e) => {
                // The open error type is opaque here, so classify it by its message.
                let err_str = e.to_string();
                if err_str.contains("locked") || err_str.contains("busy") {
                    tracing::debug!(
                        elapsed_ms = start.elapsed().as_millis(),
                        "database locked on open, retrying..."
                    );
                } else {
                    return Err(RecoveryError::Io(io::Error::other(err_str)));
                }
            }
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            return Err(RecoveryError::LockTimeout(timeout));
        }
        // `elapsed < timeout` here, so the subtraction cannot underflow.
        std::thread::sleep(delay.min(timeout - elapsed));
        delay = (delay * 2).min(max_delay);
    }
}
fn is_locked_error(e: &rusqlite::Error) -> bool {
if let rusqlite::Error::SqliteFailure(err, _) = e {
matches!(
err.code,
rusqlite::ffi::ErrorCode::DatabaseBusy | rusqlite::ffi::ErrorCode::DatabaseLocked
)
} else {
let s = e.to_string();
s.contains("locked") || s.contains("busy")
}
}
/// Outcome of [`auto_recover`].
#[derive(Debug, Clone)]
pub struct HealthCheckResult {
    /// `true` if the project directory exists and is a directory. When `false`, no
    /// recovery steps were attempted.
    pub project_valid: bool,
    /// Shards whose incomplete trailing line was truncated, paired with the number
    /// of bytes removed.
    pub torn_write_repairs: Vec<(PathBuf, u64)>,
    /// `true` if the projection database was rebuilt from the event log.
    pub db_rebuilt: bool,
    /// Number of corrupt cache files deleted.
    pub caches_cleaned: usize,
    /// Non-fatal failures encountered during recovery, as human-readable messages.
    pub warnings: Vec<String>,
}
pub fn auto_recover(bones_dir: &Path) -> Result<HealthCheckResult, RecoveryError> {
let mut result = HealthCheckResult {
project_valid: false,
torn_write_repairs: Vec::new(),
db_rebuilt: false,
caches_cleaned: 0,
warnings: Vec::new(),
};
if !bones_dir.exists() || !bones_dir.is_dir() {
return Ok(result); }
result.project_valid = true;
let events_dir = bones_dir.join("events");
let db_path = bones_dir.join("bones.db");
let cache_dir = bones_dir.join("cache");
if events_dir.exists() {
match fs::read_dir(&events_dir) {
Ok(entries) => {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|e| e.to_str()) == Some("events") {
match recover_partial_write(&path) {
Ok(bytes) if bytes > 0 => {
result.torn_write_repairs.push((path, bytes));
}
Ok(_) => {} Err(e) => {
result.warnings.push(format!(
"torn-write check failed for {}: {e}",
path.display()
));
}
}
}
}
}
Err(e) => {
result.warnings.push(format!("cannot read events dir: {e}"));
}
}
}
if events_dir.exists() {
let need_rebuild = !db_path.exists()
|| crate::db::open_projection(&db_path).map_or(true, |conn| {
conn.execute_batch("SELECT COUNT(*) FROM items").is_err()
});
if need_rebuild {
match recover_missing_db(&events_dir, &db_path) {
Ok(_report) => {
result.db_rebuilt = true;
}
Err(e) => {
result.warnings.push(format!("DB rebuild failed: {e}"));
}
}
}
}
if cache_dir.exists() {
let cache_events_bin = cache_dir.join("events.bin");
if cache_events_bin.exists() {
let is_valid = fs::read(&cache_events_bin)
.map(|data| data.len() >= 4 && &data[..4] == b"BCEV")
.unwrap_or(false);
if !is_valid {
match recover_corrupt_cache(&cache_events_bin) {
Ok(true) => result.caches_cleaned += 1,
Ok(false) => {}
Err(e) => {
result.warnings.push(format!("cache cleanup failed: {e}"));
}
}
}
}
}
tracing::info!(
torn_writes = result.torn_write_repairs.len(),
db_rebuilt = result.db_rebuilt,
caches_cleaned = result.caches_cleaned,
warnings = result.warnings.len(),
"auto-recovery complete"
);
Ok(result)
}
#[cfg(test)]
mod tests {
    // Unit tests for the recovery routines. Every test works inside its own `TempDir`,
    // so nothing touches a real project.
    use super::*;
    use std::io::Write;
    use tempfile::TempDir;

    // A shard that already ends in '\n' is left untouched.
    #[test]
    fn partial_write_clean_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "line1\nline2\n").unwrap();
        let bytes = recover_partial_write(&path).unwrap();
        assert_eq!(bytes, 0);
        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "line1\nline2\n");
    }

    // The 7-byte "partial" tail after the last newline is removed.
    #[test]
    fn partial_write_truncates_incomplete_line() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "line1\nline2\npartial").unwrap();
        let bytes = recover_partial_write(&path).unwrap();
        assert_eq!(bytes, 7);
        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "line1\nline2\n");
    }

    // With no newline at all, the whole file is treated as torn and emptied.
    #[test]
    fn partial_write_no_complete_lines() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "no newline at all").unwrap();
        let bytes = recover_partial_write(&path).unwrap();
        assert_eq!(bytes, 17);
        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "");
    }

    #[test]
    fn partial_write_empty_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "").unwrap();
        let bytes = recover_partial_write(&path).unwrap();
        assert_eq!(bytes, 0);
    }

    #[test]
    fn partial_write_nonexistent_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("nope.events");
        let result = recover_partial_write(&path);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            RecoveryError::ShardNotFound(_)
        ));
    }

    // Comments and blank lines are never treated as corruption.
    #[test]
    fn corrupt_shard_clean_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "# bones event log v1\n# comment\n\n").unwrap();
        let report = recover_corrupt_shard(&path).unwrap();
        assert_eq!(report.events_preserved, 0);
        assert_eq!(report.events_discarded, 0);
        assert_eq!(report.action_taken, RecoveryAction::NoActionNeeded);
    }

    #[test]
    fn corrupt_shard_empty_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "").unwrap();
        let report = recover_corrupt_shard(&path).unwrap();
        assert_eq!(report.events_preserved, 0);
        assert_eq!(report.action_taken, RecoveryAction::NoActionNeeded);
    }

    // Everything from the first unparseable line onward goes to the backup; the
    // header comment stays in the shard.
    #[test]
    fn corrupt_shard_with_bad_data() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.events");
        fs::write(&path, "# header\nthis is garbage data\nmore garbage\n").unwrap();
        let report = recover_corrupt_shard(&path).unwrap();
        assert_eq!(report.events_preserved, 0);
        assert_eq!(report.events_discarded, 2);
        assert!(report.corruption_offset.is_some());
        match &report.action_taken {
            RecoveryAction::Quarantined { backup_path } => {
                assert!(backup_path.exists());
                let backup = fs::read_to_string(backup_path).unwrap();
                assert!(backup.contains("garbage data"));
            }
            _ => panic!("expected Quarantined"),
        }
        let content = fs::read_to_string(&path).unwrap();
        assert_eq!(content, "# header\n");
    }

    #[test]
    fn corrupt_shard_nonexistent_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("nope.events");
        let result = recover_corrupt_shard(&path);
        assert!(result.is_err());
    }

    #[test]
    fn cache_recovery_deletes_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("events.bin");
        fs::write(&path, "corrupt data").unwrap();
        let deleted = recover_corrupt_cache(&path).unwrap();
        assert!(deleted);
        assert!(!path.exists());
    }

    #[test]
    fn cache_recovery_nonexistent_file() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("events.bin");
        let deleted = recover_corrupt_cache(&path).unwrap();
        assert!(!deleted);
    }

    #[test]
    fn missing_db_no_events_dir() {
        let dir = TempDir::new().unwrap();
        let events_dir = dir.path().join("events");
        let db_path = dir.path().join("bones.db");
        let result = recover_missing_db(&events_dir, &db_path);
        assert!(result.is_err());
        assert!(matches!(
            result.unwrap_err(),
            RecoveryError::EventsDirNotFound(_)
        ));
    }

    // An initialised but event-free project still produces a database file.
    #[test]
    fn missing_db_empty_events() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        let report = recover_missing_db(&events_dir, &db_path).unwrap();
        assert_eq!(report.events_preserved, 0);
        assert!(db_path.exists());
    }

    // Appends one Create event to the active shard, then checks the rebuilt
    // projection contains the item.
    #[test]
    fn missing_db_with_events_rebuilds() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        use crate::event::Event;
        use crate::event::data::*;
        use crate::event::types::EventType;
        use crate::event::writer;
        use crate::model::item::{Kind, Size, Urgency};
        use crate::model::item_id::ItemId;
        use std::collections::BTreeMap;
        let mut event = Event {
            wall_ts_us: 1000,
            agent: "test".into(),
            itc: "itc:AQ".into(),
            parents: vec![],
            event_type: EventType::Create,
            item_id: ItemId::new_unchecked("bn-001"),
            data: EventData::Create(CreateData {
                title: "Test item".into(),
                kind: Kind::Task,
                size: Some(Size::M),
                urgency: Urgency::Default,
                labels: vec![],
                parent: None,
                causation: None,
                description: None,
                extra: BTreeMap::new(),
            }),
            event_hash: String::new(),
        };
        writer::write_event(&mut event).expect("hash");
        let line = writer::write_line(&event).expect("serialize");
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        shard_mgr.append_raw(year, month, &line).expect("append");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        let report = recover_missing_db(&events_dir, &db_path).unwrap();
        assert_eq!(report.events_preserved, 1);
        assert!(db_path.exists());
        let conn = crate::db::open_projection(&db_path).unwrap();
        let title: String = conn
            .query_row(
                "SELECT title FROM items WHERE item_id = 'bn-001'",
                [],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(title, "Test item");
    }

    // A non-SQLite file at bones.db is copied byte-for-byte to bones.db.corrupt
    // and reported as Quarantined.
    #[test]
    fn corrupt_db_is_backed_up_before_rebuild() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        fs::write(&db_path, "this is not sqlite").unwrap();
        let report = recover_missing_db(&events_dir, &db_path).unwrap();
        let backup_path = db_path.with_extension("db.corrupt");
        match &report.action_taken {
            RecoveryAction::Quarantined { backup_path: bp } => {
                assert_eq!(bp, &backup_path);
                assert!(backup_path.exists());
                let backup_content = fs::read_to_string(&backup_path).unwrap();
                assert_eq!(backup_content, "this is not sqlite");
            }
            _ => panic!("expected Quarantined action"),
        }
    }

    #[test]
    fn auto_recover_nonexistent_project() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path().join(".bones");
        let result = auto_recover(&bones_dir).unwrap();
        assert!(!result.project_valid);
    }

    // A freshly initialised and rebuilt project needs no repairs and yields no warnings.
    #[test]
    fn auto_recover_healthy_project() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        crate::db::rebuild::rebuild(&events_dir, &db_path).unwrap();
        let result = auto_recover(bones_dir).unwrap();
        assert!(result.project_valid);
        assert!(result.torn_write_repairs.is_empty());
        assert!(!result.db_rebuilt);
        assert_eq!(result.caches_cleaned, 0);
        assert!(result.warnings.is_empty());
    }

    // Appending bytes without a trailing newline to the active shard simulates a torn write.
    #[test]
    fn auto_recover_repairs_torn_write() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        crate::db::rebuild::rebuild(&events_dir, &db_path).unwrap();
        let (year, month) = shard_mgr.active_shard().unwrap().unwrap();
        let shard_path = events_dir.join(format!("{year:04}-{month:02}.events"));
        let mut file = fs::OpenOptions::new()
            .append(true)
            .open(&shard_path)
            .unwrap();
        file.write_all(b"incomplete line without newline").unwrap();
        let result = auto_recover(bones_dir).unwrap();
        assert!(result.project_valid);
        assert_eq!(result.torn_write_repairs.len(), 1);
        assert!(result.torn_write_repairs[0].1 > 0);
    }

    #[test]
    fn auto_recover_rebuilds_missing_db() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let result = auto_recover(bones_dir).unwrap();
        assert!(result.project_valid);
        assert!(result.db_rebuilt);
    }

    // A cache file without the expected magic header is deleted.
    #[test]
    fn auto_recover_cleans_corrupt_cache() {
        let dir = TempDir::new().unwrap();
        let bones_dir = dir.path();
        let shard_mgr = crate::shard::ShardManager::new(bones_dir);
        shard_mgr.ensure_dirs().expect("ensure dirs");
        shard_mgr.init().expect("init");
        let events_dir = bones_dir.join("events");
        let db_path = bones_dir.join("bones.db");
        crate::db::rebuild::rebuild(&events_dir, &db_path).unwrap();
        let cache_dir = bones_dir.join("cache");
        fs::create_dir_all(&cache_dir).unwrap();
        fs::write(cache_dir.join("events.bin"), "not a valid cache").unwrap();
        let result = auto_recover(bones_dir).unwrap();
        assert!(result.project_valid);
        assert_eq!(result.caches_cleaned, 1);
        assert!(!cache_dir.join("events.bin").exists());
    }

    #[test]
    fn open_db_with_retry_succeeds_immediately() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("test.db");
        let conn = rusqlite::Connection::open(&db_path).unwrap();
        conn.execute_batch("CREATE TABLE test (id INTEGER)")
            .unwrap();
        drop(conn);
        let result = open_db_with_retry(&db_path, Duration::from_secs(1));
        assert!(result.is_ok());
    }

    // A nonexistent path still opens successfully (presumably `open_projection`
    // creates the file — confirm).
    #[test]
    fn open_db_with_retry_handles_missing_db() {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("test.db");
        let result = open_db_with_retry(&db_path, Duration::from_secs(1));
        assert!(result.is_ok());
    }

    #[test]
    fn recovery_action_debug() {
        let action = RecoveryAction::Truncated { bytes_removed: 42 };
        let debug = format!("{action:?}");
        assert!(debug.contains("42"));
        let action = RecoveryAction::Quarantined {
            backup_path: PathBuf::from("/tmp/test.corrupt"),
        };
        let debug = format!("{action:?}");
        assert!(debug.contains("test.corrupt"));
    }

    // `Duration`'s Debug output ("30s") is embedded in the LockTimeout message.
    #[test]
    fn recovery_error_display() {
        let err = RecoveryError::ShardNotFound(PathBuf::from("/tmp/test.events"));
        let display = format!("{err}");
        assert!(display.contains("not found"));
        let err = RecoveryError::LockTimeout(Duration::from_secs(30));
        let display = format!("{err}");
        assert!(display.contains("30s"));
    }
}