1use std::collections::HashMap;
5
6use futures::Stream;
7use sqlx::SqlitePool;
8
9use crate::error::MemoryError;
10use crate::sqlite::messages::sanitize_fts5_query;
11use crate::types::MessageId;
12
13use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
14
/// SQLite-backed store for the knowledge graph: entities, their aliases,
/// temporal edges, detected communities, and a small key/value metadata
/// table. All queries go through the wrapped connection pool.
pub struct GraphStore {
    pool: SqlitePool,
}
18
19impl GraphStore {
20 #[must_use]
21 pub fn new(pool: SqlitePool) -> Self {
22 Self { pool }
23 }
24
    /// Borrow the underlying connection pool (e.g. for ad-hoc queries or
    /// sharing the pool with other stores).
    #[must_use]
    pub fn pool(&self) -> &SqlitePool {
        &self.pool
    }
29
    /// Insert an entity, or refresh it if `(canonical_name, entity_type)`
    /// already exists: the surface `name` is overwritten, `summary` is only
    /// replaced when a new one is provided (`COALESCE`), and `last_seen_at`
    /// is bumped to now. Returns the entity's row id in either case.
    ///
    /// # Errors
    /// Returns an error if the underlying query fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = datetime('now')
             RETURNING id",
        )
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
67
    /// Exact lookup by `(canonical_name, entity_type)`; `Ok(None)` when no
    /// such entity exists.
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities
             WHERE canonical_name = ?1 AND entity_type = ?2",
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        // entity_from_row is fallible, so lift Option<Result<_>> back out.
        row.map(entity_from_row).transpose()
    }
90
    /// Lookup by primary key; `Ok(None)` when the id is unknown.
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities
             WHERE id = ?1",
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
107
    /// Record the Qdrant vector-point id associated with an entity.
    /// A no-op (no error) if `entity_id` does not exist.
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        sqlx::query("UPDATE graph_entities SET qdrant_point_id = ?1 WHERE id = ?2")
            .bind(point_id)
            .bind(entity_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
125
126 pub async fn find_entities_fuzzy(
147 &self,
148 query: &str,
149 limit: usize,
150 ) -> Result<Vec<Entity>, MemoryError> {
151 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
155 let query = &query[..query.floor_char_boundary(512)];
156 let sanitized = sanitize_fts5_query(query);
159 if sanitized.is_empty() {
160 return Ok(vec![]);
161 }
162 let fts_query: String = sanitized
163 .split_whitespace()
164 .filter(|t| !FTS5_OPERATORS.contains(t))
165 .map(|t| format!("{t}*"))
166 .collect::<Vec<_>>()
167 .join(" ");
168 if fts_query.is_empty() {
169 return Ok(vec![]);
170 }
171
172 let limit = i64::try_from(limit)?;
173 let rows: Vec<EntityRow> = sqlx::query_as(
176 "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
177 e.first_seen_at, e.last_seen_at, e.qdrant_point_id
178 FROM graph_entities_fts fts
179 JOIN graph_entities e ON e.id = fts.rowid
180 WHERE graph_entities_fts MATCH ?1
181 UNION
182 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
183 e.first_seen_at, e.last_seen_at, e.qdrant_point_id
184 FROM graph_entity_aliases a
185 JOIN graph_entities e ON e.id = a.entity_id
186 WHERE a.alias_name LIKE ?2 ESCAPE '\\' COLLATE NOCASE
187 LIMIT ?3",
188 )
189 .bind(&fts_query)
190 .bind(format!(
191 "%{}%",
192 query
193 .trim()
194 .replace('\\', "\\\\")
195 .replace('%', "\\%")
196 .replace('_', "\\_")
197 ))
198 .bind(limit)
199 .fetch_all(&self.pool)
200 .await?;
201 rows.into_iter()
202 .map(entity_from_row)
203 .collect::<Result<Vec<_>, _>>()
204 }
205
    /// Stream every entity ordered by id, converting rows lazily so large
    /// graphs need not be materialized at once.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        sqlx::query_as::<_, EntityRow>(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities ORDER BY id ASC",
        )
        .fetch(&self.pool)
        // and_then: entity_from_row is itself fallible.
        .map(|r: Result<EntityRow, sqlx::Error>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
218
    /// Attach an alias to an entity; silently ignores duplicates
    /// (`INSERT OR IGNORE`).
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        sqlx::query(
            "INSERT OR IGNORE INTO graph_entity_aliases (entity_id, alias_name) VALUES (?1, ?2)",
        )
        .bind(entity_id)
        .bind(alias_name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
236
    /// Resolve an entity via a case-insensitive alias match, constrained to
    /// `entity_type`. When several entities share the alias, the one with
    /// the lowest id wins (deterministic `ORDER BY e.id ASC LIMIT 1`).
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = sqlx::query_as(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary,
                    e.first_seen_at, e.last_seen_at, e.qdrant_point_id
             FROM graph_entity_aliases a
             JOIN graph_entities e ON e.id = a.entity_id
             WHERE a.alias_name = ?1 COLLATE NOCASE
             AND e.entity_type = ?2
             ORDER BY e.id ASC
             LIMIT 1",
        )
        .bind(alias_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
266
    /// All aliases recorded for an entity, in insertion (id) order.
    /// Returns an empty vec for an unknown entity id.
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = sqlx::query_as(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?1
             ORDER BY id ASC",
        )
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
287
288 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
294 use futures::TryStreamExt as _;
295 self.all_entities_stream().try_collect().await
296 }
297
298 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
304 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_entities")
305 .fetch_one(&self.pool)
306 .await?;
307 Ok(count)
308 }
309
310 pub async fn insert_edge(
328 &self,
329 source_entity_id: i64,
330 target_entity_id: i64,
331 relation: &str,
332 fact: &str,
333 confidence: f32,
334 episode_id: Option<MessageId>,
335 ) -> Result<i64, MemoryError> {
336 self.insert_edge_typed(
337 source_entity_id,
338 target_entity_id,
339 relation,
340 fact,
341 confidence,
342 episode_id,
343 EdgeType::Semantic,
344 )
345 .await
346 }
347
    /// Insert an edge, or — if an active edge with the same
    /// (source, target, relation, edge_type) already exists — merge by
    /// keeping the higher confidence and returning the existing edge id.
    /// `confidence` is clamped to [0.0, 1.0].
    ///
    /// NOTE(review): the SELECT-then-INSERT pair is not wrapped in a
    /// transaction, so two concurrent writers could both miss the existing
    /// edge and insert duplicates — confirm the pool is effectively
    /// single-writer before relying on uniqueness here.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        // Look for a live (valid_to IS NULL) edge with identical identity.
        let existing: Option<(i64, f64)> = sqlx::query_as(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?1
             AND target_entity_id = ?2
             AND relation = ?3
             AND edge_type = ?4
             AND valid_to IS NULL
             LIMIT 1",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&self.pool)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Merge policy: confidence only ever increases.
            let updated_conf = f64::from(confidence).max(stored_conf);
            sqlx::query("UPDATE graph_edges SET confidence = ?1 WHERE id = ?2")
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&self.pool)
                .await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
             RETURNING id",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
414
    /// Soft-delete an edge: stamp both `valid_to` (temporal validity) and
    /// `expired_at` (used later by `delete_expired_edges` retention).
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        sqlx::query(
            "UPDATE graph_edges SET valid_to = datetime('now'), expired_at = datetime('now')
             WHERE id = ?1",
        )
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
430
431 pub async fn edges_for_entities(
448 &self,
449 entity_ids: &[i64],
450 edge_types: &[super::types::EdgeType],
451 ) -> Result<Vec<Edge>, MemoryError> {
452 const MAX_BATCH_ENTITIES: usize = 490;
456
457 let mut all_edges: Vec<Edge> = Vec::new();
458
459 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
460 let edges = self.query_batch_edges(chunk, edge_types).await?;
461 all_edges.extend(edges);
462 }
463
464 Ok(all_edges)
465 }
466
    /// Single-batch helper for `edges_for_entities`. Builds numbered
    /// placeholders `?1..?n` for the entity ids (reused by both IN-lists,
    /// so each id is bound once) and, when filtering, `?n+1..` for the
    /// edge types bound afterwards in the same order.
    async fn query_batch_edges(
        &self,
        entity_ids: &[i64],
        edge_types: &[super::types::EdgeType],
    ) -> Result<Vec<Edge>, MemoryError> {
        if entity_ids.is_empty() {
            return Ok(Vec::new());
        }

        let placeholders: String = (1..=entity_ids.len())
            .map(|i| format!("?{i}"))
            .collect::<Vec<_>>()
            .join(", ");

        let sql = if edge_types.is_empty() {
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL
                 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
            )
        } else {
            // Edge-type placeholders are numbered after the entity ids.
            let type_placeholders: String = (entity_ids.len() + 1
                ..=entity_ids.len() + edge_types.len())
                .map(|i| format!("?{i}"))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL
                 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))
                 AND edge_type IN ({type_placeholders})"
            )
        };

        // Bind order must mirror the placeholder numbering above.
        let mut query = sqlx::query_as::<_, EdgeRow>(&sql);
        for id in entity_ids {
            query = query.bind(*id);
        }
        for et in edge_types {
            query = query.bind(et.as_str());
        }

        let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
524
    /// All active edges where the entity is either endpoint.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL
             AND (source_entity_id = ?1 OR target_entity_id = ?1)",
        )
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
544
    /// Most recent edges (active *and* invalidated) touching an entity,
    /// newest `valid_from` first, capped at `limit`.
    ///
    /// # Errors
    /// Returns an error if `limit` does not fit in `i64` or the query fails.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE source_entity_id = ?1 OR target_entity_id = ?1
             ORDER BY valid_from DESC
             LIMIT ?2",
        )
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
572
    /// Active edges between two entities in either direction.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL
             AND ((source_entity_id = ?1 AND target_entity_id = ?2)
                  OR (source_entity_id = ?2 AND target_entity_id = ?1))",
        )
        .bind(entity_a)
        .bind(entity_b)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
598
    /// Active edges with exactly this source and target (direction
    /// matters, unlike `edges_between`).
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL
             AND source_entity_id = ?1
             AND target_entity_id = ?2",
        )
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
624
625 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
631 let count: i64 =
632 sqlx::query_scalar("SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL")
633 .fetch_one(&self.pool)
634 .await?;
635 Ok(count)
636 }
637
    /// Count of active edges per `edge_type`, sorted by type name.
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = sqlx::query_as(
            "SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
651
    /// Insert a community, or — on a `name` conflict — replace its summary
    /// and member list, keep the old fingerprint when none is supplied
    /// (`COALESCE`), and bump `updated_at`. Member ids are stored as a
    /// JSON array. Returns the community's row id.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = sqlx::query_scalar(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?1, ?2, ?3, ?4)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = datetime('now')
             RETURNING id",
        )
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
689
690 pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
697 let rows: Vec<(String, i64)> = sqlx::query_as(
698 "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL",
699 )
700 .fetch_all(&self.pool)
701 .await?;
702 Ok(rows.into_iter().collect())
703 }
704
    /// Hard-delete a community row; no-op for an unknown id.
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        sqlx::query("DELETE FROM graph_communities WHERE id = ?1")
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
717
    /// Drop a community's fingerprint (e.g. to force re-summarization on
    /// the next community-detection pass); no-op for an unknown id.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        sqlx::query("UPDATE graph_communities SET fingerprint = NULL WHERE id = ?1")
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
733
    /// First community whose JSON `entity_ids` array contains `entity_id`
    /// (via `json_each`); `Ok(None)` when the entity belongs to none.
    pub async fn community_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = sqlx::query_as(
            "SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
             FROM graph_communities c, json_each(c.entity_ids) j
             WHERE CAST(j.value AS INTEGER) = ?1
             LIMIT 1",
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                // Stored as a JSON array of i64 ids; decode before returning.
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
771
    /// Every community, ordered by id, with the JSON member list decoded.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = sqlx::query_as(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             ORDER BY id ASC",
        )
        .fetch_all(&self.pool)
        .await?;

        // collect() into Result short-circuits on the first bad JSON blob.
        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
801
    /// Total number of communities.
    pub async fn community_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM graph_communities")
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
813
    /// Read a value from the `graph_metadata` key/value table;
    /// `Ok(None)` when the key is absent.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            sqlx::query_scalar("SELECT value FROM graph_metadata WHERE key = ?1")
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
829
    /// Upsert a `graph_metadata` key/value pair.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        sqlx::query(
            "INSERT INTO graph_metadata (key, value) VALUES (?1, ?2)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        )
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
846
847 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
855 let val = self.get_metadata("extraction_count").await?;
856 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
857 }
858
    /// Stream every active edge ordered by id. Unlike the entity stream,
    /// `edge_from_row` is infallible, so a plain `map` suffices.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        sqlx::query_as::<_, EdgeRow>(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC",
        )
        .fetch(&self.pool)
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
873
    /// Keyset-pagination page of active edges: up to `limit` edges with
    /// `id > after_id`, ascending. Pass the last id seen to fetch the next
    /// page.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?1
             ORDER BY id ASC
             LIMIT ?2",
        )
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
910
    /// Lookup a community by primary key, decoding its JSON member list;
    /// `Ok(None)` for an unknown id.
    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = sqlx::query_as(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             WHERE id = ?1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
941
    /// Remove every community row (e.g. before a full re-detection pass).
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        sqlx::query("DELETE FROM graph_communities")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
953
    /// Hard-delete edges whose `expired_at` is older than `retention_days`
    /// days ago. Returns the number of rows removed.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = sqlx::query(
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
             AND expired_at < datetime('now', '-' || ?1 || ' days')",
        )
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
971
    /// Delete entities with no active edge on either end AND a
    /// `last_seen_at` older than `retention_days` days. Returns the number
    /// of rows removed.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = sqlx::query(
            "DELETE FROM graph_entities
             WHERE id NOT IN (
                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                 UNION
                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
             )
             AND last_seen_at < datetime('now', '-' || ?1 || ' days')",
        )
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
993
994 pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1003 let current = self.entity_count().await?;
1004 let max = i64::try_from(max_entities)?;
1005 if current <= max {
1006 return Ok(0);
1007 }
1008 let excess = current - max;
1009 let result = sqlx::query(
1010 "DELETE FROM graph_entities
1011 WHERE id IN (
1012 SELECT e.id
1013 FROM graph_entities e
1014 LEFT JOIN (
1015 SELECT source_entity_id AS eid, COUNT(*) AS cnt
1016 FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1017 UNION ALL
1018 SELECT target_entity_id AS eid, COUNT(*) AS cnt
1019 FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1020 ) edge_counts ON e.id = edge_counts.eid
1021 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1022 LIMIT ?1
1023 )",
1024 )
1025 .bind(excess)
1026 .execute(&self.pool)
1027 .await?;
1028 Ok(usize::try_from(result.rows_affected())?)
1029 }
1030
    /// Edges touching `entity_id` that were valid at `timestamp`
    /// (SQLite datetime string): the first SELECT takes still-active edges
    /// that had started by then; the second takes since-invalidated edges
    /// whose [valid_from, valid_to) window contains the timestamp.
    /// `?1`/`?2` are reused across both halves of the UNION ALL, so each
    /// value is bound once.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = sqlx::query_as(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NULL
             AND valid_from <= ?2
             AND (source_entity_id = ?1 OR target_entity_id = ?1)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type
             FROM graph_edges
             WHERE valid_to IS NOT NULL
             AND valid_from <= ?2
             AND valid_to > ?2
             AND (source_entity_id = ?1 OR target_entity_id = ?1)",
        )
        .bind(entity_id)
        .bind(timestamp)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1076
    /// History of edges from `source_entity_id` whose `fact` text contains
    /// `predicate` (LIKE with wildcards escaped, so the predicate matches
    /// literally), optionally restricted to an exact `relation`. Includes
    /// invalidated edges; newest `valid_from` first, capped at `limit`.
    pub async fn edge_history(
        &self,
        source_entity_id: i64,
        predicate: &str,
        relation: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Escape LIKE metacharacters; the SQL declares ESCAPE '\'.
        let escaped = predicate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{escaped}%");
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
            sqlx::query_as(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE source_entity_id = ?1
                 AND fact LIKE ?2 ESCAPE '\\'
                 AND relation = ?3
                 ORDER BY valid_from DESC
                 LIMIT ?4",
            )
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(rel)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            sqlx::query_as(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type
                 FROM graph_edges
                 WHERE source_entity_id = ?1
                 AND fact LIKE ?2 ESCAPE '\\'
                 ORDER BY valid_from DESC
                 LIMIT ?3",
            )
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        };
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1136
1137 pub async fn bfs(
1154 &self,
1155 start_entity_id: i64,
1156 max_hops: u32,
1157 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1158 self.bfs_with_depth(start_entity_id, max_hops)
1159 .await
1160 .map(|(e, ed, _)| (e, ed))
1161 }
1162
    /// BFS over currently-active edges, additionally returning each
    /// visited entity's hop distance from the start.
    pub async fn bfs_with_depth(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, None).await
    }
1180
    /// BFS over edges that were valid at `timestamp` (SQLite datetime
    /// string), with hop distances.
    pub async fn bfs_at_timestamp(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        timestamp: &str,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
            .await
    }
1200
1201 pub async fn bfs_typed(
1217 &self,
1218 start_entity_id: i64,
1219 max_hops: u32,
1220 edge_types: &[EdgeType],
1221 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1222 if edge_types.is_empty() {
1223 return self.bfs_with_depth(start_entity_id, max_hops).await;
1224 }
1225 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1226 .await
1227 }
1228
1229 async fn bfs_core(
1237 &self,
1238 start_entity_id: i64,
1239 max_hops: u32,
1240 at_timestamp: Option<&str>,
1241 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1242 use std::collections::HashMap;
1243
1244 const MAX_FRONTIER: usize = 300;
1247
1248 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1249 let mut frontier: Vec<i64> = vec![start_entity_id];
1250 depth_map.insert(start_entity_id, 0);
1251
1252 for hop in 0..max_hops {
1253 if frontier.is_empty() {
1254 break;
1255 }
1256 frontier.truncate(MAX_FRONTIER);
1257 let placeholders = frontier
1259 .iter()
1260 .enumerate()
1261 .map(|(i, _)| format!("?{}", i + 1))
1262 .collect::<Vec<_>>()
1263 .join(", ");
1264 let edge_filter = if at_timestamp.is_some() {
1265 let ts_pos = frontier.len() * 3 + 1;
1266 format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
1267 } else {
1268 "valid_to IS NULL".to_owned()
1269 };
1270 let neighbour_sql = format!(
1271 "SELECT DISTINCT CASE
1272 WHEN source_entity_id IN ({placeholders}) THEN target_entity_id
1273 ELSE source_entity_id
1274 END as neighbour_id
1275 FROM graph_edges
1276 WHERE {edge_filter}
1277 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders}))"
1278 );
1279 let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
1280 for id in &frontier {
1281 q = q.bind(*id);
1282 }
1283 for id in &frontier {
1284 q = q.bind(*id);
1285 }
1286 for id in &frontier {
1287 q = q.bind(*id);
1288 }
1289 if let Some(ts) = at_timestamp {
1290 q = q.bind(ts);
1291 }
1292 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1293 let mut next_frontier: Vec<i64> = Vec::new();
1294 for nbr in neighbours {
1295 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1296 e.insert(hop + 1);
1297 next_frontier.push(nbr);
1298 }
1299 }
1300 frontier = next_frontier;
1301 }
1302
1303 self.bfs_fetch_results(depth_map, at_timestamp).await
1304 }
1305
1306 async fn bfs_core_typed(
1315 &self,
1316 start_entity_id: i64,
1317 max_hops: u32,
1318 at_timestamp: Option<&str>,
1319 edge_types: &[EdgeType],
1320 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1321 use std::collections::HashMap;
1322
1323 const MAX_FRONTIER: usize = 300;
1324
1325 let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1326
1327 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1328 let mut frontier: Vec<i64> = vec![start_entity_id];
1329 depth_map.insert(start_entity_id, 0);
1330
1331 let n_types = type_strs.len();
1332 let type_in = (1..=n_types)
1334 .map(|i| format!("?{i}"))
1335 .collect::<Vec<_>>()
1336 .join(", ");
1337 let id_start = n_types + 1;
1338
1339 for hop in 0..max_hops {
1340 if frontier.is_empty() {
1341 break;
1342 }
1343 frontier.truncate(MAX_FRONTIER);
1344
1345 let n_frontier = frontier.len();
1346 let frontier_placeholders = frontier
1348 .iter()
1349 .enumerate()
1350 .map(|(i, _)| format!("?{}", id_start + i))
1351 .collect::<Vec<_>>()
1352 .join(", ");
1353
1354 let edge_filter = if at_timestamp.is_some() {
1355 let ts_pos = id_start + n_frontier * 3;
1356 format!(
1357 "edge_type IN ({type_in}) AND valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})"
1358 )
1359 } else {
1360 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1361 };
1362
1363 let neighbour_sql = format!(
1364 "SELECT DISTINCT CASE
1365 WHEN source_entity_id IN ({frontier_placeholders}) THEN target_entity_id
1366 ELSE source_entity_id
1367 END as neighbour_id
1368 FROM graph_edges
1369 WHERE {edge_filter}
1370 AND (source_entity_id IN ({frontier_placeholders}) OR target_entity_id IN ({frontier_placeholders}))"
1371 );
1372
1373 let mut q = sqlx::query_scalar::<_, i64>(&neighbour_sql);
1374 for t in &type_strs {
1376 q = q.bind(*t);
1377 }
1378 for id in &frontier {
1380 q = q.bind(*id);
1381 }
1382 for id in &frontier {
1383 q = q.bind(*id);
1384 }
1385 for id in &frontier {
1386 q = q.bind(*id);
1387 }
1388 if let Some(ts) = at_timestamp {
1389 q = q.bind(ts);
1390 }
1391
1392 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1393 let mut next_frontier: Vec<i64> = Vec::new();
1394 for nbr in neighbours {
1395 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1396 e.insert(hop + 1);
1397 next_frontier.push(nbr);
1398 }
1399 }
1400 frontier = next_frontier;
1401 }
1402
1403 self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1405 .await
1406 }
1407
1408 async fn bfs_fetch_results_typed(
1416 &self,
1417 depth_map: std::collections::HashMap<i64, u32>,
1418 at_timestamp: Option<&str>,
1419 type_strs: &[&str],
1420 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1421 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1422 if visited_ids.is_empty() {
1423 return Ok((Vec::new(), Vec::new(), depth_map));
1424 }
1425 if visited_ids.len() > 499 {
1426 tracing::warn!(
1427 total = visited_ids.len(),
1428 retained = 499,
1429 "bfs_fetch_results_typed: visited entity set truncated to 499"
1430 );
1431 visited_ids.truncate(499);
1432 }
1433
1434 let n_types = type_strs.len();
1435 let n_visited = visited_ids.len();
1436
1437 let type_in = (1..=n_types)
1439 .map(|i| format!("?{i}"))
1440 .collect::<Vec<_>>()
1441 .join(", ");
1442 let id_start = n_types + 1;
1443 let placeholders = visited_ids
1444 .iter()
1445 .enumerate()
1446 .map(|(i, _)| format!("?{}", id_start + i))
1447 .collect::<Vec<_>>()
1448 .join(", ");
1449
1450 let edge_filter = if at_timestamp.is_some() {
1451 let ts_pos = id_start + n_visited * 2;
1452 format!(
1453 "edge_type IN ({type_in}) AND valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})"
1454 )
1455 } else {
1456 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1457 };
1458
1459 let edge_sql = format!(
1460 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1461 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1462 edge_type
1463 FROM graph_edges
1464 WHERE {edge_filter}
1465 AND source_entity_id IN ({placeholders})
1466 AND target_entity_id IN ({placeholders})"
1467 );
1468 let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
1469 for t in type_strs {
1470 edge_query = edge_query.bind(*t);
1471 }
1472 for id in &visited_ids {
1473 edge_query = edge_query.bind(*id);
1474 }
1475 for id in &visited_ids {
1476 edge_query = edge_query.bind(*id);
1477 }
1478 if let Some(ts) = at_timestamp {
1479 edge_query = edge_query.bind(ts);
1480 }
1481 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1482
1483 let entity_sql2 = {
1485 let ph = visited_ids
1486 .iter()
1487 .enumerate()
1488 .map(|(i, _)| format!("?{}", i + 1))
1489 .collect::<Vec<_>>()
1490 .join(", ");
1491 format!(
1492 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1493 FROM graph_entities WHERE id IN ({ph})"
1494 )
1495 };
1496 let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql2);
1497 for id in &visited_ids {
1498 entity_query = entity_query.bind(*id);
1499 }
1500 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1501
1502 let entities: Vec<Entity> = entity_rows
1503 .into_iter()
1504 .map(entity_from_row)
1505 .collect::<Result<Vec<_>, _>>()?;
1506 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1507
1508 Ok((entities, edges, depth_map))
1509 }
1510
    /// Hydrates the results of a BFS traversal: loads the full entity rows for
    /// every id in `depth_map` and all edges connecting those entities.
    ///
    /// When `at_timestamp` is `Some`, edges are filtered to those valid at that
    /// instant (`valid_from <= ts` and not yet closed at `ts`); otherwise only
    /// currently-open edges (`valid_to IS NULL`) are returned.
    ///
    /// Returns `(entities, edges, depth_map)` — the depth map is handed back
    /// unchanged so the caller keeps each entity's BFS depth.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Cap at 499 ids: they are bound twice below, plus one optional
        // timestamp, for at most 2 * 499 + 1 = 999 parameters — SQLite's
        // default bind-variable limit.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Numbered placeholders ?1..?n; the same string is reused for both the
        // source and target IN-lists in the edge query below.
        let placeholders = visited_ids
            .iter()
            .enumerate()
            .map(|(i, _)| format!("?{}", i + 1))
            .collect::<Vec<_>>()
            .join(", ");
        let edge_filter = if at_timestamp.is_some() {
            // Timestamp placeholder index assumes the ids are bound twice first
            // (see the two bind loops below), landing it at 2n + 1.
            let ts_pos = visited_ids.len() * 2 + 1;
            format!("valid_from <= ?{ts_pos} AND (valid_to IS NULL OR valid_to > ?{ts_pos})")
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({placeholders})
             AND target_entity_id IN ({placeholders})"
        );
        let mut edge_query = sqlx::query_as::<_, EdgeRow>(&edge_sql);
        // sqlx assigns bind values by *call order*, not by placeholder number,
        // so the id list is bound twice — once per IN-list occurrence. The
        // second pass also advances the positional counter so the optional
        // timestamp bind lands at index 2n + 1, matching `ts_pos` above.
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entity fetch reuses the same ?1..?n placeholder string, bound once.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({placeholders})"
        );
        let mut entity_query = sqlx::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1583
1584 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1600 let rows: Vec<EntityRow> = sqlx::query_as(
1601 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1602 FROM graph_entities
1603 WHERE name = ?1 COLLATE NOCASE OR canonical_name = ?1 COLLATE NOCASE
1604 LIMIT 5",
1605 )
1606 .bind(name)
1607 .fetch_all(&self.pool)
1608 .await?;
1609
1610 if !rows.is_empty() {
1611 return rows.into_iter().map(entity_from_row).collect();
1612 }
1613
1614 self.find_entities_fuzzy(name, 5).await
1615 }
1616
1617 pub async fn unprocessed_messages_for_backfill(
1625 &self,
1626 limit: usize,
1627 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1628 let limit = i64::try_from(limit)?;
1629 let rows: Vec<(i64, String)> = sqlx::query_as(
1630 "SELECT id, content FROM messages
1631 WHERE graph_processed = 0
1632 ORDER BY id ASC
1633 LIMIT ?1",
1634 )
1635 .bind(limit)
1636 .fetch_all(&self.pool)
1637 .await?;
1638 Ok(rows
1639 .into_iter()
1640 .map(|(id, content)| (crate::types::MessageId(id), content))
1641 .collect())
1642 }
1643
1644 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
1650 let count: i64 =
1651 sqlx::query_scalar("SELECT COUNT(*) FROM messages WHERE graph_processed = 0")
1652 .fetch_one(&self.pool)
1653 .await?;
1654 Ok(count)
1655 }
1656
1657 pub async fn mark_messages_graph_processed(
1663 &self,
1664 ids: &[crate::types::MessageId],
1665 ) -> Result<(), MemoryError> {
1666 if ids.is_empty() {
1667 return Ok(());
1668 }
1669 let placeholders = ids
1670 .iter()
1671 .enumerate()
1672 .map(|(i, _)| format!("?{}", i + 1))
1673 .collect::<Vec<_>>()
1674 .join(", ");
1675 let sql = format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
1676 let mut query = sqlx::query(&sql);
1677 for id in ids {
1678 query = query.bind(id.0);
1679 }
1680 query.execute(&self.pool).await?;
1681 Ok(())
1682 }
1683}
1684
/// Raw row shape for `graph_entities` queries; converted to the public
/// [`Entity`] type by [`entity_from_row`].
#[derive(sqlx::FromRow)]
struct EntityRow {
    id: i64,
    // Display name (the surface form passed to `upsert_entity`).
    name: String,
    // Normalized name; unique together with `entity_type`.
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Set once the entity has an embedding point in Qdrant.
    qdrant_point_id: Option<String>,
}
1698
1699fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
1700 let entity_type = row
1701 .entity_type
1702 .parse::<EntityType>()
1703 .map_err(MemoryError::GraphStore)?;
1704 Ok(Entity {
1705 id: row.id,
1706 name: row.name,
1707 canonical_name: row.canonical_name,
1708 entity_type,
1709 summary: row.summary,
1710 first_seen_at: row.first_seen_at,
1711 last_seen_at: row.last_seen_at,
1712 qdrant_point_id: row.qdrant_point_id,
1713 })
1714}
1715
/// Raw row shape for `graph_entity_aliases`-style queries; converted to the
/// public [`EntityAlias`] type by [`alias_from_row`].
#[derive(sqlx::FromRow)]
struct AliasRow {
    id: i64,
    // Owning entity's id in `graph_entities`.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
1723
1724fn alias_from_row(row: AliasRow) -> EntityAlias {
1725 EntityAlias {
1726 id: row.id,
1727 entity_id: row.entity_id,
1728 alias_name: row.alias_name,
1729 created_at: row.created_at,
1730 }
1731}
1732
/// Raw row shape for `graph_edges` queries; converted to the public [`Edge`]
/// type by [`edge_from_row`].
#[derive(sqlx::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Stored as REAL (f64); narrowed to f32 during conversion.
    confidence: f64,
    valid_from: String,
    // NULL while the edge is still current.
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Message id of the episode this edge was extracted from, if any.
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` during conversion.
    edge_type: String,
}
1749
1750fn edge_from_row(row: EdgeRow) -> Edge {
1751 let edge_type = row
1752 .edge_type
1753 .parse::<EdgeType>()
1754 .unwrap_or(EdgeType::Semantic);
1755 Edge {
1756 id: row.id,
1757 source_entity_id: row.source_entity_id,
1758 target_entity_id: row.target_entity_id,
1759 relation: row.relation,
1760 fact: row.fact,
1761 #[allow(clippy::cast_possible_truncation)]
1762 confidence: row.confidence as f32,
1763 valid_from: row.valid_from,
1764 valid_to: row.valid_to,
1765 created_at: row.created_at,
1766 expired_at: row.expired_at,
1767 episode_id: row.episode_id.map(MessageId),
1768 qdrant_point_id: row.qdrant_point_id,
1769 edge_type,
1770 }
1771}
1772
/// Raw row shape for community queries; corresponds to the public
/// [`Community`] type.
#[derive(sqlx::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // Stored as a single string — presumably a serialized list of member
    // entity ids (e.g. JSON or delimited); TODO confirm against the writer.
    entity_ids: String,
    // NOTE(review): looks like a change-detection digest of the membership —
    // confirm where it is computed.
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
1783
1784#[cfg(test)]
1787mod tests;