fluvio_controlplane_metadata/mirror/
status.rs1use fluvio_protocol::{Encoder, Decoder};
2
3#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
4#[cfg_attr(
5 feature = "use_serde",
6 derive(serde::Serialize, serde::Deserialize),
7 serde(rename_all = "camelCase")
8)]
9pub struct MirrorStatus {
10 pub connection_status: ConnectionStatus,
11 #[cfg_attr(feature = "use_serde", serde(default))]
12 pub pairing_sc: MirrorPairStatus,
13 #[cfg_attr(feature = "use_serde", serde(default))]
14 pub pairing_spu: MirrorPairStatus,
15 pub connection_stat: ConnectionStat,
16}
17
18impl MirrorStatus {
19 pub fn new(
20 pairing: MirrorPairStatus,
21 connection_status: ConnectionStatus,
22 last_seen: u64,
23 ) -> Self {
24 Self {
25 pairing_sc: pairing,
26 pairing_spu: MirrorPairStatus::Waiting,
27 connection_status,
28 connection_stat: ConnectionStat { last_seen },
29 }
30 }
31
32 pub fn new_by_spu(pairing_spu: MirrorPairStatus, last_seen: u64) -> Self {
33 Self {
34 pairing_spu,
35 connection_stat: ConnectionStat { last_seen },
36 ..Default::default()
37 }
38 }
39
40 pub fn merge_from_sc(&mut self, other: Self) {
41 self.pairing_sc = other.pairing_sc;
42 self.connection_status = other.connection_status;
43 self.connection_stat = other.connection_stat;
44 }
45
46 pub fn merge_from_spu(&mut self, other: Self) {
47 self.pairing_spu = other.pairing_spu;
48 self.connection_stat = other.connection_stat;
49 }
50
51 pub fn pair_errors(self) -> String {
52 match (self.pairing_sc, self.pairing_spu) {
53 (MirrorPairStatus::DetailFailure(sc_err), MirrorPairStatus::DetailFailure(spu_err)) => {
54 format!("SC: {sc_err} - SPU: {spu_err}")
55 }
56 (MirrorPairStatus::DetailFailure(sc_err), _) => sc_err,
57 (_, MirrorPairStatus::DetailFailure(spu_err)) => spu_err,
58 _ => "-".to_string(),
59 }
60 }
61}
62
63#[derive(Encoder, Decoder, Debug, Clone, Eq, PartialEq, Default)]
64#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
65pub enum MirrorPairStatus {
66 #[default]
67 #[fluvio(tag = 0)]
68 Waiting,
69 #[fluvio(tag = 1)]
70 Successful,
71 #[fluvio(tag = 2)]
72 Failed,
73 #[fluvio(tag = 3)]
74 Disabled,
75 #[fluvio(tag = 4)]
76 Unauthorized,
77 #[fluvio(tag = 5, min_version = 17)]
78 DetailFailure(String),
79}
80
81#[derive(Encoder, Decoder, Debug, Clone, Eq, PartialEq, Default)]
82#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
83pub enum ConnectionStatus {
84 #[default]
85 #[fluvio(tag = 0)]
86 Offline,
87 #[fluvio(tag = 1)]
88 Online,
89}
90
91#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
92#[cfg_attr(
93 feature = "use_serde",
94 derive(serde::Serialize, serde::Deserialize),
95 serde(rename_all = "camelCase")
96)]
97pub struct ConnectionStat {
98 pub last_seen: u64, }
100
101impl MirrorStatus {
102 #[cfg(feature = "use_serde")]
103 pub fn last_seen(&self, since: std::time::Duration) -> String {
104 use humantime_serde::re::humantime;
105
106 let since_sec = since.as_secs();
107
108 if self.connection_stat.last_seen == 0 {
109 return "-".to_string();
110 }
111
112 let last_seen_sec =
113 std::time::Duration::from_millis(self.connection_stat.last_seen).as_secs();
114 humantime::Duration::from(std::time::Duration::from_secs(since_sec - last_seen_sec))
115 .to_string()
116 }
117}
118
119impl std::fmt::Display for MirrorStatus {
120 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
121 write!(
122 f,
123 "{}:SPU:{}:SC:{}",
124 self.connection_status, self.pairing_spu, self.pairing_sc
125 )
126 }
127}
128
129impl std::fmt::Display for MirrorPairStatus {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 let status = match self {
132 MirrorPairStatus::Successful => "Connected",
133 MirrorPairStatus::Disabled => "Disabled",
134 MirrorPairStatus::Failed => "Failed",
135 MirrorPairStatus::Waiting => "Waiting",
136 MirrorPairStatus::Unauthorized => "Unauthorized",
137 MirrorPairStatus::DetailFailure(_) => "Failed", };
139 write!(f, "{status}")
140 }
141}
142
143impl std::fmt::Display for ConnectionStatus {
144 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
145 let status = match self {
146 ConnectionStatus::Online => "online",
147 ConnectionStatus::Offline => "offline",
148 };
149 write!(f, "{status}")
150 }
151}
152
// `MirrorStatus::last_seen` only exists when the `use_serde` feature is enabled, so the
// tests that call it are gated on the same feature; otherwise a feature-less test build
// fails to compile.
#[cfg(all(test, feature = "use_serde"))]
mod test {
    use std::time::Duration;

    use super::*;

    /// `last_seen` reports whole elapsed seconds and renders an unset timestamp as `-`.
    #[test]
    fn test_last_seen() {
        let status = MirrorStatus {
            pairing_sc: MirrorPairStatus::Successful,
            pairing_spu: MirrorPairStatus::Waiting,
            connection_status: ConnectionStatus::Online,
            connection_stat: ConnectionStat {
                last_seen: 1713902927812,
            },
        };

        // 1713902932 s - 1713902927 s = 5 s (both sides are truncated to whole seconds).
        let since = Duration::from_millis(1713902932152);
        let last_seen = status.last_seen(since);
        assert_eq!(last_seen, "5s");

        // A zero timestamp means "never seen".
        let default_status = MirrorStatus {
            pairing_sc: MirrorPairStatus::Successful,
            pairing_spu: MirrorPairStatus::Waiting,
            connection_status: ConnectionStatus::Online,
            connection_stat: ConnectionStat { last_seen: 0 },
        };
        let last_seen = default_status.last_seen(since);
        assert_eq!(last_seen, "-");
    }
}