fluvio_controlplane_metadata/partition/
status.rs

1#![allow(clippy::assign_op_pattern)]
2
//!
//! # Partition Status
//!
//! Partition status metadata, cached locally.
//!
8
9use std::fmt;
10use std::slice::Iter;
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_protocol::record::Offset;
14use fluvio_types::SpuId;
15
// -----------------------------------
// Data Structures
// -----------------------------------
19
20#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
21#[cfg_attr(
22    feature = "use_serde",
23    derive(serde::Serialize, serde::Deserialize),
24    serde(rename_all = "camelCase")
25)]
26pub struct PartitionStatus {
27    pub resolution: PartitionResolution,
28    pub leader: ReplicaStatus,
29    // TODO: Next time we make a breaking protocol change, rename this to `lrs`
30    // TODO: There is no such thing as `lsr`, it is a typo
31    #[cfg_attr(feature = "use_serde", serde(alias = "lrs"))]
32    pub lsr: u32,
33    pub replicas: Vec<ReplicaStatus>,
34    #[cfg_attr(
35        feature = "use_serde",
36        serde(default = "default_partition_status_size")
37    )]
38    #[fluvio(min_version = 5)]
39    pub size: i64,
40    pub is_being_deleted: bool,
41    #[cfg_attr(feature = "use_serde", serde(default))]
42    #[fluvio(min_version = 16)]
43    pub base_offset: i64,
44}
45
46impl Default for PartitionStatus {
47    fn default() -> Self {
48        Self {
49            size: PartitionStatus::SIZE_NOT_SUPPORTED,
50            resolution: Default::default(),
51            leader: Default::default(),
52            lsr: Default::default(),
53            replicas: Default::default(),
54            is_being_deleted: Default::default(),
55            base_offset: Default::default(),
56        }
57    }
58}
59
/// Serde fallback for [`PartitionStatus::size`] when the field is absent from
/// the input: yields [`PartitionStatus::SIZE_NOT_SUPPORTED`].
#[cfg(feature = "use_serde")]
const fn default_partition_status_size() -> i64 {
    PartitionStatus::SIZE_NOT_SUPPORTED
}
64
65impl fmt::Display for PartitionStatus {
66    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67        write!(f, "{:#?} Leader: {} [", self.resolution, self.leader)?;
68        for replica in &self.replicas {
69            write!(f, "{replica},")?;
70        }
71        write!(f, "]")
72    }
73}
74
// -----------------------------------
// Implementation
// -----------------------------------
78
79impl PartitionStatus {
80    pub const SIZE_ERROR: i64 = -1;
81    pub const SIZE_NOT_SUPPORTED: i64 = -2;
82
83    pub fn leader(leader: impl Into<ReplicaStatus>) -> Self {
84        Self::new(leader.into(), vec![])
85    }
86
87    pub fn new(leader: impl Into<ReplicaStatus>, replicas: Vec<ReplicaStatus>) -> Self {
88        Self {
89            resolution: PartitionResolution::default(),
90            leader: leader.into(),
91            replicas,
92            ..Default::default()
93        }
94    }
95
96    pub fn new2(
97        leader: impl Into<ReplicaStatus>,
98        replicas: Vec<ReplicaStatus>,
99        size: i64,
100        resolution: PartitionResolution,
101        base_offset: i64,
102    ) -> Self {
103        Self {
104            resolution,
105            leader: leader.into(),
106            replicas,
107            size,
108            base_offset,
109            ..Default::default()
110        }
111    }
112
113    pub fn is_online(&self) -> bool {
114        self.resolution == PartitionResolution::Online
115    }
116
117    pub fn is_offline(&self) -> bool {
118        self.resolution != PartitionResolution::Online
119    }
120
121    #[deprecated = "Replaced by lrs()"]
122    pub fn lsr(&self) -> u32 {
123        self.lsr
124    }
125
126    pub fn lrs(&self) -> u32 {
127        self.lsr
128    }
129
130    pub fn replica_iter(&self) -> Iter<ReplicaStatus> {
131        self.replicas.iter()
132    }
133
134    pub fn live_replicas(&self) -> Vec<SpuId> {
135        self.replicas.iter().map(|lrs| lrs.spu).collect()
136    }
137
138    pub fn offline_replicas(&self) -> Vec<i32> {
139        vec![]
140    }
141
142    pub fn has_live_replicas(&self) -> bool {
143        !self.replicas.is_empty()
144    }
145
146    /// set to being deleted
147    pub fn set_to_delete(mut self) -> Self {
148        self.is_being_deleted = true;
149        self
150    }
151}
152
153#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
154#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
155pub enum PartitionResolution {
156    #[default]
157    #[fluvio(tag = 0)]
158    Offline, // No leader available for serving partition
159    #[fluvio(tag = 1)]
160    Online, // Partition is running normally, status contains replica info
161    #[fluvio(tag = 2)]
162    LeaderOffline, // Election has failed, no suitable leader has been found
163    #[fluvio(tag = 3)]
164    ElectionLeaderFound, // New leader has been selected
165}
166
167#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
168#[cfg_attr(
169    feature = "use_serde",
170    derive(serde::Serialize, serde::Deserialize),
171    serde(rename_all = "camelCase")
172)]
173pub struct ReplicaStatus {
174    pub spu: SpuId,
175    pub hw: i64,
176    pub leo: i64,
177}
178
179impl fmt::Display for ReplicaStatus {
180    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
181        write!(f, "spu:{} hw:{} leo: {}", self.spu, self.hw, self.leo)
182    }
183}
184
185impl Default for ReplicaStatus {
186    fn default() -> Self {
187        ReplicaStatus {
188            spu: -1,
189            hw: -1,
190            leo: -1,
191        }
192    }
193}
194
195impl ReplicaStatus {
196    pub fn new(spu: SpuId, hw: Offset, leo: Offset) -> Self {
197        Self { spu, hw, leo }
198    }
199
200    /// compute lag score respect to leader
201    pub fn leader_lag(&self, leader_status: &Self) -> i64 {
202        leader_status.leo - self.leo
203    }
204
205    pub fn high_watermark_lag(&self, leader_status: &Self) -> i64 {
206        leader_status.hw - self.hw
207    }
208}
209
210impl From<(SpuId, Offset, Offset)> for ReplicaStatus {
211    fn from(val: (SpuId, Offset, Offset)) -> Self {
212        let (id, high_watermark, end_offset) = val;
213        Self::new(id, high_watermark, end_offset)
214    }
215}