fluvio_controlplane_metadata/topic/
status.rs1#![allow(clippy::assign_op_pattern)]
2
3use std::collections::BTreeMap;
9use std::fmt;
10
11use fluvio_protocol::{Encoder, Decoder};
12use fluvio_types::{ReplicaMap, SpuId, PartitionId};
13
14use crate::partition::PartitionMirrorConfig;
15
16pub type MirrorMap = BTreeMap<PartitionId, PartitionMirrorConfig>;
17
18#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
23#[cfg_attr(
24 feature = "use_serde",
25 derive(serde::Serialize, serde::Deserialize),
26 serde(rename_all = "camelCase")
27)]
28pub struct TopicStatus {
29 pub resolution: TopicResolution,
30 pub replica_map: ReplicaMap,
31 #[cfg_attr(feature = "use_serde", serde(default))]
32 #[fluvio(min_version = 14)]
33 pub mirror_map: MirrorMap,
34 pub reason: String,
35}
36
37impl fmt::Display for TopicStatus {
38 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39 write!(f, "{:#?}", self.resolution)
40 }
41}
42
43#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
44#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
45pub enum TopicResolution {
46 #[default]
47 #[fluvio(tag = 0)]
48 Init, #[fluvio(tag = 1)]
50 Pending, #[fluvio(tag = 2)]
52 InsufficientResources, #[fluvio(tag = 3)]
54 InvalidConfig, #[fluvio(tag = 4)]
56 Provisioned, #[fluvio(tag = 5)]
58 Deleting, }
60
61impl TopicResolution {
62 pub fn resolution_label(&self) -> &'static str {
63 match self {
64 TopicResolution::Provisioned => "provisioned",
65 TopicResolution::Init => "initializing",
66 TopicResolution::Pending => "pending",
67 TopicResolution::InsufficientResources => "insufficient-resources",
68 TopicResolution::InvalidConfig => "invalid-config",
69 TopicResolution::Deleting => "Deleting",
70 }
71 }
72
73 pub fn is_invalid(&self) -> bool {
74 matches!(self, Self::InvalidConfig)
75 }
76
77 pub fn no_resource(&self) -> bool {
78 matches!(self, Self::InsufficientResources)
79 }
80
81 pub fn is_being_deleted(&self) -> bool {
82 matches!(self, Self::Deleting)
83 }
84}
85
86impl std::fmt::Display for TopicResolution {
87 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
88 write!(f, "resolution::{}", self.resolution_label())
89 }
90}
91
92impl ::std::default::Default for TopicStatus {
119 fn default() -> Self {
120 TopicStatus {
121 resolution: TopicResolution::Init,
122 replica_map: BTreeMap::new(),
123 reason: "".to_owned(),
124 mirror_map: BTreeMap::new(),
125 }
126 }
127}
128
129fn create_replica_map(rows: Vec<Vec<SpuId>>) -> ReplicaMap {
134 let mut map: ReplicaMap = BTreeMap::new();
135 for (idx, row) in rows.iter().enumerate() {
136 map.insert(idx as PartitionId, row.clone());
137 }
138 map
139}
140
141impl TopicStatus {
142 pub fn new(
143 resolution: TopicResolution,
144 replica_map: Vec<Vec<SpuId>>,
145 reason: impl Into<String>,
146 ) -> Self {
147 TopicStatus {
148 resolution,
149 replica_map: create_replica_map(replica_map),
150 reason: reason.into(),
151 mirror_map: BTreeMap::new(),
152 }
153 }
154
155 pub fn resolution(&self) -> &TopicResolution {
156 &self.resolution
157 }
158
159 pub fn replica_map_cnt(&self) -> i32 {
160 self.replica_map.len() as i32
161 }
162
163 pub fn set_replica_map(&mut self, replica_map: ReplicaMap) {
164 self.replica_map = replica_map;
165 }
166
167 pub fn set_mirror_map(&mut self, mirror_map: MirrorMap) {
168 self.mirror_map = mirror_map;
169 }
170
171 pub fn spus_in_replica(&self) -> Vec<SpuId> {
172 let mut spu_list: Vec<SpuId> = vec![];
173
174 for (_, replicas) in self.replica_map.iter() {
175 for spu in replicas {
176 if !spu_list.contains(spu) {
177 spu_list.push(*spu);
178 }
179 }
180 }
181
182 spu_list
183 }
184
185 pub fn replica_map_str(&self) -> String {
186 format!("{:?}", self.replica_map)
187 }
188
189 pub fn replica_map_cnt_str(&self) -> String {
190 let map_rows = self.replica_map_cnt();
191 if map_rows > 0 {
192 format!("{map_rows}")
193 } else {
194 "-".to_owned()
195 }
196 }
197
198 pub fn reason_str(&self) -> &String {
199 &self.reason
200 }
201
202 pub fn is_resolution_initializing(&self) -> bool {
207 self.resolution == TopicResolution::Init
208 }
209
210 pub fn need_replica_map_recal(&self) -> bool {
212 self.resolution == TopicResolution::Pending
213 || self.resolution == TopicResolution::InsufficientResources
214 }
215
216 pub fn is_resolution_pending(&self) -> bool {
217 self.resolution == TopicResolution::Pending
218 }
219
220 pub fn is_resolution_transient(&self) -> bool {
221 self.resolution == TopicResolution::Init || self.resolution == TopicResolution::Pending
222 }
223
224 pub fn is_resolution_provisioned(&self) -> bool {
225 self.resolution == TopicResolution::Provisioned
226 }
227
228 pub fn next_resolution_pending() -> (TopicResolution, String) {
230 (TopicResolution::Pending, super::PENDING_REASON.to_owned())
231 }
232
233 pub fn next_resolution_invalid_config(reason: impl Into<String>) -> (TopicResolution, String) {
234 (TopicResolution::InvalidConfig, reason.into())
235 }
236
237 pub fn set_next_resolution(&mut self, next: (TopicResolution, String)) {
238 let (resolution, reason) = next;
239 self.resolution = resolution;
240 self.reason = reason;
241 }
242}