Skip to main content

feldera_types/
pipeline_diff.rs

1use feldera_ir::{MirNode, MirNodeId};
2use serde::{Deserialize, Serialize};
3use std::{collections::HashMap, fmt::Display};
4use utoipa::ToSchema;
5
/// Summary of changes in the program between checkpointed and new versions.
///
/// Each list holds relation (table or view) names; the `with_*` builder
/// methods store every list sorted.
#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
pub struct ProgramDiff {
    /// Tables present in the new program but not in the checkpointed one.
    added_tables: Vec<String>,
    /// Tables present in the checkpointed program but not in the new one.
    removed_tables: Vec<String>,
    /// Tables present in both versions that were changed.
    modified_tables: Vec<String>,

    /// Views present in the new program but not in the checkpointed one.
    added_views: Vec<String>,
    /// Views present in the checkpointed program but not in the new one.
    removed_views: Vec<String>,
    /// Views present in both versions that were changed.
    modified_views: Vec<String>,
}
17
18impl Display for ProgramDiff {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        fn quoted_list(list: &[String]) -> String {
21            list.iter()
22                .map(|s| format!("'{}'", s))
23                .collect::<Vec<_>>()
24                .join(", ")
25        }
26
27        if !self.added_tables.is_empty() {
28            writeln!(f, "Added tables: {}", quoted_list(&self.added_tables))?;
29        }
30        if !self.removed_tables.is_empty() {
31            writeln!(f, "Removed tables: {}", quoted_list(&self.removed_tables))?;
32        }
33        if !self.modified_tables.is_empty() {
34            writeln!(f, "Modified tables: {}", quoted_list(&self.modified_tables))?;
35        }
36        if !self.added_views.is_empty() {
37            writeln!(f, "Added views: {}", quoted_list(&self.added_views))?;
38        }
39        if !self.removed_views.is_empty() {
40            writeln!(f, "Removed views: {}", quoted_list(&self.removed_views))?;
41        }
42        if !self.modified_views.is_empty() {
43            writeln!(f, "Modified views: {}", quoted_list(&self.modified_views))?;
44        }
45        Ok(())
46    }
47}
48
49impl ProgramDiff {
50    pub fn new() -> Self {
51        Self {
52            added_tables: Vec::new(),
53            removed_tables: Vec::new(),
54            modified_tables: Vec::new(),
55            added_views: Vec::new(),
56            removed_views: Vec::new(),
57            modified_views: Vec::new(),
58        }
59    }
60
61    pub fn with_added_tables(mut self, mut tables: Vec<String>) -> Self {
62        tables.sort();
63        self.added_tables = tables;
64        self
65    }
66
67    pub fn with_removed_tables(mut self, mut tables: Vec<String>) -> Self {
68        tables.sort();
69        self.removed_tables = tables;
70        self
71    }
72
73    pub fn with_modified_tables(mut self, mut tables: Vec<String>) -> Self {
74        tables.sort();
75        self.modified_tables = tables;
76        self
77    }
78
79    pub fn with_added_views(mut self, mut views: Vec<String>) -> Self {
80        views.sort();
81        self.added_views = views;
82        self
83    }
84
85    pub fn with_removed_views(mut self, mut views: Vec<String>) -> Self {
86        views.sort();
87        self.removed_views = views;
88        self
89    }
90
91    pub fn with_modified_views(mut self, mut views: Vec<String>) -> Self {
92        views.sort();
93        self.modified_views = views;
94        self
95    }
96
97    pub fn added_tables(&self) -> &Vec<String> {
98        &self.added_tables
99    }
100
101    pub fn removed_tables(&self) -> &Vec<String> {
102        &self.removed_tables
103    }
104
105    pub fn modified_tables(&self) -> &Vec<String> {
106        &self.modified_tables
107    }
108
109    pub fn added_views(&self) -> &Vec<String> {
110        &self.added_views
111    }
112
113    pub fn removed_views(&self) -> &Vec<String> {
114        &self.removed_views
115    }
116
117    pub fn modified_views(&self) -> &Vec<String> {
118        &self.modified_views
119    }
120
121    pub fn is_empty(&self) -> bool {
122        self.added_tables.is_empty()
123            && self.removed_tables.is_empty()
124            && self.modified_tables.is_empty()
125            && self.added_views.is_empty()
126            && self.removed_views.is_empty()
127            && self.modified_views.is_empty()
128    }
129
130    pub fn is_affected_relation(&self, relation_name: &str) -> bool {
131        let relation_name = relation_name.to_string();
132        self.added_tables.contains(&relation_name)
133            || self.removed_tables.contains(&relation_name)
134            || self.modified_tables.contains(&relation_name)
135            || self.added_views.contains(&relation_name)
136            || self.removed_views.contains(&relation_name)
137            || self.modified_views.contains(&relation_name)
138    }
139}
140
/// Summary of changes in the pipeline between checkpointed and new versions.
///
/// Covers both the program (tables and views) and the input/output
/// connectors. The `with_*` builder methods store connector names sorted.
#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq, Eq)]
pub struct PipelineDiff {
    /// IR changes, if they could be computed; `None` when they could not
    /// (see `program_diff_error`).
    program_diff: Option<ProgramDiff>,
    /// The reason why the IR changes couldn't be computed, if any.
    program_diff_error: Option<String>,

    /// Names of input connectors added in the new version.
    added_input_connectors: Vec<String>,
    /// Names of input connectors present in both versions that were changed.
    modified_input_connectors: Vec<String>,
    /// Names of input connectors removed in the new version.
    removed_input_connectors: Vec<String>,

    /// Names of output connectors added in the new version.
    added_output_connectors: Vec<String>,
    /// Names of output connectors present in both versions that were changed.
    modified_output_connectors: Vec<String>,
    /// Names of output connectors removed in the new version.
    removed_output_connectors: Vec<String>,
}
156
157impl Display for PipelineDiff {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        if let Some(err) = &self.program_diff_error {
160            writeln!(f, "Could not compute program diff: {err}")?;
161        }
162
163        if let Some(diff) = &self.program_diff
164            && !diff.is_empty()
165        {
166            writeln!(f, "Program changes:")?;
167            for change in diff.to_string().lines() {
168                writeln!(f, "  {change}")?;
169            }
170        }
171
172        if !self.added_input_connectors.is_empty() {
173            writeln!(
174                f,
175                "Added input connectors: {}",
176                self.added_input_connectors.join(", ")
177            )?;
178        }
179
180        if !self.removed_input_connectors.is_empty() {
181            writeln!(
182                f,
183                "Removed input connectors: {}",
184                self.removed_input_connectors.join(", ")
185            )?;
186        }
187
188        if !self.modified_input_connectors.is_empty() {
189            writeln!(
190                f,
191                "Modified input connectors: {}",
192                self.modified_input_connectors.join(", ")
193            )?;
194        }
195
196        if !self.added_output_connectors.is_empty() {
197            writeln!(
198                f,
199                "Added output connectors: {}",
200                self.added_output_connectors.join(", ")
201            )?;
202        }
203
204        if !self.removed_output_connectors.is_empty() {
205            writeln!(
206                f,
207                "Removed output connectors: {}",
208                self.removed_output_connectors.join(", ")
209            )?;
210        }
211
212        if !self.modified_output_connectors.is_empty() {
213            writeln!(
214                f,
215                "Modified output connectors: {}",
216                self.modified_output_connectors.join(", ")
217            )?;
218        }
219
220        Ok(())
221    }
222}
223
224impl PipelineDiff {
225    pub fn new(program_diff_or_err: Result<ProgramDiff, String>) -> Self {
226        match program_diff_or_err {
227            Ok(program_diff) => Self::new_with_program_diff(program_diff),
228            Err(program_diff_error) => Self::new_with_program_diff_error(program_diff_error),
229        }
230    }
231
232    pub fn new_with_program_diff(program_diff: ProgramDiff) -> Self {
233        Self {
234            program_diff: Some(program_diff),
235            program_diff_error: None,
236            added_input_connectors: Vec::new(),
237            modified_input_connectors: Vec::new(),
238            removed_input_connectors: Vec::new(),
239            added_output_connectors: Vec::new(),
240            modified_output_connectors: Vec::new(),
241            removed_output_connectors: Vec::new(),
242        }
243    }
244
245    pub fn new_with_program_diff_error(program_diff_error: String) -> Self {
246        Self {
247            program_diff: None,
248            program_diff_error: Some(program_diff_error),
249            added_input_connectors: Vec::new(),
250            modified_input_connectors: Vec::new(),
251            removed_input_connectors: Vec::new(),
252            added_output_connectors: Vec::new(),
253            modified_output_connectors: Vec::new(),
254            removed_output_connectors: Vec::new(),
255        }
256    }
257
258    pub fn with_added_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
259        connectors.sort();
260        self.added_input_connectors = connectors;
261        self
262    }
263
264    pub fn with_modified_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
265        connectors.sort();
266        self.modified_input_connectors = connectors;
267        self
268    }
269
270    pub fn with_removed_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
271        connectors.sort();
272        self.removed_input_connectors = connectors;
273        self
274    }
275
276    pub fn with_added_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
277        connectors.sort();
278        self.added_output_connectors = connectors;
279        self
280    }
281
282    pub fn with_modified_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
283        connectors.sort();
284        self.modified_output_connectors = connectors;
285        self
286    }
287
288    pub fn with_removed_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
289        connectors.sort();
290        self.removed_output_connectors = connectors;
291        self
292    }
293
294    pub fn is_empty(&self) -> bool {
295        self.program_diff
296            .as_ref()
297            .map(|diff| diff.is_empty())
298            .unwrap_or(false)
299            && self.added_input_connectors.is_empty()
300            && self.removed_input_connectors.is_empty()
301            && self.modified_input_connectors.is_empty()
302            && self.added_output_connectors.is_empty()
303            && self.removed_output_connectors.is_empty()
304            && self.modified_output_connectors.is_empty()
305    }
306
307    pub fn clear_program_diff(&mut self) {
308        self.program_diff = Some(ProgramDiff::default());
309    }
310
311    pub fn is_affected_connector(&self, connector_name: &str) -> bool {
312        let connector_name = connector_name.to_string();
313        self.added_input_connectors.contains(&connector_name)
314            || self.removed_input_connectors.contains(&connector_name)
315            || self.modified_input_connectors.contains(&connector_name)
316            || self.added_output_connectors.contains(&connector_name)
317            || self.removed_output_connectors.contains(&connector_name)
318            || self.modified_output_connectors.contains(&connector_name)
319    }
320
321    pub fn program_diff(&self) -> Option<&ProgramDiff> {
322        self.program_diff.as_ref()
323    }
324
325    pub fn program_diff_error(&self) -> Option<&String> {
326        self.program_diff_error.as_ref()
327    }
328
329    pub fn added_input_connectors(&self) -> &Vec<String> {
330        &self.added_input_connectors
331    }
332
333    pub fn modified_input_connectors(&self) -> &Vec<String> {
334        &self.modified_input_connectors
335    }
336
337    pub fn removed_input_connectors(&self) -> &Vec<String> {
338        &self.removed_input_connectors
339    }
340
341    pub fn added_output_connectors(&self) -> &Vec<String> {
342        &self.added_output_connectors
343    }
344
345    pub fn modified_output_connectors(&self) -> &Vec<String> {
346        &self.modified_output_connectors
347    }
348
349    pub fn removed_output_connectors(&self) -> &Vec<String> {
350        &self.removed_output_connectors
351    }
352}
353
354pub fn program_diff(
355    old_mir: &HashMap<MirNodeId, MirNode>,
356    new_mir: &HashMap<MirNodeId, MirNode>,
357) -> ProgramDiff {
358    let old_tables: HashMap<String, String> = old_mir
359        .values()
360        .filter(|node| node.persistent_id.is_some())
361        .filter_map(|node| {
362            node.table
363                .as_ref()
364                .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
365        })
366        .collect();
367
368    let old_views: HashMap<String, String> = old_mir
369        .values()
370        .filter(|node| node.persistent_id.is_some())
371        .filter_map(|node| {
372            node.view
373                .as_ref()
374                .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
375        })
376        .collect();
377
378    let new_tables: HashMap<String, String> = new_mir
379        .values()
380        .filter(|node| node.persistent_id.is_some())
381        .filter_map(|node| {
382            node.table
383                .as_ref()
384                .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
385        })
386        .collect();
387
388    let new_views: HashMap<String, String> = new_mir
389        .values()
390        .filter(|node| node.persistent_id.is_some())
391        .filter_map(|node| {
392            node.view
393                .as_ref()
394                .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
395        })
396        .collect();
397
398    let added_tables = new_tables
399        .keys()
400        .filter(|k| !old_tables.contains_key(*k))
401        .cloned()
402        .collect();
403    let removed_tables = old_tables
404        .keys()
405        .filter(|k| !new_tables.contains_key(*k))
406        .cloned()
407        .collect();
408    let modified_tables = new_tables
409        .iter()
410        .filter_map(|(name, id)| {
411            if let Some(old_id) = old_tables.get(name) {
412                if old_id != id {
413                    Some(name.clone())
414                } else {
415                    None
416                }
417            } else {
418                None
419            }
420        })
421        .collect();
422
423    let added_views = new_views
424        .keys()
425        .filter(|k| !old_views.contains_key(*k))
426        .cloned()
427        .collect();
428    let removed_views = old_views
429        .keys()
430        .filter(|k| !new_views.contains_key(*k))
431        .cloned()
432        .collect();
433    let modified_views = new_views
434        .iter()
435        .filter_map(|(name, id)| {
436            if let Some(old_id) = old_views.get(name) {
437                if old_id != id {
438                    Some(name.clone())
439                } else {
440                    None
441                }
442            } else {
443                None
444            }
445        })
446        .collect();
447
448    ProgramDiff::new()
449        .with_added_tables(added_tables)
450        .with_removed_tables(removed_tables)
451        .with_modified_tables(modified_tables)
452        .with_added_views(added_views)
453        .with_removed_views(removed_views)
454        .with_modified_views(modified_views)
455}