// zeph_memory/graph/store.rs — SQLite-backed graph store.
// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use std::collections::HashMap;
5
6use futures::Stream;
7use sqlx::SqlitePool;
8
9use crate::error::MemoryError;
10use crate::sqlite::messages::sanitize_fts5_query;
11use crate::types::MessageId;
12
13use super::types::{Community, Edge, Entity, EntityAlias, EntityType};
14
/// SQLite-backed persistence layer for the knowledge graph: entities,
/// aliases, edges, communities, and key/value metadata.
pub struct GraphStore {
    // Cloneable handle to the sqlx SQLite connection pool; all queries go through it.
    pool: SqlitePool,
}
18
19impl GraphStore {
    /// Construct a `GraphStore` over an existing `SqlitePool`.
    #[must_use]
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
24
    /// Borrow the underlying connection pool (e.g. for ad-hoc queries).
    #[must_use]
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
29
30    // ── Entities ─────────────────────────────────────────────────────────────
31
32    /// Insert or update an entity by `(canonical_name, entity_type)`.
33    ///
34    /// - `surface_name`: the original display form (e.g. `"Rust"`) — stored in the `name` column
35    ///   so user-facing output preserves casing. Updated on every upsert to the latest seen form.
36    /// - `canonical_name`: the stable normalized key (e.g. `"rust"`) — used for deduplication.
37    /// - `summary`: pass `None` to preserve the existing summary; pass `Some("")` to blank it.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the database query fails.
42    pub async fn upsert_entity(
43        &self,
44        surface_name: &str,
45        canonical_name: &str,
46        entity_type: EntityType,
47        summary: Option<&str>,
48    ) -> Result<i64, MemoryError> {
49        let type_str = entity_type.as_str();
50        let id: i64 = sqlx::query_scalar(
51            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
52             VALUES (?1, ?2, ?3, ?4)
53             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
54               name = excluded.name,
55               summary = COALESCE(excluded.summary, summary),
56               last_seen_at = datetime('now')
57             RETURNING id",
58        )
59        .bind(surface_name)
60        .bind(canonical_name)
61        .bind(type_str)
62        .bind(summary)
63        .fetch_one(&self.pool)
64        .await?;
65        Ok(id)
66    }
67
68    /// Find an entity by exact canonical name and type.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the database query fails.
73    pub async fn find_entity(
74        &self,
75        canonical_name: &str,
76        entity_type: EntityType,
77    ) -> Result<Option<Entity>, MemoryError> {
78        let type_str = entity_type.as_str();
79        let row: Option<EntityRow> = sqlx::query_as(
80            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
81             FROM graph_entities
82             WHERE canonical_name = ?1 AND entity_type = ?2",
83        )
84        .bind(canonical_name)
85        .bind(type_str)
86        .fetch_optional(&self.pool)
87        .await?;
88        row.map(entity_from_row).transpose()
89    }
90
91    /// Find an entity by its numeric ID.
92    ///
93    /// # Errors
94    ///
95    /// Returns an error if the database query fails.
96    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
97        let row: Option<EntityRow> = sqlx::query_as(
98            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
99             FROM graph_entities
100             WHERE id = ?1",
101        )
102        .bind(entity_id)
103        .fetch_optional(&self.pool)
104        .await?;
105        row.map(entity_from_row).transpose()
106    }
107
108    /// Update the `qdrant_point_id` for an entity.
109    ///
110    /// # Errors
111    ///
112    /// Returns an error if the database query fails.
113    pub async fn set_entity_qdrant_point_id(
114        &self,
115        entity_id: i64,
116        point_id: &str,
117    ) -> Result<(), MemoryError> {
118        sqlx::query("UPDATE graph_entities SET qdrant_point_id = ?1 WHERE id = ?2")
119            .bind(point_id)
120            .bind(entity_id)
121            .execute(&self.pool)
122            .await?;
123        Ok(())
124    }
125
    /// Find entities matching `query` in name, summary, or aliases, up to `limit` results.
    ///
    /// Name/summary matches use FTS5 MATCH with per-token prefix wildcards (`token*`);
    /// alias matches use a case-insensitive `LIKE '%query%'` with `%`, `_`, and `\`
    /// escaped. The two result sets are combined with UNION (which also deduplicates),
    /// and `limit` caps the combined row count.
    ///
    /// NOTE(review): earlier comments claimed bm25 relevance ranking with the name column
    /// weighted 10x, but the query below contains no `bm25()` call and no `ORDER BY`, so
    /// result order is unspecified by SQLite. If ranked output is required, an explicit
    /// rank column and `ORDER BY` must be added.
    ///
    /// # Behavioral note
    ///
    /// This replaces a previous `LIKE '%query%'` implementation for the FTS arm. FTS5
    /// prefix matching differs from substring matching: searching "SQL" will match
    /// "`SQLite`" (prefix) but NOT "`GraphQL`" (mid-word). Entity names are indexed as
    /// single tokens by the unicode61 tokenizer, so mid-word substrings are not matched —
    /// a known trade-off for index performance.
    ///
    /// Single-character queries (e.g., "a") are allowed and produce a broad prefix match
    /// ("a*"). No minimum query length is enforced; if this causes noise in practice, add
    /// a minimum length guard at the call site.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 boolean operator keywords (case-sensitive uppercase). Filtering these
        // prevents syntax errors when user input contains them as literal search terms
        // (e.g., "graph OR unrelated" must not produce "graph* OR* unrelated*").
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap input at 512 bytes without splitting a UTF-8 character.
        // NOTE(review): `floor_char_boundary` is an unstable std API — this file
        // presumably builds on a toolchain where it is available; confirm.
        let query = &query[..query.floor_char_boundary(512)];
        // Sanitize input: split on non-alphanumeric characters, filter empty tokens,
        // append '*' to each token for FTS5 prefix matching ("graph" -> "graph*").
        let sanitized = sanitize_fts5_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        // All tokens were operators: nothing left to match.
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION deduplicates across the FTS arm and the alias arm; LIMIT applies to the
        // whole compound. No ORDER BY — see the NOTE(review) above.
        let rows: Vec<EntityRow> = sqlx::query_as(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entities_fts fts
             JOIN graph_entities e ON e.id = fts.rowid
             WHERE graph_entities_fts MATCH ?1
             UNION
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entity_aliases a
             JOIN graph_entities e ON e.id = a.entity_id
             WHERE a.alias_name LIKE ?2 ESCAPE '\\' COLLATE NOCASE
             LIMIT ?3",
        )
        .bind(&fts_query)
        // Escape LIKE metacharacters so user input is matched literally in the alias arm.
        .bind(format!(
            "%{}%",
            query
                .trim()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_")
        ))
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
205
206    /// Stream all entities from the database incrementally (true cursor, no full-table load).
207    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
208        use futures::StreamExt as _;
209        sqlx::query_as::<_, EntityRow>(
210            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
211             FROM graph_entities ORDER BY id ASC",
212        )
213        .fetch(&self.pool)
214        .map(|r: Result<EntityRow, sqlx::Error>| {
215            r.map_err(MemoryError::from).and_then(entity_from_row)
216        })
217    }
218
219    // ── Alias methods ─────────────────────────────────────────────────────────
220
221    /// Insert an alias for an entity (idempotent: duplicate alias is silently ignored via UNIQUE constraint).
222    ///
223    /// # Errors
224    ///
225    /// Returns an error if the database query fails.
226    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
227        sqlx::query(
228            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name) VALUES (?1, ?2)",
229        )
230        .bind(entity_id)
231        .bind(alias_name)
232        .execute(&self.pool)
233        .await?;
234        Ok(())
235    }
236
237    /// Find an entity by alias name and entity type (case-insensitive).
238    ///
239    /// Filters by `entity_type` to avoid cross-type alias collisions (S2 fix).
240    ///
241    /// # Errors
242    ///
243    /// Returns an error if the database query fails.
244    pub async fn find_entity_by_alias(
245        &self,
246        alias_name: &str,
247        entity_type: EntityType,
248    ) -> Result<Option<Entity>, MemoryError> {
249        let type_str = entity_type.as_str();
250        let row: Option<EntityRow> = sqlx::query_as(
251            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
252                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
253             FROM graph_entity_aliases a
254             JOIN graph_entities e ON e.id = a.entity_id
255             WHERE a.alias_name = ?1 COLLATE NOCASE
256               AND e.entity_type = ?2
257             ORDER BY e.id ASC
258             LIMIT 1",
259        )
260        .bind(alias_name)
261        .bind(type_str)
262        .fetch_optional(&self.pool)
263        .await?;
264        row.map(entity_from_row).transpose()
265    }
266
267    /// Get all aliases for an entity.
268    ///
269    /// # Errors
270    ///
271    /// Returns an error if the database query fails.
272    pub async fn aliases_for_entity(
273        &self,
274        entity_id: i64,
275    ) -> Result<Vec<EntityAlias>, MemoryError> {
276        let rows: Vec<AliasRow> = sqlx::query_as(
277            "SELECT id, entity_id, alias_name, created_at
278             FROM graph_entity_aliases
279             WHERE entity_id = ?1
280             ORDER BY id ASC",
281        )
282        .bind(entity_id)
283        .fetch_all(&self.pool)
284        .await?;
285        Ok(rows.into_iter().map(alias_from_row).collect())
286    }
287
288    /// Collect all entities into a Vec.
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the database query fails or `entity_type` parsing fails.
293    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
294        use futures::TryStreamExt as _;
295        self.all_entities_stream().try_collect().await
296    }
297
298    /// Count the total number of entities.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the database query fails.
303    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
304        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_entities")
305            .fetch_one(&self.pool)
306            .await?;
307        Ok(count)
308    }
309
310    // ── Edges ─────────────────────────────────────────────────────────────────
311
    /// Insert a new edge between two entities, or update the existing active edge.
    ///
    /// An active edge is identified by `(source_entity_id, target_entity_id, relation)` with
    /// `valid_to IS NULL`. If such an edge already exists, its `confidence` is updated to the
    /// maximum of the stored and incoming values, and the existing id is returned. This prevents
    /// duplicate edges from repeated extraction of the same context messages.
    ///
    /// NOTE(review): the existence check and the insert are two separate statements, not an
    /// atomic upsert — two concurrent writers could both miss the existing row and insert
    /// duplicates. Acceptable for a single-writer extraction pipeline; confirm if concurrent
    /// extraction is possible. Also note the update path refreshes only `confidence`: the
    /// stored `fact` and `episode_id` keep their original values.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        // Confidence is stored in [0.0, 1.0]; out-of-range input is clamped, not rejected.
        let confidence = confidence.clamp(0.0, 1.0);

        // Check for an existing active (valid_to IS NULL) edge with the same triple.
        let existing: Option<(i64, f64)> = sqlx::query_as(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?1
               AND target_entity_id = ?2
               AND relation = ?3
               AND valid_to IS NULL
             LIMIT 1",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .fetch_optional(&self.pool)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Keep the strongest signal seen so far: never lower a stored confidence.
            let updated_conf = f64::from(confidence).max(stored_conf);
            sqlx::query("UPDATE graph_edges SET confidence = ?1 WHERE id = ?2")
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&self.pool)
                .await?;
            return Ok(existing_id);
        }

        // No active duplicate: insert a fresh edge row. valid_from/created_at are not set
        // here — presumably defaulted by the table schema; confirm.
        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact, confidence, episode_id)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             RETURNING id",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
373
374    /// Mark an edge as invalid (set `valid_to` and `expired_at` to now).
375    ///
376    /// # Errors
377    ///
378    /// Returns an error if the database update fails.
379    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
380        sqlx::query(
381            "UPDATE graph_edges SET valid_to = datetime('now'), expired_at = datetime('now')
382             WHERE id = ?1",
383        )
384        .bind(edge_id)
385        .execute(&self.pool)
386        .await?;
387        Ok(())
388    }
389
390    /// Get all active edges where entity is source or target.
391    ///
392    /// # Errors
393    ///
394    /// Returns an error if the database query fails.
395    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
396        let rows: Vec<EdgeRow> = sqlx::query_as(
397            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
398                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
399             FROM graph_edges
400             WHERE valid_to IS NULL
401               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
402        )
403        .bind(entity_id)
404        .fetch_all(&self.pool)
405        .await?;
406        Ok(rows.into_iter().map(edge_from_row).collect())
407    }
408
409    /// Get all edges (active and expired) where entity is source or target, ordered by
410    /// `valid_from DESC`. Used by the `/graph history <name>` slash command.
411    ///
412    /// # Errors
413    ///
414    /// Returns an error if the database query fails or if `limit` overflows `i64`.
415    pub async fn edge_history_for_entity(
416        &self,
417        entity_id: i64,
418        limit: usize,
419    ) -> Result<Vec<Edge>, MemoryError> {
420        let limit = i64::try_from(limit)?;
421        let rows: Vec<EdgeRow> = sqlx::query_as(
422            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
423                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
424             FROM graph_edges
425             WHERE source_entity_id = ?1 OR target_entity_id = ?1
426             ORDER BY valid_from DESC
427             LIMIT ?2",
428        )
429        .bind(entity_id)
430        .bind(limit)
431        .fetch_all(&self.pool)
432        .await?;
433        Ok(rows.into_iter().map(edge_from_row).collect())
434    }
435
436    /// Get all active edges between two entities (both directions).
437    ///
438    /// # Errors
439    ///
440    /// Returns an error if the database query fails.
441    pub async fn edges_between(
442        &self,
443        entity_a: i64,
444        entity_b: i64,
445    ) -> Result<Vec<Edge>, MemoryError> {
446        let rows: Vec<EdgeRow> = sqlx::query_as(
447            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
448                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
449             FROM graph_edges
450             WHERE valid_to IS NULL
451               AND ((source_entity_id = ?1 AND target_entity_id = ?2)
452                 OR (source_entity_id = ?2 AND target_entity_id = ?1))",
453        )
454        .bind(entity_a)
455        .bind(entity_b)
456        .fetch_all(&self.pool)
457        .await?;
458        Ok(rows.into_iter().map(edge_from_row).collect())
459    }
460
461    /// Get active edges from `source` to `target` in the exact direction (no reverse).
462    ///
463    /// # Errors
464    ///
465    /// Returns an error if the database query fails.
466    pub async fn edges_exact(
467        &self,
468        source_entity_id: i64,
469        target_entity_id: i64,
470    ) -> Result<Vec<Edge>, MemoryError> {
471        let rows: Vec<EdgeRow> = sqlx::query_as(
472            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
473                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
474             FROM graph_edges
475             WHERE valid_to IS NULL
476               AND source_entity_id = ?1
477               AND target_entity_id = ?2",
478        )
479        .bind(source_entity_id)
480        .bind(target_entity_id)
481        .fetch_all(&self.pool)
482        .await?;
483        Ok(rows.into_iter().map(edge_from_row).collect())
484    }
485
486    /// Count active (non-invalidated) edges.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the database query fails.
491    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
492        let count: i64 =
493            sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
494                .fetch_one(&self.pool)
495                .await?;
496        Ok(count)
497    }
498
499    // ── Communities ───────────────────────────────────────────────────────────
500
501    /// Insert or update a community by name.
502    ///
503    /// `fingerprint` is a BLAKE3 hex string computed from sorted entity IDs and
504    /// intra-community edge IDs. Pass `None` to leave the fingerprint unchanged (e.g. when
505    /// `assign_to_community` adds an entity without a full re-detection pass).
506    ///
507    /// # Errors
508    ///
509    /// Returns an error if the database query fails or JSON serialization fails.
510    pub async fn upsert_community(
511        &self,
512        name: &str,
513        summary: &str,
514        entity_ids: &[i64],
515        fingerprint: Option<&str>,
516    ) -> Result<i64, MemoryError> {
517        let entity_ids_json = serde_json::to_string(entity_ids)?;
518        let id: i64 = sqlx::query_scalar(
519            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
520             VALUES (?1, ?2, ?3, ?4)
521             ON CONFLICT(name) DO UPDATE SET
522               summary = excluded.summary,
523               entity_ids = excluded.entity_ids,
524               fingerprint = COALESCE(excluded.fingerprint, fingerprint),
525               updated_at = datetime('now')
526             RETURNING id",
527        )
528        .bind(name)
529        .bind(summary)
530        .bind(entity_ids_json)
531        .bind(fingerprint)
532        .fetch_one(&self.pool)
533        .await?;
534        Ok(id)
535    }
536
537    /// Return a map of `fingerprint -> community_id` for all communities with a non-NULL
538    /// fingerprint. Used by `detect_communities` to skip unchanged partitions.
539    ///
540    /// # Errors
541    ///
542    /// Returns an error if the database query fails.
543    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
544        let rows: Vec<(String, i64)> = sqlx::query_as(
545            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL",
546        )
547        .fetch_all(&self.pool)
548        .await?;
549        Ok(rows.into_iter().collect())
550    }
551
552    /// Delete a single community by its primary key.
553    ///
554    /// # Errors
555    ///
556    /// Returns an error if the database query fails.
557    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
558        sqlx::query("DELETE FROM graph_communities WHERE id = ?1")
559            .bind(id)
560            .execute(&self.pool)
561            .await?;
562        Ok(())
563    }
564
565    /// Set the fingerprint of a community to `NULL`, invalidating the incremental cache.
566    ///
567    /// Used by `assign_to_community` when an entity is added without a full re-detection pass,
568    /// ensuring the next `detect_communities` run re-summarizes the affected community.
569    ///
570    /// # Errors
571    ///
572    /// Returns an error if the database query fails.
573    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
574        sqlx::query("UPDATE graph_communities SET fingerprint = NULL WHERE id = ?1")
575            .bind(id)
576            .execute(&self.pool)
577            .await?;
578        Ok(())
579    }
580
581    /// Find the first community that contains the given `entity_id`.
582    ///
583    /// Uses `json_each()` to push the membership search into `SQLite`, avoiding a full
584    /// table scan with per-row JSON parsing.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the database query fails or JSON parsing fails.
589    pub async fn community_for_entity(
590        &self,
591        entity_id: i64,
592    ) -> Result<Option<Community>, MemoryError> {
593        let row: Option<CommunityRow> = sqlx::query_as(
594            "SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
595             FROM graph_communities c, json_each(c.entity_ids) j
596             WHERE CAST(j.value AS INTEGER) = ?1
597             LIMIT 1",
598        )
599        .bind(entity_id)
600        .fetch_optional(&self.pool)
601        .await?;
602        match row {
603            Some(row) => {
604                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
605                Ok(Some(Community {
606                    id: row.id,
607                    name: row.name,
608                    summary: row.summary,
609                    entity_ids,
610                    fingerprint: row.fingerprint,
611                    created_at: row.created_at,
612                    updated_at: row.updated_at,
613                }))
614            }
615            None => Ok(None),
616        }
617    }
618
619    /// Get all communities.
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if the database query fails or JSON parsing fails.
624    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
625        let rows: Vec<CommunityRow> = sqlx::query_as(
626            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
627             FROM graph_communities
628             ORDER BY id ASC",
629        )
630        .fetch_all(&self.pool)
631        .await?;
632
633        rows.into_iter()
634            .map(|row| {
635                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
636                Ok(Community {
637                    id: row.id,
638                    name: row.name,
639                    summary: row.summary,
640                    entity_ids,
641                    fingerprint: row.fingerprint,
642                    created_at: row.created_at,
643                    updated_at: row.updated_at,
644                })
645            })
646            .collect()
647    }
648
649    /// Count the total number of communities.
650    ///
651    /// # Errors
652    ///
653    /// Returns an error if the database query fails.
654    pub async fn community_count(&self) -> Result<i64, MemoryError> {
655        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_communities")
656            .fetch_one(&self.pool)
657            .await?;
658        Ok(count)
659    }
660
661    // ── Metadata ──────────────────────────────────────────────────────────────
662
663    /// Get a metadata value by key.
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the database query fails.
668    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
669        let val: Option<String> =
670            sqlx::query_scalar("SELECT value FROM graph_metadata WHERE key = ?1")
671                .bind(key)
672                .fetch_optional(&self.pool)
673                .await?;
674        Ok(val)
675    }
676
677    /// Set a metadata value by key (upsert).
678    ///
679    /// # Errors
680    ///
681    /// Returns an error if the database query fails.
682    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
683        sqlx::query(
684            "INSERT INTO graph_metadata (key, value) VALUES (?1, ?2)
685             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
686        )
687        .bind(key)
688        .bind(value)
689        .execute(&self.pool)
690        .await?;
691        Ok(())
692    }
693
694    /// Get the current extraction count from metadata.
695    ///
696    /// Returns 0 if the counter has not been initialized.
697    ///
698    /// # Errors
699    ///
700    /// Returns an error if the database query fails.
701    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
702        let val = self.get_metadata("extraction_count").await?;
703        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
704    }
705
706    /// Stream all active (non-invalidated) edges.
707    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
708        use futures::StreamExt as _;
709        sqlx::query_as::<_, EdgeRow>(
710            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
711                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
712             FROM graph_edges
713             WHERE valid_to IS NULL
714             ORDER BY id ASC",
715        )
716        .fetch(&self.pool)
717        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
718    }
719
720    /// Fetch a chunk of active edges using keyset pagination.
721    ///
722    /// Returns edges with `id > after_id` in ascending order, up to `limit` rows.
723    /// Starting with `after_id = 0` returns the first chunk. Pass the last `id` from
724    /// the returned chunk as `after_id` for the next page. An empty result means all
725    /// edges have been consumed.
726    ///
727    /// Keyset pagination is O(1) per page (index seek on `id`) vs OFFSET which is O(N).
728    /// It is also stable under concurrent inserts: new edges get monotonically higher IDs
729    /// and will appear in subsequent chunks or after the last chunk, never causing
730    /// duplicates. Concurrent invalidations (setting `valid_to`) may cause a single edge
731    /// to be skipped, which is acceptable — LPA operates on an eventual-consistency snapshot.
732    ///
733    /// # Errors
734    ///
735    /// Returns an error if the database query fails.
736    pub async fn edges_after_id(
737        &self,
738        after_id: i64,
739        limit: i64,
740    ) -> Result<Vec<Edge>, MemoryError> {
741        let rows: Vec<EdgeRow> = sqlx::query_as(
742            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
743                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
744             FROM graph_edges
745             WHERE valid_to IS NULL AND id > ?1
746             ORDER BY id ASC
747             LIMIT ?2",
748        )
749        .bind(after_id)
750        .bind(limit)
751        .fetch_all(&self.pool)
752        .await?;
753        Ok(rows.into_iter().map(edge_from_row).collect())
754    }
755
756    /// Find a community by its primary key.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the database query fails or JSON parsing fails.
761    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
762        let row: Option<CommunityRow> = sqlx::query_as(
763            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
764             FROM graph_communities
765             WHERE id = ?1",
766        )
767        .bind(id)
768        .fetch_optional(&self.pool)
769        .await?;
770        match row {
771            Some(row) => {
772                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
773                Ok(Some(Community {
774                    id: row.id,
775                    name: row.name,
776                    summary: row.summary,
777                    entity_ids,
778                    fingerprint: row.fingerprint,
779                    created_at: row.created_at,
780                    updated_at: row.updated_at,
781                }))
782            }
783            None => Ok(None),
784        }
785    }
786
787    /// Delete all communities (full rebuild before upsert).
788    ///
789    /// # Errors
790    ///
791    /// Returns an error if the database query fails.
792    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
793        sqlx::query("DELETE FROM graph_communities")
794            .execute(&self.pool)
795            .await?;
796        Ok(())
797    }
798
799    /// Delete expired edges older than `retention_days` and return count deleted.
800    ///
801    /// # Errors
802    ///
803    /// Returns an error if the database query fails.
804    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
805        let days = i64::from(retention_days);
806        let result = sqlx::query(
807            "DELETE FROM graph_edges
808             WHERE expired_at IS NOT NULL
809               AND expired_at < datetime('now', '-' || ?1 || ' days')",
810        )
811        .bind(days)
812        .execute(&self.pool)
813        .await?;
814        Ok(usize::try_from(result.rows_affected())?)
815    }
816
817    /// Delete orphan entities (no active edges, last seen more than `retention_days` ago).
818    ///
819    /// # Errors
820    ///
821    /// Returns an error if the database query fails.
822    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
823        let days = i64::from(retention_days);
824        let result = sqlx::query(
825            "DELETE FROM graph_entities
826             WHERE id NOT IN (
827                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
828                 UNION
829                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
830             )
831             AND last_seen_at < datetime('now', '-' || ?1 || ' days')",
832        )
833        .bind(days)
834        .execute(&self.pool)
835        .await?;
836        Ok(usize::try_from(result.rows_affected())?)
837    }
838
839    /// Delete the oldest excess entities when count exceeds `max_entities`.
840    ///
841    /// Entities are ranked by ascending edge count, then ascending `last_seen_at` (LRU).
842    /// Only deletes when `entity_count() > max_entities`.
843    ///
844    /// # Errors
845    ///
846    /// Returns an error if the database query fails.
847    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
848        let current = self.entity_count().await?;
849        let max = i64::try_from(max_entities)?;
850        if current <= max {
851            return Ok(0);
852        }
853        let excess = current - max;
854        let result = sqlx::query(
855            "DELETE FROM graph_entities
856             WHERE id IN (
857                 SELECT e.id
858                 FROM graph_entities e
859                 LEFT JOIN (
860                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
861                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
862                     UNION ALL
863                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
864                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
865                 ) edge_counts ON e.id = edge_counts.eid
866                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
867                 LIMIT ?1
868             )",
869        )
870        .bind(excess)
871        .execute(&self.pool)
872        .await?;
873        Ok(usize::try_from(result.rows_affected())?)
874    }
875
876    // ── Temporal Edge Queries ─────────────────────────────────────────────────
877
878    /// Return all edges for `entity_id` (as source or target) that were valid at `timestamp`.
879    ///
880    /// An edge is valid at `timestamp` when:
881    /// - `valid_from <= timestamp`, AND
882    /// - `valid_to IS NULL` (open-ended) OR `valid_to > timestamp`.
883    ///
884    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
885    ///
886    /// # Errors
887    ///
888    /// Returns an error if the database query fails.
889    pub async fn edges_at_timestamp(
890        &self,
891        entity_id: i64,
892        timestamp: &str,
893    ) -> Result<Vec<Edge>, MemoryError> {
894        // Split into two UNIONed branches to leverage the partial indexes from migration 030:
895        //   Branch 1 (active edges):     idx_graph_edges_valid + idx_graph_edges_source/target
896        //   Branch 2 (historical edges): idx_graph_edges_src_temporal / idx_graph_edges_tgt_temporal
897        let rows: Vec<EdgeRow> = sqlx::query_as(
898            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
899                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
900             FROM graph_edges
901             WHERE valid_to IS NULL
902               AND valid_from <= ?2
903               AND (source_entity_id = ?1 OR target_entity_id = ?1)
904             UNION ALL
905             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
906                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
907             FROM graph_edges
908             WHERE valid_to IS NOT NULL
909               AND valid_from <= ?2
910               AND valid_to > ?2
911               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
912        )
913        .bind(entity_id)
914        .bind(timestamp)
915        .fetch_all(&self.pool)
916        .await?;
917        Ok(rows.into_iter().map(edge_from_row).collect())
918    }
919
920    /// Return all edge versions (active and expired) for the given `(source, predicate)` pair.
921    ///
922    /// The optional `relation` filter restricts results to a specific relation label.
923    /// Results are ordered by `valid_from DESC` (most recent first).
924    ///
925    /// # Errors
926    ///
927    /// Returns an error if the database query fails.
928    pub async fn edge_history(
929        &self,
930        source_entity_id: i64,
931        predicate: &str,
932        relation: Option<&str>,
933        limit: usize,
934    ) -> Result<Vec<Edge>, MemoryError> {
935        // Escape LIKE wildcards so `%` and `_` in the predicate are treated as literals.
936        let escaped = predicate
937            .replace('\\', "\\\\")
938            .replace('%', "\\%")
939            .replace('_', "\\_");
940        let like_pattern = format!("%{escaped}%");
941        let limit = i64::try_from(limit)?;
942        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
943            sqlx::query_as(
944                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
945                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
946                 FROM graph_edges
947                 WHERE source_entity_id = ?1
948                   AND fact LIKE ?2 ESCAPE '\\'
949                   AND relation = ?3
950                 ORDER BY valid_from DESC
951                 LIMIT ?4",
952            )
953            .bind(source_entity_id)
954            .bind(&like_pattern)
955            .bind(rel)
956            .bind(limit)
957            .fetch_all(&self.pool)
958            .await?
959        } else {
960            sqlx::query_as(
961                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
962                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
963                 FROM graph_edges
964                 WHERE source_entity_id = ?1
965                   AND fact LIKE ?2 ESCAPE '\\'
966                 ORDER BY valid_from DESC
967                 LIMIT ?3",
968            )
969            .bind(source_entity_id)
970            .bind(&like_pattern)
971            .bind(limit)
972            .fetch_all(&self.pool)
973            .await?
974        };
975        Ok(rows.into_iter().map(edge_from_row).collect())
976    }
977
978    // ── BFS Traversal ─────────────────────────────────────────────────────────
979
980    /// Breadth-first traversal from `start_entity_id` up to `max_hops` hops.
981    ///
982    /// Returns all reachable entities and the active edges connecting them.
983    /// Implements BFS iteratively in Rust to guarantee cycle safety regardless
984    /// of `SQLite` CTE limitations.
985    ///
986    /// **`SQLite` bind parameter limit**: each BFS hop binds the frontier IDs three times in the
987    /// neighbour query. At ~300+ frontier entities per hop, the IN clause may approach `SQLite`'s
988    /// default `SQLITE_MAX_VARIABLE_NUMBER` limit of 999. Acceptable for Phase 1 (small graphs,
989    /// `max_hops` typically 2–3). For large graphs, consider batching or a temp-table approach.
990    ///
991    /// # Errors
992    ///
993    /// Returns an error if any database query fails.
994    pub async fn bfs(
995        &self,
996        start_entity_id: i64,
997        max_hops: u32,
998    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
999        self.bfs_with_depth(start_entity_id, max_hops)
1000            .await
1001            .map(|(e, ed, _)| (e, ed))
1002    }
1003
1004    /// BFS traversal returning entities, edges, and a depth map (`entity_id` → hop distance).
1005    ///
1006    /// The depth map records the minimum hop distance from `start_entity_id` to each visited
1007    /// entity. The start entity itself has depth 0.
1008    ///
1009    /// **`SQLite` bind parameter limit**: see [`bfs`] for notes on frontier size limits.
1010    ///
1011    /// # Errors
1012    ///
1013    /// Returns an error if any database query fails.
1014    pub async fn bfs_with_depth(
1015        &self,
1016        start_entity_id: i64,
1017        max_hops: u32,
1018    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1019        self.bfs_core(start_entity_id, max_hops, None).await
1020    }
1021
1022    /// BFS traversal considering only edges that were valid at `timestamp`.
1023    ///
1024    /// Equivalent to [`bfs_with_depth`] but replaces the `valid_to IS NULL` filter with
1025    /// the temporal range predicate `valid_from <= ts AND (valid_to IS NULL OR valid_to > ts)`.
1026    ///
1027    /// `timestamp` must be a `SQLite` datetime string: `"YYYY-MM-DD HH:MM:SS"`.
1028    ///
1029    /// # Errors
1030    ///
1031    /// Returns an error if any database query fails.
1032    pub async fn bfs_at_timestamp(
1033        &self,
1034        start_entity_id: i64,
1035        max_hops: u32,
1036        timestamp: &str,
1037    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1038        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1039            .await
1040    }
1041
1042    /// Shared BFS implementation.
1043    ///
1044    /// When `at_timestamp` is `None`, only active edges (`valid_to IS NULL`) are traversed.
1045    /// When `at_timestamp` is `Some(ts)`, edges valid at `ts` are traversed (temporal BFS).
1046    ///
1047    /// All IDs used in dynamic SQL come from our own database — no user input reaches the
1048    /// format string, so there is no SQL injection risk.
1049    async fn bfs_core(
1050        &self,
1051        start_entity_id: i64,
1052        max_hops: u32,
1053        at_timestamp: Option<&str>,
1054    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1055        use std::collections::HashMap;
1056
1057        // SQLite binds frontier IDs 3× per hop; at >333 IDs the IN clause exceeds
1058        // SQLITE_MAX_VARIABLE_NUMBER (999). Cap to 300 to stay safely within the limit.
1059        const MAX_FRONTIER: usize = 300;
1060
1061        let mut depth_map: HashMap<i64, u32> = HashMap::new();
1062        let mut frontier: Vec<i64> = vec![start_entity_id];
1063        depth_map.insert(start_entity_id, 0);
1064
1065        for hop in 0..max_hops {
1066            if frontier.is_empty() {
1067                break;
1068            }
1069            frontier.truncate(MAX_FRONTIER);
1070            // IDs come from our own DB — no user input, no injection risk.
1071            let placeholders = frontier
1072                .iter()
1073                .enumerate()
1074                .map(|(i, _)| format!("?{}", i + 1))
1075                .collect::<Vec<_>>()
1076                .join(", ");
1077            let edge_filter = if at_timestamp.is_some() {
1078                let ts_pos = frontier.len() * 3 + 1;
1079                format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
1080            } else {
1081                "valid_to IS NULL".to_owned()
1082            };
1083            let neighbour_sql = format!(
1084                "SELECT DISTINCT CASE
1085                     WHEN source_entity_id IN ({placeholders}) THEN target_entity_id
1086                     ELSE source_entity_id
1087                 END as neighbour_id
1088                 FROM graph_edges
1089                 WHERE {edge_filter}
1090                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
1091            );
1092            let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
1093            for id in &frontier {
1094                q = q.bind(*id);
1095            }
1096            for id in &frontier {
1097                q = q.bind(*id);
1098            }
1099            for id in &frontier {
1100                q = q.bind(*id);
1101            }
1102            if let Some(ts) = at_timestamp {
1103                q = q.bind(ts);
1104            }
1105            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1106            let mut next_frontier: Vec<i64> = Vec::new();
1107            for nbr in neighbours {
1108                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1109                    e.insert(hop + 1);
1110                    next_frontier.push(nbr);
1111                }
1112            }
1113            frontier = next_frontier;
1114        }
1115
1116        self.bfs_fetch_results(depth_map, at_timestamp).await
1117    }
1118
1119    /// Fetch entities and edges for a completed BFS depth map.
1120    async fn bfs_fetch_results(
1121        &self,
1122        depth_map: std::collections::HashMap<i64, u32>,
1123        at_timestamp: Option<&str>,
1124    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1125        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1126        if visited_ids.is_empty() {
1127            return Ok((Vec::new(), Vec::new(), depth_map));
1128        }
1129        // Edge query binds visited_ids twice — cap at 499 to stay under SQLite 999 limit.
1130        if visited_ids.len() > 499 {
1131            tracing::warn!(
1132                total = visited_ids.len(),
1133                retained = 499,
1134                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1135                 some reachable entities will be dropped from results"
1136            );
1137            visited_ids.truncate(499);
1138        }
1139
1140        let placeholders = visited_ids
1141            .iter()
1142            .enumerate()
1143            .map(|(i, _)| format!("?{}", i + 1))
1144            .collect::<Vec<_>>()
1145            .join(", ");
1146        let edge_filter = if at_timestamp.is_some() {
1147            let ts_pos = visited_ids.len() * 2 + 1;
1148            format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
1149        } else {
1150            "valid_to IS NULL".to_owned()
1151        };
1152        let edge_sql = format!(
1153            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1154                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
1155             FROM graph_edges
1156             WHERE {edge_filter}
1157               AND source_entity_id IN ({placeholders})
1158               AND target_entity_id IN ({placeholders})"
1159        );
1160        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
1161        for id in &visited_ids {
1162            edge_query = edge_query.bind(*id);
1163        }
1164        for id in &visited_ids {
1165            edge_query = edge_query.bind(*id);
1166        }
1167        if let Some(ts) = at_timestamp {
1168            edge_query = edge_query.bind(ts);
1169        }
1170        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1171
1172        let entity_sql = format!(
1173            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1174             FROM graph_entities WHERE id IN ({placeholders})"
1175        );
1176        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql);
1177        for id in &visited_ids {
1178            entity_query = entity_query.bind(*id);
1179        }
1180        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1181
1182        let entities: Vec<Entity> = entity_rows
1183            .into_iter()
1184            .map(entity_from_row)
1185            .collect::<Result<Vec<_>, _>>()?;
1186        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1187
1188        Ok((entities, edges, depth_map))
1189    }
1190
1191    // ── Backfill helpers ──────────────────────────────────────────────────────
1192
1193    /// Find an entity by name only (no type filter).
1194    ///
1195    /// Uses a two-phase lookup to ensure exact name matches are always prioritised:
1196    /// 1. Exact case-insensitive match on `name` or `canonical_name`.
1197    /// 2. If no exact match found, falls back to FTS5 prefix search (see `find_entities_fuzzy`).
1198    ///
1199    /// This prevents FTS5 from returning a different entity whose *summary* mentions the
1200    /// searched name (e.g. searching "Alice" returning "Google" because Google's summary
1201    /// contains "Alice").
1202    ///
1203    /// # Errors
1204    ///
1205    /// Returns an error if the database query fails.
1206    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1207        let rows: Vec<EntityRow> = sqlx::query_as(
1208            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1209             FROM graph_entities
1210             WHERE name = ?1 COLLATE NOCASE OR canonical_name = ?1 COLLATE NOCASE
1211             LIMIT 5",
1212        )
1213        .bind(name)
1214        .fetch_all(&self.pool)
1215        .await?;
1216
1217        if !rows.is_empty() {
1218            return rows.into_iter().map(entity_from_row).collect();
1219        }
1220
1221        self.find_entities_fuzzy(name, 5).await
1222    }
1223
1224    /// Return up to `limit` messages that have not yet been processed by graph extraction.
1225    ///
1226    /// Reads the `graph_processed` column added by migration 021.
1227    ///
1228    /// # Errors
1229    ///
1230    /// Returns an error if the database query fails.
1231    pub async fn unprocessed_messages_for_backfill(
1232        &self,
1233        limit: usize,
1234    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1235        let limit = i64::try_from(limit)?;
1236        let rows: Vec<(i64, String)> = sqlx::query_as(
1237            "SELECT id, content FROM messages
1238             WHERE graph_processed = 0
1239             ORDER BY id ASC
1240             LIMIT ?1",
1241        )
1242        .bind(limit)
1243        .fetch_all(&self.pool)
1244        .await?;
1245        Ok(rows
1246            .into_iter()
1247            .map(|(id, content)| (crate::types::MessageId(id), content))
1248            .collect())
1249    }
1250
1251    /// Return the count of messages not yet processed by graph extraction.
1252    ///
1253    /// # Errors
1254    ///
1255    /// Returns an error if the database query fails.
1256    pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
1257        let count: i64 =
1258            sqlx::query_scalar("SELECT COUNT(*) FROM messages WHERE graph_processed = 0")
1259                .fetch_one(&self.pool)
1260                .await?;
1261        Ok(count)
1262    }
1263
1264    /// Mark a batch of messages as graph-processed.
1265    ///
1266    /// # Errors
1267    ///
1268    /// Returns an error if the database query fails.
1269    pub async fn mark_messages_graph_processed(
1270        &self,
1271        ids: &[crate::types::MessageId],
1272    ) -> Result<(), MemoryError> {
1273        if ids.is_empty() {
1274            return Ok(());
1275        }
1276        let placeholders = ids
1277            .iter()
1278            .enumerate()
1279            .map(|(i, _)| format!("?{}", i + 1))
1280            .collect::<Vec<_>>()
1281            .join(", ");
1282        let sql = format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
1283        let mut query = sqlx::query(&sql);
1284        for id in ids {
1285            query = query.bind(id.0);
1286        }
1287        query.execute(&self.pool).await?;
1288        Ok(())
1289    }
1290}
1291
1292// ── Row types for sqlx::query_as ─────────────────────────────────────────────
1293
/// Raw row shape for `graph_entities` queries; converted via `entity_from_row`.
/// Field names must match the column names selected in the queries above.
#[derive(sqlx::FromRow)]
struct EntityRow {
    id: i64,
    name: String,            // surface/display form (see `upsert_entity`)
    canonical_name: String,  // normalized dedup key
    entity_type: String,     // parsed into `EntityType` during conversion
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
1305
1306fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
1307    let entity_type = row
1308        .entity_type
1309        .parse::<EntityType>()
1310        .map_err(MemoryError::GraphStore)?;
1311    Ok(Entity {
1312        id: row.id,
1313        name: row.name,
1314        canonical_name: row.canonical_name,
1315        entity_type,
1316        summary: row.summary,
1317        first_seen_at: row.first_seen_at,
1318        last_seen_at: row.last_seen_at,
1319        qdrant_point_id: row.qdrant_point_id,
1320    })
1321}
1322
/// Raw row shape for entity-alias queries; converted via `alias_from_row`.
#[derive(sqlx::FromRow)]
struct AliasRow {
    id: i64,
    entity_id: i64,    // owning entity in graph_entities
    alias_name: String,
    created_at: String,
}
1330
1331fn alias_from_row(row: AliasRow) -> EntityAlias {
1332    EntityAlias {
1333        id: row.id,
1334        entity_id: row.entity_id,
1335        alias_name: row.alias_name,
1336        created_at: row.created_at,
1337    }
1338}
1339
/// Raw row shape for `graph_edges` queries; converted via `edge_from_row`.
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    confidence: f64,            // stored as REAL; narrowed to f32 in edge_from_row
    valid_from: String,
    valid_to: Option<String>,   // NULL ⇒ edge is currently active
    created_at: String,
    expired_at: Option<String>,
    episode_id: Option<i64>,    // wrapped into MessageId during conversion
    qdrant_point_id: Option<String>,
}
1355
1356fn edge_from_row(row: EdgeRow) -> Edge {
1357    Edge {
1358        id: row.id,
1359        source_entity_id: row.source_entity_id,
1360        target_entity_id: row.target_entity_id,
1361        relation: row.relation,
1362        fact: row.fact,
1363        #[allow(clippy::cast_possible_truncation)]
1364        confidence: row.confidence as f32,
1365        valid_from: row.valid_from,
1366        valid_to: row.valid_to,
1367        created_at: row.created_at,
1368        expired_at: row.expired_at,
1369        episode_id: row.episode_id.map(MessageId),
1370        qdrant_point_id: row.qdrant_point_id,
1371    }
1372}
1373
/// Raw row shape for `graph_communities` queries.
/// `entity_ids` holds a JSON array that is decoded with `serde_json` at read time.
#[derive(sqlx::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    entity_ids: String,          // JSON-encoded Vec<i64>
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
1384
1385// ── Tests ─────────────────────────────────────────────────────────────────────
1386
1387#[cfg(test)]
1388mod tests {
1389    use super::*;
1390    use crate::sqlite::SqliteStore;
1391
    /// Build a `GraphStore` backed by a fresh in-memory SQLite database.
    async fn setup() -> GraphStore {
        let store = SqliteStore::new(":memory:").await.unwrap();
        GraphStore::new(store.pool().clone())
    }
1396
1397    #[tokio::test]
1398    async fn upsert_entity_insert_new() {
1399        let gs = setup().await;
1400        let id = gs
1401            .upsert_entity("Alice", "Alice", EntityType::Person, Some("a person"))
1402            .await
1403            .unwrap();
1404        assert!(id > 0);
1405    }
1406
    #[tokio::test]
    async fn upsert_entity_update_existing() {
        let gs = setup().await;
        let id1 = gs
            .upsert_entity("Alice", "Alice", EntityType::Person, None)
            .await
            .unwrap();
        // Upserting the same (canonical_name, entity_type) key again must be idempotent:
        // it returns the original row id and applies the new non-NULL summary.
        let id2 = gs
            .upsert_entity("Alice", "Alice", EntityType::Person, Some("updated"))
            .await
            .unwrap();
        assert_eq!(id1, id2);
        let entity = gs
            .find_entity("Alice", EntityType::Person)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(entity.summary.as_deref(), Some("updated"));
    }
1428
1429    #[tokio::test]
1430    async fn find_entity_found() {
1431        let gs = setup().await;
1432        gs.upsert_entity("Bob", "Bob", EntityType::Tool, Some("a tool"))
1433            .await
1434            .unwrap();
1435        let entity = gs
1436            .find_entity("Bob", EntityType::Tool)
1437            .await
1438            .unwrap()
1439            .unwrap();
1440        assert_eq!(entity.name, "Bob");
1441        assert_eq!(entity.entity_type, EntityType::Tool);
1442    }
1443
1444    #[tokio::test]
1445    async fn find_entity_not_found() {
1446        let gs = setup().await;
1447        let result = gs.find_entity("Nobody", EntityType::Person).await.unwrap();
1448        assert!(result.is_none());
1449    }
1450
1451    #[tokio::test]
1452    async fn find_entities_fuzzy_partial_match() {
1453        let gs = setup().await;
1454        gs.upsert_entity("GraphQL", "GraphQL", EntityType::Concept, None)
1455            .await
1456            .unwrap();
1457        gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
1458            .await
1459            .unwrap();
1460        gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
1461            .await
1462            .unwrap();
1463
1464        let results = gs.find_entities_fuzzy("graph", 10).await.unwrap();
1465        assert_eq!(results.len(), 2);
1466        assert!(results.iter().any(|e| e.name == "GraphQL"));
1467        assert!(results.iter().any(|e| e.name == "Graph"));
1468    }
1469
1470    #[tokio::test]
1471    async fn entity_count_empty() {
1472        let gs = setup().await;
1473        assert_eq!(gs.entity_count().await.unwrap(), 0);
1474    }
1475
1476    #[tokio::test]
1477    async fn entity_count_non_empty() {
1478        let gs = setup().await;
1479        gs.upsert_entity("A", "A", EntityType::Concept, None)
1480            .await
1481            .unwrap();
1482        gs.upsert_entity("B", "B", EntityType::Concept, None)
1483            .await
1484            .unwrap();
1485        assert_eq!(gs.entity_count().await.unwrap(), 2);
1486    }
1487
1488    #[tokio::test]
1489    async fn all_entities_and_stream() {
1490        use futures::StreamExt as _;
1491
1492        let gs = setup().await;
1493        gs.upsert_entity("X", "X", EntityType::Project, None)
1494            .await
1495            .unwrap();
1496        gs.upsert_entity("Y", "Y", EntityType::Language, None)
1497            .await
1498            .unwrap();
1499
1500        let all = gs.all_entities().await.unwrap();
1501        assert_eq!(all.len(), 2);
1502        let streamed: Vec<Result<Entity, _>> = gs.all_entities_stream().collect().await;
1503        assert_eq!(streamed.len(), 2);
1504        assert!(streamed.iter().all(Result::is_ok));
1505    }
1506
1507    #[tokio::test]
1508    async fn insert_edge_without_episode() {
1509        let gs = setup().await;
1510        let src = gs
1511            .upsert_entity("Src", "Src", EntityType::Concept, None)
1512            .await
1513            .unwrap();
1514        let tgt = gs
1515            .upsert_entity("Tgt", "Tgt", EntityType::Concept, None)
1516            .await
1517            .unwrap();
1518        let eid = gs
1519            .insert_edge(src, tgt, "relates_to", "Src relates to Tgt", 0.9, None)
1520            .await
1521            .unwrap();
1522        assert!(eid > 0);
1523    }
1524
1525    #[tokio::test]
1526    async fn insert_edge_deduplicates_active_edge() {
1527        let gs = setup().await;
1528        let src = gs
1529            .upsert_entity("Alice", "Alice", EntityType::Person, None)
1530            .await
1531            .unwrap();
1532        let tgt = gs
1533            .upsert_entity("Google", "Google", EntityType::Organization, None)
1534            .await
1535            .unwrap();
1536
1537        let id1 = gs
1538            .insert_edge(src, tgt, "works_at", "Alice works at Google", 0.7, None)
1539            .await
1540            .unwrap();
1541
1542        // Re-inserting the same (source, target, relation) must return the same id.
1543        let id2 = gs
1544            .insert_edge(src, tgt, "works_at", "Alice works at Google", 0.9, None)
1545            .await
1546            .unwrap();
1547        assert_eq!(id1, id2, "duplicate active edge must not be created");
1548
1549        // Confidence should be updated to the higher value.
1550        let count: i64 =
1551            sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
1552                .fetch_one(&gs.pool)
1553                .await
1554                .unwrap();
1555        assert_eq!(count, 1, "only one active edge must exist");
1556
1557        let conf: f64 = sqlx::query_scalar("SELECT confidence FROM graph_edges WHERE id = ?1")
1558            .bind(id1)
1559            .fetch_one(&gs.pool)
1560            .await
1561            .unwrap();
1562        // Use 1e-6 tolerance: 0.9_f32 → f64 conversion is ~0.8999999761581421.
1563        assert!(
1564            (conf - f64::from(0.9_f32)).abs() < 1e-6,
1565            "confidence must be updated to max, got {conf}"
1566        );
1567    }
1568
1569    #[tokio::test]
1570    async fn insert_edge_different_relations_are_distinct() {
1571        let gs = setup().await;
1572        let src = gs
1573            .upsert_entity("Bob", "Bob", EntityType::Person, None)
1574            .await
1575            .unwrap();
1576        let tgt = gs
1577            .upsert_entity("Acme", "Acme", EntityType::Organization, None)
1578            .await
1579            .unwrap();
1580
1581        let id1 = gs
1582            .insert_edge(src, tgt, "founded", "Bob founded Acme", 0.8, None)
1583            .await
1584            .unwrap();
1585        let id2 = gs
1586            .insert_edge(src, tgt, "chairs", "Bob chairs Acme", 0.8, None)
1587            .await
1588            .unwrap();
1589        assert_ne!(id1, id2, "different relations must produce distinct edges");
1590
1591        let count: i64 =
1592            sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
1593                .fetch_one(&gs.pool)
1594                .await
1595                .unwrap();
1596        assert_eq!(count, 2);
1597    }
1598
1599    #[tokio::test]
1600    async fn insert_edge_with_episode() {
1601        let gs = setup().await;
1602        let src = gs
1603            .upsert_entity("Src2", "Src2", EntityType::Concept, None)
1604            .await
1605            .unwrap();
1606        let tgt = gs
1607            .upsert_entity("Tgt2", "Tgt2", EntityType::Concept, None)
1608            .await
1609            .unwrap();
1610        // Verifies that passing an episode_id does not cause a panic or unexpected error on the
1611        // insertion path itself. The episode_id references the messages table; whether the FK
1612        // constraint fires depends on the SQLite FK enforcement mode at runtime. Both success
1613        // (FK off) and FK-violation error are acceptable outcomes for this test — we only assert
1614        // that insert_edge does not panic or return an unexpected error type.
1615        let episode = MessageId(999);
1616        let result = gs
1617            .insert_edge(src, tgt, "uses", "Src2 uses Tgt2", 1.0, Some(episode))
1618            .await;
1619        match &result {
1620            Ok(eid) => assert!(*eid > 0, "inserted edge should have positive id"),
1621            Err(MemoryError::Sqlite(_)) => {} // FK constraint failed — acceptable
1622            Err(e) => panic!("unexpected error: {e}"),
1623        }
1624    }
1625
1626    #[tokio::test]
1627    async fn invalidate_edge_sets_timestamps() {
1628        let gs = setup().await;
1629        let src = gs
1630            .upsert_entity("E1", "E1", EntityType::Concept, None)
1631            .await
1632            .unwrap();
1633        let tgt = gs
1634            .upsert_entity("E2", "E2", EntityType::Concept, None)
1635            .await
1636            .unwrap();
1637        let eid = gs
1638            .insert_edge(src, tgt, "r", "fact", 1.0, None)
1639            .await
1640            .unwrap();
1641        gs.invalidate_edge(eid).await.unwrap();
1642
1643        let row: (Option<String>, Option<String>) =
1644            sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
1645                .bind(eid)
1646                .fetch_one(&gs.pool)
1647                .await
1648                .unwrap();
1649        assert!(row.0.is_some(), "valid_to should be set");
1650        assert!(row.1.is_some(), "expired_at should be set");
1651    }
1652
1653    #[tokio::test]
1654    async fn edges_for_entity_both_directions() {
1655        let gs = setup().await;
1656        let a = gs
1657            .upsert_entity("A", "A", EntityType::Concept, None)
1658            .await
1659            .unwrap();
1660        let b = gs
1661            .upsert_entity("B", "B", EntityType::Concept, None)
1662            .await
1663            .unwrap();
1664        let c = gs
1665            .upsert_entity("C", "C", EntityType::Concept, None)
1666            .await
1667            .unwrap();
1668        gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1669        gs.insert_edge(c, a, "r", "f2", 1.0, None).await.unwrap();
1670
1671        let edges = gs.edges_for_entity(a).await.unwrap();
1672        assert_eq!(edges.len(), 2);
1673    }
1674
1675    #[tokio::test]
1676    async fn edges_between_both_directions() {
1677        let gs = setup().await;
1678        let a = gs
1679            .upsert_entity("PA", "PA", EntityType::Person, None)
1680            .await
1681            .unwrap();
1682        let b = gs
1683            .upsert_entity("PB", "PB", EntityType::Person, None)
1684            .await
1685            .unwrap();
1686        gs.insert_edge(a, b, "knows", "PA knows PB", 1.0, None)
1687            .await
1688            .unwrap();
1689
1690        let fwd = gs.edges_between(a, b).await.unwrap();
1691        assert_eq!(fwd.len(), 1);
1692        let rev = gs.edges_between(b, a).await.unwrap();
1693        assert_eq!(rev.len(), 1);
1694    }
1695
1696    #[tokio::test]
1697    async fn active_edge_count_excludes_invalidated() {
1698        let gs = setup().await;
1699        let a = gs
1700            .upsert_entity("N1", "N1", EntityType::Concept, None)
1701            .await
1702            .unwrap();
1703        let b = gs
1704            .upsert_entity("N2", "N2", EntityType::Concept, None)
1705            .await
1706            .unwrap();
1707        let e1 = gs.insert_edge(a, b, "r1", "f1", 1.0, None).await.unwrap();
1708        gs.insert_edge(a, b, "r2", "f2", 1.0, None).await.unwrap();
1709        gs.invalidate_edge(e1).await.unwrap();
1710
1711        assert_eq!(gs.active_edge_count().await.unwrap(), 1);
1712    }
1713
1714    #[tokio::test]
1715    async fn upsert_community_insert_and_update() {
1716        let gs = setup().await;
1717        let id1 = gs
1718            .upsert_community("clusterA", "summary A", &[1, 2, 3], None)
1719            .await
1720            .unwrap();
1721        assert!(id1 > 0);
1722        let id2 = gs
1723            .upsert_community("clusterA", "summary A updated", &[1, 2, 3, 4], None)
1724            .await
1725            .unwrap();
1726        assert_eq!(id1, id2);
1727        let communities = gs.all_communities().await.unwrap();
1728        assert_eq!(communities.len(), 1);
1729        assert_eq!(communities[0].summary, "summary A updated");
1730        assert_eq!(communities[0].entity_ids, vec![1, 2, 3, 4]);
1731    }
1732
1733    #[tokio::test]
1734    async fn community_for_entity_found() {
1735        let gs = setup().await;
1736        let a = gs
1737            .upsert_entity("CA", "CA", EntityType::Concept, None)
1738            .await
1739            .unwrap();
1740        let b = gs
1741            .upsert_entity("CB", "CB", EntityType::Concept, None)
1742            .await
1743            .unwrap();
1744        gs.upsert_community("cA", "summary", &[a, b], None)
1745            .await
1746            .unwrap();
1747        let result = gs.community_for_entity(a).await.unwrap();
1748        assert!(result.is_some());
1749        assert_eq!(result.unwrap().name, "cA");
1750    }
1751
1752    #[tokio::test]
1753    async fn community_for_entity_not_found() {
1754        let gs = setup().await;
1755        let result = gs.community_for_entity(999).await.unwrap();
1756        assert!(result.is_none());
1757    }
1758
1759    #[tokio::test]
1760    async fn community_count() {
1761        let gs = setup().await;
1762        assert_eq!(gs.community_count().await.unwrap(), 0);
1763        gs.upsert_community("c1", "s1", &[], None).await.unwrap();
1764        gs.upsert_community("c2", "s2", &[], None).await.unwrap();
1765        assert_eq!(gs.community_count().await.unwrap(), 2);
1766    }
1767
1768    #[tokio::test]
1769    async fn metadata_get_set_round_trip() {
1770        let gs = setup().await;
1771        assert_eq!(gs.get_metadata("counter").await.unwrap(), None);
1772        gs.set_metadata("counter", "42").await.unwrap();
1773        assert_eq!(gs.get_metadata("counter").await.unwrap(), Some("42".into()));
1774        gs.set_metadata("counter", "43").await.unwrap();
1775        assert_eq!(gs.get_metadata("counter").await.unwrap(), Some("43".into()));
1776    }
1777
1778    #[tokio::test]
1779    async fn bfs_max_hops_0_returns_only_start() {
1780        let gs = setup().await;
1781        let a = gs
1782            .upsert_entity("BfsA", "BfsA", EntityType::Concept, None)
1783            .await
1784            .unwrap();
1785        let b = gs
1786            .upsert_entity("BfsB", "BfsB", EntityType::Concept, None)
1787            .await
1788            .unwrap();
1789        gs.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1790
1791        let (entities, edges) = gs.bfs(a, 0).await.unwrap();
1792        assert_eq!(entities.len(), 1);
1793        assert_eq!(entities[0].id, a);
1794        assert!(edges.is_empty());
1795    }
1796
1797    #[tokio::test]
1798    async fn bfs_max_hops_2_chain() {
1799        let gs = setup().await;
1800        let a = gs
1801            .upsert_entity("ChainA", "ChainA", EntityType::Concept, None)
1802            .await
1803            .unwrap();
1804        let b = gs
1805            .upsert_entity("ChainB", "ChainB", EntityType::Concept, None)
1806            .await
1807            .unwrap();
1808        let c = gs
1809            .upsert_entity("ChainC", "ChainC", EntityType::Concept, None)
1810            .await
1811            .unwrap();
1812        gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1813        gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1814
1815        let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1816        let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1817        assert!(ids.contains(&a));
1818        assert!(ids.contains(&b));
1819        assert!(ids.contains(&c));
1820        assert_eq!(edges.len(), 2);
1821    }
1822
1823    #[tokio::test]
1824    async fn bfs_cycle_no_infinite_loop() {
1825        let gs = setup().await;
1826        let a = gs
1827            .upsert_entity("CycA", "CycA", EntityType::Concept, None)
1828            .await
1829            .unwrap();
1830        let b = gs
1831            .upsert_entity("CycB", "CycB", EntityType::Concept, None)
1832            .await
1833            .unwrap();
1834        gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1835        gs.insert_edge(b, a, "r", "f2", 1.0, None).await.unwrap();
1836
1837        let (entities, _edges) = gs.bfs(a, 3).await.unwrap();
1838        let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1839        // Should have exactly A and B, no infinite loop
1840        assert!(ids.contains(&a));
1841        assert!(ids.contains(&b));
1842        assert_eq!(ids.len(), 2);
1843    }
1844
1845    #[tokio::test]
1846    async fn test_invalidated_edges_excluded_from_bfs() {
1847        let gs = setup().await;
1848        let a = gs
1849            .upsert_entity("InvA", "InvA", EntityType::Concept, None)
1850            .await
1851            .unwrap();
1852        let b = gs
1853            .upsert_entity("InvB", "InvB", EntityType::Concept, None)
1854            .await
1855            .unwrap();
1856        let c = gs
1857            .upsert_entity("InvC", "InvC", EntityType::Concept, None)
1858            .await
1859            .unwrap();
1860        let ab = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1861        gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1862        // Invalidate A->B: BFS from A should not reach B or C.
1863        gs.invalidate_edge(ab).await.unwrap();
1864
1865        let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1866        let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1867        assert_eq!(ids, vec![a], "only start entity should be reachable");
1868        assert!(edges.is_empty(), "no active edges should be returned");
1869    }
1870
1871    #[tokio::test]
1872    async fn test_bfs_empty_graph() {
1873        let gs = setup().await;
1874        let a = gs
1875            .upsert_entity("IsoA", "IsoA", EntityType::Concept, None)
1876            .await
1877            .unwrap();
1878
1879        let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1880        let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1881        assert_eq!(ids, vec![a], "isolated node: only start entity returned");
1882        assert!(edges.is_empty(), "no edges for isolated node");
1883    }
1884
1885    #[tokio::test]
1886    async fn test_bfs_diamond() {
1887        let gs = setup().await;
1888        let a = gs
1889            .upsert_entity("DiamA", "DiamA", EntityType::Concept, None)
1890            .await
1891            .unwrap();
1892        let b = gs
1893            .upsert_entity("DiamB", "DiamB", EntityType::Concept, None)
1894            .await
1895            .unwrap();
1896        let c = gs
1897            .upsert_entity("DiamC", "DiamC", EntityType::Concept, None)
1898            .await
1899            .unwrap();
1900        let d = gs
1901            .upsert_entity("DiamD", "DiamD", EntityType::Concept, None)
1902            .await
1903            .unwrap();
1904        gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1905        gs.insert_edge(a, c, "r", "f2", 1.0, None).await.unwrap();
1906        gs.insert_edge(b, d, "r", "f3", 1.0, None).await.unwrap();
1907        gs.insert_edge(c, d, "r", "f4", 1.0, None).await.unwrap();
1908
1909        let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1910        let mut ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1911        ids.sort_unstable();
1912        let mut expected = vec![a, b, c, d];
1913        expected.sort_unstable();
1914        assert_eq!(ids, expected, "all 4 nodes reachable, no duplicates");
1915        assert_eq!(edges.len(), 4, "all 4 edges returned");
1916    }
1917
1918    #[tokio::test]
1919    async fn extraction_count_default_zero() {
1920        let gs = setup().await;
1921        assert_eq!(gs.extraction_count().await.unwrap(), 0);
1922    }
1923
1924    #[tokio::test]
1925    async fn extraction_count_after_set() {
1926        let gs = setup().await;
1927        gs.set_metadata("extraction_count", "7").await.unwrap();
1928        assert_eq!(gs.extraction_count().await.unwrap(), 7);
1929    }
1930
1931    #[tokio::test]
1932    async fn all_active_edges_stream_excludes_invalidated() {
1933        use futures::TryStreamExt as _;
1934        let gs = setup().await;
1935        let a = gs
1936            .upsert_entity("SA", "SA", EntityType::Concept, None)
1937            .await
1938            .unwrap();
1939        let b = gs
1940            .upsert_entity("SB", "SB", EntityType::Concept, None)
1941            .await
1942            .unwrap();
1943        let c = gs
1944            .upsert_entity("SC", "SC", EntityType::Concept, None)
1945            .await
1946            .unwrap();
1947        let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1948        gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1949        gs.invalidate_edge(e1).await.unwrap();
1950
1951        let edges: Vec<_> = gs.all_active_edges_stream().try_collect().await.unwrap();
1952        assert_eq!(edges.len(), 1, "only the active edge should be returned");
1953        assert_eq!(edges[0].source_entity_id, b);
1954        assert_eq!(edges[0].target_entity_id, c);
1955    }
1956
1957    #[tokio::test]
1958    async fn find_community_by_id_found_and_not_found() {
1959        let gs = setup().await;
1960        let cid = gs
1961            .upsert_community("grp", "summary", &[1, 2], None)
1962            .await
1963            .unwrap();
1964        let found = gs.find_community_by_id(cid).await.unwrap();
1965        assert!(found.is_some());
1966        assert_eq!(found.unwrap().name, "grp");
1967
1968        let missing = gs.find_community_by_id(9999).await.unwrap();
1969        assert!(missing.is_none());
1970    }
1971
1972    #[tokio::test]
1973    async fn delete_all_communities_clears_table() {
1974        let gs = setup().await;
1975        gs.upsert_community("c1", "s1", &[1], None).await.unwrap();
1976        gs.upsert_community("c2", "s2", &[2], None).await.unwrap();
1977        assert_eq!(gs.community_count().await.unwrap(), 2);
1978        gs.delete_all_communities().await.unwrap();
1979        assert_eq!(gs.community_count().await.unwrap(), 0);
1980    }
1981
1982    #[tokio::test]
1983    async fn test_find_entities_fuzzy_no_results() {
1984        let gs = setup().await;
1985        gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
1986            .await
1987            .unwrap();
1988        let results = gs.find_entities_fuzzy("zzzznonexistent", 10).await.unwrap();
1989        assert!(
1990            results.is_empty(),
1991            "no entities should match an unknown term"
1992        );
1993    }
1994
1995    // ── Canonicalization / alias tests ────────────────────────────────────────
1996
1997    #[tokio::test]
1998    async fn upsert_entity_stores_canonical_name() {
1999        let gs = setup().await;
2000        gs.upsert_entity("rust", "rust", EntityType::Language, None)
2001            .await
2002            .unwrap();
2003        let entity = gs
2004            .find_entity("rust", EntityType::Language)
2005            .await
2006            .unwrap()
2007            .unwrap();
2008        assert_eq!(entity.canonical_name, "rust");
2009        assert_eq!(entity.name, "rust");
2010    }
2011
2012    #[tokio::test]
2013    async fn add_alias_idempotent() {
2014        let gs = setup().await;
2015        let id = gs
2016            .upsert_entity("rust", "rust", EntityType::Language, None)
2017            .await
2018            .unwrap();
2019        gs.add_alias(id, "rust-lang").await.unwrap();
2020        // Second insert should succeed silently (INSERT OR IGNORE)
2021        gs.add_alias(id, "rust-lang").await.unwrap();
2022        let aliases = gs.aliases_for_entity(id).await.unwrap();
2023        assert_eq!(
2024            aliases
2025                .iter()
2026                .filter(|a| a.alias_name == "rust-lang")
2027                .count(),
2028            1
2029        );
2030    }
2031
2032    // ── FTS5 fuzzy search tests ──────────────────────────────────────────────
2033
2034    #[tokio::test]
2035    async fn find_entity_by_id_found() {
2036        let gs = setup().await;
2037        let id = gs
2038            .upsert_entity("FindById", "finbyid", EntityType::Concept, Some("summary"))
2039            .await
2040            .unwrap();
2041        let entity = gs.find_entity_by_id(id).await.unwrap();
2042        assert!(entity.is_some());
2043        let entity = entity.unwrap();
2044        assert_eq!(entity.id, id);
2045        assert_eq!(entity.name, "FindById");
2046    }
2047
2048    #[tokio::test]
2049    async fn find_entity_by_id_not_found() {
2050        let gs = setup().await;
2051        let result = gs.find_entity_by_id(99999).await.unwrap();
2052        assert!(result.is_none());
2053    }
2054
2055    #[tokio::test]
2056    async fn set_entity_qdrant_point_id_updates() {
2057        let gs = setup().await;
2058        let id = gs
2059            .upsert_entity("QdrantPoint", "qdrantpoint", EntityType::Concept, None)
2060            .await
2061            .unwrap();
2062        let point_id = "550e8400-e29b-41d4-a716-446655440000";
2063        gs.set_entity_qdrant_point_id(id, point_id).await.unwrap();
2064
2065        let entity = gs.find_entity_by_id(id).await.unwrap().unwrap();
2066        assert_eq!(entity.qdrant_point_id.as_deref(), Some(point_id));
2067    }
2068
2069    #[tokio::test]
2070    async fn find_entities_fuzzy_matches_summary() {
2071        let gs = setup().await;
2072        gs.upsert_entity(
2073            "Rust",
2074            "Rust",
2075            EntityType::Language,
2076            Some("a systems programming language"),
2077        )
2078        .await
2079        .unwrap();
2080        gs.upsert_entity(
2081            "Go",
2082            "Go",
2083            EntityType::Language,
2084            Some("a compiled language by Google"),
2085        )
2086        .await
2087        .unwrap();
2088        // Search by summary word — should find "Rust" by "systems" in summary.
2089        let results = gs.find_entities_fuzzy("systems", 10).await.unwrap();
2090        assert_eq!(results.len(), 1);
2091        assert_eq!(results[0].name, "Rust");
2092    }
2093
2094    #[tokio::test]
2095    async fn find_entities_fuzzy_empty_query() {
2096        let gs = setup().await;
2097        gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
2098            .await
2099            .unwrap();
2100        // Empty query returns empty vec without hitting the database.
2101        let results = gs.find_entities_fuzzy("", 10).await.unwrap();
2102        assert!(results.is_empty(), "empty query should return no results");
2103        // Whitespace-only query also returns empty.
2104        let results = gs.find_entities_fuzzy("   ", 10).await.unwrap();
2105        assert!(
2106            results.is_empty(),
2107            "whitespace query should return no results"
2108        );
2109    }
2110
2111    #[tokio::test]
2112    async fn find_entity_by_alias_case_insensitive() {
2113        let gs = setup().await;
2114        let id = gs
2115            .upsert_entity("rust", "rust", EntityType::Language, None)
2116            .await
2117            .unwrap();
2118        gs.add_alias(id, "rust").await.unwrap();
2119        gs.add_alias(id, "rust-lang").await.unwrap();
2120
2121        let found = gs
2122            .find_entity_by_alias("RUST-LANG", EntityType::Language)
2123            .await
2124            .unwrap();
2125        assert!(found.is_some());
2126        assert_eq!(found.unwrap().id, id);
2127    }
2128
2129    #[tokio::test]
2130    async fn find_entity_by_alias_returns_none_for_unknown() {
2131        let gs = setup().await;
2132        let id = gs
2133            .upsert_entity("rust", "rust", EntityType::Language, None)
2134            .await
2135            .unwrap();
2136        gs.add_alias(id, "rust").await.unwrap();
2137
2138        let found = gs
2139            .find_entity_by_alias("python", EntityType::Language)
2140            .await
2141            .unwrap();
2142        assert!(found.is_none());
2143    }
2144
2145    #[tokio::test]
2146    async fn find_entity_by_alias_filters_by_entity_type() {
2147        // "python" alias for Language should NOT match when looking for Tool type
2148        let gs = setup().await;
2149        let lang_id = gs
2150            .upsert_entity("python", "python", EntityType::Language, None)
2151            .await
2152            .unwrap();
2153        gs.add_alias(lang_id, "python").await.unwrap();
2154
2155        let found_tool = gs
2156            .find_entity_by_alias("python", EntityType::Tool)
2157            .await
2158            .unwrap();
2159        assert!(
2160            found_tool.is_none(),
2161            "cross-type alias collision must not occur"
2162        );
2163
2164        let found_lang = gs
2165            .find_entity_by_alias("python", EntityType::Language)
2166            .await
2167            .unwrap();
2168        assert!(found_lang.is_some());
2169        assert_eq!(found_lang.unwrap().id, lang_id);
2170    }
2171
2172    #[tokio::test]
2173    async fn aliases_for_entity_returns_all() {
2174        let gs = setup().await;
2175        let id = gs
2176            .upsert_entity("rust", "rust", EntityType::Language, None)
2177            .await
2178            .unwrap();
2179        gs.add_alias(id, "rust").await.unwrap();
2180        gs.add_alias(id, "rust-lang").await.unwrap();
2181        gs.add_alias(id, "rustlang").await.unwrap();
2182
2183        let aliases = gs.aliases_for_entity(id).await.unwrap();
2184        assert_eq!(aliases.len(), 3);
2185        let names: Vec<&str> = aliases.iter().map(|a| a.alias_name.as_str()).collect();
2186        assert!(names.contains(&"rust"));
2187        assert!(names.contains(&"rust-lang"));
2188        assert!(names.contains(&"rustlang"));
2189    }
2190
2191    #[tokio::test]
2192    async fn find_entities_fuzzy_includes_aliases() {
2193        let gs = setup().await;
2194        let id = gs
2195            .upsert_entity("rust", "rust", EntityType::Language, None)
2196            .await
2197            .unwrap();
2198        gs.add_alias(id, "rust-lang").await.unwrap();
2199        gs.upsert_entity("python", "python", EntityType::Language, None)
2200            .await
2201            .unwrap();
2202
2203        // "rust-lang" is an alias, not the entity name — fuzzy search should still find it
2204        let results = gs.find_entities_fuzzy("rust-lang", 10).await.unwrap();
2205        assert!(!results.is_empty());
2206        assert!(results.iter().any(|e| e.id == id));
2207    }
2208
2209    #[tokio::test]
2210    async fn orphan_alias_cleanup_on_entity_delete() {
2211        let gs = setup().await;
2212        let id = gs
2213            .upsert_entity("rust", "rust", EntityType::Language, None)
2214            .await
2215            .unwrap();
2216        gs.add_alias(id, "rust").await.unwrap();
2217        gs.add_alias(id, "rust-lang").await.unwrap();
2218
2219        // Delete the entity directly (bypassing FK for test purposes)
2220        sqlx::query("DELETE FROM graph_entities WHERE id = ?1")
2221            .bind(id)
2222            .execute(&gs.pool)
2223            .await
2224            .unwrap();
2225
2226        // ON DELETE CASCADE should have removed aliases
2227        let aliases = gs.aliases_for_entity(id).await.unwrap();
2228        assert!(
2229            aliases.is_empty(),
2230            "aliases should cascade-delete with entity"
2231        );
2232    }
2233
2234    /// Validates migration 024 backfill on a pre-canonicalization database state.
2235    ///
2236    /// Simulates a database at migration 021 state (no `canonical_name`, no aliases), inserts
2237    /// entities and edges, then applies the migration 024 SQL directly via a single acquired
2238    /// connection (required so that PRAGMA `foreign_keys` = OFF takes effect on the same
2239    /// connection that executes DROP TABLE). Verifies:
2240    /// - `canonical_name` is backfilled from name for all existing entities
2241    /// - initial aliases are seeded from entity names
2242    /// - `graph_edges` survive (FK cascade did not wipe them)
2243    #[tokio::test]
2244    async fn migration_024_backfill_preserves_entities_and_edges() {
2245        use sqlx::Acquire as _;
2246        use sqlx::ConnectOptions as _;
2247        use sqlx::sqlite::SqliteConnectOptions;
2248
2249        // Open an in-memory SQLite database with FK enforcement enabled (matches production).
2250        // Pool size = 1 ensures all queries share the same underlying connection.
2251        let opts = SqliteConnectOptions::from_url(&"sqlite::memory:".parse().unwrap())
2252            .unwrap()
2253            .foreign_keys(true);
2254        let pool = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new()
2255            .max_connections(1)
2256            .connect_with(opts)
2257            .await
2258            .unwrap();
2259
2260        // Create pre-023 schema (migration 021 state): no canonical_name column.
2261        sqlx::query(
2262            "CREATE TABLE graph_entities (
2263                id INTEGER PRIMARY KEY AUTOINCREMENT,
2264                name TEXT NOT NULL,
2265                entity_type TEXT NOT NULL,
2266                summary TEXT,
2267                first_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
2268                last_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
2269                qdrant_point_id TEXT,
2270                UNIQUE(name, entity_type)
2271             )",
2272        )
2273        .execute(&pool)
2274        .await
2275        .unwrap();
2276
2277        sqlx::query(
2278            "CREATE TABLE graph_edges (
2279                id INTEGER PRIMARY KEY AUTOINCREMENT,
2280                source_entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
2281                target_entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
2282                relation TEXT NOT NULL,
2283                fact TEXT NOT NULL,
2284                confidence REAL NOT NULL DEFAULT 1.0,
2285                valid_from TEXT NOT NULL DEFAULT (datetime('now')),
2286                valid_to TEXT,
2287                created_at TEXT NOT NULL DEFAULT (datetime('now')),
2288                expired_at TEXT,
2289                episode_id INTEGER,
2290                qdrant_point_id TEXT
2291             )",
2292        )
2293        .execute(&pool)
2294        .await
2295        .unwrap();
2296
2297        // Create FTS5 table and triggers (migration 023 state).
2298        sqlx::query(
2299            "CREATE VIRTUAL TABLE IF NOT EXISTS graph_entities_fts USING fts5(
2300                name, summary, content='graph_entities', content_rowid='id',
2301                tokenize='unicode61 remove_diacritics 2'
2302             )",
2303        )
2304        .execute(&pool)
2305        .await
2306        .unwrap();
2307        sqlx::query(
2308            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_insert AFTER INSERT ON graph_entities
2309             BEGIN INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, '')); END",
2310        )
2311        .execute(&pool)
2312        .await
2313        .unwrap();
2314        sqlx::query(
2315            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_delete AFTER DELETE ON graph_entities
2316             BEGIN INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, '')); END",
2317        )
2318        .execute(&pool)
2319        .await
2320        .unwrap();
2321        sqlx::query(
2322            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_update AFTER UPDATE ON graph_entities
2323             BEGIN
2324                 INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, ''));
2325                 INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, ''));
2326             END",
2327        )
2328        .execute(&pool)
2329        .await
2330        .unwrap();
2331
2332        // Insert pre-existing entities and an edge.
2333        let alice_id: i64 = sqlx::query_scalar(
2334            "INSERT INTO graph_entities (name, entity_type) VALUES ('Alice', 'person') RETURNING id",
2335        )
2336        .fetch_one(&pool)
2337        .await
2338        .unwrap();
2339
2340        let rust_id: i64 = sqlx::query_scalar(
2341            "INSERT INTO graph_entities (name, entity_type) VALUES ('Rust', 'language') RETURNING id",
2342        )
2343        .fetch_one(&pool)
2344        .await
2345        .unwrap();
2346
2347        sqlx::query(
2348            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact)
2349             VALUES (?1, ?2, 'uses', 'Alice uses Rust')",
2350        )
2351        .bind(alice_id)
2352        .bind(rust_id)
2353        .execute(&pool)
2354        .await
2355        .unwrap();
2356
2357        // Apply migration 024 on a single pinned connection so PRAGMA foreign_keys = OFF
2358        // takes effect on the same connection that executes DROP TABLE (required because
2359        // PRAGMA foreign_keys is per-connection, not per-transaction).
2360        let mut conn = pool.acquire().await.unwrap();
2361        let conn = conn.acquire().await.unwrap();
2362
2363        sqlx::query("PRAGMA foreign_keys = OFF")
2364            .execute(&mut *conn)
2365            .await
2366            .unwrap();
2367        sqlx::query("ALTER TABLE graph_entities ADD COLUMN canonical_name TEXT")
2368            .execute(&mut *conn)
2369            .await
2370            .unwrap();
2371        sqlx::query("UPDATE graph_entities SET canonical_name = name WHERE canonical_name IS NULL")
2372            .execute(&mut *conn)
2373            .await
2374            .unwrap();
2375        sqlx::query(
2376            "CREATE TABLE graph_entities_new (
2377                id INTEGER PRIMARY KEY AUTOINCREMENT,
2378                name TEXT NOT NULL,
2379                canonical_name TEXT NOT NULL,
2380                entity_type TEXT NOT NULL,
2381                summary TEXT,
2382                first_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
2383                last_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
2384                qdrant_point_id TEXT,
2385                UNIQUE(canonical_name, entity_type)
2386             )",
2387        )
2388        .execute(&mut *conn)
2389        .await
2390        .unwrap();
2391        sqlx::query(
2392            "INSERT INTO graph_entities_new
2393                 (id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id)
2394             SELECT id, name, COALESCE(canonical_name, name), entity_type, summary,
2395                    first_seen_at, last_seen_at, qdrant_point_id
2396             FROM graph_entities",
2397        )
2398        .execute(&mut *conn)
2399        .await
2400        .unwrap();
2401        sqlx::query("DROP TABLE graph_entities")
2402            .execute(&mut *conn)
2403            .await
2404            .unwrap();
2405        sqlx::query("ALTER TABLE graph_entities_new RENAME TO graph_entities")
2406            .execute(&mut *conn)
2407            .await
2408            .unwrap();
2409        // Rebuild FTS5 triggers (dropped with the old table) and rebuild index.
2410        sqlx::query(
2411            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_insert AFTER INSERT ON graph_entities
2412             BEGIN INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, '')); END",
2413        )
2414        .execute(&mut *conn)
2415        .await
2416        .unwrap();
2417        sqlx::query(
2418            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_delete AFTER DELETE ON graph_entities
2419             BEGIN INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, '')); END",
2420        )
2421        .execute(&mut *conn)
2422        .await
2423        .unwrap();
2424        sqlx::query(
2425            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_update AFTER UPDATE ON graph_entities
2426             BEGIN
2427                 INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, ''));
2428                 INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, ''));
2429             END",
2430        )
2431        .execute(&mut *conn)
2432        .await
2433        .unwrap();
2434        sqlx::query("INSERT INTO graph_entities_fts(graph_entities_fts) VALUES('rebuild')")
2435            .execute(&mut *conn)
2436            .await
2437            .unwrap();
2438        sqlx::query(
2439            "CREATE TABLE graph_entity_aliases (
2440                id INTEGER PRIMARY KEY AUTOINCREMENT,
2441                entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
2442                alias_name TEXT NOT NULL,
2443                created_at TEXT NOT NULL DEFAULT (datetime('now')),
2444                UNIQUE(alias_name, entity_id)
2445             )",
2446        )
2447        .execute(&mut *conn)
2448        .await
2449        .unwrap();
2450        sqlx::query(
2451            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name)
2452             SELECT id, name FROM graph_entities",
2453        )
2454        .execute(&mut *conn)
2455        .await
2456        .unwrap();
2457        sqlx::query("PRAGMA foreign_keys = ON")
2458            .execute(&mut *conn)
2459            .await
2460            .unwrap();
2461
2462        // Verify: canonical_name backfilled from name
2463        let alice_canon: String =
2464            sqlx::query_scalar("SELECT canonical_name FROM graph_entities WHERE id = ?1")
2465                .bind(alice_id)
2466                .fetch_one(&mut *conn)
2467                .await
2468                .unwrap();
2469        assert_eq!(
2470            alice_canon, "Alice",
2471            "canonical_name should equal pre-migration name"
2472        );
2473
2474        let rust_canon: String =
2475            sqlx::query_scalar("SELECT canonical_name FROM graph_entities WHERE id = ?1")
2476                .bind(rust_id)
2477                .fetch_one(&mut *conn)
2478                .await
2479                .unwrap();
2480        assert_eq!(
2481            rust_canon, "Rust",
2482            "canonical_name should equal pre-migration name"
2483        );
2484
2485        // Verify: aliases seeded
2486        let alice_aliases: Vec<String> =
2487            sqlx::query_scalar("SELECT alias_name FROM graph_entity_aliases WHERE entity_id = ?1")
2488                .bind(alice_id)
2489                .fetch_all(&mut *conn)
2490                .await
2491                .unwrap();
2492        assert!(
2493            alice_aliases.contains(&"Alice".to_owned()),
2494            "initial alias should be seeded from entity name"
2495        );
2496
2497        // Verify: graph_edges survived (FK cascade did not wipe them)
2498        let edge_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges")
2499            .fetch_one(&mut *conn)
2500            .await
2501            .unwrap();
2502        assert_eq!(
2503            edge_count, 1,
2504            "graph_edges must survive migration 024 table recreation"
2505        );
2506    }
2507
2508    #[tokio::test]
2509    async fn find_entity_by_alias_same_alias_two_entities_deterministic() {
2510        // Two same-type entities share an alias — ORDER BY id ASC ensures first-registered wins.
2511        let gs = setup().await;
2512        let id1 = gs
2513            .upsert_entity("python-v2", "python-v2", EntityType::Language, None)
2514            .await
2515            .unwrap();
2516        let id2 = gs
2517            .upsert_entity("python-v3", "python-v3", EntityType::Language, None)
2518            .await
2519            .unwrap();
2520        gs.add_alias(id1, "python").await.unwrap();
2521        gs.add_alias(id2, "python").await.unwrap();
2522
2523        // Both entities now have alias "python" — should return the first-registered (id1)
2524        let found = gs
2525            .find_entity_by_alias("python", EntityType::Language)
2526            .await
2527            .unwrap();
2528        assert!(found.is_some(), "should find an entity by shared alias");
2529        // ORDER BY e.id ASC guarantees deterministic result: first inserted wins
2530        assert_eq!(
2531            found.unwrap().id,
2532            id1,
2533            "first-registered entity should win on shared alias"
2534        );
2535    }
2536
2537    // ── FTS5 search tests ────────────────────────────────────────────────────
2538
2539    #[tokio::test]
2540    async fn find_entities_fuzzy_special_chars() {
2541        let gs = setup().await;
2542        gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2543            .await
2544            .unwrap();
2545        // FTS5 special characters in query must not cause an error.
2546        let results = gs.find_entities_fuzzy("graph\"()*:^", 10).await.unwrap();
2547        // "graph" survives sanitization and matches.
2548        assert!(results.iter().any(|e| e.name == "Graph"));
2549    }
2550
2551    #[tokio::test]
2552    async fn find_entities_fuzzy_prefix_match() {
2553        let gs = setup().await;
2554        gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2555            .await
2556            .unwrap();
2557        gs.upsert_entity("GraphQL", "GraphQL", EntityType::Concept, None)
2558            .await
2559            .unwrap();
2560        gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
2561            .await
2562            .unwrap();
2563        // "Gra" prefix should match both "Graph" and "GraphQL" via FTS5 "gra*".
2564        let results = gs.find_entities_fuzzy("Gra", 10).await.unwrap();
2565        assert_eq!(results.len(), 2);
2566        assert!(results.iter().any(|e| e.name == "Graph"));
2567        assert!(results.iter().any(|e| e.name == "GraphQL"));
2568    }
2569
2570    #[tokio::test]
2571    async fn find_entities_fuzzy_fts5_operator_injection() {
2572        let gs = setup().await;
2573        gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2574            .await
2575            .unwrap();
2576        gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
2577            .await
2578            .unwrap();
2579        // "graph OR unrelated" — sanitizer splits on non-alphanumeric chars,
2580        // yielding tokens ["graph", "OR", "unrelated"]. The FTS5_OPERATORS filter
2581        // removes "OR", producing "graph* unrelated*" (implicit AND).
2582        // No entity contains both token prefixes, so the result is empty.
2583        let results = gs
2584            .find_entities_fuzzy("graph OR unrelated", 10)
2585            .await
2586            .unwrap();
2587        assert!(
2588            results.is_empty(),
2589            "implicit AND of 'graph*' and 'unrelated*' should match no entity"
2590        );
2591    }
2592
2593    #[tokio::test]
2594    async fn find_entities_fuzzy_after_entity_update() {
2595        let gs = setup().await;
2596        // Insert entity with initial summary.
2597        gs.upsert_entity(
2598            "Foo",
2599            "Foo",
2600            EntityType::Concept,
2601            Some("initial summary bar"),
2602        )
2603        .await
2604        .unwrap();
2605        // Update summary via upsert — triggers the FTS UPDATE trigger.
2606        gs.upsert_entity(
2607            "Foo",
2608            "Foo",
2609            EntityType::Concept,
2610            Some("updated summary baz"),
2611        )
2612        .await
2613        .unwrap();
2614        // Old summary term should not match.
2615        let old_results = gs.find_entities_fuzzy("bar", 10).await.unwrap();
2616        assert!(
2617            old_results.is_empty(),
2618            "old summary content should not match after update"
2619        );
2620        // New summary term should match.
2621        let new_results = gs.find_entities_fuzzy("baz", 10).await.unwrap();
2622        assert_eq!(new_results.len(), 1);
2623        assert_eq!(new_results[0].name, "Foo");
2624    }
2625
2626    #[tokio::test]
2627    async fn find_entities_fuzzy_only_special_chars() {
2628        let gs = setup().await;
2629        gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
2630            .await
2631            .unwrap();
2632        // Queries consisting solely of FTS5 special characters produce no alphanumeric
2633        // tokens after sanitization, so the function returns early with an empty vec
2634        // rather than passing an empty or malformed MATCH expression to FTS5.
2635        let results = gs.find_entities_fuzzy("***", 10).await.unwrap();
2636        assert!(
2637            results.is_empty(),
2638            "only special chars should return no results"
2639        );
2640        let results = gs.find_entities_fuzzy("(((", 10).await.unwrap();
2641        assert!(results.is_empty(), "only parens should return no results");
2642        let results = gs.find_entities_fuzzy("\"\"\"", 10).await.unwrap();
2643        assert!(results.is_empty(), "only quotes should return no results");
2644    }
2645
2646    // ── find_entity_by_name tests ─────────────────────────────────────────────
2647
2648    #[tokio::test]
2649    async fn find_entity_by_name_exact_wins_over_summary_mention() {
2650        // Regression test for: /graph facts Alice returns Google because Google's
2651        // summary mentions "Alice".
2652        let gs = setup().await;
2653        gs.upsert_entity(
2654            "Alice",
2655            "Alice",
2656            EntityType::Person,
2657            Some("A person named Alice"),
2658        )
2659        .await
2660        .unwrap();
2661        // Google's summary mentions "Alice" — without the fix, FTS5 could rank this first.
2662        gs.upsert_entity(
2663            "Google",
2664            "Google",
2665            EntityType::Organization,
2666            Some("Company where Charlie, Alice, and Bob have worked"),
2667        )
2668        .await
2669        .unwrap();
2670
2671        let results = gs.find_entity_by_name("Alice").await.unwrap();
2672        assert!(!results.is_empty(), "must find at least one entity");
2673        assert_eq!(
2674            results[0].name, "Alice",
2675            "exact name match must come first, not entity with 'Alice' in summary"
2676        );
2677    }
2678
2679    #[tokio::test]
2680    async fn find_entity_by_name_case_insensitive_exact() {
2681        let gs = setup().await;
2682        gs.upsert_entity("Bob", "Bob", EntityType::Person, None)
2683            .await
2684            .unwrap();
2685
2686        let results = gs.find_entity_by_name("bob").await.unwrap();
2687        assert!(!results.is_empty());
2688        assert_eq!(results[0].name, "Bob");
2689    }
2690
2691    #[tokio::test]
2692    async fn find_entity_by_name_falls_back_to_fuzzy_when_no_exact_match() {
2693        let gs = setup().await;
2694        gs.upsert_entity("Charlie", "Charlie", EntityType::Person, None)
2695            .await
2696            .unwrap();
2697
2698        // "Char" is not an exact match for "Charlie" → FTS5 prefix fallback should find it.
2699        let results = gs.find_entity_by_name("Char").await.unwrap();
2700        assert!(!results.is_empty(), "prefix search must find Charlie");
2701    }
2702
2703    #[tokio::test]
2704    async fn find_entity_by_name_returns_empty_for_unknown() {
2705        let gs = setup().await;
2706        let results = gs.find_entity_by_name("NonExistent").await.unwrap();
2707        assert!(results.is_empty());
2708    }
2709
2710    #[tokio::test]
2711    async fn find_entity_by_name_matches_canonical_name() {
2712        // Verify the exact-match phase checks canonical_name, not only name.
2713        let gs = setup().await;
2714        // upsert_entity sets canonical_name = second arg
2715        gs.upsert_entity("Dave (Engineer)", "Dave", EntityType::Person, None)
2716            .await
2717            .unwrap();
2718
2719        // Searching by canonical_name "Dave" must return the entity even though
2720        // the display name is "Dave (Engineer)".
2721        let results = gs.find_entity_by_name("Dave").await.unwrap();
2722        assert!(
2723            !results.is_empty(),
2724            "canonical_name match must return entity"
2725        );
2726        assert_eq!(results[0].canonical_name, "Dave");
2727    }
2728
2729    async fn insert_test_message(gs: &GraphStore, content: &str) -> crate::types::MessageId {
2730        // Insert a conversation first (FK constraint).
2731        let conv_id: i64 =
2732            sqlx::query_scalar("INSERT INTO conversations DEFAULT VALUES RETURNING id")
2733                .fetch_one(&gs.pool)
2734                .await
2735                .unwrap();
2736        let id: i64 = sqlx::query_scalar(
2737            "INSERT INTO messages (conversation_id, role, content) VALUES (?1, 'user', ?2) RETURNING id",
2738        )
2739        .bind(conv_id)
2740        .bind(content)
2741        .fetch_one(&gs.pool)
2742        .await
2743        .unwrap();
2744        crate::types::MessageId(id)
2745    }
2746
2747    #[tokio::test]
2748    async fn unprocessed_messages_for_backfill_returns_unprocessed() {
2749        let gs = setup().await;
2750        let id1 = insert_test_message(&gs, "hello world").await;
2751        let id2 = insert_test_message(&gs, "second message").await;
2752
2753        let rows = gs.unprocessed_messages_for_backfill(10).await.unwrap();
2754        assert_eq!(rows.len(), 2);
2755        assert!(rows.iter().any(|(id, _)| *id == id1));
2756        assert!(rows.iter().any(|(id, _)| *id == id2));
2757    }
2758
2759    #[tokio::test]
2760    async fn unprocessed_messages_for_backfill_respects_limit() {
2761        let gs = setup().await;
2762        insert_test_message(&gs, "msg1").await;
2763        insert_test_message(&gs, "msg2").await;
2764        insert_test_message(&gs, "msg3").await;
2765
2766        let rows = gs.unprocessed_messages_for_backfill(2).await.unwrap();
2767        assert_eq!(rows.len(), 2);
2768    }
2769
2770    #[tokio::test]
2771    async fn mark_messages_graph_processed_updates_flag() {
2772        let gs = setup().await;
2773        let id1 = insert_test_message(&gs, "to process").await;
2774        let _id2 = insert_test_message(&gs, "also to process").await;
2775
2776        // Before marking: both are unprocessed
2777        let count_before = gs.unprocessed_message_count().await.unwrap();
2778        assert_eq!(count_before, 2);
2779
2780        gs.mark_messages_graph_processed(&[id1]).await.unwrap();
2781
2782        let count_after = gs.unprocessed_message_count().await.unwrap();
2783        assert_eq!(count_after, 1);
2784
2785        // Remaining unprocessed should not contain id1
2786        let rows = gs.unprocessed_messages_for_backfill(10).await.unwrap();
2787        assert!(!rows.iter().any(|(id, _)| *id == id1));
2788    }
2789
2790    #[tokio::test]
2791    async fn mark_messages_graph_processed_empty_ids_is_noop() {
2792        let gs = setup().await;
2793        insert_test_message(&gs, "message").await;
2794
2795        gs.mark_messages_graph_processed(&[]).await.unwrap();
2796
2797        let count = gs.unprocessed_message_count().await.unwrap();
2798        assert_eq!(count, 1);
2799    }
2800
2801    #[tokio::test]
2802    async fn edges_after_id_first_page_returns_all_within_limit() {
2803        let gs = setup().await;
2804        let a = gs
2805            .upsert_entity("PA", "PA", EntityType::Concept, None)
2806            .await
2807            .unwrap();
2808        let b = gs
2809            .upsert_entity("PB", "PB", EntityType::Concept, None)
2810            .await
2811            .unwrap();
2812        let c = gs
2813            .upsert_entity("PC", "PC", EntityType::Concept, None)
2814            .await
2815            .unwrap();
2816        let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
2817        let e2 = gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
2818        let e3 = gs.insert_edge(a, c, "r", "f3", 1.0, None).await.unwrap();
2819
2820        // after_id=0 returns first page.
2821        let page1 = gs.edges_after_id(0, 2).await.unwrap();
2822        assert_eq!(page1.len(), 2);
2823        assert_eq!(page1[0].id, e1);
2824        assert_eq!(page1[1].id, e2);
2825
2826        // Continue from last id of page1.
2827        let page2 = gs
2828            .edges_after_id(page1.last().unwrap().id, 2)
2829            .await
2830            .unwrap();
2831        assert_eq!(page2.len(), 1);
2832        assert_eq!(page2[0].id, e3);
2833
2834        // Page after the last element returns empty.
2835        let page3 = gs
2836            .edges_after_id(page2.last().unwrap().id, 2)
2837            .await
2838            .unwrap();
2839        assert!(page3.is_empty(), "no more edges after last id");
2840    }
2841
2842    #[tokio::test]
2843    async fn edges_after_id_skips_invalidated_edges() {
2844        let gs = setup().await;
2845        let a = gs
2846            .upsert_entity("IA", "IA", EntityType::Concept, None)
2847            .await
2848            .unwrap();
2849        let b = gs
2850            .upsert_entity("IB", "IB", EntityType::Concept, None)
2851            .await
2852            .unwrap();
2853        let c = gs
2854            .upsert_entity("IC", "IC", EntityType::Concept, None)
2855            .await
2856            .unwrap();
2857        let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
2858        let e2 = gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
2859
2860        // Invalidate e1 — it must not appear in edges_after_id results.
2861        gs.invalidate_edge(e1).await.unwrap();
2862
2863        let page = gs.edges_after_id(0, 10).await.unwrap();
2864        assert_eq!(page.len(), 1, "invalidated edge must be excluded");
2865        assert_eq!(page[0].id, e2);
2866    }
2867
2868    // ── Temporal query tests ──────────────────────────────────────────────────
2869
2870    #[tokio::test]
2871    async fn edges_at_timestamp_returns_active_edge() {
2872        let gs = setup().await;
2873        let a = gs
2874            .upsert_entity("TA", "TA", EntityType::Person, None)
2875            .await
2876            .unwrap();
2877        let b = gs
2878            .upsert_entity("TB", "TB", EntityType::Person, None)
2879            .await
2880            .unwrap();
2881        gs.insert_edge(a, b, "knows", "TA knows TB", 1.0, None)
2882            .await
2883            .unwrap();
2884
2885        // Active edge (valid_to IS NULL) must be visible at any timestamp.
2886        let edges = gs
2887            .edges_at_timestamp(a, "2099-01-01 00:00:00")
2888            .await
2889            .unwrap();
2890        assert_eq!(edges.len(), 1, "active edge must be visible at future ts");
2891        assert_eq!(edges[0].relation, "knows");
2892    }
2893
    #[tokio::test]
    async fn edges_at_timestamp_excludes_future_valid_from() {
        // An edge whose validity window has not opened yet must be invisible
        // to temporal queries at an earlier timestamp.
        let gs = setup().await;
        let a = gs
            .upsert_entity("FA", "FA", EntityType::Person, None)
            .await
            .unwrap();
        let b = gs
            .upsert_entity("FB", "FB", EntityType::Person, None)
            .await
            .unwrap();
        // Insert edge with valid_from in the far future.
        // Raw SQL (instead of insert_edge) so valid_from can be set explicitly.
        sqlx::query(
            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
             VALUES (?1, ?2, 'rel', 'fact', 1.0, '2100-01-01 00:00:00')",
        )
        .bind(a)
        .bind(b)
        .execute(gs.pool())
        .await
        .unwrap();

        // Query at 2026 — future-valid_from edge must be excluded.
        let edges = gs
            .edges_at_timestamp(a, "2026-01-01 00:00:00")
            .await
            .unwrap();
        assert!(
            edges.is_empty(),
            "edge with future valid_from must not be visible at earlier timestamp"
        );
    }
2926
    #[tokio::test]
    async fn edges_at_timestamp_historical_window_visible() {
        // An expired edge is visible only for timestamps inside its
        // [valid_from, valid_to) window — before and after it must be hidden.
        let gs = setup().await;
        let a = gs
            .upsert_entity("HA", "HA", EntityType::Person, None)
            .await
            .unwrap();
        let b = gs
            .upsert_entity("HB", "HB", EntityType::Person, None)
            .await
            .unwrap();
        // Expired edge valid 2020-01-01 → 2021-01-01.
        // Raw SQL so valid_from/valid_to/expired_at can be set directly.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
             VALUES (?1, ?2, 'managed', 'HA managed HB', 0.8,
                     '2020-01-01 00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00')",
        )
        .bind(a)
        .bind(b)
        .execute(gs.pool())
        .await
        .unwrap();

        // During validity window → visible.
        let during = gs
            .edges_at_timestamp(a, "2020-06-01 00:00:00")
            .await
            .unwrap();
        assert_eq!(
            during.len(),
            1,
            "expired edge must be visible during its validity window"
        );

        // Before valid_from → not visible.
        let before = gs
            .edges_at_timestamp(a, "2019-01-01 00:00:00")
            .await
            .unwrap();
        assert!(
            before.is_empty(),
            "edge must not be visible before valid_from"
        );

        // After valid_to → not visible.
        let after = gs
            .edges_at_timestamp(a, "2026-01-01 00:00:00")
            .await
            .unwrap();
        assert!(
            after.is_empty(),
            "expired edge must not be visible after valid_to"
        );
    }
2982
2983    #[tokio::test]
2984    async fn edges_at_timestamp_entity_as_target() {
2985        let gs = setup().await;
2986        let src = gs
2987            .upsert_entity("SRC", "SRC", EntityType::Person, None)
2988            .await
2989            .unwrap();
2990        let tgt = gs
2991            .upsert_entity("TGT", "TGT", EntityType::Person, None)
2992            .await
2993            .unwrap();
2994        gs.insert_edge(src, tgt, "links", "SRC links TGT", 0.9, None)
2995            .await
2996            .unwrap();
2997
2998        // Query by target entity_id at a far-future timestamp — must find the active edge.
2999        let edges = gs
3000            .edges_at_timestamp(tgt, "2099-01-01 00:00:00")
3001            .await
3002            .unwrap();
3003        assert_eq!(
3004            edges.len(),
3005            1,
3006            "edge must be found when querying by target entity_id"
3007        );
3008    }
3009
    #[tokio::test]
    async fn bfs_at_timestamp_excludes_expired_edges() {
        // BFS traversal must only follow edges that are valid at the query
        // timestamp: an expired hop cuts off everything behind it.
        let gs = setup().await;
        let a = gs
            .upsert_entity("BA", "BA", EntityType::Person, None)
            .await
            .unwrap();
        let b = gs
            .upsert_entity("BB", "BB", EntityType::Person, None)
            .await
            .unwrap();
        let c = gs
            .upsert_entity("BC", "BC", EntityType::Concept, None)
            .await
            .unwrap();

        // A → B: active edge with explicit valid_from in 2019 so it predates all test timestamps.
        // Raw SQL so valid_from can be pinned.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
             VALUES (?1, ?2, 'knows', 'BA knows BB', 1.0, '2019-01-01 00:00:00')",
        )
        .bind(a)
        .bind(b)
        .execute(gs.pool())
        .await
        .unwrap();
        // B → C: expired edge valid 2020→2021.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
             VALUES (?1, ?2, 'used', 'BB used BC', 0.9,
                     '2020-01-01 00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00')",
        )
        .bind(b)
        .bind(c)
        .execute(gs.pool())
        .await
        .unwrap();

        // BFS at 2026: A→B active (valid since 2019); B→C expired → C not reachable at 2026.
        let (entities, _edges, depth_map) = gs
            .bfs_at_timestamp(a, 3, "2026-01-01 00:00:00")
            .await
            .unwrap();
        let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
        assert!(
            depth_map.contains_key(&a),
            "start entity must be in depth_map"
        );
        assert!(
            depth_map.contains_key(&b),
            "B should be reachable via active A→B edge"
        );
        assert!(
            !entity_ids.contains(&c),
            "C must not be reachable at 2026 because B→C expired in 2021"
        );

        // BFS at 2020-06-01: both A→B (active since 2019) and B→C (within window) are valid.
        let (_entities2, _edges2, depth_map2) = gs
            .bfs_at_timestamp(a, 3, "2020-06-01 00:00:00")
            .await
            .unwrap();
        assert!(
            depth_map2.contains_key(&c),
            "C should be reachable at 2020-06-01 when B→C was valid"
        );
    }
3079
    #[tokio::test]
    async fn edge_history_returns_all_versions_ordered() {
        // edge_history must return every version of a fact (expired and
        // active), newest first, and honor the optional relation filter.
        let gs = setup().await;
        let src = gs
            .upsert_entity("ESrc", "ESrc", EntityType::Person, None)
            .await
            .unwrap();
        let tgt = gs
            .upsert_entity("ETgt", "ETgt", EntityType::Organization, None)
            .await
            .unwrap();

        // Version 1: valid 2020→2022 (expired).
        // Raw SQL so the historical validity window can be set directly.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
             VALUES (?1, ?2, 'works_at', 'ESrc works at CompanyA', 0.9,
                     '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
        )
        .bind(src)
        .bind(tgt)
        .execute(gs.pool())
        .await
        .unwrap();
        // Version 2: active since 2022.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
             VALUES (?1, ?2, 'works_at', 'ESrc works at CompanyB', 0.95, '2022-01-01 00:00:00')",
        )
        .bind(src)
        .bind(tgt)
        .execute(gs.pool())
        .await
        .unwrap();

        // History without relation filter — both versions returned, newest first.
        // The predicate "works at" matches against the fact text.
        let history = gs.edge_history(src, "works at", None, 100).await.unwrap();
        assert_eq!(history.len(), 2, "both edge versions must be returned");
        // Ordered valid_from DESC — version 2 (2022) before version 1 (2020).
        assert!(
            history[0].valid_from >= history[1].valid_from,
            "results must be ordered by valid_from DESC"
        );

        // History with relation filter.
        let filtered = gs
            .edge_history(src, "works at", Some("works_at"), 100)
            .await
            .unwrap();
        assert_eq!(
            filtered.len(),
            2,
            "relation filter must retain both versions"
        );

        // History with non-matching predicate.
        let empty = gs
            .edge_history(src, "nonexistent_predicate_xyz", None, 100)
            .await
            .unwrap();
        assert!(empty.is_empty(), "non-matching predicate must return empty");
    }
3143
3144    #[tokio::test]
3145    async fn edge_history_like_escaping() {
3146        let gs = setup().await;
3147        let src = gs
3148            .upsert_entity("EscSrc", "EscSrc", EntityType::Person, None)
3149            .await
3150            .unwrap();
3151        let tgt = gs
3152            .upsert_entity("EscTgt", "EscTgt", EntityType::Concept, None)
3153            .await
3154            .unwrap();
3155
3156        // Insert an edge with a fact that contains neither '%' nor '_'.
3157        gs.insert_edge(src, tgt, "ref", "plain text fact no wildcards", 1.0, None)
3158            .await
3159            .unwrap();
3160
3161        // Searching with '%' as predicate must NOT match all edges (wildcard injection).
3162        // After LIKE escaping '%' becomes '\%', so only facts containing literal '%' match.
3163        let results = gs.edge_history(src, "%", None, 100).await.unwrap();
3164        assert!(
3165            results.is_empty(),
3166            "LIKE wildcard '%' in predicate must be escaped and not match all edges"
3167        );
3168
3169        // Searching with '_' must only match facts containing literal '_'.
3170        // Our fact has no '_', so result must be empty.
3171        let results_underscore = gs.edge_history(src, "_", None, 100).await.unwrap();
3172        assert!(
3173            results_underscore.is_empty(),
3174            "LIKE wildcard '_' in predicate must be escaped and not match single-char substrings"
3175        );
3176    }
3177
3178    #[tokio::test]
3179    async fn invalidate_edge_sets_valid_to_and_expired_at() {
3180        let gs = setup().await;
3181        let a = gs
3182            .upsert_entity("InvA", "InvA", EntityType::Person, None)
3183            .await
3184            .unwrap();
3185        let b = gs
3186            .upsert_entity("InvB", "InvB", EntityType::Person, None)
3187            .await
3188            .unwrap();
3189        let edge_id = gs
3190            .insert_edge(a, b, "rel", "InvA rel InvB", 1.0, None)
3191            .await
3192            .unwrap();
3193
3194        // Before invalidation: valid_to and expired_at must be NULL.
3195        let active_edge: (Option<String>, Option<String>) =
3196            sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
3197                .bind(edge_id)
3198                .fetch_one(gs.pool())
3199                .await
3200                .unwrap();
3201        assert!(
3202            active_edge.0.is_none(),
3203            "valid_to must be NULL before invalidation"
3204        );
3205        assert!(
3206            active_edge.1.is_none(),
3207            "expired_at must be NULL before invalidation"
3208        );
3209
3210        gs.invalidate_edge(edge_id).await.unwrap();
3211
3212        // After invalidation: both valid_to and expired_at must be set.
3213        let dead_edge: (Option<String>, Option<String>) =
3214            sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
3215                .bind(edge_id)
3216                .fetch_one(gs.pool())
3217                .await
3218                .unwrap();
3219        assert!(
3220            dead_edge.0.is_some(),
3221            "valid_to must be set after invalidation"
3222        );
3223        assert!(
3224            dead_edge.1.is_some(),
3225            "expired_at must be set after invalidation"
3226        );
3227    }
3228
3229    // ── New temporal unit tests (issue-1776) ──────────────────────────────────
3230
3231    // edges_at_timestamp
3232
3233    #[tokio::test]
3234    async fn edges_at_timestamp_valid_from_inclusive() {
3235        let gs = setup().await;
3236        let a = gs
3237            .upsert_entity("VFI_A", "VFI_A", EntityType::Person, None)
3238            .await
3239            .unwrap();
3240        let b = gs
3241            .upsert_entity("VFI_B", "VFI_B", EntityType::Person, None)
3242            .await
3243            .unwrap();
3244        sqlx::query(
3245            "INSERT INTO graph_edges
3246             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3247             VALUES (?1, ?2, 'knows', 'VFI_A knows VFI_B', 1.0, '2025-06-01 00:00:00')",
3248        )
3249        .bind(a)
3250        .bind(b)
3251        .execute(gs.pool())
3252        .await
3253        .unwrap();
3254
3255        // Query at exactly valid_from — must be included (valid_from <= ts).
3256        let edges = gs
3257            .edges_at_timestamp(a, "2025-06-01 00:00:00")
3258            .await
3259            .unwrap();
3260        assert_eq!(
3261            edges.len(),
3262            1,
3263            "edge with valid_from == ts must be visible (inclusive boundary)"
3264        );
3265    }
3266
3267    #[tokio::test]
3268    async fn edges_at_timestamp_valid_to_exclusive() {
3269        let gs = setup().await;
3270        let a = gs
3271            .upsert_entity("VTO_A", "VTO_A", EntityType::Person, None)
3272            .await
3273            .unwrap();
3274        let b = gs
3275            .upsert_entity("VTO_B", "VTO_B", EntityType::Person, None)
3276            .await
3277            .unwrap();
3278        sqlx::query(
3279            "INSERT INTO graph_edges
3280             (source_entity_id, target_entity_id, relation, fact, confidence,
3281              valid_from, valid_to, expired_at)
3282             VALUES (?1, ?2, 'knows', 'VTO_A knows VTO_B', 1.0,
3283                     '2020-01-01 00:00:00', '2025-06-01 00:00:00', '2025-06-01 00:00:00')",
3284        )
3285        .bind(a)
3286        .bind(b)
3287        .execute(gs.pool())
3288        .await
3289        .unwrap();
3290
3291        // Query at exactly valid_to — must be excluded (valid_to > ts fails when equal).
3292        let at_boundary = gs
3293            .edges_at_timestamp(a, "2025-06-01 00:00:00")
3294            .await
3295            .unwrap();
3296        assert!(
3297            at_boundary.is_empty(),
3298            "edge with valid_to == ts must NOT be visible (exclusive upper boundary)"
3299        );
3300
3301        // Query one second before valid_to — must be included.
3302        let before_boundary = gs
3303            .edges_at_timestamp(a, "2025-05-31 23:59:59")
3304            .await
3305            .unwrap();
3306        assert_eq!(
3307            before_boundary.len(),
3308            1,
3309            "edge must be visible one second before valid_to"
3310        );
3311    }
3312
3313    #[tokio::test]
3314    async fn edges_at_timestamp_multiple_edges_same_entity() {
3315        let gs = setup().await;
3316        let a = gs
3317            .upsert_entity("ME_A", "ME_A", EntityType::Person, None)
3318            .await
3319            .unwrap();
3320        let b = gs
3321            .upsert_entity("ME_B", "ME_B", EntityType::Person, None)
3322            .await
3323            .unwrap();
3324        let c = gs
3325            .upsert_entity("ME_C", "ME_C", EntityType::Person, None)
3326            .await
3327            .unwrap();
3328        let d = gs
3329            .upsert_entity("ME_D", "ME_D", EntityType::Person, None)
3330            .await
3331            .unwrap();
3332
3333        // A->B: active since 2020, no expiry — visible at 2025.
3334        sqlx::query(
3335            "INSERT INTO graph_edges
3336             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3337             VALUES (?1, ?2, 'knows', 'ME_A knows ME_B', 1.0, '2020-01-01 00:00:00')",
3338        )
3339        .bind(a)
3340        .bind(b)
3341        .execute(gs.pool())
3342        .await
3343        .unwrap();
3344        // A->C: expired in 2023 — NOT visible at 2025.
3345        sqlx::query(
3346            "INSERT INTO graph_edges
3347             (source_entity_id, target_entity_id, relation, fact, confidence,
3348              valid_from, valid_to, expired_at)
3349             VALUES (?1, ?2, 'knows', 'ME_A knows ME_C', 1.0,
3350                     '2020-01-01 00:00:00', '2023-01-01 00:00:00', '2023-01-01 00:00:00')",
3351        )
3352        .bind(a)
3353        .bind(c)
3354        .execute(gs.pool())
3355        .await
3356        .unwrap();
3357        // A->D: future valid_from 2030 — NOT visible at 2025.
3358        sqlx::query(
3359            "INSERT INTO graph_edges
3360             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3361             VALUES (?1, ?2, 'knows', 'ME_A knows ME_D', 1.0, '2030-01-01 00:00:00')",
3362        )
3363        .bind(a)
3364        .bind(d)
3365        .execute(gs.pool())
3366        .await
3367        .unwrap();
3368
3369        let edges = gs
3370            .edges_at_timestamp(a, "2025-01-01 00:00:00")
3371            .await
3372            .unwrap();
3373        assert_eq!(
3374            edges.len(),
3375            1,
3376            "only A->B must be visible at 2025 (C expired, D future)"
3377        );
3378        assert_eq!(edges[0].target_entity_id, b);
3379    }
3380
3381    #[tokio::test]
3382    async fn edges_at_timestamp_no_edges_returns_empty() {
3383        let gs = setup().await;
3384        let a = gs
3385            .upsert_entity("NE_A", "NE_A", EntityType::Person, None)
3386            .await
3387            .unwrap();
3388
3389        let edges = gs
3390            .edges_at_timestamp(a, "2025-01-01 00:00:00")
3391            .await
3392            .unwrap();
3393        assert!(
3394            edges.is_empty(),
3395            "entity with no edges must return empty vec"
3396        );
3397    }
3398
3399    // edge_history
3400
3401    #[tokio::test]
3402    async fn edge_history_basic_history() {
3403        let gs = setup().await;
3404        let src = gs
3405            .upsert_entity("EH_Src", "EH_Src", EntityType::Person, None)
3406            .await
3407            .unwrap();
3408        let tgt = gs
3409            .upsert_entity("EH_Tgt", "EH_Tgt", EntityType::Organization, None)
3410            .await
3411            .unwrap();
3412
3413        sqlx::query(
3414            "INSERT INTO graph_edges
3415             (source_entity_id, target_entity_id, relation, fact, confidence,
3416              valid_from, valid_to, expired_at)
3417             VALUES (?1, ?2, 'works_at', 'EH_Src works at OrgA', 0.9,
3418                     '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
3419        )
3420        .bind(src)
3421        .bind(tgt)
3422        .execute(gs.pool())
3423        .await
3424        .unwrap();
3425        sqlx::query(
3426            "INSERT INTO graph_edges
3427             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3428             VALUES (?1, ?2, 'works_at', 'EH_Src works at OrgB', 0.95, '2022-01-01 00:00:00')",
3429        )
3430        .bind(src)
3431        .bind(tgt)
3432        .execute(gs.pool())
3433        .await
3434        .unwrap();
3435
3436        let history = gs.edge_history(src, "works at", None, 100).await.unwrap();
3437        assert_eq!(history.len(), 2, "both versions must be returned");
3438        assert!(
3439            history[0].valid_from > history[1].valid_from,
3440            "ordered valid_from DESC — versions have distinct timestamps"
3441        );
3442    }
3443
3444    #[tokio::test]
3445    async fn edge_history_for_entity_includes_expired() {
3446        let gs = setup().await;
3447        let a = gs
3448            .upsert_entity("HistA", "HistA", EntityType::Concept, None)
3449            .await
3450            .unwrap();
3451        let b = gs
3452            .upsert_entity("HistB", "HistB", EntityType::Concept, None)
3453            .await
3454            .unwrap();
3455
3456        // Insert and immediately invalidate first edge
3457        let e1 = gs
3458            .insert_edge(a, b, "uses", "old fact", 0.8, None)
3459            .await
3460            .unwrap();
3461        gs.invalidate_edge(e1).await.unwrap();
3462        // Insert new active edge
3463        gs.insert_edge(a, b, "uses", "new fact", 0.9, None)
3464            .await
3465            .unwrap();
3466
3467        let history = gs.edge_history_for_entity(a, 10).await.unwrap();
3468        assert_eq!(
3469            history.len(),
3470            2,
3471            "both active and expired edges must appear"
3472        );
3473
3474        // Most recent first — active edge has later valid_from
3475        let active = history.iter().find(|e| e.valid_to.is_none());
3476        let expired = history.iter().find(|e| e.valid_to.is_some());
3477        assert!(active.is_some(), "active edge must be in history");
3478        assert!(expired.is_some(), "expired edge must be in history");
3479    }
3480
3481    #[tokio::test]
3482    async fn edge_history_for_entity_both_directions() {
3483        let gs = setup().await;
3484        let a = gs
3485            .upsert_entity("DirA", "DirA", EntityType::Concept, None)
3486            .await
3487            .unwrap();
3488        let b = gs
3489            .upsert_entity("DirB", "DirB", EntityType::Concept, None)
3490            .await
3491            .unwrap();
3492        let c = gs
3493            .upsert_entity("DirC", "DirC", EntityType::Concept, None)
3494            .await
3495            .unwrap();
3496
3497        gs.insert_edge(a, b, "r1", "f1", 1.0, None).await.unwrap();
3498        gs.insert_edge(c, a, "r2", "f2", 1.0, None).await.unwrap();
3499
3500        let history = gs.edge_history_for_entity(a, 10).await.unwrap();
3501        assert_eq!(
3502            history.len(),
3503            2,
3504            "both outgoing and incoming edges must appear"
3505        );
3506        assert!(
3507            history
3508                .iter()
3509                .any(|e| e.source_entity_id == a && e.target_entity_id == b)
3510        );
3511        assert!(
3512            history
3513                .iter()
3514                .any(|e| e.source_entity_id == c && e.target_entity_id == a)
3515        );
3516    }
3517
3518    #[tokio::test]
3519    async fn edge_history_for_entity_respects_limit() {
3520        let gs = setup().await;
3521        let a = gs
3522            .upsert_entity("LimA", "LimA", EntityType::Concept, None)
3523            .await
3524            .unwrap();
3525        let b = gs
3526            .upsert_entity("LimB", "LimB", EntityType::Concept, None)
3527            .await
3528            .unwrap();
3529
3530        for i in 0..5u32 {
3531            gs.insert_edge(a, b, &format!("r{i}"), &format!("fact {i}"), 1.0, None)
3532                .await
3533                .unwrap();
3534        }
3535
3536        let history = gs.edge_history_for_entity(a, 2).await.unwrap();
3537        assert_eq!(history.len(), 2, "limit must be respected");
3538    }
3539
3540    #[tokio::test]
3541    async fn edge_history_limit_parameter() {
3542        let gs = setup().await;
3543        let src = gs
3544            .upsert_entity("EHL_Src", "EHL_Src", EntityType::Person, None)
3545            .await
3546            .unwrap();
3547        let tgt = gs
3548            .upsert_entity("EHL_Tgt", "EHL_Tgt", EntityType::Organization, None)
3549            .await
3550            .unwrap();
3551
3552        for (year, rel) in [
3553            (2018i32, "worked_at_v1"),
3554            (2019, "worked_at_v2"),
3555            (2020, "worked_at_v3"),
3556            (2021, "worked_at_v4"),
3557            (2022, "worked_at_v5"),
3558        ] {
3559            let valid_from = format!("{year}-01-01 00:00:00");
3560            sqlx::query(
3561                "INSERT INTO graph_edges
3562                 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3563                 VALUES (?1, ?2, ?3, 'EHL_Src worked at org', 1.0, ?4)",
3564            )
3565            .bind(src)
3566            .bind(tgt)
3567            .bind(rel)
3568            .bind(valid_from)
3569            .execute(gs.pool())
3570            .await
3571            .unwrap();
3572        }
3573
3574        // Pre-condition: all 5 rows match without a limit.
3575        let all = gs.edge_history(src, "worked at", None, 100).await.unwrap();
3576        assert_eq!(
3577            all.len(),
3578            5,
3579            "all 5 rows must match without limit constraint"
3580        );
3581
3582        let limited = gs.edge_history(src, "worked at", None, 2).await.unwrap();
3583        assert_eq!(limited.len(), 2, "limit=2 must truncate to 2 results");
3584        assert!(
3585            limited[0].valid_from > limited[1].valid_from,
3586            "most recent results first"
3587        );
3588    }
3589
3590    #[tokio::test]
3591    async fn edge_history_non_matching_relation_returns_empty() {
3592        let gs = setup().await;
3593        let src = gs
3594            .upsert_entity("EHR_Src", "EHR_Src", EntityType::Person, None)
3595            .await
3596            .unwrap();
3597        let tgt = gs
3598            .upsert_entity("EHR_Tgt", "EHR_Tgt", EntityType::Organization, None)
3599            .await
3600            .unwrap();
3601
3602        sqlx::query(
3603            "INSERT INTO graph_edges
3604             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3605             VALUES (?1, ?2, 'works_at', 'EHR_Src works at place', 1.0, '2020-01-01 00:00:00')",
3606        )
3607        .bind(src)
3608        .bind(tgt)
3609        .execute(gs.pool())
3610        .await
3611        .unwrap();
3612
3613        let result = gs
3614            .edge_history(src, "works at", Some("lives_in"), 100)
3615            .await
3616            .unwrap();
3617        assert!(
3618            result.is_empty(),
3619            "relation filter with no match must return empty"
3620        );
3621    }
3622
3623    #[tokio::test]
3624    async fn edge_history_empty_entity() {
3625        let gs = setup().await;
3626        let src = gs
3627            .upsert_entity("EHE_Src", "EHE_Src", EntityType::Person, None)
3628            .await
3629            .unwrap();
3630
3631        let result = gs.edge_history(src, "anything", None, 100).await.unwrap();
3632        assert!(
3633            result.is_empty(),
3634            "entity with no edges must return empty history"
3635        );
3636    }
3637
3638    #[tokio::test]
3639    async fn edge_history_fact_substring_filters_subset() {
3640        let gs = setup().await;
3641        let src = gs
3642            .upsert_entity("EHP_Src", "EHP_Src", EntityType::Person, None)
3643            .await
3644            .unwrap();
3645        let tgt = gs
3646            .upsert_entity("EHP_Tgt", "EHP_Tgt", EntityType::Concept, None)
3647            .await
3648            .unwrap();
3649
3650        // Two facts containing "uses" and one containing "knows" (distinct relations to avoid
3651        // UNIQUE(source, target, relation) violation).
3652        for (rel, fact) in [
3653            ("uses_lang1", "EHP_Src uses Rust"),
3654            ("uses_lang2", "EHP_Src uses Python"),
3655            ("knows_person", "EHP_Src knows Bob"),
3656        ] {
3657            sqlx::query(
3658                "INSERT INTO graph_edges
3659                 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3660                 VALUES (?1, ?2, ?3, ?4, 1.0, '2020-01-01 00:00:00')",
3661            )
3662            .bind(src)
3663            .bind(tgt)
3664            .bind(rel)
3665            .bind(fact)
3666            .execute(gs.pool())
3667            .await
3668            .unwrap();
3669        }
3670
3671        // All 3 facts match the empty-ish predicate "src" (present in every fact prefix).
3672        let all = gs.edge_history(src, "EHP_Src", None, 100).await.unwrap();
3673        assert_eq!(all.len(), 3, "broad predicate must return all 3 facts");
3674
3675        // Narrow predicate "uses" matches only the two Rust/Python facts.
3676        let filtered = gs.edge_history(src, "uses", None, 100).await.unwrap();
3677        assert_eq!(
3678            filtered.len(),
3679            2,
3680            "predicate 'uses' must match only the two 'uses' facts"
3681        );
3682        assert!(
3683            filtered.len() < all.len(),
3684            "filtered count must be less than total count"
3685        );
3686    }
3687
3688    // bfs_at_timestamp
3689
3690    #[tokio::test]
3691    async fn bfs_at_timestamp_zero_hops() {
3692        let gs = setup().await;
3693        let a = gs
3694            .upsert_entity("ZH_A", "ZH_A", EntityType::Person, None)
3695            .await
3696            .unwrap();
3697        let b = gs
3698            .upsert_entity("ZH_B", "ZH_B", EntityType::Person, None)
3699            .await
3700            .unwrap();
3701        // Use an explicit valid_from so the query timestamp is within the data range.
3702        sqlx::query(
3703            "INSERT INTO graph_edges
3704             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3705             VALUES (?1, ?2, 'knows', 'ZH_A knows ZH_B', 1.0, '2020-01-01 00:00:00')",
3706        )
3707        .bind(a)
3708        .bind(b)
3709        .execute(gs.pool())
3710        .await
3711        .unwrap();
3712
3713        let (_entities, edges, depth_map) = gs
3714            .bfs_at_timestamp(a, 0, "2025-01-01 00:00:00")
3715            .await
3716            .unwrap();
3717        assert!(
3718            depth_map.contains_key(&a),
3719            "start entity must be in depth_map"
3720        );
3721        assert_eq!(depth_map.len(), 1, "depth=0 must include only start entity");
3722        assert!(edges.is_empty(), "depth=0 must return no edges");
3723    }
3724
3725    #[tokio::test]
3726    async fn bfs_at_timestamp_expired_intermediate_blocks() {
3727        let gs = setup().await;
3728        let a = gs
3729            .upsert_entity("EI_A", "EI_A", EntityType::Person, None)
3730            .await
3731            .unwrap();
3732        let b = gs
3733            .upsert_entity("EI_B", "EI_B", EntityType::Person, None)
3734            .await
3735            .unwrap();
3736        let c = gs
3737            .upsert_entity("EI_C", "EI_C", EntityType::Person, None)
3738            .await
3739            .unwrap();
3740
3741        // A->B: expired in 2022.
3742        sqlx::query(
3743            "INSERT INTO graph_edges
3744             (source_entity_id, target_entity_id, relation, fact, confidence,
3745              valid_from, valid_to, expired_at)
3746             VALUES (?1, ?2, 'link', 'EI_A link EI_B', 1.0,
3747                     '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
3748        )
3749        .bind(a)
3750        .bind(b)
3751        .execute(gs.pool())
3752        .await
3753        .unwrap();
3754        // B->C: active.
3755        sqlx::query(
3756            "INSERT INTO graph_edges
3757             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3758             VALUES (?1, ?2, 'link', 'EI_B link EI_C', 1.0, '2020-01-01 00:00:00')",
3759        )
3760        .bind(b)
3761        .bind(c)
3762        .execute(gs.pool())
3763        .await
3764        .unwrap();
3765
3766        let (entities, _edges, depth_map) = gs
3767            .bfs_at_timestamp(a, 3, "2025-01-01 00:00:00")
3768            .await
3769            .unwrap();
3770        let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
3771        assert!(
3772            !depth_map.contains_key(&b),
3773            "B must not be reachable (A->B expired)"
3774        );
3775        assert!(
3776            !entity_ids.contains(&c),
3777            "C must not be reachable (blocked by expired A->B)"
3778        );
3779    }
3780
3781    #[tokio::test]
3782    async fn bfs_at_timestamp_disconnected_entity() {
3783        let gs = setup().await;
3784        let a = gs
3785            .upsert_entity("DC_A", "DC_A", EntityType::Person, None)
3786            .await
3787            .unwrap();
3788
3789        let (_entities, edges, depth_map) = gs
3790            .bfs_at_timestamp(a, 3, "2025-01-01 00:00:00")
3791            .await
3792            .unwrap();
3793        assert_eq!(depth_map.len(), 1, "disconnected entity has only itself");
3794        assert!(depth_map.contains_key(&a));
3795        assert!(edges.is_empty(), "disconnected entity has no edges");
3796    }
3797
3798    #[tokio::test]
3799    async fn bfs_at_timestamp_reverse_direction() {
3800        let gs = setup().await;
3801        let a = gs
3802            .upsert_entity("RD_A", "RD_A", EntityType::Person, None)
3803            .await
3804            .unwrap();
3805        let b = gs
3806            .upsert_entity("RD_B", "RD_B", EntityType::Person, None)
3807            .await
3808            .unwrap();
3809
3810        // B -> A (B is source, A is target).
3811        sqlx::query(
3812            "INSERT INTO graph_edges
3813             (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3814             VALUES (?1, ?2, 'points_to', 'RD_B points_to RD_A', 1.0, '2020-01-01 00:00:00')",
3815        )
3816        .bind(b)
3817        .bind(a)
3818        .execute(gs.pool())
3819        .await
3820        .unwrap();
3821
3822        let (entities, edges, depth_map) = gs
3823            .bfs_at_timestamp(a, 1, "2099-01-01 00:00:00")
3824            .await
3825            .unwrap();
3826        let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
3827        assert!(
3828            depth_map.contains_key(&b),
3829            "B must be reachable when BFS traverses reverse direction"
3830        );
3831        assert!(entity_ids.contains(&b), "B must appear in entities vec");
3832        assert!(
3833            edges
3834                .iter()
3835                .any(|e| e.source_entity_id == b && e.target_entity_id == a),
3836            "traversed edge B->A must appear in returned edges"
3837        );
3838    }
3839
3840    #[tokio::test]
3841    async fn bfs_at_timestamp_valid_to_boundary() {
3842        let gs = setup().await;
3843        let a = gs
3844            .upsert_entity("VTB_A", "VTB_A", EntityType::Person, None)
3845            .await
3846            .unwrap();
3847        let b = gs
3848            .upsert_entity("VTB_B", "VTB_B", EntityType::Person, None)
3849            .await
3850            .unwrap();
3851
3852        // A->B: valid_to = "2025-06-01 00:00:00" (exactly).
3853        sqlx::query(
3854            "INSERT INTO graph_edges
3855             (source_entity_id, target_entity_id, relation, fact, confidence,
3856              valid_from, valid_to, expired_at)
3857             VALUES (?1, ?2, 'link', 'VTB_A link VTB_B', 1.0,
3858                     '2020-01-01 00:00:00', '2025-06-01 00:00:00', '2025-06-01 00:00:00')",
3859        )
3860        .bind(a)
3861        .bind(b)
3862        .execute(gs.pool())
3863        .await
3864        .unwrap();
3865
3866        // Query at exactly valid_to — B must NOT be reachable (exclusive upper bound).
3867        let (_entities, _edges, depth_map_at) = gs
3868            .bfs_at_timestamp(a, 1, "2025-06-01 00:00:00")
3869            .await
3870            .unwrap();
3871        assert!(
3872            !depth_map_at.contains_key(&b),
3873            "B must not be reachable when valid_to == ts (exclusive boundary)"
3874        );
3875
3876        // Query one second before — B must be reachable.
3877        let (_entities2, _edges2, depth_map_before) = gs
3878            .bfs_at_timestamp(a, 1, "2025-05-31 23:59:59")
3879            .await
3880            .unwrap();
3881        assert!(
3882            depth_map_before.contains_key(&b),
3883            "B must be reachable one second before valid_to"
3884        );
3885    }
3886}