use crate::ops::interface::AttachmentSetupChange;
use crate::prelude::*;
use indenter::indented;
use std::any::Any;
use std::fmt::Debug;
use std::fmt::{Display, Write};
use std::hash::Hash;
#[cfg(feature = "persistence")]
use super::db_metadata;
#[cfg(feature = "persistence")]
use crate::execution::db_tracking_setup::{TrackingTableSetupChange, TrackingTableSetupState};
const INDENT: &str = " ";
pub trait StateMode: Clone + Copy {
type State<T: Debug + Clone>: Debug + Clone;
type DefaultState<T: Debug + Clone + Default>: Debug + Clone + Default;
}
#[derive(Debug, Clone, Copy)]
pub struct DesiredMode;
impl StateMode for DesiredMode {
type State<T: Debug + Clone> = T;
type DefaultState<T: Debug + Clone + Default> = T;
}
#[derive(Debug, Clone)]
pub struct CombinedState<T> {
pub current: Option<T>,
pub staging: Vec<StateChange<T>>,
pub legacy_state_key: Option<serde_json::Value>,
}
impl<T> CombinedState<T> {
pub fn current(desired: T) -> Self {
Self {
current: Some(desired),
staging: vec![],
legacy_state_key: None,
}
}
pub fn staging(change: StateChange<T>) -> Self {
Self {
current: None,
staging: vec![change],
legacy_state_key: None,
}
}
pub fn from_change(prev: Option<CombinedState<T>>, change: Option<Option<&T>>) -> Self
where
T: Clone,
{
Self {
current: match change {
Some(Some(state)) => Some(state.clone()),
Some(None) => None,
None => prev.and_then(|v| v.current),
},
staging: vec![],
legacy_state_key: None,
}
}
pub fn possible_versions(&self) -> impl Iterator<Item = &T> {
self.current
.iter()
.chain(self.staging.iter().flat_map(|s| s.state().into_iter()))
}
pub fn always_exists(&self) -> bool {
self.current.is_some() && self.staging.iter().all(|s| !s.is_delete())
}
pub fn always_exists_and(&self, predicate: impl Fn(&T) -> bool) -> bool {
self.always_exists() && self.possible_versions().all(predicate)
}
pub fn legacy_values<V: Ord + Eq, F: Fn(&T) -> &V>(
&self,
desired: Option<&T>,
f: F,
) -> BTreeSet<&V> {
let desired_value = desired.map(&f);
self.possible_versions()
.map(f)
.filter(|v| Some(*v) != desired_value)
.collect()
}
pub fn has_state_diff<S>(&self, state: Option<&S>, map_fn: impl Fn(&T) -> &S) -> bool
where
S: PartialEq,
{
if let Some(state) = state {
!self.always_exists_and(|s| map_fn(s) == state)
} else {
self.possible_versions().next().is_some()
}
}
}
impl<T: Debug + Clone> Default for CombinedState<T> {
fn default() -> Self {
Self {
current: None,
staging: vec![],
legacy_state_key: None,
}
}
}
impl<T: PartialEq + Debug + Clone> PartialEq<T> for CombinedState<T> {
fn eq(&self, other: &T) -> bool {
self.staging.is_empty() && self.current.as_ref() == Some(other)
}
}
#[derive(Clone, Copy)]
pub struct ExistingMode;
impl StateMode for ExistingMode {
type State<T: Debug + Clone> = CombinedState<T>;
type DefaultState<T: Debug + Clone + Default> = CombinedState<T>;
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum StateChange<State> {
Upsert(State),
Delete,
}
impl<State> StateChange<State> {
pub fn is_delete(&self) -> bool {
matches!(self, StateChange::Delete)
}
pub fn desired_state(&self) -> Option<&State> {
match self {
StateChange::Upsert(state) => Some(state),
StateChange::Delete => None,
}
}
pub fn state(&self) -> Option<&State> {
match self {
StateChange::Upsert(state) => Some(state),
StateChange::Delete => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct SourceSetupState {
pub source_id: i32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub keys_schema: Option<Box<[schema::ValueType]>>,
#[cfg(feature = "legacy-states-v0")]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub key_schema: Option<schema::ValueType>,
#[serde(default)]
pub source_kind: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct ResourceIdentifier {
pub key: serde_json::Value,
pub target_kind: String,
}
impl Display for ResourceIdentifier {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.target_kind, self.key)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TargetSetupStateCommon {
pub target_id: i32,
pub schema_version_id: usize,
pub max_schema_version_id: usize,
#[serde(default)]
pub setup_by_user: bool,
#[serde(default)]
pub key_type: Option<Box<[schema::ValueType]>>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TargetSetupState {
pub common: TargetSetupStateCommon,
pub state: serde_json::Value,
#[serde(
default,
with = "indexmap::map::serde_seq",
skip_serializing_if = "IndexMap::is_empty"
)]
pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
}
impl TargetSetupState {
pub fn state_unless_setup_by_user(self) -> Option<serde_json::Value> {
(!self.common.setup_by_user).then_some(self.state)
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct FlowSetupMetadata {
pub last_source_id: i32,
pub last_target_id: i32,
pub sources: BTreeMap<String, SourceSetupState>,
#[serde(default)]
pub features: BTreeSet<String>,
}
#[derive(Debug, Clone)]
pub struct FlowSetupState<Mode: StateMode> {
pub seen_flow_metadata_version: Option<u64>,
pub metadata: Mode::DefaultState<FlowSetupMetadata>,
#[cfg(feature = "persistence")]
pub tracking_table: Mode::State<TrackingTableSetupState>,
pub targets: IndexMap<ResourceIdentifier, Mode::State<TargetSetupState>>,
}
impl Default for FlowSetupState<ExistingMode> {
fn default() -> Self {
Self {
seen_flow_metadata_version: None,
metadata: Default::default(),
#[cfg(feature = "persistence")]
tracking_table: Default::default(),
targets: IndexMap::new(),
}
}
}
impl PartialEq for FlowSetupState<DesiredMode> {
fn eq(&self, other: &Self) -> bool {
self.metadata == other.metadata
&& {
#[cfg(feature = "persistence")]
{
self.tracking_table == other.tracking_table
}
#[cfg(not(feature = "persistence"))]
{
true
}
}
&& self.targets == other.targets
}
}
#[derive(Debug, Clone)]
pub struct AllSetupStates<Mode: StateMode> {
pub has_metadata_table: bool,
pub flows: BTreeMap<String, FlowSetupState<Mode>>,
}
impl<Mode: StateMode> Default for AllSetupStates<Mode> {
fn default() -> Self {
Self {
has_metadata_table: false,
flows: BTreeMap::new(),
}
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SetupChangeType {
NoChange,
Create,
Update,
Delete,
Invalid,
}
pub enum ChangeDescription {
Action(String),
Note(String),
}
pub trait ResourceSetupChange: Send + Sync + Any + 'static {
fn describe_changes(&self) -> Vec<ChangeDescription>;
fn change_type(&self) -> SetupChangeType;
}
impl ResourceSetupChange for Box<dyn ResourceSetupChange> {
fn describe_changes(&self) -> Vec<ChangeDescription> {
self.as_ref().describe_changes()
}
fn change_type(&self) -> SetupChangeType {
self.as_ref().change_type()
}
}
impl ResourceSetupChange for std::convert::Infallible {
fn describe_changes(&self) -> Vec<ChangeDescription> {
unreachable!()
}
fn change_type(&self) -> SetupChangeType {
unreachable!()
}
}
#[derive(Debug)]
pub struct ResourceSetupInfo<K, S, C: ResourceSetupChange> {
pub key: K,
pub state: Option<S>,
pub has_tracked_state_change: bool,
pub description: String,
pub setup_change: Option<C>,
pub legacy_key: Option<K>,
}
impl<K, S, C: ResourceSetupChange> std::fmt::Display for ResourceSetupInfo<K, S, C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let status_code = match self.setup_change.as_ref().map(|c| c.change_type()) {
Some(SetupChangeType::NoChange) => "READY",
Some(SetupChangeType::Create) => "TO CREATE",
Some(SetupChangeType::Update) => "TO UPDATE",
Some(SetupChangeType::Delete) => "TO DELETE",
Some(SetupChangeType::Invalid) => "INVALID",
None => "USER MANAGED",
};
let status_str = format!("[ {status_code:^9} ]");
let status_full = status_str;
let desc_colored = &self.description;
writeln!(f, "{status_full} {desc_colored}")?;
if let Some(setup_change) = &self.setup_change {
let changes = setup_change.describe_changes();
if !changes.is_empty() {
let mut f = indented(f).with_str(INDENT);
writeln!(f)?;
for change in changes {
match change {
ChangeDescription::Action(action) => {
writeln!(f, "TODO: {}", action)?;
}
ChangeDescription::Note(note) => {
writeln!(f, "NOTE: {}", note)?;
}
}
}
writeln!(f)?;
}
}
Ok(())
}
}
impl<K, S, C: ResourceSetupChange> ResourceSetupInfo<K, S, C> {
pub fn is_up_to_date(&self) -> bool {
self.setup_change
.as_ref()
.is_none_or(|c| c.change_type() == SetupChangeType::NoChange)
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum ObjectStatus {
Invalid,
New,
Existing,
Deleted,
}
pub trait ObjectSetupChange {
fn status(&self) -> Option<ObjectStatus>;
fn has_internal_changes(&self) -> bool;
fn has_external_changes(&self) -> bool;
fn is_up_to_date(&self) -> bool {
!self.has_internal_changes() && !self.has_external_changes()
}
}
#[derive(Default)]
pub struct AttachmentsSetupChange {
pub has_tracked_state_change: bool,
pub deletes: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
pub upserts: Vec<Box<dyn AttachmentSetupChange + Send + Sync>>,
}
impl AttachmentsSetupChange {
pub fn is_empty(&self) -> bool {
self.deletes.is_empty() && self.upserts.is_empty()
}
}
pub struct TargetSetupChange {
pub target_change: Box<dyn ResourceSetupChange>,
pub attachments_change: AttachmentsSetupChange,
}
impl ResourceSetupChange for TargetSetupChange {
fn describe_changes(&self) -> Vec<ChangeDescription> {
let mut result = vec![];
self.attachments_change
.deletes
.iter()
.flat_map(|a| a.describe_changes().into_iter())
.for_each(|change| result.push(ChangeDescription::Action(change)));
result.extend(self.target_change.describe_changes());
self.attachments_change
.upserts
.iter()
.flat_map(|a| a.describe_changes().into_iter())
.for_each(|change| result.push(ChangeDescription::Action(change)));
result
}
fn change_type(&self) -> SetupChangeType {
match self.target_change.change_type() {
SetupChangeType::NoChange => {
if self.attachments_change.is_empty() {
SetupChangeType::NoChange
} else {
SetupChangeType::Update
}
}
t => t,
}
}
}
pub struct FlowSetupChange {
pub status: Option<ObjectStatus>,
pub seen_flow_metadata_version: Option<u64>,
pub metadata_change: Option<StateChange<FlowSetupMetadata>>,
#[cfg(feature = "persistence")]
pub tracking_table:
Option<ResourceSetupInfo<(), TrackingTableSetupState, TrackingTableSetupChange>>,
pub target_resources:
Vec<ResourceSetupInfo<ResourceIdentifier, TargetSetupState, TargetSetupChange>>,
pub unknown_resources: Vec<ResourceIdentifier>,
}
impl ObjectSetupChange for FlowSetupChange {
fn status(&self) -> Option<ObjectStatus> {
self.status
}
fn has_internal_changes(&self) -> bool {
#[allow(unused_mut)]
let mut changes = self.metadata_change.is_some();
#[cfg(feature = "persistence")]
{
changes = changes
|| self
.tracking_table
.as_ref()
.is_some_and(|t| t.has_tracked_state_change);
}
changes
|| self
.target_resources
.iter()
.any(|target| target.has_tracked_state_change)
}
fn has_external_changes(&self) -> bool {
#[allow(unused_mut)]
let mut changes = false;
#[cfg(feature = "persistence")]
{
changes = changes
|| self
.tracking_table
.as_ref()
.is_some_and(|t| !t.is_up_to_date());
}
changes
|| self
.target_resources
.iter()
.any(|target| !target.is_up_to_date())
}
}
#[derive(Debug)]
pub struct GlobalSetupChange {
#[cfg(feature = "persistence")]
pub metadata_table: ResourceSetupInfo<(), (), db_metadata::MetadataTableSetup>,
}
impl GlobalSetupChange {
pub fn from_setup_states(_setup_states: &AllSetupStates<ExistingMode>) -> Self {
Self {
#[cfg(feature = "persistence")]
metadata_table: db_metadata::MetadataTableSetup {
metadata_table_missing: !_setup_states.has_metadata_table,
}
.into_setup_info(),
}
}
pub fn is_up_to_date(&self) -> bool {
#[cfg(feature = "persistence")]
{
self.metadata_table.is_up_to_date()
}
#[cfg(not(feature = "persistence"))]
{
true
}
}
}
pub struct ObjectSetupChangeCode<'a, Status: ObjectSetupChange>(&'a Status);
impl<Status: ObjectSetupChange> std::fmt::Display for ObjectSetupChangeCode<'_, Status> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Some(status) = self.0.status() else {
return Ok(());
};
write!(
f,
"[ {:^9} ]",
match status {
ObjectStatus::New => "TO CREATE",
ObjectStatus::Existing =>
if self.0.is_up_to_date() {
"READY"
} else {
"TO UPDATE"
},
ObjectStatus::Deleted => "TO DELETE",
ObjectStatus::Invalid => "INVALID",
}
)
}
}
impl std::fmt::Display for GlobalSetupChange {
fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
#[cfg(feature = "persistence")]
{
writeln!(_f, "{}", self.metadata_table)
}
#[cfg(not(feature = "persistence"))]
{
Ok(())
}
}
}
pub struct FormattedFlowSetupChange<'a>(pub &'a str, pub &'a FlowSetupChange);
impl std::fmt::Display for FormattedFlowSetupChange<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let flow_setup_change = self.1;
if flow_setup_change.status.is_none() {
return Ok(());
}
writeln!(
f,
"{} Flow: {}",
ObjectSetupChangeCode(flow_setup_change),
self.0
)?;
let mut f = indented(f).with_str(INDENT);
#[cfg(feature = "persistence")]
if let Some(tracking_table) = &flow_setup_change.tracking_table {
write!(f, "{tracking_table}")?;
}
for target_resource in &flow_setup_change.target_resources {
write!(f, "{target_resource}")?;
}
for resource in &flow_setup_change.unknown_resources {
writeln!(f, "[ UNKNOWN ] {resource}")?;
}
Ok(())
}
}