1use feldera_ir::{MirNode, MirNodeId};
2use serde::{Deserialize, Serialize};
3use std::{collections::HashMap, fmt::Display};
4use utoipa::ToSchema;
5
6#[derive(Debug, Default, Clone, Serialize, Deserialize, ToSchema, PartialEq, Eq)]
8pub struct ProgramDiff {
9 added_tables: Vec<String>,
10 removed_tables: Vec<String>,
11 modified_tables: Vec<String>,
12
13 added_views: Vec<String>,
14 removed_views: Vec<String>,
15 modified_views: Vec<String>,
16}
17
18impl Display for ProgramDiff {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 fn quoted_list(list: &[String]) -> String {
21 list.iter()
22 .map(|s| format!("'{}'", s))
23 .collect::<Vec<_>>()
24 .join(", ")
25 }
26
27 if !self.added_tables.is_empty() {
28 writeln!(f, "Added tables: {}", quoted_list(&self.added_tables))?;
29 }
30 if !self.removed_tables.is_empty() {
31 writeln!(f, "Removed tables: {}", quoted_list(&self.removed_tables))?;
32 }
33 if !self.modified_tables.is_empty() {
34 writeln!(f, "Modified tables: {}", quoted_list(&self.modified_tables))?;
35 }
36 if !self.added_views.is_empty() {
37 writeln!(f, "Added views: {}", quoted_list(&self.added_views))?;
38 }
39 if !self.removed_views.is_empty() {
40 writeln!(f, "Removed views: {}", quoted_list(&self.removed_views))?;
41 }
42 if !self.modified_views.is_empty() {
43 writeln!(f, "Modified views: {}", quoted_list(&self.modified_views))?;
44 }
45 Ok(())
46 }
47}
48
49impl ProgramDiff {
50 pub fn new() -> Self {
51 Self {
52 added_tables: Vec::new(),
53 removed_tables: Vec::new(),
54 modified_tables: Vec::new(),
55 added_views: Vec::new(),
56 removed_views: Vec::new(),
57 modified_views: Vec::new(),
58 }
59 }
60
61 pub fn with_added_tables(mut self, mut tables: Vec<String>) -> Self {
62 tables.sort();
63 self.added_tables = tables;
64 self
65 }
66
67 pub fn with_removed_tables(mut self, mut tables: Vec<String>) -> Self {
68 tables.sort();
69 self.removed_tables = tables;
70 self
71 }
72
73 pub fn with_modified_tables(mut self, mut tables: Vec<String>) -> Self {
74 tables.sort();
75 self.modified_tables = tables;
76 self
77 }
78
79 pub fn with_added_views(mut self, mut views: Vec<String>) -> Self {
80 views.sort();
81 self.added_views = views;
82 self
83 }
84
85 pub fn with_removed_views(mut self, mut views: Vec<String>) -> Self {
86 views.sort();
87 self.removed_views = views;
88 self
89 }
90
91 pub fn with_modified_views(mut self, mut views: Vec<String>) -> Self {
92 views.sort();
93 self.modified_views = views;
94 self
95 }
96
97 pub fn added_tables(&self) -> &Vec<String> {
98 &self.added_tables
99 }
100
101 pub fn removed_tables(&self) -> &Vec<String> {
102 &self.removed_tables
103 }
104
105 pub fn modified_tables(&self) -> &Vec<String> {
106 &self.modified_tables
107 }
108
109 pub fn added_views(&self) -> &Vec<String> {
110 &self.added_views
111 }
112
113 pub fn removed_views(&self) -> &Vec<String> {
114 &self.removed_views
115 }
116
117 pub fn modified_views(&self) -> &Vec<String> {
118 &self.modified_views
119 }
120
121 pub fn is_empty(&self) -> bool {
122 self.added_tables.is_empty()
123 && self.removed_tables.is_empty()
124 && self.modified_tables.is_empty()
125 && self.added_views.is_empty()
126 && self.removed_views.is_empty()
127 && self.modified_views.is_empty()
128 }
129
130 pub fn is_affected_relation(&self, relation_name: &str) -> bool {
131 let relation_name = relation_name.to_string();
132 self.added_tables.contains(&relation_name)
133 || self.removed_tables.contains(&relation_name)
134 || self.modified_tables.contains(&relation_name)
135 || self.added_views.contains(&relation_name)
136 || self.removed_views.contains(&relation_name)
137 || self.modified_views.contains(&relation_name)
138 }
139}
140
141#[derive(Debug, Serialize, Deserialize, ToSchema, Clone, PartialEq, Eq)]
143pub struct PipelineDiff {
144 program_diff: Option<ProgramDiff>,
146 program_diff_error: Option<String>,
147
148 added_input_connectors: Vec<String>,
149 modified_input_connectors: Vec<String>,
150 removed_input_connectors: Vec<String>,
151
152 added_output_connectors: Vec<String>,
153 modified_output_connectors: Vec<String>,
154 removed_output_connectors: Vec<String>,
155}
156
157impl Display for PipelineDiff {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 if let Some(err) = &self.program_diff_error {
160 writeln!(f, "Could not compute program diff: {err}")?;
161 }
162
163 if let Some(diff) = &self.program_diff
164 && !diff.is_empty()
165 {
166 writeln!(f, "Program changes:")?;
167 for change in diff.to_string().lines() {
168 writeln!(f, " {change}")?;
169 }
170 }
171
172 if !self.added_input_connectors.is_empty() {
173 writeln!(
174 f,
175 "Added input connectors: {}",
176 self.added_input_connectors.join(", ")
177 )?;
178 }
179
180 if !self.removed_input_connectors.is_empty() {
181 writeln!(
182 f,
183 "Removed input connectors: {}",
184 self.removed_input_connectors.join(", ")
185 )?;
186 }
187
188 if !self.modified_input_connectors.is_empty() {
189 writeln!(
190 f,
191 "Modified input connectors: {}",
192 self.modified_input_connectors.join(", ")
193 )?;
194 }
195
196 if !self.added_output_connectors.is_empty() {
197 writeln!(
198 f,
199 "Added output connectors: {}",
200 self.added_output_connectors.join(", ")
201 )?;
202 }
203
204 if !self.removed_output_connectors.is_empty() {
205 writeln!(
206 f,
207 "Removed output connectors: {}",
208 self.removed_output_connectors.join(", ")
209 )?;
210 }
211
212 if !self.modified_output_connectors.is_empty() {
213 writeln!(
214 f,
215 "Modified output connectors: {}",
216 self.modified_output_connectors.join(", ")
217 )?;
218 }
219
220 Ok(())
221 }
222}
223
224impl PipelineDiff {
225 pub fn new(program_diff_or_err: Result<ProgramDiff, String>) -> Self {
226 match program_diff_or_err {
227 Ok(program_diff) => Self::new_with_program_diff(program_diff),
228 Err(program_diff_error) => Self::new_with_program_diff_error(program_diff_error),
229 }
230 }
231
232 pub fn new_with_program_diff(program_diff: ProgramDiff) -> Self {
233 Self {
234 program_diff: Some(program_diff),
235 program_diff_error: None,
236 added_input_connectors: Vec::new(),
237 modified_input_connectors: Vec::new(),
238 removed_input_connectors: Vec::new(),
239 added_output_connectors: Vec::new(),
240 modified_output_connectors: Vec::new(),
241 removed_output_connectors: Vec::new(),
242 }
243 }
244
245 pub fn new_with_program_diff_error(program_diff_error: String) -> Self {
246 Self {
247 program_diff: None,
248 program_diff_error: Some(program_diff_error),
249 added_input_connectors: Vec::new(),
250 modified_input_connectors: Vec::new(),
251 removed_input_connectors: Vec::new(),
252 added_output_connectors: Vec::new(),
253 modified_output_connectors: Vec::new(),
254 removed_output_connectors: Vec::new(),
255 }
256 }
257
258 pub fn with_added_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
259 connectors.sort();
260 self.added_input_connectors = connectors;
261 self
262 }
263
264 pub fn with_modified_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
265 connectors.sort();
266 self.modified_input_connectors = connectors;
267 self
268 }
269
270 pub fn with_removed_input_connectors(mut self, mut connectors: Vec<String>) -> Self {
271 connectors.sort();
272 self.removed_input_connectors = connectors;
273 self
274 }
275
276 pub fn with_added_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
277 connectors.sort();
278 self.added_output_connectors = connectors;
279 self
280 }
281
282 pub fn with_modified_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
283 connectors.sort();
284 self.modified_output_connectors = connectors;
285 self
286 }
287
288 pub fn with_removed_output_connectors(mut self, mut connectors: Vec<String>) -> Self {
289 connectors.sort();
290 self.removed_output_connectors = connectors;
291 self
292 }
293
294 pub fn is_empty(&self) -> bool {
295 self.program_diff
296 .as_ref()
297 .map(|diff| diff.is_empty())
298 .unwrap_or(false)
299 && self.added_input_connectors.is_empty()
300 && self.removed_input_connectors.is_empty()
301 && self.modified_input_connectors.is_empty()
302 && self.added_output_connectors.is_empty()
303 && self.removed_output_connectors.is_empty()
304 && self.modified_output_connectors.is_empty()
305 }
306
307 pub fn clear_program_diff(&mut self) {
308 self.program_diff = Some(ProgramDiff::default());
309 }
310
311 pub fn is_affected_connector(&self, connector_name: &str) -> bool {
312 let connector_name = connector_name.to_string();
313 self.added_input_connectors.contains(&connector_name)
314 || self.removed_input_connectors.contains(&connector_name)
315 || self.modified_input_connectors.contains(&connector_name)
316 || self.added_output_connectors.contains(&connector_name)
317 || self.removed_output_connectors.contains(&connector_name)
318 || self.modified_output_connectors.contains(&connector_name)
319 }
320
321 pub fn program_diff(&self) -> Option<&ProgramDiff> {
322 self.program_diff.as_ref()
323 }
324
325 pub fn program_diff_error(&self) -> Option<&String> {
326 self.program_diff_error.as_ref()
327 }
328
329 pub fn added_input_connectors(&self) -> &Vec<String> {
330 &self.added_input_connectors
331 }
332
333 pub fn modified_input_connectors(&self) -> &Vec<String> {
334 &self.modified_input_connectors
335 }
336
337 pub fn removed_input_connectors(&self) -> &Vec<String> {
338 &self.removed_input_connectors
339 }
340
341 pub fn added_output_connectors(&self) -> &Vec<String> {
342 &self.added_output_connectors
343 }
344
345 pub fn modified_output_connectors(&self) -> &Vec<String> {
346 &self.modified_output_connectors
347 }
348
349 pub fn removed_output_connectors(&self) -> &Vec<String> {
350 &self.removed_output_connectors
351 }
352}
353
354pub fn program_diff(
355 old_mir: &HashMap<MirNodeId, MirNode>,
356 new_mir: &HashMap<MirNodeId, MirNode>,
357) -> ProgramDiff {
358 let old_tables: HashMap<String, String> = old_mir
359 .values()
360 .filter(|node| node.persistent_id.is_some())
361 .filter_map(|node| {
362 node.table
363 .as_ref()
364 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
365 })
366 .collect();
367
368 let old_views: HashMap<String, String> = old_mir
369 .values()
370 .filter(|node| node.persistent_id.is_some())
371 .filter_map(|node| {
372 node.view
373 .as_ref()
374 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
375 })
376 .collect();
377
378 let new_tables: HashMap<String, String> = new_mir
379 .values()
380 .filter(|node| node.persistent_id.is_some())
381 .filter_map(|node| {
382 node.table
383 .as_ref()
384 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
385 })
386 .collect();
387
388 let new_views: HashMap<String, String> = new_mir
389 .values()
390 .filter(|node| node.persistent_id.is_some())
391 .filter_map(|node| {
392 node.view
393 .as_ref()
394 .map(|name| (name.clone(), node.persistent_id.clone().unwrap()))
395 })
396 .collect();
397
398 let added_tables = new_tables
399 .keys()
400 .filter(|k| !old_tables.contains_key(*k))
401 .cloned()
402 .collect();
403 let removed_tables = old_tables
404 .keys()
405 .filter(|k| !new_tables.contains_key(*k))
406 .cloned()
407 .collect();
408 let modified_tables = new_tables
409 .iter()
410 .filter_map(|(name, id)| {
411 if let Some(old_id) = old_tables.get(name) {
412 if old_id != id {
413 Some(name.clone())
414 } else {
415 None
416 }
417 } else {
418 None
419 }
420 })
421 .collect();
422
423 let added_views = new_views
424 .keys()
425 .filter(|k| !old_views.contains_key(*k))
426 .cloned()
427 .collect();
428 let removed_views = old_views
429 .keys()
430 .filter(|k| !new_views.contains_key(*k))
431 .cloned()
432 .collect();
433 let modified_views = new_views
434 .iter()
435 .filter_map(|(name, id)| {
436 if let Some(old_id) = old_views.get(name) {
437 if old_id != id {
438 Some(name.clone())
439 } else {
440 None
441 }
442 } else {
443 None
444 }
445 })
446 .collect();
447
448 ProgramDiff::new()
449 .with_added_tables(added_tables)
450 .with_removed_tables(removed_tables)
451 .with_modified_tables(modified_tables)
452 .with_added_views(added_views)
453 .with_removed_views(removed_views)
454 .with_modified_views(modified_views)
455}