Skip to main content

dbmd_core/
log.rs

1//! `log` — the append-only, month-rotating chronological log.
2//!
3//! One logical timeline: the active `log.md` at the store root plus
4//! `log/<YYYY-MM>.md` archives. [`Log::append`] rolls older months into
5//! archives on write so the active file stays current-month. [`Log::tail`] and
6//! [`Log::since`] **reverse-read from EOF**. Both read each file they touch in
7//! full — the on-disk order is not guaranteed monotonic, so neither can
8//! early-stop within a file — and select by timestamp: `tail` keeps the `n`
9//! newest, `since` keeps everything newer than the cutoff. Both cross into
10//! month archives only as far back as the requested window reaches (by the
11//! cutoff's month for `since`, by the current `n`th-newest's month for `tail`)
12//! — never the whole history.
13//!
14//! Append-only contract: there is no rewrite API. Corrective entries go on the
15//! end; out-of-order timestamps are a validate warning (`LOG_OUT_OF_ORDER`),
16//! signalling a probable rewrite.
17
18use std::collections::BTreeMap;
19use std::fs::{self, File};
20use std::io::{Read, Seek, SeekFrom, Write};
21use std::path::{Path, PathBuf};
22
23use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, TimeZone};
24
25use crate::store::Store;
26
27/// The on-disk header timestamp format: `YYYY-MM-DD HH:MM` (minute precision,
28/// no timezone). Parsing reattaches UTC; emitting renders the entry's own
29/// wall-clock, so a read→write→read round-trip is stable at minute precision.
30const TS_FORMAT: &str = "%Y-%m-%d %H:%M";
31
32/// The frontmatter block written when the active `log.md` is created.
33const LOG_FRONTMATTER: &str = "---\ntype: log\n---\n\n# Curator log\n";
34
35/// Block size for the backward (reverse-from-EOF) reader.
36const REVERSE_BLOCK: usize = 8 * 1024;
37
38/// A recognized `log.md` entry kind. Custom kinds are valid in the format
39/// (`dbmd validate` warns on unrecognized via `LOG_UNKNOWN_KIND`); this enum
40/// carries the recognized vocabulary plus a [`LogKind::Custom`] catch-all so an
41/// unknown kind round-trips without loss.
42#[derive(Debug, Clone, PartialEq, Eq)]
43pub enum LogKind {
44    /// A source artifact was ingested.
45    Ingest,
46    /// A file was created.
47    Create,
48    /// A file was updated.
49    Update,
50    /// A file was deleted.
51    Delete,
52    /// A file was renamed/moved.
53    Rename,
54    /// A wiki-link was added.
55    Link,
56    /// A validation pass ran.
57    Validate,
58    /// The index was rebuilt.
59    IndexRebuild,
60    /// A contradiction between sources was flagged.
61    Contradiction,
62    /// Any kind outside the recognized vocabulary, preserved verbatim.
63    Custom(String),
64}
65
66impl LogKind {
67    /// The canonical lowercase string for this kind, as it appears in a log
68    /// header (`ingest`, `index-rebuild`, …).
69    pub fn as_str(&self) -> &str {
70        match self {
71            LogKind::Ingest => "ingest",
72            LogKind::Create => "create",
73            LogKind::Update => "update",
74            LogKind::Delete => "delete",
75            LogKind::Rename => "rename",
76            LogKind::Link => "link",
77            LogKind::Validate => "validate",
78            LogKind::IndexRebuild => "index-rebuild",
79            LogKind::Contradiction => "contradiction",
80            LogKind::Custom(s) => s,
81        }
82    }
83
84    /// Parse a kind from its header token; non-canonical tokens become
85    /// [`LogKind::Custom`].
86    pub fn parse(token: &str) -> LogKind {
87        match token {
88            "ingest" => LogKind::Ingest,
89            "create" => LogKind::Create,
90            "update" => LogKind::Update,
91            "delete" => LogKind::Delete,
92            "rename" => LogKind::Rename,
93            "link" => LogKind::Link,
94            "validate" => LogKind::Validate,
95            "index-rebuild" => LogKind::IndexRebuild,
96            "contradiction" => LogKind::Contradiction,
97            other => LogKind::Custom(other.to_string()),
98        }
99    }
100
101    /// True if this is one of the recognized kinds (i.e. not
102    /// [`LogKind::Custom`]).
103    pub fn is_recognized(&self) -> bool {
104        !matches!(self, LogKind::Custom(_))
105    }
106}
107
108/// One parsed `log.md` entry: a header
109/// (`## [YYYY-MM-DD HH:MM] <kind> | <object>`) plus its body.
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct LogEntry {
112    /// The entry timestamp from the header.
113    pub timestamp: DateTime<FixedOffset>,
114    /// The entry kind.
115    pub kind: LogKind,
116    /// The object slot — a store-relative path/wiki-link target, or `None` for
117    /// store-wide actions like `validate`.
118    pub object: Option<String>,
119    /// The free-form body (one or more lines) explaining what happened.
120    pub note: String,
121}
122
123impl LogEntry {
124    /// Render this entry as it appears on disk: the `## [...]` header line,
125    /// then the note body, then a trailing blank line so successive entries are
126    /// separated. The note is emitted verbatim (trailing whitespace trimmed).
127    fn render(&self) -> String {
128        let ts = self.timestamp.format(TS_FORMAT);
129        let mut out = String::new();
130        match &self.object {
131            Some(obj) => {
132                out.push_str(&format!("## [{}] {} | {}\n", ts, self.kind.as_str(), obj));
133            }
134            None => {
135                out.push_str(&format!("## [{}] {}\n", ts, self.kind.as_str()));
136            }
137        }
138        let note = self.note.trim_end_matches(['\n', '\r', ' ', '\t']);
139        if !note.is_empty() {
140            out.push_str(note);
141            out.push('\n');
142        }
143        out.push('\n');
144        out
145    }
146
147    /// The `(year, month)` of this entry's wall-clock timestamp — the rotation
148    /// bucket.
149    fn year_month(&self) -> (i32, u32) {
150        (self.timestamp.year(), self.timestamp.month())
151    }
152}
153
154/// The store's chronological log: a thin handle for the append-only timeline.
155/// All methods take the [`Store`] so they resolve the active `log.md` and the
156/// `log/` archives under the store root.
157#[derive(Debug, Clone)]
158pub struct Log;
159
160impl Log {
161    /// Atomically append `entry` to the active `log.md`, creating it (with
162    /// `type: log` frontmatter) if absent. **If the active log holds entries
163    /// from a prior month, roll those older months into `log/<YYYY-MM>.md`
164    /// first** (atomic move), keeping the active file to the current month.
165    pub fn append(store: &Store, entry: &LogEntry) -> crate::Result<()> {
166        let active = active_log_path(store);
167
168        // Read the active file's current contents (if any). The "current month"
169        // is the month of the entry being appended (the newest in the timeline);
170        // every existing entry from a strictly-earlier month rolls to archives.
171        let current_ym = entry.year_month();
172
173        if active.exists() {
174            let content = fs::read_to_string(&active)?;
175            let (header, entries) = parse_active(&content);
176
177            // Partition existing entries into prior-month (roll out) and
178            // current-or-later (keep in the active file).
179            let mut by_month: BTreeMap<(i32, u32), Vec<LogEntry>> = BTreeMap::new();
180            let mut keep: Vec<LogEntry> = Vec::new();
181            for e in entries {
182                if e.year_month() < current_ym {
183                    by_month.entry(e.year_month()).or_default().push(e);
184                } else {
185                    keep.push(e);
186                }
187            }
188
189            if !by_month.is_empty() {
190                // Roll each prior month into its archive (atomic per-file),
191                // appending to any existing archive for that month.
192                let dir = archive_dir(store);
193                fs::create_dir_all(&dir)?;
194                for ((y, m), month_entries) in &by_month {
195                    let path = archive_path(store, *y, *m);
196                    append_to_archive(&path, month_entries)?;
197                }
198
199                // Rewrite the active file to the kept (current-month) entries
200                // plus the new entry — atomically.
201                let mut body = String::new();
202                for e in &keep {
203                    body.push_str(&e.render());
204                }
205                body.push_str(&entry.render());
206                let full = compose_active(&header, &body);
207                write_atomic(&active, full.as_bytes())?;
208                return Ok(());
209            }
210
211            // No rotation needed: plain atomic append of the rendered entry.
212            let mut full = content;
213            if !full.ends_with('\n') {
214                full.push('\n');
215            }
216            full.push_str(&entry.render());
217            write_atomic(&active, full.as_bytes())?;
218            Ok(())
219        } else {
220            // Fresh log: frontmatter + the single entry.
221            if let Some(parent) = active.parent() {
222                fs::create_dir_all(parent)?;
223            }
224            let body = entry.render();
225            let full = compose_active(LOG_FRONTMATTER, &body);
226            write_atomic(&active, full.as_bytes())?;
227            Ok(())
228        }
229    }
230
231    /// The `n` most-recent entries **by timestamp**, returned oldest→newest.
232    ///
233    /// **Out-of-order safety (mirrors [`Log::since`]).** The log is append-only
234    /// but *not* guaranteed to be in non-decreasing timestamp order on disk: a
235    /// corrective entry is appended below the entry it corrects, a
236    /// backdated/clock-skewed write lands physically after newer entries, and a
237    /// `merge=union` clone merge interleaves both sides until a later agent
238    /// reorders. Out-of-order is only a `LOG_OUT_OF_ORDER` warning, never
239    /// rejected. So the last `n` *physical* entries are **not** the `n` newest
240    /// by time — taking them would omit a genuinely-recent entry that sits
241    /// physically before an older one, and the documented curator warm-up
242    /// (`dbmd log tail 20`) would report a stale picture of what was done lately.
243    /// We therefore feed every entry of each file we touch through a bounded
244    /// newest-by-timestamp window and let it select the true top `n`.
245    ///
246    /// Bounded cost: the active `log.md` is kept to the current month by
247    /// rotation, so a full read of it is cheap and is not a whole-store walk.
248    /// Across archives we *can* prune: each `log/<YYYY-MM>.md` holds only entries
249    /// from that month (rotation buckets by the entry's own year-month), so once
250    /// the window is full, an archive whose month is strictly before the
251    /// window-minimum's month cannot contain any entry newer than the current
252    /// `n`th-newest. We cross archives newest-month-first and stop at the first
253    /// such archive.
254    pub fn tail(store: &Store, n: usize) -> crate::Result<Vec<LogEntry>> {
255        if n == 0 {
256            return Ok(Vec::new());
257        }
258
259        // A bounded window of the `n` entries with the largest timestamps. No
260        // within-file early stop: out-of-order entries mean a newer entry can
261        // sit physically before an older one, so each file is read fully.
262        let mut window = NewestWindow::new(n);
263
264        // Active file: scan fully (current-month-bounded by rotation).
265        let active = active_log_path(store);
266        if active.exists() {
267            reverse_collect(&active, |e| {
268                window.consider(e);
269                false
270            })?;
271        }
272
273        // Archives, newest-month-first. Once the window is full, an archive
274        // whose month is strictly before the window-minimum's month holds only
275        // entries older than the current cutoff, so it (and every older archive)
276        // is skippable.
277        for archive in list_archives_desc(store)? {
278            if let (true, Some(cutoff_ym), Some(arch_ym)) = (
279                window.is_full(),
280                window.min_year_month(),
281                archive_year_month(&archive),
282            ) {
283                if arch_ym < cutoff_ym {
284                    break;
285                }
286            }
287            reverse_collect(&archive, |e| {
288                window.consider(e);
289                false
290            })?;
291        }
292
293        Ok(window.into_sorted())
294    }
295
296    /// Entries strictly newer than `time`, reverse-scanning active → archives.
297    ///
298    /// **No within-file early stop.** The log is append-only but *not*
299    /// guaranteed to be in non-decreasing timestamp order on disk: a corrective
300    /// entry is appended below the entry it corrects (SPEC: "if a finding is
301    /// wrong, append a corrective entry below it"), a backdated/clock-skewed
302    /// write lands physically after newer entries, and a `merge=union` clone
303    /// merge interleaves both sides until a later agent reorders. Out-of-order
304    /// is only a `LOG_OUT_OF_ORDER` warning, never rejected. So a newer entry
305    /// can sit physically *before* an older one; stopping at the first
306    /// older-than-`time` entry would silently drop those — the documented
307    /// curator warm-up (`dbmd log since <ts>`) would miss real recent work.
308    /// We therefore read every entry of each file we touch.
309    ///
310    /// Bounded cost: the active `log.md` is kept to the current month by
311    /// rotation, so a full read of it is cheap (the same read `tail` does for a
312    /// large `n`) and is not a whole-store walk. Across archives we *can* stop:
313    /// each `log/<YYYY-MM>.md` holds only entries from that month (rotation
314    /// buckets by the entry's own year-month), so an archive whose month is
315    /// strictly before `time`'s month cannot contain any entry newer than
316    /// `time`. We cross archives newest-month-first and stop at the first whose
317    /// month is entirely at or before `time`'s.
318    pub fn since(store: &Store, time: DateTime<FixedOffset>) -> crate::Result<Vec<LogEntry>> {
319        let mut collected: Vec<LogEntry> = Vec::new();
320
321        // Active file: scan fully, no early stop (out-of-order safe).
322        let active = active_log_path(store);
323        if active.exists() {
324            reverse_collect(&active, |e| {
325                if e.timestamp > time {
326                    collected.push(e);
327                }
328                false
329            })?;
330        }
331
332        // The cutoff's own (year, month): any archive strictly before it holds
333        // only older entries and is skippable.
334        let cutoff_ym = (time.year(), time.month());
335
336        for archive in list_archives_desc(store)? {
337            // Archives are newest-month-first; once a month is strictly before
338            // the cutoff's month, every remaining (older) archive is too.
339            if let Some(arch_ym) = archive_year_month(&archive) {
340                if arch_ym < cutoff_ym {
341                    break;
342                }
343            }
344            // Scan this archive fully — within a month, entries may still be
345            // out of order, so no within-file early stop.
346            reverse_collect(&archive, |e| {
347                if e.timestamp > time {
348                    collected.push(e);
349                }
350                false
351            })?;
352        }
353
354        collected.reverse();
355        Ok(collected)
356    }
357
358    /// The timestamp of the most recent `validate` entry — the default `since`
359    /// window for working-set validation ([`crate::validate::validate_working_set`]).
360    pub fn last_validate_at(store: &Store) -> crate::Result<Option<DateTime<FixedOffset>>> {
361        let mut found: Option<DateTime<FixedOffset>> = None;
362
363        let active = active_log_path(store);
364        if active.exists() {
365            reverse_collect(&active, |e| {
366                if e.kind == LogKind::Validate {
367                    found = Some(e.timestamp);
368                    true
369                } else {
370                    false
371                }
372            })?;
373        }
374
375        if found.is_none() {
376            for archive in list_archives_desc(store)? {
377                reverse_collect(&archive, |e| {
378                    if e.kind == LogKind::Validate {
379                        found = Some(e.timestamp);
380                        true
381                    } else {
382                        false
383                    }
384                })?;
385                if found.is_some() {
386                    break;
387                }
388            }
389        }
390
391        Ok(found)
392    }
393
394    /// Parse a single entry header (`## [YYYY-MM-DD HH:MM] <kind> | <object>`)
395    /// into its timestamp, kind, and object. Returns `None` if the line isn't a
396    /// well-formed entry header.
397    pub fn parse_header(line: &str) -> Option<(DateTime<FixedOffset>, LogKind, Option<String>)> {
398        let line = line.trim_end_matches(['\n', '\r']);
399        let rest = line.strip_prefix("## [")?;
400        let close = rest.find(']')?;
401        let ts_str = &rest[..close];
402        let timestamp = parse_timestamp(ts_str)?;
403
404        // Everything after the closing bracket: ` <kind> | <object>` or
405        // ` <kind>`.
406        let after = rest[close + 1..].trim();
407        if after.is_empty() {
408            return None;
409        }
410
411        let (kind_str, object) = match after.split_once('|') {
412            Some((k, o)) => {
413                let obj = o.trim();
414                let obj = if obj.is_empty() {
415                    None
416                } else {
417                    Some(obj.to_string())
418                };
419                (k.trim(), obj)
420            }
421            None => (after, None),
422        };
423
424        if kind_str.is_empty() {
425            return None;
426        }
427
428        Some((timestamp, LogKind::parse(kind_str), object))
429    }
430}
431
432// ── Internal helpers ────────────────────────────────────────────────────────
433
434/// A bounded window of the `n` entries with the largest timestamps, fed by a
435/// **reverse (newest-physical-first) scan** and used by [`Log::tail`].
436///
437/// Why this exists: the last `n` *physical* entries are the `n` newest only
438/// when the log is in non-decreasing time order. That's the append-only
439/// contract, not a guarantee — a backdated, clock-skewed, or merge-interleaved
440/// entry violates it (and trips the `LOG_OUT_OF_ORDER` validate warning). The
441/// window decouples `tail` from that assumption: it keeps the `n` largest
442/// timestamps seen regardless of the order they arrive in, so the caller can
443/// read each file fully (no fragile within-file early stop) and still get the
444/// true top `n`.
445///
446/// Tie-break: entries sharing a timestamp at the window boundary are ordered by
447/// **physical recency** — the one appended later (encountered earlier in the
448/// reverse scan, i.e. a smaller `arrival`) wins. "Newest" means most-recently
449/// recorded.
450struct NewestWindow {
451    cap: usize,
452    /// Min-by-(timestamp, then physical-oldest) heap: the root is always the
453    /// next entry to evict once the window is full.
454    heap: std::collections::BinaryHeap<WindowItem>,
455    /// Count of entries fed in, in reverse-scan order, used as the tie-break
456    /// key (0 = newest physical).
457    next_arrival: u64,
458}
459
460impl NewestWindow {
461    fn new(cap: usize) -> Self {
462        NewestWindow {
463            cap,
464            heap: std::collections::BinaryHeap::with_capacity(cap),
465            next_arrival: 0,
466        }
467    }
468
469    /// Offer one entry from the scan. If the window isn't full it's kept; once
470    /// full, it's kept (evicting the current minimum) iff its timestamp is `>=`
471    /// the window minimum. Equal-timestamp boundary entries resolve by physical
472    /// recency (see the type doc).
473    fn consider(&mut self, entry: LogEntry) {
474        let arrival = self.next_arrival;
475        self.next_arrival += 1;
476
477        if self.heap.len() < self.cap {
478            self.heap.push(WindowItem { entry, arrival });
479            return;
480        }
481
482        // Window full. The heap root is the current minimum (oldest-by-
483        // timestamp held; on a tie, the oldest-physical).
484        let root = self.heap.peek().expect("full window has a root");
485        if entry.timestamp > root.entry.timestamp {
486            // Strictly newer than the window minimum: it belongs; evict the min.
487            self.heap.pop();
488            self.heap.push(WindowItem { entry, arrival });
489        }
490        // On `<=` we keep the window as-is. `<` is plainly too old. `==` is the
491        // tie case: the scan is newest-physical-first, so this entry is
492        // physically *older* than the held one of equal timestamp, and the
493        // tie-break keeps the physically-newer (most-recently-recorded) entry —
494        // so the incoming one is dropped.
495    }
496
497    /// Whether the window already holds its full `cap` entries.
498    fn is_full(&self) -> bool {
499        self.heap.len() >= self.cap
500    }
501
502    /// The `(year, month)` of the window's current minimum (oldest kept) entry,
503    /// or `None` when the window is empty. Used to prune older archives: an
504    /// archive month strictly before this can't beat the current cutoff.
505    fn min_year_month(&self) -> Option<(i32, u32)> {
506        self.heap
507            .peek()
508            .map(|item| (item.entry.timestamp.year(), item.entry.timestamp.month()))
509    }
510
511    /// The held entries, oldest→newest (chronological), ties broken
512    /// oldest-physical→newest-physical.
513    fn into_sorted(self) -> Vec<LogEntry> {
514        let mut items: Vec<WindowItem> = self.heap.into_vec();
515        // Ascending by timestamp; on a tie, oldest-physical (larger arrival)
516        // first so the most-recently-recorded entry sorts last.
517        items.sort_by(|a, b| {
518            a.entry
519                .timestamp
520                .cmp(&b.entry.timestamp)
521                .then(b.arrival.cmp(&a.arrival))
522        });
523        items.into_iter().map(|i| i.entry).collect()
524    }
525}
526
527/// One slot in [`NewestWindow`]'s heap. `Ord` is defined so the heap is a
528/// **min-heap on `(timestamp, physical-oldest)`**: `BinaryHeap` is a max-heap,
529/// so the root (max under this `Ord`) is the eviction candidate — the smallest
530/// timestamp, and on a tie the oldest-physical (largest `arrival`).
531struct WindowItem {
532    entry: LogEntry,
533    arrival: u64,
534}
535
536impl PartialEq for WindowItem {
537    fn eq(&self, other: &Self) -> bool {
538        self.entry.timestamp == other.entry.timestamp && self.arrival == other.arrival
539    }
540}
541impl Eq for WindowItem {}
542
543impl Ord for WindowItem {
544    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
545        // Reverse on timestamp so the *smallest* timestamp is the heap max
546        // (eviction candidate). On equal timestamps, the larger `arrival`
547        // (older physical) is the heap max so it is evicted first.
548        other
549            .entry
550            .timestamp
551            .cmp(&self.entry.timestamp)
552            .then(self.arrival.cmp(&other.arrival))
553    }
554}
555impl PartialOrd for WindowItem {
556    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
557        Some(self.cmp(other))
558    }
559}
560
561/// The active `log.md` path under the store root.
562fn active_log_path(store: &Store) -> PathBuf {
563    store.root.join("log.md")
564}
565
566/// The `log/` archive directory under the store root.
567fn archive_dir(store: &Store) -> PathBuf {
568    store.root.join("log")
569}
570
571/// The `log/<YYYY-MM>.md` archive path for a given month.
572fn archive_path(store: &Store, year: i32, month: u32) -> PathBuf {
573    archive_dir(store).join(format!("{:04}-{:02}.md", year, month))
574}
575
576/// Parse a `YYYY-MM-DD HH:MM` header timestamp, reattaching UTC. `None` on any
577/// malformed shape.
578fn parse_timestamp(s: &str) -> Option<DateTime<FixedOffset>> {
579    let naive = NaiveDateTime::parse_from_str(s.trim(), TS_FORMAT).ok()?;
580    let utc = FixedOffset::east_opt(0)?;
581    utc.from_local_datetime(&naive).single()
582}
583
584/// Split a `log.md` / archive file into its leading frontmatter+heading block
585/// (everything up to and including the line before the first `## [` header) and
586/// its parsed entries. If there are no entries, the whole content is the header
587/// block.
588fn parse_active(content: &str) -> (String, Vec<LogEntry>) {
589    match find_first_header(content) {
590        Some(idx) => {
591            let header = content[..idx].to_string();
592            let entries = parse_entries(&content[idx..]);
593            (header, entries)
594        }
595        None => (content.to_string(), Vec::new()),
596    }
597}
598
599/// Byte offset of the first entry header (`## [` at the start of a line), or
600/// `None`.
601fn find_first_header(content: &str) -> Option<usize> {
602    if content.starts_with("## [") {
603        return Some(0);
604    }
605    content.match_indices("\n## [").next().map(|(i, _)| i + 1)
606}
607
608/// Parse every entry in a slice that begins at (or before, header-block
609/// included) a sequence of `## [` headers. Headers that fail to parse are
610/// skipped (their body folds into the previous valid entry's note is avoided —
611/// they simply start no new entry).
612fn parse_entries(text: &str) -> Vec<LogEntry> {
613    let mut entries: Vec<LogEntry> = Vec::new();
614    let mut cur_header: Option<(DateTime<FixedOffset>, LogKind, Option<String>)> = None;
615    let mut cur_note: Vec<&str> = Vec::new();
616
617    let flush = |entries: &mut Vec<LogEntry>,
618                 header: &mut Option<(DateTime<FixedOffset>, LogKind, Option<String>)>,
619                 note: &mut Vec<&str>| {
620        if let Some((timestamp, kind, object)) = header.take() {
621            let joined = note.join("\n");
622            let note_str = joined.trim_matches(['\n', '\r']).to_string();
623            entries.push(LogEntry {
624                timestamp,
625                kind,
626                object,
627                note: note_str,
628            });
629        }
630        note.clear();
631    };
632
633    for line in text.lines() {
634        if line.starts_with("## [") {
635            if let Some(parsed) = Log::parse_header(line) {
636                // Close the previous entry, start a new one.
637                flush(&mut entries, &mut cur_header, &mut cur_note);
638                cur_header = Some(parsed);
639                continue;
640            }
641            // Unparseable `## [` line: treat as body of the current entry.
642        }
643        if cur_header.is_some() {
644            cur_note.push(line);
645        }
646    }
647    flush(&mut entries, &mut cur_header, &mut cur_note);
648    entries
649}
650
651/// Recompose an active/archive file from a header block and an entry body.
652fn compose_active(header: &str, body: &str) -> String {
653    let mut out = String::new();
654    out.push_str(header);
655    if !header.is_empty() && !header.ends_with('\n') {
656        out.push('\n');
657    }
658    // Exactly one blank line between the heading block and the first entry.
659    if !header.is_empty() && !out.ends_with("\n\n") {
660        out.push('\n');
661    }
662    out.push_str(body);
663    out
664}
665
666/// Append entries to a month archive, creating it with `type: log` frontmatter
667/// if absent. Atomic (temp-file rename). Entries are appended in the given
668/// order (callers pass them already chronological within the month).
669fn append_to_archive(path: &Path, entries: &[LogEntry]) -> crate::Result<()> {
670    let mut body = String::new();
671    for e in entries {
672        body.push_str(&e.render());
673    }
674
675    if path.exists() {
676        let existing = fs::read_to_string(path)?;
677        let mut full = existing;
678        if !full.ends_with('\n') {
679            full.push('\n');
680        }
681        full.push_str(&body);
682        write_atomic(path, full.as_bytes())?;
683    } else {
684        if let Some(parent) = path.parent() {
685            fs::create_dir_all(parent)?;
686        }
687        let full = compose_active(LOG_FRONTMATTER, &body);
688        write_atomic(path, full.as_bytes())?;
689    }
690    Ok(())
691}
692
693/// Atomic write: write to a temp file in the same directory, fsync, then
694/// rename over the destination — so a concurrent reader never sees a
695/// half-written file. Mirrors the parser's write path.
696fn write_atomic(dest: &Path, bytes: &[u8]) -> crate::Result<()> {
697    let dir = dest.parent().unwrap_or_else(|| Path::new("."));
698    fs::create_dir_all(dir)?;
699
700    let file_name = dest
701        .file_name()
702        .and_then(|s| s.to_str())
703        .unwrap_or("log.md");
704    let (mut f, tmp) = create_temp_file(dir, file_name)?;
705
706    {
707        f.write_all(bytes)?;
708        f.sync_all()?;
709    }
710    // rename over the destination; clean up the temp on failure.
711    match fs::rename(&tmp, dest) {
712        Ok(()) => {
713            sync_parent_dir(dir);
714            Ok(())
715        }
716        Err(e) => {
717            let _ = fs::remove_file(&tmp);
718            Err(e.into())
719        }
720    }
721}
722
723fn create_temp_file(dir: &Path, file_name: &str) -> std::io::Result<(File, PathBuf)> {
724    use std::sync::atomic::{AtomicU64, Ordering};
725    use std::time::{SystemTime, UNIX_EPOCH};
726
727    static TMP_SEQ: AtomicU64 = AtomicU64::new(0);
728    let pid = std::process::id();
729    let nanos = SystemTime::now()
730        .duration_since(UNIX_EPOCH)
731        .map(|d| d.as_nanos())
732        .unwrap_or(0);
733
734    for _ in 0..128 {
735        let seq = TMP_SEQ.fetch_add(1, Ordering::Relaxed);
736        let tmp = dir.join(format!(".{file_name}.{pid}.{nanos}.{seq}.tmp"));
737        match fs::OpenOptions::new()
738            .write(true)
739            .create_new(true)
740            .open(&tmp)
741        {
742            Ok(file) => return Ok((file, tmp)),
743            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => continue,
744            Err(e) => return Err(e),
745        }
746    }
747
748    Err(std::io::Error::new(
749        std::io::ErrorKind::AlreadyExists,
750        "could not allocate a unique dbmd log temp file",
751    ))
752}
753
754fn sync_parent_dir(dir: &Path) {
755    if let Ok(parent) = File::open(dir) {
756        let _ = parent.sync_all();
757    }
758}
759
760/// Every `log/<YYYY-MM>.md` archive, sorted **newest month first**.
761fn list_archives_desc(store: &Store) -> crate::Result<Vec<PathBuf>> {
762    let dir = archive_dir(store);
763    if !dir.is_dir() {
764        return Ok(Vec::new());
765    }
766    let mut months: Vec<(String, PathBuf)> = Vec::new();
767    for entry in fs::read_dir(&dir)? {
768        let entry = entry?;
769        let path = entry.path();
770        if !path.is_file() {
771            continue;
772        }
773        let name = match path.file_name().and_then(|s| s.to_str()) {
774            Some(n) => n,
775            None => continue,
776        };
777        // Match `YYYY-MM.md`.
778        if let Some(stem) = name.strip_suffix(".md") {
779            if is_year_month(stem) {
780                months.push((stem.to_string(), path.clone()));
781            }
782        }
783    }
784    // `YYYY-MM` strings sort lexically == chronologically; reverse for newest
785    // first.
786    months.sort_by(|a, b| b.0.cmp(&a.0));
787    Ok(months.into_iter().map(|(_, p)| p).collect())
788}
789
790/// The `(year, month)` an archive file represents, parsed from its
791/// `log/<YYYY-MM>.md` name. `None` if the name isn't a well-formed month
792/// archive (in which case the caller scans it rather than risk skipping it).
793fn archive_year_month(path: &Path) -> Option<(i32, u32)> {
794    let stem = path
795        .file_name()
796        .and_then(|s| s.to_str())
797        .and_then(|n| n.strip_suffix(".md"))?;
798    if !is_year_month(stem) {
799        return None;
800    }
801    let year: i32 = stem[..4].parse().ok()?;
802    let month: u32 = stem[5..7].parse().ok()?;
803    Some((year, month))
804}
805
806/// True if `s` looks like `YYYY-MM` (4 digits, dash, 2 digits).
807fn is_year_month(s: &str) -> bool {
808    let bytes = s.as_bytes();
809    if bytes.len() != 7 {
810        return false;
811    }
812    bytes[..4].iter().all(u8::is_ascii_digit)
813        && bytes[4] == b'-'
814        && bytes[5].is_ascii_digit()
815        && bytes[6].is_ascii_digit()
816}
817
818/// Reverse-read `path` from EOF, parsing entries newest-first and feeding each
819/// to `take`. `take` returns `true` to stop early (enough collected). The file
820/// is read backward in blocks; only the tail region needed to satisfy `take`
821/// is read — the whole file is read only if `take` never returns `true`.
822fn reverse_collect<F>(path: &Path, mut take: F) -> crate::Result<()>
823where
824    F: FnMut(LogEntry) -> bool,
825{
826    let mut file = File::open(path)?;
827    let len = file.metadata()?.len();
828    if len == 0 {
829        return Ok(());
830    }
831
832    // Algorithm: grow a tail buffer leftward one block at a time, emitting
833    // entries strictly newest-first as their left boundary is confirmed, and
834    // stopping the instant `take` says enough. The whole file is read only if
835    // `take` never returns `true` (e.g. `tail(n)` with n ≥ entry count).
836    //
837    // Invariant: a `## [` line-start anywhere in the buffer is a *complete*
838    // entry — its header is the entry's first line, and its body lies to the
839    // right and is therefore already buffered (we read right-to-left). So we
840    // never split an entry across blocks.
841    //
842    // `buf` holds the file's bytes from absolute offset `start` (growing
843    // leftward toward 0) to EOF. `emitted_abs` records the absolute offsets of
844    // headers already handed to `take`, so re-deriving headers each block never
845    // double-emits.
846    let mut buf: Vec<u8> = Vec::new();
847    let mut start = len;
848    // O(1) membership: a `Vec` + `.contains()` here is O(E^2) across a large
849    // single-month file (every header re-scanned against all prior emissions).
850    let mut emitted_abs: std::collections::HashSet<u64> = std::collections::HashSet::new();
851    let mut stop = false;
852
853    while start > 0 && !stop {
854        let block = std::cmp::min(REVERSE_BLOCK as u64, start);
855        let new_start = start - block;
856        file.seek(SeekFrom::Start(new_start))?;
857        let mut chunk = vec![0u8; block as usize];
858        file.read_exact(&mut chunk)?;
859        chunk.extend_from_slice(&buf);
860        buf = chunk;
861        start = new_start;
862
863        // Find absolute offsets of every header line-start in the current
864        // buffer.
865        let headers = header_offsets(&buf, start);
866
867        // Process newest (largest offset) → oldest (smallest), emitting any
868        // header not yet emitted. Hold back only the buffer's *leftmost* header
869        // while we have not reached file start (`start > 0`): older entries may
870        // still lie to its left in unread blocks, and newest-first order
871        // requires we not emit it until we've confirmed it really is the oldest
872        // (or read enough to bound it on the left). One extra block read at
873        // most; on the next iteration its left boundary is in-buffer.
874        for i in (0..headers.len()).rev() {
875            let abs = headers[i];
876            if emitted_abs.contains(&abs) {
877                continue;
878            }
879            let is_oldest_in_buf = i == 0;
880            if is_oldest_in_buf && start > 0 {
881                continue;
882            }
883
884            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
885            if let Some(entry) = parse_single_entry(&entry_text) {
886                emitted_abs.insert(abs);
887                if take(entry) {
888                    stop = true;
889                    break;
890                }
891            } else {
892                emitted_abs.insert(abs);
893            }
894        }
895    }
896
897    // Reached file start (or stopped). If we stopped, done. If we reached
898    // start, emit any held-back oldest header(s) now (start == 0 means the
899    // buffer's first header is genuinely the oldest).
900    if !stop && start == 0 {
901        let headers = header_offsets(&buf, start);
902        for i in (0..headers.len()).rev() {
903            let abs = headers[i];
904            if emitted_abs.contains(&abs) {
905                continue;
906            }
907            let entry_text = entry_text_at(&buf, start, abs, &headers, i);
908            if let Some(entry) = parse_single_entry(&entry_text) {
909                emitted_abs.insert(abs);
910                if take(entry) {
911                    break;
912                }
913            } else {
914                emitted_abs.insert(abs);
915            }
916        }
917    }
918
919    Ok(())
920}
921
922/// Absolute byte offsets of every `## [` line-start in `buf`, where `buf`
923/// begins at absolute offset `base`.
924fn header_offsets(buf: &[u8], base: u64) -> Vec<u64> {
925    const PAT: &[u8] = b"## [";
926    let mut out = Vec::new();
927    let n = buf.len();
928    let mut i = 0;
929    while i + PAT.len() <= n {
930        if &buf[i..i + PAT.len()] == PAT {
931            let at_line_start = i == 0 || buf[i - 1] == b'\n';
932            if at_line_start {
933                out.push(base + i as u64);
934                // skip ahead past this marker
935                i += PAT.len();
936                continue;
937            }
938        }
939        i += 1;
940    }
941    out
942}
943
944/// Extract the text of the entry whose header is at absolute offset
945/// `header_abs` (the `headers[idx]` entry), spanning to the next header (or
946/// buffer end). `buf` begins at absolute offset `base`.
947fn entry_text_at(buf: &[u8], base: u64, header_abs: u64, headers: &[u64], idx: usize) -> String {
948    let rel_start = (header_abs - base) as usize;
949    let rel_end = if idx + 1 < headers.len() {
950        (headers[idx + 1] - base) as usize
951    } else {
952        buf.len()
953    };
954    String::from_utf8_lossy(&buf[rel_start..rel_end]).into_owned()
955}
956
957/// Parse a single entry from a text block that begins at its header line.
958fn parse_single_entry(text: &str) -> Option<LogEntry> {
959    parse_entries(text).into_iter().next()
960}
961
962#[cfg(test)]
963mod tests {
964    use super::*;
965    use crate::parser::Config;
966    use std::fs;
967    use tempfile::TempDir;
968
969    /// Build a `Store` rooted at a fresh temp dir with a minimal `DB.md`.
970    /// Construct the `Store` struct directly so the test stays narrow and never
971    /// exercises the `Store::open` parser path.
972    fn temp_store() -> (TempDir, Store) {
973        let dir = tempfile::tempdir().expect("tempdir");
974        fs::write(dir.path().join("DB.md"), "---\ntype: db-md\n---\n").expect("write DB.md");
975        let store = Store {
976            root: dir.path().to_path_buf(),
977            config: Config::default(),
978        };
979        (dir, store)
980    }
981
982    /// A timestamp at UTC from `YYYY-MM-DD HH:MM` components.
983    fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<FixedOffset> {
984        let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
985            .unwrap()
986            .and_hms_opt(h, mi, 0)
987            .unwrap();
988        FixedOffset::east_opt(0)
989            .unwrap()
990            .from_local_datetime(&naive)
991            .single()
992            .unwrap()
993    }
994
995    #[allow(clippy::too_many_arguments)] // test fixture builder; struct-ifying churns every call site
996    fn entry(
997        y: i32,
998        mo: u32,
999        d: u32,
1000        h: u32,
1001        mi: u32,
1002        kind: LogKind,
1003        object: Option<&str>,
1004        note: &str,
1005    ) -> LogEntry {
1006        LogEntry {
1007            timestamp: ts(y, mo, d, h, mi),
1008            kind,
1009            object: object.map(|s| s.to_string()),
1010            note: note.to_string(),
1011        }
1012    }
1013
1014    // ── parse_header ────────────────────────────────────────────────────────
1015
1016    #[test]
1017    fn parse_header_with_object() {
1018        let (t, k, o) =
1019            Log::parse_header("## [2026-05-27 10:00] ingest | sources/emails/x.eml").unwrap();
1020        assert_eq!(t, ts(2026, 5, 27, 10, 0));
1021        assert_eq!(k, LogKind::Ingest);
1022        assert_eq!(o.as_deref(), Some("sources/emails/x.eml"));
1023    }
1024
1025    #[test]
1026    fn parse_header_without_object_is_none_object() {
1027        let (t, k, o) = Log::parse_header("## [2026-05-27 10:20] validate").unwrap();
1028        assert_eq!(t, ts(2026, 5, 27, 10, 20));
1029        assert_eq!(k, LogKind::Validate);
1030        assert_eq!(o, None);
1031    }
1032
1033    #[test]
1034    fn parse_header_custom_kind_roundtrips_token() {
1035        let (_, k, o) = Log::parse_header("## [2026-05-27 10:00] proposal | records/x").unwrap();
1036        assert_eq!(k, LogKind::Custom("proposal".to_string()));
1037        assert!(!k.is_recognized());
1038        assert_eq!(o.as_deref(), Some("records/x"));
1039    }
1040
1041    #[test]
1042    fn parse_header_index_rebuild_hyphenated_kind() {
1043        let (_, k, _) = Log::parse_header("## [2026-05-27 10:00] index-rebuild").unwrap();
1044        assert_eq!(k, LogKind::IndexRebuild);
1045        assert_eq!(k.as_str(), "index-rebuild");
1046    }
1047
1048    #[test]
1049    fn parse_header_rejects_non_headers() {
1050        assert!(Log::parse_header("Not a header").is_none());
1051        assert!(Log::parse_header("# Curator log").is_none());
1052        assert!(Log::parse_header("## [garbage] ingest | x").is_none());
1053        assert!(Log::parse_header("## [2026-05-27 10:00]").is_none()); // no kind
1054                                                                       // A bracketed but non-timestamp date must be rejected (LOG_BAD_TIMESTAMP territory).
1055        assert!(Log::parse_header("## [2026-13-40 99:99] ingest | x").is_none());
1056    }
1057
1058    // ── kind round-trip ───────────────────────────────────────────────────────
1059
1060    #[test]
1061    fn kind_as_str_parse_roundtrip_for_all_recognized() {
1062        for k in [
1063            LogKind::Ingest,
1064            LogKind::Create,
1065            LogKind::Update,
1066            LogKind::Delete,
1067            LogKind::Rename,
1068            LogKind::Link,
1069            LogKind::Validate,
1070            LogKind::IndexRebuild,
1071            LogKind::Contradiction,
1072        ] {
1073            assert_eq!(LogKind::parse(k.as_str()), k);
1074            assert!(k.is_recognized());
1075        }
1076    }
1077
1078    // ── append: creation + frontmatter ───────────────────────────────────────
1079
1080    #[test]
1081    fn append_creates_log_with_frontmatter_and_entry() {
1082        let (_d, store) = temp_store();
1083        let e = entry(
1084            2026,
1085            5,
1086            27,
1087            10,
1088            0,
1089            LogKind::Ingest,
1090            Some("sources/emails/x.eml"),
1091            "Email received.",
1092        );
1093        Log::append(&store, &e).unwrap();
1094
1095        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1096        // type: log frontmatter present.
1097        assert!(
1098            content.starts_with("---\ntype: log\n---\n"),
1099            "missing log frontmatter; got:\n{content}"
1100        );
1101        // The entry header is rendered verbatim.
1102        assert!(content.contains("## [2026-05-27 10:00] ingest | sources/emails/x.eml"));
1103        assert!(content.contains("Email received."));
1104        // No archive dir created when nothing rotates.
1105        assert!(!store.root.join("log").exists());
1106    }
1107
1108    // ── append → tail → since round-trip ─────────────────────────────────────
1109
1110    #[test]
1111    fn append_tail_since_roundtrip() {
1112        let (_d, store) = temp_store();
1113        let e1 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "first");
1114        let e2 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "second");
1115        let e3 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "third");
1116        Log::append(&store, &e1).unwrap();
1117        Log::append(&store, &e2).unwrap();
1118        Log::append(&store, &e3).unwrap();
1119
1120        // tail(2) returns the two newest, in chronological order.
1121        let tail = Log::tail(&store, 2).unwrap();
1122        assert_eq!(tail.len(), 2);
1123        assert_eq!(tail[0], e2);
1124        assert_eq!(tail[1], e3);
1125
1126        // tail(n) larger than the log returns everything, chronologically.
1127        let all = Log::tail(&store, 99).unwrap();
1128        assert_eq!(all, vec![e1.clone(), e2.clone(), e3.clone()]);
1129
1130        // since(10:05) returns strictly-newer entries (excludes the 10:05 one).
1131        let since = Log::since(&store, ts(2026, 5, 27, 10, 5)).unwrap();
1132        assert_eq!(since, vec![e3.clone()]);
1133
1134        // since before everything returns all.
1135        let since_all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1136        assert_eq!(since_all, vec![e1, e2, e3]);
1137    }
1138
1139    #[test]
1140    fn tail_zero_is_empty() {
1141        let (_d, store) = temp_store();
1142        Log::append(
1143            &store,
1144            &entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "x"),
1145        )
1146        .unwrap();
1147        assert!(Log::tail(&store, 0).unwrap().is_empty());
1148    }
1149
1150    #[test]
1151    fn tail_and_since_on_missing_log_are_empty() {
1152        let (_d, store) = temp_store();
1153        assert!(Log::tail(&store, 5).unwrap().is_empty());
1154        assert!(Log::since(&store, ts(2000, 1, 1, 0, 0)).unwrap().is_empty());
1155        assert!(Log::last_validate_at(&store).unwrap().is_none());
1156    }
1157
1158    #[test]
1159    fn since_exact_timestamp_is_exclusive() {
1160        let (_d, store) = temp_store();
1161        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "PASS");
1162        Log::append(&store, &e).unwrap();
1163        // Equal timestamp must NOT be included (strictly newer).
1164        assert!(Log::since(&store, ts(2026, 5, 27, 10, 0))
1165            .unwrap()
1166            .is_empty());
1167    }
1168
1169    // ── since: out-of-order on disk (append-only correction / merge=union) ────
1170
1171    /// Write a `log.md` at the store root from `entries` in the EXACT given
1172    /// physical order, with the standard `type: log` frontmatter. Unlike
1173    /// [`Log::append`] (which always lands the newest entry at EOF), this lets a
1174    /// test author the non-monotonic on-disk shape the SPEC permits — a
1175    /// backdated corrective entry below the entry it corrects, or a
1176    /// `merge=union` interleave.
1177    fn write_raw_log(store: &Store, entries: &[LogEntry]) {
1178        let mut content = String::from(LOG_FRONTMATTER);
1179        content.push('\n');
1180        for e in entries {
1181            content.push_str(&e.render());
1182        }
1183        fs::write(store.root.join("log.md"), content).expect("write raw log.md");
1184    }
1185
1186    #[test]
1187    fn since_returns_newer_entries_even_when_disk_order_is_non_monotonic() {
1188        // The demonstrated regression: a curator appended a backdated CORRECTIVE
1189        // entry (10:00) below newer entries (10:10, 10:05), so the physical
1190        // on-disk order is 10:10, 10:05, 10:00 — newest-first, not chronological.
1191        // The append-only SPEC explicitly permits this ("append a corrective
1192        // entry below it"; out-of-order is only LOG_OUT_OF_ORDER, a warning).
1193        let (_d, store) = temp_store();
1194        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "newest");
1195        let e_1005 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "middle");
1196        let e_1000 = entry(
1197            2026,
1198            5,
1199            27,
1200            10,
1201            0,
1202            LogKind::Update,
1203            Some("a"),
1204            "backdated fix",
1205        );
1206        // Physical order on disk: 10:10, 10:05, then the backdated 10:00 LAST.
1207        write_raw_log(&store, &[e_1010, e_1005, e_1000]);
1208
1209        // since 10:02 must return BOTH entries strictly newer than 10:02
1210        // (10:05 and 10:10). The old early-stop hit the physically-last 10:00
1211        // entry (<= 10:02), stopped, and returned EMPTY — silently dropping the
1212        // two newer entries that sit earlier in the file.
1213        let got = Log::since(&store, ts(2026, 5, 27, 10, 2)).unwrap();
1214        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1215        assert_eq!(
1216            stamps,
1217            [ts(2026, 5, 27, 10, 5), ts(2026, 5, 27, 10, 10)]
1218                .into_iter()
1219                .collect(),
1220            "since(10:02) must include both 10:05 and 10:10 despite the backdated \
1221             10:00 entry sitting physically last, and exclude 10:00; got {got:?}"
1222        );
1223
1224        // A cutoff before everything still returns all three, regardless of the
1225        // scrambled disk order.
1226        let all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1227        let all_stamps: std::collections::BTreeSet<_> = all.iter().map(|e| e.timestamp).collect();
1228        assert_eq!(
1229            all_stamps,
1230            [
1231                ts(2026, 5, 27, 10, 0),
1232                ts(2026, 5, 27, 10, 5),
1233                ts(2026, 5, 27, 10, 10),
1234            ]
1235            .into_iter()
1236            .collect()
1237        );
1238    }
1239
1240    #[test]
1241    fn since_crosses_archive_when_newer_entry_is_out_of_order_inside_it() {
1242        // Out-of-order INSIDE an archive month, with the cutoff landing in that
1243        // month. The April archive is authored newest-physical-first (04-20,
1244        // then a backdated 04-05 last); a naive early-stop on the first
1245        // older-than-cutoff entry would miss the later April entry. The active
1246        // file holds a clean May entry. Cutoff = mid-April.
1247        let (_d, store) = temp_store();
1248
1249        // Active file: one current-month (May) entry.
1250        let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1251        write_raw_log(&store, &[may]);
1252
1253        // April archive authored out of order: 04-20 first, backdated 04-05 last.
1254        let apr_late = entry(
1255            2026,
1256            4,
1257            20,
1258            9,
1259            0,
1260            LogKind::Create,
1261            Some("apr-b"),
1262            "apr-late",
1263        );
1264        let apr_early = entry(
1265            2026,
1266            4,
1267            5,
1268            9,
1269            0,
1270            LogKind::Ingest,
1271            Some("apr-a"),
1272            "apr-early",
1273        );
1274        let dir = store.root.join("log");
1275        fs::create_dir_all(&dir).unwrap();
1276        let mut arch = String::from(LOG_FRONTMATTER);
1277        arch.push('\n');
1278        arch.push_str(&apr_late.render());
1279        arch.push_str(&apr_early.render());
1280        fs::write(dir.join("2026-04.md"), arch).unwrap();
1281
1282        // since mid-April: the later April entry (04-20) AND the May entry must
1283        // come back; the early April entry (04-05) must not.
1284        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1285        let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1286        assert_eq!(
1287            stamps,
1288            [ts(2026, 4, 20, 9, 0), ts(2026, 5, 2, 8, 0)]
1289                .into_iter()
1290                .collect(),
1291            "since(mid-April) must include the out-of-order later April entry \
1292             and the May entry, and exclude the earlier April entry; got {got:?}"
1293        );
1294    }
1295
1296    // ── multi-line notes ──────────────────────────────────────────────────────
1297
1298    #[test]
1299    fn multiline_note_is_preserved() {
1300        let (_d, store) = temp_store();
1301        let e = entry(
1302            2026,
1303            5,
1304            27,
1305            10,
1306            0,
1307            LogKind::Create,
1308            Some("records/x"),
1309            "Line one.\nLine two.\nLine three.",
1310        );
1311        Log::append(&store, &e).unwrap();
1312        let got = Log::tail(&store, 1).unwrap();
1313        assert_eq!(got[0].note, "Line one.\nLine two.\nLine three.");
1314    }
1315
1316    #[test]
1317    fn empty_note_roundtrips_as_empty() {
1318        let (_d, store) = temp_store();
1319        let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "");
1320        Log::append(&store, &e).unwrap();
1321        let got = Log::tail(&store, 1).unwrap();
1322        assert_eq!(got[0], e);
1323        assert_eq!(got[0].note, "");
1324    }
1325
1326    // ── last_validate_at ─────────────────────────────────────────────────────
1327
1328    #[test]
1329    fn last_validate_at_finds_most_recent_validate() {
1330        let (_d, store) = temp_store();
1331        Log::append(
1332            &store,
1333            &entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "first pass"),
1334        )
1335        .unwrap();
1336        Log::append(
1337            &store,
1338            &entry(2026, 5, 27, 10, 5, LogKind::Create, Some("a"), "made a"),
1339        )
1340        .unwrap();
1341        Log::append(
1342            &store,
1343            &entry(2026, 5, 27, 10, 10, LogKind::Validate, None, "second pass"),
1344        )
1345        .unwrap();
1346        Log::append(
1347            &store,
1348            &entry(2026, 5, 27, 10, 15, LogKind::Update, Some("a"), "edit a"),
1349        )
1350        .unwrap();
1351
1352        let last = Log::last_validate_at(&store).unwrap();
1353        assert_eq!(last, Some(ts(2026, 5, 27, 10, 10)));
1354    }
1355
1356    #[test]
1357    fn last_validate_at_none_when_no_validate() {
1358        let (_d, store) = temp_store();
1359        Log::append(
1360            &store,
1361            &entry(2026, 5, 27, 10, 0, LogKind::Create, Some("a"), "x"),
1362        )
1363        .unwrap();
1364        assert_eq!(Log::last_validate_at(&store).unwrap(), None);
1365    }
1366
1367    // ── month-boundary rotation ──────────────────────────────────────────────
1368
1369    #[test]
1370    fn rotation_rolls_prior_months_into_archives() {
1371        let (_d, store) = temp_store();
1372        // Two April entries and one May entry, all written while "current" was
1373        // their own month (append-only chronological order).
1374        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1375        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1376        Log::append(&store, &a1).unwrap();
1377        Log::append(&store, &a2).unwrap();
1378
1379        // Before rotation: no archive dir, both April entries in active.
1380        assert!(!store.root.join("log").exists());
1381
1382        // Appending a May entry must roll April into log/2026-04.md.
1383        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
1384        Log::append(&store, &m1).unwrap();
1385
1386        // Archive exists and holds both April entries with frontmatter.
1387        let arch_path = store.root.join("log").join("2026-04.md");
1388        assert!(arch_path.exists(), "expected April archive to be created");
1389        let arch = fs::read_to_string(&arch_path).unwrap();
1390        assert!(arch.starts_with("---\ntype: log\n---\n"));
1391        assert!(arch.contains("## [2026-04-10 09:00] ingest | apr-a"));
1392        assert!(arch.contains("## [2026-04-20 09:00] create | apr-b"));
1393        assert!(arch.contains("apr one"));
1394        assert!(arch.contains("apr two"));
1395
1396        // Active file now holds ONLY the May entry (no April entries).
1397        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1398        assert!(active.contains("## [2026-05-02 08:00] update | may-a"));
1399        assert!(
1400            !active.contains("apr-a") && !active.contains("apr-b"),
1401            "April entries must be gone from the active file; got:\n{active}"
1402        );
1403
1404        // The full timeline (archives ++ active) is intact and chronological.
1405        let all = Log::tail(&store, 99).unwrap();
1406        assert_eq!(all, vec![a1, a2, m1]);
1407    }
1408
1409    #[test]
1410    fn rotation_groups_distinct_prior_months_into_separate_archives() {
1411        let (_d, store) = temp_store();
1412        // March + April entries accumulate, then a May append rolls BOTH prior
1413        // months into their own archive files.
1414        let mar = entry(2026, 3, 5, 9, 0, LogKind::Ingest, Some("mar"), "march");
1415        let apr = entry(2026, 4, 5, 9, 0, LogKind::Create, Some("apr"), "april");
1416        Log::append(&store, &mar).unwrap();
1417        Log::append(&store, &apr).unwrap();
1418        // At this point April is current, March already rolled into its archive.
1419        assert!(store.root.join("log").join("2026-03.md").exists());
1420
1421        let may = entry(2026, 5, 5, 9, 0, LogKind::Update, Some("may"), "may");
1422        Log::append(&store, &may).unwrap();
1423
1424        assert!(store.root.join("log").join("2026-03.md").exists());
1425        assert!(store.root.join("log").join("2026-04.md").exists());
1426
1427        // Each archive holds only its own month.
1428        let mar_arch = fs::read_to_string(store.root.join("log").join("2026-03.md")).unwrap();
1429        let apr_arch = fs::read_to_string(store.root.join("log").join("2026-04.md")).unwrap();
1430        assert!(mar_arch.contains("mar") && !mar_arch.contains("apr"));
1431        assert!(apr_arch.contains("apr") && !apr_arch.contains("mar"));
1432
1433        // Active holds only May.
1434        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1435        assert!(active.contains("may") && !active.contains("mar") && !active.contains("apr"));
1436
1437        // Timeline intact and ordered across both archives + active.
1438        let all = Log::tail(&store, 99).unwrap();
1439        assert_eq!(all, vec![mar, apr, may]);
1440    }
1441
1442    #[test]
1443    fn tail_crosses_into_archive_when_n_spans_month_boundary() {
1444        let (_d, store) = temp_store();
1445        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1446        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1447        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1448        let m2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may2");
1449        for e in [&a1, &a2, &m1, &m2] {
1450            Log::append(&store, e).unwrap();
1451        }
1452        // April is now archived; active holds only May. tail(3) must reach back
1453        // into the archive for the third-newest entry.
1454        let tail3 = Log::tail(&store, 3).unwrap();
1455        assert_eq!(tail3, vec![a2.clone(), m1.clone(), m2.clone()]);
1456
1457        // tail within the active month does NOT need the archive but is still
1458        // correct.
1459        let tail2 = Log::tail(&store, 2).unwrap();
1460        assert_eq!(tail2, vec![m1, m2]);
1461    }
1462
1463    #[test]
1464    fn since_crosses_into_archive_and_early_stops() {
1465        let (_d, store) = temp_store();
1466        let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1467        let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1468        let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1469        for e in [&a1, &a2, &m1] {
1470            Log::append(&store, e).unwrap();
1471        }
1472        // since a mid-April time: must include the later April entry (from the
1473        // archive) and the May entry, but not the earlier April one.
1474        let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1475        assert_eq!(got, vec![a2, m1]);
1476    }
1477
1478    #[test]
1479    fn last_validate_at_crosses_into_archive() {
1480        let (_d, store) = temp_store();
1481        // A validate in April, then non-validate work that rolls April away.
1482        Log::append(
1483            &store,
1484            &entry(2026, 4, 10, 9, 0, LogKind::Validate, None, "apr validate"),
1485        )
1486        .unwrap();
1487        Log::append(
1488            &store,
1489            &entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may work"),
1490        )
1491        .unwrap();
1492        // Active has only the May update; the most-recent validate lives in the
1493        // April archive and must still be found.
1494        let last = Log::last_validate_at(&store).unwrap();
1495        assert_eq!(last, Some(ts(2026, 4, 10, 9, 0)));
1496    }
1497
1498    // ── reverse-read correctness on a large (multi-block) log ────────────────
1499
1500    #[test]
1501    fn reverse_read_correct_on_large_single_month_log() {
1502        let (_d, store) = temp_store();
1503        // Append many same-month entries with chunky multi-line notes so the
1504        // file spans well past one REVERSE_BLOCK (8 KiB). Timestamps are
1505        // strictly increasing (a real append-only log is monotonic): each entry
1506        // is 3 minutes after the previous, all within June, so physical order
1507        // equals chronological order and the last-k-physical ARE the k-newest.
1508        let n = 400usize;
1509        let mut expected: Vec<LogEntry> = Vec::new();
1510        for i in 0..n {
1511            let total_min = (i as u32) * 3;
1512            let day = 1 + total_min / (24 * 60);
1513            let hour = (total_min / 60) % 24;
1514            let min = total_min % 60;
1515            // Unique, multi-line note to bulk up the file and detect mis-parses.
1516            let note = format!(
1517                "entry number {i}\nbody line A for {i}\nbody line B for {i} with padding {}",
1518                "x".repeat(40)
1519            );
1520            let e = entry(
1521                2026,
1522                6,
1523                day,
1524                hour,
1525                min,
1526                LogKind::Update,
1527                Some(&format!("records/item-{i:04}")),
1528                &note,
1529            );
1530            Log::append(&store, &e).unwrap();
1531            expected.push(e);
1532        }
1533
1534        // File must actually be multi-block to exercise the backward reader.
1535        let size = fs::metadata(store.root.join("log.md")).unwrap().len();
1536        assert!(
1537            size > (REVERSE_BLOCK as u64) * 2,
1538            "test log not large enough ({size} bytes) to exercise multi-block reverse-read"
1539        );
1540
1541        // tail(5) must equal the 5 newest, exactly.
1542        let tail5 = Log::tail(&store, 5).unwrap();
1543        assert_eq!(tail5, expected[n - 5..].to_vec());
1544
1545        // tail(50) must equal the 50 newest.
1546        let tail50 = Log::tail(&store, 50).unwrap();
1547        assert_eq!(tail50, expected[n - 50..].to_vec());
1548
1549        // tail(all) must reconstruct the whole timeline in order.
1550        let all = Log::tail(&store, n + 10).unwrap();
1551        assert_eq!(all.len(), n);
1552        assert_eq!(all, expected);
1553    }
1554
1555    // ── tail on OUT-OF-ORDER logs (newest-by-timestamp, not last-physical) ────
1556    //
1557    // The append-only contract is non-decreasing time order, but it's only a
1558    // `LOG_OUT_OF_ORDER` warning when violated (corrective entries land below
1559    // the entry they correct; backdated / clock-skewed writes; `merge=union`
1560    // clone merges). `tail N` must return the N newest *by timestamp*, never the
1561    // last N *physical* entries.
1562
1563    /// Write `log.md` verbatim from rendered entries in the given **physical
1564    /// (file) order**, bypassing `Log::append` so the test controls on-disk
1565    /// order exactly (append never reorders within a month, but this is the
1566    /// clearest way to pin a specific physical layout).
1567    fn write_log_physical(store: &Store, entries: &[LogEntry]) {
1568        let mut body = String::new();
1569        for e in entries {
1570            body.push_str(&e.render());
1571        }
1572        let full = compose_active(LOG_FRONTMATTER, &body);
1573        fs::write(store.root.join("log.md"), full).expect("write log.md");
1574    }
1575
1576    #[test]
1577    fn tail_returns_newest_by_timestamp_on_demonstrated_out_of_order_log() {
1578        // The exact case from the review finding: physical order 10:10, 10:05,
1579        // 10:00 (a backdated entry tail). The OLD code returned the last two
1580        // physical entries {10:05, 10:00}; the correct answer is the two newest
1581        // by time {10:05, 10:10}.
1582        let (_d, store) = temp_store();
1583        let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "ten-ten");
1584        let e_1005 = entry(
1585            2026,
1586            5,
1587            27,
1588            10,
1589            5,
1590            LogKind::Create,
1591            Some("b"),
1592            "ten-oh-five",
1593        );
1594        let e_1000 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "ten-oh-oh");
1595        // Physical order: newest first, then the two older ones — out of order.
1596        write_log_physical(&store, &[e_1010.clone(), e_1005.clone(), e_1000.clone()]);
1597
1598        let tail2 = Log::tail(&store, 2).unwrap();
1599        assert_eq!(
1600            tail2,
1601            vec![e_1005.clone(), e_1010.clone()],
1602            "tail(2) must be the two NEWEST by timestamp (chronological), \
1603             not the last two physical entries"
1604        );
1605        // The newest entry must be present and the oldest absent.
1606        assert!(tail2.contains(&e_1010), "newest (10:10) must be included");
1607        assert!(!tail2.contains(&e_1000), "oldest (10:00) must be excluded");
1608
1609        // tail(1) is just the single newest.
1610        assert_eq!(Log::tail(&store, 1).unwrap(), vec![e_1010.clone()]);
1611        // tail(all) is the full set in chronological order.
1612        assert_eq!(Log::tail(&store, 99).unwrap(), vec![e_1000, e_1005, e_1010]);
1613    }
1614
1615    #[test]
1616    fn tail_no_early_stop_when_newer_entry_sits_before_an_older_one() {
1617        // Guards the unsound within-file early stop: a newer entry (10:50) sits
1618        // PHYSICALLY BEFORE a much older one (10:00). Reading newest-physical-
1619        // first, the scan meets 10:00 before 10:50; any "stop at the first entry
1620        // below the window minimum" rule would bail and drop 10:50.
1621        //
1622        // Physical (top→bottom): 10:55, 10:10, 10:50, 10:00.
1623        // Reverse-scan order:     10:00, 10:50, 10:10, 10:55.
1624        let (_d, store) = temp_store();
1625        let e55 = entry(2026, 5, 27, 10, 55, LogKind::Update, Some("x55"), "55");
1626        let e10 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("x10"), "10");
1627        let e50 = entry(2026, 5, 27, 10, 50, LogKind::Update, Some("x50"), "50");
1628        let e00 = entry(2026, 5, 27, 10, 0, LogKind::Update, Some("x00"), "00");
1629        write_log_physical(
1630            &store,
1631            &[e55.clone(), e10.clone(), e50.clone(), e00.clone()],
1632        );
1633
1634        // The two newest by timestamp are 10:55 and 10:50 — NOT the early-stop
1635        // victim 10:10, and NOT the last-physical 10:00.
1636        let tail2 = Log::tail(&store, 2).unwrap();
1637        assert_eq!(tail2, vec![e50.clone(), e55.clone()]);
1638
1639        let tail3 = Log::tail(&store, 3).unwrap();
1640        assert_eq!(tail3, vec![e10.clone(), e50.clone(), e55.clone()]);
1641    }
1642
1643    #[test]
1644    fn tail_orders_equal_timestamps_by_physical_recency() {
1645        // Three entries share 10:00; one is at 09:59. tail(2) must keep both
1646        // 10:00 entries, and among the equal pair the one appended LATER
1647        // (physically last) sorts last ("newest" = most-recently recorded).
1648        let (_d, store) = temp_store();
1649        let early = entry(2026, 5, 27, 9, 59, LogKind::Create, Some("early"), "before");
1650        let tie_a = entry(
1651            2026,
1652            5,
1653            27,
1654            10,
1655            0,
1656            LogKind::Update,
1657            Some("tie-a"),
1658            "first 10:00",
1659        );
1660        let tie_b = entry(
1661            2026,
1662            5,
1663            27,
1664            10,
1665            0,
1666            LogKind::Update,
1667            Some("tie-b"),
1668            "second 10:00",
1669        );
1670        // Physical append order: early, tie_a, tie_b.
1671        write_log_physical(&store, &[early.clone(), tie_a.clone(), tie_b.clone()]);
1672
1673        let tail2 = Log::tail(&store, 2).unwrap();
1674        assert_eq!(
1675            tail2,
1676            vec![tie_a.clone(), tie_b.clone()],
1677            "both 10:00 entries kept, physically-later one (tie_b) last; 09:59 dropped"
1678        );
1679        // tail(1) keeps only the most-recently-recorded of the equal pair.
1680        assert_eq!(Log::tail(&store, 1).unwrap(), vec![tie_b]);
1681    }
1682
1683    #[test]
1684    fn tail_finds_newest_across_a_backdated_entry_spanning_the_month_boundary() {
1685        // A backdated entry can land physically after newer entries even across
1686        // a rotation: append May entries, then a June entry (rolls May to its
1687        // archive), then append a May-dated correction — it goes into the ACTIVE
1688        // file, physically after June. tail must still rank by timestamp, so the
1689        // June entry stays newest and the backdated May entry is not mistaken
1690        // for the tail.
1691        let (_d, store) = temp_store();
1692        let may1 = entry(2026, 5, 10, 9, 0, LogKind::Ingest, Some("may-1"), "may one");
1693        let may2 = entry(2026, 5, 20, 9, 0, LogKind::Create, Some("may-2"), "may two");
1694        let jun1 = entry(2026, 6, 2, 8, 0, LogKind::Update, Some("jun-1"), "jun one");
1695        Log::append(&store, &may1).unwrap();
1696        Log::append(&store, &may2).unwrap();
1697        Log::append(&store, &jun1).unwrap(); // rotates May -> log/2026-05.md
1698        assert!(store.root.join("log").join("2026-05.md").exists());
1699
1700        // A backdated May correction, appended now: it lands in the active file
1701        // (its month May is not strictly before the active month June), so the
1702        // active file is physically [jun1, may_corr] — out of order.
1703        let may_corr = entry(
1704            2026,
1705            5,
1706            25,
1707            9,
1708            0,
1709            LogKind::Update,
1710            Some("may-2"),
1711            "may correction",
1712        );
1713        Log::append(&store, &may_corr).unwrap();
1714        let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1715        assert!(
1716            active.contains("jun-1") && active.contains("may correction"),
1717            "backdated May entry should be in the active file alongside June; got:\n{active}"
1718        );
1719
1720        // The single newest by timestamp is the June entry, even though the
1721        // backdated May entry is physically last.
1722        assert_eq!(Log::tail(&store, 1).unwrap(), vec![jun1.clone()]);
1723
1724        // tail(2): the two newest by time are may_corr (05-25) and jun1 (06-02).
1725        let tail2 = Log::tail(&store, 2).unwrap();
1726        assert_eq!(tail2, vec![may_corr.clone(), jun1.clone()]);
1727
1728        // tail(3) must reach into the May archive for the third-newest (may2,
1729        // 05-20), proving archive crossing still works on an out-of-order store.
1730        let tail3 = Log::tail(&store, 3).unwrap();
1731        assert_eq!(tail3, vec![may2.clone(), may_corr.clone(), jun1.clone()]);
1732
1733        // tail(all) reconstructs the whole timeline in chronological order.
1734        let all = Log::tail(&store, 99).unwrap();
1735        assert_eq!(all, vec![may1, may2, may_corr, jun1]);
1736    }
1737
1738    #[test]
1739    fn parse_entries_skips_unparseable_header_folding_into_body() {
1740        // A `## [` line that is NOT a valid header should not start a new entry;
1741        // it folds into the preceding entry's note. This guards the
1742        // parse_entries header-validation branch.
1743        let text = "\
1744## [2026-05-27 10:00] create | records/x
1745Body mentions a literal: ## [not a real header here]
1746More body.
1747
1748## [2026-05-27 10:05] update | records/y
1749Second.
1750";
1751        let entries = parse_entries(text);
1752        assert_eq!(entries.len(), 2);
1753        assert_eq!(entries[0].kind, LogKind::Create);
1754        assert!(entries[0].note.contains("## [not a real header here]"));
1755        assert!(entries[0].note.contains("More body."));
1756        assert_eq!(entries[1].kind, LogKind::Update);
1757        assert_eq!(entries[1].note, "Second.");
1758    }
1759
1760    // ── append-only: corrective entries go on the end ─────────────────────────
1761
1762    #[test]
1763    fn append_only_corrective_entry_goes_on_end_without_rewriting() {
1764        let (_d, store) = temp_store();
1765        let original = entry(
1766            2026,
1767            5,
1768            27,
1769            10,
1770            0,
1771            LogKind::Update,
1772            Some("records/northstar"),
1773            "Seat count 120 -> 175.",
1774        );
1775        Log::append(&store, &original).unwrap();
1776        let after_first = fs::read_to_string(store.root.join("log.md")).unwrap();
1777
1778        // A correction is a NEW entry appended on the end; the original text is
1779        // left byte-for-byte intact (append-only contract: no rewrite API).
1780        let correction = entry(
1781            2026,
1782            5,
1783            27,
1784            11,
1785            0,
1786            LogKind::Update,
1787            Some("records/northstar"),
1788            "Correction: seat count is 165, not 175.",
1789        );
1790        Log::append(&store, &correction).unwrap();
1791        let after_second = fs::read_to_string(store.root.join("log.md")).unwrap();
1792
1793        assert!(
1794            after_second.starts_with(&after_first),
1795            "appending must not rewrite earlier bytes"
1796        );
1797        assert!(after_second.contains("Correction: seat count is 165, not 175."));
1798
1799        // Both entries are readable, in order.
1800        let all = Log::tail(&store, 99).unwrap();
1801        assert_eq!(all, vec![original, correction]);
1802    }
1803
1804    // ── concurrent append safety (atomic via temp-file rename) ────────────────
1805
1806    #[test]
1807    fn concurrent_appends_are_atomic_and_total() {
1808        use std::sync::{Arc, Barrier};
1809        use std::thread;
1810
1811        let (_d, store) = temp_store();
1812        // Seed the file so all threads take the read-modify-write path.
1813        Log::append(
1814            &store,
1815            &entry(2026, 7, 1, 0, 0, LogKind::Create, Some("seed"), "seed"),
1816        )
1817        .unwrap();
1818
1819        let threads = 8usize;
1820        let per = 25usize;
1821        let barrier = Arc::new(Barrier::new(threads));
1822        let store = Arc::new(store);
1823
1824        let mut handles = Vec::new();
1825        for tnum in 0..threads {
1826            let b = Arc::clone(&barrier);
1827            let s = Arc::clone(&store);
1828            handles.push(thread::spawn(move || {
1829                b.wait();
1830                for i in 0..per {
1831                    let e = entry(
1832                        2026,
1833                        7,
1834                        1,
1835                        (tnum % 24) as u32,
1836                        (i % 60) as u32,
1837                        LogKind::Update,
1838                        Some(&format!("t{tnum}-i{i}")),
1839                        &format!("thread {tnum} item {i}"),
1840                    );
1841                    Log::append(&s, &e).unwrap();
1842                }
1843            }));
1844        }
1845        for h in handles {
1846            h.join().unwrap();
1847        }
1848
1849        // The atomic temp-file-rename write means no append truncates or
1850        // corrupts another: the file must remain parseable and every line of
1851        // every entry header must be well-formed. Crucially, no entry should be
1852        // lost to a torn write of the *content already on disk* — though
1853        // interleaved read-modify-write WILL drop some appends (last-writer-
1854        // wins on the snapshot). We therefore assert integrity + that the file
1855        // never went empty / corrupt, not an exact count.
1856        let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1857        assert!(content.starts_with("---\ntype: log\n---\n"));
1858
1859        // Every `## [` line must parse as a valid header (no half-written line).
1860        for line in content.lines() {
1861            if line.starts_with("## [") {
1862                assert!(
1863                    Log::parse_header(line).is_some(),
1864                    "corrupt/torn header line on disk: {line:?}"
1865                );
1866            }
1867        }
1868
1869        // The seed entry must survive (it was written before the race and
1870        // every snapshot included it).
1871        assert!(content.contains("## [2026-07-01 00:00] create | seed"));
1872
1873        // The reverse reader must still produce a clean, fully-parseable view.
1874        let all = Log::tail(&store, 10_000).unwrap();
1875        assert!(!all.is_empty());
1876        // No duplicate adjacent identical headers from a torn write: every
1877        // returned entry must have a recognized-or-custom kind and a parseable
1878        // timestamp (already guaranteed by parse), and the list must be
1879        // internally consistent (re-render → re-parse identity for each).
1880        for e in &all {
1881            let rendered = e.render();
1882            let reparsed = parse_single_entry(&rendered).unwrap();
1883            assert_eq!(&reparsed, e);
1884        }
1885    }
1886
1887    // ── render/parse identity ────────────────────────────────────────────────
1888
1889    #[test]
1890    fn render_then_parse_is_identity() {
1891        let cases = vec![
1892            entry(
1893                2026,
1894                1,
1895                2,
1896                3,
1897                4,
1898                LogKind::Ingest,
1899                Some("sources/a.eml"),
1900                "n",
1901            ),
1902            entry(
1903                2026,
1904                12,
1905                31,
1906                23,
1907                59,
1908                LogKind::Validate,
1909                None,
1910                "PASS - 0 errors",
1911            ),
1912            entry(
1913                2026,
1914                6,
1915                15,
1916                12,
1917                30,
1918                LogKind::Custom("proposal".to_string()),
1919                Some("records/p"),
1920                "multi\nline\nnote",
1921            ),
1922            entry(2026, 6, 15, 12, 30, LogKind::Contradiction, Some("obj"), ""),
1923        ];
1924        for e in cases {
1925            let rendered = e.render();
1926            let parsed = parse_single_entry(&rendered).unwrap_or_else(|| {
1927                panic!("failed to reparse rendered entry:\n{rendered}");
1928            });
1929            assert_eq!(parsed, e, "round-trip mismatch for {e:?}");
1930        }
1931    }
1932}