Skip to main content

semantic_memory/
lib.rs

1#![allow(deprecated)]
2
3//! # semantic-memory
4//!
5//! Local-first semantic memory backed by authoritative SQLite state and an optional recoverable
6//! HNSW sidecar.
7//!
8//! The crate stores facts, chunked documents, conversation messages, and searchable episodes in
9//! SQLite. Search combines BM25 (FTS5) and vector retrieval with Reciprocal Rank Fusion, and
10//! `search_explained()` returns the exact scoring breakdown from the live pipeline.
11//!
12//! Concurrency uses one writer connection plus a pool of WAL-enabled reader connections.
13//! Durable writes are committed to SQLite first; any required HNSW sidecar mutations are journaled
14//! in SQLite and replayed on open, flush, rebuild, or reconcile.
15//!
16//! `search()` targets facts, document chunks, and episodes by default. Message retrieval is
17//! available through `search_conversations()` or by opting into
18//! [`SearchSourceType::Messages`](crate::SearchSourceType::Messages).
19//!
20//! Integrity tooling is strict about malformed stored data: invalid roles, JSON, enums, embedding
21//! blobs, quantized blobs, and sidecar drift are surfaced through `verify_integrity()` instead of
22//! being silently converted into defaults. `reconcile()` can rebuild FTS or fully re-embed and
23//! rebuild derived state from SQLite.
24//!
25//! `store.graph_view()` exposes a deterministic graph traversal layer over namespaces, facts,
26//! documents, chunks, sessions, messages, episodes, and semantic/temporal/causal links derived
27//! from SQLite state.
28//!
29//! ## Quick Start
30//!
31//! ```rust,no_run
32//! use semantic_memory::{MemoryConfig, MemoryStore};
33//!
34//! # async fn example() -> Result<(), semantic_memory::MemoryError> {
35//! let store = MemoryStore::open(MemoryConfig::default())?;
36//!
37//! // Store a fact
38//! store.add_fact("general", "Rust was first released in 2015", None, None).await?;
39//!
40//! // Search
41//! let results = store.search("when was Rust released", None, None, None).await?;
42//! # Ok(())
43//! # }
44//! ```
45//!
46//! ## Operational Notes
47//!
48//! - SQLite is authoritative for all durable records and embeddings.
49//! - HNSW is an acceleration sidecar. Pending sidecar mutations are journaled in SQLite, so a
50//!   sidecar failure does not imply the SQLite write rolled back.
51//! - WAL mode plus pooled reader connections allows concurrent reads while writes serialize through
52//!   the writer connection.
53//! - `search_explained()` reflects the exact ranking math used by the active search pipeline,
54//!   including reranking from exact f32 cosine similarity when configured.
55
56// At least one search backend must be enabled.
57#[cfg(not(any(feature = "hnsw", feature = "brute-force", feature = "usearch-backend")))]
58compile_error!(
59    "At least one search backend feature must be enabled: 'hnsw', 'usearch-backend', or 'brute-force'"
60);
61
62pub mod chunker;
63pub mod config;
64pub(crate) mod conversation;
65pub mod db;
66pub(crate) mod documents;
67pub mod embedder;
68pub(crate) mod episodes;
69pub mod error;
70mod graph;
71#[cfg(feature = "hnsw")]
72pub mod hnsw;
73#[cfg(feature = "hnsw")]
74mod hnsw_backend;
75#[cfg(feature = "hnsw")]
76mod hnsw_ops;
77#[cfg(feature = "usearch-backend")]
78mod usearch_backend;
79pub mod vector_backend;
80mod json_compat_import;
81pub(crate) mod knowledge;
82mod pool;
83mod projection_batch;
84mod projection_derivation;
85/// Compatibility-only legacy import surface.
86///
87/// This module exists only for migration compatibility with pre-V11 import paths.
88#[deprecated(
89    since = "0.6.0",
90    note = "Legacy V10 import path is migration-only. Use `import_projection_batch()` with `ProjectionImportBatchV3` on the canonical lane."
91)]
92#[doc(hidden)]
93pub mod projection_import;
94mod projection_lane;
95mod projection_legacy_compat;
96pub(crate) mod projection_storage;
97pub mod quantize;
98pub mod quantize_governed;
99pub mod search;
100pub mod storage;
101mod store_support;
102pub mod tokenizer;
103pub mod types;
104pub mod vector_codec;
105
106// Re-export primary public types.
107pub use config::{
108    ChunkingConfig, DerivedVectorBackendPolicy, EmbeddingConfig, MemoryConfig, MemoryLimits,
109    PoolConfig, SearchConfig,
110};
111pub use db::{IntegrityReport, ReconcileAction, VerifyMode};
112pub use embedder::{Embedder, MockEmbedder, OllamaEmbedder};
113pub use error::MemoryError;
114#[cfg(feature = "hnsw")]
115pub use hnsw::{HnswConfig, HnswHit, HnswIndex};
116// Type aliases for the new VectorBackend trait. The Hnsw* names are kept
117// for source compatibility; new code should prefer the Vector* names.
118pub use vector_backend::{VectorBackend, VectorHit, VectorIndex, VectorIndexConfig};
119pub(crate) use projection_lane::projection_import_failure_id;
120pub use projection_lane::{
121    ProjectionImportFailureReceiptEntry, ProjectionImportLogEntry, ProjectionImportResult,
122};
123pub use quantize::{pack_quantized, unpack_quantized, QuantizedVector, Quantizer};
124pub use storage::StoragePaths;
125pub use tokenizer::{EstimateTokenCounter, TokenCounter};
126pub use types::{
127    ChunkManifestChunkMapping, ChunkManifestEntry, ChunkManifestIngestOptions,
128    ChunkManifestIngestResult, Document, EmbeddingDisplacement, EpisodeAsOfReceiptV1, EpisodeMeta,
129    EpisodeOutcome, ExactnessProfile, ExplainedResult, ExplainedResultAnswerV1,
130    ExplainedSearchResponse, Fact, GraphDirection, GraphEdge, GraphEdgeType, GraphView,
131    MemoryStats, Message, NamespaceDeleteReport, ProjectionClaimVersion, ProjectionEntityAlias,
132    ProjectionEpisode, ProjectionEvidenceRef, ProjectionQuery, ProjectionRelationVersion,
133    ReceiptMode, Role, ScoreBreakdown, SearchContext, SearchReceiptAnswersV1, SearchReplayReportV1,
134    SearchResponse, SearchResult, SearchSource, SearchSourceType, Session, TextChunk,
135    VectorArtifactBuildReceiptV1, VectorSearchReceiptV1, VerificationStatus,
136};
137#[cfg(feature = "turbo-quant-codec")]
138pub use vector_codec::TurboQuantCodec;
139pub use vector_codec::{
140    RawF32Codec, Sq8Codec, VectorArtifactV1, VectorCodec, VectorCodecProfileV1,
141};
142
143use std::sync::Arc;
144
145const MAX_TOP_K: usize = 1_000;
146#[cfg(feature = "hnsw")]
147const MAX_HNSW_CANDIDATES: usize = 10_000;
148
149pub(crate) use store_support::{
150    as_str_slice, build_episode_search_text, merge_trace_ctx, to_owned_string_vec,
151    verification_status_for_outcome,
152};
153
154#[cfg(feature = "hnsw")]
155fn verify_hnsw_key_level_integrity(
156    conn: &rusqlite::Connection,
157    dimensions: usize,
158    node_vectors: &std::collections::HashMap<usize, Vec<f32>>,
159    sidecar_files_exist: bool,
160) -> Result<Vec<String>, MemoryError> {
161    let mut issues = Vec::new();
162    let mut live_rows: std::collections::HashMap<String, Vec<f32>> =
163        std::collections::HashMap::new();
164
165    let mut live_stmt = conn.prepare(
166        "SELECT 'fact:' || id, embedding FROM facts WHERE embedding IS NOT NULL
167         UNION ALL
168         SELECT 'chunk:' || id, embedding FROM chunks WHERE embedding IS NOT NULL
169         UNION ALL
170         SELECT 'msg:' || id, embedding FROM messages WHERE embedding IS NOT NULL
171         UNION ALL
172         SELECT 'episode:' || episode_id, embedding FROM episodes WHERE embedding IS NOT NULL",
173    )?;
174    let live_iter = live_stmt.query_map([], |row| {
175        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
176    })?;
177    for row in live_iter {
178        let (key, blob) = row?;
179        match db::decode_f32_le(&blob, dimensions) {
180            Ok(vector) => {
181                live_rows.insert(key, vector);
182            }
183            Err(err) => issues.push(format!(
184                "HNSW live embedding row {key} has invalid vector: {err}"
185            )),
186        }
187    }
188
189    if !live_rows.is_empty() && !sidecar_files_exist {
190        issues.push(format!(
191            "HNSW sidecar files are missing while {} embedded rows exist in SQLite",
192            live_rows.len()
193        ));
194    }
195
196    let keymap_exists: bool = conn
197        .query_row(
198            "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='hnsw_keymap'",
199            [],
200            |row| row.get(0),
201        )
202        .unwrap_or(false);
203    if !keymap_exists {
204        if !live_rows.is_empty() {
205            issues.push("HNSW keymap table missing while embedded SQLite rows exist".to_string());
206        }
207        return Ok(issues);
208    }
209
210    let mut active_keymap: std::collections::HashMap<String, usize> =
211        std::collections::HashMap::new();
212    let mut keymap_stmt =
213        conn.prepare("SELECT node_id, item_key FROM hnsw_keymap WHERE deleted = 0")?;
214    let keymap_iter = keymap_stmt.query_map([], |row| {
215        Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
216    })?;
217    for row in keymap_iter {
218        let (node_id_raw, key) = row?;
219        let Some((domain, raw_id)) = key.split_once(':') else {
220            issues.push(format!("HNSW keymap entry has malformed key: {key}"));
221            continue;
222        };
223        if !matches!(domain, "fact" | "chunk" | "msg" | "episode") || raw_id.is_empty() {
224            issues.push(format!(
225                "HNSW keymap entry has unsupported key domain: {key}"
226            ));
227            continue;
228        }
229        if domain == "msg" && raw_id.parse::<i64>().is_err() {
230            issues.push(format!("HNSW message key has non-integer row id: {key}"));
231            continue;
232        }
233        let node_id = match usize::try_from(node_id_raw) {
234            Ok(node_id) => node_id,
235            Err(err) => {
236                issues.push(format!(
237                    "HNSW keymap node_id {node_id_raw} is invalid: {err}"
238                ));
239                continue;
240            }
241        };
242        active_keymap.insert(key, node_id);
243    }
244
245    for key in live_rows.keys() {
246        if !active_keymap.contains_key(key) {
247            issues.push(format!(
248                "HNSW keymap missing live embedded SQLite row: {key}"
249            ));
250        }
251    }
252
253    for (key, node_id) in &active_keymap {
254        let Some(live_vector) = live_rows.get(key) else {
255            issues.push(format!(
256                "HNSW keymap has stale active entry without live embedded SQLite row: {key}"
257            ));
258            continue;
259        };
260        let Some(index_vector) = node_vectors.get(node_id) else {
261            issues.push(format!(
262                "HNSW keymap entry {key} points to missing in-memory node vector {node_id}"
263            ));
264            continue;
265        };
266        if index_vector.len() != live_vector.len()
267            || index_vector
268                .iter()
269                .zip(live_vector)
270                .any(|(left, right)| left.to_bits() != right.to_bits())
271        {
272            issues.push(format!(
273                "HNSW keymap entry {key} points to node {node_id} whose vector does not match the authoritative SQLite embedding"
274            ));
275        }
276    }
277
278    if active_keymap.len() != live_rows.len() {
279        issues.push(format!(
280            "HNSW keymap drift: {} active keymap rows vs {} embedded SQLite rows",
281            active_keymap.len(),
282            live_rows.len()
283        ));
284    }
285
286    Ok(issues)
287}
288
289/// Compatibility-only public access to retained legacy surfaces.
290#[doc(hidden)]
291pub mod compat {
292    #[deprecated(
293        since = "0.5.0",
294        note = "Legacy ImportEnvelope is migration-only. New integrations should use `ProjectionImportBatchV3` on the canonical lane."
295    )]
296    #[doc(hidden)]
297    #[allow(deprecated)]
298    pub mod legacy_import_envelope {
299        pub use crate::projection_import::{
300            ImportEnvelope, ImportProjectionFreshness, ImportReceipt, ImportRecord, ImportStatus,
301        };
302        pub use stack_ids::EnvelopeId;
303    }
304
305    #[deprecated(
306        since = "0.5.0",
307        note = "Legacy trace_id is migration-only. Use `stack_ids::TraceCtx`."
308    )]
309    #[doc(hidden)]
310    #[allow(deprecated)]
311    pub mod compat_trace_id {
312        pub use crate::types::TraceId;
313    }
314}
315
316/// Thread-safe handle to the memory database.
317///
318/// Clone is cheap (Arc internals). `Send + Sync`.
319#[derive(Clone)]
320pub struct MemoryStore {
321    inner: Arc<MemoryStoreInner>,
322}
323
324struct MemoryStoreInner {
325    pool: pool::SqlitePool,
326    embedder: Box<dyn Embedder>,
327    embedding_permits: Arc<tokio::sync::Semaphore>,
328    config: MemoryConfig,
329    paths: StoragePaths,
330    token_counter: Arc<dyn TokenCounter>,
331    #[cfg(feature = "hnsw")]
332    hnsw_index: std::sync::RwLock<HnswIndex>,
333}
334
335#[cfg(feature = "hnsw")]
336impl Drop for MemoryStoreInner {
337    fn drop(&mut self) {
338        if !self.paths.hnsw_dir.exists() {
339            tracing::debug!(
340                path = %self.paths.hnsw_dir.display(),
341                "Skipping HNSW drop flush because the sidecar directory no longer exists"
342            );
343            return;
344        }
345
346        let pending_ops = match self.pool.with_read_conn(db::pending_index_op_count) {
347            Ok(count) => count,
348            Err(err) => {
349                tracing::warn!("Failed to inspect pending HNSW work on drop: {}", err);
350                0
351            }
352        };
353
354        if pending_ops > 0 {
355            if let Err(err) =
356                hnsw_ops::recover_hnsw_sidecar_sync(&self.pool, &self.paths, &self.config.hnsw)
357            {
358                tracing::error!("Failed to recover and flush HNSW on drop: {}", err);
359            }
360            return;
361        }
362
363        let hnsw_guard = match self.hnsw_index.read() {
364            Ok(g) => g,
365            Err(_) => {
366                tracing::warn!("HNSW RwLock poisoned on drop — skipping save");
367                return;
368            }
369        };
370
371        if let Err(err) = hnsw_ops::save_hnsw_sidecar(
372            &hnsw_guard,
373            &self.paths.hnsw_dir,
374            &self.paths.hnsw_basename,
375        ) {
376            tracing::error!("Failed to save HNSW index on drop: {}", err);
377        }
378
379        // Flush key mappings to SQLite
380        if let Err(e) = self
381            .pool
382            .with_write_conn(|conn| hnsw_guard.flush_keymap(conn))
383        {
384            tracing::error!("Failed to flush HNSW keymap on drop: {}", e);
385        }
386    }
387}
388
389impl MemoryStore {
390    /// Run read-only work on a pooled reader connection on a blocking thread.
391    ///
392    /// This prevents SQLite I/O from stalling the tokio executor while allowing
393    /// multiple concurrent readers under WAL mode.
394    async fn with_read_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
395    where
396        F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
397        T: Send + 'static,
398    {
399        let inner = self.inner.clone();
400        tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
401            inner.pool.with_read_conn(f)
402        })
403        .await
404        .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
405    }
406
407    /// Run write-capable work on the single writer connection on a blocking thread.
408    async fn with_write_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
409    where
410        F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
411        T: Send + 'static,
412    {
413        let inner = self.inner.clone();
414        tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
415            inner.pool.with_write_conn(f)
416        })
417        .await
418        .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
419    }
420
421    async fn persist_search_receipt(
422        &self,
423        receipt: &VectorSearchReceiptV1,
424    ) -> Result<(), MemoryError> {
425        let receipt = receipt.clone();
426        self.with_write_conn(move |conn| db::store_search_receipt(conn, &receipt))
427            .await
428    }
429
430    /// Run HNSW search on a blocking thread to avoid holding std::sync::RwLock
431    /// across await points (CONC-001).
432    #[cfg(feature = "hnsw")]
433    async fn hnsw_search_blocking(
434        &self,
435        query_embedding: Vec<f32>,
436        candidates: usize,
437    ) -> Vec<HnswHit> {
438        let inner = self.inner.clone();
439        tokio::task::spawn_blocking(move || {
440            let guard = inner.hnsw_index.read().unwrap_or_else(|e| e.into_inner());
441            match guard.search(&query_embedding, candidates) {
442                Ok(hits) => hits,
443                Err(e) => {
444                    tracing::error!(
445                        "HNSW search failed, falling back to brute-force vector search: {}",
446                        e
447                    );
448                    Vec::new()
449                }
450            }
451        })
452        .await
453        .unwrap_or_else(|e| {
454            tracing::error!("HNSW search blocking task panicked: {}", e);
455            Vec::new()
456        })
457    }
458
459    #[cfg(feature = "hnsw")]
460    fn sync_pending_hnsw_ops_blocking(&self) -> Result<usize, MemoryError> {
461        hnsw_ops::sync_pending_hnsw_sidecar(&self.inner)
462    }
463
464    #[cfg(feature = "hnsw")]
465    async fn sync_pending_hnsw_ops(&self) -> Result<usize, MemoryError> {
466        let inner = self.inner.clone();
467        tokio::task::spawn_blocking(move || hnsw_ops::sync_pending_hnsw_sidecar(&inner))
468            .await
469            .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
470    }
471
472    #[cfg(feature = "hnsw")]
473    async fn sync_pending_hnsw_ops_best_effort(&self, operation: &'static str) {
474        if let Err(err) = self.sync_pending_hnsw_ops().await {
475            tracing::warn!(
476                operation,
477                error = %err,
478                "SQLite write committed but HNSW sidecar sync is still pending"
479            );
480        } else {
481            self.maybe_flush_hnsw();
482        }
483    }
484
485    /// Open or create a memory store at the configured base directory.
486    ///
487    /// Creates the directory if it doesn't exist, opens/creates SQLite,
488    /// runs migrations, and initializes the HNSW index.
489    pub fn open(config: MemoryConfig) -> Result<Self, MemoryError> {
490        let config = config.normalize_and_validate()?;
491        let embedder = Box::new(OllamaEmbedder::try_new(&config.embedding)?);
492        Self::open_with_embedder(config, embedder)
493    }
494
495    /// Open with a custom embedder (for testing or non-Ollama providers).
496    #[allow(unused_mut)] // `config` is mutated only when the `hnsw` feature is enabled
497    pub fn open_with_embedder(
498        mut config: MemoryConfig,
499        embedder: Box<dyn Embedder>,
500    ) -> Result<Self, MemoryError> {
501        config = config.normalize_and_validate()?;
502        if embedder.dimensions() != config.embedding.dimensions {
503            return Err(MemoryError::DimensionMismatch {
504                expected: config.embedding.dimensions,
505                actual: embedder.dimensions(),
506            });
507        }
508        config.embedding.model = embedder.model_name().to_string();
509
510        let paths = StoragePaths::new(&config.base_dir);
511
512        // Create directory if needed
513        std::fs::create_dir_all(&paths.base_dir).map_err(|e| {
514            MemoryError::StorageError(format!(
515                "Failed to create directory {}: {}",
516                paths.base_dir.display(),
517                e
518            ))
519        })?;
520
521        let pool = pool::SqlitePool::open(&paths.sqlite_path, &config.pool, &config.limits)?;
522        pool.with_write_conn(|conn| db::check_embedding_metadata(conn, &config.embedding))?;
523
524        // Ensure HNSW dimensions match the embedding config
525        #[cfg(feature = "hnsw")]
526        {
527            config.hnsw.dimensions = config.embedding.dimensions;
528        }
529
530        let token_counter = config
531            .token_counter
532            .clone()
533            .unwrap_or_else(tokenizer::default_token_counter);
534
535        #[cfg(feature = "hnsw")]
536        let hnsw_index = {
537            let hnsw_config = config.hnsw.clone();
538
539            let embeddings_dirty = pool.with_read_conn(db::is_embeddings_dirty)?;
540            let pending_index_ops = pool.with_read_conn(db::pending_index_op_count)?;
541
542            if embeddings_dirty {
543                // Embedding model changed — old HNSW index is useless.
544                // Create a fresh index; reembed_all() will rebuild it.
545                tracing::warn!(
546                    "Embedding model changed — creating fresh HNSW index (old index is stale)"
547                );
548                pool.with_write_conn(|conn| {
549                    db::clear_all_pending_index_ops(conn)?;
550                    db::set_sidecar_dirty(conn, false)?;
551                    Ok(())
552                })?;
553                HnswIndex::new(hnsw_config)?
554            } else if pending_index_ops > 0 || pool.with_read_conn(db::is_sidecar_dirty)? {
555                tracing::warn!(
556                    pending_index_ops,
557                    "Recovering HNSW sidecar from SQLite because durable sidecar work exists"
558                );
559                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
560            } else if paths.hnsw_files_exist() {
561                tracing::info!("Loading HNSW index from {:?}", paths.hnsw_dir);
562                match HnswIndex::load(&paths.hnsw_dir, &paths.hnsw_basename, hnsw_config.clone()) {
563                    Ok(index) => {
564                        // Load key mappings from SQLite
565                        if let Err(e) = pool.with_write_conn(|conn| index.load_keymap(conn)) {
566                            tracing::warn!("Failed to load HNSW key mappings: {}. Mappings will be empty until rebuild.", e);
567                        }
568
569                        // Stale index detection: compare HNSW entry count vs SQLite
570                        // embedding count. A mismatch means the app crashed before
571                        // flushing HNSW, or keys were lost.
572                        let hnsw_count = index.len();
573                        let sqlite_count: i64 = pool.with_read_conn(|conn| {
574                            Ok(conn.query_row(
575                                    "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
576                                        (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
577                                        (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
578                                        (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
579                                    [],
580                                    |row| row.get(0),
581                                )?)
582                        })?;
583
584                        let drift = (sqlite_count - hnsw_count as i64).abs();
585                        if drift > 0 {
586                            tracing::warn!(
587                                hnsw_count,
588                                sqlite_count,
589                                drift,
590                                "HNSW index is stale — {} entries differ from SQLite. \
591                                 Likely caused by unclean shutdown. Triggering inline rebuild.",
592                                drift
593                            );
594                            // Discard the stale index and rebuild from SQLite
595                            let rebuilt =
596                                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
597                            tracing::info!(
598                                active = rebuilt.len(),
599                                "HNSW index rebuilt after stale detection"
600                            );
601                            rebuilt
602                        } else {
603                            tracing::info!(
604                                "HNSW index loaded ({} active keys, in sync with SQLite)",
605                                hnsw_count
606                            );
607                            index
608                        }
609                    }
610                    Err(e) => {
611                        tracing::warn!(
612                            "Failed to load HNSW index: {}. Rebuilding sidecar from authoritative SQLite rows.",
613                            e
614                        );
615                        hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
616                    }
617                }
618            } else {
619                // Check if SQLite has embeddings that should be in the index.
620                // This happens when: sidecar files were deleted, data dir was
621                // partially copied, app crashed before first flush, or HNSW was
622                // added after data already existed.
623                let orphan_count: i64 = pool.with_read_conn(|conn| {
624                    Ok(conn.query_row(
625                        "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
626                                (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
627                                (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
628                                (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
629                        [],
630                        |row| row.get(0),
631                    )?)
632                })?;
633
634                if orphan_count > 0 {
635                    tracing::warn!(
636                        orphan_count,
637                        "HNSW sidecar files missing but {} embeddings exist in SQLite — \
638                         rebuilding index inline",
639                        orphan_count
640                    );
641                    let new_index =
642                        hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
643                    tracing::info!(
644                        active = new_index.len(),
645                        "HNSW index rebuilt from SQLite embeddings"
646                    );
647                    new_index
648                } else {
649                    tracing::info!("Creating new empty HNSW index (no embeddings in SQLite)");
650                    HnswIndex::new(hnsw_config)?
651                }
652            }
653        };
654
655        let store = Self {
656            inner: Arc::new(MemoryStoreInner {
657                pool,
658                embedder,
659                embedding_permits: Arc::new(tokio::sync::Semaphore::new(
660                    config.limits.max_embedding_concurrency,
661                )),
662                config,
663                paths,
664                token_counter,
665                #[cfg(feature = "hnsw")]
666                hnsw_index: std::sync::RwLock::new(hnsw_index),
667            }),
668        };
669
670        #[cfg(feature = "hnsw")]
671        if let Err(err) = store.sync_pending_hnsw_ops_blocking() {
672            tracing::warn!(
673                error = %err,
674                "Failed to reconcile pending HNSW sidecar ops during open; sidecar replay remains pending"
675            );
676        }
677
678        Ok(store)
679    }
680
681    async fn with_embedding_permit(
682        &self,
683    ) -> Result<tokio::sync::OwnedSemaphorePermit, MemoryError> {
684        self.inner
685            .embedding_permits
686            .clone()
687            .acquire_owned()
688            .await
689            .map_err(|e| MemoryError::Other(format!("embedding semaphore closed: {e}")))
690    }
691
692    async fn embed_text_internal(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
693        let _permit = self.with_embedding_permit().await?;
694        let embedding = self.inner.embedder.embed(text).await?;
695        db::validate_embedding(&embedding, self.inner.config.embedding.dimensions)?;
696        Ok(embedding)
697    }
698
699    async fn embed_batch_internal(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, MemoryError> {
700        let requested = texts.len();
701        let _permit = self.with_embedding_permit().await?;
702        let embeddings = self.inner.embedder.embed_batch(texts).await?;
703        db::validate_embedding_batch(
704            &embeddings,
705            requested,
706            self.inner.config.embedding.dimensions,
707        )?;
708        Ok(embeddings)
709    }
710
711    fn validate_embedding_dimensions(&self, embedding: &[f32]) -> Result<(), MemoryError> {
712        db::validate_embedding(embedding, self.inner.config.embedding.dimensions)
713    }
714
715    fn validate_content(&self, field: &'static str, content: &str) -> Result<(), MemoryError> {
716        if content.is_empty() {
717            return Err(MemoryError::InvalidConfig {
718                field,
719                reason: "content must not be empty".to_string(),
720            });
721        }
722
723        let limit = self.inner.config.limits.max_content_bytes;
724        if content.len() > limit {
725            return Err(MemoryError::ContentTooLarge {
726                size: content.len(),
727                limit,
728            });
729        }
730
731        Ok(())
732    }
733
734    fn validate_confidence(confidence: f32) -> Result<(), MemoryError> {
735        if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
736            return Err(MemoryError::InvalidConfig {
737                field: "episodes.confidence",
738                reason: "confidence must be finite and within [0.0, 1.0]".to_string(),
739            });
740        }
741        Ok(())
742    }
743
744    // ─── HNSW Management ───────────────────────────────────────
745
746    /// Rebuild feature-gated TurboQuant artifacts from authoritative SQLite f32 embeddings.
747    #[cfg(feature = "turbo-quant-codec")]
748    pub async fn rebuild_vector_artifacts(
749        &self,
750    ) -> Result<VectorArtifactBuildReceiptV1, MemoryError> {
751        let dim = self.inner.config.embedding.dimensions;
752        let search = self.inner.config.search.clone();
753        self.with_write_conn(move |conn| {
754            db::rebuild_turbo_quant_artifacts(
755                conn,
756                dim,
757                search.turbo_quant_bits,
758                search.turbo_quant_projections,
759                search.turbo_quant_seed,
760            )
761        })
762        .await
763    }
764
765    /// Rebuild the HNSW index from SQLite f32 embeddings.
766    ///
767    /// Call this if sidecar files are missing, corrupted, or after `reembed_all()`.
768    #[cfg(feature = "hnsw")]
769    pub async fn rebuild_hnsw_index(
770        &self,
771    ) -> Result<crate::types::VectorArtifactBuildReceiptV1, MemoryError> {
772        tracing::info!("Rebuilding HNSW index from SQLite embeddings...");
773        let hnsw_config = self.inner.config.hnsw.clone();
774        let (new_index, build_receipt) = self
775            .with_read_conn(move |conn| hnsw_ops::rebuild_hnsw_from_sqlite(conn, &hnsw_config))
776            .await?;
777
778        {
779            let mut guard = self
780                .inner
781                .hnsw_index
782                .write()
783                .unwrap_or_else(|e| e.into_inner());
784            *guard = new_index.clone();
785        }
786
787        hnsw_ops::save_hnsw_sidecar(
788            &new_index,
789            &self.inner.paths.hnsw_dir,
790            &self.inner.paths.hnsw_basename,
791        )?;
792        self.inner.pool.with_write_conn(|conn| {
793            new_index.flush_keymap(conn)?;
794            db::clear_all_pending_index_ops(conn)?;
795            db::set_sidecar_dirty(conn, false)?;
796            Ok(())
797        })?;
798
799        tracing::info!(active = new_index.len(), receipt_generation_id = ?build_receipt.generation_id, "HNSW index rebuilt");
800
801        Ok(build_receipt)
802    }
803
804    /// Opportunistically flush HNSW if the configured interval has elapsed.
805    ///
806    /// Cheap no-op when `flush_interval_secs` is None or the interval hasn't
807    /// elapsed yet (just an atomic load + epoch comparison).
808    #[cfg(feature = "hnsw")]
809    fn maybe_flush_hnsw(&self) {
810        if let Some(interval) = self.inner.config.hnsw.flush_interval_secs {
811            let guard = self
812                .inner
813                .hnsw_index
814                .read()
815                .unwrap_or_else(|e| e.into_inner());
816            if guard.should_flush(interval) {
817                drop(guard); // release read lock before flushing
818                if let Err(e) = self.flush_hnsw() {
819                    tracing::warn!("Opportunistic HNSW flush failed: {}", e);
820                } else {
821                    let guard = self
822                        .inner
823                        .hnsw_index
824                        .read()
825                        .unwrap_or_else(|e| e.into_inner());
826                    guard.update_last_flush_epoch();
827                    tracing::info!("Opportunistic HNSW flush completed");
828                }
829            }
830        }
831    }
832
833    /// Persist the HNSW graph, vector data, and key mappings to disk.
834    ///
835    /// Called automatically on drop, but can be called explicitly for durability.
836    #[cfg(feature = "hnsw")]
837    pub fn flush_hnsw(&self) -> Result<(), MemoryError> {
838        let pending_ops = self.inner.pool.with_read_conn(db::pending_index_op_count)?;
839        if pending_ops > 0 {
840            tracing::info!(
841                pending_ops,
842                "Flushing HNSW via authoritative SQLite rebuild because pending durable sidecar work exists"
843            );
844            let rebuilt = hnsw_ops::recover_hnsw_sidecar_sync(
845                &self.inner.pool,
846                &self.inner.paths,
847                &self.inner.config.hnsw,
848            )?;
849            let mut guard = self
850                .inner
851                .hnsw_index
852                .write()
853                .unwrap_or_else(|e| e.into_inner());
854            *guard = rebuilt;
855            return Ok(());
856        }
857
858        let index = self
859            .inner
860            .hnsw_index
861            .write()
862            .unwrap_or_else(|e| e.into_inner());
863        hnsw_ops::save_hnsw_sidecar(
864            &index,
865            &self.inner.paths.hnsw_dir,
866            &self.inner.paths.hnsw_basename,
867        )?;
868
869        // Flush key mappings to SQLite
870        self.inner.pool.with_write_conn(|conn| {
871            index.flush_keymap(conn)?;
872            db::clear_all_pending_index_ops(conn)?;
873            db::set_sidecar_dirty(conn, false)?;
874            Ok(())
875        })?;
876        Ok(())
877    }
878
879    /// Compact the HNSW index by rebuilding without tombstones.
880    ///
881    /// Only rebuilds if the deleted ratio exceeds the compaction threshold.
882    #[cfg(feature = "hnsw")]
883    pub async fn compact_hnsw(&self) -> Result<(), MemoryError> {
884        if !self
885            .inner
886            .hnsw_index
887            .read()
888            .unwrap_or_else(|e| e.into_inner())
889            .needs_compaction()
890        {
891            tracing::info!("HNSW compaction not needed (deleted ratio below threshold)");
892            return Ok(());
893        }
894        let _receipt = self.rebuild_hnsw_index().await?;
895        Ok(())
896    }
897
898    // ─── Integrity & Diagnostics ────────────────────────────────
899
900    /// Verify database integrity.
901    ///
902    /// In `Quick` mode, checks table existence and row counts.
903    /// In `Full` mode, also verifies FTS consistency and runs SQLite integrity_check.
904    pub async fn verify_integrity(
905        &self,
906        mode: db::VerifyMode,
907    ) -> Result<db::IntegrityReport, MemoryError> {
908        let use_writer = mode == db::VerifyMode::Full;
909        let mut report = if use_writer {
910            self.with_write_conn(move |conn| db::verify_integrity_sync(conn, mode))
911                .await?
912        } else {
913            self.with_read_conn(move |conn| db::verify_integrity_sync(conn, mode))
914                .await?
915        };
916
917        #[cfg(feature = "hnsw")]
918        {
919            let hnsw_vectors = self
920                .inner
921                .hnsw_index
922                .read()
923                .unwrap_or_else(|e| e.into_inner())
924                .vector_snapshot();
925            let hnsw_dims = self.inner.config.embedding.dimensions;
926            let hnsw_files_exist = self.inner.paths.hnsw_files_exist();
927
928            let hnsw_issues = if use_writer {
929                let hnsw_vectors = hnsw_vectors.clone();
930                self.with_write_conn(move |conn| {
931                    verify_hnsw_key_level_integrity(
932                        conn,
933                        hnsw_dims,
934                        &hnsw_vectors,
935                        hnsw_files_exist,
936                    )
937                })
938                .await?
939            } else {
940                let hnsw_vectors = hnsw_vectors.clone();
941                self.with_read_conn(move |conn| {
942                    verify_hnsw_key_level_integrity(
943                        conn,
944                        hnsw_dims,
945                        &hnsw_vectors,
946                        hnsw_files_exist,
947                    )
948                })
949                .await?
950            };
951            report.issues.extend(hnsw_issues);
952        }
953
954        report.ok = report.issues.is_empty();
955        Ok(report)
956    }
957
958    /// Reconcile detected integrity issues.
959    ///
960    /// - `ReportOnly`: no-op, just returns the integrity report.
961    /// - `RebuildFts`: rebuilds all FTS indexes from source data.
962    /// - `ReEmbed`: not yet implemented (requires async embedding calls).
963    pub async fn reconcile(
964        &self,
965        action: db::ReconcileAction,
966    ) -> Result<db::IntegrityReport, MemoryError> {
967        match action {
968            db::ReconcileAction::ReportOnly => self.verify_integrity(db::VerifyMode::Full).await,
969            db::ReconcileAction::RebuildFts => {
970                self.with_write_conn(db::reconcile_fts).await?;
971                #[cfg(feature = "hnsw")]
972                self.sync_pending_hnsw_ops_best_effort("reconcile_rebuild_fts")
973                    .await;
974                self.verify_integrity(db::VerifyMode::Full).await
975            }
976            db::ReconcileAction::ReEmbed => {
977                self.reembed_all().await?;
978                self.verify_integrity(db::VerifyMode::Full).await
979            }
980        }
981    }
982
983    /// Get the current configuration.
984    pub fn config(&self) -> &MemoryConfig {
985        &self.inner.config
986    }
987
988    /// View the store as a derived graph over documents, chunks, facts, sessions, messages,
989    /// episodes, namespaces, and semantic similarity edges.
990    pub fn graph_view(&self) -> Arc<dyn GraphView> {
991        graph::graph_view(self.inner.clone())
992    }
993
994    // ─── Search ─────────────────────────────────────────────────
995
996    /// Hybrid search across facts, document chunks, and searchable episodes.
997    pub async fn search(
998        &self,
999        query: &str,
1000        top_k: Option<usize>,
1001        namespaces: Option<&[&str]>,
1002        source_types: Option<&[SearchSourceType]>,
1003    ) -> Result<Vec<SearchResult>, MemoryError> {
1004        Ok(self
1005            .search_with_context(
1006                query,
1007                top_k,
1008                namespaces,
1009                source_types,
1010                SearchContext::default_now(),
1011            )
1012            .await?
1013            .results)
1014    }
1015
1016    /// Hybrid search with an explicit deterministic context and optional receipt.
1017    pub async fn search_with_context(
1018        &self,
1019        query: &str,
1020        top_k: Option<usize>,
1021        namespaces: Option<&[&str]>,
1022        source_types: Option<&[SearchSourceType]>,
1023        context: SearchContext,
1024    ) -> Result<SearchResponse, MemoryError> {
1025        let k = top_k
1026            .unwrap_or(self.inner.config.search.default_top_k)
1027            .min(MAX_TOP_K);
1028
1029        let query_embedding = self.embed_text_internal(query).await?;
1030
1031        #[cfg(feature = "hnsw")]
1032        let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact
1033            || self.inner.config.search.uses_turbo_quant_backend()
1034        {
1035            Vec::new()
1036        } else {
1037            let candidates = self
1038                .inner
1039                .config
1040                .search
1041                .candidate_pool_size
1042                .max(k.saturating_mul(3))
1043                .min(MAX_HNSW_CANDIDATES);
1044            self.hnsw_search_blocking(query_embedding.clone(), candidates)
1045                .await
1046        };
1047
1048        let q = query.to_string();
1049        let config = self.inner.config.search.clone();
1050        let ns_owned = to_owned_string_vec(namespaces);
1051        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1052        let context_owned = context.clone();
1053
1054        #[cfg(feature = "hnsw")]
1055        let hnsw_hits_owned = hnsw_hits;
1056
1057        let response = self
1058            .with_read_conn(move |conn| {
1059                if db::is_embeddings_dirty(conn)? {
1060                    tracing::warn!(
1061                        "Embeddings are stale after model change — search quality is degraded. \
1062                     Call reembed_all() to regenerate embeddings."
1063                    );
1064                }
1065                let ns_refs = as_str_slice(&ns_owned);
1066                let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1067                let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1068
1069                #[cfg(feature = "hnsw")]
1070                {
1071                    let mut execution = if hnsw_hits_owned.is_empty() {
1072                        search::hybrid_search_detailed_with_context(
1073                            conn,
1074                            &q,
1075                            &query_embedding,
1076                            &config,
1077                            &context_owned,
1078                            k,
1079                            ns_slice,
1080                            st_slice,
1081                            None,
1082                        )
1083                    } else {
1084                        search::hybrid_search_with_hnsw_detailed_with_context(
1085                            conn,
1086                            &q,
1087                            &query_embedding,
1088                            &config,
1089                            &context_owned,
1090                            k,
1091                            ns_slice,
1092                            st_slice,
1093                            None,
1094                            &hnsw_hits_owned,
1095                        )
1096                    }?;
1097                    if context_owned.receipts_enabled()
1098                        && context_owned.exactness_profile == ExactnessProfile::PreferExact
1099                    {
1100                        if let Some(receipt) = execution.receipt.as_mut() {
1101                            receipt.search_profile = "hybrid_prefer_exact".to_string();
1102                        }
1103                    }
1104                    Ok(SearchResponse {
1105                        results: execution
1106                            .results
1107                            .into_iter()
1108                            .map(|result| result.result)
1109                            .collect(),
1110                        receipt: execution.receipt,
1111                    })
1112                }
1113                #[cfg(not(feature = "hnsw"))]
1114                {
1115                    let execution = search::hybrid_search_detailed_with_context(
1116                        conn,
1117                        &q,
1118                        &query_embedding,
1119                        &config,
1120                        &context_owned,
1121                        k,
1122                        ns_slice,
1123                        st_slice,
1124                        None,
1125                    )?;
1126                    Ok(SearchResponse {
1127                        results: execution
1128                            .results
1129                            .into_iter()
1130                            .map(|result| result.result)
1131                            .collect(),
1132                        receipt: execution.receipt,
1133                    })
1134                }
1135            })
1136            .await?;
1137        if let Some(receipt) = &response.receipt {
1138            self.persist_search_receipt(receipt).await?;
1139        }
1140        Ok(response)
1141    }
1142
1143    /// Full-text search only (no embeddings needed).
1144    pub async fn search_fts_only(
1145        &self,
1146        query: &str,
1147        top_k: Option<usize>,
1148        namespaces: Option<&[&str]>,
1149        source_types: Option<&[SearchSourceType]>,
1150    ) -> Result<Vec<SearchResult>, MemoryError> {
1151        let k = top_k
1152            .unwrap_or(self.inner.config.search.default_top_k)
1153            .min(MAX_TOP_K);
1154        let q = query.to_string();
1155        let config = self.inner.config.search.clone();
1156        let ns_owned = to_owned_string_vec(namespaces);
1157        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1158        self.with_read_conn(move |conn| {
1159            let ns_refs = as_str_slice(&ns_owned);
1160            let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1161            let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1162            search::fts_only_search(conn, &q, &config, k, ns_slice, st_slice, None)
1163        })
1164        .await
1165    }
1166
1167    /// Vector similarity search only (no FTS).
1168    pub async fn search_vector_only(
1169        &self,
1170        query: &str,
1171        top_k: Option<usize>,
1172        namespaces: Option<&[&str]>,
1173        source_types: Option<&[SearchSourceType]>,
1174    ) -> Result<Vec<SearchResult>, MemoryError> {
1175        Ok(self
1176            .search_vector_only_with_context(
1177                query,
1178                top_k,
1179                namespaces,
1180                source_types,
1181                SearchContext::default_now(),
1182            )
1183            .await?
1184            .results)
1185    }
1186
1187    /// Vector similarity search with an explicit deterministic context and optional receipt.
1188    pub async fn search_vector_only_with_context(
1189        &self,
1190        query: &str,
1191        top_k: Option<usize>,
1192        namespaces: Option<&[&str]>,
1193        source_types: Option<&[SearchSourceType]>,
1194        context: SearchContext,
1195    ) -> Result<SearchResponse, MemoryError> {
1196        let k = top_k
1197            .unwrap_or(self.inner.config.search.default_top_k)
1198            .min(MAX_TOP_K);
1199        let query_embedding = self.embed_text_internal(query).await?;
1200
1201        #[cfg(feature = "hnsw")]
1202        let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact
1203            || self.inner.config.search.uses_turbo_quant_backend()
1204        {
1205            Vec::new()
1206        } else {
1207            let candidates = self
1208                .inner
1209                .config
1210                .search
1211                .candidate_pool_size
1212                .max(k.saturating_mul(3))
1213                .min(MAX_HNSW_CANDIDATES);
1214            self.hnsw_search_blocking(query_embedding.clone(), candidates)
1215                .await
1216        };
1217
1218        let config = self.inner.config.search.clone();
1219        let ns_owned = to_owned_string_vec(namespaces);
1220        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1221        let context_owned = context.clone();
1222
1223        #[cfg(feature = "hnsw")]
1224        let hnsw_hits_owned = hnsw_hits;
1225
1226        let response = self
1227            .with_read_conn(move |conn| {
1228                if db::is_embeddings_dirty(conn)? {
1229                    tracing::warn!(
1230                        "Embeddings are stale after model change — search quality is degraded. \
1231                     Call reembed_all() to regenerate embeddings."
1232                    );
1233                }
1234                let ns_refs = as_str_slice(&ns_owned);
1235                let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1236                let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1237
1238                #[cfg(feature = "hnsw")]
1239                {
1240                    let mut execution = if hnsw_hits_owned.is_empty() {
1241                        search::vector_only_search_detailed_with_context(
1242                            conn,
1243                            &query_embedding,
1244                            &config,
1245                            &context_owned,
1246                            k,
1247                            ns_slice,
1248                            st_slice,
1249                            None,
1250                        )
1251                    } else {
1252                        search::vector_only_search_with_hnsw_detailed_with_context(
1253                            conn,
1254                            &query_embedding,
1255                            &config,
1256                            &context_owned,
1257                            k,
1258                            ns_slice,
1259                            st_slice,
1260                            None,
1261                            &hnsw_hits_owned,
1262                        )
1263                    }?;
1264                    if context_owned.receipts_enabled()
1265                        && context_owned.exactness_profile == ExactnessProfile::PreferExact
1266                    {
1267                        if let Some(receipt) = execution.receipt.as_mut() {
1268                            receipt.search_profile = "vector_only_prefer_exact".to_string();
1269                        }
1270                    }
1271                    Ok(SearchResponse {
1272                        results: execution
1273                            .results
1274                            .into_iter()
1275                            .map(|result| result.result)
1276                            .collect(),
1277                        receipt: execution.receipt,
1278                    })
1279                }
1280                #[cfg(not(feature = "hnsw"))]
1281                {
1282                    let execution = search::vector_only_search_detailed_with_context(
1283                        conn,
1284                        &query_embedding,
1285                        &config,
1286                        &context_owned,
1287                        k,
1288                        ns_slice,
1289                        st_slice,
1290                        None,
1291                    )?;
1292                    Ok(SearchResponse {
1293                        results: execution
1294                            .results
1295                            .into_iter()
1296                            .map(|result| result.result)
1297                            .collect(),
1298                        receipt: execution.receipt,
1299                    })
1300                }
1301            })
1302            .await?;
1303        if let Some(receipt) = &response.receipt {
1304            self.persist_search_receipt(receipt).await?;
1305        }
1306        Ok(response)
1307    }
1308
1309    // ─── Explainable Search ───────────────────────────────────
1310
1311    /// Search with full score breakdown for each result.
1312    pub async fn search_explained(
1313        &self,
1314        query: &str,
1315        top_k: Option<usize>,
1316        namespaces: Option<&[&str]>,
1317        source_types: Option<&[SearchSourceType]>,
1318    ) -> Result<Vec<types::ExplainedResult>, MemoryError> {
1319        Ok(self
1320            .search_explained_with_context(
1321                query,
1322                top_k,
1323                namespaces,
1324                source_types,
1325                SearchContext::default_now(),
1326            )
1327            .await?
1328            .results)
1329    }
1330
1331    /// Search with full score breakdown under an explicit deterministic context.
1332    pub async fn search_explained_with_context(
1333        &self,
1334        query: &str,
1335        top_k: Option<usize>,
1336        namespaces: Option<&[&str]>,
1337        source_types: Option<&[SearchSourceType]>,
1338        context: SearchContext,
1339    ) -> Result<types::ExplainedSearchResponse, MemoryError> {
1340        let k = top_k
1341            .unwrap_or(self.inner.config.search.default_top_k)
1342            .min(MAX_TOP_K);
1343        let query_embedding = self.embed_text_internal(query).await?;
1344
1345        #[cfg(feature = "hnsw")]
1346        let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact {
1347            Vec::new()
1348        } else {
1349            let candidates = self
1350                .inner
1351                .config
1352                .search
1353                .candidate_pool_size
1354                .max(k.saturating_mul(3))
1355                .min(MAX_HNSW_CANDIDATES);
1356            self.hnsw_search_blocking(query_embedding.clone(), candidates)
1357                .await
1358        };
1359
1360        let q = query.to_string();
1361        let config = self.inner.config.search.clone();
1362        let ns_owned = to_owned_string_vec(namespaces);
1363        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|value| value.to_vec());
1364        let context_owned = context.clone();
1365
1366        #[cfg(feature = "hnsw")]
1367        let hnsw_hits_owned = hnsw_hits;
1368
1369        let response = self
1370            .with_read_conn(move |conn| {
1371                let ns_refs = as_str_slice(&ns_owned);
1372                let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1373                let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1374
1375                #[cfg(feature = "hnsw")]
1376                {
1377                    let mut execution = if hnsw_hits_owned.is_empty() {
1378                        search::hybrid_search_detailed_with_context(
1379                            conn,
1380                            &q,
1381                            &query_embedding,
1382                            &config,
1383                            &context_owned,
1384                            k,
1385                            ns_slice,
1386                            st_slice,
1387                            None,
1388                        )
1389                    } else {
1390                        search::hybrid_search_with_hnsw_detailed_with_context(
1391                            conn,
1392                            &q,
1393                            &query_embedding,
1394                            &config,
1395                            &context_owned,
1396                            k,
1397                            ns_slice,
1398                            st_slice,
1399                            None,
1400                            &hnsw_hits_owned,
1401                        )
1402                    }?;
1403                    if context_owned.receipts_enabled()
1404                        && context_owned.exactness_profile == ExactnessProfile::PreferExact
1405                    {
1406                        if let Some(receipt) = execution.receipt.as_mut() {
1407                            receipt.search_profile = "hybrid_prefer_exact".to_string();
1408                        }
1409                    }
1410                    Ok(types::ExplainedSearchResponse {
1411                        results: execution.results,
1412                        receipt: execution.receipt,
1413                    })
1414                }
1415                #[cfg(not(feature = "hnsw"))]
1416                {
1417                    let execution = search::hybrid_search_detailed_with_context(
1418                        conn,
1419                        &q,
1420                        &query_embedding,
1421                        &config,
1422                        &context_owned,
1423                        k,
1424                        ns_slice,
1425                        st_slice,
1426                        None,
1427                    )?;
1428                    Ok(types::ExplainedSearchResponse {
1429                        results: execution.results,
1430                        receipt: execution.receipt,
1431                    })
1432                }
1433            })
1434            .await?;
1435        if let Some(receipt) = &response.receipt {
1436            self.persist_search_receipt(receipt).await?;
1437        }
1438        Ok(response)
1439    }
1440
1441    /// Load a durable search receipt by receipt/request ID.
1442    pub async fn get_search_receipt(
1443        &self,
1444        receipt_id: &str,
1445    ) -> Result<Option<VectorSearchReceiptV1>, MemoryError> {
1446        let receipt_id = receipt_id.to_string();
1447        self.with_read_conn(move |conn| db::get_search_receipt(conn, &receipt_id))
1448            .await
1449    }
1450
1451    /// Replay a durable search receipt with caller-supplied query text and filters.
1452    ///
1453    /// Receipts intentionally do not store query text or filter values. The
1454    /// caller supplies those inputs, and the stored receipt supplies the
1455    /// deterministic evaluation time and retrieval family for comparison.
1456    pub async fn replay_search_receipt(
1457        &self,
1458        receipt_id: &str,
1459        query: &str,
1460        top_k: Option<usize>,
1461        namespaces: Option<&[&str]>,
1462        source_types: Option<&[SearchSourceType]>,
1463    ) -> Result<SearchReplayReportV1, MemoryError> {
1464        let original_receipt = self.get_search_receipt(receipt_id).await?.ok_or_else(|| {
1465            MemoryError::SearchReceiptNotFound {
1466                receipt_id: receipt_id.to_string(),
1467            }
1468        })?;
1469
1470        let vector_only = original_receipt.search_profile.starts_with("vector_only");
1471        let replay_top_k = top_k.or_else(|| Some(original_receipt.result_ids.len().max(1)));
1472        let replay_receipt_id = format!("{receipt_id}:replay:{}", uuid::Uuid::new_v4());
1473        let mut context = SearchContext::at(original_receipt.evaluation_time);
1474        context.receipt_mode = ReceiptMode::ReturnReceipt;
1475        context.request_id = Some(replay_receipt_id.clone());
1476        context.trace_id = original_receipt.trace_id.clone();
1477        context.attempt_family_id = original_receipt
1478            .attempt_family_id
1479            .clone()
1480            .or_else(|| Some(original_receipt.receipt_id.clone()));
1481        context.attempt_id = Some(replay_receipt_id.clone());
1482        context.replay_of = Some(original_receipt.receipt_id.clone());
1483        context.query_text_digest = original_receipt.query_text_digest.clone();
1484        context.query_input_digest = original_receipt.query_input_digest.clone();
1485        context.filter_digest = original_receipt.filter_digest.clone();
1486        context.redaction_state = original_receipt.redaction_state.clone();
1487        context.budget_id = original_receipt.budget_id.clone();
1488        context.exactness_profile = if original_receipt.approximate {
1489            ExactnessProfile::AllowApproximate
1490        } else {
1491            ExactnessProfile::PreferExact
1492        };
1493
1494        let replay_response = if vector_only {
1495            self.search_vector_only_with_context(
1496                query,
1497                replay_top_k,
1498                namespaces,
1499                source_types,
1500                context,
1501            )
1502            .await?
1503        } else {
1504            self.search_with_context(query, replay_top_k, namespaces, source_types, context)
1505                .await?
1506        };
1507        let replay_receipt = replay_response
1508            .receipt
1509            .ok_or_else(|| MemoryError::Other("replay did not produce a receipt".to_string()))?;
1510
1511        let query_embedding_digest_matches =
1512            original_receipt.query_embedding_digest == replay_receipt.query_embedding_digest;
1513        let result_ids_match = original_receipt.result_ids == replay_receipt.result_ids;
1514        let missing_result_ids = original_receipt
1515            .result_ids
1516            .iter()
1517            .filter(|id| !replay_receipt.result_ids.contains(*id))
1518            .cloned()
1519            .collect();
1520        let added_result_ids = replay_receipt
1521            .result_ids
1522            .iter()
1523            .filter(|id| !original_receipt.result_ids.contains(*id))
1524            .cloned()
1525            .collect();
1526
1527        Ok(SearchReplayReportV1 {
1528            receipt_id: original_receipt.receipt_id.clone(),
1529            replay_receipt_id,
1530            original_receipt,
1531            replay_receipt,
1532            query_embedding_digest_matches,
1533            result_ids_match,
1534            missing_result_ids,
1535            added_result_ids,
1536            vector_only,
1537        })
1538    }
1539
1540    // ─── Embedding Displacement ───────────────────────────────
1541
1542    /// Compute embedding displacement between two texts.
1543    pub async fn embedding_displacement(
1544        &self,
1545        text_a: &str,
1546        text_b: &str,
1547    ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1548        let emb_a = self.embed_text_internal(text_a).await?;
1549        let emb_b = self.embed_text_internal(text_b).await?;
1550        Self::embedding_displacement_from_vecs(&emb_a, &emb_b)
1551    }
1552
1553    /// Compute embedding displacement from pre-computed vectors.
1554    pub fn embedding_displacement_from_vecs(
1555        a: &[f32],
1556        b: &[f32],
1557    ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1558        if a.len() != b.len() {
1559            return Err(MemoryError::DimensionMismatch {
1560                expected: a.len(),
1561                actual: b.len(),
1562            });
1563        }
1564        let cosine_sim = search::cosine_similarity(a, b)?;
1565
1566        let euclidean_dist: f32 = a
1567            .iter()
1568            .zip(b.iter())
1569            .map(|(x, y)| (x - y) * (x - y))
1570            .sum::<f32>()
1571            .sqrt();
1572
1573        let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1574        let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1575
1576        Ok(types::EmbeddingDisplacement {
1577            cosine_similarity: cosine_sim,
1578            euclidean_distance: euclidean_dist,
1579            magnitude_a: mag_a,
1580            magnitude_b: mag_b,
1581        })
1582    }
1583
1584    // ─── Utility ────────────────────────────────────────────────
1585
1586    /// Chunk text using the configured strategy and token counter.
1587    pub fn chunk_text(&self, text: &str) -> Vec<TextChunk> {
1588        chunker::chunk_text(
1589            text,
1590            &self.inner.config.chunking,
1591            self.inner.token_counter.as_ref(),
1592        )
1593    }
1594
1595    /// Embed a single text via the configured provider.
1596    pub async fn embed(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
1597        self.embed_text_internal(text).await
1598    }
1599
1600    /// Embed multiple texts in a batch.
1601    pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>, MemoryError> {
1602        let owned: Vec<String> = texts.iter().map(|s| s.to_string()).collect();
1603        self.embed_batch_internal(owned).await
1604    }
1605
1606    /// Get database statistics.
1607    pub async fn stats(&self) -> Result<MemoryStats, MemoryError> {
1608        let db_path = self.inner.paths.sqlite_path.clone();
1609        self.with_read_conn(move |conn| {
1610            let total_facts: u64 =
1611                conn.query_row("SELECT COUNT(*) FROM facts", [], |r| r.get(0))?;
1612            let total_documents: u64 =
1613                conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))?;
1614            let total_chunks: u64 =
1615                conn.query_row("SELECT COUNT(*) FROM chunks", [], |r| r.get(0))?;
1616            let total_sessions: u64 =
1617                conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
1618            let total_messages: u64 =
1619                conn.query_row("SELECT COUNT(*) FROM messages", [], |r| r.get(0))?;
1620
1621            let db_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
1622
1623            let (model, dims): (Option<String>, Option<usize>) = conn
1624                .query_row(
1625                    "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1626                    [],
1627                    |r| Ok((Some(r.get(0)?), Some(r.get(1)?))),
1628                )
1629                .unwrap_or((None, None));
1630
1631            Ok(MemoryStats {
1632                total_facts,
1633                total_documents,
1634                total_chunks,
1635                total_sessions,
1636                total_messages,
1637                database_size_bytes: db_size,
1638                embedding_model: model,
1639                embedding_dimensions: dims,
1640            })
1641        })
1642        .await
1643    }
1644
1645    /// Return distinct scope_domain values stored in document metadata.
1646    ///
1647    /// Queries `json_extract(metadata, '$.scope_domain')` across all documents
1648    /// and returns the unique non-null values. Used by the Recall app to populate
1649    /// the scope picker dynamically instead of relying on a hardcoded list.
1650    pub async fn list_scope_domains(&self) -> Result<Vec<String>, MemoryError> {
1651        self.with_read_conn(|conn| {
1652            let mut stmt = conn.prepare(
1653                "SELECT DISTINCT json_extract(metadata, '$.scope_domain') \
1654                 FROM documents \
1655                 WHERE json_extract(metadata, '$.scope_domain') IS NOT NULL",
1656            )?;
1657            let domains: Vec<String> = stmt
1658                .query_map([], |row| row.get::<_, String>(0))?
1659                .filter_map(|r| r.ok())
1660                .collect();
1661            Ok(domains)
1662        })
1663        .await
1664    }
1665
1666    /// Check if embeddings need re-generation after a model change.
1667    pub async fn embeddings_are_dirty(&self) -> Result<bool, MemoryError> {
1668        self.with_read_conn(db::is_embeddings_dirty).await
1669    }
1670
1671    /// Re-embed all facts, chunks, messages, and episodes. Call after changing embedding models.
1672    pub async fn reembed_all(&self) -> Result<usize, MemoryError> {
1673        let mut count = 0usize;
1674        let batch_size = self.inner.config.embedding.batch_size;
1675        let dims = self.inner.config.embedding.dimensions;
1676
1677        // ─── Facts ──────────────────────────────────────────────────
1678        let fact_contents: Vec<(String, String)> = self
1679            .with_read_conn(|conn| {
1680                let mut stmt = conn.prepare("SELECT id, content FROM facts")?;
1681                let result = stmt
1682                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1683                    .collect::<Result<Vec<_>, _>>()?;
1684                Ok(result)
1685            })
1686            .await?;
1687
1688        let mut fact_count = 0usize;
1689        for batch in fact_contents.chunks(batch_size) {
1690            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1691            let embeddings = self.embed_batch_internal(texts).await?;
1692            for embedding in &embeddings {
1693                self.validate_embedding_dimensions(embedding)?;
1694            }
1695
1696            let quantizer = Quantizer::new(dims);
1697            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1698                .iter()
1699                .zip(embeddings.iter())
1700                .map(|((id, _), emb)| {
1701                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1702                    let q8 = quantizer
1703                        .quantize(emb)
1704                        .map(|qv| quantize::pack_quantized(&qv))
1705                        .ok();
1706                    (id.clone(), db::embedding_to_bytes(emb), q8)
1707                })
1708                .collect();
1709
1710            self.with_write_conn(move |conn| {
1711                db::with_transaction(conn, |tx| {
1712                    for (fid, bytes, q8) in &updates {
1713                        tx.execute(
1714                            "UPDATE facts SET embedding = ?1, embedding_q8 = ?2, updated_at = datetime('now') WHERE id = ?3",
1715                            rusqlite::params![bytes, q8.as_deref(), fid],
1716                        )?;
1717                        #[cfg(feature = "hnsw")]
1718                        db::queue_pending_index_op(
1719                            tx,
1720                            &format!("fact:{fid}"),
1721                            "fact",
1722                            db::IndexOpKind::Upsert,
1723                        )?;
1724                        db::invalidate_derived_vector_artifact(tx, &format!("fact:{fid}"))?;
1725                    }
1726                    Ok(())
1727                })
1728            })
1729            .await?;
1730
1731            fact_count += batch.len();
1732            count += batch.len();
1733            if fact_count % 100 == 0 || fact_count == count {
1734                tracing::info!(fact_count, "Re-embedded {} facts so far", fact_count);
1735            }
1736        }
1737
1738        // ─── Chunks ─────────────────────────────────────────────────
1739        let chunk_data: Vec<(String, String)> = self
1740            .with_read_conn(|conn| {
1741                let mut stmt = conn.prepare("SELECT id, content FROM chunks")?;
1742                let result = stmt
1743                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1744                    .collect::<Result<Vec<_>, _>>()?;
1745                Ok(result)
1746            })
1747            .await?;
1748
1749        let mut chunk_count = 0usize;
1750        for batch in chunk_data.chunks(batch_size) {
1751            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1752            let embeddings = self.embed_batch_internal(texts).await?;
1753            for embedding in &embeddings {
1754                self.validate_embedding_dimensions(embedding)?;
1755            }
1756
1757            let quantizer = Quantizer::new(dims);
1758            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1759                .iter()
1760                .zip(embeddings.iter())
1761                .map(|((id, _), emb)| {
1762                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1763                    let q8 = quantizer
1764                        .quantize(emb)
1765                        .map(|qv| quantize::pack_quantized(&qv))
1766                        .ok();
1767                    (id.clone(), db::embedding_to_bytes(emb), q8)
1768                })
1769                .collect();
1770
1771            self.with_write_conn(move |conn| {
1772                db::with_transaction(conn, |tx| {
1773                    for (cid, bytes, q8) in &updates {
1774                        tx.execute(
1775                            "UPDATE chunks SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1776                            rusqlite::params![bytes, q8.as_deref(), cid],
1777                        )?;
1778                        #[cfg(feature = "hnsw")]
1779                        db::queue_pending_index_op(
1780                            tx,
1781                            &format!("chunk:{cid}"),
1782                            "chunk",
1783                            db::IndexOpKind::Upsert,
1784                        )?;
1785                        db::invalidate_derived_vector_artifact(tx, &format!("chunk:{cid}"))?;
1786                    }
1787                    Ok(())
1788                })
1789            })
1790            .await?;
1791
1792            chunk_count += batch.len();
1793            count += batch.len();
1794            if chunk_count % 100 == 0 {
1795                tracing::info!(chunk_count, "Re-embedded {} chunks so far", chunk_count);
1796            }
1797        }
1798
1799        // ─── Messages ───────────────────────────────────────────────
1800        let message_data: Vec<(i64, String)> = self
1801            .with_read_conn(|conn| {
1802                let mut stmt = conn.prepare("SELECT id, content FROM messages")?;
1803                let result = stmt
1804                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1805                    .collect::<Result<Vec<_>, _>>()?;
1806                Ok(result)
1807            })
1808            .await?;
1809
1810        let mut msg_count = 0usize;
1811        for batch in message_data.chunks(batch_size) {
1812            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1813            let embeddings = self.embed_batch_internal(texts).await?;
1814            for embedding in &embeddings {
1815                self.validate_embedding_dimensions(embedding)?;
1816            }
1817
1818            let quantizer = Quantizer::new(dims);
1819            let updates: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> = batch
1820                .iter()
1821                .zip(embeddings.iter())
1822                .map(|((id, _), emb)| {
1823                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1824                    let q8 = quantizer
1825                        .quantize(emb)
1826                        .map(|qv| quantize::pack_quantized(&qv))
1827                        .ok();
1828                    (*id, db::embedding_to_bytes(emb), q8)
1829                })
1830                .collect();
1831
1832            self.with_write_conn(move |conn| {
1833                db::with_transaction(conn, |tx| {
1834                    for (mid, bytes, q8) in &updates {
1835                        tx.execute(
1836                            "UPDATE messages SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1837                            rusqlite::params![bytes, q8.as_deref(), mid],
1838                        )?;
1839                        #[cfg(feature = "hnsw")]
1840                        db::queue_pending_index_op(
1841                            tx,
1842                            &format!("msg:{mid}"),
1843                            "message",
1844                            db::IndexOpKind::Upsert,
1845                        )?;
1846                        db::invalidate_derived_vector_artifact(tx, &format!("msg:{mid}"))?;
1847                    }
1848                    Ok(())
1849                })
1850            })
1851            .await?;
1852
1853            msg_count += batch.len();
1854            count += batch.len();
1855            if msg_count % 100 == 0 {
1856                tracing::info!(msg_count, "Re-embedded {} messages so far", msg_count);
1857            }
1858        }
1859
1860        // ─── Episodes ───────────────────────────────────────────────
1861        let episode_data: Vec<(String, String)> = self
1862            .with_read_conn(|conn| {
1863                let mut stmt = conn.prepare("SELECT episode_id, search_text FROM episodes")?;
1864                let result = stmt
1865                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1866                    .collect::<Result<Vec<_>, _>>()?;
1867                Ok(result)
1868            })
1869            .await?;
1870
1871        let mut episode_count = 0usize;
1872        for batch in episode_data.chunks(batch_size) {
1873            let texts: Vec<String> = batch.iter().map(|(_, text)| text.clone()).collect();
1874            let embeddings = self.embed_batch_internal(texts).await?;
1875            for embedding in &embeddings {
1876                self.validate_embedding_dimensions(embedding)?;
1877            }
1878
1879            let quantizer = Quantizer::new(dims);
1880            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1881                .iter()
1882                .zip(embeddings.iter())
1883                .map(|((episode_id, _), embedding)| {
1884                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1885                    let q8 = quantizer
1886                        .quantize(embedding)
1887                        .map(|vector| quantize::pack_quantized(&vector))
1888                        .ok();
1889                    (episode_id.clone(), db::embedding_to_bytes(embedding), q8)
1890                })
1891                .collect();
1892
1893            self.with_write_conn(move |conn| {
1894                db::with_transaction(conn, |tx| {
1895                    for (episode_id, bytes, q8) in &updates {
1896                        tx.execute(
1897                            "UPDATE episodes
1898                             SET embedding = ?1,
1899                                 embedding_q8 = ?2,
1900                                 updated_at = datetime('now')
1901                             WHERE episode_id = ?3",
1902                            rusqlite::params![bytes, q8.as_deref(), episode_id],
1903                        )?;
1904                        #[cfg(feature = "hnsw")]
1905                        db::queue_pending_index_op(
1906                            tx,
1907                            &episodes::episode_item_key(episode_id),
1908                            "episode",
1909                            db::IndexOpKind::Upsert,
1910                        )?;
1911                        db::invalidate_derived_vector_artifact(
1912                            tx,
1913                            &episodes::episode_item_key(episode_id),
1914                        )?;
1915                    }
1916                    Ok(())
1917                })
1918            })
1919            .await?;
1920
1921            episode_count += batch.len();
1922            count += batch.len();
1923            if episode_count % 100 == 0 {
1924                tracing::info!(
1925                    episode_count,
1926                    "Re-embedded {} episodes so far",
1927                    episode_count
1928                );
1929            }
1930        }
1931
1932        // Clear the dirty flag
1933        self.with_write_conn(db::clear_embeddings_dirty).await?;
1934
1935        tracing::info!(
1936            facts = fact_count,
1937            chunks = chunk_count,
1938            messages = msg_count,
1939            episodes = episode_count,
1940            total = count,
1941            "Re-embedding complete"
1942        );
1943
1944        // Rebuild HNSW after re-embedding
1945        #[cfg(feature = "hnsw")]
1946        {
1947            tracing::info!("Rebuilding HNSW index after re-embedding...");
1948            let _receipt = self.rebuild_hnsw_index().await?;
1949        }
1950
1951        Ok(count)
1952    }
1953
1954    /// Vacuum the database (reclaim space after deletions).
1955    pub async fn vacuum(&self) -> Result<(), MemoryError> {
1956        self.with_write_conn(|conn| {
1957            conn.execute_batch("VACUUM")?;
1958            Ok(())
1959        })
1960        .await
1961    }
1962
1963    // ─── Projection Import ─────────────────────────────────────
1964
1965    /// Import a projection envelope atomically (V10 legacy path).
1966    ///
1967    /// ## Phase status: compatibility / migration-only
1968    ///
1969    /// This method is the V10 legacy import path. New integrations should use
1970    /// [`import_projection_batch()`](Self::import_projection_batch) instead,
1971    /// which accepts the canonical `ProjectionImportBatchV3` format from
1972    /// `forge-memory-bridge`.
1973    ///
1974    /// **Removal condition**: removed when all callers migrate to the bridge pipeline.
1975    ///
1976    /// **Idempotent**: re-importing the same envelope (same `envelope_id` +
1977    /// `schema_version` + `content_digest`) returns a receipt with
1978    /// `was_duplicate = true` and does not modify data.
1979    ///
1980    /// **Atomic**: all records are committed in a single transaction. On any
1981    /// failure the entire import is rolled back — no partial visibility.
1982    ///
1983    /// **Provenance**: each imported record's metadata is tagged with the
1984    /// envelope_id and source_authority for traceability.
1985    #[deprecated(
1986        since = "0.5.0",
1987        note = "Legacy V10 import envelope path is compatibility-only. Use `import_projection_batch()` and `ProjectionImportBatchV3` on the canonical lane."
1988    )]
1989    #[doc(hidden)]
1990    #[allow(deprecated)]
1991    pub async fn import_envelope(
1992        &self,
1993        envelope: &projection_import::ImportEnvelope,
1994    ) -> Result<projection_import::ImportReceipt, MemoryError> {
1995        projection_legacy_compat::import_envelope(self, envelope).await
1996    }
1997
1998    /// Check whether an envelope has already been imported.
1999    #[deprecated(
2000        since = "0.5.0",
2001        note = "Legacy V10 import envelope status reads are compatibility-only. Prefer the projection import log."
2002    )]
2003    #[doc(hidden)]
2004    #[allow(deprecated)]
2005    pub async fn import_status(
2006        &self,
2007        envelope_id: &projection_import::EnvelopeId,
2008    ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
2009        projection_legacy_compat::import_status(self, envelope_id).await
2010    }
2011
2012    /// List recent imports, optionally filtered by namespace.
2013    #[deprecated(
2014        since = "0.5.0",
2015        note = "Legacy V10 import log access is compatibility-only. Prefer new projection-import metadata."
2016    )]
2017    #[doc(hidden)]
2018    #[allow(deprecated)]
2019    pub async fn list_imports(
2020        &self,
2021        namespace: Option<&str>,
2022        limit: usize,
2023    ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
2024        projection_legacy_compat::list_imports(self, namespace, limit).await
2025    }
2026
2027    /// Get the most recent successful import timestamp for a namespace.
2028    #[allow(deprecated)]
2029    pub async fn last_import_at(&self, namespace: &str) -> Result<Option<String>, MemoryError> {
2030        projection_legacy_compat::last_import_at(self, namespace).await
2031    }
2032
2033    /// Query imported claim projection rows through the supported public read surface.
2034    pub async fn query_claim_versions(
2035        &self,
2036        query: ProjectionQuery,
2037    ) -> Result<Vec<ProjectionClaimVersion>, MemoryError> {
2038        self.with_read_conn(move |conn| projection_storage::query_claim_versions(conn, &query))
2039            .await
2040    }
2041
2042    /// Query imported relation projection rows through the supported public read surface.
2043    pub async fn query_relation_versions(
2044        &self,
2045        query: ProjectionQuery,
2046    ) -> Result<Vec<ProjectionRelationVersion>, MemoryError> {
2047        self.with_read_conn(move |conn| projection_storage::query_relation_versions(conn, &query))
2048            .await
2049    }
2050
2051    /// Query imported episode projection rows through the supported public read surface.
2052    pub async fn query_episodes(
2053        &self,
2054        query: ProjectionQuery,
2055    ) -> Result<Vec<ProjectionEpisode>, MemoryError> {
2056        self.with_read_conn(move |conn| projection_storage::query_episode_rows(conn, &query))
2057            .await
2058    }
2059
2060    /// Query imported entity-alias rows through the supported public read surface.
2061    pub async fn query_entity_aliases(
2062        &self,
2063        query: ProjectionQuery,
2064    ) -> Result<Vec<ProjectionEntityAlias>, MemoryError> {
2065        self.with_read_conn(move |conn| projection_storage::query_entity_aliases(conn, &query))
2066            .await
2067    }
2068
2069    /// Query imported evidence-reference rows through the supported public read surface.
2070    pub async fn query_evidence_refs(
2071        &self,
2072        query: ProjectionQuery,
2073    ) -> Result<Vec<ProjectionEvidenceRef>, MemoryError> {
2074        self.with_read_conn(move |conn| projection_storage::query_evidence_refs(conn, &query))
2075            .await
2076    }
2077
2078    /// Execute raw SQL. For testing only — not part of the stable public API.
2079    #[cfg(any(test, feature = "testing"))]
2080    pub async fn raw_execute(&self, sql: &str, params: Vec<String>) -> Result<usize, MemoryError> {
2081        let sql = sql.to_string();
2082        self.with_write_conn(move |conn| {
2083            let param_refs: Vec<&dyn rusqlite::types::ToSql> = params
2084                .iter()
2085                .map(|s| s as &dyn rusqlite::types::ToSql)
2086                .collect();
2087            Ok(conn.execute(&sql, &*param_refs)?)
2088        })
2089        .await
2090    }
2091}