Skip to main content

recall_echo/graph/
mod.rs

//! recall-graph — Knowledge graph with semantic search for AI memory systems.
//!
//! Provides a structured graph layer (Layer 0) underneath flat-file memory systems.
//! Used by recall-echo (pulse-null entities) and recall-claude (Claude Code users).
5
6pub mod crud;
7pub mod dedup;
8pub mod embed;
9pub mod error;
10pub mod extract;
11pub mod ingest;
12pub mod llm;
13pub mod pipeline;
14pub mod pipeline_sync;
15pub mod query;
16pub mod search;
17pub mod store;
18pub mod traverse;
19pub mod types;
20pub mod vigil_sync;
21
22use std::collections::HashMap;
23use std::path::{Path, PathBuf};
24
25use embed::FastEmbedder;
26use error::GraphError;
27use store::Db;
28#[allow(unused_imports)]
29use surrealdb::types::SurrealValue;
30use surrealdb::Surreal;
31use types::*;
32
33/// Take serde_json::Value results from a SurrealDB response and deserialize to a Rust type.
34/// This avoids needing SurrealValue derive on complex types.
35pub(crate) fn deserialize_take<T: serde::de::DeserializeOwned>(
36    response: &mut surrealdb::IndexedResults,
37    index: usize,
38) -> Result<Vec<T>, GraphError> {
39    let values: Vec<serde_json::Value> = response.take(index)?;
40    values
41        .into_iter()
42        .map(|v| serde_json::from_value(v).map_err(GraphError::from))
43        .collect()
44}
45
46pub(crate) fn deserialize_take_opt<T: serde::de::DeserializeOwned>(
47    response: &mut surrealdb::IndexedResults,
48    index: usize,
49) -> Result<Option<T>, GraphError> {
50    let values: Vec<T> = deserialize_take(response, index)?;
51    Ok(values.into_iter().next())
52}
53
54/// The main entry point for graph memory operations.
55pub struct GraphMemory {
56    db: Surreal<Db>,
57    embedder: FastEmbedder,
58    path: PathBuf,
59}
60
61impl GraphMemory {
62    /// Open or create a graph store at the given path.
63    /// Path should be the `graph/` directory inside the memory directory.
64    pub async fn open(path: &Path) -> Result<Self, GraphError> {
65        std::fs::create_dir_all(path)?;
66
67        let db = store::open(path).await?;
68        store::init_schema(&db).await?;
69
70        let models_dir = path.join("models");
71        std::fs::create_dir_all(&models_dir)?;
72        let embedder = FastEmbedder::new(&models_dir)?;
73
74        Ok(Self {
75            db,
76            embedder,
77            path: path.to_path_buf(),
78        })
79    }
80
81    /// Path to the graph store.
82    pub fn path(&self) -> &Path {
83        &self.path
84    }
85
86    /// Internal access to the database handle.
87    #[allow(dead_code)]
88    pub(crate) fn db(&self) -> &Surreal<Db> {
89        &self.db
90    }
91
92    /// Internal access to the embedder.
93    #[allow(dead_code)]
94    pub(crate) fn embedder(&self) -> &FastEmbedder {
95        &self.embedder
96    }
97
98    // --- Entity CRUD ---
99
100    /// Add a new entity to the graph.
101    pub async fn add_entity(&self, entity: NewEntity) -> Result<Entity, GraphError> {
102        crud::add_entity(&self.db, &self.embedder, entity).await
103    }
104
105    /// Get an entity by name.
106    pub async fn get_entity(&self, name: &str) -> Result<Option<Entity>, GraphError> {
107        crud::get_entity_by_name(&self.db, name).await
108    }
109
110    /// Get an entity by its record ID.
111    pub async fn get_entity_by_id(&self, id: &str) -> Result<Option<Entity>, GraphError> {
112        crud::get_entity_by_id(&self.db, id).await
113    }
114
115    /// Update an entity's fields.
116    pub async fn update_entity(
117        &self,
118        id: &str,
119        updates: EntityUpdate,
120    ) -> Result<Entity, GraphError> {
121        crud::update_entity(&self.db, &self.embedder, id, updates).await
122    }
123
124    /// Delete an entity and its relationships.
125    pub async fn delete_entity(&self, id: &str) -> Result<(), GraphError> {
126        crud::delete_entity(&self.db, id).await
127    }
128
129    /// List all entities, optionally filtered by type.
130    pub async fn list_entities(
131        &self,
132        entity_type: Option<&str>,
133    ) -> Result<Vec<Entity>, GraphError> {
134        crud::list_entities(&self.db, entity_type).await
135    }
136
137    // --- Relationships ---
138
139    /// Create a relationship between two named entities.
140    pub async fn add_relationship(&self, rel: NewRelationship) -> Result<Relationship, GraphError> {
141        crud::add_relationship(&self.db, rel).await
142    }
143
144    /// Get relationships for an entity.
145    pub async fn get_relationships(
146        &self,
147        entity_name: &str,
148        direction: Direction,
149    ) -> Result<Vec<Relationship>, GraphError> {
150        crud::get_relationships(&self.db, entity_name, direction).await
151    }
152
153    /// Supersede a relationship: close the old one, create a new one.
154    pub async fn supersede_relationship(
155        &self,
156        old_id: &str,
157        new: NewRelationship,
158    ) -> Result<Relationship, GraphError> {
159        crud::supersede_relationship(&self.db, old_id, new).await
160    }
161
162    // --- Episodes ---
163
164    /// Add a new episode to the graph.
165    pub async fn add_episode(&self, episode: NewEpisode) -> Result<Episode, GraphError> {
166        crud::add_episode(&self.db, &self.embedder, episode).await
167    }
168
169    /// Get episodes by session ID.
170    pub async fn get_episodes_by_session(
171        &self,
172        session_id: &str,
173    ) -> Result<Vec<Episode>, GraphError> {
174        crud::get_episodes_by_session(&self.db, session_id).await
175    }
176
177    /// Get episode by log number.
178    pub async fn get_episode_by_log_number(
179        &self,
180        log_number: u32,
181    ) -> Result<Option<Episode>, GraphError> {
182        crud::get_episode_by_log_number(&self.db, log_number).await
183    }
184
185    // --- Ingestion ---
186
187    /// Ingest a conversation archive into the knowledge graph.
188    pub async fn ingest_archive(
189        &self,
190        archive_text: &str,
191        session_id: &str,
192        log_number: Option<u32>,
193        llm: Option<&dyn llm::LlmProvider>,
194    ) -> Result<IngestionReport, GraphError> {
195        ingest::ingest_archive(self, archive_text, session_id, log_number, llm).await
196    }
197
198    /// Run LLM extraction on an archive without creating episodes.
199    pub async fn extract_from_archive(
200        &self,
201        archive_text: &str,
202        session_id: &str,
203        log_number: Option<u32>,
204        llm: &dyn llm::LlmProvider,
205    ) -> Result<IngestionReport, GraphError> {
206        ingest::extract_from_archive(self, archive_text, session_id, log_number, llm).await
207    }
208
209    /// Mark all episodes with a given log_number as extracted.
210    pub async fn mark_extracted(&self, log_number: u32) -> Result<(), GraphError> {
211        crud::mark_episodes_extracted(&self.db, log_number).await
212    }
213
214    /// Get log numbers of episodes that have NOT been extracted.
215    pub async fn unextracted_log_numbers(&self) -> Result<Vec<i64>, GraphError> {
216        crud::get_unextracted_log_numbers(&self.db).await
217    }
218
219    // --- Search ---
220
221    /// Semantic search across entities (legacy — returns full Entity).
222    pub async fn search(&self, query: &str, limit: usize) -> Result<Vec<SearchResult>, GraphError> {
223        search::search(&self.db, &self.embedder, query, limit).await
224    }
225
226    /// Search with options — L1 projections, type/keyword filters.
227    pub async fn search_with_options(
228        &self,
229        query: &str,
230        options: &SearchOptions,
231    ) -> Result<Vec<ScoredEntity>, GraphError> {
232        search::search_with_options(&self.db, &self.embedder, query, options).await
233    }
234
235    /// Semantic search across episodes.
236    pub async fn search_episodes(
237        &self,
238        query: &str,
239        limit: usize,
240    ) -> Result<Vec<EpisodeSearchResult>, GraphError> {
241        search::search_episodes(&self.db, &self.embedder, query, limit).await
242    }
243
244    // --- Hybrid Query ---
245
246    /// Hybrid query: semantic + graph expansion + optional episode search.
247    pub async fn query(
248        &self,
249        query_text: &str,
250        options: &QueryOptions,
251    ) -> Result<QueryResult, GraphError> {
252        query::query(&self.db, &self.embedder, query_text, options).await
253    }
254
255    // --- Traversal ---
256
257    /// Traverse the graph from a named entity.
258    pub async fn traverse(
259        &self,
260        entity_name: &str,
261        depth: u32,
262    ) -> Result<TraversalNode, GraphError> {
263        traverse::traverse(&self.db, entity_name, depth).await
264    }
265
266    /// Traverse with type filter.
267    pub async fn traverse_filtered(
268        &self,
269        entity_name: &str,
270        depth: u32,
271        type_filter: Option<&str>,
272    ) -> Result<TraversalNode, GraphError> {
273        traverse::traverse_filtered(&self.db, entity_name, depth, type_filter).await
274    }
275
276    // --- Pipeline ---
277
278    /// Sync pipeline documents into the graph.
279    pub async fn sync_pipeline(
280        &self,
281        docs: &PipelineDocuments,
282    ) -> Result<PipelineSyncReport, GraphError> {
283        pipeline_sync::sync_pipeline(self, docs).await
284    }
285
286    /// Get pipeline stats from the graph.
287    pub async fn pipeline_stats(
288        &self,
289        staleness_days: u32,
290    ) -> Result<PipelineGraphStats, GraphError> {
291        query::pipeline_stats(&self.db, staleness_days).await
292    }
293
294    /// Get pipeline entities by stage and optional status.
295    pub async fn pipeline_entities(
296        &self,
297        stage: &str,
298        status: Option<&str>,
299    ) -> Result<Vec<EntityDetail>, GraphError> {
300        query::pipeline_entities(&self.db, stage, status).await
301    }
302
303    /// Trace pipeline flow for an entity.
304    pub async fn pipeline_flow(
305        &self,
306        entity_name: &str,
307    ) -> Result<Vec<(EntityDetail, String, EntityDetail)>, GraphError> {
308        query::pipeline_flow(&self.db, entity_name).await
309    }
310
311    // --- Vigil Sync ---
312
313    /// Sync vigil signal vectors into the graph as Measurement entities.
314    pub async fn sync_vigil_signals(
315        &self,
316        signals_path: &std::path::Path,
317    ) -> Result<VigilSyncReport, GraphError> {
318        vigil_sync::sync_vigil_signals(self, signals_path).await
319    }
320
321    /// Sync outcome records into the graph as Outcome entities.
322    pub async fn sync_outcomes(
323        &self,
324        outcomes_path: &std::path::Path,
325    ) -> Result<VigilSyncReport, GraphError> {
326        vigil_sync::sync_outcomes(self, outcomes_path).await
327    }
328
329    /// Sync both vigil signals and outcomes in one call.
330    pub async fn sync_vigil(
331        &self,
332        signals_path: &std::path::Path,
333        outcomes_path: &std::path::Path,
334    ) -> Result<VigilSyncReport, GraphError> {
335        vigil_sync::sync_vigil(self, signals_path, outcomes_path).await
336    }
337
338    // --- Stats ---
339
340    /// Get graph statistics.
341    pub async fn stats(&self) -> Result<GraphStats, GraphError> {
342        let entity_count = db_count(&self.db, "entity").await?;
343        let relationship_count = db_count(&self.db, "relates_to").await?;
344        let episode_count = db_count(&self.db, "episode").await?;
345
346        // Count by type
347        let mut type_response = self
348            .db
349            .query("SELECT entity_type, count() AS count FROM entity GROUP BY entity_type")
350            .await?;
351
352        let type_rows: Vec<TypeCount> = type_response.take(0)?;
353        let entity_type_counts: HashMap<String, u64> = type_rows
354            .into_iter()
355            .map(|r| (r.entity_type, r.count))
356            .collect();
357
358        Ok(GraphStats {
359            entity_count,
360            relationship_count,
361            episode_count,
362            entity_type_counts,
363        })
364    }
365}
366
367async fn db_count(db: &Surreal<Db>, table: &str) -> Result<u64, GraphError> {
368    let query = format!("SELECT count() AS count FROM {} GROUP ALL", table);
369    let mut response = db.query(&query).await?;
370    let rows: Vec<CountRow> = response.take(0)?;
371    Ok(rows.first().map(|r| r.count).unwrap_or(0))
372}
373
374#[derive(serde::Deserialize, surrealdb::types::SurrealValue)]
375struct CountRow {
376    count: u64,
377}
378
379#[derive(serde::Deserialize, surrealdb::types::SurrealValue)]
380struct TypeCount {
381    entity_type: String,
382    count: u64,
383}