1use anamnesis_core::chunk::{Chunk, ContentHash};
7use anamnesis_core::model::{AnamnesisRecord, Kind, Provenance, RecordId, Scope, SourceDescriptor};
8use chrono::{DateTime, TimeZone, Utc};
9use rusqlite::{params, OptionalExtension, Transaction};
10
11use crate::{Result, Store, StoreError};
12
13fn ts(dt: DateTime<Utc>) -> i64 {
18 dt.timestamp()
19}
20
21fn dt(ts: i64) -> DateTime<Utc> {
22 Utc.timestamp_opt(ts, 0).single().unwrap_or_else(Utc::now)
23}
24
25fn scope_str(s: Scope) -> &'static str {
26 match s {
27 Scope::User => "user",
28 Scope::Project => "project",
29 Scope::Session => "session",
30 Scope::Ephemeral => "ephemeral",
31 }
32}
33
34fn scope_from(s: &str) -> Scope {
35 match s {
36 "user" => Scope::User,
37 "project" => Scope::Project,
38 "session" => Scope::Session,
39 "ephemeral" => Scope::Ephemeral,
40 _ => Scope::Ephemeral,
41 }
42}
43
44fn kind_str(k: Kind) -> &'static str {
45 match k {
46 Kind::Fact => "fact",
47 Kind::Preference => "preference",
48 Kind::Feedback => "feedback",
49 Kind::Reference => "reference",
50 Kind::Episode => "episode",
51 Kind::Skill => "skill",
52 Kind::Unknown => "unknown",
53 }
54}
55
56fn kind_from(s: &str) -> Kind {
57 match s {
58 "fact" => Kind::Fact,
59 "preference" => Kind::Preference,
60 "feedback" => Kind::Feedback,
61 "reference" => Kind::Reference,
62 "episode" => Kind::Episode,
63 "skill" => Kind::Skill,
64 _ => Kind::Unknown,
65 }
66}
67
/// Serialises a slice of `f32` into a byte blob: each value contributes its
/// four little-endian bytes, in input order.
pub fn f32_to_blob(v: &[f32]) -> Vec<u8> {
    // `size_of_val` on the slice is exactly `v.len() * 4` bytes.
    let mut bytes = Vec::with_capacity(std::mem::size_of_val(v));
    bytes.extend(v.iter().flat_map(|value| value.to_le_bytes()));
    bytes
}
76
77pub fn blob_to_f32(b: &[u8]) -> Result<Vec<f32>> {
79 if b.len() % 4 != 0 {
80 return Err(StoreError::Sqlite(rusqlite::Error::InvalidQuery));
81 }
82 Ok(b.chunks_exact(4)
83 .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
84 .collect())
85}
86
/// Cosine similarity of two vectors, accumulated in `f64`.
///
/// Returns `0.0` when the vectors are empty, differ in length, or when either
/// has a squared norm of exactly zero.
fn cosine(a: &[f32], b: &[f32]) -> f64 {
    if a.is_empty() || a.len() != b.len() {
        return 0.0;
    }

    // Single pass: dot product and both squared norms.
    let (dot, norm_a, norm_b) = a.iter().zip(b).fold(
        (0f64, 0f64, 0f64),
        |(dot, norm_a, norm_b), (&x, &y)| {
            let (x, y) = (f64::from(x), f64::from(y));
            (dot + x * y, norm_a + x * x, norm_b + y * y)
        },
    );

    if norm_a == 0.0 || norm_b == 0.0 {
        return 0.0;
    }
    dot / (norm_a.sqrt() * norm_b.sqrt())
}
106
/// One chunk returned by a search, together with its relevance score.
#[derive(Debug, Clone, PartialEq)]
pub struct ChunkHit {
    /// Chunk id as read from `record_chunks.id` / `chunk_embeddings.chunk_id`.
    pub chunk_id: String,
    /// Id of the record the chunk belongs to (`record_chunks.record_id`).
    pub record_id: RecordId,
    /// Value of the chunk's `seq` column.
    pub seq: u32,
    /// Chunk text.
    pub content: String,
    /// Relevance score; larger is better. The FTS search stores the negated
    /// `bm25()` value, the vector search stores the cosine similarity.
    pub score: f64,
}
125
/// Optional restrictions applied to chunk searches.
///
/// Every field that is `Some` adds one `AND` predicate on the `records` table;
/// a filter with all fields `None` adds none.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SearchFilter {
    /// Exact match on `records.adapter`.
    pub source: Option<String>,
    /// Exact match on `records.instance`.
    pub instance: Option<String>,
    /// Exact match on `records.kind` (the stored label, e.g. `"fact"`).
    pub kind: Option<String>,
    /// Exact match on `records.scope` (the stored label, e.g. `"user"`).
    pub scope: Option<String>,
    /// Inclusive lower bound on `records.created_at`.
    pub time_from: Option<i64>,
    /// Inclusive upper bound on `records.created_at`.
    pub time_to: Option<i64>,
}
155
156impl SearchFilter {
157 pub fn is_empty(&self) -> bool {
159 self.source.is_none()
160 && self.instance.is_none()
161 && self.kind.is_none()
162 && self.scope.is_none()
163 && self.time_from.is_none()
164 && self.time_to.is_none()
165 }
166}
167
/// An embedding job handed out by `Store::claim_next_job`, bundled with the
/// chunk text to embed.
#[derive(Debug, Clone, PartialEq)]
pub struct PendingEmbeddingJob {
    /// `embedding_jobs.id` of the claimed job.
    pub job_id: i64,
    /// Id of the chunk to embed.
    pub chunk_id: String,
    /// Content hash recorded on the job row when it was enqueued.
    pub content_hash: ContentHash,
    /// Embedding model the job was claimed for.
    pub model_id: String,
    /// Chunk text read from `record_chunks.content` at claim time.
    pub content: String,
}
182
/// Row counts reported by `Store::stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoreStats {
    /// Number of rows in `records`.
    pub records: u64,
    /// Number of rows in `record_chunks`.
    pub chunks: u64,
    /// Embedding jobs whose status is `pending` or `in_progress`.
    pub jobs_pending: u64,
    /// Embedding jobs whose status is `failed`.
    pub jobs_failed: u64,
    /// Number of rows in `sources`.
    pub sources: u64,
}
197
/// One row of the `sources` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceRow {
    /// Adapter name; together with `instance` it identifies the source.
    pub adapter: String,
    /// Instance name. Sources registered with `instance = None` are stored
    /// with the empty string.
    pub instance: String,
    /// Value of the `location` column, if any.
    pub location: Option<String>,
    /// Value of the `config_json` column, if any.
    pub config_json: Option<String>,
    /// Unix timestamp (seconds) set when the row was first inserted.
    pub added_at: i64,
    /// Unix timestamp (seconds) written by `Store::update_last_import_at`;
    /// `None` until that has been called for this source.
    pub last_import_at: Option<i64>,
}
223
/// A source row plus the number of records and chunks attributed to it, as
/// produced by `Store::list_sources_with_counts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceWithCounts {
    /// The underlying `sources` row.
    pub source: SourceRow,
    /// Distinct records whose `adapter` and `instance` match the source.
    pub record_count: u64,
    /// Chunks belonging to those records.
    pub chunk_count: u64,
}
239
/// Upper bound to which `Store::list_record_ids_paged` and
/// `Store::list_derivations` clamp their `limit` argument.
pub const MAX_LIST_LIMIT: u32 = 1000;
245
246impl Store {
251 pub fn register_source(
257 &self,
258 adapter: &str,
259 instance: Option<&str>,
260 location: Option<&str>,
261 config_json: Option<&str>,
262 ) -> Result<()> {
263 let inst = instance.unwrap_or("");
264 self.conn.lock().execute(
265 "INSERT INTO sources(adapter, instance, location, config_json, added_at) \
266 VALUES(?1, ?2, ?3, ?4, strftime('%s','now')) \
267 ON CONFLICT(adapter, instance) DO UPDATE SET \
268 location = excluded.location, \
269 config_json = excluded.config_json",
270 params![adapter, inst, location, config_json],
271 )?;
272 Ok(())
273 }
274
275 pub fn get_source(&self, adapter: &str, instance: Option<&str>) -> Result<Option<SourceRow>> {
280 let inst = instance.unwrap_or("");
281 let conn = self.conn.lock();
282 let row = conn
283 .query_row(
284 "SELECT adapter, instance, location, config_json, added_at, last_import_at \
285 FROM sources WHERE adapter = ?1 AND instance = ?2",
286 params![adapter, inst],
287 |r| {
288 Ok(SourceRow {
289 adapter: r.get(0)?,
290 instance: r.get(1)?,
291 location: r.get(2)?,
292 config_json: r.get(3)?,
293 added_at: r.get(4)?,
294 last_import_at: r.get(5)?,
295 })
296 },
297 )
298 .optional()?;
299 Ok(row)
300 }
301
302 pub fn update_last_import_at(&self, adapter: &str, instance: Option<&str>) -> Result<bool> {
309 let inst = instance.unwrap_or("");
310 let n = self.conn.lock().execute(
311 "UPDATE sources SET last_import_at = strftime('%s','now') \
312 WHERE adapter = ?1 AND instance = ?2",
313 params![adapter, inst],
314 )?;
315 Ok(n > 0)
316 }
317
318 pub fn list_sources_full(&self) -> Result<Vec<SourceRow>> {
322 let conn = self.conn.lock();
323 let mut stmt = conn.prepare(
324 "SELECT adapter, instance, location, config_json, added_at, last_import_at \
325 FROM sources ORDER BY adapter, instance",
326 )?;
327 let rows = stmt
328 .query_map([], |r| {
329 Ok(SourceRow {
330 adapter: r.get(0)?,
331 instance: r.get(1)?,
332 location: r.get(2)?,
333 config_json: r.get(3)?,
334 added_at: r.get(4)?,
335 last_import_at: r.get(5)?,
336 })
337 })?
338 .collect::<rusqlite::Result<Vec<_>>>()?;
339 Ok(rows)
340 }
341
342 pub fn list_sources_with_counts(&self) -> Result<Vec<SourceWithCounts>> {
357 let conn = self.conn.lock();
358 let mut stmt = conn.prepare(
359 "SELECT s.adapter, s.instance, s.location, s.config_json, \
360 s.added_at, s.last_import_at, \
361 COUNT(DISTINCT r.id) AS record_count, \
362 COUNT(rc.id) AS chunk_count \
363 FROM sources s \
364 LEFT JOIN records r \
365 ON r.adapter = s.adapter AND r.instance = s.instance \
366 LEFT JOIN record_chunks rc \
367 ON rc.record_id = r.id \
368 GROUP BY s.adapter, s.instance \
369 ORDER BY s.adapter, s.instance",
370 )?;
371 let rows = stmt
372 .query_map([], |r| {
373 Ok(SourceWithCounts {
374 source: SourceRow {
375 adapter: r.get(0)?,
376 instance: r.get(1)?,
377 location: r.get(2)?,
378 config_json: r.get(3)?,
379 added_at: r.get(4)?,
380 last_import_at: r.get(5)?,
381 },
382 record_count: r.get::<_, i64>(6)? as u64,
383 chunk_count: r.get::<_, i64>(7)? as u64,
384 })
385 })?
386 .collect::<rusqlite::Result<Vec<_>>>()?;
387 Ok(rows)
388 }
389
390 pub fn deregister_source(&self, adapter: &str, instance: Option<&str>) -> Result<()> {
393 let inst = instance.unwrap_or("");
394 self.conn.lock().execute(
395 "DELETE FROM sources WHERE adapter = ?1 AND instance = ?2",
396 params![adapter, inst],
397 )?;
398 Ok(())
399 }
400
401 pub fn list_sources(&self) -> Result<Vec<(String, String, Option<String>)>> {
403 let conn = self.conn.lock();
404 let mut stmt = conn.prepare(
405 "SELECT adapter, instance, location FROM sources ORDER BY adapter, instance",
406 )?;
407 let rows = stmt
408 .query_map([], |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)))?
409 .collect::<rusqlite::Result<Vec<_>>>()?;
410 Ok(rows)
411 }
412}
413
414impl Store {
419 pub fn set_active_model(&self, model_id: &str) -> Result<()> {
424 self.conn.lock().execute(
425 "INSERT INTO meta(key, value) VALUES('active_embedding_model', ?1) \
426 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
427 params![model_id],
428 )?;
429 Ok(())
430 }
431
432 pub fn active_model(&self) -> Result<Option<String>> {
434 let v: Option<String> = self
435 .conn
436 .lock()
437 .query_row(
438 "SELECT value FROM meta WHERE key = 'active_embedding_model'",
439 [],
440 |r| r.get(0),
441 )
442 .optional()?;
443 Ok(v)
444 }
445}
446
447impl Store {
452 pub fn upsert_record(
482 &self,
483 record: &AnamnesisRecord,
484 chunks: &[Chunk],
485 raw_payload_json: Option<&str>,
486 ) -> Result<(u64, u64)> {
487 let active = self.active_model()?;
488 let mut conn = self.conn.lock();
489 let tx = conn.transaction()?;
490
491 let existing_hash: Option<String> = tx
495 .query_row(
496 "SELECT raw_hash FROM records WHERE id = ?1",
497 params![record.id.0],
498 |r| r.get::<_, String>(0),
499 )
500 .optional()?;
501 if existing_hash.as_deref() == Some(record.provenance.raw_hash.as_str()) {
502 if let Some(model_id) = active.as_deref() {
508 let now = chrono::Utc::now().timestamp();
509 enqueue_jobs(&tx, chunks, model_id, now)?;
510 }
511 tx.commit()?;
512 return Ok((0, 0));
513 }
514
515 let now = chrono::Utc::now().timestamp();
516 write_record(&tx, record)?;
517 write_raw_artifact(&tx, record, raw_payload_json, now)?;
518 write_chunks(&tx, record, chunks)?;
519 if let Some(model_id) = active.as_deref() {
520 enqueue_jobs(&tx, chunks, model_id, now)?;
521 }
522 tx.commit()?;
523 Ok((1, chunks.len() as u64))
524 }
525
526 pub fn rebuild_embedding_jobs(&self, model_id: &str) -> Result<u64> {
529 let now = chrono::Utc::now().timestamp();
530 let n = self.conn.lock().execute(
531 "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
532 SELECT id, content_hash, ?1, 'pending', ?2 FROM record_chunks \
533 WHERE TRUE ON CONFLICT(chunk_id, model_id) DO NOTHING",
534 params![model_id, now],
535 )?;
536 Ok(n as u64)
537 }
538}
539
540fn write_record(tx: &Transaction<'_>, r: &AnamnesisRecord) -> Result<()> {
541 let tags = if r.tags.is_empty() {
542 None
543 } else {
544 Some(serde_json::to_string(&r.tags).unwrap_or_default())
545 };
546 let metadata = if r.metadata.is_empty() {
547 None
548 } else {
549 Some(serde_json::to_string(&r.metadata).unwrap_or_default())
550 };
551 tx.execute(
552 "INSERT INTO records(\
553 id, adapter, instance, content, scope, kind, \
554 created_at, updated_at, tags, metadata, \
555 native_id, native_path, captured_at, raw_hash, schema_version, \
556 derived_from\
557 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16) \
558 ON CONFLICT(id) DO UPDATE SET \
559 content = excluded.content, \
560 scope = excluded.scope, \
561 kind = excluded.kind, \
562 updated_at = excluded.updated_at, \
563 tags = excluded.tags, \
564 metadata = excluded.metadata, \
565 native_path = excluded.native_path, \
566 raw_hash = excluded.raw_hash, \
567 derived_from = excluded.derived_from",
568 params![
569 r.id.0,
570 r.source.adapter,
571 r.source.instance.as_deref().unwrap_or(""),
572 r.content,
573 scope_str(r.scope),
574 kind_str(r.kind),
575 ts(r.created_at),
576 r.updated_at.map(ts),
577 tags,
578 metadata,
579 r.provenance.native_id,
580 r.provenance.native_path,
581 ts(r.provenance.captured_at),
582 r.provenance.raw_hash,
583 r.schema_version,
584 r.provenance.derived_from.as_ref().map(|rid| rid.0.clone()),
585 ],
586 )?;
587 Ok(())
588}
589
590fn write_raw_artifact(
591 tx: &Transaction<'_>,
592 r: &AnamnesisRecord,
593 payload_json: Option<&str>,
594 now: i64,
595) -> Result<()> {
596 let (src_emb, src_model, src_dim) = match &r.embedding {
598 Some(e) => (
599 Some(f32_to_blob(&e.vector)),
600 Some(e.model.clone()),
601 Some(e.dim as i64),
602 ),
603 None => (None, None, None),
604 };
605 tx.execute(
606 "INSERT INTO raw_artifacts(record_id, payload_json, source_embedding, \
607 source_embedding_model, source_embedding_dim, captured_at) \
608 VALUES(?1, ?2, ?3, ?4, ?5, ?6) \
609 ON CONFLICT(record_id) DO UPDATE SET \
610 payload_json = excluded.payload_json, \
611 source_embedding = excluded.source_embedding, \
612 source_embedding_model = excluded.source_embedding_model, \
613 source_embedding_dim = excluded.source_embedding_dim, \
614 captured_at = excluded.captured_at",
615 params![
616 r.id.0,
617 payload_json,
618 src_emb.as_deref(),
619 src_model,
620 src_dim,
621 now,
622 ],
623 )?;
624 Ok(())
625}
626
627fn write_chunks(tx: &Transaction<'_>, r: &AnamnesisRecord, chunks: &[Chunk]) -> Result<()> {
628 tx.execute(
630 "DELETE FROM record_chunks WHERE record_id = ?1",
631 params![r.id.0],
632 )?;
633 for c in chunks {
634 let cid = format!("{}:{}", c.record_id.0, c.seq);
635 tx.execute(
636 "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
637 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
638 params![
639 cid,
640 c.record_id.0,
641 c.seq,
642 c.content,
643 c.content_hash.0,
644 c.token_estimate
645 ],
646 )?;
647 }
648 Ok(())
649}
650
651fn enqueue_jobs(tx: &Transaction<'_>, chunks: &[Chunk], model_id: &str, now: i64) -> Result<()> {
652 for c in chunks {
653 let cid = format!("{}:{}", c.record_id.0, c.seq);
654 tx.execute(
655 "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
656 VALUES(?1, ?2, ?3, 'pending', ?4) \
657 ON CONFLICT(chunk_id, model_id) DO NOTHING",
658 params![cid, c.content_hash.0, model_id, now],
659 )?;
660 }
661 Ok(())
662}
663
664impl Store {
669 pub fn list_recent_record_ids(&self, limit: u32) -> Result<Vec<String>> {
680 let conn = self.conn.lock();
681 let mut stmt = conn.prepare("SELECT id FROM records ORDER BY created_at DESC LIMIT ?1")?;
682 let rows = stmt
683 .query_map(params![limit], |r| r.get::<_, String>(0))?
684 .collect::<rusqlite::Result<Vec<_>>>()?;
685 Ok(rows)
686 }
687
688 pub fn list_record_ids_paged(
710 &self,
711 cursor: Option<&str>,
712 limit: u32,
713 ) -> Result<(Vec<String>, Option<String>)> {
714 let limit = limit.clamp(1, MAX_LIST_LIMIT);
715 let conn = self.conn.lock();
716 let rows: Vec<String> = match cursor {
720 Some(c) => {
721 let mut stmt =
722 conn.prepare("SELECT id FROM records WHERE id > ?1 ORDER BY id ASC LIMIT ?2")?;
723 let out = stmt
724 .query_map(params![c, limit], |r| r.get::<_, String>(0))?
725 .collect::<rusqlite::Result<Vec<_>>>()?;
726 out
727 }
728 None => {
729 let mut stmt = conn.prepare("SELECT id FROM records ORDER BY id ASC LIMIT ?1")?;
730 let out = stmt
731 .query_map(params![limit], |r| r.get::<_, String>(0))?
732 .collect::<rusqlite::Result<Vec<_>>>()?;
733 out
734 }
735 };
736 let next = if rows.len() as u32 == limit {
739 rows.last().cloned()
740 } else {
741 None
742 };
743 Ok((rows, next))
744 }
745
746 pub fn get_record(&self, id: &RecordId) -> Result<Option<AnamnesisRecord>> {
748 let conn = self.conn.lock();
749 let mut stmt = conn.prepare(
750 "SELECT id, adapter, instance, content, scope, kind, \
751 created_at, updated_at, tags, metadata, \
752 native_id, native_path, captured_at, raw_hash, schema_version, \
753 derived_from \
754 FROM records WHERE id = ?1",
755 )?;
756 let row = stmt.query_row(params![id.0], record_from_row).optional()?;
757 Ok(row)
758 }
759
760 pub fn list_derivations(&self, parent: &RecordId, limit: u32) -> Result<Vec<AnamnesisRecord>> {
772 let limit = limit.clamp(1, MAX_LIST_LIMIT);
773 let conn = self.conn.lock();
774 let mut stmt = conn.prepare(
775 "SELECT id, adapter, instance, content, scope, kind, \
776 created_at, updated_at, tags, metadata, \
777 native_id, native_path, captured_at, raw_hash, schema_version, \
778 derived_from \
779 FROM records \
780 WHERE derived_from = ?1 \
781 ORDER BY created_at ASC, id ASC \
782 LIMIT ?2",
783 )?;
784 let rows = stmt
785 .query_map(params![parent.0, limit], record_from_row)?
786 .collect::<rusqlite::Result<Vec<_>>>()?;
787 Ok(rows)
788 }
789
790 pub fn lineage_chain(&self, start: &RecordId) -> Result<Option<LineageChain>> {
804 let mut chain: Vec<AnamnesisRecord> = Vec::new();
805 let mut seen = std::collections::HashSet::new();
806 let mut cursor = Some(start.clone());
807 let mut missing_parent: Option<RecordId> = None;
808
809 while let Some(cur) = cursor {
810 if !seen.insert(cur.0.clone()) {
811 return Err(StoreError::Corruption(format!(
812 "lineage cycle detected at {}",
813 cur.0
814 )));
815 }
816 match self.get_record(&cur)? {
817 Some(record) => {
818 let next = record.provenance.derived_from.clone();
819 chain.push(record);
820 cursor = next;
821 }
822 None => {
823 if chain.is_empty() {
826 return Ok(None);
827 }
828 missing_parent = Some(cur);
829 break;
830 }
831 }
832 }
833
834 Ok(Some(LineageChain {
835 records: chain,
836 missing_parent,
837 }))
838 }
839
840 pub fn record_summary(&self, id: &RecordId) -> Result<Option<RecordSummary>> {
853 let conn = self.conn.lock();
854
855 let exists: bool = conn
857 .query_row("SELECT 1 FROM records WHERE id = ?1", params![id.0], |_| {
858 Ok(true)
859 })
860 .optional()?
861 .unwrap_or(false);
862 if !exists {
863 return Ok(None);
864 }
865
866 let chunk_count: i64 = conn.query_row(
867 "SELECT COUNT(*) FROM record_chunks WHERE record_id = ?1",
868 params![id.0],
869 |r| r.get(0),
870 )?;
871
872 let active_model: Option<String> = conn
874 .query_row(
875 "SELECT value FROM meta WHERE key = 'active_embedding_model'",
876 [],
877 |r| r.get(0),
878 )
879 .optional()?;
880
881 let embedded_chunk_count: i64 = match active_model.as_deref() {
884 Some(model) => conn.query_row(
885 "SELECT COUNT(*) FROM chunk_embeddings e \
886 JOIN record_chunks rc ON rc.id = e.chunk_id \
887 WHERE rc.record_id = ?1 AND e.model_id = ?2",
888 params![id.0, model],
889 |r| r.get(0),
890 )?,
891 None => 0,
892 };
893
894 let (source_model, source_dim): (Option<String>, Option<i64>) = conn
898 .query_row(
899 "SELECT source_embedding_model, source_embedding_dim \
900 FROM raw_artifacts WHERE record_id = ?1",
901 params![id.0],
902 |r| Ok((r.get(0)?, r.get(1)?)),
903 )
904 .optional()?
905 .unwrap_or((None, None));
906
907 Ok(Some(RecordSummary {
908 chunk_count: chunk_count as u64,
909 embedded_chunk_count: embedded_chunk_count as u64,
910 active_model,
911 source_embedding_model: source_model,
912 source_embedding_dim: source_dim.map(|d| d as u32),
913 }))
914 }
915
916 pub fn get_chunk(&self, chunk_id: &str) -> Result<Option<ChunkLookup>> {
924 let conn = self.conn.lock();
925 conn.query_row(
926 "SELECT rc.id, rc.record_id, rc.seq, rc.content, \
927 rc.content_hash, rc.token_estimate \
928 FROM record_chunks rc \
929 WHERE rc.id = ?1",
930 params![chunk_id],
931 |r| {
932 Ok(ChunkLookup {
933 chunk_id: r.get(0)?,
934 record_id: RecordId(r.get(1)?),
935 seq: r.get::<_, i64>(2)? as u32,
936 content: r.get(3)?,
937 content_hash: ContentHash(r.get(4)?),
938 token_estimate: r.get::<_, i64>(5)? as u32,
939 })
940 },
941 )
942 .optional()
943 .map_err(Into::into)
944 }
945}
946
/// Chunk and embedding status of one record, as returned by
/// `Store::record_summary`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RecordSummary {
    /// Number of `record_chunks` rows belonging to the record.
    pub chunk_count: u64,
    /// Number of those chunks that have a `chunk_embeddings` row under the
    /// active model; zero when no active model is set.
    pub embedded_chunk_count: u64,
    /// Value of the `active_embedding_model` meta key at query time, if set.
    pub active_model: Option<String>,
    /// `source_embedding_model` stored in `raw_artifacts` for the record.
    pub source_embedding_model: Option<String>,
    /// `source_embedding_dim` stored in `raw_artifacts` for the record.
    pub source_embedding_dim: Option<u32>,
}
969
/// Result of walking `derived_from` links in `Store::lineage_chain`.
#[derive(Debug, Clone, PartialEq)]
pub struct LineageChain {
    /// The starting record followed by each ancestor that could be loaded, in
    /// walk order (child first).
    pub records: Vec<AnamnesisRecord>,
    /// Id of a referenced parent that is not stored, if the walk stopped
    /// because of one; `None` when the chain ended at a record with no parent.
    pub missing_parent: Option<RecordId>,
}
993
/// One `record_chunks` row, as returned by `Store::get_chunk`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkLookup {
    /// Value of the `id` column.
    pub chunk_id: String,
    /// Id of the record the chunk belongs to.
    pub record_id: RecordId,
    /// Value of the `seq` column.
    pub seq: u32,
    /// Chunk text.
    pub content: String,
    /// Value of the `content_hash` column.
    pub content_hash: ContentHash,
    /// Value of the `token_estimate` column.
    pub token_estimate: u32,
}
1012
1013fn record_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<AnamnesisRecord> {
1014 let tags_json: Option<String> = row.get(8)?;
1015 let meta_json: Option<String> = row.get(9)?;
1016 let updated_at: Option<i64> = row.get(7)?;
1017 let instance: String = row.get(2)?;
1018 Ok(AnamnesisRecord {
1019 id: RecordId(row.get(0)?),
1020 source: SourceDescriptor {
1021 adapter: row.get(1)?,
1022 instance: if instance.is_empty() {
1023 None
1024 } else {
1025 Some(instance)
1026 },
1027 version: String::new(), },
1029 content: row.get(3)?,
1030 embedding: None, scope: scope_from(&row.get::<_, String>(4)?),
1032 kind: kind_from(&row.get::<_, String>(5)?),
1033 created_at: dt(row.get(6)?),
1034 updated_at: updated_at.map(dt),
1035 tags: tags_json
1036 .and_then(|s| serde_json::from_str(&s).ok())
1037 .unwrap_or_default(),
1038 metadata: meta_json
1039 .and_then(|s| serde_json::from_str(&s).ok())
1040 .unwrap_or_default(),
1041 provenance: Provenance {
1042 native_id: row.get(10)?,
1043 native_path: row.get(11)?,
1044 captured_at: dt(row.get(12)?),
1045 raw_hash: row.get(13)?,
1046 derived_from: row.get::<_, Option<String>>(15)?.map(RecordId),
1047 },
1048 schema_version: row.get::<_, i64>(14)? as u32,
1049 })
1050}
1051
1052impl Store {
1057 pub fn search_chunks_fts(
1060 &self,
1061 query: &str,
1062 filter: &SearchFilter,
1063 limit: u32,
1064 ) -> Result<Vec<ChunkHit>> {
1065 let match_query = crate::cjk::tokenize_query(query);
1072 if match_query.is_empty() {
1073 return Ok(Vec::new());
1076 }
1077
1078 let mut sql = String::from(
1088 "SELECT rc.id, rc.record_id, rc.seq, rc.content, bm25(chunks_fts) AS score \
1089 FROM chunks_fts \
1090 JOIN record_chunks rc ON rc.rowid = chunks_fts.rowid",
1091 );
1092 let need_records_join = !filter.is_empty();
1093 if need_records_join {
1094 sql.push_str(" JOIN records r ON r.id = rc.record_id");
1095 }
1096 sql.push_str(" WHERE chunks_fts MATCH ?");
1097 let filter_params = append_filter_predicates(&mut sql, filter);
1098 sql.push_str(" ORDER BY score LIMIT ?");
1099
1100 let conn = self.conn.lock();
1101 let mut stmt = conn.prepare(&sql)?;
1102 let mut bound: Vec<rusqlite::types::Value> = Vec::with_capacity(2 + filter_params.len());
1103 bound.push(rusqlite::types::Value::Text(match_query));
1104 bound.extend(filter_params);
1105 bound.push(rusqlite::types::Value::Integer(limit as i64));
1106 let rows = stmt
1107 .query_map(rusqlite::params_from_iter(bound.iter()), |r| {
1108 let raw_score: f64 = r.get(4)?;
1109 Ok(ChunkHit {
1110 chunk_id: r.get(0)?,
1111 record_id: RecordId(r.get(1)?),
1112 seq: r.get::<_, i64>(2)? as u32,
1113 content: r.get(3)?,
1114 score: -raw_score, })
1116 })?
1117 .collect::<rusqlite::Result<Vec<_>>>()?;
1118 Ok(rows)
1119 }
1120
1121 pub fn search_chunks_vec(
1131 &self,
1132 query_vec: &[f32],
1133 model_id: &str,
1134 filter: &SearchFilter,
1135 limit: u32,
1136 ) -> Result<Vec<ChunkHit>> {
1137 let mut sql = String::from(
1138 "SELECT e.chunk_id, e.embedding, rc.record_id, rc.seq, rc.content \
1139 FROM chunk_embeddings e \
1140 JOIN record_chunks rc ON rc.id = e.chunk_id",
1141 );
1142 let need_records_join = !filter.is_empty();
1143 if need_records_join {
1144 sql.push_str(" JOIN records r ON r.id = rc.record_id");
1145 }
1146 sql.push_str(" WHERE e.model_id = ?");
1147 let filter_params = append_filter_predicates(&mut sql, filter);
1148 let conn = self.conn.lock();
1153 let mut stmt = conn.prepare(&sql)?;
1154 let mut bound: Vec<rusqlite::types::Value> = Vec::with_capacity(1 + filter_params.len());
1155 bound.push(rusqlite::types::Value::Text(model_id.to_string()));
1156 bound.extend(filter_params);
1157 let mut scored: Vec<ChunkHit> = Vec::new();
1158 let rows = stmt.query_map(rusqlite::params_from_iter(bound.iter()), |r| {
1159 Ok((
1160 r.get::<_, String>(0)?,
1161 r.get::<_, Vec<u8>>(1)?,
1162 r.get::<_, String>(2)?,
1163 r.get::<_, i64>(3)?,
1164 r.get::<_, String>(4)?,
1165 ))
1166 })?;
1167 for row in rows {
1168 let (chunk_id, blob, rid, seq, content) = row?;
1169 let v = blob_to_f32(&blob)?;
1170 let score = cosine(query_vec, &v);
1171 scored.push(ChunkHit {
1172 chunk_id,
1173 record_id: RecordId(rid),
1174 seq: seq as u32,
1175 content,
1176 score,
1177 });
1178 }
1179 scored.sort_by(|a, b| {
1180 b.score
1181 .partial_cmp(&a.score)
1182 .unwrap_or(std::cmp::Ordering::Equal)
1183 });
1184 scored.truncate(limit as usize);
1185 Ok(scored)
1186 }
1187}
1188
1189fn append_filter_predicates(
1197 sql: &mut String,
1198 filter: &SearchFilter,
1199) -> Vec<rusqlite::types::Value> {
1200 use rusqlite::types::Value as V;
1201 let mut params: Vec<V> = Vec::new();
1202 if let Some(s) = &filter.source {
1203 sql.push_str(" AND r.adapter = ?");
1204 params.push(V::Text(s.clone()));
1205 }
1206 if let Some(i) = &filter.instance {
1207 sql.push_str(" AND r.instance = ?");
1211 params.push(V::Text(i.clone()));
1212 }
1213 if let Some(k) = &filter.kind {
1214 sql.push_str(" AND r.kind = ?");
1215 params.push(V::Text(k.clone()));
1216 }
1217 if let Some(sc) = &filter.scope {
1218 sql.push_str(" AND r.scope = ?");
1219 params.push(V::Text(sc.clone()));
1220 }
1221 if let Some(from) = filter.time_from {
1222 sql.push_str(" AND r.created_at >= ?");
1223 params.push(V::Integer(from));
1224 }
1225 if let Some(to) = filter.time_to {
1226 sql.push_str(" AND r.created_at <= ?");
1227 params.push(V::Integer(to));
1228 }
1229 params
1230}
1231
1232impl Store {
1237 pub fn claim_next_job(&self, model_id: &str) -> Result<Option<PendingEmbeddingJob>> {
1240 let mut conn = self.conn.lock();
1241 let tx = conn.transaction()?;
1242 let now = chrono::Utc::now().timestamp();
1243 let row: Option<(i64, String, String)> = tx
1244 .query_row(
1245 "SELECT id, chunk_id, content_hash FROM embedding_jobs \
1246 WHERE status = 'pending' AND model_id = ?1 \
1247 ORDER BY enqueued_at ASC LIMIT 1",
1248 params![model_id],
1249 |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1250 )
1251 .optional()?;
1252 let Some((job_id, chunk_id, content_hash)) = row else {
1253 tx.commit()?;
1254 return Ok(None);
1255 };
1256 tx.execute(
1257 "UPDATE embedding_jobs SET status = 'in_progress', claimed_at = ?1 WHERE id = ?2",
1258 params![now, job_id],
1259 )?;
1260 let content: String = tx.query_row(
1261 "SELECT content FROM record_chunks WHERE id = ?1",
1262 params![chunk_id],
1263 |r| r.get(0),
1264 )?;
1265 tx.commit()?;
1266 Ok(Some(PendingEmbeddingJob {
1267 job_id,
1268 chunk_id,
1269 content_hash: ContentHash(content_hash),
1270 model_id: model_id.to_string(),
1271 content,
1272 }))
1273 }
1274
1275 pub fn complete_job(&self, job: &PendingEmbeddingJob, vector: &[f32]) -> Result<()> {
1277 let dim = vector.len() as i64;
1278 let blob = f32_to_blob(vector);
1279 let mut conn = self.conn.lock();
1280 let tx = conn.transaction()?;
1281 let now = chrono::Utc::now().timestamp();
1282 tx.execute(
1283 "INSERT INTO chunk_embeddings(chunk_id, model_id, content_hash, dim, embedding, created_at) \
1284 VALUES(?1, ?2, ?3, ?4, ?5, ?6) \
1285 ON CONFLICT(chunk_id, model_id) DO UPDATE SET \
1286 content_hash = excluded.content_hash, \
1287 dim = excluded.dim, \
1288 embedding = excluded.embedding, \
1289 created_at = excluded.created_at",
1290 params![
1291 job.chunk_id,
1292 job.model_id,
1293 job.content_hash.0,
1294 dim,
1295 blob,
1296 now,
1297 ],
1298 )?;
1299 tx.execute(
1300 "UPDATE embedding_jobs SET status = 'done', finished_at = ?1, error = NULL WHERE id = ?2",
1301 params![now, job.job_id],
1302 )?;
1303 tx.commit()?;
1304 Ok(())
1305 }
1306
1307 pub fn fail_job(&self, job_id: i64, error: &str) -> Result<()> {
1309 let now = chrono::Utc::now().timestamp();
1310 self.conn.lock().execute(
1311 "UPDATE embedding_jobs SET status = 'failed', finished_at = ?1, error = ?2 WHERE id = ?3",
1312 params![now, error, job_id],
1313 )?;
1314 Ok(())
1315 }
1316}
1317
1318impl Store {
1323 pub fn log_import_error(
1325 &self,
1326 adapter: &str,
1327 instance: Option<&str>,
1328 native_id: Option<&str>,
1329 native_path: Option<&str>,
1330 phase: &str,
1331 error: &str,
1332 ) -> Result<()> {
1333 let now = chrono::Utc::now().timestamp();
1334 self.conn.lock().execute(
1335 "INSERT INTO import_errors(adapter, instance, native_id, native_path, phase, error, occurred_at) \
1336 VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1337 params![adapter, instance.unwrap_or(""), native_id, native_path, phase, error, now],
1338 )?;
1339 Ok(())
1340 }
1341
1342 pub fn stats(&self) -> Result<StoreStats> {
1344 let conn = self.conn.lock();
1345 let records: i64 = conn.query_row("SELECT COUNT(1) FROM records", [], |r| r.get(0))?;
1346 let chunks: i64 = conn.query_row("SELECT COUNT(1) FROM record_chunks", [], |r| r.get(0))?;
1347 let pending: i64 = conn.query_row(
1348 "SELECT COUNT(1) FROM embedding_jobs WHERE status IN ('pending','in_progress')",
1349 [],
1350 |r| r.get(0),
1351 )?;
1352 let failed: i64 = conn.query_row(
1353 "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'failed'",
1354 [],
1355 |r| r.get(0),
1356 )?;
1357 let sources: i64 = conn.query_row("SELECT COUNT(1) FROM sources", [], |r| r.get(0))?;
1358 Ok(StoreStats {
1359 records: records as u64,
1360 chunks: chunks as u64,
1361 jobs_pending: pending as u64,
1362 jobs_failed: failed as u64,
1363 sources: sources as u64,
1364 })
1365 }
1366}
1367
1368#[cfg(test)]
1373mod tests {
1374 use super::*;
1375 use anamnesis_core::chunker::Chunker;
1376 use anamnesis_core::model::{Embedding, Provenance, SourceDescriptor};
1377 use chrono::Utc;
1378
1379 fn make_record(adapter: &str, native_id: &str, content: &str, kind: Kind) -> AnamnesisRecord {
1380 let id = RecordId::from_parts(adapter, None, native_id);
1381 AnamnesisRecord {
1382 id,
1383 source: SourceDescriptor {
1384 adapter: adapter.into(),
1385 instance: None,
1386 version: "0.0.1".into(),
1387 },
1388 content: content.into(),
1389 embedding: None,
1390 scope: Scope::User,
1391 kind,
1392 created_at: Utc::now(),
1393 updated_at: None,
1394 tags: vec!["t1".into(), "t2".into()],
1395 metadata: Default::default(),
1396 provenance: Provenance {
1397 native_id: native_id.into(),
1398 native_path: Some(format!("/tmp/{native_id}.md")),
1399 captured_at: Utc::now(),
1400 raw_hash: "h".into(),
1401 derived_from: None,
1402 },
1403 schema_version: anamnesis_core::SCHEMA_VERSION,
1404 }
1405 }
1406
1407 #[test]
1408 fn f32_blob_roundtrip() {
1409 let v = vec![0.1f32, -0.2, 1e10, -1e-10, 0.0];
1410 let back = blob_to_f32(&f32_to_blob(&v)).unwrap();
1411 assert_eq!(v, back);
1412 }
1413
1414 #[test]
1415 fn cosine_basic() {
1416 assert!((cosine(&[1.0, 0.0], &[1.0, 0.0]) - 1.0).abs() < 1e-9);
1417 assert!(cosine(&[1.0, 0.0], &[0.0, 1.0]).abs() < 1e-9);
1418 assert!((cosine(&[1.0, 1.0], &[1.0, 1.0]) - 1.0).abs() < 1e-9);
1419 }
1420
1421 #[test]
1422 fn register_and_list_sources() {
1423 let store = Store::open_in_memory().unwrap();
1424 store
1425 .register_source("claude-code", Some("default"), Some("/home/x"), None)
1426 .unwrap();
1427 store
1428 .register_source(
1429 "mem0",
1430 None,
1431 Some("/tmp/m.db"),
1432 Some("{\"mode\":\"sqlite\"}"),
1433 )
1434 .unwrap();
1435 let mut got = store.list_sources().unwrap();
1436 got.sort();
1437 assert_eq!(
1438 got,
1439 vec![
1440 (
1441 "claude-code".into(),
1442 "default".into(),
1443 Some("/home/x".into())
1444 ),
1445 ("mem0".into(), "".into(), Some("/tmp/m.db".into())),
1446 ]
1447 );
1448 }
1449
1450 #[test]
1453 fn get_source_normalises_none_instance_to_empty_string() {
1454 let store = Store::open_in_memory().unwrap();
1459 store
1460 .register_source("mem0", None, Some("/path/db.sqlite"), None)
1461 .unwrap();
1462 let row = store.get_source("mem0", None).unwrap();
1463 let row = row.expect("instance=None must round-trip via get_source");
1464 assert_eq!(row.adapter, "mem0");
1465 assert_eq!(row.instance, "", "default instance stored as empty string");
1466 assert_eq!(row.location.as_deref(), Some("/path/db.sqlite"));
1467 assert!(row.last_import_at.is_none());
1468 let row_via_empty = store.get_source("mem0", Some("")).unwrap();
1470 assert!(row_via_empty.is_some(), "Some(\"\") must hit same row");
1471 }
1472
1473 #[test]
1474 fn get_source_returns_none_for_unregistered() {
1475 let store = Store::open_in_memory().unwrap();
1476 assert!(store.get_source("claude-code", None).unwrap().is_none());
1477 assert!(store
1478 .get_source("mem0", Some("nonexistent-instance"))
1479 .unwrap()
1480 .is_none());
1481 }
1482
1483 #[test]
1484 fn update_last_import_at_stamps_existing_row() {
1485 let store = Store::open_in_memory().unwrap();
1486 store
1487 .register_source("claude-code", None, Some("/p"), None)
1488 .unwrap();
1489 assert!(store
1490 .get_source("claude-code", None)
1491 .unwrap()
1492 .unwrap()
1493 .last_import_at
1494 .is_none());
1495 let updated = store.update_last_import_at("claude-code", None).unwrap();
1496 assert!(updated, "update returns true when a row was stamped");
1497 let row = store.get_source("claude-code", None).unwrap().unwrap();
1498 assert!(
1499 row.last_import_at.is_some(),
1500 "last_import_at must be non-null after a successful update"
1501 );
1502 }
1503
1504 #[test]
1505 fn update_last_import_at_for_missing_row_returns_false() {
1506 let store = Store::open_in_memory().unwrap();
1507 let updated = store.update_last_import_at("claude-code", None).unwrap();
1508 assert!(
1509 !updated,
1510 "no matching source row → returns Ok(false) without inserting"
1511 );
1512 assert!(store.list_sources().unwrap().is_empty());
1513 }
1514
1515 #[test]
1516 fn register_source_is_idempotent_keeps_added_at_stable() {
1517 let store = Store::open_in_memory().unwrap();
1520 store
1521 .register_source("mem0", None, Some("/path/A"), None)
1522 .unwrap();
1523 let row1 = store.get_source("mem0", None).unwrap().unwrap();
1524 std::thread::sleep(std::time::Duration::from_secs(1));
1525 store
1526 .register_source("mem0", None, Some("/path/B"), None)
1527 .unwrap();
1528 let rows = store.list_sources().unwrap();
1529 assert_eq!(rows.len(), 1, "no duplicate rows");
1530 let row2 = store.get_source("mem0", None).unwrap().unwrap();
1531 assert_eq!(row1.added_at, row2.added_at, "added_at stays stable");
1532 assert_eq!(row2.location.as_deref(), Some("/path/B"));
1533 }
1534
1535 #[test]
1536 fn list_sources_full_carries_all_fields() {
1537 let store = Store::open_in_memory().unwrap();
1538 store
1539 .register_source("claude-code", Some("work"), Some("/work"), Some("{}"))
1540 .unwrap();
1541 store
1542 .update_last_import_at("claude-code", Some("work"))
1543 .unwrap();
1544 store.register_source("mem0", None, None, None).unwrap(); let rows = store.list_sources_full().unwrap();
1547 assert_eq!(rows.len(), 2);
1548 let cc = rows.iter().find(|r| r.adapter == "claude-code").unwrap();
1549 assert_eq!(cc.instance, "work");
1550 assert_eq!(cc.location.as_deref(), Some("/work"));
1551 assert_eq!(cc.config_json.as_deref(), Some("{}"));
1552 assert!(cc.last_import_at.is_some());
1553 let mem0 = rows.iter().find(|r| r.adapter == "mem0").unwrap();
1554 assert_eq!(mem0.instance, "");
1555 assert!(mem0.location.is_none());
1556 assert!(mem0.last_import_at.is_none());
1557 }
1558
1559 #[test]
1562 fn list_sources_with_counts_includes_zero_for_never_imported_source() {
1563 let store = Store::open_in_memory().unwrap();
1568 store
1569 .register_source("mem0", None, Some("/tmp/missing.db"), None)
1570 .unwrap();
1571 let rows = store.list_sources_with_counts().unwrap();
1572 assert_eq!(rows.len(), 1);
1573 let r = &rows[0];
1574 assert_eq!(r.source.adapter, "mem0");
1575 assert_eq!(r.record_count, 0);
1576 assert_eq!(r.chunk_count, 0);
1577 assert!(r.source.last_import_at.is_none());
1578 }
1579
1580 #[test]
1581 fn list_sources_with_counts_aggregates_records_and_chunks_per_source() {
1582 let store = Store::open_in_memory().unwrap();
1586 store
1587 .register_source("claude-code", None, Some("/c"), None)
1588 .unwrap();
1589 store
1590 .register_source("mem0", Some("prod"), Some("/m"), None)
1591 .unwrap();
1592
1593 for native in ["a", "b", "c"] {
1594 let r = make_record("claude-code", native, "x", Kind::Fact);
1595 let c = Chunker::default().chunk(&r.id, &r.content);
1596 store.upsert_record(&r, &c, None).unwrap();
1597 }
1598 let mut mem_r = make_record("mem0", "m1", "y", Kind::Fact);
1605 mem_r.source.instance = Some("prod".into());
1606 mem_r.id = RecordId::from_parts("mem0", Some("prod"), "m1");
1607 let mem_c = Chunker::default().chunk(&mem_r.id, &mem_r.content);
1608 store.upsert_record(&mem_r, &mem_c, None).unwrap();
1609
1610 let rows = store.list_sources_with_counts().unwrap();
1611 assert_eq!(rows.len(), 2);
1612 let cc = rows
1613 .iter()
1614 .find(|r| r.source.adapter == "claude-code")
1615 .unwrap();
1616 assert_eq!(
1617 cc.source.instance, "",
1618 "default instance kept as empty string"
1619 );
1620 assert_eq!(cc.record_count, 3);
1621 assert_eq!(cc.chunk_count, 3);
1622 let mem = rows.iter().find(|r| r.source.adapter == "mem0").unwrap();
1623 assert_eq!(
1624 mem.source.instance, "prod",
1625 "instance must round-trip through the JOIN"
1626 );
1627 assert_eq!(mem.record_count, 1);
1628 assert_eq!(mem.chunk_count, 1);
1629 }
1630
1631 #[test]
1632 fn list_sources_with_counts_groups_by_adapter_and_instance_not_just_adapter() {
1633 let store = Store::open_in_memory().unwrap();
1637 store
1638 .register_source("mem0", Some("self-hosted"), Some("/local"), None)
1639 .unwrap();
1640 store
1641 .register_source("mem0", Some("cloud"), Some("https://x"), None)
1642 .unwrap();
1643
1644 for native in ["x", "y"] {
1646 let mut r = make_record("mem0", native, "z", Kind::Fact);
1647 r.source.instance = Some("self-hosted".into());
1648 r.id = RecordId::from_parts("mem0", Some("self-hosted"), native);
1649 let c = Chunker::default().chunk(&r.id, &r.content);
1650 store.upsert_record(&r, &c, None).unwrap();
1651 }
1652
1653 let rows = store.list_sources_with_counts().unwrap();
1654 assert_eq!(rows.len(), 2, "two distinct (adapter, instance) rows");
1655 let local = rows
1656 .iter()
1657 .find(|r| r.source.instance == "self-hosted")
1658 .unwrap();
1659 assert_eq!(local.record_count, 2);
1660 let cloud = rows.iter().find(|r| r.source.instance == "cloud").unwrap();
1661 assert_eq!(cloud.record_count, 0);
1662 }
1663
1664 #[test]
1665 fn upsert_round_trips_record() {
1666 let store = Store::open_in_memory().unwrap();
1667 let r = make_record("claude-code", "n1", "alpha beta gamma", Kind::Preference);
1668 let chunks = Chunker::default().chunk(&r.id, &r.content);
1669 let (added, n_chunks) = store.upsert_record(&r, &chunks, Some("{}")).unwrap();
1670 assert_eq!(added, 1);
1671 assert_eq!(n_chunks, 1);
1672 let back = store.get_record(&r.id).unwrap().unwrap();
1673 assert_eq!(back.id, r.id);
1674 assert_eq!(back.content, r.content);
1675 assert_eq!(back.kind, Kind::Preference);
1676 assert_eq!(back.scope, Scope::User);
1677 assert_eq!(back.tags, vec!["t1".to_string(), "t2".to_string()]);
1678 assert_eq!(back.source.adapter, "claude-code");
1679 assert!(back.source.instance.is_none());
1680 }
1681
1682 #[test]
1695 fn reupsert_with_unchanged_raw_hash_returns_zero_zero() {
1696 let store = Store::open_in_memory().unwrap();
1697 let r = make_record("a", "x", "stable content", Kind::Fact);
1698 let c = Chunker::default().chunk(&r.id, &r.content);
1699 let (n1, k1) = store.upsert_record(&r, &c, Some("{\"v\":1}")).unwrap();
1700 assert_eq!((n1, k1), (1, c.len() as u64));
1701
1702 let (n2, k2) = store.upsert_record(&r, &c, Some("{\"v\":1}")).unwrap();
1704 assert_eq!(
1705 (n2, k2),
1706 (0, 0),
1707 "re-upsert with unchanged raw_hash must report zero work"
1708 );
1709 }
1710
1711 #[test]
1712 fn reupsert_with_unchanged_raw_hash_does_not_touch_chunks() {
1713 let store = Store::open_in_memory().unwrap();
1719 let r = make_record("a", "x", "the quick brown fox", Kind::Fact);
1720 let c = Chunker::default().chunk(&r.id, &r.content);
1721 store.upsert_record(&r, &c, None).unwrap();
1722 let rowid_before: i64 = store
1723 .conn()
1724 .query_row(
1725 "SELECT rowid FROM record_chunks WHERE record_id = ?1",
1726 params![r.id.0],
1727 |row| row.get(0),
1728 )
1729 .unwrap();
1730
1731 store.upsert_record(&r, &c, None).unwrap();
1732 let rowid_after: i64 = store
1733 .conn()
1734 .query_row(
1735 "SELECT rowid FROM record_chunks WHERE record_id = ?1",
1736 params![r.id.0],
1737 |row| row.get(0),
1738 )
1739 .unwrap();
1740 assert_eq!(
1741 rowid_before, rowid_after,
1742 "rowid changed → DELETE+INSERT happened → jieba triggers fired"
1743 );
1744 let hits = store
1746 .search_chunks_fts("quick fox", &SearchFilter::default(), 5)
1747 .unwrap();
1748 assert_eq!(hits.len(), 1);
1749 }
1750
1751 #[test]
1752 fn reupsert_with_changed_raw_hash_still_rewrites_chunks() {
1753 let store = Store::open_in_memory().unwrap();
1757 let mut r = make_record("a", "x", "old content", Kind::Fact);
1758 r.provenance.raw_hash = "hash-v1".into();
1759 let c1 = Chunker::default().chunk(&r.id, &r.content);
1760 store.upsert_record(&r, &c1, None).unwrap();
1761
1762 let mut r2 = r.clone();
1763 r2.content = "new completely different content".into();
1764 r2.provenance.raw_hash = "hash-v2".into();
1765 let c2 = Chunker::default().chunk(&r2.id, &r2.content);
1766 let (n, k) = store.upsert_record(&r2, &c2, None).unwrap();
1767 assert_eq!(n, 1, "raw_hash changed → record written");
1768 assert_eq!(k, c2.len() as u64, "chunks rewritten");
1769 let hits = store
1770 .search_chunks_fts("different", &SearchFilter::default(), 5)
1771 .unwrap();
1772 assert!(!hits.is_empty(), "new content searchable");
1773 let stale = store
1774 .search_chunks_fts("old", &SearchFilter::default(), 5)
1775 .unwrap();
1776 assert!(stale.is_empty(), "old content evicted");
1777 }
1778
1779 #[test]
1780 fn reupsert_no_op_still_enqueues_jobs_for_active_model() {
1781 let store = Store::open_in_memory().unwrap();
1787 let r = make_record("a", "x", "hello world", Kind::Fact);
1788 let c = Chunker::default().chunk(&r.id, &r.content);
1789 store.set_active_model("local:model-a:1").unwrap();
1791 store.upsert_record(&r, &c, None).unwrap();
1792
1793 store.set_active_model("local:model-b:1").unwrap();
1796 let (n, k) = store.upsert_record(&r, &c, None).unwrap();
1797 assert_eq!((n, k), (0, 0));
1798
1799 let pending_for_b: i64 = store
1800 .conn()
1801 .query_row(
1802 "SELECT COUNT(*) FROM embedding_jobs \
1803 WHERE status = 'pending' AND model_id = 'local:model-b:1'",
1804 [],
1805 |r| r.get(0),
1806 )
1807 .unwrap();
1808 assert_eq!(
1809 pending_for_b as usize,
1810 c.len(),
1811 "fast-path must still enqueue jobs under the active model"
1812 );
1813 }
1814
1815 #[test]
1816 fn upsert_replaces_chunks_on_recall() {
1817 let store = Store::open_in_memory().unwrap();
1818 let mut r = make_record("a", "x", "v1", Kind::Fact);
1819 r.provenance.raw_hash = "v1-hash".into();
1820 let c1 = Chunker::default().chunk(&r.id, &r.content);
1821 store.upsert_record(&r, &c1, None).unwrap();
1822
1823 let mut r2 = r.clone();
1824 r2.content = "v2 different and longer ".repeat(40);
1825 r2.provenance.raw_hash = "v2-hash".into();
1831 let c2 = Chunker::default().chunk(&r2.id, &r2.content);
1832 store.upsert_record(&r2, &c2, None).unwrap();
1833
1834 let chunk_count: i64 = store
1835 .conn()
1836 .query_row(
1837 "SELECT COUNT(1) FROM record_chunks WHERE record_id = ?1",
1838 params![r2.id.0],
1839 |row| row.get(0),
1840 )
1841 .unwrap();
1842 assert_eq!(chunk_count as usize, c2.len());
1843 let hits = store
1845 .search_chunks_fts("different", &SearchFilter::default(), 5)
1846 .unwrap();
1847 assert!(!hits.is_empty());
1848 let stale = store
1849 .search_chunks_fts("v1", &SearchFilter::default(), 5)
1850 .unwrap();
1851 assert!(stale.is_empty());
1852 }
1853
1854 #[test]
1855 fn fts_search_returns_chunks() {
1856 let store = Store::open_in_memory().unwrap();
1857 let r = make_record(
1858 "a",
1859 "x",
1860 "the quick brown fox jumps over the lazy dog",
1861 Kind::Fact,
1862 );
1863 let c = Chunker::default().chunk(&r.id, &r.content);
1864 store.upsert_record(&r, &c, None).unwrap();
1865 let hits = store
1866 .search_chunks_fts("quick fox", &SearchFilter::default(), 5)
1867 .unwrap();
1868 assert_eq!(hits.len(), 1);
1869 assert_eq!(hits[0].record_id, r.id);
1870 assert!(hits[0].score > 0.0);
1871 }
1872
1873 #[test]
1885 fn cjk_phrase_search_finds_indexed_document() {
1886 let store = Store::open_in_memory().unwrap();
1887 let r = make_record(
1888 "claude-code",
1889 "cjk-1",
1890 "Anamnesis 是跨 agent 的记忆基础设施,本地优先,无 telemetry",
1891 Kind::Fact,
1892 );
1893 let c = Chunker::default().chunk(&r.id, &r.content);
1894 store.upsert_record(&r, &c, None).unwrap();
1895
1896 for query in &["记忆", "基础设施", "本地优先"] {
1899 let hits = store
1900 .search_chunks_fts(query, &SearchFilter::default(), 5)
1901 .unwrap();
1902 assert!(
1903 !hits.is_empty(),
1904 "CJK query {query:?} must find the indexed record"
1905 );
1906 assert_eq!(hits[0].record_id, r.id, "wrong record for query {query:?}");
1907 }
1908 }
1909
1910 #[test]
1911 fn cjk_search_distinguishes_distinct_words() {
1912 let store = Store::open_in_memory().unwrap();
1916 let a = make_record("a", "a1", "我的偏好是 vim", Kind::Preference);
1917 let b = make_record("a", "b1", "项目里有很多代码", Kind::Fact);
1918 let ca = Chunker::default().chunk(&a.id, &a.content);
1919 let cb = Chunker::default().chunk(&b.id, &b.content);
1920 store.upsert_record(&a, &ca, None).unwrap();
1921 store.upsert_record(&b, &cb, None).unwrap();
1922
1923 let hits_pref = store
1924 .search_chunks_fts("偏好", &SearchFilter::default(), 5)
1925 .unwrap();
1926 assert_eq!(hits_pref.len(), 1);
1927 assert_eq!(hits_pref[0].record_id, a.id);
1928
1929 let hits_proj = store
1930 .search_chunks_fts("项目", &SearchFilter::default(), 5)
1931 .unwrap();
1932 assert_eq!(hits_proj.len(), 1);
1933 assert_eq!(hits_proj[0].record_id, b.id);
1934 }
1935
1936 #[test]
1937 fn empty_or_punctuation_only_query_returns_no_hits() {
1938 let store = Store::open_in_memory().unwrap();
1939 let r = make_record("a", "x", "alpha beta gamma", Kind::Fact);
1940 let c = Chunker::default().chunk(&r.id, &r.content);
1941 store.upsert_record(&r, &c, None).unwrap();
1942
1943 let empty = store
1945 .search_chunks_fts("", &SearchFilter::default(), 5)
1946 .unwrap();
1947 assert!(empty.is_empty());
1948 let punct = store
1949 .search_chunks_fts("!!! ???", &SearchFilter::default(), 5)
1950 .unwrap();
1951 assert!(punct.is_empty());
1952 }
1953
1954 #[test]
1955 fn cjk_reindex_picks_up_existing_chunks() {
1956 let store = Store::open_in_memory().unwrap();
1965 let r = make_record("a", "x", "重新索引 测试", Kind::Fact);
1966 let c = Chunker::default().chunk(&r.id, &r.content);
1967 store.upsert_record(&r, &c, None).unwrap();
1968 let conn = store.conn.lock();
1969 let chunks_n: i64 = conn
1970 .query_row("SELECT COUNT(*) FROM record_chunks", [], |r| r.get(0))
1971 .unwrap();
1972 let fts_n: i64 = conn
1973 .query_row("SELECT COUNT(*) FROM chunks_fts", [], |r| r.get(0))
1974 .unwrap();
1975 assert_eq!(chunks_n, fts_n, "every chunk has an FTS row");
1976 assert!(chunks_n > 0);
1977 }
1978
1979 #[test]
1994 fn filter_pushdown_returns_minority_source_under_majority_dominance() {
1995 let store = Store::open_in_memory().unwrap();
1996 for i in 0..1744u32 {
1998 let r = make_record(
1999 "claude-code",
2000 &format!("cc-{i:04}"),
2001 "sharedterm claude noise",
2002 Kind::Episode,
2003 );
2004 let c = Chunker::default().chunk(&r.id, &r.content);
2005 store.upsert_record(&r, &c, None).unwrap();
2006 }
2007 for i in 0..7u32 {
2009 let r = make_record(
2010 "mem0",
2011 &format!("m0-{i}"),
2012 "sharedterm mem0 fact",
2013 Kind::Fact,
2014 );
2015 let c = Chunker::default().chunk(&r.id, &r.content);
2016 store.upsert_record(&r, &c, None).unwrap();
2017 }
2018
2019 let none = store
2021 .search_chunks_fts("sharedterm", &SearchFilter::default(), 50)
2022 .unwrap();
2023 assert_eq!(none.len(), 50, "unfiltered hits fill the pool");
2024 let mem0_in_unfiltered = none
2025 .iter()
2026 .filter(|h| h.content.contains("mem0 fact"))
2027 .count();
2028 assert!(
2029 mem0_in_unfiltered <= 7,
2030 "without pushdown, the 7 mem0 records are squeezed by the 1744 claude-code majority"
2031 );
2032
2033 let filter = SearchFilter {
2036 source: Some("mem0".into()),
2037 ..SearchFilter::default()
2038 };
2039 let mem0_hits = store.search_chunks_fts("sharedterm", &filter, 50).unwrap();
2040 assert!(
2041 !mem0_hits.is_empty(),
2042 "source=mem0 must return non-empty results from the minority adapter"
2043 );
2044 assert_eq!(
2045 mem0_hits.len(),
2046 7,
2047 "filter pushdown must surface all 7 mem0 chunks, not zero"
2048 );
2049 for h in &mem0_hits {
2050 assert!(
2051 h.content.contains("mem0 fact"),
2052 "every hit must come from the mem0 adapter, not the claude-code majority"
2053 );
2054 assert!(
2055 !h.content.contains("claude noise"),
2056 "no claude-code chunk should leak through the SQL filter"
2057 );
2058 }
2059 }
2060
2061 #[test]
2062 fn filter_pushdown_supports_kind_and_scope_independently() {
2063 let store = Store::open_in_memory().unwrap();
2064 for (na, content, kind) in &[
2065 ("a", "shared topic alpha", Kind::Fact),
2066 ("b", "shared topic beta", Kind::Preference),
2067 ("c", "shared topic gamma", Kind::Feedback),
2068 ] {
2069 let r = make_record("claude-code", na, content, *kind);
2070 let c = Chunker::default().chunk(&r.id, &r.content);
2071 store.upsert_record(&r, &c, None).unwrap();
2072 }
2073 let kind_filter = SearchFilter {
2074 kind: Some("preference".into()),
2075 ..SearchFilter::default()
2076 };
2077 let hits = store
2078 .search_chunks_fts("shared topic", &kind_filter, 10)
2079 .unwrap();
2080 assert_eq!(hits.len(), 1);
2081 assert!(hits[0].content.contains("beta"));
2082 }
2083
2084 #[test]
2085 fn filter_pushdown_respects_time_range() {
2086 let store = Store::open_in_memory().unwrap();
2087 for (na, content, ts) in &[
2089 ("old", "shared topic", 1700000000_i64), ("mid", "shared topic", 1750000000_i64), ("new", "shared topic", 1800000000_i64), ] {
2093 let mut r = make_record("claude-code", na, content, Kind::Episode);
2094 r.created_at = Utc.timestamp_opt(*ts, 0).unwrap();
2095 let c = Chunker::default().chunk(&r.id, &r.content);
2096 store.upsert_record(&r, &c, None).unwrap();
2097 }
2098 let filter = SearchFilter {
2099 time_from: Some(1720000000),
2100 time_to: Some(1780000000),
2101 ..SearchFilter::default()
2102 };
2103 let hits = store
2104 .search_chunks_fts("shared topic", &filter, 10)
2105 .unwrap();
2106 assert_eq!(hits.len(), 1, "only the mid record falls in the window");
2107 }
2108
2109 #[test]
2110 fn active_model_setter_reads_back() {
2111 let store = Store::open_in_memory().unwrap();
2112 assert_eq!(store.active_model().unwrap(), None);
2113 store.set_active_model("local:e5:1").unwrap();
2114 assert_eq!(store.active_model().unwrap().as_deref(), Some("local:e5:1"));
2115 store.set_active_model("local:bge-m3:1").unwrap();
2116 assert_eq!(
2117 store.active_model().unwrap().as_deref(),
2118 Some("local:bge-m3:1")
2119 );
2120 }
2121
2122 #[test]
2123 fn upsert_enqueues_jobs_under_active_model() {
2124 let store = Store::open_in_memory().unwrap();
2125 store.set_active_model("local:e5:1").unwrap();
2126 let r = make_record("a", "x", "hello world", Kind::Fact);
2127 let c = Chunker::default().chunk(&r.id, &r.content);
2128 store.upsert_record(&r, &c, None).unwrap();
2129 let n: i64 = store
2130 .conn()
2131 .query_row(
2132 "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'pending' AND model_id = 'local:e5:1'",
2133 [],
2134 |row| row.get(0),
2135 )
2136 .unwrap();
2137 assert_eq!(n, c.len() as i64);
2138 }
2139
2140 #[test]
2141 fn no_active_model_means_no_jobs() {
2142 let store = Store::open_in_memory().unwrap();
2143 let r = make_record("a", "x", "hi", Kind::Fact);
2144 let c = Chunker::default().chunk(&r.id, &r.content);
2145 store.upsert_record(&r, &c, None).unwrap();
2146 let n: i64 = store
2147 .conn()
2148 .query_row("SELECT COUNT(1) FROM embedding_jobs", [], |row| row.get(0))
2149 .unwrap();
2150 assert_eq!(n, 0);
2151 }
2152
2153 #[test]
2154 fn claim_and_complete_job_cycle() {
2155 let store = Store::open_in_memory().unwrap();
2156 store.set_active_model("local:fake:1").unwrap();
2157 let r = make_record("a", "x", "alpha", Kind::Fact);
2158 let c = Chunker::default().chunk(&r.id, &r.content);
2159 store.upsert_record(&r, &c, None).unwrap();
2160
2161 let job = store.claim_next_job("local:fake:1").unwrap().unwrap();
2162 assert_eq!(job.content, "alpha");
2163 assert_eq!(job.model_id, "local:fake:1");
2164
2165 let none = store.claim_next_job("local:fake:1").unwrap();
2167 assert!(none.is_none());
2168
2169 store.complete_job(&job, &[0.5, 0.5, 0.5, 0.5]).unwrap();
2170
2171 let pending: i64 = store
2172 .conn()
2173 .query_row(
2174 "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'pending'",
2175 [],
2176 |r| r.get(0),
2177 )
2178 .unwrap();
2179 assert_eq!(pending, 0);
2180
2181 let hits = store
2183 .search_chunks_vec(
2184 &[0.5, 0.5, 0.5, 0.5],
2185 "local:fake:1",
2186 &SearchFilter::default(),
2187 5,
2188 )
2189 .unwrap();
2190 assert_eq!(hits.len(), 1);
2191 assert!((hits[0].score - 1.0).abs() < 1e-9);
2192 }
2193
2194 #[test]
2195 fn fail_job_marks_failed_and_unblocks_next() {
2196 let store = Store::open_in_memory().unwrap();
2197 store.set_active_model("local:fake:1").unwrap();
2198 let r1 = make_record("a", "x", "one", Kind::Fact);
2199 let r2 = make_record("a", "y", "two", Kind::Fact);
2200 let c1 = Chunker::default().chunk(&r1.id, &r1.content);
2201 let c2 = Chunker::default().chunk(&r2.id, &r2.content);
2202 store.upsert_record(&r1, &c1, None).unwrap();
2203 store.upsert_record(&r2, &c2, None).unwrap();
2204
2205 let j1 = store.claim_next_job("local:fake:1").unwrap().unwrap();
2206 store.fail_job(j1.job_id, "boom").unwrap();
2207
2208 let j2 = store.claim_next_job("local:fake:1").unwrap().unwrap();
2209 assert_ne!(j2.chunk_id, j1.chunk_id);
2210 }
2211
2212 #[test]
2213 fn rebuild_jobs_targets_a_new_model() {
2214 let store = Store::open_in_memory().unwrap();
2215 store.set_active_model("local:a:1").unwrap();
2216 let r = make_record("a", "x", "hi", Kind::Fact);
2217 let c = Chunker::default().chunk(&r.id, &r.content);
2218 store.upsert_record(&r, &c, None).unwrap();
2219
2220 let n = store.rebuild_embedding_jobs("local:b:1").unwrap();
2221 assert_eq!(n, c.len() as u64);
2222
2223 let by_model: Vec<(String, i64)> = {
2224 let conn = store.conn();
2225 let mut stmt = conn
2226 .prepare(
2227 "SELECT model_id, COUNT(1) FROM embedding_jobs GROUP BY model_id ORDER BY model_id",
2228 )
2229 .unwrap();
2230 stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))
2231 .unwrap()
2232 .collect::<rusqlite::Result<_>>()
2233 .unwrap()
2234 };
2235 assert_eq!(
2236 by_model,
2237 vec![
2238 ("local:a:1".into(), c.len() as i64),
2239 ("local:b:1".into(), c.len() as i64),
2240 ]
2241 );
2242 }
2243
2244 #[test]
2245 fn stats_reports_counts() {
2246 let store = Store::open_in_memory().unwrap();
2247 store.set_active_model("local:fake:1").unwrap();
2248 store
2249 .register_source("claude-code", None, None, None)
2250 .unwrap();
2251 let r = make_record("a", "x", "hello", Kind::Fact);
2252 let c = Chunker::default().chunk(&r.id, &r.content);
2253 store.upsert_record(&r, &c, None).unwrap();
2254 let s = store.stats().unwrap();
2255 assert_eq!(s.records, 1);
2256 assert_eq!(s.chunks, c.len() as u64);
2257 assert_eq!(s.jobs_pending, c.len() as u64);
2258 assert_eq!(s.jobs_failed, 0);
2259 assert_eq!(s.sources, 1);
2260 }
2261
2262 #[test]
2263 fn import_error_logged_and_visible() {
2264 let store = Store::open_in_memory().unwrap();
2265 store
2266 .log_import_error("a", None, Some("nid"), Some("/p"), "parse", "bad json")
2267 .unwrap();
2268 let count: i64 = store
2269 .conn()
2270 .query_row("SELECT COUNT(1) FROM import_errors", [], |r| r.get(0))
2271 .unwrap();
2272 assert_eq!(count, 1);
2273 }
2274
2275 #[test]
2276 fn source_vector_is_persisted_to_raw_artifacts() {
2277 let store = Store::open_in_memory().unwrap();
2278 let mut r = make_record("mem0", "x", "hi", Kind::Fact);
2279 r.embedding = Some(Embedding {
2280 vector: vec![0.1, 0.2, 0.3],
2281 model: "openai:text-embedding-3-small".into(),
2282 dim: 3,
2283 });
2284 let c = Chunker::default().chunk(&r.id, &r.content);
2285 store.upsert_record(&r, &c, None).unwrap();
2286 let (blob, model, dim): (Vec<u8>, String, i64) = store
2287 .conn()
2288 .query_row(
2289 "SELECT source_embedding, source_embedding_model, source_embedding_dim \
2290 FROM raw_artifacts WHERE record_id = ?1",
2291 params![r.id.0],
2292 |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2293 )
2294 .unwrap();
2295 assert_eq!(model, "openai:text-embedding-3-small");
2296 assert_eq!(dim, 3);
2297 assert_eq!(blob_to_f32(&blob).unwrap(), vec![0.1, 0.2, 0.3]);
2298 }
2299
2300 fn seed_n_records(store: &Store, n: usize) {
2305 for i in 0..n {
2306 let r = make_record(
2307 "claude-code",
2308 &format!("seed-{i:04}"),
2309 &format!("content {i}"),
2310 Kind::Fact,
2311 );
2312 let c = Chunker::default().chunk(&r.id, &r.content);
2313 store.upsert_record(&r, &c, None).unwrap();
2314 }
2315 }
2316
2317 #[test]
2318 fn paged_listing_walks_through_full_catalogue_via_cursor() {
2319 let store = Store::open_in_memory().unwrap();
2320 seed_n_records(&store, 25);
2321
2322 let mut collected: Vec<String> = Vec::new();
2323 let mut cursor: Option<String> = None;
2324 for _ in 0..100 {
2325 let (page, next) = store.list_record_ids_paged(cursor.as_deref(), 10).unwrap();
2327 assert!(!page.is_empty(), "non-final page must have rows");
2329 collected.extend(page);
2330 if next.is_none() {
2331 break;
2332 }
2333 cursor = next;
2334 }
2335 assert_eq!(
2336 collected.len(),
2337 25,
2338 "pagination must yield every record exactly once"
2339 );
2340
2341 let mut sorted = collected.clone();
2343 sorted.sort();
2344 assert_eq!(collected, sorted);
2345
2346 let unique: std::collections::HashSet<&String> = collected.iter().collect();
2348 assert_eq!(unique.len(), collected.len());
2349 }
2350
2351 #[test]
2352 fn paged_listing_signals_end_with_none_cursor() {
2353 let store = Store::open_in_memory().unwrap();
2356 seed_n_records(&store, 3);
2357 let (page, next) = store.list_record_ids_paged(None, 10).unwrap();
2358 assert_eq!(page.len(), 3);
2359 assert!(next.is_none(), "page < limit must clear nextCursor");
2360 }
2361
2362 #[test]
2363 fn paged_listing_clamps_limit() {
2364 let store = Store::open_in_memory().unwrap();
2368 seed_n_records(&store, 5);
2369 let (page, _) = store.list_record_ids_paged(None, 0).unwrap();
2370 assert_eq!(page.len(), 1, "limit=0 must clamp to 1");
2371 let (page, _) = store.list_record_ids_paged(None, u32::MAX).unwrap();
2372 assert!(page.len() <= MAX_LIST_LIMIT as usize);
2373 assert_eq!(page.len(), 5);
2374 }
2375
2376 #[test]
2377 fn derived_from_roundtrips_through_store() {
2378 let store = Store::open_in_memory().unwrap();
2382 let parent = make_record("claude-code", "ep-1", "raw conversation", Kind::Episode);
2383 let parent_id = parent.id.clone();
2384 let chunks = Chunker::default().chunk(&parent.id, &parent.content);
2385 store.upsert_record(&parent, &chunks, None).unwrap();
2386
2387 let mut derived = make_record("extractor", "fact-1", "user lives in Paris", Kind::Fact);
2388 derived.provenance.derived_from = Some(parent_id.clone());
2389 let derived_chunks = Chunker::default().chunk(&derived.id, &derived.content);
2390 let derived_id = derived.id.clone();
2391 store
2392 .upsert_record(&derived, &derived_chunks, None)
2393 .unwrap();
2394
2395 let got_parent = store.get_record(&parent_id).unwrap().unwrap();
2396 assert!(
2397 got_parent.provenance.derived_from.is_none(),
2398 "non-derived records keep derived_from = None on the way back"
2399 );
2400
2401 let got_derived = store.get_record(&derived_id).unwrap().unwrap();
2402 assert_eq!(
2403 got_derived.provenance.derived_from.as_ref().map(|r| &r.0),
2404 Some(&parent_id.0),
2405 "derived record's lineage must point at the source Episode after round-trip"
2406 );
2407 }
2408
2409 #[test]
2410 fn list_derivations_returns_only_direct_children() {
2411 let store = Store::open_in_memory().unwrap();
2412 let parent = make_record("claude-code", "ep-1", "raw conversation", Kind::Episode);
2413 let pid = parent.id.clone();
2414 let pc = Chunker::default().chunk(&parent.id, &parent.content);
2415 store.upsert_record(&parent, &pc, None).unwrap();
2416
2417 let mut child_a = make_record("extractor", "fact-a", "user lives in Paris", Kind::Fact);
2418 child_a.provenance.derived_from = Some(pid.clone());
2419 let c_a = Chunker::default().chunk(&child_a.id, &child_a.content);
2420 store.upsert_record(&child_a, &c_a, None).unwrap();
2421
2422 let mut child_b = make_record("extractor", "pref-a", "prefers Rust", Kind::Preference);
2423 child_b.provenance.derived_from = Some(pid.clone());
2424 let c_b = Chunker::default().chunk(&child_b.id, &child_b.content);
2425 store.upsert_record(&child_b, &c_b, None).unwrap();
2426
2427 let unrelated = make_record("claude-code", "ep-2", "different episode", Kind::Episode);
2429 let cu = Chunker::default().chunk(&unrelated.id, &unrelated.content);
2430 store.upsert_record(&unrelated, &cu, None).unwrap();
2431
2432 let children = store.list_derivations(&pid, 50).unwrap();
2433 assert_eq!(children.len(), 2);
2434 let kinds: std::collections::HashSet<_> = children.iter().map(|r| r.kind).collect();
2435 assert!(kinds.contains(&Kind::Fact));
2436 assert!(kinds.contains(&Kind::Preference));
2437 }
2438
2439 #[test]
2440 fn lineage_chain_walks_to_root() {
2441 let store = Store::open_in_memory().unwrap();
2442 let root = make_record("claude-code", "ep-1", "raw conv", Kind::Episode);
2444 let root_id = root.id.clone();
2445 let rc = Chunker::default().chunk(&root.id, &root.content);
2446 store.upsert_record(&root, &rc, None).unwrap();
2447
2448 let mut mid = make_record("extractor", "fact-a", "Paris is capital", Kind::Fact);
2449 mid.provenance.derived_from = Some(root_id.clone());
2450 let mid_id = mid.id.clone();
2451 let mc = Chunker::default().chunk(&mid.id, &mid.content);
2452 store.upsert_record(&mid, &mc, None).unwrap();
2453
2454 let mut leaf = make_record("extractor", "skill-a", "how to check capital", Kind::Skill);
2455 leaf.provenance.derived_from = Some(mid_id.clone());
2456 let leaf_id = leaf.id.clone();
2457 let lc = Chunker::default().chunk(&leaf.id, &leaf.content);
2458 store.upsert_record(&leaf, &lc, None).unwrap();
2459
2460 let chain = store.lineage_chain(&leaf_id).unwrap().unwrap();
2461 assert_eq!(chain.records.len(), 3);
2462 assert_eq!(chain.records[0].id.0, leaf_id.0);
2463 assert_eq!(chain.records[1].id.0, mid_id.0);
2464 assert_eq!(chain.records[2].id.0, root_id.0);
2465 assert!(chain.missing_parent.is_none());
2466 }
2467
2468 #[test]
2469 fn lineage_chain_missing_parent_is_signaled() {
2470 let store = Store::open_in_memory().unwrap();
2471 let phantom = RecordId("never-stored-record".into());
2472 let mut orphan = make_record("extractor", "orphan", "dangling fact", Kind::Fact);
2473 orphan.provenance.derived_from = Some(phantom.clone());
2474 let oid = orphan.id.clone();
2475 let oc = Chunker::default().chunk(&orphan.id, &orphan.content);
2476 store.upsert_record(&orphan, &oc, None).unwrap();
2477
2478 let chain = store.lineage_chain(&oid).unwrap().unwrap();
2479 assert_eq!(chain.records.len(), 1);
2480 assert_eq!(chain.records[0].id.0, oid.0);
2481 assert_eq!(chain.missing_parent.unwrap().0, phantom.0);
2482 }
2483
2484 #[test]
2485 fn lineage_chain_returns_none_for_unknown_start() {
2486 let store = Store::open_in_memory().unwrap();
2487 let chain = store
2488 .lineage_chain(&RecordId("does-not-exist".into()))
2489 .unwrap();
2490 assert!(chain.is_none());
2491 }
2492
2493 #[test]
2494 fn lineage_chain_detects_cycle_and_errors() {
2495 let store = Store::open_in_memory().unwrap();
2500 let a = make_record("extractor", "a", "node a", Kind::Fact);
2501 let b = make_record("extractor", "b", "node b", Kind::Fact);
2502 let aid = a.id.clone();
2503 let bid = b.id.clone();
2504 let ac = Chunker::default().chunk(&a.id, &a.content);
2505 let bc = Chunker::default().chunk(&b.id, &b.content);
2506 store.upsert_record(&a, &ac, None).unwrap();
2507 store.upsert_record(&b, &bc, None).unwrap();
2508 store
2510 .conn()
2511 .execute(
2512 "UPDATE records SET derived_from = ?1 WHERE id = ?2",
2513 params![bid.0, aid.0],
2514 )
2515 .unwrap();
2516 store
2517 .conn()
2518 .execute(
2519 "UPDATE records SET derived_from = ?1 WHERE id = ?2",
2520 params![aid.0, bid.0],
2521 )
2522 .unwrap();
2523 let err = store.lineage_chain(&aid).unwrap_err();
2524 match err {
2525 StoreError::Corruption(msg) => assert!(msg.contains("cycle")),
2526 other => panic!("expected Corruption, got {other:?}"),
2527 }
2528 }
2529
2530 #[test]
2531 fn derived_from_index_is_present_after_migration() {
2532 let store = Store::open_in_memory().unwrap();
2537 let count: i64 = store
2538 .conn()
2539 .query_row(
2540 "SELECT COUNT(*) FROM sqlite_master \
2541 WHERE type = 'index' AND name = 'idx_records_derived_from'",
2542 [],
2543 |r| r.get(0),
2544 )
2545 .unwrap();
2546 assert_eq!(
2547 count, 1,
2548 "derived_from index must exist after 0004 migration"
2549 );
2550 }
2551}