Skip to main content

recall_echo/graph/
mod.rs

1//! recall-graph — Knowledge graph with semantic search for AI memory systems.
2//!
3//! Provides a structured graph layer (Layer 0) underneath flat-file memory systems.
4//! Used by recall-echo (pulse-null entities) and recall-claude (Claude Code users).
5
6pub mod confidence;
7pub mod crud;
8pub mod dedup;
9pub mod embed;
10pub mod error;
11pub mod extract;
12pub mod ingest;
13pub mod llm;
14pub mod pipeline;
15pub mod pipeline_sync;
16pub mod query;
17pub mod search;
18pub mod store;
19pub mod traverse;
20pub mod types;
21pub mod vigil_sync;
22
23use std::collections::HashMap;
24use std::path::{Path, PathBuf};
25
26use embed::FastEmbedder;
27use error::GraphError;
28use store::Db;
29#[allow(unused_imports)]
30use surrealdb::types::SurrealValue;
31use surrealdb::Surreal;
32use types::*;
33
34/// Take serde_json::Value results from a SurrealDB response and deserialize to a Rust type.
35/// This avoids needing SurrealValue derive on complex types.
36pub(crate) fn deserialize_take<T: serde::de::DeserializeOwned>(
37    response: &mut surrealdb::IndexedResults,
38    index: usize,
39) -> Result<Vec<T>, GraphError> {
40    let values: Vec<serde_json::Value> = response.take(index)?;
41    values
42        .into_iter()
43        .map(|v| serde_json::from_value(v).map_err(GraphError::from))
44        .collect()
45}
46
47pub(crate) fn deserialize_take_opt<T: serde::de::DeserializeOwned>(
48    response: &mut surrealdb::IndexedResults,
49    index: usize,
50) -> Result<Option<T>, GraphError> {
51    let values: Vec<T> = deserialize_take(response, index)?;
52    Ok(values.into_iter().next())
53}
54
/// The main entry point for graph memory operations.
///
/// Bundles the SurrealDB handle, the embedding model, and the directory the store
/// lives in. The methods on this type are thin async wrappers that forward to the
/// `crud`, `search`, `query`, `traverse`, `ingest`, `pipeline_sync`, and
/// `vigil_sync` submodules.
pub struct GraphMemory {
    /// Database handle, produced by `store::open` when the graph is opened.
    db: Surreal<Db>,
    /// Embedding model, handed to the crud/search/query helpers that take one.
    embedder: FastEmbedder,
    /// Directory the store was opened at (expected to be the `graph/` directory).
    path: PathBuf,
}
61
62impl GraphMemory {
63    /// Open or create a graph store at the given path.
64    /// Path should be the `graph/` directory inside the memory directory.
65    pub async fn open(path: &Path) -> Result<Self, GraphError> {
66        std::fs::create_dir_all(path)?;
67
68        let db = store::open(path).await?;
69        store::init_schema(&db).await?;
70
71        let models_dir = path.join("models");
72        std::fs::create_dir_all(&models_dir)?;
73        let embedder = FastEmbedder::new(&models_dir)?;
74
75        Ok(Self {
76            db,
77            embedder,
78            path: path.to_path_buf(),
79        })
80    }
81
82    /// Path to the graph store.
83    pub fn path(&self) -> &Path {
84        &self.path
85    }
86
87    /// Internal access to the database handle.
88    #[allow(dead_code)]
89    pub(crate) fn db(&self) -> &Surreal<Db> {
90        &self.db
91    }
92
93    /// Internal access to the embedder.
94    #[allow(dead_code)]
95    pub(crate) fn embedder(&self) -> &FastEmbedder {
96        &self.embedder
97    }
98
99    // --- Entity CRUD ---
100
101    /// Add a new entity to the graph.
102    pub async fn add_entity(&self, entity: NewEntity) -> Result<Entity, GraphError> {
103        crud::add_entity(&self.db, &self.embedder, entity).await
104    }
105
106    /// Get an entity by name.
107    pub async fn get_entity(&self, name: &str) -> Result<Option<Entity>, GraphError> {
108        crud::get_entity_by_name(&self.db, name).await
109    }
110
111    /// Get an entity by its record ID.
112    pub async fn get_entity_by_id(&self, id: &str) -> Result<Option<Entity>, GraphError> {
113        crud::get_entity_by_id(&self.db, id).await
114    }
115
116    /// Update an entity's fields.
117    pub async fn update_entity(
118        &self,
119        id: &str,
120        updates: EntityUpdate,
121    ) -> Result<Entity, GraphError> {
122        crud::update_entity(&self.db, &self.embedder, id, updates).await
123    }
124
125    /// Delete an entity and its relationships.
126    pub async fn delete_entity(&self, id: &str) -> Result<(), GraphError> {
127        crud::delete_entity(&self.db, id).await
128    }
129
130    /// List all entities, optionally filtered by type.
131    pub async fn list_entities(
132        &self,
133        entity_type: Option<&str>,
134    ) -> Result<Vec<Entity>, GraphError> {
135        crud::list_entities(&self.db, entity_type).await
136    }
137
138    // --- Relationships ---
139
140    /// Create a relationship between two named entities.
141    pub async fn add_relationship(&self, rel: NewRelationship) -> Result<Relationship, GraphError> {
142        crud::add_relationship(&self.db, rel).await
143    }
144
145    /// Get relationships for an entity.
146    pub async fn get_relationships(
147        &self,
148        entity_name: &str,
149        direction: Direction,
150    ) -> Result<Vec<Relationship>, GraphError> {
151        crud::get_relationships(&self.db, entity_name, direction).await
152    }
153
154    /// Supersede a relationship: close the old one, create a new one.
155    pub async fn supersede_relationship(
156        &self,
157        old_id: &str,
158        new: NewRelationship,
159    ) -> Result<Relationship, GraphError> {
160        crud::supersede_relationship(&self.db, old_id, new).await
161    }
162
163    /// Update relationship confidence (Bayesian posterior).
164    pub async fn update_relationship_confidence(
165        &self,
166        rel_id: &str,
167        confidence: f64,
168    ) -> Result<(), GraphError> {
169        crud::update_relationship_confidence(&self.db, rel_id, confidence).await
170    }
171
172    // --- Episodes ---
173
174    /// Add a new episode to the graph.
175    pub async fn add_episode(&self, episode: NewEpisode) -> Result<Episode, GraphError> {
176        crud::add_episode(&self.db, &self.embedder, episode).await
177    }
178
179    /// Get episodes by session ID.
180    pub async fn get_episodes_by_session(
181        &self,
182        session_id: &str,
183    ) -> Result<Vec<Episode>, GraphError> {
184        crud::get_episodes_by_session(&self.db, session_id).await
185    }
186
187    /// Get episode by log number.
188    pub async fn get_episode_by_log_number(
189        &self,
190        log_number: u32,
191    ) -> Result<Option<Episode>, GraphError> {
192        crud::get_episode_by_log_number(&self.db, log_number).await
193    }
194
195    // --- Ingestion ---
196
197    /// Ingest a conversation archive into the knowledge graph.
198    pub async fn ingest_archive(
199        &self,
200        archive_text: &str,
201        session_id: &str,
202        log_number: Option<u32>,
203        llm: Option<&dyn llm::LlmProvider>,
204    ) -> Result<IngestionReport, GraphError> {
205        ingest::ingest_archive(self, archive_text, session_id, log_number, llm).await
206    }
207
208    /// Run LLM extraction on an archive without creating episodes.
209    pub async fn extract_from_archive(
210        &self,
211        archive_text: &str,
212        session_id: &str,
213        log_number: Option<u32>,
214        llm: &dyn llm::LlmProvider,
215    ) -> Result<IngestionReport, GraphError> {
216        ingest::extract_from_archive(self, archive_text, session_id, log_number, llm).await
217    }
218
219    /// Mark all episodes with a given log_number as extracted.
220    pub async fn mark_extracted(&self, log_number: u32) -> Result<(), GraphError> {
221        crud::mark_episodes_extracted(&self.db, log_number).await
222    }
223
224    /// Get log numbers of episodes that have NOT been extracted.
225    pub async fn unextracted_log_numbers(&self) -> Result<Vec<i64>, GraphError> {
226        crud::get_unextracted_log_numbers(&self.db).await
227    }
228
229    // --- Search ---
230
231    /// Semantic search across entities (legacy — returns full Entity).
232    pub async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>, GraphError> {
233        search::search(&self.db, &self.embedder, query, limit).await
234    }
235
236    /// Search with options — L1 projections, type/keyword filters.
237    pub async fn search_with_options(
238        &self,
239        query: &str,
240        options: &SearchOptions,
241    ) -> Result<Vec<ScoredEntity>, GraphError> {
242        search::search_with_options(&self.db, &self.embedder, query, options).await
243    }
244
245    /// Semantic search across episodes.
246    pub async fn search_episodes(
247        &self,
248        query: &str,
249        limit: usize,
250    ) -> Result<Vec<EpisodeSearchResult>, GraphError> {
251        search::search_episodes(&self.db, &self.embedder, query, limit).await
252    }
253
254    // --- Hybrid Query ---
255
256    /// Hybrid query: semantic + graph expansion + optional episode search.
257    pub async fn query(
258        &self,
259        query_text: &str,
260        options: &QueryOptions,
261    ) -> Result<QueryResult, GraphError> {
262        query::query(&self.db, &self.embedder, query_text, options).await
263    }
264
265    // --- Traversal ---
266
267    /// Traverse the graph from a named entity.
268    pub async fn traverse(
269        &self,
270        entity_name: &str,
271        depth: u32,
272    ) -> Result<TraversalNode, GraphError> {
273        traverse::traverse(&self.db, entity_name, depth).await
274    }
275
276    /// Traverse with type filter.
277    pub async fn traverse_filtered(
278        &self,
279        entity_name: &str,
280        depth: u32,
281        type_filter: Option<&str>,
282    ) -> Result<TraversalNode, GraphError> {
283        traverse::traverse_filtered(&self.db, entity_name, depth, type_filter).await
284    }
285
286    // --- Pipeline ---
287
288    /// Sync pipeline documents into the graph.
289    pub async fn sync_pipeline(
290        &self,
291        docs: &PipelineDocuments,
292    ) -> Result<PipelineSyncReport, GraphError> {
293        pipeline_sync::sync_pipeline(self, docs).await
294    }
295
296    /// Get pipeline stats from the graph.
297    pub async fn pipeline_stats(
298        &self,
299        staleness_days: u32,
300    ) -> Result<PipelineGraphStats, GraphError> {
301        query::pipeline_stats(&self.db, staleness_days).await
302    }
303
304    /// Get pipeline entities by stage and optional status.
305    pub async fn pipeline_entities(
306        &self,
307        stage: &str,
308        status: Option<&str>,
309    ) -> Result<Vec<EntityDetail>, GraphError> {
310        query::pipeline_entities(&self.db, stage, status).await
311    }
312
313    /// Trace pipeline flow for an entity.
314    pub async fn pipeline_flow(
315        &self,
316        entity_name: &str,
317    ) -> Result<Vec<(EntityDetail, String, EntityDetail)>, GraphError> {
318        query::pipeline_flow(&self.db, entity_name).await
319    }
320
321    // --- Vigil Sync ---
322
323    /// Sync vigil signal vectors into the graph as Measurement entities.
324    pub async fn sync_vigil_signals(
325        &self,
326        signals_path: &std::path::Path,
327    ) -> Result<VigilSyncReport, GraphError> {
328        vigil_sync::sync_vigil_signals(self, signals_path).await
329    }
330
331    /// Sync outcome records into the graph as Outcome entities.
332    pub async fn sync_outcomes(
333        &self,
334        outcomes_path: &std::path::Path,
335    ) -> Result<VigilSyncReport, GraphError> {
336        vigil_sync::sync_outcomes(self, outcomes_path).await
337    }
338
339    /// Sync both vigil signals and outcomes in one call.
340    pub async fn sync_vigil(
341        &self,
342        signals_path: &std::path::Path,
343        outcomes_path: &std::path::Path,
344    ) -> Result<VigilSyncReport, GraphError> {
345        vigil_sync::sync_vigil(self, signals_path, outcomes_path).await
346    }
347
348    // --- Stats ---
349
350    /// Get graph statistics.
351    pub async fn stats(&self) -> Result<GraphStats, GraphError> {
352        let entity_count = db_count(&self.db, "entity").await?;
353        let relationship_count = db_count(&self.db, "relates_to").await?;
354        let episode_count = db_count(&self.db, "episode").await?;
355
356        // Count by type
357        let mut type_response = self
358            .db
359            .query("SELECT entity_type, count() AS count FROM entity GROUP BY entity_type")
360            .await?;
361
362        let type_rows: Vec<TypeCount> = type_response.take(0)?;
363        let entity_type_counts: HashMap<String, u64> = type_rows
364            .into_iter()
365            .map(|r| (r.entity_type, r.count))
366            .collect();
367
368        Ok(GraphStats {
369            entity_count,
370            relationship_count,
371            episode_count,
372            entity_type_counts,
373        })
374    }
375}
376
377async fn db_count(db: &Surreal<Db>, table: &str) -> Result<u64, GraphError> {
378    let query = format!("SELECT count() AS count FROM {} GROUP ALL", table);
379    let mut response = db.query(&query).await?;
380    let rows: Vec<CountRow> = response.take(0)?;
381    Ok(rows.first().map(|r| r.count).unwrap_or(0))
382}
383
/// Row shape returned by the `SELECT count() AS count ... GROUP ALL` query in `db_count`.
#[derive(serde::Deserialize, surrealdb::types::SurrealValue)]
struct CountRow {
    /// The aggregated `count()` value; the field name must match the `AS count` alias.
    count: u64,
}
388
/// One row of the per-type breakdown used by `GraphMemory::stats`
/// (`SELECT entity_type, count() AS count FROM entity GROUP BY entity_type`).
#[derive(serde::Deserialize, surrealdb::types::SurrealValue)]
struct TypeCount {
    /// The grouped `entity_type` value; decoded as a `String`.
    entity_type: String,
    /// Number of entities with this type; must match the `AS count` alias.
    count: u64,
}