Skip to main content

anamnesis_store/
api.rs

1//! Typed read/write API over the SQLite store.
2//!
3//! Everything that touches the database goes through this module. `Store`
4//! itself owns the `Connection`; callers must never write SQL directly.
5
6use anamnesis_core::chunk::{Chunk, ContentHash};
7use anamnesis_core::model::{AnamnesisRecord, Kind, Provenance, RecordId, Scope, SourceDescriptor};
8use chrono::{DateTime, TimeZone, Utc};
9use rusqlite::{params, OptionalExtension, Transaction};
10
11use crate::{Result, Store, StoreError};
12
13// ─────────────────────────────────────────────────────────────────────────────
14// Conversion helpers
15// ─────────────────────────────────────────────────────────────────────────────
16
17fn ts(dt: DateTime<Utc>) -> i64 {
18    dt.timestamp()
19}
20
21fn dt(ts: i64) -> DateTime<Utc> {
22    Utc.timestamp_opt(ts, 0).single().unwrap_or_else(Utc::now)
23}
24
25fn scope_str(s: Scope) -> &'static str {
26    match s {
27        Scope::User => "user",
28        Scope::Project => "project",
29        Scope::Session => "session",
30        Scope::Ephemeral => "ephemeral",
31    }
32}
33
34fn scope_from(s: &str) -> Scope {
35    match s {
36        "user" => Scope::User,
37        "project" => Scope::Project,
38        "session" => Scope::Session,
39        "ephemeral" => Scope::Ephemeral,
40        _ => Scope::Ephemeral,
41    }
42}
43
44fn kind_str(k: Kind) -> &'static str {
45    match k {
46        Kind::Fact => "fact",
47        Kind::Preference => "preference",
48        Kind::Feedback => "feedback",
49        Kind::Reference => "reference",
50        Kind::Episode => "episode",
51        Kind::Skill => "skill",
52        Kind::Unknown => "unknown",
53    }
54}
55
56fn kind_from(s: &str) -> Kind {
57    match s {
58        "fact" => Kind::Fact,
59        "preference" => Kind::Preference,
60        "feedback" => Kind::Feedback,
61        "reference" => Kind::Reference,
62        "episode" => Kind::Episode,
63        "skill" => Kind::Skill,
64        _ => Kind::Unknown,
65    }
66}
67
68/// Serialize a `Vec<f32>` to little-endian bytes.
69pub fn f32_to_blob(v: &[f32]) -> Vec<u8> {
70    let mut out = Vec::with_capacity(v.len() * 4);
71    for x in v {
72        out.extend_from_slice(&x.to_le_bytes());
73    }
74    out
75}
76
77/// Parse a little-endian f32 blob back into a vector.
78pub fn blob_to_f32(b: &[u8]) -> Result<Vec<f32>> {
79    if b.len() % 4 != 0 {
80        return Err(StoreError::Sqlite(rusqlite::Error::InvalidQuery));
81    }
82    Ok(b.chunks_exact(4)
83        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
84        .collect())
85}
86
87fn cosine(a: &[f32], b: &[f32]) -> f64 {
88    if a.len() != b.len() || a.is_empty() {
89        return 0.0;
90    }
91    let mut dot = 0f64;
92    let mut na = 0f64;
93    let mut nb = 0f64;
94    for i in 0..a.len() {
95        let x = a[i] as f64;
96        let y = b[i] as f64;
97        dot += x * y;
98        na += x * x;
99        nb += y * y;
100    }
101    if na == 0.0 || nb == 0.0 {
102        return 0.0;
103    }
104    dot / (na.sqrt() * nb.sqrt())
105}
106
107// ─────────────────────────────────────────────────────────────────────────────
108// Returned shapes
109// ─────────────────────────────────────────────────────────────────────────────
110
111/// A chunk-level search hit.
112#[derive(Debug, Clone, PartialEq)]
113pub struct ChunkHit {
114    /// Synthetic chunk id (`"{record_id}:{seq}"`).
115    pub chunk_id: String,
116    /// Parent record.
117    pub record_id: RecordId,
118    /// Per-record sequence.
119    pub seq: u32,
120    /// The matched chunk content.
121    pub content: String,
122    /// Score in the search-specific scale (FTS: bm25 rank, vector: cosine).
123    pub score: f64,
124}
125
126/// Filter pushed into the SQL candidate-retrieval stage of `search_chunks_*`.
127///
128/// **All fields go into the SQL `WHERE` clause before `LIMIT` is applied**,
129/// so they shape the candidate pool itself — never just trim a pre-built
130/// majority pool after the fact. This is the load-bearing fix from
131/// BLUEPRINT §17.5 PR-C: with thousands of records from one adapter and a
132/// handful from another, post-filter shrinkage can leave the minority
133/// adapter's results empty even when they're the best match.
134///
135/// Empty filter (all fields `None`) is a no-op — the original
136/// `WHERE chunks_fts MATCH ?` / `WHERE e.model_id = ?` is preserved.
137#[derive(Debug, Clone, Default, PartialEq, Eq)]
138pub struct SearchFilter {
139    /// Adapter id (e.g. `"claude-code"`, `"mem0"`). When set, only chunks
140    /// belonging to records from this adapter survive.
141    pub source: Option<String>,
142    /// Instance discriminator. Only meaningful when `source` is also set
143    /// (the SQL key is `(adapter, instance)`).
144    pub instance: Option<String>,
145    /// `Kind` string: `"fact"` / `"preference"` / `"feedback"` / `"reference"`
146    /// / `"episode"` / `"skill"` / `"unknown"`.
147    pub kind: Option<String>,
148    /// `Scope` string: `"user"` / `"project"` / `"session"` / `"ephemeral"`.
149    pub scope: Option<String>,
150    /// Inclusive lower bound on `records.created_at` (unix epoch seconds).
151    pub time_from: Option<i64>,
152    /// Inclusive upper bound on `records.created_at` (unix epoch seconds).
153    pub time_to: Option<i64>,
154}
155
156impl SearchFilter {
157    /// True when every field is `None` — caller can skip the JOIN.
158    pub fn is_empty(&self) -> bool {
159        self.source.is_none()
160            && self.instance.is_none()
161            && self.kind.is_none()
162            && self.scope.is_none()
163            && self.time_from.is_none()
164            && self.time_to.is_none()
165    }
166}
167
168/// A claimed embedding job.
169#[derive(Debug, Clone, PartialEq)]
170pub struct PendingEmbeddingJob {
171    /// Surrogate primary key from `embedding_jobs.id`.
172    pub job_id: i64,
173    /// Chunk to embed.
174    pub chunk_id: String,
175    /// `blake3` of the chunk content.
176    pub content_hash: ContentHash,
177    /// The model the embedding must be produced under.
178    pub model_id: String,
179    /// The chunk's text — included so the worker doesn't need a second query.
180    pub content: String,
181}
182
183/// Coarse counters for `anamnesis status`.
184#[derive(Debug, Clone, PartialEq, Eq)]
185pub struct StoreStats {
186    /// Records in `records` table.
187    pub records: u64,
188    /// Chunks in `record_chunks`.
189    pub chunks: u64,
190    /// Pending or in-progress embedding jobs.
191    pub jobs_pending: u64,
192    /// Failed embedding jobs (terminal state).
193    pub jobs_failed: u64,
194    /// Distinct `(adapter, instance)` source rows.
195    pub sources: u64,
196}
197
198/// Full row from `sources` — what `list_sources_full` and `get_source`
199/// return. The legacy `list_sources` 3-tuple shape stays for back-compat.
200///
201/// `instance` is the empty string `""` (NOT `None`) to represent the
202/// default instance — that's the canonical key the table uses (see
203/// `0002_phase1.sql`). Callers that work in `Option<String>` must convert
204/// at the boundary.
205#[derive(Debug, Clone, PartialEq, Eq)]
206pub struct SourceRow {
207    /// Adapter id (e.g. `"claude-code"`).
208    pub adapter: String,
209    /// Instance discriminator — `""` for the default instance.
210    pub instance: String,
211    /// User-registered location (path / URL / connection string). `None`
212    /// when registered without one — `import` will fall back to the
213    /// adapter default and register that as the canonical location.
214    pub location: Option<String>,
215    /// JSON-encoded adapter-specific config, opaque to the store.
216    pub config_json: Option<String>,
217    /// Unix epoch seconds — when the source was first registered.
218    pub added_at: i64,
219    /// Unix epoch seconds — when the last successful (non-dry-run)
220    /// import finished. `None` until the first import lands.
221    pub last_import_at: Option<i64>,
222}
223
224/// Source row joined with its current per-source counts. Returned by
225/// `list_sources_with_counts`; consumed by MCP `list_sources` and CLI
226/// `source list` so agents and operators see how much data is behind
227/// each registered source.
228#[derive(Debug, Clone, PartialEq, Eq)]
229pub struct SourceWithCounts {
230    /// The source registry row itself.
231    pub source: SourceRow,
232    /// Number of distinct records currently in the store for this
233    /// `(adapter, instance)` pair. `0` for a registered-but-never-imported
234    /// source — that's a useful staleness signal, not a defect.
235    pub record_count: u64,
236    /// Number of chunks across all records for this source.
237    pub chunk_count: u64,
238}
239
240/// Maximum `limit` accepted by `list_record_ids_paged` and the MCP
241/// `resources/list` handler. Sized so a single page fits comfortably
242/// in a JSON-RPC response (~ a few hundred KB at most). Round-21
243/// (§-1.5 PR-2).
244pub const MAX_LIST_LIMIT: u32 = 1000;
245
246// ─────────────────────────────────────────────────────────────────────────────
247// Source registry
248// ─────────────────────────────────────────────────────────────────────────────
249
250impl Store {
251    /// Register or update a memory source. Idempotent.
252    ///
253    /// `instance = None` is stored as the empty string `""` because the
254    /// `sources` table uses NOT NULL DEFAULT '' on that column; matching
255    /// against `NULL` would silently miss the row.
256    pub fn register_source(
257        &self,
258        adapter: &str,
259        instance: Option<&str>,
260        location: Option<&str>,
261        config_json: Option<&str>,
262    ) -> Result<()> {
263        let inst = instance.unwrap_or("");
264        self.conn.lock().execute(
265            "INSERT INTO sources(adapter, instance, location, config_json, added_at) \
266             VALUES(?1, ?2, ?3, ?4, strftime('%s','now')) \
267             ON CONFLICT(adapter, instance) DO UPDATE SET \
268               location = excluded.location, \
269               config_json = excluded.config_json",
270            params![adapter, inst, location, config_json],
271        )?;
272        Ok(())
273    }
274
275    /// Look up a single source row by `(adapter, instance)`.
276    ///
277    /// Returns `None` if no row exists. `instance = None` is normalised to
278    /// the empty string for the lookup (see `register_source` rationale).
279    pub fn get_source(&self, adapter: &str, instance: Option<&str>) -> Result<Option<SourceRow>> {
280        let inst = instance.unwrap_or("");
281        let conn = self.conn.lock();
282        let row = conn
283            .query_row(
284                "SELECT adapter, instance, location, config_json, added_at, last_import_at \
285                 FROM sources WHERE adapter = ?1 AND instance = ?2",
286                params![adapter, inst],
287                |r| {
288                    Ok(SourceRow {
289                        adapter: r.get(0)?,
290                        instance: r.get(1)?,
291                        location: r.get(2)?,
292                        config_json: r.get(3)?,
293                        added_at: r.get(4)?,
294                        last_import_at: r.get(5)?,
295                    })
296                },
297            )
298            .optional()?;
299        Ok(row)
300    }
301
302    /// Stamp `last_import_at` for a source.
303    ///
304    /// Returns `Ok(true)` when the source existed and was updated, `Ok(false)`
305    /// when no matching row exists (the caller should usually
306    /// `register_source` first so this can never be `false` on the happy
307    /// path).
308    pub fn update_last_import_at(&self, adapter: &str, instance: Option<&str>) -> Result<bool> {
309        let inst = instance.unwrap_or("");
310        let n = self.conn.lock().execute(
311            "UPDATE sources SET last_import_at = strftime('%s','now') \
312             WHERE adapter = ?1 AND instance = ?2",
313            params![adapter, inst],
314        )?;
315        Ok(n > 0)
316    }
317
318    /// Like `list_sources` but returns the full row shape including
319    /// `added_at` and `last_import_at`. Newer code should prefer this; the
320    /// 3-tuple `list_sources` stays for back-compat with existing callers.
321    pub fn list_sources_full(&self) -> Result<Vec<SourceRow>> {
322        let conn = self.conn.lock();
323        let mut stmt = conn.prepare(
324            "SELECT adapter, instance, location, config_json, added_at, last_import_at \
325             FROM sources ORDER BY adapter, instance",
326        )?;
327        let rows = stmt
328            .query_map([], |r| {
329                Ok(SourceRow {
330                    adapter: r.get(0)?,
331                    instance: r.get(1)?,
332                    location: r.get(2)?,
333                    config_json: r.get(3)?,
334                    added_at: r.get(4)?,
335                    last_import_at: r.get(5)?,
336                })
337            })?
338            .collect::<rusqlite::Result<Vec<_>>>()?;
339        Ok(rows)
340    }
341
342    /// Like `list_sources_full` but also carries per-source record /
343    /// chunk counts so MCP consumers can answer "is this source stale?"
344    /// and "how much data lives behind it?" without a second round
345    /// trip.
346    ///
347    /// Counts are computed via `LEFT JOIN`, so a source that's been
348    /// registered but has never produced records still appears with
349    /// counts of zero — which is exactly the signal an agent needs to
350    /// detect a configured-but-broken adapter.
351    ///
352    /// Aggregation is grouped on `(adapter, instance)` because the
353    /// canonical key in the `sources` table uses `instance=''` for the
354    /// default instance. Grouping on `adapter` alone would silently
355    /// merge multiple instances of the same adapter into one row.
356    pub fn list_sources_with_counts(&self) -> Result<Vec<SourceWithCounts>> {
357        let conn = self.conn.lock();
358        let mut stmt = conn.prepare(
359            "SELECT s.adapter, s.instance, s.location, s.config_json, \
360                    s.added_at, s.last_import_at, \
361                    COUNT(DISTINCT r.id) AS record_count, \
362                    COUNT(rc.id)         AS chunk_count \
363             FROM sources s \
364             LEFT JOIN records r \
365                    ON r.adapter = s.adapter AND r.instance = s.instance \
366             LEFT JOIN record_chunks rc \
367                    ON rc.record_id = r.id \
368             GROUP BY s.adapter, s.instance \
369             ORDER BY s.adapter, s.instance",
370        )?;
371        let rows = stmt
372            .query_map([], |r| {
373                Ok(SourceWithCounts {
374                    source: SourceRow {
375                        adapter: r.get(0)?,
376                        instance: r.get(1)?,
377                        location: r.get(2)?,
378                        config_json: r.get(3)?,
379                        added_at: r.get(4)?,
380                        last_import_at: r.get(5)?,
381                    },
382                    record_count: r.get::<_, i64>(6)? as u64,
383                    chunk_count: r.get::<_, i64>(7)? as u64,
384                })
385            })?
386            .collect::<rusqlite::Result<Vec<_>>>()?;
387        Ok(rows)
388    }
389
390    /// Forget a source. Does NOT cascade-delete records (those keep their
391    /// own provenance and can be inspected even after the source is gone).
392    pub fn deregister_source(&self, adapter: &str, instance: Option<&str>) -> Result<()> {
393        let inst = instance.unwrap_or("");
394        self.conn.lock().execute(
395            "DELETE FROM sources WHERE adapter = ?1 AND instance = ?2",
396            params![adapter, inst],
397        )?;
398        Ok(())
399    }
400
401    /// List configured sources as `(adapter, instance, location)` triples.
402    pub fn list_sources(&self) -> Result<Vec<(String, String, Option<String>)>> {
403        let conn = self.conn.lock();
404        let mut stmt = conn.prepare(
405            "SELECT adapter, instance, location FROM sources ORDER BY adapter, instance",
406        )?;
407        let rows = stmt
408            .query_map([], |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)))?
409            .collect::<rusqlite::Result<Vec<_>>>()?;
410        Ok(rows)
411    }
412}
413
414// ─────────────────────────────────────────────────────────────────────────────
415// Active embedding model (single-writer config knob)
416// ─────────────────────────────────────────────────────────────────────────────
417
418impl Store {
419    /// Set the active embedding model. New chunks will enqueue jobs against
420    /// this model. Switching models does NOT retroactively rebuild
421    /// embeddings; callers (the CLI `model use` command) decide whether to
422    /// also call `rebuild_embedding_jobs`.
423    pub fn set_active_model(&self, model_id: &str) -> Result<()> {
424        self.conn.lock().execute(
425            "INSERT INTO meta(key, value) VALUES('active_embedding_model', ?1) \
426             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
427            params![model_id],
428        )?;
429        Ok(())
430    }
431
432    /// Returns the active model id, if any.
433    pub fn active_model(&self) -> Result<Option<String>> {
434        let v: Option<String> = self
435            .conn
436            .lock()
437            .query_row(
438                "SELECT value FROM meta WHERE key = 'active_embedding_model'",
439                [],
440                |r| r.get(0),
441            )
442            .optional()?;
443        Ok(v)
444    }
445}
446
447// ─────────────────────────────────────────────────────────────────────────────
448// Records + chunks (atomic upsert)
449// ─────────────────────────────────────────────────────────────────────────────
450
451impl Store {
452    /// Atomically upsert a record, its chunks, and (optionally) its raw
453    /// artifact. Old chunks for this record are deleted first so re-chunking
454    /// is consistent. Embedding jobs are enqueued for every chunk against
455    /// the current active model (if any); duplicates are no-ops.
456    ///
457    /// Returns `(records_added_or_updated, chunks_written)`. Both counts
458    /// are 1/N — meaningful for tests and import job summaries.
459    /// Returns `(records_written, chunks_written)`. Both are `0` when the
460    /// record already exists with an identical `raw_hash` (= the source
461    /// payload byte-for-byte unchanged), in which case **the call is a
462    /// total no-op**: no `records` rewrite, no `raw_artifacts` rewrite,
463    /// and crucially no `record_chunks` DELETE / INSERT — which is what
464    /// keeps the jieba `chunks_ai` / `chunks_ad` triggers from firing
465    /// 99,716 times on a re-import (see `docs/verification/round-6-
466    /// embedding-dogfood.md` Finding 2 for the regression this fixes).
467    ///
468    /// The fast-path check happens **before** any DELETE so the AFTER
469    /// DELETE trigger never runs on unchanged content. Putting the check
470    /// after the DELETE would wipe the entire performance win — the
471    /// tokenize_cjk(old.content) call inside `chunks_ad` is the
472    /// expensive piece, not the INSERT.
473    ///
474    /// raw_hash is a pure function of the source payload (see each
475    /// adapter's `normalize_*` for the blake3 input), so equal raw_hash
476    /// guarantees the normalized record and its chunks are identical
477    /// to what's in the store. Tags / metadata / scope / kind cannot
478    /// drift independently of the source payload because every
479    /// normalizer derives them deterministically from the same source
480    /// bytes that produce raw_hash.
481    pub fn upsert_record(
482        &self,
483        record: &AnamnesisRecord,
484        chunks: &[Chunk],
485        raw_payload_json: Option<&str>,
486    ) -> Result<(u64, u64)> {
487        let active = self.active_model()?;
488        let mut conn = self.conn.lock();
489        let tx = conn.transaction()?;
490
491        // Fast-path. The check must run before write_record / write_chunks
492        // so neither the records UPSERT nor the chunks DELETE+INSERT fires
493        // when nothing has changed.
494        let existing_hash: Option<String> = tx
495            .query_row(
496                "SELECT raw_hash FROM records WHERE id = ?1",
497                params![record.id.0],
498                |r| r.get::<_, String>(0),
499            )
500            .optional()?;
501        if existing_hash.as_deref() == Some(record.provenance.raw_hash.as_str()) {
502            // Nothing to do — but we still want any pending embedding
503            // jobs to be enqueued under the active model if they aren't
504            // already (e.g. user switched models since last import).
505            // `enqueue_jobs` is ON CONFLICT DO NOTHING, so this is safe
506            // and cheap (no jieba calls, no chunk rewrite).
507            if let Some(model_id) = active.as_deref() {
508                let now = chrono::Utc::now().timestamp();
509                enqueue_jobs(&tx, chunks, model_id, now)?;
510            }
511            tx.commit()?;
512            return Ok((0, 0));
513        }
514
515        let now = chrono::Utc::now().timestamp();
516        write_record(&tx, record)?;
517        write_raw_artifact(&tx, record, raw_payload_json, now)?;
518        write_chunks(&tx, record, chunks)?;
519        if let Some(model_id) = active.as_deref() {
520            enqueue_jobs(&tx, chunks, model_id, now)?;
521        }
522        tx.commit()?;
523        Ok((1, chunks.len() as u64))
524    }
525
526    /// Re-enqueue embedding jobs for every chunk under a different model.
527    /// Used by `anamnesis model use <other>` to trigger a full re-embed.
528    pub fn rebuild_embedding_jobs(&self, model_id: &str) -> Result<u64> {
529        let now = chrono::Utc::now().timestamp();
530        let n = self.conn.lock().execute(
531            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
532             SELECT id, content_hash, ?1, 'pending', ?2 FROM record_chunks \
533             WHERE TRUE ON CONFLICT(chunk_id, model_id) DO NOTHING",
534            params![model_id, now],
535        )?;
536        Ok(n as u64)
537    }
538}
539
540fn write_record(tx: &Transaction<'_>, r: &AnamnesisRecord) -> Result<()> {
541    let tags = if r.tags.is_empty() {
542        None
543    } else {
544        Some(serde_json::to_string(&r.tags).unwrap_or_default())
545    };
546    let metadata = if r.metadata.is_empty() {
547        None
548    } else {
549        Some(serde_json::to_string(&r.metadata).unwrap_or_default())
550    };
551    tx.execute(
552        "INSERT INTO records(\
553            id, adapter, instance, content, scope, kind, \
554            created_at, updated_at, tags, metadata, \
555            native_id, native_path, captured_at, raw_hash, schema_version, \
556            derived_from\
557         ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16) \
558         ON CONFLICT(id) DO UPDATE SET \
559            content = excluded.content, \
560            scope = excluded.scope, \
561            kind = excluded.kind, \
562            updated_at = excluded.updated_at, \
563            tags = excluded.tags, \
564            metadata = excluded.metadata, \
565            native_path = excluded.native_path, \
566            raw_hash = excluded.raw_hash, \
567            derived_from = excluded.derived_from",
568        params![
569            r.id.0,
570            r.source.adapter,
571            r.source.instance.as_deref().unwrap_or(""),
572            r.content,
573            scope_str(r.scope),
574            kind_str(r.kind),
575            ts(r.created_at),
576            r.updated_at.map(ts),
577            tags,
578            metadata,
579            r.provenance.native_id,
580            r.provenance.native_path,
581            ts(r.provenance.captured_at),
582            r.provenance.raw_hash,
583            r.schema_version,
584            r.provenance.derived_from.as_ref().map(|rid| rid.0.clone()),
585        ],
586    )?;
587    Ok(())
588}
589
590fn write_raw_artifact(
591    tx: &Transaction<'_>,
592    r: &AnamnesisRecord,
593    payload_json: Option<&str>,
594    now: i64,
595) -> Result<()> {
596    // Source vectors are kept for provenance ONLY — never queried.
597    let (src_emb, src_model, src_dim) = match &r.embedding {
598        Some(e) => (
599            Some(f32_to_blob(&e.vector)),
600            Some(e.model.clone()),
601            Some(e.dim as i64),
602        ),
603        None => (None, None, None),
604    };
605    tx.execute(
606        "INSERT INTO raw_artifacts(record_id, payload_json, source_embedding, \
607            source_embedding_model, source_embedding_dim, captured_at) \
608         VALUES(?1, ?2, ?3, ?4, ?5, ?6) \
609         ON CONFLICT(record_id) DO UPDATE SET \
610            payload_json = excluded.payload_json, \
611            source_embedding = excluded.source_embedding, \
612            source_embedding_model = excluded.source_embedding_model, \
613            source_embedding_dim = excluded.source_embedding_dim, \
614            captured_at = excluded.captured_at",
615        params![
616            r.id.0,
617            payload_json,
618            src_emb.as_deref(),
619            src_model,
620            src_dim,
621            now,
622        ],
623    )?;
624    Ok(())
625}
626
627fn write_chunks(tx: &Transaction<'_>, r: &AnamnesisRecord, chunks: &[Chunk]) -> Result<()> {
628    // Re-chunking is a clean replace.
629    tx.execute(
630        "DELETE FROM record_chunks WHERE record_id = ?1",
631        params![r.id.0],
632    )?;
633    for c in chunks {
634        let cid = format!("{}:{}", c.record_id.0, c.seq);
635        tx.execute(
636            "INSERT INTO record_chunks(id, record_id, seq, content, content_hash, token_estimate) \
637             VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
638            params![
639                cid,
640                c.record_id.0,
641                c.seq,
642                c.content,
643                c.content_hash.0,
644                c.token_estimate
645            ],
646        )?;
647    }
648    Ok(())
649}
650
651fn enqueue_jobs(tx: &Transaction<'_>, chunks: &[Chunk], model_id: &str, now: i64) -> Result<()> {
652    for c in chunks {
653        let cid = format!("{}:{}", c.record_id.0, c.seq);
654        tx.execute(
655            "INSERT INTO embedding_jobs(chunk_id, content_hash, model_id, status, enqueued_at) \
656             VALUES(?1, ?2, ?3, 'pending', ?4) \
657             ON CONFLICT(chunk_id, model_id) DO NOTHING",
658            params![cid, c.content_hash.0, model_id, now],
659        )?;
660    }
661    Ok(())
662}
663
664// ─────────────────────────────────────────────────────────────────────────────
665// Record reads
666// ─────────────────────────────────────────────────────────────────────────────
667
668impl Store {
669    /// Return the most recently created record ids, newest first.
670    ///
671    /// Used by MCP `resources/list` to enumerate concrete record URIs
672    /// — generic-mcp loopback (Anamnesis → Anamnesis) needs real URIs
673    /// to consume, not just `anamnesis://record/{id}` templates that
674    /// the adapter (correctly) filters out.
675    ///
676    /// `limit` is bounded — the resource catalogue is meant to be a
677    /// window into the store, not a full dump. 100 is a reasonable
678    /// default for "what's recent enough to be worth surfacing".
679    pub fn list_recent_record_ids(&self, limit: u32) -> Result<Vec<String>> {
680        let conn = self.conn.lock();
681        let mut stmt = conn.prepare("SELECT id FROM records ORDER BY created_at DESC LIMIT ?1")?;
682        let rows = stmt
683            .query_map(params![limit], |r| r.get::<_, String>(0))?
684            .collect::<rusqlite::Result<Vec<_>>>()?;
685        Ok(rows)
686    }
687
688    /// Paged listing of record ids for **complete migration**.
689    ///
690    /// Round-21 (§-1.5 PR-2): the original `list_recent_record_ids` is
691    /// a "what's recent" window; this is "give me everything, page by
692    /// page" so a downstream generic-mcp client can pull the entire
693    /// catalogue without dropping records past the 100-row cap.
694    ///
695    /// Ordering: lexicographic ascending by id. Record ids are
696    /// content-derived (blake3 of provenance triple), so the order is
697    /// stable across calls and across hosts — making cursor-based
698    /// pagination an opaque string the client just round-trips.
699    ///
700    /// Contract:
701    ///   * `cursor = None` → return the first `limit` ids.
702    ///   * `cursor = Some(last_id)` → return the next `limit` ids
703    ///     STRICTLY AFTER `last_id` in ascending order.
704    ///   * `limit` is clamped to `[1, MAX_LIST_LIMIT]`.
705    ///   * Returns `(ids, next_cursor)`. `next_cursor` is `Some(last)`
706    ///     when the page hit the limit (i.e. another page may exist),
707    ///     `None` when the page returned fewer than `limit` rows (= end
708    ///     of catalogue).
709    pub fn list_record_ids_paged(
710        &self,
711        cursor: Option<&str>,
712        limit: u32,
713    ) -> Result<(Vec<String>, Option<String>)> {
714        let limit = limit.clamp(1, MAX_LIST_LIMIT);
715        let conn = self.conn.lock();
716        // `stmt` must outlive the iterator from `query_map`; bind it
717        // explicitly in each branch to give the iterator a stable
718        // borrow for the duration of the `collect`.
719        let rows: Vec<String> = match cursor {
720            Some(c) => {
721                let mut stmt =
722                    conn.prepare("SELECT id FROM records WHERE id > ?1 ORDER BY id ASC LIMIT ?2")?;
723                let out = stmt
724                    .query_map(params![c, limit], |r| r.get::<_, String>(0))?
725                    .collect::<rusqlite::Result<Vec<_>>>()?;
726                out
727            }
728            None => {
729                let mut stmt = conn.prepare("SELECT id FROM records ORDER BY id ASC LIMIT ?1")?;
730                let out = stmt
731                    .query_map(params![limit], |r| r.get::<_, String>(0))?
732                    .collect::<rusqlite::Result<Vec<_>>>()?;
733                out
734            }
735        };
736        // `next_cursor` is the last row's id IFF we hit the limit —
737        // otherwise we're at the end and signal that to the caller.
738        let next = if rows.len() as u32 == limit {
739            rows.last().cloned()
740        } else {
741            None
742        };
743        Ok((rows, next))
744    }
745
746    /// Fetch a record by id.
747    pub fn get_record(&self, id: &RecordId) -> Result<Option<AnamnesisRecord>> {
748        let conn = self.conn.lock();
749        let mut stmt = conn.prepare(
750            "SELECT id, adapter, instance, content, scope, kind, \
751                    created_at, updated_at, tags, metadata, \
752                    native_id, native_path, captured_at, raw_hash, schema_version, \
753                    derived_from \
754             FROM records WHERE id = ?1",
755        )?;
756        let row = stmt.query_row(params![id.0], record_from_row).optional()?;
757        Ok(row)
758    }
759
760    /// Direct children of `parent` — records whose
761    /// `provenance.derived_from == parent`. Hits the
762    /// `idx_records_derived_from` partial index (see migration 0004).
763    ///
764    /// Limit is clamped to `[1, MAX_LIST_LIMIT]` to match the rest of
765    /// the listing API. Pass a high limit if you genuinely want every
766    /// child — the partial index keeps the query cheap.
767    ///
768    /// Used by `anamnesis lineage` to show the §-1.5 PR-6 audit trail
769    /// (which Facts/Preferences/Skills got distilled out of a given
770    /// Episode).
771    pub fn list_derivations(&self, parent: &RecordId, limit: u32) -> Result<Vec<AnamnesisRecord>> {
772        let limit = limit.clamp(1, MAX_LIST_LIMIT);
773        let conn = self.conn.lock();
774        let mut stmt = conn.prepare(
775            "SELECT id, adapter, instance, content, scope, kind, \
776                    created_at, updated_at, tags, metadata, \
777                    native_id, native_path, captured_at, raw_hash, schema_version, \
778                    derived_from \
779             FROM records \
780             WHERE derived_from = ?1 \
781             ORDER BY created_at ASC, id ASC \
782             LIMIT ?2",
783        )?;
784        let rows = stmt
785            .query_map(params![parent.0, limit], record_from_row)?
786            .collect::<rusqlite::Result<Vec<_>>>()?;
787        Ok(rows)
788    }
789
790    /// Walk `start` → `start.derived_from` → `…` up to the root of the
791    /// lineage chain. The returned `Vec` is ordered child-first: index 0
792    /// is `start` itself, the last element is the root (a record whose
793    /// `derived_from` is `None`, or the deepest record still in the store
794    /// — broken parents are tolerated but reported via the second tuple
795    /// element).
796    ///
797    /// Cycle-safe: if a malformed write ever creates `A → B → A`, the
798    /// walk stops at the second encounter and the cycle is signaled as
799    /// `Err(StoreError::Corruption)` so callers can surface the
800    /// corruption instead of silently truncating.
801    ///
802    /// Returns `Ok(None)` when `start` itself doesn't exist.
803    pub fn lineage_chain(&self, start: &RecordId) -> Result<Option<LineageChain>> {
804        let mut chain: Vec<AnamnesisRecord> = Vec::new();
805        let mut seen = std::collections::HashSet::new();
806        let mut cursor = Some(start.clone());
807        let mut missing_parent: Option<RecordId> = None;
808
809        while let Some(cur) = cursor {
810            if !seen.insert(cur.0.clone()) {
811                return Err(StoreError::Corruption(format!(
812                    "lineage cycle detected at {}",
813                    cur.0
814                )));
815            }
816            match self.get_record(&cur)? {
817                Some(record) => {
818                    let next = record.provenance.derived_from.clone();
819                    chain.push(record);
820                    cursor = next;
821                }
822                None => {
823                    // Parent record is missing. If this is the first hop,
824                    // the caller's `start` doesn't exist — return None.
825                    if chain.is_empty() {
826                        return Ok(None);
827                    }
828                    missing_parent = Some(cur);
829                    break;
830                }
831            }
832        }
833
834        Ok(Some(LineageChain {
835            records: chain,
836            missing_parent,
837        }))
838    }
839
840    /// Per-record summary an MCP consumer needs to decide what to do
841    /// with a hit (or with `get_record` output) without a second
842    /// round trip: how many chunks live behind this record, how many
843    /// are embedded under the *active* model, and whether the source
844    /// adapter included its own pre-existing embedding for provenance.
845    ///
846    /// Returns `None` when no record with `id` exists. The active-model
847    /// chunk count is deliberately scoped: an embedding produced under
848    /// a previous model (e.g. before `anamnesis model use`) does NOT
849    /// count toward "ready for vector search right now". This matches
850    /// the contract `search_chunks_vec` enforces (it filters on the
851    /// caller's `model_id`).
852    pub fn record_summary(&self, id: &RecordId) -> Result<Option<RecordSummary>> {
853        let conn = self.conn.lock();
854
855        // Cheap probe — does the record exist?
856        let exists: bool = conn
857            .query_row("SELECT 1 FROM records WHERE id = ?1", params![id.0], |_| {
858                Ok(true)
859            })
860            .optional()?
861            .unwrap_or(false);
862        if !exists {
863            return Ok(None);
864        }
865
866        let chunk_count: i64 = conn.query_row(
867            "SELECT COUNT(*) FROM record_chunks WHERE record_id = ?1",
868            params![id.0],
869            |r| r.get(0),
870        )?;
871
872        // Active model — None when the user has never set one.
873        let active_model: Option<String> = conn
874            .query_row(
875                "SELECT value FROM meta WHERE key = 'active_embedding_model'",
876                [],
877                |r| r.get(0),
878            )
879            .optional()?;
880
881        // Chunks that have a fresh embedding under the active model.
882        // Returns 0 when active_model is None or no embeddings exist.
883        let embedded_chunk_count: i64 = match active_model.as_deref() {
884            Some(model) => conn.query_row(
885                "SELECT COUNT(*) FROM chunk_embeddings e \
886                 JOIN record_chunks rc ON rc.id = e.chunk_id \
887                 WHERE rc.record_id = ?1 AND e.model_id = ?2",
888                params![id.0, model],
889                |r| r.get(0),
890            )?,
891            None => 0,
892        };
893
894        // Source-vector presence — never the vector itself; just a
895        // tiny breadcrumb so the agent knows mem0's OpenAI embeddings
896        // (etc.) are on file as provenance.
897        let (source_model, source_dim): (Option<String>, Option<i64>) = conn
898            .query_row(
899                "SELECT source_embedding_model, source_embedding_dim \
900                 FROM raw_artifacts WHERE record_id = ?1",
901                params![id.0],
902                |r| Ok((r.get(0)?, r.get(1)?)),
903            )
904            .optional()?
905            .unwrap_or((None, None));
906
907        Ok(Some(RecordSummary {
908            chunk_count: chunk_count as u64,
909            embedded_chunk_count: embedded_chunk_count as u64,
910            active_model,
911            source_embedding_model: source_model,
912            source_embedding_dim: source_dim.map(|d| d as u32),
913        }))
914    }
915
916    /// Fetch one chunk by its id.
917    ///
918    /// `chunk_id` is the synthetic `"{record_id}:{seq}"` string written
919    /// by `write_chunks`. We don't parse it here — instead we JOIN
920    /// `record_chunks` against `records` so the returned parent
921    /// `record_id` survives any future change to the chunk-id format
922    /// without callers having to update.
923    pub fn get_chunk(&self, chunk_id: &str) -> Result<Option<ChunkLookup>> {
924        let conn = self.conn.lock();
925        conn.query_row(
926            "SELECT rc.id, rc.record_id, rc.seq, rc.content, \
927                    rc.content_hash, rc.token_estimate \
928             FROM record_chunks rc \
929             WHERE rc.id = ?1",
930            params![chunk_id],
931            |r| {
932                Ok(ChunkLookup {
933                    chunk_id: r.get(0)?,
934                    record_id: RecordId(r.get(1)?),
935                    seq: r.get::<_, i64>(2)? as u32,
936                    content: r.get(3)?,
937                    content_hash: ContentHash(r.get(4)?),
938                    token_estimate: r.get::<_, i64>(5)? as u32,
939                })
940            },
941        )
942        .optional()
943        .map_err(Into::into)
944    }
945}
946
947/// Lightweight per-record summary an MCP / CLI consumer needs to decide
948/// what to do with a `get_record` result without a second round trip.
949/// Computed by `Store::record_summary`.
950#[derive(Debug, Clone, PartialEq, Eq)]
951pub struct RecordSummary {
952    /// Number of chunks behind this record.
953    pub chunk_count: u64,
954    /// Number of chunks that have a fresh embedding under the *active*
955    /// embedding model. Equal to `chunk_count` when the record is
956    /// fully ready for vector search; less when the embedder hasn't
957    /// caught up; `0` when no active model is configured.
958    pub embedded_chunk_count: u64,
959    /// The currently-active embedding model id (e.g.
960    /// `"local:default:1"`). `None` when no model is set.
961    pub active_model: Option<String>,
962    /// If the source adapter shipped a pre-existing embedding for this
963    /// record's raw payload, this is its model id (informational only —
964    /// source vectors NEVER reach retrieval per BLUEPRINT §6.6.1).
965    pub source_embedding_model: Option<String>,
966    /// Dimensionality of the source embedding, when present.
967    pub source_embedding_dim: Option<u32>,
968}
969
970/// Result of `Store::lineage_chain` — an ordered walk from a starting
971/// record up to the root of its `provenance.derived_from` chain.
972///
973/// `records[0]` is the record the caller asked about (the leaf). The
974/// last element is whichever ancestor terminated the walk:
975///
976/// - if it has `provenance.derived_from == None`, it's the true root;
977/// - if `missing_parent` is `Some`, the walk stopped because that
978///   parent id wasn't in the store (e.g. it was deleted, or the
979///   derived record was created with a dangling lineage reference).
980///   The chain is still usable; callers can surface the dangling id.
981///
982/// Cycles cause `Store::lineage_chain` to return `Err`, not a truncated
983/// `LineageChain`.
984#[derive(Debug, Clone, PartialEq)]
985pub struct LineageChain {
986    /// Records from leaf to root (or as far up as the chain is intact).
987    pub records: Vec<AnamnesisRecord>,
988    /// If the walk stopped because a parent `RecordId` wasn't in the
989    /// store, this is that missing id. `None` when the walk reached a
990    /// real root (a record with `derived_from = None`).
991    pub missing_parent: Option<RecordId>,
992}
993
994/// One chunk row, joined with enough provenance for downstream tools
995/// (currently `trace_provenance`) to surface chunk-level debug info
996/// without a second round trip.
997#[derive(Debug, Clone, PartialEq, Eq)]
998pub struct ChunkLookup {
999    /// The synthetic chunk id (`"{record_id}:{seq}"`).
1000    pub chunk_id: String,
1001    /// Parent record id.
1002    pub record_id: RecordId,
1003    /// Per-record chunk index.
1004    pub seq: u32,
1005    /// Chunk text content (original, NOT jieba-tokenized).
1006    pub content: String,
1007    /// `blake3` of the content — match key for embedding-job dedup.
1008    pub content_hash: ContentHash,
1009    /// Heuristic token count used by the chunker.
1010    pub token_estimate: u32,
1011}
1012
1013fn record_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<AnamnesisRecord> {
1014    let tags_json: Option<String> = row.get(8)?;
1015    let meta_json: Option<String> = row.get(9)?;
1016    let updated_at: Option<i64> = row.get(7)?;
1017    let instance: String = row.get(2)?;
1018    Ok(AnamnesisRecord {
1019        id: RecordId(row.get(0)?),
1020        source: SourceDescriptor {
1021            adapter: row.get(1)?,
1022            instance: if instance.is_empty() {
1023                None
1024            } else {
1025                Some(instance)
1026            },
1027            version: String::new(), // store doesn't track adapter self-version
1028        },
1029        content: row.get(3)?,
1030        embedding: None, // source vectors live in raw_artifacts (provenance only)
1031        scope: scope_from(&row.get::<_, String>(4)?),
1032        kind: kind_from(&row.get::<_, String>(5)?),
1033        created_at: dt(row.get(6)?),
1034        updated_at: updated_at.map(dt),
1035        tags: tags_json
1036            .and_then(|s| serde_json::from_str(&s).ok())
1037            .unwrap_or_default(),
1038        metadata: meta_json
1039            .and_then(|s| serde_json::from_str(&s).ok())
1040            .unwrap_or_default(),
1041        provenance: Provenance {
1042            native_id: row.get(10)?,
1043            native_path: row.get(11)?,
1044            captured_at: dt(row.get(12)?),
1045            raw_hash: row.get(13)?,
1046            derived_from: row.get::<_, Option<String>>(15)?.map(RecordId),
1047        },
1048        schema_version: row.get::<_, i64>(14)? as u32,
1049    })
1050}
1051
1052// ─────────────────────────────────────────────────────────────────────────────
1053// Search
1054// ─────────────────────────────────────────────────────────────────────────────
1055
1056impl Store {
1057    /// FTS5 chunk search. Returns hits ordered by BM25 (lower rank = better);
1058    /// `score` is the bm25() value (negated so larger = more relevant).
1059    pub fn search_chunks_fts(
1060        &self,
1061        query: &str,
1062        filter: &SearchFilter,
1063        limit: u32,
1064    ) -> Result<Vec<ChunkHit>> {
1065        // PR-Jieba (round-5 consult, see `cjk` module): we MUST tokenize
1066        // the query through the same pipeline that indexed the chunks.
1067        // Otherwise FTS5 MATCH compares raw codepoints against the
1068        // jieba-segmented index, and Chinese queries return zero hits.
1069        // The Codex consult flagged this asymmetry as the load-bearing
1070        // trap of the whole feature.
1071        let match_query = crate::cjk::tokenize_query(query);
1072        if match_query.is_empty() {
1073            // FTS5 errors on empty MATCH; an empty user query has no
1074            // searchable tokens, so zero hits is the right answer.
1075            return Ok(Vec::new());
1076        }
1077
1078        // Build the SQL + bound parameters together — the candidate pool
1079        // is filtered BEFORE the `LIMIT` truncates it.
1080        // The first two bound params are always (query, limit); filter
1081        // params start at index 3 in declaration order below.
1082        // All placeholders are anonymous `?`. SQLite forbids mixing
1083        // numbered (`?1`) and unnumbered placeholders within one
1084        // statement, which is exactly what would happen if we kept the
1085        // pre-PR-C `?1` MATCH placeholder and appended `?` filter
1086        // predicates after it.
1087        let mut sql = String::from(
1088            "SELECT rc.id, rc.record_id, rc.seq, rc.content, bm25(chunks_fts) AS score \
1089             FROM chunks_fts \
1090             JOIN record_chunks rc ON rc.rowid = chunks_fts.rowid",
1091        );
1092        let need_records_join = !filter.is_empty();
1093        if need_records_join {
1094            sql.push_str(" JOIN records r ON r.id = rc.record_id");
1095        }
1096        sql.push_str(" WHERE chunks_fts MATCH ?");
1097        let filter_params = append_filter_predicates(&mut sql, filter);
1098        sql.push_str(" ORDER BY score LIMIT ?");
1099
1100        let conn = self.conn.lock();
1101        let mut stmt = conn.prepare(&sql)?;
1102        let mut bound: Vec<rusqlite::types::Value> = Vec::with_capacity(2 + filter_params.len());
1103        bound.push(rusqlite::types::Value::Text(match_query));
1104        bound.extend(filter_params);
1105        bound.push(rusqlite::types::Value::Integer(limit as i64));
1106        let rows = stmt
1107            .query_map(rusqlite::params_from_iter(bound.iter()), |r| {
1108                let raw_score: f64 = r.get(4)?;
1109                Ok(ChunkHit {
1110                    chunk_id: r.get(0)?,
1111                    record_id: RecordId(r.get(1)?),
1112                    seq: r.get::<_, i64>(2)? as u32,
1113                    content: r.get(3)?,
1114                    score: -raw_score, // bm25 returns negative-ish; flip so > is better
1115                })
1116            })?
1117            .collect::<rusqlite::Result<Vec<_>>>()?;
1118        Ok(rows)
1119    }
1120
1121    /// Linear-scan vector search over `chunk_embeddings` filtered by
1122    /// `model_id`. Acceptable for Phase-1 corpora (<100k chunks per
1123    /// BLUEPRINT §12). sqlite-vec swap-in lives behind the same API.
1124    ///
1125    /// `filter` is pushed into the SQL `WHERE` (joined against
1126    /// `record_chunks` → `records`) so the cosine pass only scores
1127    /// candidates that already match — this matters at 1700+:7
1128    /// distributions where any post-filter would leave the minority
1129    /// adapter empty.
1130    pub fn search_chunks_vec(
1131        &self,
1132        query_vec: &[f32],
1133        model_id: &str,
1134        filter: &SearchFilter,
1135        limit: u32,
1136    ) -> Result<Vec<ChunkHit>> {
1137        let mut sql = String::from(
1138            "SELECT e.chunk_id, e.embedding, rc.record_id, rc.seq, rc.content \
1139             FROM chunk_embeddings e \
1140             JOIN record_chunks rc ON rc.id = e.chunk_id",
1141        );
1142        let need_records_join = !filter.is_empty();
1143        if need_records_join {
1144            sql.push_str(" JOIN records r ON r.id = rc.record_id");
1145        }
1146        sql.push_str(" WHERE e.model_id = ?");
1147        let filter_params = append_filter_predicates(&mut sql, filter);
1148        // No LIMIT on the SQL — we still have to score every survivor; the
1149        // top-k cut happens in Rust after cosine. But the survivor set is
1150        // now bounded by the filter, not by the entire embedding table.
1151
1152        let conn = self.conn.lock();
1153        let mut stmt = conn.prepare(&sql)?;
1154        let mut bound: Vec<rusqlite::types::Value> = Vec::with_capacity(1 + filter_params.len());
1155        bound.push(rusqlite::types::Value::Text(model_id.to_string()));
1156        bound.extend(filter_params);
1157        let mut scored: Vec<ChunkHit> = Vec::new();
1158        let rows = stmt.query_map(rusqlite::params_from_iter(bound.iter()), |r| {
1159            Ok((
1160                r.get::<_, String>(0)?,
1161                r.get::<_, Vec<u8>>(1)?,
1162                r.get::<_, String>(2)?,
1163                r.get::<_, i64>(3)?,
1164                r.get::<_, String>(4)?,
1165            ))
1166        })?;
1167        for row in rows {
1168            let (chunk_id, blob, rid, seq, content) = row?;
1169            let v = blob_to_f32(&blob)?;
1170            let score = cosine(query_vec, &v);
1171            scored.push(ChunkHit {
1172                chunk_id,
1173                record_id: RecordId(rid),
1174                seq: seq as u32,
1175                content,
1176                score,
1177            });
1178        }
1179        scored.sort_by(|a, b| {
1180            b.score
1181                .partial_cmp(&a.score)
1182                .unwrap_or(std::cmp::Ordering::Equal)
1183        });
1184        scored.truncate(limit as usize);
1185        Ok(scored)
1186    }
1187}
1188
1189/// Append filter predicates to `sql` and return the bound parameters in
1190/// declaration order. Caller decides where in their param stream these
1191/// land — they're given as positional values via `params_from_iter`.
1192///
1193/// Predicates use `r.<col>`, requiring the caller to have already added
1194/// `JOIN records r ON r.id = rc.record_id` (we don't add it here so the
1195/// SQL builder owns join shape).
1196fn append_filter_predicates(
1197    sql: &mut String,
1198    filter: &SearchFilter,
1199) -> Vec<rusqlite::types::Value> {
1200    use rusqlite::types::Value as V;
1201    let mut params: Vec<V> = Vec::new();
1202    if let Some(s) = &filter.source {
1203        sql.push_str(" AND r.adapter = ?");
1204        params.push(V::Text(s.clone()));
1205    }
1206    if let Some(i) = &filter.instance {
1207        // BLUEPRINT §18 trap: `records.instance` is NOT NULL DEFAULT ''.
1208        // We normalise the *empty / None* case to `''` so SQL key lookup
1209        // never misses, mirroring the sources-registry handling in PR-B.
1210        sql.push_str(" AND r.instance = ?");
1211        params.push(V::Text(i.clone()));
1212    }
1213    if let Some(k) = &filter.kind {
1214        sql.push_str(" AND r.kind = ?");
1215        params.push(V::Text(k.clone()));
1216    }
1217    if let Some(sc) = &filter.scope {
1218        sql.push_str(" AND r.scope = ?");
1219        params.push(V::Text(sc.clone()));
1220    }
1221    if let Some(from) = filter.time_from {
1222        sql.push_str(" AND r.created_at >= ?");
1223        params.push(V::Integer(from));
1224    }
1225    if let Some(to) = filter.time_to {
1226        sql.push_str(" AND r.created_at <= ?");
1227        params.push(V::Integer(to));
1228    }
1229    params
1230}
1231
1232// ─────────────────────────────────────────────────────────────────────────────
1233// Embedding job queue
1234// ─────────────────────────────────────────────────────────────────────────────
1235
1236impl Store {
1237    /// Atomically claim one pending job (pending → in_progress).
1238    /// Returns `None` when the queue is empty.
1239    pub fn claim_next_job(&self, model_id: &str) -> Result<Option<PendingEmbeddingJob>> {
1240        let mut conn = self.conn.lock();
1241        let tx = conn.transaction()?;
1242        let now = chrono::Utc::now().timestamp();
1243        let row: Option<(i64, String, String)> = tx
1244            .query_row(
1245                "SELECT id, chunk_id, content_hash FROM embedding_jobs \
1246                 WHERE status = 'pending' AND model_id = ?1 \
1247                 ORDER BY enqueued_at ASC LIMIT 1",
1248                params![model_id],
1249                |r| Ok((r.get(0)?, r.get(1)?, r.get(2)?)),
1250            )
1251            .optional()?;
1252        let Some((job_id, chunk_id, content_hash)) = row else {
1253            tx.commit()?;
1254            return Ok(None);
1255        };
1256        tx.execute(
1257            "UPDATE embedding_jobs SET status = 'in_progress', claimed_at = ?1 WHERE id = ?2",
1258            params![now, job_id],
1259        )?;
1260        let content: String = tx.query_row(
1261            "SELECT content FROM record_chunks WHERE id = ?1",
1262            params![chunk_id],
1263            |r| r.get(0),
1264        )?;
1265        tx.commit()?;
1266        Ok(Some(PendingEmbeddingJob {
1267            job_id,
1268            chunk_id,
1269            content_hash: ContentHash(content_hash),
1270            model_id: model_id.to_string(),
1271            content,
1272        }))
1273    }
1274
1275    /// Mark a job done and persist its embedding.
1276    pub fn complete_job(&self, job: &PendingEmbeddingJob, vector: &[f32]) -> Result<()> {
1277        let dim = vector.len() as i64;
1278        let blob = f32_to_blob(vector);
1279        let mut conn = self.conn.lock();
1280        let tx = conn.transaction()?;
1281        let now = chrono::Utc::now().timestamp();
1282        tx.execute(
1283            "INSERT INTO chunk_embeddings(chunk_id, model_id, content_hash, dim, embedding, created_at) \
1284             VALUES(?1, ?2, ?3, ?4, ?5, ?6) \
1285             ON CONFLICT(chunk_id, model_id) DO UPDATE SET \
1286                content_hash = excluded.content_hash, \
1287                dim = excluded.dim, \
1288                embedding = excluded.embedding, \
1289                created_at = excluded.created_at",
1290            params![
1291                job.chunk_id,
1292                job.model_id,
1293                job.content_hash.0,
1294                dim,
1295                blob,
1296                now,
1297            ],
1298        )?;
1299        tx.execute(
1300            "UPDATE embedding_jobs SET status = 'done', finished_at = ?1, error = NULL WHERE id = ?2",
1301            params![now, job.job_id],
1302        )?;
1303        tx.commit()?;
1304        Ok(())
1305    }
1306
1307    /// Mark a job failed; the embedder may retry by re-enqueueing later.
1308    pub fn fail_job(&self, job_id: i64, error: &str) -> Result<()> {
1309        let now = chrono::Utc::now().timestamp();
1310        self.conn.lock().execute(
1311            "UPDATE embedding_jobs SET status = 'failed', finished_at = ?1, error = ?2 WHERE id = ?3",
1312            params![now, error, job_id],
1313        )?;
1314        Ok(())
1315    }
1316}
1317
1318// ─────────────────────────────────────────────────────────────────────────────
1319// Import errors + stats
1320// ─────────────────────────────────────────────────────────────────────────────
1321
1322impl Store {
1323    /// Record a non-fatal per-record import error.
1324    pub fn log_import_error(
1325        &self,
1326        adapter: &str,
1327        instance: Option<&str>,
1328        native_id: Option<&str>,
1329        native_path: Option<&str>,
1330        phase: &str,
1331        error: &str,
1332    ) -> Result<()> {
1333        let now = chrono::Utc::now().timestamp();
1334        self.conn.lock().execute(
1335            "INSERT INTO import_errors(adapter, instance, native_id, native_path, phase, error, occurred_at) \
1336             VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)",
1337            params![adapter, instance.unwrap_or(""), native_id, native_path, phase, error, now],
1338        )?;
1339        Ok(())
1340    }
1341
1342    /// Coarse counters for `anamnesis status`.
1343    pub fn stats(&self) -> Result<StoreStats> {
1344        let conn = self.conn.lock();
1345        let records: i64 = conn.query_row("SELECT COUNT(1) FROM records", [], |r| r.get(0))?;
1346        let chunks: i64 = conn.query_row("SELECT COUNT(1) FROM record_chunks", [], |r| r.get(0))?;
1347        let pending: i64 = conn.query_row(
1348            "SELECT COUNT(1) FROM embedding_jobs WHERE status IN ('pending','in_progress')",
1349            [],
1350            |r| r.get(0),
1351        )?;
1352        let failed: i64 = conn.query_row(
1353            "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'failed'",
1354            [],
1355            |r| r.get(0),
1356        )?;
1357        let sources: i64 = conn.query_row("SELECT COUNT(1) FROM sources", [], |r| r.get(0))?;
1358        Ok(StoreStats {
1359            records: records as u64,
1360            chunks: chunks as u64,
1361            jobs_pending: pending as u64,
1362            jobs_failed: failed as u64,
1363            sources: sources as u64,
1364        })
1365    }
1366}
1367
1368// ─────────────────────────────────────────────────────────────────────────────
1369// Tests
1370// ─────────────────────────────────────────────────────────────────────────────
1371
1372#[cfg(test)]
1373mod tests {
1374    use super::*;
1375    use anamnesis_core::chunker::Chunker;
1376    use anamnesis_core::model::{Embedding, Provenance, SourceDescriptor};
1377    use chrono::Utc;
1378
1379    fn make_record(adapter: &str, native_id: &str, content: &str, kind: Kind) -> AnamnesisRecord {
1380        let id = RecordId::from_parts(adapter, None, native_id);
1381        AnamnesisRecord {
1382            id,
1383            source: SourceDescriptor {
1384                adapter: adapter.into(),
1385                instance: None,
1386                version: "0.0.1".into(),
1387            },
1388            content: content.into(),
1389            embedding: None,
1390            scope: Scope::User,
1391            kind,
1392            created_at: Utc::now(),
1393            updated_at: None,
1394            tags: vec!["t1".into(), "t2".into()],
1395            metadata: Default::default(),
1396            provenance: Provenance {
1397                native_id: native_id.into(),
1398                native_path: Some(format!("/tmp/{native_id}.md")),
1399                captured_at: Utc::now(),
1400                raw_hash: "h".into(),
1401                derived_from: None,
1402            },
1403            schema_version: anamnesis_core::SCHEMA_VERSION,
1404        }
1405    }
1406
1407    #[test]
1408    fn f32_blob_roundtrip() {
1409        let v = vec![0.1f32, -0.2, 1e10, -1e-10, 0.0];
1410        let back = blob_to_f32(&f32_to_blob(&v)).unwrap();
1411        assert_eq!(v, back);
1412    }
1413
1414    #[test]
1415    fn cosine_basic() {
1416        assert!((cosine(&[1.0, 0.0], &[1.0, 0.0]) - 1.0).abs() < 1e-9);
1417        assert!(cosine(&[1.0, 0.0], &[0.0, 1.0]).abs() < 1e-9);
1418        assert!((cosine(&[1.0, 1.0], &[1.0, 1.0]) - 1.0).abs() < 1e-9);
1419    }
1420
1421    #[test]
1422    fn register_and_list_sources() {
1423        let store = Store::open_in_memory().unwrap();
1424        store
1425            .register_source("claude-code", Some("default"), Some("/home/x"), None)
1426            .unwrap();
1427        store
1428            .register_source(
1429                "mem0",
1430                None,
1431                Some("/tmp/m.db"),
1432                Some("{\"mode\":\"sqlite\"}"),
1433            )
1434            .unwrap();
1435        let mut got = store.list_sources().unwrap();
1436        got.sort();
1437        assert_eq!(
1438            got,
1439            vec![
1440                (
1441                    "claude-code".into(),
1442                    "default".into(),
1443                    Some("/home/x".into())
1444                ),
1445                ("mem0".into(), "".into(), Some("/tmp/m.db".into())),
1446            ]
1447        );
1448    }
1449
1450    // ─── PR-B: SourceRow / get_source / update_last_import_at ───
1451
1452    #[test]
1453    fn get_source_normalises_none_instance_to_empty_string() {
1454        // Codex-flagged gotcha: `sources.instance` is NOT NULL DEFAULT ''.
1455        // If callers pass instance=None and we lookup with SQL NULL, the
1456        // row will never be found → silent re-registration. Verify
1457        // get_source matches the same row register_source(None, ...) wrote.
1458        let store = Store::open_in_memory().unwrap();
1459        store
1460            .register_source("mem0", None, Some("/path/db.sqlite"), None)
1461            .unwrap();
1462        let row = store.get_source("mem0", None).unwrap();
1463        let row = row.expect("instance=None must round-trip via get_source");
1464        assert_eq!(row.adapter, "mem0");
1465        assert_eq!(row.instance, "", "default instance stored as empty string");
1466        assert_eq!(row.location.as_deref(), Some("/path/db.sqlite"));
1467        assert!(row.last_import_at.is_none());
1468        // Also: Some("") should not be treated as a distinct instance.
1469        let row_via_empty = store.get_source("mem0", Some("")).unwrap();
1470        assert!(row_via_empty.is_some(), "Some(\"\") must hit same row");
1471    }
1472
1473    #[test]
1474    fn get_source_returns_none_for_unregistered() {
1475        let store = Store::open_in_memory().unwrap();
1476        assert!(store.get_source("claude-code", None).unwrap().is_none());
1477        assert!(store
1478            .get_source("mem0", Some("nonexistent-instance"))
1479            .unwrap()
1480            .is_none());
1481    }
1482
1483    #[test]
1484    fn update_last_import_at_stamps_existing_row() {
1485        let store = Store::open_in_memory().unwrap();
1486        store
1487            .register_source("claude-code", None, Some("/p"), None)
1488            .unwrap();
1489        assert!(store
1490            .get_source("claude-code", None)
1491            .unwrap()
1492            .unwrap()
1493            .last_import_at
1494            .is_none());
1495        let updated = store.update_last_import_at("claude-code", None).unwrap();
1496        assert!(updated, "update returns true when a row was stamped");
1497        let row = store.get_source("claude-code", None).unwrap().unwrap();
1498        assert!(
1499            row.last_import_at.is_some(),
1500            "last_import_at must be non-null after a successful update"
1501        );
1502    }
1503
1504    #[test]
1505    fn update_last_import_at_for_missing_row_returns_false() {
1506        let store = Store::open_in_memory().unwrap();
1507        let updated = store.update_last_import_at("claude-code", None).unwrap();
1508        assert!(
1509            !updated,
1510            "no matching source row → returns Ok(false) without inserting"
1511        );
1512        assert!(store.list_sources().unwrap().is_empty());
1513    }
1514
1515    #[test]
1516    fn register_source_is_idempotent_keeps_added_at_stable() {
1517        // The trap: a second register_source must NOT insert a new row.
1518        // ON CONFLICT keeps added_at fixed (it's only set in INSERT).
1519        let store = Store::open_in_memory().unwrap();
1520        store
1521            .register_source("mem0", None, Some("/path/A"), None)
1522            .unwrap();
1523        let row1 = store.get_source("mem0", None).unwrap().unwrap();
1524        std::thread::sleep(std::time::Duration::from_secs(1));
1525        store
1526            .register_source("mem0", None, Some("/path/B"), None)
1527            .unwrap();
1528        let rows = store.list_sources().unwrap();
1529        assert_eq!(rows.len(), 1, "no duplicate rows");
1530        let row2 = store.get_source("mem0", None).unwrap().unwrap();
1531        assert_eq!(row1.added_at, row2.added_at, "added_at stays stable");
1532        assert_eq!(row2.location.as_deref(), Some("/path/B"));
1533    }
1534
1535    #[test]
1536    fn list_sources_full_carries_all_fields() {
1537        let store = Store::open_in_memory().unwrap();
1538        store
1539            .register_source("claude-code", Some("work"), Some("/work"), Some("{}"))
1540            .unwrap();
1541        store
1542            .update_last_import_at("claude-code", Some("work"))
1543            .unwrap();
1544        store.register_source("mem0", None, None, None).unwrap(); // location=None is valid
1545
1546        let rows = store.list_sources_full().unwrap();
1547        assert_eq!(rows.len(), 2);
1548        let cc = rows.iter().find(|r| r.adapter == "claude-code").unwrap();
1549        assert_eq!(cc.instance, "work");
1550        assert_eq!(cc.location.as_deref(), Some("/work"));
1551        assert_eq!(cc.config_json.as_deref(), Some("{}"));
1552        assert!(cc.last_import_at.is_some());
1553        let mem0 = rows.iter().find(|r| r.adapter == "mem0").unwrap();
1554        assert_eq!(mem0.instance, "");
1555        assert!(mem0.location.is_none());
1556        assert!(mem0.last_import_at.is_none());
1557    }
1558
1559    // ─── Round-9: list_sources_with_counts (per-source aggregation) ───
1560
1561    #[test]
1562    fn list_sources_with_counts_includes_zero_for_never_imported_source() {
1563        // Codex acceptance: a source that's been registered but has no
1564        // records yet must STILL appear with record_count/chunk_count = 0.
1565        // This is the "registered but stale / never imported" signal an
1566        // agent needs to detect a misconfigured adapter.
1567        let store = Store::open_in_memory().unwrap();
1568        store
1569            .register_source("mem0", None, Some("/tmp/missing.db"), None)
1570            .unwrap();
1571        let rows = store.list_sources_with_counts().unwrap();
1572        assert_eq!(rows.len(), 1);
1573        let r = &rows[0];
1574        assert_eq!(r.source.adapter, "mem0");
1575        assert_eq!(r.record_count, 0);
1576        assert_eq!(r.chunk_count, 0);
1577        assert!(r.source.last_import_at.is_none());
1578    }
1579
1580    #[test]
1581    fn list_sources_with_counts_aggregates_records_and_chunks_per_source() {
1582        // Two sources, different shape:
1583        //   claude-code  (default instance): 3 records, 3 chunks
1584        //   mem0         (instance="prod"):  1 record,  1 chunk
1585        let store = Store::open_in_memory().unwrap();
1586        store
1587            .register_source("claude-code", None, Some("/c"), None)
1588            .unwrap();
1589        store
1590            .register_source("mem0", Some("prod"), Some("/m"), None)
1591            .unwrap();
1592
1593        for native in ["a", "b", "c"] {
1594            let r = make_record("claude-code", native, "x", Kind::Fact);
1595            let c = Chunker::default().chunk(&r.id, &r.content);
1596            store.upsert_record(&r, &c, None).unwrap();
1597        }
1598        // Note: make_record sets instance=None, which is stored as "".
1599        // We need a "claude-code"/"" row to match the records above —
1600        // the register_source(None, ...) call already did that.
1601
1602        // For mem0 we need a record under instance="prod" so the JOIN
1603        // hits the right source row. Build it manually.
1604        let mut mem_r = make_record("mem0", "m1", "y", Kind::Fact);
1605        mem_r.source.instance = Some("prod".into());
1606        mem_r.id = RecordId::from_parts("mem0", Some("prod"), "m1");
1607        let mem_c = Chunker::default().chunk(&mem_r.id, &mem_r.content);
1608        store.upsert_record(&mem_r, &mem_c, None).unwrap();
1609
1610        let rows = store.list_sources_with_counts().unwrap();
1611        assert_eq!(rows.len(), 2);
1612        let cc = rows
1613            .iter()
1614            .find(|r| r.source.adapter == "claude-code")
1615            .unwrap();
1616        assert_eq!(
1617            cc.source.instance, "",
1618            "default instance kept as empty string"
1619        );
1620        assert_eq!(cc.record_count, 3);
1621        assert_eq!(cc.chunk_count, 3);
1622        let mem = rows.iter().find(|r| r.source.adapter == "mem0").unwrap();
1623        assert_eq!(
1624            mem.source.instance, "prod",
1625            "instance must round-trip through the JOIN"
1626        );
1627        assert_eq!(mem.record_count, 1);
1628        assert_eq!(mem.chunk_count, 1);
1629    }
1630
1631    #[test]
1632    fn list_sources_with_counts_groups_by_adapter_and_instance_not_just_adapter() {
1633        // Trap Codex flagged: grouping by adapter alone would collapse
1634        // (mem0, "self-hosted") and (mem0, "cloud") into one row even
1635        // when they have different counts. Pin the right behavior here.
1636        let store = Store::open_in_memory().unwrap();
1637        store
1638            .register_source("mem0", Some("self-hosted"), Some("/local"), None)
1639            .unwrap();
1640        store
1641            .register_source("mem0", Some("cloud"), Some("https://x"), None)
1642            .unwrap();
1643
1644        // 2 records under "self-hosted", 0 under "cloud".
1645        for native in ["x", "y"] {
1646            let mut r = make_record("mem0", native, "z", Kind::Fact);
1647            r.source.instance = Some("self-hosted".into());
1648            r.id = RecordId::from_parts("mem0", Some("self-hosted"), native);
1649            let c = Chunker::default().chunk(&r.id, &r.content);
1650            store.upsert_record(&r, &c, None).unwrap();
1651        }
1652
1653        let rows = store.list_sources_with_counts().unwrap();
1654        assert_eq!(rows.len(), 2, "two distinct (adapter, instance) rows");
1655        let local = rows
1656            .iter()
1657            .find(|r| r.source.instance == "self-hosted")
1658            .unwrap();
1659        assert_eq!(local.record_count, 2);
1660        let cloud = rows.iter().find(|r| r.source.instance == "cloud").unwrap();
1661        assert_eq!(cloud.record_count, 0);
1662    }
1663
1664    #[test]
1665    fn upsert_round_trips_record() {
1666        let store = Store::open_in_memory().unwrap();
1667        let r = make_record("claude-code", "n1", "alpha beta gamma", Kind::Preference);
1668        let chunks = Chunker::default().chunk(&r.id, &r.content);
1669        let (added, n_chunks) = store.upsert_record(&r, &chunks, Some("{}")).unwrap();
1670        assert_eq!(added, 1);
1671        assert_eq!(n_chunks, 1);
1672        let back = store.get_record(&r.id).unwrap().unwrap();
1673        assert_eq!(back.id, r.id);
1674        assert_eq!(back.content, r.content);
1675        assert_eq!(back.kind, Kind::Preference);
1676        assert_eq!(back.scope, Scope::User);
1677        assert_eq!(back.tags, vec!["t1".to_string(), "t2".to_string()]);
1678        assert_eq!(back.source.adapter, "claude-code");
1679        assert!(back.source.instance.is_none());
1680    }
1681
1682    // ─── Round-7: write_chunks dedup (BLUEPRINT round-6 Finding 2 fix) ───
1683    //
1684    // Codex's acceptance: re-upserting a record whose raw_hash is
1685    // unchanged must NOT touch record_chunks at all. The win is that
1686    // the AFTER DELETE / AFTER INSERT triggers (which call
1687    // tokenize_cjk(content)) don't fire on no-op re-imports.
1688    //
1689    // The store-level test asserts the invariant by counting trigger
1690    // side effects: chunks_fts row content stays byte-identical across
1691    // the re-upsert, which is only possible if no DELETE+INSERT cycle
1692    // happened.
1693
1694    #[test]
1695    fn reupsert_with_unchanged_raw_hash_returns_zero_zero() {
1696        let store = Store::open_in_memory().unwrap();
1697        let r = make_record("a", "x", "stable content", Kind::Fact);
1698        let c = Chunker::default().chunk(&r.id, &r.content);
1699        let (n1, k1) = store.upsert_record(&r, &c, Some("{\"v\":1}")).unwrap();
1700        assert_eq!((n1, k1), (1, c.len() as u64));
1701
1702        // Second call with the same record (same raw_hash) → no-op.
1703        let (n2, k2) = store.upsert_record(&r, &c, Some("{\"v\":1}")).unwrap();
1704        assert_eq!(
1705            (n2, k2),
1706            (0, 0),
1707            "re-upsert with unchanged raw_hash must report zero work"
1708        );
1709    }
1710
1711    #[test]
1712    fn reupsert_with_unchanged_raw_hash_does_not_touch_chunks() {
1713        // Pin Codex's load-bearing assertion: the row in `chunks_fts`
1714        // must be the SAME row (same rowid, same content) across a no-op
1715        // re-upsert. If write_chunks fired its DELETE+INSERT, the chunk
1716        // would get a fresh rowid (record_chunks.id stays the same but
1717        // SQLite rowid is reassigned on INSERT after DELETE).
1718        let store = Store::open_in_memory().unwrap();
1719        let r = make_record("a", "x", "the quick brown fox", Kind::Fact);
1720        let c = Chunker::default().chunk(&r.id, &r.content);
1721        store.upsert_record(&r, &c, None).unwrap();
1722        let rowid_before: i64 = store
1723            .conn()
1724            .query_row(
1725                "SELECT rowid FROM record_chunks WHERE record_id = ?1",
1726                params![r.id.0],
1727                |row| row.get(0),
1728            )
1729            .unwrap();
1730
1731        store.upsert_record(&r, &c, None).unwrap();
1732        let rowid_after: i64 = store
1733            .conn()
1734            .query_row(
1735                "SELECT rowid FROM record_chunks WHERE record_id = ?1",
1736                params![r.id.0],
1737                |row| row.get(0),
1738            )
1739            .unwrap();
1740        assert_eq!(
1741            rowid_before, rowid_after,
1742            "rowid changed → DELETE+INSERT happened → jieba triggers fired"
1743        );
1744        // FTS still finds the content (because chunks_fts wasn't touched).
1745        let hits = store
1746            .search_chunks_fts("quick fox", &SearchFilter::default(), 5)
1747            .unwrap();
1748        assert_eq!(hits.len(), 1);
1749    }
1750
1751    #[test]
1752    fn reupsert_with_changed_raw_hash_still_rewrites_chunks() {
1753        // Negative case: when raw_hash genuinely changes the fast-path
1754        // must NOT swallow the update. Content rewrite + FTS reindex
1755        // must still happen.
1756        let store = Store::open_in_memory().unwrap();
1757        let mut r = make_record("a", "x", "old content", Kind::Fact);
1758        r.provenance.raw_hash = "hash-v1".into();
1759        let c1 = Chunker::default().chunk(&r.id, &r.content);
1760        store.upsert_record(&r, &c1, None).unwrap();
1761
1762        let mut r2 = r.clone();
1763        r2.content = "new completely different content".into();
1764        r2.provenance.raw_hash = "hash-v2".into();
1765        let c2 = Chunker::default().chunk(&r2.id, &r2.content);
1766        let (n, k) = store.upsert_record(&r2, &c2, None).unwrap();
1767        assert_eq!(n, 1, "raw_hash changed → record written");
1768        assert_eq!(k, c2.len() as u64, "chunks rewritten");
1769        let hits = store
1770            .search_chunks_fts("different", &SearchFilter::default(), 5)
1771            .unwrap();
1772        assert!(!hits.is_empty(), "new content searchable");
1773        let stale = store
1774            .search_chunks_fts("old", &SearchFilter::default(), 5)
1775            .unwrap();
1776        assert!(stale.is_empty(), "old content evicted");
1777    }
1778
1779    #[test]
1780    fn reupsert_no_op_still_enqueues_jobs_for_active_model() {
1781        // If the user switched embedding models between two imports,
1782        // the no-op fast-path must still enqueue jobs for the NEW model
1783        // (otherwise chunks would be invisible to vector search under
1784        // the new model). enqueue_jobs is ON CONFLICT DO NOTHING so
1785        // this is safe + cheap.
1786        let store = Store::open_in_memory().unwrap();
1787        let r = make_record("a", "x", "hello world", Kind::Fact);
1788        let c = Chunker::default().chunk(&r.id, &r.content);
1789        // First import with model A.
1790        store.set_active_model("local:model-a:1").unwrap();
1791        store.upsert_record(&r, &c, None).unwrap();
1792
1793        // Switch model, re-import the same record. raw_hash is identical
1794        // so write path skips, but jobs should be enqueued under model-b.
1795        store.set_active_model("local:model-b:1").unwrap();
1796        let (n, k) = store.upsert_record(&r, &c, None).unwrap();
1797        assert_eq!((n, k), (0, 0));
1798
1799        let pending_for_b: i64 = store
1800            .conn()
1801            .query_row(
1802                "SELECT COUNT(*) FROM embedding_jobs \
1803                 WHERE status = 'pending' AND model_id = 'local:model-b:1'",
1804                [],
1805                |r| r.get(0),
1806            )
1807            .unwrap();
1808        assert_eq!(
1809            pending_for_b as usize,
1810            c.len(),
1811            "fast-path must still enqueue jobs under the active model"
1812        );
1813    }
1814
1815    #[test]
1816    fn upsert_replaces_chunks_on_recall() {
1817        let store = Store::open_in_memory().unwrap();
1818        let mut r = make_record("a", "x", "v1", Kind::Fact);
1819        r.provenance.raw_hash = "v1-hash".into();
1820        let c1 = Chunker::default().chunk(&r.id, &r.content);
1821        store.upsert_record(&r, &c1, None).unwrap();
1822
1823        let mut r2 = r.clone();
1824        r2.content = "v2 different and longer ".repeat(40);
1825        // Round-7: a content change must come with a raw_hash bump, or
1826        // the fast-path will (correctly) treat the upsert as a no-op.
1827        // Real adapters always recompute raw_hash from the source bytes
1828        // so this is automatic in practice; the test must mirror that
1829        // by bumping the hash here.
1830        r2.provenance.raw_hash = "v2-hash".into();
1831        let c2 = Chunker::default().chunk(&r2.id, &r2.content);
1832        store.upsert_record(&r2, &c2, None).unwrap();
1833
1834        let chunk_count: i64 = store
1835            .conn()
1836            .query_row(
1837                "SELECT COUNT(1) FROM record_chunks WHERE record_id = ?1",
1838                params![r2.id.0],
1839                |row| row.get(0),
1840            )
1841            .unwrap();
1842        assert_eq!(chunk_count as usize, c2.len());
1843        // FTS index should match v2 content, not v1.
1844        let hits = store
1845            .search_chunks_fts("different", &SearchFilter::default(), 5)
1846            .unwrap();
1847        assert!(!hits.is_empty());
1848        let stale = store
1849            .search_chunks_fts("v1", &SearchFilter::default(), 5)
1850            .unwrap();
1851        assert!(stale.is_empty());
1852    }
1853
1854    #[test]
1855    fn fts_search_returns_chunks() {
1856        let store = Store::open_in_memory().unwrap();
1857        let r = make_record(
1858            "a",
1859            "x",
1860            "the quick brown fox jumps over the lazy dog",
1861            Kind::Fact,
1862        );
1863        let c = Chunker::default().chunk(&r.id, &r.content);
1864        store.upsert_record(&r, &c, None).unwrap();
1865        let hits = store
1866            .search_chunks_fts("quick fox", &SearchFilter::default(), 5)
1867            .unwrap();
1868        assert_eq!(hits.len(), 1);
1869        assert_eq!(hits[0].record_id, r.id);
1870        assert!(hits[0].score > 0.0);
1871    }
1872
1873    // ─── PR-Jieba (round-5): CJK FTS round-trip ───
1874    //
1875    // The point of jieba-based pre-tokenization is that a multi-char
1876    // Chinese phrase the user typed maps to the same word boundaries
1877    // jieba picked when we indexed the document. unicode61 alone
1878    // (the pre-PR-Jieba behaviour) would still match — because every
1879    // Han codepoint becomes its own token — but BM25 scoring would be
1880    // dominated by character frequency, not phrase frequency. The
1881    // semantics that matter to users only emerge once we agree that
1882    // "记忆" is one token, not two.
1883
1884    #[test]
1885    fn cjk_phrase_search_finds_indexed_document() {
1886        let store = Store::open_in_memory().unwrap();
1887        let r = make_record(
1888            "claude-code",
1889            "cjk-1",
1890            "Anamnesis 是跨 agent 的记忆基础设施,本地优先,无 telemetry",
1891            Kind::Fact,
1892        );
1893        let c = Chunker::default().chunk(&r.id, &r.content);
1894        store.upsert_record(&r, &c, None).unwrap();
1895
1896        // The exact phrase "记忆基础" (or any 2-char Chinese substring
1897        // of the content) must surface the indexed record.
1898        for query in &["记忆", "基础设施", "本地优先"] {
1899            let hits = store
1900                .search_chunks_fts(query, &SearchFilter::default(), 5)
1901                .unwrap();
1902            assert!(
1903                !hits.is_empty(),
1904                "CJK query {query:?} must find the indexed record"
1905            );
1906            assert_eq!(hits[0].record_id, r.id, "wrong record for query {query:?}");
1907        }
1908    }
1909
1910    #[test]
1911    fn cjk_search_distinguishes_distinct_words() {
1912        // Two documents that share characters but not jieba-segmented
1913        // words. With unicode61 they'd both match a single-char query;
1914        // with jieba they're correctly separated.
1915        let store = Store::open_in_memory().unwrap();
1916        let a = make_record("a", "a1", "我的偏好是 vim", Kind::Preference);
1917        let b = make_record("a", "b1", "项目里有很多代码", Kind::Fact);
1918        let ca = Chunker::default().chunk(&a.id, &a.content);
1919        let cb = Chunker::default().chunk(&b.id, &b.content);
1920        store.upsert_record(&a, &ca, None).unwrap();
1921        store.upsert_record(&b, &cb, None).unwrap();
1922
1923        let hits_pref = store
1924            .search_chunks_fts("偏好", &SearchFilter::default(), 5)
1925            .unwrap();
1926        assert_eq!(hits_pref.len(), 1);
1927        assert_eq!(hits_pref[0].record_id, a.id);
1928
1929        let hits_proj = store
1930            .search_chunks_fts("项目", &SearchFilter::default(), 5)
1931            .unwrap();
1932        assert_eq!(hits_proj.len(), 1);
1933        assert_eq!(hits_proj[0].record_id, b.id);
1934    }
1935
1936    #[test]
1937    fn empty_or_punctuation_only_query_returns_no_hits() {
1938        let store = Store::open_in_memory().unwrap();
1939        let r = make_record("a", "x", "alpha beta gamma", Kind::Fact);
1940        let c = Chunker::default().chunk(&r.id, &r.content);
1941        store.upsert_record(&r, &c, None).unwrap();
1942
1943        // FTS5 errors on empty MATCH — we must short-circuit instead.
1944        let empty = store
1945            .search_chunks_fts("", &SearchFilter::default(), 5)
1946            .unwrap();
1947        assert!(empty.is_empty());
1948        let punct = store
1949            .search_chunks_fts("!!!  ???", &SearchFilter::default(), 5)
1950            .unwrap();
1951        assert!(punct.is_empty());
1952    }
1953
1954    #[test]
1955    fn cjk_reindex_picks_up_existing_chunks() {
1956        // Migration 0003 sets `chunks_fts_rebuild_pending`; verify that
1957        // `Store::open` running over an existing DB with rows in
1958        // `record_chunks` reconstructs the FTS index. We can't easily
1959        // simulate the pre-0003 DB state from in-memory tests, so we
1960        // assert the simpler invariant: after `upsert_record + open`
1961        // the FTS row count equals the chunks row count. This catches
1962        // regression in the reindex path even if the migration shape
1963        // changes.
1964        let store = Store::open_in_memory().unwrap();
1965        let r = make_record("a", "x", "重新索引 测试", Kind::Fact);
1966        let c = Chunker::default().chunk(&r.id, &r.content);
1967        store.upsert_record(&r, &c, None).unwrap();
1968        let conn = store.conn.lock();
1969        let chunks_n: i64 = conn
1970            .query_row("SELECT COUNT(*) FROM record_chunks", [], |r| r.get(0))
1971            .unwrap();
1972        let fts_n: i64 = conn
1973            .query_row("SELECT COUNT(*) FROM chunks_fts", [], |r| r.get(0))
1974            .unwrap();
1975        assert_eq!(chunks_n, fts_n, "every chunk has an FTS row");
1976        assert!(chunks_n > 0);
1977    }
1978
1979    // ─── PR-C: candidate-side filter pushdown ───
1980    //
1981    // Codex's acceptance assertion (BLUEPRINT §17.5 PR-C consult):
1982    //
1983    //   "Construct 1744 claude-code records + 7 mem0 records sharing
1984    //    one query term; `source=mem0` must return non-empty results,
1985    //    all from mem0, even with a candidate-pool limit smaller than
1986    //    the claude-code majority."
1987    //
1988    // If filter pushdown is wrong, FTS picks the top-pool by BM25
1989    // unfiltered → the pool fills with claude-code chunks → post-filter
1990    // shrinks to zero. The whole point of pushdown is that the SQL
1991    // recall stage drops claude-code BEFORE the limit applies.
1992
1993    #[test]
1994    fn filter_pushdown_returns_minority_source_under_majority_dominance() {
1995        let store = Store::open_in_memory().unwrap();
1996        // 1744 claude-code records (every one matches "sharedterm").
1997        for i in 0..1744u32 {
1998            let r = make_record(
1999                "claude-code",
2000                &format!("cc-{i:04}"),
2001                "sharedterm claude noise",
2002                Kind::Episode,
2003            );
2004            let c = Chunker::default().chunk(&r.id, &r.content);
2005            store.upsert_record(&r, &c, None).unwrap();
2006        }
2007        // 7 mem0 records, all matching the same term.
2008        for i in 0..7u32 {
2009            let r = make_record(
2010                "mem0",
2011                &format!("m0-{i}"),
2012                "sharedterm mem0 fact",
2013                Kind::Fact,
2014            );
2015            let c = Chunker::default().chunk(&r.id, &r.content);
2016            store.upsert_record(&r, &c, None).unwrap();
2017        }
2018
2019        // With NO filter, the pool of 50 is dominated by claude-code.
2020        let none = store
2021            .search_chunks_fts("sharedterm", &SearchFilter::default(), 50)
2022            .unwrap();
2023        assert_eq!(none.len(), 50, "unfiltered hits fill the pool");
2024        let mem0_in_unfiltered = none
2025            .iter()
2026            .filter(|h| h.content.contains("mem0 fact"))
2027            .count();
2028        assert!(
2029            mem0_in_unfiltered <= 7,
2030            "without pushdown, the 7 mem0 records are squeezed by the 1744 claude-code majority"
2031        );
2032
2033        // WITH source=mem0 pushed into SQL, the pool is drawn from mem0
2034        // chunks only — even at the same pool size of 50.
2035        let filter = SearchFilter {
2036            source: Some("mem0".into()),
2037            ..SearchFilter::default()
2038        };
2039        let mem0_hits = store.search_chunks_fts("sharedterm", &filter, 50).unwrap();
2040        assert!(
2041            !mem0_hits.is_empty(),
2042            "source=mem0 must return non-empty results from the minority adapter"
2043        );
2044        assert_eq!(
2045            mem0_hits.len(),
2046            7,
2047            "filter pushdown must surface all 7 mem0 chunks, not zero"
2048        );
2049        for h in &mem0_hits {
2050            assert!(
2051                h.content.contains("mem0 fact"),
2052                "every hit must come from the mem0 adapter, not the claude-code majority"
2053            );
2054            assert!(
2055                !h.content.contains("claude noise"),
2056                "no claude-code chunk should leak through the SQL filter"
2057            );
2058        }
2059    }
2060
2061    #[test]
2062    fn filter_pushdown_supports_kind_and_scope_independently() {
2063        let store = Store::open_in_memory().unwrap();
2064        for (na, content, kind) in &[
2065            ("a", "shared topic alpha", Kind::Fact),
2066            ("b", "shared topic beta", Kind::Preference),
2067            ("c", "shared topic gamma", Kind::Feedback),
2068        ] {
2069            let r = make_record("claude-code", na, content, *kind);
2070            let c = Chunker::default().chunk(&r.id, &r.content);
2071            store.upsert_record(&r, &c, None).unwrap();
2072        }
2073        let kind_filter = SearchFilter {
2074            kind: Some("preference".into()),
2075            ..SearchFilter::default()
2076        };
2077        let hits = store
2078            .search_chunks_fts("shared topic", &kind_filter, 10)
2079            .unwrap();
2080        assert_eq!(hits.len(), 1);
2081        assert!(hits[0].content.contains("beta"));
2082    }
2083
2084    #[test]
2085    fn filter_pushdown_respects_time_range() {
2086        let store = Store::open_in_memory().unwrap();
2087        // Manually crafted records at known timestamps.
2088        for (na, content, ts) in &[
2089            ("old", "shared topic", 1700000000_i64), // 2023-11
2090            ("mid", "shared topic", 1750000000_i64), // 2025-06
2091            ("new", "shared topic", 1800000000_i64), // 2027-01
2092        ] {
2093            let mut r = make_record("claude-code", na, content, Kind::Episode);
2094            r.created_at = Utc.timestamp_opt(*ts, 0).unwrap();
2095            let c = Chunker::default().chunk(&r.id, &r.content);
2096            store.upsert_record(&r, &c, None).unwrap();
2097        }
2098        let filter = SearchFilter {
2099            time_from: Some(1720000000),
2100            time_to: Some(1780000000),
2101            ..SearchFilter::default()
2102        };
2103        let hits = store
2104            .search_chunks_fts("shared topic", &filter, 10)
2105            .unwrap();
2106        assert_eq!(hits.len(), 1, "only the mid record falls in the window");
2107    }
2108
2109    #[test]
2110    fn active_model_setter_reads_back() {
2111        let store = Store::open_in_memory().unwrap();
2112        assert_eq!(store.active_model().unwrap(), None);
2113        store.set_active_model("local:e5:1").unwrap();
2114        assert_eq!(store.active_model().unwrap().as_deref(), Some("local:e5:1"));
2115        store.set_active_model("local:bge-m3:1").unwrap();
2116        assert_eq!(
2117            store.active_model().unwrap().as_deref(),
2118            Some("local:bge-m3:1")
2119        );
2120    }
2121
2122    #[test]
2123    fn upsert_enqueues_jobs_under_active_model() {
2124        let store = Store::open_in_memory().unwrap();
2125        store.set_active_model("local:e5:1").unwrap();
2126        let r = make_record("a", "x", "hello world", Kind::Fact);
2127        let c = Chunker::default().chunk(&r.id, &r.content);
2128        store.upsert_record(&r, &c, None).unwrap();
2129        let n: i64 = store
2130            .conn()
2131            .query_row(
2132                "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'pending' AND model_id = 'local:e5:1'",
2133                [],
2134                |row| row.get(0),
2135            )
2136            .unwrap();
2137        assert_eq!(n, c.len() as i64);
2138    }
2139
2140    #[test]
2141    fn no_active_model_means_no_jobs() {
2142        let store = Store::open_in_memory().unwrap();
2143        let r = make_record("a", "x", "hi", Kind::Fact);
2144        let c = Chunker::default().chunk(&r.id, &r.content);
2145        store.upsert_record(&r, &c, None).unwrap();
2146        let n: i64 = store
2147            .conn()
2148            .query_row("SELECT COUNT(1) FROM embedding_jobs", [], |row| row.get(0))
2149            .unwrap();
2150        assert_eq!(n, 0);
2151    }
2152
2153    #[test]
2154    fn claim_and_complete_job_cycle() {
2155        let store = Store::open_in_memory().unwrap();
2156        store.set_active_model("local:fake:1").unwrap();
2157        let r = make_record("a", "x", "alpha", Kind::Fact);
2158        let c = Chunker::default().chunk(&r.id, &r.content);
2159        store.upsert_record(&r, &c, None).unwrap();
2160
2161        let job = store.claim_next_job("local:fake:1").unwrap().unwrap();
2162        assert_eq!(job.content, "alpha");
2163        assert_eq!(job.model_id, "local:fake:1");
2164
2165        // Same call should now miss (claimed → in_progress).
2166        let none = store.claim_next_job("local:fake:1").unwrap();
2167        assert!(none.is_none());
2168
2169        store.complete_job(&job, &[0.5, 0.5, 0.5, 0.5]).unwrap();
2170
2171        let pending: i64 = store
2172            .conn()
2173            .query_row(
2174                "SELECT COUNT(1) FROM embedding_jobs WHERE status = 'pending'",
2175                [],
2176                |r| r.get(0),
2177            )
2178            .unwrap();
2179        assert_eq!(pending, 0);
2180
2181        // Vector search must now find this chunk.
2182        let hits = store
2183            .search_chunks_vec(
2184                &[0.5, 0.5, 0.5, 0.5],
2185                "local:fake:1",
2186                &SearchFilter::default(),
2187                5,
2188            )
2189            .unwrap();
2190        assert_eq!(hits.len(), 1);
2191        assert!((hits[0].score - 1.0).abs() < 1e-9);
2192    }
2193
2194    #[test]
2195    fn fail_job_marks_failed_and_unblocks_next() {
2196        let store = Store::open_in_memory().unwrap();
2197        store.set_active_model("local:fake:1").unwrap();
2198        let r1 = make_record("a", "x", "one", Kind::Fact);
2199        let r2 = make_record("a", "y", "two", Kind::Fact);
2200        let c1 = Chunker::default().chunk(&r1.id, &r1.content);
2201        let c2 = Chunker::default().chunk(&r2.id, &r2.content);
2202        store.upsert_record(&r1, &c1, None).unwrap();
2203        store.upsert_record(&r2, &c2, None).unwrap();
2204
2205        let j1 = store.claim_next_job("local:fake:1").unwrap().unwrap();
2206        store.fail_job(j1.job_id, "boom").unwrap();
2207
2208        let j2 = store.claim_next_job("local:fake:1").unwrap().unwrap();
2209        assert_ne!(j2.chunk_id, j1.chunk_id);
2210    }
2211
2212    #[test]
2213    fn rebuild_jobs_targets_a_new_model() {
2214        let store = Store::open_in_memory().unwrap();
2215        store.set_active_model("local:a:1").unwrap();
2216        let r = make_record("a", "x", "hi", Kind::Fact);
2217        let c = Chunker::default().chunk(&r.id, &r.content);
2218        store.upsert_record(&r, &c, None).unwrap();
2219
2220        let n = store.rebuild_embedding_jobs("local:b:1").unwrap();
2221        assert_eq!(n, c.len() as u64);
2222
2223        let by_model: Vec<(String, i64)> = {
2224            let conn = store.conn();
2225            let mut stmt = conn
2226                .prepare(
2227                    "SELECT model_id, COUNT(1) FROM embedding_jobs GROUP BY model_id ORDER BY model_id",
2228                )
2229                .unwrap();
2230            stmt.query_map([], |r| Ok((r.get::<_, String>(0)?, r.get::<_, i64>(1)?)))
2231                .unwrap()
2232                .collect::<rusqlite::Result<_>>()
2233                .unwrap()
2234        };
2235        assert_eq!(
2236            by_model,
2237            vec![
2238                ("local:a:1".into(), c.len() as i64),
2239                ("local:b:1".into(), c.len() as i64),
2240            ]
2241        );
2242    }
2243
2244    #[test]
2245    fn stats_reports_counts() {
2246        let store = Store::open_in_memory().unwrap();
2247        store.set_active_model("local:fake:1").unwrap();
2248        store
2249            .register_source("claude-code", None, None, None)
2250            .unwrap();
2251        let r = make_record("a", "x", "hello", Kind::Fact);
2252        let c = Chunker::default().chunk(&r.id, &r.content);
2253        store.upsert_record(&r, &c, None).unwrap();
2254        let s = store.stats().unwrap();
2255        assert_eq!(s.records, 1);
2256        assert_eq!(s.chunks, c.len() as u64);
2257        assert_eq!(s.jobs_pending, c.len() as u64);
2258        assert_eq!(s.jobs_failed, 0);
2259        assert_eq!(s.sources, 1);
2260    }
2261
2262    #[test]
2263    fn import_error_logged_and_visible() {
2264        let store = Store::open_in_memory().unwrap();
2265        store
2266            .log_import_error("a", None, Some("nid"), Some("/p"), "parse", "bad json")
2267            .unwrap();
2268        let count: i64 = store
2269            .conn()
2270            .query_row("SELECT COUNT(1) FROM import_errors", [], |r| r.get(0))
2271            .unwrap();
2272        assert_eq!(count, 1);
2273    }
2274
2275    #[test]
2276    fn source_vector_is_persisted_to_raw_artifacts() {
2277        let store = Store::open_in_memory().unwrap();
2278        let mut r = make_record("mem0", "x", "hi", Kind::Fact);
2279        r.embedding = Some(Embedding {
2280            vector: vec![0.1, 0.2, 0.3],
2281            model: "openai:text-embedding-3-small".into(),
2282            dim: 3,
2283        });
2284        let c = Chunker::default().chunk(&r.id, &r.content);
2285        store.upsert_record(&r, &c, None).unwrap();
2286        let (blob, model, dim): (Vec<u8>, String, i64) = store
2287            .conn()
2288            .query_row(
2289                "SELECT source_embedding, source_embedding_model, source_embedding_dim \
2290                 FROM raw_artifacts WHERE record_id = ?1",
2291                params![r.id.0],
2292                |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
2293            )
2294            .unwrap();
2295        assert_eq!(model, "openai:text-embedding-3-small");
2296        assert_eq!(dim, 3);
2297        assert_eq!(blob_to_f32(&blob).unwrap(), vec![0.1, 0.2, 0.3]);
2298    }
2299
2300    // ─────────────────────────────────────────────────────────────────────
2301    // Round-21 (§-1.5 PR-2): list_record_ids_paged cursor contract.
2302    // ─────────────────────────────────────────────────────────────────────
2303
2304    fn seed_n_records(store: &Store, n: usize) {
2305        for i in 0..n {
2306            let r = make_record(
2307                "claude-code",
2308                &format!("seed-{i:04}"),
2309                &format!("content {i}"),
2310                Kind::Fact,
2311            );
2312            let c = Chunker::default().chunk(&r.id, &r.content);
2313            store.upsert_record(&r, &c, None).unwrap();
2314        }
2315    }
2316
2317    #[test]
2318    fn paged_listing_walks_through_full_catalogue_via_cursor() {
2319        let store = Store::open_in_memory().unwrap();
2320        seed_n_records(&store, 25);
2321
2322        let mut collected: Vec<String> = Vec::new();
2323        let mut cursor: Option<String> = None;
2324        for _ in 0..100 {
2325            // Outer cap; we expect to terminate in ≤ 3 iterations.
2326            let (page, next) = store.list_record_ids_paged(cursor.as_deref(), 10).unwrap();
2327            // Pages are non-empty until we exit.
2328            assert!(!page.is_empty(), "non-final page must have rows");
2329            collected.extend(page);
2330            if next.is_none() {
2331                break;
2332            }
2333            cursor = next;
2334        }
2335        assert_eq!(
2336            collected.len(),
2337            25,
2338            "pagination must yield every record exactly once"
2339        );
2340
2341        // Lexicographic ascending order is the documented contract.
2342        let mut sorted = collected.clone();
2343        sorted.sort();
2344        assert_eq!(collected, sorted);
2345
2346        // No duplicates.
2347        let unique: std::collections::HashSet<&String> = collected.iter().collect();
2348        assert_eq!(unique.len(), collected.len());
2349    }
2350
2351    #[test]
2352    fn paged_listing_signals_end_with_none_cursor() {
2353        // When the page returns fewer than `limit` rows, `next_cursor`
2354        // must be `None` — that's the "end of catalogue" signal.
2355        let store = Store::open_in_memory().unwrap();
2356        seed_n_records(&store, 3);
2357        let (page, next) = store.list_record_ids_paged(None, 10).unwrap();
2358        assert_eq!(page.len(), 3);
2359        assert!(next.is_none(), "page < limit must clear nextCursor");
2360    }
2361
2362    #[test]
2363    fn paged_listing_clamps_limit() {
2364        // limit=0 must clamp to 1; limit>MAX must clamp to MAX. The
2365        // store should never refuse a malformed limit — it should be
2366        // permissive at the edge and let the caller see useful data.
2367        let store = Store::open_in_memory().unwrap();
2368        seed_n_records(&store, 5);
2369        let (page, _) = store.list_record_ids_paged(None, 0).unwrap();
2370        assert_eq!(page.len(), 1, "limit=0 must clamp to 1");
2371        let (page, _) = store.list_record_ids_paged(None, u32::MAX).unwrap();
2372        assert!(page.len() <= MAX_LIST_LIMIT as usize);
2373        assert_eq!(page.len(), 5);
2374    }
2375
2376    #[test]
2377    fn derived_from_roundtrips_through_store() {
2378        // §-1.5 PR-6 regression: a record carrying `provenance.derived_from`
2379        // must survive upsert + get_record without losing the lineage link.
2380        // This is the only audit hook §-1.5 #6 promises.
2381        let store = Store::open_in_memory().unwrap();
2382        let parent = make_record("claude-code", "ep-1", "raw conversation", Kind::Episode);
2383        let parent_id = parent.id.clone();
2384        let chunks = Chunker::default().chunk(&parent.id, &parent.content);
2385        store.upsert_record(&parent, &chunks, None).unwrap();
2386
2387        let mut derived = make_record("extractor", "fact-1", "user lives in Paris", Kind::Fact);
2388        derived.provenance.derived_from = Some(parent_id.clone());
2389        let derived_chunks = Chunker::default().chunk(&derived.id, &derived.content);
2390        let derived_id = derived.id.clone();
2391        store
2392            .upsert_record(&derived, &derived_chunks, None)
2393            .unwrap();
2394
2395        let got_parent = store.get_record(&parent_id).unwrap().unwrap();
2396        assert!(
2397            got_parent.provenance.derived_from.is_none(),
2398            "non-derived records keep derived_from = None on the way back"
2399        );
2400
2401        let got_derived = store.get_record(&derived_id).unwrap().unwrap();
2402        assert_eq!(
2403            got_derived.provenance.derived_from.as_ref().map(|r| &r.0),
2404            Some(&parent_id.0),
2405            "derived record's lineage must point at the source Episode after round-trip"
2406        );
2407    }
2408
2409    #[test]
2410    fn list_derivations_returns_only_direct_children() {
2411        let store = Store::open_in_memory().unwrap();
2412        let parent = make_record("claude-code", "ep-1", "raw conversation", Kind::Episode);
2413        let pid = parent.id.clone();
2414        let pc = Chunker::default().chunk(&parent.id, &parent.content);
2415        store.upsert_record(&parent, &pc, None).unwrap();
2416
2417        let mut child_a = make_record("extractor", "fact-a", "user lives in Paris", Kind::Fact);
2418        child_a.provenance.derived_from = Some(pid.clone());
2419        let c_a = Chunker::default().chunk(&child_a.id, &child_a.content);
2420        store.upsert_record(&child_a, &c_a, None).unwrap();
2421
2422        let mut child_b = make_record("extractor", "pref-a", "prefers Rust", Kind::Preference);
2423        child_b.provenance.derived_from = Some(pid.clone());
2424        let c_b = Chunker::default().chunk(&child_b.id, &child_b.content);
2425        store.upsert_record(&child_b, &c_b, None).unwrap();
2426
2427        // Sibling that is NOT derived from parent — must not appear.
2428        let unrelated = make_record("claude-code", "ep-2", "different episode", Kind::Episode);
2429        let cu = Chunker::default().chunk(&unrelated.id, &unrelated.content);
2430        store.upsert_record(&unrelated, &cu, None).unwrap();
2431
2432        let children = store.list_derivations(&pid, 50).unwrap();
2433        assert_eq!(children.len(), 2);
2434        let kinds: std::collections::HashSet<_> = children.iter().map(|r| r.kind).collect();
2435        assert!(kinds.contains(&Kind::Fact));
2436        assert!(kinds.contains(&Kind::Preference));
2437    }
2438
2439    #[test]
2440    fn lineage_chain_walks_to_root() {
2441        let store = Store::open_in_memory().unwrap();
2442        // Episode (root) → Fact (mid) → Skill (leaf).
2443        let root = make_record("claude-code", "ep-1", "raw conv", Kind::Episode);
2444        let root_id = root.id.clone();
2445        let rc = Chunker::default().chunk(&root.id, &root.content);
2446        store.upsert_record(&root, &rc, None).unwrap();
2447
2448        let mut mid = make_record("extractor", "fact-a", "Paris is capital", Kind::Fact);
2449        mid.provenance.derived_from = Some(root_id.clone());
2450        let mid_id = mid.id.clone();
2451        let mc = Chunker::default().chunk(&mid.id, &mid.content);
2452        store.upsert_record(&mid, &mc, None).unwrap();
2453
2454        let mut leaf = make_record("extractor", "skill-a", "how to check capital", Kind::Skill);
2455        leaf.provenance.derived_from = Some(mid_id.clone());
2456        let leaf_id = leaf.id.clone();
2457        let lc = Chunker::default().chunk(&leaf.id, &leaf.content);
2458        store.upsert_record(&leaf, &lc, None).unwrap();
2459
2460        let chain = store.lineage_chain(&leaf_id).unwrap().unwrap();
2461        assert_eq!(chain.records.len(), 3);
2462        assert_eq!(chain.records[0].id.0, leaf_id.0);
2463        assert_eq!(chain.records[1].id.0, mid_id.0);
2464        assert_eq!(chain.records[2].id.0, root_id.0);
2465        assert!(chain.missing_parent.is_none());
2466    }
2467
2468    #[test]
2469    fn lineage_chain_missing_parent_is_signaled() {
2470        let store = Store::open_in_memory().unwrap();
2471        let phantom = RecordId("never-stored-record".into());
2472        let mut orphan = make_record("extractor", "orphan", "dangling fact", Kind::Fact);
2473        orphan.provenance.derived_from = Some(phantom.clone());
2474        let oid = orphan.id.clone();
2475        let oc = Chunker::default().chunk(&orphan.id, &orphan.content);
2476        store.upsert_record(&orphan, &oc, None).unwrap();
2477
2478        let chain = store.lineage_chain(&oid).unwrap().unwrap();
2479        assert_eq!(chain.records.len(), 1);
2480        assert_eq!(chain.records[0].id.0, oid.0);
2481        assert_eq!(chain.missing_parent.unwrap().0, phantom.0);
2482    }
2483
2484    #[test]
2485    fn lineage_chain_returns_none_for_unknown_start() {
2486        let store = Store::open_in_memory().unwrap();
2487        let chain = store
2488            .lineage_chain(&RecordId("does-not-exist".into()))
2489            .unwrap();
2490        assert!(chain.is_none());
2491    }
2492
2493    #[test]
2494    fn lineage_chain_detects_cycle_and_errors() {
2495        // Build A → B → A via direct DB writes. The high-level API
2496        // can't construct this (insertion order forbids it) but a
2497        // corrupted file or future bug could — make sure the walk
2498        // bails loudly instead of looping forever.
2499        let store = Store::open_in_memory().unwrap();
2500        let a = make_record("extractor", "a", "node a", Kind::Fact);
2501        let b = make_record("extractor", "b", "node b", Kind::Fact);
2502        let aid = a.id.clone();
2503        let bid = b.id.clone();
2504        let ac = Chunker::default().chunk(&a.id, &a.content);
2505        let bc = Chunker::default().chunk(&b.id, &b.content);
2506        store.upsert_record(&a, &ac, None).unwrap();
2507        store.upsert_record(&b, &bc, None).unwrap();
2508        // Hand-write the cycle.
2509        store
2510            .conn()
2511            .execute(
2512                "UPDATE records SET derived_from = ?1 WHERE id = ?2",
2513                params![bid.0, aid.0],
2514            )
2515            .unwrap();
2516        store
2517            .conn()
2518            .execute(
2519                "UPDATE records SET derived_from = ?1 WHERE id = ?2",
2520                params![aid.0, bid.0],
2521            )
2522            .unwrap();
2523        let err = store.lineage_chain(&aid).unwrap_err();
2524        match err {
2525            StoreError::Corruption(msg) => assert!(msg.contains("cycle")),
2526            other => panic!("expected Corruption, got {other:?}"),
2527        }
2528    }
2529
2530    #[test]
2531    fn derived_from_index_is_present_after_migration() {
2532        // The migration explicitly creates `idx_records_derived_from`. If
2533        // a future change drops it, the `anamnesis lineage` query path
2534        // would regress to a full table scan — fail loudly here so the
2535        // perf characteristic is part of the contract.
2536        let store = Store::open_in_memory().unwrap();
2537        let count: i64 = store
2538            .conn()
2539            .query_row(
2540                "SELECT COUNT(*) FROM sqlite_master \
2541                 WHERE type = 'index' AND name = 'idx_records_derived_from'",
2542                [],
2543                |r| r.get(0),
2544            )
2545            .unwrap();
2546        assert_eq!(
2547            count, 1,
2548            "derived_from index must exist after 0004 migration"
2549        );
2550    }
2551}