fluvio_controlplane_metadata/partition/
status.rs

1#![allow(clippy::assign_op_pattern)]
2
3//!
4//! # Partition Status
5//!
6//! Partition Status metadata information cached locally.
7//!
8
9use std::fmt;
10use std::slice::Iter;
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_protocol::record::Offset;
14use fluvio_types::SpuId;
15
16// -----------------------------------
17// Data Structures
18// -----------------------------------
19
20#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
21#[cfg_attr(
22    feature = "use_serde",
23    derive(serde::Serialize, serde::Deserialize),
24    serde(rename_all = "camelCase")
25)]
26pub struct PartitionStatus {
27    pub resolution: PartitionResolution,
28    pub leader: ReplicaStatus,
29    // TODO: Next time we make a breaking protocol change, rename this to `lrs`
30    // TODO: There is no such thing as `lsr`, it is a typo
31    #[cfg_attr(feature = "use_serde", serde(alias = "lrs"))]
32    pub lsr: u32,
33    pub replicas: Vec<ReplicaStatus>,
34    #[cfg_attr(
35        feature = "use_serde",
36        serde(default = "default_partition_status_size")
37    )]
38    #[fluvio(min_version = 5)]
39    pub size: i64,
40    pub is_being_deleted: bool,
41    #[cfg_attr(feature = "use_serde", serde(default))]
42    #[fluvio(min_version = 16)]
43    pub base_offset: i64,
44}
45
46impl Default for PartitionStatus {
47    fn default() -> Self {
48        Self {
49            size: PartitionStatus::SIZE_NOT_SUPPORTED,
50            resolution: Default::default(),
51            leader: Default::default(),
52            lsr: Default::default(),
53            replicas: Default::default(),
54            is_being_deleted: Default::default(),
55            base_offset: Default::default(),
56        }
57    }
58}
59
60#[cfg(feature = "use_serde")]
61const fn default_partition_status_size() -> i64 {
62    PartitionStatus::SIZE_NOT_SUPPORTED
63}
64
65impl fmt::Display for PartitionStatus {
66    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67        write!(f, "{:#?} Leader: {} [", self.resolution, self.leader)?;
68        for replica in &self.replicas {
69            write!(f, "{replica},")?;
70        }
71        write!(f, "]")
72    }
73}
74
75// -----------------------------------
76// Implementation
77// -----------------------------------
78
79impl PartitionStatus {
80    pub const SIZE_ERROR: i64 = -1;
81    pub const SIZE_NOT_SUPPORTED: i64 = -2;
82
83    pub fn leader(leader: impl Into<ReplicaStatus>) -> Self {
84        Self::new(leader.into(), vec![])
85    }
86
87    pub fn new(leader: impl Into<ReplicaStatus>, replicas: Vec<ReplicaStatus>) -> Self {
88        Self {
89            resolution: PartitionResolution::default(),
90            leader: leader.into(),
91            replicas,
92            ..Default::default()
93        }
94    }
95
96    pub fn new2(
97        leader: impl Into<ReplicaStatus>,
98        replicas: Vec<ReplicaStatus>,
99        size: i64,
100        resolution: PartitionResolution,
101        base_offset: i64,
102    ) -> Self {
103        Self {
104            resolution,
105            leader: leader.into(),
106            replicas,
107            size,
108            base_offset,
109            ..Default::default()
110        }
111    }
112
113    pub fn is_online(&self) -> bool {
114        matches!(self.resolution, PartitionResolution::Online)
115    }
116
117    pub fn is_readable(&self) -> bool {
118        matches!(
119            self.resolution,
120            PartitionResolution::Online | PartitionResolution::OutOfStorage
121        )
122    }
123
124    pub fn is_offline(&self) -> bool {
125        self.resolution == PartitionResolution::Offline
126    }
127
128    #[deprecated = "Replaced by lrs()"]
129    pub fn lsr(&self) -> u32 {
130        self.lsr
131    }
132
133    pub fn lrs(&self) -> u32 {
134        self.lsr
135    }
136
137    pub fn replica_iter(&self) -> Iter<ReplicaStatus> {
138        self.replicas.iter()
139    }
140
141    pub fn live_replicas(&self) -> Vec<SpuId> {
142        self.replicas.iter().map(|lrs| lrs.spu).collect()
143    }
144
145    pub fn offline_replicas(&self) -> Vec<i32> {
146        vec![]
147    }
148
149    pub fn has_live_replicas(&self) -> bool {
150        !self.replicas.is_empty()
151    }
152
153    /// set to being deleted
154    pub fn set_to_delete(mut self) -> Self {
155        self.is_being_deleted = true;
156        self
157    }
158}
159
160#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
161#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
162pub enum PartitionResolution {
163    #[default]
164    #[fluvio(tag = 0)]
165    Offline, // No leader available for serving partition
166    #[fluvio(tag = 1)]
167    Online, // Partition is running normally, status contains replica info
168    #[fluvio(tag = 2)]
169    LeaderOffline, // Election has failed, no suitable leader has been found
170    #[fluvio(tag = 3)]
171    ElectionLeaderFound, // New leader has been selected
172    #[fluvio(tag = 4, min_version = 19)]
173    OutOfStorage,
174}
175
176#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
177#[cfg_attr(
178    feature = "use_serde",
179    derive(serde::Serialize, serde::Deserialize),
180    serde(rename_all = "camelCase")
181)]
182pub struct ReplicaStatus {
183    pub spu: SpuId,
184    pub hw: i64,
185    pub leo: i64,
186}
187
188impl fmt::Display for ReplicaStatus {
189    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
190        write!(f, "spu:{} hw:{} leo: {}", self.spu, self.hw, self.leo)
191    }
192}
193
194impl Default for ReplicaStatus {
195    fn default() -> Self {
196        ReplicaStatus {
197            spu: -1,
198            hw: -1,
199            leo: -1,
200        }
201    }
202}
203
204impl ReplicaStatus {
205    pub fn new(spu: SpuId, hw: Offset, leo: Offset) -> Self {
206        Self { spu, hw, leo }
207    }
208
209    /// compute lag score respect to leader
210    pub fn leader_lag(&self, leader_status: &Self) -> i64 {
211        leader_status.leo - self.leo
212    }
213
214    pub fn high_watermark_lag(&self, leader_status: &Self) -> i64 {
215        leader_status.hw - self.hw
216    }
217}
218
219impl From<(SpuId, Offset, Offset)> for ReplicaStatus {
220    fn from(val: (SpuId, Offset, Offset)) -> Self {
221        let (id, high_watermark, end_offset) = val;
222        Self::new(id, high_watermark, end_offset)
223    }
224}