Skip to main content

dbmd_core/
index.rs

1//! `index` — the hierarchical content catalog.
2//!
3//! A uniform three-level tree: root + per-layer + per-type-folder. **Two
4//! artifacts per type-folder:** the human `index.md` (capped 500, recency
5//! browse) and the machine `index.jsonl` (complete, structured — one JSON
6//! object per file). Both read `summary` + key frontmatter fields + links
7//! directly from each file — there is no extraction logic here.
8//!
9//! **Maintained write-through** by the write commands ([`Index::on_write`] /
10//! [`Index::on_rename`] / [`Index::on_remove`] — the loop path, O(changed), no
11//! store walk); [`Index::rebuild_all`] is the from-scratch SWEEP repair.
12//!
13//! **Key invariant:** write-through must produce a byte-identical `index.md`
14//! and (post-compaction) `index.jsonl` to a full [`Index::rebuild_all`] over
15//! the same end state — the loop path can never drift from the repair path.
16//!
17//! # Implementation notes (deviations the reader should know)
18//!
19//! - **Self-contained, by design.** This module does its own shard-aware folder
20//!   walk, its own minimal frontmatter read, and its own atomic write, using
21//!   only `store.root` (a public field) and the `serde_norway` / `serde_json` /
22//!   `chrono` / `walkdir` crates rather than routing through the sibling
23//!   `store`/`parser` helpers ([`Store::walk_type_folder`],
24//!   [`Store::recent_in_type_folder`], [`parser::read_file`], …). The index has
25//!   to stamp a *deterministic* `updated:` and emit a *canonical, compacted*
26//!   `index.jsonl` (see the two notes below); keeping the read/walk/write local
27//!   is what makes the byte-identity invariant a true byte comparison, free of
28//!   any incidental formatting the shared readers might introduce. The public
29//!   signatures in `lib.rs` are untouched.
30//! - **Deterministic `updated:` on the index files themselves.** An index's own
31//!   `updated` frontmatter is derived as the max `updated` over the files it
32//!   catalogs (max over children for root/layer) — NOT wall-clock-now. This is
33//!   what makes the byte-identity invariant a *true* byte comparison: a
34//!   write-through write and a `rebuild_all` over the same end state stamp the
35//!   same value. (The SPEC's rendered examples show a wall-clock-looking value;
36//!   the conventions list only requires `updated: <RFC3339>`, and the
37//!   property-tested invariant dominates.)
38//! - **`index.jsonl` is always compacted.** Write-through rewrites the affected
39//!   type-folder's jsonl in canonical form (one current line per path, recency
40//!   order) rather than appending superseded/tombstone lines, so the jsonl is
41//!   byte-identical to `rebuild_all` *immediately* (a strictly stronger
42//!   guarantee than the SPEC's "post-compaction"). This keeps the loop cost at
43//!   one sidecar read + one rewrite per touched type-folder — O(folder), the
44//!   sanctioned loop primitive, never a whole-`Store::walk`.
45//! - **Root/layer entry styling** follows plan §index (`(N)` numeric counts;
46//!   layer headings in the root carry the layer's total count) which is more
47//!   specific than the SPEC's illustrative `(42 files)` prose example. Type
48//!   folders are listed alphabetically (a deterministic order a derived artifact
49//!   needs); `scope: type-folder` follows the conventions list, not the one
50//!   SPEC example that wrote `scope: folder`.
51
52use std::collections::BTreeMap;
53use std::fs;
54use std::io::Write as _;
55use std::path::{Path, PathBuf};
56
57use chrono::{DateTime, FixedOffset, SecondsFormat};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60
61use crate::parser::FolderMeta;
62use crate::store::{Layer, Store};
63
64/// The browse-view cap for a type-folder `index.md`.
65const MD_CAP: usize = 500;
66
67/// Placeholder summary for a content file that has no `summary` frontmatter.
68/// The index never invents a real summary — that is `dbmd fm init`'s job; this
69/// marker is what `dbmd validate` keys off (`INDEX`-class issue).
70const MISSING_SUMMARY: &str = "(no summary)";
71
72/// The root `index.md` H1.
73const ROOT_TITLE: &str = "Knowledge base index";
74
75/// Which level of the catalog an [`Index`] represents.
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum IndexLevel {
78    /// The store-wide root `index.md` (layers + per-type counts).
79    Root,
80    /// A layer `index.md` (every type-folder under one layer).
81    Layer(Layer),
82    /// A type-folder `index.md` + `index.jsonl` (every file in the folder).
83    TypeFolder(PathBuf),
84}
85
86/// One record in a type-folder's `index.jsonl` — the complete, structured twin
87/// of a single `index.md` browse entry.
88///
89/// `tags` are the document's flat labels; `links` are its concept/relationship
90/// wiki-link targets. Both are copied verbatim from the file — never inferred.
91/// `fields` holds the remaining type-specific frontmatter so the structured
92/// query path can filter on any key without opening the file.
93#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
94pub struct IndexRecord {
95    /// Store-relative path of the file (the upsert key; last-write-wins).
96    /// Serialized with forward slashes regardless of OS (see [`path_serde`]) so
97    /// the `index.jsonl` catalog is byte-portable across platforms.
98    #[serde(with = "path_serde")]
99    pub path: PathBuf,
100    /// The file's `type`.
101    #[serde(rename = "type")]
102    pub type_: String,
103    /// The file's `summary`.
104    pub summary: String,
105    /// The file's flat `tags`.
106    #[serde(default)]
107    pub tags: Vec<String>,
108    /// The file's concept/relationship wiki-link targets (store-relative).
109    #[serde(default)]
110    pub links: Vec<String>,
111    /// `created` timestamp.
112    pub created: Option<DateTime<FixedOffset>>,
113    /// `updated` timestamp (the recency key for the `index.md` cap order).
114    pub updated: Option<DateTime<FixedOffset>>,
115    /// Remaining type-specific frontmatter fields, verbatim.
116    #[serde(flatten)]
117    pub fields: BTreeMap<String, Value>,
118}
119
120/// A built (or being-built) catalog for one [`IndexLevel`], with both rendered
121/// artifacts available. Pure data until written via [`Index::write_level`].
122#[derive(Debug, Clone, PartialEq)]
123pub struct Index {
124    /// Which level this catalog is for.
125    pub level: IndexLevel,
126    /// The complete record set for this level (type-folder level; empty for
127    /// root/layer rollups, which carry only counts).
128    pub records: Vec<IndexRecord>,
129    /// Per-child counts for root/layer rollups (child path → file count).
130    pub child_counts: BTreeMap<PathBuf, usize>,
131}
132
133impl Index {
134    /// Build a type-folder catalog by aggregating across date-shards, producing
135    /// both artifacts. `index.md` selection is recency (updated desc, ties by
136    /// path asc; cap 500 with a `## More` footer over the cap); `index.jsonl`
137    /// holds every file. A file missing `summary` gets a placeholder + a
138    /// validate-detectable issue (the index never invents summaries).
139    pub fn build_type_folder(store: &Store, type_folder: &Path) -> crate::Result<Index> {
140        let rel = normalize_rel(type_folder);
141        let abs = store.root.join(&rel);
142        let mut records = Vec::new();
143        for file_abs in walk_type_folder_files(&abs) {
144            let rel_path =
145                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
146            // Abort the build on a malformed file rather than skip it. A skipped
147            // file would still be a content member the validator requires to be
148            // catalogued (`validate::walk_content_files` enumerates by filename,
149            // not by parseability), so silently dropping it would leave the store
150            // in a permanently invalid state (`INDEX_MISSING_ENTRY` /
151            // `INDEX_JSONL_DESYNC` that no rebuild can clear) and would desync the
152            // rollups (`build_layer`/`build_root` count the raw `.md` files). The
153            // loud `?` is the right outcome: `cleanup` now preserves the prior
154            // canonical sidecars (`min_depth(2)`), so an aborted rebuild leaves
155            // the existing catalogs intact and the operator a clear error naming
156            // the file to fix — never a destroyed or silently-wrong index.
157            records.push(record_from_file(&file_abs, rel_path)?);
158        }
159        sort_records(&mut records);
160        Ok(Index {
161            level: IndexLevel::TypeFolder(rel),
162            records,
163            child_counts: BTreeMap::new(),
164        })
165    }
166
167    /// Build a layer catalog: every non-empty type-folder under the layer with
168    /// `(N)` counts and a newest-file `summary` preview (≤ 80 chars), plus the
169    /// **loose records** that live directly at the layer root (files with no
170    /// type-folder between them and the layer). The type-folder rollup is the
171    /// `index.md`; the loose records are the layer's own `index.jsonl` (so
172    /// structured reads — `query`, dedup, `graph` — see a loose file the same
173    /// way they see a canonical one). A layer with no loose files carries no
174    /// `index.jsonl`, so existing stores are byte-unchanged.
175    pub fn build_layer(store: &Store, layer: Layer) -> crate::Result<Index> {
176        let mut child_counts = BTreeMap::new();
177        for tf in type_folders_in_layer(store, layer) {
178            let abs = store.root.join(&tf);
179            let n = walk_type_folder_files(&abs).len();
180            if n > 0 {
181                child_counts.insert(tf, n);
182            }
183        }
184        let mut records = Vec::new();
185        for file_abs in loose_files_in_layer(store, layer) {
186            let rel_path =
187                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
188            // Abort on a malformed loose file rather than skip it, mirroring
189            // `build_type_folder`: a skipped file is still a content member the
190            // validator requires to be catalogued, so dropping it would leave a
191            // permanently-invalid index. The loud `?` names the file to fix.
192            records.push(record_from_file(&file_abs, rel_path)?);
193        }
194        sort_records(&mut records);
195        Ok(Index {
196            level: IndexLevel::Layer(layer),
197            records,
198            child_counts,
199        })
200    }
201
202    /// Build the store-wide root catalog: one heading per non-empty layer with
203    /// total count + bulleted per-type sub-entries with `(N)` counts.
204    pub fn build_root(store: &Store) -> crate::Result<Index> {
205        let mut child_counts = BTreeMap::new();
206        for layer in Layer::all() {
207            for tf in type_folders_in_layer(store, layer) {
208                let abs = store.root.join(&tf);
209                let n = walk_type_folder_files(&abs).len();
210                if n > 0 {
211                    child_counts.insert(tf, n);
212                }
213            }
214        }
215        Ok(Index {
216            level: IndexLevel::Root,
217            records: Vec::new(),
218            child_counts,
219        })
220    }
221
222    /// Render this catalog as a canonical `index.md`.
223    pub fn to_markdown(&self) -> String {
224        match &self.level {
225            IndexLevel::TypeFolder(folder) => self.render_type_folder_md(folder),
226            IndexLevel::Layer(layer) => self.render_layer_md(*layer),
227            IndexLevel::Root => self.render_root_md(),
228        }
229    }
230
231    /// Render this catalog's `records` as the complete `index.jsonl` (one JSON
232    /// object per file, stable key order so diffs stay minimal). Used at the
233    /// type-folder level for its files, and at the layer level for the loose
234    /// files that live directly at the layer root. The root rollup carries no
235    /// records, so it never produces a jsonl.
236    pub fn to_jsonl(&self) -> String {
237        let mut out = String::new();
238        for rec in &self.records {
239            // The record type derives a deterministic, sorted key order
240            // (declared fields first, then the flattened `fields` BTreeMap).
241            let line = serde_json::to_string(rec).expect("IndexRecord serializes");
242            out.push_str(&line);
243            out.push('\n');
244        }
245        out
246    }
247
248    // ── rendering helpers ────────────────────────────────────────────────
249
250    fn render_type_folder_md(&self, folder: &Path) -> String {
251        let folder_disp = path_to_unix(folder);
252        let updated = max_updated(self.records.iter().map(|r| r.updated.as_ref()));
253        let mut s = String::new();
254        s.push_str("---\n");
255        s.push_str("type: index\n");
256        s.push_str("scope: type-folder\n");
257        s.push_str(&format!("folder: {folder_disp}\n"));
258        if let Some(ts) = updated {
259            s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
260        }
261        s.push_str("---\n\n");
262        s.push_str(&format!("# {folder_disp}\n\n"));
263
264        let shown = self.records.len().min(MD_CAP);
265        for rec in self.records.iter().take(shown) {
266            s.push_str(&format_md_entry(rec));
267            s.push('\n');
268        }
269
270        if self.records.len() > MD_CAP {
271            let type_ = self.records.first().map(|r| r.type_.as_str()).unwrap_or("");
272            let layer = folder
273                .components()
274                .next()
275                .and_then(|c| c.as_os_str().to_str())
276                .unwrap_or("");
277            s.push('\n');
278            s.push_str(&more_footer(self.records.len(), type_, layer));
279        }
280        s
281    }
282
283    /// Store-less layer rollup: counts only, no preview / no derived `updated`
284    /// (a layer index needs each child's on-disk jsonl for those — see
285    /// [`render_layer_md_with_store`], the canonical path every disk write
286    /// uses). This pure-data render is structurally identical sans preview.
287    fn render_layer_md(&self, layer: Layer) -> String {
288        let layer_dir = layer_dir_name(layer);
289        let mut s = String::new();
290        s.push_str("---\n");
291        s.push_str("type: index\n");
292        s.push_str("scope: layer\n");
293        s.push_str(&format!("folder: {layer_dir}\n"));
294        s.push_str("---\n\n");
295        s.push_str(&format!("# {layer_dir}\n\n"));
296        for (tf, n) in &self.child_counts {
297            let tf_unix = path_to_unix(tf);
298            let display = capitalize(folder_basename(tf));
299            s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
300        }
301        s
302    }
303
304    /// Store-less root rollup: counts only (the canonical disk render adds a
305    /// derived `updated` — see [`render_root_md_with_store`]).
306    fn render_root_md(&self) -> String {
307        let mut s = String::new();
308        s.push_str("---\n");
309        s.push_str("type: index\n");
310        s.push_str("scope: root\n");
311        s.push_str("---\n\n");
312        s.push_str(&format!("# {ROOT_TITLE}\n"));
313        for layer in Layer::all() {
314            let layer_dir = layer_dir_name(layer);
315            let prefix = format!("{layer_dir}/");
316            let children: Vec<(&PathBuf, &usize)> = self
317                .child_counts
318                .iter()
319                .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
320                .collect();
321            if children.is_empty() {
322                continue;
323            }
324            let total: usize = children.iter().map(|(_, n)| **n).sum();
325            s.push('\n');
326            s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
327            for (tf, n) in children {
328                let tf_unix = path_to_unix(tf);
329                let display = capitalize(folder_basename(tf));
330                s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
331            }
332        }
333        s
334    }
335}
336
337// ─────────────────────────────────────────────────────────────────────────
338// Write-through + sweep (free functions on the impl block).
339// ─────────────────────────────────────────────────────────────────────────
340
341impl Index {
342    /// **Write-through (loop, O(changed)).** Upsert a new/updated content file.
343    /// Reads the affected type-folder's `index.jsonl` (the sanctioned per-folder
344    /// sidecar read — never a whole-store walk), applies the change, and
345    /// atomically rewrites that folder's `index.md` + `index.jsonl` plus the
346    /// parent layer + root rollups so the artifacts equal a `rebuild_all` over
347    /// the same end state.
348    pub fn on_write(store: &Store, file: &Path) -> crate::Result<()> {
349        let file_rel = normalize_rel(file);
350        // The generated catalog files are not content — never upsert one into
351        // itself. `build_type_folder`'s walk already excludes `index.md`
352        // (`walk_type_folder_files`); the loop path must apply the same
353        // exclusion or editing `index.md` via `fm set` inserts a phantom
354        // self-row, inflating every `(N)` count and breaking the
355        // write-through == rebuild byte-identity invariant.
356        if is_index_artifact(&file_rel) {
357            return Ok(());
358        }
359        // A loose file (directly at a layer root, no type-folder) is catalogued
360        // in its layer's own `index.jsonl`; the layer `index.md` rollup is
361        // unaffected (loose files do not change type-folder counts).
362        if let Some(layer) = loose_layer_of(&file_rel) {
363            return apply_loose_change(store, layer, &file_rel, false);
364        }
365        let file_abs = store.root.join(&file_rel);
366        let folder = type_folder_of(&file_rel)
367            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
368        let record = record_from_file(&file_abs, file_rel.clone())?;
369
370        // Serialize the sidecar read-modify-write so concurrent sanctioned
371        // writes to this folder don't clobber each other's rows (lost update).
372        let _lock = FolderLock::acquire(&store.root.join(&folder));
373        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
374        records.retain(|r| r.path != record.path);
375        records.push(record);
376        sort_records(&mut records);
377
378        write_type_folder_artifacts(store, &folder, &records)?;
379        update_parents(store, &folder)?;
380        Ok(())
381    }
382
383    /// **Write-through (loop, O(changed)).** Move a file's entry between
384    /// type-folder indexes (or within, if the same folder) in both `index.md`
385    /// and `index.jsonl`, fixing counts on both sides.
386    pub fn on_rename(store: &Store, old: &Path, new: &Path) -> crate::Result<()> {
387        let old_rel = normalize_rel(old);
388        let new_rel = normalize_rel(new);
389        // Index artifacts are generated, not catalogued — a rename of/into one
390        // is not a content move (same reasoning as `on_write`). Skip rather than
391        // insert a phantom self-row.
392        if is_index_artifact(&old_rel) || is_index_artifact(&new_rel) {
393            return Ok(());
394        }
395        // If either side is a loose file (layer root, no type-folder), decompose
396        // into remove-old + add-new: each entry point routes to the correct
397        // catalog (the layer `index.jsonl` for a loose side, the type-folder for
398        // the other), giving the same end state as the cross-folder path below
399        // while reusing the tested single-file paths.
400        if loose_layer_of(&old_rel).is_some() || loose_layer_of(&new_rel).is_some() {
401            Self::on_remove(store, &old_rel)?;
402            Self::on_write(store, &new_rel)?;
403            return Ok(());
404        }
405        let old_folder = type_folder_of(&old_rel)
406            .ok_or_else(|| bad_index(&old_rel, "source is not inside a layer/type-folder"))?;
407        let new_folder = type_folder_of(&new_rel)
408            .ok_or_else(|| bad_index(&new_rel, "target is not inside a layer/type-folder"))?;
409
410        // Serialize the sidecar read-modify-write(s). For a cross-folder rename,
411        // lock BOTH folders, always in sorted order, so two renames touching the
412        // same pair can't deadlock. Held for the whole operation via RAII.
413        let _locks = lock_folders(store, &old_folder, &new_folder);
414
415        // Drop from the old folder.
416        let mut old_records =
417            read_jsonl_records(&store.root.join(&old_folder).join("index.jsonl"))?;
418        old_records.retain(|r| r.path != old_rel);
419
420        if old_folder == new_folder {
421            // Same folder: re-read the (now-renamed) file and upsert.
422            let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
423            old_records.retain(|r| r.path != record.path);
424            old_records.push(record);
425            sort_records(&mut old_records);
426            write_type_folder_artifacts(store, &old_folder, &old_records)?;
427            update_parents(store, &old_folder)?;
428            return Ok(());
429        }
430
431        // Cross-folder: write the trimmed old folder (or drop its indexes if
432        // now empty), then upsert into the new folder.
433        sort_records(&mut old_records);
434        write_type_folder_artifacts(store, &old_folder, &old_records)?;
435
436        let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
437        let mut new_records =
438            read_jsonl_records(&store.root.join(&new_folder).join("index.jsonl"))?;
439        new_records.retain(|r| r.path != record.path);
440        new_records.push(record);
441        sort_records(&mut new_records);
442        write_type_folder_artifacts(store, &new_folder, &new_records)?;
443
444        update_parents(store, &old_folder)?;
445        update_parents(store, &new_folder)?;
446        Ok(())
447    }
448
449    /// **Write-through (loop, O(changed)).** Drop a file's entry from both
450    /// `index.md` and `index.jsonl`; decrement counts; if the browse view drops
451    /// below the cap, the next-most-recent is already present in the complete
452    /// jsonl record set and re-renders into the md automatically.
453    pub fn on_remove(store: &Store, file: &Path) -> crate::Result<()> {
454        let file_rel = normalize_rel(file);
455        // Removing a generated catalog artifact is not a content removal; it has
456        // no row to drop (it was never catalogued). Skip, mirroring `on_write`.
457        if is_index_artifact(&file_rel) {
458            return Ok(());
459        }
460        // Loose file → drop its row from the layer `index.jsonl`.
461        if let Some(layer) = loose_layer_of(&file_rel) {
462            return apply_loose_change(store, layer, &file_rel, true);
463        }
464        let folder = type_folder_of(&file_rel)
465            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
466        // Serialize the sidecar read-modify-write (see `on_write`).
467        let _lock = FolderLock::acquire(&store.root.join(&folder));
468        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
469        let before = records.len();
470        records.retain(|r| r.path != file_rel);
471        if records.len() == before {
472            // Nothing to remove; still normalize the folder + parents so the
473            // artifacts stay canonical.
474        }
475        sort_records(&mut records);
476        write_type_folder_artifacts(store, &folder, &records)?;
477        update_parents(store, &folder)?;
478        Ok(())
479    }
480
481    /// **SWEEP repair.** Walk the store once and atomically (re)write root +
482    /// every non-empty layer + every non-empty type-folder `index.md` and
483    /// `index.jsonl` (compacting the jsonl). Also runs [`Index::cleanup`].
484    pub fn rebuild_all(store: &Store) -> crate::Result<()> {
485        Index::cleanup(store)?;
486        for layer in Layer::all() {
487            for tf in type_folders_in_layer(store, layer) {
488                let idx = Index::build_type_folder(store, &tf)?;
489                if idx.records.is_empty() {
490                    continue;
491                }
492                write_type_folder_artifacts(store, &tf, &idx.records)?;
493            }
494            let layer_idx = Index::build_layer(store, layer)?;
495            let layer_index_md = store.root.join(layer_dir_name(layer)).join("index.md");
496            if layer_idx.child_counts.is_empty() {
497                remove_if_exists(&layer_index_md)?;
498            } else {
499                write_atomic(
500                    &layer_index_md,
501                    render_layer_md_with_store(store, &layer_idx),
502                )?;
503            }
504            // The layer's own `index.jsonl` — present iff the layer has loose
505            // files directly at its root. Independent of the rollup above: a
506            // layer can have loose files but no type-folders, or vice versa.
507            write_layer_jsonl(store, layer, &layer_idx.records)?;
508        }
509        let root_idx = Index::build_root(store)?;
510        let root_index_md = store.root.join("index.md");
511        if root_idx.child_counts.is_empty() {
512            remove_if_exists(&root_index_md)?;
513        } else {
514            write_atomic(&root_index_md, render_root_md_with_store(store, &root_idx))?;
515        }
516        Ok(())
517    }
518
519    /// Rebuild ONE type-folder's `index.md`/`index.jsonl` from a fresh walk, then
520    /// cascade the new child count up to the layer and root rollups — so a
521    /// scoped `dbmd index rebuild --folder` leaves the hierarchy consistent,
522    /// exactly like `rebuild_all` and the loop-path `on_write` already do.
523    /// (Writing only the folder, as the CLI used to, left stale layer/root
524    /// counts that `validate` would then flag as an index desync.)
525    pub fn rebuild_folder(store: &Store, folder: &Path) -> crate::Result<()> {
526        Self::write_level(store, &IndexLevel::TypeFolder(folder.to_path_buf()))?;
527        update_parents(store, folder)
528    }
529
530    /// Atomically write a single level's artifact(s) to disk.
531    pub fn write_level(store: &Store, level: &IndexLevel) -> crate::Result<()> {
532        match level {
533            IndexLevel::TypeFolder(folder) => {
534                let idx = Index::build_type_folder(store, folder)?;
535                if idx.records.is_empty() {
536                    remove_if_exists(&store.root.join(folder).join("index.md"))?;
537                    remove_if_exists(&store.root.join(folder).join("index.jsonl"))?;
538                } else {
539                    write_type_folder_artifacts(store, folder, &idx.records)?;
540                }
541            }
542            IndexLevel::Layer(layer) => {
543                let idx = Index::build_layer(store, *layer)?;
544                let p = store.root.join(layer_dir_name(*layer)).join("index.md");
545                if idx.child_counts.is_empty() {
546                    remove_if_exists(&p)?;
547                } else {
548                    write_atomic(&p, render_layer_md_with_store(store, &idx))?;
549                }
550                write_layer_jsonl(store, *layer, &idx.records)?;
551            }
552            IndexLevel::Root => {
553                let idx = Index::build_root(store)?;
554                let p = store.root.join("index.md");
555                if idx.child_counts.is_empty() {
556                    remove_if_exists(&p)?;
557                } else {
558                    write_atomic(&p, render_root_md_with_store(store, &idx))?;
559                }
560            }
561        }
562        Ok(())
563    }
564
565    /// Render the generated indexes to a string with `--- <path> ---`
566    /// separators instead of writing them (`--dry-run`).
567    pub fn render_dry_run(store: &Store, level: &IndexLevel) -> crate::Result<String> {
568        let mut out = String::new();
569        match level {
570            IndexLevel::TypeFolder(folder) => {
571                let idx = Index::build_type_folder(store, folder)?;
572                let md_path = path_to_unix(&folder.join("index.md"));
573                let jsonl_path = path_to_unix(&folder.join("index.jsonl"));
574                out.push_str(&format!("--- {md_path} ---\n"));
575                out.push_str(&idx.to_markdown());
576                out.push_str(&format!("--- {jsonl_path} ---\n"));
577                out.push_str(&idx.to_jsonl());
578            }
579            IndexLevel::Layer(layer) => {
580                let idx = Index::build_layer(store, *layer)?;
581                let md_path = format!("{}/index.md", layer_dir_name(*layer));
582                out.push_str(&format!("--- {md_path} ---\n"));
583                out.push_str(&render_layer_md_with_store(store, &idx));
584            }
585            IndexLevel::Root => {
586                let idx = Index::build_root(store)?;
587                out.push_str("--- index.md ---\n");
588                out.push_str(&render_root_md_with_store(store, &idx));
589            }
590        }
591        Ok(out)
592    }
593
594    /// Cleanup pass (part of [`Index::rebuild_all`]): delete `index.md` /
595    /// `index.jsonl` in non-canonical folders (date-shards that should carry
596    /// none). Symmetric with index creation.
597    ///
598    /// **Only deletes generated catalog artifacts, never user content.** Two
599    /// guards keep this from eating data:
600    /// - `min_depth(2)` so the walk starts *below* the type-folder root — the
601    ///   canonical `<type-folder>/index.md` + `index.jsonl` are never targeted
602    ///   here (they are rewritten by the per-folder builders, or removed only
603    ///   when the folder is genuinely empty, in the dedicated branch below). The
604    ///   old `min_depth(1)` deleted them up front, so a rebuild aborted by one
605    ///   malformed file left every type-folder catalog destroyed.
606    /// - [`is_deletable_catalog_artifact`] confirms a shard-level `index.md` is
607    ///   an actual generated catalog (or stale/garbage leftover), NOT a content
608    ///   file a user wrote at that name (e.g. `dbmd write …/index.md --type
609    ///   email`, plausible when mirroring a website/doc export). Matching by
610    ///   filename alone silently deleted such records on the next rebuild.
611    pub fn cleanup(store: &Store) -> crate::Result<()> {
612        for layer in Layer::all() {
613            let layer_dir = store.root.join(layer_dir_name(layer));
614            if !layer_dir.is_dir() {
615                continue;
616            }
617            for tf in type_folders_in_layer(store, layer) {
618                let tf_abs = store.root.join(&tf);
619                // Any generated index inside a shard (below the type-folder
620                // root) is non-canonical: delete it. Never touch a user content
621                // file that merely happens to be named index.md.
622                for entry in walkdir::WalkDir::new(&tf_abs)
623                    .min_depth(2)
624                    .into_iter()
625                    .filter_map(|e| e.ok())
626                {
627                    let p = entry.path();
628                    if is_index_artifact(p) && is_deletable_catalog_artifact(p) {
629                        remove_if_exists(p)?;
630                    }
631                }
632                // Empty type-folder → no index at its root either. Same content
633                // guard: an `index.md` here that is actually a user record (the
634                // only file in the folder) is preserved, not deleted.
635                if walk_type_folder_files(&tf_abs).is_empty() {
636                    let md = tf_abs.join("index.md");
637                    if is_deletable_catalog_artifact(&md) {
638                        remove_if_exists(&md)?;
639                    }
640                    remove_if_exists(&tf_abs.join("index.jsonl"))?;
641                }
642            }
643        }
644        Ok(())
645    }
646}
647
648// ─────────────────────────────────────────────────────────────────────────
649// Private free helpers — all self-contained, none call back into Store/parser.
650// ─────────────────────────────────────────────────────────────────────────
651
652/// Write both artifacts for a type-folder, or delete them if the folder is now
653/// empty. The single funnel both write-through and rebuild go through, so their
654/// output is byte-identical by construction.
655fn write_type_folder_artifacts(
656    store: &Store,
657    folder: &Path,
658    records: &[IndexRecord],
659) -> crate::Result<()> {
660    let folder_abs = store.root.join(folder);
661    let md_path = folder_abs.join("index.md");
662    let jsonl_path = folder_abs.join("index.jsonl");
663    if records.is_empty() {
664        remove_if_exists(&md_path)?;
665        remove_if_exists(&jsonl_path)?;
666        return Ok(());
667    }
668    let idx = Index {
669        level: IndexLevel::TypeFolder(folder.to_path_buf()),
670        records: records.to_vec(),
671        child_counts: BTreeMap::new(),
672    };
673    write_atomic(&md_path, idx.to_markdown())?;
674    write_atomic(&jsonl_path, idx.to_jsonl())?;
675    Ok(())
676}
677
678/// Re-render the layer + root rollups that sit above `folder` — the
679/// **loop path**, O(changed). Counts + previews come from the type-folders'
680/// on-disk `index.jsonl` sidecars ([`collect_child_stats`]), NOT from a
681/// content-tree walk: a single write reads one sidecar per type-folder (shared
682/// across the layer and root rollups) — never the millions of files under the
683/// shards. `build_layer` / `build_root` (which *do* walk the content tree) are
684/// reserved for the from-scratch sweeps ([`Index::rebuild_all`],
685/// [`Index::write_level`], [`Index::render_dry_run`]). The result is
686/// byte-identical to those builders because in the loop — exactly as in
687/// `rebuild_all` — every touched folder's jsonl is rewritten before its parents
688/// are rolled up, so the per-folder stat (`count` / `newest`) equals what a
689/// from-scratch walk would compute.
690fn update_parents(store: &Store, folder: &Path) -> crate::Result<()> {
691    // Read every type-folder's sidecar EXACTLY ONCE into a stat cache (`count` +
692    // `newest` record), then render both rollups from the cache. This removed the
693    // old 2–3×-per-write reparse (`child_counts_from_jsonl` for a count, plus
694    // `render_layer_md_with_store` / `render_root_md_with_store` each doing a full
695    // `read_jsonl_records` parse + sort just to take `.first()`); the output stays
696    // byte-identical (`count` == `read_jsonl_records().len()`, `newest` == its
697    // `.first()`).
698    //
699    // COST, stated honestly: this is `O(total catalogued records)` per write, NOT
700    // `O(changed)`. `collect_child_stats` reads and line-parses EVERY type-folder
701    // sidecar in the store to recompute the rollups, so a single high-volume
702    // folder (months of ingested emails) makes an unrelated tiny write scan that
703    // whole sidecar (a ~50× slowdown at ~200k records was measured). The crate's
704    // literal `Store::walk` guard holds — this reads `index.jsonl` sidecars, not
705    // the content tree — but the broader `O(changed)` complexity the loop path
706    // advertises is NOT met here. Restoring true `O(changed)` needs a persisted
707    // per-folder stat cache (or an in-place rollup patch for `on_write`); that is
708    // a deliberate change to the catalog hot path, tracked as a follow-up, not
709    // done inline. Until then, do not describe this op as `O(changed)`.
710    let stats = collect_child_stats(store, &Layer::all())?;
711
712    let layer = folder
713        .components()
714        .next()
715        .and_then(|c| c.as_os_str().to_str())
716        .and_then(layer_from_dir_name);
717    if let Some(layer) = layer {
718        let p = store.root.join(layer_dir_name(layer)).join("index.md");
719        if layer_has_children(&stats, layer) {
720            write_atomic(
721                &p,
722                render_layer_md_from_stats(layer, &stats, &store.config.folders),
723            )?;
724        } else {
725            remove_if_exists(&p)?;
726        }
727    }
728    let rp = store.root.join("index.md");
729    if stats.values().any(|s| s.count > 0) {
730        write_atomic(
731            &rp,
732            render_root_md_from_stats(&stats, &store.config.folders),
733        )?;
734    } else {
735        remove_if_exists(&rp)?;
736    }
737    Ok(())
738}
739
740/// True if `layer` has at least one non-empty child type-folder in `stats`.
741fn layer_has_children(stats: &BTreeMap<PathBuf, FolderStat>, layer: Layer) -> bool {
742    let prefix = format!("{}/", layer_dir_name(layer));
743    stats
744        .iter()
745        .any(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
746}
747
748/// Render a layer `index.md` from the prebuilt per-folder stat cache — each
749/// child's count + newest summary/updated come from its single cached sidecar
750/// read, so the rollup matches the folder artifacts exactly (write-through and
751/// rebuild alike) without re-reading any sidecar.
752fn render_layer_md_from_stats(
753    layer: Layer,
754    stats: &BTreeMap<PathBuf, FolderStat>,
755    folders: &BTreeMap<String, FolderMeta>,
756) -> String {
757    let layer_dir = layer_dir_name(layer);
758    let prefix = format!("{layer_dir}/");
759    let mut max_upd: Option<DateTime<FixedOffset>> = None;
760    let mut entries = String::new();
761    for (tf, stat) in stats {
762        if stat.count == 0 || !path_to_unix(tf).starts_with(&prefix) {
763            continue;
764        }
765        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
766            max_upd = Some(match max_upd {
767                Some(cur) if cur >= u => cur,
768                _ => u,
769            });
770        }
771        let tf_unix = path_to_unix(tf);
772        let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
773        entries.push_str(&folder_entry(&tf_unix, &display, stat.count, description));
774    }
775    let mut s = String::new();
776    s.push_str("---\n");
777    s.push_str("type: index\n");
778    s.push_str("scope: layer\n");
779    s.push_str(&format!("folder: {layer_dir}\n"));
780    if let Some(ts) = max_upd {
781        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
782    }
783    s.push_str("---\n\n");
784    s.push_str(&format!("# {layer_dir}\n\n"));
785    s.push_str(&entries);
786    s
787}
788
789/// Render the root `index.md` from the prebuilt per-folder stat cache.
790fn render_root_md_from_stats(
791    stats: &BTreeMap<PathBuf, FolderStat>,
792    folders: &BTreeMap<String, FolderMeta>,
793) -> String {
794    let mut max_upd: Option<DateTime<FixedOffset>> = None;
795    for stat in stats.values() {
796        if stat.count == 0 {
797            continue;
798        }
799        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
800            max_upd = Some(match max_upd {
801                Some(cur) if cur >= u => cur,
802                _ => u,
803            });
804        }
805    }
806    let mut s = String::new();
807    s.push_str("---\n");
808    s.push_str("type: index\n");
809    s.push_str("scope: root\n");
810    if let Some(ts) = max_upd {
811        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
812    }
813    s.push_str("---\n\n");
814    s.push_str(&format!("# {ROOT_TITLE}\n"));
815    for layer in Layer::all() {
816        let layer_dir = layer_dir_name(layer);
817        let prefix = format!("{layer_dir}/");
818        let children: Vec<(&PathBuf, usize)> = stats
819            .iter()
820            .filter(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
821            .map(|(tf, s)| (tf, s.count))
822            .collect();
823        if children.is_empty() {
824            continue;
825        }
826        let total: usize = children.iter().map(|(_, n)| *n).sum();
827        s.push('\n');
828        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
829        for (tf, n) in children {
830            let tf_unix = path_to_unix(tf);
831            let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
832            s.push_str(&folder_entry(&tf_unix, &display, n, description));
833        }
834    }
835    s
836}
837
838/// Render a layer `index.md`, reading each child's newest summary + max-updated
839/// straight from its on-disk `index.jsonl` (so the rollup matches the folder
840/// artifacts exactly, write-through and rebuild alike). The **sweep-path**
841/// renderer used by [`Index::rebuild_all`] / [`Index::write_level`] /
842/// [`Index::render_dry_run`]; the loop path uses the cache-based
843/// [`render_layer_md_from_stats`] to avoid re-reading sidecars.
844fn render_layer_md_with_store(store: &Store, idx: &Index) -> String {
845    let layer = match idx.level {
846        IndexLevel::Layer(l) => l,
847        _ => unreachable!("render_layer_md_with_store called on non-layer"),
848    };
849    let layer_dir = layer_dir_name(layer);
850    let mut max_upd: Option<DateTime<FixedOffset>> = None;
851    let mut entries = String::new();
852    for (tf, n) in &idx.child_counts {
853        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
854        let newest = recs.first();
855        if let Some(u) = newest.and_then(|r| r.updated) {
856            max_upd = Some(match max_upd {
857                Some(cur) if cur >= u => cur,
858                _ => u,
859            });
860        }
861        let tf_unix = path_to_unix(tf);
862        let (display, description) =
863            folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
864        entries.push_str(&folder_entry(&tf_unix, &display, *n, description));
865    }
866    let mut s = String::new();
867    s.push_str("---\n");
868    s.push_str("type: index\n");
869    s.push_str("scope: layer\n");
870    s.push_str(&format!("folder: {layer_dir}\n"));
871    if let Some(ts) = max_upd {
872        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
873    }
874    s.push_str("---\n\n");
875    s.push_str(&format!("# {layer_dir}\n\n"));
876    s.push_str(&entries);
877    s
878}
879
880/// Render the root `index.md`, taking each child's max-updated from its on-disk
881/// `index.jsonl`. The **sweep-path** renderer (the loop path uses
882/// [`render_root_md_from_stats`]).
883fn render_root_md_with_store(store: &Store, idx: &Index) -> String {
884    let mut max_upd: Option<DateTime<FixedOffset>> = None;
885    for tf in idx.child_counts.keys() {
886        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
887        if let Some(u) = recs.first().and_then(|r| r.updated) {
888            max_upd = Some(match max_upd {
889                Some(cur) if cur >= u => cur,
890                _ => u,
891            });
892        }
893    }
894    let mut s = String::new();
895    s.push_str("---\n");
896    s.push_str("type: index\n");
897    s.push_str("scope: root\n");
898    if let Some(ts) = max_upd {
899        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
900    }
901    s.push_str("---\n\n");
902    s.push_str(&format!("# {ROOT_TITLE}\n"));
903    for layer in Layer::all() {
904        let layer_dir = layer_dir_name(layer);
905        let prefix = format!("{layer_dir}/");
906        let children: Vec<(&PathBuf, &usize)> = idx
907            .child_counts
908            .iter()
909            .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
910            .collect();
911        if children.is_empty() {
912            continue;
913        }
914        let total: usize = children.iter().map(|(_, n)| **n).sum();
915        s.push('\n');
916        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
917        for (tf, n) in children {
918            let tf_unix = path_to_unix(tf);
919            let (display, description) =
920                folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
921            s.push_str(&folder_entry(&tf_unix, &display, *n, description));
922        }
923    }
924    s
925}
926
927/// One `index.md` browse line: `- [[path]] — summary  ·  #tag #tag` (the
928/// `  ·  #…` suffix omitted when the file has no tags). The wiki-link target is
929/// the canonical **bare** store-relative path (no `.md` extension — the
930/// doctrine the writers emit and `validate` enforces via
931/// `WIKI_LINK_HAS_EXTENSION`); the jsonl `path` keeps the real on-disk name.
932fn format_md_entry(rec: &IndexRecord) -> String {
933    let path = wiki_target(&rec.path);
934    // Collapse the summary to a single line before interpolating it into the
935    // one-line browse entry. A hand-written file may legally carry a YAML block
936    // scalar (`summary: |-`) whose value spans multiple lines; rendered verbatim
937    // those embedded newlines break the line-oriented `index.md` format and can
938    // forge a standalone catalog entry (`\n- [[…|Click me]] — injected`). The
939    // CLI writers already collapse whitespace; do the same here so the spec's
940    // primary write path (agents writing files directly) can't corrupt the
941    // catalog.
942    let summary = collapse_whitespace(&rec.summary);
943    let mut line = format!("- [[{path}]] — {summary}");
944    if !rec.tags.is_empty() {
945        let tags = rec
946            .tags
947            .iter()
948            .map(|t| format!("#{t}"))
949            .collect::<Vec<_>>()
950            .join(" ");
951        line.push_str(&format!("  ·  {tags}"));
952    }
953    line
954}
955
956/// The deterministic `## More` footer for an over-cap type-folder.
957fn more_footer(total: usize, type_: &str, layer: &str) -> String {
958    format!(
959        "## More\n\nThis folder has {total} files. The {MD_CAP} most recent are listed above.\nUse `dbmd index query --type {type_} --in {layer}` for the complete catalog.\n"
960    )
961}
962
963/// Canonical total order: `updated` descending (None sorts last), ties broken
964/// by store-relative path ascending. A *total* order, so write-through and
965/// rebuild never disagree on #500 vs #501.
966fn sort_records(records: &mut [IndexRecord]) {
967    records.sort_by(record_recency_cmp);
968}
969
970impl IndexRecord {
971    /// Build the [`IndexRecord`] a freshly-rebuilt `index.jsonl` *should* hold
972    /// for the file at `abs` (catalogued under store-relative `rel`).
973    ///
974    /// This is the single canonical projection from frontmatter → sidecar
975    /// record: [`Index::build_type_folder`] uses the same path to write the
976    /// jsonl, so the validator can rebuild the expected record here and compare
977    /// it field-for-field against the committed line — covering **every**
978    /// queryable/dedup field the query path reads (`summary`, `type`, `tags`,
979    /// `links`, `created`, `updated`, and every type-specific `fields` entry
980    /// like `email` / `domain` / `company` / `amount` / `vendor`) without the
981    /// validator hand-rolling (and drifting from) the projection per field.
982    pub(crate) fn expected_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
983        record_from_file(abs, rel)
984    }
985}
986
987/// Build an [`IndexRecord`] from a file on disk. Missing `summary` →
988/// [`MISSING_SUMMARY`] placeholder (the index never invents a summary).
989fn record_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
990    let mut meta = read_frontmatter(abs)?;
991    // Records carry an effective `meta-type` in the catalog: the declared value
992    // (already spilled into `fields` by `read_frontmatter`), or the default
993    // `fact` when absent — so `--where meta-type=fact` sees un-annotated records.
994    // Sources are evidence and carry no meta-type.
995    if rel.starts_with("records") {
996        meta.fields
997            .entry("meta-type".to_string())
998            .or_insert_with(|| Value::String("fact".to_string()));
999    }
1000    Ok(IndexRecord {
1001        path: rel,
1002        type_: meta.type_.unwrap_or_default(),
1003        summary: meta.summary.unwrap_or_else(|| MISSING_SUMMARY.to_string()),
1004        tags: meta.tags,
1005        links: meta.links,
1006        created: meta.created,
1007        updated: meta.updated,
1008        fields: meta.fields,
1009    })
1010}
1011
1012/// The slice of a frontmatter this module needs.
1013struct FileMeta {
1014    type_: Option<String>,
1015    summary: Option<String>,
1016    tags: Vec<String>,
1017    links: Vec<String>,
1018    created: Option<DateTime<FixedOffset>>,
1019    updated: Option<DateTime<FixedOffset>>,
1020    fields: BTreeMap<String, Value>,
1021}
1022
1023/// Minimal frontmatter read: split the leading `---`…`---` block and parse it
1024/// as YAML, extracting the typed fields and spilling the rest into `fields`.
1025/// Self-contained (does not route through the `parser` module).
1026///
1027/// **Body bytes are never required to be UTF-8.** `sources/` is "preserved
1028/// verbatim" per the SPEC and routinely carries non-UTF-8 imports (Latin-1
1029/// emails dropped in by `rsync`/`mbsync`/`cp`); the body can hold any byte. We
1030/// read the file as raw bytes and lossily decode *only* the leading frontmatter
1031/// region, so a stray non-UTF-8 byte in the body can never abort the projection
1032/// (the old `fs::read_to_string` failed on the first such byte anywhere in the
1033/// file, taking a whole `rebuild_all` / write-through down with it). The
1034/// frontmatter itself is expected to be UTF-8; if it isn't, `U+FFFD` markers
1035/// surface in the parsed values rather than a hard abort.
1036fn read_frontmatter(abs: &Path) -> crate::Result<FileMeta> {
1037    let bytes = fs::read(abs)?;
1038    let yaml = extract_frontmatter_block_lossy(&bytes).unwrap_or_default();
1039    let map: serde_norway::Mapping = if yaml.trim().is_empty() {
1040        serde_norway::Mapping::new()
1041    } else {
1042        serde_norway::from_str(&yaml).map_err(|e| {
1043            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1044                path: abs.to_path_buf(),
1045                message: format!("frontmatter YAML: {e}"),
1046            })
1047        })?
1048    };
1049
1050    let mut type_ = None;
1051    let mut summary = None;
1052    let mut tags = Vec::new();
1053    let mut links = Vec::new();
1054    let mut created = None;
1055    let mut updated = None;
1056    let mut fields = BTreeMap::new();
1057
1058    for (k, v) in map {
1059        let key = match k.as_str() {
1060            Some(s) => s.to_string(),
1061            None => continue,
1062        };
1063        match key.as_str() {
1064            // `type` and `summary` are coerced with the SAME scalar rule the
1065            // validator applies (`validate::scalar_string`: String/Number/Bool →
1066            // string). A bare `v.as_str()` returns `None` for an unquoted numeric
1067            // or boolean scalar (`summary: 2026`, `type: true`), so the index
1068            // would write the `(no summary)` / empty-type placeholder while
1069            // `dbmd validate` reads the file as HAVING that summary/type —
1070            // yielding a permanently-unfixable `INDEX_SUMMARY_MISMATCH` (every
1071            // rebuild reproduces the same mismatched placeholder). Coercing here
1072            // keeps the writer and the validator byte-for-byte in agreement.
1073            "type" => type_ = scalar_string(&v),
1074            "summary" => summary = scalar_string(&v),
1075            "tags" => tags = yaml_string_list(&v),
1076            "links" => links = yaml_string_list(&v),
1077            "created" => created = v.as_str().and_then(parse_ts),
1078            "updated" => updated = v.as_str().and_then(parse_ts),
1079            // `path`, `type`, `summary`, `tags`, `links`, `created`, `updated`
1080            // are the reserved IndexRecord keys; everything else (including
1081            // `id`, `status`, type-specific fields) goes to `fields`.
1082            "path" => {}
1083            _ => {
1084                fields.insert(key, yaml_to_json_value(&v));
1085            }
1086        }
1087    }
1088
1089    Ok(FileMeta {
1090        type_,
1091        summary,
1092        tags,
1093        links,
1094        created,
1095        updated,
1096        fields,
1097    })
1098}
1099
1100/// A YAML scalar (`String`/`Number`/`Bool`) rendered as a string; `None` for
1101/// sequences/mappings/null. **Must stay identical to `validate::scalar_string`**
1102/// so the index writer and the validator coerce `type`/`summary` the same way
1103/// (see [`read_frontmatter`]); an unquoted `summary: 2026` becomes `"2026"` in
1104/// both, not a placeholder here and a real value there.
1105fn scalar_string(v: &serde_norway::Value) -> Option<String> {
1106    match v {
1107        serde_norway::Value::String(s) => Some(s.clone()),
1108        serde_norway::Value::Number(n) => Some(n.to_string()),
1109        serde_norway::Value::Bool(b) => Some(b.to_string()),
1110        _ => None,
1111    }
1112}
1113
1114/// Lossily decode the leading frontmatter region of a file given its raw bytes,
1115/// then pull the YAML between the opening `---` and the next `---`. Only the
1116/// frontmatter region needs to be valid UTF-8 in practice; the body may carry
1117/// arbitrary bytes (a verbatim `sources/` import). Returns `None` when the file
1118/// has no frontmatter fence at its very start.
1119fn extract_frontmatter_block_lossy(bytes: &[u8]) -> Option<String> {
1120    // Decode lossily so a non-UTF-8 body byte never aborts the read. The
1121    // frontmatter is at the very start of the file, so a lossy whole-file decode
1122    // is correct for extracting it (and cheap relative to the YAML parse). A
1123    // leading UTF-8 BOM is stripped by `extract_frontmatter_block`.
1124    let text = String::from_utf8_lossy(bytes);
1125    extract_frontmatter_block(&text)
1126}
1127
1128/// Pull the YAML between a leading `---` line and the next `---` line. Returns
1129/// `None` when the file has no frontmatter fence at its very start.
1130fn extract_frontmatter_block(text: &str) -> Option<String> {
1131    let trimmed = text.strip_prefix('\u{feff}').unwrap_or(text);
1132    let mut lines = trimmed.lines();
1133    let first = lines.next()?;
1134    if first.trim_end() != "---" {
1135        return None;
1136    }
1137    let mut block = String::new();
1138    for line in lines {
1139        if line.trim_end() == "---" {
1140            return Some(block);
1141        }
1142        block.push_str(line);
1143        block.push('\n');
1144    }
1145    None // no closing fence
1146}
1147
1148/// Read a string scalar or a sequence-of-string-scalars into a `Vec<String>`.
1149/// Wiki-link items keep their `[[…]]` form verbatim.
1150fn yaml_string_list(v: &serde_norway::Value) -> Vec<String> {
1151    match v {
1152        serde_norway::Value::String(s) => vec![s.clone()],
1153        serde_norway::Value::Sequence(seq) => seq
1154            .iter()
1155            .filter_map(yaml_string_or_wiki_link_literal)
1156            .collect(),
1157        _ => Vec::new(),
1158    }
1159}
1160
1161fn yaml_string_or_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1162    v.as_str()
1163        .map(str::to_string)
1164        .or_else(|| unquoted_wiki_link_literal(v))
1165}
1166
1167fn yaml_to_json_value(v: &serde_norway::Value) -> Value {
1168    if let Some(link) = unquoted_wiki_link_literal(v) {
1169        return Value::String(link);
1170    }
1171    match v {
1172        serde_norway::Value::String(s) => Value::String(s.clone()),
1173        serde_norway::Value::Bool(b) => Value::Bool(*b),
1174        serde_norway::Value::Number(n) => {
1175            serde_json::to_value(n).unwrap_or_else(|_| Value::String(n.to_string()))
1176        }
1177        serde_norway::Value::Sequence(seq) => {
1178            Value::Array(seq.iter().map(yaml_to_json_value).collect())
1179        }
1180        serde_norway::Value::Mapping(_) | serde_norway::Value::Tagged(_) => {
1181            serde_json::to_value(v).unwrap_or(Value::Null)
1182        }
1183        serde_norway::Value::Null => Value::Null,
1184    }
1185}
1186
1187fn unquoted_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1188    let serde_norway::Value::Sequence(outer) = v else {
1189        return None;
1190    };
1191    if outer.len() != 1 {
1192        return None;
1193    }
1194    let serde_norway::Value::Sequence(inner) = &outer[0] else {
1195        return None;
1196    };
1197    let [serde_norway::Value::String(target)] = inner.as_slice() else {
1198        return None;
1199    };
1200    Some(format!("[[{target}]]"))
1201}
1202
1203/// Parse an RFC3339 timestamp scalar.
1204fn parse_ts(s: &str) -> Option<DateTime<FixedOffset>> {
1205    DateTime::parse_from_rfc3339(s.trim()).ok()
1206}
1207
1208/// Render a timestamp the same way `serde_json` renders an `IndexRecord`
1209/// timestamp (RFC3339, `Z` for UTC, sub-seconds preserved) so the md
1210/// frontmatter and the jsonl agree byte-for-byte.
1211fn fmt_ts(ts: &DateTime<FixedOffset>) -> String {
1212    ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1213}
1214
1215/// Max `updated` over an iterator of optional timestamps.
1216fn max_updated<'a>(
1217    it: impl Iterator<Item = Option<&'a DateTime<FixedOffset>>>,
1218) -> Option<DateTime<FixedOffset>> {
1219    let mut best: Option<DateTime<FixedOffset>> = None;
1220    for ts in it.flatten() {
1221        best = Some(match best {
1222            Some(cur) if cur >= *ts => cur,
1223            _ => *ts,
1224        });
1225    }
1226    best
1227}
1228
1229/// Read a type-folder's `index.jsonl` into records, applying last-write-wins by
1230/// `path` over any un-compacted lines (so a half-compacted jsonl still reads
1231/// cleanly). Missing file → empty set. Returns records in canonical order.
1232fn read_jsonl_records(jsonl: &Path) -> crate::Result<Vec<IndexRecord>> {
1233    let text = match fs::read_to_string(jsonl) {
1234        Ok(t) => t,
1235        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1236        Err(e) => return Err(e.into()),
1237    };
1238    // Last-write-wins by path; preserve only the final occurrence.
1239    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1240    for (i, line) in text.lines().enumerate() {
1241        if line.trim().is_empty() {
1242            continue;
1243        }
1244        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1245            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1246                path: jsonl.to_path_buf(),
1247                message: format!("line {}: {e}", i + 1),
1248            })
1249        })?;
1250        by_path.insert(rec.path.clone(), rec);
1251    }
1252    let mut records: Vec<IndexRecord> = by_path.into_values().collect();
1253    sort_records(&mut records);
1254    Ok(records)
1255}
1256
1257/// The minimal rollup stat a parent index needs from one type-folder's
1258/// `index.jsonl`: how many distinct files it catalogs (`count`) and the single
1259/// newest record (`newest`, the recency-sorted `.first()` — its `updated` feeds
1260/// the parent's derived `updated`, its `summary` the layer preview). Holding the
1261/// newest record alone, rather than the whole sidecar, is what keeps a rollup
1262/// recompute cheap regardless of how large the sidecar grows.
1263#[derive(Debug, Clone, Default, PartialEq)]
1264struct FolderStat {
1265    count: usize,
1266    newest: Option<IndexRecord>,
1267}
1268
1269/// Read a type-folder's `index.jsonl` ONCE and reduce it to a [`FolderStat`]:
1270/// distinct-`path` count (last-write-wins) plus the recency-newest record. A
1271/// missing sidecar is the default (`count: 0`, `newest: None`). This is the
1272/// **loop-path** rollup primitive — one streaming pass per sidecar, never the
1273/// content tree and never the 2–3× full reparse the old
1274/// `jsonl_record_count` + `read_jsonl_records` pair did. `count` is
1275/// byte-identical to [`read_jsonl_records`]`.len()` and `newest` to its
1276/// `.first()`, so a rollup built from these stats matches the from-scratch
1277/// builders byte-for-byte.
1278fn read_folder_stat(jsonl: &Path) -> crate::Result<FolderStat> {
1279    let text = match fs::read_to_string(jsonl) {
1280        Ok(t) => t,
1281        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(FolderStat::default()),
1282        Err(e) => return Err(e.into()),
1283    };
1284    // Last-write-wins by path, exactly like `read_jsonl_records`, so count and
1285    // newest are computed over the same compacted record set.
1286    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1287    for (i, line) in text.lines().enumerate() {
1288        if line.trim().is_empty() {
1289            continue;
1290        }
1291        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1292            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1293                path: jsonl.to_path_buf(),
1294                message: format!("line {}: {e}", i + 1),
1295            })
1296        })?;
1297        by_path.insert(rec.path.clone(), rec);
1298    }
1299    let count = by_path.len();
1300    // The newest record is the minimum under `sort_records`' order (updated
1301    // desc, None last, ties by path asc) — i.e. what `.first()` returns. Find it
1302    // with a single min-scan instead of sorting the whole set.
1303    let newest = by_path.into_values().min_by(record_recency_cmp);
1304    Ok(FolderStat { count, newest })
1305}
1306
1307/// The total order [`sort_records`] imposes, as a comparator over two records:
1308/// `updated` descending (None last), ties broken by store-relative path
1309/// ascending. Kept in one place so `read_folder_stat`'s min-scan agrees with the
1310/// sort byte-for-byte on which record is "newest".
1311fn record_recency_cmp(a: &IndexRecord, b: &IndexRecord) -> std::cmp::Ordering {
1312    match (b.updated, a.updated) {
1313        (Some(bu), Some(au)) => bu.cmp(&au),
1314        (Some(_), None) => std::cmp::Ordering::Greater, // a is None → after b
1315        (None, Some(_)) => std::cmp::Ordering::Less,    // b is None → after a
1316        (None, None) => std::cmp::Ordering::Equal,
1317    }
1318    .then_with(|| a.path.cmp(&b.path))
1319}
1320
1321/// Per-child rollup stats for `layers`, read from each type-folder's on-disk
1322/// `index.jsonl` (one [`read_folder_stat`] pass each) rather than walked from the
1323/// content tree. The **loop-path** counterpart to the from-scratch counting in
1324/// [`Index::build_layer`] / [`Index::build_root`], reusing one read per sidecar
1325/// across BOTH the layer and root rollups. Empty folders (`count == 0`) are kept
1326/// out of the map.
1327///
1328/// NOTE on cost: this performs one read per type-folder, but each read line-parses
1329/// that folder's entire `index.jsonl`, so the total is `O(total catalogued
1330/// records)`, not `O(type-folders)` — it reads the whole catalog every call. It
1331/// avoids the content-tree walk ([`Store::walk`]), but it is NOT `O(changed)`. See
1332/// [`update_parents`] for the honest bound and the follow-up to fix it.
1333fn collect_child_stats(
1334    store: &Store,
1335    layers: &[Layer],
1336) -> crate::Result<BTreeMap<PathBuf, FolderStat>> {
1337    let mut stats = BTreeMap::new();
1338    for &layer in layers {
1339        for tf in type_folders_in_layer(store, layer) {
1340            let stat = read_folder_stat(&store.root.join(&tf).join("index.jsonl"))?;
1341            if stat.count > 0 {
1342                stats.insert(tf, stat);
1343            }
1344        }
1345    }
1346    Ok(stats)
1347}
1348
1349/// Walk a type-folder's `.md` content files, recursing through date-shards,
1350/// excluding the `index.md` artifact itself and any hidden entries.
1351fn walk_type_folder_files(folder_abs: &Path) -> Vec<PathBuf> {
1352    let mut out = Vec::new();
1353    if !folder_abs.is_dir() {
1354        return out;
1355    }
1356    for entry in walkdir::WalkDir::new(folder_abs)
1357        .into_iter()
1358        .filter_entry(|e| !is_hidden(e.file_name()))
1359        .filter_map(|e| e.ok())
1360    {
1361        if !entry.file_type().is_file() {
1362            continue;
1363        }
1364        let p = entry.path();
1365        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1366            continue;
1367        }
1368        if p.file_name().and_then(|n| n.to_str()) == Some("index.md") {
1369            continue;
1370        }
1371        out.push(p.to_path_buf());
1372    }
1373    out
1374}
1375
1376/// The immediate type-folders under a layer (one directory level below the
1377/// layer dir), as store-relative paths. Hidden dirs and `log/` are skipped.
1378fn type_folders_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1379    let layer_dir = store.root.join(layer_dir_name(layer));
1380    let mut out = Vec::new();
1381    let rd = match fs::read_dir(&layer_dir) {
1382        Ok(rd) => rd,
1383        Err(_) => return out,
1384    };
1385    for entry in rd.flatten() {
1386        if !entry.path().is_dir() {
1387            continue;
1388        }
1389        let name = entry.file_name();
1390        let name = match name.to_str() {
1391            Some(n) => n,
1392            None => continue,
1393        };
1394        if is_hidden(entry.file_name().as_os_str()) || name == "log" {
1395            continue;
1396        }
1397        out.push(PathBuf::from(layer_dir_name(layer)).join(name));
1398    }
1399    out.sort();
1400    out
1401}
1402
1403/// The layer a *loose* content file sits directly in: `<layer>/<file>.md` with
1404/// no type-folder between them — exactly two path components, the first a known
1405/// layer. `None` for a file inside a type-folder (`<layer>/<type>/…`, the common
1406/// case) or one outside any layer. A loose file is catalogued in the layer's own
1407/// `index.jsonl`, not a type-folder's.
1408fn loose_layer_of(file_rel: &Path) -> Option<Layer> {
1409    let mut comps = file_rel.components();
1410    let layer = layer_from_dir_name(comps.next()?.as_os_str().to_str()?)?;
1411    comps.next()?; // the file segment must exist…
1412    if comps.next().is_some() {
1413        return None; // …and be the last one (else it's inside a type-folder)
1414    }
1415    Some(layer)
1416}
1417
1418/// The `.md` content files that live directly at a layer root (loose files),
1419/// excluding `index.md` and any subdirectory (type-folders are walked
1420/// separately). Non-recursive: only the layer's immediate children.
1421fn loose_files_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1422    let layer_dir = store.root.join(layer_dir_name(layer));
1423    let mut out = Vec::new();
1424    let rd = match fs::read_dir(&layer_dir) {
1425        Ok(rd) => rd,
1426        Err(_) => return out,
1427    };
1428    for entry in rd.flatten() {
1429        let p = entry.path();
1430        if !p.is_file() {
1431            continue;
1432        }
1433        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1434            continue;
1435        }
1436        if is_index_artifact(&p) || is_hidden(entry.file_name().as_os_str()) {
1437            continue;
1438        }
1439        out.push(p);
1440    }
1441    out
1442}
1443
1444/// Write (or remove, when empty) a layer's own `index.jsonl` — the complete twin
1445/// for the loose files that live directly at the layer root. The single funnel
1446/// both write-through (`on_write`/`on_remove`/`on_rename`) and the sweeps
1447/// (`rebuild_all`/`write_level`) go through, so their output is byte-identical.
1448fn write_layer_jsonl(store: &Store, layer: Layer, records: &[IndexRecord]) -> crate::Result<()> {
1449    let path = store.root.join(layer_dir_name(layer)).join("index.jsonl");
1450    if records.is_empty() {
1451        remove_if_exists(&path)?;
1452        return Ok(());
1453    }
1454    let idx = Index {
1455        level: IndexLevel::Layer(layer),
1456        records: records.to_vec(),
1457        child_counts: BTreeMap::new(),
1458    };
1459    write_atomic(&path, idx.to_jsonl())
1460}
1461
1462/// Upsert (`removing` = false) or remove (`removing` = true) a loose file's row
1463/// in its layer `index.jsonl`, serialising the read-modify-write under a folder
1464/// lock (same discipline as the type-folder write-through). The layer `index.md`
1465/// rollup is untouched — loose files do not change type-folder counts.
1466fn apply_loose_change(
1467    store: &Store,
1468    layer: Layer,
1469    file_rel: &Path,
1470    removing: bool,
1471) -> crate::Result<()> {
1472    let layer_dir = store.root.join(layer_dir_name(layer));
1473    let _lock = FolderLock::acquire(&layer_dir);
1474    let jsonl = layer_dir.join("index.jsonl");
1475    let mut records = read_jsonl_records(&jsonl)?;
1476    records.retain(|r| r.path != file_rel);
1477    if !removing {
1478        records.push(record_from_file(
1479            &store.root.join(file_rel),
1480            file_rel.to_path_buf(),
1481        )?);
1482    }
1483    sort_records(&mut records);
1484    write_layer_jsonl(store, layer, &records)
1485}
1486
1487/// The type-folder a content file belongs to: `<layer>/<type>` (the first two
1488/// path components), or `None` if the path is not under a known layer with at
1489/// least a type segment.
1490fn type_folder_of(file_rel: &Path) -> Option<PathBuf> {
1491    let mut comps = file_rel.components();
1492    let layer = comps.next()?.as_os_str().to_str()?;
1493    layer_from_dir_name(layer)?;
1494    let type_seg = comps.next()?.as_os_str().to_str()?;
1495    Some(PathBuf::from(layer).join(type_seg))
1496}
1497
1498/// Convert an absolute path under `root` to a store-relative path.
1499fn rel_to_store(root: &Path, abs: &Path) -> Option<PathBuf> {
1500    abs.strip_prefix(root).ok().map(|p| p.to_path_buf())
1501}
1502
1503/// Normalize a possibly-absolute or `./`-prefixed path to a clean
1504/// store-relative form (drops a leading `./`; leaves already-relative paths).
1505fn normalize_rel(p: &Path) -> PathBuf {
1506    let s = path_to_unix(p);
1507    let s = s.strip_prefix("./").unwrap_or(&s);
1508    PathBuf::from(s)
1509}
1510
1511fn is_index_artifact(p: &Path) -> bool {
1512    matches!(
1513        p.file_name().and_then(|n| n.to_str()),
1514        Some("index.md") | Some("index.jsonl")
1515    )
1516}
1517
1518/// True when a file named `index.md` / `index.jsonl` is safe for [`Index::cleanup`]
1519/// to delete — i.e. it is a generated catalog artifact (or a stale/garbage
1520/// leftover from a previous build), NOT a user content file that merely happens
1521/// to be named `index.md`.
1522///
1523/// - `index.jsonl` is always a machine artifact (content files are `.md`), so it
1524///   is always deletable.
1525/// - `index.md` is deletable UNLESS it parses as a content file — frontmatter
1526///   whose `type` is some real record type (anything other than `index`). A
1527///   generated catalog carries `type: index`; a user record carries its own type
1528///   (`email`, `note`, …) and must be preserved (deleting it is silent,
1529///   unrecoverable data loss). A leftover with no/garbage frontmatter (e.g. a
1530///   bare `stale\n`) is treated as a deletable stale artifact.
1531fn is_deletable_catalog_artifact(p: &Path) -> bool {
1532    match p.file_name().and_then(|n| n.to_str()) {
1533        Some("index.jsonl") => true,
1534        Some("index.md") => match read_frontmatter(p) {
1535            // Real content file (non-`index` type) → preserve, never delete.
1536            Ok(meta) => meta.type_.as_deref().is_none_or(|t| t == "index"),
1537            // Unreadable / no frontmatter → a stale or garbage artifact, deletable.
1538            Err(_) => true,
1539        },
1540        _ => false,
1541    }
1542}
1543
1544fn is_hidden(name: &std::ffi::OsStr) -> bool {
1545    name.to_str().map(|s| s.starts_with('.')).unwrap_or(false)
1546}
1547
1548fn layer_dir_name(layer: Layer) -> &'static str {
1549    match layer {
1550        Layer::Sources => "sources",
1551        Layer::Records => "records",
1552    }
1553}
1554
1555/// Local layer-name parse. Mirrors the contract of [`Layer::from_dir_name`];
1556/// kept local to keep this module's walk self-contained (see the module header).
1557fn layer_from_dir_name(name: &str) -> Option<Layer> {
1558    match name {
1559        "sources" => Some(Layer::Sources),
1560        "records" => Some(Layer::Records),
1561        _ => None,
1562    }
1563}
1564
1565/// The final path component as a `&str` (folder basename).
1566fn folder_basename(p: &Path) -> &str {
1567    p.file_name().and_then(|n| n.to_str()).unwrap_or("")
1568}
1569
1570/// The canonical wiki-link target for a content path: the store-relative path
1571/// with `/` separators and the trailing `.md` stripped (the bare form the
1572/// `index.md` browse view links to).
1573fn wiki_target(p: &Path) -> String {
1574    let unix = path_to_unix(p);
1575    unix.strip_suffix(".md").unwrap_or(&unix).to_string()
1576}
1577
1578/// Render a path with `/` separators regardless of host OS, so artifacts are
1579/// identical on every platform.
1580///
1581/// A non-UTF-8 path component (reachable on Linux/ext4, db.md's primary
1582/// deployment target, where `sources/` files arrive verbatim from Latin-1
1583/// exports) is decoded **lossily** with `U+FFFD` markers rather than silently
1584/// dropped. The old `filter_map(|c| c.as_os_str().to_str())` dropped any bad
1585/// component entirely, so `sources/emails/caf\xe9.md` serialized as
1586/// `sources/emails` — a path pointing at the *directory*, not the file, that
1587/// also collapsed distinct files onto one `index.jsonl` key. Lossy decoding
1588/// keeps the leaf present and visibly marked.
1589fn path_to_unix(p: &Path) -> String {
1590    p.components()
1591        .map(|c| c.as_os_str().to_string_lossy().into_owned())
1592        .collect::<Vec<_>>()
1593        .join("/")
1594}
1595
1596/// Serde for [`IndexRecord::path`]: always forward-slash on the wire, so the
1597/// `index.jsonl` catalog is identical whether the store was written on POSIX or
1598/// Windows (a git clone across OSes yields the same paths, and the last-write-
1599/// wins upsert key never splits on separator style). On POSIX this matches the
1600/// default `PathBuf` serialization; on Windows it rewrites `\` to `/`.
1601mod path_serde {
1602    use super::path_to_unix;
1603    use serde::{Deserialize, Deserializer, Serializer};
1604    use std::path::{Path, PathBuf};
1605
1606    pub fn serialize<S: Serializer>(p: &Path, s: S) -> Result<S::Ok, S::Error> {
1607        s.serialize_str(&path_to_unix(p))
1608    }
1609
1610    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<PathBuf, D::Error> {
1611        Ok(PathBuf::from(String::deserialize(d)?))
1612    }
1613}
1614
1615/// ASCII-capitalize the first character.
1616fn capitalize(s: &str) -> String {
1617    let mut chars = s.chars();
1618    match chars.next() {
1619        Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
1620        None => String::new(),
1621    }
1622}
1623
1624/// Collapse all runs of whitespace (including newlines) into single spaces and
1625/// trim the ends — the single-line normalization the `index.md` browse entry
1626/// ([`format_md_entry`]) applies so a multi-line block-scalar summary can never
1627/// inject a newline into a catalog line.
1628fn collapse_whitespace(s: &str) -> String {
1629    s.split_whitespace().collect::<Vec<_>>().join(" ")
1630}
1631
1632/// Derive a folder's display name from its basename: separators (`-`, `_`)
1633/// become spaces and the first character is upper-cased (`hubspot-exports` →
1634/// `Hubspot exports`). A deterministic floor — the curator overrides it via
1635/// `DB.md ## Folders` (`records/x|HubSpot exports`) for casing the tool cannot
1636/// guess. The tool tidies a folder's *name*; it never infers its *meaning*.
1637fn default_display(basename: &str) -> String {
1638    let spaced: String = basename
1639        .chars()
1640        .map(|c| if c == '-' || c == '_' { ' ' } else { c })
1641        .collect();
1642    capitalize(&spaced)
1643}
1644
1645/// The display name + optional description a root/layer rollup shows for a child
1646/// type-folder: the curator's `## Folders` metadata when present, else the
1647/// derived display name and **no description**. This is the whole anti-"tool
1648/// invents the curator's judgment" contract for the rollups — a description is
1649/// surfaced only when the agent authored one; it is never composed from the
1650/// folder's newest member or any other content.
1651fn folder_label<'a>(
1652    tf_unix: &str,
1653    basename: &str,
1654    folders: &'a BTreeMap<String, FolderMeta>,
1655) -> (String, Option<&'a str>) {
1656    let meta = folders.get(tf_unix);
1657    let display = meta
1658        .and_then(|m| m.display.as_deref())
1659        .map(str::to_string)
1660        .unwrap_or_else(|| default_display(basename));
1661    (display, meta.and_then(|m| m.description.as_deref()))
1662}
1663
1664/// One root/layer rollup entry: `- [[<tf>/index|<Display>]] (<count>)` with an
1665/// ` — <description>` suffix only when the curator authored one.
1666fn folder_entry(tf_unix: &str, display: &str, count: usize, description: Option<&str>) -> String {
1667    match description {
1668        Some(d) => format!("- [[{tf_unix}/index|{display}]] ({count}) — {d}\n"),
1669        None => format!("- [[{tf_unix}/index|{display}]] ({count})\n"),
1670    }
1671}
1672
1673/// Atomic (rename-based) write for the **derived** catalog (`index.md` /
1674/// `index.jsonl`). Deliberately NOT `fsync`-durable like [`crate::fsx`]: the
1675/// index is rebuildable (`dbmd index rebuild`) and this is the O(changed)
1676/// write-through path, so a per-write `fsync` would be cost without benefit — a
1677/// crash-lost catalog write is recovered by a rebuild, not data loss. (Primary
1678/// data — content records, `log.md` — uses the durable `crate::fsx` path.)
1679fn write_atomic(path: &Path, contents: String) -> crate::Result<()> {
1680    if let Some(parent) = path.parent() {
1681        fs::create_dir_all(parent)?;
1682    }
1683    let dir = path.parent().unwrap_or_else(|| Path::new("."));
1684    let mut tmp = tempfile_in(dir)?;
1685    tmp.write_all(contents.as_bytes())?;
1686    tmp.flush()?;
1687    tmp.persist(path)?;
1688    Ok(())
1689}
1690
1691fn remove_if_exists(path: &Path) -> crate::Result<()> {
1692    match fs::remove_file(path) {
1693        Ok(()) => Ok(()),
1694        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1695        Err(e) => Err(e.into()),
1696    }
1697}
1698
1699fn bad_index(path: &Path, msg: &str) -> crate::Error {
1700    crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1701        path: path.to_path_buf(),
1702        message: msg.to_string(),
1703    })
1704}
1705
1706/// Per-type-folder advisory lock for the write-through sidecar read-modify-write.
1707///
1708/// The write-through update of a folder's `index.jsonl`/`index.md` is a
1709/// read-snapshot → modify → atomic-rename-over-whole-file sequence. The SPEC
1710/// sanctions many-writer concurrency for `records/` (`dbmd write` is
1711/// `create_new`-race-safe for the *content* file), but two concurrent writers to
1712/// the SAME type-folder would each read the same sidecar snapshot, add only their
1713/// own row, and rename their whole file over the other's — a classic lost update,
1714/// dropping most rows until a manual `dbmd index rebuild`. This lock serializes
1715/// the per-folder RMW (the content file is already serialized by `create_new`),
1716/// so concurrent sanctioned writes each see the other's row.
1717///
1718/// Implementation: a hidden `<type-folder>/.index.lock` acquired via `create_new`
1719/// (the same O_EXCL primitive `cmd/write.rs` uses), bounded-spin with a small
1720/// sleep, and stale-lock breaking by mtime age so a crashed writer can't wedge
1721/// the folder forever. The dotfile name keeps it out of the content walk
1722/// (`walk_type_folder_files` skips hidden) and out of `cleanup`
1723/// (`is_index_artifact` only matches `index.md`/`index.jsonl`). RAII: the lock is
1724/// released (file removed) on drop, including on the error paths.
1725struct FolderLock {
1726    path: PathBuf,
1727    held: bool,
1728}
1729
1730impl FolderLock {
1731    /// Acquire the lock for `folder_abs`. Spins (with a short sleep) up to a
1732    /// bounded number of attempts, breaking a lock older than the staleness
1733    /// window so a crash can't deadlock the folder. Best-effort: if the lock
1734    /// genuinely can't be taken (extremely rare contention), it proceeds
1735    /// unlocked rather than failing the write — degrading to the prior behavior
1736    /// instead of erroring a sanctioned operation.
1737    fn acquire(folder_abs: &Path) -> Self {
1738        use std::time::{Duration, SystemTime};
1739        const MAX_ATTEMPTS: u32 = 600; // ~6s at 10ms/attempt
1740        const SPIN: Duration = Duration::from_millis(10);
1741        const STALE_AFTER: Duration = Duration::from_secs(30);
1742
1743        let path = folder_abs.join(".index.lock");
1744        // Ensure the folder exists so the lockfile create can succeed.
1745        let _ = fs::create_dir_all(folder_abs);
1746        for _ in 0..MAX_ATTEMPTS {
1747            match fs::OpenOptions::new()
1748                .write(true)
1749                .create_new(true)
1750                .open(&path)
1751            {
1752                Ok(_) => {
1753                    return FolderLock { path, held: true };
1754                }
1755                Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
1756                    // Break a stale lock left by a crashed writer.
1757                    if let Ok(meta) = fs::metadata(&path) {
1758                        if let Ok(modified) = meta.modified() {
1759                            if SystemTime::now()
1760                                .duration_since(modified)
1761                                .map(|age| age > STALE_AFTER)
1762                                .unwrap_or(false)
1763                            {
1764                                let _ = fs::remove_file(&path);
1765                                continue;
1766                            }
1767                        }
1768                    }
1769                    std::thread::sleep(SPIN);
1770                }
1771                // Any other error (e.g. permissions): give up on locking and
1772                // proceed unlocked rather than failing the write.
1773                Err(_) => return FolderLock { path, held: false },
1774            }
1775        }
1776        // Contention budget exhausted: proceed unlocked (best-effort).
1777        FolderLock { path, held: false }
1778    }
1779}
1780
1781impl Drop for FolderLock {
1782    fn drop(&mut self) {
1783        if self.held {
1784            let _ = fs::remove_file(&self.path);
1785        }
1786    }
1787}
1788
1789/// Acquire the write-through lock for one or two type-folders. When `a == b`
1790/// (same-folder rename) only one lock is taken. For two distinct folders the
1791/// locks are always acquired in sorted order so a pair of concurrent renames
1792/// touching the same two folders can't deadlock by grabbing them in opposite
1793/// orders. Returns the guard(s); drop releases them.
1794fn lock_folders(store: &Store, a: &Path, b: &Path) -> Vec<FolderLock> {
1795    if a == b {
1796        return vec![FolderLock::acquire(&store.root.join(a))];
1797    }
1798    let (first, second) = if a < b { (a, b) } else { (b, a) };
1799    vec![
1800        FolderLock::acquire(&store.root.join(first)),
1801        FolderLock::acquire(&store.root.join(second)),
1802    ]
1803}
1804
1805// A tiny atomic-write helper. `tempfile` is a dev-dependency for tests; for
1806// the library path we hand-roll a temp-file-then-rename so writes are atomic
1807// without pulling `tempfile` into the non-dev dependency set. The file handle
1808// is held in an `Option` so `persist` can take it out without fighting the
1809// `Drop` impl (which only cleans up an un-persisted temp file).
1810struct AtomicTemp {
1811    file: Option<fs::File>,
1812    path: PathBuf,
1813    persisted: bool,
1814}
1815
1816impl AtomicTemp {
1817    fn write_all(&mut self, bytes: &[u8]) -> std::io::Result<()> {
1818        self.file.as_mut().expect("temp file open").write_all(bytes)
1819    }
1820    fn flush(&mut self) -> std::io::Result<()> {
1821        self.file.as_mut().expect("temp file open").flush()
1822    }
1823    fn persist(mut self, dest: &Path) -> std::io::Result<()> {
1824        if let Some(f) = self.file.take() {
1825            f.sync_all().ok();
1826            // `f` dropped here, closing the handle before the rename.
1827        }
1828        fs::rename(&self.path, dest)?;
1829        self.persisted = true;
1830        Ok(())
1831    }
1832}
1833
1834impl Drop for AtomicTemp {
1835    fn drop(&mut self) {
1836        // Best-effort cleanup if not persisted (an error path bailed out).
1837        if !self.persisted {
1838            let _ = fs::remove_file(&self.path);
1839        }
1840    }
1841}
1842
1843fn tempfile_in(dir: &Path) -> std::io::Result<AtomicTemp> {
1844    use std::time::{SystemTime, UNIX_EPOCH};
1845    let nanos = SystemTime::now()
1846        .duration_since(UNIX_EPOCH)
1847        .map(|d| d.as_nanos())
1848        .unwrap_or(0);
1849    let pid = std::process::id();
1850    // Monotonic-ish unique suffix; the dir is the destination dir so rename is
1851    // same-filesystem and therefore atomic.
1852    let counter = next_temp_counter();
1853    let name = format!(".dbmd-index-{pid}-{nanos}-{counter}.tmp");
1854    let path = dir.join(name);
1855    let file = fs::OpenOptions::new()
1856        .write(true)
1857        .create_new(true)
1858        .open(&path)?;
1859    Ok(AtomicTemp {
1860        file: Some(file),
1861        path,
1862        persisted: false,
1863    })
1864}
1865
1866fn next_temp_counter() -> u64 {
1867    use std::sync::atomic::{AtomicU64, Ordering};
1868    static C: AtomicU64 = AtomicU64::new(0);
1869    C.fetch_add(1, Ordering::Relaxed)
1870}
1871
1872#[cfg(test)]
1873mod tests {
1874    use super::*;
1875    use std::collections::BTreeSet;
1876    use std::fs;
1877    use tempfile::TempDir;
1878
1879    // ── fixtures ─────────────────────────────────────────────────────────
1880
1881    /// A temp store with a `DB.md` marker. `store.config` is the parser default
1882    /// (these tests never exercise the config parser).
1883    fn mk_store() -> (TempDir, Store) {
1884        let dir = TempDir::new().unwrap();
1885        fs::write(dir.path().join("DB.md"), "# test store\n").unwrap();
1886        let store = Store {
1887            root: dir.path().to_path_buf(),
1888            config: crate::parser::Config::default(),
1889        };
1890        (dir, store)
1891    }
1892
1893    /// Write a content file at `rel` with the given frontmatter lines + body.
1894    /// `fm` is the raw YAML body between the fences (no `---`).
1895    fn write_raw(store: &Store, rel: &str, fm: &str, body: &str) {
1896        let abs = store.root.join(rel);
1897        fs::create_dir_all(abs.parent().unwrap()).unwrap();
1898        fs::write(&abs, format!("---\n{fm}\n---\n{body}")).unwrap();
1899    }
1900
1901    /// Convenience: write a typed content file with summary/updated/extras.
1902    fn write_doc(
1903        store: &Store,
1904        rel: &str,
1905        type_: &str,
1906        summary: Option<&str>,
1907        updated: Option<&str>,
1908        extra_yaml: &str,
1909    ) {
1910        let mut fm = format!("type: {type_}\n");
1911        if let Some(s) = summary {
1912            fm.push_str(&format!("summary: {s}\n"));
1913        }
1914        if let Some(u) = updated {
1915            fm.push_str(&format!("updated: {u}\n"));
1916        }
1917        fm.push_str(extra_yaml);
1918        write_raw(store, rel, fm.trim_end(), "\nbody text\n");
1919    }
1920
1921    fn read(store: &Store, rel: &str) -> String {
1922        fs::read_to_string(store.root.join(rel)).unwrap()
1923    }
1924
1925    fn exists(store: &Store, rel: &str) -> bool {
1926        store.root.join(rel).exists()
1927    }
1928
1929    /// Collect every `index.md` + `index.jsonl` under the store, mapped to its
1930    /// bytes — the surface the byte-identity invariant compares.
1931    fn snapshot_artifacts(store: &Store) -> BTreeMap<String, String> {
1932        let mut out = BTreeMap::new();
1933        for entry in walkdir::WalkDir::new(&store.root)
1934            .into_iter()
1935            .filter_map(|e| e.ok())
1936        {
1937            let p = entry.path();
1938            if is_index_artifact(p) {
1939                let rel = path_to_unix(&rel_to_store(&store.root, p).unwrap());
1940                out.insert(rel, fs::read_to_string(p).unwrap());
1941            }
1942        }
1943        out
1944    }
1945
1946    // ── build_type_folder + to_markdown ──────────────────────────────────
1947
1948    #[test]
1949    fn type_folder_aggregates_across_shards_in_recency_order() {
1950        let (_d, store) = mk_store();
1951        // Three emails across two month-shards, deliberately written
1952        // out-of-recency-order on disk.
1953        write_doc(
1954            &store,
1955            "sources/emails/2026/05/b-old.md",
1956            "email",
1957            Some("Older mail"),
1958            Some("2026-05-01T09:00:00Z"),
1959            "",
1960        );
1961        write_doc(
1962            &store,
1963            "sources/emails/2026/06/c-new.md",
1964            "email",
1965            Some("Newest mail"),
1966            Some("2026-06-15T12:00:00Z"),
1967            "",
1968        );
1969        write_doc(
1970            &store,
1971            "sources/emails/2026/05/a-mid.md",
1972            "email",
1973            Some("Middle mail"),
1974            Some("2026-05-20T08:00:00Z"),
1975            "",
1976        );
1977
1978        let idx = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
1979        let paths: Vec<String> = idx.records.iter().map(|r| path_to_unix(&r.path)).collect();
1980        assert_eq!(
1981            paths,
1982            vec![
1983                "sources/emails/2026/06/c-new.md",
1984                "sources/emails/2026/05/a-mid.md",
1985                "sources/emails/2026/05/b-old.md",
1986            ],
1987            "records must aggregate across shards, newest `updated` first"
1988        );
1989    }
1990
1991    #[test]
1992    fn type_folder_md_format_entries_tags_and_derived_updated() {
1993        let (_d, store) = mk_store();
1994        write_doc(
1995            &store,
1996            "records/contacts/sarah-chen.md",
1997            "contact",
1998            Some("Renewal champion at Acme"),
1999            Some("2026-05-27T10:00:00Z"),
2000            "tags:\n  - renewal\n  - acme\n",
2001        );
2002        write_doc(
2003            &store,
2004            "records/contacts/no-tags.md",
2005            "contact",
2006            Some("Plain contact"),
2007            Some("2026-05-26T10:00:00Z"),
2008            "",
2009        );
2010
2011        let idx = Index::build_type_folder(&store, Path::new("records/contacts")).unwrap();
2012        let md = idx.to_markdown();
2013
2014        // Frontmatter is exact and the index's own `updated` is the MAX member
2015        // updated (the determinism the byte-identity invariant rests on).
2016        assert!(md.starts_with(
2017            "---\ntype: index\nscope: type-folder\nfolder: records/contacts\nupdated: 2026-05-27T10:00:00Z\n---\n\n# records/contacts\n"
2018        ), "frontmatter/heading wrong:\n{md}");
2019
2020        // Entry with tags: `— summary  ·  #tag #tag`.
2021        assert!(
2022            md.contains(
2023                "- [[records/contacts/sarah-chen]] — Renewal champion at Acme  ·  #renewal #acme\n"
2024            ),
2025            "tagged entry wrong:\n{md}"
2026        );
2027        // Entry without tags omits the `  ·  ` suffix entirely.
2028        assert!(
2029            md.contains("- [[records/contacts/no-tags]] — Plain contact\n"),
2030            "untagged entry wrong:\n{md}"
2031        );
2032        assert!(
2033            !md.contains("Plain contact  ·"),
2034            "untagged entry must not emit a tag separator"
2035        );
2036        // No `## More` below the cap.
2037        assert!(!md.contains("## More"), "no footer expected under the cap");
2038    }
2039
2040    #[test]
2041    fn missing_summary_becomes_placeholder_not_invented() {
2042        let (_d, store) = mk_store();
2043        write_doc(
2044            &store,
2045            "records/notes/x.md",
2046            "note",
2047            None,
2048            Some("2026-05-27T10:00:00Z"),
2049            "",
2050        );
2051        let idx = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
2052        assert_eq!(idx.records[0].summary, MISSING_SUMMARY);
2053        let md = idx.to_markdown();
2054        assert!(
2055            md.contains("- [[records/notes/x]] — (no summary)\n"),
2056            "missing summary must render the placeholder, not invent text:\n{md}"
2057        );
2058    }
2059
2060    // ── to_jsonl ─────────────────────────────────────────────────────────
2061
2062    #[test]
2063    fn jsonl_is_complete_structured_and_round_trips() {
2064        let (_d, store) = mk_store();
2065        write_doc(
2066            &store,
2067            "records/expenses/2026/05/e1.md",
2068            "expense",
2069            Some("Lunch with vendor"),
2070            Some("2026-05-10T10:00:00Z"),
2071            "created: 2026-05-10T09:00:00Z\nstatus: paid\namount: 42\ncompany: [[records/companies/acme]]\nrelated:\n  - [[records/concepts/spend]]\ntags:\n  - food\nlinks:\n  - records/concepts/spend\n  - [[records/concepts/renewal]]\n",
2072        );
2073        write_doc(
2074            &store,
2075            "records/expenses/2026/06/e2.md",
2076            "expense",
2077            Some("Cloud bill"),
2078            Some("2026-06-01T10:00:00Z"),
2079            "amount: 100\n",
2080        );
2081
2082        let idx = Index::build_type_folder(&store, Path::new("records/expenses")).unwrap();
2083        let jsonl = idx.to_jsonl();
2084        let lines: Vec<&str> = jsonl.lines().collect();
2085        assert_eq!(lines.len(), 2, "one JSON object per file, uncapped");
2086
2087        // Newest first (e2), and each line parses back to an equal record.
2088        let r0: IndexRecord = serde_json::from_str(lines[0]).unwrap();
2089        assert_eq!(path_to_unix(&r0.path), "records/expenses/2026/06/e2.md");
2090        assert_eq!(
2091            r0, idx.records[0],
2092            "jsonl line must round-trip to the record"
2093        );
2094
2095        // The first (data) record carries every reserved field + the extras in
2096        // `fields` (status/amount), and links/tags verbatim.
2097        let r1: IndexRecord = serde_json::from_str(lines[1]).unwrap();
2098        assert_eq!(r1.type_, "expense");
2099        assert_eq!(r1.summary, "Lunch with vendor");
2100        assert_eq!(r1.tags, vec!["food".to_string()]);
2101        assert_eq!(
2102            r1.links,
2103            vec![
2104                "records/concepts/spend".to_string(),
2105                "[[records/concepts/renewal]]".to_string()
2106            ]
2107        );
2108        assert_eq!(
2109            r1.created,
2110            Some(DateTime::parse_from_rfc3339("2026-05-10T09:00:00Z").unwrap())
2111        );
2112        assert_eq!(r1.fields.get("status"), Some(&Value::from("paid")));
2113        assert_eq!(r1.fields.get("amount"), Some(&Value::from(42)));
2114        assert_eq!(
2115            r1.fields.get("company"),
2116            Some(&Value::from("[[records/companies/acme]]"))
2117        );
2118        assert_eq!(
2119            r1.fields.get("related"),
2120            Some(&serde_json::json!(["[[records/concepts/spend]]"]))
2121        );
2122        // Reserved keys never leak into `fields`.
2123        for reserved in [
2124            "path", "type", "summary", "tags", "links", "created", "updated",
2125        ] {
2126            assert!(
2127                !r1.fields.contains_key(reserved),
2128                "reserved key {reserved} must not appear in fields"
2129            );
2130        }
2131
2132        // Stable key order: declared fields first, then sorted extras.
2133        assert!(
2134            lines[1].starts_with(
2135                r#"{"path":"records/expenses/2026/05/e1.md","type":"expense","summary":"Lunch with vendor","tags":["food"],"links":["records/concepts/spend","[[records/concepts/renewal]]"],"created":"2026-05-10T09:00:00Z","updated":"2026-05-10T10:00:00Z","#
2136            ),
2137            "jsonl key order not stable:\n{}",
2138            lines[1]
2139        );
2140        // The flattened extras come in BTreeMap (sorted) order. The catalog
2141        // injects `meta-type: fact` into every records-layer file that does not
2142        // declare one, so it appears among the sorted extras (between `company`
2143        // and `related`).
2144        assert!(
2145            lines[1].ends_with(r#""amount":42,"company":"[[records/companies/acme]]","meta-type":"fact","related":["[[records/concepts/spend]]"],"status":"paid"}"#),
2146            "extras must be sorted:\n{}",
2147            lines[1]
2148        );
2149    }
2150
2151    // ── cap + footer ─────────────────────────────────────────────────────
2152
2153    #[test]
2154    fn over_cap_md_shows_500_plus_footer_jsonl_holds_all() {
2155        let (_d, store) = mk_store();
2156        let total = MD_CAP + 7;
2157        for i in 0..total {
2158            // Distinct, monotonically increasing `updated` so order is total.
2159            let day = 1 + (i % 27);
2160            let rel = format!("sources/emails/2026/05/m-{i:04}.md");
2161            let updated = format!("2026-05-{day:02}T00:00:{:02}Z", i % 60);
2162            write_doc(
2163                &store,
2164                &rel,
2165                "email",
2166                Some(&format!("mail {i}")),
2167                Some(&updated),
2168                "",
2169            );
2170        }
2171        let idx = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
2172        assert_eq!(idx.records.len(), total, "jsonl/records keep every file");
2173
2174        let md = idx.to_markdown();
2175        let entry_lines = md.lines().filter(|l| l.starts_with("- [[")).count();
2176        assert_eq!(entry_lines, MD_CAP, "md browse view is capped at 500");
2177
2178        assert!(
2179            md.contains("## More\n\n"),
2180            "over-cap md needs a More footer"
2181        );
2182        assert!(
2183            md.contains(&format!(
2184                "This folder has {total} files. The 500 most recent are listed above.\n"
2185            )),
2186            "footer count wrong:\n{md}"
2187        );
2188        assert!(
2189            md.contains(
2190                "Use `dbmd index query --type email --in sources` for the complete catalog.\n"
2191            ),
2192            "footer must infer type=email layer=sources:\n{md}"
2193        );
2194
2195        let jsonl = idx.to_jsonl();
2196        assert_eq!(jsonl.lines().count(), total, "jsonl is uncapped");
2197    }
2198
2199    // ── sort total order ─────────────────────────────────────────────────
2200
2201    #[test]
2202    fn sort_breaks_ties_by_path_and_puts_undated_last() {
2203        let mut recs = vec![
2204            rec("z/a.md", Some("2026-05-01T00:00:00Z")),
2205            rec("a/b.md", Some("2026-05-01T00:00:00Z")), // same updated, path < z/a
2206            rec("m/c.md", None),                         // undated → last
2207            rec("b/d.md", Some("2026-06-01T00:00:00Z")), // newest
2208        ];
2209        sort_records(&mut recs);
2210        let order: Vec<String> = recs.iter().map(|r| path_to_unix(&r.path)).collect();
2211        assert_eq!(order, vec!["b/d.md", "a/b.md", "z/a.md", "m/c.md"]);
2212    }
2213
2214    fn rec(path: &str, updated: Option<&str>) -> IndexRecord {
2215        IndexRecord {
2216            path: PathBuf::from(path),
2217            type_: "t".into(),
2218            summary: "s".into(),
2219            tags: vec![],
2220            links: vec![],
2221            created: None,
2222            updated: updated.map(|u| DateTime::parse_from_rfc3339(u).unwrap()),
2223            fields: BTreeMap::new(),
2224        }
2225    }
2226
2227    // ── build_layer / build_root ─────────────────────────────────────────
2228
2229    #[test]
2230    fn layer_index_lists_type_folders_with_counts() {
2231        let (_d, store) = mk_store();
2232        write_doc(
2233            &store,
2234            "records/contacts/a.md",
2235            "contact",
2236            Some("Contact A older"),
2237            Some("2026-05-01T00:00:00Z"),
2238            "",
2239        );
2240        write_doc(
2241            &store,
2242            "records/contacts/b.md",
2243            "contact",
2244            Some("Contact B newest"),
2245            Some("2026-05-09T00:00:00Z"),
2246            "",
2247        );
2248        write_doc(
2249            &store,
2250            "records/companies/x.md",
2251            "company",
2252            Some("Acme Inc"),
2253            Some("2026-05-05T00:00:00Z"),
2254            "",
2255        );
2256        // build the type-folder artifacts first (layer preview reads their jsonl)
2257        Index::write_level(&store, &IndexLevel::TypeFolder("records/contacts".into())).unwrap();
2258        Index::write_level(&store, &IndexLevel::TypeFolder("records/companies".into())).unwrap();
2259
2260        Index::write_level(&store, &IndexLevel::Layer(Layer::Records)).unwrap();
2261        let md = read(&store, "records/index.md");
2262
2263        assert!(
2264            md.starts_with("---\ntype: index\nscope: layer\nfolder: records\n"),
2265            "layer fm:\n{md}"
2266        );
2267        // Alphabetical type-folder order: companies before contacts.
2268        let companies_at = md.find("companies/index").unwrap();
2269        let contacts_at = md.find("contacts/index").unwrap();
2270        assert!(
2271            companies_at < contacts_at,
2272            "type folders must be alphabetical"
2273        );
2274        // Count + display only — with no `## Folders`, the rollup never invents
2275        // a per-folder description from a member summary.
2276        assert!(
2277            md.contains("- [[records/contacts/index|Contacts]] (2)\n"),
2278            "contacts entry:\n{md}"
2279        );
2280        assert!(
2281            md.contains("- [[records/companies/index|Companies]] (1)\n"),
2282            "companies entry:\n{md}"
2283        );
2284        // Crucially: no member summary leaked into the rollup as a description.
2285        assert!(
2286            !md.contains("Contact B newest") && !md.contains("Acme Inc"),
2287            "layer rollup must not quote a member summary:\n{md}"
2288        );
2289        // Layer `updated` is the max across children (contacts b = 05-09).
2290        assert!(
2291            md.contains("updated: 2026-05-09T00:00:00Z\n"),
2292            "layer updated must be max child:\n{md}"
2293        );
2294    }
2295
2296    #[test]
2297    fn folders_section_supplies_authored_display_and_description() {
2298        // The aligned contract: rollups surface the curator's `## Folders`
2299        // display + description; the tool never invents one. A folder with no
2300        // entry shows counts only — no member summary leaks in as a description.
2301        let (_d, mut store) = mk_store();
2302        store.config.folders.insert(
2303            "records/contacts".into(),
2304            crate::parser::FolderMeta {
2305                display: None,
2306                description: Some("people across customer + prospect accounts".into()),
2307            },
2308        );
2309        store.config.folders.insert(
2310            "sources/hubspot-exports".into(),
2311            crate::parser::FolderMeta {
2312                display: Some("HubSpot exports".into()),
2313                description: Some("deal + pipeline exports".into()),
2314            },
2315        );
2316        write_doc(
2317            &store,
2318            "records/contacts/a.md",
2319            "contact",
2320            Some("Contact A"),
2321            Some("2026-05-01T00:00:00Z"),
2322            "",
2323        );
2324        // companies has NO `## Folders` entry → counts only.
2325        write_doc(
2326            &store,
2327            "records/companies/x.md",
2328            "company",
2329            Some("Acme Inc"),
2330            Some("2026-05-05T00:00:00Z"),
2331            "",
2332        );
2333        write_doc(
2334            &store,
2335            "sources/hubspot-exports/d.md",
2336            "hubspot-export",
2337            Some("a single deal export"),
2338            Some("2026-05-03T00:00:00Z"),
2339            "",
2340        );
2341
2342        Index::rebuild_all(&store).unwrap();
2343
2344        // Authored description surfaced (contacts), with the derived display.
2345        let records_layer = read(&store, "records/index.md");
2346        assert!(
2347            records_layer.contains("- [[records/contacts/index|Contacts]] (1) — people across customer + prospect accounts\n"),
2348            "authored description must surface:\n{records_layer}"
2349        );
2350        // No `## Folders` entry ⇒ counts only; the member summary never leaks in.
2351        assert!(
2352            records_layer.contains("- [[records/companies/index|Companies]] (1)\n")
2353                && !records_layer.contains("Acme Inc"),
2354            "un-described folder is counts-only:\n{records_layer}"
2355        );
2356
2357        // Display override beats the derived "Hubspot exports".
2358        let sources_layer = read(&store, "sources/index.md");
2359        assert!(
2360            sources_layer.contains("- [[sources/hubspot-exports/index|HubSpot exports]] (1) — deal + pipeline exports\n"),
2361            "display override + description must surface:\n{sources_layer}"
2362        );
2363
2364        // Root rollup carries the same authored metadata (display + description).
2365        let root = read(&store, "index.md");
2366        assert!(
2367            root.contains("- [[records/contacts/index|Contacts]] (1) — people across customer + prospect accounts\n"),
2368            "root surfaces authored description:\n{root}"
2369        );
2370        assert!(
2371            root.contains("- [[sources/hubspot-exports/index|HubSpot exports]] (1) — deal + pipeline exports\n"),
2372            "root surfaces display override:\n{root}"
2373        );
2374    }
2375
2376    #[test]
2377    fn default_display_turns_separators_to_spaces_and_caps() {
2378        assert_eq!(default_display("contacts"), "Contacts");
2379        assert_eq!(default_display("hubspot-exports"), "Hubspot exports");
2380        assert_eq!(default_display("usage_exports"), "Usage exports");
2381    }
2382
2383    #[test]
2384    fn root_index_groups_layers_with_totals_and_per_type_counts() {
2385        let (_d, store) = mk_store();
2386        write_doc(
2387            &store,
2388            "sources/emails/2026/05/a.md",
2389            "email",
2390            Some("Mail"),
2391            Some("2026-05-01T00:00:00Z"),
2392            "",
2393        );
2394        write_doc(
2395            &store,
2396            "sources/docs/d.md",
2397            "doc",
2398            Some("Doc"),
2399            Some("2026-05-02T00:00:00Z"),
2400            "",
2401        );
2402        write_doc(
2403            &store,
2404            "records/contacts/c.md",
2405            "contact",
2406            Some("C"),
2407            Some("2026-05-03T00:00:00Z"),
2408            "",
2409        );
2410        // wiki empty → no Wiki section
2411
2412        Index::rebuild_all(&store).unwrap();
2413        let md = read(&store, "index.md");
2414
2415        assert!(
2416            md.starts_with("---\ntype: index\nscope: root\n"),
2417            "root fm:\n{md}"
2418        );
2419        assert!(md.contains("# Knowledge base index\n"), "root title:\n{md}");
2420        // Layer heading with total count; Sources before Records (canonical).
2421        let sources_h = md
2422            .find("## Sources (2)")
2423            .expect("sources heading w/ total 2");
2424        let records_h = md
2425            .find("## Records (1)")
2426            .expect("records heading w/ total 1");
2427        assert!(sources_h < records_h, "Sources must precede Records");
2428        assert!(!md.contains("## Wiki"), "empty layer gets no section");
2429        // Per-type sub-entries with (N), no preview at root.
2430        assert!(
2431            md.contains("- [[sources/docs/index|Docs]] (1)\n"),
2432            "root docs entry:\n{md}"
2433        );
2434        assert!(
2435            md.contains("- [[sources/emails/index|Emails]] (1)\n"),
2436            "root emails entry:\n{md}"
2437        );
2438        assert!(
2439            md.contains("- [[records/contacts/index|Contacts]] (1)\n"),
2440            "root contacts entry:\n{md}"
2441        );
2442        assert!(!md.contains("— "), "root entries carry no preview text");
2443    }
2444
2445    // ── write-through == rebuild (THE invariant) ─────────────────────────
2446
2447    #[test]
2448    fn on_write_matches_rebuild_byte_for_byte() {
2449        // Build a store incrementally via on_write, and a second identical store
2450        // via a single rebuild_all, then assert every index artifact is equal.
2451        let (_d1, wt) = mk_store();
2452        let (_d2, rb) = mk_store();
2453
2454        let docs: &[(&str, &str, &str, &str, &str)] = &[
2455            (
2456                "sources/emails/2026/05/e1.md",
2457                "email",
2458                "First mail",
2459                "2026-05-01T10:00:00Z",
2460                "tags:\n  - inbox\n",
2461            ),
2462            (
2463                "sources/emails/2026/06/e2.md",
2464                "email",
2465                "Second mail",
2466                "2026-06-01T10:00:00Z",
2467                "",
2468            ),
2469            (
2470                "records/contacts/sarah.md",
2471                "contact",
2472                "Sarah",
2473                "2026-05-15T10:00:00Z",
2474                "links:\n  - records/profiles/sarah\n",
2475            ),
2476            (
2477                "records/contacts/elena.md",
2478                "contact",
2479                "Elena",
2480                "2026-05-20T10:00:00Z",
2481                "status: active\n",
2482            ),
2483            (
2484                "records/profiles/sarah.md",
2485                "profile",
2486                "Sarah bio",
2487                "2026-05-21T10:00:00Z",
2488                "",
2489            ),
2490        ];
2491
2492        for (rel, t, sum, upd, extra) in docs {
2493            write_doc(&wt, rel, t, Some(sum), Some(upd), extra);
2494            write_doc(&rb, rel, t, Some(sum), Some(upd), extra);
2495            Index::on_write(&wt, Path::new(rel)).unwrap();
2496        }
2497        Index::rebuild_all(&rb).unwrap();
2498
2499        let a = snapshot_artifacts(&wt);
2500        let b = snapshot_artifacts(&rb);
2501        assert_eq!(
2502            a.keys().collect::<Vec<_>>(),
2503            b.keys().collect::<Vec<_>>(),
2504            "same set of index artifacts must exist"
2505        );
2506        for (k, v) in &a {
2507            assert_eq!(v, &b[k], "artifact {k} differs between write-through and rebuild:\n--- write-through ---\n{v}\n--- rebuild ---\n{}", b[k]);
2508        }
2509        // Sanity: artifacts actually exist (not a vacuous comparison of empties).
2510        assert!(a.contains_key("index.md"));
2511        assert!(a.contains_key("sources/emails/index.jsonl"));
2512        assert!(a.contains_key("records/contacts/index.md"));
2513    }
2514
2515    /// Regression (O(changed) bound, not just correctness): a loop op must
2516    /// recompute its parent rollups from the type-folder `index.jsonl` sidecars
2517    /// — never by walking the content tree of *sibling* folders it wasn't asked
2518    /// about. The byte-identity property test (which always indexes every folder
2519    /// before comparing) can't catch a violation, because a full-store walk
2520    /// produces the *correct* counts too; it just does so in `O(store files)`.
2521    ///
2522    /// The behavioral fingerprint of the old `update_parents → build_layer /
2523    /// build_root` (which called `walk_type_folder_files` on every type-folder in
2524    /// the store): a single `on_write` to `records/contacts/sarah.md` would
2525    /// surface, in the layer + root rollups, the file count of
2526    /// `records/companies` — a sibling that has content on disk but was NEVER
2527    /// passed to a write/index op, so it has no `index.jsonl`. An O(changed) loop
2528    /// op cannot "see" that un-indexed folder; a whole-store walk can. So this
2529    /// asserts the rollups reflect ONLY the sidecar-indexed folder, proving no
2530    /// content-tree walk happened.
2531    #[test]
2532    fn loop_op_does_not_walk_sibling_content_tree() {
2533        let (_d, store) = mk_store();
2534
2535        // A sibling type-folder with real content on disk, but deliberately
2536        // never indexed (no on_write / write_level / rebuild over it) ⇒ no
2537        // `records/companies/index.jsonl` exists.
2538        write_doc(
2539            &store,
2540            "records/companies/acme.md",
2541            "company",
2542            Some("Acme Inc"),
2543            Some("2026-05-05T00:00:00Z"),
2544            "",
2545        );
2546        write_doc(
2547            &store,
2548            "records/companies/globex.md",
2549            "company",
2550            Some("Globex"),
2551            Some("2026-05-06T00:00:00Z"),
2552            "",
2553        );
2554        assert!(
2555            !exists(&store, "records/companies/index.jsonl"),
2556            "precondition: companies must be un-indexed"
2557        );
2558
2559        // The ONLY loop op: a single write to a different type-folder.
2560        write_doc(
2561            &store,
2562            "records/contacts/sarah.md",
2563            "contact",
2564            Some("Sarah"),
2565            Some("2026-05-15T00:00:00Z"),
2566            "",
2567        );
2568        Index::on_write(&store, Path::new("records/contacts/sarah.md")).unwrap();
2569
2570        // The written folder is reflected in both rollups...
2571        let layer_md = read(&store, "records/index.md");
2572        let root_md = read(&store, "index.md");
2573        // (both rollups show counts only — no `## Folders` here, so no preview)
2574        assert!(
2575            layer_md.contains("- [[records/contacts/index|Contacts]] (1)\n")
2576                && !layer_md.contains("Sarah"),
2577            "layer must reflect the written folder, counts only:\n{layer_md}"
2578        );
2579        assert!(
2580            root_md.contains("- [[records/contacts/index|Contacts]] (1)\n"),
2581            "root must reflect the written folder:\n{root_md}"
2582        );
2583
2584        // ...but the un-indexed sibling must be INVISIBLE to a loop op. If the
2585        // rollups mention `records/companies` at all, `on_write` walked the whole
2586        // content tree — the O(store) regression.
2587        assert!(
2588            !layer_md.contains("companies"),
2589            "loop op walked the sibling content tree: layer rollup counts un-indexed records/companies\n{layer_md}"
2590        );
2591        assert!(
2592            !root_md.contains("companies"),
2593            "loop op walked the sibling content tree: root rollup counts un-indexed records/companies\n{root_md}"
2594        );
2595        // The layer's only child is contacts ⇒ its total is exactly 1, not 3.
2596        assert!(
2597            root_md.contains("## Records (1)"),
2598            "root layer total must count only the sidecar-indexed folder (1), not walked siblings (would be 3):\n{root_md}"
2599        );
2600
2601        // And the sidecar-derived count IS what a full walk WOULD yield once the
2602        // sibling is indexed too — i.e. the fix changes cost, not the eventual
2603        // result. Index companies, then confirm the rollups now (and only now)
2604        // include it, byte-identical to a from-scratch rebuild.
2605        let (_d2, rb) = mk_store();
2606        for (rel, t, s, u) in [
2607            (
2608                "records/companies/acme.md",
2609                "company",
2610                "Acme Inc",
2611                "2026-05-05T00:00:00Z",
2612            ),
2613            (
2614                "records/companies/globex.md",
2615                "company",
2616                "Globex",
2617                "2026-05-06T00:00:00Z",
2618            ),
2619            (
2620                "records/contacts/sarah.md",
2621                "contact",
2622                "Sarah",
2623                "2026-05-15T00:00:00Z",
2624            ),
2625        ] {
2626            write_doc(&rb, rel, t, Some(s), Some(u), "");
2627        }
2628        Index::on_write(&store, Path::new("records/companies/acme.md")).unwrap();
2629        Index::on_write(&store, Path::new("records/companies/globex.md")).unwrap();
2630        Index::rebuild_all(&rb).unwrap();
2631        let a = snapshot_artifacts(&store);
2632        let b = snapshot_artifacts(&rb);
2633        assert_eq!(
2634            a.keys().collect::<BTreeSet<_>>(),
2635            b.keys().collect::<BTreeSet<_>>(),
2636            "same artifact set after indexing both folders"
2637        );
2638        for (k, v) in &a {
2639            assert_eq!(
2640                v, &b[k],
2641                "after indexing the sibling too, loop result must equal rebuild for {k}"
2642            );
2643        }
2644        assert!(
2645            read(&store, "index.md").contains("## Records (3)"),
2646            "now that both folders are indexed, the root total is 3"
2647        );
2648    }
2649
2650    /// Regression: a type filed at the path the toolkit ITSELF computes
2651    /// (`Store::shard_path_for`) must be indexable end-to-end. The class of bug
2652    /// is a 2-component `<layer>/<file>` path, which `type_folder_of` treats as
2653    /// having no type-folder — making the producer (path computation) disagree
2654    /// with the consumer (index): the loop path crashes (`on_write` → `Err`, it
2655    /// tries to write `index.md` *inside* a file) while the sweep path silently
2656    /// drops the page from every catalog. A conclusion `profile` is a custom
2657    /// (non-built-in) type, so `shard_path_for` files it under the records-layer
2658    /// fallback `records/profile/<file>` — a conforming 3-component path. This test
2659    /// drives both paths through the real `shard_path_for` output and asserts
2660    /// (1) `on_write` succeeds, (2) the page appears in the rebuilt catalog, and
2661    /// (3) write-through == rebuild.
2662    #[test]
2663    fn custom_type_at_shard_path_for_is_indexable_end_to_end() {
2664        let (_d1, wt) = mk_store();
2665        let (_d2, rb) = mk_store();
2666
2667        // The toolkit's own canonical write path for a custom-type record.
2668        let rel = wt
2669            .shard_path_for(
2670                "profile",
2671                &crate::parser::Frontmatter::default(),
2672                "renewal-theme",
2673            )
2674            .unwrap();
2675        let rel_str = path_to_unix(&rel);
2676        // Guard the precondition the consumer requires: 3+ components so
2677        // `type_folder_of` resolves a real `<layer>/<type-folder>`.
2678        assert!(
2679            type_folder_of(&rel).is_some(),
2680            "shard_path_for produced a path the index cannot file: {rel_str}"
2681        );
2682
2683        write_doc(
2684            &wt,
2685            &rel_str,
2686            "profile",
2687            Some("Renewal theme"),
2688            Some("2026-05-21T10:00:00Z"),
2689            "",
2690        );
2691        write_doc(
2692            &rb,
2693            &rel_str,
2694            "profile",
2695            Some("Renewal theme"),
2696            Some("2026-05-21T10:00:00Z"),
2697            "",
2698        );
2699
2700        // (1) Loop path must NOT error (a 2-component `<layer>/<file>` shape
2701        // returned Err(Io(NotADirectory))).
2702        Index::on_write(&wt, &rel)
2703            .expect("on_write must succeed for a toolkit-computed custom-type path");
2704        Index::rebuild_all(&rb).unwrap();
2705
2706        // (2) The page is present in the rebuilt catalog (the old flat-path bug
2707        // silently omitted it from every artifact). The individual page link
2708        // lives in the *type-folder* index; the *layer* index rolls the
2709        // type-folder up — assert both, since the bug erased both. A custom
2710        // type's canonical folder is the records-layer fallback `records/profile`.
2711        let page_link = wiki_target(&rel); // records/profile/renewal-theme
2712        let tf_md = read(&rb, "records/profile/index.md");
2713        assert!(
2714            tf_md.contains(&format!("[[{page_link}]]")),
2715            "type-folder index must list the page link, got:\n{tf_md}"
2716        );
2717        assert!(
2718            exists(&rb, "records/profile/index.jsonl"),
2719            "type-folder jsonl must exist"
2720        );
2721        assert!(
2722            read(&rb, "records/profile/index.jsonl").contains(&rel_str),
2723            "type-folder jsonl must contain the page row"
2724        );
2725        // The layer index rolls the type-folder up (proves the page's folder is
2726        // visible to the layer catalog, not dropped).
2727        let layer_md = read(&rb, "records/index.md");
2728        assert!(
2729            layer_md.contains("records/profile/index"),
2730            "layer index must roll up the records/profile type-folder, got:\n{layer_md}"
2731        );
2732
2733        // (3) Write-through equals rebuild byte-for-byte — loop and sweep agree.
2734        let a = snapshot_artifacts(&wt);
2735        let b = snapshot_artifacts(&rb);
2736        assert_eq!(
2737            a.keys().collect::<Vec<_>>(),
2738            b.keys().collect::<Vec<_>>(),
2739            "loop and sweep must produce the same artifact set"
2740        );
2741        for (k, v) in &a {
2742            assert_eq!(
2743                v, &b[k],
2744                "custom-type artifact {k} differs between on_write and rebuild"
2745            );
2746        }
2747    }
2748
2749    #[test]
2750    fn on_remove_then_rebuild_match_and_pull_in_next_over_cap() {
2751        let (_d1, wt) = mk_store();
2752        let (_d2, rb) = mk_store();
2753        let total = MD_CAP + 3; // 503 files; removing one keeps md full at 500
2754        let mut all_rels = Vec::new();
2755        for i in 0..total {
2756            let rel = format!("sources/emails/2026/05/m-{i:04}.md");
2757            // `updated` strictly increasing across i by varying both minute and second
2758            let updated = format!("2026-05-10T00:{:02}:{:02}Z", i / 60, i % 60);
2759            write_doc(
2760                &wt,
2761                &rel,
2762                "email",
2763                Some(&format!("mail {i}")),
2764                Some(&updated),
2765                "",
2766            );
2767            write_doc(
2768                &rb,
2769                &rel,
2770                "email",
2771                Some(&format!("mail {i}")),
2772                Some(&updated),
2773                "",
2774            );
2775            all_rels.push(rel);
2776        }
2777        // Build write-through index, then remove the single newest file.
2778        Index::rebuild_all(&wt).unwrap();
2779        let newest = &all_rels[total - 1]; // highest i = newest updated
2780        fs::remove_file(wt.root.join(newest)).unwrap();
2781        Index::on_remove(&wt, Path::new(newest)).unwrap();
2782
2783        // Rebuild side: same end state (file physically absent).
2784        fs::remove_file(rb.root.join(newest)).unwrap();
2785        Index::rebuild_all(&rb).unwrap();
2786
2787        let a = snapshot_artifacts(&wt);
2788        let b = snapshot_artifacts(&rb);
2789        for (k, v) in &a {
2790            assert_eq!(v, &b[k], "after remove, artifact {k} drifted from rebuild");
2791        }
2792
2793        // The md must still hold exactly 500 entries (the 501st got pulled in)
2794        // and the removed file must be gone from both artifacts.
2795        let md = read(&wt, "sources/emails/index.md");
2796        assert_eq!(md.lines().filter(|l| l.starts_with("- [[")).count(), MD_CAP);
2797        // Removed (newest) file is gone from the bare-path md and the .md jsonl.
2798        assert!(
2799            !md.contains(&format!("[[{}]]", wiki_target(Path::new(newest)))),
2800            "removed file must not be listed in md"
2801        );
2802        // The file previously at rank 501 (excluded under the cap) is `all_rels[2]`
2803        // — `updated` increases with index, so newest-first rank 500 = index 2.
2804        // After dropping the newest it shifts into the visible 500.
2805        let pulled_in = &all_rels[2];
2806        assert!(
2807            md.contains(&format!("[[{}]]", wiki_target(Path::new(pulled_in)))),
2808            "the 501st-most-recent must be pulled into the browse view after a removal"
2809        );
2810        assert!(
2811            md.contains(&format!("This folder has {} files.", total - 1)),
2812            "footer count must decrement:\n{}",
2813            md.lines().rev().take(4).collect::<Vec<_>>().join("\n")
2814        );
2815        let jsonl = read(&wt, "sources/emails/index.jsonl");
2816        assert_eq!(
2817            jsonl.lines().count(),
2818            total - 1,
2819            "jsonl loses exactly the removed file"
2820        );
2821        assert!(
2822            !jsonl.contains(&path_to_unix(Path::new(newest))),
2823            "removed file must be gone from the jsonl too"
2824        );
2825    }
2826
2827    #[test]
2828    fn on_rename_cross_folder_matches_rebuild() {
2829        let (_d1, wt) = mk_store();
2830        let (_d2, rb) = mk_store();
2831        // Seed both stores identically.
2832        let seed: &[(&str, &str, &str, &str)] = &[
2833            (
2834                "records/contacts/a.md",
2835                "contact",
2836                "A",
2837                "2026-05-01T00:00:00Z",
2838            ),
2839            (
2840                "records/contacts/b.md",
2841                "contact",
2842                "B",
2843                "2026-05-02T00:00:00Z",
2844            ),
2845            (
2846                "records/companies/x.md",
2847                "company",
2848                "X",
2849                "2026-05-03T00:00:00Z",
2850            ),
2851        ];
2852        for (rel, t, s, u) in seed {
2853            write_doc(&wt, rel, t, Some(s), Some(u), "");
2854            write_doc(&rb, rel, t, Some(s), Some(u), "");
2855        }
2856        Index::rebuild_all(&wt).unwrap();
2857
2858        // Rename contacts/b.md -> companies/b.md (cross type-folder). The file's
2859        // `type` changes to match its new folder, as a real `dbmd rename` would.
2860        let old = "records/contacts/b.md";
2861        let new = "records/companies/b.md";
2862        fs::create_dir_all(wt.root.join("records/companies")).unwrap();
2863        fs::rename(wt.root.join(old), wt.root.join(new)).unwrap();
2864        // (type stays "contact" here; index copies frontmatter verbatim — the
2865        // test only asserts placement + parity with rebuild.)
2866        Index::on_rename(&wt, Path::new(old), Path::new(new)).unwrap();
2867
2868        // Rebuild side: same end state.
2869        fs::create_dir_all(rb.root.join("records/companies")).unwrap();
2870        fs::rename(rb.root.join(old), rb.root.join(new)).unwrap();
2871        Index::rebuild_all(&rb).unwrap();
2872
2873        let a = snapshot_artifacts(&wt);
2874        let b = snapshot_artifacts(&rb);
2875        assert_eq!(a.keys().collect::<Vec<_>>(), b.keys().collect::<Vec<_>>());
2876        for (k, v) in &a {
2877            assert_eq!(v, &b[k], "rename: artifact {k} drifted from rebuild");
2878        }
2879        // Concretely: b is gone from contacts, present in companies.
2880        let contacts = read(&wt, "records/contacts/index.md");
2881        assert!(!contacts.contains("records/contacts/b]]"));
2882        let companies = read(&wt, "records/companies/index.md");
2883        assert!(companies.contains("[[records/companies/b]]"));
2884    }
2885
2886    #[test]
2887    fn on_write_updates_existing_entry_in_place() {
2888        let (_d, store) = mk_store();
2889        write_doc(
2890            &store,
2891            "records/contacts/a.md",
2892            "contact",
2893            Some("Original"),
2894            Some("2026-05-01T00:00:00Z"),
2895            "",
2896        );
2897        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2898        // Edit the same file: new summary + newer updated.
2899        write_doc(
2900            &store,
2901            "records/contacts/a.md",
2902            "contact",
2903            Some("Revised"),
2904            Some("2026-05-09T00:00:00Z"),
2905            "",
2906        );
2907        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2908
2909        let jsonl = read(&store, "records/contacts/index.jsonl");
2910        assert_eq!(
2911            jsonl.lines().count(),
2912            1,
2913            "upsert must not duplicate the line"
2914        );
2915        assert!(jsonl.contains("Revised"), "jsonl must reflect the update");
2916        assert!(
2917            !jsonl.contains("Original"),
2918            "stale line must be gone (compacted)"
2919        );
2920        let md = read(&store, "records/contacts/index.md");
2921        assert!(md.contains("- [[records/contacts/a]] — Revised\n"));
2922        assert!(
2923            md.contains("updated: 2026-05-09T00:00:00Z\n"),
2924            "index updated must track the newer member"
2925        );
2926    }
2927
2928    // ── dry-run + cleanup ────────────────────────────────────────────────
2929
2930    #[test]
2931    fn dry_run_emits_separators_and_writes_nothing() {
2932        let (_d, store) = mk_store();
2933        write_doc(
2934            &store,
2935            "sources/emails/2026/05/a.md",
2936            "email",
2937            Some("Mail"),
2938            Some("2026-05-01T00:00:00Z"),
2939            "",
2940        );
2941        let out = Index::render_dry_run(&store, &IndexLevel::TypeFolder("sources/emails".into()))
2942            .unwrap();
2943        assert!(
2944            out.contains("--- sources/emails/index.md ---\n"),
2945            "md separator:\n{out}"
2946        );
2947        assert!(
2948            out.contains("--- sources/emails/index.jsonl ---\n"),
2949            "jsonl separator:\n{out}"
2950        );
2951        assert!(
2952            out.contains("- [[sources/emails/2026/05/a]] — Mail"),
2953            "md body present"
2954        );
2955        // Nothing was written to disk.
2956        assert!(
2957            !exists(&store, "sources/emails/index.md"),
2958            "dry-run must not write"
2959        );
2960        assert!(
2961            !exists(&store, "sources/emails/index.jsonl"),
2962            "dry-run must not write"
2963        );
2964    }
2965
2966    #[test]
2967    fn cleanup_removes_noncanonical_and_empty_indexes() {
2968        let (_d, store) = mk_store();
2969        write_doc(
2970            &store,
2971            "sources/emails/2026/05/a.md",
2972            "email",
2973            Some("Mail"),
2974            Some("2026-05-01T00:00:00Z"),
2975            "",
2976        );
2977        // A stray index inside a date-shard (non-canonical) ...
2978        fs::write(
2979            store.root.join("sources/emails/2026/05/index.md"),
2980            "stale\n",
2981        )
2982        .unwrap();
2983        fs::write(
2984            store.root.join("sources/emails/2026/05/index.jsonl"),
2985            "stale\n",
2986        )
2987        .unwrap();
2988        // ... and an index in an empty type-folder.
2989        fs::create_dir_all(store.root.join("records/empty")).unwrap();
2990        fs::write(store.root.join("records/empty/index.md"), "stale\n").unwrap();
2991
2992        Index::cleanup(&store).unwrap();
2993
2994        assert!(
2995            !exists(&store, "sources/emails/2026/05/index.md"),
2996            "shard index must be deleted"
2997        );
2998        assert!(
2999            !exists(&store, "sources/emails/2026/05/index.jsonl"),
3000            "shard jsonl must be deleted"
3001        );
3002        assert!(
3003            !exists(&store, "records/empty/index.md"),
3004            "empty-folder index must be deleted"
3005        );
3006        // The canonical type-folder file itself is untouched by cleanup.
3007        assert!(exists(&store, "sources/emails/2026/05/a.md"));
3008    }
3009
3010    #[test]
3011    fn rebuild_deletes_stale_indexes_for_emptied_folders() {
3012        let (_d, store) = mk_store();
3013        write_doc(
3014            &store,
3015            "records/contacts/a.md",
3016            "contact",
3017            Some("A"),
3018            Some("2026-05-01T00:00:00Z"),
3019            "",
3020        );
3021        Index::rebuild_all(&store).unwrap();
3022        assert!(exists(&store, "records/contacts/index.md"));
3023        assert!(exists(&store, "records/index.md"));
3024        assert!(exists(&store, "index.md"));
3025
3026        // Empty the folder entirely, then rebuild: all three levels vanish.
3027        fs::remove_file(store.root.join("records/contacts/a.md")).unwrap();
3028        Index::rebuild_all(&store).unwrap();
3029        assert!(
3030            !exists(&store, "records/contacts/index.md"),
3031            "emptied type-folder index gone"
3032        );
3033        assert!(
3034            !exists(&store, "records/index.md"),
3035            "now-empty layer index gone"
3036        );
3037        assert!(!exists(&store, "index.md"), "now-empty root index gone");
3038    }
3039
3040    // ── randomized parity (property-style) ───────────────────────────────
3041
3042    #[test]
3043    fn property_writethrough_equals_rebuild_under_mixed_ops() {
3044        // Deterministic pseudo-random op sequence (no rand crate): a small LCG.
3045        let (_d1, wt) = mk_store();
3046        let (_d2, rb) = mk_store();
3047        let mut seed: u64 = 0x9E3779B97F4A7C15;
3048        let mut next = || {
3049            seed = seed
3050                .wrapping_mul(6364136223846793005)
3051                .wrapping_add(1442695040888963407);
3052            (seed >> 33) as u32
3053        };
3054
3055        let folders = ["sources/emails", "records/contacts", "records/profiles"];
3056        let types = ["email", "contact", "profile"];
3057        let mut live: Vec<String> = Vec::new(); // store-relative paths that exist
3058
3059        for step in 0..120u32 {
3060            let r = next();
3061            let op = r % 10;
3062            if op < 6 || live.is_empty() {
3063                // CREATE/UPDATE
3064                let fi = (next() as usize) % folders.len();
3065                let folder = folders[fi];
3066                let id = next() % 40;
3067                let rel = if folder == "sources/emails" {
3068                    let month = 5 + (id % 2); // shard across two months
3069                    format!("{folder}/2026/{month:02}/f-{id:02}.md")
3070                } else {
3071                    format!("{folder}/f-{id:02}.md")
3072                };
3073                // recency varies with step so order is meaningful + total
3074                let updated = format!(
3075                    "2026-05-{:02}T{:02}:{:02}:00Z",
3076                    1 + (step % 27),
3077                    step % 24,
3078                    id % 60
3079                );
3080                let extra = if id % 3 == 0 {
3081                    "tags:\n  - x\n  - y\n"
3082                } else {
3083                    ""
3084                };
3085                write_doc(
3086                    &wt,
3087                    &rel,
3088                    types[fi],
3089                    Some(&format!("sum {step}")),
3090                    Some(&updated),
3091                    extra,
3092                );
3093                write_doc(
3094                    &rb,
3095                    &rel,
3096                    types[fi],
3097                    Some(&format!("sum {step}")),
3098                    Some(&updated),
3099                    extra,
3100                );
3101                Index::on_write(&wt, Path::new(&rel)).unwrap();
3102                if !live.contains(&rel) {
3103                    live.push(rel);
3104                }
3105            } else if op < 8 {
3106                // REMOVE a live file
3107                let idx = (next() as usize) % live.len();
3108                let rel = live.remove(idx);
3109                fs::remove_file(wt.root.join(&rel)).unwrap();
3110                fs::remove_file(rb.root.join(&rel)).ok();
3111                Index::on_remove(&wt, Path::new(&rel)).unwrap();
3112            } else {
3113                // RENAME a live file within the same layer (new id, maybe new type-folder)
3114                let idx = (next() as usize) % live.len();
3115                let old = live[idx].clone();
3116                // pick a destination folder in the same layer-ish set
3117                let fi = (next() as usize) % folders.len();
3118                let folder = folders[fi];
3119                let id = 50 + (next() % 40);
3120                let new = if folder == "sources/emails" {
3121                    format!("{folder}/2026/05/f-{id:02}.md")
3122                } else {
3123                    format!("{folder}/f-{id:02}.md")
3124                };
3125                if new == old || live.contains(&new) {
3126                    continue;
3127                }
3128                fs::create_dir_all(wt.root.join(&new).parent().unwrap()).unwrap();
3129                fs::create_dir_all(rb.root.join(&new).parent().unwrap()).unwrap();
3130                fs::rename(wt.root.join(&old), wt.root.join(&new)).unwrap();
3131                fs::rename(rb.root.join(&old), rb.root.join(&new)).unwrap();
3132                Index::on_rename(&wt, Path::new(&old), Path::new(&new)).unwrap();
3133                live[idx] = new;
3134            }
3135        }
3136
3137        // Now rebuild the rb side from the shared end state and compare.
3138        Index::rebuild_all(&rb).unwrap();
3139        let a = snapshot_artifacts(&wt);
3140        let b = snapshot_artifacts(&rb);
3141        assert_eq!(
3142            a.keys().collect::<BTreeSet<_>>(),
3143            b.keys().collect::<BTreeSet<_>>(),
3144            "write-through and rebuild must produce the same set of artifacts"
3145        );
3146        for (k, v) in &a {
3147            assert_eq!(
3148                v, &b[k],
3149                "INVARIANT VIOLATED: artifact {k} differs after mixed ops\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
3150                b[k]
3151            );
3152        }
3153        assert!(
3154            !a.is_empty(),
3155            "the run must have produced at least one artifact"
3156        );
3157    }
3158
3159    // ── regressions: cleanup must not delete user content ─────────────────
3160
3161    /// CRITICAL regression: a user content file named `index.md` inside a date
3162    /// shard (e.g. from a website/doc-export mirror) must SURVIVE `cleanup` /
3163    /// `rebuild_all`. The old filename-only match silently deleted it.
3164    #[test]
3165    fn cleanup_preserves_user_content_named_index_md_in_shard() {
3166        let (_d, store) = mk_store();
3167        // A real content record that merely happens to be named index.md.
3168        write_doc(
3169            &store,
3170            "sources/emails/2026/06/index.md",
3171            "email",
3172            Some("Important imported mail"),
3173            Some("2026-06-11T04:23:25Z"),
3174            "",
3175        );
3176        Index::cleanup(&store).unwrap();
3177        assert!(
3178            exists(&store, "sources/emails/2026/06/index.md"),
3179            "cleanup must not delete a user content file named index.md"
3180        );
3181        // A full rebuild (which runs cleanup first) must also preserve it.
3182        Index::rebuild_all(&store).unwrap();
3183        assert!(
3184            exists(&store, "sources/emails/2026/06/index.md"),
3185            "rebuild_all must not delete a user content file named index.md"
3186        );
3187        let kept = read(&store, "sources/emails/2026/06/index.md");
3188        assert!(
3189            kept.contains("Important imported mail"),
3190            "the user's record content must be intact"
3191        );
3192    }
3193
3194    /// HIGH regression: `cleanup` uses `min_depth(2)`, so the canonical
3195    /// type-folder-root `index.md`/`index.jsonl` are NOT deleted up front. A
3196    /// genuine generated catalog at the type-folder root survives a cleanup pass
3197    /// (it is only ever rewritten, or removed when the folder is truly empty).
3198    #[test]
3199    fn cleanup_keeps_canonical_type_folder_root_sidecars() {
3200        let (_d, store) = mk_store();
3201        write_doc(
3202            &store,
3203            "records/contacts/alice.md",
3204            "contact",
3205            Some("Alice"),
3206            Some("2026-05-01T00:00:00Z"),
3207            "",
3208        );
3209        Index::write_level(&store, &IndexLevel::TypeFolder("records/contacts".into())).unwrap();
3210        assert!(exists(&store, "records/contacts/index.md"));
3211        assert!(exists(&store, "records/contacts/index.jsonl"));
3212        Index::cleanup(&store).unwrap();
3213        assert!(
3214            exists(&store, "records/contacts/index.md"),
3215            "cleanup must keep the canonical type-folder index.md (non-empty folder)"
3216        );
3217        assert!(
3218            exists(&store, "records/contacts/index.jsonl"),
3219            "cleanup must keep the canonical type-folder index.jsonl (non-empty folder)"
3220        );
3221    }
3222
3223    // ── regression: write-through must not catalog index artifacts ────────
3224
3225    /// HIGH regression: routing a generated `index.md` through `on_write` (as
3226    /// `dbmd fm set records/contacts/index.md …` would) must NOT insert a phantom
3227    /// self-row — counts and bytes stay equal to a rebuild.
3228    #[test]
3229    fn on_write_ignores_index_artifact_no_phantom_row() {
3230        let (_d, store) = mk_store();
3231        write_doc(
3232            &store,
3233            "records/contacts/alice.md",
3234            "contact",
3235            Some("Alice"),
3236            Some("2026-05-01T00:00:00Z"),
3237            "",
3238        );
3239        Index::on_write(&store, Path::new("records/contacts/alice.md")).unwrap();
3240        let jsonl_before = read(&store, "records/contacts/index.jsonl");
3241        assert_eq!(jsonl_before.lines().count(), 1);
3242
3243        // Tamper: route the catalog file itself through on_write.
3244        Index::on_write(&store, Path::new("records/contacts/index.md")).unwrap();
3245
3246        let jsonl_after = read(&store, "records/contacts/index.jsonl");
3247        assert_eq!(
3248            jsonl_after.lines().count(),
3249            1,
3250            "on_write on index.md must not add a phantom self-row"
3251        );
3252        assert!(
3253            !jsonl_after.contains("\"type\":\"index\""),
3254            "the catalog artifact must never appear as a catalogued row"
3255        );
3256        // Root rollup count stays 1 (not inflated to 2).
3257        let root = read(&store, "index.md");
3258        assert!(
3259            root.contains("[[records/contacts/index|Contacts]] (1)"),
3260            "count must not inflate:\n{root}"
3261        );
3262    }
3263
3264    // ── regression: multi-line summary cannot inject a catalog line ───────
3265
3266    /// HIGH regression: a block-scalar summary spanning multiple lines must be
3267    /// collapsed to one line in the browse entry, so it cannot forge a standalone
3268    /// `- [[…]]` catalog line.
3269    #[test]
3270    fn multiline_summary_is_single_lined_in_index_md() {
3271        let (_d, store) = mk_store();
3272        // A YAML block scalar whose value embeds a forged-looking entry line.
3273        write_raw(
3274            &store,
3275            "records/notes/evil.md",
3276            "type: note\nupdated: 2026-06-10T00:00:00Z\nsummary: |-\n  legit first line\n  - [[records/secrets/fake|Click me]] — injected entry",
3277            "\nbody\n",
3278        );
3279        let idx = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
3280        let md = idx.to_markdown();
3281        // Exactly one browse entry line, and no embedded newline forging a second.
3282        let entry_lines = md.lines().filter(|l| l.starts_with("- [[")).count();
3283        assert_eq!(
3284            entry_lines, 1,
3285            "a multi-line summary must not produce extra entry lines:\n{md}"
3286        );
3287        assert!(
3288            md.contains(
3289                "- [[records/notes/evil]] — legit first line - [[records/secrets/fake|Click me]] — injected entry\n"
3290            ),
3291            "summary newlines must collapse to spaces inline:\n{md}"
3292        );
3293    }
3294
3295    // ── regression: writer/validator scalar coercion agreement ────────────
3296
3297    /// HIGH regression: an unquoted non-string scalar `summary`/`type`
3298    /// (`summary: 2026`, `type: true`) must be coerced to a string by the index
3299    /// writer exactly as `validate::scalar_string` does — so the index entry holds
3300    /// the real value (`2026`), not the `(no summary)` placeholder that produced a
3301    /// permanently-unfixable INDEX_SUMMARY_MISMATCH.
3302    #[test]
3303    fn non_string_scalar_summary_and_type_are_coerced_like_validator() {
3304        let (_d, store) = mk_store();
3305        write_raw(
3306            &store,
3307            "records/contacts/a.md",
3308            "type: contact\nupdated: 2026-05-01T00:00:00Z\nsummary: 2026",
3309            "\nbody\n",
3310        );
3311        let rec = record_from_file(
3312            &store.root.join("records/contacts/a.md"),
3313            PathBuf::from("records/contacts/a.md"),
3314        )
3315        .unwrap();
3316        // `summary: 2026` (YAML number) coerces to the string "2026", matching
3317        // the validator's `scalar_string` (Number -> n.to_string()).
3318        assert_eq!(rec.summary, "2026");
3319        assert_eq!(rec.type_, "contact");
3320
3321        // And the rendered index entry quotes the real value, not the placeholder.
3322        let idx = Index::build_type_folder(&store, Path::new("records/contacts")).unwrap();
3323        let md = idx.to_markdown();
3324        assert!(
3325            md.contains("- [[records/contacts/a]] — 2026\n"),
3326            "index entry must hold the coerced scalar, not the placeholder:\n{md}"
3327        );
3328
3329        // A boolean scalar type coerces to "true" (mirrors scalar_string(Bool)).
3330        write_raw(
3331            &store,
3332            "records/contacts/b.md",
3333            "type: true\nupdated: 2026-05-02T00:00:00Z\nsummary: hi",
3334            "\nbody\n",
3335        );
3336        let rec_b = record_from_file(
3337            &store.root.join("records/contacts/b.md"),
3338            PathBuf::from("records/contacts/b.md"),
3339        )
3340        .unwrap();
3341        assert_eq!(rec_b.type_, "true");
3342    }
3343
3344    // ── regression: non-UTF-8 body must not abort the projection ──────────
3345
3346    /// HIGH regression: a content file with valid-UTF-8 frontmatter but a
3347    /// non-UTF-8 byte in the BODY (a verbatim Latin-1 `sources/` import) must
3348    /// still project to an IndexRecord — `record_from_file` reads frontmatter
3349    /// without requiring the whole file to be UTF-8, so a stray byte can't abort
3350    /// `rebuild_all` / write-through for the entire store.
3351    #[test]
3352    fn non_utf8_body_does_not_abort_record_projection() {
3353        let (_d, store) = mk_store();
3354        let rel = "sources/emails/2026/06/x.md";
3355        let abs = store.root.join(rel);
3356        fs::create_dir_all(abs.parent().unwrap()).unwrap();
3357        // Valid-UTF-8 frontmatter; a raw 0xE9 (Latin-1 'é') in the body.
3358        let mut bytes: Vec<u8> =
3359            b"---\ntype: email\nupdated: 2026-06-11T00:00:00Z\nsummary: An imported email\n---\n\nCaf"
3360                .to_vec();
3361        bytes.push(0xE9);
3362        bytes.extend_from_slice(b" meeting notes\n");
3363        fs::write(&abs, bytes).unwrap();
3364
3365        let rec = record_from_file(&abs, PathBuf::from(rel))
3366            .expect("non-UTF-8 body must not abort the frontmatter read");
3367        assert_eq!(rec.summary, "An imported email");
3368        assert_eq!(rec.type_, "email");
3369
3370        // The full sweep indexes the folder rather than aborting the whole store.
3371        Index::rebuild_all(&store).unwrap();
3372        assert!(
3373            exists(&store, "sources/emails/index.jsonl"),
3374            "rebuild must produce the catalog despite a non-UTF-8 body byte"
3375        );
3376        assert!(
3377            read(&store, "sources/emails/index.jsonl").contains("An imported email"),
3378            "the record must be catalogued"
3379        );
3380    }
3381
3382    /// HIGH regression: a single malformed-YAML file must abort the rebuild
3383    /// loudly (not be silently skipped) — skipping it would leave the store in a
3384    /// permanently invalid state (`INDEX_MISSING_ENTRY` / `INDEX_JSONL_DESYNC`
3385    /// that no rebuild clears, since the validator enumerates members by
3386    /// filename, not by parseability) and would desync the rollups. The abort is
3387    /// safe because `cleanup` preserves the prior canonical catalogs
3388    /// (`min_depth(2)`), so an aborted rebuild leaves the existing sidecars
3389    /// intact and surfaces a clear error naming the file to fix.
3390    #[test]
3391    fn rebuild_aborts_on_malformed_file_and_keeps_prior_catalogs() {
3392        let (_d, store) = mk_store();
3393        write_doc(
3394            &store,
3395            "records/contacts/alice.md",
3396            "contact",
3397            Some("Alice"),
3398            Some("2026-05-01T00:00:00Z"),
3399            "",
3400        );
3401        write_doc(
3402            &store,
3403            "records/companies/acme.md",
3404            "company",
3405            Some("Acme"),
3406            Some("2026-05-02T00:00:00Z"),
3407            "",
3408        );
3409
3410        // A clean first rebuild establishes the canonical catalogs.
3411        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3412        assert!(exists(&store, "records/contacts/index.jsonl"));
3413        assert!(exists(&store, "records/companies/index.jsonl"));
3414
3415        // Routine malformed file: unterminated quoted scalar.
3416        let bad = store.root.join("records/contacts/broken.md");
3417        fs::write(
3418            &bad,
3419            "---\ntype: contact\nsummary: \"unterminated\n---\nbody\n",
3420        )
3421        .unwrap();
3422
3423        // Must abort loudly — a silent skip leaves a file the validator requires
3424        // to be catalogued out of the index forever.
3425        Index::rebuild_all(&store)
3426            .expect_err("rebuild must abort, not silently skip, on a malformed file");
3427
3428        // The prior canonical catalogs survive the aborted rebuild: `cleanup`'s
3429        // `min_depth(2)` never deletes a type-folder's root-level sidecars, so a
3430        // mid-sweep abort leaves the existing indexes intact rather than wiped.
3431        assert!(
3432            exists(&store, "records/companies/index.jsonl"),
3433            "an aborted rebuild must not destroy a clean sibling folder's catalog"
3434        );
3435        assert!(
3436            exists(&store, "records/contacts/index.jsonl"),
3437            "an aborted rebuild must not destroy the affected folder's prior catalog"
3438        );
3439        let contacts_jsonl = read(&store, "records/contacts/index.jsonl");
3440        assert!(contacts_jsonl.contains("records/contacts/alice.md"));
3441    }
3442
3443    /// HIGH regression (problem B): `rebuild_all`'s rollup `(N)` counts must
3444    /// equal the catalogued `index.jsonl` record counts — never a raw `.md` walk
3445    /// that disagrees with the sidecar. The over-corrected skip-with-diagnostic
3446    /// build excluded a malformed file from `index.jsonl` while `build_layer` /
3447    /// `build_root` kept counting it via `walk_type_folder_files`, so a folder
3448    /// would show `Contacts (2)` in the root/layer rollups while its `index.jsonl`
3449    /// held only 1 record — and a single subsequent write-through (which derives
3450    /// `(N)` from the jsonl) rewrote it to `Contacts (1)`, making `rebuild_all`
3451    /// and write-through emit different bytes for the same state. With the loud
3452    /// abort, the only successful-rebuild states are fully consistent: every
3453    /// rollup `(N)` equals the catalogued record count AND equals what a
3454    /// write-through over the same files produces.
3455    #[test]
3456    fn rebuild_rollup_counts_equal_jsonl_records_and_write_through() {
3457        let (_d, store) = mk_store();
3458        // Two well-formed contacts: the rollups must read (2), matching the two
3459        // jsonl records — this is the count the skip-version inflated to a phantom
3460        // extra when a malformed sibling was present-but-uncatalogued.
3461        write_doc(
3462            &store,
3463            "records/contacts/alice.md",
3464            "contact",
3465            Some("Alice"),
3466            Some("2026-05-01T00:00:00Z"),
3467            "",
3468        );
3469        write_doc(
3470            &store,
3471            "records/contacts/bob.md",
3472            "contact",
3473            Some("Bob"),
3474            Some("2026-05-02T00:00:00Z"),
3475            "",
3476        );
3477        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3478
3479        // The catalogued record set (index.jsonl) and the rollup (N) must agree.
3480        let jsonl_lines = read(&store, "records/contacts/index.jsonl")
3481            .lines()
3482            .filter(|l| !l.trim().is_empty())
3483            .count();
3484        assert_eq!(jsonl_lines, 2, "two well-formed files ⇒ two jsonl records");
3485        let layer_md = read(&store, "records/index.md");
3486        let root_md = read(&store, "index.md");
3487        assert!(
3488            layer_md.contains("- [[records/contacts/index|Contacts]] (2)"),
3489            "layer rollup (N) must equal the jsonl record count (2), not a raw .md walk:\n{layer_md}"
3490        );
3491        assert!(
3492            root_md.contains("- [[records/contacts/index|Contacts]] (2)\n")
3493                && root_md.contains("## Records (2)"),
3494            "root rollup (N)/layer total must equal the jsonl record count (2):\n{root_md}"
3495        );
3496
3497        // The decisive write-through == rebuild_all byte-identity check on the
3498        // SAME end state: a single on_write must not rewrite the rollups to a
3499        // different (N). Under the skip-version, rebuild_all's rollup walked the
3500        // raw .md tree while on_write derived (N) from the jsonl, so the two
3501        // diverged; the loud abort keeps both deriving (N) from the catalogued
3502        // records, so the bytes match exactly.
3503        let (_d2, wt) = mk_store();
3504        write_doc(
3505            &wt,
3506            "records/contacts/alice.md",
3507            "contact",
3508            Some("Alice"),
3509            Some("2026-05-01T00:00:00Z"),
3510            "",
3511        );
3512        write_doc(
3513            &wt,
3514            "records/contacts/bob.md",
3515            "contact",
3516            Some("Bob"),
3517            Some("2026-05-02T00:00:00Z"),
3518            "",
3519        );
3520        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3521        Index::on_write(&wt, Path::new("records/contacts/bob.md")).unwrap();
3522
3523        let a = snapshot_artifacts(&wt);
3524        let b = snapshot_artifacts(&store);
3525        assert_eq!(
3526            a.keys().collect::<BTreeSet<_>>(),
3527            b.keys().collect::<BTreeSet<_>>(),
3528            "write-through and rebuild_all must produce the same artifact set"
3529        );
3530        for (k, v) in &a {
3531            assert_eq!(
3532                v, &b[k],
3533                "rollup bytes diverged between write-through and rebuild_all for {k} \
3534                 (a skip-version inflates rebuild_all's (N) above the jsonl record \
3535                 count, which write-through then rewrites):\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
3536                b[k]
3537            );
3538        }
3539    }
3540
3541    /// MEDIUM regression: a non-UTF-8 path component must be lossily decoded
3542    /// (kept, with U+FFFD), not silently dropped — so the index key points at the
3543    /// file, not its parent directory. Unix-only (ext4 allows the filename; APFS
3544    /// rejects it at the VFS layer).
3545    #[cfg(unix)]
3546    #[test]
3547    fn non_utf8_path_component_is_kept_not_dropped() {
3548        use std::ffi::OsStr;
3549        use std::os::unix::ffi::OsStrExt;
3550        // sources/emails/caf\xE9.md — the leaf has a non-UTF-8 byte.
3551        let mut leaf = b"caf".to_vec();
3552        leaf.push(0xE9);
3553        leaf.extend_from_slice(b".md");
3554        let p = Path::new("sources/emails").join(OsStr::from_bytes(&leaf));
3555        let unix = path_to_unix(&p);
3556        // The leaf is preserved (lossy), so the path is NOT collapsed to the
3557        // parent directory "sources/emails".
3558        assert_ne!(
3559            unix, "sources/emails",
3560            "non-UTF-8 leaf must not be dropped, collapsing the path to its parent dir"
3561        );
3562        assert!(
3563            unix.starts_with("sources/emails/caf"),
3564            "the lossy leaf must remain under its folder: {unix}"
3565        );
3566    }
3567
3568    // ── loose files (directly at a layer root, no type-folder) ───────────────
3569
3570    #[test]
3571    fn loose_file_is_catalogued_in_layer_jsonl_not_type_folder() {
3572        let (_d, store) = mk_store();
3573        // One canonical file (in a type-folder) and one loose file at the root.
3574        write_doc(
3575            &store,
3576            "records/contacts/alice.md",
3577            "contact",
3578            Some("Alice"),
3579            Some("2026-06-01T08:00:00Z"),
3580            "id: alice\n",
3581        );
3582        write_doc(
3583            &store,
3584            "records/loose.md",
3585            "contact",
3586            Some("Loose"),
3587            Some("2026-06-01T08:00:00Z"),
3588            "id: loose\n",
3589        );
3590        Index::rebuild_all(&store).unwrap();
3591
3592        // The layer carries its own jsonl listing exactly the loose file —
3593        // disjoint from the type-folder jsonl, so no double-count.
3594        assert!(
3595            exists(&store, "records/index.jsonl"),
3596            "layer jsonl must exist when loose files are present"
3597        );
3598        let layer_jsonl = read(&store, "records/index.jsonl");
3599        assert!(
3600            layer_jsonl.contains("records/loose.md"),
3601            "layer jsonl must list the loose file, got:\n{layer_jsonl}"
3602        );
3603        assert!(
3604            !layer_jsonl.contains("records/contacts/alice.md"),
3605            "layer jsonl must NOT list type-folder files"
3606        );
3607        let tf_jsonl = read(&store, "records/contacts/index.jsonl");
3608        assert!(tf_jsonl.contains("records/contacts/alice.md"));
3609        assert!(!tf_jsonl.contains("records/loose.md"));
3610
3611        // The layer index.md stays a pure type-folder rollup — no loose entry.
3612        let layer_md = read(&store, "records/index.md");
3613        assert!(
3614            layer_md.contains("records/contacts/index"),
3615            "layer md must roll up the type-folder, got:\n{layer_md}"
3616        );
3617        assert!(
3618            !layer_md.contains("records/loose"),
3619            "layer md must stay a rollup, not list loose files, got:\n{layer_md}"
3620        );
3621    }
3622
3623    #[test]
3624    fn loose_file_write_through_equals_rebuild() {
3625        let (_d1, wt) = mk_store();
3626        let (_d2, rb) = mk_store();
3627        for s in [&wt, &rb] {
3628            write_doc(
3629                s,
3630                "records/contacts/alice.md",
3631                "contact",
3632                Some("Alice"),
3633                Some("2026-06-01T08:00:00Z"),
3634                "id: alice\n",
3635            );
3636            write_doc(
3637                s,
3638                "records/loose.md",
3639                "contact",
3640                Some("Loose"),
3641                Some("2026-06-02T08:00:00Z"),
3642                "id: loose\n",
3643            );
3644        }
3645        // wt: write-through (loop); rb: full rebuild (sweep). Must agree byte-wise.
3646        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3647        Index::on_write(&wt, Path::new("records/loose.md")).unwrap();
3648        Index::rebuild_all(&rb).unwrap();
3649
3650        let a = snapshot_artifacts(&wt);
3651        let b = snapshot_artifacts(&rb);
3652        assert_eq!(
3653            a.keys().collect::<Vec<_>>(),
3654            b.keys().collect::<Vec<_>>(),
3655            "loose-file loop and sweep must produce the same artifact set"
3656        );
3657        for (k, v) in &a {
3658            assert_eq!(
3659                v, &b[k],
3660                "loose-file artifact {k} differs between loop and sweep"
3661            );
3662        }
3663    }
3664
3665    #[test]
3666    fn removing_last_loose_file_clears_layer_jsonl() {
3667        let (_d, store) = mk_store();
3668        write_doc(
3669            &store,
3670            "records/loose.md",
3671            "contact",
3672            Some("Loose"),
3673            Some("2026-06-01T08:00:00Z"),
3674            "id: loose\n",
3675        );
3676        Index::on_write(&store, Path::new("records/loose.md")).unwrap();
3677        assert!(
3678            exists(&store, "records/index.jsonl"),
3679            "layer jsonl present after a loose write"
3680        );
3681        fs::remove_file(store.root.join("records/loose.md")).unwrap();
3682        Index::on_remove(&store, Path::new("records/loose.md")).unwrap();
3683        assert!(
3684            !exists(&store, "records/index.jsonl"),
3685            "layer jsonl must be removed once the last loose file is gone"
3686        );
3687    }
3688}