fluvio_controlplane_metadata/topic/
config.rs

1use std::time::Duration;
2
3use derive_builder::Builder;
4use fluvio_types::{ReplicationFactor, TopicName, PartitionCount, IgnoreRackAssignment};
5
6use crate::topic::{
7    ReplicaSpec, TopicReplicaParam, SegmentBasedPolicy, CleanupPolicy, TopicStorageConfig,
8};
9
10use super::{TopicSpec, PartitionMap, CompressionAlgorithm, deduplication::Deduplication};
11
12const DEFAULT_PARTITION_COUNT: PartitionCount = 1;
13const DEFAULT_REPLICATION_FACTOR: ReplicationFactor = 1;
14const DEFAULT_IGNORE_RACK_ASSIGMENT: IgnoreRackAssignment = false;
15const DEFAULT_VERSION: &str = "0.1.0";
16
17#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
18#[cfg_attr(
19    feature = "use_serde",
20    derive(serde::Serialize, serde::Deserialize),
21    derive(schemars::JsonSchema),
22    serde(rename_all = "kebab-case")
23)]
24pub struct TopicConfig {
25    #[cfg_attr(feature = "use_serde", serde(default = "default_version"))]
26    #[builder(default = "default_version()")]
27    pub version: String,
28
29    pub meta: MetaConfig,
30
31    #[builder(default)]
32    #[cfg_attr(feature = "use_serde", serde(default))]
33    pub partition: PartitionConfig,
34
35    #[builder(default)]
36    #[cfg_attr(feature = "use_serde", serde(default))]
37    pub retention: RetentionConfig,
38
39    #[builder(default)]
40    #[cfg_attr(feature = "use_serde", serde(default))]
41    pub compression: CompressionConfig,
42
43    #[builder(default)]
44    #[cfg_attr(
45        feature = "use_serde",
46        serde(default, skip_serializing_if = "Option::is_none")
47    )]
48    pub deduplication: Option<Deduplication>,
49}
50
51#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
52#[cfg_attr(
53    feature = "use_serde",
54    derive(serde::Serialize, serde::Deserialize),
55    derive(schemars::JsonSchema),
56    serde(rename_all = "kebab-case")
57)]
58pub struct MetaConfig {
59    pub name: TopicName,
60}
61
62#[derive(Debug, Builder, Clone, PartialEq, Eq)]
63#[cfg_attr(
64    feature = "use_serde",
65    derive(serde::Serialize, serde::Deserialize),
66    derive(schemars::JsonSchema),
67    serde(rename_all = "kebab-case")
68)]
69pub struct PartitionConfig {
70    #[cfg_attr(
71        feature = "use_serde",
72        serde(skip_serializing_if = "Option::is_none", default)
73    )]
74    pub count: Option<PartitionCount>,
75
76    #[cfg_attr(
77        feature = "use_serde",
78        serde(skip_serializing_if = "Option::is_none", default),
79        schemars(with = "String")
80    )]
81    pub max_size: Option<bytesize::ByteSize>,
82
83    #[cfg_attr(
84        feature = "use_serde",
85        serde(skip_serializing_if = "Option::is_none", default)
86    )]
87    pub replication: Option<ReplicationFactor>,
88
89    #[cfg_attr(
90        feature = "use_serde",
91        serde(skip_serializing_if = "Option::is_none", default)
92    )]
93    pub ignore_rack_assignment: Option<IgnoreRackAssignment>,
94
95    #[cfg_attr(
96        feature = "use_serde",
97        serde(skip_serializing_if = "Option::is_none", default)
98    )]
99    pub maps: Option<Vec<PartitionMap>>,
100}
101
102#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
103#[cfg_attr(
104    feature = "use_serde",
105    derive(serde::Serialize, serde::Deserialize),
106    derive(schemars::JsonSchema),
107    serde(rename_all = "kebab-case")
108)]
109pub struct RetentionConfig {
110    #[cfg_attr(
111        feature = "use_serde",
112        serde(
113            skip_serializing_if = "Option::is_none",
114            with = "humantime_serde",
115            default
116        ),
117        schemars(with = "String")
118    )]
119    pub time: Option<Duration>,
120
121    #[cfg_attr(
122        feature = "use_serde",
123        serde(skip_serializing_if = "Option::is_none", default),
124        schemars(with = "Option::<String>")
125    )]
126    pub segment_size: Option<bytesize::ByteSize>,
127}
128
129#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
130#[cfg_attr(
131    feature = "use_serde",
132    derive(serde::Serialize, serde::Deserialize),
133    derive(schemars::JsonSchema),
134    serde(rename_all = "kebab-case")
135)]
136pub struct CompressionConfig {
137    #[cfg_attr(feature = "use_serde", serde(rename = "type", default))]
138    pub type_: CompressionAlgorithm,
139}
140
141impl TopicConfig {
142    #[cfg(feature = "use_serde")]
143    pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
144        Ok(serde_yaml::from_reader(std::fs::File::open(path)?)?)
145    }
146}
147
148#[cfg(feature = "use_serde")]
149impl std::str::FromStr for TopicConfig {
150    type Err = anyhow::Error;
151
152    fn from_str(s: &str) -> Result<Self, Self::Err> {
153        Ok(serde_yaml::from_str(s)?)
154    }
155}
156
157impl Default for PartitionConfig {
158    fn default() -> Self {
159        Self {
160            count: Some(DEFAULT_PARTITION_COUNT),
161            replication: Some(DEFAULT_REPLICATION_FACTOR),
162            ignore_rack_assignment: Some(DEFAULT_IGNORE_RACK_ASSIGMENT),
163            max_size: Default::default(),
164            maps: Default::default(),
165        }
166    }
167}
168
169impl From<TopicConfig> for TopicSpec {
170    fn from(config: TopicConfig) -> Self {
171        let segment_size = config.retention.segment_size.map(|s| s.as_u64() as u32);
172        let max_partition_size = config.partition.max_size.map(|s| s.as_u64());
173
174        let replica_spec = match config.partition.maps {
175            Some(maps) => ReplicaSpec::Assigned(maps.into()),
176            None => ReplicaSpec::Computed(TopicReplicaParam {
177                partitions: config.partition.count.unwrap_or(DEFAULT_PARTITION_COUNT),
178                replication_factor: config
179                    .partition
180                    .replication
181                    .unwrap_or(DEFAULT_REPLICATION_FACTOR),
182                ignore_rack_assignment: config
183                    .partition
184                    .ignore_rack_assignment
185                    .unwrap_or(DEFAULT_IGNORE_RACK_ASSIGMENT),
186            }),
187        };
188        let mut topic_spec: TopicSpec = replica_spec.into();
189        if let Some(retention_time) = config.retention.time {
190            topic_spec.set_cleanup_policy(CleanupPolicy::Segment(SegmentBasedPolicy {
191                time_in_seconds: retention_time.as_secs() as u32,
192            }));
193        };
194
195        topic_spec.set_compression_type(config.compression.type_);
196        topic_spec.set_deduplication(config.deduplication);
197
198        if segment_size.is_some() || max_partition_size.is_some() {
199            topic_spec.set_storage(TopicStorageConfig {
200                segment_size,
201                max_partition_size,
202            });
203        }
204
205        topic_spec
206    }
207}
208
209fn default_version() -> String {
210    DEFAULT_VERSION.to_string()
211}
212
213#[cfg(test)]
214mod tests {
215    use crate::topic::deduplication::{Bounds, Filter, Transform};
216
217    use super::*;
218
219    #[cfg(feature = "use_serde")]
220    #[test]
221    fn test_topic_config_ser_de() {
222        //given
223        let input = r#"version: 0.1.1
224meta:
225  name: test_topic
226partition:
227  count: 3
228  max-size: 1.0 KB
229  replication: 2
230  ignore-rack-assignment: true
231  maps:
232  - id: 1
233    replicas:
234    - 1
235    - 2
236retention:
237  time: 2m
238  segment-size: 2.0 KB
239compression:
240  type: Lz4
241deduplication:
242  bounds:
243    count: 100
244    age: 1m
245  filter:
246    transform:
247      uses: fluvio/dedup-bloom-filter@0.1.0
248"#;
249
250        //when
251        use std::str::FromStr;
252
253        let deser = TopicConfig::from_str(input).expect("deserialized");
254        let ser = serde_yaml::to_string(&deser).expect("serialized");
255
256        //then
257        assert_eq!(input, ser);
258        assert_eq!(deser, test_config());
259    }
260
261    #[cfg(feature = "use_serde")]
262    #[test]
263    fn test_minimal_topic_config_ser_de() {
264        //given
265        let input = r#"meta:
266  name: test_topic
267"#;
268
269        //when
270        use std::str::FromStr;
271
272        let deser = TopicConfig::from_str(input).expect("deserialized");
273        let ser = serde_yaml::to_string(&deser).expect("serialized");
274
275        //then
276        assert_eq!(deser.version, DEFAULT_VERSION);
277        assert_eq!(deser.partition.count, Some(DEFAULT_PARTITION_COUNT));
278        assert_eq!(
279            deser.partition.replication,
280            Some(DEFAULT_REPLICATION_FACTOR)
281        );
282        assert_eq!(
283            deser.partition.ignore_rack_assignment,
284            Some(DEFAULT_IGNORE_RACK_ASSIGMENT)
285        );
286        assert_eq!(
287            ser,
288            r#"version: 0.1.0
289meta:
290  name: test_topic
291partition:
292  count: 1
293  replication: 1
294  ignore-rack-assignment: false
295retention: {}
296compression:
297  type: Any
298"#
299        );
300    }
301
302    #[test]
303    fn test_default_config_to_spec() {
304        //given
305        let config = TopicConfig::default();
306
307        //when
308        let spec: TopicSpec = config.into();
309
310        //then
311        assert_eq!(
312            spec,
313            TopicSpec::new_computed(
314                DEFAULT_PARTITION_COUNT,
315                DEFAULT_REPLICATION_FACTOR,
316                Some(DEFAULT_IGNORE_RACK_ASSIGMENT)
317            )
318        );
319    }
320
321    #[test]
322    fn test_full_config_to_spec() {
323        //given
324        let config = test_config();
325
326        //when
327        let spec: TopicSpec = config.into();
328
329        //then
330        let mut test_spec = TopicSpec::new_assigned(vec![PartitionMap {
331            id: 1,
332            replicas: vec![1, 2],
333            ..Default::default()
334        }]);
335        test_spec.set_cleanup_policy(CleanupPolicy::Segment(SegmentBasedPolicy {
336            time_in_seconds: 120,
337        }));
338        test_spec.set_compression_type(CompressionAlgorithm::Lz4);
339        test_spec.set_storage(TopicStorageConfig {
340            segment_size: Some(2000),
341            max_partition_size: Some(1000),
342        });
343        test_spec.set_deduplication(Some(test_deduplication()));
344
345        assert_eq!(spec, test_spec);
346    }
347
348    fn test_config() -> TopicConfig {
349        TopicConfig {
350            version: "0.1.1".to_string(),
351            meta: MetaConfig {
352                name: "test_topic".to_string(),
353            },
354            partition: PartitionConfig {
355                count: Some(3),
356                max_size: Some(bytesize::ByteSize(1000)),
357                replication: Some(2),
358                ignore_rack_assignment: Some(true),
359                maps: Some(vec![PartitionMap {
360                    id: 1,
361                    replicas: vec![1, 2],
362                    ..Default::default()
363                }]),
364            },
365            retention: RetentionConfig {
366                time: Some(Duration::from_secs(120)),
367                segment_size: Some(bytesize::ByteSize(2000)),
368            },
369            compression: CompressionConfig {
370                type_: CompressionAlgorithm::Lz4,
371            },
372            deduplication: Some(test_deduplication()),
373        }
374    }
375
376    fn test_deduplication() -> Deduplication {
377        Deduplication {
378            bounds: Bounds {
379                count: 100,
380                age: Some(Duration::from_secs(60)),
381            },
382            filter: Filter {
383                transform: Transform {
384                    uses: "fluvio/dedup-bloom-filter@0.1.0".to_string(),
385                    with: Default::default(),
386                },
387            },
388        }
389    }
390}