fluvio_controlplane_metadata/topic/
status.rs

1#![allow(clippy::assign_op_pattern)]
2
3//!
4//! # Topic Status
5//!
6//! Topic Status metadata information cached locally.
7//!
8use std::collections::BTreeMap;
9use std::fmt;
10
11use fluvio_protocol::{Encoder, Decoder};
12use fluvio_types::{ReplicaMap, SpuId, PartitionId};
13
14use crate::partition::PartitionMirrorConfig;
15
16pub type MirrorMap = BTreeMap<PartitionId, PartitionMirrorConfig>;
17
18// -----------------------------------
19// Data Structures
20// -----------------------------------
21
22#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
23#[cfg_attr(
24    feature = "use_serde",
25    derive(serde::Serialize, serde::Deserialize),
26    serde(rename_all = "camelCase")
27)]
28pub struct TopicStatus {
29    pub resolution: TopicResolution,
30    pub replica_map: ReplicaMap,
31    #[cfg_attr(feature = "use_serde", serde(default))]
32    #[fluvio(min_version = 14)]
33    pub mirror_map: MirrorMap,
34    pub reason: String,
35}
36
37impl fmt::Display for TopicStatus {
38    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39        write!(f, "{:#?}", self.resolution)
40    }
41}
42
43#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
44#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
45pub enum TopicResolution {
46    #[default]
47    #[fluvio(tag = 0)]
48    Init, // Initializing this is starting state.
49    #[fluvio(tag = 1)]
50    Pending, // Has valid config, ready for replica mapping assignment
51    #[fluvio(tag = 2)]
52    InsufficientResources, // Replica map cannot be created due to lack of capacity
53    #[fluvio(tag = 3)]
54    InvalidConfig, // Invalid configuration
55    #[fluvio(tag = 4)]
56    Provisioned, // All partitions has been provisioned
57    #[fluvio(tag = 5)]
58    Deleting, // Process of being deleted
59}
60
61impl TopicResolution {
62    pub fn resolution_label(&self) -> &'static str {
63        match self {
64            TopicResolution::Provisioned => "provisioned",
65            TopicResolution::Init => "initializing",
66            TopicResolution::Pending => "pending",
67            TopicResolution::InsufficientResources => "insufficient-resources",
68            TopicResolution::InvalidConfig => "invalid-config",
69            TopicResolution::Deleting => "Deleting",
70        }
71    }
72
73    pub fn is_invalid(&self) -> bool {
74        matches!(self, Self::InvalidConfig)
75    }
76
77    pub fn no_resource(&self) -> bool {
78        matches!(self, Self::InsufficientResources)
79    }
80
81    pub fn is_being_deleted(&self) -> bool {
82        matches!(self, Self::Deleting)
83    }
84}
85
86impl std::fmt::Display for TopicResolution {
87    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
88        write!(f, "resolution::{}", self.resolution_label())
89    }
90}
91
92// -----------------------------------
93// Encoder - from KV Topic Status
94// -----------------------------------
95
96/*
97// -----------------------------------
98// Encoder/Decoder - Internal API
99// -----------------------------------
100
101impl From<&TopicResolution> for u8 {
102    fn from(spec: &TopicResolution) -> Self {
103        match spec {
104            TopicResolution::Init => 0 as u8,
105            TopicResolution::Pending => 1 as u8,
106            TopicResolution::NoResourceForReplicaMap => 2 as u8,
107            TopicResolution::InvalidConfig => 3 as u8,
108            TopicResolution::Provisioned => 4 as u8,
109        }
110    }
111}
112*/
113
114// -----------------------------------
115// Default
116// -----------------------------------
117
118impl ::std::default::Default for TopicStatus {
119    fn default() -> Self {
120        TopicStatus {
121            resolution: TopicResolution::Init,
122            replica_map: BTreeMap::new(),
123            reason: "".to_owned(),
124            mirror_map: BTreeMap::new(),
125        }
126    }
127}
128
129// -----------------------------------
130// Implementation
131// -----------------------------------
132
133fn create_replica_map(rows: Vec<Vec<SpuId>>) -> ReplicaMap {
134    let mut map: ReplicaMap = BTreeMap::new();
135    for (idx, row) in rows.iter().enumerate() {
136        map.insert(idx as PartitionId, row.clone());
137    }
138    map
139}
140
141impl TopicStatus {
142    pub fn new(
143        resolution: TopicResolution,
144        replica_map: Vec<Vec<SpuId>>,
145        reason: impl Into<String>,
146    ) -> Self {
147        TopicStatus {
148            resolution,
149            replica_map: create_replica_map(replica_map),
150            reason: reason.into(),
151            mirror_map: BTreeMap::new(),
152        }
153    }
154
155    pub fn resolution(&self) -> &TopicResolution {
156        &self.resolution
157    }
158
159    pub fn replica_map_cnt(&self) -> i32 {
160        self.replica_map.len() as i32
161    }
162
163    pub fn set_replica_map(&mut self, replica_map: ReplicaMap) {
164        self.replica_map = replica_map;
165    }
166
167    pub fn set_mirror_map(&mut self, mirror_map: MirrorMap) {
168        self.mirror_map = mirror_map;
169    }
170
171    pub fn spus_in_replica(&self) -> Vec<SpuId> {
172        let mut spu_list: Vec<SpuId> = vec![];
173
174        for (_, replicas) in self.replica_map.iter() {
175            for spu in replicas {
176                if !spu_list.contains(spu) {
177                    spu_list.push(*spu);
178                }
179            }
180        }
181
182        spu_list
183    }
184
185    pub fn replica_map_str(&self) -> String {
186        format!("{:?}", self.replica_map)
187    }
188
189    pub fn replica_map_cnt_str(&self) -> String {
190        let map_rows = self.replica_map_cnt();
191        if map_rows > 0 {
192            format!("{map_rows}")
193        } else {
194            "-".to_owned()
195        }
196    }
197
198    pub fn reason_str(&self) -> &String {
199        &self.reason
200    }
201
202    // -----------------------------------
203    // State Machine
204    // -----------------------------------
205
206    pub fn is_resolution_initializing(&self) -> bool {
207        self.resolution == TopicResolution::Init
208    }
209
210    /// need to update the replic map
211    pub fn need_replica_map_recal(&self) -> bool {
212        self.resolution == TopicResolution::Pending
213            || self.resolution == TopicResolution::InsufficientResources
214    }
215
216    pub fn is_resolution_pending(&self) -> bool {
217        self.resolution == TopicResolution::Pending
218    }
219
220    pub fn is_resolution_transient(&self) -> bool {
221        self.resolution == TopicResolution::Init || self.resolution == TopicResolution::Pending
222    }
223
224    pub fn is_resolution_provisioned(&self) -> bool {
225        self.resolution == TopicResolution::Provisioned
226    }
227
228    /// set to pending mode which means it is waiting for spu resources to be allocated
229    pub fn next_resolution_pending() -> (TopicResolution, String) {
230        (TopicResolution::Pending, super::PENDING_REASON.to_owned())
231    }
232
233    pub fn next_resolution_invalid_config(reason: impl Into<String>) -> (TopicResolution, String) {
234        (TopicResolution::InvalidConfig, reason.into())
235    }
236
237    pub fn set_next_resolution(&mut self, next: (TopicResolution, String)) {
238        let (resolution, reason) = next;
239        self.resolution = resolution;
240        self.reason = reason;
241    }
242}