Skip to main content

drasi_lib/
schema.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Schema discovery types for source introspection.
16//!
17//! This module provides types for best-effort schema reporting by sources. Higher
18//! layers (inspection APIs, MCP adapters, LLM-powered tools) consume these types
19//! to understand the shape of the data graph without reverse-engineering
20//! source-specific configuration.
21//!
22//! # Architecture
23//!
24//! - [`SourceSchema`] — per-source schema reported by `Source::describe_schema()`
25//! - [`GraphSchema`] — merged view across all sources and queries (used by inspection APIs)
26//! - [`normalize_table_label`] — utility for deriving node labels from qualified table names
27
28use serde::{Deserialize, Serialize};
29use std::collections::{BTreeMap, BTreeSet};
30
31/// Best-effort schema information reported by a source.
32///
33/// Sources may return this from `Source::describe_schema()` so higher layers
34/// (such as inspection APIs or MCP adapters) can understand the graph shape
35/// without reverse-engineering source-specific configuration.
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
37#[serde(rename_all = "camelCase")]
38pub struct SourceSchema {
39    #[serde(default)]
40    pub nodes: Vec<NodeSchema>,
41    #[serde(default)]
42    pub relations: Vec<RelationSchema>,
43}
44
45impl SourceSchema {
46    /// Returns true when the schema contains no node or relation declarations.
47    pub fn is_empty(&self) -> bool {
48        self.nodes.is_empty() && self.relations.is_empty()
49    }
50}
51
52/// Schema for a single node label.
53#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
54pub struct NodeSchema {
55    pub label: String,
56    #[serde(default)]
57    pub properties: Vec<PropertySchema>,
58}
59
60impl NodeSchema {
61    pub fn new(label: impl Into<String>) -> Self {
62        Self {
63            label: label.into(),
64            properties: Vec::new(),
65        }
66    }
67}
68
69/// Schema for a single relationship label.
70#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
71#[serde(rename_all = "camelCase")]
72pub struct RelationSchema {
73    pub label: String,
74    #[serde(default, skip_serializing_if = "Option::is_none")]
75    pub from: Option<String>,
76    #[serde(default, skip_serializing_if = "Option::is_none")]
77    pub to: Option<String>,
78    #[serde(default)]
79    pub properties: Vec<PropertySchema>,
80}
81
82impl RelationSchema {
83    pub fn new(label: impl Into<String>) -> Self {
84        Self {
85            label: label.into(),
86            from: None,
87            to: None,
88            properties: Vec::new(),
89        }
90    }
91}
92
93/// Schema for a single property.
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
95#[serde(rename_all = "camelCase")]
96pub struct PropertySchema {
97    pub name: String,
98    #[serde(default, skip_serializing_if = "Option::is_none")]
99    pub data_type: Option<PropertyType>,
100    #[serde(default, skip_serializing_if = "Option::is_none")]
101    pub description: Option<String>,
102}
103
104impl PropertySchema {
105    pub fn new(name: impl Into<String>) -> Self {
106        Self {
107            name: name.into(),
108            data_type: None,
109            description: None,
110        }
111    }
112}
113
114/// Cross-source property type hints for schema discovery.
115#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
116#[serde(rename_all = "lowercase")]
117pub enum PropertyType {
118    String,
119    Integer,
120    Float,
121    Boolean,
122    Timestamp,
123    Json,
124}
125
126/// Merged graph schema used by inspection APIs and future MCP tools.
127#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
128#[serde(rename_all = "camelCase")]
129pub struct GraphSchema {
130    #[serde(default)]
131    pub nodes: BTreeMap<String, GraphNodeSchema>,
132    #[serde(default)]
133    pub relations: BTreeMap<String, GraphRelationSchema>,
134    #[serde(default)]
135    pub sources_without_schema: BTreeSet<String>,
136}
137
138impl GraphSchema {
139    /// Merge a source-provided schema into the aggregate graph view.
140    pub fn merge_source_schema(&mut self, source_id: &str, schema: &SourceSchema) {
141        for node in &schema.nodes {
142            let entry = self.nodes.entry(node.label.clone()).or_default();
143            entry.sources.insert(source_id.to_string());
144            merge_properties(&mut entry.properties, &node.properties);
145        }
146
147        for relation in &schema.relations {
148            let entry = self.relations.entry(relation.label.clone()).or_default();
149            entry.sources.insert(source_id.to_string());
150
151            if entry.from.is_none() {
152                entry.from = relation.from.clone();
153            } else if entry.from != relation.from && relation.from.is_some() {
154                log::debug!(
155                    "Relation '{}': source '{}' reports from={:?}, but existing entry has from={:?}; keeping existing",
156                    relation.label, source_id, relation.from, entry.from
157                );
158            }
159            if entry.to.is_none() {
160                entry.to = relation.to.clone();
161            } else if entry.to != relation.to && relation.to.is_some() {
162                log::debug!(
163                    "Relation '{}': source '{}' reports to={:?}, but existing entry has to={:?}; keeping existing",
164                    relation.label, source_id, relation.to, entry.to
165                );
166            }
167
168            merge_properties(&mut entry.properties, &relation.properties);
169        }
170    }
171
172    /// Mark node labels as being referenced by a query.
173    pub fn mark_queried_nodes<'a, I>(&mut self, labels: I, query_id: &str)
174    where
175        I: IntoIterator<Item = &'a str>,
176    {
177        for label in labels {
178            let entry = self.nodes.entry(label.to_string()).or_default();
179            entry.queried_by.insert(query_id.to_string());
180        }
181    }
182
183    /// Mark relationship labels as being referenced by a query.
184    pub fn mark_queried_relations<'a, I>(&mut self, labels: I, query_id: &str)
185    where
186        I: IntoIterator<Item = &'a str>,
187    {
188        for label in labels {
189            let entry = self.relations.entry(label.to_string()).or_default();
190            entry.queried_by.insert(query_id.to_string());
191        }
192    }
193
194    /// Record a source that exists but could not describe its schema.
195    pub fn record_source_without_schema(&mut self, source_id: &str) {
196        self.sources_without_schema.insert(source_id.to_string());
197    }
198}
199
200/// Aggregated node schema across one or more sources.
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
202#[serde(rename_all = "camelCase")]
203pub struct GraphNodeSchema {
204    #[serde(default)]
205    pub sources: BTreeSet<String>,
206    #[serde(default)]
207    pub queried_by: BTreeSet<String>,
208    #[serde(default)]
209    pub properties: Vec<PropertySchema>,
210}
211
212/// Aggregated relationship schema across one or more sources.
213#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
214#[serde(rename_all = "camelCase")]
215pub struct GraphRelationSchema {
216    #[serde(default)]
217    pub sources: BTreeSet<String>,
218    #[serde(default)]
219    pub queried_by: BTreeSet<String>,
220    #[serde(default, skip_serializing_if = "Option::is_none")]
221    pub from: Option<String>,
222    #[serde(default, skip_serializing_if = "Option::is_none")]
223    pub to: Option<String>,
224    #[serde(default)]
225    pub properties: Vec<PropertySchema>,
226}
227
228fn merge_properties(target: &mut Vec<PropertySchema>, incoming: &[PropertySchema]) {
229    for property in incoming {
230        if let Some(existing) = target.iter_mut().find(|p| p.name == property.name) {
231            if existing.data_type.is_none() {
232                existing.data_type = property.data_type;
233            }
234            if existing.description.is_none() {
235                existing.description = property.description.clone();
236            }
237        } else {
238            target.push(property.clone());
239        }
240    }
241
242    target.sort_by(|a, b| a.name.cmp(&b.name));
243}
244
/// Strip an optional schema (or database/schema) prefix from a qualified table name to
/// derive a node label.
///
/// Everything up to and including the last `.` is removed. For example, `"public.users"`
/// becomes `"users"` and `"db.dbo.orders"` becomes `"orders"`, while an unqualified
/// `"orders"` stays `"orders"`. This is used by database sources (Postgres, MSSQL) when
/// converting configured table names into graph node labels.
///
/// If nothing follows the last `.` (e.g. `"public."`), the input is returned unchanged
/// rather than producing an empty label. Quoted identifiers that themselves contain `.`
/// are not understood and are split like any other name.
pub fn normalize_table_label(table: &str) -> String {
    match table.rsplit_once('.') {
        Some((_, name)) if !name.is_empty() => name.to_string(),
        // Unqualified name, or a trailing separator that would yield an empty label.
        _ => table.to_string(),
    }
}
253
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_source_schema_is_empty() {
        let schema = SourceSchema::default();
        assert!(schema.is_empty());

        let schema = SourceSchema {
            nodes: vec![NodeSchema::new("User")],
            relations: Vec::new(),
        };
        assert!(!schema.is_empty());
    }

    #[test]
    fn test_normalize_table_label_strips_schema() {
        assert_eq!(normalize_table_label("public.users"), "users");
        assert_eq!(normalize_table_label("dbo.orders"), "orders");
        assert_eq!(normalize_table_label("orders"), "orders");
    }

    #[test]
    fn test_normalize_table_label_multi_part_name() {
        // Only the final dot-separated segment is kept.
        assert_eq!(normalize_table_label("db.dbo.orders"), "orders");
    }

    #[test]
    fn test_merge_source_schema_nodes() {
        let mut graph = GraphSchema::default();
        let source_schema = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "name".to_string(),
                    data_type: Some(PropertyType::String),
                    description: None,
                }],
            }],
            relations: Vec::new(),
        };

        graph.merge_source_schema("source1", &source_schema);

        assert!(graph.nodes.contains_key("User"));
        let node = &graph.nodes["User"];
        assert!(node.sources.contains("source1"));
        assert_eq!(node.properties.len(), 1);
        assert_eq!(node.properties[0].name, "name");
    }

    #[test]
    fn test_merge_source_schema_deduplicates_properties() {
        let mut graph = GraphSchema::default();
        let schema1 = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "age".to_string(),
                    data_type: None,
                    description: None,
                }],
            }],
            relations: Vec::new(),
        };
        let schema2 = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![PropertySchema {
                    name: "age".to_string(),
                    data_type: Some(PropertyType::Integer),
                    description: Some("User age".to_string()),
                }],
            }],
            relations: Vec::new(),
        };

        graph.merge_source_schema("s1", &schema1);
        graph.merge_source_schema("s2", &schema2);

        let node = &graph.nodes["User"];
        assert_eq!(node.properties.len(), 1);
        // Second merge promotes type
        assert_eq!(node.properties[0].data_type, Some(PropertyType::Integer));
        assert_eq!(node.properties[0].description, Some("User age".to_string()));
    }

    #[test]
    fn test_merge_properties_sorted_and_first_type_wins() {
        let mut graph = GraphSchema::default();
        let first = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![
                    PropertySchema {
                        name: "zip".to_string(),
                        data_type: Some(PropertyType::Integer),
                        description: None,
                    },
                    PropertySchema::new("age"),
                ],
            }],
            relations: Vec::new(),
        };
        let second = SourceSchema {
            nodes: vec![NodeSchema {
                label: "User".to_string(),
                properties: vec![
                    PropertySchema {
                        name: "zip".to_string(),
                        data_type: Some(PropertyType::String),
                        description: None,
                    },
                    PropertySchema::new("email"),
                ],
            }],
            relations: Vec::new(),
        };

        graph.merge_source_schema("s1", &first);
        graph.merge_source_schema("s2", &second);

        let node = &graph.nodes["User"];
        let names: Vec<&str> = node.properties.iter().map(|p| p.name.as_str()).collect();
        assert_eq!(names, ["age", "email", "zip"]);
        // The first reported type hint is kept; the conflicting later one is ignored.
        assert_eq!(node.properties[2].data_type, Some(PropertyType::Integer));
    }

    #[test]
    fn test_merge_source_schema_relations() {
        let mut graph = GraphSchema::default();
        let schema = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema {
                label: "KNOWS".to_string(),
                from: Some("User".to_string()),
                to: Some("User".to_string()),
                properties: Vec::new(),
            }],
        };

        graph.merge_source_schema("s1", &schema);

        let rel = &graph.relations["KNOWS"];
        assert_eq!(rel.from, Some("User".to_string()));
        assert_eq!(rel.to, Some("User".to_string()));
    }

    #[test]
    fn test_merge_relation_first_reported_endpoints_win() {
        let mut graph = GraphSchema::default();
        let first = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema {
                label: "KNOWS".to_string(),
                from: Some("User".to_string()),
                to: Some("User".to_string()),
                properties: Vec::new(),
            }],
        };
        let second = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema {
                label: "KNOWS".to_string(),
                from: Some("Person".to_string()),
                to: None,
                properties: Vec::new(),
            }],
        };

        graph.merge_source_schema("s1", &first);
        graph.merge_source_schema("s2", &second);

        let rel = &graph.relations["KNOWS"];
        assert_eq!(rel.from, Some("User".to_string()));
        assert_eq!(rel.to, Some("User".to_string()));
        assert_eq!(rel.sources.len(), 2);
        assert!(rel.sources.contains("s1"));
        assert!(rel.sources.contains("s2"));
    }

    #[test]
    fn test_merge_relation_later_source_fills_missing_endpoints() {
        let mut graph = GraphSchema::default();
        let first = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema::new("WORKS_AT")],
        };
        let second = SourceSchema {
            nodes: Vec::new(),
            relations: vec![RelationSchema {
                label: "WORKS_AT".to_string(),
                from: Some("User".to_string()),
                to: Some("Company".to_string()),
                properties: Vec::new(),
            }],
        };

        graph.merge_source_schema("s1", &first);
        graph.merge_source_schema("s2", &second);

        let rel = &graph.relations["WORKS_AT"];
        assert_eq!(rel.from, Some("User".to_string()));
        assert_eq!(rel.to, Some("Company".to_string()));
    }

    #[test]
    fn test_mark_queried_nodes() {
        let mut graph = GraphSchema::default();
        graph.mark_queried_nodes(["User", "Order"].iter().copied(), "q1");

        assert!(graph.nodes["User"].queried_by.contains("q1"));
        assert!(graph.nodes["Order"].queried_by.contains("q1"));
    }

    #[test]
    fn test_mark_queried_nodes_does_not_add_sources() {
        let mut graph = GraphSchema::default();
        let schema = SourceSchema {
            nodes: vec![NodeSchema::new("User")],
            relations: Vec::new(),
        };
        graph.merge_source_schema("s1", &schema);
        graph.mark_queried_nodes(["User", "Ghost"].iter().copied(), "q1");

        assert_eq!(graph.nodes["User"].sources.len(), 1);
        assert!(graph.nodes["User"].queried_by.contains("q1"));
        // A label referenced only by a query has no backing source.
        assert!(graph.nodes["Ghost"].sources.is_empty());
        assert!(graph.nodes["Ghost"].queried_by.contains("q1"));
    }

    #[test]
    fn test_mark_queried_relations() {
        let mut graph = GraphSchema::default();
        graph.mark_queried_relations(["PLACED"].iter().copied(), "q1");

        assert!(graph.relations["PLACED"].queried_by.contains("q1"));
    }

    #[test]
    fn test_record_source_without_schema() {
        let mut graph = GraphSchema::default();
        graph.record_source_without_schema("unknown-source");
        assert!(graph.sources_without_schema.contains("unknown-source"));
    }

    #[test]
    fn test_source_schema_serialization_roundtrip() {
        let schema = SourceSchema {
            nodes: vec![NodeSchema {
                label: "Sensor".to_string(),
                properties: vec![PropertySchema {
                    name: "temperature".to_string(),
                    data_type: Some(PropertyType::Float),
                    description: Some("Celsius".to_string()),
                }],
            }],
            relations: vec![RelationSchema {
                label: "MEASURES".to_string(),
                from: Some("Sensor".to_string()),
                to: Some("Location".to_string()),
                properties: Vec::new(),
            }],
        };

        let json = serde_json::to_string(&schema).unwrap();
        let deserialized: SourceSchema = serde_json::from_str(&json).unwrap();
        assert_eq!(schema, deserialized);
    }

    #[test]
    fn test_source_schema_deserializes_missing_fields_as_empty() {
        let schema: SourceSchema = serde_json::from_str("{}").unwrap();
        assert!(schema.is_empty());
    }

    #[test]
    fn test_property_schema_wire_format() {
        let property = PropertySchema {
            name: "age".to_string(),
            data_type: Some(PropertyType::Integer),
            description: None,
        };
        let value = serde_json::to_value(&property).unwrap();
        // camelCase field names, lowercase enum values, `None` fields omitted.
        assert_eq!(
            value,
            serde_json::json!({ "name": "age", "dataType": "integer" })
        );
    }

    #[test]
    fn test_graph_schema_wire_format_uses_camel_case() {
        let mut graph = GraphSchema::default();
        graph.record_source_without_schema("opaque");
        graph.mark_queried_nodes(["User"].iter().copied(), "q1");

        let value = serde_json::to_value(&graph).unwrap();
        assert_eq!(value["sourcesWithoutSchema"], serde_json::json!(["opaque"]));
        assert_eq!(value["nodes"]["User"]["queriedBy"], serde_json::json!(["q1"]));
    }
}