fluvio_controlplane_metadata/partition/
spec.rs1#![allow(clippy::assign_op_pattern)]
2
3use fluvio_types::SpuId;
4use fluvio_protocol::{link::ErrorCode, Decoder, Encoder};
5
6use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};
7
8#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
12#[cfg_attr(
13 feature = "use_serde",
14 derive(serde::Serialize, serde::Deserialize),
15 serde(rename_all = "camelCase")
16)]
17pub struct PartitionSpec {
18 pub leader: SpuId,
19 #[cfg_attr(feature = "use_serde", serde(default))]
20 pub replicas: Vec<SpuId>,
21 #[fluvio(min_version = 4)]
22 pub cleanup_policy: Option<CleanupPolicy>,
23 #[fluvio(min_version = 4)]
24 pub storage: Option<TopicStorageConfig>,
25 #[cfg_attr(feature = "use_serde", serde(default))]
26 #[fluvio(min_version = 6)]
27 pub compression_type: CompressionAlgorithm,
28 #[fluvio(min_version = 12)]
29 pub deduplication: Option<Deduplication>,
30 #[cfg_attr(feature = "use_serde", serde(default))]
31 #[fluvio(min_version = 13)]
32 pub system: bool,
33 #[cfg_attr(feature = "use_serde", serde(default))]
34 #[fluvio(min_version = 14)]
35 pub mirror: Option<PartitionMirrorConfig>,
36}
37
38impl PartitionSpec {
39 pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
40 Self {
41 leader,
42 replicas,
43 ..Default::default()
44 }
45 }
46
47 pub fn from_replicas(
49 replicas: Vec<SpuId>,
50 topic: &TopicSpec,
51 mirror: Option<&PartitionMirrorConfig>,
52 ) -> Self {
53 let leader = if replicas.is_empty() { 0 } else { replicas[0] };
54
55 Self {
56 leader,
57 replicas,
58 mirror: mirror.cloned(),
59 cleanup_policy: topic.get_clean_policy().cloned(),
60 storage: topic.get_storage().cloned(),
61 compression_type: topic.get_compression_type().clone(),
62 deduplication: topic.get_deduplication().cloned(),
63 system: topic.is_system(),
64 }
65 }
66
67 pub fn has_spu(&self, spu: &SpuId) -> bool {
68 self.replicas.contains(spu)
69 }
70
71 pub fn followers(&self) -> Vec<SpuId> {
73 self.replicas
74 .iter()
75 .filter_map(|r| if r == &self.leader { None } else { Some(*r) })
76 .collect()
77 }
78
79 pub fn mirror_string(&self) -> String {
80 if let Some(mirror) = &self.mirror {
81 let external = mirror.external_cluster();
82 match mirror {
83 PartitionMirrorConfig::Remote(remote) => {
84 if remote.target {
85 format!("{}(from-home)", external)
86 } else {
87 format!("{}(to-home)", external)
88 }
89 }
90
91 PartitionMirrorConfig::Home(home) => {
92 if home.source {
93 format!("{}(to-remote)", external)
94 } else {
95 format!("{}(from-remote)", external)
96 }
97 }
98 }
99 } else {
100 "".to_owned()
101 }
102 }
103}
104
105impl From<Vec<SpuId>> for PartitionSpec {
106 fn from(replicas: Vec<SpuId>) -> Self {
107 if !replicas.is_empty() {
108 Self::new(replicas[0], replicas)
109 } else {
110 Self::new(0, replicas)
111 }
112 }
113}
114
115#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
117pub struct PartitionConfig {
118 pub retention_time_seconds: Option<u32>,
119}
120
121#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
122#[cfg_attr(
123 feature = "use_serde",
124 derive(serde::Serialize, serde::Deserialize),
125 serde(rename_all = "camelCase")
126)]
127pub enum PartitionMirrorConfig {
128 #[fluvio(tag = 0)]
129 Remote(RemotePartitionConfig),
130 #[fluvio(tag = 1)]
131 Home(HomePartitionConfig),
132}
133
134impl Default for PartitionMirrorConfig {
135 fn default() -> Self {
136 Self::Remote(RemotePartitionConfig::default())
137 }
138}
139
140impl PartitionMirrorConfig {
141 pub fn remote(&self) -> Option<&RemotePartitionConfig> {
142 match self {
143 Self::Remote(e) => Some(e),
144 _ => None,
145 }
146 }
147
148 pub fn home(&self) -> Option<&HomePartitionConfig> {
149 match self {
150 Self::Home(c) => Some(c),
151 _ => None,
152 }
153 }
154
155 pub fn external_cluster(&self) -> String {
156 match self {
157 Self::Remote(r) => format!(
158 "{}:{}:{}",
159 r.home_cluster, r.home_spu_id, r.home_spu_endpoint
160 ),
161 Self::Home(h) => h.remote_cluster.clone(),
162 }
163 }
164
165 #[deprecated(since = "0.29.1")]
166 pub fn is_home_mirror(&self) -> bool {
167 matches!(self, Self::Home(_))
168 }
169
170 pub fn accept_traffic(&self) -> Option<ErrorCode> {
172 match self {
173 Self::Remote(r) => {
174 if r.target {
175 Some(ErrorCode::MirrorProduceFromRemoteNotAllowed)
176 } else {
177 None
178 }
179 }
180 Self::Home(h) => {
181 if h.source {
182 None
183 } else {
184 Some(ErrorCode::MirrorProduceFromHome)
185 }
186 }
187 }
188 }
189}
190
191impl std::fmt::Display for PartitionMirrorConfig {
192 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
193 match self {
194 PartitionMirrorConfig::Remote(cfg) => write!(f, "{}", cfg),
195 PartitionMirrorConfig::Home(cfg) => write!(f, "{}", cfg),
196 }
197 }
198}
199
200#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
201#[cfg_attr(
202 feature = "use_serde",
203 derive(serde::Serialize, serde::Deserialize),
204 serde(rename_all = "camelCase")
205)]
206pub struct HomePartitionConfig {
207 pub remote_cluster: String,
208 pub remote_replica: String,
209 #[cfg_attr(
211 feature = "use_serde",
212 serde(default, skip_serializing_if = "crate::is_false")
213 )]
214 #[fluvio(min_version = 18)]
215 pub source: bool,
216}
217
218impl std::fmt::Display for HomePartitionConfig {
219 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
220 write!(f, "{}", self.remote_cluster)
221 }
222}
223
224#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
225#[cfg_attr(
226 feature = "use_serde",
227 derive(serde::Serialize, serde::Deserialize),
228 serde(rename_all = "camelCase")
229)]
230pub struct RemotePartitionConfig {
231 pub home_cluster: String,
232 pub home_spu_key: String,
233 #[cfg_attr(feature = "use_serde", serde(default))]
234 pub home_spu_id: SpuId,
235 pub home_spu_endpoint: String,
236 #[cfg_attr(
237 feature = "use_serde",
238 serde(default, skip_serializing_if = "crate::is_false")
239 )]
240 #[fluvio(min_version = 18)]
241 pub target: bool,
242}
243
244impl std::fmt::Display for RemotePartitionConfig {
245 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
246 write!(
247 f,
248 "{}:{}:{}",
249 self.home_cluster, self.home_spu_id, self.home_spu_endpoint
250 )
251 }
252}