Skip to main content

dbmd_core/
index.rs

1//! `index` — the hierarchical content catalog.
2//!
3//! A uniform three-level tree: root + per-layer + per-type-folder. **Two
4//! artifacts per type-folder:** the human `index.md` (capped 500, recency
5//! browse) and the machine `index.jsonl` (complete, structured — one JSON
6//! object per file). Both read `summary` + key frontmatter fields + links
7//! directly from each file — there is no extraction logic here.
8//!
9//! **Maintained write-through** by the write commands ([`Index::on_write`] /
10//! [`Index::on_rename`] / [`Index::on_remove`] — the loop path, O(changed), no
11//! store walk); [`Index::rebuild_all`] is the from-scratch SWEEP repair.
12//!
13//! **Key invariant:** write-through must produce a byte-identical `index.md`
14//! and (post-compaction) `index.jsonl` to a full [`Index::rebuild_all`] over
15//! the same end state — the loop path can never drift from the repair path.
16//!
17//! # Implementation notes (deviations the reader should know)
18//!
19//! - **Self-contained, by design.** This module does its own shard-aware folder
20//!   walk, its own minimal frontmatter read, and its own atomic write, using
21//!   only `store.root` (a public field) and the `serde_norway` / `serde_json` /
22//!   `chrono` / `walkdir` crates rather than routing through the sibling
23//!   `store`/`parser` helpers ([`Store::walk_type_folder`],
24//!   [`Store::recent_in_type_folder`], [`parser::read_file`], …). The index has
25//!   to stamp a *deterministic* `updated:` and emit a *canonical, compacted*
26//!   `index.jsonl` (see the two notes below); keeping the read/walk/write local
27//!   is what makes the byte-identity invariant a true byte comparison, free of
28//!   any incidental formatting the shared readers might introduce. The public
29//!   signatures in `lib.rs` are untouched.
30//! - **Deterministic `updated:` on the index files themselves.** An index's own
31//!   `updated` frontmatter is derived as the max `updated` over the files it
32//!   catalogs (max over children for root/layer) — NOT wall-clock-now. This is
33//!   what makes the byte-identity invariant a *true* byte comparison: a
34//!   write-through write and a `rebuild_all` over the same end state stamp the
35//!   same value. (The SPEC's rendered examples show a wall-clock-looking value;
36//!   the conventions list only requires `updated: <RFC3339>`, and the
37//!   property-tested invariant dominates.)
38//! - **`index.jsonl` is always compacted.** Write-through rewrites the affected
39//!   type-folder's jsonl in canonical form (one current line per path, recency
40//!   order) rather than appending superseded/tombstone lines, so the jsonl is
41//!   byte-identical to `rebuild_all` *immediately* (a strictly stronger
42//!   guarantee than the SPEC's "post-compaction"). This keeps the loop cost at
43//!   one sidecar read + one rewrite per touched type-folder — O(folder), the
44//!   sanctioned loop primitive, never a whole-`Store::walk`.
45//! - **Root/layer entry styling** follows plan §index (`(N)` numeric counts;
46//!   layer headings in the root carry the layer's total count) which is more
47//!   specific than the SPEC's illustrative `(42 files)` prose example. Type
48//!   folders are listed alphabetically (a deterministic order a derived artifact
49//!   needs); `scope: type-folder` follows the conventions list, not the one
50//!   SPEC example that wrote `scope: folder`.
51
52use std::collections::BTreeMap;
53use std::fs;
54use std::io::Write as _;
55use std::path::{Path, PathBuf};
56
57use chrono::{DateTime, FixedOffset, SecondsFormat};
58use serde::{Deserialize, Serialize};
59use serde_json::Value;
60
61use crate::parser::FolderMeta;
62use crate::store::{Layer, Store};
63
/// Maximum number of entries a type-folder `index.md` lists; anything beyond
/// this is summarized by the `## More` footer instead of being rendered.
const MD_CAP: usize = 500;

/// Stand-in summary recorded for a content file whose frontmatter carries no
/// `summary`. The index never makes up a real summary (that belongs to
/// `dbmd fm init`); `dbmd validate` keys off this marker to raise an
/// `INDEX`-class issue.
const MISSING_SUMMARY: &str = "(no summary)";

/// H1 heading of the root `index.md`.
const ROOT_TITLE: &str = "Knowledge base index";
74
75/// Which level of the catalog an [`Index`] represents.
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum IndexLevel {
78    /// The store-wide root `index.md` (layers + per-type counts).
79    Root,
80    /// A layer `index.md` (every type-folder under one layer).
81    Layer(Layer),
82    /// A type-folder `index.md` + `index.jsonl` (every file in the folder).
83    TypeFolder(PathBuf),
84}
85
86/// One record in a type-folder's `index.jsonl` — the complete, structured twin
87/// of a single `index.md` browse entry.
88///
89/// `tags` are the document's flat labels; `links` are its concept/relationship
90/// wiki-link targets. Both are copied verbatim from the file — never inferred.
91/// `fields` holds the remaining type-specific frontmatter so the structured
92/// query path can filter on any key without opening the file.
93#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
94pub struct IndexRecord {
95    /// Store-relative path of the file (the upsert key; last-write-wins).
96    /// Serialized with forward slashes regardless of OS (see [`path_serde`]) so
97    /// the `index.jsonl` catalog is byte-portable across platforms.
98    #[serde(with = "path_serde")]
99    pub path: PathBuf,
100    /// The file's `type`.
101    #[serde(rename = "type")]
102    pub type_: String,
103    /// The file's `summary`.
104    pub summary: String,
105    /// The file's flat `tags`.
106    #[serde(default)]
107    pub tags: Vec<String>,
108    /// The file's concept/relationship wiki-link targets (store-relative).
109    #[serde(default)]
110    pub links: Vec<String>,
111    /// `created` timestamp.
112    pub created: Option<DateTime<FixedOffset>>,
113    /// `updated` timestamp (the recency key for the `index.md` cap order).
114    pub updated: Option<DateTime<FixedOffset>>,
115    /// Remaining type-specific frontmatter fields, verbatim.
116    #[serde(flatten)]
117    pub fields: BTreeMap<String, Value>,
118}
119
120/// A built (or being-built) catalog for one [`IndexLevel`], with both rendered
121/// artifacts available. Pure data until written via [`Index::write_level`].
122#[derive(Debug, Clone, PartialEq)]
123pub struct Index {
124    /// Which level this catalog is for.
125    pub level: IndexLevel,
126    /// The complete record set for this level (type-folder level; empty for
127    /// root/layer rollups, which carry only counts).
128    pub records: Vec<IndexRecord>,
129    /// Per-child counts for root/layer rollups (child path → file count).
130    pub child_counts: BTreeMap<PathBuf, usize>,
131}
132
133impl Index {
134    /// Build a type-folder catalog by aggregating across date-shards, producing
135    /// both artifacts. `index.md` selection is recency (updated desc, ties by
136    /// path asc; cap 500 with a `## More` footer over the cap); `index.jsonl`
137    /// holds every file. A file missing `summary` gets a placeholder + a
138    /// validate-detectable issue (the index never invents summaries).
139    pub fn build_type_folder(store: &Store, type_folder: &Path) -> crate::Result<Index> {
140        let rel = normalize_rel(type_folder);
141        let abs = store.root.join(&rel);
142        let mut records = Vec::new();
143        for file_abs in walk_type_folder_files(&abs) {
144            let rel_path =
145                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
146            // Abort the build on a malformed file rather than skip it. A skipped
147            // file would still be a content member the validator requires to be
148            // catalogued (`validate::walk_content_files` enumerates by filename,
149            // not by parseability), so silently dropping it would leave the store
150            // in a permanently invalid state (`INDEX_MISSING_ENTRY` /
151            // `INDEX_JSONL_DESYNC` that no rebuild can clear) and would desync the
152            // rollups (`build_layer`/`build_root` count the raw `.md` files). The
153            // loud `?` is the right outcome: `cleanup` now preserves the prior
154            // canonical sidecars (`min_depth(2)`), so an aborted rebuild leaves
155            // the existing catalogs intact and the operator a clear error naming
156            // the file to fix — never a destroyed or silently-wrong index.
157            records.push(record_from_file(&file_abs, rel_path)?);
158        }
159        sort_records(&mut records);
160        Ok(Index {
161            level: IndexLevel::TypeFolder(rel),
162            records,
163            child_counts: BTreeMap::new(),
164        })
165    }
166
167    /// Build a layer catalog: every non-empty type-folder under the layer with
168    /// `(N)` counts and a newest-file `summary` preview (≤ 80 chars), plus the
169    /// **loose records** that live directly at the layer root (files with no
170    /// type-folder between them and the layer). The type-folder rollup is the
171    /// `index.md`; the loose records are the layer's own `index.jsonl` (so
172    /// structured reads — `query`, dedup, `graph` — see a loose file the same
173    /// way they see a canonical one). A layer with no loose files carries no
174    /// `index.jsonl`, so existing stores are byte-unchanged.
175    pub fn build_layer(store: &Store, layer: Layer) -> crate::Result<Index> {
176        let mut child_counts = BTreeMap::new();
177        for tf in type_folders_in_layer(store, layer) {
178            let abs = store.root.join(&tf);
179            let n = walk_type_folder_files(&abs).len();
180            if n > 0 {
181                child_counts.insert(tf, n);
182            }
183        }
184        let mut records = Vec::new();
185        for file_abs in loose_files_in_layer(store, layer) {
186            let rel_path =
187                rel_to_store(&store.root, &file_abs).expect("walked file is under the store root");
188            // Abort on a malformed loose file rather than skip it, mirroring
189            // `build_type_folder`: a skipped file is still a content member the
190            // validator requires to be catalogued, so dropping it would leave a
191            // permanently-invalid index. The loud `?` names the file to fix.
192            records.push(record_from_file(&file_abs, rel_path)?);
193        }
194        sort_records(&mut records);
195        Ok(Index {
196            level: IndexLevel::Layer(layer),
197            records,
198            child_counts,
199        })
200    }
201
202    /// Build the store-wide root catalog: one heading per non-empty layer with
203    /// total count + bulleted per-type sub-entries with `(N)` counts.
204    pub fn build_root(store: &Store) -> crate::Result<Index> {
205        let mut child_counts = BTreeMap::new();
206        for layer in Layer::all() {
207            for tf in type_folders_in_layer(store, layer) {
208                let abs = store.root.join(&tf);
209                let n = walk_type_folder_files(&abs).len();
210                if n > 0 {
211                    child_counts.insert(tf, n);
212                }
213            }
214        }
215        Ok(Index {
216            level: IndexLevel::Root,
217            records: Vec::new(),
218            child_counts,
219        })
220    }
221
222    /// Render this catalog as a canonical `index.md`.
223    pub fn to_markdown(&self) -> String {
224        match &self.level {
225            IndexLevel::TypeFolder(folder) => self.render_type_folder_md(folder),
226            IndexLevel::Layer(layer) => self.render_layer_md(*layer),
227            IndexLevel::Root => self.render_root_md(),
228        }
229    }
230
231    /// Render this catalog's `records` as the complete `index.jsonl` (one JSON
232    /// object per file, stable key order so diffs stay minimal). Used at the
233    /// type-folder level for its files, and at the layer level for the loose
234    /// files that live directly at the layer root. The root rollup carries no
235    /// records, so it never produces a jsonl.
236    pub fn to_jsonl(&self) -> String {
237        let mut out = String::new();
238        for rec in &self.records {
239            // The record type derives a deterministic, sorted key order
240            // (declared fields first, then the flattened `fields` BTreeMap).
241            let line = serde_json::to_string(rec).expect("IndexRecord serializes");
242            out.push_str(&line);
243            out.push('\n');
244        }
245        out
246    }
247
248    // ── rendering helpers ────────────────────────────────────────────────
249
250    fn render_type_folder_md(&self, folder: &Path) -> String {
251        let folder_disp = path_to_unix(folder);
252        let updated = max_updated(self.records.iter().map(|r| r.updated.as_ref()));
253        let mut s = String::new();
254        s.push_str("---\n");
255        s.push_str("type: index\n");
256        s.push_str("scope: type-folder\n");
257        s.push_str(&format!("folder: {folder_disp}\n"));
258        if let Some(ts) = updated {
259            s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
260        }
261        s.push_str("---\n\n");
262        s.push_str(&format!("# {folder_disp}\n\n"));
263
264        let shown = self.records.len().min(MD_CAP);
265        for rec in self.records.iter().take(shown) {
266            s.push_str(&format_md_entry(rec));
267            s.push('\n');
268        }
269
270        if self.records.len() > MD_CAP {
271            let type_ = self.records.first().map(|r| r.type_.as_str()).unwrap_or("");
272            let layer = folder
273                .components()
274                .next()
275                .and_then(|c| c.as_os_str().to_str())
276                .unwrap_or("");
277            s.push('\n');
278            s.push_str(&more_footer(self.records.len(), type_, layer));
279        }
280        s
281    }
282
283    /// Store-less layer rollup: counts only, no preview / no derived `updated`
284    /// (a layer index needs each child's on-disk jsonl for those — see
285    /// [`render_layer_md_with_store`], the canonical path every disk write
286    /// uses). This pure-data render is structurally identical sans preview.
287    fn render_layer_md(&self, layer: Layer) -> String {
288        let layer_dir = layer_dir_name(layer);
289        let mut s = String::new();
290        s.push_str("---\n");
291        s.push_str("type: index\n");
292        s.push_str("scope: layer\n");
293        s.push_str(&format!("folder: {layer_dir}\n"));
294        s.push_str("---\n\n");
295        s.push_str(&format!("# {layer_dir}\n\n"));
296        for (tf, n) in &self.child_counts {
297            let tf_unix = path_to_unix(tf);
298            let display = capitalize(folder_basename(tf));
299            s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
300        }
301        s
302    }
303
304    /// Store-less root rollup: counts only (the canonical disk render adds a
305    /// derived `updated` — see [`render_root_md_with_store`]).
306    fn render_root_md(&self) -> String {
307        let mut s = String::new();
308        s.push_str("---\n");
309        s.push_str("type: index\n");
310        s.push_str("scope: root\n");
311        s.push_str("---\n\n");
312        s.push_str(&format!("# {ROOT_TITLE}\n"));
313        for layer in Layer::all() {
314            let layer_dir = layer_dir_name(layer);
315            let prefix = format!("{layer_dir}/");
316            let children: Vec<(&PathBuf, &usize)> = self
317                .child_counts
318                .iter()
319                .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
320                .collect();
321            if children.is_empty() {
322                continue;
323            }
324            let total: usize = children.iter().map(|(_, n)| **n).sum();
325            s.push('\n');
326            s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
327            for (tf, n) in children {
328                let tf_unix = path_to_unix(tf);
329                let display = capitalize(folder_basename(tf));
330                s.push_str(&format!("- [[{tf_unix}/index|{display}]] ({n})\n"));
331            }
332        }
333        s
334    }
335}
336
337// ─────────────────────────────────────────────────────────────────────────
// Write-through + sweep (associated functions on `Index`).
339// ─────────────────────────────────────────────────────────────────────────
340
341impl Index {
342    /// **Write-through (loop, O(changed)).** Upsert a new/updated content file.
343    /// Reads the affected type-folder's `index.jsonl` (the sanctioned per-folder
344    /// sidecar read — never a whole-store walk), applies the change, and
345    /// atomically rewrites that folder's `index.md` + `index.jsonl` plus the
346    /// parent layer + root rollups so the artifacts equal a `rebuild_all` over
347    /// the same end state.
348    pub fn on_write(store: &Store, file: &Path) -> crate::Result<()> {
349        let file_rel = normalize_rel(file);
350        // The generated catalog files are not content — never upsert one into
351        // itself. `build_type_folder`'s walk already excludes `index.md`
352        // (`walk_type_folder_files`); the loop path must apply the same
353        // exclusion or editing `index.md` via `fm set` inserts a phantom
354        // self-row, inflating every `(N)` count and breaking the
355        // write-through == rebuild byte-identity invariant.
356        if is_index_artifact(&file_rel) {
357            return Ok(());
358        }
359        // A loose file (directly at a layer root, no type-folder) is catalogued
360        // in its layer's own `index.jsonl`; the layer `index.md` rollup is
361        // unaffected (loose files do not change type-folder counts).
362        if let Some(layer) = loose_layer_of(&file_rel) {
363            return apply_loose_change(store, layer, &file_rel, false);
364        }
365        let file_abs = store.root.join(&file_rel);
366        let folder = type_folder_of(&file_rel)
367            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
368        let record = record_from_file(&file_abs, file_rel.clone())?;
369
370        // Serialize the sidecar read-modify-write so concurrent sanctioned
371        // writes to this folder don't clobber each other's rows (lost update).
372        let _lock = FolderLock::acquire(&store.root.join(&folder));
373        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
374        records.retain(|r| r.path != record.path);
375        records.push(record);
376        sort_records(&mut records);
377
378        write_type_folder_artifacts(store, &folder, &records)?;
379        update_parents(store, &folder)?;
380        Ok(())
381    }
382
383    /// **Write-through (loop, O(changed)).** Move a file's entry between
384    /// type-folder indexes (or within, if the same folder) in both `index.md`
385    /// and `index.jsonl`, fixing counts on both sides.
386    pub fn on_rename(store: &Store, old: &Path, new: &Path) -> crate::Result<()> {
387        let old_rel = normalize_rel(old);
388        let new_rel = normalize_rel(new);
389        // Index artifacts are generated, not catalogued — a rename of/into one
390        // is not a content move (same reasoning as `on_write`). Skip rather than
391        // insert a phantom self-row.
392        if is_index_artifact(&old_rel) || is_index_artifact(&new_rel) {
393            return Ok(());
394        }
395        // If either side is a loose file (layer root, no type-folder), decompose
396        // into remove-old + add-new: each entry point routes to the correct
397        // catalog (the layer `index.jsonl` for a loose side, the type-folder for
398        // the other), giving the same end state as the cross-folder path below
399        // while reusing the tested single-file paths.
400        if loose_layer_of(&old_rel).is_some() || loose_layer_of(&new_rel).is_some() {
401            Self::on_remove(store, &old_rel)?;
402            Self::on_write(store, &new_rel)?;
403            return Ok(());
404        }
405        let old_folder = type_folder_of(&old_rel)
406            .ok_or_else(|| bad_index(&old_rel, "source is not inside a layer/type-folder"))?;
407        let new_folder = type_folder_of(&new_rel)
408            .ok_or_else(|| bad_index(&new_rel, "target is not inside a layer/type-folder"))?;
409
410        // Serialize the sidecar read-modify-write(s). For a cross-folder rename,
411        // lock BOTH folders, always in sorted order, so two renames touching the
412        // same pair can't deadlock. Held for the whole operation via RAII.
413        let _locks = lock_folders(store, &old_folder, &new_folder);
414
415        // Drop from the old folder.
416        let mut old_records =
417            read_jsonl_records(&store.root.join(&old_folder).join("index.jsonl"))?;
418        old_records.retain(|r| r.path != old_rel);
419
420        if old_folder == new_folder {
421            // Same folder: re-read the (now-renamed) file and upsert.
422            let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
423            old_records.retain(|r| r.path != record.path);
424            old_records.push(record);
425            sort_records(&mut old_records);
426            write_type_folder_artifacts(store, &old_folder, &old_records)?;
427            update_parents(store, &old_folder)?;
428            return Ok(());
429        }
430
431        // Cross-folder: write the trimmed old folder (or drop its indexes if
432        // now empty), then upsert into the new folder.
433        sort_records(&mut old_records);
434        write_type_folder_artifacts(store, &old_folder, &old_records)?;
435
436        let record = record_from_file(&store.root.join(&new_rel), new_rel.clone())?;
437        let mut new_records =
438            read_jsonl_records(&store.root.join(&new_folder).join("index.jsonl"))?;
439        new_records.retain(|r| r.path != record.path);
440        new_records.push(record);
441        sort_records(&mut new_records);
442        write_type_folder_artifacts(store, &new_folder, &new_records)?;
443
444        update_parents(store, &old_folder)?;
445        update_parents(store, &new_folder)?;
446        Ok(())
447    }
448
449    /// **Write-through (loop, O(changed)).** Drop a file's entry from both
450    /// `index.md` and `index.jsonl`; decrement counts; if the browse view drops
451    /// below the cap, the next-most-recent is already present in the complete
452    /// jsonl record set and re-renders into the md automatically.
453    pub fn on_remove(store: &Store, file: &Path) -> crate::Result<()> {
454        let file_rel = normalize_rel(file);
455        // Removing a generated catalog artifact is not a content removal; it has
456        // no row to drop (it was never catalogued). Skip, mirroring `on_write`.
457        if is_index_artifact(&file_rel) {
458            return Ok(());
459        }
460        // Loose file → drop its row from the layer `index.jsonl`.
461        if let Some(layer) = loose_layer_of(&file_rel) {
462            return apply_loose_change(store, layer, &file_rel, true);
463        }
464        let folder = type_folder_of(&file_rel)
465            .ok_or_else(|| bad_index(&file_rel, "file is not inside a layer/type-folder"))?;
466        // Serialize the sidecar read-modify-write (see `on_write`).
467        let _lock = FolderLock::acquire(&store.root.join(&folder));
468        let mut records = read_jsonl_records(&store.root.join(&folder).join("index.jsonl"))?;
469        let before = records.len();
470        records.retain(|r| r.path != file_rel);
471        if records.len() == before {
472            // Nothing to remove; still normalize the folder + parents so the
473            // artifacts stay canonical.
474        }
475        sort_records(&mut records);
476        write_type_folder_artifacts(store, &folder, &records)?;
477        update_parents(store, &folder)?;
478        Ok(())
479    }
480
481    /// **SWEEP repair.** Walk the store once and atomically (re)write root +
482    /// every non-empty layer + every non-empty type-folder `index.md` and
483    /// `index.jsonl` (compacting the jsonl). Also runs [`Index::cleanup`].
484    pub fn rebuild_all(store: &Store) -> crate::Result<()> {
485        Index::cleanup(store)?;
486        for layer in Layer::all() {
487            for tf in type_folders_in_layer(store, layer) {
488                let idx = Index::build_type_folder(store, &tf)?;
489                if idx.records.is_empty() {
490                    continue;
491                }
492                write_type_folder_artifacts(store, &tf, &idx.records)?;
493            }
494            let layer_idx = Index::build_layer(store, layer)?;
495            let layer_index_md = store.root.join(layer_dir_name(layer)).join("index.md");
496            if layer_idx.child_counts.is_empty() {
497                remove_if_exists(&layer_index_md)?;
498            } else {
499                write_atomic(
500                    &layer_index_md,
501                    render_layer_md_with_store(store, &layer_idx),
502                )?;
503            }
504            // The layer's own `index.jsonl` — present iff the layer has loose
505            // files directly at its root. Independent of the rollup above: a
506            // layer can have loose files but no type-folders, or vice versa.
507            write_layer_jsonl(store, layer, &layer_idx.records)?;
508        }
509        let root_idx = Index::build_root(store)?;
510        let root_index_md = store.root.join("index.md");
511        if root_idx.child_counts.is_empty() {
512            remove_if_exists(&root_index_md)?;
513        } else {
514            write_atomic(&root_index_md, render_root_md_with_store(store, &root_idx))?;
515        }
516        Ok(())
517    }
518
519    /// Rebuild ONE type-folder's `index.md`/`index.jsonl` from a fresh walk, then
520    /// cascade the new child count up to the layer and root rollups — so a
521    /// scoped `dbmd index rebuild --folder` leaves the hierarchy consistent,
522    /// exactly like `rebuild_all` and the loop-path `on_write` already do.
523    /// (Writing only the folder, as the CLI used to, left stale layer/root
524    /// counts that `validate` would then flag as an index desync.)
525    pub fn rebuild_folder(store: &Store, folder: &Path) -> crate::Result<()> {
526        Self::write_level(store, &IndexLevel::TypeFolder(folder.to_path_buf()))?;
527        update_parents(store, folder)
528    }
529
530    /// Atomically write a single level's artifact(s) to disk.
531    pub fn write_level(store: &Store, level: &IndexLevel) -> crate::Result<()> {
532        match level {
533            IndexLevel::TypeFolder(folder) => {
534                let idx = Index::build_type_folder(store, folder)?;
535                if idx.records.is_empty() {
536                    remove_if_exists(&store.root.join(folder).join("index.md"))?;
537                    remove_if_exists(&store.root.join(folder).join("index.jsonl"))?;
538                } else {
539                    write_type_folder_artifacts(store, folder, &idx.records)?;
540                }
541            }
542            IndexLevel::Layer(layer) => {
543                let idx = Index::build_layer(store, *layer)?;
544                let p = store.root.join(layer_dir_name(*layer)).join("index.md");
545                if idx.child_counts.is_empty() {
546                    remove_if_exists(&p)?;
547                } else {
548                    write_atomic(&p, render_layer_md_with_store(store, &idx))?;
549                }
550                write_layer_jsonl(store, *layer, &idx.records)?;
551            }
552            IndexLevel::Root => {
553                let idx = Index::build_root(store)?;
554                let p = store.root.join("index.md");
555                if idx.child_counts.is_empty() {
556                    remove_if_exists(&p)?;
557                } else {
558                    write_atomic(&p, render_root_md_with_store(store, &idx))?;
559                }
560            }
561        }
562        Ok(())
563    }
564
565    /// Render the generated indexes to a string with `--- <path> ---`
566    /// separators instead of writing them (`--dry-run`).
567    pub fn render_dry_run(store: &Store, level: &IndexLevel) -> crate::Result<String> {
568        let mut out = String::new();
569        match level {
570            IndexLevel::TypeFolder(folder) => {
571                let idx = Index::build_type_folder(store, folder)?;
572                let md_path = path_to_unix(&folder.join("index.md"));
573                let jsonl_path = path_to_unix(&folder.join("index.jsonl"));
574                out.push_str(&format!("--- {md_path} ---\n"));
575                out.push_str(&idx.to_markdown());
576                out.push_str(&format!("--- {jsonl_path} ---\n"));
577                out.push_str(&idx.to_jsonl());
578            }
579            IndexLevel::Layer(layer) => {
580                let idx = Index::build_layer(store, *layer)?;
581                let md_path = format!("{}/index.md", layer_dir_name(*layer));
582                out.push_str(&format!("--- {md_path} ---\n"));
583                out.push_str(&render_layer_md_with_store(store, &idx));
584            }
585            IndexLevel::Root => {
586                let idx = Index::build_root(store)?;
587                out.push_str("--- index.md ---\n");
588                out.push_str(&render_root_md_with_store(store, &idx));
589            }
590        }
591        Ok(out)
592    }
593
594    /// Cleanup pass (part of [`Index::rebuild_all`]): delete `index.md` /
595    /// `index.jsonl` in non-canonical folders (date-shards that should carry
596    /// none). Symmetric with index creation.
597    ///
598    /// **Only deletes generated catalog artifacts, never user content.** Two
599    /// guards keep this from eating data:
600    /// - `min_depth(2)` so the walk starts *below* the type-folder root — the
601    ///   canonical `<type-folder>/index.md` + `index.jsonl` are never targeted
602    ///   here (they are rewritten by the per-folder builders, or removed only
603    ///   when the folder is genuinely empty, in the dedicated branch below). The
604    ///   old `min_depth(1)` deleted them up front, so a rebuild aborted by one
605    ///   malformed file left every type-folder catalog destroyed.
606    /// - [`is_deletable_catalog_artifact`] confirms a shard-level `index.md` is
607    ///   an actual generated catalog (or stale/garbage leftover), NOT a content
608    ///   file a user wrote at that name (e.g. `dbmd write …/index.md --type
609    ///   email`, plausible when mirroring a website/doc export). Matching by
610    ///   filename alone silently deleted such records on the next rebuild.
611    pub fn cleanup(store: &Store) -> crate::Result<()> {
612        for layer in Layer::all() {
613            let layer_dir = store.root.join(layer_dir_name(layer));
614            if !layer_dir.is_dir() {
615                continue;
616            }
617            for tf in type_folders_in_layer(store, layer) {
618                let tf_abs = store.root.join(&tf);
619                // Any generated index inside a shard (below the type-folder
620                // root) is non-canonical: delete it. Never touch a user content
621                // file that merely happens to be named index.md.
622                for entry in walkdir::WalkDir::new(&tf_abs)
623                    .min_depth(2)
624                    .into_iter()
625                    .filter_map(|e| e.ok())
626                {
627                    let p = entry.path();
628                    if is_index_artifact(p) && is_deletable_catalog_artifact(p) {
629                        remove_if_exists(p)?;
630                    }
631                }
632                // Empty type-folder → no index at its root either. Same content
633                // guard: an `index.md` here that is actually a user record (the
634                // only file in the folder) is preserved, not deleted.
635                if walk_type_folder_files(&tf_abs).is_empty() {
636                    let md = tf_abs.join("index.md");
637                    if is_deletable_catalog_artifact(&md) {
638                        remove_if_exists(&md)?;
639                    }
640                    remove_if_exists(&tf_abs.join("index.jsonl"))?;
641                }
642            }
643        }
644        Ok(())
645    }
646}
647
648// ─────────────────────────────────────────────────────────────────────────
649// Private free helpers — all self-contained, none call back into Store/parser.
650// ─────────────────────────────────────────────────────────────────────────
651
652/// Write both artifacts for a type-folder, or delete them if the folder is now
653/// empty. The single funnel both write-through and rebuild go through, so their
654/// output is byte-identical by construction.
655fn write_type_folder_artifacts(
656    store: &Store,
657    folder: &Path,
658    records: &[IndexRecord],
659) -> crate::Result<()> {
660    let folder_abs = store.root.join(folder);
661    let md_path = folder_abs.join("index.md");
662    let jsonl_path = folder_abs.join("index.jsonl");
663    if records.is_empty() {
664        remove_if_exists(&md_path)?;
665        remove_if_exists(&jsonl_path)?;
666        return Ok(());
667    }
668    let idx = Index {
669        level: IndexLevel::TypeFolder(folder.to_path_buf()),
670        records: records.to_vec(),
671        child_counts: BTreeMap::new(),
672    };
673    write_atomic(&md_path, idx.to_markdown())?;
674    write_atomic(&jsonl_path, idx.to_jsonl())?;
675    Ok(())
676}
677
/// Re-render the layer + root rollups that sit above `folder` — the
/// **loop path**. Counts and the derived `updated` come from the type-folders'
/// on-disk `index.jsonl` sidecars ([`collect_child_stats`]), NOT from a
/// content-tree walk: each sidecar is read once and the result is shared by
/// the layer and root rollups.
///
/// **Cost:** every sidecar in the store is read and line-parsed on each call,
/// so this is `O(total catalogued records)`, not `O(changed)` (see the COST
/// note in the body).
///
/// `build_layer` / `build_root` (which *do* walk the content tree) are
/// reserved for the from-scratch sweeps ([`Index::rebuild_all`],
/// [`Index::write_level`], [`Index::render_dry_run`]). The result is intended
/// to be byte-identical to those builders because in the loop — exactly as in
/// `rebuild_all` — every touched folder's jsonl is rewritten before its parents
/// are rolled up, so the per-folder stat (`count` / `newest`) equals what a
/// from-scratch walk would compute.
///
/// * `folder` — store-relative type-folder path; only its first component is
///   used, to pick which layer `index.md` to refresh. If that component does
///   not map to a layer, only the root rollup is refreshed.
///
/// # Errors
/// Propagates failures from [`collect_child_stats`], `write_atomic` and
/// `remove_if_exists`.
fn update_parents(store: &Store, folder: &Path) -> crate::Result<()> {
    // Read every type-folder's sidecar EXACTLY ONCE into a stat cache (`count` +
    // `newest` record), then render both rollups from the cache. This removed the
    // old 2–3×-per-write reparse (`child_counts_from_jsonl` for a count, plus
    // `render_layer_md_with_store` / `render_root_md_with_store` each doing a full
    // `read_jsonl_records` parse + sort just to take `.first()`); the output stays
    // byte-identical (`count` == `read_jsonl_records().len()`, `newest` == its
    // `.first()`).
    //
    // COST, stated honestly: this is `O(total catalogued records)` per write, NOT
    // `O(changed)`. `collect_child_stats` reads and line-parses EVERY type-folder
    // sidecar in the store to recompute the rollups, so a single high-volume
    // folder (months of ingested emails) makes an unrelated tiny write scan that
    // whole sidecar (a ~50× slowdown at ~200k records was measured). The crate's
    // literal `Store::walk` guard holds — this reads `index.jsonl` sidecars, not
    // the content tree — but the broader `O(changed)` complexity the loop path
    // advertises is NOT met here. Restoring true `O(changed)` needs a persisted
    // per-folder stat cache (or an in-place rollup patch for `on_write`); that is
    // a deliberate change to the catalog hot path, tracked as a follow-up, not
    // done inline. Until then, do not describe this op as `O(changed)`.
    //
    // CONCURRENCY: the layer `index.md` and the root `index.md` are SHARED across
    // every type-folder, but the calling write only holds a lock on its OWN
    // type-folder (`on_write`/`on_remove`/`on_rename`). Two concurrent writes to
    // *different* type-folders would otherwise both read the sidecar set and both
    // rewrite the same two rollups, losing one update (a stale rollup that no
    // longer matches `rebuild_all` — a write-through/rebuild parity violation).
    // Serialize the whole read-stats + render + write under a store-root lock so
    // the last writer to commit its sidecar (each write commits its own
    // `index.jsonl` BEFORE calling here) observes every committed sidecar. Lock
    // order is always type-folder(s) → root, and nothing acquires the root lock
    // before a type-folder lock, so this cannot deadlock with the per-folder
    // locks held by the caller.
    //
    // NOTE(review): the value returned by `FolderLock::acquire` is bound without
    // `?` or any check. If `acquire` is fallible, confirm that a failed
    // acquisition cannot silently proceed unlocked.
    let _root_lock = FolderLock::acquire(&store.root);
    let stats = collect_child_stats(store, &Layer::all())?;

    // The layer is identified by the first path component of `folder`.
    let layer = folder
        .components()
        .next()
        .and_then(|c| c.as_os_str().to_str())
        .and_then(layer_from_dir_name);
    if let Some(layer) = layer {
        let p = store.root.join(layer_dir_name(layer)).join("index.md");
        // A layer with no non-empty type-folder carries no `index.md`.
        if layer_has_children(&stats, layer) {
            write_atomic(
                &p,
                render_layer_md_from_stats(layer, &stats, &store.config.folders),
            )?;
        } else {
            remove_if_exists(&p)?;
        }
    }
    // Root rollup: present only while at least one type-folder is non-empty.
    let rp = store.root.join("index.md");
    if stats.values().any(|s| s.count > 0) {
        write_atomic(
            &rp,
            render_root_md_from_stats(&stats, &store.config.folders),
        )?;
    } else {
        remove_if_exists(&rp)?;
    }
    Ok(())
}
753
754/// True if `layer` has at least one non-empty child type-folder in `stats`.
755fn layer_has_children(stats: &BTreeMap<PathBuf, FolderStat>, layer: Layer) -> bool {
756    let prefix = format!("{}/", layer_dir_name(layer));
757    stats
758        .iter()
759        .any(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
760}
761
762/// Render a layer `index.md` from the prebuilt per-folder stat cache — each
763/// child's count + newest summary/updated come from its single cached sidecar
764/// read, so the rollup matches the folder artifacts exactly (write-through and
765/// rebuild alike) without re-reading any sidecar.
766fn render_layer_md_from_stats(
767    layer: Layer,
768    stats: &BTreeMap<PathBuf, FolderStat>,
769    folders: &BTreeMap<String, FolderMeta>,
770) -> String {
771    let layer_dir = layer_dir_name(layer);
772    let prefix = format!("{layer_dir}/");
773    let mut max_upd: Option<DateTime<FixedOffset>> = None;
774    let mut entries = String::new();
775    for (tf, stat) in stats {
776        if stat.count == 0 || !path_to_unix(tf).starts_with(&prefix) {
777            continue;
778        }
779        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
780            max_upd = Some(match max_upd {
781                Some(cur) if cur >= u => cur,
782                _ => u,
783            });
784        }
785        let tf_unix = path_to_unix(tf);
786        let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
787        entries.push_str(&folder_entry(&tf_unix, &display, stat.count, description));
788    }
789    let mut s = String::new();
790    s.push_str("---\n");
791    s.push_str("type: index\n");
792    s.push_str("scope: layer\n");
793    s.push_str(&format!("folder: {layer_dir}\n"));
794    if let Some(ts) = max_upd {
795        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
796    }
797    s.push_str("---\n\n");
798    s.push_str(&format!("# {layer_dir}\n\n"));
799    s.push_str(&entries);
800    s
801}
802
803/// Render the root `index.md` from the prebuilt per-folder stat cache.
804fn render_root_md_from_stats(
805    stats: &BTreeMap<PathBuf, FolderStat>,
806    folders: &BTreeMap<String, FolderMeta>,
807) -> String {
808    let mut max_upd: Option<DateTime<FixedOffset>> = None;
809    for stat in stats.values() {
810        if stat.count == 0 {
811            continue;
812        }
813        if let Some(u) = stat.newest.as_ref().and_then(|r| r.updated) {
814            max_upd = Some(match max_upd {
815                Some(cur) if cur >= u => cur,
816                _ => u,
817            });
818        }
819    }
820    let mut s = String::new();
821    s.push_str("---\n");
822    s.push_str("type: index\n");
823    s.push_str("scope: root\n");
824    if let Some(ts) = max_upd {
825        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
826    }
827    s.push_str("---\n\n");
828    s.push_str(&format!("# {ROOT_TITLE}\n"));
829    for layer in Layer::all() {
830        let layer_dir = layer_dir_name(layer);
831        let prefix = format!("{layer_dir}/");
832        let children: Vec<(&PathBuf, usize)> = stats
833            .iter()
834            .filter(|(tf, s)| s.count > 0 && path_to_unix(tf).starts_with(&prefix))
835            .map(|(tf, s)| (tf, s.count))
836            .collect();
837        if children.is_empty() {
838            continue;
839        }
840        let total: usize = children.iter().map(|(_, n)| *n).sum();
841        s.push('\n');
842        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
843        for (tf, n) in children {
844            let tf_unix = path_to_unix(tf);
845            let (display, description) = folder_label(&tf_unix, folder_basename(tf), folders);
846            s.push_str(&folder_entry(&tf_unix, &display, n, description));
847        }
848    }
849    s
850}
851
852/// Render a layer `index.md`, reading each child's newest summary + max-updated
853/// straight from its on-disk `index.jsonl` (so the rollup matches the folder
854/// artifacts exactly, write-through and rebuild alike). The **sweep-path**
855/// renderer used by [`Index::rebuild_all`] / [`Index::write_level`] /
856/// [`Index::render_dry_run`]; the loop path uses the cache-based
857/// [`render_layer_md_from_stats`] to avoid re-reading sidecars.
858fn render_layer_md_with_store(store: &Store, idx: &Index) -> String {
859    let layer = match idx.level {
860        IndexLevel::Layer(l) => l,
861        _ => unreachable!("render_layer_md_with_store called on non-layer"),
862    };
863    let layer_dir = layer_dir_name(layer);
864    let mut max_upd: Option<DateTime<FixedOffset>> = None;
865    let mut entries = String::new();
866    for (tf, n) in &idx.child_counts {
867        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
868        let newest = recs.first();
869        if let Some(u) = newest.and_then(|r| r.updated) {
870            max_upd = Some(match max_upd {
871                Some(cur) if cur >= u => cur,
872                _ => u,
873            });
874        }
875        let tf_unix = path_to_unix(tf);
876        let (display, description) =
877            folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
878        entries.push_str(&folder_entry(&tf_unix, &display, *n, description));
879    }
880    let mut s = String::new();
881    s.push_str("---\n");
882    s.push_str("type: index\n");
883    s.push_str("scope: layer\n");
884    s.push_str(&format!("folder: {layer_dir}\n"));
885    if let Some(ts) = max_upd {
886        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
887    }
888    s.push_str("---\n\n");
889    s.push_str(&format!("# {layer_dir}\n\n"));
890    s.push_str(&entries);
891    s
892}
893
894/// Render the root `index.md`, taking each child's max-updated from its on-disk
895/// `index.jsonl`. The **sweep-path** renderer (the loop path uses
896/// [`render_root_md_from_stats`]).
897fn render_root_md_with_store(store: &Store, idx: &Index) -> String {
898    let mut max_upd: Option<DateTime<FixedOffset>> = None;
899    for tf in idx.child_counts.keys() {
900        let recs = read_jsonl_records(&store.root.join(tf).join("index.jsonl")).unwrap_or_default();
901        if let Some(u) = recs.first().and_then(|r| r.updated) {
902            max_upd = Some(match max_upd {
903                Some(cur) if cur >= u => cur,
904                _ => u,
905            });
906        }
907    }
908    let mut s = String::new();
909    s.push_str("---\n");
910    s.push_str("type: index\n");
911    s.push_str("scope: root\n");
912    if let Some(ts) = max_upd {
913        s.push_str(&format!("updated: {}\n", fmt_ts(&ts)));
914    }
915    s.push_str("---\n\n");
916    s.push_str(&format!("# {ROOT_TITLE}\n"));
917    for layer in Layer::all() {
918        let layer_dir = layer_dir_name(layer);
919        let prefix = format!("{layer_dir}/");
920        let children: Vec<(&PathBuf, &usize)> = idx
921            .child_counts
922            .iter()
923            .filter(|(tf, _)| path_to_unix(tf).starts_with(&prefix))
924            .collect();
925        if children.is_empty() {
926            continue;
927        }
928        let total: usize = children.iter().map(|(_, n)| **n).sum();
929        s.push('\n');
930        s.push_str(&format!("## {} ({total})\n", capitalize(layer_dir)));
931        for (tf, n) in children {
932            let tf_unix = path_to_unix(tf);
933            let (display, description) =
934                folder_label(&tf_unix, folder_basename(tf), &store.config.folders);
935            s.push_str(&folder_entry(&tf_unix, &display, *n, description));
936        }
937    }
938    s
939}
940
941/// One `index.md` browse line: `- [[path]] — summary  ·  #tag #tag` (the
942/// `  ·  #…` suffix omitted when the file has no tags). The wiki-link target is
943/// the canonical **bare** store-relative path (no `.md` extension — the
944/// doctrine the writers emit and `validate` enforces via
945/// `WIKI_LINK_HAS_EXTENSION`); the jsonl `path` keeps the real on-disk name.
946fn format_md_entry(rec: &IndexRecord) -> String {
947    let path = wiki_target(&rec.path);
948    // Collapse the summary to a single line before interpolating it into the
949    // one-line browse entry. A hand-written file may legally carry a YAML block
950    // scalar (`summary: |-`) whose value spans multiple lines; rendered verbatim
951    // those embedded newlines break the line-oriented `index.md` format and can
952    // forge a standalone catalog entry (`\n- [[…|Click me]] — injected`). The
953    // CLI writers already collapse whitespace; do the same here so the spec's
954    // primary write path (agents writing files directly) can't corrupt the
955    // catalog.
956    let summary = collapse_whitespace(&rec.summary);
957    let mut line = format!("- [[{path}]] — {summary}");
958    if !rec.tags.is_empty() {
959        let tags = rec
960            .tags
961            .iter()
962            .map(|t| format!("#{t}"))
963            .collect::<Vec<_>>()
964            .join(" ");
965        line.push_str(&format!("  ·  {tags}"));
966    }
967    line
968}
969
970/// The deterministic `## More` footer for an over-cap type-folder.
971fn more_footer(total: usize, type_: &str, layer: &str) -> String {
972    format!(
973        "## More\n\nThis folder has {total} files. The {MD_CAP} most recent are listed above.\nUse `dbmd index query --type {type_} --in {layer}` for the complete catalog.\n"
974    )
975}
976
/// Sort `records` in place into the canonical order defined by
/// [`record_recency_cmp`]: `updated` descending (None sorts last), ties broken
/// by store-relative path ascending. A *total* order, so write-through and
/// rebuild never disagree on #500 vs #501.
fn sort_records(records: &mut [IndexRecord]) {
    // Delegates to the shared comparator so every consumer of the order
    // (this sort, the min-scan in `read_folder_stat`) agrees.
    records.sort_by(record_recency_cmp);
}
983
impl IndexRecord {
    /// Build the [`IndexRecord`] a freshly-rebuilt `index.jsonl` *should* hold
    /// for the file at `abs` (catalogued under store-relative `rel`).
    ///
    /// This is the single canonical projection from frontmatter → sidecar
    /// record: [`Index::build_type_folder`] uses the same path to write the
    /// jsonl, so the validator can rebuild the expected record here and compare
    /// it field-for-field against the committed line — covering **every**
    /// queryable/dedup field the query path reads (`summary`, `type`, `tags`,
    /// `links`, `created`, `updated`, and every type-specific `fields` entry
    /// like `email` / `domain` / `company` / `amount` / `vendor`) without the
    /// validator hand-rolling (and drifting from) the projection per field.
    ///
    /// * `abs` — absolute path of the file to read.
    /// * `rel` — store-relative path stored as the record's `path`.
    ///
    /// # Errors
    /// Returns whatever [`record_from_file`] returns: an I/O error reading
    /// `abs`, or a frontmatter YAML parse error.
    pub(crate) fn expected_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
        // Thin crate-visible wrapper over the private projection.
        record_from_file(abs, rel)
    }
}
1000
1001/// Build an [`IndexRecord`] from a file on disk. Missing `summary` →
1002/// [`MISSING_SUMMARY`] placeholder (the index never invents a summary).
1003fn record_from_file(abs: &Path, rel: PathBuf) -> crate::Result<IndexRecord> {
1004    let mut meta = read_frontmatter(abs)?;
1005    // Records carry an effective `meta-type` in the catalog: the declared value
1006    // (already spilled into `fields` by `read_frontmatter`), or the default
1007    // `fact` when absent — so `--where meta-type=fact` sees un-annotated records.
1008    // Sources are evidence and carry no meta-type.
1009    if rel.starts_with("records") {
1010        meta.fields
1011            .entry("meta-type".to_string())
1012            .or_insert_with(|| Value::String("fact".to_string()));
1013    }
1014    Ok(IndexRecord {
1015        path: rel,
1016        type_: meta.type_.unwrap_or_default(),
1017        summary: meta.summary.unwrap_or_else(|| MISSING_SUMMARY.to_string()),
1018        tags: meta.tags,
1019        links: meta.links,
1020        created: meta.created,
1021        updated: meta.updated,
1022        fields: meta.fields,
1023    })
1024}
1025
/// The slice of a frontmatter this module needs, as produced by
/// [`read_frontmatter`].
struct FileMeta {
    /// The `type` key coerced through [`scalar_string`]; `None` when the key is
    /// absent or its value is not a string/number/bool scalar.
    type_: Option<String>,
    /// The `summary` key, coerced the same way as `type_`.
    summary: Option<String>,
    /// The `tags` key read via [`yaml_string_list`]; empty when absent.
    tags: Vec<String>,
    /// The `links` key read via [`yaml_string_list`]; empty when absent.
    links: Vec<String>,
    /// The `created` key when it is a YAML string that [`parse_ts`] accepts.
    created: Option<DateTime<FixedOffset>>,
    /// The `updated` key when it is a YAML string that [`parse_ts`] accepts.
    updated: Option<DateTime<FixedOffset>>,
    /// Every other string-keyed entry (except `path`, which is dropped),
    /// converted to JSON by [`yaml_to_json_value`].
    fields: BTreeMap<String, Value>,
}
1036
1037/// Minimal frontmatter read: split the leading `---`…`---` block and parse it
1038/// as YAML, extracting the typed fields and spilling the rest into `fields`.
1039/// Self-contained (does not route through the `parser` module).
1040///
1041/// **Body bytes are never required to be UTF-8.** `sources/` is "preserved
1042/// verbatim" per the SPEC and routinely carries non-UTF-8 imports (Latin-1
1043/// emails dropped in by `rsync`/`mbsync`/`cp`); the body can hold any byte. We
1044/// read the file as raw bytes and lossily decode *only* the leading frontmatter
1045/// region, so a stray non-UTF-8 byte in the body can never abort the projection
1046/// (the old `fs::read_to_string` failed on the first such byte anywhere in the
1047/// file, taking a whole `rebuild_all` / write-through down with it). The
1048/// frontmatter itself is expected to be UTF-8; if it isn't, `U+FFFD` markers
1049/// surface in the parsed values rather than a hard abort.
1050fn read_frontmatter(abs: &Path) -> crate::Result<FileMeta> {
1051    let bytes = fs::read(abs)?;
1052    let yaml = extract_frontmatter_block_lossy(&bytes).unwrap_or_default();
1053    let map: serde_norway::Mapping = if yaml.trim().is_empty() {
1054        serde_norway::Mapping::new()
1055    } else {
1056        serde_norway::from_str(&yaml).map_err(|e| {
1057            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1058                path: abs.to_path_buf(),
1059                message: format!("frontmatter YAML: {e}"),
1060            })
1061        })?
1062    };
1063
1064    let mut type_ = None;
1065    let mut summary = None;
1066    let mut tags = Vec::new();
1067    let mut links = Vec::new();
1068    let mut created = None;
1069    let mut updated = None;
1070    let mut fields = BTreeMap::new();
1071
1072    for (k, v) in map {
1073        let key = match k.as_str() {
1074            Some(s) => s.to_string(),
1075            None => continue,
1076        };
1077        match key.as_str() {
1078            // `type` and `summary` are coerced with the SAME scalar rule the
1079            // validator applies (`validate::scalar_string`: String/Number/Bool →
1080            // string). A bare `v.as_str()` returns `None` for an unquoted numeric
1081            // or boolean scalar (`summary: 2026`, `type: true`), so the index
1082            // would write the `(no summary)` / empty-type placeholder while
1083            // `dbmd validate` reads the file as HAVING that summary/type —
1084            // yielding a permanently-unfixable `INDEX_SUMMARY_MISMATCH` (every
1085            // rebuild reproduces the same mismatched placeholder). Coercing here
1086            // keeps the writer and the validator byte-for-byte in agreement.
1087            "type" => type_ = scalar_string(&v),
1088            "summary" => summary = scalar_string(&v),
1089            "tags" => tags = yaml_string_list(&v),
1090            "links" => links = yaml_string_list(&v),
1091            "created" => created = v.as_str().and_then(parse_ts),
1092            "updated" => updated = v.as_str().and_then(parse_ts),
1093            // `path`, `type`, `summary`, `tags`, `links`, `created`, `updated`
1094            // are the reserved IndexRecord keys; everything else (including
1095            // `id`, `status`, type-specific fields) goes to `fields`.
1096            "path" => {}
1097            _ => {
1098                fields.insert(key, yaml_to_json_value(&v));
1099            }
1100        }
1101    }
1102
1103    Ok(FileMeta {
1104        type_,
1105        summary,
1106        tags,
1107        links,
1108        created,
1109        updated,
1110        fields,
1111    })
1112}
1113
1114/// A YAML scalar (`String`/`Number`/`Bool`) rendered as a string; `None` for
1115/// sequences/mappings/null. **Must stay identical to `validate::scalar_string`**
1116/// so the index writer and the validator coerce `type`/`summary` the same way
1117/// (see [`read_frontmatter`]); an unquoted `summary: 2026` becomes `"2026"` in
1118/// both, not a placeholder here and a real value there.
1119fn scalar_string(v: &serde_norway::Value) -> Option<String> {
1120    match v {
1121        serde_norway::Value::String(s) => Some(s.clone()),
1122        serde_norway::Value::Number(n) => Some(n.to_string()),
1123        serde_norway::Value::Bool(b) => Some(b.to_string()),
1124        _ => None,
1125    }
1126}
1127
1128/// Lossily decode the leading frontmatter region of a file given its raw bytes,
1129/// then pull the YAML between the opening `---` and the next `---`. Only the
1130/// frontmatter region needs to be valid UTF-8 in practice; the body may carry
1131/// arbitrary bytes (a verbatim `sources/` import). Returns `None` when the file
1132/// has no frontmatter fence at its very start.
1133fn extract_frontmatter_block_lossy(bytes: &[u8]) -> Option<String> {
1134    // Decode lossily so a non-UTF-8 body byte never aborts the read. The
1135    // frontmatter is at the very start of the file, so a lossy whole-file decode
1136    // is correct for extracting it (and cheap relative to the YAML parse). A
1137    // leading UTF-8 BOM is stripped by `extract_frontmatter_block`.
1138    let text = String::from_utf8_lossy(bytes);
1139    extract_frontmatter_block(&text)
1140}
1141
/// Return the YAML between a leading `---` line and the next `---` line.
///
/// A leading UTF-8 BOM is ignored, trailing whitespace on either fence line is
/// tolerated, and every collected line is terminated with `\n` regardless of
/// the original line ending. Returns `None` when the first line is not a fence
/// or when no closing fence follows.
fn extract_frontmatter_block(text: &str) -> Option<String> {
    let content = text.strip_prefix('\u{feff}').unwrap_or(text);
    let mut remaining = content.lines();

    // The very first line must be the opening fence.
    if remaining.next()?.trim_end() != "---" {
        return None;
    }

    let mut yaml = String::new();
    loop {
        // Running out of lines before a closing fence means "no frontmatter".
        let line = remaining.next()?;
        if line.trim_end() == "---" {
            return Some(yaml);
        }
        yaml.push_str(line);
        yaml.push('\n');
    }
}
1161
1162/// Read a string scalar or a sequence-of-string-scalars into a `Vec<String>`.
1163/// Wiki-link items keep their `[[…]]` form verbatim.
1164fn yaml_string_list(v: &serde_norway::Value) -> Vec<String> {
1165    match v {
1166        serde_norway::Value::String(s) => vec![s.clone()],
1167        serde_norway::Value::Sequence(seq) => seq
1168            .iter()
1169            .filter_map(yaml_string_or_wiki_link_literal)
1170            .collect(),
1171        _ => Vec::new(),
1172    }
1173}
1174
1175fn yaml_string_or_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1176    v.as_str()
1177        .map(str::to_string)
1178        .or_else(|| unquoted_wiki_link_literal(v))
1179}
1180
1181fn yaml_to_json_value(v: &serde_norway::Value) -> Value {
1182    if let Some(link) = unquoted_wiki_link_literal(v) {
1183        return Value::String(link);
1184    }
1185    match v {
1186        serde_norway::Value::String(s) => Value::String(s.clone()),
1187        serde_norway::Value::Bool(b) => Value::Bool(*b),
1188        serde_norway::Value::Number(n) => {
1189            serde_json::to_value(n).unwrap_or_else(|_| Value::String(n.to_string()))
1190        }
1191        serde_norway::Value::Sequence(seq) => {
1192            Value::Array(seq.iter().map(yaml_to_json_value).collect())
1193        }
1194        serde_norway::Value::Mapping(_) | serde_norway::Value::Tagged(_) => {
1195            serde_json::to_value(v).unwrap_or(Value::Null)
1196        }
1197        serde_norway::Value::Null => Value::Null,
1198    }
1199}
1200
1201fn unquoted_wiki_link_literal(v: &serde_norway::Value) -> Option<String> {
1202    let serde_norway::Value::Sequence(outer) = v else {
1203        return None;
1204    };
1205    if outer.len() != 1 {
1206        return None;
1207    }
1208    let serde_norway::Value::Sequence(inner) = &outer[0] else {
1209        return None;
1210    };
1211    let [serde_norway::Value::String(target)] = inner.as_slice() else {
1212        return None;
1213    };
1214    Some(format!("[[{target}]]"))
1215}
1216
/// Parse an RFC3339 timestamp scalar. Surrounding whitespace is trimmed first;
/// anything that does not parse yields `None` rather than an error.
fn parse_ts(s: &str) -> Option<DateTime<FixedOffset>> {
    DateTime::parse_from_rfc3339(s.trim()).ok()
}
1221
/// Render a timestamp the same way `serde_json` renders an `IndexRecord`
/// timestamp (RFC3339, `Z` for UTC, sub-seconds preserved) so the md
/// frontmatter and the jsonl agree byte-for-byte.
///
/// NOTE(review): the match with the serde rendering depends on the `chrono`
/// version's `Serialize` impl — confirm when upgrading `chrono`.
fn fmt_ts(ts: &DateTime<FixedOffset>) -> String {
    // `AutoSi` picks the sub-second precision; `true` selects the `Z` suffix
    // for a zero offset.
    ts.to_rfc3339_opts(SecondsFormat::AutoSi, true)
}
1228
1229/// Max `updated` over an iterator of optional timestamps.
1230fn max_updated<'a>(
1231    it: impl Iterator<Item = Option<&'a DateTime<FixedOffset>>>,
1232) -> Option<DateTime<FixedOffset>> {
1233    let mut best: Option<DateTime<FixedOffset>> = None;
1234    for ts in it.flatten() {
1235        best = Some(match best {
1236            Some(cur) if cur >= *ts => cur,
1237            _ => *ts,
1238        });
1239    }
1240    best
1241}
1242
1243/// Read a type-folder's `index.jsonl` into records, applying last-write-wins by
1244/// `path` over any un-compacted lines (so a half-compacted jsonl still reads
1245/// cleanly). Missing file → empty set. Returns records in canonical order.
1246fn read_jsonl_records(jsonl: &Path) -> crate::Result<Vec<IndexRecord>> {
1247    let text = match fs::read_to_string(jsonl) {
1248        Ok(t) => t,
1249        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
1250        Err(e) => return Err(e.into()),
1251    };
1252    // Last-write-wins by path; preserve only the final occurrence.
1253    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1254    for (i, line) in text.lines().enumerate() {
1255        if line.trim().is_empty() {
1256            continue;
1257        }
1258        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1259            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1260                path: jsonl.to_path_buf(),
1261                message: format!("line {}: {e}", i + 1),
1262            })
1263        })?;
1264        by_path.insert(rec.path.clone(), rec);
1265    }
1266    let mut records: Vec<IndexRecord> = by_path.into_values().collect();
1267    sort_records(&mut records);
1268    Ok(records)
1269}
1270
/// The minimal rollup stat a parent index needs from one type-folder's
/// `index.jsonl`: how many distinct files it catalogs (`count`) and the single
/// newest record (`newest`, the recency-sorted `.first()`). Holding the newest
/// record alone, rather than the whole sidecar, keeps the stat small however
/// large the sidecar grows.
///
/// NOTE(review): the stats-based rollup renderers in this file read only
/// `newest.updated` (for the parent's derived `updated`); nothing visible here
/// consumes `newest.summary` — confirm before relying on it.
#[derive(Debug, Clone, Default, PartialEq)]
struct FolderStat {
    /// Number of distinct `path`s in the sidecar after last-write-wins
    /// de-duplication; `0` for a missing sidecar.
    count: usize,
    /// The record that sorts first under [`record_recency_cmp`]; `None` when
    /// the sidecar is missing or empty.
    newest: Option<IndexRecord>,
}
1282
1283/// Read a type-folder's `index.jsonl` ONCE and reduce it to a [`FolderStat`]:
1284/// distinct-`path` count (last-write-wins) plus the recency-newest record. A
1285/// missing sidecar is the default (`count: 0`, `newest: None`). This is the
1286/// **loop-path** rollup primitive — one streaming pass per sidecar, never the
1287/// content tree and never the 2–3× full reparse the old
1288/// `jsonl_record_count` + `read_jsonl_records` pair did. `count` is
1289/// byte-identical to [`read_jsonl_records`]`.len()` and `newest` to its
1290/// `.first()`, so a rollup built from these stats matches the from-scratch
1291/// builders byte-for-byte.
1292fn read_folder_stat(jsonl: &Path) -> crate::Result<FolderStat> {
1293    let text = match fs::read_to_string(jsonl) {
1294        Ok(t) => t,
1295        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(FolderStat::default()),
1296        Err(e) => return Err(e.into()),
1297    };
1298    // Last-write-wins by path, exactly like `read_jsonl_records`, so count and
1299    // newest are computed over the same compacted record set.
1300    let mut by_path: BTreeMap<PathBuf, IndexRecord> = BTreeMap::new();
1301    for (i, line) in text.lines().enumerate() {
1302        if line.trim().is_empty() {
1303            continue;
1304        }
1305        let rec: IndexRecord = serde_json::from_str(line).map_err(|e| {
1306            crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1307                path: jsonl.to_path_buf(),
1308                message: format!("line {}: {e}", i + 1),
1309            })
1310        })?;
1311        by_path.insert(rec.path.clone(), rec);
1312    }
1313    let count = by_path.len();
1314    // The newest record is the minimum under `sort_records`' order (updated
1315    // desc, None last, ties by path asc) — i.e. what `.first()` returns. Find it
1316    // with a single min-scan instead of sorting the whole set.
1317    let newest = by_path.into_values().min_by(record_recency_cmp);
1318    Ok(FolderStat { count, newest })
1319}
1320
1321/// The total order [`sort_records`] imposes, as a comparator over two records:
1322/// `updated` descending (None last), ties broken by store-relative path
1323/// ascending. Kept in one place so `read_folder_stat`'s min-scan agrees with the
1324/// sort byte-for-byte on which record is "newest".
1325fn record_recency_cmp(a: &IndexRecord, b: &IndexRecord) -> std::cmp::Ordering {
1326    match (b.updated, a.updated) {
1327        (Some(bu), Some(au)) => bu.cmp(&au),
1328        (Some(_), None) => std::cmp::Ordering::Greater, // a is None → after b
1329        (None, Some(_)) => std::cmp::Ordering::Less,    // b is None → after a
1330        (None, None) => std::cmp::Ordering::Equal,
1331    }
1332    .then_with(|| a.path.cmp(&b.path))
1333}
1334
1335/// Per-child rollup stats for `layers`, read from each type-folder's on-disk
1336/// `index.jsonl` (one [`read_folder_stat`] pass each) rather than walked from the
1337/// content tree. The **loop-path** counterpart to the from-scratch counting in
1338/// [`Index::build_layer`] / [`Index::build_root`], reusing one read per sidecar
1339/// across BOTH the layer and root rollups. Empty folders (`count == 0`) are kept
1340/// out of the map.
1341///
1342/// NOTE on cost: this performs one read per type-folder, but each read line-parses
1343/// that folder's entire `index.jsonl`, so the total is `O(total catalogued
1344/// records)`, not `O(type-folders)` — it reads the whole catalog every call. It
1345/// avoids the content-tree walk ([`Store::walk`]), but it is NOT `O(changed)`. See
1346/// [`update_parents`] for the honest bound and the follow-up to fix it.
1347fn collect_child_stats(
1348    store: &Store,
1349    layers: &[Layer],
1350) -> crate::Result<BTreeMap<PathBuf, FolderStat>> {
1351    let mut stats = BTreeMap::new();
1352    for &layer in layers {
1353        for tf in type_folders_in_layer(store, layer) {
1354            let stat = read_folder_stat(&store.root.join(&tf).join("index.jsonl"))?;
1355            if stat.count > 0 {
1356                stats.insert(tf, stat);
1357            }
1358        }
1359    }
1360    Ok(stats)
1361}
1362
1363/// Walk a type-folder's `.md` content files, recursing through date-shards,
1364/// excluding the `index.md` artifact itself and any hidden entries.
1365fn walk_type_folder_files(folder_abs: &Path) -> Vec<PathBuf> {
1366    let mut out = Vec::new();
1367    if !folder_abs.is_dir() {
1368        return out;
1369    }
1370    for entry in walkdir::WalkDir::new(folder_abs)
1371        .into_iter()
1372        .filter_entry(|e| !is_hidden(e.file_name()))
1373        .filter_map(|e| e.ok())
1374    {
1375        if !entry.file_type().is_file() {
1376            continue;
1377        }
1378        let p = entry.path();
1379        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1380            continue;
1381        }
1382        if p.file_name().and_then(|n| n.to_str()) == Some("index.md") {
1383            continue;
1384        }
1385        out.push(p.to_path_buf());
1386    }
1387    out
1388}
1389
1390/// The immediate type-folders under a layer (one directory level below the
1391/// layer dir), as store-relative paths. Hidden dirs and `log/` are skipped.
1392fn type_folders_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1393    let layer_dir = store.root.join(layer_dir_name(layer));
1394    let mut out = Vec::new();
1395    let rd = match fs::read_dir(&layer_dir) {
1396        Ok(rd) => rd,
1397        Err(_) => return out,
1398    };
1399    for entry in rd.flatten() {
1400        if !entry.path().is_dir() {
1401            continue;
1402        }
1403        let name = entry.file_name();
1404        let name = match name.to_str() {
1405            Some(n) => n,
1406            None => continue,
1407        };
1408        if is_hidden(entry.file_name().as_os_str()) || name == "log" {
1409            continue;
1410        }
1411        out.push(PathBuf::from(layer_dir_name(layer)).join(name));
1412    }
1413    out.sort();
1414    out
1415}
1416
1417/// The layer a *loose* content file sits directly in: `<layer>/<file>.md` with
1418/// no type-folder between them — exactly two path components, the first a known
1419/// layer. `None` for a file inside a type-folder (`<layer>/<type>/…`, the common
1420/// case) or one outside any layer. A loose file is catalogued in the layer's own
1421/// `index.jsonl`, not a type-folder's.
1422fn loose_layer_of(file_rel: &Path) -> Option<Layer> {
1423    let mut comps = file_rel.components();
1424    let layer = layer_from_dir_name(comps.next()?.as_os_str().to_str()?)?;
1425    comps.next()?; // the file segment must exist…
1426    if comps.next().is_some() {
1427        return None; // …and be the last one (else it's inside a type-folder)
1428    }
1429    Some(layer)
1430}
1431
1432/// The `.md` content files that live directly at a layer root (loose files),
1433/// excluding `index.md` and any subdirectory (type-folders are walked
1434/// separately). Non-recursive: only the layer's immediate children.
1435fn loose_files_in_layer(store: &Store, layer: Layer) -> Vec<PathBuf> {
1436    let layer_dir = store.root.join(layer_dir_name(layer));
1437    let mut out = Vec::new();
1438    let rd = match fs::read_dir(&layer_dir) {
1439        Ok(rd) => rd,
1440        Err(_) => return out,
1441    };
1442    for entry in rd.flatten() {
1443        let p = entry.path();
1444        if !p.is_file() {
1445            continue;
1446        }
1447        if p.extension().and_then(|e| e.to_str()) != Some("md") {
1448            continue;
1449        }
1450        if is_index_artifact(&p) || is_hidden(entry.file_name().as_os_str()) {
1451            continue;
1452        }
1453        out.push(p);
1454    }
1455    out
1456}
1457
1458/// Write (or remove, when empty) a layer's own `index.jsonl` — the complete twin
1459/// for the loose files that live directly at the layer root. The single funnel
1460/// both write-through (`on_write`/`on_remove`/`on_rename`) and the sweeps
1461/// (`rebuild_all`/`write_level`) go through, so their output is byte-identical.
1462fn write_layer_jsonl(store: &Store, layer: Layer, records: &[IndexRecord]) -> crate::Result<()> {
1463    let path = store.root.join(layer_dir_name(layer)).join("index.jsonl");
1464    if records.is_empty() {
1465        remove_if_exists(&path)?;
1466        return Ok(());
1467    }
1468    let idx = Index {
1469        level: IndexLevel::Layer(layer),
1470        records: records.to_vec(),
1471        child_counts: BTreeMap::new(),
1472    };
1473    write_atomic(&path, idx.to_jsonl())
1474}
1475
1476/// Upsert (`removing` = false) or remove (`removing` = true) a loose file's row
1477/// in its layer `index.jsonl`, serialising the read-modify-write under a folder
1478/// lock (same discipline as the type-folder write-through). The layer `index.md`
1479/// rollup is untouched — loose files do not change type-folder counts.
1480fn apply_loose_change(
1481    store: &Store,
1482    layer: Layer,
1483    file_rel: &Path,
1484    removing: bool,
1485) -> crate::Result<()> {
1486    let layer_dir = store.root.join(layer_dir_name(layer));
1487    let _lock = FolderLock::acquire(&layer_dir);
1488    let jsonl = layer_dir.join("index.jsonl");
1489    let mut records = read_jsonl_records(&jsonl)?;
1490    records.retain(|r| r.path != file_rel);
1491    if !removing {
1492        records.push(record_from_file(
1493            &store.root.join(file_rel),
1494            file_rel.to_path_buf(),
1495        )?);
1496    }
1497    sort_records(&mut records);
1498    write_layer_jsonl(store, layer, &records)
1499}
1500
1501/// The type-folder a content file belongs to: `<layer>/<type>` (the first two
1502/// path components), or `None` if the path is not under a known layer with at
1503/// least a type segment.
1504fn type_folder_of(file_rel: &Path) -> Option<PathBuf> {
1505    let mut comps = file_rel.components();
1506    let layer = comps.next()?.as_os_str().to_str()?;
1507    layer_from_dir_name(layer)?;
1508    let type_seg = comps.next()?.as_os_str().to_str()?;
1509    Some(PathBuf::from(layer).join(type_seg))
1510}
1511
/// Express `abs` relative to the store root.
///
/// Returns the remainder of `abs` after `root`, or `None` when `abs` does not
/// start with `root`. When the two are equal the result is the empty path.
fn rel_to_store(root: &Path, abs: &Path) -> Option<PathBuf> {
    match abs.strip_prefix(root) {
        Ok(relative) => Some(relative.to_path_buf()),
        Err(_) => None,
    }
}
1516
1517/// Normalize a possibly-absolute or `./`-prefixed path to a clean
1518/// store-relative form (drops a leading `./`; leaves already-relative paths).
1519fn normalize_rel(p: &Path) -> PathBuf {
1520    let s = path_to_unix(p);
1521    let s = s.strip_prefix("./").unwrap_or(&s);
1522    PathBuf::from(s)
1523}
1524
/// True when the path's final component is exactly `index.md` or `index.jsonl`.
/// Only the file name is inspected; the file's contents and location are not.
fn is_index_artifact(p: &Path) -> bool {
    let name = p.file_name().and_then(|n| n.to_str());
    name == Some("index.md") || name == Some("index.jsonl")
}
1531
1532/// True when a file named `index.md` / `index.jsonl` is safe for [`Index::cleanup`]
1533/// to delete — i.e. it is a generated catalog artifact (or a stale/garbage
1534/// leftover from a previous build), NOT a user content file that merely happens
1535/// to be named `index.md`.
1536///
1537/// - `index.jsonl` is always a machine artifact (content files are `.md`), so it
1538///   is always deletable.
1539/// - `index.md` is deletable UNLESS it parses as a content file — frontmatter
1540///   whose `type` is some real record type (anything other than `index`). A
1541///   generated catalog carries `type: index`; a user record carries its own type
1542///   (`email`, `note`, …) and must be preserved (deleting it is silent,
1543///   unrecoverable data loss). A leftover with no/garbage frontmatter (e.g. a
1544///   bare `stale\n`) is treated as a deletable stale artifact.
1545fn is_deletable_catalog_artifact(p: &Path) -> bool {
1546    match p.file_name().and_then(|n| n.to_str()) {
1547        Some("index.jsonl") => true,
1548        Some("index.md") => match read_frontmatter(p) {
1549            // Real content file (non-`index` type) → preserve, never delete.
1550            Ok(meta) => meta.type_.as_deref().is_none_or(|t| t == "index"),
1551            // Unreadable / no frontmatter → a stale or garbage artifact, deletable.
1552            Err(_) => true,
1553        },
1554        _ => false,
1555    }
1556}
1557
/// True when `name` is valid UTF-8 and begins with a dot. A name that is not
/// valid UTF-8 is reported as not hidden.
fn is_hidden(name: &std::ffi::OsStr) -> bool {
    match name.to_str() {
        Some(text) => text.starts_with('.'),
        None => false,
    }
}
1561
1562fn layer_dir_name(layer: Layer) -> &'static str {
1563    match layer {
1564        Layer::Sources => "sources",
1565        Layer::Records => "records",
1566    }
1567}
1568
1569/// Local layer-name parse. Mirrors the contract of [`Layer::from_dir_name`];
1570/// kept local to keep this module's walk self-contained (see the module header).
1571fn layer_from_dir_name(name: &str) -> Option<Layer> {
1572    match name {
1573        "sources" => Some(Layer::Sources),
1574        "records" => Some(Layer::Records),
1575        _ => None,
1576    }
1577}
1578
/// The last component of `p` as a `&str`, or `""` when the path has no final
/// file-name component or that component is not valid UTF-8.
fn folder_basename(p: &Path) -> &str {
    match p.file_name() {
        Some(name) => name.to_str().unwrap_or(""),
        None => "",
    }
}
1583
1584/// The canonical wiki-link target for a content path: the store-relative path
1585/// with `/` separators and the trailing `.md` stripped (the bare form the
1586/// `index.md` browse view links to).
1587fn wiki_target(p: &Path) -> String {
1588    let unix = path_to_unix(p);
1589    unix.strip_suffix(".md").unwrap_or(&unix).to_string()
1590}
1591
/// Render a path with `/` separators regardless of host OS, so artifacts are
/// identical on every platform.
///
/// A non-UTF-8 path component (reachable on Linux/ext4, db.md's primary
/// deployment target, where `sources/` files arrive verbatim from Latin-1
/// exports) is decoded **lossily** with `U+FFFD` markers rather than silently
/// dropped. Dropping a bad component would turn `sources/emails/caf\xe9.md`
/// into `sources/emails` — a path pointing at the *directory*, not the file,
/// and one that collapses distinct files onto a single `index.jsonl` key. Lossy
/// decoding keeps the leaf present and visibly marked.
///
/// The root component of an absolute path is emitted as a single `/` and is
/// never followed by an extra separator, so `/a/b` renders as `/a/b`. (Joining
/// every component with `/` would give `//a/b`, because the root component is
/// itself `/` — or `\` on Windows.) Relative paths, the expected input, render
/// exactly as a plain `/`-join of their components.
fn path_to_unix(p: &Path) -> String {
    let mut out = String::new();
    for comp in p.components() {
        if matches!(comp, std::path::Component::RootDir) {
            // The root IS the separator; write it once, platform-independently.
            out.push('/');
            continue;
        }
        // Separate from the previous component unless we are at the start or
        // the root separator was just written.
        if !out.is_empty() && !out.ends_with('/') {
            out.push('/');
        }
        out.push_str(&comp.as_os_str().to_string_lossy());
    }
    out
}
1609
1610/// Serde for [`IndexRecord::path`]: always forward-slash on the wire, so the
1611/// `index.jsonl` catalog is identical whether the store was written on POSIX or
1612/// Windows (a git clone across OSes yields the same paths, and the last-write-
1613/// wins upsert key never splits on separator style). On POSIX this matches the
1614/// default `PathBuf` serialization; on Windows it rewrites `\` to `/`.
1615mod path_serde {
1616    use super::path_to_unix;
1617    use serde::{Deserialize, Deserializer, Serializer};
1618    use std::path::{Path, PathBuf};
1619
1620    pub fn serialize<S: Serializer>(p: &Path, s: S) -> Result<S::Ok, S::Error> {
1621        s.serialize_str(&path_to_unix(p))
1622    }
1623
1624    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<PathBuf, D::Error> {
1625        Ok(PathBuf::from(String::deserialize(d)?))
1626    }
1627}
1628
/// Upper-case the first character of `s`, leaving the rest untouched.
///
/// The conversion is `char::to_uppercase`, i.e. Unicode-aware rather than
/// ASCII-only, so a first character may expand to several (`ß` → `SS`). An
/// empty input gives an empty string.
fn capitalize(s: &str) -> String {
    let mut rest = s.chars();
    let Some(head) = rest.next() else {
        return String::new();
    };
    let mut out: String = head.to_uppercase().collect();
    out.push_str(rest.as_str());
    out
}
1637
/// Squash every run of whitespace (newlines included) down to one space and
/// strip leading/trailing whitespace.
///
/// This is the single-line normalization the `index.md` browse entry
/// ([`format_md_entry`]) relies on, so a multi-line summary can never inject a
/// newline into a catalog line.
fn collapse_whitespace(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for word in s.split_whitespace() {
        // Words are never empty, so an empty buffer means "first word".
        if !out.is_empty() {
            out.push(' ');
        }
        out.push_str(word);
    }
    out
}
1645
1646/// Derive a folder's display name from its basename: separators (`-`, `_`)
1647/// become spaces and the first character is upper-cased (`hubspot-exports` →
1648/// `Hubspot exports`). A deterministic floor — the curator overrides it via
1649/// `DB.md ## Folders` (`records/x|HubSpot exports`) for casing the tool cannot
1650/// guess. The tool tidies a folder's *name*; it never infers its *meaning*.
1651fn default_display(basename: &str) -> String {
1652    let spaced: String = basename
1653        .chars()
1654        .map(|c| if c == '-' || c == '_' { ' ' } else { c })
1655        .collect();
1656    capitalize(&spaced)
1657}
1658
1659/// The display name + optional description a root/layer rollup shows for a child
1660/// type-folder: the curator's `## Folders` metadata when present, else the
1661/// derived display name and **no description**. This is the whole anti-"tool
1662/// invents the curator's judgment" contract for the rollups — a description is
1663/// surfaced only when the agent authored one; it is never composed from the
1664/// folder's newest member or any other content.
1665fn folder_label<'a>(
1666    tf_unix: &str,
1667    basename: &str,
1668    folders: &'a BTreeMap<String, FolderMeta>,
1669) -> (String, Option<&'a str>) {
1670    let meta = folders.get(tf_unix);
1671    let display = meta
1672        .and_then(|m| m.display.as_deref())
1673        .map(str::to_string)
1674        .unwrap_or_else(|| default_display(basename));
1675    (display, meta.and_then(|m| m.description.as_deref()))
1676}
1677
/// Format one root/layer rollup line, newline-terminated:
/// `- [[<tf>/index|<Display>]] (<count>)`, followed by ` — <description>` only
/// when a description is supplied.
fn folder_entry(tf_unix: &str, display: &str, count: usize, description: Option<&str>) -> String {
    let mut line = format!("- [[{tf_unix}/index|{display}]] ({count})");
    if let Some(text) = description {
        line.push_str(" — ");
        line.push_str(text);
    }
    line.push('\n');
    line
}
1686
1687/// Atomic (rename-based) write for the **derived** catalog (`index.md` /
1688/// `index.jsonl`). Deliberately NOT `fsync`-durable like [`crate::fsx`]: the
1689/// index is rebuildable (`dbmd index rebuild`) and this is the O(changed)
1690/// write-through path, so a per-write `fsync` would be cost without benefit — a
1691/// crash-lost catalog write is recovered by a rebuild, not data loss. (Primary
1692/// data — content records, `log.md` — uses the durable `crate::fsx` path.)
1693fn write_atomic(path: &Path, contents: String) -> crate::Result<()> {
1694    if let Some(parent) = path.parent() {
1695        fs::create_dir_all(parent)?;
1696    }
1697    let dir = path.parent().unwrap_or_else(|| Path::new("."));
1698    let mut tmp = tempfile_in(dir)?;
1699    tmp.write_all(contents.as_bytes())?;
1700    tmp.flush()?;
1701    tmp.persist(path)?;
1702    Ok(())
1703}
1704
1705fn remove_if_exists(path: &Path) -> crate::Result<()> {
1706    match fs::remove_file(path) {
1707        Ok(()) => Ok(()),
1708        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
1709        Err(e) => Err(e.into()),
1710    }
1711}
1712
1713fn bad_index(path: &Path, msg: &str) -> crate::Error {
1714    crate::Error::Store(crate::store::StoreError::BadTypeIndex {
1715        path: path.to_path_buf(),
1716        message: msg.to_string(),
1717    })
1718}
1719
/// Per-type-folder advisory lock for the write-through sidecar read-modify-write.
///
/// The write-through update of a folder's `index.jsonl`/`index.md` is a
/// read-snapshot → modify → atomic-rename-over-whole-file sequence. The SPEC
/// sanctions many-writer concurrency for `records/` (`dbmd write` is
/// `create_new`-race-safe for the *content* file), but two concurrent writers to
/// the SAME type-folder would each read the same sidecar snapshot, add only
/// their own row, and rename their whole file over the other's — a classic lost
/// update, dropping rows until a manual `dbmd index rebuild`. This lock
/// serializes the per-folder read-modify-write so each writer sees the other's
/// row.
///
/// Implementation: a hidden `<folder>/.index.lock` created with `create_new`,
/// polled with a short sleep while someone else holds it, and broken by mtime
/// age so a crashed writer cannot wedge the folder forever. The dotfile name
/// keeps it out of the content walk (`walk_type_folder_files` skips hidden
/// entries) and out of `cleanup` (`is_index_artifact` only matches
/// `index.md`/`index.jsonl`). The guard removes the lockfile when dropped,
/// including on error paths.
struct FolderLock {
    /// Location of the `.index.lock` file this guard refers to.
    path: PathBuf,
    /// `true` when this guard created the lockfile and must remove it on drop;
    /// `false` when acquisition degraded to "proceed unlocked".
    held: bool,
}

impl FolderLock {
    /// Acquire the lock for `folder_abs`, creating the folder if necessary.
    ///
    /// Contention is always waited out: while another writer's lockfile exists
    /// and is younger than `STALE_AFTER`, this polls every `SPIN` and never
    /// gives up after a fixed budget. A legitimate writer may hold the lock for
    /// seconds (the write-through commands hold it across their whole body), and
    /// proceeding unlocked under contention would reintroduce the lost update
    /// this lock exists to prevent. A lockfile older than `STALE_AFTER` is taken
    /// to be a crashed writer's leftover and is removed.
    ///
    /// Hard (non-contention) failures degrade to proceeding **unlocked**
    /// (`held == false`) rather than failing or hanging a sanctioned write:
    /// - the lockfile cannot be created for a reason other than "already
    ///   exists" (permissions, read-only filesystem, …);
    /// - a stale lockfile cannot be removed (permissions, read-only filesystem,
    ///   the path is a directory, …). Without this case the loop would retry the
    ///   create/remove pair forever with no sleep, pinning a CPU.
    ///
    /// Residual limitations (documented, not fixed here):
    /// - a single legitimate hold longer than `STALE_AFTER` can be mistaken for
    ///   a crash and broken; a holder heartbeat refreshing the mtime would close
    ///   that gap;
    /// - two waiters that both judge the same lockfile stale can race, the
    ///   slower one removing the lock the faster one has just re-created;
    /// - a lockfile whose age cannot be determined (metadata error, mtime in the
    ///   future) is never considered stale and is waited on indefinitely.
    fn acquire(folder_abs: &Path) -> Self {
        use std::io::ErrorKind;
        use std::time::{Duration, SystemTime};
        const SPIN: Duration = Duration::from_millis(10);
        const STALE_AFTER: Duration = Duration::from_secs(30);

        let path = folder_abs.join(".index.lock");
        // Ensure the folder exists so the lockfile create can succeed; a failure
        // here surfaces as a hard error on the create below.
        let _ = fs::create_dir_all(folder_abs);
        loop {
            match fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&path)
            {
                Ok(_) => return FolderLock { path, held: true },
                Err(e) if e.kind() == ErrorKind::AlreadyExists => {
                    // Break a stale lock left by a crashed writer; otherwise wait
                    // for the live holder to release. NEVER proceed unlocked
                    // merely because of contention.
                    let stale = fs::metadata(&path)
                        .and_then(|m| m.modified())
                        .ok()
                        .and_then(|t| SystemTime::now().duration_since(t).ok())
                        .map(|age| age > STALE_AFTER)
                        .unwrap_or(false);
                    if stale {
                        match fs::remove_file(&path) {
                            // Removed (or someone else just removed it): retry
                            // the create immediately.
                            Ok(()) => continue,
                            Err(e) if e.kind() == ErrorKind::NotFound => continue,
                            // The stale lock can never be cleared from here, so
                            // retrying would spin forever. Same class as a
                            // failed create: proceed unlocked.
                            Err(_) => return FolderLock { path, held: false },
                        }
                    }
                    std::thread::sleep(SPIN);
                }
                // A non-contention error (permissions, read-only fs): we cannot
                // lock here at all, so proceed unlocked rather than fail a
                // sanctioned write.
                Err(_) => return FolderLock { path, held: false },
            }
        }
    }
}

impl Drop for FolderLock {
    /// Release the lock by deleting the lockfile, but only if this guard
    /// created it; removal errors are ignored.
    fn drop(&mut self) {
        if self.held {
            let _ = fs::remove_file(&self.path);
        }
    }
}
1818
1819/// Acquire the write-through lock for one or two type-folders. When `a == b`
1820/// (same-folder rename) only one lock is taken. For two distinct folders the
1821/// locks are always acquired in sorted order so a pair of concurrent renames
1822/// touching the same two folders can't deadlock by grabbing them in opposite
1823/// orders. Returns the guard(s); drop releases them.
1824fn lock_folders(store: &Store, a: &Path, b: &Path) -> Vec<FolderLock> {
1825    if a == b {
1826        return vec![FolderLock::acquire(&store.root.join(a))];
1827    }
1828    let (first, second) = if a < b { (a, b) } else { (b, a) };
1829    vec![
1830        FolderLock::acquire(&store.root.join(first)),
1831        FolderLock::acquire(&store.root.join(second)),
1832    ]
1833}
1834
/// A hand-rolled temp-file-then-rename helper for atomic writes.
///
/// `tempfile` is only a dev-dependency (tests), so the library path carries its
/// own minimal version instead of pulling that crate into the non-dev
/// dependency set. The handle lives in an `Option` so `persist` can take and
/// close it without fighting the `Drop` impl, which only cleans up a temp file
/// that was never persisted.
struct AtomicTemp {
    /// Open handle to the temp file; `None` once `persist` has taken it.
    file: Option<fs::File>,
    /// Where the temp file lives on disk.
    path: PathBuf,
    /// Set after a successful rename so `Drop` leaves the destination alone.
    persisted: bool,
}

impl AtomicTemp {
    /// The open temp-file handle.
    ///
    /// # Panics
    /// Panics if the handle has already been taken.
    fn handle(&mut self) -> &mut fs::File {
        self.file.as_mut().expect("temp file open")
    }

    /// Write all of `bytes` to the temp file.
    fn write_all(&mut self, bytes: &[u8]) -> std::io::Result<()> {
        self.handle().write_all(bytes)
    }

    /// Flush the temp file's write buffer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.handle().flush()
    }

    /// Move the temp file into place at `dest`, consuming the helper.
    ///
    /// The handle is synced best-effort (errors ignored) and closed before the
    /// rename. If the rename fails the error is returned and `Drop` removes the
    /// temp file.
    fn persist(mut self, dest: &Path) -> std::io::Result<()> {
        let handle = self.file.take();
        if let Some(open) = handle.as_ref() {
            let _ = open.sync_all();
        }
        // Close the handle before renaming.
        drop(handle);
        match fs::rename(&self.path, dest) {
            Ok(()) => {
                self.persisted = true;
                Ok(())
            }
            Err(err) => Err(err),
        }
    }
}

impl Drop for AtomicTemp {
    /// Best-effort removal of a temp file that was never persisted (an error
    /// path bailed out before the rename).
    fn drop(&mut self) {
        if self.persisted {
            return;
        }
        let _ = fs::remove_file(&self.path);
    }
}
1872
1873fn tempfile_in(dir: &Path) -> std::io::Result<AtomicTemp> {
1874    use std::time::{SystemTime, UNIX_EPOCH};
1875    let nanos = SystemTime::now()
1876        .duration_since(UNIX_EPOCH)
1877        .map(|d| d.as_nanos())
1878        .unwrap_or(0);
1879    let pid = std::process::id();
1880    // Monotonic-ish unique suffix; the dir is the destination dir so rename is
1881    // same-filesystem and therefore atomic.
1882    let counter = next_temp_counter();
1883    let name = format!(".dbmd-index-{pid}-{nanos}-{counter}.tmp");
1884    let path = dir.join(name);
1885    let file = fs::OpenOptions::new()
1886        .write(true)
1887        .create_new(true)
1888        .open(&path)?;
1889    Ok(AtomicTemp {
1890        file: Some(file),
1891        path,
1892        persisted: false,
1893    })
1894}
1895
/// Hand out the next value of a process-wide counter (starting at 0), used to
/// keep temp-file names distinct within one process.
fn next_temp_counter() -> u64 {
    use std::sync::atomic::{AtomicU64, Ordering};

    static NEXT_SUFFIX: AtomicU64 = AtomicU64::new(0);
    // `fetch_add` returns the value from before the increment.
    NEXT_SUFFIX.fetch_add(1, Ordering::Relaxed)
}
1901
1902#[cfg(test)]
1903mod tests {
1904    use super::*;
1905    use std::collections::BTreeSet;
1906    use std::fs;
1907    use tempfile::TempDir;
1908
1909    // ── fixtures ─────────────────────────────────────────────────────────
1910
1911    /// A temp store with a `DB.md` marker. `store.config` is the parser default
1912    /// (these tests never exercise the config parser).
1913    fn mk_store() -> (TempDir, Store) {
1914        let dir = TempDir::new().unwrap();
1915        fs::write(dir.path().join("DB.md"), "# test store\n").unwrap();
1916        let store = Store {
1917            root: dir.path().to_path_buf(),
1918            config: crate::parser::Config::default(),
1919        };
1920        (dir, store)
1921    }
1922
1923    /// Write a content file at `rel` with the given frontmatter lines + body.
1924    /// `fm` is the raw YAML body between the fences (no `---`).
1925    fn write_raw(store: &Store, rel: &str, fm: &str, body: &str) {
1926        let abs = store.root.join(rel);
1927        fs::create_dir_all(abs.parent().unwrap()).unwrap();
1928        fs::write(&abs, format!("---\n{fm}\n---\n{body}")).unwrap();
1929    }
1930
1931    /// Convenience: write a typed content file with summary/updated/extras.
1932    fn write_doc(
1933        store: &Store,
1934        rel: &str,
1935        type_: &str,
1936        summary: Option<&str>,
1937        updated: Option<&str>,
1938        extra_yaml: &str,
1939    ) {
1940        let mut fm = format!("type: {type_}\n");
1941        if let Some(s) = summary {
1942            fm.push_str(&format!("summary: {s}\n"));
1943        }
1944        if let Some(u) = updated {
1945            fm.push_str(&format!("updated: {u}\n"));
1946        }
1947        fm.push_str(extra_yaml);
1948        write_raw(store, rel, fm.trim_end(), "\nbody text\n");
1949    }
1950
1951    fn read(store: &Store, rel: &str) -> String {
1952        fs::read_to_string(store.root.join(rel)).unwrap()
1953    }
1954
1955    fn exists(store: &Store, rel: &str) -> bool {
1956        store.root.join(rel).exists()
1957    }
1958
1959    /// Collect every `index.md` + `index.jsonl` under the store, mapped to its
1960    /// bytes — the surface the byte-identity invariant compares.
1961    fn snapshot_artifacts(store: &Store) -> BTreeMap<String, String> {
1962        let mut out = BTreeMap::new();
1963        for entry in walkdir::WalkDir::new(&store.root)
1964            .into_iter()
1965            .filter_map(|e| e.ok())
1966        {
1967            let p = entry.path();
1968            if is_index_artifact(p) {
1969                let rel = path_to_unix(&rel_to_store(&store.root, p).unwrap());
1970                out.insert(rel, fs::read_to_string(p).unwrap());
1971            }
1972        }
1973        out
1974    }
1975
1976    // ── build_type_folder + to_markdown ──────────────────────────────────
1977
1978    #[test]
1979    fn type_folder_aggregates_across_shards_in_recency_order() {
1980        let (_d, store) = mk_store();
1981        // Three emails across two month-shards, deliberately written
1982        // out-of-recency-order on disk.
1983        write_doc(
1984            &store,
1985            "sources/emails/2026/05/b-old.md",
1986            "email",
1987            Some("Older mail"),
1988            Some("2026-05-01T09:00:00Z"),
1989            "",
1990        );
1991        write_doc(
1992            &store,
1993            "sources/emails/2026/06/c-new.md",
1994            "email",
1995            Some("Newest mail"),
1996            Some("2026-06-15T12:00:00Z"),
1997            "",
1998        );
1999        write_doc(
2000            &store,
2001            "sources/emails/2026/05/a-mid.md",
2002            "email",
2003            Some("Middle mail"),
2004            Some("2026-05-20T08:00:00Z"),
2005            "",
2006        );
2007
2008        let idx = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
2009        let paths: Vec<String> = idx.records.iter().map(|r| path_to_unix(&r.path)).collect();
2010        assert_eq!(
2011            paths,
2012            vec![
2013                "sources/emails/2026/06/c-new.md",
2014                "sources/emails/2026/05/a-mid.md",
2015                "sources/emails/2026/05/b-old.md",
2016            ],
2017            "records must aggregate across shards, newest `updated` first"
2018        );
2019    }
2020
2021    #[test]
2022    fn type_folder_md_format_entries_tags_and_derived_updated() {
2023        let (_d, store) = mk_store();
2024        write_doc(
2025            &store,
2026            "records/contacts/sarah-chen.md",
2027            "contact",
2028            Some("Renewal champion at Acme"),
2029            Some("2026-05-27T10:00:00Z"),
2030            "tags:\n  - renewal\n  - acme\n",
2031        );
2032        write_doc(
2033            &store,
2034            "records/contacts/no-tags.md",
2035            "contact",
2036            Some("Plain contact"),
2037            Some("2026-05-26T10:00:00Z"),
2038            "",
2039        );
2040
2041        let idx = Index::build_type_folder(&store, Path::new("records/contacts")).unwrap();
2042        let md = idx.to_markdown();
2043
2044        // Frontmatter is exact and the index's own `updated` is the MAX member
2045        // updated (the determinism the byte-identity invariant rests on).
2046        assert!(md.starts_with(
2047            "---\ntype: index\nscope: type-folder\nfolder: records/contacts\nupdated: 2026-05-27T10:00:00Z\n---\n\n# records/contacts\n"
2048        ), "frontmatter/heading wrong:\n{md}");
2049
2050        // Entry with tags: `— summary  ·  #tag #tag`.
2051        assert!(
2052            md.contains(
2053                "- [[records/contacts/sarah-chen]] — Renewal champion at Acme  ·  #renewal #acme\n"
2054            ),
2055            "tagged entry wrong:\n{md}"
2056        );
2057        // Entry without tags omits the `  ·  ` suffix entirely.
2058        assert!(
2059            md.contains("- [[records/contacts/no-tags]] — Plain contact\n"),
2060            "untagged entry wrong:\n{md}"
2061        );
2062        assert!(
2063            !md.contains("Plain contact  ·"),
2064            "untagged entry must not emit a tag separator"
2065        );
2066        // No `## More` below the cap.
2067        assert!(!md.contains("## More"), "no footer expected under the cap");
2068    }
2069
2070    #[test]
2071    fn missing_summary_becomes_placeholder_not_invented() {
2072        let (_d, store) = mk_store();
2073        write_doc(
2074            &store,
2075            "records/notes/x.md",
2076            "note",
2077            None,
2078            Some("2026-05-27T10:00:00Z"),
2079            "",
2080        );
2081        let idx = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
2082        assert_eq!(idx.records[0].summary, MISSING_SUMMARY);
2083        let md = idx.to_markdown();
2084        assert!(
2085            md.contains("- [[records/notes/x]] — (no summary)\n"),
2086            "missing summary must render the placeholder, not invent text:\n{md}"
2087        );
2088    }
2089
2090    // ── to_jsonl ─────────────────────────────────────────────────────────
2091
2092    #[test]
2093    fn jsonl_is_complete_structured_and_round_trips() {
2094        let (_d, store) = mk_store();
2095        write_doc(
2096            &store,
2097            "records/expenses/2026/05/e1.md",
2098            "expense",
2099            Some("Lunch with vendor"),
2100            Some("2026-05-10T10:00:00Z"),
2101            "created: 2026-05-10T09:00:00Z\nstatus: paid\namount: 42\ncompany: [[records/companies/acme]]\nrelated:\n  - [[records/concepts/spend]]\ntags:\n  - food\nlinks:\n  - records/concepts/spend\n  - [[records/concepts/renewal]]\n",
2102        );
2103        write_doc(
2104            &store,
2105            "records/expenses/2026/06/e2.md",
2106            "expense",
2107            Some("Cloud bill"),
2108            Some("2026-06-01T10:00:00Z"),
2109            "amount: 100\n",
2110        );
2111
2112        let idx = Index::build_type_folder(&store, Path::new("records/expenses")).unwrap();
2113        let jsonl = idx.to_jsonl();
2114        let lines: Vec<&str> = jsonl.lines().collect();
2115        assert_eq!(lines.len(), 2, "one JSON object per file, uncapped");
2116
2117        // Newest first (e2), and each line parses back to an equal record.
2118        let r0: IndexRecord = serde_json::from_str(lines[0]).unwrap();
2119        assert_eq!(path_to_unix(&r0.path), "records/expenses/2026/06/e2.md");
2120        assert_eq!(
2121            r0, idx.records[0],
2122            "jsonl line must round-trip to the record"
2123        );
2124
2125        // The first (data) record carries every reserved field + the extras in
2126        // `fields` (status/amount), and links/tags verbatim.
2127        let r1: IndexRecord = serde_json::from_str(lines[1]).unwrap();
2128        assert_eq!(r1.type_, "expense");
2129        assert_eq!(r1.summary, "Lunch with vendor");
2130        assert_eq!(r1.tags, vec!["food".to_string()]);
2131        assert_eq!(
2132            r1.links,
2133            vec![
2134                "records/concepts/spend".to_string(),
2135                "[[records/concepts/renewal]]".to_string()
2136            ]
2137        );
2138        assert_eq!(
2139            r1.created,
2140            Some(DateTime::parse_from_rfc3339("2026-05-10T09:00:00Z").unwrap())
2141        );
2142        assert_eq!(r1.fields.get("status"), Some(&Value::from("paid")));
2143        assert_eq!(r1.fields.get("amount"), Some(&Value::from(42)));
2144        assert_eq!(
2145            r1.fields.get("company"),
2146            Some(&Value::from("[[records/companies/acme]]"))
2147        );
2148        assert_eq!(
2149            r1.fields.get("related"),
2150            Some(&serde_json::json!(["[[records/concepts/spend]]"]))
2151        );
2152        // Reserved keys never leak into `fields`.
2153        for reserved in [
2154            "path", "type", "summary", "tags", "links", "created", "updated",
2155        ] {
2156            assert!(
2157                !r1.fields.contains_key(reserved),
2158                "reserved key {reserved} must not appear in fields"
2159            );
2160        }
2161
2162        // Stable key order: declared fields first, then sorted extras.
2163        assert!(
2164            lines[1].starts_with(
2165                r#"{"path":"records/expenses/2026/05/e1.md","type":"expense","summary":"Lunch with vendor","tags":["food"],"links":["records/concepts/spend","[[records/concepts/renewal]]"],"created":"2026-05-10T09:00:00Z","updated":"2026-05-10T10:00:00Z","#
2166            ),
2167            "jsonl key order not stable:\n{}",
2168            lines[1]
2169        );
2170        // The flattened extras come in BTreeMap (sorted) order. The catalog
2171        // injects `meta-type: fact` into every records-layer file that does not
2172        // declare one, so it appears among the sorted extras (between `company`
2173        // and `related`).
2174        assert!(
2175            lines[1].ends_with(r#""amount":42,"company":"[[records/companies/acme]]","meta-type":"fact","related":["[[records/concepts/spend]]"],"status":"paid"}"#),
2176            "extras must be sorted:\n{}",
2177            lines[1]
2178        );
2179    }
2180
2181    // ── cap + footer ─────────────────────────────────────────────────────
2182
2183    #[test]
2184    fn over_cap_md_shows_500_plus_footer_jsonl_holds_all() {
2185        let (_d, store) = mk_store();
2186        let total = MD_CAP + 7;
2187        for i in 0..total {
2188            // Distinct, monotonically increasing `updated` so order is total.
2189            let day = 1 + (i % 27);
2190            let rel = format!("sources/emails/2026/05/m-{i:04}.md");
2191            let updated = format!("2026-05-{day:02}T00:00:{:02}Z", i % 60);
2192            write_doc(
2193                &store,
2194                &rel,
2195                "email",
2196                Some(&format!("mail {i}")),
2197                Some(&updated),
2198                "",
2199            );
2200        }
2201        let idx = Index::build_type_folder(&store, Path::new("sources/emails")).unwrap();
2202        assert_eq!(idx.records.len(), total, "jsonl/records keep every file");
2203
2204        let md = idx.to_markdown();
2205        let entry_lines = md.lines().filter(|l| l.starts_with("- [[")).count();
2206        assert_eq!(entry_lines, MD_CAP, "md browse view is capped at 500");
2207
2208        assert!(
2209            md.contains("## More\n\n"),
2210            "over-cap md needs a More footer"
2211        );
2212        assert!(
2213            md.contains(&format!(
2214                "This folder has {total} files. The 500 most recent are listed above.\n"
2215            )),
2216            "footer count wrong:\n{md}"
2217        );
2218        assert!(
2219            md.contains(
2220                "Use `dbmd index query --type email --in sources` for the complete catalog.\n"
2221            ),
2222            "footer must infer type=email layer=sources:\n{md}"
2223        );
2224
2225        let jsonl = idx.to_jsonl();
2226        assert_eq!(jsonl.lines().count(), total, "jsonl is uncapped");
2227    }
2228
2229    // ── sort total order ─────────────────────────────────────────────────
2230
2231    #[test]
2232    fn sort_breaks_ties_by_path_and_puts_undated_last() {
2233        let mut recs = vec![
2234            rec("z/a.md", Some("2026-05-01T00:00:00Z")),
2235            rec("a/b.md", Some("2026-05-01T00:00:00Z")), // same updated, path < z/a
2236            rec("m/c.md", None),                         // undated → last
2237            rec("b/d.md", Some("2026-06-01T00:00:00Z")), // newest
2238        ];
2239        sort_records(&mut recs);
2240        let order: Vec<String> = recs.iter().map(|r| path_to_unix(&r.path)).collect();
2241        assert_eq!(order, vec!["b/d.md", "a/b.md", "z/a.md", "m/c.md"]);
2242    }
2243
2244    fn rec(path: &str, updated: Option<&str>) -> IndexRecord {
2245        IndexRecord {
2246            path: PathBuf::from(path),
2247            type_: "t".into(),
2248            summary: "s".into(),
2249            tags: vec![],
2250            links: vec![],
2251            created: None,
2252            updated: updated.map(|u| DateTime::parse_from_rfc3339(u).unwrap()),
2253            fields: BTreeMap::new(),
2254        }
2255    }
2256
2257    // ── build_layer / build_root ─────────────────────────────────────────
2258
2259    #[test]
2260    fn layer_index_lists_type_folders_with_counts() {
2261        let (_d, store) = mk_store();
2262        write_doc(
2263            &store,
2264            "records/contacts/a.md",
2265            "contact",
2266            Some("Contact A older"),
2267            Some("2026-05-01T00:00:00Z"),
2268            "",
2269        );
2270        write_doc(
2271            &store,
2272            "records/contacts/b.md",
2273            "contact",
2274            Some("Contact B newest"),
2275            Some("2026-05-09T00:00:00Z"),
2276            "",
2277        );
2278        write_doc(
2279            &store,
2280            "records/companies/x.md",
2281            "company",
2282            Some("Acme Inc"),
2283            Some("2026-05-05T00:00:00Z"),
2284            "",
2285        );
2286        // build the type-folder artifacts first (layer preview reads their jsonl)
2287        Index::write_level(&store, &IndexLevel::TypeFolder("records/contacts".into())).unwrap();
2288        Index::write_level(&store, &IndexLevel::TypeFolder("records/companies".into())).unwrap();
2289
2290        Index::write_level(&store, &IndexLevel::Layer(Layer::Records)).unwrap();
2291        let md = read(&store, "records/index.md");
2292
2293        assert!(
2294            md.starts_with("---\ntype: index\nscope: layer\nfolder: records\n"),
2295            "layer fm:\n{md}"
2296        );
2297        // Alphabetical type-folder order: companies before contacts.
2298        let companies_at = md.find("companies/index").unwrap();
2299        let contacts_at = md.find("contacts/index").unwrap();
2300        assert!(
2301            companies_at < contacts_at,
2302            "type folders must be alphabetical"
2303        );
2304        // Count + display only — with no `## Folders`, the rollup never invents
2305        // a per-folder description from a member summary.
2306        assert!(
2307            md.contains("- [[records/contacts/index|Contacts]] (2)\n"),
2308            "contacts entry:\n{md}"
2309        );
2310        assert!(
2311            md.contains("- [[records/companies/index|Companies]] (1)\n"),
2312            "companies entry:\n{md}"
2313        );
2314        // Crucially: no member summary leaked into the rollup as a description.
2315        assert!(
2316            !md.contains("Contact B newest") && !md.contains("Acme Inc"),
2317            "layer rollup must not quote a member summary:\n{md}"
2318        );
2319        // Layer `updated` is the max across children (contacts b = 05-09).
2320        assert!(
2321            md.contains("updated: 2026-05-09T00:00:00Z\n"),
2322            "layer updated must be max child:\n{md}"
2323        );
2324    }
2325
2326    #[test]
2327    fn folders_section_supplies_authored_display_and_description() {
2328        // The aligned contract: rollups surface the curator's `## Folders`
2329        // display + description; the tool never invents one. A folder with no
2330        // entry shows counts only — no member summary leaks in as a description.
2331        let (_d, mut store) = mk_store();
2332        store.config.folders.insert(
2333            "records/contacts".into(),
2334            crate::parser::FolderMeta {
2335                display: None,
2336                description: Some("people across customer + prospect accounts".into()),
2337            },
2338        );
2339        store.config.folders.insert(
2340            "sources/hubspot-exports".into(),
2341            crate::parser::FolderMeta {
2342                display: Some("HubSpot exports".into()),
2343                description: Some("deal + pipeline exports".into()),
2344            },
2345        );
2346        write_doc(
2347            &store,
2348            "records/contacts/a.md",
2349            "contact",
2350            Some("Contact A"),
2351            Some("2026-05-01T00:00:00Z"),
2352            "",
2353        );
2354        // companies has NO `## Folders` entry → counts only.
2355        write_doc(
2356            &store,
2357            "records/companies/x.md",
2358            "company",
2359            Some("Acme Inc"),
2360            Some("2026-05-05T00:00:00Z"),
2361            "",
2362        );
2363        write_doc(
2364            &store,
2365            "sources/hubspot-exports/d.md",
2366            "hubspot-export",
2367            Some("a single deal export"),
2368            Some("2026-05-03T00:00:00Z"),
2369            "",
2370        );
2371
2372        Index::rebuild_all(&store).unwrap();
2373
2374        // Authored description surfaced (contacts), with the derived display.
2375        let records_layer = read(&store, "records/index.md");
2376        assert!(
2377            records_layer.contains("- [[records/contacts/index|Contacts]] (1) — people across customer + prospect accounts\n"),
2378            "authored description must surface:\n{records_layer}"
2379        );
2380        // No `## Folders` entry ⇒ counts only; the member summary never leaks in.
2381        assert!(
2382            records_layer.contains("- [[records/companies/index|Companies]] (1)\n")
2383                && !records_layer.contains("Acme Inc"),
2384            "un-described folder is counts-only:\n{records_layer}"
2385        );
2386
2387        // Display override beats the derived "Hubspot exports".
2388        let sources_layer = read(&store, "sources/index.md");
2389        assert!(
2390            sources_layer.contains("- [[sources/hubspot-exports/index|HubSpot exports]] (1) — deal + pipeline exports\n"),
2391            "display override + description must surface:\n{sources_layer}"
2392        );
2393
2394        // Root rollup carries the same authored metadata (display + description).
2395        let root = read(&store, "index.md");
2396        assert!(
2397            root.contains("- [[records/contacts/index|Contacts]] (1) — people across customer + prospect accounts\n"),
2398            "root surfaces authored description:\n{root}"
2399        );
2400        assert!(
2401            root.contains("- [[sources/hubspot-exports/index|HubSpot exports]] (1) — deal + pipeline exports\n"),
2402            "root surfaces display override:\n{root}"
2403        );
2404    }
2405
2406    #[test]
2407    fn default_display_turns_separators_to_spaces_and_caps() {
2408        assert_eq!(default_display("contacts"), "Contacts");
2409        assert_eq!(default_display("hubspot-exports"), "Hubspot exports");
2410        assert_eq!(default_display("usage_exports"), "Usage exports");
2411    }
2412
2413    #[test]
2414    fn root_index_groups_layers_with_totals_and_per_type_counts() {
2415        let (_d, store) = mk_store();
2416        write_doc(
2417            &store,
2418            "sources/emails/2026/05/a.md",
2419            "email",
2420            Some("Mail"),
2421            Some("2026-05-01T00:00:00Z"),
2422            "",
2423        );
2424        write_doc(
2425            &store,
2426            "sources/docs/d.md",
2427            "doc",
2428            Some("Doc"),
2429            Some("2026-05-02T00:00:00Z"),
2430            "",
2431        );
2432        write_doc(
2433            &store,
2434            "records/contacts/c.md",
2435            "contact",
2436            Some("C"),
2437            Some("2026-05-03T00:00:00Z"),
2438            "",
2439        );
2440        // wiki empty → no Wiki section
2441
2442        Index::rebuild_all(&store).unwrap();
2443        let md = read(&store, "index.md");
2444
2445        assert!(
2446            md.starts_with("---\ntype: index\nscope: root\n"),
2447            "root fm:\n{md}"
2448        );
2449        assert!(md.contains("# Knowledge base index\n"), "root title:\n{md}");
2450        // Layer heading with total count; Sources before Records (canonical).
2451        let sources_h = md
2452            .find("## Sources (2)")
2453            .expect("sources heading w/ total 2");
2454        let records_h = md
2455            .find("## Records (1)")
2456            .expect("records heading w/ total 1");
2457        assert!(sources_h < records_h, "Sources must precede Records");
2458        assert!(!md.contains("## Wiki"), "empty layer gets no section");
2459        // Per-type sub-entries with (N), no preview at root.
2460        assert!(
2461            md.contains("- [[sources/docs/index|Docs]] (1)\n"),
2462            "root docs entry:\n{md}"
2463        );
2464        assert!(
2465            md.contains("- [[sources/emails/index|Emails]] (1)\n"),
2466            "root emails entry:\n{md}"
2467        );
2468        assert!(
2469            md.contains("- [[records/contacts/index|Contacts]] (1)\n"),
2470            "root contacts entry:\n{md}"
2471        );
2472        assert!(!md.contains("— "), "root entries carry no preview text");
2473    }
2474
2475    // ── write-through == rebuild (THE invariant) ─────────────────────────
2476
2477    #[test]
2478    fn on_write_matches_rebuild_byte_for_byte() {
2479        // Build a store incrementally via on_write, and a second identical store
2480        // via a single rebuild_all, then assert every index artifact is equal.
2481        let (_d1, wt) = mk_store();
2482        let (_d2, rb) = mk_store();
2483
2484        let docs: &[(&str, &str, &str, &str, &str)] = &[
2485            (
2486                "sources/emails/2026/05/e1.md",
2487                "email",
2488                "First mail",
2489                "2026-05-01T10:00:00Z",
2490                "tags:\n  - inbox\n",
2491            ),
2492            (
2493                "sources/emails/2026/06/e2.md",
2494                "email",
2495                "Second mail",
2496                "2026-06-01T10:00:00Z",
2497                "",
2498            ),
2499            (
2500                "records/contacts/sarah.md",
2501                "contact",
2502                "Sarah",
2503                "2026-05-15T10:00:00Z",
2504                "links:\n  - records/profiles/sarah\n",
2505            ),
2506            (
2507                "records/contacts/elena.md",
2508                "contact",
2509                "Elena",
2510                "2026-05-20T10:00:00Z",
2511                "status: active\n",
2512            ),
2513            (
2514                "records/profiles/sarah.md",
2515                "profile",
2516                "Sarah bio",
2517                "2026-05-21T10:00:00Z",
2518                "",
2519            ),
2520        ];
2521
2522        for (rel, t, sum, upd, extra) in docs {
2523            write_doc(&wt, rel, t, Some(sum), Some(upd), extra);
2524            write_doc(&rb, rel, t, Some(sum), Some(upd), extra);
2525            Index::on_write(&wt, Path::new(rel)).unwrap();
2526        }
2527        Index::rebuild_all(&rb).unwrap();
2528
2529        let a = snapshot_artifacts(&wt);
2530        let b = snapshot_artifacts(&rb);
2531        assert_eq!(
2532            a.keys().collect::<Vec<_>>(),
2533            b.keys().collect::<Vec<_>>(),
2534            "same set of index artifacts must exist"
2535        );
2536        for (k, v) in &a {
2537            assert_eq!(v, &b[k], "artifact {k} differs between write-through and rebuild:\n--- write-through ---\n{v}\n--- rebuild ---\n{}", b[k]);
2538        }
2539        // Sanity: artifacts actually exist (not a vacuous comparison of empties).
2540        assert!(a.contains_key("index.md"));
2541        assert!(a.contains_key("sources/emails/index.jsonl"));
2542        assert!(a.contains_key("records/contacts/index.md"));
2543    }
2544
2545    /// Regression (O(changed) bound, not just correctness): a loop op must
2546    /// recompute its parent rollups from the type-folder `index.jsonl` sidecars
2547    /// — never by walking the content tree of *sibling* folders it wasn't asked
2548    /// about. The byte-identity property test (which always indexes every folder
2549    /// before comparing) can't catch a violation, because a full-store walk
2550    /// produces the *correct* counts too; it just does so in `O(store files)`.
2551    ///
2552    /// The behavioral fingerprint of the old `update_parents → build_layer /
2553    /// build_root` (which called `walk_type_folder_files` on every type-folder in
2554    /// the store): a single `on_write` to `records/contacts/sarah.md` would
2555    /// surface, in the layer + root rollups, the file count of
2556    /// `records/companies` — a sibling that has content on disk but was NEVER
2557    /// passed to a write/index op, so it has no `index.jsonl`. An O(changed) loop
2558    /// op cannot "see" that un-indexed folder; a whole-store walk can. So this
2559    /// asserts the rollups reflect ONLY the sidecar-indexed folder, proving no
2560    /// content-tree walk happened.
2561    #[test]
2562    fn loop_op_does_not_walk_sibling_content_tree() {
2563        let (_d, store) = mk_store();
2564
2565        // A sibling type-folder with real content on disk, but deliberately
2566        // never indexed (no on_write / write_level / rebuild over it) ⇒ no
2567        // `records/companies/index.jsonl` exists.
2568        write_doc(
2569            &store,
2570            "records/companies/acme.md",
2571            "company",
2572            Some("Acme Inc"),
2573            Some("2026-05-05T00:00:00Z"),
2574            "",
2575        );
2576        write_doc(
2577            &store,
2578            "records/companies/globex.md",
2579            "company",
2580            Some("Globex"),
2581            Some("2026-05-06T00:00:00Z"),
2582            "",
2583        );
2584        assert!(
2585            !exists(&store, "records/companies/index.jsonl"),
2586            "precondition: companies must be un-indexed"
2587        );
2588
2589        // The ONLY loop op: a single write to a different type-folder.
2590        write_doc(
2591            &store,
2592            "records/contacts/sarah.md",
2593            "contact",
2594            Some("Sarah"),
2595            Some("2026-05-15T00:00:00Z"),
2596            "",
2597        );
2598        Index::on_write(&store, Path::new("records/contacts/sarah.md")).unwrap();
2599
2600        // The written folder is reflected in both rollups...
2601        let layer_md = read(&store, "records/index.md");
2602        let root_md = read(&store, "index.md");
2603        // (both rollups show counts only — no `## Folders` here, so no preview)
2604        assert!(
2605            layer_md.contains("- [[records/contacts/index|Contacts]] (1)\n")
2606                && !layer_md.contains("Sarah"),
2607            "layer must reflect the written folder, counts only:\n{layer_md}"
2608        );
2609        assert!(
2610            root_md.contains("- [[records/contacts/index|Contacts]] (1)\n"),
2611            "root must reflect the written folder:\n{root_md}"
2612        );
2613
2614        // ...but the un-indexed sibling must be INVISIBLE to a loop op. If the
2615        // rollups mention `records/companies` at all, `on_write` walked the whole
2616        // content tree — the O(store) regression.
2617        assert!(
2618            !layer_md.contains("companies"),
2619            "loop op walked the sibling content tree: layer rollup counts un-indexed records/companies\n{layer_md}"
2620        );
2621        assert!(
2622            !root_md.contains("companies"),
2623            "loop op walked the sibling content tree: root rollup counts un-indexed records/companies\n{root_md}"
2624        );
2625        // The layer's only child is contacts ⇒ its total is exactly 1, not 3.
2626        assert!(
2627            root_md.contains("## Records (1)"),
2628            "root layer total must count only the sidecar-indexed folder (1), not walked siblings (would be 3):\n{root_md}"
2629        );
2630
2631        // And the sidecar-derived count IS what a full walk WOULD yield once the
2632        // sibling is indexed too — i.e. the fix changes cost, not the eventual
2633        // result. Index companies, then confirm the rollups now (and only now)
2634        // include it, byte-identical to a from-scratch rebuild.
2635        let (_d2, rb) = mk_store();
2636        for (rel, t, s, u) in [
2637            (
2638                "records/companies/acme.md",
2639                "company",
2640                "Acme Inc",
2641                "2026-05-05T00:00:00Z",
2642            ),
2643            (
2644                "records/companies/globex.md",
2645                "company",
2646                "Globex",
2647                "2026-05-06T00:00:00Z",
2648            ),
2649            (
2650                "records/contacts/sarah.md",
2651                "contact",
2652                "Sarah",
2653                "2026-05-15T00:00:00Z",
2654            ),
2655        ] {
2656            write_doc(&rb, rel, t, Some(s), Some(u), "");
2657        }
2658        Index::on_write(&store, Path::new("records/companies/acme.md")).unwrap();
2659        Index::on_write(&store, Path::new("records/companies/globex.md")).unwrap();
2660        Index::rebuild_all(&rb).unwrap();
2661        let a = snapshot_artifacts(&store);
2662        let b = snapshot_artifacts(&rb);
2663        assert_eq!(
2664            a.keys().collect::<BTreeSet<_>>(),
2665            b.keys().collect::<BTreeSet<_>>(),
2666            "same artifact set after indexing both folders"
2667        );
2668        for (k, v) in &a {
2669            assert_eq!(
2670                v, &b[k],
2671                "after indexing the sibling too, loop result must equal rebuild for {k}"
2672            );
2673        }
2674        assert!(
2675            read(&store, "index.md").contains("## Records (3)"),
2676            "now that both folders are indexed, the root total is 3"
2677        );
2678    }
2679
2680    /// Regression: a type filed at the path the toolkit ITSELF computes
2681    /// (`Store::shard_path_for`) must be indexable end-to-end. The class of bug
2682    /// is a 2-component `<layer>/<file>` path, which `type_folder_of` treats as
2683    /// having no type-folder — making the producer (path computation) disagree
2684    /// with the consumer (index): the loop path crashes (`on_write` → `Err`, it
2685    /// tries to write `index.md` *inside* a file) while the sweep path silently
2686    /// drops the page from every catalog. A conclusion `profile` is a custom
2687    /// (non-built-in) type, so `shard_path_for` files it under the records-layer
2688    /// fallback `records/profile/<file>` — a conforming 3-component path. This test
2689    /// drives both paths through the real `shard_path_for` output and asserts
2690    /// (1) `on_write` succeeds, (2) the page appears in the rebuilt catalog, and
2691    /// (3) write-through == rebuild.
2692    #[test]
2693    fn custom_type_at_shard_path_for_is_indexable_end_to_end() {
2694        let (_d1, wt) = mk_store();
2695        let (_d2, rb) = mk_store();
2696
2697        // The toolkit's own canonical write path for a custom-type record.
2698        let rel = wt
2699            .shard_path_for(
2700                "profile",
2701                &crate::parser::Frontmatter::default(),
2702                "renewal-theme",
2703            )
2704            .unwrap();
2705        let rel_str = path_to_unix(&rel);
2706        // Guard the precondition the consumer requires: 3+ components so
2707        // `type_folder_of` resolves a real `<layer>/<type-folder>`.
2708        assert!(
2709            type_folder_of(&rel).is_some(),
2710            "shard_path_for produced a path the index cannot file: {rel_str}"
2711        );
2712
2713        write_doc(
2714            &wt,
2715            &rel_str,
2716            "profile",
2717            Some("Renewal theme"),
2718            Some("2026-05-21T10:00:00Z"),
2719            "",
2720        );
2721        write_doc(
2722            &rb,
2723            &rel_str,
2724            "profile",
2725            Some("Renewal theme"),
2726            Some("2026-05-21T10:00:00Z"),
2727            "",
2728        );
2729
2730        // (1) Loop path must NOT error (a 2-component `<layer>/<file>` shape
2731        // returned Err(Io(NotADirectory))).
2732        Index::on_write(&wt, &rel)
2733            .expect("on_write must succeed for a toolkit-computed custom-type path");
2734        Index::rebuild_all(&rb).unwrap();
2735
2736        // (2) The page is present in the rebuilt catalog (the old flat-path bug
2737        // silently omitted it from every artifact). The individual page link
2738        // lives in the *type-folder* index; the *layer* index rolls the
2739        // type-folder up — assert both, since the bug erased both. A custom
2740        // type's canonical folder is the records-layer fallback `records/profile`.
2741        let page_link = wiki_target(&rel); // records/profile/renewal-theme
2742        let tf_md = read(&rb, "records/profile/index.md");
2743        assert!(
2744            tf_md.contains(&format!("[[{page_link}]]")),
2745            "type-folder index must list the page link, got:\n{tf_md}"
2746        );
2747        assert!(
2748            exists(&rb, "records/profile/index.jsonl"),
2749            "type-folder jsonl must exist"
2750        );
2751        assert!(
2752            read(&rb, "records/profile/index.jsonl").contains(&rel_str),
2753            "type-folder jsonl must contain the page row"
2754        );
2755        // The layer index rolls the type-folder up (proves the page's folder is
2756        // visible to the layer catalog, not dropped).
2757        let layer_md = read(&rb, "records/index.md");
2758        assert!(
2759            layer_md.contains("records/profile/index"),
2760            "layer index must roll up the records/profile type-folder, got:\n{layer_md}"
2761        );
2762
2763        // (3) Write-through equals rebuild byte-for-byte — loop and sweep agree.
2764        let a = snapshot_artifacts(&wt);
2765        let b = snapshot_artifacts(&rb);
2766        assert_eq!(
2767            a.keys().collect::<Vec<_>>(),
2768            b.keys().collect::<Vec<_>>(),
2769            "loop and sweep must produce the same artifact set"
2770        );
2771        for (k, v) in &a {
2772            assert_eq!(
2773                v, &b[k],
2774                "custom-type artifact {k} differs between on_write and rebuild"
2775            );
2776        }
2777    }
2778
2779    #[test]
2780    fn on_remove_then_rebuild_match_and_pull_in_next_over_cap() {
2781        let (_d1, wt) = mk_store();
2782        let (_d2, rb) = mk_store();
2783        let total = MD_CAP + 3; // 503 files; removing one keeps md full at 500
2784        let mut all_rels = Vec::new();
2785        for i in 0..total {
2786            let rel = format!("sources/emails/2026/05/m-{i:04}.md");
2787            // `updated` strictly increasing across i by varying both minute and second
2788            let updated = format!("2026-05-10T00:{:02}:{:02}Z", i / 60, i % 60);
2789            write_doc(
2790                &wt,
2791                &rel,
2792                "email",
2793                Some(&format!("mail {i}")),
2794                Some(&updated),
2795                "",
2796            );
2797            write_doc(
2798                &rb,
2799                &rel,
2800                "email",
2801                Some(&format!("mail {i}")),
2802                Some(&updated),
2803                "",
2804            );
2805            all_rels.push(rel);
2806        }
2807        // Build write-through index, then remove the single newest file.
2808        Index::rebuild_all(&wt).unwrap();
2809        let newest = &all_rels[total - 1]; // highest i = newest updated
2810        fs::remove_file(wt.root.join(newest)).unwrap();
2811        Index::on_remove(&wt, Path::new(newest)).unwrap();
2812
2813        // Rebuild side: same end state (file physically absent).
2814        fs::remove_file(rb.root.join(newest)).unwrap();
2815        Index::rebuild_all(&rb).unwrap();
2816
2817        let a = snapshot_artifacts(&wt);
2818        let b = snapshot_artifacts(&rb);
2819        for (k, v) in &a {
2820            assert_eq!(v, &b[k], "after remove, artifact {k} drifted from rebuild");
2821        }
2822
2823        // The md must still hold exactly 500 entries (the 501st got pulled in)
2824        // and the removed file must be gone from both artifacts.
2825        let md = read(&wt, "sources/emails/index.md");
2826        assert_eq!(md.lines().filter(|l| l.starts_with("- [[")).count(), MD_CAP);
2827        // Removed (newest) file is gone from the bare-path md and the .md jsonl.
2828        assert!(
2829            !md.contains(&format!("[[{}]]", wiki_target(Path::new(newest)))),
2830            "removed file must not be listed in md"
2831        );
2832        // The file previously at rank 501 (excluded under the cap) is `all_rels[2]`
2833        // — `updated` increases with index, so newest-first rank 500 = index 2.
2834        // After dropping the newest it shifts into the visible 500.
2835        let pulled_in = &all_rels[2];
2836        assert!(
2837            md.contains(&format!("[[{}]]", wiki_target(Path::new(pulled_in)))),
2838            "the 501st-most-recent must be pulled into the browse view after a removal"
2839        );
2840        assert!(
2841            md.contains(&format!("This folder has {} files.", total - 1)),
2842            "footer count must decrement:\n{}",
2843            md.lines().rev().take(4).collect::<Vec<_>>().join("\n")
2844        );
2845        let jsonl = read(&wt, "sources/emails/index.jsonl");
2846        assert_eq!(
2847            jsonl.lines().count(),
2848            total - 1,
2849            "jsonl loses exactly the removed file"
2850        );
2851        assert!(
2852            !jsonl.contains(&path_to_unix(Path::new(newest))),
2853            "removed file must be gone from the jsonl too"
2854        );
2855    }
2856
2857    #[test]
2858    fn on_rename_cross_folder_matches_rebuild() {
2859        let (_d1, wt) = mk_store();
2860        let (_d2, rb) = mk_store();
2861        // Seed both stores identically.
2862        let seed: &[(&str, &str, &str, &str)] = &[
2863            (
2864                "records/contacts/a.md",
2865                "contact",
2866                "A",
2867                "2026-05-01T00:00:00Z",
2868            ),
2869            (
2870                "records/contacts/b.md",
2871                "contact",
2872                "B",
2873                "2026-05-02T00:00:00Z",
2874            ),
2875            (
2876                "records/companies/x.md",
2877                "company",
2878                "X",
2879                "2026-05-03T00:00:00Z",
2880            ),
2881        ];
2882        for (rel, t, s, u) in seed {
2883            write_doc(&wt, rel, t, Some(s), Some(u), "");
2884            write_doc(&rb, rel, t, Some(s), Some(u), "");
2885        }
2886        Index::rebuild_all(&wt).unwrap();
2887
2888        // Rename contacts/b.md -> companies/b.md (cross type-folder). The file's
2889        // `type` changes to match its new folder, as a real `dbmd rename` would.
2890        let old = "records/contacts/b.md";
2891        let new = "records/companies/b.md";
2892        fs::create_dir_all(wt.root.join("records/companies")).unwrap();
2893        fs::rename(wt.root.join(old), wt.root.join(new)).unwrap();
2894        // (type stays "contact" here; index copies frontmatter verbatim — the
2895        // test only asserts placement + parity with rebuild.)
2896        Index::on_rename(&wt, Path::new(old), Path::new(new)).unwrap();
2897
2898        // Rebuild side: same end state.
2899        fs::create_dir_all(rb.root.join("records/companies")).unwrap();
2900        fs::rename(rb.root.join(old), rb.root.join(new)).unwrap();
2901        Index::rebuild_all(&rb).unwrap();
2902
2903        let a = snapshot_artifacts(&wt);
2904        let b = snapshot_artifacts(&rb);
2905        assert_eq!(a.keys().collect::<Vec<_>>(), b.keys().collect::<Vec<_>>());
2906        for (k, v) in &a {
2907            assert_eq!(v, &b[k], "rename: artifact {k} drifted from rebuild");
2908        }
2909        // Concretely: b is gone from contacts, present in companies.
2910        let contacts = read(&wt, "records/contacts/index.md");
2911        assert!(!contacts.contains("records/contacts/b]]"));
2912        let companies = read(&wt, "records/companies/index.md");
2913        assert!(companies.contains("[[records/companies/b]]"));
2914    }
2915
2916    #[test]
2917    fn on_write_updates_existing_entry_in_place() {
2918        let (_d, store) = mk_store();
2919        write_doc(
2920            &store,
2921            "records/contacts/a.md",
2922            "contact",
2923            Some("Original"),
2924            Some("2026-05-01T00:00:00Z"),
2925            "",
2926        );
2927        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2928        // Edit the same file: new summary + newer updated.
2929        write_doc(
2930            &store,
2931            "records/contacts/a.md",
2932            "contact",
2933            Some("Revised"),
2934            Some("2026-05-09T00:00:00Z"),
2935            "",
2936        );
2937        Index::on_write(&store, Path::new("records/contacts/a.md")).unwrap();
2938
2939        let jsonl = read(&store, "records/contacts/index.jsonl");
2940        assert_eq!(
2941            jsonl.lines().count(),
2942            1,
2943            "upsert must not duplicate the line"
2944        );
2945        assert!(jsonl.contains("Revised"), "jsonl must reflect the update");
2946        assert!(
2947            !jsonl.contains("Original"),
2948            "stale line must be gone (compacted)"
2949        );
2950        let md = read(&store, "records/contacts/index.md");
2951        assert!(md.contains("- [[records/contacts/a]] — Revised\n"));
2952        assert!(
2953            md.contains("updated: 2026-05-09T00:00:00Z\n"),
2954            "index updated must track the newer member"
2955        );
2956    }
2957
2958    // ── dry-run + cleanup ────────────────────────────────────────────────
2959
2960    #[test]
2961    fn dry_run_emits_separators_and_writes_nothing() {
2962        let (_d, store) = mk_store();
2963        write_doc(
2964            &store,
2965            "sources/emails/2026/05/a.md",
2966            "email",
2967            Some("Mail"),
2968            Some("2026-05-01T00:00:00Z"),
2969            "",
2970        );
2971        let out = Index::render_dry_run(&store, &IndexLevel::TypeFolder("sources/emails".into()))
2972            .unwrap();
2973        assert!(
2974            out.contains("--- sources/emails/index.md ---\n"),
2975            "md separator:\n{out}"
2976        );
2977        assert!(
2978            out.contains("--- sources/emails/index.jsonl ---\n"),
2979            "jsonl separator:\n{out}"
2980        );
2981        assert!(
2982            out.contains("- [[sources/emails/2026/05/a]] — Mail"),
2983            "md body present"
2984        );
2985        // Nothing was written to disk.
2986        assert!(
2987            !exists(&store, "sources/emails/index.md"),
2988            "dry-run must not write"
2989        );
2990        assert!(
2991            !exists(&store, "sources/emails/index.jsonl"),
2992            "dry-run must not write"
2993        );
2994    }
2995
2996    #[test]
2997    fn cleanup_removes_noncanonical_and_empty_indexes() {
2998        let (_d, store) = mk_store();
2999        write_doc(
3000            &store,
3001            "sources/emails/2026/05/a.md",
3002            "email",
3003            Some("Mail"),
3004            Some("2026-05-01T00:00:00Z"),
3005            "",
3006        );
3007        // A stray index inside a date-shard (non-canonical) ...
3008        fs::write(
3009            store.root.join("sources/emails/2026/05/index.md"),
3010            "stale\n",
3011        )
3012        .unwrap();
3013        fs::write(
3014            store.root.join("sources/emails/2026/05/index.jsonl"),
3015            "stale\n",
3016        )
3017        .unwrap();
3018        // ... and an index in an empty type-folder.
3019        fs::create_dir_all(store.root.join("records/empty")).unwrap();
3020        fs::write(store.root.join("records/empty/index.md"), "stale\n").unwrap();
3021
3022        Index::cleanup(&store).unwrap();
3023
3024        assert!(
3025            !exists(&store, "sources/emails/2026/05/index.md"),
3026            "shard index must be deleted"
3027        );
3028        assert!(
3029            !exists(&store, "sources/emails/2026/05/index.jsonl"),
3030            "shard jsonl must be deleted"
3031        );
3032        assert!(
3033            !exists(&store, "records/empty/index.md"),
3034            "empty-folder index must be deleted"
3035        );
3036        // The canonical type-folder file itself is untouched by cleanup.
3037        assert!(exists(&store, "sources/emails/2026/05/a.md"));
3038    }
3039
3040    #[test]
3041    fn rebuild_deletes_stale_indexes_for_emptied_folders() {
3042        let (_d, store) = mk_store();
3043        write_doc(
3044            &store,
3045            "records/contacts/a.md",
3046            "contact",
3047            Some("A"),
3048            Some("2026-05-01T00:00:00Z"),
3049            "",
3050        );
3051        Index::rebuild_all(&store).unwrap();
3052        assert!(exists(&store, "records/contacts/index.md"));
3053        assert!(exists(&store, "records/index.md"));
3054        assert!(exists(&store, "index.md"));
3055
3056        // Empty the folder entirely, then rebuild: all three levels vanish.
3057        fs::remove_file(store.root.join("records/contacts/a.md")).unwrap();
3058        Index::rebuild_all(&store).unwrap();
3059        assert!(
3060            !exists(&store, "records/contacts/index.md"),
3061            "emptied type-folder index gone"
3062        );
3063        assert!(
3064            !exists(&store, "records/index.md"),
3065            "now-empty layer index gone"
3066        );
3067        assert!(!exists(&store, "index.md"), "now-empty root index gone");
3068    }
3069
3070    // ── randomized parity (property-style) ───────────────────────────────
3071
    /// Property-style parity check: applies 120 pseudo-random create/update,
    /// remove and rename operations to two stores. The `wt` store is indexed
    /// incrementally after every operation (`on_write` / `on_remove` /
    /// `on_rename`); the `rb` store only receives the file-system changes and is
    /// indexed once at the end with `rebuild_all`. Both must end up with the same
    /// artifact set and identical artifact contents.
    #[test]
    fn property_writethrough_equals_rebuild_under_mixed_ops() {
        // Deterministic pseudo-random op sequence (no rand crate): a small LCG
        // with a fixed seed, so every run replays the same scenario. The order of
        // `next()` calls below therefore determines which scenario is exercised.
        let (_d1, wt) = mk_store();
        let (_d2, rb) = mk_store();
        let mut seed: u64 = 0x9E3779B97F4A7C15;
        let mut next = || {
            seed = seed
                .wrapping_mul(6364136223846793005)
                .wrapping_add(1442695040888963407);
            // Return the upper bits of the state; the low bits are discarded.
            (seed >> 33) as u32
        };

        // `folders[i]` is paired with `types[i]` when a file is created.
        let folders = ["sources/emails", "records/contacts", "records/profiles"];
        let types = ["email", "contact", "profile"];
        let mut live: Vec<String> = Vec::new(); // store-relative paths that exist

        for step in 0..120u32 {
            let r = next();
            let op = r % 10;
            // op 0..=5 → create/update, 6..=7 → remove, 8..=9 → rename. With no
            // live file there is nothing to remove or rename, so create instead.
            if op < 6 || live.is_empty() {
                // CREATE/UPDATE
                let fi = (next() as usize) % folders.len();
                let folder = folders[fi];
                // ids 0..40: a small id space, so a path already in `live` can be
                // drawn again and rewritten (the UPDATE case).
                let id = next() % 40;
                let rel = if folder == "sources/emails" {
                    let month = 5 + (id % 2); // shard across two months
                    format!("{folder}/2026/{month:02}/f-{id:02}.md")
                } else {
                    format!("{folder}/f-{id:02}.md")
                };
                // recency varies with step so order is meaningful + total: the
                // (day, hour) pair is (step % 27, step % 24), which does not
                // repeat within 120 steps (lcm(27, 24) = 216).
                let updated = format!(
                    "2026-05-{:02}T{:02}:{:02}:00Z",
                    1 + (step % 27),
                    step % 24,
                    id % 60
                );
                // Every third id also gets a `tags` list as extra frontmatter.
                let extra = if id % 3 == 0 {
                    "tags:\n  - x\n  - y\n"
                } else {
                    ""
                };
                write_doc(
                    &wt,
                    &rel,
                    types[fi],
                    Some(&format!("sum {step}")),
                    Some(&updated),
                    extra,
                );
                write_doc(
                    &rb,
                    &rel,
                    types[fi],
                    Some(&format!("sum {step}")),
                    Some(&updated),
                    extra,
                );
                // Only the write-through store is indexed incrementally.
                Index::on_write(&wt, Path::new(&rel)).unwrap();
                if !live.contains(&rel) {
                    live.push(rel);
                }
            } else if op < 8 {
                // REMOVE a live file
                let idx = (next() as usize) % live.len();
                let rel = live.remove(idx);
                fs::remove_file(wt.root.join(&rel)).unwrap();
                // NOTE(review): the error is ignored on the rb side while the wt
                // side unwraps. Both stores receive the same writes and renames,
                // so the file presumably always exists here — confirm whether
                // this could be an `unwrap()` too.
                fs::remove_file(rb.root.join(&rel)).ok();
                Index::on_remove(&wt, Path::new(&rel)).unwrap();
            } else {
                // RENAME a live file to a new id in a destination folder drawn
                // from all of `folders`, so the move may stay in place, cross
                // type-folders, or cross layers (sources <-> records).
                let idx = (next() as usize) % live.len();
                let old = live[idx].clone();
                let fi = (next() as usize) % folders.len();
                let folder = folders[fi];
                // ids 50..90 never overlap the 0..40 range used by CREATE, so a
                // rename target can only collide with an earlier rename target.
                let id = 50 + (next() % 40);
                let new = if folder == "sources/emails" {
                    format!("{folder}/2026/05/f-{id:02}.md")
                } else {
                    format!("{folder}/f-{id:02}.md")
                };
                // Skip no-op renames and renames onto a path that is still live.
                if new == old || live.contains(&new) {
                    continue;
                }
                fs::create_dir_all(wt.root.join(&new).parent().unwrap()).unwrap();
                fs::create_dir_all(rb.root.join(&new).parent().unwrap()).unwrap();
                fs::rename(wt.root.join(&old), wt.root.join(&new)).unwrap();
                fs::rename(rb.root.join(&old), rb.root.join(&new)).unwrap();
                Index::on_rename(&wt, Path::new(&old), Path::new(&new)).unwrap();
                live[idx] = new;
            }
        }

        // Now rebuild the rb side from the shared end state and compare.
        Index::rebuild_all(&rb).unwrap();
        let a = snapshot_artifacts(&wt);
        let b = snapshot_artifacts(&rb);
        assert_eq!(
            a.keys().collect::<BTreeSet<_>>(),
            b.keys().collect::<BTreeSet<_>>(),
            "write-through and rebuild must produce the same set of artifacts"
        );
        for (k, v) in &a {
            assert_eq!(
                v, &b[k],
                "INVARIANT VIOLATED: artifact {k} differs after mixed ops\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
                b[k]
            );
        }
        // Guard against a vacuous pass: the comparison above proves nothing if
        // the run produced no artifacts.
        assert!(
            !a.is_empty(),
            "the run must have produced at least one artifact"
        );
    }
3188
3189    // ── regressions: cleanup must not delete user content ─────────────────
3190
3191    /// CRITICAL regression: a user content file named `index.md` inside a date
3192    /// shard (e.g. from a website/doc-export mirror) must SURVIVE `cleanup` /
3193    /// `rebuild_all`. The old filename-only match silently deleted it.
3194    #[test]
3195    fn cleanup_preserves_user_content_named_index_md_in_shard() {
3196        let (_d, store) = mk_store();
3197        // A real content record that merely happens to be named index.md.
3198        write_doc(
3199            &store,
3200            "sources/emails/2026/06/index.md",
3201            "email",
3202            Some("Important imported mail"),
3203            Some("2026-06-11T04:23:25Z"),
3204            "",
3205        );
3206        Index::cleanup(&store).unwrap();
3207        assert!(
3208            exists(&store, "sources/emails/2026/06/index.md"),
3209            "cleanup must not delete a user content file named index.md"
3210        );
3211        // A full rebuild (which runs cleanup first) must also preserve it.
3212        Index::rebuild_all(&store).unwrap();
3213        assert!(
3214            exists(&store, "sources/emails/2026/06/index.md"),
3215            "rebuild_all must not delete a user content file named index.md"
3216        );
3217        let kept = read(&store, "sources/emails/2026/06/index.md");
3218        assert!(
3219            kept.contains("Important imported mail"),
3220            "the user's record content must be intact"
3221        );
3222    }
3223
3224    /// HIGH regression: `cleanup` uses `min_depth(2)`, so the canonical
3225    /// type-folder-root `index.md`/`index.jsonl` are NOT deleted up front. A
3226    /// genuine generated catalog at the type-folder root survives a cleanup pass
3227    /// (it is only ever rewritten, or removed when the folder is truly empty).
3228    #[test]
3229    fn cleanup_keeps_canonical_type_folder_root_sidecars() {
3230        let (_d, store) = mk_store();
3231        write_doc(
3232            &store,
3233            "records/contacts/alice.md",
3234            "contact",
3235            Some("Alice"),
3236            Some("2026-05-01T00:00:00Z"),
3237            "",
3238        );
3239        Index::write_level(&store, &IndexLevel::TypeFolder("records/contacts".into())).unwrap();
3240        assert!(exists(&store, "records/contacts/index.md"));
3241        assert!(exists(&store, "records/contacts/index.jsonl"));
3242        Index::cleanup(&store).unwrap();
3243        assert!(
3244            exists(&store, "records/contacts/index.md"),
3245            "cleanup must keep the canonical type-folder index.md (non-empty folder)"
3246        );
3247        assert!(
3248            exists(&store, "records/contacts/index.jsonl"),
3249            "cleanup must keep the canonical type-folder index.jsonl (non-empty folder)"
3250        );
3251    }
3252
3253    // ── regression: write-through must not catalog index artifacts ────────
3254
3255    /// HIGH regression: routing a generated `index.md` through `on_write` (as
3256    /// `dbmd fm set records/contacts/index.md …` would) must NOT insert a phantom
3257    /// self-row — counts and bytes stay equal to a rebuild.
3258    #[test]
3259    fn on_write_ignores_index_artifact_no_phantom_row() {
3260        let (_d, store) = mk_store();
3261        write_doc(
3262            &store,
3263            "records/contacts/alice.md",
3264            "contact",
3265            Some("Alice"),
3266            Some("2026-05-01T00:00:00Z"),
3267            "",
3268        );
3269        Index::on_write(&store, Path::new("records/contacts/alice.md")).unwrap();
3270        let jsonl_before = read(&store, "records/contacts/index.jsonl");
3271        assert_eq!(jsonl_before.lines().count(), 1);
3272
3273        // Tamper: route the catalog file itself through on_write.
3274        Index::on_write(&store, Path::new("records/contacts/index.md")).unwrap();
3275
3276        let jsonl_after = read(&store, "records/contacts/index.jsonl");
3277        assert_eq!(
3278            jsonl_after.lines().count(),
3279            1,
3280            "on_write on index.md must not add a phantom self-row"
3281        );
3282        assert!(
3283            !jsonl_after.contains("\"type\":\"index\""),
3284            "the catalog artifact must never appear as a catalogued row"
3285        );
3286        // Root rollup count stays 1 (not inflated to 2).
3287        let root = read(&store, "index.md");
3288        assert!(
3289            root.contains("[[records/contacts/index|Contacts]] (1)"),
3290            "count must not inflate:\n{root}"
3291        );
3292    }
3293
3294    // ── regression: multi-line summary cannot inject a catalog line ───────
3295
3296    /// HIGH regression: a block-scalar summary spanning multiple lines must be
3297    /// collapsed to one line in the browse entry, so it cannot forge a standalone
3298    /// `- [[…]]` catalog line.
3299    #[test]
3300    fn multiline_summary_is_single_lined_in_index_md() {
3301        let (_d, store) = mk_store();
3302        // A YAML block scalar whose value embeds a forged-looking entry line.
3303        write_raw(
3304            &store,
3305            "records/notes/evil.md",
3306            "type: note\nupdated: 2026-06-10T00:00:00Z\nsummary: |-\n  legit first line\n  - [[records/secrets/fake|Click me]] — injected entry",
3307            "\nbody\n",
3308        );
3309        let idx = Index::build_type_folder(&store, Path::new("records/notes")).unwrap();
3310        let md = idx.to_markdown();
3311        // Exactly one browse entry line, and no embedded newline forging a second.
3312        let entry_lines = md.lines().filter(|l| l.starts_with("- [[")).count();
3313        assert_eq!(
3314            entry_lines, 1,
3315            "a multi-line summary must not produce extra entry lines:\n{md}"
3316        );
3317        assert!(
3318            md.contains(
3319                "- [[records/notes/evil]] — legit first line - [[records/secrets/fake|Click me]] — injected entry\n"
3320            ),
3321            "summary newlines must collapse to spaces inline:\n{md}"
3322        );
3323    }
3324
3325    // ── regression: writer/validator scalar coercion agreement ────────────
3326
3327    /// HIGH regression: an unquoted non-string scalar `summary`/`type`
3328    /// (`summary: 2026`, `type: true`) must be coerced to a string by the index
3329    /// writer exactly as `validate::scalar_string` does — so the index entry holds
3330    /// the real value (`2026`), not the `(no summary)` placeholder that produced a
3331    /// permanently-unfixable INDEX_SUMMARY_MISMATCH.
3332    #[test]
3333    fn non_string_scalar_summary_and_type_are_coerced_like_validator() {
3334        let (_d, store) = mk_store();
3335        write_raw(
3336            &store,
3337            "records/contacts/a.md",
3338            "type: contact\nupdated: 2026-05-01T00:00:00Z\nsummary: 2026",
3339            "\nbody\n",
3340        );
3341        let rec = record_from_file(
3342            &store.root.join("records/contacts/a.md"),
3343            PathBuf::from("records/contacts/a.md"),
3344        )
3345        .unwrap();
3346        // `summary: 2026` (YAML number) coerces to the string "2026", matching
3347        // the validator's `scalar_string` (Number -> n.to_string()).
3348        assert_eq!(rec.summary, "2026");
3349        assert_eq!(rec.type_, "contact");
3350
3351        // And the rendered index entry quotes the real value, not the placeholder.
3352        let idx = Index::build_type_folder(&store, Path::new("records/contacts")).unwrap();
3353        let md = idx.to_markdown();
3354        assert!(
3355            md.contains("- [[records/contacts/a]] — 2026\n"),
3356            "index entry must hold the coerced scalar, not the placeholder:\n{md}"
3357        );
3358
3359        // A boolean scalar type coerces to "true" (mirrors scalar_string(Bool)).
3360        write_raw(
3361            &store,
3362            "records/contacts/b.md",
3363            "type: true\nupdated: 2026-05-02T00:00:00Z\nsummary: hi",
3364            "\nbody\n",
3365        );
3366        let rec_b = record_from_file(
3367            &store.root.join("records/contacts/b.md"),
3368            PathBuf::from("records/contacts/b.md"),
3369        )
3370        .unwrap();
3371        assert_eq!(rec_b.type_, "true");
3372    }
3373
3374    // ── regression: non-UTF-8 body must not abort the projection ──────────
3375
3376    /// HIGH regression: a content file with valid-UTF-8 frontmatter but a
3377    /// non-UTF-8 byte in the BODY (a verbatim Latin-1 `sources/` import) must
3378    /// still project to an IndexRecord — `record_from_file` reads frontmatter
3379    /// without requiring the whole file to be UTF-8, so a stray byte can't abort
3380    /// `rebuild_all` / write-through for the entire store.
3381    #[test]
3382    fn non_utf8_body_does_not_abort_record_projection() {
3383        let (_d, store) = mk_store();
3384        let rel = "sources/emails/2026/06/x.md";
3385        let abs = store.root.join(rel);
3386        fs::create_dir_all(abs.parent().unwrap()).unwrap();
3387        // Valid-UTF-8 frontmatter; a raw 0xE9 (Latin-1 'é') in the body.
3388        let mut bytes: Vec<u8> =
3389            b"---\ntype: email\nupdated: 2026-06-11T00:00:00Z\nsummary: An imported email\n---\n\nCaf"
3390                .to_vec();
3391        bytes.push(0xE9);
3392        bytes.extend_from_slice(b" meeting notes\n");
3393        fs::write(&abs, bytes).unwrap();
3394
3395        let rec = record_from_file(&abs, PathBuf::from(rel))
3396            .expect("non-UTF-8 body must not abort the frontmatter read");
3397        assert_eq!(rec.summary, "An imported email");
3398        assert_eq!(rec.type_, "email");
3399
3400        // The full sweep indexes the folder rather than aborting the whole store.
3401        Index::rebuild_all(&store).unwrap();
3402        assert!(
3403            exists(&store, "sources/emails/index.jsonl"),
3404            "rebuild must produce the catalog despite a non-UTF-8 body byte"
3405        );
3406        assert!(
3407            read(&store, "sources/emails/index.jsonl").contains("An imported email"),
3408            "the record must be catalogued"
3409        );
3410    }
3411
3412    /// HIGH regression: a single malformed-YAML file must abort the rebuild
3413    /// loudly (not be silently skipped) — skipping it would leave the store in a
3414    /// permanently invalid state (`INDEX_MISSING_ENTRY` / `INDEX_JSONL_DESYNC`
3415    /// that no rebuild clears, since the validator enumerates members by
3416    /// filename, not by parseability) and would desync the rollups. The abort is
3417    /// safe because `cleanup` preserves the prior canonical catalogs
3418    /// (`min_depth(2)`), so an aborted rebuild leaves the existing sidecars
3419    /// intact and surfaces a clear error naming the file to fix.
3420    #[test]
3421    fn rebuild_aborts_on_malformed_file_and_keeps_prior_catalogs() {
3422        let (_d, store) = mk_store();
3423        write_doc(
3424            &store,
3425            "records/contacts/alice.md",
3426            "contact",
3427            Some("Alice"),
3428            Some("2026-05-01T00:00:00Z"),
3429            "",
3430        );
3431        write_doc(
3432            &store,
3433            "records/companies/acme.md",
3434            "company",
3435            Some("Acme"),
3436            Some("2026-05-02T00:00:00Z"),
3437            "",
3438        );
3439
3440        // A clean first rebuild establishes the canonical catalogs.
3441        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3442        assert!(exists(&store, "records/contacts/index.jsonl"));
3443        assert!(exists(&store, "records/companies/index.jsonl"));
3444
3445        // Routine malformed file: unterminated quoted scalar.
3446        let bad = store.root.join("records/contacts/broken.md");
3447        fs::write(
3448            &bad,
3449            "---\ntype: contact\nsummary: \"unterminated\n---\nbody\n",
3450        )
3451        .unwrap();
3452
3453        // Must abort loudly — a silent skip leaves a file the validator requires
3454        // to be catalogued out of the index forever.
3455        Index::rebuild_all(&store)
3456            .expect_err("rebuild must abort, not silently skip, on a malformed file");
3457
3458        // The prior canonical catalogs survive the aborted rebuild: `cleanup`'s
3459        // `min_depth(2)` never deletes a type-folder's root-level sidecars, so a
3460        // mid-sweep abort leaves the existing indexes intact rather than wiped.
3461        assert!(
3462            exists(&store, "records/companies/index.jsonl"),
3463            "an aborted rebuild must not destroy a clean sibling folder's catalog"
3464        );
3465        assert!(
3466            exists(&store, "records/contacts/index.jsonl"),
3467            "an aborted rebuild must not destroy the affected folder's prior catalog"
3468        );
3469        let contacts_jsonl = read(&store, "records/contacts/index.jsonl");
3470        assert!(contacts_jsonl.contains("records/contacts/alice.md"));
3471    }
3472
3473    /// HIGH regression (problem B): `rebuild_all`'s rollup `(N)` counts must
3474    /// equal the catalogued `index.jsonl` record counts — never a raw `.md` walk
3475    /// that disagrees with the sidecar. The over-corrected skip-with-diagnostic
3476    /// build excluded a malformed file from `index.jsonl` while `build_layer` /
3477    /// `build_root` kept counting it via `walk_type_folder_files`, so a folder
3478    /// would show `Contacts (2)` in the root/layer rollups while its `index.jsonl`
3479    /// held only 1 record — and a single subsequent write-through (which derives
3480    /// `(N)` from the jsonl) rewrote it to `Contacts (1)`, making `rebuild_all`
3481    /// and write-through emit different bytes for the same state. With the loud
3482    /// abort, the only successful-rebuild states are fully consistent: every
3483    /// rollup `(N)` equals the catalogued record count AND equals what a
3484    /// write-through over the same files produces.
3485    #[test]
3486    fn rebuild_rollup_counts_equal_jsonl_records_and_write_through() {
3487        let (_d, store) = mk_store();
3488        // Two well-formed contacts: the rollups must read (2), matching the two
3489        // jsonl records — this is the count the skip-version inflated to a phantom
3490        // extra when a malformed sibling was present-but-uncatalogued.
3491        write_doc(
3492            &store,
3493            "records/contacts/alice.md",
3494            "contact",
3495            Some("Alice"),
3496            Some("2026-05-01T00:00:00Z"),
3497            "",
3498        );
3499        write_doc(
3500            &store,
3501            "records/contacts/bob.md",
3502            "contact",
3503            Some("Bob"),
3504            Some("2026-05-02T00:00:00Z"),
3505            "",
3506        );
3507        Index::rebuild_all(&store).expect("clean rebuild succeeds");
3508
3509        // The catalogued record set (index.jsonl) and the rollup (N) must agree.
3510        let jsonl_lines = read(&store, "records/contacts/index.jsonl")
3511            .lines()
3512            .filter(|l| !l.trim().is_empty())
3513            .count();
3514        assert_eq!(jsonl_lines, 2, "two well-formed files ⇒ two jsonl records");
3515        let layer_md = read(&store, "records/index.md");
3516        let root_md = read(&store, "index.md");
3517        assert!(
3518            layer_md.contains("- [[records/contacts/index|Contacts]] (2)"),
3519            "layer rollup (N) must equal the jsonl record count (2), not a raw .md walk:\n{layer_md}"
3520        );
3521        assert!(
3522            root_md.contains("- [[records/contacts/index|Contacts]] (2)\n")
3523                && root_md.contains("## Records (2)"),
3524            "root rollup (N)/layer total must equal the jsonl record count (2):\n{root_md}"
3525        );
3526
3527        // The decisive write-through == rebuild_all byte-identity check on the
3528        // SAME end state: a single on_write must not rewrite the rollups to a
3529        // different (N). Under the skip-version, rebuild_all's rollup walked the
3530        // raw .md tree while on_write derived (N) from the jsonl, so the two
3531        // diverged; the loud abort keeps both deriving (N) from the catalogued
3532        // records, so the bytes match exactly.
3533        let (_d2, wt) = mk_store();
3534        write_doc(
3535            &wt,
3536            "records/contacts/alice.md",
3537            "contact",
3538            Some("Alice"),
3539            Some("2026-05-01T00:00:00Z"),
3540            "",
3541        );
3542        write_doc(
3543            &wt,
3544            "records/contacts/bob.md",
3545            "contact",
3546            Some("Bob"),
3547            Some("2026-05-02T00:00:00Z"),
3548            "",
3549        );
3550        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3551        Index::on_write(&wt, Path::new("records/contacts/bob.md")).unwrap();
3552
3553        let a = snapshot_artifacts(&wt);
3554        let b = snapshot_artifacts(&store);
3555        assert_eq!(
3556            a.keys().collect::<BTreeSet<_>>(),
3557            b.keys().collect::<BTreeSet<_>>(),
3558            "write-through and rebuild_all must produce the same artifact set"
3559        );
3560        for (k, v) in &a {
3561            assert_eq!(
3562                v, &b[k],
3563                "rollup bytes diverged between write-through and rebuild_all for {k} \
3564                 (a skip-version inflates rebuild_all's (N) above the jsonl record \
3565                 count, which write-through then rewrites):\n--- write-through ---\n{v}\n--- rebuild ---\n{}",
3566                b[k]
3567            );
3568        }
3569    }
3570
3571    /// MEDIUM regression: a non-UTF-8 path component must be lossily decoded
3572    /// (kept, with U+FFFD), not silently dropped — so the index key points at the
3573    /// file, not its parent directory. Unix-only (ext4 allows the filename; APFS
3574    /// rejects it at the VFS layer).
3575    #[cfg(unix)]
3576    #[test]
3577    fn non_utf8_path_component_is_kept_not_dropped() {
3578        use std::ffi::OsStr;
3579        use std::os::unix::ffi::OsStrExt;
3580        // sources/emails/caf\xE9.md — the leaf has a non-UTF-8 byte.
3581        let mut leaf = b"caf".to_vec();
3582        leaf.push(0xE9);
3583        leaf.extend_from_slice(b".md");
3584        let p = Path::new("sources/emails").join(OsStr::from_bytes(&leaf));
3585        let unix = path_to_unix(&p);
3586        // The leaf is preserved (lossy), so the path is NOT collapsed to the
3587        // parent directory "sources/emails".
3588        assert_ne!(
3589            unix, "sources/emails",
3590            "non-UTF-8 leaf must not be dropped, collapsing the path to its parent dir"
3591        );
3592        assert!(
3593            unix.starts_with("sources/emails/caf"),
3594            "the lossy leaf must remain under its folder: {unix}"
3595        );
3596    }
3597
3598    // ── loose files (directly at a layer root, no type-folder) ───────────────
3599
3600    #[test]
3601    fn loose_file_is_catalogued_in_layer_jsonl_not_type_folder() {
3602        let (_d, store) = mk_store();
3603        // One canonical file (in a type-folder) and one loose file at the root.
3604        write_doc(
3605            &store,
3606            "records/contacts/alice.md",
3607            "contact",
3608            Some("Alice"),
3609            Some("2026-06-01T08:00:00Z"),
3610            "id: alice\n",
3611        );
3612        write_doc(
3613            &store,
3614            "records/loose.md",
3615            "contact",
3616            Some("Loose"),
3617            Some("2026-06-01T08:00:00Z"),
3618            "id: loose\n",
3619        );
3620        Index::rebuild_all(&store).unwrap();
3621
3622        // The layer carries its own jsonl listing exactly the loose file —
3623        // disjoint from the type-folder jsonl, so no double-count.
3624        assert!(
3625            exists(&store, "records/index.jsonl"),
3626            "layer jsonl must exist when loose files are present"
3627        );
3628        let layer_jsonl = read(&store, "records/index.jsonl");
3629        assert!(
3630            layer_jsonl.contains("records/loose.md"),
3631            "layer jsonl must list the loose file, got:\n{layer_jsonl}"
3632        );
3633        assert!(
3634            !layer_jsonl.contains("records/contacts/alice.md"),
3635            "layer jsonl must NOT list type-folder files"
3636        );
3637        let tf_jsonl = read(&store, "records/contacts/index.jsonl");
3638        assert!(tf_jsonl.contains("records/contacts/alice.md"));
3639        assert!(!tf_jsonl.contains("records/loose.md"));
3640
3641        // The layer index.md stays a pure type-folder rollup — no loose entry.
3642        let layer_md = read(&store, "records/index.md");
3643        assert!(
3644            layer_md.contains("records/contacts/index"),
3645            "layer md must roll up the type-folder, got:\n{layer_md}"
3646        );
3647        assert!(
3648            !layer_md.contains("records/loose"),
3649            "layer md must stay a rollup, not list loose files, got:\n{layer_md}"
3650        );
3651    }
3652
3653    #[test]
3654    fn loose_file_write_through_equals_rebuild() {
3655        let (_d1, wt) = mk_store();
3656        let (_d2, rb) = mk_store();
3657        for s in [&wt, &rb] {
3658            write_doc(
3659                s,
3660                "records/contacts/alice.md",
3661                "contact",
3662                Some("Alice"),
3663                Some("2026-06-01T08:00:00Z"),
3664                "id: alice\n",
3665            );
3666            write_doc(
3667                s,
3668                "records/loose.md",
3669                "contact",
3670                Some("Loose"),
3671                Some("2026-06-02T08:00:00Z"),
3672                "id: loose\n",
3673            );
3674        }
3675        // wt: write-through (loop); rb: full rebuild (sweep). Must agree byte-wise.
3676        Index::on_write(&wt, Path::new("records/contacts/alice.md")).unwrap();
3677        Index::on_write(&wt, Path::new("records/loose.md")).unwrap();
3678        Index::rebuild_all(&rb).unwrap();
3679
3680        let a = snapshot_artifacts(&wt);
3681        let b = snapshot_artifacts(&rb);
3682        assert_eq!(
3683            a.keys().collect::<Vec<_>>(),
3684            b.keys().collect::<Vec<_>>(),
3685            "loose-file loop and sweep must produce the same artifact set"
3686        );
3687        for (k, v) in &a {
3688            assert_eq!(
3689                v, &b[k],
3690                "loose-file artifact {k} differs between loop and sweep"
3691            );
3692        }
3693    }
3694
3695    #[test]
3696    fn removing_last_loose_file_clears_layer_jsonl() {
3697        let (_d, store) = mk_store();
3698        write_doc(
3699            &store,
3700            "records/loose.md",
3701            "contact",
3702            Some("Loose"),
3703            Some("2026-06-01T08:00:00Z"),
3704            "id: loose\n",
3705        );
3706        Index::on_write(&store, Path::new("records/loose.md")).unwrap();
3707        assert!(
3708            exists(&store, "records/index.jsonl"),
3709            "layer jsonl present after a loose write"
3710        );
3711        fs::remove_file(store.root.join("records/loose.md")).unwrap();
3712        Index::on_remove(&store, Path::new("records/loose.md")).unwrap();
3713        assert!(
3714            !exists(&store, "records/index.jsonl"),
3715            "layer jsonl must be removed once the last loose file is gone"
3716        );
3717    }
3718
3719    // ── concurrency: shared layer/root rollup under parallel write-through ────
3720
3721    #[test]
3722    fn concurrent_writes_to_different_type_folders_match_rebuild() {
3723        use std::sync::Arc;
3724        use std::thread;
3725
3726        // Two threads, each owning a DISTINCT type-folder, drive `on_write`
3727        // concurrently. The layer `index.md` and root `index.md` are shared
3728        // across both folders, but each `on_write` only locks its own
3729        // type-folder — so before the `update_parents` store-root lock, the two
3730        // threads raced to rewrite those shared rollups and one update was lost
3731        // (the rollup no longer matched `rebuild_all`). With the lock the final
3732        // rollups must be byte-identical to a from-scratch rebuild, regardless
3733        // of interleaving.
3734        let (_d, store) = mk_store();
3735        let folders = ["records/contacts", "records/companies"];
3736        let n = 12usize;
3737
3738        // Pre-create all content files (disjoint paths) so the threads race only
3739        // on the index write-through, not on content creation.
3740        for (fi, folder) in folders.iter().enumerate() {
3741            for i in 0..n {
3742                write_doc(
3743                    &store,
3744                    &format!("{folder}/f{fi}_{i}.md"),
3745                    "contact",
3746                    Some(&format!("Summary {fi}-{i}")),
3747                    Some(&format!("2026-06-{:02}T08:00:00Z", i + 1)),
3748                    &format!("id: f{fi}_{i}\n"),
3749                );
3750            }
3751        }
3752
3753        let store = Arc::new(store);
3754        let handles: Vec<_> = folders
3755            .iter()
3756            .enumerate()
3757            .map(|(fi, folder)| {
3758                let store = Arc::clone(&store);
3759                let folder = folder.to_string();
3760                thread::spawn(move || {
3761                    for i in 0..n {
3762                        let rel = format!("{folder}/f{fi}_{i}.md");
3763                        Index::on_write(&store, Path::new(&rel)).unwrap();
3764                    }
3765                })
3766            })
3767            .collect();
3768        for h in handles {
3769            h.join().unwrap();
3770        }
3771
3772        // Snapshot the write-through artifacts, then rebuild from scratch over
3773        // the identical content and snapshot again — they must agree exactly.
3774        let got = snapshot_artifacts(&store);
3775        Index::rebuild_all(&store).unwrap();
3776        let want = snapshot_artifacts(&store);
3777
3778        assert_eq!(
3779            got.keys().collect::<Vec<_>>(),
3780            want.keys().collect::<Vec<_>>(),
3781            "artifact set after concurrent write-through must match rebuild"
3782        );
3783        for (k, v) in &want {
3784            assert_eq!(
3785                &got[k], v,
3786                "rollup artifact {k} diverged from rebuild after concurrent writes"
3787            );
3788        }
3789    }
3790}