use super::error::ManifestError;
use super::event::Event;
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;
fn heal_torn_trailing_line(path: &Path) -> Result<(), ManifestError> {
let mut file = match OpenOptions::new().read(true).write(true).open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
Err(e) => return Err(ManifestError::Io(e)),
};
let len = file.metadata()?.len();
if len == 0 || last_byte_is_newline(&mut file, len)? {
return Ok(());
}
truncate_to_last_newline(&mut file, len)
}
fn last_byte_is_newline(file: &mut std::fs::File, len: u64) -> Result<bool, ManifestError> {
let mut buf = [0u8; 1];
file.seek(SeekFrom::Start(len - 1))?;
file.read_exact(&mut buf)?;
Ok(buf[0] == b'\n')
}
fn truncate_to_last_newline(file: &mut std::fs::File, len: u64) -> Result<(), ManifestError> {
let mut buf = [0u8; 1];
let mut pos = len - 1;
while pos > 0 {
pos -= 1;
file.seek(SeekFrom::Start(pos))?;
file.read_exact(&mut buf)?;
if buf[0] == b'\n' {
let keep = pos + 1;
tracing::warn!(
truncated_from = len,
truncated_to = keep,
"healing manifest: truncating torn trailing line"
);
file.set_len(keep)?;
file.sync_data()?;
return Ok(());
}
}
tracing::warn!("healing manifest: truncating entire torn tail (no prior newline)");
file.set_len(0)?;
file.sync_data()?;
Ok(())
}
pub fn append_event(path: &Path, event: &Event) -> Result<(), ManifestError> {
if let Some(parent) = path.parent() {
if !parent.as_os_str().is_empty() {
std::fs::create_dir_all(parent)?;
}
}
heal_torn_trailing_line(path)?;
let mut file = OpenOptions::new().append(true).create(true).open(path)?;
let line = serde_json::to_string(event).map_err(ManifestError::Serialize)?;
file.write_all(line.as_bytes())?;
file.write_all(b"\n")?;
file.sync_data()?;
Ok(())
}
pub fn read_all(path: &Path) -> Result<Vec<Event>, ManifestError> {
let Some(raw_lines) = slurp_raw_lines(path)? else {
return Ok(Vec::new());
};
let total = raw_lines.len();
let mut events = Vec::new();
for (idx, bytes) in raw_lines.into_iter().enumerate() {
let line_num = idx + 1;
let is_last = line_num == total;
match decode_and_parse_line(&bytes, line_num, is_last)? {
LineOutcome::Event(ev) => events.push(ev),
LineOutcome::Skip => continue,
LineOutcome::StopTorn => break,
}
}
emit_semantic_warnings(&events);
Ok(events)
}
fn slurp_raw_lines(path: &Path) -> Result<Option<Vec<Vec<u8>>>, ManifestError> {
let file = match OpenOptions::new().read(true).open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(ManifestError::Io(e)),
};
let mut reader = BufReader::new(file);
let mut lines: Vec<Vec<u8>> = Vec::new();
loop {
let mut buf: Vec<u8> = Vec::new();
let n = reader.read_until(b'\n', &mut buf)?;
if n == 0 {
break;
}
lines.push(buf);
}
Ok(Some(lines))
}
enum LineOutcome {
Event(Event),
Skip,
StopTorn,
}
fn decode_and_parse_line(
bytes: &[u8],
line_num: usize,
is_last: bool,
) -> Result<LineOutcome, ManifestError> {
let mut end = bytes.len();
if bytes.last() == Some(&b'\n') {
end -= 1;
if end > 0 && bytes[end - 1] == b'\r' {
end -= 1;
}
}
let content = &bytes[..end];
if content.iter().all(|b| b.is_ascii_whitespace()) {
return Ok(LineOutcome::Skip);
}
let s = match std::str::from_utf8(content) {
Ok(s) => s,
Err(_) if is_last => {
tracing::warn!(
line = line_num,
"discarding torn trailing line in manifest (invalid UTF-8)"
);
return Ok(LineOutcome::StopTorn);
}
Err(_) => {
tracing::error!(line = line_num, "manifest corruption detected (invalid UTF-8)");
return Err(ManifestError::Corruption {
line: line_num,
source: serde_json::Error::io(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"invalid UTF-8 in manifest line",
)),
});
}
};
match serde_json::from_str::<Event>(s) {
Ok(ev) => Ok(LineOutcome::Event(ev)),
Err(e) if is_last => {
tracing::warn!(line = line_num, error = %e, "discarding torn trailing line in manifest");
Ok(LineOutcome::StopTorn)
}
Err(e) => {
tracing::error!(line = line_num, error = %e, "manifest corruption detected");
Err(ManifestError::Corruption { line: line_num, source: e })
}
}
}
fn emit_semantic_warnings(events: &[Event]) {
let mut live: HashSet<&str> = HashSet::new();
for (idx, ev) in events.iter().enumerate() {
let line_num = idx + 1;
match ev {
Event::Add { id, .. } => {
if !live.insert(id.as_str()) {
tracing::warn!(
line = line_num,
id = %id,
"duplicate Add for pack id; second Add overrides first"
);
}
}
Event::Update { id, .. } | Event::Sync { id, .. } => {
if !live.contains(id.as_str()) {
tracing::warn!(
line = line_num,
id = %id,
op = ?std::mem::discriminant(ev),
"manifest event references unknown pack id (no prior Add)"
);
}
}
Event::Rm { id, .. } => {
if !live.remove(id.as_str()) {
tracing::warn!(
line = line_num,
id = %id,
"Rm for unknown pack id (no prior Add)"
);
}
}
Event::ActionStarted { .. }
| Event::ActionCompleted { .. }
| Event::ActionHalted { .. } => {}
Event::ForcePruneExecuted { .. } => {}
}
}
}
#[cfg(test)]
mod tests {
use super::super::event::SCHEMA_VERSION;
use super::*;
use chrono::{TimeZone, Utc};
use tempfile::tempdir;
fn sample() -> Event {
Event::Add {
ts: Utc.with_ymd_and_hms(2026, 4, 19, 10, 0, 0).unwrap(),
id: "a".into(),
url: "u".into(),
path: "a".into(),
pack_type: "declarative".into(),
schema_version: SCHEMA_VERSION.into(),
}
}
#[test]
fn append_and_read_roundtrip() {
let dir = tempdir().unwrap();
let p = dir.path().join(".grex/events.jsonl");
let e = sample();
append_event(&p, &e).unwrap();
let got = read_all(&p).unwrap();
assert_eq!(got, vec![e]);
}
#[test]
fn read_missing_file_is_empty() {
let dir = tempdir().unwrap();
let p = dir.path().join("absent.jsonl");
assert!(read_all(&p).unwrap().is_empty());
}
#[test]
fn torn_trailing_line_is_discarded() {
let dir = tempdir().unwrap();
let p = dir.path().join(".grex/events.jsonl");
append_event(&p, &sample()).unwrap();
let mut f = OpenOptions::new().append(true).open(&p).unwrap();
f.write_all(b"{\"op\":\"add\",\"ts\":\"2026-04").unwrap();
drop(f);
let got = read_all(&p).unwrap();
assert_eq!(got.len(), 1);
}
#[test]
fn earlier_corruption_is_hard_error() {
let dir = tempdir().unwrap();
let p = dir.path().join(".grex/events.jsonl");
std::fs::create_dir_all(p.parent().unwrap()).unwrap();
let mut f = OpenOptions::new().create(true).append(true).open(&p).unwrap();
f.write_all(b"not-json\n").unwrap();
drop(f);
append_event(&p, &sample()).unwrap();
let err = read_all(&p).unwrap_err();
assert!(matches!(err, ManifestError::Corruption { line: 1, .. }));
}
#[test]
fn empty_lines_are_skipped() {
let dir = tempdir().unwrap();
let p = dir.path().join(".grex/events.jsonl");
append_event(&p, &sample()).unwrap();
let mut f = OpenOptions::new().append(true).open(&p).unwrap();
f.write_all(b"\n").unwrap();
drop(f);
append_event(&p, &sample()).unwrap();
assert_eq!(read_all(&p).unwrap().len(), 2);
}
#[test]
fn heal_on_append_truncates_torn_tail() {
let dir = tempdir().unwrap();
let p = dir.path().join(".grex/events.jsonl");
append_event(&p, &sample()).unwrap();
let mut f = OpenOptions::new().append(true).open(&p).unwrap();
f.write_all(b"{\"op\":\"add\",\"ts\":\"2026").unwrap();
drop(f);
append_event(&p, &sample()).unwrap();
let got = read_all(&p).unwrap();
assert_eq!(got.len(), 2, "healed torn fragment; both valid events present");
}
}