fluvio_controlplane_metadata/topic/
spec.rs

1use std::ops::{Deref, DerefMut};
2
3use anyhow::{anyhow, Result};
4
5use fluvio_protocol::record::ReplicaKey;
6use fluvio_types::defaults::{
7    STORAGE_RETENTION_SECONDS, SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN, STORAGE_RETENTION_SECONDS_MIN,
8    SPU_PARTITION_MAX_BYTES_MIN, SPU_LOG_SEGMENT_MAX_BYTES,
9};
10use fluvio_types::SpuId;
11use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
12use fluvio_protocol::{Encoder, Decoder};
13
14use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};
15
16use super::deduplication::Deduplication;
17
18#[derive(Debug, Clone, PartialEq, Default, Encoder, Decoder)]
19#[cfg_attr(
20    feature = "use_serde",
21    derive(serde::Serialize, serde::Deserialize),
22    serde(rename_all = "camelCase")
23)]
24pub struct TopicSpec {
25    replicas: ReplicaSpec,
26    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
27    #[fluvio(min_version = 3)]
28    cleanup_policy: Option<CleanupPolicy>,
29    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
30    #[fluvio(min_version = 4)]
31    storage: Option<TopicStorageConfig>,
32    #[cfg_attr(feature = "use_serde", serde(default))]
33    #[fluvio(min_version = 6)]
34    compression_type: CompressionAlgorithm,
35    #[cfg_attr(feature = "use_serde", serde(default))]
36    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
37    #[fluvio(min_version = 12)]
38    deduplication: Option<Deduplication>,
39    #[cfg_attr(feature = "use_serde", serde(default))]
40    #[fluvio(min_version = 13)]
41    system: bool,
42}
43
44impl From<ReplicaSpec> for TopicSpec {
45    fn from(replicas: ReplicaSpec) -> Self {
46        Self {
47            replicas,
48            ..Default::default()
49        }
50    }
51}
52
53impl Deref for TopicSpec {
54    type Target = ReplicaSpec;
55
56    fn deref(&self) -> &Self::Target {
57        &self.replicas
58    }
59}
60
61impl TopicSpec {
62    pub fn new_assigned(partition_map: impl Into<PartitionMaps>) -> Self {
63        Self {
64            replicas: ReplicaSpec::new_assigned(partition_map),
65            ..Default::default()
66        }
67    }
68
69    pub fn new_computed(
70        partitions: PartitionCount,
71        replication: ReplicationFactor,
72        ignore_rack: Option<IgnoreRackAssignment>,
73    ) -> Self {
74        Self {
75            replicas: ReplicaSpec::new_computed(partitions, replication, ignore_rack),
76            ..Default::default()
77        }
78    }
79
80    pub fn new_mirror(mirror: MirrorConfig) -> Self {
81        Self {
82            replicas: ReplicaSpec::Mirror(mirror),
83            ..Default::default()
84        }
85    }
86
87    #[inline(always)]
88    pub fn replicas(&self) -> &ReplicaSpec {
89        &self.replicas
90    }
91
92    pub fn set_replicas(&mut self, replicas: ReplicaSpec) {
93        self.replicas = replicas;
94    }
95
96    pub fn set_cleanup_policy(&mut self, policy: CleanupPolicy) {
97        self.cleanup_policy = Some(policy);
98    }
99
100    pub fn get_partition_mirror_map(&self) -> Option<PartitionMaps> {
101        match &self.replicas {
102            ReplicaSpec::Mirror(mirror) => match mirror {
103                MirrorConfig::Remote(e) => Some(e.as_partition_maps()),
104                MirrorConfig::Home(c) => Some(c.as_partition_maps()),
105            },
106            _ => None,
107        }
108    }
109
110    pub fn get_clean_policy(&self) -> Option<&CleanupPolicy> {
111        self.cleanup_policy.as_ref()
112    }
113
114    pub fn set_compression_type(&mut self, compression: CompressionAlgorithm) {
115        self.compression_type = compression;
116    }
117
118    pub fn get_compression_type(&self) -> &CompressionAlgorithm {
119        &self.compression_type
120    }
121
122    pub fn get_storage(&self) -> Option<&TopicStorageConfig> {
123        self.storage.as_ref()
124    }
125
126    pub fn get_storage_mut(&mut self) -> Option<&mut TopicStorageConfig> {
127        self.storage.as_mut()
128    }
129
130    pub fn set_storage(&mut self, storage: TopicStorageConfig) {
131        self.storage = Some(storage);
132    }
133
134    pub fn get_deduplication(&self) -> Option<&Deduplication> {
135        self.deduplication.as_ref()
136    }
137
138    pub fn set_deduplication(&mut self, deduplication: Option<Deduplication>) {
139        self.deduplication = deduplication;
140    }
141
142    pub fn is_system(&self) -> bool {
143        self.system
144    }
145
146    pub fn set_system(&mut self, system: bool) {
147        self.system = system;
148    }
149
150    /// get retention secs that can be displayed
151    pub fn retention_secs(&self) -> u32 {
152        self.get_clean_policy()
153            .map(|policy| policy.retention_secs())
154            .unwrap_or_else(|| STORAGE_RETENTION_SECONDS)
155    }
156
157    /// validate configuration, return string with errors
158    pub fn validate_config(&self) -> Option<String> {
159        if let Some(policy) = self.get_clean_policy() {
160            if policy.retention_secs() < STORAGE_RETENTION_SECONDS_MIN {
161                return Some(format!(
162                    "retention_secs {} is less than minimum {}",
163                    policy.retention_secs(),
164                    STORAGE_RETENTION_SECONDS_MIN
165                ));
166            }
167        }
168
169        if let Some(storage) = self.get_storage() {
170            if let Some(segment_size) = storage.segment_size {
171                if segment_size < SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN {
172                    return Some(format!(
173                        "segment_size {segment_size} is less than minimum {SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN}"
174                    ));
175                }
176            }
177            if let Some(max_partition_size) = storage.max_partition_size {
178                if max_partition_size < SPU_PARTITION_MAX_BYTES_MIN {
179                    return Some(format!(
180                        "max_partition_size {max_partition_size} is less than minimum {SPU_PARTITION_MAX_BYTES_MIN}"
181                    ));
182                }
183                let segment_size = storage.segment_size.unwrap_or(SPU_LOG_SEGMENT_MAX_BYTES);
184                if max_partition_size < segment_size as u64 {
185                    return Some(format!(
186                        "max_partition_size {max_partition_size} is less than segment size {segment_size}"
187                    ));
188                }
189            }
190        }
191
192        None
193    }
194}
195
196#[derive(Debug, Clone, Eq, PartialEq, Encoder, Decoder)]
197#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
198pub enum ReplicaSpec {
199    #[cfg_attr(feature = "use_serde", serde(rename = "assigned"))]
200    #[fluvio(tag = 0)]
201    Assigned(PartitionMaps),
202    #[cfg_attr(feature = "use_serde", serde(rename = "computed"))]
203    #[fluvio(tag = 1)]
204    Computed(TopicReplicaParam),
205    #[cfg_attr(
206        feature = "use_serde",
207        serde(rename = "mirror", with = "serde_yaml::with::singleton_map")
208    )]
209    #[fluvio(tag = 2)]
210    #[fluvio(min_version = 14)]
211    Mirror(MirrorConfig),
212}
213
214impl std::fmt::Display for ReplicaSpec {
215    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
216        match self {
217            Self::Assigned(partition_map) => write!(f, "assigned::{partition_map}"),
218            Self::Computed(param) => write!(f, "computed::({param})"),
219            Self::Mirror(param) => write!(f, "mirror::({param})"),
220        }
221    }
222}
223
224impl Default for ReplicaSpec {
225    fn default() -> Self {
226        Self::Computed(TopicReplicaParam::default())
227    }
228}
229
230impl ReplicaSpec {
231    pub fn new_assigned<J>(partition_map: J) -> Self
232    where
233        J: Into<PartitionMaps>,
234    {
235        Self::Assigned(partition_map.into())
236    }
237
238    pub fn new_computed(
239        partitions: PartitionCount,
240        replication: ReplicationFactor,
241        ignore_rack: Option<IgnoreRackAssignment>,
242    ) -> Self {
243        Self::Computed((partitions, replication, ignore_rack.unwrap_or(false)).into())
244    }
245
246    pub fn is_computed(&self) -> bool {
247        matches!(self, Self::Computed(_))
248    }
249
250    pub fn partitions(&self) -> PartitionCount {
251        match &self {
252            Self::Computed(param) => param.partitions,
253            Self::Assigned(partition_map) => partition_map.partition_count(),
254            Self::Mirror(partition_map) => partition_map.partition_count(),
255        }
256    }
257
258    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
259        match self {
260            Self::Computed(param) => Some(param.replication_factor),
261            Self::Assigned(partition_map) => partition_map.replication_factor(),
262            Self::Mirror(partition_map) => partition_map.replication_factor(),
263        }
264    }
265
266    pub fn ignore_rack_assignment(&self) -> IgnoreRackAssignment {
267        match self {
268            Self::Computed(param) => param.ignore_rack_assignment,
269            Self::Assigned(_) => false,
270            Self::Mirror(_) => false,
271        }
272    }
273
274    pub fn type_label(&self) -> &'static str {
275        match self {
276            Self::Computed(_) => "computed",
277            Self::Assigned(_) => "assigned",
278            Self::Mirror(mirror) => match mirror {
279                MirrorConfig::Remote(remote_config) => {
280                    if remote_config.target {
281                        "from-home"
282                    } else {
283                        "to-home"
284                    }
285                }
286                MirrorConfig::Home(home_config) => {
287                    if home_config.0.source {
288                        "to-remote"
289                    } else {
290                        "from-remote"
291                    }
292                }
293            },
294        }
295    }
296
297    pub fn partitions_display(&self) -> String {
298        match self {
299            Self::Computed(param) => param.partitions.to_string(),
300            Self::Assigned(_) => "".to_owned(),
301            Self::Mirror(_) => "".to_owned(),
302        }
303    }
304
305    pub fn replication_factor_display(&self) -> String {
306        match self {
307            Self::Computed(param) => param.replication_factor.to_string(),
308            Self::Assigned(_) => "".to_owned(),
309            Self::Mirror(_) => "".to_owned(),
310        }
311    }
312
313    pub fn ignore_rack_assign_display(&self) -> &'static str {
314        match self {
315            Self::Computed(param) => {
316                if param.ignore_rack_assignment {
317                    "yes"
318                } else {
319                    ""
320                }
321            }
322            Self::Assigned(_) => "",
323            Self::Mirror(_) => "",
324        }
325    }
326
327    pub fn partition_map_str(&self) -> Option<String> {
328        match self {
329            Self::Computed(_) => None,
330            Self::Assigned(partition_map) => Some(partition_map.partition_map_string()),
331            Self::Mirror(mirror) => Some(mirror.as_partition_maps().partition_map_string()),
332        }
333    }
334
335    // -----------------------------------
336    //  Parameter validation
337    // -----------------------------------
338
339    /// Validate partitions
340    pub fn valid_partition(partitions: &PartitionCount) -> Result<()> {
341        if *partitions == 0 {
342            return Err(anyhow!("partition must be greater than 0"));
343        }
344
345        Ok(())
346    }
347
348    /// Validate replication factor
349    pub fn valid_replication_factor(replication: &ReplicationFactor) -> Result<()> {
350        if *replication == 0 {
351            return Err(anyhow!("replication factor must be greater than 0"));
352        }
353
354        Ok(())
355    }
356}
357
358/// Topic param
359#[derive(Debug, Clone, Default, Eq, PartialEq, Encoder, Decoder)]
360#[cfg_attr(
361    feature = "use_serde",
362    derive(serde::Serialize, serde::Deserialize),
363    serde(rename_all = "camelCase")
364)]
365pub struct TopicReplicaParam {
366    #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
367    pub partitions: PartitionCount,
368    #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
369    pub replication_factor: ReplicationFactor,
370    #[cfg_attr(
371        feature = "use_serde",
372        serde(skip_serializing_if = "bool::clone", default)
373    )]
374    pub ignore_rack_assignment: IgnoreRackAssignment,
375}
376
377#[allow(dead_code)]
378fn default_count() -> u32 {
379    1
380}
381
382impl TopicReplicaParam {
383    pub fn new(
384        partitions: PartitionCount,
385        replication_factor: ReplicationFactor,
386        ignore_rack_assignment: IgnoreRackAssignment,
387    ) -> Self {
388        Self {
389            partitions,
390            replication_factor,
391            ignore_rack_assignment,
392        }
393    }
394}
395
396impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicReplicaParam {
397    fn from(value: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
398        let (partitions, replication_factor, ignore_rack) = value;
399        Self::new(partitions, replication_factor, ignore_rack)
400    }
401}
402
403impl std::fmt::Display for TopicReplicaParam {
404    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
405        write!(
406            f,
407            "replica param::(p:{}, r:{})",
408            self.partitions, self.replication_factor
409        )
410    }
411}
412
413/// Hack: field instead of new type to get around encode and decode limitations
414#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
415#[cfg_attr(
416    feature = "use_serde",
417    derive(serde::Serialize, serde::Deserialize),
418    serde(transparent)
419)]
420pub struct PartitionMaps(Vec<PartitionMap>);
421
422impl From<Vec<PartitionMap>> for PartitionMaps {
423    fn from(maps: Vec<PartitionMap>) -> Self {
424        Self(maps)
425    }
426}
427
428impl From<PartitionMaps> for Vec<PartitionMap> {
429    fn from(maps: PartitionMaps) -> Self {
430        maps.0
431    }
432}
433
434impl From<Vec<(PartitionId, Vec<SpuId>)>> for PartitionMaps {
435    fn from(partition_vec: Vec<(PartitionId, Vec<SpuId>)>) -> Self {
436        let maps: Vec<PartitionMap> = partition_vec
437            .into_iter()
438            .map(|(id, replicas)| PartitionMap {
439                id,
440                replicas,
441                mirror: None,
442            })
443            .collect();
444        maps.into()
445    }
446}
447
448impl std::fmt::Display for PartitionMaps {
449    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
450        write!(f, "partition map:{})", self.0.len())
451    }
452}
453
454impl PartitionMaps {
455    pub fn maps(&self) -> &Vec<PartitionMap> {
456        &self.0
457    }
458
459    pub fn maps_owned(self) -> Vec<PartitionMap> {
460        self.0
461    }
462
463    fn partition_count(&self) -> PartitionCount {
464        self.0.len() as PartitionCount
465    }
466
467    fn replication_factor(&self) -> Option<ReplicationFactor> {
468        // compute replication form replica map
469        self.0
470            .first()
471            .map(|partition| partition.replicas.len() as ReplicationFactor)
472    }
473
474    fn partition_map_string(&self) -> String {
475        use std::fmt::Write;
476
477        let mut res = String::new();
478        for partition in &self.0 {
479            write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap();
480            // ok to unwrap since this will not fail
481        }
482        if !res.is_empty() {
483            res.truncate(res.len() - 2);
484        }
485        res
486    }
487
488    // -----------------------------------
489    // Partition Map - Operations
490    // -----------------------------------
491
492    /// Generate a vector with all spu ids represented by all partitions (no duplicates)
493    pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
494        let mut spu_ids: Vec<SpuId> = vec![];
495
496        for partition in &self.0 {
497            for spu in &partition.replicas {
498                if !spu_ids.contains(spu) {
499                    spu_ids.push(*spu);
500                }
501            }
502        }
503
504        spu_ids
505    }
506
507    #[allow(clippy::explicit_counter_loop)]
508    /// Validate partition map for assigned topics
509    pub fn validate(&self) -> Result<()> {
510        // there must be at least one partition in the partition map
511        if self.0.is_empty() {
512            return Err(anyhow!("no assigned partitions found"));
513        }
514
515        // assigned partitions must meet the following criteria
516        //  ids:
517        //      - must start with 0
518        //      - must be in sequence, without gaps
519        //  replicas:
520        //      - must have at least one element
521        //      - all replicas must have the same number of elements.
522        //      - all elements must be unique
523        //      - all elements must be positive integers
524        let mut id = 0;
525        let mut replica_len = 0;
526        for partition in &self.0 {
527            if id == 0 {
528                // id must be 0
529                if partition.id != id {
530                    return Err(anyhow!("assigned partitions must start with id 0",));
531                }
532
533                // replica must have elements
534                replica_len = partition.replicas.len();
535                if replica_len == 0 {
536                    return Err(anyhow!("assigned replicas must have at least one spu id",));
537                }
538            } else {
539                // id must be in sequence
540                if partition.id != id {
541                    return Err(anyhow!(
542                        "assigned partition ids must be in sequence and without gaps"
543                    ));
544                }
545
546                // replica must have same number of elements as previous one
547                if partition.replicas.len() != replica_len {
548                    return Err(anyhow!(
549                        "all assigned replicas must have the same number of spu ids: {replica_len}"
550                    ));
551                }
552            }
553
554            // all replica ids must be unique
555            let mut sorted_replicas = partition.replicas.clone();
556            sorted_replicas.sort_unstable();
557            let unique_count = 1 + sorted_replicas
558                .windows(2)
559                .filter(|pair| pair[0] != pair[1])
560                .count();
561            if partition.replicas.len() != unique_count {
562                return Err(anyhow!(format!(
563                    "duplicate spu ids found in assigned partition with id: {id}"
564                ),));
565            }
566
567            // all ids must be positive numbers
568            for spu_id in &partition.replicas {
569                if *spu_id < 0 {
570                    return Err(anyhow!(
571                        "invalid spu id: {spu_id} in assigned partition with id: {id}"
572                    ));
573                }
574            }
575
576            // increment id for next iteration
577            id += 1;
578        }
579
580        Ok(())
581    }
582}
583
584impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicSpec {
585    fn from(spec: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
586        let (count, factor, rack) = spec;
587        Self::new_computed(count, factor, Some(rack))
588    }
589}
590
591/// convert from tuple with partition and replication with rack off
592impl From<(PartitionCount, ReplicationFactor)> for TopicSpec {
593    fn from(spec: (PartitionCount, ReplicationFactor)) -> Self {
594        let (count, factor) = spec;
595        Self::new_computed(count, factor, Some(false))
596    }
597}
598
599#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
600#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
601pub struct PartitionMap {
602    pub id: PartitionId,
603    pub replicas: Vec<SpuId>,
604    #[cfg_attr(
605        feature = "use_serde",
606        serde(default, skip_serializing_if = "Option::is_none")
607    )]
608    #[fluvio(min_version = 14)]
609    pub mirror: Option<PartitionMirrorConfig>,
610}
611
612#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
613#[cfg_attr(
614    feature = "use_serde",
615    derive(serde::Serialize, serde::Deserialize),
616    serde(rename_all = "camelCase")
617)]
618pub enum MirrorConfig {
619    #[fluvio(tag = 0)]
620    Remote(RemoteMirrorConfig),
621    #[fluvio(tag = 1)]
622    Home(HomeMirrorConfig),
623}
624
625impl Default for MirrorConfig {
626    fn default() -> Self {
627        Self::Remote(RemoteMirrorConfig::default())
628    }
629}
630
631impl std::fmt::Display for MirrorConfig {
632    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
633        match self {
634            MirrorConfig::Remote(r) => {
635                write!(f, "Mirror Remote {:?} ", r)
636            }
637            MirrorConfig::Home(h) => {
638                write!(f, "Mirror Home {:?} ", h)
639            }
640        }
641    }
642}
643
644impl MirrorConfig {
645    pub fn partition_count(&self) -> PartitionCount {
646        match self {
647            MirrorConfig::Remote(src) => src.partition_count(),
648            MirrorConfig::Home(tg) => tg.partition_count(),
649        }
650    }
651
652    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
653        None
654    }
655
656    pub fn as_partition_maps(&self) -> PartitionMaps {
657        match self {
658            MirrorConfig::Remote(src) => src.as_partition_maps(),
659            MirrorConfig::Home(tg) => tg.as_partition_maps(),
660        }
661    }
662
663    /// Set home to remote replication
664    pub fn set_home_to_remote(&mut self, home_to_remote: bool) -> Result<()> {
665        match self {
666            Self::Remote(_) => Err(anyhow!(
667                "remote mirror config cannot be set to home to remote"
668            )),
669            Self::Home(home) => {
670                home.set_home_to_remote(home_to_remote);
671                Ok(())
672            }
673        }
674    }
675
676    /// Validate partition map for assigned topics
677    pub fn validate(&self) -> anyhow::Result<()> {
678        Ok(())
679    }
680}
681
682type Partitions = Vec<HomePartitionConfig>;
683
684#[cfg_attr(
685    feature = "use_serde",
686    derive(serde::Serialize),
687    serde(rename_all = "camelCase", untagged)
688)]
689enum MultiHome {
690    V1(Partitions),
691    V2(HomeMirrorInner),
692}
693
694#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
695#[cfg_attr(
696    feature = "use_serde",
697    derive(serde::Serialize, serde::Deserialize),
698    serde(rename_all = "camelCase")
699)]
700pub struct HomeMirrorConfig(
701    #[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner,
702);
703
704impl Deref for HomeMirrorConfig {
705    type Target = HomeMirrorInner;
706
707    fn deref(&self) -> &Self::Target {
708        &self.0
709    }
710}
711
712impl DerefMut for HomeMirrorConfig {
713    fn deref_mut(&mut self) -> &mut Self::Target {
714        &mut self.0
715    }
716}
717
718impl HomeMirrorConfig {
719    /// generate home config from simple mirror cluster list
720    /// this uses home topic to generate remote replicas
721    pub fn from_simple(topic: &str, remote_clusters: Vec<String>) -> Self {
722        Self(HomeMirrorInner {
723            partitions: remote_clusters
724                .into_iter()
725                .map(|remote_cluster| HomePartitionConfig {
726                    remote_cluster,
727                    remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() },
728                    ..Default::default()
729                })
730                .collect(),
731            source: false,
732        })
733    }
734}
735
736cfg_if::cfg_if! {
737    if #[cfg(feature = "use_serde")] {
738        impl<'de> serde::Deserialize<'de> for MultiHome {
739            fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
740            where
741                D: serde::Deserializer<'de>,
742            {
743                struct MultiHomeVisitor;
744
745                impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor {
746                    type Value = MultiHome;
747
748                    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
749                        formatter.write_str("an array or an object")
750                    }
751
752                    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
753                    where
754                        A: serde::de::SeqAccess<'de>,
755                    {
756                        let mut elements = vec![];
757                        while let Some(value) = seq.next_element::<HomePartitionConfig>()? {
758                            elements.push(value);
759                        }
760                        Ok(MultiHome::V1(elements))
761                    }
762
763                    fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
764                    where
765                        M: serde::de::MapAccess<'de>,
766                    {
767                        use serde::de::value::MapAccessDeserializer;
768                        let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?;
769                        Ok(MultiHome::V2(obj))
770                    }
771                }
772
773                deserializer.deserialize_any(MultiHomeVisitor)
774            }
775        }
776
777        fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
778        where
779            D: serde::Deserializer<'de>,
780        {
781            let home: MultiHome = serde::Deserialize::deserialize(deserializer)?;
782            match home {
783                MultiHome::V1(v1) => Ok(HomeMirrorInner {
784                    partitions: v1,
785                    source: false,
786                }),
787                MultiHome::V2(v2) => Ok(v2),
788            }
789        }
790    }
791}
792
793#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)]
794#[cfg_attr(
795    feature = "use_serde",
796    derive(serde::Serialize, serde::Deserialize),
797    serde(rename_all = "camelCase")
798)]
799pub struct HomeMirrorInner {
800    #[cfg_attr(feature = "use_serde", serde(default))]
801    pub partitions: Vec<HomePartitionConfig>,
802    #[cfg_attr(
803        feature = "use_serde",
804        serde(skip_serializing_if = "crate::is_false", default)
805    )]
806    #[fluvio(min_version = 18)]
807    pub source: bool, // source of mirror
808}
809
810impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
811    fn from(partitions: Vec<HomePartitionConfig>) -> Self {
812        Self(HomeMirrorInner {
813            partitions,
814            source: false,
815        })
816    }
817}
818
819impl HomeMirrorInner {
820    pub fn partition_count(&self) -> PartitionCount {
821        self.partitions.len() as PartitionCount
822    }
823
824    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
825        None
826    }
827
828    pub fn partitions(&self) -> &Vec<HomePartitionConfig> {
829        &self.partitions
830    }
831
832    pub fn as_partition_maps(&self) -> PartitionMaps {
833        let mut maps = vec![];
834        for (partition_id, home_partition) in self.partitions.iter().enumerate() {
835            maps.push(PartitionMap {
836                id: partition_id as u32,
837                mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())),
838                ..Default::default()
839            });
840        }
841        maps.into()
842    }
843
844    /// Validate partition map for assigned topics
845    pub fn validate(&self) -> anyhow::Result<()> {
846        Ok(())
847    }
848
849    /// Add partition to home mirror config
850    pub fn add_partition(&mut self, partition: HomePartitionConfig) {
851        self.partitions.push(partition);
852    }
853
854    /// set home to remote replication
855    pub fn set_home_to_remote(&mut self, home_to_remote: bool) {
856        self.source = home_to_remote;
857        self.partitions.iter_mut().for_each(|partition| {
858            partition.source = home_to_remote;
859        });
860    }
861}
862
863#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
864#[cfg_attr(
865    feature = "use_serde",
866    derive(serde::Serialize, serde::Deserialize),
867    serde(rename_all = "camelCase")
868)]
869pub struct HomeMirrorPartition {
870    pub remote_clusters: Vec<String>,
871}
872
873#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
874#[cfg_attr(
875    feature = "use_serde",
876    derive(serde::Serialize, serde::Deserialize),
877    serde(rename_all = "camelCase")
878)]
879pub struct RemoteMirrorConfig {
880    // source of mirror
881    pub home_cluster: String,
882    pub home_spus: Vec<SpuMirrorConfig>,
883    #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "crate::is_false"))]
884    #[fluvio(min_version = 18)]
885    pub target: bool,
886}
887
888#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
889#[cfg_attr(
890    feature = "use_serde",
891    derive(serde::Serialize, serde::Deserialize),
892    serde(rename_all = "camelCase")
893)]
894pub struct SpuMirrorConfig {
895    pub id: SpuId,
896    pub key: String,
897    pub endpoint: String,
898}
899
900impl RemoteMirrorConfig {
901    pub fn partition_count(&self) -> PartitionCount {
902        self.home_spus.len() as PartitionCount
903    }
904
905    pub fn replication_factor(&self) -> Option<ReplicationFactor> {
906        None
907    }
908
909    pub fn spus(&self) -> &Vec<SpuMirrorConfig> {
910        &self.home_spus
911    }
912
913    pub fn as_partition_maps(&self) -> PartitionMaps {
914        let mut maps = vec![];
915        for (partition_id, home_spu) in self.home_spus.iter().enumerate() {
916            maps.push(PartitionMap {
917                id: partition_id as u32,
918                mirror: Some(PartitionMirrorConfig::Remote(RemotePartitionConfig {
919                    home_spu_key: home_spu.key.clone(),
920                    home_spu_id: home_spu.id,
921                    home_cluster: self.home_cluster.clone(),
922                    home_spu_endpoint: home_spu.endpoint.clone(),
923                    target: self.target,
924                })),
925                ..Default::default()
926            });
927        }
928        maps.into()
929    }
930
931    /// Validate partition map for assigned topics
932    pub fn validate(&self) -> anyhow::Result<()> {
933        Ok(())
934    }
935}
936
937#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
938#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
939pub enum CleanupPolicy {
940    #[cfg_attr(feature = "use_serde", serde(rename = "segment"))]
941    #[fluvio(tag = 0)]
942    Segment(SegmentBasedPolicy),
943}
944
945impl Default for CleanupPolicy {
946    fn default() -> Self {
947        CleanupPolicy::Segment(SegmentBasedPolicy::default())
948    }
949}
950
951impl CleanupPolicy {
952    pub fn retention_secs(&self) -> u32 {
953        match self {
954            CleanupPolicy::Segment(policy) => policy.retention_secs(),
955        }
956    }
957}
958
959#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
960#[cfg_attr(
961    feature = "use_serde",
962    derive(serde::Serialize, serde::Deserialize),
963    serde(rename_all = "camelCase")
964)]
965pub struct SegmentBasedPolicy {
966    pub time_in_seconds: u32,
967}
968
969impl SegmentBasedPolicy {
970    pub fn retention_secs(&self) -> u32 {
971        self.time_in_seconds
972    }
973}
974
975#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
976#[cfg_attr(
977    feature = "use_serde",
978    derive(serde::Serialize, serde::Deserialize),
979    serde(rename_all = "camelCase")
980)]
981pub struct TopicStorageConfig {
982    pub segment_size: Option<u32>,       // segment size
983    pub max_partition_size: Option<u64>, // max partition size
984}
985
986#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
987#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
988pub enum CompressionAlgorithm {
989    #[fluvio(tag = 0)]
990    None,
991    #[fluvio(tag = 1)]
992    Gzip,
993    #[fluvio(tag = 2)]
994    Snappy,
995    #[fluvio(tag = 3)]
996    Lz4,
997    #[default]
998    #[fluvio(tag = 4)]
999    Any,
1000    #[fluvio(tag = 5)]
1001    Zstd,
1002}
1003
1004#[derive(Debug, thiserror::Error)]
1005#[error("Invalid compression type in topic")]
1006pub struct InvalidCompressionAlgorithm;
1007
1008impl std::str::FromStr for CompressionAlgorithm {
1009    type Err = InvalidCompressionAlgorithm;
1010
1011    fn from_str(s: &str) -> Result<Self, Self::Err> {
1012        match s.to_lowercase().as_str() {
1013            "none" => Ok(CompressionAlgorithm::None),
1014            "gzip" => Ok(CompressionAlgorithm::Gzip),
1015            "snappy" => Ok(CompressionAlgorithm::Snappy),
1016            "lz4" => Ok(CompressionAlgorithm::Lz4),
1017            "any" => Ok(CompressionAlgorithm::Any),
1018            "zstd" => Ok(CompressionAlgorithm::Zstd),
1019            _ => Err(InvalidCompressionAlgorithm),
1020        }
1021    }
1022}
1023impl std::fmt::Display for CompressionAlgorithm {
1024    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
1025        match self {
1026            Self::None => write!(f, "none"),
1027            Self::Gzip => write!(f, "gzip"),
1028            Self::Snappy => write!(f, "snappy"),
1029            Self::Lz4 => write!(f, "lz4"),
1030            Self::Any => write!(f, "any"),
1031            Self::Zstd => write!(f, "zstd"),
1032        }
1033    }
1034}
1035
1036#[cfg(test)]
1037mod test {
1038
1039    use std::io::Cursor;
1040
1041    use crate::topic::{Bounds, Filter, Transform};
1042
1043    use super::*;
1044
1045    #[test]
1046    fn test_is_computed_topic() {
1047        let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
1048        let t1 = ReplicaSpec::new_assigned(p1);
1049        assert!(!t1.is_computed());
1050
1051        let t2 = ReplicaSpec::new_computed(0, 0, None);
1052        assert!(t2.is_computed());
1053    }
1054
1055    #[test]
1056    fn test_valid_computed_replica_params() {
1057        // 0 is not a valid partition
1058        let t2_result = ReplicaSpec::valid_partition(&0);
1059        assert!(t2_result.is_err());
1060        assert_eq!(
1061            format!("{}", t2_result.unwrap_err()),
1062            "partition must be greater than 0"
1063        );
1064
1065        let t3_result = ReplicaSpec::valid_partition(&1);
1066        assert!(t3_result.is_ok());
1067
1068        // 0 is not a valid replication factor
1069        let t5_result = ReplicaSpec::valid_replication_factor(&0);
1070        assert!(t5_result.is_err());
1071        assert_eq!(
1072            format!("{}", t5_result.unwrap_err()),
1073            "replication factor must be greater than 0"
1074        );
1075
1076        // positive numbers are OK
1077        let t6_result = ReplicaSpec::valid_replication_factor(&1);
1078        assert!(t6_result.is_ok());
1079    }
1080
1081    //  Replica Map ids:
1082    //      - must start with 0
1083    //      - must be in sequence, without gaps
1084    #[test]
1085    fn test_replica_map_ids() {
1086        // id starts from 1 rather than 0
1087        let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
1088        let p1_result = p1.validate();
1089        assert!(p1_result.is_err());
1090        assert_eq!(
1091            format!("{}", p1_result.unwrap_err()),
1092            "assigned partitions must start with id 0"
1093        );
1094
1095        // id has a gap
1096        let p2: PartitionMaps = vec![(0, vec![0]), (2, vec![2])].into();
1097        let p2_result = p2.validate();
1098        assert!(p2_result.is_err());
1099        assert_eq!(
1100            format!("{}", p2_result.unwrap_err()),
1101            "assigned partition ids must be in sequence and without gaps"
1102        );
1103
1104        // ids are out of sequence
1105        let p3: PartitionMaps = vec![(0, vec![0]), (2, vec![2]), (1, vec![1])].into();
1106        let p3_result = p3.validate();
1107        assert!(p3_result.is_err());
1108        assert_eq!(
1109            format!("{}", p3_result.unwrap_err()),
1110            "assigned partition ids must be in sequence and without gaps"
1111        );
1112
1113        // duplicate ids
1114        let p4: PartitionMaps = vec![(0, vec![0]), (1, vec![1]), (1, vec![1])].into();
1115        let p4_result = p4.validate();
1116        assert!(p4_result.is_err());
1117        assert_eq!(
1118            format!("{}", p4_result.unwrap_err()),
1119            "assigned partition ids must be in sequence and without gaps"
1120        );
1121
1122        // ids are ok
1123        let p5: PartitionMaps = vec![(0, vec![1]), (1, vec![1]), (2, vec![2])].into();
1124        let p5_result = p5.validate();
1125        assert!(p5_result.is_ok());
1126    }
1127
1128    //  Replica Map replicas:
1129    //      - replicas must have at least one element
1130    //      - all replicas must have the same number of elements
1131    //      - all elements must be unique
1132    //      - all elements must be positive integers
1133    #[test]
1134    fn test_replica_map_spu_ids() {
1135        // replicas must have at least one element
1136        let p1: PartitionMaps = vec![(0, vec![]), (1, vec![1])].into();
1137        let p1_result = p1.validate();
1138        assert!(p1_result.is_err());
1139        assert_eq!(
1140            format!("{}", p1_result.unwrap_err()),
1141            "assigned replicas must have at least one spu id"
1142        );
1143
1144        // all replicas must have the same number of elements
1145        let p2: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1])].into();
1146        let p2_result = p2.validate();
1147        assert!(p2_result.is_err());
1148        assert_eq!(
1149            format!("{}", p2_result.unwrap_err()),
1150            "all assigned replicas must have the same number of spu ids: 2"
1151        );
1152
1153        // all elements must be unique
1154        let p3: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, 1])].into();
1155        let p3_result = p3.validate();
1156        assert!(p3_result.is_err());
1157        assert_eq!(
1158            format!("{}", p3_result.unwrap_err()),
1159            "duplicate spu ids found in assigned partition with id: 1"
1160        );
1161
1162        // all elements must be unique
1163        let p4: PartitionMaps = vec![(0, vec![3, 1, 2, 3])].into();
1164        let p4_result = p4.validate();
1165        assert!(p4_result.is_err());
1166        assert_eq!(
1167            format!("{}", p4_result.unwrap_err()),
1168            "duplicate spu ids found in assigned partition with id: 0"
1169        );
1170
1171        // all elements must be positive integers
1172        let p5: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, -2])].into();
1173        let p5_result = p5.validate();
1174        assert!(p5_result.is_err());
1175        assert_eq!(
1176            format!("{}", p5_result.unwrap_err()),
1177            "invalid spu id: -2 in assigned partition with id: 1"
1178        );
1179    }
1180
1181    // Partitions repeatedly reference spu-ids. The purpose of
1182    // this API is to return a list of all unique SPUs
1183    #[test]
1184    fn test_unique_spus_in_partition_map() {
1185        // id starts from 1 rather than 0
1186        let p1: PartitionMaps =
1187            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
1188
1189        let p1_result = p1.unique_spus_in_partition_map();
1190        let expected_p1_result: Vec<SpuId> = vec![0, 1, 3, 2, 4];
1191        assert_eq!(p1_result, expected_p1_result);
1192    }
1193
1194    #[test]
1195    fn test_encode_decode_computed_topic_spec() {
1196        let topic_spec = ReplicaSpec::Computed((2, 3, true).into());
1197        let mut dest = vec![];
1198
1199        // test encode
1200        let result = topic_spec.encode(&mut dest, 0);
1201        assert!(result.is_ok());
1202
1203        let expected_dest = [
1204            0x01, // type
1205            0x00, 0x00, 0x00, 0x02, // partition cnt
1206            0x00, 0x00, 0x00, 0x03, // replica cnt
1207            0x01, // ignore_rack_assignment
1208        ];
1209        assert_eq!(dest, expected_dest);
1210
1211        // test encode
1212        let mut topic_spec_decoded = ReplicaSpec::default();
1213        let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
1214        assert!(result.is_ok());
1215
1216        match topic_spec_decoded {
1217            ReplicaSpec::Computed(param) => {
1218                assert_eq!(param.partitions, 2);
1219                assert_eq!(param.replication_factor, 3);
1220                assert!(param.ignore_rack_assignment);
1221            }
1222            _ => panic!("expect computed topic spec, found {topic_spec_decoded:?}"),
1223        }
1224    }
1225
1226    #[test]
1227    fn test_topic_with_dedup_prev_version_compatibility() {
1228        //given
1229        let prev_version = 11;
1230        let mut topic_spec: TopicSpec = ReplicaSpec::Computed((2, 3, true).into()).into();
1231        topic_spec.set_deduplication(Some(Deduplication {
1232            bounds: Bounds {
1233                count: 1,
1234                age: None,
1235            },
1236            filter: Filter {
1237                transform: Transform {
1238                    uses: "filter".to_string(),
1239                    ..Default::default()
1240                },
1241            },
1242        }));
1243
1244        //when
1245        let mut dest = vec![];
1246        topic_spec.encode(&mut dest, prev_version).expect("encoded");
1247        let mut topic_spec_decoded = TopicSpec::default();
1248        topic_spec_decoded
1249            .decode(&mut Cursor::new(&dest), prev_version)
1250            .expect("decoded");
1251
1252        //then
1253        assert!(topic_spec_decoded.deduplication.is_none());
1254    }
1255
1256    #[test]
1257    fn test_partition_map_str() {
1258        // Test multiple
1259        let p1: PartitionMaps =
1260            vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
1261        let spec = ReplicaSpec::new_assigned(p1);
1262        assert_eq!(
1263            spec.partition_map_str(),
1264            Some("0:[0, 1, 3], 1:[0, 2, 3], 2:[1, 3, 4]".to_string())
1265        );
1266
1267        // Test empty
1268        let p2 = PartitionMaps::default();
1269        let spec2 = ReplicaSpec::new_assigned(p2);
1270        assert_eq!(spec2.partition_map_str(), Some("".to_string()));
1271    }
1272
1273    #[test]
1274    fn test_deserialize_home_mirror_config() {
1275        let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#;
1276        let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize");
1277        assert_eq!(home_mirror.partitions().len(), 2);
1278    }
1279}
1280
1281#[cfg(test)]
1282mod mirror_test {
1283    use crate::{
1284        topic::{PartitionMap, HomeMirrorConfig},
1285        partition::{PartitionMirrorConfig, HomePartitionConfig},
1286    };
1287
1288    /// test generating home mirror config from simple array of remote cluster strings
1289    #[test]
1290    fn test_home_mirror_conversion() {
1291        let mirror =
1292            HomeMirrorConfig::from_simple("boats", vec!["boat1".to_owned(), "boat2".to_owned()]);
1293        assert_eq!(
1294            mirror.as_partition_maps(),
1295            vec![
1296                PartitionMap {
1297                    id: 0,
1298                    mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
1299                        remote_replica: "boats-0".to_string(),
1300                        remote_cluster: "boat1".to_owned(),
1301                        ..Default::default()
1302                    })),
1303                    ..Default::default()
1304                },
1305                PartitionMap {
1306                    id: 1,
1307                    mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
1308                        remote_replica: "boats-0".to_string(),
1309                        remote_cluster: "boat2".to_string(),
1310                        ..Default::default()
1311                    })),
1312                    replicas: vec![],
1313                },
1314            ]
1315            .into()
1316        );
1317    }
1318}