use std::fs;
use std::path::Path;
use std::time::Duration;
use crate::error::{HeartbeatError, Result};
use crate::in_flight::{self, InFlightEntry};
use crate::inbox;
/// Selects what the hook does once the inbox has no further entry to deliver.
///
/// The enum carries no data, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Mode {
    /// An exhausted inbox results in `Decision::Approve`.
    Drain,
    /// An exhausted inbox results in `Decision::IdleTick`, optionally after a sleep.
    Persist,
}
/// Outcome of a single hook tick.
///
/// Every field is cloneable and has total equality, so `Clone` and `Eq` are
/// derived in addition to `Debug` and `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Decision {
    /// An inbox entry was delivered; the payload is the decoded entry text,
    /// which `serialize` emits as the `reason` of a `"block"` object.
    Block(String),
    /// Nothing left to deliver in drain mode; `serialize` renders this as an
    /// empty string.
    Approve,
    /// Nothing left to deliver in persist mode; `serialize` renders this as a
    /// `"block"` object with a fixed idle-tick reason.
    IdleTick,
}
/// Runs one hook tick against the inbox at `inbox_path` and decides what happens next.
///
/// State is kept in files next to the inbox: a `.responded` flag in the inbox's
/// parent directory, plus the offset and in-flight files whose paths are derived by
/// `inbox::offset_file_for` and `in_flight::in_flight_file_for`.
///
/// 1. If `.responded` exists, the previous delivery is acknowledged: the in-flight
///    record is read, its `end_offset` is handed to `inbox::acknowledge`, and the
///    flag is removed (best effort).
/// 2. The next inbox entry is read. If there is one, it is written as the new
///    in-flight record, `.responded` is (re)created, and `Decision::Block` with the
///    decoded entry is returned.
/// 3. If there is none, the result comes from `mode`: `Decision::Approve` for
///    `Mode::Drain`, `Decision::IdleTick` for `Mode::Persist` (after sleeping
///    `idle_interval_secs` seconds when that value is non-zero).
///
/// # Errors
///
/// Returns `HeartbeatError::InconsistentState` when `.responded` exists but no
/// in-flight record is present, and propagates any error reported while reading
/// the inbox, reading/writing the in-flight record, acknowledging, or creating
/// the flag file.
pub fn run(inbox_path: &Path, mode: &Mode, idle_interval_secs: u64) -> Result<Decision> {
    // `parent()` is `None` only for a root or empty path; use the current directory then.
    let io_dir = inbox_path.parent().unwrap_or(Path::new("."));
    let responded_flag = io_dir.join(".responded");
    let offset_file = inbox::offset_file_for(inbox_path);
    let in_flight_path = in_flight::in_flight_file_for(inbox_path);

    if responded_flag.exists() {
        // A flag without a matching in-flight record cannot be acknowledged safely.
        let delivered = InFlightEntry::read_from(&in_flight_path)?.ok_or_else(|| {
            HeartbeatError::InconsistentState {
                io_dir: io_dir.to_owned(),
            }
        })?;
        inbox::acknowledge(&offset_file, delivered.end_offset, &in_flight_path)?;
        // Deliberately best-effort: a failed removal is ignored. When another entry
        // follows, the flag is recreated below anyway.
        // NOTE(review): if removal fails and no entry follows, the flag may outlive
        // its in-flight record and the next tick would report InconsistentState.
        let _ = fs::remove_file(&responded_flag);
    }

    // Shared tail: delivering the next entry is the same whether or not an
    // acknowledgement happened first.
    match inbox::read_next_entry(inbox_path, &offset_file)? {
        Some(entry) => {
            // Record the delivery before raising the flag, so the flag never
            // exists without an in-flight record on this path.
            InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset)
                .write_to(&in_flight_path)?;
            touch(&responded_flag)?;
            Ok(Decision::Block(entry.decoded))
        }
        None => Ok(approve_or_idle(mode, idle_interval_secs)),
    }
}
/// Renders a `Decision` as a string.
///
/// * `Decision::Approve` becomes the empty string.
/// * `Decision::Block(reason)` becomes a JSON object with `"decision": "block"`
///   and `"reason"` set to the carried text.
/// * `Decision::IdleTick` becomes the same JSON shape with the fixed reason
///   `--- IDLE TICK ---`.
pub fn serialize(decision: &Decision) -> String {
    // Both non-approve variants share one JSON shape and differ only in the reason.
    let reason = match decision {
        Decision::Approve => return String::new(),
        Decision::Block(text) => text.as_str(),
        Decision::IdleTick => "--- IDLE TICK ---",
    };

    serde_json::json!({
        "decision": "block",
        "reason": reason
    })
    .to_string()
}
/// Picks the decision for a tick that found no inbox entry to deliver.
///
/// `Mode::Drain` yields `Decision::Approve` immediately. `Mode::Persist` yields
/// `Decision::IdleTick`, first blocking the current thread for
/// `idle_interval_secs` seconds when that value is non-zero.
fn approve_or_idle(mode: &Mode, idle_interval_secs: u64) -> Decision {
    match mode {
        Mode::Persist => {
            let pause = Duration::from_secs(idle_interval_secs);
            // A zero interval means "do not wait at all", so skip the sleep call.
            if idle_interval_secs != 0 {
                std::thread::sleep(pause);
            }
            Decision::IdleTick
        }
        Mode::Drain => Decision::Approve,
    }
}
/// Ensures a file exists at `path` without altering any existing content.
///
/// The file is opened for writing with creation enabled and truncation disabled,
/// and the handle is dropped right away.
///
/// # Errors
///
/// Returns `HeartbeatError::InFlightWrite` carrying `path` and the underlying
/// I/O error when the file cannot be opened or created.
fn touch(path: &Path) -> Result<()> {
    let mut options = fs::OpenOptions::new();
    options.write(true).create(true).truncate(false);

    match options.open(path) {
        // Only existence matters; the handle closes when it goes out of scope.
        Ok(_handle) => Ok(()),
        Err(source) => Err(HeartbeatError::InFlightWrite {
            path: path.to_owned(),
            source,
        }),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::in_flight::InFlightEntry;
    use std::io::Write;
    use std::path::PathBuf;
    use tempfile::TempDir;

    // ---- fixtures -------------------------------------------------------

    /// Path of the inbox file inside `dir`; the file itself is not created here.
    fn make_inbox(dir: &TempDir) -> PathBuf {
        dir.path().join("inbox.jsonl")
    }

    /// Appends `line` followed by a newline to the inbox, creating the file if needed.
    fn write_line(inbox: &Path, line: &str) {
        let mut file = fs::OpenOptions::new()
            .append(true)
            .create(true)
            .open(inbox)
            .unwrap();
        writeln!(file, "{}", line).unwrap();
    }

    /// Path of the `.responded` flag inside `dir`.
    fn responded(dir: &TempDir) -> PathBuf {
        dir.path().join(".responded")
    }

    /// Path of the `.in-flight` record inside `dir`.
    fn in_flight(dir: &TempDir) -> PathBuf {
        dir.path().join(".in-flight")
    }

    /// Path of the `.inbox-offset` cursor file inside `dir`.
    fn offset(dir: &TempDir) -> PathBuf {
        dir.path().join(".inbox-offset")
    }

    /// Runs one tick in drain mode with no idle interval and unwraps the decision.
    fn drain_tick(inbox: &Path) -> Decision {
        run(inbox, &Mode::Drain, 0).unwrap()
    }

    /// Loads the in-flight record from `dir`, panicking if it is absent or unreadable.
    fn load_in_flight(dir: &TempDir) -> InFlightEntry {
        InFlightEntry::read_from(&in_flight(dir)).unwrap().unwrap()
    }

    // ---- delivery without a pending flag --------------------------------

    #[test]
    fn no_flag_with_message_blocks_and_writes_in_flight() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "triage these emails please");

        let decision = drain_tick(&inbox);

        assert_eq!(
            decision,
            Decision::Block("triage these emails please".to_string())
        );
        assert!(responded(&dir).exists());
        let record = load_in_flight(&dir);
        assert_eq!(record.raw_line, "triage these emails please");
        assert_eq!(record.start_offset, 0);
    }

    #[test]
    fn no_flag_empty_inbox_approves_in_drain_mode() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);

        assert_eq!(drain_tick(&inbox), Decision::Approve);
    }

    #[test]
    fn no_flag_empty_inbox_idles_in_persist_mode() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);

        let decision = run(&inbox, &Mode::Persist, 0).unwrap();

        assert_eq!(decision, Decision::IdleTick);
    }

    // ---- acknowledgement on the following tick --------------------------

    #[test]
    fn flag_with_more_messages_acknowledges_and_delivers_next() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "message one");
        write_line(&inbox, "message two");

        // First tick delivers without moving the cursor.
        let first = drain_tick(&inbox);
        assert_eq!(first, Decision::Block("message one".to_string()));
        assert!(responded(&dir).exists());
        assert!(in_flight(&dir).exists());
        let cur = inbox::read_offset(&offset(&dir)).unwrap().unwrap_or(0);
        assert_eq!(cur, 0, "cursor must not advance on delivery in Fix B");

        // Second tick acknowledges the first entry and delivers the next one.
        let second = drain_tick(&inbox);
        assert_eq!(second, Decision::Block("message two".to_string()));
        assert!(responded(&dir).exists());
        let record = load_in_flight(&dir);
        assert_eq!(record.raw_line, "message two");
    }

    #[test]
    fn flag_with_empty_inbox_acknowledges_and_approves_in_drain_mode() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "only message");
        drain_tick(&inbox);

        let decision = drain_tick(&inbox);

        assert_eq!(decision, Decision::Approve);
        assert!(!responded(&dir).exists());
        assert!(!in_flight(&dir).exists());
    }

    #[test]
    fn flag_with_empty_inbox_idles_in_persist_mode() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "only message");
        run(&inbox, &Mode::Persist, 0).unwrap();

        let decision = run(&inbox, &Mode::Persist, 0).unwrap();

        assert_eq!(decision, Decision::IdleTick);
    }

    #[test]
    fn cursor_advances_only_on_second_tick_not_on_delivery() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry A");
        // Reads the persisted cursor, treating a missing offset file as 0.
        let cursor = || inbox::read_offset(&offset(&dir)).unwrap().unwrap_or(0);

        assert_eq!(cursor(), 0);

        drain_tick(&inbox);
        assert_eq!(cursor(), 0, "cursor must not advance on delivery");

        drain_tick(&inbox);
        let after = cursor();
        assert!(after > 0, "cursor must advance after ack");
    }

    #[test]
    fn in_flight_removed_after_ack() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry");

        drain_tick(&inbox);
        assert!(in_flight(&dir).exists());

        drain_tick(&inbox);
        assert!(!in_flight(&dir).exists());
    }

    // ---- crash windows ---------------------------------------------------

    #[test]
    fn crash_window_a_launcher_crash_leaves_live_orphan() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry K");
        drain_tick(&inbox);

        let record = load_in_flight(&dir);
        let current_cursor = inbox::read_offset(&offset(&dir)).unwrap().unwrap_or(0);

        assert!(!record.is_stale(current_cursor));
        assert_eq!(record.start_offset, 0);
        assert!(in_flight(&dir).exists());
    }

    #[test]
    fn crash_window_b_agent_crash_leaves_same_orphan_signal() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry K");
        drain_tick(&inbox);

        assert!(responded(&dir).exists());
        assert!(in_flight(&dir).exists());
        let cur = inbox::read_offset(&offset(&dir)).unwrap().unwrap_or(0);
        assert_eq!(cur, 0);
        let record = load_in_flight(&dir);
        assert!(!record.is_stale(cur));
    }

    #[test]
    fn crash_window_c_stale_orphan_cursor_past_end_offset() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry K\n");
        drain_tick(&inbox);

        // Move the cursor to the delivered entry's end while leaving the
        // in-flight record on disk.
        let delivered = load_in_flight(&dir);
        inbox::write_offset(&offset(&dir), delivered.end_offset).unwrap();

        let cur = inbox::read_offset(&offset(&dir)).unwrap().unwrap();
        let record = load_in_flight(&dir);
        assert!(
            record.is_stale(cur),
            "cursor at end_offset must be detected as stale"
        );
        assert!(
            cur >= record.end_offset,
            "cursor at or past end_offset means entry was acknowledged"
        );
    }

    // ---- serialization ---------------------------------------------------

    #[test]
    fn block_serializes_correctly() {
        let rendered = serialize(&Decision::Block("do the thing".to_string()));
        let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

        assert_eq!(parsed["decision"], "block");
        assert_eq!(parsed["reason"], "do the thing");
    }

    #[test]
    fn approve_serializes_to_empty_string() {
        let rendered = serialize(&Decision::Approve);

        assert_eq!(rendered, "");
    }

    #[test]
    fn idle_tick_serializes_as_block() {
        let rendered = serialize(&Decision::IdleTick);
        let parsed: serde_json::Value = serde_json::from_str(&rendered).unwrap();

        assert_eq!(parsed["decision"], "block");
    }

    // ---- inconsistent state and inbox edge cases --------------------------

    #[test]
    fn responded_without_in_flight_returns_error() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "entry K");
        drain_tick(&inbox);
        assert!(in_flight(&dir).exists());
        assert!(responded(&dir).exists());

        // Leave the flag in place but drop the record it refers to.
        fs::remove_file(in_flight(&dir)).unwrap();
        let result = run(&inbox, &Mode::Drain, 0);

        assert!(
            result.is_err(),
            "hook must error on .responded without .in-flight, not silently re-deliver"
        );
        let err = result.unwrap_err();
        assert!(
            matches!(err, HeartbeatError::InconsistentState { .. }),
            "expected InconsistentState variant, got: {:?}",
            err
        );
        assert!(
            err.to_string().contains("inconsistent state"),
            "error message must name the inconsistency: {}",
            err
        );
    }

    #[test]
    fn drain_nonexistent_inbox_approves() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);

        let decision = drain_tick(&inbox);

        assert_eq!(
            decision,
            Decision::Approve,
            "nonexistent inbox must approve in drain mode — no entries means done"
        );
    }

    #[test]
    fn blank_line_between_entries_skipped_at_hook_level() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "first entry");
        write_line(&inbox, "");
        write_line(&inbox, "second entry");

        let d1 = drain_tick(&inbox);
        assert_eq!(d1, Decision::Block("first entry".to_string()));
        assert!(responded(&dir).exists());

        let d2 = drain_tick(&inbox);
        assert_eq!(
            d2,
            Decision::Block("second entry".to_string()),
            "blank line between entries must be skipped — second entry delivered directly"
        );

        let d3 = drain_tick(&inbox);
        assert_eq!(d3, Decision::Approve);
    }

    #[test]
    fn multi_entry_inbox_delivers_in_byte_order_then_approves() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "alpha");
        write_line(&inbox, "bravo");
        write_line(&inbox, "charlie");
        // Reads the persisted cursor, treating a missing offset file as 0.
        let cursor = || inbox::read_offset(&offset(&dir)).unwrap().unwrap_or(0);

        // Tick 1: deliver "alpha".
        let d1 = drain_tick(&inbox);
        assert_eq!(
            d1,
            Decision::Block("alpha".to_string()),
            "tick 1 must deliver first entry"
        );
        assert!(responded(&dir).exists(), "tick 1 must set .responded");
        assert_eq!(
            cursor(),
            0,
            "cursor must stay at 0 after delivery of first entry (Fix B)"
        );

        // Tick 2: acknowledge "alpha", deliver "bravo".
        let d2 = drain_tick(&inbox);
        assert_eq!(
            d2,
            Decision::Block("bravo".to_string()),
            "tick 2 must deliver second entry"
        );
        assert_eq!(
            cursor(),
            6,
            "cursor must advance to 6 (past 'alpha\\n') after ack"
        );
        assert!(responded(&dir).exists(), "tick 2 must re-set .responded");

        // Tick 3: acknowledge "bravo", deliver "charlie".
        let d3 = drain_tick(&inbox);
        assert_eq!(
            d3,
            Decision::Block("charlie".to_string()),
            "tick 3 must deliver third entry"
        );
        assert_eq!(
            cursor(),
            12,
            "cursor must advance to 12 (past 'bravo\\n') after ack"
        );
        assert!(responded(&dir).exists(), "tick 3 must re-set .responded");
        assert!(
            in_flight(&dir).exists(),
            "tick 3 must write .in-flight for 'charlie'"
        );

        // Tick 4: acknowledge "charlie"; nothing left to deliver.
        let d4 = drain_tick(&inbox);
        assert_eq!(
            d4,
            Decision::Approve,
            "tick 4 must approve once all entries are drained"
        );
        assert_eq!(
            cursor(),
            20,
            "cursor must advance to 20 (past 'charlie\\n') after final ack"
        );
        assert!(
            !responded(&dir).exists(),
            ".responded must be removed after final ack"
        );
        assert!(
            !in_flight(&dir).exists(),
            ".in-flight must be removed after final ack"
        );
    }

    #[test]
    fn responded_without_in_flight_returns_inconsistent_state_with_io_dir() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir);
        write_line(&inbox, "msg");
        drain_tick(&inbox);
        fs::remove_file(in_flight(&dir)).unwrap();
        assert!(responded(&dir).exists());
        assert!(!in_flight(&dir).exists());

        let result = run(&inbox, &Mode::Drain, 0);

        match result {
            Err(HeartbeatError::InconsistentState { io_dir }) => {
                assert_eq!(
                    io_dir,
                    inbox.parent().unwrap(),
                    "io_dir must be the parent directory of the inbox"
                );
            }
            other => panic!(
                "expected Err(HeartbeatError::InconsistentState {{ io_dir }}), got {:?}",
                other
            ),
        }
    }
}