Skip to main content

zeph_memory/semantic/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4mod algorithms;
5mod corrections;
6mod cross_session;
7mod graph;
8pub(crate) mod importance;
9pub mod persona;
10mod recall;
11mod summarization;
12pub mod trajectory;
13pub mod tree_consolidation;
14pub(crate) mod write_buffer;
15
16#[cfg(test)]
17mod tests;
18
19use std::sync::Arc;
20use std::sync::atomic::AtomicU64;
21
22use zeph_llm::any::AnyProvider;
23
24use crate::admission::AdmissionControl;
25use crate::embedding_store::EmbeddingStore;
26use crate::error::MemoryError;
27use crate::store::SqliteStore;
28use crate::token_counter::TokenCounter;
29
30pub(crate) const SESSION_SUMMARIES_COLLECTION: &str = "zeph_session_summaries";
31pub(crate) const KEY_FACTS_COLLECTION: &str = "zeph_key_facts";
32pub(crate) const CORRECTIONS_COLLECTION: &str = "zeph_corrections";
33
34pub use algorithms::{apply_mmr, apply_temporal_decay};
35pub use cross_session::SessionSummaryResult;
36pub use graph::{
37    ExtractionResult, ExtractionStats, GraphExtractionConfig, LinkingStats, NoteLinkingConfig,
38    PostExtractValidator, extract_and_store, link_memory_notes,
39};
40pub use persona::{
41    PersonaExtractionConfig, contains_self_referential_language, extract_persona_facts,
42};
43pub use recall::{EmbedContext, RecalledMessage};
44pub use summarization::{StructuredSummary, Summary, build_summarization_prompt};
45pub use trajectory::{TrajectoryEntry, TrajectoryExtractionConfig, extract_trajectory_entries};
46pub use tree_consolidation::{
47    TreeConsolidationConfig, TreeConsolidationResult, run_tree_consolidation_sweep,
48    start_tree_consolidation_loop,
49};
50pub use write_buffer::{BufferedWrite, WriteBuffer};
51
pub struct SemanticMemory {
    /// Relational backing store (messages, summaries, and related queries).
    pub(crate) sqlite: SqliteStore,
    /// Vector store backend. `None` when the backend was unreachable at construction,
    /// in which case semantic search is disabled (see `with_weights_and_pool_size`).
    pub(crate) qdrant: Option<Arc<EmbeddingStore>>,
    /// Main LLM provider; also serves embedding calls when `embed_provider` is `None`.
    pub(crate) provider: AnyProvider,
    /// Dedicated provider for batch embedding calls (backfill, write-path embedding).
    ///
    /// When `Some`, all embedding I/O is routed through this provider instead of `provider`.
    /// This prevents `embed_backfill` from saturating the main provider and causing guardrail
    /// timeouts. When `None`, falls back to `provider`.
    pub(crate) embed_provider: Option<AnyProvider>,
    /// Embedding model identifier supplied at construction.
    pub(crate) embedding_model: String,
    /// Hybrid-search weight for vector similarity. Default via [`SemanticMemory::new`]: `0.7`.
    pub(crate) vector_weight: f64,
    /// Hybrid-search weight for keyword matching. Default via [`SemanticMemory::new`]: `0.3`.
    pub(crate) keyword_weight: f64,
    /// Enable temporal decay re-ranking of recall results. Default: `false`.
    pub(crate) temporal_decay_enabled: bool,
    /// Half-life, in days, used by temporal decay. Default: `30`.
    pub(crate) temporal_decay_half_life_days: u32,
    /// Enable MMR re-ranking of recall results. Default: `false`.
    pub(crate) mmr_enabled: bool,
    /// MMR lambda parameter. Default: `0.7`.
    pub(crate) mmr_lambda: f32,
    /// Enable write-time importance scoring for memory retrieval. Default: `false`.
    pub(crate) importance_enabled: bool,
    /// Weight of the importance score in retrieval ranking. Default: `0.15`.
    pub(crate) importance_weight: f64,
    /// Multiplicative score boost for semantic-tier messages in recall ranking.
    /// Default: `1.3`. Disabled when set to `1.0`.
    pub(crate) tier_boost_semantic: f64,
    /// Shared token counter. NOTE(review): presumably used for context/summary
    /// budgeting by the recall/summarization submodules — confirm at call sites.
    pub token_counter: Arc<TokenCounter>,
    /// Graph store for graph-aware retrieval; set via [`SemanticMemory::with_graph_store`].
    pub graph_store: Option<Arc<crate::graph::GraphStore>>,
    /// Cumulative count of community detection failures since startup.
    pub(crate) community_detection_failures: Arc<AtomicU64>,
    /// Cumulative count of successful graph extractions since startup.
    pub(crate) graph_extraction_count: Arc<AtomicU64>,
    /// Cumulative count of failed graph extractions since startup.
    pub(crate) graph_extraction_failures: Arc<AtomicU64>,
    /// A-MAC admission control gate. When `Some`, each `remember()` call is evaluated.
    pub(crate) admission_control: Option<Arc<AdmissionControl>>,
    /// Cosine similarity threshold for skipping near-duplicate key facts (0.0–1.0).
    /// When a new fact's nearest neighbour in `zeph_key_facts` has score >= this value,
    /// the fact is considered a duplicate and not inserted.  Default: `0.95`.
    pub(crate) key_facts_dedup_threshold: f32,
}
86
87impl SemanticMemory {
88    /// Create a new `SemanticMemory` instance with default hybrid search weights (0.7/0.3).
89    ///
90    /// Qdrant connection is best-effort: if unavailable, semantic search is disabled.
91    ///
92    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
93    /// a single gRPC channel across all subsystems.
94    ///
95    /// # Errors
96    ///
97    /// Returns an error if `SQLite` cannot be initialized.
98    pub async fn new(
99        sqlite_path: &str,
100        qdrant_url: &str,
101        provider: AnyProvider,
102        embedding_model: &str,
103    ) -> Result<Self, MemoryError> {
104        Self::with_weights(sqlite_path, qdrant_url, provider, embedding_model, 0.7, 0.3).await
105    }
106
107    /// Create a new `SemanticMemory` with custom vector/keyword weights for hybrid search.
108    ///
109    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
110    /// a single gRPC channel across all subsystems.
111    ///
112    /// # Errors
113    ///
114    /// Returns an error if `SQLite` cannot be initialized.
115    pub async fn with_weights(
116        sqlite_path: &str,
117        qdrant_url: &str,
118        provider: AnyProvider,
119        embedding_model: &str,
120        vector_weight: f64,
121        keyword_weight: f64,
122    ) -> Result<Self, MemoryError> {
123        Self::with_weights_and_pool_size(
124            sqlite_path,
125            qdrant_url,
126            provider,
127            embedding_model,
128            vector_weight,
129            keyword_weight,
130            5,
131        )
132        .await
133    }
134
135    /// Create a new `SemanticMemory` with custom weights and configurable pool size.
136    ///
137    /// For `AppBuilder` bootstrap, prefer [`SemanticMemory::with_qdrant_ops`] to share
138    /// a single gRPC channel across all subsystems.
139    ///
140    /// # Errors
141    ///
142    /// Returns an error if `SQLite` cannot be initialized.
143    pub async fn with_weights_and_pool_size(
144        sqlite_path: &str,
145        qdrant_url: &str,
146        provider: AnyProvider,
147        embedding_model: &str,
148        vector_weight: f64,
149        keyword_weight: f64,
150        pool_size: u32,
151    ) -> Result<Self, MemoryError> {
152        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
153        let pool = sqlite.pool().clone();
154
155        let qdrant = match EmbeddingStore::new(qdrant_url, pool) {
156            Ok(store) => Some(Arc::new(store)),
157            Err(e) => {
158                tracing::warn!("Qdrant unavailable, semantic search disabled: {e:#}");
159                None
160            }
161        };
162
163        Ok(Self {
164            sqlite,
165            qdrant,
166            provider,
167            embed_provider: None,
168            embedding_model: embedding_model.into(),
169            vector_weight,
170            keyword_weight,
171            temporal_decay_enabled: false,
172            temporal_decay_half_life_days: 30,
173            mmr_enabled: false,
174            mmr_lambda: 0.7,
175            importance_enabled: false,
176            importance_weight: 0.15,
177            tier_boost_semantic: 1.3,
178            token_counter: Arc::new(TokenCounter::new()),
179            graph_store: None,
180            community_detection_failures: Arc::new(AtomicU64::new(0)),
181            graph_extraction_count: Arc::new(AtomicU64::new(0)),
182            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
183            admission_control: None,
184            key_facts_dedup_threshold: 0.95,
185        })
186    }
187
188    /// Create a `SemanticMemory` from a pre-built `QdrantOps` instance.
189    ///
190    /// Use this at bootstrap to share one `QdrantOps` (and thus one gRPC channel)
191    /// across all subsystems. The `ops` is consumed and wrapped inside `EmbeddingStore`.
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if `SQLite` cannot be initialized.
196    pub async fn with_qdrant_ops(
197        sqlite_path: &str,
198        ops: crate::QdrantOps,
199        provider: AnyProvider,
200        embedding_model: &str,
201        vector_weight: f64,
202        keyword_weight: f64,
203        pool_size: u32,
204    ) -> Result<Self, MemoryError> {
205        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
206        let pool = sqlite.pool().clone();
207        let store = EmbeddingStore::with_store(Box::new(ops), pool);
208
209        Ok(Self {
210            sqlite,
211            qdrant: Some(Arc::new(store)),
212            provider,
213            embed_provider: None,
214            embedding_model: embedding_model.into(),
215            vector_weight,
216            keyword_weight,
217            temporal_decay_enabled: false,
218            temporal_decay_half_life_days: 30,
219            mmr_enabled: false,
220            mmr_lambda: 0.7,
221            importance_enabled: false,
222            importance_weight: 0.15,
223            tier_boost_semantic: 1.3,
224            token_counter: Arc::new(TokenCounter::new()),
225            graph_store: None,
226            community_detection_failures: Arc::new(AtomicU64::new(0)),
227            graph_extraction_count: Arc::new(AtomicU64::new(0)),
228            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
229            admission_control: None,
230            key_facts_dedup_threshold: 0.95,
231        })
232    }
233
234    /// Attach a `GraphStore` for graph-aware retrieval.
235    ///
236    /// When set, `recall_graph` traverses the graph starting from entities
237    /// matched by the query.
238    #[must_use]
239    pub fn with_graph_store(mut self, store: Arc<crate::graph::GraphStore>) -> Self {
240        self.graph_store = Some(store);
241        self
242    }
243
244    /// Returns the cumulative count of community detection failures since startup.
245    #[must_use]
246    pub fn community_detection_failures(&self) -> u64 {
247        use std::sync::atomic::Ordering;
248        self.community_detection_failures.load(Ordering::Relaxed)
249    }
250
251    /// Returns the cumulative count of successful graph extractions since startup.
252    #[must_use]
253    pub fn graph_extraction_count(&self) -> u64 {
254        use std::sync::atomic::Ordering;
255        self.graph_extraction_count.load(Ordering::Relaxed)
256    }
257
258    /// Returns the cumulative count of failed graph extractions since startup.
259    #[must_use]
260    pub fn graph_extraction_failures(&self) -> u64 {
261        use std::sync::atomic::Ordering;
262        self.graph_extraction_failures.load(Ordering::Relaxed)
263    }
264
265    /// Configure temporal decay and MMR re-ranking options.
266    #[must_use]
267    pub fn with_ranking_options(
268        mut self,
269        temporal_decay_enabled: bool,
270        temporal_decay_half_life_days: u32,
271        mmr_enabled: bool,
272        mmr_lambda: f32,
273    ) -> Self {
274        self.temporal_decay_enabled = temporal_decay_enabled;
275        self.temporal_decay_half_life_days = temporal_decay_half_life_days;
276        self.mmr_enabled = mmr_enabled;
277        self.mmr_lambda = mmr_lambda;
278        self
279    }
280
281    /// Configure write-time importance scoring for memory retrieval.
282    #[must_use]
283    pub fn with_importance_options(mut self, enabled: bool, weight: f64) -> Self {
284        self.importance_enabled = enabled;
285        self.importance_weight = weight;
286        self
287    }
288
289    /// Configure the multiplicative score boost applied to semantic-tier messages during recall.
290    ///
291    /// Set to `1.0` to disable the boost. Default: `1.3`.
292    #[must_use]
293    pub fn with_tier_boost(mut self, boost: f64) -> Self {
294        self.tier_boost_semantic = boost;
295        self
296    }
297
298    /// Attach an A-MAC admission controller.
299    ///
300    /// When set, `remember()` and `remember_with_parts()` evaluate each message before persisting.
301    /// Messages below the admission threshold return `Ok(None)` without incrementing counts.
302    #[must_use]
303    pub fn with_admission_control(mut self, control: AdmissionControl) -> Self {
304        self.admission_control = Some(Arc::new(control));
305        self
306    }
307
308    /// Set the cosine similarity threshold used to skip near-duplicate key facts on insert.
309    ///
310    /// When a candidate fact's nearest neighbour in `zeph_key_facts` has a score ≥ this value,
311    /// the fact is not stored.  Default: `0.95`.
312    #[must_use]
313    pub fn with_key_facts_dedup_threshold(mut self, threshold: f32) -> Self {
314        self.key_facts_dedup_threshold = threshold;
315        self
316    }
317
318    /// Attach a dedicated embedding provider for write-path and backfill operations.
319    ///
320    /// When set, all batch embedding calls (backfill, `remember`) route through this provider
321    /// instead of the main `provider`. This prevents `embed_backfill` from saturating the main
322    /// provider and causing guardrail timeouts due to rate-limit contention or Ollama model-lock.
323    #[must_use]
324    pub fn with_embed_provider(mut self, embed_provider: AnyProvider) -> Self {
325        self.embed_provider = Some(embed_provider);
326        self
327    }
328
329    /// Returns the provider to use for embedding calls.
330    ///
331    /// Returns the dedicated embed provider when configured, falling back to the main provider.
332    pub(crate) fn effective_embed_provider(&self) -> &AnyProvider {
333        self.embed_provider.as_ref().unwrap_or(&self.provider)
334    }
335
336    /// Construct a `SemanticMemory` from pre-built parts.
337    ///
338    /// Intended for tests that need full control over the backing stores.
339    #[must_use]
340    pub fn from_parts(
341        sqlite: SqliteStore,
342        qdrant: Option<Arc<EmbeddingStore>>,
343        provider: AnyProvider,
344        embedding_model: impl Into<String>,
345        vector_weight: f64,
346        keyword_weight: f64,
347        token_counter: Arc<TokenCounter>,
348    ) -> Self {
349        Self {
350            sqlite,
351            qdrant,
352            provider,
353            embed_provider: None,
354            embedding_model: embedding_model.into(),
355            vector_weight,
356            keyword_weight,
357            temporal_decay_enabled: false,
358            temporal_decay_half_life_days: 30,
359            mmr_enabled: false,
360            mmr_lambda: 0.7,
361            importance_enabled: false,
362            importance_weight: 0.15,
363            tier_boost_semantic: 1.3,
364            token_counter,
365            graph_store: None,
366            community_detection_failures: Arc::new(AtomicU64::new(0)),
367            graph_extraction_count: Arc::new(AtomicU64::new(0)),
368            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
369            admission_control: None,
370            key_facts_dedup_threshold: 0.95,
371        }
372    }
373
374    /// Create a `SemanticMemory` using the `SQLite`-embedded vector backend.
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if `SQLite` cannot be initialized.
379    pub async fn with_sqlite_backend(
380        sqlite_path: &str,
381        provider: AnyProvider,
382        embedding_model: &str,
383        vector_weight: f64,
384        keyword_weight: f64,
385    ) -> Result<Self, MemoryError> {
386        Self::with_sqlite_backend_and_pool_size(
387            sqlite_path,
388            provider,
389            embedding_model,
390            vector_weight,
391            keyword_weight,
392            5,
393        )
394        .await
395    }
396
397    /// Create a `SemanticMemory` using the `SQLite`-embedded vector backend with configurable pool size.
398    ///
399    /// # Errors
400    ///
401    /// Returns an error if `SQLite` cannot be initialized.
402    pub async fn with_sqlite_backend_and_pool_size(
403        sqlite_path: &str,
404        provider: AnyProvider,
405        embedding_model: &str,
406        vector_weight: f64,
407        keyword_weight: f64,
408        pool_size: u32,
409    ) -> Result<Self, MemoryError> {
410        let sqlite = SqliteStore::with_pool_size(sqlite_path, pool_size).await?;
411        let pool = sqlite.pool().clone();
412        let store = EmbeddingStore::new_sqlite(pool);
413
414        Ok(Self {
415            sqlite,
416            qdrant: Some(Arc::new(store)),
417            provider,
418            embed_provider: None,
419            embedding_model: embedding_model.into(),
420            vector_weight,
421            keyword_weight,
422            temporal_decay_enabled: false,
423            temporal_decay_half_life_days: 30,
424            mmr_enabled: false,
425            mmr_lambda: 0.7,
426            importance_enabled: false,
427            importance_weight: 0.15,
428            tier_boost_semantic: 1.3,
429            token_counter: Arc::new(TokenCounter::new()),
430            graph_store: None,
431            community_detection_failures: Arc::new(AtomicU64::new(0)),
432            graph_extraction_count: Arc::new(AtomicU64::new(0)),
433            graph_extraction_failures: Arc::new(AtomicU64::new(0)),
434            admission_control: None,
435            key_facts_dedup_threshold: 0.95,
436        })
437    }
438
439    /// Access the underlying `SqliteStore` for operations that don't involve semantics.
440    #[must_use]
441    pub fn sqlite(&self) -> &SqliteStore {
442        &self.sqlite
443    }
444
445    /// Check if the vector store backend is reachable.
446    ///
447    /// Performs a real health check (Qdrant gRPC ping or `SQLite` query)
448    /// instead of just checking whether the client was created.
449    pub async fn is_vector_store_connected(&self) -> bool {
450        match self.qdrant.as_ref() {
451            Some(store) => store.health_check().await,
452            None => false,
453        }
454    }
455
456    /// Check if a vector store client is configured (may not be connected).
457    #[must_use]
458    pub fn has_vector_store(&self) -> bool {
459        self.qdrant.is_some()
460    }
461
462    /// Return a reference to the embedding store, if configured.
463    #[must_use]
464    pub fn embedding_store(&self) -> Option<&Arc<EmbeddingStore>> {
465        self.qdrant.as_ref()
466    }
467
468    /// Return a reference to the underlying LLM provider (used for RPE embedding).
469    pub fn provider(&self) -> &AnyProvider {
470        &self.provider
471    }
472
473    /// Count messages in a conversation.
474    ///
475    /// # Errors
476    ///
477    /// Returns an error if the query fails.
478    pub async fn message_count(
479        &self,
480        conversation_id: crate::types::ConversationId,
481    ) -> Result<i64, MemoryError> {
482        self.sqlite.count_messages(conversation_id).await
483    }
484
485    /// Count messages not yet covered by any summary.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if the query fails.
490    pub async fn unsummarized_message_count(
491        &self,
492        conversation_id: crate::types::ConversationId,
493    ) -> Result<i64, MemoryError> {
494        let after_id = self
495            .sqlite
496            .latest_summary_last_message_id(conversation_id)
497            .await?
498            .unwrap_or(crate::types::MessageId(0));
499        self.sqlite
500            .count_messages_after(conversation_id, after_id)
501            .await
502    }
503}