Skip to main content

semantic_memory/
lib.rs

1#![allow(deprecated)]
2
3//! # semantic-memory
4//!
5//! Local-first semantic memory backed by authoritative SQLite state and an optional recoverable
6//! HNSW sidecar.
7//!
8//! The crate stores facts, chunked documents, conversation messages, and searchable episodes in
9//! SQLite. Search combines BM25 (FTS5) and vector retrieval with Reciprocal Rank Fusion, and
10//! `search_explained()` returns the exact scoring breakdown from the live pipeline.
11//!
12//! Concurrency uses one writer connection plus a pool of WAL-enabled reader connections.
13//! Durable writes are committed to SQLite first; any required HNSW sidecar mutations are journaled
14//! in SQLite and replayed on open, flush, rebuild, or reconcile.
15//!
16//! `search()` targets facts, document chunks, and episodes by default. Message retrieval is
17//! available through `search_conversations()` or by opting into
18//! [`SearchSourceType::Messages`](crate::SearchSourceType::Messages).
19//!
20//! Integrity tooling is strict about malformed stored data: invalid roles, JSON, enums, embedding
21//! blobs, quantized blobs, and sidecar drift are surfaced through `verify_integrity()` instead of
22//! being silently converted into defaults. `reconcile()` can rebuild FTS or fully re-embed and
23//! rebuild derived state from SQLite.
24//!
25//! `store.graph_view()` exposes a deterministic graph traversal layer over namespaces, facts,
26//! documents, chunks, sessions, messages, episodes, and semantic/temporal/causal links derived
27//! from SQLite state.
28//!
29//! ## Quick Start
30//!
31//! ```rust,no_run
32//! use semantic_memory::{MemoryConfig, MemoryStore};
33//!
34//! # async fn example() -> Result<(), semantic_memory::MemoryError> {
35//! let store = MemoryStore::open(MemoryConfig::default())?;
36//!
37//! // Store a fact
38//! store.add_fact("general", "Rust was first released in 2015", None, None).await?;
39//!
40//! // Search
41//! let results = store.search("when was Rust released", None, None, None).await?;
42//! # Ok(())
43//! # }
44//! ```
45//!
46//! ## Operational Notes
47//!
48//! - SQLite is authoritative for all durable records and embeddings.
49//! - HNSW is an acceleration sidecar. Pending sidecar mutations are journaled in SQLite, so a
50//!   sidecar failure does not imply the SQLite write rolled back.
51//! - WAL mode plus pooled reader connections allows concurrent reads while writes serialize through
52//!   the writer connection.
53//! - `search_explained()` reflects the exact ranking math used by the active search pipeline,
54//!   including reranking from exact f32 cosine similarity when configured.
55
56// At least one search backend must be enabled.
57#[cfg(not(any(feature = "hnsw", feature = "brute-force")))]
58compile_error!("At least one search backend feature must be enabled: 'hnsw' or 'brute-force'");
59
60pub mod chunker;
61pub mod config;
62pub(crate) mod conversation;
63pub mod db;
64pub(crate) mod documents;
65pub mod embedder;
66pub(crate) mod episodes;
67pub mod error;
68mod graph;
69#[cfg(feature = "hnsw")]
70pub mod hnsw;
71#[cfg(feature = "hnsw")]
72mod hnsw_ops;
73mod json_compat_import;
74pub(crate) mod knowledge;
75mod pool;
76mod projection_batch;
77mod projection_derivation;
78/// Compatibility-only legacy import surface.
79///
80/// This module exists only for migration compatibility with pre-V11 import paths.
81#[deprecated(
82    since = "0.6.0",
83    note = "Legacy V10 import path is migration-only. Use `import_projection_batch()` with `ProjectionImportBatchV3` on the canonical lane."
84)]
85#[doc(hidden)]
86pub mod projection_import;
87mod projection_lane;
88mod projection_legacy_compat;
89pub(crate) mod projection_storage;
90pub mod quantize;
91pub mod search;
92pub mod storage;
93mod store_support;
94pub mod tokenizer;
95pub mod types;
96
97// Re-export primary public types.
98pub use config::{
99    ChunkingConfig, EmbeddingConfig, MemoryConfig, MemoryLimits, PoolConfig, SearchConfig,
100};
101pub use db::{IntegrityReport, ReconcileAction, VerifyMode};
102pub use embedder::{Embedder, MockEmbedder, OllamaEmbedder};
103pub use error::MemoryError;
104#[cfg(feature = "hnsw")]
105pub use hnsw::{HnswConfig, HnswHit, HnswIndex};
106pub(crate) use projection_lane::projection_import_failure_id;
107pub use projection_lane::{
108    ProjectionImportFailureReceiptEntry, ProjectionImportLogEntry, ProjectionImportResult,
109};
110pub use quantize::{pack_quantized, unpack_quantized, QuantizedVector, Quantizer};
111pub use storage::StoragePaths;
112pub use tokenizer::{EstimateTokenCounter, TokenCounter};
113pub use types::{
114    Document, EmbeddingDisplacement, EpisodeMeta, EpisodeOutcome, ExplainedResult, Fact,
115    GraphDirection, GraphEdge, GraphEdgeType, GraphView, MemoryStats, Message,
116    ProjectionClaimVersion, ProjectionEntityAlias, ProjectionEpisode, ProjectionEvidenceRef,
117    ProjectionQuery, ProjectionRelationVersion, Role, ScoreBreakdown, SearchResult, SearchSource,
118    SearchSourceType, Session, TextChunk, VerificationStatus,
119};
120
121use std::sync::Arc;
122
123pub(crate) use store_support::{
124    as_str_slice, build_episode_search_text, merge_trace_ctx, to_owned_string_vec,
125    verification_status_for_outcome,
126};
127
/// Compatibility-only public access to retained legacy surfaces.
///
/// This module contains nothing but re-exports. Each submodule is marked `#[deprecated]` and
/// hidden from rustdoc; the deprecation notes name the replacement to migrate to.
#[doc(hidden)]
pub mod compat {
    // Legacy import-envelope types, re-exported from `crate::projection_import` together with
    // `stack_ids::EnvelopeId`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy ImportEnvelope is migration-only. New integrations should use `ProjectionImportBatchV3` on the canonical lane."
    )]
    #[doc(hidden)]
    // `crate::projection_import` is itself declared `#[deprecated]`, so re-exporting from it
    // would otherwise trip the deprecation lint.
    #[allow(deprecated)]
    pub mod legacy_import_envelope {
        pub use crate::projection_import::{
            ImportEnvelope, ImportProjectionFreshness, ImportReceipt, ImportRecord, ImportStatus,
        };
        pub use stack_ids::EnvelopeId;
    }

    // Legacy trace identifier, re-exported from `crate::types`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy trace_id is migration-only. Use `stack_ids::TraceCtx`."
    )]
    #[doc(hidden)]
    #[allow(deprecated)]
    pub mod compat_trace_id {
        pub use crate::types::TraceId;
    }
}
154
/// Thread-safe handle to the memory database.
///
/// Clone is cheap (Arc internals). `Send + Sync`.
///
/// All state lives in a shared inner value, so every clone of a handle operates on the same
/// SQLite pool, embedder, and (when the `hnsw` feature is enabled) HNSW index.
#[derive(Clone)]
pub struct MemoryStore {
    // Shared state; cloning the handle only clones this `Arc`.
    inner: Arc<MemoryStoreInner>,
}
162
/// Shared state behind every [`MemoryStore`] handle.
struct MemoryStoreInner {
    /// SQLite connection pool; accessed through `with_read_conn` / `with_write_conn`.
    pool: pool::SqlitePool,
    /// Embedding provider used by the internal `embed_*` helpers.
    embedder: Box<dyn Embedder>,
    /// Semaphore gating embedder calls; sized from `config.limits.max_embedding_concurrency`.
    embedding_permits: Arc<tokio::sync::Semaphore>,
    /// Normalized configuration the store was opened with.
    config: MemoryConfig,
    /// Filesystem locations derived from `config.base_dir`.
    paths: StoragePaths,
    /// Token counter taken from the config, or the crate default when none was supplied.
    token_counter: Arc<dyn TokenCounter>,
    /// In-memory HNSW index, guarded by a std (non-async) `RwLock`.
    #[cfg(feature = "hnsw")]
    hnsw_index: std::sync::RwLock<HnswIndex>,
}
173
#[cfg(feature = "hnsw")]
impl Drop for MemoryStoreInner {
    /// Best-effort persistence of the HNSW sidecar when the shared state is dropped.
    ///
    /// Nothing here panics or propagates: every failure is logged and the drop continues.
    ///
    /// 1. If the sidecar directory is gone, nothing is written.
    /// 2. If SQLite reports pending index ops, the sidecar is recovered from SQLite instead of
    ///    saving the in-memory index.
    /// 3. Otherwise the in-memory index is saved and its key mappings are flushed to SQLite.
    fn drop(&mut self) {
        let sidecar_dir = &self.paths.hnsw_dir;
        if !sidecar_dir.exists() {
            tracing::debug!(
                path = %sidecar_dir.display(),
                "Skipping HNSW drop flush because the sidecar directory no longer exists"
            );
            return;
        }

        // A failed inspection is logged and then treated like "no pending work".
        let pending_ops = self
            .pool
            .with_read_conn(db::pending_index_op_count)
            .unwrap_or_else(|err| {
                tracing::warn!("Failed to inspect pending HNSW work on drop: {}", err);
                0
            });

        if pending_ops > 0 {
            match hnsw_ops::recover_hnsw_sidecar_sync(&self.pool, &self.paths, &self.config.hnsw) {
                Ok(_) => {}
                Err(err) => {
                    tracing::error!("Failed to recover and flush HNSW on drop: {}", err);
                }
            }
            return;
        }

        match self.hnsw_index.read() {
            Err(_) => {
                tracing::warn!("HNSW RwLock poisoned on drop — skipping save");
            }
            Ok(index) => {
                let saved = hnsw_ops::save_hnsw_sidecar(
                    &index,
                    &self.paths.hnsw_dir,
                    &self.paths.hnsw_basename,
                );
                if let Err(err) = saved {
                    tracing::error!("Failed to save HNSW index on drop: {}", err);
                }

                // Key mappings live in SQLite; they are flushed even if the file save failed.
                let flushed = self
                    .pool
                    .with_write_conn(|conn| index.flush_keymap(conn));
                if let Err(e) = flushed {
                    tracing::error!("Failed to flush HNSW keymap on drop: {}", e);
                }
            }
        }
    }
}
227
228impl MemoryStore {
229    /// Run read-only work on a pooled reader connection on a blocking thread.
230    ///
231    /// This prevents SQLite I/O from stalling the tokio executor while allowing
232    /// multiple concurrent readers under WAL mode.
233    async fn with_read_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
234    where
235        F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
236        T: Send + 'static,
237    {
238        let inner = self.inner.clone();
239        tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
240            inner.pool.with_read_conn(f)
241        })
242        .await
243        .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
244    }
245
246    /// Run write-capable work on the single writer connection on a blocking thread.
247    async fn with_write_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
248    where
249        F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
250        T: Send + 'static,
251    {
252        let inner = self.inner.clone();
253        tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
254            inner.pool.with_write_conn(f)
255        })
256        .await
257        .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
258    }
259
260    /// Run HNSW search on a blocking thread to avoid holding std::sync::RwLock
261    /// across await points (CONC-001).
262    #[cfg(feature = "hnsw")]
263    async fn hnsw_search_blocking(
264        &self,
265        query_embedding: Vec<f32>,
266        candidates: usize,
267    ) -> Vec<HnswHit> {
268        let inner = self.inner.clone();
269        tokio::task::spawn_blocking(move || {
270            let guard = inner.hnsw_index.read().unwrap_or_else(|e| e.into_inner());
271            match guard.search(&query_embedding, candidates) {
272                Ok(hits) => hits,
273                Err(e) => {
274                    tracing::error!(
275                        "HNSW search failed, falling back to brute-force vector search: {}",
276                        e
277                    );
278                    Vec::new()
279                }
280            }
281        })
282        .await
283        .unwrap_or_else(|e| {
284            tracing::error!("HNSW search blocking task panicked: {}", e);
285            Vec::new()
286        })
287    }
288
    /// Synchronous variant of `sync_pending_hnsw_ops`: runs
    /// `hnsw_ops::sync_pending_hnsw_sidecar` on the calling thread.
    ///
    /// Used from `open_with_embedder`, which is not async.
    #[cfg(feature = "hnsw")]
    fn sync_pending_hnsw_ops_blocking(&self) -> Result<usize, MemoryError> {
        hnsw_ops::sync_pending_hnsw_sidecar(&self.inner)
    }
293
294    #[cfg(feature = "hnsw")]
295    async fn sync_pending_hnsw_ops(&self) -> Result<usize, MemoryError> {
296        let inner = self.inner.clone();
297        tokio::task::spawn_blocking(move || hnsw_ops::sync_pending_hnsw_sidecar(&inner))
298            .await
299            .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
300    }
301
302    #[cfg(feature = "hnsw")]
303    async fn sync_pending_hnsw_ops_best_effort(&self, operation: &'static str) {
304        if let Err(err) = self.sync_pending_hnsw_ops().await {
305            tracing::warn!(
306                operation,
307                error = %err,
308                "SQLite write committed but HNSW sidecar sync is still pending"
309            );
310        } else {
311            self.maybe_flush_hnsw();
312        }
313    }
314
315    /// Open or create a memory store at the configured base directory.
316    ///
317    /// Creates the directory if it doesn't exist, opens/creates SQLite,
318    /// runs migrations, and initializes the HNSW index.
319    pub fn open(config: MemoryConfig) -> Result<Self, MemoryError> {
320        let config = config.normalize_and_validate()?;
321        let embedder = Box::new(OllamaEmbedder::try_new(&config.embedding)?);
322        Self::open_with_embedder(config, embedder)
323    }
324
    /// Open with a custom embedder (for testing or non-Ollama providers).
    ///
    /// Steps, in order:
    /// 1. Normalize and validate the config, and reject an embedder whose dimensionality
    ///    differs from `config.embedding.dimensions`.
    /// 2. Create the base directory, open the SQLite pool, and check the stored embedding
    ///    metadata against the config.
    /// 3. With the `hnsw` feature: obtain an index by (a) starting fresh when embeddings are
    ///    flagged dirty, (b) recovering from SQLite when pending ops or a dirty sidecar exist,
    ///    (c) loading the sidecar files and rebuilding if the entry count disagrees with
    ///    SQLite, or (d) rebuilding/creating an index when no sidecar files exist.
    /// 4. Build the store and make one non-fatal attempt to sync pending sidecar ops.
    ///
    /// # Errors
    ///
    /// Returns `MemoryError::DimensionMismatch` for a mismatched embedder,
    /// `MemoryError::StorageError` if the directory cannot be created, and propagates errors
    /// from config validation, the pool, the metadata check, and HNSW construction/recovery.
    #[allow(unused_mut)] // `config` is mutated only when the `hnsw` feature is enabled
    pub fn open_with_embedder(
        mut config: MemoryConfig,
        embedder: Box<dyn Embedder>,
    ) -> Result<Self, MemoryError> {
        config = config.normalize_and_validate()?;
        if embedder.dimensions() != config.embedding.dimensions {
            return Err(MemoryError::DimensionMismatch {
                expected: config.embedding.dimensions,
                actual: embedder.dimensions(),
            });
        }
        // The embedder's reported model name overrides whatever the config carried, so the
        // metadata check below compares against the embedder actually in use.
        config.embedding.model = embedder.model_name().to_string();

        let paths = StoragePaths::new(&config.base_dir);

        // Create directory if needed
        std::fs::create_dir_all(&paths.base_dir).map_err(|e| {
            MemoryError::StorageError(format!(
                "Failed to create directory {}: {}",
                paths.base_dir.display(),
                e
            ))
        })?;

        let pool = pool::SqlitePool::open(&paths.sqlite_path, &config.pool, &config.limits)?;
        pool.with_write_conn(|conn| db::check_embedding_metadata(conn, &config.embedding))?;

        // Ensure HNSW dimensions match the embedding config
        #[cfg(feature = "hnsw")]
        {
            config.hnsw.dimensions = config.embedding.dimensions;
        }

        let token_counter = config
            .token_counter
            .clone()
            .unwrap_or_else(tokenizer::default_token_counter);

        #[cfg(feature = "hnsw")]
        let hnsw_index = {
            let hnsw_config = config.hnsw.clone();

            let embeddings_dirty = pool.with_read_conn(db::is_embeddings_dirty)?;
            let pending_index_ops = pool.with_read_conn(db::pending_index_op_count)?;

            if embeddings_dirty {
                // Embedding model changed — old HNSW index is useless.
                // Create a fresh index; reembed_all() will rebuild it.
                tracing::warn!(
                    "Embedding model changed — creating fresh HNSW index (old index is stale)"
                );
                // Journaled sidecar work refers to the stale index, so it is discarded too.
                pool.with_write_conn(|conn| {
                    db::clear_all_pending_index_ops(conn)?;
                    db::set_sidecar_dirty(conn, false)?;
                    Ok(())
                })?;
                HnswIndex::new(hnsw_config)?
            } else if pending_index_ops > 0 || pool.with_read_conn(db::is_sidecar_dirty)? {
                tracing::warn!(
                    pending_index_ops,
                    "Recovering HNSW sidecar from SQLite because durable sidecar work exists"
                );
                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
            } else if paths.hnsw_files_exist() {
                tracing::info!("Loading HNSW index from {:?}", paths.hnsw_dir);
                match HnswIndex::load(&paths.hnsw_dir, &paths.hnsw_basename, hnsw_config.clone()) {
                    Ok(index) => {
                        // Load key mappings from SQLite
                        if let Err(e) = pool.with_write_conn(|conn| index.load_keymap(conn)) {
                            tracing::warn!("Failed to load HNSW key mappings: {}. Mappings will be empty until rebuild.", e);
                        }

                        // Stale index detection: compare HNSW entry count vs SQLite
                        // embedding count. A mismatch means the app crashed before
                        // flushing HNSW, or keys were lost.
                        let hnsw_count = index.len();
                        // NOTE(review): a failed count query is mapped to 0 by `unwrap_or(0)`
                        // rather than propagated, so a query error shows up as drift whenever
                        // the index is non-empty — confirm this is intentional.
                        let sqlite_count: i64 = pool.with_read_conn(|conn| {
                            Ok(conn
                                .query_row(
                                    "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
                                    [],
                                    |row| row.get(0),
                                )
                                .unwrap_or(0))
                        })?;

                        // Only the counts are compared; equal counts are treated as in sync.
                        let drift = (sqlite_count - hnsw_count as i64).abs();
                        if drift > 0 {
                            tracing::warn!(
                                hnsw_count,
                                sqlite_count,
                                drift,
                                "HNSW index is stale — {} entries differ from SQLite. \
                                 Likely caused by unclean shutdown. Triggering inline rebuild.",
                                drift
                            );
                            // Discard the stale index and rebuild from SQLite
                            let rebuilt =
                                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
                            tracing::info!(
                                active = rebuilt.len(),
                                "HNSW index rebuilt after stale detection"
                            );
                            rebuilt
                        } else {
                            tracing::info!(
                                "HNSW index loaded ({} active keys, in sync with SQLite)",
                                hnsw_count
                            );
                            index
                        }
                    }
                    Err(e) => {
                        // NOTE(review): unlike the missing-files branch below, this path does
                        // not check whether SQLite already holds embeddings before starting
                        // from an empty index — confirm a rebuild is triggered elsewhere.
                        tracing::warn!(
                            "Failed to load HNSW index: {}. Creating new empty index.",
                            e
                        );
                        HnswIndex::new(hnsw_config)?
                    }
                }
            } else {
                // Check if SQLite has embeddings that should be in the index.
                // This happens when: sidecar files were deleted, data dir was
                // partially copied, app crashed before first flush, or HNSW was
                // added after data already existed.
                // NOTE(review): as above, a failed count query is mapped to 0, which selects
                // the "create empty index" path — confirm this is intentional.
                let orphan_count: i64 = pool.with_read_conn(|conn| {
                    Ok(conn
                        .query_row(
                            "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
                            [],
                            |row| row.get(0),
                        )
                        .unwrap_or(0))
                })?;

                if orphan_count > 0 {
                    tracing::warn!(
                        orphan_count,
                        "HNSW sidecar files missing but {} embeddings exist in SQLite — \
                         rebuilding index inline",
                        orphan_count
                    );
                    let new_index =
                        hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
                    tracing::info!(
                        active = new_index.len(),
                        "HNSW index rebuilt from SQLite embeddings"
                    );
                    new_index
                } else {
                    tracing::info!("Creating new empty HNSW index (no embeddings in SQLite)");
                    HnswIndex::new(hnsw_config)?
                }
            }
        };

        let store = Self {
            inner: Arc::new(MemoryStoreInner {
                pool,
                embedder,
                embedding_permits: Arc::new(tokio::sync::Semaphore::new(
                    config.limits.max_embedding_concurrency,
                )),
                config,
                paths,
                token_counter,
                #[cfg(feature = "hnsw")]
                hnsw_index: std::sync::RwLock::new(hnsw_index),
            }),
        };

        // Non-fatal: a failed sync only logs; the store is still returned.
        #[cfg(feature = "hnsw")]
        if let Err(err) = store.sync_pending_hnsw_ops_blocking() {
            tracing::warn!(
                error = %err,
                "Failed to reconcile pending HNSW sidecar ops during open; sidecar replay remains pending"
            );
        }

        Ok(store)
    }
514
515    async fn with_embedding_permit(
516        &self,
517    ) -> Result<tokio::sync::OwnedSemaphorePermit, MemoryError> {
518        self.inner
519            .embedding_permits
520            .clone()
521            .acquire_owned()
522            .await
523            .map_err(|e| MemoryError::Other(format!("embedding semaphore closed: {e}")))
524    }
525
    /// Embed a single text through the configured embedder.
    ///
    /// An embedding permit is acquired first and held until the embedder call returns.
    async fn embed_text_internal(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
        // Bound to a named `_permit` so it lives until the end of the function.
        let _permit = self.with_embedding_permit().await?;
        self.inner.embedder.embed(text).await
    }
530
    /// Embed a batch of texts through the configured embedder.
    ///
    /// The whole batch runs under one embedding permit, held until the embedder call returns.
    async fn embed_batch_internal(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, MemoryError> {
        // Bound to a named `_permit` so it lives until the end of the function.
        let _permit = self.with_embedding_permit().await?;
        self.inner.embedder.embed_batch(texts).await
    }
535
536    fn validate_embedding_dimensions(&self, embedding: &[f32]) -> Result<(), MemoryError> {
537        let expected = self.inner.config.embedding.dimensions;
538        if embedding.len() != expected {
539            return Err(MemoryError::DimensionMismatch {
540                expected,
541                actual: embedding.len(),
542            });
543        }
544        Ok(())
545    }
546
547    fn validate_content(&self, field: &'static str, content: &str) -> Result<(), MemoryError> {
548        if content.is_empty() {
549            return Err(MemoryError::InvalidConfig {
550                field,
551                reason: "content must not be empty".to_string(),
552            });
553        }
554
555        let limit = self.inner.config.limits.max_content_bytes;
556        if content.len() > limit {
557            return Err(MemoryError::ContentTooLarge {
558                size: content.len(),
559                limit,
560            });
561        }
562
563        Ok(())
564    }
565
566    fn validate_confidence(confidence: f32) -> Result<(), MemoryError> {
567        if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
568            return Err(MemoryError::InvalidConfig {
569                field: "episodes.confidence",
570                reason: "confidence must be finite and within [0.0, 1.0]".to_string(),
571            });
572        }
573        Ok(())
574    }
575
576    // ─── HNSW Management ───────────────────────────────────────
577
578    /// Rebuild the HNSW index from SQLite f32 embeddings.
579    ///
580    /// Call this if sidecar files are missing, corrupted, or after `reembed_all()`.
581    #[cfg(feature = "hnsw")]
582    pub async fn rebuild_hnsw_index(&self) -> Result<(), MemoryError> {
583        tracing::info!("Rebuilding HNSW index from SQLite embeddings...");
584        let hnsw_config = self.inner.config.hnsw.clone();
585        let new_index = self
586            .with_read_conn(move |conn| hnsw_ops::rebuild_hnsw_from_sqlite(conn, &hnsw_config))
587            .await?;
588
589        {
590            let mut guard = self
591                .inner
592                .hnsw_index
593                .write()
594                .unwrap_or_else(|e| e.into_inner());
595            *guard = new_index.clone();
596        }
597
598        hnsw_ops::save_hnsw_sidecar(
599            &new_index,
600            &self.inner.paths.hnsw_dir,
601            &self.inner.paths.hnsw_basename,
602        )?;
603        self.inner.pool.with_write_conn(|conn| {
604            new_index.flush_keymap(conn)?;
605            db::clear_all_pending_index_ops(conn)?;
606            db::set_sidecar_dirty(conn, false)?;
607            Ok(())
608        })?;
609
610        tracing::info!(active = new_index.len(), "HNSW index rebuilt");
611
612        Ok(())
613    }
614
615    /// Opportunistically flush HNSW if the configured interval has elapsed.
616    ///
617    /// Cheap no-op when `flush_interval_secs` is None or the interval hasn't
618    /// elapsed yet (just an atomic load + epoch comparison).
619    #[cfg(feature = "hnsw")]
620    fn maybe_flush_hnsw(&self) {
621        if let Some(interval) = self.inner.config.hnsw.flush_interval_secs {
622            let guard = self
623                .inner
624                .hnsw_index
625                .read()
626                .unwrap_or_else(|e| e.into_inner());
627            if guard.should_flush(interval) {
628                drop(guard); // release read lock before flushing
629                if let Err(e) = self.flush_hnsw() {
630                    tracing::warn!("Opportunistic HNSW flush failed: {}", e);
631                } else {
632                    let guard = self
633                        .inner
634                        .hnsw_index
635                        .read()
636                        .unwrap_or_else(|e| e.into_inner());
637                    guard.update_last_flush_epoch();
638                    tracing::info!("Opportunistic HNSW flush completed");
639                }
640            }
641        }
642    }
643
644    /// Persist the HNSW graph, vector data, and key mappings to disk.
645    ///
646    /// Called automatically on drop, but can be called explicitly for durability.
647    #[cfg(feature = "hnsw")]
648    pub fn flush_hnsw(&self) -> Result<(), MemoryError> {
649        let pending_ops = self.inner.pool.with_read_conn(db::pending_index_op_count)?;
650        if pending_ops > 0 {
651            tracing::info!(
652                pending_ops,
653                "Flushing HNSW via authoritative SQLite rebuild because pending durable sidecar work exists"
654            );
655            let rebuilt = hnsw_ops::recover_hnsw_sidecar_sync(
656                &self.inner.pool,
657                &self.inner.paths,
658                &self.inner.config.hnsw,
659            )?;
660            let mut guard = self
661                .inner
662                .hnsw_index
663                .write()
664                .unwrap_or_else(|e| e.into_inner());
665            *guard = rebuilt;
666            return Ok(());
667        }
668
669        let index = self
670            .inner
671            .hnsw_index
672            .read()
673            .unwrap_or_else(|e| e.into_inner())
674            .clone();
675        hnsw_ops::save_hnsw_sidecar(
676            &index,
677            &self.inner.paths.hnsw_dir,
678            &self.inner.paths.hnsw_basename,
679        )?;
680
681        // Flush key mappings to SQLite
682        self.inner.pool.with_write_conn(|conn| {
683            index.flush_keymap(conn)?;
684            db::clear_all_pending_index_ops(conn)?;
685            db::set_sidecar_dirty(conn, false)?;
686            Ok(())
687        })?;
688        Ok(())
689    }
690
691    /// Compact the HNSW index by rebuilding without tombstones.
692    ///
693    /// Only rebuilds if the deleted ratio exceeds the compaction threshold.
694    #[cfg(feature = "hnsw")]
695    pub async fn compact_hnsw(&self) -> Result<(), MemoryError> {
696        if !self
697            .inner
698            .hnsw_index
699            .read()
700            .unwrap_or_else(|e| e.into_inner())
701            .needs_compaction()
702        {
703            tracing::info!("HNSW compaction not needed (deleted ratio below threshold)");
704            return Ok(());
705        }
706        self.rebuild_hnsw_index().await
707    }
708
709    // ─── Integrity & Diagnostics ────────────────────────────────
710
711    /// Verify database integrity.
712    ///
713    /// In `Quick` mode, checks table existence and row counts.
714    /// In `Full` mode, also verifies FTS consistency and runs SQLite integrity_check.
715    pub async fn verify_integrity(
716        &self,
717        mode: db::VerifyMode,
718    ) -> Result<db::IntegrityReport, MemoryError> {
719        let use_writer = mode == db::VerifyMode::Full;
720        let mut report = if use_writer {
721            self.with_write_conn(move |conn| db::verify_integrity_sync(conn, mode))
722                .await?
723        } else {
724            self.with_read_conn(move |conn| db::verify_integrity_sync(conn, mode))
725                .await?
726        };
727
728        #[cfg(feature = "hnsw")]
729        {
730            let embedding_count: i64 = if use_writer {
731                self.with_write_conn(|conn| {
732                    Ok(conn.query_row(
733                        "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
734                            (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
735                            (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
736                            (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
737                        [],
738                        |row| row.get(0),
739                    )?)
740                })
741                .await?
742            } else {
743                self.with_read_conn(|conn| {
744                    Ok(conn.query_row(
745                        "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
746                            (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
747                            (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
748                            (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
749                        [],
750                        |row| row.get(0),
751                    )?)
752                })
753                .await?
754            };
755
756            if embedding_count > 0 && !self.inner.paths.hnsw_files_exist() {
757                report.issues.push(format!(
758                    "HNSW sidecar files are missing while {} embedded rows exist in SQLite",
759                    embedding_count
760                ));
761            }
762
763            let keymap_count: i64 = if use_writer {
764                self.with_write_conn(|conn| {
765                    Ok(conn
766                        .query_row(
767                            "SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
768                            [],
769                            |row| row.get(0),
770                        )
771                        .unwrap_or(0))
772                })
773                .await?
774            } else {
775                self.with_read_conn(|conn| {
776                    Ok(conn
777                        .query_row(
778                            "SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
779                            [],
780                            |row| row.get(0),
781                        )
782                        .unwrap_or(0))
783                })
784                .await?
785            };
786
787            if keymap_count != embedding_count {
788                report.issues.push(format!(
789                    "HNSW keymap drift: {} active keymap rows vs {} embedded SQLite rows",
790                    keymap_count, embedding_count
791                ));
792            }
793        }
794
795        report.ok = report.issues.is_empty();
796        Ok(report)
797    }
798
799    /// Reconcile detected integrity issues.
800    ///
801    /// - `ReportOnly`: no-op, just returns the integrity report.
802    /// - `RebuildFts`: rebuilds all FTS indexes from source data.
803    /// - `ReEmbed`: not yet implemented (requires async embedding calls).
804    pub async fn reconcile(
805        &self,
806        action: db::ReconcileAction,
807    ) -> Result<db::IntegrityReport, MemoryError> {
808        match action {
809            db::ReconcileAction::ReportOnly => self.verify_integrity(db::VerifyMode::Full).await,
810            db::ReconcileAction::RebuildFts => {
811                self.with_write_conn(db::reconcile_fts).await?;
812                #[cfg(feature = "hnsw")]
813                self.sync_pending_hnsw_ops_best_effort("reconcile_rebuild_fts")
814                    .await;
815                self.verify_integrity(db::VerifyMode::Full).await
816            }
817            db::ReconcileAction::ReEmbed => {
818                self.reembed_all().await?;
819                self.verify_integrity(db::VerifyMode::Full).await
820            }
821        }
822    }
823
    /// Get the current configuration.
    ///
    /// Returns a shared reference to the `MemoryConfig` held by this store; nothing is copied.
    pub fn config(&self) -> &MemoryConfig {
        &self.inner.config
    }
828
    /// View the store as a derived graph over documents, chunks, facts, sessions, messages,
    /// episodes, namespaces, and semantic similarity edges.
    ///
    /// The view is built by `graph::graph_view` from a clone of this store's inner handle and is
    /// returned as a shared trait object.
    pub fn graph_view(&self) -> Arc<dyn GraphView> {
        // NOTE(review): `inner` is presumably reference-counted, making this clone cheap — confirm.
        graph::graph_view(self.inner.clone())
    }
834
835    // ─── Search ─────────────────────────────────────────────────
836
837    /// Hybrid search across facts, document chunks, and searchable episodes.
838    pub async fn search(
839        &self,
840        query: &str,
841        top_k: Option<usize>,
842        namespaces: Option<&[&str]>,
843        source_types: Option<&[SearchSourceType]>,
844    ) -> Result<Vec<SearchResult>, MemoryError> {
845        let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
846
847        let query_embedding = self.embed_text_internal(query).await?;
848
849        #[cfg(feature = "hnsw")]
850        let hnsw_hits = {
851            let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
852            self.hnsw_search_blocking(query_embedding.clone(), candidates)
853                .await
854        };
855
856        let q = query.to_string();
857        let config = self.inner.config.search.clone();
858        let ns_owned = to_owned_string_vec(namespaces);
859        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
860
861        #[cfg(feature = "hnsw")]
862        let hnsw_hits_owned = hnsw_hits;
863
864        self.with_read_conn(move |conn| {
865            if db::is_embeddings_dirty(conn)? {
866                tracing::warn!(
867                    "Embeddings are stale after model change — search quality is degraded. \
868                     Call reembed_all() to regenerate embeddings."
869                );
870            }
871            let ns_refs = as_str_slice(&ns_owned);
872            let ns_slice: Option<&[&str]> = ns_refs.as_deref();
873            let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
874
875            #[cfg(feature = "hnsw")]
876            {
877                if hnsw_hits_owned.is_empty() {
878                    search::hybrid_search(
879                        conn,
880                        &q,
881                        &query_embedding,
882                        &config,
883                        k,
884                        ns_slice,
885                        st_slice,
886                        None,
887                    )
888                } else {
889                    search::hybrid_search_with_hnsw(
890                        conn,
891                        &q,
892                        &query_embedding,
893                        &config,
894                        k,
895                        ns_slice,
896                        st_slice,
897                        None,
898                        &hnsw_hits_owned,
899                    )
900                }
901            }
902            #[cfg(not(feature = "hnsw"))]
903            {
904                search::hybrid_search(
905                    conn,
906                    &q,
907                    &query_embedding,
908                    &config,
909                    k,
910                    ns_slice,
911                    st_slice,
912                    None,
913                )
914            }
915        })
916        .await
917    }
918
919    /// Full-text search only (no embeddings needed).
920    pub async fn search_fts_only(
921        &self,
922        query: &str,
923        top_k: Option<usize>,
924        namespaces: Option<&[&str]>,
925        source_types: Option<&[SearchSourceType]>,
926    ) -> Result<Vec<SearchResult>, MemoryError> {
927        let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
928        let q = query.to_string();
929        let config = self.inner.config.search.clone();
930        let ns_owned = to_owned_string_vec(namespaces);
931        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
932        self.with_read_conn(move |conn| {
933            let ns_refs = as_str_slice(&ns_owned);
934            let ns_slice: Option<&[&str]> = ns_refs.as_deref();
935            let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
936            search::fts_only_search(conn, &q, &config, k, ns_slice, st_slice, None)
937        })
938        .await
939    }
940
941    /// Vector similarity search only (no FTS).
942    pub async fn search_vector_only(
943        &self,
944        query: &str,
945        top_k: Option<usize>,
946        namespaces: Option<&[&str]>,
947        source_types: Option<&[SearchSourceType]>,
948    ) -> Result<Vec<SearchResult>, MemoryError> {
949        let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
950        let query_embedding = self.embed_text_internal(query).await?;
951
952        #[cfg(feature = "hnsw")]
953        let hnsw_hits = {
954            let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
955            self.hnsw_search_blocking(query_embedding.clone(), candidates)
956                .await
957        };
958
959        let config = self.inner.config.search.clone();
960        let ns_owned = to_owned_string_vec(namespaces);
961        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
962
963        #[cfg(feature = "hnsw")]
964        let hnsw_hits_owned = hnsw_hits;
965
966        self.with_read_conn(move |conn| {
967            if db::is_embeddings_dirty(conn)? {
968                tracing::warn!(
969                    "Embeddings are stale after model change — search quality is degraded. \
970                     Call reembed_all() to regenerate embeddings."
971                );
972            }
973            let ns_refs = as_str_slice(&ns_owned);
974            let ns_slice: Option<&[&str]> = ns_refs.as_deref();
975            let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
976
977            #[cfg(feature = "hnsw")]
978            {
979                if hnsw_hits_owned.is_empty() {
980                    search::vector_only_search(
981                        conn,
982                        &query_embedding,
983                        &config,
984                        k,
985                        ns_slice,
986                        st_slice,
987                        None,
988                    )
989                } else {
990                    search::vector_only_search_with_hnsw(
991                        conn,
992                        &query_embedding,
993                        &config,
994                        k,
995                        ns_slice,
996                        st_slice,
997                        None,
998                        &hnsw_hits_owned,
999                    )
1000                }
1001            }
1002            #[cfg(not(feature = "hnsw"))]
1003            {
1004                search::vector_only_search(
1005                    conn,
1006                    &query_embedding,
1007                    &config,
1008                    k,
1009                    ns_slice,
1010                    st_slice,
1011                    None,
1012                )
1013            }
1014        })
1015        .await
1016    }
1017
1018    // ─── Explainable Search ───────────────────────────────────
1019
1020    /// Search with full score breakdown for each result.
1021    pub async fn search_explained(
1022        &self,
1023        query: &str,
1024        top_k: Option<usize>,
1025        namespaces: Option<&[&str]>,
1026        source_types: Option<&[SearchSourceType]>,
1027    ) -> Result<Vec<types::ExplainedResult>, MemoryError> {
1028        let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
1029        let query_embedding = self.embed_text_internal(query).await?;
1030
1031        #[cfg(feature = "hnsw")]
1032        let hnsw_hits = {
1033            let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
1034            self.hnsw_search_blocking(query_embedding.clone(), candidates)
1035                .await
1036        };
1037
1038        let q = query.to_string();
1039        let config = self.inner.config.search.clone();
1040        let ns_owned = to_owned_string_vec(namespaces);
1041        let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|value| value.to_vec());
1042
1043        #[cfg(feature = "hnsw")]
1044        let hnsw_hits_owned = hnsw_hits;
1045
1046        self.with_read_conn(move |conn| {
1047            let ns_refs = as_str_slice(&ns_owned);
1048            let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1049            let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1050
1051            #[cfg(feature = "hnsw")]
1052            {
1053                if hnsw_hits_owned.is_empty() {
1054                    search::hybrid_search_detailed(
1055                        conn,
1056                        &q,
1057                        &query_embedding,
1058                        &config,
1059                        k,
1060                        ns_slice,
1061                        st_slice,
1062                        None,
1063                    )
1064                } else {
1065                    search::hybrid_search_with_hnsw_detailed(
1066                        conn,
1067                        &q,
1068                        &query_embedding,
1069                        &config,
1070                        k,
1071                        ns_slice,
1072                        st_slice,
1073                        None,
1074                        &hnsw_hits_owned,
1075                    )
1076                }
1077            }
1078            #[cfg(not(feature = "hnsw"))]
1079            {
1080                search::hybrid_search_detailed(
1081                    conn,
1082                    &q,
1083                    &query_embedding,
1084                    &config,
1085                    k,
1086                    ns_slice,
1087                    st_slice,
1088                    None,
1089                )
1090            }
1091        })
1092        .await
1093    }
1094
1095    // ─── Embedding Displacement ───────────────────────────────
1096
1097    /// Compute embedding displacement between two texts.
1098    pub async fn embedding_displacement(
1099        &self,
1100        text_a: &str,
1101        text_b: &str,
1102    ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1103        let emb_a = self.embed_text_internal(text_a).await?;
1104        let emb_b = self.embed_text_internal(text_b).await?;
1105        Self::embedding_displacement_from_vecs(&emb_a, &emb_b)
1106    }
1107
1108    /// Compute embedding displacement from pre-computed vectors.
1109    pub fn embedding_displacement_from_vecs(
1110        a: &[f32],
1111        b: &[f32],
1112    ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1113        if a.len() != b.len() {
1114            return Err(MemoryError::DimensionMismatch {
1115                expected: a.len(),
1116                actual: b.len(),
1117            });
1118        }
1119        let cosine_sim = search::cosine_similarity(a, b);
1120
1121        let euclidean_dist: f32 = a
1122            .iter()
1123            .zip(b.iter())
1124            .map(|(x, y)| (x - y) * (x - y))
1125            .sum::<f32>()
1126            .sqrt();
1127
1128        let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1129        let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1130
1131        Ok(types::EmbeddingDisplacement {
1132            cosine_similarity: cosine_sim,
1133            euclidean_distance: euclidean_dist,
1134            magnitude_a: mag_a,
1135            magnitude_b: mag_b,
1136        })
1137    }
1138
1139    // ─── Utility ────────────────────────────────────────────────
1140
    /// Chunk text using the configured strategy and token counter.
    ///
    /// Delegates to `chunker::chunk_text`, passing this store's `config.chunking` settings and
    /// its token counter. Nothing is embedded or stored by this call.
    pub fn chunk_text(&self, text: &str) -> Vec<TextChunk> {
        chunker::chunk_text(
            text,
            &self.inner.config.chunking,
            self.inner.token_counter.as_ref(),
        )
    }
1149
    /// Embed a single text via the configured provider.
    ///
    /// Public entry point for the same internal embedding helper the search methods use.
    ///
    /// # Errors
    ///
    /// Returns whatever error the internal embedding call produces.
    pub async fn embed(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
        self.embed_text_internal(text).await
    }
1154
1155    /// Embed multiple texts in a batch.
1156    pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>, MemoryError> {
1157        let owned: Vec<String> = texts.iter().map(|s| s.to_string()).collect();
1158        self.embed_batch_internal(owned).await
1159    }
1160
1161    /// Get database statistics.
1162    pub async fn stats(&self) -> Result<MemoryStats, MemoryError> {
1163        let db_path = self.inner.paths.sqlite_path.clone();
1164        self.with_read_conn(move |conn| {
1165            let total_facts: u64 =
1166                conn.query_row("SELECT COUNT(*) FROM facts", [], |r| r.get(0))?;
1167            let total_documents: u64 =
1168                conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))?;
1169            let total_chunks: u64 =
1170                conn.query_row("SELECT COUNT(*) FROM chunks", [], |r| r.get(0))?;
1171            let total_sessions: u64 =
1172                conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
1173            let total_messages: u64 =
1174                conn.query_row("SELECT COUNT(*) FROM messages", [], |r| r.get(0))?;
1175
1176            let db_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
1177
1178            let (model, dims): (Option<String>, Option<usize>) = conn
1179                .query_row(
1180                    "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1181                    [],
1182                    |r| Ok((Some(r.get(0)?), Some(r.get(1)?))),
1183                )
1184                .unwrap_or((None, None));
1185
1186            Ok(MemoryStats {
1187                total_facts,
1188                total_documents,
1189                total_chunks,
1190                total_sessions,
1191                total_messages,
1192                database_size_bytes: db_size,
1193                embedding_model: model,
1194                embedding_dimensions: dims,
1195            })
1196        })
1197        .await
1198    }
1199
1200    /// Return distinct scope_domain values stored in document metadata.
1201    ///
1202    /// Queries `json_extract(metadata, '$.scope_domain')` across all documents
1203    /// and returns the unique non-null values. Used by the Recall app to populate
1204    /// the scope picker dynamically instead of relying on a hardcoded list.
1205    pub async fn list_scope_domains(&self) -> Result<Vec<String>, MemoryError> {
1206        self.with_read_conn(|conn| {
1207            let mut stmt = conn.prepare(
1208                "SELECT DISTINCT json_extract(metadata, '$.scope_domain') \
1209                 FROM documents \
1210                 WHERE json_extract(metadata, '$.scope_domain') IS NOT NULL",
1211            )?;
1212            let domains: Vec<String> = stmt
1213                .query_map([], |row| row.get::<_, String>(0))?
1214                .filter_map(|r| r.ok())
1215                .collect();
1216            Ok(domains)
1217        })
1218        .await
1219    }
1220
    /// Check if embeddings need re-generation after a model change.
    ///
    /// Runs `db::is_embeddings_dirty` on a reader connection and returns its answer. The flag is
    /// cleared again at the end of a successful [`reembed_all`](Self::reembed_all).
    pub async fn embeddings_are_dirty(&self) -> Result<bool, MemoryError> {
        self.with_read_conn(db::is_embeddings_dirty).await
    }
1225
1226    /// Re-embed all facts, chunks, messages, and episodes. Call after changing embedding models.
1227    pub async fn reembed_all(&self) -> Result<usize, MemoryError> {
1228        let mut count = 0usize;
1229        let batch_size = self.inner.config.embedding.batch_size;
1230        let dims = self.inner.config.embedding.dimensions;
1231
1232        // ─── Facts ──────────────────────────────────────────────────
1233        let fact_contents: Vec<(String, String)> = self
1234            .with_read_conn(|conn| {
1235                let mut stmt = conn.prepare("SELECT id, content FROM facts")?;
1236                let result = stmt
1237                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1238                    .collect::<Result<Vec<_>, _>>()?;
1239                Ok(result)
1240            })
1241            .await?;
1242
1243        let mut fact_count = 0usize;
1244        for batch in fact_contents.chunks(batch_size) {
1245            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1246            let embeddings = self.embed_batch_internal(texts).await?;
1247            for embedding in &embeddings {
1248                self.validate_embedding_dimensions(embedding)?;
1249            }
1250
1251            let quantizer = Quantizer::new(dims);
1252            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1253                .iter()
1254                .zip(embeddings.iter())
1255                .map(|((id, _), emb)| {
1256                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1257                    let q8 = quantizer
1258                        .quantize(emb)
1259                        .map(|qv| quantize::pack_quantized(&qv))
1260                        .ok();
1261                    (id.clone(), db::embedding_to_bytes(emb), q8)
1262                })
1263                .collect();
1264
1265            self.with_write_conn(move |conn| {
1266                db::with_transaction(conn, |tx| {
1267                    for (fid, bytes, q8) in &updates {
1268                        tx.execute(
1269                            "UPDATE facts SET embedding = ?1, embedding_q8 = ?2, updated_at = datetime('now') WHERE id = ?3",
1270                            rusqlite::params![bytes, q8.as_deref(), fid],
1271                        )?;
1272                        #[cfg(feature = "hnsw")]
1273                        db::queue_pending_index_op(
1274                            tx,
1275                            &format!("fact:{fid}"),
1276                            "fact",
1277                            db::IndexOpKind::Upsert,
1278                        )?;
1279                    }
1280                    Ok(())
1281                })
1282            })
1283            .await?;
1284
1285            fact_count += batch.len();
1286            count += batch.len();
1287            if fact_count % 100 == 0 || fact_count == count {
1288                tracing::info!(fact_count, "Re-embedded {} facts so far", fact_count);
1289            }
1290        }
1291
1292        // ─── Chunks ─────────────────────────────────────────────────
1293        let chunk_data: Vec<(String, String)> = self
1294            .with_read_conn(|conn| {
1295                let mut stmt = conn.prepare("SELECT id, content FROM chunks")?;
1296                let result = stmt
1297                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1298                    .collect::<Result<Vec<_>, _>>()?;
1299                Ok(result)
1300            })
1301            .await?;
1302
1303        let mut chunk_count = 0usize;
1304        for batch in chunk_data.chunks(batch_size) {
1305            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1306            let embeddings = self.embed_batch_internal(texts).await?;
1307            for embedding in &embeddings {
1308                self.validate_embedding_dimensions(embedding)?;
1309            }
1310
1311            let quantizer = Quantizer::new(dims);
1312            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1313                .iter()
1314                .zip(embeddings.iter())
1315                .map(|((id, _), emb)| {
1316                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1317                    let q8 = quantizer
1318                        .quantize(emb)
1319                        .map(|qv| quantize::pack_quantized(&qv))
1320                        .ok();
1321                    (id.clone(), db::embedding_to_bytes(emb), q8)
1322                })
1323                .collect();
1324
1325            self.with_write_conn(move |conn| {
1326                db::with_transaction(conn, |tx| {
1327                    for (cid, bytes, q8) in &updates {
1328                        tx.execute(
1329                            "UPDATE chunks SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1330                            rusqlite::params![bytes, q8.as_deref(), cid],
1331                        )?;
1332                        #[cfg(feature = "hnsw")]
1333                        db::queue_pending_index_op(
1334                            tx,
1335                            &format!("chunk:{cid}"),
1336                            "chunk",
1337                            db::IndexOpKind::Upsert,
1338                        )?;
1339                    }
1340                    Ok(())
1341                })
1342            })
1343            .await?;
1344
1345            chunk_count += batch.len();
1346            count += batch.len();
1347            if chunk_count % 100 == 0 {
1348                tracing::info!(chunk_count, "Re-embedded {} chunks so far", chunk_count);
1349            }
1350        }
1351
1352        // ─── Messages ───────────────────────────────────────────────
1353        let message_data: Vec<(i64, String)> = self
1354            .with_read_conn(|conn| {
1355                let mut stmt = conn.prepare("SELECT id, content FROM messages")?;
1356                let result = stmt
1357                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1358                    .collect::<Result<Vec<_>, _>>()?;
1359                Ok(result)
1360            })
1361            .await?;
1362
1363        let mut msg_count = 0usize;
1364        for batch in message_data.chunks(batch_size) {
1365            let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1366            let embeddings = self.embed_batch_internal(texts).await?;
1367            for embedding in &embeddings {
1368                self.validate_embedding_dimensions(embedding)?;
1369            }
1370
1371            let quantizer = Quantizer::new(dims);
1372            let updates: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> = batch
1373                .iter()
1374                .zip(embeddings.iter())
1375                .map(|((id, _), emb)| {
1376                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1377                    let q8 = quantizer
1378                        .quantize(emb)
1379                        .map(|qv| quantize::pack_quantized(&qv))
1380                        .ok();
1381                    (*id, db::embedding_to_bytes(emb), q8)
1382                })
1383                .collect();
1384
1385            self.with_write_conn(move |conn| {
1386                db::with_transaction(conn, |tx| {
1387                    for (mid, bytes, q8) in &updates {
1388                        tx.execute(
1389                            "UPDATE messages SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1390                            rusqlite::params![bytes, q8.as_deref(), mid],
1391                        )?;
1392                        #[cfg(feature = "hnsw")]
1393                        db::queue_pending_index_op(
1394                            tx,
1395                            &format!("msg:{mid}"),
1396                            "message",
1397                            db::IndexOpKind::Upsert,
1398                        )?;
1399                    }
1400                    Ok(())
1401                })
1402            })
1403            .await?;
1404
1405            msg_count += batch.len();
1406            count += batch.len();
1407            if msg_count % 100 == 0 {
1408                tracing::info!(msg_count, "Re-embedded {} messages so far", msg_count);
1409            }
1410        }
1411
1412        // ─── Episodes ───────────────────────────────────────────────
1413        let episode_data: Vec<(String, String)> = self
1414            .with_read_conn(|conn| {
1415                let mut stmt = conn.prepare("SELECT episode_id, search_text FROM episodes")?;
1416                let result = stmt
1417                    .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1418                    .collect::<Result<Vec<_>, _>>()?;
1419                Ok(result)
1420            })
1421            .await?;
1422
1423        let mut episode_count = 0usize;
1424        for batch in episode_data.chunks(batch_size) {
1425            let texts: Vec<String> = batch.iter().map(|(_, text)| text.clone()).collect();
1426            let embeddings = self.embed_batch_internal(texts).await?;
1427            for embedding in &embeddings {
1428                self.validate_embedding_dimensions(embedding)?;
1429            }
1430
1431            let quantizer = Quantizer::new(dims);
1432            let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1433                .iter()
1434                .zip(embeddings.iter())
1435                .map(|((episode_id, _), embedding)| {
1436                    // INTENTIONAL: q8 quantization is an optional search optimization; missing q8 is non-fatal
1437                    let q8 = quantizer
1438                        .quantize(embedding)
1439                        .map(|vector| quantize::pack_quantized(&vector))
1440                        .ok();
1441                    (episode_id.clone(), db::embedding_to_bytes(embedding), q8)
1442                })
1443                .collect();
1444
1445            self.with_write_conn(move |conn| {
1446                db::with_transaction(conn, |tx| {
1447                    for (episode_id, bytes, q8) in &updates {
1448                        tx.execute(
1449                            "UPDATE episodes
1450                             SET embedding = ?1,
1451                                 embedding_q8 = ?2,
1452                                 updated_at = datetime('now')
1453                             WHERE episode_id = ?3",
1454                            rusqlite::params![bytes, q8.as_deref(), episode_id],
1455                        )?;
1456                        #[cfg(feature = "hnsw")]
1457                        db::queue_pending_index_op(
1458                            tx,
1459                            &episodes::episode_item_key(episode_id),
1460                            "episode",
1461                            db::IndexOpKind::Upsert,
1462                        )?;
1463                    }
1464                    Ok(())
1465                })
1466            })
1467            .await?;
1468
1469            episode_count += batch.len();
1470            count += batch.len();
1471            if episode_count % 100 == 0 {
1472                tracing::info!(
1473                    episode_count,
1474                    "Re-embedded {} episodes so far",
1475                    episode_count
1476                );
1477            }
1478        }
1479
1480        // Clear the dirty flag
1481        self.with_write_conn(db::clear_embeddings_dirty).await?;
1482
1483        tracing::info!(
1484            facts = fact_count,
1485            chunks = chunk_count,
1486            messages = msg_count,
1487            episodes = episode_count,
1488            total = count,
1489            "Re-embedding complete"
1490        );
1491
1492        // Rebuild HNSW after re-embedding
1493        #[cfg(feature = "hnsw")]
1494        {
1495            tracing::info!("Rebuilding HNSW index after re-embedding...");
1496            self.rebuild_hnsw_index().await?;
1497        }
1498
1499        Ok(count)
1500    }
1501
    /// Vacuum the database (reclaim space after deletions).
    ///
    /// Issues a single `VACUUM` statement through the writer connection.
    ///
    /// # Errors
    ///
    /// Returns the error raised by the writer connection or by the `VACUUM` statement.
    pub async fn vacuum(&self) -> Result<(), MemoryError> {
        self.with_write_conn(|conn| {
            conn.execute_batch("VACUUM")?;
            Ok(())
        })
        .await
    }
1510
1511    // ─── Projection Import ─────────────────────────────────────
1512
    /// Import a projection envelope atomically (V10 legacy path).
    ///
    /// ## Phase status: compatibility / migration-only
    ///
    /// This method is the V10 legacy import path. New integrations should use
    /// [`import_projection_batch()`](Self::import_projection_batch) instead,
    /// which accepts the canonical `ProjectionImportBatchV3` format from
    /// `forge-memory-bridge`.
    ///
    /// **Removal condition**: removed when all callers migrate to the bridge pipeline.
    ///
    /// **Idempotent**: re-importing the same envelope (same `envelope_id` +
    /// `schema_version` + `content_digest`) returns a receipt with
    /// `was_duplicate = true` and does not modify data.
    ///
    /// **Atomic**: all records are committed in a single transaction. On any
    /// failure the entire import is rolled back — no partial visibility.
    ///
    /// **Provenance**: each imported record's metadata is tagged with the
    /// envelope_id and source_authority for traceability.
    ///
    /// The guarantees above are implemented in `projection_legacy_compat::import_envelope`;
    /// this method only forwards to it.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy V10 import envelope path is compatibility-only. Use `import_projection_batch()` and `ProjectionImportBatchV3` on the canonical lane."
    )]
    #[doc(hidden)]
    #[allow(deprecated)]
    pub async fn import_envelope(
        &self,
        envelope: &projection_import::ImportEnvelope,
    ) -> Result<projection_import::ImportReceipt, MemoryError> {
        // Thin shim: all import logic lives in the legacy compatibility module.
        projection_legacy_compat::import_envelope(self, envelope).await
    }
1545
1546    /// Check whether an envelope has already been imported.
1547    #[deprecated(
1548        since = "0.5.0",
1549        note = "Legacy V10 import envelope status reads are compatibility-only. Prefer the projection import log."
1550    )]
1551    #[doc(hidden)]
1552    #[allow(deprecated)]
1553    pub async fn import_status(
1554        &self,
1555        envelope_id: &projection_import::EnvelopeId,
1556    ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
1557        projection_legacy_compat::import_status(self, envelope_id).await
1558    }
1559
1560    /// List recent imports, optionally filtered by namespace.
1561    #[deprecated(
1562        since = "0.5.0",
1563        note = "Legacy V10 import log access is compatibility-only. Prefer new projection-import metadata."
1564    )]
1565    #[doc(hidden)]
1566    #[allow(deprecated)]
1567    pub async fn list_imports(
1568        &self,
1569        namespace: Option<&str>,
1570        limit: usize,
1571    ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
1572        projection_legacy_compat::list_imports(self, namespace, limit).await
1573    }
1574
1575    /// Get the most recent successful import timestamp for a namespace.
1576    #[allow(deprecated)]
1577    pub async fn last_import_at(&self, namespace: &str) -> Result<Option<String>, MemoryError> {
1578        projection_legacy_compat::last_import_at(self, namespace).await
1579    }
1580
1581    /// Query imported claim projection rows through the supported public read surface.
1582    pub async fn query_claim_versions(
1583        &self,
1584        query: ProjectionQuery,
1585    ) -> Result<Vec<ProjectionClaimVersion>, MemoryError> {
1586        self.with_read_conn(move |conn| projection_storage::query_claim_versions(conn, &query))
1587            .await
1588    }
1589
1590    /// Query imported relation projection rows through the supported public read surface.
1591    pub async fn query_relation_versions(
1592        &self,
1593        query: ProjectionQuery,
1594    ) -> Result<Vec<ProjectionRelationVersion>, MemoryError> {
1595        self.with_read_conn(move |conn| projection_storage::query_relation_versions(conn, &query))
1596            .await
1597    }
1598
1599    /// Query imported episode projection rows through the supported public read surface.
1600    pub async fn query_episodes(
1601        &self,
1602        query: ProjectionQuery,
1603    ) -> Result<Vec<ProjectionEpisode>, MemoryError> {
1604        self.with_read_conn(move |conn| projection_storage::query_episode_rows(conn, &query))
1605            .await
1606    }
1607
1608    /// Query imported entity-alias rows through the supported public read surface.
1609    pub async fn query_entity_aliases(
1610        &self,
1611        query: ProjectionQuery,
1612    ) -> Result<Vec<ProjectionEntityAlias>, MemoryError> {
1613        self.with_read_conn(move |conn| projection_storage::query_entity_aliases(conn, &query))
1614            .await
1615    }
1616
1617    /// Query imported evidence-reference rows through the supported public read surface.
1618    pub async fn query_evidence_refs(
1619        &self,
1620        query: ProjectionQuery,
1621    ) -> Result<Vec<ProjectionEvidenceRef>, MemoryError> {
1622        self.with_read_conn(move |conn| projection_storage::query_evidence_refs(conn, &query))
1623            .await
1624    }
1625
1626    /// Execute raw SQL. For testing only — not part of the stable public API.
1627    #[cfg(any(test, feature = "testing"))]
1628    pub async fn raw_execute(&self, sql: &str, params: Vec<String>) -> Result<usize, MemoryError> {
1629        let sql = sql.to_string();
1630        self.with_write_conn(move |conn| {
1631            let param_refs: Vec<&dyn rusqlite::types::ToSql> = params
1632                .iter()
1633                .map(|s| s as &dyn rusqlite::types::ToSql)
1634                .collect();
1635            Ok(conn.execute(&sql, &*param_refs)?)
1636        })
1637        .await
1638    }
1639}