1use std::path::{Path, PathBuf};
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use anyhow::{Context, Result};
19use once_cell::sync::OnceCell;
20use rmp_serde as rmps;
21use sha2::{Digest, Sha256};
22use surrealkv::{
23 Durability as SkvDurability, HistoryOptions, LSMIterator, Mode, Options, Transaction, Tree,
24 TreeBuilder, VLogChecksumLevel,
25};
26
27use serde::{Deserialize, Serialize};
28
29use super::record::Record;
30use super::Durability;
31use crate::search::Search;
32
/// Retention window handed to the sessions tree's versioning option:
/// 90 days expressed in nanoseconds.
const SESSIONS_RETENTION_NS: u64 = 90 * 24 * 60 * 60 * 1_000_000_000u64;

/// File name (under the store root) of the marker written by
/// `Store::mark_search_stale`. When it exists, `Store::open_and_rebuild`
/// wipes the search index and rebuilds it.
const SEARCH_STALE_MARKER: &str = "search_stale";
/// File name (under the store root) of the marker created after a knowledge
/// write commits and removed once the search index has been updated. A
/// leftover marker makes `Store::open_and_rebuild` schedule a rebuild.
const SEARCH_SYNC_PENDING: &str = "search_sync_pending";
43
/// Key prefixes that identify knowledge records.
///
/// This is the single source of truth: [`is_knowledge_key`] is derived from
/// it, and the rebuild / `records_since` scans iterate over it.
const KNOWLEDGE_NAMESPACES: &[&str] = &[
    "gotcha:",
    "decision:",
    "file:",
    "stage:",
    "dev_note:",
    "dep:",
];

/// Returns `true` when `key` starts with one of [`KNOWLEDGE_NAMESPACES`].
///
/// Derived from the constant so the namespace list and this predicate cannot
/// drift apart when a namespace is added or removed.
fn is_knowledge_key(key: &str) -> bool {
    KNOWLEDGE_NAMESPACES
        .iter()
        .any(|namespace| key.starts_with(namespace))
}
71
/// Key prefixes scanned in addition to `KNOWLEDGE_NAMESPACES` when
/// `Store::rebuild_search_index` repopulates the search index.
const SESSION_NAMESPACES: &[&str] = &["session:", "analytics:", "hook_event:", "compliance:"];
77
/// One write inside a `Store::transact_knowledge` transaction.
pub enum KnowledgeWriteOp<'a> {
    /// Serialize `record` (MessagePack, named fields) and store it under
    /// `key`; the record is also pushed to the search index after commit.
    PutRecord { key: &'a str, record: &'a Record },
    /// Store `value` under `key` verbatim; not pushed to the search index.
    PutRaw { key: &'a str, value: &'a [u8] },
}
88
/// Persistent store made of two SurrealKV trees plus a lazily opened
/// search index, all living under `root`.
pub struct Store {
    /// Tree receiving keys whose `Durability::for_key` is `Immediate`.
    knowledge: Tree,
    /// Tree receiving keys whose `Durability::for_key` is `Eventual`.
    sessions: Tree,
    /// Search index; populated by `open_and_rebuild` or on first use via
    /// `ensure_search`.
    search: OnceCell<Search>,
    /// Directory that holds both databases, the search index and marker files.
    pub root: PathBuf,
    /// Set by `open_and_rebuild` when the index had to be recreated or a
    /// marker file requested a rebuild.
    index_needs_rebuild: bool,
}
113
114impl Store {
115 pub async fn open(repo_root: &Path) -> Result<Self> {
125 let slug = derive_slug(repo_root);
126 let home = dirs::home_dir().context("cannot determine home directory")?;
127 let root = home.join(".mati").join(&slug);
128 std::fs::create_dir_all(&root)
129 .with_context(|| format!("cannot create mati dir at {}", root.display()))?;
130
131 let knowledge = open_knowledge_tree(root.join("knowledge.db"))
132 .map_err(|e| lock_error_hint(e, &root.join("knowledge.db")))?;
133 let sessions = open_sessions_tree(root.join("sessions.db"))
134 .map_err(|e| lock_error_hint(e, &root.join("sessions.db")))?;
135
136 let store = Self {
141 knowledge,
142 sessions,
143 search: OnceCell::new(),
144 root,
145 index_needs_rebuild: false,
146 };
147
148 super::migrations::migrate(&store).await?;
155
156 Ok(store)
157 }
158
159 pub async fn open_and_rebuild(repo_root: &Path) -> Result<Self> {
169 let mut store = Self::open(repo_root).await?;
170
171 let search_path = store.root.join("search_index");
172 let stale_marker = store.root.join(SEARCH_STALE_MARKER);
173 let has_sync_pending = store.root.join(SEARCH_SYNC_PENDING).exists();
174
175 let has_stale_marker = stale_marker.exists();
180 if (has_stale_marker || has_sync_pending) && search_path.exists() {
181 std::fs::remove_dir_all(&search_path).with_context(|| {
182 format!(
183 "failed to remove stale search index at {}",
184 search_path.display()
185 )
186 })?;
187 }
188
189 match Search::open(&search_path) {
191 Ok(s) => {
192 let _ = store.search.set(s);
193 }
194 Err(e) => {
195 tracing::warn!(
196 error = %e,
197 path = %search_path.display(),
198 "search index corrupt or schema-incompatible — wiping and scheduling rebuild"
199 );
200 if search_path.exists() {
201 std::fs::remove_dir_all(&search_path).with_context(|| {
202 format!(
203 "failed to remove corrupt search index at {}",
204 search_path.display()
205 )
206 })?;
207 }
208 let s = Search::open(&search_path)
209 .context("failed to open fresh search index after clearing corrupt data")?;
210 let _ = store.search.set(s);
211 store.index_needs_rebuild = true;
212 }
213 }
214
215 if has_stale_marker {
216 store.index_needs_rebuild = true;
217 }
218
219 if has_sync_pending {
222 tracing::warn!("tantivy crash-window desync detected — scheduling rebuild");
223 store.index_needs_rebuild = true;
224 }
225
226 if store.index_needs_rebuild() {
227 store.rebuild_search_index().await?;
228 let _ = std::fs::remove_file(store.root.join(SEARCH_SYNC_PENDING));
231 if has_stale_marker {
234 let _ = std::fs::remove_file(&stale_marker);
235 }
236 }
237 Ok(store)
238 }
239
    /// Reports whether `open_and_rebuild` decided the search index had to be
    /// rebuilt (index recreated, or a stale / sync-pending marker was found).
    #[must_use]
    pub fn index_needs_rebuild(&self) -> bool {
        self.index_needs_rebuild
    }
250
251 fn ensure_search(&self) -> Result<&Search> {
258 self.search.get_or_try_init(|| {
259 let search_path = self.root.join("search_index");
260 match Search::open(&search_path) {
261 Ok(s) => Ok(s),
262 Err(e) => {
263 tracing::warn!(
264 error = %e,
265 path = %search_path.display(),
266 "search index corrupt on lazy init — wiping and creating fresh"
267 );
268 if search_path.exists() {
269 std::fs::remove_dir_all(&search_path).with_context(|| {
270 format!(
271 "failed to remove corrupt search index at {}",
272 search_path.display()
273 )
274 })?;
275 }
276 Search::open(&search_path)
277 .context("failed to open fresh search index after clearing corrupt data")
278 }
279 }
280 })
281 }
282
283 pub async fn rebuild_search_index(&self) -> Result<usize> {
293 let search = self.ensure_search()?;
294
295 let mut committed = 0usize;
299
300 for ns in KNOWLEDGE_NAMESPACES.iter().chain(SESSION_NAMESPACES) {
301 let records = self.scan_prefix(ns).await?;
302 if records.is_empty() {
303 continue;
304 }
305 let refs: Vec<&Record> = records.iter().collect();
306 committed += search.add_records(&refs)?;
307 }
308
309 tracing::info!(committed, "search index rebuilt from SurrealKV");
310
311 Ok(committed)
312 }
313
314 pub async fn get(&self, key: &str) -> Result<Option<Record>> {
320 let txn = self.tree_for(key).begin_with_mode(Mode::ReadOnly)?;
321 read_record(&txn, key)
322 }
323
324 pub async fn put(&self, key: &str, record: &Record) -> Result<()> {
328 let durability = Durability::for_key(key);
329 let tree = self.tree_for(key);
330 let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
331 txn.set_durability(skv_durability(durability));
332
333 let bytes = rmps::to_vec_named(record)
334 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
335 txn.set(key.as_bytes(), bytes)?;
336 txn.commit().await?;
337
338 if is_knowledge_key(key) {
342 let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
343 }
344
345 let mut search_synced = false;
354 match self.ensure_search() {
355 Ok(search) => {
356 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
357 search.add_record(record)
358 })) {
359 Ok(Ok(())) => {
360 search_synced = true;
361 }
362 Ok(Err(e)) => {
363 tracing::warn!("search index update failed for '{key}': {e}");
364 }
365 Err(_panic) => {
366 tracing::error!(
367 "search index panicked during put for '{key}' — \
368 index will be rebuilt on next startup"
369 );
370 }
371 }
372 }
373 Err(e) => {
374 tracing::warn!("search index unavailable during put: {e}");
375 }
376 }
377 if is_knowledge_key(key) {
378 self.bump_write_seq();
379 if search_synced {
380 let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
381 }
382 }
383 Ok(())
384 }
385
386 pub async fn put_batch_kv_only(&self, records: &[(&str, &Record)]) -> Result<()> {
394 if records.is_empty() {
395 return Ok(());
396 }
397 let mut immediate: Vec<(&str, &Record)> = Vec::new();
398 let mut eventual: Vec<(&str, &Record)> = Vec::new();
399 for &(key, record) in records {
400 match Durability::for_key(key) {
401 Durability::Immediate => immediate.push((key, record)),
402 Durability::Eventual => eventual.push((key, record)),
403 }
404 }
405 if !immediate.is_empty() {
406 let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
407 txn.set_durability(SkvDurability::Immediate);
408 for (key, record) in &immediate {
409 let bytes = rmps::to_vec_named(record)
410 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
411 txn.set(key.as_bytes(), bytes)?;
412 }
413 txn.commit().await?;
414 }
415 if !eventual.is_empty() {
416 let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
417 txn.set_durability(SkvDurability::Eventual);
418 for (key, record) in &eventual {
419 let bytes = rmps::to_vec_named(record)
420 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
421 txn.set(key.as_bytes(), bytes)?;
422 }
423 txn.commit().await?;
424 }
425 if records.iter().any(|(k, _)| is_knowledge_key(k)) {
426 self.bump_write_seq();
427 }
428 Ok(())
429 }
430
431 pub fn mark_search_stale(&self) {
439 let _ = std::fs::write(self.root.join(SEARCH_STALE_MARKER), b"");
440 }
441
442 pub async fn put_batch(&self, records: &[(&str, &Record)]) -> Result<()> {
451 if records.is_empty() {
452 return Ok(());
453 }
454
455 let mut immediate: Vec<(&str, &Record)> = Vec::new();
457 let mut eventual: Vec<(&str, &Record)> = Vec::new();
458 for &(key, record) in records {
459 match Durability::for_key(key) {
460 Durability::Immediate => immediate.push((key, record)),
461 Durability::Eventual => eventual.push((key, record)),
462 }
463 }
464
465 if !immediate.is_empty() {
466 let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
467 txn.set_durability(SkvDurability::Immediate);
468 for (key, record) in &immediate {
469 let bytes = rmps::to_vec_named(record)
470 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
471 txn.set(key.as_bytes(), bytes)?;
472 }
473 txn.commit().await?;
474 }
475
476 if !eventual.is_empty() {
477 let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
478 txn.set_durability(SkvDurability::Eventual);
479 for (key, record) in &eventual {
480 let bytes = rmps::to_vec_named(record)
481 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
482 txn.set(key.as_bytes(), bytes)?;
483 }
484 txn.commit().await?;
485 }
486
487 let has_knowledge = records.iter().any(|(k, _)| is_knowledge_key(k));
488
489 if has_knowledge {
491 let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
492 }
493
494 let mut search_synced = false;
498 match self.ensure_search() {
499 Ok(search) => {
500 let search_records: Vec<&Record> = records.iter().map(|(_, r)| *r).collect();
501 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
502 search.add_records(&search_records)
503 })) {
504 Ok(Ok(_)) => {
505 search_synced = true;
506 }
507 Ok(Err(e)) => {
508 tracing::warn!("search index update failed in put_batch: {e}");
509 }
510 Err(_panic) => {
511 tracing::error!(
512 "search index panicked during put_batch — \
513 index will be rebuilt on next startup"
514 );
515 }
516 }
517 }
518 Err(e) => {
519 tracing::warn!("search index unavailable during put_batch: {e}");
520 }
521 }
522 if has_knowledge {
523 self.bump_write_seq();
524 if search_synced {
525 let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
526 }
527 }
528 Ok(())
529 }
530
531 pub async fn delete(&self, key: &str) -> Result<()> {
533 let tree = self.tree_for(key);
534 let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
535 txn.set_durability(skv_durability(Durability::for_key(key)));
536 txn.delete(key.as_bytes())?;
537 txn.commit().await?;
538
539 if is_knowledge_key(key) {
540 let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
541 }
542
543 let mut search_synced = false;
544 match self.ensure_search() {
545 Ok(search) => {
546 search.delete_key(key)?;
547 search_synced = true;
548 }
549 Err(e) => {
550 tracing::warn!("search index unavailable during delete: {e}");
551 }
552 }
553
554 if is_knowledge_key(key) {
555 self.bump_write_seq();
556 if search_synced {
557 let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
558 }
559 }
560 Ok(())
561 }
562
563 pub async fn scan_prefix(&self, prefix: &str) -> Result<Vec<Record>> {
570 let tree = self.tree_for(prefix);
571 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
572
573 let end = prefix_end(prefix);
575 let iter = txn.range(prefix.as_bytes(), end.as_bytes())?;
576
577 let mut records = Vec::new();
578 let mut cursor = iter;
579 while cursor.next()? {
580 let bytes = cursor.value()?;
581 match rmps::from_slice::<Record>(&bytes) {
582 Ok(record) => records.push(record),
583 Err(e) => {
584 tracing::warn!("skipping malformed record during scan: {e}");
585 }
586 }
587 }
588 Ok(records)
589 }
590
591 pub async fn scan_prefix_each<F>(&self, prefix: &str, mut callback: F) -> Result<()>
603 where
604 F: FnMut(Record),
605 {
606 let tree = self.tree_for(prefix);
607 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
608 let end = prefix_end(prefix);
609 let mut cursor = txn.range(prefix.as_bytes(), end.as_bytes())?;
610 while cursor.next()? {
611 let bytes = cursor.value()?;
612 match rmps::from_slice::<Record>(&bytes) {
613 Ok(record) => callback(record),
614 Err(e) => {
615 tracing::warn!("skipping malformed record during scan: {e}");
616 }
617 }
618 }
619 Ok(())
620 }
621
622 pub async fn search(&self, text: &str, limit: usize) -> Result<Vec<Record>> {
631 let search = self.ensure_search()?;
632 let keys = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
633 search.query_keys(text, limit)
634 })) {
635 Ok(result) => result?,
636 Err(_panic) => {
637 tracing::error!("search index panicked during query — returning empty results");
638 return Ok(vec![]);
639 }
640 };
641 let mut records = Vec::with_capacity(keys.len());
642 for key in &keys {
643 if let Some(record) = self.get(key).await? {
644 records.push(record);
645 }
646 }
647 Ok(records)
648 }
649
650 pub async fn search_scored(&self, text: &str, limit: usize) -> Result<Vec<(f32, Record)>> {
656 let search = self.ensure_search()?;
657 let scored_keys = match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
658 search.query_keys_scored(text, limit)
659 })) {
660 Ok(result) => result?,
661 Err(_panic) => {
662 tracing::error!(
663 "search index panicked during scored query — returning empty results"
664 );
665 return Ok(vec![]);
666 }
667 };
668 let mut results = Vec::with_capacity(scored_keys.len());
669 for (score, key) in &scored_keys {
670 if let Some(record) = self.get(key).await? {
671 results.push((*score, record));
672 }
673 }
674 Ok(results)
675 }
676
677 pub async fn get_raw_bytes(&self, key: &str) -> Result<Option<Vec<u8>>> {
682 let tree = self.tree_for(key);
683 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
684 match txn.get(key.as_bytes())? {
685 None => Ok(None),
686 Some(bytes) => Ok(Some(bytes.to_vec())),
687 }
688 }
689
690 pub async fn put_raw(&self, key: &str, value: &[u8]) -> Result<()> {
697 let durability = Durability::for_key(key);
698 let tree = self.tree_for(key);
699 let mut txn = tree.begin_with_mode(Mode::WriteOnly)?;
700 txn.set_durability(skv_durability(durability));
701 txn.set(key.as_bytes(), value.to_vec())?;
702 txn.commit().await?;
703 Ok(())
704 }
705
706 pub async fn put_batch_raw(&self, records: &[(&str, &[u8])]) -> Result<()> {
711 if records.is_empty() {
712 return Ok(());
713 }
714
715 let mut immediate: Vec<(&str, &[u8])> = Vec::new();
716 let mut eventual: Vec<(&str, &[u8])> = Vec::new();
717 for &(key, value) in records {
718 match Durability::for_key(key) {
719 Durability::Immediate => immediate.push((key, value)),
720 Durability::Eventual => eventual.push((key, value)),
721 }
722 }
723
724 if !immediate.is_empty() {
725 let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
726 txn.set_durability(SkvDurability::Immediate);
727 for (key, value) in &immediate {
728 txn.set(key.as_bytes(), value.to_vec())?;
729 }
730 txn.commit().await?;
731 }
732
733 if !eventual.is_empty() {
734 let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
735 txn.set_durability(SkvDurability::Eventual);
736 for (key, value) in &eventual {
737 txn.set(key.as_bytes(), value.to_vec())?;
738 }
739 txn.commit().await?;
740 }
741
742 Ok(())
743 }
744
745 pub async fn transact_knowledge(&self, ops: &[KnowledgeWriteOp<'_>]) -> Result<()> {
763 if ops.is_empty() {
764 return Ok(());
765 }
766 for op in ops {
767 let k = match op {
768 KnowledgeWriteOp::PutRecord { key, .. } => *key,
769 KnowledgeWriteOp::PutRaw { key, .. } => *key,
770 };
771 if Durability::for_key(k) != Durability::Immediate {
772 anyhow::bail!(
773 "transact_knowledge: key '{k}' routes to sessions tree, not knowledge"
774 );
775 }
776 }
777
778 let mut record_refs: Vec<&Record> = Vec::new();
780
781 let mut txn = self.knowledge.begin_with_mode(Mode::WriteOnly)?;
782 txn.set_durability(SkvDurability::Immediate);
783 for op in ops {
784 match op {
785 KnowledgeWriteOp::PutRecord { key, record } => {
786 let bytes = rmps::to_vec_named(record)
787 .with_context(|| format!("failed to serialize record for key '{key}'"))?;
788 txn.set(key.as_bytes(), bytes)?;
789 record_refs.push(record);
790 }
791 KnowledgeWriteOp::PutRaw { key, value } => {
792 txn.set(key.as_bytes(), value.to_vec())?;
793 }
794 }
795 }
796 txn.commit().await?;
797
798 let has_knowledge = ops.iter().any(|op| {
800 let k = match op {
801 KnowledgeWriteOp::PutRecord { key, .. } => key,
802 KnowledgeWriteOp::PutRaw { key, .. } => key,
803 };
804 is_knowledge_key(k)
805 });
806 if has_knowledge {
807 let _ = std::fs::write(self.root.join(SEARCH_SYNC_PENDING), b"");
808 }
809 let mut search_synced = false;
810 if !record_refs.is_empty() {
811 if let Ok(search) = self.ensure_search() {
812 match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
813 search.add_records(&record_refs)
814 })) {
815 Ok(Ok(_)) => search_synced = true,
816 Ok(Err(e)) => tracing::warn!("transact_knowledge: tantivy sync failed: {e}"),
817 Err(_) => tracing::error!("transact_knowledge: tantivy panicked"),
818 }
819 }
820 }
821 if has_knowledge {
822 self.bump_write_seq();
823 if search_synced {
824 let _ = std::fs::remove_file(self.root.join(SEARCH_SYNC_PENDING));
825 }
826 }
827 Ok(())
828 }
829
830 pub async fn transact_sessions_raw(&self, entries: &[(&str, &[u8])]) -> Result<()> {
838 if entries.is_empty() {
839 return Ok(());
840 }
841 for (k, _) in entries {
842 if Durability::for_key(k) != Durability::Eventual {
843 anyhow::bail!(
844 "transact_sessions_raw: key '{k}' routes to knowledge tree, not sessions"
845 );
846 }
847 }
848
849 let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
850 txn.set_durability(SkvDurability::Eventual);
851 for (key, value) in entries {
852 txn.set(key.as_bytes(), value.to_vec())?;
853 }
854 txn.commit().await?;
855 Ok(())
856 }
857
858 pub async fn scan_keys(&self, prefix: &str) -> Result<Vec<String>> {
864 let tree = self.tree_for(prefix);
865 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
866 let end = prefix_end(prefix);
867 let mut cursor = txn.range(prefix.as_bytes(), end.as_bytes())?;
868
869 let mut keys = Vec::new();
870 while cursor.next()? {
871 let user_key = cursor.key().user_key();
872 match std::str::from_utf8(user_key) {
873 Ok(s) => keys.push(s.to_string()),
874 Err(e) => tracing::warn!("skipping non-UTF8 key in scan_keys: {e}"),
875 }
876 }
877 Ok(keys)
878 }
879
880 pub fn history(&self, key: &str, limit: usize) -> Result<Vec<HistoryEntry>> {
891 anyhow::ensure!(!key.is_empty(), "history key must not be empty");
892 let tree = self.tree_for(key);
893 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
894
895 let mut opts = HistoryOptions::new().with_tombstones(true);
896 if limit > 0 {
897 opts = opts.with_limit(limit);
898 }
899
900 history_impl(&txn, key, &opts)
901 }
902
903 pub fn history_since(
908 &self,
909 key: &str,
910 since_ts: u64,
911 limit: usize,
912 ) -> Result<Vec<HistoryEntry>> {
913 anyhow::ensure!(!key.is_empty(), "history key must not be empty");
914 let tree = self.tree_for(key);
915 let txn = tree.begin_with_mode(Mode::ReadOnly)?;
916
917 let since_ns = since_ts.saturating_mul(1_000_000_000);
918 let mut opts = HistoryOptions::new()
919 .with_tombstones(true)
920 .with_ts_range(since_ns, u64::MAX);
921 if limit > 0 {
922 opts = opts.with_limit(limit);
923 }
924
925 history_impl(&txn, key, &opts)
926 }
927
928 pub async fn records_since(&self, since_ts: u64, limit: usize) -> Result<Vec<Record>> {
934 let mut results = Vec::new();
935 for ns in KNOWLEDGE_NAMESPACES {
936 let records = self.scan_prefix(ns).await?;
937 for r in records {
938 if r.updated_at >= since_ts {
939 results.push(r);
940 }
941 }
942 }
943 results.sort_by(|a, b| {
945 b.updated_at
946 .cmp(&a.updated_at)
947 .then_with(|| a.key.cmp(&b.key))
948 });
949 if limit > 0 && results.len() > limit {
950 results.truncate(limit);
951 }
952 Ok(results)
953 }
954
955 pub async fn close(self) -> Result<()> {
966 tokio::try_join!(self.knowledge.close(), self.sessions.close())?;
967 if let Some(search) = self.search.into_inner() {
969 search.close()?;
970 }
971 Ok(())
972 }
973
974 pub async fn flush_for_shutdown(&self) {
991 if let Err(e) = self.knowledge.flush_wal(true) {
992 tracing::warn!("flush_for_shutdown: knowledge tree flush failed: {e}");
993 }
994 if let Err(e) = self.sessions.flush_wal(true) {
995 tracing::warn!("flush_for_shutdown: sessions tree flush failed: {e}");
996 }
997 }
998
999 pub async fn ping(&self) -> Result<u64> {
1008 let start = now_micros();
1009
1010 let sentinel_key = "analytics:ping_probe";
1011 let ts = start.to_string();
1012 let mut txn = self.sessions.begin_with_mode(Mode::WriteOnly)?;
1013 txn.set_durability(SkvDurability::Eventual);
1014 txn.set(sentinel_key.as_bytes(), ts.as_bytes())?;
1015 txn.commit().await?;
1016
1017 let txn = self.sessions.begin_with_mode(Mode::ReadOnly)?;
1018 let result = txn.get(sentinel_key.as_bytes())?;
1019 anyhow::ensure!(
1020 result.is_some(),
1021 "ping sentinel write was not visible on read-back"
1022 );
1023
1024 Ok(now_micros() - start)
1025 }
1026
    /// Path of the `health_write_seq` file inside the store root, which holds
    /// the write-sequence counter as decimal text.
    fn write_seq_path(&self) -> PathBuf {
        self.root.join("health_write_seq")
    }
1035
1036 pub fn read_write_seq(&self) -> u64 {
1041 std::fs::read_to_string(self.write_seq_path())
1042 .ok()
1043 .and_then(|s| s.trim().parse().ok())
1044 .unwrap_or(0)
1045 }
1046
1047 fn bump_write_seq(&self) {
1052 let next = self.read_write_seq().wrapping_add(1);
1053 let _ = std::fs::write(self.write_seq_path(), next.to_string());
1054 }
1055
    /// Picks the tree a key (or key prefix) belongs to: `Eventual` keys live
    /// in the sessions tree, `Immediate` keys in the knowledge tree.
    fn tree_for(&self, key: &str) -> &Tree {
        match Durability::for_key(key) {
            Durability::Eventual => &self.sessions,
            Durability::Immediate => &self.knowledge,
        }
    }
1067
    /// Borrows the sessions tree directly, bypassing the key-based routing.
    pub fn sessions_tree(&self) -> &Tree {
        &self.sessions
    }
1075}
1076
1077fn lock_error_hint(err: anyhow::Error, db_path: &std::path::Path) -> anyhow::Error {
1091 let msg = format!("{err}");
1092 if msg.contains("already locked") || msg.contains("WouldBlock") {
1093 let lock_file = db_path.join("LOCK");
1096 let pid_hint = std::fs::read_to_string(&lock_file)
1097 .ok()
1098 .and_then(|s| s.trim().parse::<u32>().ok())
1099 .map(|pid| format!(" (holder PID: {pid})"))
1100 .unwrap_or_default();
1101 anyhow::anyhow!(
1102 "cannot open {} — another mati process holds the lock{pid_hint}.\n\
1103 This is usually the MCP server (mati serve) or a background daemon.\n\
1104 To stop the daemon: `mati daemon stop`\n\
1105 To check: `lsof {}/LOCK`",
1106 db_path.display(),
1107 db_path.display()
1108 )
1109 } else {
1110 err
1111 }
1112}
1113
1114fn open_knowledge_tree(path: PathBuf) -> Result<Tree> {
1115 let opts = Options::new()
1118 .with_path(path)
1119 .with_versioning(true, 0) .with_enable_vlog(true)
1121 .with_vlog_value_threshold(0)
1122 .with_vlog_checksum_verification(VLogChecksumLevel::Full);
1123 TreeBuilder::with_options(opts)
1124 .build()
1125 .context("failed to open knowledge.db")
1126}
1127
1128fn open_sessions_tree(path: PathBuf) -> Result<Tree> {
1129 let opts = Options::new()
1133 .with_path(path)
1134 .with_versioning(true, SESSIONS_RETENTION_NS)
1135 .with_enable_vlog(true)
1136 .with_vlog_value_threshold(0);
1137 TreeBuilder::with_options(opts)
1138 .build()
1139 .context("failed to open sessions.db")
1140}
1141
1142pub fn derive_slug(repo_root: &Path) -> String {
1154 let canon = std::fs::canonicalize(repo_root).unwrap_or_else(|_| repo_root.to_path_buf());
1161
1162 let git_root = walk_up_for_git_root(&canon).unwrap_or_else(|| canon.clone());
1167
1168 let input =
1169 read_remote_url(&git_root).unwrap_or_else(|| git_root.to_string_lossy().into_owned());
1170
1171 let digest = Sha256::digest(input.as_bytes());
1172 hex::encode(&digest[..4]) }
1174
/// Finds the closest directory at or above `start` that contains a
/// `.git/config` file.
///
/// Returns `None` when no ancestor qualifies.
fn walk_up_for_git_root(start: &Path) -> Option<PathBuf> {
    start
        .ancestors()
        .find(|dir| dir.join(".git").join("config").is_file())
        .map(Path::to_path_buf)
}
1187
/// Extracts the first remote URL from `<repo_root>/.git/config`.
///
/// The first line that, ignoring leading whitespace, starts with `url =`
/// wins; the returned value is the trimmed text after its first `=`.
/// Returns `None` when the file cannot be read or has no such line.
fn read_remote_url(repo_root: &Path) -> Option<String> {
    let config = std::fs::read_to_string(repo_root.join(".git").join("config")).ok()?;
    for line in config.lines() {
        if !line.trim_start().starts_with("url =") {
            continue;
        }
        let url = match line.split_once('=') {
            Some((_, value)) => value.trim().to_owned(),
            None => String::new(),
        };
        return Some(url);
    }
    None
}
1200
1201fn read_record(txn: &Transaction, key: &str) -> Result<Option<Record>> {
1207 match txn.get(key.as_bytes())? {
1208 None => Ok(None),
1209 Some(bytes) => {
1210 let record = rmps::from_slice::<Record>(&bytes)
1211 .with_context(|| format!("corrupt record at key '{key}'"))?;
1212 Ok(Some(record))
1213 }
1214 }
1215}
1216
/// Maps the crate's `Durability` onto the SurrealKV durability level of the
/// same name.
fn skv_durability(d: Durability) -> SkvDurability {
    match d {
        Durability::Immediate => SkvDurability::Immediate,
        Durability::Eventual => SkvDurability::Eventual,
    }
}
1224
/// Computes the exclusive upper bound for a range scan over all keys that
/// start with `prefix`.
///
/// The bound is built per character, not per byte: the last character of the
/// prefix is replaced by its successor scalar value, skipping the surrogate
/// gap. A trailing `char::MAX` has no successor, so it is dropped and the
/// character before it is advanced instead. Because UTF-8 byte order matches
/// code-point order, the result is always valid UTF-8 and is a tight bound
/// for UTF-8 keys. Incrementing the last *byte* can yield invalid UTF-8
/// (for example after `0x7F` or a trailing `0xBF`), which previously fell
/// back to an over-wide bound and let unrelated keys into the scan.
///
/// For an empty prefix, or one made only of `char::MAX`, there is no finite
/// successor; the largest single-character string is returned.
fn prefix_end(prefix: &str) -> String {
    let mut end = prefix.to_owned();
    while let Some(last) = end.pop() {
        let successor = match u32::from(last) {
            // U+D800..=U+DFFF are surrogates, not scalar values: jump over them.
            0xD7FF => Some('\u{E000}'),
            code => char::from_u32(code + 1),
        };
        if let Some(next) = successor {
            end.push(next);
            return end;
        }
        // `last` was char::MAX: carry into the preceding character.
    }
    char::MAX.to_string()
}
1241
/// One version of a key as returned by `Store::history` /
/// `Store::history_since`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HistoryEntry {
    /// Version timestamp in whole seconds (`timestamp_ns / 1_000_000_000`).
    pub timestamp_secs: u64,
    /// Version timestamp as reported by the tree, in nanoseconds.
    pub timestamp_ns: u64,
    /// Decoded record; `None` for tombstones and for values that could not
    /// be read or decoded.
    pub record: Option<Record>,
    /// `true` when this version marks a deletion.
    pub is_tombstone: bool,
}
1258
1259fn history_impl(txn: &Transaction, key: &str, opts: &HistoryOptions) -> Result<Vec<HistoryEntry>> {
1265 let mut upper = key.as_bytes().to_vec();
1268 upper.push(0x00);
1269
1270 let mut cursor = txn.history_with_options(key.as_bytes(), upper.as_slice(), opts)?;
1271
1272 let mut entries = Vec::new();
1273 while cursor.next()? {
1274 let key_ref = cursor.key();
1275
1276 if key_ref.user_key() != key.as_bytes() {
1278 continue;
1279 }
1280
1281 let is_tombstone = key_ref.is_tombstone();
1282 let ts_ns = key_ref.timestamp();
1283 let ts_secs = ts_ns / 1_000_000_000;
1284
1285 let record = if is_tombstone {
1286 None
1287 } else {
1288 match cursor.value() {
1289 Ok(bytes) => rmps::from_slice::<Record>(&bytes).ok(),
1290 Err(_) => None,
1291 }
1292 };
1293
1294 entries.push(HistoryEntry {
1295 timestamp_secs: ts_secs,
1296 timestamp_ns: ts_ns,
1297 record,
1298 is_tombstone,
1299 });
1300 }
1301
1302 entries.sort_by_key(|e| std::cmp::Reverse(e.timestamp_ns));
1305 Ok(entries)
1306}
1307
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch.
fn now_micros() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
1315
1316#[cfg(test)]
1321mod tests {
1322 use super::*;
1323 use tempfile::TempDir;
1324
1325 fn temp_store() -> (Store, TempDir) {
1327 let dir = TempDir::new().unwrap();
1328 let root = dir.path().join("mati_test");
1330 std::fs::create_dir_all(&root).unwrap();
1331 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1332 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1333 let search = OnceCell::new();
1334 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1335 let store = Store {
1336 knowledge,
1337 sessions,
1338 search,
1339 root: root.clone(),
1340 index_needs_rebuild: false,
1341 };
1342 (store, dir)
1343 }
1344
1345 #[tokio::test]
1346 async fn ping_roundtrip() {
1347 let (store, _dir) = temp_store();
1348 let latency = store.ping().await.unwrap();
1349 assert!(latency < 5_000_000, "ping took >5s: {latency}µs");
1350 }
1351
1352 #[tokio::test]
1353 async fn put_get_roundtrip() {
1354 use crate::store::record::{
1355 Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1356 RecordSource, RecordVersion, StalenessScore,
1357 };
1358 use uuid::Uuid;
1359
1360 let (store, _dir) = temp_store();
1361
1362 let device_id = Uuid::new_v4();
1363 let record = Record {
1364 key: "gotcha:test-key".to_string(),
1365 value: "test value".to_string(),
1366 category: Category::Gotcha,
1367 priority: Priority::High,
1368 tags: vec!["test".to_string()],
1369 created_at: 0,
1370 updated_at: 0,
1371 ref_url: None,
1372 staleness: StalenessScore::fresh(),
1373 lifecycle: RecordLifecycle::Active,
1374 version: RecordVersion {
1375 device_id,
1376 logical_clock: 1,
1377 wall_clock: 0,
1378 },
1379 quality: QualityScore::layer0_default(),
1380 access_count: 0,
1381 last_accessed: 0,
1382 source: RecordSource::StaticAnalysis,
1383 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1384 gap_analysis_score: 0.0,
1385 payload: None,
1386 };
1387
1388 store.put("gotcha:test-key", &record).await.unwrap();
1389 let got = store.get("gotcha:test-key").await.unwrap();
1390 assert!(got.is_some());
1391 assert_eq!(got.unwrap().key, "gotcha:test-key");
1392 }
1393
1394 #[tokio::test]
1395 async fn put_delete_get_returns_none() {
1396 use crate::store::record::{
1397 Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1398 RecordSource, RecordVersion, StalenessScore,
1399 };
1400 use uuid::Uuid;
1401
1402 let (store, _dir) = temp_store();
1403
1404 let device_id = Uuid::new_v4();
1405 let record = Record {
1406 key: "file:src/main.rs".to_string(),
1407 value: "entry point".to_string(),
1408 category: Category::File,
1409 priority: Priority::Normal,
1410 tags: vec![],
1411 created_at: 0,
1412 updated_at: 0,
1413 ref_url: None,
1414 staleness: StalenessScore::fresh(),
1415 lifecycle: RecordLifecycle::Active,
1416 version: RecordVersion {
1417 device_id,
1418 logical_clock: 1,
1419 wall_clock: 0,
1420 },
1421 quality: QualityScore::layer0_default(),
1422 access_count: 0,
1423 last_accessed: 0,
1424 source: RecordSource::StaticAnalysis,
1425 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1426 gap_analysis_score: 0.0,
1427 payload: None,
1428 };
1429
1430 store.put("file:src/main.rs", &record).await.unwrap();
1431 store.delete("file:src/main.rs").await.unwrap();
1432 let got = store.get("file:src/main.rs").await.unwrap();
1433 assert!(got.is_none());
1434 }
1435
1436 #[tokio::test]
1437 async fn scan_prefix_returns_matching_keys() {
1438 use crate::store::record::{
1439 Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1440 RecordSource, RecordVersion, StalenessScore,
1441 };
1442 use uuid::Uuid;
1443
1444 let (store, _dir) = temp_store();
1445 let device_id = Uuid::new_v4();
1446
1447 let make_record = |key: &str| Record {
1448 key: key.to_string(),
1449 value: "v".to_string(),
1450 category: Category::Gotcha,
1451 priority: Priority::Normal,
1452 tags: vec![],
1453 created_at: 0,
1454 updated_at: 0,
1455 ref_url: None,
1456 staleness: StalenessScore::fresh(),
1457 lifecycle: RecordLifecycle::Active,
1458 version: RecordVersion {
1459 device_id,
1460 logical_clock: 1,
1461 wall_clock: 0,
1462 },
1463 quality: QualityScore::layer0_default(),
1464 access_count: 0,
1465 last_accessed: 0,
1466 source: RecordSource::StaticAnalysis,
1467 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1468 gap_analysis_score: 0.0,
1469 payload: None,
1470 };
1471
1472 store
1473 .put("gotcha:alpha", &make_record("gotcha:alpha"))
1474 .await
1475 .unwrap();
1476 store
1477 .put("gotcha:beta", &make_record("gotcha:beta"))
1478 .await
1479 .unwrap();
1480 store
1481 .put("gotcha:gamma", &make_record("gotcha:gamma"))
1482 .await
1483 .unwrap();
1484 store
1485 .put("file:src/main.rs", &make_record("file:src/main.rs"))
1486 .await
1487 .unwrap();
1488
1489 let results = store.scan_prefix("gotcha:").await.unwrap();
1490 assert_eq!(results.len(), 3);
1491 }
1492
1493 #[tokio::test]
1494 async fn write_100_records_survive_reopen() {
1495 use crate::store::record::{
1496 Category, ConfidenceScore, Priority, QualityScore, Record, RecordLifecycle,
1497 RecordSource, RecordVersion, StalenessScore,
1498 };
1499 use uuid::Uuid;
1500
1501 let dir = TempDir::new().unwrap();
1502 let root = dir.path().join("mati_test");
1503 std::fs::create_dir_all(&root).unwrap();
1504 let device_id = Uuid::new_v4();
1505
1506 let make_record = |i: usize| {
1507 let key = format!("gotcha:item-{i:03}");
1508 Record {
1509 key: key.clone(),
1510 value: format!("value {i}"),
1511 category: Category::Gotcha,
1512 priority: Priority::Normal,
1513 tags: vec![],
1514 created_at: i as u64,
1515 updated_at: i as u64,
1516 ref_url: None,
1517 staleness: StalenessScore::fresh(),
1518 lifecycle: RecordLifecycle::Active,
1519 version: RecordVersion {
1520 device_id,
1521 logical_clock: i as u64,
1522 wall_clock: i as u64,
1523 },
1524 quality: QualityScore::layer0_default(),
1525 access_count: 0,
1526 last_accessed: 0,
1527 source: RecordSource::StaticAnalysis,
1528 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1529 gap_analysis_score: 0.0,
1530 payload: None,
1531 }
1532 };
1533
1534 {
1536 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1537 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1538 let search = OnceCell::new();
1539 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1540 let store = Store {
1541 knowledge,
1542 sessions,
1543 search,
1544 root: root.clone(),
1545 index_needs_rebuild: false,
1546 };
1547 for i in 0..100 {
1548 let r = make_record(i);
1549 store.put(&r.key, &r).await.unwrap();
1550 }
1551 store.close().await.unwrap();
1552 }
1553
1554 {
1556 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
1557 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
1558 let search = OnceCell::new();
1559 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
1560 let store = Store {
1561 knowledge,
1562 sessions,
1563 search,
1564 root: root.clone(),
1565 index_needs_rebuild: false,
1566 };
1567 let results = store.scan_prefix("gotcha:").await.unwrap();
1568 assert_eq!(
1569 results.len(),
1570 100,
1571 "expected 100 records after reopen, got {}",
1572 results.len()
1573 );
1574 store.close().await.unwrap();
1575 }
1576 }
1577
1578 #[test]
1579 fn slug_is_8_hex_chars() {
1580 let slug = derive_slug(Path::new("/some/repo"));
1581 assert_eq!(slug.len(), 8);
1582 assert!(slug.chars().all(|c| c.is_ascii_hexdigit()));
1583 }
1584
1585 #[test]
1586 fn slug_is_deterministic() {
1587 let a = derive_slug(Path::new("/some/repo"));
1588 let b = derive_slug(Path::new("/some/repo"));
1589 assert_eq!(a, b);
1590 }
1591
1592 #[test]
1593 fn prefix_end_increments_last_byte() {
1594 assert_eq!(prefix_end("gotcha:"), "gotcha;");
1596 let all_ff = String::from_utf8(vec![0xff, 0xff]).unwrap_or_default();
1598 let end = prefix_end(&all_ff);
1599 assert_eq!(end, "\u{ffff}");
1600 }
1601
1602 fn make_record(key: &str) -> Record {
1605 use crate::store::record::{
1606 Category, ConfidenceScore, Priority, QualityScore, RecordLifecycle, RecordSource,
1607 RecordVersion, StalenessScore,
1608 };
1609 Record {
1610 key: key.to_string(),
1611 value: format!("value for {key}"),
1612 category: Category::Gotcha,
1613 priority: Priority::Normal,
1614 tags: vec![],
1615 created_at: 0,
1616 updated_at: 0,
1617 ref_url: None,
1618 staleness: StalenessScore::fresh(),
1619 lifecycle: RecordLifecycle::Active,
1620 version: RecordVersion {
1621 device_id: uuid::Uuid::new_v4(),
1622 logical_clock: 1,
1623 wall_clock: 0,
1624 },
1625 quality: QualityScore::layer0_default(),
1626 access_count: 0,
1627 last_accessed: 0,
1628 source: RecordSource::StaticAnalysis,
1629 confidence: ConfidenceScore::for_new_record(&RecordSource::StaticAnalysis),
1630 gap_analysis_score: 0.0,
1631 payload: None,
1632 }
1633 }
1634
1635 #[tokio::test]
1638 async fn get_never_written_key_returns_none() {
1639 let (store, _dir) = temp_store();
1640 assert!(store.get("gotcha:does-not-exist").await.unwrap().is_none());
1641 }
1642
1643 #[tokio::test]
1644 async fn put_twice_second_value_wins() {
1645 let (store, _dir) = temp_store();
1646 let mut r = make_record("gotcha:overwrite-me");
1647 store.put("gotcha:overwrite-me", &r).await.unwrap();
1648 r.value = "updated value".to_string();
1649 r.version.logical_clock = 2;
1650 store.put("gotcha:overwrite-me", &r).await.unwrap();
1651 let got = store.get("gotcha:overwrite-me").await.unwrap().unwrap();
1652 assert_eq!(got.value, "updated value", "second write must win");
1653 assert_eq!(got.version.logical_clock, 2);
1654 }
1655
1656 #[tokio::test]
1657 async fn delete_nonexistent_key_is_noop() {
1658 let (store, _dir) = temp_store();
1659 store.delete("gotcha:never-existed").await.unwrap();
1660 assert!(store.get("gotcha:never-existed").await.unwrap().is_none());
1661 }
1662
1663 #[tokio::test]
1664 async fn delete_does_not_remove_sibling_keys() {
1665 let (store, _dir) = temp_store();
1666 store
1667 .put("gotcha:keep", &make_record("gotcha:keep"))
1668 .await
1669 .unwrap();
1670 store
1671 .put("gotcha:remove", &make_record("gotcha:remove"))
1672 .await
1673 .unwrap();
1674 store.delete("gotcha:remove").await.unwrap();
1675 assert!(
1676 store.get("gotcha:keep").await.unwrap().is_some(),
1677 "sibling must survive"
1678 );
1679 assert!(store.get("gotcha:remove").await.unwrap().is_none());
1680 }
1681
1682 #[tokio::test]
1685 async fn scan_prefix_empty_result() {
1686 let (store, _dir) = temp_store();
1687 assert!(store.scan_prefix("gotcha:").await.unwrap().is_empty());
1688 }
1689
1690 #[tokio::test]
1691 async fn scan_prefix_does_not_spill_across_namespaces() {
1692 let (store, _dir) = temp_store();
1693 store
1694 .put("gotcha:alpha", &make_record("gotcha:alpha"))
1695 .await
1696 .unwrap();
1697 store
1698 .put("file:src/main.rs", &make_record("file:src/main.rs"))
1699 .await
1700 .unwrap();
1701 store
1702 .put("decision:arch", &make_record("decision:arch"))
1703 .await
1704 .unwrap();
1705
1706 let gotcha = store.scan_prefix("gotcha:").await.unwrap();
1707 assert_eq!(gotcha.len(), 1);
1708 assert_eq!(gotcha[0].key, "gotcha:alpha");
1709
1710 let file = store.scan_prefix("file:").await.unwrap();
1711 assert_eq!(file.len(), 1);
1712 assert_eq!(file[0].key, "file:src/main.rs");
1713
1714 let decision = store.scan_prefix("decision:").await.unwrap();
1715 assert_eq!(decision.len(), 1);
1716 assert_eq!(decision[0].key, "decision:arch");
1717 }
1718
1719 #[tokio::test]
1720 async fn scan_prefix_values_match_stored_values() {
1721 let (store, _dir) = temp_store();
1722 for key in ["gotcha:alpha", "gotcha:beta", "gotcha:gamma"] {
1723 let mut r = make_record(key);
1724 r.value = format!("sentinel:{key}");
1725 store.put(key, &r).await.unwrap();
1726 }
1727 let mut results = store.scan_prefix("gotcha:").await.unwrap();
1728 results.sort_by(|a, b| a.key.cmp(&b.key));
1729 assert_eq!(results.len(), 3);
1730 for r in &results {
1731 assert_eq!(
1732 r.value,
1733 format!("sentinel:{}", r.key),
1734 "value mismatch for key '{}'",
1735 r.key
1736 );
1737 }
1738 }
1739
1740 #[tokio::test]
1741 async fn scan_prefix_excludes_adjacent_namespaces() {
1742 let (store, _dir) = temp_store();
1744 store
1745 .put("gotcha:real", &make_record("gotcha:real"))
1746 .await
1747 .unwrap();
1748 store
1749 .put("decision:before", &make_record("decision:before"))
1750 .await
1751 .unwrap();
1752 store
1753 .put("file:after", &make_record("file:after"))
1754 .await
1755 .unwrap();
1756
1757 let results = store.scan_prefix("gotcha:").await.unwrap();
1758 assert_eq!(results.len(), 1, "only gotcha: keys should appear");
1759 assert_eq!(results[0].key, "gotcha:real");
1760 }
1761
1762 #[tokio::test]
1765 async fn knowledge_and_session_trees_are_isolated() {
1766 let (store, _dir) = temp_store();
1767 store
1768 .put("gotcha:in-knowledge", &make_record("gotcha:in-knowledge"))
1769 .await
1770 .unwrap();
1771 store
1772 .put("session:12345", &make_record("session:12345"))
1773 .await
1774 .unwrap();
1775
1776 let gotcha_results = store.scan_prefix("gotcha:").await.unwrap();
1777 let session_results = store.scan_prefix("session:").await.unwrap();
1778
1779 assert_eq!(gotcha_results.len(), 1);
1780 assert_eq!(gotcha_results[0].key, "gotcha:in-knowledge");
1781 assert_eq!(session_results.len(), 1);
1782 assert_eq!(session_results[0].key, "session:12345");
1783 assert!(
1784 gotcha_results
1785 .iter()
1786 .all(|r| !r.key.starts_with("session:")),
1787 "session records must not appear in gotcha: scan"
1788 );
1789 assert!(
1790 session_results
1791 .iter()
1792 .all(|r| !r.key.starts_with("gotcha:")),
1793 "gotcha records must not appear in session: scan"
1794 );
1795 }
1796
1797 #[tokio::test]
1800 async fn scan_prefix_skips_corrupt_records_and_returns_valid_ones() {
1801 let (store, _dir) = temp_store();
1802 store
1803 .put("gotcha:good", &make_record("gotcha:good"))
1804 .await
1805 .unwrap();
1806
1807 {
1809 let mut txn = store.knowledge.begin().unwrap();
1810 txn.set_durability(SkvDurability::Immediate);
1811 txn.set(b"gotcha:corrupted", b"not valid json {{{").unwrap();
1812 txn.commit().await.unwrap();
1813 }
1814
1815 let results = store.scan_prefix("gotcha:").await.unwrap();
1816 assert_eq!(results.len(), 1, "corrupt record must be silently skipped");
1817 assert_eq!(results[0].key, "gotcha:good");
1818 }
1819
1820 #[tokio::test]
1821 async fn scan_prefix_all_corrupt_returns_empty_not_panic() {
1822 let (store, _dir) = temp_store();
1823 {
1824 let mut txn = store.knowledge.begin().unwrap();
1825 txn.set_durability(SkvDurability::Immediate);
1826 txn.set(b"gotcha:bad1", b"null").unwrap();
1827 txn.set(b"gotcha:bad2", b"{\"x\":1}").unwrap(); txn.commit().await.unwrap();
1829 }
1830 let results = store.scan_prefix("gotcha:").await.unwrap();
1831 assert_eq!(
1832 results.len(),
1833 0,
1834 "all corrupt — must return empty, not panic"
1835 );
1836 }
1837
1838 #[tokio::test]
1841 async fn ping_multiple_calls_all_succeed() {
1842 let (store, _dir) = temp_store();
1843 for i in 0..10 {
1844 let latency = store
1845 .ping()
1846 .await
1847 .unwrap_or_else(|e| panic!("ping #{i} failed: {e}"));
1848 assert!(latency < 5_000_000, "ping #{i} took >5 s: {latency} µs");
1849 }
1850 }
1851
1852 #[test]
1855 fn slug_differs_for_different_paths() {
1856 let a = derive_slug(Path::new("/repo/project-alpha"));
1857 let b = derive_slug(Path::new("/repo/project-beta"));
1858 assert_ne!(a, b, "distinct paths must produce distinct slugs");
1859 }
1860
1861 #[test]
1862 fn slug_uses_remote_url_not_local_path() {
1863 let url = "https://github.com/example/mati.git";
1866 let expected_slug = {
1867 let digest = Sha256::digest(url.as_bytes());
1868 hex::encode(&digest[..4])
1869 };
1870
1871 let dir = tempfile::TempDir::new().unwrap();
1872 let git_dir = dir.path().join(".git");
1873 std::fs::create_dir_all(&git_dir).unwrap();
1874 std::fs::write(
1875 git_dir.join("config"),
1876 format!("[remote \"origin\"]\n\turl = {url}\n"),
1877 )
1878 .unwrap();
1879
1880 let actual_slug = derive_slug(dir.path());
1881 assert_eq!(
1882 actual_slug, expected_slug,
1883 "slug must equal SHA-256(remote URL)[0..4] hex"
1884 );
1885
1886 let path_slug = {
1889 let input = dir.path().to_string_lossy().into_owned();
1890 let digest = Sha256::digest(input.as_bytes());
1891 hex::encode(&digest[..4])
1892 };
1893 assert_ne!(
1894 actual_slug, path_slug,
1895 "URL slug must differ from the path slug for the same directory"
1896 );
1897 }
1898
1899 #[test]
1900 fn slug_is_stable_for_identical_remote_urls() {
1901 let make_repo = |url: &str| {
1902 let dir = tempfile::TempDir::new().unwrap();
1903 let git_dir = dir.path().join(".git");
1904 std::fs::create_dir_all(&git_dir).unwrap();
1905 std::fs::write(
1906 git_dir.join("config"),
1907 format!("[remote \"origin\"]\n\turl = {url}\n"),
1908 )
1909 .unwrap();
1910 (derive_slug(dir.path()), dir)
1911 };
1912 let (slug_a, _dir_a) = make_repo("https://github.com/example/same-repo.git");
1913 let (slug_b, _dir_b) = make_repo("https://github.com/example/same-repo.git");
1914 assert_eq!(
1915 slug_a, slug_b,
1916 "same remote URL must always produce the same slug"
1917 );
1918 }
1919
1920 #[test]
1921 fn slug_differs_for_different_remote_urls() {
1922 let make_repo = |url: &str| {
1923 let dir = tempfile::TempDir::new().unwrap();
1924 let git_dir = dir.path().join(".git");
1925 std::fs::create_dir_all(&git_dir).unwrap();
1926 std::fs::write(
1927 git_dir.join("config"),
1928 format!("[remote \"origin\"]\n\turl = {url}\n"),
1929 )
1930 .unwrap();
1931 (derive_slug(dir.path()), dir)
1932 };
1933 let (slug_a, _dir_a) = make_repo("https://github.com/org/repo-alpha.git");
1934 let (slug_b, _dir_b) = make_repo("https://github.com/org/repo-beta.git");
1935 assert_ne!(
1936 slug_a, slug_b,
1937 "different remote URLs must produce different slugs"
1938 );
1939 }
1940
1941 #[test]
1944 fn prefix_end_empty_prefix_returns_sentinel() {
1945 assert_eq!(prefix_end(""), "\u{ffff}");
1947 }
1948
1949 #[test]
1950 fn prefix_end_single_ascii_char() {
1951 assert_eq!(prefix_end("a"), "b"); assert_eq!(prefix_end("z"), "{"); }
1954
1955 #[test]
1956 fn prefix_end_known_namespace_boundaries() {
1957 assert_eq!(prefix_end("gotcha:"), "gotcha;");
1959 assert_eq!(prefix_end("file:"), "file;");
1960 assert_eq!(prefix_end("decision:"), "decision;");
1961 assert_eq!(prefix_end("session:"), "session;");
1962 }
1963
1964 #[tokio::test]
1967 async fn delete_then_scan_excludes_deleted_key() {
1968 let (store, _dir) = temp_store();
1970 for key in ["gotcha:a", "gotcha:b", "gotcha:c", "gotcha:d"] {
1971 store.put(key, &make_record(key)).await.unwrap();
1972 }
1973 store.delete("gotcha:b").await.unwrap();
1974 store.delete("gotcha:d").await.unwrap();
1975
1976 let results = store.scan_prefix("gotcha:").await.unwrap();
1977 assert_eq!(results.len(), 2, "deleted keys must not appear in scan");
1978 let keys: Vec<_> = results.iter().map(|r| r.key.as_str()).collect();
1979 assert!(keys.contains(&"gotcha:a"), "gotcha:a must survive");
1980 assert!(keys.contains(&"gotcha:c"), "gotcha:c must survive");
1981 assert!(!keys.contains(&"gotcha:b"), "gotcha:b must be gone");
1982 assert!(!keys.contains(&"gotcha:d"), "gotcha:d must be gone");
1983 }
1984
1985 #[tokio::test]
1988 async fn overwrite_does_not_create_duplicate_in_scan() {
1989 let (store, _dir) = temp_store();
1992 let mut r = make_record("gotcha:dedup-me");
1993 store.put("gotcha:dedup-me", &r).await.unwrap();
1994 r.value = "v2".to_string();
1995 r.version.logical_clock = 2;
1996 store.put("gotcha:dedup-me", &r).await.unwrap();
1997 r.value = "v3".to_string();
1998 r.version.logical_clock = 3;
1999 store.put("gotcha:dedup-me", &r).await.unwrap();
2000
2001 let results = store.scan_prefix("gotcha:").await.unwrap();
2002 assert_eq!(
2003 results.len(),
2004 1,
2005 "3 overwrites of the same key must yield 1 result in scan"
2006 );
2007 assert_eq!(results[0].value, "v3", "scan must return the latest value");
2008 assert_eq!(results[0].version.logical_clock, 3);
2009 }
2010
2011 #[tokio::test]
2014 async fn put_get_preserves_all_record_fields() {
2015 use crate::store::record::{
2016 Category, ConfidenceScore, Priority, QualityScore, QualitySignal, QualityTier, Record,
2017 RecordLifecycle, RecordSource, RecordVersion, StalenessScore, StalenessSignal,
2018 StalenessTier,
2019 };
2020
2021 let (store, _dir) = temp_store();
2022 let device_id = uuid::Uuid::new_v4();
2023
2024 let written = Record {
2026 key: "gotcha:full-fields".to_string(),
2027 value: "Never hold a write txn across an await point.".to_string(),
2028 category: Category::Gotcha,
2029 priority: Priority::Critical,
2030 tags: vec![
2031 "async".to_string(),
2032 "tokio".to_string(),
2033 "surrealkv".to_string(),
2034 ],
2035 created_at: 1_710_520_800,
2036 updated_at: 1_710_520_900,
2037 ref_url: Some("https://github.com/example/issue/99".to_string()),
2038 staleness: StalenessScore {
2039 value: 0.42,
2040 tier: StalenessTier::Stale,
2041 signals: vec![
2042 StalenessSignal::NotAccessedDays(45),
2043 StalenessSignal::LinesChangedPct(0.3),
2044 ],
2045 computed_at: 1_710_520_800,
2046 last_record_sha: "abc123def456".to_string(),
2047 },
2048 lifecycle: RecordLifecycle::Active,
2049 version: RecordVersion {
2050 device_id,
2051 logical_clock: 7,
2052 wall_clock: 1_710_520_900,
2053 },
2054 quality: QualityScore {
2055 value: 0.78,
2056 tier: QualityTier::Good,
2057 signals: vec![
2058 QualitySignal::HasImperativeVerb,
2059 QualitySignal::HasCausality,
2060 ],
2061 computed_at: 1_710_520_800,
2062 },
2063 access_count: 12,
2064 last_accessed: 1_710_520_888,
2065 source: RecordSource::DeveloperManual,
2066 confidence: ConfidenceScore {
2067 value: 0.75,
2068 confirmation_count: 3,
2069 contributor_count: 2,
2070 last_challenged: Some(1_710_500_000),
2071 challenge_count: 1,
2072 },
2073 gap_analysis_score: 0.31,
2074 payload: None,
2075 };
2076
2077 store.put("gotcha:full-fields", &written).await.unwrap();
2078 let read = store.get("gotcha:full-fields").await.unwrap().unwrap();
2079
2080 assert_eq!(read.key, written.key);
2082 assert_eq!(read.value, written.value);
2083 assert_eq!(read.category, written.category);
2084 assert_eq!(read.priority, written.priority);
2085 assert_eq!(read.tags, written.tags);
2086 assert_eq!(read.created_at, written.created_at);
2087 assert_eq!(read.updated_at, written.updated_at);
2088 assert_eq!(read.ref_url, written.ref_url);
2089 assert_eq!(read.staleness.tier, written.staleness.tier);
2090 assert_eq!(
2091 read.staleness.last_record_sha,
2092 written.staleness.last_record_sha
2093 );
2094 assert_eq!(read.staleness.signals.len(), 2);
2095 assert_eq!(read.lifecycle, written.lifecycle);
2096 assert_eq!(read.version.device_id, written.version.device_id);
2097 assert_eq!(read.version.logical_clock, written.version.logical_clock);
2098 assert_eq!(read.version.wall_clock, written.version.wall_clock);
2099 assert_eq!(read.quality.tier, written.quality.tier);
2100 assert_eq!(read.quality.signals.len(), 2);
2101 assert_eq!(read.access_count, written.access_count);
2102 assert_eq!(read.last_accessed, written.last_accessed);
2103 assert_eq!(read.source, written.source);
2104 assert_eq!(
2105 read.confidence.confirmation_count,
2106 written.confidence.confirmation_count
2107 );
2108 assert_eq!(
2109 read.confidence.contributor_count,
2110 written.confidence.contributor_count
2111 );
2112 assert_eq!(
2113 read.confidence.last_challenged,
2114 written.confidence.last_challenged
2115 );
2116 assert_eq!(
2117 read.confidence.challenge_count,
2118 written.confidence.challenge_count
2119 );
2120 assert!((read.gap_analysis_score - written.gap_analysis_score).abs() < f32::EPSILON);
2121 }
2122
2123 #[tokio::test]
2126 async fn eventual_keys_survive_clean_close_and_reopen() {
2127 let dir = tempfile::TempDir::new().unwrap();
2130 let root = dir.path().join("mati_test");
2131 std::fs::create_dir_all(&root).unwrap();
2132
2133 {
2134 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2135 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2136 let search = OnceCell::new();
2137 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2138 let store = Store {
2139 knowledge,
2140 sessions,
2141 search,
2142 root: root.clone(),
2143 index_needs_rebuild: false,
2144 };
2145 for i in 0..10 {
2146 let key = format!("session:{i:04}");
2147 store.put(&key, &make_record(&key)).await.unwrap();
2148 }
2149 store.close().await.unwrap(); }
2151
2152 {
2153 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2154 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2155 let search = OnceCell::new();
2156 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2157 let store = Store {
2158 knowledge,
2159 sessions,
2160 search,
2161 root: root.clone(),
2162 index_needs_rebuild: false,
2163 };
2164 let results = store.scan_prefix("session:").await.unwrap();
2165 assert_eq!(
2166 results.len(),
2167 10,
2168 "Eventual session records must survive a clean close+reopen"
2169 );
2170 store.close().await.unwrap();
2171 }
2172 }
2173
2174 #[tokio::test]
2177 async fn scan_prefix_corrupt_in_middle_does_not_stop_iteration() {
2178 let (store, _dir) = temp_store();
2181
2182 store
2183 .put("gotcha:aaa", &make_record("gotcha:aaa"))
2184 .await
2185 .unwrap(); store
2187 .put("gotcha:zzz", &make_record("gotcha:zzz"))
2188 .await
2189 .unwrap(); {
2193 let mut txn = store.knowledge.begin().unwrap();
2194 txn.set_durability(SkvDurability::Immediate);
2195 txn.set(b"gotcha:mmm", b"not json").unwrap();
2196 txn.commit().await.unwrap();
2197 }
2198
2199 let results = store.scan_prefix("gotcha:").await.unwrap();
2200 assert_eq!(
2201 results.len(),
2202 2,
2203 "corruption in the middle must not truncate the scan"
2204 );
2205 let keys: Vec<_> = results.iter().map(|r| r.key.as_str()).collect();
2206 assert!(
2207 keys.contains(&"gotcha:aaa"),
2208 "record before corruption must be returned"
2209 );
2210 assert!(
2211 keys.contains(&"gotcha:zzz"),
2212 "record after corruption must be returned"
2213 );
2214 }
2215
2216 #[tokio::test]
2219 async fn tombstoned_record_survives_store_round_trip() {
2220 use crate::store::record::{RecordLifecycle, TombstoneReason};
2221 let (store, _dir) = temp_store();
2222 let mut r = make_record("file:src/deleted.rs");
2223 r.lifecycle = RecordLifecycle::Tombstoned {
2224 reason: TombstoneReason::FileDeleted,
2225 at: 1_710_520_800,
2226 };
2227 store.put("file:src/deleted.rs", &r).await.unwrap();
2228 let got = store.get("file:src/deleted.rs").await.unwrap().unwrap();
2229 match got.lifecycle {
2230 RecordLifecycle::Tombstoned { reason, at } => {
2231 assert_eq!(reason, TombstoneReason::FileDeleted);
2232 assert_eq!(at, 1_710_520_800);
2233 }
2234 other => panic!("expected Tombstoned, got {other:?}"),
2235 }
2236 }
2237
2238 #[tokio::test]
2239 async fn superseded_record_survives_store_round_trip() {
2240 use crate::store::record::RecordLifecycle;
2241 let (store, _dir) = temp_store();
2242 let mut r = make_record("gotcha:old-rule");
2243 r.lifecycle = RecordLifecycle::Superseded {
2244 by_key: "gotcha:new-rule".to_string(),
2245 };
2246 store.put("gotcha:old-rule", &r).await.unwrap();
2247 let got = store.get("gotcha:old-rule").await.unwrap().unwrap();
2248 match got.lifecycle {
2249 RecordLifecycle::Superseded { by_key } => {
2250 assert_eq!(by_key, "gotcha:new-rule");
2251 }
2252 other => panic!("expected Superseded, got {other:?}"),
2253 }
2254 }
2255
2256 #[test]
2259 fn slug_with_git_config_but_no_url_line_falls_back_to_path() {
2260 let dir = tempfile::TempDir::new().unwrap();
2261 let git_dir = dir.path().join(".git");
2262 std::fs::create_dir_all(&git_dir).unwrap();
2263 std::fs::write(
2265 git_dir.join("config"),
2266 "[core]\n\trepositoryformatversion = 0\n\tfilemode = true\n",
2267 )
2268 .unwrap();
2269
2270 let slug = derive_slug(dir.path());
2271 let expected = {
2274 let canon = std::fs::canonicalize(dir.path()).unwrap();
2275 let input = canon.to_string_lossy().into_owned();
2276 let digest = Sha256::digest(input.as_bytes());
2277 hex::encode(&digest[..4])
2278 };
2279 assert_eq!(slug, expected, "no url= line must fall back to path hash");
2280 }
2281
2282 #[test]
2283 fn slug_with_no_git_dir_falls_back_to_path() {
2284 let dir = tempfile::TempDir::new().unwrap();
2285 let slug = derive_slug(dir.path());
2287 let expected = {
2288 let canon = std::fs::canonicalize(dir.path()).unwrap();
2289 let input = canon.to_string_lossy().into_owned();
2290 let digest = Sha256::digest(input.as_bytes());
2291 hex::encode(&digest[..4])
2292 };
2293 assert_eq!(slug, expected);
2294 }
2295
2296 #[test]
2299 fn prefix_end_0x7f_byte_increments_to_0x80_which_is_invalid_utf8() {
2300 let input = String::from_utf8(vec![0x61, 0x7f]).unwrap(); let result = prefix_end(&input);
2304 assert_eq!(
2306 result, "\u{ffff}",
2307 "increment of 0x7f produces invalid UTF-8; must fall back to sentinel"
2308 );
2309 }
2310
2311 #[test]
2312 fn prefix_end_0xfe_byte_increments_to_0xff_still_invalid_utf8() {
2313 let input = unsafe { String::from_utf8_unchecked(vec![0x61, 0xfe]) };
2315 let result = prefix_end(&input);
2316 assert_eq!(result, "\u{ffff}");
2317 }
2318
2319 #[tokio::test]
2322 async fn put_batch_empty_is_noop() {
2323 let (store, _dir) = temp_store();
2324 store.put_batch(&[]).await.unwrap();
2325 assert!(store.scan_prefix("gotcha:").await.unwrap().is_empty());
2326 }
2327
2328 #[tokio::test]
2329 async fn put_batch_single_record_readable() {
2330 let (store, _dir) = temp_store();
2331 let r = make_record("gotcha:batch-single");
2332 store
2333 .put_batch(&[("gotcha:batch-single", &r)])
2334 .await
2335 .unwrap();
2336 let got = store.get("gotcha:batch-single").await.unwrap().unwrap();
2337 assert_eq!(got.key, "gotcha:batch-single");
2338 assert_eq!(got.value, r.value);
2339 }
2340
2341 #[tokio::test]
2342 async fn put_batch_all_records_readable() {
2343 let (store, _dir) = temp_store();
2344 let records: Vec<Record> = (0..10)
2345 .map(|i| make_record(&format!("gotcha:b{i}")))
2346 .collect();
2347 let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2348 store.put_batch(&pairs).await.unwrap();
2349 let results = store.scan_prefix("gotcha:b").await.unwrap();
2350 assert_eq!(results.len(), 10);
2351 }
2352
2353 #[tokio::test]
2354 async fn put_batch_mixed_durability_both_trees_written() {
2355 let (store, _dir) = temp_store();
2356 let immediate = make_record("gotcha:imm");
2357 let eventual = make_record("session:evt");
2358 store
2359 .put_batch(&[("gotcha:imm", &immediate), ("session:evt", &eventual)])
2360 .await
2361 .unwrap();
2362 assert!(store.get("gotcha:imm").await.unwrap().is_some());
2363 assert!(store.get("session:evt").await.unwrap().is_some());
2364 }
2365
2366 #[tokio::test]
2367 async fn put_batch_matches_sequential_put_for_same_records() {
2368 let (store_a, _dir_a) = temp_store();
2369 let (store_b, _dir_b) = temp_store();
2370 let records: Vec<Record> = (0..20)
2371 .map(|i| make_record(&format!("file:src/mod{i}.rs")))
2372 .collect();
2373
2374 for r in &records {
2376 store_a.put(&r.key, r).await.unwrap();
2377 }
2378 let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2380 store_b.put_batch(&pairs).await.unwrap();
2381
2382 let a = {
2383 let mut v = store_a.scan_prefix("file:").await.unwrap();
2384 v.sort_by(|x, y| x.key.cmp(&y.key));
2385 v
2386 };
2387 let b = {
2388 let mut v = store_b.scan_prefix("file:").await.unwrap();
2389 v.sort_by(|x, y| x.key.cmp(&y.key));
2390 v
2391 };
2392 assert_eq!(a.len(), b.len());
2393 for (ra, rb) in a.iter().zip(b.iter()) {
2394 assert_eq!(ra.key, rb.key);
2395 assert_eq!(ra.value, rb.value);
2396 }
2397 }
2398
2399 #[tokio::test]
2404 #[ignore]
2405 async fn put_batch_1200_faster_than_sequential() {
2406 use std::time::Instant;
2407
2408 let (store_seq, _dir_seq) = temp_store();
2409 let (store_bat, _dir_bat) = temp_store();
2410 let records: Vec<Record> = (0..1200)
2411 .map(|i| make_record(&format!("file:src/f{i}.rs")))
2412 .collect();
2413
2414 let seq_start = Instant::now();
2416 for r in &records {
2417 store_seq.put(&r.key, r).await.unwrap();
2418 }
2419 let seq_ms = seq_start.elapsed().as_millis();
2420
2421 let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2423 let bat_start = Instant::now();
2424 store_bat.put_batch(&pairs).await.unwrap();
2425 let bat_ms = bat_start.elapsed().as_millis();
2426
2427 assert!(
2429 bat_ms < seq_ms,
2430 "put_batch ({bat_ms}ms) was not faster than sequential puts ({seq_ms}ms)"
2431 );
2432
2433 let results = store_bat.scan_prefix("file:").await.unwrap();
2435 assert_eq!(results.len(), 1200);
2436 }
2437
2438 #[tokio::test]
2441 async fn search_returns_matching_records() {
2442 let (store, _dir) = temp_store();
2443 let mut r = make_record("gotcha:async-race");
2444 r.value = "never use inference inside async context".to_string();
2445 store.put(&r.key, &r).await.unwrap();
2446
2447 let results = store.search("inference", 10).await.unwrap();
2448 assert_eq!(results.len(), 1);
2449 assert_eq!(results[0].key, "gotcha:async-race");
2450 }
2451
2452 #[tokio::test]
2453 async fn search_empty_and_whitespace_query_returns_empty() {
2454 let (store, _dir) = temp_store();
2455 let r = make_record("gotcha:foo");
2456 store.put(&r.key, &r).await.unwrap();
2457 for blank in ["", " ", "\t", "\n"] {
2458 assert!(
2459 store.search(blank, 10).await.unwrap().is_empty(),
2460 "blank query {blank:?} must return empty"
2461 );
2462 }
2463 }
2464
2465 #[tokio::test]
2466 async fn search_no_match_returns_empty() {
2467 let (store, _dir) = temp_store();
2468 let r = make_record("gotcha:foo");
2469 store.put(&r.key, &r).await.unwrap();
2470 assert!(store
2471 .search("absolutely_no_match_xyzzy99", 10)
2472 .await
2473 .unwrap()
2474 .is_empty());
2475 }
2476
2477 #[tokio::test]
2478 async fn search_malformed_query_returns_partial_not_error() {
2479 let (store, _dir) = temp_store();
2482 let mut r = make_record("gotcha:async-race");
2483 r.value = "tokio runtime inference race condition".to_string();
2484 store.put(&r.key, &r).await.unwrap();
2485 let result = store.search("tokio AND", 10).await;
2487 assert!(result.is_ok(), "malformed query must not return Err");
2488 }
2489
2490 #[tokio::test]
2491 async fn search_limit_caps_results() {
2492 let (store, _dir) = temp_store();
2493 for i in 0..10 {
2494 let mut r = make_record(&format!("gotcha:item-{i:02}"));
2495 r.value = "tokio runtime executor gotcha performance".to_string();
2496 store.put(&r.key, &r).await.unwrap();
2497 }
2498 assert_eq!(store.search("tokio", 1).await.unwrap().len(), 1);
2499 assert_eq!(store.search("tokio", 5).await.unwrap().len(), 5);
2500 assert_eq!(store.search("tokio", 10).await.unwrap().len(), 10);
2501 assert_eq!(store.search("tokio", 999).await.unwrap().len(), 10);
2503 }
2504
2505 #[tokio::test]
2506 async fn search_deleted_record_not_returned() {
2507 let (store, _dir) = temp_store();
2510 let mut r = make_record("gotcha:deleted");
2511 r.value = "this_unique_sentinel_deleted_record".to_string();
2512 store.put(&r.key, &r).await.unwrap();
2513
2514 assert_eq!(
2516 store
2517 .search("this_unique_sentinel_deleted_record", 10)
2518 .await
2519 .unwrap()
2520 .len(),
2521 1
2522 );
2523
2524 store.delete("gotcha:deleted").await.unwrap();
2526
2527 let results = store
2529 .search("this_unique_sentinel_deleted_record", 10)
2530 .await
2531 .unwrap();
2532 assert!(
2533 results.is_empty(),
2534 "deleted record must not appear in search results"
2535 );
2536 }
2537
2538 #[tokio::test]
2539 async fn search_delete_does_not_consume_top_k_slot() {
2540 let (store, _dir) = temp_store();
2541
2542 let mut deleted = make_record("gotcha:deleted-slot");
2543 deleted.value = "shared_sentinel_term".to_string();
2544 store.put(&deleted.key, &deleted).await.unwrap();
2545
2546 let mut live = make_record("gotcha:live-slot");
2547 live.value = "shared_sentinel_term".to_string();
2548 store.put(&live.key, &live).await.unwrap();
2549
2550 store.delete(&deleted.key).await.unwrap();
2551
2552 let results = store.search("shared_sentinel_term", 1).await.unwrap();
2553 assert_eq!(
2554 results.len(),
2555 1,
2556 "live hit should still fill the top-k slot"
2557 );
2558 assert_eq!(results[0].key, "gotcha:live-slot");
2559 }
2560
2561 #[tokio::test]
2562 async fn search_returns_full_record_from_surrealkv_not_tantivy_stored_fields() {
2563 let (store, _dir) = temp_store();
2566 let mut r = make_record("gotcha:full-record-check");
2567 r.value = "sentinel_fullrecord_uniqueterm_xqz".to_string();
2568 r.tags = vec!["production".to_string(), "critical-path".to_string()];
2569 store.put(&r.key, &r).await.unwrap();
2570
2571 let results = store
2572 .search("sentinel_fullrecord_uniqueterm_xqz", 10)
2573 .await
2574 .unwrap();
2575 assert_eq!(results.len(), 1);
2576 assert_eq!(
2577 results[0].tags,
2578 vec!["production", "critical-path"],
2579 "full tags must come from SurrealKV, not tantivy stored fields"
2580 );
2581 }
2582
2583 #[tokio::test]
2586 async fn search_m05f_20_records_returns_exactly_correct_5() {
2587 let (store, _dir) = temp_store();
2588
2589 for i in 0..15 {
2591 let mut r = make_record(&format!("gotcha:noise-{i:02}"));
2592 r.value = format!("background noise record about rayon and petgraph item {i}");
2593 store.put(&r.key, &r).await.unwrap();
2594 }
2595
2596 let mut target_keys = Vec::new();
2598 for i in 0..5 {
2599 let mut r = make_record(&format!("gotcha:target-{i}"));
2600 r.value = format!("sentinel_m05f_unique record index {i} with extra text");
2601 store.put(&r.key, &r).await.unwrap();
2602 target_keys.push(r.key.clone());
2603 }
2604
2605 let results = store.search("sentinel_m05f_unique", 20).await.unwrap();
2606 let result_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
2607
2608 assert_eq!(
2609 results.len(),
2610 5,
2611 "expected exactly 5 results, got {}: {:?}",
2612 results.len(),
2613 result_keys
2614 );
2615
2616 for k in &target_keys {
2617 assert!(
2618 result_keys.contains(&k.as_str()),
2619 "target key '{k}' missing from results"
2620 );
2621 }
2622
2623 for r in &results {
2625 assert!(
2626 r.key.starts_with("gotcha:target-"),
2627 "noise record '{}' must not appear in results",
2628 r.key
2629 );
2630 }
2631 }
2632
2633 #[tokio::test]
2639 async fn search_5k_records_zero_false_positives_limit_and_full_record_correct() {
2640 let (store, _dir) = temp_store();
2641
2642 let noise: Vec<Record> = (0..4_980_usize)
2644 .map(|i| {
2645 let mut r = make_record(&format!("file:src/module_{i:04}.rs"));
2646 r.value = format!(
2647 "module {i} handles initialization routing configuration management dispatch"
2648 );
2649 r
2650 })
2651 .collect();
2652 let noise_pairs: Vec<(&str, &Record)> = noise.iter().map(|r| (r.key.as_str(), r)).collect();
2653 store.put_batch(&noise_pairs).await.unwrap();
2654
2655 let targets: Vec<Record> = (0..20_usize)
2658 .map(|i| {
2659 let mut r = make_record(&format!("gotcha:target-{i:02}"));
2660 r.value = format!("zqx_sentinel_5k_proof unique term record {i}");
2661 r.tags = vec!["verified-from-surrealkv".to_string()];
2662 r
2663 })
2664 .collect();
2665 let target_pairs: Vec<(&str, &Record)> =
2666 targets.iter().map(|r| (r.key.as_str(), r)).collect();
2667 store.put_batch(&target_pairs).await.unwrap();
2668
2669 let results = store.search("zqx_sentinel_5k_proof", 100).await.unwrap();
2672 assert_eq!(
2673 results.len(),
2674 20,
2675 "expected 20 hits from 5,000 records, got {}",
2676 results.len()
2677 );
2678
2679 let result_keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
2680 let target_keys: Vec<&str> = targets.iter().map(|r| r.key.as_str()).collect();
2681
2682 for k in &target_keys {
2684 assert!(result_keys.contains(k), "missing target: {k}");
2685 }
2686 for r in &results {
2688 assert!(
2689 r.key.starts_with("gotcha:target-"),
2690 "noise record '{}' must not appear in results",
2691 r.key
2692 );
2693 }
2694
2695 for r in &results {
2697 assert_eq!(
2698 r.tags,
2699 vec!["verified-from-surrealkv"],
2700 "tags must be fetched from SurrealKV, key: {}",
2701 r.key
2702 );
2703 }
2704
2705 let limited = store.search("zqx_sentinel_5k_proof", 5).await.unwrap();
2708 assert_eq!(limited.len(), 5, "limit=5 must cap results at scale");
2709
2710 let over = store.search("zqx_sentinel_5k_proof", 999).await.unwrap();
2712 assert_eq!(
2713 over.len(),
2714 20,
2715 "limit > match count must return all 20 matches, not panic"
2716 );
2717
2718 let noise_only_results = store.search("zqx_sentinel_5k_proof", 100).await.unwrap();
2723 for r in &noise_only_results {
2724 assert!(
2725 !r.key.starts_with("file:src/module_"),
2726 "noise module record should not match sentinel query: {}",
2727 r.key
2728 );
2729 }
2730 }
2731
2732 fn make_record_v(key: &str, value: &str) -> Record {
2736 let mut r = make_record(key);
2737 r.value = value.to_string();
2738 r
2739 }
2740
2741 fn reopen_store(root: &std::path::Path) -> Store {
2744 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2745 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2746 let search = OnceCell::new();
2747 let _ = search.set(Search::open(&root.join("search_index")).unwrap());
2748 Store {
2749 knowledge,
2750 sessions,
2751 search,
2752 root: root.to_path_buf(),
2753 index_needs_rebuild: false,
2754 }
2755 }
2756
2757 async fn reopen_store_open_and_rebuild_like(root: &std::path::Path) -> Store {
2758 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2759 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2760 let mut store = Store {
2761 knowledge,
2762 sessions,
2763 search: OnceCell::new(),
2764 root: root.to_path_buf(),
2765 index_needs_rebuild: false,
2766 };
2767
2768 let search_path = store.root.join("search_index");
2769 let stale_marker = store.root.join(SEARCH_STALE_MARKER);
2770 let has_stale_marker = stale_marker.exists();
2771 let has_sync_pending = store.root.join(SEARCH_SYNC_PENDING).exists();
2772
2773 if (has_stale_marker || has_sync_pending) && search_path.exists() {
2774 std::fs::remove_dir_all(&search_path).unwrap();
2775 }
2776
2777 match Search::open(&search_path) {
2778 Ok(s) => {
2779 let _ = store.search.set(s);
2780 }
2781 Err(_) => {
2782 if search_path.exists() {
2783 std::fs::remove_dir_all(&search_path).unwrap();
2784 }
2785 let _ = store.search.set(Search::open(&search_path).unwrap());
2786 store.index_needs_rebuild = true;
2787 }
2788 }
2789
2790 if has_stale_marker || has_sync_pending {
2791 store.index_needs_rebuild = true;
2792 }
2793
2794 if store.index_needs_rebuild {
2795 store.rebuild_search_index().await.unwrap();
2796 let _ = std::fs::remove_file(store.root.join(SEARCH_SYNC_PENDING));
2797 if has_stale_marker {
2798 let _ = std::fs::remove_file(&stale_marker);
2799 }
2800 }
2801
2802 store
2803 }
2804
2805 #[tokio::test]
2808 async fn rebuild_search_index_after_missing_index_restores_search() {
2809 let (store, _dir) = temp_store();
2810 let root = store.root.clone();
2811
2812 let records: Vec<Record> = (0..10)
2814 .map(|i| {
2815 make_record_v(
2816 &format!("gotcha:rebuild-miss-{i:02}"),
2817 "xq_rebuild_missing_sentinel unique term",
2818 )
2819 })
2820 .collect();
2821 let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2822 store.put_batch(&pairs).await.unwrap();
2823 store.close().await.unwrap();
2824
2825 std::fs::remove_dir_all(root.join("search_index")).unwrap();
2827
2828 let store2 = reopen_store(&root);
2830 assert!(
2831 !store2.index_needs_rebuild(),
2832 "reopen_store sets flag=false; we test rebuild directly"
2833 );
2834
2835 let committed = store2.rebuild_search_index().await.unwrap();
2836 assert_eq!(committed, 10, "rebuild must commit all 10 records");
2837
2838 let results = store2
2839 .search("xq_rebuild_missing_sentinel", 20)
2840 .await
2841 .unwrap();
2842 assert_eq!(
2843 results.len(),
2844 10,
2845 "all records must be findable after rebuild"
2846 );
2847 }
2848
2849 #[tokio::test]
2852 async fn rebuild_search_index_after_corrupt_index_restores_search() {
2853 let (store, _dir) = temp_store();
2854 let root = store.root.clone();
2855
2856 let r = make_record_v(
2857 "gotcha:rebuild-corrupt",
2858 "xq_rebuild_corrupt_sentinel unique",
2859 );
2860 store.put("gotcha:rebuild-corrupt", &r).await.unwrap();
2861 store.close().await.unwrap();
2862
2863 std::fs::write(
2865 root.join("search_index").join("meta.json"),
2866 b"not valid json {{{{",
2867 )
2868 .unwrap();
2869
2870 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2872 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2873 let search_path = root.join("search_index");
2874 let (search_cell, needs_rebuild) = {
2875 let cell = OnceCell::new();
2876 match Search::open(&search_path) {
2877 Ok(s) => {
2878 let _ = cell.set(s);
2879 (cell, false)
2880 }
2881 Err(_) => {
2882 std::fs::remove_dir_all(&search_path).unwrap();
2883 let _ = cell.set(Search::open(&search_path).unwrap());
2884 (cell, true)
2885 }
2886 }
2887 };
2888 let store2 = Store {
2889 knowledge,
2890 sessions,
2891 search: search_cell,
2892 root: root.clone(),
2893 index_needs_rebuild: needs_rebuild,
2894 };
2895
2896 assert!(
2897 store2.index_needs_rebuild(),
2898 "corrupt meta.json must trigger rebuild flag"
2899 );
2900
2901 store2.rebuild_search_index().await.unwrap();
2902
2903 let results = store2
2904 .search("xq_rebuild_corrupt_sentinel", 10)
2905 .await
2906 .unwrap();
2907 assert_eq!(
2908 results.len(),
2909 1,
2910 "record must be searchable after rebuild from corrupt state"
2911 );
2912 assert_eq!(results[0].key, "gotcha:rebuild-corrupt");
2913 }
2914
2915 #[tokio::test]
2917 async fn rebuild_search_index_returns_committed_count() {
2918 let (store, _dir) = temp_store();
2919 let root = store.root.clone();
2920
2921 let records: Vec<Record> = (0..7)
2922 .map(|i| make_record(&format!("file:src/mod_{i}.rs")))
2923 .collect();
2924 let pairs: Vec<(&str, &Record)> = records.iter().map(|r| (r.key.as_str(), r)).collect();
2925 store.put_batch(&pairs).await.unwrap();
2926 store.close().await.unwrap();
2927
2928 std::fs::remove_dir_all(root.join("search_index")).unwrap();
2930 let store2 = reopen_store(&root);
2931 let committed = store2.rebuild_search_index().await.unwrap();
2932 assert_eq!(
2933 committed, 7,
2934 "committed count must equal number of records in SurrealKV"
2935 );
2936 }
2937
2938 #[tokio::test]
2939 async fn open_and_rebuild_like_wipes_stale_index_when_sync_pending_exists() {
2940 let (store, _dir) = temp_store();
2941 let root = store.root.clone();
2942
2943 let deleted = make_record_v("gotcha:deleted-after-crash", "shared_crash_sentinel");
2944 let live = make_record_v("gotcha:live-after-crash", "shared_crash_sentinel");
2945
2946 store.put(&deleted.key, &deleted).await.unwrap();
2947 store.put(&live.key, &live).await.unwrap();
2948 store.delete(&deleted.key).await.unwrap();
2949
2950 store.ensure_search().unwrap().add_record(&deleted).unwrap();
2952 std::fs::write(root.join(SEARCH_SYNC_PENDING), b"").unwrap();
2953 store.close().await.unwrap();
2954
2955 let reopened = reopen_store_open_and_rebuild_like(&root).await;
2956 let results = reopened.search("shared_crash_sentinel", 1).await.unwrap();
2957 assert_eq!(
2958 results.len(),
2959 1,
2960 "live record should fill top-k after rebuild"
2961 );
2962 assert_eq!(results[0].key, "gotcha:live-after-crash");
2963 assert!(
2964 !root.join(SEARCH_SYNC_PENDING).exists(),
2965 "successful rebuild should clear sync-pending marker"
2966 );
2967 }
2968
2969 #[tokio::test]
2970 async fn put_leaves_sync_pending_when_search_cannot_initialize() {
2971 let dir = TempDir::new().unwrap();
2972 let root = dir.path().join("mati_test");
2973 std::fs::create_dir_all(&root).unwrap();
2974 let knowledge = open_knowledge_tree(root.join("knowledge.db")).unwrap();
2975 let sessions = open_sessions_tree(root.join("sessions.db")).unwrap();
2976 std::fs::write(root.join("search_index"), b"not a directory").unwrap();
2977
2978 let store = Store {
2979 knowledge,
2980 sessions,
2981 search: OnceCell::new(),
2982 root: root.clone(),
2983 index_needs_rebuild: false,
2984 };
2985
2986 let record = make_record("gotcha:search-sync-failure");
2987 store.put(&record.key, &record).await.unwrap();
2988
2989 assert!(
2990 root.join(SEARCH_SYNC_PENDING).exists(),
2991 "failed search sync must leave the crash-fence marker in place"
2992 );
2993 }
2994
2995 #[tokio::test]
2998 async fn rebuild_search_index_twice_is_safe() {
2999 let (store, _dir) = temp_store();
3000 let r = make_record_v("gotcha:idempotent", "xq_rebuild_idempotent_sentinel unique");
3001 store.put("gotcha:idempotent", &r).await.unwrap();
3002
3003 store.rebuild_search_index().await.unwrap();
3004 store.rebuild_search_index().await.unwrap();
3005
3006 let results = store
3007 .search("xq_rebuild_idempotent_sentinel", 10)
3008 .await
3009 .unwrap();
3010 assert_eq!(
3011 results.len(),
3012 1,
3013 "dedup must collapse duplicate tantivy entries to one result"
3014 );
3015 }
3016
3017 #[tokio::test]
3019 async fn open_healthy_index_does_not_set_rebuild_flag() {
3020 let (store, _dir) = temp_store();
3021 assert!(!store.index_needs_rebuild());
3022 }
3023
3024 #[tokio::test]
3027 async fn history_empty_key_returns_error() {
3028 let (store, _dir) = temp_store();
3029 let result = store.history("", 0);
3030 assert!(result.is_err(), "empty key must be rejected");
3031 }
3032
3033 #[tokio::test]
3034 async fn history_single_version() {
3035 let (store, _dir) = temp_store();
3036 store
3037 .put("gotcha:single", &make_record("gotcha:single"))
3038 .await
3039 .unwrap();
3040
3041 let entries = store.history("gotcha:single", 0).unwrap();
3042 assert!(!entries.is_empty(), "must return at least one version");
3043 assert!(!entries[0].is_tombstone);
3044 assert!(entries[0].record.is_some());
3045 assert_eq!(entries[0].record.as_ref().unwrap().key, "gotcha:single");
3046 }
3047
3048 #[tokio::test]
3049 async fn history_multiple_versions_newest_first() {
3050 let (store, _dir) = temp_store();
3051 let mut r = make_record("gotcha:multi");
3052 r.value = "v1".to_string();
3053 store.put("gotcha:multi", &r).await.unwrap();
3054 r.value = "v2".to_string();
3055 r.version.logical_clock = 2;
3056 store.put("gotcha:multi", &r).await.unwrap();
3057 r.value = "v3".to_string();
3058 r.version.logical_clock = 3;
3059 store.put("gotcha:multi", &r).await.unwrap();
3060
3061 let entries = store.history("gotcha:multi", 0).unwrap();
3062 assert!(
3063 entries.len() >= 3,
3064 "expected >=3 versions, got {}",
3065 entries.len()
3066 );
3067
3068 for pair in entries.windows(2) {
3070 assert!(
3071 pair[0].timestamp_ns >= pair[1].timestamp_ns,
3072 "history must be newest-first: {} >= {}",
3073 pair[0].timestamp_ns,
3074 pair[1].timestamp_ns,
3075 );
3076 }
3077
3078 let newest = entries[0].record.as_ref().unwrap();
3080 assert_eq!(newest.value, "v3");
3081 }
3082
3083 #[tokio::test]
3084 async fn history_includes_tombstones() {
3085 let (store, _dir) = temp_store();
3086 store
3087 .put("gotcha:tomb", &make_record("gotcha:tomb"))
3088 .await
3089 .unwrap();
3090
3091 {
3096 let mut txn = store.knowledge.begin_with_mode(Mode::WriteOnly).unwrap();
3097 txn.set_durability(SkvDurability::Immediate);
3098 txn.soft_delete(b"gotcha:tomb").unwrap();
3099 txn.commit().await.unwrap();
3100 }
3101
3102 let entries = store.history("gotcha:tomb", 0).unwrap();
3103 assert!(
3104 entries.len() >= 2,
3105 "must have create + soft-delete, got {}",
3106 entries.len()
3107 );
3108 assert!(
3110 entries.iter().any(|e| e.is_tombstone),
3111 "tombstone must be present in history",
3112 );
3113 }
3114
3115 #[tokio::test]
3116 async fn history_no_key_spill() {
3117 let (store, _dir) = temp_store();
3118 store
3119 .put("gotcha:alpha", &make_record("gotcha:alpha"))
3120 .await
3121 .unwrap();
3122 store
3123 .put(
3124 "gotcha:alpha-extended",
3125 &make_record("gotcha:alpha-extended"),
3126 )
3127 .await
3128 .unwrap();
3129 store
3130 .put("gotcha:beta", &make_record("gotcha:beta"))
3131 .await
3132 .unwrap();
3133
3134 let entries = store.history("gotcha:alpha", 0).unwrap();
3135 for e in &entries {
3136 if let Some(ref rec) = e.record {
3137 assert_eq!(
3138 rec.key, "gotcha:alpha",
3139 "spilled into adjacent key: {}",
3140 rec.key
3141 );
3142 }
3143 }
3144 }
3145
3146 #[tokio::test]
3147 async fn history_limit() {
3148 let (store, _dir) = temp_store();
3149 let mut r = make_record("gotcha:limited");
3150 for i in 0..5 {
3151 r.value = format!("v{i}");
3152 r.version.logical_clock = i as u64;
3153 store.put("gotcha:limited", &r).await.unwrap();
3154 }
3155
3156 let entries = store.history("gotcha:limited", 2).unwrap();
3157 assert!(
3158 entries.len() <= 2,
3159 "limit=2 but got {} entries",
3160 entries.len()
3161 );
3162 }
3163
3164 #[tokio::test]
3165 async fn history_since_filters_old_versions() {
3166 let (store, _dir) = temp_store();
3167 let mut r = make_record("gotcha:since");
3168 r.value = "old".to_string();
3169 store.put("gotcha:since", &r).await.unwrap();
3170
3171 let since_secs = SystemTime::now()
3174 .duration_since(UNIX_EPOCH)
3175 .unwrap()
3176 .as_secs();
3177
3178 r.value = "new".to_string();
3179 r.version.logical_clock = 2;
3180 store.put("gotcha:since", &r).await.unwrap();
3181
3182 let entries = store.history_since("gotcha:since", since_secs, 0).unwrap();
3183 assert!(
3185 !entries.is_empty(),
3186 "since filter should include the recent write",
3187 );
3188 for e in &entries {
3190 assert!(
3191 e.timestamp_secs >= since_secs.saturating_sub(1),
3192 "entry ts {} is before since {}",
3193 e.timestamp_secs,
3194 since_secs,
3195 );
3196 }
3197 }
3198
3199 #[tokio::test]
3200 async fn records_since_with_dep() {
3201 let (store, _dir) = temp_store();
3202
3203 let now = SystemTime::now()
3204 .duration_since(UNIX_EPOCH)
3205 .unwrap()
3206 .as_secs();
3207 let old_ts = now.saturating_sub(3600);
3208
3209 let mut old_rec = make_record("gotcha:old");
3210 old_rec.updated_at = old_ts;
3211 store.put("gotcha:old", &old_rec).await.unwrap();
3212
3213 let mut new_gotcha = make_record("gotcha:new");
3214 new_gotcha.updated_at = now;
3215 store.put("gotcha:new", &new_gotcha).await.unwrap();
3216
3217 let mut dep_rec = make_record("dep:cargo:serde");
3218 dep_rec.category = crate::store::record::Category::Dependency;
3219 dep_rec.updated_at = now;
3220 store.put("dep:cargo:serde", &dep_rec).await.unwrap();
3221
3222 let since = now.saturating_sub(60);
3223 let results = store.records_since(since, 0).await.unwrap();
3224 let keys: Vec<&str> = results.iter().map(|r| r.key.as_str()).collect();
3225
3226 assert!(keys.contains(&"gotcha:new"), "new gotcha should appear");
3227 assert!(
3228 keys.contains(&"dep:cargo:serde"),
3229 "dep record should appear"
3230 );
3231 assert!(
3232 !keys.contains(&"gotcha:old"),
3233 "old gotcha should be excluded"
3234 );
3235
3236 for pair in results.windows(2) {
3238 assert!(
3239 pair[0].updated_at >= pair[1].updated_at,
3240 "results must be newest-first",
3241 );
3242 }
3243 }
3244
3245 #[tokio::test]
3246 async fn records_since_respects_limit() {
3247 let (store, _dir) = temp_store();
3248 let now = SystemTime::now()
3249 .duration_since(UNIX_EPOCH)
3250 .unwrap()
3251 .as_secs();
3252
3253 for i in 0..10 {
3254 let mut r = make_record(&format!("gotcha:lim-{i:02}"));
3255 r.updated_at = now;
3256 store.put(&r.key, &r).await.unwrap();
3257 }
3258
3259 let results = store.records_since(now.saturating_sub(1), 3).await.unwrap();
3260 assert_eq!(results.len(), 3, "limit=3 should cap at 3");
3261 }
3262
/// Builds a `HistoryEntry` whose nanosecond timestamp is the second
/// timestamp times 10^9 and checks that dividing it back yields the seconds.
#[test]
fn history_entry_timestamp_conversion() {
    let nanos_per_sec = 1_000_000_000;
    let secs = 1_710_520_800;

    let entry = HistoryEntry {
        is_tombstone: false,
        record: None,
        timestamp_secs: secs,
        timestamp_ns: secs * nanos_per_sec,
    };

    assert_eq!(entry.timestamp_secs, entry.timestamp_ns / nanos_per_sec);
}
3273
/// Given an "already locked" error and a `LOCK` file containing a PID,
/// `lock_error_hint` must produce a message that names the lock contention
/// and includes that PID.
#[test]
fn lock_error_hint_rewrites_real_lock_contention_error() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("knowledge.db");
    std::fs::create_dir_all(&db_path).unwrap();

    // The LOCK file holds the PID expected to show up in the rewritten error.
    std::fs::write(db_path.join("LOCK"), "12345\n").unwrap();

    let original = anyhow::anyhow!("Database at /foo/LOCK is already locked by another process");
    let msg = lock_error_hint(original, &db_path).to_string();

    assert!(
        msg.contains("another mati process holds the lock"),
        "should rewrite lock error, got: {msg}"
    );
    assert!(
        msg.contains("PID: 12345"),
        "should include holder PID, got: {msg}"
    );
}
3297
/// An error unrelated to locking must keep its original text and must not be
/// rewritten into a lock-contention message, even when a `LOCK` file exists.
#[test]
fn lock_error_hint_passes_through_non_lock_errors() {
    let tmp = TempDir::new().unwrap();
    let db_path = tmp.path().join("knowledge.db");
    std::fs::create_dir_all(&db_path).unwrap();

    // A LOCK file is present, but the error below does not mention locking.
    std::fs::write(db_path.join("LOCK"), "99999\n").unwrap();

    let original = anyhow::anyhow!("WAL segment corrupt at offset 1234");
    let msg = lock_error_hint(original, &db_path).to_string();

    assert!(
        msg.contains("WAL segment corrupt"),
        "non-lock errors must pass through unchanged, got: {msg}"
    );
    assert!(
        !msg.contains("another mati process"),
        "non-lock errors must NOT be rewritten to lock errors, got: {msg}"
    );
}
3319
3320 #[tokio::test]
3323 async fn transact_knowledge_rejects_sessions_key() {
3324 let dir = tempfile::tempdir().unwrap();
3325 let store = Store::open(dir.path()).await.unwrap();
3326
3327 let ops = vec![KnowledgeWriteOp::PutRaw {
3329 key: "session:foo",
3330 value: b"data",
3331 }];
3332 let err = store.transact_knowledge(&ops).await.unwrap_err();
3333 assert!(
3334 err.to_string().contains("routes to sessions tree"),
3335 "wrong-tree key must be rejected: {err}"
3336 );
3337 }
3338
3339 #[tokio::test]
3340 async fn transact_sessions_raw_rejects_knowledge_key() {
3341 let dir = tempfile::tempdir().unwrap();
3342 let store = Store::open(dir.path()).await.unwrap();
3343
3344 let entries: Vec<(&str, &[u8])> = vec![("gotcha:foo", b"data")];
3346 let err = store.transact_sessions_raw(&entries).await.unwrap_err();
3347 assert!(
3348 err.to_string().contains("routes to knowledge tree"),
3349 "wrong-tree key must be rejected: {err}"
3350 );
3351 }
3352
3353 #[tokio::test]
3354 async fn transact_knowledge_accepts_valid_knowledge_keys() {
3355 let dir = tempfile::tempdir().unwrap();
3356 let store = Store::open(dir.path()).await.unwrap();
3357
3358 let ops = vec![
3359 KnowledgeWriteOp::PutRaw {
3360 key: "gotcha:test",
3361 value: b"data1",
3362 },
3363 KnowledgeWriteOp::PutRaw {
3364 key: "audit:knowledge:123",
3365 value: b"data2",
3366 },
3367 ];
3368 store
3369 .transact_knowledge(&ops)
3370 .await
3371 .expect("valid knowledge keys must succeed");
3372 }
3373
3374 #[tokio::test]
3375 async fn transact_sessions_raw_accepts_valid_session_keys() {
3376 let dir = tempfile::tempdir().unwrap();
3377 let store = Store::open(dir.path()).await.unwrap();
3378
3379 let entries: Vec<(&str, &[u8])> = vec![
3380 ("session:consulted:file:foo", b"data1"),
3381 ("audit:session:123", b"data2"),
3382 ("analytics:hit_2026-04-09", b"data3"),
3383 ];
3384 store
3385 .transact_sessions_raw(&entries)
3386 .await
3387 .expect("valid session keys must succeed");
3388 }
3389}