fluvio_controlplane_metadata/partition/
spec.rs

1#![allow(clippy::assign_op_pattern)]
2
3use fluvio_types::SpuId;
4use fluvio_protocol::{link::ErrorCode, Decoder, Encoder};
5
6use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};
7
8/// Spec for Partition
9/// Each partition has replicas spread among SPU
10/// one of replica is leader which is duplicated in the leader field
11#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
12#[cfg_attr(
13    feature = "use_serde",
14    derive(serde::Serialize, serde::Deserialize),
15    serde(rename_all = "camelCase")
16)]
17pub struct PartitionSpec {
18    pub leader: SpuId,
19    #[cfg_attr(feature = "use_serde", serde(default))]
20    pub replicas: Vec<SpuId>,
21    #[fluvio(min_version = 4)]
22    pub cleanup_policy: Option<CleanupPolicy>,
23    #[fluvio(min_version = 4)]
24    pub storage: Option<TopicStorageConfig>,
25    #[cfg_attr(feature = "use_serde", serde(default))]
26    #[fluvio(min_version = 6)]
27    pub compression_type: CompressionAlgorithm,
28    #[fluvio(min_version = 12)]
29    pub deduplication: Option<Deduplication>,
30    #[cfg_attr(feature = "use_serde", serde(default))]
31    #[fluvio(min_version = 13)]
32    pub system: bool,
33    #[cfg_attr(feature = "use_serde", serde(default))]
34    #[fluvio(min_version = 14)]
35    pub mirror: Option<PartitionMirrorConfig>,
36}
37
38impl PartitionSpec {
39    pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
40        Self {
41            leader,
42            replicas,
43            ..Default::default()
44        }
45    }
46
47    /// Create new partition spec from replica mapping with topic spec. This assume first replica is leader
48    pub fn from_replicas(
49        replicas: Vec<SpuId>,
50        topic: &TopicSpec,
51        mirror: Option<&PartitionMirrorConfig>,
52    ) -> Self {
53        let leader = if replicas.is_empty() { 0 } else { replicas[0] };
54
55        Self {
56            leader,
57            replicas,
58            mirror: mirror.cloned(),
59            cleanup_policy: topic.get_clean_policy().cloned(),
60            storage: topic.get_storage().cloned(),
61            compression_type: topic.get_compression_type().clone(),
62            deduplication: topic.get_deduplication().cloned(),
63            system: topic.is_system(),
64        }
65    }
66
67    pub fn has_spu(&self, spu: &SpuId) -> bool {
68        self.replicas.contains(spu)
69    }
70
71    /// follower replicas
72    pub fn followers(&self) -> Vec<SpuId> {
73        self.replicas
74            .iter()
75            .filter_map(|r| if r == &self.leader { None } else { Some(*r) })
76            .collect()
77    }
78
79    pub fn mirror_string(&self) -> String {
80        if let Some(mirror) = &self.mirror {
81            let external = mirror.external_cluster();
82            match mirror {
83                PartitionMirrorConfig::Remote(remote) => {
84                    if remote.target {
85                        format!("{}(from-home)", external)
86                    } else {
87                        format!("{}(to-home)", external)
88                    }
89                }
90
91                PartitionMirrorConfig::Home(home) => {
92                    if home.source {
93                        format!("{}(to-remote)", external)
94                    } else {
95                        format!("{}(from-remote)", external)
96                    }
97                }
98            }
99        } else {
100            "".to_owned()
101        }
102    }
103}
104
105impl From<Vec<SpuId>> for PartitionSpec {
106    fn from(replicas: Vec<SpuId>) -> Self {
107        if !replicas.is_empty() {
108            Self::new(replicas[0], replicas)
109        } else {
110            Self::new(0, replicas)
111        }
112    }
113}
114
115/// Setting applied to a replica
116#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
117pub struct PartitionConfig {
118    pub retention_time_seconds: Option<u32>,
119}
120
121#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
122#[cfg_attr(
123    feature = "use_serde",
124    derive(serde::Serialize, serde::Deserialize),
125    serde(rename_all = "camelCase")
126)]
127pub enum PartitionMirrorConfig {
128    #[fluvio(tag = 0)]
129    Remote(RemotePartitionConfig),
130    #[fluvio(tag = 1)]
131    Home(HomePartitionConfig),
132}
133
134impl Default for PartitionMirrorConfig {
135    fn default() -> Self {
136        Self::Remote(RemotePartitionConfig::default())
137    }
138}
139
140impl PartitionMirrorConfig {
141    pub fn remote(&self) -> Option<&RemotePartitionConfig> {
142        match self {
143            Self::Remote(e) => Some(e),
144            _ => None,
145        }
146    }
147
148    pub fn home(&self) -> Option<&HomePartitionConfig> {
149        match self {
150            Self::Home(c) => Some(c),
151            _ => None,
152        }
153    }
154
155    pub fn external_cluster(&self) -> String {
156        match self {
157            Self::Remote(r) => format!(
158                "{}:{}:{}",
159                r.home_cluster, r.home_spu_id, r.home_spu_endpoint
160            ),
161            Self::Home(h) => h.remote_cluster.clone(),
162        }
163    }
164
165    #[deprecated(since = "0.29.1")]
166    pub fn is_home_mirror(&self) -> bool {
167        matches!(self, Self::Home(_))
168    }
169
170    /// check whether this mirror should accept traffic
171    pub fn accept_traffic(&self) -> Option<ErrorCode> {
172        match self {
173            Self::Remote(r) => {
174                if r.target {
175                    Some(ErrorCode::MirrorProduceFromRemoteNotAllowed)
176                } else {
177                    None
178                }
179            }
180            Self::Home(h) => {
181                if h.source {
182                    None
183                } else {
184                    Some(ErrorCode::MirrorProduceFromHome)
185                }
186            }
187        }
188    }
189}
190
191impl std::fmt::Display for PartitionMirrorConfig {
192    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
193        match self {
194            PartitionMirrorConfig::Remote(cfg) => write!(f, "{}", cfg),
195            PartitionMirrorConfig::Home(cfg) => write!(f, "{}", cfg),
196        }
197    }
198}
199
200#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
201#[cfg_attr(
202    feature = "use_serde",
203    derive(serde::Serialize, serde::Deserialize),
204    serde(rename_all = "camelCase")
205)]
206pub struct HomePartitionConfig {
207    pub remote_cluster: String,
208    pub remote_replica: String,
209    // if this is set, home will be mirror instead of
210    #[cfg_attr(
211        feature = "use_serde",
212        serde(default, skip_serializing_if = "crate::is_false")
213    )]
214    #[fluvio(min_version = 18)]
215    pub source: bool,
216}
217
218impl std::fmt::Display for HomePartitionConfig {
219    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
220        write!(f, "{}", self.remote_cluster)
221    }
222}
223
224#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
225#[cfg_attr(
226    feature = "use_serde",
227    derive(serde::Serialize, serde::Deserialize),
228    serde(rename_all = "camelCase")
229)]
230pub struct RemotePartitionConfig {
231    pub home_cluster: String,
232    pub home_spu_key: String,
233    #[cfg_attr(feature = "use_serde", serde(default))]
234    pub home_spu_id: SpuId,
235    pub home_spu_endpoint: String,
236    #[cfg_attr(
237        feature = "use_serde",
238        serde(default, skip_serializing_if = "crate::is_false")
239    )]
240    #[fluvio(min_version = 18)]
241    pub target: bool,
242}
243
244impl std::fmt::Display for RemotePartitionConfig {
245    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
246        write!(
247            f,
248            "{}:{}:{}",
249            self.home_cluster, self.home_spu_id, self.home_spu_endpoint
250        )
251    }
252}