1use serde::{Deserialize, Serialize};
29use std::collections::{BTreeMap, BTreeSet};
30
31#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
37#[serde(rename_all = "camelCase")]
38pub struct SourceSchema {
39 #[serde(default)]
40 pub nodes: Vec<NodeSchema>,
41 #[serde(default)]
42 pub relations: Vec<RelationSchema>,
43}
44
45impl SourceSchema {
46 pub fn is_empty(&self) -> bool {
48 self.nodes.is_empty() && self.relations.is_empty()
49 }
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
54pub struct NodeSchema {
55 pub label: String,
56 #[serde(default)]
57 pub properties: Vec<PropertySchema>,
58}
59
60impl NodeSchema {
61 pub fn new(label: impl Into<String>) -> Self {
62 Self {
63 label: label.into(),
64 properties: Vec::new(),
65 }
66 }
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
71#[serde(rename_all = "camelCase")]
72pub struct RelationSchema {
73 pub label: String,
74 #[serde(default, skip_serializing_if = "Option::is_none")]
75 pub from: Option<String>,
76 #[serde(default, skip_serializing_if = "Option::is_none")]
77 pub to: Option<String>,
78 #[serde(default)]
79 pub properties: Vec<PropertySchema>,
80}
81
82impl RelationSchema {
83 pub fn new(label: impl Into<String>) -> Self {
84 Self {
85 label: label.into(),
86 from: None,
87 to: None,
88 properties: Vec::new(),
89 }
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
95#[serde(rename_all = "camelCase")]
96pub struct PropertySchema {
97 pub name: String,
98 #[serde(default, skip_serializing_if = "Option::is_none")]
99 pub data_type: Option<PropertyType>,
100 #[serde(default, skip_serializing_if = "Option::is_none")]
101 pub description: Option<String>,
102}
103
104impl PropertySchema {
105 pub fn new(name: impl Into<String>) -> Self {
106 Self {
107 name: name.into(),
108 data_type: None,
109 description: None,
110 }
111 }
112}
113
114#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
116#[serde(rename_all = "lowercase")]
117pub enum PropertyType {
118 String,
119 Integer,
120 Float,
121 Boolean,
122 Timestamp,
123 Json,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
128#[serde(rename_all = "camelCase")]
129pub struct GraphSchema {
130 #[serde(default)]
131 pub nodes: BTreeMap<String, GraphNodeSchema>,
132 #[serde(default)]
133 pub relations: BTreeMap<String, GraphRelationSchema>,
134 #[serde(default)]
135 pub sources_without_schema: BTreeSet<String>,
136}
137
138impl GraphSchema {
139 pub fn merge_source_schema(&mut self, source_id: &str, schema: &SourceSchema) {
141 for node in &schema.nodes {
142 let entry = self.nodes.entry(node.label.clone()).or_default();
143 entry.sources.insert(source_id.to_string());
144 merge_properties(&mut entry.properties, &node.properties);
145 }
146
147 for relation in &schema.relations {
148 let entry = self.relations.entry(relation.label.clone()).or_default();
149 entry.sources.insert(source_id.to_string());
150
151 if entry.from.is_none() {
152 entry.from = relation.from.clone();
153 } else if entry.from != relation.from && relation.from.is_some() {
154 log::debug!(
155 "Relation '{}': source '{}' reports from={:?}, but existing entry has from={:?}; keeping existing",
156 relation.label, source_id, relation.from, entry.from
157 );
158 }
159 if entry.to.is_none() {
160 entry.to = relation.to.clone();
161 } else if entry.to != relation.to && relation.to.is_some() {
162 log::debug!(
163 "Relation '{}': source '{}' reports to={:?}, but existing entry has to={:?}; keeping existing",
164 relation.label, source_id, relation.to, entry.to
165 );
166 }
167
168 merge_properties(&mut entry.properties, &relation.properties);
169 }
170 }
171
172 pub fn mark_queried_nodes<'a, I>(&mut self, labels: I, query_id: &str)
174 where
175 I: IntoIterator<Item = &'a str>,
176 {
177 for label in labels {
178 let entry = self.nodes.entry(label.to_string()).or_default();
179 entry.queried_by.insert(query_id.to_string());
180 }
181 }
182
183 pub fn mark_queried_relations<'a, I>(&mut self, labels: I, query_id: &str)
185 where
186 I: IntoIterator<Item = &'a str>,
187 {
188 for label in labels {
189 let entry = self.relations.entry(label.to_string()).or_default();
190 entry.queried_by.insert(query_id.to_string());
191 }
192 }
193
194 pub fn record_source_without_schema(&mut self, source_id: &str) {
196 self.sources_without_schema.insert(source_id.to_string());
197 }
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
202#[serde(rename_all = "camelCase")]
203pub struct GraphNodeSchema {
204 #[serde(default)]
205 pub sources: BTreeSet<String>,
206 #[serde(default)]
207 pub queried_by: BTreeSet<String>,
208 #[serde(default)]
209 pub properties: Vec<PropertySchema>,
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
214#[serde(rename_all = "camelCase")]
215pub struct GraphRelationSchema {
216 #[serde(default)]
217 pub sources: BTreeSet<String>,
218 #[serde(default)]
219 pub queried_by: BTreeSet<String>,
220 #[serde(default, skip_serializing_if = "Option::is_none")]
221 pub from: Option<String>,
222 #[serde(default, skip_serializing_if = "Option::is_none")]
223 pub to: Option<String>,
224 #[serde(default)]
225 pub properties: Vec<PropertySchema>,
226}
227
228fn merge_properties(target: &mut Vec<PropertySchema>, incoming: &[PropertySchema]) {
229 for property in incoming {
230 if let Some(existing) = target.iter_mut().find(|p| p.name == property.name) {
231 if existing.data_type.is_none() {
232 existing.data_type = property.data_type;
233 }
234 if existing.description.is_none() {
235 existing.description = property.description.clone();
236 }
237 } else {
238 target.push(property.clone());
239 }
240 }
241
242 target.sort_by(|a, b| a.name.cmp(&b.name));
243}
244
/// Strips any dot-separated qualifier from a table name, keeping only the part after
/// the last `.` (e.g. `"public.users"` -> `"users"`, `"db.dbo.orders"` -> `"orders"`).
///
/// Names without a `.` are returned unchanged. The split is purely textual: a quoted
/// identifier that itself contains a dot (e.g. `"my.table"`) is split too, and a
/// trailing dot (e.g. `"users."`) yields an empty label.
pub fn normalize_table_label(table: &str) -> String {
    // `rsplit_once` states the intent directly. It replaces
    // `rsplit('.').next().unwrap_or(table)`, whose fallback could never fire
    // because `rsplit` always yields at least one segment.
    table
        .rsplit_once('.')
        .map_or(table, |(_qualifier, name)| name)
        .to_string()
}
253
#[cfg(test)]
mod tests {
    use super::*;

    // Compact constructor used by the newer tests below.
    fn source(nodes: Vec<NodeSchema>, relations: Vec<RelationSchema>) -> SourceSchema {
        SourceSchema { nodes, relations }
    }

    #[test]
    fn test_source_schema_is_empty() {
        let schema = SourceSchema::default();
        assert!(schema.is_empty());

        let schema = SourceSchema {
            nodes: vec![NodeSchema::new("User")],
            relations: Vec::new(),
        };
        assert!(!schema.is_empty());

        let schema = source(Vec::new(), vec![RelationSchema::new("KNOWS")]);
        assert!(!schema.is_empty());
    }

    #[test]
    fn test_normalize_table_label_strips_schema() {
        assert_eq!(normalize_table_label("public.users"), "users");
        assert_eq!(normalize_table_label("dbo.orders"), "orders");
        assert_eq!(normalize_table_label("orders"), "orders");
    }

    #[test]
    fn test_normalize_table_label_multiple_qualifiers_and_empty() {
        assert_eq!(normalize_table_label("warehouse.public.users"), "users");
        assert_eq!(normalize_table_label(""), "");
    }

    #[test]
    fn test_merge_source_schema_nodes() {
        let mut graph = GraphSchema::default();
        let source_schema = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "name".to_string(),
                    data_type: Some(PropertyType::String),
                    description: None,
                }],
            }],
            relations: Vec::new(),
        };

        graph.merge_source_schema("source1", &source_schema);

        assert!(graph.nodes.contains_key("User"));
        let node = &graph.nodes["User"];
        assert!(node.sources.contains("source1"));
        assert_eq!(node.properties.len(), 1);
        assert_eq!(node.properties[0].name, "name");
    }

    #[test]
    fn test_merge_source_schema_deduplicates_properties() {
        let mut graph = GraphSchema::default();
        let schema1 = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "age".to_string(),
                    data_type: None,
                    description: None,
                }],
            }],
            relations: Vec::new(),
        };
        let schema2 = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "age".to_string(),
                    data_type: Some(PropertyType::Integer),
                    description: Some("User age".to_string()),
                }],
            }],
            relations: Vec::new(),
        };

        graph.merge_source_schema("s1", &schema1);
        graph.merge_source_schema("s2", &schema2);

        let node = &graph.nodes["User"];
        assert_eq!(node.properties.len(), 1);
        assert_eq!(node.properties[0].data_type, Some(PropertyType::Integer));
        assert_eq!(node.properties[0].description, Some("User age".to_string()));
    }

    #[test]
    fn test_merge_source_schema_keeps_known_property_metadata() {
        let mut graph = GraphSchema::default();
        let mut first = NodeSchema::new("User");
        first.properties = vec![PropertySchema {
            name: "age".to_string(),
            data_type: Some(PropertyType::Integer),
            description: Some("User age".to_string()),
        }];
        let mut second = NodeSchema::new("User");
        second.properties = vec![PropertySchema {
            name: "age".to_string(),
            data_type: Some(PropertyType::String),
            description: Some("Other".to_string()),
        }];

        graph.merge_source_schema("s1", &source(vec![first], Vec::new()));
        graph.merge_source_schema("s2", &source(vec![second], Vec::new()));

        let node = &graph.nodes["User"];
        assert_eq!(node.properties.len(), 1);
        assert_eq!(node.properties[0].data_type, Some(PropertyType::Integer));
        assert_eq!(node.properties[0].description.as_deref(), Some("User age"));
        assert_eq!(node.sources.len(), 2);
    }

    #[test]
    fn test_merge_source_schema_sorts_properties_by_name() {
        let mut graph = GraphSchema::default();
        let mut node = NodeSchema::new("User");
        node.properties = vec![PropertySchema::new("zeta"), PropertySchema::new("alpha")];
        graph.merge_source_schema("s1", &source(vec![node], Vec::new()));

        let mut later = NodeSchema::new("User");
        later.properties = vec![PropertySchema::new("mid")];
        graph.merge_source_schema("s2", &source(vec![later], Vec::new()));

        let names: Vec<&str> = graph.nodes["User"]
            .properties
            .iter()
            .map(|p| p.name.as_str())
            .collect();
        assert_eq!(names, ["alpha", "mid", "zeta"]);
    }

    #[test]
    fn test_merge_source_schema_relations() {
        let mut graph = GraphSchema::default();
        let schema = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema {
                label: "KNOWS".to_string(),
                from: Some("User".to_string()),
                to: Some("User".to_string()),
                properties: Vec::new(),
            }],
        };

        graph.merge_source_schema("s1", &schema);

        let rel = &graph.relations["KNOWS"];
        assert_eq!(rel.from, Some("User".to_string()));
        assert_eq!(rel.to, Some("User".to_string()));
    }

    #[test]
    fn test_merge_source_schema_relation_endpoints_first_writer_wins() {
        let mut graph = GraphSchema::default();
        let mut first = RelationSchema::new("OWNS");
        first.from = Some("User".to_string());
        let mut second = RelationSchema::new("OWNS");
        second.from = Some("Org".to_string());
        second.to = Some("Account".to_string());

        graph.merge_source_schema("s1", &source(Vec::new(), vec![first]));
        graph.merge_source_schema("s2", &source(Vec::new(), vec![second]));

        let rel = &graph.relations["OWNS"];
        // A conflicting `from` keeps the first value; an unset `to` is filled later.
        assert_eq!(rel.from.as_deref(), Some("User"));
        assert_eq!(rel.to.as_deref(), Some("Account"));
        assert!(rel.sources.contains("s1"));
        assert!(rel.sources.contains("s2"));
    }

    #[test]
    fn test_mark_queried_nodes() {
        let mut graph = GraphSchema::default();
        graph.mark_queried_nodes(["User", "Order"].iter().copied(), "q1");

        assert!(graph.nodes["User"].queried_by.contains("q1"));
        assert!(graph.nodes["Order"].queried_by.contains("q1"));
    }

    #[test]
    fn test_mark_queried_nodes_preserves_existing_entry() {
        let mut graph = GraphSchema::default();
        graph.merge_source_schema("s1", &source(vec![NodeSchema::new("User")], Vec::new()));
        graph.mark_queried_nodes(["User"].iter().copied(), "q1");
        graph.mark_queried_nodes(["User"].iter().copied(), "q2");

        let node = &graph.nodes["User"];
        assert!(node.sources.contains("s1"));
        assert_eq!(node.queried_by.len(), 2);
    }

    #[test]
    fn test_mark_queried_relations() {
        let mut graph = GraphSchema::default();
        graph.mark_queried_relations(["PLACED"].iter().copied(), "q1");

        assert!(graph.relations["PLACED"].queried_by.contains("q1"));
    }

    #[test]
    fn test_record_source_without_schema() {
        let mut graph = GraphSchema::default();
        graph.record_source_without_schema("unknown-source");
        assert!(graph.sources_without_schema.contains("unknown-source"));
    }

    #[test]
    fn test_source_schema_serialization_roundtrip() {
        let schema = SourceSchema {
            nodes: vec![NodeSchema {
                label: "Sensor".to_string(),
                properties: vec![PropertySchema {
                    name: "temperature".to_string(),
                    data_type: Some(PropertyType::Float),
                    description: Some("Celsius".to_string()),
                }],
            }],
            relations: vec![RelationSchema {
                label: "MEASURES".to_string(),
                from: Some("Sensor".to_string()),
                to: Some("Location".to_string()),
                properties: Vec::new(),
            }],
        };

        let json = serde_json::to_string(&schema).unwrap();
        let deserialized: SourceSchema = serde_json::from_str(&json).unwrap();
        assert_eq!(schema, deserialized);
    }

    #[test]
    fn test_property_schema_deserializes_with_defaults() {
        let minimal: PropertySchema = serde_json::from_str(r#"{"name":"id"}"#).unwrap();
        assert_eq!(minimal, PropertySchema::new("id"));

        let typed: PropertySchema =
            serde_json::from_str(r#"{"name":"createdAt","dataType":"timestamp"}"#).unwrap();
        assert_eq!(typed.data_type, Some(PropertyType::Timestamp));
    }

    #[test]
    fn test_graph_schema_serialization_uses_camel_case_and_skips_missing_endpoints() {
        let mut graph = GraphSchema::default();
        graph.record_source_without_schema("s1");
        graph.mark_queried_relations(["PLACED"].iter().copied(), "q1");

        let value = serde_json::to_value(&graph).unwrap();
        assert!(value.get("sourcesWithoutSchema").is_some());
        let placed = &value["relations"]["PLACED"];
        assert!(placed.get("queriedBy").is_some());
        assert!(placed.get("from").is_none());
        assert!(placed.get("to").is_none());
    }
}