Skip to main content

dlin_core/graph/
impact.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2
3use path_slash::PathExt as _;
4use petgraph::Direction;
5use petgraph::stable_graph::NodeIndex;
6use petgraph::visit::EdgeRef;
7use serde::Serialize;
8
9use super::types::*;
10
/// Severity level of impact.
///
/// Variants are declared from least to most severe, so the derived `Ord`
/// yields `Low < Medium < High < Critical`. Serialized by serde as the
/// lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum ImpactSeverity {
    /// Assigned to tests by `classify_severity`.
    Low,
    /// Default for models without mart indicators and for node types that
    /// `classify_severity` does not special-case.
    Medium,
    /// Assigned to models that look like marts (`table`/`incremental`
    /// materialization, or a file path containing "mart").
    High,
    /// Assigned to exposures.
    Critical,
}
20
21impl ImpactSeverity {
22    pub fn label(&self) -> &'static str {
23        match self {
24            ImpactSeverity::Low => "low",
25            ImpactSeverity::Medium => "medium",
26            ImpactSeverity::High => "high",
27            ImpactSeverity::Critical => "critical",
28        }
29    }
30}
31
/// Path from source model to an exposure
#[derive(Debug, Clone, Serialize)]
pub struct ExposurePath {
    /// Label of the exposure node at the end of the path.
    pub exposure: String,
    /// Node labels along the path, starting with the source model and ending
    /// with the exposure itself.
    pub path: Vec<String>,
}
38
/// A single impacted node with its severity
#[derive(Debug, Clone, Serialize)]
pub struct ImpactedNode {
    /// Unique identifier copied from the graph node.
    pub unique_id: String,
    /// Display label copied from the graph node.
    pub label: String,
    /// Textual form of the node's type, as returned by `NodeType::label()`.
    pub node_type: String,
    /// File path of the node rendered with forward slashes; omitted from the
    /// serialized output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub file_path: Option<String>,
    /// Severity assigned by `classify_severity`.
    pub severity: ImpactSeverity,
    /// Number of edges from the source model (also known as "degree" in dbt terminology)
    pub distance: usize,
    /// Omitted from the serialized output when absent. `compute_impact` always
    /// leaves this as `None`; presumably callers fill it in afterwards — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sql_content: Option<String>,
}
53
/// Full impact analysis report
#[derive(Debug, Clone, Serialize)]
pub struct ImpactReport {
    /// Label of the node the analysis started from.
    pub source_model: String,
    /// Highest severity among `impacted_nodes`; `Low` when nothing is impacted.
    pub overall_severity: ImpactSeverity,
    /// Number of downstream nodes of type `Model`.
    pub affected_models: usize,
    /// Number of downstream nodes of type `Test`.
    pub affected_tests: usize,
    /// Number of downstream nodes of type `Exposure`.
    pub affected_exposures: usize,
    /// Enumerated paths from the source to each reached exposure, sorted by
    /// exposure label and then by path.
    pub exposure_paths: Vec<ExposurePath>,
    /// True when path enumeration ran into its per-exposure cap, in which case
    /// `exposure_paths` may not list every path.
    pub exposure_paths_truncated: bool,
    /// All downstream nodes (the source itself excluded), sorted by severity
    /// descending and then by distance ascending.
    pub impacted_nodes: Vec<ImpactedNode>,
}
67
68/// Classify the severity of a single node
69pub fn classify_severity(node: &NodeData) -> ImpactSeverity {
70    match node.node_type {
71        NodeType::Exposure => ImpactSeverity::Critical,
72        NodeType::Test => ImpactSeverity::Low,
73        NodeType::Model => {
74            // Check for mart-like indicators
75            let is_mart = node
76                .materialization
77                .as_deref()
78                .is_some_and(|m| m == "table" || m == "incremental")
79                || node
80                    .file_path
81                    .as_ref()
82                    .is_some_and(|p| p.to_string_lossy().contains("mart"));
83
84            if is_mart {
85                return ImpactSeverity::High;
86            }
87
88            ImpactSeverity::Medium
89        }
90        _ => ImpactSeverity::Medium,
91    }
92}
93
94/// Compute the full impact report for a given model
95pub fn compute_impact(graph: &LineageGraph, source_idx: NodeIndex) -> ImpactReport {
96    let source_node = &graph[source_idx];
97    let source_model = source_node.label.clone();
98
99    // BFS downstream to find all impacted nodes with distances
100    let mut visited: HashSet<NodeIndex> = HashSet::new();
101    let mut queue: VecDeque<(NodeIndex, usize)> = VecDeque::new();
102    visited.insert(source_idx);
103    queue.push_back((source_idx, 0));
104
105    let mut impacted_nodes: Vec<ImpactedNode> = Vec::new();
106    let mut affected_models = 0usize;
107    let mut affected_tests = 0usize;
108    let mut affected_exposures = 0usize;
109    let mut exposure_indices: Vec<NodeIndex> = Vec::new();
110
111    while let Some((current, distance)) = queue.pop_front() {
112        for edge in graph.edges_directed(current, Direction::Outgoing) {
113            let neighbor = edge.target();
114            if visited.insert(neighbor) {
115                let node = &graph[neighbor];
116                let severity = classify_severity(node);
117                let next_distance = distance + 1;
118
119                match node.node_type {
120                    NodeType::Model => affected_models += 1,
121                    NodeType::Test => affected_tests += 1,
122                    NodeType::Exposure => {
123                        affected_exposures += 1;
124                        exposure_indices.push(neighbor);
125                    }
126                    _ => {}
127                }
128
129                impacted_nodes.push(ImpactedNode {
130                    unique_id: node.unique_id.clone(),
131                    label: node.label.clone(),
132                    node_type: node.node_type.label().to_string(),
133                    file_path: node
134                        .file_path
135                        .as_ref()
136                        .map(|p| p.to_slash_lossy().into_owned()),
137                    severity,
138                    distance: next_distance,
139                    sql_content: None,
140                });
141
142                queue.push_back((neighbor, next_distance));
143            }
144        }
145    }
146
147    // Find all simple paths to exposures via single DFS (capped per exposure)
148    const MAX_PATHS_PER_EXPOSURE: usize = 10;
149    let exposure_set: HashSet<NodeIndex> = exposure_indices.iter().copied().collect();
150    let mut exposure_paths: Vec<ExposurePath> = Vec::new();
151    let mut path_counts: HashMap<NodeIndex, usize> = HashMap::new();
152
153    if !exposure_set.is_empty() {
154        let mut stack: Vec<(NodeIndex, Vec<NodeIndex>, HashSet<NodeIndex>)> =
155            vec![(source_idx, vec![source_idx], HashSet::from([source_idx]))];
156        while let Some((current, path, path_set)) = stack.pop() {
157            // Early termination: stop when all exposures have reached the cap
158            if path_counts.len() == exposure_set.len()
159                && path_counts.values().all(|&c| c >= MAX_PATHS_PER_EXPOSURE)
160            {
161                break;
162            }
163            if exposure_set.contains(&current) {
164                let count = path_counts.entry(current).or_insert(0);
165                if *count < MAX_PATHS_PER_EXPOSURE {
166                    *count += 1;
167                    exposure_paths.push(ExposurePath {
168                        exposure: graph[current].label.clone(),
169                        path: path.iter().map(|&idx| graph[idx].label.clone()).collect(),
170                    });
171                }
172                // Exposures are leaf nodes in dbt; don't traverse beyond them
173                continue;
174            }
175            for edge in graph.edges_directed(current, Direction::Outgoing) {
176                let neighbor = edge.target();
177                if !path_set.contains(&neighbor) {
178                    let mut new_path = path.clone();
179                    new_path.push(neighbor);
180                    let mut new_set = path_set.clone();
181                    new_set.insert(neighbor);
182                    stack.push((neighbor, new_path, new_set));
183                }
184            }
185        }
186    }
187    let exposure_paths_truncated = path_counts
188        .values()
189        .any(|&count| count >= MAX_PATHS_PER_EXPOSURE);
190    exposure_paths.sort_by(|a, b| a.exposure.cmp(&b.exposure).then(a.path.cmp(&b.path)));
191
192    // Sort by severity (descending), then distance
193    impacted_nodes.sort_by(|a, b| {
194        b.severity
195            .cmp(&a.severity)
196            .then(a.distance.cmp(&b.distance))
197    });
198
199    let overall_severity = impacted_nodes
200        .iter()
201        .map(|n| n.severity)
202        .max()
203        .unwrap_or(ImpactSeverity::Low);
204
205    ImpactReport {
206        source_model,
207        overall_severity,
208        affected_models,
209        affected_tests,
210        affected_exposures,
211        exposure_paths,
212        exposure_paths_truncated,
213        impacted_nodes,
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Builds a `NodeData` with the given identity, type, materialization and
    /// file path; all remaining fields are left empty.
    fn make_node(
        unique_id: &str,
        label: &str,
        node_type: NodeType,
        materialization: Option<&str>,
        file_path: Option<&str>,
    ) -> NodeData {
        NodeData {
            unique_id: unique_id.into(),
            label: label.into(),
            node_type,
            file_path: file_path.map(PathBuf::from),
            description: None,
            materialization: materialization.map(|s| s.to_string()),
            tags: vec![],
            columns: vec![],
            exposure: None,
        }
    }

    /// Builds the chain
    /// `raw.orders -> stg_orders -> orders -> {orders_positive, dashboard}`
    /// and returns the graph together with the index of `stg_orders`.
    fn make_test_graph() -> (LineageGraph, NodeIndex) {
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "source.raw.orders",
            "raw.orders",
            NodeType::Source,
            None,
            None,
        ));
        let stg = g.add_node(make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        ));
        let mart = g.add_node(make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            Some("models/marts/orders.sql"),
        ));
        let test = g.add_node(make_node(
            "test.orders_positive",
            "orders_positive",
            NodeType::Test,
            None,
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, stg, EdgeData::direct(EdgeType::Source));
        g.add_edge(stg, mart, EdgeData::direct(EdgeType::Ref));
        g.add_edge(mart, test, EdgeData::direct(EdgeType::Test));
        g.add_edge(mart, exp, EdgeData::direct(EdgeType::Exposure));

        (g, stg)
    }

    /// Exposures are always classified as critical.
    #[test]
    fn test_classify_severity_exposure() {
        let node = make_node("exposure.x", "x", NodeType::Exposure, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Critical);
    }

    /// Tests are always classified as low.
    #[test]
    fn test_classify_severity_test() {
        let node = make_node("test.x", "x", NodeType::Test, None, None);
        assert_eq!(classify_severity(&node), ImpactSeverity::Low);
    }

    /// A `table` materialization alone marks a model as high severity.
    #[test]
    fn test_classify_severity_mart_table() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("table"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    /// An `incremental` materialization alone marks a model as high severity.
    #[test]
    fn test_classify_severity_mart_incremental() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            Some("incremental"),
            None,
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    /// A file path containing "mart" alone marks a model as high severity.
    #[test]
    fn test_classify_severity_mart_path() {
        let node = make_node(
            "model.orders",
            "orders",
            NodeType::Model,
            None,
            Some("models/marts/orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::High);
    }

    /// A view under a non-mart path stays at medium severity.
    #[test]
    fn test_classify_severity_staging() {
        let node = make_node(
            "model.stg_orders",
            "stg_orders",
            NodeType::Model,
            Some("view"),
            Some("models/staging/stg_orders.sql"),
        );
        assert_eq!(classify_severity(&node), ImpactSeverity::Medium);
    }

    /// End-to-end check of counts, overall severity and the single exposure
    /// path when starting from `stg_orders`.
    #[test]
    fn test_compute_impact() {
        let (g, stg) = make_test_graph();
        let report = compute_impact(&g, stg);

        assert_eq!(report.source_model, "stg_orders");
        assert_eq!(report.affected_models, 1); // orders
        assert_eq!(report.affected_tests, 1); // orders_positive
        assert_eq!(report.affected_exposures, 1); // dashboard
        assert_eq!(report.overall_severity, ImpactSeverity::Critical);
        assert_eq!(report.impacted_nodes.len(), 3);

        // Exposure path: stg_orders -> orders -> dashboard
        assert_eq!(report.exposure_paths.len(), 1);
        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["stg_orders", "orders", "dashboard"]
        );
    }

    /// Starting from a node with no outgoing edges yields an empty report.
    #[test]
    fn test_compute_impact_leaf_node() {
        let (g, _) = make_test_graph();
        let exp = g
            .node_indices()
            .find(|&i| g[i].label == "dashboard")
            .unwrap();
        let report = compute_impact(&g, exp);

        assert_eq!(report.source_model, "dashboard");
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
        assert!(report.exposure_paths.is_empty());
    }

    /// The derived ordering ranks severities from `Low` up to `Critical`.
    #[test]
    fn test_impact_severity_ordering() {
        assert!(ImpactSeverity::Low < ImpactSeverity::Medium);
        assert!(ImpactSeverity::Medium < ImpactSeverity::High);
        assert!(ImpactSeverity::High < ImpactSeverity::Critical);
    }

    /// A graph consisting of a single node has nothing downstream.
    #[test]
    fn test_impact_isolated_node() {
        let mut g = LineageGraph::new();
        let n = g.add_node(make_node("model.x", "x", NodeType::Model, None, None));
        let report = compute_impact(&g, n);
        assert_eq!(report.affected_models, 0);
        assert_eq!(report.affected_tests, 0);
        assert_eq!(report.affected_exposures, 0);
        assert!(report.impacted_nodes.is_empty());
    }

    /// Two independent branches, each ending in its own exposure, produce one
    /// path per exposure.
    #[test]
    fn test_exposure_paths_multiple_exposures() {
        // Fork: src -> A -> exp1, src -> B -> exp2
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "model.src",
            "src",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let a = g.add_node(make_node(
            "model.a",
            "a",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let b = g.add_node(make_node(
            "model.b",
            "b",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let exp1 = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));
        let exp2 = g.add_node(make_node(
            "exposure.report",
            "report",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, exp1, EdgeData::direct(EdgeType::Exposure));
        g.add_edge(b, exp2, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 2);
        assert_eq!(report.exposure_paths.len(), 2);

        // Sorted by exposure name
        assert_eq!(report.exposure_paths[0].exposure, "dashboard");
        assert_eq!(report.exposure_paths[0].path, vec!["src", "a", "dashboard"]);
        assert_eq!(report.exposure_paths[1].exposure, "report");
        assert_eq!(report.exposure_paths[1].path, vec!["src", "b", "report"]);
    }

    /// Two branches that converge before a single exposure produce two
    /// distinct paths to that exposure.
    #[test]
    fn test_exposure_paths_diamond_convergent() {
        // Diamond: src -> A -> C -> exp, src -> B -> C -> exp
        // Path enumeration reports every simple path, so both branches appear;
        // the result is sorted, which puts the path through A first.
        let mut g = LineageGraph::new();
        let src = g.add_node(make_node(
            "model.src",
            "src",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let a = g.add_node(make_node(
            "model.a",
            "a",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let b = g.add_node(make_node(
            "model.b",
            "b",
            NodeType::Model,
            Some("view"),
            None,
        ));
        let c = g.add_node(make_node(
            "model.c",
            "c",
            NodeType::Model,
            Some("table"),
            None,
        ));
        let exp = g.add_node(make_node(
            "exposure.dashboard",
            "dashboard",
            NodeType::Exposure,
            None,
            None,
        ));

        g.add_edge(src, a, EdgeData::direct(EdgeType::Ref));
        g.add_edge(src, b, EdgeData::direct(EdgeType::Ref));
        g.add_edge(a, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(b, c, EdgeData::direct(EdgeType::Ref));
        g.add_edge(c, exp, EdgeData::direct(EdgeType::Exposure));

        let report = compute_impact(&g, src);
        assert_eq!(report.affected_exposures, 1);
        // Both paths should be found: src->a->c->dashboard and src->b->c->dashboard
        assert_eq!(report.exposure_paths.len(), 2);
        assert_eq!(
            report.exposure_paths[0].path,
            vec!["src", "a", "c", "dashboard"]
        );
        assert_eq!(
            report.exposure_paths[1].path,
            vec!["src", "b", "c", "dashboard"]
        );
    }

    /// Node types without a dedicated rule fall back to medium severity.
    #[test]
    fn test_classify_severity_source_seed_snapshot() {
        // Covers the wildcard arm of `classify_severity`: Source, Seed, Snapshot → Medium
        let source = make_node("source.raw.o", "raw.o", NodeType::Source, None, None);
        assert_eq!(classify_severity(&source), ImpactSeverity::Medium);

        let seed = make_node("seed.countries", "countries", NodeType::Seed, None, None);
        assert_eq!(classify_severity(&seed), ImpactSeverity::Medium);

        let snap = make_node("snapshot.snap", "snap", NodeType::Snapshot, None, None);
        assert_eq!(classify_severity(&snap), ImpactSeverity::Medium);
    }
}