Skip to main content

zeph_memory/semantic/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! High-level semantic memory orchestrator.
5//!
6//! [`SemanticMemory`] is the primary entry point used by `zeph-core`.  It wires
7//! together [`crate::store::SqliteStore`] (relational persistence) and
8//! [`crate::embedding_store::EmbeddingStore`] (Qdrant vector index) into a single
9//! object with `remember` / `recall` / `summarize` operations.
10//!
11//! # Construction
12//!
13//! Use [`SemanticMemory::new`] for the default 0.7/0.3 vector/keyword weights, or
14//! [`SemanticMemory::with_qdrant_ops`] inside `AppBuilder` to share a single gRPC
15//! channel across all subsystems.
16//!
17//! # Hybrid recall
18//!
19//! Recall uses reciprocal-rank fusion of BM25 (`SQLite` FTS5) and cosine-similarity
20//! (Qdrant) results, with optional temporal decay, MMR diversity reranking, and
21//! per-tier score boosts.
22
23mod algorithms;
24mod corrections;
25mod cross_session;
26mod graph;
27pub(crate) mod importance;
28pub mod persona;
29mod recall;
30mod summarization;
31pub mod trajectory;
32pub mod tree_consolidation;
33pub(crate) mod write_buffer;
34
35#[cfg(test)]
36mod tests;
37
38use std::sync::Arc;
39use std::sync::Mutex;
40use std::sync::atomic::AtomicU64;
41
42use zeph_llm::any::AnyProvider;
43
44use crate::admission::AdmissionControl;
45use crate::embedding_store::EmbeddingStore;
46use crate::error::MemoryError;
47use crate::store::SqliteStore;
48use crate::token_counter::TokenCounter;
49
50pub(crate) const SESSION_SUMMARIES_COLLECTION: &str = "zeph_session_summaries";
51pub(crate) const KEY_FACTS_COLLECTION: &str = "zeph_key_facts";
52pub(crate) const CORRECTIONS_COLLECTION: &str = "zeph_corrections";
53
54/// Progress state for embed backfill.
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub struct BackfillProgress {
57    /// Number of messages processed so far (including failures).
58    pub done: usize,
59    /// Total number of unembedded messages at backfill start.
60    pub total: usize,
61}
62
63pub use algorithms::{apply_mmr, apply_temporal_decay};
64pub use cross_session::SessionSummaryResult;
65pub use graph::{
66    ExtractionResult, ExtractionStats, GraphExtractionConfig, LinkingStats, NoteLinkingConfig,
67    PostExtractValidator, extract_and_store, link_memory_notes,
68};
69pub use persona::{
70    PersonaExtractionConfig, contains_self_referential_language, extract_persona_facts,
71};
72pub use recall::{EmbedContext, RecalledMessage};
73pub use summarization::{StructuredSummary, Summary, build_summarization_prompt};
74pub use trajectory::{TrajectoryEntry, TrajectoryExtractionConfig, extract_trajectory_entries};
75pub use tree_consolidation::{
76    TreeConsolidationConfig, TreeConsolidationResult, run_tree_consolidation_sweep,
77    start_tree_consolidation_loop,
78};
79pub use write_buffer::{BufferedWrite, WriteBuffer};
80
81/// High-level semantic memory orchestrator combining `SQLite` and Qdrant.
82///
83/// Instantiate via [`SemanticMemory::new`] or the `AppBuilder` integration.
84/// All fields are `pub(crate)` — callers interact through the inherent method API.
85pub struct SemanticMemory {
86    pub(crate) sqlite: SqliteStore,
87    pub(crate) qdrant: Option<Arc<EmbeddingStore>>,
88    pub(crate) provider: AnyProvider,
89    /// Dedicated provider for batch embedding calls (backfill, write-path embedding).
90    ///
91    /// When `Some`, all embedding I/O is routed through this provider instead of `provider`.
92    /// This prevents `embed_backfill` from saturating the main provider and causing guardrail
93    /// timeouts. When `None`, falls back to `provider`.
94    pub(crate) embed_provider: Option<AnyProvider>,
95    pub(crate) embedding_model: String,
96    pub(crate) vector_weight: f64,
97    pub(crate) keyword_weight: f64,
98    pub(crate) temporal_decay_enabled: bool,
99    pub(crate) temporal_decay_half_life_days: u32,
100    pub(crate) mmr_enabled: bool,
101    pub(crate) mmr_lambda: f32,
102    pub(crate) importance_enabled: bool,
103    pub(crate) importance_weight: f64,
104    /// Multiplicative score boost for semantic-tier messages in recall ranking.
105    /// Default: `1.3`. Disabled when set to `1.0`.
106    pub(crate) tier_boost_semantic: f64,
107    pub token_counter: Arc<TokenCounter>,
108    pub graph_store: Option<Arc<crate::graph::GraphStore>>,
109    pub(crate) community_detection_failures: Arc<AtomicU64>,
110    pub(crate) graph_extraction_count: Arc<AtomicU64>,
111    pub(crate) graph_extraction_failures: Arc<AtomicU64>,
112    /// A-MAC admission control gate. When `Some`, each `remember()` call is evaluated.
113    pub(crate) admission_control: Option<Arc<AdmissionControl>>,
114    /// Cosine similarity threshold for skipping near-duplicate key facts (0.0–1.0).
115    /// When a new fact's nearest neighbour in `zeph_key_facts` has score >= this value,
116    /// the fact is considered a duplicate and not inserted.  Default: `0.95`.
117    pub(crate) key_facts_dedup_threshold: f32,
118    /// Bounded set of in-flight background embed tasks.
119    ///
120    /// Guarded by a `Mutex` because `SemanticMemory` is shared via `Arc` and
121    /// `JoinSet` requires `&mut self` for `spawn`. Capacity is capped at
122    /// `MAX_EMBED_BG_TASKS`; tasks that exceed the limit are dropped with a debug log.
123    pub(crate) embed_tasks: Mutex<tokio::task::JoinSet<()>>,
124}
125
126impl SemanticMemory {
127    /// Create a new `SemanticMemory` instance with default hybrid search weights (0.7/0.3).
128    ///
129    /// Qdrant connection is best-effort: if unavailable, semantic search is disabled.
130    ///
131    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
132    /// a single gRPC channel across all subsystems.
133    ///
134    /// # Errors
135    ///
136    /// Returns an error if `SQLite` cannot be initialized.
137    pub async fn new(
138        sqlite_path: &str,
139        qdrant_url: &str,
140        provider: AnyProvider,
141        embedding_model: &str,
142    ) -> Result<Self, MemoryError> {
143        Self::with_weights(sqlite_path, qdrant_url, provider, embedding_model, 0.7, 0.3).await
144    }
145
146    /// Create a new `SemanticMemory` with custom vector/keyword weights for hybrid search.
147    ///
148    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
149    /// a single gRPC channel across all subsystems.
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if `SQLite` cannot be initialized.
154    pub async fn with_weights(
155        sqlite_path: &str,
156        qdrant_url: &str,
157        provider: AnyProvider,
158        embedding_model: &str,
159        vector_weight: f64,
160        keyword_weight: f64,
161    ) -> Result<Self, MemoryError> {
162        Self::with_weights_and_pool_size(
163            sqlite_path,
164            qdrant_url,
165            provider,
166            embedding_model,
167            vector_weight,
168            keyword_weight,
169            5,
170        )
171        .await
172    }
173
174    /// Create a new `SemanticMemory` with custom weights and configurable pool size.
175    ///
176    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
177    /// a single gRPC channel across all subsystems.
178    ///
179    /// # Errors
180    ///
181    /// Returns an error if `SQLite` cannot be initialized.
182    pub async fn with_weights_and_pool_size(
183        sqlite_path: &str,
184        qdrant_url: &str,
185        provider: AnyProvider,
186        embedding_model: &str,
187        vector_weight: f64,
188        keyword_weight: f64,
189        pool_size: u32,
190    ) -> Result<Self, MemoryError> {
191        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
192        let pool = sqlite.pool().clone();
193
194        let qdrant = match EmbeddingStore::new(qdrant_url, pool) {
195            Ok(store) => Some(Arc::new(store)),
196            Err(e) => {
197                tracing::warn!("Qdrant unavailable, semantic search disabled: {e:#}");
198                None
199            }
200        };
201
202        Ok(Self {
203            sqlite,
204            qdrant,
205            provider,
206            embed_provider: None,
207            embedding_model: embedding_model.into(),
208            vector_weight,
209            keyword_weight,
210            temporal_decay_enabled: false,
211            temporal_decay_half_life_days: 30,
212            mmr_enabled: false,
213            mmr_lambda: 0.7,
214            importance_enabled: false,
215            importance_weight: 0.15,
216            tier_boost_semantic: 1.3,
217            token_counter: Arc::new(TokenCounter::new()),
218            graph_store: None,
219            community_detection_failures: Arc::new(AtomicU64::new(0)),
220            graph_extraction_count: Arc::new(AtomicU64::new(0)),
221            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
222            admission_control: None,
223            key_facts_dedup_threshold: 0.95,
224            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
225        })
226    }
227
228    /// Create a `SemanticMemory` from a pre-built `QdrantOps` instance.
229    ///
230    /// Use this at bootstrap to share one `QdrantOps` (and thus one gRPC channel)
231    /// across all subsystems. The `ops` is consumed and wrapped inside `EmbeddingStore`.
232    ///
233    /// # Errors
234    ///
235    /// Returns an error if `SQLite` cannot be initialized.
236    pub async fn with_qdrant_ops(
237        sqlite_path: &str,
238        ops: crate::QdrantOps,
239        provider: AnyProvider,
240        embedding_model: &str,
241        vector_weight: f64,
242        keyword_weight: f64,
243        pool_size: u32,
244    ) -> Result<Self, MemoryError> {
245        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
246        let pool = sqlite.pool().clone();
247        let store = EmbeddingStore::with_store(Box::new(ops), pool);
248
249        Ok(Self {
250            sqlite,
251            qdrant: Some(Arc::new(store)),
252            provider,
253            embed_provider: None,
254            embedding_model: embedding_model.into(),
255            vector_weight,
256            keyword_weight,
257            temporal_decay_enabled: false,
258            temporal_decay_half_life_days: 30,
259            mmr_enabled: false,
260            mmr_lambda: 0.7,
261            importance_enabled: false,
262            importance_weight: 0.15,
263            tier_boost_semantic: 1.3,
264            token_counter: Arc::new(TokenCounter::new()),
265            graph_store: None,
266            community_detection_failures: Arc::new(AtomicU64::new(0)),
267            graph_extraction_count: Arc::new(AtomicU64::new(0)),
268            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
269            admission_control: None,
270            key_facts_dedup_threshold: 0.95,
271            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
272        })
273    }
274
275    /// Attach a `GraphStore` for graph-aware retrieval.
276    ///
277    /// When set, `recall_graph` traverses the graph starting from entities
278    /// matched by the query.
279    #[must_use]
280    pub fn with_graph_store(mut self, store: Arc<crate::graph::GraphStore>) -> Self {
281        self.graph_store = Some(store);
282        self
283    }
284
285    /// Returns the cumulative count of community detection failures since startup.
286    #[must_use]
287    pub fn community_detection_failures(&self) -> u64 {
288        use std::sync::atomic::Ordering;
289        self.community_detection_failures.load(Ordering::Relaxed)
290    }
291
292    /// Returns the cumulative count of successful graph extractions since startup.
293    #[must_use]
294    pub fn graph_extraction_count(&self) -> u64 {
295        use std::sync::atomic::Ordering;
296        self.graph_extraction_count.load(Ordering::Relaxed)
297    }
298
299    /// Returns the cumulative count of failed graph extractions since startup.
300    #[must_use]
301    pub fn graph_extraction_failures(&self) -> u64 {
302        use std::sync::atomic::Ordering;
303        self.graph_extraction_failures.load(Ordering::Relaxed)
304    }
305
306    /// Configure temporal decay and MMR re-ranking options.
307    #[must_use]
308    pub fn with_ranking_options(
309        mut self,
310        temporal_decay_enabled: bool,
311        temporal_decay_half_life_days: u32,
312        mmr_enabled: bool,
313        mmr_lambda: f32,
314    ) -> Self {
315        self.temporal_decay_enabled = temporal_decay_enabled;
316        self.temporal_decay_half_life_days = temporal_decay_half_life_days;
317        self.mmr_enabled = mmr_enabled;
318        self.mmr_lambda = mmr_lambda;
319        self
320    }
321
322    /// Configure write-time importance scoring for memory retrieval.
323    #[must_use]
324    pub fn with_importance_options(mut self, enabled: bool, weight: f64) -> Self {
325        self.importance_enabled = enabled;
326        self.importance_weight = weight;
327        self
328    }
329
330    /// Configure the multiplicative score boost applied to semantic-tier messages during recall.
331    ///
332    /// Set to `1.0` to disable the boost. Default: `1.3`.
333    #[must_use]
334    pub fn with_tier_boost(mut self, boost: f64) -> Self {
335        self.tier_boost_semantic = boost;
336        self
337    }
338
339    /// Attach an A-MAC admission controller.
340    ///
341    /// When set, `remember()` and `remember_with_parts()` evaluate each message before persisting.
342    /// Messages below the admission threshold return `Ok(None)` without incrementing counts.
343    #[must_use]
344    pub fn with_admission_control(mut self, control: AdmissionControl) -> Self {
345        self.admission_control = Some(Arc::new(control));
346        self
347    }
348
349    /// Set the cosine similarity threshold used to skip near-duplicate key facts on insert.
350    ///
351    /// When a candidate fact's nearest neighbour in `zeph_key_facts` has a score ≥ this value,
352    /// the fact is not stored.  Default: `0.95`.
353    #[must_use]
354    pub fn with_key_facts_dedup_threshold(mut self, threshold: f32) -> Self {
355        self.key_facts_dedup_threshold = threshold;
356        self
357    }
358
359    /// Attach a dedicated embedding provider for write-path and backfill operations.
360    ///
361    /// When set, all batch embedding calls (backfill, `remember`) route through this provider
362    /// instead of the main `provider`. This prevents `embed_backfill` from saturating the main
363    /// provider and causing guardrail timeouts due to rate-limit contention or Ollama model-lock.
364    #[must_use]
365    pub fn with_embed_provider(mut self, embed_provider: AnyProvider) -> Self {
366        self.embed_provider = Some(embed_provider);
367        self
368    }
369
370    /// Returns the provider to use for embedding calls.
371    ///
372    /// Returns the dedicated embed provider when configured, falling back to the main provider.
373    pub(crate) fn effective_embed_provider(&self) -> &AnyProvider {
374        self.embed_provider.as_ref().unwrap_or(&self.provider)
375    }
376
377    /// Construct a `SemanticMemory` from pre-built parts.
378    ///
379    /// Intended for tests that need full control over the backing stores.
380    #[must_use]
381    pub fn from_parts(
382        sqlite: SqliteStore,
383        qdrant: Option<Arc<EmbeddingStore>>,
384        provider: AnyProvider,
385        embedding_model: impl Into<String>,
386        vector_weight: f64,
387        keyword_weight: f64,
388        token_counter: Arc<TokenCounter>,
389    ) -> Self {
390        Self {
391            sqlite,
392            qdrant,
393            provider,
394            embed_provider: None,
395            embedding_model: embedding_model.into(),
396            vector_weight,
397            keyword_weight,
398            temporal_decay_enabled: false,
399            temporal_decay_half_life_days: 30,
400            mmr_enabled: false,
401            mmr_lambda: 0.7,
402            importance_enabled: false,
403            importance_weight: 0.15,
404            tier_boost_semantic: 1.3,
405            token_counter,
406            graph_store: None,
407            community_detection_failures: Arc::new(AtomicU64::new(0)),
408            graph_extraction_count: Arc::new(AtomicU64::new(0)),
409            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
410            admission_control: None,
411            key_facts_dedup_threshold: 0.95,
412            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
413        }
414    }
415
416    /// Create a `SemanticMemory` using the `SQLite`-embedded vector backend.
417    ///
418    /// # Errors
419    ///
420    /// Returns an error if `SQLite` cannot be initialized.
421    pub async fn with_sqlite_backend(
422        sqlite_path: &str,
423        provider: AnyProvider,
424        embedding_model: &str,
425        vector_weight: f64,
426        keyword_weight: f64,
427    ) -> Result<Self, MemoryError> {
428        Self::with_sqlite_backend_and_pool_size(
429            sqlite_path,
430            provider,
431            embedding_model,
432            vector_weight,
433            keyword_weight,
434            5,
435        )
436        .await
437    }
438
439    /// Create a `SemanticMemory` using the `SQLite`-embedded vector backend with configurable pool size.
440    ///
441    /// # Errors
442    ///
443    /// Returns an error if `SQLite` cannot be initialized.
444    pub async fn with_sqlite_backend_and_pool_size(
445        sqlite_path: &str,
446        provider: AnyProvider,
447        embedding_model: &str,
448        vector_weight: f64,
449        keyword_weight: f64,
450        pool_size: u32,
451    ) -> Result<Self, MemoryError> {
452        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
453        let pool = sqlite.pool().clone();
454        let store = EmbeddingStore::new_sqlite(pool);
455
456        Ok(Self {
457            sqlite,
458            qdrant: Some(Arc::new(store)),
459            provider,
460            embed_provider: None,
461            embedding_model: embedding_model.into(),
462            vector_weight,
463            keyword_weight,
464            temporal_decay_enabled: false,
465            temporal_decay_half_life_days: 30,
466            mmr_enabled: false,
467            mmr_lambda: 0.7,
468            importance_enabled: false,
469            importance_weight: 0.15,
470            tier_boost_semantic: 1.3,
471            token_counter: Arc::new(TokenCounter::new()),
472            graph_store: None,
473            community_detection_failures: Arc::new(AtomicU64::new(0)),
474            graph_extraction_count: Arc::new(AtomicU64::new(0)),
475            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
476            admission_control: None,
477            key_facts_dedup_threshold: 0.95,
478            embed_tasks: std::sync::Mutex::new(tokio::task::JoinSet::new()),
479        })
480    }
481
482    /// Access the underlying `SqliteStore` for operations that don't involve semantics.
483    #[must_use]
484    pub fn sqlite(&self) -> &SqliteStore {
485        &self.sqlite
486    }
487
488    /// Check if the vector store backend is reachable.
489    ///
490    /// Performs a real health check (Qdrant gRPC ping or `SQLite` query)
491    /// instead of just checking whether the client was created.
492    pub async fn is_vector_store_connected(&self) -> bool {
493        match self.qdrant.as_ref() {
494            Some(store) => store.health_check().await,
495            None => false,
496        }
497    }
498
499    /// Check if a vector store client is configured (may not be connected).
500    #[must_use]
501    pub fn has_vector_store(&self) -> bool {
502        self.qdrant.is_some()
503    }
504
505    /// Return a reference to the embedding store, if configured.
506    #[must_use]
507    pub fn embedding_store(&self) -> Option<&Arc<EmbeddingStore>> {
508        self.qdrant.as_ref()
509    }
510
511    /// Return a reference to the underlying LLM provider (used for RPE embedding).
512    pub fn provider(&self) -> &AnyProvider {
513        &self.provider
514    }
515
516    /// Count messages in a conversation.
517    ///
518    /// # Errors
519    ///
520    /// Returns an error if the query fails.
521    pub async fn message_count(
522        &self,
523        conversation_id: crate::types::ConversationId,
524    ) -> Result<i64, MemoryError> {
525        self.sqlite.count_messages(conversation_id).await
526    }
527
528    /// Count messages not yet covered by any summary.
529    ///
530    /// # Errors
531    ///
532    /// Returns an error if the query fails.
533    pub async fn unsummarized_message_count(
534        &self,
535        conversation_id: crate::types::ConversationId,
536    ) -> Result<i64, MemoryError> {
537        let after_id = self
538            .sqlite
539            .latest_summary_last_message_id(conversation_id)
540            .await?
541            .unwrap_or(crate::types::MessageId(0));
542        self.sqlite
543            .count_messages_after(conversation_id, after_id)
544            .await
545    }
546}