1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::MessageId;
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
24pub struct GraphStore {
31 pool: DbPool,
32}
33
34impl GraphStore {
35 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[must_use]
45 pub fn pool(&self) -> &DbPool {
46 &self.pool
47 }
48
    /// Inserts an entity, or on a `(canonical_name, entity_type)` conflict
    /// refreshes its surface `name`, keeps the old summary unless a new one
    /// is supplied (`COALESCE`), and bumps `last_seen_at`.
    ///
    /// Returns the row id of the inserted or updated entity.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the database query fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
86
    /// Looks up a single entity by exact `(canonical_name, entity_type)`.
    ///
    /// Returns `Ok(None)` when no matching row exists.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query or row conversion fails.
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE canonical_name = ? AND entity_type = ?"),
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        // entity_from_row is fallible; transpose turns Option<Result<..>> into Result<Option<..>>.
        row.map(entity_from_row).transpose()
    }
109
    /// Fetches an entity by primary key; `Ok(None)` when the id is unknown.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query or row conversion fails.
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE id = ?"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
126
    /// Stores the Qdrant vector-store point id for an entity row.
    ///
    /// Silently affects zero rows if `entity_id` does not exist.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
        ))
        .bind(point_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
146
    /// Fuzzy entity search: FTS prefix-match on the entity index, unioned
    /// with a case-insensitive `LIKE` substring match over aliases.
    ///
    /// The query is truncated to at most 512 bytes (at a char boundary),
    /// sanitized for FTS, and FTS5 operator words are stripped so user text
    /// cannot inject query syntax. Returns an empty vec for empty/degenerate
    /// queries. `UNION` (not `UNION ALL`) deduplicates rows found via both
    /// paths.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or invalid `limit`.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 reserves these words as operators; drop them from user input.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Turn each remaining token into a prefix match: "foo" -> "foo*".
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters so the raw query is matched literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
228
    /// SQLite only: runs a passive WAL checkpoint to fold the write-ahead
    /// log back into the main database file without blocking writers.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the pragma fails.
    #[cfg(feature = "sqlite")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
245
    /// Postgres build: no-op — WAL checkpointing is managed by the server,
    /// so this exists only to keep the API identical across backends.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
255
    /// Streams every entity ordered by id, converting rows lazily so the
    /// whole table is never buffered in memory. The stream borrows `self`
    /// (hence the `'_` lifetime on the return type).
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        // and_then: entity_from_row is itself fallible.
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
268
    /// Records an alias for an entity; duplicate aliases are ignored via the
    /// dialect's insert-ignore / on-conflict-do-nothing fragments.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the insert fails.
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        // Dialect supplies either "INSERT OR IGNORE" (sqlite) or a trailing
        // ON CONFLICT clause (postgres) — exactly one side is non-trivial.
        let insert_alias_sql = format!(
            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        zeph_db::query(&insert_alias_sql)
            .bind(entity_id)
            .bind(alias_name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
289
    /// Resolves an alias (case-insensitive) to an entity of the given type.
    ///
    /// When multiple entities share the alias, the one with the smallest id
    /// wins (`ORDER BY e.id ASC LIMIT 1`), making resolution deterministic.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query or row conversion fails.
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
321
    /// Lists all aliases recorded for an entity, ordered by insertion (id).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?
             ORDER BY id ASC"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
342
343 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
349 use futures::TryStreamExt as _;
350 self.all_entities_stream().try_collect().await
351 }
352
353 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
359 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
360 .fetch_one(&self.pool)
361 .await?;
362 Ok(count)
363 }
364
    /// Inserts (or reinforces) a semantic edge between two entities.
    ///
    /// Thin convenience wrapper over [`Self::insert_edge_typed`] with
    /// `EdgeType::Semantic`; see that method for dedup/confidence rules.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on self-loops or database failure.
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        self.insert_edge_typed(
            source_entity_id,
            target_entity_id,
            relation,
            fact,
            confidence,
            episode_id,
            EdgeType::Semantic,
        )
        .await
    }
402
    /// Inserts an edge of the given type, deduplicating against any active
    /// (`valid_to IS NULL`) edge with the same source/target/relation/type.
    ///
    /// On a duplicate, the stored confidence is raised to the max of old and
    /// new (never lowered) and the existing edge id is returned. Confidence
    /// is clamped to `[0.0, 1.0]`. Self-loops are rejected up front.
    /// Both paths run inside a single transaction so the check-then-insert
    /// is not interleaved with a concurrent writer's commit.
    ///
    /// # Errors
    /// Returns [`MemoryError::InvalidInput`] for self-loops, or a database
    /// error from the transaction.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Look for a live duplicate of this exact edge.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
             AND target_entity_id = ?
             AND relation = ?
             AND edge_type = ?
             AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Reinforce: keep the higher of the two confidences.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
482
    /// Marks an edge as no longer valid by stamping both `valid_to` and
    /// `expired_at`; the row is kept for history (see retention cleanup).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
             WHERE id = ?"
        ))
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
498
    /// Invalidates `old_edge_id` and records `new_edge_id` as its successor
    /// via the `superseded_by` column, preserving the supersession chain.
    ///
    /// Bind order matters: the first `?` is `superseded_by` (the NEW id),
    /// the second is the WHERE clause (the OLD id).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn invalidate_edge_with_supersession(
        &self,
        old_edge_id: i64,
        new_edge_id: i64,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges
             SET valid_to = CURRENT_TIMESTAMP,
                 expired_at = CURRENT_TIMESTAMP,
                 superseded_by = ?
             WHERE id = ?"
        ))
        .bind(new_edge_id)
        .bind(old_edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
524
525 pub async fn edges_for_entities(
542 &self,
543 entity_ids: &[i64],
544 edge_types: &[super::types::EdgeType],
545 ) -> Result<Vec<Edge>, MemoryError> {
546 const MAX_BATCH_ENTITIES: usize = 490;
550
551 let mut all_edges: Vec<Edge> = Vec::new();
552
553 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
554 let edges = self.query_batch_edges(chunk, edge_types).await?;
555 all_edges.extend(edges);
556 }
557
558 Ok(all_edges)
559 }
560
561 async fn query_batch_edges(
569 &self,
570 entity_ids: &[i64],
571 edge_types: &[super::types::EdgeType],
572 ) -> Result<Vec<Edge>, MemoryError> {
573 if entity_ids.is_empty() {
574 return Ok(Vec::new());
575 }
576
577 let n_ids = entity_ids.len();
580 let n_types = edge_types.len();
581
582 let sql = if n_types == 0 {
583 let placeholders = placeholder_list(1, n_ids);
585 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
586 format!(
587 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
588 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
589 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
590 FROM graph_edges
591 WHERE valid_to IS NULL
592 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
593 )
594 } else {
595 let placeholders = placeholder_list(1, n_ids);
596 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
597 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
598 format!(
599 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
600 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
601 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
602 FROM graph_edges
603 WHERE valid_to IS NULL
604 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
605 AND edge_type IN ({type_placeholders})"
606 )
607 };
608
609 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
611 for id in entity_ids {
612 query = query.bind(*id);
613 }
614 for id in entity_ids {
615 query = query.bind(*id);
616 }
617 for et in edge_types {
618 query = query.bind(et.as_str());
619 }
620
621 let rows: Vec<EdgeRow> = tokio::time::timeout(
625 std::time::Duration::from_millis(500),
626 query.fetch_all(&self.pool),
627 )
628 .await
629 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
630 Ok(rows.into_iter().map(edge_from_row).collect())
631 }
632
    /// Fetches all active edges where the entity is either endpoint.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
             AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        // Same id bound twice: once per endpoint position.
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
653
    /// Fetches up to `limit` edges for an entity INCLUDING invalidated ones
    /// (no `valid_to IS NULL` filter), newest `valid_from` first — i.e. the
    /// temporal history of the entity's relationships.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or if `limit` overflows i64.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE source_entity_id = ? OR target_entity_id = ?
             ORDER BY valid_from DESC
             LIMIT ?"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
682
    /// Fetches active edges connecting two entities in EITHER direction
    /// (a→b or b→a). Use [`Self::edges_exact`] for a directional lookup.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
             AND ((source_entity_id = ? AND target_entity_id = ?)
               OR (source_entity_id = ? AND target_entity_id = ?))"
        ))
        // (a, b) for the forward pair, (b, a) for the reverse pair.
        .bind(entity_a)
        .bind(entity_b)
        .bind(entity_b)
        .bind(entity_a)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
710
    /// Fetches active edges with exactly this source→target direction
    /// (unlike [`Self::edges_between`], which matches both directions).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
             AND source_entity_id = ?
             AND target_entity_id = ?"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
736
    /// Counts edges that are currently valid (`valid_to IS NULL`).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }
750
    /// Returns `(edge_type, count)` pairs over active edges, sorted by type
    /// name — useful for diagnostics/metrics.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(
            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
764
    /// Inserts a community or, on a name conflict, replaces its summary and
    /// member list, keeps the old fingerprint unless a new one is supplied
    /// (`COALESCE`), and bumps `updated_at`. Member ids are stored as a JSON
    /// array string. Returns the row id.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on JSON serialization or database failure.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
802
    /// Maps every non-NULL community fingerprint to its community id.
    ///
    /// If two communities somehow shared a fingerprint, the later row would
    /// silently win during the collect into the map.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
817
    /// Deletes a single community row; a no-op if the id does not exist.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails.
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
830
    /// Nulls out a community's fingerprint, forcing it to be recomputed by
    /// whatever process compares fingerprints (see
    /// [`Self::community_fingerprints`]).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
        ))
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
848
    /// Finds the first community whose JSON `entity_ids` array contains the
    /// given entity, by expanding the array with `json_each` and comparing
    /// each element cast to INTEGER. Returns at most one community.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or if the stored
    /// `entity_ids` JSON cannot be parsed.
    pub async fn community_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = zeph_db::query_as(
            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
                  FROM graph_communities c, json_each(c.entity_ids) j
                  WHERE CAST(j.value AS INTEGER) = ?
                  LIMIT 1"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                // Member list is persisted as a JSON array string.
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
886
    /// Loads every community ordered by id, decoding each row's JSON member
    /// list; the fallible decode short-circuits via the Result collect.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or malformed stored JSON.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             ORDER BY id ASC"
        ))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
916
917 pub async fn community_count(&self) -> Result<i64, MemoryError> {
923 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
924 .fetch_one(&self.pool)
925 .await?;
926 Ok(count)
927 }
928
    /// Reads a value from the key/value `graph_metadata` table; `Ok(None)`
    /// when the key is absent.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
944
    /// Upserts a key/value pair in `graph_metadata` (insert, or overwrite
    /// the value on key conflict).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the upsert fails.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
        ))
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
961
962 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
970 let val = self.get_metadata("extraction_count").await?;
971 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
972 }
973
    /// Streams every active edge ordered by id without buffering the table.
    /// The stream borrows `self` (hence `'_` on the return type).
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        // edge_from_row is infallible here, so plain map (not and_then).
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
988
    /// Keyset pagination over active edges: returns up to `limit` edges with
    /// `id > after_id`, ascending. Pass the last returned id as the next
    /// `after_id` to continue.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the query fails.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?
             ORDER BY id ASC
             LIMIT ?"
        ))
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1025
    /// Fetches a community by primary key, decoding its JSON member list;
    /// `Ok(None)` when the id is unknown.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or malformed stored JSON.
    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             WHERE id = ?"
        ))
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
1056
    /// Deletes every community row (e.g. before a full community rebuild).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails.
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities"))
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1068
    /// Like [`Self::find_entities_fuzzy`] but returns a relevance score in
    /// `[0.0, 1.0]` per entity.
    ///
    /// FTS matches are scored with negated bm25 (bm25 is smaller-is-better,
    /// so `-bm25(...)` makes larger better; name column weighted 10.0 vs
    /// summary 1.0); alias substring matches get a fixed 0.5. Scores are
    /// normalized by the batch maximum. Rows come back via `UNION ALL` and
    /// are deduplicated by id in Rust, keeping the highest-ranked copy
    /// (results are sorted by score before dedup).
    ///
    /// NOTE(review): `LIMIT ?` applies before the Rust-side dedup, so when
    /// an entity matches via both branches the result can contain fewer
    /// than `limit` unique entities — confirm this is acceptable.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or invalid `limit`.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Tuple row: entity columns plus the computed fts_rank (index 8).
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Prefix-match every token; strip FTS5 operator words.
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION ALL \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             0.5 AS fts_rank \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters so the raw query matches literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Normalize by the best score; guard against a non-positive max so
        // the division below never produces NaN/negative blowups.
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Rows arrive score-descending, so the first copy of an id is its
        // highest-ranked occurrence; later duplicates are skipped.
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings degrade to Concept rather than failing.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1202
    /// Computes a structural importance score in `[0.0, 1.0]` per entity:
    /// `0.6 * normalized_degree + 0.4 * edge-type diversity` (diversity
    /// saturates at 4 distinct edge types). Degree counts both incoming and
    /// outgoing active edges. Entities with no edges score 0.0.
    ///
    /// Batched in chunks of 163 because each id is bound three times
    /// (3 * 163 = 489 placeholders per statement).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk query fails.
    pub async fn entity_structural_scores(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, f32>, MemoryError> {
        const MAX_BATCH: usize = 163;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let n = chunk.len();
            // Three placeholder runs: source IN, target IN, outer entity IN.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);

            let sql = format!(
                "SELECT entity_id,
                        COUNT(*) AS degree,
                        COUNT(DISTINCT edge_type) AS type_diversity
                 FROM (
                     SELECT source_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
                     UNION ALL
                     SELECT target_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
                 )
                 WHERE entity_id IN ({ph3})
                 GROUP BY entity_id"
            );

            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
            // Bind the chunk three times, matching ph1/ph2/ph3 order.
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }

            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
            all_rows.extend(chunk_rows);
        }

        if all_rows.is_empty() {
            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
        }

        // NOTE: degree is normalized against the max within THIS batch, not
        // a global maximum — scores are only comparable within one call.
        let max_degree = all_rows
            .iter()
            .map(|(_, d, _)| *d)
            .max()
            .unwrap_or(1)
            .max(1);

        // Seed every requested id with 0.0 so edgeless entities are present.
        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
        for (entity_id, degree, type_diversity) in all_rows {
            #[allow(clippy::cast_precision_loss)]
            let norm_degree = degree as f32 / max_degree as f32;
            #[allow(clippy::cast_precision_loss)]
            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
            scores.insert(entity_id, score);
        }

        Ok(scores)
    }
1289
    /// Maps each entity id to the id of a community containing it, using the
    /// `community_ids_sql` helper (defined elsewhere in this module) to
    /// build the per-chunk query. Entities in no community are simply absent
    /// from the returned map.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk query fails.
    pub async fn entity_community_ids(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, i64>, MemoryError> {
        // Chunked to bound the number of bound placeholders per statement.
        const MAX_BATCH: usize = 490;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut result: HashMap<i64, i64> = HashMap::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = placeholder_list(1, chunk.len());

            let community_sql = community_ids_sql(&placeholders);
            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
            for id in chunk {
                query = query.bind(*id);
            }

            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
            result.extend(rows);
        }

        Ok(result)
    }
1324
    /// Increments `retrieval_count` and refreshes `last_retrieved_at` (via
    /// the dialect's epoch-now expression) for each edge id, in chunks of
    /// 490 to bound placeholders per statement. Empty input is a no-op.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk update fails.
    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
        const MAX_BATCH: usize = 490;
        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        for chunk in edge_ids.chunks(MAX_BATCH) {
            let edge_placeholders = placeholder_list(1, chunk.len());
            let retrieval_sql = format!(
                "UPDATE graph_edges \
                 SET retrieval_count = retrieval_count + 1, \
                 last_retrieved_at = {epoch_now} \
                 WHERE id IN ({edge_placeholders})"
            );
            let mut q = zeph_db::query(&retrieval_sql);
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1352
    /// Adds `delta` to the `weight` of every active edge in `edge_ids`
    /// (Hebbian-style reinforcement), batched in chunks of 490.
    ///
    /// `delta` is bound first as `$1`, so the id placeholder list starts at
    /// index 2 — that is why `placeholder_list(2, ...)` is used here.
    /// Empty input or a zero delta is a no-op.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk update fails.
    #[tracing::instrument(
        name = "memory.graph.hebbian_increment",
        skip_all,
        fields(edge_count = edge_ids.len())
    )]
    pub async fn apply_hebbian_increment(
        &self,
        edge_ids: &[i64],
        delta: f32,
    ) -> Result<(), MemoryError> {
        const MAX_BATCH: usize = 490;
        if edge_ids.is_empty() || delta == 0.0 {
            return Ok(());
        }
        for chunk in edge_ids.chunks(MAX_BATCH) {
            // Ids start at placeholder index 2; $1 is reserved for delta.
            let edge_placeholders = placeholder_list(2, chunk.len());
            let sql = format!(
                "UPDATE graph_edges \
                 SET weight = weight + $1 \
                 WHERE id IN ({edge_placeholders}) \
                 AND valid_to IS NULL"
            );
            let mut q = zeph_db::query(&sql);
            q = q.bind(f64::from(delta));
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1397
    /// Maps entity ids to their Qdrant point ids, skipping entities whose
    /// `qdrant_point_id` is NULL (they are simply absent from the map).
    /// Batched in chunks of 490 to bound placeholders per statement.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if any chunk query fails.
    pub async fn qdrant_point_ids_for_entities(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, String>, MemoryError> {
        const MAX_BATCH: usize = 490;
        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }
        let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = zeph_db::placeholder_list(1, chunk.len());
            let sql = format!(
                "SELECT id, qdrant_point_id \
                 FROM graph_entities \
                 WHERE id IN ({placeholders}) \
                 AND qdrant_point_id IS NOT NULL"
            );
            let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
            for id in chunk {
                q = q.bind(*id);
            }
            let rows = q.fetch_all(&self.pool).await?;
            for (entity_id, point_id) in rows {
                result.insert(entity_id, point_id);
            }
        }
        Ok(result)
    }
1434
    /// Multiplies `retrieval_count` by `decay_lambda` (floored at 0) for
    /// active edges not retrieved within the last `interval_secs`. The raw
    /// `?` placeholders are rewritten for the active dialect via
    /// `rewrite_placeholders`. Returns the number of decayed rows.
    ///
    /// NOTE(review): two-argument `MAX(a, b)` is SQLite syntax; Postgres
    /// uses `GREATEST` — confirm the postgres build handles this query.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the update fails.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
             AND retrieval_count > 0 \
             AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate oversized intervals instead of erroring.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1464
    /// Permanently deletes edges whose `expired_at` is older than
    /// `retention_days`; returns the number of deleted rows.
    ///
    /// NOTE(review): `datetime('now', ...)` is SQLite syntax — confirm the
    /// `sql!` macro translates it for the postgres build.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the delete fails.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
             AND expired_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1482
1483 pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1489 let days = i64::from(retention_days);
1490 let result = zeph_db::query(sql!(
1491 "DELETE FROM graph_entities
1492 WHERE id NOT IN (
1493 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1494 UNION
1495 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1496 )
1497 AND last_seen_at < datetime('now', '-' || ? || ' days')"
1498 ))
1499 .bind(days)
1500 .execute(&self.pool)
1501 .await?;
1502 Ok(usize::try_from(result.rows_affected())?)
1503 }
1504
1505 pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1514 let current = self.entity_count().await?;
1515 let max = i64::try_from(max_entities)?;
1516 if current <= max {
1517 return Ok(0);
1518 }
1519 let excess = current - max;
1520 let result = zeph_db::query(sql!(
1521 "DELETE FROM graph_entities
1522 WHERE id IN (
1523 SELECT e.id
1524 FROM graph_entities e
1525 LEFT JOIN (
1526 SELECT source_entity_id AS eid, COUNT(*) AS cnt
1527 FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1528 UNION ALL
1529 SELECT target_entity_id AS eid, COUNT(*) AS cnt
1530 FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1531 ) edge_counts ON e.id = edge_counts.eid
1532 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1533 LIMIT ?
1534 )"
1535 ))
1536 .bind(excess)
1537 .execute(&self.pool)
1538 .await?;
1539 Ok(usize::try_from(result.rows_affected())?)
1540 }
1541
    /// Returns all edges incident to `entity_id` that were valid at the
    /// given `timestamp` (point-in-time bitemporal view).
    ///
    /// The first SELECT branch covers still-open edges (`valid_to IS NULL`)
    /// that began at or before the timestamp; the second covers closed
    /// edges whose validity interval `[valid_from, valid_to)` contains it.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] when the query fails.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?
               AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?
               AND valid_to > ?
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        // Bind order follows placeholder order across both UNION branches:
        // branch 1: ts, id, id — branch 2: ts, ts, id, id.
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1592
    /// Returns the change history of edges from `source_entity_id` whose
    /// `fact` contains `predicate` as a substring, newest `valid_from`
    /// first, up to `limit` rows.
    ///
    /// `predicate` is escaped so that `%`, `_` and `\` are treated
    /// literally inside the LIKE pattern. When `relation` is `Some`, rows
    /// are additionally restricted to that exact relation.
    ///
    /// # Errors
    ///
    /// Returns [`MemoryError`] on query failure or if `limit` overflows i64.
    pub async fn edge_history(
        &self,
        source_entity_id: i64,
        predicate: &str,
        relation: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Escape backslash first so the other escapes are not double-escaped.
        let escaped = predicate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{escaped}%");
        let limit = i64::try_from(limit)?;
        // Two near-identical statements: the optional relation filter cannot
        // be expressed with a single static sql! literal.
        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                   AND relation = ?
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(rel)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        };
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1652
1653 pub async fn bfs(
1670 &self,
1671 start_entity_id: i64,
1672 max_hops: u32,
1673 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1674 self.bfs_with_depth(start_entity_id, max_hops)
1675 .await
1676 .map(|(e, ed, _)| (e, ed))
1677 }
1678
1679 pub async fn bfs_with_depth(
1690 &self,
1691 start_entity_id: i64,
1692 max_hops: u32,
1693 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1694 self.bfs_core(start_entity_id, max_hops, None).await
1695 }
1696
1697 pub async fn bfs_at_timestamp(
1708 &self,
1709 start_entity_id: i64,
1710 max_hops: u32,
1711 timestamp: &str,
1712 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1713 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1714 .await
1715 }
1716
1717 pub async fn bfs_typed(
1733 &self,
1734 start_entity_id: i64,
1735 max_hops: u32,
1736 edge_types: &[EdgeType],
1737 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1738 if edge_types.is_empty() {
1739 return self.bfs_with_depth(start_entity_id, max_hops).await;
1740 }
1741 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1742 .await
1743 }
1744
    /// Core breadth-first traversal over `graph_edges`.
    ///
    /// Expands up to `max_hops` hops from `start_entity_id`, recording in
    /// the returned depth map the first (shortest) hop count at which each
    /// entity was reached. With `at_timestamp` set, only edges valid at
    /// that instant are followed; otherwise only currently-open edges
    /// (`valid_to IS NULL`).
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each frontier so the number of bind placeholders stays bounded.
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // Three IN-lists all carry the frontier ids; numbered placeholder
            // positions run consecutively: 1..=n, n+1..=2n, 2n+1..=3n.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                // The timestamp placeholder follows all frontier ids (3n + 1).
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // CASE picks "the other endpoint" relative to the frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                 AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order must mirror placeholder numbering: the frontier
            // three times over, then the optional timestamp last.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // First visit wins, so depth_map holds shortest hop counts.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1824
    /// Typed variant of `bfs_core`: same traversal, but only edges whose
    /// `edge_type` is in `edge_types` are followed. Callers guarantee the
    /// slice is non-empty (see `bfs_typed`).
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps each frontier so the number of bind placeholders stays bounded.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Placeholder layout: positions 1..=n_types for edge types, then
        // three frontier IN-lists, then (optionally) the timestamp.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // CASE picks "the other endpoint" relative to the frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                     WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                     ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                 AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order must mirror placeholder numbering: edge types first,
            // then the frontier three times, then the optional timestamp.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // First visit wins, so depth_map holds shortest hop counts.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1921
    /// Materialises the entities and (type-filtered) edges for a completed
    /// typed BFS, returning them together with the depth map.
    ///
    /// Only edges with BOTH endpoints in the visited set are returned. The
    /// visited set is truncated to 499 ids to stay within the SQLite bind
    /// limit, so very large traversals may drop some reachable results.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Placeholder layout: types first, then two copies of the visited
        // ids, then (optionally) the timestamp.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;
        let ph_ids1 = placeholder_list(id_start, n_visited);
        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);

        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        // Bind order must mirror the placeholder layout above.
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entities are fetched separately; placeholders restart at 1 here.
        let entity_sql2 = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, visited_ids.len()),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
2011
    /// Materialises the entities and edges for a completed (untyped) BFS,
    /// returning them together with the depth map.
    ///
    /// Only edges with BOTH endpoints in the visited set are returned. The
    /// visited set is truncated to 499 ids to stay within the SQLite bind
    /// limit, so very large traversals may drop some reachable results.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Placeholder layout: two copies of the visited ids, then
        // (optionally) the timestamp at position 2n + 1.
        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        // Bind order must mirror the placeholder layout above.
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entities are fetched separately; placeholders restart at 1 here.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
2085
2086 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2102 let find_by_name_sql = format!(
2103 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2104 FROM graph_entities \
2105 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2106 LIMIT 5",
2107 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2108 );
2109 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2110 .bind(name)
2111 .bind(name)
2112 .fetch_all(&self.pool)
2113 .await?;
2114
2115 if !rows.is_empty() {
2116 return rows.into_iter().map(entity_from_row).collect();
2117 }
2118
2119 self.find_entities_fuzzy(name, 5).await
2120 }
2121
2122 pub async fn unprocessed_messages_for_backfill(
2130 &self,
2131 limit: usize,
2132 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2133 let limit = i64::try_from(limit)?;
2134 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2135 "SELECT id, content FROM messages
2136 WHERE graph_processed = 0
2137 ORDER BY id ASC
2138 LIMIT ?"
2139 ))
2140 .bind(limit)
2141 .fetch_all(&self.pool)
2142 .await?;
2143 Ok(rows
2144 .into_iter()
2145 .map(|(id, content)| (crate::types::MessageId(id), content))
2146 .collect())
2147 }
2148
2149 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2155 let count: i64 = zeph_db::query_scalar(sql!(
2156 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2157 ))
2158 .fetch_one(&self.pool)
2159 .await?;
2160 Ok(count)
2161 }
2162
2163 pub async fn mark_messages_graph_processed(
2169 &self,
2170 ids: &[crate::types::MessageId],
2171 ) -> Result<(), MemoryError> {
2172 const MAX_BATCH: usize = 490;
2173 if ids.is_empty() {
2174 return Ok(());
2175 }
2176 for chunk in ids.chunks(MAX_BATCH) {
2177 let placeholders = placeholder_list(1, chunk.len());
2178 let sql =
2179 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2180 let mut query = zeph_db::query(&sql);
2181 for id in chunk {
2182 query = query.bind(id.0);
2183 }
2184 query.execute(&self.pool).await?;
2185 }
2186 Ok(())
2187 }
2188}
2189
/// Builds the dialect-specific SQL mapping entity ids to community ids.
///
/// SQLite variant: expands the `entity_ids` JSON array column with
/// `json_each` and casts each element to INTEGER before matching it
/// against the supplied placeholder list.
#[cfg(feature = "sqlite")]
fn community_ids_sql(id_placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({id_placeholders})"
    )
}
2200
/// Builds the dialect-specific SQL mapping entity ids to community ids.
///
/// Postgres variant: expands the `entity_ids` JSON array column with
/// `jsonb_array_elements_text` and casts each element to bigint before
/// matching it against the supplied placeholder list.
#[cfg(feature = "postgres")]
fn community_ids_sql(id_placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({id_placeholders})"
    )
}
2210
/// Raw row shape for `graph_entities` SELECTs; converted to the public
/// [`Entity`] type by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Surface form of the name as last written.
    name: String,
    // Normalised name; part of the table's upsert conflict key.
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Present when the entity has an associated Qdrant point — presumably
    // set once its embedding is stored; confirm against the writer.
    qdrant_point_id: Option<String>,
}
2224
2225fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2226 let entity_type = row
2227 .entity_type
2228 .parse::<EntityType>()
2229 .map_err(MemoryError::GraphStore)?;
2230 Ok(Entity {
2231 id: row.id,
2232 name: row.name,
2233 canonical_name: row.canonical_name,
2234 entity_type,
2235 summary: row.summary,
2236 first_seen_at: row.first_seen_at,
2237 last_seen_at: row.last_seen_at,
2238 qdrant_point_id: row.qdrant_point_id,
2239 })
2240}
2241
/// Raw row shape for entity alias records; converted to the public
/// [`EntityAlias`] type by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias points at.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2249
2250fn alias_from_row(row: AliasRow) -> EntityAlias {
2251 EntityAlias {
2252 id: row.id,
2253 entity_id: row.entity_id,
2254 alias_name: row.alias_name,
2255 created_at: row.created_at,
2256 }
2257}
2258
/// Raw row shape for `graph_edges` SELECTs; converted to the public
/// [`Edge`] type by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    // Free-form relation text as extracted.
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    // Bitemporal validity window: [valid_from, valid_to); open when None.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Column is named `episode_id` in SQL but maps onto the message id
    // carried by the public `Edge` type.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` (defaulting to Semantic).
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Supersession chain pointers: the edge that replaced this one, the
    // normalised relation key, and the edge this one replaced.
    superseded_by: Option<i64>,
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    // Stored as f64; narrowed to f32 during conversion.
    weight: f64,
}
2283
2284fn edge_from_row(row: EdgeRow) -> Edge {
2285 let edge_type = row
2286 .edge_type
2287 .parse::<EdgeType>()
2288 .unwrap_or(EdgeType::Semantic);
2289 let canonical_relation = row
2290 .canonical_relation
2291 .unwrap_or_else(|| row.relation.clone());
2292 Edge {
2293 id: row.id,
2294 source_entity_id: row.source_entity_id,
2295 target_entity_id: row.target_entity_id,
2296 canonical_relation,
2297 relation: row.relation,
2298 fact: row.fact,
2299 #[allow(clippy::cast_possible_truncation)]
2300 confidence: row.confidence as f32,
2301 valid_from: row.valid_from,
2302 valid_to: row.valid_to,
2303 created_at: row.created_at,
2304 expired_at: row.expired_at,
2305 source_message_id: row.source_message_id.map(MessageId),
2306 qdrant_point_id: row.qdrant_point_id,
2307 edge_type,
2308 retrieval_count: row.retrieval_count,
2309 last_retrieved_at: row.last_retrieved_at,
2310 superseded_by: row.superseded_by,
2311 supersedes: row.supersedes,
2312 #[allow(clippy::cast_possible_truncation)]
2313 weight: row.weight as f32,
2314 }
2315}
2316
/// Raw row shape for `graph_communities` SELECTs.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded array of member entity ids (expanded via json_each /
    // jsonb_array_elements_text in `community_ids_sql`).
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2327
2328impl GraphStore {
2331 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2339 zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2343 .bind(conversation_id)
2344 .execute(&self.pool)
2345 .await?;
2346
2347 let id: i64 = zeph_db::query_scalar(sql!(
2348 "INSERT INTO graph_episodes (conversation_id)
2349 VALUES (?)
2350 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2351 RETURNING id"
2352 ))
2353 .bind(conversation_id)
2354 .fetch_one(&self.pool)
2355 .await?;
2356 Ok(id)
2357 }
2358
2359 pub async fn link_entity_to_episode(
2367 &self,
2368 episode_id: i64,
2369 entity_id: i64,
2370 ) -> Result<(), MemoryError> {
2371 zeph_db::query(sql!(
2372 "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2373 VALUES (?, ?)"
2374 ))
2375 .bind(episode_id)
2376 .bind(entity_id)
2377 .execute(&self.pool)
2378 .await?;
2379 Ok(())
2380 }
2381
2382 pub async fn episodes_for_entity(
2388 &self,
2389 entity_id: i64,
2390 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2391 #[derive(zeph_db::FromRow)]
2392 struct EpisodeRow {
2393 id: i64,
2394 conversation_id: i64,
2395 created_at: String,
2396 closed_at: Option<String>,
2397 }
2398 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2399 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2400 FROM graph_episodes e
2401 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2402 WHERE ee.entity_id = ?"
2403 ))
2404 .bind(entity_id)
2405 .fetch_all(&self.pool)
2406 .await?;
2407 Ok(rows
2408 .into_iter()
2409 .map(|r| super::types::Episode {
2410 id: r.id,
2411 conversation_id: r.conversation_id,
2412 created_at: r.created_at,
2413 closed_at: r.closed_at,
2414 })
2415 .collect())
2416 }
2417
2418 #[allow(clippy::too_many_arguments)]
2437 #[allow(clippy::too_many_lines)]
2438 pub async fn insert_or_supersede_with_metrics(
2439 &self,
2440 source_entity_id: i64,
2441 target_entity_id: i64,
2442 relation: &str,
2443 canonical_relation: &str,
2444 fact: &str,
2445 confidence: f32,
2446 episode_id: Option<MessageId>,
2447 edge_type: EdgeType,
2448 set_supersedes: bool,
2449 metrics: Option<&ApexMetrics>,
2450 ) -> Result<i64, MemoryError> {
2451 if source_entity_id == target_entity_id {
2452 return Err(MemoryError::InvalidInput(format!(
2453 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2454 )));
2455 }
2456 let confidence = confidence.clamp(0.0, 1.0);
2457 let edge_type_str = edge_type.as_str();
2458 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2459
2460 let mut tx = zeph_db::begin(&self.pool).await?;
2461
2462 let identical: Option<i64> = zeph_db::query_scalar(sql!(
2464 "SELECT id FROM graph_edges
2465 WHERE source_entity_id = ?
2466 AND target_entity_id = ?
2467 AND canonical_relation = ?
2468 AND edge_type = ?
2469 AND fact = ?
2470 AND valid_to IS NULL
2471 AND expired_at IS NULL
2472 LIMIT 1"
2473 ))
2474 .bind(source_entity_id)
2475 .bind(target_entity_id)
2476 .bind(canonical_relation)
2477 .bind(edge_type_str)
2478 .bind(fact)
2479 .fetch_optional(&mut *tx)
2480 .await?;
2481
2482 if let Some(existing_id) = identical {
2483 #[allow(clippy::cast_possible_wrap)]
2485 let asserted_at = std::time::SystemTime::now()
2486 .duration_since(std::time::UNIX_EPOCH)
2487 .unwrap_or_default()
2488 .as_secs() as i64;
2489 zeph_db::query(sql!(
2490 "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2491 VALUES (?, ?, ?, ?)"
2492 ))
2493 .bind(existing_id)
2494 .bind(asserted_at)
2495 .bind(episode_raw)
2496 .bind(f64::from(confidence))
2497 .execute(&mut *tx)
2498 .await?;
2499 tx.commit().await?;
2500 return Ok(existing_id);
2501 }
2502
2503 let prior_head: Option<i64> = zeph_db::query_scalar(sql!(
2505 "SELECT id FROM graph_edges
2506 WHERE source_entity_id = ?
2507 AND canonical_relation = ?
2508 AND edge_type = ?
2509 AND valid_to IS NULL
2510 AND expired_at IS NULL
2511 ORDER BY created_at DESC
2512 LIMIT 1"
2513 ))
2514 .bind(source_entity_id)
2515 .bind(canonical_relation)
2516 .bind(edge_type_str)
2517 .fetch_optional(&mut *tx)
2518 .await?;
2519
2520 if let Some(head_id) = prior_head {
2527 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2528 let depth: Option<i64> = sqlx::query_scalar(
2529 "WITH RECURSIVE chain(id, depth) AS (
2530 SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2531 UNION ALL
2532 SELECT e.supersedes, c.depth + 1
2533 FROM graph_edges e JOIN chain c ON e.id = c.id
2534 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2535 )
2536 SELECT MAX(depth) FROM chain",
2537 )
2538 .bind(head_id)
2539 .bind(cap)
2540 .fetch_optional(&mut *tx)
2541 .await?
2542 .flatten();
2543 let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2544 if d > SUPERSEDE_DEPTH_CAP {
2545 return Err(MemoryError::SupersedeDepthExceeded(head_id));
2546 }
2547 }
2548
2549 let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2551 let new_id: i64 = zeph_db::query_scalar(sql!(
2552 "INSERT INTO graph_edges
2553 (source_entity_id, target_entity_id, relation, canonical_relation, fact,
2554 confidence, episode_id, edge_type, supersedes)
2555 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2556 RETURNING id"
2557 ))
2558 .bind(source_entity_id)
2559 .bind(target_entity_id)
2560 .bind(relation)
2561 .bind(canonical_relation)
2562 .bind(fact)
2563 .bind(f64::from(confidence))
2564 .bind(episode_raw)
2565 .bind(edge_type_str)
2566 .bind(supersedes_val)
2567 .fetch_one(&mut *tx)
2568 .await?;
2569
2570 if let Some(head_id) = prior_head {
2572 zeph_db::query(sql!(
2573 "UPDATE graph_edges
2574 SET valid_to = CURRENT_TIMESTAMP,
2575 expired_at = CURRENT_TIMESTAMP,
2576 superseded_by = ?
2577 WHERE id = ?"
2578 ))
2579 .bind(new_id)
2580 .bind(head_id)
2581 .execute(&mut *tx)
2582 .await?;
2583
2584 if let Some(m) = metrics {
2585 m.supersedes_total
2586 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2587 }
2588 }
2589
2590 tx.commit().await?;
2591 Ok(new_id)
2592 }
2593
2594 #[allow(clippy::too_many_arguments)]
2600 pub async fn insert_or_supersede(
2601 &self,
2602 source_entity_id: i64,
2603 target_entity_id: i64,
2604 relation: &str,
2605 canonical_relation: &str,
2606 fact: &str,
2607 confidence: f32,
2608 episode_id: Option<MessageId>,
2609 edge_type: EdgeType,
2610 set_supersedes: bool,
2611 ) -> Result<i64, MemoryError> {
2612 self.insert_or_supersede_with_metrics(
2613 source_entity_id,
2614 target_entity_id,
2615 relation,
2616 canonical_relation,
2617 fact,
2618 confidence,
2619 episode_id,
2620 edge_type,
2621 set_supersedes,
2622 None,
2623 )
2624 .await
2625 }
2626
2627 pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2637 Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2638 }
2639
    /// Measures the length of the `supersedes` chain rooted at `head_id`.
    ///
    /// Walks the chain with a recursive CTE whose recursion is bounded at
    /// `SUPERSEDE_DEPTH_CAP + 1` so a cyclic chain cannot loop forever; a
    /// measured depth beyond the cap is reported as
    /// [`MemoryError::SupersedeCycle`].
    async fn check_supersede_depth_with_pool(
        pool: &zeph_db::DbPool,
        head_id: i64,
    ) -> Result<usize, MemoryError> {
        let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
        let depth: Option<i64> = zeph_db::query_scalar(sql!(
            "WITH RECURSIVE chain(id, depth) AS (
                SELECT id, 0 FROM graph_edges WHERE id = ?
                UNION ALL
                SELECT e.supersedes, c.depth + 1
                FROM graph_edges e JOIN chain c ON e.id = c.id
                WHERE e.supersedes IS NOT NULL AND c.depth < ?
            )
            SELECT MAX(depth) FROM chain"
        ))
        .bind(head_id)
        .bind(cap)
        .fetch_optional(pool)
        .await?
        .flatten();

        // depth starts at 0 and only grows, so a negative value should not
        // occur; if it somehow did, the wrapping cast would produce a huge
        // usize and trip the cap check below rather than under-report.
        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
        let d = depth.unwrap_or(0) as usize;
        if d > SUPERSEDE_DEPTH_CAP {
            return Err(MemoryError::SupersedeCycle(head_id));
        }
        Ok(d)
    }
2670}
2671
2672#[cfg(test)]
2675mod tests;