1#![allow(deprecated)]
2
3#[cfg(not(any(feature = "hnsw", feature = "brute-force")))]
58compile_error!("At least one search backend feature must be enabled: 'hnsw' or 'brute-force'");
59
60pub mod chunker;
61pub mod config;
62pub(crate) mod conversation;
63pub mod db;
64pub(crate) mod documents;
65pub mod embedder;
66pub(crate) mod episodes;
67pub mod error;
68mod graph;
69#[cfg(feature = "hnsw")]
70pub mod hnsw;
71#[cfg(feature = "hnsw")]
72mod hnsw_ops;
73mod json_compat_import;
74pub(crate) mod knowledge;
75mod pool;
76mod projection_batch;
77mod projection_derivation;
78#[deprecated(
82 since = "0.6.0",
83 note = "Legacy V10 import path is migration-only. Use `import_projection_batch()` with `ProjectionImportBatchV3` on the canonical lane."
84)]
85#[doc(hidden)]
86pub mod projection_import;
87mod projection_lane;
88mod projection_legacy_compat;
89pub(crate) mod projection_storage;
90pub mod quantize;
91pub mod search;
92pub mod storage;
93mod store_support;
94pub mod tokenizer;
95pub mod types;
96
97pub use config::{
99 ChunkingConfig, EmbeddingConfig, MemoryConfig, MemoryLimits, PoolConfig, SearchConfig,
100};
101pub use db::{IntegrityReport, ReconcileAction, VerifyMode};
102pub use embedder::{Embedder, MockEmbedder, OllamaEmbedder};
103pub use error::MemoryError;
104#[cfg(feature = "hnsw")]
105pub use hnsw::{HnswConfig, HnswHit, HnswIndex};
106pub(crate) use projection_lane::projection_import_failure_id;
107pub use projection_lane::{
108 ProjectionImportFailureReceiptEntry, ProjectionImportLogEntry, ProjectionImportResult,
109};
110pub use quantize::{pack_quantized, unpack_quantized, QuantizedVector, Quantizer};
111pub use storage::StoragePaths;
112pub use tokenizer::{EstimateTokenCounter, TokenCounter};
113pub use types::{
114 Document, EmbeddingDisplacement, EpisodeMeta, EpisodeOutcome, ExplainedResult, Fact,
115 GraphDirection, GraphEdge, GraphEdgeType, GraphView, MemoryStats, Message,
116 ProjectionClaimVersion, ProjectionEntityAlias, ProjectionEpisode, ProjectionEvidenceRef,
117 ProjectionQuery, ProjectionRelationVersion, Role, ScoreBreakdown, SearchResult, SearchSource,
118 SearchSourceType, Session, TextChunk, VerificationStatus,
119};
120
121use std::sync::Arc;
122
123pub(crate) use store_support::{
124 as_str_slice, build_episode_search_text, merge_trace_ctx, to_owned_string_vec,
125 verification_status_for_outcome,
126};
127
/// Re-exports of legacy, migration-only items.
///
/// The module is hidden from rendered documentation and both submodules are
/// marked `#[deprecated]`; their deprecation notes name the replacements.
#[doc(hidden)]
pub mod compat {
    /// Legacy import-envelope types re-exported from `crate::projection_import`,
    /// together with `stack_ids::EnvelopeId`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy ImportEnvelope is migration-only. New integrations should use `ProjectionImportBatchV3` on the canonical lane."
    )]
    #[doc(hidden)]
    // `crate::projection_import` is itself deprecated, so the lint is silenced
    // for the re-exports below.
    #[allow(deprecated)]
    pub mod legacy_import_envelope {
        pub use crate::projection_import::{
            ImportEnvelope, ImportProjectionFreshness, ImportReceipt, ImportRecord, ImportStatus,
        };
        pub use stack_ids::EnvelopeId;
    }

    /// Legacy `TraceId` re-exported from `crate::types`.
    #[deprecated(
        since = "0.5.0",
        note = "Legacy trace_id is migration-only. Use `stack_ids::TraceCtx`."
    )]
    #[doc(hidden)]
    // NOTE(review): presumably `TraceId` is deprecated at its definition, which
    // would be why the lint is allowed here — confirm in `types`.
    #[allow(deprecated)]
    pub mod compat_trace_id {
        pub use crate::types::TraceId;
    }
}
154
/// Handle to the memory store.
///
/// The derived `Clone` only copies the `Arc`, so every clone refers to the
/// same `MemoryStoreInner`.
#[derive(Clone)]
pub struct MemoryStore {
    /// Reference-counted state shared by all clones of this handle.
    inner: Arc<MemoryStoreInner>,
}
162
/// State shared by every `MemoryStore` clone.
struct MemoryStoreInner {
    /// SQLite connection pool; accessed through `with_read_conn` / `with_write_conn`.
    pool: pool::SqlitePool,
    /// Embedding backend used for all text-to-vector calls.
    embedder: Box<dyn Embedder>,
    /// Semaphore acquired before each embedder call; sized from
    /// `config.limits.max_embedding_concurrency` when the store is opened.
    embedding_permits: Arc<tokio::sync::Semaphore>,
    /// Configuration as normalized and validated when the store was opened.
    config: MemoryConfig,
    /// On-disk locations derived from `config.base_dir`.
    paths: StoragePaths,
    /// Token counter handed to the chunker.
    token_counter: Arc<dyn TokenCounter>,
    /// In-memory HNSW index; only present when the `hnsw` feature is enabled.
    #[cfg(feature = "hnsw")]
    hnsw_index: std::sync::RwLock<HnswIndex>,
}
173
#[cfg(feature = "hnsw")]
impl Drop for MemoryStoreInner {
    /// Best-effort persistence of the HNSW sidecar when the shared state is dropped.
    ///
    /// Nothing here propagates an error; every failure is logged instead.
    /// The steps are:
    /// 1. If the sidecar directory is gone, do nothing.
    /// 2. If SQLite still records pending index ops, run the sidecar recovery
    ///    routine and stop.
    /// 3. Otherwise save the in-memory index and flush its keymap.
    fn drop(&mut self) {
        let sidecar_dir = &self.paths.hnsw_dir;
        if !sidecar_dir.exists() {
            tracing::debug!(
                path = %sidecar_dir.display(),
                "Skipping HNSW drop flush because the sidecar directory no longer exists"
            );
            return;
        }

        // A failed inspection is treated as "no pending work" so the plain
        // save path below still gets a chance to run.
        let pending_ops = self
            .pool
            .with_read_conn(db::pending_index_op_count)
            .unwrap_or_else(|err| {
                tracing::warn!("Failed to inspect pending HNSW work on drop: {}", err);
                0
            });

        if pending_ops > 0 {
            let recovered =
                hnsw_ops::recover_hnsw_sidecar_sync(&self.pool, &self.paths, &self.config.hnsw);
            if let Err(err) = recovered {
                tracing::error!("Failed to recover and flush HNSW on drop: {}", err);
            }
            return;
        }

        let index = match self.hnsw_index.read() {
            Err(_) => {
                tracing::warn!("HNSW RwLock poisoned on drop — skipping save");
                return;
            }
            Ok(guard) => guard,
        };

        let saved = hnsw_ops::save_hnsw_sidecar(&index, sidecar_dir, &self.paths.hnsw_basename);
        if let Err(err) = saved {
            tracing::error!("Failed to save HNSW index on drop: {}", err);
        }

        // The keymap flush is attempted even when the save above failed.
        let flushed = self.pool.with_write_conn(|conn| index.flush_keymap(conn));
        if let Err(e) = flushed {
            tracing::error!("Failed to flush HNSW keymap on drop: {}", e);
        }
    }
}
227
228impl MemoryStore {
229 async fn with_read_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
234 where
235 F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
236 T: Send + 'static,
237 {
238 let inner = self.inner.clone();
239 tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
240 inner.pool.with_read_conn(f)
241 })
242 .await
243 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
244 }
245
246 async fn with_write_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
248 where
249 F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
250 T: Send + 'static,
251 {
252 let inner = self.inner.clone();
253 tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
254 inner.pool.with_write_conn(f)
255 })
256 .await
257 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
258 }
259
260 #[cfg(feature = "hnsw")]
263 async fn hnsw_search_blocking(
264 &self,
265 query_embedding: Vec<f32>,
266 candidates: usize,
267 ) -> Vec<HnswHit> {
268 let inner = self.inner.clone();
269 tokio::task::spawn_blocking(move || {
270 let guard = inner.hnsw_index.read().unwrap_or_else(|e| e.into_inner());
271 match guard.search(&query_embedding, candidates) {
272 Ok(hits) => hits,
273 Err(e) => {
274 tracing::error!(
275 "HNSW search failed, falling back to brute-force vector search: {}",
276 e
277 );
278 Vec::new()
279 }
280 }
281 })
282 .await
283 .unwrap_or_else(|e| {
284 tracing::error!("HNSW search blocking task panicked: {}", e);
285 Vec::new()
286 })
287 }
288
289 #[cfg(feature = "hnsw")]
290 fn sync_pending_hnsw_ops_blocking(&self) -> Result<usize, MemoryError> {
291 hnsw_ops::sync_pending_hnsw_sidecar(&self.inner)
292 }
293
294 #[cfg(feature = "hnsw")]
295 async fn sync_pending_hnsw_ops(&self) -> Result<usize, MemoryError> {
296 let inner = self.inner.clone();
297 tokio::task::spawn_blocking(move || hnsw_ops::sync_pending_hnsw_sidecar(&inner))
298 .await
299 .map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
300 }
301
302 #[cfg(feature = "hnsw")]
303 async fn sync_pending_hnsw_ops_best_effort(&self, operation: &'static str) {
304 if let Err(err) = self.sync_pending_hnsw_ops().await {
305 tracing::warn!(
306 operation,
307 error = %err,
308 "SQLite write committed but HNSW sidecar sync is still pending"
309 );
310 } else {
311 self.maybe_flush_hnsw();
312 }
313 }
314
315 pub fn open(config: MemoryConfig) -> Result<Self, MemoryError> {
320 let config = config.normalize_and_validate()?;
321 let embedder = Box::new(OllamaEmbedder::try_new(&config.embedding)?);
322 Self::open_with_embedder(config, embedder)
323 }
324
325 #[allow(unused_mut)] pub fn open_with_embedder(
328 mut config: MemoryConfig,
329 embedder: Box<dyn Embedder>,
330 ) -> Result<Self, MemoryError> {
331 config = config.normalize_and_validate()?;
332 if embedder.dimensions() != config.embedding.dimensions {
333 return Err(MemoryError::DimensionMismatch {
334 expected: config.embedding.dimensions,
335 actual: embedder.dimensions(),
336 });
337 }
338 config.embedding.model = embedder.model_name().to_string();
339
340 let paths = StoragePaths::new(&config.base_dir);
341
342 std::fs::create_dir_all(&paths.base_dir).map_err(|e| {
344 MemoryError::StorageError(format!(
345 "Failed to create directory {}: {}",
346 paths.base_dir.display(),
347 e
348 ))
349 })?;
350
351 let pool = pool::SqlitePool::open(&paths.sqlite_path, &config.pool, &config.limits)?;
352 pool.with_write_conn(|conn| db::check_embedding_metadata(conn, &config.embedding))?;
353
354 #[cfg(feature = "hnsw")]
356 {
357 config.hnsw.dimensions = config.embedding.dimensions;
358 }
359
360 let token_counter = config
361 .token_counter
362 .clone()
363 .unwrap_or_else(tokenizer::default_token_counter);
364
365 #[cfg(feature = "hnsw")]
366 let hnsw_index = {
367 let hnsw_config = config.hnsw.clone();
368
369 let embeddings_dirty = pool.with_read_conn(db::is_embeddings_dirty)?;
370 let pending_index_ops = pool.with_read_conn(db::pending_index_op_count)?;
371
372 if embeddings_dirty {
373 tracing::warn!(
376 "Embedding model changed — creating fresh HNSW index (old index is stale)"
377 );
378 pool.with_write_conn(|conn| {
379 db::clear_all_pending_index_ops(conn)?;
380 db::set_sidecar_dirty(conn, false)?;
381 Ok(())
382 })?;
383 HnswIndex::new(hnsw_config)?
384 } else if pending_index_ops > 0 || pool.with_read_conn(db::is_sidecar_dirty)? {
385 tracing::warn!(
386 pending_index_ops,
387 "Recovering HNSW sidecar from SQLite because durable sidecar work exists"
388 );
389 hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
390 } else if paths.hnsw_files_exist() {
391 tracing::info!("Loading HNSW index from {:?}", paths.hnsw_dir);
392 match HnswIndex::load(&paths.hnsw_dir, &paths.hnsw_basename, hnsw_config.clone()) {
393 Ok(index) => {
394 if let Err(e) = pool.with_write_conn(|conn| index.load_keymap(conn)) {
396 tracing::warn!("Failed to load HNSW key mappings: {}. Mappings will be empty until rebuild.", e);
397 }
398
399 let hnsw_count = index.len();
403 let sqlite_count: i64 = pool.with_read_conn(|conn| {
404 Ok(conn
405 .query_row(
406 "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
407 (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
408 (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
409 (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
410 [],
411 |row| row.get(0),
412 )
413 .unwrap_or(0))
414 })?;
415
416 let drift = (sqlite_count - hnsw_count as i64).abs();
417 if drift > 0 {
418 tracing::warn!(
419 hnsw_count,
420 sqlite_count,
421 drift,
422 "HNSW index is stale — {} entries differ from SQLite. \
423 Likely caused by unclean shutdown. Triggering inline rebuild.",
424 drift
425 );
426 let rebuilt =
428 hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
429 tracing::info!(
430 active = rebuilt.len(),
431 "HNSW index rebuilt after stale detection"
432 );
433 rebuilt
434 } else {
435 tracing::info!(
436 "HNSW index loaded ({} active keys, in sync with SQLite)",
437 hnsw_count
438 );
439 index
440 }
441 }
442 Err(e) => {
443 tracing::warn!(
444 "Failed to load HNSW index: {}. Creating new empty index.",
445 e
446 );
447 HnswIndex::new(hnsw_config)?
448 }
449 }
450 } else {
451 let orphan_count: i64 = pool.with_read_conn(|conn| {
456 Ok(conn
457 .query_row(
458 "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
459 (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
460 (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
461 (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
462 [],
463 |row| row.get(0),
464 )
465 .unwrap_or(0))
466 })?;
467
468 if orphan_count > 0 {
469 tracing::warn!(
470 orphan_count,
471 "HNSW sidecar files missing but {} embeddings exist in SQLite — \
472 rebuilding index inline",
473 orphan_count
474 );
475 let new_index =
476 hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
477 tracing::info!(
478 active = new_index.len(),
479 "HNSW index rebuilt from SQLite embeddings"
480 );
481 new_index
482 } else {
483 tracing::info!("Creating new empty HNSW index (no embeddings in SQLite)");
484 HnswIndex::new(hnsw_config)?
485 }
486 }
487 };
488
489 let store = Self {
490 inner: Arc::new(MemoryStoreInner {
491 pool,
492 embedder,
493 embedding_permits: Arc::new(tokio::sync::Semaphore::new(
494 config.limits.max_embedding_concurrency,
495 )),
496 config,
497 paths,
498 token_counter,
499 #[cfg(feature = "hnsw")]
500 hnsw_index: std::sync::RwLock::new(hnsw_index),
501 }),
502 };
503
504 #[cfg(feature = "hnsw")]
505 if let Err(err) = store.sync_pending_hnsw_ops_blocking() {
506 tracing::warn!(
507 error = %err,
508 "Failed to reconcile pending HNSW sidecar ops during open; sidecar replay remains pending"
509 );
510 }
511
512 Ok(store)
513 }
514
515 async fn with_embedding_permit(
516 &self,
517 ) -> Result<tokio::sync::OwnedSemaphorePermit, MemoryError> {
518 self.inner
519 .embedding_permits
520 .clone()
521 .acquire_owned()
522 .await
523 .map_err(|e| MemoryError::Other(format!("embedding semaphore closed: {e}")))
524 }
525
526 async fn embed_text_internal(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
527 let _permit = self.with_embedding_permit().await?;
528 self.inner.embedder.embed(text).await
529 }
530
531 async fn embed_batch_internal(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, MemoryError> {
532 let _permit = self.with_embedding_permit().await?;
533 self.inner.embedder.embed_batch(texts).await
534 }
535
536 fn validate_embedding_dimensions(&self, embedding: &[f32]) -> Result<(), MemoryError> {
537 let expected = self.inner.config.embedding.dimensions;
538 if embedding.len() != expected {
539 return Err(MemoryError::DimensionMismatch {
540 expected,
541 actual: embedding.len(),
542 });
543 }
544 Ok(())
545 }
546
547 fn validate_content(&self, field: &'static str, content: &str) -> Result<(), MemoryError> {
548 if content.is_empty() {
549 return Err(MemoryError::InvalidConfig {
550 field,
551 reason: "content must not be empty".to_string(),
552 });
553 }
554
555 let limit = self.inner.config.limits.max_content_bytes;
556 if content.len() > limit {
557 return Err(MemoryError::ContentTooLarge {
558 size: content.len(),
559 limit,
560 });
561 }
562
563 Ok(())
564 }
565
566 fn validate_confidence(confidence: f32) -> Result<(), MemoryError> {
567 if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
568 return Err(MemoryError::InvalidConfig {
569 field: "episodes.confidence",
570 reason: "confidence must be finite and within [0.0, 1.0]".to_string(),
571 });
572 }
573 Ok(())
574 }
575
576 #[cfg(feature = "hnsw")]
582 pub async fn rebuild_hnsw_index(&self) -> Result<(), MemoryError> {
583 tracing::info!("Rebuilding HNSW index from SQLite embeddings...");
584 let hnsw_config = self.inner.config.hnsw.clone();
585 let new_index = self
586 .with_read_conn(move |conn| hnsw_ops::rebuild_hnsw_from_sqlite(conn, &hnsw_config))
587 .await?;
588
589 {
590 let mut guard = self
591 .inner
592 .hnsw_index
593 .write()
594 .unwrap_or_else(|e| e.into_inner());
595 *guard = new_index.clone();
596 }
597
598 hnsw_ops::save_hnsw_sidecar(
599 &new_index,
600 &self.inner.paths.hnsw_dir,
601 &self.inner.paths.hnsw_basename,
602 )?;
603 self.inner.pool.with_write_conn(|conn| {
604 new_index.flush_keymap(conn)?;
605 db::clear_all_pending_index_ops(conn)?;
606 db::set_sidecar_dirty(conn, false)?;
607 Ok(())
608 })?;
609
610 tracing::info!(active = new_index.len(), "HNSW index rebuilt");
611
612 Ok(())
613 }
614
615 #[cfg(feature = "hnsw")]
620 fn maybe_flush_hnsw(&self) {
621 if let Some(interval) = self.inner.config.hnsw.flush_interval_secs {
622 let guard = self
623 .inner
624 .hnsw_index
625 .read()
626 .unwrap_or_else(|e| e.into_inner());
627 if guard.should_flush(interval) {
628 drop(guard); if let Err(e) = self.flush_hnsw() {
630 tracing::warn!("Opportunistic HNSW flush failed: {}", e);
631 } else {
632 let guard = self
633 .inner
634 .hnsw_index
635 .read()
636 .unwrap_or_else(|e| e.into_inner());
637 guard.update_last_flush_epoch();
638 tracing::info!("Opportunistic HNSW flush completed");
639 }
640 }
641 }
642 }
643
644 #[cfg(feature = "hnsw")]
648 pub fn flush_hnsw(&self) -> Result<(), MemoryError> {
649 let pending_ops = self.inner.pool.with_read_conn(db::pending_index_op_count)?;
650 if pending_ops > 0 {
651 tracing::info!(
652 pending_ops,
653 "Flushing HNSW via authoritative SQLite rebuild because pending durable sidecar work exists"
654 );
655 let rebuilt = hnsw_ops::recover_hnsw_sidecar_sync(
656 &self.inner.pool,
657 &self.inner.paths,
658 &self.inner.config.hnsw,
659 )?;
660 let mut guard = self
661 .inner
662 .hnsw_index
663 .write()
664 .unwrap_or_else(|e| e.into_inner());
665 *guard = rebuilt;
666 return Ok(());
667 }
668
669 let index = self
670 .inner
671 .hnsw_index
672 .read()
673 .unwrap_or_else(|e| e.into_inner())
674 .clone();
675 hnsw_ops::save_hnsw_sidecar(
676 &index,
677 &self.inner.paths.hnsw_dir,
678 &self.inner.paths.hnsw_basename,
679 )?;
680
681 self.inner.pool.with_write_conn(|conn| {
683 index.flush_keymap(conn)?;
684 db::clear_all_pending_index_ops(conn)?;
685 db::set_sidecar_dirty(conn, false)?;
686 Ok(())
687 })?;
688 Ok(())
689 }
690
691 #[cfg(feature = "hnsw")]
695 pub async fn compact_hnsw(&self) -> Result<(), MemoryError> {
696 if !self
697 .inner
698 .hnsw_index
699 .read()
700 .unwrap_or_else(|e| e.into_inner())
701 .needs_compaction()
702 {
703 tracing::info!("HNSW compaction not needed (deleted ratio below threshold)");
704 return Ok(());
705 }
706 self.rebuild_hnsw_index().await
707 }
708
709 pub async fn verify_integrity(
716 &self,
717 mode: db::VerifyMode,
718 ) -> Result<db::IntegrityReport, MemoryError> {
719 let use_writer = mode == db::VerifyMode::Full;
720 let mut report = if use_writer {
721 self.with_write_conn(move |conn| db::verify_integrity_sync(conn, mode))
722 .await?
723 } else {
724 self.with_read_conn(move |conn| db::verify_integrity_sync(conn, mode))
725 .await?
726 };
727
728 #[cfg(feature = "hnsw")]
729 {
730 let embedding_count: i64 = if use_writer {
731 self.with_write_conn(|conn| {
732 Ok(conn.query_row(
733 "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
734 (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
735 (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
736 (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
737 [],
738 |row| row.get(0),
739 )?)
740 })
741 .await?
742 } else {
743 self.with_read_conn(|conn| {
744 Ok(conn.query_row(
745 "SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
746 (SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
747 (SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
748 (SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
749 [],
750 |row| row.get(0),
751 )?)
752 })
753 .await?
754 };
755
756 if embedding_count > 0 && !self.inner.paths.hnsw_files_exist() {
757 report.issues.push(format!(
758 "HNSW sidecar files are missing while {} embedded rows exist in SQLite",
759 embedding_count
760 ));
761 }
762
763 let keymap_count: i64 = if use_writer {
764 self.with_write_conn(|conn| {
765 Ok(conn
766 .query_row(
767 "SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
768 [],
769 |row| row.get(0),
770 )
771 .unwrap_or(0))
772 })
773 .await?
774 } else {
775 self.with_read_conn(|conn| {
776 Ok(conn
777 .query_row(
778 "SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
779 [],
780 |row| row.get(0),
781 )
782 .unwrap_or(0))
783 })
784 .await?
785 };
786
787 if keymap_count != embedding_count {
788 report.issues.push(format!(
789 "HNSW keymap drift: {} active keymap rows vs {} embedded SQLite rows",
790 keymap_count, embedding_count
791 ));
792 }
793 }
794
795 report.ok = report.issues.is_empty();
796 Ok(report)
797 }
798
799 pub async fn reconcile(
805 &self,
806 action: db::ReconcileAction,
807 ) -> Result<db::IntegrityReport, MemoryError> {
808 match action {
809 db::ReconcileAction::ReportOnly => self.verify_integrity(db::VerifyMode::Full).await,
810 db::ReconcileAction::RebuildFts => {
811 self.with_write_conn(db::reconcile_fts).await?;
812 #[cfg(feature = "hnsw")]
813 self.sync_pending_hnsw_ops_best_effort("reconcile_rebuild_fts")
814 .await;
815 self.verify_integrity(db::VerifyMode::Full).await
816 }
817 db::ReconcileAction::ReEmbed => {
818 self.reembed_all().await?;
819 self.verify_integrity(db::VerifyMode::Full).await
820 }
821 }
822 }
823
824 pub fn config(&self) -> &MemoryConfig {
826 &self.inner.config
827 }
828
829 pub fn graph_view(&self) -> Arc<dyn GraphView> {
832 graph::graph_view(self.inner.clone())
833 }
834
835 pub async fn search(
839 &self,
840 query: &str,
841 top_k: Option<usize>,
842 namespaces: Option<&[&str]>,
843 source_types: Option<&[SearchSourceType]>,
844 ) -> Result<Vec<SearchResult>, MemoryError> {
845 let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
846
847 let query_embedding = self.embed_text_internal(query).await?;
848
849 #[cfg(feature = "hnsw")]
850 let hnsw_hits = {
851 let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
852 self.hnsw_search_blocking(query_embedding.clone(), candidates)
853 .await
854 };
855
856 let q = query.to_string();
857 let config = self.inner.config.search.clone();
858 let ns_owned = to_owned_string_vec(namespaces);
859 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
860
861 #[cfg(feature = "hnsw")]
862 let hnsw_hits_owned = hnsw_hits;
863
864 self.with_read_conn(move |conn| {
865 if db::is_embeddings_dirty(conn)? {
866 tracing::warn!(
867 "Embeddings are stale after model change — search quality is degraded. \
868 Call reembed_all() to regenerate embeddings."
869 );
870 }
871 let ns_refs = as_str_slice(&ns_owned);
872 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
873 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
874
875 #[cfg(feature = "hnsw")]
876 {
877 if hnsw_hits_owned.is_empty() {
878 search::hybrid_search(
879 conn,
880 &q,
881 &query_embedding,
882 &config,
883 k,
884 ns_slice,
885 st_slice,
886 None,
887 )
888 } else {
889 search::hybrid_search_with_hnsw(
890 conn,
891 &q,
892 &query_embedding,
893 &config,
894 k,
895 ns_slice,
896 st_slice,
897 None,
898 &hnsw_hits_owned,
899 )
900 }
901 }
902 #[cfg(not(feature = "hnsw"))]
903 {
904 search::hybrid_search(
905 conn,
906 &q,
907 &query_embedding,
908 &config,
909 k,
910 ns_slice,
911 st_slice,
912 None,
913 )
914 }
915 })
916 .await
917 }
918
919 pub async fn search_fts_only(
921 &self,
922 query: &str,
923 top_k: Option<usize>,
924 namespaces: Option<&[&str]>,
925 source_types: Option<&[SearchSourceType]>,
926 ) -> Result<Vec<SearchResult>, MemoryError> {
927 let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
928 let q = query.to_string();
929 let config = self.inner.config.search.clone();
930 let ns_owned = to_owned_string_vec(namespaces);
931 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
932 self.with_read_conn(move |conn| {
933 let ns_refs = as_str_slice(&ns_owned);
934 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
935 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
936 search::fts_only_search(conn, &q, &config, k, ns_slice, st_slice, None)
937 })
938 .await
939 }
940
941 pub async fn search_vector_only(
943 &self,
944 query: &str,
945 top_k: Option<usize>,
946 namespaces: Option<&[&str]>,
947 source_types: Option<&[SearchSourceType]>,
948 ) -> Result<Vec<SearchResult>, MemoryError> {
949 let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
950 let query_embedding = self.embed_text_internal(query).await?;
951
952 #[cfg(feature = "hnsw")]
953 let hnsw_hits = {
954 let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
955 self.hnsw_search_blocking(query_embedding.clone(), candidates)
956 .await
957 };
958
959 let config = self.inner.config.search.clone();
960 let ns_owned = to_owned_string_vec(namespaces);
961 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
962
963 #[cfg(feature = "hnsw")]
964 let hnsw_hits_owned = hnsw_hits;
965
966 self.with_read_conn(move |conn| {
967 if db::is_embeddings_dirty(conn)? {
968 tracing::warn!(
969 "Embeddings are stale after model change — search quality is degraded. \
970 Call reembed_all() to regenerate embeddings."
971 );
972 }
973 let ns_refs = as_str_slice(&ns_owned);
974 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
975 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
976
977 #[cfg(feature = "hnsw")]
978 {
979 if hnsw_hits_owned.is_empty() {
980 search::vector_only_search(
981 conn,
982 &query_embedding,
983 &config,
984 k,
985 ns_slice,
986 st_slice,
987 None,
988 )
989 } else {
990 search::vector_only_search_with_hnsw(
991 conn,
992 &query_embedding,
993 &config,
994 k,
995 ns_slice,
996 st_slice,
997 None,
998 &hnsw_hits_owned,
999 )
1000 }
1001 }
1002 #[cfg(not(feature = "hnsw"))]
1003 {
1004 search::vector_only_search(
1005 conn,
1006 &query_embedding,
1007 &config,
1008 k,
1009 ns_slice,
1010 st_slice,
1011 None,
1012 )
1013 }
1014 })
1015 .await
1016 }
1017
1018 pub async fn search_explained(
1022 &self,
1023 query: &str,
1024 top_k: Option<usize>,
1025 namespaces: Option<&[&str]>,
1026 source_types: Option<&[SearchSourceType]>,
1027 ) -> Result<Vec<types::ExplainedResult>, MemoryError> {
1028 let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
1029 let query_embedding = self.embed_text_internal(query).await?;
1030
1031 #[cfg(feature = "hnsw")]
1032 let hnsw_hits = {
1033 let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
1034 self.hnsw_search_blocking(query_embedding.clone(), candidates)
1035 .await
1036 };
1037
1038 let q = query.to_string();
1039 let config = self.inner.config.search.clone();
1040 let ns_owned = to_owned_string_vec(namespaces);
1041 let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|value| value.to_vec());
1042
1043 #[cfg(feature = "hnsw")]
1044 let hnsw_hits_owned = hnsw_hits;
1045
1046 self.with_read_conn(move |conn| {
1047 let ns_refs = as_str_slice(&ns_owned);
1048 let ns_slice: Option<&[&str]> = ns_refs.as_deref();
1049 let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
1050
1051 #[cfg(feature = "hnsw")]
1052 {
1053 if hnsw_hits_owned.is_empty() {
1054 search::hybrid_search_detailed(
1055 conn,
1056 &q,
1057 &query_embedding,
1058 &config,
1059 k,
1060 ns_slice,
1061 st_slice,
1062 None,
1063 )
1064 } else {
1065 search::hybrid_search_with_hnsw_detailed(
1066 conn,
1067 &q,
1068 &query_embedding,
1069 &config,
1070 k,
1071 ns_slice,
1072 st_slice,
1073 None,
1074 &hnsw_hits_owned,
1075 )
1076 }
1077 }
1078 #[cfg(not(feature = "hnsw"))]
1079 {
1080 search::hybrid_search_detailed(
1081 conn,
1082 &q,
1083 &query_embedding,
1084 &config,
1085 k,
1086 ns_slice,
1087 st_slice,
1088 None,
1089 )
1090 }
1091 })
1092 .await
1093 }
1094
1095 pub async fn embedding_displacement(
1099 &self,
1100 text_a: &str,
1101 text_b: &str,
1102 ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1103 let emb_a = self.embed_text_internal(text_a).await?;
1104 let emb_b = self.embed_text_internal(text_b).await?;
1105 Self::embedding_displacement_from_vecs(&emb_a, &emb_b)
1106 }
1107
1108 pub fn embedding_displacement_from_vecs(
1110 a: &[f32],
1111 b: &[f32],
1112 ) -> Result<types::EmbeddingDisplacement, MemoryError> {
1113 if a.len() != b.len() {
1114 return Err(MemoryError::DimensionMismatch {
1115 expected: a.len(),
1116 actual: b.len(),
1117 });
1118 }
1119 let cosine_sim = search::cosine_similarity(a, b);
1120
1121 let euclidean_dist: f32 = a
1122 .iter()
1123 .zip(b.iter())
1124 .map(|(x, y)| (x - y) * (x - y))
1125 .sum::<f32>()
1126 .sqrt();
1127
1128 let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
1129 let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
1130
1131 Ok(types::EmbeddingDisplacement {
1132 cosine_similarity: cosine_sim,
1133 euclidean_distance: euclidean_dist,
1134 magnitude_a: mag_a,
1135 magnitude_b: mag_b,
1136 })
1137 }
1138
1139 pub fn chunk_text(&self, text: &str) -> Vec<TextChunk> {
1143 chunker::chunk_text(
1144 text,
1145 &self.inner.config.chunking,
1146 self.inner.token_counter.as_ref(),
1147 )
1148 }
1149
1150 pub async fn embed(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
1152 self.embed_text_internal(text).await
1153 }
1154
1155 pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>, MemoryError> {
1157 let owned: Vec<String> = texts.iter().map(|s| s.to_string()).collect();
1158 self.embed_batch_internal(owned).await
1159 }
1160
1161 pub async fn stats(&self) -> Result<MemoryStats, MemoryError> {
1163 let db_path = self.inner.paths.sqlite_path.clone();
1164 self.with_read_conn(move |conn| {
1165 let total_facts: u64 =
1166 conn.query_row("SELECT COUNT(*) FROM facts", [], |r| r.get(0))?;
1167 let total_documents: u64 =
1168 conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))?;
1169 let total_chunks: u64 =
1170 conn.query_row("SELECT COUNT(*) FROM chunks", [], |r| r.get(0))?;
1171 let total_sessions: u64 =
1172 conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
1173 let total_messages: u64 =
1174 conn.query_row("SELECT COUNT(*) FROM messages", [], |r| r.get(0))?;
1175
1176 let db_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
1177
1178 let (model, dims): (Option<String>, Option<usize>) = conn
1179 .query_row(
1180 "SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
1181 [],
1182 |r| Ok((Some(r.get(0)?), Some(r.get(1)?))),
1183 )
1184 .unwrap_or((None, None));
1185
1186 Ok(MemoryStats {
1187 total_facts,
1188 total_documents,
1189 total_chunks,
1190 total_sessions,
1191 total_messages,
1192 database_size_bytes: db_size,
1193 embedding_model: model,
1194 embedding_dimensions: dims,
1195 })
1196 })
1197 .await
1198 }
1199
1200 pub async fn list_scope_domains(&self) -> Result<Vec<String>, MemoryError> {
1206 self.with_read_conn(|conn| {
1207 let mut stmt = conn.prepare(
1208 "SELECT DISTINCT json_extract(metadata, '$.scope_domain') \
1209 FROM documents \
1210 WHERE json_extract(metadata, '$.scope_domain') IS NOT NULL",
1211 )?;
1212 let domains: Vec<String> = stmt
1213 .query_map([], |row| row.get::<_, String>(0))?
1214 .filter_map(|r| r.ok())
1215 .collect();
1216 Ok(domains)
1217 })
1218 .await
1219 }
1220
1221 pub async fn embeddings_are_dirty(&self) -> Result<bool, MemoryError> {
1223 self.with_read_conn(db::is_embeddings_dirty).await
1224 }
1225
1226 pub async fn reembed_all(&self) -> Result<usize, MemoryError> {
1228 let mut count = 0usize;
1229 let batch_size = self.inner.config.embedding.batch_size;
1230 let dims = self.inner.config.embedding.dimensions;
1231
1232 let fact_contents: Vec<(String, String)> = self
1234 .with_read_conn(|conn| {
1235 let mut stmt = conn.prepare("SELECT id, content FROM facts")?;
1236 let result = stmt
1237 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1238 .collect::<Result<Vec<_>, _>>()?;
1239 Ok(result)
1240 })
1241 .await?;
1242
1243 let mut fact_count = 0usize;
1244 for batch in fact_contents.chunks(batch_size) {
1245 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1246 let embeddings = self.embed_batch_internal(texts).await?;
1247 for embedding in &embeddings {
1248 self.validate_embedding_dimensions(embedding)?;
1249 }
1250
1251 let quantizer = Quantizer::new(dims);
1252 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1253 .iter()
1254 .zip(embeddings.iter())
1255 .map(|((id, _), emb)| {
1256 let q8 = quantizer
1258 .quantize(emb)
1259 .map(|qv| quantize::pack_quantized(&qv))
1260 .ok();
1261 (id.clone(), db::embedding_to_bytes(emb), q8)
1262 })
1263 .collect();
1264
1265 self.with_write_conn(move |conn| {
1266 db::with_transaction(conn, |tx| {
1267 for (fid, bytes, q8) in &updates {
1268 tx.execute(
1269 "UPDATE facts SET embedding = ?1, embedding_q8 = ?2, updated_at = datetime('now') WHERE id = ?3",
1270 rusqlite::params![bytes, q8.as_deref(), fid],
1271 )?;
1272 #[cfg(feature = "hnsw")]
1273 db::queue_pending_index_op(
1274 tx,
1275 &format!("fact:{fid}"),
1276 "fact",
1277 db::IndexOpKind::Upsert,
1278 )?;
1279 }
1280 Ok(())
1281 })
1282 })
1283 .await?;
1284
1285 fact_count += batch.len();
1286 count += batch.len();
1287 if fact_count % 100 == 0 || fact_count == count {
1288 tracing::info!(fact_count, "Re-embedded {} facts so far", fact_count);
1289 }
1290 }
1291
1292 let chunk_data: Vec<(String, String)> = self
1294 .with_read_conn(|conn| {
1295 let mut stmt = conn.prepare("SELECT id, content FROM chunks")?;
1296 let result = stmt
1297 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1298 .collect::<Result<Vec<_>, _>>()?;
1299 Ok(result)
1300 })
1301 .await?;
1302
1303 let mut chunk_count = 0usize;
1304 for batch in chunk_data.chunks(batch_size) {
1305 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1306 let embeddings = self.embed_batch_internal(texts).await?;
1307 for embedding in &embeddings {
1308 self.validate_embedding_dimensions(embedding)?;
1309 }
1310
1311 let quantizer = Quantizer::new(dims);
1312 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1313 .iter()
1314 .zip(embeddings.iter())
1315 .map(|((id, _), emb)| {
1316 let q8 = quantizer
1318 .quantize(emb)
1319 .map(|qv| quantize::pack_quantized(&qv))
1320 .ok();
1321 (id.clone(), db::embedding_to_bytes(emb), q8)
1322 })
1323 .collect();
1324
1325 self.with_write_conn(move |conn| {
1326 db::with_transaction(conn, |tx| {
1327 for (cid, bytes, q8) in &updates {
1328 tx.execute(
1329 "UPDATE chunks SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1330 rusqlite::params![bytes, q8.as_deref(), cid],
1331 )?;
1332 #[cfg(feature = "hnsw")]
1333 db::queue_pending_index_op(
1334 tx,
1335 &format!("chunk:{cid}"),
1336 "chunk",
1337 db::IndexOpKind::Upsert,
1338 )?;
1339 }
1340 Ok(())
1341 })
1342 })
1343 .await?;
1344
1345 chunk_count += batch.len();
1346 count += batch.len();
1347 if chunk_count % 100 == 0 {
1348 tracing::info!(chunk_count, "Re-embedded {} chunks so far", chunk_count);
1349 }
1350 }
1351
1352 let message_data: Vec<(i64, String)> = self
1354 .with_read_conn(|conn| {
1355 let mut stmt = conn.prepare("SELECT id, content FROM messages")?;
1356 let result = stmt
1357 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1358 .collect::<Result<Vec<_>, _>>()?;
1359 Ok(result)
1360 })
1361 .await?;
1362
1363 let mut msg_count = 0usize;
1364 for batch in message_data.chunks(batch_size) {
1365 let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
1366 let embeddings = self.embed_batch_internal(texts).await?;
1367 for embedding in &embeddings {
1368 self.validate_embedding_dimensions(embedding)?;
1369 }
1370
1371 let quantizer = Quantizer::new(dims);
1372 let updates: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> = batch
1373 .iter()
1374 .zip(embeddings.iter())
1375 .map(|((id, _), emb)| {
1376 let q8 = quantizer
1378 .quantize(emb)
1379 .map(|qv| quantize::pack_quantized(&qv))
1380 .ok();
1381 (*id, db::embedding_to_bytes(emb), q8)
1382 })
1383 .collect();
1384
1385 self.with_write_conn(move |conn| {
1386 db::with_transaction(conn, |tx| {
1387 for (mid, bytes, q8) in &updates {
1388 tx.execute(
1389 "UPDATE messages SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
1390 rusqlite::params![bytes, q8.as_deref(), mid],
1391 )?;
1392 #[cfg(feature = "hnsw")]
1393 db::queue_pending_index_op(
1394 tx,
1395 &format!("msg:{mid}"),
1396 "message",
1397 db::IndexOpKind::Upsert,
1398 )?;
1399 }
1400 Ok(())
1401 })
1402 })
1403 .await?;
1404
1405 msg_count += batch.len();
1406 count += batch.len();
1407 if msg_count % 100 == 0 {
1408 tracing::info!(msg_count, "Re-embedded {} messages so far", msg_count);
1409 }
1410 }
1411
1412 let episode_data: Vec<(String, String)> = self
1414 .with_read_conn(|conn| {
1415 let mut stmt = conn.prepare("SELECT episode_id, search_text FROM episodes")?;
1416 let result = stmt
1417 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
1418 .collect::<Result<Vec<_>, _>>()?;
1419 Ok(result)
1420 })
1421 .await?;
1422
1423 let mut episode_count = 0usize;
1424 for batch in episode_data.chunks(batch_size) {
1425 let texts: Vec<String> = batch.iter().map(|(_, text)| text.clone()).collect();
1426 let embeddings = self.embed_batch_internal(texts).await?;
1427 for embedding in &embeddings {
1428 self.validate_embedding_dimensions(embedding)?;
1429 }
1430
1431 let quantizer = Quantizer::new(dims);
1432 let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
1433 .iter()
1434 .zip(embeddings.iter())
1435 .map(|((episode_id, _), embedding)| {
1436 let q8 = quantizer
1438 .quantize(embedding)
1439 .map(|vector| quantize::pack_quantized(&vector))
1440 .ok();
1441 (episode_id.clone(), db::embedding_to_bytes(embedding), q8)
1442 })
1443 .collect();
1444
1445 self.with_write_conn(move |conn| {
1446 db::with_transaction(conn, |tx| {
1447 for (episode_id, bytes, q8) in &updates {
1448 tx.execute(
1449 "UPDATE episodes
1450 SET embedding = ?1,
1451 embedding_q8 = ?2,
1452 updated_at = datetime('now')
1453 WHERE episode_id = ?3",
1454 rusqlite::params![bytes, q8.as_deref(), episode_id],
1455 )?;
1456 #[cfg(feature = "hnsw")]
1457 db::queue_pending_index_op(
1458 tx,
1459 &episodes::episode_item_key(episode_id),
1460 "episode",
1461 db::IndexOpKind::Upsert,
1462 )?;
1463 }
1464 Ok(())
1465 })
1466 })
1467 .await?;
1468
1469 episode_count += batch.len();
1470 count += batch.len();
1471 if episode_count % 100 == 0 {
1472 tracing::info!(
1473 episode_count,
1474 "Re-embedded {} episodes so far",
1475 episode_count
1476 );
1477 }
1478 }
1479
1480 self.with_write_conn(db::clear_embeddings_dirty).await?;
1482
1483 tracing::info!(
1484 facts = fact_count,
1485 chunks = chunk_count,
1486 messages = msg_count,
1487 episodes = episode_count,
1488 total = count,
1489 "Re-embedding complete"
1490 );
1491
1492 #[cfg(feature = "hnsw")]
1494 {
1495 tracing::info!("Rebuilding HNSW index after re-embedding...");
1496 self.rebuild_hnsw_index().await?;
1497 }
1498
1499 Ok(count)
1500 }
1501
1502 pub async fn vacuum(&self) -> Result<(), MemoryError> {
1504 self.with_write_conn(|conn| {
1505 conn.execute_batch("VACUUM")?;
1506 Ok(())
1507 })
1508 .await
1509 }
1510
1511 #[deprecated(
1534 since = "0.5.0",
1535 note = "Legacy V10 import envelope path is compatibility-only. Use `import_projection_batch()` and `ProjectionImportBatchV3` on the canonical lane."
1536 )]
1537 #[doc(hidden)]
1538 #[allow(deprecated)]
1539 pub async fn import_envelope(
1540 &self,
1541 envelope: &projection_import::ImportEnvelope,
1542 ) -> Result<projection_import::ImportReceipt, MemoryError> {
1543 projection_legacy_compat::import_envelope(self, envelope).await
1544 }
1545
1546 #[deprecated(
1548 since = "0.5.0",
1549 note = "Legacy V10 import envelope status reads are compatibility-only. Prefer the projection import log."
1550 )]
1551 #[doc(hidden)]
1552 #[allow(deprecated)]
1553 pub async fn import_status(
1554 &self,
1555 envelope_id: &projection_import::EnvelopeId,
1556 ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
1557 projection_legacy_compat::import_status(self, envelope_id).await
1558 }
1559
1560 #[deprecated(
1562 since = "0.5.0",
1563 note = "Legacy V10 import log access is compatibility-only. Prefer new projection-import metadata."
1564 )]
1565 #[doc(hidden)]
1566 #[allow(deprecated)]
1567 pub async fn list_imports(
1568 &self,
1569 namespace: Option<&str>,
1570 limit: usize,
1571 ) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
1572 projection_legacy_compat::list_imports(self, namespace, limit).await
1573 }
1574
1575 #[allow(deprecated)]
1577 pub async fn last_import_at(&self, namespace: &str) -> Result<Option<String>, MemoryError> {
1578 projection_legacy_compat::last_import_at(self, namespace).await
1579 }
1580
1581 pub async fn query_claim_versions(
1583 &self,
1584 query: ProjectionQuery,
1585 ) -> Result<Vec<ProjectionClaimVersion>, MemoryError> {
1586 self.with_read_conn(move |conn| projection_storage::query_claim_versions(conn, &query))
1587 .await
1588 }
1589
1590 pub async fn query_relation_versions(
1592 &self,
1593 query: ProjectionQuery,
1594 ) -> Result<Vec<ProjectionRelationVersion>, MemoryError> {
1595 self.with_read_conn(move |conn| projection_storage::query_relation_versions(conn, &query))
1596 .await
1597 }
1598
1599 pub async fn query_episodes(
1601 &self,
1602 query: ProjectionQuery,
1603 ) -> Result<Vec<ProjectionEpisode>, MemoryError> {
1604 self.with_read_conn(move |conn| projection_storage::query_episode_rows(conn, &query))
1605 .await
1606 }
1607
1608 pub async fn query_entity_aliases(
1610 &self,
1611 query: ProjectionQuery,
1612 ) -> Result<Vec<ProjectionEntityAlias>, MemoryError> {
1613 self.with_read_conn(move |conn| projection_storage::query_entity_aliases(conn, &query))
1614 .await
1615 }
1616
1617 pub async fn query_evidence_refs(
1619 &self,
1620 query: ProjectionQuery,
1621 ) -> Result<Vec<ProjectionEvidenceRef>, MemoryError> {
1622 self.with_read_conn(move |conn| projection_storage::query_evidence_refs(conn, &query))
1623 .await
1624 }
1625
/// Executes an arbitrary SQL statement on the write connection.
///
/// Only compiled for tests or when the `testing` feature is enabled. Each
/// entry of `params` is passed to the statement as a `String` parameter, in
/// order. The return value is whatever `conn.execute` reports for the
/// statement.
///
/// # Errors
///
/// Returns the error reported by the write-connection helper or by
/// `conn.execute`.
#[cfg(any(test, feature = "testing"))]
pub async fn raw_execute(&self, sql: &str, params: Vec<String>) -> Result<usize, MemoryError> {
    // The closure must own its inputs, so copy the statement text.
    let statement = sql.to_string();
    self.with_write_conn(move |conn| {
        // Borrow every owned parameter as a trait object for binding.
        let mut bound: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(params.len());
        for value in &params {
            bound.push(value);
        }
        let affected = conn.execute(&statement, bound.as_slice())?;
        Ok(affected)
    })
    .await
}
1639}