1use anyhow::{Result, bail};
2use serde::{Deserialize, Serialize};
3use std::cell::RefCell;
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5
/// Schema version for the SQLite-backed graph store.
///
/// NOTE(review): not referenced in this part of the file; presumably compared against a
/// version recorded in the database to decide whether a migration is needed — confirm.
pub const SQLITE_GRAPH_SCHEMA_VERSION: i64 = 5;
7
/// Records where a graph node or edge came from.
///
/// Nodes and edges carry a list of these (`GraphNode::provenance`, `GraphEdge::provenance`).
/// Field order and serde attributes define the serialized form.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphProvenance {
    /// Name of the producing source (the tests in this file use `"fixture"`).
    pub source: String,
    /// Location inside the source. The tests use a `path:line` form; no format is enforced here.
    pub source_ref: String,
    /// Optional hash of the source content; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_hash: Option<String>,
}
15
16impl GraphProvenance {
17 pub fn new(source: impl Into<String>, source_ref: impl Into<String>) -> Self {
18 Self {
19 source: source.into(),
20 source_ref: source_ref.into(),
21 content_hash: None,
22 }
23 }
24
25 pub fn with_content_hash(mut self, content_hash: impl Into<String>) -> Self {
26 self.content_hash = Some(content_hash.into());
27 self
28 }
29}
30
/// Optional freshness markers for a graph node or edge.
///
/// Both fields are omitted from serialized output when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphFreshness {
    /// Hash of the content the record was derived from.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_hash: Option<String>,
    /// Observation time. Presumably seconds since the Unix epoch (inferred from the name —
    /// confirm).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observed_at_unix: Option<i64>,
}
38
39impl GraphFreshness {
40 pub fn content_hash(content_hash: impl Into<String>) -> Self {
41 Self {
42 content_hash: Some(content_hash.into()),
43 observed_at_unix: None,
44 }
45 }
46}
47
/// A node in the graph.
///
/// On output, an empty `properties`, an empty `provenance` and a `None` `freshness` are
/// omitted. `#[serde(default)]` lets the two collections be absent on input.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphNode {
    /// Unique node id. Edges reference nodes by this value (`GraphEdge::from_id` / `to_id`).
    pub id: String,
    /// Node type, queried by `GraphStore::nodes_by_kind`.
    pub kind: String,
    /// Display label. Also used as a sort tiebreak and matched by
    /// `GraphStore::resolve_evidence_target`.
    pub label: String,
    /// Free-form string properties; `GraphPropertyFilter` matches these exactly.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Sources this node was derived from.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness marker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
60
61impl GraphNode {
62 pub fn new(id: impl Into<String>, kind: impl Into<String>, label: impl Into<String>) -> Self {
63 Self {
64 id: id.into(),
65 kind: kind.into(),
66 label: label.into(),
67 properties: BTreeMap::new(),
68 provenance: Vec::new(),
69 freshness: None,
70 }
71 }
72
73 pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
74 self.properties.insert(key.into(), value.into());
75 self
76 }
77
78 pub fn with_provenance(mut self, provenance: GraphProvenance) -> Self {
79 self.provenance.push(provenance);
80 self
81 }
82
83 pub fn with_freshness(mut self, freshness: GraphFreshness) -> Self {
84 self.freshness = Some(freshness);
85 self
86 }
87}
88
/// A directed, typed edge `from_id --kind--> to_id`.
///
/// On output, an empty `properties`, an empty `provenance` and a `None` `freshness` are
/// omitted. `id` is always serialized.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphEdge {
    /// Edge id. `GraphEdge::new` sets it to `stable_graph_edge_id(from_id, to_id, kind)`.
    /// When absent on input it defaults to an empty string, and `graph_edge_id` then falls
    /// back to the stable id.
    #[serde(default)]
    pub id: String,
    /// Id of the source node.
    pub from_id: String,
    /// Id of the target node.
    pub to_id: String,
    /// Edge type.
    pub kind: String,
    /// Free-form string properties; `GraphPropertyFilter` matches these exactly.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Sources this edge was derived from.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness marker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
103
104impl GraphEdge {
105 pub fn stable_id(from_id: &str, to_id: &str, kind: &str) -> String {
106 stable_graph_edge_id(from_id, to_id, kind)
107 }
108
109 pub fn new(
110 from_id: impl Into<String>,
111 to_id: impl Into<String>,
112 kind: impl Into<String>,
113 ) -> Self {
114 let from_id = from_id.into();
115 let to_id = to_id.into();
116 let kind = kind.into();
117 Self {
118 id: stable_graph_edge_id(&from_id, &to_id, &kind),
119 from_id,
120 to_id,
121 kind,
122 properties: BTreeMap::new(),
123 provenance: Vec::new(),
124 freshness: None,
125 }
126 }
127
128 pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
129 self.properties.insert(key.into(), value.into());
130 self
131 }
132
133 pub fn with_provenance(mut self, provenance: GraphProvenance) -> Self {
134 self.provenance.push(provenance);
135 self
136 }
137
138 pub fn with_freshness(mut self, freshness: GraphFreshness) -> Self {
139 self.freshness = Some(freshness);
140 self
141 }
142}
143
144pub fn stable_graph_edge_id(from_id: &str, to_id: &str, kind: &str) -> String {
145 let raw = serde_json::json!([from_id, kind, to_id]).to_string();
146 format!("edge:{}", blake3::hash(raw.as_bytes()).to_hex())
147}
148
149pub fn graph_edge_id(edge: &GraphEdge) -> String {
150 if edge.id.is_empty() {
151 stable_graph_edge_id(&edge.from_id, &edge.to_id, &edge.kind)
152 } else {
153 edge.id.clone()
154 }
155}
156
/// A batch of nodes and edges to be written into a [`GraphStore`] or converted into Convex
/// rows.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct GraphProjection {
    /// Nodes in the batch.
    pub nodes: Vec<GraphNode>,
    /// Edges in the batch. Stores that validate endpoints (e.g. `ConvexGraphStore`) need the
    /// referenced nodes to exist when an edge is written.
    pub edges: Vec<GraphEdge>,
}
162
163impl GraphProjection {
164 pub fn upsert_into<S: GraphStore + ?Sized>(&self, store: &S) -> Result<()> {
165 for node in &self.nodes {
166 store.upsert_node(node)?;
167 }
168 for edge in &self.edges {
169 store.upsert_edge(edge)?;
170 }
171 Ok(())
172 }
173
174 pub fn to_convex_rows(&self) -> ConvexProjectionRows {
175 ConvexProjectionRows::from(self)
176 }
177}
178
/// A path through the graph, given as node ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphPath {
    /// Node ids from the start to the end of the path, both endpoints included.
    pub nodes: Vec<String>,
    /// Number of edges traversed. Paths built in this file set it to `nodes.len() - 1`.
    pub hops: usize,
}

/// A set of nodes plus the edges among them, as returned by `GraphStore::neighborhood`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphSubgraph {
    /// Nodes in the subgraph.
    pub nodes: Vec<GraphNode>,
    /// Edges in the subgraph.
    pub edges: Vec<GraphEdge>,
}
190
191impl GraphSubgraph {
192 pub fn sorted(mut self) -> Self {
193 self.nodes.sort_by(|left, right| left.id.cmp(&right.id));
194 self.edges.sort_by(|left, right| {
195 left.from_id
196 .cmp(&right.from_id)
197 .then(left.kind.cmp(&right.kind))
198 .then(left.to_id.cmp(&right.to_id))
199 .then_with(|| graph_edge_id(left).cmp(&graph_edge_id(right)))
200 });
201 self
202 }
203}
204
/// Exact-match property constraint: the property `key` must be present and equal to `value`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphPropertyFilter {
    /// Property name to look up.
    pub key: String,
    /// Required value.
    pub value: String,
}

/// Paging and filtering options accepted by the paged query helpers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphQueryOptions {
    /// Exclusive cursor: only items whose id sorts strictly after it are returned (node ids for
    /// node pages, edge ids for edge pages).
    pub cursor: Option<String>,
    /// Maximum number of items per page; `None` means no limit.
    pub limit: Option<usize>,
    /// Property filters. All must match (logical AND); an empty list matches everything.
    pub property_filters: Vec<GraphPropertyFilter>,
}

/// Metadata describing one page of query results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphQueryPage {
    /// The cursor the page was requested with.
    pub cursor: Option<String>,
    /// The limit the page was requested with.
    pub limit: Option<usize>,
    /// Exclusive cursor for the following page. The paging helpers set it only when `limit`
    /// truncated the results.
    pub next_cursor: Option<String>,
    /// Number of nodes in the page.
    pub returned_nodes: usize,
    /// Number of edges in the page.
    pub returned_edges: usize,
    /// Whether more items existed past `limit`.
    pub truncated: bool,
    /// Human-readable notes about filtering, cursors and truncation.
    pub diagnostics: Vec<String>,
}

/// One page of nodes and edges together with its paging metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphPagedSubgraph {
    /// Nodes on this page.
    pub nodes: Vec<GraphNode>,
    /// Edges on this page.
    pub edges: Vec<GraphEdge>,
    /// Paging metadata.
    pub page: GraphQueryPage,
}
235
236fn graph_node_matches_filters(node: &GraphNode, filters: &[GraphPropertyFilter]) -> bool {
237 filters
238 .iter()
239 .all(|filter| node.properties.get(&filter.key) == Some(&filter.value))
240}
241
242fn graph_edge_matches_filters(edge: &GraphEdge, filters: &[GraphPropertyFilter]) -> bool {
243 filters
244 .iter()
245 .all(|filter| edge.properties.get(&filter.key) == Some(&filter.value))
246}
247
248pub fn apply_graph_query_page(
249 mut nodes: Vec<GraphNode>,
250 mut edges: Vec<GraphEdge>,
251 options: GraphQueryOptions,
252 mut diagnostics: Vec<String>,
253) -> GraphPagedSubgraph {
254 nodes.sort_by(|left, right| left.id.cmp(&right.id));
255 edges.sort_by(|left, right| {
256 left.from_id
257 .cmp(&right.from_id)
258 .then(left.kind.cmp(&right.kind))
259 .then(left.to_id.cmp(&right.to_id))
260 });
261
262 let before_filter = nodes.len();
263 if !options.property_filters.is_empty() {
264 nodes.retain(|node| graph_node_matches_filters(node, &options.property_filters));
265 }
266 let after_filter = nodes.len();
267
268 if let Some(cursor) = &options.cursor {
269 nodes.retain(|node| node.id > *cursor);
270 }
271
272 let before_limit = nodes.len();
273 let mut next_cursor = None;
274 if let Some(limit) = options.limit
275 && nodes.len() > limit
276 {
277 next_cursor = nodes
278 .get(limit.saturating_sub(1))
279 .map(|node| node.id.clone());
280 nodes.truncate(limit);
281 }
282
283 let node_ids = nodes
284 .iter()
285 .map(|node| node.id.as_str())
286 .collect::<BTreeSet<_>>();
287 edges.retain(|edge| {
288 node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
289 });
290
291 if after_filter != before_filter {
292 diagnostics.push(format!(
293 "property filters removed {} node(s)",
294 before_filter.saturating_sub(after_filter)
295 ));
296 }
297 if options.cursor.is_some() {
298 diagnostics.push("cursor is exclusive and ordered by node id".to_string());
299 }
300 if next_cursor.is_some() {
301 diagnostics.push(
302 "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
303 );
304 }
305
306 GraphPagedSubgraph {
307 page: GraphQueryPage {
308 cursor: options.cursor,
309 limit: options.limit,
310 next_cursor,
311 returned_nodes: nodes.len(),
312 returned_edges: edges.len(),
313 truncated: options.limit.is_some_and(|limit| before_limit > limit),
314 diagnostics,
315 },
316 nodes,
317 edges,
318 }
319}
320
321pub fn apply_graph_edge_query_page(
322 mut edges: Vec<GraphEdge>,
323 options: GraphQueryOptions,
324 mut diagnostics: Vec<String>,
325) -> GraphPagedSubgraph {
326 edges.sort_by_key(graph_edge_id);
327
328 let before_filter = edges.len();
329 if !options.property_filters.is_empty() {
330 edges.retain(|edge| graph_edge_matches_filters(edge, &options.property_filters));
331 }
332 let after_filter = edges.len();
333
334 if let Some(cursor) = &options.cursor {
335 edges.retain(|edge| graph_edge_id(edge) > *cursor);
336 }
337
338 let before_limit = edges.len();
339 let mut next_cursor = None;
340 if let Some(limit) = options.limit
341 && edges.len() > limit
342 {
343 next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
344 edges.truncate(limit);
345 }
346
347 if after_filter != before_filter {
348 diagnostics.push(format!(
349 "edge property filters removed {} edge(s)",
350 before_filter.saturating_sub(after_filter)
351 ));
352 }
353 if options.cursor.is_some() {
354 diagnostics.push("cursor is exclusive and ordered by edge id".to_string());
355 }
356 if next_cursor.is_some() {
357 diagnostics.push(
358 "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
359 );
360 }
361
362 GraphPagedSubgraph {
363 page: GraphQueryPage {
364 cursor: options.cursor,
365 limit: options.limit,
366 next_cursor,
367 returned_nodes: 0,
368 returned_edges: edges.len(),
369 truncated: options.limit.is_some_and(|limit| before_limit > limit),
370 diagnostics,
371 },
372 nodes: Vec::new(),
373 edges,
374 }
375}
376
377pub trait GraphStore {
378 fn upsert_node(&self, node: &GraphNode) -> Result<()>;
379 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()>;
380 fn delete_node(&self, id: &str) -> Result<usize>;
381 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize>;
382 fn node(&self, id: &str) -> Result<Option<GraphNode>>;
383 fn all_nodes(&self) -> Result<Vec<GraphNode>>;
384 fn all_edges(&self) -> Result<Vec<GraphEdge>>;
385 fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
386 Ok(self
387 .all_edges()?
388 .into_iter()
389 .find(|edge| graph_edge_id(edge) == edge_id))
390 }
391 fn graph_counts(&self) -> Result<(usize, usize)> {
392 Ok((self.all_nodes()?.len(), self.all_edges()?.len()))
393 }
394 fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
395 let mut edges = self
396 .all_edges()?
397 .into_iter()
398 .filter(|edge| edge.from_id != edge.to_id)
399 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
400 .collect::<Vec<_>>();
401 edges.sort_by(|left, right| {
402 left.from_id
403 .cmp(&right.from_id)
404 .then(left.kind.cmp(&right.kind))
405 .then(left.to_id.cmp(&right.to_id))
406 });
407 Ok(edges.into_iter().next())
408 }
409 fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
410 let mut probes = Vec::new();
411 for edge in self.all_edges()? {
412 if edge.from_id == edge.to_id {
413 continue;
414 }
415 if let Some((key, value)) = edge
416 .properties
417 .iter()
418 .next()
419 .map(|(key, value)| (key.clone(), value.clone()))
420 {
421 probes.push((edge, GraphPropertyFilter { key, value }));
422 }
423 }
424 probes.sort_by(|(left_edge, left_filter), (right_edge, right_filter)| {
425 left_filter
426 .key
427 .cmp(&right_filter.key)
428 .then(left_filter.value.cmp(&right_filter.value))
429 .then_with(|| graph_edge_id(left_edge).cmp(&graph_edge_id(right_edge)))
430 });
431 Ok(probes.into_iter().next())
432 }
433 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>>;
434 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>>;
435 fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
436 let mut edges = self
437 .all_edges()?
438 .into_iter()
439 .filter(|edge| edge.from_id == node_id || edge.to_id == node_id)
440 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
441 .collect::<Vec<_>>();
442 edges.sort_by_key(graph_edge_id);
443 Ok(edges)
444 }
445 fn paged_edges(
446 &self,
447 kind: Option<&str>,
448 options: GraphQueryOptions,
449 ) -> Result<GraphPagedSubgraph> {
450 let edges = self
451 .all_edges()?
452 .into_iter()
453 .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
454 .collect::<Vec<_>>();
455 Ok(apply_graph_edge_query_page(edges, options, Vec::new()))
456 }
457 fn paged_incident_edges(
458 &self,
459 node_id: &str,
460 kind: Option<&str>,
461 options: GraphQueryOptions,
462 ) -> Result<GraphPagedSubgraph> {
463 Ok(apply_graph_edge_query_page(
464 self.incident_edges(node_id, kind)?,
465 options,
466 Vec::new(),
467 ))
468 }
469 fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
470 let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
471 for from_id in node_ids {
472 for edge in self.outgoing_edges(from_id, None)? {
473 if node_ids.contains(&edge.to_id) {
474 edges
475 .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
476 .or_insert(edge);
477 }
478 }
479 }
480 Ok(edges.into_values().collect())
481 }
482 fn shortest_path(
483 &self,
484 from_id: &str,
485 to_id: &str,
486 kind: Option<&str>,
487 ) -> Result<Option<GraphPath>>;
488 fn shortest_path_with_max_hops(
489 &self,
490 from_id: &str,
491 to_id: &str,
492 kind: Option<&str>,
493 max_hops: Option<usize>,
494 ) -> Result<Option<GraphPath>> {
495 shortest_path_using_outgoing(from_id, to_id, kind, max_hops, |current, kind| {
496 self.outgoing_edges(current, kind)
497 })
498 }
499 fn neighborhood(
500 &self,
501 center_id: &str,
502 depth: usize,
503 kind: Option<&str>,
504 ) -> Result<Option<GraphSubgraph>> {
505 let Some(center) = self.node(center_id)? else {
506 return Ok(None);
507 };
508 let mut nodes = BTreeMap::from([(center_id.to_string(), center)]);
509 let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
510 let mut queue = VecDeque::from([(center_id.to_string(), 0usize)]);
511
512 while let Some((current, current_depth)) = queue.pop_front() {
513 if current_depth >= depth {
514 continue;
515 }
516 for edge in self.outgoing_edges(¤t, kind)? {
517 let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
518 edges.entry(edge_key).or_insert_with(|| edge.clone());
519 if !nodes.contains_key(&edge.to_id)
520 && let Some(node) = self.node(&edge.to_id)?
521 {
522 nodes.insert(edge.to_id.clone(), node);
523 queue.push_back((edge.to_id.clone(), current_depth + 1));
524 }
525 }
526 }
527
528 Ok(Some(
529 GraphSubgraph {
530 nodes: nodes.into_values().collect(),
531 edges: edges.into_values().collect(),
532 }
533 .sorted(),
534 ))
535 }
536 fn paged_nodes_by_kind(
537 &self,
538 kind: &str,
539 options: GraphQueryOptions,
540 ) -> Result<GraphPagedSubgraph> {
541 Ok(apply_graph_query_page(
542 self.nodes_by_kind(kind)?,
543 Vec::new(),
544 options,
545 Vec::new(),
546 ))
547 }
548 fn paged_neighborhood(
549 &self,
550 center_id: &str,
551 depth: usize,
552 kind: Option<&str>,
553 options: GraphQueryOptions,
554 ) -> Result<Option<GraphPagedSubgraph>> {
555 Ok(self.neighborhood(center_id, depth, kind)?.map(|subgraph| {
556 apply_graph_query_page(subgraph.nodes, subgraph.edges, options, Vec::new())
557 }))
558 }
559 fn reachable_nodes_by_kind(
560 &self,
561 from_id: &str,
562 kind: &str,
563 depth: usize,
564 limit: usize,
565 ) -> Result<Vec<(GraphNode, GraphPath)>> {
566 let mut rows = BTreeMap::<String, (GraphNode, GraphPath)>::new();
567 let mut seen = BTreeSet::from([from_id.to_string()]);
568 let mut queue = VecDeque::from([(from_id.to_string(), vec![from_id.to_string()])]);
569
570 while let Some((current, path)) = queue.pop_front() {
571 let current_depth = path.len().saturating_sub(1);
572 if current_depth >= depth {
573 continue;
574 }
575 for edge in self.outgoing_edges(¤t, None)? {
576 if !seen.insert(edge.to_id.clone()) {
577 continue;
578 }
579 let Some(node) = self.node(&edge.to_id)? else {
580 continue;
581 };
582 let mut next_path = path.clone();
583 next_path.push(edge.to_id.clone());
584 let graph_path = GraphPath {
585 hops: next_path.len().saturating_sub(1),
586 nodes: next_path.clone(),
587 };
588 if node.kind == kind {
589 rows.entry(node.id.clone()).or_insert((node, graph_path));
590 }
591 queue.push_back((edge.to_id, next_path));
592 }
593 }
594 let mut rows = rows.into_values().collect::<Vec<_>>();
595 rows.sort_by(|(left_node, left_path), (right_node, right_path)| {
596 left_path
597 .hops
598 .cmp(&right_path.hops)
599 .then(left_node.label.cmp(&right_node.label))
600 .then(left_node.id.cmp(&right_node.id))
601 });
602 if limit > 0 && rows.len() > limit {
603 rows.truncate(limit);
604 }
605 Ok(rows)
606 }
607
608 fn reachable_nodes_by_kinds(
609 &self,
610 from_id: &str,
611 kinds: &[&str],
612 depth: usize,
613 limit: usize,
614 ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
615 let mut rows = BTreeMap::new();
616 for kind in kinds {
617 rows.insert(
618 (*kind).to_string(),
619 self.reachable_nodes_by_kind(from_id, kind, depth, limit)?,
620 );
621 }
622 Ok(rows)
623 }
624
625 fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
626 if let Some(node) = self.node(target)? {
627 return Ok(Some(node));
628 }
629 let normalized = target.trim().trim_start_matches('#');
630 for kind in kinds {
631 let mut candidates = self
632 .nodes_by_kind(kind)?
633 .into_iter()
634 .filter(|node| {
635 node.properties.get("handle").map(String::as_str) == Some(target)
636 || node.properties.get("ref_id").map(String::as_str) == Some(normalized)
637 || node.label == target
638 || node.label == format!("#{normalized}")
639 })
640 .collect::<Vec<_>>();
641 candidates.sort_by(|left, right| left.id.cmp(&right.id));
642 if let Some(candidate) = candidates.into_iter().next() {
643 return Ok(Some(candidate));
644 }
645 }
646 Ok(None)
647 }
648}
649
/// Row form of a [`GraphProjection`] for the Convex-backed store. Field names are serialized
/// in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ConvexProjectionRows {
    /// Node rows.
    pub nodes: Vec<ConvexNodeRow>,
    /// Edge rows.
    pub edges: Vec<ConvexEdgeRow>,
}
656
657impl From<&GraphProjection> for ConvexProjectionRows {
658 fn from(projection: &GraphProjection) -> Self {
659 Self {
660 nodes: projection.nodes.iter().map(ConvexNodeRow::from).collect(),
661 edges: projection.edges.iter().map(ConvexEdgeRow::from).collect(),
662 }
663 }
664}
665
/// Convex row form of a [`GraphNode`].
///
/// The node id is stored as `external_id`, and field names are serialized in camelCase
/// (e.g. `externalId`). On output, empty collections and a `None` freshness are omitted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConvexNodeRow {
    /// The graph node id (`GraphNode::id`).
    pub external_id: String,
    /// Node type.
    pub kind: String,
    /// Display label.
    pub label: String,
    /// Free-form string properties.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness marker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
679
680impl From<&GraphNode> for ConvexNodeRow {
681 fn from(node: &GraphNode) -> Self {
682 Self {
683 external_id: node.id.clone(),
684 kind: node.kind.clone(),
685 label: node.label.clone(),
686 properties: node.properties.clone(),
687 provenance: node.provenance.clone(),
688 freshness: node.freshness.clone(),
689 }
690 }
691}
692
693impl From<ConvexNodeRow> for GraphNode {
694 fn from(row: ConvexNodeRow) -> Self {
695 Self {
696 id: row.external_id,
697 kind: row.kind,
698 label: row.label,
699 properties: row.properties,
700 provenance: row.provenance,
701 freshness: row.freshness,
702 }
703 }
704}
705
/// Convex row form of a [`GraphEdge`].
///
/// Field names are serialized in camelCase (`edgeKey`, `fromExternalId`, `toExternalId`,
/// ...). On output, empty collections and a `None` freshness are omitted.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConvexEdgeRow {
    /// Row key. Conversion from a [`GraphEdge`] fills it with `graph_edge_id(edge)`, which is
    /// the explicit id when one is set and the stable id otherwise.
    pub edge_key: String,
    /// Id of the source node.
    pub from_external_id: String,
    /// Id of the target node.
    pub to_external_id: String,
    /// Edge type.
    pub kind: String,
    /// Free-form string properties.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness marker.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
720
721impl ConvexEdgeRow {
722 pub fn stable_key(from_id: &str, to_id: &str, kind: &str) -> String {
723 stable_graph_edge_id(from_id, to_id, kind)
724 }
725}
726
727impl From<&GraphEdge> for ConvexEdgeRow {
728 fn from(edge: &GraphEdge) -> Self {
729 Self {
730 edge_key: graph_edge_id(edge),
731 from_external_id: edge.from_id.clone(),
732 to_external_id: edge.to_id.clone(),
733 kind: edge.kind.clone(),
734 properties: edge.properties.clone(),
735 provenance: edge.provenance.clone(),
736 freshness: edge.freshness.clone(),
737 }
738 }
739}
740
741impl From<ConvexEdgeRow> for GraphEdge {
742 fn from(row: ConvexEdgeRow) -> Self {
743 Self {
744 id: row.edge_key,
745 from_id: row.from_external_id,
746 to_id: row.to_external_id,
747 kind: row.kind,
748 properties: row.properties,
749 provenance: row.provenance,
750 freshness: row.freshness,
751 }
752 }
753}
754
/// Row-level storage operations that [`ConvexGraphStore`] is built on.
///
/// Node rows are addressed by `external_id` and edge rows by `edge_key`.
pub trait ConvexGraphClient {
    /// Inserts or replaces the node row keyed by `row.external_id`.
    fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()>;
    /// Inserts or replaces the edge row keyed by `row.edge_key`.
    fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()>;
    /// Deletes node row `external_id`, returning how many node rows were removed.
    ///
    /// NOTE(review): `ConvexRowsGraphClient` also removes incident edge rows here, while the
    /// test double `MemoryConvexGraphClient` does not. The trait does not pin this down, which
    /// is presumably why `ConvexGraphStore::delete_node` removes edges itself first.
    fn delete_node_row(&self, external_id: &str) -> Result<usize>;
    /// Deletes edge row `edge_key`, returning how many rows were removed.
    fn delete_edge_row(&self, edge_key: &str) -> Result<usize>;
    /// Fetches one node row.
    fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>>;
    /// Fetches every node row.
    fn node_rows(&self) -> Result<Vec<ConvexNodeRow>>;
    /// Fetches every edge row.
    fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>>;
    /// Fetches node rows whose `kind` matches.
    fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>>;
    /// Fetches edge rows leaving `from_external_id`, optionally only of `kind`.
    fn outgoing_edge_rows(
        &self,
        from_external_id: &str,
        kind: Option<&str>,
    ) -> Result<Vec<ConvexEdgeRow>>;
}
770
771#[derive(Default)]
772pub struct ConvexRowsGraphClient {
773 nodes: RefCell<BTreeMap<String, ConvexNodeRow>>,
774 edges: RefCell<BTreeMap<String, ConvexEdgeRow>>,
775}
776
777impl ConvexRowsGraphClient {
778 pub fn from_rows(rows: ConvexProjectionRows) -> Self {
779 Self {
780 nodes: RefCell::new(
781 rows.nodes
782 .into_iter()
783 .map(|row| (row.external_id.clone(), row))
784 .collect(),
785 ),
786 edges: RefCell::new(
787 rows.edges
788 .into_iter()
789 .map(|row| (row.edge_key.clone(), row))
790 .collect(),
791 ),
792 }
793 }
794
795 pub fn to_rows(&self) -> ConvexProjectionRows {
796 ConvexProjectionRows {
797 nodes: self.nodes.borrow().values().cloned().collect(),
798 edges: self.edges.borrow().values().cloned().collect(),
799 }
800 }
801}
802
803impl ConvexGraphClient for ConvexRowsGraphClient {
804 fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()> {
805 self.nodes
806 .borrow_mut()
807 .insert(row.external_id.clone(), row.clone());
808 Ok(())
809 }
810
811 fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()> {
812 self.edges
813 .borrow_mut()
814 .insert(row.edge_key.clone(), row.clone());
815 Ok(())
816 }
817
818 fn delete_node_row(&self, external_id: &str) -> Result<usize> {
819 let mut edges = self.edges.borrow_mut();
820 let incident = edges
821 .values()
822 .filter(|row| row.from_external_id == external_id || row.to_external_id == external_id)
823 .map(|row| row.edge_key.clone())
824 .collect::<Vec<_>>();
825 for edge_key in incident {
826 edges.remove(&edge_key);
827 }
828 Ok(usize::from(
829 self.nodes.borrow_mut().remove(external_id).is_some(),
830 ))
831 }
832
833 fn delete_edge_row(&self, edge_key: &str) -> Result<usize> {
834 Ok(usize::from(
835 self.edges.borrow_mut().remove(edge_key).is_some(),
836 ))
837 }
838
839 fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>> {
840 Ok(self.nodes.borrow().get(external_id).cloned())
841 }
842
843 fn node_rows(&self) -> Result<Vec<ConvexNodeRow>> {
844 Ok(self.nodes.borrow().values().cloned().collect())
845 }
846
847 fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>> {
848 Ok(self.edges.borrow().values().cloned().collect())
849 }
850
851 fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>> {
852 Ok(self
853 .nodes
854 .borrow()
855 .values()
856 .filter(|row| row.kind == kind)
857 .cloned()
858 .collect())
859 }
860
861 fn outgoing_edge_rows(
862 &self,
863 from_external_id: &str,
864 kind: Option<&str>,
865 ) -> Result<Vec<ConvexEdgeRow>> {
866 Ok(self
867 .edges
868 .borrow()
869 .values()
870 .filter(|row| row.from_external_id == from_external_id)
871 .filter(|row| kind.is_none_or(|kind| row.kind == kind))
872 .cloned()
873 .collect())
874 }
875}
876
/// [`GraphStore`] implemented on top of a row-level [`ConvexGraphClient`].
pub struct ConvexGraphStore<C> {
    client: C,
}

impl<C> ConvexGraphStore<C> {
    /// Wraps `client`.
    pub fn new(client: C) -> Self {
        ConvexGraphStore { client }
    }

    /// Borrows the underlying client.
    pub fn client(&self) -> &C {
        let Self { client } = self;
        client
    }

    /// Consumes the store and returns the underlying client.
    pub fn into_inner(self) -> C {
        let Self { client } = self;
        client
    }
}
894
895impl<C: ConvexGraphClient> GraphStore for ConvexGraphStore<C> {
896 fn upsert_node(&self, node: &GraphNode) -> Result<()> {
897 self.client.upsert_node_row(&ConvexNodeRow::from(node))
898 }
899
900 fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
901 if self.client.node_row(&edge.from_id)?.is_none() {
902 bail!(
903 "convex graph edge {} -> {} ({}) references missing from node",
904 edge.from_id,
905 edge.to_id,
906 edge.kind
907 );
908 }
909 if self.client.node_row(&edge.to_id)?.is_none() {
910 bail!(
911 "convex graph edge {} -> {} ({}) references missing to node",
912 edge.from_id,
913 edge.to_id,
914 edge.kind
915 );
916 }
917 self.client.upsert_edge_row(&ConvexEdgeRow::from(edge))
918 }
919
920 fn delete_node(&self, id: &str) -> Result<usize> {
921 let incident = self
922 .client
923 .edge_rows()?
924 .into_iter()
925 .filter(|row| row.from_external_id == id || row.to_external_id == id)
926 .map(|row| row.edge_key)
927 .collect::<Vec<_>>();
928 for edge_key in incident {
929 self.client.delete_edge_row(&edge_key)?;
930 }
931 self.client.delete_node_row(id)
932 }
933
934 fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
935 self.client
936 .delete_edge_row(&ConvexEdgeRow::stable_key(from_id, to_id, kind))
937 }
938
939 fn node(&self, id: &str) -> Result<Option<GraphNode>> {
940 Ok(self.client.node_row(id)?.map(GraphNode::from))
941 }
942
943 fn all_nodes(&self) -> Result<Vec<GraphNode>> {
944 let mut nodes: Vec<GraphNode> = self
945 .client
946 .node_rows()?
947 .into_iter()
948 .map(GraphNode::from)
949 .collect();
950 nodes.sort_by(|left, right| left.id.cmp(&right.id));
951 Ok(nodes)
952 }
953
954 fn all_edges(&self) -> Result<Vec<GraphEdge>> {
955 let mut edges: Vec<GraphEdge> = self
956 .client
957 .edge_rows()?
958 .into_iter()
959 .map(GraphEdge::from)
960 .collect();
961 edges.sort_by(|left, right| {
962 left.from_id
963 .cmp(&right.from_id)
964 .then(left.kind.cmp(&right.kind))
965 .then(left.to_id.cmp(&right.to_id))
966 });
967 Ok(edges)
968 }
969
970 fn graph_counts(&self) -> Result<(usize, usize)> {
971 Ok((
972 self.client.node_rows()?.len(),
973 self.client.edge_rows()?.len(),
974 ))
975 }
976
977 fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
978 let mut nodes: Vec<GraphNode> = self
979 .client
980 .node_rows_by_kind(kind)?
981 .into_iter()
982 .map(GraphNode::from)
983 .collect();
984 nodes.sort_by(|left, right| left.id.cmp(&right.id));
985 Ok(nodes)
986 }
987
988 fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
989 let mut edges: Vec<GraphEdge> = self
990 .client
991 .outgoing_edge_rows(from_id, kind)?
992 .into_iter()
993 .map(GraphEdge::from)
994 .collect();
995 edges.sort_by(|left, right| {
996 left.to_id
997 .cmp(&right.to_id)
998 .then(left.kind.cmp(&right.kind))
999 });
1000 Ok(edges)
1001 }
1002
1003 fn shortest_path(
1004 &self,
1005 from_id: &str,
1006 to_id: &str,
1007 kind: Option<&str>,
1008 ) -> Result<Option<GraphPath>> {
1009 shortest_path_using_outgoing(from_id, to_id, kind, None, |current, kind| {
1010 self.outgoing_edges(current, kind)
1011 })
1012 }
1013}
1014
1015pub fn shortest_path_using_outgoing(
1016 from_id: &str,
1017 to_id: &str,
1018 kind: Option<&str>,
1019 max_hops: Option<usize>,
1020 mut outgoing_edges: impl FnMut(&str, Option<&str>) -> Result<Vec<GraphEdge>>,
1021) -> Result<Option<GraphPath>> {
1022 if from_id == to_id {
1023 return Ok(Some(GraphPath {
1024 nodes: vec![from_id.to_string()],
1025 hops: 0,
1026 }));
1027 }
1028
1029 let mut queue = VecDeque::new();
1030 let mut parent = BTreeMap::<String, String>::new();
1031 parent.insert(from_id.to_string(), String::new());
1032 queue.push_back(from_id.to_string());
1033
1034 while let Some(current) = queue.pop_front() {
1035 let current_depth = parent_depth(&parent, ¤t);
1036 if max_hops.is_some_and(|max_hops| current_depth >= max_hops) {
1037 continue;
1038 }
1039 for edge in outgoing_edges(¤t, kind)? {
1040 if parent.contains_key(&edge.to_id) {
1041 continue;
1042 }
1043 parent.insert(edge.to_id.clone(), current.clone());
1044 if edge.to_id == to_id {
1045 let mut nodes = vec![to_id.to_string()];
1046 let mut cursor = to_id;
1047 while let Some(previous) = parent.get(cursor) {
1048 if previous.is_empty() {
1049 break;
1050 }
1051 nodes.push(previous.clone());
1052 cursor = previous;
1053 }
1054 nodes.reverse();
1055 let hops = nodes.len().saturating_sub(1);
1056 return Ok(Some(GraphPath { nodes, hops }));
1057 }
1058 queue.push_back(edge.to_id);
1059 }
1060 }
1061
1062 Ok(None)
1063}
1064
1065fn parent_depth(parent: &BTreeMap<String, String>, id: &str) -> usize {
1066 let mut depth = 0usize;
1067 let mut cursor = id;
1068 while let Some(previous) = parent.get(cursor) {
1069 if previous.is_empty() {
1070 break;
1071 }
1072 depth += 1;
1073 cursor = previous;
1074 }
1075 depth
1076}
1077
#[cfg(test)]
mod tests {
    //! Contract tests for graph projections and the Convex-backed graph store.

    use super::*;

    /// Provenance record shared by the fixture's annotated node and both edges.
    fn sample_provenance() -> GraphProvenance {
        let provenance = GraphProvenance::new("fixture", "src/lib.rs:1");
        provenance.with_content_hash("hash-1")
    }

    /// Fixture graph: `doc:livekit` --mentions--> `topic:rooms` --related_to--> `topic:egress`.
    ///
    /// Only the document node and the `mentions` edge carry properties and freshness hashes.
    fn sample_projection() -> GraphProjection {
        let provenance = sample_provenance();

        let doc = GraphNode::new("doc:livekit", "document", "LiveKit guide")
            .with_property("domain", "livekit")
            .with_provenance(provenance.clone())
            .with_freshness(GraphFreshness::content_hash("node-hash"));
        let rooms = GraphNode::new("topic:rooms", "topic", "Rooms");
        let egress = GraphNode::new("topic:egress", "topic", "Egress");

        let mentions = GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
            .with_property("confidence", "0.91")
            .with_provenance(provenance.clone())
            .with_freshness(GraphFreshness::content_hash("edge-hash"));
        let related = GraphEdge::new("topic:rooms", "topic:egress", "related_to")
            .with_provenance(provenance);

        GraphProjection {
            nodes: vec![doc, rooms, egress],
            edges: vec![mentions, related],
        }
    }

    /// Upserts the fixture into `store`, then checks four things:
    /// - point lookup;
    /// - lookup by kind;
    /// - kind-filtered outgoing edges;
    /// - the two-hop shortest path.
    fn assert_projection_store_contract(store: &impl GraphStore) {
        let projection = sample_projection();
        projection.upsert_into(store).unwrap();

        // The fully annotated document node must come back unchanged.
        let stored_doc = store.node("doc:livekit").unwrap();
        let expected_doc = projection
            .nodes
            .iter()
            .find(|node| node.id == "doc:livekit")
            .cloned();
        assert_eq!(stored_doc, expected_doc);

        // Kind lookups are expected sorted by id, so egress precedes rooms.
        let topics = store.nodes_by_kind("topic").unwrap();
        assert_eq!(
            topics,
            vec![
                GraphNode::new("topic:egress", "topic", "Egress"),
                GraphNode::new("topic:rooms", "topic", "Rooms"),
            ]
        );

        let mentions = store
            .outgoing_edges("doc:livekit", Some("mentions"))
            .unwrap();
        assert_eq!(mentions.len(), 1);
        let mention = &mentions[0];
        assert_eq!(mention.to_id, "topic:rooms");
        assert_eq!(mention.properties.get("confidence"), Some(&"0.91".into()));

        let path = store
            .shortest_path("doc:livekit", "topic:egress", None)
            .unwrap()
            .unwrap();
        assert_eq!(
            path.nodes,
            vec!["doc:livekit", "topic:rooms", "topic:egress"]
        );
    }

    /// In-memory `ConvexGraphClient`: node rows are keyed by `external_id`, edge rows by
    /// `edge_key`. Because storage is a `BTreeMap`, every listing returns rows in key order.
    #[derive(Default)]
    struct MemoryConvexGraphClient {
        nodes: RefCell<BTreeMap<String, ConvexNodeRow>>,
        edges: RefCell<BTreeMap<String, ConvexEdgeRow>>,
    }

    impl ConvexGraphClient for MemoryConvexGraphClient {
        fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()> {
            let key = row.external_id.clone();
            self.nodes.borrow_mut().insert(key, row.clone());
            Ok(())
        }

        fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()> {
            let key = row.edge_key.clone();
            self.edges.borrow_mut().insert(key, row.clone());
            Ok(())
        }

        /// Reports 1 when a row was removed, 0 when `external_id` was unknown.
        fn delete_node_row(&self, external_id: &str) -> Result<usize> {
            let removed = self.nodes.borrow_mut().remove(external_id);
            Ok(removed.map_or(0, |_| 1))
        }

        /// Reports 1 when a row was removed, 0 when `edge_key` was unknown.
        fn delete_edge_row(&self, edge_key: &str) -> Result<usize> {
            let removed = self.edges.borrow_mut().remove(edge_key);
            Ok(removed.map_or(0, |_| 1))
        }

        fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>> {
            let found = self.nodes.borrow().get(external_id).cloned();
            Ok(found)
        }

        fn node_rows(&self) -> Result<Vec<ConvexNodeRow>> {
            let all: Vec<ConvexNodeRow> = self.nodes.borrow().values().cloned().collect();
            Ok(all)
        }

        fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>> {
            let all: Vec<ConvexEdgeRow> = self.edges.borrow().values().cloned().collect();
            Ok(all)
        }

        fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>> {
            let matching: Vec<ConvexNodeRow> = self
                .nodes
                .borrow()
                .values()
                .filter(|row| row.kind == kind)
                .cloned()
                .collect();
            Ok(matching)
        }

        /// Edges leaving `from_external_id`; a `kind` of `None` matches every edge kind.
        fn outgoing_edge_rows(
            &self,
            from_external_id: &str,
            kind: Option<&str>,
        ) -> Result<Vec<ConvexEdgeRow>> {
            let matching: Vec<ConvexEdgeRow> = self
                .edges
                .borrow()
                .values()
                .filter(|row| {
                    row.from_external_id == from_external_id
                        && kind.is_none_or(|wanted| row.kind == wanted)
                })
                .cloned()
                .collect();
            Ok(matching)
        }
    }

    #[test]
    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
        let convex = ConvexGraphStore::new(MemoryConvexGraphClient::default());
        assert_projection_store_contract(&convex);

        // Inspect the raw rows the store wrote through the client.
        let client = convex.client();
        let node_rows = client.nodes.borrow();
        let edge_rows = client.edges.borrow();
        assert_eq!(node_rows.len(), 3);
        assert_eq!(edge_rows.len(), 2);
        assert!(
            node_rows.contains_key("doc:livekit"),
            "Convex rows keep GraphNode.id as the externalId upsert key"
        );
    }

    #[test]
    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
        /// Seeds `store` with the fixture and checks the two-hop neighborhood of `doc:livekit`.
        /// It then deletes one edge and one node and checks what becomes unreachable.
        fn assert_crud_contract(store: &impl GraphStore) {
            let projection = sample_projection();
            projection.upsert_into(store).unwrap();

            let neighborhood = store
                .neighborhood("doc:livekit", 2, None)
                .unwrap()
                .unwrap();

            let node_ids: Vec<&str> = neighborhood
                .nodes
                .iter()
                .map(|node| node.id.as_str())
                .collect();
            assert_eq!(node_ids, ["doc:livekit", "topic:egress", "topic:rooms"]);

            let edge_triples: Vec<(&str, &str, &str)> = neighborhood
                .edges
                .iter()
                .map(|edge| (edge.from_id.as_str(), edge.kind.as_str(), edge.to_id.as_str()))
                .collect();
            assert_eq!(
                edge_triples,
                [
                    ("doc:livekit", "mentions", "topic:rooms"),
                    ("topic:rooms", "related_to", "topic:egress"),
                ]
            );

            // Dropping the only rooms -> egress link leaves egress unreachable from the doc.
            let removed_edges = store
                .delete_edge("topic:rooms", "topic:egress", "related_to")
                .unwrap();
            assert_eq!(removed_edges, 1);
            let path = store
                .shortest_path("doc:livekit", "topic:egress", None)
                .unwrap();
            assert!(path.is_none());

            // Deleting the node must also remove the doc's `mentions` edge that targeted it.
            let removed_nodes = store.delete_node("topic:rooms").unwrap();
            assert_eq!(removed_nodes, 1);
            assert!(store.node("topic:rooms").unwrap().is_none());
            let remaining = store.outgoing_edges("doc:livekit", None).unwrap();
            assert!(remaining.is_empty());
        }

        let store = ConvexGraphStore::new(ConvexRowsGraphClient::default());
        assert_crud_contract(&store);
    }

    #[test]
    fn convex_projection_rows_keep_stable_ids_and_edge_keys() {
        let projection = sample_projection();
        let rows = projection.to_convex_rows();

        let document = rows
            .nodes
            .iter()
            .find(|row| row.external_id == "doc:livekit")
            .unwrap();
        assert_eq!(document.kind, "document");
        assert_eq!(document.properties.get("domain"), Some(&"livekit".into()));

        let mention_row = rows
            .edges
            .iter()
            .find(|row| row.kind == "mentions")
            .unwrap();
        assert_eq!(mention_row.from_external_id, "doc:livekit");
        assert_eq!(mention_row.to_external_id, "topic:rooms");

        // The key is derived deterministically from (from, to, kind).
        let expected_key = ConvexEdgeRow::stable_key("doc:livekit", "topic:rooms", "mentions");
        assert_eq!(mention_row.edge_key, expected_key);
        assert!(mention_row.edge_key.starts_with("edge:"));
    }

    #[test]
    fn convex_store_rejects_edges_when_projection_nodes_are_missing() {
        let store = ConvexGraphStore::new(MemoryConvexGraphClient::default());
        let doc = GraphNode::new("doc:livekit", "document", "LiveKit guide");
        store.upsert_node(&doc).unwrap();

        // `topic:rooms` was never upserted, so this edge's target does not exist.
        let dangling = GraphEdge::new("doc:livekit", "topic:rooms", "mentions");
        let err = store.upsert_edge(&dangling).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("references missing to node"), "{message}");
    }
}