Skip to main content

nodedb_types/
graph.rs

// SPDX-License-Identifier: Apache-2.0

//! Shared graph types used by both Origin and Lite CSR engines.
4
5use serde::{Deserialize, Serialize};
6
7/// Aggregate stats for a single graph collection (or the full edge store
8/// when no collection is specified).
9///
10/// Returned by [`NodeDb::graph_stats`]. Wire-safe: serializes to/from
11/// MessagePack so the value can cross the Data-Plane boundary unchanged.
12///
13/// The `node_count` field counts distinct node IDs observed as edge
14/// endpoints (not necessarily all nodes ever inserted). For Origin, this
15/// equals `distinct_node_count` from the persistent stats table; for Lite,
16/// it is derived from the CRDT edge store at query time.
17#[derive(
18    Debug,
19    Clone,
20    PartialEq,
21    Serialize,
22    Deserialize,
23    zerompk::ToMessagePack,
24    zerompk::FromMessagePack,
25)]
26#[msgpack(map)]
27pub struct GraphStats {
28    /// Name of the collection (or `"__edges"` on Lite when no collection is
29    /// supplied).
30    pub collection: String,
31    /// Distinct node IDs that appear as an edge source or destination.
32    pub node_count: u64,
33    /// Total number of edges in the collection.
34    pub edge_count: u64,
35    /// Number of distinct edge label strings.
36    pub distinct_label_count: u64,
37    /// Per-label edge counts, sorted ascending by label name.
38    pub labels: Vec<(String, u64)>,
39}
40
41/// Edge traversal direction.
42#[derive(
43    Debug,
44    Clone,
45    Copy,
46    PartialEq,
47    Eq,
48    Hash,
49    Serialize,
50    Deserialize,
51    zerompk::ToMessagePack,
52    zerompk::FromMessagePack,
53)]
54#[msgpack(c_enum)]
55pub enum Direction {
56    /// Outgoing edges only.
57    Out,
58    /// Incoming edges only.
59    In,
60    /// Both directions.
61    Both,
62}
63
64impl Direction {
65    pub fn as_str(&self) -> &'static str {
66        match self {
67            Self::Out => "out",
68            Self::In => "in",
69            Self::Both => "both",
70        }
71    }
72}
73
74impl std::fmt::Display for Direction {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        f.write_str(self.as_str())
77    }
78}
79
80impl std::str::FromStr for Direction {
81    type Err = String;
82    fn from_str(s: &str) -> Result<Self, Self::Err> {
83        match s.to_lowercase().as_str() {
84            "out" | "outgoing" => Ok(Self::Out),
85            "in" | "incoming" => Ok(Self::In),
86            "both" | "any" => Ok(Self::Both),
87            other => Err(format!("unknown direction: '{other}'")),
88        }
89    }
90}
91
92impl GraphStats {
93    /// Column names of the wire row shape produced by
94    /// `SHOW GRAPH STATS [<'collection'>]`. Single source of truth — the
95    /// parser validates against this, and clients use it to build the
96    /// `columns` slice in tests so the two never drift.
97    pub const EXPECTED_COLUMNS: [&'static str; 5] = [
98        "collection",
99        "node_count",
100        "edge_count",
101        "distinct_label_count",
102        "labels",
103    ];
104
105    pub fn zero(collection: impl Into<String>) -> Self {
106        Self {
107            collection: collection.into(),
108            node_count: 0,
109            edge_count: 0,
110            distinct_label_count: 0,
111            labels: Vec::new(),
112        }
113    }
114
115    /// Parse the wire shape produced by `SHOW GRAPH STATS [<'collection'>]`
116    /// into a vec of `GraphStats`, one entry per row. Used by both the
117    /// native and the pgwire remote clients — keeping a single parser here
118    /// is what makes the `wire_shape:` smoke-probe error messages a
119    /// meaningful diagnostic rather than a coincidence between two copies.
120    ///
121    /// Expected columns: `(collection, node_count, edge_count,
122    /// distinct_label_count, labels)`, where `labels` is a JSON array of
123    /// `{"label": str, "count": u64}` objects.
124    ///
125    /// Both wire paths can deliver count cells as `Value::Integer`
126    /// (extended-query / native typed) or `Value::String` (pgwire simple
127    /// query text protocol); both shapes are accepted.
128    ///
129    /// Column-shape mismatches surface as errors (this is exactly the bug
130    /// class the smoke probe is designed to trap, so no fallback); empty
131    /// rows return an empty vec — callers decide how to interpret no rows.
132    pub fn parse_show_stats_response(
133        columns: &[String],
134        rows: &[Vec<crate::value::Value>],
135    ) -> crate::error::NodeDbResult<Vec<Self>> {
136        use crate::error::NodeDbError;
137
138        if columns.len() != Self::EXPECTED_COLUMNS.len()
139            || columns
140                .iter()
141                .zip(Self::EXPECTED_COLUMNS.iter())
142                .any(|(a, b)| a != b)
143        {
144            if !columns.is_empty() {
145                return Err(NodeDbError::storage(format!(
146                    "wire_shape: SHOW GRAPH STATS returned unexpected columns: {columns:?}"
147                )));
148            }
149            // No columns and no rows: treat as empty result.
150            return Ok(Vec::new());
151        }
152
153        if rows.is_empty() {
154            return Ok(Vec::new());
155        }
156
157        let mut out = Vec::with_capacity(rows.len());
158        for row in rows {
159            out.push(Self::parse_one_row(row)?);
160        }
161        Ok(out)
162    }
163
164    fn parse_one_row(row: &[crate::value::Value]) -> crate::error::NodeDbResult<Self> {
165        use crate::error::NodeDbError;
166
167        let coll_name = row
168            .first()
169            .and_then(|v| v.as_str())
170            .ok_or_else(|| {
171                NodeDbError::storage("wire_shape: SHOW GRAPH STATS: missing collection cell")
172            })?
173            .to_string();
174        let node_count = row.get(1).and_then(parse_u64_cell).ok_or_else(|| {
175            NodeDbError::storage("wire_shape: SHOW GRAPH STATS: missing node_count")
176        })?;
177        let edge_count = row.get(2).and_then(parse_u64_cell).ok_or_else(|| {
178            NodeDbError::storage("wire_shape: SHOW GRAPH STATS: missing edge_count")
179        })?;
180        let distinct_label_count = row.get(3).and_then(parse_u64_cell).ok_or_else(|| {
181            NodeDbError::storage("wire_shape: SHOW GRAPH STATS: missing distinct_label_count")
182        })?;
183        let labels_json = row.get(4).and_then(|v| v.as_str()).unwrap_or("[]");
184        let parsed: Vec<sonic_rs::Value> = sonic_rs::from_str(labels_json)
185            .map_err(|e| NodeDbError::storage(format!("wire_shape: labels JSON parse: {e}")))?;
186        let mut labels: Vec<(String, u64)> = Vec::with_capacity(parsed.len());
187        for entry in &parsed {
188            use sonic_rs::JsonValueTrait;
189            let label = entry
190                .get("label")
191                .and_then(|v| v.as_str())
192                .ok_or_else(|| NodeDbError::storage("wire_shape: labels entry missing 'label'"))?
193                .to_string();
194            let count = entry
195                .get("count")
196                .and_then(|v| v.as_u64())
197                .ok_or_else(|| NodeDbError::storage("wire_shape: labels entry missing 'count'"))?;
198            labels.push((label, count));
199        }
200        Ok(Self {
201            collection: coll_name,
202            node_count,
203            edge_count,
204            distinct_label_count,
205            labels,
206        })
207    }
208}
209
210/// Parse a count cell that may arrive typed (`Value::Integer`) or as
211/// pgwire text (`Value::String`).
212fn parse_u64_cell(v: &crate::value::Value) -> Option<u64> {
213    match v {
214        crate::value::Value::Integer(i) => Some(*i as u64),
215        crate::value::Value::String(s) => s.parse::<u64>().ok(),
216        _ => None,
217    }
218}
219
/// Unit tests for `Direction` conversions and for `GraphStats`
/// construction, serialization and wire-row parsing.
#[cfg(test)]
mod tests {
    use super::*;

    /// The canonical column header as an owned `Vec<String>`, the shape
    /// clients hand to `parse_show_stats_response`.
    fn expected_columns() -> Vec<String> {
        GraphStats::EXPECTED_COLUMNS
            .iter()
            .map(|s| s.to_string())
            .collect()
    }

    #[test]
    fn direction_roundtrip() {
        for dir in [Direction::Out, Direction::In, Direction::Both] {
            let s = dir.as_str();
            let parsed: Direction = s.parse().unwrap();
            assert_eq!(dir, parsed);
        }
    }

    #[test]
    fn direction_display() {
        assert_eq!(Direction::Out.to_string(), "out");
        assert_eq!(Direction::In.to_string(), "in");
        assert_eq!(Direction::Both.to_string(), "both");
    }

    #[test]
    fn direction_parses_aliases_case_insensitively() {
        let cases = [
            ("OUT", Direction::Out),
            ("outgoing", Direction::Out),
            ("In", Direction::In),
            ("INCOMING", Direction::In),
            ("Both", Direction::Both),
            ("any", Direction::Both),
        ];
        for (input, expected) in cases {
            assert_eq!(input.parse::<Direction>(), Ok(expected), "input: {input}");
        }
    }

    #[test]
    fn direction_rejects_unknown_input() {
        let err = "sideways".parse::<Direction>().unwrap_err();
        assert!(
            err.contains("unknown direction"),
            "error should mention unknown direction: {err}"
        );
    }

    #[test]
    fn graph_stats_zero() {
        let s = GraphStats::zero("my_coll");
        assert_eq!(s.collection, "my_coll");
        assert_eq!(s.node_count, 0);
        assert_eq!(s.edge_count, 0);
        assert_eq!(s.distinct_label_count, 0);
        assert!(s.labels.is_empty());
    }

    #[test]
    fn graph_stats_serde_round_trip() {
        let s = GraphStats {
            collection: "coll".into(),
            node_count: 10,
            edge_count: 5,
            distinct_label_count: 2,
            labels: vec![("KNOWS".into(), 3), ("OWNS".into(), 2)],
        };
        let json = sonic_rs::to_string(&s).unwrap();
        let back: GraphStats = sonic_rs::from_str(&json).unwrap();
        assert_eq!(back, s);
    }

    #[test]
    fn parse_show_stats_multi_row() {
        use crate::value::Value;
        let columns = expected_columns();
        let labels_json = r#"[{"label":"KNOWS","count":3},{"label":"OWNS","count":2}]"#;
        let rows = vec![
            vec![
                Value::String("social".into()),
                // pgwire simple-query text protocol arrives as strings;
                // the native protocol arrives as integers — both shapes are accepted.
                Value::String("10".into()),
                Value::Integer(5),
                Value::String("2".into()),
                Value::String(labels_json.into()),
            ],
            vec![
                Value::String("comms".into()),
                Value::Integer(3),
                Value::Integer(2),
                Value::Integer(1),
                Value::String(r#"[{"label":"CALLS","count":2}]"#.into()),
            ],
        ];
        let result = GraphStats::parse_show_stats_response(&columns, &rows).unwrap();
        assert_eq!(result.len(), 2);
        let social = &result[0];
        assert_eq!(social.collection, "social");
        assert_eq!(social.node_count, 10);
        assert_eq!(social.edge_count, 5);
        assert_eq!(social.distinct_label_count, 2);
        assert_eq!(social.labels, vec![("KNOWS".into(), 3), ("OWNS".into(), 2)]);
        let comms = &result[1];
        assert_eq!(comms.collection, "comms");
        assert_eq!(comms.node_count, 3);
        assert_eq!(comms.edge_count, 2);
        assert_eq!(comms.distinct_label_count, 1);
        assert_eq!(comms.labels, vec![("CALLS".into(), 2)]);
    }

    #[test]
    fn parse_show_stats_empty_rows_returns_empty_vec() {
        let columns = expected_columns();
        let result = GraphStats::parse_show_stats_response(&columns, &[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn parse_show_stats_wrong_columns_errors() {
        let columns = vec!["id".to_string(), "count".to_string()];
        let err = GraphStats::parse_show_stats_response(&columns, &[]).unwrap_err();
        assert!(
            err.to_string().contains("unexpected columns"),
            "error should mention unexpected columns: {err}"
        );
    }

    #[test]
    fn parse_show_stats_no_columns_no_rows_returns_empty_vec() {
        let result = GraphStats::parse_show_stats_response(&[], &[]).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn parse_show_stats_bad_count_cell_errors() {
        use crate::value::Value;
        let rows = vec![vec![
            Value::String("social".into()),
            Value::String("not-a-number".into()),
            Value::Integer(1),
            Value::Integer(1),
            Value::String("[]".into()),
        ]];
        let err = GraphStats::parse_show_stats_response(&expected_columns(), &rows).unwrap_err();
        assert!(
            err.to_string().contains("node_count"),
            "error should name the offending cell: {err}"
        );
    }

    #[test]
    fn parse_show_stats_bad_labels_json_errors() {
        use crate::value::Value;
        let rows = vec![vec![
            Value::String("social".into()),
            Value::Integer(1),
            Value::Integer(1),
            Value::Integer(1),
            Value::String("not json".into()),
        ]];
        let err = GraphStats::parse_show_stats_response(&expected_columns(), &rows).unwrap_err();
        assert!(
            err.to_string().contains("labels JSON parse"),
            "error should mention the labels JSON: {err}"
        );
    }

    #[test]
    fn graph_stats_msgpack_round_trip() {
        let s = GraphStats {
            collection: "coll".into(),
            node_count: 7,
            edge_count: 3,
            distinct_label_count: 1,
            labels: vec![("FOLLOWS".into(), 3)],
        };
        let bytes = zerompk::to_msgpack_vec(&s).unwrap();
        let back: GraphStats = zerompk::from_msgpack(&bytes).unwrap();
        assert_eq!(back, s);
    }
}