1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::fmt;
use dataplane::core::{Encoder, Decoder};
#[derive(Default, Decoder, Encoder, Debug, Clone, PartialEq)]
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct DerivedStreamStatus {
pub resolution: DerivedStreamResolution,
}
impl fmt::Display for DerivedStreamStatus {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:#?}", self.resolution)
}
}
impl DerivedStreamStatus {
pub fn is_deployable(&self) -> bool {
matches!(self.resolution, DerivedStreamResolution::Provisioned)
}
}
#[derive(Decoder, Encoder, Debug, Clone, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum DerivedStreamResolution {
Init,
InvalidConfig(String),
Provisioned,
}
impl Default for DerivedStreamResolution {
fn default() -> Self {
DerivedStreamResolution::Init
}
}
mod states {
use tracing::{instrument, trace};
use fluvio_stream_model::core::MetadataItem;
use crate::derivedstream::{DerivedStreamSpec, DerivedStreamValidationInput};
use super::*;
impl DerivedStreamResolution {
#[instrument(skip(spec, objects))]
pub async fn next<'a, C>(
&'a self,
spec: &DerivedStreamSpec,
objects: &DerivedStreamValidationInput<'a, C>,
force: bool,
) -> Option<Self>
where
C: MetadataItem,
{
match self {
Self::Init => {
trace!("init or invalid, performing validation");
match spec.validate(objects).await {
Ok(()) => Some(Self::Provisioned),
Err(e) => Some(Self::InvalidConfig(e.to_string())),
}
}
Self::InvalidConfig(old_error) => {
if force {
trace!("revalidating invalid");
match spec.validate(objects).await {
Ok(()) => Some(Self::Provisioned),
Err(e) => {
trace!("invalid: {:#?}", e);
let new_error = e.to_string();
if old_error != &new_error {
Some(Self::InvalidConfig(e.to_string()))
} else {
trace!("same error as before");
None
}
}
}
} else {
trace!("ignoring");
None
}
}
Self::Provisioned => {
if force {
trace!("revalidating provisoned");
match spec.validate(objects).await {
Ok(()) => None,
Err(e) => Some(Self::InvalidConfig(e.to_string())),
}
} else {
trace!("ignoring");
None
}
}
}
}
}
}