1use std::collections::HashMap;
5#[allow(unused_imports)]
6use zeph_db::sql;
7
8use futures::Stream;
9use zeph_db::fts::sanitize_fts_query;
10use zeph_db::{ActiveDialect, DbPool, numbered_placeholder, placeholder_list};
11
12use crate::error::MemoryError;
13use crate::types::MessageId;
14
15use super::types::{Community, Edge, EdgeType, Entity, EntityAlias, EntityType};
16
/// SQL-backed store for the knowledge graph: entities, their aliases,
/// temporal edges, communities, and key/value graph metadata.
pub struct GraphStore {
    // Shared connection pool; dialect-specific SQL is selected via
    // `ActiveDialect` constants where the two backends differ.
    pool: DbPool,
}
20
21impl GraphStore {
22 #[must_use]
23 pub fn new(pool: DbPool) -> Self {
24 Self { pool }
25 }
26
27 #[must_use]
28 pub fn pool(&self) -> &DbPool {
29 &self.pool
30 }
31
    /// Inserts an entity, or refreshes the existing row keyed by
    /// `(canonical_name, entity_type)`, returning its id.
    ///
    /// On conflict the surface `name` is overwritten, the stored summary is
    /// kept unless a new one is supplied (COALESCE), and `last_seen_at` is
    /// bumped to the current timestamp.
    ///
    /// # Errors
    /// Returns `MemoryError` if the database query fails.
    pub async fn upsert_entity(
        &self,
        surface_name: &str,
        canonical_name: &str,
        entity_type: EntityType,
        summary: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let type_str = entity_type.as_str();
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_entities (name, canonical_name, entity_type, summary)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(canonical_name, entity_type) DO UPDATE SET
                 name = excluded.name,
                 summary = COALESCE(excluded.summary, summary),
                 last_seen_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(surface_name)
        .bind(canonical_name)
        .bind(type_str)
        .bind(summary)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
69
    /// Looks up an entity by its `(canonical_name, entity_type)` key.
    ///
    /// # Errors
    /// Returns `MemoryError` on query failure or if the row cannot be
    /// converted into an [`Entity`].
    pub async fn find_entity(
        &self,
        canonical_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE canonical_name = ? AND entity_type = ?"),
        )
        .bind(canonical_name)
        .bind(type_str)
        .fetch_optional(&self.pool)
        .await?;
        // entity_from_row is fallible: transpose Option<Result<_>> back into
        // Result<Option<_>>.
        row.map(entity_from_row).transpose()
    }
92
    /// Fetches a single entity by primary key.
    ///
    /// # Errors
    /// Returns `MemoryError` on query failure or row conversion failure.
    pub async fn find_entity_by_id(&self, entity_id: i64) -> Result<Option<Entity>, MemoryError> {
        let row: Option<EntityRow> = zeph_db::query_as(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities
                  WHERE id = ?"),
        )
        .bind(entity_id)
        .fetch_optional(&self.pool)
        .await?;
        row.map(entity_from_row).transpose()
    }
109
    /// Associates an entity with its vector-store point id.
    ///
    /// # Errors
    /// Returns `MemoryError` if the update fails.
    pub async fn set_entity_qdrant_point_id(
        &self,
        entity_id: i64,
        point_id: &str,
    ) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_entities SET qdrant_point_id = ? WHERE id = ?"
        ))
        .bind(point_id)
        .bind(entity_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
129
    /// Fuzzy entity search: FTS prefix matching over the entity index,
    /// unioned with a case-insensitive `LIKE` scan over aliases.
    ///
    /// The query is truncated to 512 bytes on a char boundary, sanitized for
    /// FTS, stripped of bare FTS5 operator tokens, and each remaining token
    /// becomes a prefix match (`token*`). Returns an empty vec if nothing
    /// usable survives sanitization.
    ///
    /// # Errors
    /// Returns `MemoryError` on query failure, row conversion failure, or if
    /// `limit` overflows `i64`.
    pub async fn find_entities_fuzzy(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<Entity>, MemoryError> {
        // Bare FTS5 keywords would change the query semantics; drop them.
        const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
        let query = &query[..query.floor_char_boundary(512)];
        let sanitized = sanitize_fts_query(query);
        if sanitized.is_empty() {
            return Ok(vec![]);
        }
        let fts_query: String = sanitized
            .split_whitespace()
            .filter(|t| !FTS5_OPERATORS.contains(t))
            .map(|t| format!("{t}*"))
            .collect::<Vec<_>>()
            .join(" ");
        if fts_query.is_empty() {
            return Ok(vec![]);
        }

        let limit = i64::try_from(limit)?;
        // UNION (not UNION ALL) deduplicates entities matched by both arms.
        let search_sql = format!(
            "SELECT DISTINCT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entities_fts fts \
             JOIN graph_entities e ON e.id = fts.rowid \
             WHERE graph_entities_fts MATCH ? \
             UNION \
             SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name LIKE ? ESCAPE '\\' {} \
             LIMIT ?",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let rows: Vec<EntityRow> = zeph_db::query_as(&search_sql)
            .bind(&fts_query)
            // Escape LIKE wildcards so user input is matched literally.
            .bind(format!(
                "%{}%",
                query
                    .trim()
                    .replace('\\', "\\\\")
                    .replace('%', "\\%")
                    .replace('_', "\\_")
            ))
            .bind(limit)
            .fetch_all(&self.pool)
            .await?;
        rows.into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()
    }
211
    /// Requests a passive WAL checkpoint so the write-ahead log is folded
    /// back into the main SQLite database file without blocking readers.
    ///
    /// # Errors
    /// Returns `MemoryError` if the PRAGMA fails.
    #[cfg(feature = "sqlite")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        zeph_db::query("PRAGMA wal_checkpoint(PASSIVE)")
            .execute(&self.pool)
            .await?;
        Ok(())
    }
228
    /// No-op on Postgres; the `Result` return keeps the signature identical
    /// to the SQLite variant so callers compile under either feature.
    ///
    /// # Errors
    /// Never fails.
    #[cfg(feature = "postgres")]
    pub async fn checkpoint_wal(&self) -> Result<(), MemoryError> {
        Ok(())
    }
238
    /// Streams every entity ordered by id, converting rows lazily so a large
    /// graph is never fully buffered in memory.
    pub fn all_entities_stream(&self) -> impl Stream<Item = Result<Entity, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EntityRow>(
            sql!("SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
                  FROM graph_entities ORDER BY id ASC"),
        )
        .fetch(&self.pool)
        .map(|r: Result<EntityRow, zeph_db::SqlxError>| {
            // Row conversion is fallible too, hence and_then (not map).
            r.map_err(MemoryError::from).and_then(entity_from_row)
        })
    }
251
    /// Records an alias for an entity; duplicates are silently ignored via
    /// the dialect's INSERT-IGNORE / ON CONFLICT DO NOTHING form.
    ///
    /// # Errors
    /// Returns `MemoryError` if the insert fails.
    pub async fn add_alias(&self, entity_id: i64, alias_name: &str) -> Result<(), MemoryError> {
        // One dialect supplies INSERT_IGNORE, the other CONFLICT_NOTHING;
        // presumably the unused constant is empty — behavior comes from
        // zeph_db's Dialect impls.
        let insert_alias_sql = format!(
            "{} INTO graph_entity_aliases (entity_id, alias_name) VALUES (?, ?){}",
            <ActiveDialect as zeph_db::dialect::Dialect>::INSERT_IGNORE,
            <ActiveDialect as zeph_db::dialect::Dialect>::CONFLICT_NOTHING,
        );
        zeph_db::query(&insert_alias_sql)
            .bind(entity_id)
            .bind(alias_name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
272
    /// Resolves an alias (case-insensitively, via the dialect's NOCASE
    /// collation) to an entity of the given type; the lowest entity id wins
    /// when several match.
    ///
    /// # Errors
    /// Returns `MemoryError` on query failure or row conversion failure.
    pub async fn find_entity_by_alias(
        &self,
        alias_name: &str,
        entity_type: EntityType,
    ) -> Result<Option<Entity>, MemoryError> {
        let type_str = entity_type.as_str();
        let alias_typed_sql = format!(
            "SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
             e.first_seen_at, e.last_seen_at, e.qdrant_point_id \
             FROM graph_entity_aliases a \
             JOIN graph_entities e ON e.id = a.entity_id \
             WHERE a.alias_name = ? {} \
             AND e.entity_type = ? \
             ORDER BY e.id ASC \
             LIMIT 1",
            <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
        );
        let row: Option<EntityRow> = zeph_db::query_as(&alias_typed_sql)
            .bind(alias_name)
            .bind(type_str)
            .fetch_optional(&self.pool)
            .await?;
        row.map(entity_from_row).transpose()
    }
304
    /// Lists all aliases recorded for an entity, oldest first (by row id).
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn aliases_for_entity(
        &self,
        entity_id: i64,
    ) -> Result<Vec<EntityAlias>, MemoryError> {
        let rows: Vec<AliasRow> = zeph_db::query_as(sql!(
            "SELECT id, entity_id, alias_name, created_at
             FROM graph_entity_aliases
             WHERE entity_id = ?
             ORDER BY id ASC"
        ))
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(alias_from_row).collect())
    }
325
326 pub async fn all_entities(&self) -> Result<Vec<Entity>, MemoryError> {
332 use futures::TryStreamExt as _;
333 self.all_entities_stream().try_collect().await
334 }
335
    /// Counts all entities in the graph.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn entity_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_entities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
347
348 pub async fn insert_edge(
366 &self,
367 source_entity_id: i64,
368 target_entity_id: i64,
369 relation: &str,
370 fact: &str,
371 confidence: f32,
372 episode_id: Option<MessageId>,
373 ) -> Result<i64, MemoryError> {
374 self.insert_edge_typed(
375 source_entity_id,
376 target_entity_id,
377 relation,
378 fact,
379 confidence,
380 episode_id,
381 EdgeType::Semantic,
382 )
383 .await
384 }
385
    /// Inserts an edge of the given type, or merges into an identical active
    /// edge by keeping the higher confidence.
    ///
    /// Self-loops are rejected up front. `confidence` is clamped to
    /// `[0.0, 1.0]`. The duplicate lookup and the insert/update both run
    /// inside a single transaction. When an active duplicate exists its fact
    /// text is left untouched; only the confidence is raised.
    ///
    /// # Errors
    /// `MemoryError::InvalidInput` for self-loops; otherwise database errors.
    #[allow(clippy::too_many_arguments)]
    pub async fn insert_edge_typed(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
        relation: &str,
        fact: &str,
        confidence: f32,
        episode_id: Option<MessageId>,
        edge_type: EdgeType,
    ) -> Result<i64, MemoryError> {
        if source_entity_id == target_entity_id {
            return Err(MemoryError::InvalidInput(format!(
                "self-loop edge rejected: source and target are the same entity (id={source_entity_id})"
            )));
        }
        let confidence = confidence.clamp(0.0, 1.0);
        let edge_type_str = edge_type.as_str();

        let mut tx = zeph_db::begin(&self.pool).await?;

        // Look for a live (valid_to IS NULL) edge with the same key.
        let existing: Option<(i64, f64)> = zeph_db::query_as(sql!(
            "SELECT id, confidence FROM graph_edges
             WHERE source_entity_id = ?
               AND target_entity_id = ?
               AND relation = ?
               AND edge_type = ?
               AND valid_to IS NULL
             LIMIT 1"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(edge_type_str)
        .fetch_optional(&mut *tx)
        .await?;

        if let Some((existing_id, stored_conf)) = existing {
            // Merge path: keep whichever confidence is higher.
            let updated_conf = f64::from(confidence).max(stored_conf);
            zeph_db::query(sql!("UPDATE graph_edges SET confidence = ? WHERE id = ?"))
                .bind(updated_conf)
                .bind(existing_id)
                .execute(&mut *tx)
                .await?;
            tx.commit().await?;
            return Ok(existing_id);
        }

        let episode_raw: Option<i64> = episode_id.map(|m| m.0);
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_edges
             (source_entity_id, target_entity_id, relation, fact, confidence, episode_id, edge_type)
             VALUES (?, ?, ?, ?, ?, ?, ?)
             RETURNING id"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .bind(relation)
        .bind(fact)
        .bind(f64::from(confidence))
        .bind(episode_raw)
        .bind(edge_type_str)
        .fetch_one(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(id)
    }
465
    /// Soft-deletes an edge by stamping both `valid_to` (temporal validity)
    /// and `expired_at` (retention bookkeeping) with the current time.
    ///
    /// # Errors
    /// Returns `MemoryError` if the update fails.
    pub async fn invalidate_edge(&self, edge_id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_edges SET valid_to = CURRENT_TIMESTAMP, expired_at = CURRENT_TIMESTAMP
             WHERE id = ?"
        ))
        .bind(edge_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
481
482 pub async fn edges_for_entities(
499 &self,
500 entity_ids: &[i64],
501 edge_types: &[super::types::EdgeType],
502 ) -> Result<Vec<Edge>, MemoryError> {
503 const MAX_BATCH_ENTITIES: usize = 490;
507
508 let mut all_edges: Vec<Edge> = Vec::new();
509
510 for chunk in entity_ids.chunks(MAX_BATCH_ENTITIES) {
511 let edges = self.query_batch_edges(chunk, edge_types).await?;
512 all_edges.extend(edges);
513 }
514
515 Ok(all_edges)
516 }
517
    /// Runs one batched active-edge query for a chunk of entity ids.
    ///
    /// Placeholders are numbered explicitly because each id appears in two
    /// IN-lists (source, then target) and the edge types follow after both;
    /// the bind loops below must mirror that numbering exactly.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    async fn query_batch_edges(
        &self,
        entity_ids: &[i64],
        edge_types: &[super::types::EdgeType],
    ) -> Result<Vec<Edge>, MemoryError> {
        if entity_ids.is_empty() {
            return Ok(Vec::new());
        }

        let n_ids = entity_ids.len();
        let n_types = edge_types.len();

        // An empty `edge_types` slice means "all edge types".
        let sql = if n_types == 0 {
            let placeholders = placeholder_list(1, n_ids);
            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at
                 FROM graph_edges
                 WHERE valid_to IS NULL
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))"
            )
        } else {
            let placeholders = placeholder_list(1, n_ids);
            let placeholders2 = placeholder_list(n_ids + 1, n_ids);
            // Type placeholders start after both id lists.
            let type_placeholders = placeholder_list(n_ids * 2 + 1, n_types);
            format!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at
                 FROM graph_edges
                 WHERE valid_to IS NULL
                   AND (source_entity_id IN ({placeholders}) OR target_entity_id IN ({placeholders2}))
                   AND edge_type IN ({type_placeholders})"
            )
        };

        let mut query = zeph_db::query_as::<_, EdgeRow>(&sql);
        // Bind order: ids for the source list, ids for the target list, then
        // edge types — matching the placeholder numbering above.
        for id in entity_ids {
            query = query.bind(*id);
        }
        for id in entity_ids {
            query = query.bind(*id);
        }
        for et in edge_types {
            query = query.bind(et.as_str());
        }

        let rows: Vec<EdgeRow> = query.fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
577
    /// Fetches all active (non-invalidated) edges where the entity is either
    /// endpoint.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edges_for_entity(&self, entity_id: i64) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
598
    /// Returns up to `limit` edges touching the entity — active *and*
    /// invalidated — ordered by `valid_from`, newest first.
    ///
    /// # Errors
    /// Returns `MemoryError` on query failure or if `limit` overflows `i64`.
    pub async fn edge_history_for_entity(
        &self,
        entity_id: i64,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        let limit = i64::try_from(limit)?;
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE source_entity_id = ? OR target_entity_id = ?
             ORDER BY valid_from DESC
             LIMIT ?"
        ))
        .bind(entity_id)
        .bind(entity_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
627
    /// Fetches active edges connecting two entities in either direction.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edges_between(
        &self,
        entity_a: i64,
        entity_b: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL
               AND ((source_entity_id = ? AND target_entity_id = ?)
                 OR (source_entity_id = ? AND target_entity_id = ?))"
        ))
        // Bound twice, swapped, to cover both directions.
        .bind(entity_a)
        .bind(entity_b)
        .bind(entity_b)
        .bind(entity_a)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
655
    /// Fetches active edges with exactly this source → target direction.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edges_exact(
        &self,
        source_entity_id: i64,
        target_entity_id: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL
               AND source_entity_id = ?
               AND target_entity_id = ?"
        ))
        .bind(source_entity_id)
        .bind(target_entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
681
    /// Counts edges that are currently valid (`valid_to IS NULL`).
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn active_edge_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!(
            "SELECT COUNT(*) FROM graph_edges WHERE valid_to IS NULL"
        ))
        .fetch_one(&self.pool)
        .await?;
        Ok(count)
    }
695
    /// Returns `(edge_type, count)` pairs over active edges, sorted by type.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edge_type_distribution(&self) -> Result<Vec<(String, i64)>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(
            sql!("SELECT edge_type, COUNT(*) FROM graph_edges WHERE valid_to IS NULL GROUP BY edge_type ORDER BY edge_type"),
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows)
    }
709
    /// Inserts a community, or updates the existing row keyed by `name`,
    /// returning its id.
    ///
    /// Member ids are stored as a JSON array. On conflict the summary and
    /// member list are replaced, the fingerprint is kept unless a new one is
    /// supplied, and `updated_at` is bumped.
    ///
    /// # Errors
    /// JSON serialization errors for `entity_ids`, or database errors.
    pub async fn upsert_community(
        &self,
        name: &str,
        summary: &str,
        entity_ids: &[i64],
        fingerprint: Option<&str>,
    ) -> Result<i64, MemoryError> {
        let entity_ids_json = serde_json::to_string(entity_ids)?;
        let id: i64 = zeph_db::query_scalar(sql!(
            "INSERT INTO graph_communities (name, summary, entity_ids, fingerprint)
             VALUES (?, ?, ?, ?)
             ON CONFLICT(name) DO UPDATE SET
                 summary = excluded.summary,
                 entity_ids = excluded.entity_ids,
                 fingerprint = COALESCE(excluded.fingerprint, fingerprint),
                 updated_at = CURRENT_TIMESTAMP
             RETURNING id"
        ))
        .bind(name)
        .bind(summary)
        .bind(entity_ids_json)
        .bind(fingerprint)
        .fetch_one(&self.pool)
        .await?;
        Ok(id)
    }
747
    /// Maps every non-NULL community fingerprint to its community id.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn community_fingerprints(&self) -> Result<HashMap<String, i64>, MemoryError> {
        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
            "SELECT fingerprint, id FROM graph_communities WHERE fingerprint IS NOT NULL"
        ))
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().collect())
    }
762
    /// Hard-deletes a community row; a no-op if the id does not exist.
    ///
    /// # Errors
    /// Returns `MemoryError` if the delete fails.
    pub async fn delete_community_by_id(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities WHERE id = ?"))
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
775
    /// Clears a community's fingerprint, forcing it to be treated as changed
    /// by fingerprint-based comparisons.
    ///
    /// # Errors
    /// Returns `MemoryError` if the update fails.
    pub async fn clear_community_fingerprint(&self, id: i64) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "UPDATE graph_communities SET fingerprint = NULL WHERE id = ?"
        ))
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
793
794 pub async fn community_for_entity(
803 &self,
804 entity_id: i64,
805 ) -> Result<Option<Community>, MemoryError> {
806 let row: Option<CommunityRow> = zeph_db::query_as(
807 sql!("SELECT c.id, c.name, c.summary, c.entity_ids, c.fingerprint, c.created_at, c.updated_at
808 FROM graph_communities c, json_each(c.entity_ids) j
809 WHERE CAST(j.value AS INTEGER) = ?
810 LIMIT 1"),
811 )
812 .bind(entity_id)
813 .fetch_optional(&self.pool)
814 .await?;
815 match row {
816 Some(row) => {
817 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
818 Ok(Some(Community {
819 id: row.id,
820 name: row.name,
821 summary: row.summary,
822 entity_ids,
823 fingerprint: row.fingerprint,
824 created_at: row.created_at,
825 updated_at: row.updated_at,
826 }))
827 }
828 None => Ok(None),
829 }
830 }
831
    /// Loads every community ordered by id, parsing each stored JSON member
    /// list into `Vec<i64>`.
    ///
    /// # Errors
    /// Database errors, or a JSON error if any `entity_ids` column fails to
    /// parse.
    pub async fn all_communities(&self) -> Result<Vec<Community>, MemoryError> {
        let rows: Vec<CommunityRow> = zeph_db::query_as(sql!(
            "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
             FROM graph_communities
             ORDER BY id ASC"
        ))
        .fetch_all(&self.pool)
        .await?;

        rows.into_iter()
            .map(|row| {
                let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
                Ok(Community {
                    id: row.id,
                    name: row.name,
                    summary: row.summary,
                    entity_ids,
                    fingerprint: row.fingerprint,
                    created_at: row.created_at,
                    updated_at: row.updated_at,
                })
            })
            .collect()
    }
861
    /// Counts all communities.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn community_count(&self) -> Result<i64, MemoryError> {
        let count: i64 = zeph_db::query_scalar(sql!("SELECT COUNT(*) FROM graph_communities"))
            .fetch_one(&self.pool)
            .await?;
        Ok(count)
    }
873
    /// Reads one value from the graph metadata key/value table; `None` when
    /// the key is absent.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn get_metadata(&self, key: &str) -> Result<Option<String>, MemoryError> {
        let val: Option<String> =
            zeph_db::query_scalar(sql!("SELECT value FROM graph_metadata WHERE key = ?"))
                .bind(key)
                .fetch_optional(&self.pool)
                .await?;
        Ok(val)
    }
889
    /// Upserts a key/value pair in the graph metadata table.
    ///
    /// # Errors
    /// Returns `MemoryError` if the write fails.
    pub async fn set_metadata(&self, key: &str, value: &str) -> Result<(), MemoryError> {
        zeph_db::query(sql!(
            "INSERT INTO graph_metadata (key, value) VALUES (?, ?)
             ON CONFLICT(key) DO UPDATE SET value = excluded.value"
        ))
        .bind(key)
        .bind(value)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
906
907 pub async fn extraction_count(&self) -> Result<i64, MemoryError> {
915 let val = self.get_metadata("extraction_count").await?;
916 Ok(val.and_then(|v| v.parse::<i64>().ok()).unwrap_or(0))
917 }
918
    /// Streams every active edge ordered by id without buffering the whole
    /// edge table in memory.
    pub fn all_active_edges_stream(&self) -> impl Stream<Item = Result<Edge, MemoryError>> + '_ {
        use futures::StreamExt as _;
        zeph_db::query_as::<_, EdgeRow>(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL
             ORDER BY id ASC"
        ))
        .fetch(&self.pool)
        // edge_from_row is infallible here, so a plain map suffices (unlike
        // the entity stream, which uses and_then).
        .map(|r| r.map_err(MemoryError::from).map(edge_from_row))
    }
933
    /// Keyset pagination over active edges: up to `limit` edges with
    /// `id > after_id`, in ascending id order.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edges_after_id(
        &self,
        after_id: i64,
        limit: i64,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL AND id > ?
             ORDER BY id ASC
             LIMIT ?"
        ))
        .bind(after_id)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
970
971 pub async fn find_community_by_id(&self, id: i64) -> Result<Option<Community>, MemoryError> {
977 let row: Option<CommunityRow> = zeph_db::query_as(sql!(
978 "SELECT id, name, summary, entity_ids, fingerprint, created_at, updated_at
979 FROM graph_communities
980 WHERE id = ?"
981 ))
982 .bind(id)
983 .fetch_optional(&self.pool)
984 .await?;
985 match row {
986 Some(row) => {
987 let entity_ids: Vec<i64> = serde_json::from_str(&row.entity_ids)?;
988 Ok(Some(Community {
989 id: row.id,
990 name: row.name,
991 summary: row.summary,
992 entity_ids,
993 fingerprint: row.fingerprint,
994 created_at: row.created_at,
995 updated_at: row.updated_at,
996 }))
997 }
998 None => Ok(None),
999 }
1000 }
1001
    /// Removes every community row (e.g. before a full rebuild).
    ///
    /// # Errors
    /// Returns `MemoryError` if the delete fails.
    pub async fn delete_all_communities(&self) -> Result<(), MemoryError> {
        zeph_db::query(sql!("DELETE FROM graph_communities"))
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1013
1014 #[allow(clippy::too_many_lines)]
1028 pub async fn find_entities_ranked(
1029 &self,
1030 query: &str,
1031 limit: usize,
1032 ) -> Result<Vec<(Entity, f32)>, MemoryError> {
1033 type EntityFtsRow = (
1036 i64,
1037 String,
1038 String,
1039 String,
1040 Option<String>,
1041 String,
1042 String,
1043 Option<String>,
1044 f64,
1045 );
1046
1047 const FTS5_OPERATORS: &[&str] = &["AND", "OR", "NOT", "NEAR"];
1048 let query = &query[..query.floor_char_boundary(512)];
1049 let sanitized = sanitize_fts_query(query);
1050 if sanitized.is_empty() {
1051 return Ok(vec![]);
1052 }
1053 let fts_query: String = sanitized
1054 .split_whitespace()
1055 .filter(|t| !FTS5_OPERATORS.contains(t))
1056 .map(|t| format!("{t}*"))
1057 .collect::<Vec<_>>()
1058 .join(" ");
1059 if fts_query.is_empty() {
1060 return Ok(vec![]);
1061 }
1062
1063 let limit_i64 = i64::try_from(limit)?;
1064
1065 let ranked_fts_sql = format!(
1068 "SELECT * FROM ( \
1069 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1070 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1071 -bm25(graph_entities_fts, 10.0, 1.0) AS fts_rank \
1072 FROM graph_entities_fts fts \
1073 JOIN graph_entities e ON e.id = fts.rowid \
1074 WHERE graph_entities_fts MATCH ? \
1075 UNION ALL \
1076 SELECT e.id, e.name, e.canonical_name, e.entity_type, e.summary, \
1077 e.first_seen_at, e.last_seen_at, e.qdrant_point_id, \
1078 0.5 AS fts_rank \
1079 FROM graph_entity_aliases a \
1080 JOIN graph_entities e ON e.id = a.entity_id \
1081 WHERE a.alias_name LIKE ? ESCAPE '\\\\' {} \
1082 ) \
1083 ORDER BY fts_rank DESC \
1084 LIMIT ?",
1085 <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1086 );
1087 let rows: Vec<EntityFtsRow> = zeph_db::query_as(&ranked_fts_sql)
1088 .bind(&fts_query)
1089 .bind(format!(
1090 "%{}%",
1091 query
1092 .trim()
1093 .replace('\\', "\\\\")
1094 .replace('%', "\\%")
1095 .replace('_', "\\_")
1096 ))
1097 .bind(limit_i64)
1098 .fetch_all(&self.pool)
1099 .await?;
1100
1101 if rows.is_empty() {
1102 return Ok(vec![]);
1103 }
1104
1105 let max_score: f64 = rows.iter().map(|r| r.8).fold(0.0_f64, f64::max);
1107 let max_score = if max_score <= 0.0 { 1.0 } else { max_score };
1108
1109 let mut seen_ids: std::collections::HashSet<i64> = std::collections::HashSet::new();
1111 let mut result: Vec<(Entity, f32)> = Vec::with_capacity(rows.len());
1112 for (
1113 id,
1114 name,
1115 canonical_name,
1116 entity_type_str,
1117 summary,
1118 first_seen_at,
1119 last_seen_at,
1120 qdrant_point_id,
1121 raw_score,
1122 ) in rows
1123 {
1124 if !seen_ids.insert(id) {
1125 continue;
1126 }
1127 let entity_type = entity_type_str
1128 .parse()
1129 .unwrap_or(super::types::EntityType::Concept);
1130 let entity = Entity {
1131 id,
1132 name,
1133 canonical_name,
1134 entity_type,
1135 summary,
1136 first_seen_at,
1137 last_seen_at,
1138 qdrant_point_id,
1139 };
1140 #[allow(clippy::cast_possible_truncation)]
1141 let normalized = (raw_score / max_score).clamp(0.0, 1.0) as f32;
1142 result.push((entity, normalized));
1143 }
1144
1145 Ok(result)
1146 }
1147
    /// Computes a structural importance score in `[0.0, 1.0]` per entity:
    /// `0.6 * normalized degree + 0.4 * edge-type diversity`.
    ///
    /// Degree counts both incoming and outgoing active edges, normalized by
    /// the batch maximum; diversity is the distinct edge-type count divided
    /// by 4, capped at 1.0. Ids with no active edges score 0.0.
    ///
    /// # Errors
    /// Returns `MemoryError` if any batch query fails.
    pub async fn entity_structural_scores(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, f32>, MemoryError> {
        // Each id is bound three times, so 163 ids ≈ 489 parameters per
        // statement.
        const MAX_BATCH: usize = 163;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut all_rows: Vec<(i64, i64, i64)> = Vec::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let n = chunk.len();
            // Placeholder numbering: ids for the source scan, then the target
            // scan, then the outer filter.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);

            // NOTE(review): the derived table carries no alias — SQLite
            // accepts that, but Postgres normally requires one; confirm
            // against the postgres dialect path.
            let sql = format!(
                "SELECT entity_id,
                        COUNT(*) AS degree,
                        COUNT(DISTINCT edge_type) AS type_diversity
                 FROM (
                     SELECT source_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND source_entity_id IN ({ph1})
                     UNION ALL
                     SELECT target_entity_id AS entity_id, edge_type
                     FROM graph_edges
                     WHERE valid_to IS NULL AND target_entity_id IN ({ph2})
                 )
                 WHERE entity_id IN ({ph3})
                 GROUP BY entity_id"
            );

            let mut query = zeph_db::query_as::<_, (i64, i64, i64)>(&sql);
            // Bind order mirrors the placeholder numbering above.
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }
            for id in chunk {
                query = query.bind(*id);
            }

            let chunk_rows: Vec<(i64, i64, i64)> = query.fetch_all(&self.pool).await?;
            all_rows.extend(chunk_rows);
        }

        if all_rows.is_empty() {
            return Ok(entity_ids.iter().map(|&id| (id, 0.0_f32)).collect());
        }

        let max_degree = all_rows
            .iter()
            .map(|(_, d, _)| *d)
            .max()
            .unwrap_or(1)
            .max(1);

        // Start every requested id at 0.0 so absent ids still appear.
        let mut scores: HashMap<i64, f32> = entity_ids.iter().map(|&id| (id, 0.0_f32)).collect();
        for (entity_id, degree, type_diversity) in all_rows {
            #[allow(clippy::cast_precision_loss)]
            let norm_degree = degree as f32 / max_degree as f32;
            #[allow(clippy::cast_precision_loss)]
            let norm_diversity = (type_diversity as f32 / 4.0).min(1.0);
            let score = 0.6 * norm_degree + 0.4 * norm_diversity;
            scores.insert(entity_id, score);
        }

        Ok(scores)
    }
1234
    /// Maps entity ids to the id of a community containing them; ids in no
    /// community are simply absent from the result.
    ///
    /// The SQL comes from the `community_ids_sql` helper; rows are assumed to
    /// be `(entity_id, community_id)` pairs — verify against that helper.
    ///
    /// # Errors
    /// Returns `MemoryError` if any batch query fails.
    pub async fn entity_community_ids(
        &self,
        entity_ids: &[i64],
    ) -> Result<HashMap<i64, i64>, MemoryError> {
        // Chunked to stay under common bind-parameter limits.
        const MAX_BATCH: usize = 490;

        if entity_ids.is_empty() {
            return Ok(HashMap::new());
        }

        let mut result: HashMap<i64, i64> = HashMap::new();
        for chunk in entity_ids.chunks(MAX_BATCH) {
            let placeholders = placeholder_list(1, chunk.len());

            let community_sql = community_ids_sql(&placeholders);
            let mut query = zeph_db::query_as::<_, (i64, i64)>(&community_sql);
            for id in chunk {
                query = query.bind(*id);
            }

            let rows: Vec<(i64, i64)> = query.fetch_all(&self.pool).await?;
            result.extend(rows);
        }

        Ok(result)
    }
1269
    /// Increments `retrieval_count` and stamps `last_retrieved_at` (dialect
    /// epoch expression) for each listed edge, in batches.
    ///
    /// # Errors
    /// Returns `MemoryError` if any batch update fails.
    pub async fn record_edge_retrieval(&self, edge_ids: &[i64]) -> Result<(), MemoryError> {
        // Chunked to stay under common bind-parameter limits.
        const MAX_BATCH: usize = 490;
        let epoch_now = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        for chunk in edge_ids.chunks(MAX_BATCH) {
            let edge_placeholders = placeholder_list(1, chunk.len());
            let retrieval_sql = format!(
                "UPDATE graph_edges \
                 SET retrieval_count = retrieval_count + 1, \
                     last_retrieved_at = {epoch_now} \
                 WHERE id IN ({edge_placeholders})"
            );
            let mut q = zeph_db::query(&retrieval_sql);
            for id in chunk {
                q = q.bind(*id);
            }
            q.execute(&self.pool).await?;
        }
        Ok(())
    }
1297
    /// Applies multiplicative decay (`retrieval_count * decay_lambda`,
    /// truncated to an integer, floored at 0) to active edges not retrieved
    /// within the last `interval_secs` seconds; returns the rows touched.
    ///
    /// # Errors
    /// Database errors, or a conversion error on the affected-row count.
    pub async fn decay_edge_retrieval_counts(
        &self,
        decay_lambda: f64,
        interval_secs: u64,
    ) -> Result<usize, MemoryError> {
        let epoch_now_decay = <ActiveDialect as zeph_db::dialect::Dialect>::EPOCH_NOW;
        let decay_raw = format!(
            "UPDATE graph_edges \
             SET retrieval_count = MAX(CAST(retrieval_count * ? AS INTEGER), 0) \
             WHERE valid_to IS NULL \
               AND retrieval_count > 0 \
               AND (last_retrieved_at IS NULL OR last_retrieved_at < {epoch_now_decay} - ?)"
        );
        // Built with format!, not sql!, so `?` markers must be rewritten for
        // the active dialect explicitly.
        let decay_sql = zeph_db::rewrite_placeholders(&decay_raw);
        let result = zeph_db::query(&decay_sql)
            .bind(decay_lambda)
            // Saturate rather than fail on intervals that overflow i64.
            .bind(i64::try_from(interval_secs).unwrap_or(i64::MAX))
            .execute(&self.pool)
            .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1327
    /// Permanently deletes edges whose `expired_at` is older than the
    /// retention window, returning the number of rows removed.
    ///
    /// NOTE(review): `datetime('now', ...)` is SQLite syntax — confirm the
    /// `sql!` macro translates it for the Postgres dialect.
    ///
    /// # Errors
    /// Database errors, or a conversion error on the affected-row count.
    pub async fn delete_expired_edges(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_edges
             WHERE expired_at IS NOT NULL
               AND expired_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1345
    /// Deletes entities that participate in no active edge and have not been
    /// seen within the retention window; returns the rows removed.
    ///
    /// NOTE(review): `datetime('now', ...)` is SQLite syntax — confirm the
    /// `sql!` macro translates it for the Postgres dialect.
    ///
    /// # Errors
    /// Database errors, or a conversion error on the affected-row count.
    pub async fn delete_orphan_entities(&self, retention_days: u32) -> Result<usize, MemoryError> {
        let days = i64::from(retention_days);
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id NOT IN (
                 SELECT DISTINCT source_entity_id FROM graph_edges WHERE valid_to IS NULL
                 UNION
                 SELECT DISTINCT target_entity_id FROM graph_edges WHERE valid_to IS NULL
             )
             AND last_seen_at < datetime('now', '-' || ? || ' days')"
        ))
        .bind(days)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1367
    /// Enforces a hard cap on the entity table, deleting the least-connected
    /// and then least-recently-seen entities first; returns how many were
    /// removed (0 when already under the cap).
    ///
    /// The count and the delete are separate statements, so a concurrent
    /// writer could change the excess between them.
    ///
    /// # Errors
    /// Database errors, or conversion errors on `max_entities` / row count.
    pub async fn cap_entities(&self, max_entities: usize) -> Result<usize, MemoryError> {
        let current = self.entity_count().await?;
        let max = i64::try_from(max_entities)?;
        if current <= max {
            return Ok(0);
        }
        let excess = current - max;
        let result = zeph_db::query(sql!(
            "DELETE FROM graph_entities
             WHERE id IN (
                 SELECT e.id
                 FROM graph_entities e
                 LEFT JOIN (
                     SELECT source_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY source_entity_id
                     UNION ALL
                     SELECT target_entity_id AS eid, COUNT(*) AS cnt
                     FROM graph_edges WHERE valid_to IS NULL GROUP BY target_entity_id
                 ) edge_counts ON e.id = edge_counts.eid
                 ORDER BY COALESCE(edge_counts.cnt, 0) ASC, e.last_seen_at ASC
                 LIMIT ?
             )"
        ))
        .bind(excess)
        .execute(&self.pool)
        .await?;
        Ok(usize::try_from(result.rows_affected())?)
    }
1404
    /// Point-in-time view of an entity's edges: edges whose validity window
    /// covers `timestamp`, i.e. `valid_from <= timestamp` and either still
    /// open (`valid_to IS NULL`) or closed strictly after it.
    ///
    /// NOTE(review): `timestamp` is compared against stored values as text —
    /// it must use the same format the database writes.
    ///
    /// # Errors
    /// Returns `MemoryError` if the query fails.
    pub async fn edges_at_timestamp(
        &self,
        entity_id: i64,
        timestamp: &str,
    ) -> Result<Vec<Edge>, MemoryError> {
        let rows: Vec<EdgeRow> = zeph_db::query_as(sql!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NULL
               AND valid_from <= ?
               AND (source_entity_id = ? OR target_entity_id = ?)
             UNION ALL
             SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE valid_to IS NOT NULL
               AND valid_from <= ?
               AND valid_to > ?
               AND (source_entity_id = ? OR target_entity_id = ?)"
        ))
        // Bind order follows placeholder order across both UNION arms.
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .bind(timestamp)
        .bind(timestamp)
        .bind(entity_id)
        .bind(entity_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1455
    /// History of edges (open and closed) from `source_entity_id` whose `fact`
    /// text contains `predicate`, optionally restricted to an exact `relation`,
    /// newest `valid_from` first, capped at `limit` rows.
    ///
    /// # Errors
    /// Propagates database errors and `limit` conversion failure.
    pub async fn edge_history(
        &self,
        source_entity_id: i64,
        predicate: &str,
        relation: Option<&str>,
        limit: usize,
    ) -> Result<Vec<Edge>, MemoryError> {
        // Escape LIKE wildcards so `predicate` is matched literally.
        // Backslash must be escaped FIRST, or the escapes added for % and _
        // would themselves be doubled.
        let escaped = predicate
            .replace('\\', "\\\\")
            .replace('%', "\\%")
            .replace('_', "\\_");
        let like_pattern = format!("%{escaped}%");
        let limit = i64::try_from(limit)?;
        // Two query arms rather than dynamic SQL so the `sql!` macro sees a
        // fixed statement in each branch; they differ only by the relation filter.
        let rows: Vec<EdgeRow> = if let Some(rel) = relation {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                   AND relation = ?
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(rel)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        } else {
            zeph_db::query_as(sql!(
                "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                        valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                        edge_type, retrieval_count, last_retrieved_at
                 FROM graph_edges
                 WHERE source_entity_id = ?
                   AND fact LIKE ? ESCAPE '\\'
                 ORDER BY valid_from DESC
                 LIMIT ?"
            ))
            .bind(source_entity_id)
            .bind(&like_pattern)
            .bind(limit)
            .fetch_all(&self.pool)
            .await?
        };
        Ok(rows.into_iter().map(edge_from_row).collect())
    }
1515
1516 pub async fn bfs(
1533 &self,
1534 start_entity_id: i64,
1535 max_hops: u32,
1536 ) -> Result<(Vec<Entity>, Vec<Edge>), MemoryError> {
1537 self.bfs_with_depth(start_entity_id, max_hops)
1538 .await
1539 .map(|(e, ed, _)| (e, ed))
1540 }
1541
1542 pub async fn bfs_with_depth(
1553 &self,
1554 start_entity_id: i64,
1555 max_hops: u32,
1556 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1557 self.bfs_core(start_entity_id, max_hops, None).await
1558 }
1559
1560 pub async fn bfs_at_timestamp(
1571 &self,
1572 start_entity_id: i64,
1573 max_hops: u32,
1574 timestamp: &str,
1575 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1576 self.bfs_core(start_entity_id, max_hops, Some(timestamp))
1577 .await
1578 }
1579
1580 pub async fn bfs_typed(
1596 &self,
1597 start_entity_id: i64,
1598 max_hops: u32,
1599 edge_types: &[EdgeType],
1600 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1601 if edge_types.is_empty() {
1602 return self.bfs_with_depth(start_entity_id, max_hops).await;
1603 }
1604 self.bfs_core_typed(start_entity_id, max_hops, None, edge_types)
1605 .await
1606 }
1607
    /// Shared BFS driver: expands the frontier one hop at a time via SQL,
    /// recording the first hop at which each entity is seen, then loads the
    /// induced subgraph via `bfs_fetch_results`.
    ///
    /// `at_timestamp = None` traverses only open edges (`valid_to IS NULL`);
    /// `Some(ts)` traverses edges valid at `ts`.
    async fn bfs_core(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps the per-hop frontier to bound query size / bind count.
        const MAX_FRONTIER: usize = 300;

        // entity id -> first hop at which it was reached (start = 0).
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);
            let n = frontier.len();
            // Numbered placeholder layout: [1..n] frontier for the CASE,
            // [n+1..2n] and [2n+1..3n] frontier for the WHERE, then the
            // optional timestamp at 3n+1. Bind order below must match.
            let ph1 = placeholder_list(1, n);
            let ph2 = placeholder_list(n + 1, n);
            let ph3 = placeholder_list(n * 2 + 1, n);
            let edge_filter = if at_timestamp.is_some() {
                let ts_pos = n * 3 + 1;
                format!(
                    "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                "valid_to IS NULL".to_owned()
            };
            // For each matching edge, select the endpoint that is NOT in the
            // frontier (edges are traversed in both directions).
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({ph1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                  AND (source_entity_id IN ({ph2}) OR target_entity_id IN ({ph3}))"
            );
            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Three copies of the frontier ids (ph1, ph2, ph3), then the timestamp.
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }
            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // Only entities never seen before enter the next frontier,
                // so each id keeps the depth of its FIRST discovery.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results(depth_map, at_timestamp).await
    }
1687
    /// Variant of `bfs_core` that restricts traversal to edges whose
    /// `edge_type` is in `edge_types` (callers guarantee it is non-empty).
    ///
    /// Placeholder layout per query: [1..n_types] edge types, then three
    /// copies of the frontier id list, then the optional timestamp.
    async fn bfs_core_typed(
        &self,
        start_entity_id: i64,
        max_hops: u32,
        at_timestamp: Option<&str>,
        edge_types: &[EdgeType],
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        use std::collections::HashMap;

        // Caps the per-hop frontier to bound query size / bind count.
        const MAX_FRONTIER: usize = 300;

        let type_strs: Vec<&str> = edge_types.iter().map(|t| t.as_str()).collect();

        // entity id -> first hop at which it was reached (start = 0).
        let mut depth_map: HashMap<i64, u32> = HashMap::new();
        let mut frontier: Vec<i64> = vec![start_entity_id];
        depth_map.insert(start_entity_id, 0);

        // Type placeholders occupy positions 1..=n_types; frontier ids start after.
        let n_types = type_strs.len();
        let type_in = placeholder_list(1, n_types);
        let id_start = n_types + 1;

        for hop in 0..max_hops {
            if frontier.is_empty() {
                break;
            }
            frontier.truncate(MAX_FRONTIER);

            let n_frontier = frontier.len();
            let fp1 = placeholder_list(id_start, n_frontier);
            let fp2 = placeholder_list(id_start + n_frontier, n_frontier);
            let fp3 = placeholder_list(id_start + n_frontier * 2, n_frontier);

            let edge_filter = if at_timestamp.is_some() {
                // Timestamp comes after the type list and all three id lists.
                let ts_pos = id_start + n_frontier * 3;
                format!(
                    "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                    ts = numbered_placeholder(ts_pos),
                )
            } else {
                format!("edge_type IN ({type_in}) AND valid_to IS NULL")
            };

            // For each matching edge, select the endpoint NOT in the frontier.
            let neighbour_sql = format!(
                "SELECT DISTINCT CASE
                    WHEN source_entity_id IN ({fp1}) THEN target_entity_id
                    ELSE source_entity_id
                END as neighbour_id
                FROM graph_edges
                WHERE {edge_filter}
                  AND (source_entity_id IN ({fp2}) OR target_entity_id IN ({fp3}))"
            );

            let mut q = zeph_db::query_scalar::<_, i64>(&neighbour_sql);
            // Bind order must mirror the placeholder layout above:
            // types, then frontier x3, then timestamp.
            for t in &type_strs {
                q = q.bind(*t);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            for id in &frontier {
                q = q.bind(*id);
            }
            if let Some(ts) = at_timestamp {
                q = q.bind(ts);
            }

            let neighbours: Vec<i64> = q.fetch_all(&self.pool).await?;
            let mut next_frontier: Vec<i64> = Vec::new();
            for nbr in neighbours {
                // First discovery wins: each id keeps its minimal hop depth.
                if let std::collections::hash_map::Entry::Vacant(e) = depth_map.entry(nbr) {
                    e.insert(hop + 1);
                    next_frontier.push(nbr);
                }
            }
            frontier = next_frontier;
        }

        self.bfs_fetch_results_typed(depth_map, at_timestamp, &type_strs)
            .await
    }
1784
1785 async fn bfs_fetch_results_typed(
1793 &self,
1794 depth_map: std::collections::HashMap<i64, u32>,
1795 at_timestamp: Option<&str>,
1796 type_strs: &[&str],
1797 ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
1798 let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
1799 if visited_ids.is_empty() {
1800 return Ok((Vec::new(), Vec::new(), depth_map));
1801 }
1802 if visited_ids.len() > 499 {
1803 tracing::warn!(
1804 total = visited_ids.len(),
1805 retained = 499,
1806 "bfs_fetch_results_typed: visited entity set truncated to 499"
1807 );
1808 visited_ids.truncate(499);
1809 }
1810
1811 let n_types = type_strs.len();
1812 let n_visited = visited_ids.len();
1813
1814 let type_in = placeholder_list(1, n_types);
1816 let id_start = n_types + 1;
1817 let ph_ids1 = placeholder_list(id_start, n_visited);
1818 let ph_ids2 = placeholder_list(id_start + n_visited, n_visited);
1819
1820 let edge_filter = if at_timestamp.is_some() {
1821 let ts_pos = id_start + n_visited * 2;
1822 format!(
1823 "edge_type IN ({type_in}) AND valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
1824 ts = numbered_placeholder(ts_pos),
1825 )
1826 } else {
1827 format!("edge_type IN ({type_in}) AND valid_to IS NULL")
1828 };
1829
1830 let edge_sql = format!(
1831 "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
1832 valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
1833 edge_type, retrieval_count, last_retrieved_at
1834 FROM graph_edges
1835 WHERE {edge_filter}
1836 AND source_entity_id IN ({ph_ids1})
1837 AND target_entity_id IN ({ph_ids2})"
1838 );
1839 let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
1840 for t in type_strs {
1841 edge_query = edge_query.bind(*t);
1842 }
1843 for id in &visited_ids {
1844 edge_query = edge_query.bind(*id);
1845 }
1846 for id in &visited_ids {
1847 edge_query = edge_query.bind(*id);
1848 }
1849 if let Some(ts) = at_timestamp {
1850 edge_query = edge_query.bind(ts);
1851 }
1852 let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;
1853
1854 let entity_sql2 = format!(
1856 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
1857 FROM graph_entities WHERE id IN ({ph})",
1858 ph = placeholder_list(1, visited_ids.len()),
1859 );
1860 let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql2);
1861 for id in &visited_ids {
1862 entity_query = entity_query.bind(*id);
1863 }
1864 let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;
1865
1866 let entities: Vec<Entity> = entity_rows
1867 .into_iter()
1868 .map(entity_from_row)
1869 .collect::<Result<Vec<_>, _>>()?;
1870 let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();
1871
1872 Ok((entities, edges, depth_map))
1873 }
1874
    /// Load the subgraph induced by a BFS-visited entity set: the entities
    /// themselves plus every edge whose BOTH endpoints are visited (filtered
    /// to edges valid at `at_timestamp` when given, otherwise open edges).
    /// The depth map is passed through unchanged.
    async fn bfs_fetch_results(
        &self,
        depth_map: std::collections::HashMap<i64, u32>,
        at_timestamp: Option<&str>,
    ) -> Result<(Vec<Entity>, Vec<Edge>, std::collections::HashMap<i64, u32>), MemoryError> {
        let mut visited_ids: Vec<i64> = depth_map.keys().copied().collect();
        if visited_ids.is_empty() {
            return Ok((Vec::new(), Vec::new(), depth_map));
        }
        // The edge query binds the id list twice plus an optional timestamp;
        // 2 * 499 + 1 = 999 is exactly SQLite's historical bind-variable limit.
        if visited_ids.len() > 499 {
            tracing::warn!(
                total = visited_ids.len(),
                retained = 499,
                "bfs_fetch_results: visited entity set truncated to 499 to stay within SQLite bind limit; \
                 some reachable entities will be dropped from results"
            );
            visited_ids.truncate(499);
        }

        // Placeholder layout: [1..n] ids, [n+1..2n] ids, timestamp at 2n+1.
        let n = visited_ids.len();
        let ph_ids1 = placeholder_list(1, n);
        let ph_ids2 = placeholder_list(n + 1, n);
        let edge_filter = if at_timestamp.is_some() {
            let ts_pos = n * 2 + 1;
            format!(
                "valid_from <= {ts} AND (valid_to IS NULL OR valid_to > {ts})",
                ts = numbered_placeholder(ts_pos),
            )
        } else {
            "valid_to IS NULL".to_owned()
        };
        // Both endpoints must be visited, so only intra-subgraph edges load.
        let edge_sql = format!(
            "SELECT id, source_entity_id, target_entity_id, relation, fact, confidence,
                    valid_from, valid_to, created_at, expired_at, episode_id, qdrant_point_id,
                    edge_type, retrieval_count, last_retrieved_at
             FROM graph_edges
             WHERE {edge_filter}
               AND source_entity_id IN ({ph_ids1})
               AND target_entity_id IN ({ph_ids2})"
        );
        let mut edge_query = zeph_db::query_as::<_, EdgeRow>(&edge_sql);
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        for id in &visited_ids {
            edge_query = edge_query.bind(*id);
        }
        if let Some(ts) = at_timestamp {
            edge_query = edge_query.bind(ts);
        }
        let edge_rows: Vec<EdgeRow> = edge_query.fetch_all(&self.pool).await?;

        let entity_sql = format!(
            "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id
             FROM graph_entities WHERE id IN ({ph})",
            ph = placeholder_list(1, n),
        );
        let mut entity_query = zeph_db::query_as::<_, EntityRow>(&entity_sql);
        for id in &visited_ids {
            entity_query = entity_query.bind(*id);
        }
        let entity_rows: Vec<EntityRow> = entity_query.fetch_all(&self.pool).await?;

        let entities: Vec<Entity> = entity_rows
            .into_iter()
            .map(entity_from_row)
            .collect::<Result<Vec<_>, _>>()?;
        let edges: Vec<Edge> = edge_rows.into_iter().map(edge_from_row).collect();

        Ok((entities, edges, depth_map))
    }
1948
1949 pub async fn find_entity_by_name(&self, name: &str) -> Result<Vec<Entity>, MemoryError> {
1965 let find_by_name_sql = format!(
1966 "SELECT id, name, canonical_name, entity_type, summary, first_seen_at, last_seen_at, qdrant_point_id \
1967 FROM graph_entities \
1968 WHERE name = ? {cn} OR canonical_name = ? {cn} \
1969 LIMIT 5",
1970 cn = <ActiveDialect as zeph_db::dialect::Dialect>::COLLATE_NOCASE,
1971 );
1972 let rows: Vec<EntityRow> = zeph_db::query_as(&find_by_name_sql)
1973 .bind(name)
1974 .bind(name)
1975 .fetch_all(&self.pool)
1976 .await?;
1977
1978 if !rows.is_empty() {
1979 return rows.into_iter().map(entity_from_row).collect();
1980 }
1981
1982 self.find_entities_fuzzy(name, 5).await
1983 }
1984
1985 pub async fn unprocessed_messages_for_backfill(
1993 &self,
1994 limit: usize,
1995 ) -> Result<Vec<(crate::types::MessageId, String)>, MemoryError> {
1996 let limit = i64::try_from(limit)?;
1997 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
1998 "SELECT id, content FROM messages
1999 WHERE graph_processed = 0
2000 ORDER BY id ASC
2001 LIMIT ?"
2002 ))
2003 .bind(limit)
2004 .fetch_all(&self.pool)
2005 .await?;
2006 Ok(rows
2007 .into_iter()
2008 .map(|(id, content)| (crate::types::MessageId(id), content))
2009 .collect())
2010 }
2011
2012 pub async fn unprocessed_message_count(&self) -> Result<i64, MemoryError> {
2018 let count: i64 = zeph_db::query_scalar(sql!(
2019 "SELECT COUNT(*) FROM messages WHERE graph_processed = 0"
2020 ))
2021 .fetch_one(&self.pool)
2022 .await?;
2023 Ok(count)
2024 }
2025
2026 pub async fn mark_messages_graph_processed(
2032 &self,
2033 ids: &[crate::types::MessageId],
2034 ) -> Result<(), MemoryError> {
2035 const MAX_BATCH: usize = 490;
2036 if ids.is_empty() {
2037 return Ok(());
2038 }
2039 for chunk in ids.chunks(MAX_BATCH) {
2040 let placeholders = placeholder_list(1, chunk.len());
2041 let sql =
2042 format!("UPDATE messages SET graph_processed = 1 WHERE id IN ({placeholders})");
2043 let mut query = zeph_db::query(&sql);
2044 for id in chunk {
2045 query = query.bind(id.0);
2046 }
2047 query.execute(&self.pool).await?;
2048 }
2049 Ok(())
2050 }
2051}
2052
2053#[cfg(feature = "sqlite")]
2056fn community_ids_sql(placeholders: &str) -> String {
2057 format!(
2058 "SELECT CAST(j.value AS INTEGER) AS entity_id, c.id AS community_id
2059 FROM graph_communities c, json_each(c.entity_ids) j
2060 WHERE CAST(j.value AS INTEGER) IN ({placeholders})"
2061 )
2062}
2063
2064#[cfg(feature = "postgres")]
2065fn community_ids_sql(placeholders: &str) -> String {
2066 format!(
2067 "SELECT (j.value)::bigint AS entity_id, c.id AS community_id
2068 FROM graph_communities c,
2069 jsonb_array_elements_text(c.entity_ids::jsonb) j(value)
2070 WHERE (j.value)::bigint IN ({placeholders})"
2071 )
2072}
2073
/// Raw row shape for `graph_entities` SELECTs; converted to the domain
/// `Entity` by `entity_from_row`.
#[derive(zeph_db::FromRow)]
struct EntityRow {
    id: i64,
    // Surface form (upserts overwrite it with the latest mention).
    name: String,
    // Normalized form; part of the upsert conflict key with entity_type.
    canonical_name: String,
    // Stored as text; parsed into EntityType during conversion.
    entity_type: String,
    summary: Option<String>,
    first_seen_at: String,
    last_seen_at: String,
    // Presumably a reference into the Qdrant vector store — set when indexed.
    qdrant_point_id: Option<String>,
}
2087
2088fn entity_from_row(row: EntityRow) -> Result<Entity, MemoryError> {
2089 let entity_type = row
2090 .entity_type
2091 .parse::<EntityType>()
2092 .map_err(MemoryError::GraphStore)?;
2093 Ok(Entity {
2094 id: row.id,
2095 name: row.name,
2096 canonical_name: row.canonical_name,
2097 entity_type,
2098 summary: row.summary,
2099 first_seen_at: row.first_seen_at,
2100 last_seen_at: row.last_seen_at,
2101 qdrant_point_id: row.qdrant_point_id,
2102 })
2103}
2104
/// Raw row shape for entity-alias SELECTs; converted to the domain
/// `EntityAlias` by `alias_from_row`.
#[derive(zeph_db::FromRow)]
struct AliasRow {
    id: i64,
    // Owning entity in graph_entities.
    entity_id: i64,
    alias_name: String,
    created_at: String,
}
2112
2113fn alias_from_row(row: AliasRow) -> EntityAlias {
2114 EntityAlias {
2115 id: row.id,
2116 entity_id: row.entity_id,
2117 alias_name: row.alias_name,
2118 created_at: row.created_at,
2119 }
2120}
2121
/// Raw row shape for `graph_edges` SELECTs; converted to the domain `Edge`
/// by `edge_from_row`.
#[derive(zeph_db::FromRow)]
struct EdgeRow {
    id: i64,
    source_entity_id: i64,
    target_entity_id: i64,
    relation: String,
    fact: String,
    // Stored as f64; narrowed to f32 during conversion.
    confidence: f64,
    // Validity interval: an open edge has valid_to = NULL (see the
    // "valid_to IS NULL" filters in the queries above).
    valid_from: String,
    valid_to: Option<String>,
    created_at: String,
    expired_at: Option<String>,
    // Source message id, when known; wrapped into MessageId on conversion.
    episode_id: Option<i64>,
    qdrant_point_id: Option<String>,
    // Stored as text; parsed into EdgeType during conversion.
    edge_type: String,
    retrieval_count: i32,
    last_retrieved_at: Option<i64>,
}
2140
2141fn edge_from_row(row: EdgeRow) -> Edge {
2142 let edge_type = row
2143 .edge_type
2144 .parse::<EdgeType>()
2145 .unwrap_or(EdgeType::Semantic);
2146 Edge {
2147 id: row.id,
2148 source_entity_id: row.source_entity_id,
2149 target_entity_id: row.target_entity_id,
2150 relation: row.relation,
2151 fact: row.fact,
2152 #[allow(clippy::cast_possible_truncation)]
2153 confidence: row.confidence as f32,
2154 valid_from: row.valid_from,
2155 valid_to: row.valid_to,
2156 created_at: row.created_at,
2157 expired_at: row.expired_at,
2158 episode_id: row.episode_id.map(MessageId),
2159 qdrant_point_id: row.qdrant_point_id,
2160 edge_type,
2161 retrieval_count: row.retrieval_count,
2162 last_retrieved_at: row.last_retrieved_at,
2163 }
2164}
2165
/// Raw row shape for `graph_communities` SELECTs.
#[derive(zeph_db::FromRow)]
struct CommunityRow {
    id: i64,
    name: String,
    summary: String,
    // JSON array of member entity ids, expanded by `community_ids_sql`.
    entity_ids: String,
    fingerprint: Option<String>,
    created_at: String,
    updated_at: String,
}
2176
2177#[cfg(test)]
2180mod tests;