Skip to main content

engram/
memory.rs

//! `Memory` — the primary public API for the Engram memory layer.
//!
//! `Memory` composes a `FactStore`, `VectorStore`, `GraphStore`,
//! `EmbeddingProvider`, and an optional `MessageStore` into a single
//! high-level interface. Callers interact with `Memory` rather than with the
//! lower-level trait objects directly.
6
7use crate::consolidation::{ConsolidationConfig, ConsolidationEngine, ConsolidationResult};
8use crate::context::{ContextBlock, ContextBuilder, ContextConfig};
9use crate::embedding::EmbeddingProvider;
10use crate::extract::{ExtractionConfig, Message};
11use crate::fact::{Entity, Fact, FactFilter, FactId, Relationship};
12use crate::graph::GraphStore;
13use crate::graph_postgres::PostgresGraphStore;
14use crate::graph_sqlite::SqliteGraphStore;
15use crate::llm::LlmClient;
16use crate::message::{ChatMessage, MessageId, MessageStore};
17use crate::message_postgres::PostgresMessageStore;
18use crate::message_sqlite::SqliteMessageStore;
19use crate::pipeline::ExtractionPipeline;
20use crate::scope::Scope;
21use crate::store::{FactStore, MemoryError, StoreStats};
22use crate::store_postgres::PostgresFactStore;
23use crate::store_sqlite::SqliteFactStore;
24use crate::vector::{VectorFilter, VectorStore};
25use crate::vector_embedded::EmbeddedVectorStore;
26use chrono::{DateTime, Utc};
27use std::collections::HashMap;
28use std::sync::Arc;
29
// ---------------------------------------------------------------------------
// RecallQuery
// ---------------------------------------------------------------------------
33
34/// Parameters for a semantic recall operation.
35#[derive(Debug, Clone, Default)]
36pub struct RecallQuery {
37    /// The text to embed and search for semantically similar facts.
38    pub query: String,
39    /// Optional scope filter — only return facts within this scope.
40    pub scope: Option<Scope>,
41    /// Maximum number of results to return (default: 10).
42    pub max_results: usize,
43    /// Point-in-time filter — only return facts valid at this instant.
44    pub as_of: Option<DateTime<Utc>>,
45    /// Minimum cosine similarity score (0.0 – 1.0).
46    pub min_score: Option<f32>,
47}
48
// ---------------------------------------------------------------------------
// Memory
// ---------------------------------------------------------------------------
52
53/// High-level memory API for AI agents.
54///
55/// `Memory` wires together fact storage, vector search, graph storage, and an
56/// embedding provider. Most callers should construct it via `in_memory` or
57/// `open` rather than calling `new` directly.
58pub struct Memory {
59    fact_store: Arc<dyn FactStore>,
60    vector_store: Arc<dyn VectorStore>,
61    graph_store: Arc<dyn GraphStore>,
62    embedding: Arc<dyn EmbeddingProvider>,
63    message_store: Option<Arc<dyn MessageStore>>,
64}
65
66impl Memory {
67    // -----------------------------------------------------------------------
68    // Constructors
69    // -----------------------------------------------------------------------
70
71    /// Construct `Memory` from explicit store and embedding instances.
72    pub fn new(
73        fact_store: Arc<dyn FactStore>,
74        vector_store: Arc<dyn VectorStore>,
75        graph_store: Arc<dyn GraphStore>,
76        embedding: Arc<dyn EmbeddingProvider>,
77    ) -> Self {
78        Self {
79            fact_store,
80            vector_store,
81            graph_store,
82            embedding,
83            message_store: None,
84        }
85    }
86
87    /// Builder method — attach a `MessageStore` to this `Memory` instance.
88    pub fn with_message_store(mut self, store: Arc<dyn MessageStore>) -> Self {
89        self.message_store = Some(store);
90        self
91    }
92
93    /// Create a fully in-memory `Memory` instance backed by SQLite `:memory:`.
94    ///
95    /// Schema migration is applied automatically. Suitable for testing and
96    /// short-lived agent invocations.
97    pub async fn in_memory(embedding: Box<dyn EmbeddingProvider>) -> Result<Self, MemoryError> {
98        let dims = embedding.dimensions();
99        let embedding = Arc::from(embedding);
100
101        let fact_store = SqliteFactStore::open("sqlite::memory:")
102            .await
103            .map_err(|e| MemoryError::Database(format!("failed to open in-memory SQLite: {e}")))?;
104        fact_store
105            .migrate()
106            .await
107            .map_err(|e| MemoryError::Database(format!("fact store migration failed: {e}")))?;
108
109        let graph_store = SqliteGraphStore::open("sqlite::memory:")
110            .await
111            .map_err(|e| MemoryError::Database(format!("failed to open in-memory graph: {e}")))?;
112        graph_store
113            .migrate()
114            .await
115            .map_err(|e| MemoryError::Database(format!("graph store migration failed: {e}")))?;
116
117        let message_store = SqliteMessageStore::open("sqlite::memory:")
118            .await
119            .map_err(|e| {
120                MemoryError::Database(format!("failed to open in-memory message store: {e}"))
121            })?;
122        message_store
123            .migrate()
124            .await
125            .map_err(|e| MemoryError::Database(format!("message store migration failed: {e}")))?;
126
127        let vector_store = EmbeddedVectorStore::new(dims);
128
129        Ok(Self {
130            fact_store: Arc::new(fact_store),
131            vector_store: Arc::new(vector_store),
132            graph_store: Arc::new(graph_store),
133            embedding,
134            message_store: Some(Arc::new(message_store)),
135        })
136    }
137
138    /// Open a file-backed `Memory` instance at `database_url`.
139    ///
140    /// Uses SQLite for facts and graph data, with an in-process
141    /// `EmbeddedVectorStore` for semantic search. Schema migration is applied
142    /// automatically.
143    pub async fn open(
144        database_url: &str,
145        embedding: Box<dyn EmbeddingProvider>,
146    ) -> Result<Self, MemoryError> {
147        let dims = embedding.dimensions();
148        let embedding = Arc::from(embedding);
149
150        let fact_store = SqliteFactStore::open(database_url)
151            .await
152            .map_err(|e| MemoryError::Database(format!("failed to open SQLite: {e}")))?;
153        fact_store
154            .migrate()
155            .await
156            .map_err(|e| MemoryError::Database(format!("fact store migration failed: {e}")))?;
157
158        let graph_store = SqliteGraphStore::open(database_url)
159            .await
160            .map_err(|e| MemoryError::Database(format!("failed to open graph SQLite: {e}")))?;
161        graph_store
162            .migrate()
163            .await
164            .map_err(|e| MemoryError::Database(format!("graph store migration failed: {e}")))?;
165
166        let message_store = SqliteMessageStore::open(database_url).await.map_err(|e| {
167            MemoryError::Database(format!("failed to open message store SQLite: {e}"))
168        })?;
169        message_store
170            .migrate()
171            .await
172            .map_err(|e| MemoryError::Database(format!("message store migration failed: {e}")))?;
173
174        let vector_store = EmbeddedVectorStore::new(dims);
175
176        Ok(Self {
177            fact_store: Arc::new(fact_store),
178            vector_store: Arc::new(vector_store),
179            graph_store: Arc::new(graph_store),
180            embedding,
181            message_store: Some(Arc::new(message_store)),
182        })
183    }
184
185    /// Open a PostgreSQL-backed `Memory` instance at `database_url`.
186    ///
187    /// Uses Postgres for facts, graph, and message data, with an in-process
188    /// `EmbeddedVectorStore` for semantic search. Schema migration is applied
189    /// automatically.
190    pub async fn open_postgres(
191        database_url: &str,
192        embedding: Box<dyn EmbeddingProvider>,
193    ) -> Result<Self, MemoryError> {
194        let dims = embedding.dimensions();
195        let embedding = Arc::from(embedding);
196
197        let fact_store = PostgresFactStore::open(database_url)
198            .await
199            .map_err(|e| MemoryError::Database(format!("failed to open Postgres: {e}")))?;
200        fact_store
201            .migrate()
202            .await
203            .map_err(|e| MemoryError::Database(format!("fact store migration failed: {e}")))?;
204
205        let graph_store = PostgresGraphStore::open(database_url)
206            .await
207            .map_err(|e| MemoryError::Database(format!("failed to open graph Postgres: {e}")))?;
208        graph_store
209            .migrate()
210            .await
211            .map_err(|e| MemoryError::Database(format!("graph store migration failed: {e}")))?;
212
213        let message_store = PostgresMessageStore::open(database_url)
214            .await
215            .map_err(|e| {
216                MemoryError::Database(format!("failed to open message store Postgres: {e}"))
217            })?;
218        message_store
219            .migrate()
220            .await
221            .map_err(|e| MemoryError::Database(format!("message store migration failed: {e}")))?;
222
223        let vector_store = EmbeddedVectorStore::new(dims);
224
225        Ok(Self {
226            fact_store: Arc::new(fact_store),
227            vector_store: Arc::new(vector_store),
228            graph_store: Arc::new(graph_store),
229            embedding,
230            message_store: Some(Arc::new(message_store)),
231        })
232    }
233
234    // -----------------------------------------------------------------------
235    // Write operations
236    // -----------------------------------------------------------------------
237
238    /// Embed `text`, create a `Fact`, and persist it in both the fact store
239    /// and the vector store.
240    ///
241    /// Returns the `FactId` of the newly created fact.
242    pub async fn add_fact(&self, text: &str, scope: Scope) -> Result<FactId, MemoryError> {
243        // Embed text.
244        let mut embeddings = self.embedding.embed(&[text]).await?;
245        let embedding = embeddings.pop().ok_or_else(|| {
246            MemoryError::Embedding("provider returned empty embeddings".to_string())
247        })?;
248
249        // Build and persist the fact.
250        let mut fact = Fact::new(text, scope);
251        fact.embedding = embedding.clone();
252        let id = self.fact_store.insert_fact(fact).await?;
253
254        // Insert into vector store.
255        let metadata = serde_json::json!({ "fact_id": id.to_string() });
256        self.vector_store.upsert(id, embedding, metadata).await?;
257
258        Ok(id)
259    }
260
261    /// Semantically recall facts matching `query`.
262    ///
263    /// Embeds the query text, performs vector search, fetches full facts from
264    /// the fact store, filters by validity and scope, and records an access
265    /// event for each returned fact.
266    pub async fn recall(&self, query: &RecallQuery) -> Result<Vec<Fact>, MemoryError> {
267        let max_results = if query.max_results == 0 {
268            10
269        } else {
270            query.max_results
271        };
272
273        // Embed the query.
274        let mut embeddings = self.embedding.embed(&[query.query.as_str()]).await?;
275        let query_vec = embeddings.pop().ok_or_else(|| {
276            MemoryError::Embedding("provider returned empty embeddings".to_string())
277        })?;
278
279        // Vector search.
280        let filter = VectorFilter {
281            scope: query.scope.clone(),
282            min_score: query.min_score,
283        };
284        let matches = self
285            .vector_store
286            .search(&query_vec, &filter, max_results)
287            .await?;
288
289        // Fetch full facts from the fact store.
290        let mut facts = Vec::with_capacity(matches.len());
291        for vm in matches {
292            match self.fact_store.get_fact(vm.id).await {
293                Ok(fact) => {
294                    // Validity filter.
295                    let valid = match query.as_of {
296                        Some(t) => fact.is_valid_at(t),
297                        None => fact.is_valid(),
298                    };
299                    if !valid {
300                        continue;
301                    }
302                    // Scope filter (post-fetch).
303                    if let Some(ref scope) = query.scope {
304                        if !scope.contains(&fact.scope) {
305                            continue;
306                        }
307                    }
308                    // Record access (fire-and-forget; ignore error).
309                    let _ = self.fact_store.record_access(fact.id).await;
310                    facts.push(fact);
311                }
312                Err(MemoryError::NotFound(_)) => {
313                    // Vector store has a stale entry — skip silently.
314                }
315                Err(e) => return Err(e),
316            }
317        }
318
319        Ok(facts)
320    }
321
322    // -----------------------------------------------------------------------
323    // Read operations
324    // -----------------------------------------------------------------------
325
326    /// List currently-valid facts for the given scope.
327    pub async fn list_facts(&self, scope: Option<Scope>) -> Result<Vec<Fact>, MemoryError> {
328        let filter = match scope {
329            Some(s) => FactFilter::new().with_scope(s),
330            None => FactFilter::new(),
331        };
332        self.fact_store.list_facts(&filter).await
333    }
334
335    // -----------------------------------------------------------------------
336    // Mutation operations
337    // -----------------------------------------------------------------------
338
339    /// Invalidate (soft-delete) a fact and remove it from the vector store.
340    ///
341    /// The fact record is preserved for historical queries. `reason` is
342    /// currently logged via tracing but not stored (reserved for future use).
343    pub async fn forget(&self, id: FactId, _reason: Option<&str>) -> Result<(), MemoryError> {
344        self.fact_store.invalidate_fact(id).await?;
345        self.vector_store.delete(id).await?;
346        Ok(())
347    }
348
349    /// Hard-delete all data (facts, vectors, graph nodes/edges) for `scope`.
350    ///
351    /// Returns the number of facts deleted. Intended for GDPR / right-to-erasure
352    /// requests.
353    pub async fn delete_user_data(&self, scope: Scope) -> Result<u64, MemoryError> {
354        let fact_count = self.fact_store.delete_scope_data(&scope).await?;
355        self.vector_store.delete_by_scope(&scope).await?;
356        self.graph_store.delete_by_scope(&scope).await?;
357        Ok(fact_count)
358    }
359
360    // -----------------------------------------------------------------------
361    // Stats & export/import
362    // -----------------------------------------------------------------------
363
364    /// Return aggregate statistics for the fact store.
365    pub async fn stats(&self, _scope: Option<Scope>) -> Result<StoreStats, MemoryError> {
366        self.fact_store.stats().await
367    }
368
369    /// Export all currently-valid facts for `scope`.
370    pub async fn export(&self, scope: Option<Scope>) -> Result<Vec<Fact>, MemoryError> {
371        let filter = match scope {
372            Some(s) => FactFilter::new().with_scope(s),
373            None => FactFilter::new(),
374        };
375        self.fact_store.export(&filter).await
376    }
377
378    /// Import a batch of facts, re-embedding each one.
379    ///
380    /// Returns the number of facts successfully imported (skips duplicates by id).
381    pub async fn import(&self, facts: Vec<Fact>) -> Result<u64, MemoryError> {
382        let mut imported: u64 = 0;
383        for mut fact in facts {
384            // Re-embed the fact text.
385            let mut embeddings = self.embedding.embed(&[fact.text.as_str()]).await?;
386            let embedding = embeddings.pop().ok_or_else(|| {
387                MemoryError::Embedding("provider returned empty embeddings".to_string())
388            })?;
389            fact.embedding = embedding.clone();
390
391            let fact_id = fact.id;
392            self.fact_store.insert_fact(fact).await?;
393
394            let metadata = serde_json::json!({ "fact_id": fact_id.to_string() });
395            self.vector_store
396                .upsert(fact_id, embedding, metadata)
397                .await?;
398            imported += 1;
399        }
400        Ok(imported)
401    }
402
403    // -----------------------------------------------------------------------
404    // Consolidation
405    // -----------------------------------------------------------------------
406
407    /// Run a consolidation cycle over the given scope.
408    ///
409    /// Operations (decay, promote, dedup, summarize, reflect) are controlled
410    /// by `config.enabled_ops`. LLM-dependent operations (summarize, reflect)
411    /// are skipped when `llm` is `None`.
412    pub async fn consolidate(
413        &self,
414        scope: &Scope,
415        llm: Option<&dyn LlmClient>,
416        config: ConsolidationConfig,
417    ) -> Result<ConsolidationResult, MemoryError> {
418        let engine = ConsolidationEngine::new(
419            self.fact_store.clone(),
420            self.vector_store.clone(),
421            self.embedding.clone(),
422            config,
423        );
424        engine.run(scope, llm).await
425    }
426
427    // -----------------------------------------------------------------------
428    // Context assembly
429    // -----------------------------------------------------------------------
430
431    /// Assemble a token-budgeted context block for LLM prompt injection.
432    ///
433    /// Retrieves relevant facts via hybrid search (vector + keyword + graph),
434    /// ranks by tier priority (Working > Conversation > Knowledge), fills the
435    /// token budget greedily, and formats the output.
436    pub async fn context(
437        &self,
438        query: &str,
439        scope: &Scope,
440        config: ContextConfig,
441    ) -> Result<ContextBlock, MemoryError> {
442        let builder = ContextBuilder::new(
443            self.fact_store.clone(),
444            self.vector_store.clone(),
445            self.graph_store.clone(),
446            self.embedding.clone(),
447            config,
448        );
449        builder.build(query, scope).await
450    }
451
452    // -----------------------------------------------------------------------
453    // Extraction pipeline
454    // -----------------------------------------------------------------------
455
456    /// Ingest conversation messages: extract facts via LLM, detect conflicts,
457    /// store facts + entities + relationships.
458    ///
459    /// Returns the IDs of newly created facts.
460    pub async fn add_messages(
461        &self,
462        messages: &[Message],
463        scope: Scope,
464        llm: Box<dyn LlmClient>,
465        config: ExtractionConfig,
466    ) -> Result<Vec<FactId>, MemoryError> {
467        let pipeline = ExtractionPipeline::new(llm, config);
468        let extraction = pipeline.extract(messages).await?;
469
470        let mut fact_ids = Vec::new();
471
472        for extracted in extraction.facts {
473            // Create and embed the fact
474            let mut embeddings = self.embedding.embed(&[extracted.text.as_str()]).await?;
475            let embedding = embeddings
476                .pop()
477                .ok_or_else(|| MemoryError::Embedding("empty embedding".to_string()))?;
478
479            let mut fact = Fact::new(&extracted.text, scope.clone());
480            fact.confidence = Some(extracted.confidence as f32);
481            fact.category = extracted.category;
482            fact.embedding = embedding.clone();
483
484            let id = self.fact_store.insert_fact(fact).await?;
485            self.vector_store
486                .upsert(id, embedding, serde_json::json!({}))
487                .await?;
488
489            // Store entities in graph
490            let mut entity_map: HashMap<String, uuid::Uuid> = HashMap::new();
491            for ext_entity in &extracted.entities {
492                let entity = Entity::new(&ext_entity.name, scope.clone())
493                    .with_type(ext_entity.entity_type.as_deref().unwrap_or("unknown"));
494                entity_map.insert(ext_entity.name.clone(), entity.id);
495                self.graph_store.upsert_entity(&entity).await?;
496            }
497
498            // Store relationships in graph
499            for ext_rel in &extracted.relationships {
500                if let (Some(&src_id), Some(&tgt_id)) = (
501                    entity_map.get(&ext_rel.source),
502                    entity_map.get(&ext_rel.target),
503                ) {
504                    let rel = Relationship::new(src_id, &ext_rel.relation, tgt_id, scope.clone());
505                    self.graph_store.upsert_relationship(&rel).await?;
506                }
507            }
508
509            fact_ids.push(id);
510        }
511
512        Ok(fact_ids)
513    }
514
515    // -----------------------------------------------------------------------
516    // Accessors
517    // -----------------------------------------------------------------------
518
519    /// Access the underlying `FactStore`.
520    pub fn fact_store(&self) -> &Arc<dyn FactStore> {
521        &self.fact_store
522    }
523
524    /// Access the underlying `VectorStore`.
525    pub fn vector_store(&self) -> &Arc<dyn VectorStore> {
526        &self.vector_store
527    }
528
529    /// Access the underlying `GraphStore`.
530    pub fn graph_store(&self) -> &Arc<dyn GraphStore> {
531        &self.graph_store
532    }
533
534    /// Access the underlying `MessageStore`, if configured.
535    pub fn message_store(&self) -> Option<&Arc<dyn MessageStore>> {
536        self.message_store.as_ref()
537    }
538
539    // -----------------------------------------------------------------------
540    // Chat message operations
541    // -----------------------------------------------------------------------
542
543    /// Save chat messages to a conversation.
544    pub async fn save_chat_messages(
545        &self,
546        conversation_id: &str,
547        messages: &[ChatMessage],
548        scope: &Scope,
549    ) -> Result<Vec<MessageId>, MemoryError> {
550        let store = self
551            .message_store
552            .as_ref()
553            .ok_or_else(|| MemoryError::Database("message store not configured".to_string()))?;
554        store.save_messages(conversation_id, messages, scope).await
555    }
556
557    /// Retrieve chat messages from a conversation.
558    pub async fn get_chat_messages(
559        &self,
560        conversation_id: &str,
561        last_n: Option<usize>,
562        scope: &Scope,
563    ) -> Result<Vec<ChatMessage>, MemoryError> {
564        let store = self
565            .message_store
566            .as_ref()
567            .ok_or_else(|| MemoryError::Database("message store not configured".to_string()))?;
568        store.get_messages(conversation_id, last_n, scope).await
569    }
570
571    /// List all conversations visible to the given scope.
572    pub async fn list_conversations(&self, scope: &Scope) -> Result<Vec<String>, MemoryError> {
573        let store = self
574            .message_store
575            .as_ref()
576            .ok_or_else(|| MemoryError::Database("message store not configured".to_string()))?;
577        store.list_conversations(scope).await
578    }
579
580    /// Delete all messages in a conversation.
581    pub async fn delete_chat_messages(
582        &self,
583        conversation_id: &str,
584        scope: &Scope,
585    ) -> Result<u64, MemoryError> {
586        let store = self
587            .message_store
588            .as_ref()
589            .ok_or_else(|| MemoryError::Database("message store not configured".to_string()))?;
590        store.delete_messages(conversation_id, scope).await
591    }
592}