Skip to main content

tsift_core/
lib.rs

1use anyhow::{Result, bail};
2use serde::{Deserialize, Serialize};
3use std::cell::RefCell;
4use std::collections::{BTreeMap, BTreeSet, VecDeque};
5
/// Schema version number associated with the SQLite graph store.
///
/// NOTE(review): presumably bumped whenever the SQLite schema changes; the code that
/// consumes this value is not visible here — confirm before relying on it.
pub const SQLITE_GRAPH_SCHEMA_VERSION: i64 = 5;
7
/// Provenance record attached to graph nodes and edges.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphProvenance {
    /// Name of the originating source (free-form string; semantics are caller-defined).
    pub source: String,
    /// Reference into that source (free-form string; semantics are caller-defined).
    pub source_ref: String,
    /// Optional content hash; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_hash: Option<String>,
}
15
16impl GraphProvenance {
17    pub fn new(source: impl Into<String>, source_ref: impl Into<String>) -> Self {
18        Self {
19            source: source.into(),
20            source_ref: source_ref.into(),
21            content_hash: None,
22        }
23    }
24
25    pub fn with_content_hash(mut self, content_hash: impl Into<String>) -> Self {
26        self.content_hash = Some(content_hash.into());
27        self
28    }
29}
30
/// Freshness metadata attached to graph nodes and edges.
///
/// Both fields are optional and are left out of the serialized form when `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphFreshness {
    /// Optional content hash.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub content_hash: Option<String>,
    /// Optional observation time; the name suggests Unix seconds — TODO confirm the unit.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub observed_at_unix: Option<i64>,
}
38
39impl GraphFreshness {
40    pub fn content_hash(content_hash: impl Into<String>) -> Self {
41        Self {
42            content_hash: Some(content_hash.into()),
43            observed_at_unix: None,
44        }
45    }
46}
47
/// A graph node: an id, a kind, a label, and optional metadata.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphNode {
    /// Node identifier; the stores in this file key nodes by this value.
    pub id: String,
    /// Node kind, used by kind-based queries such as `GraphStore::nodes_by_kind`.
    pub kind: String,
    /// Display label.
    pub label: String,
    /// String key/value properties; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness metadata; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
60
61impl GraphNode {
62    pub fn new(id: impl Into<String>, kind: impl Into<String>, label: impl Into<String>) -> Self {
63        Self {
64            id: id.into(),
65            kind: kind.into(),
66            label: label.into(),
67            properties: BTreeMap::new(),
68            provenance: Vec::new(),
69            freshness: None,
70        }
71    }
72
73    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
74        self.properties.insert(key.into(), value.into());
75        self
76    }
77
78    pub fn with_provenance(mut self, provenance: GraphProvenance) -> Self {
79        self.provenance.push(provenance);
80        self
81    }
82
83    pub fn with_freshness(mut self, freshness: GraphFreshness) -> Self {
84        self.freshness = Some(freshness);
85        self
86    }
87}
88
/// A directed, typed edge between two nodes.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphEdge {
    /// Edge identifier. Deserializes to an empty string when absent; `graph_edge_id`
    /// treats an empty id as "derive the stable id from the endpoints and kind".
    #[serde(default)]
    pub id: String,
    /// Id of the source node.
    pub from_id: String,
    /// Id of the target node.
    pub to_id: String,
    /// Edge kind.
    pub kind: String,
    /// String key/value properties; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness metadata; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
103
104impl GraphEdge {
105    pub fn stable_id(from_id: &str, to_id: &str, kind: &str) -> String {
106        stable_graph_edge_id(from_id, to_id, kind)
107    }
108
109    pub fn new(
110        from_id: impl Into<String>,
111        to_id: impl Into<String>,
112        kind: impl Into<String>,
113    ) -> Self {
114        let from_id = from_id.into();
115        let to_id = to_id.into();
116        let kind = kind.into();
117        Self {
118            id: stable_graph_edge_id(&from_id, &to_id, &kind),
119            from_id,
120            to_id,
121            kind,
122            properties: BTreeMap::new(),
123            provenance: Vec::new(),
124            freshness: None,
125        }
126    }
127
128    pub fn with_property(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
129        self.properties.insert(key.into(), value.into());
130        self
131    }
132
133    pub fn with_provenance(mut self, provenance: GraphProvenance) -> Self {
134        self.provenance.push(provenance);
135        self
136    }
137
138    pub fn with_freshness(mut self, freshness: GraphFreshness) -> Self {
139        self.freshness = Some(freshness);
140        self
141    }
142}
143
/// Derives a deterministic edge id from the endpoints and the kind.
///
/// The three strings are serialized as the JSON array `[from_id, kind, to_id]`
/// (note the order: kind sits between the endpoints), the resulting text is hashed
/// with BLAKE3, and the hex digest is prefixed with `edge:`.
pub fn stable_graph_edge_id(from_id: &str, to_id: &str, kind: &str) -> String {
    // JSON string quoting/escaping keeps the three fields distinct in the hashed text.
    let raw = serde_json::json!([from_id, kind, to_id]).to_string();
    format!("edge:{}", blake3::hash(raw.as_bytes()).to_hex())
}
148
149pub fn graph_edge_id(edge: &GraphEdge) -> String {
150    if edge.id.is_empty() {
151        stable_graph_edge_id(&edge.from_id, &edge.to_id, &edge.kind)
152    } else {
153        edge.id.clone()
154    }
155}
156
/// A batch of nodes and edges that can be written to a store or converted to Convex rows.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct GraphProjection {
    /// Nodes in the batch.
    pub nodes: Vec<GraphNode>,
    /// Edges in the batch.
    pub edges: Vec<GraphEdge>,
}
162
163impl GraphProjection {
164    pub fn upsert_into<S: GraphStore + ?Sized>(&self, store: &S) -> Result<()> {
165        for node in &self.nodes {
166            store.upsert_node(node)?;
167        }
168        for edge in &self.edges {
169            store.upsert_edge(edge)?;
170        }
171        Ok(())
172    }
173
174    pub fn to_convex_rows(&self) -> ConvexProjectionRows {
175        ConvexProjectionRows::from(self)
176    }
177}
178
/// A path through the graph expressed as a sequence of node ids.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphPath {
    /// Node ids along the path. `GraphStore::reachable_nodes_by_kind` fills this
    /// starting with the origin node and ending with the reached node.
    pub nodes: Vec<String>,
    /// Hop count. `GraphStore::reachable_nodes_by_kind` sets it to `nodes.len() - 1`.
    pub hops: usize,
}
184
/// A set of nodes together with a set of edges, as returned by neighborhood queries.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct GraphSubgraph {
    /// Nodes in the subgraph.
    pub nodes: Vec<GraphNode>,
    /// Edges in the subgraph.
    pub edges: Vec<GraphEdge>,
}
190
191impl GraphSubgraph {
192    pub fn sorted(mut self) -> Self {
193        self.nodes.sort_by(|left, right| left.id.cmp(&right.id));
194        self.edges.sort_by(|left, right| {
195            left.from_id
196                .cmp(&right.from_id)
197                .then(left.kind.cmp(&right.kind))
198                .then(left.to_id.cmp(&right.to_id))
199                .then_with(|| graph_edge_id(left).cmp(&graph_edge_id(right)))
200        });
201        self
202    }
203}
204
/// An exact-match constraint on one property: the property stored under `key`
/// must exist and equal `value`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphPropertyFilter {
    /// Property name to look up.
    pub key: String,
    /// Value the property must equal.
    pub value: String,
}
210
/// Paging and filtering options accepted by the paged query helpers.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphQueryOptions {
    /// Exclusive lower bound: only rows whose id sorts strictly after this value are kept.
    pub cursor: Option<String>,
    /// Maximum number of rows to return; `None` means no limit.
    pub limit: Option<usize>,
    /// Property constraints; a row must satisfy every one of them.
    pub property_filters: Vec<GraphPropertyFilter>,
}
217
/// Paging metadata describing one page of query results.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct GraphQueryPage {
    /// The cursor the query was issued with.
    pub cursor: Option<String>,
    /// The limit the query was issued with.
    pub limit: Option<usize>,
    /// Cursor to pass for the following page, when one was produced.
    pub next_cursor: Option<String>,
    /// Number of nodes in this page.
    pub returned_nodes: usize,
    /// Number of edges in this page.
    pub returned_edges: usize,
    /// Whether the limit cut off rows that would otherwise have been returned.
    pub truncated: bool,
    /// Human-readable notes about how the page was produced.
    pub diagnostics: Vec<String>,
}
228
/// One page of query results plus the paging metadata that describes it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GraphPagedSubgraph {
    /// Nodes in this page (empty for edge-only queries).
    pub nodes: Vec<GraphNode>,
    /// Edges in this page.
    pub edges: Vec<GraphEdge>,
    /// Paging metadata for this page.
    pub page: GraphQueryPage,
}
235
236fn graph_node_matches_filters(node: &GraphNode, filters: &[GraphPropertyFilter]) -> bool {
237    filters
238        .iter()
239        .all(|filter| node.properties.get(&filter.key) == Some(&filter.value))
240}
241
242fn graph_edge_matches_filters(edge: &GraphEdge, filters: &[GraphPropertyFilter]) -> bool {
243    filters
244        .iter()
245        .all(|filter| edge.properties.get(&filter.key) == Some(&filter.value))
246}
247
248pub fn apply_graph_query_page(
249    mut nodes: Vec<GraphNode>,
250    mut edges: Vec<GraphEdge>,
251    options: GraphQueryOptions,
252    mut diagnostics: Vec<String>,
253) -> GraphPagedSubgraph {
254    nodes.sort_by(|left, right| left.id.cmp(&right.id));
255    edges.sort_by(|left, right| {
256        left.from_id
257            .cmp(&right.from_id)
258            .then(left.kind.cmp(&right.kind))
259            .then(left.to_id.cmp(&right.to_id))
260    });
261
262    let before_filter = nodes.len();
263    if !options.property_filters.is_empty() {
264        nodes.retain(|node| graph_node_matches_filters(node, &options.property_filters));
265    }
266    let after_filter = nodes.len();
267
268    if let Some(cursor) = &options.cursor {
269        nodes.retain(|node| node.id > *cursor);
270    }
271
272    let before_limit = nodes.len();
273    let mut next_cursor = None;
274    if let Some(limit) = options.limit
275        && nodes.len() > limit
276    {
277        next_cursor = nodes
278            .get(limit.saturating_sub(1))
279            .map(|node| node.id.clone());
280        nodes.truncate(limit);
281    }
282
283    let node_ids = nodes
284        .iter()
285        .map(|node| node.id.as_str())
286        .collect::<BTreeSet<_>>();
287    edges.retain(|edge| {
288        node_ids.contains(edge.from_id.as_str()) && node_ids.contains(edge.to_id.as_str())
289    });
290
291    if after_filter != before_filter {
292        diagnostics.push(format!(
293            "property filters removed {} node(s)",
294            before_filter.saturating_sub(after_filter)
295        ));
296    }
297    if options.cursor.is_some() {
298        diagnostics.push("cursor is exclusive and ordered by node id".to_string());
299    }
300    if next_cursor.is_some() {
301        diagnostics.push(
302            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
303        );
304    }
305
306    GraphPagedSubgraph {
307        page: GraphQueryPage {
308            cursor: options.cursor,
309            limit: options.limit,
310            next_cursor,
311            returned_nodes: nodes.len(),
312            returned_edges: edges.len(),
313            truncated: options.limit.is_some_and(|limit| before_limit > limit),
314            diagnostics,
315        },
316        nodes,
317        edges,
318    }
319}
320
321pub fn apply_graph_edge_query_page(
322    mut edges: Vec<GraphEdge>,
323    options: GraphQueryOptions,
324    mut diagnostics: Vec<String>,
325) -> GraphPagedSubgraph {
326    edges.sort_by_key(graph_edge_id);
327
328    let before_filter = edges.len();
329    if !options.property_filters.is_empty() {
330        edges.retain(|edge| graph_edge_matches_filters(edge, &options.property_filters));
331    }
332    let after_filter = edges.len();
333
334    if let Some(cursor) = &options.cursor {
335        edges.retain(|edge| graph_edge_id(edge) > *cursor);
336    }
337
338    let before_limit = edges.len();
339    let mut next_cursor = None;
340    if let Some(limit) = options.limit
341        && edges.len() > limit
342    {
343        next_cursor = edges.get(limit.saturating_sub(1)).map(graph_edge_id);
344        edges.truncate(limit);
345    }
346
347    if after_filter != before_filter {
348        diagnostics.push(format!(
349            "edge property filters removed {} edge(s)",
350            before_filter.saturating_sub(after_filter)
351        ));
352    }
353    if options.cursor.is_some() {
354        diagnostics.push("cursor is exclusive and ordered by edge id".to_string());
355    }
356    if next_cursor.is_some() {
357        diagnostics.push(
358            "result was truncated; pass page.next_cursor as --cursor for the next page".to_string(),
359        );
360    }
361
362    GraphPagedSubgraph {
363        page: GraphQueryPage {
364            cursor: options.cursor,
365            limit: options.limit,
366            next_cursor,
367            returned_nodes: 0,
368            returned_edges: edges.len(),
369            truncated: options.limit.is_some_and(|limit| before_limit > limit),
370            diagnostics,
371        },
372        nodes: Vec::new(),
373        edges,
374    }
375}
376
377pub trait GraphStore {
378    fn upsert_node(&self, node: &GraphNode) -> Result<()>;
379    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()>;
380    fn delete_node(&self, id: &str) -> Result<usize>;
381    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize>;
382    fn node(&self, id: &str) -> Result<Option<GraphNode>>;
383    fn all_nodes(&self) -> Result<Vec<GraphNode>>;
384    fn all_edges(&self) -> Result<Vec<GraphEdge>>;
385    fn edge(&self, edge_id: &str) -> Result<Option<GraphEdge>> {
386        Ok(self
387            .all_edges()?
388            .into_iter()
389            .find(|edge| graph_edge_id(edge) == edge_id))
390    }
391    fn graph_counts(&self) -> Result<(usize, usize)> {
392        Ok((self.all_nodes()?.len(), self.all_edges()?.len()))
393    }
394    fn sample_edge(&self, kind: Option<&str>) -> Result<Option<GraphEdge>> {
395        let mut edges = self
396            .all_edges()?
397            .into_iter()
398            .filter(|edge| edge.from_id != edge.to_id)
399            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
400            .collect::<Vec<_>>();
401        edges.sort_by(|left, right| {
402            left.from_id
403                .cmp(&right.from_id)
404                .then(left.kind.cmp(&right.kind))
405                .then(left.to_id.cmp(&right.to_id))
406        });
407        Ok(edges.into_iter().next())
408    }
409    fn sample_edge_with_property(&self) -> Result<Option<(GraphEdge, GraphPropertyFilter)>> {
410        let mut probes = Vec::new();
411        for edge in self.all_edges()? {
412            if edge.from_id == edge.to_id {
413                continue;
414            }
415            if let Some((key, value)) = edge
416                .properties
417                .iter()
418                .next()
419                .map(|(key, value)| (key.clone(), value.clone()))
420            {
421                probes.push((edge, GraphPropertyFilter { key, value }));
422            }
423        }
424        probes.sort_by(|(left_edge, left_filter), (right_edge, right_filter)| {
425            left_filter
426                .key
427                .cmp(&right_filter.key)
428                .then(left_filter.value.cmp(&right_filter.value))
429                .then_with(|| graph_edge_id(left_edge).cmp(&graph_edge_id(right_edge)))
430        });
431        Ok(probes.into_iter().next())
432    }
433    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>>;
434    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>>;
435    fn incident_edges(&self, node_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
436        let mut edges = self
437            .all_edges()?
438            .into_iter()
439            .filter(|edge| edge.from_id == node_id || edge.to_id == node_id)
440            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
441            .collect::<Vec<_>>();
442        edges.sort_by_key(graph_edge_id);
443        Ok(edges)
444    }
445    fn paged_edges(
446        &self,
447        kind: Option<&str>,
448        options: GraphQueryOptions,
449    ) -> Result<GraphPagedSubgraph> {
450        let edges = self
451            .all_edges()?
452            .into_iter()
453            .filter(|edge| kind.is_none_or(|kind| edge.kind == kind))
454            .collect::<Vec<_>>();
455        Ok(apply_graph_edge_query_page(edges, options, Vec::new()))
456    }
457    fn paged_incident_edges(
458        &self,
459        node_id: &str,
460        kind: Option<&str>,
461        options: GraphQueryOptions,
462    ) -> Result<GraphPagedSubgraph> {
463        Ok(apply_graph_edge_query_page(
464            self.incident_edges(node_id, kind)?,
465            options,
466            Vec::new(),
467        ))
468    }
469    fn edges_between_nodes(&self, node_ids: &BTreeSet<String>) -> Result<Vec<GraphEdge>> {
470        let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
471        for from_id in node_ids {
472            for edge in self.outgoing_edges(from_id, None)? {
473                if node_ids.contains(&edge.to_id) {
474                    edges
475                        .entry((edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone()))
476                        .or_insert(edge);
477                }
478            }
479        }
480        Ok(edges.into_values().collect())
481    }
482    fn shortest_path(
483        &self,
484        from_id: &str,
485        to_id: &str,
486        kind: Option<&str>,
487    ) -> Result<Option<GraphPath>>;
488    fn shortest_path_with_max_hops(
489        &self,
490        from_id: &str,
491        to_id: &str,
492        kind: Option<&str>,
493        max_hops: Option<usize>,
494    ) -> Result<Option<GraphPath>> {
495        shortest_path_using_outgoing(from_id, to_id, kind, max_hops, |current, kind| {
496            self.outgoing_edges(current, kind)
497        })
498    }
499    fn neighborhood(
500        &self,
501        center_id: &str,
502        depth: usize,
503        kind: Option<&str>,
504    ) -> Result<Option<GraphSubgraph>> {
505        let Some(center) = self.node(center_id)? else {
506            return Ok(None);
507        };
508        let mut nodes = BTreeMap::from([(center_id.to_string(), center)]);
509        let mut edges = BTreeMap::<(String, String, String), GraphEdge>::new();
510        let mut queue = VecDeque::from([(center_id.to_string(), 0usize)]);
511
512        while let Some((current, current_depth)) = queue.pop_front() {
513            if current_depth >= depth {
514                continue;
515            }
516            for edge in self.outgoing_edges(&current, kind)? {
517                let edge_key = (edge.from_id.clone(), edge.kind.clone(), edge.to_id.clone());
518                edges.entry(edge_key).or_insert_with(|| edge.clone());
519                if !nodes.contains_key(&edge.to_id)
520                    && let Some(node) = self.node(&edge.to_id)?
521                {
522                    nodes.insert(edge.to_id.clone(), node);
523                    queue.push_back((edge.to_id.clone(), current_depth + 1));
524                }
525            }
526        }
527
528        Ok(Some(
529            GraphSubgraph {
530                nodes: nodes.into_values().collect(),
531                edges: edges.into_values().collect(),
532            }
533            .sorted(),
534        ))
535    }
536    fn paged_nodes_by_kind(
537        &self,
538        kind: &str,
539        options: GraphQueryOptions,
540    ) -> Result<GraphPagedSubgraph> {
541        Ok(apply_graph_query_page(
542            self.nodes_by_kind(kind)?,
543            Vec::new(),
544            options,
545            Vec::new(),
546        ))
547    }
548    fn paged_neighborhood(
549        &self,
550        center_id: &str,
551        depth: usize,
552        kind: Option<&str>,
553        options: GraphQueryOptions,
554    ) -> Result<Option<GraphPagedSubgraph>> {
555        Ok(self.neighborhood(center_id, depth, kind)?.map(|subgraph| {
556            apply_graph_query_page(subgraph.nodes, subgraph.edges, options, Vec::new())
557        }))
558    }
559    fn reachable_nodes_by_kind(
560        &self,
561        from_id: &str,
562        kind: &str,
563        depth: usize,
564        limit: usize,
565    ) -> Result<Vec<(GraphNode, GraphPath)>> {
566        let mut rows = BTreeMap::<String, (GraphNode, GraphPath)>::new();
567        let mut seen = BTreeSet::from([from_id.to_string()]);
568        let mut queue = VecDeque::from([(from_id.to_string(), vec![from_id.to_string()])]);
569
570        while let Some((current, path)) = queue.pop_front() {
571            let current_depth = path.len().saturating_sub(1);
572            if current_depth >= depth {
573                continue;
574            }
575            for edge in self.outgoing_edges(&current, None)? {
576                if !seen.insert(edge.to_id.clone()) {
577                    continue;
578                }
579                let Some(node) = self.node(&edge.to_id)? else {
580                    continue;
581                };
582                let mut next_path = path.clone();
583                next_path.push(edge.to_id.clone());
584                let graph_path = GraphPath {
585                    hops: next_path.len().saturating_sub(1),
586                    nodes: next_path.clone(),
587                };
588                if node.kind == kind {
589                    rows.entry(node.id.clone()).or_insert((node, graph_path));
590                }
591                queue.push_back((edge.to_id, next_path));
592            }
593        }
594        let mut rows = rows.into_values().collect::<Vec<_>>();
595        rows.sort_by(|(left_node, left_path), (right_node, right_path)| {
596            left_path
597                .hops
598                .cmp(&right_path.hops)
599                .then(left_node.label.cmp(&right_node.label))
600                .then(left_node.id.cmp(&right_node.id))
601        });
602        if limit > 0 && rows.len() > limit {
603            rows.truncate(limit);
604        }
605        Ok(rows)
606    }
607
608    fn reachable_nodes_by_kinds(
609        &self,
610        from_id: &str,
611        kinds: &[&str],
612        depth: usize,
613        limit: usize,
614    ) -> Result<BTreeMap<String, Vec<(GraphNode, GraphPath)>>> {
615        let mut rows = BTreeMap::new();
616        for kind in kinds {
617            rows.insert(
618                (*kind).to_string(),
619                self.reachable_nodes_by_kind(from_id, kind, depth, limit)?,
620            );
621        }
622        Ok(rows)
623    }
624
625    fn resolve_evidence_target(&self, target: &str, kinds: &[&str]) -> Result<Option<GraphNode>> {
626        if let Some(node) = self.node(target)? {
627            return Ok(Some(node));
628        }
629        let normalized = target.trim().trim_start_matches('#');
630        for kind in kinds {
631            let mut candidates = self
632                .nodes_by_kind(kind)?
633                .into_iter()
634                .filter(|node| {
635                    node.properties.get("handle").map(String::as_str) == Some(target)
636                        || node.properties.get("ref_id").map(String::as_str) == Some(normalized)
637                        || node.label == target
638                        || node.label == format!("#{normalized}")
639                })
640                .collect::<Vec<_>>();
641            candidates.sort_by(|left, right| left.id.cmp(&right.id));
642            if let Some(candidate) = candidates.into_iter().next() {
643                return Ok(Some(candidate));
644            }
645        }
646        Ok(None)
647    }
648}
649
/// Row-oriented form of a `GraphProjection`; field names serialize in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ConvexProjectionRows {
    /// One row per node.
    pub nodes: Vec<ConvexNodeRow>,
    /// One row per edge.
    pub edges: Vec<ConvexEdgeRow>,
}
656
657impl From<&GraphProjection> for ConvexProjectionRows {
658    fn from(projection: &GraphProjection) -> Self {
659        Self {
660            nodes: projection.nodes.iter().map(ConvexNodeRow::from).collect(),
661            edges: projection.edges.iter().map(ConvexEdgeRow::from).collect(),
662        }
663    }
664}
665
/// Row form of a `GraphNode`; field names serialize in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConvexNodeRow {
    /// The node's id (`GraphNode::id`).
    pub external_id: String,
    /// Node kind.
    pub kind: String,
    /// Display label.
    pub label: String,
    /// String key/value properties; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness metadata; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
679
680impl From<&GraphNode> for ConvexNodeRow {
681    fn from(node: &GraphNode) -> Self {
682        Self {
683            external_id: node.id.clone(),
684            kind: node.kind.clone(),
685            label: node.label.clone(),
686            properties: node.properties.clone(),
687            provenance: node.provenance.clone(),
688            freshness: node.freshness.clone(),
689        }
690    }
691}
692
693impl From<ConvexNodeRow> for GraphNode {
694    fn from(row: ConvexNodeRow) -> Self {
695        Self {
696            id: row.external_id,
697            kind: row.kind,
698            label: row.label,
699            properties: row.properties,
700            provenance: row.provenance,
701            freshness: row.freshness,
702        }
703    }
704}
705
/// Row form of a `GraphEdge`; field names serialize in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConvexEdgeRow {
    /// Row key; conversions from `GraphEdge` fill it with the edge's effective id.
    pub edge_key: String,
    /// Id of the source node.
    pub from_external_id: String,
    /// Id of the target node.
    pub to_external_id: String,
    /// Edge kind.
    pub kind: String,
    /// String key/value properties; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub properties: BTreeMap<String, String>,
    /// Provenance records; defaults to empty and is omitted from output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub provenance: Vec<GraphProvenance>,
    /// Optional freshness metadata; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub freshness: Option<GraphFreshness>,
}
720
impl ConvexEdgeRow {
    /// Returns the deterministic row key for an edge with these endpoints and kind;
    /// this is the same value as `stable_graph_edge_id`.
    pub fn stable_key(from_id: &str, to_id: &str, kind: &str) -> String {
        stable_graph_edge_id(from_id, to_id, kind)
    }
}
726
727impl From<&GraphEdge> for ConvexEdgeRow {
728    fn from(edge: &GraphEdge) -> Self {
729        Self {
730            edge_key: graph_edge_id(edge),
731            from_external_id: edge.from_id.clone(),
732            to_external_id: edge.to_id.clone(),
733            kind: edge.kind.clone(),
734            properties: edge.properties.clone(),
735            provenance: edge.provenance.clone(),
736            freshness: edge.freshness.clone(),
737        }
738    }
739}
740
741impl From<ConvexEdgeRow> for GraphEdge {
742    fn from(row: ConvexEdgeRow) -> Self {
743        Self {
744            id: row.edge_key,
745            from_id: row.from_external_id,
746            to_id: row.to_external_id,
747            kind: row.kind,
748            properties: row.properties,
749            provenance: row.provenance,
750            freshness: row.freshness,
751        }
752    }
753}
754
/// Row-level operations a Convex-style backend must provide; `ConvexGraphStore`
/// builds its `GraphStore` implementation on top of these.
pub trait ConvexGraphClient {
    /// Inserts or updates a node row.
    fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()>;
    /// Inserts or updates an edge row.
    fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()>;
    /// Deletes the node row with this external id; the returned count is
    /// implementor-defined (the in-memory client returns 1 if a row was removed, else 0).
    fn delete_node_row(&self, external_id: &str) -> Result<usize>;
    /// Deletes the edge row with this key; the returned count is implementor-defined
    /// (the in-memory client returns 1 if a row was removed, else 0).
    fn delete_edge_row(&self, edge_key: &str) -> Result<usize>;
    /// Looks up one node row by external id.
    fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>>;
    /// Returns every node row.
    fn node_rows(&self) -> Result<Vec<ConvexNodeRow>>;
    /// Returns every edge row.
    fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>>;
    /// Returns the node rows of one kind.
    fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>>;
    /// Returns the edge rows leaving `from_external_id`, optionally restricted to `kind`.
    fn outgoing_edge_rows(
        &self,
        from_external_id: &str,
        kind: Option<&str>,
    ) -> Result<Vec<ConvexEdgeRow>>;
}
770
/// In-memory `ConvexGraphClient` that keeps rows in ordered maps.
///
/// The maps sit behind `RefCell` so the `&self` trait methods can mutate them.
#[derive(Default)]
pub struct ConvexRowsGraphClient {
    // Node rows keyed by `external_id`.
    nodes: RefCell<BTreeMap<String, ConvexNodeRow>>,
    // Edge rows keyed by `edge_key`.
    edges: RefCell<BTreeMap<String, ConvexEdgeRow>>,
}
776
777impl ConvexRowsGraphClient {
778    pub fn from_rows(rows: ConvexProjectionRows) -> Self {
779        Self {
780            nodes: RefCell::new(
781                rows.nodes
782                    .into_iter()
783                    .map(|row| (row.external_id.clone(), row))
784                    .collect(),
785            ),
786            edges: RefCell::new(
787                rows.edges
788                    .into_iter()
789                    .map(|row| (row.edge_key.clone(), row))
790                    .collect(),
791            ),
792        }
793    }
794
795    pub fn to_rows(&self) -> ConvexProjectionRows {
796        ConvexProjectionRows {
797            nodes: self.nodes.borrow().values().cloned().collect(),
798            edges: self.edges.borrow().values().cloned().collect(),
799        }
800    }
801}
802
803impl ConvexGraphClient for ConvexRowsGraphClient {
804    fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()> {
805        self.nodes
806            .borrow_mut()
807            .insert(row.external_id.clone(), row.clone());
808        Ok(())
809    }
810
811    fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()> {
812        self.edges
813            .borrow_mut()
814            .insert(row.edge_key.clone(), row.clone());
815        Ok(())
816    }
817
818    fn delete_node_row(&self, external_id: &str) -> Result<usize> {
819        let mut edges = self.edges.borrow_mut();
820        let incident = edges
821            .values()
822            .filter(|row| row.from_external_id == external_id || row.to_external_id == external_id)
823            .map(|row| row.edge_key.clone())
824            .collect::<Vec<_>>();
825        for edge_key in incident {
826            edges.remove(&edge_key);
827        }
828        Ok(usize::from(
829            self.nodes.borrow_mut().remove(external_id).is_some(),
830        ))
831    }
832
833    fn delete_edge_row(&self, edge_key: &str) -> Result<usize> {
834        Ok(usize::from(
835            self.edges.borrow_mut().remove(edge_key).is_some(),
836        ))
837    }
838
839    fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>> {
840        Ok(self.nodes.borrow().get(external_id).cloned())
841    }
842
843    fn node_rows(&self) -> Result<Vec<ConvexNodeRow>> {
844        Ok(self.nodes.borrow().values().cloned().collect())
845    }
846
847    fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>> {
848        Ok(self.edges.borrow().values().cloned().collect())
849    }
850
851    fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>> {
852        Ok(self
853            .nodes
854            .borrow()
855            .values()
856            .filter(|row| row.kind == kind)
857            .cloned()
858            .collect())
859    }
860
861    fn outgoing_edge_rows(
862        &self,
863        from_external_id: &str,
864        kind: Option<&str>,
865    ) -> Result<Vec<ConvexEdgeRow>> {
866        Ok(self
867            .edges
868            .borrow()
869            .values()
870            .filter(|row| row.from_external_id == from_external_id)
871            .filter(|row| kind.is_none_or(|kind| row.kind == kind))
872            .cloned()
873            .collect())
874    }
875}
876
/// Adapter that exposes a `ConvexGraphClient` through the `GraphStore` trait.
pub struct ConvexGraphStore<C> {
    // The wrapped row-level client that every store operation is forwarded to.
    client: C,
}
880
881impl<C> ConvexGraphStore<C> {
882    pub fn new(client: C) -> Self {
883        Self { client }
884    }
885
886    pub fn client(&self) -> &C {
887        &self.client
888    }
889
890    pub fn into_inner(self) -> C {
891        self.client
892    }
893}
894
895impl<C: ConvexGraphClient> GraphStore for ConvexGraphStore<C> {
896    fn upsert_node(&self, node: &GraphNode) -> Result<()> {
897        self.client.upsert_node_row(&ConvexNodeRow::from(node))
898    }
899
900    fn upsert_edge(&self, edge: &GraphEdge) -> Result<()> {
901        if self.client.node_row(&edge.from_id)?.is_none() {
902            bail!(
903                "convex graph edge {} -> {} ({}) references missing from node",
904                edge.from_id,
905                edge.to_id,
906                edge.kind
907            );
908        }
909        if self.client.node_row(&edge.to_id)?.is_none() {
910            bail!(
911                "convex graph edge {} -> {} ({}) references missing to node",
912                edge.from_id,
913                edge.to_id,
914                edge.kind
915            );
916        }
917        self.client.upsert_edge_row(&ConvexEdgeRow::from(edge))
918    }
919
920    fn delete_node(&self, id: &str) -> Result<usize> {
921        let incident = self
922            .client
923            .edge_rows()?
924            .into_iter()
925            .filter(|row| row.from_external_id == id || row.to_external_id == id)
926            .map(|row| row.edge_key)
927            .collect::<Vec<_>>();
928        for edge_key in incident {
929            self.client.delete_edge_row(&edge_key)?;
930        }
931        self.client.delete_node_row(id)
932    }
933
934    fn delete_edge(&self, from_id: &str, to_id: &str, kind: &str) -> Result<usize> {
935        self.client
936            .delete_edge_row(&ConvexEdgeRow::stable_key(from_id, to_id, kind))
937    }
938
939    fn node(&self, id: &str) -> Result<Option<GraphNode>> {
940        Ok(self.client.node_row(id)?.map(GraphNode::from))
941    }
942
943    fn all_nodes(&self) -> Result<Vec<GraphNode>> {
944        let mut nodes: Vec<GraphNode> = self
945            .client
946            .node_rows()?
947            .into_iter()
948            .map(GraphNode::from)
949            .collect();
950        nodes.sort_by(|left, right| left.id.cmp(&right.id));
951        Ok(nodes)
952    }
953
954    fn all_edges(&self) -> Result<Vec<GraphEdge>> {
955        let mut edges: Vec<GraphEdge> = self
956            .client
957            .edge_rows()?
958            .into_iter()
959            .map(GraphEdge::from)
960            .collect();
961        edges.sort_by(|left, right| {
962            left.from_id
963                .cmp(&right.from_id)
964                .then(left.kind.cmp(&right.kind))
965                .then(left.to_id.cmp(&right.to_id))
966        });
967        Ok(edges)
968    }
969
970    fn graph_counts(&self) -> Result<(usize, usize)> {
971        Ok((
972            self.client.node_rows()?.len(),
973            self.client.edge_rows()?.len(),
974        ))
975    }
976
977    fn nodes_by_kind(&self, kind: &str) -> Result<Vec<GraphNode>> {
978        let mut nodes: Vec<GraphNode> = self
979            .client
980            .node_rows_by_kind(kind)?
981            .into_iter()
982            .map(GraphNode::from)
983            .collect();
984        nodes.sort_by(|left, right| left.id.cmp(&right.id));
985        Ok(nodes)
986    }
987
988    fn outgoing_edges(&self, from_id: &str, kind: Option<&str>) -> Result<Vec<GraphEdge>> {
989        let mut edges: Vec<GraphEdge> = self
990            .client
991            .outgoing_edge_rows(from_id, kind)?
992            .into_iter()
993            .map(GraphEdge::from)
994            .collect();
995        edges.sort_by(|left, right| {
996            left.to_id
997                .cmp(&right.to_id)
998                .then(left.kind.cmp(&right.kind))
999        });
1000        Ok(edges)
1001    }
1002
1003    fn shortest_path(
1004        &self,
1005        from_id: &str,
1006        to_id: &str,
1007        kind: Option<&str>,
1008    ) -> Result<Option<GraphPath>> {
1009        shortest_path_using_outgoing(from_id, to_id, kind, None, |current, kind| {
1010            self.outgoing_edges(current, kind)
1011        })
1012    }
1013}
1014
1015pub fn shortest_path_using_outgoing(
1016    from_id: &str,
1017    to_id: &str,
1018    kind: Option<&str>,
1019    max_hops: Option<usize>,
1020    mut outgoing_edges: impl FnMut(&str, Option<&str>) -> Result<Vec<GraphEdge>>,
1021) -> Result<Option<GraphPath>> {
1022    if from_id == to_id {
1023        return Ok(Some(GraphPath {
1024            nodes: vec![from_id.to_string()],
1025            hops: 0,
1026        }));
1027    }
1028
1029    let mut queue = VecDeque::new();
1030    let mut parent = BTreeMap::<String, String>::new();
1031    parent.insert(from_id.to_string(), String::new());
1032    queue.push_back(from_id.to_string());
1033
1034    while let Some(current) = queue.pop_front() {
1035        let current_depth = parent_depth(&parent, &current);
1036        if max_hops.is_some_and(|max_hops| current_depth >= max_hops) {
1037            continue;
1038        }
1039        for edge in outgoing_edges(&current, kind)? {
1040            if parent.contains_key(&edge.to_id) {
1041                continue;
1042            }
1043            parent.insert(edge.to_id.clone(), current.clone());
1044            if edge.to_id == to_id {
1045                let mut nodes = vec![to_id.to_string()];
1046                let mut cursor = to_id;
1047                while let Some(previous) = parent.get(cursor) {
1048                    if previous.is_empty() {
1049                        break;
1050                    }
1051                    nodes.push(previous.clone());
1052                    cursor = previous;
1053                }
1054                nodes.reverse();
1055                let hops = nodes.len().saturating_sub(1);
1056                return Ok(Some(GraphPath { nodes, hops }));
1057            }
1058            queue.push_back(edge.to_id);
1059        }
1060    }
1061
1062    Ok(None)
1063}
1064
/// Counts how many parent links separate `id` from the start of its chain.
///
/// Starting at `id`, each lookup in `parent` yields the previous node. The
/// walk ends at the first empty-string entry (the start marker) or at the
/// first id that has no entry, and the number of non-empty entries seen is
/// returned. An `id` with no entry therefore has depth 0.
fn parent_depth(parent: &BTreeMap<String, String>, id: &str) -> usize {
    std::iter::successors(parent.get(id), |previous| parent.get(previous.as_str()))
        .take_while(|previous| !previous.is_empty())
        .count()
}
1077
1078#[cfg(test)]
1079mod tests {
1080    use super::*;
1081
1082    fn sample_provenance() -> GraphProvenance {
1083        GraphProvenance::new("fixture", "src/lib.rs:1").with_content_hash("hash-1")
1084    }
1085
1086    fn sample_projection() -> GraphProjection {
1087        let source = sample_provenance();
1088        GraphProjection {
1089            nodes: vec![
1090                GraphNode::new("doc:livekit", "document", "LiveKit guide")
1091                    .with_property("domain", "livekit")
1092                    .with_provenance(source.clone())
1093                    .with_freshness(GraphFreshness::content_hash("node-hash")),
1094                GraphNode::new("topic:rooms", "topic", "Rooms"),
1095                GraphNode::new("topic:egress", "topic", "Egress"),
1096            ],
1097            edges: vec![
1098                GraphEdge::new("doc:livekit", "topic:rooms", "mentions")
1099                    .with_property("confidence", "0.91")
1100                    .with_provenance(source.clone())
1101                    .with_freshness(GraphFreshness::content_hash("edge-hash")),
1102                GraphEdge::new("topic:rooms", "topic:egress", "related_to").with_provenance(source),
1103            ],
1104        }
1105    }
1106
1107    fn assert_projection_store_contract(store: &impl GraphStore) {
1108        let projection = sample_projection();
1109        projection.upsert_into(store).unwrap();
1110
1111        assert_eq!(
1112            store.node("doc:livekit").unwrap(),
1113            projection
1114                .nodes
1115                .iter()
1116                .find(|node| node.id == "doc:livekit")
1117                .cloned()
1118        );
1119        assert_eq!(
1120            store.nodes_by_kind("topic").unwrap(),
1121            vec![
1122                GraphNode::new("topic:egress", "topic", "Egress"),
1123                GraphNode::new("topic:rooms", "topic", "Rooms"),
1124            ]
1125        );
1126
1127        let mentions = store
1128            .outgoing_edges("doc:livekit", Some("mentions"))
1129            .unwrap();
1130        assert_eq!(mentions.len(), 1);
1131        assert_eq!(mentions[0].to_id, "topic:rooms");
1132        assert_eq!(
1133            mentions[0].properties.get("confidence"),
1134            Some(&"0.91".into())
1135        );
1136
1137        let path = store
1138            .shortest_path("doc:livekit", "topic:egress", None)
1139            .unwrap()
1140            .unwrap();
1141        assert_eq!(
1142            path.nodes,
1143            vec!["doc:livekit", "topic:rooms", "topic:egress"]
1144        );
1145    }
1146
1147    #[derive(Default)]
1148    struct MemoryConvexGraphClient {
1149        nodes: RefCell<BTreeMap<String, ConvexNodeRow>>,
1150        edges: RefCell<BTreeMap<String, ConvexEdgeRow>>,
1151    }
1152
1153    impl ConvexGraphClient for MemoryConvexGraphClient {
1154        fn upsert_node_row(&self, row: &ConvexNodeRow) -> Result<()> {
1155            self.nodes
1156                .borrow_mut()
1157                .insert(row.external_id.clone(), row.clone());
1158            Ok(())
1159        }
1160
1161        fn upsert_edge_row(&self, row: &ConvexEdgeRow) -> Result<()> {
1162            self.edges
1163                .borrow_mut()
1164                .insert(row.edge_key.clone(), row.clone());
1165            Ok(())
1166        }
1167
1168        fn delete_node_row(&self, external_id: &str) -> Result<usize> {
1169            Ok(usize::from(
1170                self.nodes.borrow_mut().remove(external_id).is_some(),
1171            ))
1172        }
1173
1174        fn delete_edge_row(&self, edge_key: &str) -> Result<usize> {
1175            Ok(usize::from(
1176                self.edges.borrow_mut().remove(edge_key).is_some(),
1177            ))
1178        }
1179
1180        fn node_row(&self, external_id: &str) -> Result<Option<ConvexNodeRow>> {
1181            Ok(self.nodes.borrow().get(external_id).cloned())
1182        }
1183
1184        fn node_rows(&self) -> Result<Vec<ConvexNodeRow>> {
1185            Ok(self.nodes.borrow().values().cloned().collect())
1186        }
1187
1188        fn edge_rows(&self) -> Result<Vec<ConvexEdgeRow>> {
1189            Ok(self.edges.borrow().values().cloned().collect())
1190        }
1191
1192        fn node_rows_by_kind(&self, kind: &str) -> Result<Vec<ConvexNodeRow>> {
1193            Ok(self
1194                .nodes
1195                .borrow()
1196                .values()
1197                .filter(|row| row.kind == kind)
1198                .cloned()
1199                .collect())
1200        }
1201
1202        fn outgoing_edge_rows(
1203            &self,
1204            from_external_id: &str,
1205            kind: Option<&str>,
1206        ) -> Result<Vec<ConvexEdgeRow>> {
1207            Ok(self
1208                .edges
1209                .borrow()
1210                .values()
1211                .filter(|row| row.from_external_id == from_external_id)
1212                .filter(|row| kind.is_none_or(|kind| row.kind == kind))
1213                .cloned()
1214                .collect())
1215        }
1216    }
1217
1218    #[test]
1219    fn graph_projection_round_trips_through_backend_agnostic_store_contract() {
1220        let convex = ConvexGraphStore::new(MemoryConvexGraphClient::default());
1221        assert_projection_store_contract(&convex);
1222
1223        let client = convex.client();
1224        assert_eq!(client.nodes.borrow().len(), 3);
1225        assert_eq!(client.edges.borrow().len(), 2);
1226        assert!(
1227            client.nodes.borrow().contains_key("doc:livekit"),
1228            "Convex rows keep GraphNode.id as the externalId upsert key"
1229        );
1230    }
1231
1232    #[test]
1233    fn graph_store_contract_covers_crud_neighborhood_and_ordering() {
1234        fn assert_crud_contract(store: &impl GraphStore) {
1235            let projection = sample_projection();
1236            projection.upsert_into(store).unwrap();
1237
1238            let neighborhood = store.neighborhood("doc:livekit", 2, None).unwrap().unwrap();
1239            assert_eq!(
1240                neighborhood
1241                    .nodes
1242                    .iter()
1243                    .map(|node| node.id.as_str())
1244                    .collect::<Vec<_>>(),
1245                vec!["doc:livekit", "topic:egress", "topic:rooms"]
1246            );
1247            assert_eq!(
1248                neighborhood
1249                    .edges
1250                    .iter()
1251                    .map(|edge| (
1252                        edge.from_id.as_str(),
1253                        edge.kind.as_str(),
1254                        edge.to_id.as_str()
1255                    ))
1256                    .collect::<Vec<_>>(),
1257                vec![
1258                    ("doc:livekit", "mentions", "topic:rooms"),
1259                    ("topic:rooms", "related_to", "topic:egress"),
1260                ]
1261            );
1262
1263            assert_eq!(
1264                store
1265                    .delete_edge("topic:rooms", "topic:egress", "related_to")
1266                    .unwrap(),
1267                1
1268            );
1269            assert!(
1270                store
1271                    .shortest_path("doc:livekit", "topic:egress", None)
1272                    .unwrap()
1273                    .is_none()
1274            );
1275            assert_eq!(store.delete_node("topic:rooms").unwrap(), 1);
1276            assert!(store.node("topic:rooms").unwrap().is_none());
1277            assert!(
1278                store
1279                    .outgoing_edges("doc:livekit", None)
1280                    .unwrap()
1281                    .is_empty()
1282            );
1283        }
1284
1285        assert_crud_contract(&ConvexGraphStore::new(ConvexRowsGraphClient::default()));
1286    }
1287
1288    #[test]
1289    fn convex_projection_rows_keep_stable_ids_and_edge_keys() {
1290        let projection = sample_projection();
1291        let rows = projection.to_convex_rows();
1292
1293        let doc_row = rows
1294            .nodes
1295            .iter()
1296            .find(|row| row.external_id == "doc:livekit")
1297            .unwrap();
1298        assert_eq!(doc_row.kind, "document");
1299        assert_eq!(doc_row.properties.get("domain"), Some(&"livekit".into()));
1300
1301        let mentions = rows
1302            .edges
1303            .iter()
1304            .find(|row| row.kind == "mentions")
1305            .unwrap();
1306        assert_eq!(mentions.from_external_id, "doc:livekit");
1307        assert_eq!(mentions.to_external_id, "topic:rooms");
1308        assert_eq!(
1309            mentions.edge_key,
1310            ConvexEdgeRow::stable_key("doc:livekit", "topic:rooms", "mentions")
1311        );
1312        assert!(mentions.edge_key.starts_with("edge:"));
1313    }
1314
1315    #[test]
1316    fn convex_store_rejects_edges_when_projection_nodes_are_missing() {
1317        let store = ConvexGraphStore::new(MemoryConvexGraphClient::default());
1318        store
1319            .upsert_node(&GraphNode::new("doc:livekit", "document", "LiveKit guide"))
1320            .unwrap();
1321
1322        let err = store
1323            .upsert_edge(&GraphEdge::new("doc:livekit", "topic:rooms", "mentions"))
1324            .unwrap_err();
1325        assert!(
1326            err.to_string().contains("references missing to node"),
1327            "{err}"
1328        );
1329    }
1330}