fluvio_controlplane_metadata/mirror/
status.rs

1use fluvio_protocol::{Encoder, Decoder};
2
3#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
4#[cfg_attr(
5    feature = "use_serde",
6    derive(serde::Serialize, serde::Deserialize),
7    serde(rename_all = "camelCase")
8)]
9pub struct MirrorStatus {
10    pub connection_status: ConnectionStatus,
11    #[cfg_attr(feature = "use_serde", serde(default))]
12    pub pairing_sc: MirrorPairStatus,
13    #[cfg_attr(feature = "use_serde", serde(default))]
14    pub pairing_spu: MirrorPairStatus,
15    pub connection_stat: ConnectionStat,
16}
17
18impl MirrorStatus {
19    pub fn new(
20        pairing: MirrorPairStatus,
21        connection_status: ConnectionStatus,
22        last_seen: u64,
23    ) -> Self {
24        Self {
25            pairing_sc: pairing,
26            pairing_spu: MirrorPairStatus::Waiting,
27            connection_status,
28            connection_stat: ConnectionStat { last_seen },
29        }
30    }
31
32    pub fn new_by_spu(pairing_spu: MirrorPairStatus, last_seen: u64) -> Self {
33        Self {
34            pairing_spu,
35            connection_stat: ConnectionStat { last_seen },
36            ..Default::default()
37        }
38    }
39
40    pub fn merge_from_sc(&mut self, other: Self) {
41        self.pairing_sc = other.pairing_sc;
42        self.connection_status = other.connection_status;
43        self.connection_stat = other.connection_stat;
44    }
45
46    pub fn merge_from_spu(&mut self, other: Self) {
47        self.pairing_spu = other.pairing_spu;
48        self.connection_stat = other.connection_stat;
49    }
50
51    pub fn pair_errors(self) -> String {
52        match (self.pairing_sc, self.pairing_spu) {
53            (MirrorPairStatus::DetailFailure(sc_err), MirrorPairStatus::DetailFailure(spu_err)) => {
54                format!("SC: {sc_err} - SPU: {spu_err}")
55            }
56            (MirrorPairStatus::DetailFailure(sc_err), _) => sc_err,
57            (_, MirrorPairStatus::DetailFailure(spu_err)) => spu_err,
58            _ => "-".to_string(),
59        }
60    }
61}
62
63#[derive(Encoder, Decoder, Debug, Clone, Eq, PartialEq, Default)]
64#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
65pub enum MirrorPairStatus {
66    #[default]
67    #[fluvio(tag = 0)]
68    Waiting,
69    #[fluvio(tag = 1)]
70    Successful,
71    #[fluvio(tag = 2)]
72    Failed,
73    #[fluvio(tag = 3)]
74    Disabled,
75    #[fluvio(tag = 4)]
76    Unauthorized,
77    #[fluvio(tag = 5, min_version = 17)]
78    DetailFailure(String),
79}
80
81#[derive(Encoder, Decoder, Debug, Clone, Eq, PartialEq, Default)]
82#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
83pub enum ConnectionStatus {
84    #[default]
85    #[fluvio(tag = 0)]
86    Offline,
87    #[fluvio(tag = 1)]
88    Online,
89}
90
91#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
92#[cfg_attr(
93    feature = "use_serde",
94    derive(serde::Serialize, serde::Deserialize),
95    serde(rename_all = "camelCase")
96)]
97pub struct ConnectionStat {
98    pub last_seen: u64, // number of milliseconds since last seen
99}
100
101impl MirrorStatus {
102    #[cfg(feature = "use_serde")]
103    pub fn last_seen(&self, since: std::time::Duration) -> String {
104        use humantime_serde::re::humantime;
105
106        let since_sec = since.as_secs();
107
108        if self.connection_stat.last_seen == 0 {
109            return "-".to_string();
110        }
111
112        let last_seen_sec =
113            std::time::Duration::from_millis(self.connection_stat.last_seen).as_secs();
114        humantime::Duration::from(std::time::Duration::from_secs(since_sec - last_seen_sec))
115            .to_string()
116    }
117}
118
119impl std::fmt::Display for MirrorStatus {
120    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121        write!(
122            f,
123            "{}:SPU:{}:SC:{}",
124            self.connection_status, self.pairing_spu, self.pairing_sc
125        )
126    }
127}
128
129impl std::fmt::Display for MirrorPairStatus {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        let status = match self {
132            MirrorPairStatus::Successful => "Connected",
133            MirrorPairStatus::Disabled => "Disabled",
134            MirrorPairStatus::Failed => "Failed",
135            MirrorPairStatus::Waiting => "Waiting",
136            MirrorPairStatus::Unauthorized => "Unauthorized",
137            MirrorPairStatus::DetailFailure(_) => "Failed", // the msg is showed with pair_errors
138        };
139        write!(f, "{status}")
140    }
141}
142
143impl std::fmt::Display for ConnectionStatus {
144    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145        let status = match self {
146            ConnectionStatus::Online => "online",
147            ConnectionStatus::Offline => "offline",
148        };
149        write!(f, "{status}")
150    }
151}
152
#[cfg(test)]
mod test {
    use super::*;

    // `MirrorStatus::last_seen` only exists when the `use_serde` feature is
    // enabled, so this test is gated the same way; otherwise the test build
    // fails to compile without the feature.
    #[cfg(feature = "use_serde")]
    #[test]
    fn test_last_seen() {
        use std::time::Duration;

        let status = MirrorStatus {
            pairing_sc: MirrorPairStatus::Successful,
            pairing_spu: MirrorPairStatus::Waiting,
            connection_status: ConnectionStatus::Online,
            connection_stat: ConnectionStat {
                last_seen: 1713902927812,
            },
        };

        let since = Duration::from_millis(1713902932152);
        let last_seen = status.last_seen(since);
        assert_eq!(last_seen, "5s");

        // A zero timestamp means "never seen".
        let default_status = MirrorStatus {
            pairing_sc: MirrorPairStatus::Successful,
            pairing_spu: MirrorPairStatus::Waiting,
            connection_status: ConnectionStatus::Online,
            connection_stat: ConnectionStat { last_seen: 0 },
        };
        let last_seen = default_status.last_seen(since);
        assert_eq!(last_seen, "-");
    }

    #[test]
    fn test_display() {
        let status = MirrorStatus::new(MirrorPairStatus::Successful, ConnectionStatus::Online, 0);
        assert_eq!(status.to_string(), "online:SPU:Waiting:SC:Connected");

        // A detailed failure is displayed without its message.
        let failed = MirrorPairStatus::DetailFailure("boom".to_string());
        assert_eq!(failed.to_string(), "Failed");
    }

    #[test]
    fn test_pair_errors() {
        let both = MirrorStatus {
            pairing_sc: MirrorPairStatus::DetailFailure("sc down".to_string()),
            pairing_spu: MirrorPairStatus::DetailFailure("spu down".to_string()),
            ..Default::default()
        };
        assert_eq!(both.pair_errors(), "SC: sc down - SPU: spu down");

        let sc_only = MirrorStatus {
            pairing_sc: MirrorPairStatus::DetailFailure("sc down".to_string()),
            ..Default::default()
        };
        assert_eq!(sc_only.pair_errors(), "sc down");

        let spu_only = MirrorStatus {
            pairing_spu: MirrorPairStatus::DetailFailure("spu down".to_string()),
            ..Default::default()
        };
        assert_eq!(spu_only.pair_errors(), "spu down");

        // Failures without details produce no message.
        let plain_failure = MirrorStatus {
            pairing_sc: MirrorPairStatus::Failed,
            ..Default::default()
        };
        assert_eq!(plain_failure.pair_errors(), "-");
    }
}