1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::{EntityId, MessageId};
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
24pub struct GraphStore {
31 pool: DbPool,
32}
33
34impl GraphStore {
35 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[must_use]
45 pub fn pool(&self) -> &DbPool {
46 &self.pool
47 }
48
49 pub async fn upsert_entity(
62 &self,
63 surface_name: &str,
64 canonical_name: &str,
65 entity_type: EntityType,
66 summary: Option<&str>,
67 ) -> Result<EntityId, MemoryError> {
68 let type_str = entity_type.as_str();
69 let id: i64 = zeph_db::query_scalar(sql!(
70 "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
71 VALUES (?, ?, ?, ?)
72 ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
73 name = excluded.name,
74 summary = COALESCE(excluded.summary, summary),
75 last_seen_at = CURRENT_TIMESTAMP
76 RETURNING id"
77 ))
78 .bind(surface_name)
79 .bind(canonical_name)
80 .bind(type_str)
81 .bind(summary)
82 .fetch_one(&self.pool)
83 .await?;
84 Ok(EntityId(id))
85 }
86
87 pub async fn find_entity(
93 &self,
94 canonical_name: &str,
95 entity_type: EntityType,
96 ) -> Result<Option<Entity>, MemoryError> {
97 let type_str = entity_type.as_str();
98 let row: Option<EntityRow> = zeph_db::query_as(
99 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
100 FROM graph_entities
101 WHERE canonical_name = ? AND entity_type = ?"),
102 )
103 .bind(canonical_name)
104 .bind(type_str)
105 .fetch_optional(&self.pool)
106 .await?;
107 row.map(entity_from_row).transpose()
108 }
109
110 pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
116 let row: Option<EntityRow> = zeph_db::query_as(
117 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118 FROM graph_entities
119 WHERE id = ?"),
120 )
121 .bind(entity_id)
122 .fetch_optional(&self.pool)
123 .await?;
124 row.map(entity_from_row).transpose()
125 }
126
127 pub async fn set_entity_qdrant_point_id(
133 &self,
134 entity_id: i64,
135 point_id: &str,
136 ) -> Result<(), MemoryError> {
137 zeph_db::query(sql!(
138 "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
139 ))
140 .bind(point_id)
141 .bind(entity_id)
142 .execute(&self.pool)
143 .await?;
144 Ok(())
145 }
146
147 pub async fn find_entities_fuzzy(
168 &self,
169 query: &str,
170 limit: usize,
171 ) -> Result<Vec<Entity>, MemoryError> {
172 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
176 let query = &query[..query.floor_char_boundary(512)];
177 let sanitized = sanitize_fts_query(query);
180 if sanitized.is_empty() {
181 return Ok(vec![]);
182 }
183 let fts_query: String = sanitized
184 .split_whitespace()
185 .filter(|t| !FTS5_OPERATORS.contains(t))
186 .map(|t| format!("{t}*"))
187 .collect::<Vec<_>>()
188 .join(" ");
189 if fts_query.is_empty() {
190 return Ok(vec![]);
191 }
192
193 let limit = i64::try_from(limit)?;
194 let search_sql = format!(
197 "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
198 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
199 FROM graph_entities_fts fts \
200 JOIN graph_entities e ON e.id = fts.rowid \
201 WHERE graph_entities_fts MATCH ? \
202 UNION \
203 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
204 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
205 FROM graph_entity_aliases a \
206 JOIN graph_entities e ON e.id = a.entity_id \
207 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
208 LIMIT ?",
209 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
210 );
211 let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
212 .bind(&fts_query)
213 .bind(format!(
214 "%{}%",
215 query
216 .trim()
217 .replace('\\', "\\\\")
218 .replace('%', "\\%")
219 .replace('_', "\\_")
220 ))
221 .bind(limit)
222 .fetch_all(&self.pool)
223 .await?;
224 rows.into_iter()
225 .map(entity_from_row)
226 .collect::<Result<Vec<_>, _>>()
227 }
228
/// Runs `PRAGMA wal_checkpoint(PASSIVE)` on the pool.
///
/// Compiled only when the `sqlite` feature is enabled and `postgres` is not.
///
/// # Errors
///
/// Returns an error if the pragma fails to execute.
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
    let passive_checkpoint = zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)");
    passive_checkpoint.execute(&self.pool).await?;
    Ok(())
}
245
/// No-op counterpart of the SQLite WAL checkpoint for `postgres` builds.
///
/// Kept `async` so the signature matches the SQLite variant.
///
/// # Errors
///
/// Never fails; always returns `Ok(())`.
#[cfg(feature = "postgres")]
#[allow(clippy::unused_async)]
pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
    Ok(())
}
256
257 pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
259 use futures::StreamExt as _;
260 zeph_db::query_as::<_, EntityRow>(
261 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
262 FROM graph_entities ORDER BY id ASC"),
263 )
264 .fetch(&self.pool)
265 .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
266 r.map_err(MemoryError::from).and_then(entity_from_row)
267 })
268 }
269
270 pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
278 let insert_alias_sql = format!(
279 "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
280 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
281 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
282 );
283 zeph_db::query(&insert_alias_sql)
284 .bind(entity_id)
285 .bind(alias_name)
286 .execute(&self.pool)
287 .await?;
288 Ok(())
289 }
290
291 pub async fn find_entity_by_alias(
299 &self,
300 alias_name: &str,
301 entity_type: EntityType,
302 ) -> Result<Option<Entity>, MemoryError> {
303 let type_str = entity_type.as_str();
304 let alias_typed_sql = format!(
305 "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
306 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
307 FROM graph_entity_aliases a \
308 JOIN graph_entities e ON e.id = a.entity_id \
309 WHERE a.alias_name = ? {} \
310 AND e.entity_type = ? \
311 ORDER BY e.id ASC \
312 LIMIT 1",
313 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
314 );
315 let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
316 .bind(alias_name)
317 .bind(type_str)
318 .fetch_optional(&self.pool)
319 .await?;
320 row.map(entity_from_row).transpose()
321 }
322
323 pub async fn aliases_for_entity(
329 &self,
330 entity_id: i64,
331 ) -> Result<Vec<EntityAlias>, MemoryError> {
332 let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
333 "SELECT id, entity_id, alias_name, created_at
334 FROM graph_entity_aliases
335 WHERE entity_id = ?
336 ORDER BY id ASC"
337 ))
338 .bind(entity_id)
339 .fetch_all(&self.pool)
340 .await?;
341 Ok(rows.into_iter().map(alias_from_row).collect())
342 }
343
344 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
350 use futures::TryStreamExt as _;
351 self.all_entities_stream().try_collect().await
352 }
353
354 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
360 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
361 .fetch_one(&self.pool)
362 .await?;
363 Ok(count)
364 }
365
366 pub async fn insert_edge(
384 &self,
385 source_entity_id: i64,
386 target_entity_id: i64,
387 relation: &str,
388 fact: &str,
389 confidence: f32,
390 episode_id: Option<MessageId>,
391 ) -> Result<i64, MemoryError> {
392 self.insert_edge_typed(
393 source_entity_id,
394 target_entity_id,
395 relation,
396 fact,
397 confidence,
398 episode_id,
399 EdgeType::Semantic,
400 )
401 .await
402 }
403
404 #[allow(clippy::too_many_arguments)] pub async fn insert_edge_typed(
414 &self,
415 source_entity_id: i64,
416 target_entity_id: i64,
417 relation: &str,
418 fact: &str,
419 confidence: f32,
420 episode_id: Option<MessageId>,
421 edge_type: EdgeType,
422 ) -> Result<i64, MemoryError> {
423 if source_entity_id == target_entity_id {
424 return Err(MemoryError::InvalidInput(format!(
425 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
426 )));
427 }
428 let confidence = confidence.clamp(0.0, 1.0);
429 let edge_type_str = edge_type.as_str();
430
431 let mut tx = zeph_db::begin(&self.pool).await?;
436
437 let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
438 "SELECT id, confidence FROM graph_edges
439 WHERE source_entity_id = ?
440 AND target_entity_id = ?
441 AND relation = ?
442 AND edge_type = ?
443 AND valid_to IS NULL
444 LIMIT 1"
445 ))
446 .bind(source_entity_id)
447 .bind(target_entity_id)
448 .bind(relation)
449 .bind(edge_type_str)
450 .fetch_optional(&mut *tx)
451 .await?;
452
453 if let Some((existing_id, stored_conf)) = existing {
454 let updated_conf = f64::from(confidence).max(stored_conf);
455 zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
456 .bind(updated_conf)
457 .bind(existing_id)
458 .execute(&mut *tx)
459 .await?;
460 tx.commit().await?;
461 return Ok(existing_id);
462 }
463
464 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
465 let id: i64 = zeph_db::query_scalar(sql!(
466 "INSERT INTO graph_edges
467 (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
468 VALUES (?, ?, ?, ?, ?, ?, ?)
469 RETURNING id"
470 ))
471 .bind(source_entity_id)
472 .bind(target_entity_id)
473 .bind(relation)
474 .bind(fact)
475 .bind(f64::from(confidence))
476 .bind(episode_raw)
477 .bind(edge_type_str)
478 .fetch_one(&mut *tx)
479 .await?;
480 tx.commit().await?;
481 Ok(id)
482 }
483
484 pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
490 zeph_db::query(sql!(
491 "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
492 WHERE id = ?"
493 ))
494 .bind(edge_id)
495 .execute(&self.pool)
496 .await?;
497 Ok(())
498 }
499
500 pub async fn invalidate_edge_with_supersession(
508 &self,
509 old_edge_id: i64,
510 new_edge_id: i64,
511 ) -> Result<(), MemoryError> {
512 zeph_db::query(sql!(
513 "UPDATE graph_edges
514 SET valid_to = CURRENT_TIMESTAMP,
515 expired_at = CURRENT_TIMESTAMP,
516 superseded_by = ?
517 WHERE id = ?"
518 ))
519 .bind(new_edge_id)
520 .bind(old_edge_id)
521 .execute(&self.pool)
522 .await?;
523 Ok(())
524 }
525
526 pub async fn edges_for_entities(
543 &self,
544 entity_ids: &[i64],
545 edge_types: &[super::types::EdgeType],
546 ) -> Result<Vec<Edge>, MemoryError> {
547 const MAX_BATCH_ENTITIES: usize = 490;
551
552 let mut all_edges: Vec<Edge> = Vec::new();
553
554 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
555 let edges = self.query_batch_edges(chunk, edge_types).await?;
556 all_edges.extend(edges);
557 }
558
559 Ok(all_edges)
560 }
561
562 async fn query_batch_edges(
570 &self,
571 entity_ids: &[i64],
572 edge_types: &[super::types::EdgeType],
573 ) -> Result<Vec<Edge>, MemoryError> {
574 if entity_ids.is_empty() {
575 return Ok(Vec::new());
576 }
577
578 let n_ids = entity_ids.len();
581 let n_types = edge_types.len();
582
583 let sql = if n_types == 0 {
584 let placeholders = placeholder_list(1, n_ids);
586 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
587 format!(
588 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
589 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
590 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
591 FROM graph_edges
592 WHERE valid_to IS NULL
593 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
594 )
595 } else {
596 let placeholders = placeholder_list(1, n_ids);
597 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
598 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
599 format!(
600 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
601 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
602 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
603 FROM graph_edges
604 WHERE valid_to IS NULL
605 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
606 AND edge_type IN ({type_placeholders})"
607 )
608 };
609
610 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
612 for id in entity_ids {
613 query = query.bind(*id);
614 }
615 for id in entity_ids {
616 query = query.bind(*id);
617 }
618 for et in edge_types {
619 query = query.bind(et.as_str());
620 }
621
622 let rows: Vec<EdgeRow> = tokio::time::timeout(
626 std::time::Duration::from_millis(500),
627 query.fetch_all(&self.pool),
628 )
629 .await
630 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
631 Ok(rows.into_iter().map(edge_from_row).collect())
632 }
633
634 pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
640 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
641 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
642 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
643 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
644 FROM graph_edges
645 WHERE valid_to IS NULL
646 AND (source_entity_id = ? OR target_entity_id = ?)"
647 ))
648 .bind(entity_id)
649 .bind(entity_id)
650 .fetch_all(&self.pool)
651 .await?;
652 Ok(rows.into_iter().map(edge_from_row).collect())
653 }
654
655 pub async fn edge_history_for_entity(
662 &self,
663 entity_id: i64,
664 limit: usize,
665 ) -> Result<Vec<Edge>, MemoryError> {
666 let limit = i64::try_from(limit)?;
667 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
668 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
669 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
670 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
671 FROM graph_edges
672 WHERE source_entity_id = ? OR target_entity_id = ?
673 ORDER BY valid_from DESC
674 LIMIT ?"
675 ))
676 .bind(entity_id)
677 .bind(entity_id)
678 .bind(limit)
679 .fetch_all(&self.pool)
680 .await?;
681 Ok(rows.into_iter().map(edge_from_row).collect())
682 }
683
684 pub async fn edges_between(
690 &self,
691 entity_a: i64,
692 entity_b: i64,
693 ) -> Result<Vec<Edge>, MemoryError> {
694 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
695 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
696 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
697 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
698 FROM graph_edges
699 WHERE valid_to IS NULL
700 AND ((source_entity_id = ? AND target_entity_id = ?)
701 OR (source_entity_id = ? AND target_entity_id = ?))"
702 ))
703 .bind(entity_a)
704 .bind(entity_b)
705 .bind(entity_b)
706 .bind(entity_a)
707 .fetch_all(&self.pool)
708 .await?;
709 Ok(rows.into_iter().map(edge_from_row).collect())
710 }
711
712 pub async fn edges_exact(
718 &self,
719 source_entity_id: i64,
720 target_entity_id: i64,
721 ) -> Result<Vec<Edge>, MemoryError> {
722 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
723 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
724 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
725 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
726 FROM graph_edges
727 WHERE valid_to IS NULL
728 AND source_entity_id = ?
729 AND target_entity_id = ?"
730 ))
731 .bind(source_entity_id)
732 .bind(target_entity_id)
733 .fetch_all(&self.pool)
734 .await?;
735 Ok(rows.into_iter().map(edge_from_row).collect())
736 }
737
738 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
744 let count: i64 = zeph_db::query_scalar(sql!(
745 "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
746 ))
747 .fetch_one(&self.pool)
748 .await?;
749 Ok(count)
750 }
751
752 pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
758 let rows: Vec<(String, i64)> = zeph_db::query_as(
759 sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
760 )
761 .fetch_all(&self.pool)
762 .await?;
763 Ok(rows)
764 }
765
766 pub async fn upsert_community(
778 &self,
779 name: &str,
780 summary: &str,
781 entity_ids: &[i64],
782 fingerprint: Option<&str>,
783 ) -> Result<i64, MemoryError> {
784 let entity_ids_json = serde_json::to_string(entity_ids)?;
785 let id: i64 = zeph_db::query_scalar(sql!(
786 "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
787 VALUES (?, ?, ?, ?)
788 ON CONFLICT(name) DO UPDATE SET
789 summary = excluded.summary,
790 entity_ids = excluded.entity_ids,
791 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
792 updated_at = CURRENT_TIMESTAMP
793 RETURNING id"
794 ))
795 .bind(name)
796 .bind(summary)
797 .bind(entity_ids_json)
798 .bind(fingerprint)
799 .fetch_one(&self.pool)
800 .await?;
801 Ok(id)
802 }
803
804 pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
811 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
812 "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
813 ))
814 .fetch_all(&self.pool)
815 .await?;
816 Ok(rows.into_iter().collect())
817 }
818
819 pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
825 zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
826 .bind(id)
827 .execute(&self.pool)
828 .await?;
829 Ok(())
830 }
831
832 pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
841 zeph_db::query(sql!(
842 "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
843 ))
844 .bind(id)
845 .execute(&self.pool)
846 .await?;
847 Ok(())
848 }
849
850 pub async fn community_for_entity(
859 &self,
860 entity_id: i64,
861 ) -> Result<Option<Community>, MemoryError> {
862 let row: Option<CommunityRow> = zeph_db::query_as(
863 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
864 FROM graph_communities c, json_each(c.entity_ids) j
865 WHERE CAST(j.value AS INTEGER) = ?
866 LIMIT 1"),
867 )
868 .bind(entity_id)
869 .fetch_optional(&self.pool)
870 .await?;
871 match row {
872 Some(row) => {
873 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
874 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
875 Ok(Some(Community {
876 id: row.id,
877 name: row.name,
878 summary: row.summary,
879 entity_ids,
880 fingerprint: row.fingerprint,
881 created_at: row.created_at,
882 updated_at: row.updated_at,
883 }))
884 }
885 None => Ok(None),
886 }
887 }
888
889 pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
895 let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
896 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
897 FROM graph_communities
898 ORDER BY id ASC"
899 ))
900 .fetch_all(&self.pool)
901 .await?;
902
903 rows.into_iter()
904 .map(|row| {
905 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
906 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
907 Ok(Community {
908 id: row.id,
909 name: row.name,
910 summary: row.summary,
911 entity_ids,
912 fingerprint: row.fingerprint,
913 created_at: row.created_at,
914 updated_at: row.updated_at,
915 })
916 })
917 .collect()
918 }
919
920 pub async fn community_count(&self) -> Result<i64, MemoryError> {
926 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
927 .fetch_one(&self.pool)
928 .await?;
929 Ok(count)
930 }
931
932 pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
940 let val: Option<String> =
941 zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
942 .bind(key)
943 .fetch_optional(&self.pool)
944 .await?;
945 Ok(val)
946 }
947
948 pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
954 zeph_db::query(sql!(
955 "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
956 ON CONFLICT(key) DO UPDATE SET value = excluded.value"
957 ))
958 .bind(key)
959 .bind(value)
960 .execute(&self.pool)
961 .await?;
962 Ok(())
963 }
964
965 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
973 let val = self.get_metadata("extraction_count").await?;
974 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
975 }
976
977 pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
979 use futures::StreamExt as _;
980 zeph_db::query_as::<_, EdgeRow>(sql!(
981 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
982 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
983 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
984 FROM graph_edges
985 WHERE valid_to IS NULL
986 ORDER BY id ASC"
987 ))
988 .fetch(&self.pool)
989 .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
990 }
991
992 pub async fn edges_after_id(
1009 &self,
1010 after_id: i64,
1011 limit: i64,
1012 ) -> Result<Vec<Edge>, MemoryError> {
1013 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1014 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1015 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1016 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1017 FROM graph_edges
1018 WHERE valid_to IS NULL AND id > ?
1019 ORDER BY id ASC
1020 LIMIT ?"
1021 ))
1022 .bind(after_id)
1023 .bind(limit)
1024 .fetch_all(&self.pool)
1025 .await?;
1026 Ok(rows.into_iter().map(edge_from_row).collect())
1027 }
1028
1029 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1035 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1036 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1037 FROM graph_communities
1038 WHERE id = ?"
1039 ))
1040 .bind(id)
1041 .fetch_optional(&self.pool)
1042 .await?;
1043 match row {
1044 Some(row) => {
1045 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1046 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
1047 Ok(Some(Community {
1048 id: row.id,
1049 name: row.name,
1050 summary: row.summary,
1051 entity_ids,
1052 fingerprint: row.fingerprint,
1053 created_at: row.created_at,
1054 updated_at: row.updated_at,
1055 }))
1056 }
1057 None => Ok(None),
1058 }
1059 }
1060
1061 pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1067 zeph_db::query(sql!("DELETE FROM graph_communities"))
1068 .execute(&self.pool)
1069 .await?;
1070 Ok(())
1071 }
1072
1073 pub async fn find_entities_ranked(
1087 &self,
1088 query: &str,
1089 limit: usize,
1090 ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1091 let query = &query[..query.floor_char_boundary(512)];
1092 let Some(fts_query) = build_fts_query(query) else {
1093 return Ok(vec![]);
1094 };
1095
1096 let limit_i64 = i64::try_from(limit)?;
1097 let ranked_fts_sql = build_ranked_fts_sql();
1098 let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1099 .bind(&fts_query)
1100 .bind(format!(
1101 "%{}%",
1102 query
1103 .trim()
1104 .replace('\\', "\\\\")
1105 .replace('%', "\\%")
1106 .replace('_', "\\_")
1107 ))
1108 .bind(limit_i64)
1109 .fetch_all(&self.pool)
1110 .await?;
1111
1112 if rows.is_empty() {
1113 return Ok(vec![]);
1114 }
1115
1116 Ok(normalize_and_dedup(rows))
1117 }
1118
1119 pub async fn entity_structural_scores(
1129 &self,
1130 entity_ids: &[i64],
1131 ) -> Result<HashMap<i64, f32>, MemoryError> {
1132 const MAX_BATCH: usize = 163;
1135
1136 if entity_ids.is_empty() {
1137 return Ok(HashMap::new());
1138 }
1139
1140 let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1141 for chunk in entity_ids.chunks(MAX_BATCH) {
1142 let n = chunk.len();
1143 let ph1 = placeholder_list(1, n);
1145 let ph2 = placeholder_list(n + 1, n);
1146 let ph3 = placeholder_list(n * 2 + 1, n);
1147
1148 let sql = format!(
1150 "SELECT entity_id,
1151 COUNT(*) AS degree,
1152 COUNT(DISTINCT edge_type) AS type_diversity
1153 FROM (
1154 SELECT source_entity_id AS entity_id, edge_type
1155 FROM graph_edges
1156 WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1157 UNION ALL
1158 SELECT target_entity_id AS entity_id, edge_type
1159 FROM graph_edges
1160 WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1161 )
1162 WHERE entity_id IN ({ph3})
1163 GROUP BY entity_id"
1164 );
1165
1166 let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1167 for id in chunk {
1169 query = query.bind(*id);
1170 }
1171 for id in chunk {
1172 query = query.bind(*id);
1173 }
1174 for id in chunk {
1175 query = query.bind(*id);
1176 }
1177
1178 let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1179 all_rows.extend(chunk_rows);
1180 }
1181
1182 if all_rows.is_empty() {
1183 return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1184 }
1185
1186 let max_degree = all_rows
1187 .iter()
1188 .map(|(_, d, _)| *d)
1189 .max()
1190 .unwrap_or(1)
1191 .max(1);
1192
1193 let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1194 for (entity_id, degree, type_diversity) in all_rows {
1195 #[allow(clippy::cast_precision_loss)]
1196 let norm_degree = degree as f32 / max_degree as f32;
1197 #[allow(clippy::cast_precision_loss)]
1198 let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1199 let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1200 scores.insert(entity_id, score);
1201 }
1202
1203 Ok(scores)
1204 }
1205
/// Maps entity ids to community ids for the requested entities.
///
/// Ids are processed in chunks of `MAX_BATCH`; each chunk runs the
/// statement produced by `community_ids_sql` with one bound parameter per
/// id. The returned `(i64, i64)` rows are inserted into the map as
/// key/value pairs, later rows overwriting earlier ones with the same key.
///
/// # Errors
///
/// Returns an error if a database query fails.
#[cfg(any(feature = "sqlite", feature = "postgres"))]
pub async fn entity_community_ids(
    &self,
    entity_ids: &[i64],
) -> Result<HashMap<i64, i64>, MemoryError> {
    const MAX_BATCH: usize = 490;

    let mut communities: HashMap<i64, i64> = HashMap::new();
    if entity_ids.is_empty() {
        return Ok(communities);
    }

    for chunk in entity_ids.chunks(MAX_BATCH) {
        let placeholders = placeholder_list(1, chunk.len());
        let community_sql = community_ids_sql(&placeholders);

        let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
        for id in chunk {
            query = query.bind(*id);
        }

        let pairs: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
        for (key, value) in pairs {
            communities.insert(key, value);
        }
    }

    Ok(communities)
}
1241
1242 pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1251 const MAX_BATCH: usize = 490;
1252 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1253 for chunk in edge_ids.chunks(MAX_BATCH) {
1254 let edge_placeholders = placeholder_list(1, chunk.len());
1255 let retrieval_sql = format!(
1256 "UPDATE graph_edges \
1257 SET retrieval_count = retrieval_count + 1, \
1258 last_retrieved_at = {epoch_now} \
1259 WHERE id IN ({edge_placeholders})"
1260 );
1261 let mut q = zeph_db::query(&retrieval_sql);
1262 for id in chunk {
1263 q = q.bind(*id);
1264 }
1265 q.execute(&self.pool).await?;
1266 }
1267 Ok(())
1268 }
1269
1270 #[tracing::instrument(
1282 name = "memory.graph.hebbian_increment",
1283 skip_all,
1284 fields(edge_count = edge_ids.len())
1285 )]
1286 pub async fn apply_hebbian_increment(
1287 &self,
1288 edge_ids: &[i64],
1289 delta: f32,
1290 ) -> Result<(), MemoryError> {
1291 const MAX_BATCH: usize = 490;
1294 if edge_ids.is_empty() || delta == 0.0 {
1295 return Ok(());
1296 }
1297 for chunk in edge_ids.chunks(MAX_BATCH) {
1298 let edge_placeholders = placeholder_list(2, chunk.len());
1299 let sql = format!(
1300 "UPDATE graph_edges \
1301 SET weight = weight + $1 \
1302 WHERE id IN ({edge_placeholders}) \
1303 AND valid_to IS NULL"
1304 );
1305 let mut q = zeph_db::query(&sql);
1306 q = q.bind(f64::from(delta));
1307 for id in chunk {
1308 q = q.bind(*id);
1309 }
1310 q.execute(&self.pool).await?;
1311 }
1312 Ok(())
1313 }
1314
1315 pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1324 const MAX_BATCH: usize = 490;
1325 if ids.is_empty() {
1326 return Ok(Vec::new());
1327 }
1328 let mut result = Vec::with_capacity(ids.len());
1330 for chunk in ids.chunks(MAX_BATCH) {
1331 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1332 let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1333 let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1334 for id in chunk {
1335 q = q.bind(*id);
1336 }
1337 let rows = q.fetch_all(&self.pool).await?;
1338 for (id,) in rows {
1339 result.push(id);
1340 }
1341 }
1342 Ok(result)
1343 }
1344
1345 pub async fn qdrant_point_ids_for_entities(
1353 &self,
1354 entity_ids: &[i64],
1355 ) -> Result<HashMap<i64, String>, MemoryError> {
1356 const MAX_BATCH: usize = 490;
1358 if entity_ids.is_empty() {
1359 return Ok(HashMap::new());
1360 }
1361 let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1362 for chunk in entity_ids.chunks(MAX_BATCH) {
1363 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1364 let sql = format!(
1365 "SELECT id, qdrant_point_id \
1366 FROM graph_entities \
1367 WHERE id IN ({placeholders}) \
1368 AND qdrant_point_id IS NOT NULL"
1369 );
1370 let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1371 for id in chunk {
1372 q = q.bind(*id);
1373 }
1374 let rows = q.fetch_all(&self.pool).await?;
1375 for (entity_id, point_id) in rows {
1376 result.insert(entity_id, point_id);
1377 }
1378 }
1379 Ok(result)
1380 }
1381
1382 pub async fn decay_edge_retrieval_counts(
1391 &self,
1392 decay_lambda: f64,
1393 interval_secs: u64,
1394 ) -> Result<usize, MemoryError> {
1395 let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1396 let decay_raw = format!(
1397 "UPDATE graph_edges \
1398 SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1399 WHERE valid_to IS NULL \
1400 AND retrieval_count > 0 \
1401 AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1402 );
1403 let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1404 let result = zeph_db::query(&decay_sql)
1405 .bind(decay_lambda)
1406 .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1407 .execute(&self.pool)
1408 .await?;
1409 Ok(usize::try_from(result.rows_affected())?)
1410 }
1411
1412 pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1418 let days = i64::from(retention_days);
1419 let result = zeph_db::query(sql!(
1420 "DELETE FROM graph_edges
1421 WHERE expired_at IS NOT NULL
1422 AND expired_at < datetime('now', '-' || ? || ' days')"
1423 ))
1424 .bind(days)
1425 .execute(&self.pool)
1426 .await?;
1427 Ok(usize::try_from(result.rows_affected())?)
1428 }
1429
1430 pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1436 let days = i64::from(retention_days);
1437 let result = zeph_db::query(sql!(
1438 "DELETE FROM graph_entities
1439 WHERE id NOT IN (
1440 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1441 UNION
1442 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1443 )
1444 AND last_seen_at < datetime('now', '-' || ? || ' days')"
1445 ))
1446 .bind(days)
1447 .execute(&self.pool)
1448 .await?;
1449 Ok(usize::try_from(result.rows_affected())?)
1450 }
1451
1452 pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1461 let current = self.entity_count().await?;
1462 let max = i64::try_from(max_entities)?;
1463 if current <= max {
1464 return Ok(0);
1465 }
1466 let excess = current - max;
1467 let result = zeph_db::query(sql!(
1468 "DELETE FROM graph_entities
1469 WHERE id IN (
1470 SELECT e.id
1471 FROM graph_entities e
1472 LEFT JOIN (
1473 SELECT source_entity_id AS eid, COUNT(*) AS cnt
1474 FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1475 UNION ALL
1476 SELECT target_entity_id AS eid, COUNT(*) AS cnt
1477 FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1478 ) edge_counts ON e.id = edge_counts.eid
1479 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1480 LIMIT ?
1481 )"
1482 ))
1483 .bind(excess)
1484 .execute(&self.pool)
1485 .await?;
1486 Ok(usize::try_from(result.rows_affected())?)
1487 }
1488
1489 pub async fn edges_at_timestamp(
1503 &self,
1504 entity_id: i64,
1505 timestamp: &str,
1506 ) -> Result<Vec<Edge>, MemoryError> {
1507 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1511 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1512 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1513 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1514 FROM graph_edges
1515 WHERE valid_to IS NULL
1516 AND valid_from <= ?
1517 AND (source_entity_id = ? OR target_entity_id = ?)
1518 UNION ALL
1519 SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1520 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1521 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1522 FROM graph_edges
1523 WHERE valid_to IS NOT NULL
1524 AND valid_from <= ?
1525 AND valid_to > ?
1526 AND (source_entity_id = ? OR target_entity_id = ?)"
1527 ))
1528 .bind(timestamp)
1529 .bind(entity_id)
1530 .bind(entity_id)
1531 .bind(timestamp)
1532 .bind(timestamp)
1533 .bind(entity_id)
1534 .bind(entity_id)
1535 .fetch_all(&self.pool)
1536 .await?;
1537 Ok(rows.into_iter().map(edge_from_row).collect())
1538 }
1539
1540 pub async fn edge_history(
1549 &self,
1550 source_entity_id: i64,
1551 predicate: &str,
1552 relation: Option<&str>,
1553 limit: usize,
1554 ) -> Result<Vec<Edge>, MemoryError> {
1555 let escaped = predicate
1557 .replace('\\', "\\\\")
1558 .replace('%', "\\%")
1559 .replace('_', "\\_");
1560 let like_pattern = format!("%{escaped}%");
1561 let limit = i64::try_from(limit)?;
1562 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1563 zeph_db::query_as(sql!(
1564 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1565 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1566 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1567 FROM graph_edges
1568 WHERE source_entity_id = ?
1569 AND fact LIKE ? ESCAPE '\\'
1570 AND relation = ?
1571 ORDER BY valid_from DESC
1572 LIMIT ?"
1573 ))
1574 .bind(source_entity_id)
1575 .bind(&like_pattern)
1576 .bind(rel)
1577 .bind(limit)
1578 .fetch_all(&self.pool)
1579 .await?
1580 } else {
1581 zeph_db::query_as(sql!(
1582 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1583 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1584 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1585 FROM graph_edges
1586 WHERE source_entity_id = ?
1587 AND fact LIKE ? ESCAPE '\\'
1588 ORDER BY valid_from DESC
1589 LIMIT ?"
1590 ))
1591 .bind(source_entity_id)
1592 .bind(&like_pattern)
1593 .bind(limit)
1594 .fetch_all(&self.pool)
1595 .await?
1596 };
1597 Ok(rows.into_iter().map(edge_from_row).collect())
1598 }
1599
1600 pub async fn bfs(
1617 &self,
1618 start_entity_id: i64,
1619 max_hops: u32,
1620 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1621 self.bfs_with_depth(start_entity_id, max_hops)
1622 .await
1623 .map(|(e, ed, _)| (e, ed))
1624 }
1625
1626 pub async fn bfs_with_depth(
1637 &self,
1638 start_entity_id: i64,
1639 max_hops: u32,
1640 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1641 self.bfs_core(start_entity_id, max_hops, None).await
1642 }
1643
1644 pub async fn bfs_at_timestamp(
1655 &self,
1656 start_entity_id: i64,
1657 max_hops: u32,
1658 timestamp: &str,
1659 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1660 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1661 .await
1662 }
1663
1664 pub async fn bfs_typed(
1680 &self,
1681 start_entity_id: i64,
1682 max_hops: u32,
1683 edge_types: &[EdgeType],
1684 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1685 if edge_types.is_empty() {
1686 return self.bfs_with_depth(start_entity_id, max_hops).await;
1687 }
1688 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1689 .await
1690 }
1691
1692 async fn bfs_core(
1700 &self,
1701 start_entity_id: i64,
1702 max_hops: u32,
1703 at_timestamp: Option<&str>,
1704 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1705 use std::collections::HashMap;
1706
1707 const MAX_FRONTIER: usize = 300;
1710
1711 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1712 let mut frontier: Vec<i64> = vec![start_entity_id];
1713 depth_map.insert(start_entity_id, 0);
1714
1715 for hop in 0..max_hops {
1716 if frontier.is_empty() {
1717 break;
1718 }
1719 frontier.truncate(MAX_FRONTIER);
1720 let n = frontier.len();
1724 let ph1 = placeholder_list(1, n);
1725 let ph2 = placeholder_list(n + 1, n);
1726 let ph3 = placeholder_list(n * 2 + 1, n);
1727 let edge_filter = if at_timestamp.is_some() {
1728 let ts_pos = n * 3 + 1;
1729 format!(
1730 "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1731 ts = numbered_placeholder(ts_pos),
1732 )
1733 } else {
1734 "valid_to IS NULL".to_owned()
1735 };
1736 let neighbour_sql = format!(
1737 "SELECT DISTINCT CASE
1738 WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1739 ELSE source_entity_id
1740 END as neighbour_id
1741 FROM graph_edges
1742 WHERE {edge_filter}
1743 AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1744 );
1745 let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1746 for id in &frontier {
1747 q = q.bind(*id);
1748 }
1749 for id in &frontier {
1750 q = q.bind(*id);
1751 }
1752 for id in &frontier {
1753 q = q.bind(*id);
1754 }
1755 if let Some(ts) = at_timestamp {
1756 q = q.bind(ts);
1757 }
1758 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1759 let mut next_frontier: Vec<i64> = Vec::new();
1760 for nbr in neighbours {
1761 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1762 e.insert(hop + 1);
1763 next_frontier.push(nbr);
1764 }
1765 }
1766 frontier = next_frontier;
1767 }
1768
1769 self.bfs_fetch_results(depth_map, at_timestamp).await
1770 }
1771
1772 async fn bfs_core_typed(
1781 &self,
1782 start_entity_id: i64,
1783 max_hops: u32,
1784 at_timestamp: Option<&str>,
1785 edge_types: &[EdgeType],
1786 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1787 use std::collections::HashMap;
1788
1789 const MAX_FRONTIER: usize = 300;
1790
1791 let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1792
1793 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1794 let mut frontier: Vec<i64> = vec![start_entity_id];
1795 depth_map.insert(start_entity_id, 0);
1796
1797 let n_types = type_strs.len();
1798 let type_in = placeholder_list(1, n_types);
1800 let id_start = n_types + 1;
1801
1802 for hop in 0..max_hops {
1803 if frontier.is_empty() {
1804 break;
1805 }
1806 frontier.truncate(MAX_FRONTIER);
1807
1808 let n_frontier = frontier.len();
1809 let fp1 = placeholder_list(id_start, n_frontier);
1811 let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1812 let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1813
1814 let edge_filter = if at_timestamp.is_some() {
1815 let ts_pos = id_start + n_frontier * 3;
1816 format!(
1817 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1818 ts = numbered_placeholder(ts_pos),
1819 )
1820 } else {
1821 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1822 };
1823
1824 let neighbour_sql = format!(
1825 "SELECT DISTINCT CASE
1826 WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1827 ELSE source_entity_id
1828 END as neighbour_id
1829 FROM graph_edges
1830 WHERE {edge_filter}
1831 AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1832 );
1833
1834 let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1835 for t in &type_strs {
1837 q = q.bind(*t);
1838 }
1839 for id in &frontier {
1841 q = q.bind(*id);
1842 }
1843 for id in &frontier {
1844 q = q.bind(*id);
1845 }
1846 for id in &frontier {
1847 q = q.bind(*id);
1848 }
1849 if let Some(ts) = at_timestamp {
1850 q = q.bind(ts);
1851 }
1852
1853 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1854 let mut next_frontier: Vec<i64> = Vec::new();
1855 for nbr in neighbours {
1856 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1857 e.insert(hop + 1);
1858 next_frontier.push(nbr);
1859 }
1860 }
1861 frontier = next_frontier;
1862 }
1863
1864 self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1866 .await
1867 }
1868
1869 async fn bfs_fetch_results_typed(
1877 &self,
1878 depth_map: std::collections::HashMap<i64, u32>,
1879 at_timestamp: Option<&str>,
1880 type_strs: &[&str],
1881 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1882 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1883 if visited_ids.is_empty() {
1884 return Ok((Vec::new(), Vec::new(), depth_map));
1885 }
1886 if visited_ids.len() > 499 {
1887 tracing::warn!(
1888 total = visited_ids.len(),
1889 retained = 499,
1890 "bfs_fetch_results_typed: visited entity set truncated to 499"
1891 );
1892 visited_ids.truncate(499);
1893 }
1894
1895 let n_types = type_strs.len();
1896 let n_visited = visited_ids.len();
1897
1898 let type_in = placeholder_list(1, n_types);
1900 let id_start = n_types + 1;
1901 let ph_ids1 = placeholder_list(id_start, n_visited);
1902 let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1903
1904 let edge_filter = if at_timestamp.is_some() {
1905 let ts_pos = id_start + n_visited * 2;
1906 format!(
1907 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1908 ts = numbered_placeholder(ts_pos),
1909 )
1910 } else {
1911 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1912 };
1913
1914 let edge_sql = format!(
1915 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1916 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1917 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1918 FROM graph_edges
1919 WHERE {edge_filter}
1920 AND source_entity_id IN ({ph_ids1})
1921 AND target_entity_id IN ({ph_ids2})"
1922 );
1923 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1924 for t in type_strs {
1925 edge_query = edge_query.bind(*t);
1926 }
1927 for id in &visited_ids {
1928 edge_query = edge_query.bind(*id);
1929 }
1930 for id in &visited_ids {
1931 edge_query = edge_query.bind(*id);
1932 }
1933 if let Some(ts) = at_timestamp {
1934 edge_query = edge_query.bind(ts);
1935 }
1936 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1937
1938 let entity_sql2 = format!(
1940 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1941 FROM graph_entities WHERE id IN ({ph})",
1942 ph = placeholder_list(1, visited_ids.len()),
1943 );
1944 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1945 for id in &visited_ids {
1946 entity_query = entity_query.bind(*id);
1947 }
1948 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1949
1950 let entities: Vec<Entity> = entity_rows
1951 .into_iter()
1952 .map(entity_from_row)
1953 .collect::<Result<Vec<_>, _>>()?;
1954 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1955
1956 Ok((entities, edges, depth_map))
1957 }
1958
1959 async fn bfs_fetch_results(
1961 &self,
1962 depth_map: std::collections::HashMap<i64, u32>,
1963 at_timestamp: Option<&str>,
1964 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1965 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1966 if visited_ids.is_empty() {
1967 return Ok((Vec::new(), Vec::new(), depth_map));
1968 }
1969 if visited_ids.len() > 499 {
1971 tracing::warn!(
1972 total = visited_ids.len(),
1973 retained = 499,
1974 "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
1975 some reachable entities will be dropped from results"
1976 );
1977 visited_ids.truncate(499);
1978 }
1979
1980 let n = visited_ids.len();
1981 let ph_ids1 = placeholder_list(1, n);
1982 let ph_ids2 = placeholder_list(n + 1, n);
1983 let edge_filter = if at_timestamp.is_some() {
1984 let ts_pos = n * 2 + 1;
1985 format!(
1986 "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1987 ts = numbered_placeholder(ts_pos),
1988 )
1989 } else {
1990 "valid_to IS NULL".to_owned()
1991 };
1992 let edge_sql = format!(
1993 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1994 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1995 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1996 FROM graph_edges
1997 WHERE {edge_filter}
1998 AND source_entity_id IN ({ph_ids1})
1999 AND target_entity_id IN ({ph_ids2})"
2000 );
2001 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
2002 for id in &visited_ids {
2003 edge_query = edge_query.bind(*id);
2004 }
2005 for id in &visited_ids {
2006 edge_query = edge_query.bind(*id);
2007 }
2008 if let Some(ts) = at_timestamp {
2009 edge_query = edge_query.bind(ts);
2010 }
2011 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
2012
2013 let entity_sql = format!(
2014 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
2015 FROM graph_entities WHERE id IN ({ph})",
2016 ph = placeholder_list(1, n),
2017 );
2018 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
2019 for id in &visited_ids {
2020 entity_query = entity_query.bind(*id);
2021 }
2022 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
2023
2024 let entities: Vec<Entity> = entity_rows
2025 .into_iter()
2026 .map(entity_from_row)
2027 .collect::<Result<Vec<_>, _>>()?;
2028 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
2029
2030 Ok((entities, edges, depth_map))
2031 }
2032
2033 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2049 let find_by_name_sql = format!(
2050 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2051 FROM graph_entities \
2052 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2053 LIMIT 5",
2054 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2055 );
2056 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2057 .bind(name)
2058 .bind(name)
2059 .fetch_all(&self.pool)
2060 .await?;
2061
2062 if !rows.is_empty() {
2063 return rows.into_iter().map(entity_from_row).collect();
2064 }
2065
2066 self.find_entities_fuzzy(name, 5).await
2067 }
2068
2069 pub async fn unprocessed_messages_for_backfill(
2077 &self,
2078 limit: usize,
2079 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2080 let limit = i64::try_from(limit)?;
2081 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2082 "SELECT id, content FROM messages
2083 WHERE graph_processed = 0
2084 ORDER BY id ASC
2085 LIMIT ?"
2086 ))
2087 .bind(limit)
2088 .fetch_all(&self.pool)
2089 .await?;
2090 Ok(rows
2091 .into_iter()
2092 .map(|(id, content)| (crate::types::MessageId(id), content))
2093 .collect())
2094 }
2095
2096 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2102 let count: i64 = zeph_db::query_scalar(sql!(
2103 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2104 ))
2105 .fetch_one(&self.pool)
2106 .await?;
2107 Ok(count)
2108 }
2109
2110 pub async fn mark_messages_graph_processed(
2116 &self,
2117 ids: &[crate::types::MessageId],
2118 ) -> Result<(), MemoryError> {
2119 const MAX_BATCH: usize = 490;
2120 if ids.is_empty() {
2121 return Ok(());
2122 }
2123 for chunk in ids.chunks(MAX_BATCH) {
2124 let placeholders = placeholder_list(1, chunk.len());
2125 let sql =
2126 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2127 let mut query = zeph_db::query(&sql);
2128 for id in chunk {
2129 query = query.bind(id.0);
2130 }
2131 query.execute(&self.pool).await?;
2132 }
2133 Ok(())
2134 }
2135}
2136
/// Builds the SQLite query that maps entity ids to the communities containing them.
///
/// `placeholders` is spliced into the `IN (...)` list as-is, so it must be a ready-made
/// placeholder list. The query expands each community's `entity_ids` with `json_each` and
/// yields `(entity_id, community_id)` rows.
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2147
/// Builds the PostgreSQL query that maps entity ids to the communities containing them.
///
/// `placeholders` is spliced into the `IN (...)` list as-is, so it must be a ready-made
/// placeholder list. The query casts `entity_ids` to `jsonb`, expands it with
/// `jsonb_array_elements_text`, and yields `(entity_id, community_id)` rows.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2157
/// Raw `graph_entities` row as selected by the entity queries in this file.
///
/// Converted into an [`Entity`] by `entity_from_row`, which parses `entity_type`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    qdrant_point_id: Option<String>,
}
2171
2172fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2173 let entity_type = row
2174 .entity_type
2175 .parse::<EntityType>()
2176 .map_err(MemoryError::GraphStore)?;
2177 Ok(Entity {
2178 id: EntityId(row.id),
2179 name: row.name,
2180 canonical_name: row.canonical_name,
2181 entity_type,
2182 summary: row.summary,
2183 first_seen_at: row.first_seen_at,
2184 last_seen_at: row.last_seen_at,
2185 qdrant_point_id: row.qdrant_point_id,
2186 })
2187}
2188
/// Raw entity-alias row; converted into an [`EntityAlias`] by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Wrapped in `EntityId` during conversion.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2196
2197fn alias_from_row(row: AliasRow) -> EntityAlias {
2198 EntityAlias {
2199 id: row.id,
2200 entity_id: EntityId(row.entity_id),
2201 alias_name: row.alias_name,
2202 created_at: row.created_at,
2203 }
2204}
2205
/// Raw `graph_edges` row as selected by the edge queries in this file.
///
/// Converted into an [`Edge`] by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Narrowed to `f32` during conversion.
    confidence: f64,
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // The column is named `episode_id`; it is exposed as the edge's source message id.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    superseded_by: Option<i64>,
    // When NULL, conversion falls back to `relation`.
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    // Narrowed to `f32` during conversion.
    weight: f64,
}
2230
2231fn edge_from_row(row: EdgeRow) -> Edge {
2232 let edge_type = row
2233 .edge_type
2234 .parse::<EdgeType>()
2235 .unwrap_or(EdgeType::Semantic);
2236 let canonical_relation = row
2237 .canonical_relation
2238 .unwrap_or_else(|| row.relation.clone());
2239 Edge {
2240 id: row.id,
2241 source_entity_id: row.source_entity_id,
2242 target_entity_id: row.target_entity_id,
2243 canonical_relation,
2244 relation: row.relation,
2245 fact: row.fact,
2246 #[allow(clippy::cast_possible_truncation)]
2247 confidence: row.confidence as f32,
2248 valid_from: row.valid_from,
2249 valid_to: row.valid_to,
2250 created_at: row.created_at,
2251 expired_at: row.expired_at,
2252 source_message_id: row.source_message_id.map(MessageId),
2253 qdrant_point_id: row.qdrant_point_id,
2254 edge_type,
2255 retrieval_count: row.retrieval_count,
2256 last_retrieved_at: row.last_retrieved_at,
2257 superseded_by: row.superseded_by,
2258 supersedes: row.supersedes,
2259 #[allow(clippy::cast_possible_truncation)]
2260 weight: row.weight as f32,
2261 }
2262}
2263
/// Raw community row as loaded from the database.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // Held as a string here; presumably a JSON array of entity ids, since
    // `community_ids_sql` reads `graph_communities.entity_ids` with JSON functions — confirm.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2274
2275impl GraphStore {
2278 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2286 zeph_db::query(sql!(
2290 "INSERT INTO conversations (id) VALUES (?)
2291 ON CONFLICT (id) DO NOTHING"
2292 ))
2293 .bind(conversation_id)
2294 .execute(&self.pool)
2295 .await?;
2296
2297 let id: i64 = zeph_db::query_scalar(sql!(
2298 "INSERT INTO graph_episodes (conversation_id)
2299 VALUES (?)
2300 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2301 RETURNING id"
2302 ))
2303 .bind(conversation_id)
2304 .fetch_one(&self.pool)
2305 .await?;
2306 Ok(id)
2307 }
2308
2309 pub async fn link_entity_to_episode(
2317 &self,
2318 episode_id: i64,
2319 entity_id: i64,
2320 ) -> Result<(), MemoryError> {
2321 zeph_db::query(sql!(
2322 "INSERT INTO graph_episode_entities (episode_id, entity_id)
2323 VALUES (?, ?)
2324 ON CONFLICT (episode_id, entity_id) DO NOTHING"
2325 ))
2326 .bind(episode_id)
2327 .bind(entity_id)
2328 .execute(&self.pool)
2329 .await?;
2330 Ok(())
2331 }
2332
2333 pub async fn episodes_for_entity(
2339 &self,
2340 entity_id: i64,
2341 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2342 #[derive(zeph_db::FromRow)]
2343 struct EpisodeRow {
2344 id: i64,
2345 conversation_id: i64,
2346 created_at: String,
2347 closed_at: Option<String>,
2348 }
2349 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2350 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2351 FROM graph_episodes e
2352 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2353 WHERE ee.entity_id = ?"
2354 ))
2355 .bind(entity_id)
2356 .fetch_all(&self.pool)
2357 .await?;
2358 Ok(rows
2359 .into_iter()
2360 .map(|r| super::types::Episode {
2361 id: r.id,
2362 conversation_id: r.conversation_id,
2363 created_at: r.created_at,
2364 closed_at: r.closed_at,
2365 })
2366 .collect())
2367 }
2368
2369 #[allow(clippy::too_many_arguments)]
2388 #[tracing::instrument(name = "memory.graph.insert_or_supersede", skip_all)]
2391 pub async fn insert_or_supersede_with_metrics(
2392 &self,
2393 source_entity_id: i64,
2394 target_entity_id: i64,
2395 relation: &str,
2396 canonical_relation: &str,
2397 fact: &str,
2398 confidence: f32,
2399 episode_id: Option<MessageId>,
2400 edge_type: EdgeType,
2401 set_supersedes: bool,
2402 metrics: Option<&ApexMetrics>,
2403 ) -> Result<i64, MemoryError> {
2404 if source_entity_id == target_entity_id {
2405 return Err(MemoryError::InvalidInput(format!(
2406 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2407 )));
2408 }
2409 let confidence = confidence.clamp(0.0, 1.0);
2410 let edge_type_str = edge_type.as_str();
2411 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2412
2413 let mut tx = zeph_db::begin(&self.pool).await?;
2414
2415 if let Some(existing_id) = find_identical_active_edge(
2416 &mut tx,
2417 source_entity_id,
2418 target_entity_id,
2419 canonical_relation,
2420 edge_type_str,
2421 fact,
2422 )
2423 .await?
2424 {
2425 record_reassertion(&mut tx, existing_id, episode_raw, confidence).await?;
2426 tx.commit().await?;
2427 return Ok(existing_id);
2428 }
2429
2430 let prior_head =
2431 find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
2432 .await?;
2433
2434 if let Some(head_id) = prior_head {
2437 check_supersede_depth_in_tx(&mut tx, head_id).await?;
2438 }
2439
2440 if let Some(head_id) = prior_head {
2444 expire_prior_head(&mut tx, head_id).await?;
2445 }
2446
2447 let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2448 let new_id = insert_new_edge(
2449 &mut tx,
2450 source_entity_id,
2451 target_entity_id,
2452 relation,
2453 canonical_relation,
2454 fact,
2455 confidence,
2456 episode_raw,
2457 edge_type_str,
2458 supersedes_val,
2459 )
2460 .await?;
2461
2462 if let Some(head_id) = prior_head {
2463 set_superseded_by(&mut tx, head_id, new_id).await?;
2464 if let Some(m) = metrics {
2465 m.supersedes_total
2466 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2467 }
2468 }
2469
2470 tx.commit().await?;
2471 Ok(new_id)
2472 }
2473
2474 #[allow(clippy::too_many_arguments)] pub async fn insert_or_supersede(
2481 &self,
2482 source_entity_id: i64,
2483 target_entity_id: i64,
2484 relation: &str,
2485 canonical_relation: &str,
2486 fact: &str,
2487 confidence: f32,
2488 episode_id: Option<MessageId>,
2489 edge_type: EdgeType,
2490 set_supersedes: bool,
2491 ) -> Result<i64, MemoryError> {
2492 self.insert_or_supersede_with_metrics(
2493 source_entity_id,
2494 target_entity_id,
2495 relation,
2496 canonical_relation,
2497 fact,
2498 confidence,
2499 episode_id,
2500 edge_type,
2501 set_supersedes,
2502 None,
2503 )
2504 .await
2505 }
2506
2507 pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2517 Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2518 }
2519
2520 async fn check_supersede_depth_with_pool(
2521 pool: &zeph_db::DbPool,
2522 head_id: i64,
2523 ) -> Result<usize, MemoryError> {
2524 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2525 let depth: Option<i64> = zeph_db::query_scalar(sql!(
2528 "WITH RECURSIVE chain(id, depth) AS (
2529 SELECT id, 0 FROM graph_edges WHERE id = ?
2530 UNION ALL
2531 SELECT e.supersedes, c.depth + 1
2532 FROM graph_edges e JOIN chain c ON e.id = c.id
2533 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2534 )
2535 SELECT MAX(depth) FROM chain"
2536 ))
2537 .bind(head_id)
2538 .bind(cap)
2539 .fetch_optional(pool)
2540 .await?
2541 .flatten();
2542
2543 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2544 let d = depth.unwrap_or(0) as usize;
2545 if d > SUPERSEDE_DEPTH_CAP {
2546 return Err(MemoryError::SupersedeCycle(head_id));
2547 }
2548 Ok(d)
2549 }
2550
2551 pub async fn source_message_ids_for_edges(
2562 &self,
2563 edge_ids: &[i64],
2564 ) -> Result<Vec<(i64, crate::types::MessageId)>, crate::error::MemoryError> {
2565 if edge_ids.is_empty() {
2566 return Ok(Vec::new());
2567 }
2568 let placeholders = placeholder_list(1, edge_ids.len());
2569 let sql = format!(
2570 "SELECT id, episode_id FROM graph_edges \
2571 WHERE id IN ({placeholders}) AND episode_id IS NOT NULL"
2572 );
2573 let mut q = zeph_db::query_as::<_, (i64, crate::types::MessageId)>(&sql);
2574 for &eid in edge_ids {
2575 q = q.bind(eid);
2576 }
2577 let rows = q.fetch_all(&self.pool).await?;
2578 Ok(rows)
2579 }
2580
2581 pub async fn source_entity_id_for_edge(
2589 &self,
2590 edge_id: i64,
2591 ) -> Result<Option<i64>, crate::error::MemoryError> {
2592 let row: Option<i64> = zeph_db::query_scalar(sql!(
2593 "SELECT source_entity_id FROM graph_edges WHERE id = ?1 AND valid_to IS NULL LIMIT 1"
2594 ))
2595 .bind(edge_id)
2596 .fetch_optional(&self.pool)
2597 .await?;
2598 Ok(row)
2599 }
2600
2601 pub async fn bfs_edges_at_depth(
2610 &self,
2611 entity_id: i64,
2612 _depth: u32,
2613 edge_types: &[crate::graph::types::EdgeType],
2614 ) -> Result<Vec<crate::recall_view::RecalledFact>, crate::error::MemoryError> {
2615 let type_strs: Vec<&str> = if edge_types.is_empty() {
2616 vec!["semantic", "temporal", "causal", "entity"]
2617 } else {
2618 edge_types.iter().map(|et| et.as_str()).collect()
2619 };
2620
2621 let ph = placeholder_list(1, type_strs.len());
2622 let src_pos = type_strs.len() + 1;
2623 let tgt_pos = src_pos + 1;
2624 let src_ph = numbered_placeholder(src_pos);
2625 let tgt_ph = numbered_placeholder(tgt_pos);
2626
2627 let sql = format!(
2628 "SELECT ge.id, ge.source_entity_id, ge.target_entity_id, ge.relation, ge.fact,
2629 ge.confidence, ge.valid_from, ge.valid_to, ge.created_at, ge.expired_at,
2630 ge.episode_id, ge.qdrant_point_id, ge.edge_type, ge.retrieval_count,
2631 ge.last_retrieved_at, ge.superseded_by, ge.canonical_relation, ge.supersedes, ge.weight
2632 FROM graph_edges ge
2633 WHERE ge.edge_type IN ({ph})
2634 AND ge.valid_to IS NULL
2635 AND (ge.source_entity_id = {src_ph} OR ge.target_entity_id = {tgt_ph})
2636 LIMIT 200"
2637 );
2638
2639 let mut q = zeph_db::query_as::<_, EdgeRow>(&sql);
2640 for t in &type_strs {
2641 q = q.bind(*t);
2642 }
2643 q = q.bind(entity_id).bind(entity_id);
2644
2645 let rows = q.fetch_all(&self.pool).await?;
2646
2647 let all_ids: Vec<i64> = rows
2649 .iter()
2650 .flat_map(|r| [r.source_entity_id, r.target_entity_id])
2651 .collect::<std::collections::HashSet<_>>()
2652 .into_iter()
2653 .collect();
2654
2655 let mut name_map: std::collections::HashMap<i64, String> = std::collections::HashMap::new();
2656 for chunk in all_ids.chunks(490) {
2657 let ph2 = placeholder_list(1, chunk.len());
2658 let name_sql =
2659 format!("SELECT id, canonical_name FROM graph_entities WHERE id IN ({ph2})");
2660 let mut nq = zeph_db::query_as::<_, (i64, String)>(&name_sql);
2661 for &id in chunk {
2662 nq = nq.bind(id);
2663 }
2664 let name_rows = nq.fetch_all(&self.pool).await?;
2665 for (id, name) in name_rows {
2666 name_map.insert(id, name);
2667 }
2668 }
2669
2670 let facts: Vec<crate::recall_view::RecalledFact> = rows
2671 .into_iter()
2672 .filter_map(|row| {
2673 let edge = edge_from_row(row);
2674 let entity_name = name_map.get(&edge.source_entity_id).cloned()?;
2675 let target_name = name_map.get(&edge.target_entity_id).cloned()?;
2676 if entity_name.is_empty() || target_name.is_empty() {
2677 return None;
2678 }
2679 let fact = crate::graph::types::GraphFact {
2680 entity_name,
2681 relation: edge.canonical_relation.clone(),
2682 target_name,
2683 fact: edge.fact.clone(),
2684 entity_match_score: 0.5,
2685 hop_distance: 1,
2686 confidence: edge.confidence,
2687 valid_from: if edge.valid_from.is_empty() {
2688 None
2689 } else {
2690 Some(edge.valid_from.clone())
2691 },
2692 edge_type: edge.edge_type,
2693 retrieval_count: edge.retrieval_count,
2694 edge_id: Some(edge.id),
2695 };
2696 Some(crate::recall_view::RecalledFact::from_graph_fact(fact))
2697 })
2698 .collect();
2699
2700 Ok(facts)
2701 }
2702}
2703
/// Shorthand for the database transaction type taken by the transactional helper functions.
type Tx<'a> = zeph_db::DbTransaction<'a>;
2709
2710async fn find_identical_active_edge(
2714 tx: &mut Tx<'_>,
2715 src: i64,
2716 tgt: i64,
2717 canon: &str,
2718 edge_type_str: &str,
2719 fact: &str,
2720) -> Result<Option<i64>, MemoryError> {
2721 Ok(zeph_db::query_scalar(sql!(
2722 "SELECT id FROM graph_edges
2723 WHERE source_entity_id = ?
2724 AND target_entity_id = ?
2725 AND canonical_relation = ?
2726 AND edge_type = ?
2727 AND fact = ?
2728 AND valid_to IS NULL
2729 AND expired_at IS NULL
2730 LIMIT 1"
2731 ))
2732 .bind(src)
2733 .bind(tgt)
2734 .bind(canon)
2735 .bind(edge_type_str)
2736 .bind(fact)
2737 .fetch_optional(&mut **tx)
2738 .await?)
2739}
2740
2741async fn record_reassertion(
2743 tx: &mut Tx<'_>,
2744 head_id: i64,
2745 episode_raw: Option<i64>,
2746 confidence: f32,
2747) -> Result<(), MemoryError> {
2748 #[allow(clippy::cast_possible_wrap)]
2749 let asserted_at = std::time::SystemTime::now()
2750 .duration_since(std::time::UNIX_EPOCH)
2751 .unwrap_or_default()
2752 .as_secs() as i64;
2753 zeph_db::query(sql!(
2754 "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2755 VALUES (?, ?, ?, ?)"
2756 ))
2757 .bind(head_id)
2758 .bind(asserted_at)
2759 .bind(episode_raw)
2760 .bind(f64::from(confidence))
2761 .execute(&mut **tx)
2762 .await?;
2763 Ok(())
2764}
2765
2766async fn find_prior_active_head(
2768 tx: &mut Tx<'_>,
2769 src: i64,
2770 canon: &str,
2771 edge_type_str: &str,
2772) -> Result<Option<i64>, MemoryError> {
2773 Ok(zeph_db::query_scalar(sql!(
2774 "SELECT id FROM graph_edges
2775 WHERE source_entity_id = ?
2776 AND canonical_relation = ?
2777 AND edge_type = ?
2778 AND valid_to IS NULL
2779 AND expired_at IS NULL
2780 ORDER BY created_at DESC
2781 LIMIT 1"
2782 ))
2783 .bind(src)
2784 .bind(canon)
2785 .bind(edge_type_str)
2786 .fetch_optional(&mut **tx)
2787 .await?)
2788}
2789
2790async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2794 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2795 let depth: Option<i64> = sqlx::query_scalar(
2796 "WITH RECURSIVE chain(id, depth) AS (
2797 SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2798 UNION ALL
2799 SELECT e.supersedes, c.depth + 1
2800 FROM graph_edges e JOIN chain c ON e.id = c.id
2801 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2802 )
2803 SELECT MAX(depth) FROM chain",
2804 )
2805 .bind(head_id)
2806 .bind(cap)
2807 .fetch_optional(&mut **tx)
2808 .await?
2809 .flatten();
2810 let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2811 if d > SUPERSEDE_DEPTH_CAP {
2812 return Err(MemoryError::SupersedeDepthExceeded(head_id));
2813 }
2814 Ok(())
2815}
2816
2817#[allow(clippy::too_many_arguments)]
2819async fn insert_new_edge(
2820 tx: &mut Tx<'_>,
2821 src: i64,
2822 tgt: i64,
2823 relation: &str,
2824 canonical_relation: &str,
2825 fact: &str,
2826 confidence: f32,
2827 episode_raw: Option<i64>,
2828 edge_type_str: &str,
2829 supersedes_val: Option<i64>,
2830) -> Result<i64, MemoryError> {
2831 Ok(zeph_db::query_scalar(sql!(
2832 "INSERT INTO graph_edges
2833 (source_entity_id, target_entity_id, relation, canonical_relation, fact,
2834 confidence, episode_id, edge_type, supersedes)
2835 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
2836 RETURNING id"
2837 ))
2838 .bind(src)
2839 .bind(tgt)
2840 .bind(relation)
2841 .bind(canonical_relation)
2842 .bind(fact)
2843 .bind(f64::from(confidence))
2844 .bind(episode_raw)
2845 .bind(edge_type_str)
2846 .bind(supersedes_val)
2847 .fetch_one(&mut **tx)
2848 .await?)
2849}
2850
2851async fn expire_prior_head(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2855 zeph_db::query(sql!(
2856 "UPDATE graph_edges
2857 SET valid_to = CURRENT_TIMESTAMP,
2858 expired_at = CURRENT_TIMESTAMP
2859 WHERE id = ?"
2860 ))
2861 .bind(head_id)
2862 .execute(&mut **tx)
2863 .await?;
2864 Ok(())
2865}
2866
2867async fn set_superseded_by(tx: &mut Tx<'_>, head_id: i64, new_id: i64) -> Result<(), MemoryError> {
2870 zeph_db::query(sql!(
2871 "UPDATE graph_edges SET superseded_by = ? WHERE id = ?"
2872 ))
2873 .bind(new_id)
2874 .bind(head_id)
2875 .execute(&mut **tx)
2876 .await?;
2877 Ok(())
2878}
2879
/// One raw row of the ranked entity search, in the column order selected by
/// `build_ranked_fts_sql` and destructured by `normalize_and_dedup`.
type EntityFtsRow = (
    i64,            // id
    String,         // name
    String,         // canonical_name
    String,         // entity_type (still text; parsed later)
    Option<String>, // summary
    String,         // first_seen_at
    String,         // last_seen_at
    Option<String>, // qdrant_point_id
    f64,            // fts_rank (raw, not yet normalized)
);
2894
2895fn build_fts_query(query: &str) -> Option<String> {
2900 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
2901 let sanitized = sanitize_fts_query(query);
2902 if sanitized.is_empty() {
2903 return None;
2904 }
2905 let fts_query: String = sanitized
2906 .split_whitespace()
2907 .filter(|t| !FTS5_OPERATORS.contains(t))
2908 .map(|t| format!("{t}*"))
2909 .collect::<Vec<_>>()
2910 .join(" ");
2911 if fts_query.is_empty() {
2912 None
2913 } else {
2914 Some(fts_query)
2915 }
2916}
2917
2918fn build_ranked_fts_sql() -> String {
2923 format!(
2924 "SELECT * FROM ( \
2925 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2926 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2927 -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
2928 FROM graph_entities_fts fts \
2929 JOIN graph_entities e ON e.id = fts.rowid \
2930 WHERE graph_entities_fts MATCH ? \
2931 UNION ALL \
2932 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
2933 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
2934 0.5 AS fts_rank \
2935 FROM graph_entity_aliases a \
2936 JOIN graph_entities e ON e.id = a.entity_id \
2937 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
2938 ) \
2939 ORDER BY fts_rank DESC \
2940 LIMIT ?",
2941 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2942 )
2943}
2944
2945fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
2949 let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
2951 let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
2952
2953 let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
2954 let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
2955 for (
2956 id,
2957 name,
2958 canonical_name,
2959 entity_type_str,
2960 summary,
2961 first_seen_at,
2962 last_seen_at,
2963 qdrant_point_id,
2964 raw_score,
2965 ) in rows
2966 {
2967 if !seen_ids.insert(id) {
2968 continue;
2969 }
2970 let entity_type = entity_type_str
2971 .parse()
2972 .unwrap_or(super::types::EntityType::Concept);
2973 let entity = Entity {
2974 id: EntityId(id),
2975 name,
2976 canonical_name,
2977 entity_type,
2978 summary,
2979 first_seen_at,
2980 last_seen_at,
2981 qdrant_point_id,
2982 };
2983 #[allow(clippy::cast_possible_truncation)]
2984 let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
2985 result.push((entity, normalized));
2986 }
2987 result
2988}
2989
// Unit tests are declared as an out-of-line `tests` module and are compiled
// only for test builds.
#[cfg(test)]
mod tests;