1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::MessageId;
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
/// Persistence layer for the knowledge graph: entities, edges, aliases,
/// communities, and key/value metadata, all backed by a shared [`DbPool`].
pub struct GraphStore {
    // Shared connection pool; every query in this store goes through it.
    pool: DbPool,
}
33
34impl GraphStore {
35 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[must_use]
45 pub fn pool(&self) -> &DbPool {
46 &self.pool
47 }
48
    /// Inserts an entity or, on a `(canonical_name, entity_type)` conflict,
    /// refreshes its surface `name`, its `summary` (only when a new one is
    /// provided), and `last_seen_at`. Returns the row id in either case.
    ///
    /// # Errors
    ///
    /// Returns an error if the database query fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        // COALESCE keeps the previously stored summary when the caller
        // passes None for this upsert.
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
86
    /// Looks up an entity by its exact canonical name and type.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or the row cannot be decoded.
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE canonical_name = ? AND entity_type = ?"),
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        // entity_from_row is fallible; transpose turns Option<Result> into Result<Option>.
        row.map(entity_from_row).transpose()
    }
109
    /// Fetches a single entity by primary key, or `None` if absent.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or the row cannot be decoded.
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE id = ?"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
126
    /// Records the Qdrant vector-point id associated with an entity row.
    ///
    /// No-op (zero rows affected) when `entity_id` does not exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
        ))
        .bind(point_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
146
147 pub async fn find_entities_fuzzy(
168 &self,
169 query: &str,
170 limit: usize,
171 ) -> Result<Vec<Entity>, MemoryError> {
172 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
176 let query = &query[..query.floor_char_boundary(512)];
177 let sanitized = sanitize_fts_query(query);
180 if sanitized.is_empty() {
181 return Ok(vec![]);
182 }
183 let fts_query: String = sanitized
184 .split_whitespace()
185 .filter(|t| !FTS5_OPERATORS.contains(t))
186 .map(|t| format!("{t}*"))
187 .collect::<Vec<_>>()
188 .join(" ");
189 if fts_query.is_empty() {
190 return Ok(vec![]);
191 }
192
193 let limit = i64::try_from(limit)?;
194 let search_sql = format!(
197 "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
198 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
199 FROM graph_entities_fts fts \
200 JOIN graph_entities e ON e.id = fts.rowid \
201 WHERE graph_entities_fts MATCH ? \
202 UNION \
203 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
204 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
205 FROM graph_entity_aliases a \
206 JOIN graph_entities e ON e.id = a.entity_id \
207 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
208 LIMIT ?",
209 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
210 );
211 let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
212 .bind(&fts_query)
213 .bind(format!(
214 "%{}%",
215 query
216 .trim()
217 .replace('\\', "\\\\")
218 .replace('%', "\\%")
219 .replace('_', "\\_")
220 ))
221 .bind(limit)
222 .fetch_all(&self.pool)
223 .await?;
224 rows.into_iter()
225 .map(entity_from_row)
226 .collect::<Result<Vec<_>, _>>()
227 }
228
    /// Runs a passive SQLite WAL checkpoint (does not block readers/writers).
    ///
    /// # Errors
    ///
    /// Returns an error if the pragma fails to execute.
    #[cfg(feature = "sqlite")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
245
    /// No-op on Postgres, which manages its own WAL; kept for API parity
    /// with the SQLite build.
    ///
    /// # Errors
    ///
    /// Never fails.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
255
    /// Streams every entity row ordered by ascending id, decoding lazily so
    /// large graphs are not buffered in memory all at once.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        // and_then: entity_from_row is itself fallible.
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
268
    /// Registers an alias for an entity; idempotent — the dialect's
    /// insert-ignore / on-conflict-do-nothing form swallows duplicates.
    ///
    /// # Errors
    ///
    /// Returns an error if the insert fails.
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        // One dialect fills INSERT_IGNORE, the other CONFLICT_NOTHING; the
        // unused constant expands to an empty string.
        let insert_alias_sql = format!(
            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        zeph_db::query(&insert_alias_sql)
            .bind(entity_id)
            .bind(alias_name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
289
    /// Resolves an alias (case-insensitively) to its entity, constrained to
    /// `entity_type`. Ties are broken deterministically by lowest entity id.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or the row cannot be decoded.
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
321
    /// Lists all aliases registered for an entity, oldest (lowest id) first.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?
             ORDER BY id ASC"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
342
343 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
349 use futures::TryStreamExt as _;
350 self.all_entities_stream().try_collect().await
351 }
352
    /// Counts all entity rows (active and orphaned alike).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
364
365 pub async fn insert_edge(
383 &self,
384 source_entity_id: i64,
385 target_entity_id: i64,
386 relation: &str,
387 fact: &str,
388 confidence: f32,
389 episode_id: Option<MessageId>,
390 ) -> Result<i64, MemoryError> {
391 self.insert_edge_typed(
392 source_entity_id,
393 target_entity_id,
394 relation,
395 fact,
396 confidence,
397 episode_id,
398 EdgeType::Semantic,
399 )
400 .await
401 }
402
    /// Inserts an edge, or merges into an existing *active* edge with the
    /// same `(source, target, relation, edge_type)` by keeping the higher
    /// confidence. Runs inside a transaction so the check-then-write pair is
    /// atomic. Returns the id of the inserted or merged edge.
    ///
    /// `confidence` is clamped to `[0.0, 1.0]` before use. Note that on the
    /// merge path the stored `fact` and `episode_id` are left untouched.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError::InvalidInput`] for self-loop edges, or a
    /// database error if any statement or the commit fails.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Only edges that are still valid (valid_to IS NULL) are merge candidates.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
               AND target_entity_id = ?
               AND relation = ?
               AND edge_type = ?
               AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Merge: confidence is monotonically non-decreasing.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
482
    /// Closes an edge's validity window: sets both `valid_to` and
    /// `expired_at` to the current timestamp.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
             WHERE id = ?"
        ))
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
498
    /// Like [`Self::invalidate_edge`], but also records which edge replaced
    /// the old one via `superseded_by`, preserving the supersession chain.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub async fn invalidate_edge_with_supersession(
        &self,
        old_edge_id: i64,
        new_edge_id: i64,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges
             SET valid_to = CURRENT_TIMESTAMP,
                 expired_at = CURRENT_TIMESTAMP,
                 superseded_by = ?
             WHERE id = ?"
        ))
        .bind(new_edge_id)
        .bind(old_edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
524
525 pub async fn edges_for_entities(
542 &self,
543 entity_ids: &[i64],
544 edge_types: &[super::types::EdgeType],
545 ) -> Result<Vec<Edge>, MemoryError> {
546 const MAX_BATCH_ENTITIES: usize = 490;
550
551 let mut all_edges: Vec<Edge> = Vec::new();
552
553 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
554 let edges = self.query_batch_edges(chunk, edge_types).await?;
555 all_edges.extend(edges);
556 }
557
558 Ok(all_edges)
559 }
560
561 async fn query_batch_edges(
569 &self,
570 entity_ids: &[i64],
571 edge_types: &[super::types::EdgeType],
572 ) -> Result<Vec<Edge>, MemoryError> {
573 if entity_ids.is_empty() {
574 return Ok(Vec::new());
575 }
576
577 let n_ids = entity_ids.len();
580 let n_types = edge_types.len();
581
582 let sql = if n_types == 0 {
583 let placeholders = placeholder_list(1, n_ids);
585 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
586 format!(
587 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
588 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
589 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
590 FROM graph_edges
591 WHERE valid_to IS NULL
592 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
593 )
594 } else {
595 let placeholders = placeholder_list(1, n_ids);
596 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
597 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
598 format!(
599 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
600 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
601 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
602 FROM graph_edges
603 WHERE valid_to IS NULL
604 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
605 AND edge_type IN ({type_placeholders})"
606 )
607 };
608
609 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
611 for id in entity_ids {
612 query = query.bind(*id);
613 }
614 for id in entity_ids {
615 query = query.bind(*id);
616 }
617 for et in edge_types {
618 query = query.bind(et.as_str());
619 }
620
621 let rows: Vec<EdgeRow> = tokio::time::timeout(
625 std::time::Duration::from_millis(500),
626 query.fetch_all(&self.pool),
627 )
628 .await
629 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
630 Ok(rows.into_iter().map(edge_from_row).collect())
631 }
632
    /// Fetches all active edges where the entity is either endpoint.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
653
    /// Fetches up to `limit` edges (active *and* invalidated) touching the
    /// entity, newest `valid_from` first — a full temporal history view.
    ///
    /// # Errors
    ///
    /// Returns an error if `limit` does not fit in `i64` or the query fails.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE source_entity_id = ? OR target_entity_id = ?
             ORDER BY valid_from DESC
             LIMIT ?"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
682
    /// Fetches active edges connecting two entities in either direction.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL
               AND ((source_entity_id = ? AND target_entity_id = ?)
                 OR (source_entity_id = ? AND target_entity_id = ?))"
        ))
        // Bind both orientations: (a, b) and (b, a).
        .bind(entity_a)
        .bind(entity_b)
        .bind(entity_b)
        .bind(entity_a)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
710
    /// Fetches active edges with the exact directed `source -> target` pair
    /// (unlike [`Self::edges_between`], direction matters here).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL
               AND source_entity_id = ?
               AND target_entity_id = ?"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
736
    /// Counts edges whose validity window is still open (`valid_to IS NULL`).
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }
750
    /// Counts active edges per `edge_type`, sorted by type name for
    /// deterministic output.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(
            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
764
    /// Inserts a community or, on a `name` conflict, replaces its summary
    /// and membership, keeps the old fingerprint when `fingerprint` is
    /// `None`, and bumps `updated_at`. Returns the row id either way.
    ///
    /// `entity_ids` is stored as a JSON array in a text column.
    ///
    /// # Errors
    ///
    /// Returns an error if JSON serialization or the query fails.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
802
    /// Maps every non-NULL community fingerprint to its community id.
    ///
    /// NOTE(review): if two communities ever share a fingerprint, the later
    /// row silently wins in the map — assumed unique; confirm via schema.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
817
    /// Deletes a community row by id; no-op when the id does not exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails.
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
830
    /// Nulls out a community's fingerprint, e.g. to force it to be
    /// recomputed; no-op when the id does not exist.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
        ))
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
848
849 pub async fn community_for_entity(
858 &self,
859 entity_id: i64,
860 ) -> Result<Option<Community>, MemoryError> {
861 let row: Option<CommunityRow> = zeph_db::query_as(
862 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
863 FROM graph_communities c, json_each(c.entity_ids) j
864 WHERE CAST(j.value AS INTEGER) = ?
865 LIMIT 1"),
866 )
867 .bind(entity_id)
868 .fetch_optional(&self.pool)
869 .await?;
870 match row {
871 Some(row) => {
872 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
873 Ok(Some(Community {
874 id: row.id,
875 name: row.name,
876 summary: row.summary,
877 entity_ids,
878 fingerprint: row.fingerprint,
879 created_at: row.created_at,
880 updated_at: row.updated_at,
881 }))
882 }
883 None => Ok(None),
884 }
885 }
886
887 pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
893 let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
894 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
895 FROM graph_communities
896 ORDER BY id ASC"
897 ))
898 .fetch_all(&self.pool)
899 .await?;
900
901 rows.into_iter()
902 .map(|row| {
903 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
904 Ok(Community {
905 id: row.id,
906 name: row.name,
907 summary: row.summary,
908 entity_ids,
909 fingerprint: row.fingerprint,
910 created_at: row.created_at,
911 updated_at: row.updated_at,
912 })
913 })
914 .collect()
915 }
916
    /// Counts all community rows.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn community_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
928
    /// Reads one value from the `graph_metadata` key/value table.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
944
    /// Writes (insert-or-overwrite) one key/value pair in `graph_metadata`.
    ///
    /// # Errors
    ///
    /// Returns an error if the upsert fails.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
        ))
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
961
962 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
970 let val = self.get_metadata("extraction_count").await?;
971 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
972 }
973
    /// Streams every active edge ordered by ascending id, decoding rows
    /// lazily instead of buffering the whole edge set.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        // edge_from_row is infallible, so plain map (vs. and_then for entities).
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
988
    /// Keyset pagination over active edges: returns up to `limit` edges
    /// with `id > after_id`, ascending. Pass the last seen id to resume.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?
             ORDER BY id ASC
             LIMIT ?"
        ))
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1025
1026 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1032 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1033 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1034 FROM graph_communities
1035 WHERE id = ?"
1036 ))
1037 .bind(id)
1038 .fetch_optional(&self.pool)
1039 .await?;
1040 match row {
1041 Some(row) => {
1042 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1043 Ok(Some(Community {
1044 id: row.id,
1045 name: row.name,
1046 summary: row.summary,
1047 entity_ids,
1048 fingerprint: row.fingerprint,
1049 created_at: row.created_at,
1050 updated_at: row.updated_at,
1051 }))
1052 }
1053 None => Ok(None),
1054 }
1055 }
1056
    /// Removes every community row (e.g. before a full re-clustering pass).
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails.
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities"))
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1068
    /// Ranked fuzzy entity search. Combines FTS5 prefix matches (scored via
    /// negated bm25, so higher is better) with alias `LIKE` hits (fixed
    /// rank 0.5), sorts by rank, dedups by entity id keeping the best-ranked
    /// occurrence, and normalizes scores to `(0, 1]` by the maximum.
    ///
    /// Returns an empty list when the query sanitizes down to nothing.
    ///
    /// # Errors
    ///
    /// Returns an error if `limit` does not fit in `i64` or the query fails.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Tuple row: entity columns plus the computed fts_rank (index 8).
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap the query at 512 bytes without splitting a UTF-8 character.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Each non-operator token becomes an FTS5 prefix term ("tok*").
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        // bm25 returns lower-is-better; negate so larger fts_rank is better.
        // The 10.0/1.0 arguments weight the first FTS column over the second.
        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION ALL \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             0.5 AS fts_rank \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters; backslash first so later escapes survive.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Guard against non-positive maxima so normalization never divides by <= 0.
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Rows arrive rank-descending, so the first occurrence per id wins.
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings degrade to Concept rather than failing.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1202
    /// Computes a structural-importance score in `[0, 1]` per entity:
    /// `0.6 * normalized_degree + 0.4 * edge_type_diversity`, where degree
    /// is normalized by the batch maximum and diversity is capped at 4
    /// distinct edge types. Entities with no active edges score 0.0.
    ///
    /// # Errors
    ///
    /// Returns an error if any batch query fails.
    pub async fn entity_structural_scores(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, f32>, MemoryError> {
        // Each id is bound three times (two IN-lists in the subquery, one
        // outer filter), so 163 ids keeps placeholders under ~500.
        const MAX_BATCH: usize = 163;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let n = chunk.len();
            // Placeholder numbering is sequential across the three IN-lists.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);

            // Union source- and target-side incidences so degree counts
            // edges in both directions.
            let sql = format!(
                "SELECT entity_id,
                        COUNT(*) AS degree,
                        COUNT(DISTINCT edge_type) AS type_diversity
                 FROM (
                     SELECT source_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
                     UNION ALL
                     SELECT target_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
                 )
                 WHERE entity_id IN ({ph3})
                 GROUP BY entity_id"
            );

            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
            // Bind the chunk once per IN-list, in placeholder order.
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }

            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
            all_rows.extend(chunk_rows);
        }

        if all_rows.is_empty() {
            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
        }

        // NOTE: the degree maximum spans all chunks, so normalization is
        // consistent across the whole input set.
        let max_degree = all_rows
            .iter()
            .map(|(_, d, _)| *d)
            .max()
            .unwrap_or(1)
            .max(1);

        // Start everyone at 0.0 so ids with no edges still appear in the map.
        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
        for (entity_id, degree, type_diversity) in all_rows {
            #[allow(clippy::cast_precision_loss)]
            let norm_degree = degree as f32 / max_degree as f32;
            #[allow(clippy::cast_precision_loss)]
            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
            scores.insert(entity_id, score);
        }

        Ok(scores)
    }
1289
    /// Maps each entity id to the id of a community containing it, batched
    /// to respect placeholder limits. Entities in no community are simply
    /// absent from the result.
    ///
    /// The SQL text comes from the `community_ids_sql` helper (defined
    /// elsewhere in this module).
    ///
    /// # Errors
    ///
    /// Returns an error if any batch query fails.
    pub async fn entity_community_ids(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, i64>, MemoryError> {
        const MAX_BATCH: usize = 490;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut result: HashMap<i64, i64> = HashMap::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = placeholder_list(1, chunk.len());

            let community_sql = community_ids_sql(&placeholders);
            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
            for id in chunk {
                query = query.bind(*id);
            }

            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
            result.extend(rows);
        }

        Ok(result)
    }
1324
    /// Bumps `retrieval_count` and stamps `last_retrieved_at` (dialect epoch
    /// expression) for each edge id, batched to respect placeholder limits.
    ///
    /// # Errors
    ///
    /// Returns an error if any batch update fails.
    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
        const MAX_BATCH: usize = 490;
        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        for chunk in edge_ids.chunks(MAX_BATCH) {
            let edge_placeholders = placeholder_list(1, chunk.len());
            let retrieval_sql = format!(
                "UPDATE graph_edges \
                 SET retrieval_count = retrieval_count + 1, \
                 last_retrieved_at = {epoch_now} \
                 WHERE id IN ({edge_placeholders})"
            );
            let mut q = zeph_db::query(&retrieval_sql);
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1352
    /// Multiplies `retrieval_count` by `decay_lambda` (truncated to integer,
    /// floored at 0) for active edges not retrieved within the last
    /// `interval_secs`. Returns how many rows were decayed.
    ///
    /// # Errors
    ///
    /// Returns an error if the update fails or the affected-row count does
    /// not fit in `usize`.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
             AND retrieval_count > 0 \
             AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        // This SQL is built at runtime, so '?' markers are rewritten here
        // for the active dialect (the sql! macro normally does this).
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate absurdly large intervals instead of erroring.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1382
    /// Permanently deletes edges whose `expired_at` is older than
    /// `retention_days`. Returns the number of rows removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails or the affected-row count does
    /// not fit in `usize`.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
               AND expired_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1400
    /// Deletes entities with no active edges whose `last_seen_at` is older
    /// than `retention_days`. Returns the number of rows removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the delete fails or the affected-row count does
    /// not fit in `usize`.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id NOT IN (
                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                 UNION
                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
             )
             AND last_seen_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1422
    /// Enforces an entity budget: when the table exceeds `max_entities`,
    /// deletes the excess, evicting the least-connected and, on ties, the
    /// least-recently-seen entities first. Returns rows removed (0 if
    /// already under the cap).
    ///
    /// NOTE(review): count-then-delete is not atomic; concurrent inserts
    /// between the two statements can leave the table slightly over the cap.
    ///
    /// # Errors
    ///
    /// Returns an error if either query fails, or on count conversions.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id IN (
                 SELECT e.id
                 FROM graph_entities e
                 LEFT JOIN (
                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
                     UNION ALL
                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
                 ) edge_counts ON e.id = edge_counts.eid
                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
                 LIMIT ?
             )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1459
    /// All edges touching `entity_id` that were valid at `timestamp`.
    ///
    /// Two UNION branches: edges still open (`valid_to IS NULL`) that began
    /// on or before `timestamp`, plus edges that were closed but whose
    /// `[valid_from, valid_to)` window covers `timestamp`.
    ///
    /// NOTE: the bind order below must mirror the textual order of the `?`
    /// placeholders across both branches (ts, id, id, ts, ts, id, id).
    ///
    /// # Errors
    /// Propagates database errors.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NULL
             AND valid_from <= ?
             AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE valid_to IS NOT NULL
             AND valid_from <= ?
             AND valid_to > ?
             AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1510
1511 pub async fn edge_history(
1520 &self,
1521 source_entity_id: i64,
1522 predicate: &str,
1523 relation: Option<&str>,
1524 limit: usize,
1525 ) -> Result<Vec<Edge>, MemoryError> {
1526 let escaped = predicate
1528 .replace('\\', "\\\\")
1529 .replace('%', "\\%")
1530 .replace('_', "\\_");
1531 let like_pattern = format!("%{escaped}%");
1532 let limit = i64::try_from(limit)?;
1533 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1534 zeph_db::query_as(sql!(
1535 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1536 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1537 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
1538 FROM graph_edges
1539 WHERE source_entity_id = ?
1540 AND fact LIKE ? ESCAPE '\\'
1541 AND relation = ?
1542 ORDER BY valid_from DESC
1543 LIMIT ?"
1544 ))
1545 .bind(source_entity_id)
1546 .bind(&like_pattern)
1547 .bind(rel)
1548 .bind(limit)
1549 .fetch_all(&self.pool)
1550 .await?
1551 } else {
1552 zeph_db::query_as(sql!(
1553 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1554 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1555 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
1556 FROM graph_edges
1557 WHERE source_entity_id = ?
1558 AND fact LIKE ? ESCAPE '\\'
1559 ORDER BY valid_from DESC
1560 LIMIT ?"
1561 ))
1562 .bind(source_entity_id)
1563 .bind(&like_pattern)
1564 .bind(limit)
1565 .fetch_all(&self.pool)
1566 .await?
1567 };
1568 Ok(rows.into_iter().map(edge_from_row).collect())
1569 }
1570
1571 pub async fn bfs(
1588 &self,
1589 start_entity_id: i64,
1590 max_hops: u32,
1591 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1592 self.bfs_with_depth(start_entity_id, max_hops)
1593 .await
1594 .map(|(e, ed, _)| (e, ed))
1595 }
1596
1597 pub async fn bfs_with_depth(
1608 &self,
1609 start_entity_id: i64,
1610 max_hops: u32,
1611 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1612 self.bfs_core(start_entity_id, max_hops, None).await
1613 }
1614
1615 pub async fn bfs_at_timestamp(
1626 &self,
1627 start_entity_id: i64,
1628 max_hops: u32,
1629 timestamp: &str,
1630 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1631 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1632 .await
1633 }
1634
1635 pub async fn bfs_typed(
1651 &self,
1652 start_entity_id: i64,
1653 max_hops: u32,
1654 edge_types: &[EdgeType],
1655 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1656 if edge_types.is_empty() {
1657 return self.bfs_with_depth(start_entity_id, max_hops).await;
1658 }
1659 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1660 .await
1661 }
1662
    /// Shared BFS driver for [`GraphStore::bfs_with_depth`] and
    /// [`GraphStore::bfs_at_timestamp`].
    ///
    /// Expands the frontier hop by hop (each hop capped at `MAX_FRONTIER`
    /// nodes), recording the first hop at which each entity id was reached,
    /// then loads the matching entity and edge rows.
    ///
    /// When `at_timestamp` is `Some`, only edges valid at that instant are
    /// traversed; otherwise only open edges (`valid_to IS NULL`).
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each hop's expansion so the IN-lists stay within bind limits.
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // Numbered placeholder ranges: 1..=n, n+1..=2n, 2n+1..=3n; the
            // optional timestamp takes position 3n+1. The bind calls below
            // must follow exactly this numbering.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // For each edge touching the frontier, pick the endpoint on the
            // far side as the neighbour.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                 AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Frontier bound three times (ph1, ph2, ph3), then the timestamp.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only first-time visits get a depth and are expanded further.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1742
    /// Edge-type-filtered variant of [`GraphStore::bfs_core`]: identical
    /// traversal, but each hop's neighbour query also restricts `edge_type`
    /// to the given `edge_types`.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each hop's expansion so the IN-lists stay within bind limits.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Placeholder layout per hop: edge types occupy positions
        // 1..=n_types, the three frontier id lists follow, and the optional
        // timestamp comes last. Bind calls must follow the same order.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // For each matching edge touching the frontier, select the
            // endpoint on the far side as the neighbour.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                 AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Types first, then the frontier three times, then the timestamp
            // — mirroring the placeholder numbering above.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only first-time visits get a depth and are expanded further.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1839
    /// Load the entity and edge rows for a finished typed BFS.
    ///
    /// The visited-id list is truncated to 499 entries before querying (the
    /// edge query binds every id twice, keeping total binds under SQLite's
    /// limit). The returned `depth_map` is NOT truncated, so it may mention
    /// entities missing from the entity list.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Placeholder layout: types, then the two visited-id lists, then the
        // optional timestamp; bind calls below follow the same order.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;
        let ph_ids1 = placeholder_list(id_start, n_visited);
        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);

        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        // Only edges with BOTH endpoints in the visited set are returned.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql2 = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, visited_ids.len()),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1929
    /// Load the entity and edge rows for a finished untyped BFS.
    ///
    /// The visited-id list is truncated to 499 entries before querying (the
    /// edge query binds every id twice, keeping total binds under SQLite's
    /// limit). The returned `depth_map` is NOT truncated, so it may mention
    /// entities missing from the entity list.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Placeholder layout: two visited-id lists, then the optional
        // timestamp at 2n+1; bind calls below follow the same order.
        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        // Only edges with BOTH endpoints in the visited set are returned.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
2003
2004 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2020 let find_by_name_sql = format!(
2021 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2022 FROM graph_entities \
2023 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2024 LIMIT 5",
2025 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2026 );
2027 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2028 .bind(name)
2029 .bind(name)
2030 .fetch_all(&self.pool)
2031 .await?;
2032
2033 if !rows.is_empty() {
2034 return rows.into_iter().map(entity_from_row).collect();
2035 }
2036
2037 self.find_entities_fuzzy(name, 5).await
2038 }
2039
2040 pub async fn unprocessed_messages_for_backfill(
2048 &self,
2049 limit: usize,
2050 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2051 let limit = i64::try_from(limit)?;
2052 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2053 "SELECT id, content FROM messages
2054 WHERE graph_processed = 0
2055 ORDER BY id ASC
2056 LIMIT ?"
2057 ))
2058 .bind(limit)
2059 .fetch_all(&self.pool)
2060 .await?;
2061 Ok(rows
2062 .into_iter()
2063 .map(|(id, content)| (crate::types::MessageId(id), content))
2064 .collect())
2065 }
2066
2067 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2073 let count: i64 = zeph_db::query_scalar(sql!(
2074 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2075 ))
2076 .fetch_one(&self.pool)
2077 .await?;
2078 Ok(count)
2079 }
2080
2081 pub async fn mark_messages_graph_processed(
2087 &self,
2088 ids: &[crate::types::MessageId],
2089 ) -> Result<(), MemoryError> {
2090 const MAX_BATCH: usize = 490;
2091 if ids.is_empty() {
2092 return Ok(());
2093 }
2094 for chunk in ids.chunks(MAX_BATCH) {
2095 let placeholders = placeholder_list(1, chunk.len());
2096 let sql =
2097 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2098 let mut query = zeph_db::query(&sql);
2099 for id in chunk {
2100 query = query.bind(id.0);
2101 }
2102 query.execute(&self.pool).await?;
2103 }
2104 Ok(())
2105 }
2106}
2107
/// SQL mapping each entity id in `placeholders` to the id of the community
/// whose JSON `entity_ids` array contains it (SQLite `json_each` flavour).
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({})",
        placeholders,
    )
}
2118
/// SQL mapping each entity id in `placeholders` to the id of the community
/// whose JSON `entity_ids` array contains it (Postgres
/// `jsonb_array_elements_text` flavour).
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({})",
        placeholders,
    )
}
2128
/// Raw row shape for `graph_entities` SELECTs; converted to the domain
/// [`Entity`] by [`entity_from_row`].
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    // Normalised name forming (with entity_type) the upsert identity.
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Optional id of the entity's vector point in Qdrant.
    qdrant_point_id: Option<String>,
}
2142
2143fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2144 let entity_type = row
2145 .entity_type
2146 .parse::<EntityType>()
2147 .map_err(MemoryError::GraphStore)?;
2148 Ok(Entity {
2149 id: row.id,
2150 name: row.name,
2151 canonical_name: row.canonical_name,
2152 entity_type,
2153 summary: row.summary,
2154 first_seen_at: row.first_seen_at,
2155 last_seen_at: row.last_seen_at,
2156 qdrant_point_id: row.qdrant_point_id,
2157 })
2158}
2159
/// Raw row shape for entity-alias SELECTs; converted to the domain
/// [`EntityAlias`] by [`alias_from_row`].
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias refers to.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2167
2168fn alias_from_row(row: AliasRow) -> EntityAlias {
2169 EntityAlias {
2170 id: row.id,
2171 entity_id: row.entity_id,
2172 alias_name: row.alias_name,
2173 created_at: row.created_at,
2174 }
2175}
2176
/// Raw row shape for `graph_edges` SELECTs; converted to the domain [`Edge`]
/// by [`edge_from_row`].
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    // Surface relation text as extracted.
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    // Tombstone timestamp, set when the edge is superseded/expired.
    expired_at: Option<String>,
    // Column is named `episode_id` in SQL but carries the source message id.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    // Optional id of the edge's vector point in Qdrant.
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Id of the newer edge that replaced this one, if any.
    superseded_by: Option<i64>,
    // Normalised relation; falls back to `relation` during conversion.
    canonical_relation: Option<String>,
    // Id of the older edge this one replaced, if any.
    supersedes: Option<i64>,
}
2199
2200fn edge_from_row(row: EdgeRow) -> Edge {
2201 let edge_type = row
2202 .edge_type
2203 .parse::<EdgeType>()
2204 .unwrap_or(EdgeType::Semantic);
2205 let canonical_relation = row
2206 .canonical_relation
2207 .unwrap_or_else(|| row.relation.clone());
2208 Edge {
2209 id: row.id,
2210 source_entity_id: row.source_entity_id,
2211 target_entity_id: row.target_entity_id,
2212 canonical_relation,
2213 relation: row.relation,
2214 fact: row.fact,
2215 #[allow(clippy::cast_possible_truncation)]
2216 confidence: row.confidence as f32,
2217 valid_from: row.valid_from,
2218 valid_to: row.valid_to,
2219 created_at: row.created_at,
2220 expired_at: row.expired_at,
2221 source_message_id: row.source_message_id.map(MessageId),
2222 qdrant_point_id: row.qdrant_point_id,
2223 edge_type,
2224 retrieval_count: row.retrieval_count,
2225 last_retrieved_at: row.last_retrieved_at,
2226 superseded_by: row.superseded_by,
2227 supersedes: row.supersedes,
2228 }
2229}
2230
/// Raw row shape for `graph_communities` SELECTs.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded array of member entity ids.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2241
2242impl GraphStore {
2245 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2253 zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2257 .bind(conversation_id)
2258 .execute(&self.pool)
2259 .await?;
2260
2261 let id: i64 = zeph_db::query_scalar(sql!(
2262 "INSERT INTO graph_episodes (conversation_id)
2263 VALUES (?)
2264 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2265 RETURNING id"
2266 ))
2267 .bind(conversation_id)
2268 .fetch_one(&self.pool)
2269 .await?;
2270 Ok(id)
2271 }
2272
2273 pub async fn link_entity_to_episode(
2281 &self,
2282 episode_id: i64,
2283 entity_id: i64,
2284 ) -> Result<(), MemoryError> {
2285 zeph_db::query(sql!(
2286 "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2287 VALUES (?, ?)"
2288 ))
2289 .bind(episode_id)
2290 .bind(entity_id)
2291 .execute(&self.pool)
2292 .await?;
2293 Ok(())
2294 }
2295
2296 pub async fn episodes_for_entity(
2302 &self,
2303 entity_id: i64,
2304 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2305 #[derive(zeph_db::FromRow)]
2306 struct EpisodeRow {
2307 id: i64,
2308 conversation_id: i64,
2309 created_at: String,
2310 closed_at: Option<String>,
2311 }
2312 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2313 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2314 FROM graph_episodes e
2315 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2316 WHERE ee.entity_id = ?"
2317 ))
2318 .bind(entity_id)
2319 .fetch_all(&self.pool)
2320 .await?;
2321 Ok(rows
2322 .into_iter()
2323 .map(|r| super::types::Episode {
2324 id: r.id,
2325 conversation_id: r.conversation_id,
2326 created_at: r.created_at,
2327 closed_at: r.closed_at,
2328 })
2329 .collect())
2330 }
2331
2332 #[allow(clippy::too_many_arguments)]
2351 #[allow(clippy::too_many_lines)]
2352 pub async fn insert_or_supersede_with_metrics(
2353 &self,
2354 source_entity_id: i64,
2355 target_entity_id: i64,
2356 relation: &str,
2357 canonical_relation: &str,
2358 fact: &str,
2359 confidence: f32,
2360 episode_id: Option<MessageId>,
2361 edge_type: EdgeType,
2362 set_supersedes: bool,
2363 metrics: Option<&ApexMetrics>,
2364 ) -> Result<i64, MemoryError> {
2365 if source_entity_id == target_entity_id {
2366 return Err(MemoryError::InvalidInput(format!(
2367 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2368 )));
2369 }
2370 let confidence = confidence.clamp(0.0, 1.0);
2371 let edge_type_str = edge_type.as_str();
2372 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2373
2374 let mut tx = zeph_db::begin(&self.pool).await?;
2375
2376 let identical: Option<i64> = zeph_db::query_scalar(sql!(
2378 "SELECT id FROM graph_edges
2379 WHERE source_entity_id = ?
2380 AND target_entity_id = ?
2381 AND canonical_relation = ?
2382 AND edge_type = ?
2383 AND fact = ?
2384 AND valid_to IS NULL
2385 AND expired_at IS NULL
2386 LIMIT 1"
2387 ))
2388 .bind(source_entity_id)
2389 .bind(target_entity_id)
2390 .bind(canonical_relation)
2391 .bind(edge_type_str)
2392 .bind(fact)
2393 .fetch_optional(&mut *tx)
2394 .await?;
2395
2396 if let Some(existing_id) = identical {
2397 #[allow(clippy::cast_possible_wrap)]
2399 let asserted_at = std::time::SystemTime::now()
2400 .duration_since(std::time::UNIX_EPOCH)
2401 .unwrap_or_default()
2402 .as_secs() as i64;
2403 zeph_db::query(sql!(
2404 "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2405 VALUES (?, ?, ?, ?)"
2406 ))
2407 .bind(existing_id)
2408 .bind(asserted_at)
2409 .bind(episode_raw)
2410 .bind(f64::from(confidence))
2411 .execute(&mut *tx)
2412 .await?;
2413 tx.commit().await?;
2414 return Ok(existing_id);
2415 }
2416
2417 let prior_head: Option<i64> = zeph_db::query_scalar(sql!(
2419 "SELECT id FROM graph_edges
2420 WHERE source_entity_id = ?
2421 AND canonical_relation = ?
2422 AND edge_type = ?
2423 AND valid_to IS NULL
2424 AND expired_at IS NULL
2425 ORDER BY created_at DESC
2426 LIMIT 1"
2427 ))
2428 .bind(source_entity_id)
2429 .bind(canonical_relation)
2430 .bind(edge_type_str)
2431 .fetch_optional(&mut *tx)
2432 .await?;
2433
2434 if let Some(head_id) = prior_head {
2441 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2442 let depth: Option<i64> = sqlx::query_scalar(
2443 "WITH RECURSIVE chain(id, depth) AS (
2444 SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2445 UNION ALL
2446 SELECT e.supersedes, c.depth + 1
2447 FROM graph_edges e JOIN chain c ON e.id = c.id
2448 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2449 )
2450 SELECT MAX(depth) FROM chain",
2451 )
2452 .bind(head_id)
2453 .bind(cap)
2454 .fetch_optional(&mut *tx)
2455 .await?
2456 .flatten();
2457 let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2458 if d > SUPERSEDE_DEPTH_CAP {
2459 return Err(MemoryError::SupersedeDepthExceeded(head_id));
2460 }
2461 }
2462
2463 let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2465 let new_id: i64 = zeph_db::query_scalar(sql!(
2466 "INSERT INTO graph_edges
2467 (source_entity_id, target_entity_id, relation, canonical_relation, fact,
2468 confidence, episode_id, edge_type, supersedes)
2469 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2470 RETURNING id"
2471 ))
2472 .bind(source_entity_id)
2473 .bind(target_entity_id)
2474 .bind(relation)
2475 .bind(canonical_relation)
2476 .bind(fact)
2477 .bind(f64::from(confidence))
2478 .bind(episode_raw)
2479 .bind(edge_type_str)
2480 .bind(supersedes_val)
2481 .fetch_one(&mut *tx)
2482 .await?;
2483
2484 if let Some(head_id) = prior_head {
2486 zeph_db::query(sql!(
2487 "UPDATE graph_edges
2488 SET valid_to = CURRENT_TIMESTAMP,
2489 expired_at = CURRENT_TIMESTAMP,
2490 superseded_by = ?
2491 WHERE id = ?"
2492 ))
2493 .bind(new_id)
2494 .bind(head_id)
2495 .execute(&mut *tx)
2496 .await?;
2497
2498 if let Some(m) = metrics {
2499 m.supersedes_total
2500 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2501 }
2502 }
2503
2504 tx.commit().await?;
2505 Ok(new_id)
2506 }
2507
2508 #[allow(clippy::too_many_arguments)]
2514 pub async fn insert_or_supersede(
2515 &self,
2516 source_entity_id: i64,
2517 target_entity_id: i64,
2518 relation: &str,
2519 canonical_relation: &str,
2520 fact: &str,
2521 confidence: f32,
2522 episode_id: Option<MessageId>,
2523 edge_type: EdgeType,
2524 set_supersedes: bool,
2525 ) -> Result<i64, MemoryError> {
2526 self.insert_or_supersede_with_metrics(
2527 source_entity_id,
2528 target_entity_id,
2529 relation,
2530 canonical_relation,
2531 fact,
2532 confidence,
2533 episode_id,
2534 edge_type,
2535 set_supersedes,
2536 None,
2537 )
2538 .await
2539 }
2540
2541 pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2551 Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2552 }
2553
2554 async fn check_supersede_depth_with_pool(
2555 pool: &zeph_db::DbPool,
2556 head_id: i64,
2557 ) -> Result<usize, MemoryError> {
2558 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2559 let depth: Option<i64> = zeph_db::query_scalar(sql!(
2562 "WITH RECURSIVE chain(id, depth) AS (
2563 SELECT id, 0 FROM graph_edges WHERE id = ?
2564 UNION ALL
2565 SELECT e.supersedes, c.depth + 1
2566 FROM graph_edges e JOIN chain c ON e.id = c.id
2567 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2568 )
2569 SELECT MAX(depth) FROM chain"
2570 ))
2571 .bind(head_id)
2572 .bind(cap)
2573 .fetch_optional(pool)
2574 .await?
2575 .flatten();
2576
2577 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2578 let d = depth.unwrap_or(0) as usize;
2579 if d > SUPERSEDE_DEPTH_CAP {
2580 return Err(MemoryError::SupersedeCycle(head_id));
2581 }
2582 Ok(d)
2583 }
2584}
2585
2586#[cfg(test)]
2589mod tests;