1use std::collections::HashMap;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use futures::Stream;
9use zeph_db::fts::sanitize_fts_query;
10use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
11
12use crate::error::MemoryError;
13use crate::types::MessageId;
14
15use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
16
/// Persistent store for the knowledge graph: entities, aliases, edges,
/// communities, and key/value metadata, backed by a `zeph_db` pool.
pub struct GraphStore {
    // Shared connection pool used by every query in this store.
    pool: DbPool,
}
20
21impl GraphStore {
22 #[must_use]
23 pub fn new(pool: DbPool) -> Self {
24 Self { pool }
25 }
26
27 #[must_use]
28 pub fn pool(&self) -> &DbPool {
29 &self.pool
30 }
31
32 pub async fn upsert_entity(
45 &self,
46 surface_name: &str,
47 canonical_name: &str,
48 entity_type: EntityType,
49 summary: Option<&str>,
50 ) -> Result<i64, MemoryError> {
51 let type_str = entity_type.as_str();
52 let id: i64 = zeph_db::query_scalar(sql!(
53 "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
54 VALUES (?, ?, ?, ?)
55 ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
56 name = excluded.name,
57 summary = COALESCE(excluded.summary, summary),
58 last_seen_at = CURRENT_TIMESTAMP
59 RETURNING id"
60 ))
61 .bind(surface_name)
62 .bind(canonical_name)
63 .bind(type_str)
64 .bind(summary)
65 .fetch_one(&self.pool)
66 .await?;
67 Ok(id)
68 }
69
70 pub async fn find_entity(
76 &self,
77 canonical_name: &str,
78 entity_type: EntityType,
79 ) -> Result<Option<Entity>, MemoryError> {
80 let type_str = entity_type.as_str();
81 let row: Option<EntityRow> = zeph_db::query_as(
82 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
83 FROM graph_entities
84 WHERE canonical_name = ? AND entity_type = ?"),
85 )
86 .bind(canonical_name)
87 .bind(type_str)
88 .fetch_optional(&self.pool)
89 .await?;
90 row.map(entity_from_row).transpose()
91 }
92
93 pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
99 let row: Option<EntityRow> = zeph_db::query_as(
100 sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
101 FROM graph_entities
102 WHERE id = ?"),
103 )
104 .bind(entity_id)
105 .fetch_optional(&self.pool)
106 .await?;
107 row.map(entity_from_row).transpose()
108 }
109
110 pub async fn set_entity_qdrant_point_id(
116 &self,
117 entity_id: i64,
118 point_id: &str,
119 ) -> Result<(), MemoryError> {
120 zeph_db::query(sql!(
121 "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
122 ))
123 .bind(point_id)
124 .bind(entity_id)
125 .execute(&self.pool)
126 .await?;
127 Ok(())
128 }
129
    /// Fuzzy entity search: FTS prefix match on the entity index, unioned with
    /// a case-insensitive substring match over aliases.
    ///
    /// The query is truncated to at most 512 bytes (on a char boundary),
    /// sanitized for FTS, stripped of FTS5 operator words, and each remaining
    /// token gets a `*` suffix for prefix matching. An empty query after
    /// sanitization short-circuits to an empty result.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure, row conversion failure, or if
    /// `limit` does not fit in `i64`.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // FTS5 treats these bare words as operators; drop them so user input
        // cannot change the query structure.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        // Cap input length without splitting a multi-byte character.
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        // Turn each token into a prefix query ("foo" -> "foo*").
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION deduplicates rows found via both the FTS index and an alias.
        // COLLATE_NOCASE is dialect-specific (empty on backends where LIKE is
        // already case-insensitive — TODO confirm against zeph_db::dialect).
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters so the raw query is matched literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
211
212 #[cfg(feature = "sqlite")]
222 pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
223 zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
224 .execute(&self.pool)
225 .await?;
226 Ok(())
227 }
228
    /// No-op on Postgres: WAL checkpointing is handled by the server itself,
    /// so there is nothing for the client to trigger here.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
238
    /// Streams every entity ordered by id, converting rows lazily so the whole
    /// table is never materialized in memory at once.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        // Each item is fallible twice: the fetch itself, then row conversion.
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
251
252 pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
260 let insert_alias_sql = format!(
261 "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
262 <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
263 <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
264 );
265 zeph_db::query(&insert_alias_sql)
266 .bind(entity_id)
267 .bind(alias_name)
268 .execute(&self.pool)
269 .await?;
270 Ok(())
271 }
272
    /// Resolves an entity through one of its aliases (case-insensitive),
    /// restricted to a specific entity type. The lowest entity id wins when
    /// several entities share the alias.
    ///
    /// # Errors
    /// Returns [`MemoryError`] when the query or row conversion fails.
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        // COLLATE_NOCASE is injected per dialect; ORDER BY e.id makes the
        // "first match" deterministic.
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
304
305 pub async fn aliases_for_entity(
311 &self,
312 entity_id: i64,
313 ) -> Result<Vec<EntityAlias>, MemoryError> {
314 let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
315 "SELECT id, entity_id, alias_name, created_at
316 FROM graph_entity_aliases
317 WHERE entity_id = ?
318 ORDER BY id ASC"
319 ))
320 .bind(entity_id)
321 .fetch_all(&self.pool)
322 .await?;
323 Ok(rows.into_iter().map(alias_from_row).collect())
324 }
325
326 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
332 use futures::TryStreamExt as _;
333 self.all_entities_stream().try_collect().await
334 }
335
336 pub async fn entity_count(&self) -> Result<i64, MemoryError> {
342 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
343 .fetch_one(&self.pool)
344 .await?;
345 Ok(count)
346 }
347
348 pub async fn insert_edge(
366 &self,
367 source_entity_id: i64,
368 target_entity_id: i64,
369 relation: &str,
370 fact: &str,
371 confidence: f32,
372 episode_id: Option<MessageId>,
373 ) -> Result<i64, MemoryError> {
374 self.insert_edge_typed(
375 source_entity_id,
376 target_entity_id,
377 relation,
378 fact,
379 confidence,
380 episode_id,
381 EdgeType::Semantic,
382 )
383 .await
384 }
385
    /// Inserts an edge of the given type, deduplicating against any active
    /// edge with the same `(source, target, relation, edge_type)`.
    ///
    /// If such an edge already exists its confidence is raised to the maximum
    /// of the stored and incoming values and its id is returned; otherwise a
    /// new row is inserted. Both paths run inside a single transaction so the
    /// lookup and the write are atomic.
    ///
    /// # Errors
    /// Returns [`MemoryError::InvalidInput`] for self-loops (source == target)
    /// and [`MemoryError`] for database failures.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        // Self-loops carry no relational information; reject them up front.
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        // Keep confidence in [0, 1] regardless of what the caller computed.
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Only active edges (valid_to IS NULL) count as duplicates.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
             AND target_entity_id = ?
             AND relation = ?
             AND edge_type = ?
             AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Re-observation never lowers confidence; keep the higher value.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
465
466 pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
472 zeph_db::query(sql!(
473 "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
474 WHERE id = ?"
475 ))
476 .bind(edge_id)
477 .execute(&self.pool)
478 .await?;
479 Ok(())
480 }
481
482 pub async fn invalidate_edge_with_supersession(
490 &self,
491 old_edge_id: i64,
492 new_edge_id: i64,
493 ) -> Result<(), MemoryError> {
494 zeph_db::query(sql!(
495 "UPDATE graph_edges
496 SET valid_to = CURRENT_TIMESTAMP,
497 expired_at = CURRENT_TIMESTAMP,
498 superseded_by = ?
499 WHERE id = ?"
500 ))
501 .bind(new_edge_id)
502 .bind(old_edge_id)
503 .execute(&self.pool)
504 .await?;
505 Ok(())
506 }
507
508 pub async fn edges_for_entities(
525 &self,
526 entity_ids: &[i64],
527 edge_types: &[super::types::EdgeType],
528 ) -> Result<Vec<Edge>, MemoryError> {
529 const MAX_BATCH_ENTITIES: usize = 490;
533
534 let mut all_edges: Vec<Edge> = Vec::new();
535
536 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
537 let edges = self.query_batch_edges(chunk, edge_types).await?;
538 all_edges.extend(edges);
539 }
540
541 Ok(all_edges)
542 }
543
544 async fn query_batch_edges(
548 &self,
549 entity_ids: &[i64],
550 edge_types: &[super::types::EdgeType],
551 ) -> Result<Vec<Edge>, MemoryError> {
552 if entity_ids.is_empty() {
553 return Ok(Vec::new());
554 }
555
556 let n_ids = entity_ids.len();
559 let n_types = edge_types.len();
560
561 let sql = if n_types == 0 {
562 let placeholders = placeholder_list(1, n_ids);
564 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
565 format!(
566 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
567 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
568 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
569 FROM graph_edges
570 WHERE valid_to IS NULL
571 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
572 )
573 } else {
574 let placeholders = placeholder_list(1, n_ids);
575 let placeholders2 = placeholder_list(n_ids + 1, n_ids);
576 let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
577 format!(
578 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
579 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
580 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
581 FROM graph_edges
582 WHERE valid_to IS NULL
583 AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
584 AND edge_type IN ({type_placeholders})"
585 )
586 };
587
588 let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
590 for id in entity_ids {
591 query = query.bind(*id);
592 }
593 for id in entity_ids {
594 query = query.bind(*id);
595 }
596 for et in edge_types {
597 query = query.bind(et.as_str());
598 }
599
600 let rows: Vec<EdgeRow> = tokio::time::timeout(
604 std::time::Duration::from_millis(500),
605 query.fetch_all(&self.pool),
606 )
607 .await
608 .map_err(|_| MemoryError::Timeout("graph pool acquire timed out after 500ms".into()))??;
609 Ok(rows.into_iter().map(edge_from_row).collect())
610 }
611
612 pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
618 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
619 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
620 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
621 edge_type, retrieval_count, last_retrieved_at, superseded_by
622 FROM graph_edges
623 WHERE valid_to IS NULL
624 AND (source_entity_id = ? OR target_entity_id = ?)"
625 ))
626 .bind(entity_id)
627 .bind(entity_id)
628 .fetch_all(&self.pool)
629 .await?;
630 Ok(rows.into_iter().map(edge_from_row).collect())
631 }
632
633 pub async fn edge_history_for_entity(
640 &self,
641 entity_id: i64,
642 limit: usize,
643 ) -> Result<Vec<Edge>, MemoryError> {
644 let limit = i64::try_from(limit)?;
645 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
646 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
647 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
648 edge_type, retrieval_count, last_retrieved_at, superseded_by
649 FROM graph_edges
650 WHERE source_entity_id = ? OR target_entity_id = ?
651 ORDER BY valid_from DESC
652 LIMIT ?"
653 ))
654 .bind(entity_id)
655 .bind(entity_id)
656 .bind(limit)
657 .fetch_all(&self.pool)
658 .await?;
659 Ok(rows.into_iter().map(edge_from_row).collect())
660 }
661
662 pub async fn edges_between(
668 &self,
669 entity_a: i64,
670 entity_b: i64,
671 ) -> Result<Vec<Edge>, MemoryError> {
672 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
673 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
674 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
675 edge_type, retrieval_count, last_retrieved_at, superseded_by
676 FROM graph_edges
677 WHERE valid_to IS NULL
678 AND ((source_entity_id = ? AND target_entity_id = ?)
679 OR (source_entity_id = ? AND target_entity_id = ?))"
680 ))
681 .bind(entity_a)
682 .bind(entity_b)
683 .bind(entity_b)
684 .bind(entity_a)
685 .fetch_all(&self.pool)
686 .await?;
687 Ok(rows.into_iter().map(edge_from_row).collect())
688 }
689
690 pub async fn edges_exact(
696 &self,
697 source_entity_id: i64,
698 target_entity_id: i64,
699 ) -> Result<Vec<Edge>, MemoryError> {
700 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
701 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
702 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
703 edge_type, retrieval_count, last_retrieved_at, superseded_by
704 FROM graph_edges
705 WHERE valid_to IS NULL
706 AND source_entity_id = ?
707 AND target_entity_id = ?"
708 ))
709 .bind(source_entity_id)
710 .bind(target_entity_id)
711 .fetch_all(&self.pool)
712 .await?;
713 Ok(rows.into_iter().map(edge_from_row).collect())
714 }
715
716 pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
722 let count: i64 = zeph_db::query_scalar(sql!(
723 "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
724 ))
725 .fetch_one(&self.pool)
726 .await?;
727 Ok(count)
728 }
729
730 pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
736 let rows: Vec<(String, i64)> = zeph_db::query_as(
737 sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
738 )
739 .fetch_all(&self.pool)
740 .await?;
741 Ok(rows)
742 }
743
744 pub async fn upsert_community(
756 &self,
757 name: &str,
758 summary: &str,
759 entity_ids: &[i64],
760 fingerprint: Option<&str>,
761 ) -> Result<i64, MemoryError> {
762 let entity_ids_json = serde_json::to_string(entity_ids)?;
763 let id: i64 = zeph_db::query_scalar(sql!(
764 "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
765 VALUES (?, ?, ?, ?)
766 ON CONFLICT(name) DO UPDATE SET
767 summary = excluded.summary,
768 entity_ids = excluded.entity_ids,
769 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
770 updated_at = CURRENT_TIMESTAMP
771 RETURNING id"
772 ))
773 .bind(name)
774 .bind(summary)
775 .bind(entity_ids_json)
776 .bind(fingerprint)
777 .fetch_one(&self.pool)
778 .await?;
779 Ok(id)
780 }
781
782 pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
789 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
790 "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
791 ))
792 .fetch_all(&self.pool)
793 .await?;
794 Ok(rows.into_iter().collect())
795 }
796
797 pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
803 zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
804 .bind(id)
805 .execute(&self.pool)
806 .await?;
807 Ok(())
808 }
809
810 pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
819 zeph_db::query(sql!(
820 "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
821 ))
822 .bind(id)
823 .execute(&self.pool)
824 .await?;
825 Ok(())
826 }
827
828 pub async fn community_for_entity(
837 &self,
838 entity_id: i64,
839 ) -> Result<Option<Community>, MemoryError> {
840 let row: Option<CommunityRow> = zeph_db::query_as(
841 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
842 FROM graph_communities c, json_each(c.entity_ids) j
843 WHERE CAST(j.value AS INTEGER) = ?
844 LIMIT 1"),
845 )
846 .bind(entity_id)
847 .fetch_optional(&self.pool)
848 .await?;
849 match row {
850 Some(row) => {
851 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
852 Ok(Some(Community {
853 id: row.id,
854 name: row.name,
855 summary: row.summary,
856 entity_ids,
857 fingerprint: row.fingerprint,
858 created_at: row.created_at,
859 updated_at: row.updated_at,
860 }))
861 }
862 None => Ok(None),
863 }
864 }
865
866 pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
872 let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
873 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
874 FROM graph_communities
875 ORDER BY id ASC"
876 ))
877 .fetch_all(&self.pool)
878 .await?;
879
880 rows.into_iter()
881 .map(|row| {
882 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
883 Ok(Community {
884 id: row.id,
885 name: row.name,
886 summary: row.summary,
887 entity_ids,
888 fingerprint: row.fingerprint,
889 created_at: row.created_at,
890 updated_at: row.updated_at,
891 })
892 })
893 .collect()
894 }
895
896 pub async fn community_count(&self) -> Result<i64, MemoryError> {
902 let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
903 .fetch_one(&self.pool)
904 .await?;
905 Ok(count)
906 }
907
908 pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
916 let val: Option<String> =
917 zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
918 .bind(key)
919 .fetch_optional(&self.pool)
920 .await?;
921 Ok(val)
922 }
923
924 pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
930 zeph_db::query(sql!(
931 "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
932 ON CONFLICT(key) DO UPDATE SET value = excluded.value"
933 ))
934 .bind(key)
935 .bind(value)
936 .execute(&self.pool)
937 .await?;
938 Ok(())
939 }
940
941 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
949 let val = self.get_metadata("extraction_count").await?;
950 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
951 }
952
    /// Streams every active edge ordered by id, converting rows lazily so the
    /// whole edge table is never held in memory at once.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        // Only the fetch is fallible here; edge_from_row itself is infallible.
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
967
968 pub async fn edges_after_id(
985 &self,
986 after_id: i64,
987 limit: i64,
988 ) -> Result<Vec<Edge>, MemoryError> {
989 let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
990 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
991 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
992 edge_type, retrieval_count, last_retrieved_at, superseded_by
993 FROM graph_edges
994 WHERE valid_to IS NULL AND id > ?
995 ORDER BY id ASC
996 LIMIT ?"
997 ))
998 .bind(after_id)
999 .bind(limit)
1000 .fetch_all(&self.pool)
1001 .await?;
1002 Ok(rows.into_iter().map(edge_from_row).collect())
1003 }
1004
1005 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
1011 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
1012 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
1013 FROM graph_communities
1014 WHERE id = ?"
1015 ))
1016 .bind(id)
1017 .fetch_optional(&self.pool)
1018 .await?;
1019 match row {
1020 Some(row) => {
1021 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
1022 Ok(Some(Community {
1023 id: row.id,
1024 name: row.name,
1025 summary: row.summary,
1026 entity_ids,
1027 fingerprint: row.fingerprint,
1028 created_at: row.created_at,
1029 updated_at: row.updated_at,
1030 }))
1031 }
1032 None => Ok(None),
1033 }
1034 }
1035
1036 pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
1042 zeph_db::query(sql!("DELETE FROM graph_communities"))
1043 .execute(&self.pool)
1044 .await?;
1045 Ok(())
1046 }
1047
    /// Like [`Self::find_entities_fuzzy`] but also returns a relevance score
    /// in `[0, 1]` per entity.
    ///
    /// FTS hits are scored with negated bm25 (column weights 10.0 / 1.0 — the
    /// column-to-weight mapping depends on the FTS table definition, which is
    /// not visible here; confirm against the schema). Alias substring hits get
    /// a flat 0.5. Results are deduplicated by entity id (best-ranked first)
    /// and normalized by the maximum raw score.
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or if `limit` overflows `i64`.
    #[allow(clippy::too_many_lines)]
    pub async fn find_entities_ranked(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<(Entity, f32)>, MemoryError> {
        // Raw tuple row: entity columns followed by the fts_rank score.
        type EntityFtsRow = (
            i64,
            String,
            String,
            String,
            Option<String>,
            String,
            String,
            Option<String>,
            f64,
        );

        // Same query preprocessing as find_entities_fuzzy: truncate, sanitize,
        // drop FTS5 operator words, add prefix wildcards.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit_i64 = i64::try_from(limit)?;

        // bm25() returns lower-is-better; negate so higher fts_rank = better.
        // UNION ALL keeps duplicates — dedup happens below so the best-ranked
        // occurrence of an entity wins.
        let ranked_fts_sql = format!(
            "SELECT * FROM ( \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION ALL \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
             0.5 AS fts_rank \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             ) \
             ORDER BY fts_rank DESC \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
            .bind(&fts_query)
            // Escape LIKE metacharacters so the raw query matches literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit_i64)
            .fetch_all(&self.pool)
            .await?;

        if rows.is_empty() {
            return Ok(vec![]);
        }

        // Normalize against the best score; guard against a non-positive max
        // (all-alias hits could still be 0.5, but bm25 scores can be <= 0).
        let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
        let max_score = if max_score <= 0.0 { 1.0 } else { max_score };

        // Rows are already ordered best-first, so the first occurrence of an
        // id is its best score; later duplicates are skipped.
        let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
        let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
        for (
            id,
            name,
            canonical_name,
            entity_type_str,
            summary,
            first_seen_at,
            last_seen_at,
            qdrant_point_id,
            raw_score,
        ) in rows
        {
            if !seen_ids.insert(id) {
                continue;
            }
            // Unknown type strings degrade to Concept rather than erroring.
            let entity_type = entity_type_str
                .parse()
                .unwrap_or(super::types::EntityType::Concept);
            let entity = Entity {
                id,
                name,
                canonical_name,
                entity_type,
                summary,
                first_seen_at,
                last_seen_at,
                qdrant_point_id,
            };
            #[allow(clippy::cast_possible_truncation)]
            let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
            result.push((entity, normalized));
        }

        Ok(result)
    }
1181
    /// Scores entities by graph structure: 0.6 × normalized active-edge degree
    /// plus 0.4 × edge-type diversity (diversity saturates at 4 distinct
    /// types). Every requested id appears in the result, defaulting to 0.0.
    ///
    /// # Errors
    /// Returns [`MemoryError`] when a batch query fails.
    pub async fn entity_structural_scores(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, f32>, MemoryError> {
        // Each id is bound three times (ph1/ph2/ph3), so 163 keeps the total
        // placeholder count at 489 — presumably under a backend limit; confirm
        // against the pool configuration.
        const MAX_BATCH: usize = 163;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        // Rows are (entity_id, degree, distinct edge-type count).
        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let n = chunk.len();
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);

            // Inner UNION ALL counts each endpoint role separately, so an
            // entity's degree includes both outgoing and incoming edges.
            let sql = format!(
                "SELECT entity_id,
                 COUNT(*) AS degree,
                 COUNT(DISTINCT edge_type) AS type_diversity
                 FROM (
                 SELECT source_entity_id AS entity_id, edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
                 UNION ALL
                 SELECT target_entity_id AS entity_id, edge_type
                 FROM graph_edges
                 WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
                 )
                 WHERE entity_id IN ({ph3})
                 GROUP BY entity_id"
            );

            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
            // Bind the same chunk three times, matching ph1, ph2, ph3 order.
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }

            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
            all_rows.extend(chunk_rows);
        }

        // No edges at all: every entity scores 0.
        if all_rows.is_empty() {
            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
        }

        // Normalize degree against the batch maximum (at least 1 to avoid
        // division by zero).
        let max_degree = all_rows
            .iter()
            .map(|(_, d, _)| *d)
            .max()
            .unwrap_or(1)
            .max(1);

        // Seed all requested ids with 0.0 so absent entities still appear.
        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
        for (entity_id, degree, type_diversity) in all_rows {
            #[allow(clippy::cast_precision_loss)]
            let norm_degree = degree as f32 / max_degree as f32;
            #[allow(clippy::cast_precision_loss)]
            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
            scores.insert(entity_id, score);
        }

        Ok(scores)
    }
1268
1269 pub async fn entity_community_ids(
1278 &self,
1279 entity_ids: &[i64],
1280 ) -> Result<HashMap<i64, i64>, MemoryError> {
1281 const MAX_BATCH: usize = 490;
1282
1283 if entity_ids.is_empty() {
1284 return Ok(HashMap::new());
1285 }
1286
1287 let mut result: HashMap<i64, i64> = HashMap::new();
1288 for chunk in entity_ids.chunks(MAX_BATCH) {
1289 let placeholders = placeholder_list(1, chunk.len());
1290
1291 let community_sql = community_ids_sql(&placeholders);
1292 let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
1293 for id in chunk {
1294 query = query.bind(*id);
1295 }
1296
1297 let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
1298 result.extend(rows);
1299 }
1300
1301 Ok(result)
1302 }
1303
1304 pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
1313 const MAX_BATCH: usize = 490;
1314 let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
1315 for chunk in edge_ids.chunks(MAX_BATCH) {
1316 let edge_placeholders = placeholder_list(1, chunk.len());
1317 let retrieval_sql = format!(
1318 "UPDATE graph_edges \
1319 SET retrieval_count = retrieval_count + 1, \
1320 last_retrieved_at = {epoch_now} \
1321 WHERE id IN ({edge_placeholders})"
1322 );
1323 let mut q = zeph_db::query(&retrieval_sql);
1324 for id in chunk {
1325 q = q.bind(*id);
1326 }
1327 q.execute(&self.pool).await?;
1328 }
1329 Ok(())
1330 }
1331
    /// Multiplicatively decays `retrieval_count` (by `decay_lambda`) on active
    /// edges that have not been retrieved within the last `interval_secs`
    /// seconds (or never), returning how many rows were updated.
    ///
    /// # Errors
    /// Returns [`MemoryError`] when the update fails or the affected-row
    /// count does not fit in `usize`.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        // Dialect-specific "now as epoch" expression, spliced into the SQL.
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        // MAX(..., 0) keeps the truncated count non-negative.
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
             AND retrieval_count > 0 \
             AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        // Convert `?` markers to the active dialect's placeholder syntax.
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate rather than fail on absurdly large intervals.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1361
1362 pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
1368 let days = i64::from(retention_days);
1369 let result = zeph_db::query(sql!(
1370 "DELETE FROM graph_edges
1371 WHERE expired_at IS NOT NULL
1372 AND expired_at < datetime('now', '-' || ? || ' days')"
1373 ))
1374 .bind(days)
1375 .execute(&self.pool)
1376 .await?;
1377 Ok(usize::try_from(result.rows_affected())?)
1378 }
1379
1380 pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
1386 let days = i64::from(retention_days);
1387 let result = zeph_db::query(sql!(
1388 "DELETE FROM graph_entities
1389 WHERE id NOT IN (
1390 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
1391 UNION
1392 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
1393 )
1394 AND last_seen_at < datetime('now', '-' || ? || ' days')"
1395 ))
1396 .bind(days)
1397 .execute(&self.pool)
1398 .await?;
1399 Ok(usize::try_from(result.rows_affected())?)
1400 }
1401
    /// Enforces an upper bound on the entity table: when the count exceeds
    /// `max_entities`, deletes the excess, preferring entities with the fewest
    /// active edges and, among ties, the ones seen least recently. Returns the
    /// number of rows removed (0 when already under the cap).
    ///
    /// # Errors
    /// Returns [`MemoryError`] on query failure or integer-conversion failure.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        // The subquery ranks entities by active-edge involvement; entities
        // with no edges sort first via COALESCE(..., 0). Note the inner
        // UNION ALL produces up to two rows per entity (source + target
        // counts), so `cnt` reflects only one of the roles per joined row —
        // NOTE(review): ordering is therefore approximate; confirm intended.
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id IN (
             SELECT e.id
             FROM graph_entities e
             LEFT JOIN (
             SELECT source_entity_id AS eid, COUNT(*) AS cnt
             FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
             UNION ALL
             SELECT target_entity_id AS eid, COUNT(*) AS cnt
             FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
             ) edge_counts ON e.id = edge_counts.eid
             ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
             LIMIT ?
             )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1438
    /// Point-in-time query: returns the edges touching `entity_id` that were
    /// valid at `timestamp` — still-active edges that began on or before it,
    /// plus invalidated edges whose validity window contains it.
    ///
    /// `timestamp` is compared lexically against `valid_from`/`valid_to`, so
    /// it must use the same string format the columns store — TODO confirm
    /// format against the schema/migrations.
    ///
    /// # Errors
    /// Returns [`MemoryError`] when the query fails.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Bind order mirrors placeholder order across both UNION branches:
        // (ts, id, id) for the active branch, (ts, ts, id, id) for the closed
        // one.
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NULL
             AND valid_from <= ?
             AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
             valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
             edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE valid_to IS NOT NULL
             AND valid_from <= ?
             AND valid_to > ?
             AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1489
1490 pub async fn edge_history(
1499 &self,
1500 source_entity_id: i64,
1501 predicate: &str,
1502 relation: Option<&str>,
1503 limit: usize,
1504 ) -> Result<Vec<Edge>, MemoryError> {
1505 let escaped = predicate
1507 .replace('\\', "\\\\")
1508 .replace('%', "\\%")
1509 .replace('_', "\\_");
1510 let like_pattern = format!("%{escaped}%");
1511 let limit = i64::try_from(limit)?;
1512 let rows: Vec<EdgeRow> = if let Some(rel) = relation {
1513 zeph_db::query_as(sql!(
1514 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1515 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1516 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1517 FROM graph_edges
1518 WHERE source_entity_id = ?
1519 AND fact LIKE ? ESCAPE '\\'
1520 AND relation = ?
1521 ORDER BY valid_from DESC
1522 LIMIT ?"
1523 ))
1524 .bind(source_entity_id)
1525 .bind(&like_pattern)
1526 .bind(rel)
1527 .bind(limit)
1528 .fetch_all(&self.pool)
1529 .await?
1530 } else {
1531 zeph_db::query_as(sql!(
1532 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1533 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1534 edge_type, retrieval_count, last_retrieved_at, superseded_by, superseded_by
1535 FROM graph_edges
1536 WHERE source_entity_id = ?
1537 AND fact LIKE ? ESCAPE '\\'
1538 ORDER BY valid_from DESC
1539 LIMIT ?"
1540 ))
1541 .bind(source_entity_id)
1542 .bind(&like_pattern)
1543 .bind(limit)
1544 .fetch_all(&self.pool)
1545 .await?
1546 };
1547 Ok(rows.into_iter().map(edge_from_row).collect())
1548 }
1549
1550 pub async fn bfs(
1567 &self,
1568 start_entity_id: i64,
1569 max_hops: u32,
1570 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1571 self.bfs_with_depth(start_entity_id, max_hops)
1572 .await
1573 .map(|(e, ed, _)| (e, ed))
1574 }
1575
    /// Like [`bfs`](Self::bfs), but additionally returns a map from each
    /// visited entity id to its hop distance from the start entity
    /// (the start itself maps to 0).
    pub async fn bfs_with_depth(
        &self,
        start_entity_id: i64,
        max_hops: u32,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        // Delegates with no timestamp filter: traverses active edges only.
        self.bfs_core(start_entity_id, max_hops, None).await
    }
1593
    /// Point-in-time BFS: like [`bfs_with_depth`](Self::bfs_with_depth), but
    /// traverses only edges that were valid at `timestamp`
    /// (a SQL datetime string).
    pub async fn bfs_at_timestamp(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        timestamp: &str,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        self.bfs_core(start_entity_id, max_hops, Some(timestamp))
            .await
    }
1613
1614 pub async fn bfs_typed(
1630 &self,
1631 start_entity_id: i64,
1632 max_hops: u32,
1633 edge_types: &[EdgeType],
1634 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1635 if edge_types.is_empty() {
1636 return self.bfs_with_depth(start_entity_id, max_hops).await;
1637 }
1638 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1639 .await
1640 }
1641
    /// Core BFS shared by [`bfs_with_depth`](Self::bfs_with_depth) and
    /// [`bfs_at_timestamp`](Self::bfs_at_timestamp).
    ///
    /// Expands the frontier hop by hop up to `max_hops`, recording the first
    /// (shortest) hop at which each entity was discovered. When
    /// `at_timestamp` is `Some`, only edges valid at that instant are
    /// followed; otherwise only currently-active edges (`valid_to IS NULL`).
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Hard cap on per-hop fan-out: the neighbour query binds the frontier
        // three times (plus an optional timestamp), so this keeps the
        // placeholder count within backend bind limits.
        const MAX_FRONTIER: usize = 300;

        // entity id -> hop depth at which it was first reached.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // Placeholder lists are numbered consecutively (1..=n, n+1..=2n,
            // 2n+1..=3n); the bind calls below must follow the same order.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                // The timestamp is the single placeholder after the id lists.
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // For each qualifying edge touching the frontier, select the
            // endpoint on the far side (CASE picks the target when the
            // source is in the frontier, else the source).
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind the frontier once per placeholder list, then the timestamp.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            // Only unseen entities enter the next frontier, so depths are
            // shortest-path hop counts.
            for nbr in neighbours {
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1721
    /// Core BFS variant that restricts traversal to edges whose `edge_type`
    /// is one of `edge_types`; otherwise mirrors
    /// [`bfs_core`](Self::bfs_core).
    ///
    /// Callers guarantee `edge_types` is non-empty
    /// (see [`bfs_typed`](Self::bfs_typed)).
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Same fan-out cap as bfs_core; keeps placeholder counts bounded.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        // entity id -> hop depth at which it was first reached.
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Placeholder layout: type list first (1..=n_types), then the three
        // frontier id lists, then the optional timestamp. Bind order below
        // must match exactly.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                // Timestamp goes after types and all three id lists.
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // Far-side endpoint of each qualifying edge touching the frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                 END as neighbour_id
                 FROM graph_edges
                 WHERE {edge_filter}
                   AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Types first, matching the placeholder layout above.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            // First discovery wins, so depths are shortest hop counts.
            for nbr in neighbours {
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1818
    /// Materialises the results of a finished typed BFS: loads the visited
    /// entities and the type-filtered edges whose BOTH endpoints were
    /// visited (the induced subgraph).
    ///
    /// Mirrors [`bfs_fetch_results`](Self::bfs_fetch_results) with an extra
    /// `edge_type IN (...)` restriction.
    async fn bfs_fetch_results_typed(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
        type_strs: &[&str],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Cap the id list so the edge query (ids bound twice, plus types and
        // an optional timestamp) stays within backend bind limits. NOTE: the
        // returned depth_map is NOT truncated, so it may contain ids that are
        // absent from the entity/edge results.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results_typed: visited entity set truncated to 499"
            );
            visited_ids.truncate(499);
        }

        let n_types = type_strs.len();
        let n_visited = visited_ids.len();

        // Placeholder layout: types, then ids twice, then optional timestamp;
        // bind order below must match.
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;
        let ph_ids1 = placeholder_list(id_start, n_visited);
        let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);

        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = id_start + n_visited * 2;
            format!(
                "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            format!("edge_type IN ({type_in}) AND valid_to IS NULL")
        };

        // Both endpoints must be visited: induced subgraph only.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for t in type_strs {
            edge_query = edge_query.bind(*t);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entities are loaded in a separate, smaller query (ids bound once).
        let entity_sql2 = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, visited_ids.len()),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1908
    /// Materialises the results of a finished untyped BFS: loads the visited
    /// entities and the edges whose BOTH endpoints were visited
    /// (the induced subgraph).
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // Cap the id list (bound twice in the edge query, plus an optional
        // timestamp) to stay within SQLite's bind-parameter limit. NOTE: the
        // returned depth_map is NOT truncated, so it may contain ids absent
        // from the entity/edge results.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Placeholder layout: ids twice (1..=n, n+1..=2n), then optional
        // timestamp at 2n+1; bind order below must match.
        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        // Both endpoints must be visited: induced subgraph only.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at, superseded_by
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        // Entities are loaded separately (ids bound once).
        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1982
1983 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1999 let find_by_name_sql = format!(
2000 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
2001 FROM graph_entities \
2002 WHERE name = ? {cn} OR canonical_name = ? {cn} \
2003 LIMIT 5",
2004 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
2005 );
2006 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
2007 .bind(name)
2008 .bind(name)
2009 .fetch_all(&self.pool)
2010 .await?;
2011
2012 if !rows.is_empty() {
2013 return rows.into_iter().map(entity_from_row).collect();
2014 }
2015
2016 self.find_entities_fuzzy(name, 5).await
2017 }
2018
2019 pub async fn unprocessed_messages_for_backfill(
2027 &self,
2028 limit: usize,
2029 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
2030 let limit = i64::try_from(limit)?;
2031 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
2032 "SELECT id, content FROM messages
2033 WHERE graph_processed = 0
2034 ORDER BY id ASC
2035 LIMIT ?"
2036 ))
2037 .bind(limit)
2038 .fetch_all(&self.pool)
2039 .await?;
2040 Ok(rows
2041 .into_iter()
2042 .map(|(id, content)| (crate::types::MessageId(id), content))
2043 .collect())
2044 }
2045
2046 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2052 let count: i64 = zeph_db::query_scalar(sql!(
2053 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2054 ))
2055 .fetch_one(&self.pool)
2056 .await?;
2057 Ok(count)
2058 }
2059
2060 pub async fn mark_messages_graph_processed(
2066 &self,
2067 ids: &[crate::types::MessageId],
2068 ) -> Result<(), MemoryError> {
2069 const MAX_BATCH: usize = 490;
2070 if ids.is_empty() {
2071 return Ok(());
2072 }
2073 for chunk in ids.chunks(MAX_BATCH) {
2074 let placeholders = placeholder_list(1, chunk.len());
2075 let sql =
2076 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2077 let mut query = zeph_db::query(&sql);
2078 for id in chunk {
2079 query = query.bind(id.0);
2080 }
2081 query.execute(&self.pool).await?;
2082 }
2083 Ok(())
2084 }
2085}
2086
/// Builds SQL returning `(entity_id, community_id)` pairs for each given
/// entity id that appears in a community's JSON `entity_ids` array.
///
/// SQLite variant: expands the array with `json_each` and casts each value
/// to INTEGER. `placeholders` is a pre-built placeholder list
/// (see `placeholder_list`).
#[cfg(feature = "sqlite")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
         FROM graph_communities c, json_each(c.entity_ids) j
         WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
    )
}
2097
/// Builds SQL returning `(entity_id, community_id)` pairs for each given
/// entity id that appears in a community's JSON `entity_ids` array.
///
/// Postgres variant: expands the array with `jsonb_array_elements_text` and
/// casts each element to `bigint`. `placeholders` is a pre-built
/// placeholder list (see `placeholder_list`).
#[cfg(feature = "postgres")]
fn community_ids_sql(placeholders: &str) -> String {
    format!(
        "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
         FROM graph_communities c,
              jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
         WHERE (j.value)::bigint IN ({placeholders})"
    )
}
2107
/// Raw row mapping for `graph_entities`; converted to the public [`Entity`]
/// by [`entity_from_row`].
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Surface form of the entity name as last observed.
    name: String,
    // Normalised name; unique together with `entity_type` (see the
    // ON CONFLICT clause in `upsert_entity`).
    canonical_name: String,
    // Stored as text; parsed into `EntityType` during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Id of the entity's vector point in Qdrant, if one has been created.
    qdrant_point_id: Option<String>,
}
2121
2122fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2123 let entity_type = row
2124 .entity_type
2125 .parse::<EntityType>()
2126 .map_err(MemoryError::GraphStore)?;
2127 Ok(Entity {
2128 id: row.id,
2129 name: row.name,
2130 canonical_name: row.canonical_name,
2131 entity_type,
2132 summary: row.summary,
2133 first_seen_at: row.first_seen_at,
2134 last_seen_at: row.last_seen_at,
2135 qdrant_point_id: row.qdrant_point_id,
2136 })
2137}
2138
/// Raw row mapping for entity aliases; converted to the public
/// [`EntityAlias`] by [`alias_from_row`].
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Entity this alias refers to.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2146
2147fn alias_from_row(row: AliasRow) -> EntityAlias {
2148 EntityAlias {
2149 id: row.id,
2150 entity_id: row.entity_id,
2151 alias_name: row.alias_name,
2152 created_at: row.created_at,
2153 }
2154}
2155
/// Raw row mapping for `graph_edges`; converted to the public [`Edge`]
/// by [`edge_from_row`].
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    // Natural-language statement backing the edge; searched with LIKE in
    // `edge_history`.
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    // Bitemporal validity window; `valid_to IS NULL` marks an active edge.
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // DB column is `episode_id`; exposed on `Edge` as `source_message_id`.
    #[sqlx(rename = "episode_id")]
    source_message_id: Option<i64>,
    // Id of the edge's vector point in Qdrant, if one has been created.
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into `EdgeType` during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
    // Id of the edge that replaced this one, when superseded.
    superseded_by: Option<i64>,
}
2176
2177fn edge_from_row(row: EdgeRow) -> Edge {
2178 let edge_type = row
2179 .edge_type
2180 .parse::<EdgeType>()
2181 .unwrap_or(EdgeType::Semantic);
2182 Edge {
2183 id: row.id,
2184 source_entity_id: row.source_entity_id,
2185 target_entity_id: row.target_entity_id,
2186 relation: row.relation,
2187 fact: row.fact,
2188 #[allow(clippy::cast_possible_truncation)]
2189 confidence: row.confidence as f32,
2190 valid_from: row.valid_from,
2191 valid_to: row.valid_to,
2192 created_at: row.created_at,
2193 expired_at: row.expired_at,
2194 source_message_id: row.source_message_id.map(MessageId),
2195 qdrant_point_id: row.qdrant_point_id,
2196 edge_type,
2197 retrieval_count: row.retrieval_count,
2198 last_retrieved_at: row.last_retrieved_at,
2199 superseded_by: row.superseded_by,
2200 }
2201}
2202
/// Raw row mapping for `graph_communities`.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON-encoded array of member entity ids (expanded by
    // `community_ids_sql` via json_each / jsonb_array_elements_text).
    entity_ids: String,
    // NOTE(review): presumably a content hash used to detect membership
    // changes — confirm against the community-building code.
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2213
2214impl GraphStore {
2217 pub async fn ensure_episode(&self, conversation_id: i64) -> Result<i64, MemoryError> {
2225 zeph_db::query(sql!("INSERT OR IGNORE INTO conversations (id) VALUES (?)"))
2229 .bind(conversation_id)
2230 .execute(&self.pool)
2231 .await?;
2232
2233 let id: i64 = zeph_db::query_scalar(sql!(
2234 "INSERT INTO graph_episodes (conversation_id)
2235 VALUES (?)
2236 ON CONFLICT(conversation_id) DO UPDATE SET conversation_id = excluded.conversation_id
2237 RETURNING id"
2238 ))
2239 .bind(conversation_id)
2240 .fetch_one(&self.pool)
2241 .await?;
2242 Ok(id)
2243 }
2244
2245 pub async fn link_entity_to_episode(
2253 &self,
2254 episode_id: i64,
2255 entity_id: i64,
2256 ) -> Result<(), MemoryError> {
2257 zeph_db::query(sql!(
2258 "INSERT OR IGNORE INTO graph_episode_entities (episode_id, entity_id)
2259 VALUES (?, ?)"
2260 ))
2261 .bind(episode_id)
2262 .bind(entity_id)
2263 .execute(&self.pool)
2264 .await?;
2265 Ok(())
2266 }
2267
2268 pub async fn episodes_for_entity(
2274 &self,
2275 entity_id: i64,
2276 ) -> Result<Vec<super::types::Episode>, MemoryError> {
2277 #[derive(zeph_db::FromRow)]
2278 struct EpisodeRow {
2279 id: i64,
2280 conversation_id: i64,
2281 created_at: String,
2282 closed_at: Option<String>,
2283 }
2284 let rows: Vec<EpisodeRow> = zeph_db::query_as(sql!(
2285 "SELECT e.id, e.conversation_id, e.created_at, e.closed_at
2286 FROM graph_episodes e
2287 JOIN graph_episode_entities ee ON ee.episode_id = e.id
2288 WHERE ee.entity_id = ?"
2289 ))
2290 .bind(entity_id)
2291 .fetch_all(&self.pool)
2292 .await?;
2293 Ok(rows
2294 .into_iter()
2295 .map(|r| super::types::Episode {
2296 id: r.id,
2297 conversation_id: r.conversation_id,
2298 created_at: r.created_at,
2299 closed_at: r.closed_at,
2300 })
2301 .collect())
2302 }
2303}
2304
// Unit/integration tests live in the sibling `tests.rs` module.
#[cfg(test)]
mod tests;