fluvio_controlplane_metadata/topic/
spec.rs

1use std::ops::{Deref, DerefMut};
2
3use anyhow::{anyhow, Result};
4
5use fluvio_protocol::record::ReplicaKey;
6use fluvio_types::defaults::{
7    STORAGE_RETENTION_SECONDS, SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN, STORAGE_RETENTION_SECONDS_MIN,
8    SPU_PARTITION_MAX_BYTES_MIN, SPU_LOG_SEGMENT_MAX_BYTES,
9};
10use fluvio_types::SpuId;
11use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
12use fluvio_protocol::{Encoder, Decoder};
13
14use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};
15
16use super::deduplication::Deduplication;
17
/// Specification of a topic: replica layout plus optional retention,
/// storage, compression, deduplication and system-topic settings.
#[derive(Debug, Clone, PartialEq, Default, Encoder, Decoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct TopicSpec {
    // How partitions/replicas are placed: computed, assigned, or mirror.
    replicas: ReplicaSpec,
    // Optional retention policy (segment-based); None means defaults apply.
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 3)]
    cleanup_policy: Option<CleanupPolicy>,
    // Optional per-topic storage overrides (segment size, max partition size).
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 4)]
    storage: Option<TopicStorageConfig>,
    // Compression accepted for this topic; defaults via CompressionAlgorithm::default().
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 6)]
    compression_type: CompressionAlgorithm,
    // Optional deduplication configuration.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
    #[fluvio(min_version = 12)]
    deduplication: Option<Deduplication>,
    // True for internal (system) topics.
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 13)]
    system: bool,
}
43
44impl From<ReplicaSpec> for TopicSpec {
45    fn from(replicas: ReplicaSpec) -> Self {
46        Self {
47            replicas,
48            ..Default::default()
49        }
50    }
51}
52
// Allow read-only `ReplicaSpec` methods to be called directly on `TopicSpec`.
impl Deref for TopicSpec {
    type Target = ReplicaSpec;

    fn deref(&self) -> &Self::Target {
        &self.replicas
    }
}
60
61impl TopicSpec {
62    pub fn new_assigned(partition_map: impl Into<PartitionMaps>) -> Self {
63        Self {
64            replicas: ReplicaSpec::new_assigned(partition_map),
65            ..Default::default()
66        }
67    }
68
69    pub fn new_computed(
70        partitions: PartitionCount,
71        replication: ReplicationFactor,
72        ignore_rack: Option<IgnoreRackAssignment>,
73    ) -> Self {
74        Self {
75            replicas: ReplicaSpec::new_computed(partitions, replication, ignore_rack),
76            ..Default::default()
77        }
78    }
79
80    pub fn new_mirror(mirror: MirrorConfig) -> Self {
81        Self {
82            replicas: ReplicaSpec::Mirror(mirror),
83            ..Default::default()
84        }
85    }
86
87    #[inline(always)]
88    pub fn replicas(&self) -> &ReplicaSpec {
89        &self.replicas
90    }
91
92    pub fn set_replicas(&mut self, replicas: ReplicaSpec) {
93        self.replicas = replicas;
94    }
95
96    pub fn set_cleanup_policy(&mut self, policy: CleanupPolicy) {
97        self.cleanup_policy = Some(policy);
98    }
99
100    pub fn get_partition_mirror_map(&self) -> Option<PartitionMaps> {
101        match &self.replicas {
102            ReplicaSpec::Mirror(mirror) => match mirror {
103                MirrorConfig::Remote(e) => Some(e.as_partition_maps()),
104                MirrorConfig::Home(c) => Some(c.as_partition_maps()),
105            },
106            _ => None,
107        }
108    }
109
110    pub fn get_clean_policy(&self) -> Option<&CleanupPolicy> {
111        self.cleanup_policy.as_ref()
112    }
113
114    pub fn set_compression_type(&mut self, compression: CompressionAlgorithm) {
115        self.compression_type = compression;
116    }
117
118    pub fn get_compression_type(&self) -> &CompressionAlgorithm {
119        &self.compression_type
120    }
121
122    pub fn get_storage(&self) -> Option<&TopicStorageConfig> {
123        self.storage.as_ref()
124    }
125
126    pub fn get_storage_mut(&mut self) -> Option<&mut TopicStorageConfig> {
127        self.storage.as_mut()
128    }
129
130    pub fn set_storage(&mut self, storage: TopicStorageConfig) {
131        self.storage = Some(storage);
132    }
133
134    pub fn get_deduplication(&self) -> Option<&Deduplication> {
135        self.deduplication.as_ref()
136    }
137
138    pub fn set_deduplication(&mut self, deduplication: Option<Deduplication>) {
139        self.deduplication = deduplication;
140    }
141
142    pub fn is_system(&self) -> bool {
143        self.system
144    }
145
146    pub fn set_system(&mut self, system: bool) {
147        self.system = system;
148    }
149
150    /// get retention secs that can be displayed
151    pub fn retention_secs(&self) -> u32 {
152        self.get_clean_policy()
153            .map(|policy| policy.retention_secs())
154            .unwrap_or_else(|| STORAGE_RETENTION_SECONDS)
155    }
156
157    /// validate configuration, return string with errors
158    pub fn validate_config(&self) -> Option<String> {
159        if let Some(policy) = self.get_clean_policy() {
160            if policy.retention_secs() < STORAGE_RETENTION_SECONDS_MIN {
161                return Some(format!(
162                    "retention_secs {} is less than minimum {}",
163                    policy.retention_secs(),
164                    STORAGE_RETENTION_SECONDS_MIN
165                ));
166            }
167        }
168
169        if let Some(storage) = self.get_storage() {
170            if let Some(segment_size) = storage.segment_size {
171                if segment_size < SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN {
172                    return Some(format!(
173                        "segment_size {segment_size} is less than minimum {SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN}"
174                    ));
175                }
176            }
177            if let Some(max_partition_size) = storage.max_partition_size {
178                if max_partition_size < SPU_PARTITION_MAX_BYTES_MIN {
179                    return Some(format!(
180                        "max_partition_size {max_partition_size} is less than minimum {SPU_PARTITION_MAX_BYTES_MIN}"
181                    ));
182                }
183                let segment_size = storage.segment_size.unwrap_or(SPU_LOG_SEGMENT_MAX_BYTES);
184                if max_partition_size < segment_size as u64 {
185                    return Some(format!(
186                        "max_partition_size {max_partition_size} is less than segment size {segment_size}"
187                    ));
188                }
189            }
190        }
191
192        None
193    }
194}
195
/// How a topic's partitions/replicas are laid out.
#[derive(Debug, Clone, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum ReplicaSpec {
    /// Partition-to-SPU maps supplied explicitly by the user.
    #[cfg_attr(feature = "use_serde", serde(rename = "assigned"))]
    #[fluvio(tag = 0)]
    Assigned(PartitionMaps),
    /// Placement computed from partition count / replication factor.
    #[cfg_attr(feature = "use_serde", serde(rename = "computed"))]
    #[fluvio(tag = 1)]
    Computed(TopicReplicaParam),
    /// Mirror topic (home or remote side).
    #[cfg_attr(
        feature = "use_serde",
        serde(rename = "mirror", with = "serde_yaml::with::singleton_map")
    )]
    #[fluvio(tag = 2)]
    #[fluvio(min_version = 14)]
    Mirror(MirrorConfig),
}
213
// Human-readable label in the form "<kind>::<details>".
impl std::fmt::Display for ReplicaSpec {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            Self::Assigned(partition_map) => write!(f, "assigned::{partition_map}"),
            Self::Computed(param) => write!(f, "computed::({param})"),
            Self::Mirror(param) => write!(f, "mirror::({param})"),
        }
    }
}
223
// Default layout is computed placement with default parameters.
impl Default for ReplicaSpec {
    fn default() -> Self {
        Self::Computed(TopicReplicaParam::default())
    }
}
229
impl ReplicaSpec {
    /// Build an assigned spec from anything convertible to `PartitionMaps`.
    pub fn new_assigned<J>(partition_map: J) -> Self
    where
        J: Into<PartitionMaps>,
    {
        Self::Assigned(partition_map.into())
    }

    /// Build a computed spec; `ignore_rack` defaults to `false` when `None`.
    pub fn new_computed(
        partitions: PartitionCount,
        replication: ReplicationFactor,
        ignore_rack: Option<IgnoreRackAssignment>,
    ) -> Self {
        Self::Computed((partitions, replication, ignore_rack.unwrap_or(false)).into())
    }

    pub fn is_computed(&self) -> bool {
        matches!(self, Self::Computed(_))
    }

    /// Number of partitions, regardless of layout kind.
    pub fn partitions(&self) -> PartitionCount {
        match &self {
            Self::Computed(param) => param.partitions,
            Self::Assigned(partition_map) => partition_map.partition_count(),
            Self::Mirror(partition_map) => partition_map.partition_count(),
        }
    }

    /// Replication factor, if one is defined for this layout kind.
    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
        match self {
            Self::Computed(param) => Some(param.replication_factor),
            Self::Assigned(partition_map) => partition_map.replication_factor(),
            Self::Mirror(partition_map) => partition_map.replication_factor(),
        }
    }

    /// Rack assignment is only configurable for computed layouts.
    pub fn ignore_rack_assignment(&self) -> IgnoreRackAssignment {
        match self {
            Self::Computed(param) => param.ignore_rack_assignment,
            Self::Assigned(_) => false,
            Self::Mirror(_) => false,
        }
    }

    /// Short kind label used in displays (mirror kind depends on direction).
    pub fn type_label(&self) -> &'static str {
        match self {
            Self::Computed(_) => "computed",
            Self::Assigned(_) => "assigned",
            Self::Mirror(mirror) => match mirror {
                MirrorConfig::Remote(remote_config) => {
                    if remote_config.target {
                        "from-home"
                    } else {
                        "to-home"
                    }
                }
                MirrorConfig::Home(home_config) => {
                    if home_config.0.source {
                        "to-remote"
                    } else {
                        "from-remote"
                    }
                }
            },
        }
    }

    /// Partition count for display; empty for assigned/mirror layouts.
    pub fn partitions_display(&self) -> String {
        match self {
            Self::Computed(param) => param.partitions.to_string(),
            Self::Assigned(_) => "".to_owned(),
            Self::Mirror(_) => "".to_owned(),
        }
    }

    /// Replication factor for display; empty for assigned/mirror layouts.
    pub fn replication_factor_display(&self) -> String {
        match self {
            Self::Computed(param) => param.replication_factor.to_string(),
            Self::Assigned(_) => "".to_owned(),
            Self::Mirror(_) => "".to_owned(),
        }
    }

    /// "yes" when rack assignment is ignored on a computed layout, else "".
    pub fn ignore_rack_assign_display(&self) -> &'static str {
        match self {
            Self::Computed(param) => {
                if param.ignore_rack_assignment {
                    "yes"
                } else {
                    ""
                }
            }
            Self::Assigned(_) => "",
            Self::Mirror(_) => "",
        }
    }

    /// String form of the partition map; `None` for computed layouts.
    pub fn partition_map_str(&self) -> Option<String> {
        match self {
            Self::Computed(_) => None,
            Self::Assigned(partition_map) => Some(partition_map.partition_map_string()),
            Self::Mirror(mirror) => Some(mirror.as_partition_maps().partition_map_string()),
        }
    }

    // -----------------------------------
    //  Parameter validation
    // -----------------------------------

    /// Validate partitions
    pub fn valid_partition(partitions: &PartitionCount) -> Result<()> {
        if *partitions == 0 {
            return Err(anyhow!("partition must be greater than 0"));
        }

        Ok(())
    }

    /// Validate replication factor
    pub fn valid_replication_factor(replication: &ReplicationFactor) -> Result<()> {
        if *replication == 0 {
            return Err(anyhow!("replication factor must be greater than 0"));
        }

        Ok(())
    }
}
357
/// Parameters for a computed topic layout.
#[derive(Debug, Clone, Default, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct TopicReplicaParam {
    // Number of partitions; serde default is 1.
    #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
    pub partitions: PartitionCount,
    // Replicas per partition; serde default is 1.
    #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
    pub replication_factor: ReplicationFactor,
    // Skipped during serialization when true is cloned as the skip test;
    // defaults to false.
    #[cfg_attr(
        feature = "use_serde",
        serde(skip_serializing_if = "bool::clone", default)
    )]
    pub ignore_rack_assignment: IgnoreRackAssignment,
}
376
// Serde default for `partitions` / `replication_factor` (used only with
// the `use_serde` feature, hence the dead_code allowance).
#[allow(dead_code)]
fn default_count() -> u32 {
    1
}
381
impl TopicReplicaParam {
    /// Construct replica parameters from explicit values.
    pub fn new(
        partitions: PartitionCount,
        replication_factor: ReplicationFactor,
        ignore_rack_assignment: IgnoreRackAssignment,
    ) -> Self {
        Self {
            partitions,
            replication_factor,
            ignore_rack_assignment,
        }
    }
}
395
396impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicReplicaParam {
397    fn from(value: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
398        let (partitions, replication_factor, ignore_rack) = value;
399        Self::new(partitions, replication_factor, ignore_rack)
400    }
401}
402
// Compact display: "replica param::(p:<partitions>, r:<replication>)".
impl std::fmt::Display for TopicReplicaParam {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "replica param::(p:{}, r:{})",
            self.partitions, self.replication_factor
        )
    }
}
412
/// Hack: field instead of new type to get around encode and decode limitations
/// Wrapper over a list of per-partition replica assignments.
#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(transparent)
)]
pub struct PartitionMaps(Vec<PartitionMap>);
421
// Wrap a plain list of partition maps.
impl From<Vec<PartitionMap>> for PartitionMaps {
    fn from(maps: Vec<PartitionMap>) -> Self {
        Self(maps)
    }
}
427
// Unwrap back into the inner list.
impl From<PartitionMaps> for Vec<PartitionMap> {
    fn from(maps: PartitionMaps) -> Self {
        maps.0
    }
}
433
// Build maps from (partition id, replica spu ids) pairs; no mirror config.
impl From<Vec<(PartitionId, Vec<SpuId>)>> for PartitionMaps {
    fn from(partition_vec: Vec<(PartitionId, Vec<SpuId>)>) -> Self {
        let maps: Vec<PartitionMap> = partition_vec
            .into_iter()
            .map(|(id, replicas)| PartitionMap {
                id,
                replicas,
                mirror: None,
            })
            .collect();
        maps.into()
    }
}
447
448impl std::fmt::Display for PartitionMaps {
449    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
450        write!(f, "partition map:{})", self.0.len())
451    }
452}
453
454impl PartitionMaps {
455    pub fn maps(&self) -> &Vec<PartitionMap> {
456        &self.0
457    }
458
459    pub fn maps_owned(self) -> Vec<PartitionMap> {
460        self.0
461    }
462
463    fn partition_count(&self) -> PartitionCount {
464        self.0.len() as PartitionCount
465    }
466
467    fn replication_factor(&self) -> Option<ReplicationFactor> {
468        // compute replication form replica map
469        self.0
470            .first()
471            .map(|partition| partition.replicas.len() as ReplicationFactor)
472    }
473
474    fn partition_map_string(&self) -> String {
475        use std::fmt::Write;
476
477        let mut res = String::new();
478        for partition in &self.0 {
479            write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap();
480            // ok to unwrap since this will not fail
481        }
482        if !res.is_empty() {
483            res.truncate(res.len() - 2);
484        }
485        res
486    }
487
488    // -----------------------------------
489    // Partition Map - Operations
490    // -----------------------------------
491
492    /// Generate a vector with all spu ids represented by all partitions (no duplicates)
493    pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
494        let mut spu_ids: Vec<SpuId> = vec![];
495
496        for partition in &self.0 {
497            for spu in &partition.replicas {
498                if !spu_ids.contains(spu) {
499                    spu_ids.push(*spu);
500                }
501            }
502        }
503
504        spu_ids
505    }
506
507    #[allow(clippy::explicit_counter_loop)]
508    /// Validate partition map for assigned topics
509    pub fn validate(&self) -> Result<()> {
510        // there must be at least one partition in the partition map
511        if self.0.is_empty() {
512            return Err(anyhow!("no assigned partitions found"));
513        }
514
515        // assigned partitions must meet the following criteria
516        //  ids:
517        //      - must start with 0
518        //      - must be in sequence, without gaps
519        //  replicas:
520        //      - must have at least one element
521        //      - all replicas must have the same number of elements.
522        //      - all elements must be unique
523        //      - all elements must be positive integers
524        let mut id = 0;
525        let mut replica_len = 0;
526        for partition in &self.0 {
527            if id == 0 {
528                // id must be 0
529                if partition.id != id {
530                    return Err(anyhow!("assigned partitions must start with id 0",));
531                }
532
533                // replica must have elements
534                replica_len = partition.replicas.len();
535                if replica_len == 0 {
536                    return Err(anyhow!("assigned replicas must have at least one spu id",));
537                }
538            } else {
539                // id must be in sequence
540                if partition.id != id {
541                    return Err(anyhow!(
542                        "assigned partition ids must be in sequence and without gaps"
543                    ));
544                }
545
546                // replica must have same number of elements as previous one
547                if partition.replicas.len() != replica_len {
548                    return Err(anyhow!(
549                        "all assigned replicas must have the same number of spu ids: {replica_len}"
550                    ));
551                }
552            }
553
554            // all replica ids must be unique
555            let mut sorted_replicas = partition.replicas.clone();
556            sorted_replicas.sort_unstable();
557            let unique_count = 1 + sorted_replicas
558                .windows(2)
559                .filter(|pair| pair[0] != pair[1])
560                .count();
561            if partition.replicas.len() != unique_count {
562                return Err(anyhow!(format!(
563                    "duplicate spu ids found in assigned partition with id: {id}"
564                ),));
565            }
566
567            // all ids must be positive numbers
568            for spu_id in &partition.replicas {
569                if *spu_id < 0 {
570                    return Err(anyhow!(
571                        "invalid spu id: {spu_id} in assigned partition with id: {id}"
572                    ));
573                }
574            }
575
576            // increment id for next iteration
577            id += 1;
578        }
579
580        Ok(())
581    }
582}
583
584impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicSpec {
585    fn from(spec: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
586        let (count, factor, rack) = spec;
587        Self::new_computed(count, factor, Some(rack))
588    }
589}
590
591/// convert from tuple with partition and replication with rack off
592impl From<(PartitionCount, ReplicationFactor)> for TopicSpec {
593    fn from(spec: (PartitionCount, ReplicationFactor)) -> Self {
594        let (count, factor) = spec;
595        Self::new_computed(count, factor, Some(false))
596    }
597}
598
/// Replica assignment for a single partition.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema)
)]
pub struct PartitionMap {
    // Partition index within the topic.
    pub id: PartitionId,
    // SPU ids hosting the replicas; first entry is conventionally the leader
    // in assigned maps — TODO confirm against scheduler code.
    pub replicas: Vec<SpuId>,
    // Present only for mirror topics.
    #[cfg_attr(
        feature = "use_serde",
        serde(default, skip_serializing_if = "Option::is_none")
    )]
    #[fluvio(min_version = 14)]
    pub mirror: Option<PartitionMirrorConfig>,
}
615
/// Mirror configuration: which side of the mirror relationship this topic is.
#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub enum MirrorConfig {
    /// Remote-cluster side of the mirror.
    #[fluvio(tag = 0)]
    Remote(RemoteMirrorConfig),
    /// Home-cluster side of the mirror.
    #[fluvio(tag = 1)]
    Home(HomeMirrorConfig),
}
628
// Default to an (empty) remote mirror configuration.
impl Default for MirrorConfig {
    fn default() -> Self {
        Self::Remote(RemoteMirrorConfig::default())
    }
}
634
// Debug-style display of whichever mirror side is configured.
impl std::fmt::Display for MirrorConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            MirrorConfig::Remote(r) => {
                write!(f, "Mirror Remote {r:?} ")
            }
            MirrorConfig::Home(h) => {
                write!(f, "Mirror Home {h:?} ")
            }
        }
    }
}
647
impl MirrorConfig {
    /// Partition count, delegated to the configured mirror side.
    pub fn partition_count(&self) -> PartitionCount {
        match self {
            MirrorConfig::Remote(src) => src.partition_count(),
            MirrorConfig::Home(tg) => tg.partition_count(),
        }
    }

    /// Mirror topics never report a replication factor.
    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
        None
    }

    /// Partition maps derived from the configured mirror side.
    pub fn as_partition_maps(&self) -> PartitionMaps {
        match self {
            MirrorConfig::Remote(src) => src.as_partition_maps(),
            MirrorConfig::Home(tg) => tg.as_partition_maps(),
        }
    }

    /// Set home to remote replication
    ///
    /// Errors on the remote side: only a home config carries this flag.
    pub fn set_home_to_remote(&mut self, home_to_remote: bool) -> Result<()> {
        match self {
            Self::Remote(_) => Err(anyhow!(
                "remote mirror config cannot be set to home to remote"
            )),
            Self::Home(home) => {
                home.set_home_to_remote(home_to_remote);
                Ok(())
            }
        }
    }

    /// Validate partition map for assigned topics
    // currently a no-op placeholder
    pub fn validate(&self) -> anyhow::Result<()> {
        Ok(())
    }
}
685
type Partitions = Vec<HomePartitionConfig>;

// Deserialization helper: accepts either the legacy bare-array form (V1)
// or the current object form (V2) of the home mirror config.
// `Deserialize` is implemented manually in the cfg_if block below.
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize),
    serde(rename_all = "camelCase", untagged)
)]
enum MultiHome {
    V1(Partitions),
    V2(HomeMirrorInner),
}
697
/// Home-cluster mirror configuration; newtype over `HomeMirrorInner`
/// so deserialization can also accept the legacy bare-array form.
#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorConfig(
    #[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner,
);
707
// Expose the inner config's methods/fields directly.
impl Deref for HomeMirrorConfig {
    type Target = HomeMirrorInner;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
715
// Mutable access to the inner config.
impl DerefMut for HomeMirrorConfig {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
721
722impl HomeMirrorConfig {
723    /// generate home config from simple mirror cluster list
724    /// this uses home topic to generate remote replicas
725    pub fn from_simple(topic: &str, remote_clusters: Vec<String>) -> Self {
726        Self(HomeMirrorInner {
727            partitions: remote_clusters
728                .into_iter()
729                .map(|remote_cluster| HomePartitionConfig {
730                    remote_cluster,
731                    remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() },
732                    ..Default::default()
733                })
734                .collect(),
735            source: false,
736        })
737    }
738}
739
cfg_if::cfg_if! {
    if #[cfg(feature = "use_serde")] {
        // Manual Deserialize: a sequence is treated as the legacy V1
        // partition list, a map as the current V2 object form.
        impl<'de> serde::Deserialize<'de> for MultiHome {
            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
            where
                D: serde::Deserializer<'de>,
            {
                struct MultiHomeVisitor;

                impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor {
                    type Value = MultiHome;

                    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                        formatter.write_str("an array or an object")
                    }

                    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
                    where
                        A: serde::de::SeqAccess<'de>,
                    {
                        let mut elements = vec![];
                        while let Some(value) = seq.next_element::<HomePartitionConfig>()? {
                            elements.push(value);
                        }
                        Ok(MultiHome::V1(elements))
                    }

                    fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
                    where
                        M: serde::de::MapAccess<'de>,
                    {
                        use serde::de::value::MapAccessDeserializer;
                        let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?;
                        Ok(MultiHome::V2(obj))
                    }
                }

                deserializer.deserialize_any(MultiHomeVisitor)
            }
        }

        // `deserialize_with` hook on HomeMirrorConfig's field: normalizes
        // either accepted form into HomeMirrorInner (V1 implies source=false).
        fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            let home: MultiHome = serde::Deserialize::deserialize(deserializer)?;
            match home {
                MultiHome::V1(v1) => Ok(HomeMirrorInner {
                    partitions: v1,
                    source: false,
                }),
                MultiHome::V2(v2) => Ok(v2),
            }
        }
    }
}
796
/// Inner payload of the home-cluster mirror configuration.
#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorInner {
    // One entry per mirrored partition.
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub partitions: Vec<HomePartitionConfig>,
    #[cfg_attr(
        feature = "use_serde",
        serde(skip_serializing_if = "crate::is_false", default)
    )]
    #[fluvio(min_version = 18)]
    pub source: bool, // source of mirror
}
813
// Wrap a partition list into a home config with `source` off.
impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
    fn from(partitions: Vec<HomePartitionConfig>) -> Self {
        Self(HomeMirrorInner {
            partitions,
            source: false,
        })
    }
}
822
823impl HomeMirrorInner {
824    pub fn partition_count(&self) -> PartitionCount {
825        self.partitions.len() as PartitionCount
826    }
827
828    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
829        None
830    }
831
832    pub fn partitions(&self) -> &Vec<HomePartitionConfig> {
833        &self.partitions
834    }
835
836    pub fn as_partition_maps(&self) -> PartitionMaps {
837        let mut maps = vec![];
838        for (partition_id, home_partition) in self.partitions.iter().enumerate() {
839            maps.push(PartitionMap {
840                id: partition_id as u32,
841                mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())),
842                ..Default::default()
843            });
844        }
845        maps.into()
846    }
847
848    /// Validate partition map for assigned topics
849    pub fn validate(&self) -> anyhow::Result<()> {
850        Ok(())
851    }
852
853    /// Add partition to home mirror config
854    pub fn add_partition(&mut self, partition: HomePartitionConfig) {
855        self.partitions.push(partition);
856    }
857
858    /// set home to remote replication
859    pub fn set_home_to_remote(&mut self, home_to_remote: bool) {
860        self.source = home_to_remote;
861        self.partitions.iter_mut().for_each(|partition| {
862            partition.source = home_to_remote;
863        });
864    }
865}
866
/// Per-partition list of remote clusters mirroring from home.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomeMirrorPartition {
    pub remote_clusters: Vec<String>,
}
876
/// Remote-cluster mirror configuration: which home cluster/SPUs to mirror with.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct RemoteMirrorConfig {
    // source of mirror
    pub home_cluster: String,
    // One home SPU per mirrored partition.
    pub home_spus: Vec<SpuMirrorConfig>,
    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "crate::is_false"))]
    #[fluvio(min_version = 18)]
    pub target: bool,
}
891
/// Identity and endpoint of a home SPU participating in mirroring.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct SpuMirrorConfig {
    pub id: SpuId,
    pub key: String,
    pub endpoint: String,
}
903
904impl RemoteMirrorConfig {
905    pub fn partition_count(&self) -> PartitionCount {
906        self.home_spus.len() as PartitionCount
907    }
908
909    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
910        None
911    }
912
913    pub fn spus(&self) -> &Vec<SpuMirrorConfig> {
914        &self.home_spus
915    }
916
917    pub fn as_partition_maps(&self) -> PartitionMaps {
918        let mut maps = vec![];
919        for (partition_id, home_spu) in self.home_spus.iter().enumerate() {
920            maps.push(PartitionMap {
921                id: partition_id as u32,
922                mirror: Some(PartitionMirrorConfig::Remote(RemotePartitionConfig {
923                    home_spu_key: home_spu.key.clone(),
924                    home_spu_id: home_spu.id,
925                    home_cluster: self.home_cluster.clone(),
926                    home_spu_endpoint: home_spu.endpoint.clone(),
927                    target: self.target,
928                })),
929                ..Default::default()
930            });
931        }
932        maps.into()
933    }
934
935    /// Validate partition map for assigned topics
936    pub fn validate(&self) -> anyhow::Result<()> {
937        Ok(())
938    }
939}
940
/// Retention/cleanup policy for a topic; currently only segment-based.
#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
pub enum CleanupPolicy {
    #[cfg_attr(feature = "use_serde", serde(rename = "segment"))]
    #[fluvio(tag = 0)]
    Segment(SegmentBasedPolicy),
}
948
// Default policy: segment-based with default (zero) retention.
impl Default for CleanupPolicy {
    fn default() -> Self {
        CleanupPolicy::Segment(SegmentBasedPolicy::default())
    }
}
954
955impl CleanupPolicy {
956    pub fn retention_secs(&self) -> u32 {
957        match self {
958            CleanupPolicy::Segment(policy) => policy.retention_secs(),
959        }
960    }
961}
962
/// Segment-based cleanup policy: retain records for a fixed time window.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct SegmentBasedPolicy {
    /// Retention time in seconds.
    pub time_in_seconds: u32,
}
972
impl SegmentBasedPolicy {
    /// Returns the configured retention window in seconds.
    pub fn retention_secs(&self) -> u32 {
        self.time_in_seconds
    }
}
978
/// Per-topic storage limits.
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct TopicStorageConfig {
    /// Maximum size of a single log segment, in bytes (`None` = cluster default).
    pub segment_size: Option<u32>,
    /// Maximum total size of a partition, in bytes (`None` = cluster default).
    pub max_partition_size: Option<u64>,
}
989
/// Record compression algorithms supported for a topic.
///
/// NOTE: the `#[fluvio(tag = ...)]` values are part of the wire format;
/// new variants must be appended with new tags, never renumbered.
#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    derive(schemars::JsonSchema)
)]
pub enum CompressionAlgorithm {
    /// No compression.
    #[fluvio(tag = 0)]
    None,
    #[fluvio(tag = 1)]
    Gzip,
    #[fluvio(tag = 2)]
    Snappy,
    #[fluvio(tag = 3)]
    Lz4,
    /// Default: does not pin a specific algorithm.
    #[default]
    #[fluvio(tag = 4)]
    Any,
    #[fluvio(tag = 5)]
    Zstd,
}
1011
/// Error returned when parsing an unrecognized compression algorithm name.
#[derive(Debug, thiserror::Error)]
#[error("Invalid compression type in topic")]
pub struct InvalidCompressionAlgorithm;
1015
1016impl std::str::FromStr for CompressionAlgorithm {
1017    type Err = InvalidCompressionAlgorithm;
1018
1019    fn from_str(s: &str) -> Result<Self, Self::Err> {
1020        match s.to_lowercase().as_str() {
1021            "none" => Ok(CompressionAlgorithm::None),
1022            "gzip" => Ok(CompressionAlgorithm::Gzip),
1023            "snappy" => Ok(CompressionAlgorithm::Snappy),
1024            "lz4" => Ok(CompressionAlgorithm::Lz4),
1025            "any" => Ok(CompressionAlgorithm::Any),
1026            "zstd" => Ok(CompressionAlgorithm::Zstd),
1027            _ => Err(InvalidCompressionAlgorithm),
1028        }
1029    }
1030}
1031impl std::fmt::Display for CompressionAlgorithm {
1032    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
1033        match self {
1034            Self::None => write!(f, "none"),
1035            Self::Gzip => write!(f, "gzip"),
1036            Self::Snappy => write!(f, "snappy"),
1037            Self::Lz4 => write!(f, "lz4"),
1038            Self::Any => write!(f, "any"),
1039            Self::Zstd => write!(f, "zstd"),
1040        }
1041    }
1042}
1043
// Unit tests for replica specs, partition-map validation, wire-format
// encode/decode, and mirror config deserialization.
#[cfg(test)]
mod test {

    use std::io::Cursor;

    use crate::topic::{Bounds, Filter, Transform};

    use super::*;

    // Assigned specs are not "computed"; computed specs are.
    #[test]
    fn test_is_computed_topic() {
        let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
        let t1 = ReplicaSpec::new_assigned(p1);
        assert!(!t1.is_computed());

        let t2 = ReplicaSpec::new_computed(0, 0, None);
        assert!(t2.is_computed());
    }

    // Partition count and replication factor must both be > 0.
    #[test]
    fn test_valid_computed_replica_params() {
        // 0 is not a valid partition
        let t2_result = ReplicaSpec::valid_partition(&0);
        assert!(t2_result.is_err());
        assert_eq!(
            format!("{}", t2_result.unwrap_err()),
            "partition must be greater than 0"
        );

        let t3_result = ReplicaSpec::valid_partition(&1);
        assert!(t3_result.is_ok());

        // 0 is not a valid replication factor
        let t5_result = ReplicaSpec::valid_replication_factor(&0);
        assert!(t5_result.is_err());
        assert_eq!(
            format!("{}", t5_result.unwrap_err()),
            "replication factor must be greater than 0"
        );

        // positive numbers are OK
        let t6_result = ReplicaSpec::valid_replication_factor(&1);
        assert!(t6_result.is_ok());
    }

    //  Replica Map ids:
    //      - must start with 0
    //      - must be in sequence, without gaps
    #[test]
    fn test_replica_map_ids() {
        // id starts from 1 rather than 0
        let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
        let p1_result = p1.validate();
        assert!(p1_result.is_err());
        assert_eq!(
            format!("{}", p1_result.unwrap_err()),
            "assigned partitions must start with id 0"
        );

        // id has a gap
        let p2: PartitionMaps = vec![(0, vec![0]), (2, vec![2])].into();
        let p2_result = p2.validate();
        assert!(p2_result.is_err());
        assert_eq!(
            format!("{}", p2_result.unwrap_err()),
            "assigned partition ids must be in sequence and without gaps"
        );

        // ids are out of sequence
        let p3: PartitionMaps = vec![(0, vec![0]), (2, vec![2]), (1, vec![1])].into();
        let p3_result = p3.validate();
        assert!(p3_result.is_err());
        assert_eq!(
            format!("{}", p3_result.unwrap_err()),
            "assigned partition ids must be in sequence and without gaps"
        );

        // duplicate ids
        let p4: PartitionMaps = vec![(0, vec![0]), (1, vec![1]), (1, vec![1])].into();
        let p4_result = p4.validate();
        assert!(p4_result.is_err());
        assert_eq!(
            format!("{}", p4_result.unwrap_err()),
            "assigned partition ids must be in sequence and without gaps"
        );

        // ids are ok
        let p5: PartitionMaps = vec![(0, vec![1]), (1, vec![1]), (2, vec![2])].into();
        let p5_result = p5.validate();
        assert!(p5_result.is_ok());
    }

    //  Replica Map replicas:
    //      - replicas must have at least one element
    //      - all replicas must have the same number of elements
    //      - all elements must be unique
    //      - all elements must be positive integers
    #[test]
    fn test_replica_map_spu_ids() {
        // replicas must have at least one element
        let p1: PartitionMaps = vec![(0, vec![]), (1, vec![1])].into();
        let p1_result = p1.validate();
        assert!(p1_result.is_err());
        assert_eq!(
            format!("{}", p1_result.unwrap_err()),
            "assigned replicas must have at least one spu id"
        );

        // all replicas must have the same number of elements
        let p2: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1])].into();
        let p2_result = p2.validate();
        assert!(p2_result.is_err());
        assert_eq!(
            format!("{}", p2_result.unwrap_err()),
            "all assigned replicas must have the same number of spu ids: 2"
        );

        // all elements must be unique
        let p3: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, 1])].into();
        let p3_result = p3.validate();
        assert!(p3_result.is_err());
        assert_eq!(
            format!("{}", p3_result.unwrap_err()),
            "duplicate spu ids found in assigned partition with id: 1"
        );

        // all elements must be unique
        let p4: PartitionMaps = vec![(0, vec![3, 1, 2, 3])].into();
        let p4_result = p4.validate();
        assert!(p4_result.is_err());
        assert_eq!(
            format!("{}", p4_result.unwrap_err()),
            "duplicate spu ids found in assigned partition with id: 0"
        );

        // all elements must be positive integers
        let p5: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, -2])].into();
        let p5_result = p5.validate();
        assert!(p5_result.is_err());
        assert_eq!(
            format!("{}", p5_result.unwrap_err()),
            "invalid spu id: -2 in assigned partition with id: 1"
        );
    }

    // Partitions repeatedly reference spu-ids. The purpose of
    // this API is to return a list of all unique SPUs
    #[test]
    fn test_unique_spus_in_partition_map() {
        // partitions reference overlapping spu ids; expect deduplicated list
        // in first-seen order
        let p1: PartitionMaps =
            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();

        let p1_result = p1.unique_spus_in_partition_map();
        let expected_p1_result: Vec<SpuId> = vec![0, 1, 3, 2, 4];
        assert_eq!(p1_result, expected_p1_result);
    }

    // Round-trips a computed replica spec through the wire format and
    // checks the exact encoded byte layout.
    #[test]
    fn test_encode_decode_computed_topic_spec() {
        let topic_spec = ReplicaSpec::Computed((2, 3, true).into());
        let mut dest = vec![];

        // test encode
        let result = topic_spec.encode(&mut dest, 0);
        assert!(result.is_ok());

        let expected_dest = [
            0x01, // type
            0x00, 0x00, 0x00, 0x02, // partition cnt
            0x00, 0x00, 0x00, 0x03, // replica cnt
            0x01, // ignore_rack_assignment
        ];
        assert_eq!(dest, expected_dest);

        // test decode
        let mut topic_spec_decoded = ReplicaSpec::default();
        let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
        assert!(result.is_ok());

        match topic_spec_decoded {
            ReplicaSpec::Computed(param) => {
                assert_eq!(param.partitions, 2);
                assert_eq!(param.replication_factor, 3);
                assert!(param.ignore_rack_assignment);
            }
            _ => panic!("expect computed topic spec, found {topic_spec_decoded:?}"),
        }
    }

    // `deduplication` was added at version 12; encoding for an older
    // version must drop it so decode yields `None`.
    #[test]
    fn test_topic_with_dedup_prev_version_compatibility() {
        //given
        let prev_version = 11;
        let mut topic_spec: TopicSpec = ReplicaSpec::Computed((2, 3, true).into()).into();
        topic_spec.set_deduplication(Some(Deduplication {
            bounds: Bounds {
                count: 1,
                age: None,
            },
            filter: Filter {
                transform: Transform {
                    uses: "filter".to_string(),
                    ..Default::default()
                },
            },
        }));

        //when
        let mut dest = vec![];
        topic_spec.encode(&mut dest, prev_version).expect("encoded");
        let mut topic_spec_decoded = TopicSpec::default();
        topic_spec_decoded
            .decode(&mut Cursor::new(&dest), prev_version)
            .expect("decoded");

        //then
        assert!(topic_spec_decoded.deduplication.is_none());
    }

    // Human-readable rendering of assigned partition maps.
    #[test]
    fn test_partition_map_str() {
        // Test multiple
        let p1: PartitionMaps =
            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
        let spec = ReplicaSpec::new_assigned(p1);
        assert_eq!(
            spec.partition_map_str(),
            Some("0:[0, 1, 3], 1:[0, 2, 3], 2:[1, 3, 4]".to_string())
        );

        // Test empty
        let p2 = PartitionMaps::default();
        let spec2 = ReplicaSpec::new_assigned(p2);
        assert_eq!(spec2.partition_map_str(), Some("".to_string()));
    }

    // Camel-cased JSON with per-partition remote cluster/replica entries.
    #[test]
    fn test_deserialize_home_mirror_config() {
        let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#;
        let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize");
        assert_eq!(home_mirror.partitions().len(), 2);
    }
}
1288
#[cfg(test)]
mod mirror_test {
    use crate::{
        topic::{PartitionMap, HomeMirrorConfig},
        partition::{PartitionMirrorConfig, HomePartitionConfig},
    };

    /// test generating home mirror config from simple array of remote cluster strings
    #[test]
    fn test_home_mirror_conversion() {
        let mirror =
            HomeMirrorConfig::from_simple("boats", vec!["boat1".to_owned(), "boat2".to_owned()]);
        // Expect one partition map per remote cluster, ids assigned from 0,
        // each remote replica named "<topic>-0".
        assert_eq!(
            mirror.as_partition_maps(),
            vec![
                PartitionMap {
                    id: 0,
                    mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
                        remote_replica: "boats-0".to_string(),
                        remote_cluster: "boat1".to_owned(),
                        ..Default::default()
                    })),
                    ..Default::default()
                },
                PartitionMap {
                    id: 1,
                    mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
                        remote_replica: "boats-0".to_string(),
                        remote_cluster: "boat2".to_string(),
                        ..Default::default()
                    })),
                    // equivalent to `..Default::default()` — Vec defaults to empty
                    replicas: vec![],
                },
            ]
            .into()
        );
    }
}