1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::MessageId;
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
/// SQL-backed persistence layer for the knowledge graph: entities, edges,
/// aliases, communities, and key/value graph metadata.
pub struct GraphStore {
    // Shared connection pool; all queries issued by this store go through it.
    pool: DbPool,
}
33
34impl GraphStore {
35 #[must_use]
39 pub fn new(pool: DbPool) -> Self {
40 Self { pool }
41 }
42
43 #[must_use]
45 pub fn pool(&self) -> &DbPool {
46 &self.pool
47 }
48
49 pub async fn upsert_entity(
62 &self,
63 surface_name: &str,
64 canonical_name: &str,
65 entity_type: EntityType,
66 summary: Option<&str>,
67 ) -> Result<i64, MemoryError> {
68 let type_str = entity_type.as_str();
69 let id: i64 = zeph_db::query_scalar(sql!(
70 "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
71 VALUES (?, ?, ?, ?)
72 ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
73 name = excluded.name,
74 summary = COALESCE(excluded.summary, summary),
75 last_seen_at = CURRENT_TIMESTAMP
76 RETURNING id"
77 ))
78 .bind(surface_name)
79 .bind(canonical_name)
80 .bind(type_str)
81 .bind(summary)
82 .fetch_one(&self.pool)
83 .await?;
84 Ok(id)
85 }
86
87 pub async fn find_entity(
93 &self,
94 canonical_name: &str,
95 entity_type: EntityType,
96 ) -> Result<Option<Entity>, MemoryError> {
97 let type_str = entity_type.as_str();
98 let row: Option<EntityRow> = zeph_db::query_as(
99 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
100 FROM graph_entities
101 WHERE canonical_name = ? AND entity_type = ?"),
102 )
103 .bind(canonical_name)
104 .bind(type_str)
105 .fetch_optional(&self.pool)
106 .await?;
107 row.map(entity_from_row).transpose()
108 }
109
110 pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
116 let row: Option<EntityRow> = zeph_db::query_as(
117 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118 FROM graph_entities
119 WHERE id = ?"),
120 )
121 .bind(entity_id)
122 .fetch_optional(&self.pool)
123 .await?;
124 row.map(entity_from_row).transpose()
125 }
126
127 pub async fn set_entity_qdrant_point_id(
133 &self,
134 entity_id: i64,
135 point_id: &str,
136 ) -> Result<(), MemoryError> {
137 zeph_db::query(sql!(
138 "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
139 ))
140 .bind(point_id)
141 .bind(entity_id)
142 .execute(&self.pool)
143 .await?;
144 Ok(())
145 }
146
    /// Fuzzy entity search combining an FTS5 prefix match on the entity index
    /// with a case-insensitive LIKE substring match over aliases.
    ///
    /// Returns an empty list when the query sanitizes to nothing.
    ///
    /// # Errors
    /// Returns an error if `limit` does not fit in `i64` or the query fails.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // Bare FTS5 keywords would be parsed as operators, not terms; drop them.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Truncate on a char boundary so a huge query can't blow up FTS.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Turn each remaining token into a prefix query ("tok*").
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION of the FTS hit set and the alias LIKE hit set; the dialect
        // constant appends case-insensitive collation where the DB needs it.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // LIKE pattern: escape backslash first, then the wildcards, so the
            // raw query is matched literally as a substring.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
228
229 #[cfg(feature = "sqlite")]
239 pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
240 zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
241 .execute(&self.pool)
242 .await?;
243 Ok(())
244 }
245
    /// No-op on Postgres: the server manages its own WAL checkpointing, so
    /// there is nothing for the client to trigger.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
255
256 pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
258 use futures::StreamExt as _;
259 zeph_db::query_as::<_, EntityRow>(
260 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
261 FROM graph_entities ORDER BY id ASC"),
262 )
263 .fetch(&self.pool)
264 .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
265 r.map_err(MemoryError::from).and_then(entity_from_row)
266 })
267 }
268
269 pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
277 let insert_alias_sql = format!(
278 "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
279 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
280 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
281 );
282 zeph_db::query(&insert_alias_sql)
283 .bind(entity_id)
284 .bind(alias_name)
285 .execute(&self.pool)
286 .await?;
287 Ok(())
288 }
289
290 pub async fn find_entity_by_alias(
298 &self,
299 alias_name: &str,
300 entity_type: EntityType,
301 ) -> Result<Option<Entity>, MemoryError> {
302 let type_str = entity_type.as_str();
303 let alias_typed_sql = format!(
304 "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
305 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
306 FROM graph_entity_aliases a \
307 JOIN graph_entities e ON e.id = a.entity_id \
308 WHERE a.alias_name = ? {} \
309 AND e.entity_type = ? \
310 ORDER BY e.id ASC \
311 LIMIT 1",
312 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
313 );
314 let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
315 .bind(alias_name)
316 .bind(type_str)
317 .fetch_optional(&self.pool)
318 .await?;
319 row.map(entity_from_row).transpose()
320 }
321
322 pub async fn aliases_for_entity(
328 &self,
329 entity_id: i64,
330 ) -> Result<Vec<EntityAlias>, MemoryError> {
331 let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
332 "SELECT id, entity_id, alias_name, created_at
333 FROM graph_entity_aliases
334 WHERE entity_id = ?
335 ORDER BY id ASC"
336 ))
337 .bind(entity_id)
338 .fetch_all(&self.pool)
339 .await?;
340 Ok(rows.into_iter().map(alias_from_row).collect())
341 }
342
343 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
349 use futures::TryStreamExt as _;
350 self.all_entities_stream().try_collect().await
351 }
352
353 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
359 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
360 .fetch_one(&self.pool)
361 .await?;
362 Ok(count)
363 }
364
365 pub async fn insert_edge(
383 &self,
384 source_entity_id: i64,
385 target_entity_id: i64,
386 relation: &str,
387 fact: &str,
388 confidence: f32,
389 episode_id: Option<MessageId>,
390 ) -> Result<i64, MemoryError> {
391 self.insert_edge_typed(
392 source_entity_id,
393 target_entity_id,
394 relation,
395 fact,
396 confidence,
397 episode_id,
398 EdgeType::Semantic,
399 )
400 .await
401 }
402
    /// Inserts a typed edge between two distinct entities. If an active edge
    /// with the same (source, target, relation, edge_type) already exists,
    /// its confidence is raised to the max of old and new instead of
    /// inserting a duplicate. Returns the id of the inserted or merged edge.
    ///
    /// # Errors
    /// Returns `MemoryError::InvalidInput` for self-loops, or a database
    /// error if any statement in the transaction fails.
    #[allow(clippy::too_many_arguments)] pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        // Self-loops carry no relational information; reject up front.
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        // Confidence is stored normalized to [0, 1].
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        // Check-then-insert runs in one transaction to narrow the window for
        // a concurrent writer to create a duplicate between the statements.
        let mut tx = zeph_db::begin(&self.pool).await?;

        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
             AND target_entity_id = ?
             AND relation = ?
             AND edge_type = ?
             AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Duplicate active edge: keep whichever confidence is higher.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
482
483 pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
489 zeph_db::query(sql!(
490 "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
491 WHERE id = ?"
492 ))
493 .bind(edge_id)
494 .execute(&self.pool)
495 .await?;
496 Ok(())
497 }
498
499 pub async fn invalidate_edge_with_supersession(
507 &self,
508 old_edge_id: i64,
509 new_edge_id: i64,
510 ) -> Result<(), MemoryError> {
511 zeph_db::query(sql!(
512 "UPDATE graph_edges
513 SET valid_to = CURRENT_TIMESTAMP,
514 expired_at = CURRENT_TIMESTAMP,
515 superseded_by = ?
516 WHERE id = ?"
517 ))
518 .bind(new_edge_id)
519 .bind(old_edge_id)
520 .execute(&self.pool)
521 .await?;
522 Ok(())
523 }
524
525 pub async fn edges_for_entities(
542 &self,
543 entity_ids: &[i64],
544 edge_types: &[super::types::EdgeType],
545 ) -> Result<Vec<Edge>, MemoryError> {
546 const MAX_BATCH_ENTITIES: usize = 490;
550
551 let mut all_edges: Vec<Edge> = Vec::new();
552
553 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
554 let edges = self.query_batch_edges(chunk, edge_types).await?;
555 all_edges.extend(edges);
556 }
557
558 Ok(all_edges)
559 }
560
561 async fn query_batch_edges(
569 &self,
570 entity_ids: &[i64],
571 edge_types: &[super::types::EdgeType],
572 ) -> Result<Vec<Edge>, MemoryError> {
573 if entity_ids.is_empty() {
574 return Ok(Vec::new());
575 }
576
577 let n_ids = entity_ids.len();
580 let n_types = edge_types.len();
581
582 let sql = if n_types == 0 {
583 let placeholders = placeholder_list(1, n_ids);
585 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
586 format!(
587 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
588 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
589 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
590 FROM graph_edges
591 WHERE valid_to IS NULL
592 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
593 )
594 } else {
595 let placeholders = placeholder_list(1, n_ids);
596 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
597 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
598 format!(
599 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
600 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
601 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
602 FROM graph_edges
603 WHERE valid_to IS NULL
604 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
605 AND edge_type IN ({type_placeholders})"
606 )
607 };
608
609 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
611 for id in entity_ids {
612 query = query.bind(*id);
613 }
614 for id in entity_ids {
615 query = query.bind(*id);
616 }
617 for et in edge_types {
618 query = query.bind(et.as_str());
619 }
620
621 let rows: Vec<EdgeRow> = tokio::time::timeout(
625 std::time::Duration::from_millis(500),
626 query.fetch_all(&self.pool),
627 )
628 .await
629 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
630 Ok(rows.into_iter().map(edge_from_row).collect())
631 }
632
633 pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
639 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
640 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
641 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
642 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
643 FROM graph_edges
644 WHERE valid_to IS NULL
645 AND (source_entity_id = ? OR target_entity_id = ?)"
646 ))
647 .bind(entity_id)
648 .bind(entity_id)
649 .fetch_all(&self.pool)
650 .await?;
651 Ok(rows.into_iter().map(edge_from_row).collect())
652 }
653
654 pub async fn edge_history_for_entity(
661 &self,
662 entity_id: i64,
663 limit: usize,
664 ) -> Result<Vec<Edge>, MemoryError> {
665 let limit = i64::try_from(limit)?;
666 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
667 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
668 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
669 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
670 FROM graph_edges
671 WHERE source_entity_id = ? OR target_entity_id = ?
672 ORDER BY valid_from DESC
673 LIMIT ?"
674 ))
675 .bind(entity_id)
676 .bind(entity_id)
677 .bind(limit)
678 .fetch_all(&self.pool)
679 .await?;
680 Ok(rows.into_iter().map(edge_from_row).collect())
681 }
682
683 pub async fn edges_between(
689 &self,
690 entity_a: i64,
691 entity_b: i64,
692 ) -> Result<Vec<Edge>, MemoryError> {
693 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
694 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
695 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
696 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
697 FROM graph_edges
698 WHERE valid_to IS NULL
699 AND ((source_entity_id = ? AND target_entity_id = ?)
700 OR (source_entity_id = ? AND target_entity_id = ?))"
701 ))
702 .bind(entity_a)
703 .bind(entity_b)
704 .bind(entity_b)
705 .bind(entity_a)
706 .fetch_all(&self.pool)
707 .await?;
708 Ok(rows.into_iter().map(edge_from_row).collect())
709 }
710
711 pub async fn edges_exact(
717 &self,
718 source_entity_id: i64,
719 target_entity_id: i64,
720 ) -> Result<Vec<Edge>, MemoryError> {
721 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
722 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
723 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
724 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
725 FROM graph_edges
726 WHERE valid_to IS NULL
727 AND source_entity_id = ?
728 AND target_entity_id = ?"
729 ))
730 .bind(source_entity_id)
731 .bind(target_entity_id)
732 .fetch_all(&self.pool)
733 .await?;
734 Ok(rows.into_iter().map(edge_from_row).collect())
735 }
736
737 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
743 let count: i64 = zeph_db::query_scalar(sql!(
744 "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
745 ))
746 .fetch_one(&self.pool)
747 .await?;
748 Ok(count)
749 }
750
751 pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
757 let rows: Vec<(String, i64)> = zeph_db::query_as(
758 sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
759 )
760 .fetch_all(&self.pool)
761 .await?;
762 Ok(rows)
763 }
764
765 pub async fn upsert_community(
777 &self,
778 name: &str,
779 summary: &str,
780 entity_ids: &[i64],
781 fingerprint: Option<&str>,
782 ) -> Result<i64, MemoryError> {
783 let entity_ids_json = serde_json::to_string(entity_ids)?;
784 let id: i64 = zeph_db::query_scalar(sql!(
785 "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
786 VALUES (?, ?, ?, ?)
787 ON CONFLICT(name) DO UPDATE SET
788 summary = excluded.summary,
789 entity_ids = excluded.entity_ids,
790 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
791 updated_at = CURRENT_TIMESTAMP
792 RETURNING id"
793 ))
794 .bind(name)
795 .bind(summary)
796 .bind(entity_ids_json)
797 .bind(fingerprint)
798 .fetch_one(&self.pool)
799 .await?;
800 Ok(id)
801 }
802
803 pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
810 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
811 "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
812 ))
813 .fetch_all(&self.pool)
814 .await?;
815 Ok(rows.into_iter().collect())
816 }
817
818 pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
824 zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
825 .bind(id)
826 .execute(&self.pool)
827 .await?;
828 Ok(())
829 }
830
831 pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
840 zeph_db::query(sql!(
841 "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
842 ))
843 .bind(id)
844 .execute(&self.pool)
845 .await?;
846 Ok(())
847 }
848
849 pub async fn community_for_entity(
858 &self,
859 entity_id: i64,
860 ) -> Result<Option<Community>, MemoryError> {
861 let row: Option<CommunityRow> = zeph_db::query_as(
862 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
863 FROM graph_communities c, json_each(c.entity_ids) j
864 WHERE CAST(j.value AS INTEGER) = ?
865 LIMIT 1"),
866 )
867 .bind(entity_id)
868 .fetch_optional(&self.pool)
869 .await?;
870 match row {
871 Some(row) => {
872 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
873 Ok(Some(Community {
874 id: row.id,
875 name: row.name,
876 summary: row.summary,
877 entity_ids,
878 fingerprint: row.fingerprint,
879 created_at: row.created_at,
880 updated_at: row.updated_at,
881 }))
882 }
883 None => Ok(None),
884 }
885 }
886
887 pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
893 let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
894 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
895 FROM graph_communities
896 ORDER BY id ASC"
897 ))
898 .fetch_all(&self.pool)
899 .await?;
900
901 rows.into_iter()
902 .map(|row| {
903 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
904 Ok(Community {
905 id: row.id,
906 name: row.name,
907 summary: row.summary,
908 entity_ids,
909 fingerprint: row.fingerprint,
910 created_at: row.created_at,
911 updated_at: row.updated_at,
912 })
913 })
914 .collect()
915 }
916
917 pub async fn community_count(&self) -> Result<i64, MemoryError> {
923 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
924 .fetch_one(&self.pool)
925 .await?;
926 Ok(count)
927 }
928
929 pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
937 let val: Option<String> =
938 zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
939 .bind(key)
940 .fetch_optional(&self.pool)
941 .await?;
942 Ok(val)
943 }
944
945 pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
951 zeph_db::query(sql!(
952 "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
953 ON CONFLICT(key) DO UPDATE SET value = excluded.value"
954 ))
955 .bind(key)
956 .bind(value)
957 .execute(&self.pool)
958 .await?;
959 Ok(())
960 }
961
962 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
970 let val = self.get_metadata("extraction_count").await?;
971 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
972 }
973
974 pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
976 use futures::StreamExt as _;
977 zeph_db::query_as::<_, EdgeRow>(sql!(
978 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
979 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
980 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
981 FROM graph_edges
982 WHERE valid_to IS NULL
983 ORDER BY id ASC"
984 ))
985 .fetch(&self.pool)
986 .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
987 }
988
989 pub async fn edges_after_id(
1006 &self,
1007 after_id: i64,
1008 limit: i64,
1009 ) -> Result<Vec<Edge>, MemoryError> {
1010 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1011 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1012 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1013 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
1014 FROM graph_edges
1015 WHERE valid_to IS NULL AND id > ?
1016 ORDER BY id ASC
1017 LIMIT ?"
1018 ))
1019 .bind(after_id)
1020 .bind(limit)
1021 .fetch_all(&self.pool)
1022 .await?;
1023 Ok(rows.into_iter().map(edge_from_row).collect())
1024 }
1025
1026 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1032 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1033 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1034 FROM graph_communities
1035 WHERE id = ?"
1036 ))
1037 .bind(id)
1038 .fetch_optional(&self.pool)
1039 .await?;
1040 match row {
1041 Some(row) => {
1042 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1043 Ok(Some(Community {
1044 id: row.id,
1045 name: row.name,
1046 summary: row.summary,
1047 entity_ids,
1048 fingerprint: row.fingerprint,
1049 created_at: row.created_at,
1050 updated_at: row.updated_at,
1051 }))
1052 }
1053 None => Ok(None),
1054 }
1055 }
1056
1057 pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1063 zeph_db::query(sql!("DELETE FROM graph_communities"))
1064 .execute(&self.pool)
1065 .await?;
1066 Ok(())
1067 }
1068
    /// Full-text entity search returning `(Entity, score)` pairs; results
    /// from the FTS and alias paths are post-processed by
    /// `normalize_and_dedup` (defined elsewhere in this module).
    ///
    /// Returns an empty list when the query sanitizes to nothing or nothing
    /// matches.
    ///
    /// # Errors
    /// Returns an error if `limit` does not fit in `i64` or the query fails.
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Truncate on a char boundary so a huge query can't blow up FTS.
        let query = &query[..query.floor_char_boundary(512)];
        let Some(fts_query) = build_fts_query(query) else {
            return Ok(vec![]);
        };

        let limit_i64 = i64::try_from(limit)?;
        let ranked_fts_sql = build_ranked_fts_sql();
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // LIKE pattern for the alias path: escape backslash first, then
            // the wildcards, so the raw query matches literally as substring.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        Ok(normalize_and_dedup(rows))
    }
1114
1115 pub async fn entity_structural_scores(
1125 &self,
1126 entity_ids: &[i64],
1127 ) -> Result<HashMap<i64, f32>, MemoryError> {
1128 const MAX_BATCH: usize = 163;
1131
1132 if entity_ids.is_empty() {
1133 return Ok(HashMap::new());
1134 }
1135
1136 let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1137 for chunk in entity_ids.chunks(MAX_BATCH) {
1138 let n = chunk.len();
1139 let ph1 = placeholder_list(1, n);
1141 let ph2 = placeholder_list(n + 1, n);
1142 let ph3 = placeholder_list(n * 2 + 1, n);
1143
1144 let sql = format!(
1146 "SELECT entity_id,
1147 COUNT(*) AS degree,
1148 COUNT(DISTINCT edge_type) AS type_diversity
1149 FROM (
1150 SELECT source_entity_id AS entity_id, edge_type
1151 FROM graph_edges
1152 WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1153 UNION ALL
1154 SELECT target_entity_id AS entity_id, edge_type
1155 FROM graph_edges
1156 WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1157 )
1158 WHERE entity_id IN ({ph3})
1159 GROUP BY entity_id"
1160 );
1161
1162 let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1163 for id in chunk {
1165 query = query.bind(*id);
1166 }
1167 for id in chunk {
1168 query = query.bind(*id);
1169 }
1170 for id in chunk {
1171 query = query.bind(*id);
1172 }
1173
1174 let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1175 all_rows.extend(chunk_rows);
1176 }
1177
1178 if all_rows.is_empty() {
1179 return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1180 }
1181
1182 let max_degree = all_rows
1183 .iter()
1184 .map(|(_, d, _)| *d)
1185 .max()
1186 .unwrap_or(1)
1187 .max(1);
1188
1189 let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1190 for (entity_id, degree, type_diversity) in all_rows {
1191 #[allow(clippy::cast_precision_loss)]
1192 let norm_degree = degree as f32 / max_degree as f32;
1193 #[allow(clippy::cast_precision_loss)]
1194 let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1195 let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1196 scores.insert(entity_id, score);
1197 }
1198
1199 Ok(scores)
1200 }
1201
1202 pub async fn entity_community_ids(
1211 &self,
1212 entity_ids: &[i64],
1213 ) -> Result<HashMap<i64, i64>, MemoryError> {
1214 const MAX_BATCH: usize = 490;
1215
1216 if entity_ids.is_empty() {
1217 return Ok(HashMap::new());
1218 }
1219
1220 let mut result: HashMap<i64, i64> = HashMap::new();
1221 for chunk in entity_ids.chunks(MAX_BATCH) {
1222 let placeholders = placeholder_list(1, chunk.len());
1223
1224 let community_sql = community_ids_sql(&placeholders);
1225 let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1226 for id in chunk {
1227 query = query.bind(*id);
1228 }
1229
1230 let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1231 result.extend(rows);
1232 }
1233
1234 Ok(result)
1235 }
1236
1237 pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1246 const MAX_BATCH: usize = 490;
1247 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1248 for chunk in edge_ids.chunks(MAX_BATCH) {
1249 let edge_placeholders = placeholder_list(1, chunk.len());
1250 let retrieval_sql = format!(
1251 "UPDATE graph_edges \
1252 SET retrieval_count = retrieval_count + 1, \
1253 last_retrieved_at = {epoch_now} \
1254 WHERE id IN ({edge_placeholders})"
1255 );
1256 let mut q = zeph_db::query(&retrieval_sql);
1257 for id in chunk {
1258 q = q.bind(*id);
1259 }
1260 q.execute(&self.pool).await?;
1261 }
1262 Ok(())
1263 }
1264
    /// Adds `delta` to the `weight` of every active edge in `edge_ids`
    /// (Hebbian-style reinforcement). No-op when the list is empty or the
    /// delta is exactly zero.
    ///
    /// # Errors
    /// Returns an error if any batch update fails.
    #[tracing::instrument(
        name = "memory.graph.hebbian_increment",
        skip_all,
        fields(edge_count = edge_ids.len())
    )]
    pub async fn apply_hebbian_increment(
        &self,
        edge_ids: &[i64],
        delta: f32,
    ) -> Result<(), MemoryError> {
        // Batched to stay under bind-parameter limits.
        const MAX_BATCH: usize = 490;
        if edge_ids.is_empty() || delta == 0.0 {
            return Ok(());
        }
        for chunk in edge_ids.chunks(MAX_BATCH) {
            // $1 is reserved for the delta, so the IN-list placeholders start
            // at 2. NOTE(review): the literal "$1" mixed with
            // placeholder_list output assumes the active driver accepts this
            // form on every dialect — confirm for the SQLite backend.
            let edge_placeholders = placeholder_list(2, chunk.len());
            let sql = format!(
                "UPDATE graph_edges \
                 SET weight = weight + $1 \
                 WHERE id IN ({edge_placeholders}) \
                 AND valid_to IS NULL"
            );
            let mut q = zeph_db::query(&sql);
            q = q.bind(f64::from(delta));
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1309
1310 pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1319 const MAX_BATCH: usize = 490;
1320 if ids.is_empty() {
1321 return Ok(Vec::new());
1322 }
1323 let mut result = Vec::with_capacity(ids.len());
1325 for chunk in ids.chunks(MAX_BATCH) {
1326 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1327 let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1328 let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1329 for id in chunk {
1330 q = q.bind(*id);
1331 }
1332 let rows = q.fetch_all(&self.pool).await?;
1333 for (id,) in rows {
1334 result.push(id);
1335 }
1336 }
1337 Ok(result)
1338 }
1339
1340 pub async fn qdrant_point_ids_for_entities(
1348 &self,
1349 entity_ids: &[i64],
1350 ) -> Result<HashMap<i64, String>, MemoryError> {
1351 const MAX_BATCH: usize = 490;
1353 if entity_ids.is_empty() {
1354 return Ok(HashMap::new());
1355 }
1356 let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1357 for chunk in entity_ids.chunks(MAX_BATCH) {
1358 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1359 let sql = format!(
1360 "SELECT id, qdrant_point_id \
1361 FROM graph_entities \
1362 WHERE id IN ({placeholders}) \
1363 AND qdrant_point_id IS NOT NULL"
1364 );
1365 let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1366 for id in chunk {
1367 q = q.bind(*id);
1368 }
1369 let rows = q.fetch_all(&self.pool).await?;
1370 for (entity_id, point_id) in rows {
1371 result.insert(entity_id, point_id);
1372 }
1373 }
1374 Ok(result)
1375 }
1376
    /// Multiplicatively decays `retrieval_count` on active edges not
    /// retrieved within the last `interval_secs`, flooring at 0. Returns the
    /// number of rows updated.
    ///
    /// # Errors
    /// Returns an error if the update fails or the affected-row count does
    /// not fit in `usize`.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        // NOTE(review): two-argument scalar MAX(...) is SQLite's form;
        // Postgres would need GREATEST — confirm the dialect layer covers
        // this (rewrite_placeholders below only rewrites parameter markers).
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
             AND retrieval_count > 0 \
             AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate rather than error on absurdly large intervals.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1406
1407 pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1413 let days = i64::from(retention_days);
1414 let result = zeph_db::query(sql!(
1415 "DELETE FROM graph_edges
1416 WHERE expired_at IS NOT NULL
1417 AND expired_at < datetime('now', '-' || ? || ' days')"
1418 ))
1419 .bind(days)
1420 .execute(&self.pool)
1421 .await?;
1422 Ok(usize::try_from(result.rows_affected())?)
1423 }
1424
1425 pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1431 let days = i64::from(retention_days);
1432 let result = zeph_db::query(sql!(
1433 "DELETE FROM graph_entities
1434 WHERE id NOT IN (
1435 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1436 UNION
1437 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1438 )
1439 AND last_seen_at < datetime('now', '-' || ? || ' days')"
1440 ))
1441 .bind(days)
1442 .execute(&self.pool)
1443 .await?;
1444 Ok(usize::try_from(result.rows_affected())?)
1445 }
1446
    /// Enforces a hard cap on the entity table: if more than `max_entities`
    /// rows exist, deletes the excess, dropping the least-connected and
    /// least-recently-seen entities first.
    ///
    /// Returns the number of entities deleted (0 when under the cap).
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or integer conversion failure.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        // Rank candidates by active-edge degree (ascending), then staleness.
        // NOTE(review): the UNION ALL subquery can yield two rows per entity
        // (one as source, one as target); the LEFT JOIN then duplicates e.id,
        // and COALESCE uses whichever row joins — degree ordering is therefore
        // approximate, not an exact total degree. Verify this is intended.
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id IN (
                 SELECT e.id
                 FROM graph_entities e
                 LEFT JOIN (
                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
                     UNION ALL
                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
                 ) edge_counts ON e.id = edge_counts.eid
                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
                 LIMIT ?
             )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1483
    /// Point-in-time query: returns all edges incident to `entity_id` that
    /// were valid at `timestamp`.
    ///
    /// Two UNION ALL branches cover the cases: edges still open
    /// (`valid_to IS NULL`) that began on or before the timestamp, and closed
    /// edges whose validity interval `[valid_from, valid_to)` contains it.
    ///
    /// `timestamp` is compared lexicographically against the stored text
    /// columns, so it must use the same format the DB writes (e.g.
    /// `CURRENT_TIMESTAMP` format).
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Bind order is positional and must mirror the `?` order across both
        // UNION branches: ts, id, id, then ts, ts, id, id.
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?
               AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?
               AND valid_to > ?
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1534
    /// Returns the full history (current and superseded versions) of edges
    /// from `source_entity_id` whose `fact` text contains `predicate`,
    /// optionally restricted to an exact `relation`, newest first.
    ///
    /// `predicate` is escaped for LIKE (`\`, `%`, `_`) and wrapped in
    /// wildcards, so it is matched as a literal substring.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or if `limit` exceeds `i64`.
    pub async fn edge_history(
        &self,
        source_entity_id: i64,
        predicate: &str,
        relation: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Escape backslash FIRST so the wildcard escapes below aren't doubled.
        let escaped = predicate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{escaped}%");
        let limit = i64::try_from(limit)?;
        // Two statically-distinct queries rather than dynamic SQL: the sql!
        // macro needs the full statement text at compile time.
        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                   AND relation = ?
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(rel)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        };
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1594
1595 pub async fn bfs(
1612 &self,
1613 start_entity_id: i64,
1614 max_hops: u32,
1615 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1616 self.bfs_with_depth(start_entity_id, max_hops)
1617 .await
1618 .map(|(e, ed, _)| (e, ed))
1619 }
1620
    /// Breadth-first traversal that also returns a map from entity id to the
    /// hop depth at which it was first reached (start entity has depth 0).
    ///
    /// Considers only currently-valid edges (no timestamp filter).
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the underlying traversal fails.
    pub async fn bfs_with_depth(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, None).await
    }
1638
    /// Point-in-time breadth-first traversal: like [`Self::bfs_with_depth`]
    /// but only follows edges that were valid at `timestamp`.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if the underlying traversal fails.
    pub async fn bfs_at_timestamp(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        timestamp: &str,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
            .await
    }
1658
1659 pub async fn bfs_typed(
1675 &self,
1676 start_entity_id: i64,
1677 max_hops: u32,
1678 edge_types: &[EdgeType],
1679 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1680 if edge_types.is_empty() {
1681 return self.bfs_with_depth(start_entity_id, max_hops).await;
1682 }
1683 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1684 .await
1685 }
1686
    /// Shared BFS implementation: expands the frontier one hop at a time with
    /// a single set-membership query per hop, optionally filtered to edges
    /// valid at `at_timestamp`.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Cap the per-hop frontier so the generated IN-lists stay well under
        // typical parameter limits (3 lists of <=300 ids + 1 timestamp).
        const MAX_FRONTIER: usize = 300;

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // The frontier ids are bound three times (CASE + two IN filters),
            // so three disjoint numbered placeholder ranges are generated.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                // Timestamp placeholder comes after all three id lists.
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // For each matching edge, pick whichever endpoint is NOT in the
            // frontier as the neighbour to visit next.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // First sighting wins: depth_map doubles as the visited set.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1766
    /// Type-filtered BFS: identical to [`Self::bfs_core`] but every hop only
    /// follows edges whose `edge_type` is in `edge_types`.
    ///
    /// Callers guarantee `edge_types` is non-empty (see `bfs_typed`).
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Same frontier cap rationale as bfs_core.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Type placeholders occupy positions 1..=n_types; frontier id lists
        // start right after them.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                // Timestamp placeholder is last, after types and all id lists.
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            // Bind order must mirror placeholder numbering: types first, then
            // the frontier three times, then the optional timestamp.
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1863
    /// Materialises a typed BFS result set: loads the visited entities and the
    /// type-filtered edges whose BOTH endpoints were visited.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Keep the two IN-lists within SQLite's historical 999-parameter cap.
        // Truncation drops reachable entities from the RESULT but not from
        // the returned depth_map (which keeps all visited ids).
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Placeholder layout: types, then visited ids twice, then timestamp.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;
        let ph_ids1 = placeholder_list(id_start, n_visited);
        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);

        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entity fetch is a fresh statement, so placeholders restart at 1.
        let entity_sql2 = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, visited_ids.len()),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1953
    /// Materialises an untyped BFS result set: loads the visited entities and
    /// the edges whose BOTH endpoints were visited, optionally restricted to
    /// edges valid at `at_timestamp`.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Two IN-lists of 499 ids stay under SQLite's 999-parameter limit.
        // Note the returned depth_map still contains ALL visited ids.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            // Timestamp placeholder follows the two id lists.
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
2027
    /// Looks up entities by `name` or `canonical_name`, case-insensitively,
    /// returning at most 5 matches; falls back to fuzzy search when the exact
    /// (case-folded) lookup finds nothing.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or if a row has an
    /// unparseable entity type.
    pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
        // COLLATE_NOCASE expands to the dialect's case-insensitive collation
        // clause (may be empty on backends where it is not needed).
        let find_by_name_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
             FROM graph_entities \
             WHERE name = ? {cn} OR canonical_name = ? {cn} \
             LIMIT 5",
            cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
            .bind(name)
            .bind(name)
            .fetch_all(&self.pool)
            .await?;

        if !rows.is_empty() {
            return rows.into_iter().map(entity_from_row).collect();
        }

        self.find_entities_fuzzy(name, 5).await
    }
2063
2064 pub async fn unprocessed_messages_for_backfill(
2072 &self,
2073 limit: usize,
2074 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2075 let limit = i64::try_from(limit)?;
2076 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2077 "SELECT id, content FROM messages
2078 WHERE graph_processed = 0
2079 ORDER BY id ASC
2080 LIMIT ?"
2081 ))
2082 .bind(limit)
2083 .fetch_all(&self.pool)
2084 .await?;
2085 Ok(rows
2086 .into_iter()
2087 .map(|(id, content)| (crate::types::MessageId(id), content))
2088 .collect())
2089 }
2090
2091 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2097 let count: i64 = zeph_db::query_scalar(sql!(
2098 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2099 ))
2100 .fetch_one(&self.pool)
2101 .await?;
2102 Ok(count)
2103 }
2104
2105 pub async fn mark_messages_graph_processed(
2111 &self,
2112 ids: &[crate::types::MessageId],
2113 ) -> Result<(), MemoryError> {
2114 const MAX_BATCH: usize = 490;
2115 if ids.is_empty() {
2116 return Ok(());
2117 }
2118 for chunk in ids.chunks(MAX_BATCH) {
2119 let placeholders = placeholder_list(1, chunk.len());
2120 let sql =
2121 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2122 let mut query = zeph_db::query(&sql);
2123 for id in chunk {
2124 query = query.bind(id.0);
2125 }
2126 query.execute(&self.pool).await?;
2127 }
2128 Ok(())
2129 }
2130}
2131
/// SQLite variant: builds a query mapping each entity id in `placeholders`
/// to the community whose JSON `entity_ids` array contains it, via
/// `json_each` table-valued expansion.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    // `placeholders` is a pre-built numbered placeholder list, never user
    // input, so interpolation here is safe.
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2142
/// Postgres variant of `community_ids_sql`: expands the JSON `entity_ids`
/// array with `jsonb_array_elements_text` and casts elements to bigint.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    // `placeholders` is a pre-built numbered placeholder list, never user
    // input, so interpolation here is safe.
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2152
/// Raw row shape for `graph_entities` SELECTs; converted to the public
/// [`Entity`] type by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    name: String,
    // Normalised lookup key; unique together with entity_type.
    canonical_name: String,
    // Stored as text; parsed into EntityType during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Id of the companion vector-store point, when one exists.
    qdrant_point_id: Option<String>,
}
2166
2167fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2168 let entity_type = row
2169 .entity_type
2170 .parse::<EntityType>()
2171 .map_err(MemoryError::GraphStore)?;
2172 Ok(Entity {
2173 id: row.id,
2174 name: row.name,
2175 canonical_name: row.canonical_name,
2176 entity_type,
2177 summary: row.summary,
2178 first_seen_at: row.first_seen_at,
2179 last_seen_at: row.last_seen_at,
2180 qdrant_point_id: row.qdrant_point_id,
2181 })
2182}
2183
/// Raw row shape for `graph_entity_aliases` SELECTs; converted to the public
/// [`EntityAlias`] type by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias points at.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2191
2192fn alias_from_row(row: AliasRow) -> EntityAlias {
2193 EntityAlias {
2194 id: row.id,
2195 entity_id: row.entity_id,
2196 alias_name: row.alias_name,
2197 created_at: row.created_at,
2198 }
2199}
2200
/// Raw row shape for `graph_edges` SELECTs; converted to the public [`Edge`]
/// type by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    // Surface relation text as extracted.
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    // Bitemporal validity window; valid_to is NULL while the fact is current.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // DB column is `episode_id`; renamed to match the Edge field.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    qdrant_point_id: Option<String>,
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Supersession chain links: newer edge that replaced this one / older
    // edge this one replaced.
    superseded_by: Option<i64>,
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    weight: f64,
}
2225
2226fn edge_from_row(row: EdgeRow) -> Edge {
2227 let edge_type = row
2228 .edge_type
2229 .parse::<EdgeType>()
2230 .unwrap_or(EdgeType::Semantic);
2231 let canonical_relation = row
2232 .canonical_relation
2233 .unwrap_or_else(|| row.relation.clone());
2234 Edge {
2235 id: row.id,
2236 source_entity_id: row.source_entity_id,
2237 target_entity_id: row.target_entity_id,
2238 canonical_relation,
2239 relation: row.relation,
2240 fact: row.fact,
2241 #[allow(clippy::cast_possible_truncation)]
2242 confidence: row.confidence as f32,
2243 valid_from: row.valid_from,
2244 valid_to: row.valid_to,
2245 created_at: row.created_at,
2246 expired_at: row.expired_at,
2247 source_message_id: row.source_message_id.map(MessageId),
2248 qdrant_point_id: row.qdrant_point_id,
2249 edge_type,
2250 retrieval_count: row.retrieval_count,
2251 last_retrieved_at: row.last_retrieved_at,
2252 superseded_by: row.superseded_by,
2253 supersedes: row.supersedes,
2254 #[allow(clippy::cast_possible_truncation)]
2255 weight: row.weight as f32,
2256 }
2257}
2258
/// Raw row shape for `graph_communities` SELECTs.
///
/// NOTE(review): used by community queries elsewhere in this file; the
/// conversion to [`Community`] is not visible in this chunk.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded array of member entity ids (text column).
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2269
impl GraphStore {
    /// Ensures a `graph_episodes` row exists for `conversation_id` and
    /// returns its id. Also inserts the parent `conversations` row if
    /// missing so the episode's foreign key is satisfied.
    ///
    /// # Errors
    /// Returns [`MemoryError`] if either statement fails.
    pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
        // NOTE(review): `INSERT OR IGNORE` is SQLite syntax — presumably the
        // sql! macro rewrites it per dialect; confirm for Postgres.
        zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
            .bind(conversation_id)
            .execute(&self.pool)
            .await?;

        // No-op DO UPDATE lets RETURNING yield the id whether the row was
        // just inserted or already existed.
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_episodes (conversation_id)
             VALUES (?)
             ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
             RETURNING id"
        ))
        .bind(conversation_id)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }

    /// Records that `entity_id` was mentioned in `episode_id`; duplicate
    /// links are silently ignored.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure.
    pub async fn link_entity_to_episode(
        &self,
        episode_id: i64,
        entity_id: i64,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
             VALUES (?, ?)"
        ))
        .bind(episode_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Returns every episode in which `entity_id` appears.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure.
    pub async fn episodes_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<super::types::Episode>, MemoryError> {
        // Local row type keeps the FromRow derive scoped to this query.
        #[derive(zeph_db::FromRow)]
        struct EpisodeRow {
            id: i64,
            conversation_id: i64,
            created_at: String,
            closed_at: Option<String>,
        }
        let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
            "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
             FROM graph_episodes e
             JOIN graph_episode_entities ee ON ee.episode_id = e.id
             WHERE ee.entity_id = ?"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|r| super::types::Episode {
                id: r.id,
                conversation_id: r.conversation_id,
                created_at: r.created_at,
                closed_at: r.closed_at,
            })
            .collect())
    }

    /// Inserts a new edge, superseding the prior active head edge for the
    /// same (source, canonical_relation, edge_type) if one exists.
    ///
    /// Behaviour, all within one transaction:
    /// 1. If an identical active edge already exists, records a reassertion
    ///    and returns the existing id (no new row).
    /// 2. Otherwise finds the prior active head (if any) and verifies its
    ///    supersession chain is within `SUPERSEDE_DEPTH_CAP`.
    /// 3. Inserts the new edge (optionally linking `supersedes`), then
    ///    invalidates the prior head and bumps `metrics` when provided.
    ///
    /// Returns the id of the surviving edge (existing or newly inserted).
    ///
    /// # Errors
    /// Returns [`MemoryError::InvalidInput`] for self-loops,
    /// [`MemoryError::SupersedeDepthExceeded`] when the chain is too deep,
    /// or a wrapped DB error.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_or_supersede_with_metrics(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        canonical_relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
        set_supersedes: bool,
        metrics: Option<&ApexMetrics>,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        // Confidence is stored normalised to [0, 1].
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();
        let episode_raw: Option<i64> = episode_id.map(|m| m.0);

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Exact duplicate of an active edge: just record the reassertion.
        if let Some(existing_id) = find_identical_active_edge(
            &mut tx,
            source_entity_id,
            target_entity_id,
            canonical_relation,
            edge_type_str,
            fact,
        )
        .await?
        {
            record_reassertion(&mut tx, existing_id, episode_raw, confidence).await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let prior_head =
            find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
                .await?;

        // Guard against unbounded (or cyclic) supersession chains before
        // extending one.
        if let Some(head_id) = prior_head {
            check_supersede_depth_in_tx(&mut tx, head_id).await?;
        }

        let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
        let new_id = insert_new_edge(
            &mut tx,
            source_entity_id,
            target_entity_id,
            relation,
            canonical_relation,
            fact,
            confidence,
            episode_raw,
            edge_type_str,
            supersedes_val,
        )
        .await?;

        // Close out the prior head AFTER the replacement exists so its
        // superseded_by can point at the new edge.
        if let Some(head_id) = prior_head {
            invalidate_prior_head(&mut tx, head_id, new_id).await?;
            if let Some(m) = metrics {
                m.supersedes_total
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        tx.commit().await?;
        Ok(new_id)
    }

    /// Metrics-free convenience wrapper around
    /// [`Self::insert_or_supersede_with_metrics`].
    ///
    /// # Errors
    /// Same as the metrics variant.
    #[allow(clippy::too_many_arguments)] pub async fn insert_or_supersede(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        canonical_relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
        set_supersedes: bool,
    ) -> Result<i64, MemoryError> {
        self.insert_or_supersede_with_metrics(
            source_entity_id,
            target_entity_id,
            relation,
            canonical_relation,
            fact,
            confidence,
            episode_id,
            edge_type,
            set_supersedes,
            None,
        )
        .await
    }

    /// Measures the supersession-chain depth below `head_id` using the pool.
    ///
    /// # Errors
    /// Returns [`MemoryError::SupersedeCycle`] when the walk exceeds
    /// `SUPERSEDE_DEPTH_CAP` (a depth past the cap implies a cycle or a
    /// pathological chain), or a wrapped DB error.
    pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
        Self::check_supersede_depth_with_pool(&self.pool, head_id).await
    }

    /// Recursive-CTE walk along `supersedes` starting at `head_id`, capped at
    /// `SUPERSEDE_DEPTH_CAP + 1` so a cycle cannot recurse forever.
    async fn check_supersede_depth_with_pool(
        pool: &zeph_db::DbPool,
        head_id: i64,
    ) -> Result<usize, MemoryError> {
        let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
        let depth: Option<i64> = zeph_db::query_scalar(sql!(
            "WITH RECURSIVE chain(id, depth) AS (
                 SELECT id, 0 FROM graph_edges WHERE id = ?
                 UNION ALL
                 SELECT e.supersedes, c.depth + 1
                 FROM graph_edges e JOIN chain c ON e.id = c.id
                 WHERE e.supersedes IS NOT NULL AND c.depth < ?
             )
             SELECT MAX(depth) FROM chain"
        ))
        .bind(head_id)
        .bind(cap)
        .fetch_optional(pool)
        .await?
        .flatten();

        // MAX(depth) is non-negative here, so the sign-loss cast is safe.
        #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
        let d = depth.unwrap_or(0) as usize;
        if d > SUPERSEDE_DEPTH_CAP {
            return Err(MemoryError::SupersedeCycle(head_id));
        }
        Ok(d)
    }
}
2534
2535type Tx<'a> = zeph_db::DbTransaction<'a>;
2540
/// Looks up an active (non-invalidated, non-expired) edge that exactly
/// matches the tuple (source, target, canonical relation, edge type, fact).
///
/// Used to detect reassertions so duplicates are not inserted.
async fn find_identical_active_edge(
    tx: &mut Tx<'_>,
    src: i64,
    tgt: i64,
    canon: &str,
    edge_type_str: &str,
    fact: &str,
) -> Result<Option<i64>, MemoryError> {
    Ok(zeph_db::query_scalar(sql!(
        "SELECT id FROM graph_edges
         WHERE source_entity_id = ?
           AND target_entity_id = ?
           AND canonical_relation = ?
           AND edge_type = ?
           AND fact = ?
           AND valid_to IS NULL
           AND expired_at IS NULL
         LIMIT 1"
    ))
    .bind(src)
    .bind(tgt)
    .bind(canon)
    .bind(edge_type_str)
    .bind(fact)
    .fetch_optional(&mut **tx)
    .await?)
}
2571
/// Appends a row to `edge_reassertions` noting that the fact behind
/// `head_id` was asserted again (same content, new episode/confidence).
async fn record_reassertion(
    tx: &mut Tx<'_>,
    head_id: i64,
    episode_raw: Option<i64>,
    confidence: f32,
) -> Result<(), MemoryError> {
    // Unix seconds; wrap is unreachable for realistic clock values, and a
    // pre-epoch clock degrades to 0 rather than erroring.
    #[allow(clippy::cast_possible_wrap)]
    let asserted_at = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs() as i64;
    zeph_db::query(sql!(
        "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
         VALUES (?, ?, ?, ?)"
    ))
    .bind(head_id)
    .bind(asserted_at)
    .bind(episode_raw)
    .bind(f64::from(confidence))
    .execute(&mut **tx)
    .await?;
    Ok(())
}
2596
/// Finds the most recently created active edge for the key
/// (source entity, canonical relation, edge type) — the current "head" of a
/// supersession chain, if any.
///
/// Note: the target entity is deliberately not part of the key, so a new
/// fact with a different target still supersedes the old head.
async fn find_prior_active_head(
    tx: &mut Tx<'_>,
    src: i64,
    canon: &str,
    edge_type_str: &str,
) -> Result<Option<i64>, MemoryError> {
    Ok(zeph_db::query_scalar(sql!(
        "SELECT id FROM graph_edges
         WHERE source_entity_id = ?
           AND canonical_relation = ?
           AND edge_type = ?
           AND valid_to IS NULL
           AND expired_at IS NULL
         ORDER BY created_at DESC
         LIMIT 1"
    ))
    .bind(src)
    .bind(canon)
    .bind(edge_type_str)
    .fetch_optional(&mut **tx)
    .await?)
}
2620
2621async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
2625 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2626 let depth: Option<i64> = sqlx::query_scalar(
2627 "WITH RECURSIVE chain(id, depth) AS (
2628 SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
2629 UNION ALL
2630 SELECT e.supersedes, c.depth + 1
2631 FROM graph_edges e JOIN chain c ON e.id = c.id
2632 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2633 )
2634 SELECT MAX(depth) FROM chain",
2635 )
2636 .bind(head_id)
2637 .bind(cap)
2638 .fetch_optional(&mut **tx)
2639 .await?
2640 .flatten();
2641 let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
2642 if d > SUPERSEDE_DEPTH_CAP {
2643 return Err(MemoryError::SupersedeDepthExceeded(head_id));
2644 }
2645 Ok(())
2646}
2647
/// Inserts a fresh edge row (temporal columns take their schema defaults)
/// and returns its id via RETURNING.
#[allow(clippy::too_many_arguments)]
async fn insert_new_edge(
    tx: &mut Tx<'_>,
    src: i64,
    tgt: i64,
    relation: &str,
    canonical_relation: &str,
    fact: &str,
    confidence: f32,
    episode_raw: Option<i64>,
    edge_type_str: &str,
    supersedes_val: Option<i64>,
) -> Result<i64, MemoryError> {
    Ok(zeph_db::query_scalar(sql!(
        "INSERT INTO graph_edges
            (source_entity_id, target_entity_id, relation, canonical_relation, fact,
             confidence, episode_id, edge_type, supersedes)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
         RETURNING id"
    ))
    .bind(src)
    .bind(tgt)
    .bind(relation)
    .bind(canonical_relation)
    .bind(fact)
    .bind(f64::from(confidence))
    .bind(episode_raw)
    .bind(edge_type_str)
    .bind(supersedes_val)
    .fetch_one(&mut **tx)
    .await?)
}
2681
/// Closes out a superseded head edge: stamps both its validity end and
/// expiry to now, and records which edge replaced it.
async fn invalidate_prior_head(
    tx: &mut Tx<'_>,
    head_id: i64,
    new_id: i64,
) -> Result<(), MemoryError> {
    zeph_db::query(sql!(
        "UPDATE graph_edges
         SET valid_to = CURRENT_TIMESTAMP,
             expired_at = CURRENT_TIMESTAMP,
             superseded_by = ?
         WHERE id = ?"
    ))
    .bind(new_id)
    .bind(head_id)
    .execute(&mut **tx)
    .await?;
    Ok(())
}
2701
/// Row shape produced by the ranked FTS query (`build_ranked_fts_sql`):
/// the eight `graph_entities` columns in SELECT order
/// (id, name, canonical_name, entity_type, summary, first_seen_at,
/// last_seen_at, qdrant_point_id) followed by the `fts_rank` score.
type EntityFtsRow = (
    i64,
    String,
    String,
    String,
    Option<String>,
    String,
    String,
    Option<String>,
    f64,
);
2716
2717fn build_fts_query(query: &str) -> Option<String> {
2722 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
2723 let sanitized = sanitize_fts_query(query);
2724 if sanitized.is_empty() {
2725 return None;
2726 }
2727 let fts_query: String = sanitized
2728 .split_whitespace()
2729 .filter(|t| !FTS5_OPERATORS.contains(t))
2730 .map(|t| format!("{t}*"))
2731 .collect::<Vec<_>>()
2732 .join(" ");
2733 if fts_query.is_empty() {
2734 None
2735 } else {
2736 Some(fts_query)
2737 }
2738}
2739
/// Builds the ranked entity-search SQL: an FTS5 match over
/// `graph_entities_fts` (scored with negated bm25, name column weighted 10x)
/// UNIONed with a case-insensitive alias LIKE lookup at a fixed 0.5 rank,
/// ordered by descending rank.
///
/// Bind order for the returned statement: FTS match string, alias LIKE
/// pattern, LIMIT. Duplicate entities across the two branches are removed
/// later by `normalize_and_dedup`.
fn build_ranked_fts_sql() -> String {
    format!(
        "SELECT * FROM ( \
            SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                   e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
                   -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
            FROM graph_entities_fts fts \
            JOIN graph_entities e ON e.id = fts.rowid \
            WHERE graph_entities_fts MATCH ? \
            UNION ALL \
            SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
                   e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
                   0.5 AS fts_rank \
            FROM graph_entity_aliases a \
            JOIN graph_entities e ON e.id = a.entity_id \
            WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
         ) \
         ORDER BY fts_rank DESC \
         LIMIT ?",
        <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
    )
}
2766
2767fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
2771 let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
2773 let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
2774
2775 let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
2776 let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
2777 for (
2778 id,
2779 name,
2780 canonical_name,
2781 entity_type_str,
2782 summary,
2783 first_seen_at,
2784 last_seen_at,
2785 qdrant_point_id,
2786 raw_score,
2787 ) in rows
2788 {
2789 if !seen_ids.insert(id) {
2790 continue;
2791 }
2792 let entity_type = entity_type_str
2793 .parse()
2794 .unwrap_or(super::types::EntityType::Concept);
2795 let entity = Entity {
2796 id,
2797 name,
2798 canonical_name,
2799 entity_type,
2800 summary,
2801 first_seen_at,
2802 last_seen_at,
2803 qdrant_point_id,
2804 };
2805 #[allow(clippy::cast_possible_truncation)]
2806 let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
2807 result.push((entity, normalized));
2808 }
2809 result
2810}
2811
2812#[cfg(test)]
2815mod tests;