1use serde::Serialize;
8
9use crate::error::Result;
10use crate::graph::{Edge, GraphStore, Node, NodeId};
11
12#[derive(Debug, Clone, Serialize)]
14pub struct RecordRef<'a> {
15 pub resource: &'a str,
17 pub record_id: &'a str,
19 pub payload: &'a serde_json::Value,
22 pub source: &'a str,
24}
25
26pub async fn ingest_record(store: &dyn GraphStore, record: RecordRef<'_>) -> Result<NodeId> {
28 let id: NodeId = format!("{}:{}", record.resource, record.record_id);
29
30 let mut node = Node::new(id.clone(), record.resource)
31 .with_property("__source", serde_json::json!(record.source));
32
33 let mut pending_edges: Vec<(String, String)> = Vec::new();
34
35 if let Some(obj) = record.payload.as_object() {
36 for (k, v) in obj {
37 if let Some(s) = v.as_str() {
38 if let Some((_resource, _id)) = parse_record_ref(s) {
39 pending_edges.push((k.clone(), s.to_string()));
40 continue;
41 }
42 }
43 node.properties.insert(k.clone(), v.clone());
44 }
45 }
46
47 store.upsert_node(node).await?;
48
49 for (label, target) in pending_edges {
50 if store.get_node(&target).await?.is_none() {
54 if let Some((res, rid)) = parse_record_ref(&target) {
55 let placeholder = Node::new(target.clone(), res)
56 .with_property("__placeholder", serde_json::json!(true))
57 .with_property("__record_id", serde_json::json!(rid));
58 store.upsert_node(placeholder).await?;
59 }
60 }
61 let edge = Edge::new(id.clone(), target, label);
62 store.add_edge(edge).await?;
63 }
64
65 Ok(id)
66}
67
68fn parse_record_ref(s: &str) -> Option<(&str, &str)> {
69 let (resource, record_id) = s.split_once(':')?;
70 if resource.is_empty() || record_id.is_empty() {
71 return None;
72 }
73 if !resource
77 .chars()
78 .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
79 {
80 return None;
81 }
82 if record_id.starts_with('/') {
83 return None;
84 }
85 Some((resource, record_id))
86}
87
88#[cfg(test)]
89mod tests {
90 use super::*;
91 use crate::graph::InMemoryGraph;
92 use serde_json::json;
93
94 #[tokio::test]
95 async fn ingest_creates_node_with_properties() {
96 let g = InMemoryGraph::new();
97 let payload = json!({"name": "Rex", "age": 4});
98 ingest_record(
99 &g,
100 RecordRef {
101 resource: "pet",
102 record_id: "1",
103 payload: &payload,
104 source: "petstore",
105 },
106 )
107 .await
108 .unwrap();
109
110 let node = g.get_node(&"pet:1".into()).await.unwrap().unwrap();
111 assert_eq!(node.labels, vec!["pet"]);
112 assert_eq!(node.properties["name"], json!("Rex"));
113 assert_eq!(node.properties["__source"], json!("petstore"));
114 }
115
116 #[tokio::test]
117 async fn record_ref_strings_become_edges() {
118 let g = InMemoryGraph::new();
119 let owner_payload = json!({"name": "Alice"});
120 ingest_record(
121 &g,
122 RecordRef {
123 resource: "owner",
124 record_id: "1",
125 payload: &owner_payload,
126 source: "crm",
127 },
128 )
129 .await
130 .unwrap();
131 let pet_payload = json!({"name": "Rex", "owner": "owner:1"});
132 ingest_record(
133 &g,
134 RecordRef {
135 resource: "pet",
136 record_id: "1",
137 payload: &pet_payload,
138 source: "petstore",
139 },
140 )
141 .await
142 .unwrap();
143
144 let edges = g.edges_from(&"pet:1".into(), Some("owner")).await.unwrap();
145 assert_eq!(edges.len(), 1);
146 assert_eq!(edges[0].to, "owner:1");
147 }
148
149 #[tokio::test]
150 async fn missing_target_is_placeheld() {
151 let g = InMemoryGraph::new();
152 let payload = json!({"owner": "owner:99"});
153 ingest_record(
154 &g,
155 RecordRef {
156 resource: "pet",
157 record_id: "1",
158 payload: &payload,
159 source: "petstore",
160 },
161 )
162 .await
163 .unwrap();
164 let placeholder = g.get_node(&"owner:99".into()).await.unwrap().unwrap();
165 assert_eq!(placeholder.properties["__placeholder"], json!(true));
166 }
167
168 #[test]
169 fn parse_record_ref_rejects_urls_and_times() {
170 assert!(parse_record_ref("https://x").is_none());
171 assert!(parse_record_ref("12:34").is_some()); assert!(parse_record_ref("pet:1").is_some());
173 assert!(parse_record_ref(":1").is_none());
174 assert!(parse_record_ref("pet:").is_none());
175 }
176}