1use crate::ops::interface::AttachmentSetupChange;
14use crate::prelude::*;
28
29use indenter::indented;
30use std::any::Any;
31use std::fmt::Debug;
32use std::fmt::{Display, Write};
33use std::hash::Hash;
34
35#[cfg(feature = "persistence")]
36use super::db_metadata;
37#[cfg(feature = "persistence")]
38use crate::execution::db_tracking_setup::{TrackingTableSetupChange, TrackingTableSetupState};
39
40const INDENT: &str = " ";
41
42pub trait StateMode: Clone + Copy {
43 type State<T: Debug + Clone>: Debug + Clone;
44 type DefaultState<T: Debug + Clone + Default>: Debug + Clone + Default;
45}
46
47#[derive(Debug, Clone, Copy)]
48pub struct DesiredMode;
49impl StateMode for DesiredMode {
50 type State<T: Debug + Clone> = T;
51 type DefaultState<T: Debug + Clone + Default> = T;
52}
53
54#[derive(Debug, Clone)]
55pub struct CombinedState<T> {
56 pub current: Option<T>,
57 pub staging: Vec<StateChange<T>>,
58 pub legacy_state_key: Option<serde_json::Value>,
61}
62
63impl<T> CombinedState<T> {
64 pub fn current(desired: T) -> Self {
65 Self {
66 current: Some(desired),
67 staging: vec![],
68 legacy_state_key: None,
69 }
70 }
71
72 pub fn staging(change: StateChange<T>) -> Self {
73 Self {
74 current: None,
75 staging: vec![change],
76 legacy_state_key: None,
77 }
78 }
79
80 pub fn from_change(prev: Option<CombinedState<T>>, change: Option<Option<&T>>) -> Self
81 where
82 T: Clone,
83 {
84 Self {
85 current: match change {
86 Some(Some(state)) => Some(state.clone()),
87 Some(None) => None,
88 None => prev.and_then(|v| v.current),
89 },
90 staging: vec![],
91 legacy_state_key: None,
92 }
93 }
94
95 pub fn possible_versions(&self) -> impl Iterator<Item = &T> {
96 self.current
97 .iter()
98 .chain(self.staging.iter().flat_map(|s| s.state().into_iter()))
99 }
100
101 pub fn always_exists(&self) -> bool {
102 self.current.is_some() && self.staging.iter().all(|s| !s.is_delete())
103 }
104
105 pub fn always_exists_and(&self, predicate: impl Fn(&T) -> bool) -> bool {
106 self.always_exists() && self.possible_versions().all(predicate)
107 }
108
109 pub fn legacy_values<V: Ord + Eq, F: Fn(&T) -> &V>(
110 &self,
111 desired: Option<&T>,
112 f: F,
113 ) -> BTreeSet<&V> {
114 let desired_value = desired.map(&f);
115 self.possible_versions()
116 .map(f)
117 .filter(|v| Some(*v) != desired_value)
118 .collect()
119 }
120
121 pub fn has_state_diff<S>(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool
122 where
123 S: PartialEq,
124 {
125 if let Some(state) = state {
126 !self.always_exists_and(|s| map_fn(s) == state)
127 } else {
128 self.possible_versions().next().is_some()
129 }
130 }
131}
132
133impl<T: Debug + Clone> Default for CombinedState<T> {
134 fn default() -> Self {
135 Self {
136 current: None,
137 staging: vec![],
138 legacy_state_key: None,
139 }
140 }
141}
142
143impl<T: PartialEq + Debug + Clone> PartialEq<T> for CombinedState<T> {
144 fn eq(&self, other: &T) -> bool {
145 self.staging.is_empty() && self.current.as_ref() == Some(other)
146 }
147}
148
149#[derive(Clone, Copy)]
150pub struct ExistingMode;
151impl StateMode for ExistingMode {
152 type State<T: Debug + Clone> = CombinedState<T>;
153 type DefaultState<T: Debug + Clone + Default> = CombinedState<T>;
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub enum StateChange<State> {
158 Upsert(State),
159 Delete,
160}
161
162impl<State> StateChange<State> {
163 pub fn is_delete(&self) -> bool {
164 matches!(self, StateChange::Delete)
165 }
166
167 pub fn desired_state(&self) -> Option<&State> {
168 match self {
169 StateChange::Upsert(state) => Some(state),
170 StateChange::Delete => None,
171 }
172 }
173
174 pub fn state(&self) -> Option<&State> {
175 match self {
176 StateChange::Upsert(state) => Some(state),
177 StateChange::Delete => None,
178 }
179 }
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
183pub struct SourceSetupState {
184 pub source_id: i32,
185
186 #[serde(default, skip_serializing_if = "Option::is_none")]
187 pub keys_schema: Option<Box<[schema::ValueType]>>,
188
189 #[cfg(feature = "legacy-states-v0")]
191 #[serde(default, skip_serializing_if = "Option::is_none")]
192 pub key_schema: Option<schema::ValueType>,
193
194 #[serde(default)]
196 pub source_kind: String,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
200pub struct ResourceIdentifier {
201 pub key: serde_json::Value,
202 pub target_kind: String,
203}
204
205impl Display for ResourceIdentifier {
206 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207 write!(f, "{}:{}", self.target_kind, self.key)
208 }
209}
210
211#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
213pub struct TargetSetupStateCommon {
214 pub target_id: i32,
215
216 pub schema_version_id: usize,
222 pub max_schema_version_id: usize,
223
224 #[serde(default)]
225 pub setup_by_user: bool,
226 #[serde(default)]
227 pub key_type: Option<Box<[schema::ValueType]>>,
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
231pub struct TargetSetupState {
232 pub common: TargetSetupStateCommon,
233
234 pub state: serde_json::Value,
235
236 #[serde(
237 default,
238 with = "indexmap::map::serde_seq",
239 skip_serializing_if = "IndexMap::is_empty"
240 )]
241 pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
242}
243
244impl TargetSetupState {
245 pub fn state_unless_setup_by_user(self) -> Option<serde_json::Value> {
246 (!self.common.setup_by_user).then_some(self.state)
247 }
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
251pub struct FlowSetupMetadata {
252 pub last_source_id: i32,
253 pub last_target_id: i32,
254 pub sources: BTreeMap<String, SourceSetupState>,
255 #[serde(default)]
256 pub features: BTreeSet<String>,
257}
258
259#[derive(Debug, Clone)]
260pub struct FlowSetupState<Mode: StateMode> {
261 pub seen_flow_metadata_version: Option<u64>,
263 pub metadata: Mode::DefaultState<FlowSetupMetadata>,
264 #[cfg(feature = "persistence")]
265 pub tracking_table: Mode::State<TrackingTableSetupState>,
266 pub targets: IndexMap<ResourceIdentifier, Mode::State<TargetSetupState>>,
267}
268
269impl Default for FlowSetupState<ExistingMode> {
270 fn default() -> Self {
271 Self {
272 seen_flow_metadata_version: None,
273 metadata: Default::default(),
274 #[cfg(feature = "persistence")]
275 tracking_table: Default::default(),
276 targets: IndexMap::new(),
277 }
278 }
279}
280
281impl PartialEq for FlowSetupState<DesiredMode> {
282 fn eq(&self, other: &Self) -> bool {
283 self.metadata == other.metadata
284 && {
285 #[cfg(feature = "persistence")]
286 {
287 self.tracking_table == other.tracking_table
288 }
289 #[cfg(not(feature = "persistence"))]
290 {
291 true
292 }
293 }
294 && self.targets == other.targets
295 }
296}
297
298#[derive(Debug, Clone)]
299pub struct AllSetupStates<Mode: StateMode> {
300 pub has_metadata_table: bool,
301 pub flows: BTreeMap<String, FlowSetupState<Mode>>,
302}
303
304impl<Mode: StateMode> Default for AllSetupStates<Mode> {
305 fn default() -> Self {
306 Self {
307 has_metadata_table: false,
308 flows: BTreeMap::new(),
309 }
310 }
311}
312
313#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
314pub enum SetupChangeType {
315 NoChange,
316 Create,
317 Update,
318 Delete,
319 Invalid,
320}
321
322pub enum ChangeDescription {
323 Action(String),
324 Note(String),
325}
326
327pub trait ResourceSetupChange: Send + Sync + Any + 'static {
328 fn describe_changes(&self) -> Vec<ChangeDescription>;
329
330 fn change_type(&self) -> SetupChangeType;
331}
332
333impl ResourceSetupChange for Box<dyn ResourceSetupChange> {
334 fn describe_changes(&self) -> Vec<ChangeDescription> {
335 self.as_ref().describe_changes()
336 }
337
338 fn change_type(&self) -> SetupChangeType {
339 self.as_ref().change_type()
340 }
341}
342
343impl ResourceSetupChange for std::convert::Infallible {
344 fn describe_changes(&self) -> Vec<ChangeDescription> {
345 unreachable!()
346 }
347
348 fn change_type(&self) -> SetupChangeType {
349 unreachable!()
350 }
351}
352
353#[derive(Debug)]
354pub struct ResourceSetupInfo<K, S, C: ResourceSetupChange> {
355 pub key: K,
356 pub state: Option<S>,
357 pub has_tracked_state_change: bool,
358 pub description: String,
359
360 pub setup_change: Option<C>,
362
363 pub legacy_key: Option<K>,
364}
365
366impl<K, S, C: ResourceSetupChange> std::fmt::Display for ResourceSetupInfo<K, S, C> {
367 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368 let status_code = match self.setup_change.as_ref().map(|c| c.change_type()) {
369 Some(SetupChangeType::NoChange) => "READY",
370 Some(SetupChangeType::Create) => "TO CREATE",
371 Some(SetupChangeType::Update) => "TO UPDATE",
372 Some(SetupChangeType::Delete) => "TO DELETE",
373 Some(SetupChangeType::Invalid) => "INVALID",
374 None => "USER MANAGED",
375 };
376 let status_str = format!("[ {status_code:^9} ]");
377 let status_full = status_str;
378 let desc_colored = &self.description;
379 writeln!(f, "{status_full} {desc_colored}")?;
380 if let Some(setup_change) = &self.setup_change {
381 let changes = setup_change.describe_changes();
382 if !changes.is_empty() {
383 let mut f = indented(f).with_str(INDENT);
384 writeln!(f)?;
385 for change in changes {
386 match change {
387 ChangeDescription::Action(action) => {
388 writeln!(f, "TODO: {}", action)?;
389 }
390 ChangeDescription::Note(note) => {
391 writeln!(f, "NOTE: {}", note)?;
392 }
393 }
394 }
395 writeln!(f)?;
396 }
397 }
398 Ok(())
399 }
400}
401
402impl<K, S, C: ResourceSetupChange> ResourceSetupInfo<K, S, C> {
403 pub fn is_up_to_date(&self) -> bool {
404 self.setup_change
405 .as_ref()
406 .is_none_or(|c| c.change_type() == SetupChangeType::NoChange)
407 }
408}
409
410#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
411pub enum ObjectStatus {
412 Invalid,
413 New,
414 Existing,
415 Deleted,
416}
417
418pub trait ObjectSetupChange {
419 fn status(&self) -> Option<ObjectStatus>;
420
421 fn has_internal_changes(&self) -> bool;
423
424 fn has_external_changes(&self) -> bool;
426
427 fn is_up_to_date(&self) -> bool {
428 !self.has_internal_changes() && !self.has_external_changes()
429 }
430}
431
432#[derive(Default)]
433pub struct AttachmentsSetupChange {
434 pub has_tracked_state_change: bool,
435 pub deletes: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
436 pub upserts: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
437}
438
439impl AttachmentsSetupChange {
440 pub fn is_empty(&self) -> bool {
441 self.deletes.is_empty() && self.upserts.is_empty()
442 }
443}
444
445pub struct TargetSetupChange {
446 pub target_change: Box<dyn ResourceSetupChange>,
447 pub attachments_change: AttachmentsSetupChange,
448}
449
450impl ResourceSetupChange for TargetSetupChange {
451 fn describe_changes(&self) -> Vec<ChangeDescription> {
452 let mut result = vec![];
453 self.attachments_change
454 .deletes
455 .iter()
456 .flat_map(|a| a.describe_changes().into_iter())
457 .for_each(|change| result.push(ChangeDescription::Action(change)));
458 result.extend(self.target_change.describe_changes());
459 self.attachments_change
460 .upserts
461 .iter()
462 .flat_map(|a| a.describe_changes().into_iter())
463 .for_each(|change| result.push(ChangeDescription::Action(change)));
464 result
465 }
466
467 fn change_type(&self) -> SetupChangeType {
468 match self.target_change.change_type() {
469 SetupChangeType::NoChange => {
470 if self.attachments_change.is_empty() {
471 SetupChangeType::NoChange
472 } else {
473 SetupChangeType::Update
474 }
475 }
476 t => t,
477 }
478 }
479}
480
481pub struct FlowSetupChange {
482 pub status: Option<ObjectStatus>,
483 pub seen_flow_metadata_version: Option<u64>,
484
485 pub metadata_change: Option<StateChange<FlowSetupMetadata>>,
486
487 #[cfg(feature = "persistence")]
488 pub tracking_table:
489 Option<ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange>>,
490 pub target_resources:
491 Vec<ResourceSetupInfo<ResourceIdentifier, TargetSetupState, TargetSetupChange>>,
492
493 pub unknown_resources: Vec<ResourceIdentifier>,
494}
495
496impl ObjectSetupChange for FlowSetupChange {
497 fn status(&self) -> Option<ObjectStatus> {
498 self.status
499 }
500
501 fn has_internal_changes(&self) -> bool {
502 #[allow(unused_mut)]
503 let mut changes = self.metadata_change.is_some();
504 #[cfg(feature = "persistence")]
505 {
506 changes = changes
507 || self
508 .tracking_table
509 .as_ref()
510 .is_some_and(|t| t.has_tracked_state_change);
511 }
512 changes
513 || self
514 .target_resources
515 .iter()
516 .any(|target| target.has_tracked_state_change)
517 }
518
519 fn has_external_changes(&self) -> bool {
520 #[allow(unused_mut)]
521 let mut changes = false;
522 #[cfg(feature = "persistence")]
523 {
524 changes = changes
525 || self
526 .tracking_table
527 .as_ref()
528 .is_some_and(|t| !t.is_up_to_date());
529 }
530 changes
531 || self
532 .target_resources
533 .iter()
534 .any(|target| !target.is_up_to_date())
535 }
536}
537
538#[derive(Debug)]
539pub struct GlobalSetupChange {
540 #[cfg(feature = "persistence")]
541 pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>,
542}
543
544impl GlobalSetupChange {
545 pub fn from_setup_states(_setup_states: &AllSetupStates<ExistingMode>) -> Self {
546 Self {
547 #[cfg(feature = "persistence")]
548 metadata_table: db_metadata::MetadataTableSetup {
549 metadata_table_missing: !_setup_states.has_metadata_table,
550 }
551 .into_setup_info(),
552 }
553 }
554
555 pub fn is_up_to_date(&self) -> bool {
556 #[cfg(feature = "persistence")]
557 {
558 self.metadata_table.is_up_to_date()
559 }
560 #[cfg(not(feature = "persistence"))]
561 {
562 true
563 }
564 }
565}
566
567pub struct ObjectSetupChangeCode<'a, Status: ObjectSetupChange>(&'a Status);
568impl<Status: ObjectSetupChange> std::fmt::Display for ObjectSetupChangeCode<'_, Status> {
569 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570 let Some(status) = self.0.status() else {
571 return Ok(());
572 };
573 write!(
574 f,
575 "[ {:^9} ]",
576 match status {
577 ObjectStatus::New => "TO CREATE",
578 ObjectStatus::Existing =>
579 if self.0.is_up_to_date() {
580 "READY"
581 } else {
582 "TO UPDATE"
583 },
584 ObjectStatus::Deleted => "TO DELETE",
585 ObjectStatus::Invalid => "INVALID",
586 }
587 )
588 }
589}
590
591impl std::fmt::Display for GlobalSetupChange {
592 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593 #[cfg(feature = "persistence")]
594 {
595 writeln!(_f, "{}", self.metadata_table)
596 }
597 #[cfg(not(feature = "persistence"))]
598 {
599 Ok(())
600 }
601 }
602}
603
604pub struct FormattedFlowSetupChange<'a>(pub &'a str, pub &'a FlowSetupChange);
605
606impl std::fmt::Display for FormattedFlowSetupChange<'_> {
607 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
608 let flow_setup_change = self.1;
609 if flow_setup_change.status.is_none() {
610 return Ok(());
611 }
612
613 writeln!(
614 f,
615 "{} Flow: {}",
616 ObjectSetupChangeCode(flow_setup_change),
617 self.0
618 )?;
619
620 let mut f = indented(f).with_str(INDENT);
621 #[cfg(feature = "persistence")]
622 if let Some(tracking_table) = &flow_setup_change.tracking_table {
623 write!(f, "{tracking_table}")?;
624 }
625 for target_resource in &flow_setup_change.target_resources {
626 write!(f, "{target_resource}")?;
627 }
628 for resource in &flow_setup_change.unknown_resources {
629 writeln!(f, "[ UNKNOWN ] {resource}")?;
630 }
631
632 Ok(())
633 }
634}