use std::fs;
use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
/// One unread line taken from the inbox by [`read_next_entry`].
///
/// Both offsets are byte positions in the inbox file. Pass `end_offset` to [`acknowledge`]
/// once the entry has been handled so the cursor moves past it.
#[derive(Clone, Debug, PartialEq)]
pub struct InboxEntry {
    /// Message text: the decoded string when the line is a JSON string literal, otherwise
    /// the line itself.
    pub decoded: String,
    /// The line as read: lossily decoded as UTF-8, trimmed, without its newline.
    pub raw_line: String,
    /// Byte offset at which the line starts.
    pub start_offset: u64,
    /// Byte offset just past the line's terminating `\n`, or the end of the file when the
    /// line has no newline.
    pub end_offset: u64,
}
pub fn read_next_entry(inbox: &Path, offset_file: &Path) -> io::Result<Option<InboxEntry>> {
let size = match fs::metadata(inbox) {
Ok(m) => m.len(),
Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(e),
};
let mut file = fs::File::open(inbox)?;
loop {
let start_offset: u64 = read_offset(offset_file).unwrap_or(0);
if size <= start_offset {
return Ok(None);
}
file.seek(SeekFrom::Start(start_offset))?;
let remaining = (size - start_offset) as usize;
let mut buf = vec![0u8; remaining];
file.read_exact(&mut buf)?;
let (line_bytes, consumed) = match buf.iter().position(|&b| b == b'\n') {
Some(nl) => (&buf[..nl], nl + 1),
None => (&buf[..], buf.len()),
};
let end_offset = start_offset + consumed as u64;
let line = String::from_utf8_lossy(line_bytes);
let line = line.trim();
if line.is_empty() {
write_offset(offset_file, end_offset)?;
continue;
}
let raw_line = line.to_string();
let decoded = decode_line(line);
return Ok(Some(InboxEntry {
decoded,
raw_line,
start_offset,
end_offset,
}));
}
}
/// Marks everything before `new_offset` as processed and clears the in-flight marker.
///
/// The cursor in `offset_file` only ever moves forward. If `new_offset` is behind the stored
/// cursor (a missing or unparsable cursor counts as 0), the cursor is left untouched.
/// Removing `in_flight_path` is best-effort: a missing file or any other removal error is
/// ignored.
///
/// # Errors
/// Returns the error from persisting the cursor; in that case `in_flight_path` is left in
/// place.
pub fn acknowledge(offset_file: &Path, new_offset: u64, in_flight_path: &Path) -> io::Result<()> {
    let stored = read_offset(offset_file).unwrap_or(0);
    // A stale ack (cursor already further along) must not rewind the cursor.
    if new_offset >= stored {
        write_offset(offset_file, new_offset)?;
    }
    let _ = fs::remove_file(in_flight_path);
    Ok(())
}
/// Turns one trimmed inbox line into message text.
///
/// A line that begins with `"` and parses as a JSON string literal is unescaped, so
/// multi-line prompts can be stored on one line. Any other line is returned unchanged,
/// including one that begins with `"` but is not a valid JSON string.
fn decode_line(line: &str) -> String {
    if !line.starts_with('"') {
        return line.to_string();
    }
    match serde_json::from_str::<serde_json::Value>(line) {
        Ok(serde_json::Value::String(text)) => text,
        _ => line.to_string(),
    }
}
/// Reads the persisted cursor from `offset_file`.
///
/// Returns `None` in two cases: the file cannot be read as UTF-8 text (missing, permission
/// denied, …), or its contents, after trimming whitespace, are not a valid `u64`. Callers in
/// this file treat `None` as offset 0.
pub fn read_offset(offset_file: &Path) -> Option<u64> {
    let contents = fs::read_to_string(offset_file).ok()?;
    contents.trim().parse::<u64>().ok()
}
/// Atomically replaces the cursor stored in `offset_file` with `offset` (decimal text).
///
/// The value is first written and fsynced to a sibling temp file named `<offset_file>.tmp`,
/// then renamed over `offset_file`, so readers see either the old or the new value.
/// `.tmp` is *appended* to the full file name rather than replacing an existing extension.
/// Replacing it would make e.g. `cursor.state` use `cursor.tmp` and clobber any unrelated
/// file of that name. The temp file is removed if the write or the rename fails.
///
/// NOTE(review): the containing directory is not fsynced after the rename, so on some
/// filesystems the rename itself may not survive a power loss. Confirm this is acceptable
/// under at-least-once delivery.
///
/// # Errors
/// Returns any error from creating, writing, syncing, or renaming the temp file.
pub fn write_offset(offset_file: &Path, offset: u64) -> io::Result<()> {
    let mut tmp_name = offset_file.as_os_str().to_os_string();
    tmp_name.push(".tmp");
    let tmp = PathBuf::from(tmp_name);
    let attempt = (|| -> io::Result<()> {
        {
            let mut f = fs::File::create(&tmp)?;
            write!(f, "{}", offset)?;
            f.sync_all()?;
        } // close the handle before renaming (required on some platforms)
        fs::rename(&tmp, offset_file)
    })();
    if attempt.is_err() {
        // Best-effort cleanup; the original error is what the caller needs to see.
        let _ = fs::remove_file(&tmp);
    }
    attempt
}
/// Location of the cursor file for `inbox`: `.inbox-offset` in the inbox's own directory.
///
/// A bare file name yields a relative `.inbox-offset`. A path with no parent at all
/// (e.g. `/`) falls back to `./.inbox-offset`.
pub fn offset_file_for(inbox: &Path) -> PathBuf {
    const OFFSET_FILE_NAME: &str = ".inbox-offset";
    match inbox.parent() {
        Some(parent) => parent.join(OFFSET_FILE_NAME),
        None => Path::new(".").join(OFFSET_FILE_NAME),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::in_flight::InFlightEntry;
    use std::io::Write;
    use tempfile::TempDir;

    /// Path of an inbox file named `name` inside `dir` (the file itself is not created).
    fn make_inbox(dir: &TempDir, name: &str) -> PathBuf {
        dir.path().join(name)
    }

    /// Appends UTF-8 `content` to the inbox at `path`, creating the file if needed.
    fn write_inbox(path: &Path, content: &str) {
        write_inbox_bytes(path, content.as_bytes());
    }

    /// Byte-level variant of `write_inbox`, for content that is not valid UTF-8.
    fn write_inbox_bytes(path: &Path, content: &[u8]) {
        let mut f = fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .unwrap();
        f.write_all(content).unwrap();
    }

    /// Acknowledges `entry` using the cursor and in-flight paths every test uses.
    fn ack(dir: &TempDir, entry: &InboxEntry) {
        let offset_file = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        acknowledge(&offset_file, entry.end_offset, &in_flight).unwrap();
    }

    #[test]
    fn empty_inbox_returns_none() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
        fs::write(&inbox, "").unwrap();
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn single_line_with_newline() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "hello world\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "hello world");
        assert_eq!(entry.raw_line, "hello world");
        assert_eq!(entry.start_offset, 0);
        assert_eq!(entry.end_offset, 12);
        let entry2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry2.decoded, "hello world");
        ack(&dir, &entry);
        let next = read_next_entry(&inbox, &offset).unwrap();
        assert_eq!(next, None);
    }

    #[test]
    fn single_line_without_newline() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "partial write no newline");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "partial write no newline");
        assert_eq!(entry.start_offset, 0);
        assert_eq!(entry.end_offset, 24);
        ack(&dir, &entry);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn multiple_lines_consumed_in_order() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "line one\nline two\nline three\n");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.decoded, "line one");
        ack(&dir, &e1);
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.decoded, "line two");
        ack(&dir, &e2);
        let e3 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e3.decoded, "line three");
        ack(&dir, &e3);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn no_ack_means_same_entry_re_read() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "important entry\n");
        for _ in 0..3 {
            let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
            assert_eq!(entry.decoded, "important entry");
        }
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        ack(&dir, &entry);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn byte_offset_survives_append() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "first\n");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.decoded, "first");
        ack(&dir, &e1);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
        write_inbox(&inbox, "second\n");
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.decoded, "second");
        ack(&dir, &e2);
    }

    #[test]
    fn json_encoded_multiline_is_decoded() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let prompt = "line one\nline two\nline three";
        let encoded = serde_json::to_string(prompt).unwrap();
        write_inbox(&inbox, &format!("{}\n", encoded));
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, prompt);
        assert_eq!(entry.raw_line, encoded);
        ack(&dir, &entry);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn json_encoded_messages_drain_in_order() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let msg1 = "triage batch one\nwith multiple\nlines";
        let msg2 = "triage batch two\nalso multiline";
        write_inbox(
            &inbox,
            &format!("{}\n", serde_json::to_string(msg1).unwrap()),
        );
        write_inbox(
            &inbox,
            &format!("{}\n", serde_json::to_string(msg2).unwrap()),
        );
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.decoded, msg1);
        ack(&dir, &e1);
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.decoded, msg2);
        ack(&dir, &e2);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn plain_text_passthrough_unchanged() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "plain message no encoding\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "plain message no encoding");
        assert_eq!(entry.raw_line, "plain message no encoding");
        ack(&dir, &entry);
    }

    #[test]
    fn quoted_but_invalid_json_passes_through_raw() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "\"unterminated\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "\"unterminated");
        assert_eq!(entry.raw_line, "\"unterminated");
        ack(&dir, &entry);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn crlf_line_endings_are_trimmed() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "first\r\nsecond\r\n");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.decoded, "first");
        assert_eq!(e1.raw_line, "first");
        assert_eq!((e1.start_offset, e1.end_offset), (0, 7));
        ack(&dir, &e1);
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.decoded, "second");
        assert_eq!((e2.start_offset, e2.end_offset), (7, 15));
        ack(&dir, &e2);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn blank_lines_skipped_not_returned_as_none() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "\n\nreal message\n\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "real message");
        ack(&dir, &entry);
        let next = read_next_entry(&inbox, &offset).unwrap();
        assert_eq!(next, None);
    }

    #[test]
    fn blank_lines_between_messages_skipped() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "first\n\n\nsecond\n");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.decoded, "first");
        ack(&dir, &e1);
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.decoded, "second");
        ack(&dir, &e2);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn whitespace_only_lines_are_skipped() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "   \t\nreal\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "real");
        assert_eq!(entry.start_offset, 5);
        ack(&dir, &entry);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn skipped_blank_lines_persist_the_cursor() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox(&inbox, "\n\nmessage\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!((entry.start_offset, entry.end_offset), (2, 10));
        assert_eq!(
            read_offset(&offset),
            Some(2),
            "the cursor moves past blank lines but not past the returned entry"
        );
    }

    #[test]
    fn offset_file_for_places_in_same_dir() {
        let inbox = Path::new("/some/dir/inbox.jsonl");
        let offset = offset_file_for(inbox);
        assert_eq!(offset, PathBuf::from("/some/dir/.inbox-offset"));
    }

    #[test]
    fn acknowledge_advances_offset_and_removes_in_flight() {
        let dir = TempDir::new().unwrap();
        let offset_file = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        fs::write(&in_flight, "{}").unwrap();
        acknowledge(&offset_file, 42, &in_flight).unwrap();
        let written = read_offset(&offset_file).unwrap();
        assert_eq!(written, 42);
        assert!(!in_flight.exists(), ".in-flight should be removed on ack");
    }

    #[test]
    fn acknowledge_is_ok_if_in_flight_already_gone() {
        let dir = TempDir::new().unwrap();
        let offset_file = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        acknowledge(&offset_file, 10, &in_flight).unwrap();
        assert_eq!(read_offset(&offset_file).unwrap(), 10);
    }

    #[test]
    fn acknowledge_never_moves_cursor_backwards() {
        let dir = TempDir::new().unwrap();
        let offset_file = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        write_offset(&offset_file, 50).unwrap();
        fs::write(&in_flight, "{}").unwrap();
        acknowledge(&offset_file, 10, &in_flight).unwrap();
        assert_eq!(read_offset(&offset_file), Some(50));
        assert!(
            !in_flight.exists(),
            ".in-flight is cleared even when the ack is stale"
        );
    }

    #[test]
    fn invalid_utf8_before_newline_cursor_stays_aligned() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox_bytes(&inbox, b"hel\xFFlo\nworld\n");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.start_offset, 0);
        assert_eq!(
            e1.end_offset, 7,
            "consumed must be 7 bytes (6 content + 1 newline)"
        );
        ack(&dir, &e1);
        let e2 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e2.start_offset, 7);
        assert_eq!(e2.decoded, "world");
        ack(&dir, &e2);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn invalid_utf8_no_newline_cursor_stays_aligned() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        write_inbox_bytes(&inbox, b"bad\xFFbytes");
        let e1 = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(e1.start_offset, 0);
        assert_eq!(e1.end_offset, 9, "consumed must be 9 bytes (entire buffer)");
        assert!(
            e1.decoded.contains("bad"),
            "content prefix preserved: {}",
            e1.decoded
        );
        assert!(
            e1.decoded.contains("bytes"),
            "content suffix preserved: {}",
            e1.decoded
        );
        ack(&dir, &e1);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn crash_c1_before_in_flight_write_recovers_naturally() {
        // Crash after reading but before `.in-flight` is written and before any ack: the
        // cursor never moved, so a restarted reader must redeliver exactly the same entry.
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        write_inbox(&inbox, "entry K\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(entry.decoded, "entry K");
        // -- simulated crash: no .in-flight, no ack --
        assert!(!in_flight.exists());
        assert_eq!(read_offset(&offset), None, "cursor must not have moved");
        let redelivered = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(redelivered, entry);
        ack(&dir, &redelivered);
        assert_eq!(read_next_entry(&inbox, &offset).unwrap(), None);
    }

    #[test]
    fn crash_c2_in_flight_written_offset_not_advanced() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        write_inbox(&inbox, "entry K\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
        inflight.write_to(&in_flight).unwrap();
        assert_eq!(read_offset(&offset), None);
        let current_offset = read_offset(&offset).unwrap_or(0);
        assert!(!inflight.is_stale(current_offset), "should be live orphan");
        let re_entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        assert_eq!(re_entry.decoded, "entry K");
    }

    #[test]
    fn crash_c3_after_response_before_ack_is_live_orphan() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        write_inbox(&inbox, "entry K\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
        inflight.write_to(&in_flight).unwrap();
        let current_offset = read_offset(&offset).unwrap_or(0);
        assert!(!inflight.is_stale(current_offset));
        let read_back = InFlightEntry::read_from(&in_flight).unwrap().unwrap();
        assert!(!read_back.is_stale(current_offset));
        assert!(read_back.is_stale(entry.end_offset + 1));
    }

    #[test]
    fn stale_orphan_detected_by_cursor_past_end_offset() {
        let dir = TempDir::new().unwrap();
        let inbox = make_inbox(&dir, "inbox.jsonl");
        let offset = dir.path().join(".inbox-offset");
        let in_flight = dir.path().join(".in-flight");
        write_inbox(&inbox, "entry K\n");
        let entry = read_next_entry(&inbox, &offset).unwrap().unwrap();
        let inflight = InFlightEntry::new(&entry.raw_line, entry.start_offset, entry.end_offset);
        inflight.write_to(&in_flight).unwrap();
        write_offset(&offset, entry.end_offset).unwrap();
        assert!(in_flight.exists());
        let read_back = InFlightEntry::read_from(&in_flight).unwrap().unwrap();
        let current_offset = read_offset(&offset).unwrap();
        assert!(read_back.is_stale(current_offset));
        assert!(!read_back.is_stale(current_offset - 1));
    }
}