grex-core 1.3.0

Core library for grex, the nested meta-repo manager: manifest, lockfile, scheduler, pack model, plugin traits.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//! Append-and-read for the manifest JSONL log.

use super::error::ManifestError;
use super::event::Event;
use std::collections::HashSet;
use std::fs::OpenOptions;
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::Path;

/// Repair a manifest whose final line was left unterminated.
///
/// When the file at `path` exists, is non-empty, and its last byte is not
/// `\n`, everything after the last newline is cut off (see
/// [`truncate_to_last_newline`]). Without this, the next append would glue
/// its bytes onto the partial line, turning a recoverable torn tail into
/// corruption in the middle of the log.
///
/// Nothing is changed when:
///   * the file does not exist
///   * the file is empty
///   * the file already ends with `\n`
///
/// # Errors
///
/// Any I/O failure other than "file not found" while opening, inspecting,
/// or truncating the file is returned as a [`ManifestError`].
fn heal_torn_trailing_line(path: &Path) -> Result<(), ManifestError> {
    let opened = OpenOptions::new().read(true).write(true).open(path);
    let mut file = match opened {
        Ok(handle) => handle,
        Err(err) => {
            // A missing manifest has no tail to heal.
            return if err.kind() == std::io::ErrorKind::NotFound {
                Ok(())
            } else {
                Err(ManifestError::Io(err))
            };
        }
    };
    let len = file.metadata()?.len();
    if len == 0 {
        return Ok(());
    }
    if last_byte_is_newline(&mut file, len)? {
        return Ok(());
    }
    truncate_to_last_newline(&mut file, len)
}

/// Reports whether the byte at offset `len - 1` of `file` is `\n`.
///
/// `len` must be greater than zero; `len - 1` would underflow otherwise.
/// The file cursor is left just past the inspected byte.
fn last_byte_is_newline(file: &mut std::fs::File, len: u64) -> Result<bool, ManifestError> {
    let mut tail = [0u8; 1];
    file.seek(SeekFrom::Start(len - 1))?;
    file.read_exact(&mut tail)?;
    let [last] = tail;
    Ok(last == b'\n')
}

/// Cut `file` back to the end of its last complete line.
///
/// Walks backwards over offsets `len - 2` down to `0` looking for a `\n`.
/// The byte at `len - 1` is not inspected here; the caller
/// ([`heal_torn_trailing_line`]) only reaches this function after finding
/// that byte is not a newline. On the first newline found, the file is
/// shortened so that newline becomes the final byte. If there is no newline
/// at all, the file is emptied. In both cases the change is flushed with
/// `sync_data` and a `tracing::warn!` records the repair.
///
/// `file` must have been opened for writing, and `len` must be non-zero.
fn truncate_to_last_newline(file: &mut std::fs::File, len: u64) -> Result<(), ManifestError> {
    let mut probe = [0u8; 1];
    for offset in (0..len - 1).rev() {
        file.seek(SeekFrom::Start(offset))?;
        file.read_exact(&mut probe)?;
        if probe[0] != b'\n' {
            continue;
        }
        // Keep everything up to and including the newline just found.
        let keep = offset + 1;
        tracing::warn!(
            truncated_from = len,
            truncated_to = keep,
            "healing manifest: truncating torn trailing line"
        );
        file.set_len(keep)?;
        file.sync_data()?;
        return Ok(());
    }
    // The scan found no newline: the whole file is one torn partial line.
    tracing::warn!("healing manifest: truncating entire torn tail (no prior newline)");
    file.set_len(0)?;
    file.sync_data()?;
    Ok(())
}

/// Append one event to the event log, creating the file (and any
/// missing parent directories) if needed.
///
/// The event is serialized to `<serialized-json>\n` **before** the
/// filesystem is touched, so a serialization failure leaves no new
/// directory, no empty file, and no truncated tail behind. The complete
/// line, terminator included, is then handed to the append-mode file in a
/// single `write_all` call and the data portion is fsynced. Keeping the
/// payload and its newline in one buffer follows the standard library's
/// guidance for append-mode files: data that belongs together should be
/// written in one operation.
///
/// Callers holding an exclusive [`crate::fs::ManifestLock`] are guaranteed
/// that no torn-interleave can occur across processes.
///
/// Before writing, a torn trailing line (file not ending in `\n`) is healed
/// by truncating back to the last newline. This prevents a prior crash from
/// fusing partial bytes with the next valid append.
///
/// Parent directories are created on demand so the v2 canonical path
/// (`<workspace>/.grex/events.jsonl`) does not require a separate
/// `create_dir_all` call at every site.
///
/// # Errors
///
/// Returns [`ManifestError::Io`] on I/O failure or
/// [`ManifestError::Serialize`] if the event cannot be serialized.
///
/// # Write-side guard for `Event::Unknown`
///
/// [`Event::Unknown`] is the read-side `#[serde(other)]` fallback for
/// unknown `op` discriminants emitted by NEWER writers — it must never
/// originate from the write side. We refuse to serialize it so a stray
/// `Unknown` literal in caller code cannot pollute the on-disk log.
/// Debug builds `debug_assert!` to surface the bug; release builds
/// `tracing::error!` and no-op so production never panics on a stray
/// caller mistake.
pub fn append_event(path: &Path, event: &Event) -> Result<(), ManifestError> {
    if matches!(event, Event::Unknown) {
        debug_assert!(false, "Event::Unknown must never reach the write side");
        tracing::error!(
            "refusing to write Event::Unknown (forward-compat read-side fallback only)"
        );
        return Ok(());
    }

    // Build the full line up front: if serialization fails we return before
    // creating directories, healing the tail, or creating the file.
    let mut line = serde_json::to_string(event).map_err(ManifestError::Serialize)?;
    line.push('\n');

    if let Some(parent) = path.parent() {
        // `parent()` yields an empty path for a bare file name; there is
        // nothing to create in that case.
        if !parent.as_os_str().is_empty() {
            std::fs::create_dir_all(parent)?;
        }
    }
    heal_torn_trailing_line(path)?;

    let mut file = OpenOptions::new().append(true).create(true).open(path)?;
    // Payload and terminator go out together so the log is never handed a
    // JSON body without its newline by this function.
    file.write_all(line.as_bytes())?;
    // fsync the data blocks; metadata flush is not strictly needed for
    // append-only semantics.
    file.sync_data()?;
    Ok(())
}

/// Load every event recorded in the manifest log at `path`.
///
/// Lines are split on raw bytes, so non-UTF-8 garbage in a torn tail does
/// not break line splitting. A missing file yields an empty `Vec`.
///
/// # Torn-line recovery
///
/// If the **last sequential line** fails to decode (invalid UTF-8 or
/// invalid JSON) it is assumed to be a torn write left by a crash: it is
/// dropped with a `tracing::warn!` and all earlier events are returned. A
/// decode failure on any **earlier** line yields
/// [`ManifestError::Corruption`].
///
/// All raw lines are gathered first so that "last" is decided by line
/// index, not by whether the line carries a trailing `\n`.
///
/// # Forward-compat: `Event::Unknown` is silently dropped
///
/// `Event::Unknown` (the `#[serde(other)]` fallback) is filtered out of the
/// returned events wherever it appears. This keeps older readers working
/// on newer logs: an older v1.2.x binary reading a v1.2.5+ log that
/// contains a `QuarantineRestored` (or any future) row between known rows
/// keeps streaming instead of failing with `ManifestError::Corruption`.
pub fn read_all(path: &Path) -> Result<Vec<Event>, ManifestError> {
    let raw_lines = match slurp_raw_lines(path)? {
        Some(lines) => lines,
        None => return Ok(Vec::new()),
    };
    let last_line = raw_lines.len();
    let mut parsed = Vec::new();
    for (line_num, raw) in (1usize..).zip(raw_lines) {
        let outcome = decode_and_parse_line(&raw, line_num, line_num == last_line)?;
        match outcome {
            LineOutcome::StopTorn => break,
            LineOutcome::Skip => {}
            // v1.2.5 — `Event::Unknown` exists so an OLDER reader can
            // decode a NEWER writer's unknown `op` discriminants without
            // failing the whole read. It is kept internally for that
            // decode path but never surfaces through this public API,
            // regardless of where in the log it appears.
            LineOutcome::Event(Event::Unknown) => {
                tracing::debug!(
                    line = line_num,
                    "dropping Event::Unknown (forward-compat: unknown op tag)"
                );
            }
            LineOutcome::Event(known) => parsed.push(known),
        }
    }
    emit_semantic_warnings(&parsed);
    Ok(parsed)
}

/// Load the file at `path` and split it into raw byte lines.
///
/// Each returned line keeps its `\n` terminator; only the final line may
/// lack one. An empty file yields `Some` of an empty list. Returns `None`
/// when the file does not exist; any other I/O failure is an error.
fn slurp_raw_lines(path: &Path) -> Result<Option<Vec<Vec<u8>>>, ManifestError> {
    let mut file = match OpenOptions::new().read(true).open(path) {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
        Err(e) => return Err(ManifestError::Io(e)),
        Ok(f) => f,
    };
    let mut contents: Vec<u8> = Vec::new();
    file.read_to_end(&mut contents)?;
    // `split_inclusive` leaves the delimiter on each piece and produces no
    // trailing empty piece when the data ends with `\n`.
    let lines: Vec<Vec<u8>> = contents
        .split_inclusive(|&byte| byte == b'\n')
        .map(|chunk| chunk.to_vec())
        .collect();
    Ok(Some(lines))
}

/// What `decode_and_parse_line` decided about a single raw manifest line.
enum LineOutcome {
    /// The line decoded as UTF-8 and parsed as JSON into an [`Event`].
    Event(Event),
    /// The line is empty or ASCII-whitespace-only; the reader moves on.
    Skip,
    /// The last line failed to decode (invalid UTF-8 or invalid JSON) and is
    /// treated as a torn write; the reader stops without raising an error.
    StopTorn,
}

/// Strip line terminator, decide if the line is skippable, decode UTF-8, parse JSON.
fn decode_and_parse_line(
    bytes: &[u8],
    line_num: usize,
    is_last: bool,
) -> Result<LineOutcome, ManifestError> {
    // Strip trailing \n and optional \r.
    let mut end = bytes.len();
    if bytes.last() == Some(&b'\n') {
        end -= 1;
        if end > 0 && bytes[end - 1] == b'\r' {
            end -= 1;
        }
    }
    let content = &bytes[..end];
    if content.iter().all(|b| b.is_ascii_whitespace()) {
        return Ok(LineOutcome::Skip);
    }
    let s = match std::str::from_utf8(content) {
        Ok(s) => s,
        Err(_) if is_last => {
            tracing::warn!(
                line = line_num,
                "discarding torn trailing line in manifest (invalid UTF-8)"
            );
            return Ok(LineOutcome::StopTorn);
        }
        Err(_) => {
            tracing::error!(line = line_num, "manifest corruption detected (invalid UTF-8)");
            return Err(ManifestError::Corruption {
                line: line_num,
                source: serde_json::Error::io(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "invalid UTF-8 in manifest line",
                )),
            });
        }
    };
    match serde_json::from_str::<Event>(s) {
        Ok(ev) => Ok(LineOutcome::Event(ev)),
        Err(e) if is_last => {
            tracing::warn!(line = line_num, error = %e, "discarding torn trailing line in manifest");
            Ok(LineOutcome::StopTorn)
        }
        Err(e) => {
            tracing::error!(line = line_num, error = %e, "manifest corruption detected");
            Err(ManifestError::Corruption { line: line_num, source: e })
        }
    }
}

/// Walk the parsed events in order and `tracing::warn!` about semantic
/// oddities. Purely diagnostic: nothing is returned and `events` is not
/// modified.
///
/// A set of "live" pack ids is maintained: `Add` inserts an id, `Rm`
/// removes it. Against that set the following are reported:
///   * **Duplicate Add**: an `Add` for an id that is already live. The fold
///     layer treats the second `Add` as an override; the warning makes that
///     visible.
///   * **Orphan op**: `Update`/`Sync`/`Rm` for an id that is not live (never
///     added, or already removed). The fold layer silently ignores these;
///     the warning surfaces the lost intent.
///
/// The `line` field attached to each warning is the 1-based position of the
/// event within `events`, which is not necessarily its physical line in the
/// file if the caller filtered lines out beforehand.
///
/// A future `read_all_strict` could upgrade these to hard errors.
fn emit_semantic_warnings(events: &[Event]) {
    let mut live_ids: HashSet<&str> = HashSet::new();
    for (line_num, event) in (1usize..).zip(events) {
        match event {
            Event::Add { id, .. } => {
                let first_add = live_ids.insert(id.as_str());
                if !first_add {
                    tracing::warn!(
                        line = line_num,
                        id = %id,
                        "duplicate Add for pack id; second Add overrides first"
                    );
                }
            }
            Event::Update { id, .. } | Event::Sync { id, .. } => {
                if live_ids.contains(id.as_str()) {
                    continue;
                }
                tracing::warn!(
                    line = line_num,
                    id = %id,
                    op = ?std::mem::discriminant(event),
                    "manifest event references unknown pack id (no prior Add)"
                );
            }
            Event::Rm { id, .. } => {
                let was_live = live_ids.remove(id.as_str());
                if !was_live {
                    tracing::warn!(
                        line = line_num,
                        id = %id,
                        "Rm for unknown pack id (no prior Add)"
                    );
                }
            }
            // None of the remaining variants take part in the live-set check:
            //   * Action-audit variants are informational for crash recovery;
            //     a pack's actions may run against a pack whose `Add` was
            //     emitted in a prior, now-compacted log.
            //   * v1.2.0 Stage 1.l — `ForcePruneExecuted` is a workspace-scoped
            //     audit, not tied to a pack id.
            //   * v1.2.1 Item 5b — quarantine lifecycle events are
            //     workspace-scoped audits keyed on the dest path.
            //   * v1.2.5 — quarantine restore + GC sweep audits are also
            //     workspace-scoped (keyed on the trash entry / dest path).
            //   * v1.2.5 — `Unknown` is the forward-compat fallback for
            //     discriminants from newer writers; it has no payload.
            // The arm stays explicit (no `_`) so adding a variant forces a
            // decision here.
            Event::ActionStarted { .. }
            | Event::ActionCompleted { .. }
            | Event::ActionHalted { .. }
            | Event::ForcePruneExecuted { .. }
            | Event::QuarantineStart { .. }
            | Event::QuarantineComplete { .. }
            | Event::QuarantineFailed { .. }
            | Event::QuarantineRestored { .. }
            | Event::QuarantineGcSwept { .. }
            | Event::Unknown => {}
        }
    }
}

// Unit tests for the append/read path. Every test works inside a fresh
// temporary directory, so nothing touches a real workspace.
#[cfg(test)]
mod tests {
    use super::super::event::SCHEMA_VERSION;
    use super::*;
    use chrono::{TimeZone, Utc};
    use tempfile::tempdir;

    /// Builds a fixed `Add` event used as the known-good row in the tests below.
    fn sample() -> Event {
        Event::Add {
            ts: Utc.with_ymd_and_hms(2026, 4, 19, 10, 0, 0).unwrap(),
            id: "a".into(),
            url: "u".into(),
            path: "a".into(),
            pack_type: "declarative".into(),
            schema_version: SCHEMA_VERSION.into(),
        }
    }

    /// One appended event is read back unchanged; the `.grex` parent
    /// directory does not exist beforehand, so this also exercises
    /// on-demand directory creation.
    #[test]
    fn append_and_read_roundtrip() {
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        let e = sample();
        append_event(&p, &e).unwrap();
        let got = read_all(&p).unwrap();
        assert_eq!(got, vec![e]);
    }

    /// Reading a path that does not exist yields an empty list, not an error.
    #[test]
    fn read_missing_file_is_empty() {
        let dir = tempdir().unwrap();
        let p = dir.path().join("absent.jsonl");
        assert!(read_all(&p).unwrap().is_empty());
    }

    /// A partial JSON fragment on the final line is dropped by `read_all`
    /// while the preceding valid event is still returned.
    #[test]
    fn torn_trailing_line_is_discarded() {
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        append_event(&p, &sample()).unwrap();
        // Simulate a torn append: partial JSON on a new trailing line.
        let mut f = OpenOptions::new().append(true).open(&p).unwrap();
        f.write_all(b"{\"op\":\"add\",\"ts\":\"2026-04").unwrap();
        drop(f);
        let got = read_all(&p).unwrap();
        assert_eq!(got.len(), 1);
    }

    /// An unparsable line that is NOT the last line is reported as
    /// `Corruption` with the offending line number.
    #[test]
    fn earlier_corruption_is_hard_error() {
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        std::fs::create_dir_all(p.parent().unwrap()).unwrap();
        // Line 1 is garbage, line 2 is valid — so garbage is NOT the last line.
        let mut f = OpenOptions::new().create(true).append(true).open(&p).unwrap();
        f.write_all(b"not-json\n").unwrap();
        drop(f);
        append_event(&p, &sample()).unwrap();
        let err = read_all(&p).unwrap_err();
        assert!(matches!(err, ManifestError::Corruption { line: 1, .. }));
    }

    /// A blank line between two valid events is skipped rather than
    /// treated as corruption.
    #[test]
    fn empty_lines_are_skipped() {
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        append_event(&p, &sample()).unwrap();
        let mut f = OpenOptions::new().append(true).open(&p).unwrap();
        f.write_all(b"\n").unwrap();
        drop(f);
        append_event(&p, &sample()).unwrap();
        assert_eq!(read_all(&p).unwrap().len(), 2);
    }

    /// A row with an unrecognised `op` sitting between two known rows is
    /// filtered out and the surrounding events are returned in order.
    #[test]
    fn unknown_op_mid_file_is_silently_dropped() {
        // Forward-compat: an older reader streaming a newer log must
        // skip unknown `op` rows sandwiched between known rows without
        // erroring. Layout: [valid_a, {"op":"future_op"}, valid_b].
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        let valid_a = sample();
        let valid_b =
            Event::Rm { ts: Utc.with_ymd_and_hms(2026, 4, 19, 10, 0, 1).unwrap(), id: "b".into() };
        append_event(&p, &valid_a).unwrap();
        // Inject the unknown-op line directly so the writer-side guard
        // doesn't suppress it.
        let mut f = OpenOptions::new().append(true).open(&p).unwrap();
        f.write_all(b"{\"op\":\"future_op\",\"some_field\":42}\n").unwrap();
        drop(f);
        append_event(&p, &valid_b).unwrap();

        let got = read_all(&p).unwrap();
        assert_eq!(got.len(), 2, "mid-file Unknown must be silently dropped, not error");
        assert_eq!(got[0], valid_a);
        assert_eq!(got[1], valid_b);
    }

    /// Appending after a torn fragment first removes the fragment, so the
    /// log afterwards contains exactly the two valid events.
    #[test]
    fn heal_on_append_truncates_torn_tail() {
        // Prior complete event + partial trailing fragment (no \n).
        // Next append must heal the fragment so the fused bytes don't
        // become a middle-line corruption on next read_all.
        let dir = tempdir().unwrap();
        let p = dir.path().join(".grex/events.jsonl");
        append_event(&p, &sample()).unwrap();
        let mut f = OpenOptions::new().append(true).open(&p).unwrap();
        f.write_all(b"{\"op\":\"add\",\"ts\":\"2026").unwrap();
        drop(f);

        append_event(&p, &sample()).unwrap();
        let got = read_all(&p).unwrap();
        assert_eq!(got.len(), 2, "healed torn fragment; both valid events present");
    }
}