Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5
6use futures::Stream;
7use sqlx::SqlitePool;
8
9use crate::error::MemoryError;
10use crate::sqlite::messages::sanitize_fts5_query;
11use crate::types::MessageId;
12
13use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
14
/// SQLite-backed persistence layer for the knowledge graph:
/// entities, entity aliases, typed edges, communities, and key/value metadata.
pub struct GraphStore {
    // Connection pool shared by every query method on this store.
    pool: SqlitePool,
}
18
19impl GraphStore {
    /// Create a store backed by the given SQLite connection pool.
    #[must_use]
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
24
    /// Borrow the underlying connection pool (e.g. for ad-hoc queries or migrations).
    #[must_use]
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
29
30    // ── Entities ─────────────────────────────────────────────────────────────
31
32    /// Insert or update an entity by `(canonical_name, entity_type)`.
33    ///
34    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
35    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
36    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
37    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the database query fails.
42    pub async fn upsert_entity(
43        &self,
44        surface_name: &str,
45        canonical_name: &str,
46        entity_type: EntityType,
47        summary: Option<&str>,
48    ) -> Result<i64, MemoryError> {
49        let type_str = entity_type.as_str();
50        let id: i64 = sqlx::query_scalar(
51            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
52             VALUES (?1, ?2, ?3, ?4)
53             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
54               name = excluded.name,
55               summary = COALESCE(excluded.summary, summary),
56               last_seen_at = datetime('now')
57             RETURNING id",
58        )
59        .bind(surface_name)
60        .bind(canonical_name)
61        .bind(type_str)
62        .bind(summary)
63        .fetch_one(&self.pool)
64        .await?;
65        Ok(id)
66    }
67
68    /// Find an entity by exact canonical name and type.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the database query fails.
73    pub async fn find_entity(
74        &self,
75        canonical_name: &str,
76        entity_type: EntityType,
77    ) -> Result<Option<Entity>, MemoryError> {
78        let type_str = entity_type.as_str();
79        let row: Option<EntityRow> = sqlx::query_as(
80            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
81             FROM graph_entities
82             WHERE canonical_name = ?1 AND entity_type = ?2",
83        )
84        .bind(canonical_name)
85        .bind(type_str)
86        .fetch_optional(&self.pool)
87        .await?;
88        row.map(entity_from_row).transpose()
89    }
90
91    /// Find an entity by its numeric ID.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the database query fails.
96    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
97        let row: Option<EntityRow> = sqlx::query_as(
98            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
99             FROM graph_entities
100             WHERE id = ?1",
101        )
102        .bind(entity_id)
103        .fetch_optional(&self.pool)
104        .await?;
105        row.map(entity_from_row).transpose()
106    }
107
108    /// Update the `qdrant_point_id` for an entity.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the database query fails.
113    pub async fn set_entity_qdrant_point_id(
114        &self,
115        entity_id: i64,
116        point_id: &str,
117    ) -> Result<(), MemoryError> {
118        sqlx::query("UPDATE graph_entities SET qdrant_point_id = ?1 WHERE id = ?2")
119            .bind(point_id)
120            .bind(entity_id)
121            .execute(&self.pool)
122            .await?;
123        Ok(())
124    }
125
    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results.
    ///
    /// Combines an FTS5 MATCH with prefix wildcards (`token*`) over
    /// `graph_entities_fts` with a case-insensitive `LIKE '%query%'` search over
    /// `graph_entity_aliases`, merged via UNION.
    ///
    /// NOTE(review): earlier docs claimed bm25 relevance ranking, but the SQL below
    /// contains no `ORDER BY` and no `bm25()` call — SQLite leaves the result order
    /// unspecified. An `ORDER BY bm25(...)` cannot be attached directly to a UNION
    /// compound; if ranking is required, restructure with subqueries. Confirm the
    /// intended behavior before relying on result order.
    ///
    /// # Behavioral note
    ///
    /// This replaces the previous `LIKE '%query%'` implementation for names/summaries.
    /// FTS5 prefix matching differs from substring matching: searching "SQL" will match
    /// "`SQLite`" (prefix) but NOT "`GraphQL`" (substring). Entity names are indexed as
    /// single tokens by the unicode61 tokenizer, so mid-word substrings are not matched.
    /// This is a known trade-off for index performance.
    ///
    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix
    /// match ("a*"). The `limit` parameter caps the result set. No minimum query length
    /// is enforced; if this causes noise in practice, guard at the call site.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
        // prevents syntax errors when user input contains them as literal search terms
        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap input at 512 bytes without splitting a UTF-8 code point.
        // NOTE(review): `str::floor_char_boundary` is a nightly-unstable std API —
        // confirm the crate's toolchain pin.
        let query = &query[..query.floor_char_boundary(512)];
        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
        let sanitized = sanitize_fts5_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        // All tokens may have been operator keywords; nothing left to match.
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION (not UNION ALL) dedups entities that match both arms; the LIMIT
        // applies to the whole compound. No relevance ordering — see doc note above.
        let rows: Vec<EntityRow> = sqlx::query_as(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entities_fts fts
             JOIN graph_entities e ON e.id = fts.rowid
             WHERE graph_entities_fts MATCH ?1
             UNION
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entity_aliases a
             JOIN graph_entities e ON e.id = a.entity_id
             WHERE a.alias_name LIKE ?2 ESCAPE '\\' COLLATE NOCASE
             LIMIT ?3",
        )
        .bind(&fts_query)
        // Escape LIKE metacharacters in the raw query so user input matches literally.
        .bind(format!(
            "%{}%",
            query
                .trim()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_")
        ))
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
205
206    /// Stream all entities from the database incrementally (true cursor, no full-table load).
207    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
208        use futures::StreamExt as _;
209        sqlx::query_as::<_, EntityRow>(
210            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
211             FROM graph_entities ORDER BY id ASC",
212        )
213        .fetch(&self.pool)
214        .map(|r: Result<EntityRow, sqlx::Error>| {
215            r.map_err(MemoryError::from).and_then(entity_from_row)
216        })
217    }
218
219    // ── Alias methods ─────────────────────────────────────────────────────────
220
221    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the database query fails.
226    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
227        sqlx::query(
228            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name) VALUES (?1, ?2)",
229        )
230        .bind(entity_id)
231        .bind(alias_name)
232        .execute(&self.pool)
233        .await?;
234        Ok(())
235    }
236
237    /// Find an entity by alias name and entity type (case-insensitive).
238    ///
239    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the database query fails.
244    pub async fn find_entity_by_alias(
245        &self,
246        alias_name: &str,
247        entity_type: EntityType,
248    ) -> Result<Option<Entity>, MemoryError> {
249        let type_str = entity_type.as_str();
250        let row: Option<EntityRow> = sqlx::query_as(
251            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
252                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
253             FROM graph_entity_aliases a
254             JOIN graph_entities e ON e.id = a.entity_id
255             WHERE a.alias_name = ?1 COLLATE NOCASE
256               AND e.entity_type = ?2
257             ORDER BY e.id ASC
258             LIMIT 1",
259        )
260        .bind(alias_name)
261        .bind(type_str)
262        .fetch_optional(&self.pool)
263        .await?;
264        row.map(entity_from_row).transpose()
265    }
266
267    /// Get all aliases for an entity.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the database query fails.
272    pub async fn aliases_for_entity(
273        &self,
274        entity_id: i64,
275    ) -> Result<Vec<EntityAlias>, MemoryError> {
276        let rows: Vec<AliasRow> = sqlx::query_as(
277            "SELECT id, entity_id, alias_name, created_at
278             FROM graph_entity_aliases
279             WHERE entity_id = ?1
280             ORDER BY id ASC",
281        )
282        .bind(entity_id)
283        .fetch_all(&self.pool)
284        .await?;
285        Ok(rows.into_iter().map(alias_from_row).collect())
286    }
287
288    /// Collect all entities into a Vec.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the database query fails or `entity_type` parsing fails.
293    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
294        use futures::TryStreamExt as _;
295        self.all_entities_stream().try_collect().await
296    }
297
298    /// Count the total number of entities.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the database query fails.
303    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
304        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_entities")
305            .fetch_one(&self.pool)
306            .await?;
307        Ok(count)
308    }
309
310    // ── Edges ─────────────────────────────────────────────────────────────────
311
    /// Insert a new edge between two entities, or update the existing active edge.
    ///
    /// Convenience wrapper: delegates to [`insert_edge_typed`] with
    /// `EdgeType::Semantic`. See that method for the full dedup semantics.
    ///
    /// An active edge is identified by `(source_entity_id, target_entity_id, relation, edge_type)`
    /// with `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
    /// duplicate edges from repeated extraction of the same context messages.
    ///
    /// The dedup key includes `edge_type` (critic mitigation): the same `(source, target, relation)`
    /// triple can legitimately exist with different edge types (e.g., `depends_on` can be both
    /// Semantic and Causal). Without `edge_type` in the key, the second insertion would silently
    /// update the first and lose the type classification.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        self.insert_edge_typed(
            source_entity_id,
            target_entity_id,
            relation,
            fact,
            confidence,
            episode_id,
            EdgeType::Semantic,
        )
        .await
    }
347
348    /// Insert a typed edge between two entities, or update the existing active edge of the same type.
349    ///
350    /// Identical semantics to [`insert_edge`] but with an explicit `edge_type` parameter.
351    /// The dedup key is `(source_entity_id, target_entity_id, relation, edge_type, valid_to IS NULL)`.
352    ///
353    /// # Errors
354    ///
355    /// Returns an error if the database query fails.
356    #[allow(clippy::too_many_arguments)]
357    pub async fn insert_edge_typed(
358        &self,
359        source_entity_id: i64,
360        target_entity_id: i64,
361        relation: &str,
362        fact: &str,
363        confidence: f32,
364        episode_id: Option<MessageId>,
365        edge_type: EdgeType,
366    ) -> Result<i64, MemoryError> {
367        let confidence = confidence.clamp(0.0, 1.0);
368        let edge_type_str = edge_type.as_str();
369
370        let existing: Option<(i64, f64)> = sqlx::query_as(
371            "SELECT id, confidence FROM graph_edges
372             WHERE source_entity_id = ?1
373               AND target_entity_id = ?2
374               AND relation = ?3
375               AND edge_type = ?4
376               AND valid_to IS NULL
377             LIMIT 1",
378        )
379        .bind(source_entity_id)
380        .bind(target_entity_id)
381        .bind(relation)
382        .bind(edge_type_str)
383        .fetch_optional(&self.pool)
384        .await?;
385
386        if let Some((existing_id, stored_conf)) = existing {
387            let updated_conf = f64::from(confidence).max(stored_conf);
388            sqlx::query("UPDATE graph_edges SET confidence = ?1 WHERE id = ?2")
389                .bind(updated_conf)
390                .bind(existing_id)
391                .execute(&self.pool)
392                .await?;
393            return Ok(existing_id);
394        }
395
396        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
397        let id: i64 = sqlx::query_scalar(
398            "INSERT INTO graph_edges
399             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
400             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
401             RETURNING id",
402        )
403        .bind(source_entity_id)
404        .bind(target_entity_id)
405        .bind(relation)
406        .bind(fact)
407        .bind(f64::from(confidence))
408        .bind(episode_raw)
409        .bind(edge_type_str)
410        .fetch_one(&self.pool)
411        .await?;
412        Ok(id)
413    }
414
415    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if the database update fails.
420    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
421        sqlx::query(
422            "UPDATE graph_edges SET valid_to = datetime('now'), expired_at = datetime('now')
423             WHERE id = ?1",
424        )
425        .bind(edge_id)
426        .execute(&self.pool)
427        .await?;
428        Ok(())
429    }
430
431    /// Get all active edges for a batch of entity IDs, with optional MAGMA edge type filtering.
432    ///
433    /// Fetches all currently-active edges (`valid_to IS NULL`) where either endpoint
434    /// is in `entity_ids`. Traversal is always current-time only (no `at_timestamp` support
435    /// in v1 — see `bfs_at_timestamp` for historical traversal).
436    ///
437    /// # `SQLite` bind limit safety
438    ///
439    /// `SQLite` limits the number of bind parameters to `SQLITE_MAX_VARIABLE_NUMBER` (999 by
440    /// default). Each entity ID requires two bind slots (source OR target), so batches are
441    /// chunked at `MAX_BATCH_ENTITIES = 490` to stay safely under the limit regardless of
442    /// compile-time `SQLite` configuration.
443    ///
444    /// # Errors
445    ///
446    /// Returns an error if the database query fails.
447    pub async fn edges_for_entities(
448        &self,
449        entity_ids: &[i64],
450        edge_types: &[super::types::EdgeType],
451    ) -> Result<Vec<Edge>, MemoryError> {
452        // Safe margin under SQLite SQLITE_MAX_VARIABLE_NUMBER (999):
453        // each entity ID uses 2 bind slots (source_entity_id OR target_entity_id).
454        // 490 * 2 = 980, leaving headroom for future query additions.
455        const MAX_BATCH_ENTITIES: usize = 490;
456
457        let mut all_edges: Vec<Edge> = Vec::new();
458
459        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
460            let edges = self.query_batch_edges(chunk, edge_types).await?;
461            all_edges.extend(edges);
462        }
463
464        Ok(all_edges)
465    }
466
    /// Query active edges for a single chunk of entity IDs (internal helper).
    ///
    /// Builds the SQL dynamically because the number of IN-clause placeholders
    /// depends on `entity_ids.len()` and `edge_types.len()`.
    ///
    /// Caller is responsible for ensuring `entity_ids.len() <= MAX_BATCH_ENTITIES`.
    async fn query_batch_edges(
        &self,
        entity_ids: &[i64],
        edge_types: &[super::types::EdgeType],
    ) -> Result<Vec<Edge>, MemoryError> {
        // An empty IN () list is a SQL syntax error — short-circuit instead.
        if entity_ids.is_empty() {
            return Ok(Vec::new());
        }

        // Build a parameterized IN clause: (?1, ?2, ..., ?N).
        // We cannot use sqlx's query_as! macro here because the placeholder count is dynamic.
        let placeholders: String = (1..=entity_ids.len())
            .map(|i| format!("?{i}"))
            .collect::<Vec<_>>()
            .join(", ");

        let sql = if edge_types.is_empty() {
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
            )
        } else {
            // Edge-type placeholders continue the numbering after the entity IDs
            // (?N+1 .. ?N+M) so each value binds exactly once.
            let type_placeholders: String = (entity_ids.len() + 1
                ..=entity_ids.len() + edge_types.len())
                .map(|i| format!("?{i}"))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))
                   AND edge_type IN ({type_placeholders})"
            )
        };

        // Bind entity IDs once — ?1..?N are reused for both IN clauses via ?NNN syntax.
        let mut query = sqlx::query_as::<_, EdgeRow>(&sql);
        for id in entity_ids {
            query = query.bind(*id);
        }
        for et in edge_types {
            query = query.bind(et.as_str());
        }

        let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
524
525    /// Get all active edges where entity is source or target.
526    ///
527    /// # Errors
528    ///
529    /// Returns an error if the database query fails.
530    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
531        let rows: Vec<EdgeRow> = sqlx::query_as(
532            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
533                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
534                    edge_type
535             FROM graph_edges
536             WHERE valid_to IS NULL
537               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
538        )
539        .bind(entity_id)
540        .fetch_all(&self.pool)
541        .await?;
542        Ok(rows.into_iter().map(edge_from_row).collect())
543    }
544
545    /// Get all edges (active and expired) where entity is source or target, ordered by
546    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
547    ///
548    /// # Errors
549    ///
550    /// Returns an error if the database query fails or if `limit` overflows `i64`.
551    pub async fn edge_history_for_entity(
552        &self,
553        entity_id: i64,
554        limit: usize,
555    ) -> Result<Vec<Edge>, MemoryError> {
556        let limit = i64::try_from(limit)?;
557        let rows: Vec<EdgeRow> = sqlx::query_as(
558            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
559                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
560                    edge_type
561             FROM graph_edges
562             WHERE source_entity_id = ?1 OR target_entity_id = ?1
563             ORDER BY valid_from DESC
564             LIMIT ?2",
565        )
566        .bind(entity_id)
567        .bind(limit)
568        .fetch_all(&self.pool)
569        .await?;
570        Ok(rows.into_iter().map(edge_from_row).collect())
571    }
572
573    /// Get all active edges between two entities (both directions).
574    ///
575    /// # Errors
576    ///
577    /// Returns an error if the database query fails.
578    pub async fn edges_between(
579        &self,
580        entity_a: i64,
581        entity_b: i64,
582    ) -> Result<Vec<Edge>, MemoryError> {
583        let rows: Vec<EdgeRow> = sqlx::query_as(
584            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
585                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
586                    edge_type
587             FROM graph_edges
588             WHERE valid_to IS NULL
589               AND ((source_entity_id = ?1 AND target_entity_id = ?2)
590                 OR (source_entity_id = ?2 AND target_entity_id = ?1))",
591        )
592        .bind(entity_a)
593        .bind(entity_b)
594        .fetch_all(&self.pool)
595        .await?;
596        Ok(rows.into_iter().map(edge_from_row).collect())
597    }
598
599    /// Get active edges from `source` to `target` in the exact direction (no reverse).
600    ///
601    /// # Errors
602    ///
603    /// Returns an error if the database query fails.
604    pub async fn edges_exact(
605        &self,
606        source_entity_id: i64,
607        target_entity_id: i64,
608    ) -> Result<Vec<Edge>, MemoryError> {
609        let rows: Vec<EdgeRow> = sqlx::query_as(
610            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
611                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
612                    edge_type
613             FROM graph_edges
614             WHERE valid_to IS NULL
615               AND source_entity_id = ?1
616               AND target_entity_id = ?2",
617        )
618        .bind(source_entity_id)
619        .bind(target_entity_id)
620        .fetch_all(&self.pool)
621        .await?;
622        Ok(rows.into_iter().map(edge_from_row).collect())
623    }
624
625    /// Count active (non-invalidated) edges.
626    ///
627    /// # Errors
628    ///
629    /// Returns an error if the database query fails.
630    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
631        let count: i64 =
632            sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
633                .fetch_one(&self.pool)
634                .await?;
635        Ok(count)
636    }
637
638    /// Return per-type active edge counts as `(edge_type, count)` pairs.
639    ///
640    /// # Errors
641    ///
642    /// Returns an error if the database query fails.
643    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
644        let rows: Vec<(String, i64)> = sqlx::query_as(
645            "SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type",
646        )
647        .fetch_all(&self.pool)
648        .await?;
649        Ok(rows)
650    }
651
652    // ── Communities ───────────────────────────────────────────────────────────
653
654    /// Insert or update a community by name.
655    ///
656    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
657    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
658    /// `assign_to_community` adds an entity without a full re-detection pass).
659    ///
660    /// # Errors
661    ///
662    /// Returns an error if the database query fails or JSON serialization fails.
663    pub async fn upsert_community(
664        &self,
665        name: &str,
666        summary: &str,
667        entity_ids: &[i64],
668        fingerprint: Option<&str>,
669    ) -> Result<i64, MemoryError> {
670        let entity_ids_json = serde_json::to_string(entity_ids)?;
671        let id: i64 = sqlx::query_scalar(
672            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
673             VALUES (?1, ?2, ?3, ?4)
674             ON CONFLICT(name) DO UPDATE SET
675               summary = excluded.summary,
676               entity_ids = excluded.entity_ids,
677               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
678               updated_at = datetime('now')
679             RETURNING id",
680        )
681        .bind(name)
682        .bind(summary)
683        .bind(entity_ids_json)
684        .bind(fingerprint)
685        .fetch_one(&self.pool)
686        .await?;
687        Ok(id)
688    }
689
690    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
691    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
692    ///
693    /// # Errors
694    ///
695    /// Returns an error if the database query fails.
696    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
697        let rows: Vec<(String, i64)> = sqlx::query_as(
698            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL",
699        )
700        .fetch_all(&self.pool)
701        .await?;
702        Ok(rows.into_iter().collect())
703    }
704
705    /// Delete a single community by its primary key.
706    ///
707    /// # Errors
708    ///
709    /// Returns an error if the database query fails.
710    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
711        sqlx::query("DELETE FROM graph_communities WHERE id = ?1")
712            .bind(id)
713            .execute(&self.pool)
714            .await?;
715        Ok(())
716    }
717
718    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
719    ///
720    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
721    /// ensuring the next `detect_communities` run re-summarizes the affected community.
722    ///
723    /// # Errors
724    ///
725    /// Returns an error if the database query fails.
726    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
727        sqlx::query("UPDATE graph_communities SET fingerprint = NULL WHERE id = ?1")
728            .bind(id)
729            .execute(&self.pool)
730            .await?;
731        Ok(())
732    }
733
734    /// Find the first community that contains the given `entity_id`.
735    ///
736    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
737    /// table scan with per-row JSON parsing.
738    ///
739    /// # Errors
740    ///
741    /// Returns an error if the database query fails or JSON parsing fails.
742    pub async fn community_for_entity(
743        &self,
744        entity_id: i64,
745    ) -> Result<Option<Community>, MemoryError> {
746        let row: Option<CommunityRow> = sqlx::query_as(
747            "SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
748             FROM graph_communities c, json_each(c.entity_ids) j
749             WHERE CAST(j.value AS INTEGER) = ?1
750             LIMIT 1",
751        )
752        .bind(entity_id)
753        .fetch_optional(&self.pool)
754        .await?;
755        match row {
756            Some(row) => {
757                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
758                Ok(Some(Community {
759                    id: row.id,
760                    name: row.name,
761                    summary: row.summary,
762                    entity_ids,
763                    fingerprint: row.fingerprint,
764                    created_at: row.created_at,
765                    updated_at: row.updated_at,
766                }))
767            }
768            None => Ok(None),
769        }
770    }
771
772    /// Get all communities.
773    ///
774    /// # Errors
775    ///
776    /// Returns an error if the database query fails or JSON parsing fails.
777    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
778        let rows: Vec<CommunityRow> = sqlx::query_as(
779            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
780             FROM graph_communities
781             ORDER BY id ASC",
782        )
783        .fetch_all(&self.pool)
784        .await?;
785
786        rows.into_iter()
787            .map(|row| {
788                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
789                Ok(Community {
790                    id: row.id,
791                    name: row.name,
792                    summary: row.summary,
793                    entity_ids,
794                    fingerprint: row.fingerprint,
795                    created_at: row.created_at,
796                    updated_at: row.updated_at,
797                })
798            })
799            .collect()
800    }
801
802    /// Count the total number of communities.
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the database query fails.
807    pub async fn community_count(&self) -> Result<i64, MemoryError> {
808        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_communities")
809            .fetch_one(&self.pool)
810            .await?;
811        Ok(count)
812    }
813
814    // ── Metadata ──────────────────────────────────────────────────────────────
815
816    /// Get a metadata value by key.
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the database query fails.
821    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
822        let val: Option<String> =
823            sqlx::query_scalar("SELECT value FROM graph_metadata WHERE key = ?1")
824                .bind(key)
825                .fetch_optional(&self.pool)
826                .await?;
827        Ok(val)
828    }
829
830    /// Set a metadata value by key (upsert).
831    ///
832    /// # Errors
833    ///
834    /// Returns an error if the database query fails.
835    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
836        sqlx::query(
837            "INSERT INTO graph_metadata (key, value) VALUES (?1, ?2)
838             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
839        )
840        .bind(key)
841        .bind(value)
842        .execute(&self.pool)
843        .await?;
844        Ok(())
845    }
846
847    /// Get the current extraction count from metadata.
848    ///
849    /// Returns 0 if the counter has not been initialized.
850    ///
851    /// # Errors
852    ///
853    /// Returns an error if the database query fails.
854    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
855        let val = self.get_metadata("extraction_count").await?;
856        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
857    }
858
859    /// Stream all active (non-invalidated) edges.
860    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
861        use futures::StreamExt as _;
862        sqlx::query_as::<_, EdgeRow>(
863            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
864                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
865                    edge_type
866             FROM graph_edges
867             WHERE valid_to IS NULL
868             ORDER BY id ASC",
869        )
870        .fetch(&self.pool)
871        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
872    }
873
874    /// Fetch a chunk of active edges using keyset pagination.
875    ///
876    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
877    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
878    /// the returned chunk as `after_id` for the next page. An empty result means all
879    /// edges have been consumed.
880    ///
881    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
882    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
883    /// and will appear in subsequent chunks or after the last chunk, never causing
884    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
885    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
886    ///
887    /// # Errors
888    ///
889    /// Returns an error if the database query fails.
890    pub async fn edges_after_id(
891        &self,
892        after_id: i64,
893        limit: i64,
894    ) -> Result<Vec<Edge>, MemoryError> {
895        let rows: Vec<EdgeRow> = sqlx::query_as(
896            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
897                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
898                    edge_type
899             FROM graph_edges
900             WHERE valid_to IS NULL AND id > ?1
901             ORDER BY id ASC
902             LIMIT ?2",
903        )
904        .bind(after_id)
905        .bind(limit)
906        .fetch_all(&self.pool)
907        .await?;
908        Ok(rows.into_iter().map(edge_from_row).collect())
909    }
910
911    /// Find a community by its primary key.
912    ///
913    /// # Errors
914    ///
915    /// Returns an error if the database query fails or JSON parsing fails.
916    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
917        let row: Option<CommunityRow> = sqlx::query_as(
918            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
919             FROM graph_communities
920             WHERE id = ?1",
921        )
922        .bind(id)
923        .fetch_optional(&self.pool)
924        .await?;
925        match row {
926            Some(row) => {
927                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
928                Ok(Some(Community {
929                    id: row.id,
930                    name: row.name,
931                    summary: row.summary,
932                    entity_ids,
933                    fingerprint: row.fingerprint,
934                    created_at: row.created_at,
935                    updated_at: row.updated_at,
936                }))
937            }
938            None => Ok(None),
939        }
940    }
941
942    /// Delete all communities (full rebuild before upsert).
943    ///
944    /// # Errors
945    ///
946    /// Returns an error if the database query fails.
947    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
948        sqlx::query("DELETE FROM graph_communities")
949            .execute(&self.pool)
950            .await?;
951        Ok(())
952    }
953
954    /// Delete expired edges older than `retention_days` and return count deleted.
955    ///
956    /// # Errors
957    ///
958    /// Returns an error if the database query fails.
959    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
960        let days = i64::from(retention_days);
961        let result = sqlx::query(
962            "DELETE FROM graph_edges
963             WHERE expired_at IS NOT NULL
964               AND expired_at < datetime('now', '-' || ?1 || ' days')",
965        )
966        .bind(days)
967        .execute(&self.pool)
968        .await?;
969        Ok(usize::try_from(result.rows_affected())?)
970    }
971
972    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the database query fails.
977    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
978        let days = i64::from(retention_days);
979        let result = sqlx::query(
980            "DELETE FROM graph_entities
981             WHERE id NOT IN (
982                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
983                 UNION
984                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
985             )
986             AND last_seen_at < datetime('now', '-' || ?1 || ' days')",
987        )
988        .bind(days)
989        .execute(&self.pool)
990        .await?;
991        Ok(usize::try_from(result.rows_affected())?)
992    }
993
994    /// Delete the oldest excess entities when count exceeds `max_entities`.
995    ///
996    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
997    /// Only deletes when `entity_count() > max_entities`.
998    ///
999    /// # Errors
1000    ///
1001    /// Returns an error if the database query fails.
1002    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1003        let current = self.entity_count().await?;
1004        let max = i64::try_from(max_entities)?;
1005        if current <= max {
1006            return Ok(0);
1007        }
1008        let excess = current - max;
1009        let result = sqlx::query(
1010            "DELETE FROM graph_entities
1011             WHERE id IN (
1012                 SELECT e.id
1013                 FROM graph_entities e
1014                 LEFT JOIN (
1015                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
1016                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1017                     UNION ALL
1018                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
1019                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1020                 ) edge_counts ON e.id = edge_counts.eid
1021                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1022                 LIMIT ?1
1023             )",
1024        )
1025        .bind(excess)
1026        .execute(&self.pool)
1027        .await?;
1028        Ok(usize::try_from(result.rows_affected())?)
1029    }
1030
1031    // ── Temporal Edge Queries ─────────────────────────────────────────────────
1032
1033    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
1034    ///
1035    /// An edge is valid at `timestamp` when:
1036    /// - `valid_from <= timestamp`, AND
1037    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
1038    ///
1039    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns an error if the database query fails.
1044    pub async fn edges_at_timestamp(
1045        &self,
1046        entity_id: i64,
1047        timestamp: &str,
1048    ) -> Result<Vec<Edge>, MemoryError> {
1049        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
1050        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
1051        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
1052        let rows: Vec<EdgeRow> = sqlx::query_as(
1053            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1054                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1055                    edge_type
1056             FROM graph_edges
1057             WHERE valid_to IS NULL
1058               AND valid_from <= ?2
1059               AND (source_entity_id = ?1 OR target_entity_id = ?1)
1060             UNION ALL
1061             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1062                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1063                    edge_type
1064             FROM graph_edges
1065             WHERE valid_to IS NOT NULL
1066               AND valid_from <= ?2
1067               AND valid_to > ?2
1068               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
1069        )
1070        .bind(entity_id)
1071        .bind(timestamp)
1072        .fetch_all(&self.pool)
1073        .await?;
1074        Ok(rows.into_iter().map(edge_from_row).collect())
1075    }
1076
1077    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
1078    ///
1079    /// The optional `relation` filter restricts results to a specific relation label.
1080    /// Results are ordered by `valid_from DESC` (most recent first).
1081    ///
1082    /// # Errors
1083    ///
1084    /// Returns an error if the database query fails.
1085    pub async fn edge_history(
1086        &self,
1087        source_entity_id: i64,
1088        predicate: &str,
1089        relation: Option<&str>,
1090        limit: usize,
1091    ) -> Result<Vec<Edge>, MemoryError> {
1092        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
1093        let escaped = predicate
1094            .replace('\\', "\\\\")
1095            .replace('%', "\\%")
1096            .replace('_', "\\_");
1097        let like_pattern = format!("%{escaped}%");
1098        let limit = i64::try_from(limit)?;
1099        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1100            sqlx::query_as(
1101                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1102                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1103                        edge_type
1104                 FROM graph_edges
1105                 WHERE source_entity_id = ?1
1106                   AND fact LIKE ?2 ESCAPE '\\'
1107                   AND relation = ?3
1108                 ORDER BY valid_from DESC
1109                 LIMIT ?4",
1110            )
1111            .bind(source_entity_id)
1112            .bind(&like_pattern)
1113            .bind(rel)
1114            .bind(limit)
1115            .fetch_all(&self.pool)
1116            .await?
1117        } else {
1118            sqlx::query_as(
1119                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1120                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1121                        edge_type
1122                 FROM graph_edges
1123                 WHERE source_entity_id = ?1
1124                   AND fact LIKE ?2 ESCAPE '\\'
1125                 ORDER BY valid_from DESC
1126                 LIMIT ?3",
1127            )
1128            .bind(source_entity_id)
1129            .bind(&like_pattern)
1130            .bind(limit)
1131            .fetch_all(&self.pool)
1132            .await?
1133        };
1134        Ok(rows.into_iter().map(edge_from_row).collect())
1135    }
1136
1137    // ── BFS Traversal ─────────────────────────────────────────────────────────
1138
1139    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
1140    ///
1141    /// Returns all reachable entities and the active edges connecting them.
1142    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
1143    /// of `SQLite` CTE limitations.
1144    ///
1145    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
1146    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
1147    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
1148    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
1149    ///
1150    /// # Errors
1151    ///
1152    /// Returns an error if any database query fails.
1153    pub async fn bfs(
1154        &self,
1155        start_entity_id: i64,
1156        max_hops: u32,
1157    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1158        self.bfs_with_depth(start_entity_id, max_hops)
1159            .await
1160            .map(|(e, ed, _)| (e, ed))
1161    }
1162
    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
    ///
    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
    /// entity. The start entity itself has depth 0.
    ///
    /// Thin wrapper: delegates to [`bfs_core`] with `at_timestamp = None`, i.e. only
    /// active edges (`valid_to IS NULL`) are traversed.
    ///
    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
    ///
    /// # Errors
    ///
    /// Returns an error if any database query fails.
    pub async fn bfs_with_depth(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, None).await
    }
1180
    /// BFS traversal considering only edges that were valid at `timestamp`.
    ///
    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
    ///
    /// Thin wrapper: delegates to [`bfs_core`] with `at_timestamp = Some(timestamp)`.
    ///
    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
    ///
    /// # Errors
    ///
    /// Returns an error if any database query fails.
    pub async fn bfs_at_timestamp(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        timestamp: &str,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
            .await
    }
1200
1201    /// BFS traversal scoped to specific MAGMA edge types.
1202    ///
1203    /// When `edge_types` is empty, behaves identically to [`bfs_with_depth`] (traverses all
1204    /// active edges). When `edge_types` is non-empty, only traverses edges whose `edge_type`
1205    /// matches one of the provided types.
1206    ///
1207    /// This enables subgraph-scoped retrieval: a causal query traverses only causal + semantic
1208    /// edges, a temporal query only temporal + semantic edges, etc.
1209    ///
1210    /// Note: Semantic is typically included in `edge_types` by the caller to ensure recall is
1211    /// never worse than the untyped BFS. See `classify_graph_subgraph` in `router.rs`.
1212    ///
1213    /// # Errors
1214    ///
1215    /// Returns an error if any database query fails.
1216    pub async fn bfs_typed(
1217        &self,
1218        start_entity_id: i64,
1219        max_hops: u32,
1220        edge_types: &[EdgeType],
1221    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1222        if edge_types.is_empty() {
1223            return self.bfs_with_depth(start_entity_id, max_hops).await;
1224        }
1225        self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1226            .await
1227    }
1228
    /// Shared BFS implementation.
    ///
    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
    ///
    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
    /// format string, so there is no SQL injection risk.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
        const MAX_FRONTIER: usize = 300;

        // depth_map doubles as the visited set: each entity is enqueued at most once,
        // recording its minimum hop distance from the start entity (start = 0).
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            // Anything beyond the cap is silently dropped from this hop's expansion.
            frontier.truncate(MAX_FRONTIER);
            // IDs come from our own DB — no user input, no injection risk.
            let placeholders = frontier
                .iter()
                .enumerate()
                .map(|(i, _)| format!("?{}", i + 1))
                .collect::<Vec<_>>()
                .join(", ");
            // NOTE(review): the SAME `?1..?n` placeholder list is spliced into all
            // three IN clauses below, yet the frontier is bound three times (3n bind
            // calls) and the timestamp position is computed as 3n+1. The 2nd/3rd bind
            // passes therefore fill parameter indexes the SQL never references; the
            // result is unaffected because all three copies hold identical values,
            // but confirm the sqlx/SQLite combination tolerates the surplus binds —
            // distinct index ranges per IN list would be clearer.
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = frontier.len() * 3 + 1;
                format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
            } else {
                "valid_to IS NULL".to_owned()
            };
            // The CASE expression selects "the other endpoint" relative to the
            // frontier, so edges are expanded in both directions.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({placeholders}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
            );
            let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Vacant-entry check = "not yet visited"; first discovery fixes the
                // minimum depth, so already-seen entities are never re-enqueued.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1305
    /// BFS implementation scoped to specific edge types.
    ///
    /// Builds the IN clause for `edge_type` filtering dynamically from enum values.
    /// All enum-derived strings come from `EdgeType::as_str()` — no user input reaches SQL.
    ///
    /// Precondition: `edge_types` is non-empty — the visible caller (`bfs_typed`)
    /// guards against the empty slice; an empty list would produce `edge_type IN ()`.
    ///
    /// # Errors
    ///
    /// Returns an error if any database query fails.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Same frontier cap as bfs_core: 3 bind passes per hop must stay within
        // SQLite's bind-parameter budget.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        // depth_map doubles as the visited set (minimum hop distance per entity).
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        let n_types = type_strs.len();
        // type_in is constant for the entire BFS — positions ?1..?n_types never change.
        let type_in = (1..=n_types)
            .map(|i| format!("?{i}"))
            .collect::<Vec<_>>()
            .join(", ");
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            // Positions: types first (?1..?n_types), then 3 copies of frontier IDs
            let frontier_placeholders = frontier
                .iter()
                .enumerate()
                .map(|(i, _)| format!("?{}", id_start + i))
                .collect::<Vec<_>>()
                .join(", ");

            // NOTE(review): as in bfs_core, the single `frontier_placeholders` string
            // is reused in all three IN clauses while the frontier is bound three
            // times; the surplus bind passes fill indexes the SQL never references.
            // Values are identical so results are unaffected — confirm sqlx/SQLite
            // tolerate the extra binds.
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})"
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // CASE picks "the other endpoint" relative to the frontier (undirected expansion).
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({frontier_placeholders}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({frontier_placeholders}) OR target_entity_id IN ({frontier_placeholders}))"
            );

            let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
            // Bind types first
            for t in &type_strs {
                q = q.bind(*t);
            }
            // Bind frontier 3 times
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // First discovery fixes the minimum depth; visited entities are skipped.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        // Fetch results — pass edge_type filter to bfs_fetch_results_typed
        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1407
    /// Fetch entities and typed edges for a completed BFS depth map.
    ///
    /// Filters returned edges by the provided `edge_type` strings. Only edges whose
    /// BOTH endpoints are in the (possibly truncated) visited set are returned.
    ///
    /// # Errors
    ///
    /// Returns an error if any database query fails.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Cap the visited set so the IN lists below stay within SQLite's bind budget.
        // NOTE(review): two caveats worth confirming —
        //  1. HashMap iteration order is arbitrary, so WHICH entities survive the
        //     truncation is nondeterministic run-to-run.
        //  2. The edge query binds 2×499 IDs plus the type strings plus an optional
        //     timestamp, which exceeds 999 whenever the legacy
        //     SQLITE_MAX_VARIABLE_NUMBER default (999) applies; verify the bundled
        //     SQLite uses the modern 32766 default.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Bind order: types first, then visited_ids twice, then optional timestamp
        let type_in = (1..=n_types)
            .map(|i| format!("?{i}"))
            .collect::<Vec<_>>()
            .join(", ");
        let id_start = n_types + 1;
        let placeholders = visited_ids
            .iter()
            .enumerate()
            .map(|(i, _)| format!("?{}", id_start + i))
            .collect::<Vec<_>>()
            .join(", ");

        // NOTE(review): `placeholders` is reused for both IN lists while the IDs are
        // bound twice; the second pass fills indexes the SQL never references. Values
        // are identical so results are unaffected.
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})"
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({placeholders})
               AND target_entity_id IN ({placeholders})"
        );
        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // For entity query, use plain sequential bind positions (no type prefix offset)
        let entity_sql2 = {
            let ph = visited_ids
                .iter()
                .enumerate()
                .map(|(i, _)| format!("?{}", i + 1))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                 FROM graph_entities WHERE id IN ({ph})"
            )
        };
        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // entity_from_row is fallible (Result per row); edge_from_row is not.
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1510
    /// Fetch entities and edges for a completed BFS depth map.
    ///
    /// Returns only edges whose BOTH endpoints are in the (possibly truncated)
    /// visited set, filtered by the same active/temporal predicate used during
    /// traversal. The depth map is passed through unchanged.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
        // NOTE(review): HashMap iteration order is arbitrary, so WHICH entities
        // survive the truncation is nondeterministic run-to-run.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        let placeholders = visited_ids
            .iter()
            .enumerate()
            .map(|(i, _)| format!("?{}", i + 1))
            .collect::<Vec<_>>()
            .join(", ");
        // NOTE(review): `placeholders` (?1..?n) is reused for both IN lists while the
        // IDs are bound twice (with the timestamp computed at position 2n+1); the
        // second bind pass fills indexes the SQL never references. Values are
        // identical so results are unaffected — confirm sqlx tolerates the surplus.
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = visited_ids.len() * 2 + 1;
            format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({placeholders})
               AND target_entity_id IN ({placeholders})"
        );
        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entity lookup reuses the same ?1..?n placeholders with a single bind pass.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({placeholders})"
        );
        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // entity_from_row is fallible (Result per row); edge_from_row is not.
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1583
1584    // ── Backfill helpers ──────────────────────────────────────────────────────
1585
1586    /// Find an entity by name only (no type filter).
1587    ///
1588    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1589    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1590    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1591    ///
1592    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1593    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1594    /// contains "Alice").
1595    ///
1596    /// # Errors
1597    ///
1598    /// Returns an error if the database query fails.
1599    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1600        let rows: Vec<EntityRow> = sqlx::query_as(
1601            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1602             FROM graph_entities
1603             WHERE name = ?1 COLLATE NOCASE OR canonical_name = ?1 COLLATE NOCASE
1604             LIMIT 5",
1605        )
1606        .bind(name)
1607        .fetch_all(&self.pool)
1608        .await?;
1609
1610        if !rows.is_empty() {
1611            return rows.into_iter().map(entity_from_row).collect();
1612        }
1613
1614        self.find_entities_fuzzy(name, 5).await
1615    }
1616
1617    /// Return up to `limit` messages that have not yet been processed by graph extraction.
1618    ///
1619    /// Reads the `graph_processed` column added by migration 021.
1620    ///
1621    /// # Errors
1622    ///
1623    /// Returns an error if the database query fails.
1624    pub async fn unprocessed_messages_for_backfill(
1625        &self,
1626        limit: usize,
1627    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1628        let limit = i64::try_from(limit)?;
1629        let rows: Vec<(i64, String)> = sqlx::query_as(
1630            "SELECT id, content FROM messages
1631             WHERE graph_processed = 0
1632             ORDER BY id ASC
1633             LIMIT ?1",
1634        )
1635        .bind(limit)
1636        .fetch_all(&self.pool)
1637        .await?;
1638        Ok(rows
1639            .into_iter()
1640            .map(|(id, content)| (crate::types::MessageId(id), content))
1641            .collect())
1642    }
1643
1644    /// Return the count of messages not yet processed by graph extraction.
1645    ///
1646    /// # Errors
1647    ///
1648    /// Returns an error if the database query fails.
1649    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
1650        let count: i64 =
1651            sqlx::query_scalar("SELECT COUNT(*) FROM messages WHERE graph_processed = 0")
1652                .fetch_one(&self.pool)
1653                .await?;
1654        Ok(count)
1655    }
1656
1657    /// Mark a batch of messages as graph-processed.
1658    ///
1659    /// # Errors
1660    ///
1661    /// Returns an error if the database query fails.
1662    pub async fn mark_messages_graph_processed(
1663        &self,
1664        ids: &[crate::types::MessageId],
1665    ) -> Result<(), MemoryError> {
1666        if ids.is_empty() {
1667            return Ok(());
1668        }
1669        let placeholders = ids
1670            .iter()
1671            .enumerate()
1672            .map(|(i, _)| format!("?{}", i + 1))
1673            .collect::<Vec<_>>()
1674            .join(", ");
1675        let sql = format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
1676        let mut query = sqlx::query(&sql);
1677        for id in ids {
1678            query = query.bind(id.0);
1679        }
1680        query.execute(&self.pool).await?;
1681        Ok(())
1682    }
1683}
1684
1685// ── Row types for sqlx::query_as ─────────────────────────────────────────────
1686
/// Row shape for `graph_entities` SELECTs (via `sqlx::query_as`); field names
/// must match the selected column names exactly.
#[derive(sqlx::FromRow)]
struct EntityRow {
    id: i64,
    name: String,            // surface/display form, original casing preserved
    canonical_name: String,  // normalized dedup key
    entity_type: String,     // raw string; parsed into `EntityType` by `entity_from_row`
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
1698
1699fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
1700    let entity_type = row
1701        .entity_type
1702        .parse::<EntityType>()
1703        .map_err(MemoryError::GraphStore)?;
1704    Ok(Entity {
1705        id: row.id,
1706        name: row.name,
1707        canonical_name: row.canonical_name,
1708        entity_type,
1709        summary: row.summary,
1710        first_seen_at: row.first_seen_at,
1711        last_seen_at: row.last_seen_at,
1712        qdrant_point_id: row.qdrant_point_id,
1713    })
1714}
1715
/// Row shape for entity-alias SELECTs (via `sqlx::query_as`); mirrors
/// `EntityAlias` field-for-field.
#[derive(sqlx::FromRow)]
struct AliasRow {
    id: i64,
    entity_id: i64,      // owning entity's id
    alias_name: String,
    created_at: String,
}
1723
1724fn alias_from_row(row: AliasRow) -> EntityAlias {
1725    EntityAlias {
1726        id: row.id,
1727        entity_id: row.entity_id,
1728        alias_name: row.alias_name,
1729        created_at: row.created_at,
1730    }
1731}
1732
/// Row shape for `graph_edges` SELECTs (via `sqlx::query_as`); field names
/// must match the selected column names exactly.
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    confidence: f64,           // SQLite REAL; narrowed to f32 in `edge_from_row`
    valid_from: String,
    valid_to: Option<String>,  // NULL = edge currently valid (see `valid_to IS NULL` filters)
    created_at: String,
    expired_at: Option<String>,
    episode_id: Option<i64>,   // wrapped into `MessageId` in `edge_from_row`
    qdrant_point_id: Option<String>,
    edge_type: String,         // raw string; unknown values degrade to `EdgeType::Semantic`
}
1749
1750fn edge_from_row(row: EdgeRow) -> Edge {
1751    let edge_type = row
1752        .edge_type
1753        .parse::<EdgeType>()
1754        .unwrap_or(EdgeType::Semantic);
1755    Edge {
1756        id: row.id,
1757        source_entity_id: row.source_entity_id,
1758        target_entity_id: row.target_entity_id,
1759        relation: row.relation,
1760        fact: row.fact,
1761        #[allow(clippy::cast_possible_truncation)]
1762        confidence: row.confidence as f32,
1763        valid_from: row.valid_from,
1764        valid_to: row.valid_to,
1765        created_at: row.created_at,
1766        expired_at: row.expired_at,
1767        episode_id: row.episode_id.map(MessageId),
1768        qdrant_point_id: row.qdrant_point_id,
1769        edge_type,
1770    }
1771}
1772
/// Row shape for community SELECTs (via `sqlx::query_as`).
/// NOTE(review): presumably converted into the public `Community` type
/// elsewhere in this file — the conversion site is not visible in this chunk.
#[derive(sqlx::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    entity_ids: String,          // serialized id list — format not visible here; confirm at the conversion site
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
1783
1784// ── Tests ─────────────────────────────────────────────────────────────────────
1785
1786#[cfg(test)]
1787mod tests;