1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#![allow(clippy::assign_op_pattern)]

//!
//! # Partition Spec
//!
//!
use fluvio_types::SpuId;
use fluvio_protocol::{Encoder, Decoder};

use crate::topic::{CleanupPolicy, CompressionAlgorithm, Deduplication, TopicSpec, TopicStorageConfig};

/// Spec for Partition
/// Each partition has replicas spread among SPU
/// one of replica is leader which is duplicated in the leader field
#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct PartitionSpec {
    pub leader: SpuId,
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub replicas: Vec<SpuId>,
    #[fluvio(min_version = 4)]
    pub cleanup_policy: Option<CleanupPolicy>,
    #[fluvio(min_version = 4)]
    pub storage: Option<TopicStorageConfig>,
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 6)]
    pub compression_type: CompressionAlgorithm,
    #[fluvio(min_version = 12)]
    pub deduplication: Option<Deduplication>,
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 13)]
    pub system: bool,
    #[cfg_attr(feature = "use_serde", serde(default))]
    #[fluvio(min_version = 14)]
    pub mirror: Option<PartitionMirrorConfig>,
}

impl PartitionSpec {
    pub fn new(leader: SpuId, replicas: Vec<SpuId>) -> Self {
        Self {
            leader,
            replicas,
            ..Default::default()
        }
    }

    /// Create new partition spec from replica mapping with topic spec. This assume first replica is leader
    pub fn from_replicas(
        replicas: Vec<SpuId>,
        topic: &TopicSpec,
        mirror: Option<&PartitionMirrorConfig>,
    ) -> Self {
        let leader = if replicas.is_empty() { 0 } else { replicas[0] };

        Self {
            leader,
            replicas,
            mirror: mirror.cloned(),
            cleanup_policy: topic.get_clean_policy().cloned(),
            storage: topic.get_storage().cloned(),
            compression_type: topic.get_compression_type().clone(),
            deduplication: topic.get_deduplication().cloned(),
            system: topic.is_system(),
        }
    }

    pub fn has_spu(&self, spu: &SpuId) -> bool {
        self.replicas.contains(spu)
    }

    /// follower replicas
    pub fn followers(&self) -> Vec<SpuId> {
        self.replicas
            .iter()
            .filter_map(|r| if r == &self.leader { None } else { Some(*r) })
            .collect()
    }

    pub fn mirror_string(&self) -> String {
        if let Some(mirror) = &self.mirror {
            mirror.external_cluster()
        } else {
            "".to_owned()
        }
    }
}

impl From<Vec<SpuId>> for PartitionSpec {
    fn from(replicas: Vec<SpuId>) -> Self {
        if !replicas.is_empty() {
            Self::new(replicas[0], replicas)
        } else {
            Self::new(0, replicas)
        }
    }
}

/// Setting applied to a replica
#[derive(Decoder, Encoder, Debug, Eq, PartialEq, Clone, Default)]
pub struct PartitionConfig {
    pub retention_time_seconds: Option<u32>,
}

#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub enum PartitionMirrorConfig {
    #[fluvio(tag = 0)]
    Remote(RemotePartitionConfig),
    #[fluvio(tag = 1)]
    Home(HomePartitionConfig),
}

impl Default for PartitionMirrorConfig {
    fn default() -> Self {
        Self::Remote(RemotePartitionConfig::default())
    }
}

impl PartitionMirrorConfig {
    pub fn remote(&self) -> Option<&RemotePartitionConfig> {
        match self {
            Self::Remote(e) => Some(e),
            _ => None,
        }
    }

    pub fn home(&self) -> Option<&HomePartitionConfig> {
        match self {
            Self::Home(c) => Some(c),
            _ => None,
        }
    }

    pub fn external_cluster(&self) -> String {
        match self {
            Self::Remote(r) => format!(
                "{}:{}:{}",
                r.home_cluster, r.home_spu_id, r.home_spu_endpoint
            ),
            Self::Home(h) => h.remote_cluster.clone(),
        }
    }
}

impl std::fmt::Display for PartitionMirrorConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match self {
            PartitionMirrorConfig::Remote(cfg) => write!(f, "{}", cfg),
            PartitionMirrorConfig::Home(cfg) => write!(f, "{}", cfg),
        }
    }
}

#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct HomePartitionConfig {
    pub remote_cluster: String,
    pub remote_replica: String,
}

impl std::fmt::Display for HomePartitionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "{}", self.remote_cluster)
    }
}

#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
#[cfg_attr(
    feature = "use_serde",
    derive(serde::Serialize, serde::Deserialize),
    serde(rename_all = "camelCase")
)]
pub struct RemotePartitionConfig {
    pub home_cluster: String,
    #[cfg_attr(feature = "use_serde", serde(default))]
    pub home_spu_id: SpuId,
    pub home_spu_endpoint: String,
}

impl std::fmt::Display for RemotePartitionConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "{}:{}:{}",
            self.home_cluster, self.home_spu_id, self.home_spu_endpoint
        )
    }
}