// fluvio_controlplane_metadata/partition/status.rs
#![allow(clippy::assign_op_pattern)]
2
3use std::fmt;
10use std::slice::Iter;
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_protocol::record::Offset;
14use fluvio_types::SpuId;
15
16#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
21#[cfg_attr(
22 feature = "use_serde",
23 derive(serde::Serialize, serde::Deserialize),
24 serde(rename_all = "camelCase")
25)]
26pub struct PartitionStatus {
27 pub resolution: PartitionResolution,
28 pub leader: ReplicaStatus,
29 #[cfg_attr(feature = "use_serde", serde(alias = "lrs"))]
32 pub lsr: u32,
33 pub replicas: Vec<ReplicaStatus>,
34 #[cfg_attr(
35 feature = "use_serde",
36 serde(default = "default_partition_status_size")
37 )]
38 #[fluvio(min_version = 5)]
39 pub size: i64,
40 pub is_being_deleted: bool,
41 #[cfg_attr(feature = "use_serde", serde(default))]
42 #[fluvio(min_version = 16)]
43 pub base_offset: i64,
44}
45
46impl Default for PartitionStatus {
47 fn default() -> Self {
48 Self {
49 size: PartitionStatus::SIZE_NOT_SUPPORTED,
50 resolution: Default::default(),
51 leader: Default::default(),
52 lsr: Default::default(),
53 replicas: Default::default(),
54 is_being_deleted: Default::default(),
55 base_offset: Default::default(),
56 }
57 }
58}
59
/// Serde fallback for `PartitionStatus::size` when the key is missing from
/// the input: yields `PartitionStatus::SIZE_NOT_SUPPORTED`.
#[cfg(feature = "use_serde")]
const fn default_partition_status_size() -> i64 {
    PartitionStatus::SIZE_NOT_SUPPORTED
}
64
65impl fmt::Display for PartitionStatus {
66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67 write!(f, "{:#?} Leader: {} [", self.resolution, self.leader)?;
68 for replica in &self.replicas {
69 write!(f, "{replica},")?;
70 }
71 write!(f, "]")
72 }
73}
74
75impl PartitionStatus {
80 pub const SIZE_ERROR: i64 = -1;
81 pub const SIZE_NOT_SUPPORTED: i64 = -2;
82
83 pub fn leader(leader: impl Into<ReplicaStatus>) -> Self {
84 Self::new(leader.into(), vec![])
85 }
86
87 pub fn new(leader: impl Into<ReplicaStatus>, replicas: Vec<ReplicaStatus>) -> Self {
88 Self {
89 resolution: PartitionResolution::default(),
90 leader: leader.into(),
91 replicas,
92 ..Default::default()
93 }
94 }
95
96 pub fn new2(
97 leader: impl Into<ReplicaStatus>,
98 replicas: Vec<ReplicaStatus>,
99 size: i64,
100 resolution: PartitionResolution,
101 base_offset: i64,
102 ) -> Self {
103 Self {
104 resolution,
105 leader: leader.into(),
106 replicas,
107 size,
108 base_offset,
109 ..Default::default()
110 }
111 }
112
113 pub fn is_online(&self) -> bool {
114 matches!(self.resolution, PartitionResolution::Online)
115 }
116
117 pub fn is_readable(&self) -> bool {
118 matches!(
119 self.resolution,
120 PartitionResolution::Online | PartitionResolution::OutOfStorage
121 )
122 }
123
124 pub fn is_offline(&self) -> bool {
125 self.resolution == PartitionResolution::Offline
126 }
127
128 #[deprecated = "Replaced by lrs()"]
129 pub fn lsr(&self) -> u32 {
130 self.lsr
131 }
132
133 pub fn lrs(&self) -> u32 {
134 self.lsr
135 }
136
137 pub fn replica_iter(&self) -> Iter<ReplicaStatus> {
138 self.replicas.iter()
139 }
140
141 pub fn live_replicas(&self) -> Vec<SpuId> {
142 self.replicas.iter().map(|lrs| lrs.spu).collect()
143 }
144
145 pub fn offline_replicas(&self) -> Vec<i32> {
146 vec![]
147 }
148
149 pub fn has_live_replicas(&self) -> bool {
150 !self.replicas.is_empty()
151 }
152
153 pub fn set_to_delete(mut self) -> Self {
155 self.is_being_deleted = true;
156 self
157 }
158}
159
160#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
161#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
162pub enum PartitionResolution {
163 #[default]
164 #[fluvio(tag = 0)]
165 Offline, #[fluvio(tag = 1)]
167 Online, #[fluvio(tag = 2)]
169 LeaderOffline, #[fluvio(tag = 3)]
171 ElectionLeaderFound, #[fluvio(tag = 4, min_version = 19)]
173 OutOfStorage,
174}
175
176#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
177#[cfg_attr(
178 feature = "use_serde",
179 derive(serde::Serialize, serde::Deserialize),
180 serde(rename_all = "camelCase")
181)]
182pub struct ReplicaStatus {
183 pub spu: SpuId,
184 pub hw: i64,
185 pub leo: i64,
186}
187
188impl fmt::Display for ReplicaStatus {
189 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190 write!(f, "spu:{} hw:{} leo: {}", self.spu, self.hw, self.leo)
191 }
192}
193
194impl Default for ReplicaStatus {
195 fn default() -> Self {
196 ReplicaStatus {
197 spu: -1,
198 hw: -1,
199 leo: -1,
200 }
201 }
202}
203
204impl ReplicaStatus {
205 pub fn new(spu: SpuId, hw: Offset, leo: Offset) -> Self {
206 Self { spu, hw, leo }
207 }
208
209 pub fn leader_lag(&self, leader_status: &Self) -> i64 {
211 leader_status.leo - self.leo
212 }
213
214 pub fn high_watermark_lag(&self, leader_status: &Self) -> i64 {
215 leader_status.hw - self.hw
216 }
217}
218
219impl From<(SpuId, Offset, Offset)> for ReplicaStatus {
220 fn from(val: (SpuId, Offset, Offset)) -> Self {
221 let (id, high_watermark, end_offset) = val;
222 Self::new(id, high_watermark, end_offset)
223 }
224}