oxigdal_security/lineage/
query.rs1use crate::error::{Result, SecurityError};
4use crate::lineage::{LineageNode, NodeType, graph::LineageGraph};
5use std::collections::HashSet;
6use std::sync::Arc;
7
8pub struct LineageQuery {
10 graph: Arc<LineageGraph>,
11 filters: Vec<QueryFilter>,
12 max_depth: Option<usize>,
13}
14
15impl LineageQuery {
16 pub fn new(graph: Arc<LineageGraph>) -> Self {
18 Self {
19 graph,
20 filters: Vec::new(),
21 max_depth: None,
22 }
23 }
24
25 pub fn filter(mut self, filter: QueryFilter) -> Self {
27 self.filters.push(filter);
28 self
29 }
30
31 pub fn max_depth(mut self, depth: usize) -> Self {
33 self.max_depth = Some(depth);
34 self
35 }
36
37 pub fn ancestors(&self, node_id: &str) -> Result<Vec<LineageNode>> {
39 let mut ancestors = self.graph.get_ancestors(node_id)?;
40 ancestors.retain(|node| self.apply_filters(node));
41
42 if let Some(max_depth) = self.max_depth {
43 ancestors.truncate(max_depth);
44 }
45
46 Ok(ancestors)
47 }
48
49 pub fn descendants(&self, node_id: &str) -> Result<Vec<LineageNode>> {
51 let mut descendants = self.graph.get_descendants(node_id)?;
52 descendants.retain(|node| self.apply_filters(node));
53
54 if let Some(max_depth) = self.max_depth {
55 descendants.truncate(max_depth);
56 }
57
58 Ok(descendants)
59 }
60
61 pub fn path(&self, from_id: &str, to_id: &str) -> Result<Option<Vec<LineageNode>>> {
63 let mut visited = HashSet::new();
64 let mut path = Vec::new();
65
66 if self.find_path(from_id, to_id, &mut visited, &mut path)? {
67 Ok(Some(path))
68 } else {
69 Ok(None)
70 }
71 }
72
73 fn find_path(
74 &self,
75 current_id: &str,
76 target_id: &str,
77 visited: &mut HashSet<String>,
78 path: &mut Vec<LineageNode>,
79 ) -> Result<bool> {
80 if visited.contains(current_id) {
81 return Ok(false);
82 }
83
84 visited.insert(current_id.to_string());
85
86 let current = self
87 .graph
88 .get_node(current_id)
89 .ok_or_else(|| SecurityError::lineage_query("Node not found"))?;
90
91 path.push(current.clone());
92
93 if current_id == target_id {
94 return Ok(true);
95 }
96
97 let downstream = self.graph.get_downstream(current_id)?;
98 for node in downstream {
99 if self.find_path(&node.id, target_id, visited, path)? {
100 return Ok(true);
101 }
102 }
103
104 path.pop();
105 Ok(false)
106 }
107
108 pub fn find_nodes(&self) -> Result<Vec<LineageNode>> {
110 let nodes = Vec::new();
112
113 Ok(nodes)
118 }
119
120 fn apply_filters(&self, node: &LineageNode) -> bool {
121 for filter in &self.filters {
122 if !filter.matches(node) {
123 return false;
124 }
125 }
126 true
127 }
128
129 pub fn common_ancestors(&self, node_ids: &[String]) -> Result<Vec<LineageNode>> {
131 if node_ids.is_empty() {
132 return Ok(Vec::new());
133 }
134
135 let mut common: Option<HashSet<String>> = None;
136
137 for node_id in node_ids {
138 let ancestors = self.graph.get_ancestors(node_id)?;
139 let ancestor_ids: HashSet<String> = ancestors.iter().map(|n| n.id.clone()).collect();
140
141 common = Some(match common {
142 None => ancestor_ids,
143 Some(existing) => existing.intersection(&ancestor_ids).cloned().collect(),
144 });
145 }
146
147 let common_ids = common.unwrap_or_default();
148 let mut result = Vec::new();
149
150 for id in common_ids {
151 if let Some(node) = self.graph.get_node(&id) {
152 if self.apply_filters(&node) {
153 result.push(node);
154 }
155 }
156 }
157
158 Ok(result)
159 }
160}
161
162#[derive(Debug, Clone)]
164pub enum QueryFilter {
165 NodeType(NodeType),
167 EntityIdPattern(String),
169 Metadata(String, String),
171 TimeRange(chrono::DateTime<chrono::Utc>, chrono::DateTime<chrono::Utc>),
173}
174
175impl QueryFilter {
176 pub fn matches(&self, node: &LineageNode) -> bool {
178 match self {
179 QueryFilter::NodeType(node_type) => &node.node_type == node_type,
180 QueryFilter::EntityIdPattern(pattern) => {
181 if pattern.contains('*') {
183 let parts: Vec<&str> = pattern.split('*').collect();
184 if parts.len() == 2 {
185 node.entity_id.starts_with(parts[0]) && node.entity_id.ends_with(parts[1])
186 } else {
187 false
188 }
189 } else {
190 node.entity_id == *pattern
191 }
192 }
193 QueryFilter::Metadata(key, value) => node.metadata.get(key) == Some(value),
194 QueryFilter::TimeRange(start, end) => {
195 node.created_at >= *start && node.created_at <= *end
196 }
197 }
198 }
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::lineage::{EdgeType, LineageEdge, LineageNode};

    /// Builds a `Dataset` node with the given entity id.
    fn dataset(entity_id: &str) -> LineageNode {
        LineageNode::new(NodeType::Dataset, entity_id.to_string())
    }

    #[test]
    fn test_query_filter_node_type() {
        let node = dataset("dataset-1");

        assert!(QueryFilter::NodeType(NodeType::Dataset).matches(&node));
        assert!(!QueryFilter::NodeType(NodeType::Operation).matches(&node));
    }

    #[test]
    fn test_query_filter_entity_pattern() {
        let node = dataset("dataset-123");

        assert!(QueryFilter::EntityIdPattern("dataset-*".to_string()).matches(&node));
        assert!(!QueryFilter::EntityIdPattern("other-*".to_string()).matches(&node));
    }

    #[test]
    fn test_query_filter_metadata() {
        let node = dataset("dataset-1")
            .with_metadata("format".to_string(), "GeoTIFF".to_string());

        let geotiff = QueryFilter::Metadata("format".to_string(), "GeoTIFF".to_string());
        let png = QueryFilter::Metadata("format".to_string(), "PNG".to_string());

        assert!(geotiff.matches(&node));
        assert!(!png.matches(&node));
    }

    #[test]
    fn test_lineage_query() {
        let graph = Arc::new(LineageGraph::new());

        let source_id = graph
            .add_node(dataset("dataset-1"))
            .expect("Failed to add node");
        let derived_id = graph
            .add_node(dataset("dataset-2"))
            .expect("Failed to add node");

        graph
            .add_edge(LineageEdge::new(
                source_id,
                derived_id.clone(),
                EdgeType::DerivedFrom,
            ))
            .expect("Failed to add edge");

        let ancestors = LineageQuery::new(graph)
            .filter(QueryFilter::NodeType(NodeType::Dataset))
            .ancestors(&derived_id)
            .expect("Failed to query ancestors");

        assert_eq!(ancestors.len(), 1);
    }
}