fluvio_controlplane_metadata/partition/
status.rs1#![allow(clippy::assign_op_pattern)]
2
3use std::fmt;
10use std::slice::Iter;
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_protocol::record::Offset;
14use fluvio_types::SpuId;
15
16#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
21#[cfg_attr(
22 feature = "use_serde",
23 derive(serde::Serialize, serde::Deserialize),
24 serde(rename_all = "camelCase")
25)]
26pub struct PartitionStatus {
27 pub resolution: PartitionResolution,
28 pub leader: ReplicaStatus,
29 #[cfg_attr(feature = "use_serde", serde(alias = "lrs"))]
32 pub lsr: u32,
33 pub replicas: Vec<ReplicaStatus>,
34 #[cfg_attr(
35 feature = "use_serde",
36 serde(default = "default_partition_status_size")
37 )]
38 #[fluvio(min_version = 5)]
39 pub size: i64,
40 pub is_being_deleted: bool,
41 #[cfg_attr(feature = "use_serde", serde(default))]
42 #[fluvio(min_version = 16)]
43 pub base_offset: i64,
44}
45
46impl Default for PartitionStatus {
47 fn default() -> Self {
48 Self {
49 size: PartitionStatus::SIZE_NOT_SUPPORTED,
50 resolution: Default::default(),
51 leader: Default::default(),
52 lsr: Default::default(),
53 replicas: Default::default(),
54 is_being_deleted: Default::default(),
55 base_offset: Default::default(),
56 }
57 }
58}
59
/// Serde fallback for `PartitionStatus::size`: referenced by the field's
/// `serde(default = "default_partition_status_size")` attribute, so a missing
/// key deserializes to `PartitionStatus::SIZE_NOT_SUPPORTED`.
///
/// Only compiled when the `use_serde` feature is enabled.
#[cfg(feature = "use_serde")]
const fn default_partition_status_size() -> i64 {
    PartitionStatus::SIZE_NOT_SUPPORTED
}
64
65impl fmt::Display for PartitionStatus {
66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67 write!(f, "{:#?} Leader: {} [", self.resolution, self.leader)?;
68 for replica in &self.replicas {
69 write!(f, "{replica},")?;
70 }
71 write!(f, "]")
72 }
73}
74
75impl PartitionStatus {
80 pub const SIZE_ERROR: i64 = -1;
81 pub const SIZE_NOT_SUPPORTED: i64 = -2;
82
83 pub fn leader(leader: impl Into<ReplicaStatus>) -> Self {
84 Self::new(leader.into(), vec![])
85 }
86
87 pub fn new(leader: impl Into<ReplicaStatus>, replicas: Vec<ReplicaStatus>) -> Self {
88 Self {
89 resolution: PartitionResolution::default(),
90 leader: leader.into(),
91 replicas,
92 ..Default::default()
93 }
94 }
95
96 pub fn new2(
97 leader: impl Into<ReplicaStatus>,
98 replicas: Vec<ReplicaStatus>,
99 size: i64,
100 resolution: PartitionResolution,
101 base_offset: i64,
102 ) -> Self {
103 Self {
104 resolution,
105 leader: leader.into(),
106 replicas,
107 size,
108 base_offset,
109 ..Default::default()
110 }
111 }
112
113 pub fn is_online(&self) -> bool {
114 self.resolution == PartitionResolution::Online
115 }
116
117 pub fn is_offline(&self) -> bool {
118 self.resolution != PartitionResolution::Online
119 }
120
121 #[deprecated = "Replaced by lrs()"]
122 pub fn lsr(&self) -> u32 {
123 self.lsr
124 }
125
126 pub fn lrs(&self) -> u32 {
127 self.lsr
128 }
129
130 pub fn replica_iter(&self) -> Iter<ReplicaStatus> {
131 self.replicas.iter()
132 }
133
134 pub fn live_replicas(&self) -> Vec<SpuId> {
135 self.replicas.iter().map(|lrs| lrs.spu).collect()
136 }
137
138 pub fn offline_replicas(&self) -> Vec<i32> {
139 vec![]
140 }
141
142 pub fn has_live_replicas(&self) -> bool {
143 !self.replicas.is_empty()
144 }
145
146 pub fn set_to_delete(mut self) -> Self {
148 self.is_being_deleted = true;
149 self
150 }
151}
152
153#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
154#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
155pub enum PartitionResolution {
156 #[default]
157 #[fluvio(tag = 0)]
158 Offline, #[fluvio(tag = 1)]
160 Online, #[fluvio(tag = 2)]
162 LeaderOffline, #[fluvio(tag = 3)]
164 ElectionLeaderFound, }
166
167#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
168#[cfg_attr(
169 feature = "use_serde",
170 derive(serde::Serialize, serde::Deserialize),
171 serde(rename_all = "camelCase")
172)]
173pub struct ReplicaStatus {
174 pub spu: SpuId,
175 pub hw: i64,
176 pub leo: i64,
177}
178
179impl fmt::Display for ReplicaStatus {
180 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181 write!(f, "spu:{} hw:{} leo: {}", self.spu, self.hw, self.leo)
182 }
183}
184
185impl Default for ReplicaStatus {
186 fn default() -> Self {
187 ReplicaStatus {
188 spu: -1,
189 hw: -1,
190 leo: -1,
191 }
192 }
193}
194
195impl ReplicaStatus {
196 pub fn new(spu: SpuId, hw: Offset, leo: Offset) -> Self {
197 Self { spu, hw, leo }
198 }
199
200 pub fn leader_lag(&self, leader_status: &Self) -> i64 {
202 leader_status.leo - self.leo
203 }
204
205 pub fn high_watermark_lag(&self, leader_status: &Self) -> i64 {
206 leader_status.hw - self.hw
207 }
208}
209
210impl From<(SpuId, Offset, Offset)> for ReplicaStatus {
211 fn from(val: (SpuId, Offset, Offset)) -> Self {
212 let (id, high_watermark, end_offset) = val;
213 Self::new(id, high_watermark, end_offset)
214 }
215}