1use std::ops::{Deref, DerefMut};
2
3use anyhow::{anyhow, Result};
4
5use fluvio_protocol::record::ReplicaKey;
6use fluvio_types::defaults::{
7 STORAGE_RETENTION_SECONDS, SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN, STORAGE_RETENTION_SECONDS_MIN,
8 SPU_PARTITION_MAX_BYTES_MIN, SPU_LOG_SEGMENT_MAX_BYTES,
9};
10use fluvio_types::SpuId;
11use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
12use fluvio_protocol::{Encoder, Decoder};
13
14use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};
15
16use super::deduplication::Deduplication;
17
/// Specification of a topic: its replica layout plus optional cleanup,
/// storage, compression, deduplication and system-topic settings.
///
/// Fields carry `#[fluvio(min_version = N)]` tags for the derived
/// `Encoder`/`Decoder`; presumably a field is only written/read when the
/// negotiated version is at least `N` (the deduplication compatibility test
/// in this file relies on that for version 11 vs. 12).
#[derive(Debug, Clone, PartialEq, Default, Encoder, Decoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct TopicSpec {
    /// Replica layout of the topic (assigned, computed or mirror).
    replicas: ReplicaSpec,
    /// Optional cleanup policy; omitted from serde output when `None`.
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 3)]
    cleanup_policy: Option<CleanupPolicy>,
    /// Optional storage limits; omitted from serde output when `None`.
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 4)]
    storage: Option<TopicStorageConfig>,
    /// Compression setting; falls back to the type's default when missing in serde input.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 6)]
    compression_type: CompressionAlgorithm,
    /// Optional deduplication settings; defaulted on input, omitted on output when `None`.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 12)]
    deduplication: Option<Deduplication>,
    /// Flag exposed through `is_system` / `set_system`; defaults to `false` in serde input.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 13)]
    system: bool,
}
43
44impl From<ReplicaSpec> for TopicSpec {
45 fn from(replicas: ReplicaSpec) -> Self {
46 Self {
47 replicas,
48 ..Default::default()
49 }
50 }
51}
52
53impl Deref for TopicSpec {
54 type Target = ReplicaSpec;
55
56 fn deref(&self) -> &Self::Target {
57 &self.replicas
58 }
59}
60
61impl TopicSpec {
62 pub fn new_assigned(partition_map: impl Into<PartitionMaps>) -> Self {
63 Self {
64 replicas: ReplicaSpec::new_assigned(partition_map),
65 ..Default::default()
66 }
67 }
68
69 pub fn new_computed(
70 partitions: PartitionCount,
71 replication: ReplicationFactor,
72 ignore_rack: Option<IgnoreRackAssignment>,
73 ) -> Self {
74 Self {
75 replicas: ReplicaSpec::new_computed(partitions, replication, ignore_rack),
76 ..Default::default()
77 }
78 }
79
80 pub fn new_mirror(mirror: MirrorConfig) -> Self {
81 Self {
82 replicas: ReplicaSpec::Mirror(mirror),
83 ..Default::default()
84 }
85 }
86
87 #[inline(always)]
88 pub fn replicas(&self) -> &ReplicaSpec {
89 &self.replicas
90 }
91
92 pub fn set_replicas(&mut self, replicas: ReplicaSpec) {
93 self.replicas = replicas;
94 }
95
96 pub fn set_cleanup_policy(&mut self, policy: CleanupPolicy) {
97 self.cleanup_policy = Some(policy);
98 }
99
100 pub fn get_partition_mirror_map(&self) -> Option<PartitionMaps> {
101 match &self.replicas {
102 ReplicaSpec::Mirror(mirror) => match mirror {
103 MirrorConfig::Remote(e) => Some(e.as_partition_maps()),
104 MirrorConfig::Home(c) => Some(c.as_partition_maps()),
105 },
106 _ => None,
107 }
108 }
109
110 pub fn get_clean_policy(&self) -> Option<&CleanupPolicy> {
111 self.cleanup_policy.as_ref()
112 }
113
114 pub fn set_compression_type(&mut self, compression: CompressionAlgorithm) {
115 self.compression_type = compression;
116 }
117
118 pub fn get_compression_type(&self) -> &CompressionAlgorithm {
119 &self.compression_type
120 }
121
122 pub fn get_storage(&self) -> Option<&TopicStorageConfig> {
123 self.storage.as_ref()
124 }
125
126 pub fn get_storage_mut(&mut self) -> Option<&mut TopicStorageConfig> {
127 self.storage.as_mut()
128 }
129
130 pub fn set_storage(&mut self, storage: TopicStorageConfig) {
131 self.storage = Some(storage);
132 }
133
134 pub fn get_deduplication(&self) -> Option<&Deduplication> {
135 self.deduplication.as_ref()
136 }
137
138 pub fn set_deduplication(&mut self, deduplication: Option<Deduplication>) {
139 self.deduplication = deduplication;
140 }
141
142 pub fn is_system(&self) -> bool {
143 self.system
144 }
145
146 pub fn set_system(&mut self, system: bool) {
147 self.system = system;
148 }
149
150 pub fn retention_secs(&self) -> u32 {
152 self.get_clean_policy()
153 .map(|policy| policy.retention_secs())
154 .unwrap_or_else(|| STORAGE_RETENTION_SECONDS)
155 }
156
157 pub fn validate_config(&self) -> Option<String> {
159 if let Some(policy) = self.get_clean_policy() {
160 if policy.retention_secs() < STORAGE_RETENTION_SECONDS_MIN {
161 return Some(format!(
162 "retention_secs {} is less than minimum {}",
163 policy.retention_secs(),
164 STORAGE_RETENTION_SECONDS_MIN
165 ));
166 }
167 }
168
169 if let Some(storage) = self.get_storage() {
170 if let Some(segment_size) = storage.segment_size {
171 if segment_size < SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN {
172 return Some(format!(
173 "segment_size {segment_size} is less than minimum {SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN}"
174 ));
175 }
176 }
177 if let Some(max_partition_size) = storage.max_partition_size {
178 if max_partition_size < SPU_PARTITION_MAX_BYTES_MIN {
179 return Some(format!(
180 "max_partition_size {max_partition_size} is less than minimum {SPU_PARTITION_MAX_BYTES_MIN}"
181 ));
182 }
183 let segment_size = storage.segment_size.unwrap_or(SPU_LOG_SEGMENT_MAX_BYTES);
184 if max_partition_size < segment_size as u64 {
185 return Some(format!(
186 "max_partition_size {max_partition_size} is less than segment size {segment_size}"
187 ));
188 }
189 }
190 }
191
192 None
193 }
194}
195
/// How the partitions and replicas of a topic are described.
#[derive(Debug, Clone, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ReplicaSpec {
    /// Explicit list of partitions with their replica SPU ids.
    #[cfg_attr(feature = "use_serde", serde(rename = "assigned"))]
    #[fluvio(tag = 0)]
    Assigned(PartitionMaps),
    /// Partition count, replication factor and rack flag (see `TopicReplicaParam`).
    #[cfg_attr(feature = "use_serde", serde(rename = "computed"))]
    #[fluvio(tag = 1)]
    Computed(TopicReplicaParam),
    /// Mirror topic configuration; serialized through `serde_yaml`'s singleton-map helper.
    #[cfg_attr(
        feature = "use_serde",
        serde(rename = "mirror", with = "serde_yaml::with::singleton_map")
    )]
    #[fluvio(tag = 2)]
    #[fluvio(min_version = 14)]
    Mirror(MirrorConfig),
}
213
214impl std::fmt::Display for ReplicaSpec {
215 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
216 match self {
217 Self::Assigned(partition_map) => write!(f, "assigned::{partition_map}"),
218 Self::Computed(param) => write!(f, "computed::({param})"),
219 Self::Mirror(param) => write!(f, "mirror::({param})"),
220 }
221 }
222}
223
224impl Default for ReplicaSpec {
225 fn default() -> Self {
226 Self::Computed(TopicReplicaParam::default())
227 }
228}
229
230impl ReplicaSpec {
231 pub fn new_assigned<J>(partition_map: J) -> Self
232 where
233 J: Into<PartitionMaps>,
234 {
235 Self::Assigned(partition_map.into())
236 }
237
238 pub fn new_computed(
239 partitions: PartitionCount,
240 replication: ReplicationFactor,
241 ignore_rack: Option<IgnoreRackAssignment>,
242 ) -> Self {
243 Self::Computed((partitions, replication, ignore_rack.unwrap_or(false)).into())
244 }
245
246 pub fn is_computed(&self) -> bool {
247 matches!(self, Self::Computed(_))
248 }
249
250 pub fn partitions(&self) -> PartitionCount {
251 match &self {
252 Self::Computed(param) => param.partitions,
253 Self::Assigned(partition_map) => partition_map.partition_count(),
254 Self::Mirror(partition_map) => partition_map.partition_count(),
255 }
256 }
257
258 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
259 match self {
260 Self::Computed(param) => Some(param.replication_factor),
261 Self::Assigned(partition_map) => partition_map.replication_factor(),
262 Self::Mirror(partition_map) => partition_map.replication_factor(),
263 }
264 }
265
266 pub fn ignore_rack_assignment(&self) -> IgnoreRackAssignment {
267 match self {
268 Self::Computed(param) => param.ignore_rack_assignment,
269 Self::Assigned(_) => false,
270 Self::Mirror(_) => false,
271 }
272 }
273
274 pub fn type_label(&self) -> &'static str {
275 match self {
276 Self::Computed(_) => "computed",
277 Self::Assigned(_) => "assigned",
278 Self::Mirror(mirror) => match mirror {
279 MirrorConfig::Remote(remote_config) => {
280 if remote_config.target {
281 "from-home"
282 } else {
283 "to-home"
284 }
285 }
286 MirrorConfig::Home(home_config) => {
287 if home_config.0.source {
288 "to-remote"
289 } else {
290 "from-remote"
291 }
292 }
293 },
294 }
295 }
296
297 pub fn partitions_display(&self) -> String {
298 match self {
299 Self::Computed(param) => param.partitions.to_string(),
300 Self::Assigned(_) => "".to_owned(),
301 Self::Mirror(_) => "".to_owned(),
302 }
303 }
304
305 pub fn replication_factor_display(&self) -> String {
306 match self {
307 Self::Computed(param) => param.replication_factor.to_string(),
308 Self::Assigned(_) => "".to_owned(),
309 Self::Mirror(_) => "".to_owned(),
310 }
311 }
312
313 pub fn ignore_rack_assign_display(&self) -> &'static str {
314 match self {
315 Self::Computed(param) => {
316 if param.ignore_rack_assignment {
317 "yes"
318 } else {
319 ""
320 }
321 }
322 Self::Assigned(_) => "",
323 Self::Mirror(_) => "",
324 }
325 }
326
327 pub fn partition_map_str(&self) -> Option<String> {
328 match self {
329 Self::Computed(_) => None,
330 Self::Assigned(partition_map) => Some(partition_map.partition_map_string()),
331 Self::Mirror(mirror) => Some(mirror.as_partition_maps().partition_map_string()),
332 }
333 }
334
335 pub fn valid_partition(partitions: &PartitionCount) -> Result<()> {
341 if *partitions == 0 {
342 return Err(anyhow!("partition must be greater than 0"));
343 }
344
345 Ok(())
346 }
347
348 pub fn valid_replication_factor(replication: &ReplicationFactor) -> Result<()> {
350 if *replication == 0 {
351 return Err(anyhow!("replication factor must be greater than 0"));
352 }
353
354 Ok(())
355 }
356}
357
358#[derive(Debug, Clone, Default, Eq, PartialEq, Encoder, Decoder)]
360#[cfg_attr(
361 feature = "use_serde",
362 derive(serde::Serialize, serde::Deserialize),
363 serde(rename_all = "camelCase")
364)]
365pub struct TopicReplicaParam {
366 #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
367 pub partitions: PartitionCount,
368 #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
369 pub replication_factor: ReplicationFactor,
370 #[cfg_attr(
371 feature = "use_serde",
372 serde(skip_serializing_if = "bool::clone", default)
373 )]
374 pub ignore_rack_assignment: IgnoreRackAssignment,
375}
376
/// Serde fallback for partition and replication counts that are missing from the input.
// Only referenced from serde attributes, hence unused when `use_serde` is disabled.
#[allow(dead_code)]
fn default_count() -> u32 {
    const DEFAULT_COUNT: u32 = 1;
    DEFAULT_COUNT
}
381
382impl TopicReplicaParam {
383 pub fn new(
384 partitions: PartitionCount,
385 replication_factor: ReplicationFactor,
386 ignore_rack_assignment: IgnoreRackAssignment,
387 ) -> Self {
388 Self {
389 partitions,
390 replication_factor,
391 ignore_rack_assignment,
392 }
393 }
394}
395
396impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicReplicaParam {
397 fn from(value: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
398 let (partitions, replication_factor, ignore_rack) = value;
399 Self::new(partitions, replication_factor, ignore_rack)
400 }
401}
402
403impl std::fmt::Display for TopicReplicaParam {
404 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
405 write!(
406 f,
407 "replica param::(p:{}, r:{})",
408 self.partitions, self.replication_factor
409 )
410 }
411}
412
/// Ordered list of `PartitionMap` entries; serialized transparently as the inner list.
#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(transparent)
)]
pub struct PartitionMaps(Vec<PartitionMap>);
421
422impl From<Vec<PartitionMap>> for PartitionMaps {
423 fn from(maps: Vec<PartitionMap>) -> Self {
424 Self(maps)
425 }
426}
427
428impl From<PartitionMaps> for Vec<PartitionMap> {
429 fn from(maps: PartitionMaps) -> Self {
430 maps.0
431 }
432}
433
434impl From<Vec<(PartitionId, Vec<SpuId>)>> for PartitionMaps {
435 fn from(partition_vec: Vec<(PartitionId, Vec<SpuId>)>) -> Self {
436 let maps: Vec<PartitionMap> = partition_vec
437 .into_iter()
438 .map(|(id, replicas)| PartitionMap {
439 id,
440 replicas,
441 mirror: None,
442 })
443 .collect();
444 maps.into()
445 }
446}
447
448impl std::fmt::Display for PartitionMaps {
449 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
450 write!(f, "partition map:{})", self.0.len())
451 }
452}
453
454impl PartitionMaps {
455 pub fn maps(&self) -> &Vec<PartitionMap> {
456 &self.0
457 }
458
459 pub fn maps_owned(self) -> Vec<PartitionMap> {
460 self.0
461 }
462
463 fn partition_count(&self) -> PartitionCount {
464 self.0.len() as PartitionCount
465 }
466
467 fn replication_factor(&self) -> Option<ReplicationFactor> {
468 self.0
470 .first()
471 .map(|partition| partition.replicas.len() as ReplicationFactor)
472 }
473
474 fn partition_map_string(&self) -> String {
475 use std::fmt::Write;
476
477 let mut res = String::new();
478 for partition in &self.0 {
479 write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap();
480 }
482 if !res.is_empty() {
483 res.truncate(res.len() - 2);
484 }
485 res
486 }
487
488 pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
494 let mut spu_ids: Vec<SpuId> = vec![];
495
496 for partition in &self.0 {
497 for spu in &partition.replicas {
498 if !spu_ids.contains(spu) {
499 spu_ids.push(*spu);
500 }
501 }
502 }
503
504 spu_ids
505 }
506
507 #[allow(clippy::explicit_counter_loop)]
508 pub fn validate(&self) -> Result<()> {
510 if self.0.is_empty() {
512 return Err(anyhow!("no assigned partitions found"));
513 }
514
515 let mut id = 0;
525 let mut replica_len = 0;
526 for partition in &self.0 {
527 if id == 0 {
528 if partition.id != id {
530 return Err(anyhow!("assigned partitions must start with id 0",));
531 }
532
533 replica_len = partition.replicas.len();
535 if replica_len == 0 {
536 return Err(anyhow!("assigned replicas must have at least one spu id",));
537 }
538 } else {
539 if partition.id != id {
541 return Err(anyhow!(
542 "assigned partition ids must be in sequence and without gaps"
543 ));
544 }
545
546 if partition.replicas.len() != replica_len {
548 return Err(anyhow!(
549 "all assigned replicas must have the same number of spu ids: {replica_len}"
550 ));
551 }
552 }
553
554 let mut sorted_replicas = partition.replicas.clone();
556 sorted_replicas.sort_unstable();
557 let unique_count = 1 + sorted_replicas
558 .windows(2)
559 .filter(|pair| pair[0] != pair[1])
560 .count();
561 if partition.replicas.len() != unique_count {
562 return Err(anyhow!(format!(
563 "duplicate spu ids found in assigned partition with id: {id}"
564 ),));
565 }
566
567 for spu_id in &partition.replicas {
569 if *spu_id < 0 {
570 return Err(anyhow!(
571 "invalid spu id: {spu_id} in assigned partition with id: {id}"
572 ));
573 }
574 }
575
576 id += 1;
578 }
579
580 Ok(())
581 }
582}
583
584impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicSpec {
585 fn from(spec: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
586 let (count, factor, rack) = spec;
587 Self::new_computed(count, factor, Some(rack))
588 }
589}
590
591impl From<(PartitionCount, ReplicationFactor)> for TopicSpec {
593 fn from(spec: (PartitionCount, ReplicationFactor)) -> Self {
594 let (count, factor) = spec;
595 Self::new_computed(count, factor, Some(false))
596 }
597}
598
// One partition of a topic: its id, the SPU ids holding its replicas and an
// optional mirror configuration.
// (Plain comments are used on purpose: this type derives `schemars::JsonSchema`.)
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema)
)]
pub struct PartitionMap {
    // Partition id.
    pub id: PartitionId,
    // SPU ids of the replicas for this partition.
    pub replicas: Vec<SpuId>,
    // Mirror settings; defaulted on serde input and omitted from output when `None`.
    #[cfg_attr(
        feature = "use_serde",
        serde(default, skip_serializing_if = "Option::is_none")
    )]
    #[fluvio(min_version = 14)]
    pub mirror: Option<PartitionMirrorConfig>,
}
615
/// Mirror configuration of a topic, from either the remote or the home side.
#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub enum MirrorConfig {
    /// Configuration held by a remote cluster (see `RemoteMirrorConfig`).
    #[fluvio(tag = 0)]
    Remote(RemoteMirrorConfig),
    /// Configuration held by the home cluster (see `HomeMirrorConfig`).
    #[fluvio(tag = 1)]
    Home(HomeMirrorConfig),
}
628
629impl Default for MirrorConfig {
630 fn default() -> Self {
631 Self::Remote(RemoteMirrorConfig::default())
632 }
633}
634
635impl std::fmt::Display for MirrorConfig {
636 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
637 match self {
638 MirrorConfig::Remote(r) => {
639 write!(f, "Mirror Remote {r:?} ")
640 }
641 MirrorConfig::Home(h) => {
642 write!(f, "Mirror Home {h:?} ")
643 }
644 }
645 }
646}
647
648impl MirrorConfig {
649 pub fn partition_count(&self) -> PartitionCount {
650 match self {
651 MirrorConfig::Remote(src) => src.partition_count(),
652 MirrorConfig::Home(tg) => tg.partition_count(),
653 }
654 }
655
656 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
657 None
658 }
659
660 pub fn as_partition_maps(&self) -> PartitionMaps {
661 match self {
662 MirrorConfig::Remote(src) => src.as_partition_maps(),
663 MirrorConfig::Home(tg) => tg.as_partition_maps(),
664 }
665 }
666
667 pub fn set_home_to_remote(&mut self, home_to_remote: bool) -> Result<()> {
669 match self {
670 Self::Remote(_) => Err(anyhow!(
671 "remote mirror config cannot be set to home to remote"
672 )),
673 Self::Home(home) => {
674 home.set_home_to_remote(home_to_remote);
675 Ok(())
676 }
677 }
678 }
679
680 pub fn validate(&self) -> anyhow::Result<()> {
682 Ok(())
683 }
684}
685
/// List of home partition configurations; the payload of `MultiHome::V1`.
type Partitions = Vec<HomePartitionConfig>;
687
/// Intermediate representation used when deserializing a home mirror
/// configuration, which may arrive either as a bare list of partitions or as
/// a full `HomeMirrorInner` object.
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize),
    serde(rename_all = "camelCase", untagged)
)]
enum MultiHome {
    /// Bare list of partitions.
    V1(Partitions),
    /// Full object form.
    V2(HomeMirrorInner),
}
697
/// Home-side mirror configuration: a newtype over `HomeMirrorInner`.
///
/// With `use_serde`, the inner value is deserialized through `from_home_v1`,
/// which accepts both the list form and the object form.
#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorConfig(
    #[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner,
);
707
708impl Deref for HomeMirrorConfig {
709 type Target = HomeMirrorInner;
710
711 fn deref(&self) -> &Self::Target {
712 &self.0
713 }
714}
715
716impl DerefMut for HomeMirrorConfig {
717 fn deref_mut(&mut self) -> &mut Self::Target {
718 &mut self.0
719 }
720}
721
722impl HomeMirrorConfig {
723 pub fn from_simple(topic: &str, remote_clusters: Vec<String>) -> Self {
726 Self(HomeMirrorInner {
727 partitions: remote_clusters
728 .into_iter()
729 .map(|remote_cluster| HomePartitionConfig {
730 remote_cluster,
731 remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() },
732 ..Default::default()
733 })
734 .collect(),
735 source: false,
736 })
737 }
738}
739
cfg_if::cfg_if! {
    if #[cfg(feature = "use_serde")] {
        // Hand-written deserializer: picks `V1` when the input is a sequence
        // and `V2` when it is a map, based on which visitor method is invoked.
        impl<'de> serde::Deserialize<'de> for MultiHome {
            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                struct MultiHomeVisitor;

                impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor {
                    type Value = MultiHome;

                    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                        formatter.write_str("an array or an object")
                    }

                    // Sequence input: every element is a `HomePartitionConfig`.
                    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
                    where
                        A: serde::de::SeqAccess<'de>,
                    {
                        let mut elements = vec![];
                        while let Some(value) = seq.next_element::<HomePartitionConfig>()? {
                            elements.push(value);
                        }
                        Ok(MultiHome::V1(elements))
                    }

                    // Map input: delegate to `HomeMirrorInner`'s own deserializer.
                    fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
                    where
                        M: serde::de::MapAccess<'de>,
                    {
                        use serde::de::value::MapAccessDeserializer;
                        let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?;
                        Ok(MultiHome::V2(obj))
                    }
                }

                // `deserialize_any` lets the input format choose between
                // `visit_seq` and `visit_map`.
                deserializer.deserialize_any(MultiHomeVisitor)
            }
        }

        // Used by `HomeMirrorConfig` via `deserialize_with`: normalizes both
        // input forms into a `HomeMirrorInner`. The list form carries no
        // `source` flag, so it is set to `false`.
        fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            let home: MultiHome = serde::Deserialize::deserialize(deserializer)?;
            match home {
                MultiHome::V1(v1) => Ok(HomeMirrorInner {
                    partitions: v1,
                    source: false,
                }),
                MultiHome::V2(v2) => Ok(v2),
            }
        }
    }
}
796
/// Payload of `HomeMirrorConfig`: the mirrored partitions plus a direction flag.
#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorInner {
    /// One entry per mirrored partition; defaults to empty on serde input.
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub partitions: Vec<HomePartitionConfig>,
    /// Direction flag set by `set_home_to_remote`; `ReplicaSpec::type_label`
    /// reports `"to-remote"` when it is `true` and `"from-remote"` otherwise.
    /// Omitted from serde output when `false` and defaulted on input.
    #[cfg_attr(
        feature = "use_serde",
        serde(skip_serializing_if = "crate::is_false", default)
    )]
    #[fluvio(min_version = 18)]
    pub source: bool,
}
813
814impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
815 fn from(partitions: Vec<HomePartitionConfig>) -> Self {
816 Self(HomeMirrorInner {
817 partitions,
818 source: false,
819 })
820 }
821}
822
823impl HomeMirrorInner {
824 pub fn partition_count(&self) -> PartitionCount {
825 self.partitions.len() as PartitionCount
826 }
827
828 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
829 None
830 }
831
832 pub fn partitions(&self) -> &Vec<HomePartitionConfig> {
833 &self.partitions
834 }
835
836 pub fn as_partition_maps(&self) -> PartitionMaps {
837 let mut maps = vec![];
838 for (partition_id, home_partition) in self.partitions.iter().enumerate() {
839 maps.push(PartitionMap {
840 id: partition_id as u32,
841 mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())),
842 ..Default::default()
843 });
844 }
845 maps.into()
846 }
847
848 pub fn validate(&self) -> anyhow::Result<()> {
850 Ok(())
851 }
852
853 pub fn add_partition(&mut self, partition: HomePartitionConfig) {
855 self.partitions.push(partition);
856 }
857
858 pub fn set_home_to_remote(&mut self, home_to_remote: bool) {
860 self.source = home_to_remote;
861 self.partitions.iter_mut().for_each(|partition| {
862 partition.source = home_to_remote;
863 });
864 }
865}
866
/// Holds a list of remote cluster names.
// NOTE(review): not referenced elsewhere in this file — confirm it is still used.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorPartition {
    /// Names of the remote clusters.
    pub remote_clusters: Vec<String>,
}
876
877#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
878#[cfg_attr(
879 feature = "use_serde",
880 derive(serde::Serialize, serde::Deserialize),
881 serde(rename_all = "camelCase")
882)]
883pub struct RemoteMirrorConfig {
884 pub home_cluster: String,
886 pub home_spus: Vec<SpuMirrorConfig>,
887 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "crate::is_false"))]
888 #[fluvio(min_version = 18)]
889 pub target: bool,
890}
891
/// Description of one home SPU as seen from a remote cluster.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct SpuMirrorConfig {
    /// SPU id; copied into `RemotePartitionConfig::home_spu_id`.
    pub id: SpuId,
    /// SPU key; copied into `RemotePartitionConfig::home_spu_key`.
    pub key: String,
    /// SPU endpoint; copied into `RemotePartitionConfig::home_spu_endpoint`.
    pub endpoint: String,
}
903
904impl RemoteMirrorConfig {
905 pub fn partition_count(&self) -> PartitionCount {
906 self.home_spus.len() as PartitionCount
907 }
908
909 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
910 None
911 }
912
913 pub fn spus(&self) -> &Vec<SpuMirrorConfig> {
914 &self.home_spus
915 }
916
917 pub fn as_partition_maps(&self) -> PartitionMaps {
918 let mut maps = vec![];
919 for (partition_id, home_spu) in self.home_spus.iter().enumerate() {
920 maps.push(PartitionMap {
921 id: partition_id as u32,
922 mirror: Some(PartitionMirrorConfig::Remote(RemotePartitionConfig {
923 home_spu_key: home_spu.key.clone(),
924 home_spu_id: home_spu.id,
925 home_cluster: self.home_cluster.clone(),
926 home_spu_endpoint: home_spu.endpoint.clone(),
927 target: self.target,
928 })),
929 ..Default::default()
930 });
931 }
932 maps.into()
933 }
934
935 pub fn validate(&self) -> anyhow::Result<()> {
937 Ok(())
938 }
939}
940
/// Cleanup policy of a topic; currently only a segment-based policy exists.
#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CleanupPolicy {
    /// Segment-based cleanup configured by `SegmentBasedPolicy`.
    #[cfg_attr(feature = "use_serde", serde(rename = "segment"))]
    #[fluvio(tag = 0)]
    Segment(SegmentBasedPolicy),
}
948
949impl Default for CleanupPolicy {
950 fn default() -> Self {
951 CleanupPolicy::Segment(SegmentBasedPolicy::default())
952 }
953}
954
955impl CleanupPolicy {
956 pub fn retention_secs(&self) -> u32 {
957 match self {
958 CleanupPolicy::Segment(policy) => policy.retention_secs(),
959 }
960 }
961}
962
/// Segment-based cleanup settings.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct SegmentBasedPolicy {
    /// Retention time in seconds, as returned by `retention_secs`.
    pub time_in_seconds: u32,
}
972
973impl SegmentBasedPolicy {
974 pub fn retention_secs(&self) -> u32 {
975 self.time_in_seconds
976 }
977}
978
/// Optional storage limits of a topic, checked by `TopicSpec::validate_config`.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct TopicStorageConfig {
    /// Segment size limit (presumably in bytes — confirm); validated against
    /// `SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN`.
    pub segment_size: Option<u32>,
    /// Partition size limit (presumably in bytes — confirm); validated against
    /// `SPU_PARTITION_MAX_BYTES_MIN` and the effective segment size.
    pub max_partition_size: Option<u64>,
}
989
// Compression setting of a topic. `Any` is the default variant.
// The textual names used by `FromStr` and `Display` are the lowercase
// variant names ("none", "gzip", "snappy", "lz4", "any", "zstd").
// (Plain comments are used on purpose: this type derives `schemars::JsonSchema`.)
#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema)
)]
pub enum CompressionAlgorithm {
    #[fluvio(tag = 0)]
    None,
    #[fluvio(tag = 1)]
    Gzip,
    #[fluvio(tag = 2)]
    Snappy,
    #[fluvio(tag = 3)]
    Lz4,
    // Default variant.
    #[default]
    #[fluvio(tag = 4)]
    Any,
    #[fluvio(tag = 5)]
    Zstd,
}
1011
/// Error returned by `CompressionAlgorithm::from_str` for an unrecognized name.
#[derive(Debug, thiserror::Error)]
#[error("Invalid compression type in topic")]
pub struct InvalidCompressionAlgorithm;
1015
1016impl std::str::FromStr for CompressionAlgorithm {
1017 type Err = InvalidCompressionAlgorithm;
1018
1019 fn from_str(s: &str) -> Result<Self, Self::Err> {
1020 match s.to_lowercase().as_str() {
1021 "none" => Ok(CompressionAlgorithm::None),
1022 "gzip" => Ok(CompressionAlgorithm::Gzip),
1023 "snappy" => Ok(CompressionAlgorithm::Snappy),
1024 "lz4" => Ok(CompressionAlgorithm::Lz4),
1025 "any" => Ok(CompressionAlgorithm::Any),
1026 "zstd" => Ok(CompressionAlgorithm::Zstd),
1027 _ => Err(InvalidCompressionAlgorithm),
1028 }
1029 }
1030}
1031impl std::fmt::Display for CompressionAlgorithm {
1032 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
1033 match self {
1034 Self::None => write!(f, "none"),
1035 Self::Gzip => write!(f, "gzip"),
1036 Self::Snappy => write!(f, "snappy"),
1037 Self::Lz4 => write!(f, "lz4"),
1038 Self::Any => write!(f, "any"),
1039 Self::Zstd => write!(f, "zstd"),
1040 }
1041 }
1042}
1043
#[cfg(test)]
mod test {

    use std::io::Cursor;

    use crate::topic::{Bounds, Filter, Transform};

    use super::*;

    /// Asserts that `maps.validate()` fails with exactly `expected` as its message.
    fn assert_validation_error(maps: PartitionMaps, expected: &str) {
        let err = maps.validate().expect_err("validation should fail");
        assert_eq!(err.to_string(), expected);
    }

    #[test]
    fn test_is_computed_topic() {
        let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
        let t1 = ReplicaSpec::new_assigned(p1);
        assert!(!t1.is_computed());

        let t2 = ReplicaSpec::new_computed(0, 0, None);
        assert!(t2.is_computed());
    }

    #[test]
    fn test_valid_computed_replica_params() {
        // A partition count of zero is rejected, one is accepted.
        let zero_partitions =
            ReplicaSpec::valid_partition(&0).expect_err("zero partitions must be rejected");
        assert_eq!(
            zero_partitions.to_string(),
            "partition must be greater than 0"
        );
        assert!(ReplicaSpec::valid_partition(&1).is_ok());

        // A replication factor of zero is rejected, one is accepted.
        let zero_replication = ReplicaSpec::valid_replication_factor(&0)
            .expect_err("zero replication must be rejected");
        assert_eq!(
            zero_replication.to_string(),
            "replication factor must be greater than 0"
        );
        assert!(ReplicaSpec::valid_replication_factor(&1).is_ok());
    }

    #[test]
    fn test_replica_map_ids() {
        // Ids must start at 0.
        assert_validation_error(
            vec![(1, vec![0]), (2, vec![2])].into(),
            "assigned partitions must start with id 0",
        );

        // Ids must not have gaps.
        assert_validation_error(
            vec![(0, vec![0]), (2, vec![2])].into(),
            "assigned partition ids must be in sequence and without gaps",
        );

        // Ids must be in order.
        assert_validation_error(
            vec![(0, vec![0]), (2, vec![2]), (1, vec![1])].into(),
            "assigned partition ids must be in sequence and without gaps",
        );

        // Ids must not repeat.
        assert_validation_error(
            vec![(0, vec![0]), (1, vec![1]), (1, vec![1])].into(),
            "assigned partition ids must be in sequence and without gaps",
        );

        // Consecutive ids starting at 0 are accepted.
        let p5: PartitionMaps = vec![(0, vec![1]), (1, vec![1]), (2, vec![2])].into();
        assert!(p5.validate().is_ok());
    }

    #[test]
    fn test_replica_map_spu_ids() {
        // The first partition needs at least one replica.
        assert_validation_error(
            vec![(0, vec![]), (1, vec![1])].into(),
            "assigned replicas must have at least one spu id",
        );

        // Every partition needs the same number of replicas as the first.
        assert_validation_error(
            vec![(0, vec![1, 2]), (1, vec![1])].into(),
            "all assigned replicas must have the same number of spu ids: 2",
        );

        // Duplicate SPU ids within a partition are rejected.
        assert_validation_error(
            vec![(0, vec![1, 2]), (1, vec![1, 1])].into(),
            "duplicate spu ids found in assigned partition with id: 1",
        );

        // Duplicates are detected even when they are not adjacent.
        assert_validation_error(
            vec![(0, vec![3, 1, 2, 3])].into(),
            "duplicate spu ids found in assigned partition with id: 0",
        );

        // Negative SPU ids are rejected.
        assert_validation_error(
            vec![(0, vec![1, 2]), (1, vec![1, -2])].into(),
            "invalid spu id: -2 in assigned partition with id: 1",
        );
    }

    #[test]
    fn test_unique_spus_in_partition_map() {
        let p1: PartitionMaps =
            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();

        // Ids are reported in order of first appearance.
        let p1_result = p1.unique_spus_in_partition_map();
        let expected_p1_result: Vec<SpuId> = vec![0, 1, 3, 2, 4];
        assert_eq!(p1_result, expected_p1_result);
    }

    #[test]
    fn test_encode_decode_computed_topic_spec() {
        let topic_spec = ReplicaSpec::Computed((2, 3, true).into());
        let mut dest = vec![];

        let result = topic_spec.encode(&mut dest, 0);
        assert!(result.is_ok());

        let expected_dest = [
            0x01, // variant tag of `Computed`
            0x00, 0x00, 0x00, 0x02, // partitions
            0x00, 0x00, 0x00, 0x03, // replication factor
            0x01, // ignore rack assignment
        ];
        assert_eq!(dest, expected_dest);

        let mut topic_spec_decoded = ReplicaSpec::default();
        let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
        assert!(result.is_ok());

        match topic_spec_decoded {
            ReplicaSpec::Computed(param) => {
                assert_eq!(param.partitions, 2);
                assert_eq!(param.replication_factor, 3);
                assert!(param.ignore_rack_assignment);
            }
            _ => panic!("expect computed topic spec, found {topic_spec_decoded:?}"),
        }
    }

    #[test]
    fn test_topic_with_dedup_prev_version_compatibility() {
        // One version below the `min_version` of the `deduplication` field.
        let prev_version = 11;
        let mut topic_spec: TopicSpec = ReplicaSpec::Computed((2, 3, true).into()).into();
        topic_spec.set_deduplication(Some(Deduplication {
            bounds: Bounds {
                count: 1,
                age: None,
            },
            filter: Filter {
                transform: Transform {
                    uses: "filter".to_string(),
                    ..Default::default()
                },
            },
        }));

        let mut dest = vec![];
        topic_spec.encode(&mut dest, prev_version).expect("encoded");
        let mut topic_spec_decoded = TopicSpec::default();
        topic_spec_decoded
            .decode(&mut Cursor::new(&dest), prev_version)
            .expect("decoded");

        // The deduplication settings must not survive the older protocol version.
        assert!(topic_spec_decoded.deduplication.is_none());
    }

    #[test]
    fn test_partition_map_str() {
        let p1: PartitionMaps =
            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
        let spec = ReplicaSpec::new_assigned(p1);
        assert_eq!(
            spec.partition_map_str(),
            Some("0:[0, 1, 3], 1:[0, 2, 3], 2:[1, 3, 4]".to_string())
        );

        // An empty map renders as an empty string rather than `None`.
        let p2 = PartitionMaps::default();
        let spec2 = ReplicaSpec::new_assigned(p2);
        assert_eq!(spec2.partition_map_str(), Some("".to_string()));
    }

    // `HomeMirrorConfig` only implements `Deserialize` when `use_serde` is
    // enabled, so this test must be gated on the same feature to keep the
    // test module compiling without it.
    #[cfg(feature = "use_serde")]
    #[test]
    fn test_deserialize_home_mirror_config() {
        let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#;
        let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize");
        assert_eq!(home_mirror.partitions().len(), 2);
    }
}
1288
#[cfg(test)]
mod mirror_test {
    use crate::{
        topic::{PartitionMap, HomeMirrorConfig},
        partition::{PartitionMirrorConfig, HomePartitionConfig},
    };

    /// `from_simple` yields one partition per remote cluster, each pointing at
    /// partition 0 of the topic, and `as_partition_maps` numbers them in order.
    #[test]
    fn test_home_mirror_conversion() {
        let remotes = vec!["boat1".to_owned(), "boat2".to_owned()];
        let mirror = HomeMirrorConfig::from_simple("boats", remotes);

        // Expected map entry for the partition at `id` mirrored to `remote_cluster`.
        let expected_partition = |id: u32, remote_cluster: &str| PartitionMap {
            id,
            replicas: vec![],
            mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
                remote_replica: "boats-0".to_string(),
                remote_cluster: remote_cluster.to_owned(),
                ..Default::default()
            })),
        };
        let expected = vec![
            expected_partition(0, "boat1"),
            expected_partition(1, "boat2"),
        ];

        assert_eq!(mirror.as_partition_maps(), expected.into());
    }
}