#![allow(deprecated)]
#[cfg(not(any(feature = "hnsw", feature = "brute-force")))]
compile_error!("At least one search backend feature must be enabled: 'hnsw' or 'brute-force'");
pub mod chunker;
pub mod config;
pub(crate) mod conversation;
pub mod db;
pub(crate) mod documents;
pub mod embedder;
pub(crate) mod episodes;
pub mod error;
mod graph;
#[cfg(feature = "hnsw")]
pub mod hnsw;
#[cfg(feature = "hnsw")]
mod hnsw_ops;
mod json_compat_import;
pub(crate) mod knowledge;
mod pool;
mod projection_batch;
mod projection_derivation;
#[deprecated(
since = "0.6.0",
note = "Legacy V10 import path is migration-only. Use `import_projection_batch()` with `ProjectionImportBatchV3` on the canonical lane."
)]
#[doc(hidden)]
pub mod projection_import;
mod projection_lane;
mod projection_legacy_compat;
pub(crate) mod projection_storage;
pub mod quantize;
pub mod search;
pub mod storage;
mod store_support;
pub mod tokenizer;
pub mod types;
pub use config::{
ChunkingConfig, EmbeddingConfig, MemoryConfig, MemoryLimits, PoolConfig, SearchConfig,
};
pub use db::{IntegrityReport, ReconcileAction, VerifyMode};
pub use embedder::{Embedder, MockEmbedder, OllamaEmbedder};
pub use error::MemoryError;
#[cfg(feature = "hnsw")]
pub use hnsw::{HnswConfig, HnswHit, HnswIndex};
pub(crate) use projection_lane::projection_import_failure_id;
pub use projection_lane::{
ProjectionImportFailureReceiptEntry, ProjectionImportLogEntry, ProjectionImportResult,
};
pub use quantize::{pack_quantized, unpack_quantized, QuantizedVector, Quantizer};
pub use storage::StoragePaths;
pub use tokenizer::{EstimateTokenCounter, TokenCounter};
pub use types::{
Document, EmbeddingDisplacement, EpisodeMeta, EpisodeOutcome, ExplainedResult, Fact,
GraphDirection, GraphEdge, GraphEdgeType, GraphView, MemoryStats, Message,
ProjectionClaimVersion, ProjectionEntityAlias, ProjectionEpisode, ProjectionEvidenceRef,
ProjectionQuery, ProjectionRelationVersion, Role, ScoreBreakdown, SearchResult, SearchSource,
SearchSourceType, Session, TextChunk, VerificationStatus,
};
use std::sync::Arc;
pub(crate) use store_support::{
as_str_slice, build_episode_search_text, merge_trace_ctx, to_owned_string_vec,
verification_status_for_outcome,
};
#[doc(hidden)]
pub mod compat {
#[deprecated(
since = "0.5.0",
note = "Legacy ImportEnvelope is migration-only. New integrations should use `ProjectionImportBatchV3` on the canonical lane."
)]
#[doc(hidden)]
#[allow(deprecated)]
pub mod legacy_import_envelope {
pub use crate::projection_import::{
ImportEnvelope, ImportProjectionFreshness, ImportReceipt, ImportRecord, ImportStatus,
};
pub use stack_ids::EnvelopeId;
}
#[deprecated(
since = "0.5.0",
note = "Legacy trace_id is migration-only. Use `stack_ids::TraceCtx`."
)]
#[doc(hidden)]
#[allow(deprecated)]
pub mod compat_trace_id {
pub use crate::types::TraceId;
}
}
#[derive(Clone)]
pub struct MemoryStore {
inner: Arc<MemoryStoreInner>,
}
struct MemoryStoreInner {
pool: pool::SqlitePool,
embedder: Box<dyn Embedder>,
embedding_permits: Arc<tokio::sync::Semaphore>,
config: MemoryConfig,
paths: StoragePaths,
token_counter: Arc<dyn TokenCounter>,
#[cfg(feature = "hnsw")]
hnsw_index: std::sync::RwLock<HnswIndex>,
}
#[cfg(feature = "hnsw")]
impl Drop for MemoryStoreInner {
fn drop(&mut self) {
if !self.paths.hnsw_dir.exists() {
tracing::debug!(
path = %self.paths.hnsw_dir.display(),
"Skipping HNSW drop flush because the sidecar directory no longer exists"
);
return;
}
let pending_ops = match self.pool.with_read_conn(db::pending_index_op_count) {
Ok(count) => count,
Err(err) => {
tracing::warn!("Failed to inspect pending HNSW work on drop: {}", err);
0
}
};
if pending_ops > 0 {
if let Err(err) =
hnsw_ops::recover_hnsw_sidecar_sync(&self.pool, &self.paths, &self.config.hnsw)
{
tracing::error!("Failed to recover and flush HNSW on drop: {}", err);
}
return;
}
let hnsw_guard = match self.hnsw_index.read() {
Ok(g) => g,
Err(_) => {
tracing::warn!("HNSW RwLock poisoned on drop — skipping save");
return;
}
};
if let Err(err) = hnsw_ops::save_hnsw_sidecar(
&hnsw_guard,
&self.paths.hnsw_dir,
&self.paths.hnsw_basename,
) {
tracing::error!("Failed to save HNSW index on drop: {}", err);
}
if let Err(e) = self
.pool
.with_write_conn(|conn| hnsw_guard.flush_keymap(conn))
{
tracing::error!("Failed to flush HNSW keymap on drop: {}", e);
}
}
}
impl MemoryStore {
async fn with_read_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
where
F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
T: Send + 'static,
{
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
inner.pool.with_read_conn(f)
})
.await
.map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
}
async fn with_write_conn<F, T>(&self, f: F) -> Result<T, MemoryError>
where
F: FnOnce(&rusqlite::Connection) -> Result<T, MemoryError> + Send + 'static,
T: Send + 'static,
{
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || -> Result<T, MemoryError> {
inner.pool.with_write_conn(f)
})
.await
.map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
}
#[cfg(feature = "hnsw")]
async fn hnsw_search_blocking(
&self,
query_embedding: Vec<f32>,
candidates: usize,
) -> Vec<HnswHit> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || {
let guard = inner.hnsw_index.read().unwrap_or_else(|e| e.into_inner());
match guard.search(&query_embedding, candidates) {
Ok(hits) => hits,
Err(e) => {
tracing::error!(
"HNSW search failed, falling back to brute-force vector search: {}",
e
);
Vec::new()
}
}
})
.await
.unwrap_or_else(|e| {
tracing::error!("HNSW search blocking task panicked: {}", e);
Vec::new()
})
}
#[cfg(feature = "hnsw")]
fn sync_pending_hnsw_ops_blocking(&self) -> Result<usize, MemoryError> {
hnsw_ops::sync_pending_hnsw_sidecar(&self.inner)
}
#[cfg(feature = "hnsw")]
async fn sync_pending_hnsw_ops(&self) -> Result<usize, MemoryError> {
let inner = self.inner.clone();
tokio::task::spawn_blocking(move || hnsw_ops::sync_pending_hnsw_sidecar(&inner))
.await
.map_err(|e| MemoryError::Other(format!("Blocking task panicked: {}", e)))?
}
#[cfg(feature = "hnsw")]
async fn sync_pending_hnsw_ops_best_effort(&self, operation: &'static str) {
if let Err(err) = self.sync_pending_hnsw_ops().await {
tracing::warn!(
operation,
error = %err,
"SQLite write committed but HNSW sidecar sync is still pending"
);
} else {
self.maybe_flush_hnsw();
}
}
pub fn open(config: MemoryConfig) -> Result<Self, MemoryError> {
let config = config.normalize_and_validate()?;
let embedder = Box::new(OllamaEmbedder::try_new(&config.embedding)?);
Self::open_with_embedder(config, embedder)
}
#[allow(unused_mut)] pub fn open_with_embedder(
mut config: MemoryConfig,
embedder: Box<dyn Embedder>,
) -> Result<Self, MemoryError> {
config = config.normalize_and_validate()?;
if embedder.dimensions() != config.embedding.dimensions {
return Err(MemoryError::DimensionMismatch {
expected: config.embedding.dimensions,
actual: embedder.dimensions(),
});
}
config.embedding.model = embedder.model_name().to_string();
let paths = StoragePaths::new(&config.base_dir);
std::fs::create_dir_all(&paths.base_dir).map_err(|e| {
MemoryError::StorageError(format!(
"Failed to create directory {}: {}",
paths.base_dir.display(),
e
))
})?;
let pool = pool::SqlitePool::open(&paths.sqlite_path, &config.pool, &config.limits)?;
pool.with_write_conn(|conn| db::check_embedding_metadata(conn, &config.embedding))?;
#[cfg(feature = "hnsw")]
{
config.hnsw.dimensions = config.embedding.dimensions;
}
let token_counter = config
.token_counter
.clone()
.unwrap_or_else(tokenizer::default_token_counter);
#[cfg(feature = "hnsw")]
let hnsw_index = {
let hnsw_config = config.hnsw.clone();
let embeddings_dirty = pool.with_read_conn(db::is_embeddings_dirty)?;
let pending_index_ops = pool.with_read_conn(db::pending_index_op_count)?;
if embeddings_dirty {
tracing::warn!(
"Embedding model changed — creating fresh HNSW index (old index is stale)"
);
pool.with_write_conn(|conn| {
db::clear_all_pending_index_ops(conn)?;
db::set_sidecar_dirty(conn, false)?;
Ok(())
})?;
HnswIndex::new(hnsw_config)?
} else if pending_index_ops > 0 || pool.with_read_conn(db::is_sidecar_dirty)? {
tracing::warn!(
pending_index_ops,
"Recovering HNSW sidecar from SQLite because durable sidecar work exists"
);
hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?
} else if paths.hnsw_files_exist() {
tracing::info!("Loading HNSW index from {:?}", paths.hnsw_dir);
match HnswIndex::load(&paths.hnsw_dir, &paths.hnsw_basename, hnsw_config.clone()) {
Ok(index) => {
if let Err(e) = pool.with_write_conn(|conn| index.load_keymap(conn)) {
tracing::warn!("Failed to load HNSW key mappings: {}. Mappings will be empty until rebuild.", e);
}
let hnsw_count = index.len();
let sqlite_count: i64 = pool.with_read_conn(|conn| {
Ok(conn
.query_row(
"SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
[],
|row| row.get(0),
)
.unwrap_or(0))
})?;
let drift = (sqlite_count - hnsw_count as i64).abs();
if drift > 0 {
tracing::warn!(
hnsw_count,
sqlite_count,
drift,
"HNSW index is stale — {} entries differ from SQLite. \
Likely caused by unclean shutdown. Triggering inline rebuild.",
drift
);
let rebuilt =
hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
tracing::info!(
active = rebuilt.len(),
"HNSW index rebuilt after stale detection"
);
rebuilt
} else {
tracing::info!(
"HNSW index loaded ({} active keys, in sync with SQLite)",
hnsw_count
);
index
}
}
Err(e) => {
tracing::warn!(
"Failed to load HNSW index: {}. Creating new empty index.",
e
);
HnswIndex::new(hnsw_config)?
}
}
} else {
let orphan_count: i64 = pool.with_read_conn(|conn| {
Ok(conn
.query_row(
"SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
[],
|row| row.get(0),
)
.unwrap_or(0))
})?;
if orphan_count > 0 {
tracing::warn!(
orphan_count,
"HNSW sidecar files missing but {} embeddings exist in SQLite — \
rebuilding index inline",
orphan_count
);
let new_index =
hnsw_ops::recover_hnsw_sidecar_sync(&pool, &paths, &hnsw_config)?;
tracing::info!(
active = new_index.len(),
"HNSW index rebuilt from SQLite embeddings"
);
new_index
} else {
tracing::info!("Creating new empty HNSW index (no embeddings in SQLite)");
HnswIndex::new(hnsw_config)?
}
}
};
let store = Self {
inner: Arc::new(MemoryStoreInner {
pool,
embedder,
embedding_permits: Arc::new(tokio::sync::Semaphore::new(
config.limits.max_embedding_concurrency,
)),
config,
paths,
token_counter,
#[cfg(feature = "hnsw")]
hnsw_index: std::sync::RwLock::new(hnsw_index),
}),
};
#[cfg(feature = "hnsw")]
if let Err(err) = store.sync_pending_hnsw_ops_blocking() {
tracing::warn!(
error = %err,
"Failed to reconcile pending HNSW sidecar ops during open; sidecar replay remains pending"
);
}
Ok(store)
}
async fn with_embedding_permit(
&self,
) -> Result<tokio::sync::OwnedSemaphorePermit, MemoryError> {
self.inner
.embedding_permits
.clone()
.acquire_owned()
.await
.map_err(|e| MemoryError::Other(format!("embedding semaphore closed: {e}")))
}
async fn embed_text_internal(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
let _permit = self.with_embedding_permit().await?;
self.inner.embedder.embed(text).await
}
async fn embed_batch_internal(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>, MemoryError> {
let _permit = self.with_embedding_permit().await?;
self.inner.embedder.embed_batch(texts).await
}
fn validate_embedding_dimensions(&self, embedding: &[f32]) -> Result<(), MemoryError> {
let expected = self.inner.config.embedding.dimensions;
if embedding.len() != expected {
return Err(MemoryError::DimensionMismatch {
expected,
actual: embedding.len(),
});
}
Ok(())
}
fn validate_content(&self, field: &'static str, content: &str) -> Result<(), MemoryError> {
if content.is_empty() {
return Err(MemoryError::InvalidConfig {
field,
reason: "content must not be empty".to_string(),
});
}
let limit = self.inner.config.limits.max_content_bytes;
if content.len() > limit {
return Err(MemoryError::ContentTooLarge {
size: content.len(),
limit,
});
}
Ok(())
}
fn validate_confidence(confidence: f32) -> Result<(), MemoryError> {
if !confidence.is_finite() || !(0.0..=1.0).contains(&confidence) {
return Err(MemoryError::InvalidConfig {
field: "episodes.confidence",
reason: "confidence must be finite and within [0.0, 1.0]".to_string(),
});
}
Ok(())
}
#[cfg(feature = "hnsw")]
pub async fn rebuild_hnsw_index(&self) -> Result<(), MemoryError> {
tracing::info!("Rebuilding HNSW index from SQLite embeddings...");
let hnsw_config = self.inner.config.hnsw.clone();
let new_index = self
.with_read_conn(move |conn| hnsw_ops::rebuild_hnsw_from_sqlite(conn, &hnsw_config))
.await?;
{
let mut guard = self
.inner
.hnsw_index
.write()
.unwrap_or_else(|e| e.into_inner());
*guard = new_index.clone();
}
hnsw_ops::save_hnsw_sidecar(
&new_index,
&self.inner.paths.hnsw_dir,
&self.inner.paths.hnsw_basename,
)?;
self.inner.pool.with_write_conn(|conn| {
new_index.flush_keymap(conn)?;
db::clear_all_pending_index_ops(conn)?;
db::set_sidecar_dirty(conn, false)?;
Ok(())
})?;
tracing::info!(active = new_index.len(), "HNSW index rebuilt");
Ok(())
}
#[cfg(feature = "hnsw")]
fn maybe_flush_hnsw(&self) {
if let Some(interval) = self.inner.config.hnsw.flush_interval_secs {
let guard = self
.inner
.hnsw_index
.read()
.unwrap_or_else(|e| e.into_inner());
if guard.should_flush(interval) {
drop(guard); if let Err(e) = self.flush_hnsw() {
tracing::warn!("Opportunistic HNSW flush failed: {}", e);
} else {
let guard = self
.inner
.hnsw_index
.read()
.unwrap_or_else(|e| e.into_inner());
guard.update_last_flush_epoch();
tracing::info!("Opportunistic HNSW flush completed");
}
}
}
}
#[cfg(feature = "hnsw")]
pub fn flush_hnsw(&self) -> Result<(), MemoryError> {
let pending_ops = self.inner.pool.with_read_conn(db::pending_index_op_count)?;
if pending_ops > 0 {
tracing::info!(
pending_ops,
"Flushing HNSW via authoritative SQLite rebuild because pending durable sidecar work exists"
);
let rebuilt = hnsw_ops::recover_hnsw_sidecar_sync(
&self.inner.pool,
&self.inner.paths,
&self.inner.config.hnsw,
)?;
let mut guard = self
.inner
.hnsw_index
.write()
.unwrap_or_else(|e| e.into_inner());
*guard = rebuilt;
return Ok(());
}
let index = self
.inner
.hnsw_index
.read()
.unwrap_or_else(|e| e.into_inner())
.clone();
hnsw_ops::save_hnsw_sidecar(
&index,
&self.inner.paths.hnsw_dir,
&self.inner.paths.hnsw_basename,
)?;
self.inner.pool.with_write_conn(|conn| {
index.flush_keymap(conn)?;
db::clear_all_pending_index_ops(conn)?;
db::set_sidecar_dirty(conn, false)?;
Ok(())
})?;
Ok(())
}
#[cfg(feature = "hnsw")]
pub async fn compact_hnsw(&self) -> Result<(), MemoryError> {
if !self
.inner
.hnsw_index
.read()
.unwrap_or_else(|e| e.into_inner())
.needs_compaction()
{
tracing::info!("HNSW compaction not needed (deleted ratio below threshold)");
return Ok(());
}
self.rebuild_hnsw_index().await
}
pub async fn verify_integrity(
&self,
mode: db::VerifyMode,
) -> Result<db::IntegrityReport, MemoryError> {
let use_writer = mode == db::VerifyMode::Full;
let mut report = if use_writer {
self.with_write_conn(move |conn| db::verify_integrity_sync(conn, mode))
.await?
} else {
self.with_read_conn(move |conn| db::verify_integrity_sync(conn, mode))
.await?
};
#[cfg(feature = "hnsw")]
{
let embedding_count: i64 = if use_writer {
self.with_write_conn(|conn| {
Ok(conn.query_row(
"SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
[],
|row| row.get(0),
)?)
})
.await?
} else {
self.with_read_conn(|conn| {
Ok(conn.query_row(
"SELECT (SELECT COUNT(*) FROM facts WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM chunks WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM messages WHERE embedding IS NOT NULL) +
(SELECT COUNT(*) FROM episodes WHERE embedding IS NOT NULL)",
[],
|row| row.get(0),
)?)
})
.await?
};
if embedding_count > 0 && !self.inner.paths.hnsw_files_exist() {
report.issues.push(format!(
"HNSW sidecar files are missing while {} embedded rows exist in SQLite",
embedding_count
));
}
let keymap_count: i64 = if use_writer {
self.with_write_conn(|conn| {
Ok(conn
.query_row(
"SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
[],
|row| row.get(0),
)
.unwrap_or(0))
})
.await?
} else {
self.with_read_conn(|conn| {
Ok(conn
.query_row(
"SELECT COUNT(*) FROM hnsw_keymap WHERE deleted = 0",
[],
|row| row.get(0),
)
.unwrap_or(0))
})
.await?
};
if keymap_count != embedding_count {
report.issues.push(format!(
"HNSW keymap drift: {} active keymap rows vs {} embedded SQLite rows",
keymap_count, embedding_count
));
}
}
report.ok = report.issues.is_empty();
Ok(report)
}
pub async fn reconcile(
&self,
action: db::ReconcileAction,
) -> Result<db::IntegrityReport, MemoryError> {
match action {
db::ReconcileAction::ReportOnly => self.verify_integrity(db::VerifyMode::Full).await,
db::ReconcileAction::RebuildFts => {
self.with_write_conn(db::reconcile_fts).await?;
#[cfg(feature = "hnsw")]
self.sync_pending_hnsw_ops_best_effort("reconcile_rebuild_fts")
.await;
self.verify_integrity(db::VerifyMode::Full).await
}
db::ReconcileAction::ReEmbed => {
self.reembed_all().await?;
self.verify_integrity(db::VerifyMode::Full).await
}
}
}
pub fn config(&self) -> &MemoryConfig {
&self.inner.config
}
pub fn graph_view(&self) -> Arc<dyn GraphView> {
graph::graph_view(self.inner.clone())
}
pub async fn search(
&self,
query: &str,
top_k: Option<usize>,
namespaces: Option<&[&str]>,
source_types: Option<&[SearchSourceType]>,
) -> Result<Vec<SearchResult>, MemoryError> {
let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
let query_embedding = self.embed_text_internal(query).await?;
#[cfg(feature = "hnsw")]
let hnsw_hits = {
let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
self.hnsw_search_blocking(query_embedding.clone(), candidates)
.await
};
let q = query.to_string();
let config = self.inner.config.search.clone();
let ns_owned = to_owned_string_vec(namespaces);
let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
#[cfg(feature = "hnsw")]
let hnsw_hits_owned = hnsw_hits;
self.with_read_conn(move |conn| {
if db::is_embeddings_dirty(conn)? {
tracing::warn!(
"Embeddings are stale after model change — search quality is degraded. \
Call reembed_all() to regenerate embeddings."
);
}
let ns_refs = as_str_slice(&ns_owned);
let ns_slice: Option<&[&str]> = ns_refs.as_deref();
let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
#[cfg(feature = "hnsw")]
{
if hnsw_hits_owned.is_empty() {
search::hybrid_search(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
} else {
search::hybrid_search_with_hnsw(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
&hnsw_hits_owned,
)
}
}
#[cfg(not(feature = "hnsw"))]
{
search::hybrid_search(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
}
})
.await
}
pub async fn search_fts_only(
&self,
query: &str,
top_k: Option<usize>,
namespaces: Option<&[&str]>,
source_types: Option<&[SearchSourceType]>,
) -> Result<Vec<SearchResult>, MemoryError> {
let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
let q = query.to_string();
let config = self.inner.config.search.clone();
let ns_owned = to_owned_string_vec(namespaces);
let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
self.with_read_conn(move |conn| {
let ns_refs = as_str_slice(&ns_owned);
let ns_slice: Option<&[&str]> = ns_refs.as_deref();
let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
search::fts_only_search(conn, &q, &config, k, ns_slice, st_slice, None)
})
.await
}
pub async fn search_vector_only(
&self,
query: &str,
top_k: Option<usize>,
namespaces: Option<&[&str]>,
source_types: Option<&[SearchSourceType]>,
) -> Result<Vec<SearchResult>, MemoryError> {
let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
let query_embedding = self.embed_text_internal(query).await?;
#[cfg(feature = "hnsw")]
let hnsw_hits = {
let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
self.hnsw_search_blocking(query_embedding.clone(), candidates)
.await
};
let config = self.inner.config.search.clone();
let ns_owned = to_owned_string_vec(namespaces);
let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|s| s.to_vec());
#[cfg(feature = "hnsw")]
let hnsw_hits_owned = hnsw_hits;
self.with_read_conn(move |conn| {
if db::is_embeddings_dirty(conn)? {
tracing::warn!(
"Embeddings are stale after model change — search quality is degraded. \
Call reembed_all() to regenerate embeddings."
);
}
let ns_refs = as_str_slice(&ns_owned);
let ns_slice: Option<&[&str]> = ns_refs.as_deref();
let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
#[cfg(feature = "hnsw")]
{
if hnsw_hits_owned.is_empty() {
search::vector_only_search(
conn,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
} else {
search::vector_only_search_with_hnsw(
conn,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
&hnsw_hits_owned,
)
}
}
#[cfg(not(feature = "hnsw"))]
{
search::vector_only_search(
conn,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
}
})
.await
}
pub async fn search_explained(
&self,
query: &str,
top_k: Option<usize>,
namespaces: Option<&[&str]>,
source_types: Option<&[SearchSourceType]>,
) -> Result<Vec<types::ExplainedResult>, MemoryError> {
let k = top_k.unwrap_or(self.inner.config.search.default_top_k);
let query_embedding = self.embed_text_internal(query).await?;
#[cfg(feature = "hnsw")]
let hnsw_hits = {
let candidates = self.inner.config.search.candidate_pool_size.max(k * 3);
self.hnsw_search_blocking(query_embedding.clone(), candidates)
.await
};
let q = query.to_string();
let config = self.inner.config.search.clone();
let ns_owned = to_owned_string_vec(namespaces);
let st_owned: Option<Vec<SearchSourceType>> = source_types.map(|value| value.to_vec());
#[cfg(feature = "hnsw")]
let hnsw_hits_owned = hnsw_hits;
self.with_read_conn(move |conn| {
let ns_refs = as_str_slice(&ns_owned);
let ns_slice: Option<&[&str]> = ns_refs.as_deref();
let st_slice: Option<&[SearchSourceType]> = st_owned.as_deref();
#[cfg(feature = "hnsw")]
{
if hnsw_hits_owned.is_empty() {
search::hybrid_search_detailed(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
} else {
search::hybrid_search_with_hnsw_detailed(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
&hnsw_hits_owned,
)
}
}
#[cfg(not(feature = "hnsw"))]
{
search::hybrid_search_detailed(
conn,
&q,
&query_embedding,
&config,
k,
ns_slice,
st_slice,
None,
)
}
})
.await
}
pub async fn embedding_displacement(
&self,
text_a: &str,
text_b: &str,
) -> Result<types::EmbeddingDisplacement, MemoryError> {
let emb_a = self.embed_text_internal(text_a).await?;
let emb_b = self.embed_text_internal(text_b).await?;
Self::embedding_displacement_from_vecs(&emb_a, &emb_b)
}
pub fn embedding_displacement_from_vecs(
a: &[f32],
b: &[f32],
) -> Result<types::EmbeddingDisplacement, MemoryError> {
if a.len() != b.len() {
return Err(MemoryError::DimensionMismatch {
expected: a.len(),
actual: b.len(),
});
}
let cosine_sim = search::cosine_similarity(a, b);
let euclidean_dist: f32 = a
.iter()
.zip(b.iter())
.map(|(x, y)| (x - y) * (x - y))
.sum::<f32>()
.sqrt();
let mag_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
let mag_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
Ok(types::EmbeddingDisplacement {
cosine_similarity: cosine_sim,
euclidean_distance: euclidean_dist,
magnitude_a: mag_a,
magnitude_b: mag_b,
})
}
pub fn chunk_text(&self, text: &str) -> Vec<TextChunk> {
chunker::chunk_text(
text,
&self.inner.config.chunking,
self.inner.token_counter.as_ref(),
)
}
pub async fn embed(&self, text: &str) -> Result<Vec<f32>, MemoryError> {
self.embed_text_internal(text).await
}
pub async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>, MemoryError> {
let owned: Vec<String> = texts.iter().map(|s| s.to_string()).collect();
self.embed_batch_internal(owned).await
}
pub async fn stats(&self) -> Result<MemoryStats, MemoryError> {
let db_path = self.inner.paths.sqlite_path.clone();
self.with_read_conn(move |conn| {
let total_facts: u64 =
conn.query_row("SELECT COUNT(*) FROM facts", [], |r| r.get(0))?;
let total_documents: u64 =
conn.query_row("SELECT COUNT(*) FROM documents", [], |r| r.get(0))?;
let total_chunks: u64 =
conn.query_row("SELECT COUNT(*) FROM chunks", [], |r| r.get(0))?;
let total_sessions: u64 =
conn.query_row("SELECT COUNT(*) FROM sessions", [], |r| r.get(0))?;
let total_messages: u64 =
conn.query_row("SELECT COUNT(*) FROM messages", [], |r| r.get(0))?;
let db_size = std::fs::metadata(&db_path).map(|m| m.len()).unwrap_or(0);
let (model, dims): (Option<String>, Option<usize>) = conn
.query_row(
"SELECT model_name, dimensions FROM embedding_metadata WHERE id = 1",
[],
|r| Ok((Some(r.get(0)?), Some(r.get(1)?))),
)
.unwrap_or((None, None));
Ok(MemoryStats {
total_facts,
total_documents,
total_chunks,
total_sessions,
total_messages,
database_size_bytes: db_size,
embedding_model: model,
embedding_dimensions: dims,
})
})
.await
}
pub async fn list_scope_domains(&self) -> Result<Vec<String>, MemoryError> {
self.with_read_conn(|conn| {
let mut stmt = conn.prepare(
"SELECT DISTINCT json_extract(metadata, '$.scope_domain') \
FROM documents \
WHERE json_extract(metadata, '$.scope_domain') IS NOT NULL",
)?;
let domains: Vec<String> = stmt
.query_map([], |row| row.get::<_, String>(0))?
.filter_map(|r| r.ok())
.collect();
Ok(domains)
})
.await
}
pub async fn embeddings_are_dirty(&self) -> Result<bool, MemoryError> {
self.with_read_conn(db::is_embeddings_dirty).await
}
pub async fn reembed_all(&self) -> Result<usize, MemoryError> {
let mut count = 0usize;
let batch_size = self.inner.config.embedding.batch_size;
let dims = self.inner.config.embedding.dimensions;
let fact_contents: Vec<(String, String)> = self
.with_read_conn(|conn| {
let mut stmt = conn.prepare("SELECT id, content FROM facts")?;
let result = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<Result<Vec<_>, _>>()?;
Ok(result)
})
.await?;
let mut fact_count = 0usize;
for batch in fact_contents.chunks(batch_size) {
let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
let embeddings = self.embed_batch_internal(texts).await?;
for embedding in &embeddings {
self.validate_embedding_dimensions(embedding)?;
}
let quantizer = Quantizer::new(dims);
let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
.iter()
.zip(embeddings.iter())
.map(|((id, _), emb)| {
let q8 = quantizer
.quantize(emb)
.map(|qv| quantize::pack_quantized(&qv))
.ok();
(id.clone(), db::embedding_to_bytes(emb), q8)
})
.collect();
self.with_write_conn(move |conn| {
db::with_transaction(conn, |tx| {
for (fid, bytes, q8) in &updates {
tx.execute(
"UPDATE facts SET embedding = ?1, embedding_q8 = ?2, updated_at = datetime('now') WHERE id = ?3",
rusqlite::params![bytes, q8.as_deref(), fid],
)?;
#[cfg(feature = "hnsw")]
db::queue_pending_index_op(
tx,
&format!("fact:{fid}"),
"fact",
db::IndexOpKind::Upsert,
)?;
}
Ok(())
})
})
.await?;
fact_count += batch.len();
count += batch.len();
if fact_count % 100 == 0 || fact_count == count {
tracing::info!(fact_count, "Re-embedded {} facts so far", fact_count);
}
}
let chunk_data: Vec<(String, String)> = self
.with_read_conn(|conn| {
let mut stmt = conn.prepare("SELECT id, content FROM chunks")?;
let result = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<Result<Vec<_>, _>>()?;
Ok(result)
})
.await?;
let mut chunk_count = 0usize;
for batch in chunk_data.chunks(batch_size) {
let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
let embeddings = self.embed_batch_internal(texts).await?;
for embedding in &embeddings {
self.validate_embedding_dimensions(embedding)?;
}
let quantizer = Quantizer::new(dims);
let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
.iter()
.zip(embeddings.iter())
.map(|((id, _), emb)| {
let q8 = quantizer
.quantize(emb)
.map(|qv| quantize::pack_quantized(&qv))
.ok();
(id.clone(), db::embedding_to_bytes(emb), q8)
})
.collect();
self.with_write_conn(move |conn| {
db::with_transaction(conn, |tx| {
for (cid, bytes, q8) in &updates {
tx.execute(
"UPDATE chunks SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
rusqlite::params![bytes, q8.as_deref(), cid],
)?;
#[cfg(feature = "hnsw")]
db::queue_pending_index_op(
tx,
&format!("chunk:{cid}"),
"chunk",
db::IndexOpKind::Upsert,
)?;
}
Ok(())
})
})
.await?;
chunk_count += batch.len();
count += batch.len();
if chunk_count % 100 == 0 {
tracing::info!(chunk_count, "Re-embedded {} chunks so far", chunk_count);
}
}
let message_data: Vec<(i64, String)> = self
.with_read_conn(|conn| {
let mut stmt = conn.prepare("SELECT id, content FROM messages")?;
let result = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<Result<Vec<_>, _>>()?;
Ok(result)
})
.await?;
let mut msg_count = 0usize;
for batch in message_data.chunks(batch_size) {
let texts: Vec<String> = batch.iter().map(|(_, c)| c.clone()).collect();
let embeddings = self.embed_batch_internal(texts).await?;
for embedding in &embeddings {
self.validate_embedding_dimensions(embedding)?;
}
let quantizer = Quantizer::new(dims);
let updates: Vec<(i64, Vec<u8>, Option<Vec<u8>>)> = batch
.iter()
.zip(embeddings.iter())
.map(|((id, _), emb)| {
let q8 = quantizer
.quantize(emb)
.map(|qv| quantize::pack_quantized(&qv))
.ok();
(*id, db::embedding_to_bytes(emb), q8)
})
.collect();
self.with_write_conn(move |conn| {
db::with_transaction(conn, |tx| {
for (mid, bytes, q8) in &updates {
tx.execute(
"UPDATE messages SET embedding = ?1, embedding_q8 = ?2 WHERE id = ?3",
rusqlite::params![bytes, q8.as_deref(), mid],
)?;
#[cfg(feature = "hnsw")]
db::queue_pending_index_op(
tx,
&format!("msg:{mid}"),
"message",
db::IndexOpKind::Upsert,
)?;
}
Ok(())
})
})
.await?;
msg_count += batch.len();
count += batch.len();
if msg_count % 100 == 0 {
tracing::info!(msg_count, "Re-embedded {} messages so far", msg_count);
}
}
let episode_data: Vec<(String, String)> = self
.with_read_conn(|conn| {
let mut stmt = conn.prepare("SELECT episode_id, search_text FROM episodes")?;
let result = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<Result<Vec<_>, _>>()?;
Ok(result)
})
.await?;
let mut episode_count = 0usize;
for batch in episode_data.chunks(batch_size) {
let texts: Vec<String> = batch.iter().map(|(_, text)| text.clone()).collect();
let embeddings = self.embed_batch_internal(texts).await?;
for embedding in &embeddings {
self.validate_embedding_dimensions(embedding)?;
}
let quantizer = Quantizer::new(dims);
let updates: Vec<(String, Vec<u8>, Option<Vec<u8>>)> = batch
.iter()
.zip(embeddings.iter())
.map(|((episode_id, _), embedding)| {
let q8 = quantizer
.quantize(embedding)
.map(|vector| quantize::pack_quantized(&vector))
.ok();
(episode_id.clone(), db::embedding_to_bytes(embedding), q8)
})
.collect();
self.with_write_conn(move |conn| {
db::with_transaction(conn, |tx| {
for (episode_id, bytes, q8) in &updates {
tx.execute(
"UPDATE episodes
SET embedding = ?1,
embedding_q8 = ?2,
updated_at = datetime('now')
WHERE episode_id = ?3",
rusqlite::params![bytes, q8.as_deref(), episode_id],
)?;
#[cfg(feature = "hnsw")]
db::queue_pending_index_op(
tx,
&episodes::episode_item_key(episode_id),
"episode",
db::IndexOpKind::Upsert,
)?;
}
Ok(())
})
})
.await?;
episode_count += batch.len();
count += batch.len();
if episode_count % 100 == 0 {
tracing::info!(
episode_count,
"Re-embedded {} episodes so far",
episode_count
);
}
}
self.with_write_conn(db::clear_embeddings_dirty).await?;
tracing::info!(
facts = fact_count,
chunks = chunk_count,
messages = msg_count,
episodes = episode_count,
total = count,
"Re-embedding complete"
);
#[cfg(feature = "hnsw")]
{
tracing::info!("Rebuilding HNSW index after re-embedding...");
self.rebuild_hnsw_index().await?;
}
Ok(count)
}
pub async fn vacuum(&self) -> Result<(), MemoryError> {
self.with_write_conn(|conn| {
conn.execute_batch("VACUUM")?;
Ok(())
})
.await
}
#[deprecated(
since = "0.5.0",
note = "Legacy V10 import envelope path is compatibility-only. Use `import_projection_batch()` and `ProjectionImportBatchV3` on the canonical lane."
)]
#[doc(hidden)]
#[allow(deprecated)]
pub async fn import_envelope(
&self,
envelope: &projection_import::ImportEnvelope,
) -> Result<projection_import::ImportReceipt, MemoryError> {
projection_legacy_compat::import_envelope(self, envelope).await
}
#[deprecated(
since = "0.5.0",
note = "Legacy V10 import envelope status reads are compatibility-only. Prefer the projection import log."
)]
#[doc(hidden)]
#[allow(deprecated)]
pub async fn import_status(
&self,
envelope_id: &projection_import::EnvelopeId,
) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
projection_legacy_compat::import_status(self, envelope_id).await
}
#[deprecated(
since = "0.5.0",
note = "Legacy V10 import log access is compatibility-only. Prefer new projection-import metadata."
)]
#[doc(hidden)]
#[allow(deprecated)]
pub async fn list_imports(
&self,
namespace: Option<&str>,
limit: usize,
) -> Result<Vec<projection_import::ImportReceipt>, MemoryError> {
projection_legacy_compat::list_imports(self, namespace, limit).await
}
#[allow(deprecated)]
pub async fn last_import_at(&self, namespace: &str) -> Result<Option<String>, MemoryError> {
projection_legacy_compat::last_import_at(self, namespace).await
}
pub async fn query_claim_versions(
&self,
query: ProjectionQuery,
) -> Result<Vec<ProjectionClaimVersion>, MemoryError> {
self.with_read_conn(move |conn| projection_storage::query_claim_versions(conn, &query))
.await
}
pub async fn query_relation_versions(
&self,
query: ProjectionQuery,
) -> Result<Vec<ProjectionRelationVersion>, MemoryError> {
self.with_read_conn(move |conn| projection_storage::query_relation_versions(conn, &query))
.await
}
pub async fn query_episodes(
&self,
query: ProjectionQuery,
) -> Result<Vec<ProjectionEpisode>, MemoryError> {
self.with_read_conn(move |conn| projection_storage::query_episode_rows(conn, &query))
.await
}
pub async fn query_entity_aliases(
&self,
query: ProjectionQuery,
) -> Result<Vec<ProjectionEntityAlias>, MemoryError> {
self.with_read_conn(move |conn| projection_storage::query_entity_aliases(conn, &query))
.await
}
pub async fn query_evidence_refs(
&self,
query: ProjectionQuery,
) -> Result<Vec<ProjectionEvidenceRef>, MemoryError> {
self.with_read_conn(move |conn| projection_storage::query_evidence_refs(conn, &query))
.await
}
#[cfg(any(test, feature = "testing"))]
pub async fn raw_execute(&self, sql: &str, params: Vec<String>) -> Result<usize, MemoryError> {
let sql = sql.to_string();
self.with_write_conn(move |conn| {
let param_refs: Vec<&dyn rusqlite::types::ToSql> = params
.iter()
.map(|s| s as &dyn rusqlite::types::ToSql)
.collect();
Ok(conn.execute(&sql, &*param_refs)?)
})
.await
}
}