Skip to main content

kyma_graph/
stored_graph.rs

1//! Stored-graph provider: serves a registered property-graph by querying its
2//! node/edge tables via a `GraphQueryExecutor` and shaping rows into wire types.
3
4use std::collections::BTreeMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8
9use crate::executor::{GraphQueryExecutor, JsonRow, StoredGraphConfig};
10use crate::provider::GraphProvider;
11use crate::types::{
12    Direction, EdgeExpansion, GraphNode, GraphPayload, GraphRelationship, GraphSchema, GraphStats,
13    NodeMetadata, Props, SearchHits,
14};
15
16const NOW: &str = "1970-01-01T00:00:00Z";
17
18fn ident(name: &str) -> String {
19    format!("\"{}\"", name.replace('"', "\"\""))
20}
21fn lit(v: &str) -> String {
22    format!("'{}'", v.replace('\'', "''"))
23}
24fn in_list(values: &[String]) -> String {
25    values.iter().map(|v| lit(v)).collect::<Vec<_>>().join(",")
26}
27/// JSON value → plain string (unwrap JSON strings; stringify the rest).
28fn as_str(v: &serde_json::Value) -> String {
29    match v {
30        serde_json::Value::String(s) => s.clone(),
31        other => other.to_string(),
32    }
33}
34
35/// A deduplicated node source: one row per node id.
36///
37/// Connector tables are append-only, so a *continuous* connector re-ingests a
38/// fresh row for every node on each poll (metadata is re-emitted even when
39/// unchanged). Without dedup, the graph would render N copies of every node
40/// after N polls. We keep one row per id via a `ROW_NUMBER()` window. There is
41/// no reliable ingest-order column to pick the truly-latest row, so the choice
42/// is deterministic-arbitrary; re-ingested rows are identical in practice.
43fn node_source(c: &StoredGraphConfig) -> String {
44    format!(
45        "(select * from (select *, row_number() over (partition by {id} order by {id}) as __rn from {t}) where __rn = 1)",
46        id = ident(&c.id_col), t = ident(&c.node_table),
47    )
48}
49/// A deduplicated edge source: one row per (src, dst, type), matching the
50/// connector's deterministic edge identity (`hash(src, type, dst)`).
51fn edge_source(c: &StoredGraphConfig) -> String {
52    format!(
53        "(select * from (select *, row_number() over (partition by {s},{d},{ty} order by {s}) as __rn from {t}) where __rn = 1)",
54        s = ident(&c.src_col), d = ident(&c.dst_col), ty = ident(&c.type_col), t = ident(&c.edge_table),
55    )
56}
57
58pub(crate) fn node_sample_sql(c: &StoredGraphConfig, limit: usize) -> String {
59    format!("select * from {} limit {}", node_source(c), limit)
60}
61pub(crate) fn edge_sample_sql(c: &StoredGraphConfig, limit: usize) -> String {
62    format!("select * from {} limit {}", edge_source(c), limit)
63}
64pub(crate) fn node_by_id_sql(c: &StoredGraphConfig, id: &str) -> String {
65    format!("select * from {} where {} = {} limit 1", node_source(c), ident(&c.id_col), lit(id))
66}
67pub(crate) fn neighbors_sql(c: &StoredGraphConfig, ids: &[String], dir: Direction, limit: usize) -> String {
68    let list = in_list(ids);
69    let pred = match dir {
70        Direction::Forward => format!("{} in ({list})", ident(&c.src_col)),
71        Direction::Backward => format!("{} in ({list})", ident(&c.dst_col)),
72        Direction::Both => format!("{} in ({list}) or {} in ({list})", ident(&c.src_col), ident(&c.dst_col)),
73    };
74    format!("select * from {} where {pred} limit {limit}", edge_source(c))
75}
76pub(crate) fn search_sql(c: &StoredGraphConfig, text: &str, limit: usize, offset: usize) -> String {
77    let needle = lit(&format!("%{}%", text.to_lowercase()));
78    format!(
79        "select * from {t} where lower(cast({id} as varchar)) like {n} or lower(cast({lbl} as varchar)) like {n} limit {limit} offset {offset}",
80        t = node_source(c), id = ident(&c.id_col), lbl = ident(&c.label_col), n = needle,
81    )
82}
83/// Count rows of an already-built (possibly deduplicated) source expression.
84pub(crate) fn count_sql(source: &str) -> String {
85    format!("select count(*) as n from {}", source)
86}
87/// Group-count over an already-built source expression.
88pub(crate) fn group_count_sql(source: &str, col: &str) -> String {
89    format!("select cast({c} as varchar) as k, count(*) as n from {t} group by {c}", c = ident(col), t = source)
90}
91
92fn parse_labels(v: Option<&serde_json::Value>) -> Vec<String> {
93    match v {
94        Some(serde_json::Value::Array(a)) => a.iter().map(as_str).collect(),
95        Some(serde_json::Value::String(s)) => vec![s.clone()],
96        Some(other) => vec![other.to_string()],
97        None => vec![],
98    }
99}
100
101/// The conventional dynamic-JSON catch-all column (see kyma's default table
102/// schema). Connectors stash entity-specific fields here as a JSON blob.
103const PROPS_COL: &str = "props";
104
105/// Decode an even-length lowercase/uppercase hex string into bytes.
106fn hex_decode(s: &str) -> Option<Vec<u8>> {
107    if s.is_empty() || s.len() % 2 != 0 {
108        return None;
109    }
110    let val = |c: u8| match c {
111        b'0'..=b'9' => Some(c - b'0'),
112        b'a'..=b'f' => Some(c - b'a' + 10),
113        b'A'..=b'F' => Some(c - b'A' + 10),
114        _ => None,
115    };
116    let b = s.as_bytes();
117    let mut out = Vec::with_capacity(b.len() / 2);
118    let mut i = 0;
119    while i < b.len() {
120        out.push((val(b[i])? << 4) | val(b[i + 1])?);
121        i += 2;
122    }
123    Some(out)
124}
125
126/// Decode the `props` catch-all blob into an object so its fields surface as
127/// real properties. `ensure_table` types `props` as Binary, so the executor
128/// (arrow-json) renders it as a hex string; we hex-decode → UTF-8 → JSON. Also
129/// handles a Utf8 `props` column whose value is the JSON text directly.
130fn decode_props_blob(v: &serde_json::Value) -> Option<serde_json::Map<String, serde_json::Value>> {
131    let s = v.as_str()?;
132    if let Some(bytes) = hex_decode(s) {
133        if let Ok(txt) = std::str::from_utf8(&bytes) {
134            if let Ok(serde_json::Value::Object(m)) = serde_json::from_str(txt) {
135                return Some(m);
136            }
137        }
138    }
139    if let Ok(serde_json::Value::Object(m)) = serde_json::from_str::<serde_json::Value>(s) {
140        return Some(m);
141    }
142    None
143}
144
145/// Build the property map for a row: explicit (promoted) columns win; the
146/// `props` blob is decoded and merged in to fill any remaining keys. Role
147/// columns and the `__rn` dedup helper are excluded.
148fn collect_props(row: &JsonRow, role_cols: &[&str]) -> Props {
149    let mut props: Props = BTreeMap::new();
150    let mut blob: Option<&serde_json::Value> = None;
151    for (k, v) in row {
152        if role_cols.contains(&k.as_str()) || k == "__rn" {
153            continue;
154        }
155        if k == PROPS_COL {
156            blob = Some(v);
157            continue;
158        }
159        props.insert(k.clone(), v.clone());
160    }
161    if let Some(v) = blob {
162        match decode_props_blob(v) {
163            Some(obj) => {
164                for (pk, pv) in obj {
165                    props.entry(pk).or_insert(pv);
166                }
167            }
168            // Undecodable (not JSON): keep the raw value so nothing is lost.
169            None => {
170                props.insert(PROPS_COL.to_string(), v.clone());
171            }
172        }
173    }
174    props
175}
176
177pub(crate) fn row_to_node(c: &StoredGraphConfig, row: &JsonRow) -> GraphNode {
178    let id = row.get(&c.id_col).map(as_str).unwrap_or_default();
179    let labels = parse_labels(row.get(&c.label_col));
180    let realm = c.realm_col.as_ref().and_then(|rc| row.get(rc)).map(as_str).unwrap_or_else(|| c.database.clone());
181    let role_cols: [&str; 3] = [c.id_col.as_str(), c.label_col.as_str(), c.realm_col.as_deref().unwrap_or("")];
182    let props = collect_props(row, &role_cols);
183    GraphNode {
184        id, labels, properties: props,
185        metadata: NodeMetadata { created_at: NOW.into(), updated_at: NOW.into(), source_type: Some("stored".into()), source_id: None, realm },
186    }
187}
188
189pub(crate) fn row_to_edge(c: &StoredGraphConfig, row: &JsonRow) -> GraphRelationship {
190    let src = row.get(&c.src_col).map(as_str).unwrap_or_default();
191    let dst = row.get(&c.dst_col).map(as_str).unwrap_or_default();
192    let ty = row.get(&c.type_col).map(as_str).unwrap_or_default();
193    let role_cols = [c.src_col.as_str(), c.dst_col.as_str(), c.type_col.as_str()];
194    let props = collect_props(row, &role_cols);
195    GraphRelationship {
196        id: format!("{src}->{dst}:{ty}"),
197        source_id: src, target_id: dst, relationship_type: ty, properties: props,
198    }
199}
200
201pub struct StoredGraphProvider {
202    cfg: StoredGraphConfig,
203    exec: Arc<dyn GraphQueryExecutor>,
204}
205
206impl StoredGraphProvider {
207    pub fn new(cfg: StoredGraphConfig, exec: Arc<dyn GraphQueryExecutor>) -> Self {
208        Self { cfg, exec }
209    }
210    async fn rows(&self, sql: String) -> anyhow::Result<Vec<JsonRow>> {
211        self.exec.query(&self.cfg.database, sql).await
212    }
213    fn count_of(rows: &[JsonRow]) -> usize {
214        rows.first().and_then(|r| r.get("n")).and_then(|v| v.as_u64()).unwrap_or(0) as usize
215    }
216}
217
218async fn stats_for(p: &StoredGraphProvider) -> anyhow::Result<GraphStats> {
219    let total_nodes = StoredGraphProvider::count_of(&p.rows(count_sql(&node_source(&p.cfg))).await?);
220    let total_relationships = StoredGraphProvider::count_of(&p.rows(count_sql(&edge_source(&p.cfg))).await?);
221    let mut label_counts = BTreeMap::new();
222    for r in p.rows(group_count_sql(&node_source(&p.cfg), &p.cfg.label_col)).await? {
223        let k = r.get("k").map(as_str).unwrap_or_default();
224        let n = r.get("n").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
225        label_counts.insert(k, n);
226    }
227    let mut relationship_type_counts = BTreeMap::new();
228    for r in p.rows(group_count_sql(&edge_source(&p.cfg), &p.cfg.type_col)).await? {
229        let k = r.get("k").map(as_str).unwrap_or_default();
230        let n = r.get("n").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
231        relationship_type_counts.insert(k, n);
232    }
233    Ok(GraphStats { total_nodes, total_relationships, label_counts, relationship_type_counts })
234}
235
236#[async_trait]
237impl GraphProvider for StoredGraphProvider {
238    async fn overview(&self, _realm: Option<&str>, limit: usize) -> anyhow::Result<GraphPayload> {
239        let nodes: Vec<GraphNode> = self.rows(node_sample_sql(&self.cfg, limit)).await?
240            .iter().map(|r| row_to_node(&self.cfg, r)).collect();
241        let kept: std::collections::HashSet<&String> = nodes.iter().map(|n| &n.id).collect();
242        let edges: Vec<GraphRelationship> = self.rows(edge_sample_sql(&self.cfg, limit.saturating_mul(4))).await?
243            .iter().map(|r| row_to_edge(&self.cfg, r))
244            .filter(|e| kept.contains(&e.source_id) && kept.contains(&e.target_id))
245            .collect();
246        let stats = stats_for(self).await?;
247        Ok(GraphPayload { stats, nodes, edges })
248    }
249    async fn node(&self, id: &str) -> anyhow::Result<Option<GraphNode>> {
250        Ok(self.rows(node_by_id_sql(&self.cfg, id)).await?.first().map(|r| row_to_node(&self.cfg, r)))
251    }
252    async fn neighbors(&self, ids: &[String], dir: Direction, _only_internal: bool, limit: usize) -> anyhow::Result<EdgeExpansion> {
253        if ids.is_empty() { return Ok(EdgeExpansion { edges: vec![], new_node_ids: vec![] }); }
254        let edges: Vec<GraphRelationship> = self.rows(neighbors_sql(&self.cfg, ids, dir, limit)).await?
255            .iter().map(|r| row_to_edge(&self.cfg, r)).collect();
256        let idset: std::collections::HashSet<&String> = ids.iter().collect();
257        let mut new_ids = Vec::new();
258        for e in &edges {
259            for end in [&e.source_id, &e.target_id] {
260                if !idset.contains(end) && !new_ids.contains(end) { new_ids.push(end.clone()); }
261            }
262        }
263        Ok(EdgeExpansion { edges, new_node_ids: new_ids })
264    }
265    async fn subgraph(&self, id: &str, depth: usize) -> anyhow::Result<GraphPayload> {
266        let mut frontier = vec![id.to_string()];
267        let mut all_node_ids: std::collections::HashSet<String> = frontier.iter().cloned().collect();
268        let mut edges: Vec<GraphRelationship> = Vec::new();
269        for _ in 0..depth.max(1) {
270            if frontier.is_empty() { break; }
271            let exp = self.neighbors(&frontier, Direction::Both, true, 500).await?;
272            let mut next = Vec::new();
273            for e in exp.edges {
274                if !edges.iter().any(|k| k.id == e.id) { edges.push(e); }
275            }
276            for nid in exp.new_node_ids {
277                if all_node_ids.insert(nid.clone()) { next.push(nid); }
278            }
279            frontier = next;
280        }
281        // fetch node objects for the collected ids
282        let mut nodes = Vec::new();
283        for nid in &all_node_ids {
284            if let Some(n) = self.node(nid).await? { nodes.push(n); }
285        }
286        let stats = GraphStats {
287            total_nodes: nodes.len(), total_relationships: edges.len(),
288            label_counts: BTreeMap::new(), relationship_type_counts: BTreeMap::new(),
289        };
290        Ok(GraphPayload { stats, nodes, edges })
291    }
292    async fn search(&self, text: &str, labels: &[String], _realm: Option<&str>, limit: usize, offset: usize) -> anyhow::Result<SearchHits> {
293        let mut hits: Vec<GraphNode> = self.rows(search_sql(&self.cfg, text, limit, offset)).await?
294            .iter().map(|r| row_to_node(&self.cfg, r)).collect();
295        if !labels.is_empty() {
296            hits.retain(|n| labels.iter().any(|l| n.labels.contains(l)));
297        }
298        let total = hits.len();
299        Ok(SearchHits { hits, total, limit, offset })
300    }
301    async fn stats(&self, _realm: Option<&str>) -> anyhow::Result<GraphStats> {
302        stats_for(self).await
303    }
304    async fn schema(&self) -> anyhow::Result<GraphSchema> {
305        let stats = stats_for(self).await?;
306        Ok(GraphSchema {
307            node_kinds: stats.label_counts.keys().cloned().collect(),
308            edge_types: stats.relationship_type_counts.keys().cloned().collect(),
309            property_keys: BTreeMap::new(),
310        })
311    }
312}
313
314#[cfg(test)]
315mod sql_tests {
316    use super::*;
317    use crate::executor::StoredGraphConfig;
318
319    fn cfg() -> StoredGraphConfig {
320        StoredGraphConfig {
321            database: "kg".into(), node_table: "kg_nodes".into(), edge_table: "kg_edges".into(),
322            id_col: "id".into(), label_col: "labels".into(),
323            src_col: "src".into(), dst_col: "dst".into(), type_col: "type".into(),
324            realm_col: Some("realm".into()),
325        }
326    }
327
328    #[test]
329    fn node_by_id_sql_quotes_and_escapes() {
330        let s = node_by_id_sql(&cfg(), "a'b");
331        assert!(s.contains(r#"from "kg_nodes""#), "{s}");
332        assert!(s.contains(r#""id" = 'a''b'"#), "{s}");
333        assert!(s.to_lowercase().contains("limit 1"));
334    }
335
336    #[test]
337    fn neighbors_sql_both_directions() {
338        let s = neighbors_sql(&cfg(), &["x".into(), "y".into()], Direction::Both, 50);
339        assert!(s.contains(r#""src" in ('x','y')"#), "{s}");
340        assert!(s.contains(r#""dst" in ('x','y')"#), "{s}");
341        assert!(s.to_lowercase().contains("limit 50"));
342    }
343
344    #[test]
345    fn node_sample_dedups_by_id() {
346        // Append-only connector tables accumulate a fresh row per node on every
347        // poll; the node source must collapse to one row per id.
348        let s = node_sample_sql(&cfg(), 50).to_lowercase();
349        assert!(s.contains("row_number()"), "{s}");
350        assert!(s.contains(r#"partition by "id""#), "{s}");
351        assert!(s.contains("kg_nodes"), "{s}");
352        assert!(s.contains("limit 50"), "{s}");
353    }
354
355    #[test]
356    fn edge_sample_dedups_by_src_dst_type() {
357        let s = edge_sample_sql(&cfg(), 50).to_lowercase();
358        assert!(s.contains("row_number()"), "{s}");
359        assert!(s.contains(r#"partition by "src","dst","type""#), "{s}");
360        assert!(s.contains("kg_edges"), "{s}");
361    }
362
363    #[test]
364    fn count_and_group_count_run_over_deduped_source() {
365        // stats must count distinct ids, not raw rows.
366        let cnt = count_sql(&node_source(&cfg())).to_lowercase();
367        assert!(cnt.contains("count(*)") && cnt.contains("row_number()"), "{cnt}");
368        let grp = group_count_sql(&node_source(&cfg()), &cfg().label_col).to_lowercase();
369        assert!(grp.contains("group by") && grp.contains("row_number()"), "{grp}");
370    }
371
372    #[test]
373    fn row_to_node_uses_roles() {
374        let mut row = JsonRow::new();
375        row.insert("id".into(), serde_json::json!("n1"));
376        row.insert("labels".into(), serde_json::json!("Service"));
377        row.insert("realm".into(), serde_json::json!("prod"));
378        row.insert("owner".into(), serde_json::json!("team-a"));
379        let n = row_to_node(&cfg(), &row);
380        assert_eq!(n.id, "n1");
381        assert_eq!(n.labels, vec!["Service".to_string()]);
382        assert_eq!(n.metadata.realm, "prod");
383        assert_eq!(n.properties.get("owner").unwrap(), &serde_json::json!("team-a"));
384        assert!(!n.properties.contains_key("id")); // role columns excluded from props
385    }
386
387    #[test]
388    fn row_to_edge_uses_roles() {
389        let mut row = JsonRow::new();
390        row.insert("src".into(), serde_json::json!("a"));
391        row.insert("dst".into(), serde_json::json!("b"));
392        row.insert("type".into(), serde_json::json!("CALLS"));
393        row.insert("weight".into(), serde_json::json!(5));
394        let e = row_to_edge(&cfg(), &row);
395        assert_eq!(e.source_id, "a");
396        assert_eq!(e.target_id, "b");
397        assert_eq!(e.relationship_type, "CALLS");
398        assert_eq!(e.properties.get("weight").unwrap(), &serde_json::json!(5));
399    }
400
401    #[test]
402    fn hex_decode_round_trips() {
403        // `{"k":"v"}` as lowercase hex.
404        assert_eq!(hex_decode("7b226b223a2276227d").unwrap(), br#"{"k":"v"}"#.to_vec());
405        assert!(hex_decode("abc").is_none(), "odd length");
406        assert!(hex_decode("zz").is_none(), "non-hex");
407    }
408
409    #[test]
410    fn decode_props_blob_handles_hex_and_plain_json() {
411        // Binary props arrives hex-encoded from arrow-json.
412        let hex = serde_json::json!("7b226b223a2276227d");
413        let m = decode_props_blob(&hex).expect("hex json");
414        assert_eq!(m.get("k").unwrap(), &serde_json::json!("v"));
415        // A Utf8 props column arrives as the JSON text directly.
416        let plain = serde_json::json!(r#"{"a":1}"#);
417        assert_eq!(decode_props_blob(&plain).unwrap().get("a").unwrap(), &serde_json::json!(1));
418        // A non-JSON hex string (e.g. a sha) is left undecoded.
419        assert!(decode_props_blob(&serde_json::json!("deadbeef")).is_none());
420    }
421
422    #[test]
423    fn collect_props_merges_blob_and_explicit_wins() {
424        // hex of {"language":"python","name":"blob-name"}
425        let blob = "7b226c616e6775616765223a22707974686f6e222c226e616d65223a22626c6f622d6e616d65227d";
426        let mut row = JsonRow::new();
427        row.insert("id".into(), serde_json::json!("file:1"));
428        row.insert("labels".into(), serde_json::json!("CodeFile"));
429        row.insert("name".into(), serde_json::json!("real-name")); // explicit promoted col
430        row.insert("props".into(), serde_json::json!(blob));
431        row.insert("__rn".into(), serde_json::json!(1)); // dedup helper — excluded
432        let props = collect_props(&row, &["id", "labels"]);
433        // decoded blob field surfaces
434        assert_eq!(props.get("language").unwrap(), &serde_json::json!("python"));
435        // explicit `name` column wins over the blob's `name`
436        assert_eq!(props.get("name").unwrap(), &serde_json::json!("real-name"));
437        // helper + role cols excluded; raw hex `props` not leaked
438        assert!(!props.contains_key("__rn"));
439        assert!(!props.contains_key("props"));
440        assert!(!props.contains_key("id"));
441    }
442
443    #[test]
444    fn collect_props_keeps_undecodable_blob_raw() {
445        let mut row = JsonRow::new();
446        row.insert("props".into(), serde_json::json!("not-json"));
447        let props = collect_props(&row, &[]);
448        assert_eq!(props.get("props").unwrap(), &serde_json::json!("not-json"));
449    }
450}
451
452#[cfg(test)]
453mod provider_tests {
454    use super::*;
455    use crate::executor::{GraphQueryExecutor, JsonRow, StoredGraphConfig};
456
457    struct FakeExec;
458    fn row(pairs: &[(&str, serde_json::Value)]) -> JsonRow {
459        pairs.iter().map(|(k, v)| (k.to_string(), v.clone())).collect()
460    }
461    #[async_trait]
462    impl GraphQueryExecutor for FakeExec {
463        async fn query(&self, _db: &str, sql: String) -> anyhow::Result<Vec<JsonRow>> {
464            let s = sql.to_lowercase();
465            if s.contains("count(*)") && s.contains("group by") {
466                return Ok(vec![row(&[("k", serde_json::json!("Service")), ("n", serde_json::json!(2))])]);
467            }
468            if s.contains("count(*)") {
469                return Ok(vec![row(&[("n", serde_json::json!(2))])]);
470            }
471            if s.contains("kg_nodes") {
472                return Ok(vec![
473                    row(&[("id", serde_json::json!("a")), ("labels", serde_json::json!("Service"))]),
474                    row(&[("id", serde_json::json!("b")), ("labels", serde_json::json!("Service"))]),
475                ]);
476            }
477            if s.contains("kg_edges") {
478                return Ok(vec![row(&[("src", serde_json::json!("a")), ("dst", serde_json::json!("b")), ("type", serde_json::json!("CALLS"))])]);
479            }
480            Ok(vec![])
481        }
482    }
483    fn provider() -> StoredGraphProvider {
484        StoredGraphProvider::new(
485            StoredGraphConfig { database: "kg".into(), node_table: "kg_nodes".into(), edge_table: "kg_edges".into(),
486                id_col: "id".into(), label_col: "labels".into(), src_col: "src".into(), dst_col: "dst".into(), type_col: "type".into(), realm_col: None },
487            std::sync::Arc::new(FakeExec),
488        )
489    }
490
491    #[tokio::test]
492    async fn overview_shapes_nodes_edges_and_stats() {
493        let p = provider();
494        let ov = p.overview(None, 100).await.unwrap();
495        assert_eq!(ov.nodes.len(), 2);
496        assert_eq!(ov.edges.len(), 1);
497        assert_eq!(ov.edges[0].relationship_type, "CALLS");
498        assert_eq!(ov.stats.total_nodes, 2);
499        assert_eq!(ov.stats.label_counts.get("Service").copied(), Some(2));
500    }
501    #[tokio::test]
502    async fn neighbors_collects_new_ids() {
503        let p = provider();
504        let exp = p.neighbors(&["a".into()], Direction::Both, true, 50).await.unwrap();
505        assert_eq!(exp.edges.len(), 1);
506        assert_eq!(exp.new_node_ids, vec!["b".to_string()]);
507    }
508}