1use feldera_ir::{MirNode, MirNodeId};
2use serde::{Deserialize, Serialize};
3use std::{collections::HashMap, fmt::Display};
4use utoipa::ToSchema;
5
6#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
8pub struct ProgramDiff {
9 added_tables: Vec<String>,
10 removed_tables: Vec<String>,
11 modified_tables: Vec<String>,
12
13 added_views: Vec<String>,
14 removed_views: Vec<String>,
15 modified_views: Vec<String>,
16}
17
18impl Display for ProgramDiff {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 fn quoted_list(list: &[String]) -> String {
21 list.iter()
22 .map(|s| format!("'{}'", s))
23 .collect::<Vec<_>>()
24 .join(", ")
25 }
26
27 if !self.added_tables.is_empty() {
28 writeln!(f, "Added tables: {}", quoted_list(&self.added_tables))?;
29 }
30 if !self.removed_tables.is_empty() {
31 writeln!(f, "Removed tables: {}", quoted_list(&self.removed_tables))?;
32 }
33 if !self.modified_tables.is_empty() {
34 writeln!(f, "Modified tables: {}", quoted_list(&self.modified_tables))?;
35 }
36 if !self.added_views.is_empty() {
37 writeln!(f, "Added views: {}", quoted_list(&self.added_views))?;
38 }
39 if !self.removed_views.is_empty() {
40 writeln!(f, "Removed views: {}", quoted_list(&self.removed_views))?;
41 }
42 if !self.modified_views.is_empty() {
43 writeln!(f, "Modified views: {}", quoted_list(&self.modified_views))?;
44 }
45 Ok(())
46 }
47}
48
49impl ProgramDiff {
50 pub fn new() -> Self {
51 Self {
52 added_tables: Vec::new(),
53 removed_tables: Vec::new(),
54 modified_tables: Vec::new(),
55 added_views: Vec::new(),
56 removed_views: Vec::new(),
57 modified_views: Vec::new(),
58 }
59 }
60
61 pub fn with_added_tables(mut self, mut tables: Vec<String>) -> Self {
62 tables.sort();
63 self.added_tables = tables;
64 self
65 }
66
67 pub fn with_removed_tables(mut self, mut tables: Vec<String>) -> Self {
68 tables.sort();
69 self.removed_tables = tables;
70 self
71 }
72
73 pub fn with_modified_tables(mut self, mut tables: Vec<String>) -> Self {
74 tables.sort();
75 self.modified_tables = tables;
76 self
77 }
78
79 pub fn with_added_views(mut self, mut views: Vec<String>) -> Self {
80 views.sort();
81 self.added_views = views;
82 self
83 }
84
85 pub fn with_removed_views(mut self, mut views: Vec<String>) -> Self {
86 views.sort();
87 self.removed_views = views;
88 self
89 }
90
91 pub fn with_modified_views(mut self, mut views: Vec<String>) -> Self {
92 views.sort();
93 self.modified_views = views;
94 self
95 }
96
97 pub fn added_tables(&self) -> &Vec<String> {
98 &self.added_tables
99 }
100
101 pub fn removed_tables(&self) -> &Vec<String> {
102 &self.removed_tables
103 }
104
105 pub fn modified_tables(&self) -> &Vec<String> {
106 &self.modified_tables
107 }
108
109 pub fn added_views(&self) -> &Vec<String> {
110 &self.added_views
111 }
112
113 pub fn removed_views(&self) -> &Vec<String> {
114 &self.removed_views
115 }
116
117 pub fn modified_views(&self) -> &Vec<String> {
118 &self.modified_views
119 }
120
121 pub fn is_empty(&self) -> bool {
122 self.added_tables.is_empty()
123 && self.removed_tables.is_empty()
124 && self.modified_tables.is_empty()
125 && self.added_views.is_empty()
126 && self.removed_views.is_empty()
127 && self.modified_views.is_empty()
128 }
129
130 pub fn is_affected_relation(&self, relation_name: &str) -> bool {
131 let relation_name = relation_name.to_string();
132 self.added_tables.contains(&relation_name)
133 || self.removed_tables.contains(&relation_name)
134 || self.modified_tables.contains(&relation_name)
135 || self.added_views.contains(&relation_name)
136 || self.removed_views.contains(&relation_name)
137 || self.modified_views.contains(&relation_name)
138 }
139}
140
141#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq, Eq)]
143pub struct PipelineDiff {
144 program_diff: Option<ProgramDiff>,
146 program_diff_error: Option<String>,
147
148 added_input_connectors: Vec<String>,
149 modified_input_connectors: Vec<String>,
150 removed_input_connectors: Vec<String>,
151
152 added_output_connectors: Vec<String>,
153 modified_output_connectors: Vec<String>,
154 removed_output_connectors: Vec<String>,
155}
156
157impl Display for PipelineDiff {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 if let Some(err) = &self.program_diff_error {
160 writeln!(f, "Could not compute program diff: {err}")?;
161 }
162
163 if let Some(diff) = &self.program_diff
164 && !diff.is_empty()
165 {
166 writeln!(f, "Program changes:")?;
167 for change in diff.to_string().lines() {
168 writeln!(f, " {change}")?;
169 }
170 }
171
172 if !self.added_input_connectors.is_empty() {
173 writeln!(
174 f,
175 "Added input connectors: {}",
176 self.added_input_connectors.join(", ")
177 )?;
178 }
179
180 if !self.removed_input_connectors.is_empty() {
181 writeln!(
182 f,
183 "Removed input connectors: {}",
184 self.removed_input_connectors.join(", ")
185 )?;
186 }
187
188 if !self.modified_input_connectors.is_empty() {
189 writeln!(
190 f,
191 "Modified input connectors: {}",
192 self.modified_input_connectors.join(", ")
193 )?;
194 }
195
196 if !self.added_output_connectors.is_empty() {
197 writeln!(
198 f,
199 "Added output connectors: {}",
200 self.added_output_connectors.join(", ")
201 )?;
202 }
203
204 if !self.removed_output_connectors.is_empty() {
205 writeln!(
206 f,
207 "Removed output connectors: {}",
208 self.removed_output_connectors.join(", ")
209 )?;
210 }
211
212 if !self.modified_output_connectors.is_empty() {
213 writeln!(
214 f,
215 "Modified output connectors: {}",
216 self.modified_output_connectors.join(", ")
217 )?;
218 }
219
220 Ok(())
221 }
222}
223
224impl PipelineDiff {
225 pub fn new(program_diff_or_err: Result<ProgramDiff, String>) -> Self {
226 match program_diff_or_err {
227 Ok(program_diff) => Self::new_with_program_diff(program_diff),
228 Err(program_diff_error) => Self::new_with_program_diff_error(program_diff_error),
229 }
230 }
231
232 pub fn new_with_program_diff(program_diff: ProgramDiff) -> Self {
233 Self {
234 program_diff: Some(program_diff),
235 program_diff_error: None,
236 added_input_connectors: Vec::new(),
237 modified_input_connectors: Vec::new(),
238 removed_input_connectors: Vec::new(),
239 added_output_connectors: Vec::new(),
240 modified_output_connectors: Vec::new(),
241 removed_output_connectors: Vec::new(),
242 }
243 }
244
245 pub fn new_with_program_diff_error(program_diff_error: String) -> Self {
246 Self {
247 program_diff: None,
248 program_diff_error: Some(program_diff_error),
249 added_input_connectors: Vec::new(),
250 modified_input_connectors: Vec::new(),
251 removed_input_connectors: Vec::new(),
252 added_output_connectors: Vec::new(),
253 modified_output_connectors: Vec::new(),
254 removed_output_connectors: Vec::new(),
255 }
256 }
257
258 pub fn with_added_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
259 connectors.sort();
260 self.added_input_connectors = connectors;
261 self
262 }
263
264 pub fn with_modified_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
265 connectors.sort();
266 self.modified_input_connectors = connectors;
267 self
268 }
269
270 pub fn with_removed_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
271 connectors.sort();
272 self.removed_input_connectors = connectors;
273 self
274 }
275
276 pub fn with_added_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
277 connectors.sort();
278 self.added_output_connectors = connectors;
279 self
280 }
281
282 pub fn with_modified_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
283 connectors.sort();
284 self.modified_output_connectors = connectors;
285 self
286 }
287
288 pub fn with_removed_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
289 connectors.sort();
290 self.removed_output_connectors = connectors;
291 self
292 }
293
294 pub fn is_empty(&self) -> bool {
295 self.program_diff
296 .as_ref()
297 .map(|diff| diff.is_empty())
298 .unwrap_or(false)
299 && self.added_input_connectors.is_empty()
300 && self.removed_input_connectors.is_empty()
301 && self.modified_input_connectors.is_empty()
302 && self.added_output_connectors.is_empty()
303 && self.removed_output_connectors.is_empty()
304 && self.modified_output_connectors.is_empty()
305 }
306
307 pub fn clear_program_diff(&mut self) {
308 self.program_diff = Some(ProgramDiff::default());
309 }
310
311 pub fn is_affected_connector(&self, connector_name: &str) -> bool {
312 let connector_name = connector_name.to_string();
313 self.added_input_connectors.contains(&connector_name)
314 || self.removed_input_connectors.contains(&connector_name)
315 || self.modified_input_connectors.contains(&connector_name)
316 || self.added_output_connectors.contains(&connector_name)
317 || self.removed_output_connectors.contains(&connector_name)
318 || self.modified_output_connectors.contains(&connector_name)
319 }
320
321 pub fn is_affected_relation(&self, relation_name: &str) -> bool {
322 self.program_diff
323 .as_ref()
324 .map(|diff| diff.is_affected_relation(relation_name))
325 .unwrap_or(false)
326 }
327
328 pub fn program_diff(&self) -> Option<&ProgramDiff> {
329 self.program_diff.as_ref()
330 }
331
332 pub fn program_diff_error(&self) -> Option<&String> {
333 self.program_diff_error.as_ref()
334 }
335
336 pub fn added_input_connectors(&self) -> &Vec<String> {
337 &self.added_input_connectors
338 }
339
340 pub fn modified_input_connectors(&self) -> &Vec<String> {
341 &self.modified_input_connectors
342 }
343
344 pub fn removed_input_connectors(&self) -> &Vec<String> {
345 &self.removed_input_connectors
346 }
347
348 pub fn added_output_connectors(&self) -> &Vec<String> {
349 &self.added_output_connectors
350 }
351
352 pub fn modified_output_connectors(&self) -> &Vec<String> {
353 &self.modified_output_connectors
354 }
355
356 pub fn removed_output_connectors(&self) -> &Vec<String> {
357 &self.removed_output_connectors
358 }
359}
360
361pub fn program_diff(
362 old_mir: &HashMap<MirNodeId, MirNode>,
363 new_mir: &HashMap<MirNodeId, MirNode>,
364) -> ProgramDiff {
365 let old_tables: HashMap<String, String> = old_mir
366 .values()
367 .filter(|node| node.persistent_id.is_some())
368 .filter_map(|node| {
369 node.table
370 .as_ref()
371 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
372 })
373 .collect();
374
375 let old_views: HashMap<String, String> = old_mir
376 .values()
377 .filter(|node| node.persistent_id.is_some())
378 .filter_map(|node| {
379 node.view
380 .as_ref()
381 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
382 })
383 .collect();
384
385 let new_tables: HashMap<String, String> = new_mir
386 .values()
387 .filter(|node| node.persistent_id.is_some())
388 .filter_map(|node| {
389 node.table
390 .as_ref()
391 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
392 })
393 .collect();
394
395 let new_views: HashMap<String, String> = new_mir
396 .values()
397 .filter(|node| node.persistent_id.is_some())
398 .filter_map(|node| {
399 node.view
400 .as_ref()
401 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
402 })
403 .collect();
404
405 let added_tables = new_tables
406 .keys()
407 .filter(|k| !old_tables.contains_key(*k))
408 .cloned()
409 .collect();
410 let removed_tables = old_tables
411 .keys()
412 .filter(|k| !new_tables.contains_key(*k))
413 .cloned()
414 .collect();
415 let modified_tables = new_tables
416 .iter()
417 .filter_map(|(name, id)| {
418 if let Some(old_id) = old_tables.get(name) {
419 if old_id != id {
420 Some(name.clone())
421 } else {
422 None
423 }
424 } else {
425 None
426 }
427 })
428 .collect();
429
430 let added_views = new_views
431 .keys()
432 .filter(|k| !old_views.contains_key(*k))
433 .cloned()
434 .collect();
435 let removed_views = old_views
436 .keys()
437 .filter(|k| !new_views.contains_key(*k))
438 .cloned()
439 .collect();
440 let modified_views = new_views
441 .iter()
442 .filter_map(|(name, id)| {
443 if let Some(old_id) = old_views.get(name) {
444 if old_id != id {
445 Some(name.clone())
446 } else {
447 None
448 }
449 } else {
450 None
451 }
452 })
453 .collect();
454
455 ProgramDiff::new()
456 .with_added_tables(added_tables)
457 .with_removed_tables(removed_tables)
458 .with_modified_tables(modified_tables)
459 .with_added_views(added_views)
460 .with_removed_views(removed_views)
461 .with_modified_views(modified_views)
462}