1use std::collections::HashMap;
11#[allow(unused_imports)]
12use zeph_db::sql;
13
14use futures::Stream;
15use zeph_db::fts::sanitize_fts_query;
16use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
17
18use crate::error::MemoryError;
19use crate::graph::conflict::{ApexMetrics, SUPERSEDE_DEPTH_CAP};
20use crate::types::{EntityId, MessageId};
21
22use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
23
/// Handle for the graph tables, backed by a shared database pool.
///
/// Besides the pool it carries the two rates that `insert_edge_typed` applies
/// when it re-observes an existing edge and updates `confidence_fast` /
/// `confidence_slow`.
pub struct GraphStore {
    /// Connection pool every query in this store runs against.
    pool: DbPool,
    /// Rate at which `confidence_fast` moves toward a newly observed confidence.
    benna_fast_rate: f32,
    /// Rate at which `confidence_slow` moves toward the updated fast value.
    benna_slow_rate: f32,
}
37
38impl GraphStore {
39 #[must_use]
43 pub fn new(pool: DbPool) -> Self {
44 Self {
45 pool,
46 benna_fast_rate: 0.5,
47 benna_slow_rate: 0.05,
48 }
49 }
50
51 #[must_use]
55 pub fn with_benna_rates(mut self, fast_rate: f32, slow_rate: f32) -> Self {
56 self.benna_fast_rate = fast_rate;
57 self.benna_slow_rate = slow_rate;
58 self
59 }
60
/// Borrows the database pool this store was constructed with.
#[must_use]
pub fn pool(&self) -> &DbPool {
    &self.pool
}
66
67 pub async fn upsert_entity(
80 &self,
81 surface_name: &str,
82 canonical_name: &str,
83 entity_type: EntityType,
84 summary: Option<&str>,
85 ) -> Result<EntityId, MemoryError> {
86 let type_str = entity_type.as_str();
87 let id: i64 = zeph_db::query_scalar(sql!(
88 "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
89 VALUES (?, ?, ?, ?)
90 ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
91 name = excluded.name,
92 summary = COALESCE(excluded.summary, summary),
93 last_seen_at = CURRENT_TIMESTAMP
94 RETURNING id"
95 ))
96 .bind(surface_name)
97 .bind(canonical_name)
98 .bind(type_str)
99 .bind(summary)
100 .fetch_one(&self.pool)
101 .await?;
102 Ok(EntityId(id))
103 }
104
105 pub async fn find_entity(
111 &self,
112 canonical_name: &str,
113 entity_type: EntityType,
114 ) -> Result<Option<Entity>, MemoryError> {
115 let type_str = entity_type.as_str();
116 let row: Option<EntityRow> = zeph_db::query_as(
117 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
118 FROM graph_entities
119 WHERE canonical_name = ? AND entity_type = ?"),
120 )
121 .bind(canonical_name)
122 .bind(type_str)
123 .fetch_optional(&self.pool)
124 .await?;
125 row.map(entity_from_row).transpose()
126 }
127
128 pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
134 let row: Option<EntityRow> = zeph_db::query_as(
135 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
136 FROM graph_entities
137 WHERE id = ?"),
138 )
139 .bind(entity_id)
140 .fetch_optional(&self.pool)
141 .await?;
142 row.map(entity_from_row).transpose()
143 }
144
145 pub async fn set_entity_qdrant_point_id(
151 &self,
152 entity_id: i64,
153 point_id: &str,
154 ) -> Result<(), MemoryError> {
155 zeph_db::query(sql!(
156 "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
157 ))
158 .bind(point_id)
159 .bind(entity_id)
160 .execute(&self.pool)
161 .await?;
162 Ok(())
163 }
164
165 pub async fn find_entities_fuzzy(
186 &self,
187 query: &str,
188 limit: usize,
189 ) -> Result<Vec<Entity>, MemoryError> {
190 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
194 let query = &query[..query.floor_char_boundary(512)];
195 let sanitized = sanitize_fts_query(query);
198 if sanitized.is_empty() {
199 return Ok(vec![]);
200 }
201 let fts_query: String = sanitized
202 .split_whitespace()
203 .filter(|t| !FTS5_OPERATORS.contains(t))
204 .map(|t| format!("{t}*"))
205 .collect::<Vec<_>>()
206 .join(" ");
207 if fts_query.is_empty() {
208 return Ok(vec![]);
209 }
210
211 let limit = i64::try_from(limit)?;
212 let search_sql = format!(
215 "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
216 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
217 FROM graph_entities_fts fts \
218 JOIN graph_entities e ON e.id = fts.rowid \
219 WHERE graph_entities_fts MATCH ? \
220 UNION \
221 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
222 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
223 FROM graph_entity_aliases a \
224 JOIN graph_entities e ON e.id = a.entity_id \
225 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
226 LIMIT ?",
227 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
228 );
229 let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
230 .bind(&fts_query)
231 .bind(format!(
232 "%{}%",
233 query
234 .trim()
235 .replace('\\', "\\\\")
236 .replace('%', "\\%")
237 .replace('_', "\\_")
238 ))
239 .bind(limit)
240 .fetch_all(&self.pool)
241 .await?;
242 rows.into_iter()
243 .map(entity_from_row)
244 .collect::<Result<Vec<_>, _>>()
245 }
246
/// Runs `PRAGMA wal_checkpoint(PASSIVE)` against the pool (SQLite builds only).
///
/// # Errors
///
/// Returns an error if the pragma fails.
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
    zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
        .execute(&self.pool)
        .await
        .map(|_| ())
        .map_err(MemoryError::from)
}
263
/// Postgres counterpart of the WAL checkpoint: does nothing and always
/// returns `Ok(())`.
#[cfg(feature = "postgres")]
// Kept `async` so the signature matches the SQLite variant.
#[allow(clippy::unused_async)]
pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
    Ok(())
}
274
275 pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
277 use futures::StreamExt as _;
278 zeph_db::query_as::<_, EntityRow>(
279 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
280 FROM graph_entities ORDER BY id ASC"),
281 )
282 .fetch(&self.pool)
283 .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
284 r.map_err(MemoryError::from).and_then(entity_from_row)
285 })
286 }
287
288 pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
296 let insert_alias_sql = format!(
297 "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
298 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
299 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
300 );
301 zeph_db::query(&insert_alias_sql)
302 .bind(entity_id)
303 .bind(alias_name)
304 .execute(&self.pool)
305 .await?;
306 Ok(())
307 }
308
309 pub async fn find_entity_by_alias(
317 &self,
318 alias_name: &str,
319 entity_type: EntityType,
320 ) -> Result<Option<Entity>, MemoryError> {
321 let type_str = entity_type.as_str();
322 let alias_typed_sql = format!(
323 "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
324 e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
325 FROM graph_entity_aliases a \
326 JOIN graph_entities e ON e.id = a.entity_id \
327 WHERE a.alias_name = ? {} \
328 AND e.entity_type = ? \
329 ORDER BY e.id ASC \
330 LIMIT 1",
331 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
332 );
333 let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
334 .bind(alias_name)
335 .bind(type_str)
336 .fetch_optional(&self.pool)
337 .await?;
338 row.map(entity_from_row).transpose()
339 }
340
341 pub async fn aliases_for_entity(
347 &self,
348 entity_id: i64,
349 ) -> Result<Vec<EntityAlias>, MemoryError> {
350 let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
351 "SELECT id, entity_id, alias_name, created_at
352 FROM graph_entity_aliases
353 WHERE entity_id = ?
354 ORDER BY id ASC"
355 ))
356 .bind(entity_id)
357 .fetch_all(&self.pool)
358 .await?;
359 Ok(rows.into_iter().map(alias_from_row).collect())
360 }
361
362 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
368 use futures::TryStreamExt as _;
369 self.all_entities_stream().try_collect().await
370 }
371
372 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
378 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
379 .fetch_one(&self.pool)
380 .await?;
381 Ok(count)
382 }
383
384 pub async fn insert_edge(
402 &self,
403 source_entity_id: i64,
404 target_entity_id: i64,
405 relation: &str,
406 fact: &str,
407 confidence: f32,
408 episode_id: Option<MessageId>,
409 ) -> Result<i64, MemoryError> {
410 self.insert_edge_typed(
411 source_entity_id,
412 target_entity_id,
413 relation,
414 fact,
415 confidence,
416 episode_id,
417 EdgeType::Semantic,
418 )
419 .await
420 }
421
422 #[allow(clippy::too_many_arguments)] pub async fn insert_edge_typed(
432 &self,
433 source_entity_id: i64,
434 target_entity_id: i64,
435 relation: &str,
436 fact: &str,
437 confidence: f32,
438 episode_id: Option<MessageId>,
439 edge_type: EdgeType,
440 ) -> Result<i64, MemoryError> {
441 if source_entity_id == target_entity_id {
442 return Err(MemoryError::InvalidInput(format!(
443 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
444 )));
445 }
446 let confidence = confidence.clamp(0.0, 1.0);
447 let edge_type_str = edge_type.as_str();
448
449 let mut tx = zeph_db::begin(&self.pool).await?;
454
455 let existing: Option<(i64, f64, f64, f64)> = zeph_db::query_as(sql!(
456 "SELECT id, confidence, confidence_fast, confidence_slow FROM graph_edges
457 WHERE source_entity_id = ?
458 AND target_entity_id = ?
459 AND relation = ?
460 AND edge_type = ?
461 AND valid_to IS NULL
462 LIMIT 1"
463 ))
464 .bind(source_entity_id)
465 .bind(target_entity_id)
466 .bind(relation)
467 .bind(edge_type_str)
468 .fetch_optional(&mut *tx)
469 .await?;
470
471 if let Some((existing_id, stored_conf, stored_fast, stored_slow)) = existing {
472 let updated_conf = f64::from(confidence).max(stored_conf);
473 #[allow(clippy::cast_possible_truncation)]
477 let stored_fast_f32 = (stored_fast as f32).clamp(0.0, 1.0);
478 #[allow(clippy::cast_possible_truncation)]
479 let stored_slow_f32 = (stored_slow as f32).clamp(0.0, 1.0);
480 let new_fast = stored_fast_f32 + self.benna_fast_rate * (confidence - stored_fast_f32);
481 let new_slow = stored_slow_f32 + self.benna_slow_rate * (new_fast - stored_slow_f32);
482 zeph_db::query(sql!(
483 "UPDATE graph_edges SET confidence = ?, confidence_fast = ?, confidence_slow = ? WHERE id = ?"
484 ))
485 .bind(updated_conf)
486 .bind(f64::from(new_fast))
487 .bind(f64::from(new_slow))
488 .bind(existing_id)
489 .execute(&mut *tx)
490 .await?;
491 tx.commit().await?;
492 return Ok(existing_id);
493 }
494
495 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
496 let id: i64 = zeph_db::query_scalar(sql!(
497 "INSERT INTO graph_edges
498 (source_entity_id, target_entity_id, relation, fact, confidence,
499 confidence_fast, confidence_slow, episode_id, edge_type)
500 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
501 RETURNING id"
502 ))
503 .bind(source_entity_id)
504 .bind(target_entity_id)
505 .bind(relation)
506 .bind(fact)
507 .bind(f64::from(confidence))
508 .bind(f64::from(confidence))
509 .bind(f64::from(confidence))
510 .bind(episode_raw)
511 .bind(edge_type_str)
512 .fetch_one(&mut *tx)
513 .await?;
514 tx.commit().await?;
515 Ok(id)
516 }
517
518 pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
524 zeph_db::query(sql!(
525 "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
526 WHERE id = ?"
527 ))
528 .bind(edge_id)
529 .execute(&self.pool)
530 .await?;
531 Ok(())
532 }
533
534 pub async fn invalidate_edge_with_supersession(
542 &self,
543 old_edge_id: i64,
544 new_edge_id: i64,
545 ) -> Result<(), MemoryError> {
546 zeph_db::query(sql!(
547 "UPDATE graph_edges
548 SET valid_to = CURRENT_TIMESTAMP,
549 expired_at = CURRENT_TIMESTAMP,
550 superseded_by = ?
551 WHERE id = ?"
552 ))
553 .bind(new_edge_id)
554 .bind(old_edge_id)
555 .execute(&self.pool)
556 .await?;
557 Ok(())
558 }
559
560 pub async fn edges_for_entities(
577 &self,
578 entity_ids: &[i64],
579 edge_types: &[super::types::EdgeType],
580 ) -> Result<Vec<Edge>, MemoryError> {
581 const MAX_BATCH_ENTITIES: usize = 490;
585
586 let mut all_edges: Vec<Edge> = Vec::new();
587
588 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
589 let edges = self.query_batch_edges(chunk, edge_types).await?;
590 all_edges.extend(edges);
591 }
592
593 Ok(all_edges)
594 }
595
596 async fn query_batch_edges(
604 &self,
605 entity_ids: &[i64],
606 edge_types: &[super::types::EdgeType],
607 ) -> Result<Vec<Edge>, MemoryError> {
608 if entity_ids.is_empty() {
609 return Ok(Vec::new());
610 }
611
612 let n_ids = entity_ids.len();
615 let n_types = edge_types.len();
616
617 let sql = if n_types == 0 {
618 let placeholders = placeholder_list(1, n_ids);
620 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
621 format!(
622 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
623 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
624 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
625 FROM graph_edges
626 WHERE valid_to IS NULL
627 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
628 )
629 } else {
630 let placeholders = placeholder_list(1, n_ids);
631 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
632 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
633 format!(
634 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
635 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
636 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
637 FROM graph_edges
638 WHERE valid_to IS NULL
639 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
640 AND edge_type IN ({type_placeholders})"
641 )
642 };
643
644 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
646 for id in entity_ids {
647 query = query.bind(*id);
648 }
649 for id in entity_ids {
650 query = query.bind(*id);
651 }
652 for et in edge_types {
653 query = query.bind(et.as_str());
654 }
655
656 let rows: Vec<EdgeRow> = tokio::time::timeout(
660 std::time::Duration::from_millis(500),
661 query.fetch_all(&self.pool),
662 )
663 .await
664 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
665 Ok(rows.into_iter().map(edge_from_row).collect())
666 }
667
668 pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
674 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
675 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
676 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
677 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
678 FROM graph_edges
679 WHERE valid_to IS NULL
680 AND (source_entity_id = ? OR target_entity_id = ?)"
681 ))
682 .bind(entity_id)
683 .bind(entity_id)
684 .fetch_all(&self.pool)
685 .await?;
686 Ok(rows.into_iter().map(edge_from_row).collect())
687 }
688
689 pub async fn edge_history_for_entity(
696 &self,
697 entity_id: i64,
698 limit: usize,
699 ) -> Result<Vec<Edge>, MemoryError> {
700 let limit = i64::try_from(limit)?;
701 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
702 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
703 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
704 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
705 FROM graph_edges
706 WHERE source_entity_id = ? OR target_entity_id = ?
707 ORDER BY valid_from DESC
708 LIMIT ?"
709 ))
710 .bind(entity_id)
711 .bind(entity_id)
712 .bind(limit)
713 .fetch_all(&self.pool)
714 .await?;
715 Ok(rows.into_iter().map(edge_from_row).collect())
716 }
717
718 pub async fn edges_between(
724 &self,
725 entity_a: i64,
726 entity_b: i64,
727 ) -> Result<Vec<Edge>, MemoryError> {
728 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
729 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
730 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
731 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
732 FROM graph_edges
733 WHERE valid_to IS NULL
734 AND ((source_entity_id = ? AND target_entity_id = ?)
735 OR (source_entity_id = ? AND target_entity_id = ?))"
736 ))
737 .bind(entity_a)
738 .bind(entity_b)
739 .bind(entity_b)
740 .bind(entity_a)
741 .fetch_all(&self.pool)
742 .await?;
743 Ok(rows.into_iter().map(edge_from_row).collect())
744 }
745
746 pub async fn edges_exact(
752 &self,
753 source_entity_id: i64,
754 target_entity_id: i64,
755 ) -> Result<Vec<Edge>, MemoryError> {
756 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
757 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
758 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
759 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
760 FROM graph_edges
761 WHERE valid_to IS NULL
762 AND source_entity_id = ?
763 AND target_entity_id = ?"
764 ))
765 .bind(source_entity_id)
766 .bind(target_entity_id)
767 .fetch_all(&self.pool)
768 .await?;
769 Ok(rows.into_iter().map(edge_from_row).collect())
770 }
771
772 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
778 let count: i64 = zeph_db::query_scalar(sql!(
779 "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
780 ))
781 .fetch_one(&self.pool)
782 .await?;
783 Ok(count)
784 }
785
786 pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
792 let rows: Vec<(String, i64)> = zeph_db::query_as(
793 sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
794 )
795 .fetch_all(&self.pool)
796 .await?;
797 Ok(rows)
798 }
799
800 pub async fn upsert_community(
812 &self,
813 name: &str,
814 summary: &str,
815 entity_ids: &[i64],
816 fingerprint: Option<&str>,
817 ) -> Result<i64, MemoryError> {
818 let entity_ids_json = serde_json::to_string(entity_ids)?;
819 let id: i64 = zeph_db::query_scalar(sql!(
820 "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
821 VALUES (?, ?, ?, ?)
822 ON CONFLICT(name) DO UPDATE SET
823 summary = excluded.summary,
824 entity_ids = excluded.entity_ids,
825 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
826 updated_at = CURRENT_TIMESTAMP
827 RETURNING id"
828 ))
829 .bind(name)
830 .bind(summary)
831 .bind(entity_ids_json)
832 .bind(fingerprint)
833 .fetch_one(&self.pool)
834 .await?;
835 Ok(id)
836 }
837
838 pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
845 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
846 "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
847 ))
848 .fetch_all(&self.pool)
849 .await?;
850 Ok(rows.into_iter().collect())
851 }
852
853 pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
859 zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
860 .bind(id)
861 .execute(&self.pool)
862 .await?;
863 Ok(())
864 }
865
866 pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
875 zeph_db::query(sql!(
876 "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
877 ))
878 .bind(id)
879 .execute(&self.pool)
880 .await?;
881 Ok(())
882 }
883
884 pub async fn community_for_entity(
893 &self,
894 entity_id: i64,
895 ) -> Result<Option<Community>, MemoryError> {
896 let row: Option<CommunityRow> = zeph_db::query_as(
897 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
898 FROM graph_communities c, json_each(c.entity_ids) j
899 WHERE CAST(j.value AS INTEGER) = ?
900 LIMIT 1"),
901 )
902 .bind(entity_id)
903 .fetch_optional(&self.pool)
904 .await?;
905 match row {
906 Some(row) => {
907 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
908 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
909 Ok(Some(Community {
910 id: row.id,
911 name: row.name,
912 summary: row.summary,
913 entity_ids,
914 fingerprint: row.fingerprint,
915 created_at: row.created_at,
916 updated_at: row.updated_at,
917 }))
918 }
919 None => Ok(None),
920 }
921 }
922
923 pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
929 let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
930 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
931 FROM graph_communities
932 ORDER BY id ASC"
933 ))
934 .fetch_all(&self.pool)
935 .await?;
936
937 rows.into_iter()
938 .map(|row| {
939 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
940 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
941 Ok(Community {
942 id: row.id,
943 name: row.name,
944 summary: row.summary,
945 entity_ids,
946 fingerprint: row.fingerprint,
947 created_at: row.created_at,
948 updated_at: row.updated_at,
949 })
950 })
951 .collect()
952 }
953
954 pub async fn community_count(&self) -> Result<i64, MemoryError> {
960 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
961 .fetch_one(&self.pool)
962 .await?;
963 Ok(count)
964 }
965
966 pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
974 let val: Option<String> =
975 zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
976 .bind(key)
977 .fetch_optional(&self.pool)
978 .await?;
979 Ok(val)
980 }
981
982 pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
988 zeph_db::query(sql!(
989 "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
990 ON CONFLICT(key) DO UPDATE SET value = excluded.value"
991 ))
992 .bind(key)
993 .bind(value)
994 .execute(&self.pool)
995 .await?;
996 Ok(())
997 }
998
999 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
1007 let val = self.get_metadata("extraction_count").await?;
1008 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
1009 }
1010
1011 pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
1013 use futures::StreamExt as _;
1014 zeph_db::query_as::<_, EdgeRow>(sql!(
1015 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1016 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1017 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1018 FROM graph_edges
1019 WHERE valid_to IS NULL
1020 ORDER BY id ASC"
1021 ))
1022 .fetch(&self.pool)
1023 .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
1024 }
1025
1026 pub async fn edges_after_id(
1043 &self,
1044 after_id: i64,
1045 limit: i64,
1046 ) -> Result<Vec<Edge>, MemoryError> {
1047 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1048 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1049 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1050 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1051 FROM graph_edges
1052 WHERE valid_to IS NULL AND id > ?
1053 ORDER BY id ASC
1054 LIMIT ?"
1055 ))
1056 .bind(after_id)
1057 .bind(limit)
1058 .fetch_all(&self.pool)
1059 .await?;
1060 Ok(rows.into_iter().map(edge_from_row).collect())
1061 }
1062
1063 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1069 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1070 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1071 FROM graph_communities
1072 WHERE id = ?"
1073 ))
1074 .bind(id)
1075 .fetch_optional(&self.pool)
1076 .await?;
1077 match row {
1078 Some(row) => {
1079 let raw_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1080 let entity_ids = raw_ids.into_iter().map(EntityId).collect();
1081 Ok(Some(Community {
1082 id: row.id,
1083 name: row.name,
1084 summary: row.summary,
1085 entity_ids,
1086 fingerprint: row.fingerprint,
1087 created_at: row.created_at,
1088 updated_at: row.updated_at,
1089 }))
1090 }
1091 None => Ok(None),
1092 }
1093 }
1094
1095 pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1101 zeph_db::query(sql!("DELETE FROM graph_communities"))
1102 .execute(&self.pool)
1103 .await?;
1104 Ok(())
1105 }
1106
1107 pub async fn find_entities_ranked(
1121 &self,
1122 query: &str,
1123 limit: usize,
1124 ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1125 let query = &query[..query.floor_char_boundary(512)];
1126 let Some(fts_query) = build_fts_query(query) else {
1127 return Ok(vec![]);
1128 };
1129
1130 let limit_i64 = i64::try_from(limit)?;
1131 let ranked_fts_sql = build_ranked_fts_sql();
1132 let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1133 .bind(&fts_query)
1134 .bind(format!(
1135 "%{}%",
1136 query
1137 .trim()
1138 .replace('\\', "\\\\")
1139 .replace('%', "\\%")
1140 .replace('_', "\\_")
1141 ))
1142 .bind(limit_i64)
1143 .fetch_all(&self.pool)
1144 .await?;
1145
1146 if rows.is_empty() {
1147 return Ok(vec![]);
1148 }
1149
1150 Ok(normalize_and_dedup(rows))
1151 }
1152
1153 pub async fn entity_structural_scores(
1163 &self,
1164 entity_ids: &[i64],
1165 ) -> Result<HashMap<i64, f32>, MemoryError> {
1166 const MAX_BATCH: usize = 163;
1169
1170 if entity_ids.is_empty() {
1171 return Ok(HashMap::new());
1172 }
1173
1174 let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1175 for chunk in entity_ids.chunks(MAX_BATCH) {
1176 let n = chunk.len();
1177 let ph1 = placeholder_list(1, n);
1179 let ph2 = placeholder_list(n + 1, n);
1180 let ph3 = placeholder_list(n * 2 + 1, n);
1181
1182 let sql = format!(
1184 "SELECT entity_id,
1185 COUNT(*) AS degree,
1186 COUNT(DISTINCT edge_type) AS type_diversity
1187 FROM (
1188 SELECT source_entity_id AS entity_id, edge_type
1189 FROM graph_edges
1190 WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1191 UNION ALL
1192 SELECT target_entity_id AS entity_id, edge_type
1193 FROM graph_edges
1194 WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1195 )
1196 WHERE entity_id IN ({ph3})
1197 GROUP BY entity_id"
1198 );
1199
1200 let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1201 for id in chunk {
1203 query = query.bind(*id);
1204 }
1205 for id in chunk {
1206 query = query.bind(*id);
1207 }
1208 for id in chunk {
1209 query = query.bind(*id);
1210 }
1211
1212 let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1213 all_rows.extend(chunk_rows);
1214 }
1215
1216 if all_rows.is_empty() {
1217 return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1218 }
1219
1220 let max_degree = all_rows
1221 .iter()
1222 .map(|(_, d, _)| *d)
1223 .max()
1224 .unwrap_or(1)
1225 .max(1);
1226
1227 let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1228 for (entity_id, degree, type_diversity) in all_rows {
1229 #[allow(clippy::cast_precision_loss)]
1230 let norm_degree = degree as f32 / max_degree as f32;
1231 #[allow(clippy::cast_precision_loss)]
1232 let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1233 let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1234 scores.insert(entity_id, score);
1235 }
1236
1237 Ok(scores)
1238 }
1239
/// Maps entity ids to community ids for the requested entities.
///
/// Ids are processed in chunks of 490; the SQL for each chunk comes from
/// `community_ids_sql`. Entities for which the query returns no row are
/// absent from the map; if an entity appears more than once the later row
/// wins.
///
/// # Errors
///
/// Returns an error if any chunk query fails.
#[cfg(any(feature = "sqlite", feature = "postgres"))]
pub async fn entity_community_ids(
    &self,
    entity_ids: &[i64],
) -> Result<HashMap<i64, i64>, MemoryError> {
    const MAX_BATCH: usize = 490;

    let mut membership: HashMap<i64, i64> = HashMap::new();
    if entity_ids.is_empty() {
        return Ok(membership);
    }

    for batch in entity_ids.chunks(MAX_BATCH) {
        let community_sql = community_ids_sql(&placeholder_list(1, batch.len()));
        let query = batch.iter().fold(
            zeph_db::query_as::<_, (i64, i64)>(&community_sql),
            |q, id| q.bind(*id),
        );
        membership.extend(query.fetch_all(&self.pool).await?);
    }

    Ok(membership)
}
1275
1276 pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1285 const MAX_BATCH: usize = 490;
1286 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1287 for chunk in edge_ids.chunks(MAX_BATCH) {
1288 let edge_placeholders = placeholder_list(1, chunk.len());
1289 let retrieval_sql = format!(
1290 "UPDATE graph_edges \
1291 SET retrieval_count = retrieval_count + 1, \
1292 last_retrieved_at = {epoch_now} \
1293 WHERE id IN ({edge_placeholders})"
1294 );
1295 let mut q = zeph_db::query(&retrieval_sql);
1296 for id in chunk {
1297 q = q.bind(*id);
1298 }
1299 q.execute(&self.pool).await?;
1300 }
1301 Ok(())
1302 }
1303
1304 #[tracing::instrument(
1316 name = "memory.graph.hebbian_increment",
1317 skip_all,
1318 fields(edge_count = edge_ids.len())
1319 )]
1320 pub async fn apply_hebbian_increment(
1321 &self,
1322 edge_ids: &[i64],
1323 delta: f32,
1324 ) -> Result<(), MemoryError> {
1325 const MAX_BATCH: usize = 490;
1328 if edge_ids.is_empty() || delta == 0.0 {
1329 return Ok(());
1330 }
1331 for chunk in edge_ids.chunks(MAX_BATCH) {
1332 let edge_placeholders = placeholder_list(2, chunk.len());
1333 let sql = format!(
1334 "UPDATE graph_edges \
1335 SET weight = weight + $1 \
1336 WHERE id IN ({edge_placeholders}) \
1337 AND valid_to IS NULL"
1338 );
1339 let mut q = zeph_db::query(&sql);
1340 q = q.bind(f64::from(delta));
1341 for id in chunk {
1342 q = q.bind(*id);
1343 }
1344 q.execute(&self.pool).await?;
1345 }
1346 Ok(())
1347 }
1348
1349 pub async fn entity_ids_in(&self, ids: &[i64]) -> Result<Vec<i64>, MemoryError> {
1358 const MAX_BATCH: usize = 490;
1359 if ids.is_empty() {
1360 return Ok(Vec::new());
1361 }
1362 let mut result = Vec::with_capacity(ids.len());
1364 for chunk in ids.chunks(MAX_BATCH) {
1365 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1366 let sql = format!("SELECT id FROM graph_entities WHERE id IN ({placeholders})");
1367 let mut q = zeph_db::query_as::<_, (i64,)>(&sql);
1368 for id in chunk {
1369 q = q.bind(*id);
1370 }
1371 let rows = q.fetch_all(&self.pool).await?;
1372 for (id,) in rows {
1373 result.push(id);
1374 }
1375 }
1376 Ok(result)
1377 }
1378
1379 pub async fn qdrant_point_ids_for_entities(
1387 &self,
1388 entity_ids: &[i64],
1389 ) -> Result<HashMap<i64, String>, MemoryError> {
1390 const MAX_BATCH: usize = 490;
1392 if entity_ids.is_empty() {
1393 return Ok(HashMap::new());
1394 }
1395 let mut result: HashMap<i64, String> = HashMap::with_capacity(entity_ids.len());
1396 for chunk in entity_ids.chunks(MAX_BATCH) {
1397 let placeholders = zeph_db::placeholder_list(1, chunk.len());
1398 let sql = format!(
1399 "SELECT id, qdrant_point_id \
1400 FROM graph_entities \
1401 WHERE id IN ({placeholders}) \
1402 AND qdrant_point_id IS NOT NULL"
1403 );
1404 let mut q = zeph_db::query_as::<_, (i64, String)>(&sql);
1405 for id in chunk {
1406 q = q.bind(*id);
1407 }
1408 let rows = q.fetch_all(&self.pool).await?;
1409 for (entity_id, point_id) in rows {
1410 result.insert(entity_id, point_id);
1411 }
1412 }
1413 Ok(result)
1414 }
1415
1416 pub async fn decay_edge_retrieval_counts(
1425 &self,
1426 decay_lambda: f64,
1427 interval_secs: u64,
1428 ) -> Result<usize, MemoryError> {
1429 let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1430 let decay_raw = format!(
1431 "UPDATE graph_edges \
1432 SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
1433 WHERE valid_to IS NULL \
1434 AND retrieval_count > 0 \
1435 AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
1436 );
1437 let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
1438 let result = zeph_db::query(&decay_sql)
1439 .bind(decay_lambda)
1440 .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
1441 .execute(&self.pool)
1442 .await?;
1443 Ok(usize::try_from(result.rows_affected())?)
1444 }
1445
1446 pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1452 let days = i64::from(retention_days);
1453 let result = zeph_db::query(sql!(
1454 "DELETE FROM graph_edges
1455 WHERE expired_at IS NOT NULL
1456 AND expired_at < datetime('now', '-' || ? || ' days')"
1457 ))
1458 .bind(days)
1459 .execute(&self.pool)
1460 .await?;
1461 Ok(usize::try_from(result.rows_affected())?)
1462 }
1463
1464 pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1470 let days = i64::from(retention_days);
1471 let result = zeph_db::query(sql!(
1472 "DELETE FROM graph_entities
1473 WHERE id NOT IN (
1474 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1475 UNION
1476 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1477 )
1478 AND last_seen_at < datetime('now', '-' || ? || ' days')"
1479 ))
1480 .bind(days)
1481 .execute(&self.pool)
1482 .await?;
1483 Ok(usize::try_from(result.rows_affected())?)
1484 }
1485
1486 pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
1495 let current = self.entity_count().await?;
1496 let max = i64::try_from(max_entities)?;
1497 if current <= max {
1498 return Ok(0);
1499 }
1500 let excess = current - max;
1501 let result = zeph_db::query(sql!(
1502 "DELETE FROM graph_entities
1503 WHERE id IN (
1504 SELECT e.id
1505 FROM graph_entities e
1506 LEFT JOIN (
1507 SELECT source_entity_id AS eid, COUNT(*) AS cnt
1508 FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
1509 UNION ALL
1510 SELECT target_entity_id AS eid, COUNT(*) AS cnt
1511 FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
1512 ) edge_counts ON e.id = edge_counts.eid
1513 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
1514 LIMIT ?
1515 )"
1516 ))
1517 .bind(excess)
1518 .execute(&self.pool)
1519 .await?;
1520 Ok(usize::try_from(result.rows_affected())?)
1521 }
1522
1523 pub async fn edges_at_timestamp(
1537 &self,
1538 entity_id: i64,
1539 timestamp: &str,
1540 ) -> Result<Vec<Edge>, MemoryError> {
1541 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
1545 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1546 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1547 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1548 FROM graph_edges
1549 WHERE valid_to IS NULL
1550 AND valid_from <= ?
1551 AND (source_entity_id = ? OR target_entity_id = ?)
1552 UNION ALL
1553 SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1554 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1555 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1556 FROM graph_edges
1557 WHERE valid_to IS NOT NULL
1558 AND valid_from <= ?
1559 AND valid_to > ?
1560 AND (source_entity_id = ? OR target_entity_id = ?)"
1561 ))
1562 .bind(timestamp)
1563 .bind(entity_id)
1564 .bind(entity_id)
1565 .bind(timestamp)
1566 .bind(timestamp)
1567 .bind(entity_id)
1568 .bind(entity_id)
1569 .fetch_all(&self.pool)
1570 .await?;
1571 Ok(rows.into_iter().map(edge_from_row).collect())
1572 }
1573
1574 pub async fn edge_history(
1583 &self,
1584 source_entity_id: i64,
1585 predicate: &str,
1586 relation: Option<&str>,
1587 limit: usize,
1588 ) -> Result<Vec<Edge>, MemoryError> {
1589 let escaped = predicate
1591 .replace('\\', "\\\\")
1592 .replace('%', "\\%")
1593 .replace('_', "\\_");
1594 let like_pattern = format!("%{escaped}%");
1595 let limit = i64::try_from(limit)?;
1596 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1597 zeph_db::query_as(sql!(
1598 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1599 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1600 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1601 FROM graph_edges
1602 WHERE source_entity_id = ?
1603 AND fact LIKE ? ESCAPE '\\'
1604 AND relation = ?
1605 ORDER BY valid_from DESC
1606 LIMIT ?"
1607 ))
1608 .bind(source_entity_id)
1609 .bind(&like_pattern)
1610 .bind(rel)
1611 .bind(limit)
1612 .fetch_all(&self.pool)
1613 .await?
1614 } else {
1615 zeph_db::query_as(sql!(
1616 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1617 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1618 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1619 FROM graph_edges
1620 WHERE source_entity_id = ?
1621 AND fact LIKE ? ESCAPE '\\'
1622 ORDER BY valid_from DESC
1623 LIMIT ?"
1624 ))
1625 .bind(source_entity_id)
1626 .bind(&like_pattern)
1627 .bind(limit)
1628 .fetch_all(&self.pool)
1629 .await?
1630 };
1631 Ok(rows.into_iter().map(edge_from_row).collect())
1632 }
1633
1634 pub async fn bfs(
1651 &self,
1652 start_entity_id: i64,
1653 max_hops: u32,
1654 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1655 self.bfs_with_depth(start_entity_id, max_hops)
1656 .await
1657 .map(|(e, ed, _)| (e, ed))
1658 }
1659
1660 pub async fn bfs_with_depth(
1671 &self,
1672 start_entity_id: i64,
1673 max_hops: u32,
1674 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1675 self.bfs_core(start_entity_id, max_hops, None).await
1676 }
1677
1678 pub async fn bfs_at_timestamp(
1689 &self,
1690 start_entity_id: i64,
1691 max_hops: u32,
1692 timestamp: &str,
1693 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1694 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1695 .await
1696 }
1697
1698 pub async fn bfs_typed(
1714 &self,
1715 start_entity_id: i64,
1716 max_hops: u32,
1717 edge_types: &[EdgeType],
1718 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1719 if edge_types.is_empty() {
1720 return self.bfs_with_depth(start_entity_id, max_hops).await;
1721 }
1722 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1723 .await
1724 }
1725
1726 async fn bfs_core(
1734 &self,
1735 start_entity_id: i64,
1736 max_hops: u32,
1737 at_timestamp: Option<&str>,
1738 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1739 use std::collections::HashMap;
1740
1741 const MAX_FRONTIER: usize = 300;
1744
1745 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1746 let mut frontier: Vec<i64> = vec![start_entity_id];
1747 depth_map.insert(start_entity_id, 0);
1748
1749 for hop in 0..max_hops {
1750 if frontier.is_empty() {
1751 break;
1752 }
1753 frontier.truncate(MAX_FRONTIER);
1754 let n = frontier.len();
1758 let ph1 = placeholder_list(1, n);
1759 let ph2 = placeholder_list(n + 1, n);
1760 let ph3 = placeholder_list(n * 2 + 1, n);
1761 let edge_filter = if at_timestamp.is_some() {
1762 let ts_pos = n * 3 + 1;
1763 format!(
1764 "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1765 ts = numbered_placeholder(ts_pos),
1766 )
1767 } else {
1768 "valid_to IS NULL".to_owned()
1769 };
1770 let neighbour_sql = format!(
1771 "SELECT DISTINCT CASE
1772 WHEN source_entity_id IN ({ph1}) THEN target_entity_id
1773 ELSE source_entity_id
1774 END as neighbour_id
1775 FROM graph_edges
1776 WHERE {edge_filter}
1777 AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
1778 );
1779 let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1780 for id in &frontier {
1781 q = q.bind(*id);
1782 }
1783 for id in &frontier {
1784 q = q.bind(*id);
1785 }
1786 for id in &frontier {
1787 q = q.bind(*id);
1788 }
1789 if let Some(ts) = at_timestamp {
1790 q = q.bind(ts);
1791 }
1792 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1793 let mut next_frontier: Vec<i64> = Vec::new();
1794 for nbr in neighbours {
1795 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1796 e.insert(hop + 1);
1797 next_frontier.push(nbr);
1798 }
1799 }
1800 frontier = next_frontier;
1801 }
1802
1803 self.bfs_fetch_results(depth_map, at_timestamp).await
1804 }
1805
1806 async fn bfs_core_typed(
1815 &self,
1816 start_entity_id: i64,
1817 max_hops: u32,
1818 at_timestamp: Option<&str>,
1819 edge_types: &[EdgeType],
1820 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1821 use std::collections::HashMap;
1822
1823 const MAX_FRONTIER: usize = 300;
1824
1825 let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();
1826
1827 let mut depth_map: HashMap<i64, u32> = HashMap::new();
1828 let mut frontier: Vec<i64> = vec![start_entity_id];
1829 depth_map.insert(start_entity_id, 0);
1830
1831 let n_types = type_strs.len();
1832 let type_in = placeholder_list(1, n_types);
1834 let id_start = n_types + 1;
1835
1836 for hop in 0..max_hops {
1837 if frontier.is_empty() {
1838 break;
1839 }
1840 frontier.truncate(MAX_FRONTIER);
1841
1842 let n_frontier = frontier.len();
1843 let fp1 = placeholder_list(id_start, n_frontier);
1845 let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
1846 let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);
1847
1848 let edge_filter = if at_timestamp.is_some() {
1849 let ts_pos = id_start + n_frontier * 3;
1850 format!(
1851 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1852 ts = numbered_placeholder(ts_pos),
1853 )
1854 } else {
1855 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1856 };
1857
1858 let neighbour_sql = format!(
1859 "SELECT DISTINCT CASE
1860 WHEN source_entity_id IN ({fp1}) THEN target_entity_id
1861 ELSE source_entity_id
1862 END as neighbour_id
1863 FROM graph_edges
1864 WHERE {edge_filter}
1865 AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
1866 );
1867
1868 let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
1869 for t in &type_strs {
1871 q = q.bind(*t);
1872 }
1873 for id in &frontier {
1875 q = q.bind(*id);
1876 }
1877 for id in &frontier {
1878 q = q.bind(*id);
1879 }
1880 for id in &frontier {
1881 q = q.bind(*id);
1882 }
1883 if let Some(ts) = at_timestamp {
1884 q = q.bind(ts);
1885 }
1886
1887 let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
1888 let mut next_frontier: Vec<i64> = Vec::new();
1889 for nbr in neighbours {
1890 if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
1891 e.insert(hop + 1);
1892 next_frontier.push(nbr);
1893 }
1894 }
1895 frontier = next_frontier;
1896 }
1897
1898 self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
1900 .await
1901 }
1902
1903 async fn bfs_fetch_results_typed(
1911 &self,
1912 depth_map: std::collections::HashMap<i64, u32>,
1913 at_timestamp: Option<&str>,
1914 type_strs: &[&str],
1915 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1916 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1917 if visited_ids.is_empty() {
1918 return Ok((Vec::new(), Vec::new(), depth_map));
1919 }
1920 if visited_ids.len() > 499 {
1921 tracing::warn!(
1922 total = visited_ids.len(),
1923 retained = 499,
1924 "bfs_fetch_results_typed: visited entity set truncated to 499"
1925 );
1926 visited_ids.truncate(499);
1927 }
1928
1929 let n_types = type_strs.len();
1930 let n_visited = visited_ids.len();
1931
1932 let type_in = placeholder_list(1, n_types);
1934 let id_start = n_types + 1;
1935 let ph_ids1 = placeholder_list(id_start, n_visited);
1936 let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1937
1938 let edge_filter = if at_timestamp.is_some() {
1939 let ts_pos = id_start + n_visited * 2;
1940 format!(
1941 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1942 ts = numbered_placeholder(ts_pos),
1943 )
1944 } else {
1945 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1946 };
1947
1948 let edge_sql = format!(
1949 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1950 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1951 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
1952 FROM graph_edges
1953 WHERE {edge_filter}
1954 AND source_entity_id IN ({ph_ids1})
1955 AND target_entity_id IN ({ph_ids2})"
1956 );
1957 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1958 for t in type_strs {
1959 edge_query = edge_query.bind(*t);
1960 }
1961 for id in &visited_ids {
1962 edge_query = edge_query.bind(*id);
1963 }
1964 for id in &visited_ids {
1965 edge_query = edge_query.bind(*id);
1966 }
1967 if let Some(ts) = at_timestamp {
1968 edge_query = edge_query.bind(ts);
1969 }
1970 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1971
1972 let entity_sql2 = format!(
1974 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1975 FROM graph_entities WHERE id IN ({ph})",
1976 ph = placeholder_list(1, visited_ids.len()),
1977 );
1978 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1979 for id in &visited_ids {
1980 entity_query = entity_query.bind(*id);
1981 }
1982 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1983
1984 let entities: Vec<Entity> = entity_rows
1985 .into_iter()
1986 .map(entity_from_row)
1987 .collect::<Result<Vec<_>, _>>()?;
1988 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1989
1990 Ok((entities, edges, depth_map))
1991 }
1992
1993 async fn bfs_fetch_results(
1995 &self,
1996 depth_map: std::collections::HashMap<i64, u32>,
1997 at_timestamp: Option<&str>,
1998 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1999 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
2000 if visited_ids.is_empty() {
2001 return Ok((Vec::new(), Vec::new(), depth_map));
2002 }
2003 if visited_ids.len() > 499 {
2005 tracing::warn!(
2006 total = visited_ids.len(),
2007 retained = 499,
2008 "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
2009 some reachable entities will be dropped from results"
2010 );
2011 visited_ids.truncate(499);
2012 }
2013
2014 let n = visited_ids.len();
2015 let ph_ids1 = placeholder_list(1, n);
2016 let ph_ids2 = placeholder_list(n + 1, n);
2017 let edge_filter = if at_timestamp.is_some() {
2018 let ts_pos = n * 2 + 1;
2019 format!(
2020 "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
2021 ts = numbered_placeholder(ts_pos),
2022 )
2023 } else {
2024 "valid_to IS NULL".to_owned()
2025 };
2026 let edge_sql = format!(
2027 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
2028 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
2029 edge_type, retrieval_count, last_retrieved_at, superseded_by, canonical_relation, supersedes, weight, confidence_fast, confidence_slow, turn_index
2030 FROM graph_edges
2031 WHERE {edge_filter}
2032 AND source_entity_id IN ({ph_ids1})
2033 AND target_entity_id IN ({ph_ids2})"
2034 );
2035 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
2036 for id in &visited_ids {
2037 edge_query = edge_query.bind(*id);
2038 }
2039 for id in &visited_ids {
2040 edge_query = edge_query.bind(*id);
2041 }
2042 if let Some(ts) = at_timestamp {
2043 edge_query = edge_query.bind(ts);
2044 }
2045 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
2046
2047 let entity_sql = format!(
2048 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
2049 FROM graph_entities WHERE id IN ({ph})",
2050 ph = placeholder_list(1, n),
2051 );
2052 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
2053 for id in &visited_ids {
2054 entity_query = entity_query.bind(*id);
2055 }
2056 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
2057
2058 let entities: Vec<Entity> = entity_rows
2059 .into_iter()
2060 .map(entity_from_row)
2061 .collect::<Result<Vec<_>, _>>()?;
2062 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
2063
2064 Ok((entities, edges, depth_map))
2065 }
2066
2067 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
2083 let find_by_name_sql = format!(
2084 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2085 FROM graph_entities \
2086 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2087 LIMIT 5",
2088 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2089 );
2090 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2091 .bind(name)
2092 .bind(name)
2093 .fetch_all(&self.pool)
2094 .await?;
2095
2096 if !rows.is_empty() {
2097 return rows.into_iter().map(entity_from_row).collect();
2098 }
2099
2100 self.find_entities_fuzzy(name, 5).await
2101 }
2102
2103 pub async fn unprocessed_messages_for_backfill(
2111 &self,
2112 limit: usize,
2113 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2114 let limit = i64::try_from(limit)?;
2115 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2116 "SELECT id, content FROM messages
2117 WHERE graph_processed = 0
2118 ORDER BY id ASC
2119 LIMIT ?"
2120 ))
2121 .bind(limit)
2122 .fetch_all(&self.pool)
2123 .await?;
2124 Ok(rows
2125 .into_iter()
2126 .map(|(id, content)| (crate::types::MessageId(id), content))
2127 .collect())
2128 }
2129
2130 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2136 let count: i64 = zeph_db::query_scalar(sql!(
2137 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2138 ))
2139 .fetch_one(&self.pool)
2140 .await?;
2141 Ok(count)
2142 }
2143
2144 pub async fn mark_messages_graph_processed(
2150 &self,
2151 ids: &[crate::types::MessageId],
2152 ) -> Result<(), MemoryError> {
2153 const MAX_BATCH: usize = 490;
2154 if ids.is_empty() {
2155 return Ok(());
2156 }
2157 for chunk in ids.chunks(MAX_BATCH) {
2158 let placeholders = placeholder_list(1, chunk.len());
2159 let sql =
2160 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2161 let mut query = zeph_db::query(&sql);
2162 for id in chunk {
2163 query = query.bind(id.0);
2164 }
2165 query.execute(&self.pool).await?;
2166 }
2167 Ok(())
2168 }
2169}
2170
/// Build the SQLite query that maps entity ids to the communities containing them.
///
/// `placeholders` is spliced verbatim into the `IN (...)` list, so it must be a
/// ready-made placeholder list rather than user-supplied text. The query expands
/// `graph_communities.entity_ids` with `json_each` and yields `(entity_id, community_id)`
/// rows for the members whose id is in that list.
///
/// Compiled only when the `sqlite` feature is on and `postgres` is off.
#[cfg(all(feature = "sqlite", not(feature = "postgres")))]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2181
/// Build the PostgreSQL query that maps entity ids to the communities containing them.
///
/// `placeholders` is spliced verbatim into the `IN (...)` list, so it must be a
/// ready-made placeholder list rather than user-supplied text. The query casts
/// `graph_communities.entity_ids` to `jsonb`, expands it with
/// `jsonb_array_elements_text`, and yields `(entity_id, community_id)` rows for the
/// members whose id is in that list.
///
/// Compiled only when the `postgres` feature is on.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
         jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2191
/// Raw database row for a `graph_entities` record, as selected by the entity queries
/// in this module. Converted into an [`Entity`] by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    // Primary key; wrapped in `EntityId` on conversion.
    id: i64,
    // Surface name of the entity.
    name: String,
    // Canonical name of the entity.
    canonical_name: String,
    // Textual entity type; parsed into `EntityType` on conversion.
    entity_type: String,
    // Optional summary text.
    summary: Option<String>,
    // Timestamps are carried through as the strings the database returns.
    first_seen_at: String,
    last_seen_at: String,
    // Qdrant point id, if one has been stored for this entity.
    qdrant_point_id: Option<String>,
}
2205
2206fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2207 let entity_type = row
2208 .entity_type
2209 .parse::<EntityType>()
2210 .map_err(MemoryError::GraphStore)?;
2211 Ok(Entity {
2212 id: EntityId(row.id),
2213 name: row.name,
2214 canonical_name: row.canonical_name,
2215 entity_type,
2216 summary: row.summary,
2217 first_seen_at: row.first_seen_at,
2218 last_seen_at: row.last_seen_at,
2219 qdrant_point_id: row.qdrant_point_id,
2220 })
2221}
2222
/// Raw database row for an entity alias. Converted into an [`EntityAlias`] by
/// `alias_from_row`.
// NOTE(review): the backing table is not visible in this part of the file — confirm.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    // Row id of the alias itself.
    id: i64,
    // Id of the entity the alias belongs to; wrapped in `EntityId` on conversion.
    entity_id: i64,
    // The alias text.
    alias_name: String,
    // Creation timestamp, carried through as the string the database returns.
    created_at: String,
}
2230
2231fn alias_from_row(row: AliasRow) -> EntityAlias {
2232 EntityAlias {
2233 id: row.id,
2234 entity_id: EntityId(row.entity_id),
2235 alias_name: row.alias_name,
2236 created_at: row.created_at,
2237 }
2238}
2239
/// Raw database row for a `graph_edges` record, as selected by the edge queries in this
/// module. Converted into an [`Edge`] by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    // Primary key of the edge.
    id: i64,
    // Endpoint entity ids.
    source_entity_id: i64,
    target_entity_id: i64,
    // Relation label as stored.
    relation: String,
    // Fact text attached to the edge.
    fact: String,
    // Stored as f64; narrowed to f32 on conversion.
    confidence: f64,
    // Validity window; `valid_to` is NULL while the edge is still active.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // The `episode_id` column is surfaced under the name `source_message_id`.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    // Qdrant point id, if one has been stored for this edge.
    qdrant_point_id: Option<String>,
    // Textual edge type; parsed into `EdgeType` on conversion.
    edge_type: String,
    retrieval_count: i32,
    // NOTE(review): compared against the dialect's EPOCH_NOW expression elsewhere in
    // this file, so presumably an epoch value — confirm the unit.
    last_retrieved_at: Option<i64>,
    // Links to the edge that replaced this one / the edge this one replaced.
    superseded_by: Option<i64>,
    // When NULL, conversion falls back to `relation`.
    canonical_relation: Option<String>,
    supersedes: Option<i64>,
    // Stored as f64; narrowed to f32 on conversion.
    weight: f64,
    confidence_fast: f64,
    confidence_slow: f64,
    // Stored as i64; cast to u32 on conversion.
    turn_index: Option<i64>,
}
2268
2269fn edge_from_row(row: EdgeRow) -> Edge {
2270 let edge_type = row
2271 .edge_type
2272 .parse::<EdgeType>()
2273 .unwrap_or(EdgeType::Semantic);
2274 let canonical_relation = row
2275 .canonical_relation
2276 .unwrap_or_else(|| row.relation.clone());
2277 Edge {
2278 id: row.id,
2279 source_entity_id: row.source_entity_id,
2280 target_entity_id: row.target_entity_id,
2281 canonical_relation,
2282 relation: row.relation,
2283 fact: row.fact,
2284 #[allow(clippy::cast_possible_truncation)]
2285 confidence: row.confidence as f32,
2286 valid_from: row.valid_from,
2287 valid_to: row.valid_to,
2288 created_at: row.created_at,
2289 expired_at: row.expired_at,
2290 source_message_id: row.source_message_id.map(MessageId),
2291 qdrant_point_id: row.qdrant_point_id,
2292 edge_type,
2293 retrieval_count: row.retrieval_count,
2294 last_retrieved_at: row.last_retrieved_at,
2295 superseded_by: row.superseded_by,
2296 supersedes: row.supersedes,
2297 #[allow(clippy::cast_possible_truncation)]
2298 weight: row.weight as f32,
2299 #[allow(clippy::cast_possible_truncation)]
2300 confidence_fast: row.confidence_fast as f32,
2301 #[allow(clippy::cast_possible_truncation)]
2302 confidence_slow: row.confidence_slow as f32,
2303 #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
2304 turn_index: row.turn_index.map(|v| v as u32),
2305 }
2306}
2307
/// Raw database row for a community record.
// NOTE(review): presumably backed by `graph_communities` (the table queried by
// `community_ids_sql`) — confirm against the queries that select into this struct.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    // Primary key of the community.
    id: i64,
    name: String,
    summary: String,
    // Member entity ids as text. `community_ids_sql` expands
    // `graph_communities.entity_ids` as a JSON array, so this is presumably JSON-encoded
    // — TODO confirm.
    entity_ids: String,
    // Optional fingerprint string; its derivation is not visible here.
    fingerprint: Option<String>,
    // Timestamps are carried through as the strings the database returns.
    created_at: String,
    updated_at: String,
}
2318
2319impl GraphStore {
2322 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2330 zeph_db::query(sql!(
2334 "INSERT INTO conversations (id) VALUES (?)
2335 ON CONFLICT (id) DO NOTHING"
2336 ))
2337 .bind(conversation_id)
2338 .execute(&self.pool)
2339 .await?;
2340
2341 let id: i64 = zeph_db::query_scalar(sql!(
2342 "INSERT INTO graph_episodes (conversation_id)
2343 VALUES (?)
2344 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2345 RETURNING id"
2346 ))
2347 .bind(conversation_id)
2348 .fetch_one(&self.pool)
2349 .await?;
2350 Ok(id)
2351 }
2352
2353 pub async fn link_entity_to_episode(
2361 &self,
2362 episode_id: i64,
2363 entity_id: i64,
2364 ) -> Result<(), MemoryError> {
2365 zeph_db::query(sql!(
2366 "INSERT INTO graph_episode_entities (episode_id, entity_id)
2367 VALUES (?, ?)
2368 ON CONFLICT (episode_id, entity_id) DO NOTHING"
2369 ))
2370 .bind(episode_id)
2371 .bind(entity_id)
2372 .execute(&self.pool)
2373 .await?;
2374 Ok(())
2375 }
2376
2377 pub async fn episodes_for_entity(
2383 &self,
2384 entity_id: i64,
2385 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2386 #[derive(zeph_db::FromRow)]
2387 struct EpisodeRow {
2388 id: i64,
2389 conversation_id: i64,
2390 created_at: String,
2391 closed_at: Option<String>,
2392 }
2393 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2394 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2395 FROM graph_episodes e
2396 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2397 WHERE ee.entity_id = ?"
2398 ))
2399 .bind(entity_id)
2400 .fetch_all(&self.pool)
2401 .await?;
2402 Ok(rows
2403 .into_iter()
2404 .map(|r| super::types::Episode {
2405 id: r.id,
2406 conversation_id: r.conversation_id,
2407 created_at: r.created_at,
2408 closed_at: r.closed_at,
2409 })
2410 .collect())
2411 }
2412
2413 #[allow(clippy::too_many_arguments)]
2432 #[tracing::instrument(name = "memory.graph.insert_or_supersede", skip_all)]
2435 pub async fn insert_or_supersede_with_metrics(
2436 &self,
2437 source_entity_id: i64,
2438 target_entity_id: i64,
2439 relation: &str,
2440 canonical_relation: &str,
2441 fact: &str,
2442 confidence: f32,
2443 episode_id: Option<MessageId>,
2444 edge_type: EdgeType,
2445 set_supersedes: bool,
2446 metrics: Option<&ApexMetrics>,
2447 ) -> Result<i64, MemoryError> {
2448 self.insert_or_supersede_with_turn_index_and_metrics(
2449 source_entity_id,
2450 target_entity_id,
2451 relation,
2452 canonical_relation,
2453 fact,
2454 confidence,
2455 episode_id,
2456 edge_type,
2457 set_supersedes,
2458 metrics,
2459 None,
2460 )
2461 .await
2462 }
2463
2464 #[allow(clippy::too_many_arguments)]
2473 pub async fn insert_or_supersede_with_turn_index_and_metrics(
2474 &self,
2475 source_entity_id: i64,
2476 target_entity_id: i64,
2477 relation: &str,
2478 canonical_relation: &str,
2479 fact: &str,
2480 confidence: f32,
2481 episode_id: Option<MessageId>,
2482 edge_type: EdgeType,
2483 set_supersedes: bool,
2484 metrics: Option<&ApexMetrics>,
2485 turn_index: Option<u32>,
2486 ) -> Result<i64, MemoryError> {
2487 if source_entity_id == target_entity_id {
2488 return Err(MemoryError::InvalidInput(format!(
2489 "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
2490 )));
2491 }
2492 let confidence = confidence.clamp(0.0, 1.0);
2493 let edge_type_str = edge_type.as_str();
2494 let episode_raw: Option<i64> = episode_id.map(|m| m.0);
2495
2496 let mut tx = zeph_db::begin(&self.pool).await?;
2497
2498 if let Some(existing_id) = find_identical_active_edge(
2499 &mut tx,
2500 source_entity_id,
2501 target_entity_id,
2502 canonical_relation,
2503 edge_type_str,
2504 fact,
2505 )
2506 .await?
2507 {
2508 record_reassertion(
2509 &mut tx,
2510 existing_id,
2511 episode_raw,
2512 confidence,
2513 self.benna_fast_rate,
2514 self.benna_slow_rate,
2515 )
2516 .await?;
2517 tx.commit().await?;
2518 return Ok(existing_id);
2519 }
2520
2521 let prior_head =
2522 find_prior_active_head(&mut tx, source_entity_id, canonical_relation, edge_type_str)
2523 .await?;
2524
2525 if let Some(head_id) = prior_head {
2528 check_supersede_depth_in_tx(&mut tx, head_id).await?;
2529 }
2530
2531 if let Some(head_id) = prior_head {
2535 expire_prior_head(&mut tx, head_id).await?;
2536 }
2537
2538 let supersedes_val: Option<i64> = if set_supersedes { prior_head } else { None };
2539 let turn_index_raw: Option<i64> = turn_index.map(i64::from);
2540 let new_id = insert_new_edge(
2541 &mut tx,
2542 source_entity_id,
2543 target_entity_id,
2544 relation,
2545 canonical_relation,
2546 fact,
2547 confidence,
2548 episode_raw,
2549 edge_type_str,
2550 supersedes_val,
2551 turn_index_raw,
2552 )
2553 .await?;
2554
2555 if let Some(head_id) = prior_head {
2556 set_superseded_by(&mut tx, head_id, new_id).await?;
2557 if let Some(m) = metrics {
2558 m.supersedes_total
2559 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2560 }
2561 }
2562
2563 tx.commit().await?;
2564 Ok(new_id)
2565 }
2566
2567 #[allow(clippy::too_many_arguments)] pub async fn insert_or_supersede(
2574 &self,
2575 source_entity_id: i64,
2576 target_entity_id: i64,
2577 relation: &str,
2578 canonical_relation: &str,
2579 fact: &str,
2580 confidence: f32,
2581 episode_id: Option<MessageId>,
2582 edge_type: EdgeType,
2583 set_supersedes: bool,
2584 ) -> Result<i64, MemoryError> {
2585 self.insert_or_supersede_with_metrics(
2586 source_entity_id,
2587 target_entity_id,
2588 relation,
2589 canonical_relation,
2590 fact,
2591 confidence,
2592 episode_id,
2593 edge_type,
2594 set_supersedes,
2595 None,
2596 )
2597 .await
2598 }
2599
    /// Insert (or supersede) an edge and then, best-effort, stage implicit-conflict
    /// candidates against the other edges of the same source entity.
    ///
    /// The insert itself goes through [`Self::insert_or_supersede_with_metrics`] and its
    /// errors are propagated. The detection phase runs only when both `detector` and
    /// `ontology` are supplied and the detector reports itself enabled. It:
    /// 1. asks the ontology whether `canonical_relation` has `Many` cardinality;
    /// 2. loads `(id, canonical_relation)` for every other edge of `source_entity_id`
    ///    whose `expired_at` is NULL;
    /// 3. passes them to the detector and, if it returns candidates, stages them in a
    ///    separate transaction.
    ///
    /// Every failure in the detection phase (query, transaction begin, staging, commit)
    /// is logged with `tracing::warn!` and does not change the return value.
    ///
    /// # Errors
    ///
    /// Returns an error only if the underlying insert fails.
    #[allow(clippy::too_many_arguments)] pub async fn insert_or_supersede_with_conflict_detection(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        canonical_relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
        set_supersedes: bool,
        metrics: Option<&ApexMetrics>,
        detector: Option<&crate::graph::implicit_conflict::ImplicitConflictDetector>,
        ontology: Option<&crate::graph::ontology::OntologyTable>,
    ) -> Result<i64, MemoryError> {
        let new_id = self
            .insert_or_supersede_with_metrics(
                source_entity_id,
                target_entity_id,
                relation,
                canonical_relation,
                fact,
                confidence,
                episode_id,
                edge_type,
                set_supersedes,
                metrics,
            )
            .await?;

        // Detection is opt-in: both collaborators must be present and the detector enabled.
        if let (Some(det), Some(onto)) = (detector, ontology)
            && det.is_enabled()
        {
            let is_cardinality_n =
                onto.cardinality(canonical_relation) == crate::graph::ontology::Cardinality::Many;

            // NOTE(review): this is the only query in view that calls `sqlx::query`
            // directly with bare `?` placeholders instead of `zeph_db::query(sql!(...))`
            // — confirm this is intentional for non-SQLite backends.
            let existing_raw = sqlx::query(
                "SELECT id, canonical_relation FROM graph_edges
                 WHERE source_entity_id = ?
                 AND id != ?
                 AND expired_at IS NULL",
            )
            .bind(source_entity_id)
            .bind(new_id)
            .fetch_all(&self.pool)
            .await;

            match existing_raw {
                Ok(rows) => {
                    use sqlx::Row as _;
                    // Column decode failures fall back to id 0 / an empty relation
                    // instead of aborting.
                    // NOTE(review): a NULL `canonical_relation` presumably also lands on
                    // the empty-string fallback — confirm the detector tolerates that.
                    let existing: Vec<(i64, String)> = rows
                        .into_iter()
                        .map(|r| {
                            let id: i64 = r.try_get("id").unwrap_or(0);
                            let rel: String = r.try_get("canonical_relation").unwrap_or_default();
                            (id, rel)
                        })
                        .collect();
                    // Borrowed view in the `(id, &str)` shape passed to `detect_candidates`.
                    let existing_refs: Vec<(i64, &str)> = existing
                        .iter()
                        .map(|(id, rel)| (*id, rel.as_str()))
                        .collect();

                    let candidates = det.detect_candidates(
                        new_id,
                        canonical_relation,
                        &existing_refs,
                        is_cardinality_n,
                    );

                    if !candidates.is_empty() {
                        let ttl = det.candidate_ttl_days();
                        // Staging uses its own transaction, separate from the insert above.
                        match zeph_db::begin(&self.pool).await {
                            Ok(mut tx) => {
                                match det.stage_candidates(&candidates, &mut tx, ttl).await {
                                    Ok(()) => {
                                        if let Err(e) = tx.commit().await {
                                            tracing::warn!(
                                                error = %e,
                                                "implicit conflict tx commit failed (non-fatal)"
                                            );
                                        }
                                    }
                                    Err(e) => {
                                        tracing::warn!(
                                            error = %e,
                                            "implicit conflict staging failed (non-fatal)"
                                        );
                                    }
                                }
                            }
                            Err(e) => {
                                tracing::warn!(
                                    error = %e,
                                    "implicit conflict: tx begin failed (non-fatal)"
                                );
                            }
                        }
                    }
                }
                Err(e) => {
                    tracing::warn!(
                        error = %e,
                        "implicit conflict: failed to query existing edges (non-fatal)"
                    );
                }
            }
        }

        Ok(new_id)
    }
2724
2725 pub async fn check_supersede_depth(&self, head_id: i64) -> Result<usize, MemoryError> {
2735 Self::check_supersede_depth_with_pool(&self.pool, head_id).await
2736 }
2737
2738 async fn check_supersede_depth_with_pool(
2739 pool: &zeph_db::DbPool,
2740 head_id: i64,
2741 ) -> Result<usize, MemoryError> {
2742 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
2743 let depth: Option<i64> = zeph_db::query_scalar(sql!(
2746 "WITH RECURSIVE chain(id, depth) AS (
2747 SELECT id, 0 FROM graph_edges WHERE id = ?
2748 UNION ALL
2749 SELECT e.supersedes, c.depth + 1
2750 FROM graph_edges e JOIN chain c ON e.id = c.id
2751 WHERE e.supersedes IS NOT NULL AND c.depth < ?
2752 )
2753 SELECT MAX(depth) FROM chain"
2754 ))
2755 .bind(head_id)
2756 .bind(cap)
2757 .fetch_optional(pool)
2758 .await?
2759 .flatten();
2760
2761 #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
2762 let d = depth.unwrap_or(0) as usize;
2763 if d > SUPERSEDE_DEPTH_CAP {
2764 return Err(MemoryError::SupersedeCycle(head_id));
2765 }
2766 Ok(d)
2767 }
2768
2769 pub async fn source_message_ids_for_edges(
2780 &self,
2781 edge_ids: &[i64],
2782 ) -> Result<Vec<(i64, crate::types::MessageId)>, crate::error::MemoryError> {
2783 if edge_ids.is_empty() {
2784 return Ok(Vec::new());
2785 }
2786 let placeholders = placeholder_list(1, edge_ids.len());
2787 let sql = format!(
2788 "SELECT id, episode_id FROM graph_edges \
2789 WHERE id IN ({placeholders}) AND episode_id IS NOT NULL"
2790 );
2791 let mut q = zeph_db::query_as::<_, (i64, crate::types::MessageId)>(&sql);
2792 for &eid in edge_ids {
2793 q = q.bind(eid);
2794 }
2795 let rows = q.fetch_all(&self.pool).await?;
2796 Ok(rows)
2797 }
2798
2799 pub async fn source_entity_id_for_edge(
2807 &self,
2808 edge_id: i64,
2809 ) -> Result<Option<i64>, crate::error::MemoryError> {
2810 let row: Option<i64> = zeph_db::query_scalar(sql!(
2811 "SELECT source_entity_id FROM graph_edges WHERE id = ?1 AND valid_to IS NULL LIMIT 1"
2812 ))
2813 .bind(edge_id)
2814 .fetch_optional(&self.pool)
2815 .await?;
2816 Ok(row)
2817 }
2818
2819 pub async fn bfs_edges_at_depth(
2828 &self,
2829 entity_id: i64,
2830 _depth: u32,
2831 edge_types: &[crate::graph::types::EdgeType],
2832 ) -> Result<Vec<crate::recall_view::RecalledFact>, crate::error::MemoryError> {
2833 let type_strs: Vec<&str> = if edge_types.is_empty() {
2834 vec!["semantic", "temporal", "causal", "entity"]
2835 } else {
2836 edge_types.iter().map(|et| et.as_str()).collect()
2837 };
2838
2839 let ph = placeholder_list(1, type_strs.len());
2840 let src_pos = type_strs.len() + 1;
2841 let tgt_pos = src_pos + 1;
2842 let src_ph = numbered_placeholder(src_pos);
2843 let tgt_ph = numbered_placeholder(tgt_pos);
2844
2845 let sql = format!(
2846 "SELECT ge.id, ge.source_entity_id, ge.target_entity_id, ge.relation, ge.fact,
2847 ge.confidence, ge.valid_from, ge.valid_to, ge.created_at, ge.expired_at,
2848 ge.episode_id, ge.qdrant_point_id, ge.edge_type, ge.retrieval_count,
2849 ge.last_retrieved_at, ge.superseded_by, ge.canonical_relation, ge.supersedes,
2850 ge.weight, ge.confidence_fast, ge.confidence_slow, ge.turn_index
2851 FROM graph_edges ge
2852 WHERE ge.edge_type IN ({ph})
2853 AND ge.valid_to IS NULL
2854 AND (ge.source_entity_id = {src_ph} OR ge.target_entity_id = {tgt_ph})
2855 LIMIT 200"
2856 );
2857
2858 let mut q = zeph_db::query_as::<_, EdgeRow>(&sql);
2859 for t in &type_strs {
2860 q = q.bind(*t);
2861 }
2862 q = q.bind(entity_id).bind(entity_id);
2863
2864 let rows = q.fetch_all(&self.pool).await?;
2865
2866 let all_ids: Vec<i64> = rows
2868 .iter()
2869 .flat_map(|r| [r.source_entity_id, r.target_entity_id])
2870 .collect::<std::collections::HashSet<_>>()
2871 .into_iter()
2872 .collect();
2873
2874 let mut name_map: std::collections::HashMap<i64, String> = std::collections::HashMap::new();
2875 for chunk in all_ids.chunks(490) {
2876 let ph2 = placeholder_list(1, chunk.len());
2877 let name_sql =
2878 format!("SELECT id, canonical_name FROM graph_entities WHERE id IN ({ph2})");
2879 let mut nq = zeph_db::query_as::<_, (i64, String)>(&name_sql);
2880 for &id in chunk {
2881 nq = nq.bind(id);
2882 }
2883 let name_rows = nq.fetch_all(&self.pool).await?;
2884 for (id, name) in name_rows {
2885 name_map.insert(id, name);
2886 }
2887 }
2888
2889 let facts: Vec<crate::recall_view::RecalledFact> = rows
2890 .into_iter()
2891 .filter_map(|row| {
2892 let edge = edge_from_row(row);
2893 let entity_name = name_map.get(&edge.source_entity_id).cloned()?;
2894 let target_name = name_map.get(&edge.target_entity_id).cloned()?;
2895 if entity_name.is_empty() || target_name.is_empty() {
2896 return None;
2897 }
2898 let fact = crate::graph::types::GraphFact {
2899 entity_name,
2900 relation: edge.canonical_relation.clone(),
2901 target_name,
2902 fact: edge.fact.clone(),
2903 entity_match_score: 0.5,
2904 hop_distance: 1,
2905 confidence: edge.confidence,
2906 valid_from: if edge.valid_from.is_empty() {
2907 None
2908 } else {
2909 Some(edge.valid_from.clone())
2910 },
2911 edge_type: edge.edge_type,
2912 retrieval_count: edge.retrieval_count,
2913 edge_id: Some(edge.id),
2914 };
2915 Some(crate::recall_view::RecalledFact::from_graph_fact(fact))
2916 })
2917 .collect();
2918
2919 Ok(facts)
2920 }
2921}
2922
/// Shorthand for the database transaction handle passed to the edge helpers in this file.
type Tx<'a> = zeph_db::DbTransaction<'a>;
2928
2929async fn find_identical_active_edge(
2933 tx: &mut Tx<'_>,
2934 src: i64,
2935 tgt: i64,
2936 canon: &str,
2937 edge_type_str: &str,
2938 fact: &str,
2939) -> Result<Option<i64>, MemoryError> {
2940 Ok(zeph_db::query_scalar(sql!(
2941 "SELECT id FROM graph_edges
2942 WHERE source_entity_id = ?
2943 AND target_entity_id = ?
2944 AND canonical_relation = ?
2945 AND edge_type = ?
2946 AND fact = ?
2947 AND valid_to IS NULL
2948 AND expired_at IS NULL
2949 LIMIT 1"
2950 ))
2951 .bind(src)
2952 .bind(tgt)
2953 .bind(canon)
2954 .bind(edge_type_str)
2955 .bind(fact)
2956 .fetch_optional(&mut **tx)
2957 .await?)
2958}
2959
2960async fn record_reassertion(
2966 tx: &mut Tx<'_>,
2967 head_id: i64,
2968 episode_raw: Option<i64>,
2969 confidence: f32,
2970 benna_fast_rate: f32,
2971 benna_slow_rate: f32,
2972) -> Result<(), MemoryError> {
2973 #[allow(clippy::cast_possible_wrap)]
2974 let asserted_at = std::time::SystemTime::now()
2975 .duration_since(std::time::UNIX_EPOCH)
2976 .unwrap_or_default()
2977 .as_secs() as i64;
2978 zeph_db::query(sql!(
2979 "INSERT INTO edge_reassertions (head_edge_id, asserted_at, episode_id, confidence)
2980 VALUES (?, ?, ?, ?)"
2981 ))
2982 .bind(head_id)
2983 .bind(asserted_at)
2984 .bind(episode_raw)
2985 .bind(f64::from(confidence))
2986 .execute(&mut **tx)
2987 .await?;
2988
2989 let existing: Option<(f64, f64)> = zeph_db::query_as(sql!(
2991 "SELECT confidence_fast, confidence_slow FROM graph_edges WHERE id = ? LIMIT 1"
2992 ))
2993 .bind(head_id)
2994 .fetch_optional(&mut **tx)
2995 .await?;
2996
2997 if let Some((stored_fast, stored_slow)) = existing {
2998 #[allow(clippy::cast_possible_truncation)]
2999 let stored_fast_f32 = (stored_fast as f32).clamp(0.0, 1.0);
3000 #[allow(clippy::cast_possible_truncation)]
3001 let stored_slow_f32 = (stored_slow as f32).clamp(0.0, 1.0);
3002 let new_fast = stored_fast_f32 + benna_fast_rate * (confidence - stored_fast_f32);
3003 let new_slow = stored_slow_f32 + benna_slow_rate * (new_fast - stored_slow_f32);
3004 zeph_db::query(sql!(
3005 "UPDATE graph_edges SET confidence_fast = ?, confidence_slow = ? WHERE id = ?"
3006 ))
3007 .bind(f64::from(new_fast))
3008 .bind(f64::from(new_slow))
3009 .bind(head_id)
3010 .execute(&mut **tx)
3011 .await?;
3012 }
3013
3014 Ok(())
3015}
3016
3017async fn find_prior_active_head(
3019 tx: &mut Tx<'_>,
3020 src: i64,
3021 canon: &str,
3022 edge_type_str: &str,
3023) -> Result<Option<i64>, MemoryError> {
3024 Ok(zeph_db::query_scalar(sql!(
3025 "SELECT id FROM graph_edges
3026 WHERE source_entity_id = ?
3027 AND canonical_relation = ?
3028 AND edge_type = ?
3029 AND valid_to IS NULL
3030 AND expired_at IS NULL
3031 ORDER BY created_at DESC
3032 LIMIT 1"
3033 ))
3034 .bind(src)
3035 .bind(canon)
3036 .bind(edge_type_str)
3037 .fetch_optional(&mut **tx)
3038 .await?)
3039}
3040
3041async fn check_supersede_depth_in_tx(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
3045 let cap = i64::try_from(SUPERSEDE_DEPTH_CAP + 1).unwrap_or(i64::MAX);
3046 let depth: Option<i64> = sqlx::query_scalar(
3047 "WITH RECURSIVE chain(id, depth) AS (
3048 SELECT supersedes, 1 FROM graph_edges WHERE id = ? AND supersedes IS NOT NULL
3049 UNION ALL
3050 SELECT e.supersedes, c.depth + 1
3051 FROM graph_edges e JOIN chain c ON e.id = c.id
3052 WHERE e.supersedes IS NOT NULL AND c.depth < ?
3053 )
3054 SELECT MAX(depth) FROM chain",
3055 )
3056 .bind(head_id)
3057 .bind(cap)
3058 .fetch_optional(&mut **tx)
3059 .await?
3060 .flatten();
3061 let d = usize::try_from(depth.unwrap_or(0)).unwrap_or(usize::MAX);
3062 if d > SUPERSEDE_DEPTH_CAP {
3063 return Err(MemoryError::SupersedeDepthExceeded(head_id));
3064 }
3065 Ok(())
3066}
3067
3068#[allow(clippy::too_many_arguments)]
3070async fn insert_new_edge(
3071 tx: &mut Tx<'_>,
3072 src: i64,
3073 tgt: i64,
3074 relation: &str,
3075 canonical_relation: &str,
3076 fact: &str,
3077 confidence: f32,
3078 episode_raw: Option<i64>,
3079 edge_type_str: &str,
3080 supersedes_val: Option<i64>,
3081 turn_index: Option<i64>,
3082) -> Result<i64, MemoryError> {
3083 Ok(zeph_db::query_scalar(sql!(
3084 "INSERT INTO graph_edges
3085 (source_entity_id, target_entity_id, relation, canonical_relation, fact,
3086 confidence, confidence_fast, confidence_slow, episode_id, edge_type, supersedes, turn_index)
3087 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
3088 RETURNING id"
3089 ))
3090 .bind(src)
3091 .bind(tgt)
3092 .bind(relation)
3093 .bind(canonical_relation)
3094 .bind(fact)
3095 .bind(f64::from(confidence))
3096 .bind(f64::from(confidence))
3097 .bind(f64::from(confidence))
3098 .bind(episode_raw)
3099 .bind(edge_type_str)
3100 .bind(supersedes_val)
3101 .bind(turn_index)
3102 .fetch_one(&mut **tx)
3103 .await?)
3104}
3105
3106async fn expire_prior_head(tx: &mut Tx<'_>, head_id: i64) -> Result<(), MemoryError> {
3110 zeph_db::query(sql!(
3111 "UPDATE graph_edges
3112 SET valid_to = CURRENT_TIMESTAMP,
3113 expired_at = CURRENT_TIMESTAMP
3114 WHERE id = ?"
3115 ))
3116 .bind(head_id)
3117 .execute(&mut **tx)
3118 .await?;
3119 Ok(())
3120}
3121
3122async fn set_superseded_by(tx: &mut Tx<'_>, head_id: i64, new_id: i64) -> Result<(), MemoryError> {
3125 zeph_db::query(sql!(
3126 "UPDATE graph_edges SET superseded_by = ? WHERE id = ?"
3127 ))
3128 .bind(new_id)
3129 .bind(head_id)
3130 .execute(&mut **tx)
3131 .await?;
3132 Ok(())
3133}
3134
/// One row of the ranked entity search, as a positional tuple.
///
/// The slots follow the column order selected by `build_ranked_fts_sql` and are
/// destructured in the same order by `normalize_and_dedup`.
type EntityFtsRow = (
    // id
    i64,
    // name
    String,
    // canonical_name
    String,
    // entity_type, still in its textual form
    String,
    // summary
    Option<String>,
    // first_seen_at
    String,
    // last_seen_at
    String,
    // qdrant_point_id
    Option<String>,
    // fts_rank (raw, un-normalized score)
    f64,
);
3149
3150fn build_fts_query(query: &str) -> Option<String> {
3155 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
3156 let sanitized = sanitize_fts_query(query);
3157 if sanitized.is_empty() {
3158 return None;
3159 }
3160 let fts_query: String = sanitized
3161 .split_whitespace()
3162 .filter(|t| !FTS5_OPERATORS.contains(t))
3163 .map(|t| format!("{t}*"))
3164 .collect::<Vec<_>>()
3165 .join(" ");
3166 if fts_query.is_empty() {
3167 None
3168 } else {
3169 Some(fts_query)
3170 }
3171}
3172
3173fn build_ranked_fts_sql() -> String {
3178 format!(
3179 "SELECT * FROM ( \
3180 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
3181 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
3182 -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
3183 FROM graph_entities_fts fts \
3184 JOIN graph_entities e ON e.id = fts.rowid \
3185 WHERE graph_entities_fts MATCH ? \
3186 UNION ALL \
3187 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
3188 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
3189 0.5 AS fts_rank \
3190 FROM graph_entity_aliases a \
3191 JOIN graph_entities e ON e.id = a.entity_id \
3192 WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
3193 ) \
3194 ORDER BY fts_rank DESC \
3195 LIMIT ?",
3196 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
3197 )
3198}
3199
3200fn normalize_and_dedup(rows: Vec<EntityFtsRow>) -> Vec<(Entity, f32)> {
3204 let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
3206 let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
3207
3208 let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
3209 let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
3210 for (
3211 id,
3212 name,
3213 canonical_name,
3214 entity_type_str,
3215 summary,
3216 first_seen_at,
3217 last_seen_at,
3218 qdrant_point_id,
3219 raw_score,
3220 ) in rows
3221 {
3222 if !seen_ids.insert(id) {
3223 continue;
3224 }
3225 let entity_type = entity_type_str
3226 .parse()
3227 .unwrap_or(super::types::EntityType::Concept);
3228 let entity = Entity {
3229 id: EntityId(id),
3230 name,
3231 canonical_name,
3232 entity_type,
3233 summary,
3234 first_seen_at,
3235 last_seen_at,
3236 qdrant_point_id,
3237 };
3238 #[allow(clippy::cast_possible_truncation)]
3239 let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
3240 result.push((entity, normalized));
3241 }
3242 result
3243}
3244
3245#[cfg(test)]
3248mod tests;