1use std::collections::{HashMap, HashSet, VecDeque};
2
3use path_slash::PathExt as _;
4use petgraph::Direction;
5use petgraph::stable_graph::NodeIndex;
6use petgraph::visit::EdgeRef;
7use serde::Serialize;
8
9use super::types::*;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
13#[serde(rename_all = "lowercase")]
14pub enum ImpactSeverity {
15 Low,
16 Medium,
17 High,
18 Critical,
19}
20
21impl ImpactSeverity {
22 pub fn label(&self) -> &'static str {
23 match self {
24 ImpactSeverity::Low => "low",
25 ImpactSeverity::Medium => "medium",
26 ImpactSeverity::High => "high",
27 ImpactSeverity::Critical => "critical",
28 }
29 }
30}
31
32#[derive(Debug, Clone, Serialize)]
34pub struct ExposurePath {
35 pub exposure: String,
36 pub path: Vec<String>,
37}
38
39#[derive(Debug, Clone, Serialize)]
41pub struct ImpactedNode {
42 pub unique_id: String,
43 pub label: String,
44 pub node_type: String,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub file_path: Option<String>,
47 pub severity: ImpactSeverity,
48 pub distance: usize,
50 #[serde(skip_serializing_if = "Option::is_none")]
51 pub sql_content: Option<String>,
52}
53
54#[derive(Debug, Clone, Serialize)]
56pub struct ImpactReport {
57 pub source_model: String,
58 pub overall_severity: ImpactSeverity,
59 pub affected_models: usize,
60 pub affected_tests: usize,
61 pub affected_exposures: usize,
62 pub exposure_paths: Vec<ExposurePath>,
63 pub exposure_paths_truncated: bool,
65 pub impacted_nodes: Vec<ImpactedNode>,
66}
67
68pub fn classify_severity(node: &NodeData) -> ImpactSeverity {
70 match node.node_type {
71 NodeType::Exposure => ImpactSeverity::Critical,
72 NodeType::Test => ImpactSeverity::Low,
73 NodeType::Model => {
74 let is_mart = node
76 .materialization
77 .as_deref()
78 .is_some_and(|m| m == "table" || m == "incremental")
79 || node
80 .file_path
81 .as_ref()
82 .is_some_and(|p| p.to_string_lossy().contains("mart"));
83
84 if is_mart {
85 return ImpactSeverity::High;
86 }
87
88 ImpactSeverity::Medium
89 }
90 _ => ImpactSeverity::Medium,
91 }
92}
93
94pub fn compute_impact(graph: &LineageGraph, source_idx: NodeIndex) -> ImpactReport {
96 let source_node = &graph[source_idx];
97 let source_model = source_node.label.clone();
98
99 let mut visited: HashSet<NodeIndex> = HashSet::new();
101 let mut queue: VecDeque<(NodeIndex, usize)> = VecDeque::new();
102 visited.insert(source_idx);
103 queue.push_back((source_idx, 0));
104
105 let mut impacted_nodes: Vec<ImpactedNode> = Vec::new();
106 let mut affected_models = 0usize;
107 let mut affected_tests = 0usize;
108 let mut affected_exposures = 0usize;
109 let mut exposure_indices: Vec<NodeIndex> = Vec::new();
110
111 while let Some((current, distance)) = queue.pop_front() {
112 for edge in graph.edges_directed(current, Direction::Outgoing) {
113 let neighbor = edge.target();
114 if visited.insert(neighbor) {
115 let node = &graph[neighbor];
116 let severity = classify_severity(node);
117 let next_distance = distance + 1;
118
119 match node.node_type {
120 NodeType::Model => affected_models += 1,
121 NodeType::Test => affected_tests += 1,
122 NodeType::Exposure => {
123 affected_exposures += 1;
124 exposure_indices.push(neighbor);
125 }
126 _ => {}
127 }
128
129 impacted_nodes.push(ImpactedNode {
130 unique_id: node.unique_id.clone(),
131 label: node.label.clone(),
132 node_type: node.node_type.label().to_string(),
133 file_path: node
134 .file_path
135 .as_ref()
136 .map(|p| p.to_slash_lossy().into_owned()),
137 severity,
138 distance: next_distance,
139 sql_content: None,
140 });
141
142 queue.push_back((neighbor, next_distance));
143 }
144 }
145 }
146
147 const MAX_PATHS_PER_EXPOSURE: usize = 10;
149 let exposure_set: HashSet<NodeIndex> = exposure_indices.iter().copied().collect();
150 let mut exposure_paths: Vec<ExposurePath> = Vec::new();
151 let mut path_counts: HashMap<NodeIndex, usize> = HashMap::new();
152
153 if !exposure_set.is_empty() {
154 let mut stack: Vec<(NodeIndex, Vec<NodeIndex>, HashSet<NodeIndex>)> =
155 vec![(source_idx, vec![source_idx], HashSet::from([source_idx]))];
156 while let Some((current, path, path_set)) = stack.pop() {
157 if path_counts.len() == exposure_set.len()
159 && path_counts.values().all(|&c| c >= MAX_PATHS_PER_EXPOSURE)
160 {
161 break;
162 }
163 if exposure_set.contains(¤t) {
164 let count = path_counts.entry(current).or_insert(0);
165 if *count < MAX_PATHS_PER_EXPOSURE {
166 *count += 1;
167 exposure_paths.push(ExposurePath {
168 exposure: graph[current].label.clone(),
169 path: path.iter().map(|&idx| graph[idx].label.clone()).collect(),
170 });
171 }
172 continue;
174 }
175 for edge in graph.edges_directed(current, Direction::Outgoing) {
176 let neighbor = edge.target();
177 if !path_set.contains(&neighbor) {
178 let mut new_path = path.clone();
179 new_path.push(neighbor);
180 let mut new_set = path_set.clone();
181 new_set.insert(neighbor);
182 stack.push((neighbor, new_path, new_set));
183 }
184 }
185 }
186 }
187 let exposure_paths_truncated = path_counts
188 .values()
189 .any(|&count| count >= MAX_PATHS_PER_EXPOSURE);
190 exposure_paths.sort_by(|a, b| a.exposure.cmp(&b.exposure).then(a.path.cmp(&b.path)));
191
192 impacted_nodes.sort_by(|a, b| {
194 b.severity
195 .cmp(&a.severity)
196 .then(a.distance.cmp(&b.distance))
197 });
198
199 let overall_severity = impacted_nodes
200 .iter()
201 .map(|n| n.severity)
202 .max()
203 .unwrap_or(ImpactSeverity::Low);
204
205 ImpactReport {
206 source_model,
207 overall_severity,
208 affected_models,
209 affected_tests,
210 affected_exposures,
211 exposure_paths,
212 exposure_paths_truncated,
213 impacted_nodes,
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a `NodeData` with the given identity, type, materialization and
    /// file path; every other field is left empty.
    fn make_node(
        unique_id: &str,
        label: &str,
        node_type: NodeType,
        materialization: Option<&str>,
        file_path: Option<&str>,
    ) -> NodeData {
        NodeData {
            unique_id: unique_id.into(),
            label: label.into(),
            node_type,
            file_path: file_path.map(PathBuf::from),
            description: None,
            materialization: materialization.map(|s| s.to_string()),
            tags: vec![],
            columns: vec![],
            exposure: None,
        }
    }

    /// A model node without a file path, with unique id `model.<label>`.
    fn model(label: &str, materialization: &str) -> NodeData {
        make_node(
            &format!("model.{label}"),
            label,
            NodeType::Model,
            Some(materialization),
            None,
        )
    }

    /// An exposure node with unique id `exposure.<label>`.
    fn exposure(label: &str) -> NodeData {
        make_node(
            &format!("exposure.{label}"),
            label,
            NodeType::Exposure,
            None,
            None,
        )
    }

    /// Builds `raw.orders -> stg_orders -> orders -> {orders_positive, dashboard}`
    /// and returns the graph together with the index of `stg_orders`.
    fn make_test_graph() -> (LineageGraph, NodeIndex) {
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
            None,
            None,
        ));
        let stg = g.add_node(make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        ));
        let mart = g.add_node(make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            Some("models/marts/orders.sql"),
        ));
        let test = g.add_node(make_node(
            "test.orders_positive",
            "orders_positive",
            NodeType::Test,
            None,
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, stg, EdgeData::direct(EdgeType::Source));
        g.add_edge(stg, mart, EdgeData::direct(EdgeType::Ref));
        g.add_edge(mart, test, EdgeData::direct(EdgeType::Test));
        g.add_edge(mart, exp, EdgeData::direct(EdgeType::Exposure));

        (g, stg)
    }

    #[test]
    fn test_classify_severity_exposure() {
        let node = make_node("exposure.x", "x", NodeType::Exposure, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Critical);
    }

    #[test]
    fn test_classify_severity_test() {
        let node = make_node("test.x", "x", NodeType::Test, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Low);
    }

    #[test]
    fn test_classify_severity_mart_table() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_incremental() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("incremental"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_path() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            None,
            Some("models/marts/orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_staging() {
        let node = make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::Medium);
    }

    #[test]
    fn test_compute_impact() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        assert_eq!(report.source_model, "stg_orders");
        assert_eq!(report.affected_models, 1);
        assert_eq!(report.affected_tests, 1);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.overall_severity, ImpactSeverity::Critical);
        assert_eq!(report.impacted_nodes.len(), 3);

        assert_eq!(report.exposure_paths.len(), 1);
        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["stg_orders", "orders", "dashboard"]
        );
    }

    #[test]
    fn test_compute_impact_orders_nodes_by_severity_then_distance() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        let labels: Vec<&str> = report
            .impacted_nodes
            .iter()
            .map(|n| n.label.as_str())
            .collect();
        assert_eq!(labels, vec!["dashboard", "orders", "orders_positive"]);

        let distances: Vec<usize> = report.impacted_nodes.iter().map(|n| n.distance).collect();
        assert_eq!(distances, vec![2, 1, 2]);
    }

    #[test]
    fn test_compute_impact_leaf_node() {
        let (g, _) = make_test_graph();
        let exp = g
            .node_indices()
            .find(|&i| g[i].label == "dashboard")
            .unwrap();
        let report = compute_impact(&g, exp);

        assert_eq!(report.source_model, "dashboard");
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
        assert!(report.exposure_paths.is_empty());
    }

    #[test]
    fn test_impact_severity_ordering() {
        assert!(ImpactSeverity::Low < ImpactSeverity::Medium);
        assert!(ImpactSeverity::Medium < ImpactSeverity::High);
        assert!(ImpactSeverity::High < ImpactSeverity::Critical);
    }

    #[test]
    fn test_impact_isolated_node() {
        let mut g = LineageGraph::new();
        let n = g.add_node(make_node("model.x", "x", NodeType::Model, None, None));
        let report = compute_impact(&g, n);
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
    }

    #[test]
    fn test_exposure_paths_multiple_exposures() {
        // src -> a -> dashboard, src -> b -> report
        let mut g = LineageGraph::new();
        let src = g.add_node(model("src", "view"));
        let a = g.add_node(model("a", "view"));
        let b = g.add_node(model("b", "view"));
        let exp1 = g.add_node(exposure("dashboard"));
        let exp2 = g.add_node(exposure("report"));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, exp1, EdgeData::direct(EdgeType::Exposure));
        g.add_edge(b, exp2, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 2);
        assert_eq!(report.exposure_paths.len(), 2);

        // Paths are sorted by exposure label.
        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(report.exposure_paths[0].path, vec!["src", "a", "dashboard"]);
        assert_eq!(report.exposure_paths[1].exposure, "report");
        assert_eq!(report.exposure_paths[1].path, vec!["src", "b", "report"]);
    }

    #[test]
    fn test_exposure_paths_diamond_convergent() {
        // src -> {a, b} -> c -> dashboard: two distinct paths to one exposure.
        let mut g = LineageGraph::new();
        let src = g.add_node(model("src", "view"));
        let a = g.add_node(model("a", "view"));
        let b = g.add_node(model("b", "view"));
        let c = g.add_node(model("c", "table"));
        let exp = g.add_node(exposure("dashboard"));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(b, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(c, exp, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.exposure_paths.len(), 2);
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["src", "a", "c", "dashboard"]
        );
        assert_eq!(
            report.exposure_paths[1].path,
            vec!["src", "b", "c", "dashboard"]
        );
    }

    #[test]
    fn test_exposure_paths_not_truncated_below_cap() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        assert_eq!(report.exposure_paths.len(), 1);
        assert!(!report.exposure_paths_truncated);
    }

    #[test]
    fn test_exposure_paths_capped_per_exposure() {
        // Eleven parallel routes src -> m<i> -> dashboard: one more than the
        // per-exposure cap of 10 that `compute_impact` applies.
        let mut g = LineageGraph::new();
        let src = g.add_node(model("src", "view"));
        let exp = g.add_node(exposure("dashboard"));
        for i in 0..11 {
            let mid = g.add_node(model(&format!("m{i}"), "view"));
            g.add_edge(src, mid, EdgeData::direct(EdgeType::Ref));
            g.add_edge(mid, exp, EdgeData::direct(EdgeType::Exposure));
        }

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_models, 11);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.exposure_paths.len(), 10);
        assert!(report
            .exposure_paths
            .iter()
            .all(|p| p.exposure == "dashboard" && p.path.len() == 3));
        assert!(report.exposure_paths_truncated);
    }

    #[test]
    fn test_classify_severity_source_seed_snapshot() {
        let source = make_node("source.raw.o", "raw.o", NodeType::Source, None, None);
        assert_eq!(classify_severity(&source), ImpactSeverity::Medium);

        let seed = make_node("seed.countries", "countries", NodeType::Seed, None, None);
        assert_eq!(classify_severity(&seed), ImpactSeverity::Medium);

        let snap = make_node("snapshot.snap", "snap", NodeType::Snapshot, None, None);
        assert_eq!(classify_severity(&snap), ImpactSeverity::Medium);
    }
}