Skip to main content

dbmd_core/
log.rs

1//! `log` — the append-only, month-rotating chronological log.
2//!
3//! One logical timeline: the active `log.md` at the store root plus
4//! `log/<YYYY-MM>.md` archives. [`Log::append`] rolls older months into
5//! archives on write so the active file stays current-month. [`Log::tail`] and
6//! [`Log::since`] **reverse-read from EOF**. Both read each file they touch in
7//! full — the on-disk order is not guaranteed monotonic, so neither can
8//! early-stop within a file — and select by timestamp: `tail` keeps the `n`
9//! newest, `since` keeps everything newer than the cutoff. Both cross into
10//! month archives only as far back as the requested window reaches (by the
11//! cutoff's month for `since`, by the current `n`th-newest's month for `tail`)
12//! — never the whole history.
13//!
14//! Append-only contract: there is no rewrite API. Corrective entries go on the
15//! end; out-of-order timestamps are a validate warning (`LOG_OUT_OF_ORDER`),
16//! signalling a probable rewrite.
17
18use std::collections::BTreeMap;
19use std::fs::{self, File};
20use std::io::{Read, Seek, SeekFrom};
21use std::path::{Path, PathBuf};
22
23use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, TimeZone, Utc};
24
25use crate::store::Store;
26
/// The on-disk header timestamp format: `YYYY-MM-DD HH:MM` (minute precision,
/// no timezone). Parsing reattaches UTC; emitting renders the entry's own
/// wall-clock, so a read→write→read round-trip is stable at minute precision.
///
/// Because the format carries no seconds, two entries written within the same
/// minute render identical header timestamps.
const TS_FORMAT: &str = "%Y-%m-%d %H:%M";

/// The frontmatter block written when the active `log.md` is created: a
/// `type: log` YAML frontmatter section, a blank line, and the
/// `# Curator log` heading. [`Log::append`] passes it as the header when it
/// composes a brand-new active file.
const LOG_FRONTMATTER: &str = "---\ntype: log\n---\n\n# Curator log\n";

/// Block size (8 KiB) for the backward (reverse-from-EOF) reader.
const REVERSE_BLOCK: usize = 8 * 1024;
37
/// The kind slot of a `log.md` entry header.
///
/// The format itself accepts any kind token (`dbmd validate` only warns, via
/// `LOG_UNKNOWN_KIND`, when it does not recognize one). Besides the recognized
/// vocabulary there is therefore a [`LogKind::Custom`] variant that carries an
/// unrecognized token verbatim, so it round-trips without loss.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogKind {
    /// A source artifact was ingested.
    Ingest,
    /// A file was created.
    Create,
    /// A file was updated.
    Update,
    /// A file was deleted.
    Delete,
    /// A file was renamed/moved.
    Rename,
    /// A wiki-link was added.
    Link,
    /// A validation pass ran.
    Validate,
    /// The index was rebuilt.
    IndexRebuild,
    /// A contradiction between sources was flagged.
    Contradiction,
    /// Any kind outside the recognized vocabulary, preserved verbatim.
    Custom(String),
}

impl LogKind {
    /// The token written into a log header for this kind.
    ///
    /// Recognized kinds map to their fixed lowercase spelling (`ingest`,
    /// `index-rebuild`, …); a [`LogKind::Custom`] yields the stored token
    /// unchanged.
    pub fn as_str(&self) -> &str {
        match self {
            Self::Custom(token) => token.as_str(),
            Self::Ingest => "ingest",
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Rename => "rename",
            Self::Link => "link",
            Self::Validate => "validate",
            Self::IndexRebuild => "index-rebuild",
            Self::Contradiction => "contradiction",
        }
    }

    /// Map a header token to its kind.
    ///
    /// Matching is exact (case-sensitive, no trimming): anything that is not
    /// one of the canonical spellings is wrapped in [`LogKind::Custom`].
    pub fn parse(token: &str) -> LogKind {
        match token {
            "ingest" => Self::Ingest,
            "create" => Self::Create,
            "update" => Self::Update,
            "delete" => Self::Delete,
            "rename" => Self::Rename,
            "link" => Self::Link,
            "validate" => Self::Validate,
            "index-rebuild" => Self::IndexRebuild,
            "contradiction" => Self::Contradiction,
            _ => Self::Custom(token.to_owned()),
        }
    }

    /// `true` for every variant of the recognized vocabulary, `false` only for
    /// [`LogKind::Custom`].
    pub fn is_recognized(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match self {
            Self::Custom(_) => false,
            Self::Ingest
            | Self::Create
            | Self::Update
            | Self::Delete
            | Self::Rename
            | Self::Link
            | Self::Validate
            | Self::IndexRebuild
            | Self::Contradiction => true,
        }
    }
}
107
108/// One parsed `log.md` entry: a header
109/// (`## [YYYY-MM-DD HH:MM] <kind> | <object>`) plus its body.
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct LogEntry {
112    /// The entry timestamp from the header.
113    pub timestamp: DateTime<FixedOffset>,
114    /// The entry kind.
115    pub kind: LogKind,
116    /// The object slot — a store-relative path/wiki-link target, or `None` for
117    /// store-wide actions like `validate`.
118    pub object: Option<String>,
119    /// The free-form body (one or more lines) explaining what happened.
120    pub note: String,
121}
122
123impl LogEntry {
124    /// Render this entry as it appears on disk: the `## [...]` header line,
125    /// then the note body, then a trailing blank line so successive entries are
126    /// separated. The note is emitted with header-shaped continuation lines
127    /// **escaped** (see [`escape_note_line`]) so a note line that happens to
128    /// match the entry-header shape (`## [YYYY-MM-DD HH:MM] <kind> | <obj>`) can
129    /// never be mistaken for a real entry header on readback or on the next
130    /// rotation. The escape round-trips exactly through [`unescape_note_line`].
131    fn render(&self) -> String {
132        let ts = self.timestamp.format(TS_FORMAT);
133        let mut out = String::new();
134        match &self.object {
135            Some(obj) => {
136                out.push_str(&format!("## [{}] {} | {}\n", ts, self.kind.as_str(), obj));
137            }
138            None => {
139                out.push_str(&format!("## [{}] {}\n", ts, self.kind.as_str()));
140            }
141        }
142        // Trim only the structural line terminators (`\n`/`\r`) — the trailing
143        // blank line separating entries is appended below, so a note's own
144        // trailing newlines would otherwise stack up and shift on every
145        // re-render. Spaces and tabs are legitimate note *content* and must be
146        // preserved verbatim, so the round-trip is exact: readback
147        // (`parse_entries`) trims the same `['\n', '\r']` set and no more, and a
148        // note ending in a space (`"note 0 "`) must reconstruct unchanged.
149        let note = self.note.trim_end_matches(['\n', '\r']);
150        if !note.is_empty() {
151            // Escape per line: a note line that parses as an entry header is
152            // prefixed so it is no longer at column 0 as `## [` — it stays note
153            // body on readback and on rotation, never a fabricated entry.
154            for (i, line) in note.split('\n').enumerate() {
155                if i > 0 {
156                    out.push('\n');
157                }
158                out.push_str(&escape_note_line(line));
159            }
160            out.push('\n');
161        }
162        out.push('\n');
163        out
164    }
165
166    /// The `(year, month)` of this entry's wall-clock timestamp — the rotation
167    /// bucket.
168    fn year_month(&self) -> (i32, u32) {
169        (self.timestamp.year(), self.timestamp.month())
170    }
171}
172
/// The store's chronological log: a thin handle for the append-only timeline.
///
/// `Log` is a unit struct and carries no state of its own. Its operations are
/// associated functions; the ones that touch the disk take the [`Store`] so
/// they can resolve the active `log.md` and the `log/` archives under the
/// store root.
#[derive(Debug, Clone)]
pub struct Log;
178
179impl Log {
    /// Atomically append `entry` to the active `log.md`, creating it (with
    /// `type: log` frontmatter) if absent. **If the active log holds entries
    /// from a prior month, roll those older months into `log/<YYYY-MM>.md`
    /// first** (atomic move), keeping the active file to the current month.
    ///
    /// "Current month" here means the `(year, month)` of `entry` itself, not
    /// the wall clock: only existing entries from a strictly earlier month are
    /// rolled out. Appending an entry that is older than everything already
    /// in the active file therefore triggers no rotation.
    ///
    /// The three paths, in the order the code takes them:
    ///
    /// 1. **Active file exists, rotation needed** — prior-month entries are
    ///    appended to their archives, then the active file is rewritten from
    ///    the original header, the re-rendered kept entries, and `entry`.
    /// 2. **Active file exists, no rotation** — the rendered `entry` is added
    ///    to the existing file content as-is (a missing final newline is
    ///    supplied first) and the whole file is written back.
    /// 3. **No active file** — the parent directory is created and the file is
    ///    written as frontmatter plus the single entry.
    ///
    /// **Concurrency.** `append` is a read-modify-write of the whole active file
    /// (`write_atomic` is atomic at the file level, but the read→render→write
    /// window is not). Two concurrent appenders — the manager and a cron-driven
    /// background system, say — would otherwise both read the same N-entry
    /// snapshot and each write N+1 entries, the second rename clobbering the
    /// first and silently dropping an audit entry. We serialize the whole
    /// read-modify-write under an advisory file lock (`flock`, held for the
    /// duration) so concurrent appends queue instead of racing. The lock is
    /// advisory and process-scoped; it guards the toolkit's own appends, which is
    /// the realistic contention path.
    ///
    /// # Errors
    ///
    /// Propagates any failure from reading the active file, creating the
    /// archive directory or the active file's parent directory, writing the
    /// rotation marker, appending to an archive, or the final atomic write.
    /// Failing to acquire the lock or to delete the rotation marker is *not*
    /// an error: both are best-effort.
    pub fn append(store: &Store, entry: &LogEntry) -> crate::Result<()> {
        let active = active_log_path(store);

        // Serialize concurrent appends for the whole read-modify-write. Held
        // until `_lock` drops at function exit (covering both the rotation and
        // the plain-append paths). A lock failure is non-fatal: we proceed
        // unlocked rather than refuse to log (best-effort, same posture as the
        // pre-fix behaviour on platforms without advisory locks).
        let _lock = AppendLock::acquire(&active);

        // The "current month" is the month of the entry being appended (taken
        // to be the newest in the timeline); every existing entry from a
        // strictly-earlier month rolls to archives.
        let current_ym = entry.year_month();

        if active.exists() {
            // Read and parse the active file's current contents into its
            // leading header and its entries.
            let content = fs::read_to_string(&active)?;
            let (header, entries) = parse_active(&content);

            // Partition existing entries into prior-month (roll out, grouped
            // per month in ascending month order) and current-or-later (keep
            // in the active file, original order preserved).
            let mut by_month: BTreeMap<(i32, u32), Vec<LogEntry>> = BTreeMap::new();
            let mut keep: Vec<LogEntry> = Vec::new();
            for e in entries {
                if e.year_month() < current_ym {
                    by_month.entry(e.year_month()).or_default().push(e);
                } else {
                    keep.push(e);
                }
            }

            // A rotation is two non-atomic durable writes (archive append, then
            // active trim). The marker disambiguates a crash-retry re-roll from a
            // fresh rotation so a genuinely-distinct same-minute entry is never
            // dropped (see `rotation_marker_path`). `recovering` is captured
            // BEFORE we (re)write the marker, so the current attempt's archive
            // append uses the right mode; the marker only changes what a LATER
            // retry sees.
            let marker = rotation_marker_path(store);
            let recovering = marker.exists();

            if !by_month.is_empty() {
                // Roll each prior month into its archive (atomic per-file),
                // appending to any existing archive for that month.
                let dir = archive_dir(store);
                fs::create_dir_all(&dir)?;
                // Mark the rotation in-flight so a crash before the active trim
                // is recoverable as a re-roll (deduped), not re-appended. When
                // we are already recovering the marker is on disk, so it is
                // not rewritten.
                if !recovering {
                    fs::write(&marker, b"")?;
                }
                for ((y, m), month_entries) in &by_month {
                    let path = archive_path(store, *y, *m);
                    append_to_archive(&path, month_entries, recovering)?;
                }

                // Rewrite the active file to the kept (current-month) entries
                // plus the new entry — atomically. Kept entries are
                // re-rendered from their parsed form rather than copied as
                // raw text.
                let mut body = String::new();
                for e in &keep {
                    body.push_str(&e.render());
                }
                body.push_str(&entry.render());
                let full = compose_active(&header, &body);
                crate::fsx::write_atomic(&active, full.as_bytes())?;
                // Rotation committed (active trimmed): clear the in-flight
                // marker. A failed delete is ignored; the no-rotation path
                // below clears a leftover marker on a later append.
                let _ = fs::remove_file(&marker);
                return Ok(());
            }

            // No rotation needed. If a stale marker lingers (a crash that trimmed
            // the active file but never deleted the marker), clear it so the next
            // real rotation is treated as fresh, not stuck in recovery mode.
            if recovering {
                let _ = fs::remove_file(&marker);
            }
            // Plain atomic append of the rendered entry: the existing content
            // is reused verbatim, given a final newline if it lacks one, and
            // written back with the new entry at the end.
            let mut full = content;
            if !full.ends_with('\n') {
                full.push('\n');
            }
            full.push_str(&entry.render());
            crate::fsx::write_atomic(&active, full.as_bytes())?;
            Ok(())
        } else {
            // Fresh log: frontmatter + the single entry. The parent directory
            // is created first so the atomic write has somewhere to land.
            if let Some(parent) = active.parent() {
                fs::create_dir_all(parent)?;
            }
            let body = entry.render();
            let full = compose_active(LOG_FRONTMATTER, &body);
            crate::fsx::write_atomic(&active, full.as_bytes())?;
            Ok(())
        }
    }
290
291    /// The `n` most-recent entries **by timestamp**, returned oldest→newest.
292    ///
293    /// **Out-of-order safety (mirrors [`Log::since`]).** The log is append-only
294    /// but *not* guaranteed to be in non-decreasing timestamp order on disk: a
295    /// corrective entry is appended below the entry it corrects, a
296    /// backdated/clock-skewed write lands physically after newer entries, and a
297    /// `merge=union` clone merge interleaves both sides until a later agent
298    /// reorders. Out-of-order is only a `LOG_OUT_OF_ORDER` warning, never
299    /// rejected. So the last `n` *physical* entries are **not** the `n` newest
300    /// by time — taking them would omit a genuinely-recent entry that sits
301    /// physically before an older one, and the documented curator warm-up
302    /// (`dbmd log tail 20`) would report a stale picture of what was done lately.
303    /// We therefore feed every entry of each file we touch through a bounded
304    /// newest-by-timestamp window and let it select the true top `n`.
305    ///
306    /// Bounded cost: the active `log.md` is kept to the current month by
307    /// rotation, so a full read of it is cheap and is not a whole-store walk.
308    /// Across archives we *can* prune: each `log/<YYYY-MM>.md` holds only entries
309    /// from that month (rotation buckets by the entry's own year-month), so once
310    /// the window is full, an archive whose month is strictly before the
311    /// window-minimum's month cannot contain any entry newer than the current
312    /// `n`th-newest. We cross archives newest-month-first and stop at the first
313    /// such archive.
314    pub fn tail(store: &Store, n: usize) -> crate::Result<Vec<LogEntry>> {
315        if n == 0 {
316            return Ok(Vec::new());
317        }
318
319        // A bounded window of the `n` entries with the largest timestamps. No
320        // within-file early stop: out-of-order entries mean a newer entry can
321        // sit physically before an older one, so each file is read fully.
322        let mut window = NewestWindow::new(n);
323        // Active↔archive overlap dedup, narrowly scoped (see `since`): an
324        // interrupted rotation can leave the SAME entry in both the untrimmed
325        // active file and its month archive; without suppression it would occupy
326        // two window slots and surface twice. We record every ACTIVE entry's
327        // identity and suppress only an ARCHIVE entry that matches one — NEVER an
328        // active entry against another active entry, nor an archive entry against
329        // another archive entry. A global content key over-reaches: on-disk
330        // headers are minute-precision, so two genuinely-distinct same-minute
331        // appends share an identity and a global dedup silently dropped the
332        // second on read.
333        let mut active_seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
334
335        // Active file: scan fully (current-month-bounded by rotation). Record
336        // every identity for overlap detection, but consider every entry — a
337        // same-minute duplicate WITHIN the active file is two distinct appends.
338        let active = active_log_path(store);
339        if active.exists() {
340            reverse_collect(&active, |e| {
341                active_seen.insert(entry_key(&e));
342                window.consider(e);
343                false
344            })?;
345        }
346
347        // Archives, newest-month-first. Once the window is full, an archive
348        // whose month is strictly before the window-minimum's month holds only
349        // entries older than the current cutoff, so it (and every older archive)
350        // is skippable.
351        for archive in list_archives_desc(store)? {
352            if let (true, Some(cutoff_ym), Some(arch_ym)) = (
353                window.is_full(),
354                window.min_year_month(),
355                archive_year_month(&archive),
356            ) {
357                if arch_ym < cutoff_ym {
358                    break;
359                }
360            }
361            reverse_collect(&archive, |e| {
362                // Suppress only the active↔archive crash-retry overlap; keep
363                // every distinct same-minute archive entry (archives are never
364                // deduped against each other).
365                if !active_seen.contains(&entry_key(&e)) {
366                    window.consider(e);
367                }
368                false
369            })?;
370        }
371
372        Ok(window.into_sorted())
373    }
374
375    /// Entries strictly newer than `time`, reverse-scanning active → archives.
376    ///
377    /// **No within-file early stop.** The log is append-only but *not*
378    /// guaranteed to be in non-decreasing timestamp order on disk: a corrective
379    /// entry is appended below the entry it corrects (SPEC: "if a finding is
380    /// wrong, append a corrective entry below it"), a backdated/clock-skewed
381    /// write lands physically after newer entries, and a `merge=union` clone
382    /// merge interleaves both sides until a later agent reorders. Out-of-order
383    /// is only a `LOG_OUT_OF_ORDER` warning, never rejected. So a newer entry
384    /// can sit physically *before* an older one; stopping at the first
385    /// older-than-`time` entry would silently drop those — the documented
386    /// curator warm-up (`dbmd log since <ts>`) would miss real recent work.
387    /// We therefore read every entry of each file we touch.
388    ///
389    /// Bounded cost: the active `log.md` is kept to the current month by
390    /// rotation, so a full read of it is cheap (the same read `tail` does for a
391    /// large `n`) and is not a whole-store walk. Across archives we *can* stop:
392    /// each `log/<YYYY-MM>.md` holds only entries from that month (rotation
393    /// buckets by the entry's own year-month), so an archive whose month is
394    /// strictly before `time`'s month cannot contain any entry newer than
395    /// `time`. We cross archives newest-month-first and stop at the first whose
396    /// month is entirely at or before `time`'s.
397    pub fn since(store: &Store, time: DateTime<FixedOffset>) -> crate::Result<Vec<LogEntry>> {
398        let mut collected: Vec<LogEntry> = Vec::new();
399        // Active↔archive overlap dedup, narrowly scoped. An interrupted rotation
400        // (archive write committed, active rewrite not) leaves the same entries
401        // in BOTH the untrimmed active file and the archive; without suppression
402        // each comes back twice. We record ACTIVE identities and suppress only an
403        // ARCHIVE entry that matches one — never active-vs-active or
404        // archive-vs-archive. A global content key would over-reach: on-disk
405        // headers are minute-precision, so two genuinely-distinct same-minute
406        // appends share an identity, and a global dedup silently under-reported
407        // the second.
408        let mut active_seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
409
410        // Active file: scan fully, no early stop (out-of-order safe). Collect
411        // every in-window entry (a same-minute duplicate within the active file
412        // is two distinct appends), recording identities for overlap detection.
413        let active = active_log_path(store);
414        if active.exists() {
415            reverse_collect(&active, |e| {
416                if e.timestamp > time {
417                    active_seen.insert(entry_key(&e));
418                    collected.push(e);
419                }
420                false
421            })?;
422        }
423
424        // The cutoff's own (year, month): any archive strictly before it holds
425        // only older entries and is skippable. Archive months are bucketed on
426        // the UTC calendar (on-disk timestamps are offset-free and re-read as
427        // UTC; rotation buckets by the entry's UTC year-month), so the pruning
428        // calendar must be UTC too. A non-UTC `since` offset (advertised in the
429        // CLI hint, e.g. `…T00:30:00+07:00`) whose local month differs from its
430        // UTC month would otherwise prune away an archive holding entries that
431        // are strictly newer than `time` — `time.year()/.month()` read the
432        // offset-LOCAL calendar, not UTC.
433        let cutoff_utc = time.with_timezone(&Utc);
434        let cutoff_ym = (cutoff_utc.year(), cutoff_utc.month());
435
436        for archive in list_archives_desc(store)? {
437            // Archives are newest-month-first; once a month is strictly before
438            // the cutoff's month, every remaining (older) archive is too.
439            if let Some(arch_ym) = archive_year_month(&archive) {
440                if arch_ym < cutoff_ym {
441                    break;
442                }
443            }
444            // Scan this archive fully — within a month, entries may still be
445            // out of order, so no within-file early stop.
446            reverse_collect(&archive, |e| {
447                // Suppress only the active↔archive crash-retry overlap; keep
448                // every distinct same-minute archive entry.
449                if e.timestamp > time && !active_seen.contains(&entry_key(&e)) {
450                    collected.push(e);
451                }
452                false
453            })?;
454        }
455
456        collected.reverse();
457        Ok(collected)
458    }
459
460    /// The timestamp of the most recent `validate` entry — the default `since`
461    /// window for working-set validation ([`crate::validate::validate_working_set`]).
462    pub fn last_validate_at(store: &Store) -> crate::Result<Option<DateTime<FixedOffset>>> {
463        let mut found: Option<DateTime<FixedOffset>> = None;
464
465        let active = active_log_path(store);
466        if active.exists() {
467            reverse_collect(&active, |e| {
468                if e.kind == LogKind::Validate {
469                    found = Some(e.timestamp);
470                    true
471                } else {
472                    false
473                }
474            })?;
475        }
476
477        if found.is_none() {
478            for archive in list_archives_desc(store)? {
479                reverse_collect(&archive, |e| {
480                    if e.kind == LogKind::Validate {
481                        found = Some(e.timestamp);
482                        true
483                    } else {
484                        false
485                    }
486                })?;
487                if found.is_some() {
488                    break;
489                }
490            }
491        }
492
493        Ok(found)
494    }
495
496    /// Parse a single entry header (`## [YYYY-MM-DD HH:MM] <kind> | <object>`)
497    /// into its timestamp, kind, and object. Returns `None` if the line isn't a
498    /// well-formed entry header.
499    pub fn parse_header(line: &str) -> Option<(DateTime<FixedOffset>, LogKind, Option<String>)> {
500        let line = line.trim_end_matches(['\n', '\r']);
501        let rest = line.strip_prefix("## [")?;
502        let close = rest.find(']')?;
503        let ts_str = &rest[..close];
504        let timestamp = parse_timestamp(ts_str)?;
505
506        // Everything after the closing bracket: ` <kind> | <object>` or
507        // ` <kind>`.
508        let after = rest[close + 1..].trim();
509        if after.is_empty() {
510            return None;
511        }
512
513        let (kind_str, object) = match after.split_once('|') {
514            Some((k, o)) => {
515                let obj = o.trim();
516                let obj = if obj.is_empty() {
517                    None
518                } else {
519                    Some(obj.to_string())
520                };
521                (k.trim(), obj)
522            }
523            None => (after, None),
524        };
525
526        if kind_str.is_empty() {
527            return None;
528        }
529
530        Some((timestamp, LogKind::parse(kind_str), object))
531    }
532}
533
534// ── Internal helpers ────────────────────────────────────────────────────────
535
536/// A bounded window of the `n` entries with the largest timestamps, fed by a
537/// **reverse (newest-physical-first) scan** and used by [`Log::tail`].
538///
539/// Why this exists: the last `n` *physical* entries are the `n` newest only
540/// when the log is in non-decreasing time order. That's the append-only
541/// contract, not a guarantee — a backdated, clock-skewed, or merge-interleaved
542/// entry violates it (and trips the `LOG_OUT_OF_ORDER` validate warning). The
543/// window decouples `tail` from that assumption: it keeps the `n` largest
544/// timestamps seen regardless of the order they arrive in, so the caller can
545/// read each file fully (no fragile within-file early stop) and still get the
546/// true top `n`.
547///
548/// Tie-break: entries sharing a timestamp at the window boundary are ordered by
549/// **physical recency** — the one appended later (encountered earlier in the
550/// reverse scan, i.e. a smaller `arrival`) wins. "Newest" means most-recently
551/// recorded.
552struct NewestWindow {
553    cap: usize,
554    /// Min-by-(timestamp, then physical-oldest) heap: the root is always the
555    /// next entry to evict once the window is full.
556    heap: std::collections::BinaryHeap<WindowItem>,
557    /// Count of entries fed in, in reverse-scan order, used as the tie-break
558    /// key (0 = newest physical).
559    next_arrival: u64,
560}
561
562impl NewestWindow {
563    fn new(cap: usize) -> Self {
564        NewestWindow {
565            cap,
566            heap: std::collections::BinaryHeap::with_capacity(cap),
567            next_arrival: 0,
568        }
569    }
570
571    /// Offer one entry from the scan. If the window isn't full it's kept; once
572    /// full, it's kept (evicting the current minimum) iff its timestamp is `>=`
573    /// the window minimum. Equal-timestamp boundary entries resolve by physical
574    /// recency (see the type doc).
575    fn consider(&mut self, entry: LogEntry) {
576        let arrival = self.next_arrival;
577        self.next_arrival += 1;
578
579        if self.heap.len() < self.cap {
580            self.heap.push(WindowItem { entry, arrival });
581            return;
582        }
583
584        // Window full. The heap root is the current minimum (oldest-by-
585        // timestamp held; on a tie, the oldest-physical).
586        let root = self.heap.peek().expect("full window has a root");
587        if entry.timestamp > root.entry.timestamp {
588            // Strictly newer than the window minimum: it belongs; evict the min.
589            self.heap.pop();
590            self.heap.push(WindowItem { entry, arrival });
591        }
592        // On `<=` we keep the window as-is. `<` is plainly too old. `==` is the
593        // tie case: the scan is newest-physical-first, so this entry is
594        // physically *older* than the held one of equal timestamp, and the
595        // tie-break keeps the physically-newer (most-recently-recorded) entry —
596        // so the incoming one is dropped.
597    }
598
599    /// Whether the window already holds its full `cap` entries.
600    fn is_full(&self) -> bool {
601        self.heap.len() >= self.cap
602    }
603
604    /// The `(year, month)` of the window's current minimum (oldest kept) entry,
605    /// or `None` when the window is empty. Used to prune older archives: an
606    /// archive month strictly before this can't beat the current cutoff.
607    fn min_year_month(&self) -> Option<(i32, u32)> {
608        self.heap
609            .peek()
610            .map(|item| (item.entry.timestamp.year(), item.entry.timestamp.month()))
611    }
612
613    /// The held entries, oldest→newest (chronological), ties broken
614    /// oldest-physical→newest-physical.
615    fn into_sorted(self) -> Vec<LogEntry> {
616        let mut items: Vec<WindowItem> = self.heap.into_vec();
617        // Ascending by timestamp; on a tie, oldest-physical (larger arrival)
618        // first so the most-recently-recorded entry sorts last.
619        items.sort_by(|a, b| {
620            a.entry
621                .timestamp
622                .cmp(&b.entry.timestamp)
623                .then(b.arrival.cmp(&a.arrival))
624        });
625        items.into_iter().map(|i| i.entry).collect()
626    }
627}
628
629/// One slot in [`NewestWindow`]'s heap. `Ord` is defined so the heap is a
630/// **min-heap on `(timestamp, physical-oldest)`**: `BinaryHeap` is a max-heap,
631/// so the root (max under this `Ord`) is the eviction candidate — the smallest
632/// timestamp, and on a tie the oldest-physical (largest `arrival`).
633struct WindowItem {
634    entry: LogEntry,
635    arrival: u64,
636}
637
638impl PartialEq for WindowItem {
639    fn eq(&self, other: &Self) -> bool {
640        self.entry.timestamp == other.entry.timestamp && self.arrival == other.arrival
641    }
642}
643impl Eq for WindowItem {}
644
645impl Ord for WindowItem {
646    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
647        // Reverse on timestamp so the *smallest* timestamp is the heap max
648        // (eviction candidate). On equal timestamps, the larger `arrival`
649        // (older physical) is the heap max so it is evicted first.
650        other
651            .entry
652            .timestamp
653            .cmp(&self.entry.timestamp)
654            .then(self.arrival.cmp(&other.arrival))
655    }
656}
657impl PartialOrd for WindowItem {
658    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
659        Some(self.cmp(other))
660    }
661}
662
/// An advisory, exclusive lock serializing concurrent [`Log::append`] calls.
///
/// Held on a dedicated sibling lock file (`<active>.lock`) rather than on
/// `log.md` itself: `write_atomic` replaces the active file by `rename`, so the
/// active inode changes under us and a lock on its fd would not cover the new
/// file. The lock file is stable, so the lock spans the whole read-modify-write.
///
/// On Unix this is `flock(LOCK_EX)`, released on drop (or implicitly when the
/// process exits / the fd closes, so a crash never strands the lock). The
/// lock file is created if absent and intentionally left on disk between runs
/// (locking it does not depend on its contents). On non-Unix targets the lock
/// is a no-op — db.md's append surface is Unix-targeted, and a missing advisory
/// lock degrades to the pre-fix last-writer-wins, never to incorrectness of a
/// single writer.
///
/// The guard is meant to be bound to a local for the duration of the critical
/// section, as `Log::append` does with `_lock`.
struct AppendLock {
    /// The open lock-file handle, or `None` when the lock file could not be
    /// opened or locked (the guard then protects nothing). The field exists
    /// only on Unix; elsewhere the struct is empty.
    #[cfg(unix)]
    file: Option<File>,
}
681
682impl AppendLock {
683    /// Acquire the exclusive append lock for the store whose active log is
684    /// `active`. Best-effort: any failure to open or lock the lock file yields
685    /// an unlocked guard (we log rather than refuse to log). Blocks until the
686    /// lock is granted when another appender holds it.
687    fn acquire(active: &Path) -> AppendLock {
688        #[cfg(unix)]
689        {
690            let file = Self::open_and_lock(active);
691            AppendLock { file }
692        }
693        #[cfg(not(unix))]
694        {
695            let _ = active;
696            AppendLock {}
697        }
698    }
699
700    #[cfg(unix)]
701    fn open_and_lock(active: &Path) -> Option<File> {
702        use std::os::unix::io::AsRawFd;
703
704        // The lock file lives beside the active log; ensure its parent exists
705        // (the fresh-log path may run before `log.md`'s directory is created).
706        if let Some(parent) = active.parent() {
707            let _ = fs::create_dir_all(parent);
708        }
709        let lock_path = lock_path_for(active);
710        let file = std::fs::OpenOptions::new()
711            .create(true)
712            .truncate(false)
713            .write(true)
714            .open(&lock_path)
715            .ok()?;
716
717        // Blocking exclusive advisory lock. `flock` is in libc, which every Rust
718        // binary links, so the bare `extern "C"` declaration needs no crate dep.
719        let rc = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
720        if rc != 0 {
721            // Could not lock (e.g. a filesystem without flock support): proceed
722            // unlocked rather than fail the append.
723            return None;
724        }
725        Some(file)
726    }
727}
728
729#[cfg(unix)]
730impl Drop for AppendLock {
731    fn drop(&mut self) {
732        use std::os::unix::io::AsRawFd;
733        if let Some(file) = &self.file {
734            // Release explicitly; the fd close on drop would also release it.
735            unsafe { flock(file.as_raw_fd(), LOCK_UN) };
736        }
737    }
738}
739
// Hand-declared binding to the C library's `flock`, so no extra crate
// dependency is needed for advisory locking.
#[cfg(unix)]
extern "C" {
    /// Apply or remove an advisory lock on the open descriptor `fd`.
    /// `operation` is one of the `LOCK_*` constants below. Callers in this
    /// file treat a return value of 0 as success and anything else as failure.
    fn flock(fd: std::os::raw::c_int, operation: std::os::raw::c_int) -> std::os::raw::c_int;
}

/// `flock` operation: exclusive lock (`LOCK_EX`), blocking.
// NOTE(review): 2 is the value used by Linux/macOS/BSD headers — confirm for
// any other Unix target this is built for.
#[cfg(unix)]
const LOCK_EX: std::os::raw::c_int = 2;
/// `flock` operation: unlock (`LOCK_UN`).
// NOTE(review): 8 is the value used by Linux/macOS/BSD headers — confirm for
// any other Unix target this is built for.
#[cfg(unix)]
const LOCK_UN: std::os::raw::c_int = 8;
751
/// Path of the advisory-lock file that sits beside an active log: the log's
/// own file name with `.lock` appended, in the same directory.
///
/// When `active` has no final file-name component the name falls back to
/// `log.md`; when it has no parent the lock path is the bare file name.
#[cfg(unix)]
fn lock_path_for(active: &Path) -> PathBuf {
    let mut lock_name = match active.file_name() {
        Some(base) => base.to_os_string(),
        None => std::ffi::OsString::from("log.md"),
    };
    lock_name.push(".lock");

    if let Some(dir) = active.parent() {
        dir.join(lock_name)
    } else {
        PathBuf::from(lock_name)
    }
}
765
/// The active `log.md` path under the store root.
///
/// Pure path construction: joins `log.md` onto `store.root` without touching
/// the filesystem.
fn active_log_path(store: &Store) -> PathBuf {
    store.root.join("log.md")
}
770
/// The `log/` archive directory under the store root.
///
/// Pure path construction: joins `log` onto `store.root`; the directory is
/// neither checked for nor created here.
fn archive_dir(store: &Store) -> PathBuf {
    store.root.join("log")
}
775
776/// The `log/<YYYY-MM>.md` archive path for a given month.
777fn archive_path(store: &Store, year: i32, month: u32) -> PathBuf {
778    archive_dir(store).join(format!("{:04}-{:02}.md", year, month))
779}
780
781/// The crash-recovery marker for an in-progress rotation.
782///
783/// Its **presence** at the start of [`Log::append`] means a prior rotation
784/// appended prior-month entries to their archives but may not have trimmed the
785/// active file (a crash, or an active-rewrite error, between the two non-atomic
786/// durable writes). The retry must then DEDUP the re-rolled entries against the
787/// archive so it adds nothing.
788///
789/// Its **absence** means a fresh rotation: every prior-month entry being rolled
790/// is genuinely new to its archive and is appended UNCONDITIONALLY. This is the
791/// load-bearing distinction — a content-only dedup cannot tell an idempotent
792/// re-roll of one physical entry from a genuinely-distinct same-minute repeat
793/// (on-disk headers are minute-precision, so two real appends to the same object
794/// in the same minute with the same note render byte-identically). Gating the
795/// dedup on "are we recovering a crashed rotation?" lets a backdated duplicate
796/// survive while still suppressing a true re-roll.
797///
798/// Lives in `log/` (toolkit-managed; a dotfile, so never walked, indexed, or
799/// validated as content — `list_archives_desc` matches only `YYYY-MM.md`).
800fn rotation_marker_path(store: &Store) -> PathBuf {
801    archive_dir(store).join(".rotating")
802}
803
804/// Parse a `YYYY-MM-DD HH:MM` header timestamp, reattaching UTC. `None` on any
805/// malformed shape.
806fn parse_timestamp(s: &str) -> Option<DateTime<FixedOffset>> {
807    let naive = NaiveDateTime::parse_from_str(s.trim(), TS_FORMAT).ok()?;
808    let utc = FixedOffset::east_opt(0)?;
809    utc.from_local_datetime(&naive).single()
810}
811
812/// Split a `log.md` / archive file into its leading frontmatter+heading block
813/// (everything up to and including the line before the first `## [` header) and
814/// its parsed entries. If there are no entries, the whole content is the header
815/// block.
816fn parse_active(content: &str) -> (String, Vec<LogEntry>) {
817    match find_first_header(content) {
818        Some(idx) => {
819            let header = content[..idx].to_string();
820            let entries = parse_entries(&content[idx..]);
821            (header, entries)
822        }
823        None => (content.to_string(), Vec::new()),
824    }
825}
826
827/// Byte offset of the first **valid** entry header — a `## [` line-start that
828/// [`Log::parse_header`] accepts — or `None`.
829///
830/// Crucially this skips `## [`-SHAPED lines that `parse_header` REJECTS (a
831/// merge-orphaned note, an exporter-malformed line) appearing before the first
832/// real entry: everything up to the first valid header becomes the preserved
833/// `header` block in [`parse_active`], so a rotation re-emits it verbatim.
834/// Returning the first `## [`-shaped line instead (as this once did) put those
835/// pre-entry lines into the entries region, where [`parse_entries`] — which
836/// opens an entry only on a parseable header — dropped them on the floor,
837/// silently erasing append-only content on the next rotation.
838fn find_first_header(content: &str) -> Option<usize> {
839    let mut offset = 0usize;
840    for line in content.split_inclusive('\n') {
841        let line_str = line.trim_end_matches(['\r', '\n']);
842        if line_str.starts_with("## [") && Log::parse_header(line_str).is_some() {
843            return Some(offset);
844        }
845        offset += line.len();
846    }
847    None
848}
849
850/// Whether `line` is a note line that — left unescaped — could be mistaken for
851/// an entry header. It is *header-ambiguous* when it is a (possibly empty) run
852/// of leading backslashes followed by a string that [`Log::parse_header`]
853/// accepts. The escape (one leading backslash) and only the escape is added to,
854/// or stripped from, such lines, so the transform is fully reversible:
855/// `## [..]` (a real header shape in note text) ⇄ `\## [..]`, and a literal
856/// `\## [..]` a note already contains ⇄ `\\## [..]`.
857fn is_header_ambiguous(line: &str) -> bool {
858    let stripped = line.trim_start_matches('\\');
859    // Only treat it as ambiguous if some backslashes were the *only* prefix and
860    // the remainder is a valid header — a backslash run that does not lead into
861    // a header (e.g. `\not a header`) is ordinary note text, left untouched.
862    Log::parse_header(stripped).is_some()
863}
864
865/// Escape one note line for on-disk emission so it can never be parsed as an
866/// entry header (the [write-path fix] for header-shaped notes corrupting the
867/// append-only log). A header-ambiguous line is prefixed with a single
868/// backslash, moving its `## [` off column 0; every other line is emitted
869/// verbatim. Reversed exactly by [`unescape_note_line`].
870fn escape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
871    if is_header_ambiguous(line) {
872        std::borrow::Cow::Owned(format!("\\{line}"))
873    } else {
874        std::borrow::Cow::Borrowed(line)
875    }
876}
877
878/// Reverse [`escape_note_line`]: strip exactly one leading backslash from a
879/// header-ambiguous on-disk note line, restoring the literal the author wrote.
880/// A line that is not header-ambiguous (including a genuine `\not a header`) is
881/// returned untouched, so the round-trip is lossless for arbitrary note text.
882fn unescape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
883    if let Some(rest) = line.strip_prefix('\\') {
884        if is_header_ambiguous(line) {
885            return std::borrow::Cow::Borrowed(rest);
886        }
887    }
888    std::borrow::Cow::Borrowed(line)
889}
890
891/// Parse every entry in a slice that begins at (or before, header-block
892/// included) a sequence of `## [` headers. Headers that fail to parse are
893/// skipped (their body folds into the previous valid entry's note is avoided —
894/// they simply start no new entry).
895fn parse_entries(text: &str) -> Vec<LogEntry> {
896    let mut entries: Vec<LogEntry> = Vec::new();
897    let mut cur_header: Option<(DateTime<FixedOffset>, LogKind, Option<String>)> = None;
898    let mut cur_note: Vec<&str> = Vec::new();
899
900    let flush = |entries: &mut Vec<LogEntry>,
901                 header: &mut Option<(DateTime<FixedOffset>, LogKind, Option<String>)>,
902                 note: &mut Vec<&str>| {
903        if let Some((timestamp, kind, object)) = header.take() {
904            // Reverse the per-line header escape `render` applies so an escaped
905            // header-shaped note line round-trips back to its literal form.
906            let joined = note
907                .iter()
908                .map(|line| unescape_note_line(line))
909                .collect::<Vec<_>>()
910                .join("\n");
911            let note_str = joined.trim_matches(['\n', '\r']).to_string();
912            entries.push(LogEntry {
913                timestamp,
914                kind,
915                object,
916                note: note_str,
917            });
918        }
919        note.clear();
920    };
921
922    for line in text.lines() {
923        if line.starts_with("## [") {
924            if let Some(parsed) = Log::parse_header(line) {
925                // Close the previous entry, start a new one.
926                flush(&mut entries, &mut cur_header, &mut cur_note);
927                cur_header = Some(parsed);
928                continue;
929            }
930            // Unparseable `## [` line: treat as body of the current entry.
931        }
932        if cur_header.is_some() {
933            cur_note.push(line);
934        }
935    }
936    flush(&mut entries, &mut cur_header, &mut cur_note);
937    entries
938}
939
/// Recompose an active/archive file from its heading block and entry body.
///
/// With an empty `header` the result is just `body`. Otherwise the header is
/// padded with newlines until it ends in a blank line (`"\n\n"`) — a header
/// that already ends that way is left as is — and `body` follows.
fn compose_active(header: &str, body: &str) -> String {
    if header.is_empty() {
        return body.to_string();
    }
    // Newlines still missing before the header ends with a blank line.
    let padding = if header.ends_with("\n\n") {
        ""
    } else if header.ends_with('\n') {
        "\n"
    } else {
        "\n\n"
    };
    [header, padding, body].concat()
}
954
955/// Append entries to a month archive, creating it with `type: log` frontmatter
956/// if absent. Atomic (temp-file rename). Entries are appended in the given
957/// order (callers pass them already chronological within the month).
958///
959/// **`recovering` — the re-roll gate.** Rotation in [`Log::append`] is two
960/// non-atomic durable writes: roll prior-month entries into the archive, then
961/// rewrite (trim) the active file. If the process crashes or the active rewrite
962/// errors *after* the archive write commits, the prior-month entries remain in
963/// the still-untrimmed active file and the agent's retry re-rolls them here. A
964/// naive concatenate would then duplicate every entry, amplifying on each retry.
965///
966/// We CANNOT dedup that away by content alone: on-disk headers are
967/// minute-precision, so two genuinely-distinct appends to the same object in the
968/// same minute with the same note render byte-identically — indistinguishable
969/// from a re-roll of one physical entry. Deduping unconditionally therefore
970/// silently destroyed a legitimately-distinct backdated duplicate (the bug).
971///
972/// So the caller passes `recovering`: `true` only when an in-progress-rotation
973/// marker was found (a crash-retry), where we dedup the incoming batch against
974/// the archive **by multiplicity** (skip an incoming entry only while the
975/// archive still holds an unconsumed copy of its identity) so a re-roll of the
976/// SAME physical entries adds nothing. On a fresh rotation (`false`) every entry
977/// is genuinely new to the archive and is appended unconditionally, so a
978/// distinct same-minute repeat survives.
979fn append_to_archive(path: &Path, entries: &[LogEntry], recovering: bool) -> crate::Result<()> {
980    if path.exists() {
981        let existing = fs::read_to_string(path)?;
982
983        let mut body = String::new();
984        if recovering {
985            // Crash-retry: the prior (crashed) attempt may already have appended
986            // some/all of these. Dedup by MULTIPLICITY, not set-membership, so a
987            // partial-then-retried roll converges exactly and a re-roll of the
988            // full batch is a no-op.
989            let (_header, existing_entries) = parse_active(&existing);
990            let mut remaining: std::collections::HashMap<EntryKey, usize> =
991                std::collections::HashMap::new();
992            for e in &existing_entries {
993                *remaining.entry(entry_key(e)).or_insert(0) += 1;
994            }
995            for e in entries {
996                match remaining.get_mut(&entry_key(e)) {
997                    // An archived copy is still unconsumed: this incoming entry is
998                    // that re-roll, suppress it.
999                    Some(count) if *count > 0 => *count -= 1,
1000                    _ => body.push_str(&e.render()),
1001                }
1002            }
1003        } else {
1004            // Fresh rotation: append every entry. A same-minute, same-fields
1005            // entry that already exists in the archive is a DISTINCT append, not
1006            // a re-roll, and must be preserved.
1007            for e in entries {
1008                body.push_str(&e.render());
1009            }
1010        }
1011
1012        // Nothing new to add (a fully-duplicate re-roll): leave the archive
1013        // byte-for-byte untouched (append-only: don't rewrite identical data).
1014        if body.is_empty() {
1015            return Ok(());
1016        }
1017
1018        let mut full = existing;
1019        if !full.ends_with('\n') {
1020            full.push('\n');
1021        }
1022        full.push_str(&body);
1023        crate::fsx::write_atomic(path, full.as_bytes())?;
1024    } else {
1025        let mut body = String::new();
1026        for e in entries {
1027            body.push_str(&e.render());
1028        }
1029        if let Some(parent) = path.parent() {
1030            fs::create_dir_all(parent)?;
1031        }
1032        let full = compose_active(LOG_FRONTMATTER, &body);
1033        crate::fsx::write_atomic(path, full.as_bytes())?;
1034    }
1035    Ok(())
1036}
1037
/// A hashable identity for a log entry, used to dedup an idempotent archive
/// re-roll (see [`append_to_archive`]).
///
/// The tuple is `(timestamp, kind as text, object, note)`. Two entries are
/// "the same" when all four match — exactly the fields that round-trip
/// through `render`/`parse`, so a re-rolled entry compares equal to the one
/// already archived. Owned (rather than borrowed) so keys from the existing
/// archive and from the incoming entries share one type regardless of where
/// they came from; the cost is paid only on the cold rotation path.
type EntryKey = (DateTime<FixedOffset>, String, Option<String>, String);
1046
1047/// Derive the dedup key for `e` (see [`EntryKey`]). Keying on `kind.as_str()`
1048/// (rather than `LogKind`, which is not `Hash`) is exact: `as_str`/`parse`
1049/// round-trips every recognized kind and preserves any `Custom` token.
1050fn entry_key(e: &LogEntry) -> EntryKey {
1051    (
1052        e.timestamp,
1053        e.kind.as_str().to_string(),
1054        e.object.clone(),
1055        e.note.clone(),
1056    )
1057}
1058
1059/// Every `log/<YYYY-MM>.md` archive, sorted **newest month first**.
1060fn list_archives_desc(store: &Store) -> crate::Result<Vec<PathBuf>> {
1061    let dir = archive_dir(store);
1062    if !dir.is_dir() {
1063        return Ok(Vec::new());
1064    }
1065    let mut months: Vec<(String, PathBuf)> = Vec::new();
1066    for entry in fs::read_dir(&dir)? {
1067        let entry = entry?;
1068        let path = entry.path();
1069        if !path.is_file() {
1070            continue;
1071        }
1072        let name = match path.file_name().and_then(|s| s.to_str()) {
1073            Some(n) => n,
1074            None => continue,
1075        };
1076        // Match `YYYY-MM.md`.
1077        if let Some(stem) = name.strip_suffix(".md") {
1078            if is_year_month(stem) {
1079                months.push((stem.to_string(), path.clone()));
1080            }
1081        }
1082    }
1083    // `YYYY-MM` strings sort lexically == chronologically; reverse for newest
1084    // first.
1085    months.sort_by(|a, b| b.0.cmp(&a.0));
1086    Ok(months.into_iter().map(|(_, p)| p).collect())
1087}
1088
1089/// The `(year, month)` an archive file represents, parsed from its
1090/// `log/<YYYY-MM>.md` name. `None` if the name isn't a well-formed month
1091/// archive (in which case the caller scans it rather than risk skipping it).
1092fn archive_year_month(path: &Path) -> Option<(i32, u32)> {
1093    let stem = path
1094        .file_name()
1095        .and_then(|s| s.to_str())
1096        .and_then(|n| n.strip_suffix(".md"))?;
1097    if !is_year_month(stem) {
1098        return None;
1099    }
1100    let year: i32 = stem[..4].parse().ok()?;
1101    let month: u32 = stem[5..7].parse().ok()?;
1102    // The month must be a real calendar month. A hand-created / externally-
1103    // produced `log/2026-00.md` or `log/2026-13.md` parses as two digits but
1104    // names no month; returning `Some((year, 0))` would sort it below every
1105    // legitimate month, so the newest-month-first early-break in `since`/`tail`
1106    // could prune it and silently drop its entries. Out-of-range → `None`, so the
1107    // caller scans the file instead of risk-skipping it (the safe fallback).
1108    if !(1..=12).contains(&month) {
1109        return None;
1110    }
1111    Some((year, month))
1112}
1113
/// True if `s` has the shape `YYYY-MM`: exactly seven bytes, made of four
/// ASCII digits, a dash, and two ASCII digits. Only the shape is checked —
/// the month is not required to be a real calendar month.
fn is_year_month(s: &str) -> bool {
    match s.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1] => [y0, y1, y2, y3, m0, m1]
            .iter()
            .all(|digit| digit.is_ascii_digit()),
        _ => false,
    }
}
1125
1126/// Reverse-read `path` from EOF, parsing entries newest-first and feeding each
1127/// to `take`. `take` returns `true` to stop early (enough collected). The file
1128/// is read backward in blocks; only the tail region needed to satisfy `take`
1129/// is read — the whole file is read only if `take` never returns `true`.
1130fn reverse_collect<F>(path: &Path, mut take: F) -> crate::Result<()>
1131where
1132    F: FnMut(LogEntry) -> bool,
1133{
1134    let mut file = File::open(path)?;
1135    let len = file.metadata()?.len();
1136    if len == 0 {
1137        return Ok(());
1138    }
1139
1140    // Algorithm: grow a tail buffer leftward one block at a time, emitting
1141    // entries strictly newest-first as their left boundary is confirmed, and
1142    // stopping the instant `take` says enough. The whole file is read only if
1143    // `take` never returns `true` (e.g. `tail(n)` with n ≥ entry count).
1144    //
1145    // Invariant: a `## [` line-start anywhere in the buffer is a *complete*
1146    // entry — its header is the entry's first line, and its body lies to the
1147    // right and is therefore already buffered (we read right-to-left). So we
1148    // never split an entry across blocks.
1149    //
1150    // `buf` holds the file's bytes from absolute offset `start` (growing
1151    // leftward toward 0) to EOF. `emitted_abs` records the absolute offsets of
1152    // headers already handed to `take`, so re-visiting a header in a later block
1153    // never double-emits.
1154    let mut buf: Vec<u8> = Vec::new();
1155    let mut start = len;
1156    // O(1) membership: a `Vec` + `.contains()` here would be O(E²) across a large
1157    // single-month file (every header re-checked against all prior emissions).
1158    let mut emitted_abs: std::collections::HashSet<u64> = std::collections::HashSet::new();
1159    // Every header's absolute offset found so far, ascending. Built
1160    // *incrementally*: each block contributes only the markers whose `#` starts
1161    // inside it (all strictly smaller than any already-known offset, so they
1162    // prepend in order). This is the fix for the accidental O(file²) scan — the
1163    // old code re-ran `header_offsets` over the whole accumulated buffer on every
1164    // block (O(file²/block) byte comparisons on the default no-early-stop
1165    // tail/since path); now each byte is scanned for a header exactly once.
1166    let mut headers: Vec<u64> = Vec::new();
1167    let mut stop = false;
1168    // The first backward block has no already-scanned region to its right, so it
1169    // scans exactly `[0, block)`; every later block scans one byte further
1170    // (`block + 1`) to re-classify the prior block's deferred left-edge candidate
1171    // now that its left neighbour is buffered (see the scan call below).
1172    let mut first = true;
1173
1174    while start > 0 && !stop {
1175        let block = std::cmp::min(REVERSE_BLOCK as u64, start);
1176        let new_start = start - block;
1177        file.seek(SeekFrom::Start(new_start))?;
1178        let mut chunk = vec![0u8; block as usize];
1179        file.read_exact(&mut chunk)?;
1180        chunk.extend_from_slice(&buf);
1181        buf = chunk;
1182        start = new_start;
1183
1184        // Scan the freshly-prepended block (buffer indices `[0, block)`) for new
1185        // header markers. A marker straddling the block boundary has its `#` in
1186        // this window and so is still caught (see `header_offsets_range`).
1187        //
1188        // One subtlety the scan must respect: a `## [` whose `#` sits at the
1189        // block's LEFT edge (buffer index 0, absolute offset `start`) cannot have
1190        // its line-start confirmed yet when `start > 0` — the byte at `start - 1`
1191        // is not buffered. Treating index 0 as a line start there fabricates an
1192        // entry from a mid-line `## [` fragment that happens to align with a block
1193        // boundary. So `header_offsets_range` DEFERS the leftmost candidate when
1194        // `base` is not the true file start, and we re-scan one byte further
1195        // right next time: after the first block the buffer carries the previous
1196        // block's left-edge byte at index `block` with its left neighbour now in
1197        // hand, so extending the window to `block + 1` re-classifies that exactly
1198        // once. `first` guards the first block (nothing to re-check on its right).
1199        let base_is_file_start = start == 0;
1200        let scan_hi = if first { block } else { block + 1 } as usize;
1201        let mut new_headers = header_offsets_range(&buf, start, 0, scan_hi, base_is_file_start);
1202        first = false;
1203        if !new_headers.is_empty() {
1204            new_headers.extend_from_slice(&headers);
1205            headers = new_headers;
1206        }
1207
1208        // Process newest (largest offset) → oldest (smallest), emitting any
1209        // header not yet emitted. Hold back only the buffer's *leftmost* header
1210        // while we have not reached file start (`start > 0`): older entries may
1211        // still lie to its left in unread blocks, and newest-first order
1212        // requires we not emit it until we've confirmed it really is the oldest
1213        // (or read enough to bound it on the left). One extra block read at
1214        // most; on the next iteration its left boundary is in-buffer.
1215        for i in (0..headers.len()).rev() {
1216            let abs = headers[i];
1217            if emitted_abs.contains(&abs) {
1218                continue;
1219            }
1220            let is_oldest_in_buf = i == 0;
1221            if is_oldest_in_buf && start > 0 {
1222                continue;
1223            }
1224
1225            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
1226            if let Some(entry) = parse_single_entry(&entry_text) {
1227                emitted_abs.insert(abs);
1228                if take(entry) {
1229                    stop = true;
1230                    break;
1231                }
1232            } else {
1233                emitted_abs.insert(abs);
1234            }
1235        }
1236    }
1237
1238    // Reached file start (or stopped). If we stopped, done. If we reached
1239    // start, emit any held-back oldest header(s) now (start == 0 means the
1240    // buffer's first header is genuinely the oldest). `headers` already holds
1241    // every offset (the loop scanned down to start == 0), so reuse it.
1242    if !stop && start == 0 {
1243        for i in (0..headers.len()).rev() {
1244            let abs = headers[i];
1245            if emitted_abs.contains(&abs) {
1246                continue;
1247            }
1248            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
1249            if let Some(entry) = parse_single_entry(&entry_text) {
1250                emitted_abs.insert(abs);
1251                if take(entry) {
1252                    break;
1253                }
1254            } else {
1255                emitted_abs.insert(abs);
1256            }
1257        }
1258    }
1259
1260    Ok(())
1261}
1262
/// Absolute byte offsets of every **valid** entry-header line-start (`## […]`)
/// in `buf`, where `buf` begins at absolute offset `base`.
///
/// Only a `## [` line that [`Log::parse_header`] accepts is an entry boundary,
/// mirroring the forward parser ([`parse_entries`]), which folds an unparseable
/// `## [` line into the preceding entry's note rather than starting a new entry.
/// Without this validity check the reverse reader would split a real entry's
/// multi-line note at a continuation line beginning at column 0 with `## [`
/// (a shape the SPEC permits — notes are "one or more lines" with no
/// restriction), truncating the note and dropping the carved pseudo-entry, so
/// `tail`/`since`/`last_validate_at` would return a note diverging from the
/// intact on-disk bytes.
///
/// Whole-buffer convenience wrapper over [`header_offsets_range`], compiled
/// only for tests: the runtime reverse reader scans incrementally (one
/// freshly-read window per backward block), and this whole-buffer form is the
/// oracle the range-scan tests check that incremental scan against.
#[cfg(test)]
fn header_offsets(buf: &[u8], base: u64) -> Vec<u64> {
    // The whole-buffer oracle treats `base` as the file start iff it is 0, so a
    // `## [` at buffer index 0 is a real line-start there.
    header_offsets_range(buf, base, 0, buf.len(), base == 0)
}
1286
1287/// Like [`header_offsets`] but only reports header *markers whose `#` starts in*
1288/// `buf[scan_lo..scan_hi)`, while still consulting bytes outside that window —
1289/// to the left for the line-start (`buf[i-1] == b'\n'`) check and to the right
1290/// for the header line's content. This is the incremental scan
1291/// [`reverse_collect`] uses: each backward block searches only the freshly-
1292/// prepended region for *new* markers, so total header-scan work is linear in
1293/// the file size, not the O(file²) of re-scanning the whole growing buffer on
1294/// every block.
1295///
1296/// A `## [` marker that *straddles* the boundary (its `#` in the new block, its
1297/// `[` or trailing bytes in the already-scanned region) is still detected here:
1298/// its `#` index is `< scan_hi`, so it falls in this window, and it was never
1299/// reported by an earlier scan (whose window was `[block, …)`, strictly to the
1300/// right of this one) — so each marker is reported exactly once across all
1301/// blocks.
1302///
1303/// **Left-edge line-start safety.** A `## [` whose `#` is at buffer index 0 has
1304/// no buffered left neighbour, so its line-start cannot be confirmed unless
1305/// index 0 really is the file start. `base_is_file_start` says so: when it is
1306/// `false`, an index-0 candidate is DEFERRED (not reported) rather than assumed
1307/// to be at a line start — otherwise a mid-line `## […]` fragment that happens
1308/// to align with a block's left edge would be fabricated into an entry,
1309/// truncating the real entry's note and (after rotation) corrupting the
1310/// append-only archive. The caller re-scans that byte on the next block, once
1311/// its left neighbour is buffered, so a genuine boundary header is still found
1312/// exactly once.
1313fn header_offsets_range(
1314    buf: &[u8],
1315    base: u64,
1316    scan_lo: usize,
1317    scan_hi: usize,
1318    base_is_file_start: bool,
1319) -> Vec<u64> {
1320    const PAT: &[u8] = b"## [";
1321    let mut out = Vec::new();
1322    let n = buf.len();
1323    let hi = scan_hi.min(n);
1324    let mut i = scan_lo;
1325    // A marker's `#` must start strictly before `hi`; the pattern/line content
1326    // may read past `hi` into `buf` (the right neighbour is already buffered).
1327    while i < hi && i + PAT.len() <= n {
1328        if &buf[i..i + PAT.len()] == PAT {
1329            // Index 0 is a line start only when it is the genuine file start;
1330            // otherwise its left neighbour is unbuffered and the candidate is
1331            // deferred to the next block (see the doc comment).
1332            let at_line_start = if i == 0 {
1333                base_is_file_start
1334            } else {
1335                buf[i - 1] == b'\n'
1336            };
1337            if at_line_start && is_valid_header_line(buf, i) {
1338                out.push(base + i as u64);
1339                // skip ahead past this marker
1340                i += PAT.len();
1341                continue;
1342            }
1343        }
1344        i += 1;
1345    }
1346    out
1347}
1348
1349/// Whether the `## [` line starting at byte `i` in `buf` parses as a valid
1350/// entry header. Reads the line up to (but not including) the next `\n` (or
1351/// buffer end) and defers to [`Log::parse_header`] — the same validity gate the
1352/// forward parser applies, keeping the reverse reader's boundary set identical
1353/// to the forward one.
1354fn is_valid_header_line(buf: &[u8], i: usize) -> bool {
1355    let line_end = buf[i..]
1356        .iter()
1357        .position(|&b| b == b'\n')
1358        .map(|p| i + p)
1359        .unwrap_or(buf.len());
1360    let line = String::from_utf8_lossy(&buf[i..line_end]);
1361    Log::parse_header(&line).is_some()
1362}
1363
/// Returns the (lossily decoded) text of one entry from `buf`, whose first
/// byte sits at absolute offset `base`.
///
/// The entry starts at absolute offset `header_abs` and ends where the next
/// header (`headers[idx + 1]`) begins, or at the end of `buf` when `idx` is
/// the last header.
fn entry_text_at(buf: &[u8], base: u64, header_abs: u64, headers: &[u64], idx: usize) -> String {
    let start = (header_abs - base) as usize;
    let end = headers
        .get(idx + 1)
        .map_or(buf.len(), |&next_abs| (next_abs - base) as usize);
    String::from_utf8_lossy(&buf[start..end]).into_owned()
}
1376
1377/// Parse a single entry from a text block that begins at its header line.
1378fn parse_single_entry(text: &str) -> Option<LogEntry> {
1379    parse_entries(text).into_iter().next()
1380}
1381
1382#[cfg(test)]
1383mod tests {
1384    use super::*;
1385    use crate::parser::Config;
1386    use std::fs;
1387    use tempfile::TempDir;
1388
1389    /// Build a `Store` rooted at a fresh temp dir with a minimal `DB.md`.
1390    /// Construct the `Store` struct directly so the test stays narrow and never
1391    /// exercises the `Store::open` parser path.
1392    fn temp_store() -> (TempDir, Store) {
1393        let dir = tempfile::tempdir().expect("tempdir");
1394        fs::write(dir.path().join("DB.md"), "---\ntype: db-md\n---\n").expect("write DB.md");
1395        let store = Store {
1396            root: dir.path().to_path_buf(),
1397            config: Config::default(),
1398        };
1399        (dir, store)
1400    }
1401
1402    /// Regression (adversarial review): a hand-created / externally-produced
1403    /// archive with an out-of-range month (`00`, `13`..`99`) must NOT parse as a
1404    /// real month archive — otherwise its `(year, 0)` bucket sorts below every
1405    /// legitimate month and the newest-first early-break in `since`/`tail` can
1406    /// silently prune it. Out-of-range → `None` (the caller scans it instead).
1407    #[test]
1408    fn archive_year_month_rejects_out_of_range_months() {
1409        use std::path::Path;
1410        assert_eq!(
1411            archive_year_month(Path::new("log/2026-05.md")),
1412            Some((2026, 5))
1413        );
1414        assert_eq!(
1415            archive_year_month(Path::new("log/2026-01.md")),
1416            Some((2026, 1))
1417        );
1418        assert_eq!(
1419            archive_year_month(Path::new("log/2026-12.md")),
1420            Some((2026, 12))
1421        );
1422        for bad in ["log/2026-00.md", "log/2026-13.md", "log/2026-99.md"] {
1423            assert_eq!(
1424                archive_year_month(Path::new(bad)),
1425                None,
1426                "{bad} has an out-of-range month and must not parse as an archive"
1427            );
1428        }
1429    }
1430
1431    /// A timestamp at UTC from `YYYY-MM-DD HH:MM` components.
1432    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<FixedOffset> {
1433        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
1434            .unwrap()
1435            .and_hms_opt(h, mi, 0)
1436            .unwrap();
1437        FixedOffset::east_opt(0)
1438            .unwrap()
1439            .from_local_datetime(&naive)
1440            .single()
1441            .unwrap()
1442    }
1443
1444    #[allow(clippy::too_many_arguments)] // test fixture builder; struct-ifying churns every call site
1445    fn entry(
1446        y: i32,
1447        mo: u32,
1448        d: u32,
1449        h: u32,
1450        mi: u32,
1451        kind: LogKind,
1452        object: Option<&str>,
1453        note: &str,
1454    ) -> LogEntry {
1455        LogEntry {
1456            timestamp: ts(y, mo, d, h, mi),
1457            kind,
1458            object: object.map(|s| s.to_string()),
1459            note: note.to_string(),
1460        }
1461    }
1462
1463    // ── parse_header ────────────────────────────────────────────────────────
1464
1465    #[test]
1466    fn parse_header_with_object() {
1467        let (t, k, o) =
1468            Log::parse_header("## [2026-05-27 10:00] ingest | sources/emails/x.eml").unwrap();
1469        assert_eq!(t, ts(2026, 5, 27, 10, 0));
1470        assert_eq!(k, LogKind::Ingest);
1471        assert_eq!(o.as_deref(), Some("sources/emails/x.eml"));
1472    }
1473
1474    #[test]
1475    fn parse_header_without_object_is_none_object() {
1476        let (t, k, o) = Log::parse_header("## [2026-05-27 10:20] validate").unwrap();
1477        assert_eq!(t, ts(2026, 5, 27, 10, 20));
1478        assert_eq!(k, LogKind::Validate);
1479        assert_eq!(o, None);
1480    }
1481
1482    #[test]
1483    fn parse_header_custom_kind_roundtrips_token() {
1484        let (_, k, o) = Log::parse_header("## [2026-05-27 10:00] proposal | records/x").unwrap();
1485        assert_eq!(k, LogKind::Custom("proposal".to_string()));
1486        assert!(!k.is_recognized());
1487        assert_eq!(o.as_deref(), Some("records/x"));
1488    }
1489
1490    #[test]
1491    fn parse_header_index_rebuild_hyphenated_kind() {
1492        let (_, k, _) = Log::parse_header("## [2026-05-27 10:00] index-rebuild").unwrap();
1493        assert_eq!(k, LogKind::IndexRebuild);
1494        assert_eq!(k.as_str(), "index-rebuild");
1495    }
1496
1497    #[test]
1498    fn parse_header_rejects_non_headers() {
1499        assert!(Log::parse_header("Not a header").is_none());
1500        assert!(Log::parse_header("# Curator log").is_none());
1501        assert!(Log::parse_header("## [garbage] ingest | x").is_none());
1502        assert!(Log::parse_header("## [2026-05-27 10:00]").is_none()); // no kind
1503                                                                       // A bracketed but non-timestamp date must be rejected (LOG_BAD_TIMESTAMP territory).
1504        assert!(Log::parse_header("## [2026-13-40 99:99] ingest | x").is_none());
1505    }
1506
1507    // ── kind round-trip ───────────────────────────────────────────────────────
1508
1509    #[test]
1510    fn kind_as_str_parse_roundtrip_for_all_recognized() {
1511        for k in [
1512            LogKind::Ingest,
1513            LogKind::Create,
1514            LogKind::Update,
1515            LogKind::Delete,
1516            LogKind::Rename,
1517            LogKind::Link,
1518            LogKind::Validate,
1519            LogKind::IndexRebuild,
1520            LogKind::Contradiction,
1521        ] {
1522            assert_eq!(LogKind::parse(k.as_str()), k);
1523            assert!(k.is_recognized());
1524        }
1525    }
1526
1527    // ── append: creation + frontmatter ───────────────────────────────────────
1528
1529    #[test]
1530    fn append_creates_log_with_frontmatter_and_entry() {
1531        let (_d, store) = temp_store();
1532        let e = entry(
1533            2026,
1534            5,
1535            27,
1536            10,
1537            0,
1538            LogKind::Ingest,
1539            Some("sources/emails/x.eml"),
1540            "Email received.",
1541        );
1542        Log::append(&store, &e).unwrap();
1543
1544        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1545        // type: log frontmatter present.
1546        assert!(
1547            content.starts_with("---\ntype: log\n---\n"),
1548            "missing log frontmatter; got:\n{content}"
1549        );
1550        // The entry header is rendered verbatim.
1551        assert!(content.contains("## [2026-05-27 10:00] ingest | sources/emails/x.eml"));
1552        assert!(content.contains("Email received."));
1553        // No archive dir created when nothing rotates.
1554        assert!(!store.root.join("log").exists());
1555    }
1556
1557    // ── append → tail → since round-trip ─────────────────────────────────────
1558
1559    #[test]
1560    fn append_tail_since_roundtrip() {
1561        let (_d, store) = temp_store();
1562        let e1 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "first");
1563        let e2 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "second");
1564        let e3 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "third");
1565        Log::append(&store, &e1).unwrap();
1566        Log::append(&store, &e2).unwrap();
1567        Log::append(&store, &e3).unwrap();
1568
1569        // tail(2) returns the two newest, in chronological order.
1570        let tail = Log::tail(&store, 2).unwrap();
1571        assert_eq!(tail.len(), 2);
1572        assert_eq!(tail[0], e2);
1573        assert_eq!(tail[1], e3);
1574
1575        // tail(n) larger than the log returns everything, chronologically.
1576        let all = Log::tail(&store, 99).unwrap();
1577        assert_eq!(all, vec![e1.clone(), e2.clone(), e3.clone()]);
1578
1579        // since(10:05) returns strictly-newer entries (excludes the 10:05 one).
1580        let since = Log::since(&store, ts(2026, 5, 27, 10, 5)).unwrap();
1581        assert_eq!(since, vec![e3.clone()]);
1582
1583        // since before everything returns all.
1584        let since_all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1585        assert_eq!(since_all, vec![e1, e2, e3]);
1586    }
1587
1588    #[test]
1589    fn tail_zero_is_empty() {
1590        let (_d, store) = temp_store();
1591        Log::append(
1592            &store,
1593            &entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "x"),
1594        )
1595        .unwrap();
1596        assert!(Log::tail(&store, 0).unwrap().is_empty());
1597    }
1598
1599    #[test]
1600    fn tail_and_since_on_missing_log_are_empty() {
1601        let (_d, store) = temp_store();
1602        assert!(Log::tail(&store, 5).unwrap().is_empty());
1603        assert!(Log::since(&store, ts(2000, 1, 1, 0, 0)).unwrap().is_empty());
1604        assert!(Log::last_validate_at(&store).unwrap().is_none());
1605    }
1606
1607    #[test]
1608    fn since_exact_timestamp_is_exclusive() {
1609        let (_d, store) = temp_store();
1610        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "PASS");
1611        Log::append(&store, &e).unwrap();
1612        // Equal timestamp must NOT be included (strictly newer).
1613        assert!(Log::since(&store, ts(2026, 5, 27, 10, 0))
1614            .unwrap()
1615            .is_empty());
1616    }
1617
1618    // ── since: out-of-order on disk (append-only correction / merge=union) ────
1619
1620    /// Write a `log.md` at the store root from `entries` in the EXACT given
1621    /// physical order, with the standard `type: log` frontmatter. Unlike
1622    /// [`Log::append`] (which always lands the newest entry at EOF), this lets a
1623    /// test author the non-monotonic on-disk shape the SPEC permits — a
1624    /// backdated corrective entry below the entry it corrects, or a
1625    /// `merge=union` interleave.
1626    fn write_raw_log(store: &Store, entries: &[LogEntry]) {
1627        let mut content = String::from(LOG_FRONTMATTER);
1628        content.push('\n');
1629        for e in entries {
1630            content.push_str(&e.render());
1631        }
1632        fs::write(store.root.join("log.md"), content).expect("write raw log.md");
1633    }
1634
1635    #[test]
1636    fn since_returns_newer_entries_even_when_disk_order_is_non_monotonic() {
1637        // The demonstrated regression: a curator appended a backdated CORRECTIVE
1638        // entry (10:00) below newer entries (10:10, 10:05), so the physical
1639        // on-disk order is 10:10, 10:05, 10:00 — newest-first, not chronological.
1640        // The append-only SPEC explicitly permits this ("append a corrective
1641        // entry below it"; out-of-order is only LOG_OUT_OF_ORDER, a warning).
1642        let (_d, store) = temp_store();
1643        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "newest");
1644        let e_1005 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "middle");
1645        let e_1000 = entry(
1646            2026,
1647            5,
1648            27,
1649            10,
1650            0,
1651            LogKind::Update,
1652            Some("a"),
1653            "backdated fix",
1654        );
1655        // Physical order on disk: 10:10, 10:05, then the backdated 10:00 LAST.
1656        write_raw_log(&store, &[e_1010, e_1005, e_1000]);
1657
1658        // since 10:02 must return BOTH entries strictly newer than 10:02
1659        // (10:05 and 10:10). The old early-stop hit the physically-last 10:00
1660        // entry (<= 10:02), stopped, and returned EMPTY — silently dropping the
1661        // two newer entries that sit earlier in the file.
1662        let got = Log::since(&store, ts(2026, 5, 27, 10, 2)).unwrap();
1663        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1664        assert_eq!(
1665            stamps,
1666            [ts(2026, 5, 27, 10, 5), ts(2026, 5, 27, 10, 10)]
1667                .into_iter()
1668                .collect(),
1669            "since(10:02) must include both 10:05 and 10:10 despite the backdated \
1670             10:00 entry sitting physically last, and exclude 10:00; got {got:?}"
1671        );
1672
1673        // A cutoff before everything still returns all three, regardless of the
1674        // scrambled disk order.
1675        let all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1676        let all_stamps: std::collections::BTreeSet<_> = all.iter().map(|e| e.timestamp).collect();
1677        assert_eq!(
1678            all_stamps,
1679            [
1680                ts(2026, 5, 27, 10, 0),
1681                ts(2026, 5, 27, 10, 5),
1682                ts(2026, 5, 27, 10, 10),
1683            ]
1684            .into_iter()
1685            .collect()
1686        );
1687    }
1688
1689    #[test]
1690    fn since_crosses_archive_when_newer_entry_is_out_of_order_inside_it() {
1691        // Out-of-order INSIDE an archive month, with the cutoff landing in that
1692        // month. The April archive is authored newest-physical-first (04-20,
1693        // then a backdated 04-05 last); a naive early-stop on the first
1694        // older-than-cutoff entry would miss the later April entry. The active
1695        // file holds a clean May entry. Cutoff = mid-April.
1696        let (_d, store) = temp_store();
1697
1698        // Active file: one current-month (May) entry.
1699        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1700        write_raw_log(&store, &[may]);
1701
1702        // April archive authored out of order: 04-20 first, backdated 04-05 last.
1703        let apr_late = entry(
1704            2026,
1705            4,
1706            20,
1707            9,
1708            0,
1709            LogKind::Create,
1710            Some("apr-b"),
1711            "apr-late",
1712        );
1713        let apr_early = entry(
1714            2026,
1715            4,
1716            5,
1717            9,
1718            0,
1719            LogKind::Ingest,
1720            Some("apr-a"),
1721            "apr-early",
1722        );
1723        let dir = store.root.join("log");
1724        fs::create_dir_all(&dir).unwrap();
1725        let mut arch = String::from(LOG_FRONTMATTER);
1726        arch.push('\n');
1727        arch.push_str(&apr_late.render());
1728        arch.push_str(&apr_early.render());
1729        fs::write(dir.join("2026-04.md"), arch).unwrap();
1730
1731        // since mid-April: the later April entry (04-20) AND the May entry must
1732        // come back; the early April entry (04-05) must not.
1733        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1734        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1735        assert_eq!(
1736            stamps,
1737            [ts(2026, 4, 20, 9, 0), ts(2026, 5, 2, 8, 0)]
1738                .into_iter()
1739                .collect(),
1740            "since(mid-April) must include the out-of-order later April entry \
1741             and the May entry, and exclude the earlier April entry; got {got:?}"
1742        );
1743    }
1744
1745    // ── multi-line notes ──────────────────────────────────────────────────────
1746
1747    #[test]
1748    fn multiline_note_is_preserved() {
1749        let (_d, store) = temp_store();
1750        let e = entry(
1751            2026,
1752            5,
1753            27,
1754            10,
1755            0,
1756            LogKind::Create,
1757            Some("records/x"),
1758            "Line one.\nLine two.\nLine three.",
1759        );
1760        Log::append(&store, &e).unwrap();
1761        let got = Log::tail(&store, 1).unwrap();
1762        assert_eq!(got[0].note, "Line one.\nLine two.\nLine three.");
1763    }
1764
1765    #[test]
1766    fn empty_note_roundtrips_as_empty() {
1767        let (_d, store) = temp_store();
1768        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "");
1769        Log::append(&store, &e).unwrap();
1770        let got = Log::tail(&store, 1).unwrap();
1771        assert_eq!(got[0], e);
1772        assert_eq!(got[0].note, "");
1773    }
1774
1775    // ── last_validate_at ─────────────────────────────────────────────────────
1776
1777    #[test]
1778    fn last_validate_at_finds_most_recent_validate() {
1779        let (_d, store) = temp_store();
1780        Log::append(
1781            &store,
1782            &entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "first pass"),
1783        )
1784        .unwrap();
1785        Log::append(
1786            &store,
1787            &entry(2026, 5, 27, 10, 5, LogKind::Create, Some("a"), "made a"),
1788        )
1789        .unwrap();
1790        Log::append(
1791            &store,
1792            &entry(2026, 5, 27, 10, 10, LogKind::Validate, None, "second pass"),
1793        )
1794        .unwrap();
1795        Log::append(
1796            &store,
1797            &entry(2026, 5, 27, 10, 15, LogKind::Update, Some("a"), "edit a"),
1798        )
1799        .unwrap();
1800
1801        let last = Log::last_validate_at(&store).unwrap();
1802        assert_eq!(last, Some(ts(2026, 5, 27, 10, 10)));
1803    }
1804
1805    #[test]
1806    fn last_validate_at_none_when_no_validate() {
1807        let (_d, store) = temp_store();
1808        Log::append(
1809            &store,
1810            &entry(2026, 5, 27, 10, 0, LogKind::Create, Some("a"), "x"),
1811        )
1812        .unwrap();
1813        assert_eq!(Log::last_validate_at(&store).unwrap(), None);
1814    }
1815
1816    // ── month-boundary rotation ──────────────────────────────────────────────
1817
1818    #[test]
1819    fn rotation_rolls_prior_months_into_archives() {
1820        let (_d, store) = temp_store();
1821        // Two April entries and one May entry, all written while "current" was
1822        // their own month (append-only chronological order).
1823        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1824        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1825        Log::append(&store, &a1).unwrap();
1826        Log::append(&store, &a2).unwrap();
1827
1828        // Before rotation: no archive dir, both April entries in active.
1829        assert!(!store.root.join("log").exists());
1830
1831        // Appending a May entry must roll April into log/2026-04.md.
1832        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
1833        Log::append(&store, &m1).unwrap();
1834
1835        // Archive exists and holds both April entries with frontmatter.
1836        let arch_path = store.root.join("log").join("2026-04.md");
1837        assert!(arch_path.exists(), "expected April archive to be created");
1838        let arch = fs::read_to_string(&arch_path).unwrap();
1839        assert!(arch.starts_with("---\ntype: log\n---\n"));
1840        assert!(arch.contains("## [2026-04-10 09:00] ingest | apr-a"));
1841        assert!(arch.contains("## [2026-04-20 09:00] create | apr-b"));
1842        assert!(arch.contains("apr one"));
1843        assert!(arch.contains("apr two"));
1844
1845        // Active file now holds ONLY the May entry (no April entries).
1846        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1847        assert!(active.contains("## [2026-05-02 08:00] update | may-a"));
1848        assert!(
1849            !active.contains("apr-a") && !active.contains("apr-b"),
1850            "April entries must be gone from the active file; got:\n{active}"
1851        );
1852
1853        // The full timeline (archives ++ active) is intact and chronological.
1854        let all = Log::tail(&store, 99).unwrap();
1855        assert_eq!(all, vec![a1, a2, m1]);
1856    }
1857
1858    #[test]
1859    fn rotation_groups_distinct_prior_months_into_separate_archives() {
1860        let (_d, store) = temp_store();
1861        // March + April entries accumulate, then a May append rolls BOTH prior
1862        // months into their own archive files.
1863        let mar = entry(2026, 3, 5, 9, 0, LogKind::Ingest, Some("mar"), "march");
1864        let apr = entry(2026, 4, 5, 9, 0, LogKind::Create, Some("apr"), "april");
1865        Log::append(&store, &mar).unwrap();
1866        Log::append(&store, &apr).unwrap();
1867        // At this point April is current, March already rolled into its archive.
1868        assert!(store.root.join("log").join("2026-03.md").exists());
1869
1870        let may = entry(2026, 5, 5, 9, 0, LogKind::Update, Some("may"), "may");
1871        Log::append(&store, &may).unwrap();
1872
1873        assert!(store.root.join("log").join("2026-03.md").exists());
1874        assert!(store.root.join("log").join("2026-04.md").exists());
1875
1876        // Each archive holds only its own month.
1877        let mar_arch = fs::read_to_string(store.root.join("log").join("2026-03.md")).unwrap();
1878        let apr_arch = fs::read_to_string(store.root.join("log").join("2026-04.md")).unwrap();
1879        assert!(mar_arch.contains("mar") && !mar_arch.contains("apr"));
1880        assert!(apr_arch.contains("apr") && !apr_arch.contains("mar"));
1881
1882        // Active holds only May.
1883        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1884        assert!(active.contains("may") && !active.contains("mar") && !active.contains("apr"));
1885
1886        // Timeline intact and ordered across both archives + active.
1887        let all = Log::tail(&store, 99).unwrap();
1888        assert_eq!(all, vec![mar, apr, may]);
1889    }
1890
1891    #[test]
1892    fn tail_crosses_into_archive_when_n_spans_month_boundary() {
1893        let (_d, store) = temp_store();
1894        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1895        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1896        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1897        let m2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may2");
1898        for e in [&a1, &a2, &m1, &m2] {
1899            Log::append(&store, e).unwrap();
1900        }
1901        // April is now archived; active holds only May. tail(3) must reach back
1902        // into the archive for the third-newest entry.
1903        let tail3 = Log::tail(&store, 3).unwrap();
1904        assert_eq!(tail3, vec![a2.clone(), m1.clone(), m2.clone()]);
1905
1906        // tail within the active month does NOT need the archive but is still
1907        // correct.
1908        let tail2 = Log::tail(&store, 2).unwrap();
1909        assert_eq!(tail2, vec![m1, m2]);
1910    }
1911
1912    #[test]
1913    fn since_crosses_into_archive_and_early_stops() {
1914        let (_d, store) = temp_store();
1915        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1916        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1917        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1918        for e in [&a1, &a2, &m1] {
1919            Log::append(&store, e).unwrap();
1920        }
1921        // since a mid-April time: must include the later April entry (from the
1922        // archive) and the May entry, but not the earlier April one.
1923        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1924        assert_eq!(got, vec![a2, m1]);
1925    }
1926
1927    #[test]
1928    fn last_validate_at_crosses_into_archive() {
1929        let (_d, store) = temp_store();
1930        // A validate in April, then non-validate work that rolls April away.
1931        Log::append(
1932            &store,
1933            &entry(2026, 4, 10, 9, 0, LogKind::Validate, None, "apr validate"),
1934        )
1935        .unwrap();
1936        Log::append(
1937            &store,
1938            &entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may work"),
1939        )
1940        .unwrap();
1941        // Active has only the May update; the most-recent validate lives in the
1942        // April archive and must still be found.
1943        let last = Log::last_validate_at(&store).unwrap();
1944        assert_eq!(last, Some(ts(2026, 4, 10, 9, 0)));
1945    }
1946
1947    // ── reverse-read correctness on a large (multi-block) log ────────────────
1948
1949    #[test]
1950    fn reverse_read_correct_on_large_single_month_log() {
1951        let (_d, store) = temp_store();
1952        // Append many same-month entries with chunky multi-line notes so the
1953        // file spans well past one REVERSE_BLOCK (8 KiB). Timestamps are
1954        // strictly increasing (a real append-only log is monotonic): each entry
1955        // is 3 minutes after the previous, all within June, so physical order
1956        // equals chronological order and the last-k-physical ARE the k-newest.
1957        let n = 400usize;
1958        let mut expected: Vec<LogEntry> = Vec::new();
1959        for i in 0..n {
1960            let total_min = (i as u32) * 3;
1961            let day = 1 + total_min / (24 * 60);
1962            let hour = (total_min / 60) % 24;
1963            let min = total_min % 60;
1964            // Unique, multi-line note to bulk up the file and detect mis-parses.
1965            let note = format!(
1966                "entry number {i}\nbody line A for {i}\nbody line B for {i} with padding {}",
1967                "x".repeat(40)
1968            );
1969            let e = entry(
1970                2026,
1971                6,
1972                day,
1973                hour,
1974                min,
1975                LogKind::Update,
1976                Some(&format!("records/item-{i:04}")),
1977                &note,
1978            );
1979            Log::append(&store, &e).unwrap();
1980            expected.push(e);
1981        }
1982
1983        // File must actually be multi-block to exercise the backward reader.
1984        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
1985        assert!(
1986            size > (REVERSE_BLOCK as u64) * 2,
1987            "test log not large enough ({size} bytes) to exercise multi-block reverse-read"
1988        );
1989
1990        // tail(5) must equal the 5 newest, exactly.
1991        let tail5 = Log::tail(&store, 5).unwrap();
1992        assert_eq!(tail5, expected[n - 5..].to_vec());
1993
1994        // tail(50) must equal the 50 newest.
1995        let tail50 = Log::tail(&store, 50).unwrap();
1996        assert_eq!(tail50, expected[n - 50..].to_vec());
1997
1998        // tail(all) must reconstruct the whole timeline in order.
1999        let all = Log::tail(&store, n + 10).unwrap();
2000        assert_eq!(all.len(), n);
2001        assert_eq!(all, expected);
2002    }
2003
2004    // ── tail on OUT-OF-ORDER logs (newest-by-timestamp, not last-physical) ────
2005    //
2006    // The append-only contract is non-decreasing time order, but it's only a
2007    // `LOG_OUT_OF_ORDER` warning when violated (corrective entries land below
2008    // the entry they correct; backdated / clock-skewed writes; `merge=union`
2009    // clone merges). `tail N` must return the N newest *by timestamp*, never the
2010    // last N *physical* entries.
2011
2012    /// Write `log.md` verbatim from rendered entries in the given **physical
2013    /// (file) order**, bypassing `Log::append` so the test controls on-disk
2014    /// order exactly (append never reorders within a month, but this is the
2015    /// clearest way to pin a specific physical layout).
2016    fn write_log_physical(store: &Store, entries: &[LogEntry]) {
2017        let mut body = String::new();
2018        for e in entries {
2019            body.push_str(&e.render());
2020        }
2021        let full = compose_active(LOG_FRONTMATTER, &body);
2022        fs::write(store.root.join("log.md"), full).expect("write log.md");
2023    }
2024
2025    #[test]
2026    fn tail_returns_newest_by_timestamp_on_demonstrated_out_of_order_log() {
2027        // The exact case from the review finding: physical order 10:10, 10:05,
2028        // 10:00 (a backdated entry tail). The OLD code returned the last two
2029        // physical entries {10:05, 10:00}; the correct answer is the two newest
2030        // by time {10:05, 10:10}.
2031        let (_d, store) = temp_store();
2032        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "ten-ten");
2033        let e_1005 = entry(
2034            2026,
2035            5,
2036            27,
2037            10,
2038            5,
2039            LogKind::Create,
2040            Some("b"),
2041            "ten-oh-five",
2042        );
2043        let e_1000 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "ten-oh-oh");
2044        // Physical order: newest first, then the two older ones — out of order.
2045        write_log_physical(&store, &[e_1010.clone(), e_1005.clone(), e_1000.clone()]);
2046
2047        let tail2 = Log::tail(&store, 2).unwrap();
2048        assert_eq!(
2049            tail2,
2050            vec![e_1005.clone(), e_1010.clone()],
2051            "tail(2) must be the two NEWEST by timestamp (chronological), \
2052             not the last two physical entries"
2053        );
2054        // The newest entry must be present and the oldest absent.
2055        assert!(tail2.contains(&e_1010), "newest (10:10) must be included");
2056        assert!(!tail2.contains(&e_1000), "oldest (10:00) must be excluded");
2057
2058        // tail(1) is just the single newest.
2059        assert_eq!(Log::tail(&store, 1).unwrap(), vec![e_1010.clone()]);
2060        // tail(all) is the full set in chronological order.
2061        assert_eq!(Log::tail(&store, 99).unwrap(), vec![e_1000, e_1005, e_1010]);
2062    }
2063
2064    #[test]
2065    fn tail_no_early_stop_when_newer_entry_sits_before_an_older_one() {
2066        // Guards the unsound within-file early stop: a newer entry (10:50) sits
2067        // PHYSICALLY BEFORE a much older one (10:00). Reading newest-physical-
2068        // first, the scan meets 10:00 before 10:50; any "stop at the first entry
2069        // below the window minimum" rule would bail and drop 10:50.
2070        //
2071        // Physical (top→bottom): 10:55, 10:10, 10:50, 10:00.
2072        // Reverse-scan order:     10:00, 10:50, 10:10, 10:55.
2073        let (_d, store) = temp_store();
2074        let e55 = entry(2026, 5, 27, 10, 55, LogKind::Update, Some("x55"), "55");
2075        let e10 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("x10"), "10");
2076        let e50 = entry(2026, 5, 27, 10, 50, LogKind::Update, Some("x50"), "50");
2077        let e00 = entry(2026, 5, 27, 10, 0, LogKind::Update, Some("x00"), "00");
2078        write_log_physical(
2079            &store,
2080            &[e55.clone(), e10.clone(), e50.clone(), e00.clone()],
2081        );
2082
2083        // The two newest by timestamp are 10:55 and 10:50 — NOT the early-stop
2084        // victim 10:10, and NOT the last-physical 10:00.
2085        let tail2 = Log::tail(&store, 2).unwrap();
2086        assert_eq!(tail2, vec![e50.clone(), e55.clone()]);
2087
2088        let tail3 = Log::tail(&store, 3).unwrap();
2089        assert_eq!(tail3, vec![e10.clone(), e50.clone(), e55.clone()]);
2090    }
2091
2092    #[test]
2093    fn tail_orders_equal_timestamps_by_physical_recency() {
2094        // Three entries share 10:00; one is at 09:59. tail(2) must keep both
2095        // 10:00 entries, and among the equal pair the one appended LATER
2096        // (physically last) sorts last ("newest" = most-recently recorded).
2097        let (_d, store) = temp_store();
2098        let early = entry(2026, 5, 27, 9, 59, LogKind::Create, Some("early"), "before");
2099        let tie_a = entry(
2100            2026,
2101            5,
2102            27,
2103            10,
2104            0,
2105            LogKind::Update,
2106            Some("tie-a"),
2107            "first 10:00",
2108        );
2109        let tie_b = entry(
2110            2026,
2111            5,
2112            27,
2113            10,
2114            0,
2115            LogKind::Update,
2116            Some("tie-b"),
2117            "second 10:00",
2118        );
2119        // Physical append order: early, tie_a, tie_b.
2120        write_log_physical(&store, &[early.clone(), tie_a.clone(), tie_b.clone()]);
2121
2122        let tail2 = Log::tail(&store, 2).unwrap();
2123        assert_eq!(
2124            tail2,
2125            vec![tie_a.clone(), tie_b.clone()],
2126            "both 10:00 entries kept, physically-later one (tie_b) last; 09:59 dropped"
2127        );
2128        // tail(1) keeps only the most-recently-recorded of the equal pair.
2129        assert_eq!(Log::tail(&store, 1).unwrap(), vec![tie_b]);
2130    }
2131
2132    #[test]
2133    fn tail_finds_newest_across_a_backdated_entry_spanning_the_month_boundary() {
2134        // A backdated entry can land physically after newer entries even across
2135        // a rotation: append May entries, then a June entry (rolls May to its
2136        // archive), then append a May-dated correction — it goes into the ACTIVE
2137        // file, physically after June. tail must still rank by timestamp, so the
2138        // June entry stays newest and the backdated May entry is not mistaken
2139        // for the tail.
2140        let (_d, store) = temp_store();
2141        let may1 = entry(2026, 5, 10, 9, 0, LogKind::Ingest, Some("may-1"), "may one");
2142        let may2 = entry(2026, 5, 20, 9, 0, LogKind::Create, Some("may-2"), "may two");
2143        let jun1 = entry(2026, 6, 2, 8, 0, LogKind::Update, Some("jun-1"), "jun one");
2144        Log::append(&store, &may1).unwrap();
2145        Log::append(&store, &may2).unwrap();
2146        Log::append(&store, &jun1).unwrap(); // rotates May -> log/2026-05.md
2147        assert!(store.root.join("log").join("2026-05.md").exists());
2148
2149        // A backdated May correction, appended now: it lands in the active file
2150        // (its month May is not strictly before the active month June), so the
2151        // active file is physically [jun1, may_corr] — out of order.
2152        let may_corr = entry(
2153            2026,
2154            5,
2155            25,
2156            9,
2157            0,
2158            LogKind::Update,
2159            Some("may-2"),
2160            "may correction",
2161        );
2162        Log::append(&store, &may_corr).unwrap();
2163        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
2164        assert!(
2165            active.contains("jun-1") && active.contains("may correction"),
2166            "backdated May entry should be in the active file alongside June; got:\n{active}"
2167        );
2168
2169        // The single newest by timestamp is the June entry, even though the
2170        // backdated May entry is physically last.
2171        assert_eq!(Log::tail(&store, 1).unwrap(), vec![jun1.clone()]);
2172
2173        // tail(2): the two newest by time are may_corr (05-25) and jun1 (06-02).
2174        let tail2 = Log::tail(&store, 2).unwrap();
2175        assert_eq!(tail2, vec![may_corr.clone(), jun1.clone()]);
2176
2177        // tail(3) must reach into the May archive for the third-newest (may2,
2178        // 05-20), proving archive crossing still works on an out-of-order store.
2179        let tail3 = Log::tail(&store, 3).unwrap();
2180        assert_eq!(tail3, vec![may2.clone(), may_corr.clone(), jun1.clone()]);
2181
2182        // tail(all) reconstructs the whole timeline in chronological order.
2183        let all = Log::tail(&store, 99).unwrap();
2184        assert_eq!(all, vec![may1, may2, may_corr, jun1]);
2185    }
2186
2187    #[test]
2188    fn parse_entries_skips_unparseable_header_folding_into_body() {
2189        // A `## [` line that is NOT a valid header should not start a new entry;
2190        // it folds into the preceding entry's note. This guards the
2191        // parse_entries header-validation branch.
2192        let text = "\
2193## [2026-05-27 10:00] create | records/x
2194Body mentions a literal: ## [not a real header here]
2195More body.
2196
2197## [2026-05-27 10:05] update | records/y
2198Second.
2199";
2200        let entries = parse_entries(text);
2201        assert_eq!(entries.len(), 2);
2202        assert_eq!(entries[0].kind, LogKind::Create);
2203        assert!(entries[0].note.contains("## [not a real header here]"));
2204        assert!(entries[0].note.contains("More body."));
2205        assert_eq!(entries[1].kind, LogKind::Update);
2206        assert_eq!(entries[1].note, "Second.");
2207    }
2208
2209    // ── append-only: corrective entries go on the end ─────────────────────────
2210
2211    #[test]
2212    fn append_only_corrective_entry_goes_on_end_without_rewriting() {
2213        let (_d, store) = temp_store();
2214        let original = entry(
2215            2026,
2216            5,
2217            27,
2218            10,
2219            0,
2220            LogKind::Update,
2221            Some("records/northstar"),
2222            "Seat count 120 -> 175.",
2223        );
2224        Log::append(&store, &original).unwrap();
2225        let after_first = fs::read_to_string(store.root.join("log.md")).unwrap();
2226
2227        // A correction is a NEW entry appended on the end; the original text is
2228        // left byte-for-byte intact (append-only contract: no rewrite API).
2229        let correction = entry(
2230            2026,
2231            5,
2232            27,
2233            11,
2234            0,
2235            LogKind::Update,
2236            Some("records/northstar"),
2237            "Correction: seat count is 165, not 175.",
2238        );
2239        Log::append(&store, &correction).unwrap();
2240        let after_second = fs::read_to_string(store.root.join("log.md")).unwrap();
2241
2242        assert!(
2243            after_second.starts_with(&after_first),
2244            "appending must not rewrite earlier bytes"
2245        );
2246        assert!(after_second.contains("Correction: seat count is 165, not 175."));
2247
2248        // Both entries are readable, in order.
2249        let all = Log::tail(&store, 99).unwrap();
2250        assert_eq!(all, vec![original, correction]);
2251    }
2252
2253    // ── concurrent append safety (atomic via temp-file rename) ────────────────
2254
    #[test]
    fn concurrent_appends_are_atomic_and_total() {
        // Hammers `Log::append` from several threads at once and then checks
        // that the active log is still structurally intact. NOTE: despite
        // "total" in the name, no exact entry count is asserted below — only
        // integrity (see the comment after the joins).
        use std::sync::{Arc, Barrier};
        use std::thread;

        let (_d, store) = temp_store();
        // Seed the file so all threads take the read-modify-write path.
        Log::append(
            &store,
            &entry(2026, 7, 1, 0, 0, LogKind::Create, Some("seed"), "seed"),
        )
        .unwrap();

        // 8 writer threads, 25 appends each.
        let threads = 8usize;
        let per = 25usize;
        // The barrier releases every writer at the same moment to maximize
        // overlap between their appends.
        let barrier = Arc::new(Barrier::new(threads));
        let store = Arc::new(store);

        let mut handles = Vec::new();
        for tnum in 0..threads {
            let b = Arc::clone(&barrier);
            let s = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                b.wait();
                for i in 0..per {
                    // Hour comes from the thread number and minute from the
                    // item index; the modulo keeps both inside valid clock
                    // ranges. Object and note are unique per (thread, item).
                    let e = entry(
                        2026,
                        7,
                        1,
                        (tnum % 24) as u32,
                        (i % 60) as u32,
                        LogKind::Update,
                        Some(&format!("t{tnum}-i{i}")),
                        &format!("thread {tnum} item {i}"),
                    );
                    Log::append(&s, &e).unwrap();
                }
            }));
        }
        // A panic inside any writer (e.g. a failed append) surfaces here.
        for h in handles {
            h.join().unwrap();
        }

        // The atomic temp-file-rename write means no append truncates or
        // corrupts another: the file must remain parseable and every line of
        // every entry header must be well-formed. Crucially, no entry should be
        // lost to a torn write of the *content already on disk* — though
        // interleaved read-modify-write WILL drop some appends (last-writer-
        // wins on the snapshot). We therefore assert integrity + that the file
        // never went empty / corrupt, not an exact count.
        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
        // The frontmatter block must still open the file.
        assert!(content.starts_with("---\ntype: log\n---\n"));

        // Every `## [` line must parse as a valid header (no half-written line).
        for line in content.lines() {
            if line.starts_with("## [") {
                assert!(
                    Log::parse_header(line).is_some(),
                    "corrupt/torn header line on disk: {line:?}"
                );
            }
        }

        // The seed entry must survive (it was written before the race and
        // every snapshot included it).
        assert!(content.contains("## [2026-07-01 00:00] create | seed"));

        // The reverse reader must still produce a clean, fully-parseable view.
        // 10_000 is far above the 1 + 8 * 25 entries that could exist, so this
        // asks for everything.
        let all = Log::tail(&store, 10_000).unwrap();
        assert!(!all.is_empty());
        // No duplicate adjacent identical headers from a torn write: every
        // returned entry must have a recognized-or-custom kind and a parseable
        // timestamp (already guaranteed by parse), and the list must be
        // internally consistent (re-render → re-parse identity for each).
        for e in &all {
            let rendered = e.render();
            let reparsed = parse_single_entry(&rendered).unwrap();
            assert_eq!(&reparsed, e);
        }
    }
2335
2336    // ── render/parse identity ────────────────────────────────────────────────
2337
2338    #[test]
2339    fn render_then_parse_is_identity() {
2340        let cases = vec![
2341            entry(
2342                2026,
2343                1,
2344                2,
2345                3,
2346                4,
2347                LogKind::Ingest,
2348                Some("sources/a.eml"),
2349                "n",
2350            ),
2351            entry(
2352                2026,
2353                12,
2354                31,
2355                23,
2356                59,
2357                LogKind::Validate,
2358                None,
2359                "PASS - 0 errors",
2360            ),
2361            entry(
2362                2026,
2363                6,
2364                15,
2365                12,
2366                30,
2367                LogKind::Custom("proposal".to_string()),
2368                Some("records/p"),
2369                "multi\nline\nnote",
2370            ),
2371            entry(2026, 6, 15, 12, 30, LogKind::Contradiction, Some("obj"), ""),
2372        ];
2373        for e in cases {
2374            let rendered = e.render();
2375            let parsed = parse_single_entry(&rendered).unwrap_or_else(|| {
2376                panic!("failed to reparse rendered entry:\n{rendered}");
2377            });
2378            assert_eq!(parsed, e, "round-trip mismatch for {e:?}");
2379        }
2380    }
2381
2382    // ── regression: rotation re-roll must not duplicate archive entries (#3) ──
2383
2384    /// Count occurrences of `needle` in `haystack` (non-overlapping).
2385    fn count_occurrences(haystack: &str, needle: &str) -> usize {
2386        haystack.matches(needle).count()
2387    }
2388
2389    #[test]
2390    fn regression_archive_reroll_is_idempotent_after_interrupted_rotation() {
2391        // Reconstructs the finding's exact failure window: rotation is two
2392        // non-atomic durable writes — (1) roll prior-month entries into the
2393        // archive, then (2) trim the active file. If the process crashes or the
2394        // active rewrite errors AFTER step (1) commits, the prior-month entries
2395        // stay in the untrimmed active file, the agent retries, and the retry
2396        // re-rolls the SAME entries into the archive a second time. The
2397        // mechanism is precisely a second `append_to_archive` of identical
2398        // entries onto an archive that already holds them.
2399        let (_d, store) = temp_store();
2400        let dir = archive_dir(&store);
2401        let arch = archive_path(&store, 2026, 4);
2402
2403        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2404        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2405        let month = [apr1.clone(), apr2.clone()];
2406
2407        // First roll: a FRESH rotation (no in-progress marker) appends both.
2408        fs::create_dir_all(&dir).unwrap();
2409        append_to_archive(&arch, &month, false).unwrap();
2410
2411        // The retries are crash-RECOVERIES (the in-progress-rotation marker is
2412        // present), so they dedup the re-rolled identical entries to a no-op.
2413        // Pre-fix this blindly concatenated, doubling every entry; do it twice to
2414        // prove the amplification a real retry loop would cause is suppressed.
2415        append_to_archive(&arch, &month, true).unwrap();
2416        append_to_archive(&arch, &month, true).unwrap();
2417
2418        let archived = fs::read_to_string(&arch).unwrap();
2419        // Each entry header must appear EXACTLY once despite the re-rolls.
2420        assert_eq!(
2421            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2422            1,
2423            "re-rolled archive duplicated the first April entry; got:\n{archived}"
2424        );
2425        assert_eq!(
2426            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2427            1,
2428            "re-rolled archive duplicated the second April entry; got:\n{archived}"
2429        );
2430
2431        // And the reader surface (`since`) must return each entry once, not the
2432        // duplicated set the pre-fix archive would have yielded.
2433        let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2434        assert_eq!(
2435            got,
2436            vec![apr1, apr2],
2437            "since over the re-rolled archive must return each April entry once"
2438        );
2439    }
2440
2441    #[test]
2442    fn regression_rotation_reroll_after_active_untrimmed_does_not_duplicate() {
2443        // End-to-end variant driving the real `Log::append` rotation path. We
2444        // rotate April into its archive via a May append, then SIMULATE the
2445        // partial failure by restoring the pre-trim active file (April + May)
2446        // and re-running `append` — exactly the state a crash-between-the-two-
2447        // writes / failed-active-rewrite + agent-retry produces. The archive
2448        // must still hold each April entry once.
2449        let (_d, store) = temp_store();
2450        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2451        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2452        Log::append(&store, &apr1).unwrap();
2453        Log::append(&store, &apr2).unwrap();
2454
2455        // Snapshot the active file holding both April entries (this is what is
2456        // still on disk if the post-rotation active rewrite never lands).
2457        let active_path = active_log_path(&store);
2458        let pre_rotation_active = fs::read_to_string(&active_path).unwrap();
2459
2460        // A May append rotates April out and trims the active file.
2461        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
2462        Log::append(&store, &may).unwrap();
2463        let arch = archive_path(&store, 2026, 4);
2464        assert!(arch.exists(), "April should have rotated to its archive");
2465
2466        // Simulate the crash/error: the active rewrite never persisted, so the
2467        // active file still contains the (now also archived) April entries.
2468        fs::write(&active_path, &pre_rotation_active).unwrap();
2469        // A real crash leaves the in-progress-rotation marker behind too — it is
2470        // deleted only AFTER the active trim commits. Restore it so the retry is
2471        // recognized as a crash-recovery re-roll (deduped), not a fresh rotation
2472        // (which would correctly append a genuinely-distinct repeat).
2473        fs::write(rotation_marker_path(&store), b"").unwrap();
2474
2475        // The agent retries the append. Re-partitioning sees April as prior
2476        // months again and re-rolls them — which must NOT duplicate the archive.
2477        let may2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may two");
2478        Log::append(&store, &may2).unwrap();
2479
2480        let archived = fs::read_to_string(&arch).unwrap();
2481        assert_eq!(
2482            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2483            1,
2484            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2485        );
2486        assert_eq!(
2487            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2488            1,
2489            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2490        );
2491    }
2492
2493    /// Adversarial review (#7) — two GENUINELY-DISTINCT appends that render
2494    /// byte-identically at minute precision (same minute/kind/object/note) must
2495    /// BOTH survive rotation. The backdated-duplicate case: apr1 rotates in May;
2496    /// the backdated apr2 lands in the active file later and rotates in June as a
2497    /// FRESH roll (no in-progress marker), so it must be appended even though the
2498    /// April archive already holds the byte-identical apr1. Pre-fix the
2499    /// set-membership dedup dropped apr2 — silent, unrecoverable audit-log loss.
2500    #[test]
2501    fn regression_distinct_same_minute_entries_both_survive_rotation() {
2502        let (_d, store) = temp_store();
2503        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("x"), "dup");
2504        let apr2 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("x"), "dup");
2505
2506        Log::append(&store, &apr1).unwrap();
2507        // A May append rotates apr1 into the April archive and COMPLETES (no
2508        // marker left behind).
2509        Log::append(
2510            &store,
2511            &entry(2026, 5, 2, 8, 0, LogKind::Ingest, Some("may"), "m"),
2512        )
2513        .unwrap();
2514        // The backdated apr2 lands in the active file beside the May entry.
2515        Log::append(&store, &apr2).unwrap();
2516        // A June append rotates the May entry AND apr2 out. apr2 is a fresh roll.
2517        Log::append(
2518            &store,
2519            &entry(2026, 6, 1, 8, 0, LogKind::Ingest, Some("jun"), "j"),
2520        )
2521        .unwrap();
2522
2523        let archived = fs::read_to_string(archive_path(&store, 2026, 4)).unwrap();
2524        assert_eq!(
2525            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | x"),
2526            2,
2527            "two distinct same-minute April appends must BOTH survive rotation; got:\n{archived}"
2528        );
2529        // The reader must return both too (read-dedup must not collapse distinct
2530        // same-minute archive entries).
2531        let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2532        let dups = got
2533            .iter()
2534            .filter(|e| e.object.as_deref() == Some("x"))
2535            .count();
2536        assert_eq!(
2537            dups, 2,
2538            "since must return both distinct same-minute entries; got {got:#?}"
2539        );
2540    }
2541
2542    /// Adversarial review (#12) — `tail`/`since` must return two byte-identical
2543    /// same-minute entries that both live in the ACTIVE log (no archive). Pre-fix
2544    /// a global content-keyed `seen` set suppressed the second on read, so the
2545    /// reader under-reported what was on disk (`grep` saw 2, `tail` saw 1).
2546    #[test]
2547    fn regression_tail_since_return_distinct_same_minute_active_entries() {
2548        let (_d, store) = temp_store();
2549        Log::append(
2550            &store,
2551            &entry(2026, 6, 10, 9, 0, LogKind::Ingest, Some("x"), "dup"),
2552        )
2553        .unwrap();
2554        Log::append(
2555            &store,
2556            &entry(2026, 6, 10, 9, 0, LogKind::Ingest, Some("x"), "dup"),
2557        )
2558        .unwrap();
2559
2560        let tail = Log::tail(&store, 20).unwrap();
2561        assert_eq!(
2562            tail.len(),
2563            2,
2564            "tail must return both same-minute active entries; got {tail:#?}"
2565        );
2566        let since = Log::since(&store, ts(2026, 6, 1, 0, 0)).unwrap();
2567        assert_eq!(
2568            since.len(),
2569            2,
2570            "since must return both same-minute active entries; got {since:#?}"
2571        );
2572    }
2573
2574    /// Adversarial review (#15) — rotation must NOT erase lines before the first
2575    /// VALID entry header. An active log whose entries region opens with a
2576    /// `## [`-shaped line that `parse_header` rejects (a merge orphan / malformed
2577    /// export) before the first real entry: pre-fix `find_first_header` landed on
2578    /// it, `parse_entries` dropped it (no open entry yet), and the rotation
2579    /// re-emitted without it — silently erasing append-only content. The fix
2580    /// folds everything before the first valid header into the preserved header
2581    /// block, which rotation re-emits verbatim.
2582    #[test]
2583    fn regression_rotation_preserves_lines_before_first_valid_header() {
2584        let (_d, store) = temp_store();
2585        let active = active_log_path(&store);
2586        let content = "---\ntype: log\n---\n\n## [orphan from a merge] stray text\n## [2026-04-10 09:00] ingest | x\nbody line\n";
2587        fs::write(&active, content).unwrap();
2588
2589        // A June append rotates the April entry out and rewrites the active file.
2590        Log::append(
2591            &store,
2592            &entry(2026, 6, 1, 8, 0, LogKind::Ingest, Some("jun"), "j"),
2593        )
2594        .unwrap();
2595
2596        let active_after = fs::read_to_string(&active).unwrap();
2597        let arch_after = fs::read_to_string(archive_path(&store, 2026, 4)).unwrap_or_default();
2598        assert!(
2599            active_after.contains("orphan from a merge") || arch_after.contains("orphan from a merge"),
2600            "the pre-first-valid-header line was erased by rotation;\nactive:\n{active_after}\narchive:\n{arch_after}"
2601        );
2602        // Sanity: the real April entry still rotated into its archive.
2603        assert!(
2604            arch_after.contains("## [2026-04-10 09:00] ingest | x"),
2605            "the valid April entry must still rotate to its archive; got:\n{arch_after}"
2606        );
2607    }
2608
2609    // ── regression: reverse reader keeps a `## [` continuation note line (#10) ─
2610
2611    #[test]
2612    fn regression_reverse_reader_preserves_note_line_starting_with_bracket_header() {
2613        // SPEC permits a note of "one or more lines" with no restriction on a
2614        // continuation line starting at column 0 with `## [`. The forward parser
2615        // folds such an unparseable `## [` line into the note; the reverse
2616        // reader (tail/since/last_validate_at) must agree, not split on it.
2617        let (_d, store) = temp_store();
2618        let multi = "First line.\n## [draft outline] more\nThird line.";
2619        let e = entry(
2620            2026,
2621            5,
2622            27,
2623            10,
2624            0,
2625            LogKind::Update,
2626            Some("records/x"),
2627            multi,
2628        );
2629        // Author the log verbatim (render writes the note as-is); this is the
2630        // on-disk shape a hand-written / appended multi-line note produces.
2631        write_raw_log(&store, std::slice::from_ref(&e));
2632
2633        // Pre-fix: header_offsets treated `## [draft outline] more` as a second
2634        // entry boundary, truncating the note to "First line." and dropping the
2635        // carved (non-header) fragment. Post-fix: the full note survives.
2636        let got = Log::tail(&store, 1).unwrap();
2637        assert_eq!(got.len(), 1, "the single entry must be returned");
2638        assert_eq!(
2639            got[0].note, multi,
2640            "reverse reader truncated the note at the `## [` continuation line; \
2641             got {:?}",
2642            got[0].note
2643        );
2644        assert_eq!(got[0], e, "the whole entry must round-trip through tail");
2645
2646        // `since` (the other reverse-reading surface) must agree.
2647        let since = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
2648        assert_eq!(since, vec![e]);
2649    }
2650
2651    // ── regression: `since` archive pruning uses the UTC month, not local (#11) ─
2652
2653    /// A `DateTime<FixedOffset>` at the given fixed offset (hours east of UTC).
2654    fn ts_offset(
2655        y: i32,
2656        mo: u32,
2657        d: u32,
2658        h: u32,
2659        mi: u32,
2660        offset_hours: i32,
2661    ) -> DateTime<FixedOffset> {
2662        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
2663            .unwrap()
2664            .and_hms_opt(h, mi, 0)
2665            .unwrap();
2666        FixedOffset::east_opt(offset_hours * 3600)
2667            .unwrap()
2668            .from_local_datetime(&naive)
2669            .single()
2670            .unwrap()
2671    }
2672
2673    #[test]
2674    fn regression_since_prunes_archives_on_utc_month_not_local_offset_month() {
2675        // Archive months are bucketed on the UTC calendar. A `since` cutoff with
2676        // a non-UTC offset near a month boundary must not prune an archive whose
2677        // UTC month equals the cutoff's UTC month just because the cutoff's
2678        // LOCAL month is later.
2679        let (_d, store) = temp_store();
2680
2681        // April archive: an entry late on 2026-04-30 at 18:00 UTC.
2682        let apr = entry(
2683            2026,
2684            4,
2685            30,
2686            18,
2687            0,
2688            LogKind::Update,
2689            Some("apr-late"),
2690            "april late",
2691        );
2692        let dir = archive_dir(&store);
2693        fs::create_dir_all(&dir).unwrap();
2694        let mut arch = String::from(LOG_FRONTMATTER);
2695        arch.push('\n');
2696        arch.push_str(&apr.render());
2697        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
2698
2699        // Active file: a clean May entry, so an archive scan is actually needed.
2700        let may = entry(2026, 5, 5, 8, 0, LogKind::Update, Some("may-a"), "may one");
2701        write_raw_log(&store, std::slice::from_ref(&may));
2702
2703        // Cutoff 2026-05-01T00:30:00+07:00 == 2026-04-30T17:30:00Z. The April
2704        // 18:00 UTC entry is strictly newer than this instant.
2705        let cutoff = ts_offset(2026, 5, 1, 0, 30, 7);
2706        // Sanity: the cutoff's UTC month is April, its local month is May.
2707        assert_eq!((cutoff.year(), cutoff.month()), (2026, 5));
2708        assert_eq!(
2709            (
2710                cutoff.with_timezone(&Utc).year(),
2711                cutoff.with_timezone(&Utc).month()
2712            ),
2713            (2026, 4)
2714        );
2715
2716        // Pre-fix: cutoff_ym = (2026, 5) from local fields, so the (2026, 4)
2717        // archive was pruned and the genuinely-newer 18:00 UTC entry was dropped
2718        // — `since` returned only the May entry. Post-fix: cutoff_ym is UTC
2719        // (2026, 4), the April archive is scanned, and both come back.
2720        let got = Log::since(&store, cutoff).unwrap();
2721        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
2722        assert_eq!(
2723            stamps,
2724            [ts(2026, 4, 30, 18, 0), ts(2026, 5, 5, 8, 0)]
2725                .into_iter()
2726                .collect(),
2727            "since(non-UTC cutoff near a month boundary) must include the April \
2728             archive entry newer than the cutoff instant; got {got:?}"
2729        );
2730    }
2731
2732    // ── regression: header-shaped note line corrupts the append-only log (#critical)
2733
#[test]
fn note_line_shaped_like_a_header_is_escaped_and_round_trips() {
    // The demonstrated corruption: a `contradiction` note that quotes an
    // earlier entry header verbatim. Without write-path escaping, the quoted
    // `## [2020-01-01 00:00] delete | …` line was read back as a REAL entry
    // (one fabricated entry, and the genuine note cut short). With escaping
    // it must remain plain note body.
    let (_d, store) = temp_store();
    let note = "quoting earlier entry:\n## [2020-01-01 00:00] delete | records/contacts/jane.md\nend of quote";
    let written = entry(
        2026,
        6,
        11,
        4,
        41,
        LogKind::Contradiction,
        Some("records/contacts/jane.md"),
        note,
    );
    Log::append(&store, &written).unwrap();

    // Raw bytes on disk: only the genuine header may begin a line with
    // `## [` — i.e. `grep "^## \["` sees exactly one match.
    let raw = fs::read_to_string(store.root.join("log.md")).unwrap();
    let header_lines = raw
        .lines()
        .map(|line| usize::from(line.starts_with("## [")))
        .sum::<usize>();
    assert_eq!(
        header_lines, 1,
        "exactly one real entry header may sit at column 0; got:\n{raw}"
    );

    // Reading back yields a single entry whose note is complete: no phantom
    // 2020 entry and no truncation at the quoted header.
    let got = Log::tail(&store, 10).unwrap();
    assert_eq!(got.len(), 1, "exactly one entry; got {got:?}");
    let only = &got[0];
    assert_eq!(only.note, note, "note must round-trip verbatim");
    assert_eq!(*only, written);

    // The cutoff-based reader must agree with `tail`.
    let since = Log::since(&store, ts(2026, 1, 1, 0, 0)).unwrap();
    assert_eq!(since, vec![written]);
}
2772
#[test]
fn header_shaped_note_survives_a_later_rotation_uncorrupted() {
    // Physical corruption scenario: before the fix, the fabricated pseudo-entry
    // (dated 2020, i.e. before the current month) was rolled out into an
    // archive by the NEXT append, splitting the real note in two. Once the
    // line is escaped it is ordinary note text, so a later append has no
    // phantom prior-month entry to rotate.
    let (_d, store) = temp_store();
    let note = "see\n## [2020-01-01 00:00] delete | records/x.md\nbelow";
    let first = entry(
        2026,
        6,
        11,
        4,
        41,
        LogKind::Contradiction,
        Some("records/x.md"),
        note,
    );
    Log::append(&store, &first).unwrap();

    // A second current-month append exercises the re-parse / possible-rotate
    // path over the file that already holds the header-shaped note.
    let second = entry(
        2026,
        6,
        11,
        5,
        0,
        LogKind::Update,
        Some("records/y.md"),
        "y",
    );
    Log::append(&store, &second).unwrap();

    // No archive for the quoted month may have been created.
    let phantom_archive = store.root.join("log").join("2020-01.md");
    assert!(
        !phantom_archive.exists(),
        "a header-shaped note line must not fabricate a 2020 archive"
    );

    // Exactly the two real entries come back, and the first keeps its note.
    let got = Log::tail(&store, 10).unwrap();
    assert_eq!(got.len(), 2, "two real entries only; got {got:?}");
    let surviving_note = got
        .iter()
        .find(|e| e.object.as_deref() == Some("records/x.md"))
        .map(|e| e.note.as_str());
    assert_eq!(
        surviving_note,
        Some(note),
        "the header-shaped note must survive the rotation pass intact"
    );
}
2822
#[test]
fn escape_unescape_note_line_round_trips_including_literal_backslash() {
    // Escaping has to be lossless for any note line — including one the author
    // really did begin with `\` immediately before a header shape.
    let valid_header = "## [2020-01-01 00:00] delete | x";

    // Genuine header shape: gains one leading backslash when written, and
    // loses it again when read.
    let escaped = escape_note_line(valid_header).into_owned();
    assert_eq!(escaped, format!("\\{valid_header}"));
    assert_eq!(&*unescape_note_line(&escaped), valid_header);

    // A header shape that already carries a `\` prefix is escaped to two
    // backslashes and comes back with one; it never collapses into a bare
    // header.
    let pre = format!("\\{valid_header}");
    let pre_escaped = escape_note_line(&pre).into_owned();
    assert_eq!(pre_escaped, format!("\\{pre}"));
    assert_eq!(&*unescape_note_line(&pre_escaped), pre.as_str());

    // Everything else — ordinary text, and a `\` that does not lead into a
    // header — passes through both directions unchanged.
    const UNTOUCHED: [&str; 4] = ["plain note", "## [not a header]", "\\not a header", ""];
    for line in UNTOUCHED {
        assert_eq!(&*escape_note_line(line), line);
        assert_eq!(&*unescape_note_line(line), line);
    }
}
2848
2849    // ── regression: reverse reader scans each block once (no O(file²)) (#perf) ──
2850
#[test]
fn reverse_read_correct_with_header_straddling_a_block_boundary() {
    // The per-block incremental header scan still has to recognise a `## [`
    // marker whose `#` sits in one block while the rest of the marker reaches
    // into the region that was scanned earlier. The log built here spans
    // several blocks, so a full read that reconstructs every entry covers the
    // straddle case by construction.
    let (_d, store) = temp_store();
    let n = 600usize;
    let mut expected: Vec<LogEntry> = Vec::with_capacity(n);

    // Entry `i` is stamped `2 * i` minutes after 2026-06-01 00:00.
    for (i, minutes) in (0..n).zip((0u32..).step_by(2)) {
        let day = 1 + minutes / (24 * 60);
        let hour = (minutes / 60) % 24;
        let min = minutes % 60;
        let object = format!("records/item-{i:05}");
        // Note lengths cycle so that headers fall at many different offsets
        // relative to the fixed 8 KiB block grid.
        let note = format!("note {i} {}", "y".repeat(i % 97));
        let e = entry(
            2026,
            6,
            day,
            hour,
            min,
            LogKind::Update,
            Some(object.as_str()),
            &note,
        );
        Log::append(&store, &e).unwrap();
        expected.push(e);
    }

    // Guard: the fixture must really cross several reverse-read blocks.
    let size = fs::metadata(store.root.join("log.md")).unwrap().len();
    assert!(
        size > (REVERSE_BLOCK as u64) * 3,
        "test log not large enough ({size} bytes) to cross several blocks"
    );

    let all = Log::tail(&store, n + 10).unwrap();
    assert_eq!(all, expected, "every entry must reconstruct across blocks");

    // A short tail must be exact as well: the seven newest by timestamp.
    let newest_seven = expected[n - 7..].to_vec();
    assert_eq!(Log::tail(&store, 7).unwrap(), newest_seven);
}
2892
#[test]
fn header_offsets_range_finds_boundary_straddling_marker_once() {
    // The buffer holds two headers, and the whole-buffer `header_offsets`
    // locates both. A range scan whose window splits the buffer between them
    // must report the header inside its window exactly once, looking at the
    // left neighbour byte for the line-start check.
    let buf =
        b"## [2026-06-01 00:00] update | a\nnote a\n## [2026-06-01 00:01] update | b\nnote b\n";
    let full = header_offsets(buf, 0);
    assert_eq!(full.len(), 2, "both headers found over the whole buffer");
    let second = full[1] as usize;

    // One-byte window over the SECOND header's `#`. That `#` is not at buffer
    // index 0, so `base_is_file_start` plays no part.
    let second_only = header_offsets_range(buf, 0, second, second + 1, false);
    assert_eq!(second_only, vec![full[1]]);

    // One-byte window over the FIRST header's `#`; the bytes to its right are
    // read from beyond the window. With `base == 0` being the real start of
    // the file, the candidate at index 0 is a true line start.
    let first_only = header_offsets_range(buf, 0, 0, 1, true);
    assert_eq!(first_only, vec![full[0]]);

    // Two disjoint windows together yield each marker once — no double count.
    let partitioned: Vec<_> = header_offsets_range(buf, 0, 0, second, true)
        .into_iter()
        .chain(header_offsets_range(buf, 0, second, buf.len(), false))
        .collect();
    assert_eq!(partitioned, full);
}
2918
/// CRITICAL regression: a `## [<valid header>]` fragment that appears MID-LINE
/// in a real entry's note, and whose `#` happens to coincide with a
/// reverse-read block boundary, must never be turned into an entry. The
/// backward scan reads each block's left edge before the block to its left is
/// buffered; if buffer index 0 were assumed to be a line start at that point, a
/// phantom entry would be carved out of the fragment and the real entry's note
/// truncated. The fix holds the left-edge candidate back until the neighbour is
/// read, at which point the fragment is correctly recognised as note body
/// because its `#` does not begin a line.
#[test]
fn reverse_read_does_not_fabricate_entry_from_midline_header_at_block_boundary() {
    let (_d, store) = temp_store();

    // The note of the single real entry contains a `## [` fragment that is a
    // *valid* header shape but does NOT start at column 0 — so the writer's
    // column-0 escape rightly leaves it alone, which is what makes it the
    // trigger.
    let fragment = "see ## [2020-01-01 00:00] delete | records/x.md";
    let hash_in_fragment = fragment.find("##").expect("fragment has `##`");

    // The active log is assembled by hand so that the fragment's `#` falls on
    // the FIRST backward block's left edge. The reverse reader anchors blocks
    // at EOF (`new_start = len - REVERSE_BLOCK` for the first block), so the
    // `#` has to sit exactly `REVERSE_BLOCK` bytes before EOF; note padding
    // placed AFTER the fragment pushes EOF out to that distance.
    //
    // Layout (one entry):
    //   <frontmatter>\n## [<header>] | records/real.md\nlead\n<fragment><tail>\n\n
    let header_line = "## [2026-06-14 10:00] update | records/real.md\n";
    // The fragment opens the second note line.
    let mut body = [LOG_FRONTMATTER, "\n", header_line, "lead\n", fragment].concat();

    // Bytes from the `#` (inclusive) to EOF are the rest of the fragment, the
    // padding, and the two trailing newlines. Size the padding so that total
    // equals REVERSE_BLOCK.
    let after_hash_in_fragment = fragment.len() - hash_in_fragment;
    let tail_len = REVERSE_BLOCK
        .checked_sub(after_hash_in_fragment + 2)
        .expect("REVERSE_BLOCK comfortably exceeds the post-`#` head bytes");
    body.push_str(&"z".repeat(tail_len)); // valid note bytes on the fragment line
    body.push_str("\n\n");
    fs::write(store.root.join("log.md"), &body).unwrap();

    // Fixture sanity: the file crosses at least one block boundary …
    assert!(
        body.len() > REVERSE_BLOCK,
        "test log must span >1 block (len {})",
        body.len()
    );
    // … and the fragment's `#` is exactly on the first block's left edge.
    let real_hash_off = body.find("see ##").unwrap() + hash_in_fragment;
    assert_eq!(
        real_hash_off,
        body.len() - REVERSE_BLOCK,
        "fragment `#` must land on the first backward block's left edge to exercise the bug"
    );

    // The reverse read has to produce EXACTLY ONE entry — the real one — and
    // no fabricated `2020-01-01 delete records/x.md` cut from the fragment.
    let got = Log::tail(&store, 10).unwrap();
    assert_eq!(
        got.len(),
        1,
        "exactly the one real entry; got {} (a fabricated entry means the boundary `#` was mis-read as a header): {got:#?}",
        got.len()
    );
    let only = &got[0];
    assert_eq!(only.object.as_deref(), Some("records/real.md"));
    assert_eq!(only.timestamp, ts(2026, 6, 14, 10, 0));

    // The note survives whole: both the lead line and the verbatim fragment
    // are present, so nothing was truncated at the fragment.
    assert!(
        only.note.contains("lead"),
        "note keeps its lead; got {:?}",
        only.note
    );
    assert!(
        only.note.contains(fragment),
        "note keeps the verbatim mid-line fragment (not truncated); got {:?}",
        only.note
    );
}
3006
3007    // ── regression: tail/since dedup across active+archive on interrupted rotation
3008
#[test]
fn tail_and_since_dedup_entries_present_in_both_active_and_archive() {
    // Recreates the crash window from the finding: the archive write was
    // committed, but the active file was never rewritten to drop the rolled
    // entries. The same April entries therefore exist in BOTH the untrimmed
    // active file and `log/2026-04.md`, and readers must yield each of them
    // ONCE rather than twice.
    let (_d, store) = temp_store();
    let apr_a = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
    let apr_b = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");

    // Un-trimmed state: the active file still contains both April entries.
    write_raw_log(&store, &[apr_a.clone(), apr_b.clone()]);

    // Committed step-1 archive containing the very same two entries.
    fs::create_dir_all(archive_dir(&store)).unwrap();
    let mut arch = String::from(LOG_FRONTMATTER);
    arch.push('\n');
    for duplicated in [&apr_a, &apr_b] {
        arch.push_str(&duplicated.render());
    }
    fs::write(archive_path(&store, 2026, 4), arch).unwrap();

    let expected = vec![apr_a, apr_b];

    // `since` yields every April entry exactly one time.
    let since = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
    assert_eq!(
        since, expected,
        "since must dedup the doubly-present entries; got {since:?}"
    );

    // `tail` likewise — duplicates must not occupy window slots.
    let tail = Log::tail(&store, 10).unwrap();
    assert_eq!(
        tail, expected,
        "tail must dedup the doubly-present entries; got {tail:?}"
    );
}
3046}