Skip to main content

dbmd_core/
log.rs

1//! `log` — the append-only, month-rotating chronological log.
2//!
3//! One logical timeline: the active `log.md` at the store root plus
4//! `log/<YYYY-MM>.md` archives. [`Log::append`] rolls older months into
5//! archives on write so the active file stays current-month. [`Log::tail`] and
6//! [`Log::since`] **reverse-read from EOF**. Both read each file they touch in
7//! full — the on-disk order is not guaranteed monotonic, so neither can
8//! early-stop within a file — and select by timestamp: `tail` keeps the `n`
9//! newest, `since` keeps everything newer than the cutoff. Both cross into
10//! month archives only as far back as the requested window reaches (by the
11//! cutoff's month for `since`, by the current `n`th-newest's month for `tail`)
12//! — never the whole history.
13//!
14//! Append-only contract: there is no rewrite API. Corrective entries go on the
15//! end; out-of-order timestamps are a validate warning (`LOG_OUT_OF_ORDER`),
16//! signalling a probable rewrite.
17
18use std::collections::BTreeMap;
19use std::fs::{self, File};
20use std::io::{Read, Seek, SeekFrom};
21use std::path::{Path, PathBuf};
22
23use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, TimeZone, Utc};
24
25use crate::store::Store;
26
/// The on-disk header timestamp format: `YYYY-MM-DD HH:MM` (minute precision,
/// no timezone). Parsing reattaches UTC; emitting renders the entry's own
/// wall-clock, so a read→write→read round-trip is stable at minute precision.
const TS_FORMAT: &str = "%Y-%m-%d %H:%M";

/// The preamble written when the active `log.md` is created: the `type: log`
/// frontmatter block, a blank line, then the `# Curator log` heading.
const LOG_FRONTMATTER: &str = "---\ntype: log\n---\n\n# Curator log\n";

/// Block size for the backward (reverse-from-EOF) reader (`8 * 1024`).
/// NOTE(review): presumably a byte count per backward read — the reader itself
/// is not visible here; confirm against its implementation.
const REVERSE_BLOCK: usize = 8 * 1024;
37
/// The kind slot of a `log.md` entry header.
///
/// The format permits arbitrary kinds — `dbmd validate` merely warns on one it
/// does not recognize (`LOG_UNKNOWN_KIND`) — so next to the recognized
/// vocabulary there is a [`LogKind::Custom`] variant that stores any other
/// token verbatim, letting an unknown kind round-trip without loss.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogKind {
    /// Ingestion of a source artifact.
    Ingest,
    /// Creation of a file.
    Create,
    /// Update of a file.
    Update,
    /// Deletion of a file.
    Delete,
    /// Rename or move of a file.
    Rename,
    /// Addition of a wiki-link.
    Link,
    /// A validation pass.
    Validate,
    /// A rebuild of the index.
    IndexRebuild,
    /// A flagged contradiction between sources.
    Contradiction,
    /// A token outside the recognized vocabulary, kept exactly as written.
    Custom(String),
}

impl LogKind {
    /// The header spelling of this kind (`ingest`, `index-rebuild`, …). For
    /// [`LogKind::Custom`] this is the stored token, unchanged.
    pub fn as_str(&self) -> &str {
        match self {
            Self::Ingest => "ingest",
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Rename => "rename",
            Self::Link => "link",
            Self::Validate => "validate",
            Self::IndexRebuild => "index-rebuild",
            Self::Contradiction => "contradiction",
            Self::Custom(token) => token.as_str(),
        }
    }

    /// Map a header token to its kind. The match is exact (case-sensitive, no
    /// trimming); any token that is not a canonical spelling is wrapped in
    /// [`LogKind::Custom`], so this never fails.
    pub fn parse(token: &str) -> LogKind {
        match token {
            "ingest" => Self::Ingest,
            "create" => Self::Create,
            "update" => Self::Update,
            "delete" => Self::Delete,
            "rename" => Self::Rename,
            "link" => Self::Link,
            "validate" => Self::Validate,
            "index-rebuild" => Self::IndexRebuild,
            "contradiction" => Self::Contradiction,
            unrecognized => Self::Custom(unrecognized.to_owned()),
        }
    }

    /// Whether this kind belongs to the recognized vocabulary, i.e. is
    /// anything other than [`LogKind::Custom`].
    pub fn is_recognized(&self) -> bool {
        !matches!(self, Self::Custom(_))
    }
}
107
108/// One parsed `log.md` entry: a header
109/// (`## [YYYY-MM-DD HH:MM] <kind> | <object>`) plus its body.
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct LogEntry {
112    /// The entry timestamp from the header.
113    pub timestamp: DateTime<FixedOffset>,
114    /// The entry kind.
115    pub kind: LogKind,
116    /// The object slot — a store-relative path/wiki-link target, or `None` for
117    /// store-wide actions like `validate`.
118    pub object: Option<String>,
119    /// The free-form body (one or more lines) explaining what happened.
120    pub note: String,
121}
122
123impl LogEntry {
124    /// Render this entry as it appears on disk: the `## [...]` header line,
125    /// then the note body, then a trailing blank line so successive entries are
126    /// separated. The note is emitted with header-shaped continuation lines
127    /// **escaped** (see [`escape_note_line`]) so a note line that happens to
128    /// match the entry-header shape (`## [YYYY-MM-DD HH:MM] <kind> | <obj>`) can
129    /// never be mistaken for a real entry header on readback or on the next
130    /// rotation. The escape round-trips exactly through [`unescape_note_line`].
131    fn render(&self) -> String {
132        let ts = self.timestamp.format(TS_FORMAT);
133        let mut out = String::new();
134        match &self.object {
135            Some(obj) => {
136                out.push_str(&format!("## [{}] {} | {}\n", ts, self.kind.as_str(), obj));
137            }
138            None => {
139                out.push_str(&format!("## [{}] {}\n", ts, self.kind.as_str()));
140            }
141        }
142        // Trim only the structural line terminators (`\n`/`\r`) — the trailing
143        // blank line separating entries is appended below, so a note's own
144        // trailing newlines would otherwise stack up and shift on every
145        // re-render. Spaces and tabs are legitimate note *content* and must be
146        // preserved verbatim, so the round-trip is exact: readback
147        // (`parse_entries`) trims the same `['\n', '\r']` set and no more, and a
148        // note ending in a space (`"note 0 "`) must reconstruct unchanged.
149        let note = self.note.trim_end_matches(['\n', '\r']);
150        if !note.is_empty() {
151            // Escape per line: a note line that parses as an entry header is
152            // prefixed so it is no longer at column 0 as `## [` — it stays note
153            // body on readback and on rotation, never a fabricated entry.
154            for (i, line) in note.split('\n').enumerate() {
155                if i > 0 {
156                    out.push('\n');
157                }
158                out.push_str(&escape_note_line(line));
159            }
160            out.push('\n');
161        }
162        out.push('\n');
163        out
164    }
165
166    /// The `(year, month)` of this entry's wall-clock timestamp — the rotation
167    /// bucket.
168    fn year_month(&self) -> (i32, u32) {
169        (self.timestamp.year(), self.timestamp.month())
170    }
171}
172
/// The store's chronological log: a thin, stateless (unit-struct) handle for
/// the append-only timeline. The file-backed methods ([`Log::append`],
/// [`Log::tail`], [`Log::since`], [`Log::last_validate_at`]) take the
/// [`Store`] so they resolve the active `log.md` and the `log/` archives under
/// the store root; [`Log::parse_header`] works on a single header line and
/// takes no store.
#[derive(Debug, Clone)]
pub struct Log;
178
179impl Log {
    /// Append `entry` to the active `log.md`, creating it (with `type: log`
    /// frontmatter) if absent. **If the active log holds entries from a month
    /// strictly before `entry`'s month, those are first rolled into
    /// `log/<YYYY-MM>.md`**, keeping the active file to the current month.
    ///
    /// **Atomicity.** Each write of the active file goes through
    /// `write_atomic`, but a rotation as a whole is *two* durable writes
    /// (archive append, then active trim) and is not atomic. An in-flight
    /// marker file (see `rotation_marker_path`) is created before the archive
    /// append and removed after the active trim, so a crash in between is
    /// recognizable on the next `append` and handled as a deduplicated
    /// re-roll rather than a second archive append.
    ///
    /// **Concurrency.** `append` is a read-modify-write of the whole active file
    /// (`write_atomic` is atomic at the file level, but the read→render→write
    /// window is not). Two concurrent appenders — the manager and a cron-driven
    /// background system, say — would otherwise both read the same N-entry
    /// snapshot and each write N+1 entries, the second rename clobbering the
    /// first and silently dropping an audit entry. We serialize the whole
    /// read-modify-write under an advisory file lock (`flock`, held for the
    /// duration) so concurrent appends queue instead of racing. The lock is
    /// advisory and process-scoped; it guards the toolkit's own appends, which is
    /// the realistic contention path.
    ///
    /// # Errors
    ///
    /// Propagates any error from reading the active file, creating the archive
    /// directory (or the active file's parent), writing the rotation marker,
    /// `batch_is_archived`, `append_to_archive`, or `write_atomic`. Failures
    /// to *remove* the rotation marker are deliberately ignored.
    pub fn append(store: &Store, entry: &LogEntry) -> crate::Result<()> {
        let active = active_log_path(store);

        // Serialize concurrent appends for the whole read-modify-write. Held
        // until `_lock` drops at function exit (covering both the rotation and
        // the plain-append paths). A lock failure is non-fatal: we proceed
        // unlocked rather than refuse to log (best-effort, which is also the
        // only option on platforms without advisory locks).
        let _lock = AppendLock::acquire(&active);

        // The "current month" is the month of the entry being appended (the
        // newest in the timeline); every existing entry from a strictly-earlier
        // month rolls to archives.
        let current_ym = entry.year_month();

        if active.exists() {
            let content = fs::read_to_string(&active)?;
            let (header, entries) = parse_active(&content);

            // Partition existing entries into prior-month (roll out, grouped
            // per month; `BTreeMap` keeps the months in ascending order) and
            // current-or-later (keep in the active file). Relative order
            // within each group is the on-disk order.
            let mut by_month: BTreeMap<(i32, u32), Vec<LogEntry>> = BTreeMap::new();
            let mut keep: Vec<LogEntry> = Vec::new();
            for e in entries {
                if e.year_month() < current_ym {
                    by_month.entry(e.year_month()).or_default().push(e);
                } else {
                    keep.push(e);
                }
            }

            // A rotation is two non-atomic durable writes (archive append, then
            // active trim). The marker disambiguates a crash-retry re-roll from a
            // fresh rotation so a genuinely-distinct same-minute entry is never
            // dropped (see `rotation_marker_path`). `recovering` is captured
            // BEFORE we (re)write the marker, so the current attempt's archive
            // append uses the right mode; the marker only changes what a LATER
            // retry sees.
            let marker = rotation_marker_path(store);
            let recovering = marker.exists();

            if !by_month.is_empty() {
                // Roll each prior month into its archive (atomic per-file),
                // appending to any existing archive for that month.
                let dir = archive_dir(store);
                fs::create_dir_all(&dir)?;
                // Mark the rotation in-flight so a crash before the active trim
                // is recoverable as a re-roll (deduped), not re-appended.
                if !recovering {
                    fs::write(&marker, b"")?;
                }

                // Scope the crash-recovery dedup correctly. The marker only tells
                // us a rotation may have been interrupted; on its own it does NOT
                // prove these specific entries are a re-roll. A genuine interrupted
                // rotation commits its archive appends FIRST and crashes before the
                // active trim, so on retry every prior-month entry still in the
                // active file already has a matching copy in its archive — the
                // whole roll-out batch is a multiset-subset of the archives. Only
                // then is the dedup the right thing: suppress the copies the prior
                // attempt already wrote.
                //
                // If instead some prior-month entry has NO matching archive copy,
                // no completed archive write exists to re-roll: the marker is stale
                // (e.g. committed/synced into a `merge=union` clone after a crash
                // stranded it) and these entries are a FRESH roll. Treating them as
                // a re-roll would dedup a genuinely-distinct same-(minute,kind,
                // object,note) entry against an unrelated pre-existing archive entry
                // and, because the active file is trimmed unconditionally below,
                // drop it from disk entirely. So we only enter recovery mode when
                // the entire batch is already reflected in the archives. The
                // tradeoff favors preservation: a rare genuine partial multi-month
                // crash may re-append an already-archived entry (a visible,
                // recoverable duplicate) rather than ever silently losing one.
                let recovering_reroll = recovering && batch_is_archived(store, &by_month)?;

                // Archive writes happen oldest month first (map order). An
                // error here returns early and leaves the marker in place.
                for ((y, m), month_entries) in &by_month {
                    let path = archive_path(store, *y, *m);
                    append_to_archive(&path, month_entries, recovering_reroll)?;
                }

                // Rewrite the active file to the kept (current-or-later month)
                // entries plus the new entry — atomically. Kept entries are
                // re-rendered from their parsed form, not copied byte-for-byte.
                let mut body = String::new();
                for e in &keep {
                    body.push_str(&e.render());
                }
                body.push_str(&entry.render());
                let full = compose_active(&header, &body);
                crate::fsx::write_atomic(&active, full.as_bytes())?;
                // Rotation committed (active trimmed): clear the in-flight
                // marker. Best-effort — a leftover marker is cleaned up by the
                // no-rotation path of a later append.
                let _ = fs::remove_file(&marker);
                return Ok(());
            }

            // No rotation needed. If a stale marker lingers (a crash that trimmed
            // the active file but never deleted the marker), clear it so the next
            // real rotation is treated as fresh, not stuck in recovery mode.
            if recovering {
                let _ = fs::remove_file(&marker);
            }
            // Plain atomic append: the existing bytes are kept verbatim (only a
            // missing final newline is supplied) and the rendered entry is
            // added at the end.
            let mut full = content;
            if !full.ends_with('\n') {
                full.push('\n');
            }
            full.push_str(&entry.render());
            crate::fsx::write_atomic(&active, full.as_bytes())?;
            Ok(())
        } else {
            // Fresh log: frontmatter + the single entry. The parent directory
            // is created first in case the store root does not exist yet.
            if let Some(parent) = active.parent() {
                fs::create_dir_all(parent)?;
            }
            let body = entry.render();
            let full = compose_active(LOG_FRONTMATTER, &body);
            crate::fsx::write_atomic(&active, full.as_bytes())?;
            Ok(())
        }
    }
315
316    /// The `n` most-recent entries **by timestamp**, returned oldest→newest.
317    ///
318    /// **Out-of-order safety (mirrors [`Log::since`]).** The log is append-only
319    /// but *not* guaranteed to be in non-decreasing timestamp order on disk: a
320    /// corrective entry is appended below the entry it corrects, a
321    /// backdated/clock-skewed write lands physically after newer entries, and a
322    /// `merge=union` clone merge interleaves both sides until a later agent
323    /// reorders. Out-of-order is only a `LOG_OUT_OF_ORDER` warning, never
324    /// rejected. So the last `n` *physical* entries are **not** the `n` newest
325    /// by time — taking them would omit a genuinely-recent entry that sits
326    /// physically before an older one, and the documented curator warm-up
327    /// (`dbmd log tail 20`) would report a stale picture of what was done lately.
328    /// We therefore feed every entry of each file we touch through a bounded
329    /// newest-by-timestamp window and let it select the true top `n`.
330    ///
331    /// Bounded cost: the active `log.md` is kept to the current month by
332    /// rotation, so a full read of it is cheap and is not a whole-store walk.
333    /// Across archives we *can* prune: each `log/<YYYY-MM>.md` holds only entries
334    /// from that month (rotation buckets by the entry's own year-month), so once
335    /// the window is full, an archive whose month is strictly before the
336    /// window-minimum's month cannot contain any entry newer than the current
337    /// `n`th-newest. We cross archives newest-month-first and stop at the first
338    /// such archive.
339    pub fn tail(store: &Store, n: usize) -> crate::Result<Vec<LogEntry>> {
340        if n == 0 {
341            return Ok(Vec::new());
342        }
343
344        // A bounded window of the `n` entries with the largest timestamps. No
345        // within-file early stop: out-of-order entries mean a newer entry can
346        // sit physically before an older one, so each file is read fully.
347        let mut window = NewestWindow::new(n);
348        // Active↔archive overlap dedup, narrowly scoped AND gated on the
349        // crash-recovery marker (mirrors `since` and the write side). The only
350        // legitimate source of an active↔archive overlap is a rotation
351        // interrupted between its two non-atomic durable writes (archive append
352        // committed, active trim not), which leaves the SAME entry in both the
353        // untrimmed active file and its month archive — exactly the state the
354        // `.rotating` marker records (see `rotation_marker_path`). When the
355        // marker is ABSENT (normal operation) there is no such overlap: a
356        // backdated append that rotated as a FRESH roll and merely collides on
357        // (minute,kind,object,note) with an active entry is a genuinely-DISTINCT
358        // event the write side deliberately preserved on disk, and the reader
359        // must report it. The old code deduped unconditionally and silently
360        // dropped that distinct entry (the bug). We therefore build `active_seen`
361        // — and suppress matching archive entries below — ONLY while the marker
362        // is present. Even then it suppresses only an ARCHIVE entry that matches
363        // an ACTIVE one, never active-vs-active or archive-vs-archive.
364        let recovering = rotation_marker_path(store).exists();
365        let mut active_seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
366
367        // Active file: scan fully (current-month-bounded by rotation). Record
368        // every identity for overlap detection *only while recovering* (the
369        // marker is present); consider every entry regardless — a same-minute
370        // duplicate WITHIN the active file is two distinct appends.
371        let active = active_log_path(store);
372        if active.exists() {
373            reverse_collect(&active, |e| {
374                if recovering {
375                    active_seen.insert(entry_key(&e));
376                }
377                window.consider(e);
378                false
379            })?;
380        }
381
382        // Archives, newest-month-first. Once the window is full, an archive
383        // whose month is strictly before the window-minimum's month holds only
384        // entries older than the current cutoff, so it (and every older archive)
385        // is skippable.
386        for archive in list_archives_desc(store)? {
387            if let (true, Some(cutoff_ym), Some(arch_ym)) = (
388                window.is_full(),
389                window.min_year_month(),
390                archive_year_month(&archive),
391            ) {
392                if arch_ym < cutoff_ym {
393                    break;
394                }
395            }
396            reverse_collect(&archive, |e| {
397                // Suppress only a crash-retry active↔archive overlap, and only
398                // when recovering (marker present). `active_seen` is empty
399                // otherwise, so this never suppresses in normal operation — a
400                // distinct same-(minute,kind,object,note) archive entry survives
401                // even when an active entry collides on those fields. Archives
402                // are never deduped against each other.
403                if !active_seen.contains(&entry_key(&e)) {
404                    window.consider(e);
405                }
406                false
407            })?;
408        }
409
410        Ok(window.into_sorted())
411    }
412
413    /// Entries strictly newer than `time`, reverse-scanning active → archives.
414    ///
415    /// **No within-file early stop.** The log is append-only but *not*
416    /// guaranteed to be in non-decreasing timestamp order on disk: a corrective
417    /// entry is appended below the entry it corrects (SPEC: "if a finding is
418    /// wrong, append a corrective entry below it"), a backdated/clock-skewed
419    /// write lands physically after newer entries, and a `merge=union` clone
420    /// merge interleaves both sides until a later agent reorders. Out-of-order
421    /// is only a `LOG_OUT_OF_ORDER` warning, never rejected. So a newer entry
422    /// can sit physically *before* an older one; stopping at the first
423    /// older-than-`time` entry would silently drop those — the documented
424    /// curator warm-up (`dbmd log since <ts>`) would miss real recent work.
425    /// We therefore read every entry of each file we touch.
426    ///
427    /// Bounded cost: the active `log.md` is kept to the current month by
428    /// rotation, so a full read of it is cheap (the same read `tail` does for a
429    /// large `n`) and is not a whole-store walk. Across archives we *can* stop:
430    /// each `log/<YYYY-MM>.md` holds only entries from that month (rotation
431    /// buckets by the entry's own year-month), so an archive whose month is
432    /// strictly before `time`'s month cannot contain any entry newer than
433    /// `time`. We cross archives newest-month-first and stop at the first whose
434    /// month is entirely at or before `time`'s.
435    pub fn since(store: &Store, time: DateTime<FixedOffset>) -> crate::Result<Vec<LogEntry>> {
436        let mut collected: Vec<LogEntry> = Vec::new();
437        // Active↔archive overlap dedup, narrowly scoped AND gated on the
438        // crash-recovery marker (mirrors `tail` and the write side). An overlap
439        // (the SAME entry in both the untrimmed active file and the archive)
440        // arises ONLY from a rotation interrupted between its two non-atomic
441        // durable writes — exactly the state the `.rotating` marker records (see
442        // `rotation_marker_path`). When the marker is ABSENT (normal operation)
443        // there is no overlap to mask: a backdated append that rotated as a FRESH
444        // roll and merely collides on (minute,kind,object,note) with an active
445        // entry is a genuinely-DISTINCT event the write side preserved on disk,
446        // and the reader must report it. Deduping unconditionally silently
447        // dropped that distinct entry (the bug). We therefore record ACTIVE
448        // identities — and suppress matching archive entries below — ONLY while
449        // recovering; even then it suppresses an ARCHIVE entry against an ACTIVE
450        // one, never active-vs-active or archive-vs-archive.
451        let recovering = rotation_marker_path(store).exists();
452        let mut active_seen: std::collections::HashSet<EntryKey> = std::collections::HashSet::new();
453
454        // Active file: scan fully, no early stop (out-of-order safe). Collect
455        // every in-window entry (a same-minute duplicate within the active file
456        // is two distinct appends), recording identities for overlap detection
457        // only while recovering (the marker is present).
458        let active = active_log_path(store);
459        if active.exists() {
460            reverse_collect(&active, |e| {
461                if e.timestamp > time {
462                    if recovering {
463                        active_seen.insert(entry_key(&e));
464                    }
465                    collected.push(e);
466                }
467                false
468            })?;
469        }
470
471        // The cutoff's own (year, month): any archive strictly before it holds
472        // only older entries and is skippable. Archive months are bucketed on
473        // the UTC calendar (on-disk timestamps are offset-free and re-read as
474        // UTC; rotation buckets by the entry's UTC year-month), so the pruning
475        // calendar must be UTC too. A non-UTC `since` offset (advertised in the
476        // CLI hint, e.g. `…T00:30:00+07:00`) whose local month differs from its
477        // UTC month would otherwise prune away an archive holding entries that
478        // are strictly newer than `time` — `time.year()/.month()` read the
479        // offset-LOCAL calendar, not UTC.
480        let cutoff_utc = time.with_timezone(&Utc);
481        let cutoff_ym = (cutoff_utc.year(), cutoff_utc.month());
482
483        for archive in list_archives_desc(store)? {
484            // Archives are newest-month-first; once a month is strictly before
485            // the cutoff's month, every remaining (older) archive is too.
486            if let Some(arch_ym) = archive_year_month(&archive) {
487                if arch_ym < cutoff_ym {
488                    break;
489                }
490            }
491            // Scan this archive fully — within a month, entries may still be
492            // out of order, so no within-file early stop.
493            reverse_collect(&archive, |e| {
494                // Suppress only a crash-retry active↔archive overlap, and only
495                // when recovering (marker present). `active_seen` is empty
496                // otherwise, so a distinct same-(minute,kind,object,note) archive
497                // entry survives in normal operation even when an active entry
498                // collides on those fields.
499                if e.timestamp > time && !active_seen.contains(&entry_key(&e)) {
500                    collected.push(e);
501                }
502                false
503            })?;
504        }
505
506        collected.reverse();
507        Ok(collected)
508    }
509
510    /// The timestamp of the most recent `validate` entry — the default `since`
511    /// window for working-set validation ([`crate::validate::validate_working_set`]).
512    pub fn last_validate_at(store: &Store) -> crate::Result<Option<DateTime<FixedOffset>>> {
513        let mut found: Option<DateTime<FixedOffset>> = None;
514
515        let active = active_log_path(store);
516        if active.exists() {
517            reverse_collect(&active, |e| {
518                if e.kind == LogKind::Validate {
519                    found = Some(e.timestamp);
520                    true
521                } else {
522                    false
523                }
524            })?;
525        }
526
527        if found.is_none() {
528            for archive in list_archives_desc(store)? {
529                reverse_collect(&archive, |e| {
530                    if e.kind == LogKind::Validate {
531                        found = Some(e.timestamp);
532                        true
533                    } else {
534                        false
535                    }
536                })?;
537                if found.is_some() {
538                    break;
539                }
540            }
541        }
542
543        Ok(found)
544    }
545
546    /// Parse a single entry header (`## [YYYY-MM-DD HH:MM] <kind> | <object>`)
547    /// into its timestamp, kind, and object. Returns `None` if the line isn't a
548    /// well-formed entry header.
549    pub fn parse_header(line: &str) -> Option<(DateTime<FixedOffset>, LogKind, Option<String>)> {
550        let line = line.trim_end_matches(['\n', '\r']);
551        let rest = line.strip_prefix("## [")?;
552        let close = rest.find(']')?;
553        let ts_str = &rest[..close];
554        let timestamp = parse_timestamp(ts_str)?;
555
556        // Everything after the closing bracket: ` <kind> | <object>` or
557        // ` <kind>`.
558        let after = rest[close + 1..].trim();
559        if after.is_empty() {
560            return None;
561        }
562
563        let (kind_str, object) = match after.split_once('|') {
564            Some((k, o)) => {
565                let obj = o.trim();
566                let obj = if obj.is_empty() {
567                    None
568                } else {
569                    Some(obj.to_string())
570                };
571                (k.trim(), obj)
572            }
573            None => (after, None),
574        };
575
576        if kind_str.is_empty() {
577            return None;
578        }
579
580        Some((timestamp, LogKind::parse(kind_str), object))
581    }
582}
583
584// ── Internal helpers ────────────────────────────────────────────────────────
585
586/// A bounded window of the `n` entries with the largest timestamps, fed by a
587/// **reverse (newest-physical-first) scan** and used by [`Log::tail`].
588///
589/// Why this exists: the last `n` *physical* entries are the `n` newest only
590/// when the log is in non-decreasing time order. That's the append-only
591/// contract, not a guarantee — a backdated, clock-skewed, or merge-interleaved
592/// entry violates it (and trips the `LOG_OUT_OF_ORDER` validate warning). The
593/// window decouples `tail` from that assumption: it keeps the `n` largest
594/// timestamps seen regardless of the order they arrive in, so the caller can
595/// read each file fully (no fragile within-file early stop) and still get the
596/// true top `n`.
597///
598/// Tie-break: entries sharing a timestamp at the window boundary are ordered by
599/// **physical recency** — the one appended later (encountered earlier in the
600/// reverse scan, i.e. a smaller `arrival`) wins. "Newest" means most-recently
601/// recorded.
602struct NewestWindow {
603    cap: usize,
604    /// Min-by-(timestamp, then physical-oldest) heap: the root is always the
605    /// next entry to evict once the window is full.
606    heap: std::collections::BinaryHeap<WindowItem>,
607    /// Count of entries fed in, in reverse-scan order, used as the tie-break
608    /// key (0 = newest physical).
609    next_arrival: u64,
610}
611
612impl NewestWindow {
613    fn new(cap: usize) -> Self {
614        NewestWindow {
615            cap,
616            heap: std::collections::BinaryHeap::with_capacity(cap),
617            next_arrival: 0,
618        }
619    }
620
621    /// Offer one entry from the scan. If the window isn't full it's kept; once
622    /// full, it's kept (evicting the current minimum) iff its timestamp is `>=`
623    /// the window minimum. Equal-timestamp boundary entries resolve by physical
624    /// recency (see the type doc).
625    fn consider(&mut self, entry: LogEntry) {
626        let arrival = self.next_arrival;
627        self.next_arrival += 1;
628
629        if self.heap.len() < self.cap {
630            self.heap.push(WindowItem { entry, arrival });
631            return;
632        }
633
634        // Window full. The heap root is the current minimum (oldest-by-
635        // timestamp held; on a tie, the oldest-physical).
636        let root = self.heap.peek().expect("full window has a root");
637        if entry.timestamp > root.entry.timestamp {
638            // Strictly newer than the window minimum: it belongs; evict the min.
639            self.heap.pop();
640            self.heap.push(WindowItem { entry, arrival });
641        }
642        // On `<=` we keep the window as-is. `<` is plainly too old. `==` is the
643        // tie case: the scan is newest-physical-first, so this entry is
644        // physically *older* than the held one of equal timestamp, and the
645        // tie-break keeps the physically-newer (most-recently-recorded) entry —
646        // so the incoming one is dropped.
647    }
648
649    /// Whether the window already holds its full `cap` entries.
650    fn is_full(&self) -> bool {
651        self.heap.len() >= self.cap
652    }
653
654    /// The `(year, month)` of the window's current minimum (oldest kept) entry,
655    /// or `None` when the window is empty. Used to prune older archives: an
656    /// archive month strictly before this can't beat the current cutoff.
657    fn min_year_month(&self) -> Option<(i32, u32)> {
658        self.heap
659            .peek()
660            .map(|item| (item.entry.timestamp.year(), item.entry.timestamp.month()))
661    }
662
663    /// The held entries, oldest→newest (chronological), ties broken
664    /// oldest-physical→newest-physical.
665    fn into_sorted(self) -> Vec<LogEntry> {
666        let mut items: Vec<WindowItem> = self.heap.into_vec();
667        // Ascending by timestamp; on a tie, oldest-physical (larger arrival)
668        // first so the most-recently-recorded entry sorts last.
669        items.sort_by(|a, b| {
670            a.entry
671                .timestamp
672                .cmp(&b.entry.timestamp)
673                .then(b.arrival.cmp(&a.arrival))
674        });
675        items.into_iter().map(|i| i.entry).collect()
676    }
677}
678
/// One slot in [`NewestWindow`]'s heap. `Ord` is defined so the heap is a
/// **min-heap on `(timestamp, physical-oldest)`**: `BinaryHeap` is a max-heap,
/// so the root (max under this `Ord`) is the eviction candidate — the smallest
/// timestamp, and on a tie the oldest-physical (largest `arrival`).
///
/// Equality and ordering look only at the entry's timestamp and at `arrival`;
/// the rest of the entry does not participate.
struct WindowItem {
    /// The log entry held in this slot.
    entry: LogEntry,
    /// Position of the entry in the reverse scan (assigned by
    /// `NewestWindow::consider`); a smaller value means it was encountered
    /// earlier in that scan.
    arrival: u64,
}
687
688impl PartialEq for WindowItem {
689    fn eq(&self, other: &Self) -> bool {
690        self.entry.timestamp == other.entry.timestamp && self.arrival == other.arrival
691    }
692}
693impl Eq for WindowItem {}
694
695impl Ord for WindowItem {
696    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
697        // Reverse on timestamp so the *smallest* timestamp is the heap max
698        // (eviction candidate). On equal timestamps, the larger `arrival`
699        // (older physical) is the heap max so it is evicted first.
700        other
701            .entry
702            .timestamp
703            .cmp(&self.entry.timestamp)
704            .then(self.arrival.cmp(&other.arrival))
705    }
706}
707impl PartialOrd for WindowItem {
708    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
709        Some(self.cmp(other))
710    }
711}
712
/// An advisory, exclusive lock serializing concurrent [`Log::append`] calls.
///
/// Held on a dedicated sibling lock file (`<active>.lock`) rather than on
/// `log.md` itself: `write_atomic` replaces the active file by `rename`, so the
/// active inode changes under us and a lock on its fd would not cover the new
/// file. The lock file is stable, so the lock spans the whole read-modify-write.
///
/// On Unix this is `flock(LOCK_EX)`, released on drop (or implicitly when the
/// process exits / the fd closes, so a crash never strands the lock). The
/// lock file is created if absent and intentionally left on disk between runs
/// (locking it does not depend on its contents). On non-Unix targets the lock
/// is a no-op — db.md's append surface is Unix-targeted, and a missing advisory
/// lock degrades to unserialized last-writer-wins between concurrent
/// appenders, never to incorrectness of a single writer.
struct AppendLock {
    /// The open, locked lock file; `None` when the lock file could not be
    /// opened or locked (the guard is then a no-op). The field exists only on
    /// Unix targets.
    #[cfg(unix)]
    file: Option<File>,
}
731
732impl AppendLock {
733    /// Acquire the exclusive append lock for the store whose active log is
734    /// `active`. Best-effort: any failure to open or lock the lock file yields
735    /// an unlocked guard (we log rather than refuse to log). Blocks until the
736    /// lock is granted when another appender holds it.
737    fn acquire(active: &Path) -> AppendLock {
738        #[cfg(unix)]
739        {
740            let file = Self::open_and_lock(active);
741            AppendLock { file }
742        }
743        #[cfg(not(unix))]
744        {
745            let _ = active;
746            AppendLock {}
747        }
748    }
749
750    #[cfg(unix)]
751    fn open_and_lock(active: &Path) -> Option<File> {
752        use std::os::unix::io::AsRawFd;
753
754        // The lock file lives beside the active log; ensure its parent exists
755        // (the fresh-log path may run before `log.md`'s directory is created).
756        if let Some(parent) = active.parent() {
757            let _ = fs::create_dir_all(parent);
758        }
759        let lock_path = lock_path_for(active);
760        let file = std::fs::OpenOptions::new()
761            .create(true)
762            .truncate(false)
763            .write(true)
764            .open(&lock_path)
765            .ok()?;
766
767        // Blocking exclusive advisory lock. `flock` is in libc, which every Rust
768        // binary links, so the bare `extern "C"` declaration needs no crate dep.
769        let rc = unsafe { flock(file.as_raw_fd(), LOCK_EX) };
770        if rc != 0 {
771            // Could not lock (e.g. a filesystem without flock support): proceed
772            // unlocked rather than fail the append.
773            return None;
774        }
775        Some(file)
776    }
777}
778
779#[cfg(unix)]
780impl Drop for AppendLock {
781    fn drop(&mut self) {
782        use std::os::unix::io::AsRawFd;
783        if let Some(file) = &self.file {
784            // Release explicitly; the fd close on drop would also release it.
785            unsafe { flock(file.as_raw_fd(), LOCK_UN) };
786        }
787    }
788}
789
// Hand-written binding to the C `flock` function, compiled on Unix targets
// only. Declared here so the crate does not need a libc dependency.
#[cfg(unix)]
extern "C" {
    /// `int flock(int fd, int operation)`: apply or remove an advisory lock on
    /// the open file referred to by `fd`. This file passes `LOCK_EX` or
    /// `LOCK_UN` as `operation` and treats a return of `0` as success.
    fn flock(fd: std::os::raw::c_int, operation: std::os::raw::c_int) -> std::os::raw::c_int;
}
794
/// `flock` operation: exclusive lock (`LOCK_EX`), blocking.
///
/// NOTE(review): the numeric value is hard-coded rather than taken from a libc
/// binding; it is the usual `<sys/file.h>` value on Linux/macOS/BSD — confirm
/// before targeting any other Unix.
#[cfg(unix)]
const LOCK_EX: std::os::raw::c_int = 2;
/// `flock` operation: unlock (`LOCK_UN`).
///
/// NOTE(review): hard-coded like `LOCK_EX`; the same platform caveat applies.
#[cfg(unix)]
const LOCK_UN: std::os::raw::c_int = 8;
801
/// Path of the advisory-lock file that sits next to an active log file: the
/// log's own file name with `.lock` appended, in the same directory.
///
/// When `active` has no final file-name component the name `log.md` is
/// assumed; when it has no parent the bare lock name is returned.
#[cfg(unix)]
fn lock_path_for(active: &Path) -> PathBuf {
    let mut lock_name = match active.file_name() {
        Some(base) => base.to_os_string(),
        None => std::ffi::OsString::from("log.md"),
    };
    lock_name.push(".lock");

    if let Some(dir) = active.parent() {
        dir.join(lock_name)
    } else {
        PathBuf::from(lock_name)
    }
}
815
816/// The active `log.md` path under the store root.
817fn active_log_path(store: &Store) -> PathBuf {
818    store.root.join("log.md")
819}
820
821/// The `log/` archive directory under the store root.
822fn archive_dir(store: &Store) -> PathBuf {
823    store.root.join("log")
824}
825
826/// The `log/<YYYY-MM>.md` archive path for a given month.
827fn archive_path(store: &Store, year: i32, month: u32) -> PathBuf {
828    archive_dir(store).join(format!("{:04}-{:02}.md", year, month))
829}
830
831/// The crash-recovery marker for an in-progress rotation.
832///
833/// Its **presence** at the start of [`Log::append`] means a prior rotation
834/// appended prior-month entries to their archives but may not have trimmed the
835/// active file (a crash, or an active-rewrite error, between the two non-atomic
836/// durable writes). The retry must then DEDUP the re-rolled entries against the
837/// archive so it adds nothing.
838///
839/// Its **absence** means a fresh rotation: every prior-month entry being rolled
840/// is genuinely new to its archive and is appended UNCONDITIONALLY. This is the
841/// load-bearing distinction — a content-only dedup cannot tell an idempotent
842/// re-roll of one physical entry from a genuinely-distinct same-minute repeat
843/// (on-disk headers are minute-precision, so two real appends to the same object
844/// in the same minute with the same note render byte-identically). Gating the
845/// dedup on "are we recovering a crashed rotation?" lets a backdated duplicate
846/// survive while still suppressing a true re-roll.
847///
848/// Lives in `log/` (toolkit-managed; a dotfile, so never walked, indexed, or
849/// validated as content — `list_archives_desc` matches only `YYYY-MM.md`).
850fn rotation_marker_path(store: &Store) -> PathBuf {
851    archive_dir(store).join(".rotating")
852}
853
854/// Parse a `YYYY-MM-DD HH:MM` header timestamp, reattaching UTC. `None` on any
855/// malformed shape.
856fn parse_timestamp(s: &str) -> Option<DateTime<FixedOffset>> {
857    let naive = NaiveDateTime::parse_from_str(s.trim(), TS_FORMAT).ok()?;
858    let utc = FixedOffset::east_opt(0)?;
859    utc.from_local_datetime(&naive).single()
860}
861
862/// Split a `log.md` / archive file into its leading frontmatter+heading block
863/// (everything up to and including the line before the first `## [` header) and
864/// its parsed entries. If there are no entries, the whole content is the header
865/// block.
866fn parse_active(content: &str) -> (String, Vec<LogEntry>) {
867    match find_first_header(content) {
868        Some(idx) => {
869            let header = content[..idx].to_string();
870            let entries = parse_entries(&content[idx..]);
871            (header, entries)
872        }
873        None => (content.to_string(), Vec::new()),
874    }
875}
876
877/// Byte offset of the first **valid** entry header — a `## [` line-start that
878/// [`Log::parse_header`] accepts — or `None`.
879///
880/// Crucially this skips `## [`-SHAPED lines that `parse_header` REJECTS (a
881/// merge-orphaned note, an exporter-malformed line) appearing before the first
882/// real entry: everything up to the first valid header becomes the preserved
883/// `header` block in [`parse_active`], so a rotation re-emits it verbatim.
884/// Returning the first `## [`-shaped line instead (as this once did) put those
885/// pre-entry lines into the entries region, where [`parse_entries`] — which
886/// opens an entry only on a parseable header — dropped them on the floor,
887/// silently erasing append-only content on the next rotation.
888fn find_first_header(content: &str) -> Option<usize> {
889    let mut offset = 0usize;
890    for line in content.split_inclusive('\n') {
891        let line_str = line.trim_end_matches(['\r', '\n']);
892        if line_str.starts_with("## [") && Log::parse_header(line_str).is_some() {
893            return Some(offset);
894        }
895        offset += line.len();
896    }
897    None
898}
899
900/// Whether `line` is a note line that — left unescaped — could be mistaken for
901/// an entry header. It is *header-ambiguous* when it is a (possibly empty) run
902/// of leading backslashes followed by a string that [`Log::parse_header`]
903/// accepts. The escape (one leading backslash) and only the escape is added to,
904/// or stripped from, such lines, so the transform is fully reversible:
905/// `## [..]` (a real header shape in note text) ⇄ `\## [..]`, and a literal
906/// `\## [..]` a note already contains ⇄ `\\## [..]`.
907fn is_header_ambiguous(line: &str) -> bool {
908    let stripped = line.trim_start_matches('\\');
909    // Only treat it as ambiguous if some backslashes were the *only* prefix and
910    // the remainder is a valid header — a backslash run that does not lead into
911    // a header (e.g. `\not a header`) is ordinary note text, left untouched.
912    Log::parse_header(stripped).is_some()
913}
914
915/// Escape one note line for on-disk emission so it can never be parsed as an
916/// entry header (the [write-path fix] for header-shaped notes corrupting the
917/// append-only log). A header-ambiguous line is prefixed with a single
918/// backslash, moving its `## [` off column 0; every other line is emitted
919/// verbatim. Reversed exactly by [`unescape_note_line`].
920fn escape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
921    if is_header_ambiguous(line) {
922        std::borrow::Cow::Owned(format!("\\{line}"))
923    } else {
924        std::borrow::Cow::Borrowed(line)
925    }
926}
927
928/// Reverse [`escape_note_line`]: strip exactly one leading backslash from a
929/// header-ambiguous on-disk note line, restoring the literal the author wrote.
930/// A line that is not header-ambiguous (including a genuine `\not a header`) is
931/// returned untouched, so the round-trip is lossless for arbitrary note text.
932fn unescape_note_line(line: &str) -> std::borrow::Cow<'_, str> {
933    if let Some(rest) = line.strip_prefix('\\') {
934        if is_header_ambiguous(line) {
935            return std::borrow::Cow::Borrowed(rest);
936        }
937    }
938    std::borrow::Cow::Borrowed(line)
939}
940
941/// Parse every entry in a slice that begins at (or before, header-block
942/// included) a sequence of `## [` headers. Headers that fail to parse are
943/// skipped (their body folds into the previous valid entry's note is avoided —
944/// they simply start no new entry).
945fn parse_entries(text: &str) -> Vec<LogEntry> {
946    let mut entries: Vec<LogEntry> = Vec::new();
947    let mut cur_header: Option<(DateTime<FixedOffset>, LogKind, Option<String>)> = None;
948    let mut cur_note: Vec<&str> = Vec::new();
949
950    let flush = |entries: &mut Vec<LogEntry>,
951                 header: &mut Option<(DateTime<FixedOffset>, LogKind, Option<String>)>,
952                 note: &mut Vec<&str>| {
953        if let Some((timestamp, kind, object)) = header.take() {
954            // Reverse the per-line header escape `render` applies so an escaped
955            // header-shaped note line round-trips back to its literal form.
956            let joined = note
957                .iter()
958                .map(|line| unescape_note_line(line))
959                .collect::<Vec<_>>()
960                .join("\n");
961            let note_str = joined.trim_matches(['\n', '\r']).to_string();
962            entries.push(LogEntry {
963                timestamp,
964                kind,
965                object,
966                note: note_str,
967            });
968        }
969        note.clear();
970    };
971
972    for line in text.lines() {
973        if line.starts_with("## [") {
974            if let Some(parsed) = Log::parse_header(line) {
975                // Close the previous entry, start a new one.
976                flush(&mut entries, &mut cur_header, &mut cur_note);
977                cur_header = Some(parsed);
978                continue;
979            }
980            // Unparseable `## [` line: treat as body of the current entry.
981        }
982        if cur_header.is_some() {
983            cur_note.push(line);
984        }
985    }
986    flush(&mut entries, &mut cur_header, &mut cur_note);
987    entries
988}
989
/// Rebuild the text of an active/archive file from its header block and its
/// rendered entry body.
///
/// An empty `header` contributes nothing: the result is just `body`. Otherwise
/// the header is followed by as many newlines as it takes for it to end in a
/// blank line (`"\n\n"`) — none if it already does — and then by `body`.
fn compose_active(header: &str, body: &str) -> String {
    let separator = match header {
        "" => "",
        h if h.ends_with("\n\n") => "",
        h if h.ends_with('\n') => "\n",
        _ => "\n\n",
    };
    [header, separator, body].concat()
}
1004
1005/// Append entries to a month archive, creating it with `type: log` frontmatter
1006/// if absent. Atomic (temp-file rename). Entries are appended in the given
1007/// order (callers pass them already chronological within the month).
1008///
1009/// **`recovering` — the re-roll gate.** Rotation in [`Log::append`] is two
1010/// non-atomic durable writes: roll prior-month entries into the archive, then
1011/// rewrite (trim) the active file. If the process crashes or the active rewrite
1012/// errors *after* the archive write commits, the prior-month entries remain in
1013/// the still-untrimmed active file and the agent's retry re-rolls them here. A
1014/// naive concatenate would then duplicate every entry, amplifying on each retry.
1015///
1016/// We CANNOT dedup that away by content alone: on-disk headers are
1017/// minute-precision, so two genuinely-distinct appends to the same object in the
1018/// same minute with the same note render byte-identically — indistinguishable
1019/// from a re-roll of one physical entry. Deduping unconditionally therefore
1020/// silently destroyed a legitimately-distinct backdated duplicate (the bug).
1021///
1022/// So the caller passes `recovering`: `true` only when an in-progress-rotation
1023/// marker was found (a crash-retry), where we dedup the incoming batch against
1024/// the archive **by multiplicity** (skip an incoming entry only while the
1025/// archive still holds an unconsumed copy of its identity) so a re-roll of the
1026/// SAME physical entries adds nothing. On a fresh rotation (`false`) every entry
1027/// is genuinely new to the archive and is appended unconditionally, so a
1028/// distinct same-minute repeat survives.
1029fn append_to_archive(path: &Path, entries: &[LogEntry], recovering: bool) -> crate::Result<()> {
1030    if path.exists() {
1031        let existing = fs::read_to_string(path)?;
1032
1033        let mut body = String::new();
1034        if recovering {
1035            // Crash-retry: the prior (crashed) attempt may already have appended
1036            // some/all of these. Dedup by MULTIPLICITY, not set-membership, so a
1037            // partial-then-retried roll converges exactly and a re-roll of the
1038            // full batch is a no-op.
1039            let (_header, existing_entries) = parse_active(&existing);
1040            let mut remaining: std::collections::HashMap<EntryKey, usize> =
1041                std::collections::HashMap::new();
1042            for e in &existing_entries {
1043                *remaining.entry(entry_key(e)).or_insert(0) += 1;
1044            }
1045            for e in entries {
1046                match remaining.get_mut(&entry_key(e)) {
1047                    // An archived copy is still unconsumed: this incoming entry is
1048                    // that re-roll, suppress it.
1049                    Some(count) if *count > 0 => *count -= 1,
1050                    _ => body.push_str(&e.render()),
1051                }
1052            }
1053        } else {
1054            // Fresh rotation: append every entry. A same-minute, same-fields
1055            // entry that already exists in the archive is a DISTINCT append, not
1056            // a re-roll, and must be preserved.
1057            for e in entries {
1058                body.push_str(&e.render());
1059            }
1060        }
1061
1062        // Nothing new to add (a fully-duplicate re-roll): leave the archive
1063        // byte-for-byte untouched (append-only: don't rewrite identical data).
1064        if body.is_empty() {
1065            return Ok(());
1066        }
1067
1068        let mut full = existing;
1069        if !full.ends_with('\n') {
1070            full.push('\n');
1071        }
1072        full.push_str(&body);
1073        crate::fsx::write_atomic(path, full.as_bytes())?;
1074    } else {
1075        let mut body = String::new();
1076        for e in entries {
1077            body.push_str(&e.render());
1078        }
1079        if let Some(parent) = path.parent() {
1080            fs::create_dir_all(parent)?;
1081        }
1082        let full = compose_active(LOG_FRONTMATTER, &body);
1083        crate::fsx::write_atomic(path, full.as_bytes())?;
1084    }
1085    Ok(())
1086}
1087
1088/// True iff every prior-month entry about to be rolled out (`by_month`) already
1089/// has a matching, unconsumed copy in its month archive — i.e. the whole
1090/// roll-out batch is a multiset-subset of the archives.
1091///
1092/// This is the load-bearing test for "is the `.rotating` marker a genuine
1093/// interrupted-rotation re-roll, or a stale marker over a fresh roll?" A genuine
1094/// interrupted rotation commits its per-month archive appends BEFORE the active
1095/// trim, so on retry the still-untrimmed active file's prior-month entries are
1096/// all present in the archives — exactly the duplicates the dedup must suppress.
1097/// If any entry is missing from its archive, no completed archive write exists to
1098/// re-roll: the marker is stale and these are a fresh roll that must be appended,
1099/// never deduped (deduping a genuinely-distinct same-(minute,kind,object,note)
1100/// entry against an unrelated pre-existing archive copy would, with the
1101/// unconditional active trim, drop it from disk — the bug).
1102///
1103/// Multiset semantics: each archived copy is consumed at most once, so two
1104/// distinct same-minute entries in the batch require two archived copies to count
1105/// as a re-roll. Cheap: only the months actually being rolled are read, and only
1106/// when the marker is present (the cold recovery path).
1107fn batch_is_archived(
1108    store: &Store,
1109    by_month: &BTreeMap<(i32, u32), Vec<LogEntry>>,
1110) -> crate::Result<bool> {
1111    for ((y, m), month_entries) in by_month {
1112        let path = archive_path(store, *y, *m);
1113        if !path.exists() {
1114            // No archive for this month: nothing was rolled here yet, so the
1115            // batch cannot be a completed re-roll.
1116            return Ok(false);
1117        }
1118        let (_header, archived) = parse_active(&fs::read_to_string(&path)?);
1119        let mut available: std::collections::HashMap<EntryKey, usize> =
1120            std::collections::HashMap::new();
1121        for e in &archived {
1122            *available.entry(entry_key(e)).or_insert(0) += 1;
1123        }
1124        for e in month_entries {
1125            match available.get_mut(&entry_key(e)) {
1126                Some(count) if *count > 0 => *count -= 1,
1127                _ => return Ok(false),
1128            }
1129        }
1130    }
1131    Ok(true)
1132}
1133
/// A hashable identity for a log entry, used to dedup an idempotent archive
/// re-roll (see [`append_to_archive`]). Two entries are "the same" when their
/// timestamp, kind, object, and note all match — exactly the fields that
/// round-trip through `render`/`parse`, so a re-rolled entry compares equal to
/// the one already archived. Owned (rather than borrowed) so keys from the
/// existing archive and from the incoming entries share one type regardless of
/// where they came from; the cost is paid only on the cold rotation path.
///
/// Tuple layout, in order: timestamp, kind (as its string form), optional
/// object, note.
type EntryKey = (DateTime<FixedOffset>, String, Option<String>, String);
1142
1143/// Derive the dedup key for `e` (see [`EntryKey`]). Keying on `kind.as_str()`
1144/// (rather than `LogKind`, which is not `Hash`) is exact: `as_str`/`parse`
1145/// round-trips every recognized kind and preserves any `Custom` token.
1146fn entry_key(e: &LogEntry) -> EntryKey {
1147    (
1148        e.timestamp,
1149        e.kind.as_str().to_string(),
1150        e.object.clone(),
1151        e.note.clone(),
1152    )
1153}
1154
1155/// Every `log/<YYYY-MM>.md` archive, sorted **newest month first**.
1156fn list_archives_desc(store: &Store) -> crate::Result<Vec<PathBuf>> {
1157    let dir = archive_dir(store);
1158    if !dir.is_dir() {
1159        return Ok(Vec::new());
1160    }
1161    let mut months: Vec<(String, PathBuf)> = Vec::new();
1162    for entry in fs::read_dir(&dir)? {
1163        let entry = entry?;
1164        let path = entry.path();
1165        if !path.is_file() {
1166            continue;
1167        }
1168        let name = match path.file_name().and_then(|s| s.to_str()) {
1169            Some(n) => n,
1170            None => continue,
1171        };
1172        // Match `YYYY-MM.md`.
1173        if let Some(stem) = name.strip_suffix(".md") {
1174            if is_year_month(stem) {
1175                months.push((stem.to_string(), path.clone()));
1176            }
1177        }
1178    }
1179    // `YYYY-MM` strings sort lexically == chronologically; reverse for newest
1180    // first.
1181    months.sort_by(|a, b| b.0.cmp(&a.0));
1182    Ok(months.into_iter().map(|(_, p)| p).collect())
1183}
1184
1185/// The `(year, month)` an archive file represents, parsed from its
1186/// `log/<YYYY-MM>.md` name. `None` if the name isn't a well-formed month
1187/// archive (in which case the caller scans it rather than risk skipping it).
1188fn archive_year_month(path: &Path) -> Option<(i32, u32)> {
1189    let stem = path
1190        .file_name()
1191        .and_then(|s| s.to_str())
1192        .and_then(|n| n.strip_suffix(".md"))?;
1193    if !is_year_month(stem) {
1194        return None;
1195    }
1196    let year: i32 = stem[..4].parse().ok()?;
1197    let month: u32 = stem[5..7].parse().ok()?;
1198    // The month must be a real calendar month. A hand-created / externally-
1199    // produced `log/2026-00.md` or `log/2026-13.md` parses as two digits but
1200    // names no month; returning `Some((year, 0))` would sort it below every
1201    // legitimate month, so the newest-month-first early-break in `since`/`tail`
1202    // could prune it and silently drop its entries. Out-of-range → `None`, so the
1203    // caller scans the file instead of risk-skipping it (the safe fallback).
1204    if !(1..=12).contains(&month) {
1205        return None;
1206    }
1207    Some((year, month))
1208}
1209
/// Shape check for a `YYYY-MM` string: exactly seven bytes — four ASCII
/// digits, a dash, two ASCII digits. Only the shape is tested; the month value
/// is not range-checked here.
fn is_year_month(s: &str) -> bool {
    match s.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1] => [y0, y1, y2, y3, m0, m1]
            .iter()
            .all(|digit| digit.is_ascii_digit()),
        _ => false,
    }
}
1221
/// Reverse-read `path` from EOF, parsing entries newest-first and feeding each
/// to `take`. `take` returns `true` to stop early (enough collected). The file
/// is read backward in blocks; only the tail region needed to satisfy `take`
/// is read — the whole file is read only if `take` never returns `true`.
///
/// "Newest-first" here means physical order from the end of the file toward
/// its start. An empty file yields no calls to `take`.
///
/// # Errors
///
/// Propagates failures from opening the file, reading its metadata, seeking,
/// and reading a block.
fn reverse_collect<F>(path: &Path, mut take: F) -> crate::Result<()>
where
    F: FnMut(LogEntry) -> bool,
{
    let mut file = File::open(path)?;
    let len = file.metadata()?.len();
    if len == 0 {
        return Ok(());
    }

    // Algorithm: grow a tail buffer leftward one block at a time, emitting
    // entries strictly newest-first as their left boundary is confirmed, and
    // stopping the instant `take` says enough. The whole file is read only if
    // `take` never returns `true` (e.g. `tail(n)` with n ≥ entry count).
    //
    // Invariant: a `## [` line-start anywhere in the buffer is a *complete*
    // entry — its header is the entry's first line, and its body lies to the
    // right and is therefore already buffered (we read right-to-left). So we
    // never split an entry across blocks.
    //
    // `buf` holds the file's bytes from absolute offset `start` (growing
    // leftward toward 0) to EOF. `emitted_abs` records the absolute offsets of
    // headers already handed to `take`, so re-visiting a header in a later block
    // never double-emits.
    let mut buf: Vec<u8> = Vec::new();
    let mut start = len;
    // O(1) membership: a `Vec` + `.contains()` here would be O(E²) across a large
    // single-month file (every header re-checked against all prior emissions).
    let mut emitted_abs: std::collections::HashSet<u64> = std::collections::HashSet::new();
    // Every header's absolute offset found so far, ascending. Built
    // *incrementally*: each block contributes only the markers whose `#` starts
    // inside it (all strictly smaller than any already-known offset, so they
    // prepend in order). This is the fix for the accidental O(file²) scan — the
    // old code re-ran `header_offsets` over the whole accumulated buffer on every
    // block (O(file²/block) byte comparisons on the default no-early-stop
    // tail/since path); now each byte is scanned for a header exactly once.
    let mut headers: Vec<u64> = Vec::new();
    // Set once `take` returns `true`; ends the block loop and suppresses the
    // final held-back pass.
    let mut stop = false;
    // The first backward block has no already-scanned region to its right, so it
    // scans exactly `[0, block)`; every later block scans one byte further
    // (`block + 1`) to re-classify the prior block's deferred left-edge candidate
    // now that its left neighbour is buffered (see the scan call below).
    let mut first = true;

    while start > 0 && !stop {
        // Size of this step: a full block, or whatever remains before offset 0.
        let block = std::cmp::min(REVERSE_BLOCK as u64, start);
        let new_start = start - block;
        file.seek(SeekFrom::Start(new_start))?;
        let mut chunk = vec![0u8; block as usize];
        file.read_exact(&mut chunk)?;
        // Prepend: the new block goes in front of everything buffered so far.
        // NOTE(review): this copies the whole accumulated buffer on every
        // block, so a full-file read still moves O(file²/block) bytes even
        // though header scanning is incremental.
        chunk.extend_from_slice(&buf);
        buf = chunk;
        start = new_start;

        // Scan the freshly-prepended block (buffer indices `[0, block)`) for new
        // header markers. A marker straddling the block boundary has its `#` in
        // this window and so is still caught (see `header_offsets_range`).
        //
        // One subtlety the scan must respect: a `## [` whose `#` sits at the
        // block's LEFT edge (buffer index 0, absolute offset `start`) cannot have
        // its line-start confirmed yet when `start > 0` — the byte at `start - 1`
        // is not buffered. Treating index 0 as a line start there fabricates an
        // entry from a mid-line `## [` fragment that happens to align with a block
        // boundary. So `header_offsets_range` DEFERS the leftmost candidate when
        // `base` is not the true file start, and we re-scan one byte further
        // right next time: after the first block the buffer carries the previous
        // block's left-edge byte at index `block` with its left neighbour now in
        // hand, so extending the window to `block + 1` re-classifies that exactly
        // once. `first` guards the first block (nothing to re-check on its right).
        let base_is_file_start = start == 0;
        let scan_hi = if first { block } else { block + 1 } as usize;
        let mut new_headers = header_offsets_range(&buf, start, 0, scan_hi, base_is_file_start);
        first = false;
        if !new_headers.is_empty() {
            // Newly found offsets go in front of the known ones, keeping
            // `headers` ascending.
            new_headers.extend_from_slice(&headers);
            headers = new_headers;
        }

        // Process newest (largest offset) → oldest (smallest), emitting any
        // header not yet emitted. Hold back only the buffer's *leftmost* header
        // while we have not reached file start (`start > 0`): older entries may
        // still lie to its left in unread blocks, and newest-first order
        // requires we not emit it until we've confirmed it really is the oldest
        // (or read enough to bound it on the left). One extra block read at
        // most; on the next iteration its left boundary is in-buffer.
        for i in (0..headers.len()).rev() {
            let abs = headers[i];
            if emitted_abs.contains(&abs) {
                continue;
            }
            let is_oldest_in_buf = i == 0;
            if is_oldest_in_buf && start > 0 {
                continue;
            }

            // Presumably the entry's text runs from its header to the next
            // header (or EOF) — the slicing itself lives in `entry_text_at`.
            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
            if let Some(entry) = parse_single_entry(&entry_text) {
                emitted_abs.insert(abs);
                if take(entry) {
                    stop = true;
                    break;
                }
            } else {
                // Did not parse as an entry: mark the offset handled anyway so
                // it is not retried on later iterations.
                emitted_abs.insert(abs);
            }
        }
    }

    // Reached file start (or stopped). If we stopped, done. If we reached
    // start, emit any held-back oldest header(s) now (start == 0 means the
    // buffer's first header is genuinely the oldest). `headers` already holds
    // every offset (the loop scanned down to start == 0), so reuse it.
    if !stop && start == 0 {
        for i in (0..headers.len()).rev() {
            let abs = headers[i];
            if emitted_abs.contains(&abs) {
                continue;
            }
            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
            if let Some(entry) = parse_single_entry(&entry_text) {
                emitted_abs.insert(abs);
                if take(entry) {
                    break;
                }
            } else {
                // Unparseable: record it as handled, same as in the block loop.
                emitted_abs.insert(abs);
            }
        }
    }

    Ok(())
}
1358
/// Collect the absolute byte offset of each **valid** entry header (a
/// line-start `## […]`) found in `buf`, which begins at absolute offset `base`.
///
/// A `## [` line counts as an entry boundary only when [`Log::parse_header`]
/// accepts it. That matches the forward parser ([`parse_entries`]), which
/// treats an unparseable `## [` line as part of the preceding entry's note
/// instead of opening a new entry. Skipping the check would let the reverse
/// reader cut a genuine multi-line note at a column-0 continuation line that
/// happens to begin with `## [` — a shape the SPEC allows, since notes are
/// "one or more lines" with no further restriction. The note would be
/// truncated and the carved-off pseudo-entry dropped, so
/// `tail`/`since`/`last_validate_at` would report a note that differs from the
/// intact bytes on disk.
///
/// This is the whole-buffer convenience form of [`header_offsets_range`]. The
/// runtime reverse reader always scans incrementally (one freshly-prepended
/// window per backward block), so this form survives only as the oracle that
/// the range-scan tests compare the incremental scan against.
#[cfg(test)]
fn header_offsets(buf: &[u8], base: u64) -> Vec<u64> {
    // As the oracle, `base == 0` is taken to mean the buffer begins at the
    // file start, which makes a `## [` at buffer index 0 a real line start.
    let starts_at_file_start = base == 0;
    header_offsets_range(buf, base, 0, buf.len(), starts_at_file_start)
}
1382
1383/// Like [`header_offsets`] but only reports header *markers whose `#` starts in*
1384/// `buf[scan_lo..scan_hi)`, while still consulting bytes outside that window —
1385/// to the left for the line-start (`buf[i-1] == b'\n'`) check and to the right
1386/// for the header line's content. This is the incremental scan
1387/// [`reverse_collect`] uses: each backward block searches only the freshly-
1388/// prepended region for *new* markers, so total header-scan work is linear in
1389/// the file size, not the O(file²) of re-scanning the whole growing buffer on
1390/// every block.
1391///
1392/// A `## [` marker that *straddles* the boundary (its `#` in the new block, its
1393/// `[` or trailing bytes in the already-scanned region) is still detected here:
1394/// its `#` index is `< scan_hi`, so it falls in this window, and it was never
1395/// reported by an earlier scan (whose window was `[block, …)`, strictly to the
1396/// right of this one) — so each marker is reported exactly once across all
1397/// blocks.
1398///
1399/// **Left-edge line-start safety.** A `## [` whose `#` is at buffer index 0 has
1400/// no buffered left neighbour, so its line-start cannot be confirmed unless
1401/// index 0 really is the file start. `base_is_file_start` says so: when it is
1402/// `false`, an index-0 candidate is DEFERRED (not reported) rather than assumed
1403/// to be at a line start — otherwise a mid-line `## […]` fragment that happens
1404/// to align with a block's left edge would be fabricated into an entry,
1405/// truncating the real entry's note and (after rotation) corrupting the
1406/// append-only archive. The caller re-scans that byte on the next block, once
1407/// its left neighbour is buffered, so a genuine boundary header is still found
1408/// exactly once.
1409fn header_offsets_range(
1410    buf: &[u8],
1411    base: u64,
1412    scan_lo: usize,
1413    scan_hi: usize,
1414    base_is_file_start: bool,
1415) -> Vec<u64> {
1416    const PAT: &[u8] = b"## [";
1417    let mut out = Vec::new();
1418    let n = buf.len();
1419    let hi = scan_hi.min(n);
1420    let mut i = scan_lo;
1421    // A marker's `#` must start strictly before `hi`; the pattern/line content
1422    // may read past `hi` into `buf` (the right neighbour is already buffered).
1423    while i < hi && i + PAT.len() <= n {
1424        if &buf[i..i + PAT.len()] == PAT {
1425            // Index 0 is a line start only when it is the genuine file start;
1426            // otherwise its left neighbour is unbuffered and the candidate is
1427            // deferred to the next block (see the doc comment).
1428            let at_line_start = if i == 0 {
1429                base_is_file_start
1430            } else {
1431                buf[i - 1] == b'\n'
1432            };
1433            if at_line_start && is_valid_header_line(buf, i) {
1434                out.push(base + i as u64);
1435                // skip ahead past this marker
1436                i += PAT.len();
1437                continue;
1438            }
1439        }
1440        i += 1;
1441    }
1442    out
1443}
1444
1445/// Whether the `## [` line starting at byte `i` in `buf` parses as a valid
1446/// entry header. Reads the line up to (but not including) the next `\n` (or
1447/// buffer end) and defers to [`Log::parse_header`] — the same validity gate the
1448/// forward parser applies, keeping the reverse reader's boundary set identical
1449/// to the forward one.
1450fn is_valid_header_line(buf: &[u8], i: usize) -> bool {
1451    let line_end = buf[i..]
1452        .iter()
1453        .position(|&b| b == b'\n')
1454        .map(|p| i + p)
1455        .unwrap_or(buf.len());
1456    let line = String::from_utf8_lossy(&buf[i..line_end]);
1457    Log::parse_header(&line).is_some()
1458}
1459
/// Return the text of the entry whose header sits at absolute offset
/// `header_abs` (that is, `headers[idx]`).
///
/// The entry runs from its header up to the next header in `headers`, or to
/// the end of `buf` when `idx` is the last header. `buf` begins at absolute
/// offset `base`, so absolute offsets are rebased before slicing. Bytes are
/// decoded lossily, so invalid UTF-8 becomes U+FFFD rather than an error.
fn entry_text_at(buf: &[u8], base: u64, header_abs: u64, headers: &[u64], idx: usize) -> String {
    let to_rel = |abs: u64| (abs - base) as usize;
    let begin = to_rel(header_abs);
    // No following header means the entry extends to the buffer's end.
    let end = headers
        .get(idx + 1)
        .map_or(buf.len(), |&next_abs| to_rel(next_abs));
    String::from_utf8_lossy(&buf[begin..end]).into_owned()
}
1472
1473/// Parse a single entry from a text block that begins at its header line.
1474fn parse_single_entry(text: &str) -> Option<LogEntry> {
1475    parse_entries(text).into_iter().next()
1476}
1477
1478#[cfg(test)]
1479mod tests {
1480    use super::*;
1481    use crate::parser::Config;
1482    use std::fs;
1483    use tempfile::TempDir;
1484
1485    /// Build a `Store` rooted at a fresh temp dir with a minimal `DB.md`.
1486    /// Construct the `Store` struct directly so the test stays narrow and never
1487    /// exercises the `Store::open` parser path.
1488    fn temp_store() -> (TempDir, Store) {
1489        let dir = tempfile::tempdir().expect("tempdir");
1490        fs::write(dir.path().join("DB.md"), "---\ntype: db-md\n---\n").expect("write DB.md");
1491        let store = Store {
1492            root: dir.path().to_path_buf(),
1493            config: Config::default(),
1494        };
1495        (dir, store)
1496    }
1497
1498    /// Regression (adversarial review): a hand-created / externally-produced
1499    /// archive with an out-of-range month (`00`, `13`..`99`) must NOT parse as a
1500    /// real month archive — otherwise its `(year, 0)` bucket sorts below every
1501    /// legitimate month and the newest-first early-break in `since`/`tail` can
1502    /// silently prune it. Out-of-range → `None` (the caller scans it instead).
1503    #[test]
1504    fn archive_year_month_rejects_out_of_range_months() {
1505        use std::path::Path;
1506        assert_eq!(
1507            archive_year_month(Path::new("log/2026-05.md")),
1508            Some((2026, 5))
1509        );
1510        assert_eq!(
1511            archive_year_month(Path::new("log/2026-01.md")),
1512            Some((2026, 1))
1513        );
1514        assert_eq!(
1515            archive_year_month(Path::new("log/2026-12.md")),
1516            Some((2026, 12))
1517        );
1518        for bad in ["log/2026-00.md", "log/2026-13.md", "log/2026-99.md"] {
1519            assert_eq!(
1520                archive_year_month(Path::new(bad)),
1521                None,
1522                "{bad} has an out-of-range month and must not parse as an archive"
1523            );
1524        }
1525    }
1526
1527    /// A timestamp at UTC from `YYYY-MM-DD HH:MM` components.
1528    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<FixedOffset> {
1529        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
1530            .unwrap()
1531            .and_hms_opt(h, mi, 0)
1532            .unwrap();
1533        FixedOffset::east_opt(0)
1534            .unwrap()
1535            .from_local_datetime(&naive)
1536            .single()
1537            .unwrap()
1538    }
1539
1540    #[allow(clippy::too_many_arguments)] // test fixture builder; struct-ifying churns every call site
1541    fn entry(
1542        y: i32,
1543        mo: u32,
1544        d: u32,
1545        h: u32,
1546        mi: u32,
1547        kind: LogKind,
1548        object: Option<&str>,
1549        note: &str,
1550    ) -> LogEntry {
1551        LogEntry {
1552            timestamp: ts(y, mo, d, h, mi),
1553            kind,
1554            object: object.map(|s| s.to_string()),
1555            note: note.to_string(),
1556        }
1557    }
1558
1559    // ── parse_header ────────────────────────────────────────────────────────
1560
1561    #[test]
1562    fn parse_header_with_object() {
1563        let (t, k, o) =
1564            Log::parse_header("## [2026-05-27 10:00] ingest | sources/emails/x.eml").unwrap();
1565        assert_eq!(t, ts(2026, 5, 27, 10, 0));
1566        assert_eq!(k, LogKind::Ingest);
1567        assert_eq!(o.as_deref(), Some("sources/emails/x.eml"));
1568    }
1569
1570    #[test]
1571    fn parse_header_without_object_is_none_object() {
1572        let (t, k, o) = Log::parse_header("## [2026-05-27 10:20] validate").unwrap();
1573        assert_eq!(t, ts(2026, 5, 27, 10, 20));
1574        assert_eq!(k, LogKind::Validate);
1575        assert_eq!(o, None);
1576    }
1577
1578    #[test]
1579    fn parse_header_custom_kind_roundtrips_token() {
1580        let (_, k, o) = Log::parse_header("## [2026-05-27 10:00] proposal | records/x").unwrap();
1581        assert_eq!(k, LogKind::Custom("proposal".to_string()));
1582        assert!(!k.is_recognized());
1583        assert_eq!(o.as_deref(), Some("records/x"));
1584    }
1585
1586    #[test]
1587    fn parse_header_index_rebuild_hyphenated_kind() {
1588        let (_, k, _) = Log::parse_header("## [2026-05-27 10:00] index-rebuild").unwrap();
1589        assert_eq!(k, LogKind::IndexRebuild);
1590        assert_eq!(k.as_str(), "index-rebuild");
1591    }
1592
1593    #[test]
1594    fn parse_header_rejects_non_headers() {
1595        assert!(Log::parse_header("Not a header").is_none());
1596        assert!(Log::parse_header("# Curator log").is_none());
1597        assert!(Log::parse_header("## [garbage] ingest | x").is_none());
1598        assert!(Log::parse_header("## [2026-05-27 10:00]").is_none()); // no kind
1599                                                                       // A bracketed but non-timestamp date must be rejected (LOG_BAD_TIMESTAMP territory).
1600        assert!(Log::parse_header("## [2026-13-40 99:99] ingest | x").is_none());
1601    }
1602
1603    // ── kind round-trip ───────────────────────────────────────────────────────
1604
1605    #[test]
1606    fn kind_as_str_parse_roundtrip_for_all_recognized() {
1607        for k in [
1608            LogKind::Ingest,
1609            LogKind::Create,
1610            LogKind::Update,
1611            LogKind::Delete,
1612            LogKind::Rename,
1613            LogKind::Link,
1614            LogKind::Validate,
1615            LogKind::IndexRebuild,
1616            LogKind::Contradiction,
1617        ] {
1618            assert_eq!(LogKind::parse(k.as_str()), k);
1619            assert!(k.is_recognized());
1620        }
1621    }
1622
1623    // ── append: creation + frontmatter ───────────────────────────────────────
1624
1625    #[test]
1626    fn append_creates_log_with_frontmatter_and_entry() {
1627        let (_d, store) = temp_store();
1628        let e = entry(
1629            2026,
1630            5,
1631            27,
1632            10,
1633            0,
1634            LogKind::Ingest,
1635            Some("sources/emails/x.eml"),
1636            "Email received.",
1637        );
1638        Log::append(&store, &e).unwrap();
1639
1640        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1641        // type: log frontmatter present.
1642        assert!(
1643            content.starts_with("---\ntype: log\n---\n"),
1644            "missing log frontmatter; got:\n{content}"
1645        );
1646        // The entry header is rendered verbatim.
1647        assert!(content.contains("## [2026-05-27 10:00] ingest | sources/emails/x.eml"));
1648        assert!(content.contains("Email received."));
1649        // No archive dir created when nothing rotates.
1650        assert!(!store.root.join("log").exists());
1651    }
1652
1653    // ── append → tail → since round-trip ─────────────────────────────────────
1654
1655    #[test]
1656    fn append_tail_since_roundtrip() {
1657        let (_d, store) = temp_store();
1658        let e1 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "first");
1659        let e2 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "second");
1660        let e3 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "third");
1661        Log::append(&store, &e1).unwrap();
1662        Log::append(&store, &e2).unwrap();
1663        Log::append(&store, &e3).unwrap();
1664
1665        // tail(2) returns the two newest, in chronological order.
1666        let tail = Log::tail(&store, 2).unwrap();
1667        assert_eq!(tail.len(), 2);
1668        assert_eq!(tail[0], e2);
1669        assert_eq!(tail[1], e3);
1670
1671        // tail(n) larger than the log returns everything, chronologically.
1672        let all = Log::tail(&store, 99).unwrap();
1673        assert_eq!(all, vec![e1.clone(), e2.clone(), e3.clone()]);
1674
1675        // since(10:05) returns strictly-newer entries (excludes the 10:05 one).
1676        let since = Log::since(&store, ts(2026, 5, 27, 10, 5)).unwrap();
1677        assert_eq!(since, vec![e3.clone()]);
1678
1679        // since before everything returns all.
1680        let since_all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1681        assert_eq!(since_all, vec![e1, e2, e3]);
1682    }
1683
1684    #[test]
1685    fn tail_zero_is_empty() {
1686        let (_d, store) = temp_store();
1687        Log::append(
1688            &store,
1689            &entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "x"),
1690        )
1691        .unwrap();
1692        assert!(Log::tail(&store, 0).unwrap().is_empty());
1693    }
1694
1695    #[test]
1696    fn tail_and_since_on_missing_log_are_empty() {
1697        let (_d, store) = temp_store();
1698        assert!(Log::tail(&store, 5).unwrap().is_empty());
1699        assert!(Log::since(&store, ts(2000, 1, 1, 0, 0)).unwrap().is_empty());
1700        assert!(Log::last_validate_at(&store).unwrap().is_none());
1701    }
1702
1703    #[test]
1704    fn since_exact_timestamp_is_exclusive() {
1705        let (_d, store) = temp_store();
1706        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "PASS");
1707        Log::append(&store, &e).unwrap();
1708        // Equal timestamp must NOT be included (strictly newer).
1709        assert!(Log::since(&store, ts(2026, 5, 27, 10, 0))
1710            .unwrap()
1711            .is_empty());
1712    }
1713
1714    // ── since: out-of-order on disk (append-only correction / merge=union) ────
1715
1716    /// Write a `log.md` at the store root from `entries` in the EXACT given
1717    /// physical order, with the standard `type: log` frontmatter. Unlike
1718    /// [`Log::append`] (which always lands the newest entry at EOF), this lets a
1719    /// test author the non-monotonic on-disk shape the SPEC permits — a
1720    /// backdated corrective entry below the entry it corrects, or a
1721    /// `merge=union` interleave.
1722    fn write_raw_log(store: &Store, entries: &[LogEntry]) {
1723        let mut content = String::from(LOG_FRONTMATTER);
1724        content.push('\n');
1725        for e in entries {
1726            content.push_str(&e.render());
1727        }
1728        fs::write(store.root.join("log.md"), content).expect("write raw log.md");
1729    }
1730
1731    #[test]
1732    fn since_returns_newer_entries_even_when_disk_order_is_non_monotonic() {
1733        // The demonstrated regression: a curator appended a backdated CORRECTIVE
1734        // entry (10:00) below newer entries (10:10, 10:05), so the physical
1735        // on-disk order is 10:10, 10:05, 10:00 — newest-first, not chronological.
1736        // The append-only SPEC explicitly permits this ("append a corrective
1737        // entry below it"; out-of-order is only LOG_OUT_OF_ORDER, a warning).
1738        let (_d, store) = temp_store();
1739        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "newest");
1740        let e_1005 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "middle");
1741        let e_1000 = entry(
1742            2026,
1743            5,
1744            27,
1745            10,
1746            0,
1747            LogKind::Update,
1748            Some("a"),
1749            "backdated fix",
1750        );
1751        // Physical order on disk: 10:10, 10:05, then the backdated 10:00 LAST.
1752        write_raw_log(&store, &[e_1010, e_1005, e_1000]);
1753
1754        // since 10:02 must return BOTH entries strictly newer than 10:02
1755        // (10:05 and 10:10). The old early-stop hit the physically-last 10:00
1756        // entry (<= 10:02), stopped, and returned EMPTY — silently dropping the
1757        // two newer entries that sit earlier in the file.
1758        let got = Log::since(&store, ts(2026, 5, 27, 10, 2)).unwrap();
1759        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1760        assert_eq!(
1761            stamps,
1762            [ts(2026, 5, 27, 10, 5), ts(2026, 5, 27, 10, 10)]
1763                .into_iter()
1764                .collect(),
1765            "since(10:02) must include both 10:05 and 10:10 despite the backdated \
1766             10:00 entry sitting physically last, and exclude 10:00; got {got:?}"
1767        );
1768
1769        // A cutoff before everything still returns all three, regardless of the
1770        // scrambled disk order.
1771        let all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1772        let all_stamps: std::collections::BTreeSet<_> = all.iter().map(|e| e.timestamp).collect();
1773        assert_eq!(
1774            all_stamps,
1775            [
1776                ts(2026, 5, 27, 10, 0),
1777                ts(2026, 5, 27, 10, 5),
1778                ts(2026, 5, 27, 10, 10),
1779            ]
1780            .into_iter()
1781            .collect()
1782        );
1783    }
1784
1785    #[test]
1786    fn since_crosses_archive_when_newer_entry_is_out_of_order_inside_it() {
1787        // Out-of-order INSIDE an archive month, with the cutoff landing in that
1788        // month. The April archive is authored newest-physical-first (04-20,
1789        // then a backdated 04-05 last); a naive early-stop on the first
1790        // older-than-cutoff entry would miss the later April entry. The active
1791        // file holds a clean May entry. Cutoff = mid-April.
1792        let (_d, store) = temp_store();
1793
1794        // Active file: one current-month (May) entry.
1795        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1796        write_raw_log(&store, &[may]);
1797
1798        // April archive authored out of order: 04-20 first, backdated 04-05 last.
1799        let apr_late = entry(
1800            2026,
1801            4,
1802            20,
1803            9,
1804            0,
1805            LogKind::Create,
1806            Some("apr-b"),
1807            "apr-late",
1808        );
1809        let apr_early = entry(
1810            2026,
1811            4,
1812            5,
1813            9,
1814            0,
1815            LogKind::Ingest,
1816            Some("apr-a"),
1817            "apr-early",
1818        );
1819        let dir = store.root.join("log");
1820        fs::create_dir_all(&dir).unwrap();
1821        let mut arch = String::from(LOG_FRONTMATTER);
1822        arch.push('\n');
1823        arch.push_str(&apr_late.render());
1824        arch.push_str(&apr_early.render());
1825        fs::write(dir.join("2026-04.md"), arch).unwrap();
1826
1827        // since mid-April: the later April entry (04-20) AND the May entry must
1828        // come back; the early April entry (04-05) must not.
1829        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1830        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1831        assert_eq!(
1832            stamps,
1833            [ts(2026, 4, 20, 9, 0), ts(2026, 5, 2, 8, 0)]
1834                .into_iter()
1835                .collect(),
1836            "since(mid-April) must include the out-of-order later April entry \
1837             and the May entry, and exclude the earlier April entry; got {got:?}"
1838        );
1839    }
1840
1841    // ── multi-line notes ──────────────────────────────────────────────────────
1842
1843    #[test]
1844    fn multiline_note_is_preserved() {
1845        let (_d, store) = temp_store();
1846        let e = entry(
1847            2026,
1848            5,
1849            27,
1850            10,
1851            0,
1852            LogKind::Create,
1853            Some("records/x"),
1854            "Line one.\nLine two.\nLine three.",
1855        );
1856        Log::append(&store, &e).unwrap();
1857        let got = Log::tail(&store, 1).unwrap();
1858        assert_eq!(got[0].note, "Line one.\nLine two.\nLine three.");
1859    }
1860
1861    #[test]
1862    fn empty_note_roundtrips_as_empty() {
1863        let (_d, store) = temp_store();
1864        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "");
1865        Log::append(&store, &e).unwrap();
1866        let got = Log::tail(&store, 1).unwrap();
1867        assert_eq!(got[0], e);
1868        assert_eq!(got[0].note, "");
1869    }
1870
1871    // ── last_validate_at ─────────────────────────────────────────────────────
1872
1873    #[test]
1874    fn last_validate_at_finds_most_recent_validate() {
1875        let (_d, store) = temp_store();
1876        Log::append(
1877            &store,
1878            &entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "first pass"),
1879        )
1880        .unwrap();
1881        Log::append(
1882            &store,
1883            &entry(2026, 5, 27, 10, 5, LogKind::Create, Some("a"), "made a"),
1884        )
1885        .unwrap();
1886        Log::append(
1887            &store,
1888            &entry(2026, 5, 27, 10, 10, LogKind::Validate, None, "second pass"),
1889        )
1890        .unwrap();
1891        Log::append(
1892            &store,
1893            &entry(2026, 5, 27, 10, 15, LogKind::Update, Some("a"), "edit a"),
1894        )
1895        .unwrap();
1896
1897        let last = Log::last_validate_at(&store).unwrap();
1898        assert_eq!(last, Some(ts(2026, 5, 27, 10, 10)));
1899    }
1900
1901    #[test]
1902    fn last_validate_at_none_when_no_validate() {
1903        let (_d, store) = temp_store();
1904        Log::append(
1905            &store,
1906            &entry(2026, 5, 27, 10, 0, LogKind::Create, Some("a"), "x"),
1907        )
1908        .unwrap();
1909        assert_eq!(Log::last_validate_at(&store).unwrap(), None);
1910    }
1911
1912    // ── month-boundary rotation ──────────────────────────────────────────────
1913
1914    #[test]
1915    fn rotation_rolls_prior_months_into_archives() {
1916        let (_d, store) = temp_store();
1917        // Two April entries and one May entry, all written while "current" was
1918        // their own month (append-only chronological order).
1919        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1920        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1921        Log::append(&store, &a1).unwrap();
1922        Log::append(&store, &a2).unwrap();
1923
1924        // Before rotation: no archive dir, both April entries in active.
1925        assert!(!store.root.join("log").exists());
1926
1927        // Appending a May entry must roll April into log/2026-04.md.
1928        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
1929        Log::append(&store, &m1).unwrap();
1930
1931        // Archive exists and holds both April entries with frontmatter.
1932        let arch_path = store.root.join("log").join("2026-04.md");
1933        assert!(arch_path.exists(), "expected April archive to be created");
1934        let arch = fs::read_to_string(&arch_path).unwrap();
1935        assert!(arch.starts_with("---\ntype: log\n---\n"));
1936        assert!(arch.contains("## [2026-04-10 09:00] ingest | apr-a"));
1937        assert!(arch.contains("## [2026-04-20 09:00] create | apr-b"));
1938        assert!(arch.contains("apr one"));
1939        assert!(arch.contains("apr two"));
1940
1941        // Active file now holds ONLY the May entry (no April entries).
1942        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1943        assert!(active.contains("## [2026-05-02 08:00] update | may-a"));
1944        assert!(
1945            !active.contains("apr-a") && !active.contains("apr-b"),
1946            "April entries must be gone from the active file; got:\n{active}"
1947        );
1948
1949        // The full timeline (archives ++ active) is intact and chronological.
1950        let all = Log::tail(&store, 99).unwrap();
1951        assert_eq!(all, vec![a1, a2, m1]);
1952    }
1953
1954    #[test]
1955    fn rotation_groups_distinct_prior_months_into_separate_archives() {
1956        let (_d, store) = temp_store();
1957        // March + April entries accumulate, then a May append rolls BOTH prior
1958        // months into their own archive files.
1959        let mar = entry(2026, 3, 5, 9, 0, LogKind::Ingest, Some("mar"), "march");
1960        let apr = entry(2026, 4, 5, 9, 0, LogKind::Create, Some("apr"), "april");
1961        Log::append(&store, &mar).unwrap();
1962        Log::append(&store, &apr).unwrap();
1963        // At this point April is current, March already rolled into its archive.
1964        assert!(store.root.join("log").join("2026-03.md").exists());
1965
1966        let may = entry(2026, 5, 5, 9, 0, LogKind::Update, Some("may"), "may");
1967        Log::append(&store, &may).unwrap();
1968
1969        assert!(store.root.join("log").join("2026-03.md").exists());
1970        assert!(store.root.join("log").join("2026-04.md").exists());
1971
1972        // Each archive holds only its own month.
1973        let mar_arch = fs::read_to_string(store.root.join("log").join("2026-03.md")).unwrap();
1974        let apr_arch = fs::read_to_string(store.root.join("log").join("2026-04.md")).unwrap();
1975        assert!(mar_arch.contains("mar") && !mar_arch.contains("apr"));
1976        assert!(apr_arch.contains("apr") && !apr_arch.contains("mar"));
1977
1978        // Active holds only May.
1979        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1980        assert!(active.contains("may") && !active.contains("mar") && !active.contains("apr"));
1981
1982        // Timeline intact and ordered across both archives + active.
1983        let all = Log::tail(&store, 99).unwrap();
1984        assert_eq!(all, vec![mar, apr, may]);
1985    }
1986
1987    #[test]
1988    fn tail_crosses_into_archive_when_n_spans_month_boundary() {
1989        let (_d, store) = temp_store();
1990        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1991        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1992        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1993        let m2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may2");
1994        for e in [&a1, &a2, &m1, &m2] {
1995            Log::append(&store, e).unwrap();
1996        }
1997        // April is now archived; active holds only May. tail(3) must reach back
1998        // into the archive for the third-newest entry.
1999        let tail3 = Log::tail(&store, 3).unwrap();
2000        assert_eq!(tail3, vec![a2.clone(), m1.clone(), m2.clone()]);
2001
2002        // tail within the active month does NOT need the archive but is still
2003        // correct.
2004        let tail2 = Log::tail(&store, 2).unwrap();
2005        assert_eq!(tail2, vec![m1, m2]);
2006    }
2007
2008    #[test]
2009    fn since_crosses_into_archive_and_early_stops() {
2010        let (_d, store) = temp_store();
2011        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
2012        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
2013        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
2014        for e in [&a1, &a2, &m1] {
2015            Log::append(&store, e).unwrap();
2016        }
2017        // since a mid-April time: must include the later April entry (from the
2018        // archive) and the May entry, but not the earlier April one.
2019        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
2020        assert_eq!(got, vec![a2, m1]);
2021    }
2022
2023    #[test]
2024    fn last_validate_at_crosses_into_archive() {
2025        let (_d, store) = temp_store();
2026        // A validate in April, then non-validate work that rolls April away.
2027        Log::append(
2028            &store,
2029            &entry(2026, 4, 10, 9, 0, LogKind::Validate, None, "apr validate"),
2030        )
2031        .unwrap();
2032        Log::append(
2033            &store,
2034            &entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may work"),
2035        )
2036        .unwrap();
2037        // Active has only the May update; the most-recent validate lives in the
2038        // April archive and must still be found.
2039        let last = Log::last_validate_at(&store).unwrap();
2040        assert_eq!(last, Some(ts(2026, 4, 10, 9, 0)));
2041    }
2042
2043    // ── reverse-read correctness on a large (multi-block) log ────────────────
2044
2045    #[test]
2046    fn reverse_read_correct_on_large_single_month_log() {
2047        let (_d, store) = temp_store();
2048        // Append many same-month entries with chunky multi-line notes so the
2049        // file spans well past one REVERSE_BLOCK (8 KiB). Timestamps are
2050        // strictly increasing (a real append-only log is monotonic): each entry
2051        // is 3 minutes after the previous, all within June, so physical order
2052        // equals chronological order and the last-k-physical ARE the k-newest.
2053        let n = 400usize;
2054        let mut expected: Vec<LogEntry> = Vec::new();
2055        for i in 0..n {
2056            let total_min = (i as u32) * 3;
2057            let day = 1 + total_min / (24 * 60);
2058            let hour = (total_min / 60) % 24;
2059            let min = total_min % 60;
2060            // Unique, multi-line note to bulk up the file and detect mis-parses.
2061            let note = format!(
2062                "entry number {i}\nbody line A for {i}\nbody line B for {i} with padding {}",
2063                "x".repeat(40)
2064            );
2065            let e = entry(
2066                2026,
2067                6,
2068                day,
2069                hour,
2070                min,
2071                LogKind::Update,
2072                Some(&format!("records/item-{i:04}")),
2073                &note,
2074            );
2075            Log::append(&store, &e).unwrap();
2076            expected.push(e);
2077        }
2078
2079        // File must actually be multi-block to exercise the backward reader.
2080        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
2081        assert!(
2082            size > (REVERSE_BLOCK as u64) * 2,
2083            "test log not large enough ({size} bytes) to exercise multi-block reverse-read"
2084        );
2085
2086        // tail(5) must equal the 5 newest, exactly.
2087        let tail5 = Log::tail(&store, 5).unwrap();
2088        assert_eq!(tail5, expected[n - 5..].to_vec());
2089
2090        // tail(50) must equal the 50 newest.
2091        let tail50 = Log::tail(&store, 50).unwrap();
2092        assert_eq!(tail50, expected[n - 50..].to_vec());
2093
2094        // tail(all) must reconstruct the whole timeline in order.
2095        let all = Log::tail(&store, n + 10).unwrap();
2096        assert_eq!(all.len(), n);
2097        assert_eq!(all, expected);
2098    }
2099
2100    // ── tail on OUT-OF-ORDER logs (newest-by-timestamp, not last-physical) ────
2101    //
2102    // The append-only contract is non-decreasing time order, but it's only a
2103    // `LOG_OUT_OF_ORDER` warning when violated (corrective entries land below
2104    // the entry they correct; backdated / clock-skewed writes; `merge=union`
2105    // clone merges). `tail N` must return the N newest *by timestamp*, never the
2106    // last N *physical* entries.
2107
2108    /// Write `log.md` verbatim from rendered entries in the given **physical
2109    /// (file) order**, bypassing `Log::append` so the test controls on-disk
2110    /// order exactly (append never reorders within a month, but this is the
2111    /// clearest way to pin a specific physical layout).
2112    fn write_log_physical(store: &Store, entries: &[LogEntry]) {
2113        let mut body = String::new();
2114        for e in entries {
2115            body.push_str(&e.render());
2116        }
2117        let full = compose_active(LOG_FRONTMATTER, &body);
2118        fs::write(store.root.join("log.md"), full).expect("write log.md");
2119    }
2120
2121    #[test]
2122    fn tail_returns_newest_by_timestamp_on_demonstrated_out_of_order_log() {
2123        // The exact case from the review finding: physical order 10:10, 10:05,
2124        // 10:00 (a backdated entry tail). The OLD code returned the last two
2125        // physical entries {10:05, 10:00}; the correct answer is the two newest
2126        // by time {10:05, 10:10}.
2127        let (_d, store) = temp_store();
2128        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "ten-ten");
2129        let e_1005 = entry(
2130            2026,
2131            5,
2132            27,
2133            10,
2134            5,
2135            LogKind::Create,
2136            Some("b"),
2137            "ten-oh-five",
2138        );
2139        let e_1000 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "ten-oh-oh");
2140        // Physical order: newest first, then the two older ones — out of order.
2141        write_log_physical(&store, &[e_1010.clone(), e_1005.clone(), e_1000.clone()]);
2142
2143        let tail2 = Log::tail(&store, 2).unwrap();
2144        assert_eq!(
2145            tail2,
2146            vec![e_1005.clone(), e_1010.clone()],
2147            "tail(2) must be the two NEWEST by timestamp (chronological), \
2148             not the last two physical entries"
2149        );
2150        // The newest entry must be present and the oldest absent.
2151        assert!(tail2.contains(&e_1010), "newest (10:10) must be included");
2152        assert!(!tail2.contains(&e_1000), "oldest (10:00) must be excluded");
2153
2154        // tail(1) is just the single newest.
2155        assert_eq!(Log::tail(&store, 1).unwrap(), vec![e_1010.clone()]);
2156        // tail(all) is the full set in chronological order.
2157        assert_eq!(Log::tail(&store, 99).unwrap(), vec![e_1000, e_1005, e_1010]);
2158    }
2159
2160    #[test]
2161    fn tail_no_early_stop_when_newer_entry_sits_before_an_older_one() {
2162        // Guards the unsound within-file early stop: a newer entry (10:50) sits
2163        // PHYSICALLY BEFORE a much older one (10:00). Reading newest-physical-
2164        // first, the scan meets 10:00 before 10:50; any "stop at the first entry
2165        // below the window minimum" rule would bail and drop 10:50.
2166        //
2167        // Physical (top→bottom): 10:55, 10:10, 10:50, 10:00.
2168        // Reverse-scan order:     10:00, 10:50, 10:10, 10:55.
2169        let (_d, store) = temp_store();
2170        let e55 = entry(2026, 5, 27, 10, 55, LogKind::Update, Some("x55"), "55");
2171        let e10 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("x10"), "10");
2172        let e50 = entry(2026, 5, 27, 10, 50, LogKind::Update, Some("x50"), "50");
2173        let e00 = entry(2026, 5, 27, 10, 0, LogKind::Update, Some("x00"), "00");
2174        write_log_physical(
2175            &store,
2176            &[e55.clone(), e10.clone(), e50.clone(), e00.clone()],
2177        );
2178
2179        // The two newest by timestamp are 10:55 and 10:50 — NOT the early-stop
2180        // victim 10:10, and NOT the last-physical 10:00.
2181        let tail2 = Log::tail(&store, 2).unwrap();
2182        assert_eq!(tail2, vec![e50.clone(), e55.clone()]);
2183
2184        let tail3 = Log::tail(&store, 3).unwrap();
2185        assert_eq!(tail3, vec![e10.clone(), e50.clone(), e55.clone()]);
2186    }
2187
2188    #[test]
2189    fn tail_orders_equal_timestamps_by_physical_recency() {
2190        // Three entries share 10:00; one is at 09:59. tail(2) must keep both
2191        // 10:00 entries, and among the equal pair the one appended LATER
2192        // (physically last) sorts last ("newest" = most-recently recorded).
2193        let (_d, store) = temp_store();
2194        let early = entry(2026, 5, 27, 9, 59, LogKind::Create, Some("early"), "before");
2195        let tie_a = entry(
2196            2026,
2197            5,
2198            27,
2199            10,
2200            0,
2201            LogKind::Update,
2202            Some("tie-a"),
2203            "first 10:00",
2204        );
2205        let tie_b = entry(
2206            2026,
2207            5,
2208            27,
2209            10,
2210            0,
2211            LogKind::Update,
2212            Some("tie-b"),
2213            "second 10:00",
2214        );
2215        // Physical append order: early, tie_a, tie_b.
2216        write_log_physical(&store, &[early.clone(), tie_a.clone(), tie_b.clone()]);
2217
2218        let tail2 = Log::tail(&store, 2).unwrap();
2219        assert_eq!(
2220            tail2,
2221            vec![tie_a.clone(), tie_b.clone()],
2222            "both 10:00 entries kept, physically-later one (tie_b) last; 09:59 dropped"
2223        );
2224        // tail(1) keeps only the most-recently-recorded of the equal pair.
2225        assert_eq!(Log::tail(&store, 1).unwrap(), vec![tie_b]);
2226    }
2227
2228    #[test]
2229    fn tail_finds_newest_across_a_backdated_entry_spanning_the_month_boundary() {
2230        // A backdated entry can land physically after newer entries even across
2231        // a rotation: append May entries, then a June entry (rolls May to its
2232        // archive), then append a May-dated correction — it goes into the ACTIVE
2233        // file, physically after June. tail must still rank by timestamp, so the
2234        // June entry stays newest and the backdated May entry is not mistaken
2235        // for the tail.
2236        let (_d, store) = temp_store();
2237        let may1 = entry(2026, 5, 10, 9, 0, LogKind::Ingest, Some("may-1"), "may one");
2238        let may2 = entry(2026, 5, 20, 9, 0, LogKind::Create, Some("may-2"), "may two");
2239        let jun1 = entry(2026, 6, 2, 8, 0, LogKind::Update, Some("jun-1"), "jun one");
2240        Log::append(&store, &may1).unwrap();
2241        Log::append(&store, &may2).unwrap();
2242        Log::append(&store, &jun1).unwrap(); // rotates May -> log/2026-05.md
2243        assert!(store.root.join("log").join("2026-05.md").exists());
2244
2245        // A backdated May correction, appended now: it lands in the active file
2246        // (its month May is not strictly before the active month June), so the
2247        // active file is physically [jun1, may_corr] — out of order.
2248        let may_corr = entry(
2249            2026,
2250            5,
2251            25,
2252            9,
2253            0,
2254            LogKind::Update,
2255            Some("may-2"),
2256            "may correction",
2257        );
2258        Log::append(&store, &may_corr).unwrap();
2259        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
2260        assert!(
2261            active.contains("jun-1") && active.contains("may correction"),
2262            "backdated May entry should be in the active file alongside June; got:\n{active}"
2263        );
2264
2265        // The single newest by timestamp is the June entry, even though the
2266        // backdated May entry is physically last.
2267        assert_eq!(Log::tail(&store, 1).unwrap(), vec![jun1.clone()]);
2268
2269        // tail(2): the two newest by time are may_corr (05-25) and jun1 (06-02).
2270        let tail2 = Log::tail(&store, 2).unwrap();
2271        assert_eq!(tail2, vec![may_corr.clone(), jun1.clone()]);
2272
2273        // tail(3) must reach into the May archive for the third-newest (may2,
2274        // 05-20), proving archive crossing still works on an out-of-order store.
2275        let tail3 = Log::tail(&store, 3).unwrap();
2276        assert_eq!(tail3, vec![may2.clone(), may_corr.clone(), jun1.clone()]);
2277
2278        // tail(all) reconstructs the whole timeline in chronological order.
2279        let all = Log::tail(&store, 99).unwrap();
2280        assert_eq!(all, vec![may1, may2, may_corr, jun1]);
2281    }
2282
2283    #[test]
2284    fn parse_entries_skips_unparseable_header_folding_into_body() {
2285        // A `## [` line that is NOT a valid header should not start a new entry;
2286        // it folds into the preceding entry's note. This guards the
2287        // parse_entries header-validation branch.
2288        let text = "\
2289## [2026-05-27 10:00] create | records/x
2290Body mentions a literal: ## [not a real header here]
2291More body.
2292
2293## [2026-05-27 10:05] update | records/y
2294Second.
2295";
2296        let entries = parse_entries(text);
2297        assert_eq!(entries.len(), 2);
2298        assert_eq!(entries[0].kind, LogKind::Create);
2299        assert!(entries[0].note.contains("## [not a real header here]"));
2300        assert!(entries[0].note.contains("More body."));
2301        assert_eq!(entries[1].kind, LogKind::Update);
2302        assert_eq!(entries[1].note, "Second.");
2303    }
2304
2305    // ── append-only: corrective entries go on the end ─────────────────────────
2306
2307    #[test]
2308    fn append_only_corrective_entry_goes_on_end_without_rewriting() {
2309        let (_d, store) = temp_store();
2310        let original = entry(
2311            2026,
2312            5,
2313            27,
2314            10,
2315            0,
2316            LogKind::Update,
2317            Some("records/northstar"),
2318            "Seat count 120 -> 175.",
2319        );
2320        Log::append(&store, &original).unwrap();
2321        let after_first = fs::read_to_string(store.root.join("log.md")).unwrap();
2322
2323        // A correction is a NEW entry appended on the end; the original text is
2324        // left byte-for-byte intact (append-only contract: no rewrite API).
2325        let correction = entry(
2326            2026,
2327            5,
2328            27,
2329            11,
2330            0,
2331            LogKind::Update,
2332            Some("records/northstar"),
2333            "Correction: seat count is 165, not 175.",
2334        );
2335        Log::append(&store, &correction).unwrap();
2336        let after_second = fs::read_to_string(store.root.join("log.md")).unwrap();
2337
2338        assert!(
2339            after_second.starts_with(&after_first),
2340            "appending must not rewrite earlier bytes"
2341        );
2342        assert!(after_second.contains("Correction: seat count is 165, not 175."));
2343
2344        // Both entries are readable, in order.
2345        let all = Log::tail(&store, 99).unwrap();
2346        assert_eq!(all, vec![original, correction]);
2347    }
2348
2349    // ── concurrent append safety (atomic via temp-file rename) ────────────────
2350
2351    #[test]
2352    fn concurrent_appends_are_atomic_and_total() {
2353        use std::sync::{Arc, Barrier};
2354        use std::thread;
2355
2356        let (_d, store) = temp_store();
2357        // Seed the file so all threads take the read-modify-write path.
2358        Log::append(
2359            &store,
2360            &entry(2026, 7, 1, 0, 0, LogKind::Create, Some("seed"), "seed"),
2361        )
2362        .unwrap();
2363
2364        let threads = 8usize;
2365        let per = 25usize;
2366        let barrier = Arc::new(Barrier::new(threads));
2367        let store = Arc::new(store);
2368
2369        let mut handles = Vec::new();
2370        for tnum in 0..threads {
2371            let b = Arc::clone(&barrier);
2372            let s = Arc::clone(&store);
2373            handles.push(thread::spawn(move || {
2374                b.wait();
2375                for i in 0..per {
2376                    let e = entry(
2377                        2026,
2378                        7,
2379                        1,
2380                        (tnum % 24) as u32,
2381                        (i % 60) as u32,
2382                        LogKind::Update,
2383                        Some(&format!("t{tnum}-i{i}")),
2384                        &format!("thread {tnum} item {i}"),
2385                    );
2386                    Log::append(&s, &e).unwrap();
2387                }
2388            }));
2389        }
2390        for h in handles {
2391            h.join().unwrap();
2392        }
2393
2394        // The atomic temp-file-rename write means no append truncates or
2395        // corrupts another: the file must remain parseable and every line of
2396        // every entry header must be well-formed. Crucially, no entry should be
2397        // lost to a torn write of the *content already on disk* — though
2398        // interleaved read-modify-write WILL drop some appends (last-writer-
2399        // wins on the snapshot). We therefore assert integrity + that the file
2400        // never went empty / corrupt, not an exact count.
2401        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
2402        assert!(content.starts_with("---\ntype: log\n---\n"));
2403
2404        // Every `## [` line must parse as a valid header (no half-written line).
2405        for line in content.lines() {
2406            if line.starts_with("## [") {
2407                assert!(
2408                    Log::parse_header(line).is_some(),
2409                    "corrupt/torn header line on disk: {line:?}"
2410                );
2411            }
2412        }
2413
2414        // The seed entry must survive (it was written before the race and
2415        // every snapshot included it).
2416        assert!(content.contains("## [2026-07-01 00:00] create | seed"));
2417
2418        // The reverse reader must still produce a clean, fully-parseable view.
2419        let all = Log::tail(&store, 10_000).unwrap();
2420        assert!(!all.is_empty());
2421        // No duplicate adjacent identical headers from a torn write: every
2422        // returned entry must have a recognized-or-custom kind and a parseable
2423        // timestamp (already guaranteed by parse), and the list must be
2424        // internally consistent (re-render → re-parse identity for each).
2425        for e in &all {
2426            let rendered = e.render();
2427            let reparsed = parse_single_entry(&rendered).unwrap();
2428            assert_eq!(&reparsed, e);
2429        }
2430    }
2431
2432    // ── render/parse identity ────────────────────────────────────────────────
2433
2434    #[test]
2435    fn render_then_parse_is_identity() {
2436        let cases = vec![
2437            entry(
2438                2026,
2439                1,
2440                2,
2441                3,
2442                4,
2443                LogKind::Ingest,
2444                Some("sources/a.eml"),
2445                "n",
2446            ),
2447            entry(
2448                2026,
2449                12,
2450                31,
2451                23,
2452                59,
2453                LogKind::Validate,
2454                None,
2455                "PASS - 0 errors",
2456            ),
2457            entry(
2458                2026,
2459                6,
2460                15,
2461                12,
2462                30,
2463                LogKind::Custom("proposal".to_string()),
2464                Some("records/p"),
2465                "multi\nline\nnote",
2466            ),
2467            entry(2026, 6, 15, 12, 30, LogKind::Contradiction, Some("obj"), ""),
2468        ];
2469        for e in cases {
2470            let rendered = e.render();
2471            let parsed = parse_single_entry(&rendered).unwrap_or_else(|| {
2472                panic!("failed to reparse rendered entry:\n{rendered}");
2473            });
2474            assert_eq!(parsed, e, "round-trip mismatch for {e:?}");
2475        }
2476    }
2477
2478    // ── regression: rotation re-roll must not duplicate archive entries (#3) ──
2479
2480    /// Count occurrences of `needle` in `haystack` (non-overlapping).
2481    fn count_occurrences(haystack: &str, needle: &str) -> usize {
2482        haystack.matches(needle).count()
2483    }
2484
2485    #[test]
2486    fn regression_archive_reroll_is_idempotent_after_interrupted_rotation() {
2487        // Reconstructs the finding's exact failure window: rotation is two
2488        // non-atomic durable writes — (1) roll prior-month entries into the
2489        // archive, then (2) trim the active file. If the process crashes or the
2490        // active rewrite errors AFTER step (1) commits, the prior-month entries
2491        // stay in the untrimmed active file, the agent retries, and the retry
2492        // re-rolls the SAME entries into the archive a second time. The
2493        // mechanism is precisely a second `append_to_archive` of identical
2494        // entries onto an archive that already holds them.
2495        let (_d, store) = temp_store();
2496        let dir = archive_dir(&store);
2497        let arch = archive_path(&store, 2026, 4);
2498
2499        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2500        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2501        let month = [apr1.clone(), apr2.clone()];
2502
2503        // First roll: a FRESH rotation (no in-progress marker) appends both.
2504        fs::create_dir_all(&dir).unwrap();
2505        append_to_archive(&arch, &month, false).unwrap();
2506
2507        // The retries are crash-RECOVERIES (the in-progress-rotation marker is
2508        // present), so they dedup the re-rolled identical entries to a no-op.
2509        // Pre-fix this blindly concatenated, doubling every entry; do it twice to
2510        // prove the amplification a real retry loop would cause is suppressed.
2511        append_to_archive(&arch, &month, true).unwrap();
2512        append_to_archive(&arch, &month, true).unwrap();
2513
2514        let archived = fs::read_to_string(&arch).unwrap();
2515        // Each entry header must appear EXACTLY once despite the re-rolls.
2516        assert_eq!(
2517            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2518            1,
2519            "re-rolled archive duplicated the first April entry; got:\n{archived}"
2520        );
2521        assert_eq!(
2522            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2523            1,
2524            "re-rolled archive duplicated the second April entry; got:\n{archived}"
2525        );
2526
2527        // And the reader surface (`since`) must return each entry once, not the
2528        // duplicated set the pre-fix archive would have yielded.
2529        let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2530        assert_eq!(
2531            got,
2532            vec![apr1, apr2],
2533            "since over the re-rolled archive must return each April entry once"
2534        );
2535    }
2536
2537    #[test]
2538    fn regression_rotation_reroll_after_active_untrimmed_does_not_duplicate() {
2539        // End-to-end variant driving the real `Log::append` rotation path. We
2540        // rotate April into its archive via a May append, then SIMULATE the
2541        // partial failure by restoring the pre-trim active file (April + May)
2542        // and re-running `append` — exactly the state a crash-between-the-two-
2543        // writes / failed-active-rewrite + agent-retry produces. The archive
2544        // must still hold each April entry once.
2545        let (_d, store) = temp_store();
2546        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2547        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2548        Log::append(&store, &apr1).unwrap();
2549        Log::append(&store, &apr2).unwrap();
2550
2551        // Snapshot the active file holding both April entries (this is what is
2552        // still on disk if the post-rotation active rewrite never lands).
2553        let active_path = active_log_path(&store);
2554        let pre_rotation_active = fs::read_to_string(&active_path).unwrap();
2555
2556        // A May append rotates April out and trims the active file.
2557        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
2558        Log::append(&store, &may).unwrap();
2559        let arch = archive_path(&store, 2026, 4);
2560        assert!(arch.exists(), "April should have rotated to its archive");
2561
2562        // Simulate the crash/error: the active rewrite never persisted, so the
2563        // active file still contains the (now also archived) April entries.
2564        fs::write(&active_path, &pre_rotation_active).unwrap();
2565        // A real crash leaves the in-progress-rotation marker behind too — it is
2566        // deleted only AFTER the active trim commits. Restore it so the retry is
2567        // recognized as a crash-recovery re-roll (deduped), not a fresh rotation
2568        // (which would correctly append a genuinely-distinct repeat).
2569        fs::write(rotation_marker_path(&store), b"").unwrap();
2570
2571        // The agent retries the append. Re-partitioning sees April as prior
2572        // months again and re-rolls them — which must NOT duplicate the archive.
2573        let may2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may two");
2574        Log::append(&store, &may2).unwrap();
2575
2576        let archived = fs::read_to_string(&arch).unwrap();
2577        assert_eq!(
2578            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2579            1,
2580            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2581        );
2582        assert_eq!(
2583            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2584            1,
2585            "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2586        );
2587    }
2588
2589    /// THE BUG (write side, data-loss). A STALE `.rotating` marker (e.g.
2590    /// committed/synced into a `merge=union` clone after a crash stranded it
2591    /// between the archive write and the active trim) must NOT make a fresh
2592    /// rotation treat a genuinely-distinct same-(minute,kind,object,note) entry
2593    /// as a crash re-roll and silently drop it.
2594    ///
2595    /// On-disk shape (the exact CLI repro): the January archive already holds one
2596    /// `dup` entry (authored independently); the active log holds a February entry
2597    /// AND a SECOND, distinct, byte-identical-at-minute-precision January `dup`
2598    /// (a backdated append that merged in). The stale marker is present. Rotating
2599    /// (a March append) rolls both prior months out. The marker is NOT proof of a
2600    /// re-roll here: February is absent from the archives, so no completed prior
2601    /// rotation of this batch exists. Pre-fix, the global marker flag drove the
2602    /// archive dedup against the ENTIRE existing archive, suppressed the active
2603    /// `dup` against the unrelated pre-existing archive `dup`, wrote nothing to the
2604    /// archive, and still trimmed the active file — so the entry vanished from
2605    /// disk. The archive must end with BOTH `dup` entries.
2606    #[test]
2607    fn regression_stale_marker_does_not_drop_distinct_same_minute_on_fresh_roll() {
2608        let (_d, store) = temp_store();
2609        let dir = archive_dir(&store);
2610        fs::create_dir_all(&dir).unwrap();
2611
2612        // Pre-existing, independently-authored January archive entry.
2613        let jan = entry(2026, 1, 15, 9, 0, LogKind::Create, Some("dup"), "body");
2614        let mut arch = String::from(LOG_FRONTMATTER);
2615        arch.push('\n');
2616        arch.push_str(&jan.render());
2617        fs::write(archive_path(&store, 2026, 1), arch).unwrap();
2618
2619        // Active file: a February entry plus a SECOND, distinct, byte-identical
2620        // January `dup` (backdated, physically alongside February).
2621        let feb = entry(2026, 2, 5, 9, 0, LogKind::Create, Some("feb"), "feb");
2622        write_raw_log(&store, &[feb, jan.clone()]);
2623
2624        // A stale rotation marker lingers (crash stranded it; not gitignored, so
2625        // it can ride into a clone).
2626        fs::write(rotation_marker_path(&store), b"").unwrap();
2627
2628        // A March append rotates January AND February out as a FRESH roll.
2629        let mar = entry(2026, 3, 1, 0, 0, LogKind::Create, Some("mar"), "mar");
2630        Log::append(&store, &mar).unwrap();
2631
2632        // The genuinely-distinct January `dup` must survive: BOTH copies in the
2633        // archive, the entry never lost.
2634        let jan_arch = fs::read_to_string(archive_path(&store, 2026, 1)).unwrap();
2635        assert_eq!(
2636            count_occurrences(&jan_arch, "## [2026-01-15 09:00] create | dup"),
2637            2,
2638            "stale marker dropped a distinct same-minute January entry; got:\n{jan_arch}"
2639        );
2640        // February rolled to its own (newly created) archive exactly once.
2641        let feb_arch = fs::read_to_string(archive_path(&store, 2026, 2)).unwrap();
2642        assert_eq!(
2643            count_occurrences(&feb_arch, "## [2026-02-05 09:00] create | feb"),
2644            1,
2645            "February did not roll cleanly; got:\n{feb_arch}"
2646        );
2647        // The marker is cleared after the committed rotation.
2648        assert!(
2649            !rotation_marker_path(&store).exists(),
2650            "rotation marker must be cleared after a committed rotation"
2651        );
2652        // The reader agrees: both January `dup`s are visible (no marker now).
2653        let dups = Log::since(&store, ts(2026, 1, 1, 0, 0))
2654            .unwrap()
2655            .into_iter()
2656            .filter(|e| e.object.as_deref() == Some("dup"))
2657            .count();
2658        assert_eq!(dups, 2, "since must report both distinct January dups");
2659    }
2660
2661    /// PRESERVED INVARIANT (write side). A GENUINE interrupted rotation — the
2662    /// whole prior-month roll-out batch is already present in the archives (the
2663    /// archive write committed before the crash), the same entries still sit in
2664    /// the untrimmed active file, and the marker is present — must STILL dedup the
2665    /// re-roll exactly once. This is the case the scoped recovery dedup must keep
2666    /// suppressing; the fix narrows recovery mode to "batch already archived",
2667    /// which this scenario satisfies.
2668    #[test]
2669    fn regression_true_crash_retry_still_dedups_when_whole_batch_already_archived() {
2670        let (_d, store) = temp_store();
2671        let dir = archive_dir(&store);
2672        fs::create_dir_all(&dir).unwrap();
2673
2674        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2675        let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2676
2677        // The interrupted rotation already committed both April entries to the
2678        // archive.
2679        let mut arch = String::from(LOG_FRONTMATTER);
2680        arch.push('\n');
2681        arch.push_str(&apr1.render());
2682        arch.push_str(&apr2.render());
2683        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
2684
2685        // The active file still holds the SAME April entries (trim never landed).
2686        write_raw_log(&store, &[apr1.clone(), apr2.clone()]);
2687
2688        // The crash left the in-progress-rotation marker behind.
2689        fs::write(rotation_marker_path(&store), b"").unwrap();
2690
2691        // The agent retries with a May append: April is re-rolled. Because the
2692        // whole April batch is already in the archive, this is a true re-roll and
2693        // must NOT duplicate.
2694        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may"), "may one");
2695        Log::append(&store, &may).unwrap();
2696
2697        let archived = fs::read_to_string(archive_path(&store, 2026, 4)).unwrap();
2698        assert_eq!(
2699            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2700            1,
2701            "true crash-retry duplicated the first April entry; got:\n{archived}"
2702        );
2703        assert_eq!(
2704            count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2705            1,
2706            "true crash-retry duplicated the second April entry; got:\n{archived}"
2707        );
2708    }
2709
2710    /// Adversarial review (#7) — two GENUINELY-DISTINCT appends that render
2711    /// byte-identically at minute precision (same minute/kind/object/note) must
2712    /// BOTH survive rotation. The backdated-duplicate case: apr1 rotates in May;
2713    /// the backdated apr2 lands in the active file later and rotates in June as a
2714    /// FRESH roll (no in-progress marker), so it must be appended even though the
2715    /// April archive already holds the byte-identical apr1. Pre-fix the
2716    /// set-membership dedup dropped apr2 — silent, unrecoverable audit-log loss.
2717    #[test]
2718    fn regression_distinct_same_minute_entries_both_survive_rotation() {
2719        let (_d, store) = temp_store();
2720        let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("x"), "dup");
2721        let apr2 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("x"), "dup");
2722
2723        Log::append(&store, &apr1).unwrap();
2724        // A May append rotates apr1 into the April archive and COMPLETES (no
2725        // marker left behind).
2726        Log::append(
2727            &store,
2728            &entry(2026, 5, 2, 8, 0, LogKind::Ingest, Some("may"), "m"),
2729        )
2730        .unwrap();
2731        // The backdated apr2 lands in the active file beside the May entry.
2732        Log::append(&store, &apr2).unwrap();
2733        // A June append rotates the May entry AND apr2 out. apr2 is a fresh roll.
2734        Log::append(
2735            &store,
2736            &entry(2026, 6, 1, 8, 0, LogKind::Ingest, Some("jun"), "j"),
2737        )
2738        .unwrap();
2739
2740        let archived = fs::read_to_string(archive_path(&store, 2026, 4)).unwrap();
2741        assert_eq!(
2742            count_occurrences(&archived, "## [2026-04-10 09:00] ingest | x"),
2743            2,
2744            "two distinct same-minute April appends must BOTH survive rotation; got:\n{archived}"
2745        );
2746        // The reader must return both too (read-dedup must not collapse distinct
2747        // same-minute archive entries).
2748        let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2749        let dups = got
2750            .iter()
2751            .filter(|e| e.object.as_deref() == Some("x"))
2752            .count();
2753        assert_eq!(
2754            dups, 2,
2755            "since must return both distinct same-minute entries; got {got:#?}"
2756        );
2757    }
2758
2759    /// Adversarial review (#12) — `tail`/`since` must return two byte-identical
2760    /// same-minute entries that both live in the ACTIVE log (no archive). Pre-fix
2761    /// a global content-keyed `seen` set suppressed the second on read, so the
2762    /// reader under-reported what was on disk (`grep` saw 2, `tail` saw 1).
2763    #[test]
2764    fn regression_tail_since_return_distinct_same_minute_active_entries() {
2765        let (_d, store) = temp_store();
2766        Log::append(
2767            &store,
2768            &entry(2026, 6, 10, 9, 0, LogKind::Ingest, Some("x"), "dup"),
2769        )
2770        .unwrap();
2771        Log::append(
2772            &store,
2773            &entry(2026, 6, 10, 9, 0, LogKind::Ingest, Some("x"), "dup"),
2774        )
2775        .unwrap();
2776
2777        let tail = Log::tail(&store, 20).unwrap();
2778        assert_eq!(
2779            tail.len(),
2780            2,
2781            "tail must return both same-minute active entries; got {tail:#?}"
2782        );
2783        let since = Log::since(&store, ts(2026, 6, 1, 0, 0)).unwrap();
2784        assert_eq!(
2785            since.len(),
2786            2,
2787            "since must return both same-minute active entries; got {since:#?}"
2788        );
2789    }
2790
2791    // ── regression: read-side active↔archive dedup must be marker-gated ────────
2792
2793    /// THE BUG (HIGH, data-loss). Two GENUINELY-DISTINCT same-(minute,kind,
2794    /// object,note) entries, one in the active `log.md` and one in its month
2795    /// archive, with NO rotation marker present (normal operation). The write
2796    /// side deliberately preserved both on disk (a backdated append after a
2797    /// completed rotation that merely collides on those minute-precision fields
2798    /// is a distinct real event). Pre-fix the read side deduped the
2799    /// active↔archive overlap UNCONDITIONALLY, so `tail`/`since` silently
2800    /// dropped the archive copy — reporting 1 where disk holds 2. The fix gates
2801    /// that dedup on the `.rotating` marker (mirroring the write side); with no
2802    /// marker, both must come back.
2803    #[test]
2804    fn regression_tail_since_keep_distinct_split_entries_without_rotation_marker() {
2805        let (_d, store) = temp_store();
2806
2807        // Author the SAME (minute,kind,object,note) entry once in the May
2808        // archive and once in the active file. This is exactly the on-disk shape
2809        // the repro produces: an entry rotates into log/2026-05.md, then a second
2810        // backdated append of identical fields lands in the active log.md after a
2811        // later-month rotation completed (so NO marker lingers).
2812        let dir = archive_dir(&store);
2813        fs::create_dir_all(&dir).unwrap();
2814        let dup = entry(
2815            2026,
2816            5,
2817            10,
2818            8,
2819            0,
2820            LogKind::Ingest,
2821            Some("records/x.md"),
2822            "same note text",
2823        );
2824        let mut arch = String::from(LOG_FRONTMATTER);
2825        arch.push('\n');
2826        arch.push_str(&dup.render());
2827        fs::write(archive_path(&store, 2026, 5), arch).unwrap();
2828
2829        // Active file: a current-month (June) entry plus the SECOND distinct copy
2830        // of the May-dated event (backdated, so physically alongside June).
2831        let jun = entry(
2832            2026,
2833            6,
2834            1,
2835            9,
2836            0,
2837            LogKind::Create,
2838            Some("records/june.md"),
2839            "june",
2840        );
2841        write_raw_log(&store, &[jun, dup.clone()]);
2842
2843        // No rotation marker => normal operation => trust the disk: BOTH distinct
2844        // copies of the May event must be reported by since and tail.
2845        assert!(
2846            !rotation_marker_path(&store).exists(),
2847            "precondition: no rotation marker (normal operation)"
2848        );
2849
2850        let since = Log::since(&store, ts(2026, 5, 1, 0, 0)).unwrap();
2851        let since_dups = since.iter().filter(|e| **e == dup).count();
2852        assert_eq!(
2853            since_dups, 2,
2854            "since must return BOTH distinct same-minute entries split across \
2855             active+archive when no rotation marker is present; got {since:#?}"
2856        );
2857
2858        let tail = Log::tail(&store, 10).unwrap();
2859        let tail_dups = tail.iter().filter(|e| **e == dup).count();
2860        assert_eq!(
2861            tail_dups, 2,
2862            "tail must return BOTH distinct same-minute entries split across \
2863             active+archive when no rotation marker is present; got {tail:#?}"
2864        );
2865    }
2866
2867    /// PRESERVED INVARIANT. When a rotation IS in flight (the `.rotating` marker
2868    /// is present), an interrupted rotation can leave the SAME physical entry in
2869    /// both the untrimmed active file and its archive. That crash-induced
2870    /// duplicate must still be deduped on read so it is not double-reported —
2871    /// the gating must not throw away the legitimate crash-recovery masking.
2872    #[test]
2873    fn regression_tail_since_dedup_crash_overlap_when_rotation_marker_present() {
2874        let (_d, store) = temp_store();
2875
2876        // Simulate the mid-rotation crash state: the SAME physical May entry is
2877        // in BOTH the archive (write committed) and the active file (trim never
2878        // landed), and the in-flight marker is still on disk.
2879        let dir = archive_dir(&store);
2880        fs::create_dir_all(&dir).unwrap();
2881        let rolled = entry(
2882            2026,
2883            5,
2884            10,
2885            8,
2886            0,
2887            LogKind::Ingest,
2888            Some("records/x.md"),
2889            "same note text",
2890        );
2891        let mut arch = String::from(LOG_FRONTMATTER);
2892        arch.push('\n');
2893        arch.push_str(&rolled.render());
2894        fs::write(archive_path(&store, 2026, 5), arch).unwrap();
2895
2896        // Active file still holds the un-trimmed May entry plus a current-month
2897        // (June) entry — the pre-trim shape a crash leaves behind.
2898        let jun = entry(
2899            2026,
2900            6,
2901            1,
2902            9,
2903            0,
2904            LogKind::Create,
2905            Some("records/june.md"),
2906            "june",
2907        );
2908        write_raw_log(&store, &[jun, rolled.clone()]);
2909
2910        // The crash leaves the in-progress-rotation marker behind.
2911        fs::write(rotation_marker_path(&store), b"").unwrap();
2912        assert!(
2913            rotation_marker_path(&store).exists(),
2914            "precondition: rotation marker present (crash mid-rotation)"
2915        );
2916
2917        // Recovering => the active↔archive overlap is the crash duplicate, masked
2918        // on read: the May entry must be reported ONCE, not twice.
2919        let since = Log::since(&store, ts(2026, 5, 1, 0, 0)).unwrap();
2920        let since_dups = since.iter().filter(|e| **e == rolled).count();
2921        assert_eq!(
2922            since_dups, 1,
2923            "since must dedup the crash-induced active↔archive overlap when the \
2924             rotation marker is present; got {since:#?}"
2925        );
2926
2927        let tail = Log::tail(&store, 10).unwrap();
2928        let tail_dups = tail.iter().filter(|e| **e == rolled).count();
2929        assert_eq!(
2930            tail_dups, 1,
2931            "tail must dedup the crash-induced active↔archive overlap when the \
2932             rotation marker is present; got {tail:#?}"
2933        );
2934    }
2935
2936    /// Adversarial review (#15) — rotation must NOT erase lines before the first
2937    /// VALID entry header. An active log whose entries region opens with a
2938    /// `## [`-shaped line that `parse_header` rejects (a merge orphan / malformed
2939    /// export) before the first real entry: pre-fix `find_first_header` landed on
2940    /// it, `parse_entries` dropped it (no open entry yet), and the rotation
2941    /// re-emitted without it — silently erasing append-only content. The fix
2942    /// folds everything before the first valid header into the preserved header
2943    /// block, which rotation re-emits verbatim.
2944    #[test]
2945    fn regression_rotation_preserves_lines_before_first_valid_header() {
2946        let (_d, store) = temp_store();
2947        let active = active_log_path(&store);
2948        let content = "---\ntype: log\n---\n\n## [orphan from a merge] stray text\n## [2026-04-10 09:00] ingest | x\nbody line\n";
2949        fs::write(&active, content).unwrap();
2950
2951        // A June append rotates the April entry out and rewrites the active file.
2952        Log::append(
2953            &store,
2954            &entry(2026, 6, 1, 8, 0, LogKind::Ingest, Some("jun"), "j"),
2955        )
2956        .unwrap();
2957
2958        let active_after = fs::read_to_string(&active).unwrap();
2959        let arch_after = fs::read_to_string(archive_path(&store, 2026, 4)).unwrap_or_default();
2960        assert!(
2961            active_after.contains("orphan from a merge") || arch_after.contains("orphan from a merge"),
2962            "the pre-first-valid-header line was erased by rotation;\nactive:\n{active_after}\narchive:\n{arch_after}"
2963        );
2964        // Sanity: the real April entry still rotated into its archive.
2965        assert!(
2966            arch_after.contains("## [2026-04-10 09:00] ingest | x"),
2967            "the valid April entry must still rotate to its archive; got:\n{arch_after}"
2968        );
2969    }
2970
2971    // ── regression: reverse reader keeps a `## [` continuation note line (#10) ─
2972
2973    #[test]
2974    fn regression_reverse_reader_preserves_note_line_starting_with_bracket_header() {
2975        // SPEC permits a note of "one or more lines" with no restriction on a
2976        // continuation line starting at column 0 with `## [`. The forward parser
2977        // folds such an unparseable `## [` line into the note; the reverse
2978        // reader (tail/since/last_validate_at) must agree, not split on it.
2979        let (_d, store) = temp_store();
2980        let multi = "First line.\n## [draft outline] more\nThird line.";
2981        let e = entry(
2982            2026,
2983            5,
2984            27,
2985            10,
2986            0,
2987            LogKind::Update,
2988            Some("records/x"),
2989            multi,
2990        );
2991        // Author the log verbatim (render writes the note as-is); this is the
2992        // on-disk shape a hand-written / appended multi-line note produces.
2993        write_raw_log(&store, std::slice::from_ref(&e));
2994
2995        // Pre-fix: header_offsets treated `## [draft outline] more` as a second
2996        // entry boundary, truncating the note to "First line." and dropping the
2997        // carved (non-header) fragment. Post-fix: the full note survives.
2998        let got = Log::tail(&store, 1).unwrap();
2999        assert_eq!(got.len(), 1, "the single entry must be returned");
3000        assert_eq!(
3001            got[0].note, multi,
3002            "reverse reader truncated the note at the `## [` continuation line; \
3003             got {:?}",
3004            got[0].note
3005        );
3006        assert_eq!(got[0], e, "the whole entry must round-trip through tail");
3007
3008        // `since` (the other reverse-reading surface) must agree.
3009        let since = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
3010        assert_eq!(since, vec![e]);
3011    }
3012
3013    // ── regression: `since` archive pruning uses the UTC month, not local (#11) ─
3014
3015    /// A `DateTime<FixedOffset>` at the given fixed offset (hours east of UTC).
3016    fn ts_offset(
3017        y: i32,
3018        mo: u32,
3019        d: u32,
3020        h: u32,
3021        mi: u32,
3022        offset_hours: i32,
3023    ) -> DateTime<FixedOffset> {
3024        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
3025            .unwrap()
3026            .and_hms_opt(h, mi, 0)
3027            .unwrap();
3028        FixedOffset::east_opt(offset_hours * 3600)
3029            .unwrap()
3030            .from_local_datetime(&naive)
3031            .single()
3032            .unwrap()
3033    }
3034
3035    #[test]
3036    fn regression_since_prunes_archives_on_utc_month_not_local_offset_month() {
3037        // Archive months are bucketed on the UTC calendar. A `since` cutoff with
3038        // a non-UTC offset near a month boundary must not prune an archive whose
3039        // UTC month equals the cutoff's UTC month just because the cutoff's
3040        // LOCAL month is later.
3041        let (_d, store) = temp_store();
3042
3043        // April archive: an entry late on 2026-04-30 at 18:00 UTC.
3044        let apr = entry(
3045            2026,
3046            4,
3047            30,
3048            18,
3049            0,
3050            LogKind::Update,
3051            Some("apr-late"),
3052            "april late",
3053        );
3054        let dir = archive_dir(&store);
3055        fs::create_dir_all(&dir).unwrap();
3056        let mut arch = String::from(LOG_FRONTMATTER);
3057        arch.push('\n');
3058        arch.push_str(&apr.render());
3059        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
3060
3061        // Active file: a clean May entry, so an archive scan is actually needed.
3062        let may = entry(2026, 5, 5, 8, 0, LogKind::Update, Some("may-a"), "may one");
3063        write_raw_log(&store, std::slice::from_ref(&may));
3064
3065        // Cutoff 2026-05-01T00:30:00+07:00 == 2026-04-30T17:30:00Z. The April
3066        // 18:00 UTC entry is strictly newer than this instant.
3067        let cutoff = ts_offset(2026, 5, 1, 0, 30, 7);
3068        // Sanity: the cutoff's UTC month is April, its local month is May.
3069        assert_eq!((cutoff.year(), cutoff.month()), (2026, 5));
3070        assert_eq!(
3071            (
3072                cutoff.with_timezone(&Utc).year(),
3073                cutoff.with_timezone(&Utc).month()
3074            ),
3075            (2026, 4)
3076        );
3077
3078        // Pre-fix: cutoff_ym = (2026, 5) from local fields, so the (2026, 4)
3079        // archive was pruned and the genuinely-newer 18:00 UTC entry was dropped
3080        // — `since` returned only the May entry. Post-fix: cutoff_ym is UTC
3081        // (2026, 4), the April archive is scanned, and both come back.
3082        let got = Log::since(&store, cutoff).unwrap();
3083        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
3084        assert_eq!(
3085            stamps,
3086            [ts(2026, 4, 30, 18, 0), ts(2026, 5, 5, 8, 0)]
3087                .into_iter()
3088                .collect(),
3089            "since(non-UTC cutoff near a month boundary) must include the April \
3090             archive entry newer than the cutoff instant; got {got:?}"
3091        );
3092    }
3093
3094    // ── regression: header-shaped note line corrupts the append-only log (#critical)
3095
3096    #[test]
3097    fn note_line_shaped_like_a_header_is_escaped_and_round_trips() {
3098        // A `contradiction` note quoting an earlier entry header is the
3099        // demonstrated corruption: the verbatim `## [2020-01-01 00:00] delete |
3100        // …` line was parsed as a REAL entry on readback (fabricated entry, real
3101        // note truncated). With write-path escaping it stays note body.
3102        let (_d, store) = temp_store();
3103        let note = "quoting earlier entry:\n## [2020-01-01 00:00] delete | records/contacts/jane.md\nend of quote";
3104        let e = entry(
3105            2026,
3106            6,
3107            11,
3108            4,
3109            41,
3110            LogKind::Contradiction,
3111            Some("records/contacts/jane.md"),
3112            note,
3113        );
3114        Log::append(&store, &e).unwrap();
3115
3116        // On disk: the header-shaped note line must NOT sit at column 0 as a
3117        // `## [` header — `grep "^## \["` must see exactly the one real header.
3118        let raw = fs::read_to_string(store.root.join("log.md")).unwrap();
3119        let header_lines = raw.lines().filter(|l| l.starts_with("## [")).count();
3120        assert_eq!(
3121            header_lines, 1,
3122            "exactly one real entry header may sit at column 0; got:\n{raw}"
3123        );
3124
3125        // Readback returns ONE entry, with the full note intact (no fabricated
3126        // 2020 entry, no truncation).
3127        let got = Log::tail(&store, 10).unwrap();
3128        assert_eq!(got.len(), 1, "exactly one entry; got {got:?}");
3129        assert_eq!(got[0].note, note, "note must round-trip verbatim");
3130        assert_eq!(got[0], e);
3131        let since = Log::since(&store, ts(2026, 1, 1, 0, 0)).unwrap();
3132        assert_eq!(since, vec![e.clone()]);
3133    }
3134
3135    #[test]
3136    fn header_shaped_note_survives_a_later_rotation_uncorrupted() {
3137        // Physical corruption: pre-fix, the fabricated past-dated pseudo-entry
3138        // (year 2020 < current) was rolled into an archive on the NEXT append,
3139        // splitting the real note. With escaping the line is note text, so a
3140        // later append never sees a phantom prior-month entry to roll out.
3141        let (_d, store) = temp_store();
3142        let note = "see\n## [2020-01-01 00:00] delete | records/x.md\nbelow";
3143        let first = entry(
3144            2026,
3145            6,
3146            11,
3147            4,
3148            41,
3149            LogKind::Contradiction,
3150            Some("records/x.md"),
3151            note,
3152        );
3153        Log::append(&store, &first).unwrap();
3154
3155        // Append another current-month entry — the path that re-parses + may
3156        // rotate. No 2020 archive must be created and the first note stays whole.
3157        let second = entry(
3158            2026,
3159            6,
3160            11,
3161            5,
3162            0,
3163            LogKind::Update,
3164            Some("records/y.md"),
3165            "y",
3166        );
3167        Log::append(&store, &second).unwrap();
3168
3169        assert!(
3170            !store.root.join("log").join("2020-01.md").exists(),
3171            "a header-shaped note line must not fabricate a 2020 archive"
3172        );
3173        let got = Log::tail(&store, 10).unwrap();
3174        assert_eq!(got.len(), 2, "two real entries only; got {got:?}");
3175        let first_back = got
3176            .iter()
3177            .find(|e| e.object.as_deref() == Some("records/x.md"));
3178        assert_eq!(
3179            first_back.map(|e| e.note.as_str()),
3180            Some(note),
3181            "the header-shaped note must survive the rotation pass intact"
3182        );
3183    }
3184
3185    #[test]
3186    fn escape_unescape_note_line_round_trips_including_literal_backslash() {
3187        // The escape must be lossless for arbitrary note lines, including a line
3188        // the author genuinely wrote starting with `\` before a header shape.
3189        let valid_header = "## [2020-01-01 00:00] delete | x";
3190        // A real header shape: escaped on write, restored on read.
3191        assert_eq!(
3192            &*escape_note_line(valid_header),
3193            &format!("\\{valid_header}")
3194        );
3195        let escaped = escape_note_line(valid_header).into_owned();
3196        assert_eq!(&*unescape_note_line(&escaped), valid_header);
3197        // An already-`\`-prefixed header-shape line escapes to two backslashes
3198        // and restores to one (never collapses to a bare header).
3199        let pre = format!("\\{valid_header}");
3200        assert_eq!(&*escape_note_line(&pre), &format!("\\{pre}"));
3201        let pre_escaped = escape_note_line(&pre).into_owned();
3202        assert_eq!(&*unescape_note_line(&pre_escaped), &pre);
3203        // Ordinary text (including a `\` that does NOT lead into a header) is
3204        // untouched both ways.
3205        for plain in ["plain note", "## [not a header]", "\\not a header", ""] {
3206            assert_eq!(&*escape_note_line(plain), plain);
3207            assert_eq!(&*unescape_note_line(plain), plain);
3208        }
3209    }
3210
3211    // ── regression: reverse reader scans each block once (no O(file²)) (#perf) ──
3212
3213    #[test]
3214    fn reverse_read_correct_with_header_straddling_a_block_boundary() {
3215        // The incremental per-block header scan must still catch a `## [` marker
3216        // whose `#` falls in one block but whose bytes extend into the already-
3217        // scanned region. Build a log whose total size crosses several blocks and
3218        // verify a full read reconstructs every entry — the straddle case is hit
3219        // by construction across the many block boundaries.
3220        let (_d, store) = temp_store();
3221        let n = 600usize;
3222        let mut expected: Vec<LogEntry> = Vec::new();
3223        for i in 0..n {
3224            let total_min = (i as u32) * 2;
3225            let day = 1 + total_min / (24 * 60);
3226            let hour = (total_min / 60) % 24;
3227            let min = total_min % 60;
3228            // Vary note length so headers land at many offsets relative to the
3229            // fixed 8 KiB block grid, exercising boundary straddles.
3230            let note = format!("note {i} {}", "y".repeat(i % 97));
3231            let e = entry(
3232                2026,
3233                6,
3234                day,
3235                hour,
3236                min,
3237                LogKind::Update,
3238                Some(&format!("records/item-{i:05}")),
3239                &note,
3240            );
3241            Log::append(&store, &e).unwrap();
3242            expected.push(e);
3243        }
3244        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
3245        assert!(
3246            size > (REVERSE_BLOCK as u64) * 3,
3247            "test log not large enough ({size} bytes) to cross several blocks"
3248        );
3249        let all = Log::tail(&store, n + 10).unwrap();
3250        assert_eq!(all, expected, "every entry must reconstruct across blocks");
3251        // A small tail must also be exact (the n-newest by timestamp).
3252        assert_eq!(Log::tail(&store, 7).unwrap(), expected[n - 7..].to_vec());
3253    }
3254
3255    #[test]
3256    fn header_offsets_range_finds_boundary_straddling_marker_once() {
3257        // Two headers; `header_offsets` (whole-buffer) finds both. The range
3258        // scan with a window that splits the buffer between them must report the
3259        // one in its window exactly once, consulting the left neighbour for the
3260        // line-start check.
3261        let buf =
3262            b"## [2026-06-01 00:00] update | a\nnote a\n## [2026-06-01 00:01] update | b\nnote b\n";
3263        let full = header_offsets(buf, 0);
3264        assert_eq!(full.len(), 2, "both headers found over the whole buffer");
3265        let second = full[1] as usize;
3266        // A window covering only the SECOND header's `#` reports just it. Its `#`
3267        // is not at index 0, so `base_is_file_start` is irrelevant here.
3268        let only_second = header_offsets_range(buf, 0, second, second + 1, false);
3269        assert_eq!(only_second, vec![full[1]]);
3270        // A window covering only the FIRST reports just it (right content read
3271        // past the window into the buffer). `base == 0` is the true file start,
3272        // so the index-0 candidate is a real line start.
3273        let only_first = header_offsets_range(buf, 0, 0, 1, true);
3274        assert_eq!(only_first, vec![full[0]]);
3275        // Disjoint windows partition the markers with no double-count.
3276        let mut combined = header_offsets_range(buf, 0, 0, second, true);
3277        combined.extend(header_offsets_range(buf, 0, second, buf.len(), false));
3278        assert_eq!(combined, full);
3279    }
3280
    /// CRITICAL regression: a MID-LINE `## [<valid header>]` fragment inside a
    /// real entry's note that happens to align with a reverse-read block boundary
    /// must NOT be fabricated into an entry. The incremental backward scan reads
    /// each block's left edge before its left neighbour is buffered; treating
    /// buffer index 0 as a line start there would carve a phantom entry from the
    /// fragment and truncate the real entry's note. The fix defers the left-edge
    /// candidate until its neighbour is read, so the fragment is correctly seen
    /// as note body (its `#` is not at a line start).
    ///
    /// The test hand-builds a one-entry `log.md` whose fragment `#` sits exactly
    /// `REVERSE_BLOCK` bytes before EOF, then checks that `Log::tail` returns
    /// only the real entry with its note intact.
    #[test]
    fn reverse_read_does_not_fabricate_entry_from_midline_header_at_block_boundary() {
        let (_d, store) = temp_store();

        // A single real entry. Its note carries a mid-line `## [` fragment that
        // is a *valid* header shape but is NOT at column 0 (so the writer's
        // column-0 escape correctly leaves it verbatim — it is the trigger).
        let fragment = "see ## [2020-01-01 00:00] delete | records/x.md";
        // Byte index of the first `#` within the fragment (just past "see ").
        let hash_in_fragment = fragment.find("##").expect("fragment has `##`");

        // Build the raw active log by hand so the fragment's `#` lands at the
        // FIRST backward block's left edge: the reverse reader anchors its blocks
        // at EOF (`new_start = len - REVERSE_BLOCK` on the first block), so the
        // `#` must sit exactly `REVERSE_BLOCK` bytes before EOF. We append note
        // padding AFTER the fragment to push EOF out to that distance.
        //
        // Layout (one entry):
        //   <LOG_FRONTMATTER>\n<header_line>lead\n<fragment><tail>\n\n
        // where <header_line> already ends in `\n`.
        let header_line = "## [2026-06-14 10:00] update | records/real.md\n";
        let mut head = String::from(LOG_FRONTMATTER);
        head.push('\n');
        head.push_str(header_line);
        head.push_str("lead\n");
        head.push_str(fragment); // fragment opens the second note line

        // Absolute offset of the fragment's `#`: the fragment is the last thing
        // in `head`, so it starts at `head.len() - fragment.len()`.
        let hash_off = head.len() - fragment.len() + hash_in_fragment;
        // We append `<tail>\n\n`. Bytes after `#` = (head.len() - hash_off) +
        // tail_len + 2. Need that == REVERSE_BLOCK so `#` is at `len -
        // REVERSE_BLOCK` (the first block's left edge).
        let after_hash_in_head = head.len() - hash_off;
        let tail_len = REVERSE_BLOCK
            .checked_sub(after_hash_in_head + 2)
            .expect("REVERSE_BLOCK comfortably exceeds the post-`#` head bytes");
        // `head` is moved into `body` here; all offsets above were taken first.
        let mut body = head;
        body.push_str(&"z".repeat(tail_len)); // valid note bytes on the fragment line
        body.push('\n');
        body.push('\n');
        fs::write(store.root.join("log.md"), &body).unwrap();

        // The file must be large enough to cross at least one block boundary.
        assert!(
            body.len() as u64 > REVERSE_BLOCK as u64,
            "test log must span >1 block (len {})",
            body.len()
        );
        // And the fragment's `#` sits exactly at the first block's left edge.
        // Recomputed from the finished `body` as an independent check of the
        // arithmetic above.
        let real_hash_off = body.find("see ##").unwrap() + hash_in_fragment;
        assert_eq!(
            real_hash_off,
            body.len() - REVERSE_BLOCK,
            "fragment `#` must land on the first backward block's left edge to exercise the bug"
        );

        // Reverse read must return EXACTLY ONE entry — the real one — and never a
        // fabricated `2020-01-01 delete records/x.md` carved from the fragment.
        let got = Log::tail(&store, 10).unwrap();
        assert_eq!(
            got.len(),
            1,
            "exactly the one real entry; got {} (a fabricated entry means the boundary `#` was mis-read as a header): {got:#?}",
            got.len()
        );
        let only = &got[0];
        assert_eq!(only.object.as_deref(), Some("records/real.md"));
        assert_eq!(only.timestamp, ts(2026, 6, 14, 10, 0));
        // The note is intact end-to-end (not truncated at the fragment): both the
        // lead and the verbatim fragment survive.
        assert!(
            only.note.contains("lead"),
            "note keeps its lead; got {:?}",
            only.note
        );
        assert!(
            only.note.contains(fragment),
            "note keeps the verbatim mid-line fragment (not truncated); got {:?}",
            only.note
        );
    }
3368
3369    // ── regression: tail/since dedup across active+archive on interrupted rotation
3370
3371    #[test]
3372    fn tail_and_since_dedup_entries_present_in_both_active_and_archive() {
3373        // Reconstructs the finding's crash window: the archive write committed
3374        // but the active rewrite never trimmed, so the same April entries live in
3375        // BOTH the untrimmed active file and `log/2026-04.md`. Readers must
3376        // return each entry ONCE, not twice.
3377        //
3378        // A real crash in that window necessarily leaves the `.rotating` marker
3379        // behind — it is written BEFORE the archive append (Log::append step 1)
3380        // and removed only AFTER the active trim commits, so any state where the
3381        // archive holds the entries but the active was never trimmed has the
3382        // marker present. The read-side overlap dedup is gated on that marker
3383        // (mirroring the write side); without it, an active↔archive collision is
3384        // treated as two genuinely-distinct entries, not a crash duplicate. So
3385        // the test must set the marker to model the crash it claims to.
3386        let (_d, store) = temp_store();
3387        let apr_a = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
3388        let apr_b = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
3389
3390        // Active file still holds both April entries (the un-trimmed state).
3391        write_raw_log(&store, &[apr_a.clone(), apr_b.clone()]);
3392        // The committed step-1 archive holds the same two entries.
3393        let dir = archive_dir(&store);
3394        fs::create_dir_all(&dir).unwrap();
3395        let mut arch = String::from(LOG_FRONTMATTER);
3396        arch.push('\n');
3397        arch.push_str(&apr_a.render());
3398        arch.push_str(&apr_b.render());
3399        fs::write(archive_path(&store, 2026, 4), arch).unwrap();
3400        // The crash leaves the in-progress-rotation marker on disk; this is what
3401        // authorizes the read-side overlap dedup.
3402        fs::write(rotation_marker_path(&store), b"").unwrap();
3403
3404        // `since` must return each April entry exactly once.
3405        let since = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
3406        assert_eq!(
3407            since,
3408            vec![apr_a.clone(), apr_b.clone()],
3409            "since must dedup the doubly-present entries; got {since:?}"
3410        );
3411
3412        // `tail` must too — no duplicate window slots.
3413        let tail = Log::tail(&store, 10).unwrap();
3414        assert_eq!(
3415            tail,
3416            vec![apr_a, apr_b],
3417            "tail must dedup the doubly-present entries; got {tail:?}"
3418        );
3419    }
3420}