fluvio_controlplane_metadata/partition/
spec.rs

1#![allow(clippy::assign_op_pattern)]
2
3use fluvio_types::SpuId;
4use fluvio_protocol::{link::ErrorCode, Decoder, Encoder};
5
6use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};
7
/// Spec for a partition.
///
/// Each partition has replicas spread among SPUs. One of the replicas is the
/// leader, and its id is duplicated in the `leader` field.
///
/// NOTE(review): the `#[fluvio(min_version = N)]` attributes presumably gate a
/// field on protocol version N in the derived `Encoder`/`Decoder`, and field
/// order presumably matters for that encoding; confirm against the
/// `fluvio_protocol` derive documentation before reordering or renumbering.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct PartitionSpec {
    /// SPU id of the leader replica.
    pub leader: SpuId,
    /// SPU ids of all replicas. `from_replicas` and `From<Vec<SpuId>>` treat
    /// the first entry as the leader.
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub replicas: Vec<SpuId>,
    /// Cleanup policy; `from_replicas` copies it from the topic spec.
    #[fluvio(min_version = 4)]
    pub cleanup_policy: Option<CleanupPolicy>,
    /// Storage configuration; `from_replicas` copies it from the topic spec.
    #[fluvio(min_version = 4)]
    pub storage: Option<TopicStorageConfig>,
    /// Compression algorithm; `from_replicas` copies it from the topic spec.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 6)]
    pub compression_type: CompressionAlgorithm,
    /// Deduplication settings; `from_replicas` copies them from the topic spec.
    #[fluvio(min_version = 12)]
    pub deduplication: Option<Deduplication>,
    /// System flag; `from_replicas` sets it from `TopicSpec::is_system()`.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 13)]
    pub system: bool,
    /// Mirror configuration for this partition, if any. `mirror_string`
    /// renders it and yields an empty string when this is `None`.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 14)]
    pub mirror: Option<PartitionMirrorConfig>,
}
37
38impl PartitionSpec {
39    pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
40        Self {
41            leader,
42            replicas,
43            ..Default::default()
44        }
45    }
46
47    /// Create new partition spec from replica mapping with topic spec. This assume first replica is leader
48    pub fn from_replicas(
49        replicas: Vec<SpuId>,
50        topic: &TopicSpec,
51        mirror: Option<&PartitionMirrorConfig>,
52    ) -> Self {
53        let leader = if replicas.is_empty() { 0 } else { replicas[0] };
54
55        Self {
56            leader,
57            replicas,
58            mirror: mirror.cloned(),
59            cleanup_policy: topic.get_clean_policy().cloned(),
60            storage: topic.get_storage().cloned(),
61            compression_type: topic.get_compression_type().clone(),
62            deduplication: topic.get_deduplication().cloned(),
63            system: topic.is_system(),
64        }
65    }
66
67    pub fn has_spu(&self, spu: &SpuId) -> bool {
68        self.replicas.contains(spu)
69    }
70
71    /// follower replicas
72    pub fn followers(&self) -> Vec<SpuId> {
73        self.replicas
74            .iter()
75            .filter_map(|r| if r == &self.leader { None } else { Some(*r) })
76            .collect()
77    }
78
79    pub fn mirror_string(&self) -> String {
80        if let Some(mirror) = &self.mirror {
81            let external = mirror.external_cluster();
82            match mirror {
83                PartitionMirrorConfig::Remote(remote) => {
84                    if remote.target {
85                        format!("{external}(from-home)")
86                    } else {
87                        format!("{external}(to-home)")
88                    }
89                }
90
91                PartitionMirrorConfig::Home(home) => {
92                    if home.source {
93                        format!("{external}(to-remote)")
94                    } else {
95                        format!("{external}(from-remote)")
96                    }
97                }
98            }
99        } else {
100            "".to_owned()
101        }
102    }
103}
104
105impl From<Vec<SpuId>> for PartitionSpec {
106    fn from(replicas: Vec<SpuId>) -> Self {
107        if !replicas.is_empty() {
108            Self::new(replicas[0], replicas)
109        } else {
110            Self::new(0, replicas)
111        }
112    }
113}
114
/// Settings applied to a replica.
#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
pub struct PartitionConfig {
    /// Optional retention time, in seconds (per the field name).
    /// NOTE(review): what `None` means is decided by consumers of this type
    /// and is not visible here — confirm before relying on a default.
    pub retention_time_seconds: Option<u32>,
}
120
// Mirror configuration attached to a partition: either the remote-side
// (`Remote`) or the home-side (`Home`) settings. `Default` yields
// `Remote(RemotePartitionConfig::default())`.
//
// NOTE(review): the `#[fluvio(tag = N)]` values presumably act as the variant
// discriminants in the derived encoding — confirm before renumbering.
#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema),
    serde(rename_all = "camelCase")
)]
pub enum PartitionMirrorConfig {
    // Remote-side settings; the payload describes the home cluster and SPU.
    #[fluvio(tag = 0)]
    Remote(RemotePartitionConfig),
    // Home-side settings; the payload describes the remote cluster and replica.
    #[fluvio(tag = 1)]
    Home(HomePartitionConfig),
}
134
135impl Default for PartitionMirrorConfig {
136    fn default() -> Self {
137        Self::Remote(RemotePartitionConfig::default())
138    }
139}
140
141impl PartitionMirrorConfig {
142    pub fn remote(&self) -> Option<&RemotePartitionConfig> {
143        match self {
144            Self::Remote(e) => Some(e),
145            _ => None,
146        }
147    }
148
149    pub fn home(&self) -> Option<&HomePartitionConfig> {
150        match self {
151            Self::Home(c) => Some(c),
152            _ => None,
153        }
154    }
155
156    pub fn external_cluster(&self) -> String {
157        match self {
158            Self::Remote(r) => format!(
159                "{}:{}:{}",
160                r.home_cluster, r.home_spu_id, r.home_spu_endpoint
161            ),
162            Self::Home(h) => h.remote_cluster.clone(),
163        }
164    }
165
166    #[deprecated(since = "0.29.1")]
167    pub fn is_home_mirror(&self) -> bool {
168        matches!(self, Self::Home(_))
169    }
170
171    /// check whether this mirror should accept traffic
172    pub fn accept_traffic(&self) -> Option<ErrorCode> {
173        match self {
174            Self::Remote(r) => {
175                if r.target {
176                    Some(ErrorCode::MirrorProduceFromRemoteNotAllowed)
177                } else {
178                    None
179                }
180            }
181            Self::Home(h) => {
182                if h.source {
183                    None
184                } else {
185                    Some(ErrorCode::MirrorProduceFromHome)
186                }
187            }
188        }
189    }
190}
191
192impl std::fmt::Display for PartitionMirrorConfig {
193    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
194        match self {
195            PartitionMirrorConfig::Remote(cfg) => write!(f, "{cfg}"),
196            PartitionMirrorConfig::Home(cfg) => write!(f, "{cfg}"),
197        }
198    }
199}
200
// Home-side mirror settings, carried by `PartitionMirrorConfig::Home`.
// `Display` prints only `remote_cluster`.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema),
    serde(rename_all = "camelCase")
)]
pub struct HomePartitionConfig {
    // Name of the remote cluster; this is what `external_cluster()` returns
    // for a home mirror.
    pub remote_cluster: String,
    // Replica on the remote cluster.
    // NOTE(review): presumably a replica key string — confirm format with callers.
    pub remote_replica: String,
    // If this is set, home is the source of the mirror instead of the target:
    // `mirror_string` reports "to-remote" and `accept_traffic` allows traffic.
    // When unset, `accept_traffic` returns `ErrorCode::MirrorProduceFromHome`.
    // With serde, the field defaults to false and is omitted when
    // `crate::is_false` says so.
    #[cfg_attr(
        feature = "use_serde",
        serde(default, skip_serializing_if = "crate::is_false")
    )]
    #[fluvio(min_version = 18)]
    pub source: bool,
}
219
220impl std::fmt::Display for HomePartitionConfig {
221    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
222        write!(f, "{}", self.remote_cluster)
223    }
224}
225
// Remote-side mirror settings, carried by `PartitionMirrorConfig::Remote`.
// `Display` prints `home_cluster:home_spu_id:home_spu_endpoint`.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema),
    serde(rename_all = "camelCase")
)]
pub struct RemotePartitionConfig {
    // Name of the home cluster.
    pub home_cluster: String,
    // Key of the home SPU.
    // NOTE(review): exact meaning/format is not visible here — confirm with callers.
    pub home_spu_key: String,
    // Id of the home SPU; falls back to the type's default under serde.
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub home_spu_id: SpuId,
    // Endpoint of the home SPU, printed verbatim by `Display`.
    pub home_spu_endpoint: String,
    // If this is set, the remote is the target of the mirror:
    // `PartitionSpec::mirror_string` reports "from-home" and `accept_traffic`
    // returns `ErrorCode::MirrorProduceFromRemoteNotAllowed`. When unset,
    // `mirror_string` reports "to-home" and traffic is accepted.
    // With serde, the field defaults to false and is omitted when
    // `crate::is_false` says so.
    #[cfg_attr(
        feature = "use_serde",
        serde(default, skip_serializing_if = "crate::is_false")
    )]
    #[fluvio(min_version = 18)]
    pub target: bool,
}
246
247impl std::fmt::Display for RemotePartitionConfig {
248    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
249        write!(
250            f,
251            "{}:{}:{}",
252            self.home_cluster, self.home_spu_id, self.home_spu_endpoint
253        )
254    }
255}