Skip to main content

dlin_core/graph/
impact.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use path_slash::PathExt as _;
4use petgraph::Direction;
5use petgraph::stable_graph::NodeIndex;
6use petgraph::visit::EdgeRef;
7use serde::Serialize;
8
9use super::types::*;
10
11/// Severity level of impact
12#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
13#[serde(rename_all = "lowercase")]
14pub enum ImpactSeverity {
15    Low,
16    Medium,
17    High,
18    Critical,
19}
20
21impl ImpactSeverity {
22    pub fn label(&self) -> &'static str {
23        match self {
24            ImpactSeverity::Low => "low",
25            ImpactSeverity::Medium => "medium",
26            ImpactSeverity::High => "high",
27            ImpactSeverity::Critical => "critical",
28        }
29    }
30}
31
32/// Path from source model to an exposure
33#[derive(Debug, Clone, Serialize)]
34pub struct ExposurePath {
35    pub exposure: String,
36    pub path: Vec<String>,
37}
38
39/// A single impacted node with its severity
40#[derive(Debug, Clone, Serialize)]
41pub struct ImpactedNode {
42    pub unique_id: String,
43    pub label: String,
44    pub node_type: String,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub file_path: Option<String>,
47    pub severity: ImpactSeverity,
48    /// Number of edges from the source model (also known as "degree" in dbt terminology)
49    pub distance: usize,
50    #[serde(skip_serializing_if = "Option::is_none")]
51    pub sql_content: Option<String>,
52}
53
54/// Full impact analysis report
55#[derive(Debug, Clone, Serialize)]
56pub struct ImpactReport {
57    pub source_model: String,
58    pub overall_severity: ImpactSeverity,
59    pub affected_models: usize,
60    pub affected_tests: usize,
61    pub affected_exposures: usize,
62    pub exposure_paths: Vec<ExposurePath>,
63    /// True if any exposure had more paths than the enumeration cap
64    pub exposure_paths_truncated: bool,
65    pub impacted_nodes: Vec<ImpactedNode>,
66}
67
68/// Classify the severity of a single node
69pub fn classify_severity(node: &NodeData) -> ImpactSeverity {
70    match node.node_type {
71        NodeType::Exposure => ImpactSeverity::Critical,
72        NodeType::Test => ImpactSeverity::Low,
73        NodeType::Model => {
74            // Check for mart-like indicators
75            let is_mart = node
76                .materialization
77                .as_deref()
78                .is_some_and(|m| m == "table" || m == "incremental")
79                || node
80                    .file_path
81                    .as_ref()
82                    .is_some_and(|p| p.to_string_lossy().contains("mart"));
83
84            if is_mart {
85                return ImpactSeverity::High;
86            }
87
88            ImpactSeverity::Medium
89        }
90        _ => ImpactSeverity::Medium,
91    }
92}
93
94/// Compute the full impact report for a given model
95pub fn compute_impact(graph: &LineageGraph, source_idx: NodeIndex) -> ImpactReport {
96    let source_node = &graph[source_idx];
97    let source_model = source_node.label.clone();
98
99    // BFS downstream to find all impacted nodes with distances
100    let mut visited: HashSet<NodeIndex> = HashSet::new();
101    let mut queue: VecDeque<(NodeIndex, usize)> = VecDeque::new();
102    visited.insert(source_idx);
103    queue.push_back((source_idx, 0));
104
105    let mut impacted_nodes: Vec<ImpactedNode> = Vec::new();
106    let mut affected_models = 0usize;
107    let mut affected_tests = 0usize;
108    let mut affected_exposures = 0usize;
109    let mut exposure_indices: Vec<NodeIndex> = Vec::new();
110
111    while let Some((current, distance)) = queue.pop_front() {
112        for edge in graph.edges_directed(current, Direction::Outgoing) {
113            let neighbor = edge.target();
114            if visited.insert(neighbor) {
115                let node = &graph[neighbor];
116                let severity = classify_severity(node);
117                let next_distance = distance + 1;
118
119                match node.node_type {
120                    NodeType::Model => affected_models += 1,
121                    NodeType::Test => affected_tests += 1,
122                    NodeType::Exposure => {
123                        affected_exposures += 1;
124                        exposure_indices.push(neighbor);
125                    }
126                    _ => {}
127                }
128
129                impacted_nodes.push(ImpactedNode {
130                    unique_id: node.unique_id.clone(),
131                    label: node.label.clone(),
132                    node_type: node.node_type.label().to_string(),
133                    file_path: node
134                        .file_path
135                        .as_ref()
136                        .map(|p| p.to_slash_lossy().into_owned()),
137                    severity,
138                    distance: next_distance,
139                    sql_content: None,
140                });
141
142                queue.push_back((neighbor, next_distance));
143            }
144        }
145    }
146
147    // Find all simple paths to exposures via single DFS (capped per exposure)
148    const MAX_PATHS_PER_EXPOSURE: usize = 10;
149    let exposure_set: HashSet<NodeIndex> = exposure_indices.iter().copied().collect();
150    let mut exposure_paths: Vec<ExposurePath> = Vec::new();
151    let mut path_counts: HashMap<NodeIndex, usize> = HashMap::new();
152
153    if !exposure_set.is_empty() {
154        let mut stack: Vec<(NodeIndex, Vec<NodeIndex>, HashSet<NodeIndex>)> =
155            vec![(source_idx, vec![source_idx], HashSet::from([source_idx]))];
156        while let Some((current, path, path_set)) = stack.pop() {
157            // Early termination: stop when all exposures have reached the cap
158            if path_counts.len() == exposure_set.len()
159                && path_counts.values().all(|&c| c >= MAX_PATHS_PER_EXPOSURE)
160            {
161                break;
162            }
163            if exposure_set.contains(&current) {
164                let count = path_counts.entry(current).or_insert(0);
165                if *count < MAX_PATHS_PER_EXPOSURE {
166                    *count += 1;
167                    exposure_paths.push(ExposurePath {
168                        exposure: graph[current].label.clone(),
169                        path: path.iter().map(|&idx| graph[idx].label.clone()).collect(),
170                    });
171                }
172                // Exposures are leaf nodes in dbt; don't traverse beyond them
173                continue;
174            }
175            for edge in graph.edges_directed(current, Direction::Outgoing) {
176                let neighbor = edge.target();
177                if !path_set.contains(&neighbor) {
178                    let mut new_path = path.clone();
179                    new_path.push(neighbor);
180                    let mut new_set = path_set.clone();
181                    new_set.insert(neighbor);
182                    stack.push((neighbor, new_path, new_set));
183                }
184            }
185        }
186    }
187    let exposure_paths_truncated = path_counts
188        .values()
189        .any(|&count| count >= MAX_PATHS_PER_EXPOSURE);
190    exposure_paths.sort_by(|a, b| a.exposure.cmp(&b.exposure).then(a.path.cmp(&b.path)));
191
192    // Sort by severity (descending), then distance
193    impacted_nodes.sort_by(|a, b| {
194        b.severity
195            .cmp(&a.severity)
196            .then(a.distance.cmp(&b.distance))
197    });
198
199    let overall_severity = impacted_nodes
200        .iter()
201        .map(|n| n.severity)
202        .max()
203        .unwrap_or(ImpactSeverity::Low);
204
205    ImpactReport {
206        source_model,
207        overall_severity,
208        affected_models,
209        affected_tests,
210        affected_exposures,
211        exposure_paths,
212        exposure_paths_truncated,
213        impacted_nodes,
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a `NodeData` fixture from the handful of fields these tests
    /// vary; every remaining field is set to `None` or left empty.
    fn make_node(
        unique_id: &str,
        label: &str,
        node_type: NodeType,
        materialization: Option<&str>,
        file_path: Option<&str>,
    ) -> NodeData {
        NodeData {
            unique_id: unique_id.into(),
            label: label.into(),
            node_type,
            file_path: file_path.map(PathBuf::from),
            description: None,
            materialization: materialization.map(str::to_owned),
            tags: vec![],
            columns: vec![],
            exposure: None,
            aliases: vec![],
        }
    }

    /// A model named `name` with id `model.<name>`, the given materialization
    /// and no file path.
    fn plain_model(name: &str, materialization: &str) -> NodeData {
        make_node(
            &format!("model.{name}"),
            name,
            NodeType::Model,
            Some(materialization),
            None,
        )
    }

    /// An exposure named `name` with id `exposure.<name>`.
    fn exposure_node(name: &str) -> NodeData {
        make_node(
            &format!("exposure.{name}"),
            name,
            NodeType::Exposure,
            None,
            None,
        )
    }

    /// Fixture graph:
    /// `raw.orders -> stg_orders -> orders -> {orders_positive, dashboard}`.
    /// Returns the graph together with the index of `stg_orders`.
    fn make_test_graph() -> (LineageGraph, NodeIndex) {
        let mut g = LineageGraph::new();

        let src = g.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
            None,
            None,
        ));
        let stg = g.add_node(make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        ));
        let mart = g.add_node(make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            Some("models/marts/orders.sql"),
        ));
        let test = g.add_node(make_node(
            "test.orders_positive",
            "orders_positive",
            NodeType::Test,
            None,
            None,
        ));
        let exp = g.add_node(exposure_node("dashboard"));

        g.add_edge(src, stg, EdgeData::direct(EdgeType::Source));
        g.add_edge(stg, mart, EdgeData::direct(EdgeType::Ref));
        g.add_edge(mart, test, EdgeData::direct(EdgeType::Test));
        g.add_edge(mart, exp, EdgeData::direct(EdgeType::Exposure));

        (g, stg)
    }

    #[test]
    fn test_classify_severity_exposure() {
        let node = make_node("exposure.x", "x", NodeType::Exposure, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Critical);
    }

    #[test]
    fn test_classify_severity_test() {
        let node = make_node("test.x", "x", NodeType::Test, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Low);
    }

    #[test]
    fn test_classify_severity_mart_table() {
        let node = plain_model("orders", "table");
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_incremental() {
        let node = plain_model("orders", "incremental");
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_mart_path() {
        // No materialization: the file path alone marks the model as a mart.
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            None,
            Some("models/marts/orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    #[test]
    fn test_classify_severity_staging() {
        let node = make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::Medium);
    }

    #[test]
    fn test_compute_impact() {
        let (graph, stg) = make_test_graph();
        let report = compute_impact(&graph, stg);

        assert_eq!(report.source_model, "stg_orders");
        assert_eq!(report.affected_models, 1); // orders
        assert_eq!(report.affected_tests, 1); // orders_positive
        assert_eq!(report.affected_exposures, 1); // dashboard
        assert_eq!(report.overall_severity, ImpactSeverity::Critical);
        assert_eq!(report.impacted_nodes.len(), 3);

        // Exposure path: stg_orders -> orders -> dashboard
        assert_eq!(report.exposure_paths.len(), 1);
        let only_path = &report.exposure_paths[0];
        assert_eq!(only_path.exposure, "dashboard");
        assert_eq!(only_path.path, vec!["stg_orders", "orders", "dashboard"]);
    }

    #[test]
    fn test_compute_impact_leaf_node() {
        let (graph, _) = make_test_graph();
        let exposure_idx = graph
            .node_indices()
            .find(|&i| graph[i].label == "dashboard")
            .unwrap();
        let report = compute_impact(&graph, exposure_idx);

        assert_eq!(report.source_model, "dashboard");
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
        assert!(report.exposure_paths.is_empty());
    }

    #[test]
    fn test_impact_severity_ordering() {
        assert!(ImpactSeverity::Low < ImpactSeverity::Medium);
        assert!(ImpactSeverity::Medium < ImpactSeverity::High);
        assert!(ImpactSeverity::High < ImpactSeverity::Critical);
    }

    #[test]
    fn test_impact_isolated_node() {
        let mut graph = LineageGraph::new();
        let lonely = graph.add_node(make_node("model.x", "x", NodeType::Model, None, None));
        let report = compute_impact(&graph, lonely);

        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
    }

    #[test]
    fn test_exposure_paths_multiple_exposures() {
        // Fork: src -> A -> exp1, src -> B -> exp2
        let mut g = LineageGraph::new();
        let src = g.add_node(plain_model("src", "view"));
        let a = g.add_node(plain_model("a", "view"));
        let b = g.add_node(plain_model("b", "view"));
        let exp1 = g.add_node(exposure_node("dashboard"));
        let exp2 = g.add_node(exposure_node("report"));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, exp1, EdgeData::direct(EdgeType::Exposure));
        g.add_edge(b, exp2, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 2);
        assert_eq!(report.exposure_paths.len(), 2);

        // Sorted by exposure name
        let (first, second) = (&report.exposure_paths[0], &report.exposure_paths[1]);
        assert_eq!(first.exposure, "dashboard");
        assert_eq!(first.path, vec!["src", "a", "dashboard"]);
        assert_eq!(second.exposure, "report");
        assert_eq!(second.path, vec!["src", "b", "report"]);
    }

    #[test]
    fn test_exposure_paths_diamond_convergent() {
        // Diamond: src -> A -> C -> exp, src -> B -> C -> exp
        // Every simple path is enumerated, so both branches must be reported.
        let mut g = LineageGraph::new();
        let src = g.add_node(plain_model("src", "view"));
        let a = g.add_node(plain_model("a", "view"));
        let b = g.add_node(plain_model("b", "view"));
        let c = g.add_node(plain_model("c", "table"));
        let exp = g.add_node(exposure_node("dashboard"));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(b, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(c, exp, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 1);
        // Both paths should be found: src->a->c->dashboard and src->b->c->dashboard
        assert_eq!(report.exposure_paths.len(), 2);
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["src", "a", "c", "dashboard"]
        );
        assert_eq!(
            report.exposure_paths[1].path,
            vec!["src", "b", "c", "dashboard"]
        );
    }

    #[test]
    fn test_classify_severity_source_seed_snapshot() {
        // Covers the wildcard arm of `classify_severity`:
        // Source, Seed, Snapshot → Medium
        let source = make_node("source.raw.o", "raw.o", NodeType::Source, None, None);
        assert_eq!(classify_severity(&source), ImpactSeverity::Medium);

        let seed = make_node("seed.countries", "countries", NodeType::Seed, None, None);
        assert_eq!(classify_severity(&seed), ImpactSeverity::Medium);

        let snap = make_node("snapshot.snap", "snap", NodeType::Snapshot, None, None);
        assert_eq!(classify_severity(&snap), ImpactSeverity::Medium);
    }
}