// dbmd_core/log.rs

//! `log` — the append-only, month-rotating chronological log.
//!
//! One logical timeline: the active `log.md` at the store root plus
//! `log/<YYYY-MM>.md` archives. [`Log::append`] rolls older months into
//! archives on write so the active file stays current-month. [`Log::tail`] and
//! [`Log::since`] **reverse-read from EOF**. Both read each file they touch in
//! full — the on-disk order is not guaranteed monotonic, so neither can
//! early-stop within a file — and select by timestamp: `tail` keeps the `n`
//! newest, `since` keeps everything newer than the cutoff. Both cross into
//! month archives only as far back as the requested window reaches (by the
//! cutoff's month for `since`, by the current `n`th-newest's month for `tail`)
//! — never the whole history.
//!
//! Append-only contract: there is no rewrite API. Corrective entries go on the
//! end; out-of-order timestamps are a validate warning (`LOG_OUT_OF_ORDER`),
//! signalling a probable rewrite.

use std::collections::BTreeMap;
use std::fs::{self, File};
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};

use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, TimeZone, Utc};

use crate::store::Store;

/// The on-disk header timestamp format: `YYYY-MM-DD HH:MM` (minute precision,
/// no timezone). Parsing reattaches UTC; emitting renders the entry's own
/// wall-clock, so a read→write→read round-trip is stable at minute precision.
const TS_FORMAT: &str = "%Y-%m-%d %H:%M";

/// The frontmatter block (plus the `# Curator log` heading) written when the
/// active `log.md` is created.
const LOG_FRONTMATTER: &str = "---\ntype: log\n---\n\n# Curator log\n";

/// Block size, in bytes, for the backward (reverse-from-EOF) reader.
const REVERSE_BLOCK: usize = 8 * 1024;

/// A recognized `log.md` entry kind. Custom kinds are valid in the format
/// (`dbmd validate` warns on unrecognized via `LOG_UNKNOWN_KIND`); this enum
/// carries the recognized vocabulary plus a [`LogKind::Custom`] catch-all so an
/// unknown kind round-trips without loss.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogKind {
    /// A source artifact was ingested.
    Ingest,
    /// A file was created.
    Create,
    /// A file was updated.
    Update,
    /// A file was deleted.
    Delete,
    /// A file was renamed/moved.
    Rename,
    /// A wiki-link was added.
    Link,
    /// A validation pass ran.
    Validate,
    /// The index was rebuilt.
    IndexRebuild,
    /// A contradiction between sources was flagged.
    Contradiction,
    /// Any kind outside the recognized vocabulary, preserved verbatim.
    Custom(String),
}

impl LogKind {
    /// The string for this kind as it appears in a log header (`ingest`,
    /// `index-rebuild`, …). Recognized kinds yield their canonical lowercase
    /// token; [`LogKind::Custom`] yields its stored text unchanged.
    pub fn as_str(&self) -> &str {
        match self {
            Self::Ingest => "ingest",
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Rename => "rename",
            Self::Link => "link",
            Self::Validate => "validate",
            Self::IndexRebuild => "index-rebuild",
            Self::Contradiction => "contradiction",
            Self::Custom(text) => text,
        }
    }

    /// Parse a kind from its header token.
    ///
    /// Matching is exact and case-sensitive: only the canonical tokens map to
    /// recognized variants, and every other token (including a differently
    /// cased one such as `Ingest`) becomes [`LogKind::Custom`] holding the
    /// token verbatim. Consequently a `Custom` that happens to hold a
    /// canonical token parses back as the recognized variant.
    pub fn parse(token: &str) -> LogKind {
        match token {
            "ingest" => Self::Ingest,
            "create" => Self::Create,
            "update" => Self::Update,
            "delete" => Self::Delete,
            "rename" => Self::Rename,
            "link" => Self::Link,
            "validate" => Self::Validate,
            "index-rebuild" => Self::IndexRebuild,
            "contradiction" => Self::Contradiction,
            other => Self::Custom(other.to_string()),
        }
    }

    /// True if this is one of the recognized kinds (i.e. not
    /// [`LogKind::Custom`]).
    pub fn is_recognized(&self) -> bool {
        !matches!(self, Self::Custom(_))
    }
}

108/// One parsed `log.md` entry: a header
109/// (`## [YYYY-MM-DD HH:MM] <kind> | <object>`) plus its body.
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct LogEntry {
112    /// The entry timestamp from the header.
113    pub timestamp: DateTime<FixedOffset>,
114    /// The entry kind.
115    pub kind: LogKind,
116    /// The object slot — a store-relative path/wiki-link target, or `None` for
117    /// store-wide actions like `validate`.
118    pub object: Option<String>,
119    /// The free-form body (one or more lines) explaining what happened.
120    pub note: String,
121}
122
123impl LogEntry {
124    /// Render this entry as it appears on disk: the `## [...]` header line,
125    /// then the note body, then a trailing blank line so successive entries are
126    /// separated. The note is emitted with header-shaped continuation lines
127    /// **escaped** (see [`escape_note_line`]) so a note line that happens to
128    /// match the entry-header shape (`## [YYYY-MM-DD HH:MM] <kind> | <obj>`) can
129    /// never be mistaken for a real entry header on readback or on the next
130    /// rotation. The escape round-trips exactly through [`unescape_note_line`].
131    fn render(&self) -> String {
132        let ts = self.timestamp.format(TS_FORMAT);
133        let mut out = String::new();
134        match &self.object {
135            Some(obj) => {
136                out.push_str(&format!("## [{}] {} | {}\n", ts, self.kind.as_str(), obj));
137            }
138            None => {
139                out.push_str(&format!("## [{}] {}\n", ts, self.kind.as_str()));
140            }
141        }
142        // Trim only the structural line terminators (`\n`/`\r`) — the trailing
143        // blank line separating entries is appended below, so a note's own
144        // trailing newlines would otherwise stack up and shift on every
145        // re-render. Spaces and tabs are legitimate note *content* and must be
146        // preserved verbatim, so the round-trip is exact: readback
147        // (`parse_entries`) trims the same `['\n', '\r']` set and no more, and a
148        // note ending in a space (`"note 0 "`) must reconstruct unchanged.
149        let note = self.note.trim_end_matches(['\n', '\r']);
150        if !note.is_empty() {
151            // Escape per line: a note line that parses as an entry header is
152            // prefixed so it is no longer at column 0 as `## [` — it stays note
153            // body on readback and on rotation, never a fabricated entry.
154            for (i, line) in note.split('\n').enumerate() {
155                if i > 0 {
156                    out.push('\n');
157                }
158                out.push_str(&escape_note_line(line));
159            }
160            out.push('\n');
161        }
162        out.push('\n');
163        out
164    }
165
166    /// The `(year, month)` of this entry's wall-clock timestamp — the rotation
167    /// bucket.
168    fn year_month(&self) -> (i32, u32) {
169        (self.timestamp.year(), self.timestamp.month())
170    }
171}
172
173/// The store's chronological log: a thin handle for the append-only timeline.
174/// All methods take the [`Store`] so they resolve the active `log.md` and the
175/// `log/` archives under the store root.
176#[derive(Debug, Clone)]
177pub struct Log;
178
179impl Log {
180    /// Atomically append `entry` to the active `log.md`, creating it (with
181    /// `type: log` frontmatter) if absent. **If the active log holds entries
182    /// from a prior month, roll those older months into `log/<YYYY-MM>.md`
183    /// first** (atomic move), keeping the active file to the current month.
184    ///
185    /// **Concurrency.** `append` is a read-modify-write of the whole active file
186    /// (`write_atomic` is atomic at the file level, but the read→render→write
187    /// window is not). Two concurrent appenders — the manager and a cron-driven
188    /// background system, say — would otherwise both read the same N-entry
189    /// snapshot and each write N+1 entries, the second rename clobbering the
190    /// first and silently dropping an audit entry. We serialize the whole
191    /// read-modify-write under an advisory file lock (`flock`, held for the
192    /// duration) so concurrent appends queue instead of racing. The lock is
193    /// advisory and process-scoped; it guards the toolkit's own appends, which is
194    /// the realistic contention path.
195    pub fn append(store: &Store, entry: &LogEntry) -> crate::Result<()> {
196        let active = active_log_path(store);
197
198        // Serialize concurrent appends for the whole read-modify-write. Held
199        // until `_lock` drops at function exit (covering both the rotation and
200        // the plain-append paths). A lock failure is non-fatal: we proceed
201        // unlocked rather than refuse to log (best-effort, same posture as the
202        // pre-fix behaviour on platforms without advisory locks).
203        let _lock = AppendLock::acquire(&active);
204
205        // Read the active file's current contents (if any). The "current month"
206        // is the month of the entry being appended (the newest in the timeline);
207        // every existing entry from a strictly-earlier month rolls to archives.
208        let current_ym = entry.year_month();
209
210        if active.exists() {
211            let content = fs::read_to_string(&active)?;
212            let (header, entries) = parse_active(&content);
213
214            // Partition existing entries into prior-month (roll out) and
215            // current-or-later (keep in the active file).
216            let mut by_month: BTreeMap<(i32, u32), Vec<LogEntry>> = BTreeMap::new();
217            let mut keep: Vec<LogEntry> = Vec::new();
218            for e in entries {
219                if e.year_month() < current_ym {
220                    by_month.entry(e.year_month()).or_default().push(e);
221                } else {
222                    keep.push(e);
223                }
224            }
225
226            if !by_month.is_empty() {
227                // Roll each prior month into its archive (atomic per-file),
228                // appending to any existing archive for that month.
229                let dir = archive_dir(store);
230                fs::create_dir_all(&dir)?;
231                for ((y, m), month_entries) in &by_month {
232                    let path = archive_path(store, *y, *m);
233                    append_to_archive(&path, month_entries)?;
234                }
235
236                // Rewrite the active file to the kept (current-month) entries
237                // plus the new entry — atomically.
238                let mut body = String::new();
239                for e in &keep {
240                    body.push_str(&e.render());
241                }
242                body.push_str(&entry.render());
243                let full = compose_active(&header, &body);
244                crate::fsx::write_atomic(&active, full.as_bytes())?;
245                return Ok(());
246            }
247
248            // No rotation needed: plain atomic append of the rendered entry.
249            let mut full = content;
250            if !full.ends_with('\n') {
251                full.push('\n');
252            }
253            full.push_str(&entry.render());
254            crate::fsx::write_atomic(&active, full.as_bytes())?;
255            Ok(())
256        } else {
257            // Fresh log: frontmatter + the single entry.
258            if let Some(parent) = active.parent() {
259                fs::create_dir_all(parent)?;
260            }
261            let body = entry.render();
262            let full = compose_active(LOG_FRONTMATTER, &body);
263            crate::fsx::write_atomic(&active, full.as_bytes())?;
264            Ok(())
265        }
266    }
267
268    /// The `n` most-recent entries **by timestamp**, returned oldest→newest.
269    ///
270    /// **Out-of-order safety (mirrors [`Log::since`]).** The log is append-only
271    /// but *not* guaranteed to be in non-decreasing timestamp order on disk: a
272    /// corrective entry is appended below the entry it corrects, a
273    /// backdated/clock-skewed write lands physically after newer entries, and a
274    /// `merge=union` clone merge interleaves both sides until a later agent
275    /// reorders. Out-of-order is only a `LOG_OUT_OF_ORDER` warning, never
276    /// rejected. So the last `n` *physical* entries are **not** the `n` newest
277    /// by time — taking them would omit a genuinely-recent entry that sits
278    /// physically before an older one, and the documented curator warm-up
279    /// (`dbmd log tail 20`) would report a stale picture of what was done lately.
280    /// We therefore feed every entry of each file we touch through a bounded
281    /// newest-by-timestamp window and let it select the true top `n`.
282    ///
283    /// Bounded cost: the active `log.md` is kept to the current month by
284    /// rotation, so a full read of it is cheap and is not a whole-store walk.
285    /// Across archives we *can* prune: each `log/<YYYY-MM>.md` holds only entries
286    /// from that month (rotation buckets by the entry's own year-month), so once
287    /// the window is full, an archive whose month is strictly before the
288    /// window-minimum's month cannot contain any entry newer than the current
289    /// `n`th-newest. We cross archives newest-month-first and stop at the first
290    /// such archive.
291    pub fn tail(store: &Store, n: usize) -> crate::Result<Vec<LogEntry>> {
292        if n == 0 {
293            return Ok(Vec::new());
294        }
295
296        // A bounded window of the `n` entries with the largest timestamps. No
297        // within-file early stop: out-of-order entries mean a newer entry can
298        // sit physically before an older one, so each file is read fully.
299        let mut window = NewestWindow::new(n);
300        // Cross-file identity dedup (see `since`): an interrupted rotation can
301        // leave the same entry in both the untrimmed active file and the
302        // archive; without this the duplicate would occupy two window slots and
303        // surface twice. The active copy (scanned first) is the one kept.
304        let mut seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
305
306        // Active file: scan fully (current-month-bounded by rotation).
307        let active = active_log_path(store);
308        if active.exists() {
309            reverse_collect(&active, |e| {
310                if seen.insert(entry_key(&e)) {
311                    window.consider(e);
312                }
313                false
314            })?;
315        }
316
317        // Archives, newest-month-first. Once the window is full, an archive
318        // whose month is strictly before the window-minimum's month holds only
319        // entries older than the current cutoff, so it (and every older archive)
320        // is skippable.
321        for archive in list_archives_desc(store)? {
322            if let (true, Some(cutoff_ym), Some(arch_ym)) = (
323                window.is_full(),
324                window.min_year_month(),
325                archive_year_month(&archive),
326            ) {
327                if arch_ym < cutoff_ym {
328                    break;
329                }
330            }
331            reverse_collect(&archive, |e| {
332                if seen.insert(entry_key(&e)) {
333                    window.consider(e);
334                }
335                false
336            })?;
337        }
338
339        Ok(window.into_sorted())
340    }
341
342    /// Entries strictly newer than `time`, reverse-scanning active → archives.
343    ///
344    /// **No within-file early stop.** The log is append-only but *not*
345    /// guaranteed to be in non-decreasing timestamp order on disk: a corrective
346    /// entry is appended below the entry it corrects (SPEC: "if a finding is
347    /// wrong, append a corrective entry below it"), a backdated/clock-skewed
348    /// write lands physically after newer entries, and a `merge=union` clone
349    /// merge interleaves both sides until a later agent reorders. Out-of-order
350    /// is only a `LOG_OUT_OF_ORDER` warning, never rejected. So a newer entry
351    /// can sit physically *before* an older one; stopping at the first
352    /// older-than-`time` entry would silently drop those — the documented
353    /// curator warm-up (`dbmd log since <ts>`) would miss real recent work.
354    /// We therefore read every entry of each file we touch.
355    ///
356    /// Bounded cost: the active `log.md` is kept to the current month by
357    /// rotation, so a full read of it is cheap (the same read `tail` does for a
358    /// large `n`) and is not a whole-store walk. Across archives we *can* stop:
359    /// each `log/<YYYY-MM>.md` holds only entries from that month (rotation
360    /// buckets by the entry's own year-month), so an archive whose month is
361    /// strictly before `time`'s month cannot contain any entry newer than
362    /// `time`. We cross archives newest-month-first and stop at the first whose
363    /// month is entirely at or before `time`'s.
364    pub fn since(store: &Store, time: DateTime<FixedOffset>) -> crate::Result<Vec<LogEntry>> {
365        let mut collected: Vec<LogEntry> = Vec::new();
366        // Cross-file identity dedup. An interrupted rotation (archive write
367        // committed, active rewrite not) leaves the same entries in BOTH the
368        // untrimmed active file and the archive; without dedup every such entry
369        // comes back twice. Keyed on the full entry identity, so only a
370        // byte-identical duplicate is suppressed (the active copy, scanned first,
371        // is the one kept); two genuinely-distinct entries never collide.
372        let mut seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
373
374        // Active file: scan fully, no early stop (out-of-order safe).
375        let active = active_log_path(store);
376        if active.exists() {
377            reverse_collect(&active, |e| {
378                if e.timestamp > time && seen.insert(entry_key(&e)) {
379                    collected.push(e);
380                }
381                false
382            })?;
383        }
384
385        // The cutoff's own (year, month): any archive strictly before it holds
386        // only older entries and is skippable. Archive months are bucketed on
387        // the UTC calendar (on-disk timestamps are offset-free and re-read as
388        // UTC; rotation buckets by the entry's UTC year-month), so the pruning
389        // calendar must be UTC too. A non-UTC `since` offset (advertised in the
390        // CLI hint, e.g. `…T00:30:00+07:00`) whose local month differs from its
391        // UTC month would otherwise prune away an archive holding entries that
392        // are strictly newer than `time` — `time.year()/.month()` read the
393        // offset-LOCAL calendar, not UTC.
394        let cutoff_utc = time.with_timezone(&Utc);
395        let cutoff_ym = (cutoff_utc.year(), cutoff_utc.month());
396
397        for archive in list_archives_desc(store)? {
398            // Archives are newest-month-first; once a month is strictly before
399            // the cutoff's month, every remaining (older) archive is too.
400            if let Some(arch_ym) = archive_year_month(&archive) {
401                if arch_ym < cutoff_ym {
402                    break;
403                }
404            }
405            // Scan this archive fully — within a month, entries may still be
406            // out of order, so no within-file early stop.
407            reverse_collect(&archive, |e| {
408                if e.timestamp > time && seen.insert(entry_key(&e)) {
409                    collected.push(e);
410                }
411                false
412            })?;
413        }
414
415        collected.reverse();
416        Ok(collected)
417    }
418
419    /// The timestamp of the most recent `validate` entry — the default `since`
420    /// window for working-set validation ([`crate::validate::validate_working_set`]).
421    pub fn last_validate_at(store: &Store) -> crate::Result<Option<DateTime<FixedOffset>>> {
422        let mut found: Option<DateTime<FixedOffset>> = None;
423
424        let active = active_log_path(store);
425        if active.exists() {
426            reverse_collect(&active, |e| {
427                if e.kind == LogKind::Validate {
428                    found = Some(e.timestamp);
429                    true
430                } else {
431                    false
432                }
433            })?;
434        }
435
436        if found.is_none() {
437            for archive in list_archives_desc(store)? {
438                reverse_collect(&archive, |e| {
439                    if e.kind == LogKind::Validate {
440                        found = Some(e.timestamp);
441                        true
442                    } else {
443                        false
444                    }
445                })?;
446                if found.is_some() {
447                    break;
448                }
449            }
450        }
451
452        Ok(found)
453    }
454
455    /// Parse a single entry header (`## [YYYY-MM-DD HH:MM] <kind> | <object>`)
456    /// into its timestamp, kind, and object. Returns `None` if the line isn't a
457    /// well-formed entry header.
458    pub fn parse_header(line: &str) -> Option<(DateTime<FixedOffset>, LogKind, Option<String>)> {
459        let line = line.trim_end_matches(['\n', '\r']);
460        let rest = line.strip_prefix("## [")?;
461        let close = rest.find(']')?;
462        let ts_str = &rest[..close];
463        let timestamp = parse_timestamp(ts_str)?;
464
465        // Everything after the closing bracket: ` <kind> | <object>` or
466        // ` <kind>`.
467        let after = rest[close + 1..].trim();
468        if after.is_empty() {
469            return None;
470        }
471
472        let (kind_str, object) = match after.split_once('|') {
473            Some((k, o)) => {
474                let obj = o.trim();
475                let obj = if obj.is_empty() {
476                    None
477                } else {
478                    Some(obj.to_string())
479                };
480                (k.trim(), obj)
481            }
482            None => (after, None),
483        };
484
485        if kind_str.is_empty() {
486            return None;
487        }
488
489        Some((timestamp, LogKind::parse(kind_str), object))
490    }
491}
492
// ── Internal helpers ────────────────────────────────────────────────────────

495/// A bounded window of the `n` entries with the largest timestamps, fed by a
496/// **reverse (newest-physical-first) scan** and used by [`Log::tail`].
497///
498/// Why this exists: the last `n` *physical* entries are the `n` newest only
499/// when the log is in non-decreasing time order. That's the append-only
500/// contract, not a guarantee — a backdated, clock-skewed, or merge-interleaved
501/// entry violates it (and trips the `LOG_OUT_OF_ORDER` validate warning). The
502/// window decouples `tail` from that assumption: it keeps the `n` largest
503/// timestamps seen regardless of the order they arrive in, so the caller can
504/// read each file fully (no fragile within-file early stop) and still get the
505/// true top `n`.
506///
507/// Tie-break: entries sharing a timestamp at the window boundary are ordered by
508/// **physical recency** — the one appended later (encountered earlier in the
509/// reverse scan, i.e. a smaller `arrival`) wins. "Newest" means most-recently
510/// recorded.
511struct NewestWindow {
512    cap: usize,
513    /// Min-by-(timestamp, then physical-oldest) heap: the root is always the
514    /// next entry to evict once the window is full.
515    heap: std::collections::BinaryHeap<WindowItem>,
516    /// Count of entries fed in, in reverse-scan order, used as the tie-break
517    /// key (0 = newest physical).
518    next_arrival: u64,
519}
520
521impl NewestWindow {
522    fn new(cap: usize) -> Self {
523        NewestWindow {
524            cap,
525            heap: std::collections::BinaryHeap::with_capacity(cap),
526            next_arrival: 0,
527        }
528    }
529
530    /// Offer one entry from the scan. If the window isn't full it's kept; once
531    /// full, it's kept (evicting the current minimum) iff its timestamp is `>=`
532    /// the window minimum. Equal-timestamp boundary entries resolve by physical
533    /// recency (see the type doc).
534    fn consider(&mut self, entry: LogEntry) {
535        let arrival = self.next_arrival;
536        self.next_arrival += 1;
537
538        if self.heap.len() < self.cap {
539            self.heap.push(WindowItem { entry, arrival });
540            return;
541        }
542
543        // Window full. The heap root is the current minimum (oldest-by-
544        // timestamp held; on a tie, the oldest-physical).
545        let root = self.heap.peek().expect("full window has a root");
546        if entry.timestamp > root.entry.timestamp {
547            // Strictly newer than the window minimum: it belongs; evict the min.
548            self.heap.pop();
549            self.heap.push(WindowItem { entry, arrival });
550        }
551        // On `<=` we keep the window as-is. `<` is plainly too old. `==` is the
552        // tie case: the scan is newest-physical-first, so this entry is
553        // physically *older* than the held one of equal timestamp, and the
554        // tie-break keeps the physically-newer (most-recently-recorded) entry —
555        // so the incoming one is dropped.
556    }
557
558    /// Whether the window already holds its full `cap` entries.
559    fn is_full(&self) -> bool {
560        self.heap.len() >= self.cap
561    }
562
563    /// The `(year, month)` of the window's current minimum (oldest kept) entry,
564    /// or `None` when the window is empty. Used to prune older archives: an
565    /// archive month strictly before this can't beat the current cutoff.
566    fn min_year_month(&self) -> Option<(i32, u32)> {
567        self.heap
568            .peek()
569            .map(|item| (item.entry.timestamp.year(), item.entry.timestamp.month()))
570    }
571
572    /// The held entries, oldest→newest (chronological), ties broken
573    /// oldest-physical→newest-physical.
574    fn into_sorted(self) -> Vec<LogEntry> {
575        let mut items: Vec<WindowItem> = self.heap.into_vec();
576        // Ascending by timestamp; on a tie, oldest-physical (larger arrival)
577        // first so the most-recently-recorded entry sorts last.
578        items.sort_by(|a, b| {
579            a.entry
580                .timestamp
581                .cmp(&b.entry.timestamp)
582                .then(b.arrival.cmp(&a.arrival))
583        });
584        items.into_iter().map(|i| i.entry).collect()
585    }
586}
587
588/// One slot in [`NewestWindow`]'s heap. `Ord` is defined so the heap is a
589/// **min-heap on `(timestamp, physical-oldest)`**: `BinaryHeap` is a max-heap,
590/// so the root (max under this `Ord`) is the eviction candidate — the smallest
591/// timestamp, and on a tie the oldest-physical (largest `arrival`).
592struct WindowItem {
593    entry: LogEntry,
594    arrival: u64,
595}
596
597impl PartialEq for WindowItem {
598    fn eq(&self, other: &Self) -> bool {
599        self.entry.timestamp == other.entry.timestamp && self.arrival == other.arrival
600    }
601}
602impl Eq for WindowItem {}
603
604impl Ord for WindowItem {
605    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
606        // Reverse on timestamp so the *smallest* timestamp is the heap max
607        // (eviction candidate). On equal timestamps, the larger `arrival`
608        // (older physical) is the heap max so it is evicted first.
609        other
610            .entry
611            .timestamp
612            .cmp(&self.entry.timestamp)
613            .then(self.arrival.cmp(&other.arrival))
614    }
615}
616impl PartialOrd for WindowItem {
617    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
618        Some(self.cmp(other))
619    }
620}
621
622/// An advisory, exclusive lock serializing concurrent [`Log::append`] calls.
623///
624/// Held on a dedicated sibling lock file (`<active>.lock`) rather than on
625/// `log.md` itself: `write_atomic` replaces the active file by `rename`, so the
626/// active inode changes under us and a lock on its fd would not cover the new
627/// file. The lock file is stable, so the lock spans the whole read-modify-write.
628///
629/// On Unix this is `flock(LOCK_EX)`, released on drop (or implicitly when the
630/// process exits / the fd closes, so a crash never strands the lock). The
631/// lock file is created if absent and intentionally left on disk between runs
632/// (locking it does not depend on its contents). On non-Unix targets the lock
633/// is a no-op — db.md's append surface is Unix-targeted, and a missing advisory
634/// lock degrades to the pre-fix last-writer-wins, never to incorrectness of a
635/// single writer.
636struct AppendLock {
637    #[cfg(unix)]
638    file: Option<File>,
639}
640
641impl AppendLock {
642    /// Acquire the exclusive append lock for the store whose active log is
643    /// `active`. Best-effort: any failure to open or lock the lock file yields
644    /// an unlocked guard (we log rather than refuse to log). Blocks until the
645    /// lock is granted when another appender holds it.
646    fn acquire(active: &Path) -> AppendLock {
647        #[cfg(unix)]
648        {
649            let file = Self::open_and_lock(active);
650            AppendLock { file }
651        }
652        #[cfg(not(unix))]
653        {
654            let _ = active;
655            AppendLock {}
656        }
657    }
658
659    #[cfg(unix)]
660    fn open_and_lock(active: &Path) -> Option<File> {
661        use std::os::unix::io::AsRawFd;
662
663        // The lock file lives beside the active log; ensure its parent exists
664        // (the fresh-log path may run before `log.md`'s directory is created).
665        if let Some(parent) = active.parent() {
666            let _ = fs::create_dir_all(parent);
667        }
668        let lock_path = lock_path_for(active);
669        let file = std::fs::OpenOptions::new()
670            .create(true)
671            .truncate(false)
672            .write(true)
673            .open(&lock_path)
674            .ok()?;
675
676        // Blocking exclusive advisory lock. `flock` is in libc, which every Rust
677        // binary links, so the bare `extern "C"` declaration needs no crate dep.
678        let rc = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
679        if rc != 0 {
680            // Could not lock (e.g. a filesystem without flock support): proceed
681            // unlocked rather than fail the append.
682            return None;
683        }
684        Some(file)
685    }
686}
687
688#[cfg(unix)]
689impl Drop for AppendLock {
690    fn drop(&mut self) {
691        use std::os::unix::io::AsRawFd;
692        if let Some(file) = &self.file {
693            // Release explicitly; the fd close on drop would also release it.
694            unsafe { flock(file.as_raw_fd(), LOCK_UN) };
695        }
696    }
697}
698
// Hand-written binding for the C library's `flock`, declared here so the
// advisory lock needs no extra crate. The operation codes below are likewise
// hard-coded rather than taken from a bindings crate.
#[cfg(unix)]
extern "C" {
    /// Apply `operation` (one of the `LOCK_*` codes below) to the open file
    /// descriptor `fd`. Callers in this file treat a return of `0` as success
    /// and anything else as failure.
    fn flock(fd: std::os::raw::c_int, operation: std::os::raw::c_int) -> std::os::raw::c_int;
}

/// `flock` operation: exclusive lock (`LOCK_EX`), blocking.
// NOTE(review): `2` is the value used by Linux and the BSD family; confirm it
// for any other unix target this crate is built for.
#[cfg(unix)]
const LOCK_EX: std::os::raw::c_int = 2;
/// `flock` operation: unlock (`LOCK_UN`).
// NOTE(review): `8` is the value used by Linux and the BSD family; confirm it
// for any other unix target this crate is built for.
#[cfg(unix)]
const LOCK_UN: std::os::raw::c_int = 8;
710
/// Path of the advisory-lock file that guards `active`: the same directory,
/// the same file name, with `.lock` appended (`log.md` → `log.md.lock`).
///
/// A path with no final component falls back to the name `log.md`; a path with
/// no parent yields the bare lock-file name.
#[cfg(unix)]
fn lock_path_for(active: &Path) -> PathBuf {
    let mut lock_name = match active.file_name() {
        Some(base) => base.to_os_string(),
        None => std::ffi::OsString::from("log.md"),
    };
    lock_name.push(".lock");

    active
        .parent()
        .map_or_else(|| PathBuf::from(&lock_name), |dir| dir.join(&lock_name))
}
724
725/// The active `log.md` path under the store root.
726fn active_log_path(store: &Store) -> PathBuf {
727    store.root.join("log.md")
728}
729
730/// The `log/` archive directory under the store root.
731fn archive_dir(store: &Store) -> PathBuf {
732    store.root.join("log")
733}
734
735/// The `log/<YYYY-MM>.md` archive path for a given month.
736fn archive_path(store: &Store, year: i32, month: u32) -> PathBuf {
737    archive_dir(store).join(format!("{:04}-{:02}.md", year, month))
738}
739
740/// Parse a `YYYY-MM-DD HH:MM` header timestamp, reattaching UTC. `None` on any
741/// malformed shape.
742fn parse_timestamp(s: &str) -> Option<DateTime<FixedOffset>> {
743    let naive = NaiveDateTime::parse_from_str(s.trim(), TS_FORMAT).ok()?;
744    let utc = FixedOffset::east_opt(0)?;
745    utc.from_local_datetime(&naive).single()
746}
747
748/// Split a `log.md` / archive file into its leading frontmatter+heading block
749/// (everything up to and including the line before the first `## [` header) and
750/// its parsed entries. If there are no entries, the whole content is the header
751/// block.
752fn parse_active(content: &str) -> (String, Vec<LogEntry>) {
753    match find_first_header(content) {
754        Some(idx) => {
755            let header = content[..idx].to_string();
756            let entries = parse_entries(&content[idx..]);
757            (header, entries)
758        }
759        None => (content.to_string(), Vec::new()),
760    }
761}
762
/// Byte offset of the first line that begins with `## [`, or `None` when no
/// line does. Only a match at the very start of the text or immediately after
/// a `\n` counts; a mid-line `## [` is ignored.
fn find_first_header(content: &str) -> Option<usize> {
    if content.starts_with("## [") {
        Some(0)
    } else {
        // Search for the marker together with the newline before it, then
        // step past that newline to land on the `#`.
        content.find("\n## [").map(|newline_at| newline_at + 1)
    }
}
771
772/// Whether `line` is a note line that — left unescaped — could be mistaken for
773/// an entry header. It is *header-ambiguous* when it is a (possibly empty) run
774/// of leading backslashes followed by a string that [`Log::parse_header`]
775/// accepts. The escape (one leading backslash) and only the escape is added to,
776/// or stripped from, such lines, so the transform is fully reversible:
777/// `## [..]` (a real header shape in note text) ⇄ `\## [..]`, and a literal
778/// `\## [..]` a note already contains ⇄ `\\## [..]`.
779fn is_header_ambiguous(line: &str) -> bool {
780    let stripped = line.trim_start_matches('\\');
781    // Only treat it as ambiguous if some backslashes were the *only* prefix and
782    // the remainder is a valid header — a backslash run that does not lead into
783    // a header (e.g. `\not a header`) is ordinary note text, left untouched.
784    Log::parse_header(stripped).is_some()
785}
786
787/// Escape one note line for on-disk emission so it can never be parsed as an
788/// entry header (the [write-path fix] for header-shaped notes corrupting the
789/// append-only log). A header-ambiguous line is prefixed with a single
790/// backslash, moving its `## [` off column 0; every other line is emitted
791/// verbatim. Reversed exactly by [`unescape_note_line`].
792fn escape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
793    if is_header_ambiguous(line) {
794        std::borrow::Cow::Owned(format!("\\{line}"))
795    } else {
796        std::borrow::Cow::Borrowed(line)
797    }
798}
799
800/// Reverse [`escape_note_line`]: strip exactly one leading backslash from a
801/// header-ambiguous on-disk note line, restoring the literal the author wrote.
802/// A line that is not header-ambiguous (including a genuine `\not a header`) is
803/// returned untouched, so the round-trip is lossless for arbitrary note text.
804fn unescape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
805    if let Some(rest) = line.strip_prefix('\\') {
806        if is_header_ambiguous(line) {
807            return std::borrow::Cow::Borrowed(rest);
808        }
809    }
810    std::borrow::Cow::Borrowed(line)
811}
812
813/// Parse every entry in a slice that begins at (or before, header-block
814/// included) a sequence of `## [` headers. Headers that fail to parse are
815/// skipped (their body folds into the previous valid entry's note is avoided —
816/// they simply start no new entry).
817fn parse_entries(text: &str) -> Vec<LogEntry> {
818    let mut entries: Vec<LogEntry> = Vec::new();
819    let mut cur_header: Option<(DateTime<FixedOffset>, LogKind, Option<String>)> = None;
820    let mut cur_note: Vec<&str> = Vec::new();
821
822    let flush = |entries: &mut Vec<LogEntry>,
823                 header: &mut Option<(DateTime<FixedOffset>, LogKind, Option<String>)>,
824                 note: &mut Vec<&str>| {
825        if let Some((timestamp, kind, object)) = header.take() {
826            // Reverse the per-line header escape `render` applies so an escaped
827            // header-shaped note line round-trips back to its literal form.
828            let joined = note
829                .iter()
830                .map(|line| unescape_note_line(line))
831                .collect::<Vec<_>>()
832                .join("\n");
833            let note_str = joined.trim_matches(['\n', '\r']).to_string();
834            entries.push(LogEntry {
835                timestamp,
836                kind,
837                object,
838                note: note_str,
839            });
840        }
841        note.clear();
842    };
843
844    for line in text.lines() {
845        if line.starts_with("## [") {
846            if let Some(parsed) = Log::parse_header(line) {
847                // Close the previous entry, start a new one.
848                flush(&mut entries, &mut cur_header, &mut cur_note);
849                cur_header = Some(parsed);
850                continue;
851            }
852            // Unparseable `## [` line: treat as body of the current entry.
853        }
854        if cur_header.is_some() {
855            cur_note.push(line);
856        }
857    }
858    flush(&mut entries, &mut cur_header, &mut cur_note);
859    entries
860}
861
/// Join a preamble (`header`) and an entry body into full file content.
///
/// An empty preamble yields the body alone. Otherwise the preamble is padded
/// with newlines until it ends in a blank line (`"\n\n"`) and the body
/// follows; a preamble that already ends in a blank line is used as-is.
fn compose_active(header: &str, body: &str) -> String {
    if header.is_empty() {
        return body.to_string();
    }
    // How many newlines are still missing to reach one blank line.
    let padding = if header.ends_with("\n\n") {
        ""
    } else if header.ends_with('\n') {
        "\n"
    } else {
        "\n\n"
    };
    let mut composed = String::with_capacity(header.len() + padding.len() + body.len());
    composed.push_str(header);
    composed.push_str(padding);
    composed.push_str(body);
    composed
}
876
877/// Append entries to a month archive, creating it with `type: log` frontmatter
878/// if absent. Atomic (temp-file rename). Entries are appended in the given
879/// order (callers pass them already chronological within the month).
880///
881/// **Idempotent re-roll.** Rotation in [`Log::append`] is two non-atomic durable
882/// writes — roll prior-month entries into the archive, *then* rewrite the active
883/// file. If the process crashes or the active rewrite errors (e.g. ENOSPC,
884/// permission) *after* the archive write commits, the prior-month entries remain
885/// in the still-untrimmed active file, and `Log::append` surfaces the error so
886/// the agent retries. The retry re-partitions the same prior-month entries and
887/// re-rolls them here — so a naive concatenate would duplicate every entry in
888/// the month archive, amplifying on each retry, with no validate check to detect
889/// or repair it (the log is primary, no-rewrite data). To make the re-roll a
890/// no-op, we skip any incoming entry already present verbatim in the archive,
891/// keyed on the full entry identity `(timestamp, kind, object, note)`.
892fn append_to_archive(path: &Path, entries: &[LogEntry]) -> crate::Result<()> {
893    if path.exists() {
894        let existing = fs::read_to_string(path)?;
895        // Identities already on disk in this archive, so an interrupted-then-
896        // retried rotation re-rolling identical entries adds nothing.
897        let (_header, existing_entries) = parse_active(&existing);
898        let present: std::collections::HashSet<EntryKey> =
899            existing_entries.iter().map(entry_key).collect();
900
901        let mut body = String::new();
902        for e in entries {
903            if present.contains(&entry_key(e)) {
904                continue;
905            }
906            body.push_str(&e.render());
907        }
908        // Nothing new to add (a fully-duplicate re-roll): leave the archive
909        // byte-for-byte untouched (append-only: don't rewrite identical data).
910        if body.is_empty() {
911            return Ok(());
912        }
913
914        let mut full = existing;
915        if !full.ends_with('\n') {
916            full.push('\n');
917        }
918        full.push_str(&body);
919        crate::fsx::write_atomic(path, full.as_bytes())?;
920    } else {
921        let mut body = String::new();
922        for e in entries {
923            body.push_str(&e.render());
924        }
925        if let Some(parent) = path.parent() {
926            fs::create_dir_all(parent)?;
927        }
928        let full = compose_active(LOG_FRONTMATTER, &body);
929        crate::fsx::write_atomic(path, full.as_bytes())?;
930    }
931    Ok(())
932}
933
934/// A hashable identity for a log entry, used to dedup an idempotent archive
935/// re-roll (see [`append_to_archive`]). Two entries are "the same" when their
936/// timestamp, kind, object, and note all match — exactly the fields that
937/// round-trip through `render`/`parse`, so a re-rolled entry compares equal to
938/// the one already archived. Owned (rather than borrowed) so keys from the
939/// existing archive and from the incoming entries share one type regardless of
940/// where they came from; the cost is paid only on the cold rotation path.
941type EntryKey = (DateTime<FixedOffset>, String, Option<String>, String);
942
943/// Derive the dedup key for `e` (see [`EntryKey`]). Keying on `kind.as_str()`
944/// (rather than `LogKind`, which is not `Hash`) is exact: `as_str`/`parse`
945/// round-trips every recognized kind and preserves any `Custom` token.
946fn entry_key(e: &LogEntry) -> EntryKey {
947    (
948        e.timestamp,
949        e.kind.as_str().to_string(),
950        e.object.clone(),
951        e.note.clone(),
952    )
953}
954
955/// Every `log/<YYYY-MM>.md` archive, sorted **newest month first**.
956fn list_archives_desc(store: &Store) -> crate::Result<Vec<PathBuf>> {
957    let dir = archive_dir(store);
958    if !dir.is_dir() {
959        return Ok(Vec::new());
960    }
961    let mut months: Vec<(String, PathBuf)> = Vec::new();
962    for entry in fs::read_dir(&dir)? {
963        let entry = entry?;
964        let path = entry.path();
965        if !path.is_file() {
966            continue;
967        }
968        let name = match path.file_name().and_then(|s| s.to_str()) {
969            Some(n) => n,
970            None => continue,
971        };
972        // Match `YYYY-MM.md`.
973        if let Some(stem) = name.strip_suffix(".md") {
974            if is_year_month(stem) {
975                months.push((stem.to_string(), path.clone()));
976            }
977        }
978    }
979    // `YYYY-MM` strings sort lexically == chronologically; reverse for newest
980    // first.
981    months.sort_by(|a, b| b.0.cmp(&a.0));
982    Ok(months.into_iter().map(|(_, p)| p).collect())
983}
984
985/// The `(year, month)` an archive file represents, parsed from its
986/// `log/<YYYY-MM>.md` name. `None` if the name isn't a well-formed month
987/// archive (in which case the caller scans it rather than risk skipping it).
988fn archive_year_month(path: &Path) -> Option<(i32, u32)> {
989    let stem = path
990        .file_name()
991        .and_then(|s| s.to_str())
992        .and_then(|n| n.strip_suffix(".md"))?;
993    if !is_year_month(stem) {
994        return None;
995    }
996    let year: i32 = stem[..4].parse().ok()?;
997    let month: u32 = stem[5..7].parse().ok()?;
998    Some((year, month))
999}
1000
/// Shape check for `YYYY-MM`: exactly seven bytes — four ASCII digits, a dash,
/// two ASCII digits. The numeric values themselves are not validated.
fn is_year_month(s: &str) -> bool {
    match s.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1] => [y0, y1, y2, y3, m0, m1]
            .iter()
            .all(|digit| digit.is_ascii_digit()),
        _ => false,
    }
}
1012
1013/// Reverse-read `path` from EOF, parsing entries newest-first and feeding each
1014/// to `take`. `take` returns `true` to stop early (enough collected). The file
1015/// is read backward in blocks; only the tail region needed to satisfy `take`
1016/// is read — the whole file is read only if `take` never returns `true`.
1017fn reverse_collect<F>(path: &Path, mut take: F) -> crate::Result<()>
1018where
1019    F: FnMut(LogEntry) -> bool,
1020{
1021    let mut file = File::open(path)?;
1022    let len = file.metadata()?.len();
1023    if len == 0 {
1024        return Ok(());
1025    }
1026
1027    // Algorithm: grow a tail buffer leftward one block at a time, emitting
1028    // entries strictly newest-first as their left boundary is confirmed, and
1029    // stopping the instant `take` says enough. The whole file is read only if
1030    // `take` never returns `true` (e.g. `tail(n)` with n ≥ entry count).
1031    //
1032    // Invariant: a `## [` line-start anywhere in the buffer is a *complete*
1033    // entry — its header is the entry's first line, and its body lies to the
1034    // right and is therefore already buffered (we read right-to-left). So we
1035    // never split an entry across blocks.
1036    //
1037    // `buf` holds the file's bytes from absolute offset `start` (growing
1038    // leftward toward 0) to EOF. `emitted_abs` records the absolute offsets of
1039    // headers already handed to `take`, so re-visiting a header in a later block
1040    // never double-emits.
1041    let mut buf: Vec<u8> = Vec::new();
1042    let mut start = len;
1043    // O(1) membership: a `Vec` + `.contains()` here would be O(E²) across a large
1044    // single-month file (every header re-checked against all prior emissions).
1045    let mut emitted_abs: std::collections::HashSet<u64> = std::collections::HashSet::new();
1046    // Every header's absolute offset found so far, ascending. Built
1047    // *incrementally*: each block contributes only the markers whose `#` starts
1048    // inside it (all strictly smaller than any already-known offset, so they
1049    // prepend in order). This is the fix for the accidental O(file²) scan — the
1050    // old code re-ran `header_offsets` over the whole accumulated buffer on every
1051    // block (O(file²/block) byte comparisons on the default no-early-stop
1052    // tail/since path); now each byte is scanned for a header exactly once.
1053    let mut headers: Vec<u64> = Vec::new();
1054    let mut stop = false;
1055    // The first backward block has no already-scanned region to its right, so it
1056    // scans exactly `[0, block)`; every later block scans one byte further
1057    // (`block + 1`) to re-classify the prior block's deferred left-edge candidate
1058    // now that its left neighbour is buffered (see the scan call below).
1059    let mut first = true;
1060
1061    while start > 0 && !stop {
1062        let block = std::cmp::min(REVERSE_BLOCK as u64, start);
1063        let new_start = start - block;
1064        file.seek(SeekFrom::Start(new_start))?;
1065        let mut chunk = vec![0u8; block as usize];
1066        file.read_exact(&mut chunk)?;
1067        chunk.extend_from_slice(&buf);
1068        buf = chunk;
1069        start = new_start;
1070
1071        // Scan the freshly-prepended block (buffer indices `[0, block)`) for new
1072        // header markers. A marker straddling the block boundary has its `#` in
1073        // this window and so is still caught (see `header_offsets_range`).
1074        //
1075        // One subtlety the scan must respect: a `## [` whose `#` sits at the
1076        // block's LEFT edge (buffer index 0, absolute offset `start`) cannot have
1077        // its line-start confirmed yet when `start > 0` — the byte at `start - 1`
1078        // is not buffered. Treating index 0 as a line start there fabricates an
1079        // entry from a mid-line `## [` fragment that happens to align with a block
1080        // boundary. So `header_offsets_range` DEFERS the leftmost candidate when
1081        // `base` is not the true file start, and we re-scan one byte further
1082        // right next time: after the first block the buffer carries the previous
1083        // block's left-edge byte at index `block` with its left neighbour now in
1084        // hand, so extending the window to `block + 1` re-classifies that exactly
1085        // once. `first` guards the first block (nothing to re-check on its right).
1086        let base_is_file_start = start == 0;
1087        let scan_hi = if first { block } else { block + 1 } as usize;
1088        let mut new_headers = header_offsets_range(&buf, start, 0, scan_hi, base_is_file_start);
1089        first = false;
1090        if !new_headers.is_empty() {
1091            new_headers.extend_from_slice(&headers);
1092            headers = new_headers;
1093        }
1094
1095        // Process newest (largest offset) → oldest (smallest), emitting any
1096        // header not yet emitted. Hold back only the buffer's *leftmost* header
1097        // while we have not reached file start (`start > 0`): older entries may
1098        // still lie to its left in unread blocks, and newest-first order
1099        // requires we not emit it until we've confirmed it really is the oldest
1100        // (or read enough to bound it on the left). One extra block read at
1101        // most; on the next iteration its left boundary is in-buffer.
1102        for i in (0..headers.len()).rev() {
1103            let abs = headers[i];
1104            if emitted_abs.contains(&abs) {
1105                continue;
1106            }
1107            let is_oldest_in_buf = i == 0;
1108            if is_oldest_in_buf && start > 0 {
1109                continue;
1110            }
1111
1112            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
1113            if let Some(entry) = parse_single_entry(&entry_text) {
1114                emitted_abs.insert(abs);
1115                if take(entry) {
1116                    stop = true;
1117                    break;
1118                }
1119            } else {
1120                emitted_abs.insert(abs);
1121            }
1122        }
1123    }
1124
1125    // Reached file start (or stopped). If we stopped, done. If we reached
1126    // start, emit any held-back oldest header(s) now (start == 0 means the
1127    // buffer's first header is genuinely the oldest). `headers` already holds
1128    // every offset (the loop scanned down to start == 0), so reuse it.
1129    if !stop && start == 0 {
1130        for i in (0..headers.len()).rev() {
1131            let abs = headers[i];
1132            if emitted_abs.contains(&abs) {
1133                continue;
1134            }
1135            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
1136            if let Some(entry) = parse_single_entry(&entry_text) {
1137                emitted_abs.insert(abs);
1138                if take(entry) {
1139                    break;
1140                }
1141            } else {
1142                emitted_abs.insert(abs);
1143            }
1144        }
1145    }
1146
1147    Ok(())
1148}
1149
/// Absolute byte offsets of every **valid** entry-header line-start (`## […]`)
/// in `buf`, where `buf[0]` sits at absolute file offset `base`.
///
/// A `## [` line is an entry boundary only if [`Log::parse_header`] accepts it.
/// This matches the forward parser ([`parse_entries`]), which keeps an
/// unparseable `## [` line as note text of the entry above it. Without the
/// validity check a multi-line note containing such a line would be cut in two
/// by the reverse reader.
///
/// Test-only: this scans the whole buffer in one call and exists as the
/// reference result that the incremental [`header_offsets_range`] scan is
/// compared against.
#[cfg(test)]
fn header_offsets(buf: &[u8], base: u64) -> Vec<u64> {
    // With the whole buffer in hand, index 0 is a genuine line start exactly
    // when the buffer begins at the start of the file.
    let starts_at_file_start = base == 0;
    header_offsets_range(buf, base, 0, buf.len(), starts_at_file_start)
}
1173
1174/// Like [`header_offsets`] but only reports header *markers whose `#` starts in*
1175/// `buf[scan_lo..scan_hi)`, while still consulting bytes outside that window —
1176/// to the left for the line-start (`buf[i-1] == b'\n'`) check and to the right
1177/// for the header line's content. This is the incremental scan
1178/// [`reverse_collect`] uses: each backward block searches only the freshly-
1179/// prepended region for *new* markers, so total header-scan work is linear in
1180/// the file size, not the O(file²) of re-scanning the whole growing buffer on
1181/// every block.
1182///
1183/// A `## [` marker that *straddles* the boundary (its `#` in the new block, its
1184/// `[` or trailing bytes in the already-scanned region) is still detected here:
1185/// its `#` index is `< scan_hi`, so it falls in this window, and it was never
1186/// reported by an earlier scan (whose window was `[block, …)`, strictly to the
1187/// right of this one) — so each marker is reported exactly once across all
1188/// blocks.
1189///
1190/// **Left-edge line-start safety.** A `## [` whose `#` is at buffer index 0 has
1191/// no buffered left neighbour, so its line-start cannot be confirmed unless
1192/// index 0 really is the file start. `base_is_file_start` says so: when it is
1193/// `false`, an index-0 candidate is DEFERRED (not reported) rather than assumed
1194/// to be at a line start — otherwise a mid-line `## […]` fragment that happens
1195/// to align with a block's left edge would be fabricated into an entry,
1196/// truncating the real entry's note and (after rotation) corrupting the
1197/// append-only archive. The caller re-scans that byte on the next block, once
1198/// its left neighbour is buffered, so a genuine boundary header is still found
1199/// exactly once.
1200fn header_offsets_range(
1201    buf: &[u8],
1202    base: u64,
1203    scan_lo: usize,
1204    scan_hi: usize,
1205    base_is_file_start: bool,
1206) -> Vec<u64> {
1207    const PAT: &[u8] = b"## [";
1208    let mut out = Vec::new();
1209    let n = buf.len();
1210    let hi = scan_hi.min(n);
1211    let mut i = scan_lo;
1212    // A marker's `#` must start strictly before `hi`; the pattern/line content
1213    // may read past `hi` into `buf` (the right neighbour is already buffered).
1214    while i < hi && i + PAT.len() <= n {
1215        if &buf[i..i + PAT.len()] == PAT {
1216            // Index 0 is a line start only when it is the genuine file start;
1217            // otherwise its left neighbour is unbuffered and the candidate is
1218            // deferred to the next block (see the doc comment).
1219            let at_line_start = if i == 0 {
1220                base_is_file_start
1221            } else {
1222                buf[i - 1] == b'\n'
1223            };
1224            if at_line_start && is_valid_header_line(buf, i) {
1225                out.push(base + i as u64);
1226                // skip ahead past this marker
1227                i += PAT.len();
1228                continue;
1229            }
1230        }
1231        i += 1;
1232    }
1233    out
1234}
1235
1236/// Whether the `## [` line starting at byte `i` in `buf` parses as a valid
1237/// entry header. Reads the line up to (but not including) the next `\n` (or
1238/// buffer end) and defers to [`Log::parse_header`] — the same validity gate the
1239/// forward parser applies, keeping the reverse reader's boundary set identical
1240/// to the forward one.
1241fn is_valid_header_line(buf: &[u8], i: usize) -> bool {
1242    let line_end = buf[i..]
1243        .iter()
1244        .position(|&b| b == b'\n')
1245        .map(|p| i + p)
1246        .unwrap_or(buf.len());
1247    let line = String::from_utf8_lossy(&buf[i..line_end]);
1248    Log::parse_header(&line).is_some()
1249}
1250
/// Text of the entry whose header sits at absolute offset `header_abs`.
///
/// `headers` is the ascending list of header offsets and `idx` is this
/// header's position in it. The entry runs from its own header up to the next
/// header in the list, or to the end of `buf` when it is the last one.
/// `buf[0]` sits at absolute offset `base`. Bytes that are not valid UTF-8 are
/// replaced rather than rejected.
fn entry_text_at(buf: &[u8], base: u64, header_abs: u64, headers: &[u64], idx: usize) -> String {
    let to_index = |abs: u64| (abs - base) as usize;
    let from = to_index(header_abs);
    let until = headers
        .get(idx + 1)
        .map_or(buf.len(), |&next_header| to_index(next_header));
    String::from_utf8_lossy(&buf[from..until]).into_owned()
}
1263
1264/// Parse a single entry from a text block that begins at its header line.
1265fn parse_single_entry(text: &str) -> Option<LogEntry> {
1266    parse_entries(text).into_iter().next()
1267}
1268
1269#[cfg(test)]
1270mod tests {
1271    use super::*;
1272    use crate::parser::Config;
1273    use std::fs;
1274    use tempfile::TempDir;
1275
1276    /// Build a `Store` rooted at a fresh temp dir with a minimal `DB.md`.
1277    /// Construct the `Store` struct directly so the test stays narrow and never
1278    /// exercises the `Store::open` parser path.
1279    fn temp_store() -> (TempDir, Store) {
1280        let dir = tempfile::tempdir().expect("tempdir");
1281        fs::write(dir.path().join("DB.md"), "---\ntype: db-md\n---\n").expect("write DB.md");
1282        let store = Store {
1283            root: dir.path().to_path_buf(),
1284            config: Config::default(),
1285        };
1286        (dir, store)
1287    }
1288
1289    /// A timestamp at UTC from `YYYY-MM-DD HH:MM` components.
1290    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<FixedOffset> {
1291        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
1292            .unwrap()
1293            .and_hms_opt(h, mi, 0)
1294            .unwrap();
1295        FixedOffset::east_opt(0)
1296            .unwrap()
1297            .from_local_datetime(&naive)
1298            .single()
1299            .unwrap()
1300    }
1301
1302    #[allow(clippy::too_many_arguments)] // test fixture builder; struct-ifying churns every call site
1303    fn entry(
1304        y: i32,
1305        mo: u32,
1306        d: u32,
1307        h: u32,
1308        mi: u32,
1309        kind: LogKind,
1310        object: Option<&str>,
1311        note: &str,
1312    ) -> LogEntry {
1313        LogEntry {
1314            timestamp: ts(y, mo, d, h, mi),
1315            kind,
1316            object: object.map(|s| s.to_string()),
1317            note: note.to_string(),
1318        }
1319    }
1320
1321    // ── parse_header ────────────────────────────────────────────────────────
1322
1323    #[test]
1324    fn parse_header_with_object() {
1325        let (t, k, o) =
1326            Log::parse_header("## [2026-05-27 10:00] ingest | sources/emails/x.eml").unwrap();
1327        assert_eq!(t, ts(2026, 5, 27, 10, 0));
1328        assert_eq!(k, LogKind::Ingest);
1329        assert_eq!(o.as_deref(), Some("sources/emails/x.eml"));
1330    }
1331
1332    #[test]
1333    fn parse_header_without_object_is_none_object() {
1334        let (t, k, o) = Log::parse_header("## [2026-05-27 10:20] validate").unwrap();
1335        assert_eq!(t, ts(2026, 5, 27, 10, 20));
1336        assert_eq!(k, LogKind::Validate);
1337        assert_eq!(o, None);
1338    }
1339
1340    #[test]
1341    fn parse_header_custom_kind_roundtrips_token() {
1342        let (_, k, o) = Log::parse_header("## [2026-05-27 10:00] proposal | records/x").unwrap();
1343        assert_eq!(k, LogKind::Custom("proposal".to_string()));
1344        assert!(!k.is_recognized());
1345        assert_eq!(o.as_deref(), Some("records/x"));
1346    }
1347
1348    #[test]
1349    fn parse_header_index_rebuild_hyphenated_kind() {
1350        let (_, k, _) = Log::parse_header("## [2026-05-27 10:00] index-rebuild").unwrap();
1351        assert_eq!(k, LogKind::IndexRebuild);
1352        assert_eq!(k.as_str(), "index-rebuild");
1353    }
1354
    #[test]
    fn parse_header_rejects_non_headers() {
        // Plain text and a non-entry heading are not entry headers.
        assert!(Log::parse_header("Not a header").is_none());
        assert!(Log::parse_header("# Curator log").is_none());
        // Bracket content that is not a timestamp at all.
        assert!(Log::parse_header("## [garbage] ingest | x").is_none());
        assert!(Log::parse_header("## [2026-05-27 10:00]").is_none()); // no kind
        // A bracketed but non-timestamp date must be rejected (LOG_BAD_TIMESTAMP territory).
        assert!(Log::parse_header("## [2026-13-40 99:99] ingest | x").is_none());
    }
1364
1365    // ── kind round-trip ───────────────────────────────────────────────────────
1366
1367    #[test]
1368    fn kind_as_str_parse_roundtrip_for_all_recognized() {
1369        for k in [
1370            LogKind::Ingest,
1371            LogKind::Create,
1372            LogKind::Update,
1373            LogKind::Delete,
1374            LogKind::Rename,
1375            LogKind::Link,
1376            LogKind::Validate,
1377            LogKind::IndexRebuild,
1378            LogKind::Contradiction,
1379        ] {
1380            assert_eq!(LogKind::parse(k.as_str()), k);
1381            assert!(k.is_recognized());
1382        }
1383    }
1384
1385    // ── append: creation + frontmatter ───────────────────────────────────────
1386
1387    #[test]
1388    fn append_creates_log_with_frontmatter_and_entry() {
1389        let (_d, store) = temp_store();
1390        let e = entry(
1391            2026,
1392            5,
1393            27,
1394            10,
1395            0,
1396            LogKind::Ingest,
1397            Some("sources/emails/x.eml"),
1398            "Email received.",
1399        );
1400        Log::append(&store, &e).unwrap();
1401
1402        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1403        // type: log frontmatter present.
1404        assert!(
1405            content.starts_with("---\ntype: log\n---\n"),
1406            "missing log frontmatter; got:\n{content}"
1407        );
1408        // The entry header is rendered verbatim.
1409        assert!(content.contains("## [2026-05-27 10:00] ingest | sources/emails/x.eml"));
1410        assert!(content.contains("Email received."));
1411        // No archive dir created when nothing rotates.
1412        assert!(!store.root.join("log").exists());
1413    }
1414
1415    // ── append → tail → since round-trip ─────────────────────────────────────
1416
1417    #[test]
1418    fn append_tail_since_roundtrip() {
1419        let (_d, store) = temp_store();
1420        let e1 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "first");
1421        let e2 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "second");
1422        let e3 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "third");
1423        Log::append(&store, &e1).unwrap();
1424        Log::append(&store, &e2).unwrap();
1425        Log::append(&store, &e3).unwrap();
1426
1427        // tail(2) returns the two newest, in chronological order.
1428        let tail = Log::tail(&store, 2).unwrap();
1429        assert_eq!(tail.len(), 2);
1430        assert_eq!(tail[0], e2);
1431        assert_eq!(tail[1], e3);
1432
1433        // tail(n) larger than the log returns everything, chronologically.
1434        let all = Log::tail(&store, 99).unwrap();
1435        assert_eq!(all, vec![e1.clone(), e2.clone(), e3.clone()]);
1436
1437        // since(10:05) returns strictly-newer entries (excludes the 10:05 one).
1438        let since = Log::since(&store, ts(2026, 5, 27, 10, 5)).unwrap();
1439        assert_eq!(since, vec![e3.clone()]);
1440
1441        // since before everything returns all.
1442        let since_all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1443        assert_eq!(since_all, vec![e1, e2, e3]);
1444    }
1445
1446    #[test]
1447    fn tail_zero_is_empty() {
1448        let (_d, store) = temp_store();
1449        Log::append(
1450            &store,
1451            &entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "x"),
1452        )
1453        .unwrap();
1454        assert!(Log::tail(&store, 0).unwrap().is_empty());
1455    }
1456
1457    #[test]
1458    fn tail_and_since_on_missing_log_are_empty() {
1459        let (_d, store) = temp_store();
1460        assert!(Log::tail(&store, 5).unwrap().is_empty());
1461        assert!(Log::since(&store, ts(2000, 1, 1, 0, 0)).unwrap().is_empty());
1462        assert!(Log::last_validate_at(&store).unwrap().is_none());
1463    }
1464
1465    #[test]
1466    fn since_exact_timestamp_is_exclusive() {
1467        let (_d, store) = temp_store();
1468        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "PASS");
1469        Log::append(&store, &e).unwrap();
1470        // Equal timestamp must NOT be included (strictly newer).
1471        assert!(Log::since(&store, ts(2026, 5, 27, 10, 0))
1472            .unwrap()
1473            .is_empty());
1474    }
1475
1476    // ── since: out-of-order on disk (append-only correction / merge=union) ────
1477
1478    /// Write a `log.md` at the store root from `entries` in the EXACT given
1479    /// physical order, with the standard `type: log` frontmatter. Unlike
1480    /// [`Log::append`] (which always lands the newest entry at EOF), this lets a
1481    /// test author the non-monotonic on-disk shape the SPEC permits — a
1482    /// backdated corrective entry below the entry it corrects, or a
1483    /// `merge=union` interleave.
1484    fn write_raw_log(store: &Store, entries: &[LogEntry]) {
1485        let mut content = String::from(LOG_FRONTMATTER);
1486        content.push('\n');
1487        for e in entries {
1488            content.push_str(&e.render());
1489        }
1490        fs::write(store.root.join("log.md"), content).expect("write raw log.md");
1491    }
1492
1493    #[test]
1494    fn since_returns_newer_entries_even_when_disk_order_is_non_monotonic() {
1495        // The demonstrated regression: a curator appended a backdated CORRECTIVE
1496        // entry (10:00) below newer entries (10:10, 10:05), so the physical
1497        // on-disk order is 10:10, 10:05, 10:00 — newest-first, not chronological.
1498        // The append-only SPEC explicitly permits this ("append a corrective
1499        // entry below it"; out-of-order is only LOG_OUT_OF_ORDER, a warning).
1500        let (_d, store) = temp_store();
1501        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "newest");
1502        let e_1005 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "middle");
1503        let e_1000 = entry(
1504            2026,
1505            5,
1506            27,
1507            10,
1508            0,
1509            LogKind::Update,
1510            Some("a"),
1511            "backdated fix",
1512        );
1513        // Physical order on disk: 10:10, 10:05, then the backdated 10:00 LAST.
1514        write_raw_log(&store, &[e_1010, e_1005, e_1000]);
1515
1516        // since 10:02 must return BOTH entries strictly newer than 10:02
1517        // (10:05 and 10:10). The old early-stop hit the physically-last 10:00
1518        // entry (<= 10:02), stopped, and returned EMPTY — silently dropping the
1519        // two newer entries that sit earlier in the file.
1520        let got = Log::since(&store, ts(2026, 5, 27, 10, 2)).unwrap();
1521        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1522        assert_eq!(
1523            stamps,
1524            [ts(2026, 5, 27, 10, 5), ts(2026, 5, 27, 10, 10)]
1525                .into_iter()
1526                .collect(),
1527            "since(10:02) must include both 10:05 and 10:10 despite the backdated \
1528             10:00 entry sitting physically last, and exclude 10:00; got {got:?}"
1529        );
1530
1531        // A cutoff before everything still returns all three, regardless of the
1532        // scrambled disk order.
1533        let all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1534        let all_stamps: std::collections::BTreeSet<_> = all.iter().map(|e| e.timestamp).collect();
1535        assert_eq!(
1536            all_stamps,
1537            [
1538                ts(2026, 5, 27, 10, 0),
1539                ts(2026, 5, 27, 10, 5),
1540                ts(2026, 5, 27, 10, 10),
1541            ]
1542            .into_iter()
1543            .collect()
1544        );
1545    }
1546
1547    #[test]
1548    fn since_crosses_archive_when_newer_entry_is_out_of_order_inside_it() {
1549        // Out-of-order INSIDE an archive month, with the cutoff landing in that
1550        // month. The April archive is authored newest-physical-first (04-20,
1551        // then a backdated 04-05 last); a naive early-stop on the first
1552        // older-than-cutoff entry would miss the later April entry. The active
1553        // file holds a clean May entry. Cutoff = mid-April.
1554        let (_d, store) = temp_store();
1555
1556        // Active file: one current-month (May) entry.
1557        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1558        write_raw_log(&store, &[may]);
1559
1560        // April archive authored out of order: 04-20 first, backdated 04-05 last.
1561        let apr_late = entry(
1562            2026,
1563            4,
1564            20,
1565            9,
1566            0,
1567            LogKind::Create,
1568            Some("apr-b"),
1569            "apr-late",
1570        );
1571        let apr_early = entry(
1572            2026,
1573            4,
1574            5,
1575            9,
1576            0,
1577            LogKind::Ingest,
1578            Some("apr-a"),
1579            "apr-early",
1580        );
1581        let dir = store.root.join("log");
1582        fs::create_dir_all(&dir).unwrap();
1583        let mut arch = String::from(LOG_FRONTMATTER);
1584        arch.push('\n');
1585        arch.push_str(&apr_late.render());
1586        arch.push_str(&apr_early.render());
1587        fs::write(dir.join("2026-04.md"), arch).unwrap();
1588
1589        // since mid-April: the later April entry (04-20) AND the May entry must
1590        // come back; the early April entry (04-05) must not.
1591        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1592        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1593        assert_eq!(
1594            stamps,
1595            [ts(2026, 4, 20, 9, 0), ts(2026, 5, 2, 8, 0)]
1596                .into_iter()
1597                .collect(),
1598            "since(mid-April) must include the out-of-order later April entry \
1599             and the May entry, and exclude the earlier April entry; got {got:?}"
1600        );
1601    }
1602
1603    // ── multi-line notes ──────────────────────────────────────────────────────
1604
1605    #[test]
1606    fn multiline_note_is_preserved() {
1607        let (_d, store) = temp_store();
1608        let e = entry(
1609            2026,
1610            5,
1611            27,
1612            10,
1613            0,
1614            LogKind::Create,
1615            Some("records/x"),
1616            "Line one.\nLine two.\nLine three.",
1617        );
1618        Log::append(&store, &e).unwrap();
1619        let got = Log::tail(&store, 1).unwrap();
1620        assert_eq!(got[0].note, "Line one.\nLine two.\nLine three.");
1621    }
1622
1623    #[test]
1624    fn empty_note_roundtrips_as_empty() {
1625        let (_d, store) = temp_store();
1626        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "");
1627        Log::append(&store, &e).unwrap();
1628        let got = Log::tail(&store, 1).unwrap();
1629        assert_eq!(got[0], e);
1630        assert_eq!(got[0].note, "");
1631    }
1632
1633    // ── last_validate_at ─────────────────────────────────────────────────────
1634
1635    #[test]
1636    fn last_validate_at_finds_most_recent_validate() {
1637        let (_d, store) = temp_store();
1638        Log::append(
1639            &store,
1640            &entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "first pass"),
1641        )
1642        .unwrap();
1643        Log::append(
1644            &store,
1645            &entry(2026, 5, 27, 10, 5, LogKind::Create, Some("a"), "made a"),
1646        )
1647        .unwrap();
1648        Log::append(
1649            &store,
1650            &entry(2026, 5, 27, 10, 10, LogKind::Validate, None, "second pass"),
1651        )
1652        .unwrap();
1653        Log::append(
1654            &store,
1655            &entry(2026, 5, 27, 10, 15, LogKind::Update, Some("a"), "edit a"),
1656        )
1657        .unwrap();
1658
1659        let last = Log::last_validate_at(&store).unwrap();
1660        assert_eq!(last, Some(ts(2026, 5, 27, 10, 10)));
1661    }
1662
1663    #[test]
1664    fn last_validate_at_none_when_no_validate() {
1665        let (_d, store) = temp_store();
1666        Log::append(
1667            &store,
1668            &entry(2026, 5, 27, 10, 0, LogKind::Create, Some("a"), "x"),
1669        )
1670        .unwrap();
1671        assert_eq!(Log::last_validate_at(&store).unwrap(), None);
1672    }
1673
1674    // ── month-boundary rotation ──────────────────────────────────────────────
1675
1676    #[test]
1677    fn rotation_rolls_prior_months_into_archives() {
1678        let (_d, store) = temp_store();
1679        // Two April entries and one May entry, all written while "current" was
1680        // their own month (append-only chronological order).
1681        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1682        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1683        Log::append(&store, &a1).unwrap();
1684        Log::append(&store, &a2).unwrap();
1685
1686        // Before rotation: no archive dir, both April entries in active.
1687        assert!(!store.root.join("log").exists());
1688
1689        // Appending a May entry must roll April into log/2026-04.md.
1690        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
1691        Log::append(&store, &m1).unwrap();
1692
1693        // Archive exists and holds both April entries with frontmatter.
1694        let arch_path = store.root.join("log").join("2026-04.md");
1695        assert!(arch_path.exists(), "expected April archive to be created");
1696        let arch = fs::read_to_string(&arch_path).unwrap();
1697        assert!(arch.starts_with("---\ntype: log\n---\n"));
1698        assert!(arch.contains("## [2026-04-10 09:00] ingest | apr-a"));
1699        assert!(arch.contains("## [2026-04-20 09:00] create | apr-b"));
1700        assert!(arch.contains("apr one"));
1701        assert!(arch.contains("apr two"));
1702
1703        // Active file now holds ONLY the May entry (no April entries).
1704        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1705        assert!(active.contains("## [2026-05-02 08:00] update | may-a"));
1706        assert!(
1707            !active.contains("apr-a") && !active.contains("apr-b"),
1708            "April entries must be gone from the active file; got:\n{active}"
1709        );
1710
1711        // The full timeline (archives ++ active) is intact and chronological.
1712        let all = Log::tail(&store, 99).unwrap();
1713        assert_eq!(all, vec![a1, a2, m1]);
1714    }
1715
1716    #[test]
1717    fn rotation_groups_distinct_prior_months_into_separate_archives() {
1718        let (_d, store) = temp_store();
1719        // March + April entries accumulate, then a May append rolls BOTH prior
1720        // months into their own archive files.
1721        let mar = entry(2026, 3, 5, 9, 0, LogKind::Ingest, Some("mar"), "march");
1722        let apr = entry(2026, 4, 5, 9, 0, LogKind::Create, Some("apr"), "april");
1723        Log::append(&store, &mar).unwrap();
1724        Log::append(&store, &apr).unwrap();
1725        // At this point April is current, March already rolled into its archive.
1726        assert!(store.root.join("log").join("2026-03.md").exists());
1727
1728        let may = entry(2026, 5, 5, 9, 0, LogKind::Update, Some("may"), "may");
1729        Log::append(&store, &may).unwrap();
1730
1731        assert!(store.root.join("log").join("2026-03.md").exists());
1732        assert!(store.root.join("log").join("2026-04.md").exists());
1733
1734        // Each archive holds only its own month.
1735        let mar_arch = fs::read_to_string(store.root.join("log").join("2026-03.md")).unwrap();
1736        let apr_arch = fs::read_to_string(store.root.join("log").join("2026-04.md")).unwrap();
1737        assert!(mar_arch.contains("mar") && !mar_arch.contains("apr"));
1738        assert!(apr_arch.contains("apr") && !apr_arch.contains("mar"));
1739
1740        // Active holds only May.
1741        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1742        assert!(active.contains("may") && !active.contains("mar") && !active.contains("apr"));
1743
1744        // Timeline intact and ordered across both archives + active.
1745        let all = Log::tail(&store, 99).unwrap();
1746        assert_eq!(all, vec![mar, apr, may]);
1747    }
1748
1749    #[test]
1750    fn tail_crosses_into_archive_when_n_spans_month_boundary() {
1751        let (_d, store) = temp_store();
1752        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1753        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1754        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1755        let m2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may2");
1756        for e in [&a1, &a2, &m1, &m2] {
1757            Log::append(&store, e).unwrap();
1758        }
1759        // April is now archived; active holds only May. tail(3) must reach back
1760        // into the archive for the third-newest entry.
1761        let tail3 = Log::tail(&store, 3).unwrap();
1762        assert_eq!(tail3, vec![a2.clone(), m1.clone(), m2.clone()]);
1763
1764        // tail within the active month does NOT need the archive but is still
1765        // correct.
1766        let tail2 = Log::tail(&store, 2).unwrap();
1767        assert_eq!(tail2, vec![m1, m2]);
1768    }
1769
1770    #[test]
1771    fn since_crosses_into_archive_and_early_stops() {
1772        let (_d, store) = temp_store();
1773        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1774        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1775        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1776        for e in [&a1, &a2, &m1] {
1777            Log::append(&store, e).unwrap();
1778        }
1779        // since a mid-April time: must include the later April entry (from the
1780        // archive) and the May entry, but not the earlier April one.
1781        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1782        assert_eq!(got, vec![a2, m1]);
1783    }
1784
1785    #[test]
1786    fn last_validate_at_crosses_into_archive() {
1787        let (_d, store) = temp_store();
1788        // A validate in April, then non-validate work that rolls April away.
1789        Log::append(
1790            &store,
1791            &entry(2026, 4, 10, 9, 0, LogKind::Validate, None, "apr validate"),
1792        )
1793        .unwrap();
1794        Log::append(
1795            &store,
1796            &entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may work"),
1797        )
1798        .unwrap();
1799        // Active has only the May update; the most-recent validate lives in the
1800        // April archive and must still be found.
1801        let last = Log::last_validate_at(&store).unwrap();
1802        assert_eq!(last, Some(ts(2026, 4, 10, 9, 0)));
1803    }
1804
1805    // ── reverse-read correctness on a large (multi-block) log ────────────────
1806
1807    #[test]
1808    fn reverse_read_correct_on_large_single_month_log() {
1809        let (_d, store) = temp_store();
1810        // Append many same-month entries with chunky multi-line notes so the
1811        // file spans well past one REVERSE_BLOCK (8 KiB). Timestamps are
1812        // strictly increasing (a real append-only log is monotonic): each entry
1813        // is 3 minutes after the previous, all within June, so physical order
1814        // equals chronological order and the last-k-physical ARE the k-newest.
1815        let n = 400usize;
1816        let mut expected: Vec<LogEntry> = Vec::new();
1817        for i in 0..n {
1818            let total_min = (i as u32) * 3;
1819            let day = 1 + total_min / (24 * 60);
1820            let hour = (total_min / 60) % 24;
1821            let min = total_min % 60;
1822            // Unique, multi-line note to bulk up the file and detect mis-parses.
1823            let note = format!(
1824                "entry number {i}\nbody line A for {i}\nbody line B for {i} with padding {}",
1825                "x".repeat(40)
1826            );
1827            let e = entry(
1828                2026,
1829                6,
1830                day,
1831                hour,
1832                min,
1833                LogKind::Update,
1834                Some(&format!("records/item-{i:04}")),
1835                &note,
1836            );
1837            Log::append(&store, &e).unwrap();
1838            expected.push(e);
1839        }
1840
1841        // File must actually be multi-block to exercise the backward reader.
1842        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
1843        assert!(
1844            size > (REVERSE_BLOCK as u64) * 2,
1845            "test log not large enough ({size} bytes) to exercise multi-block reverse-read"
1846        );
1847
1848        // tail(5) must equal the 5 newest, exactly.
1849        let tail5 = Log::tail(&store, 5).unwrap();
1850        assert_eq!(tail5, expected[n - 5..].to_vec());
1851
1852        // tail(50) must equal the 50 newest.
1853        let tail50 = Log::tail(&store, 50).unwrap();
1854        assert_eq!(tail50, expected[n - 50..].to_vec());
1855
1856        // tail(all) must reconstruct the whole timeline in order.
1857        let all = Log::tail(&store, n + 10).unwrap();
1858        assert_eq!(all.len(), n);
1859        assert_eq!(all, expected);
1860    }
1861
1862    // ── tail on OUT-OF-ORDER logs (newest-by-timestamp, not last-physical) ────
1863    //
1864    // The append-only contract is non-decreasing time order, but it's only a
1865    // `LOG_OUT_OF_ORDER` warning when violated (corrective entries land below
1866    // the entry they correct; backdated / clock-skewed writes; `merge=union`
1867    // clone merges). `tail N` must return the N newest *by timestamp*, never the
1868    // last N *physical* entries.
1869
1870    /// Write `log.md` verbatim from rendered entries in the given **physical
1871    /// (file) order**, bypassing `Log::append` so the test controls on-disk
1872    /// order exactly (append never reorders within a month, but this is the
1873    /// clearest way to pin a specific physical layout).
1874    fn write_log_physical(store: &Store, entries: &[LogEntry]) {
1875        let mut body = String::new();
1876        for e in entries {
1877            body.push_str(&e.render());
1878        }
1879        let full = compose_active(LOG_FRONTMATTER, &body);
1880        fs::write(store.root.join("log.md"), full).expect("write log.md");
1881    }
1882
1883    #[test]
1884    fn tail_returns_newest_by_timestamp_on_demonstrated_out_of_order_log() {
1885        // The exact case from the review finding: physical order 10:10, 10:05,
1886        // 10:00 (a backdated entry tail). The OLD code returned the last two
1887        // physical entries {10:05, 10:00}; the correct answer is the two newest
1888        // by time {10:05, 10:10}.
1889        let (_d, store) = temp_store();
1890        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "ten-ten");
1891        let e_1005 = entry(
1892            2026,
1893            5,
1894            27,
1895            10,
1896            5,
1897            LogKind::Create,
1898            Some("b"),
1899            "ten-oh-five",
1900        );
1901        let e_1000 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "ten-oh-oh");
1902        // Physical order: newest first, then the two older ones — out of order.
1903        write_log_physical(&store, &[e_1010.clone(), e_1005.clone(), e_1000.clone()]);
1904
1905        let tail2 = Log::tail(&store, 2).unwrap();
1906        assert_eq!(
1907            tail2,
1908            vec![e_1005.clone(), e_1010.clone()],
1909            "tail(2) must be the two NEWEST by timestamp (chronological), \
1910             not the last two physical entries"
1911        );
1912        // The newest entry must be present and the oldest absent.
1913        assert!(tail2.contains(&e_1010), "newest (10:10) must be included");
1914        assert!(!tail2.contains(&e_1000), "oldest (10:00) must be excluded");
1915
1916        // tail(1) is just the single newest.
1917        assert_eq!(Log::tail(&store, 1).unwrap(), vec![e_1010.clone()]);
1918        // tail(all) is the full set in chronological order.
1919        assert_eq!(Log::tail(&store, 99).unwrap(), vec![e_1000, e_1005, e_1010]);
1920    }
1921
1922    #[test]
1923    fn tail_no_early_stop_when_newer_entry_sits_before_an_older_one() {
1924        // Guards the unsound within-file early stop: a newer entry (10:50) sits
1925        // PHYSICALLY BEFORE a much older one (10:00). Reading newest-physical-
1926        // first, the scan meets 10:00 before 10:50; any "stop at the first entry
1927        // below the window minimum" rule would bail and drop 10:50.
1928        //
1929        // Physical (top→bottom): 10:55, 10:10, 10:50, 10:00.
1930        // Reverse-scan order:     10:00, 10:50, 10:10, 10:55.
1931        let (_d, store) = temp_store();
1932        let e55 = entry(2026, 5, 27, 10, 55, LogKind::Update, Some("x55"), "55");
1933        let e10 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("x10"), "10");
1934        let e50 = entry(2026, 5, 27, 10, 50, LogKind::Update, Some("x50"), "50");
1935        let e00 = entry(2026, 5, 27, 10, 0, LogKind::Update, Some("x00"), "00");
1936        write_log_physical(
1937            &store,
1938            &[e55.clone(), e10.clone(), e50.clone(), e00.clone()],
1939        );
1940
1941        // The two newest by timestamp are 10:55 and 10:50 — NOT the early-stop
1942        // victim 10:10, and NOT the last-physical 10:00.
1943        let tail2 = Log::tail(&store, 2).unwrap();
1944        assert_eq!(tail2, vec![e50.clone(), e55.clone()]);
1945
1946        let tail3 = Log::tail(&store, 3).unwrap();
1947        assert_eq!(tail3, vec![e10.clone(), e50.clone(), e55.clone()]);
1948    }
1949
1950    #[test]
1951    fn tail_orders_equal_timestamps_by_physical_recency() {
1952        // Three entries share 10:00; one is at 09:59. tail(2) must keep both
1953        // 10:00 entries, and among the equal pair the one appended LATER
1954        // (physically last) sorts last ("newest" = most-recently recorded).
1955        let (_d, store) = temp_store();
1956        let early = entry(2026, 5, 27, 9, 59, LogKind::Create, Some("early"), "before");
1957        let tie_a = entry(
1958            2026,
1959            5,
1960            27,
1961            10,
1962            0,
1963            LogKind::Update,
1964            Some("tie-a"),
1965            "first 10:00",
1966        );
1967        let tie_b = entry(
1968            2026,
1969            5,
1970            27,
1971            10,
1972            0,
1973            LogKind::Update,
1974            Some("tie-b"),
1975            "second 10:00",
1976        );
1977        // Physical append order: early, tie_a, tie_b.
1978        write_log_physical(&store, &[early.clone(), tie_a.clone(), tie_b.clone()]);
1979
1980        let tail2 = Log::tail(&store, 2).unwrap();
1981        assert_eq!(
1982            tail2,
1983            vec![tie_a.clone(), tie_b.clone()],
1984            "both 10:00 entries kept, physically-later one (tie_b) last; 09:59 dropped"
1985        );
1986        // tail(1) keeps only the most-recently-recorded of the equal pair.
1987        assert_eq!(Log::tail(&store, 1).unwrap(), vec![tie_b]);
1988    }
1989
1990    #[test]
1991    fn tail_finds_newest_across_a_backdated_entry_spanning_the_month_boundary() {
1992        // A backdated entry can land physically after newer entries even across
1993        // a rotation: append May entries, then a June entry (rolls May to its
1994        // archive), then append a May-dated correction — it goes into the ACTIVE
1995        // file, physically after June. tail must still rank by timestamp, so the
1996        // June entry stays newest and the backdated May entry is not mistaken
1997        // for the tail.
1998        let (_d, store) = temp_store();
1999        let may1 = entry(2026, 5, 10, 9, 0, LogKind::Ingest, Some("may-1"), "may one");
2000        let may2 = entry(2026, 5, 20, 9, 0, LogKind::Create, Some("may-2"), "may two");
2001        let jun1 = entry(2026, 6, 2, 8, 0, LogKind::Update, Some("jun-1"), "jun one");
2002        Log::append(&store, &may1).unwrap();
2003        Log::append(&store, &may2).unwrap();
2004        Log::append(&store, &jun1).unwrap(); // rotates May -> log/2026-05.md
2005        assert!(store.root.join("log").join("2026-05.md").exists());
2006
2007        // A backdated May correction, appended now: it lands in the active file
2008        // (its month May is not strictly before the active month June), so the
2009        // active file is physically [jun1, may_corr] — out of order.
2010        let may_corr = entry(
2011            2026,
2012            5,
2013            25,
2014            9,
2015            0,
2016            LogKind::Update,
2017            Some("may-2"),
2018            "may correction",
2019        );
2020        Log::append(&store, &may_corr).unwrap();
2021        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
2022        assert!(
2023            active.contains("jun-1") && active.contains("may correction"),
2024            "backdated May entry should be in the active file alongside June; got:\n{active}"
2025        );
2026
2027        // The single newest by timestamp is the June entry, even though the
2028        // backdated May entry is physically last.
2029        assert_eq!(Log::tail(&store, 1).unwrap(), vec![jun1.clone()]);
2030
2031        // tail(2): the two newest by time are may_corr (05-25) and jun1 (06-02).
2032        let tail2 = Log::tail(&store, 2).unwrap();
2033        assert_eq!(tail2, vec![may_corr.clone(), jun1.clone()]);
2034
2035        // tail(3) must reach into the May archive for the third-newest (may2,
2036        // 05-20), proving archive crossing still works on an out-of-order store.
2037        let tail3 = Log::tail(&store, 3).unwrap();
2038        assert_eq!(tail3, vec![may2.clone(), may_corr.clone(), jun1.clone()]);
2039
2040        // tail(all) reconstructs the whole timeline in chronological order.
2041        let all = Log::tail(&store, 99).unwrap();
2042        assert_eq!(all, vec![may1, may2, may_corr, jun1]);
2043    }
2044
2045    #[test]
2046    fn parse_entries_skips_unparseable_header_folding_into_body() {
2047        // A `## [` line that is NOT a valid header should not start a new entry;
2048        // it folds into the preceding entry's note. This guards the
2049        // parse_entries header-validation branch.
2050        let text = "\
2051## [2026-05-27 10:00] create | records/x
2052Body mentions a literal: ## [not a real header here]
2053More body.
2054
2055## [2026-05-27 10:05] update | records/y
2056Second.
2057";
2058        let entries = parse_entries(text);
2059        assert_eq!(entries.len(), 2);
2060        assert_eq!(entries[0].kind, LogKind::Create);
2061        assert!(entries[0].note.contains("## [not a real header here]"));
2062        assert!(entries[0].note.contains("More body."));
2063        assert_eq!(entries[1].kind, LogKind::Update);
2064        assert_eq!(entries[1].note, "Second.");
2065    }
2066
2067    // ── append-only: corrective entries go on the end ─────────────────────────
2068
2069    #[test]
2070    fn append_only_corrective_entry_goes_on_end_without_rewriting() {
2071        let (_d, store) = temp_store();
2072        let original = entry(
2073            2026,
2074            5,
2075            27,
2076            10,
2077            0,
2078            LogKind::Update,
2079            Some("records/northstar"),
2080            "Seat count 120 -> 175.",
2081        );
2082        Log::append(&store, &original).unwrap();
2083        let after_first = fs::read_to_string(store.root.join("log.md")).unwrap();
2084
2085        // A correction is a NEW entry appended on the end; the original text is
2086        // left byte-for-byte intact (append-only contract: no rewrite API).
2087        let correction = entry(
2088            2026,
2089            5,
2090            27,
2091            11,
2092            0,
2093            LogKind::Update,
2094            Some("records/northstar"),
2095            "Correction: seat count is 165, not 175.",
2096        );
2097        Log::append(&store, &correction).unwrap();
2098        let after_second = fs::read_to_string(store.root.join("log.md")).unwrap();
2099
2100        assert!(
2101            after_second.starts_with(&after_first),
2102            "appending must not rewrite earlier bytes"
2103        );
2104        assert!(after_second.contains("Correction: seat count is 165, not 175."));
2105
2106        // Both entries are readable, in order.
2107        let all = Log::tail(&store, 99).unwrap();
2108        assert_eq!(all, vec![original, correction]);
2109    }
2110
2111    // ── concurrent append safety (atomic via temp-file rename) ────────────────
2112
2113    #[test]
2114    fn concurrent_appends_are_atomic_and_total() {
2115        use std::sync::{Arc, Barrier};
2116        use std::thread;
2117
2118        let (_d, store) = temp_store();
2119        // Seed the file so all threads take the read-modify-write path.
2120        Log::append(
2121            &store,
2122            &entry(2026, 7, 1, 0, 0, LogKind::Create, Some("seed"), "seed"),
2123        )
2124        .unwrap();
2125
2126        let threads = 8usize;
2127        let per = 25usize;
2128        let barrier = Arc::new(Barrier::new(threads));
2129        let store = Arc::new(store);
2130
2131        let mut handles = Vec::new();
2132        for tnum in 0..threads {
2133            let b = Arc::clone(&barrier);
2134            let s = Arc::clone(&store);
2135            handles.push(thread::spawn(move || {
2136                b.wait();
2137                for i in 0..per {
2138                    let e = entry(
2139                        2026,
2140                        7,
2141                        1,
2142                        (tnum % 24) as u32,
2143                        (i % 60) as u32,
2144                        LogKind::Update,
2145                        Some(&format!("t{tnum}-i{i}")),
2146                        &format!("thread {tnum} item {i}"),
2147                    );
2148                    Log::append(&s, &e).unwrap();
2149                }
2150            }));
2151        }
2152        for h in handles {
2153            h.join().unwrap();
2154        }
2155
2156        // The atomic temp-file-rename write means no append truncates or
2157        // corrupts another: the file must remain parseable and every line of
2158        // every entry header must be well-formed. Crucially, no entry should be
2159        // lost to a torn write of the *content already on disk* — though
2160        // interleaved read-modify-write WILL drop some appends (last-writer-
2161        // wins on the snapshot). We therefore assert integrity + that the file
2162        // never went empty / corrupt, not an exact count.
2163        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
2164        assert!(content.starts_with("---\ntype: log\n---\n"));
2165
2166        // Every `## [` line must parse as a valid header (no half-written line).
2167        for line in content.lines() {
2168            if line.starts_with("## [") {
2169                assert!(
2170                    Log::parse_header(line).is_some(),
2171                    "corrupt/torn header line on disk: {line:?}"
2172                );
2173            }
2174        }
2175
2176        // The seed entry must survive (it was written before the race and
2177        // every snapshot included it).
2178        assert!(content.contains("## [2026-07-01 00:00] create | seed"));
2179
2180        // The reverse reader must still produce a clean, fully-parseable view.
2181        let all = Log::tail(&store, 10_000).unwrap();
2182        assert!(!all.is_empty());
2183        // No duplicate adjacent identical headers from a torn write: every
2184        // returned entry must have a recognized-or-custom kind and a parseable
2185        // timestamp (already guaranteed by parse), and the list must be
2186        // internally consistent (re-render → re-parse identity for each).
2187        for e in &all {
2188            let rendered = e.render();
2189            let reparsed = parse_single_entry(&rendered).unwrap();
2190            assert_eq!(&reparsed, e);
2191        }
2192    }
2193
2194    // ── render/parse identity ────────────────────────────────────────────────
2195
2196    #[test]
2197    fn render_then_parse_is_identity() {
2198        let cases = vec![
2199            entry(
2200                2026,
2201                1,
2202                2,
2203                3,
2204                4,
2205                LogKind::Ingest,
2206                Some("sources/a.eml"),
2207                "n",
2208            ),
2209            entry(
2210                2026,
2211                12,
2212                31,
2213                23,
2214                59,
2215                LogKind::Validate,
2216                None,
2217                "PASS - 0 errors",
2218            ),
2219            entry(
2220                2026,
2221                6,
2222                15,
2223                12,
2224                30,
2225                LogKind::Custom("proposal".to_string()),
2226                Some("records/p"),
2227                "multi\nline\nnote",
2228            ),
2229            entry(2026, 6, 15, 12, 30, LogKind::Contradiction, Some("obj"), ""),
2230        ];
2231        for e in cases {
2232            let rendered = e.render();
2233            let parsed = parse_single_entry(&rendered).unwrap_or_else(|| {
2234                panic!("failed to reparse rendered entry:\n{rendered}");
2235            });
2236            assert_eq!(parsed, e, "round-trip mismatch for {e:?}");
2237        }
2238    }
2239
2240    // ── regression: rotation re-roll must not duplicate archive entries (#3) ──
2241
2242    /// Count occurrences of `needle` in `haystack` (non-overlapping).
2243    fn count_occurrences(haystack: &str, needle: &str) -> usize {
2244        haystack.matches(needle).count()
2245    }
2246
2247    #[test]
2248    fn regression_archive_reroll_is_idempotent_after_interrupted_rotation() {
2249        // Reconstructs the finding's exact failure window: rotation is two
2250        // non-atomic durable writes — (1) roll prior-month entries into the
2251        // archive, then (2) trim the active file. If the process crashes or the
2252        // active rewrite errors AFTER step (1) commits, the prior-month entries
2253        // stay in the untrimmed active file, the agent retries, and the retry
2254        // re-rolls the SAME entries into the archive a second time. The
2255        // mechanism is precisely a second `append_to_archive` of identical
2256        // entries onto an archive that already holds them.
2257        let (_d, store) = temp_store();
2258        let dir = archive_dir(&store);
2259        let arch = archive_path(&store, 2026, 4);
2260
2261        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2262        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2263        let month = [apr1.clone(), apr2.clone()];
2264
2265        // First roll (the committed step-(1) write before the crash).
2266        fs::create_dir_all(&dir).unwrap();
2267        append_to_archive(&arch, &month).unwrap();
2268
2269        // The retry re-rolls the identical prior-month entries. Pre-fix this
2270        // blindly concatenated, doubling every entry; do it twice to prove the
2271        // amplification a real retry loop would cause is fully suppressed.
2272        append_to_archive(&arch, &month).unwrap();
2273        append_to_archive(&arch, &month).unwrap();
2274
2275        let archived = fs::read_to_string(&arch).unwrap();
2276        // Each entry header must appear EXACTLY once despite the re-rolls.
2277        assert_eq!(
2278            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2279            1,
2280            "re-rolled archive duplicated the first April entry; got:\n{archived}"
2281        );
2282        assert_eq!(
2283            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2284            1,
2285            "re-rolled archive duplicated the second April entry; got:\n{archived}"
2286        );
2287
2288        // And the reader surface (`since`) must return each entry once, not the
2289        // duplicated set the pre-fix archive would have yielded.
2290        let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2291        assert_eq!(
2292            got,
2293            vec![apr1, apr2],
2294            "since over the re-rolled archive must return each April entry once"
2295        );
2296    }
2297
2298    #[test]
2299    fn regression_rotation_reroll_after_active_untrimmed_does_not_duplicate() {
2300        // End-to-end variant driving the real `Log::append` rotation path. We
2301        // rotate April into its archive via a May append, then SIMULATE the
2302        // partial failure by restoring the pre-trim active file (April + May)
2303        // and re-running `append` — exactly the state a crash-between-the-two-
2304        // writes / failed-active-rewrite + agent-retry produces. The archive
2305        // must still hold each April entry once.
2306        let (_d, store) = temp_store();
2307        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2308        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2309        Log::append(&store, &apr1).unwrap();
2310        Log::append(&store, &apr2).unwrap();
2311
2312        // Snapshot the active file holding both April entries (this is what is
2313        // still on disk if the post-rotation active rewrite never lands).
2314        let active_path = active_log_path(&store);
2315        let pre_rotation_active = fs::read_to_string(&active_path).unwrap();
2316
2317        // A May append rotates April out and trims the active file.
2318        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
2319        Log::append(&store, &may).unwrap();
2320        let arch = archive_path(&store, 2026, 4);
2321        assert!(arch.exists(), "April should have rotated to its archive");
2322
2323        // Simulate the crash/error: the active rewrite never persisted, so the
2324        // active file still contains the (now also archived) April entries.
2325        fs::write(&active_path, &pre_rotation_active).unwrap();
2326
2327        // The agent retries the append. Re-partitioning sees April as prior
2328        // months again and re-rolls them — which must NOT duplicate the archive.
2329        let may2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may two");
2330        Log::append(&store, &may2).unwrap();
2331
2332        let archived = fs::read_to_string(&arch).unwrap();
2333        assert_eq!(
2334            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2335            1,
2336            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2337        );
2338        assert_eq!(
2339            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2340            1,
2341            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2342        );
2343    }
2344
2345    // ── regression: reverse reader keeps a `## [` continuation note line (#10) ─
2346
2347    #[test]
2348    fn regression_reverse_reader_preserves_note_line_starting_with_bracket_header() {
2349        // SPEC permits a note of "one or more lines" with no restriction on a
2350        // continuation line starting at column 0 with `## [`. The forward parser
2351        // folds such an unparseable `## [` line into the note; the reverse
2352        // reader (tail/since/last_validate_at) must agree, not split on it.
2353        let (_d, store) = temp_store();
2354        let multi = "First line.\n## [draft outline] more\nThird line.";
2355        let e = entry(
2356            2026,
2357            5,
2358            27,
2359            10,
2360            0,
2361            LogKind::Update,
2362            Some("records/x"),
2363            multi,
2364        );
2365        // Author the log verbatim (render writes the note as-is); this is the
2366        // on-disk shape a hand-written / appended multi-line note produces.
2367        write_raw_log(&store, std::slice::from_ref(&e));
2368
2369        // Pre-fix: header_offsets treated `## [draft outline] more` as a second
2370        // entry boundary, truncating the note to "First line." and dropping the
2371        // carved (non-header) fragment. Post-fix: the full note survives.
2372        let got = Log::tail(&store, 1).unwrap();
2373        assert_eq!(got.len(), 1, "the single entry must be returned");
2374        assert_eq!(
2375            got[0].note, multi,
2376            "reverse reader truncated the note at the `## [` continuation line; \
2377             got {:?}",
2378            got[0].note
2379        );
2380        assert_eq!(got[0], e, "the whole entry must round-trip through tail");
2381
2382        // `since` (the other reverse-reading surface) must agree.
2383        let since = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
2384        assert_eq!(since, vec![e]);
2385    }
2386
2387    // ── regression: `since` archive pruning uses the UTC month, not local (#11) ─
2388
2389    /// A `DateTime<FixedOffset>` at the given fixed offset (hours east of UTC).
2390    fn ts_offset(
2391        y: i32,
2392        mo: u32,
2393        d: u32,
2394        h: u32,
2395        mi: u32,
2396        offset_hours: i32,
2397    ) -> DateTime<FixedOffset> {
2398        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
2399            .unwrap()
2400            .and_hms_opt(h, mi, 0)
2401            .unwrap();
2402        FixedOffset::east_opt(offset_hours * 3600)
2403            .unwrap()
2404            .from_local_datetime(&naive)
2405            .single()
2406            .unwrap()
2407    }
2408
2409    #[test]
2410    fn regression_since_prunes_archives_on_utc_month_not_local_offset_month() {
2411        // Archive months are bucketed on the UTC calendar. A `since` cutoff with
2412        // a non-UTC offset near a month boundary must not prune an archive whose
2413        // UTC month equals the cutoff's UTC month just because the cutoff's
2414        // LOCAL month is later.
2415        let (_d, store) = temp_store();
2416
2417        // April archive: an entry late on 2026-04-30 at 18:00 UTC.
2418        let apr = entry(
2419            2026,
2420            4,
2421            30,
2422            18,
2423            0,
2424            LogKind::Update,
2425            Some("apr-late"),
2426            "april late",
2427        );
2428        let dir = archive_dir(&store);
2429        fs::create_dir_all(&dir).unwrap();
2430        let mut arch = String::from(LOG_FRONTMATTER);
2431        arch.push('\n');
2432        arch.push_str(&apr.render());
2433        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
2434
2435        // Active file: a clean May entry, so an archive scan is actually needed.
2436        let may = entry(2026, 5, 5, 8, 0, LogKind::Update, Some("may-a"), "may one");
2437        write_raw_log(&store, std::slice::from_ref(&may));
2438
2439        // Cutoff 2026-05-01T00:30:00+07:00 == 2026-04-30T17:30:00Z. The April
2440        // 18:00 UTC entry is strictly newer than this instant.
2441        let cutoff = ts_offset(2026, 5, 1, 0, 30, 7);
2442        // Sanity: the cutoff's UTC month is April, its local month is May.
2443        assert_eq!((cutoff.year(), cutoff.month()), (2026, 5));
2444        assert_eq!(
2445            (
2446                cutoff.with_timezone(&Utc).year(),
2447                cutoff.with_timezone(&Utc).month()
2448            ),
2449            (2026, 4)
2450        );
2451
2452        // Pre-fix: cutoff_ym = (2026, 5) from local fields, so the (2026, 4)
2453        // archive was pruned and the genuinely-newer 18:00 UTC entry was dropped
2454        // — `since` returned only the May entry. Post-fix: cutoff_ym is UTC
2455        // (2026, 4), the April archive is scanned, and both come back.
2456        let got = Log::since(&store, cutoff).unwrap();
2457        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
2458        assert_eq!(
2459            stamps,
2460            [ts(2026, 4, 30, 18, 0), ts(2026, 5, 5, 8, 0)]
2461                .into_iter()
2462                .collect(),
2463            "since(non-UTC cutoff near a month boundary) must include the April \
2464             archive entry newer than the cutoff instant; got {got:?}"
2465        );
2466    }
2467
2468    // ── regression: header-shaped note line corrupts the append-only log (#critical)
2469
2470    #[test]
2471    fn note_line_shaped_like_a_header_is_escaped_and_round_trips() {
2472        // A `contradiction` note quoting an earlier entry header is the
2473        // demonstrated corruption: the verbatim `## [2020-01-01 00:00] delete |
2474        // …` line was parsed as a REAL entry on readback (fabricated entry, real
2475        // note truncated). With write-path escaping it stays note body.
2476        let (_d, store) = temp_store();
2477        let note = "quoting earlier entry:\n## [2020-01-01 00:00] delete | records/contacts/jane.md\nend of quote";
2478        let e = entry(
2479            2026,
2480            6,
2481            11,
2482            4,
2483            41,
2484            LogKind::Contradiction,
2485            Some("records/contacts/jane.md"),
2486            note,
2487        );
2488        Log::append(&store, &e).unwrap();
2489
2490        // On disk: the header-shaped note line must NOT sit at column 0 as a
2491        // `## [` header — `grep "^## \["` must see exactly the one real header.
2492        let raw = fs::read_to_string(store.root.join("log.md")).unwrap();
2493        let header_lines = raw.lines().filter(|l| l.starts_with("## [")).count();
2494        assert_eq!(
2495            header_lines, 1,
2496            "exactly one real entry header may sit at column 0; got:\n{raw}"
2497        );
2498
2499        // Readback returns ONE entry, with the full note intact (no fabricated
2500        // 2020 entry, no truncation).
2501        let got = Log::tail(&store, 10).unwrap();
2502        assert_eq!(got.len(), 1, "exactly one entry; got {got:?}");
2503        assert_eq!(got[0].note, note, "note must round-trip verbatim");
2504        assert_eq!(got[0], e);
2505        let since = Log::since(&store, ts(2026, 1, 1, 0, 0)).unwrap();
2506        assert_eq!(since, vec![e.clone()]);
2507    }
2508
2509    #[test]
2510    fn header_shaped_note_survives_a_later_rotation_uncorrupted() {
2511        // Physical corruption: pre-fix, the fabricated past-dated pseudo-entry
2512        // (year 2020 < current) was rolled into an archive on the NEXT append,
2513        // splitting the real note. With escaping the line is note text, so a
2514        // later append never sees a phantom prior-month entry to roll out.
2515        let (_d, store) = temp_store();
2516        let note = "see\n## [2020-01-01 00:00] delete | records/x.md\nbelow";
2517        let first = entry(
2518            2026,
2519            6,
2520            11,
2521            4,
2522            41,
2523            LogKind::Contradiction,
2524            Some("records/x.md"),
2525            note,
2526        );
2527        Log::append(&store, &first).unwrap();
2528
2529        // Append another current-month entry — the path that re-parses + may
2530        // rotate. No 2020 archive must be created and the first note stays whole.
2531        let second = entry(
2532            2026,
2533            6,
2534            11,
2535            5,
2536            0,
2537            LogKind::Update,
2538            Some("records/y.md"),
2539            "y",
2540        );
2541        Log::append(&store, &second).unwrap();
2542
2543        assert!(
2544            !store.root.join("log").join("2020-01.md").exists(),
2545            "a header-shaped note line must not fabricate a 2020 archive"
2546        );
2547        let got = Log::tail(&store, 10).unwrap();
2548        assert_eq!(got.len(), 2, "two real entries only; got {got:?}");
2549        let first_back = got
2550            .iter()
2551            .find(|e| e.object.as_deref() == Some("records/x.md"));
2552        assert_eq!(
2553            first_back.map(|e| e.note.as_str()),
2554            Some(note),
2555            "the header-shaped note must survive the rotation pass intact"
2556        );
2557    }
2558
2559    #[test]
2560    fn escape_unescape_note_line_round_trips_including_literal_backslash() {
2561        // The escape must be lossless for arbitrary note lines, including a line
2562        // the author genuinely wrote starting with `\` before a header shape.
2563        let valid_header = "## [2020-01-01 00:00] delete | x";
2564        // A real header shape: escaped on write, restored on read.
2565        assert_eq!(
2566            &*escape_note_line(valid_header),
2567            &format!("\\{valid_header}")
2568        );
2569        let escaped = escape_note_line(valid_header).into_owned();
2570        assert_eq!(&*unescape_note_line(&escaped), valid_header);
2571        // An already-`\`-prefixed header-shape line escapes to two backslashes
2572        // and restores to one (never collapses to a bare header).
2573        let pre = format!("\\{valid_header}");
2574        assert_eq!(&*escape_note_line(&pre), &format!("\\{pre}"));
2575        let pre_escaped = escape_note_line(&pre).into_owned();
2576        assert_eq!(&*unescape_note_line(&pre_escaped), &pre);
2577        // Ordinary text (including a `\` that does NOT lead into a header) is
2578        // untouched both ways.
2579        for plain in ["plain note", "## [not a header]", "\\not a header", ""] {
2580            assert_eq!(&*escape_note_line(plain), plain);
2581            assert_eq!(&*unescape_note_line(plain), plain);
2582        }
2583    }
2584
2585    // ── regression: reverse reader scans each block once (no O(file²)) (#perf) ──
2586
2587    #[test]
2588    fn reverse_read_correct_with_header_straddling_a_block_boundary() {
2589        // The incremental per-block header scan must still catch a `## [` marker
2590        // whose `#` falls in one block but whose bytes extend into the already-
2591        // scanned region. Build a log whose total size crosses several blocks and
2592        // verify a full read reconstructs every entry — the straddle case is hit
2593        // by construction across the many block boundaries.
2594        let (_d, store) = temp_store();
2595        let n = 600usize;
2596        let mut expected: Vec<LogEntry> = Vec::new();
2597        for i in 0..n {
2598            let total_min = (i as u32) * 2;
2599            let day = 1 + total_min / (24 * 60);
2600            let hour = (total_min / 60) % 24;
2601            let min = total_min % 60;
2602            // Vary note length so headers land at many offsets relative to the
2603            // fixed 8 KiB block grid, exercising boundary straddles.
2604            let note = format!("note {i} {}", "y".repeat(i % 97));
2605            let e = entry(
2606                2026,
2607                6,
2608                day,
2609                hour,
2610                min,
2611                LogKind::Update,
2612                Some(&format!("records/item-{i:05}")),
2613                &note,
2614            );
2615            Log::append(&store, &e).unwrap();
2616            expected.push(e);
2617        }
2618        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
2619        assert!(
2620            size > (REVERSE_BLOCK as u64) * 3,
2621            "test log not large enough ({size} bytes) to cross several blocks"
2622        );
2623        let all = Log::tail(&store, n + 10).unwrap();
2624        assert_eq!(all, expected, "every entry must reconstruct across blocks");
2625        // A small tail must also be exact (the n-newest by timestamp).
2626        assert_eq!(Log::tail(&store, 7).unwrap(), expected[n - 7..].to_vec());
2627    }
2628
2629    #[test]
2630    fn header_offsets_range_finds_boundary_straddling_marker_once() {
2631        // Two headers; `header_offsets` (whole-buffer) finds both. The range
2632        // scan with a window that splits the buffer between them must report the
2633        // one in its window exactly once, consulting the left neighbour for the
2634        // line-start check.
2635        let buf =
2636            b"## [2026-06-01 00:00] update | a\nnote a\n## [2026-06-01 00:01] update | b\nnote b\n";
2637        let full = header_offsets(buf, 0);
2638        assert_eq!(full.len(), 2, "both headers found over the whole buffer");
2639        let second = full[1] as usize;
2640        // A window covering only the SECOND header's `#` reports just it. Its `#`
2641        // is not at index 0, so `base_is_file_start` is irrelevant here.
2642        let only_second = header_offsets_range(buf, 0, second, second + 1, false);
2643        assert_eq!(only_second, vec![full[1]]);
2644        // A window covering only the FIRST reports just it (right content read
2645        // past the window into the buffer). `base == 0` is the true file start,
2646        // so the index-0 candidate is a real line start.
2647        let only_first = header_offsets_range(buf, 0, 0, 1, true);
2648        assert_eq!(only_first, vec![full[0]]);
2649        // Disjoint windows partition the markers with no double-count.
2650        let mut combined = header_offsets_range(buf, 0, 0, second, true);
2651        combined.extend(header_offsets_range(buf, 0, second, buf.len(), false));
2652        assert_eq!(combined, full);
2653    }
2654
2655    /// CRITICAL regression: a MID-LINE `## [<valid header>]` fragment inside a
2656    /// real entry's note that happens to align with a reverse-read block boundary
2657    /// must NOT be fabricated into an entry. The incremental backward scan reads
2658    /// each block's left edge before its left neighbour is buffered; treating
2659    /// buffer index 0 as a line start there would carve a phantom entry from the
2660    /// fragment and truncate the real entry's note. The fix defers the left-edge
2661    /// candidate until its neighbour is read, so the fragment is correctly seen
2662    /// as note body (its `#` is not at a line start).
2663    #[test]
2664    fn reverse_read_does_not_fabricate_entry_from_midline_header_at_block_boundary() {
2665        let (_d, store) = temp_store();
2666
2667        // A single real entry. Its note carries a mid-line `## [` fragment that
2668        // is a *valid* header shape but is NOT at column 0 (so the writer's
2669        // column-0 escape correctly leaves it verbatim — it is the trigger).
2670        let fragment = "see ## [2020-01-01 00:00] delete | records/x.md";
2671        let hash_in_fragment = fragment.find("##").expect("fragment has `##`");
2672
2673        // Build the raw active log by hand so the fragment's `#` lands at the
2674        // FIRST backward block's left edge: the reverse reader anchors its blocks
2675        // at EOF (`new_start = len - REVERSE_BLOCK` on the first block), so the
2676        // `#` must sit exactly `REVERSE_BLOCK` bytes before EOF. We append note
2677        // padding AFTER the fragment to push EOF out to that distance.
2678        //
2679        // Layout (one entry):
2680        //   <frontmatter>\n## [<header>] | records/real.md\nlead\n<fragment><tail>\n\n
2681        let header_line = "## [2026-06-14 10:00] update | records/real.md\n";
2682        let mut head = String::from(LOG_FRONTMATTER);
2683        head.push('\n');
2684        head.push_str(header_line);
2685        head.push_str("lead\n");
2686        head.push_str(fragment); // fragment opens the second note line
2687
2688        // Absolute offset of the fragment's `#`.
2689        let hash_off = head.len() - fragment.len() + hash_in_fragment;
2690        // We append `<tail>\n\n`. Bytes after `#` = (head.len() - hash_off) +
2691        // tail_len + 2. Need that == REVERSE_BLOCK so `#` is at `len -
2692        // REVERSE_BLOCK` (the first block's left edge).
2693        let after_hash_in_head = head.len() - hash_off;
2694        let tail_len = REVERSE_BLOCK
2695            .checked_sub(after_hash_in_head + 2)
2696            .expect("REVERSE_BLOCK comfortably exceeds the post-`#` head bytes");
2697        let mut body = head;
2698        body.push_str(&"z".repeat(tail_len)); // valid note bytes on the fragment line
2699        body.push('\n');
2700        body.push('\n');
2701        fs::write(store.root.join("log.md"), &body).unwrap();
2702
2703        // The file must be large enough to cross at least one block boundary.
2704        assert!(
2705            body.len() as u64 > REVERSE_BLOCK as u64,
2706            "test log must span >1 block (len {})",
2707            body.len()
2708        );
2709        // And the fragment's `#` sits exactly at the first block's left edge.
2710        let real_hash_off = body.find("see ##").unwrap() + hash_in_fragment;
2711        assert_eq!(
2712            real_hash_off,
2713            body.len() - REVERSE_BLOCK,
2714            "fragment `#` must land on the first backward block's left edge to exercise the bug"
2715        );
2716
2717        // Reverse read must return EXACTLY ONE entry — the real one — and never a
2718        // fabricated `2020-01-01 delete records/x.md` carved from the fragment.
2719        let got = Log::tail(&store, 10).unwrap();
2720        assert_eq!(
2721            got.len(),
2722            1,
2723            "exactly the one real entry; got {} (a fabricated entry means the boundary `#` was mis-read as a header): {got:#?}",
2724            got.len()
2725        );
2726        let only = &got[0];
2727        assert_eq!(only.object.as_deref(), Some("records/real.md"));
2728        assert_eq!(only.timestamp, ts(2026, 6, 14, 10, 0));
2729        // The note is intact end-to-end (not truncated at the fragment): both the
2730        // lead and the verbatim fragment survive.
2731        assert!(
2732            only.note.contains("lead"),
2733            "note keeps its lead; got {:?}",
2734            only.note
2735        );
2736        assert!(
2737            only.note.contains(fragment),
2738            "note keeps the verbatim mid-line fragment (not truncated); got {:?}",
2739            only.note
2740        );
2741    }
2742
2743    // ── regression: tail/since dedup across active+archive on interrupted rotation
2744
2745    #[test]
2746    fn tail_and_since_dedup_entries_present_in_both_active_and_archive() {
2747        // Reconstructs the finding's crash window: the archive write committed
2748        // but the active rewrite never trimmed, so the same April entries live in
2749        // BOTH the untrimmed active file and `log/2026-04.md`. Readers must
2750        // return each entry ONCE, not twice.
2751        let (_d, store) = temp_store();
2752        let apr_a = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2753        let apr_b = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2754
2755        // Active file still holds both April entries (the un-trimmed state).
2756        write_raw_log(&store, &[apr_a.clone(), apr_b.clone()]);
2757        // The committed step-1 archive holds the same two entries.
2758        let dir = archive_dir(&store);
2759        fs::create_dir_all(&dir).unwrap();
2760        let mut arch = String::from(LOG_FRONTMATTER);
2761        arch.push('\n');
2762        arch.push_str(&apr_a.render());
2763        arch.push_str(&apr_b.render());
2764        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
2765
2766        // `since` must return each April entry exactly once.
2767        let since = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2768        assert_eq!(
2769            since,
2770            vec![apr_a.clone(), apr_b.clone()],
2771            "since must dedup the doubly-present entries; got {since:?}"
2772        );
2773
2774        // `tail` must too — no duplicate window slots.
2775        let tail = Log::tail(&store, 10).unwrap();
2776        assert_eq!(
2777            tail,
2778            vec![apr_a, apr_b],
2779            "tail must dedup the doubly-present entries; got {tail:?}"
2780        );
2781    }
2782}