// fluvio_controlplane_metadata/topic/config.rs
use std::time::Duration;
2
3use derive_builder::Builder;
4use fluvio_types::{ReplicationFactor, TopicName, PartitionCount, IgnoreRackAssignment};
5
6use crate::topic::{
7 ReplicaSpec, TopicReplicaParam, SegmentBasedPolicy, CleanupPolicy, TopicStorageConfig,
8};
9
10use super::{TopicSpec, PartitionMap, CompressionAlgorithm, deduplication::Deduplication};
11
/// Partition count used when `partition.count` is absent.
const DEFAULT_PARTITION_COUNT: PartitionCount = 1;
/// Replication factor used when `partition.replication` is absent.
const DEFAULT_REPLICATION_FACTOR: ReplicationFactor = 1;
/// Rack-assignment flag used when `partition.ignore_rack_assignment` is absent.
const DEFAULT_IGNORE_RACK_ASSIGMENT: IgnoreRackAssignment = false;
/// Value returned by `default_version()`, which fills `TopicConfig::version` when it is omitted.
const DEFAULT_VERSION: &str = "0.1.0";
16
17#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
18#[cfg_attr(
19 feature = "use_serde",
20 derive(serde::Serialize, serde::Deserialize),
21 derive(schemars::JsonSchema),
22 serde(rename_all = "kebab-case")
23)]
24pub struct TopicConfig {
25 #[cfg_attr(feature = "use_serde", serde(default = "default_version"))]
26 #[builder(default = "default_version()")]
27 pub version: String,
28
29 pub meta: MetaConfig,
30
31 #[builder(default)]
32 #[cfg_attr(feature = "use_serde", serde(default))]
33 pub partition: PartitionConfig,
34
35 #[builder(default)]
36 #[cfg_attr(feature = "use_serde", serde(default))]
37 pub retention: RetentionConfig,
38
39 #[builder(default)]
40 #[cfg_attr(feature = "use_serde", serde(default))]
41 pub compression: CompressionConfig,
42
43 #[builder(default)]
44 #[cfg_attr(
45 feature = "use_serde",
46 serde(default, skip_serializing_if = "Option::is_none")
47 )]
48 pub deduplication: Option<Deduplication>,
49}
50
51#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
52#[cfg_attr(
53 feature = "use_serde",
54 derive(serde::Serialize, serde::Deserialize),
55 derive(schemars::JsonSchema),
56 serde(rename_all = "kebab-case")
57)]
58pub struct MetaConfig {
59 pub name: TopicName,
60}
61
62#[derive(Debug, Builder, Clone, PartialEq, Eq)]
63#[cfg_attr(
64 feature = "use_serde",
65 derive(serde::Serialize, serde::Deserialize),
66 derive(schemars::JsonSchema),
67 serde(rename_all = "kebab-case")
68)]
69pub struct PartitionConfig {
70 #[cfg_attr(
71 feature = "use_serde",
72 serde(skip_serializing_if = "Option::is_none", default)
73 )]
74 pub count: Option<PartitionCount>,
75
76 #[cfg_attr(
77 feature = "use_serde",
78 serde(skip_serializing_if = "Option::is_none", default),
79 schemars(with = "String")
80 )]
81 pub max_size: Option<bytesize::ByteSize>,
82
83 #[cfg_attr(
84 feature = "use_serde",
85 serde(skip_serializing_if = "Option::is_none", default)
86 )]
87 pub replication: Option<ReplicationFactor>,
88
89 #[cfg_attr(
90 feature = "use_serde",
91 serde(skip_serializing_if = "Option::is_none", default)
92 )]
93 pub ignore_rack_assignment: Option<IgnoreRackAssignment>,
94
95 #[cfg_attr(
96 feature = "use_serde",
97 serde(skip_serializing_if = "Option::is_none", default)
98 )]
99 pub maps: Option<Vec<PartitionMap>>,
100}
101
102#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
103#[cfg_attr(
104 feature = "use_serde",
105 derive(serde::Serialize, serde::Deserialize),
106 derive(schemars::JsonSchema),
107 serde(rename_all = "kebab-case")
108)]
109pub struct RetentionConfig {
110 #[cfg_attr(
111 feature = "use_serde",
112 serde(
113 skip_serializing_if = "Option::is_none",
114 with = "humantime_serde",
115 default
116 ),
117 schemars(with = "String")
118 )]
119 pub time: Option<Duration>,
120
121 #[cfg_attr(
122 feature = "use_serde",
123 serde(skip_serializing_if = "Option::is_none", default),
124 schemars(with = "Option::<String>")
125 )]
126 pub segment_size: Option<bytesize::ByteSize>,
127}
128
129#[derive(Debug, Default, Builder, Clone, PartialEq, Eq)]
130#[cfg_attr(
131 feature = "use_serde",
132 derive(serde::Serialize, serde::Deserialize),
133 derive(schemars::JsonSchema),
134 serde(rename_all = "kebab-case")
135)]
136pub struct CompressionConfig {
137 #[cfg_attr(feature = "use_serde", serde(rename = "type", default))]
138 pub type_: CompressionAlgorithm,
139}
140
141impl TopicConfig {
142 #[cfg(feature = "use_serde")]
143 pub fn from_file<P: AsRef<std::path::Path>>(path: P) -> anyhow::Result<Self> {
144 Ok(serde_yaml::from_reader(std::fs::File::open(path)?)?)
145 }
146}
147
#[cfg(feature = "use_serde")]
impl std::str::FromStr for TopicConfig {
    type Err = anyhow::Error;

    /// Parses a topic config from a YAML document held in `s`.
    ///
    /// # Errors
    ///
    /// Returns the YAML deserialization error, wrapped in [`anyhow::Error`].
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        serde_yaml::from_str(s).map_err(anyhow::Error::from)
    }
}
156
157impl Default for PartitionConfig {
158 fn default() -> Self {
159 Self {
160 count: Some(DEFAULT_PARTITION_COUNT),
161 replication: Some(DEFAULT_REPLICATION_FACTOR),
162 ignore_rack_assignment: Some(DEFAULT_IGNORE_RACK_ASSIGMENT),
163 max_size: Default::default(),
164 maps: Default::default(),
165 }
166 }
167}
168
169impl From<TopicConfig> for TopicSpec {
170 fn from(config: TopicConfig) -> Self {
171 let segment_size = config.retention.segment_size.map(|s| s.as_u64() as u32);
172 let max_partition_size = config.partition.max_size.map(|s| s.as_u64());
173
174 let replica_spec = match config.partition.maps {
175 Some(maps) => ReplicaSpec::Assigned(maps.into()),
176 None => ReplicaSpec::Computed(TopicReplicaParam {
177 partitions: config.partition.count.unwrap_or(DEFAULT_PARTITION_COUNT),
178 replication_factor: config
179 .partition
180 .replication
181 .unwrap_or(DEFAULT_REPLICATION_FACTOR),
182 ignore_rack_assignment: config
183 .partition
184 .ignore_rack_assignment
185 .unwrap_or(DEFAULT_IGNORE_RACK_ASSIGMENT),
186 }),
187 };
188 let mut topic_spec: TopicSpec = replica_spec.into();
189 if let Some(retention_time) = config.retention.time {
190 topic_spec.set_cleanup_policy(CleanupPolicy::Segment(SegmentBasedPolicy {
191 time_in_seconds: retention_time.as_secs() as u32,
192 }));
193 };
194
195 topic_spec.set_compression_type(config.compression.type_);
196 topic_spec.set_deduplication(config.deduplication);
197
198 if segment_size.is_some() || max_partition_size.is_some() {
199 topic_spec.set_storage(TopicStorageConfig {
200 segment_size,
201 max_partition_size,
202 });
203 }
204
205 topic_spec
206 }
207}
208
209fn default_version() -> String {
210 DEFAULT_VERSION.to_string()
211}
212
#[cfg(test)]
mod tests {
    use crate::topic::deduplication::{Bounds, Filter, Transform};

    use super::*;

    /// A fully populated config must survive a YAML round trip unchanged and
    /// deserialize into the expected structure.
    #[cfg(feature = "use_serde")]
    #[test]
    fn test_topic_config_ser_de() {
        use std::str::FromStr;

        // given
        // The fixture must match the serializer's output byte for byte, so its
        // indentation is significant.
        let input = r#"version: 0.1.1
meta:
  name: test_topic
partition:
  count: 3
  max-size: 1.0 KB
  replication: 2
  ignore-rack-assignment: true
  maps:
  - id: 1
    replicas:
    - 1
    - 2
retention:
  time: 2m
  segment-size: 2.0 KB
compression:
  type: Lz4
deduplication:
  bounds:
    count: 100
    age: 1m
  filter:
    transform:
      uses: fluvio/dedup-bloom-filter@0.1.0
"#;

        // when
        let deser = TopicConfig::from_str(input).expect("deserialized");
        let ser = serde_yaml::to_string(&deser).expect("serialized");

        // then
        assert_eq!(input, ser);
        assert_eq!(deser, test_config());
    }

    /// A config that only names the topic must pick up every default, both in
    /// the deserialized value and in the re-serialized document.
    #[cfg(feature = "use_serde")]
    #[test]
    fn test_minimal_topic_config_ser_de() {
        use std::str::FromStr;

        // given
        let input = r#"meta:
  name: test_topic
"#;

        // when
        let deser = TopicConfig::from_str(input).expect("deserialized");
        let ser = serde_yaml::to_string(&deser).expect("serialized");

        // then
        assert_eq!(deser.version, DEFAULT_VERSION);
        assert_eq!(deser.partition.count, Some(DEFAULT_PARTITION_COUNT));
        assert_eq!(
            deser.partition.replication,
            Some(DEFAULT_REPLICATION_FACTOR)
        );
        assert_eq!(
            deser.partition.ignore_rack_assignment,
            Some(DEFAULT_IGNORE_RACK_ASSIGMENT)
        );
        assert_eq!(
            ser,
            r#"version: 0.1.0
meta:
  name: test_topic
partition:
  count: 1
  replication: 1
  ignore-rack-assignment: false
retention: {}
compression:
  type: Any
"#
        );
    }

    /// The default config must convert into a computed spec built from the
    /// module defaults.
    #[test]
    fn test_default_config_to_spec() {
        // given
        let config = TopicConfig::default();

        // when
        let spec: TopicSpec = config.into();

        // then
        assert_eq!(
            spec,
            TopicSpec::new_computed(
                DEFAULT_PARTITION_COUNT,
                DEFAULT_REPLICATION_FACTOR,
                Some(DEFAULT_IGNORE_RACK_ASSIGMENT)
            )
        );
    }

    /// A fully populated config must convert into an assigned spec carrying
    /// the cleanup policy, compression, storage and deduplication settings.
    #[test]
    fn test_full_config_to_spec() {
        // given
        let config = test_config();

        // when
        let spec: TopicSpec = config.into();

        // then
        let mut test_spec = TopicSpec::new_assigned(vec![PartitionMap {
            id: 1,
            replicas: vec![1, 2],
            ..Default::default()
        }]);
        test_spec.set_cleanup_policy(CleanupPolicy::Segment(SegmentBasedPolicy {
            time_in_seconds: 120,
        }));
        test_spec.set_compression_type(CompressionAlgorithm::Lz4);
        test_spec.set_storage(TopicStorageConfig {
            segment_size: Some(2000),
            max_partition_size: Some(1000),
        });
        test_spec.set_deduplication(Some(test_deduplication()));

        assert_eq!(spec, test_spec);
    }

    /// Fully populated config shared by the round-trip and conversion tests.
    fn test_config() -> TopicConfig {
        TopicConfig {
            version: "0.1.1".to_string(),
            meta: MetaConfig {
                name: "test_topic".to_string(),
            },
            partition: PartitionConfig {
                count: Some(3),
                max_size: Some(bytesize::ByteSize(1000)),
                replication: Some(2),
                ignore_rack_assignment: Some(true),
                maps: Some(vec![PartitionMap {
                    id: 1,
                    replicas: vec![1, 2],
                    ..Default::default()
                }]),
            },
            retention: RetentionConfig {
                time: Some(Duration::from_secs(120)),
                segment_size: Some(bytesize::ByteSize(2000)),
            },
            compression: CompressionConfig {
                type_: CompressionAlgorithm::Lz4,
            },
            deduplication: Some(test_deduplication()),
        }
    }

    /// Deduplication settings matching the `deduplication` section of the
    /// full YAML fixture.
    fn test_deduplication() -> Deduplication {
        Deduplication {
            bounds: Bounds {
                count: 100,
                age: Some(Duration::from_secs(60)),
            },
            filter: Filter {
                transform: Transform {
                    uses: "fluvio/dedup-bloom-filter@0.1.0".to_string(),
                    with: Default::default(),
                },
            },
        }
    }
}