use crate::error::{HeartbeatError, Result};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// One non-blank line read from the inbox file, together with the byte range
/// it occupies in that file.
#[derive(Debug, Clone, PartialEq)]
pub struct InboxEntry {
/// The message after optional JSON-string decoding: when the trimmed line
/// starts with `"` and parses as a JSON string, this is the unquoted value;
/// otherwise it equals `raw_line`.
pub decoded: String,
/// The line as stored in the inbox, lossily converted to UTF-8 and trimmed
/// of surrounding whitespace.
pub raw_line: String,
/// Byte offset in the inbox at which this entry's line begins (the cursor
/// value that was read from the offset file).
pub start_offset: u64,
/// Byte offset just past this entry, including its trailing newline when
/// one was present; passing this to `acknowledge` moves the cursor past it.
pub end_offset: u64,
}
pub fn read_next_entry(inbox: &Path, offset_file: &Path) -> Result<Option<InboxEntry>> {
let size = match fs::metadata(inbox) {
Ok(m) => m.len(),
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(HeartbeatError::InboxRead(e)),
};
let mut file = fs::File::open(inbox).map_err(HeartbeatError::InboxRead)?;
loop {
let start_offset: u64 = read_offset(offset_file)?.unwrap_or_default();
if size <= start_offset {
return Ok(None);
}
file.seek(SeekFrom::Start(start_offset))
.map_err(HeartbeatError::InboxRead)?;
let remaining = (size - start_offset) as usize;
let mut buf = vec![0u8; remaining];
file.read_exact(&mut buf)
.map_err(HeartbeatError::InboxRead)?;
let (line_bytes, consumed) = match buf.iter().position(|&b| b == b'\n') {
Some(nl) => (&buf[..nl], nl + 1),
None => (&buf[..], buf.len()),
};
let end_offset = start_offset + consumed as u64;
let line = String::from_utf8_lossy(line_bytes);
let line = line.trim();
if line.is_empty() {
write_offset(offset_file, end_offset)?;
continue;
}
let raw_line = line.to_string();
let decoded = decode_line(line);
return Ok(Some(InboxEntry {
decoded,
raw_line,
start_offset,
end_offset,
}));
}
}
/// Marks everything before `new_offset` as processed and clears the in-flight
/// marker at `in_flight_path`.
///
/// The stored cursor is rewritten only when `new_offset` is not behind it, so
/// the cursor never moves backwards. Removal of the in-flight file is best
/// effort: a failure to remove it (for example because it does not exist) is
/// ignored.
///
/// # Errors
///
/// Propagates errors from reading the current offset and from writing the new
/// one; when the write fails the in-flight file is left in place.
pub fn acknowledge(offset_file: &Path, new_offset: u64, in_flight_path: &Path) -> Result<()> {
    let stored = read_offset(offset_file)?.unwrap_or_default();
    if new_offset >= stored {
        write_offset(offset_file, new_offset)?;
    }
    // Deliberately ignored: the marker may already be gone.
    let _ = fs::remove_file(in_flight_path);
    Ok(())
}
/// Decodes one inbox line into its message text.
///
/// A line that starts with `"` and parses as a JSON string is replaced by the
/// unquoted string value; every other line (including a quoted line that is
/// not valid JSON) is returned unchanged.
fn decode_line(line: &str) -> String {
    let unquoted = if line.starts_with('"') {
        match serde_json::from_str::<serde_json::Value>(line) {
            Ok(serde_json::Value::String(text)) => Some(text),
            _ => None,
        }
    } else {
        None
    };
    unquoted.unwrap_or_else(|| line.to_string())
}
/// Loads the cursor stored in `offset_file`.
///
/// Returns `Ok(None)` when the file does not exist and `Ok(Some(n))` when its
/// content, ignoring surrounding whitespace, parses as a `u64`.
///
/// # Errors
///
/// * `HeartbeatError::OffsetRead` when the file exists but cannot be read.
/// * `HeartbeatError::OffsetCorrupt` when the trimmed content is not a valid
///   `u64`; the error carries that trimmed content.
pub fn read_offset(offset_file: &Path) -> Result<Option<u64>> {
    let raw = match fs::read_to_string(offset_file) {
        Ok(text) => text,
        Err(err) => {
            return if err.kind() == io::ErrorKind::NotFound {
                Ok(None)
            } else {
                Err(HeartbeatError::OffsetRead {
                    path: offset_file.to_owned(),
                    source: err,
                })
            };
        }
    };
    let digits = raw.trim();
    digits
        .parse::<u64>()
        .map(Some)
        .map_err(|_| HeartbeatError::OffsetCorrupt {
            path: offset_file.to_owned(),
            content: digits.to_owned(),
        })
}
/// Stores `offset` in `offset_file` as decimal text.
///
/// The value is first written and synced to a sibling file whose extension is
/// replaced by `tmp`, which is then renamed over `offset_file`, so the offset
/// file is replaced in a single rename rather than rewritten in place.
///
/// # Errors
///
/// Any I/O failure while creating, writing, syncing or renaming is reported
/// as `HeartbeatError::OffsetWrite` carrying the path of `offset_file`.
pub fn write_offset(offset_file: &Path, offset: u64) -> Result<()> {
    let staging = offset_file.with_extension("tmp");
    let persist = || -> io::Result<()> {
        let mut handle = fs::File::create(&staging)?;
        write!(handle, "{}", offset)?;
        handle.sync_all()?;
        // Close the staging file before it is moved into place.
        drop(handle);
        fs::rename(&staging, offset_file)
    };
    persist().map_err(|source| HeartbeatError::OffsetWrite {
        path: offset_file.to_owned(),
        source,
    })
}
/// Returns the path of the `.inbox-offset` file that belongs to `inbox`.
///
/// The offset file lives in the directory containing the inbox; when the
/// inbox path has no parent component, `.` is used as the directory.
pub fn offset_file_for(inbox: &Path) -> PathBuf {
    match inbox.parent() {
        Some(parent) => parent.join(".inbox-offset"),
        None => Path::new(".").join(".inbox-offset"),
    }
}
// Unit tests for the inbox cursor: reading entries, acknowledging them, and
// handling of the offset file. Each test works inside its own temporary
// directory.
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::TempDir;
// Builds the path of an inbox file inside `dir`; the file itself is not created.
fn make_inbox(dir: &TempDir, name: &str) -> PathBuf {
dir.path().join(name)
}
// Appends `content` to the file at `path`, creating the file when missing.
fn write_inbox(path: &Path, content: &str) {
let mut f = fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
.unwrap();
f.write_all(content.as_bytes()).unwrap();
}
// Acknowledges `entry` using the `.inbox-offset` and `.in-flight` files in `dir`.
fn ack(dir: &TempDir, entry: &InboxEntry) {
let offset_file = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
acknowledge(&offset_file, entry.end_offset, &in_flight).unwrap();
}
// A missing inbox and a zero-length inbox both yield `None`.
#[test]
fn empty_inbox_returns_none() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
fs::write(&inbox, "").unwrap();
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// The end offset counts the trailing newline, and the entry is only
// consumed once it has been acknowledged.
#[test]
fn single_line_with_newline() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "hello world\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "hello world");
assert_eq!(entry.raw_line, "hello world");
assert_eq!(entry.start_offset, 0);
assert_eq!(entry.end_offset, 12);
let entry2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry2.decoded, "hello world");
ack(&dir, &entry);
let next = read_next_entry(&inbox, &offset).unwrap();
assert_eq!(next, None);
}
// A final line without a newline is still returned as a full entry whose
// end offset is the end of the file.
#[test]
fn single_line_without_newline() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "partial write no newline");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "partial write no newline");
assert_eq!(entry.start_offset, 0);
assert_eq!(entry.end_offset, 24);
ack(&dir, &entry);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// Read-then-ack walks through the lines in file order.
#[test]
fn multiple_lines_consumed_in_order() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "line one\nline two\nline three\n");
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.decoded, "line one");
ack(&dir, &e1);
let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e2.decoded, "line two");
ack(&dir, &e2);
let e3 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e3.decoded, "line three");
ack(&dir, &e3);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// Reading never advances past a non-blank entry by itself: repeated reads
// without an ack return the same entry.
#[test]
fn no_ack_means_same_entry_re_read() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "important entry\n");
for _ in 0..3 {
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "important entry");
}
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
ack(&dir, &entry);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// The stored byte offset stays valid when more data is appended later.
#[test]
fn byte_offset_survives_append() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "first\n");
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.decoded, "first");
ack(&dir, &e1);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
write_inbox(&inbox, "second\n");
let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e2.decoded, "second");
ack(&dir, &e2);
}
// A JSON-encoded string occupies one physical line; `decoded` restores the
// embedded newlines while `raw_line` keeps the encoded form.
#[test]
fn json_encoded_multiline_is_decoded() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let prompt = "line one\nline two\nline three";
let encoded = serde_json::to_string(prompt).unwrap();
write_inbox(&inbox, &format!("{}\n", encoded));
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, prompt);
assert_eq!(entry.raw_line, encoded); ack(&dir, &entry);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// Several JSON-encoded multi-line messages are returned one per read, in order.
#[test]
fn json_encoded_messages_drain_in_order() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let msg1 = "triage batch one\nwith multiple\nlines";
let msg2 = "triage batch two\nalso multiline";
write_inbox(
&inbox,
&format!("{}\n", serde_json::to_string(msg1).unwrap()),
);
write_inbox(
&inbox,
&format!("{}\n", serde_json::to_string(msg2).unwrap()),
);
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.decoded, msg1);
ack(&dir, &e1);
let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e2.decoded, msg2);
ack(&dir, &e2);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// A line that is not JSON-quoted is passed through as-is.
#[test]
fn plain_text_passthrough_unchanged() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "plain message no encoding\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "plain message no encoding");
ack(&dir, &entry);
}
// Leading blank lines are skipped to reach the real message, and trailing
// blank lines end in `None` rather than an empty entry.
#[test]
fn blank_lines_skipped_not_returned_as_none() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "\n\nreal message\n\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "real message");
ack(&dir, &entry);
let next = read_next_entry(&inbox, &offset).unwrap();
assert_eq!(next, None);
}
// Blank lines between two messages do not produce entries of their own.
#[test]
fn blank_lines_between_messages_skipped() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "first\n\n\nsecond\n");
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.decoded, "first");
ack(&dir, &e1);
let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e2.decoded, "second");
ack(&dir, &e2);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// The offset file is placed in the inbox's own directory.
#[test]
fn offset_file_for_places_in_same_dir() {
let inbox = Path::new("/some/dir/inbox.jsonl");
let offset = offset_file_for(inbox);
assert_eq!(offset, PathBuf::from("/some/dir/.inbox-offset"));
}
// An ack both stores the new offset and deletes an existing in-flight file.
#[test]
fn acknowledge_advances_offset_and_removes_in_flight() {
let dir = TempDir::new().unwrap();
let offset_file = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
fs::write(&in_flight, "{}").unwrap();
acknowledge(&offset_file, 42, &in_flight).unwrap();
let written = read_offset(&offset_file).unwrap().unwrap();
assert_eq!(written, 42);
assert!(!in_flight.exists(), ".in-flight should be removed on ack");
}
// A missing in-flight file is not an error for `acknowledge`.
#[test]
fn acknowledge_is_ok_if_in_flight_already_gone() {
let dir = TempDir::new().unwrap();
let offset_file = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
acknowledge(&offset_file, 10, &in_flight).unwrap();
assert_eq!(read_offset(&offset_file).unwrap().unwrap(), 10);
}
// Offsets are counted in raw bytes: an invalid UTF-8 byte inside a line must
// not shift where the following line starts.
#[test]
fn invalid_utf8_before_newline_cursor_stays_aligned() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let mut f = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&inbox)
.unwrap();
f.write_all(b"hel\xFFlo\nworld\n").unwrap();
drop(f);
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.start_offset, 0);
assert_eq!(
e1.end_offset, 7,
"consumed must be 7 bytes (6 content + 1 newline)"
);
ack(&dir, &e1);
let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e2.start_offset, 7);
assert_eq!(e2.decoded, "world");
ack(&dir, &e2);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// Same byte-counting guarantee for an unterminated line containing an
// invalid UTF-8 byte; the valid text around that byte is kept.
#[test]
fn invalid_utf8_no_newline_cursor_stays_aligned() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let mut f = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&inbox)
.unwrap();
f.write_all(b"bad\xFFbytes").unwrap();
drop(f);
let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(e1.start_offset, 0);
assert_eq!(e1.end_offset, 9, "consumed must be 9 bytes (entire buffer)");
assert!(
e1.decoded.contains("bad"),
"content prefix preserved: {}",
e1.decoded
);
assert!(
e1.decoded.contains("bytes"),
"content suffix preserved: {}",
e1.decoded
);
ack(&dir, &e1);
assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
}
// Reads an entry and acknowledges it without an in-flight file ever being
// written (presumably models a crash before the in-flight write; the test
// itself only checks that read and ack succeed).
#[test]
fn crash_c1_before_in_flight_write_recovers_naturally() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
write_inbox(&inbox, "entry K\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(entry.decoded, "entry K");
ack(&dir, &entry); }
// In-flight record written but no ack: the offset file is still absent, the
// record is not reported stale at cursor 0, and the entry is read again.
#[test]
fn crash_c2_in_flight_written_offset_not_advanced() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
write_inbox(&inbox, "entry K\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
use crate::in_flight::InFlightEntry;
let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
inflight.write_to(&in_flight).unwrap();
assert_eq!(read_offset(&offset).unwrap(), None);
let current_offset = read_offset(&offset).unwrap().unwrap_or(0);
assert!(!inflight.is_stale(current_offset), "should be live orphan");
let re_entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
assert_eq!(re_entry.decoded, "entry K");
}
// In-flight record persisted and read back without an ack: it is not stale
// at the current cursor, but is stale for a cursor beyond its end offset.
#[test]
fn crash_c3_after_response_before_ack_is_live_orphan() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
write_inbox(&inbox, "entry K\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
use crate::in_flight::InFlightEntry;
let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
inflight.write_to(&in_flight).unwrap();
let current_offset = read_offset(&offset).unwrap().unwrap_or(0);
assert!(!inflight.is_stale(current_offset));
let read_back = crate::in_flight::InFlightEntry::read_from(&in_flight)
.unwrap()
.unwrap();
assert!(!read_back.is_stale(current_offset));
assert!(read_back.is_stale(entry.end_offset + 1));
}
// Offset advanced with `write_offset` while the in-flight file remains: the
// record is stale once the cursor equals its end offset, and not stale one
// byte earlier.
#[test]
fn stale_orphan_detected_by_cursor_past_end_offset() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
write_inbox(&inbox, "entry K\n");
let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
use crate::in_flight::InFlightEntry;
let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
inflight.write_to(&in_flight).unwrap();
write_offset(&offset, entry.end_offset).unwrap();
assert!(in_flight.exists());
let read_back = InFlightEntry::read_from(&in_flight).unwrap().unwrap();
let current_offset = read_offset(&offset).unwrap().unwrap();
assert!(read_back.is_stale(current_offset));
assert!(!read_back.is_stale(current_offset - 1));
}
// A non-numeric offset file makes `read_next_entry` fail with `OffsetCorrupt`
// carrying the file path and the offending content.
#[test]
fn corrupt_offset_read_next_entry_returns_offset_corrupt() {
let dir = TempDir::new().unwrap();
let inbox = make_inbox(&dir, "inbox.jsonl");
let offset_file = dir.path().join(".inbox-offset");
write_inbox(&inbox, "some entry\n");
fs::write(&offset_file, "notanumber").unwrap();
let result = read_next_entry(&inbox, &offset_file);
match result {
Err(crate::error::HeartbeatError::OffsetCorrupt { path, content }) => {
assert_eq!(path, offset_file, "path must be the offset file path");
assert_eq!(
content, "notanumber",
"content must be the trimmed string that failed to parse"
);
}
other => panic!("expected OffsetCorrupt, got {:?}", other),
}
}
// `acknowledge` reports a corrupt offset file the same way instead of
// overwriting it.
#[test]
fn corrupt_offset_acknowledge_returns_offset_corrupt() {
let dir = TempDir::new().unwrap();
let offset_file = dir.path().join(".inbox-offset");
let in_flight = dir.path().join(".in-flight");
fs::write(&offset_file, "CORRUPT_BYTES").unwrap();
let result = acknowledge(&offset_file, 42, &in_flight);
match result {
Err(crate::error::HeartbeatError::OffsetCorrupt { path, content }) => {
assert_eq!(path, offset_file, "path must be the offset file path");
assert_eq!(
content, "CORRUPT_BYTES",
"content must be the trimmed string that failed to parse"
);
}
other => panic!("expected OffsetCorrupt, got {:?}", other),
}
}
// Surrounding whitespace and a trailing newline in the offset file are tolerated.
#[test]
fn offset_with_whitespace_and_newline_parses_ok() {
let dir = TempDir::new().unwrap();
let offset_file = dir.path().join(".inbox-offset");
fs::write(&offset_file, " 42 \n").unwrap();
let result = read_offset(&offset_file).unwrap();
assert_eq!(
result,
Some(42),
"offset with surrounding whitespace must parse as 42"
);
}
// Shows how a caller can match on `OffsetCorrupt` explicitly and fall back
// to offset 0 instead of failing.
#[test]
fn consumer_warn_and_fallback_explicit_match_pattern() {
let dir = TempDir::new().unwrap();
let offset_file = dir.path().join(".inbox-offset");
fs::write(&offset_file, "definitely-not-a-number").unwrap();
let mut warned = false;
let start_offset = match read_offset(&offset_file) {
Ok(None) => 0u64,
Ok(Some(n)) => n,
Err(crate::error::HeartbeatError::OffsetCorrupt { .. }) => {
warned = true;
0
}
Err(e) => panic!("unexpected error: {:?}", e),
};
assert!(warned, "consumer must have detected the corrupt offset");
assert_eq!(start_offset, 0, "fallback must be 0");
}
}