Skip to main content

oxigdal_security/lineage/
query.rs

1//! Lineage query API.
2
3use crate::error::{Result, SecurityError};
4use crate::lineage::{LineageNode, NodeType, graph::LineageGraph};
5use std::collections::HashSet;
6use std::sync::Arc;
7
/// Lineage query builder.
///
/// Bundles a shared handle to the lineage graph with the filters and the
/// optional limit that the query methods apply to their results.
pub struct LineageQuery {
    /// Graph that every query method reads from.
    graph: Arc<LineageGraph>,
    /// Filters added through `filter()`; a node is kept only if it matches all of them.
    filters: Vec<QueryFilter>,
    /// Value set through `max_depth()`; `None` (the default) means no limit.
    /// `ancestors` and `descendants` truncate their result list to this many entries.
    max_depth: Option<usize>,
}
14
15impl LineageQuery {
16    /// Create a new lineage query.
17    pub fn new(graph: Arc<LineageGraph>) -> Self {
18        Self {
19            graph,
20            filters: Vec::new(),
21            max_depth: None,
22        }
23    }
24
25    /// Add a filter.
26    pub fn filter(mut self, filter: QueryFilter) -> Self {
27        self.filters.push(filter);
28        self
29    }
30
31    /// Set maximum traversal depth.
32    pub fn max_depth(mut self, depth: usize) -> Self {
33        self.max_depth = Some(depth);
34        self
35    }
36
37    /// Find all ancestors of a node.
38    pub fn ancestors(&self, node_id: &str) -> Result<Vec<LineageNode>> {
39        let mut ancestors = self.graph.get_ancestors(node_id)?;
40        ancestors.retain(|node| self.apply_filters(node));
41
42        if let Some(max_depth) = self.max_depth {
43            ancestors.truncate(max_depth);
44        }
45
46        Ok(ancestors)
47    }
48
49    /// Find all descendants of a node.
50    pub fn descendants(&self, node_id: &str) -> Result<Vec<LineageNode>> {
51        let mut descendants = self.graph.get_descendants(node_id)?;
52        descendants.retain(|node| self.apply_filters(node));
53
54        if let Some(max_depth) = self.max_depth {
55            descendants.truncate(max_depth);
56        }
57
58        Ok(descendants)
59    }
60
61    /// Find path between two nodes.
62    pub fn path(&self, from_id: &str, to_id: &str) -> Result<Option<Vec<LineageNode>>> {
63        let mut visited = HashSet::new();
64        let mut path = Vec::new();
65
66        if self.find_path(from_id, to_id, &mut visited, &mut path)? {
67            Ok(Some(path))
68        } else {
69            Ok(None)
70        }
71    }
72
73    fn find_path(
74        &self,
75        current_id: &str,
76        target_id: &str,
77        visited: &mut HashSet<String>,
78        path: &mut Vec<LineageNode>,
79    ) -> Result<bool> {
80        if visited.contains(current_id) {
81            return Ok(false);
82        }
83
84        visited.insert(current_id.to_string());
85
86        let current = self
87            .graph
88            .get_node(current_id)
89            .ok_or_else(|| SecurityError::lineage_query("Node not found"))?;
90
91        path.push(current.clone());
92
93        if current_id == target_id {
94            return Ok(true);
95        }
96
97        let downstream = self.graph.get_downstream(current_id)?;
98        for node in downstream {
99            if self.find_path(&node.id, target_id, visited, path)? {
100                return Ok(true);
101            }
102        }
103
104        path.pop();
105        Ok(false)
106    }
107
108    /// Find all nodes matching a filter.
109    pub fn find_nodes(&self) -> Result<Vec<LineageNode>> {
110        // Get all nodes by iterating through entity index
111        let nodes = Vec::new();
112
113        // This is a simplification - in a real implementation,
114        // we would need a way to iterate all nodes efficiently
115        // For now, this shows the API design
116
117        Ok(nodes)
118    }
119
120    fn apply_filters(&self, node: &LineageNode) -> bool {
121        for filter in &self.filters {
122            if !filter.matches(node) {
123                return false;
124            }
125        }
126        true
127    }
128
129    /// Find common ancestors of multiple nodes.
130    pub fn common_ancestors(&self, node_ids: &[String]) -> Result<Vec<LineageNode>> {
131        if node_ids.is_empty() {
132            return Ok(Vec::new());
133        }
134
135        let mut common: Option<HashSet<String>> = None;
136
137        for node_id in node_ids {
138            let ancestors = self.graph.get_ancestors(node_id)?;
139            let ancestor_ids: HashSet<String> = ancestors.iter().map(|n| n.id.clone()).collect();
140
141            common = Some(match common {
142                None => ancestor_ids,
143                Some(existing) => existing.intersection(&ancestor_ids).cloned().collect(),
144            });
145        }
146
147        let common_ids = common.unwrap_or_default();
148        let mut result = Vec::new();
149
150        for id in common_ids {
151            if let Some(node) = self.graph.get_node(&id) {
152                if self.apply_filters(&node) {
153                    result.push(node);
154                }
155            }
156        }
157
158        Ok(result)
159    }
160}
161
/// Query filter.
///
/// Each variant is a single predicate over a `LineageNode`, evaluated by
/// `QueryFilter::matches`.
#[derive(Debug, Clone)]
pub enum QueryFilter {
    /// Filter by node type: matches when the node's `node_type` equals the given type.
    NodeType(NodeType),
    /// Filter by entity ID pattern, compared against the node's `entity_id`.
    /// `*` acts as a wildcard; a pattern without `*` must equal the entity ID exactly.
    EntityIdPattern(String),
    /// Filter by metadata key-value: matches when the node's metadata holds
    /// exactly this value under this key.
    Metadata(String, String),
    /// Filter by creation time range `(start, end)`; both bounds are inclusive
    /// and are compared against the node's `created_at`.
    TimeRange(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>),
}
174
175impl QueryFilter {
176    /// Check if node matches filter.
177    pub fn matches(&self, node: &LineageNode) -> bool {
178        match self {
179            QueryFilter::NodeType(node_type) => &node.node_type == node_type,
180            QueryFilter::EntityIdPattern(pattern) => {
181                // Simple pattern matching
182                if pattern.contains('*') {
183                    let parts: Vec<&str> = pattern.split('*').collect();
184                    if parts.len() == 2 {
185                        node.entity_id.starts_with(parts[0]) && node.entity_id.ends_with(parts[1])
186                    } else {
187                        false
188                    }
189                } else {
190                    node.entity_id == *pattern
191                }
192            }
193            QueryFilter::Metadata(key, value) => node.metadata.get(key) == Some(value),
194            QueryFilter::TimeRange(start, end) => {
195                node.created_at >= *start && node.created_at <= *end
196            }
197        }
198    }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lineage::{EdgeType, LineageEdge, LineageNode};

    /// A `NodeType` filter accepts a node of the same type and rejects another type.
    #[test]
    fn test_query_filter_node_type() {
        let dataset = LineageNode::new(NodeType::Dataset, "dataset-1".to_string());

        let same_type = QueryFilter::NodeType(NodeType::Dataset);
        let other_type = QueryFilter::NodeType(NodeType::Operation);

        assert!(same_type.matches(&dataset));
        assert!(!other_type.matches(&dataset));
    }

    /// A trailing-wildcard pattern matches on the entity ID prefix only.
    #[test]
    fn test_query_filter_entity_pattern() {
        let dataset = LineageNode::new(NodeType::Dataset, "dataset-123".to_string());

        let matching = QueryFilter::EntityIdPattern("dataset-*".to_string());
        let non_matching = QueryFilter::EntityIdPattern("other-*".to_string());

        assert!(matching.matches(&dataset));
        assert!(!non_matching.matches(&dataset));
    }

    /// A metadata filter requires the exact value stored under the key.
    #[test]
    fn test_query_filter_metadata() {
        let dataset = LineageNode::new(NodeType::Dataset, "dataset-1".to_string())
            .with_metadata("format".to_string(), "GeoTIFF".to_string());

        let matching = QueryFilter::Metadata("format".to_string(), "GeoTIFF".to_string());
        let non_matching = QueryFilter::Metadata("format".to_string(), "PNG".to_string());

        assert!(matching.matches(&dataset));
        assert!(!non_matching.matches(&dataset));
    }

    /// With one edge between two dataset nodes, the second node has exactly one ancestor.
    #[test]
    fn test_lineage_query() {
        let graph = Arc::new(LineageGraph::new());

        let parent_id = graph
            .add_node(LineageNode::new(NodeType::Dataset, "dataset-1".to_string()))
            .expect("Failed to add node");
        let child_id = graph
            .add_node(LineageNode::new(NodeType::Dataset, "dataset-2".to_string()))
            .expect("Failed to add node");

        graph
            .add_edge(LineageEdge::new(
                parent_id,
                child_id.clone(),
                EdgeType::DerivedFrom,
            ))
            .expect("Failed to add edge");

        let query = LineageQuery::new(graph).filter(QueryFilter::NodeType(NodeType::Dataset));
        let found = query
            .ancestors(&child_id)
            .expect("Failed to query ancestors");

        assert_eq!(found.len(), 1);
    }
}