1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::types::MessageId;
20
21use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
22
/// SQL-backed persistence layer for the knowledge graph: entities,
/// aliases, edges (with bitemporal validity), communities, and metadata.
pub struct GraphStore {
    /// Shared database connection pool (SQLite or Postgres via `zeph_db`).
    pool: DbPool,
}
32
33impl GraphStore {
34 #[must_use]
38 pub fn new(pool: DbPool) -> Self {
39 Self { pool }
40 }
41
    /// Returns a reference to the underlying database connection pool.
    #[must_use]
    pub fn pool(&self) -> &DbPool {
        &self.pool
    }
47
    /// Inserts an entity or updates it when `(canonical_name, entity_type)`
    /// already exists: the surface `name` is replaced, `summary` is kept
    /// unless a new one is provided, and `last_seen_at` is refreshed.
    ///
    /// Returns the entity's row id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the database query fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                name = excluded.name,
                summary = COALESCE(excluded.summary, summary),
                last_seen_at = CURRENT_TIMESTAMP
            RETURNING id"
        ))
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
85
    /// Looks up an entity by exact `(canonical_name, entity_type)` key.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails or the row cannot be
    /// converted into an [`Entity`].
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
            FROM graph_entities
            WHERE canonical_name = ? AND entity_type = ?"),
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
108
    /// Fetches a single entity by primary key, or `None` if absent.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails or the row cannot be
    /// converted into an [`Entity`].
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
            FROM graph_entities
            WHERE id = ?"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
125
    /// Records the Qdrant vector-point id associated with an entity,
    /// linking the SQL row to its embedding in the vector store.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
        ))
        .bind(point_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
145
    /// Searches entities by FTS5 prefix match, unioned with a
    /// case-insensitive `LIKE` match over entity aliases.
    ///
    /// The query is truncated to 512 bytes on a char boundary, sanitized,
    /// and each surviving token gets a `*` suffix for prefix matching.
    /// Bare FTS5 operator words are dropped so user input cannot change
    /// query semantics. Returns an empty vec when nothing searchable
    /// remains after sanitizing.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if `limit` overflows `i64`, the query fails,
    /// or a row cannot be converted into an [`Entity`].
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap input length without splitting a multi-byte character.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // "foo bar" -> "foo* bar*" (prefix match per token).
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION (not UNION ALL) deduplicates rows hit by both branches.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards so the user's text matches literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
227
    /// Requests a passive WAL checkpoint so the write-ahead log is folded
    /// back into the main database file (SQLite builds only).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the pragma fails.
    #[cfg(feature = "sqlite")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
244
    /// No-op on Postgres: WAL checkpointing is managed by the server,
    /// so there is nothing for the client to do.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
254
    /// Streams every entity ordered by id, converting each row lazily so
    /// large graphs need not be materialized in memory at once.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
            FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
267
    /// Adds an alias for an entity; duplicates are silently ignored via the
    /// dialect-specific INSERT IGNORE / ON CONFLICT DO NOTHING form.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        let insert_alias_sql = format!(
            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        zeph_db::query(&insert_alias_sql)
            .bind(entity_id)
            .bind(alias_name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
288
    /// Resolves an entity via a case-insensitive alias match, constrained
    /// to the given entity type. Ties are broken by lowest entity id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails or the row cannot be
    /// converted into an [`Entity`].
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
320
    /// Lists all aliases registered for an entity, in insertion (id) order.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
            "SELECT id, entity_id, alias_name, created_at
            FROM graph_entity_aliases
            WHERE entity_id = ?
            ORDER BY id ASC"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
341
    /// Collects every entity into a `Vec` by draining
    /// [`Self::all_entities_stream`].
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any streamed row fails.
    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
        use futures::TryStreamExt as _;
        self.all_entities_stream().try_collect().await
    }
351
    /// Counts all rows in `graph_entities`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
363
    /// Inserts a semantic edge; convenience wrapper over
    /// [`Self::insert_edge_typed`] with [`EdgeType::Semantic`].
    ///
    /// # Errors
    /// Same error conditions as [`Self::insert_edge_typed`].
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        self.insert_edge_typed(
            source_entity_id,
            target_entity_id,
            relation,
            fact,
            confidence,
            episode_id,
            EdgeType::Semantic,
        )
        .await
    }
401
    /// Inserts an edge, deduplicating against an existing *active* edge
    /// with the same (source, target, relation, edge_type).
    ///
    /// If such an edge exists, its confidence is raised to the max of the
    /// stored and new values and its id is returned; otherwise a new row is
    /// inserted. Both paths run inside one transaction so the
    /// check-then-write is not racy against concurrent inserters on the
    /// same connection pool. Self-loops are rejected up front and
    /// `confidence` is clamped to `[0.0, 1.0]`.
    ///
    /// # Errors
    /// Returns [`MemoryError::InvalidInput`] for self-loops, or a database
    /// error from any of the queries / commit.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Only active edges (valid_to IS NULL) participate in dedup;
        // invalidated history rows never block a re-insert.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
            WHERE source_entity_id = ?
            AND target_entity_id = ?
            AND relation = ?
            AND edge_type = ?
            AND valid_to IS NULL
            LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Keep the strongest evidence seen so far.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
            (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
481
    /// Marks an edge as no longer valid by stamping both `valid_to`
    /// (bitemporal end) and `expired_at` (retention clock) with now.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
            WHERE id = ?"
        ))
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
497
    /// Invalidates `old_edge_id` like [`Self::invalidate_edge`] and records
    /// `new_edge_id` in `superseded_by`, preserving the replacement chain.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn invalidate_edge_with_supersession(
        &self,
        old_edge_id: i64,
        new_edge_id: i64,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges
            SET valid_to = CURRENT_TIMESTAMP,
                expired_at = CURRENT_TIMESTAMP,
                superseded_by = ?
            WHERE id = ?"
        ))
        .bind(new_edge_id)
        .bind(old_edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
523
524 pub async fn edges_for_entities(
541 &self,
542 entity_ids: &[i64],
543 edge_types: &[super::types::EdgeType],
544 ) -> Result<Vec<Edge>, MemoryError> {
545 const MAX_BATCH_ENTITIES: usize = 490;
549
550 let mut all_edges: Vec<Edge> = Vec::new();
551
552 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
553 let edges = self.query_batch_edges(chunk, edge_types).await?;
554 all_edges.extend(edges);
555 }
556
557 Ok(all_edges)
558 }
559
560 async fn query_batch_edges(
564 &self,
565 entity_ids: &[i64],
566 edge_types: &[super::types::EdgeType],
567 ) -> Result<Vec<Edge>, MemoryError> {
568 if entity_ids.is_empty() {
569 return Ok(Vec::new());
570 }
571
572 let n_ids = entity_ids.len();
575 let n_types = edge_types.len();
576
577 let sql = if n_types == 0 {
578 let placeholders = placeholder_list(1, n_ids);
580 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
581 format!(
582 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
583 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
584 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
585 FROM graph_edges
586 WHERE valid_to IS NULL
587 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
588 )
589 } else {
590 let placeholders = placeholder_list(1, n_ids);
591 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
592 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
593 format!(
594 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
595 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
596 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
597 FROM graph_edges
598 WHERE valid_to IS NULL
599 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
600 AND edge_type IN ({type_placeholders})"
601 )
602 };
603
604 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
606 for id in entity_ids {
607 query = query.bind(*id);
608 }
609 for id in entity_ids {
610 query = query.bind(*id);
611 }
612 for et in edge_types {
613 query = query.bind(et.as_str());
614 }
615
616 let rows: Vec<EdgeRow> = tokio::time::timeout(
620 std::time::Duration::from_millis(500),
621 query.fetch_all(&self.pool),
622 )
623 .await
624 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
625 Ok(rows.into_iter().map(edge_from_row).collect())
626 }
627
    /// Returns all active edges where the entity appears as either source
    /// or target.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL
            AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
648
    /// Returns up to `limit` edges (active *and* invalidated) touching the
    /// entity, newest `valid_from` first — the entity's temporal history.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if `limit` overflows `i64` or the query fails.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE source_entity_id = ? OR target_entity_id = ?
            ORDER BY valid_from DESC
            LIMIT ?"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
677
    /// Returns active edges between two entities in either direction
    /// (a→b or b→a).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL
            AND ((source_entity_id = ? AND target_entity_id = ?)
            OR (source_entity_id = ? AND target_entity_id = ?))"
        ))
        .bind(entity_a)
        .bind(entity_b)
        .bind(entity_b)
        .bind(entity_a)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
705
    /// Returns active edges with this exact directed (source, target) pair;
    /// unlike [`Self::edges_between`], direction matters.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL
            AND source_entity_id = ?
            AND target_entity_id = ?"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
731
    /// Counts edges whose `valid_to` is still NULL (currently active).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }
745
    /// Returns `(edge_type, count)` pairs over active edges, sorted by
    /// edge type — useful for diagnostics and stats.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(
            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
759
    /// Inserts or updates a community keyed on `name`. Membership is stored
    /// as a JSON array of entity ids; on conflict the summary and members
    /// are replaced, the fingerprint is kept unless a new one is given, and
    /// `updated_at` is refreshed. Returns the community's row id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if JSON serialization or the query fails.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
            VALUES (?, ?, ?, ?)
            ON CONFLICT(name) DO UPDATE SET
                summary = excluded.summary,
                entity_ids = excluded.entity_ids,
                fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                updated_at = CURRENT_TIMESTAMP
            RETURNING id"
        ))
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
797
    /// Maps each non-NULL community fingerprint to its community id,
    /// letting callers detect unchanged communities cheaply.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
812
    /// Deletes a single community row by id; a missing id is a no-op.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails.
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
825
    /// Nulls out a community's fingerprint so it will be treated as stale
    /// and rebuilt on the next community-detection pass.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
        ))
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
843
844 pub async fn community_for_entity(
853 &self,
854 entity_id: i64,
855 ) -> Result<Option<Community>, MemoryError> {
856 let row: Option<CommunityRow> = zeph_db::query_as(
857 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
858 FROM graph_communities c, json_each(c.entity_ids) j
859 WHERE CAST(j.value AS INTEGER) = ?
860 LIMIT 1"),
861 )
862 .bind(entity_id)
863 .fetch_optional(&self.pool)
864 .await?;
865 match row {
866 Some(row) => {
867 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
868 Ok(Some(Community {
869 id: row.id,
870 name: row.name,
871 summary: row.summary,
872 entity_ids,
873 fingerprint: row.fingerprint,
874 created_at: row.created_at,
875 updated_at: row.updated_at,
876 }))
877 }
878 None => Ok(None),
879 }
880 }
881
    /// Loads every community ordered by id, deserializing each stored
    /// membership JSON array into `Vec<i64>`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails or any row's
    /// `entity_ids` JSON cannot be parsed.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
            FROM graph_communities
            ORDER BY id ASC"
        ))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
911
    /// Counts all rows in `graph_communities`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn community_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
923
    /// Reads a value from the `graph_metadata` key/value table, or `None`
    /// when the key is absent.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
939
    /// Writes (or overwrites) a key/value pair in `graph_metadata`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the upsert fails.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
            ON CONFLICT(key) DO UPDATE SET value = excluded.value"
        ))
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
956
957 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
965 let val = self.get_metadata("extraction_count").await?;
966 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
967 }
968
    /// Streams every active edge ordered by id, converting rows lazily so
    /// callers can process large graphs without buffering them all.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL
            ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
983
    /// Keyset-paginates active edges: returns up to `limit` edges with
    /// `id > after_id`, ascending. Pass the last returned id to get the
    /// next page.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL AND id > ?
            ORDER BY id ASC
            LIMIT ?"
        ))
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1020
1021 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1027 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1028 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1029 FROM graph_communities
1030 WHERE id = ?"
1031 ))
1032 .bind(id)
1033 .fetch_optional(&self.pool)
1034 .await?;
1035 match row {
1036 Some(row) => {
1037 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1038 Ok(Some(Community {
1039 id: row.id,
1040 name: row.name,
1041 summary: row.summary,
1042 entity_ids,
1043 fingerprint: row.fingerprint,
1044 created_at: row.created_at,
1045 updated_at: row.updated_at,
1046 }))
1047 }
1048 None => Ok(None),
1049 }
1050 }
1051
    /// Deletes every community row (e.g. before a full re-clustering pass).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails.
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities"))
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1063
    /// Like [`Self::find_entities_fuzzy`], but returns each entity with a
    /// relevance score normalized to `[0.0, 1.0]`.
    ///
    /// FTS hits are scored with negated bm25 (name weighted 10x over
    /// summary; negation makes higher = better), alias LIKE hits get a
    /// fixed 0.5. Results are merged with UNION ALL, sorted by score,
    /// deduplicated by entity id (keeping the highest-scored occurrence),
    /// and normalized by the max raw score.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if `limit` overflows `i64` or the query fails.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Tuple row: entity columns plus the raw fts_rank score.
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap input length without splitting a multi-byte character.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // "foo bar" -> "foo* bar*" (prefix match per token).
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        // bm25 returns lower-is-better; negate so higher = more relevant.
        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION ALL \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             0.5 AS fts_rank \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards so the user's text matches literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Guard against a non-positive max so normalization never divides
        // by zero or flips signs.
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Rows arrive sorted by score; keep only the first (best) row per id.
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings fall back to Concept rather than erroring.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1197
    /// Computes a structural importance score in `[0.0, 1.0]` for each
    /// requested entity: `0.6 * normalized_degree + 0.4 * type_diversity`,
    /// where degree counts active incident edges (both directions) and
    /// diversity is the distinct edge-type count capped at 4.
    ///
    /// Entities with no active edges score 0. Ids are processed in chunks
    /// of 163 because each chunk binds 3 IN-lists (489 placeholders).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk query fails.
    pub async fn entity_structural_scores(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, f32>, MemoryError> {
        // 3 IN-lists per chunk: 163 * 3 = 489 placeholders, under the cap.
        const MAX_BATCH: usize = 163;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let n = chunk.len();
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);

            // UNION ALL folds source- and target-side incidences into one
            // relation before grouping, so degree counts both directions.
            let sql = format!(
                "SELECT entity_id,
                COUNT(*) AS degree,
                COUNT(DISTINCT edge_type) AS type_diversity
                FROM (
                SELECT source_entity_id AS entity_id, edge_type
                FROM graph_edges
                WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
                UNION ALL
                SELECT target_entity_id AS entity_id, edge_type
                FROM graph_edges
                WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
                )
                WHERE entity_id IN ({ph3})
                GROUP BY entity_id"
            );

            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
            // Bind the same id set three times, matching ph1/ph2/ph3 order.
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }

            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
            all_rows.extend(chunk_rows);
        }

        if all_rows.is_empty() {
            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
        }

        // Normalize degree by the max in this batch; guard against 0.
        let max_degree = all_rows
            .iter()
            .map(|(_, d, _)| *d)
            .max()
            .unwrap_or(1)
            .max(1);

        // Start everyone at 0 so entities with no edges are still present.
        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
        for (entity_id, degree, type_diversity) in all_rows {
            #[allow(clippy::cast_precision_loss)]
            let norm_degree = degree as f32 / max_degree as f32;
            #[allow(clippy::cast_precision_loss)]
            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
            scores.insert(entity_id, score);
        }

        Ok(scores)
    }
1284
    /// Maps each entity id to the id of a community containing it, built
    /// via `community_ids_sql`. Entities in no community are absent from
    /// the result. Ids are chunked to respect placeholder limits.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk query fails.
    pub async fn entity_community_ids(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, i64>, MemoryError> {
        const MAX_BATCH: usize = 490;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut result: HashMap<i64, i64> = HashMap::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = placeholder_list(1, chunk.len());

            let community_sql = community_ids_sql(&placeholders);
            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
            for id in chunk {
                query = query.bind(*id);
            }

            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
            result.extend(rows);
        }

        Ok(result)
    }
1319
    /// Bumps `retrieval_count` and stamps `last_retrieved_at` (dialect
    /// epoch-now expression) for every listed edge, in chunks of 490 to
    /// stay under placeholder limits.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk update fails.
    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
        const MAX_BATCH: usize = 490;
        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        for chunk in edge_ids.chunks(MAX_BATCH) {
            let edge_placeholders = placeholder_list(1, chunk.len());
            let retrieval_sql = format!(
                "UPDATE graph_edges \
                SET retrieval_count = retrieval_count + 1, \
                last_retrieved_at = {epoch_now} \
                WHERE id IN ({edge_placeholders})"
            );
            let mut q = zeph_db::query(&retrieval_sql);
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1347
    /// Multiplicatively decays `retrieval_count` (by `decay_lambda`, floored
    /// at 0) on active edges that have not been retrieved within
    /// `interval_secs`. Returns how many rows were decayed.
    ///
    /// The raw SQL uses `?` markers and is rewritten to the active
    /// dialect's placeholder syntax before execution.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails or the affected-row
    /// count does not fit in `usize`.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let decay_raw = format!(
            "UPDATE graph_edges \
            SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
            WHERE valid_to IS NULL \
            AND retrieval_count > 0 \
            AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate rather than fail on an absurdly large interval.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1377
    /// Permanently deletes edges that expired more than `retention_days`
    /// ago. Returns the number of rows removed.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails or the affected-row
    /// count does not fit in `usize`.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_edges
            WHERE expired_at IS NOT NULL
            AND expired_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1395
    /// Deletes entities that have no active edges (neither as source nor
    /// target) and were last seen more than `retention_days` ago. Returns
    /// the number of rows removed.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails or the affected-row
    /// count does not fit in `usize`.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
            WHERE id NOT IN (
                SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                UNION
                SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
            )
            AND last_seen_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1417
    /// Enforces an upper bound on the entity table: when the count exceeds
    /// `max_entities`, deletes the excess, evicting lowest-degree (by
    /// active edges) and then oldest-seen entities first. Returns the
    /// number of rows removed (0 when under the cap).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the count/delete fails or a conversion
    /// overflows.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
            WHERE id IN (
                SELECT e.id
                FROM graph_entities e
                LEFT JOIN (
                    SELECT source_entity_id AS eid, COUNT(*) AS cnt
                    FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
                    UNION ALL
                    SELECT target_entity_id AS eid, COUNT(*) AS cnt
                    FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
                ) edge_counts ON e.id = edge_counts.eid
                ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
                LIMIT ?
            )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1454
    /// Point-in-time view: returns edges incident to the entity that were
    /// valid at `timestamp` — still-active edges that started on or before
    /// it, plus invalidated edges whose `[valid_from, valid_to)` window
    /// covers it.
    ///
    /// Bind order follows the two UNION ALL branches: (ts, id, id) then
    /// (ts, ts, id, id).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NULL
            AND valid_from <= ?
            AND (source_entity_id = ? OR target_entity_id = ?)
            UNION ALL
            SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
            valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
            edge_type, retrieval_count, last_retrieved_at, superseded_by
            FROM graph_edges
            WHERE valid_to IS NOT NULL
            AND valid_from <= ?
            AND valid_to > ?
            AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1505
1506 pub async fn edge_history(
1515 &self,
1516 source_entity_id: i64,
1517 predicate: &str,
1518 relation: Option<&str>,
1519 limit: usize,
1520 ) -> Result<Vec<Edge>, MemoryError> {
1521 let escaped = predicate
1523 .replace('\\', "\\\\")
1524 .replace('%', "\\%")
1525 .replace('_', "\\_");
1526 let like_pattern = format!("%{escaped}%");
1527 let limit = i64::try_from(limit)?;
1528 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1529 zeph_db::query_as(sql!(
1530 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1531 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1532 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1533 FROM graph_edges
1534 WHERE source_entity_id = ?
1535 AND fact LIKE ? ESCAPE '\\'
1536 AND relation = ?
1537 ORDER BY valid_from DESC
1538 LIMIT ?"
1539 ))
1540 .bind(source_entity_id)
1541 .bind(&like_pattern)
1542 .bind(rel)
1543 .bind(limit)
1544 .fetch_all(&self.pool)
1545 .await?
1546 } else {
1547 zeph_db::query_as(sql!(
1548 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1549 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1550 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1551 FROM graph_edges
1552 WHERE source_entity_id = ?
1553 AND fact LIKE ? ESCAPE '\\'
1554 ORDER BY valid_from DESC
1555 LIMIT ?"
1556 ))
1557 .bind(source_entity_id)
1558 .bind(&like_pattern)
1559 .bind(limit)
1560 .fetch_all(&self.pool)
1561 .await?
1562 };
1563 Ok(rows.into_iter().map(edge_from_row).collect())
1564 }
1565
1566 pub async fn bfs(
1583 &self,
1584 start_entity_id: i64,
1585 max_hops: u32,
1586 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1587 self.bfs_with_depth(start_entity_id, max_hops)
1588 .await
1589 .map(|(e, ed, _)| (e, ed))
1590 }
1591
1592 pub async fn bfs_with_depth(
1603 &self,
1604 start_entity_id: i64,
1605 max_hops: u32,
1606 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1607 self.bfs_core(start_entity_id, max_hops, None).await
1608 }
1609
1610 pub async fn bfs_at_timestamp(
1621 &self,
1622 start_entity_id: i64,
1623 max_hops: u32,
1624 timestamp: &str,
1625 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1626 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1627 .await
1628 }
1629
1630 pub async fn bfs_typed(
1646 &self,
1647 start_entity_id: i64,
1648 max_hops: u32,
1649 edge_types: &[EdgeType],
1650 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1651 if edge_types.is_empty() {
1652 return self.bfs_with_depth(start_entity_id, max_hops).await;
1653 }
1654 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1655 .await
1656 }
1657
    /// Core BFS: expands the frontier one hop at a time up to `max_hops`,
    /// following active edges (or, when `at_timestamp` is set, edges valid at
    /// that instant), then materialises the visited subgraph.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each hop's frontier so the per-query bind count stays bounded
        // (three id lists of up to 300 plus one optional timestamp).
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // One shared placeholder numbering: CASE test list, source filter
            // list, target filter list, then (optionally) the timestamp.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // Edges are undirected for traversal purposes: the neighbour is
            // whichever endpoint is NOT in the current frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind the frontier three times, matching ph1/ph2/ph3 order.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only expand nodes not already seen at a shallower depth.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1737
    /// Typed variant of [`Self::bfs_core`]: traversal only follows edges whose
    /// `edge_type` is in `edge_types`.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each hop's frontier so the per-query bind count stays bounded.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Placeholder numbering: edge-type list first, then the three frontier
        // id lists, then (optionally) the timestamp.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // Edges are undirected for traversal purposes: the neighbour is
            // whichever endpoint is NOT in the current frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order mirrors placeholder numbering: types, then the
            // frontier three times, then the timestamp.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only expand nodes not already seen at a shallower depth.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1834
1835 async fn bfs_fetch_results_typed(
1843 &self,
1844 depth_map: std::collections::HashMap<i64, u32>,
1845 at_timestamp: Option<&str>,
1846 type_strs: &[&str],
1847 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1848 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1849 if visited_ids.is_empty() {
1850 return Ok((Vec::new(), Vec::new(), depth_map));
1851 }
1852 if visited_ids.len() > 499 {
1853 tracing::warn!(
1854 total = visited_ids.len(),
1855 retained = 499,
1856 "bfs_fetch_results_typed: visited entity set truncated to 499"
1857 );
1858 visited_ids.truncate(499);
1859 }
1860
1861 let n_types = type_strs.len();
1862 let n_visited = visited_ids.len();
1863
1864 let type_in = placeholder_list(1, n_types);
1866 let id_start = n_types + 1;
1867 let ph_ids1 = placeholder_list(id_start, n_visited);
1868 let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1869
1870 let edge_filter = if at_timestamp.is_some() {
1871 let ts_pos = id_start + n_visited * 2;
1872 format!(
1873 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1874 ts = numbered_placeholder(ts_pos),
1875 )
1876 } else {
1877 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1878 };
1879
1880 let edge_sql = format!(
1881 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1882 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1883 edge_type, retrieval_count, last_retrieved_at, superseded_by
1884 FROM graph_edges
1885 WHERE {edge_filter}
1886 AND source_entity_id IN ({ph_ids1})
1887 AND target_entity_id IN ({ph_ids2})"
1888 );
1889 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1890 for t in type_strs {
1891 edge_query = edge_query.bind(*t);
1892 }
1893 for id in &visited_ids {
1894 edge_query = edge_query.bind(*id);
1895 }
1896 for id in &visited_ids {
1897 edge_query = edge_query.bind(*id);
1898 }
1899 if let Some(ts) = at_timestamp {
1900 edge_query = edge_query.bind(ts);
1901 }
1902 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1903
1904 let entity_sql2 = format!(
1906 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1907 FROM graph_entities WHERE id IN ({ph})",
1908 ph = placeholder_list(1, visited_ids.len()),
1909 );
1910 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1911 for id in &visited_ids {
1912 entity_query = entity_query.bind(*id);
1913 }
1914 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1915
1916 let entities: Vec<Entity> = entity_rows
1917 .into_iter()
1918 .map(entity_from_row)
1919 .collect::<Result<Vec<_>, _>>()?;
1920 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1921
1922 Ok((entities, edges, depth_map))
1923 }
1924
    /// Materialises the visited subgraph for a completed BFS: loads the
    /// visited entities plus the edges whose BOTH endpoints were visited,
    /// and returns them with the depth map.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // The edge query binds every visited id twice plus one optional
        // timestamp: 499 * 2 + 1 = 999, exactly the assumed SQLite bind cap.
        // NOTE(review): truncation drops entities in HashMap key order, i.e.
        // effectively arbitrary ones — not necessarily the deepest.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        let n = visited_ids.len();
        // Placeholder numbering: source-id list, target-id list, timestamp.
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entity lookup is a fresh statement, so numbering restarts at 1.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1998
1999 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2015 let find_by_name_sql = format!(
2016 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2017 FROM graph_entities \
2018 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2019 LIMIT 5",
2020 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2021 );
2022 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2023 .bind(name)
2024 .bind(name)
2025 .fetch_all(&self.pool)
2026 .await?;
2027
2028 if !rows.is_empty() {
2029 return rows.into_iter().map(entity_from_row).collect();
2030 }
2031
2032 self.find_entities_fuzzy(name, 5).await
2033 }
2034
2035 pub async fn unprocessed_messages_for_backfill(
2043 &self,
2044 limit: usize,
2045 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2046 let limit = i64::try_from(limit)?;
2047 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2048 "SELECT id, content FROM messages
2049 WHERE graph_processed = 0
2050 ORDER BY id ASC
2051 LIMIT ?"
2052 ))
2053 .bind(limit)
2054 .fetch_all(&self.pool)
2055 .await?;
2056 Ok(rows
2057 .into_iter()
2058 .map(|(id, content)| (crate::types::MessageId(id), content))
2059 .collect())
2060 }
2061
2062 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2068 let count: i64 = zeph_db::query_scalar(sql!(
2069 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2070 ))
2071 .fetch_one(&self.pool)
2072 .await?;
2073 Ok(count)
2074 }
2075
2076 pub async fn mark_messages_graph_processed(
2082 &self,
2083 ids: &[crate::types::MessageId],
2084 ) -> Result<(), MemoryError> {
2085 const MAX_BATCH: usize = 490;
2086 if ids.is_empty() {
2087 return Ok(());
2088 }
2089 for chunk in ids.chunks(MAX_BATCH) {
2090 let placeholders = placeholder_list(1, chunk.len());
2091 let sql =
2092 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2093 let mut query = zeph_db::query(&sql);
2094 for id in chunk {
2095 query = query.bind(id.0);
2096 }
2097 query.execute(&self.pool).await?;
2098 }
2099 Ok(())
2100 }
2101}
2102
/// Builds the SQLite query mapping entity ids to the communities containing
/// them, by expanding each community's `entity_ids` JSON array via
/// `json_each`. `placeholders` is a pre-rendered placeholder list for the
/// entity-id filter.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2113
/// Postgres counterpart of the SQLite `community_ids_sql`: expands each
/// community's `entity_ids` JSON array via `jsonb_array_elements_text` and
/// casts elements to `bigint` for the id filter.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2123
/// Raw row shape for `graph_entities` queries; converted to the public
/// [`Entity`] type by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Surface form as last written.
    name: String,
    // Normalised name; unique together with entity_type (see upsert_entity's
    // ON CONFLICT clause).
    canonical_name: String,
    // Stored as text; parsed into EntityType during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Set when the entity has an embedding stored in Qdrant.
    qdrant_point_id: Option<String>,
}
2137
2138fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2139 let entity_type = row
2140 .entity_type
2141 .parse::<EntityType>()
2142 .map_err(MemoryError::GraphStore)?;
2143 Ok(Entity {
2144 id: row.id,
2145 name: row.name,
2146 canonical_name: row.canonical_name,
2147 entity_type,
2148 summary: row.summary,
2149 first_seen_at: row.first_seen_at,
2150 last_seen_at: row.last_seen_at,
2151 qdrant_point_id: row.qdrant_point_id,
2152 })
2153}
2154
/// Raw row shape for entity-alias queries; converted to the public
/// [`EntityAlias`] type by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias points at.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2162
2163fn alias_from_row(row: AliasRow) -> EntityAlias {
2164 EntityAlias {
2165 id: row.id,
2166 entity_id: row.entity_id,
2167 alias_name: row.alias_name,
2168 created_at: row.created_at,
2169 }
2170}
2171
/// Raw row shape for `graph_edges` queries; converted to the public
/// [`Edge`] type by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Stored as SQL REAL (f64); narrowed to f32 during conversion.
    confidence: f64,
    valid_from: String,
    // NULL while the edge is still active.
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Column is named `episode_id` in the schema but exposed as the source
    // message id on the public Edge type.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into EdgeType during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Id of the edge that replaced this one, if any.
    superseded_by: Option<i64>,
}
2192
2193fn edge_from_row(row: EdgeRow) -> Edge {
2194 let edge_type = row
2195 .edge_type
2196 .parse::<EdgeType>()
2197 .unwrap_or(EdgeType::Semantic);
2198 Edge {
2199 id: row.id,
2200 source_entity_id: row.source_entity_id,
2201 target_entity_id: row.target_entity_id,
2202 relation: row.relation,
2203 fact: row.fact,
2204 #[allow(clippy::cast_possible_truncation)]
2205 confidence: row.confidence as f32,
2206 valid_from: row.valid_from,
2207 valid_to: row.valid_to,
2208 created_at: row.created_at,
2209 expired_at: row.expired_at,
2210 source_message_id: row.source_message_id.map(MessageId),
2211 qdrant_point_id: row.qdrant_point_id,
2212 edge_type,
2213 retrieval_count: row.retrieval_count,
2214 last_retrieved_at: row.last_retrieved_at,
2215 superseded_by: row.superseded_by,
2216 }
2217}
2218
/// Raw row shape for `graph_communities` queries.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON array of member entity ids, stored as text (expanded by
    // community_ids_sql via json_each / jsonb_array_elements_text).
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2229
2230impl GraphStore {
2233 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2241 zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2245 .bind(conversation_id)
2246 .execute(&self.pool)
2247 .await?;
2248
2249 let id: i64 = zeph_db::query_scalar(sql!(
2250 "INSERT INTO graph_episodes (conversation_id)
2251 VALUES (?)
2252 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2253 RETURNING id"
2254 ))
2255 .bind(conversation_id)
2256 .fetch_one(&self.pool)
2257 .await?;
2258 Ok(id)
2259 }
2260
2261 pub async fn link_entity_to_episode(
2269 &self,
2270 episode_id: i64,
2271 entity_id: i64,
2272 ) -> Result<(), MemoryError> {
2273 zeph_db::query(sql!(
2274 "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2275 VALUES (?, ?)"
2276 ))
2277 .bind(episode_id)
2278 .bind(entity_id)
2279 .execute(&self.pool)
2280 .await?;
2281 Ok(())
2282 }
2283
2284 pub async fn episodes_for_entity(
2290 &self,
2291 entity_id: i64,
2292 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2293 #[derive(zeph_db::FromRow)]
2294 struct EpisodeRow {
2295 id: i64,
2296 conversation_id: i64,
2297 created_at: String,
2298 closed_at: Option<String>,
2299 }
2300 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2301 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2302 FROM graph_episodes e
2303 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2304 WHERE ee.entity_id = ?"
2305 ))
2306 .bind(entity_id)
2307 .fetch_all(&self.pool)
2308 .await?;
2309 Ok(rows
2310 .into_iter()
2311 .map(|r| super::types::Episode {
2312 id: r.id,
2313 conversation_id: r.conversation_id,
2314 created_at: r.created_at,
2315 closed_at: r.closed_at,
2316 })
2317 .collect())
2318 }
2319}
2320
2321#[cfg(test)]
2324mod tests;