use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::error::{HeartbeatError, Result};
use hex;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct InFlightEntry {
pub entry_id: String,
pub start_offset: u64,
pub end_offset: u64,
pub raw_line: String,
pub delivered_at: String,
}
impl InFlightEntry {
pub fn new(raw_line: &str, start_offset: u64, end_offset: u64) -> Self {
InFlightEntry {
entry_id: sha256_hex(raw_line),
start_offset,
end_offset,
raw_line: raw_line.to_string(),
delivered_at: utc_now_iso8601(),
}
}
pub fn write_to(&self, path: &Path) -> Result<()> {
let tmp = path.with_extension("tmp");
let write_err = |e: io::Error| HeartbeatError::InFlightWrite {
path: path.to_owned(),
source: e,
};
{
let mut f = fs::File::create(&tmp).map_err(write_err)?;
let json = serde_json::to_string(self)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.map_err(write_err)?;
f.write_all(json.as_bytes()).map_err(write_err)?;
f.sync_all().map_err(write_err)?;
}
fs::rename(&tmp, path).map_err(write_err)?;
Ok(())
}
pub fn read_from(path: &Path) -> Result<Option<Self>> {
match fs::read_to_string(path) {
Ok(s) => {
let entry: InFlightEntry =
serde_json::from_str(&s).map_err(|e| HeartbeatError::InFlightCorrupt {
path: path.to_owned(),
source: e,
})?;
Ok(Some(entry))
}
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(HeartbeatError::InFlightRead {
path: path.to_owned(),
source: e,
}),
}
}
pub fn is_stale(&self, current_offset: u64) -> bool {
current_offset >= self.end_offset
}
}
pub fn in_flight_file_for(inbox: &Path) -> PathBuf {
let dir = inbox.parent().unwrap_or(Path::new("."));
dir.join(".in-flight")
}
pub fn sha256_hex(s: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(s.as_bytes());
hex::encode(hasher.finalize())
}
fn utc_now_iso8601() -> String {
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let s = secs;
let seconds = s % 60;
let minutes = (s / 60) % 60;
let hours = (s / 3600) % 24;
let days_since_epoch = s / 86400;
let (year, month, day) = days_since_epoch_to_ymd(days_since_epoch);
format!(
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
year, month, day, hours, minutes, seconds
)
}
fn days_since_epoch_to_ymd(days: u64) -> (u64, u64, u64) {
let z = days + 719468;
let era = z / 146097;
let doe = z % 146097;
let yoe = (doe - doe / 1460 + doe / 36524 - doe / 146096) / 365;
let y = yoe + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100);
let mp = (5 * doy + 2) / 153;
let d = doy - (153 * mp + 2) / 5 + 1;
let m = if mp < 10 { mp + 3 } else { mp - 9 };
let y = if m <= 2 { y + 1 } else { y };
(y, m, d)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
#[test]
fn entry_id_is_sha256_of_raw_line() {
let e = InFlightEntry::new("hello world", 0, 12);
let expected = sha256_hex("hello world");
assert_eq!(e.entry_id, expected);
}
#[test]
fn entry_id_stable_for_same_raw_line() {
let e1 = InFlightEntry::new("same line", 0, 10);
let e2 = InFlightEntry::new("same line", 0, 10);
assert_eq!(e1.entry_id, e2.entry_id);
}
#[test]
fn entry_id_differs_for_different_raw_lines() {
let e1 = InFlightEntry::new("line A", 0, 7);
let e2 = InFlightEntry::new("line B", 0, 7);
assert_ne!(e1.entry_id, e2.entry_id);
}
#[test]
fn round_trip_write_read() {
let dir = TempDir::new().unwrap();
let path = dir.path().join(".in-flight");
let original = InFlightEntry::new("test entry line", 0, 16);
original.write_to(&path).unwrap();
let read_back = InFlightEntry::read_from(&path).unwrap().unwrap();
assert_eq!(original.entry_id, read_back.entry_id);
assert_eq!(original.start_offset, read_back.start_offset);
assert_eq!(original.end_offset, read_back.end_offset);
assert_eq!(original.raw_line, read_back.raw_line);
}
#[test]
fn read_from_missing_file_returns_none() {
let dir = TempDir::new().unwrap();
let path = dir.path().join(".in-flight");
let result = InFlightEntry::read_from(&path).unwrap();
assert!(result.is_none());
}
#[test]
fn is_stale_when_cursor_past_end_offset() {
let e = InFlightEntry::new("line", 10, 15);
assert!(e.is_stale(16));
assert!(e.is_stale(100));
}
#[test]
fn is_not_stale_when_cursor_before_end_offset() {
let e = InFlightEntry::new("line", 10, 15);
assert!(!e.is_stale(14));
assert!(!e.is_stale(10)); assert!(!e.is_stale(0)); }
#[test]
fn is_stale_when_cursor_at_end_offset() {
let e = InFlightEntry::new("line", 10, 15);
assert!(e.is_stale(15));
}
#[test]
fn in_flight_file_for_places_in_same_dir() {
let inbox = Path::new("/some/dir/inbox.jsonl");
let path = in_flight_file_for(inbox);
assert_eq!(path, PathBuf::from("/some/dir/.in-flight"));
}
#[test]
fn utc_timestamp_format_is_iso8601() {
let ts = utc_now_iso8601();
assert!(ts.ends_with('Z'), "timestamp must end with Z: {}", ts);
assert_eq!(ts.len(), 20, "timestamp must be 20 chars: {}", ts);
assert_eq!(&ts[4..5], "-");
assert_eq!(&ts[7..8], "-");
assert_eq!(&ts[10..11], "T");
}
#[test]
fn write_is_atomic_via_tmp_rename() {
let dir = TempDir::new().unwrap();
let path = dir.path().join(".in-flight");
let tmp = path.with_extension("tmp");
let entry = InFlightEntry::new("atomic test", 0, 12);
entry.write_to(&path).unwrap();
assert!(path.exists(), ".in-flight must exist after write");
assert!(
!tmp.exists(),
".in-flight.tmp must be cleaned up after rename"
);
}
#[test]
fn corrupt_in_flight_json_read_from_returns_in_flight_corrupt() {
let dir = TempDir::new().unwrap();
let path = dir.path().join(".in-flight");
fs::write(&path, "{not valid json at all").unwrap();
let result = InFlightEntry::read_from(&path);
match result {
Err(crate::error::HeartbeatError::InFlightCorrupt {
path: err_path,
source: _,
}) => {
assert_eq!(
err_path, path,
"InFlightCorrupt path must be the .in-flight file path"
);
}
other => panic!(
"expected Err(HeartbeatError::InFlightCorrupt), got {:?}",
other
),
}
}
#[test]
fn empty_in_flight_file_returns_in_flight_corrupt() {
let dir = TempDir::new().unwrap();
let path = dir.path().join(".in-flight");
fs::write(&path, "").unwrap();
let result = InFlightEntry::read_from(&path);
match result {
Err(crate::error::HeartbeatError::InFlightCorrupt { .. }) => {}
other => panic!(
"expected Err(HeartbeatError::InFlightCorrupt) for empty file, got {:?}",
other
),
}
}
}