1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
//!
//! # DerivedStream Status
//!
use std::fmt;

use dataplane::core::{Encoder, Decoder};

// -----------------------------------
// Data Structures
// -----------------------------------

#[derive(Default, Decoder, Encoder, Debug, Clone, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct DerivedStreamStatus {
    pub resolution: DerivedStreamResolution,
}

impl fmt::Display for DerivedStreamStatus {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:#?}", self.resolution)
    }
}

impl DerivedStreamStatus {
    // whether this is valid (deployable)
    pub fn is_deployable(&self) -> bool {
        matches!(self.resolution, DerivedStreamResolution::Provisioned)
    }
}

#[derive(Decoder, Encoder, Debug, Clone, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum DerivedStreamResolution {
    Init,
    InvalidConfig(String),
    Provisioned,
}

impl Default for DerivedStreamResolution {
    fn default() -> Self {
        DerivedStreamResolution::Init
    }
}

mod states {

    use tracing::{instrument, trace};

    use fluvio_stream_model::core::MetadataItem;

    use crate::derivedstream::{DerivedStreamSpec, DerivedStreamValidationInput};

    use super::*;

    impl DerivedStreamResolution {
        /// for this resolution, compute next state based on smartmodules
        #[instrument(skip(spec, objects))]
        pub async fn next<'a, C>(
            &'a self,
            spec: &DerivedStreamSpec,
            objects: &DerivedStreamValidationInput<'a, C>,
            force: bool,
        ) -> Option<Self>
        where
            C: MetadataItem,
        {
            match self {
                Self::Init => {
                    trace!("init or invalid, performing validation");
                    match spec.validate(objects).await {
                        Ok(()) => Some(Self::Provisioned),
                        Err(e) => Some(Self::InvalidConfig(e.to_string())),
                    }
                }
                Self::InvalidConfig(old_error) => {
                    if force {
                        trace!("revalidating invalid");
                        match spec.validate(objects).await {
                            Ok(()) => Some(Self::Provisioned),
                            Err(e) => {
                                trace!("invalid: {:#?}", e);
                                let new_error = e.to_string();
                                if old_error != &new_error {
                                    Some(Self::InvalidConfig(e.to_string()))
                                } else {
                                    trace!("same error as before");
                                    None
                                }
                            }
                        }
                    } else {
                        trace!("ignoring");
                        None
                    }
                }
                Self::Provisioned => {
                    if force {
                        trace!("revalidating provisoned");
                        match spec.validate(objects).await {
                            Ok(()) => None, // it is already validated
                            Err(e) => Some(Self::InvalidConfig(e.to_string())),
                        }
                    } else {
                        trace!("ignoring");
                        None
                    }
                }
            }
        }
    }
}