Skip to main content

mati_core/store/
db.rs

1//! SurrealKV storage layer (M-03).
2//!
3//! Two trees per project:
4//! - `knowledge.db` — all user-visible records, indefinite versioning
5//! - `sessions.db`  — session analytics and hook events, 90-day retention
6//!
7//! Path: `~/.mati/<slug>/knowledge.db` and `sessions.db`
8//! Slug: first 8 hex chars of SHA-256(git remote URL), falls back to
9//!       SHA-256(canonicalized repo root path).
10//!
11//! Write durability follows the split defined in [`crate::store::Durability`]:
12//! - `Immediate` → fsync before commit (knowledge records)
13//! - `Eventual`  → OS write buffer (session / analytics records)
14
15use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use anyhow::{Context, Result};
19use once_cell::sync::OnceCell;
20use rmp_serde as rmps;
21use sha2::{Digest, Sha256};
22use surrealkv::{
23    Durability as SkvDurability, HistoryOptions, LSMIterator, Mode, Options, Transaction, Tree,
24    TreeBuilder, VLogChecksumLevel,
25};
26
27use serde::{Deserialize, Serialize};
28
29use super::record::Record;
30use super::Durability;
31use crate::search::Search;
32
/// Retention window for `sessions.db`: 90 days, expressed in nanoseconds.
const SESSIONS_RETENTION_NS: u64 = 90 * 86_400 * 1_000_000_000;

/// Marker file dropped by `mati init` when it defers tantivy indexing.
///
/// [`Store::open_and_rebuild`] (MCP server startup) looks for this file and,
/// when present, rebuilds the index before any search query is served.
const SEARCH_STALE_MARKER: &str = "search_stale";

/// Crash-fence file guarding the KV → tantivy sync for knowledge keys.
///
/// It is written around each knowledge-key index update and removed once the
/// update succeeds. Finding it on startup means KV may be ahead of the index.
const SEARCH_SYNC_PENDING: &str = "search_sync_pending";
43
/// Key namespaces in the `knowledge` tree whose values are [`Record`] structs.
///
/// [`Store::rebuild_search_index`] scans these prefixes (followed by
/// [`SESSION_NAMESPACES`]) to repopulate tantivy, and [`is_knowledge_key`] is
/// derived directly from this list. Must stay in sync with
/// [`Durability::for_key`]'s `Immediate` set.
const KNOWLEDGE_NAMESPACES: &[&str] = &[
    "gotcha:",
    "decision:",
    "file:",
    "stage:",
    "dev_note:",
    "dep:",
];

/// Returns `true` when `key` starts with one of [`KNOWLEDGE_NAMESPACES`].
///
/// Writes to these keys invalidate cached stats snapshots (the knowledge
/// coverage aggregates shown by `mati stats` and `mati gaps`) and are guarded
/// by the search crash-fence. The predicate is computed from the namespace
/// list instead of repeating the prefixes, so the two cannot drift apart.
fn is_knowledge_key(key: &str) -> bool {
    KNOWLEDGE_NAMESPACES
        .iter()
        .any(|&namespace| key.starts_with(namespace))
}
71
/// Key namespaces in the `sessions` tree whose values are [`Record`] structs.
///
/// `graph:edge:*` keys are deliberately absent. Their values are raw 8-byte
/// timestamps rather than `Record`s, so they must never be fed to the search
/// index.
const SESSION_NAMESPACES: &[&str] = &[
    "session:",
    "analytics:",
    "hook_event:",
    "compliance:",
];
77
78/// A write operation for a knowledge-tree transaction.
79///
80/// Supports both Record writes (indexed by tantivy) and raw byte writes
81/// (e.g., audit entries) in the same atomic commit.
82pub enum KnowledgeWriteOp<'a> {
83    /// Write a Record (serialized via MessagePack, indexed by tantivy).
84    PutRecord { key: &'a str, record: &'a Record },
85    /// Write raw bytes (not a Record, not indexed by tantivy).
86    PutRaw { key: &'a str, value: &'a [u8] },
87}
88
/// Persistent knowledge store for a single mati project.
///
/// Wraps two SurrealKV trees:
/// - `knowledge` — user-visible records (gotchas, files, decisions, …)
/// - `sessions`  — analytics, hook events, compliance logs
///
/// It also holds a tantivy full-text index that is opened lazily.
///
/// Most public methods are `async`, and callers must be in a `tokio` context.
/// `index_needs_rebuild` and `mark_search_stale` are synchronous.
pub struct Store {
    // NOTE(review): Rust drops struct fields in declaration order; confirm
    // nothing depends on the tree/index teardown order before reordering.
    /// Tree opened from `<root>/knowledge.db` by `Store::open`.
    knowledge: Tree,
    /// Tree opened from `<root>/sessions.db` by `Store::open`.
    sessions: Tree,
    /// Tantivy full-text index — lazily initialized on first use.
    ///
    /// Hook commands (`get`, `log-hit`, `log-miss`, `reparse`) never touch the
    /// search index, so `Store::open` skips the ~30-50ms tantivy init. The
    /// index is created either on the first call to a method that needs it
    /// (`put`, `put_batch`, `delete`, `search`, `search_scored`,
    /// `rebuild_search_index`) or eagerly by `Store::open_and_rebuild`.
    search: OnceCell<Search>,
    /// Absolute path to `~/.mati/<slug>/`
    pub root: PathBuf,
    /// Initialized to `false` by [`Store::open`]. Set by
    /// [`Store::open_and_rebuild`] when the search index had to be rebuilt
    /// because it was corrupt, schema-incompatible, or flagged by a marker
    /// file. Callers should use [`Store::open_and_rebuild`] or
    /// [`Store::index_needs_rebuild`] rather than inspecting this field.
    index_needs_rebuild: bool,
}
113
114impl Store {
115    /// Open (or create) both trees for the project rooted at `repo_root`.
116    ///
117    /// Creates `~/.mati/<slug>/` if it does not exist.
118    ///
119    /// If the search index is corrupt or schema-incompatible, it is wiped and
120    /// replaced with a fresh empty index. [`Store::index_needs_rebuild`] will
121    /// return `true` in that case — call [`Store::rebuild_search_index`] before
122    /// issuing any search queries, or use [`Store::open_and_rebuild`] which
123    /// handles this automatically.
124    pub async fn open(repo_root: &Path) -> Result<Self> {
125        let slug = derive_slug(repo_root);
126        let home = dirs::home_dir().context("cannot determine home directory")?;
127        let root = home.join(".mati").join(&slug);
128        std::fs::create_dir_all(&root)
129            .with_context(|| format!("cannot create mati dir at {}", root.display()))?;
130
131        let knowledge = open_knowledge_tree(root.join("knowledge.db"))
132            .map_err(|e| lock_error_hint(e, &root.join("knowledge.db")))?;
133        let sessions = open_sessions_tree(root.join("sessions.db"))
134            .map_err(|e| lock_error_hint(e, &root.join("sessions.db")))?;
135
136        // Tantivy is NOT initialized here — it is lazily created on first use
137        // via `ensure_search()`. This saves ~30-50ms for hook commands that
138        // only need KV reads/writes (get, log-hit, log-miss, reparse).
139
140        let store = Self {
141            knowledge,
142            sessions,
143            search: OnceCell::new(),
144            root,
145            index_needs_rebuild: false,
146        };
147
148        // Run forward schema migrations atomically. Single-process flock
149        // (SurrealKV's exclusive lock) means no concurrent migrator can
150        // collide here. If the store is already at the current version
151        // this is a single `Store::get` and returns in microseconds.
152        // Migrations refuse to open the store on detected downgrade,
153        // which propagates the error up to the caller via `?`.
154        super::migrations::migrate(&store).await?;
155
156        Ok(store)
157    }
158
159    /// Open the store and rebuild the search index from SurrealKV if needed.
160    ///
161    /// This is the recommended entry point for the CLI and MCP server. It
162    /// combines [`Store::open`] with an automatic [`Store::rebuild_search_index`]
163    /// call when the index was corrupt or missing (C4). Search queries are safe
164    /// to issue immediately on the returned store.
165    ///
166    /// Unlike [`Store::open`], this eagerly initializes tantivy so corruption
167    /// can be detected and recovered from before any queries are issued.
168    pub async fn open_and_rebuild(repo_root: &Path) -> Result<Self> {
169        let mut store = Self::open(repo_root).await?;
170
171        let search_path = store.root.join("search_index");
172        let stale_marker = store.root.join(SEARCH_STALE_MARKER);
173        let has_sync_pending = store.root.join(SEARCH_SYNC_PENDING).exists();
174
175        // Stale marker is written by `mati init` when tantivy indexing was
176        // deferred. SEARCH_SYNC_PENDING means a crash or sync failure interrupted
177        // the KV → tantivy window. In both cases we must wipe the index before
178        // rebuild so removed keys and old versions cannot survive restart.
179        let has_stale_marker = stale_marker.exists();
180        if (has_stale_marker || has_sync_pending) && search_path.exists() {
181            std::fs::remove_dir_all(&search_path).with_context(|| {
182                format!(
183                    "failed to remove stale search index at {}",
184                    search_path.display()
185                )
186            })?;
187        }
188
189        // Eagerly initialize tantivy — detect and recover from corruption.
190        match Search::open(&search_path) {
191            Ok(s) => {
192                let _ = store.search.set(s);
193            }
194            Err(e) => {
195                tracing::warn!(
196                    error = %e,
197                    path  = %search_path.display(),
198                    "search index corrupt or schema-incompatible — wiping and scheduling rebuild"
199                );
200                if search_path.exists() {
201                    std::fs::remove_dir_all(&search_path).with_context(|| {
202                        format!(
203                            "failed to remove corrupt search index at {}",
204                            search_path.display()
205                        )
206                    })?;
207                }
208                let s = Search::open(&search_path)
209                    .context("failed to open fresh search index after clearing corrupt data")?;
210                let _ = store.search.set(s);
211                store.index_needs_rebuild = true;
212            }
213        }
214
215        if has_stale_marker {
216            store.index_needs_rebuild = true;
217        }
218
219        // Detect crash-window desync: KV write committed but the tantivy
220        // commit was interrupted before the fence could be cleared.
221        if has_sync_pending {
222            tracing::warn!("tantivy crash-window desync detected — scheduling rebuild");
223            store.index_needs_rebuild = true;
224        }
225
226        if store.index_needs_rebuild() {
227            store.rebuild_search_index().await?;
228            // Clear the crash-fence if present — a full rebuild is a complete
229            // re-sync from KV, so the index is authoritative again.
230            let _ = std::fs::remove_file(store.root.join(SEARCH_SYNC_PENDING));
231            // Remove stale marker only after a successful rebuild so a
232            // crashed rebuild retries on the next open_and_rebuild call.
233            if has_stale_marker {
234                let _ = std::fs::remove_file(&stale_marker);
235            }
236        }
237        Ok(store)
238    }
239
240    /// True when the search index was corrupt or missing on open.
241    ///
242    /// This flag reflects the state detected at open time and is not reset
243    /// after [`Store::rebuild_search_index`] completes. Use it only to decide
244    /// whether to call `rebuild_search_index` — not as a post-rebuild status.
245    /// [`Store::open_and_rebuild`] handles this automatically.
246    #[must_use]
247    pub fn index_needs_rebuild(&self) -> bool {
248        self.index_needs_rebuild
249    }
250
251    /// Lazily initialize (or return) the tantivy search index.
252    ///
253    /// First call opens the index at `<root>/search_index/`, creating the
254    /// directory and schema if absent. Subsequent calls return the cached
255    /// reference in O(1). If the index is corrupt, the corrupt directory is
256    /// wiped and a fresh index is created.
257    fn ensure_search(&self) -> Result<&Search> {
258        self.search.get_or_try_init(|| {
259            let search_path = self.root.join("search_index");
260            match Search::open(&search_path) {
261                Ok(s) => Ok(s),
262                Err(e) => {
263                    tracing::warn!(
264                        error = %e,
265                        path  = %search_path.display(),
266                        "search index corrupt on lazy init — wiping and creating fresh"
267                    );
268                    if search_path.exists() {
269                        std::fs::remove_dir_all(&search_path).with_context(|| {
270                            format!(
271                                "failed to remove corrupt search index at {}",
272                                search_path.display()
273                            )
274                        })?;
275                    }
276                    Search::open(&search_path)
277                        .context("failed to open fresh search index after clearing corrupt data")
278                }
279            }
280        })
281    }
282
283    /// Rebuild the tantivy search index from scratch by scanning all
284    /// [`Record`]-containing namespaces in SurrealKV (C4).
285    ///
286    /// Must be called on a store whose search index is empty — i.e. immediately
287    /// after [`Store::open`] detected a corrupt/missing index, before any writes.
288    /// Calling on a non-empty index will produce duplicate entries; use the
289    /// deduplication in [`Search::query_keys`] to tolerate this if it occurs.
290    ///
291    /// Returns the total number of records committed to the index.
292    pub async fn rebuild_search_index(&self) -> Result<usize> {
293        let search = self.ensure_search()?;
294
295        // Scan and index one namespace at a time — avoids loading all records
296        // into memory simultaneously. Peak RSS is bounded by the largest single
297        // namespace (typically `file:`) rather than the entire corpus.
298        let mut committed = 0usize;
299
300        for ns in KNOWLEDGE_NAMESPACES.iter().chain(SESSION_NAMESPACES) {
301            let records = self.scan_prefix(ns).await?;
302            if records.is_empty() {
303                continue;
304            }
305            let refs: Vec<&Record> = records.iter().collect();
306            committed += search.add_records(&refs)?;
307        }
308
309        tracing::info!(committed, "search index rebuilt from SurrealKV");
310
311        Ok(committed)
312    }
313
314    // -------------------------------------------------------------------------
315    // Core CRUD
316    // -------------------------------------------------------------------------
317
318    /// Read a record by key. Returns `None` if not found.
319    pub async fn get(&self, key: &str) -> Result<Option<Record>> {
320        let txn = self.tree_for(key).begin_with_mode(Mode::ReadOnly)?;
321        read_record(&txn, key)
322    }
323
324    /// Write a record with the appropriate durability level.
325    ///
326    /// Durability is derived from the key prefix via [`Durability::for_key`].
327    pub async fn put(&self, key: &str, record: &Record) -> Result<()> {
328        let durability = Durability::for_key(key);
329        let tree = self.tree_for(key);
330        let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
331        txn.set_durability(skv_durability(durability));
332
333        let bytes = rmps::to_vec_named(record)
334            .with_context(|| format!("failed to serialize record for key '{key}'"))?;
335        txn.set(key.as_bytes(), bytes)?;
336        txn.commit().await?;
337
338        // Crash-fence: written after KV commit, removed after tantivy commit.
339        // If the process dies between these two points, open_and_rebuild sees
340        // the marker on the next start and triggers a full index rebuild.
341        if is_knowledge_key(key) {
342            let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
343        }
344
345        // Update search index — KV write is primary, search is secondary.
346        // We replace by key rather than append, so tantivy stays aligned with
347        // the latest KV state without waiting for a full rebuild.
348        //
349        // Wrapped in catch_unwind: a tantivy panic (e.g., corrupted segment)
350        // must never crash the server. The KV write already committed above —
351        // the search index will be rebuilt on next startup via the
352        // SEARCH_SYNC_PENDING crash-fence marker.
353        let mut search_synced = false;
354        match self.ensure_search() {
355            Ok(search) => {
356                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
357                    search.add_record(record)
358                })) {
359                    Ok(Ok(())) => {
360                        search_synced = true;
361                    }
362                    Ok(Err(e)) => {
363                        tracing::warn!("search index update failed for '{key}': {e}");
364                    }
365                    Err(_panic) => {
366                        tracing::error!(
367                            "search index panicked during put for '{key}' — \
368                             index will be rebuilt on next startup"
369                        );
370                    }
371                }
372            }
373            Err(e) => {
374                tracing::warn!("search index unavailable during put: {e}");
375            }
376        }
377        if is_knowledge_key(key) {
378            self.bump_write_seq();
379            if search_synced {
380                let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
381            }
382        }
383        Ok(())
384    }
385
386    /// Write multiple records to KV only, skipping the tantivy search index.
387    ///
388    /// Use this during bulk init passes where search indexing would block the
389    /// critical path. Follow with [`Self::rebuild_search_index`] to update tantivy
390    /// from the same in-memory records without a KV round-trip.
391    ///
392    /// Same durability semantics as [`Self::put_batch`]: at most 2 fsyncs.
393    pub async fn put_batch_kv_only(&self, records: &[(&str, &Record)]) -> Result<()> {
394        if records.is_empty() {
395            return Ok(());
396        }
397        let mut immediate: Vec<(&str, &Record)> = Vec::new();
398        let mut eventual: Vec<(&str, &Record)> = Vec::new();
399        for &(key, record) in records {
400            match Durability::for_key(key) {
401                Durability::Immediate => immediate.push((key, record)),
402                Durability::Eventual => eventual.push((key, record)),
403            }
404        }
405        if !immediate.is_empty() {
406            let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
407            txn.set_durability(SkvDurability::Immediate);
408            for (key, record) in &immediate {
409                let bytes = rmps::to_vec_named(record)
410                    .with_context(|| format!("failed to serialize record for key '{key}'"))?;
411                txn.set(key.as_bytes(), bytes)?;
412            }
413            txn.commit().await?;
414        }
415        if !eventual.is_empty() {
416            let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
417            txn.set_durability(SkvDurability::Eventual);
418            for (key, record) in &eventual {
419                let bytes = rmps::to_vec_named(record)
420                    .with_context(|| format!("failed to serialize record for key '{key}'"))?;
421                txn.set(key.as_bytes(), bytes)?;
422            }
423            txn.commit().await?;
424        }
425        if records.iter().any(|(k, _)| is_knowledge_key(k)) {
426            self.bump_write_seq();
427        }
428        Ok(())
429    }
430
431    /// Mark the search index as stale so the next [`Self::open_and_rebuild`]
432    /// call wipes and rebuilds it from KV.
433    ///
434    /// Written by `mati init` after a cold init pass to defer the tantivy
435    /// indexing cost (~400ms on 27k records) to the first MCP server startup.
436    /// Best-effort: a write failure is silently discarded — the worst outcome
437    /// is that the search index contains stale data until the next full rebuild.
438    pub fn mark_search_stale(&self) {
439        let _ = std::fs::write(self.root.join(SEARCH_STALE_MARKER), b"");
440    }
441
442    /// Write multiple records in a single transaction per durability class.
443    ///
444    /// Records are grouped by their key prefix: all `Immediate` keys share one
445    /// transaction on `knowledge` (1 fsync), all `Eventual` keys share one on
446    /// `sessions` (1 fsync). The whole batch costs at most 2 fsyncs regardless
447    /// of how many records it contains — critical for Layer 0 bulk inserts.
448    ///
449    /// Empty slice is a no-op. Mixed-durability batches are handled correctly.
450    pub async fn put_batch(&self, records: &[(&str, &Record)]) -> Result<()> {
451        if records.is_empty() {
452            return Ok(());
453        }
454
455        // Partition by durability class so each tree gets exactly one commit.
456        let mut immediate: Vec<(&str, &Record)> = Vec::new();
457        let mut eventual: Vec<(&str, &Record)> = Vec::new();
458        for &(key, record) in records {
459            match Durability::for_key(key) {
460                Durability::Immediate => immediate.push((key, record)),
461                Durability::Eventual => eventual.push((key, record)),
462            }
463        }
464
465        if !immediate.is_empty() {
466            let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
467            txn.set_durability(SkvDurability::Immediate);
468            for (key, record) in &immediate {
469                let bytes = rmps::to_vec_named(record)
470                    .with_context(|| format!("failed to serialize record for key '{key}'"))?;
471                txn.set(key.as_bytes(), bytes)?;
472            }
473            txn.commit().await?;
474        }
475
476        if !eventual.is_empty() {
477            let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
478            txn.set_durability(SkvDurability::Eventual);
479            for (key, record) in &eventual {
480                let bytes = rmps::to_vec_named(record)
481                    .with_context(|| format!("failed to serialize record for key '{key}'"))?;
482                txn.set(key.as_bytes(), bytes)?;
483            }
484            txn.commit().await?;
485        }
486
487        let has_knowledge = records.iter().any(|(k, _)| is_knowledge_key(k));
488
489        // Crash-fence — same pattern as put().
490        if has_knowledge {
491            let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
492        }
493
494        // Update search index — KV write is primary, search is secondary.
495        // If tantivy fails to initialize, the KV writes still succeeded.
496        // Wrapped in catch_unwind for the same reason as put().
497        let mut search_synced = false;
498        match self.ensure_search() {
499            Ok(search) => {
500                let search_records: Vec<&Record> = records.iter().map(|(_, r)| *r).collect();
501                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
502                    search.add_records(&search_records)
503                })) {
504                    Ok(Ok(_)) => {
505                        search_synced = true;
506                    }
507                    Ok(Err(e)) => {
508                        tracing::warn!("search index update failed in put_batch: {e}");
509                    }
510                    Err(_panic) => {
511                        tracing::error!(
512                            "search index panicked during put_batch — \
513                             index will be rebuilt on next startup"
514                        );
515                    }
516                }
517            }
518            Err(e) => {
519                tracing::warn!("search index unavailable during put_batch: {e}");
520            }
521        }
522        if has_knowledge {
523            self.bump_write_seq();
524            if search_synced {
525                let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
526            }
527        }
528        Ok(())
529    }
530
531    /// Delete a record by key. No-op if the key does not exist.
532    pub async fn delete(&self, key: &str) -> Result<()> {
533        let tree = self.tree_for(key);
534        let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
535        txn.set_durability(skv_durability(Durability::for_key(key)));
536        txn.delete(key.as_bytes())?;
537        txn.commit().await?;
538
539        if is_knowledge_key(key) {
540            let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
541        }
542
543        let mut search_synced = false;
544        match self.ensure_search() {
545            Ok(search) => {
546                search.delete_key(key)?;
547                search_synced = true;
548            }
549            Err(e) => {
550                tracing::warn!("search index unavailable during delete: {e}");
551            }
552        }
553
554        if is_knowledge_key(key) {
555            self.bump_write_seq();
556            if search_synced {
557                let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
558            }
559        }
560        Ok(())
561    }
562
563    /// Return all records whose key starts with `prefix`.
564    ///
565    /// Prefix must use one of the known key namespaces so the correct tree is
566    /// selected. Unknown prefixes are scanned from `knowledge`.
567    ///
568    /// Return order is not guaranteed. Callers that need a stable order must sort.
569    pub async fn scan_prefix(&self, prefix: &str) -> Result<Vec<Record>> {
570        let tree = self.tree_for(prefix);
571        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
572
573        // Range: [prefix, prefix\xff) covers all keys with this prefix
574        let end = prefix_end(prefix);
575        let iter = txn.range(prefix.as_bytes(), end.as_bytes())?;
576
577        let mut records = Vec::new();
578        let mut cursor = iter;
579        while cursor.next()? {
580            let bytes = cursor.value()?;
581            match rmps::from_slice::<Record>(&bytes) {
582                Ok(record) => records.push(record),
583                Err(e) => {
584                    tracing::warn!("skipping malformed record during scan: {e}");
585                }
586            }
587        }
588        Ok(records)
589    }
590
591    /// Scan records whose key starts with `prefix`, invoking `callback` for each.
592    ///
593    /// Same tree routing and prefix semantics as [`Self::scan_prefix`], but records
594    /// are deserialized and passed to `callback` one at a time rather than
595    /// collected into a `Vec`. Callers can begin processing (e.g. printing to
596    /// stdout) before the full scan completes, giving time-to-first-row
597    /// latency proportional to a single deserialization rather than the full
598    /// scan.
599    ///
600    /// Return order is lexicographic (underlying KV order). Callers that need
601    /// a different order must collect and sort after the fact.
602    pub async fn scan_prefix_each<F>(&self, prefix: &str, mut callback: F) -> Result<()>
603    where
604        F: FnMut(Record),
605    {
606        let tree = self.tree_for(prefix);
607        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
608        let end = prefix_end(prefix);
609        let mut cursor = txn.range(prefix.as_bytes(), end.as_bytes())?;
610        while cursor.next()? {
611            let bytes = cursor.value()?;
612            match rmps::from_slice::<Record>(&bytes) {
613                Ok(record) => callback(record),
614                Err(e) => {
615                    tracing::warn!("skipping malformed record during scan: {e}");
616                }
617            }
618        }
619        Ok(())
620    }
621
622    /// Full-text BM25 search over all indexed records.
623    ///
624    /// Calls tantivy for the top `limit` matching keys, then fetches each full
625    /// record from SurrealKV. Keys that tantivy returns but are not found in
626    /// the store (e.g. deleted since last commit) are silently skipped.
627    ///
628    /// Returns results ordered by descending BM25 relevance score. Returns an
629    /// empty `Vec` when `text` is blank or `limit` is 0.
630    pub async fn search(&self, text: &str, limit: usize) -> Result<Vec<Record>> {
631        let search = self.ensure_search()?;
632        let keys = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
633            search.query_keys(text, limit)
634        })) {
635            Ok(result) => result?,
636            Err(_panic) => {
637                tracing::error!("search index panicked during query — returning empty results");
638                return Ok(vec![]);
639            }
640        };
641        let mut records = Vec::with_capacity(keys.len());
642        for key in &keys {
643            if let Some(record) = self.get(key).await? {
644                records.push(record);
645            }
646        }
647        Ok(records)
648    }
649
650    /// Full-text BM25 search returning `(score, Record)` pairs.
651    ///
652    /// Same semantics as [`Self::search`] but preserves the raw BM25
653    /// relevance score from tantivy. Used by `mem_query` text mode to
654    /// include relevance in the agent-facing response.
655    pub async fn search_scored(&self, text: &str, limit: usize) -> Result<Vec<(f32, Record)>> {
656        let search = self.ensure_search()?;
657        let scored_keys = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
658            search.query_keys_scored(text, limit)
659        })) {
660            Ok(result) => result?,
661            Err(_panic) => {
662                tracing::error!(
663                    "search index panicked during scored query — returning empty results"
664                );
665                return Ok(vec![]);
666            }
667        };
668        let mut results = Vec::with_capacity(scored_keys.len());
669        for (score, key) in &scored_keys {
670            if let Some(record) = self.get(key).await? {
671                results.push((*score, record));
672            }
673        }
674        Ok(results)
675    }
676
677    /// Read raw bytes by key. Returns `None` if the key does not exist.
678    ///
679    /// Counterpart to [`Self::put_raw`]. Used for structural metadata,
680    /// enforcement events, and other non-Record values stored as raw bytes.
681    pub async fn get_raw_bytes(&self, key: &str) -> Result<Option<Vec<u8>>> {
682        let tree = self.tree_for(key);
683        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
684        match txn.get(key.as_bytes())? {
685            None => Ok(None),
686            Some(bytes) => Ok(Some(bytes.to_vec())),
687        }
688    }
689
690    /// Write raw bytes under `key` with automatically routed durability.
691    ///
692    /// Same durability routing as [`Self::put`] — callers do not need to know
693    /// which tree a key belongs to. Use this for structural metadata (graph
694    /// edges, etc.) where the value is not a [`Record`] and does not need to
695    /// be deserialised on reads.
696    pub async fn put_raw(&self, key: &str, value: &[u8]) -> Result<()> {
697        let durability = Durability::for_key(key);
698        let tree = self.tree_for(key);
699        let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
700        txn.set_durability(skv_durability(durability));
701        txn.set(key.as_bytes(), value.to_vec())?;
702        txn.commit().await?;
703        Ok(())
704    }
705
706    /// Write multiple raw-byte values in a single transaction per durability class.
707    ///
708    /// Same batch semantics as [`Self::put_batch`] (at most 2 fsyncs for the
709    /// whole batch). Use for bulk structural writes like graph edge inserts.
710    pub async fn put_batch_raw(&self, records: &[(&str, &[u8])]) -> Result<()> {
711        if records.is_empty() {
712            return Ok(());
713        }
714
715        let mut immediate: Vec<(&str, &[u8])> = Vec::new();
716        let mut eventual: Vec<(&str, &[u8])> = Vec::new();
717        for &(key, value) in records {
718            match Durability::for_key(key) {
719                Durability::Immediate => immediate.push((key, value)),
720                Durability::Eventual => eventual.push((key, value)),
721            }
722        }
723
724        if !immediate.is_empty() {
725            let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
726            txn.set_durability(SkvDurability::Immediate);
727            for (key, value) in &immediate {
728                txn.set(key.as_bytes(), value.to_vec())?;
729            }
730            txn.commit().await?;
731        }
732
733        if !eventual.is_empty() {
734            let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
735            txn.set_durability(SkvDurability::Eventual);
736            for (key, value) in &eventual {
737                txn.set(key.as_bytes(), value.to_vec())?;
738            }
739            txn.commit().await?;
740        }
741
742        Ok(())
743    }
744
745    // -------------------------------------------------------------------------
746    // Transactional batch writes (mutation + audit atomic commit)
747    //
748    // SurrealKV supports multi-key atomic transactions within a single tree.
749    // The real constraint is mati's two-tree architecture: no single
750    // transaction can span both the knowledge and sessions trees.
751    // -------------------------------------------------------------------------
752
    /// Atomically commit multiple writes to the knowledge tree in a single
    /// transaction.
    ///
    /// Supports mixed Record + raw byte writes. All keys MUST route to the
    /// knowledge tree (`Durability::Immediate`). Returns an error if any key
    /// routes to sessions; that check runs for every op before the
    /// transaction is opened.
    ///
    /// Handles crash-fence + tantivy sync + write-seq after commit:
    /// - if any op's key passes `is_knowledge_key`, the `SEARCH_SYNC_PENDING`
    ///   marker is written (best-effort) and the write-seq counter is bumped;
    /// - every `PutRecord` record is added to the search index; tantivy errors
    ///   and panics are logged rather than returned, because the KV commit has
    ///   already succeeded at that point;
    /// - the marker is removed only when that search sync succeeded.
    ///
    /// Use this for mutation + audit atomic commit on knowledge-side commands.
    ///
    /// # Errors
    /// Fails if a key routes to the sessions tree, a record cannot be
    /// serialized, or the SurrealKV transaction cannot be opened, staged, or
    /// committed.
    ///
    /// NOTE(review): the marker is left in place (presumably forcing a search
    /// rebuild on next startup — confirm) when `ensure_search()` fails, and
    /// also whenever the only knowledge-key ops are `PutRaw`, since
    /// `search_synced` can only become `true` via `add_records`. The marker is
    /// also written only *after* the KV commit, so a crash between commit and
    /// marker write leaves no fence behind.
    pub async fn transact_knowledge(&self, ops: &[KnowledgeWriteOp<'_>]) -> Result<()> {
        if ops.is_empty() {
            return Ok(());
        }
        // Validate routing for every op up front so a single misrouted key
        // aborts the whole batch before anything is staged.
        for op in ops {
            let k = match op {
                KnowledgeWriteOp::PutRecord { key, .. } => *key,
                KnowledgeWriteOp::PutRaw { key, .. } => *key,
            };
            if Durability::for_key(k) != Durability::Immediate {
                anyhow::bail!(
                    "transact_knowledge: key '{k}' routes to sessions tree, not knowledge"
                );
            }
        }

        // Records staged below are remembered so they can be fed to tantivy
        // once the KV commit has succeeded (raw values are never indexed here).
        let mut record_refs: Vec<&Record> = Vec::new();

        let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
        txn.set_durability(SkvDurability::Immediate);
        for op in ops {
            match op {
                KnowledgeWriteOp::PutRecord { key, record } => {
                    // Records are stored as named-field MessagePack.
                    let bytes = rmps::to_vec_named(record)
                        .with_context(|| format!("failed to serialize record for key '{key}'"))?;
                    txn.set(key.as_bytes(), bytes)?;
                    record_refs.push(record);
                }
                KnowledgeWriteOp::PutRaw { key, value } => {
                    txn.set(key.as_bytes(), value.to_vec())?;
                }
            }
        }
        // One commit for every staged op.
        txn.commit().await?;

        // Crash-fence + tantivy sync (same pattern as put/put_batch).
        let has_knowledge = ops.iter().any(|op| {
            let k = match op {
                KnowledgeWriteOp::PutRecord { key, .. } => key,
                KnowledgeWriteOp::PutRaw { key, .. } => key,
            };
            is_knowledge_key(k)
        });
        if has_knowledge {
            // Best-effort: a failed marker write is ignored.
            let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
        }
        let mut search_synced = false;
        if !record_refs.is_empty() {
            if let Ok(search) = self.ensure_search() {
                // A tantivy panic must not unwind through the caller: the KV
                // data is already committed, so panics are caught and logged.
                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                    search.add_records(&record_refs)
                })) {
                    Ok(Ok(_)) => search_synced = true,
                    Ok(Err(e)) => tracing::warn!("transact_knowledge: tantivy sync failed: {e}"),
                    Err(_) => tracing::error!("transact_knowledge: tantivy panicked"),
                }
            }
        }
        if has_knowledge {
            self.bump_write_seq();
            // Clear the fence only once tantivy is known to match the KV state.
            if search_synced {
                let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
            }
        }
        Ok(())
    }
829
830    /// Atomically commit multiple raw byte writes to the sessions tree in a
831    /// single transaction.
832    ///
833    /// All keys MUST route to the sessions tree (`Durability::Eventual`).
834    /// Returns an error if any key routes to knowledge.
835    ///
836    /// Use this for mutation + audit atomic commit on session-side commands.
837    pub async fn transact_sessions_raw(&self, entries: &[(&str, &[u8])]) -> Result<()> {
838        if entries.is_empty() {
839            return Ok(());
840        }
841        for (k, _) in entries {
842            if Durability::for_key(k) != Durability::Eventual {
843                anyhow::bail!(
844                    "transact_sessions_raw: key '{k}' routes to knowledge tree, not sessions"
845                );
846            }
847        }
848
849        let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
850        txn.set_durability(SkvDurability::Eventual);
851        for (key, value) in entries {
852            txn.set(key.as_bytes(), value.to_vec())?;
853        }
854        txn.commit().await?;
855        Ok(())
856    }
857
858    /// Return all keys whose prefix matches, without deserialising values.
859    ///
860    /// Cheaper than [`Self::scan_prefix`] when only the key is needed (e.g.
861    /// graph edge loading, existence checks). Uses the SurrealKV iterator
862    /// `key().user_key()` path so value bytes are never read from disk.
863    pub async fn scan_keys(&self, prefix: &str) -> Result<Vec<String>> {
864        let tree = self.tree_for(prefix);
865        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
866        let end = prefix_end(prefix);
867        let mut cursor = txn.range(prefix.as_bytes(), end.as_bytes())?;
868
869        let mut keys = Vec::new();
870        while cursor.next()? {
871            let user_key = cursor.key().user_key();
872            match std::str::from_utf8(user_key) {
873                Ok(s) => keys.push(s.to_string()),
874                Err(e) => tracing::warn!("skipping non-UTF8 key in scan_keys: {e}"),
875            }
876        }
877        Ok(keys)
878    }
879
880    // -------------------------------------------------------------------------
881    // History (M-14)
882    // -------------------------------------------------------------------------
883
884    /// Return version history for a single key, newest first.
885    ///
886    /// Includes tombstones (deletions). Uses the tight upper bound `key + \0`
887    /// so adjacent keys never spill into the result set.
888    ///
889    /// `limit` caps the number of entries returned; `0` means unlimited.
890    pub fn history(&self, key: &str, limit: usize) -> Result<Vec<HistoryEntry>> {
891        anyhow::ensure!(!key.is_empty(), "history key must not be empty");
892        let tree = self.tree_for(key);
893        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
894
895        let mut opts = HistoryOptions::new().with_tombstones(true);
896        if limit > 0 {
897            opts = opts.with_limit(limit);
898        }
899
900        history_impl(&txn, key, &opts)
901    }
902
903    /// Return version history for a single key since `since_ts` (seconds),
904    /// newest first.
905    ///
906    /// Timestamps are converted to nanoseconds for the SurrealKV range filter.
907    pub fn history_since(
908        &self,
909        key: &str,
910        since_ts: u64,
911        limit: usize,
912    ) -> Result<Vec<HistoryEntry>> {
913        anyhow::ensure!(!key.is_empty(), "history key must not be empty");
914        let tree = self.tree_for(key);
915        let txn = tree.begin_with_mode(Mode::ReadOnly)?;
916
917        let since_ns = since_ts.saturating_mul(1_000_000_000);
918        let mut opts = HistoryOptions::new()
919            .with_tombstones(true)
920            .with_ts_range(since_ns, u64::MAX);
921        if limit > 0 {
922            opts = opts.with_limit(limit);
923        }
924
925        history_impl(&txn, key, &opts)
926    }
927
928    /// Return all records updated since `since_ts` (seconds), newest first.
929    ///
930    /// Scans every knowledge namespace (including `dep:`) and returns records
931    /// whose `updated_at >= since_ts`. Results are sorted by `updated_at`
932    /// descending with secondary sort by key for deterministic ordering.
933    pub async fn records_since(&self, since_ts: u64, limit: usize) -> Result<Vec<Record>> {
934        let mut results = Vec::new();
935        for ns in KNOWLEDGE_NAMESPACES {
936            let records = self.scan_prefix(ns).await?;
937            for r in records {
938                if r.updated_at >= since_ts {
939                    results.push(r);
940                }
941            }
942        }
943        // Newest first, secondary sort by key for determinism
944        results.sort_by(|a, b| {
945            b.updated_at
946                .cmp(&a.updated_at)
947                .then_with(|| a.key.cmp(&b.key))
948        });
949        if limit > 0 && results.len() > limit {
950            results.truncate(limit);
951        }
952        Ok(results)
953    }
954
955    // -------------------------------------------------------------------------
956    // Lifecycle
957    // -------------------------------------------------------------------------
958
959    /// Flush and close both trees, releasing the LOCK files.
960    ///
961    /// Must be called before dropping `Store` if another process (or test) will
962    /// reopen the same database directory. SurrealKV holds an exclusive lock
963    /// for the lifetime of a `Tree`; reopening without closing first fails with
964    /// "already locked by another process".
965    pub async fn close(self) -> Result<()> {
966        tokio::try_join!(self.knowledge.close(), self.sessions.close())?;
967        // Only close search if it was initialized during this session.
968        if let Some(search) = self.search.into_inner() {
969            search.close()?;
970        }
971        Ok(())
972    }
973
974    /// Best-effort durability flush for shutdown paths.
975    ///
976    /// Calls SurrealKV's `flush_wal(sync=true)` on both trees so every
977    /// previously-committed transaction reaches disk. Non-consuming and
978    /// `&self` — works through a shared `Arc<RwLock<Graph>>` read lock on
979    /// the daemon shutdown path where ownership cannot be reclaimed.
980    ///
981    /// Necessary because SurrealKV's `Tree::Drop` only fire-and-forget-spawns
982    /// `core.close()` onto the current tokio runtime; if the runtime is
983    /// shutting down (signal handler, main return) that spawned task may not
984    /// run before the process exits, losing buffered "Eventual" writes.
985    ///
986    /// Errors are logged via `tracing::warn!` and not propagated — shutdown
987    /// paths must be infallible. Search index pending writes are committed
988    /// per `Search::add_record`/`add_records` call, so no separate flush
989    /// is needed here.
990    pub async fn flush_for_shutdown(&self) {
991        if let Err(e) = self.knowledge.flush_wal(true) {
992            tracing::warn!("flush_for_shutdown: knowledge tree flush failed: {e}");
993        }
994        if let Err(e) = self.sessions.flush_wal(true) {
995            tracing::warn!("flush_for_shutdown: sessions tree flush failed: {e}");
996        }
997    }
998
999    // -------------------------------------------------------------------------
1000    // Health / ping
1001    // -------------------------------------------------------------------------
1002
1003    /// Ping the store. Writes a sentinel key and reads it back; returns
1004    /// round-trip latency in microseconds.
1005    ///
1006    /// Used by `mati ping` and by hook fast-path availability checks.
1007    pub async fn ping(&self) -> Result<u64> {
1008        let start = now_micros();
1009
1010        let sentinel_key = "analytics:ping_probe";
1011        let ts = start.to_string();
1012        let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
1013        txn.set_durability(SkvDurability::Eventual);
1014        txn.set(sentinel_key.as_bytes(), ts.as_bytes())?;
1015        txn.commit().await?;
1016
1017        let txn = self.sessions.begin_with_mode(Mode::ReadOnly)?;
1018        let result = txn.get(sentinel_key.as_bytes())?;
1019        anyhow::ensure!(
1020            result.is_some(),
1021            "ping sentinel write was not visible on read-back"
1022        );
1023
1024        Ok(now_micros() - start)
1025    }
1026
1027    // -------------------------------------------------------------------------
1028    // Write-seq cache invalidation
1029    // -------------------------------------------------------------------------
1030
1031    /// Path to the monotonic counter file: `~/.mati/<slug>/health_write_seq`.
1032    fn write_seq_path(&self) -> PathBuf {
1033        self.root.join("health_write_seq")
1034    }
1035
1036    /// Read the current knowledge write-sequence counter.
1037    ///
1038    /// Returns `0` if the file does not exist or cannot be parsed — callers
1039    /// treat `0` as "no valid cached snapshot" and recompute.
1040    pub fn read_write_seq(&self) -> u64 {
1041        std::fs::read_to_string(self.write_seq_path())
1042            .ok()
1043            .and_then(|s| s.trim().parse().ok())
1044            .unwrap_or(0)
1045    }
1046
1047    /// Increment the write-seq counter. Called after every knowledge-key write.
1048    ///
1049    /// Best-effort: file write errors are silently discarded — a failed bump
1050    /// causes the next stats call to recompute, which is correct behaviour.
1051    fn bump_write_seq(&self) {
1052        let next = self.read_write_seq().wrapping_add(1);
1053        let _ = std::fs::write(self.write_seq_path(), next.to_string());
1054    }
1055
1056    // -------------------------------------------------------------------------
1057    // Internals
1058    // -------------------------------------------------------------------------
1059
1060    /// Choose the correct tree based on the key's durability class.
1061    fn tree_for(&self, key: &str) -> &Tree {
1062        match Durability::for_key(key) {
1063            Durability::Eventual => &self.sessions,
1064            Durability::Immediate => &self.knowledge,
1065        }
1066    }
1067
    /// Direct access to the sessions tree for audit reads in tests.
    ///
    /// Production code should use the key-routing methods (`get`, `put_raw`,
    /// `scan_keys`) rather than accessing trees directly.
    ///
    /// NOTE(review): despite the "tests" wording, this is a plain `pub` method
    /// that is not gated behind `#[cfg(test)]`, so production code can call it.
    pub fn sessions_tree(&self) -> &Tree {
        &self.sessions
    }
1075}
1076
1077// ---------------------------------------------------------------------------
1078// Tree construction helpers
1079// ---------------------------------------------------------------------------
1080
1081/// If a store open fails and the LOCK file exists, another mati process (MCP
1082/// server or daemon) holds the exclusive SurrealKV lock. Replace the raw OS
1083/// error with an actionable message.
1084/// Improve SurrealKV open errors with actionable context.
1085///
1086/// SurrealKV's LOCK file always exists after first use — it is never deleted.
1087/// The OS-level flock is what prevents concurrent access, not the file's
1088/// existence. So we detect lock contention by checking the *error message*,
1089/// not by checking if the LOCK file exists.
1090fn lock_error_hint(err: anyhow::Error, db_path: &std::path::Path) -> anyhow::Error {
1091    let msg = format!("{err}");
1092    if msg.contains("already locked") || msg.contains("WouldBlock") {
1093        // Real lock contention — another process holds the flock.
1094        // Read the PID from the LOCK file if available.
1095        let lock_file = db_path.join("LOCK");
1096        let pid_hint = std::fs::read_to_string(&lock_file)
1097            .ok()
1098            .and_then(|s| s.trim().parse::<u32>().ok())
1099            .map(|pid| format!(" (holder PID: {pid})"))
1100            .unwrap_or_default();
1101        anyhow::anyhow!(
1102            "cannot open {} — another mati process holds the lock{pid_hint}.\n\
1103             This is usually the MCP server (mati serve) or a background daemon.\n\
1104             To stop the daemon: `mati daemon stop`\n\
1105             To check: `lsof {}/LOCK`",
1106            db_path.display(),
1107            db_path.display()
1108        )
1109    } else {
1110        err
1111    }
1112}
1113
1114fn open_knowledge_tree(path: PathBuf) -> Result<Tree> {
1115    // vlog_value_threshold must be 0 when versioning is enabled — SurrealKV
1116    // requires all values to be in the VLog for time-travel to work.
1117    let opts = Options::new()
1118        .with_path(path)
1119        .with_versioning(true, 0) // indefinite retention
1120        .with_enable_vlog(true)
1121        .with_vlog_value_threshold(0)
1122        .with_vlog_checksum_verification(VLogChecksumLevel::Full);
1123    TreeBuilder::with_options(opts)
1124        .build()
1125        .context("failed to open knowledge.db")
1126}
1127
1128fn open_sessions_tree(path: PathBuf) -> Result<Tree> {
1129    // Same constraint: vlog_value_threshold = 0 required when versioning is on.
1130    // VLogChecksumLevel is intentionally omitted — session writes are high-frequency
1131    // and acceptable to lose on crash. Do not add checksum verification here.
1132    let opts = Options::new()
1133        .with_path(path)
1134        .with_versioning(true, SESSIONS_RETENTION_NS)
1135        .with_enable_vlog(true)
1136        .with_vlog_value_threshold(0);
1137    TreeBuilder::with_options(opts)
1138        .build()
1139        .context("failed to open sessions.db")
1140}
1141
1142// ---------------------------------------------------------------------------
1143// Slug derivation
1144// ---------------------------------------------------------------------------
1145
1146/// Derive a project slug from the repo root.
1147///
1148/// Algorithm (matches ARCHITECTURE.md §22):
1149/// 1. Try to read the first `fetch` remote URL from `.git/config`.
1150/// 2. Fall back to SHA-256 of the canonicalized repo root path.
1151///
1152/// Returns the first 8 hex characters of the SHA-256 digest.
1153pub fn derive_slug(repo_root: &Path) -> String {
1154    // Canonicalize first so `/tmp/...` and `/private/tmp/...` (macOS symlink),
1155    // trailing-slash variants, and `./foo` vs `foo` all resolve to the same
1156    // path before hashing. Without this every CLI invocation that resolves
1157    // paths differently (`current_dir()` vs `canonicalize(arg)`) used to
1158    // produce a different slug, scattering records across sibling
1159    // `~/.mati/<slug>/` directories.
1160    let canon = std::fs::canonicalize(repo_root).unwrap_or_else(|_| repo_root.to_path_buf());
1161
1162    // Walk up to find the nearest `.git/config` so an invocation from a
1163    // subdirectory of the repo produces the same slug as one from the root.
1164    // Pre-fix, `mati show` from `src/store/` and `mati serve` from the repo
1165    // root produced different slugs whenever the subdir lacked a `.git/`.
1166    let git_root = walk_up_for_git_root(&canon).unwrap_or_else(|| canon.clone());
1167
1168    let input =
1169        read_remote_url(&git_root).unwrap_or_else(|| git_root.to_string_lossy().into_owned());
1170
1171    let digest = Sha256::digest(input.as_bytes());
1172    hex::encode(&digest[..4]) // 4 bytes = 8 hex chars
1173}
1174
/// Nearest of `start` and its ancestors that contains a `.git/config` file.
///
/// Returns `None` if the walk runs past the filesystem root without finding
/// one.
fn walk_up_for_git_root(start: &Path) -> Option<PathBuf> {
    start
        .ancestors()
        .find(|dir| dir.join(".git").join("config").is_file())
        .map(Path::to_path_buf)
}
1187
/// Value of the first `url =` line in `<repo_root>/.git/config`, trimmed.
///
/// Returns `None` when the config file cannot be read or has no such line.
/// Only the exact `url =` spelling (after leading whitespace) is recognised.
fn read_remote_url(repo_root: &Path) -> Option<String> {
    let config_path = repo_root.join(".git").join("config");
    let config = std::fs::read_to_string(config_path).ok()?;
    let url_line = config
        .lines()
        .find(|line| line.trim_start().starts_with("url ="))?;
    let url = url_line
        .split_once('=')
        .map_or_else(String::new, |(_, rhs)| rhs.trim().to_owned());
    Some(url)
}
1200
1201// ---------------------------------------------------------------------------
1202// Misc helpers
1203// ---------------------------------------------------------------------------
1204
1205/// Read and deserialize a record from an active transaction.
1206fn read_record(txn: &Transaction, key: &str) -> Result<Option<Record>> {
1207    match txn.get(key.as_bytes())? {
1208        None => Ok(None),
1209        Some(bytes) => {
1210            let record = rmps::from_slice::<Record>(&bytes)
1211                .with_context(|| format!("corrupt record at key '{key}'"))?;
1212            Ok(Some(record))
1213        }
1214    }
1215}
1216
1217/// Map mati's `Durability` enum to SurrealKV's `Durability`.
1218fn skv_durability(d: Durability) -> SkvDurability {
1219    match d {
1220        Durability::Immediate => SkvDurability::Immediate,
1221        Durability::Eventual => SkvDurability::Eventual,
1222    }
1223}
1224
/// Return the smallest string that is lexicographically greater than every
/// key starting with `prefix`, for use as the exclusive upper bound of a
/// prefix range scan.
///
/// This works on characters, not bytes. UTF-8 byte order matches code-point
/// order, so replacing the last character with its successor (and dropping
/// nothing else) gives a tight bound that is always valid UTF-8.
///
/// Incrementing the last *byte* instead produces invalid UTF-8 whenever that
/// byte is `0xBF` (e.g. `"x¿"`). A fixed fallback such as `"\u{ffff}"` is then
/// either far too wide (admitting unrelated keys) or even below the prefix
/// (for supplementary-plane characters).
///
/// Trailing `char::MAX` characters have no successor and are dropped, carrying
/// into the previous character. For an empty prefix, or one made only of
/// `char::MAX`, the result is `char::MAX` as a string. That sorts above every
/// key that does not itself start with `char::MAX`.
fn prefix_end(prefix: &str) -> String {
    for (idx, ch) in prefix.char_indices().rev() {
        let next = match ch {
            char::MAX => continue,
            // D800..=DFFF are surrogates and hold no `char`s; skip over them.
            '\u{D7FF}' => '\u{E000}',
            _ => match char::from_u32(u32::from(ch) + 1) {
                Some(next) => next,
                None => continue,
            },
        };
        let mut end = String::with_capacity(idx + next.len_utf8());
        end.push_str(&prefix[..idx]);
        end.push(next);
        return end;
    }
    char::MAX.to_string()
}
1241
/// A single versioned entry from the SurrealKV history iterator.
///
/// Timestamps come from SurrealKV's internal clock (nanoseconds since epoch).
/// Both seconds and nanoseconds are exposed for callers that need either
/// precision level.
// Field order is also the serialized field order (serde derive), so keep it
// stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryEntry {
    /// Timestamp in whole seconds (nanosecond timestamp / 1_000_000_000,
    /// truncated by integer division).
    pub timestamp_secs: u64,
    /// Raw nanosecond timestamp from SurrealKV.
    pub timestamp_ns: u64,
    /// Deserialized record. `None` for tombstones, for versions whose value
    /// could not be read, and for values that fail to decode as a [`Record`].
    pub record: Option<Record>,
    /// `true` when this version represents a deletion.
    pub is_tombstone: bool,
}
1258
1259/// Shared synchronous implementation for key history queries.
1260///
1261/// Iterates all versions of `key` using `history_with_options` with the tight
1262/// upper bound `key + \0` (not `prefix_end`) to guarantee no adjacent key
1263/// spills. Returns entries sorted newest first.
1264fn history_impl(txn: &Transaction, key: &str, opts: &HistoryOptions) -> Result<Vec<HistoryEntry>> {
1265    // Upper bound: key + NUL byte — tighter than prefix_end which increments
1266    // the last byte. This ensures only exact-key versions are returned.
1267    let mut upper = key.as_bytes().to_vec();
1268    upper.push(0x00);
1269
1270    let mut cursor = txn.history_with_options(key.as_bytes(), upper.as_slice(), opts)?;
1271
1272    let mut entries = Vec::new();
1273    while cursor.next()? {
1274        let key_ref = cursor.key();
1275
1276        // Guard: only process entries whose user_key matches exactly
1277        if key_ref.user_key() != key.as_bytes() {
1278            continue;
1279        }
1280
1281        let is_tombstone = key_ref.is_tombstone();
1282        let ts_ns = key_ref.timestamp();
1283        let ts_secs = ts_ns / 1_000_000_000;
1284
1285        let record = if is_tombstone {
1286            None
1287        } else {
1288            match cursor.value() {
1289                Ok(bytes) => rmps::from_slice::<Record>(&bytes).ok(),
1290                Err(_) => None,
1291            }
1292        };
1293
1294        entries.push(HistoryEntry {
1295            timestamp_secs: ts_secs,
1296            timestamp_ns: ts_ns,
1297            record,
1298            is_tombstone,
1299        });
1300    }
1301
1302    // Newest first — SurrealKV history iterator order is not guaranteed to be
1303    // reverse-chronological, so sort explicitly.
1304    entries.sort_by_key(|e| std::cmp::Reverse(e.timestamp_ns));
1305    Ok(entries)
1306}
1307
/// Wall-clock microseconds since the UNIX epoch, or `0` if the system clock
/// reads earlier than the epoch.
fn now_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // Truncating u128 -> u64 cast; overflow is ~584,000 years away.
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
1315
1316// ---------------------------------------------------------------------------
1317// Tests
1318// ---------------------------------------------------------------------------
1319
1320#[cfg(test)]
1321mod tests {
1322    use super::*;
1323    use tempfile::TempDir;
1324
1325    // Helper: open a store backed by a temp directory (no real git repo needed)
1326    fn temp_store() -> (Store, TempDir) {
1327        let dir = TempDir::new().unwrap();
1328        // Override slug derivation by constructing store path manually
1329        let root = dir.path().join("mati_test");
1330        std::fs::create_dir_all(&root).unwrap();
1331        let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1332        let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1333        let search = OnceCell::new();
1334        let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1335        let store = Store {
1336            knowledge,
1337            sessions,
1338            search,
1339            root: root.clone(),
1340            index_needs_rebuild: false,
1341        };
1342        (store, dir)
1343    }
1344
1345    #[tokio::test]
1346    async fn ping_roundtrip() {
1347        let (store, _dir) = temp_store();
1348        let latency = store.ping().await.unwrap();
1349        assert!(latency < 5_000_000, "ping took >5s: {latency}µs");
1350    }
1351
1352    #[tokio::test]
1353    async fn put_get_roundtrip() {
1354        use crate::store::record::{
1355            Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1356            RecordSource, RecordVersion, StalenessScore,
1357        };
1358        use uuid::Uuid;
1359
1360        let (store, _dir) = temp_store();
1361
1362        let device_id = Uuid::new_v4();
1363        let record = Record {
1364            key: "gotcha:test-key".to_string(),
1365            value: "test value".to_string(),
1366            category: Category::Gotcha,
1367            priority: Priority::High,
1368            tags: vec!["test".to_string()],
1369            created_at: 0,
1370            updated_at: 0,
1371            ref_url: None,
1372            staleness: StalenessScore::fresh(),
1373            lifecycle: RecordLifecycle::Active,
1374            version: RecordVersion {
1375                device_id,
1376                logical_clock: 1,
1377                wall_clock: 0,
1378            },
1379            quality: QualityScore::layer0_default(),
1380            access_count: 0,
1381            last_accessed: 0,
1382            source: RecordSource::StaticAnalysis,
1383            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1384            gap_analysis_score: 0.0,
1385            payload: None,
1386        };
1387
1388        store.put("gotcha:test-key", &record).await.unwrap();
1389        let got = store.get("gotcha:test-key").await.unwrap();
1390        assert!(got.is_some());
1391        assert_eq!(got.unwrap().key, "gotcha:test-key");
1392    }
1393
1394    #[tokio::test]
1395    async fn put_delete_get_returns_none() {
1396        use crate::store::record::{
1397            Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1398            RecordSource, RecordVersion, StalenessScore,
1399        };
1400        use uuid::Uuid;
1401
1402        let (store, _dir) = temp_store();
1403
1404        let device_id = Uuid::new_v4();
1405        let record = Record {
1406            key: "file:src/main.rs".to_string(),
1407            value: "entry point".to_string(),
1408            category: Category::File,
1409            priority: Priority::Normal,
1410            tags: vec![],
1411            created_at: 0,
1412            updated_at: 0,
1413            ref_url: None,
1414            staleness: StalenessScore::fresh(),
1415            lifecycle: RecordLifecycle::Active,
1416            version: RecordVersion {
1417                device_id,
1418                logical_clock: 1,
1419                wall_clock: 0,
1420            },
1421            quality: QualityScore::layer0_default(),
1422            access_count: 0,
1423            last_accessed: 0,
1424            source: RecordSource::StaticAnalysis,
1425            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1426            gap_analysis_score: 0.0,
1427            payload: None,
1428        };
1429
1430        store.put("file:src/main.rs", &record).await.unwrap();
1431        store.delete("file:src/main.rs").await.unwrap();
1432        let got = store.get("file:src/main.rs").await.unwrap();
1433        assert!(got.is_none());
1434    }
1435
1436    #[tokio::test]
1437    async fn scan_prefix_returns_matching_keys() {
1438        use crate::store::record::{
1439            Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1440            RecordSource, RecordVersion, StalenessScore,
1441        };
1442        use uuid::Uuid;
1443
1444        let (store, _dir) = temp_store();
1445        let device_id = Uuid::new_v4();
1446
1447        let make_record = |key: &str| Record {
1448            key: key.to_string(),
1449            value: "v".to_string(),
1450            category: Category::Gotcha,
1451            priority: Priority::Normal,
1452            tags: vec![],
1453            created_at: 0,
1454            updated_at: 0,
1455            ref_url: None,
1456            staleness: StalenessScore::fresh(),
1457            lifecycle: RecordLifecycle::Active,
1458            version: RecordVersion {
1459                device_id,
1460                logical_clock: 1,
1461                wall_clock: 0,
1462            },
1463            quality: QualityScore::layer0_default(),
1464            access_count: 0,
1465            last_accessed: 0,
1466            source: RecordSource::StaticAnalysis,
1467            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1468            gap_analysis_score: 0.0,
1469            payload: None,
1470        };
1471
1472        store
1473            .put("gotcha:alpha", &make_record("gotcha:alpha"))
1474            .await
1475            .unwrap();
1476        store
1477            .put("gotcha:beta", &make_record("gotcha:beta"))
1478            .await
1479            .unwrap();
1480        store
1481            .put("gotcha:gamma", &make_record("gotcha:gamma"))
1482            .await
1483            .unwrap();
1484        store
1485            .put("file:src/main.rs", &make_record("file:src/main.rs"))
1486            .await
1487            .unwrap();
1488
1489        let results = store.scan_prefix("gotcha:").await.unwrap();
1490        assert_eq!(results.len(), 3);
1491    }
1492
1493    #[tokio::test]
1494    async fn write_100_records_survive_reopen() {
1495        use crate::store::record::{
1496            Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1497            RecordSource, RecordVersion, StalenessScore,
1498        };
1499        use uuid::Uuid;
1500
1501        let dir = TempDir::new().unwrap();
1502        let root = dir.path().join("mati_test");
1503        std::fs::create_dir_all(&root).unwrap();
1504        let device_id = Uuid::new_v4();
1505
1506        let make_record = |i: usize| {
1507            let key = format!("gotcha:item-{i:03}");
1508            Record {
1509                key: key.clone(),
1510                value: format!("value {i}"),
1511                category: Category::Gotcha,
1512                priority: Priority::Normal,
1513                tags: vec![],
1514                created_at: i as u64,
1515                updated_at: i as u64,
1516                ref_url: None,
1517                staleness: StalenessScore::fresh(),
1518                lifecycle: RecordLifecycle::Active,
1519                version: RecordVersion {
1520                    device_id,
1521                    logical_clock: i as u64,
1522                    wall_clock: i as u64,
1523                },
1524                quality: QualityScore::layer0_default(),
1525                access_count: 0,
1526                last_accessed: 0,
1527                source: RecordSource::StaticAnalysis,
1528                confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1529                gap_analysis_score: 0.0,
1530                payload: None,
1531            }
1532        };
1533
1534        // Write 100 records, then explicitly close to release LOCK.
1535        {
1536            let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1537            let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1538            let search = OnceCell::new();
1539            let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1540            let store = Store {
1541                knowledge,
1542                sessions,
1543                search,
1544                root: root.clone(),
1545                index_needs_rebuild: false,
1546            };
1547            for i in 0..100 {
1548                let r = make_record(i);
1549                store.put(&r.key, &r).await.unwrap();
1550            }
1551            store.close().await.unwrap();
1552        }
1553
1554        // Reopen and verify all 100 are present.
1555        {
1556            let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1557            let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1558            let search = OnceCell::new();
1559            let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1560            let store = Store {
1561                knowledge,
1562                sessions,
1563                search,
1564                root: root.clone(),
1565                index_needs_rebuild: false,
1566            };
1567            let results = store.scan_prefix("gotcha:").await.unwrap();
1568            assert_eq!(
1569                results.len(),
1570                100,
1571                "expected 100 records after reopen, got {}",
1572                results.len()
1573            );
1574            store.close().await.unwrap();
1575        }
1576    }
1577
1578    #[test]
1579    fn slug_is_8_hex_chars() {
1580        let slug = derive_slug(Path::new("/some/repo"));
1581        assert_eq!(slug.len(), 8);
1582        assert!(slug.chars().all(|c| c.is_ascii_hexdigit()));
1583    }
1584
1585    #[test]
1586    fn slug_is_deterministic() {
1587        let a = derive_slug(Path::new("/some/repo"));
1588        let b = derive_slug(Path::new("/some/repo"));
1589        assert_eq!(a, b);
1590    }
1591
1592    #[test]
1593    fn prefix_end_increments_last_byte() {
1594        // ':' is ASCII 58; incrementing gives ';' (59) → "gotcha;"
1595        assert_eq!(prefix_end("gotcha:"), "gotcha;");
1596        // All 0xff bytes — falls back to sentinel
1597        let all_ff = String::from_utf8(vec![0xff, 0xff]).unwrap_or_default();
1598        let end = prefix_end(&all_ff);
1599        assert_eq!(end, "\u{ffff}");
1600    }
1601
1602    // ─── Shared helper ────────────────────────────────────────────────────────
1603
1604    fn make_record(key: &str) -> Record {
1605        use crate::store::record::{
1606            Category, ConfidenceScore, Priority, QualityScore, RecordLifecycle, RecordSource,
1607            RecordVersion, StalenessScore,
1608        };
1609        Record {
1610            key: key.to_string(),
1611            value: format!("value for {key}"),
1612            category: Category::Gotcha,
1613            priority: Priority::Normal,
1614            tags: vec![],
1615            created_at: 0,
1616            updated_at: 0,
1617            ref_url: None,
1618            staleness: StalenessScore::fresh(),
1619            lifecycle: RecordLifecycle::Active,
1620            version: RecordVersion {
1621                device_id: uuid::Uuid::new_v4(),
1622                logical_clock: 1,
1623                wall_clock: 0,
1624            },
1625            quality: QualityScore::layer0_default(),
1626            access_count: 0,
1627            last_accessed: 0,
1628            source: RecordSource::StaticAnalysis,
1629            confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1630            gap_analysis_score: 0.0,
1631            payload: None,
1632        }
1633    }
1634
1635    // ─── get / put / delete ────────────────────────────────────────────────────
1636
1637    #[tokio::test]
1638    async fn get_never_written_key_returns_none() {
1639        let (store, _dir) = temp_store();
1640        assert!(store.get("gotcha:does-not-exist").await.unwrap().is_none());
1641    }
1642
1643    #[tokio::test]
1644    async fn put_twice_second_value_wins() {
1645        let (store, _dir) = temp_store();
1646        let mut r = make_record("gotcha:overwrite-me");
1647        store.put("gotcha:overwrite-me", &r).await.unwrap();
1648        r.value = "updated value".to_string();
1649        r.version.logical_clock = 2;
1650        store.put("gotcha:overwrite-me", &r).await.unwrap();
1651        let got = store.get("gotcha:overwrite-me").await.unwrap().unwrap();
1652        assert_eq!(got.value, "updated value", "second write must win");
1653        assert_eq!(got.version.logical_clock, 2);
1654    }
1655
1656    #[tokio::test]
1657    async fn delete_nonexistent_key_is_noop() {
1658        let (store, _dir) = temp_store();
1659        store.delete("gotcha:never-existed").await.unwrap();
1660        assert!(store.get("gotcha:never-existed").await.unwrap().is_none());
1661    }
1662
1663    #[tokio::test]
1664    async fn delete_does_not_remove_sibling_keys() {
1665        let (store, _dir) = temp_store();
1666        store
1667            .put("gotcha:keep", &make_record("gotcha:keep"))
1668            .await
1669            .unwrap();
1670        store
1671            .put("gotcha:remove", &make_record("gotcha:remove"))
1672            .await
1673            .unwrap();
1674        store.delete("gotcha:remove").await.unwrap();
1675        assert!(
1676            store.get("gotcha:keep").await.unwrap().is_some(),
1677            "sibling must survive"
1678        );
1679        assert!(store.get("gotcha:remove").await.unwrap().is_none());
1680    }
1681
1682    // ─── scan_prefix isolation ─────────────────────────────────────────────────
1683
1684    #[tokio::test]
1685    async fn scan_prefix_empty_result() {
1686        let (store, _dir) = temp_store();
1687        assert!(store.scan_prefix("gotcha:").await.unwrap().is_empty());
1688    }
1689
1690    #[tokio::test]
1691    async fn scan_prefix_does_not_spill_across_namespaces() {
1692        let (store, _dir) = temp_store();
1693        store
1694            .put("gotcha:alpha", &make_record("gotcha:alpha"))
1695            .await
1696            .unwrap();
1697        store
1698            .put("file:src/main.rs", &make_record("file:src/main.rs"))
1699            .await
1700            .unwrap();
1701        store
1702            .put("decision:arch", &make_record("decision:arch"))
1703            .await
1704            .unwrap();
1705
1706        let gotcha = store.scan_prefix("gotcha:").await.unwrap();
1707        assert_eq!(gotcha.len(), 1);
1708        assert_eq!(gotcha[0].key, "gotcha:alpha");
1709
1710        let file = store.scan_prefix("file:").await.unwrap();
1711        assert_eq!(file.len(), 1);
1712        assert_eq!(file[0].key, "file:src/main.rs");
1713
1714        let decision = store.scan_prefix("decision:").await.unwrap();
1715        assert_eq!(decision.len(), 1);
1716        assert_eq!(decision[0].key, "decision:arch");
1717    }
1718
1719    #[tokio::test]
1720    async fn scan_prefix_values_match_stored_values() {
1721        let (store, _dir) = temp_store();
1722        for key in ["gotcha:alpha", "gotcha:beta", "gotcha:gamma"] {
1723            let mut r = make_record(key);
1724            r.value = format!("sentinel:{key}");
1725            store.put(key, &r).await.unwrap();
1726        }
1727        let mut results = store.scan_prefix("gotcha:").await.unwrap();
1728        results.sort_by(|a, b| a.key.cmp(&b.key));
1729        assert_eq!(results.len(), 3);
1730        for r in &results {
1731            assert_eq!(
1732                r.value,
1733                format!("sentinel:{}", r.key),
1734                "value mismatch for key '{}'",
1735                r.key
1736            );
1737        }
1738    }
1739
1740    #[tokio::test]
1741    async fn scan_prefix_excludes_adjacent_namespaces() {
1742        // prefix_end("gotcha:") == "gotcha;" — "decision:" and "file:" fall outside.
1743        let (store, _dir) = temp_store();
1744        store
1745            .put("gotcha:real", &make_record("gotcha:real"))
1746            .await
1747            .unwrap();
1748        store
1749            .put("decision:before", &make_record("decision:before"))
1750            .await
1751            .unwrap();
1752        store
1753            .put("file:after", &make_record("file:after"))
1754            .await
1755            .unwrap();
1756
1757        let results = store.scan_prefix("gotcha:").await.unwrap();
1758        assert_eq!(results.len(), 1, "only gotcha: keys should appear");
1759        assert_eq!(results[0].key, "gotcha:real");
1760    }
1761
1762    // ─── cross-tree isolation ──────────────────────────────────────────────────
1763
1764    #[tokio::test]
1765    async fn knowledge_and_session_trees_are_isolated() {
1766        let (store, _dir) = temp_store();
1767        store
1768            .put("gotcha:in-knowledge", &make_record("gotcha:in-knowledge"))
1769            .await
1770            .unwrap();
1771        store
1772            .put("session:12345", &make_record("session:12345"))
1773            .await
1774            .unwrap();
1775
1776        let gotcha_results = store.scan_prefix("gotcha:").await.unwrap();
1777        let session_results = store.scan_prefix("session:").await.unwrap();
1778
1779        assert_eq!(gotcha_results.len(), 1);
1780        assert_eq!(gotcha_results[0].key, "gotcha:in-knowledge");
1781        assert_eq!(session_results.len(), 1);
1782        assert_eq!(session_results[0].key, "session:12345");
1783        assert!(
1784            gotcha_results
1785                .iter()
1786                .all(|r| !r.key.starts_with("session:")),
1787            "session records must not appear in gotcha: scan"
1788        );
1789        assert!(
1790            session_results
1791                .iter()
1792                .all(|r| !r.key.starts_with("gotcha:")),
1793            "gotcha records must not appear in session: scan"
1794        );
1795    }
1796
1797    // ─── corrupt record tolerance ──────────────────────────────────────────────
1798
1799    #[tokio::test]
1800    async fn scan_prefix_skips_corrupt_records_and_returns_valid_ones() {
1801        let (store, _dir) = temp_store();
1802        store
1803            .put("gotcha:good", &make_record("gotcha:good"))
1804            .await
1805            .unwrap();
1806
1807        // Inject garbage bytes directly — simulates disk corruption or schema mismatch.
1808        {
1809            let mut txn = store.knowledge.begin().unwrap();
1810            txn.set_durability(SkvDurability::Immediate);
1811            txn.set(b"gotcha:corrupted", b"not valid json {{{").unwrap();
1812            txn.commit().await.unwrap();
1813        }
1814
1815        let results = store.scan_prefix("gotcha:").await.unwrap();
1816        assert_eq!(results.len(), 1, "corrupt record must be silently skipped");
1817        assert_eq!(results[0].key, "gotcha:good");
1818    }
1819
1820    #[tokio::test]
1821    async fn scan_prefix_all_corrupt_returns_empty_not_panic() {
1822        let (store, _dir) = temp_store();
1823        {
1824            let mut txn = store.knowledge.begin().unwrap();
1825            txn.set_durability(SkvDurability::Immediate);
1826            txn.set(b"gotcha:bad1", b"null").unwrap();
1827            txn.set(b"gotcha:bad2", b"{\"x\":1}").unwrap(); // valid JSON, wrong shape
1828            txn.commit().await.unwrap();
1829        }
1830        let results = store.scan_prefix("gotcha:").await.unwrap();
1831        assert_eq!(
1832            results.len(),
1833            0,
1834            "all corrupt — must return empty, not panic"
1835        );
1836    }
1837
1838    // ─── ping ──────────────────────────────────────────────────────────────────
1839
1840    #[tokio::test]
1841    async fn ping_multiple_calls_all_succeed() {
1842        let (store, _dir) = temp_store();
1843        for i in 0..10 {
1844            let latency = store
1845                .ping()
1846                .await
1847                .unwrap_or_else(|e| panic!("ping #{i} failed: {e}"));
1848            assert!(latency < 5_000_000, "ping #{i} took >5 s: {latency} µs");
1849        }
1850    }
1851
1852    // ─── slug derivation ───────────────────────────────────────────────────────
1853
1854    #[test]
1855    fn slug_differs_for_different_paths() {
1856        let a = derive_slug(Path::new("/repo/project-alpha"));
1857        let b = derive_slug(Path::new("/repo/project-beta"));
1858        assert_ne!(a, b, "distinct paths must produce distinct slugs");
1859    }
1860
1861    #[test]
1862    fn slug_uses_remote_url_not_local_path() {
1863        // Verify the slug is actually derived from the URL, not the filesystem path.
1864        // We know the algorithm: first 8 hex chars of SHA-256(url).
1865        let url = "https://github.com/example/mati.git";
1866        let expected_slug = {
1867            let digest = Sha256::digest(url.as_bytes());
1868            hex::encode(&digest[..4])
1869        };
1870
1871        let dir = tempfile::TempDir::new().unwrap();
1872        let git_dir = dir.path().join(".git");
1873        std::fs::create_dir_all(&git_dir).unwrap();
1874        std::fs::write(
1875            git_dir.join("config"),
1876            format!("[remote \"origin\"]\n\turl = {url}\n"),
1877        )
1878        .unwrap();
1879
1880        let actual_slug = derive_slug(dir.path());
1881        assert_eq!(
1882            actual_slug, expected_slug,
1883            "slug must equal SHA-256(remote URL)[0..4] hex"
1884        );
1885
1886        // Also verify the path-derived slug for the same dir would differ
1887        // (i.e., the URL was actually preferred over the path).
1888        let path_slug = {
1889            let input = dir.path().to_string_lossy().into_owned();
1890            let digest = Sha256::digest(input.as_bytes());
1891            hex::encode(&digest[..4])
1892        };
1893        assert_ne!(
1894            actual_slug, path_slug,
1895            "URL slug must differ from the path slug for the same directory"
1896        );
1897    }
1898
1899    #[test]
1900    fn slug_is_stable_for_identical_remote_urls() {
1901        let make_repo = |url: &str| {
1902            let dir = tempfile::TempDir::new().unwrap();
1903            let git_dir = dir.path().join(".git");
1904            std::fs::create_dir_all(&git_dir).unwrap();
1905            std::fs::write(
1906                git_dir.join("config"),
1907                format!("[remote \"origin\"]\n\turl = {url}\n"),
1908            )
1909            .unwrap();
1910            (derive_slug(dir.path()), dir)
1911        };
1912        let (slug_a, _dir_a) = make_repo("https://github.com/example/same-repo.git");
1913        let (slug_b, _dir_b) = make_repo("https://github.com/example/same-repo.git");
1914        assert_eq!(
1915            slug_a, slug_b,
1916            "same remote URL must always produce the same slug"
1917        );
1918    }
1919
1920    #[test]
1921    fn slug_differs_for_different_remote_urls() {
1922        let make_repo = |url: &str| {
1923            let dir = tempfile::TempDir::new().unwrap();
1924            let git_dir = dir.path().join(".git");
1925            std::fs::create_dir_all(&git_dir).unwrap();
1926            std::fs::write(
1927                git_dir.join("config"),
1928                format!("[remote \"origin\"]\n\turl = {url}\n"),
1929            )
1930            .unwrap();
1931            (derive_slug(dir.path()), dir)
1932        };
1933        let (slug_a, _dir_a) = make_repo("https://github.com/org/repo-alpha.git");
1934        let (slug_b, _dir_b) = make_repo("https://github.com/org/repo-beta.git");
1935        assert_ne!(
1936            slug_a, slug_b,
1937            "different remote URLs must produce different slugs"
1938        );
1939    }
1940
1941    // ─── prefix_end edge cases ─────────────────────────────────────────────────
1942
1943    #[test]
1944    fn prefix_end_empty_prefix_returns_sentinel() {
1945        // No bytes to increment → sentinel covers the whole keyspace.
1946        assert_eq!(prefix_end(""), "\u{ffff}");
1947    }
1948
1949    #[test]
1950    fn prefix_end_single_ascii_char() {
1951        assert_eq!(prefix_end("a"), "b"); // 0x61 → 0x62
1952        assert_eq!(prefix_end("z"), "{"); // 0x7a → 0x7b
1953    }
1954
1955    #[test]
1956    fn prefix_end_known_namespace_boundaries() {
1957        // ':' (0x3a) + 1 = ';' (0x3b) for every namespace prefix.
1958        assert_eq!(prefix_end("gotcha:"), "gotcha;");
1959        assert_eq!(prefix_end("file:"), "file;");
1960        assert_eq!(prefix_end("decision:"), "decision;");
1961        assert_eq!(prefix_end("session:"), "session;");
1962    }
1963
1964    // ─── delete + scan interaction ─────────────────────────────────────────────
1965
1966    #[tokio::test]
1967    async fn delete_then_scan_excludes_deleted_key() {
1968        // Phantom-record regression: delete a key, then scan — must not return it.
1969        let (store, _dir) = temp_store();
1970        for key in ["gotcha:a", "gotcha:b", "gotcha:c", "gotcha:d"] {
1971            store.put(key, &make_record(key)).await.unwrap();
1972        }
1973        store.delete("gotcha:b").await.unwrap();
1974        store.delete("gotcha:d").await.unwrap();
1975
1976        let results = store.scan_prefix("gotcha:").await.unwrap();
1977        assert_eq!(results.len(), 2, "deleted keys must not appear in scan");
1978        let keys: Vec<_> = results.iter().map(|r| r.key.as_str()).collect();
1979        assert!(keys.contains(&"gotcha:a"), "gotcha:a must survive");
1980        assert!(keys.contains(&"gotcha:c"), "gotcha:c must survive");
1981        assert!(!keys.contains(&"gotcha:b"), "gotcha:b must be gone");
1982        assert!(!keys.contains(&"gotcha:d"), "gotcha:d must be gone");
1983    }
1984
1985    // ─── overwrite + scan deduplication ──────────────────────────────────────
1986
1987    #[tokio::test]
1988    async fn overwrite_does_not_create_duplicate_in_scan() {
1989        // If SurrealKV MVCC versioning misbehaves, an overwrite could produce
1990        // two versions both visible under the same key during a range scan.
1991        let (store, _dir) = temp_store();
1992        let mut r = make_record("gotcha:dedup-me");
1993        store.put("gotcha:dedup-me", &r).await.unwrap();
1994        r.value = "v2".to_string();
1995        r.version.logical_clock = 2;
1996        store.put("gotcha:dedup-me", &r).await.unwrap();
1997        r.value = "v3".to_string();
1998        r.version.logical_clock = 3;
1999        store.put("gotcha:dedup-me", &r).await.unwrap();
2000
2001        let results = store.scan_prefix("gotcha:").await.unwrap();
2002        assert_eq!(
2003            results.len(),
2004            1,
2005            "3 overwrites of the same key must yield 1 result in scan"
2006        );
2007        assert_eq!(results[0].value, "v3", "scan must return the latest value");
2008        assert_eq!(results[0].version.logical_clock, 3);
2009    }
2010
2011    // ─── full field integrity through the store ────────────────────────────────
2012
2013    #[tokio::test]
2014    async fn put_get_preserves_all_record_fields() {
2015        use crate::store::record::{
2016            Category, ConfidenceScore, Priority, QualityScore, QualitySignal, QualityTier, Record,
2017            RecordLifecycle, RecordSource, RecordVersion, StalenessScore, StalenessSignal,
2018            StalenessTier,
2019        };
2020
2021        let (store, _dir) = temp_store();
2022        let device_id = uuid::Uuid::new_v4();
2023
2024        // Construct a fully-populated record — every non-default field set.
2025        let written = Record {
2026            key: "gotcha:full-fields".to_string(),
2027            value: "Never hold a write txn across an await point.".to_string(),
2028            category: Category::Gotcha,
2029            priority: Priority::Critical,
2030            tags: vec![
2031                "async".to_string(),
2032                "tokio".to_string(),
2033                "surrealkv".to_string(),
2034            ],
2035            created_at: 1_710_520_800,
2036            updated_at: 1_710_520_900,
2037            ref_url: Some("https://github.com/example/issue/99".to_string()),
2038            staleness: StalenessScore {
2039                value: 0.42,
2040                tier: StalenessTier::Stale,
2041                signals: vec![
2042                    StalenessSignal::NotAccessedDays(45),
2043                    StalenessSignal::LinesChangedPct(0.3),
2044                ],
2045                computed_at: 1_710_520_800,
2046                last_record_sha: "abc123def456".to_string(),
2047            },
2048            lifecycle: RecordLifecycle::Active,
2049            version: RecordVersion {
2050                device_id,
2051                logical_clock: 7,
2052                wall_clock: 1_710_520_900,
2053            },
2054            quality: QualityScore {
2055                value: 0.78,
2056                tier: QualityTier::Good,
2057                signals: vec![
2058                    QualitySignal::HasImperativeVerb,
2059                    QualitySignal::HasCausality,
2060                ],
2061                computed_at: 1_710_520_800,
2062            },
2063            access_count: 12,
2064            last_accessed: 1_710_520_888,
2065            source: RecordSource::DeveloperManual,
2066            confidence: ConfidenceScore {
2067                value: 0.75,
2068                confirmation_count: 3,
2069                contributor_count: 2,
2070                last_challenged: Some(1_710_500_000),
2071                challenge_count: 1,
2072            },
2073            gap_analysis_score: 0.31,
2074            payload: None,
2075        };
2076
2077        store.put("gotcha:full-fields", &written).await.unwrap();
2078        let read = store.get("gotcha:full-fields").await.unwrap().unwrap();
2079
2080        // Verify every field survives the store round-trip.
2081        assert_eq!(read.key, written.key);
2082        assert_eq!(read.value, written.value);
2083        assert_eq!(read.category, written.category);
2084        assert_eq!(read.priority, written.priority);
2085        assert_eq!(read.tags, written.tags);
2086        assert_eq!(read.created_at, written.created_at);
2087        assert_eq!(read.updated_at, written.updated_at);
2088        assert_eq!(read.ref_url, written.ref_url);
2089        assert_eq!(read.staleness.tier, written.staleness.tier);
2090        assert_eq!(
2091            read.staleness.last_record_sha,
2092            written.staleness.last_record_sha
2093        );
2094        assert_eq!(read.staleness.signals.len(), 2);
2095        assert_eq!(read.lifecycle, written.lifecycle);
2096        assert_eq!(read.version.device_id, written.version.device_id);
2097        assert_eq!(read.version.logical_clock, written.version.logical_clock);
2098        assert_eq!(read.version.wall_clock, written.version.wall_clock);
2099        assert_eq!(read.quality.tier, written.quality.tier);
2100        assert_eq!(read.quality.signals.len(), 2);
2101        assert_eq!(read.access_count, written.access_count);
2102        assert_eq!(read.last_accessed, written.last_accessed);
2103        assert_eq!(read.source, written.source);
2104        assert_eq!(
2105            read.confidence.confirmation_count,
2106            written.confidence.confirmation_count
2107        );
2108        assert_eq!(
2109            read.confidence.contributor_count,
2110            written.confidence.contributor_count
2111        );
2112        assert_eq!(
2113            read.confidence.last_challenged,
2114            written.confidence.last_challenged
2115        );
2116        assert_eq!(
2117            read.confidence.challenge_count,
2118            written.confidence.challenge_count
2119        );
2120        assert!((read.gap_analysis_score - written.gap_analysis_score).abs() < f32::EPSILON);
2121    }
2122
2123    // ─── Eventual durability persistence ──────────────────────────────────────
2124
2125    #[tokio::test]
2126    async fn eventual_keys_survive_clean_close_and_reopen() {
2127        // The existing reopen test only exercises Immediate (gotcha:) keys.
2128        // This verifies that session: (Eventual) data also persists after close().
2129        let dir = tempfile::TempDir::new().unwrap();
2130        let root = dir.path().join("mati_test");
2131        std::fs::create_dir_all(&root).unwrap();
2132
2133        {
2134            let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2135            let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2136            let search = OnceCell::new();
2137            let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2138            let store = Store {
2139                knowledge,
2140                sessions,
2141                search,
2142                root: root.clone(),
2143                index_needs_rebuild: false,
2144            };
2145            for i in 0..10 {
2146                let key = format!("session:{i:04}");
2147                store.put(&key, &make_record(&key)).await.unwrap();
2148            }
2149            store.close().await.unwrap(); // must fsync/flush sessions tree on clean close
2150        }
2151
2152        {
2153            let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2154            let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2155            let search = OnceCell::new();
2156            let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2157            let store = Store {
2158                knowledge,
2159                sessions,
2160                search,
2161                root: root.clone(),
2162                index_needs_rebuild: false,
2163            };
2164            let results = store.scan_prefix("session:").await.unwrap();
2165            assert_eq!(
2166                results.len(),
2167                10,
2168                "Eventual session records must survive a clean close+reopen"
2169            );
2170            store.close().await.unwrap();
2171        }
2172    }
2173
2174    // ─── corruption tolerance: corrupt record in the middle ───────────────────
2175
2176    #[tokio::test]
2177    async fn scan_prefix_corrupt_in_middle_does_not_stop_iteration() {
2178        // Regression: if scan stops early on corruption rather than skipping,
2179        // valid records after the corrupt one would silently vanish.
2180        let (store, _dir) = temp_store();
2181
2182        store
2183            .put("gotcha:aaa", &make_record("gotcha:aaa"))
2184            .await
2185            .unwrap(); // before corrupt
2186        store
2187            .put("gotcha:zzz", &make_record("gotcha:zzz"))
2188            .await
2189            .unwrap(); // after corrupt
2190
2191        // Inject corruption lexicographically between the two valid records.
2192        {
2193            let mut txn = store.knowledge.begin().unwrap();
2194            txn.set_durability(SkvDurability::Immediate);
2195            txn.set(b"gotcha:mmm", b"not json").unwrap();
2196            txn.commit().await.unwrap();
2197        }
2198
2199        let results = store.scan_prefix("gotcha:").await.unwrap();
2200        assert_eq!(
2201            results.len(),
2202            2,
2203            "corruption in the middle must not truncate the scan"
2204        );
2205        let keys: Vec<_> = results.iter().map(|r| r.key.as_str()).collect();
2206        assert!(
2207            keys.contains(&"gotcha:aaa"),
2208            "record before corruption must be returned"
2209        );
2210        assert!(
2211            keys.contains(&"gotcha:zzz"),
2212            "record after corruption must be returned"
2213        );
2214    }
2215
2216    // ─── tombstoned lifecycle through the store ───────────────────────────────
2217
2218    #[tokio::test]
2219    async fn tombstoned_record_survives_store_round_trip() {
2220        use crate::store::record::{RecordLifecycle, TombstoneReason};
2221        let (store, _dir) = temp_store();
2222        let mut r = make_record("file:src/deleted.rs");
2223        r.lifecycle = RecordLifecycle::Tombstoned {
2224            reason: TombstoneReason::FileDeleted,
2225            at: 1_710_520_800,
2226        };
2227        store.put("file:src/deleted.rs", &r).await.unwrap();
2228        let got = store.get("file:src/deleted.rs").await.unwrap().unwrap();
2229        match got.lifecycle {
2230            RecordLifecycle::Tombstoned { reason, at } => {
2231                assert_eq!(reason, TombstoneReason::FileDeleted);
2232                assert_eq!(at, 1_710_520_800);
2233            }
2234            other => panic!("expected Tombstoned, got {other:?}"),
2235        }
2236    }
2237
2238    #[tokio::test]
2239    async fn superseded_record_survives_store_round_trip() {
2240        use crate::store::record::RecordLifecycle;
2241        let (store, _dir) = temp_store();
2242        let mut r = make_record("gotcha:old-rule");
2243        r.lifecycle = RecordLifecycle::Superseded {
2244            by_key: "gotcha:new-rule".to_string(),
2245        };
2246        store.put("gotcha:old-rule", &r).await.unwrap();
2247        let got = store.get("gotcha:old-rule").await.unwrap().unwrap();
2248        match got.lifecycle {
2249            RecordLifecycle::Superseded { by_key } => {
2250                assert_eq!(by_key, "gotcha:new-rule");
2251            }
2252            other => panic!("expected Superseded, got {other:?}"),
2253        }
2254    }
2255
2256    // ─── slug: error-recovery path ────────────────────────────────────────────
2257
2258    #[test]
2259    fn slug_with_git_config_but_no_url_line_falls_back_to_path() {
2260        let dir = tempfile::TempDir::new().unwrap();
2261        let git_dir = dir.path().join(".git");
2262        std::fs::create_dir_all(&git_dir).unwrap();
2263        // Valid .git/config, but no `url =` line — read_remote_url returns None.
2264        std::fs::write(
2265            git_dir.join("config"),
2266            "[core]\n\trepositoryformatversion = 0\n\tfilemode = true\n",
2267        )
2268        .unwrap();
2269
2270        let slug = derive_slug(dir.path());
2271        // derive_slug canonicalizes first (resolves /var → /private/var on macOS)
2272        // before falling back to the path hash, so the expected input must match.
2273        let expected = {
2274            let canon = std::fs::canonicalize(dir.path()).unwrap();
2275            let input = canon.to_string_lossy().into_owned();
2276            let digest = Sha256::digest(input.as_bytes());
2277            hex::encode(&digest[..4])
2278        };
2279        assert_eq!(slug, expected, "no url= line must fall back to path hash");
2280    }
2281
2282    #[test]
2283    fn slug_with_no_git_dir_falls_back_to_path() {
2284        let dir = tempfile::TempDir::new().unwrap();
2285        // Completely fresh dir, no .git at all.
2286        let slug = derive_slug(dir.path());
2287        let expected = {
2288            let canon = std::fs::canonicalize(dir.path()).unwrap();
2289            let input = canon.to_string_lossy().into_owned();
2290            let digest = Sha256::digest(input.as_bytes());
2291            hex::encode(&digest[..4])
2292        };
2293        assert_eq!(slug, expected);
2294    }
2295
2296    // ─── prefix_end: invalid UTF-8 after increment ────────────────────────────
2297
2298    #[test]
2299    fn prefix_end_0x7f_byte_increments_to_0x80_which_is_invalid_utf8() {
2300        // 0x7f (DEL) + 1 = 0x80, which is an invalid lone UTF-8 byte.
2301        // from_utf8 will fail → must return the sentinel "\u{ffff}", not panic.
2302        let input = String::from_utf8(vec![0x61, 0x7f]).unwrap(); // "a\x7f"
2303        let result = prefix_end(&input);
2304        // 0x7f increments to 0x80 — invalid UTF-8 → sentinel fallback
2305        assert_eq!(
2306            result, "\u{ffff}",
2307            "increment of 0x7f produces invalid UTF-8; must fall back to sentinel"
2308        );
2309    }
2310
2311    #[test]
2312    fn prefix_end_0xfe_byte_increments_to_0xff_still_invalid_utf8() {
2313        // Similarly, 0xfe → 0xff is also invalid UTF-8.
2314        let input = unsafe { String::from_utf8_unchecked(vec![0x61, 0xfe]) };
2315        let result = prefix_end(&input);
2316        assert_eq!(result, "\u{ffff}");
2317    }
2318
2319    // ─── put_batch ────────────────────────────────────────────────────────────
2320
2321    #[tokio::test]
2322    async fn put_batch_empty_is_noop() {
2323        let (store, _dir) = temp_store();
2324        store.put_batch(&[]).await.unwrap();
2325        assert!(store.scan_prefix("gotcha:").await.unwrap().is_empty());
2326    }
2327
2328    #[tokio::test]
2329    async fn put_batch_single_record_readable() {
2330        let (store, _dir) = temp_store();
2331        let r = make_record("gotcha:batch-single");
2332        store
2333            .put_batch(&[("gotcha:batch-single", &r)])
2334            .await
2335            .unwrap();
2336        let got = store.get("gotcha:batch-single").await.unwrap().unwrap();
2337        assert_eq!(got.key, "gotcha:batch-single");
2338        assert_eq!(got.value, r.value);
2339    }
2340
2341    #[tokio::test]
2342    async fn put_batch_all_records_readable() {
2343        let (store, _dir) = temp_store();
2344        let records: Vec<Record> = (0..10)
2345            .map(|i| make_record(&format!("gotcha:b{i}")))
2346            .collect();
2347        let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2348        store.put_batch(&pairs).await.unwrap();
2349        let results = store.scan_prefix("gotcha:b").await.unwrap();
2350        assert_eq!(results.len(), 10);
2351    }
2352
2353    #[tokio::test]
2354    async fn put_batch_mixed_durability_both_trees_written() {
2355        let (store, _dir) = temp_store();
2356        let immediate = make_record("gotcha:imm");
2357        let eventual = make_record("session:evt");
2358        store
2359            .put_batch(&[("gotcha:imm", &immediate), ("session:evt", &eventual)])
2360            .await
2361            .unwrap();
2362        assert!(store.get("gotcha:imm").await.unwrap().is_some());
2363        assert!(store.get("session:evt").await.unwrap().is_some());
2364    }
2365
2366    #[tokio::test]
2367    async fn put_batch_matches_sequential_put_for_same_records() {
2368        let (store_a, _dir_a) = temp_store();
2369        let (store_b, _dir_b) = temp_store();
2370        let records: Vec<Record> = (0..20)
2371            .map(|i| make_record(&format!("file:src/mod{i}.rs")))
2372            .collect();
2373
2374        // Sequential puts.
2375        for r in &records {
2376            store_a.put(&r.key, r).await.unwrap();
2377        }
2378        // Batch put.
2379        let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2380        store_b.put_batch(&pairs).await.unwrap();
2381
2382        let a = {
2383            let mut v = store_a.scan_prefix("file:").await.unwrap();
2384            v.sort_by(|x, y| x.key.cmp(&y.key));
2385            v
2386        };
2387        let b = {
2388            let mut v = store_b.scan_prefix("file:").await.unwrap();
2389            v.sort_by(|x, y| x.key.cmp(&y.key));
2390            v
2391        };
2392        assert_eq!(a.len(), b.len());
2393        for (ra, rb) in a.iter().zip(b.iter()) {
2394            assert_eq!(ra.key, rb.key);
2395            assert_eq!(ra.value, rb.value);
2396        }
2397    }
2398
2399    /// 1,200-record batch must be measurably faster than 1,200 sequential puts.
2400    /// This test guards against the batch accidentally falling back to N fsyncs.
2401    ///
2402    /// Ignored by default (~60s). Run with: `cargo test --lib put_batch_1200 -- --ignored`
2403    #[tokio::test]
2404    #[ignore]
2405    async fn put_batch_1200_faster_than_sequential() {
2406        use std::time::Instant;
2407
2408        let (store_seq, _dir_seq) = temp_store();
2409        let (store_bat, _dir_bat) = temp_store();
2410        let records: Vec<Record> = (0..1200)
2411            .map(|i| make_record(&format!("file:src/f{i}.rs")))
2412            .collect();
2413
2414        // Sequential baseline.
2415        let seq_start = Instant::now();
2416        for r in &records {
2417            store_seq.put(&r.key, r).await.unwrap();
2418        }
2419        let seq_ms = seq_start.elapsed().as_millis();
2420
2421        // Batch.
2422        let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2423        let bat_start = Instant::now();
2424        store_bat.put_batch(&pairs).await.unwrap();
2425        let bat_ms = bat_start.elapsed().as_millis();
2426
2427        // Batch must be strictly faster (at least 2× in any environment).
2428        assert!(
2429            bat_ms < seq_ms,
2430            "put_batch ({bat_ms}ms) was not faster than sequential puts ({seq_ms}ms)"
2431        );
2432
2433        // Verify all records landed correctly.
2434        let results = store_bat.scan_prefix("file:").await.unwrap();
2435        assert_eq!(results.len(), 1200);
2436    }
2437
2438    // ─── search (M-05-C / M-05-F) ─────────────────────────────────────────────
2439
2440    #[tokio::test]
2441    async fn search_returns_matching_records() {
2442        let (store, _dir) = temp_store();
2443        let mut r = make_record("gotcha:async-race");
2444        r.value = "never use inference inside async context".to_string();
2445        store.put(&r.key, &r).await.unwrap();
2446
2447        let results = store.search("inference", 10).await.unwrap();
2448        assert_eq!(results.len(), 1);
2449        assert_eq!(results[0].key, "gotcha:async-race");
2450    }
2451
2452    #[tokio::test]
2453    async fn search_empty_and_whitespace_query_returns_empty() {
2454        let (store, _dir) = temp_store();
2455        let r = make_record("gotcha:foo");
2456        store.put(&r.key, &r).await.unwrap();
2457        for blank in ["", "  ", "\t", "\n"] {
2458            assert!(
2459                store.search(blank, 10).await.unwrap().is_empty(),
2460                "blank query {blank:?} must return empty"
2461            );
2462        }
2463    }
2464
2465    #[tokio::test]
2466    async fn search_no_match_returns_empty() {
2467        let (store, _dir) = temp_store();
2468        let r = make_record("gotcha:foo");
2469        store.put(&r.key, &r).await.unwrap();
2470        assert!(store
2471            .search("absolutely_no_match_xyzzy99", 10)
2472            .await
2473            .unwrap()
2474            .is_empty());
2475    }
2476
2477    #[tokio::test]
2478    async fn search_malformed_query_returns_partial_not_error() {
2479        // Malformed queries must not propagate Err — lenient parse returns
2480        // best-effort results.
2481        let (store, _dir) = temp_store();
2482        let mut r = make_record("gotcha:async-race");
2483        r.value = "tokio runtime inference race condition".to_string();
2484        store.put(&r.key, &r).await.unwrap();
2485        // "tokio AND" has a trailing operator — must not error
2486        let result = store.search("tokio AND", 10).await;
2487        assert!(result.is_ok(), "malformed query must not return Err");
2488    }
2489
2490    #[tokio::test]
2491    async fn search_limit_caps_results() {
2492        let (store, _dir) = temp_store();
2493        for i in 0..10 {
2494            let mut r = make_record(&format!("gotcha:item-{i:02}"));
2495            r.value = "tokio runtime executor gotcha performance".to_string();
2496            store.put(&r.key, &r).await.unwrap();
2497        }
2498        assert_eq!(store.search("tokio", 1).await.unwrap().len(), 1);
2499        assert_eq!(store.search("tokio", 5).await.unwrap().len(), 5);
2500        assert_eq!(store.search("tokio", 10).await.unwrap().len(), 10);
2501        // limit > total docs must return all docs, not panic or error
2502        assert_eq!(store.search("tokio", 999).await.unwrap().len(), 10);
2503    }
2504
2505    #[tokio::test]
2506    async fn search_deleted_record_not_returned() {
2507        // Delete should evict the tantivy entry too, not just rely on
2508        // post-filtering missing keys after a search hit.
2509        let (store, _dir) = temp_store();
2510        let mut r = make_record("gotcha:deleted");
2511        r.value = "this_unique_sentinel_deleted_record".to_string();
2512        store.put(&r.key, &r).await.unwrap();
2513
2514        // Confirm it is searchable before deletion
2515        assert_eq!(
2516            store
2517                .search("this_unique_sentinel_deleted_record", 10)
2518                .await
2519                .unwrap()
2520                .len(),
2521            1
2522        );
2523
2524        // Delete from SurrealKV (tantivy index still has the entry)
2525        store.delete("gotcha:deleted").await.unwrap();
2526
2527        // Must return empty — the index hit is silently skipped
2528        let results = store
2529            .search("this_unique_sentinel_deleted_record", 10)
2530            .await
2531            .unwrap();
2532        assert!(
2533            results.is_empty(),
2534            "deleted record must not appear in search results"
2535        );
2536    }
2537
2538    #[tokio::test]
2539    async fn search_delete_does_not_consume_top_k_slot() {
2540        let (store, _dir) = temp_store();
2541
2542        let mut deleted = make_record("gotcha:deleted-slot");
2543        deleted.value = "shared_sentinel_term".to_string();
2544        store.put(&deleted.key, &deleted).await.unwrap();
2545
2546        let mut live = make_record("gotcha:live-slot");
2547        live.value = "shared_sentinel_term".to_string();
2548        store.put(&live.key, &live).await.unwrap();
2549
2550        store.delete(&deleted.key).await.unwrap();
2551
2552        let results = store.search("shared_sentinel_term", 1).await.unwrap();
2553        assert_eq!(
2554            results.len(),
2555            1,
2556            "live hit should still fill the top-k slot"
2557        );
2558        assert_eq!(results[0].key, "gotcha:live-slot");
2559    }
2560
2561    #[tokio::test]
2562    async fn search_returns_full_record_from_surrealkv_not_tantivy_stored_fields() {
2563        // Tantivy only stores 6 fields. The full Record (tags, confidence,
2564        // staleness, etc.) must come from SurrealKV via the key lookup.
2565        let (store, _dir) = temp_store();
2566        let mut r = make_record("gotcha:full-record-check");
2567        r.value = "sentinel_fullrecord_uniqueterm_xqz".to_string();
2568        r.tags = vec!["production".to_string(), "critical-path".to_string()];
2569        store.put(&r.key, &r).await.unwrap();
2570
2571        let results = store
2572            .search("sentinel_fullrecord_uniqueterm_xqz", 10)
2573            .await
2574            .unwrap();
2575        assert_eq!(results.len(), 1);
2576        assert_eq!(
2577            results[0].tags,
2578            vec!["production", "critical-path"],
2579            "full tags must come from SurrealKV, not tantivy stored fields"
2580        );
2581    }
2582
2583    /// M-05-F: 20 records total, 5 contain a unique sentinel term.
2584    /// Query must return exactly those 5 with no false positives.
2585    #[tokio::test]
2586    async fn search_m05f_20_records_returns_exactly_correct_5() {
2587        let (store, _dir) = temp_store();
2588
2589        // 15 records with unrelated, varied content — must not appear in results
2590        for i in 0..15 {
2591            let mut r = make_record(&format!("gotcha:noise-{i:02}"));
2592            r.value = format!("background noise record about rayon and petgraph item {i}");
2593            store.put(&r.key, &r).await.unwrap();
2594        }
2595
2596        // 5 target records containing the unique sentinel term
2597        let mut target_keys = Vec::new();
2598        for i in 0..5 {
2599            let mut r = make_record(&format!("gotcha:target-{i}"));
2600            r.value = format!("sentinel_m05f_unique record index {i} with extra text");
2601            store.put(&r.key, &r).await.unwrap();
2602            target_keys.push(r.key.clone());
2603        }
2604
2605        let results = store.search("sentinel_m05f_unique", 20).await.unwrap();
2606        let result_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
2607
2608        assert_eq!(
2609            results.len(),
2610            5,
2611            "expected exactly 5 results, got {}: {:?}",
2612            results.len(),
2613            result_keys
2614        );
2615
2616        for k in &target_keys {
2617            assert!(
2618                result_keys.contains(&k.as_str()),
2619                "target key '{k}' missing from results"
2620            );
2621        }
2622
2623        // No noise records leaked in
2624        for r in &results {
2625            assert!(
2626                r.key.starts_with("gotcha:target-"),
2627                "noise record '{}' must not appear in results",
2628                r.key
2629            );
2630        }
2631    }
2632
2633    /// Worst-case Store volume: 5,000 records at realistic mati project scale
2634    /// (2,000-file codebase × 2-3 record types). All writes use put_batch
2635    /// (2 fsyncs total: 1 SurrealKV + 1 tantivy) matching how Layer 0 works.
2636    /// Proves BM25 returns exactly 20 targets from 4,980 noise records and
2637    /// that limit enforcement and full-record retrieval both hold at this scale.
2638    #[tokio::test]
2639    async fn search_5k_records_zero_false_positives_limit_and_full_record_correct() {
2640        let (store, _dir) = temp_store();
2641
2642        // 4,980 noise records — single put_batch call (2 fsyncs total)
2643        let noise: Vec<Record> = (0..4_980_usize)
2644            .map(|i| {
2645                let mut r = make_record(&format!("file:src/module_{i:04}.rs"));
2646                r.value = format!(
2647                    "module {i} handles initialization routing configuration management dispatch"
2648                );
2649                r
2650            })
2651            .collect();
2652        let noise_pairs: Vec<(&str, &Record)> = noise.iter().map(|r| (r.key.as_str(), r)).collect();
2653        store.put_batch(&noise_pairs).await.unwrap();
2654
2655        // 20 target records with unique sentinel + a meaningful tag we can
2656        // verify came from SurrealKV (not tantivy stored fields)
2657        let targets: Vec<Record> = (0..20_usize)
2658            .map(|i| {
2659                let mut r = make_record(&format!("gotcha:target-{i:02}"));
2660                r.value = format!("zqx_sentinel_5k_proof unique term record {i}");
2661                r.tags = vec!["verified-from-surrealkv".to_string()];
2662                r
2663            })
2664            .collect();
2665        let target_pairs: Vec<(&str, &Record)> =
2666            targets.iter().map(|r| (r.key.as_str(), r)).collect();
2667        store.put_batch(&target_pairs).await.unwrap();
2668
2669        // ── correctness at limit=100 ─────────────────────────────────────────
2670
2671        let results = store.search("zqx_sentinel_5k_proof", 100).await.unwrap();
2672        assert_eq!(
2673            results.len(),
2674            20,
2675            "expected 20 hits from 5,000 records, got {}",
2676            results.len()
2677        );
2678
2679        let result_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
2680        let target_keys: Vec<&str> = targets.iter().map(|r| r.key.as_str()).collect();
2681
2682        // All 20 targets present
2683        for k in &target_keys {
2684            assert!(result_keys.contains(k), "missing target: {k}");
2685        }
2686        // Zero noise leaked through
2687        for r in &results {
2688            assert!(
2689                r.key.starts_with("gotcha:target-"),
2690                "noise record '{}' must not appear in results",
2691                r.key
2692            );
2693        }
2694
2695        // Full record came from SurrealKV — tantivy doesn't store tags
2696        for r in &results {
2697            assert_eq!(
2698                r.tags,
2699                vec!["verified-from-surrealkv"],
2700                "tags must be fetched from SurrealKV, key: {}",
2701                r.key
2702            );
2703        }
2704
2705        // ── limit enforcement at scale ────────────────────────────────────────
2706
2707        let limited = store.search("zqx_sentinel_5k_proof", 5).await.unwrap();
2708        assert_eq!(limited.len(), 5, "limit=5 must cap results at scale");
2709
2710        // Over-limit returns exactly the matching set
2711        let over = store.search("zqx_sentinel_5k_proof", 999).await.unwrap();
2712        assert_eq!(
2713            over.len(),
2714            20,
2715            "limit > match count must return all 20 matches, not panic"
2716        );
2717
2718        // ── ensure noise records are NOT findable by sentinel term ────────────
2719
2720        // Pick a random noise record key, search by a term from its value
2721        // that does NOT appear in sentinel records
2722        let noise_only_results = store.search("zqx_sentinel_5k_proof", 100).await.unwrap();
2723        for r in &noise_only_results {
2724            assert!(
2725                !r.key.starts_with("file:src/module_"),
2726                "noise module record should not match sentinel query: {}",
2727                r.key
2728            );
2729        }
2730    }
2731
2732    // ─── M-05-D: index rebuild ────────────────────────────────────────────────
2733
2734    // Helper: make_record with a custom value (needed to control searchable content).
2735    fn make_record_v(key: &str, value: &str) -> Record {
2736        let mut r = make_record(key);
2737        r.value = value.to_string();
2738        r
2739    }
2740
2741    // Helper: open a fresh store over an existing data directory (bypasses slug
2742    // derivation so tests can point at a tempdir directly).
2743    fn reopen_store(root: &std::path::Path) -> Store {
2744        let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2745        let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2746        let search = OnceCell::new();
2747        let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2748        Store {
2749            knowledge,
2750            sessions,
2751            search,
2752            root: root.to_path_buf(),
2753            index_needs_rebuild: false,
2754        }
2755    }
2756
    /// Test-only replica of the startup recovery sequence that the name
    /// refers to (`Store::open_and_rebuild`). It opens both KV trees over
    /// `root`, wipes the search index when a stale/sync-pending marker is
    /// present, reopens (or recreates) the index, and rebuilds it from
    /// SurrealKV when required.
    ///
    /// NOTE(review): this duplicates production logic rather than calling it,
    /// so it can silently drift from `Store::open_and_rebuild`. Confirm the two
    /// stay in sync when either changes.
    ///
    /// # Panics
    /// Panics (via `unwrap`) on any filesystem, open, or rebuild failure.
    async fn reopen_store_open_and_rebuild_like(root: &std::path::Path) -> Store {
        let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
        let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
        // The search cell starts empty and is filled below.
        let mut store = Store {
            knowledge,
            sessions,
            search: OnceCell::new(),
            root: root.to_path_buf(),
            index_needs_rebuild: false,
        };

        let search_path = store.root.join("search_index");
        let stale_marker = store.root.join(SEARCH_STALE_MARKER);
        let has_stale_marker = stale_marker.exists();
        let has_sync_pending = store.root.join(SEARCH_SYNC_PENDING).exists();

        // Either marker means the on-disk index cannot be trusted. Delete it so
        // the open below starts from an empty index.
        if (has_stale_marker || has_sync_pending) && search_path.exists() {
            std::fs::remove_dir_all(&search_path).unwrap();
        }

        match Search::open(&search_path) {
            Ok(s) => {
                let _ = store.search.set(s);
            }
            // The index could not be opened (e.g. corrupt files). Wipe it,
            // create a fresh empty one, and flag a full rebuild.
            Err(_) => {
                if search_path.exists() {
                    std::fs::remove_dir_all(&search_path).unwrap();
                }
                let _ = store.search.set(Search::open(&search_path).unwrap());
                store.index_needs_rebuild = true;
            }
        }

        // An index wiped because of a marker is empty, so it also needs to be
        // repopulated from SurrealKV.
        if has_stale_marker || has_sync_pending {
            store.index_needs_rebuild = true;
        }

        if store.index_needs_rebuild {
            store.rebuild_search_index().await.unwrap();
            // Markers are removed only after the rebuild succeeded (the unwrap
            // above panics otherwise). Removal errors are ignored.
            let _ = std::fs::remove_file(store.root.join(SEARCH_SYNC_PENDING));
            if has_stale_marker {
                let _ = std::fs::remove_file(&stale_marker);
            }
        }

        store
    }
2804
2805    /// Write records, close, delete search_index/, reopen with a fresh empty
2806    /// index, call rebuild_search_index — all records must be searchable again.
2807    #[tokio::test]
2808    async fn rebuild_search_index_after_missing_index_restores_search() {
2809        let (store, _dir) = temp_store();
2810        let root = store.root.clone();
2811
2812        // Write 10 records with a unique sentinel term in their values
2813        let records: Vec<Record> = (0..10)
2814            .map(|i| {
2815                make_record_v(
2816                    &format!("gotcha:rebuild-miss-{i:02}"),
2817                    "xq_rebuild_missing_sentinel unique term",
2818                )
2819            })
2820            .collect();
2821        let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2822        store.put_batch(&pairs).await.unwrap();
2823        store.close().await.unwrap();
2824
2825        // Simulate missing index (deleted by user, first run after migration, etc.)
2826        std::fs::remove_dir_all(root.join("search_index")).unwrap();
2827
2828        // Reopen with fresh empty index, then rebuild
2829        let store2 = reopen_store(&root);
2830        assert!(
2831            !store2.index_needs_rebuild(),
2832            "reopen_store sets flag=false; we test rebuild directly"
2833        );
2834
2835        let committed = store2.rebuild_search_index().await.unwrap();
2836        assert_eq!(committed, 10, "rebuild must commit all 10 records");
2837
2838        let results = store2
2839            .search("xq_rebuild_missing_sentinel", 20)
2840            .await
2841            .unwrap();
2842        assert_eq!(
2843            results.len(),
2844            10,
2845            "all records must be findable after rebuild"
2846        );
2847    }
2848
2849    /// Corrupt meta.json → Store-level open logic must wipe and flag rebuild.
2850    /// After rebuild_search_index the record is searchable again.
2851    #[tokio::test]
2852    async fn rebuild_search_index_after_corrupt_index_restores_search() {
2853        let (store, _dir) = temp_store();
2854        let root = store.root.clone();
2855
2856        let r = make_record_v(
2857            "gotcha:rebuild-corrupt",
2858            "xq_rebuild_corrupt_sentinel unique",
2859        );
2860        store.put("gotcha:rebuild-corrupt", &r).await.unwrap();
2861        store.close().await.unwrap();
2862
2863        // Corrupt the index by overwriting meta.json with garbage
2864        std::fs::write(
2865            root.join("search_index").join("meta.json"),
2866            b"not valid json {{{{",
2867        )
2868        .unwrap();
2869
2870        // Replicate the Store::open_and_rebuild recovery path: Search::open fails → wipe → reopen
2871        let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2872        let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2873        let search_path = root.join("search_index");
2874        let (search_cell, needs_rebuild) = {
2875            let cell = OnceCell::new();
2876            match Search::open(&search_path) {
2877                Ok(s) => {
2878                    let _ = cell.set(s);
2879                    (cell, false)
2880                }
2881                Err(_) => {
2882                    std::fs::remove_dir_all(&search_path).unwrap();
2883                    let _ = cell.set(Search::open(&search_path).unwrap());
2884                    (cell, true)
2885                }
2886            }
2887        };
2888        let store2 = Store {
2889            knowledge,
2890            sessions,
2891            search: search_cell,
2892            root: root.clone(),
2893            index_needs_rebuild: needs_rebuild,
2894        };
2895
2896        assert!(
2897            store2.index_needs_rebuild(),
2898            "corrupt meta.json must trigger rebuild flag"
2899        );
2900
2901        store2.rebuild_search_index().await.unwrap();
2902
2903        let results = store2
2904            .search("xq_rebuild_corrupt_sentinel", 10)
2905            .await
2906            .unwrap();
2907        assert_eq!(
2908            results.len(),
2909            1,
2910            "record must be searchable after rebuild from corrupt state"
2911        );
2912        assert_eq!(results[0].key, "gotcha:rebuild-corrupt");
2913    }
2914
2915    /// rebuild_search_index returns the exact number of records it committed.
2916    #[tokio::test]
2917    async fn rebuild_search_index_returns_committed_count() {
2918        let (store, _dir) = temp_store();
2919        let root = store.root.clone();
2920
2921        let records: Vec<Record> = (0..7)
2922            .map(|i| make_record(&format!("file:src/mod_{i}.rs")))
2923            .collect();
2924        let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2925        store.put_batch(&pairs).await.unwrap();
2926        store.close().await.unwrap();
2927
2928        // Fresh empty index — simulates post-corrupt open
2929        std::fs::remove_dir_all(root.join("search_index")).unwrap();
2930        let store2 = reopen_store(&root);
2931        let committed = store2.rebuild_search_index().await.unwrap();
2932        assert_eq!(
2933            committed, 7,
2934            "committed count must equal number of records in SurrealKV"
2935        );
2936    }
2937
2938    #[tokio::test]
2939    async fn open_and_rebuild_like_wipes_stale_index_when_sync_pending_exists() {
2940        let (store, _dir) = temp_store();
2941        let root = store.root.clone();
2942
2943        let deleted = make_record_v("gotcha:deleted-after-crash", "shared_crash_sentinel");
2944        let live = make_record_v("gotcha:live-after-crash", "shared_crash_sentinel");
2945
2946        store.put(&deleted.key, &deleted).await.unwrap();
2947        store.put(&live.key, &live).await.unwrap();
2948        store.delete(&deleted.key).await.unwrap();
2949
2950        // Simulate a stale tantivy entry surviving a crash window.
2951        store.ensure_search().unwrap().add_record(&deleted).unwrap();
2952        std::fs::write(root.join(SEARCH_SYNC_PENDING), b"").unwrap();
2953        store.close().await.unwrap();
2954
2955        let reopened = reopen_store_open_and_rebuild_like(&root).await;
2956        let results = reopened.search("shared_crash_sentinel", 1).await.unwrap();
2957        assert_eq!(
2958            results.len(),
2959            1,
2960            "live record should fill top-k after rebuild"
2961        );
2962        assert_eq!(results[0].key, "gotcha:live-after-crash");
2963        assert!(
2964            !root.join(SEARCH_SYNC_PENDING).exists(),
2965            "successful rebuild should clear sync-pending marker"
2966        );
2967    }
2968
2969    #[tokio::test]
2970    async fn put_leaves_sync_pending_when_search_cannot_initialize() {
2971        let dir = TempDir::new().unwrap();
2972        let root = dir.path().join("mati_test");
2973        std::fs::create_dir_all(&root).unwrap();
2974        let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2975        let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2976        std::fs::write(root.join("search_index"), b"not a directory").unwrap();
2977
2978        let store = Store {
2979            knowledge,
2980            sessions,
2981            search: OnceCell::new(),
2982            root: root.clone(),
2983            index_needs_rebuild: false,
2984        };
2985
2986        let record = make_record("gotcha:search-sync-failure");
2987        store.put(&record.key, &record).await.unwrap();
2988
2989        assert!(
2990            root.join(SEARCH_SYNC_PENDING).exists(),
2991            "failed search sync must leave the crash-fence marker in place"
2992        );
2993    }
2994
2995    /// Calling rebuild_search_index twice must not panic; query deduplication
2996    /// in query_keys ensures each key appears exactly once in results.
2997    #[tokio::test]
2998    async fn rebuild_search_index_twice_is_safe() {
2999        let (store, _dir) = temp_store();
3000        let r = make_record_v("gotcha:idempotent", "xq_rebuild_idempotent_sentinel unique");
3001        store.put("gotcha:idempotent", &r).await.unwrap();
3002
3003        store.rebuild_search_index().await.unwrap();
3004        store.rebuild_search_index().await.unwrap();
3005
3006        let results = store
3007            .search("xq_rebuild_idempotent_sentinel", 10)
3008            .await
3009            .unwrap();
3010        assert_eq!(
3011            results.len(),
3012            1,
3013            "dedup must collapse duplicate tantivy entries to one result"
3014        );
3015    }
3016
3017    /// Normal open (healthy index) must not set index_needs_rebuild.
3018    #[tokio::test]
3019    async fn open_healthy_index_does_not_set_rebuild_flag() {
3020        let (store, _dir) = temp_store();
3021        assert!(!store.index_needs_rebuild());
3022    }
3023
3024    // ─── history (M-14) ──────────────────────────────────────────────────────
3025
3026    #[tokio::test]
3027    async fn history_empty_key_returns_error() {
3028        let (store, _dir) = temp_store();
3029        let result = store.history("", 0);
3030        assert!(result.is_err(), "empty key must be rejected");
3031    }
3032
3033    #[tokio::test]
3034    async fn history_single_version() {
3035        let (store, _dir) = temp_store();
3036        store
3037            .put("gotcha:single", &make_record("gotcha:single"))
3038            .await
3039            .unwrap();
3040
3041        let entries = store.history("gotcha:single", 0).unwrap();
3042        assert!(!entries.is_empty(), "must return at least one version");
3043        assert!(!entries[0].is_tombstone);
3044        assert!(entries[0].record.is_some());
3045        assert_eq!(entries[0].record.as_ref().unwrap().key, "gotcha:single");
3046    }
3047
3048    #[tokio::test]
3049    async fn history_multiple_versions_newest_first() {
3050        let (store, _dir) = temp_store();
3051        let mut r = make_record("gotcha:multi");
3052        r.value = "v1".to_string();
3053        store.put("gotcha:multi", &r).await.unwrap();
3054        r.value = "v2".to_string();
3055        r.version.logical_clock = 2;
3056        store.put("gotcha:multi", &r).await.unwrap();
3057        r.value = "v3".to_string();
3058        r.version.logical_clock = 3;
3059        store.put("gotcha:multi", &r).await.unwrap();
3060
3061        let entries = store.history("gotcha:multi", 0).unwrap();
3062        assert!(
3063            entries.len() >= 3,
3064            "expected >=3 versions, got {}",
3065            entries.len()
3066        );
3067
3068        // Newest first: timestamps must be non-increasing
3069        for pair in entries.windows(2) {
3070            assert!(
3071                pair[0].timestamp_ns >= pair[1].timestamp_ns,
3072                "history must be newest-first: {} >= {}",
3073                pair[0].timestamp_ns,
3074                pair[1].timestamp_ns,
3075            );
3076        }
3077
3078        // Newest entry should have the latest value
3079        let newest = entries[0].record.as_ref().unwrap();
3080        assert_eq!(newest.value, "v3");
3081    }
3082
3083    #[tokio::test]
3084    async fn history_includes_tombstones() {
3085        let (store, _dir) = temp_store();
3086        store
3087            .put("gotcha:tomb", &make_record("gotcha:tomb"))
3088            .await
3089            .unwrap();
3090
3091        // Use soft_delete (not hard delete) so SurrealKV retains the tombstone
3092        // marker in the version history. Store::delete is a hard delete that
3093        // erases all versions completely — the history API surfaces soft-delete
3094        // tombstones from lifecycle transitions.
3095        {
3096            let mut txn = store.knowledge.begin_with_mode(Mode::WriteOnly).unwrap();
3097            txn.set_durability(SkvDurability::Immediate);
3098            txn.soft_delete(b"gotcha:tomb").unwrap();
3099            txn.commit().await.unwrap();
3100        }
3101
3102        let entries = store.history("gotcha:tomb", 0).unwrap();
3103        assert!(
3104            entries.len() >= 2,
3105            "must have create + soft-delete, got {}",
3106            entries.len()
3107        );
3108        // At least one tombstone must exist
3109        assert!(
3110            entries.iter().any(|e| e.is_tombstone),
3111            "tombstone must be present in history",
3112        );
3113    }
3114
3115    #[tokio::test]
3116    async fn history_no_key_spill() {
3117        let (store, _dir) = temp_store();
3118        store
3119            .put("gotcha:alpha", &make_record("gotcha:alpha"))
3120            .await
3121            .unwrap();
3122        store
3123            .put(
3124                "gotcha:alpha-extended",
3125                &make_record("gotcha:alpha-extended"),
3126            )
3127            .await
3128            .unwrap();
3129        store
3130            .put("gotcha:beta", &make_record("gotcha:beta"))
3131            .await
3132            .unwrap();
3133
3134        let entries = store.history("gotcha:alpha", 0).unwrap();
3135        for e in &entries {
3136            if let Some(ref rec) = e.record {
3137                assert_eq!(
3138                    rec.key, "gotcha:alpha",
3139                    "spilled into adjacent key: {}",
3140                    rec.key
3141                );
3142            }
3143        }
3144    }
3145
3146    #[tokio::test]
3147    async fn history_limit() {
3148        let (store, _dir) = temp_store();
3149        let mut r = make_record("gotcha:limited");
3150        for i in 0..5 {
3151            r.value = format!("v{i}");
3152            r.version.logical_clock = i as u64;
3153            store.put("gotcha:limited", &r).await.unwrap();
3154        }
3155
3156        let entries = store.history("gotcha:limited", 2).unwrap();
3157        assert!(
3158            entries.len() <= 2,
3159            "limit=2 but got {} entries",
3160            entries.len()
3161        );
3162    }
3163
3164    #[tokio::test]
3165    async fn history_since_filters_old_versions() {
3166        let (store, _dir) = temp_store();
3167        let mut r = make_record("gotcha:since");
3168        r.value = "old".to_string();
3169        store.put("gotcha:since", &r).await.unwrap();
3170
3171        // Capture a "since" timestamp between writes — use nanosecond
3172        // granularity so we can convert to seconds.
3173        let since_secs = SystemTime::now()
3174            .duration_since(UNIX_EPOCH)
3175            .unwrap()
3176            .as_secs();
3177
3178        r.value = "new".to_string();
3179        r.version.logical_clock = 2;
3180        store.put("gotcha:since", &r).await.unwrap();
3181
3182        let entries = store.history_since("gotcha:since", since_secs, 0).unwrap();
3183        // Should contain at least the "new" version
3184        assert!(
3185            !entries.is_empty(),
3186            "since filter should include the recent write",
3187        );
3188        // Verify all returned timestamps are >= since_secs
3189        for e in &entries {
3190            assert!(
3191                e.timestamp_secs >= since_secs.saturating_sub(1),
3192                "entry ts {} is before since {}",
3193                e.timestamp_secs,
3194                since_secs,
3195            );
3196        }
3197    }
3198
3199    #[tokio::test]
3200    async fn records_since_with_dep() {
3201        let (store, _dir) = temp_store();
3202
3203        let now = SystemTime::now()
3204            .duration_since(UNIX_EPOCH)
3205            .unwrap()
3206            .as_secs();
3207        let old_ts = now.saturating_sub(3600);
3208
3209        let mut old_rec = make_record("gotcha:old");
3210        old_rec.updated_at = old_ts;
3211        store.put("gotcha:old", &old_rec).await.unwrap();
3212
3213        let mut new_gotcha = make_record("gotcha:new");
3214        new_gotcha.updated_at = now;
3215        store.put("gotcha:new", &new_gotcha).await.unwrap();
3216
3217        let mut dep_rec = make_record("dep:cargo:serde");
3218        dep_rec.category = crate::store::record::Category::Dependency;
3219        dep_rec.updated_at = now;
3220        store.put("dep:cargo:serde", &dep_rec).await.unwrap();
3221
3222        let since = now.saturating_sub(60);
3223        let results = store.records_since(since, 0).await.unwrap();
3224        let keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
3225
3226        assert!(keys.contains(&"gotcha:new"), "new gotcha should appear");
3227        assert!(
3228            keys.contains(&"dep:cargo:serde"),
3229            "dep record should appear"
3230        );
3231        assert!(
3232            !keys.contains(&"gotcha:old"),
3233            "old gotcha should be excluded"
3234        );
3235
3236        // Verify newest-first ordering
3237        for pair in results.windows(2) {
3238            assert!(
3239                pair[0].updated_at >= pair[1].updated_at,
3240                "results must be newest-first",
3241            );
3242        }
3243    }
3244
3245    #[tokio::test]
3246    async fn records_since_respects_limit() {
3247        let (store, _dir) = temp_store();
3248        let now = SystemTime::now()
3249            .duration_since(UNIX_EPOCH)
3250            .unwrap()
3251            .as_secs();
3252
3253        for i in 0..10 {
3254            let mut r = make_record(&format!("gotcha:lim-{i:02}"));
3255            r.updated_at = now;
3256            store.put(&r.key, &r).await.unwrap();
3257        }
3258
3259        let results = store.records_since(now.saturating_sub(1), 3).await.unwrap();
3260        assert_eq!(results.len(), 3, "limit=3 should cap at 3");
3261    }
3262
3263    #[test]
3264    fn history_entry_timestamp_conversion() {
3265        let entry = HistoryEntry {
3266            timestamp_secs: 1_710_520_800,
3267            timestamp_ns: 1_710_520_800_000_000_000,
3268            record: None,
3269            is_tombstone: false,
3270        };
3271        assert_eq!(entry.timestamp_secs, entry.timestamp_ns / 1_000_000_000);
3272    }
3273
3274    // ─── lock_error_hint ──────────────────────────────────────────────────
3275
3276    #[test]
3277    fn lock_error_hint_rewrites_real_lock_contention_error() {
3278        let dir = TempDir::new().unwrap();
3279        let db_path = dir.path().join("knowledge.db");
3280        std::fs::create_dir_all(&db_path).unwrap();
3281
3282        // Write a fake LOCK file with a PID
3283        std::fs::write(db_path.join("LOCK"), "12345\n").unwrap();
3284
3285        let err = anyhow::anyhow!("Database at /foo/LOCK is already locked by another process");
3286        let result = lock_error_hint(err, &db_path);
3287        let msg = format!("{result}");
3288        assert!(
3289            msg.contains("another mati process holds the lock"),
3290            "should rewrite lock error, got: {msg}"
3291        );
3292        assert!(
3293            msg.contains("PID: 12345"),
3294            "should include holder PID, got: {msg}"
3295        );
3296    }
3297
3298    #[test]
3299    fn lock_error_hint_passes_through_non_lock_errors() {
3300        let dir = TempDir::new().unwrap();
3301        let db_path = dir.path().join("knowledge.db");
3302        std::fs::create_dir_all(&db_path).unwrap();
3303
3304        // LOCK file exists (as it always does after first use)
3305        std::fs::write(db_path.join("LOCK"), "99999\n").unwrap();
3306
3307        let err = anyhow::anyhow!("WAL segment corrupt at offset 1234");
3308        let result = lock_error_hint(err, &db_path);
3309        let msg = format!("{result}");
3310        assert!(
3311            msg.contains("WAL segment corrupt"),
3312            "non-lock errors must pass through unchanged, got: {msg}"
3313        );
3314        assert!(
3315            !msg.contains("another mati process"),
3316            "non-lock errors must NOT be rewritten to lock errors, got: {msg}"
3317        );
3318    }
3319
3320    // ── Transaction tree-routing invariant tests ───────────────────────
3321
3322    #[tokio::test]
3323    async fn transact_knowledge_rejects_sessions_key() {
3324        let dir = tempfile::tempdir().unwrap();
3325        let store = Store::open(dir.path()).await.unwrap();
3326
3327        // "session:foo" routes to Eventual (sessions tree).
3328        let ops = vec![KnowledgeWriteOp::PutRaw {
3329            key: "session:foo",
3330            value: b"data",
3331        }];
3332        let err = store.transact_knowledge(&ops).await.unwrap_err();
3333        assert!(
3334            err.to_string().contains("routes to sessions tree"),
3335            "wrong-tree key must be rejected: {err}"
3336        );
3337    }
3338
3339    #[tokio::test]
3340    async fn transact_sessions_raw_rejects_knowledge_key() {
3341        let dir = tempfile::tempdir().unwrap();
3342        let store = Store::open(dir.path()).await.unwrap();
3343
3344        // "gotcha:foo" routes to Immediate (knowledge tree).
3345        let entries: Vec<(&str, &[u8])> = vec![("gotcha:foo", b"data")];
3346        let err = store.transact_sessions_raw(&entries).await.unwrap_err();
3347        assert!(
3348            err.to_string().contains("routes to knowledge tree"),
3349            "wrong-tree key must be rejected: {err}"
3350        );
3351    }
3352
3353    #[tokio::test]
3354    async fn transact_knowledge_accepts_valid_knowledge_keys() {
3355        let dir = tempfile::tempdir().unwrap();
3356        let store = Store::open(dir.path()).await.unwrap();
3357
3358        let ops = vec![
3359            KnowledgeWriteOp::PutRaw {
3360                key: "gotcha:test",
3361                value: b"data1",
3362            },
3363            KnowledgeWriteOp::PutRaw {
3364                key: "audit:knowledge:123",
3365                value: b"data2",
3366            },
3367        ];
3368        store
3369            .transact_knowledge(&ops)
3370            .await
3371            .expect("valid knowledge keys must succeed");
3372    }
3373
3374    #[tokio::test]
3375    async fn transact_sessions_raw_accepts_valid_session_keys() {
3376        let dir = tempfile::tempdir().unwrap();
3377        let store = Store::open(dir.path()).await.unwrap();
3378
3379        let entries: Vec<(&str, &[u8])> = vec![
3380            ("session:consulted:file:foo", b"data1"),
3381            ("audit:session:123", b"data2"),
3382            ("analytics:hit_2026-04-09", b"data3"),
3383        ];
3384        store
3385            .transact_sessions_raw(&entries)
3386            .await
3387            .expect("valid session keys must succeed");
3388    }
3389}