1use std::collections::{HashMap, HashSet, VecDeque};
2
3use path_slash::PathExt as _;
4use petgraph::Direction;
5use petgraph::stable_graph::NodeIndex;
6use petgraph::visit::EdgeRef;
7use serde::Serialize;
8
9use super::types::*;
10
/// How strongly a downstream node is affected by a change to the analysed node.
///
/// Variants are declared from least to most severe. The derived `Ord` relies on
/// this declaration order (`Low < Medium < High < Critical`), so reordering the
/// variants changes how severities are ranked and sorted. Serialized as the
/// lower-case variant name (e.g. `"high"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ImpactSeverity {
    /// Assigned to tests by `classify_severity`; also the report default when
    /// nothing is downstream.
    Low,
    /// Assigned to non-mart models and to node types without a dedicated rule.
    Medium,
    /// Assigned to models that look like marts (persisted or under a mart path).
    High,
    /// Assigned to exposures.
    Critical,
}
20
21impl ImpactSeverity {
22 pub fn label(&self) -> &'static str {
23 match self {
24 ImpactSeverity::Low => "low",
25 ImpactSeverity::Medium => "medium",
26 ImpactSeverity::High => "high",
27 ImpactSeverity::Critical => "critical",
28 }
29 }
30}
31
/// One route from the analysed node to a downstream exposure.
#[derive(Debug, Clone, Serialize)]
pub struct ExposurePath {
    /// Label of the exposure at the end of `path`.
    pub exposure: String,
    /// Node labels along the route, starting with the analysed node and ending
    /// with the exposure itself.
    pub path: Vec<String>,
}
38
/// A node reachable downstream of the analysed node, as listed by `compute_impact`.
#[derive(Debug, Clone, Serialize)]
pub struct ImpactedNode {
    /// The node's unique id, copied from the graph.
    pub unique_id: String,
    /// Display label of the node.
    pub label: String,
    /// Node type name, as returned by `NodeType::label()`.
    pub node_type: String,
    /// Source file path rendered with `/` separators; omitted from the
    /// serialized output when the node has no file.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub file_path: Option<String>,
    /// Severity assigned by `classify_severity`.
    pub severity: ImpactSeverity,
    /// Number of edges on a shortest path from the analysed node (direct
    /// children have distance 1).
    pub distance: usize,
    /// SQL body of the node, when populated. `compute_impact` always leaves this
    /// as `None`; presumably a caller fills it in — verify against callers.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sql_content: Option<String>,
}
53
/// Summary of everything downstream of one node, produced by `compute_impact`.
#[derive(Debug, Clone, Serialize)]
pub struct ImpactReport {
    /// Label of the node the analysis started from.
    pub source_model: String,
    /// Highest severity among `impacted_nodes`, or `Low` when there are none.
    pub overall_severity: ImpactSeverity,
    /// Number of downstream model nodes (the analysed node is not counted).
    pub affected_models: usize,
    /// Number of downstream test nodes.
    pub affected_tests: usize,
    /// Number of downstream exposure nodes.
    pub affected_exposures: usize,
    /// Routes from the analysed node to each downstream exposure, capped at a
    /// fixed number per exposure and sorted by exposure label, then by path.
    pub exposure_paths: Vec<ExposurePath>,
    /// `true` when the per-exposure path limit was hit for at least one
    /// exposure, meaning `exposure_paths` may not list every route.
    pub exposure_paths_truncated: bool,
    /// Every downstream node, most severe first, then nearest first.
    pub impacted_nodes: Vec<ImpactedNode>,
}
67
68pub fn classify_severity(node: &NodeData) -> ImpactSeverity {
70 match node.node_type {
71 NodeType::Exposure => ImpactSeverity::Critical,
72 NodeType::Test => ImpactSeverity::Low,
73 NodeType::Model => {
74 let is_mart = node
76 .materialization
77 .as_deref()
78 .is_some_and(|m| m == "table" || m == "incremental")
79 || node
80 .file_path
81 .as_ref()
82 .is_some_and(|p| p.to_string_lossy().contains("mart"));
83
84 if is_mart {
85 return ImpactSeverity::High;
86 }
87
88 ImpactSeverity::Medium
89 }
90 _ => ImpactSeverity::Medium,
91 }
92}
93
94pub fn compute_impact(graph: &LineageGraph, source_idx: NodeIndex) -> ImpactReport {
96 let source_node = &graph[source_idx];
97 let source_model = source_node.label.clone();
98
99 let mut visited: HashSet<NodeIndex> = HashSet::new();
101 let mut queue: VecDeque<(NodeIndex, usize)> = VecDeque::new();
102 visited.insert(source_idx);
103 queue.push_back((source_idx, 0));
104
105 let mut impacted_nodes: Vec<ImpactedNode> = Vec::new();
106 let mut affected_models = 0usize;
107 let mut affected_tests = 0usize;
108 let mut affected_exposures = 0usize;
109 let mut exposure_indices: Vec<NodeIndex> = Vec::new();
110
111 while let Some((current, distance)) = queue.pop_front() {
112 for edge in graph.edges_directed(current, Direction::Outgoing) {
113 let neighbor = edge.target();
114 if visited.insert(neighbor) {
115 let node = &graph[neighbor];
116 let severity = classify_severity(node);
117 let next_distance = distance + 1;
118
119 match node.node_type {
120 NodeType::Model => affected_models += 1,
121 NodeType::Test => affected_tests += 1,
122 NodeType::Exposure => {
123 affected_exposures += 1;
124 exposure_indices.push(neighbor);
125 }
126 _ => {}
127 }
128
129 impacted_nodes.push(ImpactedNode {
130 unique_id: node.unique_id.clone(),
131 label: node.label.clone(),
132 node_type: node.node_type.label().to_string(),
133 file_path: node
134 .file_path
135 .as_ref()
136 .map(|p| p.to_slash_lossy().into_owned()),
137 severity,
138 distance: next_distance,
139 sql_content: None,
140 });
141
142 queue.push_back((neighbor, next_distance));
143 }
144 }
145 }
146
147 const MAX_PATHS_PER_EXPOSURE: usize = 10;
149 let exposure_set: HashSet<NodeIndex> = exposure_indices.iter().copied().collect();
150 let mut exposure_paths: Vec<ExposurePath> = Vec::new();
151 let mut path_counts: HashMap<NodeIndex, usize> = HashMap::new();
152
153 if !exposure_set.is_empty() {
154 let mut stack: Vec<(NodeIndex, Vec<NodeIndex>, HashSet<NodeIndex>)> =
155 vec![(source_idx, vec![source_idx], HashSet::from([source_idx]))];
156 while let Some((current, path, path_set)) = stack.pop() {
157 if path_counts.len() == exposure_set.len()
159 && path_counts.values().all(|&c| c >= MAX_PATHS_PER_EXPOSURE)
160 {
161 break;
162 }
163 if exposure_set.contains(¤t) {
164 let count = path_counts.entry(current).or_insert(0);
165 if *count < MAX_PATHS_PER_EXPOSURE {
166 *count += 1;
167 exposure_paths.push(ExposurePath {
168 exposure: graph[current].label.clone(),
169 path: path.iter().map(|&idx| graph[idx].label.clone()).collect(),
170 });
171 }
172 continue;
174 }
175 for edge in graph.edges_directed(current, Direction::Outgoing) {
176 let neighbor = edge.target();
177 if !path_set.contains(&neighbor) {
178 let mut new_path = path.clone();
179 new_path.push(neighbor);
180 let mut new_set = path_set.clone();
181 new_set.insert(neighbor);
182 stack.push((neighbor, new_path, new_set));
183 }
184 }
185 }
186 }
187 let exposure_paths_truncated = path_counts
188 .values()
189 .any(|&count| count >= MAX_PATHS_PER_EXPOSURE);
190 exposure_paths.sort_by(|a, b| a.exposure.cmp(&b.exposure).then(a.path.cmp(&b.path)));
191
192 impacted_nodes.sort_by(|a, b| {
194 b.severity
195 .cmp(&a.severity)
196 .then(a.distance.cmp(&b.distance))
197 });
198
199 let overall_severity = impacted_nodes
200 .iter()
201 .map(|n| n.severity)
202 .max()
203 .unwrap_or(ImpactSeverity::Low);
204
205 ImpactReport {
206 source_model,
207 overall_severity,
208 affected_models,
209 affected_tests,
210 affected_exposures,
211 exposure_paths,
212 exposure_paths_truncated,
213 impacted_nodes,
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a `NodeData` with only the fields these tests care about set.
    fn make_node(
        unique_id: &str,
        label: &str,
        node_type: NodeType,
        materialization: Option<&str>,
        file_path: Option<&str>,
    ) -> NodeData {
        NodeData {
            unique_id: unique_id.into(),
            label: label.into(),
            node_type,
            file_path: file_path.map(PathBuf::from),
            description: None,
            materialization: materialization.map(|s| s.to_string()),
            tags: vec![],
            columns: vec![],
            exposure: None,
            aliases: vec![],
        }
    }

    /// Builds `raw.orders -> stg_orders -> orders -> {orders_positive, dashboard}`
    /// and returns the graph together with the index of `stg_orders`.
    fn make_test_graph() -> (LineageGraph, NodeIndex) {
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
            None,
            None,
        ));
        let stg = g.add_node(make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        ));
        let mart = g.add_node(make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            Some("models/marts/orders.sql"),
        ));
        let test = g.add_node(make_node(
            "test.orders_positive",
            "orders_positive",
            NodeType::Test,
            None,
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, stg, EdgeData::direct(EdgeType::Source));
        g.add_edge(stg, mart, EdgeData::direct(EdgeType::Ref));
        g.add_edge(mart, test, EdgeData::direct(EdgeType::Test));
        g.add_edge(mart, exp, EdgeData::direct(EdgeType::Exposure));

        (g, stg)
    }

    #[test]
    fn test_classify_severity_exposure() {
        let node = make_node("exposure.x", "x", NodeType::Exposure, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Critical);
    }

    #[test]
    fn test_classify_severity_test() {
        let node = make_node("test.x", "x", NodeType::Test, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Low);
    }

    #[test]
    fn test_classify_severity_mart_table() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_incremental() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("incremental"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_path() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            None,
            Some("models/marts/orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_staging() {
        let node = make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::Medium);
    }

    #[test]
    fn test_classify_severity_source_seed_snapshot() {
        let source = make_node("source.raw.o", "raw.o", NodeType::Source, None, None);
        assert_eq!(classify_severity(&source), ImpactSeverity::Medium);

        let seed = make_node("seed.countries", "countries", NodeType::Seed, None, None);
        assert_eq!(classify_severity(&seed), ImpactSeverity::Medium);

        let snap = make_node("snapshot.snap", "snap", NodeType::Snapshot, None, None);
        assert_eq!(classify_severity(&snap), ImpactSeverity::Medium);
    }

    #[test]
    fn test_severity_labels() {
        assert_eq!(ImpactSeverity::Low.label(), "low");
        assert_eq!(ImpactSeverity::Medium.label(), "medium");
        assert_eq!(ImpactSeverity::High.label(), "high");
        assert_eq!(ImpactSeverity::Critical.label(), "critical");
    }

    #[test]
    fn test_impact_severity_ordering() {
        assert!(ImpactSeverity::Low < ImpactSeverity::Medium);
        assert!(ImpactSeverity::Medium < ImpactSeverity::High);
        assert!(ImpactSeverity::High < ImpactSeverity::Critical);
    }

    #[test]
    fn test_compute_impact() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        assert_eq!(report.source_model, "stg_orders");
        assert_eq!(report.affected_models, 1);
        assert_eq!(report.affected_tests, 1);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.overall_severity, ImpactSeverity::Critical);
        assert_eq!(report.impacted_nodes.len(), 3);

        assert_eq!(report.exposure_paths.len(), 1);
        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["stg_orders", "orders", "dashboard"]
        );
        assert!(!report.exposure_paths_truncated);
    }

    #[test]
    fn test_impacted_nodes_sorted_by_severity_then_distance() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        let order: Vec<(&str, ImpactSeverity, usize)> = report
            .impacted_nodes
            .iter()
            .map(|n| (n.label.as_str(), n.severity, n.distance))
            .collect();
        assert_eq!(
            order,
            vec![
                ("dashboard", ImpactSeverity::Critical, 2),
                ("orders", ImpactSeverity::High, 1),
                ("orders_positive", ImpactSeverity::Low, 2),
            ]
        );

        let orders = report
            .impacted_nodes
            .iter()
            .find(|n| n.label == "orders")
            .unwrap();
        assert_eq!(orders.file_path.as_deref(), Some("models/marts/orders.sql"));
        assert!(orders.sql_content.is_none());
    }

    #[test]
    fn test_compute_impact_leaf_node() {
        let (g, _) = make_test_graph();
        let exp = g
            .node_indices()
            .find(|&i| g[i].label == "dashboard")
            .unwrap();
        let report = compute_impact(&g, exp);

        assert_eq!(report.source_model, "dashboard");
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
        assert!(report.exposure_paths.is_empty());
        assert!(!report.exposure_paths_truncated);
        assert_eq!(report.overall_severity, ImpactSeverity::Low);
    }

    #[test]
    fn test_impact_isolated_node() {
        let mut g = LineageGraph::new();
        let n = g.add_node(make_node("model.x", "x", NodeType::Model, None, None));
        let report = compute_impact(&g, n);
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
    }

    #[test]
    fn test_exposure_paths_multiple_exposures() {
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "model.src",
            "src",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let a = g.add_node(make_node(
            "model.a",
            "a",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let b = g.add_node(make_node(
            "model.b",
            "b",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let exp1 = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));
        let exp2 = g.add_node(make_node(
            "exposure.report",
            "report",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, exp1, EdgeData::direct(EdgeType::Exposure));
        g.add_edge(b, exp2, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 2);
        assert_eq!(report.exposure_paths.len(), 2);

        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(report.exposure_paths[0].path, vec!["src", "a", "dashboard"]);
        assert_eq!(report.exposure_paths[1].exposure, "report");
        assert_eq!(report.exposure_paths[1].path, vec!["src", "b", "report"]);
    }

    #[test]
    fn test_exposure_paths_diamond_convergent() {
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "model.src",
            "src",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let a = g.add_node(make_node(
            "model.a",
            "a",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let b = g.add_node(make_node(
            "model.b",
            "b",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let c = g.add_node(make_node(
            "model.c",
            "c",
            NodeType::Model,
            Some("table"),
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(b, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(c, exp, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.exposure_paths.len(), 2);
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["src", "a", "c", "dashboard"]
        );
        assert_eq!(
            report.exposure_paths[1].path,
            vec!["src", "b", "c", "dashboard"]
        );
    }

    #[test]
    fn test_exposure_paths_truncated_over_limit() {
        // Twelve parallel models between `src` and one exposure give twelve
        // distinct paths, more than the per-exposure limit of ten.
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "model.src",
            "src",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));
        for i in 0..12 {
            let mid = g.add_node(make_node(
                &format!("model.m{i}"),
                &format!("m{i}"),
                NodeType::Model,
                Some("view"),
                None,
            ));
            g.add_edge(src, mid, EdgeData::direct(EdgeType::Ref));
            g.add_edge(mid, exp, EdgeData::direct(EdgeType::Exposure));
        }

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_models, 12);
        assert_eq!(report.affected_exposures, 1);
        assert_eq!(report.exposure_paths.len(), 10);
        assert!(report.exposure_paths_truncated);
        assert!(report
            .exposure_paths
            .iter()
            .all(|p| p.path.len() == 3 && p.path[0] == "src" && p.path[2] == "dashboard"));
    }
}
534}