//! Semantic memory: hybrid recall over SQLite plus an optional vector store, with
//! summarization, persona, trajectory, graph, and consolidation submodules.
mod algorithms;
mod corrections;
mod cross_session;
mod graph;
pub(crate) mod importance;
pub mod persona;
mod recall;
mod summarization;
pub mod trajectory;
pub mod tree_consolidation;
pub(crate) mod write_buffer;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicU64;
use std::time::Instant;
use tokio::sync::RwLock;
use zeph_llm::any::AnyProvider;
use zeph_llm::provider::LlmProvider as _;
use crate::admission::AdmissionControl;
use crate::embedding_store::EmbeddingStore;
use crate::error::MemoryError;
use crate::store::SqliteStore;
use crate::token_counter::TokenCounter;
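// Vector-store collection names for derived memory artifacts.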
pub(crate) const SESSION_SUMMARIES_COLLECTION: &str = "zeph_session_summaries";
pub(crate) const KEY_FACTS_COLLECTION: &str = "zeph_key_facts";
pub(crate) const CORRECTIONS_COLLECTION: &str = "zeph_corrections";
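/// Progress of a backfill run: `done` items processed out of `total`.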
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackfillProgress {
pub done: usize,
pub total: usize,
}
pub use algorithms::{apply_mmr, apply_temporal_decay};
pub use cross_session::SessionSummaryResult;
pub use graph::{
ExtractionResult, ExtractionStats, GraphExtractionConfig, LinkingStats, NoteLinkingConfig,
PostExtractValidator, extract_and_store, link_memory_notes,
};
pub use persona::{
PersonaExtractionConfig, contains_self_referential_language, extract_persona_facts,
};
pub use recall::{EmbedContext, RecalledMessage};
pub use summarization::{StructuredSummary, Summary, build_summarization_prompt};
pub use trajectory::{TrajectoryEntry, TrajectoryExtractionConfig, extract_trajectory_entries};
pub use tree_consolidation::{
TreeConsolidationConfig, TreeConsolidationResult, run_tree_consolidation_sweep,
start_tree_consolidation_loop,
};
pub use write_buffer::{BufferedWrite, WriteBuffer};
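/// Persona-profile centroid embedding plus the instant it was computed, used for TTL-based cache checks.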
#[derive(Debug, Clone)]
pub(crate) struct CachedCentroid {
pub vector: Vec<f32>,
pub computed_at: Instant,
}
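/// Toggle for age-based decay of recall scores (disabled by default).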
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TemporalDecay {
Enabled,
#[default]
Disabled,
}
impl TemporalDecay {
#[must_use]
#[inline]
pub fn is_enabled(self) -> bool {
self == Self::Enabled
}
}
impl From<bool> for TemporalDecay {
fn from(b: bool) -> Self {
if b { Self::Enabled } else { Self::Disabled }
}
}
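/// Toggle for MMR (maximal marginal relevance) re-ranking of recall results (disabled by default).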
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MmrReranking {
Enabled,
#[default]
Disabled,
}
impl MmrReranking {
#[must_use]
#[inline]
pub fn is_enabled(self) -> bool {
self == Self::Enabled
}
}
impl From<bool> for MmrReranking {
fn from(b: bool) -> Self {
if b { Self::Enabled } else { Self::Disabled }
}
}
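/// Toggle for importance-weighted recall scoring (disabled by default).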
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ImportanceScoring {
Enabled,
#[default]
Disabled,
}
impl ImportanceScoring {
#[must_use]
#[inline]
pub fn is_enabled(self) -> bool {
self == Self::Enabled
}
}
impl From<bool> for ImportanceScoring {
fn from(b: bool) -> Self {
if b { Self::Enabled } else { Self::Disabled }
}
}
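/// Toggle for biasing first-person queries toward the persona profile centroid (enabled by default).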
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum QueryBiasCorrection {
#[default]
Enabled,
Disabled,
}
impl QueryBiasCorrection {
#[must_use]
#[inline]
pub fn is_enabled(self) -> bool {
self == Self::Enabled
}
}
impl From<bool> for QueryBiasCorrection {
fn from(b: bool) -> Self {
if b { Self::Enabled } else { Self::Disabled }
}
}
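/// Toggle for Hebbian reinforcement of recalled memories (disabled by default).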
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HebbianReinforcement {
Enabled,
#[default]
Disabled,
}
impl HebbianReinforcement {
#[must_use]
#[inline]
pub fn is_enabled(self) -> bool {
self == Self::Enabled
}
}
impl From<bool> for HebbianReinforcement {
fn from(b: bool) -> Self {
if b { Self::Enabled } else { Self::Disabled }
}
}
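/// Coarse query classification; only first-person queries receive profile bias.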
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum QueryIntent {
FirstPerson,
Other,
}
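/// Runtime limits for Hebbian spreading activation over the memory graph: traversal depth,
/// visited-node cap, eligible edge types, and an optional per-step time budget.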
#[derive(Debug, Clone, Default)]
pub struct HelaSpreadRuntime {
pub enabled: bool,
pub depth: u32,
pub max_visited: usize,
pub edge_types: Vec<crate::graph::EdgeType>,
pub step_budget: Option<std::time::Duration>,
}
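/// Hybrid semantic memory: messages and summaries live in SQLite, embeddings in an optional
/// vector store (Qdrant or SQLite-backed), with optional graph, experience, and reasoning
/// stores layered on top.
///
/// A minimal construction sketch (the path, URL, and model name are placeholder values, and
/// `provider` is assumed to be an already-built [`AnyProvider`]):
///
/// ```ignore
/// let memory = SemanticMemory::new("zeph.db", "http://localhost:6334", None, provider, "embed-model")
///     .await?
///     .with_ranking_options(TemporalDecay::Enabled, 30, MmrReranking::Enabled, 0.7)
///     .with_importance_options(ImportanceScoring::Enabled, 0.15);
/// ```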
pub struct SemanticMemory {
pub(crate) sqlite: SqliteStore,
pub(crate) qdrant: Option<Arc<EmbeddingStore>>,
pub(crate) provider: AnyProvider,
pub(crate) embed_provider: Option<AnyProvider>,
pub(crate) embedding_model: String,
pub(crate) vector_weight: f64,
pub(crate) keyword_weight: f64,
pub(crate) temporal_decay: TemporalDecay,
pub(crate) temporal_decay_half_life_days: u32,
pub(crate) mmr_reranking: MmrReranking,
pub(crate) mmr_lambda: f32,
pub(crate) importance_scoring: ImportanceScoring,
pub(crate) importance_weight: f64,
pub(crate) tier_boost_semantic: f64,
pub token_counter: Arc<TokenCounter>,
pub graph_store: Option<Arc<crate::graph::GraphStore>>,
pub experience: Option<Arc<crate::graph::experience::ExperienceStore>>,
pub reasoning: Option<Arc<crate::reasoning::ReasoningMemory>>,
pub(crate) community_detection_failures: Arc<AtomicU64>,
pub(crate) graph_extraction_count: Arc<AtomicU64>,
pub(crate) graph_extraction_failures: Arc<AtomicU64>,
pub(crate) last_qdrant_warn: Arc<AtomicU64>,
pub(crate) admission_control: Option<Arc<AdmissionControl>>,
pub(crate) quality_gate: Option<Arc<crate::quality_gate::QualityGate>>,
pub(crate) key_facts_dedup_threshold: f32,
pub(crate) embed_tasks: Mutex<tokio::task::JoinSet<()>>,
pub(crate) retrieval_depth: u32,
pub(crate) search_prompt_template: String,
pub(crate) depth_below_limit_warned: Arc<std::sync::atomic::AtomicBool>,
pub(crate) missing_placeholder_warned: Arc<std::sync::atomic::AtomicBool>,
pub(crate) query_bias_correction: QueryBiasCorrection,
pub(crate) query_bias_profile_weight: f32,
pub(crate) profile_centroid: RwLock<Option<CachedCentroid>>,
pub(crate) profile_centroid_ttl_secs: u64,
pub(crate) hebbian_reinforcement: HebbianReinforcement,
pub(crate) hebbian_lr: f32,
pub(crate) hebbian_spread: HelaSpreadRuntime,
}
impl SemanticMemory {
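/// Opens SQLite at `sqlite_path` and connects to Qdrant at `qdrant_url`, using the default
/// hybrid-search weights (vector 0.7, keyword 0.3).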
pub async fn new(
sqlite_path: &str,
qdrant_url: &str,
api_key: Option<&str>,
provider: AnyProvider,
embedding_model: &str,
) -> Result<Self, MemoryError> {
Self::with_weights(
sqlite_path,
qdrant_url,
api_key,
provider,
embedding_model,
0.7,
0.3,
)
.await
}
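/// Like [`Self::new`] with explicit hybrid-search weights; uses the default SQLite pool size of 5.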
pub async fn with_weights(
sqlite_path: &str,
qdrant_url: &str,
api_key: Option<&str>,
provider: AnyProvider,
embedding_model: &str,
vector_weight: f64,
keyword_weight: f64,
) -> Result<Self, MemoryError> {
Self::with_weights_and_pool_size(
sqlite_path,
qdrant_url,
api_key,
provider,
embedding_model,
vector_weight,
keyword_weight,
5,
)
.await
}
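/// Full constructor. If the Qdrant store cannot be created, semantic search is disabled
/// (no vector store) and a warning is logged instead of failing.
///
/// # Errors
/// Returns an error if the SQLite store cannot be opened.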
#[allow(clippy::too_many_arguments)]
pub async fn with_weights_and_pool_size(
sqlite_path: &str,
qdrant_url: &str,
api_key: Option<&str>,
provider: AnyProvider,
embedding_model: &str,
vector_weight: f64,
keyword_weight: f64,
pool_size: u32,
) -> Result<Self, MemoryError> {
let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
let pool = sqlite.pool().clone();
let qdrant = match EmbeddingStore::new(qdrant_url, api_key, pool) {
Ok(store) => Some(Arc::new(store)),
Err(e) => {
tracing::warn!("Qdrant unavailable, semantic search disabled: {e:#}");
None
}
};
Ok(Self {
sqlite,
qdrant,
provider,
embed_provider: None,
embedding_model: embedding_model.into(),
vector_weight,
keyword_weight,
temporal_decay: TemporalDecay::Disabled,
temporal_decay_half_life_days: 30,
mmr_reranking: MmrReranking::Disabled,
mmr_lambda: 0.7,
importance_scoring: ImportanceScoring::Disabled,
importance_weight: 0.15,
tier_boost_semantic: 1.3,
token_counter: Arc::new(TokenCounter::new()),
graph_store: None,
experience: None,
reasoning: None,
community_detection_failures: Arc::new(AtomicU64::new(0)),
graph_extraction_count: Arc::new(AtomicU64::new(0)),
graph_extraction_failures: Arc::new(AtomicU64::new(0)),
last_qdrant_warn: Arc::new(AtomicU64::new(0)),
admission_control: None,
quality_gate: None,
key_facts_dedup_threshold: 0.95,
embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
retrieval_depth: 0,
search_prompt_template: String::new(),
depth_below_limit_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
missing_placeholder_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
query_bias_correction: QueryBiasCorrection::Enabled,
query_bias_profile_weight: 0.25,
profile_centroid: RwLock::new(None),
profile_centroid_ttl_secs: 300,
hebbian_reinforcement: HebbianReinforcement::Disabled,
hebbian_lr: 0.1,
hebbian_spread: HelaSpreadRuntime::default(),
})
}
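/// Builds a memory on top of a caller-supplied [`crate::QdrantOps`] implementation instead of
/// connecting to Qdrant by URL.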
pub async fn with_qdrant_ops(
sqlite_path: &str,
ops: crate::QdrantOps,
provider: AnyProvider,
embedding_model: &str,
vector_weight: f64,
keyword_weight: f64,
pool_size: u32,
) -> Result<Self, MemoryError> {
let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
let pool = sqlite.pool().clone();
let store = EmbeddingStore::with_store(Box::new(ops), pool);
Ok(Self {
sqlite,
qdrant: Some(Arc::new(store)),
provider,
embed_provider: None,
embedding_model: embedding_model.into(),
vector_weight,
keyword_weight,
temporal_decay: TemporalDecay::Disabled,
temporal_decay_half_life_days: 30,
mmr_reranking: MmrReranking::Disabled,
mmr_lambda: 0.7,
importance_scoring: ImportanceScoring::Disabled,
importance_weight: 0.15,
tier_boost_semantic: 1.3,
token_counter: Arc::new(TokenCounter::new()),
graph_store: None,
experience: None,
reasoning: None,
community_detection_failures: Arc::new(AtomicU64::new(0)),
graph_extraction_count: Arc::new(AtomicU64::new(0)),
graph_extraction_failures: Arc::new(AtomicU64::new(0)),
last_qdrant_warn: Arc::new(AtomicU64::new(0)),
admission_control: None,
quality_gate: None,
key_facts_dedup_threshold: 0.95,
embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
retrieval_depth: 0,
search_prompt_template: String::new(),
depth_below_limit_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
missing_placeholder_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
query_bias_correction: QueryBiasCorrection::Enabled,
query_bias_profile_weight: 0.25,
profile_centroid: RwLock::new(None),
profile_centroid_ttl_secs: 300,
hebbian_reinforcement: HebbianReinforcement::Disabled,
hebbian_lr: 0.1,
hebbian_spread: HelaSpreadRuntime::default(),
})
}
#[must_use]
pub fn with_graph_store(mut self, store: Arc<crate::graph::GraphStore>) -> Self {
self.graph_store = Some(store);
self
}
#[must_use]
pub fn with_experience_store(
mut self,
store: Arc<crate::graph::experience::ExperienceStore>,
) -> Self {
self.experience = Some(store);
self
}
#[must_use]
pub fn with_reasoning(mut self, store: Arc<crate::reasoning::ReasoningMemory>) -> Self {
self.reasoning = Some(store);
self
}
#[must_use]
pub fn community_detection_failures(&self) -> u64 {
use std::sync::atomic::Ordering;
self.community_detection_failures.load(Ordering::Relaxed)
}
#[must_use]
pub fn graph_extraction_count(&self) -> u64 {
use std::sync::atomic::Ordering;
self.graph_extraction_count.load(Ordering::Relaxed)
}
#[must_use]
pub fn graph_extraction_failures(&self) -> u64 {
use std::sync::atomic::Ordering;
self.graph_extraction_failures.load(Ordering::Relaxed)
}
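/// Configures temporal decay (with its half-life in days) and MMR re-ranking (with its lambda) for recall.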
#[must_use]
pub fn with_ranking_options(
mut self,
temporal_decay: TemporalDecay,
temporal_decay_half_life_days: u32,
mmr_reranking: MmrReranking,
mmr_lambda: f32,
) -> Self {
self.temporal_decay = temporal_decay;
self.temporal_decay_half_life_days = temporal_decay_half_life_days;
self.mmr_reranking = mmr_reranking;
self.mmr_lambda = mmr_lambda;
self
}
#[must_use]
pub fn with_importance_options(mut self, scoring: ImportanceScoring, weight: f64) -> Self {
self.importance_scoring = scoring;
self.importance_weight = weight;
self
}
#[must_use]
pub fn with_tier_boost(mut self, boost: f64) -> Self {
self.tier_boost_semantic = boost;
self
}
#[must_use]
pub fn with_admission_control(mut self, control: AdmissionControl) -> Self {
self.admission_control = Some(Arc::new(control));
self
}
#[must_use]
pub fn with_quality_gate(mut self, gate: Arc<crate::quality_gate::QualityGate>) -> Self {
self.quality_gate = Some(gate);
self
}
#[must_use]
pub fn with_key_facts_dedup_threshold(mut self, threshold: f32) -> Self {
self.key_facts_dedup_threshold = threshold;
self
}
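/// Configures first-person query bias: the toggle, the profile-centroid weight (clamped to
/// `0.0..=1.0`), and the centroid cache TTL in seconds.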
#[must_use]
pub fn with_query_bias(
mut self,
correction: QueryBiasCorrection,
profile_weight: f32,
centroid_ttl_secs: u64,
) -> Self {
self.query_bias_correction = correction;
self.query_bias_profile_weight = profile_weight.clamp(0.0, 1.0);
self.profile_centroid_ttl_secs = centroid_ttl_secs;
self
}
#[must_use]
pub fn with_hebbian_spread(mut self, runtime: HelaSpreadRuntime) -> Self {
self.hebbian_spread = runtime;
self
}
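/// Configures Hebbian reinforcement. The learning rate is clamped to be non-negative; enabling
/// reinforcement with a rate of 0.0 logs a warning.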
#[must_use]
pub fn with_hebbian(mut self, reinforcement: HebbianReinforcement, lr: f32) -> Self {
let lr = lr.max(0.0);
if reinforcement.is_enabled() && lr == 0.0 {
tracing::warn!("hebbian enabled with lr=0.0 — no reinforcement will occur");
}
self.hebbian_reinforcement = reinforcement;
self.hebbian_lr = lr;
self
}
pub(crate) fn classify_query_intent(query: &str) -> QueryIntent {
if persona::contains_self_referential_language(query) {
QueryIntent::FirstPerson
} else {
QueryIntent::Other
}
}
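/// Blends a first-person query embedding toward the cached profile centroid: each component
/// becomes `(1 - w) * q + w * c`. The embedding is returned unchanged when bias is disabled,
/// the query is not first-person, no centroid is available, or the dimensions disagree.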
#[tracing::instrument(name = "memory.query_bias.apply", skip(self, embedding), fields(query_len = query.len()))]
pub(crate) async fn apply_query_bias(&self, query: &str, embedding: Vec<f32>) -> Vec<f32> {
if !self.query_bias_correction.is_enabled() {
tracing::debug!(reason = "disabled", "query-bias: skipping");
return embedding;
}
if Self::classify_query_intent(query) != QueryIntent::FirstPerson {
tracing::debug!(reason = "not_first_person", "query-bias: skipping");
return embedding;
}
let Some(centroid) = self.profile_centroid_cached().await else {
tracing::debug!(reason = "no_centroid", "query-bias: skipping");
return embedding;
};
if centroid.len() != embedding.len() {
tracing::warn!(
centroid_dim = centroid.len(),
query_dim = embedding.len(),
reason = "dim_mismatch",
"query-bias: dimension mismatch between profile centroid and query embedding — skipping bias"
);
return embedding;
}
let w = self.query_bias_profile_weight;
tracing::debug!(
intent = "first_person",
centroid_dim = centroid.len(),
weight = w,
"query-bias: applying profile bias"
);
embedding
.iter()
.zip(centroid.iter())
.map(|(&q, &c)| (1.0 - w) * q + w * c)
.collect()
}
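/// Returns the persona-profile centroid, recomputing it when the cached value is older than
/// the TTL. If recomputation fails, any stale cached centroid is returned instead.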
#[tracing::instrument(name = "memory.query_bias.centroid", skip(self))]
pub(crate) async fn profile_centroid_cached(&self) -> Option<Vec<f32>> {
{
let guard = self.profile_centroid.read().await;
if let Some(c) = &*guard
&& c.computed_at.elapsed().as_secs() < self.profile_centroid_ttl_secs
{
let ttl_remaining = self
.profile_centroid_ttl_secs
.saturating_sub(c.computed_at.elapsed().as_secs());
tracing::debug!(
centroid_dim = c.vector.len(),
ttl_remaining_secs = ttl_remaining,
"query-bias: centroid cache hit"
);
return Some(c.vector.clone());
}
}
let computed = self.compute_profile_centroid().await;
let mut guard = self.profile_centroid.write().await;
match computed {
Some(v) => {
tracing::debug!(centroid_dim = v.len(), "query-bias: centroid computed");
*guard = Some(CachedCentroid {
vector: v.clone(),
computed_at: Instant::now(),
});
Some(v)
}
None => {
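// Recomputation failed: fall back to a stale cached centroid if one exists.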
guard.as_ref().map(|c| c.vector.clone())
}
}
}
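/// Embeds every persona fact and averages the embeddings into a centroid. Returns `None` when
/// facts cannot be loaded, none exist, or no embedding succeeds; facts whose embedding
/// dimension disagrees with the first are skipped.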
async fn compute_profile_centroid(&self) -> Option<Vec<f32>> {
let facts = match self.sqlite.load_persona_facts(0.0).await {
Ok(f) => f,
Err(e) => {
tracing::warn!(error = %e, "query-bias: failed to load persona facts");
return None;
}
};
if facts.is_empty() {
return None;
}
let provider = self.effective_embed_provider();
let texts: Vec<String> = facts.iter().map(|f| f.content.clone()).collect();
let mut embeddings: Vec<Vec<f32>> = Vec::with_capacity(texts.len());
for text in &texts {
match provider.embed(text).await {
Ok(v) => embeddings.push(v),
Err(e) => {
tracing::warn!(error = %e, "query-bias: failed to embed persona fact — skipping");
}
}
}
if embeddings.is_empty() {
return None;
}
let dim = embeddings[0].len();
let mut centroid = vec![0.0f32; dim];
for emb in &embeddings {
if emb.len() != dim {
tracing::warn!(
expected = dim,
got = emb.len(),
"query-bias: persona embedding dimension mismatch — skipping fact"
);
continue;
}
for (c, &v) in centroid.iter_mut().zip(emb.iter()) {
*c += v;
}
}
#[allow(clippy::cast_precision_loss)]
let n = embeddings.len() as f32;
for c in &mut centroid {
*c /= n;
}
Some(centroid)
}
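/// Sets the ANN retrieval depth (0 means the legacy default of `limit * 2`) and the search
/// prompt template, which should contain a `{query}` placeholder.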
#[must_use]
pub fn with_retrieval_options(
mut self,
depth: u32,
search_prompt_template: impl Into<String>,
) -> Self {
self.retrieval_depth = depth;
self.search_prompt_template = search_prompt_template.into();
self
}
pub(crate) fn effective_depth(&self, limit: usize) -> usize {
use std::sync::atomic::Ordering;
let depth = self.retrieval_depth as usize;
if depth == 0 {
return limit.saturating_mul(2);
}
if depth < limit {
if !self.depth_below_limit_warned.swap(true, Ordering::Relaxed) {
tracing::warn!(
retrieval_depth = depth,
recall_limit = limit,
"memory.retrieval.depth < recall_limit; ANN pool cannot saturate top-k — consider raising depth"
);
}
} else if depth < limit.saturating_mul(2) {
tracing::info!(
retrieval_depth = depth,
recall_limit = limit,
legacy_default = limit.saturating_mul(2),
"memory.retrieval.depth is below legacy limit*2; ANN pool will be smaller than pre-#3340"
);
} else {
tracing::debug!(
retrieval_depth = depth,
recall_limit = limit,
"recall: using configured ANN depth"
);
}
depth
}
pub(crate) fn apply_search_prompt(&self, query: &str) -> String {
use std::sync::atomic::Ordering;
let template = &self.search_prompt_template;
if template.is_empty() {
return query.to_owned();
}
if !template.contains("{query}") {
if !self
.missing_placeholder_warned
.swap(true, Ordering::Relaxed)
{
tracing::warn!(
template = template.as_str(),
"memory.retrieval.search_prompt_template has no {{query}} placeholder — \
using raw query as-is"
);
}
return query.to_owned();
}
template.replace("{query}", query)
}
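/// Sets a dedicated provider for embeddings, separate from the main LLM provider.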
#[must_use]
pub fn with_embed_provider(mut self, embed_provider: AnyProvider) -> Self {
self.embed_provider = Some(embed_provider);
self
}
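/// The provider used for embeddings: the dedicated embed provider when set, otherwise the main provider.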
pub fn effective_embed_provider(&self) -> &AnyProvider {
self.embed_provider.as_ref().unwrap_or(&self.provider)
}
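/// Assembles a memory from already-constructed stores and a shared token counter, with all
/// optional features at their defaults.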
#[must_use]
pub fn from_parts(
sqlite: SqliteStore,
qdrant: Option<Arc<EmbeddingStore>>,
provider: AnyProvider,
embedding_model: impl Into<String>,
vector_weight: f64,
keyword_weight: f64,
token_counter: Arc<TokenCounter>,
) -> Self {
Self {
sqlite,
qdrant,
provider,
embed_provider: None,
embedding_model: embedding_model.into(),
vector_weight,
keyword_weight,
temporal_decay: TemporalDecay::Disabled,
temporal_decay_half_life_days: 30,
mmr_reranking: MmrReranking::Disabled,
mmr_lambda: 0.7,
importance_scoring: ImportanceScoring::Disabled,
importance_weight: 0.15,
tier_boost_semantic: 1.3,
token_counter,
graph_store: None,
experience: None,
reasoning: None,
community_detection_failures: Arc::new(AtomicU64::new(0)),
graph_extraction_count: Arc::new(AtomicU64::new(0)),
graph_extraction_failures: Arc::new(AtomicU64::new(0)),
last_qdrant_warn: Arc::new(AtomicU64::new(0)),
admission_control: None,
quality_gate: None,
key_facts_dedup_threshold: 0.95,
embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
retrieval_depth: 0,
search_prompt_template: String::new(),
depth_below_limit_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
missing_placeholder_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
query_bias_correction: QueryBiasCorrection::Enabled,
query_bias_profile_weight: 0.25,
profile_centroid: RwLock::new(None),
profile_centroid_ttl_secs: 300,
hebbian_reinforcement: HebbianReinforcement::Disabled,
hebbian_lr: 0.1,
hebbian_spread: HelaSpreadRuntime::default(),
}
}
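/// Opens a memory that keeps embeddings in SQLite itself rather than an external vector store,
/// using the default pool size of 5.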
pub async fn with_sqlite_backend(
sqlite_path: &str,
provider: AnyProvider,
embedding_model: &str,
vector_weight: f64,
keyword_weight: f64,
) -> Result<Self, MemoryError> {
Self::with_sqlite_backend_and_pool_size(
sqlite_path,
provider,
embedding_model,
vector_weight,
keyword_weight,
5,
)
.await
}
pub async fn with_sqlite_backend_and_pool_size(
sqlite_path: &str,
provider: AnyProvider,
embedding_model: &str,
vector_weight: f64,
keyword_weight: f64,
pool_size: u32,
) -> Result<Self, MemoryError> {
let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
let pool = sqlite.pool().clone();
let store = EmbeddingStore::new_sqlite(pool);
Ok(Self {
sqlite,
qdrant: Some(Arc::new(store)),
provider,
embed_provider: None,
embedding_model: embedding_model.into(),
vector_weight,
keyword_weight,
temporal_decay: TemporalDecay::Disabled,
temporal_decay_half_life_days: 30,
mmr_reranking: MmrReranking::Disabled,
mmr_lambda: 0.7,
importance_scoring: ImportanceScoring::Disabled,
importance_weight: 0.15,
tier_boost_semantic: 1.3,
token_counter: Arc::new(TokenCounter::new()),
graph_store: None,
experience: None,
reasoning: None,
community_detection_failures: Arc::new(AtomicU64::new(0)),
graph_extraction_count: Arc::new(AtomicU64::new(0)),
graph_extraction_failures: Arc::new(AtomicU64::new(0)),
last_qdrant_warn: Arc::new(AtomicU64::new(0)),
admission_control: None,
quality_gate: None,
key_facts_dedup_threshold: 0.95,
embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
retrieval_depth: 0,
search_prompt_template: String::new(),
depth_below_limit_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
missing_placeholder_warned: Arc::new(std::sync::atomic::AtomicBool::new(false)),
query_bias_correction: QueryBiasCorrection::Enabled,
query_bias_profile_weight: 0.25,
profile_centroid: RwLock::new(None),
profile_centroid_ttl_secs: 300,
hebbian_reinforcement: HebbianReinforcement::Disabled,
hebbian_lr: 0.1,
hebbian_spread: HelaSpreadRuntime::default(),
})
}
#[must_use]
pub fn sqlite(&self) -> &SqliteStore {
&self.sqlite
}
pub async fn is_vector_store_connected(&self) -> bool {
match self.qdrant.as_ref() {
Some(store) => store.health_check().await,
None => false,
}
}
#[must_use]
pub fn has_vector_store(&self) -> bool {
self.qdrant.is_some()
}
#[must_use]
pub fn embedding_store(&self) -> Option<&Arc<EmbeddingStore>> {
self.qdrant.as_ref()
}
pub fn provider(&self) -> &AnyProvider {
&self.provider
}
pub async fn message_count(
&self,
conversation_id: crate::types::ConversationId,
) -> Result<i64, MemoryError> {
self.sqlite.count_messages(conversation_id).await
}
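/// Counts messages in the conversation that are newer than the last message covered by its
/// most recent summary.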
pub async fn unsummarized_message_count(
&self,
conversation_id: crate::types::ConversationId,
) -> Result<i64, MemoryError> {
let after_id = self
.sqlite
.latest_summary_last_message_id(conversation_id)
.await?
.unwrap_or(crate::types::MessageId(0));
self.sqlite
.count_messages_after(conversation_id, after_id)
.await
}
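/// Loads the most recent non-deleted messages (up to `max_items`) along with their stored
/// embeddings, dropping any vector whose dimension disagrees with the first one returned.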
pub async fn load_promotion_window(
&self,
max_items: usize,
) -> Result<Vec<crate::compression::promotion::PromotionInput>, MemoryError> {
use zeph_db::sql;
let limit = i64::try_from(max_items).unwrap_or(i64::MAX);
let rows: Vec<(
crate::types::MessageId,
crate::types::ConversationId,
String,
)> = zeph_db::query_as(sql!(
"SELECT id, conversation_id, content \
FROM messages \
WHERE deleted_at IS NULL \
ORDER BY id DESC \
LIMIT ?"
))
.bind(limit)
.fetch_all(self.sqlite.pool())
.await?;
let mut vectors = if let Some(qdrant) = &self.qdrant {
let ids: Vec<_> = rows.iter().map(|(id, _, _)| *id).collect();
let mut raw = qdrant.get_vectors_for_messages(&ids).await?;
let ref_dim = raw.values().next().map(Vec::len);
if let Some(ref_dim) = ref_dim {
let mismatched: Vec<_> = raw
.iter()
.filter(|(_, v)| v.len() != ref_dim)
.map(|(id, v)| (*id, v.len()))
.collect();
if !mismatched.is_empty() {
tracing::warn!(
expected_dim = ref_dim,
dropped_count = mismatched.len(),
"load_promotion_window: dimension mismatch — dropping mismatched vectors"
);
for (id, _) in mismatched {
raw.remove(&id);
}
}
}
raw
} else {
std::collections::HashMap::new()
};
Ok(rows
.into_iter()
.map(|(message_id, conversation_id, content)| {
crate::compression::promotion::PromotionInput {
message_id,
conversation_id,
content,
embedding: vectors.remove(&message_id),
}
})
.collect())
}
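/// Embeds the query and retrieves up to `limit` matching reasoning strategies. Returns an empty
/// list when no reasoning memory is attached or the embed provider does not support embeddings.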
pub async fn retrieve_reasoning_strategies(
&self,
query: &str,
limit: usize,
) -> Result<Vec<crate::reasoning::ReasoningStrategy>, MemoryError> {
let Some(reasoning) = &self.reasoning else {
return Ok(Vec::new());
};
if !self.effective_embed_provider().supports_embeddings() {
return Ok(Vec::new());
}
let embedding = self.effective_embed_provider().embed(query).await?;
reasoning
.retrieve_by_embedding(&embedding, limit as u64)
.await
}
}