dbmd_core/log.rs
1//! `log` — the append-only, month-rotating chronological log.
2//!
3//! One logical timeline: the active `log.md` at the store root plus
4//! `log/<YYYY-MM>.md` archives. [`Log::append`] rolls older months into
5//! archives on write so the active file stays current-month. [`Log::tail`] and
6//! [`Log::since`] **reverse-read from EOF**. Both read each file they touch in
7//! full — the on-disk order is not guaranteed monotonic, so neither can
8//! early-stop within a file — and select by timestamp: `tail` keeps the `n`
9//! newest, `since` keeps everything newer than the cutoff. Both cross into
10//! month archives only as far back as the requested window reaches (by the
11//! cutoff's month for `since`, by the current `n`th-newest's month for `tail`)
12//! — never the whole history.
13//!
14//! Append-only contract: there is no rewrite API. Corrective entries go on the
15//! end; out-of-order timestamps are a validate warning (`LOG_OUT_OF_ORDER`),
16//! signalling a probable rewrite.
17
18use std::collections::BTreeMap;
19use std::fs::{self, File};
20use std::io::{Read, Seek, SeekFrom};
21use std::path::{Path, PathBuf};
22
23use chrono::{DateTime, Datelike, FixedOffset, NaiveDateTime, TimeZone, Utc};
24
25use crate::store::Store;
26
/// Layout of the timestamp inside an entry header: `YYYY-MM-DD HH:MM` —
/// minute precision, no timezone component. The parser in this module
/// reattaches a zero offset when reading; the renderer formats the entry's own
/// wall-clock fields with this same layout.
const TS_FORMAT: &str = "%Y-%m-%d %H:%M";

/// Preamble written at the top of a newly created log file: `type: log`
/// frontmatter followed by the `# Curator log` heading.
const LOG_FRONTMATTER: &str = concat!(
    "---\n",
    "type: log\n",
    "---\n",
    "\n",
    "# Curator log\n",
);

/// Number of bytes the backward (reverse-from-EOF) reader pulls per step:
/// 8 KiB.
const REVERSE_BLOCK: usize = 8 << 10;
37
/// The kind slot of a `log.md` entry header.
///
/// The named variants are the recognized vocabulary; anything else is carried
/// verbatim in [`LogKind::Custom`], so an unknown kind survives a
/// parse → render round-trip unchanged.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogKind {
    /// A source artifact was ingested.
    Ingest,
    /// A file was created.
    Create,
    /// A file was updated.
    Update,
    /// A file was deleted.
    Delete,
    /// A file was renamed/moved.
    Rename,
    /// A wiki-link was added.
    Link,
    /// A validation pass ran.
    Validate,
    /// The index was rebuilt.
    IndexRebuild,
    /// A contradiction between sources was flagged.
    Contradiction,
    /// A token outside the recognized vocabulary, stored exactly as written.
    Custom(String),
}

impl LogKind {
    /// The header token for this kind (`ingest`, `index-rebuild`, …). For
    /// [`LogKind::Custom`] this is the stored string itself.
    pub fn as_str(&self) -> &str {
        match self {
            Self::Ingest => "ingest",
            Self::Create => "create",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Rename => "rename",
            Self::Link => "link",
            Self::Validate => "validate",
            Self::IndexRebuild => "index-rebuild",
            Self::Contradiction => "contradiction",
            Self::Custom(token) => token,
        }
    }

    /// Map a header token to its kind. The match is exact (case-sensitive);
    /// any token that is not a recognized kind's [`LogKind::as_str`] becomes
    /// [`LogKind::Custom`] holding the token unchanged.
    pub fn parse(token: &str) -> LogKind {
        // `as_str` is the single source of truth for the recognized tokens.
        let recognized = [
            LogKind::Ingest,
            LogKind::Create,
            LogKind::Update,
            LogKind::Delete,
            LogKind::Rename,
            LogKind::Link,
            LogKind::Validate,
            LogKind::IndexRebuild,
            LogKind::Contradiction,
        ];
        for kind in recognized.iter() {
            if kind.as_str() == token {
                return kind.clone();
            }
        }
        LogKind::Custom(token.to_string())
    }

    /// `true` for every variant except [`LogKind::Custom`].
    pub fn is_recognized(&self) -> bool {
        match self {
            Self::Custom(_) => false,
            _ => true,
        }
    }
}
107
108/// One parsed `log.md` entry: a header
109/// (`## [YYYY-MM-DD HH:MM] <kind> | <object>`) plus its body.
110#[derive(Debug, Clone, PartialEq, Eq)]
111pub struct LogEntry {
112 /// The entry timestamp from the header.
113 pub timestamp: DateTime<FixedOffset>,
114 /// The entry kind.
115 pub kind: LogKind,
116 /// The object slot — a store-relative path/wiki-link target, or `None` for
117 /// store-wide actions like `validate`.
118 pub object: Option<String>,
119 /// The free-form body (one or more lines) explaining what happened.
120 pub note: String,
121}
122
123impl LogEntry {
124 /// Render this entry as it appears on disk: the `## [...]` header line,
125 /// then the note body, then a trailing blank line so successive entries are
126 /// separated. The note is emitted verbatim (trailing whitespace trimmed).
127 fn render(&self) -> String {
128 let ts = self.timestamp.format(TS_FORMAT);
129 let mut out = String::new();
130 match &self.object {
131 Some(obj) => {
132 out.push_str(&format!("## [{}] {} | {}\n", ts, self.kind.as_str(), obj));
133 }
134 None => {
135 out.push_str(&format!("## [{}] {}\n", ts, self.kind.as_str()));
136 }
137 }
138 let note = self.note.trim_end_matches(['\n', '\r', ' ', '\t']);
139 if !note.is_empty() {
140 out.push_str(note);
141 out.push('\n');
142 }
143 out.push('\n');
144 out
145 }
146
147 /// The `(year, month)` of this entry's wall-clock timestamp — the rotation
148 /// bucket.
149 fn year_month(&self) -> (i32, u32) {
150 (self.timestamp.year(), self.timestamp.month())
151 }
152}
153
/// Handle for the store's chronological, append-only log.
///
/// The type carries no state: every operation is an associated function that
/// receives the [`Store`] and locates the active `log.md` and the `log/`
/// archive directory beneath the store root.
#[derive(Debug, Clone)]
pub struct Log;
159
160impl Log {
161 /// Atomically append `entry` to the active `log.md`, creating it (with
162 /// `type: log` frontmatter) if absent. **If the active log holds entries
163 /// from a prior month, roll those older months into `log/<YYYY-MM>.md`
164 /// first** (atomic move), keeping the active file to the current month.
165 pub fn append(store: &Store, entry: &LogEntry) -> crate::Result<()> {
166 let active = active_log_path(store);
167
168 // Read the active file's current contents (if any). The "current month"
169 // is the month of the entry being appended (the newest in the timeline);
170 // every existing entry from a strictly-earlier month rolls to archives.
171 let current_ym = entry.year_month();
172
173 if active.exists() {
174 let content = fs::read_to_string(&active)?;
175 let (header, entries) = parse_active(&content);
176
177 // Partition existing entries into prior-month (roll out) and
178 // current-or-later (keep in the active file).
179 let mut by_month: BTreeMap<(i32, u32), Vec<LogEntry>> = BTreeMap::new();
180 let mut keep: Vec<LogEntry> = Vec::new();
181 for e in entries {
182 if e.year_month() < current_ym {
183 by_month.entry(e.year_month()).or_default().push(e);
184 } else {
185 keep.push(e);
186 }
187 }
188
189 if !by_month.is_empty() {
190 // Roll each prior month into its archive (atomic per-file),
191 // appending to any existing archive for that month.
192 let dir = archive_dir(store);
193 fs::create_dir_all(&dir)?;
194 for ((y, m), month_entries) in &by_month {
195 let path = archive_path(store, *y, *m);
196 append_to_archive(&path, month_entries)?;
197 }
198
199 // Rewrite the active file to the kept (current-month) entries
200 // plus the new entry — atomically.
201 let mut body = String::new();
202 for e in &keep {
203 body.push_str(&e.render());
204 }
205 body.push_str(&entry.render());
206 let full = compose_active(&header, &body);
207 crate::fsx::write_atomic(&active, full.as_bytes())?;
208 return Ok(());
209 }
210
211 // No rotation needed: plain atomic append of the rendered entry.
212 let mut full = content;
213 if !full.ends_with('\n') {
214 full.push('\n');
215 }
216 full.push_str(&entry.render());
217 crate::fsx::write_atomic(&active, full.as_bytes())?;
218 Ok(())
219 } else {
220 // Fresh log: frontmatter + the single entry.
221 if let Some(parent) = active.parent() {
222 fs::create_dir_all(parent)?;
223 }
224 let body = entry.render();
225 let full = compose_active(LOG_FRONTMATTER, &body);
226 crate::fsx::write_atomic(&active, full.as_bytes())?;
227 Ok(())
228 }
229 }
230
231 /// The `n` most-recent entries **by timestamp**, returned oldest→newest.
232 ///
233 /// **Out-of-order safety (mirrors [`Log::since`]).** The log is append-only
234 /// but *not* guaranteed to be in non-decreasing timestamp order on disk: a
235 /// corrective entry is appended below the entry it corrects, a
236 /// backdated/clock-skewed write lands physically after newer entries, and a
237 /// `merge=union` clone merge interleaves both sides until a later agent
238 /// reorders. Out-of-order is only a `LOG_OUT_OF_ORDER` warning, never
239 /// rejected. So the last `n` *physical* entries are **not** the `n` newest
240 /// by time — taking them would omit a genuinely-recent entry that sits
241 /// physically before an older one, and the documented curator warm-up
242 /// (`dbmd log tail 20`) would report a stale picture of what was done lately.
243 /// We therefore feed every entry of each file we touch through a bounded
244 /// newest-by-timestamp window and let it select the true top `n`.
245 ///
246 /// Bounded cost: the active `log.md` is kept to the current month by
247 /// rotation, so a full read of it is cheap and is not a whole-store walk.
248 /// Across archives we *can* prune: each `log/<YYYY-MM>.md` holds only entries
249 /// from that month (rotation buckets by the entry's own year-month), so once
250 /// the window is full, an archive whose month is strictly before the
251 /// window-minimum's month cannot contain any entry newer than the current
252 /// `n`th-newest. We cross archives newest-month-first and stop at the first
253 /// such archive.
254 pub fn tail(store: &Store, n: usize) -> crate::Result<Vec<LogEntry>> {
255 if n == 0 {
256 return Ok(Vec::new());
257 }
258
259 // A bounded window of the `n` entries with the largest timestamps. No
260 // within-file early stop: out-of-order entries mean a newer entry can
261 // sit physically before an older one, so each file is read fully.
262 let mut window = NewestWindow::new(n);
263
264 // Active file: scan fully (current-month-bounded by rotation).
265 let active = active_log_path(store);
266 if active.exists() {
267 reverse_collect(&active, |e| {
268 window.consider(e);
269 false
270 })?;
271 }
272
273 // Archives, newest-month-first. Once the window is full, an archive
274 // whose month is strictly before the window-minimum's month holds only
275 // entries older than the current cutoff, so it (and every older archive)
276 // is skippable.
277 for archive in list_archives_desc(store)? {
278 if let (true, Some(cutoff_ym), Some(arch_ym)) = (
279 window.is_full(),
280 window.min_year_month(),
281 archive_year_month(&archive),
282 ) {
283 if arch_ym < cutoff_ym {
284 break;
285 }
286 }
287 reverse_collect(&archive, |e| {
288 window.consider(e);
289 false
290 })?;
291 }
292
293 Ok(window.into_sorted())
294 }
295
296 /// Entries strictly newer than `time`, reverse-scanning active → archives.
297 ///
298 /// **No within-file early stop.** The log is append-only but *not*
299 /// guaranteed to be in non-decreasing timestamp order on disk: a corrective
300 /// entry is appended below the entry it corrects (SPEC: "if a finding is
301 /// wrong, append a corrective entry below it"), a backdated/clock-skewed
302 /// write lands physically after newer entries, and a `merge=union` clone
303 /// merge interleaves both sides until a later agent reorders. Out-of-order
304 /// is only a `LOG_OUT_OF_ORDER` warning, never rejected. So a newer entry
305 /// can sit physically *before* an older one; stopping at the first
306 /// older-than-`time` entry would silently drop those — the documented
307 /// curator warm-up (`dbmd log since <ts>`) would miss real recent work.
308 /// We therefore read every entry of each file we touch.
309 ///
310 /// Bounded cost: the active `log.md` is kept to the current month by
311 /// rotation, so a full read of it is cheap (the same read `tail` does for a
312 /// large `n`) and is not a whole-store walk. Across archives we *can* stop:
313 /// each `log/<YYYY-MM>.md` holds only entries from that month (rotation
314 /// buckets by the entry's own year-month), so an archive whose month is
315 /// strictly before `time`'s month cannot contain any entry newer than
316 /// `time`. We cross archives newest-month-first and stop at the first whose
317 /// month is entirely at or before `time`'s.
318 pub fn since(store: &Store, time: DateTime<FixedOffset>) -> crate::Result<Vec<LogEntry>> {
319 let mut collected: Vec<LogEntry> = Vec::new();
320
321 // Active file: scan fully, no early stop (out-of-order safe).
322 let active = active_log_path(store);
323 if active.exists() {
324 reverse_collect(&active, |e| {
325 if e.timestamp > time {
326 collected.push(e);
327 }
328 false
329 })?;
330 }
331
332 // The cutoff's own (year, month): any archive strictly before it holds
333 // only older entries and is skippable. Archive months are bucketed on
334 // the UTC calendar (on-disk timestamps are offset-free and re-read as
335 // UTC; rotation buckets by the entry's UTC year-month), so the pruning
336 // calendar must be UTC too. A non-UTC `since` offset (advertised in the
337 // CLI hint, e.g. `…T00:30:00+07:00`) whose local month differs from its
338 // UTC month would otherwise prune away an archive holding entries that
339 // are strictly newer than `time` — `time.year()/.month()` read the
340 // offset-LOCAL calendar, not UTC.
341 let cutoff_utc = time.with_timezone(&Utc);
342 let cutoff_ym = (cutoff_utc.year(), cutoff_utc.month());
343
344 for archive in list_archives_desc(store)? {
345 // Archives are newest-month-first; once a month is strictly before
346 // the cutoff's month, every remaining (older) archive is too.
347 if let Some(arch_ym) = archive_year_month(&archive) {
348 if arch_ym < cutoff_ym {
349 break;
350 }
351 }
352 // Scan this archive fully — within a month, entries may still be
353 // out of order, so no within-file early stop.
354 reverse_collect(&archive, |e| {
355 if e.timestamp > time {
356 collected.push(e);
357 }
358 false
359 })?;
360 }
361
362 collected.reverse();
363 Ok(collected)
364 }
365
366 /// The timestamp of the most recent `validate` entry — the default `since`
367 /// window for working-set validation ([`crate::validate::validate_working_set`]).
368 pub fn last_validate_at(store: &Store) -> crate::Result<Option<DateTime<FixedOffset>>> {
369 let mut found: Option<DateTime<FixedOffset>> = None;
370
371 let active = active_log_path(store);
372 if active.exists() {
373 reverse_collect(&active, |e| {
374 if e.kind == LogKind::Validate {
375 found = Some(e.timestamp);
376 true
377 } else {
378 false
379 }
380 })?;
381 }
382
383 if found.is_none() {
384 for archive in list_archives_desc(store)? {
385 reverse_collect(&archive, |e| {
386 if e.kind == LogKind::Validate {
387 found = Some(e.timestamp);
388 true
389 } else {
390 false
391 }
392 })?;
393 if found.is_some() {
394 break;
395 }
396 }
397 }
398
399 Ok(found)
400 }
401
402 /// Parse a single entry header (`## [YYYY-MM-DD HH:MM] <kind> | <object>`)
403 /// into its timestamp, kind, and object. Returns `None` if the line isn't a
404 /// well-formed entry header.
405 pub fn parse_header(line: &str) -> Option<(DateTime<FixedOffset>, LogKind, Option<String>)> {
406 let line = line.trim_end_matches(['\n', '\r']);
407 let rest = line.strip_prefix("## [")?;
408 let close = rest.find(']')?;
409 let ts_str = &rest[..close];
410 let timestamp = parse_timestamp(ts_str)?;
411
412 // Everything after the closing bracket: ` <kind> | <object>` or
413 // ` <kind>`.
414 let after = rest[close + 1..].trim();
415 if after.is_empty() {
416 return None;
417 }
418
419 let (kind_str, object) = match after.split_once('|') {
420 Some((k, o)) => {
421 let obj = o.trim();
422 let obj = if obj.is_empty() {
423 None
424 } else {
425 Some(obj.to_string())
426 };
427 (k.trim(), obj)
428 }
429 None => (after, None),
430 };
431
432 if kind_str.is_empty() {
433 return None;
434 }
435
436 Some((timestamp, LogKind::parse(kind_str), object))
437 }
438}
439
440// ── Internal helpers ────────────────────────────────────────────────────────
441
442/// A bounded window of the `n` entries with the largest timestamps, fed by a
443/// **reverse (newest-physical-first) scan** and used by [`Log::tail`].
444///
445/// Why this exists: the last `n` *physical* entries are the `n` newest only
446/// when the log is in non-decreasing time order. That's the append-only
447/// contract, not a guarantee — a backdated, clock-skewed, or merge-interleaved
448/// entry violates it (and trips the `LOG_OUT_OF_ORDER` validate warning). The
449/// window decouples `tail` from that assumption: it keeps the `n` largest
450/// timestamps seen regardless of the order they arrive in, so the caller can
451/// read each file fully (no fragile within-file early stop) and still get the
452/// true top `n`.
453///
454/// Tie-break: entries sharing a timestamp at the window boundary are ordered by
455/// **physical recency** — the one appended later (encountered earlier in the
456/// reverse scan, i.e. a smaller `arrival`) wins. "Newest" means most-recently
457/// recorded.
458struct NewestWindow {
459 cap: usize,
460 /// Min-by-(timestamp, then physical-oldest) heap: the root is always the
461 /// next entry to evict once the window is full.
462 heap: std::collections::BinaryHeap<WindowItem>,
463 /// Count of entries fed in, in reverse-scan order, used as the tie-break
464 /// key (0 = newest physical).
465 next_arrival: u64,
466}
467
468impl NewestWindow {
469 fn new(cap: usize) -> Self {
470 NewestWindow {
471 cap,
472 heap: std::collections::BinaryHeap::with_capacity(cap),
473 next_arrival: 0,
474 }
475 }
476
477 /// Offer one entry from the scan. If the window isn't full it's kept; once
478 /// full, it's kept (evicting the current minimum) iff its timestamp is `>=`
479 /// the window minimum. Equal-timestamp boundary entries resolve by physical
480 /// recency (see the type doc).
481 fn consider(&mut self, entry: LogEntry) {
482 let arrival = self.next_arrival;
483 self.next_arrival += 1;
484
485 if self.heap.len() < self.cap {
486 self.heap.push(WindowItem { entry, arrival });
487 return;
488 }
489
490 // Window full. The heap root is the current minimum (oldest-by-
491 // timestamp held; on a tie, the oldest-physical).
492 let root = self.heap.peek().expect("full window has a root");
493 if entry.timestamp > root.entry.timestamp {
494 // Strictly newer than the window minimum: it belongs; evict the min.
495 self.heap.pop();
496 self.heap.push(WindowItem { entry, arrival });
497 }
498 // On `<=` we keep the window as-is. `<` is plainly too old. `==` is the
499 // tie case: the scan is newest-physical-first, so this entry is
500 // physically *older* than the held one of equal timestamp, and the
501 // tie-break keeps the physically-newer (most-recently-recorded) entry —
502 // so the incoming one is dropped.
503 }
504
505 /// Whether the window already holds its full `cap` entries.
506 fn is_full(&self) -> bool {
507 self.heap.len() >= self.cap
508 }
509
510 /// The `(year, month)` of the window's current minimum (oldest kept) entry,
511 /// or `None` when the window is empty. Used to prune older archives: an
512 /// archive month strictly before this can't beat the current cutoff.
513 fn min_year_month(&self) -> Option<(i32, u32)> {
514 self.heap
515 .peek()
516 .map(|item| (item.entry.timestamp.year(), item.entry.timestamp.month()))
517 }
518
519 /// The held entries, oldest→newest (chronological), ties broken
520 /// oldest-physical→newest-physical.
521 fn into_sorted(self) -> Vec<LogEntry> {
522 let mut items: Vec<WindowItem> = self.heap.into_vec();
523 // Ascending by timestamp; on a tie, oldest-physical (larger arrival)
524 // first so the most-recently-recorded entry sorts last.
525 items.sort_by(|a, b| {
526 a.entry
527 .timestamp
528 .cmp(&b.entry.timestamp)
529 .then(b.arrival.cmp(&a.arrival))
530 });
531 items.into_iter().map(|i| i.entry).collect()
532 }
533}
534
535/// One slot in [`NewestWindow`]'s heap. `Ord` is defined so the heap is a
536/// **min-heap on `(timestamp, physical-oldest)`**: `BinaryHeap` is a max-heap,
537/// so the root (max under this `Ord`) is the eviction candidate — the smallest
538/// timestamp, and on a tie the oldest-physical (largest `arrival`).
539struct WindowItem {
540 entry: LogEntry,
541 arrival: u64,
542}
543
544impl PartialEq for WindowItem {
545 fn eq(&self, other: &Self) -> bool {
546 self.entry.timestamp == other.entry.timestamp && self.arrival == other.arrival
547 }
548}
549impl Eq for WindowItem {}
550
551impl Ord for WindowItem {
552 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
553 // Reverse on timestamp so the *smallest* timestamp is the heap max
554 // (eviction candidate). On equal timestamps, the larger `arrival`
555 // (older physical) is the heap max so it is evicted first.
556 other
557 .entry
558 .timestamp
559 .cmp(&self.entry.timestamp)
560 .then(self.arrival.cmp(&other.arrival))
561 }
562}
563impl PartialOrd for WindowItem {
564 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
565 Some(self.cmp(other))
566 }
567}
568
569/// The active `log.md` path under the store root.
570fn active_log_path(store: &Store) -> PathBuf {
571 store.root.join("log.md")
572}
573
574/// The `log/` archive directory under the store root.
575fn archive_dir(store: &Store) -> PathBuf {
576 store.root.join("log")
577}
578
579/// The `log/<YYYY-MM>.md` archive path for a given month.
580fn archive_path(store: &Store, year: i32, month: u32) -> PathBuf {
581 archive_dir(store).join(format!("{:04}-{:02}.md", year, month))
582}
583
584/// Parse a `YYYY-MM-DD HH:MM` header timestamp, reattaching UTC. `None` on any
585/// malformed shape.
586fn parse_timestamp(s: &str) -> Option<DateTime<FixedOffset>> {
587 let naive = NaiveDateTime::parse_from_str(s.trim(), TS_FORMAT).ok()?;
588 let utc = FixedOffset::east_opt(0)?;
589 utc.from_local_datetime(&naive).single()
590}
591
592/// Split a `log.md` / archive file into its leading frontmatter+heading block
593/// (everything up to and including the line before the first `## [` header) and
594/// its parsed entries. If there are no entries, the whole content is the header
595/// block.
596fn parse_active(content: &str) -> (String, Vec<LogEntry>) {
597 match find_first_header(content) {
598 Some(idx) => {
599 let header = content[..idx].to_string();
600 let entries = parse_entries(&content[idx..]);
601 (header, entries)
602 }
603 None => (content.to_string(), Vec::new()),
604 }
605}
606
/// Byte offset of the first line that begins with `## [`, or `None` if no line
/// does. Offset `0` is returned when the text itself starts with the marker;
/// otherwise the offset points just past the newline that precedes it.
fn find_first_header(content: &str) -> Option<usize> {
    if content.starts_with("## [") {
        Some(0)
    } else {
        // Anywhere else, a line start is a marker directly after a newline.
        content.find("\n## [").map(|newline_at| newline_at + 1)
    }
}
615
616/// Parse every entry in a slice that begins at (or before, header-block
617/// included) a sequence of `## [` headers. Headers that fail to parse are
618/// skipped (their body folds into the previous valid entry's note is avoided —
619/// they simply start no new entry).
620fn parse_entries(text: &str) -> Vec<LogEntry> {
621 let mut entries: Vec<LogEntry> = Vec::new();
622 let mut cur_header: Option<(DateTime<FixedOffset>, LogKind, Option<String>)> = None;
623 let mut cur_note: Vec<&str> = Vec::new();
624
625 let flush = |entries: &mut Vec<LogEntry>,
626 header: &mut Option<(DateTime<FixedOffset>, LogKind, Option<String>)>,
627 note: &mut Vec<&str>| {
628 if let Some((timestamp, kind, object)) = header.take() {
629 let joined = note.join("\n");
630 let note_str = joined.trim_matches(['\n', '\r']).to_string();
631 entries.push(LogEntry {
632 timestamp,
633 kind,
634 object,
635 note: note_str,
636 });
637 }
638 note.clear();
639 };
640
641 for line in text.lines() {
642 if line.starts_with("## [") {
643 if let Some(parsed) = Log::parse_header(line) {
644 // Close the previous entry, start a new one.
645 flush(&mut entries, &mut cur_header, &mut cur_note);
646 cur_header = Some(parsed);
647 continue;
648 }
649 // Unparseable `## [` line: treat as body of the current entry.
650 }
651 if cur_header.is_some() {
652 cur_note.push(line);
653 }
654 }
655 flush(&mut entries, &mut cur_header, &mut cur_note);
656 entries
657}
658
/// Join a preamble and an entry body back into one file's text.
///
/// An empty `header` yields `body` unchanged. Otherwise the header is
/// newline-terminated if it was not already, and one more newline is added
/// unless the text so far already ends in a blank line — so at least one
/// blank line separates the preamble from the first entry.
fn compose_active(header: &str, body: &str) -> String {
    if header.is_empty() {
        return body.to_string();
    }

    let mut doc = String::with_capacity(header.len() + body.len() + 2);
    doc.push_str(header);
    if !doc.ends_with('\n') {
        doc.push('\n');
    }
    if !doc.ends_with("\n\n") {
        doc.push('\n');
    }
    doc.push_str(body);
    doc
}
673
674/// Append entries to a month archive, creating it with `type: log` frontmatter
675/// if absent. Atomic (temp-file rename). Entries are appended in the given
676/// order (callers pass them already chronological within the month).
677///
678/// **Idempotent re-roll.** Rotation in [`Log::append`] is two non-atomic durable
679/// writes — roll prior-month entries into the archive, *then* rewrite the active
680/// file. If the process crashes or the active rewrite errors (e.g. ENOSPC,
681/// permission) *after* the archive write commits, the prior-month entries remain
682/// in the still-untrimmed active file, and `Log::append` surfaces the error so
683/// the agent retries. The retry re-partitions the same prior-month entries and
684/// re-rolls them here — so a naive concatenate would duplicate every entry in
685/// the month archive, amplifying on each retry, with no validate check to detect
686/// or repair it (the log is primary, no-rewrite data). To make the re-roll a
687/// no-op, we skip any incoming entry already present verbatim in the archive,
688/// keyed on the full entry identity `(timestamp, kind, object, note)`.
689fn append_to_archive(path: &Path, entries: &[LogEntry]) -> crate::Result<()> {
690 if path.exists() {
691 let existing = fs::read_to_string(path)?;
692 // Identities already on disk in this archive, so an interrupted-then-
693 // retried rotation re-rolling identical entries adds nothing.
694 let (_header, existing_entries) = parse_active(&existing);
695 let present: std::collections::HashSet<EntryKey> =
696 existing_entries.iter().map(entry_key).collect();
697
698 let mut body = String::new();
699 for e in entries {
700 if present.contains(&entry_key(e)) {
701 continue;
702 }
703 body.push_str(&e.render());
704 }
705 // Nothing new to add (a fully-duplicate re-roll): leave the archive
706 // byte-for-byte untouched (append-only: don't rewrite identical data).
707 if body.is_empty() {
708 return Ok(());
709 }
710
711 let mut full = existing;
712 if !full.ends_with('\n') {
713 full.push('\n');
714 }
715 full.push_str(&body);
716 crate::fsx::write_atomic(path, full.as_bytes())?;
717 } else {
718 let mut body = String::new();
719 for e in entries {
720 body.push_str(&e.render());
721 }
722 if let Some(parent) = path.parent() {
723 fs::create_dir_all(parent)?;
724 }
725 let full = compose_active(LOG_FRONTMATTER, &body);
726 crate::fsx::write_atomic(path, full.as_bytes())?;
727 }
728 Ok(())
729}
730
731/// A hashable identity for a log entry, used to dedup an idempotent archive
732/// re-roll (see [`append_to_archive`]). Two entries are "the same" when their
733/// timestamp, kind, object, and note all match — exactly the fields that
734/// round-trip through `render`/`parse`, so a re-rolled entry compares equal to
735/// the one already archived. Owned (rather than borrowed) so keys from the
736/// existing archive and from the incoming entries share one type regardless of
737/// where they came from; the cost is paid only on the cold rotation path.
738type EntryKey = (DateTime<FixedOffset>, String, Option<String>, String);
739
740/// Derive the dedup key for `e` (see [`EntryKey`]). Keying on `kind.as_str()`
741/// (rather than `LogKind`, which is not `Hash`) is exact: `as_str`/`parse`
742/// round-trips every recognized kind and preserves any `Custom` token.
743fn entry_key(e: &LogEntry) -> EntryKey {
744 (
745 e.timestamp,
746 e.kind.as_str().to_string(),
747 e.object.clone(),
748 e.note.clone(),
749 )
750}
751
752/// Every `log/<YYYY-MM>.md` archive, sorted **newest month first**.
753fn list_archives_desc(store: &Store) -> crate::Result<Vec<PathBuf>> {
754 let dir = archive_dir(store);
755 if !dir.is_dir() {
756 return Ok(Vec::new());
757 }
758 let mut months: Vec<(String, PathBuf)> = Vec::new();
759 for entry in fs::read_dir(&dir)? {
760 let entry = entry?;
761 let path = entry.path();
762 if !path.is_file() {
763 continue;
764 }
765 let name = match path.file_name().and_then(|s| s.to_str()) {
766 Some(n) => n,
767 None => continue,
768 };
769 // Match `YYYY-MM.md`.
770 if let Some(stem) = name.strip_suffix(".md") {
771 if is_year_month(stem) {
772 months.push((stem.to_string(), path.clone()));
773 }
774 }
775 }
776 // `YYYY-MM` strings sort lexically == chronologically; reverse for newest
777 // first.
778 months.sort_by(|a, b| b.0.cmp(&a.0));
779 Ok(months.into_iter().map(|(_, p)| p).collect())
780}
781
782/// The `(year, month)` an archive file represents, parsed from its
783/// `log/<YYYY-MM>.md` name. `None` if the name isn't a well-formed month
784/// archive (in which case the caller scans it rather than risk skipping it).
785fn archive_year_month(path: &Path) -> Option<(i32, u32)> {
786 let stem = path
787 .file_name()
788 .and_then(|s| s.to_str())
789 .and_then(|n| n.strip_suffix(".md"))?;
790 if !is_year_month(stem) {
791 return None;
792 }
793 let year: i32 = stem[..4].parse().ok()?;
794 let month: u32 = stem[5..7].parse().ok()?;
795 Some((year, month))
796}
797
/// Whether `s` has the exact shape `YYYY-MM`: seven bytes — four ASCII digits,
/// a dash, two ASCII digits. Only the shape is checked; the month value is not
/// range-validated.
fn is_year_month(s: &str) -> bool {
    match s.as_bytes() {
        [y0, y1, y2, y3, b'-', m0, m1] => [y0, y1, y2, y3, m0, m1]
            .iter()
            .all(|digit| digit.is_ascii_digit()),
        _ => false,
    }
}
809
810/// Reverse-read `path` from EOF, parsing entries newest-first and feeding each
811/// to `take`. `take` returns `true` to stop early (enough collected). The file
812/// is read backward in blocks; only the tail region needed to satisfy `take`
813/// is read — the whole file is read only if `take` never returns `true`.
814fn reverse_collect<F>(path: &Path, mut take: F) -> crate::Result<()>
815where
816 F: FnMut(LogEntry) -> bool,
817{
818 let mut file = File::open(path)?;
819 let len = file.metadata()?.len();
820 if len == 0 {
821 return Ok(());
822 }
823
824 // Algorithm: grow a tail buffer leftward one block at a time, emitting
825 // entries strictly newest-first as their left boundary is confirmed, and
826 // stopping the instant `take` says enough. The whole file is read only if
827 // `take` never returns `true` (e.g. `tail(n)` with n ≥ entry count).
828 //
829 // Invariant: a `## [` line-start anywhere in the buffer is a *complete*
830 // entry — its header is the entry's first line, and its body lies to the
831 // right and is therefore already buffered (we read right-to-left). So we
832 // never split an entry across blocks.
833 //
834 // `buf` holds the file's bytes from absolute offset `start` (growing
835 // leftward toward 0) to EOF. `emitted_abs` records the absolute offsets of
836 // headers already handed to `take`, so re-deriving headers each block never
837 // double-emits.
838 let mut buf: Vec<u8> = Vec::new();
839 let mut start = len;
840 // O(1) membership: a `Vec` + `.contains()` here is O(E^2) across a large
841 // single-month file (every header re-scanned against all prior emissions).
842 let mut emitted_abs: std::collections::HashSet<u64> = std::collections::HashSet::new();
843 let mut stop = false;
844
845 while start > 0 && !stop {
846 let block = std::cmp::min(REVERSE_BLOCK as u64, start);
847 let new_start = start - block;
848 file.seek(SeekFrom::Start(new_start))?;
849 let mut chunk = vec![0u8; block as usize];
850 file.read_exact(&mut chunk)?;
851 chunk.extend_from_slice(&buf);
852 buf = chunk;
853 start = new_start;
854
855 // Find absolute offsets of every header line-start in the current
856 // buffer.
857 let headers = header_offsets(&buf, start);
858
859 // Process newest (largest offset) → oldest (smallest), emitting any
860 // header not yet emitted. Hold back only the buffer's *leftmost* header
861 // while we have not reached file start (`start > 0`): older entries may
862 // still lie to its left in unread blocks, and newest-first order
863 // requires we not emit it until we've confirmed it really is the oldest
864 // (or read enough to bound it on the left). One extra block read at
865 // most; on the next iteration its left boundary is in-buffer.
866 for i in (0..headers.len()).rev() {
867 let abs = headers[i];
868 if emitted_abs.contains(&abs) {
869 continue;
870 }
871 let is_oldest_in_buf = i == 0;
872 if is_oldest_in_buf && start > 0 {
873 continue;
874 }
875
876 let entry_text = entry_text_at(&buf, start, abs, &headers, i);
877 if let Some(entry) = parse_single_entry(&entry_text) {
878 emitted_abs.insert(abs);
879 if take(entry) {
880 stop = true;
881 break;
882 }
883 } else {
884 emitted_abs.insert(abs);
885 }
886 }
887 }
888
889 // Reached file start (or stopped). If we stopped, done. If we reached
890 // start, emit any held-back oldest header(s) now (start == 0 means the
891 // buffer's first header is genuinely the oldest).
892 if !stop && start == 0 {
893 let headers = header_offsets(&buf, start);
894 for i in (0..headers.len()).rev() {
895 let abs = headers[i];
896 if emitted_abs.contains(&abs) {
897 continue;
898 }
899 let entry_text = entry_text_at(&buf, start, abs, &headers, i);
900 if let Some(entry) = parse_single_entry(&entry_text) {
901 emitted_abs.insert(abs);
902 if take(entry) {
903 break;
904 }
905 } else {
906 emitted_abs.insert(abs);
907 }
908 }
909 }
910
911 Ok(())
912}
913
914/// Absolute byte offsets of every **valid** entry-header line-start (`## […]`)
915/// in `buf`, where `buf` begins at absolute offset `base`.
916///
917/// Only a `## [` line that [`Log::parse_header`] accepts is an entry boundary,
918/// mirroring the forward parser ([`parse_entries`]), which folds an unparseable
919/// `## [` line into the preceding entry's note rather than starting a new entry.
920/// Without this validity check the reverse reader would split a real entry's
921/// multi-line note at a continuation line beginning at column 0 with `## [`
922/// (a shape the SPEC permits — notes are "one or more lines" with no
923/// restriction), truncating the note and dropping the carved pseudo-entry, so
924/// `tail`/`since`/`last_validate_at` would return a note diverging from the
925/// intact on-disk bytes.
926fn header_offsets(buf: &[u8], base: u64) -> Vec<u64> {
927 const PAT: &[u8] = b"## [";
928 let mut out = Vec::new();
929 let n = buf.len();
930 let mut i = 0;
931 while i + PAT.len() <= n {
932 if &buf[i..i + PAT.len()] == PAT {
933 let at_line_start = i == 0 || buf[i - 1] == b'\n';
934 if at_line_start && is_valid_header_line(buf, i) {
935 out.push(base + i as u64);
936 // skip ahead past this marker
937 i += PAT.len();
938 continue;
939 }
940 }
941 i += 1;
942 }
943 out
944}
945
946/// Whether the `## [` line starting at byte `i` in `buf` parses as a valid
947/// entry header. Reads the line up to (but not including) the next `\n` (or
948/// buffer end) and defers to [`Log::parse_header`] — the same validity gate the
949/// forward parser applies, keeping the reverse reader's boundary set identical
950/// to the forward one.
951fn is_valid_header_line(buf: &[u8], i: usize) -> bool {
952 let line_end = buf[i..]
953 .iter()
954 .position(|&b| b == b'\n')
955 .map(|p| i + p)
956 .unwrap_or(buf.len());
957 let line = String::from_utf8_lossy(&buf[i..line_end]);
958 Log::parse_header(&line).is_some()
959}
960
/// Return the text of the entry whose header lives at absolute offset
/// `header_abs` (that is, `headers[idx]`). The entry runs up to the following
/// header in `headers`, or to the end of `buf` when it is the last one. `buf`'s
/// first byte sits at absolute offset `base`; invalid UTF-8 is replaced
/// lossily.
fn entry_text_at(buf: &[u8], base: u64, header_abs: u64, headers: &[u64], idx: usize) -> String {
    let to_rel = |abs: u64| (abs - base) as usize;
    let from = to_rel(header_abs);
    let until = headers
        .get(idx + 1)
        .map_or(buf.len(), |&next_header| to_rel(next_header));
    String::from_utf8_lossy(&buf[from..until]).into_owned()
}
973
974/// Parse a single entry from a text block that begins at its header line.
975fn parse_single_entry(text: &str) -> Option<LogEntry> {
976 parse_entries(text).into_iter().next()
977}
978
979#[cfg(test)]
980mod tests {
981 use super::*;
982 use crate::parser::Config;
983 use std::fs;
984 use tempfile::TempDir;
985
986 /// Build a `Store` rooted at a fresh temp dir with a minimal `DB.md`.
987 /// Construct the `Store` struct directly so the test stays narrow and never
988 /// exercises the `Store::open` parser path.
989 fn temp_store() -> (TempDir, Store) {
990 let dir = tempfile::tempdir().expect("tempdir");
991 fs::write(dir.path().join("DB.md"), "---\ntype: db-md\n---\n").expect("write DB.md");
992 let store = Store {
993 root: dir.path().to_path_buf(),
994 config: Config::default(),
995 };
996 (dir, store)
997 }
998
999 /// A timestamp at UTC from `YYYY-MM-DD HH:MM` components.
1000 fn ts(y: i32, mo: u32, d: u32, h: u32, mi: u32) -> DateTime<FixedOffset> {
1001 let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
1002 .unwrap()
1003 .and_hms_opt(h, mi, 0)
1004 .unwrap();
1005 FixedOffset::east_opt(0)
1006 .unwrap()
1007 .from_local_datetime(&naive)
1008 .single()
1009 .unwrap()
1010 }
1011
1012 #[allow(clippy::too_many_arguments)] // test fixture builder; struct-ifying churns every call site
1013 fn entry(
1014 y: i32,
1015 mo: u32,
1016 d: u32,
1017 h: u32,
1018 mi: u32,
1019 kind: LogKind,
1020 object: Option<&str>,
1021 note: &str,
1022 ) -> LogEntry {
1023 LogEntry {
1024 timestamp: ts(y, mo, d, h, mi),
1025 kind,
1026 object: object.map(|s| s.to_string()),
1027 note: note.to_string(),
1028 }
1029 }
1030
1031 // ── parse_header ────────────────────────────────────────────────────────
1032
1033 #[test]
1034 fn parse_header_with_object() {
1035 let (t, k, o) =
1036 Log::parse_header("## [2026-05-27 10:00] ingest | sources/emails/x.eml").unwrap();
1037 assert_eq!(t, ts(2026, 5, 27, 10, 0));
1038 assert_eq!(k, LogKind::Ingest);
1039 assert_eq!(o.as_deref(), Some("sources/emails/x.eml"));
1040 }
1041
1042 #[test]
1043 fn parse_header_without_object_is_none_object() {
1044 let (t, k, o) = Log::parse_header("## [2026-05-27 10:20] validate").unwrap();
1045 assert_eq!(t, ts(2026, 5, 27, 10, 20));
1046 assert_eq!(k, LogKind::Validate);
1047 assert_eq!(o, None);
1048 }
1049
1050 #[test]
1051 fn parse_header_custom_kind_roundtrips_token() {
1052 let (_, k, o) = Log::parse_header("## [2026-05-27 10:00] proposal | records/x").unwrap();
1053 assert_eq!(k, LogKind::Custom("proposal".to_string()));
1054 assert!(!k.is_recognized());
1055 assert_eq!(o.as_deref(), Some("records/x"));
1056 }
1057
1058 #[test]
1059 fn parse_header_index_rebuild_hyphenated_kind() {
1060 let (_, k, _) = Log::parse_header("## [2026-05-27 10:00] index-rebuild").unwrap();
1061 assert_eq!(k, LogKind::IndexRebuild);
1062 assert_eq!(k.as_str(), "index-rebuild");
1063 }
1064
1065 #[test]
1066 fn parse_header_rejects_non_headers() {
1067 assert!(Log::parse_header("Not a header").is_none());
1068 assert!(Log::parse_header("# Curator log").is_none());
1069 assert!(Log::parse_header("## [garbage] ingest | x").is_none());
1070 assert!(Log::parse_header("## [2026-05-27 10:00]").is_none()); // no kind
1071 // A bracketed but non-timestamp date must be rejected (LOG_BAD_TIMESTAMP territory).
1072 assert!(Log::parse_header("## [2026-13-40 99:99] ingest | x").is_none());
1073 }
1074
1075 // ── kind round-trip ───────────────────────────────────────────────────────
1076
1077 #[test]
1078 fn kind_as_str_parse_roundtrip_for_all_recognized() {
1079 for k in [
1080 LogKind::Ingest,
1081 LogKind::Create,
1082 LogKind::Update,
1083 LogKind::Delete,
1084 LogKind::Rename,
1085 LogKind::Link,
1086 LogKind::Validate,
1087 LogKind::IndexRebuild,
1088 LogKind::Contradiction,
1089 ] {
1090 assert_eq!(LogKind::parse(k.as_str()), k);
1091 assert!(k.is_recognized());
1092 }
1093 }
1094
1095 // ── append: creation + frontmatter ───────────────────────────────────────
1096
1097 #[test]
1098 fn append_creates_log_with_frontmatter_and_entry() {
1099 let (_d, store) = temp_store();
1100 let e = entry(
1101 2026,
1102 5,
1103 27,
1104 10,
1105 0,
1106 LogKind::Ingest,
1107 Some("sources/emails/x.eml"),
1108 "Email received.",
1109 );
1110 Log::append(&store, &e).unwrap();
1111
1112 let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1113 // type: log frontmatter present.
1114 assert!(
1115 content.starts_with("---\ntype: log\n---\n"),
1116 "missing log frontmatter; got:\n{content}"
1117 );
1118 // The entry header is rendered verbatim.
1119 assert!(content.contains("## [2026-05-27 10:00] ingest | sources/emails/x.eml"));
1120 assert!(content.contains("Email received."));
1121 // No archive dir created when nothing rotates.
1122 assert!(!store.root.join("log").exists());
1123 }
1124
1125 // ── append → tail → since round-trip ─────────────────────────────────────
1126
1127 #[test]
1128 fn append_tail_since_roundtrip() {
1129 let (_d, store) = temp_store();
1130 let e1 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "first");
1131 let e2 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "second");
1132 let e3 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "third");
1133 Log::append(&store, &e1).unwrap();
1134 Log::append(&store, &e2).unwrap();
1135 Log::append(&store, &e3).unwrap();
1136
1137 // tail(2) returns the two newest, in chronological order.
1138 let tail = Log::tail(&store, 2).unwrap();
1139 assert_eq!(tail.len(), 2);
1140 assert_eq!(tail[0], e2);
1141 assert_eq!(tail[1], e3);
1142
1143 // tail(n) larger than the log returns everything, chronologically.
1144 let all = Log::tail(&store, 99).unwrap();
1145 assert_eq!(all, vec![e1.clone(), e2.clone(), e3.clone()]);
1146
1147 // since(10:05) returns strictly-newer entries (excludes the 10:05 one).
1148 let since = Log::since(&store, ts(2026, 5, 27, 10, 5)).unwrap();
1149 assert_eq!(since, vec![e3.clone()]);
1150
1151 // since before everything returns all.
1152 let since_all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1153 assert_eq!(since_all, vec![e1, e2, e3]);
1154 }
1155
1156 #[test]
1157 fn tail_zero_is_empty() {
1158 let (_d, store) = temp_store();
1159 Log::append(
1160 &store,
1161 &entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "x"),
1162 )
1163 .unwrap();
1164 assert!(Log::tail(&store, 0).unwrap().is_empty());
1165 }
1166
1167 #[test]
1168 fn tail_and_since_on_missing_log_are_empty() {
1169 let (_d, store) = temp_store();
1170 assert!(Log::tail(&store, 5).unwrap().is_empty());
1171 assert!(Log::since(&store, ts(2000, 1, 1, 0, 0)).unwrap().is_empty());
1172 assert!(Log::last_validate_at(&store).unwrap().is_none());
1173 }
1174
1175 #[test]
1176 fn since_exact_timestamp_is_exclusive() {
1177 let (_d, store) = temp_store();
1178 let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "PASS");
1179 Log::append(&store, &e).unwrap();
1180 // Equal timestamp must NOT be included (strictly newer).
1181 assert!(Log::since(&store, ts(2026, 5, 27, 10, 0))
1182 .unwrap()
1183 .is_empty());
1184 }
1185
1186 // ── since: out-of-order on disk (append-only correction / merge=union) ────
1187
1188 /// Write a `log.md` at the store root from `entries` in the EXACT given
1189 /// physical order, with the standard `type: log` frontmatter. Unlike
1190 /// [`Log::append`] (which always lands the newest entry at EOF), this lets a
1191 /// test author the non-monotonic on-disk shape the SPEC permits — a
1192 /// backdated corrective entry below the entry it corrects, or a
1193 /// `merge=union` interleave.
1194 fn write_raw_log(store: &Store, entries: &[LogEntry]) {
1195 let mut content = String::from(LOG_FRONTMATTER);
1196 content.push('\n');
1197 for e in entries {
1198 content.push_str(&e.render());
1199 }
1200 fs::write(store.root.join("log.md"), content).expect("write raw log.md");
1201 }
1202
1203 #[test]
1204 fn since_returns_newer_entries_even_when_disk_order_is_non_monotonic() {
1205 // The demonstrated regression: a curator appended a backdated CORRECTIVE
1206 // entry (10:00) below newer entries (10:10, 10:05), so the physical
1207 // on-disk order is 10:10, 10:05, 10:00 — newest-first, not chronological.
1208 // The append-only SPEC explicitly permits this ("append a corrective
1209 // entry below it"; out-of-order is only LOG_OUT_OF_ORDER, a warning).
1210 let (_d, store) = temp_store();
1211 let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "newest");
1212 let e_1005 = entry(2026, 5, 27, 10, 5, LogKind::Create, Some("b"), "middle");
1213 let e_1000 = entry(
1214 2026,
1215 5,
1216 27,
1217 10,
1218 0,
1219 LogKind::Update,
1220 Some("a"),
1221 "backdated fix",
1222 );
1223 // Physical order on disk: 10:10, 10:05, then the backdated 10:00 LAST.
1224 write_raw_log(&store, &[e_1010, e_1005, e_1000]);
1225
1226 // since 10:02 must return BOTH entries strictly newer than 10:02
1227 // (10:05 and 10:10). The old early-stop hit the physically-last 10:00
1228 // entry (<= 10:02), stopped, and returned EMPTY — silently dropping the
1229 // two newer entries that sit earlier in the file.
1230 let got = Log::since(&store, ts(2026, 5, 27, 10, 2)).unwrap();
1231 let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1232 assert_eq!(
1233 stamps,
1234 [ts(2026, 5, 27, 10, 5), ts(2026, 5, 27, 10, 10)]
1235 .into_iter()
1236 .collect(),
1237 "since(10:02) must include both 10:05 and 10:10 despite the backdated \
1238 10:00 entry sitting physically last, and exclude 10:00; got {got:?}"
1239 );
1240
1241 // A cutoff before everything still returns all three, regardless of the
1242 // scrambled disk order.
1243 let all = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
1244 let all_stamps: std::collections::BTreeSet<_> = all.iter().map(|e| e.timestamp).collect();
1245 assert_eq!(
1246 all_stamps,
1247 [
1248 ts(2026, 5, 27, 10, 0),
1249 ts(2026, 5, 27, 10, 5),
1250 ts(2026, 5, 27, 10, 10),
1251 ]
1252 .into_iter()
1253 .collect()
1254 );
1255 }
1256
1257 #[test]
1258 fn since_crosses_archive_when_newer_entry_is_out_of_order_inside_it() {
1259 // Out-of-order INSIDE an archive month, with the cutoff landing in that
1260 // month. The April archive is authored newest-physical-first (04-20,
1261 // then a backdated 04-05 last); a naive early-stop on the first
1262 // older-than-cutoff entry would miss the later April entry. The active
1263 // file holds a clean May entry. Cutoff = mid-April.
1264 let (_d, store) = temp_store();
1265
1266 // Active file: one current-month (May) entry.
1267 let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1268 write_raw_log(&store, &[may]);
1269
1270 // April archive authored out of order: 04-20 first, backdated 04-05 last.
1271 let apr_late = entry(
1272 2026,
1273 4,
1274 20,
1275 9,
1276 0,
1277 LogKind::Create,
1278 Some("apr-b"),
1279 "apr-late",
1280 );
1281 let apr_early = entry(
1282 2026,
1283 4,
1284 5,
1285 9,
1286 0,
1287 LogKind::Ingest,
1288 Some("apr-a"),
1289 "apr-early",
1290 );
1291 let dir = store.root.join("log");
1292 fs::create_dir_all(&dir).unwrap();
1293 let mut arch = String::from(LOG_FRONTMATTER);
1294 arch.push('\n');
1295 arch.push_str(&apr_late.render());
1296 arch.push_str(&apr_early.render());
1297 fs::write(dir.join("2026-04.md"), arch).unwrap();
1298
1299 // since mid-April: the later April entry (04-20) AND the May entry must
1300 // come back; the early April entry (04-05) must not.
1301 let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1302 let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
1303 assert_eq!(
1304 stamps,
1305 [ts(2026, 4, 20, 9, 0), ts(2026, 5, 2, 8, 0)]
1306 .into_iter()
1307 .collect(),
1308 "since(mid-April) must include the out-of-order later April entry \
1309 and the May entry, and exclude the earlier April entry; got {got:?}"
1310 );
1311 }
1312
1313 // ── multi-line notes ──────────────────────────────────────────────────────
1314
1315 #[test]
1316 fn multiline_note_is_preserved() {
1317 let (_d, store) = temp_store();
1318 let e = entry(
1319 2026,
1320 5,
1321 27,
1322 10,
1323 0,
1324 LogKind::Create,
1325 Some("records/x"),
1326 "Line one.\nLine two.\nLine three.",
1327 );
1328 Log::append(&store, &e).unwrap();
1329 let got = Log::tail(&store, 1).unwrap();
1330 assert_eq!(got[0].note, "Line one.\nLine two.\nLine three.");
1331 }
1332
1333 #[test]
1334 fn empty_note_roundtrips_as_empty() {
1335 let (_d, store) = temp_store();
1336 let e = entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "");
1337 Log::append(&store, &e).unwrap();
1338 let got = Log::tail(&store, 1).unwrap();
1339 assert_eq!(got[0], e);
1340 assert_eq!(got[0].note, "");
1341 }
1342
1343 // ── last_validate_at ─────────────────────────────────────────────────────
1344
1345 #[test]
1346 fn last_validate_at_finds_most_recent_validate() {
1347 let (_d, store) = temp_store();
1348 Log::append(
1349 &store,
1350 &entry(2026, 5, 27, 10, 0, LogKind::Validate, None, "first pass"),
1351 )
1352 .unwrap();
1353 Log::append(
1354 &store,
1355 &entry(2026, 5, 27, 10, 5, LogKind::Create, Some("a"), "made a"),
1356 )
1357 .unwrap();
1358 Log::append(
1359 &store,
1360 &entry(2026, 5, 27, 10, 10, LogKind::Validate, None, "second pass"),
1361 )
1362 .unwrap();
1363 Log::append(
1364 &store,
1365 &entry(2026, 5, 27, 10, 15, LogKind::Update, Some("a"), "edit a"),
1366 )
1367 .unwrap();
1368
1369 let last = Log::last_validate_at(&store).unwrap();
1370 assert_eq!(last, Some(ts(2026, 5, 27, 10, 10)));
1371 }
1372
1373 #[test]
1374 fn last_validate_at_none_when_no_validate() {
1375 let (_d, store) = temp_store();
1376 Log::append(
1377 &store,
1378 &entry(2026, 5, 27, 10, 0, LogKind::Create, Some("a"), "x"),
1379 )
1380 .unwrap();
1381 assert_eq!(Log::last_validate_at(&store).unwrap(), None);
1382 }
1383
1384 // ── month-boundary rotation ──────────────────────────────────────────────
1385
1386 #[test]
1387 fn rotation_rolls_prior_months_into_archives() {
1388 let (_d, store) = temp_store();
1389 // Two April entries and one May entry, all written while "current" was
1390 // their own month (append-only chronological order).
1391 let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1392 let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1393 Log::append(&store, &a1).unwrap();
1394 Log::append(&store, &a2).unwrap();
1395
1396 // Before rotation: no archive dir, both April entries in active.
1397 assert!(!store.root.join("log").exists());
1398
1399 // Appending a May entry must roll April into log/2026-04.md.
1400 let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
1401 Log::append(&store, &m1).unwrap();
1402
1403 // Archive exists and holds both April entries with frontmatter.
1404 let arch_path = store.root.join("log").join("2026-04.md");
1405 assert!(arch_path.exists(), "expected April archive to be created");
1406 let arch = fs::read_to_string(&arch_path).unwrap();
1407 assert!(arch.starts_with("---\ntype: log\n---\n"));
1408 assert!(arch.contains("## [2026-04-10 09:00] ingest | apr-a"));
1409 assert!(arch.contains("## [2026-04-20 09:00] create | apr-b"));
1410 assert!(arch.contains("apr one"));
1411 assert!(arch.contains("apr two"));
1412
1413 // Active file now holds ONLY the May entry (no April entries).
1414 let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1415 assert!(active.contains("## [2026-05-02 08:00] update | may-a"));
1416 assert!(
1417 !active.contains("apr-a") && !active.contains("apr-b"),
1418 "April entries must be gone from the active file; got:\n{active}"
1419 );
1420
1421 // The full timeline (archives ++ active) is intact and chronological.
1422 let all = Log::tail(&store, 99).unwrap();
1423 assert_eq!(all, vec![a1, a2, m1]);
1424 }
1425
1426 #[test]
1427 fn rotation_groups_distinct_prior_months_into_separate_archives() {
1428 let (_d, store) = temp_store();
1429 // March + April entries accumulate, then a May append rolls BOTH prior
1430 // months into their own archive files.
1431 let mar = entry(2026, 3, 5, 9, 0, LogKind::Ingest, Some("mar"), "march");
1432 let apr = entry(2026, 4, 5, 9, 0, LogKind::Create, Some("apr"), "april");
1433 Log::append(&store, &mar).unwrap();
1434 Log::append(&store, &apr).unwrap();
1435 // At this point April is current, March already rolled into its archive.
1436 assert!(store.root.join("log").join("2026-03.md").exists());
1437
1438 let may = entry(2026, 5, 5, 9, 0, LogKind::Update, Some("may"), "may");
1439 Log::append(&store, &may).unwrap();
1440
1441 assert!(store.root.join("log").join("2026-03.md").exists());
1442 assert!(store.root.join("log").join("2026-04.md").exists());
1443
1444 // Each archive holds only its own month.
1445 let mar_arch = fs::read_to_string(store.root.join("log").join("2026-03.md")).unwrap();
1446 let apr_arch = fs::read_to_string(store.root.join("log").join("2026-04.md")).unwrap();
1447 assert!(mar_arch.contains("mar") && !mar_arch.contains("apr"));
1448 assert!(apr_arch.contains("apr") && !apr_arch.contains("mar"));
1449
1450 // Active holds only May.
1451 let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1452 assert!(active.contains("may") && !active.contains("mar") && !active.contains("apr"));
1453
1454 // Timeline intact and ordered across both archives + active.
1455 let all = Log::tail(&store, 99).unwrap();
1456 assert_eq!(all, vec![mar, apr, may]);
1457 }
1458
1459 #[test]
1460 fn tail_crosses_into_archive_when_n_spans_month_boundary() {
1461 let (_d, store) = temp_store();
1462 let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1463 let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1464 let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1465 let m2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may2");
1466 for e in [&a1, &a2, &m1, &m2] {
1467 Log::append(&store, e).unwrap();
1468 }
1469 // April is now archived; active holds only May. tail(3) must reach back
1470 // into the archive for the third-newest entry.
1471 let tail3 = Log::tail(&store, 3).unwrap();
1472 assert_eq!(tail3, vec![a2.clone(), m1.clone(), m2.clone()]);
1473
1474 // tail within the active month does NOT need the archive but is still
1475 // correct.
1476 let tail2 = Log::tail(&store, 2).unwrap();
1477 assert_eq!(tail2, vec![m1, m2]);
1478 }
1479
1480 #[test]
1481 fn since_crosses_into_archive_and_early_stops() {
1482 let (_d, store) = temp_store();
1483 let a1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr1");
1484 let a2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr2");
1485 let m1 = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may1");
1486 for e in [&a1, &a2, &m1] {
1487 Log::append(&store, e).unwrap();
1488 }
1489 // since a mid-April time: must include the later April entry (from the
1490 // archive) and the May entry, but not the earlier April one.
1491 let got = Log::since(&store, ts(2026, 4, 15, 0, 0)).unwrap();
1492 assert_eq!(got, vec![a2, m1]);
1493 }
1494
1495 #[test]
1496 fn last_validate_at_crosses_into_archive() {
1497 let (_d, store) = temp_store();
1498 // A validate in April, then non-validate work that rolls April away.
1499 Log::append(
1500 &store,
1501 &entry(2026, 4, 10, 9, 0, LogKind::Validate, None, "apr validate"),
1502 )
1503 .unwrap();
1504 Log::append(
1505 &store,
1506 &entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may work"),
1507 )
1508 .unwrap();
1509 // Active has only the May update; the most-recent validate lives in the
1510 // April archive and must still be found.
1511 let last = Log::last_validate_at(&store).unwrap();
1512 assert_eq!(last, Some(ts(2026, 4, 10, 9, 0)));
1513 }
1514
1515 // ── reverse-read correctness on a large (multi-block) log ────────────────
1516
1517 #[test]
1518 fn reverse_read_correct_on_large_single_month_log() {
1519 let (_d, store) = temp_store();
1520 // Append many same-month entries with chunky multi-line notes so the
1521 // file spans well past one REVERSE_BLOCK (8 KiB). Timestamps are
1522 // strictly increasing (a real append-only log is monotonic): each entry
1523 // is 3 minutes after the previous, all within June, so physical order
1524 // equals chronological order and the last-k-physical ARE the k-newest.
1525 let n = 400usize;
1526 let mut expected: Vec<LogEntry> = Vec::new();
1527 for i in 0..n {
1528 let total_min = (i as u32) * 3;
1529 let day = 1 + total_min / (24 * 60);
1530 let hour = (total_min / 60) % 24;
1531 let min = total_min % 60;
1532 // Unique, multi-line note to bulk up the file and detect mis-parses.
1533 let note = format!(
1534 "entry number {i}\nbody line A for {i}\nbody line B for {i} with padding {}",
1535 "x".repeat(40)
1536 );
1537 let e = entry(
1538 2026,
1539 6,
1540 day,
1541 hour,
1542 min,
1543 LogKind::Update,
1544 Some(&format!("records/item-{i:04}")),
1545 ¬e,
1546 );
1547 Log::append(&store, &e).unwrap();
1548 expected.push(e);
1549 }
1550
1551 // File must actually be multi-block to exercise the backward reader.
1552 let size = fs::metadata(store.root.join("log.md")).unwrap().len();
1553 assert!(
1554 size > (REVERSE_BLOCK as u64) * 2,
1555 "test log not large enough ({size} bytes) to exercise multi-block reverse-read"
1556 );
1557
1558 // tail(5) must equal the 5 newest, exactly.
1559 let tail5 = Log::tail(&store, 5).unwrap();
1560 assert_eq!(tail5, expected[n - 5..].to_vec());
1561
1562 // tail(50) must equal the 50 newest.
1563 let tail50 = Log::tail(&store, 50).unwrap();
1564 assert_eq!(tail50, expected[n - 50..].to_vec());
1565
1566 // tail(all) must reconstruct the whole timeline in order.
1567 let all = Log::tail(&store, n + 10).unwrap();
1568 assert_eq!(all.len(), n);
1569 assert_eq!(all, expected);
1570 }
1571
1572 // ── tail on OUT-OF-ORDER logs (newest-by-timestamp, not last-physical) ────
1573 //
1574 // The append-only contract is non-decreasing time order, but it's only a
1575 // `LOG_OUT_OF_ORDER` warning when violated (corrective entries land below
1576 // the entry they correct; backdated / clock-skewed writes; `merge=union`
1577 // clone merges). `tail N` must return the N newest *by timestamp*, never the
1578 // last N *physical* entries.
1579
1580 /// Write `log.md` verbatim from rendered entries in the given **physical
1581 /// (file) order**, bypassing `Log::append` so the test controls on-disk
1582 /// order exactly (append never reorders within a month, but this is the
1583 /// clearest way to pin a specific physical layout).
1584 fn write_log_physical(store: &Store, entries: &[LogEntry]) {
1585 let mut body = String::new();
1586 for e in entries {
1587 body.push_str(&e.render());
1588 }
1589 let full = compose_active(LOG_FRONTMATTER, &body);
1590 fs::write(store.root.join("log.md"), full).expect("write log.md");
1591 }
1592
1593 #[test]
1594 fn tail_returns_newest_by_timestamp_on_demonstrated_out_of_order_log() {
1595 // The exact case from the review finding: physical order 10:10, 10:05,
1596 // 10:00 (a backdated entry tail). The OLD code returned the last two
1597 // physical entries {10:05, 10:00}; the correct answer is the two newest
1598 // by time {10:05, 10:10}.
1599 let (_d, store) = temp_store();
1600 let e_1010 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("c"), "ten-ten");
1601 let e_1005 = entry(
1602 2026,
1603 5,
1604 27,
1605 10,
1606 5,
1607 LogKind::Create,
1608 Some("b"),
1609 "ten-oh-five",
1610 );
1611 let e_1000 = entry(2026, 5, 27, 10, 0, LogKind::Ingest, Some("a"), "ten-oh-oh");
1612 // Physical order: newest first, then the two older ones — out of order.
1613 write_log_physical(&store, &[e_1010.clone(), e_1005.clone(), e_1000.clone()]);
1614
1615 let tail2 = Log::tail(&store, 2).unwrap();
1616 assert_eq!(
1617 tail2,
1618 vec![e_1005.clone(), e_1010.clone()],
1619 "tail(2) must be the two NEWEST by timestamp (chronological), \
1620 not the last two physical entries"
1621 );
1622 // The newest entry must be present and the oldest absent.
1623 assert!(tail2.contains(&e_1010), "newest (10:10) must be included");
1624 assert!(!tail2.contains(&e_1000), "oldest (10:00) must be excluded");
1625
1626 // tail(1) is just the single newest.
1627 assert_eq!(Log::tail(&store, 1).unwrap(), vec![e_1010.clone()]);
1628 // tail(all) is the full set in chronological order.
1629 assert_eq!(Log::tail(&store, 99).unwrap(), vec![e_1000, e_1005, e_1010]);
1630 }
1631
1632 #[test]
1633 fn tail_no_early_stop_when_newer_entry_sits_before_an_older_one() {
1634 // Guards the unsound within-file early stop: a newer entry (10:50) sits
1635 // PHYSICALLY BEFORE a much older one (10:00). Reading newest-physical-
1636 // first, the scan meets 10:00 before 10:50; any "stop at the first entry
1637 // below the window minimum" rule would bail and drop 10:50.
1638 //
1639 // Physical (top→bottom): 10:55, 10:10, 10:50, 10:00.
1640 // Reverse-scan order: 10:00, 10:50, 10:10, 10:55.
1641 let (_d, store) = temp_store();
1642 let e55 = entry(2026, 5, 27, 10, 55, LogKind::Update, Some("x55"), "55");
1643 let e10 = entry(2026, 5, 27, 10, 10, LogKind::Update, Some("x10"), "10");
1644 let e50 = entry(2026, 5, 27, 10, 50, LogKind::Update, Some("x50"), "50");
1645 let e00 = entry(2026, 5, 27, 10, 0, LogKind::Update, Some("x00"), "00");
1646 write_log_physical(
1647 &store,
1648 &[e55.clone(), e10.clone(), e50.clone(), e00.clone()],
1649 );
1650
1651 // The two newest by timestamp are 10:55 and 10:50 — NOT the early-stop
1652 // victim 10:10, and NOT the last-physical 10:00.
1653 let tail2 = Log::tail(&store, 2).unwrap();
1654 assert_eq!(tail2, vec![e50.clone(), e55.clone()]);
1655
1656 let tail3 = Log::tail(&store, 3).unwrap();
1657 assert_eq!(tail3, vec![e10.clone(), e50.clone(), e55.clone()]);
1658 }
1659
1660 #[test]
1661 fn tail_orders_equal_timestamps_by_physical_recency() {
1662 // Three entries share 10:00; one is at 09:59. tail(2) must keep both
1663 // 10:00 entries, and among the equal pair the one appended LATER
1664 // (physically last) sorts last ("newest" = most-recently recorded).
1665 let (_d, store) = temp_store();
1666 let early = entry(2026, 5, 27, 9, 59, LogKind::Create, Some("early"), "before");
1667 let tie_a = entry(
1668 2026,
1669 5,
1670 27,
1671 10,
1672 0,
1673 LogKind::Update,
1674 Some("tie-a"),
1675 "first 10:00",
1676 );
1677 let tie_b = entry(
1678 2026,
1679 5,
1680 27,
1681 10,
1682 0,
1683 LogKind::Update,
1684 Some("tie-b"),
1685 "second 10:00",
1686 );
1687 // Physical append order: early, tie_a, tie_b.
1688 write_log_physical(&store, &[early.clone(), tie_a.clone(), tie_b.clone()]);
1689
1690 let tail2 = Log::tail(&store, 2).unwrap();
1691 assert_eq!(
1692 tail2,
1693 vec![tie_a.clone(), tie_b.clone()],
1694 "both 10:00 entries kept, physically-later one (tie_b) last; 09:59 dropped"
1695 );
1696 // tail(1) keeps only the most-recently-recorded of the equal pair.
1697 assert_eq!(Log::tail(&store, 1).unwrap(), vec![tie_b]);
1698 }
1699
1700 #[test]
1701 fn tail_finds_newest_across_a_backdated_entry_spanning_the_month_boundary() {
1702 // A backdated entry can land physically after newer entries even across
1703 // a rotation: append May entries, then a June entry (rolls May to its
1704 // archive), then append a May-dated correction — it goes into the ACTIVE
1705 // file, physically after June. tail must still rank by timestamp, so the
1706 // June entry stays newest and the backdated May entry is not mistaken
1707 // for the tail.
1708 let (_d, store) = temp_store();
1709 let may1 = entry(2026, 5, 10, 9, 0, LogKind::Ingest, Some("may-1"), "may one");
1710 let may2 = entry(2026, 5, 20, 9, 0, LogKind::Create, Some("may-2"), "may two");
1711 let jun1 = entry(2026, 6, 2, 8, 0, LogKind::Update, Some("jun-1"), "jun one");
1712 Log::append(&store, &may1).unwrap();
1713 Log::append(&store, &may2).unwrap();
1714 Log::append(&store, &jun1).unwrap(); // rotates May -> log/2026-05.md
1715 assert!(store.root.join("log").join("2026-05.md").exists());
1716
1717 // A backdated May correction, appended now: it lands in the active file
1718 // (its month May is not strictly before the active month June), so the
1719 // active file is physically [jun1, may_corr] — out of order.
1720 let may_corr = entry(
1721 2026,
1722 5,
1723 25,
1724 9,
1725 0,
1726 LogKind::Update,
1727 Some("may-2"),
1728 "may correction",
1729 );
1730 Log::append(&store, &may_corr).unwrap();
1731 let active = fs::read_to_string(store.root.join("log.md")).unwrap();
1732 assert!(
1733 active.contains("jun-1") && active.contains("may correction"),
1734 "backdated May entry should be in the active file alongside June; got:\n{active}"
1735 );
1736
1737 // The single newest by timestamp is the June entry, even though the
1738 // backdated May entry is physically last.
1739 assert_eq!(Log::tail(&store, 1).unwrap(), vec![jun1.clone()]);
1740
1741 // tail(2): the two newest by time are may_corr (05-25) and jun1 (06-02).
1742 let tail2 = Log::tail(&store, 2).unwrap();
1743 assert_eq!(tail2, vec![may_corr.clone(), jun1.clone()]);
1744
1745 // tail(3) must reach into the May archive for the third-newest (may2,
1746 // 05-20), proving archive crossing still works on an out-of-order store.
1747 let tail3 = Log::tail(&store, 3).unwrap();
1748 assert_eq!(tail3, vec![may2.clone(), may_corr.clone(), jun1.clone()]);
1749
1750 // tail(all) reconstructs the whole timeline in chronological order.
1751 let all = Log::tail(&store, 99).unwrap();
1752 assert_eq!(all, vec![may1, may2, may_corr, jun1]);
1753 }
1754
1755 #[test]
1756 fn parse_entries_skips_unparseable_header_folding_into_body() {
1757 // A `## [` line that is NOT a valid header should not start a new entry;
1758 // it folds into the preceding entry's note. This guards the
1759 // parse_entries header-validation branch.
1760 let text = "\
1761## [2026-05-27 10:00] create | records/x
1762Body mentions a literal: ## [not a real header here]
1763More body.
1764
1765## [2026-05-27 10:05] update | records/y
1766Second.
1767";
1768 let entries = parse_entries(text);
1769 assert_eq!(entries.len(), 2);
1770 assert_eq!(entries[0].kind, LogKind::Create);
1771 assert!(entries[0].note.contains("## [not a real header here]"));
1772 assert!(entries[0].note.contains("More body."));
1773 assert_eq!(entries[1].kind, LogKind::Update);
1774 assert_eq!(entries[1].note, "Second.");
1775 }
1776
1777 // ── append-only: corrective entries go on the end ─────────────────────────
1778
1779 #[test]
1780 fn append_only_corrective_entry_goes_on_end_without_rewriting() {
1781 let (_d, store) = temp_store();
1782 let original = entry(
1783 2026,
1784 5,
1785 27,
1786 10,
1787 0,
1788 LogKind::Update,
1789 Some("records/northstar"),
1790 "Seat count 120 -> 175.",
1791 );
1792 Log::append(&store, &original).unwrap();
1793 let after_first = fs::read_to_string(store.root.join("log.md")).unwrap();
1794
1795 // A correction is a NEW entry appended on the end; the original text is
1796 // left byte-for-byte intact (append-only contract: no rewrite API).
1797 let correction = entry(
1798 2026,
1799 5,
1800 27,
1801 11,
1802 0,
1803 LogKind::Update,
1804 Some("records/northstar"),
1805 "Correction: seat count is 165, not 175.",
1806 );
1807 Log::append(&store, &correction).unwrap();
1808 let after_second = fs::read_to_string(store.root.join("log.md")).unwrap();
1809
1810 assert!(
1811 after_second.starts_with(&after_first),
1812 "appending must not rewrite earlier bytes"
1813 );
1814 assert!(after_second.contains("Correction: seat count is 165, not 175."));
1815
1816 // Both entries are readable, in order.
1817 let all = Log::tail(&store, 99).unwrap();
1818 assert_eq!(all, vec![original, correction]);
1819 }
1820
1821 // ── concurrent append safety (atomic via temp-file rename) ────────────────
1822
1823 #[test]
1824 fn concurrent_appends_are_atomic_and_total() {
1825 use std::sync::{Arc, Barrier};
1826 use std::thread;
1827
1828 let (_d, store) = temp_store();
1829 // Seed the file so all threads take the read-modify-write path.
1830 Log::append(
1831 &store,
1832 &entry(2026, 7, 1, 0, 0, LogKind::Create, Some("seed"), "seed"),
1833 )
1834 .unwrap();
1835
1836 let threads = 8usize;
1837 let per = 25usize;
1838 let barrier = Arc::new(Barrier::new(threads));
1839 let store = Arc::new(store);
1840
1841 let mut handles = Vec::new();
1842 for tnum in 0..threads {
1843 let b = Arc::clone(&barrier);
1844 let s = Arc::clone(&store);
1845 handles.push(thread::spawn(move || {
1846 b.wait();
1847 for i in 0..per {
1848 let e = entry(
1849 2026,
1850 7,
1851 1,
1852 (tnum % 24) as u32,
1853 (i % 60) as u32,
1854 LogKind::Update,
1855 Some(&format!("t{tnum}-i{i}")),
1856 &format!("thread {tnum} item {i}"),
1857 );
1858 Log::append(&s, &e).unwrap();
1859 }
1860 }));
1861 }
1862 for h in handles {
1863 h.join().unwrap();
1864 }
1865
1866 // The atomic temp-file-rename write means no append truncates or
1867 // corrupts another: the file must remain parseable and every line of
1868 // every entry header must be well-formed. Crucially, no entry should be
1869 // lost to a torn write of the *content already on disk* — though
1870 // interleaved read-modify-write WILL drop some appends (last-writer-
1871 // wins on the snapshot). We therefore assert integrity + that the file
1872 // never went empty / corrupt, not an exact count.
1873 let content = fs::read_to_string(store.root.join("log.md")).unwrap();
1874 assert!(content.starts_with("---\ntype: log\n---\n"));
1875
1876 // Every `## [` line must parse as a valid header (no half-written line).
1877 for line in content.lines() {
1878 if line.starts_with("## [") {
1879 assert!(
1880 Log::parse_header(line).is_some(),
1881 "corrupt/torn header line on disk: {line:?}"
1882 );
1883 }
1884 }
1885
1886 // The seed entry must survive (it was written before the race and
1887 // every snapshot included it).
1888 assert!(content.contains("## [2026-07-01 00:00] create | seed"));
1889
1890 // The reverse reader must still produce a clean, fully-parseable view.
1891 let all = Log::tail(&store, 10_000).unwrap();
1892 assert!(!all.is_empty());
1893 // No duplicate adjacent identical headers from a torn write: every
1894 // returned entry must have a recognized-or-custom kind and a parseable
1895 // timestamp (already guaranteed by parse), and the list must be
1896 // internally consistent (re-render → re-parse identity for each).
1897 for e in &all {
1898 let rendered = e.render();
1899 let reparsed = parse_single_entry(&rendered).unwrap();
1900 assert_eq!(&reparsed, e);
1901 }
1902 }
1903
1904 // ── render/parse identity ────────────────────────────────────────────────
1905
1906 #[test]
1907 fn render_then_parse_is_identity() {
1908 let cases = vec![
1909 entry(
1910 2026,
1911 1,
1912 2,
1913 3,
1914 4,
1915 LogKind::Ingest,
1916 Some("sources/a.eml"),
1917 "n",
1918 ),
1919 entry(
1920 2026,
1921 12,
1922 31,
1923 23,
1924 59,
1925 LogKind::Validate,
1926 None,
1927 "PASS - 0 errors",
1928 ),
1929 entry(
1930 2026,
1931 6,
1932 15,
1933 12,
1934 30,
1935 LogKind::Custom("proposal".to_string()),
1936 Some("records/p"),
1937 "multi\nline\nnote",
1938 ),
1939 entry(2026, 6, 15, 12, 30, LogKind::Contradiction, Some("obj"), ""),
1940 ];
1941 for e in cases {
1942 let rendered = e.render();
1943 let parsed = parse_single_entry(&rendered).unwrap_or_else(|| {
1944 panic!("failed to reparse rendered entry:\n{rendered}");
1945 });
1946 assert_eq!(parsed, e, "round-trip mismatch for {e:?}");
1947 }
1948 }
1949
1950 // ── regression: rotation re-roll must not duplicate archive entries (#3) ──
1951
1952 /// Count occurrences of `needle` in `haystack` (non-overlapping).
1953 fn count_occurrences(haystack: &str, needle: &str) -> usize {
1954 haystack.matches(needle).count()
1955 }
1956
1957 #[test]
1958 fn regression_archive_reroll_is_idempotent_after_interrupted_rotation() {
1959 // Reconstructs the finding's exact failure window: rotation is two
1960 // non-atomic durable writes — (1) roll prior-month entries into the
1961 // archive, then (2) trim the active file. If the process crashes or the
1962 // active rewrite errors AFTER step (1) commits, the prior-month entries
1963 // stay in the untrimmed active file, the agent retries, and the retry
1964 // re-rolls the SAME entries into the archive a second time. The
1965 // mechanism is precisely a second `append_to_archive` of identical
1966 // entries onto an archive that already holds them.
1967 let (_d, store) = temp_store();
1968 let dir = archive_dir(&store);
1969 let arch = archive_path(&store, 2026, 4);
1970
1971 let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
1972 let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
1973 let month = [apr1.clone(), apr2.clone()];
1974
1975 // First roll (the committed step-(1) write before the crash).
1976 fs::create_dir_all(&dir).unwrap();
1977 append_to_archive(&arch, &month).unwrap();
1978
1979 // The retry re-rolls the identical prior-month entries. Pre-fix this
1980 // blindly concatenated, doubling every entry; do it twice to prove the
1981 // amplification a real retry loop would cause is fully suppressed.
1982 append_to_archive(&arch, &month).unwrap();
1983 append_to_archive(&arch, &month).unwrap();
1984
1985 let archived = fs::read_to_string(&arch).unwrap();
1986 // Each entry header must appear EXACTLY once despite the re-rolls.
1987 assert_eq!(
1988 count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
1989 1,
1990 "re-rolled archive duplicated the first April entry; got:\n{archived}"
1991 );
1992 assert_eq!(
1993 count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
1994 1,
1995 "re-rolled archive duplicated the second April entry; got:\n{archived}"
1996 );
1997
1998 // And the reader surface (`since`) must return each entry once, not the
1999 // duplicated set the pre-fix archive would have yielded.
2000 let got = Log::since(&store, ts(2026, 4, 1, 0, 0)).unwrap();
2001 assert_eq!(
2002 got,
2003 vec![apr1, apr2],
2004 "since over the re-rolled archive must return each April entry once"
2005 );
2006 }
2007
2008 #[test]
2009 fn regression_rotation_reroll_after_active_untrimmed_does_not_duplicate() {
2010 // End-to-end variant driving the real `Log::append` rotation path. We
2011 // rotate April into its archive via a May append, then SIMULATE the
2012 // partial failure by restoring the pre-trim active file (April + May)
2013 // and re-running `append` — exactly the state a crash-between-the-two-
2014 // writes / failed-active-rewrite + agent-retry produces. The archive
2015 // must still hold each April entry once.
2016 let (_d, store) = temp_store();
2017 let apr1 = entry(2026, 4, 10, 9, 0, LogKind::Ingest, Some("apr-a"), "apr one");
2018 let apr2 = entry(2026, 4, 20, 9, 0, LogKind::Create, Some("apr-b"), "apr two");
2019 Log::append(&store, &apr1).unwrap();
2020 Log::append(&store, &apr2).unwrap();
2021
2022 // Snapshot the active file holding both April entries (this is what is
2023 // still on disk if the post-rotation active rewrite never lands).
2024 let active_path = active_log_path(&store);
2025 let pre_rotation_active = fs::read_to_string(&active_path).unwrap();
2026
2027 // A May append rotates April out and trims the active file.
2028 let may = entry(2026, 5, 2, 8, 0, LogKind::Update, Some("may-a"), "may one");
2029 Log::append(&store, &may).unwrap();
2030 let arch = archive_path(&store, 2026, 4);
2031 assert!(arch.exists(), "April should have rotated to its archive");
2032
2033 // Simulate the crash/error: the active rewrite never persisted, so the
2034 // active file still contains the (now also archived) April entries.
2035 fs::write(&active_path, &pre_rotation_active).unwrap();
2036
2037 // The agent retries the append. Re-partitioning sees April as prior
2038 // months again and re-rolls them — which must NOT duplicate the archive.
2039 let may2 = entry(2026, 5, 3, 8, 0, LogKind::Update, Some("may-b"), "may two");
2040 Log::append(&store, &may2).unwrap();
2041
2042 let archived = fs::read_to_string(&arch).unwrap();
2043 assert_eq!(
2044 count_occurrences(&archived, "## [2026-04-10 09:00] ingest | apr-a"),
2045 1,
2046 "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2047 );
2048 assert_eq!(
2049 count_occurrences(&archived, "## [2026-04-20 09:00] create | apr-b"),
2050 1,
2051 "retried rotation duplicated an April entry in the archive; got:\n{archived}"
2052 );
2053 }
2054
2055 // ── regression: reverse reader keeps a `## [` continuation note line (#10) ─
2056
2057 #[test]
2058 fn regression_reverse_reader_preserves_note_line_starting_with_bracket_header() {
2059 // SPEC permits a note of "one or more lines" with no restriction on a
2060 // continuation line starting at column 0 with `## [`. The forward parser
2061 // folds such an unparseable `## [` line into the note; the reverse
2062 // reader (tail/since/last_validate_at) must agree, not split on it.
2063 let (_d, store) = temp_store();
2064 let multi = "First line.\n## [draft outline] more\nThird line.";
2065 let e = entry(
2066 2026,
2067 5,
2068 27,
2069 10,
2070 0,
2071 LogKind::Update,
2072 Some("records/x"),
2073 multi,
2074 );
2075 // Author the log verbatim (render writes the note as-is); this is the
2076 // on-disk shape a hand-written / appended multi-line note produces.
2077 write_raw_log(&store, std::slice::from_ref(&e));
2078
2079 // Pre-fix: header_offsets treated `## [draft outline] more` as a second
2080 // entry boundary, truncating the note to "First line." and dropping the
2081 // carved (non-header) fragment. Post-fix: the full note survives.
2082 let got = Log::tail(&store, 1).unwrap();
2083 assert_eq!(got.len(), 1, "the single entry must be returned");
2084 assert_eq!(
2085 got[0].note, multi,
2086 "reverse reader truncated the note at the `## [` continuation line; \
2087 got {:?}",
2088 got[0].note
2089 );
2090 assert_eq!(got[0], e, "the whole entry must round-trip through tail");
2091
2092 // `since` (the other reverse-reading surface) must agree.
2093 let since = Log::since(&store, ts(2026, 5, 27, 9, 0)).unwrap();
2094 assert_eq!(since, vec![e]);
2095 }
2096
2097 // ── regression: `since` archive pruning uses the UTC month, not local (#11) ─
2098
2099 /// A `DateTime<FixedOffset>` at the given fixed offset (hours east of UTC).
2100 fn ts_offset(
2101 y: i32,
2102 mo: u32,
2103 d: u32,
2104 h: u32,
2105 mi: u32,
2106 offset_hours: i32,
2107 ) -> DateTime<FixedOffset> {
2108 let naive = chrono::NaiveDate::from_ymd_opt(y, mo, d)
2109 .unwrap()
2110 .and_hms_opt(h, mi, 0)
2111 .unwrap();
2112 FixedOffset::east_opt(offset_hours * 3600)
2113 .unwrap()
2114 .from_local_datetime(&naive)
2115 .single()
2116 .unwrap()
2117 }
2118
2119 #[test]
2120 fn regression_since_prunes_archives_on_utc_month_not_local_offset_month() {
2121 // Archive months are bucketed on the UTC calendar. A `since` cutoff with
2122 // a non-UTC offset near a month boundary must not prune an archive whose
2123 // UTC month equals the cutoff's UTC month just because the cutoff's
2124 // LOCAL month is later.
2125 let (_d, store) = temp_store();
2126
2127 // April archive: an entry late on 2026-04-30 at 18:00 UTC.
2128 let apr = entry(
2129 2026,
2130 4,
2131 30,
2132 18,
2133 0,
2134 LogKind::Update,
2135 Some("apr-late"),
2136 "april late",
2137 );
2138 let dir = archive_dir(&store);
2139 fs::create_dir_all(&dir).unwrap();
2140 let mut arch = String::from(LOG_FRONTMATTER);
2141 arch.push('\n');
2142 arch.push_str(&apr.render());
2143 fs::write(archive_path(&store, 2026, 4), arch).unwrap();
2144
2145 // Active file: a clean May entry, so an archive scan is actually needed.
2146 let may = entry(2026, 5, 5, 8, 0, LogKind::Update, Some("may-a"), "may one");
2147 write_raw_log(&store, std::slice::from_ref(&may));
2148
2149 // Cutoff 2026-05-01T00:30:00+07:00 == 2026-04-30T17:30:00Z. The April
2150 // 18:00 UTC entry is strictly newer than this instant.
2151 let cutoff = ts_offset(2026, 5, 1, 0, 30, 7);
2152 // Sanity: the cutoff's UTC month is April, its local month is May.
2153 assert_eq!((cutoff.year(), cutoff.month()), (2026, 5));
2154 assert_eq!(
2155 (
2156 cutoff.with_timezone(&Utc).year(),
2157 cutoff.with_timezone(&Utc).month()
2158 ),
2159 (2026, 4)
2160 );
2161
2162 // Pre-fix: cutoff_ym = (2026, 5) from local fields, so the (2026, 4)
2163 // archive was pruned and the genuinely-newer 18:00 UTC entry was dropped
2164 // — `since` returned only the May entry. Post-fix: cutoff_ym is UTC
2165 // (2026, 4), the April archive is scanned, and both come back.
2166 let got = Log::since(&store, cutoff).unwrap();
2167 let stamps: std::collections::BTreeSet<_> = got.iter().map(|e| e.timestamp).collect();
2168 assert_eq!(
2169 stamps,
2170 [ts(2026, 4, 30, 18, 0), ts(2026, 5, 5, 8, 0)]
2171 .into_iter()
2172 .collect(),
2173 "since(non-UTC cutoff near a month boundary) must include the April \
2174 archive entry newer than the cutoff instant; got {got:?}"
2175 );
2176 }
2177}