Skip to main content

recoco_core/setup/
states.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::ops::interface::AttachmentSetupChange;
/// Concepts:
/// - Resource: some setup that needs to be tracked and maintained.
/// - Setup State: current state of a resource.
/// - Staging Change: state changes that may not have been applied yet.
/// - Combined Setup State: Setup State + Staging Change.
/// - Status Check: information about changes that are being applied / need to be applied.
///
/// Resource hierarchy:
/// - [resource: setup metadata table]
/// - Flow
///   - [resource: metadata]
///   - [resource: tracking table]
///   - Target
///     - [resource: target-specific stuff]
27use crate::prelude::*;
28
29use indenter::indented;
30use std::any::Any;
31use std::fmt::Debug;
32use std::fmt::{Display, Write};
33use std::hash::Hash;
34
35#[cfg(feature = "persistence")]
36use super::db_metadata;
37#[cfg(feature = "persistence")]
38use crate::execution::db_tracking_setup::{TrackingTableSetupChange, TrackingTableSetupState};
39
/// Indentation prefix applied to nested lines when rendering setup-change reports.
const INDENT: &str = "    ";
41
/// Selects how each piece of setup state is represented.
///
/// `DesiredMode` stores plain values, while `ExistingMode` wraps every value in a
/// `CombinedState`, so that staged changes can be carried alongside the committed value.
pub trait StateMode: Clone + Copy {
    /// Representation of a state value of type `T`.
    type State<T: Debug + Clone>: Debug + Clone;

    /// Like `State`, but the representation must additionally implement `Default`.
    type DefaultState<T: Debug + Clone + Default>: Debug + Clone + Default;
}
46
47#[derive(Debug, Clone, Copy)]
48pub struct DesiredMode;
49impl StateMode for DesiredMode {
50    type State<T: Debug + Clone> = T;
51    type DefaultState<T: Debug + Clone + Default> = T;
52}
53
54#[derive(Debug, Clone)]
55pub struct CombinedState<T> {
56    pub current: Option<T>,
57    pub staging: Vec<StateChange<T>>,
58    /// Legacy state keys that no longer identical to the latest serialized form (usually caused by code change).
59    /// They will be deleted when the next change is applied.
60    pub legacy_state_key: Option<serde_json::Value>,
61}
62
63impl<T> CombinedState<T> {
64    pub fn current(desired: T) -> Self {
65        Self {
66            current: Some(desired),
67            staging: vec![],
68            legacy_state_key: None,
69        }
70    }
71
72    pub fn staging(change: StateChange<T>) -> Self {
73        Self {
74            current: None,
75            staging: vec![change],
76            legacy_state_key: None,
77        }
78    }
79
80    pub fn from_change(prev: Option<CombinedState<T>>, change: Option<Option<&T>>) -> Self
81    where
82        T: Clone,
83    {
84        Self {
85            current: match change {
86                Some(Some(state)) => Some(state.clone()),
87                Some(None) => None,
88                None => prev.and_then(|v| v.current),
89            },
90            staging: vec![],
91            legacy_state_key: None,
92        }
93    }
94
95    pub fn possible_versions(&self) -> impl Iterator<Item = &T> {
96        self.current
97            .iter()
98            .chain(self.staging.iter().flat_map(|s| s.state().into_iter()))
99    }
100
101    pub fn always_exists(&self) -> bool {
102        self.current.is_some() && self.staging.iter().all(|s| !s.is_delete())
103    }
104
105    pub fn always_exists_and(&self, predicate: impl Fn(&T) -> bool) -> bool {
106        self.always_exists() && self.possible_versions().all(predicate)
107    }
108
109    pub fn legacy_values<V: Ord + Eq, F: Fn(&T) -> &V>(
110        &self,
111        desired: Option<&T>,
112        f: F,
113    ) -> BTreeSet<&V> {
114        let desired_value = desired.map(&f);
115        self.possible_versions()
116            .map(f)
117            .filter(|v| Some(*v) != desired_value)
118            .collect()
119    }
120
121    pub fn has_state_diff<S>(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool
122    where
123        S: PartialEq,
124    {
125        if let Some(state) = state {
126            !self.always_exists_and(|s| map_fn(s) == state)
127        } else {
128            self.possible_versions().next().is_some()
129        }
130    }
131}
132
133impl<T: Debug + Clone> Default for CombinedState<T> {
134    fn default() -> Self {
135        Self {
136            current: None,
137            staging: vec![],
138            legacy_state_key: None,
139        }
140    }
141}
142
143impl<T: PartialEq + Debug + Clone> PartialEq<T> for CombinedState<T> {
144    fn eq(&self, other: &T) -> bool {
145        self.staging.is_empty() && self.current.as_ref() == Some(other)
146    }
147}
148
149#[derive(Clone, Copy)]
150pub struct ExistingMode;
151impl StateMode for ExistingMode {
152    type State<T: Debug + Clone> = CombinedState<T>;
153    type DefaultState<T: Debug + Clone + Default> = CombinedState<T>;
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub enum StateChange<State> {
158    Upsert(State),
159    Delete,
160}
161
162impl<State> StateChange<State> {
163    pub fn is_delete(&self) -> bool {
164        matches!(self, StateChange::Delete)
165    }
166
167    pub fn desired_state(&self) -> Option<&State> {
168        match self {
169            StateChange::Upsert(state) => Some(state),
170            StateChange::Delete => None,
171        }
172    }
173
174    pub fn state(&self) -> Option<&State> {
175        match self {
176            StateChange::Upsert(state) => Some(state),
177            StateChange::Delete => None,
178        }
179    }
180}
181
182#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
183pub struct SourceSetupState {
184    pub source_id: i32,
185
186    #[serde(default, skip_serializing_if = "Option::is_none")]
187    pub keys_schema: Option<Box<[schema::ValueType]>>,
188
189    /// DEPRECATED. For backward compatibility.
190    #[cfg(feature = "legacy-states-v0")]
191    #[serde(default, skip_serializing_if = "Option::is_none")]
192    pub key_schema: Option<schema::ValueType>,
193
194    // Allow empty string during deserialization for backward compatibility.
195    #[serde(default)]
196    pub source_kind: String,
197}
198
199#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
200pub struct ResourceIdentifier {
201    pub key: serde_json::Value,
202    pub target_kind: String,
203}
204
205impl Display for ResourceIdentifier {
206    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
207        write!(f, "{}:{}", self.target_kind, self.key)
208    }
209}
210
211/// Common state (i.e. not specific to a target kind) for a target.
212#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
213pub struct TargetSetupStateCommon {
214    pub target_id: i32,
215
216    /// schema_version_id indicates if a previous exported target row (as tracked by the tracking table)
217    /// is possible to be reused without re-exporting the row, on the exported values don't change.
218    ///
219    /// Note that sometimes even if exported values don't change, the target row may still need to be re-exported,
220    /// for example, a column is dropped then added back (which has data loss in between).
221    pub schema_version_id: usize,
222    pub max_schema_version_id: usize,
223
224    #[serde(default)]
225    pub setup_by_user: bool,
226    #[serde(default)]
227    pub key_type: Option<Box<[schema::ValueType]>>,
228}
229
230#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
231pub struct TargetSetupState {
232    pub common: TargetSetupStateCommon,
233
234    pub state: serde_json::Value,
235
236    #[serde(
237        default,
238        with = "indexmap::map::serde_seq",
239        skip_serializing_if = "IndexMap::is_empty"
240    )]
241    pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
242}
243
244impl TargetSetupState {
245    pub fn state_unless_setup_by_user(self) -> Option<serde_json::Value> {
246        (!self.common.setup_by_user).then_some(self.state)
247    }
248}
249
250#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
251pub struct FlowSetupMetadata {
252    pub last_source_id: i32,
253    pub last_target_id: i32,
254    pub sources: BTreeMap<String, SourceSetupState>,
255    #[serde(default)]
256    pub features: BTreeSet<String>,
257}
258
259#[derive(Debug, Clone)]
260pub struct FlowSetupState<Mode: StateMode> {
261    // The version number for the flow, last seen in the metadata table.
262    pub seen_flow_metadata_version: Option<u64>,
263    pub metadata: Mode::DefaultState<FlowSetupMetadata>,
264    #[cfg(feature = "persistence")]
265    pub tracking_table: Mode::State<TrackingTableSetupState>,
266    pub targets: IndexMap<ResourceIdentifier, Mode::State<TargetSetupState>>,
267}
268
269impl Default for FlowSetupState<ExistingMode> {
270    fn default() -> Self {
271        Self {
272            seen_flow_metadata_version: None,
273            metadata: Default::default(),
274            #[cfg(feature = "persistence")]
275            tracking_table: Default::default(),
276            targets: IndexMap::new(),
277        }
278    }
279}
280
281impl PartialEq for FlowSetupState<DesiredMode> {
282    fn eq(&self, other: &Self) -> bool {
283        self.metadata == other.metadata
284            && {
285                #[cfg(feature = "persistence")]
286                {
287                    self.tracking_table == other.tracking_table
288                }
289                #[cfg(not(feature = "persistence"))]
290                {
291                    true
292                }
293            }
294            && self.targets == other.targets
295    }
296}
297
298#[derive(Debug, Clone)]
299pub struct AllSetupStates<Mode: StateMode> {
300    pub has_metadata_table: bool,
301    pub flows: BTreeMap<String, FlowSetupState<Mode>>,
302}
303
304impl<Mode: StateMode> Default for AllSetupStates<Mode> {
305    fn default() -> Self {
306        Self {
307            has_metadata_table: false,
308            flows: BTreeMap::new(),
309        }
310    }
311}
312
313#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
314pub enum SetupChangeType {
315    NoChange,
316    Create,
317    Update,
318    Delete,
319    Invalid,
320}
321
/// One human-readable line describing part of a pending change.
pub enum ChangeDescription {
    /// An action to perform (rendered with a `TODO:` prefix).
    Action(String),
    /// An informational note (rendered with a `NOTE:` prefix).
    Note(String),
}
326
327pub trait ResourceSetupChange: Send + Sync + Any + 'static {
328    fn describe_changes(&self) -> Vec<ChangeDescription>;
329
330    fn change_type(&self) -> SetupChangeType;
331}
332
333impl ResourceSetupChange for Box<dyn ResourceSetupChange> {
334    fn describe_changes(&self) -> Vec<ChangeDescription> {
335        self.as_ref().describe_changes()
336    }
337
338    fn change_type(&self) -> SetupChangeType {
339        self.as_ref().change_type()
340    }
341}
342
343impl ResourceSetupChange for std::convert::Infallible {
344    fn describe_changes(&self) -> Vec<ChangeDescription> {
345        unreachable!()
346    }
347
348    fn change_type(&self) -> SetupChangeType {
349        unreachable!()
350    }
351}
352
353#[derive(Debug)]
354pub struct ResourceSetupInfo<K, S, C: ResourceSetupChange> {
355    pub key: K,
356    pub state: Option<S>,
357    pub has_tracked_state_change: bool,
358    pub description: String,
359
360    /// If `None`, the resource is managed by users.
361    pub setup_change: Option<C>,
362
363    pub legacy_key: Option<K>,
364}
365
366impl<K, S, C: ResourceSetupChange> std::fmt::Display for ResourceSetupInfo<K, S, C> {
367    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
368        let status_code = match self.setup_change.as_ref().map(|c| c.change_type()) {
369            Some(SetupChangeType::NoChange) => "READY",
370            Some(SetupChangeType::Create) => "TO CREATE",
371            Some(SetupChangeType::Update) => "TO UPDATE",
372            Some(SetupChangeType::Delete) => "TO DELETE",
373            Some(SetupChangeType::Invalid) => "INVALID",
374            None => "USER MANAGED",
375        };
376        let status_str = format!("[ {status_code:^9} ]");
377        let status_full = status_str;
378        let desc_colored = &self.description;
379        writeln!(f, "{status_full} {desc_colored}")?;
380        if let Some(setup_change) = &self.setup_change {
381            let changes = setup_change.describe_changes();
382            if !changes.is_empty() {
383                let mut f = indented(f).with_str(INDENT);
384                writeln!(f)?;
385                for change in changes {
386                    match change {
387                        ChangeDescription::Action(action) => {
388                            writeln!(f, "TODO: {}", action)?;
389                        }
390                        ChangeDescription::Note(note) => {
391                            writeln!(f, "NOTE: {}", note)?;
392                        }
393                    }
394                }
395                writeln!(f)?;
396            }
397        }
398        Ok(())
399    }
400}
401
402impl<K, S, C: ResourceSetupChange> ResourceSetupInfo<K, S, C> {
403    pub fn is_up_to_date(&self) -> bool {
404        self.setup_change
405            .as_ref()
406            .is_none_or(|c| c.change_type() == SetupChangeType::NoChange)
407    }
408}
409
410#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
411pub enum ObjectStatus {
412    Invalid,
413    New,
414    Existing,
415    Deleted,
416}
417
418pub trait ObjectSetupChange {
419    fn status(&self) -> Option<ObjectStatus>;
420
421    /// Returns true if it has internal changes, i.e. changes that don't need user intervention.
422    fn has_internal_changes(&self) -> bool;
423
424    /// Returns true if it has external changes, i.e. changes that should notify users.
425    fn has_external_changes(&self) -> bool;
426
427    fn is_up_to_date(&self) -> bool {
428        !self.has_internal_changes() && !self.has_external_changes()
429    }
430}
431
432#[derive(Default)]
433pub struct AttachmentsSetupChange {
434    pub has_tracked_state_change: bool,
435    pub deletes: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
436    pub upserts: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
437}
438
439impl AttachmentsSetupChange {
440    pub fn is_empty(&self) -> bool {
441        self.deletes.is_empty() && self.upserts.is_empty()
442    }
443}
444
445pub struct TargetSetupChange {
446    pub target_change: Box<dyn ResourceSetupChange>,
447    pub attachments_change: AttachmentsSetupChange,
448}
449
450impl ResourceSetupChange for TargetSetupChange {
451    fn describe_changes(&self) -> Vec<ChangeDescription> {
452        let mut result = vec![];
453        self.attachments_change
454            .deletes
455            .iter()
456            .flat_map(|a| a.describe_changes().into_iter())
457            .for_each(|change| result.push(ChangeDescription::Action(change)));
458        result.extend(self.target_change.describe_changes());
459        self.attachments_change
460            .upserts
461            .iter()
462            .flat_map(|a| a.describe_changes().into_iter())
463            .for_each(|change| result.push(ChangeDescription::Action(change)));
464        result
465    }
466
467    fn change_type(&self) -> SetupChangeType {
468        match self.target_change.change_type() {
469            SetupChangeType::NoChange => {
470                if self.attachments_change.is_empty() {
471                    SetupChangeType::NoChange
472                } else {
473                    SetupChangeType::Update
474                }
475            }
476            t => t,
477        }
478    }
479}
480
481pub struct FlowSetupChange {
482    pub status: Option<ObjectStatus>,
483    pub seen_flow_metadata_version: Option<u64>,
484
485    pub metadata_change: Option<StateChange<FlowSetupMetadata>>,
486
487    #[cfg(feature = "persistence")]
488    pub tracking_table:
489        Option<ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange>>,
490    pub target_resources:
491        Vec<ResourceSetupInfo<ResourceIdentifier, TargetSetupState, TargetSetupChange>>,
492
493    pub unknown_resources: Vec<ResourceIdentifier>,
494}
495
496impl ObjectSetupChange for FlowSetupChange {
497    fn status(&self) -> Option<ObjectStatus> {
498        self.status
499    }
500
501    fn has_internal_changes(&self) -> bool {
502        #[allow(unused_mut)]
503        let mut changes = self.metadata_change.is_some();
504        #[cfg(feature = "persistence")]
505        {
506            changes = changes
507                || self
508                    .tracking_table
509                    .as_ref()
510                    .is_some_and(|t| t.has_tracked_state_change);
511        }
512        changes
513            || self
514                .target_resources
515                .iter()
516                .any(|target| target.has_tracked_state_change)
517    }
518
519    fn has_external_changes(&self) -> bool {
520        #[allow(unused_mut)]
521        let mut changes = false;
522        #[cfg(feature = "persistence")]
523        {
524            changes = changes
525                || self
526                    .tracking_table
527                    .as_ref()
528                    .is_some_and(|t| !t.is_up_to_date());
529        }
530        changes
531            || self
532                .target_resources
533                .iter()
534                .any(|target| !target.is_up_to_date())
535    }
536}
537
/// Setup change for resources shared by all flows.
///
/// With the `persistence` feature this covers the setup metadata table. Without it, the
/// struct carries no data.
#[derive(Debug)]
pub struct GlobalSetupChange {
    /// Setup info for the setup metadata table.
    #[cfg(feature = "persistence")]
    pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>,
}
543
544impl GlobalSetupChange {
545    pub fn from_setup_states(_setup_states: &AllSetupStates<ExistingMode>) -> Self {
546        Self {
547            #[cfg(feature = "persistence")]
548            metadata_table: db_metadata::MetadataTableSetup {
549                metadata_table_missing: !_setup_states.has_metadata_table,
550            }
551            .into_setup_info(),
552        }
553    }
554
555    pub fn is_up_to_date(&self) -> bool {
556        #[cfg(feature = "persistence")]
557        {
558            self.metadata_table.is_up_to_date()
559        }
560        #[cfg(not(feature = "persistence"))]
561        {
562            true
563        }
564    }
565}
566
567pub struct ObjectSetupChangeCode<'a, Status: ObjectSetupChange>(&'a Status);
568impl<Status: ObjectSetupChange> std::fmt::Display for ObjectSetupChangeCode<'_, Status> {
569    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
570        let Some(status) = self.0.status() else {
571            return Ok(());
572        };
573        write!(
574            f,
575            "[ {:^9} ]",
576            match status {
577                ObjectStatus::New => "TO CREATE",
578                ObjectStatus::Existing =>
579                    if self.0.is_up_to_date() {
580                        "READY"
581                    } else {
582                        "TO UPDATE"
583                    },
584                ObjectStatus::Deleted => "TO DELETE",
585                ObjectStatus::Invalid => "INVALID",
586            }
587        )
588    }
589}
590
591impl std::fmt::Display for GlobalSetupChange {
592    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
593        #[cfg(feature = "persistence")]
594        {
595            writeln!(_f, "{}", self.metadata_table)
596        }
597        #[cfg(not(feature = "persistence"))]
598        {
599            Ok(())
600        }
601    }
602}
603
604pub struct FormattedFlowSetupChange<'a>(pub &'a str, pub &'a FlowSetupChange);
605
606impl std::fmt::Display for FormattedFlowSetupChange<'_> {
607    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
608        let flow_setup_change = self.1;
609        if flow_setup_change.status.is_none() {
610            return Ok(());
611        }
612
613        writeln!(
614            f,
615            "{} Flow: {}",
616            ObjectSetupChangeCode(flow_setup_change),
617            self.0
618        )?;
619
620        let mut f = indented(f).with_str(INDENT);
621        #[cfg(feature = "persistence")]
622        if let Some(tracking_table) = &flow_setup_change.tracking_table {
623            write!(f, "{tracking_table}")?;
624        }
625        for target_resource in &flow_setup_change.target_resources {
626            write!(f, "{target_resource}")?;
627        }
628        for resource in &flow_setup_change.unknown_resources {
629            writeln!(f, "[  UNKNOWN  ] {resource}")?;
630        }
631
632        Ok(())
633    }
634}