1use std::collections::HashMap;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use futures::Stream;
9use zeph_db::fts::sanitize_fts_query;
10use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
11
12use crate::error::MemoryError;
13use crate::types::MessageId;
14
15use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
16
/// SQL-backed store for the knowledge graph: entities, aliases, edges,
/// communities, and key/value metadata, all accessed through one [`DbPool`].
pub struct GraphStore {
    /// Shared database connection pool used by every query in this store.
    pool: DbPool,
}
20
21impl GraphStore {
    /// Creates a store backed by the given connection pool.
    #[must_use]
    pub fn new(pool: DbPool) -> Self {
        Self { pool }
    }
26
    /// Returns a reference to the underlying database pool.
    #[must_use]
    pub fn pool(&self) -> &DbPool {
        &self.pool
    }
31
    /// Inserts an entity, or on a `(canonical_name, entity_type)` conflict
    /// refreshes its surface name, summary (only when a new one is supplied,
    /// via `COALESCE`), and `last_seen_at`. Returns the row id either way.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
69
    /// Looks up a single entity by exact canonical name and type.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE canonical_name = ? AND entity_type = ?"),
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        // entity_from_row is fallible, so flip Option<Result> into Result<Option>.
        row.map(entity_from_row).transpose()
    }
92
    /// Fetches a single entity by primary key, if it exists.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE id = ?"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
109
    /// Records the Qdrant vector-store point id associated with an entity.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
        ))
        .bind(point_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
129
    /// Fuzzy entity search combining an FTS prefix match on entity names with
    /// a case-insensitive `LIKE` substring match on aliases.
    ///
    /// The raw query is truncated to at most 512 bytes (on a char boundary),
    /// sanitized for FTS, stripped of FTS5 operator tokens, and each remaining
    /// token is turned into a prefix query (`token*`). Returns an empty vec if
    /// nothing queryable remains after sanitization.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if `limit`
    /// does not fit in `i64`.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // Bare operator tokens would change FTS5 query semantics; drop them.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION (not UNION ALL) deduplicates entities matched via both paths.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards so the user text is matched literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
211
    /// Runs a passive WAL checkpoint (SQLite builds only). Passive mode does
    /// not block concurrent readers or writers.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    #[cfg(feature = "sqlite")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        // PRAGMA is SQLite-specific, so this bypasses the sql! dialect macro.
        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
228
    /// No-op on Postgres builds: WAL management is handled by the server.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
238
    /// Streams every entity ordered by id, converting rows lazily so large
    /// graphs need not be buffered in memory.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        // and_then: entity_from_row is itself fallible.
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
251
    /// Adds an alias for an entity; silently ignores duplicates via the
    /// dialect-specific INSERT-IGNORE / ON CONFLICT DO NOTHING forms.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        // One dialect constant is the statement prefix, the other the suffix;
        // exactly one is expected to be non-empty per backend.
        let insert_alias_sql = format!(
            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        zeph_db::query(&insert_alias_sql)
            .bind(entity_id)
            .bind(alias_name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
272
    /// Resolves an entity through its alias table with a case-insensitive
    /// exact match, constrained to the given entity type. When several
    /// entities share the alias, the lowest id wins (deterministic pick).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
304
    /// Lists all aliases recorded for an entity, in insertion (id) order.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?
             ORDER BY id ASC"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
325
    /// Collects every entity into a `Vec`; thin eager wrapper over
    /// [`Self::all_entities_stream`].
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
        use futures::TryStreamExt as _;
        self.all_entities_stream().try_collect().await
    }
335
    /// Returns the total number of entities in the graph.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
347
    /// Inserts a semantic edge; convenience wrapper over
    /// [`Self::insert_edge_typed`] with [`EdgeType::Semantic`].
    ///
    /// # Errors
    /// Same failure modes as [`Self::insert_edge_typed`].
    pub async fn insert_edge(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
    ) -> Result<i64, MemoryError> {
        self.insert_edge_typed(
            source_entity_id,
            target_entity_id,
            relation,
            fact,
            confidence,
            episode_id,
            EdgeType::Semantic,
        )
        .await
    }
385
    /// Inserts an edge, deduplicating against any live edge with the same
    /// (source, target, relation, edge_type). On a duplicate, the existing
    /// edge's confidence is raised to the max of old and new and its id is
    /// returned; otherwise a fresh row is inserted. Both paths run inside a
    /// transaction so the check-then-write is not interleaved.
    ///
    /// Confidence is clamped to `[0.0, 1.0]` before use.
    ///
    /// # Errors
    /// Returns [`MemoryError::InvalidInput`] for self-loops (source == target);
    /// otherwise propagates database failures.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Only live edges (valid_to IS NULL) participate in deduplication.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
               AND target_entity_id = ?
               AND relation = ?
               AND edge_type = ?
               AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Keep the strongest evidence seen so far; never lower confidence.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
465
    /// Marks an edge as no longer valid by stamping both `valid_to` and
    /// `expired_at` with the current time (soft delete; row is retained).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
             WHERE id = ?"
        ))
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
481
    /// Invalidates `old_edge_id` (like [`Self::invalidate_edge`]) and records
    /// `new_edge_id` in `superseded_by`, preserving the replacement lineage.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn invalidate_edge_with_supersession(
        &self,
        old_edge_id: i64,
        new_edge_id: i64,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges
             SET valid_to = CURRENT_TIMESTAMP,
                 expired_at = CURRENT_TIMESTAMP,
                 superseded_by = ?
             WHERE id = ?"
        ))
        .bind(new_edge_id)
        .bind(old_edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
507
    /// Fetches all active edges touching any of `entity_ids`, optionally
    /// filtered by edge type. Ids are processed in chunks to keep bind
    /// parameter counts under backend limits (each chunk binds 2×ids + types).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_for_entities(
        &self,
        entity_ids: &[i64],
        edge_types: &[super::types::EdgeType],
    ) -> Result<Vec<Edge>, MemoryError> {
        // 490 ids → at most 980 id binds per query, safely under SQLite's 999.
        const MAX_BATCH_ENTITIES: usize = 490;

        let mut all_edges: Vec<Edge> = Vec::new();

        for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
            let edges = self.query_batch_edges(chunk, edge_types).await?;
            all_edges.extend(edges);
        }

        Ok(all_edges)
    }
543
544 async fn query_batch_edges(
548 &self,
549 entity_ids: &[i64],
550 edge_types: &[super::types::EdgeType],
551 ) -> Result<Vec<Edge>, MemoryError> {
552 if entity_ids.is_empty() {
553 return Ok(Vec::new());
554 }
555
556 let n_ids = entity_ids.len();
559 let n_types = edge_types.len();
560
561 let sql = if n_types == 0 {
562 let placeholders = placeholder_list(1, n_ids);
564 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
565 format!(
566 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
567 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
568 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
569 FROM graph_edges
570 WHERE valid_to IS NULL
571 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
572 )
573 } else {
574 let placeholders = placeholder_list(1, n_ids);
575 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
576 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
577 format!(
578 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
579 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
580 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
581 FROM graph_edges
582 WHERE valid_to IS NULL
583 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
584 AND edge_type IN ({type_placeholders})"
585 )
586 };
587
588 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
590 for id in entity_ids {
591 query = query.bind(*id);
592 }
593 for id in entity_ids {
594 query = query.bind(*id);
595 }
596 for et in edge_types {
597 query = query.bind(et.as_str());
598 }
599
600 let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
601 Ok(rows.into_iter().map(edge_from_row).collect())
602 }
603
    /// Fetches all active edges where the entity is either endpoint.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
624
    /// Fetches up to `limit` edges touching the entity, including invalidated
    /// ones, newest `valid_from` first — the entity's temporal edge history.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if `limit`
    /// does not fit in `i64`.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE source_entity_id = ? OR target_entity_id = ?
             ORDER BY valid_from DESC
             LIMIT ?"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
653
    /// Fetches active edges connecting two entities in either direction.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
               AND ((source_entity_id = ? AND target_entity_id = ?)
                 OR (source_entity_id = ? AND target_entity_id = ?))"
        ))
        .bind(entity_a)
        .bind(entity_b)
        .bind(entity_b)
        .bind(entity_a)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
681
    /// Fetches active edges with exactly this source→target direction
    /// (contrast [`Self::edges_between`], which is direction-agnostic).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
               AND source_entity_id = ?
               AND target_entity_id = ?"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
707
    /// Returns the number of edges that are currently valid (not invalidated).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }
721
    /// Returns `(edge_type, count)` pairs over active edges, ordered by type.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(
            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
735
    /// Inserts a community or, on a name conflict, refreshes its summary,
    /// member list, fingerprint (only when a new one is supplied), and
    /// `updated_at`. Member ids are stored as a JSON array string.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// member list cannot be serialized to JSON.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
773
    /// Returns a fingerprint → community-id map for all communities that have
    /// a fingerprint recorded.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
788
    /// Deletes one community row by primary key (hard delete).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
801
    /// Clears a community's fingerprint, forcing it to be treated as changed
    /// by fingerprint-based comparisons.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
        ))
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
819
    /// Finds the first community whose JSON member list contains `entity_id`,
    /// expanding `entity_ids` with `json_each` and comparing numerically.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// stored `entity_ids` JSON cannot be parsed.
    pub async fn community_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = zeph_db::query_as(
            sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
                  FROM graph_communities c, json_each(c.entity_ids) j
                  WHERE CAST(j.value AS INTEGER) = ?
                  LIMIT 1"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                // Member list is persisted as a JSON array string.
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
857
    /// Loads every community ordered by id, decoding each stored JSON member
    /// list into `Vec<i64>`.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if any
    /// stored `entity_ids` JSON cannot be parsed.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             ORDER BY id ASC"
        ))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
887
    /// Returns the total number of communities.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn community_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
899
    /// Reads one value from the graph key/value metadata table.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
915
    /// Writes (insert-or-replace) one key/value pair in the metadata table.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
        ))
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
932
933 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
941 let val = self.get_metadata("extraction_count").await?;
942 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
943 }
944
    /// Streams every active edge ordered by id, converting rows lazily so the
    /// full edge set never has to be buffered.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        // map (not and_then): edge_from_row is infallible.
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
959
    /// Keyset-pagination fetch: up to `limit` active edges with id strictly
    /// greater than `after_id`, in ascending id order. Pass the last returned
    /// id as the next `after_id` to walk the full set.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?
             ORDER BY id ASC
             LIMIT ?"
        ))
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
996
    /// Fetches one community by primary key, decoding its JSON member list.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// stored `entity_ids` JSON cannot be parsed.
    pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
        let row: Option<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             WHERE id = ?"
        ))
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        match row {
            Some(row) => {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Some(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                }))
            }
            None => Ok(None),
        }
    }
1027
    /// Removes every community row (hard delete); entities and edges are
    /// untouched.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities"))
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1039
    /// Like [`Self::find_entities_fuzzy`], but returns each entity with a
    /// relevance score normalized to `[0, 1]`.
    ///
    /// FTS hits are ranked by negated bm25 (name column weighted 10x over the
    /// second indexed column); alias LIKE hits get a fixed rank of `0.5`.
    /// Duplicates from the two arms are removed keeping the first (highest
    /// ranked) occurrence, then scores are divided by the maximum raw score.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if `limit`
    /// does not fit in `i64`.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Tuple row: entity columns plus the computed fts_rank (index 8).
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        // Same FTS-query construction as find_entities_fuzzy.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        // bm25 returns lower-is-better; negate so higher fts_rank = better.
        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION ALL \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             0.5 AS fts_rank \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards so the user text matches literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Guard against division by zero / negative maxima when normalizing.
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Rows are sorted by rank, so the first occurrence of each id wins.
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings degrade to Concept rather than erroring.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1173
1174 pub async fn entity_structural_scores(
1184 &self,
1185 entity_ids: &[i64],
1186 ) -> Result<HashMap<i64, f32>, MemoryError> {
1187 const MAX_BATCH: usize = 163;
1190
1191 if entity_ids.is_empty() {
1192 return Ok(HashMap::new());
1193 }
1194
1195 let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
1196 for chunk in entity_ids.chunks(MAX_BATCH) {
1197 let n = chunk.len();
1198 let ph1 = placeholder_list(1, n);
1200 let ph2 = placeholder_list(n + 1, n);
1201 let ph3 = placeholder_list(n * 2 + 1, n);
1202
1203 let sql = format!(
1205 "SELECT entity_id,
1206 COUNT(*) AS degree,
1207 COUNT(DISTINCT edge_type) AS type_diversity
1208 FROM (
1209 SELECT source_entity_id AS entity_id, edge_type
1210 FROM graph_edges
1211 WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
1212 UNION ALL
1213 SELECT target_entity_id AS entity_id, edge_type
1214 FROM graph_edges
1215 WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
1216 )
1217 WHERE entity_id IN ({ph3})
1218 GROUP BY entity_id"
1219 );
1220
1221 let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
1222 for id in chunk {
1224 query = query.bind(*id);
1225 }
1226 for id in chunk {
1227 query = query.bind(*id);
1228 }
1229 for id in chunk {
1230 query = query.bind(*id);
1231 }
1232
1233 let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
1234 all_rows.extend(chunk_rows);
1235 }
1236
1237 if all_rows.is_empty() {
1238 return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
1239 }
1240
1241 let max_degree = all_rows
1242 .iter()
1243 .map(|(_, d, _)| *d)
1244 .max()
1245 .unwrap_or(1)
1246 .max(1);
1247
1248 let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
1249 for (entity_id, degree, type_diversity) in all_rows {
1250 #[allow(clippy::cast_precision_loss)]
1251 let norm_degree = degree as f32 / max_degree as f32;
1252 #[allow(clippy::cast_precision_loss)]
1253 let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
1254 let score = 0.6 * norm_degree + 0.4 * norm_diversity;
1255 scores.insert(entity_id, score);
1256 }
1257
1258 Ok(scores)
1259 }
1260
    /// Maps each entity id to the id of a community containing it, batching
    /// the lookup to respect bind-parameter limits. Entities in no community
    /// are simply absent from the returned map.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn entity_community_ids(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, i64>, MemoryError> {
        const MAX_BATCH: usize = 490;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut result: HashMap<i64, i64> = HashMap::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = placeholder_list(1, chunk.len());

            // SQL text built by the module-level community_ids_sql helper.
            let community_sql = community_ids_sql(&placeholders);
            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
            for id in chunk {
                query = query.bind(*id);
            }

            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
            result.extend(rows);
        }

        Ok(result)
    }
1295
    /// Increments `retrieval_count` and stamps `last_retrieved_at` (dialect
    /// epoch-seconds expression) for the given edges, batched under the
    /// backend's bind-parameter limit.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
        const MAX_BATCH: usize = 490;
        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        for chunk in edge_ids.chunks(MAX_BATCH) {
            let edge_placeholders = placeholder_list(1, chunk.len());
            let retrieval_sql = format!(
                "UPDATE graph_edges \
                 SET retrieval_count = retrieval_count + 1, \
                 last_retrieved_at = {epoch_now} \
                 WHERE id IN ({edge_placeholders})"
            );
            let mut q = zeph_db::query(&retrieval_sql);
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1323
    /// Multiplicatively decays `retrieval_count` (by `decay_lambda`, floored
    /// at 0) on active edges not retrieved within the last `interval_secs`
    /// seconds. Returns how many rows were decayed.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// affected-row count does not fit in `usize`.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
             AND retrieval_count > 0 \
             AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        // This statement bypasses sql!, so placeholders must be rewritten for
        // the active dialect explicitly.
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate oversized intervals instead of failing the conversion.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1353
    /// Hard-deletes edges whose `expired_at` is older than `retention_days`
    /// days. Returns the number of rows removed.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// affected-row count does not fit in `usize`.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
               AND expired_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1371
    /// Hard-deletes entities that have no active edge on either side and were
    /// last seen more than `retention_days` days ago. Returns rows removed.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if the
    /// affected-row count does not fit in `usize`.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id NOT IN (
                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                 UNION
                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
             )
             AND last_seen_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1393
    /// Enforces a hard cap on entity count by deleting the excess, choosing
    /// victims with the fewest active edges and then the oldest `last_seen_at`.
    /// Returns the number of rows removed (0 when already under the cap).
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`]; also fails if
    /// `max_entities` or the affected-row count does not fit the target type.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id IN (
                 SELECT e.id
                 FROM graph_entities e
                 LEFT JOIN (
                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
                     UNION ALL
                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
                 ) edge_counts ON e.id = edge_counts.eid
                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
                 LIMIT ?
             )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1430
    /// Point-in-time view: edges touching `entity_id` that were valid at
    /// `timestamp` — i.e. still-open edges with `valid_from <= ts`, plus
    /// closed edges whose `[valid_from, valid_to)` window covers `ts`.
    /// `timestamp` is compared as a string; it must use the same format as
    /// the stored `valid_from`/`valid_to` columns.
    ///
    /// # Errors
    /// Propagates database failures as [`MemoryError`].
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?
               AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?
               AND valid_to > ?
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1481
1482 pub async fn edge_history(
1491 &self,
1492 source_entity_id: i64,
1493 predicate: &str,
1494 relation: Option<&str>,
1495 limit: usize,
1496 ) -> Result<Vec<Edge>, MemoryError> {
1497 let escaped = predicate
1499 .replace('\\', "\\\\")
1500 .replace('%', "\\%")
1501 .replace('_', "\\_");
1502 let like_pattern = format!("%{escaped}%");
1503 let limit = i64::try_from(limit)?;
1504 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1505 zeph_db::query_as(sql!(
1506 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1507 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1508 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1509 FROM graph_edges
1510 WHERE source_entity_id = ?
1511 AND fact LIKE ? ESCAPE '\\'
1512 AND relation = ?
1513 ORDER BY valid_from DESC
1514 LIMIT ?"
1515 ))
1516 .bind(source_entity_id)
1517 .bind(&like_pattern)
1518 .bind(rel)
1519 .bind(limit)
1520 .fetch_all(&self.pool)
1521 .await?
1522 } else {
1523 zeph_db::query_as(sql!(
1524 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1525 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1526 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1527 FROM graph_edges
1528 WHERE source_entity_id = ?
1529 AND fact LIKE ? ESCAPE '\\'
1530 ORDER BY valid_from DESC
1531 LIMIT ?"
1532 ))
1533 .bind(source_entity_id)
1534 .bind(&like_pattern)
1535 .bind(limit)
1536 .fetch_all(&self.pool)
1537 .await?
1538 };
1539 Ok(rows.into_iter().map(edge_from_row).collect())
1540 }
1541
1542 pub async fn bfs(
1559 &self,
1560 start_entity_id: i64,
1561 max_hops: u32,
1562 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1563 self.bfs_with_depth(start_entity_id, max_hops)
1564 .await
1565 .map(|(e, ed, _)| (e, ed))
1566 }
1567
1568 pub async fn bfs_with_depth(
1579 &self,
1580 start_entity_id: i64,
1581 max_hops: u32,
1582 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1583 self.bfs_core(start_entity_id, max_hops, None).await
1584 }
1585
1586 pub async fn bfs_at_timestamp(
1597 &self,
1598 start_entity_id: i64,
1599 max_hops: u32,
1600 timestamp: &str,
1601 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1602 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1603 .await
1604 }
1605
1606 pub async fn bfs_typed(
1622 &self,
1623 start_entity_id: i64,
1624 max_hops: u32,
1625 edge_types: &[EdgeType],
1626 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1627 if edge_types.is_empty() {
1628 return self.bfs_with_depth(start_entity_id, max_hops).await;
1629 }
1630 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1631 .await
1632 }
1633
    /// Shared BFS driver behind `bfs`, `bfs_with_depth`, and
    /// `bfs_at_timestamp`.
    ///
    /// Expands a frontier hop by hop, recording in `depth_map` the first hop
    /// at which each entity id is reached. When `at_timestamp` is `Some`,
    /// only edges valid at that instant are traversed; otherwise only
    /// currently-open edges (`valid_to IS NULL`). Finishes by loading the
    /// visited entities and their interconnecting edges.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Cap on ids expanded per hop, bounding the number of SQL bind
        // parameters per query (3 * MAX_FRONTIER + optional timestamp).
        const MAX_FRONTIER: usize = 300;

        // entity id -> hop count at first discovery; doubles as visited set.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // Three numbered-placeholder runs for the SAME frontier ids:
            // 1..=n (CASE), n+1..=2n (source filter), 2n+1..=3n (target
            // filter). The three bind loops below must mirror this layout.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                // Timestamp placeholder comes after all frontier ids; the
                // same numbered placeholder is referenced twice in the
                // filter but bound only once.
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // For each qualifying edge touching the frontier, emit the
            // endpoint on the far side.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order must match placeholder positions: frontier three
            // times, then the optional timestamp.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only never-seen ids join the next frontier; revisits keep
                // their original (smaller) depth.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1713
    /// Type-filtered variant of `bfs_core`: identical hop expansion, but every
    /// per-hop query additionally constrains `edge_type` to `edge_types`.
    ///
    /// Placeholder layout per query: type strings occupy positions
    /// 1..=n_types, so all frontier-id runs start at `id_start`; the optional
    /// timestamp placeholder comes last. The bind sequence below must mirror
    /// this layout exactly.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Cap on ids expanded per hop, bounding bind-parameter counts.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        // entity id -> hop count at first discovery; doubles as visited set.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            // Three runs of frontier-id placeholders after the type list:
            // CASE arm, source filter, target filter.
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                // Timestamp placeholder sits after types and all frontier ids;
                // referenced twice in the filter, bound once.
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // For each qualifying edge touching the frontier, emit the far
            // endpoint.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order: edge types, then the frontier three times, then the
            // optional timestamp — matching the placeholder positions above.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only never-seen ids join the next frontier.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1810
    /// Loads the `Entity` and `Edge` rows for a completed type-filtered BFS.
    ///
    /// Only edges whose type is in `type_strs` AND whose *both* endpoints were
    /// visited are returned. The visited set is truncated to 499 ids to stay
    /// within the SQLite bind limit, so very large traversals may drop
    /// reachable entities from the result.
    /// NOTE(review): the 499 cap does not account for the extra `type_strs`
    /// and timestamp binds — presumably fine for small type lists; confirm
    /// against the actual bind limit.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Placeholder layout for the edge query: types first, then the
        // visited ids twice (source filter, target filter), then the optional
        // timestamp. The bind sequence below must mirror this order.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;
        let ph_ids1 = placeholder_list(id_start, n_visited);
        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);

        let edge_filter = if at_timestamp.is_some() {
            // Same numbered timestamp placeholder used twice, bound once.
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Fresh query, so entity-id placeholders restart at position 1.
        let entity_sql2 = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, visited_ids.len()),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // Entity conversion can fail on an unknown entity_type string; edge
        // conversion is infallible (falls back to Semantic).
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1900
    /// Loads the `Entity` and `Edge` rows for a completed untyped BFS.
    ///
    /// Only edges whose *both* endpoints are in the visited set are returned.
    /// The visited set is truncated to 499 ids to stay within the SQLite bind
    /// limit, so very large traversals may drop reachable entities.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Placeholder layout: visited ids twice (source filter at 1..=n,
        // target filter at n+1..=2n), then the optional timestamp. Binds
        // below must mirror this order.
        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            // Same numbered timestamp placeholder used twice, bound once.
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
             AND source_entity_id IN ({ph_ids1})
             AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Fresh query, so entity-id placeholders restart at position 1.
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        // Entity conversion can fail on an unknown entity_type string; edge
        // conversion is infallible (falls back to Semantic).
        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1974
1975 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1991 let find_by_name_sql = format!(
1992 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
1993 FROM graph_entities \
1994 WHERE name = ? {cn} OR canonical_name = ? {cn} \
1995 LIMIT 5",
1996 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1997 );
1998 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
1999 .bind(name)
2000 .bind(name)
2001 .fetch_all(&self.pool)
2002 .await?;
2003
2004 if !rows.is_empty() {
2005 return rows.into_iter().map(entity_from_row).collect();
2006 }
2007
2008 self.find_entities_fuzzy(name, 5).await
2009 }
2010
2011 pub async fn unprocessed_messages_for_backfill(
2019 &self,
2020 limit: usize,
2021 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2022 let limit = i64::try_from(limit)?;
2023 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2024 "SELECT id, content FROM messages
2025 WHERE graph_processed = 0
2026 ORDER BY id ASC
2027 LIMIT ?"
2028 ))
2029 .bind(limit)
2030 .fetch_all(&self.pool)
2031 .await?;
2032 Ok(rows
2033 .into_iter()
2034 .map(|(id, content)| (crate::types::MessageId(id), content))
2035 .collect())
2036 }
2037
2038 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2044 let count: i64 = zeph_db::query_scalar(sql!(
2045 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2046 ))
2047 .fetch_one(&self.pool)
2048 .await?;
2049 Ok(count)
2050 }
2051
2052 pub async fn mark_messages_graph_processed(
2058 &self,
2059 ids: &[crate::types::MessageId],
2060 ) -> Result<(), MemoryError> {
2061 const MAX_BATCH: usize = 490;
2062 if ids.is_empty() {
2063 return Ok(());
2064 }
2065 for chunk in ids.chunks(MAX_BATCH) {
2066 let placeholders = placeholder_list(1, chunk.len());
2067 let sql =
2068 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2069 let mut query = zeph_db::query(&sql);
2070 for id in chunk {
2071 query = query.bind(id.0);
2072 }
2073 query.execute(&self.pool).await?;
2074 }
2075 Ok(())
2076 }
2077}
2078
/// Builds the SQLite flavour of the entity-id → community-id lookup: expands
/// each community's JSON `entity_ids` array via `json_each` and keeps rows
/// whose entity id is in the caller-supplied placeholder list.
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({})",
        placeholders
    )
}
2089
/// Builds the Postgres flavour of the entity-id → community-id lookup:
/// expands each community's JSON `entity_ids` array via
/// `jsonb_array_elements_text` and keeps rows whose entity id is in the
/// caller-supplied placeholder list.
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({})",
        placeholders
    )
}
2099
/// Raw projection of a `graph_entities` row; converted into the public
/// [`Entity`] type by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Surface name as observed in text.
    name: String,
    // Normalised name; part of the upsert conflict target together with
    // entity_type.
    canonical_name: String,
    // Stored as a string; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Present when a vector-store point id has been recorded for the entity.
    qdrant_point_id: Option<String>,
}
2113
2114fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2115 let entity_type = row
2116 .entity_type
2117 .parse::<EntityType>()
2118 .map_err(MemoryError::GraphStore)?;
2119 Ok(Entity {
2120 id: row.id,
2121 name: row.name,
2122 canonical_name: row.canonical_name,
2123 entity_type,
2124 summary: row.summary,
2125 first_seen_at: row.first_seen_at,
2126 last_seen_at: row.last_seen_at,
2127 qdrant_point_id: row.qdrant_point_id,
2128 })
2129}
2130
/// Raw projection of an entity-alias row; converted into the public
/// [`EntityAlias`] type by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias points at.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2138
2139fn alias_from_row(row: AliasRow) -> EntityAlias {
2140 EntityAlias {
2141 id: row.id,
2142 entity_id: row.entity_id,
2143 alias_name: row.alias_name,
2144 created_at: row.created_at,
2145 }
2146}
2147
/// Raw projection of a `graph_edges` row; converted into the public [`Edge`]
/// type by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    // Temporal validity window; `valid_to` is NULL while the edge is open.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Wrapped in `MessageId` during conversion.
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as a string; parsed into `EdgeType` (defaulting to Semantic)
    // during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Id of the edge that replaced this one, if any.
    superseded_by: Option<i64>,
}
2167
2168fn edge_from_row(row: EdgeRow) -> Edge {
2169 let edge_type = row
2170 .edge_type
2171 .parse::<EdgeType>()
2172 .unwrap_or(EdgeType::Semantic);
2173 Edge {
2174 id: row.id,
2175 source_entity_id: row.source_entity_id,
2176 target_entity_id: row.target_entity_id,
2177 relation: row.relation,
2178 fact: row.fact,
2179 #[allow(clippy::cast_possible_truncation)]
2180 confidence: row.confidence as f32,
2181 valid_from: row.valid_from,
2182 valid_to: row.valid_to,
2183 created_at: row.created_at,
2184 expired_at: row.expired_at,
2185 episode_id: row.episode_id.map(MessageId),
2186 qdrant_point_id: row.qdrant_point_id,
2187 edge_type,
2188 retrieval_count: row.retrieval_count,
2189 last_retrieved_at: row.last_retrieved_at,
2190 superseded_by: row.superseded_by,
2191 }
2192}
2193
/// Raw projection of a `graph_communities` row.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON array of member entity ids, stored as text (expanded via
    // json_each / jsonb_array_elements_text in `community_ids_sql`).
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2204
// Test-only submodule; its contents live in the sibling `tests` module file.
#[cfg(test)]
mod tests;