fluvio_controlplane_metadata/partition/
spec.rs1#![allow(clippy::assign_op_pattern)]
2
3use fluvio_types::SpuId;
4use fluvio_protocol::{link::ErrorCode, Decoder, Encoder};
5
6use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};
7
8#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
12#[cfg_attr(
13 feature = "use_serde",
14 derive(serde::Serialize, serde::Deserialize),
15 serde(rename_all = "camelCase")
16)]
17pub struct PartitionSpec {
18 pub leader: SpuId,
19 #[cfg_attr(feature = "use_serde", serde(default))]
20 pub replicas: Vec<SpuId>,
21 #[fluvio(min_version = 4)]
22 pub cleanup_policy: Option<CleanupPolicy>,
23 #[fluvio(min_version = 4)]
24 pub storage: Option<TopicStorageConfig>,
25 #[cfg_attr(feature = "use_serde", serde(default))]
26 #[fluvio(min_version = 6)]
27 pub compression_type: CompressionAlgorithm,
28 #[fluvio(min_version = 12)]
29 pub deduplication: Option<Deduplication>,
30 #[cfg_attr(feature = "use_serde", serde(default))]
31 #[fluvio(min_version = 13)]
32 pub system: bool,
33 #[cfg_attr(feature = "use_serde", serde(default))]
34 #[fluvio(min_version = 14)]
35 pub mirror: Option<PartitionMirrorConfig>,
36}
37
38impl PartitionSpec {
39 pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
40 Self {
41 leader,
42 replicas,
43 ..Default::default()
44 }
45 }
46
47 pub fn from_replicas(
49 replicas: Vec<SpuId>,
50 topic: &TopicSpec,
51 mirror: Option<&PartitionMirrorConfig>,
52 ) -> Self {
53 let leader = if replicas.is_empty() { 0 } else { replicas[0] };
54
55 Self {
56 leader,
57 replicas,
58 mirror: mirror.cloned(),
59 cleanup_policy: topic.get_clean_policy().cloned(),
60 storage: topic.get_storage().cloned(),
61 compression_type: topic.get_compression_type().clone(),
62 deduplication: topic.get_deduplication().cloned(),
63 system: topic.is_system(),
64 }
65 }
66
67 pub fn has_spu(&self, spu: &SpuId) -> bool {
68 self.replicas.contains(spu)
69 }
70
71 pub fn followers(&self) -> Vec<SpuId> {
73 self.replicas
74 .iter()
75 .filter_map(|r| if r == &self.leader { None } else { Some(*r) })
76 .collect()
77 }
78
79 pub fn mirror_string(&self) -> String {
80 if let Some(mirror) = &self.mirror {
81 let external = mirror.external_cluster();
82 match mirror {
83 PartitionMirrorConfig::Remote(remote) => {
84 if remote.target {
85 format!("{external}(from-home)")
86 } else {
87 format!("{external}(to-home)")
88 }
89 }
90
91 PartitionMirrorConfig::Home(home) => {
92 if home.source {
93 format!("{external}(to-remote)")
94 } else {
95 format!("{external}(from-remote)")
96 }
97 }
98 }
99 } else {
100 "".to_owned()
101 }
102 }
103}
104
105impl From<Vec<SpuId>> for PartitionSpec {
106 fn from(replicas: Vec<SpuId>) -> Self {
107 if !replicas.is_empty() {
108 Self::new(replicas[0], replicas)
109 } else {
110 Self::new(0, replicas)
111 }
112 }
113}
114
115#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
117pub struct PartitionConfig {
118 pub retention_time_seconds: Option<u32>,
119}
120
121#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
122#[cfg_attr(
123 feature = "use_serde",
124 derive(serde::Serialize, serde::Deserialize),
125 derive(schemars::JsonSchema),
126 serde(rename_all = "camelCase")
127)]
128pub enum PartitionMirrorConfig {
129 #[fluvio(tag = 0)]
130 Remote(RemotePartitionConfig),
131 #[fluvio(tag = 1)]
132 Home(HomePartitionConfig),
133}
134
135impl Default for PartitionMirrorConfig {
136 fn default() -> Self {
137 Self::Remote(RemotePartitionConfig::default())
138 }
139}
140
141impl PartitionMirrorConfig {
142 pub fn remote(&self) -> Option<&RemotePartitionConfig> {
143 match self {
144 Self::Remote(e) => Some(e),
145 _ => None,
146 }
147 }
148
149 pub fn home(&self) -> Option<&HomePartitionConfig> {
150 match self {
151 Self::Home(c) => Some(c),
152 _ => None,
153 }
154 }
155
156 pub fn external_cluster(&self) -> String {
157 match self {
158 Self::Remote(r) => format!(
159 "{}:{}:{}",
160 r.home_cluster, r.home_spu_id, r.home_spu_endpoint
161 ),
162 Self::Home(h) => h.remote_cluster.clone(),
163 }
164 }
165
166 #[deprecated(since = "0.29.1")]
167 pub fn is_home_mirror(&self) -> bool {
168 matches!(self, Self::Home(_))
169 }
170
171 pub fn accept_traffic(&self) -> Option<ErrorCode> {
173 match self {
174 Self::Remote(r) => {
175 if r.target {
176 Some(ErrorCode::MirrorProduceFromRemoteNotAllowed)
177 } else {
178 None
179 }
180 }
181 Self::Home(h) => {
182 if h.source {
183 None
184 } else {
185 Some(ErrorCode::MirrorProduceFromHome)
186 }
187 }
188 }
189 }
190}
191
192impl std::fmt::Display for PartitionMirrorConfig {
193 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
194 match self {
195 PartitionMirrorConfig::Remote(cfg) => write!(f, "{cfg}"),
196 PartitionMirrorConfig::Home(cfg) => write!(f, "{cfg}"),
197 }
198 }
199}
200
201#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
202#[cfg_attr(
203 feature = "use_serde",
204 derive(serde::Serialize, serde::Deserialize),
205 derive(schemars::JsonSchema),
206 serde(rename_all = "camelCase")
207)]
208pub struct HomePartitionConfig {
209 pub remote_cluster: String,
210 pub remote_replica: String,
211 #[cfg_attr(
213 feature = "use_serde",
214 serde(default, skip_serializing_if = "crate::is_false")
215 )]
216 #[fluvio(min_version = 18)]
217 pub source: bool,
218}
219
220impl std::fmt::Display for HomePartitionConfig {
221 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
222 write!(f, "{}", self.remote_cluster)
223 }
224}
225
226#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
227#[cfg_attr(
228 feature = "use_serde",
229 derive(serde::Serialize, serde::Deserialize),
230 derive(schemars::JsonSchema),
231 serde(rename_all = "camelCase")
232)]
233pub struct RemotePartitionConfig {
234 pub home_cluster: String,
235 pub home_spu_key: String,
236 #[cfg_attr(feature = "use_serde", serde(default))]
237 pub home_spu_id: SpuId,
238 pub home_spu_endpoint: String,
239 #[cfg_attr(
240 feature = "use_serde",
241 serde(default, skip_serializing_if = "crate::is_false")
242 )]
243 #[fluvio(min_version = 18)]
244 pub target: bool,
245}
246
247impl std::fmt::Display for RemotePartitionConfig {
248 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
249 write!(
250 f,
251 "{}:{}:{}",
252 self.home_cluster, self.home_spu_id, self.home_spu_endpoint
253 )
254 }
255}