1#![allow(deprecated)]
2
3#[cfg(not(any(feature = "hnsw", feature = "brute-force", feature = "usearch-backend")))]
58compile_error!(
59 "At least one search backend feature must be enabled: 'hnsw', 'usearch-backend', or 'brute-force'"
60);
61
62pub mod chunker;
63pub mod config;
64pub(crate) mod conversation;
65pub mod db;
66pub(crate) mod documents;
67pub mod embedder;
68pub(crate) mod episodes;
69pub mod error;
70mod graph;
71#[cfg(feature = "hnsw")]
72pub mod hnsw;
73#[cfg(feature = "hnsw")]
74mod hnsw_backend;
75#[cfg(feature = "hnsw")]
76mod hnsw_ops;
77#[cfg(feature = "usearch-backend")]
78mod usearch_backend;
79pub mod vector_backend;
80mod json_compat_import;
81pub(crate) mod knowledge;
82mod pool;
83mod projection_batch;
84mod projection_derivation;
85#[deprecated(
89 since = "0.6.0",
90 note = "Legacy V10 import path is migration-only. Use `import_projection_batch()` with `ProjectionImportBatchV3` on the canonical lane."
91)]
92#[doc(hidden)]
93pub mod projection_import;
94mod projection_lane;
95mod projection_legacy_compat;
96pub(crate) mod projection_storage;
97pub mod quantize;
98pub mod quantize_governed;
99pub mod search;
100pub mod storage;
101mod store_support;
102pub mod tokenizer;
103pub mod types;
104pub mod vector_codec;
105
106pub use config::{
108 ChunkingConfig, DerivedVectorBackendPolicy, EmbeddingConfig, MemoryConfig, MemoryLimits,
109 PoolConfig, SearchConfig,
110};
111pub use db::{IntegrityReport, ReconcileAction, VerifyMode};
112pub use embedder::{Embedder, MockEmbedder, OllamaEmbedder};
113pub use error::MemoryError;
114#[cfg(feature = "hnsw")]
115pub use hnsw::{HnswConfig, HnswHit, HnswIndex};
116pub use vector_backend::{VectorBackend, VectorHit, VectorIndex, VectorIndexConfig};
119pub(crate) use projection_lane::projection_import_failure_id;
120pub use projection_lane::{
121 ProjectionImportFailureReceiptEntry, ProjectionImportLogEntry, ProjectionImportResult,
122};
123pub use quantize::{pack_quantized, unpack_quantized, QuantizedVector, Quantizer};
124pub use storage::StoragePaths;
125pub use tokenizer::{EstimateTokenCounter, TokenCounter};
126pub use types::{
127 ChunkManifestChunkMapping, ChunkManifestEntry, ChunkManifestIngestOptions,
128 ChunkManifestIngestResult, Document, EmbeddingDisplacement, EpisodeAsOfReceiptV1, EpisodeMeta,
129 EpisodeOutcome, ExactnessProfile, ExplainedResult, ExplainedResultAnswerV1,
130 ExplainedSearchResponse, Fact, GraphDirection, GraphEdge, GraphEdgeType, GraphView,
131 MemoryStats, Message, NamespaceDeleteReport, ProjectionClaimVersion, ProjectionEntityAlias,
132 ProjectionEpisode, ProjectionEvidenceRef, ProjectionQuery, ProjectionRelationVersion,
133 ReceiptMode, Role, ScoreBreakdown, SearchContext, SearchReceiptAnswersV1, SearchReplayReportV1,
134 SearchResponse, SearchResult, SearchSource, SearchSourceType, Session, TextChunk,
135 VectorArtifactBuildReceiptV1, VectorSearchReceiptV1, VerificationStatus,
136};
137#[cfg(feature = "turbo-quant-codec")]
138pub use vector_codec::TurboQuantCodec;
139pub use vector_codec::{
140 RawF32Codec, Sq8Codec, VectorArtifactV1, VectorCodec, VectorCodecProfileV1,
141};
142
143use std::sync::Arc;
144
/// Upper bound applied (via `.min(MAX_TOP_K)`) to the effective `top_k` of the
/// search entry points in this file.
const MAX_TOP_K: usize = 1_000;
/// Upper bound applied to the number of candidates requested from the HNSW
/// index for a single query.
#[cfg(feature = "hnsw")]
const MAX_HNSW_CANDIDATES: usize = 10_000;
148
149pub(crate) use store_support::{
150 as_str_slice, build_episode_search_text, merge_trace_ctx, to_owned_string_vec,
151 verification_status_for_outcome,
152};
153
/// Cross-checks the embeddings stored in SQLite against the `hnsw_keymap`
/// table and the in-memory HNSW node vectors.
///
/// The returned vector holds one human-readable message per detected problem;
/// an empty vector means no inconsistency was found. Checks, in order:
///
/// 1. Every non-NULL embedding in `facts`, `chunks`, `messages` and `episodes`
///    decodes to a vector of `dimensions` floats.
/// 2. Sidecar files exist whenever at least one embedded row exists.
/// 3. The `hnsw_keymap` table exists (if it does not, the remaining checks are
///    skipped).
/// 4. Every active (`deleted = 0`) keymap row has a well-formed
///    `<domain>:<id>` key and a node id that fits in `usize`.
/// 5. Every live row has an active keymap entry and vice versa.
/// 6. The in-memory vector of each mapped node is bit-for-bit equal to the
///    SQLite embedding.
/// 7. The number of active keymap entries equals the number of live rows.
///
/// # Parameters
/// * `conn` - connection used for all queries.
/// * `dimensions` - expected embedding length passed to `db::decode_f32_le`.
/// * `node_vectors` - node id to vector map taken from the in-memory index.
/// * `sidecar_files_exist` - whether the on-disk sidecar files are present.
///
/// # Errors
/// Propagates any SQLite error raised while preparing or running a statement,
/// including the `sqlite_master` lookup that decides whether `hnsw_keymap`
/// exists. (A failing lookup used to be treated as "table missing", which
/// produced a misleading integrity issue instead of surfacing the real error.)
#[cfg(feature = "hnsw")]
fn verify_hnsw_key_level_integrity(
    conn: &rusqlite::Connection,
    dimensions: usize,
    node_vectors: &std::collections::HashMap<usize, Vec<f32>>,
    sidecar_files_exist: bool,
) -> Result<Vec<String>, MemoryError> {
    let mut issues = Vec::new();
    let mut live_rows: std::collections::HashMap<String, Vec<f32>> =
        std::collections::HashMap::new();

    // Collect every embedded row under the same "<domain>:<id>" key format the
    // keymap uses, so both sides can be compared by key.
    let mut live_stmt = conn.prepare(
        "SELECT 'fact:' || id, embedding FROM facts WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'chunk:' || id, embedding FROM chunks WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'msg:' || id, embedding FROM messages WHERE embedding IS NOT NULL
         UNION ALL
         SELECT 'episode:' || episode_id, embedding FROM episodes WHERE embedding IS NOT NULL",
    )?;
    let live_iter = live_stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Vec<u8>>(1)?))
    })?;
    for row in live_iter {
        let (key, blob) = row?;
        match db::decode_f32_le(&blob, dimensions) {
            Ok(vector) => {
                live_rows.insert(key, vector);
            }
            // An undecodable blob is reported, and the row is left out of
            // `live_rows`, so it will also not match any keymap entry below.
            Err(err) => issues.push(format!(
                "HNSW live embedding row {key} has invalid vector: {err}"
            )),
        }
    }

    if !live_rows.is_empty() && !sidecar_files_exist {
        issues.push(format!(
            "HNSW sidecar files are missing while {} embedded rows exist in SQLite",
            live_rows.len()
        ));
    }

    // Propagate a failing catalog query instead of pretending the table is absent.
    let keymap_exists: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='hnsw_keymap'",
        [],
        |row| row.get(0),
    )?;
    if !keymap_exists {
        if !live_rows.is_empty() {
            issues.push("HNSW keymap table missing while embedded SQLite rows exist".to_string());
        }
        // Without a keymap there is nothing further to compare.
        return Ok(issues);
    }

    let mut active_keymap: std::collections::HashMap<String, usize> =
        std::collections::HashMap::new();
    let mut keymap_stmt =
        conn.prepare("SELECT node_id, item_key FROM hnsw_keymap WHERE deleted = 0")?;
    let keymap_iter = keymap_stmt.query_map([], |row| {
        Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?))
    })?;
    for row in keymap_iter {
        let (node_id_raw, key) = row?;
        let Some((domain, raw_id)) = key.split_once(':') else {
            issues.push(format!("HNSW keymap entry has malformed key: {key}"));
            continue;
        };
        if !matches!(domain, "fact" | "chunk" | "msg" | "episode") || raw_id.is_empty() {
            issues.push(format!(
                "HNSW keymap entry has unsupported key domain: {key}"
            ));
            continue;
        }
        // Message keys must carry an integer row id.
        if domain == "msg" && raw_id.parse::<i64>().is_err() {
            issues.push(format!("HNSW message key has non-integer row id: {key}"));
            continue;
        }
        let node_id = match usize::try_from(node_id_raw) {
            Ok(node_id) => node_id,
            Err(err) => {
                issues.push(format!(
                    "HNSW keymap node_id {node_id_raw} is invalid: {err}"
                ));
                continue;
            }
        };
        active_keymap.insert(key, node_id);
    }

    // SQLite -> keymap direction.
    for key in live_rows.keys() {
        if !active_keymap.contains_key(key) {
            issues.push(format!(
                "HNSW keymap missing live embedded SQLite row: {key}"
            ));
        }
    }

    // keymap -> SQLite direction, plus vector equality against the in-memory index.
    for (key, node_id) in &active_keymap {
        let Some(live_vector) = live_rows.get(key) else {
            issues.push(format!(
                "HNSW keymap has stale active entry without live embedded SQLite row: {key}"
            ));
            continue;
        };
        let Some(index_vector) = node_vectors.get(node_id) else {
            issues.push(format!(
                "HNSW keymap entry {key} points to missing in-memory node vector {node_id}"
            ));
            continue;
        };
        // Compare raw bit patterns so the check is exact (no float tolerance).
        if index_vector.len() != live_vector.len()
            || index_vector
                .iter()
                .zip(live_vector)
                .any(|(left, right)| left.to_bits() != right.to_bits())
        {
            issues.push(format!(
                "HNSW keymap entry {key} points to node {node_id} whose vector does not match the authoritative SQLite embedding"
            ));
        }
    }

    if active_keymap.len() != live_rows.len() {
        issues.push(format!(
            "HNSW keymap drift: {} active keymap rows vs {} embedded SQLite rows",
            active_keymap.len(),
            live_rows.len()
        ));
    }

    Ok(issues)
}
288
/// Hidden, deprecated re-exports of legacy names. Both sub-modules are marked
/// migration-only by their deprecation notes.
#[doc(hidden)]
pub mod compat {
    /// Re-exports the legacy import-envelope types from `crate::projection_import`
    /// together with `stack_ids::EnvelopeId`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy ImportEnvelope is migration-only. New integrations should use `ProjectionImportBatchV3` on the canonical lane."
    )]
    #[doc(hidden)]
    // The re-exported items are themselves deprecated; silence the warning here.
    #[allow(deprecated)]
    pub mod legacy_import_envelope {
        pub use crate::projection_import::{
            ImportEnvelope, ImportProjectionFreshness, ImportReceipt, ImportRecord, ImportStatus,
        };
        pub use stack_ids::EnvelopeId;
    }

    /// Re-exports the legacy `TraceId` type from `crate::types`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy trace_id is migration-only. Use `stack_ids::TraceCtx`."
    )]
    #[doc(hidden)]
    #[allow(deprecated)]
    pub mod compat_trace_id {
        pub use crate::types::TraceId;
    }
}
315
/// Public handle to the store.
///
/// The handle only wraps an `Arc`, so `Clone` produces another handle to the
/// same `MemoryStoreInner` rather than a copy of the underlying state.
#[derive(Clone)]
pub struct MemoryStore {
    /// Shared state behind every clone of this handle.
    inner: Arc<MemoryStoreInner>,
}
323
/// State shared by all `MemoryStore` handles.
struct MemoryStoreInner {
    /// SQLite connection pool; accessed through its `with_read_conn` /
    /// `with_write_conn` methods.
    pool: pool::SqlitePool,
    /// Embedding provider used for queries and stored content.
    embedder: Box<dyn Embedder>,
    /// Semaphore acquired before each embedder call; sized from
    /// `config.limits.max_embedding_concurrency` when the store is opened.
    embedding_permits: Arc<tokio::sync::Semaphore>,
    /// Normalized and validated configuration the store was opened with.
    config: MemoryConfig,
    /// Filesystem locations derived from `config.base_dir`.
    paths: StoragePaths,
    /// Token counter taken from the config, or the tokenizer default.
    token_counter: Arc<dyn TokenCounter>,
    /// In-memory HNSW index, guarded by a standard-library `RwLock`.
    #[cfg(feature = "hnsw")]
    hnsw_index: std::sync::RwLock<HnswIndex>,
}
334
// Best-effort persistence of the HNSW sidecar when the inner state is dropped.
// Nothing here can fail the drop: every error is logged and swallowed.
#[cfg(feature = "hnsw")]
impl Drop for MemoryStoreInner {
    fn drop(&mut self) {
        let sidecar_dir = &self.paths.hnsw_dir;
        if !sidecar_dir.exists() {
            // The directory vanished (e.g. removed by the caller); do not recreate it.
            tracing::debug!(
                path = %sidecar_dir.display(),
                "Skipping HNSW drop flush because the sidecar directory no longer exists"
            );
            return;
        }

        // If the pending-op count cannot be read, fall back to zero and take
        // the plain "save what is in memory" path below.
        let pending_ops = self
            .pool
            .with_read_conn(db::pending_index_op_count)
            .unwrap_or_else(|err| {
                tracing::warn!("Failed to inspect pending HNSW work on drop: {}", err);
                0
            });

        if pending_ops > 0 {
            // Durable sidecar work is outstanding: rebuild from SQLite rather
            // than saving the in-memory index.
            let recovery =
                hnsw_ops::recover_hnsw_sidecar_sync(&self.pool, &self.paths, &self.config.hnsw);
            if let Err(err) = recovery {
                tracing::error!("Failed to recover and flush HNSW on drop: {}", err);
            }
            return;
        }

        let Ok(hnsw_guard) = self.hnsw_index.read() else {
            tracing::warn!("HNSW RwLock poisoned on drop — skipping save");
            return;
        };

        let saved = hnsw_ops::save_hnsw_sidecar(
            &hnsw_guard,
            &self.paths.hnsw_dir,
            &self.paths.hnsw_basename,
        );
        if let Err(err) = saved {
            tracing::error!("Failed to save HNSW index on drop: {}", err);
        }

        // The keymap flush is attempted even when the sidecar save failed.
        let flushed = self
            .pool
            .with_write_conn(|conn| hnsw_guard.flush_keymap(conn));
        if let Err(e) = flushed {
            tracing::error!("Failed to flush HNSW keymap on drop: {}", e);
        }
    }
}
388
389impl MemoryStore {
390 async fn with_read_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
395 where
396 F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
397 T: Send + 'static,
398 {
399 let inner = self.inner.clone();
400 tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
401 inner.pool.with_read_conn(f)
402 })
403 .await
404 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
405 }
406
407 async fn with_write_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
409 where
410 F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
411 T: Send + 'static,
412 {
413 let inner = self.inner.clone();
414 tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
415 inner.pool.with_write_conn(f)
416 })
417 .await
418 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
419 }
420
421 async fn persist_search_receipt(
422 &self,
423 receipt: &VectorSearchReceiptV1,
424 ) -> Result<(), MemoryError> {
425 let receipt = receipt.clone();
426 self.with_write_conn(move |conn| db::store_search_receipt(conn, &receipt))
427 .await
428 }
429
430 #[cfg(feature = "hnsw")]
433 async fn hnsw_search_blocking(
434 &self,
435 query_embedding: Vec<f32>,
436 candidates: usize,
437 ) -> Vec<HnswHit> {
438 let inner = self.inner.clone();
439 tokio::task::spawn_blocking(move || {
440 let guard = inner.hnsw_index.read().unwrap_or_else(|e| e.into_inner());
441 match guard.search(&query_embedding, candidates) {
442 Ok(hits) => hits,
443 Err(e) => {
444 tracing::error!(
445 "HNSW search failed, falling back to brute-force vector search: {}",
446 e
447 );
448 Vec::new()
449 }
450 }
451 })
452 .await
453 .unwrap_or_else(|e| {
454 tracing::error!("HNSW search blocking task panicked: {}", e);
455 Vec::new()
456 })
457 }
458
459 #[cfg(feature = "hnsw")]
460 fn sync_pending_hnsw_ops_blocking(&self) -> Result<usize, MemoryError> {
461 hnsw_ops::sync_pending_hnsw_sidecar(&self.inner)
462 }
463
464 #[cfg(feature = "hnsw")]
465 async fn sync_pending_hnsw_ops(&self) -> Result<usize, MemoryError> {
466 let inner = self.inner.clone();
467 tokio::task::spawn_blocking(move || hnsw_ops::sync_pending_hnsw_sidecar(&inner))
468 .await
469 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
470 }
471
472 #[cfg(feature = "hnsw")]
473 async fn sync_pending_hnsw_ops_best_effort(&self, operation: &'static str) {
474 if let Err(err) = self.sync_pending_hnsw_ops().await {
475 tracing::warn!(
476 operation,
477 error = %err,
478 "SQLite write committed but HNSW sidecar sync is still pending"
479 );
480 } else {
481 self.maybe_flush_hnsw();
482 }
483 }
484
485 pub fn open(config: MemoryConfig) -> Result<Self, MemoryError> {
490 let config = config.normalize_and_validate()?;
491 let embedder = Box::new(OllamaEmbedder::try_new(&config.embedding)?);
492 Self::open_with_embedder(config, embedder)
493 }
494
    /// Opens (creating directories as needed) the store described by `config`,
    /// using the supplied `embedder`.
    ///
    /// Steps, in order:
    /// 1. Normalize and validate `config`; reject an embedder whose
    ///    `dimensions()` differ from `config.embedding.dimensions`.
    /// 2. Overwrite `config.embedding.model` with the embedder's `model_name()`.
    /// 3. Create the base directory, open the SQLite pool and run
    ///    `db::check_embedding_metadata` on the write connection.
    /// 4. With the `hnsw` feature, obtain the index by (first match wins):
    ///    fresh index if embeddings are dirty; recovery from SQLite if pending
    ///    index ops exist or the sidecar is flagged dirty; load from disk (with
    ///    a rebuild on count drift or load failure) if sidecar files exist;
    ///    otherwise rebuild if SQLite holds embeddings, else an empty index.
    /// 5. Build the store and, with `hnsw`, try to apply pending sidecar ops.
    ///    A failure in this last step is logged and does not fail the call.
    ///
    /// # Errors
    /// Returns `MemoryError::DimensionMismatch` for a dimension mismatch,
    /// `MemoryError::StorageError` when the base directory cannot be created,
    /// and propagates errors from config validation, the pool, the metadata
    /// check and HNSW construction/recovery.
    // NOTE(review): `config` is reassigned unconditionally below, so this
    // `allow` looks unnecessary — confirm before removing.
    #[allow(unused_mut)]
    pub fn open_with_embedder(
        mut config: MemoryConfig,
        embedder: Box<dyn Embedder>,
    ) -> Result<Self, MemoryError> {
        config = config.normalize_and_validate()?;
        if embedder.dimensions() != config.embedding.dimensions {
            return Err(MemoryError::DimensionMismatch {
                expected: config.embedding.dimensions,
                actual: embedder.dimensions(),
            });
        }
        // The embedder is the source of truth for the model name.
        config.embedding.model = embedder.model_name().to_string();

        let paths = StoragePaths::new(&config.base_dir);

        std::fs::create_dir_all(&paths.base_dir).map_err(|e| {
            MemoryError::StorageError(format!(
                "Failed to create directory {}: {}",
                paths.base_dir.display(),
                e
            ))
        })?;

        let pool = pool::SqlitePool::open(&paths.sqlite_path, &config.pool, &config.limits)?;
        pool.with_write_conn(|conn| db::check_embedding_metadata(conn, &config.embedding))?;

        // Keep the HNSW dimensionality in lock-step with the embedding config.
        #[cfg(feature = "hnsw")]
        {
            config.hnsw.dimensions = config.embedding.dimensions;
        }

        let token_counter = config
            .token_counter
            .clone()
            .unwrap_or_else(tokenizer::default_token_counter);

        #[cfg(feature = "hnsw")]
        let hnsw_index = {
            let hnsw_config = config.hnsw.clone();

            let embeddings_dirty = pool.with_read_conn(db::is_embeddings_dirty)?;
            let pending_index_ops = pool.with_read_conn(db::pending_index_op_count)?;

            if embeddings_dirty {
                // Case 1: embeddings flagged dirty — start from an empty index
                // and discard pending ops / the sidecar-dirty flag.
                tracing::warn!(
                    "Embedding model changed — creating fresh HNSW index (old index is stale)"
                );
                pool.with_write_conn(|conn| {
                    db::clear_all_pending_index_ops(conn)?;
                    db::set_sidecar_dirty(conn, false)?;
                    Ok(())
                })?;
                HnswIndex::new(hnsw_config)?
            } else if pending_index_ops > 0 || pool.with_read_conn(db::is_sidecar_dirty)? {
                // Case 2: durable sidecar work is outstanding — recover from SQLite.
                tracing::warn!(
                    pending_index_ops,
                    "Recovering HNSW sidecar from SQLite because durable sidecar work exists"
                );
                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
            } else if paths.hnsw_files_exist() {
                // Case 3: sidecar files are present — load them, then sanity-check.
                tracing::info!("Loading HNSW index from {:?}", paths.hnsw_dir);
                match HnswIndex::load(&paths.hnsw_dir, &paths.hnsw_basename, hnsw_config.clone()) {
                    Ok(index) => {
                        // A keymap load failure is tolerated here (logged only).
                        if let Err(e) = pool.with_write_conn(|conn| index.load_keymap(conn)) {
                            tracing::warn!("Failed to load HNSW key mappings: {}. Mappings will be empty until rebuild.", e);
                        }

                        // Compare the index size with the number of embedded
                        // rows across all four tables to detect a stale sidecar.
                        let hnsw_count = index.len();
                        let sqlite_count: i64 = pool.with_read_conn(|conn| {
                            Ok(conn.query_row(
                                "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
                                        (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
                                [],
                                |row| row.get(0),
                            )?)
                        })?;

                        let drift = (sqlite_count - hnsw_count as i64).abs();
                        if drift > 0 {
                            // Any count difference triggers a full rebuild from SQLite.
                            tracing::warn!(
                                hnsw_count,
                                sqlite_count,
                                drift,
                                "HNSW index is stale — {} entries differ from SQLite. \
                                 Likely caused by unclean shutdown. Triggering inline rebuild.",
                                drift
                            );
                            let rebuilt =
                                hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
                            tracing::info!(
                                active = rebuilt.len(),
                                "HNSW index rebuilt after stale detection"
                            );
                            rebuilt
                        } else {
                            tracing::info!(
                                "HNSW index loaded ({} active keys, in sync with SQLite)",
                                hnsw_count
                            );
                            index
                        }
                    }
                    Err(e) => {
                        // Unreadable sidecar: SQLite is authoritative, rebuild from it.
                        tracing::warn!(
                            "Failed to load HNSW index: {}. Rebuilding sidecar from authoritative SQLite rows.",
                            e
                        );
                        hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
                    }
                }
            } else {
                // Case 4: no sidecar files. Rebuild if SQLite already holds
                // embeddings, otherwise start with an empty index.
                let orphan_count: i64 = pool.with_read_conn(|conn| {
                    Ok(conn.query_row(
                        "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
                                (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
                        [],
                        |row| row.get(0),
                    )?)
                })?;

                if orphan_count > 0 {
                    tracing::warn!(
                        orphan_count,
                        "HNSW sidecar files missing but {} embeddings exist in SQLite — \
                         rebuilding index inline",
                        orphan_count
                    );
                    let new_index =
                        hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
                    tracing::info!(
                        active = new_index.len(),
                        "HNSW index rebuilt from SQLite embeddings"
                    );
                    new_index
                } else {
                    tracing::info!("Creating new empty HNSW index (no embeddings in SQLite)");
                    HnswIndex::new(hnsw_config)?
                }
            }
        };

        let store = Self {
            inner: Arc::new(MemoryStoreInner {
                pool,
                embedder,
                embedding_permits: Arc::new(tokio::sync::Semaphore::new(
                    config.limits.max_embedding_concurrency,
                )),
                config,
                paths,
                token_counter,
                #[cfg(feature = "hnsw")]
                hnsw_index: std::sync::RwLock::new(hnsw_index),
            }),
        };

        // Best effort: a failure leaves the pending ops in place for a later sync.
        #[cfg(feature = "hnsw")]
        if let Err(err) = store.sync_pending_hnsw_ops_blocking() {
            tracing::warn!(
                error = %err,
                "Failed to reconcile pending HNSW sidecar ops during open; sidecar replay remains pending"
            );
        }

        Ok(store)
    }
680
681 async fn with_embedding_permit(
682 &self,
683 ) -> Result<tokio::sync::OwnedSemaphorePermit, MemoryError> {
684 self.inner
685 .embedding_permits
686 .clone()
687 .acquire_owned()
688 .await
689 .map_err(|e| MemoryError::Other(format!("embedding semaphore closed: {e}")))
690 }
691
692 async fn embed_text_internal(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
693 let _permit = self.with_embedding_permit().await?;
694 let embedding = self.inner.embedder.embed(text).await?;
695 db::validate_embedding(&embedding, self.inner.config.embedding.dimensions)?;
696 Ok(embedding)
697 }
698
699 async fn embed_batch_internal(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, MemoryError> {
700 let requested = texts.len();
701 let _permit = self.with_embedding_permit().await?;
702 let embeddings = self.inner.embedder.embed_batch(texts).await?;
703 db::validate_embedding_batch(
704 &embeddings,
705 requested,
706 self.inner.config.embedding.dimensions,
707 )?;
708 Ok(embeddings)
709 }
710
711 fn validate_embedding_dimensions(&self, embedding: &[f32]) -> Result<(), MemoryError> {
712 db::validate_embedding(embedding, self.inner.config.embedding.dimensions)
713 }
714
715 fn validate_content(&self, field: &'static str, content: &str) -> Result<(), MemoryError> {
716 if content.is_empty() {
717 return Err(MemoryError::InvalidConfig {
718 field,
719 reason: "content must not be empty".to_string(),
720 });
721 }
722
723 let limit = self.inner.config.limits.max_content_bytes;
724 if content.len() > limit {
725 return Err(MemoryError::ContentTooLarge {
726 size: content.len(),
727 limit,
728 });
729 }
730
731 Ok(())
732 }
733
734 fn validate_confidence(confidence: f32) -> Result<(), MemoryError> {
735 if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
736 return Err(MemoryError::InvalidConfig {
737 field: "episodes.confidence",
738 reason: "confidence must be finite and within [0.0, 1.0]".to_string(),
739 });
740 }
741 Ok(())
742 }
743
744 #[cfg(feature = "turbo-quant-codec")]
748 pub async fn rebuild_vector_artifacts(
749 &self,
750 ) -> Result<VectorArtifactBuildReceiptV1, MemoryError> {
751 let dim = self.inner.config.embedding.dimensions;
752 let search = self.inner.config.search.clone();
753 self.with_write_conn(move |conn| {
754 db::rebuild_turbo_quant_artifacts(
755 conn,
756 dim,
757 search.turbo_quant_bits,
758 search.turbo_quant_projections,
759 search.turbo_quant_seed,
760 )
761 })
762 .await
763 }
764
765 #[cfg(feature = "hnsw")]
769 pub async fn rebuild_hnsw_index(
770 &self,
771 ) -> Result<crate::types::VectorArtifactBuildReceiptV1, MemoryError> {
772 tracing::info!("Rebuilding HNSW index from SQLite embeddings...");
773 let hnsw_config = self.inner.config.hnsw.clone();
774 let (new_index, build_receipt) = self
775 .with_read_conn(move |conn| hnsw_ops::rebuild_hnsw_from_sqlite(conn, &hnsw_config))
776 .await?;
777
778 {
779 let mut guard = self
780 .inner
781 .hnsw_index
782 .write()
783 .unwrap_or_else(|e| e.into_inner());
784 *guard = new_index.clone();
785 }
786
787 hnsw_ops::save_hnsw_sidecar(
788 &new_index,
789 &self.inner.paths.hnsw_dir,
790 &self.inner.paths.hnsw_basename,
791 )?;
792 self.inner.pool.with_write_conn(|conn| {
793 new_index.flush_keymap(conn)?;
794 db::clear_all_pending_index_ops(conn)?;
795 db::set_sidecar_dirty(conn, false)?;
796 Ok(())
797 })?;
798
799 tracing::info!(active = new_index.len(), receipt_generation_id = ?build_receipt.generation_id, "HNSW index rebuilt");
800
801 Ok(build_receipt)
802 }
803
804 #[cfg(feature = "hnsw")]
809 fn maybe_flush_hnsw(&self) {
810 if let Some(interval) = self.inner.config.hnsw.flush_interval_secs {
811 let guard = self
812 .inner
813 .hnsw_index
814 .read()
815 .unwrap_or_else(|e| e.into_inner());
816 if guard.should_flush(interval) {
817 drop(guard); if let Err(e) = self.flush_hnsw() {
819 tracing::warn!("Opportunistic HNSW flush failed: {}", e);
820 } else {
821 let guard = self
822 .inner
823 .hnsw_index
824 .read()
825 .unwrap_or_else(|e| e.into_inner());
826 guard.update_last_flush_epoch();
827 tracing::info!("Opportunistic HNSW flush completed");
828 }
829 }
830 }
831 }
832
833 #[cfg(feature = "hnsw")]
837 pub fn flush_hnsw(&self) -> Result<(), MemoryError> {
838 let pending_ops = self.inner.pool.with_read_conn(db::pending_index_op_count)?;
839 if pending_ops > 0 {
840 tracing::info!(
841 pending_ops,
842 "Flushing HNSW via authoritative SQLite rebuild because pending durable sidecar work exists"
843 );
844 let rebuilt = hnsw_ops::recover_hnsw_sidecar_sync(
845 &self.inner.pool,
846 &self.inner.paths,
847 &self.inner.config.hnsw,
848 )?;
849 let mut guard = self
850 .inner
851 .hnsw_index
852 .write()
853 .unwrap_or_else(|e| e.into_inner());
854 *guard = rebuilt;
855 return Ok(());
856 }
857
858 let index = self
859 .inner
860 .hnsw_index
861 .write()
862 .unwrap_or_else(|e| e.into_inner());
863 hnsw_ops::save_hnsw_sidecar(
864 &index,
865 &self.inner.paths.hnsw_dir,
866 &self.inner.paths.hnsw_basename,
867 )?;
868
869 self.inner.pool.with_write_conn(|conn| {
871 index.flush_keymap(conn)?;
872 db::clear_all_pending_index_ops(conn)?;
873 db::set_sidecar_dirty(conn, false)?;
874 Ok(())
875 })?;
876 Ok(())
877 }
878
879 #[cfg(feature = "hnsw")]
883 pub async fn compact_hnsw(&self) -> Result<(), MemoryError> {
884 if !self
885 .inner
886 .hnsw_index
887 .read()
888 .unwrap_or_else(|e| e.into_inner())
889 .needs_compaction()
890 {
891 tracing::info!("HNSW compaction not needed (deleted ratio below threshold)");
892 return Ok(());
893 }
894 let _receipt = self.rebuild_hnsw_index().await?;
895 Ok(())
896 }
897
898 pub async fn verify_integrity(
905 &self,
906 mode: db::VerifyMode,
907 ) -> Result<db::IntegrityReport, MemoryError> {
908 let use_writer = mode == db::VerifyMode::Full;
909 let mut report = if use_writer {
910 self.with_write_conn(move |conn| db::verify_integrity_sync(conn, mode))
911 .await?
912 } else {
913 self.with_read_conn(move |conn| db::verify_integrity_sync(conn, mode))
914 .await?
915 };
916
917 #[cfg(feature = "hnsw")]
918 {
919 let hnsw_vectors = self
920 .inner
921 .hnsw_index
922 .read()
923 .unwrap_or_else(|e| e.into_inner())
924 .vector_snapshot();
925 let hnsw_dims = self.inner.config.embedding.dimensions;
926 let hnsw_files_exist = self.inner.paths.hnsw_files_exist();
927
928 let hnsw_issues = if use_writer {
929 let hnsw_vectors = hnsw_vectors.clone();
930 self.with_write_conn(move |conn| {
931 verify_hnsw_key_level_integrity(
932 conn,
933 hnsw_dims,
934 &hnsw_vectors,
935 hnsw_files_exist,
936 )
937 })
938 .await?
939 } else {
940 let hnsw_vectors = hnsw_vectors.clone();
941 self.with_read_conn(move |conn| {
942 verify_hnsw_key_level_integrity(
943 conn,
944 hnsw_dims,
945 &hnsw_vectors,
946 hnsw_files_exist,
947 )
948 })
949 .await?
950 };
951 report.issues.extend(hnsw_issues);
952 }
953
954 report.ok = report.issues.is_empty();
955 Ok(report)
956 }
957
958 pub async fn reconcile(
964 &self,
965 action: db::ReconcileAction,
966 ) -> Result<db::IntegrityReport, MemoryError> {
967 match action {
968 db::ReconcileAction::ReportOnly => self.verify_integrity(db::VerifyMode::Full).await,
969 db::ReconcileAction::RebuildFts => {
970 self.with_write_conn(db::reconcile_fts).await?;
971 #[cfg(feature = "hnsw")]
972 self.sync_pending_hnsw_ops_best_effort("reconcile_rebuild_fts")
973 .await;
974 self.verify_integrity(db::VerifyMode::Full).await
975 }
976 db::ReconcileAction::ReEmbed => {
977 self.reembed_all().await?;
978 self.verify_integrity(db::VerifyMode::Full).await
979 }
980 }
981 }
982
983 pub fn config(&self) -> &MemoryConfig {
985 &self.inner.config
986 }
987
988 pub fn graph_view(&self) -> Arc<dyn GraphView> {
991 graph::graph_view(self.inner.clone())
992 }
993
994 pub async fn search(
998 &self,
999 query: &str,
1000 top_k: Option<usize>,
1001 namespaces: Option<&[&str]>,
1002 source_types: Option<&[SearchSourceType]>,
1003 ) -> Result<Vec<SearchResult>, MemoryError> {
1004 Ok(self
1005 .search_with_context(
1006 query,
1007 top_k,
1008 namespaces,
1009 source_types,
1010 SearchContext::default_now(),
1011 )
1012 .await?
1013 .results)
1014 }
1015
1016 pub async fn search_with_context(
1018 &self,
1019 query: &str,
1020 top_k: Option<usize>,
1021 namespaces: Option<&[&str]>,
1022 source_types: Option<&[SearchSourceType]>,
1023 context: SearchContext,
1024 ) -> Result<SearchResponse, MemoryError> {
1025 let k = top_k
1026 .unwrap_or(self.inner.config.search.default_top_k)
1027 .min(MAX_TOP_K);
1028
1029 let query_embedding = self.embed_text_internal(query).await?;
1030
1031 #[cfg(feature = "hnsw")]
1032 let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact
1033 || self.inner.config.search.uses_turbo_quant_backend()
1034 {
1035 Vec::new()
1036 } else {
1037 let candidates = self
1038 .inner
1039 .config
1040 .search
1041 .candidate_pool_size
1042 .max(k.saturating_mul(3))
1043 .min(MAX_HNSW_CANDIDATES);
1044 self.hnsw_search_blocking(query_embedding.clone(), candidates)
1045 .await
1046 };
1047
1048 let q = query.to_string();
1049 let config = self.inner.config.search.clone();
1050 let ns_owned = to_owned_string_vec(namespaces);
1051 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1052 let context_owned = context.clone();
1053
1054 #[cfg(feature = "hnsw")]
1055 let hnsw_hits_owned = hnsw_hits;
1056
1057 let response = self
1058 .with_read_conn(move |conn| {
1059 if db::is_embeddings_dirty(conn)? {
1060 tracing::warn!(
1061 "Embeddings are stale after model change — search quality is degraded. \
1062 Call reembed_all() to regenerate embeddings."
1063 );
1064 }
1065 let ns_refs = as_str_slice(&ns_owned);
1066 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1067 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1068
1069 #[cfg(feature = "hnsw")]
1070 {
1071 let mut execution = if hnsw_hits_owned.is_empty() {
1072 search::hybrid_search_detailed_with_context(
1073 conn,
1074 &q,
1075 &query_embedding,
1076 &config,
1077 &context_owned,
1078 k,
1079 ns_slice,
1080 st_slice,
1081 None,
1082 )
1083 } else {
1084 search::hybrid_search_with_hnsw_detailed_with_context(
1085 conn,
1086 &q,
1087 &query_embedding,
1088 &config,
1089 &context_owned,
1090 k,
1091 ns_slice,
1092 st_slice,
1093 None,
1094 &hnsw_hits_owned,
1095 )
1096 }?;
1097 if context_owned.receipts_enabled()
1098 && context_owned.exactness_profile == ExactnessProfile::PreferExact
1099 {
1100 if let Some(receipt) = execution.receipt.as_mut() {
1101 receipt.search_profile = "hybrid_prefer_exact".to_string();
1102 }
1103 }
1104 Ok(SearchResponse {
1105 results: execution
1106 .results
1107 .into_iter()
1108 .map(|result| result.result)
1109 .collect(),
1110 receipt: execution.receipt,
1111 })
1112 }
1113 #[cfg(not(feature = "hnsw"))]
1114 {
1115 let execution = search::hybrid_search_detailed_with_context(
1116 conn,
1117 &q,
1118 &query_embedding,
1119 &config,
1120 &context_owned,
1121 k,
1122 ns_slice,
1123 st_slice,
1124 None,
1125 )?;
1126 Ok(SearchResponse {
1127 results: execution
1128 .results
1129 .into_iter()
1130 .map(|result| result.result)
1131 .collect(),
1132 receipt: execution.receipt,
1133 })
1134 }
1135 })
1136 .await?;
1137 if let Some(receipt) = &response.receipt {
1138 self.persist_search_receipt(receipt).await?;
1139 }
1140 Ok(response)
1141 }
1142
1143 pub async fn search_fts_only(
1145 &self,
1146 query: &str,
1147 top_k: Option<usize>,
1148 namespaces: Option<&[&str]>,
1149 source_types: Option<&[SearchSourceType]>,
1150 ) -> Result<Vec<SearchResult>, MemoryError> {
1151 let k = top_k
1152 .unwrap_or(self.inner.config.search.default_top_k)
1153 .min(MAX_TOP_K);
1154 let q = query.to_string();
1155 let config = self.inner.config.search.clone();
1156 let ns_owned = to_owned_string_vec(namespaces);
1157 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1158 self.with_read_conn(move |conn| {
1159 let ns_refs = as_str_slice(&ns_owned);
1160 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1161 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1162 search::fts_only_search(conn, &q, &config, k, ns_slice, st_slice, None)
1163 })
1164 .await
1165 }
1166
1167 pub async fn search_vector_only(
1169 &self,
1170 query: &str,
1171 top_k: Option<usize>,
1172 namespaces: Option<&[&str]>,
1173 source_types: Option<&[SearchSourceType]>,
1174 ) -> Result<Vec<SearchResult>, MemoryError> {
1175 Ok(self
1176 .search_vector_only_with_context(
1177 query,
1178 top_k,
1179 namespaces,
1180 source_types,
1181 SearchContext::default_now(),
1182 )
1183 .await?
1184 .results)
1185 }
1186
1187 pub async fn search_vector_only_with_context(
1189 &self,
1190 query: &str,
1191 top_k: Option<usize>,
1192 namespaces: Option<&[&str]>,
1193 source_types: Option<&[SearchSourceType]>,
1194 context: SearchContext,
1195 ) -> Result<SearchResponse, MemoryError> {
1196 let k = top_k
1197 .unwrap_or(self.inner.config.search.default_top_k)
1198 .min(MAX_TOP_K);
1199 let query_embedding = self.embed_text_internal(query).await?;
1200
1201 #[cfg(feature = "hnsw")]
1202 let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact
1203 || self.inner.config.search.uses_turbo_quant_backend()
1204 {
1205 Vec::new()
1206 } else {
1207 let candidates = self
1208 .inner
1209 .config
1210 .search
1211 .candidate_pool_size
1212 .max(k.saturating_mul(3))
1213 .min(MAX_HNSW_CANDIDATES);
1214 self.hnsw_search_blocking(query_embedding.clone(), candidates)
1215 .await
1216 };
1217
1218 let config = self.inner.config.search.clone();
1219 let ns_owned = to_owned_string_vec(namespaces);
1220 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
1221 let context_owned = context.clone();
1222
1223 #[cfg(feature = "hnsw")]
1224 let hnsw_hits_owned = hnsw_hits;
1225
1226 let response = self
1227 .with_read_conn(move |conn| {
1228 if db::is_embeddings_dirty(conn)? {
1229 tracing::warn!(
1230 "Embeddings are stale after model change — search quality is degraded. \
1231 Call reembed_all() to regenerate embeddings."
1232 );
1233 }
1234 let ns_refs = as_str_slice(&ns_owned);
1235 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1236 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1237
1238 #[cfg(feature = "hnsw")]
1239 {
1240 let mut execution = if hnsw_hits_owned.is_empty() {
1241 search::vector_only_search_detailed_with_context(
1242 conn,
1243 &query_embedding,
1244 &config,
1245 &context_owned,
1246 k,
1247 ns_slice,
1248 st_slice,
1249 None,
1250 )
1251 } else {
1252 search::vector_only_search_with_hnsw_detailed_with_context(
1253 conn,
1254 &query_embedding,
1255 &config,
1256 &context_owned,
1257 k,
1258 ns_slice,
1259 st_slice,
1260 None,
1261 &hnsw_hits_owned,
1262 )
1263 }?;
1264 if context_owned.receipts_enabled()
1265 && context_owned.exactness_profile == ExactnessProfile::PreferExact
1266 {
1267 if let Some(receipt) = execution.receipt.as_mut() {
1268 receipt.search_profile = "vector_only_prefer_exact".to_string();
1269 }
1270 }
1271 Ok(SearchResponse {
1272 results: execution
1273 .results
1274 .into_iter()
1275 .map(|result| result.result)
1276 .collect(),
1277 receipt: execution.receipt,
1278 })
1279 }
1280 #[cfg(not(feature = "hnsw"))]
1281 {
1282 let execution = search::vector_only_search_detailed_with_context(
1283 conn,
1284 &query_embedding,
1285 &config,
1286 &context_owned,
1287 k,
1288 ns_slice,
1289 st_slice,
1290 None,
1291 )?;
1292 Ok(SearchResponse {
1293 results: execution
1294 .results
1295 .into_iter()
1296 .map(|result| result.result)
1297 .collect(),
1298 receipt: execution.receipt,
1299 })
1300 }
1301 })
1302 .await?;
1303 if let Some(receipt) = &response.receipt {
1304 self.persist_search_receipt(receipt).await?;
1305 }
1306 Ok(response)
1307 }
1308
1309 pub async fn search_explained(
1313 &self,
1314 query: &str,
1315 top_k: Option<usize>,
1316 namespaces: Option<&[&str]>,
1317 source_types: Option<&[SearchSourceType]>,
1318 ) -> Result<Vec<types::ExplainedResult>, MemoryError> {
1319 Ok(self
1320 .search_explained_with_context(
1321 query,
1322 top_k,
1323 namespaces,
1324 source_types,
1325 SearchContext::default_now(),
1326 )
1327 .await?
1328 .results)
1329 }
1330
1331 pub async fn search_explained_with_context(
1333 &self,
1334 query: &str,
1335 top_k: Option<usize>,
1336 namespaces: Option<&[&str]>,
1337 source_types: Option<&[SearchSourceType]>,
1338 context: SearchContext,
1339 ) -> Result<types::ExplainedSearchResponse, MemoryError> {
1340 let k = top_k
1341 .unwrap_or(self.inner.config.search.default_top_k)
1342 .min(MAX_TOP_K);
1343 let query_embedding = self.embed_text_internal(query).await?;
1344
1345 #[cfg(feature = "hnsw")]
1346 let hnsw_hits = if context.exactness_profile == ExactnessProfile::PreferExact {
1347 Vec::new()
1348 } else {
1349 let candidates = self
1350 .inner
1351 .config
1352 .search
1353 .candidate_pool_size
1354 .max(k.saturating_mul(3))
1355 .min(MAX_HNSW_CANDIDATES);
1356 self.hnsw_search_blocking(query_embedding.clone(), candidates)
1357 .await
1358 };
1359
1360 let q = query.to_string();
1361 let config = self.inner.config.search.clone();
1362 let ns_owned = to_owned_string_vec(namespaces);
1363 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|value| value.to_vec());
1364 let context_owned = context.clone();
1365
1366 #[cfg(feature = "hnsw")]
1367 let hnsw_hits_owned = hnsw_hits;
1368
1369 let response = self
1370 .with_read_conn(move |conn| {
1371 let ns_refs = as_str_slice(&ns_owned);
1372 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1373 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1374
1375 #[cfg(feature = "hnsw")]
1376 {
1377 let mut execution = if hnsw_hits_owned.is_empty() {
1378 search::hybrid_search_detailed_with_context(
1379 conn,
1380 &q,
1381 &query_embedding,
1382 &config,
1383 &context_owned,
1384 k,
1385 ns_slice,
1386 st_slice,
1387 None,
1388 )
1389 } else {
1390 search::hybrid_search_with_hnsw_detailed_with_context(
1391 conn,
1392 &q,
1393 &query_embedding,
1394 &config,
1395 &context_owned,
1396 k,
1397 ns_slice,
1398 st_slice,
1399 None,
1400 &hnsw_hits_owned,
1401 )
1402 }?;
1403 if context_owned.receipts_enabled()
1404 && context_owned.exactness_profile == ExactnessProfile::PreferExact
1405 {
1406 if let Some(receipt) = execution.receipt.as_mut() {
1407 receipt.search_profile = "hybrid_prefer_exact".to_string();
1408 }
1409 }
1410 Ok(types::ExplainedSearchResponse {
1411 results: execution.results,
1412 receipt: execution.receipt,
1413 })
1414 }
1415 #[cfg(not(feature = "hnsw"))]
1416 {
1417 let execution = search::hybrid_search_detailed_with_context(
1418 conn,
1419 &q,
1420 &query_embedding,
1421 &config,
1422 &context_owned,
1423 k,
1424 ns_slice,
1425 st_slice,
1426 None,
1427 )?;
1428 Ok(types::ExplainedSearchResponse {
1429 results: execution.results,
1430 receipt: execution.receipt,
1431 })
1432 }
1433 })
1434 .await?;
1435 if let Some(receipt) = &response.receipt {
1436 self.persist_search_receipt(receipt).await?;
1437 }
1438 Ok(response)
1439 }
1440
1441 pub async fn get_search_receipt(
1443 &self,
1444 receipt_id: &str,
1445 ) -> Result<Option<VectorSearchReceiptV1>, MemoryError> {
1446 let receipt_id = receipt_id.to_string();
1447 self.with_read_conn(move |conn| db::get_search_receipt(conn, &receipt_id))
1448 .await
1449 }
1450
1451 pub async fn replay_search_receipt(
1457 &self,
1458 receipt_id: &str,
1459 query: &str,
1460 top_k: Option<usize>,
1461 namespaces: Option<&[&str]>,
1462 source_types: Option<&[SearchSourceType]>,
1463 ) -> Result<SearchReplayReportV1, MemoryError> {
1464 let original_receipt = self.get_search_receipt(receipt_id).await?.ok_or_else(|| {
1465 MemoryError::SearchReceiptNotFound {
1466 receipt_id: receipt_id.to_string(),
1467 }
1468 })?;
1469
1470 let vector_only = original_receipt.search_profile.starts_with("vector_only");
1471 let replay_top_k = top_k.or_else(|| Some(original_receipt.result_ids.len().max(1)));
1472 let replay_receipt_id = format!("{receipt_id}:replay:{}", uuid::Uuid::new_v4());
1473 let mut context = SearchContext::at(original_receipt.evaluation_time);
1474 context.receipt_mode = ReceiptMode::ReturnReceipt;
1475 context.request_id = Some(replay_receipt_id.clone());
1476 context.trace_id = original_receipt.trace_id.clone();
1477 context.attempt_family_id = original_receipt
1478 .attempt_family_id
1479 .clone()
1480 .or_else(|| Some(original_receipt.receipt_id.clone()));
1481 context.attempt_id = Some(replay_receipt_id.clone());
1482 context.replay_of = Some(original_receipt.receipt_id.clone());
1483 context.query_text_digest = original_receipt.query_text_digest.clone();
1484 context.query_input_digest = original_receipt.query_input_digest.clone();
1485 context.filter_digest = original_receipt.filter_digest.clone();
1486 context.redaction_state = original_receipt.redaction_state.clone();
1487 context.budget_id = original_receipt.budget_id.clone();
1488 context.exactness_profile = if original_receipt.approximate {
1489 ExactnessProfile::AllowApproximate
1490 } else {
1491 ExactnessProfile::PreferExact
1492 };
1493
1494 let replay_response = if vector_only {
1495 self.search_vector_only_with_context(
1496 query,
1497 replay_top_k,
1498 namespaces,
1499 source_types,
1500 context,
1501 )
1502 .await?
1503 } else {
1504 self.search_with_context(query, replay_top_k, namespaces, source_types, context)
1505 .await?
1506 };
1507 let replay_receipt = replay_response
1508 .receipt
1509 .ok_or_else(|| MemoryError::Other("replay did not produce a receipt".to_string()))?;
1510
1511 let query_embedding_digest_matches =
1512 original_receipt.query_embedding_digest == replay_receipt.query_embedding_digest;
1513 let result_ids_match = original_receipt.result_ids == replay_receipt.result_ids;
1514 let missing_result_ids = original_receipt
1515 .result_ids
1516 .iter()
1517 .filter(|id| !replay_receipt.result_ids.contains(*id))
1518 .cloned()
1519 .collect();
1520 let added_result_ids = replay_receipt
1521 .result_ids
1522 .iter()
1523 .filter(|id| !original_receipt.result_ids.contains(*id))
1524 .cloned()
1525 .collect();
1526
1527 Ok(SearchReplayReportV1 {
1528 receipt_id: original_receipt.receipt_id.clone(),
1529 replay_receipt_id,
1530 original_receipt,
1531 replay_receipt,
1532 query_embedding_digest_matches,
1533 result_ids_match,
1534 missing_result_ids,
1535 added_result_ids,
1536 vector_only,
1537 })
1538 }
1539
1540 pub async fn embedding_displacement(
1544 &self,
1545 text_a: &str,
1546 text_b: &str,
1547 ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1548 let emb_a = self.embed_text_internal(text_a).await?;
1549 let emb_b = self.embed_text_internal(text_b).await?;
1550 Self::embedding_displacement_from_vecs(&emb_a, &emb_b)
1551 }
1552
1553 pub fn embedding_displacement_from_vecs(
1555 a: &[f32],
1556 b: &[f32],
1557 ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1558 if a.len() != b.len() {
1559 return Err(MemoryError::DimensionMismatch {
1560 expected: a.len(),
1561 actual: b.len(),
1562 });
1563 }
1564 let cosine_sim = search::cosine_similarity(a, b)?;
1565
1566 let euclidean_dist: f32 = a
1567 .iter()
1568 .zip(b.iter())
1569 .map(|(x, y)| (x - y) * (x - y))
1570 .sum::<f32>()
1571 .sqrt();
1572
1573 let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1574 let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1575
1576 Ok(types::EmbeddingDisplacement {
1577 cosine_similarity: cosine_sim,
1578 euclidean_distance: euclidean_dist,
1579 magnitude_a: mag_a,
1580 magnitude_b: mag_b,
1581 })
1582 }
1583
1584 pub fn chunk_text(&self, text: &str) -> Vec<TextChunk> {
1588 chunker::chunk_text(
1589 text,
1590 &self.inner.config.chunking,
1591 self.inner.token_counter.as_ref(),
1592 )
1593 }
1594
1595 pub async fn embed(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
1597 self.embed_text_internal(text).await
1598 }
1599
1600 pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>, MemoryError> {
1602 let owned: Vec<String> = texts.iter().map(|s| s.to_string()).collect();
1603 self.embed_batch_internal(owned).await
1604 }
1605
1606 pub async fn stats(&self) -> Result<MemoryStats, MemoryError> {
1608 let db_path = self.inner.paths.sqlite_path.clone();
1609 self.with_read_conn(move |conn| {
1610 let total_facts: u64 =
1611 conn.query_row("SELECT COUNT(*) FROM facts", [], |r| r.get(0))?;
1612 let total_documents: u64 =
1613 conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))?;
1614 let total_chunks: u64 =
1615 conn.query_row("SELECT COUNT(*) FROM chunks", [], |r| r.get(0))?;
1616 let total_sessions: u64 =
1617 conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
1618 let total_messages: u64 =
1619 conn.query_row("SELECT COUNT(*) FROM messages", [], |r| r.get(0))?;
1620
1621 let db_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
1622
1623 let (model, dims): (Option<String>, Option<usize>) = conn
1624 .query_row(
1625 "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1626 [],
1627 |r| Ok((Some(r.get(0)?), Some(r.get(1)?))),
1628 )
1629 .unwrap_or((None, None));
1630
1631 Ok(MemoryStats {
1632 total_facts,
1633 total_documents,
1634 total_chunks,
1635 total_sessions,
1636 total_messages,
1637 database_size_bytes: db_size,
1638 embedding_model: model,
1639 embedding_dimensions: dims,
1640 })
1641 })
1642 .await
1643 }
1644
1645 pub async fn list_scope_domains(&self) -> Result<Vec<String>, MemoryError> {
1651 self.with_read_conn(|conn| {
1652 let mut stmt = conn.prepare(
1653 "SELECT DISTINCT json_extract(metadata, '$.scope_domain') \
1654 FROM documents \
1655 WHERE json_extract(metadata, '$.scope_domain') IS NOT NULL",
1656 )?;
1657 let domains: Vec<String> = stmt
1658 .query_map([], |row| row.get::<_, String>(0))?
1659 .filter_map(|r| r.ok())
1660 .collect();
1661 Ok(domains)
1662 })
1663 .await
1664 }
1665
1666 pub async fn embeddings_are_dirty(&self) -> Result<bool, MemoryError> {
1668 self.with_read_conn(db::is_embeddings_dirty).await
1669 }
1670
1671 pub async fn reembed_all(&self) -> Result<usize, MemoryError> {
1673 let mut count = 0usize;
1674 let batch_size = self.inner.config.embedding.batch_size;
1675 let dims = self.inner.config.embedding.dimensions;
1676
1677 let fact_contents: Vec<(String, String)> = self
1679 .with_read_conn(|conn| {
1680 let mut stmt = conn.prepare("SELECT id, content FROM facts")?;
1681 let result = stmt
1682 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1683 .collect::<Result<Vec<_>, _>>()?;
1684 Ok(result)
1685 })
1686 .await?;
1687
1688 let mut fact_count = 0usize;
1689 for batch in fact_contents.chunks(batch_size) {
1690 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1691 let embeddings = self.embed_batch_internal(texts).await?;
1692 for embedding in &embeddings {
1693 self.validate_embedding_dimensions(embedding)?;
1694 }
1695
1696 let quantizer = Quantizer::new(dims);
1697 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1698 .iter()
1699 .zip(embeddings.iter())
1700 .map(|((id, _), emb)| {
1701 let q8 = quantizer
1703 .quantize(emb)
1704 .map(|qv| quantize::pack_quantized(&qv))
1705 .ok();
1706 (id.clone(), db::embedding_to_bytes(emb), q8)
1707 })
1708 .collect();
1709
1710 self.with_write_conn(move |conn| {
1711 db::with_transaction(conn, |tx| {
1712 for (fid, bytes, q8) in &updates {
1713 tx.execute(
1714 "UPDATE facts SET embedding = ?1, embedding_q8 = ?2, updated_at = datetime('now') WHERE id = ?3",
1715 rusqlite::params![bytes, q8.as_deref(), fid],
1716 )?;
1717 #[cfg(feature = "hnsw")]
1718 db::queue_pending_index_op(
1719 tx,
1720 &format!("fact:{fid}"),
1721 "fact",
1722 db::IndexOpKind::Upsert,
1723 )?;
1724 db::invalidate_derived_vector_artifact(tx, &format!("fact:{fid}"))?;
1725 }
1726 Ok(())
1727 })
1728 })
1729 .await?;
1730
1731 fact_count += batch.len();
1732 count += batch.len();
1733 if fact_count % 100 == 0 || fact_count == count {
1734 tracing::info!(fact_count, "Re-embedded {} facts so far", fact_count);
1735 }
1736 }
1737
1738 let chunk_data: Vec<(String, String)> = self
1740 .with_read_conn(|conn| {
1741 let mut stmt = conn.prepare("SELECT id, content FROM chunks")?;
1742 let result = stmt
1743 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1744 .collect::<Result<Vec<_>, _>>()?;
1745 Ok(result)
1746 })
1747 .await?;
1748
1749 let mut chunk_count = 0usize;
1750 for batch in chunk_data.chunks(batch_size) {
1751 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1752 let embeddings = self.embed_batch_internal(texts).await?;
1753 for embedding in &embeddings {
1754 self.validate_embedding_dimensions(embedding)?;
1755 }
1756
1757 let quantizer = Quantizer::new(dims);
1758 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1759 .iter()
1760 .zip(embeddings.iter())
1761 .map(|((id, _), emb)| {
1762 let q8 = quantizer
1764 .quantize(emb)
1765 .map(|qv| quantize::pack_quantized(&qv))
1766 .ok();
1767 (id.clone(), db::embedding_to_bytes(emb), q8)
1768 })
1769 .collect();
1770
1771 self.with_write_conn(move |conn| {
1772 db::with_transaction(conn, |tx| {
1773 for (cid, bytes, q8) in &updates {
1774 tx.execute(
1775 "UPDATE chunks SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1776 rusqlite::params![bytes, q8.as_deref(), cid],
1777 )?;
1778 #[cfg(feature = "hnsw")]
1779 db::queue_pending_index_op(
1780 tx,
1781 &format!("chunk:{cid}"),
1782 "chunk",
1783 db::IndexOpKind::Upsert,
1784 )?;
1785 db::invalidate_derived_vector_artifact(tx, &format!("chunk:{cid}"))?;
1786 }
1787 Ok(())
1788 })
1789 })
1790 .await?;
1791
1792 chunk_count += batch.len();
1793 count += batch.len();
1794 if chunk_count % 100 == 0 {
1795 tracing::info!(chunk_count, "Re-embedded {} chunks so far", chunk_count);
1796 }
1797 }
1798
1799 let message_data: Vec<(i64, String)> = self
1801 .with_read_conn(|conn| {
1802 let mut stmt = conn.prepare("SELECT id, content FROM messages")?;
1803 let result = stmt
1804 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1805 .collect::<Result<Vec<_>, _>>()?;
1806 Ok(result)
1807 })
1808 .await?;
1809
1810 let mut msg_count = 0usize;
1811 for batch in message_data.chunks(batch_size) {
1812 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1813 let embeddings = self.embed_batch_internal(texts).await?;
1814 for embedding in &embeddings {
1815 self.validate_embedding_dimensions(embedding)?;
1816 }
1817
1818 let quantizer = Quantizer::new(dims);
1819 let updates: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> = batch
1820 .iter()
1821 .zip(embeddings.iter())
1822 .map(|((id, _), emb)| {
1823 let q8 = quantizer
1825 .quantize(emb)
1826 .map(|qv| quantize::pack_quantized(&qv))
1827 .ok();
1828 (*id, db::embedding_to_bytes(emb), q8)
1829 })
1830 .collect();
1831
1832 self.with_write_conn(move |conn| {
1833 db::with_transaction(conn, |tx| {
1834 for (mid, bytes, q8) in &updates {
1835 tx.execute(
1836 "UPDATE messages SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1837 rusqlite::params![bytes, q8.as_deref(), mid],
1838 )?;
1839 #[cfg(feature = "hnsw")]
1840 db::queue_pending_index_op(
1841 tx,
1842 &format!("msg:{mid}"),
1843 "message",
1844 db::IndexOpKind::Upsert,
1845 )?;
1846 db::invalidate_derived_vector_artifact(tx, &format!("msg:{mid}"))?;
1847 }
1848 Ok(())
1849 })
1850 })
1851 .await?;
1852
1853 msg_count += batch.len();
1854 count += batch.len();
1855 if msg_count % 100 == 0 {
1856 tracing::info!(msg_count, "Re-embedded {} messages so far", msg_count);
1857 }
1858 }
1859
1860 let episode_data: Vec<(String, String)> = self
1862 .with_read_conn(|conn| {
1863 let mut stmt = conn.prepare("SELECT episode_id, search_text FROM episodes")?;
1864 let result = stmt
1865 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1866 .collect::<Result<Vec<_>, _>>()?;
1867 Ok(result)
1868 })
1869 .await?;
1870
1871 let mut episode_count = 0usize;
1872 for batch in episode_data.chunks(batch_size) {
1873 let texts: Vec<String> = batch.iter().map(|(_, text)| text.clone()).collect();
1874 let embeddings = self.embed_batch_internal(texts).await?;
1875 for embedding in &embeddings {
1876 self.validate_embedding_dimensions(embedding)?;
1877 }
1878
1879 let quantizer = Quantizer::new(dims);
1880 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1881 .iter()
1882 .zip(embeddings.iter())
1883 .map(|((episode_id, _), embedding)| {
1884 let q8 = quantizer
1886 .quantize(embedding)
1887 .map(|vector| quantize::pack_quantized(&vector))
1888 .ok();
1889 (episode_id.clone(), db::embedding_to_bytes(embedding), q8)
1890 })
1891 .collect();
1892
1893 self.with_write_conn(move |conn| {
1894 db::with_transaction(conn, |tx| {
1895 for (episode_id, bytes, q8) in &updates {
1896 tx.execute(
1897 "UPDATE episodes
1898 SET embedding = ?1,
1899 embedding_q8 = ?2,
1900 updated_at = datetime('now')
1901 WHERE episode_id = ?3",
1902 rusqlite::params![bytes, q8.as_deref(), episode_id],
1903 )?;
1904 #[cfg(feature = "hnsw")]
1905 db::queue_pending_index_op(
1906 tx,
1907 &episodes::episode_item_key(episode_id),
1908 "episode",
1909 db::IndexOpKind::Upsert,
1910 )?;
1911 db::invalidate_derived_vector_artifact(
1912 tx,
1913 &episodes::episode_item_key(episode_id),
1914 )?;
1915 }
1916 Ok(())
1917 })
1918 })
1919 .await?;
1920
1921 episode_count += batch.len();
1922 count += batch.len();
1923 if episode_count % 100 == 0 {
1924 tracing::info!(
1925 episode_count,
1926 "Re-embedded {} episodes so far",
1927 episode_count
1928 );
1929 }
1930 }
1931
1932 self.with_write_conn(db::clear_embeddings_dirty).await?;
1934
1935 tracing::info!(
1936 facts = fact_count,
1937 chunks = chunk_count,
1938 messages = msg_count,
1939 episodes = episode_count,
1940 total = count,
1941 "Re-embedding complete"
1942 );
1943
1944 #[cfg(feature = "hnsw")]
1946 {
1947 tracing::info!("Rebuilding HNSW index after re-embedding...");
1948 let _receipt = self.rebuild_hnsw_index().await?;
1949 }
1950
1951 Ok(count)
1952 }
1953
1954 pub async fn vacuum(&self) -> Result<(), MemoryError> {
1956 self.with_write_conn(|conn| {
1957 conn.execute_batch("VACUUM")?;
1958 Ok(())
1959 })
1960 .await
1961 }
1962
1963 #[deprecated(
1986 since = "0.5.0",
1987 note = "Legacy V10 import envelope path is compatibility-only. Use `import_projection_batch()` and `ProjectionImportBatchV3` on the canonical lane."
1988 )]
1989 #[doc(hidden)]
1990 #[allow(deprecated)]
1991 pub async fn import_envelope(
1992 &self,
1993 envelope: &projection_import::ImportEnvelope,
1994 ) -> Result<projection_import::ImportReceipt, MemoryError> {
1995 projection_legacy_compat::import_envelope(self, envelope).await
1996 }
1997
1998 #[deprecated(
2000 since = "0.5.0",
2001 note = "Legacy V10 import envelope status reads are compatibility-only. Prefer the projection import log."
2002 )]
2003 #[doc(hidden)]
2004 #[allow(deprecated)]
2005 pub async fn import_status(
2006 &self,
2007 envelope_id: &projection_import::EnvelopeId,
2008 ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
2009 projection_legacy_compat::import_status(self, envelope_id).await
2010 }
2011
2012 #[deprecated(
2014 since = "0.5.0",
2015 note = "Legacy V10 import log access is compatibility-only. Prefer new projection-import metadata."
2016 )]
2017 #[doc(hidden)]
2018 #[allow(deprecated)]
2019 pub async fn list_imports(
2020 &self,
2021 namespace: Option<&str>,
2022 limit: usize,
2023 ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
2024 projection_legacy_compat::list_imports(self, namespace, limit).await
2025 }
2026
2027 #[allow(deprecated)]
2029 pub async fn last_import_at(&self, namespace: &str) -> Result<Option<String>, MemoryError> {
2030 projection_legacy_compat::last_import_at(self, namespace).await
2031 }
2032
2033 pub async fn query_claim_versions(
2035 &self,
2036 query: ProjectionQuery,
2037 ) -> Result<Vec<ProjectionClaimVersion>, MemoryError> {
2038 self.with_read_conn(move |conn| projection_storage::query_claim_versions(conn, &query))
2039 .await
2040 }
2041
2042 pub async fn query_relation_versions(
2044 &self,
2045 query: ProjectionQuery,
2046 ) -> Result<Vec<ProjectionRelationVersion>, MemoryError> {
2047 self.with_read_conn(move |conn| projection_storage::query_relation_versions(conn, &query))
2048 .await
2049 }
2050
2051 pub async fn query_episodes(
2053 &self,
2054 query: ProjectionQuery,
2055 ) -> Result<Vec<ProjectionEpisode>, MemoryError> {
2056 self.with_read_conn(move |conn| projection_storage::query_episode_rows(conn, &query))
2057 .await
2058 }
2059
2060 pub async fn query_entity_aliases(
2062 &self,
2063 query: ProjectionQuery,
2064 ) -> Result<Vec<ProjectionEntityAlias>, MemoryError> {
2065 self.with_read_conn(move |conn| projection_storage::query_entity_aliases(conn, &query))
2066 .await
2067 }
2068
2069 pub async fn query_evidence_refs(
2071 &self,
2072 query: ProjectionQuery,
2073 ) -> Result<Vec<ProjectionEvidenceRef>, MemoryError> {
2074 self.with_read_conn(move |conn| projection_storage::query_evidence_refs(conn, &query))
2075 .await
2076 }
2077
2078 #[cfg(any(test, feature = "testing"))]
2080 pub async fn raw_execute(&self, sql: &str, params: Vec<String>) -> Result<usize, MemoryError> {
2081 let sql = sql.to_string();
2082 self.with_write_conn(move |conn| {
2083 let param_refs: Vec<&dyn rusqlite::types::ToSql> = params
2084 .iter()
2085 .map(|s| s as &dyn rusqlite::types::ToSql)
2086 .collect();
2087 Ok(conn.execute(&sql, &*param_refs)?)
2088 })
2089 .await
2090 }
2091}