Skip to main content

dbmd_core/
index.rs

1//! `index` — the hierarchical content catalog.
2//!
3//! A uniform three-level tree: root + per-layer + per-type-folder. **Two
4//! artifacts per type-folder:** the human `index.md` (capped 500, recency
5//! browse) and the machine `index.jsonl` (complete, structured — one JSON
6//! object per file). Both read `summary` + key frontmatter fields + links
7//! directly from each file — there is no extraction logic here.
8//!
9//! **Maintained write-through** by the write commands ([`Index::on_write`] /
10//! [`Index::on_rename`] / [`Index::on_remove`] — the loop path, O(changed), no
11//! store walk); [`Index::rebuild_all`] is the from-scratch SWEEP repair.
12//!
13//! **Key invariant:** write-through must produce a byte-identical `index.md`
14//! and (post-compaction) `index.jsonl` to a full [`Index::rebuild_all`] over
15//! the same end state — the loop path can never drift from the repair path.
16//!
17//! # Implementation notes (deviations the reader should know)
18//!
19//! - **Self-contained, by design.** This module does its own shard-aware folder
20//!   walk, its own minimal frontmatter read, and its own atomic write, using
21//!   only `store.root` (a public field) and the `serde_norway` / `serde_json` /
22//!   `chrono` / `walkdir` crates rather than routing through the sibling
23//!   `store`/`parser` helpers ([`Store::walk_type_folder`],
24//!   [`Store::recent_in_type_folder`], [`parser::read_file`], …). The index has
25//!   to stamp a *deterministic* `updated:` and emit a *canonical, compacted*
26//!   `index.jsonl` (see the two notes below); keeping the read/walk/write local
27//!   is what makes the byte-identity invariant a true byte comparison, free of
28//!   any incidental formatting the shared readers might introduce. The public
29//!   signatures in `lib.rs` are untouched.
30//! - **Deterministic `updated:` on the index files themselves.** An index's own
31//!   `updated` frontmatter is derived as the max `updated` over the files it
32//!   catalogs (max over children for root/layer) — NOT wall-clock-now. This is
33//!   what makes the byte-identity invariant a *true* byte comparison: a
34//!   write-through write and a `rebuild_all` over the same end state stamp the
35//!   same value. (The SPEC's rendered examples show a wall-clock-looking value;
36//!   the conventions list only requires `updated: <RFC3339>`, and the
37//!   property-tested invariant dominates.)
38//! - **`index.jsonl` is always compacted.** Write-through rewrites the affected
39//!   type-folder's jsonl in canonical form (one current line per path, recency
40//!   order) rather than appending superseded/tombstone lines, so the jsonl is
41//!   byte-identical to `rebuild_all` *immediately* (a strictly stronger
42//!   guarantee than the SPEC's "post-compaction"). This keeps the loop cost at
43//!   one sidecar read + one rewrite per touched type-folder — O(folder), the
44//!   sanctioned loop primitive, never a whole-`Store::walk`.
45//! - **Root/layer entry styling** follows plan §index (`(N)` numeric counts;
46//!   layer headings in the root carry the layer's total count) which is more
47//!   specific than the SPEC's illustrative `(42 files)` prose example. Type
48//!   folders are listed alphabetically (a deterministic order a derived artifact
49//!   needs); `scope: type-folder` follows the conventions list, not the one
50//!   SPEC example that wrote `scope: folder`.
51
52use std::collections::BTreeMap;
53use std::fs;
54use std::io::Write as _;
55use std::path::{Path, PathBuf};
56
57use chrono::{DateTime, FixedOffset, SecondsFormat};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60
61use crate::parser::FolderMeta;
62use crate::store::{Layer, Store};
63
/// The browse-view cap for a type-folder `index.md`: at most this many
/// entries are rendered, and a folder holding more records than this gets a
/// trailing "more" footer. The `index.jsonl` twin is never capped.
const MD_CAP: usize = 500;

/// Placeholder summary for a content file that has no `summary` frontmatter.
/// The index never invents a real summary — that is `dbmd fm init`'s job; this
/// marker is what `dbmd validate` keys off (`INDEX`-class issue).
const MISSING_SUMMARY: &str = "(no summary)";

/// The root `index.md` H1 (rendered as `# Knowledge base index`).
const ROOT_TITLE: &str = "Knowledge base index";
74
/// Which level of the catalog an [`Index`] represents.
///
/// The variants mirror the three-level tree: one store-wide root, one rollup
/// per layer, and one catalog per type-folder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexLevel {
    /// The store-wide root `index.md` (layers + per-type counts).
    Root,
    /// A layer `index.md` (every type-folder under one layer).
    Layer(Layer),
    /// A type-folder `index.md` + `index.jsonl` (every file in the folder).
    /// The payload is the folder path that the builders join onto `store.root`.
    TypeFolder(PathBuf),
}
85
/// One record in a type-folder's `index.jsonl` — the complete, structured twin
/// of a single `index.md` browse entry. [`Index::to_jsonl`] serializes each
/// record as one JSON object on its own line.
///
/// `tags` are the document's flat labels; `links` are its concept/relationship
/// wiki-link targets. Both are copied verbatim from the file — never inferred.
/// `fields` holds the remaining type-specific frontmatter so the structured
/// query path can filter on any key without opening the file.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct IndexRecord {
    /// Store-relative path of the file (the upsert key; last-write-wins).
    /// Serialized with forward slashes regardless of OS (see [`path_serde`]) so
    /// the `index.jsonl` catalog is byte-portable across platforms.
    #[serde(with = "path_serde")]
    pub path: PathBuf,
    /// The file's `type` (the JSON key is `type`; the trailing underscore only
    /// avoids the Rust keyword).
    #[serde(rename = "type")]
    pub type_: String,
    /// The file's `summary`.
    pub summary: String,
    /// The file's flat `tags`. Defaults to empty when the key is absent on read.
    #[serde(default)]
    pub tags: Vec<String>,
    /// The file's concept/relationship wiki-link targets (store-relative).
    /// Defaults to empty when the key is absent on read.
    #[serde(default)]
    pub links: Vec<String>,
    /// `created` timestamp.
    pub created: Option<DateTime<FixedOffset>>,
    /// `updated` timestamp (the recency key for the `index.md` cap order).
    pub updated: Option<DateTime<FixedOffset>>,
    /// Remaining type-specific frontmatter fields, verbatim — including the
    /// record's `id` (SPEC v0.4), which rides here like any other frontmatter
    /// field rather than as a dedicated column, so `--where id=…` resolves
    /// through the generic field path and existing sidecars stay byte-stable.
    /// Flattened, so these keys sit beside the declared ones in the JSON object.
    #[serde(flatten)]
    pub fields: BTreeMap<String, Value>,
}
122
/// A built (or being-built) catalog for one [`IndexLevel`], with both rendered
/// artifacts available. Pure data until written via [`Index::write_level`].
#[derive(Debug, Clone, PartialEq)]
pub struct Index {
    /// Which level this catalog is for.
    pub level: IndexLevel,
    /// The record set for this level: every file in the folder for a
    /// type-folder catalog, the loose files sitting directly at the layer root
    /// for a layer catalog, and always empty for the root rollup.
    pub records: Vec<IndexRecord>,
    /// Per-child counts for root/layer rollups (type-folder path → file
    /// count, non-empty folders only). Empty for a type-folder catalog.
    pub child_counts: BTreeMap<PathBuf, usize>,
}
135
136impl Index {
137    /// Build a type-folder catalog by aggregating across date-shards, producing
138    /// both artifacts. `index.md` selection is recency (updated desc, ties by
139    /// path asc; cap 500 with a `## More` footer over the cap); `index.jsonl`
140    /// holds every file. A file missing `summary` gets a placeholder + a
141    /// validate-detectable issue (the index never invents summaries).
142    pub fn build_type_folder(store: &Store, type_folder: &Path) -> crate::Result<Index> {
143        let rel = normalize_rel(type_folder);
144        let abs = store.root.join(&rel);
145        let mut records = Vec::new();
146        for file_abs in walk_type_folder_files(&abs) {
147            let rel_path =
148                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
149            // Abort the build on a malformed file rather than skip it. A skipped
150            // file would still be a content member the validator requires to be
151            // catalogued (`validate::walk_content_files` enumerates by filename,
152            // not by parseability), so silently dropping it would leave the store
153            // in a permanently invalid state (`INDEX_MISSING_ENTRY` /
154            // `INDEX_JSONL_DESYNC` that no rebuild can clear) and would desync the
155            // rollups (`build_layer`/`build_root` count the raw `.md` files). The
156            // loud `?` is the right outcome: `cleanup` now preserves the prior
157            // canonical sidecars (`min_depth(2)`), so an aborted rebuild leaves
158            // the existing catalogs intact and the operator a clear error naming
159            // the file to fix — never a destroyed or silently-wrong index.
160            records.push(record_from_file(&file_abs, rel_path)?);
161        }
162        sort_records(&mut records);
163        Ok(Index {
164            level: IndexLevel::TypeFolder(rel),
165            records,
166            child_counts: BTreeMap::new(),
167        })
168    }
169
170    /// Build a layer catalog: every non-empty type-folder under the layer with
171    /// `(N)` counts and a newest-file `summary` preview (≤ 80 chars), plus the
172    /// **loose records** that live directly at the layer root (files with no
173    /// type-folder between them and the layer). The type-folder rollup is the
174    /// `index.md`; the loose records are the layer's own `index.jsonl` (so
175    /// structured reads — `query`, dedup, `graph` — see a loose file the same
176    /// way they see a canonical one). A layer with no loose files carries no
177    /// `index.jsonl`, so existing stores are byte-unchanged.
178    pub fn build_layer(store: &Store, layer: Layer) -> crate::Result<Index> {
179        let mut child_counts = BTreeMap::new();
180        for tf in type_folders_in_layer(store, layer) {
181            let abs = store.root.join(&tf);
182            let n = walk_type_folder_files(&abs).len();
183            if n > 0 {
184                child_counts.insert(tf, n);
185            }
186        }
187        let mut records = Vec::new();
188        for file_abs in loose_files_in_layer(store, layer) {
189            let rel_path =
190                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
191            // Abort on a malformed loose file rather than skip it, mirroring
192            // `build_type_folder`: a skipped file is still a content member the
193            // validator requires to be catalogued, so dropping it would leave a
194            // permanently-invalid index. The loud `?` names the file to fix.
195            records.push(record_from_file(&file_abs, rel_path)?);
196        }
197        sort_records(&mut records);
198        Ok(Index {
199            level: IndexLevel::Layer(layer),
200            records,
201            child_counts,
202        })
203    }
204
205    /// Build the store-wide root catalog: one heading per non-empty layer with
206    /// total count + bulleted per-type sub-entries with `(N)` counts.
207    pub fn build_root(store: &Store) -> crate::Result<Index> {
208        let mut child_counts = BTreeMap::new();
209        for layer in Layer::all() {
210            for tf in type_folders_in_layer(store, layer) {
211                let abs = store.root.join(&tf);
212                let n = walk_type_folder_files(&abs).len();
213                if n > 0 {
214                    child_counts.insert(tf, n);
215                }
216            }
217        }
218        Ok(Index {
219            level: IndexLevel::Root,
220            records: Vec::new(),
221            child_counts,
222        })
223    }
224
225    /// Render this catalog as a canonical `index.md`.
226    pub fn to_markdown(&self) -> String {
227        match &self.level {
228            IndexLevel::TypeFolder(folder) => self.render_type_folder_md(folder),
229            IndexLevel::Layer(layer) => self.render_layer_md(*layer),
230            IndexLevel::Root => self.render_root_md(),
231        }
232    }
233
234    /// Render this catalog's `records` as the complete `index.jsonl` (one JSON
235    /// object per file, stable key order so diffs stay minimal). Used at the
236    /// type-folder level for its files, and at the layer level for the loose
237    /// files that live directly at the layer root. The root rollup carries no
238    /// records, so it never produces a jsonl.
239    pub fn to_jsonl(&self) -> String {
240        let mut out = String::new();
241        for rec in &self.records {
242            // The record type derives a deterministic, sorted key order
243            // (declared fields first, then the flattened `fields` BTreeMap).
244            let line = serde_json::to_string(rec).expect("IndexRecord serializes");
245            out.push_str(&line);
246            out.push('\n');
247        }
248        out
249    }
250
251    // ── rendering helpers ────────────────────────────────────────────────
252
253    fn render_type_folder_md(&self, folder: &Path) -> String {
254        let folder_disp = path_to_unix(folder);
255        let updated = max_updated(self.records.iter().map(|r| r.updated.as_ref()));
256        let mut s = String::new();
257        s.push_str("---\n");
258        s.push_str("type: index\n");
259        s.push_str("scope: type-folder\n");
260        s.push_str(&format!("folder: {folder_disp}\n"));
261        if let Some(ts) = updated {
262            s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
263        }
264        s.push_str("---\n\n");
265        s.push_str(&format!("# {folder_disp}\n\n"));
266
267        let shown = self.records.len().min(MD_CAP);
268        for rec in self.records.iter().take(shown) {
269            s.push_str(&format_md_entry(rec));
270            s.push('\n');
271        }
272
273        if self.records.len() > MD_CAP {
274            let type_ = self.records.first().map(|r| r.type_.as_str()).unwrap_or("");
275            let layer = folder
276                .components()
277                .next()
278                .and_then(|c| c.as_os_str().to_str())
279                .unwrap_or("");
280            s.push('\n');
281            s.push_str(&more_footer(self.records.len(), type_, layer));
282        }
283        s
284    }
285
286    /// Store-less layer rollup: counts only, no preview / no derived `updated`
287    /// (a layer index needs each child's on-disk jsonl for those — see
288    /// [`render_layer_md_with_store`], the canonical path every disk write
289    /// uses). This pure-data render is structurally identical sans preview.
290    fn render_layer_md(&self, layer: Layer) -> String {
291        let layer_dir = layer_dir_name(layer);
292        let mut s = String::new();
293        s.push_str("---\n");
294        s.push_str("type: index\n");
295        s.push_str("scope: layer\n");
296        s.push_str(&format!("folder: {layer_dir}\n"));
297        s.push_str("---\n\n");
298        s.push_str(&format!("# {layer_dir}\n\n"));
299        for (tf, n) in &self.child_counts {
300            let tf_unix = path_to_unix(tf);
301            let display = capitalize(folder_basename(tf));
302            s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
303        }
304        s
305    }
306
307    /// Store-less root rollup: counts only (the canonical disk render adds a
308    /// derived `updated` — see [`render_root_md_with_store`]).
309    fn render_root_md(&self) -> String {
310        let mut s = String::new();
311        s.push_str("---\n");
312        s.push_str("type: index\n");
313        s.push_str("scope: root\n");
314        s.push_str("---\n\n");
315        s.push_str(&format!("# {ROOT_TITLE}\n"));
316        for layer in Layer::all() {
317            let layer_dir = layer_dir_name(layer);
318            let prefix = format!("{layer_dir}/");
319            let children: Vec<(&PathBuf, &usize)> = self
320                .child_counts
321                .iter()
322                .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
323                .collect();
324            if children.is_empty() {
325                continue;
326            }
327            let total: usize = children.iter().map(|(_, n)| **n).sum();
328            s.push('\n');
329            s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
330            for (tf, n) in children {
331                let tf_unix = path_to_unix(tf);
332                let display = capitalize(folder_basename(tf));
333                s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
334            }
335        }
336        s
337    }
338}
339
340// ─────────────────────────────────────────────────────────────────────────
// Write-through + sweep (associated functions on `Index`).
342// ─────────────────────────────────────────────────────────────────────────
343
344impl Index {
345    /// **Write-through (loop, O(changed)).** Upsert a new/updated content file.
346    /// Reads the affected type-folder's `index.jsonl` (the sanctioned per-folder
347    /// sidecar read — never a whole-store walk), applies the change, and
348    /// atomically rewrites that folder's `index.md` + `index.jsonl` plus the
349    /// parent layer + root rollups so the artifacts equal a `rebuild_all` over
350    /// the same end state.
351    pub fn on_write(store: &Store, file: &Path) -> crate::Result<()> {
352        let file_rel = normalize_rel(file);
353        // The generated catalog files are not content — never upsert one into
354        // itself. `build_type_folder`'s walk already excludes `index.md`
355        // (`walk_type_folder_files`); the loop path must apply the same
356        // exclusion or editing `index.md` via `fm set` inserts a phantom
357        // self-row, inflating every `(N)` count and breaking the
358        // write-through == rebuild byte-identity invariant.
359        if is_index_artifact(&file_rel) {
360            return Ok(());
361        }
362        // A loose file (directly at a layer root, no type-folder) is catalogued
363        // in its layer's own `index.jsonl`; the layer `index.md` rollup is
364        // unaffected (loose files do not change type-folder counts).
365        if let Some(layer) = loose_layer_of(&file_rel) {
366            return apply_loose_change(store, layer, &file_rel, false);
367        }
368        let file_abs = store.root.join(&file_rel);
369        let folder = type_folder_of(&file_rel)
370            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
371        let record = record_from_file(&file_abs, file_rel.clone())?;
372
373        // Serialize the sidecar read-modify-write so concurrent sanctioned
374        // writes to this folder don't clobber each other's rows (lost update).
375        let _lock = FolderLock::acquire(&store.root.join(&folder));
376        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
377        records.retain(|r| r.path != record.path);
378        records.push(record);
379        sort_records(&mut records);
380
381        write_type_folder_artifacts(store, &folder, &records)?;
382        update_parents(store, &folder)?;
383        Ok(())
384    }
385
386    /// **Write-through (loop, O(changed)).** Move a file's entry between
387    /// type-folder indexes (or within, if the same folder) in both `index.md`
388    /// and `index.jsonl`, fixing counts on both sides.
389    pub fn on_rename(store: &Store, old: &Path, new: &Path) -> crate::Result<()> {
390        let old_rel = normalize_rel(old);
391        let new_rel = normalize_rel(new);
392        // Index artifacts are generated, not catalogued — a rename of/into one
393        // is not a content move (same reasoning as `on_write`). Skip rather than
394        // insert a phantom self-row.
395        if is_index_artifact(&old_rel) || is_index_artifact(&new_rel) {
396            return Ok(());
397        }
398        // If either side is a loose file (layer root, no type-folder), decompose
399        // into remove-old + add-new: each entry point routes to the correct
400        // catalog (the layer `index.jsonl` for a loose side, the type-folder for
401        // the other), giving the same end state as the cross-folder path below
402        // while reusing the tested single-file paths.
403        if loose_layer_of(&old_rel).is_some() || loose_layer_of(&new_rel).is_some() {
404            Self::on_remove(store, &old_rel)?;
405            Self::on_write(store, &new_rel)?;
406            return Ok(());
407        }
408        let old_folder = type_folder_of(&old_rel)
409            .ok_or_else(|| bad_index(&old_rel, "source is not inside a layer/type-folder"))?;
410        let new_folder = type_folder_of(&new_rel)
411            .ok_or_else(|| bad_index(&new_rel, "target is not inside a layer/type-folder"))?;
412
413        // Serialize the sidecar read-modify-write(s). For a cross-folder rename,
414        // lock BOTH folders, always in sorted order, so two renames touching the
415        // same pair can't deadlock. Held for the whole operation via RAII.
416        let _locks = lock_folders(store, &old_folder, &new_folder);
417
418        // Drop from the old folder.
419        let mut old_records =
420            read_jsonl_records(&store.root.join(&old_folder).join("index.jsonl"))?;
421        old_records.retain(|r| r.path != old_rel);
422
423        if old_folder == new_folder {
424            // Same folder: re-read the (now-renamed) file and upsert.
425            let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
426            old_records.retain(|r| r.path != record.path);
427            old_records.push(record);
428            sort_records(&mut old_records);
429            write_type_folder_artifacts(store, &old_folder, &old_records)?;
430            update_parents(store, &old_folder)?;
431            return Ok(());
432        }
433
434        // Cross-folder: write the trimmed old folder (or drop its indexes if
435        // now empty), then upsert into the new folder.
436        sort_records(&mut old_records);
437        write_type_folder_artifacts(store, &old_folder, &old_records)?;
438
439        let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
440        let mut new_records =
441            read_jsonl_records(&store.root.join(&new_folder).join("index.jsonl"))?;
442        new_records.retain(|r| r.path != record.path);
443        new_records.push(record);
444        sort_records(&mut new_records);
445        write_type_folder_artifacts(store, &new_folder, &new_records)?;
446
447        update_parents(store, &old_folder)?;
448        update_parents(store, &new_folder)?;
449        Ok(())
450    }
451
452    /// **Write-through (loop, O(changed)).** Drop a file's entry from both
453    /// `index.md` and `index.jsonl`; decrement counts; if the browse view drops
454    /// below the cap, the next-most-recent is already present in the complete
455    /// jsonl record set and re-renders into the md automatically.
456    pub fn on_remove(store: &Store, file: &Path) -> crate::Result<()> {
457        let file_rel = normalize_rel(file);
458        // Removing a generated catalog artifact is not a content removal; it has
459        // no row to drop (it was never catalogued). Skip, mirroring `on_write`.
460        if is_index_artifact(&file_rel) {
461            return Ok(());
462        }
463        // Loose file → drop its row from the layer `index.jsonl`.
464        if let Some(layer) = loose_layer_of(&file_rel) {
465            return apply_loose_change(store, layer, &file_rel, true);
466        }
467        let folder = type_folder_of(&file_rel)
468            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
469        // Serialize the sidecar read-modify-write (see `on_write`).
470        let _lock = FolderLock::acquire(&store.root.join(&folder));
471        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
472        let before = records.len();
473        records.retain(|r| r.path != file_rel);
474        if records.len() == before {
475            // Nothing to remove; still normalize the folder + parents so the
476            // artifacts stay canonical.
477        }
478        sort_records(&mut records);
479        write_type_folder_artifacts(store, &folder, &records)?;
480        update_parents(store, &folder)?;
481        Ok(())
482    }
483
484    /// **SWEEP repair.** Walk the store once and atomically (re)write root +
485    /// every non-empty layer + every non-empty type-folder `index.md` and
486    /// `index.jsonl` (compacting the jsonl). Also runs [`Index::cleanup`].
487    pub fn rebuild_all(store: &Store) -> crate::Result<()> {
488        Index::cleanup(store)?;
489        for layer in Layer::all() {
490            for tf in type_folders_in_layer(store, layer) {
491                let idx = Index::build_type_folder(store, &tf)?;
492                if idx.records.is_empty() {
493                    continue;
494                }
495                write_type_folder_artifacts(store, &tf, &idx.records)?;
496            }
497            let layer_idx = Index::build_layer(store, layer)?;
498            let layer_index_md = store.root.join(layer_dir_name(layer)).join("index.md");
499            if layer_idx.child_counts.is_empty() {
500                remove_if_exists(&layer_index_md)?;
501            } else {
502                write_atomic(
503                    &layer_index_md,
504                    render_layer_md_with_store(store, &layer_idx),
505                )?;
506            }
507            // The layer's own `index.jsonl` — present iff the layer has loose
508            // files directly at its root. Independent of the rollup above: a
509            // layer can have loose files but no type-folders, or vice versa.
510            write_layer_jsonl(store, layer, &layer_idx.records)?;
511        }
512        let root_idx = Index::build_root(store)?;
513        let root_index_md = store.root.join("index.md");
514        if root_idx.child_counts.is_empty() {
515            remove_if_exists(&root_index_md)?;
516        } else {
517            write_atomic(&root_index_md, render_root_md_with_store(store, &root_idx))?;
518        }
519        Ok(())
520    }
521
522    /// Rebuild ONE type-folder's `index.md`/`index.jsonl` from a fresh walk, then
523    /// cascade the new child count up to the layer and root rollups — so a
524    /// scoped `dbmd index rebuild --folder` leaves the hierarchy consistent,
525    /// exactly like `rebuild_all` and the loop-path `on_write` already do.
526    /// (Writing only the folder, as the CLI used to, left stale layer/root
527    /// counts that `validate` would then flag as an index desync.)
528    pub fn rebuild_folder(store: &Store, folder: &Path) -> crate::Result<()> {
529        Self::write_level(store, &IndexLevel::TypeFolder(folder.to_path_buf()))?;
530        update_parents(store, folder)
531    }
532
533    /// Atomically write a single level's artifact(s) to disk.
534    pub fn write_level(store: &Store, level: &IndexLevel) -> crate::Result<()> {
535        match level {
536            IndexLevel::TypeFolder(folder) => {
537                let idx = Index::build_type_folder(store, folder)?;
538                if idx.records.is_empty() {
539                    remove_if_exists(&store.root.join(folder).join("index.md"))?;
540                    remove_if_exists(&store.root.join(folder).join("index.jsonl"))?;
541                } else {
542                    write_type_folder_artifacts(store, folder, &idx.records)?;
543                }
544            }
545            IndexLevel::Layer(layer) => {
546                let idx = Index::build_layer(store, *layer)?;
547                let p = store.root.join(layer_dir_name(*layer)).join("index.md");
548                if idx.child_counts.is_empty() {
549                    remove_if_exists(&p)?;
550                } else {
551                    write_atomic(&p, render_layer_md_with_store(store, &idx))?;
552                }
553                write_layer_jsonl(store, *layer, &idx.records)?;
554            }
555            IndexLevel::Root => {
556                let idx = Index::build_root(store)?;
557                let p = store.root.join("index.md");
558                if idx.child_counts.is_empty() {
559                    remove_if_exists(&p)?;
560                } else {
561                    write_atomic(&p, render_root_md_with_store(store, &idx))?;
562                }
563            }
564        }
565        Ok(())
566    }
567
568    /// Render the generated indexes to a string with `--- <path> ---`
569    /// separators instead of writing them (`--dry-run`).
570    pub fn render_dry_run(store: &Store, level: &IndexLevel) -> crate::Result<String> {
571        let mut out = String::new();
572        match level {
573            IndexLevel::TypeFolder(folder) => {
574                let idx = Index::build_type_folder(store, folder)?;
575                let md_path = path_to_unix(&folder.join("index.md"));
576                let jsonl_path = path_to_unix(&folder.join("index.jsonl"));
577                out.push_str(&format!("--- {md_path} ---\n"));
578                out.push_str(&idx.to_markdown());
579                out.push_str(&format!("--- {jsonl_path} ---\n"));
580                out.push_str(&idx.to_jsonl());
581            }
582            IndexLevel::Layer(layer) => {
583                let idx = Index::build_layer(store, *layer)?;
584                let md_path = format!("{}/index.md", layer_dir_name(*layer));
585                out.push_str(&format!("--- {md_path} ---\n"));
586                out.push_str(&render_layer_md_with_store(store, &idx));
587            }
588            IndexLevel::Root => {
589                let idx = Index::build_root(store)?;
590                out.push_str("--- index.md ---\n");
591                out.push_str(&render_root_md_with_store(store, &idx));
592            }
593        }
594        Ok(out)
595    }
596
597    /// Cleanup pass (part of [`Index::rebuild_all`]): delete `index.md` /
598    /// `index.jsonl` in non-canonical folders (date-shards that should carry
599    /// none). Symmetric with index creation.
600    ///
601    /// **Only deletes generated catalog artifacts, never user content.** Two
602    /// guards keep this from eating data:
603    /// - `min_depth(2)` so the walk starts *below* the type-folder root — the
604    ///   canonical `<type-folder>/index.md` + `index.jsonl` are never targeted
605    ///   here (they are rewritten by the per-folder builders, or removed only
606    ///   when the folder is genuinely empty, in the dedicated branch below). The
607    ///   old `min_depth(1)` deleted them up front, so a rebuild aborted by one
608    ///   malformed file left every type-folder catalog destroyed.
609    /// - [`is_deletable_catalog_artifact`] confirms a shard-level `index.md` is
610    ///   an actual generated catalog (or stale/garbage leftover), NOT a content
611    ///   file a user wrote at that name (e.g. `dbmd write …/index.md --type
612    ///   email`, plausible when mirroring a website/doc export). Matching by
613    ///   filename alone silently deleted such records on the next rebuild.
614    pub fn cleanup(store: &Store) -> crate::Result<()> {
615        for layer in Layer::all() {
616            let layer_dir = store.root.join(layer_dir_name(layer));
617            if !layer_dir.is_dir() {
618                continue;
619            }
620            for tf in type_folders_in_layer(store, layer) {
621                let tf_abs = store.root.join(&tf);
622                // Any generated index inside a shard (below the type-folder
623                // root) is non-canonical: delete it. Never touch a user content
624                // file that merely happens to be named index.md.
625                for entry in walkdir::WalkDir::new(&tf_abs)
626                    .min_depth(2)
627                    .into_iter()
628                    .filter_map(|e| e.ok())
629                {
630                    let p = entry.path();
631                    if is_index_artifact(p) && is_deletable_catalog_artifact(p) {
632                        remove_if_exists(p)?;
633                    }
634                }
635                // Empty type-folder → no index at its root either. Same content
636                // guard: an `index.md` here that is actually a user record (the
637                // only file in the folder) is preserved, not deleted.
638                if walk_type_folder_files(&tf_abs).is_empty() {
639                    let md = tf_abs.join("index.md");
640                    if is_deletable_catalog_artifact(&md) {
641                        remove_if_exists(&md)?;
642                    }
643                    remove_if_exists(&tf_abs.join("index.jsonl"))?;
644                }
645            }
646        }
647        Ok(())
648    }
649}
650
651// ─────────────────────────────────────────────────────────────────────────
652// Private free helpers — all self-contained, none call back into Store/parser.
653// ─────────────────────────────────────────────────────────────────────────
654
655/// Write both artifacts for a type-folder, or delete them if the folder is now
656/// empty. The single funnel both write-through and rebuild go through, so their
657/// output is byte-identical by construction.
658fn write_type_folder_artifacts(
659    store: &Store,
660    folder: &Path,
661    records: &[IndexRecord],
662) -> crate::Result<()> {
663    let folder_abs = store.root.join(folder);
664    let md_path = folder_abs.join("index.md");
665    let jsonl_path = folder_abs.join("index.jsonl");
666    if records.is_empty() {
667        remove_if_exists(&md_path)?;
668        remove_if_exists(&jsonl_path)?;
669        return Ok(());
670    }
671    let idx = Index {
672        level: IndexLevel::TypeFolder(folder.to_path_buf()),
673        records: records.to_vec(),
674        child_counts: BTreeMap::new(),
675    };
676    write_atomic(&md_path, idx.to_markdown())?;
677    write_atomic(&jsonl_path, idx.to_jsonl())?;
678    Ok(())
679}
680
681/// Re-render the layer + root rollups that sit above `folder` — the
682/// **loop path**, O(changed). Counts + previews come from the type-folders'
683/// on-disk `index.jsonl` sidecars ([`collect_child_stats`]), NOT from a
684/// content-tree walk: a single write reads one sidecar per type-folder (shared
685/// across the layer and root rollups) — never the millions of files under the
686/// shards. `build_layer` / `build_root` (which *do* walk the content tree) are
687/// reserved for the from-scratch sweeps ([`Index::rebuild_all`],
688/// [`Index::write_level`], [`Index::render_dry_run`]). The result is
689/// byte-identical to those builders because in the loop — exactly as in
690/// `rebuild_all` — every touched folder's jsonl is rewritten before its parents
691/// are rolled up, so the per-folder stat (`count` / `newest`) equals what a
692/// from-scratch walk would compute.
693fn update_parents(store: &Store, folder: &Path) -> crate::Result<()> {
694    // Read every type-folder's sidecar EXACTLY ONCE into a stat cache (`count` +
695    // `newest` record), then render both rollups from the cache. This removed the
696    // old 2–3×-per-write reparse (`child_counts_from_jsonl` for a count, plus
697    // `render_layer_md_with_store` / `render_root_md_with_store` each doing a full
698    // `read_jsonl_records` parse + sort just to take `.first()`); the output stays
699    // byte-identical (`count` == `read_jsonl_records().len()`, `newest` == its
700    // `.first()`).
701    //
702    // COST, stated honestly: this is `O(total catalogued records)` per write, NOT
703    // `O(changed)`. `collect_child_stats` reads and line-parses EVERY type-folder
704    // sidecar in the store to recompute the rollups, so a single high-volume
705    // folder (months of ingested emails) makes an unrelated tiny write scan that
706    // whole sidecar (a ~50× slowdown at ~200k records was measured). The crate's
707    // literal `Store::walk` guard holds — this reads `index.jsonl` sidecars, not
708    // the content tree — but the broader `O(changed)` complexity the loop path
709    // advertises is NOT met here. Restoring true `O(changed)` needs a persisted
710    // per-folder stat cache (or an in-place rollup patch for `on_write`); that is
711    // a deliberate change to the catalog hot path, tracked as a follow-up, not
712    // done inline. Until then, do not describe this op as `O(changed)`.
713    //
714    // CONCURRENCY: the layer `index.md` and the root `index.md` are SHARED across
715    // every type-folder, but the calling write only holds a lock on its OWN
716    // type-folder (`on_write`/`on_remove`/`on_rename`). Two concurrent writes to
717    // *different* type-folders would otherwise both read the sidecar set and both
718    // rewrite the same two rollups, losing one update (a stale rollup that no
719    // longer matches `rebuild_all` — a write-through/rebuild parity violation).
720    // Serialize the whole read-stats + render + write under a store-root lock so
721    // the last writer to commit its sidecar (each write commits its own
722    // `index.jsonl` BEFORE calling here) observes every committed sidecar. Lock
723    // order is always type-folder(s) → root, and nothing acquires the root lock
724    // before a type-folder lock, so this cannot deadlock with the per-folder
725    // locks held by the caller.
726    let _root_lock = FolderLock::acquire(&store.root);
727    let stats = collect_child_stats(store, &Layer::all())?;
728
729    let layer = folder
730        .components()
731        .next()
732        .and_then(|c| c.as_os_str().to_str())
733        .and_then(layer_from_dir_name);
734    if let Some(layer) = layer {
735        let p = store.root.join(layer_dir_name(layer)).join("index.md");
736        if layer_has_children(&stats, layer) {
737            write_atomic(
738                &p,
739                render_layer_md_from_stats(layer, &stats, &store.config.folders),
740            )?;
741        } else {
742            remove_if_exists(&p)?;
743        }
744    }
745    let rp = store.root.join("index.md");
746    if stats.values().any(|s| s.count > 0) {
747        write_atomic(
748            &rp,
749            render_root_md_from_stats(&stats, &store.config.folders),
750        )?;
751    } else {
752        remove_if_exists(&rp)?;
753    }
754    Ok(())
755}
756
757/// True if `layer` has at least one non-empty child type-folder in `stats`.
758fn layer_has_children(stats: &BTreeMap<PathBuf, FolderStat>, layer: Layer) -> bool {
759    let prefix = format!("{}/", layer_dir_name(layer));
760    stats
761        .iter()
762        .any(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
763}
764
765/// Render a layer `index.md` from the prebuilt per-folder stat cache — each
766/// child's count + newest summary/updated come from its single cached sidecar
767/// read, so the rollup matches the folder artifacts exactly (write-through and
768/// rebuild alike) without re-reading any sidecar.
769fn render_layer_md_from_stats(
770    layer: Layer,
771    stats: &BTreeMap<PathBuf, FolderStat>,
772    folders: &BTreeMap<String, FolderMeta>,
773) -> String {
774    let layer_dir = layer_dir_name(layer);
775    let prefix = format!("{layer_dir}/");
776    let mut max_upd: Option<DateTime<FixedOffset>> = None;
777    let mut entries = String::new();
778    for (tf, stat) in stats {
779        if stat.count == 0 || !path_to_unix(tf).starts_with(&prefix) {
780            continue;
781        }
782        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
783            max_upd = Some(match max_upd {
784                Some(cur) if cur >= u => cur,
785                _ => u,
786            });
787        }
788        let tf_unix = path_to_unix(tf);
789        let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
790        entries.push_str(&folder_entry(&tf_unix, &display, stat.count, description));
791    }
792    let mut s = String::new();
793    s.push_str("---\n");
794    s.push_str("type: index\n");
795    s.push_str("scope: layer\n");
796    s.push_str(&format!("folder: {layer_dir}\n"));
797    if let Some(ts) = max_upd {
798        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
799    }
800    s.push_str("---\n\n");
801    s.push_str(&format!("# {layer_dir}\n\n"));
802    s.push_str(&entries);
803    s
804}
805
806/// Render the root `index.md` from the prebuilt per-folder stat cache.
807fn render_root_md_from_stats(
808    stats: &BTreeMap<PathBuf, FolderStat>,
809    folders: &BTreeMap<String, FolderMeta>,
810) -> String {
811    let mut max_upd: Option<DateTime<FixedOffset>> = None;
812    for stat in stats.values() {
813        if stat.count == 0 {
814            continue;
815        }
816        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
817            max_upd = Some(match max_upd {
818                Some(cur) if cur >= u => cur,
819                _ => u,
820            });
821        }
822    }
823    let mut s = String::new();
824    s.push_str("---\n");
825    s.push_str("type: index\n");
826    s.push_str("scope: root\n");
827    if let Some(ts) = max_upd {
828        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
829    }
830    s.push_str("---\n\n");
831    s.push_str(&format!("# {ROOT_TITLE}\n"));
832    for layer in Layer::all() {
833        let layer_dir = layer_dir_name(layer);
834        let prefix = format!("{layer_dir}/");
835        let children: Vec<(&PathBuf, usize)> = stats
836            .iter()
837            .filter(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
838            .map(|(tf, s)| (tf, s.count))
839            .collect();
840        if children.is_empty() {
841            continue;
842        }
843        let total: usize = children.iter().map(|(_, n)| *n).sum();
844        s.push('\n');
845        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
846        for (tf, n) in children {
847            let tf_unix = path_to_unix(tf);
848            let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
849            s.push_str(&folder_entry(&tf_unix, &display, n, description));
850        }
851    }
852    s
853}
854
855/// Render a layer `index.md`, reading each child's newest summary + max-updated
856/// straight from its on-disk `index.jsonl` (so the rollup matches the folder
857/// artifacts exactly, write-through and rebuild alike). The **sweep-path**
858/// renderer used by [`Index::rebuild_all`] / [`Index::write_level`] /
859/// [`Index::render_dry_run`]; the loop path uses the cache-based
860/// [`render_layer_md_from_stats`] to avoid re-reading sidecars.
861fn render_layer_md_with_store(store: &Store, idx: &Index) -> String {
862    let layer = match idx.level {
863        IndexLevel::Layer(l) => l,
864        _ => unreachable!("render_layer_md_with_store called on non-layer"),
865    };
866    let layer_dir = layer_dir_name(layer);
867    let mut max_upd: Option<DateTime<FixedOffset>> = None;
868    let mut entries = String::new();
869    for (tf, n) in &idx.child_counts {
870        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
871        let newest = recs.first();
872        if let Some(u) = newest.and_then(|r| r.updated) {
873            max_upd = Some(match max_upd {
874                Some(cur) if cur >= u => cur,
875                _ => u,
876            });
877        }
878        let tf_unix = path_to_unix(tf);
879        let (display, description) =
880            folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
881        entries.push_str(&folder_entry(&tf_unix, &display, *n, description));
882    }
883    let mut s = String::new();
884    s.push_str("---\n");
885    s.push_str("type: index\n");
886    s.push_str("scope: layer\n");
887    s.push_str(&format!("folder: {layer_dir}\n"));
888    if let Some(ts) = max_upd {
889        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
890    }
891    s.push_str("---\n\n");
892    s.push_str(&format!("# {layer_dir}\n\n"));
893    s.push_str(&entries);
894    s
895}
896
897/// Render the root `index.md`, taking each child's max-updated from its on-disk
898/// `index.jsonl`. The **sweep-path** renderer (the loop path uses
899/// [`render_root_md_from_stats`]).
900fn render_root_md_with_store(store: &Store, idx: &Index) -> String {
901    let mut max_upd: Option<DateTime<FixedOffset>> = None;
902    for tf in idx.child_counts.keys() {
903        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
904        if let Some(u) = recs.first().and_then(|r| r.updated) {
905            max_upd = Some(match max_upd {
906                Some(cur) if cur >= u => cur,
907                _ => u,
908            });
909        }
910    }
911    let mut s = String::new();
912    s.push_str("---\n");
913    s.push_str("type: index\n");
914    s.push_str("scope: root\n");
915    if let Some(ts) = max_upd {
916        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
917    }
918    s.push_str("---\n\n");
919    s.push_str(&format!("# {ROOT_TITLE}\n"));
920    for layer in Layer::all() {
921        let layer_dir = layer_dir_name(layer);
922        let prefix = format!("{layer_dir}/");
923        let children: Vec<(&PathBuf, &usize)> = idx
924            .child_counts
925            .iter()
926            .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
927            .collect();
928        if children.is_empty() {
929            continue;
930        }
931        let total: usize = children.iter().map(|(_, n)| **n).sum();
932        s.push('\n');
933        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
934        for (tf, n) in children {
935            let tf_unix = path_to_unix(tf);
936            let (display, description) =
937                folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
938            s.push_str(&folder_entry(&tf_unix, &display, *n, description));
939        }
940    }
941    s
942}
943
944/// One `index.md` browse line: `- [[path]] — summary  ·  #tag #tag` (the
945/// `  ·  #…` suffix omitted when the file has no tags). The wiki-link target is
946/// the canonical **bare** store-relative path (no `.md` extension — the
947/// doctrine the writers emit and `validate` enforces via
948/// `WIKI_LINK_HAS_EXTENSION`); the jsonl `path` keeps the real on-disk name.
949fn format_md_entry(rec: &IndexRecord) -> String {
950    let path = wiki_target(&rec.path);
951    // Collapse the summary to a single line before interpolating it into the
952    // one-line browse entry. A hand-written file may legally carry a YAML block
953    // scalar (`summary: |-`) whose value spans multiple lines; rendered verbatim
954    // those embedded newlines break the line-oriented `index.md` format and can
955    // forge a standalone catalog entry (`\n- [[…|Click me]] — injected`). The
956    // CLI writers already collapse whitespace; do the same here so the spec's
957    // primary write path (agents writing files directly) can't corrupt the
958    // catalog.
959    let summary = collapse_whitespace(&rec.summary);
960    let mut line = format!("- [[{path}]] — {summary}");
961    if !rec.tags.is_empty() {
962        let tags = rec
963            .tags
964            .iter()
965            .map(|t| format!("#{t}"))
966            .collect::<Vec<_>>()
967            .join(" ");
968        line.push_str(&format!("  ·  {tags}"));
969    }
970    line
971}
972
973/// The deterministic `## More` footer for an over-cap type-folder.
974fn more_footer(total: usize, type_: &str, layer: &str) -> String {
975    format!(
976        "## More\n\nThis folder has {total} files. The {MD_CAP} most recent are listed above.\nUse `dbmd query --type {type_} --in {layer}` for the complete catalog.\n"
977    )
978}
979
980/// Canonical total order: `updated` descending (None sorts last), ties broken
981/// by store-relative path ascending. A *total* order, so write-through and
982/// rebuild never disagree on #500 vs #501.
983fn sort_records(records: &mut [IndexRecord]) {
984    records.sort_by(record_recency_cmp);
985}
986
987impl IndexRecord {
988    /// Build the [`IndexRecord`] a freshly-rebuilt `index.jsonl` *should* hold
989    /// for the file at `abs` (catalogued under store-relative `rel`).
990    ///
991    /// This is the single canonical projection from frontmatter → sidecar
992    /// record: [`Index::build_type_folder`] uses the same path to write the
993    /// jsonl, so the validator can rebuild the expected record here and compare
994    /// it field-for-field against the committed line — covering **every**
995    /// queryable/dedup field the query path reads (`summary`, `type`, `tags`,
996    /// `links`, `created`, `updated`, and every type-specific `fields` entry
997    /// like `email` / `domain` / `company` / `amount` / `vendor`) without the
998    /// validator hand-rolling (and drifting from) the projection per field.
999    pub(crate) fn expected_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
1000        record_from_file(abs, rel)
1001    }
1002}
1003
1004/// Build an [`IndexRecord`] from a file on disk. Missing `summary` →
1005/// [`MISSING_SUMMARY`] placeholder (the index never invents a summary).
1006fn record_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
1007    let mut meta = read_frontmatter(abs)?;
1008    // Records carry an effective `meta-type` in the catalog: the declared value
1009    // (already spilled into `fields` by `read_frontmatter`), or the default
1010    // `fact` when absent — so `--where meta-type=fact` sees un-annotated records.
1011    // Sources are evidence and carry no meta-type.
1012    if rel.starts_with("records") {
1013        meta.fields
1014            .entry("meta-type".to_string())
1015            .or_insert_with(|| Value::String("fact".to_string()));
1016    }
1017    Ok(IndexRecord {
1018        path: rel,
1019        type_: meta.type_.unwrap_or_default(),
1020        summary: meta.summary.unwrap_or_else(|| MISSING_SUMMARY.to_string()),
1021        tags: meta.tags,
1022        links: meta.links,
1023        created: meta.created,
1024        updated: meta.updated,
1025        fields: meta.fields,
1026    })
1027}
1028
1029/// The slice of a frontmatter this module needs.
1030struct FileMeta {
1031    type_: Option<String>,
1032    summary: Option<String>,
1033    tags: Vec<String>,
1034    links: Vec<String>,
1035    created: Option<DateTime<FixedOffset>>,
1036    updated: Option<DateTime<FixedOffset>>,
1037    fields: BTreeMap<String, Value>,
1038}
1039
1040/// Minimal frontmatter read: split the leading `---`…`---` block and parse it
1041/// as YAML, extracting the typed fields and spilling the rest into `fields`.
1042/// Self-contained (does not route through the `parser` module).
1043///
1044/// **Body bytes are never required to be UTF-8.** `sources/` is "preserved
1045/// verbatim" per the SPEC and routinely carries non-UTF-8 imports (Latin-1
1046/// emails dropped in by `rsync`/`mbsync`/`cp`); the body can hold any byte. We
1047/// read the file as raw bytes and lossily decode *only* the leading frontmatter
1048/// region, so a stray non-UTF-8 byte in the body can never abort the projection
1049/// (the old `fs::read_to_string` failed on the first such byte anywhere in the
1050/// file, taking a whole `rebuild_all` / write-through down with it). The
1051/// frontmatter itself is expected to be UTF-8; if it isn't, `U+FFFD` markers
1052/// surface in the parsed values rather than a hard abort.
1053fn read_frontmatter(abs: &Path) -> crate::Result<FileMeta> {
1054    let bytes = fs::read(abs)?;
1055    let yaml = extract_frontmatter_block_lossy(&bytes).unwrap_or_default();
1056    let map: serde_norway::Mapping = if yaml.trim().is_empty() {
1057        serde_norway::Mapping::new()
1058    } else {
1059        serde_norway::from_str(&yaml).map_err(|e| {
1060            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1061                path: abs.to_path_buf(),
1062                message: format!("frontmatter YAML: {e}"),
1063            })
1064        })?
1065    };
1066
1067    let mut type_ = None;
1068    let mut summary = None;
1069    let mut tags = Vec::new();
1070    let mut links = Vec::new();
1071    let mut created = None;
1072    let mut updated = None;
1073    let mut fields = BTreeMap::new();
1074
1075    for (k, v) in map {
1076        let key = match k.as_str() {
1077            Some(s) => s.to_string(),
1078            None => continue,
1079        };
1080        match key.as_str() {
1081            // `type` and `summary` are coerced with the SAME scalar rule the
1082            // validator applies (`validate::scalar_string`: String/Number/Bool →
1083            // string). A bare `v.as_str()` returns `None` for an unquoted numeric
1084            // or boolean scalar (`summary: 2026`, `type: true`), so the index
1085            // would write the `(no summary)` / empty-type placeholder while
1086            // `dbmd validate` reads the file as HAVING that summary/type —
1087            // yielding a permanently-unfixable `INDEX_SUMMARY_MISMATCH` (every
1088            // rebuild reproduces the same mismatched placeholder). Coercing here
1089            // keeps the writer and the validator byte-for-byte in agreement.
1090            "type" => type_ = scalar_string(&v),
1091            "summary" => summary = scalar_string(&v),
1092            "tags" => tags = yaml_string_list(&v),
1093            "links" => links = yaml_string_list(&v),
1094            "created" => created = v.as_str().and_then(parse_ts),
1095            "updated" => updated = v.as_str().and_then(parse_ts),
1096            // `path`, `type`, `summary`, `tags`, `links`, `created`, `updated`
1097            // are the reserved IndexRecord keys; everything else (including
1098            // `id`, `status`, type-specific fields) goes to `fields`.
1099            "path" => {}
1100            _ => {
1101                fields.insert(key, yaml_to_json_value(&v));
1102            }
1103        }
1104    }
1105
1106    Ok(FileMeta {
1107        type_,
1108        summary,
1109        tags,
1110        links,
1111        created,
1112        updated,
1113        fields,
1114    })
1115}
1116
1117/// A YAML scalar (`String`/`Number`/`Bool`) rendered as a string; `None` for
1118/// sequences/mappings/null. **Must stay identical to `validate::scalar_string`**
1119/// so the index writer and the validator coerce `type`/`summary` the same way
1120/// (see [`read_frontmatter`]); an unquoted `summary: 2026` becomes `"2026"` in
1121/// both, not a placeholder here and a real value there.
1122fn scalar_string(v: &serde_norway::Value) -> Option<String> {
1123    match v {
1124        serde_norway::Value::String(s) => Some(s.clone()),
1125        serde_norway::Value::Number(n) => Some(n.to_string()),
1126        serde_norway::Value::Bool(b) => Some(b.to_string()),
1127        _ => None,
1128    }
1129}
1130
1131/// Lossily decode the leading frontmatter region of a file given its raw bytes,
1132/// then pull the YAML between the opening `---` and the next `---`. Only the
1133/// frontmatter region needs to be valid UTF-8 in practice; the body may carry
1134/// arbitrary bytes (a verbatim `sources/` import). Returns `None` when the file
1135/// has no frontmatter fence at its very start.
1136fn extract_frontmatter_block_lossy(bytes: &[u8]) -> Option<String> {
1137    // Decode lossily so a non-UTF-8 body byte never aborts the read. The
1138    // frontmatter is at the very start of the file, so a lossy whole-file decode
1139    // is correct for extracting it (and cheap relative to the YAML parse). A
1140    // leading UTF-8 BOM is stripped by `extract_frontmatter_block`.
1141    let text = String::from_utf8_lossy(bytes);
1142    extract_frontmatter_block(&text)
1143}
1144
/// Return the text between a leading `---` fence and the next `---` fence.
///
/// A leading `U+FEFF` BOM is ignored. A fence is a line equal to `---` once
/// trailing whitespace is removed (leading whitespace disqualifies it). Every
/// line between the fences is returned with a single `\n` appended, whatever
/// line ending the input used.
///
/// Returns `None` when the first line is not a fence (including empty input)
/// or when no closing fence follows.
fn extract_frontmatter_block(text: &str) -> Option<String> {
    fn is_fence(line: &str) -> bool {
        line.trim_end() == "---"
    }

    let unmarked = text.strip_prefix('\u{feff}').unwrap_or(text);
    let mut lines = unmarked.lines();
    if !is_fence(lines.next()?) {
        return None;
    }

    let mut block = String::new();
    loop {
        // Running out of lines before a closing fence means "no frontmatter".
        let line = lines.next()?;
        if is_fence(line) {
            return Some(block);
        }
        block.push_str(line);
        block.push('\n');
    }
}
1164
1165/// Read a string scalar or a sequence-of-string-scalars into a `Vec<String>`.
1166/// Wiki-link items keep their `[[…]]` form verbatim.
1167fn yaml_string_list(v: &serde_norway::Value) -> Vec<String> {
1168    match v {
1169        serde_norway::Value::String(s) => vec![s.clone()],
1170        serde_norway::Value::Sequence(seq) => seq
1171            .iter()
1172            .filter_map(yaml_string_or_wiki_link_literal)
1173            .collect(),
1174        _ => Vec::new(),
1175    }
1176}
1177
1178fn yaml_string_or_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1179    v.as_str()
1180        .map(str::to_string)
1181        .or_else(|| unquoted_wiki_link_literal(v))
1182}
1183
1184fn yaml_to_json_value(v: &serde_norway::Value) -> Value {
1185    if let Some(link) = unquoted_wiki_link_literal(v) {
1186        return Value::String(link);
1187    }
1188    match v {
1189        serde_norway::Value::String(s) => Value::String(s.clone()),
1190        serde_norway::Value::Bool(b) => Value::Bool(*b),
1191        serde_norway::Value::Number(n) => {
1192            serde_json::to_value(n).unwrap_or_else(|_| Value::String(n.to_string()))
1193        }
1194        serde_norway::Value::Sequence(seq) => {
1195            Value::Array(seq.iter().map(yaml_to_json_value).collect())
1196        }
1197        serde_norway::Value::Mapping(_) | serde_norway::Value::Tagged(_) => {
1198            serde_json::to_value(v).unwrap_or(Value::Null)
1199        }
1200        serde_norway::Value::Null => Value::Null,
1201    }
1202}
1203
1204fn unquoted_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1205    let serde_norway::Value::Sequence(outer) = v else {
1206        return None;
1207    };
1208    if outer.len() != 1 {
1209        return None;
1210    }
1211    let serde_norway::Value::Sequence(inner) = &outer[0] else {
1212        return None;
1213    };
1214    let [serde_norway::Value::String(target)] = inner.as_slice() else {
1215        return None;
1216    };
1217    Some(format!("[[{target}]]"))
1218}
1219
1220/// Parse an RFC3339 timestamp scalar.
1221fn parse_ts(s: &str) -> Option<DateTime<FixedOffset>> {
1222    DateTime::parse_from_rfc3339(s.trim()).ok()
1223}
1224
1225/// Render a timestamp the same way `serde_json` renders an `IndexRecord`
1226/// timestamp (RFC3339, `Z` for UTC, sub-seconds preserved) so the md
1227/// frontmatter and the jsonl agree byte-for-byte.
1228fn fmt_ts(ts: &DateTime<FixedOffset>) -> String {
1229    ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
1230}
1231
1232/// Max `updated` over an iterator of optional timestamps.
1233fn max_updated<'a>(
1234    it: impl Iterator<Item = Option<&'a DateTime<FixedOffset>>>,
1235) -> Option<DateTime<FixedOffset>> {
1236    let mut best: Option<DateTime<FixedOffset>> = None;
1237    for ts in it.flatten() {
1238        best = Some(match best {
1239            Some(cur) if cur >= *ts => cur,
1240            _ => *ts,
1241        });
1242    }
1243    best
1244}
1245
1246/// Read a type-folder's `index.jsonl` into records, applying last-write-wins by
1247/// `path` over any un-compacted lines (so a half-compacted jsonl still reads
1248/// cleanly). Missing file → empty set. Returns records in canonical order.
1249fn read_jsonl_records(jsonl: &Path) -> crate::Result<Vec<IndexRecord>> {
1250    let text = match fs::read_to_string(jsonl) {
1251        Ok(t) => t,
1252        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1253        Err(e) => return Err(e.into()),
1254    };
1255    // Last-write-wins by path; preserve only the final occurrence.
1256    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1257    for (i, line) in text.lines().enumerate() {
1258        if line.trim().is_empty() {
1259            continue;
1260        }
1261        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1262            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1263                path: jsonl.to_path_buf(),
1264                message: format!("line {}: {e}", i + 1),
1265            })
1266        })?;
1267        by_path.insert(rec.path.clone(), rec);
1268    }
1269    let mut records: Vec<IndexRecord> = by_path.into_values().collect();
1270    sort_records(&mut records);
1271    Ok(records)
1272}
1273
1274/// The minimal rollup stat a parent index needs from one type-folder's
1275/// `index.jsonl`: how many distinct files it catalogs (`count`) and the single
1276/// newest record (`newest`, the recency-sorted `.first()` — its `updated` feeds
1277/// the parent's derived `updated`, its `summary` the layer preview). Holding the
1278/// newest record alone, rather than the whole sidecar, is what keeps a rollup
1279/// recompute cheap regardless of how large the sidecar grows.
1280#[derive(Debug, Clone, Default, PartialEq)]
1281struct FolderStat {
1282    count: usize,
1283    newest: Option<IndexRecord>,
1284}
1285
1286/// Read a type-folder's `index.jsonl` ONCE and reduce it to a [`FolderStat`]:
1287/// distinct-`path` count (last-write-wins) plus the recency-newest record. A
1288/// missing sidecar is the default (`count: 0`, `newest: None`). This is the
1289/// **loop-path** rollup primitive — one streaming pass per sidecar, never the
1290/// content tree and never the 2–3× full reparse the old
1291/// `jsonl_record_count` + `read_jsonl_records` pair did. `count` is
1292/// byte-identical to [`read_jsonl_records`]`.len()` and `newest` to its
1293/// `.first()`, so a rollup built from these stats matches the from-scratch
1294/// builders byte-for-byte.
1295fn read_folder_stat(jsonl: &Path) -> crate::Result<FolderStat> {
1296    let text = match fs::read_to_string(jsonl) {
1297        Ok(t) => t,
1298        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(FolderStat::default()),
1299        Err(e) => return Err(e.into()),
1300    };
1301    // Last-write-wins by path, exactly like `read_jsonl_records`, so count and
1302    // newest are computed over the same compacted record set.
1303    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1304    for (i, line) in text.lines().enumerate() {
1305        if line.trim().is_empty() {
1306            continue;
1307        }
1308        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1309            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1310                path: jsonl.to_path_buf(),
1311                message: format!("line {}: {e}", i + 1),
1312            })
1313        })?;
1314        by_path.insert(rec.path.clone(), rec);
1315    }
1316    let count = by_path.len();
1317    // The newest record is the minimum under `sort_records`' order (updated
1318    // desc, None last, ties by path asc) — i.e. what `.first()` returns. Find it
1319    // with a single min-scan instead of sorting the whole set.
1320    let newest = by_path.into_values().min_by(record_recency_cmp);
1321    Ok(FolderStat { count, newest })
1322}
1323
1324/// The total order [`sort_records`] imposes, as a comparator over two records:
1325/// `updated` descending (None last), ties broken by store-relative path
1326/// ascending. Kept in one place so `read_folder_stat`'s min-scan agrees with the
1327/// sort byte-for-byte on which record is "newest".
1328fn record_recency_cmp(a: &IndexRecord, b: &IndexRecord) -> std::cmp::Ordering {
1329    match (b.updated, a.updated) {
1330        (Some(bu), Some(au)) => bu.cmp(&au),
1331        (Some(_), None) => std::cmp::Ordering::Greater, // a is None → after b
1332        (None, Some(_)) => std::cmp::Ordering::Less,    // b is None → after a
1333        (None, None) => std::cmp::Ordering::Equal,
1334    }
1335    .then_with(|| a.path.cmp(&b.path))
1336}
1337
1338/// Per-child rollup stats for `layers`, read from each type-folder's on-disk
1339/// `index.jsonl` (one [`read_folder_stat`] pass each) rather than walked from the
1340/// content tree. The **loop-path** counterpart to the from-scratch counting in
1341/// [`Index::build_layer`] / [`Index::build_root`], reusing one read per sidecar
1342/// across BOTH the layer and root rollups. Empty folders (`count == 0`) are kept
1343/// out of the map.
1344///
1345/// NOTE on cost: this performs one read per type-folder, but each read line-parses
1346/// that folder's entire `index.jsonl`, so the total is `O(total catalogued
1347/// records)`, not `O(type-folders)` — it reads the whole catalog every call. It
1348/// avoids the content-tree walk ([`Store::walk`]), but it is NOT `O(changed)`. See
1349/// [`update_parents`] for the honest bound and the follow-up to fix it.
1350fn collect_child_stats(
1351    store: &Store,
1352    layers: &[Layer],
1353) -> crate::Result<BTreeMap<PathBuf, FolderStat>> {
1354    let mut stats = BTreeMap::new();
1355    for &layer in layers {
1356        for tf in type_folders_in_layer(store, layer) {
1357            let stat = read_folder_stat(&store.root.join(&tf).join("index.jsonl"))?;
1358            if stat.count > 0 {
1359                stats.insert(tf, stat);
1360            }
1361        }
1362    }
1363    Ok(stats)
1364}
1365
1366/// Walk a type-folder's `.md` content files, recursing through date-shards,
1367/// excluding the `index.md` artifact itself and any hidden entries.
1368fn walk_type_folder_files(folder_abs: &Path) -> Vec<PathBuf> {
1369    let mut out = Vec::new();
1370    if !folder_abs.is_dir() {
1371        return out;
1372    }
1373    for entry in walkdir::WalkDir::new(folder_abs)
1374        .into_iter()
1375        .filter_entry(|e| !is_hidden(e.file_name()))
1376        .filter_map(|e| e.ok())
1377    {
1378        if !entry.file_type().is_file() {
1379            continue;
1380        }
1381        let p = entry.path();
1382        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1383            continue;
1384        }
1385        if p.file_name().and_then(|n| n.to_str()) == Some("index.md") {
1386            continue;
1387        }
1388        out.push(p.to_path_buf());
1389    }
1390    out
1391}
1392
1393/// The immediate type-folders under a layer (one directory level below the
1394/// layer dir), as store-relative paths. Hidden dirs and `log/` are skipped.
1395fn type_folders_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1396    let layer_dir = store.root.join(layer_dir_name(layer));
1397    let mut out = Vec::new();
1398    let rd = match fs::read_dir(&layer_dir) {
1399        Ok(rd) => rd,
1400        Err(_) => return out,
1401    };
1402    for entry in rd.flatten() {
1403        if !entry.path().is_dir() {
1404            continue;
1405        }
1406        let name = entry.file_name();
1407        let name = match name.to_str() {
1408            Some(n) => n,
1409            None => continue,
1410        };
1411        if is_hidden(entry.file_name().as_os_str()) || name == "log" {
1412            continue;
1413        }
1414        out.push(PathBuf::from(layer_dir_name(layer)).join(name));
1415    }
1416    out.sort();
1417    out
1418}
1419
1420/// The layer a *loose* content file sits directly in: `<layer>/<file>.md` with
1421/// no type-folder between them — exactly two path components, the first a known
1422/// layer. `None` for a file inside a type-folder (`<layer>/<type>/…`, the common
1423/// case) or one outside any layer. A loose file is catalogued in the layer's own
1424/// `index.jsonl`, not a type-folder's.
1425fn loose_layer_of(file_rel: &Path) -> Option<Layer> {
1426    let mut comps = file_rel.components();
1427    let layer = layer_from_dir_name(comps.next()?.as_os_str().to_str()?)?;
1428    comps.next()?; // the file segment must exist…
1429    if comps.next().is_some() {
1430        return None; // …and be the last one (else it's inside a type-folder)
1431    }
1432    Some(layer)
1433}
1434
1435/// The `.md` content files that live directly at a layer root (loose files),
1436/// excluding `index.md` and any subdirectory (type-folders are walked
1437/// separately). Non-recursive: only the layer's immediate children.
1438fn loose_files_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1439    let layer_dir = store.root.join(layer_dir_name(layer));
1440    let mut out = Vec::new();
1441    let rd = match fs::read_dir(&layer_dir) {
1442        Ok(rd) => rd,
1443        Err(_) => return out,
1444    };
1445    for entry in rd.flatten() {
1446        let p = entry.path();
1447        if !p.is_file() {
1448            continue;
1449        }
1450        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1451            continue;
1452        }
1453        if is_index_artifact(&p) || is_hidden(entry.file_name().as_os_str()) {
1454            continue;
1455        }
1456        out.push(p);
1457    }
1458    out
1459}
1460
1461/// Write (or remove, when empty) a layer's own `index.jsonl` — the complete twin
1462/// for the loose files that live directly at the layer root. The single funnel
1463/// both write-through (`on_write`/`on_remove`/`on_rename`) and the sweeps
1464/// (`rebuild_all`/`write_level`) go through, so their output is byte-identical.
1465fn write_layer_jsonl(store: &Store, layer: Layer, records: &[IndexRecord]) -> crate::Result<()> {
1466    let path = store.root.join(layer_dir_name(layer)).join("index.jsonl");
1467    if records.is_empty() {
1468        remove_if_exists(&path)?;
1469        return Ok(());
1470    }
1471    let idx = Index {
1472        level: IndexLevel::Layer(layer),
1473        records: records.to_vec(),
1474        child_counts: BTreeMap::new(),
1475    };
1476    write_atomic(&path, idx.to_jsonl())
1477}
1478
1479/// Upsert (`removing` = false) or remove (`removing` = true) a loose file's row
1480/// in its layer `index.jsonl`, serialising the read-modify-write under a folder
1481/// lock (same discipline as the type-folder write-through). The layer `index.md`
1482/// rollup is untouched — loose files do not change type-folder counts.
1483fn apply_loose_change(
1484    store: &Store,
1485    layer: Layer,
1486    file_rel: &Path,
1487    removing: bool,
1488) -> crate::Result<()> {
1489    let layer_dir = store.root.join(layer_dir_name(layer));
1490    let _lock = FolderLock::acquire(&layer_dir);
1491    let jsonl = layer_dir.join("index.jsonl");
1492    let mut records = read_jsonl_records(&jsonl)?;
1493    records.retain(|r| r.path != file_rel);
1494    if !removing {
1495        records.push(record_from_file(
1496            &store.root.join(file_rel),
1497            file_rel.to_path_buf(),
1498        )?);
1499    }
1500    sort_records(&mut records);
1501    write_layer_jsonl(store, layer, &records)
1502}
1503
1504/// The type-folder a content file belongs to: `<layer>/<type>` (the first two
1505/// path components), or `None` if the path is not under a known layer with at
1506/// least a type segment.
1507fn type_folder_of(file_rel: &Path) -> Option<PathBuf> {
1508    let mut comps = file_rel.components();
1509    let layer = comps.next()?.as_os_str().to_str()?;
1510    layer_from_dir_name(layer)?;
1511    let type_seg = comps.next()?.as_os_str().to_str()?;
1512    Some(PathBuf::from(layer).join(type_seg))
1513}
1514
/// Express `abs` relative to the store `root`.
///
/// Returns `None` when `abs` does not start with `root`. When the two are
/// equal the result is the empty path.
fn rel_to_store(root: &Path, abs: &Path) -> Option<PathBuf> {
    match abs.strip_prefix(root) {
        Ok(relative) => Some(relative.to_path_buf()),
        Err(_) => None,
    }
}
1519
1520/// Normalize a possibly-absolute or `./`-prefixed path to a clean
1521/// store-relative form (drops a leading `./`; leaves already-relative paths).
1522fn normalize_rel(p: &Path) -> PathBuf {
1523    let s = path_to_unix(p);
1524    let s = s.strip_prefix("./").unwrap_or(&s);
1525    PathBuf::from(s)
1526}
1527
/// True when the final path component is exactly `index.md` or `index.jsonl` —
/// the two file names the catalog uses for its own artifacts. Only the name is
/// inspected; the file is not opened. A path with no final component or a
/// non-UTF-8 name is never an index artifact.
fn is_index_artifact(p: &Path) -> bool {
    let Some(name) = p.file_name().and_then(|n| n.to_str()) else {
        return false;
    };
    name == "index.md" || name == "index.jsonl"
}
1534
1535/// True when a file named `index.md` / `index.jsonl` is safe for [`Index::cleanup`]
1536/// to delete — i.e. it is a generated catalog artifact (or a stale/garbage
1537/// leftover from a previous build), NOT a user content file that merely happens
1538/// to be named `index.md`.
1539///
1540/// - `index.jsonl` is always a machine artifact (content files are `.md`), so it
1541///   is always deletable.
1542/// - `index.md` is deletable UNLESS it parses as a content file — frontmatter
1543///   whose `type` is some real record type (anything other than `index`). A
1544///   generated catalog carries `type: index`; a user record carries its own type
1545///   (`email`, `note`, …) and must be preserved (deleting it is silent,
1546///   unrecoverable data loss). A leftover with no/garbage frontmatter (e.g. a
1547///   bare `stale\n`) is treated as a deletable stale artifact.
1548fn is_deletable_catalog_artifact(p: &Path) -> bool {
1549    match p.file_name().and_then(|n| n.to_str()) {
1550        Some("index.jsonl") => true,
1551        Some("index.md") => match read_frontmatter(p) {
1552            // Real content file (non-`index` type) → preserve, never delete.
1553            Ok(meta) => meta.type_.as_deref().is_none_or(|t| t == "index"),
1554            // Unreadable / no frontmatter → a stale or garbage artifact, deletable.
1555            Err(_) => true,
1556        },
1557        _ => false,
1558    }
1559}
1560
/// True when a file or directory name is hidden, i.e. starts with `.`.
///
/// The check looks at the first encoded byte of the name rather than going
/// through `to_str()`, so a dot-name that is not valid UTF-8 (for example
/// `.caf\xe9`, reachable on Linux filesystems) is still recognised as hidden
/// instead of being treated as visible content. `.` is ASCII, so the first
/// byte is sufficient on every platform encoding `OsStr` uses.
fn is_hidden(name: &std::ffi::OsStr) -> bool {
    name.as_encoded_bytes().first() == Some(&b'.')
}
1564
1565fn layer_dir_name(layer: Layer) -> &'static str {
1566    match layer {
1567        Layer::Sources => "sources",
1568        Layer::Records => "records",
1569    }
1570}
1571
1572/// Local layer-name parse. Mirrors the contract of [`Layer::from_dir_name`];
1573/// kept local to keep this module's walk self-contained (see the module header).
1574fn layer_from_dir_name(name: &str) -> Option<Layer> {
1575    match name {
1576        "sources" => Some(Layer::Sources),
1577        "records" => Some(Layer::Records),
1578        _ => None,
1579    }
1580}
1581
/// The last component of `p` as a `&str` — a folder's basename. Falls back to
/// the empty string when the path has no final component or when that
/// component is not valid UTF-8.
fn folder_basename(p: &Path) -> &str {
    match p.file_name() {
        Some(name) => name.to_str().unwrap_or(""),
        None => "",
    }
}
1586
1587/// The canonical wiki-link target for a content path: the store-relative path
1588/// with `/` separators and the trailing `.md` stripped (the bare form the
1589/// `index.md` browse view links to).
1590fn wiki_target(p: &Path) -> String {
1591    let unix = path_to_unix(p);
1592    unix.strip_suffix(".md").unwrap_or(&unix).to_string()
1593}
1594
/// Render a path with `/` separators regardless of host OS, so artifacts are
/// identical on every platform.
///
/// A non-UTF-8 path component (reachable on Linux/ext4, db.md's primary
/// deployment target, where `sources/` files arrive verbatim from Latin-1
/// exports) is decoded **lossily** with `U+FFFD` markers rather than silently
/// dropped. Dropping a bad component would serialize
/// `sources/emails/caf\xe9.md` as `sources/emails` — a path pointing at the
/// *directory*, not the file, that also collapses distinct files onto one
/// `index.jsonl` key. Lossy decoding keeps the leaf present and visibly marked.
///
/// The root of an absolute path is emitted as a single `/` and is not followed
/// by an extra separator, so `/a/b` renders as `/a/b` (joining every component
/// with `/` would have produced `//a/b`). Relative paths — the store-relative
/// form this module works with — render exactly as a plain `/`-join of their
/// components.
fn path_to_unix(p: &Path) -> String {
    use std::path::Component;

    let mut out = String::new();
    for comp in p.components() {
        if matches!(comp, Component::RootDir) {
            // The root *is* the separator; never join it like a name.
            out.push('/');
            continue;
        }
        if !out.is_empty() && !out.ends_with('/') {
            out.push('/');
        }
        out.push_str(&comp.as_os_str().to_string_lossy());
    }
    out
}
1612
1613/// Serde for [`IndexRecord::path`]: always forward-slash on the wire, so the
1614/// `index.jsonl` catalog is identical whether the store was written on POSIX or
1615/// Windows (a git clone across OSes yields the same paths, and the last-write-
1616/// wins upsert key never splits on separator style). On POSIX this matches the
1617/// default `PathBuf` serialization; on Windows it rewrites `\` to `/`.
1618mod path_serde {
1619    use super::path_to_unix;
1620    use serde::{Deserialize, Deserializer, Serializer};
1621    use std::path::{Path, PathBuf};
1622
1623    pub fn serialize<S: Serializer>(p: &Path, s: S) -> Result<S::Ok, S::Error> {
1624        s.serialize_str(&path_to_unix(p))
1625    }
1626
1627    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<PathBuf, D::Error> {
1628        Ok(PathBuf::from(String::deserialize(d)?))
1629    }
1630}
1631
/// Upper-case the first character of `s` and leave the rest untouched.
///
/// The first character goes through `char::to_uppercase`, which is the Unicode
/// mapping, not an ASCII-only one — so it may expand to more than one character
/// (`ß` → `SS`). An empty input yields an empty string.
fn capitalize(s: &str) -> String {
    let mut rest = s.chars();
    let Some(first) = rest.next() else {
        return String::new();
    };
    let mut out = String::with_capacity(s.len());
    out.extend(first.to_uppercase());
    out.push_str(rest.as_str());
    out
}
1640
/// Squeeze every run of whitespace (newlines included) down to one space and
/// drop leading/trailing whitespace. This is the single-line normalization the
/// `index.md` browse entry ([`format_md_entry`]) relies on, so that a
/// multi-line block-scalar summary can never inject a newline into a catalog
/// line.
fn collapse_whitespace(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for word in s.split_whitespace() {
        // Words are never empty, so an empty buffer means "first word".
        if !out.is_empty() {
            out.push(' ');
        }
        out.push_str(word);
    }
    out
}
1648
1649/// Derive a folder's display name from its basename: separators (`-`, `_`)
1650/// become spaces and the first character is upper-cased (`hubspot-exports` →
1651/// `Hubspot exports`). A deterministic floor — the curator overrides it via
1652/// `DB.md ## Folders` (`records/x|HubSpot exports`) for casing the tool cannot
1653/// guess. The tool tidies a folder's *name*; it never infers its *meaning*.
1654fn default_display(basename: &str) -> String {
1655    let spaced: String = basename
1656        .chars()
1657        .map(|c| if c == '-' || c == '_' { ' ' } else { c })
1658        .collect();
1659    capitalize(&spaced)
1660}
1661
1662/// The display name + optional description a root/layer rollup shows for a child
1663/// type-folder: the curator's `## Folders` metadata when present, else the
1664/// derived display name and **no description**. This is the whole anti-"tool
1665/// invents the curator's judgment" contract for the rollups — a description is
1666/// surfaced only when the agent authored one; it is never composed from the
1667/// folder's newest member or any other content.
1668fn folder_label<'a>(
1669    tf_unix: &str,
1670    basename: &str,
1671    folders: &'a BTreeMap<String, FolderMeta>,
1672) -> (String, Option<&'a str>) {
1673    let meta = folders.get(tf_unix);
1674    let display = meta
1675        .and_then(|m| m.display.as_deref())
1676        .map(str::to_string)
1677        .unwrap_or_else(|| default_display(basename));
1678    (display, meta.and_then(|m| m.description.as_deref()))
1679}
1680
/// Format one root/layer rollup line: `- [[<tf>/index|<Display>]] (<count>)`,
/// followed by ` — <description>` only when a description is supplied, and
/// always terminated by a newline.
fn folder_entry(tf_unix: &str, display: &str, count: usize, description: Option<&str>) -> String {
    let mut line = format!("- [[{tf_unix}/index|{display}]] ({count})");
    if let Some(text) = description {
        line.push_str(" — ");
        line.push_str(text);
    }
    line.push('\n');
    line
}
1689
1690/// Atomic (rename-based) write for the **derived** catalog (`index.md` /
1691/// `index.jsonl`). Deliberately NOT `fsync`-durable like [`crate::fsx`]: the
1692/// index is rebuildable (`dbmd index rebuild`) and this is the O(changed)
1693/// write-through path, so a per-write `fsync` would be cost without benefit — a
1694/// crash-lost catalog write is recovered by a rebuild, not data loss. (Primary
1695/// data — content records, `log.md` — uses the durable `crate::fsx` path.)
1696fn write_atomic(path: &Path, contents: String) -> crate::Result<()> {
1697    if let Some(parent) = path.parent() {
1698        fs::create_dir_all(parent)?;
1699    }
1700    let dir = path.parent().unwrap_or_else(|| Path::new("."));
1701    let mut tmp = tempfile_in(dir)?;
1702    tmp.write_all(contents.as_bytes())?;
1703    tmp.flush()?;
1704    tmp.persist(path)?;
1705    Ok(())
1706}
1707
1708fn remove_if_exists(path: &Path) -> crate::Result<()> {
1709    match fs::remove_file(path) {
1710        Ok(()) => Ok(()),
1711        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1712        Err(e) => Err(e.into()),
1713    }
1714}
1715
1716fn bad_index(path: &Path, msg: &str) -> crate::Error {
1717    crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1718        path: path.to_path_buf(),
1719        message: msg.to_string(),
1720    })
1721}
1722
/// Per-folder advisory lock guarding the write-through sidecar
/// read-modify-write.
///
/// Updating a folder's `index.jsonl`/`index.md` is a read-snapshot → modify →
/// rename-over-the-whole-file sequence. The SPEC sanctions many concurrent
/// writers for `records/` (`dbmd write` is `create_new`-race-safe for the
/// *content* file), but two writers hitting the SAME folder would each read the
/// same sidecar snapshot, add only their own row, and rename their file over
/// the other's — a lost update that drops rows until a manual
/// `dbmd index rebuild`. Holding this lock around the per-folder RMW makes each
/// writer see the other's row.
///
/// Mechanics: a hidden `<folder>/.index.lock` created with `create_new` (the
/// same O_EXCL primitive `cmd/write.rs` uses), polled with a short sleep, with
/// stale-lock breaking by mtime age so a crashed writer cannot wedge the folder
/// forever. The dotfile name keeps it out of the content walk
/// (`walk_type_folder_files` skips hidden names) and out of `cleanup`
/// (`is_index_artifact` matches only `index.md`/`index.jsonl`). The lockfile is
/// removed when the guard is dropped, error paths included.
struct FolderLock {
    /// Location of the `.index.lock` file.
    path: PathBuf,
    /// Whether this guard actually created the lockfile (and so must remove it).
    held: bool,
}

impl FolderLock {
    /// Take the lock for `folder_abs`, blocking until it is either acquired or a
    /// genuinely stale lockfile (older than `STALE_AFTER`) has been broken and
    /// then acquired. Contention alone never makes this give up and proceed
    /// unlocked.
    ///
    /// Why there is no contention budget: one legitimate write can hold the lock
    /// for several seconds — `on_write`/`on_remove`/`on_rename` hold it across
    /// their whole body, and `update_parents` recomputes the rollups in
    /// `O(total catalogued records)`. A short give-up budget (formerly ~6s)
    /// could expire while a LIVE writer still held the lock; the loser would
    /// then run the sidecar RMW without mutual exclusion and silently drop a
    /// catalogued record (visible only to a full `validate --all` as
    /// `INDEX_JSONL_DESYNC`). So a live holder is always waited out. Progress
    /// against a *dead* holder is still bounded by the staleness window.
    ///
    /// Residual limitation (documented, follow-up): a single legitimate hold
    /// longer than `STALE_AFTER` could be mistaken for a crash and broken. The
    /// complete fix is a holder heartbeat refreshing the lockfile mtime during
    /// long operations.
    ///
    /// NOTE(review): breaking a stale lock is a check-then-remove with no
    /// atomicity between the two steps. If two waiters both judge the same
    /// lockfile stale, the slower one's `remove_file` may delete a lock the
    /// faster one has just re-created — worth confirming whether that window
    /// matters in practice. Also, a lockfile whose mtime lies in the future is
    /// never judged stale.
    ///
    /// Only a non-contention failure to create the lockfile (e.g. permissions,
    /// read-only filesystem) degrades to an un-held guard so that a sanctioned
    /// write is not failed outright.
    fn acquire(folder_abs: &Path) -> Self {
        use std::time::{Duration, SystemTime};
        const SPIN: Duration = Duration::from_millis(10);
        const STALE_AFTER: Duration = Duration::from_secs(30);

        // A lockfile is stale when its mtime is readable, not in the future,
        // and more than `STALE_AFTER` old. Any failure to tell means "not stale".
        fn is_stale(lock: &Path, window: Duration) -> bool {
            let Ok(modified) = fs::metadata(lock).and_then(|m| m.modified()) else {
                return false;
            };
            match SystemTime::now().duration_since(modified) {
                Ok(age) => age > window,
                Err(_) => false,
            }
        }

        let path = folder_abs.join(".index.lock");
        // The folder must exist for the lockfile create to have a chance.
        let _ = fs::create_dir_all(folder_abs);

        loop {
            let attempt = fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&path);
            let err = match attempt {
                Ok(_) => return FolderLock { path, held: true },
                Err(err) => err,
            };
            if err.kind() != std::io::ErrorKind::AlreadyExists {
                // Cannot lock here at all: proceed unlocked rather than fail the
                // write. This branch is for hard errors only, never contention.
                return FolderLock { path, held: false };
            }
            if is_stale(&path, STALE_AFTER) {
                // A crashed writer's leftover: remove it and retry immediately.
                let _ = fs::remove_file(&path);
            } else {
                // A live holder: wait for it. NEVER proceed unlocked here.
                std::thread::sleep(SPIN);
            }
        }
    }
}

impl Drop for FolderLock {
    /// Release the lock by removing the lockfile, but only if this guard
    /// created it. Removal errors are ignored.
    fn drop(&mut self) {
        if !self.held {
            return;
        }
        let _ = fs::remove_file(&self.path);
    }
}
1821
1822/// Acquire the write-through lock for one or two type-folders. When `a == b`
1823/// (same-folder rename) only one lock is taken. For two distinct folders the
1824/// locks are always acquired in sorted order so a pair of concurrent renames
1825/// touching the same two folders can't deadlock by grabbing them in opposite
1826/// orders. Returns the guard(s); drop releases them.
1827fn lock_folders(store: &Store, a: &Path, b: &Path) -> Vec<FolderLock> {
1828    if a == b {
1829        return vec![FolderLock::acquire(&store.root.join(a))];
1830    }
1831    let (first, second) = if a < b { (a, b) } else { (b, a) };
1832    vec![
1833        FolderLock::acquire(&store.root.join(first)),
1834        FolderLock::acquire(&store.root.join(second)),
1835    ]
1836}
1837
// A small temp-file-then-rename helper. `tempfile` is only a dev-dependency
// (tests), so the library path hand-rolls this instead of pulling `tempfile`
// into the non-dev dependency set. The handle lives in an `Option` so that
// `persist` can take and close it before renaming, independently of the `Drop`
// impl, which only cleans up a temp file that was never persisted.
struct AtomicTemp {
    // Open handle to the temp file; `None` once `persist` has taken it.
    file: Option<fs::File>,
    // Where the temp file lives on disk.
    path: PathBuf,
    // Set after a successful rename; suppresses the cleanup in `Drop`.
    persisted: bool,
}

impl AtomicTemp {
    /// The open handle. Panics if called after `persist` took it, which the
    /// by-value `persist` signature rules out for normal use.
    fn handle(&mut self) -> &mut fs::File {
        self.file.as_mut().expect("temp file open")
    }

    /// Write all of `bytes` to the temp file.
    fn write_all(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.handle().write_all(bytes)
    }

    /// Flush the temp file's writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.handle().flush()
    }

    /// Close the temp file and rename it onto `dest`. The data is synced
    /// best-effort first (a sync error is ignored). If the rename fails the
    /// error is returned and the temp file is removed when `self` drops.
    fn persist(mut self, dest: &Path) -> std::io::Result<()> {
        let handle = self.file.take();
        if let Some(open) = &handle {
            let _ = open.sync_all();
        }
        // Close the handle before the rename.
        drop(handle);

        match fs::rename(&self.path, dest) {
            Ok(()) => {
                self.persisted = true;
                Ok(())
            }
            Err(err) => Err(err),
        }
    }
}

impl Drop for AtomicTemp {
    fn drop(&mut self) {
        // Already renamed into place: nothing left at `path` to clean up.
        if self.persisted {
            return;
        }
        // Best-effort removal of an abandoned temp file (an error path bailed).
        let _ = fs::remove_file(&self.path);
    }
}
1875
1876fn tempfile_in(dir: &Path) -> std::io::Result<AtomicTemp> {
1877    use std::time::{SystemTime, UNIX_EPOCH};
1878    let nanos = SystemTime::now()
1879        .duration_since(UNIX_EPOCH)
1880        .map(|d| d.as_nanos())
1881        .unwrap_or(0);
1882    let pid = std::process::id();
1883    // Monotonic-ish unique suffix; the dir is the destination dir so rename is
1884    // same-filesystem and therefore atomic.
1885    let counter = next_temp_counter();
1886    let name = format!(".dbmd-index-{pid}-{nanos}-{counter}.tmp");
1887    let path = dir.join(name);
1888    let file = fs::OpenOptions::new()
1889        .write(true)
1890        .create_new(true)
1891        .open(&path)?;
1892    Ok(AtomicTemp {
1893        file: Some(file),
1894        path,
1895        persisted: false,
1896    })
1897}
1898
/// Hand out the next value of a process-wide counter, starting at 0 and
/// increasing by one per call. Used to keep temp-file names distinct within
/// one process.
fn next_temp_counter() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT: AtomicU64 = AtomicU64::new(0);
    NEXT.fetch_add(1, Ordering::Relaxed)
}
1904
1905#[cfg(test)]
1906mod tests {
1907    use super::*;
1908    use std::collections::BTreeSet;
1909    use std::fs;
1910    use tempfile::TempDir;
1911
1912    // ── fixtures ─────────────────────────────────────────────────────────
1913
1914    /// A temp store with a `DB.md` marker. `store.config` is the parser default
1915    /// (these tests never exercise the config parser).
1916    fn mk_store() -> (TempDir, Store) {
1917        let dir = TempDir::new().unwrap();
1918        fs::write(dir.path().join("DB.md"), "# test store\n").unwrap();
1919        let store = Store {
1920            root: dir.path().to_path_buf(),
1921            config: crate::parser::Config::default(),
1922        };
1923        (dir, store)
1924    }
1925
1926    /// Write a content file at `rel` with the given frontmatter lines + body.
1927    /// `fm` is the raw YAML body between the fences (no `---`).
1928    fn write_raw(store: &Store, rel: &str, fm: &str, body: &str) {
1929        let abs = store.root.join(rel);
1930        fs::create_dir_all(abs.parent().unwrap()).unwrap();
1931        fs::write(&abs, format!("---\n{fm}\n---\n{body}")).unwrap();
1932    }
1933
1934    /// Convenience: write a typed content file with summary/updated/extras.
1935    fn write_doc(
1936        store: &Store,
1937        rel: &str,
1938        type_: &str,
1939        summary: Option<&str>,
1940        updated: Option<&str>,
1941        extra_yaml: &str,
1942    ) {
1943        let mut fm = format!("type: {type_}\n");
1944        if let Some(s) = summary {
1945            fm.push_str(&format!("summary: {s}\n"));
1946        }
1947        if let Some(u) = updated {
1948            fm.push_str(&format!("updated: {u}\n"));
1949        }
1950        fm.push_str(extra_yaml);
1951        write_raw(store, rel, fm.trim_end(), "\nbody text\n");
1952    }
1953
1954    fn read(store: &Store, rel: &str) -> String {
1955        fs::read_to_string(store.root.join(rel)).unwrap()
1956    }
1957
1958    fn exists(store: &Store, rel: &str) -> bool {
1959        store.root.join(rel).exists()
1960    }
1961
1962    /// Collect every `index.md` + `index.jsonl` under the store, mapped to its
1963    /// bytes — the surface the byte-identity invariant compares.
1964    fn snapshot_artifacts(store: &Store) -> BTreeMap<String, String> {
1965        let mut out = BTreeMap::new();
1966        for entry in walkdir::WalkDir::new(&store.root)
1967            .into_iter()
1968            .filter_map(|e| e.ok())
1969        {
1970            let p = entry.path();
1971            if is_index_artifact(p) {
1972                let rel = path_to_unix(&rel_to_store(&store.root, p).unwrap());
1973                out.insert(rel, fs::read_to_string(p).unwrap());
1974            }
1975        }
1976        out
1977    }
1978
1979    // ── build_type_folder + to_markdown ──────────────────────────────────
1980
1981    #[test]
1982    fn type_folder_aggregates_across_shards_in_recency_order() {
1983        let (_d, store) = mk_store();
1984        // Three emails across two month-shards, deliberately written
1985        // out-of-recency-order on disk.
1986        write_doc(
1987            &store,
1988            "sources/emails/2026/05/b-old.md",
1989            "email",
1990            Some("Older mail"),
1991            Some("2026-05-01T09:00:00Z"),
1992            "",
1993        );
1994        write_doc(
1995            &store,
1996            "sources/emails/2026/06/c-new.md",
1997            "email",
1998            Some("Newest mail"),
1999            Some("2026-06-15T12:00:00Z"),
2000            "",
2001        );
2002        write_doc(
2003            &store,
2004            "sources/emails/2026/05/a-mid.md",
2005            "email",
2006            Some("Middle mail"),
2007            Some("2026-05-20T08:00:00Z"),
2008            "",
2009        );
2010
2011        let idx = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
2012        let paths: Vec<String> = idx.records.iter().map(|r| path_to_unix(&r.path)).collect();
2013        assert_eq!(
2014            paths,
2015            vec![
2016                "sources/emails/2026/06/c-new.md",
2017                "sources/emails/2026/05/a-mid.md",
2018                "sources/emails/2026/05/b-old.md",
2019            ],
2020            "records must aggregate across shards, newest `updated` first"
2021        );
2022    }
2023
/// A type-folder `index.md` must open with the exact frontmatter + heading
/// (its own `updated` being the newest member `updated`), render a tagged
/// entry with a `  ·  #tag #tag` suffix, render an untagged entry with no
/// separator at all, and carry no `## More` footer while under the cap.
#[test]
fn type_folder_md_format_entries_tags_and_derived_updated() {
    let (_tmp, store) = mk_store();

    // (path, summary, updated, extra frontmatter): one tagged, one untagged.
    let contacts = [
        (
            "records/contacts/sarah-chen.md",
            "Renewal champion at Acme",
            "2026-05-27T10:00:00Z",
            "tags:\n  - renewal\n  - acme\n",
        ),
        (
            "records/contacts/no-tags.md",
            "Plain contact",
            "2026-05-26T10:00:00Z",
            "",
        ),
    ];
    for (rel, summary, updated, extra) in contacts {
        write_doc(&store, rel, "contact", Some(summary), Some(updated), extra);
    }

    let md = Index::build_type_folder(&store, Path::new("records/contacts"))
        .unwrap()
        .to_markdown();

    // The header is compared verbatim: the derived `updated` (max over the
    // members) is what the byte-identity invariant depends on.
    let expected_header = "---\ntype: index\nscope: type-folder\nfolder: records/contacts\nupdated: 2026-05-27T10:00:00Z\n---\n\n# records/contacts\n";
    assert!(
        md.starts_with(expected_header),
        "frontmatter/heading wrong:\n{md}"
    );

    // Tagged entry: `— summary  ·  #tag #tag`.
    let tagged_entry =
        "- [[records/contacts/sarah-chen]] — Renewal champion at Acme  ·  #renewal #acme\n";
    assert!(md.contains(tagged_entry), "tagged entry wrong:\n{md}");

    // Untagged entry: the line ends right after the summary...
    let untagged_entry = "- [[records/contacts/no-tags]] — Plain contact\n";
    assert!(md.contains(untagged_entry), "untagged entry wrong:\n{md}");
    // ...and no `  ·  ` separator is emitted for it.
    assert!(
        !md.contains("Plain contact  ·"),
        "untagged entry must not emit a tag separator"
    );

    // Two files is far below the cap, so there is no footer.
    assert!(!md.contains("## More"), "no footer expected under the cap");
}
2072
/// A file written without a `summary` is catalogued with the
/// `MISSING_SUMMARY` placeholder, and the markdown view renders that
/// placeholder instead of making up text.
#[test]
fn missing_summary_becomes_placeholder_not_invented() {
    let (_tmp, store) = mk_store();
    write_doc(
        &store,
        "records/notes/x.md",
        "note",
        None, // deliberately no summary
        Some("2026-05-27T10:00:00Z"),
        "",
    );

    let index = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
    assert_eq!(index.records[0].summary, MISSING_SUMMARY);

    let rendered = index.to_markdown();
    let placeholder_entry = "- [[records/notes/x]] — (no summary)\n";
    assert!(
        rendered.contains(placeholder_entry),
        "missing summary must render the placeholder, not invent text:\n{rendered}"
    );
}
2092
2093    // ── to_jsonl ─────────────────────────────────────────────────────────
2094
/// `index.jsonl` holds one JSON object per file (uncapped, newest first);
/// every line deserializes back to the in-memory record, reserved keys stay
/// out of the flattened `fields`, and the serialized key order is stable.
#[test]
fn jsonl_is_complete_structured_and_round_trips() {
    let (_tmp, store) = mk_store();

    // e1: the older file, carrying every reserved field plus extras.
    let e1_frontmatter = "created: 2026-05-10T09:00:00Z\nstatus: paid\namount: 42\ncompany: [[records/companies/acme]]\nrelated:\n  - [[records/concepts/spend]]\ntags:\n  - food\nlinks:\n  - records/concepts/spend\n  - [[records/concepts/renewal]]\n";
    write_doc(
        &store,
        "records/expenses/2026/05/e1.md",
        "expense",
        Some("Lunch with vendor"),
        Some("2026-05-10T10:00:00Z"),
        e1_frontmatter,
    );
    // e2: the newer file, in a different shard.
    write_doc(
        &store,
        "records/expenses/2026/06/e2.md",
        "expense",
        Some("Cloud bill"),
        Some("2026-06-01T10:00:00Z"),
        "amount: 100\n",
    );

    let index = Index::build_type_folder(&store, Path::new("records/expenses")).unwrap();
    let jsonl = index.to_jsonl();
    let rows: Vec<&str> = jsonl.lines().collect();
    assert_eq!(rows.len(), 2, "one JSON object per file, uncapped");
    let (newest_row, older_row) = (rows[0], rows[1]);

    // First line is the newest file (e2) and parses back to an equal record.
    let newest = serde_json::from_str::<IndexRecord>(newest_row).unwrap();
    assert_eq!(
        path_to_unix(&newest.path),
        "records/expenses/2026/06/e2.md"
    );
    assert_eq!(
        newest, index.records[0],
        "jsonl line must round-trip to the record"
    );

    // Second line is the older file (e1): reserved fields are typed members,
    // tags/links are kept verbatim, everything else lands in `fields`.
    let older = serde_json::from_str::<IndexRecord>(older_row).unwrap();
    assert_eq!(older.type_, "expense");
    assert_eq!(older.summary, "Lunch with vendor");
    assert_eq!(older.tags, vec![String::from("food")]);
    assert_eq!(
        older.links,
        vec![
            String::from("records/concepts/spend"),
            String::from("[[records/concepts/renewal]]"),
        ]
    );
    assert_eq!(
        older.created,
        Some(DateTime::parse_from_rfc3339("2026-05-10T09:00:00Z").unwrap())
    );

    let expected_extras = [
        ("status", Value::from("paid")),
        ("amount", Value::from(42)),
        ("company", Value::from("[[records/companies/acme]]")),
        ("related", serde_json::json!(["[[records/concepts/spend]]"])),
    ];
    for (key, want) in &expected_extras {
        assert_eq!(older.fields.get(*key), Some(want));
    }

    // Reserved keys never leak into `fields`.
    const RESERVED: [&str; 7] = [
        "path", "type", "summary", "tags", "links", "created", "updated",
    ];
    for key in RESERVED {
        assert!(
            !older.fields.contains_key(key),
            "reserved key {key} must not appear in fields"
        );
    }

    // Stable key order: the declared fields come first...
    let declared_prefix = r#"{"path":"records/expenses/2026/05/e1.md","type":"expense","summary":"Lunch with vendor","tags":["food"],"links":["records/concepts/spend","[[records/concepts/renewal]]"],"created":"2026-05-10T09:00:00Z","updated":"2026-05-10T10:00:00Z","#;
    assert!(
        older_row.starts_with(declared_prefix),
        "jsonl key order not stable:\n{}",
        older_row
    );
    // ...followed by the flattened extras in BTreeMap (sorted) order. The
    // catalog injects `meta-type: fact` into every records-layer file that
    // does not declare one, so it sits between `company` and `related`.
    let sorted_extras_suffix = r#""amount":42,"company":"[[records/companies/acme]]","meta-type":"fact","related":["[[records/concepts/spend]]"],"status":"paid"}"#;
    assert!(
        older_row.ends_with(sorted_extras_suffix),
        "extras must be sorted:\n{}",
        older_row
    );
}
2183
2184    // ── cap + footer ─────────────────────────────────────────────────────
2185
/// With more than `MD_CAP` files in a folder, `index.md` lists exactly
/// `MD_CAP` entries plus a `## More` footer naming the total and the
/// `dbmd query` escape hatch, while the records / `index.jsonl` keep every
/// file.
#[test]
fn over_cap_md_shows_500_plus_footer_jsonl_holds_all() {
    let (_tmp, store) = mk_store();
    let total = MD_CAP + 7;

    for i in 0..total {
        // The (day, second) pair only repeats every lcm(27, 60) = 540 files,
        // so each `updated` here is distinct (though not monotonic in `i`).
        let updated = format!("2026-05-{:02}T00:00:{:02}Z", 1 + (i % 27), i % 60);
        let rel = format!("sources/emails/2026/05/m-{i:04}.md");
        let summary = format!("mail {i}");
        write_doc(&store, &rel, "email", Some(&summary), Some(&updated), "");
    }

    let index = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
    assert_eq!(index.records.len(), total, "jsonl/records keep every file");

    let md = index.to_markdown();
    let listed = md
        .lines()
        .filter(|line| line.starts_with("- [["))
        .count();
    assert_eq!(listed, MD_CAP, "md browse view is capped at 500");

    // Footer: heading, total count, and the inferred type/layer in the hint.
    assert!(
        md.contains("## More\n\n"),
        "over-cap md needs a More footer"
    );
    let count_line =
        format!("This folder has {total} files. The 500 most recent are listed above.\n");
    assert!(md.contains(&count_line), "footer count wrong:\n{md}");
    let query_hint = "Use `dbmd query --type email --in sources` for the complete catalog.\n";
    assert!(
        md.contains(query_hint),
        "footer must infer type=email layer=sources:\n{md}"
    );

    let jsonl = index.to_jsonl();
    assert_eq!(jsonl.lines().count(), total, "jsonl is uncapped");
}
2229
2230    // ── sort total order ─────────────────────────────────────────────────
2231
/// `sort_records` orders newest `updated` first, breaks equal timestamps by
/// path, and places records without an `updated` at the end.
#[test]
fn sort_breaks_ties_by_path_and_puts_undated_last() {
    let may = Some("2026-05-01T00:00:00Z");
    let june = Some("2026-06-01T00:00:00Z");

    let mut records = vec![
        rec("z/a.md", may),
        rec("a/b.md", may),  // ties with z/a on `updated`; its path sorts first
        rec("m/c.md", None), // undated → expected last
        rec("b/d.md", june), // newest → expected first
    ];
    sort_records(&mut records);

    let sorted_paths: Vec<String> = records
        .iter()
        .map(|record| path_to_unix(&record.path))
        .collect();
    assert_eq!(sorted_paths, ["b/d.md", "a/b.md", "z/a.md", "m/c.md"]);
}
2244
2245    fn rec(path: &str, updated: Option<&str>) -> IndexRecord {
2246        IndexRecord {
2247            path: PathBuf::from(path),
2248            type_: "t".into(),
2249            summary: "s".into(),
2250            tags: vec![],
2251            links: vec![],
2252            created: None,
2253            updated: updated.map(|u| DateTime::parse_from_rfc3339(u).unwrap()),
2254            fields: BTreeMap::new(),
2255        }
2256    }
2257
2258    // ── build_layer / build_root ─────────────────────────────────────────
2259
/// The layer `index.md` lists its type-folders alphabetically with a file
/// count each, never quotes a member summary as a description, and stamps
/// its own `updated` with the newest child timestamp.
#[test]
fn layer_index_lists_type_folders_with_counts() {
    let (_tmp, store) = mk_store();

    // (path, type, summary, updated)
    let docs = [
        (
            "records/contacts/a.md",
            "contact",
            "Contact A older",
            "2026-05-01T00:00:00Z",
        ),
        (
            "records/contacts/b.md",
            "contact",
            "Contact B newest",
            "2026-05-09T00:00:00Z",
        ),
        (
            "records/companies/x.md",
            "company",
            "Acme Inc",
            "2026-05-05T00:00:00Z",
        ),
    ];
    for (rel, ty, summary, updated) in docs {
        write_doc(&store, rel, ty, Some(summary), Some(updated), "");
    }

    // Type-folder artifacts are written before the layer, because the layer
    // rollup reads their jsonl.
    let levels = [
        IndexLevel::TypeFolder("records/contacts".into()),
        IndexLevel::TypeFolder("records/companies".into()),
        IndexLevel::Layer(Layer::Records),
    ];
    for level in &levels {
        Index::write_level(&store, level).unwrap();
    }
    let md = read(&store, "records/index.md");

    assert!(
        md.starts_with("---\ntype: index\nscope: layer\nfolder: records\n"),
        "layer fm:\n{md}"
    );

    // Alphabetical type-folder order: companies before contacts.
    let companies_pos = md.find("companies/index").unwrap();
    let contacts_pos = md.find("contacts/index").unwrap();
    assert!(
        companies_pos < contacts_pos,
        "type folders must be alphabetical"
    );

    // Display name + count only: with no `## Folders` metadata the rollup
    // has no description to show.
    assert!(
        md.contains("- [[records/contacts/index|Contacts]] (2)\n"),
        "contacts entry:\n{md}"
    );
    assert!(
        md.contains("- [[records/companies/index|Companies]] (1)\n"),
        "companies entry:\n{md}"
    );
    // No member summary may leak into the rollup as a description.
    let leaked_summary = md.contains("Contact B newest") || md.contains("Acme Inc");
    assert!(
        !leaked_summary,
        "layer rollup must not quote a member summary:\n{md}"
    );

    // Layer `updated` = newest child (contacts/b, 05-09).
    assert!(
        md.contains("updated: 2026-05-09T00:00:00Z\n"),
        "layer updated must be max child:\n{md}"
    );
}
2326
/// Rollups surface the curator-authored `## Folders` display name and
/// description; the tool never invents one. A folder without an entry shows
/// its count only, with no member summary standing in as a description.
#[test]
fn folders_section_supplies_authored_display_and_description() {
    let (_tmp, mut store) = mk_store();

    // (folder, authored display override, authored description)
    let authored = [
        (
            "records/contacts",
            None,
            "people across customer + prospect accounts",
        ),
        (
            "sources/hubspot-exports",
            Some("HubSpot exports"),
            "deal + pipeline exports",
        ),
    ];
    for (folder, display, description) in authored {
        store.config.folders.insert(
            folder.into(),
            crate::parser::FolderMeta {
                display: display.map(Into::into),
                description: Some(description.into()),
            },
        );
    }

    // (path, type, summary, updated). `records/companies` deliberately has
    // no `## Folders` entry, so it must come out counts-only.
    let docs = [
        (
            "records/contacts/a.md",
            "contact",
            "Contact A",
            "2026-05-01T00:00:00Z",
        ),
        (
            "records/companies/x.md",
            "company",
            "Acme Inc",
            "2026-05-05T00:00:00Z",
        ),
        (
            "sources/hubspot-exports/d.md",
            "hubspot-export",
            "a single deal export",
            "2026-05-03T00:00:00Z",
        ),
    ];
    for (rel, ty, summary, updated) in docs {
        write_doc(&store, rel, ty, Some(summary), Some(updated), "");
    }

    Index::rebuild_all(&store).unwrap();

    // The same authored lines are expected in the layer and the root rollup.
    let contacts_line = "- [[records/contacts/index|Contacts]] (1) — people across customer + prospect accounts\n";
    let hubspot_line =
        "- [[sources/hubspot-exports/index|HubSpot exports]] (1) — deal + pipeline exports\n";

    // Records layer: authored description with the derived display name.
    let records_layer = read(&store, "records/index.md");
    assert!(
        records_layer.contains(contacts_line),
        "authored description must surface:\n{records_layer}"
    );
    // No `## Folders` entry ⇒ counts only; the member summary never leaks in.
    let companies_counts_only =
        records_layer.contains("- [[records/companies/index|Companies]] (1)\n");
    let summary_leaked = records_layer.contains("Acme Inc");
    assert!(
        companies_counts_only && !summary_leaked,
        "un-described folder is counts-only:\n{records_layer}"
    );

    // Sources layer: the display override beats the derived "Hubspot exports".
    let sources_layer = read(&store, "sources/index.md");
    assert!(
        sources_layer.contains(hubspot_line),
        "display override + description must surface:\n{sources_layer}"
    );

    // Root rollup carries the same authored metadata.
    let root = read(&store, "index.md");
    assert!(
        root.contains(contacts_line),
        "root surfaces authored description:\n{root}"
    );
    assert!(
        root.contains(hubspot_line),
        "root surfaces display override:\n{root}"
    );
}
2406
/// `default_display` capitalises the first letter of a folder name and
/// turns `-` / `_` separators into spaces.
#[test]
fn default_display_turns_separators_to_spaces_and_caps() {
    // (folder name, expected display)
    let cases = [
        ("contacts", "Contacts"),
        ("hubspot-exports", "Hubspot exports"),
        ("usage_exports", "Usage exports"),
    ];
    for (folder, expected) in cases {
        assert_eq!(default_display(folder), expected);
    }
}
2413
/// The root `index.md` groups type-folders under one heading per non-empty
/// layer (Sources before Records), shows the layer total in the heading and
/// a per-type count on each entry, and carries no preview text.
#[test]
fn root_index_groups_layers_with_totals_and_per_type_counts() {
    let (_tmp, store) = mk_store();

    // (path, type, summary, updated). Nothing is written under wiki, so no
    // Wiki section is expected.
    let docs = [
        (
            "sources/emails/2026/05/a.md",
            "email",
            "Mail",
            "2026-05-01T00:00:00Z",
        ),
        ("sources/docs/d.md", "doc", "Doc", "2026-05-02T00:00:00Z"),
        (
            "records/contacts/c.md",
            "contact",
            "C",
            "2026-05-03T00:00:00Z",
        ),
    ];
    for (rel, ty, summary, updated) in docs {
        write_doc(&store, rel, ty, Some(summary), Some(updated), "");
    }

    Index::rebuild_all(&store).unwrap();
    let md = read(&store, "index.md");

    assert!(
        md.starts_with("---\ntype: index\nscope: root\n"),
        "root fm:\n{md}"
    );
    assert!(md.contains("# Knowledge base index\n"), "root title:\n{md}");

    // Layer headings carry totals and appear in canonical order.
    let sources_pos = md
        .find("## Sources (2)")
        .expect("sources heading w/ total 2");
    let records_pos = md
        .find("## Records (1)")
        .expect("records heading w/ total 1");
    assert!(sources_pos < records_pos, "Sources must precede Records");
    assert!(!md.contains("## Wiki"), "empty layer gets no section");

    // Per-type entries: display name + `(N)`.
    assert!(
        md.contains("- [[sources/docs/index|Docs]] (1)\n"),
        "root docs entry:\n{md}"
    );
    assert!(
        md.contains("- [[sources/emails/index|Emails]] (1)\n"),
        "root emails entry:\n{md}"
    );
    assert!(
        md.contains("- [[records/contacts/index|Contacts]] (1)\n"),
        "root contacts entry:\n{md}"
    );
    // An em dash followed by a space would mean a preview/description was
    // appended to some entry.
    assert!(!md.contains("— "), "root entries carry no preview text");
}
2475
2476    // ── write-through == rebuild (THE invariant) ─────────────────────────
2477
/// THE invariant: a store indexed incrementally (one `Index::on_write` per
/// file) must end up with exactly the same index artifacts, byte for byte,
/// as an identical store indexed by a single `Index::rebuild_all`.
#[test]
fn on_write_matches_rebuild_byte_for_byte() {
    let (_tmp_wt, wt) = mk_store(); // indexed write-through
    let (_tmp_rb, rb) = mk_store(); // indexed by one rebuild

    // (path, type, summary, updated, extra frontmatter)
    let docs = [
        (
            "sources/emails/2026/05/e1.md",
            "email",
            "First mail",
            "2026-05-01T10:00:00Z",
            "tags:\n  - inbox\n",
        ),
        (
            "sources/emails/2026/06/e2.md",
            "email",
            "Second mail",
            "2026-06-01T10:00:00Z",
            "",
        ),
        (
            "records/contacts/sarah.md",
            "contact",
            "Sarah",
            "2026-05-15T10:00:00Z",
            "links:\n  - records/profiles/sarah\n",
        ),
        (
            "records/contacts/elena.md",
            "contact",
            "Elena",
            "2026-05-20T10:00:00Z",
            "status: active\n",
        ),
        (
            "records/profiles/sarah.md",
            "profile",
            "Sarah bio",
            "2026-05-21T10:00:00Z",
            "",
        ),
    ];

    // Both stores receive the same files; only `wt` is indexed as it goes.
    for &(rel, ty, summary, updated, extra) in &docs {
        write_doc(&wt, rel, ty, Some(summary), Some(updated), extra);
        write_doc(&rb, rel, ty, Some(summary), Some(updated), extra);
        Index::on_write(&wt, Path::new(rel)).unwrap();
    }
    Index::rebuild_all(&rb).unwrap();

    let a = snapshot_artifacts(&wt);
    let b = snapshot_artifacts(&rb);

    let wt_names: Vec<_> = a.keys().collect();
    let rb_names: Vec<_> = b.keys().collect();
    assert_eq!(
        wt_names, rb_names,
        "same set of index artifacts must exist"
    );
    for (name, wt_body) in &a {
        assert_eq!(wt_body, &b[name], "artifact {name} differs between write-through and rebuild:\n--- write-through ---\n{wt_body}\n--- rebuild ---\n{}", b[name]);
    }

    // Guard against a vacuous comparison of two empty snapshots.
    assert!(a.contains_key("index.md"));
    assert!(a.contains_key("sources/emails/index.jsonl"));
    assert!(a.contains_key("records/contacts/index.md"));
}
2545
/// Regression for the O(changed) *cost* bound of a loop op, not just its
/// result: parent rollups must be recomputed from the type-folder
/// `index.jsonl` sidecars, never by walking the content tree of *sibling*
/// folders the op was not asked about.
///
/// The byte-identity property test cannot catch a violation: it indexes
/// every folder before comparing, and a full-store walk yields the *correct*
/// counts too — merely in `O(store files)`.
///
/// Fingerprint of the old `update_parents → build_layer / build_root` path
/// (which ran `walk_type_folder_files` over every type-folder in the store):
/// one `on_write` to `records/contacts/sarah.md` made the layer + root
/// rollups show the file count of `records/companies`, a sibling with
/// content on disk that was NEVER handed to a write/index op and therefore
/// has no `index.jsonl`. An O(changed) loop op cannot "see" such a folder; a
/// whole-store walk can. The test therefore asserts that the rollups reflect
/// ONLY the sidecar-indexed folder, which proves no content-tree walk ran.
#[test]
fn loop_op_does_not_walk_sibling_content_tree() {
    let (_tmp, store) = mk_store();

    // (path, type, summary, updated)
    let companies = [
        (
            "records/companies/acme.md",
            "company",
            "Acme Inc",
            "2026-05-05T00:00:00Z",
        ),
        (
            "records/companies/globex.md",
            "company",
            "Globex",
            "2026-05-06T00:00:00Z",
        ),
    ];
    let sarah = (
        "records/contacts/sarah.md",
        "contact",
        "Sarah",
        "2026-05-15T00:00:00Z",
    );

    // The sibling folder gets real content on disk but is deliberately never
    // indexed (no on_write / write_level / rebuild), so it has no sidecar.
    for (rel, ty, summary, updated) in companies {
        write_doc(&store, rel, ty, Some(summary), Some(updated), "");
    }
    assert!(
        !exists(&store, "records/companies/index.jsonl"),
        "precondition: companies must be un-indexed"
    );

    // The ONLY loop op: one write to a different type-folder.
    write_doc(&store, sarah.0, sarah.1, Some(sarah.2), Some(sarah.3), "");
    Index::on_write(&store, Path::new(sarah.0)).unwrap();

    let layer = read(&store, "records/index.md");
    let root = read(&store, "index.md");

    // The written folder shows up in both rollups, counts only (there is no
    // `## Folders` metadata here, hence no description/preview).
    let contacts_entry = "- [[records/contacts/index|Contacts]] (1)\n";
    let layer_has_contacts = layer.contains(contacts_entry);
    let layer_quotes_summary = layer.contains("Sarah");
    assert!(
        layer_has_contacts && !layer_quotes_summary,
        "layer must reflect the written folder, counts only:\n{layer}"
    );
    assert!(
        root.contains(contacts_entry),
        "root must reflect the written folder:\n{root}"
    );

    // The un-indexed sibling must be INVISIBLE to the loop op. Any mention of
    // `companies` means `on_write` walked the whole content tree.
    assert!(
        !layer.contains("companies"),
        "loop op walked the sibling content tree: layer rollup counts un-indexed records/companies\n{layer}"
    );
    assert!(
        !root.contains("companies"),
        "loop op walked the sibling content tree: root rollup counts un-indexed records/companies\n{root}"
    );
    // contacts is the layer's only indexed child ⇒ the total is 1, not 3.
    assert!(
        root.contains("## Records (1)"),
        "root layer total must count only the sidecar-indexed folder (1), not walked siblings (would be 3):\n{root}"
    );

    // The fix changes cost, not the eventual result: once the sibling is
    // indexed as well, the rollups include it and every artifact equals a
    // from-scratch rebuild of an identical store.
    let (_tmp_rb, rb) = mk_store();
    for (rel, ty, summary, updated) in [companies[0], companies[1], sarah] {
        write_doc(&rb, rel, ty, Some(summary), Some(updated), "");
    }
    for (rel, ..) in companies {
        Index::on_write(&store, Path::new(rel)).unwrap();
    }
    Index::rebuild_all(&rb).unwrap();

    let a = snapshot_artifacts(&store);
    let b = snapshot_artifacts(&rb);
    let loop_names: BTreeSet<_> = a.keys().collect();
    let rebuild_names: BTreeSet<_> = b.keys().collect();
    assert_eq!(
        loop_names, rebuild_names,
        "same artifact set after indexing both folders"
    );
    for (name, loop_body) in &a {
        assert_eq!(
            loop_body, &b[name],
            "after indexing the sibling too, loop result must equal rebuild for {name}"
        );
    }
    assert!(
        read(&store, "index.md").contains("## Records (3)"),
        "now that both folders are indexed, the root total is 3"
    );
}
2680
/// Regression: a file placed at the path the toolkit ITSELF computes
/// (`Store::shard_path_for`) must be indexable end-to-end.
///
/// The bug class is a 2-component `<layer>/<file>` path. `type_folder_of`
/// sees no type-folder in it, so the producer (path computation) and the
/// consumer (index) disagree: the loop path fails (`on_write` → `Err`, as it
/// tries to write `index.md` *inside* a file) while the sweep path silently
/// drops the page from every catalog. `profile` is a custom (non-built-in)
/// type, so `shard_path_for` files it under the records-layer fallback
/// `records/profile/<file>` — a conforming 3-component path.
///
/// Both paths are driven through the real `shard_path_for` output, and the
/// test asserts that (1) `on_write` succeeds, (2) the page is present in the
/// rebuilt catalog, and (3) write-through == rebuild.
#[test]
fn custom_type_at_shard_path_for_is_indexable_end_to_end() {
    let (_tmp_wt, wt) = mk_store(); // loop (write-through) side
    let (_tmp_rb, rb) = mk_store(); // sweep (rebuild) side

    // The toolkit's own canonical write path for a custom-type record.
    let default_frontmatter = crate::parser::Frontmatter::default();
    let rel = wt
        .shard_path_for("profile", &default_frontmatter, "renewal-theme")
        .unwrap();
    let rel_str = path_to_unix(&rel);

    // Precondition the consumer relies on: the path must resolve to a real
    // `<layer>/<type-folder>`.
    assert!(
        type_folder_of(&rel).is_some(),
        "shard_path_for produced a path the index cannot file: {rel_str}"
    );

    // The same page goes into both stores.
    for target in [&wt, &rb] {
        write_doc(
            target,
            &rel_str,
            "profile",
            Some("Renewal theme"),
            Some("2026-05-21T10:00:00Z"),
            "",
        );
    }

    // (1) The loop path must not error; the 2-component shape used to return
    // Err(Io(NotADirectory)).
    Index::on_write(&wt, &rel)
        .expect("on_write must succeed for a toolkit-computed custom-type path");
    Index::rebuild_all(&rb).unwrap();

    // (2) The page is in the rebuilt catalog. The page link lives in the
    // *type-folder* index and the *layer* index rolls that folder up; the old
    // flat-path bug erased both, so both are checked.
    let page_link = wiki_target(&rel); // records/profile/renewal-theme
    let type_folder_md = read(&rb, "records/profile/index.md");
    assert!(
        type_folder_md.contains(&format!("[[{page_link}]]")),
        "type-folder index must list the page link, got:\n{type_folder_md}"
    );
    assert!(
        exists(&rb, "records/profile/index.jsonl"),
        "type-folder jsonl must exist"
    );
    let type_folder_jsonl = read(&rb, "records/profile/index.jsonl");
    assert!(
        type_folder_jsonl.contains(&rel_str),
        "type-folder jsonl must contain the page row"
    );
    let layer_md = read(&rb, "records/index.md");
    assert!(
        layer_md.contains("records/profile/index"),
        "layer index must roll up the records/profile type-folder, got:\n{layer_md}"
    );

    // (3) Loop and sweep agree byte-for-byte.
    let a = snapshot_artifacts(&wt);
    let b = snapshot_artifacts(&rb);
    let loop_names: Vec<_> = a.keys().collect();
    let sweep_names: Vec<_> = b.keys().collect();
    assert_eq!(
        loop_names, sweep_names,
        "loop and sweep must produce the same artifact set"
    );
    for (name, loop_body) in &a {
        assert_eq!(
            loop_body, &b[name],
            "custom-type artifact {name} differs between on_write and rebuild"
        );
    }
}
2779
2780    #[test]
2781    fn on_remove_then_rebuild_match_and_pull_in_next_over_cap() {
2782        let (_d1, wt) = mk_store();
2783        let (_d2, rb) = mk_store();
2784        let total = MD_CAP + 3; // 503 files; removing one keeps md full at 500
2785        let mut all_rels = Vec::new();
2786        for i in 0..total {
2787            let rel = format!("sources/emails/2026/05/m-{i:04}.md");
2788            // `updated` strictly increasing across i by varying both minute and second
2789            let updated = format!("2026-05-10T00:{:02}:{:02}Z", i / 60, i % 60);
2790            write_doc(
2791                &wt,
2792                &rel,
2793                "email",
2794                Some(&format!("mail {i}")),
2795                Some(&updated),
2796                "",
2797            );
2798            write_doc(
2799                &rb,
2800                &rel,
2801                "email",
2802                Some(&format!("mail {i}")),
2803                Some(&updated),
2804                "",
2805            );
2806            all_rels.push(rel);
2807        }
2808        // Build write-through index, then remove the single newest file.
2809        Index::rebuild_all(&wt).unwrap();
2810        let newest = &all_rels[total - 1]; // highest i = newest updated
2811        fs::remove_file(wt.root.join(newest)).unwrap();
2812        Index::on_remove(&wt, Path::new(newest)).unwrap();
2813
2814        // Rebuild side: same end state (file physically absent).
2815        fs::remove_file(rb.root.join(newest)).unwrap();
2816        Index::rebuild_all(&rb).unwrap();
2817
2818        let a = snapshot_artifacts(&wt);
2819        let b = snapshot_artifacts(&rb);
2820        for (k, v) in &a {
2821            assert_eq!(v, &b[k], "after remove, artifact {k} drifted from rebuild");
2822        }
2823
2824        // The md must still hold exactly 500 entries (the 501st got pulled in)
2825        // and the removed file must be gone from both artifacts.
2826        let md = read(&wt, "sources/emails/index.md");
2827        assert_eq!(md.lines().filter(|l| l.starts_with("- [[")).count(), MD_CAP);
2828        // Removed (newest) file is gone from the bare-path md and the .md jsonl.
2829        assert!(
2830            !md.contains(&format!("[[{}]]", wiki_target(Path::new(newest)))),
2831            "removed file must not be listed in md"
2832        );
2833        // The file previously at rank 501 (excluded under the cap) is `all_rels[2]`
2834        // — `updated` increases with index, so newest-first rank 500 = index 2.
2835        // After dropping the newest it shifts into the visible 500.
2836        let pulled_in = &all_rels[2];
2837        assert!(
2838            md.contains(&format!("[[{}]]", wiki_target(Path::new(pulled_in)))),
2839            "the 501st-most-recent must be pulled into the browse view after a removal"
2840        );
2841        assert!(
2842            md.contains(&format!("This folder has {} files.", total - 1)),
2843            "footer count must decrement:\n{}",
2844            md.lines().rev().take(4).collect::<Vec<_>>().join("\n")
2845        );
2846        let jsonl = read(&wt, "sources/emails/index.jsonl");
2847        assert_eq!(
2848            jsonl.lines().count(),
2849            total - 1,
2850            "jsonl loses exactly the removed file"
2851        );
2852        assert!(
2853            !jsonl.contains(&path_to_unix(Path::new(newest))),
2854            "removed file must be gone from the jsonl too"
2855        );
2856    }
2857
2858    #[test]
2859    fn on_rename_cross_folder_matches_rebuild() {
2860        let (_d1, wt) = mk_store();
2861        let (_d2, rb) = mk_store();
2862        // Seed both stores identically.
2863        let seed: &[(&str, &str, &str, &str)] = &[
2864            (
2865                "records/contacts/a.md",
2866                "contact",
2867                "A",
2868                "2026-05-01T00:00:00Z",
2869            ),
2870            (
2871                "records/contacts/b.md",
2872                "contact",
2873                "B",
2874                "2026-05-02T00:00:00Z",
2875            ),
2876            (
2877                "records/companies/x.md",
2878                "company",
2879                "X",
2880                "2026-05-03T00:00:00Z",
2881            ),
2882        ];
2883        for (rel, t, s, u) in seed {
2884            write_doc(&wt, rel, t, Some(s), Some(u), "");
2885            write_doc(&rb, rel, t, Some(s), Some(u), "");
2886        }
2887        Index::rebuild_all(&wt).unwrap();
2888
2889        // Rename contacts/b.md -> companies/b.md (cross type-folder). The file's
2890        // `type` changes to match its new folder, as a real `dbmd rename` would.
2891        let old = "records/contacts/b.md";
2892        let new = "records/companies/b.md";
2893        fs::create_dir_all(wt.root.join("records/companies")).unwrap();
2894        fs::rename(wt.root.join(old), wt.root.join(new)).unwrap();
2895        // (type stays "contact" here; index copies frontmatter verbatim — the
2896        // test only asserts placement + parity with rebuild.)
2897        Index::on_rename(&wt, Path::new(old), Path::new(new)).unwrap();
2898
2899        // Rebuild side: same end state.
2900        fs::create_dir_all(rb.root.join("records/companies")).unwrap();
2901        fs::rename(rb.root.join(old), rb.root.join(new)).unwrap();
2902        Index::rebuild_all(&rb).unwrap();
2903
2904        let a = snapshot_artifacts(&wt);
2905        let b = snapshot_artifacts(&rb);
2906        assert_eq!(a.keys().collect::<Vec<_>>(), b.keys().collect::<Vec<_>>());
2907        for (k, v) in &a {
2908            assert_eq!(v, &b[k], "rename: artifact {k} drifted from rebuild");
2909        }
2910        // Concretely: b is gone from contacts, present in companies.
2911        let contacts = read(&wt, "records/contacts/index.md");
2912        assert!(!contacts.contains("records/contacts/b]]"));
2913        let companies = read(&wt, "records/companies/index.md");
2914        assert!(companies.contains("[[records/companies/b]]"));
2915    }
2916
2917    #[test]
2918    fn on_write_updates_existing_entry_in_place() {
2919        let (_d, store) = mk_store();
2920        write_doc(
2921            &store,
2922            "records/contacts/a.md",
2923            "contact",
2924            Some("Original"),
2925            Some("2026-05-01T00:00:00Z"),
2926            "",
2927        );
2928        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2929        // Edit the same file: new summary + newer updated.
2930        write_doc(
2931            &store,
2932            "records/contacts/a.md",
2933            "contact",
2934            Some("Revised"),
2935            Some("2026-05-09T00:00:00Z"),
2936            "",
2937        );
2938        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2939
2940        let jsonl = read(&store, "records/contacts/index.jsonl");
2941        assert_eq!(
2942            jsonl.lines().count(),
2943            1,
2944            "upsert must not duplicate the line"
2945        );
2946        assert!(jsonl.contains("Revised"), "jsonl must reflect the update");
2947        assert!(
2948            !jsonl.contains("Original"),
2949            "stale line must be gone (compacted)"
2950        );
2951        let md = read(&store, "records/contacts/index.md");
2952        assert!(md.contains("- [[records/contacts/a]] — Revised\n"));
2953        assert!(
2954            md.contains("updated: 2026-05-09T00:00:00Z\n"),
2955            "index updated must track the newer member"
2956        );
2957    }
2958
2959    // ── dry-run + cleanup ────────────────────────────────────────────────
2960
2961    #[test]
2962    fn dry_run_emits_separators_and_writes_nothing() {
2963        let (_d, store) = mk_store();
2964        write_doc(
2965            &store,
2966            "sources/emails/2026/05/a.md",
2967            "email",
2968            Some("Mail"),
2969            Some("2026-05-01T00:00:00Z"),
2970            "",
2971        );
2972        let out = Index::render_dry_run(&store, &IndexLevel::TypeFolder("sources/emails".into()))
2973            .unwrap();
2974        assert!(
2975            out.contains("--- sources/emails/index.md ---\n"),
2976            "md separator:\n{out}"
2977        );
2978        assert!(
2979            out.contains("--- sources/emails/index.jsonl ---\n"),
2980            "jsonl separator:\n{out}"
2981        );
2982        assert!(
2983            out.contains("- [[sources/emails/2026/05/a]] — Mail"),
2984            "md body present"
2985        );
2986        // Nothing was written to disk.
2987        assert!(
2988            !exists(&store, "sources/emails/index.md"),
2989            "dry-run must not write"
2990        );
2991        assert!(
2992            !exists(&store, "sources/emails/index.jsonl"),
2993            "dry-run must not write"
2994        );
2995    }
2996
2997    #[test]
2998    fn cleanup_removes_noncanonical_and_empty_indexes() {
2999        let (_d, store) = mk_store();
3000        write_doc(
3001            &store,
3002            "sources/emails/2026/05/a.md",
3003            "email",
3004            Some("Mail"),
3005            Some("2026-05-01T00:00:00Z"),
3006            "",
3007        );
3008        // A stray index inside a date-shard (non-canonical) ...
3009        fs::write(
3010            store.root.join("sources/emails/2026/05/index.md"),
3011            "stale\n",
3012        )
3013        .unwrap();
3014        fs::write(
3015            store.root.join("sources/emails/2026/05/index.jsonl"),
3016            "stale\n",
3017        )
3018        .unwrap();
3019        // ... and an index in an empty type-folder.
3020        fs::create_dir_all(store.root.join("records/empty")).unwrap();
3021        fs::write(store.root.join("records/empty/index.md"), "stale\n").unwrap();
3022
3023        Index::cleanup(&store).unwrap();
3024
3025        assert!(
3026            !exists(&store, "sources/emails/2026/05/index.md"),
3027            "shard index must be deleted"
3028        );
3029        assert!(
3030            !exists(&store, "sources/emails/2026/05/index.jsonl"),
3031            "shard jsonl must be deleted"
3032        );
3033        assert!(
3034            !exists(&store, "records/empty/index.md"),
3035            "empty-folder index must be deleted"
3036        );
3037        // The canonical type-folder file itself is untouched by cleanup.
3038        assert!(exists(&store, "sources/emails/2026/05/a.md"));
3039    }
3040
3041    #[test]
3042    fn rebuild_deletes_stale_indexes_for_emptied_folders() {
3043        let (_d, store) = mk_store();
3044        write_doc(
3045            &store,
3046            "records/contacts/a.md",
3047            "contact",
3048            Some("A"),
3049            Some("2026-05-01T00:00:00Z"),
3050            "",
3051        );
3052        Index::rebuild_all(&store).unwrap();
3053        assert!(exists(&store, "records/contacts/index.md"));
3054        assert!(exists(&store, "records/index.md"));
3055        assert!(exists(&store, "index.md"));
3056
3057        // Empty the folder entirely, then rebuild: all three levels vanish.
3058        fs::remove_file(store.root.join("records/contacts/a.md")).unwrap();
3059        Index::rebuild_all(&store).unwrap();
3060        assert!(
3061            !exists(&store, "records/contacts/index.md"),
3062            "emptied type-folder index gone"
3063        );
3064        assert!(
3065            !exists(&store, "records/index.md"),
3066            "now-empty layer index gone"
3067        );
3068        assert!(!exists(&store, "index.md"), "now-empty root index gone");
3069    }
3070
3071    // ── randomized parity (property-style) ───────────────────────────────
3072
    /// Property-style parity check: a fixed pseudo-random sequence of 120
    /// create/update, remove and rename operations is applied to two stores.
    /// The `wt` store is indexed write-through after every operation; the `rb`
    /// store only receives the file changes and is rebuilt once at the end.
    /// Both must end with the same artifact set and identical bytes.
    #[test]
    fn property_writethrough_equals_rebuild_under_mixed_ops() {
        // Deterministic pseudo-random op sequence (no rand crate): a small LCG.
        let (_d1, wt) = mk_store();
        let (_d2, rb) = mk_store();
        let mut seed: u64 = 0x9E3779B97F4A7C15;
        // One LCG step per call; the upper 31 bits of the state are returned.
        // The run is reproducible only as long as the ORDER of `next()` calls
        // below stays exactly as written.
        let mut next = || {
            seed = seed
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            (seed >> 33) as u32
        };

        // `folders[i]` is paired with `types[i]`.
        let folders = ["sources/emails", "records/contacts", "records/profiles"];
        let types = ["email", "contact", "profile"];
        let mut live: Vec<String> = Vec::new(); // store-relative paths that exist

        for step in 0..120u32 {
            let r = next();
            let op = r % 10;
            // op 0..=5 → create/update (also forced while nothing is live),
            // op 6..=7 → remove, op 8..=9 → rename.
            if op < 6 || live.is_empty() {
                // CREATE/UPDATE
                let fi = (next() as usize) % folders.len();
                let folder = folders[fi];
                // ids 0..40: a repeated id rewrites an existing file (update).
                let id = next() % 40;
                let rel = if folder == "sources/emails" {
                    let month = 5 + (id % 2); // shard across two months
                    format!("{folder}/2026/{month:02}/f-{id:02}.md")
                } else {
                    format!("{folder}/f-{id:02}.md")
                };
                // recency varies with step so order is meaningful + total
                let updated = format!(
                    "2026-05-{:02}T{:02}:{:02}:00Z",
                    1 + (step % 27),
                    step % 24,
                    id % 60
                );
                // Every third id carries extra frontmatter (a `tags` list).
                let extra = if id % 3 == 0 {
                    "tags:\n  - x\n  - y\n"
                } else {
                    ""
                };
                write_doc(
                    &wt,
                    &rel,
                    types[fi],
                    Some(&format!("sum {step}")),
                    Some(&updated),
                    extra,
                );
                write_doc(
                    &rb,
                    &rel,
                    types[fi],
                    Some(&format!("sum {step}")),
                    Some(&updated),
                    extra,
                );
                // Only the write-through store is told about the change.
                Index::on_write(&wt, Path::new(&rel)).unwrap();
                if !live.contains(&rel) {
                    live.push(rel);
                }
            } else if op < 8 {
                // REMOVE a live file
                let idx = (next() as usize) % live.len();
                let rel = live.remove(idx);
                fs::remove_file(wt.root.join(&rel)).unwrap();
                // A removal error on the rebuild-side store is ignored (`.ok()`).
                fs::remove_file(rb.root.join(&rel)).ok();
                Index::on_remove(&wt, Path::new(&rel)).unwrap();
            } else {
                // RENAME a live file to a fresh id. The destination folder is
                // drawn from ALL of `folders`, so the move may stay in place,
                // cross type-folders, or cross layers (sources <-> records).
                let idx = (next() as usize) % live.len();
                let old = live[idx].clone();
                // pick a destination folder from the full folder set
                let fi = (next() as usize) % folders.len();
                let folder = folders[fi];
                // ids 50..90: disjoint from the 0..40 range used by CREATE.
                let id = 50 + (next() % 40);
                let new = if folder == "sources/emails" {
                    format!("{folder}/2026/05/f-{id:02}.md")
                } else {
                    format!("{folder}/f-{id:02}.md")
                };
                // Skip a no-op rename or one that would overwrite a live file.
                // The `next()` draws above have already been consumed.
                if new == old || live.contains(&new) {
                    continue;
                }
                fs::create_dir_all(wt.root.join(&new).parent().unwrap()).unwrap();
                fs::create_dir_all(rb.root.join(&new).parent().unwrap()).unwrap();
                fs::rename(wt.root.join(&old), wt.root.join(&new)).unwrap();
                fs::rename(rb.root.join(&old), rb.root.join(&new)).unwrap();
                Index::on_rename(&wt, Path::new(&old), Path::new(&new)).unwrap();
                live[idx] = new;
            }
        }

        // Now rebuild the rb side from the shared end state and compare.
        Index::rebuild_all(&rb).unwrap();
        let a = snapshot_artifacts(&wt);
        let b = snapshot_artifacts(&rb);
        assert_eq!(
            a.keys().collect::<BTreeSet<_>>(),
            b.keys().collect::<BTreeSet<_>>(),
            "write-through and rebuild must produce the same set of artifacts"
        );
        for (k, v) in &a {
            assert_eq!(
                v, &b[k],
                "INVARIANT VIOLATED: artifact {k} differs after mixed ops\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
                b[k]
            );
        }
        // Guard against a vacuous pass: the comparison loop must have had work.
        assert!(
            !a.is_empty(),
            "the run must have produced at least one artifact"
        );
    }
3189
3190    // ── regressions: cleanup must not delete user content ─────────────────
3191
3192    /// CRITICAL regression: a user content file named `index.md` inside a date
3193    /// shard (e.g. from a website/doc-export mirror) must SURVIVE `cleanup` /
3194    /// `rebuild_all`. The old filename-only match silently deleted it.
3195    #[test]
3196    fn cleanup_preserves_user_content_named_index_md_in_shard() {
3197        let (_d, store) = mk_store();
3198        // A real content record that merely happens to be named index.md.
3199        write_doc(
3200            &store,
3201            "sources/emails/2026/06/index.md",
3202            "email",
3203            Some("Important imported mail"),
3204            Some("2026-06-11T04:23:25Z"),
3205            "",
3206        );
3207        Index::cleanup(&store).unwrap();
3208        assert!(
3209            exists(&store, "sources/emails/2026/06/index.md"),
3210            "cleanup must not delete a user content file named index.md"
3211        );
3212        // A full rebuild (which runs cleanup first) must also preserve it.
3213        Index::rebuild_all(&store).unwrap();
3214        assert!(
3215            exists(&store, "sources/emails/2026/06/index.md"),
3216            "rebuild_all must not delete a user content file named index.md"
3217        );
3218        let kept = read(&store, "sources/emails/2026/06/index.md");
3219        assert!(
3220            kept.contains("Important imported mail"),
3221            "the user's record content must be intact"
3222        );
3223    }
3224
3225    /// HIGH regression: `cleanup` uses `min_depth(2)`, so the canonical
3226    /// type-folder-root `index.md`/`index.jsonl` are NOT deleted up front. A
3227    /// genuine generated catalog at the type-folder root survives a cleanup pass
3228    /// (it is only ever rewritten, or removed when the folder is truly empty).
3229    #[test]
3230    fn cleanup_keeps_canonical_type_folder_root_sidecars() {
3231        let (_d, store) = mk_store();
3232        write_doc(
3233            &store,
3234            "records/contacts/alice.md",
3235            "contact",
3236            Some("Alice"),
3237            Some("2026-05-01T00:00:00Z"),
3238            "",
3239        );
3240        Index::write_level(&store, &IndexLevel::TypeFolder("records/contacts".into())).unwrap();
3241        assert!(exists(&store, "records/contacts/index.md"));
3242        assert!(exists(&store, "records/contacts/index.jsonl"));
3243        Index::cleanup(&store).unwrap();
3244        assert!(
3245            exists(&store, "records/contacts/index.md"),
3246            "cleanup must keep the canonical type-folder index.md (non-empty folder)"
3247        );
3248        assert!(
3249            exists(&store, "records/contacts/index.jsonl"),
3250            "cleanup must keep the canonical type-folder index.jsonl (non-empty folder)"
3251        );
3252    }
3253
3254    // ── regression: write-through must not catalog index artifacts ────────
3255
3256    /// HIGH regression: routing a generated `index.md` through `on_write` (as
3257    /// `dbmd fm set records/contacts/index.md …` would) must NOT insert a phantom
3258    /// self-row — counts and bytes stay equal to a rebuild.
3259    #[test]
3260    fn on_write_ignores_index_artifact_no_phantom_row() {
3261        let (_d, store) = mk_store();
3262        write_doc(
3263            &store,
3264            "records/contacts/alice.md",
3265            "contact",
3266            Some("Alice"),
3267            Some("2026-05-01T00:00:00Z"),
3268            "",
3269        );
3270        Index::on_write(&store, Path::new("records/contacts/alice.md")).unwrap();
3271        let jsonl_before = read(&store, "records/contacts/index.jsonl");
3272        assert_eq!(jsonl_before.lines().count(), 1);
3273
3274        // Tamper: route the catalog file itself through on_write.
3275        Index::on_write(&store, Path::new("records/contacts/index.md")).unwrap();
3276
3277        let jsonl_after = read(&store, "records/contacts/index.jsonl");
3278        assert_eq!(
3279            jsonl_after.lines().count(),
3280            1,
3281            "on_write on index.md must not add a phantom self-row"
3282        );
3283        assert!(
3284            !jsonl_after.contains("\"type\":\"index\""),
3285            "the catalog artifact must never appear as a catalogued row"
3286        );
3287        // Root rollup count stays 1 (not inflated to 2).
3288        let root = read(&store, "index.md");
3289        assert!(
3290            root.contains("[[records/contacts/index|Contacts]] (1)"),
3291            "count must not inflate:\n{root}"
3292        );
3293    }
3294
3295    // ── regression: multi-line summary cannot inject a catalog line ───────
3296
3297    /// HIGH regression: a block-scalar summary spanning multiple lines must be
3298    /// collapsed to one line in the browse entry, so it cannot forge a standalone
3299    /// `- [[…]]` catalog line.
3300    #[test]
3301    fn multiline_summary_is_single_lined_in_index_md() {
3302        let (_d, store) = mk_store();
3303        // A YAML block scalar whose value embeds a forged-looking entry line.
3304        write_raw(
3305            &store,
3306            "records/notes/evil.md",
3307            "type: note\nupdated: 2026-06-10T00:00:00Z\nsummary: |-\n  legit first line\n  - [[records/secrets/fake|Click me]] — injected entry",
3308            "\nbody\n",
3309        );
3310        let idx = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
3311        let md = idx.to_markdown();
3312        // Exactly one browse entry line, and no embedded newline forging a second.
3313        let entry_lines = md.lines().filter(|l| l.starts_with("- [[")).count();
3314        assert_eq!(
3315            entry_lines, 1,
3316            "a multi-line summary must not produce extra entry lines:\n{md}"
3317        );
3318        assert!(
3319            md.contains(
3320                "- [[records/notes/evil]] — legit first line - [[records/secrets/fake|Click me]] — injected entry\n"
3321            ),
3322            "summary newlines must collapse to spaces inline:\n{md}"
3323        );
3324    }
3325
3326    // ── regression: writer/validator scalar coercion agreement ────────────
3327
3328    /// HIGH regression: an unquoted non-string scalar `summary`/`type`
3329    /// (`summary: 2026`, `type: true`) must be coerced to a string by the index
3330    /// writer exactly as `validate::scalar_string` does — so the index entry holds
3331    /// the real value (`2026`), not the `(no summary)` placeholder that produced a
3332    /// permanently-unfixable INDEX_SUMMARY_MISMATCH.
3333    #[test]
3334    fn non_string_scalar_summary_and_type_are_coerced_like_validator() {
3335        let (_d, store) = mk_store();
3336        write_raw(
3337            &store,
3338            "records/contacts/a.md",
3339            "type: contact\nupdated: 2026-05-01T00:00:00Z\nsummary: 2026",
3340            "\nbody\n",
3341        );
3342        let rec = record_from_file(
3343            &store.root.join("records/contacts/a.md"),
3344            PathBuf::from("records/contacts/a.md"),
3345        )
3346        .unwrap();
3347        // `summary: 2026` (YAML number) coerces to the string "2026", matching
3348        // the validator's `scalar_string` (Number -> n.to_string()).
3349        assert_eq!(rec.summary, "2026");
3350        assert_eq!(rec.type_, "contact");
3351
3352        // And the rendered index entry quotes the real value, not the placeholder.
3353        let idx = Index::build_type_folder(&store, Path::new("records/contacts")).unwrap();
3354        let md = idx.to_markdown();
3355        assert!(
3356            md.contains("- [[records/contacts/a]] — 2026\n"),
3357            "index entry must hold the coerced scalar, not the placeholder:\n{md}"
3358        );
3359
3360        // A boolean scalar type coerces to "true" (mirrors scalar_string(Bool)).
3361        write_raw(
3362            &store,
3363            "records/contacts/b.md",
3364            "type: true\nupdated: 2026-05-02T00:00:00Z\nsummary: hi",
3365            "\nbody\n",
3366        );
3367        let rec_b = record_from_file(
3368            &store.root.join("records/contacts/b.md"),
3369            PathBuf::from("records/contacts/b.md"),
3370        )
3371        .unwrap();
3372        assert_eq!(rec_b.type_, "true");
3373    }
3374
3375    // ── regression: non-UTF-8 body must not abort the projection ──────────
3376
3377    /// HIGH regression: a content file with valid-UTF-8 frontmatter but a
3378    /// non-UTF-8 byte in the BODY (a verbatim Latin-1 `sources/` import) must
3379    /// still project to an IndexRecord — `record_from_file` reads frontmatter
3380    /// without requiring the whole file to be UTF-8, so a stray byte can't abort
3381    /// `rebuild_all` / write-through for the entire store.
3382    #[test]
3383    fn non_utf8_body_does_not_abort_record_projection() {
3384        let (_d, store) = mk_store();
3385        let rel = "sources/emails/2026/06/x.md";
3386        let abs = store.root.join(rel);
3387        fs::create_dir_all(abs.parent().unwrap()).unwrap();
3388        // Valid-UTF-8 frontmatter; a raw 0xE9 (Latin-1 'é') in the body.
3389        let mut bytes: Vec<u8> =
3390            b"---\ntype: email\nupdated: 2026-06-11T00:00:00Z\nsummary: An imported email\n---\n\nCaf"
3391                .to_vec();
3392        bytes.push(0xE9);
3393        bytes.extend_from_slice(b" meeting notes\n");
3394        fs::write(&abs, bytes).unwrap();
3395
3396        let rec = record_from_file(&abs, PathBuf::from(rel))
3397            .expect("non-UTF-8 body must not abort the frontmatter read");
3398        assert_eq!(rec.summary, "An imported email");
3399        assert_eq!(rec.type_, "email");
3400
3401        // The full sweep indexes the folder rather than aborting the whole store.
3402        Index::rebuild_all(&store).unwrap();
3403        assert!(
3404            exists(&store, "sources/emails/index.jsonl"),
3405            "rebuild must produce the catalog despite a non-UTF-8 body byte"
3406        );
3407        assert!(
3408            read(&store, "sources/emails/index.jsonl").contains("An imported email"),
3409            "the record must be catalogued"
3410        );
3411    }
3412
3413    /// HIGH regression: a single malformed-YAML file must abort the rebuild
3414    /// loudly (not be silently skipped) — skipping it would leave the store in a
3415    /// permanently invalid state (`INDEX_MISSING_ENTRY` / `INDEX_JSONL_DESYNC`
3416    /// that no rebuild clears, since the validator enumerates members by
3417    /// filename, not by parseability) and would desync the rollups. The abort is
3418    /// safe because `cleanup` preserves the prior canonical catalogs
3419    /// (`min_depth(2)`), so an aborted rebuild leaves the existing sidecars
3420    /// intact and surfaces a clear error naming the file to fix.
3421    #[test]
3422    fn rebuild_aborts_on_malformed_file_and_keeps_prior_catalogs() {
3423        let (_d, store) = mk_store();
3424        write_doc(
3425            &store,
3426            "records/contacts/alice.md",
3427            "contact",
3428            Some("Alice"),
3429            Some("2026-05-01T00:00:00Z"),
3430            "",
3431        );
3432        write_doc(
3433            &store,
3434            "records/companies/acme.md",
3435            "company",
3436            Some("Acme"),
3437            Some("2026-05-02T00:00:00Z"),
3438            "",
3439        );
3440
3441        // A clean first rebuild establishes the canonical catalogs.
3442        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3443        assert!(exists(&store, "records/contacts/index.jsonl"));
3444        assert!(exists(&store, "records/companies/index.jsonl"));
3445
3446        // Routine malformed file: unterminated quoted scalar.
3447        let bad = store.root.join("records/contacts/broken.md");
3448        fs::write(
3449            &bad,
3450            "---\ntype: contact\nsummary: \"unterminated\n---\nbody\n",
3451        )
3452        .unwrap();
3453
3454        // Must abort loudly — a silent skip leaves a file the validator requires
3455        // to be catalogued out of the index forever.
3456        Index::rebuild_all(&store)
3457            .expect_err("rebuild must abort, not silently skip, on a malformed file");
3458
3459        // The prior canonical catalogs survive the aborted rebuild: `cleanup`'s
3460        // `min_depth(2)` never deletes a type-folder's root-level sidecars, so a
3461        // mid-sweep abort leaves the existing indexes intact rather than wiped.
3462        assert!(
3463            exists(&store, "records/companies/index.jsonl"),
3464            "an aborted rebuild must not destroy a clean sibling folder's catalog"
3465        );
3466        assert!(
3467            exists(&store, "records/contacts/index.jsonl"),
3468            "an aborted rebuild must not destroy the affected folder's prior catalog"
3469        );
3470        let contacts_jsonl = read(&store, "records/contacts/index.jsonl");
3471        assert!(contacts_jsonl.contains("records/contacts/alice.md"));
3472    }
3473
3474    /// HIGH regression (problem B): `rebuild_all`'s rollup `(N)` counts must
3475    /// equal the catalogued `index.jsonl` record counts — never a raw `.md` walk
3476    /// that disagrees with the sidecar. The over-corrected skip-with-diagnostic
3477    /// build excluded a malformed file from `index.jsonl` while `build_layer` /
3478    /// `build_root` kept counting it via `walk_type_folder_files`, so a folder
3479    /// would show `Contacts (2)` in the root/layer rollups while its `index.jsonl`
3480    /// held only 1 record — and a single subsequent write-through (which derives
3481    /// `(N)` from the jsonl) rewrote it to `Contacts (1)`, making `rebuild_all`
3482    /// and write-through emit different bytes for the same state. With the loud
3483    /// abort, the only successful-rebuild states are fully consistent: every
3484    /// rollup `(N)` equals the catalogued record count AND equals what a
3485    /// write-through over the same files produces.
3486    #[test]
3487    fn rebuild_rollup_counts_equal_jsonl_records_and_write_through() {
3488        let (_d, store) = mk_store();
3489        // Two well-formed contacts: the rollups must read (2), matching the two
3490        // jsonl records — this is the count the skip-version inflated to a phantom
3491        // extra when a malformed sibling was present-but-uncatalogued.
3492        write_doc(
3493            &store,
3494            "records/contacts/alice.md",
3495            "contact",
3496            Some("Alice"),
3497            Some("2026-05-01T00:00:00Z"),
3498            "",
3499        );
3500        write_doc(
3501            &store,
3502            "records/contacts/bob.md",
3503            "contact",
3504            Some("Bob"),
3505            Some("2026-05-02T00:00:00Z"),
3506            "",
3507        );
3508        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3509
3510        // The catalogued record set (index.jsonl) and the rollup (N) must agree.
3511        let jsonl_lines = read(&store, "records/contacts/index.jsonl")
3512            .lines()
3513            .filter(|l| !l.trim().is_empty())
3514            .count();
3515        assert_eq!(jsonl_lines, 2, "two well-formed files ⇒ two jsonl records");
3516        let layer_md = read(&store, "records/index.md");
3517        let root_md = read(&store, "index.md");
3518        assert!(
3519            layer_md.contains("- [[records/contacts/index|Contacts]] (2)"),
3520            "layer rollup (N) must equal the jsonl record count (2), not a raw .md walk:\n{layer_md}"
3521        );
3522        assert!(
3523            root_md.contains("- [[records/contacts/index|Contacts]] (2)\n")
3524                && root_md.contains("## Records (2)"),
3525            "root rollup (N)/layer total must equal the jsonl record count (2):\n{root_md}"
3526        );
3527
3528        // The decisive write-through == rebuild_all byte-identity check on the
3529        // SAME end state: a single on_write must not rewrite the rollups to a
3530        // different (N). Under the skip-version, rebuild_all's rollup walked the
3531        // raw .md tree while on_write derived (N) from the jsonl, so the two
3532        // diverged; the loud abort keeps both deriving (N) from the catalogued
3533        // records, so the bytes match exactly.
3534        let (_d2, wt) = mk_store();
3535        write_doc(
3536            &wt,
3537            "records/contacts/alice.md",
3538            "contact",
3539            Some("Alice"),
3540            Some("2026-05-01T00:00:00Z"),
3541            "",
3542        );
3543        write_doc(
3544            &wt,
3545            "records/contacts/bob.md",
3546            "contact",
3547            Some("Bob"),
3548            Some("2026-05-02T00:00:00Z"),
3549            "",
3550        );
3551        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3552        Index::on_write(&wt, Path::new("records/contacts/bob.md")).unwrap();
3553
3554        let a = snapshot_artifacts(&wt);
3555        let b = snapshot_artifacts(&store);
3556        assert_eq!(
3557            a.keys().collect::<BTreeSet<_>>(),
3558            b.keys().collect::<BTreeSet<_>>(),
3559            "write-through and rebuild_all must produce the same artifact set"
3560        );
3561        for (k, v) in &a {
3562            assert_eq!(
3563                v, &b[k],
3564                "rollup bytes diverged between write-through and rebuild_all for {k} \
3565                 (a skip-version inflates rebuild_all's (N) above the jsonl record \
3566                 count, which write-through then rewrites):\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
3567                b[k]
3568            );
3569        }
3570    }
3571
3572    /// MEDIUM regression: a non-UTF-8 path component must be lossily decoded
3573    /// (kept, with U+FFFD), not silently dropped — so the index key points at the
3574    /// file, not its parent directory. Unix-only (ext4 allows the filename; APFS
3575    /// rejects it at the VFS layer).
3576    #[cfg(unix)]
3577    #[test]
3578    fn non_utf8_path_component_is_kept_not_dropped() {
3579        use std::ffi::OsStr;
3580        use std::os::unix::ffi::OsStrExt;
3581        // sources/emails/caf\xE9.md — the leaf has a non-UTF-8 byte.
3582        let mut leaf = b"caf".to_vec();
3583        leaf.push(0xE9);
3584        leaf.extend_from_slice(b".md");
3585        let p = Path::new("sources/emails").join(OsStr::from_bytes(&leaf));
3586        let unix = path_to_unix(&p);
3587        // The leaf is preserved (lossy), so the path is NOT collapsed to the
3588        // parent directory "sources/emails".
3589        assert_ne!(
3590            unix, "sources/emails",
3591            "non-UTF-8 leaf must not be dropped, collapsing the path to its parent dir"
3592        );
3593        assert!(
3594            unix.starts_with("sources/emails/caf"),
3595            "the lossy leaf must remain under its folder: {unix}"
3596        );
3597    }
3598
3599    // ── loose files (directly at a layer root, no type-folder) ───────────────
3600
3601    #[test]
3602    fn loose_file_is_catalogued_in_layer_jsonl_not_type_folder() {
3603        let (_d, store) = mk_store();
3604        // One canonical file (in a type-folder) and one loose file at the root.
3605        write_doc(
3606            &store,
3607            "records/contacts/alice.md",
3608            "contact",
3609            Some("Alice"),
3610            Some("2026-06-01T08:00:00Z"),
3611            "id: alice\n",
3612        );
3613        write_doc(
3614            &store,
3615            "records/loose.md",
3616            "contact",
3617            Some("Loose"),
3618            Some("2026-06-01T08:00:00Z"),
3619            "id: loose\n",
3620        );
3621        Index::rebuild_all(&store).unwrap();
3622
3623        // The layer carries its own jsonl listing exactly the loose file —
3624        // disjoint from the type-folder jsonl, so no double-count.
3625        assert!(
3626            exists(&store, "records/index.jsonl"),
3627            "layer jsonl must exist when loose files are present"
3628        );
3629        let layer_jsonl = read(&store, "records/index.jsonl");
3630        assert!(
3631            layer_jsonl.contains("records/loose.md"),
3632            "layer jsonl must list the loose file, got:\n{layer_jsonl}"
3633        );
3634        assert!(
3635            !layer_jsonl.contains("records/contacts/alice.md"),
3636            "layer jsonl must NOT list type-folder files"
3637        );
3638        let tf_jsonl = read(&store, "records/contacts/index.jsonl");
3639        assert!(tf_jsonl.contains("records/contacts/alice.md"));
3640        assert!(!tf_jsonl.contains("records/loose.md"));
3641
3642        // The layer index.md stays a pure type-folder rollup — no loose entry.
3643        let layer_md = read(&store, "records/index.md");
3644        assert!(
3645            layer_md.contains("records/contacts/index"),
3646            "layer md must roll up the type-folder, got:\n{layer_md}"
3647        );
3648        assert!(
3649            !layer_md.contains("records/loose"),
3650            "layer md must stay a rollup, not list loose files, got:\n{layer_md}"
3651        );
3652    }
3653
3654    #[test]
3655    fn loose_file_write_through_equals_rebuild() {
3656        let (_d1, wt) = mk_store();
3657        let (_d2, rb) = mk_store();
3658        for s in [&wt, &rb] {
3659            write_doc(
3660                s,
3661                "records/contacts/alice.md",
3662                "contact",
3663                Some("Alice"),
3664                Some("2026-06-01T08:00:00Z"),
3665                "id: alice\n",
3666            );
3667            write_doc(
3668                s,
3669                "records/loose.md",
3670                "contact",
3671                Some("Loose"),
3672                Some("2026-06-02T08:00:00Z"),
3673                "id: loose\n",
3674            );
3675        }
3676        // wt: write-through (loop); rb: full rebuild (sweep). Must agree byte-wise.
3677        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3678        Index::on_write(&wt, Path::new("records/loose.md")).unwrap();
3679        Index::rebuild_all(&rb).unwrap();
3680
3681        let a = snapshot_artifacts(&wt);
3682        let b = snapshot_artifacts(&rb);
3683        assert_eq!(
3684            a.keys().collect::<Vec<_>>(),
3685            b.keys().collect::<Vec<_>>(),
3686            "loose-file loop and sweep must produce the same artifact set"
3687        );
3688        for (k, v) in &a {
3689            assert_eq!(
3690                v, &b[k],
3691                "loose-file artifact {k} differs between loop and sweep"
3692            );
3693        }
3694    }
3695
3696    #[test]
3697    fn removing_last_loose_file_clears_layer_jsonl() {
3698        let (_d, store) = mk_store();
3699        write_doc(
3700            &store,
3701            "records/loose.md",
3702            "contact",
3703            Some("Loose"),
3704            Some("2026-06-01T08:00:00Z"),
3705            "id: loose\n",
3706        );
3707        Index::on_write(&store, Path::new("records/loose.md")).unwrap();
3708        assert!(
3709            exists(&store, "records/index.jsonl"),
3710            "layer jsonl present after a loose write"
3711        );
3712        fs::remove_file(store.root.join("records/loose.md")).unwrap();
3713        Index::on_remove(&store, Path::new("records/loose.md")).unwrap();
3714        assert!(
3715            !exists(&store, "records/index.jsonl"),
3716            "layer jsonl must be removed once the last loose file is gone"
3717        );
3718    }
3719
3720    // ── concurrency: shared layer/root rollup under parallel write-through ────
3721
3722    #[test]
3723    fn concurrent_writes_to_different_type_folders_match_rebuild() {
3724        use std::sync::Arc;
3725        use std::thread;
3726
3727        // Two threads, each owning a DISTINCT type-folder, drive `on_write`
3728        // concurrently. The layer `index.md` and root `index.md` are shared
3729        // across both folders, but each `on_write` only locks its own
3730        // type-folder — so before the `update_parents` store-root lock, the two
3731        // threads raced to rewrite those shared rollups and one update was lost
3732        // (the rollup no longer matched `rebuild_all`). With the lock the final
3733        // rollups must be byte-identical to a from-scratch rebuild, regardless
3734        // of interleaving.
3735        let (_d, store) = mk_store();
3736        let folders = ["records/contacts", "records/companies"];
3737        let n = 12usize;
3738
3739        // Pre-create all content files (disjoint paths) so the threads race only
3740        // on the index write-through, not on content creation.
3741        for (fi, folder) in folders.iter().enumerate() {
3742            for i in 0..n {
3743                write_doc(
3744                    &store,
3745                    &format!("{folder}/f{fi}_{i}.md"),
3746                    "contact",
3747                    Some(&format!("Summary {fi}-{i}")),
3748                    Some(&format!("2026-06-{:02}T08:00:00Z", i + 1)),
3749                    &format!("id: f{fi}_{i}\n"),
3750                );
3751            }
3752        }
3753
3754        let store = Arc::new(store);
3755        let handles: Vec<_> = folders
3756            .iter()
3757            .enumerate()
3758            .map(|(fi, folder)| {
3759                let store = Arc::clone(&store);
3760                let folder = folder.to_string();
3761                thread::spawn(move || {
3762                    for i in 0..n {
3763                        let rel = format!("{folder}/f{fi}_{i}.md");
3764                        Index::on_write(&store, Path::new(&rel)).unwrap();
3765                    }
3766                })
3767            })
3768            .collect();
3769        for h in handles {
3770            h.join().unwrap();
3771        }
3772
3773        // Snapshot the write-through artifacts, then rebuild from scratch over
3774        // the identical content and snapshot again — they must agree exactly.
3775        let got = snapshot_artifacts(&store);
3776        Index::rebuild_all(&store).unwrap();
3777        let want = snapshot_artifacts(&store);
3778
3779        assert_eq!(
3780            got.keys().collect::<Vec<_>>(),
3781            want.keys().collect::<Vec<_>>(),
3782            "artifact set after concurrent write-through must match rebuild"
3783        );
3784        for (k, v) in &want {
3785            assert_eq!(
3786                &got[k], v,
3787                "rollup artifact {k} diverged from rebuild after concurrent writes"
3788            );
3789        }
3790    }
3791}