use std::fs;
use std::io::{self, Write};
use std::path::Path;
use crate::error::{HeartbeatError, Result};
use crate::in_flight::{self, InFlightEntry};
use crate::inbox;
/// How [`recover`] treats an orphaned in-flight entry whose cursor has not yet
/// moved past it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OrphanPolicy {
    /// Rewind the inbox cursor to the entry's start offset so it is delivered again.
    Retry,
    /// Append a record of the entry to `.dead-letter.jsonl`, then advance past it.
    DeadLetter,
    /// Advance the inbox cursor past the entry without keeping a copy of it.
    Drop,
}
/// Result of a single [`recover`] call; every variant except
/// `NothingToRecover` names the orphaned entry that was handled.
// All payloads are `String`, so full equality (`Eq`) holds and is derived so
// callers can use outcomes wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryOutcome {
    /// No `.in-flight` marker was present.
    NothingToRecover,
    /// The inbox cursor was already at or past the entry's end offset, so only
    /// the leftover markers were removed.
    StaleOrphanDeleted { entry_id: String },
    /// The inbox cursor was rewound to the entry's start offset for redelivery.
    ReQueued { entry_id: String },
    /// The entry was recorded in the dead-letter file and the cursor advanced past it.
    DeadLettered { entry_id: String },
    /// The cursor was advanced past the entry without keeping a copy.
    Dropped { entry_id: String },
}
pub fn recover(inbox_path: &Path, policy: OrphanPolicy) -> Result<RecoveryOutcome> {
let io_dir = inbox_path.parent().unwrap_or(Path::new("."));
let in_flight_path = in_flight::in_flight_file_for(inbox_path);
let responded_flag = io_dir.join(".responded");
let offset_file = inbox::offset_file_for(inbox_path);
let dead_letter_tmp = io_dir.join(".dead-letter.jsonl.tmp");
let _ = fs::remove_file(&dead_letter_tmp);
let entry = match InFlightEntry::read_from(&in_flight_path)? {
Some(e) => e,
None => {
let _ = fs::remove_file(&responded_flag);
return Ok(RecoveryOutcome::NothingToRecover);
}
};
let current_offset: u64 = inbox::read_offset(&offset_file)?.unwrap_or_default();
if current_offset >= entry.end_offset {
let _ = fs::remove_file(&in_flight_path);
let _ = fs::remove_file(&responded_flag);
return Ok(RecoveryOutcome::StaleOrphanDeleted {
entry_id: entry.entry_id,
});
}
match policy {
OrphanPolicy::Retry => {
inbox::write_offset(&offset_file, entry.start_offset)?;
let _ = fs::remove_file(&in_flight_path);
let _ = fs::remove_file(&responded_flag);
Ok(RecoveryOutcome::ReQueued {
entry_id: entry.entry_id,
})
}
OrphanPolicy::DeadLetter => {
let dead_letter_path = io_dir.join(".dead-letter.jsonl");
let dead_letter_tmp = io_dir.join(".dead-letter.jsonl.tmp");
let record = serde_json::json!({
"entry_id": entry.entry_id,
"start_offset": entry.start_offset,
"end_offset": entry.end_offset,
"raw_line": entry.raw_line,
"delivered_at": entry.delivered_at,
});
let new_line = format!("{}\n", record);
let dl_err = |e: io::Error| HeartbeatError::DeadLetterWrite {
path: dead_letter_path.clone(),
source: e,
};
let existing = match fs::read(&dead_letter_path) {
Ok(b) => b,
Err(e) if e.kind() == io::ErrorKind::NotFound => vec![],
Err(e) => return Err(dl_err(e)),
};
{
let mut f = fs::File::create(&dead_letter_tmp).map_err(dl_err)?;
f.write_all(&existing).map_err(dl_err)?;
f.write_all(new_line.as_bytes()).map_err(dl_err)?;
f.sync_all().map_err(dl_err)?;
}
fs::rename(&dead_letter_tmp, &dead_letter_path).map_err(dl_err)?;
inbox::write_offset(&offset_file, entry.end_offset)?;
let _ = fs::remove_file(&in_flight_path);
let _ = fs::remove_file(&responded_flag);
Ok(RecoveryOutcome::DeadLettered {
entry_id: entry.entry_id,
})
}
OrphanPolicy::Drop => {
inbox::write_offset(&offset_file, entry.end_offset)?;
let _ = fs::remove_file(&in_flight_path);
let _ = fs::remove_file(&responded_flag);
Ok(RecoveryOutcome::Dropped {
entry_id: entry.entry_id,
})
}
}
}
// Unit tests for `recover`. Each test runs in its own `TempDir`. Many create an
// orphan by running a single `hook::run` Drain tick; per the assertions in
// several tests below, that tick leaves `.in-flight` (and `.responded`) behind.
#[cfg(test)]
mod tests {
use super::*;
use crate::error::HeartbeatError;
use crate::hook;
use crate::in_flight::InFlightEntry;
use crate::inbox;
use std::io::Write;
use std::path::PathBuf;
use tempfile::TempDir;
// Path of the inbox file inside the temp dir; the file itself is not created here.
fn make_inbox(dir: &TempDir) -> PathBuf {
dir.path().join("inbox.jsonl")
}
// Appends `line` plus a newline to `inbox`, creating the file if it does not exist.
fn write_line(inbox: &Path, line: &str) {
let mut f = fs::OpenOptions::new()
.create(true)
.append(true)
.open(inbox)
.unwrap();
writeln!(f, "{}", line).unwrap();
}
// Path of the `.in-flight` marker; assumed to match `in_flight::in_flight_file_for`
// for an inbox in the same directory.
fn in_flight(dir: &TempDir) -> PathBuf {
dir.path().join(".in-flight")
}
// Path of the dead-letter file that `recover` writes under `OrphanPolicy::DeadLetter`.
fn dead_letter(dir: &TempDir) -> PathBuf {
dir.path().join(".dead-letter.jsonl")
}
// Without an `.in-flight` marker there is nothing to recover, whatever the policy.
#[test]
fn no_in_flight_returns_nothing_to_recover() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
write_line(&inbox, "entry");
let outcome = recover(&inbox, OrphanPolicy::DeadLetter).unwrap();
assert_eq!(outcome, RecoveryOutcome::NothingToRecover);
}
// Cursor (9) is already past the entry's end offset (8): the stale path wins over
// the DeadLetter policy and the marker is deleted.
#[test]
fn stale_orphan_deleted_when_cursor_past_end_offset() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let offset_file = dir.path().join(".inbox-offset");
let in_flight_path = in_flight(&dir);
write_line(&inbox, "entry K");
let entry = InFlightEntry::new("entry K", 0, 8); entry.write_to(&in_flight_path).unwrap();
inbox::write_offset(&offset_file, 9).unwrap();
let outcome = recover(&inbox, OrphanPolicy::DeadLetter).unwrap();
match outcome {
RecoveryOutcome::StaleOrphanDeleted { .. } => {}
other => panic!("expected StaleOrphanDeleted, got {:?}", other),
}
assert!(!in_flight_path.exists());
}
// Retry rewinds the cursor to the orphan's start (0) and leaves the inbox bytes untouched.
#[test]
fn retry_resets_cursor_to_start_offset_no_duplicate() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let offset_file = dir.path().join(".inbox-offset");
let in_flight_path = in_flight(&dir);
write_line(&inbox, "entry K");
write_line(&inbox, "entry K+1");
let inbox_before = fs::read_to_string(&inbox).unwrap();
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::Retry).unwrap();
match &outcome {
RecoveryOutcome::ReQueued { entry_id } => {
assert!(!entry_id.is_empty());
}
other => panic!("expected ReQueued, got {:?}", other),
}
assert!(!in_flight_path.exists());
let cur = inbox::read_offset(&offset_file).unwrap().unwrap_or(0);
assert_eq!(cur, 0, "cursor must be reset to start_offset of orphan");
let inbox_after = fs::read_to_string(&inbox).unwrap();
assert_eq!(
inbox_before, inbox_after,
"retry must not modify inbox contents — orphan is already in place"
);
let count = inbox_after.lines().filter(|l| *l == "entry K").count();
assert_eq!(count, 1, "entry K must appear exactly once after retry");
}
// Five deliver -> Retry cycles on the same entry must never grow the inbox file.
#[test]
fn retry_on_poison_entry_does_not_grow_inbox_over_5_cycles() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
write_line(&inbox, "poison");
let initial_line_count = fs::read_to_string(&inbox).unwrap().lines().count();
for cycle in 1..=5 {
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(
in_flight_path.exists(),
"cycle {}: .in-flight must exist after delivery",
cycle
);
let _ = fs::remove_file(dir.path().join(".responded"));
recover(&inbox, OrphanPolicy::Retry).unwrap();
let line_count = fs::read_to_string(&inbox).unwrap().lines().count();
assert_eq!(
line_count, initial_line_count,
"cycle {}: inbox must not grow — had {} lines, now {} lines",
cycle, initial_line_count, line_count
);
}
}
// DeadLetter writes a JSON record carrying the orphan's raw line and advances the cursor.
#[test]
fn deadletter_moves_orphan_to_dead_letter_file() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let offset_file = dir.path().join(".inbox-offset");
let in_flight_path = in_flight(&dir);
let dl_path = dead_letter(&dir);
write_line(&inbox, "entry K");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::DeadLetter).unwrap();
match &outcome {
RecoveryOutcome::DeadLettered { entry_id } => {
assert!(!entry_id.is_empty());
}
other => panic!("expected DeadLettered, got {:?}", other),
}
assert!(!in_flight_path.exists());
assert!(dl_path.exists());
let dl_contents = fs::read_to_string(&dl_path).unwrap();
let parsed: serde_json::Value = serde_json::from_str(dl_contents.trim()).unwrap();
assert_eq!(parsed["raw_line"], "entry K");
let cur = inbox::read_offset(&offset_file).unwrap().unwrap_or(0);
assert!(cur > 0, "cursor must advance after dead-letter");
}
// Two distinct orphans dead-lettered one after another yield two records: the file
// is appended to, not overwritten. Each iteration resets the inbox and the cursor
// to 0 so the new line becomes a fresh orphan.
#[test]
fn deadletter_appends_multiple_orphans() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let dl_path = dead_letter(&dir);
for entry_text in &["orphan one", "orphan two"] {
let _ = fs::remove_file(&inbox);
let _ = fs::remove_file(dir.path().join(".responded"));
inbox::write_offset(&dir.path().join(".inbox-offset"), 0).unwrap();
write_line(&inbox, entry_text);
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
recover(&inbox, OrphanPolicy::DeadLetter).unwrap();
}
let dl_contents = fs::read_to_string(&dl_path).unwrap();
let lines: Vec<&str> = dl_contents.lines().collect();
assert_eq!(lines.len(), 2, "dead-letter must have two entries");
}
// Drop removes the marker and advances the cursor past the orphan.
#[test]
fn drop_deletes_in_flight_and_advances_cursor() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let offset_file = dir.path().join(".inbox-offset");
let in_flight_path = in_flight(&dir);
write_line(&inbox, "entry K");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::Drop).unwrap();
match &outcome {
RecoveryOutcome::Dropped { entry_id } => {
assert!(!entry_id.is_empty());
}
other => panic!("expected Dropped, got {:?}", other),
}
assert!(!in_flight_path.exists());
let cur = inbox::read_offset(&offset_file).unwrap().unwrap_or(0);
assert!(cur > 0);
}
// Two consecutive Drain ticks leave no `.in-flight` marker behind (presumably the
// second tick acknowledges the first delivery), so recovery has nothing to do.
#[test]
fn fen_n1_happy_path_no_orphan() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
write_line(&inbox, "fen triage batch");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap(); hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(!in_flight(&dir).exists());
let outcome = recover(&inbox, OrphanPolicy::Drop).unwrap();
assert_eq!(outcome, RecoveryOutcome::NothingToRecover);
}
// A single tick with no follow-up leaves an orphan, which the Drop policy clears.
#[test]
fn fen_n1_failure_path_drop_policy() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
write_line(&inbox, "fen triage batch");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::Drop).unwrap();
match &outcome {
RecoveryOutcome::Dropped { .. } => {}
other => panic!("expected Dropped for Fen policy, got {:?}", other),
}
assert!(!in_flight_path.exists());
}
// A marker containing malformed JSON surfaces as `InFlightCorrupt` carrying the marker path.
#[test]
fn recover_errors_on_corrupt_in_flight() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
fs::write(&inbox, "some entry\n").unwrap();
fs::write(&in_flight_path, "{not valid json").unwrap();
let result = recover(&inbox, OrphanPolicy::DeadLetter);
match result {
Err(HeartbeatError::InFlightCorrupt { path, source: _ }) => {
assert_eq!(
path, in_flight_path,
"InFlightCorrupt path must be the .in-flight file path"
);
}
other => panic!(
"expected Err(HeartbeatError::InFlightCorrupt), got {:?}",
other
),
}
}
// An unparsable cursor file surfaces as `OffsetCorrupt` with its path and offending content.
#[test]
fn corrupt_offset_recover_returns_offset_corrupt() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
let offset_file = dir.path().join(".inbox-offset");
write_line(&inbox, "entry K");
let entry = InFlightEntry::new("entry K", 0, 8);
entry.write_to(&in_flight_path).unwrap();
fs::write(&offset_file, "CORRUPT").unwrap();
let result = recover(&inbox, OrphanPolicy::DeadLetter);
match result {
Err(HeartbeatError::OffsetCorrupt { path, content }) => {
assert_eq!(path, offset_file, "path must be the offset file path");
assert_eq!(
content, "CORRUPT",
"content must be the trimmed string that failed to parse"
);
}
other => panic!(
"expected Err(HeartbeatError::OffsetCorrupt), got {:?}",
other
),
}
}
// End-to-end: after Retry clears the markers, the next ticks re-deliver K and then K+1.
#[test]
fn retry_followed_by_hook_run_re_delivers_entry_k() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
write_line(&inbox, "entry K");
write_line(&inbox, "entry K+1");
let d1 = hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert_eq!(d1, hook::Decision::Block("entry K".to_string()));
assert!(in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::Retry).unwrap();
match &outcome {
RecoveryOutcome::ReQueued { .. } => {}
other => panic!("expected ReQueued, got {:?}", other),
}
assert!(!in_flight_path.exists());
assert!(
!dir.path().join(".responded").exists(),
"recover must remove .responded so next session starts clean"
);
let d2 = hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert_eq!(
d2,
hook::Decision::Block("entry K".to_string()),
"hook must re-deliver entry K after retry recovery"
);
let d3 = hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert_eq!(d3, hook::Decision::Block("entry K+1".to_string()));
}
// Drop must also clear `.responded`. Otherwise the first tick of the next cycle fails
// instead of delivering the new batch (see the assertion messages below).
#[test]
fn fen_cascade_drop_policy_clears_responded_before_next_session() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
let responded_path = dir.path().join(".responded");
write_line(&inbox, "cycle N email batch");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists(), "cycle N: .in-flight must exist");
assert!(responded_path.exists(), "cycle N: .responded must exist");
let outcome = recover(&inbox, OrphanPolicy::Drop).unwrap();
match &outcome {
RecoveryOutcome::Dropped { .. } => {}
other => panic!("expected Dropped, got {:?}", other),
}
assert!(!in_flight_path.exists(), "recover must remove .in-flight");
assert!(
!responded_path.exists(),
"recover must remove .responded — without this, next hook tick fires F12"
);
fs::write(&inbox, "").unwrap();
inbox::write_offset(&dir.path().join(".inbox-offset"), 0).unwrap();
write_line(&inbox, "cycle N+1 email batch");
let decision = hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert_eq!(
decision,
hook::Decision::Block("cycle N+1 email batch".to_string()),
"first hook tick of cycle N+1 must deliver new batch, not error out"
);
}
// Simulates an acknowledgement that advanced the cursor to `end_offset` and removed
// `.in-flight`, then crashed before clearing `.responded`. Recovery must clear
// `.responded` so that the next tick delivers K+1.
#[test]
fn crash_between_ack_step1_and_step2_recovered_by_stale_orphan_path() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir);
let in_flight_path = in_flight(&dir);
let responded_path = dir.path().join(".responded");
let offset_file = dir.path().join(".inbox-offset");
write_line(&inbox, "entry K");
write_line(&inbox, "entry K+1");
hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert!(in_flight_path.exists());
assert!(responded_path.exists());
let in_flight_entry = crate::in_flight::InFlightEntry::read_from(&in_flight_path)
.unwrap()
.unwrap();
inbox::write_offset(&offset_file, in_flight_entry.end_offset).unwrap();
fs::remove_file(&in_flight_path).unwrap();
assert!(responded_path.exists());
assert!(!in_flight_path.exists());
let outcome = recover(&inbox, OrphanPolicy::Drop).unwrap();
match &outcome {
RecoveryOutcome::NothingToRecover => {}
RecoveryOutcome::StaleOrphanDeleted { .. } => {}
other => panic!(
"expected NothingToRecover or StaleOrphanDeleted, got {:?}",
other
),
}
assert!(
!in_flight_path.exists(),
"recover must remove .in-flight (or it was already gone)"
);
assert!(
!responded_path.exists(),
"recover must remove .responded after stale-orphan fast path"
);
let decision = hook::run(&inbox, &hook::Mode::Drain, 0).unwrap();
assert_eq!(
decision,
hook::Decision::Block("entry K+1".to_string()),
"hook must deliver K+1 cleanly after crash-window recovery"
);
}
}