1use std::collections::HashMap;
5
6use futures::Stream;
7use sqlx::SqlitePool;
8
9use crate::error::MemoryError;
10use crate::sqlite::messages::sanitize_fts5_query;
11use crate::types::MessageId;
12
13use super::types::{Community, Edge, Entity, EntityAlias, EntityType};
14
/// SQLite-backed store for the knowledge-graph tables: entities, aliases,
/// temporal edges, communities, and graph metadata.
pub struct GraphStore {
    pool: SqlitePool,
}
18
19impl GraphStore {
    /// Creates a graph store backed by the given SQLite connection pool.
    #[must_use]
    pub fn new(pool: SqlitePool) -> Self {
        Self { pool }
    }
24
    /// Returns a reference to the underlying SQLite connection pool.
    #[must_use]
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
29
    /// Inserts an entity or, on a `(canonical_name, entity_type)` conflict,
    /// refreshes its surface name and `last_seen_at`, keeping the old summary
    /// when no new one is supplied.
    ///
    /// Returns the entity's row id (existing or newly created).
    ///
    /// # Errors
    /// Returns an error if the database write fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = datetime('now')
             RETURNING id",
        )
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
67
    /// Looks up a single entity by exact canonical name and type.
    ///
    /// Returns `Ok(None)` when no such entity exists.
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities
             WHERE canonical_name = ?1 AND entity_type = ?2",
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
90
    /// Fetches an entity by primary key; `Ok(None)` if the id is unknown.
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities
             WHERE id = ?1",
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
107
108 pub async fn set_entity_qdrant_point_id(
114 &self,
115 entity_id: i64,
116 point_id: &str,
117 ) -> Result<(), MemoryError> {
118 sqlx::query("UPDATE graph_entities SET qdrant_point_id = ?1 WHERE id = ?2")
119 .bind(point_id)
120 .bind(entity_id)
121 .execute(&self.pool)
122 .await?;
123 Ok(())
124 }
125
    /// Fuzzy entity search: an FTS5 prefix match over the entity index,
    /// unioned with a case-insensitive substring match over aliases.
    ///
    /// The input is capped at 512 bytes (on a char boundary) and FTS5
    /// boolean operators are stripped so user text cannot change query
    /// semantics. Returns at most `limit` entities; an empty query (after
    /// sanitizing) returns an empty vec without touching the database.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // Tokens FTS5 treats as boolean operators; filtered out below.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Truncate without splitting a multi-byte UTF-8 character.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts5_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Turn each remaining token into a prefix match: "foo" -> "foo*".
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // The LIMIT applies to the whole UNION, not just the alias arm.
        let rows: Vec<EntityRow> = sqlx::query_as(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entities_fts fts
             JOIN graph_entities e ON e.id = fts.rowid
             WHERE graph_entities_fts MATCH ?1
             UNION
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entity_aliases a
             JOIN graph_entities e ON e.id = a.entity_id
             WHERE a.alias_name LIKE ?2 ESCAPE '\\' COLLATE NOCASE
             LIMIT ?3",
        )
        .bind(&fts_query)
        // Escape LIKE wildcards in the raw query before wrapping in %...%.
        .bind(format!(
            "%{}%",
            query
                .trim()
                .replace('\\', "\\\\")
                .replace('%', "\\%")
                .replace('_', "\\_")
        ))
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
205
    /// Streams every entity in ascending id order, decoding rows lazily so
    /// large graphs are not buffered in memory.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        sqlx::query_as::<_, EntityRow>(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities ORDER BY id ASC",
        )
        .fetch(&self.pool)
        .map(|r: Result<EntityRow, sqlx::Error>| {
            // entity_from_row can itself fail (unknown entity_type string).
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
218
219 pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
227 sqlx::query(
228 "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name) VALUES (?1, ?2)",
229 )
230 .bind(entity_id)
231 .bind(alias_name)
232 .execute(&self.pool)
233 .await?;
234 Ok(())
235 }
236
    /// Resolves an alias (case-insensitive) to its entity, constrained to
    /// one entity type.
    ///
    /// When several entities share the alias, the one with the smallest id
    /// wins (deterministic tie-break).
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entity_aliases a
             JOIN graph_entities e ON e.id = a.entity_id
             WHERE a.alias_name = ?1 COLLATE NOCASE
               AND e.entity_type = ?2
             ORDER BY e.id ASC
             LIMIT 1",
        )
        .bind(alias_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
266
    /// Lists all aliases recorded for an entity, oldest first (id order).
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = sqlx::query_as(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?1
             ORDER BY id ASC",
        )
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
287
288 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
294 use futures::TryStreamExt as _;
295 self.all_entities_stream().try_collect().await
296 }
297
298 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
304 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_entities")
305 .fetch_one(&self.pool)
306 .await?;
307 Ok(count)
308 }
309
    /// Inserts a new active edge, or — when an active edge with the same
    /// (source, target, relation) triple already exists — raises that edge's
    /// stored confidence to the max of old and new and returns its id.
    ///
    /// `confidence` is clamped to `[0.0, 1.0]`. On the update path the
    /// existing `fact` and `episode_id` are left untouched.
    ///
    /// NOTE(review): the lookup and the insert are two separate statements,
    /// not a transaction, so concurrent writers could create duplicate
    /// active edges — confirm whether callers serialize graph writes.
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        let confidence = confidence.clamp(0.0, 1.0);

        // Look for an existing active (valid_to IS NULL) edge to merge into.
        let existing: Option<(i64, f64)> = sqlx::query_as(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?1
               AND target_entity_id = ?2
               AND relation = ?3
               AND valid_to IS NULL
             LIMIT 1",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .fetch_optional(&self.pool)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Confidence only ever ratchets upward on repeat observations.
            let updated_conf = f64::from(confidence).max(stored_conf);
            sqlx::query("UPDATE graph_edges SET confidence = ?1 WHERE id = ?2")
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&self.pool)
                .await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact, confidence, episode_id)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6)
             RETURNING id",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
373
    /// Closes an edge's validity interval: sets both `valid_to` and
    /// `expired_at` to now, so the edge drops out of "active" queries but
    /// remains for historical/temporal lookups.
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        sqlx::query(
            "UPDATE graph_edges SET valid_to = datetime('now'), expired_at = datetime('now')
             WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
389
    /// Returns all *active* edges touching an entity, as either endpoint.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL
               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
        )
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
408
    /// Returns up to `limit` edges touching an entity — including expired
    /// ones — newest `valid_from` first.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE source_entity_id = ?1 OR target_entity_id = ?1
             ORDER BY valid_from DESC
             LIMIT ?2",
        )
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
435
    /// Returns all active edges between two entities in either direction.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL
               AND ((source_entity_id = ?1 AND target_entity_id = ?2)
                 OR (source_entity_id = ?2 AND target_entity_id = ?1))",
        )
        .bind(entity_a)
        .bind(entity_b)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
460
    /// Returns active edges from `source_entity_id` to `target_entity_id`
    /// in that exact direction (unlike [`Self::edges_between`]).
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL
               AND source_entity_id = ?1
               AND target_entity_id = ?2",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
485
486 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
492 let count: i64 =
493 sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
494 .fetch_one(&self.pool)
495 .await?;
496 Ok(count)
497 }
498
    /// Inserts a community or, on a `name` conflict, replaces its summary
    /// and member list, keeps the old fingerprint when no new one is given,
    /// and bumps `updated_at`. Returns the community's row id.
    ///
    /// `entity_ids` is serialized to a JSON array for storage.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = datetime('now')
             RETURNING id",
        )
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
536
    /// Maps every non-NULL community fingerprint to its community id,
    /// letting callers detect communities that are already up to date.
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = sqlx::query_as(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
551
552 pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
558 sqlx::query("DELETE FROM graph_communities WHERE id = ?1")
559 .bind(id)
560 .execute(&self.pool)
561 .await?;
562 Ok(())
563 }
564
    /// Clears a community's fingerprint so it is considered stale and will
    /// be regenerated on the next community pass.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        sqlx::query("UPDATE graph_communities SET fingerprint = NULL WHERE id = ?1")
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
580
581 pub async fn community_for_entity(
590 &self,
591 entity_id: i64,
592 ) -> Result<Option<Community>, MemoryError> {
593 let row: Option<CommunityRow> = sqlx::query_as(
594 "SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
595 FROM graph_communities c, json_each(c.entity_ids) j
596 WHERE CAST(j.value AS INTEGER) = ?1
597 LIMIT 1",
598 )
599 .bind(entity_id)
600 .fetch_optional(&self.pool)
601 .await?;
602 match row {
603 Some(row) => {
604 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
605 Ok(Some(Community {
606 id: row.id,
607 name: row.name,
608 summary: row.summary,
609 entity_ids,
610 fingerprint: row.fingerprint,
611 created_at: row.created_at,
612 updated_at: row.updated_at,
613 }))
614 }
615 None => Ok(None),
616 }
617 }
618
    /// Returns every community in ascending id order, with each stored
    /// JSON member list decoded into `entity_ids`.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = sqlx::query_as(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             ORDER BY id ASC",
        )
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
648
649 pub async fn community_count(&self) -> Result<i64, MemoryError> {
655 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_communities")
656 .fetch_one(&self.pool)
657 .await?;
658 Ok(count)
659 }
660
    /// Reads one value from the `graph_metadata` key/value table;
    /// `Ok(None)` when the key is absent.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            sqlx::query_scalar("SELECT value FROM graph_metadata WHERE key = ?1")
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
676
    /// Writes (insert-or-replace) one value in the `graph_metadata` table.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        sqlx::query(
            "INSERT INTO graph_metadata (key, value) VALUES (?1, ?2)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        )
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
693
    /// Number of extraction passes recorded under the `extraction_count`
    /// metadata key. A missing or unparsable value is treated as 0.
    pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
        let val = self.get_metadata("extraction_count").await?;
        Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
    }
705
    /// Streams every active edge in ascending id order, decoding rows
    /// lazily so large graphs are not buffered in memory.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        sqlx::query_as::<_, EdgeRow>(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC",
        )
        .fetch(&self.pool)
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
719
    /// Keyset-paginated fetch of active edges: up to `limit` edges with
    /// id strictly greater than `after_id`, ascending. Pass the last id of
    /// the previous page to advance.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?1
             ORDER BY id ASC
             LIMIT ?2",
        )
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
755
756 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
762 let row: Option<CommunityRow> = sqlx::query_as(
763 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
764 FROM graph_communities
765 WHERE id = ?1",
766 )
767 .bind(id)
768 .fetch_optional(&self.pool)
769 .await?;
770 match row {
771 Some(row) => {
772 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
773 Ok(Some(Community {
774 id: row.id,
775 name: row.name,
776 summary: row.summary,
777 entity_ids,
778 fingerprint: row.fingerprint,
779 created_at: row.created_at,
780 updated_at: row.updated_at,
781 }))
782 }
783 None => Ok(None),
784 }
785 }
786
787 pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
793 sqlx::query("DELETE FROM graph_communities")
794 .execute(&self.pool)
795 .await?;
796 Ok(())
797 }
798
    /// Permanently deletes edges that expired more than `retention_days`
    /// ago. Returns the number of rows removed.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = sqlx::query(
            // The day count is bound and concatenated into the datetime
            // modifier string server-side, so no SQL injection is possible.
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
               AND expired_at < datetime('now', '-' || ?1 || ' days')",
        )
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
816
    /// Deletes entities that participate in no active edge and were last
    /// seen more than `retention_days` ago. Returns the number removed.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = sqlx::query(
            "DELETE FROM graph_entities
             WHERE id NOT IN (
                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                 UNION
                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
             )
             AND last_seen_at < datetime('now', '-' || ?1 || ' days')",
        )
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
838
839 pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
848 let current = self.entity_count().await?;
849 let max = i64::try_from(max_entities)?;
850 if current <= max {
851 return Ok(0);
852 }
853 let excess = current - max;
854 let result = sqlx::query(
855 "DELETE FROM graph_entities
856 WHERE id IN (
857 SELECT e.id
858 FROM graph_entities e
859 LEFT JOIN (
860 SELECT source_entity_id AS eid, COUNT(*) AS cnt
861 FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
862 UNION ALL
863 SELECT target_entity_id AS eid, COUNT(*) AS cnt
864 FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
865 ) edge_counts ON e.id = edge_counts.eid
866 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
867 LIMIT ?1
868 )",
869 )
870 .bind(excess)
871 .execute(&self.pool)
872 .await?;
873 Ok(usize::try_from(result.rows_affected())?)
874 }
875
    /// Temporal query: edges touching `entity_id` that were valid at
    /// `timestamp` (a SQLite datetime string, compared lexically).
    ///
    /// Two arms, which cannot overlap so UNION ALL is safe:
    /// open edges (`valid_to IS NULL`) that had started by the timestamp,
    /// plus closed edges whose [valid_from, valid_to) interval covers it.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?2
               AND (source_entity_id = ?1 OR target_entity_id = ?1)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?2
               AND valid_to > ?2
               AND (source_entity_id = ?1 OR target_entity_id = ?1)",
        )
        .bind(entity_id)
        .bind(timestamp)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
919
    /// Returns up to `limit` edges (active and expired) originating at
    /// `source_entity_id` whose `fact` text contains `predicate`
    /// (substring, LIKE wildcards escaped), optionally also filtered by an
    /// exact `relation`. Newest `valid_from` first.
    pub async fn edge_history(
        &self,
        source_entity_id: i64,
        predicate: &str,
        relation: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Escape LIKE metacharacters so the predicate is a literal substring.
        let escaped = predicate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{escaped}%");
        let limit = i64::try_from(limit)?;
        // Two statements rather than dynamic SQL: the relation filter adds a
        // bound parameter, so the placeholder numbering differs.
        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
            sqlx::query_as(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
                 FROM graph_edges
                 WHERE source_entity_id = ?1
                   AND fact LIKE ?2 ESCAPE '\\'
                   AND relation = ?3
                 ORDER BY valid_from DESC
                 LIMIT ?4",
            )
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(rel)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
                 FROM graph_edges
                 WHERE source_entity_id = ?1
                   AND fact LIKE ?2 ESCAPE '\\'
                 ORDER BY valid_from DESC
                 LIMIT ?3",
            )
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        };
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
977
    /// Breadth-first traversal from `start_entity_id` over active edges,
    /// up to `max_hops`. Returns the reachable entities and the edges
    /// connecting them (depth information discarded).
    pub async fn bfs(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
        self.bfs_with_depth(start_entity_id, max_hops)
            .await
            .map(|(e, ed, _)| (e, ed))
    }
1003
    /// Like [`Self::bfs`] but also returns a map from entity id to its hop
    /// distance from the start entity (start itself maps to 0).
    pub async fn bfs_with_depth(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, None).await
    }
1021
    /// Temporal variant of [`Self::bfs_with_depth`]: traverses only edges
    /// that were valid at `timestamp` instead of currently-active ones.
    pub async fn bfs_at_timestamp(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        timestamp: &str,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
            .await
    }
1041
    /// Shared BFS frontier expansion: one SQL round-trip per hop, each
    /// fetching the undirected neighbours of the whole current frontier.
    /// With `at_timestamp` set, only edges valid at that instant are
    /// followed; otherwise only currently-active edges.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Each frontier id is bound three times (the placeholder list is
        // interpolated into the SQL in three places), so 300 ids plus the
        // optional timestamp is 901 bound variables — under SQLite's
        // default 999-parameter ceiling.
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let placeholders = frontier
                .iter()
                .enumerate()
                .map(|(i, _)| format!("?{}", i + 1))
                .collect::<Vec<_>>()
                .join(", ");
            let edge_filter = if at_timestamp.is_some() {
                // The timestamp parameter sits after all three copies of
                // the frontier bindings, hence len * 3 + 1.
                let ts_pos = frontier.len() * 3 + 1;
                format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
            } else {
                "valid_to IS NULL".to_owned()
            };
            // CASE picks the endpoint on the far side of the frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({placeholders}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
            );
            let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
            // Bind the frontier once per placeholder-list occurrence.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only unseen nodes enter the next frontier; the first
                // visit fixes the (minimal) hop distance.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1118
    /// Materializes BFS results: loads the visited entities plus every
    /// qualifying edge whose BOTH endpoints were visited.
    ///
    /// The visited set is capped at 499 ids (bound twice in the edge query
    /// plus an optional timestamp = 999 variables, SQLite's default limit);
    /// overflow is logged and silently dropped from the results.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        let placeholders = visited_ids
            .iter()
            .enumerate()
            .map(|(i, _)| format!("?{}", i + 1))
            .collect::<Vec<_>>()
            .join(", ");
        let edge_filter = if at_timestamp.is_some() {
            // Timestamp parameter follows both copies of the id bindings.
            let ts_pos = visited_ids.len() * 2 + 1;
            format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({placeholders})
               AND target_entity_id IN ({placeholders})"
        );
        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({placeholders})"
        );
        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1190
    /// Name lookup with graceful degradation: first an exact
    /// case-insensitive match on `name` or `canonical_name` (up to 5 rows),
    /// then — only if that finds nothing — a fuzzy search.
    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
        let rows: Vec<EntityRow> = sqlx::query_as(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities
             WHERE name = ?1 COLLATE NOCASE OR canonical_name = ?1 COLLATE NOCASE
             LIMIT 5",
        )
        .bind(name)
        .fetch_all(&self.pool)
        .await?;

        if !rows.is_empty() {
            return rows.into_iter().map(entity_from_row).collect();
        }

        self.find_entities_fuzzy(name, 5).await
    }
1223
    /// Fetches up to `limit` messages not yet processed into the graph
    /// (`graph_processed = 0`), oldest first, as `(id, content)` pairs.
    pub async fn unprocessed_messages_for_backfill(
        &self,
        limit: usize,
    ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<(i64, String)> = sqlx::query_as(
            "SELECT id, content FROM messages
             WHERE graph_processed = 0
             ORDER BY id ASC
             LIMIT ?1",
        )
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|(id, content)| (crate::types::MessageId(id), content))
            .collect())
    }
1250
1251 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
1257 let count: i64 =
1258 sqlx::query_scalar("SELECT COUNT(*) FROM messages WHERE graph_processed = 0")
1259 .fetch_one(&self.pool)
1260 .await?;
1261 Ok(count)
1262 }
1263
1264 pub async fn mark_messages_graph_processed(
1270 &self,
1271 ids: &[crate::types::MessageId],
1272 ) -> Result<(), MemoryError> {
1273 if ids.is_empty() {
1274 return Ok(());
1275 }
1276 let placeholders = ids
1277 .iter()
1278 .enumerate()
1279 .map(|(i, _)| format!("?{}", i + 1))
1280 .collect::<Vec<_>>()
1281 .join(", ");
1282 let sql = format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
1283 let mut query = sqlx::query(&sql);
1284 for id in ids {
1285 query = query.bind(id.0);
1286 }
1287 query.execute(&self.pool).await?;
1288 Ok(())
1289 }
1290}
1291
/// Raw `graph_entities` row as decoded by sqlx; converted to the public
/// `Entity` type by `entity_from_row`.
#[derive(sqlx::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    canonical_name: String,
    // Stored as TEXT; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
1305
/// Converts a raw `graph_entities` row into the public `Entity` type.
///
/// Fails when the stored `entity_type` string does not parse as a known
/// `EntityType`.
fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
    let entity_type = row
        .entity_type
        .parse::<EntityType>()
        .map_err(MemoryError::GraphStore)?;
    Ok(Entity {
        id: row.id,
        name: row.name,
        canonical_name: row.canonical_name,
        entity_type,
        summary: row.summary,
        first_seen_at: row.first_seen_at,
        last_seen_at: row.last_seen_at,
        qdrant_point_id: row.qdrant_point_id,
    })
}
1322
/// Raw `graph_entity_aliases` row; converted to `EntityAlias` by
/// `alias_from_row`.
#[derive(sqlx::FromRow)]
struct AliasRow {
    id: i64,
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
1330
/// Converts a raw alias row into the public `EntityAlias` type
/// (infallible: a straight field-for-field move).
fn alias_from_row(row: AliasRow) -> EntityAlias {
    EntityAlias {
        id: row.id,
        entity_id: row.entity_id,
        alias_name: row.alias_name,
        created_at: row.created_at,
    }
}
1339
/// Raw `graph_edges` row as decoded by sqlx; converted to the public
/// `Edge` type by `edge_from_row`.
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // SQLite REAL; narrowed to f32 in edge_from_row.
    confidence: f64,
    valid_from: String,
    // NULL while the edge is still active.
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
}
1355
/// Converts a raw `graph_edges` row into the public `Edge` type
/// (infallible).
fn edge_from_row(row: EdgeRow) -> Edge {
    Edge {
        id: row.id,
        source_entity_id: row.source_entity_id,
        target_entity_id: row.target_entity_id,
        relation: row.relation,
        fact: row.fact,
        // Stored as REAL (f64); confidence lives in [0.0, 1.0] (clamped on
        // insert), so the f32 narrowing loses only sub-epsilon precision.
        #[allow(clippy::cast_possible_truncation)]
        confidence: row.confidence as f32,
        valid_from: row.valid_from,
        valid_to: row.valid_to,
        created_at: row.created_at,
        expired_at: row.expired_at,
        episode_id: row.episode_id.map(MessageId),
        qdrant_point_id: row.qdrant_point_id,
    }
}
1373
/// Raw `graph_communities` row; callers decode `entity_ids` (a JSON array
/// of i64) when building the public `Community` type.
#[derive(sqlx::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded Vec<i64>.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
1384
1385#[cfg(test)]
1388mod tests {
1389 use super::*;
1390 use crate::sqlite::SqliteStore;
1391
1392 async fn setup() -> GraphStore {
1393 let store = SqliteStore::new(":memory:").await.unwrap();
1394 GraphStore::new(store.pool().clone())
1395 }
1396
1397 #[tokio::test]
1398 async fn upsert_entity_insert_new() {
1399 let gs = setup().await;
1400 let id = gs
1401 .upsert_entity("Alice", "Alice", EntityType::Person, Some("a person"))
1402 .await
1403 .unwrap();
1404 assert!(id > 0);
1405 }
1406
1407 #[tokio::test]
1408 async fn upsert_entity_update_existing() {
1409 let gs = setup().await;
1410 let id1 = gs
1411 .upsert_entity("Alice", "Alice", EntityType::Person, None)
1412 .await
1413 .unwrap();
1414 let id2 = gs
1417 .upsert_entity("Alice", "Alice", EntityType::Person, Some("updated"))
1418 .await
1419 .unwrap();
1420 assert_eq!(id1, id2);
1421 let entity = gs
1422 .find_entity("Alice", EntityType::Person)
1423 .await
1424 .unwrap()
1425 .unwrap();
1426 assert_eq!(entity.summary.as_deref(), Some("updated"));
1427 }
1428
1429 #[tokio::test]
1430 async fn find_entity_found() {
1431 let gs = setup().await;
1432 gs.upsert_entity("Bob", "Bob", EntityType::Tool, Some("a tool"))
1433 .await
1434 .unwrap();
1435 let entity = gs
1436 .find_entity("Bob", EntityType::Tool)
1437 .await
1438 .unwrap()
1439 .unwrap();
1440 assert_eq!(entity.name, "Bob");
1441 assert_eq!(entity.entity_type, EntityType::Tool);
1442 }
1443
1444 #[tokio::test]
1445 async fn find_entity_not_found() {
1446 let gs = setup().await;
1447 let result = gs.find_entity("Nobody", EntityType::Person).await.unwrap();
1448 assert!(result.is_none());
1449 }
1450
1451 #[tokio::test]
1452 async fn find_entities_fuzzy_partial_match() {
1453 let gs = setup().await;
1454 gs.upsert_entity("GraphQL", "GraphQL", EntityType::Concept, None)
1455 .await
1456 .unwrap();
1457 gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
1458 .await
1459 .unwrap();
1460 gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
1461 .await
1462 .unwrap();
1463
1464 let results = gs.find_entities_fuzzy("graph", 10).await.unwrap();
1465 assert_eq!(results.len(), 2);
1466 assert!(results.iter().any(|e| e.name == "GraphQL"));
1467 assert!(results.iter().any(|e| e.name == "Graph"));
1468 }
1469
1470 #[tokio::test]
1471 async fn entity_count_empty() {
1472 let gs = setup().await;
1473 assert_eq!(gs.entity_count().await.unwrap(), 0);
1474 }
1475
1476 #[tokio::test]
1477 async fn entity_count_non_empty() {
1478 let gs = setup().await;
1479 gs.upsert_entity("A", "A", EntityType::Concept, None)
1480 .await
1481 .unwrap();
1482 gs.upsert_entity("B", "B", EntityType::Concept, None)
1483 .await
1484 .unwrap();
1485 assert_eq!(gs.entity_count().await.unwrap(), 2);
1486 }
1487
1488 #[tokio::test]
1489 async fn all_entities_and_stream() {
1490 use futures::StreamExt as _;
1491
1492 let gs = setup().await;
1493 gs.upsert_entity("X", "X", EntityType::Project, None)
1494 .await
1495 .unwrap();
1496 gs.upsert_entity("Y", "Y", EntityType::Language, None)
1497 .await
1498 .unwrap();
1499
1500 let all = gs.all_entities().await.unwrap();
1501 assert_eq!(all.len(), 2);
1502 let streamed: Vec<Result<Entity, _>> = gs.all_entities_stream().collect().await;
1503 assert_eq!(streamed.len(), 2);
1504 assert!(streamed.iter().all(Result::is_ok));
1505 }
1506
1507 #[tokio::test]
1508 async fn insert_edge_without_episode() {
1509 let gs = setup().await;
1510 let src = gs
1511 .upsert_entity("Src", "Src", EntityType::Concept, None)
1512 .await
1513 .unwrap();
1514 let tgt = gs
1515 .upsert_entity("Tgt", "Tgt", EntityType::Concept, None)
1516 .await
1517 .unwrap();
1518 let eid = gs
1519 .insert_edge(src, tgt, "relates_to", "Src relates to Tgt", 0.9, None)
1520 .await
1521 .unwrap();
1522 assert!(eid > 0);
1523 }
1524
1525 #[tokio::test]
1526 async fn insert_edge_deduplicates_active_edge() {
1527 let gs = setup().await;
1528 let src = gs
1529 .upsert_entity("Alice", "Alice", EntityType::Person, None)
1530 .await
1531 .unwrap();
1532 let tgt = gs
1533 .upsert_entity("Google", "Google", EntityType::Organization, None)
1534 .await
1535 .unwrap();
1536
1537 let id1 = gs
1538 .insert_edge(src, tgt, "works_at", "Alice works at Google", 0.7, None)
1539 .await
1540 .unwrap();
1541
1542 let id2 = gs
1544 .insert_edge(src, tgt, "works_at", "Alice works at Google", 0.9, None)
1545 .await
1546 .unwrap();
1547 assert_eq!(id1, id2, "duplicate active edge must not be created");
1548
1549 let count: i64 =
1551 sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
1552 .fetch_one(&gs.pool)
1553 .await
1554 .unwrap();
1555 assert_eq!(count, 1, "only one active edge must exist");
1556
1557 let conf: f64 = sqlx::query_scalar("SELECT confidence FROM graph_edges WHERE id = ?1")
1558 .bind(id1)
1559 .fetch_one(&gs.pool)
1560 .await
1561 .unwrap();
1562 assert!(
1564 (conf - f64::from(0.9_f32)).abs() < 1e-6,
1565 "confidence must be updated to max, got {conf}"
1566 );
1567 }
1568
1569 #[tokio::test]
1570 async fn insert_edge_different_relations_are_distinct() {
1571 let gs = setup().await;
1572 let src = gs
1573 .upsert_entity("Bob", "Bob", EntityType::Person, None)
1574 .await
1575 .unwrap();
1576 let tgt = gs
1577 .upsert_entity("Acme", "Acme", EntityType::Organization, None)
1578 .await
1579 .unwrap();
1580
1581 let id1 = gs
1582 .insert_edge(src, tgt, "founded", "Bob founded Acme", 0.8, None)
1583 .await
1584 .unwrap();
1585 let id2 = gs
1586 .insert_edge(src, tgt, "chairs", "Bob chairs Acme", 0.8, None)
1587 .await
1588 .unwrap();
1589 assert_ne!(id1, id2, "different relations must produce distinct edges");
1590
1591 let count: i64 =
1592 sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
1593 .fetch_one(&gs.pool)
1594 .await
1595 .unwrap();
1596 assert_eq!(count, 2);
1597 }
1598
1599 #[tokio::test]
1600 async fn insert_edge_with_episode() {
1601 let gs = setup().await;
1602 let src = gs
1603 .upsert_entity("Src2", "Src2", EntityType::Concept, None)
1604 .await
1605 .unwrap();
1606 let tgt = gs
1607 .upsert_entity("Tgt2", "Tgt2", EntityType::Concept, None)
1608 .await
1609 .unwrap();
1610 let episode = MessageId(999);
1616 let result = gs
1617 .insert_edge(src, tgt, "uses", "Src2 uses Tgt2", 1.0, Some(episode))
1618 .await;
1619 match &result {
1620 Ok(eid) => assert!(*eid > 0, "inserted edge should have positive id"),
1621 Err(MemoryError::Sqlite(_)) => {} Err(e) => panic!("unexpected error: {e}"),
1623 }
1624 }
1625
1626 #[tokio::test]
1627 async fn invalidate_edge_sets_timestamps() {
1628 let gs = setup().await;
1629 let src = gs
1630 .upsert_entity("E1", "E1", EntityType::Concept, None)
1631 .await
1632 .unwrap();
1633 let tgt = gs
1634 .upsert_entity("E2", "E2", EntityType::Concept, None)
1635 .await
1636 .unwrap();
1637 let eid = gs
1638 .insert_edge(src, tgt, "r", "fact", 1.0, None)
1639 .await
1640 .unwrap();
1641 gs.invalidate_edge(eid).await.unwrap();
1642
1643 let row: (Option<String>, Option<String>) =
1644 sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
1645 .bind(eid)
1646 .fetch_one(&gs.pool)
1647 .await
1648 .unwrap();
1649 assert!(row.0.is_some(), "valid_to should be set");
1650 assert!(row.1.is_some(), "expired_at should be set");
1651 }
1652
1653 #[tokio::test]
1654 async fn edges_for_entity_both_directions() {
1655 let gs = setup().await;
1656 let a = gs
1657 .upsert_entity("A", "A", EntityType::Concept, None)
1658 .await
1659 .unwrap();
1660 let b = gs
1661 .upsert_entity("B", "B", EntityType::Concept, None)
1662 .await
1663 .unwrap();
1664 let c = gs
1665 .upsert_entity("C", "C", EntityType::Concept, None)
1666 .await
1667 .unwrap();
1668 gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1669 gs.insert_edge(c, a, "r", "f2", 1.0, None).await.unwrap();
1670
1671 let edges = gs.edges_for_entity(a).await.unwrap();
1672 assert_eq!(edges.len(), 2);
1673 }
1674
1675 #[tokio::test]
1676 async fn edges_between_both_directions() {
1677 let gs = setup().await;
1678 let a = gs
1679 .upsert_entity("PA", "PA", EntityType::Person, None)
1680 .await
1681 .unwrap();
1682 let b = gs
1683 .upsert_entity("PB", "PB", EntityType::Person, None)
1684 .await
1685 .unwrap();
1686 gs.insert_edge(a, b, "knows", "PA knows PB", 1.0, None)
1687 .await
1688 .unwrap();
1689
1690 let fwd = gs.edges_between(a, b).await.unwrap();
1691 assert_eq!(fwd.len(), 1);
1692 let rev = gs.edges_between(b, a).await.unwrap();
1693 assert_eq!(rev.len(), 1);
1694 }
1695
1696 #[tokio::test]
1697 async fn active_edge_count_excludes_invalidated() {
1698 let gs = setup().await;
1699 let a = gs
1700 .upsert_entity("N1", "N1", EntityType::Concept, None)
1701 .await
1702 .unwrap();
1703 let b = gs
1704 .upsert_entity("N2", "N2", EntityType::Concept, None)
1705 .await
1706 .unwrap();
1707 let e1 = gs.insert_edge(a, b, "r1", "f1", 1.0, None).await.unwrap();
1708 gs.insert_edge(a, b, "r2", "f2", 1.0, None).await.unwrap();
1709 gs.invalidate_edge(e1).await.unwrap();
1710
1711 assert_eq!(gs.active_edge_count().await.unwrap(), 1);
1712 }
1713
1714 #[tokio::test]
1715 async fn upsert_community_insert_and_update() {
1716 let gs = setup().await;
1717 let id1 = gs
1718 .upsert_community("clusterA", "summary A", &[1, 2, 3], None)
1719 .await
1720 .unwrap();
1721 assert!(id1 > 0);
1722 let id2 = gs
1723 .upsert_community("clusterA", "summary A updated", &[1, 2, 3, 4], None)
1724 .await
1725 .unwrap();
1726 assert_eq!(id1, id2);
1727 let communities = gs.all_communities().await.unwrap();
1728 assert_eq!(communities.len(), 1);
1729 assert_eq!(communities[0].summary, "summary A updated");
1730 assert_eq!(communities[0].entity_ids, vec![1, 2, 3, 4]);
1731 }
1732
1733 #[tokio::test]
1734 async fn community_for_entity_found() {
1735 let gs = setup().await;
1736 let a = gs
1737 .upsert_entity("CA", "CA", EntityType::Concept, None)
1738 .await
1739 .unwrap();
1740 let b = gs
1741 .upsert_entity("CB", "CB", EntityType::Concept, None)
1742 .await
1743 .unwrap();
1744 gs.upsert_community("cA", "summary", &[a, b], None)
1745 .await
1746 .unwrap();
1747 let result = gs.community_for_entity(a).await.unwrap();
1748 assert!(result.is_some());
1749 assert_eq!(result.unwrap().name, "cA");
1750 }
1751
1752 #[tokio::test]
1753 async fn community_for_entity_not_found() {
1754 let gs = setup().await;
1755 let result = gs.community_for_entity(999).await.unwrap();
1756 assert!(result.is_none());
1757 }
1758
1759 #[tokio::test]
1760 async fn community_count() {
1761 let gs = setup().await;
1762 assert_eq!(gs.community_count().await.unwrap(), 0);
1763 gs.upsert_community("c1", "s1", &[], None).await.unwrap();
1764 gs.upsert_community("c2", "s2", &[], None).await.unwrap();
1765 assert_eq!(gs.community_count().await.unwrap(), 2);
1766 }
1767
1768 #[tokio::test]
1769 async fn metadata_get_set_round_trip() {
1770 let gs = setup().await;
1771 assert_eq!(gs.get_metadata("counter").await.unwrap(), None);
1772 gs.set_metadata("counter", "42").await.unwrap();
1773 assert_eq!(gs.get_metadata("counter").await.unwrap(), Some("42".into()));
1774 gs.set_metadata("counter", "43").await.unwrap();
1775 assert_eq!(gs.get_metadata("counter").await.unwrap(), Some("43".into()));
1776 }
1777
1778 #[tokio::test]
1779 async fn bfs_max_hops_0_returns_only_start() {
1780 let gs = setup().await;
1781 let a = gs
1782 .upsert_entity("BfsA", "BfsA", EntityType::Concept, None)
1783 .await
1784 .unwrap();
1785 let b = gs
1786 .upsert_entity("BfsB", "BfsB", EntityType::Concept, None)
1787 .await
1788 .unwrap();
1789 gs.insert_edge(a, b, "r", "f", 1.0, None).await.unwrap();
1790
1791 let (entities, edges) = gs.bfs(a, 0).await.unwrap();
1792 assert_eq!(entities.len(), 1);
1793 assert_eq!(entities[0].id, a);
1794 assert!(edges.is_empty());
1795 }
1796
1797 #[tokio::test]
1798 async fn bfs_max_hops_2_chain() {
1799 let gs = setup().await;
1800 let a = gs
1801 .upsert_entity("ChainA", "ChainA", EntityType::Concept, None)
1802 .await
1803 .unwrap();
1804 let b = gs
1805 .upsert_entity("ChainB", "ChainB", EntityType::Concept, None)
1806 .await
1807 .unwrap();
1808 let c = gs
1809 .upsert_entity("ChainC", "ChainC", EntityType::Concept, None)
1810 .await
1811 .unwrap();
1812 gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1813 gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1814
1815 let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1816 let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1817 assert!(ids.contains(&a));
1818 assert!(ids.contains(&b));
1819 assert!(ids.contains(&c));
1820 assert_eq!(edges.len(), 2);
1821 }
1822
1823 #[tokio::test]
1824 async fn bfs_cycle_no_infinite_loop() {
1825 let gs = setup().await;
1826 let a = gs
1827 .upsert_entity("CycA", "CycA", EntityType::Concept, None)
1828 .await
1829 .unwrap();
1830 let b = gs
1831 .upsert_entity("CycB", "CycB", EntityType::Concept, None)
1832 .await
1833 .unwrap();
1834 gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1835 gs.insert_edge(b, a, "r", "f2", 1.0, None).await.unwrap();
1836
1837 let (entities, _edges) = gs.bfs(a, 3).await.unwrap();
1838 let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1839 assert!(ids.contains(&a));
1841 assert!(ids.contains(&b));
1842 assert_eq!(ids.len(), 2);
1843 }
1844
1845 #[tokio::test]
1846 async fn test_invalidated_edges_excluded_from_bfs() {
1847 let gs = setup().await;
1848 let a = gs
1849 .upsert_entity("InvA", "InvA", EntityType::Concept, None)
1850 .await
1851 .unwrap();
1852 let b = gs
1853 .upsert_entity("InvB", "InvB", EntityType::Concept, None)
1854 .await
1855 .unwrap();
1856 let c = gs
1857 .upsert_entity("InvC", "InvC", EntityType::Concept, None)
1858 .await
1859 .unwrap();
1860 let ab = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1861 gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1862 gs.invalidate_edge(ab).await.unwrap();
1864
1865 let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1866 let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1867 assert_eq!(ids, vec![a], "only start entity should be reachable");
1868 assert!(edges.is_empty(), "no active edges should be returned");
1869 }
1870
1871 #[tokio::test]
1872 async fn test_bfs_empty_graph() {
1873 let gs = setup().await;
1874 let a = gs
1875 .upsert_entity("IsoA", "IsoA", EntityType::Concept, None)
1876 .await
1877 .unwrap();
1878
1879 let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1880 let ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1881 assert_eq!(ids, vec![a], "isolated node: only start entity returned");
1882 assert!(edges.is_empty(), "no edges for isolated node");
1883 }
1884
1885 #[tokio::test]
1886 async fn test_bfs_diamond() {
1887 let gs = setup().await;
1888 let a = gs
1889 .upsert_entity("DiamA", "DiamA", EntityType::Concept, None)
1890 .await
1891 .unwrap();
1892 let b = gs
1893 .upsert_entity("DiamB", "DiamB", EntityType::Concept, None)
1894 .await
1895 .unwrap();
1896 let c = gs
1897 .upsert_entity("DiamC", "DiamC", EntityType::Concept, None)
1898 .await
1899 .unwrap();
1900 let d = gs
1901 .upsert_entity("DiamD", "DiamD", EntityType::Concept, None)
1902 .await
1903 .unwrap();
1904 gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1905 gs.insert_edge(a, c, "r", "f2", 1.0, None).await.unwrap();
1906 gs.insert_edge(b, d, "r", "f3", 1.0, None).await.unwrap();
1907 gs.insert_edge(c, d, "r", "f4", 1.0, None).await.unwrap();
1908
1909 let (entities, edges) = gs.bfs(a, 2).await.unwrap();
1910 let mut ids: Vec<_> = entities.iter().map(|e| e.id).collect();
1911 ids.sort_unstable();
1912 let mut expected = vec![a, b, c, d];
1913 expected.sort_unstable();
1914 assert_eq!(ids, expected, "all 4 nodes reachable, no duplicates");
1915 assert_eq!(edges.len(), 4, "all 4 edges returned");
1916 }
1917
1918 #[tokio::test]
1919 async fn extraction_count_default_zero() {
1920 let gs = setup().await;
1921 assert_eq!(gs.extraction_count().await.unwrap(), 0);
1922 }
1923
1924 #[tokio::test]
1925 async fn extraction_count_after_set() {
1926 let gs = setup().await;
1927 gs.set_metadata("extraction_count", "7").await.unwrap();
1928 assert_eq!(gs.extraction_count().await.unwrap(), 7);
1929 }
1930
1931 #[tokio::test]
1932 async fn all_active_edges_stream_excludes_invalidated() {
1933 use futures::TryStreamExt as _;
1934 let gs = setup().await;
1935 let a = gs
1936 .upsert_entity("SA", "SA", EntityType::Concept, None)
1937 .await
1938 .unwrap();
1939 let b = gs
1940 .upsert_entity("SB", "SB", EntityType::Concept, None)
1941 .await
1942 .unwrap();
1943 let c = gs
1944 .upsert_entity("SC", "SC", EntityType::Concept, None)
1945 .await
1946 .unwrap();
1947 let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
1948 gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
1949 gs.invalidate_edge(e1).await.unwrap();
1950
1951 let edges: Vec<_> = gs.all_active_edges_stream().try_collect().await.unwrap();
1952 assert_eq!(edges.len(), 1, "only the active edge should be returned");
1953 assert_eq!(edges[0].source_entity_id, b);
1954 assert_eq!(edges[0].target_entity_id, c);
1955 }
1956
1957 #[tokio::test]
1958 async fn find_community_by_id_found_and_not_found() {
1959 let gs = setup().await;
1960 let cid = gs
1961 .upsert_community("grp", "summary", &[1, 2], None)
1962 .await
1963 .unwrap();
1964 let found = gs.find_community_by_id(cid).await.unwrap();
1965 assert!(found.is_some());
1966 assert_eq!(found.unwrap().name, "grp");
1967
1968 let missing = gs.find_community_by_id(9999).await.unwrap();
1969 assert!(missing.is_none());
1970 }
1971
1972 #[tokio::test]
1973 async fn delete_all_communities_clears_table() {
1974 let gs = setup().await;
1975 gs.upsert_community("c1", "s1", &[1], None).await.unwrap();
1976 gs.upsert_community("c2", "s2", &[2], None).await.unwrap();
1977 assert_eq!(gs.community_count().await.unwrap(), 2);
1978 gs.delete_all_communities().await.unwrap();
1979 assert_eq!(gs.community_count().await.unwrap(), 0);
1980 }
1981
1982 #[tokio::test]
1983 async fn test_find_entities_fuzzy_no_results() {
1984 let gs = setup().await;
1985 gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
1986 .await
1987 .unwrap();
1988 let results = gs.find_entities_fuzzy("zzzznonexistent", 10).await.unwrap();
1989 assert!(
1990 results.is_empty(),
1991 "no entities should match an unknown term"
1992 );
1993 }
1994
1995 #[tokio::test]
1998 async fn upsert_entity_stores_canonical_name() {
1999 let gs = setup().await;
2000 gs.upsert_entity("rust", "rust", EntityType::Language, None)
2001 .await
2002 .unwrap();
2003 let entity = gs
2004 .find_entity("rust", EntityType::Language)
2005 .await
2006 .unwrap()
2007 .unwrap();
2008 assert_eq!(entity.canonical_name, "rust");
2009 assert_eq!(entity.name, "rust");
2010 }
2011
2012 #[tokio::test]
2013 async fn add_alias_idempotent() {
2014 let gs = setup().await;
2015 let id = gs
2016 .upsert_entity("rust", "rust", EntityType::Language, None)
2017 .await
2018 .unwrap();
2019 gs.add_alias(id, "rust-lang").await.unwrap();
2020 gs.add_alias(id, "rust-lang").await.unwrap();
2022 let aliases = gs.aliases_for_entity(id).await.unwrap();
2023 assert_eq!(
2024 aliases
2025 .iter()
2026 .filter(|a| a.alias_name == "rust-lang")
2027 .count(),
2028 1
2029 );
2030 }
2031
2032 #[tokio::test]
2035 async fn find_entity_by_id_found() {
2036 let gs = setup().await;
2037 let id = gs
2038 .upsert_entity("FindById", "finbyid", EntityType::Concept, Some("summary"))
2039 .await
2040 .unwrap();
2041 let entity = gs.find_entity_by_id(id).await.unwrap();
2042 assert!(entity.is_some());
2043 let entity = entity.unwrap();
2044 assert_eq!(entity.id, id);
2045 assert_eq!(entity.name, "FindById");
2046 }
2047
2048 #[tokio::test]
2049 async fn find_entity_by_id_not_found() {
2050 let gs = setup().await;
2051 let result = gs.find_entity_by_id(99999).await.unwrap();
2052 assert!(result.is_none());
2053 }
2054
2055 #[tokio::test]
2056 async fn set_entity_qdrant_point_id_updates() {
2057 let gs = setup().await;
2058 let id = gs
2059 .upsert_entity("QdrantPoint", "qdrantpoint", EntityType::Concept, None)
2060 .await
2061 .unwrap();
2062 let point_id = "550e8400-e29b-41d4-a716-446655440000";
2063 gs.set_entity_qdrant_point_id(id, point_id).await.unwrap();
2064
2065 let entity = gs.find_entity_by_id(id).await.unwrap().unwrap();
2066 assert_eq!(entity.qdrant_point_id.as_deref(), Some(point_id));
2067 }
2068
2069 #[tokio::test]
2070 async fn find_entities_fuzzy_matches_summary() {
2071 let gs = setup().await;
2072 gs.upsert_entity(
2073 "Rust",
2074 "Rust",
2075 EntityType::Language,
2076 Some("a systems programming language"),
2077 )
2078 .await
2079 .unwrap();
2080 gs.upsert_entity(
2081 "Go",
2082 "Go",
2083 EntityType::Language,
2084 Some("a compiled language by Google"),
2085 )
2086 .await
2087 .unwrap();
2088 let results = gs.find_entities_fuzzy("systems", 10).await.unwrap();
2090 assert_eq!(results.len(), 1);
2091 assert_eq!(results[0].name, "Rust");
2092 }
2093
2094 #[tokio::test]
2095 async fn find_entities_fuzzy_empty_query() {
2096 let gs = setup().await;
2097 gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
2098 .await
2099 .unwrap();
2100 let results = gs.find_entities_fuzzy("", 10).await.unwrap();
2102 assert!(results.is_empty(), "empty query should return no results");
2103 let results = gs.find_entities_fuzzy(" ", 10).await.unwrap();
2105 assert!(
2106 results.is_empty(),
2107 "whitespace query should return no results"
2108 );
2109 }
2110
2111 #[tokio::test]
2112 async fn find_entity_by_alias_case_insensitive() {
2113 let gs = setup().await;
2114 let id = gs
2115 .upsert_entity("rust", "rust", EntityType::Language, None)
2116 .await
2117 .unwrap();
2118 gs.add_alias(id, "rust").await.unwrap();
2119 gs.add_alias(id, "rust-lang").await.unwrap();
2120
2121 let found = gs
2122 .find_entity_by_alias("RUST-LANG", EntityType::Language)
2123 .await
2124 .unwrap();
2125 assert!(found.is_some());
2126 assert_eq!(found.unwrap().id, id);
2127 }
2128
2129 #[tokio::test]
2130 async fn find_entity_by_alias_returns_none_for_unknown() {
2131 let gs = setup().await;
2132 let id = gs
2133 .upsert_entity("rust", "rust", EntityType::Language, None)
2134 .await
2135 .unwrap();
2136 gs.add_alias(id, "rust").await.unwrap();
2137
2138 let found = gs
2139 .find_entity_by_alias("python", EntityType::Language)
2140 .await
2141 .unwrap();
2142 assert!(found.is_none());
2143 }
2144
2145 #[tokio::test]
2146 async fn find_entity_by_alias_filters_by_entity_type() {
2147 let gs = setup().await;
2149 let lang_id = gs
2150 .upsert_entity("python", "python", EntityType::Language, None)
2151 .await
2152 .unwrap();
2153 gs.add_alias(lang_id, "python").await.unwrap();
2154
2155 let found_tool = gs
2156 .find_entity_by_alias("python", EntityType::Tool)
2157 .await
2158 .unwrap();
2159 assert!(
2160 found_tool.is_none(),
2161 "cross-type alias collision must not occur"
2162 );
2163
2164 let found_lang = gs
2165 .find_entity_by_alias("python", EntityType::Language)
2166 .await
2167 .unwrap();
2168 assert!(found_lang.is_some());
2169 assert_eq!(found_lang.unwrap().id, lang_id);
2170 }
2171
2172 #[tokio::test]
2173 async fn aliases_for_entity_returns_all() {
2174 let gs = setup().await;
2175 let id = gs
2176 .upsert_entity("rust", "rust", EntityType::Language, None)
2177 .await
2178 .unwrap();
2179 gs.add_alias(id, "rust").await.unwrap();
2180 gs.add_alias(id, "rust-lang").await.unwrap();
2181 gs.add_alias(id, "rustlang").await.unwrap();
2182
2183 let aliases = gs.aliases_for_entity(id).await.unwrap();
2184 assert_eq!(aliases.len(), 3);
2185 let names: Vec<&str> = aliases.iter().map(|a| a.alias_name.as_str()).collect();
2186 assert!(names.contains(&"rust"));
2187 assert!(names.contains(&"rust-lang"));
2188 assert!(names.contains(&"rustlang"));
2189 }
2190
2191 #[tokio::test]
2192 async fn find_entities_fuzzy_includes_aliases() {
2193 let gs = setup().await;
2194 let id = gs
2195 .upsert_entity("rust", "rust", EntityType::Language, None)
2196 .await
2197 .unwrap();
2198 gs.add_alias(id, "rust-lang").await.unwrap();
2199 gs.upsert_entity("python", "python", EntityType::Language, None)
2200 .await
2201 .unwrap();
2202
2203 let results = gs.find_entities_fuzzy("rust-lang", 10).await.unwrap();
2205 assert!(!results.is_empty());
2206 assert!(results.iter().any(|e| e.id == id));
2207 }
2208
2209 #[tokio::test]
2210 async fn orphan_alias_cleanup_on_entity_delete() {
2211 let gs = setup().await;
2212 let id = gs
2213 .upsert_entity("rust", "rust", EntityType::Language, None)
2214 .await
2215 .unwrap();
2216 gs.add_alias(id, "rust").await.unwrap();
2217 gs.add_alias(id, "rust-lang").await.unwrap();
2218
2219 sqlx::query("DELETE FROM graph_entities WHERE id = ?1")
2221 .bind(id)
2222 .execute(&gs.pool)
2223 .await
2224 .unwrap();
2225
2226 let aliases = gs.aliases_for_entity(id).await.unwrap();
2228 assert!(
2229 aliases.is_empty(),
2230 "aliases should cascade-delete with entity"
2231 );
2232 }
2233
    /// Replays migration 024 (the `canonical_name` backfill) by hand against a
    /// freshly built pre-migration schema, then verifies that entities, edges
    /// and seeded aliases all survive the table recreation. The statement
    /// sequence below mirrors the real migration and is order-sensitive —
    /// do not reorder.
    #[tokio::test]
    async fn migration_024_backfill_preserves_entities_and_edges() {
        use sqlx::Acquire as _;
        use sqlx::ConnectOptions as _;
        use sqlx::sqlite::SqliteConnectOptions;

        // Single-connection pool: PRAGMAs are per-connection, and an in-memory
        // database only exists on the connection that opened it.
        let opts = SqliteConnectOptions::from_url(&"sqlite::memory:".parse().unwrap())
            .unwrap()
            .foreign_keys(true);
        let pool = sqlx::pool::PoolOptions::<sqlx::Sqlite>::new()
            .max_connections(1)
            .connect_with(opts)
            .await
            .unwrap();

        // --- Pre-migration schema: no canonical_name, UNIQUE(name, entity_type).
        sqlx::query(
            "CREATE TABLE graph_entities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                summary TEXT,
                first_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
                last_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
                qdrant_point_id TEXT,
                UNIQUE(name, entity_type)
            )",
        )
        .execute(&pool)
        .await
        .unwrap();

        sqlx::query(
            "CREATE TABLE graph_edges (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                source_entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
                target_entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
                relation TEXT NOT NULL,
                fact TEXT NOT NULL,
                confidence REAL NOT NULL DEFAULT 1.0,
                valid_from TEXT NOT NULL DEFAULT (datetime('now')),
                valid_to TEXT,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                expired_at TEXT,
                episode_id INTEGER,
                qdrant_point_id TEXT
            )",
        )
        .execute(&pool)
        .await
        .unwrap();

        // Contentless-delta FTS index plus the three sync triggers, matching
        // the pre-migration database.
        sqlx::query(
            "CREATE VIRTUAL TABLE IF NOT EXISTS graph_entities_fts USING fts5(
                name, summary, content='graph_entities', content_rowid='id',
                tokenize='unicode61 remove_diacritics 2'
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_insert AFTER INSERT ON graph_entities
             BEGIN INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, '')); END",
        )
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_delete AFTER DELETE ON graph_entities
             BEGIN INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, '')); END",
        )
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_update AFTER UPDATE ON graph_entities
             BEGIN
                 INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, ''));
                 INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, ''));
             END",
        )
        .execute(&pool)
        .await
        .unwrap();

        // --- Seed pre-migration data: two entities and one edge between them.
        let alice_id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_entities (name, entity_type) VALUES ('Alice', 'person') RETURNING id",
        )
        .fetch_one(&pool)
        .await
        .unwrap();

        let rust_id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_entities (name, entity_type) VALUES ('Rust', 'language') RETURNING id",
        )
        .fetch_one(&pool)
        .await
        .unwrap();

        sqlx::query(
            "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact)
             VALUES (?1, ?2, 'uses', 'Alice uses Rust')",
        )
        .bind(alice_id)
        .bind(rust_id)
        .execute(&pool)
        .await
        .unwrap();

        // Pin a single connection for the migration steps so the PRAGMA
        // toggles below apply to every statement that follows.
        let mut conn = pool.acquire().await.unwrap();
        let conn = conn.acquire().await.unwrap();

        // --- Migration 024 replay: disable FK checks, add + backfill
        // canonical_name, rebuild the table with the new UNIQUE constraint.
        sqlx::query("PRAGMA foreign_keys = OFF")
            .execute(&mut *conn)
            .await
            .unwrap();
        sqlx::query("ALTER TABLE graph_entities ADD COLUMN canonical_name TEXT")
            .execute(&mut *conn)
            .await
            .unwrap();
        sqlx::query("UPDATE graph_entities SET canonical_name = name WHERE canonical_name IS NULL")
            .execute(&mut *conn)
            .await
            .unwrap();
        sqlx::query(
            "CREATE TABLE graph_entities_new (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                canonical_name TEXT NOT NULL,
                entity_type TEXT NOT NULL,
                summary TEXT,
                first_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
                last_seen_at TEXT NOT NULL DEFAULT (datetime('now')),
                qdrant_point_id TEXT,
                UNIQUE(canonical_name, entity_type)
            )",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query(
            "INSERT INTO graph_entities_new
                 (id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id)
             SELECT id, name, COALESCE(canonical_name, name), entity_type, summary,
                    first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query("DROP TABLE graph_entities")
            .execute(&mut *conn)
            .await
            .unwrap();
        sqlx::query("ALTER TABLE graph_entities_new RENAME TO graph_entities")
            .execute(&mut *conn)
            .await
            .unwrap();
        // Dropping the old table removed its triggers; recreate them against
        // the renamed table, then rebuild the FTS index from scratch.
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_insert AFTER INSERT ON graph_entities
             BEGIN INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, '')); END",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_delete AFTER DELETE ON graph_entities
             BEGIN INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, '')); END",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TRIGGER IF NOT EXISTS graph_entities_fts_update AFTER UPDATE ON graph_entities
             BEGIN
                 INSERT INTO graph_entities_fts(graph_entities_fts, rowid, name, summary) VALUES ('delete', old.id, old.name, COALESCE(old.summary, ''));
                 INSERT INTO graph_entities_fts(rowid, name, summary) VALUES (new.id, new.name, COALESCE(new.summary, ''));
             END",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query("INSERT INTO graph_entities_fts(graph_entities_fts) VALUES('rebuild')")
            .execute(&mut *conn)
            .await
            .unwrap();
        // New aliases table, seeded with each entity's current name.
        sqlx::query(
            "CREATE TABLE graph_entity_aliases (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                entity_id INTEGER NOT NULL REFERENCES graph_entities(id) ON DELETE CASCADE,
                alias_name TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now')),
                UNIQUE(alias_name, entity_id)
            )",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query(
            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name)
             SELECT id, name FROM graph_entities",
        )
        .execute(&mut *conn)
        .await
        .unwrap();
        sqlx::query("PRAGMA foreign_keys = ON")
            .execute(&mut *conn)
            .await
            .unwrap();

        // --- Post-migration assertions.
        let alice_canon: String =
            sqlx::query_scalar("SELECT canonical_name FROM graph_entities WHERE id = ?1")
                .bind(alice_id)
                .fetch_one(&mut *conn)
                .await
                .unwrap();
        assert_eq!(
            alice_canon, "Alice",
            "canonical_name should equal pre-migration name"
        );

        let rust_canon: String =
            sqlx::query_scalar("SELECT canonical_name FROM graph_entities WHERE id = ?1")
                .bind(rust_id)
                .fetch_one(&mut *conn)
                .await
                .unwrap();
        assert_eq!(
            rust_canon, "Rust",
            "canonical_name should equal pre-migration name"
        );

        let alice_aliases: Vec<String> =
            sqlx::query_scalar("SELECT alias_name FROM graph_entity_aliases WHERE entity_id = ?1")
                .bind(alice_id)
                .fetch_all(&mut *conn)
                .await
                .unwrap();
        assert!(
            alice_aliases.contains(&"Alice".to_owned()),
            "initial alias should be seeded from entity name"
        );

        let edge_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges")
            .fetch_one(&mut *conn)
            .await
            .unwrap();
        assert_eq!(
            edge_count, 1,
            "graph_edges must survive migration 024 table recreation"
        );
    }
2507
2508 #[tokio::test]
2509 async fn find_entity_by_alias_same_alias_two_entities_deterministic() {
2510 let gs = setup().await;
2512 let id1 = gs
2513 .upsert_entity("python-v2", "python-v2", EntityType::Language, None)
2514 .await
2515 .unwrap();
2516 let id2 = gs
2517 .upsert_entity("python-v3", "python-v3", EntityType::Language, None)
2518 .await
2519 .unwrap();
2520 gs.add_alias(id1, "python").await.unwrap();
2521 gs.add_alias(id2, "python").await.unwrap();
2522
2523 let found = gs
2525 .find_entity_by_alias("python", EntityType::Language)
2526 .await
2527 .unwrap();
2528 assert!(found.is_some(), "should find an entity by shared alias");
2529 assert_eq!(
2531 found.unwrap().id,
2532 id1,
2533 "first-registered entity should win on shared alias"
2534 );
2535 }
2536
2537 #[tokio::test]
2540 async fn find_entities_fuzzy_special_chars() {
2541 let gs = setup().await;
2542 gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2543 .await
2544 .unwrap();
2545 let results = gs.find_entities_fuzzy("graph\"()*:^", 10).await.unwrap();
2547 assert!(results.iter().any(|e| e.name == "Graph"));
2549 }
2550
2551 #[tokio::test]
2552 async fn find_entities_fuzzy_prefix_match() {
2553 let gs = setup().await;
2554 gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2555 .await
2556 .unwrap();
2557 gs.upsert_entity("GraphQL", "GraphQL", EntityType::Concept, None)
2558 .await
2559 .unwrap();
2560 gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
2561 .await
2562 .unwrap();
2563 let results = gs.find_entities_fuzzy("Gra", 10).await.unwrap();
2565 assert_eq!(results.len(), 2);
2566 assert!(results.iter().any(|e| e.name == "Graph"));
2567 assert!(results.iter().any(|e| e.name == "GraphQL"));
2568 }
2569
2570 #[tokio::test]
2571 async fn find_entities_fuzzy_fts5_operator_injection() {
2572 let gs = setup().await;
2573 gs.upsert_entity("Graph", "Graph", EntityType::Concept, None)
2574 .await
2575 .unwrap();
2576 gs.upsert_entity("Unrelated", "Unrelated", EntityType::Concept, None)
2577 .await
2578 .unwrap();
2579 let results = gs
2584 .find_entities_fuzzy("graph OR unrelated", 10)
2585 .await
2586 .unwrap();
2587 assert!(
2588 results.is_empty(),
2589 "implicit AND of 'graph*' and 'unrelated*' should match no entity"
2590 );
2591 }
2592
2593 #[tokio::test]
2594 async fn find_entities_fuzzy_after_entity_update() {
2595 let gs = setup().await;
2596 gs.upsert_entity(
2598 "Foo",
2599 "Foo",
2600 EntityType::Concept,
2601 Some("initial summary bar"),
2602 )
2603 .await
2604 .unwrap();
2605 gs.upsert_entity(
2607 "Foo",
2608 "Foo",
2609 EntityType::Concept,
2610 Some("updated summary baz"),
2611 )
2612 .await
2613 .unwrap();
2614 let old_results = gs.find_entities_fuzzy("bar", 10).await.unwrap();
2616 assert!(
2617 old_results.is_empty(),
2618 "old summary content should not match after update"
2619 );
2620 let new_results = gs.find_entities_fuzzy("baz", 10).await.unwrap();
2622 assert_eq!(new_results.len(), 1);
2623 assert_eq!(new_results[0].name, "Foo");
2624 }
2625
2626 #[tokio::test]
2627 async fn find_entities_fuzzy_only_special_chars() {
2628 let gs = setup().await;
2629 gs.upsert_entity("Alpha", "Alpha", EntityType::Concept, None)
2630 .await
2631 .unwrap();
2632 let results = gs.find_entities_fuzzy("***", 10).await.unwrap();
2636 assert!(
2637 results.is_empty(),
2638 "only special chars should return no results"
2639 );
2640 let results = gs.find_entities_fuzzy("(((", 10).await.unwrap();
2641 assert!(results.is_empty(), "only parens should return no results");
2642 let results = gs.find_entities_fuzzy("\"\"\"", 10).await.unwrap();
2643 assert!(results.is_empty(), "only quotes should return no results");
2644 }
2645
2646 #[tokio::test]
2649 async fn find_entity_by_name_exact_wins_over_summary_mention() {
2650 let gs = setup().await;
2653 gs.upsert_entity(
2654 "Alice",
2655 "Alice",
2656 EntityType::Person,
2657 Some("A person named Alice"),
2658 )
2659 .await
2660 .unwrap();
2661 gs.upsert_entity(
2663 "Google",
2664 "Google",
2665 EntityType::Organization,
2666 Some("Company where Charlie, Alice, and Bob have worked"),
2667 )
2668 .await
2669 .unwrap();
2670
2671 let results = gs.find_entity_by_name("Alice").await.unwrap();
2672 assert!(!results.is_empty(), "must find at least one entity");
2673 assert_eq!(
2674 results[0].name, "Alice",
2675 "exact name match must come first, not entity with 'Alice' in summary"
2676 );
2677 }
2678
2679 #[tokio::test]
2680 async fn find_entity_by_name_case_insensitive_exact() {
2681 let gs = setup().await;
2682 gs.upsert_entity("Bob", "Bob", EntityType::Person, None)
2683 .await
2684 .unwrap();
2685
2686 let results = gs.find_entity_by_name("bob").await.unwrap();
2687 assert!(!results.is_empty());
2688 assert_eq!(results[0].name, "Bob");
2689 }
2690
2691 #[tokio::test]
2692 async fn find_entity_by_name_falls_back_to_fuzzy_when_no_exact_match() {
2693 let gs = setup().await;
2694 gs.upsert_entity("Charlie", "Charlie", EntityType::Person, None)
2695 .await
2696 .unwrap();
2697
2698 let results = gs.find_entity_by_name("Char").await.unwrap();
2700 assert!(!results.is_empty(), "prefix search must find Charlie");
2701 }
2702
2703 #[tokio::test]
2704 async fn find_entity_by_name_returns_empty_for_unknown() {
2705 let gs = setup().await;
2706 let results = gs.find_entity_by_name("NonExistent").await.unwrap();
2707 assert!(results.is_empty());
2708 }
2709
2710 #[tokio::test]
2711 async fn find_entity_by_name_matches_canonical_name() {
2712 let gs = setup().await;
2714 gs.upsert_entity("Dave (Engineer)", "Dave", EntityType::Person, None)
2716 .await
2717 .unwrap();
2718
2719 let results = gs.find_entity_by_name("Dave").await.unwrap();
2722 assert!(
2723 !results.is_empty(),
2724 "canonical_name match must return entity"
2725 );
2726 assert_eq!(results[0].canonical_name, "Dave");
2727 }
2728
2729 async fn insert_test_message(gs: &GraphStore, content: &str) -> crate::types::MessageId {
2730 let conv_id: i64 =
2732 sqlx::query_scalar("INSERT INTO conversations DEFAULT VALUES RETURNING id")
2733 .fetch_one(&gs.pool)
2734 .await
2735 .unwrap();
2736 let id: i64 = sqlx::query_scalar(
2737 "INSERT INTO messages (conversation_id, role, content) VALUES (?1, 'user', ?2) RETURNING id",
2738 )
2739 .bind(conv_id)
2740 .bind(content)
2741 .fetch_one(&gs.pool)
2742 .await
2743 .unwrap();
2744 crate::types::MessageId(id)
2745 }
2746
2747 #[tokio::test]
2748 async fn unprocessed_messages_for_backfill_returns_unprocessed() {
2749 let gs = setup().await;
2750 let id1 = insert_test_message(&gs, "hello world").await;
2751 let id2 = insert_test_message(&gs, "second message").await;
2752
2753 let rows = gs.unprocessed_messages_for_backfill(10).await.unwrap();
2754 assert_eq!(rows.len(), 2);
2755 assert!(rows.iter().any(|(id, _)| *id == id1));
2756 assert!(rows.iter().any(|(id, _)| *id == id2));
2757 }
2758
2759 #[tokio::test]
2760 async fn unprocessed_messages_for_backfill_respects_limit() {
2761 let gs = setup().await;
2762 insert_test_message(&gs, "msg1").await;
2763 insert_test_message(&gs, "msg2").await;
2764 insert_test_message(&gs, "msg3").await;
2765
2766 let rows = gs.unprocessed_messages_for_backfill(2).await.unwrap();
2767 assert_eq!(rows.len(), 2);
2768 }
2769
2770 #[tokio::test]
2771 async fn mark_messages_graph_processed_updates_flag() {
2772 let gs = setup().await;
2773 let id1 = insert_test_message(&gs, "to process").await;
2774 let _id2 = insert_test_message(&gs, "also to process").await;
2775
2776 let count_before = gs.unprocessed_message_count().await.unwrap();
2778 assert_eq!(count_before, 2);
2779
2780 gs.mark_messages_graph_processed(&[id1]).await.unwrap();
2781
2782 let count_after = gs.unprocessed_message_count().await.unwrap();
2783 assert_eq!(count_after, 1);
2784
2785 let rows = gs.unprocessed_messages_for_backfill(10).await.unwrap();
2787 assert!(!rows.iter().any(|(id, _)| *id == id1));
2788 }
2789
2790 #[tokio::test]
2791 async fn mark_messages_graph_processed_empty_ids_is_noop() {
2792 let gs = setup().await;
2793 insert_test_message(&gs, "message").await;
2794
2795 gs.mark_messages_graph_processed(&[]).await.unwrap();
2796
2797 let count = gs.unprocessed_message_count().await.unwrap();
2798 assert_eq!(count, 1);
2799 }
2800
2801 #[tokio::test]
2802 async fn edges_after_id_first_page_returns_all_within_limit() {
2803 let gs = setup().await;
2804 let a = gs
2805 .upsert_entity("PA", "PA", EntityType::Concept, None)
2806 .await
2807 .unwrap();
2808 let b = gs
2809 .upsert_entity("PB", "PB", EntityType::Concept, None)
2810 .await
2811 .unwrap();
2812 let c = gs
2813 .upsert_entity("PC", "PC", EntityType::Concept, None)
2814 .await
2815 .unwrap();
2816 let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
2817 let e2 = gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
2818 let e3 = gs.insert_edge(a, c, "r", "f3", 1.0, None).await.unwrap();
2819
2820 let page1 = gs.edges_after_id(0, 2).await.unwrap();
2822 assert_eq!(page1.len(), 2);
2823 assert_eq!(page1[0].id, e1);
2824 assert_eq!(page1[1].id, e2);
2825
2826 let page2 = gs
2828 .edges_after_id(page1.last().unwrap().id, 2)
2829 .await
2830 .unwrap();
2831 assert_eq!(page2.len(), 1);
2832 assert_eq!(page2[0].id, e3);
2833
2834 let page3 = gs
2836 .edges_after_id(page2.last().unwrap().id, 2)
2837 .await
2838 .unwrap();
2839 assert!(page3.is_empty(), "no more edges after last id");
2840 }
2841
2842 #[tokio::test]
2843 async fn edges_after_id_skips_invalidated_edges() {
2844 let gs = setup().await;
2845 let a = gs
2846 .upsert_entity("IA", "IA", EntityType::Concept, None)
2847 .await
2848 .unwrap();
2849 let b = gs
2850 .upsert_entity("IB", "IB", EntityType::Concept, None)
2851 .await
2852 .unwrap();
2853 let c = gs
2854 .upsert_entity("IC", "IC", EntityType::Concept, None)
2855 .await
2856 .unwrap();
2857 let e1 = gs.insert_edge(a, b, "r", "f1", 1.0, None).await.unwrap();
2858 let e2 = gs.insert_edge(b, c, "r", "f2", 1.0, None).await.unwrap();
2859
2860 gs.invalidate_edge(e1).await.unwrap();
2862
2863 let page = gs.edges_after_id(0, 10).await.unwrap();
2864 assert_eq!(page.len(), 1, "invalidated edge must be excluded");
2865 assert_eq!(page[0].id, e2);
2866 }
2867
2868 #[tokio::test]
2871 async fn edges_at_timestamp_returns_active_edge() {
2872 let gs = setup().await;
2873 let a = gs
2874 .upsert_entity("TA", "TA", EntityType::Person, None)
2875 .await
2876 .unwrap();
2877 let b = gs
2878 .upsert_entity("TB", "TB", EntityType::Person, None)
2879 .await
2880 .unwrap();
2881 gs.insert_edge(a, b, "knows", "TA knows TB", 1.0, None)
2882 .await
2883 .unwrap();
2884
2885 let edges = gs
2887 .edges_at_timestamp(a, "2099-01-01 00:00:00")
2888 .await
2889 .unwrap();
2890 assert_eq!(edges.len(), 1, "active edge must be visible at future ts");
2891 assert_eq!(edges[0].relation, "knows");
2892 }
2893
2894 #[tokio::test]
2895 async fn edges_at_timestamp_excludes_future_valid_from() {
2896 let gs = setup().await;
2897 let a = gs
2898 .upsert_entity("FA", "FA", EntityType::Person, None)
2899 .await
2900 .unwrap();
2901 let b = gs
2902 .upsert_entity("FB", "FB", EntityType::Person, None)
2903 .await
2904 .unwrap();
2905 sqlx::query(
2907 "INSERT INTO graph_edges (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
2908 VALUES (?1, ?2, 'rel', 'fact', 1.0, '2100-01-01 00:00:00')",
2909 )
2910 .bind(a)
2911 .bind(b)
2912 .execute(gs.pool())
2913 .await
2914 .unwrap();
2915
2916 let edges = gs
2918 .edges_at_timestamp(a, "2026-01-01 00:00:00")
2919 .await
2920 .unwrap();
2921 assert!(
2922 edges.is_empty(),
2923 "edge with future valid_from must not be visible at earlier timestamp"
2924 );
2925 }
2926
2927 #[tokio::test]
2928 async fn edges_at_timestamp_historical_window_visible() {
2929 let gs = setup().await;
2930 let a = gs
2931 .upsert_entity("HA", "HA", EntityType::Person, None)
2932 .await
2933 .unwrap();
2934 let b = gs
2935 .upsert_entity("HB", "HB", EntityType::Person, None)
2936 .await
2937 .unwrap();
2938 sqlx::query(
2940 "INSERT INTO graph_edges
2941 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
2942 VALUES (?1, ?2, 'managed', 'HA managed HB', 0.8,
2943 '2020-01-01 00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00')",
2944 )
2945 .bind(a)
2946 .bind(b)
2947 .execute(gs.pool())
2948 .await
2949 .unwrap();
2950
2951 let during = gs
2953 .edges_at_timestamp(a, "2020-06-01 00:00:00")
2954 .await
2955 .unwrap();
2956 assert_eq!(
2957 during.len(),
2958 1,
2959 "expired edge must be visible during its validity window"
2960 );
2961
2962 let before = gs
2964 .edges_at_timestamp(a, "2019-01-01 00:00:00")
2965 .await
2966 .unwrap();
2967 assert!(
2968 before.is_empty(),
2969 "edge must not be visible before valid_from"
2970 );
2971
2972 let after = gs
2974 .edges_at_timestamp(a, "2026-01-01 00:00:00")
2975 .await
2976 .unwrap();
2977 assert!(
2978 after.is_empty(),
2979 "expired edge must not be visible after valid_to"
2980 );
2981 }
2982
2983 #[tokio::test]
2984 async fn edges_at_timestamp_entity_as_target() {
2985 let gs = setup().await;
2986 let src = gs
2987 .upsert_entity("SRC", "SRC", EntityType::Person, None)
2988 .await
2989 .unwrap();
2990 let tgt = gs
2991 .upsert_entity("TGT", "TGT", EntityType::Person, None)
2992 .await
2993 .unwrap();
2994 gs.insert_edge(src, tgt, "links", "SRC links TGT", 0.9, None)
2995 .await
2996 .unwrap();
2997
2998 let edges = gs
3000 .edges_at_timestamp(tgt, "2099-01-01 00:00:00")
3001 .await
3002 .unwrap();
3003 assert_eq!(
3004 edges.len(),
3005 1,
3006 "edge must be found when querying by target entity_id"
3007 );
3008 }
3009
3010 #[tokio::test]
3011 async fn bfs_at_timestamp_excludes_expired_edges() {
3012 let gs = setup().await;
3013 let a = gs
3014 .upsert_entity("BA", "BA", EntityType::Person, None)
3015 .await
3016 .unwrap();
3017 let b = gs
3018 .upsert_entity("BB", "BB", EntityType::Person, None)
3019 .await
3020 .unwrap();
3021 let c = gs
3022 .upsert_entity("BC", "BC", EntityType::Concept, None)
3023 .await
3024 .unwrap();
3025
3026 sqlx::query(
3028 "INSERT INTO graph_edges
3029 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3030 VALUES (?1, ?2, 'knows', 'BA knows BB', 1.0, '2019-01-01 00:00:00')",
3031 )
3032 .bind(a)
3033 .bind(b)
3034 .execute(gs.pool())
3035 .await
3036 .unwrap();
3037 sqlx::query(
3039 "INSERT INTO graph_edges
3040 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
3041 VALUES (?1, ?2, 'used', 'BB used BC', 0.9,
3042 '2020-01-01 00:00:00', '2021-01-01 00:00:00', '2021-01-01 00:00:00')",
3043 )
3044 .bind(b)
3045 .bind(c)
3046 .execute(gs.pool())
3047 .await
3048 .unwrap();
3049
3050 let (entities, _edges, depth_map) = gs
3052 .bfs_at_timestamp(a, 3, "2026-01-01 00:00:00")
3053 .await
3054 .unwrap();
3055 let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
3056 assert!(
3057 depth_map.contains_key(&a),
3058 "start entity must be in depth_map"
3059 );
3060 assert!(
3061 depth_map.contains_key(&b),
3062 "B should be reachable via active A→B edge"
3063 );
3064 assert!(
3065 !entity_ids.contains(&c),
3066 "C must not be reachable at 2026 because B→C expired in 2021"
3067 );
3068
3069 let (_entities2, _edges2, depth_map2) = gs
3071 .bfs_at_timestamp(a, 3, "2020-06-01 00:00:00")
3072 .await
3073 .unwrap();
3074 assert!(
3075 depth_map2.contains_key(&c),
3076 "C should be reachable at 2020-06-01 when B→C was valid"
3077 );
3078 }
3079
3080 #[tokio::test]
3081 async fn edge_history_returns_all_versions_ordered() {
3082 let gs = setup().await;
3083 let src = gs
3084 .upsert_entity("ESrc", "ESrc", EntityType::Person, None)
3085 .await
3086 .unwrap();
3087 let tgt = gs
3088 .upsert_entity("ETgt", "ETgt", EntityType::Organization, None)
3089 .await
3090 .unwrap();
3091
3092 sqlx::query(
3094 "INSERT INTO graph_edges
3095 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from, valid_to, expired_at)
3096 VALUES (?1, ?2, 'works_at', 'ESrc works at CompanyA', 0.9,
3097 '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
3098 )
3099 .bind(src)
3100 .bind(tgt)
3101 .execute(gs.pool())
3102 .await
3103 .unwrap();
3104 sqlx::query(
3106 "INSERT INTO graph_edges
3107 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3108 VALUES (?1, ?2, 'works_at', 'ESrc works at CompanyB', 0.95, '2022-01-01 00:00:00')",
3109 )
3110 .bind(src)
3111 .bind(tgt)
3112 .execute(gs.pool())
3113 .await
3114 .unwrap();
3115
3116 let history = gs.edge_history(src, "works at", None, 100).await.unwrap();
3118 assert_eq!(history.len(), 2, "both edge versions must be returned");
3119 assert!(
3121 history[0].valid_from >= history[1].valid_from,
3122 "results must be ordered by valid_from DESC"
3123 );
3124
3125 let filtered = gs
3127 .edge_history(src, "works at", Some("works_at"), 100)
3128 .await
3129 .unwrap();
3130 assert_eq!(
3131 filtered.len(),
3132 2,
3133 "relation filter must retain both versions"
3134 );
3135
3136 let empty = gs
3138 .edge_history(src, "nonexistent_predicate_xyz", None, 100)
3139 .await
3140 .unwrap();
3141 assert!(empty.is_empty(), "non-matching predicate must return empty");
3142 }
3143
3144 #[tokio::test]
3145 async fn edge_history_like_escaping() {
3146 let gs = setup().await;
3147 let src = gs
3148 .upsert_entity("EscSrc", "EscSrc", EntityType::Person, None)
3149 .await
3150 .unwrap();
3151 let tgt = gs
3152 .upsert_entity("EscTgt", "EscTgt", EntityType::Concept, None)
3153 .await
3154 .unwrap();
3155
3156 gs.insert_edge(src, tgt, "ref", "plain text fact no wildcards", 1.0, None)
3158 .await
3159 .unwrap();
3160
3161 let results = gs.edge_history(src, "%", None, 100).await.unwrap();
3164 assert!(
3165 results.is_empty(),
3166 "LIKE wildcard '%' in predicate must be escaped and not match all edges"
3167 );
3168
3169 let results_underscore = gs.edge_history(src, "_", None, 100).await.unwrap();
3172 assert!(
3173 results_underscore.is_empty(),
3174 "LIKE wildcard '_' in predicate must be escaped and not match single-char substrings"
3175 );
3176 }
3177
3178 #[tokio::test]
3179 async fn invalidate_edge_sets_valid_to_and_expired_at() {
3180 let gs = setup().await;
3181 let a = gs
3182 .upsert_entity("InvA", "InvA", EntityType::Person, None)
3183 .await
3184 .unwrap();
3185 let b = gs
3186 .upsert_entity("InvB", "InvB", EntityType::Person, None)
3187 .await
3188 .unwrap();
3189 let edge_id = gs
3190 .insert_edge(a, b, "rel", "InvA rel InvB", 1.0, None)
3191 .await
3192 .unwrap();
3193
3194 let active_edge: (Option<String>, Option<String>) =
3196 sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
3197 .bind(edge_id)
3198 .fetch_one(gs.pool())
3199 .await
3200 .unwrap();
3201 assert!(
3202 active_edge.0.is_none(),
3203 "valid_to must be NULL before invalidation"
3204 );
3205 assert!(
3206 active_edge.1.is_none(),
3207 "expired_at must be NULL before invalidation"
3208 );
3209
3210 gs.invalidate_edge(edge_id).await.unwrap();
3211
3212 let dead_edge: (Option<String>, Option<String>) =
3214 sqlx::query_as("SELECT valid_to, expired_at FROM graph_edges WHERE id = ?1")
3215 .bind(edge_id)
3216 .fetch_one(gs.pool())
3217 .await
3218 .unwrap();
3219 assert!(
3220 dead_edge.0.is_some(),
3221 "valid_to must be set after invalidation"
3222 );
3223 assert!(
3224 dead_edge.1.is_some(),
3225 "expired_at must be set after invalidation"
3226 );
3227 }
3228
3229 #[tokio::test]
3234 async fn edges_at_timestamp_valid_from_inclusive() {
3235 let gs = setup().await;
3236 let a = gs
3237 .upsert_entity("VFI_A", "VFI_A", EntityType::Person, None)
3238 .await
3239 .unwrap();
3240 let b = gs
3241 .upsert_entity("VFI_B", "VFI_B", EntityType::Person, None)
3242 .await
3243 .unwrap();
3244 sqlx::query(
3245 "INSERT INTO graph_edges
3246 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3247 VALUES (?1, ?2, 'knows', 'VFI_A knows VFI_B', 1.0, '2025-06-01 00:00:00')",
3248 )
3249 .bind(a)
3250 .bind(b)
3251 .execute(gs.pool())
3252 .await
3253 .unwrap();
3254
3255 let edges = gs
3257 .edges_at_timestamp(a, "2025-06-01 00:00:00")
3258 .await
3259 .unwrap();
3260 assert_eq!(
3261 edges.len(),
3262 1,
3263 "edge with valid_from == ts must be visible (inclusive boundary)"
3264 );
3265 }
3266
3267 #[tokio::test]
3268 async fn edges_at_timestamp_valid_to_exclusive() {
3269 let gs = setup().await;
3270 let a = gs
3271 .upsert_entity("VTO_A", "VTO_A", EntityType::Person, None)
3272 .await
3273 .unwrap();
3274 let b = gs
3275 .upsert_entity("VTO_B", "VTO_B", EntityType::Person, None)
3276 .await
3277 .unwrap();
3278 sqlx::query(
3279 "INSERT INTO graph_edges
3280 (source_entity_id, target_entity_id, relation, fact, confidence,
3281 valid_from, valid_to, expired_at)
3282 VALUES (?1, ?2, 'knows', 'VTO_A knows VTO_B', 1.0,
3283 '2020-01-01 00:00:00', '2025-06-01 00:00:00', '2025-06-01 00:00:00')",
3284 )
3285 .bind(a)
3286 .bind(b)
3287 .execute(gs.pool())
3288 .await
3289 .unwrap();
3290
3291 let at_boundary = gs
3293 .edges_at_timestamp(a, "2025-06-01 00:00:00")
3294 .await
3295 .unwrap();
3296 assert!(
3297 at_boundary.is_empty(),
3298 "edge with valid_to == ts must NOT be visible (exclusive upper boundary)"
3299 );
3300
3301 let before_boundary = gs
3303 .edges_at_timestamp(a, "2025-05-31 23:59:59")
3304 .await
3305 .unwrap();
3306 assert_eq!(
3307 before_boundary.len(),
3308 1,
3309 "edge must be visible one second before valid_to"
3310 );
3311 }
3312
3313 #[tokio::test]
3314 async fn edges_at_timestamp_multiple_edges_same_entity() {
3315 let gs = setup().await;
3316 let a = gs
3317 .upsert_entity("ME_A", "ME_A", EntityType::Person, None)
3318 .await
3319 .unwrap();
3320 let b = gs
3321 .upsert_entity("ME_B", "ME_B", EntityType::Person, None)
3322 .await
3323 .unwrap();
3324 let c = gs
3325 .upsert_entity("ME_C", "ME_C", EntityType::Person, None)
3326 .await
3327 .unwrap();
3328 let d = gs
3329 .upsert_entity("ME_D", "ME_D", EntityType::Person, None)
3330 .await
3331 .unwrap();
3332
3333 sqlx::query(
3335 "INSERT INTO graph_edges
3336 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3337 VALUES (?1, ?2, 'knows', 'ME_A knows ME_B', 1.0, '2020-01-01 00:00:00')",
3338 )
3339 .bind(a)
3340 .bind(b)
3341 .execute(gs.pool())
3342 .await
3343 .unwrap();
3344 sqlx::query(
3346 "INSERT INTO graph_edges
3347 (source_entity_id, target_entity_id, relation, fact, confidence,
3348 valid_from, valid_to, expired_at)
3349 VALUES (?1, ?2, 'knows', 'ME_A knows ME_C', 1.0,
3350 '2020-01-01 00:00:00', '2023-01-01 00:00:00', '2023-01-01 00:00:00')",
3351 )
3352 .bind(a)
3353 .bind(c)
3354 .execute(gs.pool())
3355 .await
3356 .unwrap();
3357 sqlx::query(
3359 "INSERT INTO graph_edges
3360 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3361 VALUES (?1, ?2, 'knows', 'ME_A knows ME_D', 1.0, '2030-01-01 00:00:00')",
3362 )
3363 .bind(a)
3364 .bind(d)
3365 .execute(gs.pool())
3366 .await
3367 .unwrap();
3368
3369 let edges = gs
3370 .edges_at_timestamp(a, "2025-01-01 00:00:00")
3371 .await
3372 .unwrap();
3373 assert_eq!(
3374 edges.len(),
3375 1,
3376 "only A->B must be visible at 2025 (C expired, D future)"
3377 );
3378 assert_eq!(edges[0].target_entity_id, b);
3379 }
3380
3381 #[tokio::test]
3382 async fn edges_at_timestamp_no_edges_returns_empty() {
3383 let gs = setup().await;
3384 let a = gs
3385 .upsert_entity("NE_A", "NE_A", EntityType::Person, None)
3386 .await
3387 .unwrap();
3388
3389 let edges = gs
3390 .edges_at_timestamp(a, "2025-01-01 00:00:00")
3391 .await
3392 .unwrap();
3393 assert!(
3394 edges.is_empty(),
3395 "entity with no edges must return empty vec"
3396 );
3397 }
3398
3399 #[tokio::test]
3402 async fn edge_history_basic_history() {
3403 let gs = setup().await;
3404 let src = gs
3405 .upsert_entity("EH_Src", "EH_Src", EntityType::Person, None)
3406 .await
3407 .unwrap();
3408 let tgt = gs
3409 .upsert_entity("EH_Tgt", "EH_Tgt", EntityType::Organization, None)
3410 .await
3411 .unwrap();
3412
3413 sqlx::query(
3414 "INSERT INTO graph_edges
3415 (source_entity_id, target_entity_id, relation, fact, confidence,
3416 valid_from, valid_to, expired_at)
3417 VALUES (?1, ?2, 'works_at', 'EH_Src works at OrgA', 0.9,
3418 '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
3419 )
3420 .bind(src)
3421 .bind(tgt)
3422 .execute(gs.pool())
3423 .await
3424 .unwrap();
3425 sqlx::query(
3426 "INSERT INTO graph_edges
3427 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3428 VALUES (?1, ?2, 'works_at', 'EH_Src works at OrgB', 0.95, '2022-01-01 00:00:00')",
3429 )
3430 .bind(src)
3431 .bind(tgt)
3432 .execute(gs.pool())
3433 .await
3434 .unwrap();
3435
3436 let history = gs.edge_history(src, "works at", None, 100).await.unwrap();
3437 assert_eq!(history.len(), 2, "both versions must be returned");
3438 assert!(
3439 history[0].valid_from > history[1].valid_from,
3440 "ordered valid_from DESC — versions have distinct timestamps"
3441 );
3442 }
3443
3444 #[tokio::test]
3445 async fn edge_history_for_entity_includes_expired() {
3446 let gs = setup().await;
3447 let a = gs
3448 .upsert_entity("HistA", "HistA", EntityType::Concept, None)
3449 .await
3450 .unwrap();
3451 let b = gs
3452 .upsert_entity("HistB", "HistB", EntityType::Concept, None)
3453 .await
3454 .unwrap();
3455
3456 let e1 = gs
3458 .insert_edge(a, b, "uses", "old fact", 0.8, None)
3459 .await
3460 .unwrap();
3461 gs.invalidate_edge(e1).await.unwrap();
3462 gs.insert_edge(a, b, "uses", "new fact", 0.9, None)
3464 .await
3465 .unwrap();
3466
3467 let history = gs.edge_history_for_entity(a, 10).await.unwrap();
3468 assert_eq!(
3469 history.len(),
3470 2,
3471 "both active and expired edges must appear"
3472 );
3473
3474 let active = history.iter().find(|e| e.valid_to.is_none());
3476 let expired = history.iter().find(|e| e.valid_to.is_some());
3477 assert!(active.is_some(), "active edge must be in history");
3478 assert!(expired.is_some(), "expired edge must be in history");
3479 }
3480
3481 #[tokio::test]
3482 async fn edge_history_for_entity_both_directions() {
3483 let gs = setup().await;
3484 let a = gs
3485 .upsert_entity("DirA", "DirA", EntityType::Concept, None)
3486 .await
3487 .unwrap();
3488 let b = gs
3489 .upsert_entity("DirB", "DirB", EntityType::Concept, None)
3490 .await
3491 .unwrap();
3492 let c = gs
3493 .upsert_entity("DirC", "DirC", EntityType::Concept, None)
3494 .await
3495 .unwrap();
3496
3497 gs.insert_edge(a, b, "r1", "f1", 1.0, None).await.unwrap();
3498 gs.insert_edge(c, a, "r2", "f2", 1.0, None).await.unwrap();
3499
3500 let history = gs.edge_history_for_entity(a, 10).await.unwrap();
3501 assert_eq!(
3502 history.len(),
3503 2,
3504 "both outgoing and incoming edges must appear"
3505 );
3506 assert!(
3507 history
3508 .iter()
3509 .any(|e| e.source_entity_id == a && e.target_entity_id == b)
3510 );
3511 assert!(
3512 history
3513 .iter()
3514 .any(|e| e.source_entity_id == c && e.target_entity_id == a)
3515 );
3516 }
3517
3518 #[tokio::test]
3519 async fn edge_history_for_entity_respects_limit() {
3520 let gs = setup().await;
3521 let a = gs
3522 .upsert_entity("LimA", "LimA", EntityType::Concept, None)
3523 .await
3524 .unwrap();
3525 let b = gs
3526 .upsert_entity("LimB", "LimB", EntityType::Concept, None)
3527 .await
3528 .unwrap();
3529
3530 for i in 0..5u32 {
3531 gs.insert_edge(a, b, &format!("r{i}"), &format!("fact {i}"), 1.0, None)
3532 .await
3533 .unwrap();
3534 }
3535
3536 let history = gs.edge_history_for_entity(a, 2).await.unwrap();
3537 assert_eq!(history.len(), 2, "limit must be respected");
3538 }
3539
3540 #[tokio::test]
3541 async fn edge_history_limit_parameter() {
3542 let gs = setup().await;
3543 let src = gs
3544 .upsert_entity("EHL_Src", "EHL_Src", EntityType::Person, None)
3545 .await
3546 .unwrap();
3547 let tgt = gs
3548 .upsert_entity("EHL_Tgt", "EHL_Tgt", EntityType::Organization, None)
3549 .await
3550 .unwrap();
3551
3552 for (year, rel) in [
3553 (2018i32, "worked_at_v1"),
3554 (2019, "worked_at_v2"),
3555 (2020, "worked_at_v3"),
3556 (2021, "worked_at_v4"),
3557 (2022, "worked_at_v5"),
3558 ] {
3559 let valid_from = format!("{year}-01-01 00:00:00");
3560 sqlx::query(
3561 "INSERT INTO graph_edges
3562 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3563 VALUES (?1, ?2, ?3, 'EHL_Src worked at org', 1.0, ?4)",
3564 )
3565 .bind(src)
3566 .bind(tgt)
3567 .bind(rel)
3568 .bind(valid_from)
3569 .execute(gs.pool())
3570 .await
3571 .unwrap();
3572 }
3573
3574 let all = gs.edge_history(src, "worked at", None, 100).await.unwrap();
3576 assert_eq!(
3577 all.len(),
3578 5,
3579 "all 5 rows must match without limit constraint"
3580 );
3581
3582 let limited = gs.edge_history(src, "worked at", None, 2).await.unwrap();
3583 assert_eq!(limited.len(), 2, "limit=2 must truncate to 2 results");
3584 assert!(
3585 limited[0].valid_from > limited[1].valid_from,
3586 "most recent results first"
3587 );
3588 }
3589
3590 #[tokio::test]
3591 async fn edge_history_non_matching_relation_returns_empty() {
3592 let gs = setup().await;
3593 let src = gs
3594 .upsert_entity("EHR_Src", "EHR_Src", EntityType::Person, None)
3595 .await
3596 .unwrap();
3597 let tgt = gs
3598 .upsert_entity("EHR_Tgt", "EHR_Tgt", EntityType::Organization, None)
3599 .await
3600 .unwrap();
3601
3602 sqlx::query(
3603 "INSERT INTO graph_edges
3604 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3605 VALUES (?1, ?2, 'works_at', 'EHR_Src works at place', 1.0, '2020-01-01 00:00:00')",
3606 )
3607 .bind(src)
3608 .bind(tgt)
3609 .execute(gs.pool())
3610 .await
3611 .unwrap();
3612
3613 let result = gs
3614 .edge_history(src, "works at", Some("lives_in"), 100)
3615 .await
3616 .unwrap();
3617 assert!(
3618 result.is_empty(),
3619 "relation filter with no match must return empty"
3620 );
3621 }
3622
3623 #[tokio::test]
3624 async fn edge_history_empty_entity() {
3625 let gs = setup().await;
3626 let src = gs
3627 .upsert_entity("EHE_Src", "EHE_Src", EntityType::Person, None)
3628 .await
3629 .unwrap();
3630
3631 let result = gs.edge_history(src, "anything", None, 100).await.unwrap();
3632 assert!(
3633 result.is_empty(),
3634 "entity with no edges must return empty history"
3635 );
3636 }
3637
3638 #[tokio::test]
3639 async fn edge_history_fact_substring_filters_subset() {
3640 let gs = setup().await;
3641 let src = gs
3642 .upsert_entity("EHP_Src", "EHP_Src", EntityType::Person, None)
3643 .await
3644 .unwrap();
3645 let tgt = gs
3646 .upsert_entity("EHP_Tgt", "EHP_Tgt", EntityType::Concept, None)
3647 .await
3648 .unwrap();
3649
3650 for (rel, fact) in [
3653 ("uses_lang1", "EHP_Src uses Rust"),
3654 ("uses_lang2", "EHP_Src uses Python"),
3655 ("knows_person", "EHP_Src knows Bob"),
3656 ] {
3657 sqlx::query(
3658 "INSERT INTO graph_edges
3659 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3660 VALUES (?1, ?2, ?3, ?4, 1.0, '2020-01-01 00:00:00')",
3661 )
3662 .bind(src)
3663 .bind(tgt)
3664 .bind(rel)
3665 .bind(fact)
3666 .execute(gs.pool())
3667 .await
3668 .unwrap();
3669 }
3670
3671 let all = gs.edge_history(src, "EHP_Src", None, 100).await.unwrap();
3673 assert_eq!(all.len(), 3, "broad predicate must return all 3 facts");
3674
3675 let filtered = gs.edge_history(src, "uses", None, 100).await.unwrap();
3677 assert_eq!(
3678 filtered.len(),
3679 2,
3680 "predicate 'uses' must match only the two 'uses' facts"
3681 );
3682 assert!(
3683 filtered.len() < all.len(),
3684 "filtered count must be less than total count"
3685 );
3686 }
3687
3688 #[tokio::test]
3691 async fn bfs_at_timestamp_zero_hops() {
3692 let gs = setup().await;
3693 let a = gs
3694 .upsert_entity("ZH_A", "ZH_A", EntityType::Person, None)
3695 .await
3696 .unwrap();
3697 let b = gs
3698 .upsert_entity("ZH_B", "ZH_B", EntityType::Person, None)
3699 .await
3700 .unwrap();
3701 sqlx::query(
3703 "INSERT INTO graph_edges
3704 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3705 VALUES (?1, ?2, 'knows', 'ZH_A knows ZH_B', 1.0, '2020-01-01 00:00:00')",
3706 )
3707 .bind(a)
3708 .bind(b)
3709 .execute(gs.pool())
3710 .await
3711 .unwrap();
3712
3713 let (_entities, edges, depth_map) = gs
3714 .bfs_at_timestamp(a, 0, "2025-01-01 00:00:00")
3715 .await
3716 .unwrap();
3717 assert!(
3718 depth_map.contains_key(&a),
3719 "start entity must be in depth_map"
3720 );
3721 assert_eq!(depth_map.len(), 1, "depth=0 must include only start entity");
3722 assert!(edges.is_empty(), "depth=0 must return no edges");
3723 }
3724
3725 #[tokio::test]
3726 async fn bfs_at_timestamp_expired_intermediate_blocks() {
3727 let gs = setup().await;
3728 let a = gs
3729 .upsert_entity("EI_A", "EI_A", EntityType::Person, None)
3730 .await
3731 .unwrap();
3732 let b = gs
3733 .upsert_entity("EI_B", "EI_B", EntityType::Person, None)
3734 .await
3735 .unwrap();
3736 let c = gs
3737 .upsert_entity("EI_C", "EI_C", EntityType::Person, None)
3738 .await
3739 .unwrap();
3740
3741 sqlx::query(
3743 "INSERT INTO graph_edges
3744 (source_entity_id, target_entity_id, relation, fact, confidence,
3745 valid_from, valid_to, expired_at)
3746 VALUES (?1, ?2, 'link', 'EI_A link EI_B', 1.0,
3747 '2020-01-01 00:00:00', '2022-01-01 00:00:00', '2022-01-01 00:00:00')",
3748 )
3749 .bind(a)
3750 .bind(b)
3751 .execute(gs.pool())
3752 .await
3753 .unwrap();
3754 sqlx::query(
3756 "INSERT INTO graph_edges
3757 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3758 VALUES (?1, ?2, 'link', 'EI_B link EI_C', 1.0, '2020-01-01 00:00:00')",
3759 )
3760 .bind(b)
3761 .bind(c)
3762 .execute(gs.pool())
3763 .await
3764 .unwrap();
3765
3766 let (entities, _edges, depth_map) = gs
3767 .bfs_at_timestamp(a, 3, "2025-01-01 00:00:00")
3768 .await
3769 .unwrap();
3770 let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
3771 assert!(
3772 !depth_map.contains_key(&b),
3773 "B must not be reachable (A->B expired)"
3774 );
3775 assert!(
3776 !entity_ids.contains(&c),
3777 "C must not be reachable (blocked by expired A->B)"
3778 );
3779 }
3780
3781 #[tokio::test]
3782 async fn bfs_at_timestamp_disconnected_entity() {
3783 let gs = setup().await;
3784 let a = gs
3785 .upsert_entity("DC_A", "DC_A", EntityType::Person, None)
3786 .await
3787 .unwrap();
3788
3789 let (_entities, edges, depth_map) = gs
3790 .bfs_at_timestamp(a, 3, "2025-01-01 00:00:00")
3791 .await
3792 .unwrap();
3793 assert_eq!(depth_map.len(), 1, "disconnected entity has only itself");
3794 assert!(depth_map.contains_key(&a));
3795 assert!(edges.is_empty(), "disconnected entity has no edges");
3796 }
3797
3798 #[tokio::test]
3799 async fn bfs_at_timestamp_reverse_direction() {
3800 let gs = setup().await;
3801 let a = gs
3802 .upsert_entity("RD_A", "RD_A", EntityType::Person, None)
3803 .await
3804 .unwrap();
3805 let b = gs
3806 .upsert_entity("RD_B", "RD_B", EntityType::Person, None)
3807 .await
3808 .unwrap();
3809
3810 sqlx::query(
3812 "INSERT INTO graph_edges
3813 (source_entity_id, target_entity_id, relation, fact, confidence, valid_from)
3814 VALUES (?1, ?2, 'points_to', 'RD_B points_to RD_A', 1.0, '2020-01-01 00:00:00')",
3815 )
3816 .bind(b)
3817 .bind(a)
3818 .execute(gs.pool())
3819 .await
3820 .unwrap();
3821
3822 let (entities, edges, depth_map) = gs
3823 .bfs_at_timestamp(a, 1, "2099-01-01 00:00:00")
3824 .await
3825 .unwrap();
3826 let entity_ids: Vec<i64> = entities.iter().map(|e| e.id).collect();
3827 assert!(
3828 depth_map.contains_key(&b),
3829 "B must be reachable when BFS traverses reverse direction"
3830 );
3831 assert!(entity_ids.contains(&b), "B must appear in entities vec");
3832 assert!(
3833 edges
3834 .iter()
3835 .any(|e| e.source_entity_id == b && e.target_entity_id == a),
3836 "traversed edge B->A must appear in returned edges"
3837 );
3838 }
3839
    #[tokio::test]
    async fn bfs_at_timestamp_valid_to_boundary() {
        // Verifies that `valid_to` acts as an exclusive upper bound in
        // `bfs_at_timestamp`: an edge whose valid_to equals the query
        // timestamp must NOT be traversable, while querying one second
        // earlier must still reach the neighbor.
        let gs = setup().await;
        let a = gs
            .upsert_entity("VTB_A", "VTB_A", EntityType::Person, None)
            .await
            .unwrap();
        let b = gs
            .upsert_entity("VTB_B", "VTB_B", EntityType::Person, None)
            .await
            .unwrap();

        // Edge A -> B valid over [2020-01-01, 2025-06-01); expired_at is set
        // to the same instant as valid_to.
        sqlx::query(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence,
              valid_from, valid_to, expired_at)
             VALUES (?1, ?2, 'link', 'VTB_A link VTB_B', 1.0,
              '2020-01-01 00:00:00', '2025-06-01 00:00:00', '2025-06-01 00:00:00')",
        )
        .bind(a)
        .bind(b)
        .execute(gs.pool())
        .await
        .unwrap();

        // Query exactly at valid_to: the edge must already be considered dead.
        let (_entities, _edges, depth_map_at) = gs
            .bfs_at_timestamp(a, 1, "2025-06-01 00:00:00")
            .await
            .unwrap();
        assert!(
            !depth_map_at.contains_key(&b),
            "B must not be reachable when valid_to == ts (exclusive boundary)"
        );

        // Query one second before valid_to: the edge is still live.
        let (_entities2, _edges2, depth_map_before) = gs
            .bfs_at_timestamp(a, 1, "2025-05-31 23:59:59")
            .await
            .unwrap();
        assert!(
            depth_map_before.contains_key(&b),
            "B must be reachable one second before valid_to"
        );
    }
3886}