Skip to main content

zeph_memory/graph/store/
mod.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5
6use futures::Stream;
7use sqlx::SqlitePool;
8
9use crate::error::MemoryError;
10use crate::sqlite::messages::sanitize_fts5_query;
11use crate::types::MessageId;
12
13use super::types::{Community, Edge, Entity, EntityAlias, EntityType};
14
/// SQLite-backed persistence layer for the knowledge graph: entities, edges
/// (facts), communities, entity aliases, and key/value metadata.
pub struct GraphStore {
    // Connection pool shared by every query issued through this store.
    pool: SqlitePool,
}
18
19impl GraphStore {
20    #[must_use]
21    pub fn new(pool: SqlitePool) -> Self {
22        Self { pool }
23    }
24
25    #[must_use]
26    pub fn pool(&self) -> &SqlitePool {
27        &self.pool
28    }
29
30    // ── Entities ─────────────────────────────────────────────────────────────
31
32    /// Insert or update an entity by `(canonical_name, entity_type)`.
33    ///
34    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
35    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
36    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
37    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the database query fails.
42    pub async fn upsert_entity(
43        &self,
44        surface_name: &str,
45        canonical_name: &str,
46        entity_type: EntityType,
47        summary: Option<&str>,
48    ) -> Result<i64, MemoryError> {
49        let type_str = entity_type.as_str();
50        let id: i64 = sqlx::query_scalar(
51            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
52             VALUES (?1, ?2, ?3, ?4)
53             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
54               name = excluded.name,
55               summary = COALESCE(excluded.summary, summary),
56               last_seen_at = datetime('now')
57             RETURNING id",
58        )
59        .bind(surface_name)
60        .bind(canonical_name)
61        .bind(type_str)
62        .bind(summary)
63        .fetch_one(&self.pool)
64        .await?;
65        Ok(id)
66    }
67
68    /// Find an entity by exact canonical name and type.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the database query fails.
73    pub async fn find_entity(
74        &self,
75        canonical_name: &str,
76        entity_type: EntityType,
77    ) -> Result<Option<Entity>, MemoryError> {
78        let type_str = entity_type.as_str();
79        let row: Option<EntityRow> = sqlx::query_as(
80            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
81             FROM graph_entities
82             WHERE canonical_name = ?1 AND entity_type = ?2",
83        )
84        .bind(canonical_name)
85        .bind(type_str)
86        .fetch_optional(&self.pool)
87        .await?;
88        row.map(entity_from_row).transpose()
89    }
90
91    /// Find an entity by its numeric ID.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the database query fails.
96    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
97        let row: Option<EntityRow> = sqlx::query_as(
98            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
99             FROM graph_entities
100             WHERE id = ?1",
101        )
102        .bind(entity_id)
103        .fetch_optional(&self.pool)
104        .await?;
105        row.map(entity_from_row).transpose()
106    }
107
108    /// Update the `qdrant_point_id` for an entity.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the database query fails.
113    pub async fn set_entity_qdrant_point_id(
114        &self,
115        entity_id: i64,
116        point_id: &str,
117    ) -> Result<(), MemoryError> {
118        sqlx::query("UPDATE graph_entities SET qdrant_point_id = ?1 WHERE id = ?2")
119            .bind(point_id)
120            .bind(entity_id)
121            .execute(&self.pool)
122            .await?;
123        Ok(())
124    }
125
126    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results, ranked by relevance.
127    ///
128    /// Uses FTS5 MATCH with prefix wildcards (`token*`) and bm25 ranking. Name matches are
129    /// weighted 10x higher than summary matches. Also searches `graph_entity_aliases` for
130    /// alias matches via a UNION query.
131    ///
132    /// # Behavioral note
133    ///
134    /// This replaces the previous `LIKE '%query%'` implementation. FTS5 prefix matching differs
135    /// from substring matching: searching "SQL" will match "`SQLite`" (prefix) but NOT "`GraphQL`"
136    /// (substring). Entity names are indexed as single tokens by the unicode61 tokenizer, so
137    /// mid-word substrings are not matched. This is a known trade-off for index performance.
138    ///
139    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match ("a*").
140    /// The `limit` parameter caps the result set. No minimum query length is enforced; if this
141    /// causes noise in practice, add a minimum length guard at the call site.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if the database query fails.
146    pub async fn find_entities_fuzzy(
147        &self,
148        query: &str,
149        limit: usize,
150    ) -> Result<Vec<Entity>, MemoryError> {
151        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
152        // prevents syntax errors when user input contains them as literal search terms
153        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
154        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
155        let query = &query[..query.floor_char_boundary(512)];
156        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
157        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
158        let sanitized = sanitize_fts5_query(query);
159        if sanitized.is_empty() {
160            return Ok(vec![]);
161        }
162        let fts_query: String = sanitized
163            .split_whitespace()
164            .filter(|t| !FTS5_OPERATORS.contains(t))
165            .map(|t| format!("{t}*"))
166            .collect::<Vec<_>>()
167            .join(" ");
168        if fts_query.is_empty() {
169            return Ok(vec![]);
170        }
171
172        let limit = i64::try_from(limit)?;
173        // bm25(graph_entities_fts, 10.0, 1.0): name column weighted 10x over summary.
174        // bm25() returns negative values; ORDER BY ASC puts best matches first.
175        let rows: Vec<EntityRow> = sqlx::query_as(
176            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
177                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
178             FROM graph_entities_fts fts
179             JOIN graph_entities e ON e.id = fts.rowid
180             WHERE graph_entities_fts MATCH ?1
181             UNION
182             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
183                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
184             FROM graph_entity_aliases a
185             JOIN graph_entities e ON e.id = a.entity_id
186             WHERE a.alias_name LIKE ?2 ESCAPE '\\' COLLATE NOCASE
187             LIMIT ?3",
188        )
189        .bind(&fts_query)
190        .bind(format!(
191            "%{}%",
192            query
193                .trim()
194                .replace('\\', "\\\\")
195                .replace('%', "\\%")
196                .replace('_', "\\_")
197        ))
198        .bind(limit)
199        .fetch_all(&self.pool)
200        .await?;
201        rows.into_iter()
202            .map(entity_from_row)
203            .collect::<Result<Vec<_>, _>>()
204    }
205
206    /// Stream all entities from the database incrementally (true cursor, no full-table load).
207    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
208        use futures::StreamExt as _;
209        sqlx::query_as::<_, EntityRow>(
210            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
211             FROM graph_entities ORDER BY id ASC",
212        )
213        .fetch(&self.pool)
214        .map(|r: Result<EntityRow, sqlx::Error>| {
215            r.map_err(MemoryError::from).and_then(entity_from_row)
216        })
217    }
218
219    // ── Alias methods ─────────────────────────────────────────────────────────
220
221    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the database query fails.
226    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
227        sqlx::query(
228            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name) VALUES (?1, ?2)",
229        )
230        .bind(entity_id)
231        .bind(alias_name)
232        .execute(&self.pool)
233        .await?;
234        Ok(())
235    }
236
237    /// Find an entity by alias name and entity type (case-insensitive).
238    ///
239    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the database query fails.
244    pub async fn find_entity_by_alias(
245        &self,
246        alias_name: &str,
247        entity_type: EntityType,
248    ) -> Result<Option<Entity>, MemoryError> {
249        let type_str = entity_type.as_str();
250        let row: Option<EntityRow> = sqlx::query_as(
251            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
252                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
253             FROM graph_entity_aliases a
254             JOIN graph_entities e ON e.id = a.entity_id
255             WHERE a.alias_name = ?1 COLLATE NOCASE
256               AND e.entity_type = ?2
257             ORDER BY e.id ASC
258             LIMIT 1",
259        )
260        .bind(alias_name)
261        .bind(type_str)
262        .fetch_optional(&self.pool)
263        .await?;
264        row.map(entity_from_row).transpose()
265    }
266
267    /// Get all aliases for an entity.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the database query fails.
272    pub async fn aliases_for_entity(
273        &self,
274        entity_id: i64,
275    ) -> Result<Vec<EntityAlias>, MemoryError> {
276        let rows: Vec<AliasRow> = sqlx::query_as(
277            "SELECT id, entity_id, alias_name, created_at
278             FROM graph_entity_aliases
279             WHERE entity_id = ?1
280             ORDER BY id ASC",
281        )
282        .bind(entity_id)
283        .fetch_all(&self.pool)
284        .await?;
285        Ok(rows.into_iter().map(alias_from_row).collect())
286    }
287
288    /// Collect all entities into a Vec.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the database query fails or `entity_type` parsing fails.
293    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
294        use futures::TryStreamExt as _;
295        self.all_entities_stream().try_collect().await
296    }
297
298    /// Count the total number of entities.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the database query fails.
303    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
304        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_entities")
305            .fetch_one(&self.pool)
306            .await?;
307        Ok(count)
308    }
309
310    // ── Edges ─────────────────────────────────────────────────────────────────
311
312    /// Insert a new edge between two entities, or update the existing active edge.
313    ///
314    /// An active edge is identified by `(source_entity_id, target_entity_id, relation)` with
315    /// `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
316    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
317    /// duplicate edges from repeated extraction of the same context messages.
318    ///
319    /// # Errors
320    ///
321    /// Returns an error if the database query fails.
322    pub async fn insert_edge(
323        &self,
324        source_entity_id: i64,
325        target_entity_id: i64,
326        relation: &str,
327        fact: &str,
328        confidence: f32,
329        episode_id: Option<MessageId>,
330    ) -> Result<i64, MemoryError> {
331        let confidence = confidence.clamp(0.0, 1.0);
332
333        let existing: Option<(i64, f64)> = sqlx::query_as(
334            "SELECT id, confidence FROM graph_edges
335             WHERE source_entity_id = ?1
336               AND target_entity_id = ?2
337               AND relation = ?3
338               AND valid_to IS NULL
339             LIMIT 1",
340        )
341        .bind(source_entity_id)
342        .bind(target_entity_id)
343        .bind(relation)
344        .fetch_optional(&self.pool)
345        .await?;
346
347        if let Some((existing_id, stored_conf)) = existing {
348            let updated_conf = f64::from(confidence).max(stored_conf);
349            sqlx::query("UPDATE graph_edges SET confidence = ?1 WHERE id = ?2")
350                .bind(updated_conf)
351                .bind(existing_id)
352                .execute(&self.pool)
353                .await?;
354            return Ok(existing_id);
355        }
356
357        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
358        let id: i64 = sqlx::query_scalar(
359            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact, confidence, episode_id)
360             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
361             RETURNING id",
362        )
363        .bind(source_entity_id)
364        .bind(target_entity_id)
365        .bind(relation)
366        .bind(fact)
367        .bind(f64::from(confidence))
368        .bind(episode_raw)
369        .fetch_one(&self.pool)
370        .await?;
371        Ok(id)
372    }
373
374    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the database update fails.
379    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
380        sqlx::query(
381            "UPDATE graph_edges SET valid_to = datetime('now'), expired_at = datetime('now')
382             WHERE id = ?1",
383        )
384        .bind(edge_id)
385        .execute(&self.pool)
386        .await?;
387        Ok(())
388    }
389
390    /// Get all active edges where entity is source or target.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the database query fails.
395    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
396        let rows: Vec<EdgeRow> = sqlx::query_as(
397            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
398                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
399             FROM graph_edges
400             WHERE valid_to IS NULL
401               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
402        )
403        .bind(entity_id)
404        .fetch_all(&self.pool)
405        .await?;
406        Ok(rows.into_iter().map(edge_from_row).collect())
407    }
408
409    /// Get all edges (active and expired) where entity is source or target, ordered by
410    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the database query fails or if `limit` overflows `i64`.
415    pub async fn edge_history_for_entity(
416        &self,
417        entity_id: i64,
418        limit: usize,
419    ) -> Result<Vec<Edge>, MemoryError> {
420        let limit = i64::try_from(limit)?;
421        let rows: Vec<EdgeRow> = sqlx::query_as(
422            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
423                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
424             FROM graph_edges
425             WHERE source_entity_id = ?1 OR target_entity_id = ?1
426             ORDER BY valid_from DESC
427             LIMIT ?2",
428        )
429        .bind(entity_id)
430        .bind(limit)
431        .fetch_all(&self.pool)
432        .await?;
433        Ok(rows.into_iter().map(edge_from_row).collect())
434    }
435
436    /// Get all active edges between two entities (both directions).
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the database query fails.
441    pub async fn edges_between(
442        &self,
443        entity_a: i64,
444        entity_b: i64,
445    ) -> Result<Vec<Edge>, MemoryError> {
446        let rows: Vec<EdgeRow> = sqlx::query_as(
447            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
448                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
449             FROM graph_edges
450             WHERE valid_to IS NULL
451               AND ((source_entity_id = ?1 AND target_entity_id = ?2)
452                 OR (source_entity_id = ?2 AND target_entity_id = ?1))",
453        )
454        .bind(entity_a)
455        .bind(entity_b)
456        .fetch_all(&self.pool)
457        .await?;
458        Ok(rows.into_iter().map(edge_from_row).collect())
459    }
460
461    /// Get active edges from `source` to `target` in the exact direction (no reverse).
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the database query fails.
466    pub async fn edges_exact(
467        &self,
468        source_entity_id: i64,
469        target_entity_id: i64,
470    ) -> Result<Vec<Edge>, MemoryError> {
471        let rows: Vec<EdgeRow> = sqlx::query_as(
472            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
473                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
474             FROM graph_edges
475             WHERE valid_to IS NULL
476               AND source_entity_id = ?1
477               AND target_entity_id = ?2",
478        )
479        .bind(source_entity_id)
480        .bind(target_entity_id)
481        .fetch_all(&self.pool)
482        .await?;
483        Ok(rows.into_iter().map(edge_from_row).collect())
484    }
485
486    /// Count active (non-invalidated) edges.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the database query fails.
491    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
492        let count: i64 =
493            sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
494                .fetch_one(&self.pool)
495                .await?;
496        Ok(count)
497    }
498
499    // ── Communities ───────────────────────────────────────────────────────────
500
501    /// Insert or update a community by name.
502    ///
503    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
504    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
505    /// `assign_to_community` adds an entity without a full re-detection pass).
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the database query fails or JSON serialization fails.
510    pub async fn upsert_community(
511        &self,
512        name: &str,
513        summary: &str,
514        entity_ids: &[i64],
515        fingerprint: Option<&str>,
516    ) -> Result<i64, MemoryError> {
517        let entity_ids_json = serde_json::to_string(entity_ids)?;
518        let id: i64 = sqlx::query_scalar(
519            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
520             VALUES (?1, ?2, ?3, ?4)
521             ON CONFLICT(name) DO UPDATE SET
522               summary = excluded.summary,
523               entity_ids = excluded.entity_ids,
524               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
525               updated_at = datetime('now')
526             RETURNING id",
527        )
528        .bind(name)
529        .bind(summary)
530        .bind(entity_ids_json)
531        .bind(fingerprint)
532        .fetch_one(&self.pool)
533        .await?;
534        Ok(id)
535    }
536
537    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
538    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the database query fails.
543    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
544        let rows: Vec<(String, i64)> = sqlx::query_as(
545            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL",
546        )
547        .fetch_all(&self.pool)
548        .await?;
549        Ok(rows.into_iter().collect())
550    }
551
552    /// Delete a single community by its primary key.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the database query fails.
557    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
558        sqlx::query("DELETE FROM graph_communities WHERE id = ?1")
559            .bind(id)
560            .execute(&self.pool)
561            .await?;
562        Ok(())
563    }
564
565    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
566    ///
567    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
568    /// ensuring the next `detect_communities` run re-summarizes the affected community.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the database query fails.
573    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
574        sqlx::query("UPDATE graph_communities SET fingerprint = NULL WHERE id = ?1")
575            .bind(id)
576            .execute(&self.pool)
577            .await?;
578        Ok(())
579    }
580
581    /// Find the first community that contains the given `entity_id`.
582    ///
583    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
584    /// table scan with per-row JSON parsing.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the database query fails or JSON parsing fails.
589    pub async fn community_for_entity(
590        &self,
591        entity_id: i64,
592    ) -> Result<Option<Community>, MemoryError> {
593        let row: Option<CommunityRow> = sqlx::query_as(
594            "SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
595             FROM graph_communities c, json_each(c.entity_ids) j
596             WHERE CAST(j.value AS INTEGER) = ?1
597             LIMIT 1",
598        )
599        .bind(entity_id)
600        .fetch_optional(&self.pool)
601        .await?;
602        match row {
603            Some(row) => {
604                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
605                Ok(Some(Community {
606                    id: row.id,
607                    name: row.name,
608                    summary: row.summary,
609                    entity_ids,
610                    fingerprint: row.fingerprint,
611                    created_at: row.created_at,
612                    updated_at: row.updated_at,
613                }))
614            }
615            None => Ok(None),
616        }
617    }
618
619    /// Get all communities.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the database query fails or JSON parsing fails.
624    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
625        let rows: Vec<CommunityRow> = sqlx::query_as(
626            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
627             FROM graph_communities
628             ORDER BY id ASC",
629        )
630        .fetch_all(&self.pool)
631        .await?;
632
633        rows.into_iter()
634            .map(|row| {
635                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
636                Ok(Community {
637                    id: row.id,
638                    name: row.name,
639                    summary: row.summary,
640                    entity_ids,
641                    fingerprint: row.fingerprint,
642                    created_at: row.created_at,
643                    updated_at: row.updated_at,
644                })
645            })
646            .collect()
647    }
648
649    /// Count the total number of communities.
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if the database query fails.
654    pub async fn community_count(&self) -> Result<i64, MemoryError> {
655        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_communities")
656            .fetch_one(&self.pool)
657            .await?;
658        Ok(count)
659    }
660
661    // ── Metadata ──────────────────────────────────────────────────────────────
662
663    /// Get a metadata value by key.
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the database query fails.
668    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
669        let val: Option<String> =
670            sqlx::query_scalar("SELECT value FROM graph_metadata WHERE key = ?1")
671                .bind(key)
672                .fetch_optional(&self.pool)
673                .await?;
674        Ok(val)
675    }
676
677    /// Set a metadata value by key (upsert).
678    ///
679    /// # Errors
680    ///
681    /// Returns an error if the database query fails.
682    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
683        sqlx::query(
684            "INSERT INTO graph_metadata (key, value) VALUES (?1, ?2)
685             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
686        )
687        .bind(key)
688        .bind(value)
689        .execute(&self.pool)
690        .await?;
691        Ok(())
692    }
693
694    /// Get the current extraction count from metadata.
695    ///
696    /// Returns 0 if the counter has not been initialized.
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if the database query fails.
701    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
702        let val = self.get_metadata("extraction_count").await?;
703        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
704    }
705
706    /// Stream all active (non-invalidated) edges.
707    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
708        use futures::StreamExt as _;
709        sqlx::query_as::<_, EdgeRow>(
710            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
711                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
712             FROM graph_edges
713             WHERE valid_to IS NULL
714             ORDER BY id ASC",
715        )
716        .fetch(&self.pool)
717        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
718    }
719
720    /// Fetch a chunk of active edges using keyset pagination.
721    ///
722    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
723    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
724    /// the returned chunk as `after_id` for the next page. An empty result means all
725    /// edges have been consumed.
726    ///
727    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
728    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
729    /// and will appear in subsequent chunks or after the last chunk, never causing
730    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
731    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the database query fails.
736    pub async fn edges_after_id(
737        &self,
738        after_id: i64,
739        limit: i64,
740    ) -> Result<Vec<Edge>, MemoryError> {
741        let rows: Vec<EdgeRow> = sqlx::query_as(
742            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
743                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
744             FROM graph_edges
745             WHERE valid_to IS NULL AND id > ?1
746             ORDER BY id ASC
747             LIMIT ?2",
748        )
749        .bind(after_id)
750        .bind(limit)
751        .fetch_all(&self.pool)
752        .await?;
753        Ok(rows.into_iter().map(edge_from_row).collect())
754    }
755
756    /// Find a community by its primary key.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the database query fails or JSON parsing fails.
761    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
762        let row: Option<CommunityRow> = sqlx::query_as(
763            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
764             FROM graph_communities
765             WHERE id = ?1",
766        )
767        .bind(id)
768        .fetch_optional(&self.pool)
769        .await?;
770        match row {
771            Some(row) => {
772                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
773                Ok(Some(Community {
774                    id: row.id,
775                    name: row.name,
776                    summary: row.summary,
777                    entity_ids,
778                    fingerprint: row.fingerprint,
779                    created_at: row.created_at,
780                    updated_at: row.updated_at,
781                }))
782            }
783            None => Ok(None),
784        }
785    }
786
787    /// Delete all communities (full rebuild before upsert).
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the database query fails.
792    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
793        sqlx::query("DELETE FROM graph_communities")
794            .execute(&self.pool)
795            .await?;
796        Ok(())
797    }
798
799    /// Delete expired edges older than `retention_days` and return count deleted.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the database query fails.
804    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
805        let days = i64::from(retention_days);
806        let result = sqlx::query(
807            "DELETE FROM graph_edges
808             WHERE expired_at IS NOT NULL
809               AND expired_at < datetime('now', '-' || ?1 || ' days')",
810        )
811        .bind(days)
812        .execute(&self.pool)
813        .await?;
814        Ok(usize::try_from(result.rows_affected())?)
815    }
816
817    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the database query fails.
822    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
823        let days = i64::from(retention_days);
824        let result = sqlx::query(
825            "DELETE FROM graph_entities
826             WHERE id NOT IN (
827                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
828                 UNION
829                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
830             )
831             AND last_seen_at < datetime('now', '-' || ?1 || ' days')",
832        )
833        .bind(days)
834        .execute(&self.pool)
835        .await?;
836        Ok(usize::try_from(result.rows_affected())?)
837    }
838
839    /// Delete the oldest excess entities when count exceeds `max_entities`.
840    ///
841    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
842    /// Only deletes when `entity_count() > max_entities`.
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if the database query fails.
847    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
848        let current = self.entity_count().await?;
849        let max = i64::try_from(max_entities)?;
850        if current <= max {
851            return Ok(0);
852        }
853        let excess = current - max;
854        let result = sqlx::query(
855            "DELETE FROM graph_entities
856             WHERE id IN (
857                 SELECT e.id
858                 FROM graph_entities e
859                 LEFT JOIN (
860                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
861                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
862                     UNION ALL
863                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
864                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
865                 ) edge_counts ON e.id = edge_counts.eid
866                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
867                 LIMIT ?1
868             )",
869        )
870        .bind(excess)
871        .execute(&self.pool)
872        .await?;
873        Ok(usize::try_from(result.rows_affected())?)
874    }
875
876    // ── Temporal Edge Queries ─────────────────────────────────────────────────
877
878    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
879    ///
880    /// An edge is valid at `timestamp` when:
881    /// - `valid_from <= timestamp`, AND
882    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
883    ///
884    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if the database query fails.
889    pub async fn edges_at_timestamp(
890        &self,
891        entity_id: i64,
892        timestamp: &str,
893    ) -> Result<Vec<Edge>, MemoryError> {
894        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
895        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
896        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
897        let rows: Vec<EdgeRow> = sqlx::query_as(
898            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
899                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
900             FROM graph_edges
901             WHERE valid_to IS NULL
902               AND valid_from <= ?2
903               AND (source_entity_id = ?1 OR target_entity_id = ?1)
904             UNION ALL
905             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
906                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
907             FROM graph_edges
908             WHERE valid_to IS NOT NULL
909               AND valid_from <= ?2
910               AND valid_to > ?2
911               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
912        )
913        .bind(entity_id)
914        .bind(timestamp)
915        .fetch_all(&self.pool)
916        .await?;
917        Ok(rows.into_iter().map(edge_from_row).collect())
918    }
919
920    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
921    ///
922    /// The optional `relation` filter restricts results to a specific relation label.
923    /// Results are ordered by `valid_from DESC` (most recent first).
924    ///
925    /// # Errors
926    ///
927    /// Returns an error if the database query fails.
928    pub async fn edge_history(
929        &self,
930        source_entity_id: i64,
931        predicate: &str,
932        relation: Option<&str>,
933        limit: usize,
934    ) -> Result<Vec<Edge>, MemoryError> {
935        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
936        let escaped = predicate
937            .replace('\\', "\\\\")
938            .replace('%', "\\%")
939            .replace('_', "\\_");
940        let like_pattern = format!("%{escaped}%");
941        let limit = i64::try_from(limit)?;
942        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
943            sqlx::query_as(
944                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
945                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
946                 FROM graph_edges
947                 WHERE source_entity_id = ?1
948                   AND fact LIKE ?2 ESCAPE '\\'
949                   AND relation = ?3
950                 ORDER BY valid_from DESC
951                 LIMIT ?4",
952            )
953            .bind(source_entity_id)
954            .bind(&like_pattern)
955            .bind(rel)
956            .bind(limit)
957            .fetch_all(&self.pool)
958            .await?
959        } else {
960            sqlx::query_as(
961                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
962                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
963                 FROM graph_edges
964                 WHERE source_entity_id = ?1
965                   AND fact LIKE ?2 ESCAPE '\\'
966                 ORDER BY valid_from DESC
967                 LIMIT ?3",
968            )
969            .bind(source_entity_id)
970            .bind(&like_pattern)
971            .bind(limit)
972            .fetch_all(&self.pool)
973            .await?
974        };
975        Ok(rows.into_iter().map(edge_from_row).collect())
976    }
977
978    // ── BFS Traversal ─────────────────────────────────────────────────────────
979
980    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
981    ///
982    /// Returns all reachable entities and the active edges connecting them.
983    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
984    /// of `SQLite` CTE limitations.
985    ///
986    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
987    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
988    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
989    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
990    ///
991    /// # Errors
992    ///
993    /// Returns an error if any database query fails.
994    pub async fn bfs(
995        &self,
996        start_entity_id: i64,
997        max_hops: u32,
998    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
999        self.bfs_with_depth(start_entity_id, max_hops)
1000            .await
1001            .map(|(e, ed, _)| (e, ed))
1002    }
1003
1004    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1005    ///
1006    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1007    /// entity. The start entity itself has depth 0.
1008    ///
1009    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
1010    ///
1011    /// # Errors
1012    ///
1013    /// Returns an error if any database query fails.
1014    pub async fn bfs_with_depth(
1015        &self,
1016        start_entity_id: i64,
1017        max_hops: u32,
1018    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1019        self.bfs_core(start_entity_id, max_hops, None).await
1020    }
1021
1022    /// BFS traversal considering only edges that were valid at `timestamp`.
1023    ///
1024    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1025    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1026    ///
1027    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1028    ///
1029    /// # Errors
1030    ///
1031    /// Returns an error if any database query fails.
1032    pub async fn bfs_at_timestamp(
1033        &self,
1034        start_entity_id: i64,
1035        max_hops: u32,
1036        timestamp: &str,
1037    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1038        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1039            .await
1040    }
1041
    /// Shared BFS implementation.
    ///
    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
    ///
    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
    /// format string, so there is no SQL injection risk.
    ///
    /// Returns `(entities, edges, depth_map)` via `bfs_fetch_results`; the depth map
    /// records the minimum hop distance from `start_entity_id` (the start itself is 0).
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            // Entities dropped by this cap keep their already-recorded depth but are
            // not expanded further, so very wide graphs are explored only partially.
            frontier.truncate(MAX_FRONTIER);
            // IDs come from our own DB — no user input, no injection risk.
            let placeholders = frontier
                .iter()
                .enumerate()
                .map(|(i, _)| format!("?{}", i + 1))
                .collect::<Vec<_>>()
                .join(", ");
            // Temporal mode references one extra numbered parameter for the
            // timestamp, placed after the three frontier bind lists — hence
            // `frontier.len() * 3 + 1`.
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = frontier.len() * 3 + 1;
                format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
            } else {
                "valid_to IS NULL".to_owned()
            };
            // The ?1..?N list appears three times: once in the CASE (to pick the far
            // endpoint of each edge) and twice in the WHERE endpoint filter.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({placeholders}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
            );
            let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order mirrors the arithmetic above: the frontier is bound three
            // times in sequence so the optional timestamp lands at argument position
            // `frontier.len() * 3 + 1`, matching `ts_pos`; numbered `?N` placeholders
            // select arguments by position.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            // Only unseen entities enter the next frontier; the `Entry::Vacant` check
            // doubles as the BFS "visited" set and guarantees cycle termination.
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1118
    /// Fetch entities and edges for a completed BFS depth map.
    ///
    /// Loads every visited entity, plus the edges whose *both* endpoints are in
    /// the visited set, applying the same temporal filter as the traversal when
    /// `at_timestamp` is `Some`.
    ///
    /// NOTE(review): when the visited set exceeds 499 and is truncated, the
    /// returned `depth_map` still contains the dropped IDs even though their
    /// entities and edges are absent from the result vectors — confirm callers
    /// tolerate that asymmetry.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // ?1..?N placeholder list, reused by both the edge and entity queries below.
        let placeholders = visited_ids
            .iter()
            .enumerate()
            .map(|(i, _)| format!("?{}", i + 1))
            .collect::<Vec<_>>()
            .join(", ");
        // Temporal mode references one extra parameter for the timestamp, placed
        // after the two ID lists — hence `visited_ids.len() * 2 + 1`.
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = visited_ids.len() * 2 + 1;
            format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
        } else {
            "valid_to IS NULL".to_owned()
        };
        // Both endpoints must be in the visited set: edges leading outside the
        // traversal are excluded from the returned subgraph.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({placeholders})
               AND target_entity_id IN ({placeholders})"
        );
        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
        // Bind order mirrors placeholder numbering: both ID lists, then the
        // optional timestamp last.
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({placeholders})"
        );
        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // `entity_from_row` is fallible (entity_type parse) — short-circuit on the
        // first bad row. `edge_from_row` is infallible.
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1190
1191    // ── Backfill helpers ──────────────────────────────────────────────────────
1192
1193    /// Find an entity by name only (no type filter).
1194    ///
1195    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1196    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1197    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1198    ///
1199    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1200    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1201    /// contains "Alice").
1202    ///
1203    /// # Errors
1204    ///
1205    /// Returns an error if the database query fails.
1206    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1207        let rows: Vec<EntityRow> = sqlx::query_as(
1208            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1209             FROM graph_entities
1210             WHERE name = ?1 COLLATE NOCASE OR canonical_name = ?1 COLLATE NOCASE
1211             LIMIT 5",
1212        )
1213        .bind(name)
1214        .fetch_all(&self.pool)
1215        .await?;
1216
1217        if !rows.is_empty() {
1218            return rows.into_iter().map(entity_from_row).collect();
1219        }
1220
1221        self.find_entities_fuzzy(name, 5).await
1222    }
1223
1224    /// Return up to `limit` messages that have not yet been processed by graph extraction.
1225    ///
1226    /// Reads the `graph_processed` column added by migration 021.
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns an error if the database query fails.
1231    pub async fn unprocessed_messages_for_backfill(
1232        &self,
1233        limit: usize,
1234    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1235        let limit = i64::try_from(limit)?;
1236        let rows: Vec<(i64, String)> = sqlx::query_as(
1237            "SELECT id, content FROM messages
1238             WHERE graph_processed = 0
1239             ORDER BY id ASC
1240             LIMIT ?1",
1241        )
1242        .bind(limit)
1243        .fetch_all(&self.pool)
1244        .await?;
1245        Ok(rows
1246            .into_iter()
1247            .map(|(id, content)| (crate::types::MessageId(id), content))
1248            .collect())
1249    }
1250
1251    /// Return the count of messages not yet processed by graph extraction.
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns an error if the database query fails.
1256    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
1257        let count: i64 =
1258            sqlx::query_scalar("SELECT COUNT(*) FROM messages WHERE graph_processed = 0")
1259                .fetch_one(&self.pool)
1260                .await?;
1261        Ok(count)
1262    }
1263
1264    /// Mark a batch of messages as graph-processed.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if the database query fails.
1269    pub async fn mark_messages_graph_processed(
1270        &self,
1271        ids: &[crate::types::MessageId],
1272    ) -> Result<(), MemoryError> {
1273        if ids.is_empty() {
1274            return Ok(());
1275        }
1276        let placeholders = ids
1277            .iter()
1278            .enumerate()
1279            .map(|(i, _)| format!("?{}", i + 1))
1280            .collect::<Vec<_>>()
1281            .join(", ");
1282        let sql = format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
1283        let mut query = sqlx::query(&sql);
1284        for id in ids {
1285            query = query.bind(id.0);
1286        }
1287        query.execute(&self.pool).await?;
1288        Ok(())
1289    }
1290}
1291
1292// ── Row types for sqlx::query_as ─────────────────────────────────────────────
1293
/// Raw `graph_entities` row as fetched by `sqlx::query_as`; converted into the
/// domain `Entity` by `entity_from_row`.
#[derive(sqlx::FromRow)]
struct EntityRow {
    id: i64,
    // Display form of the entity name (original casing, per `upsert_entity`).
    name: String,
    // Normalized dedup key; unique together with `entity_type`.
    canonical_name: String,
    // Stored as TEXT; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
1305
1306fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
1307    let entity_type = row
1308        .entity_type
1309        .parse::<EntityType>()
1310        .map_err(MemoryError::GraphStore)?;
1311    Ok(Entity {
1312        id: row.id,
1313        name: row.name,
1314        canonical_name: row.canonical_name,
1315        entity_type,
1316        summary: row.summary,
1317        first_seen_at: row.first_seen_at,
1318        last_seen_at: row.last_seen_at,
1319        qdrant_point_id: row.qdrant_point_id,
1320    })
1321}
1322
/// Raw entity-alias row as fetched by `sqlx::query_as`; converted into the
/// domain `EntityAlias` by `alias_from_row`.
#[derive(sqlx::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias resolves to.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
1330
1331fn alias_from_row(row: AliasRow) -> EntityAlias {
1332    EntityAlias {
1333        id: row.id,
1334        entity_id: row.entity_id,
1335        alias_name: row.alias_name,
1336        created_at: row.created_at,
1337    }
1338}
1339
/// Raw `graph_edges` row as fetched by `sqlx::query_as`; converted into the
/// domain `Edge` by `edge_from_row`.
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Fetched as f64; narrowed to f32 in `edge_from_row`.
    confidence: f64,
    valid_from: String,
    // NULL (None) marks an edge that is currently active.
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Originating message id; wrapped into `MessageId` on conversion.
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
}
1355
1356fn edge_from_row(row: EdgeRow) -> Edge {
1357    Edge {
1358        id: row.id,
1359        source_entity_id: row.source_entity_id,
1360        target_entity_id: row.target_entity_id,
1361        relation: row.relation,
1362        fact: row.fact,
1363        #[allow(clippy::cast_possible_truncation)]
1364        confidence: row.confidence as f32,
1365        valid_from: row.valid_from,
1366        valid_to: row.valid_to,
1367        created_at: row.created_at,
1368        expired_at: row.expired_at,
1369        episode_id: row.episode_id.map(MessageId),
1370        qdrant_point_id: row.qdrant_point_id,
1371    }
1372}
1373
/// Raw `graph_communities` row as fetched by `sqlx::query_as`; converted into
/// the domain `Community` at the call site.
#[derive(sqlx::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded array; deserialized with `serde_json::from_str` into Vec<i64>.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
1384
1385// ── Tests ─────────────────────────────────────────────────────────────────────
1386
1387#[cfg(test)]
1388mod tests;