1use std::ops::{Deref, DerefMut};
2
3use anyhow::{anyhow, Result};
4
5use fluvio_protocol::record::ReplicaKey;
6use fluvio_types::defaults::{
7 STORAGE_RETENTION_SECONDS, SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN, STORAGE_RETENTION_SECONDS_MIN,
8 SPU_PARTITION_MAX_BYTES_MIN, SPU_LOG_SEGMENT_MAX_BYTES,
9};
10use fluvio_types::SpuId;
11use fluvio_types::{PartitionId, PartitionCount, ReplicationFactor, IgnoreRackAssignment};
12use fluvio_protocol::{Encoder, Decoder};
13
14use crate::partition::{HomePartitionConfig, PartitionMirrorConfig, RemotePartitionConfig};
15
16use super::deduplication::Deduplication;
17
18#[derive(Debug, Clone, PartialEq, Default, Encoder, Decoder)]
19#[cfg_attr(
20 feature = "use_serde",
21 derive(serde::Serialize, serde::Deserialize),
22 serde(rename_all = "camelCase")
23)]
24pub struct TopicSpec {
25 replicas: ReplicaSpec,
26 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
27 #[fluvio(min_version = 3)]
28 cleanup_policy: Option<CleanupPolicy>,
29 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
30 #[fluvio(min_version = 4)]
31 storage: Option<TopicStorageConfig>,
32 #[cfg_attr(feature = "use_serde", serde(default))]
33 #[fluvio(min_version = 6)]
34 compression_type: CompressionAlgorithm,
35 #[cfg_attr(feature = "use_serde", serde(default))]
36 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "Option::is_none"))]
37 #[fluvio(min_version = 12)]
38 deduplication: Option<Deduplication>,
39 #[cfg_attr(feature = "use_serde", serde(default))]
40 #[fluvio(min_version = 13)]
41 system: bool,
42}
43
44impl From<ReplicaSpec> for TopicSpec {
45 fn from(replicas: ReplicaSpec) -> Self {
46 Self {
47 replicas,
48 ..Default::default()
49 }
50 }
51}
52
53impl Deref for TopicSpec {
54 type Target = ReplicaSpec;
55
56 fn deref(&self) -> &Self::Target {
57 &self.replicas
58 }
59}
60
61impl TopicSpec {
62 pub fn new_assigned(partition_map: impl Into<PartitionMaps>) -> Self {
63 Self {
64 replicas: ReplicaSpec::new_assigned(partition_map),
65 ..Default::default()
66 }
67 }
68
69 pub fn new_computed(
70 partitions: PartitionCount,
71 replication: ReplicationFactor,
72 ignore_rack: Option<IgnoreRackAssignment>,
73 ) -> Self {
74 Self {
75 replicas: ReplicaSpec::new_computed(partitions, replication, ignore_rack),
76 ..Default::default()
77 }
78 }
79
80 pub fn new_mirror(mirror: MirrorConfig) -> Self {
81 Self {
82 replicas: ReplicaSpec::Mirror(mirror),
83 ..Default::default()
84 }
85 }
86
87 #[inline(always)]
88 pub fn replicas(&self) -> &ReplicaSpec {
89 &self.replicas
90 }
91
92 pub fn set_replicas(&mut self, replicas: ReplicaSpec) {
93 self.replicas = replicas;
94 }
95
96 pub fn set_cleanup_policy(&mut self, policy: CleanupPolicy) {
97 self.cleanup_policy = Some(policy);
98 }
99
100 pub fn get_partition_mirror_map(&self) -> Option<PartitionMaps> {
101 match &self.replicas {
102 ReplicaSpec::Mirror(mirror) => match mirror {
103 MirrorConfig::Remote(e) => Some(e.as_partition_maps()),
104 MirrorConfig::Home(c) => Some(c.as_partition_maps()),
105 },
106 _ => None,
107 }
108 }
109
110 pub fn get_clean_policy(&self) -> Option<&CleanupPolicy> {
111 self.cleanup_policy.as_ref()
112 }
113
114 pub fn set_compression_type(&mut self, compression: CompressionAlgorithm) {
115 self.compression_type = compression;
116 }
117
118 pub fn get_compression_type(&self) -> &CompressionAlgorithm {
119 &self.compression_type
120 }
121
122 pub fn get_storage(&self) -> Option<&TopicStorageConfig> {
123 self.storage.as_ref()
124 }
125
126 pub fn get_storage_mut(&mut self) -> Option<&mut TopicStorageConfig> {
127 self.storage.as_mut()
128 }
129
130 pub fn set_storage(&mut self, storage: TopicStorageConfig) {
131 self.storage = Some(storage);
132 }
133
134 pub fn get_deduplication(&self) -> Option<&Deduplication> {
135 self.deduplication.as_ref()
136 }
137
138 pub fn set_deduplication(&mut self, deduplication: Option<Deduplication>) {
139 self.deduplication = deduplication;
140 }
141
142 pub fn is_system(&self) -> bool {
143 self.system
144 }
145
146 pub fn set_system(&mut self, system: bool) {
147 self.system = system;
148 }
149
150 pub fn retention_secs(&self) -> u32 {
152 self.get_clean_policy()
153 .map(|policy| policy.retention_secs())
154 .unwrap_or_else(|| STORAGE_RETENTION_SECONDS)
155 }
156
157 pub fn validate_config(&self) -> Option<String> {
159 if let Some(policy) = self.get_clean_policy() {
160 if policy.retention_secs() < STORAGE_RETENTION_SECONDS_MIN {
161 return Some(format!(
162 "retention_secs {} is less than minimum {}",
163 policy.retention_secs(),
164 STORAGE_RETENTION_SECONDS_MIN
165 ));
166 }
167 }
168
169 if let Some(storage) = self.get_storage() {
170 if let Some(segment_size) = storage.segment_size {
171 if segment_size < SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN {
172 return Some(format!(
173 "segment_size {segment_size} is less than minimum {SPU_LOG_LOG_SEGMENT_MAX_BYTE_MIN}"
174 ));
175 }
176 }
177 if let Some(max_partition_size) = storage.max_partition_size {
178 if max_partition_size < SPU_PARTITION_MAX_BYTES_MIN {
179 return Some(format!(
180 "max_partition_size {max_partition_size} is less than minimum {SPU_PARTITION_MAX_BYTES_MIN}"
181 ));
182 }
183 let segment_size = storage.segment_size.unwrap_or(SPU_LOG_SEGMENT_MAX_BYTES);
184 if max_partition_size < segment_size as u64 {
185 return Some(format!(
186 "max_partition_size {max_partition_size} is less than segment size {segment_size}"
187 ));
188 }
189 }
190 }
191
192 None
193 }
194}
195
196#[derive(Debug, Clone, Eq, PartialEq, Encoder, Decoder)]
197#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
198pub enum ReplicaSpec {
199 #[cfg_attr(feature = "use_serde", serde(rename = "assigned"))]
200 #[fluvio(tag = 0)]
201 Assigned(PartitionMaps),
202 #[cfg_attr(feature = "use_serde", serde(rename = "computed"))]
203 #[fluvio(tag = 1)]
204 Computed(TopicReplicaParam),
205 #[cfg_attr(
206 feature = "use_serde",
207 serde(rename = "mirror", with = "serde_yaml::with::singleton_map")
208 )]
209 #[fluvio(tag = 2)]
210 #[fluvio(min_version = 14)]
211 Mirror(MirrorConfig),
212}
213
214impl std::fmt::Display for ReplicaSpec {
215 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
216 match self {
217 Self::Assigned(partition_map) => write!(f, "assigned::{partition_map}"),
218 Self::Computed(param) => write!(f, "computed::({param})"),
219 Self::Mirror(param) => write!(f, "mirror::({param})"),
220 }
221 }
222}
223
224impl Default for ReplicaSpec {
225 fn default() -> Self {
226 Self::Computed(TopicReplicaParam::default())
227 }
228}
229
230impl ReplicaSpec {
231 pub fn new_assigned<J>(partition_map: J) -> Self
232 where
233 J: Into<PartitionMaps>,
234 {
235 Self::Assigned(partition_map.into())
236 }
237
238 pub fn new_computed(
239 partitions: PartitionCount,
240 replication: ReplicationFactor,
241 ignore_rack: Option<IgnoreRackAssignment>,
242 ) -> Self {
243 Self::Computed((partitions, replication, ignore_rack.unwrap_or(false)).into())
244 }
245
246 pub fn is_computed(&self) -> bool {
247 matches!(self, Self::Computed(_))
248 }
249
250 pub fn partitions(&self) -> PartitionCount {
251 match &self {
252 Self::Computed(param) => param.partitions,
253 Self::Assigned(partition_map) => partition_map.partition_count(),
254 Self::Mirror(partition_map) => partition_map.partition_count(),
255 }
256 }
257
258 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
259 match self {
260 Self::Computed(param) => Some(param.replication_factor),
261 Self::Assigned(partition_map) => partition_map.replication_factor(),
262 Self::Mirror(partition_map) => partition_map.replication_factor(),
263 }
264 }
265
266 pub fn ignore_rack_assignment(&self) -> IgnoreRackAssignment {
267 match self {
268 Self::Computed(param) => param.ignore_rack_assignment,
269 Self::Assigned(_) => false,
270 Self::Mirror(_) => false,
271 }
272 }
273
274 pub fn type_label(&self) -> &'static str {
275 match self {
276 Self::Computed(_) => "computed",
277 Self::Assigned(_) => "assigned",
278 Self::Mirror(mirror) => match mirror {
279 MirrorConfig::Remote(remote_config) => {
280 if remote_config.target {
281 "from-home"
282 } else {
283 "to-home"
284 }
285 }
286 MirrorConfig::Home(home_config) => {
287 if home_config.0.source {
288 "to-remote"
289 } else {
290 "from-remote"
291 }
292 }
293 },
294 }
295 }
296
297 pub fn partitions_display(&self) -> String {
298 match self {
299 Self::Computed(param) => param.partitions.to_string(),
300 Self::Assigned(_) => "".to_owned(),
301 Self::Mirror(_) => "".to_owned(),
302 }
303 }
304
305 pub fn replication_factor_display(&self) -> String {
306 match self {
307 Self::Computed(param) => param.replication_factor.to_string(),
308 Self::Assigned(_) => "".to_owned(),
309 Self::Mirror(_) => "".to_owned(),
310 }
311 }
312
313 pub fn ignore_rack_assign_display(&self) -> &'static str {
314 match self {
315 Self::Computed(param) => {
316 if param.ignore_rack_assignment {
317 "yes"
318 } else {
319 ""
320 }
321 }
322 Self::Assigned(_) => "",
323 Self::Mirror(_) => "",
324 }
325 }
326
327 pub fn partition_map_str(&self) -> Option<String> {
328 match self {
329 Self::Computed(_) => None,
330 Self::Assigned(partition_map) => Some(partition_map.partition_map_string()),
331 Self::Mirror(mirror) => Some(mirror.as_partition_maps().partition_map_string()),
332 }
333 }
334
335 pub fn valid_partition(partitions: &PartitionCount) -> Result<()> {
341 if *partitions == 0 {
342 return Err(anyhow!("partition must be greater than 0"));
343 }
344
345 Ok(())
346 }
347
348 pub fn valid_replication_factor(replication: &ReplicationFactor) -> Result<()> {
350 if *replication == 0 {
351 return Err(anyhow!("replication factor must be greater than 0"));
352 }
353
354 Ok(())
355 }
356}
357
358#[derive(Debug, Clone, Default, Eq, PartialEq, Encoder, Decoder)]
360#[cfg_attr(
361 feature = "use_serde",
362 derive(serde::Serialize, serde::Deserialize),
363 serde(rename_all = "camelCase")
364)]
365pub struct TopicReplicaParam {
366 #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
367 pub partitions: PartitionCount,
368 #[cfg_attr(feature = "use_serde", serde(default = "default_count"))]
369 pub replication_factor: ReplicationFactor,
370 #[cfg_attr(
371 feature = "use_serde",
372 serde(skip_serializing_if = "bool::clone", default)
373 )]
374 pub ignore_rack_assignment: IgnoreRackAssignment,
375}
376
377#[allow(dead_code)]
378fn default_count() -> u32 {
379 1
380}
381
382impl TopicReplicaParam {
383 pub fn new(
384 partitions: PartitionCount,
385 replication_factor: ReplicationFactor,
386 ignore_rack_assignment: IgnoreRackAssignment,
387 ) -> Self {
388 Self {
389 partitions,
390 replication_factor,
391 ignore_rack_assignment,
392 }
393 }
394}
395
396impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicReplicaParam {
397 fn from(value: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
398 let (partitions, replication_factor, ignore_rack) = value;
399 Self::new(partitions, replication_factor, ignore_rack)
400 }
401}
402
403impl std::fmt::Display for TopicReplicaParam {
404 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
405 write!(
406 f,
407 "replica param::(p:{}, r:{})",
408 self.partitions, self.replication_factor
409 )
410 }
411}
412
413#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
415#[cfg_attr(
416 feature = "use_serde",
417 derive(serde::Serialize, serde::Deserialize),
418 serde(transparent)
419)]
420pub struct PartitionMaps(Vec<PartitionMap>);
421
422impl From<Vec<PartitionMap>> for PartitionMaps {
423 fn from(maps: Vec<PartitionMap>) -> Self {
424 Self(maps)
425 }
426}
427
428impl From<PartitionMaps> for Vec<PartitionMap> {
429 fn from(maps: PartitionMaps) -> Self {
430 maps.0
431 }
432}
433
434impl From<Vec<(PartitionId, Vec<SpuId>)>> for PartitionMaps {
435 fn from(partition_vec: Vec<(PartitionId, Vec<SpuId>)>) -> Self {
436 let maps: Vec<PartitionMap> = partition_vec
437 .into_iter()
438 .map(|(id, replicas)| PartitionMap {
439 id,
440 replicas,
441 mirror: None,
442 })
443 .collect();
444 maps.into()
445 }
446}
447
448impl std::fmt::Display for PartitionMaps {
449 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
450 write!(f, "partition map:{})", self.0.len())
451 }
452}
453
454impl PartitionMaps {
455 pub fn maps(&self) -> &Vec<PartitionMap> {
456 &self.0
457 }
458
459 pub fn maps_owned(self) -> Vec<PartitionMap> {
460 self.0
461 }
462
463 fn partition_count(&self) -> PartitionCount {
464 self.0.len() as PartitionCount
465 }
466
467 fn replication_factor(&self) -> Option<ReplicationFactor> {
468 self.0
470 .first()
471 .map(|partition| partition.replicas.len() as ReplicationFactor)
472 }
473
474 fn partition_map_string(&self) -> String {
475 use std::fmt::Write;
476
477 let mut res = String::new();
478 for partition in &self.0 {
479 write!(res, "{}:{:?}, ", partition.id, partition.replicas).unwrap();
480 }
482 if !res.is_empty() {
483 res.truncate(res.len() - 2);
484 }
485 res
486 }
487
488 pub fn unique_spus_in_partition_map(&self) -> Vec<SpuId> {
494 let mut spu_ids: Vec<SpuId> = vec![];
495
496 for partition in &self.0 {
497 for spu in &partition.replicas {
498 if !spu_ids.contains(spu) {
499 spu_ids.push(*spu);
500 }
501 }
502 }
503
504 spu_ids
505 }
506
507 #[allow(clippy::explicit_counter_loop)]
508 pub fn validate(&self) -> Result<()> {
510 if self.0.is_empty() {
512 return Err(anyhow!("no assigned partitions found"));
513 }
514
515 let mut id = 0;
525 let mut replica_len = 0;
526 for partition in &self.0 {
527 if id == 0 {
528 if partition.id != id {
530 return Err(anyhow!("assigned partitions must start with id 0",));
531 }
532
533 replica_len = partition.replicas.len();
535 if replica_len == 0 {
536 return Err(anyhow!("assigned replicas must have at least one spu id",));
537 }
538 } else {
539 if partition.id != id {
541 return Err(anyhow!(
542 "assigned partition ids must be in sequence and without gaps"
543 ));
544 }
545
546 if partition.replicas.len() != replica_len {
548 return Err(anyhow!(
549 "all assigned replicas must have the same number of spu ids: {replica_len}"
550 ));
551 }
552 }
553
554 let mut sorted_replicas = partition.replicas.clone();
556 sorted_replicas.sort_unstable();
557 let unique_count = 1 + sorted_replicas
558 .windows(2)
559 .filter(|pair| pair[0] != pair[1])
560 .count();
561 if partition.replicas.len() != unique_count {
562 return Err(anyhow!(format!(
563 "duplicate spu ids found in assigned partition with id: {id}"
564 ),));
565 }
566
567 for spu_id in &partition.replicas {
569 if *spu_id < 0 {
570 return Err(anyhow!(
571 "invalid spu id: {spu_id} in assigned partition with id: {id}"
572 ));
573 }
574 }
575
576 id += 1;
578 }
579
580 Ok(())
581 }
582}
583
584impl From<(PartitionCount, ReplicationFactor, IgnoreRackAssignment)> for TopicSpec {
585 fn from(spec: (PartitionCount, ReplicationFactor, IgnoreRackAssignment)) -> Self {
586 let (count, factor, rack) = spec;
587 Self::new_computed(count, factor, Some(rack))
588 }
589}
590
591impl From<(PartitionCount, ReplicationFactor)> for TopicSpec {
593 fn from(spec: (PartitionCount, ReplicationFactor)) -> Self {
594 let (count, factor) = spec;
595 Self::new_computed(count, factor, Some(false))
596 }
597}
598
599#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
600#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
601pub struct PartitionMap {
602 pub id: PartitionId,
603 pub replicas: Vec<SpuId>,
604 #[cfg_attr(
605 feature = "use_serde",
606 serde(default, skip_serializing_if = "Option::is_none")
607 )]
608 #[fluvio(min_version = 14)]
609 pub mirror: Option<PartitionMirrorConfig>,
610}
611
612#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
613#[cfg_attr(
614 feature = "use_serde",
615 derive(serde::Serialize, serde::Deserialize),
616 serde(rename_all = "camelCase")
617)]
618pub enum MirrorConfig {
619 #[fluvio(tag = 0)]
620 Remote(RemoteMirrorConfig),
621 #[fluvio(tag = 1)]
622 Home(HomeMirrorConfig),
623}
624
625impl Default for MirrorConfig {
626 fn default() -> Self {
627 Self::Remote(RemoteMirrorConfig::default())
628 }
629}
630
631impl std::fmt::Display for MirrorConfig {
632 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
633 match self {
634 MirrorConfig::Remote(r) => {
635 write!(f, "Mirror Remote {:?} ", r)
636 }
637 MirrorConfig::Home(h) => {
638 write!(f, "Mirror Home {:?} ", h)
639 }
640 }
641 }
642}
643
644impl MirrorConfig {
645 pub fn partition_count(&self) -> PartitionCount {
646 match self {
647 MirrorConfig::Remote(src) => src.partition_count(),
648 MirrorConfig::Home(tg) => tg.partition_count(),
649 }
650 }
651
652 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
653 None
654 }
655
656 pub fn as_partition_maps(&self) -> PartitionMaps {
657 match self {
658 MirrorConfig::Remote(src) => src.as_partition_maps(),
659 MirrorConfig::Home(tg) => tg.as_partition_maps(),
660 }
661 }
662
663 pub fn set_home_to_remote(&mut self, home_to_remote: bool) -> Result<()> {
665 match self {
666 Self::Remote(_) => Err(anyhow!(
667 "remote mirror config cannot be set to home to remote"
668 )),
669 Self::Home(home) => {
670 home.set_home_to_remote(home_to_remote);
671 Ok(())
672 }
673 }
674 }
675
676 pub fn validate(&self) -> anyhow::Result<()> {
678 Ok(())
679 }
680}
681
682type Partitions = Vec<HomePartitionConfig>;
683
684#[cfg_attr(
685 feature = "use_serde",
686 derive(serde::Serialize),
687 serde(rename_all = "camelCase", untagged)
688)]
689enum MultiHome {
690 V1(Partitions),
691 V2(HomeMirrorInner),
692}
693
694#[derive(Encoder, Decoder, Default, Debug, Clone, Eq, PartialEq)]
695#[cfg_attr(
696 feature = "use_serde",
697 derive(serde::Serialize, serde::Deserialize),
698 serde(rename_all = "camelCase")
699)]
700pub struct HomeMirrorConfig(
701 #[cfg_attr(feature = "use_serde", serde(deserialize_with = "from_home_v1"))] HomeMirrorInner,
702);
703
704impl Deref for HomeMirrorConfig {
705 type Target = HomeMirrorInner;
706
707 fn deref(&self) -> &Self::Target {
708 &self.0
709 }
710}
711
712impl DerefMut for HomeMirrorConfig {
713 fn deref_mut(&mut self) -> &mut Self::Target {
714 &mut self.0
715 }
716}
717
718impl HomeMirrorConfig {
719 pub fn from_simple(topic: &str, remote_clusters: Vec<String>) -> Self {
722 Self(HomeMirrorInner {
723 partitions: remote_clusters
724 .into_iter()
725 .map(|remote_cluster| HomePartitionConfig {
726 remote_cluster,
727 remote_replica: { ReplicaKey::new(topic, 0_u32).to_string() },
728 ..Default::default()
729 })
730 .collect(),
731 source: false,
732 })
733 }
734}
735
736cfg_if::cfg_if! {
737 if #[cfg(feature = "use_serde")] {
738 impl<'de> serde::Deserialize<'de> for MultiHome {
739 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
740 where
741 D: serde::Deserializer<'de>,
742 {
743 struct MultiHomeVisitor;
744
745 impl<'de> serde::de::Visitor<'de> for MultiHomeVisitor {
746 type Value = MultiHome;
747
748 fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
749 formatter.write_str("an array or an object")
750 }
751
752 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
753 where
754 A: serde::de::SeqAccess<'de>,
755 {
756 let mut elements = vec![];
757 while let Some(value) = seq.next_element::<HomePartitionConfig>()? {
758 elements.push(value);
759 }
760 Ok(MultiHome::V1(elements))
761 }
762
763 fn visit_map<M>(self, map: M) -> Result<Self::Value, M::Error>
764 where
765 M: serde::de::MapAccess<'de>,
766 {
767 use serde::de::value::MapAccessDeserializer;
768 let obj: HomeMirrorInner = serde::Deserialize::deserialize(MapAccessDeserializer::new(map))?;
769 Ok(MultiHome::V2(obj))
770 }
771 }
772
773 deserializer.deserialize_any(MultiHomeVisitor)
774 }
775 }
776
777 fn from_home_v1<'de, D>(deserializer: D) -> Result<HomeMirrorInner, D::Error>
778 where
779 D: serde::Deserializer<'de>,
780 {
781 let home: MultiHome = serde::Deserialize::deserialize(deserializer)?;
782 match home {
783 MultiHome::V1(v1) => Ok(HomeMirrorInner {
784 partitions: v1,
785 source: false,
786 }),
787 MultiHome::V2(v2) => Ok(v2),
788 }
789 }
790 }
791}
792
793#[derive(Default, Debug, Clone, Eq, PartialEq, Decoder, Encoder)]
794#[cfg_attr(
795 feature = "use_serde",
796 derive(serde::Serialize, serde::Deserialize),
797 serde(rename_all = "camelCase")
798)]
799pub struct HomeMirrorInner {
800 #[cfg_attr(feature = "use_serde", serde(default))]
801 pub partitions: Vec<HomePartitionConfig>,
802 #[cfg_attr(
803 feature = "use_serde",
804 serde(skip_serializing_if = "crate::is_false", default)
805 )]
806 #[fluvio(min_version = 18)]
807 pub source: bool, }
809
810impl From<Vec<HomePartitionConfig>> for HomeMirrorConfig {
811 fn from(partitions: Vec<HomePartitionConfig>) -> Self {
812 Self(HomeMirrorInner {
813 partitions,
814 source: false,
815 })
816 }
817}
818
819impl HomeMirrorInner {
820 pub fn partition_count(&self) -> PartitionCount {
821 self.partitions.len() as PartitionCount
822 }
823
824 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
825 None
826 }
827
828 pub fn partitions(&self) -> &Vec<HomePartitionConfig> {
829 &self.partitions
830 }
831
832 pub fn as_partition_maps(&self) -> PartitionMaps {
833 let mut maps = vec![];
834 for (partition_id, home_partition) in self.partitions.iter().enumerate() {
835 maps.push(PartitionMap {
836 id: partition_id as u32,
837 mirror: Some(PartitionMirrorConfig::Home(home_partition.clone())),
838 ..Default::default()
839 });
840 }
841 maps.into()
842 }
843
844 pub fn validate(&self) -> anyhow::Result<()> {
846 Ok(())
847 }
848
849 pub fn add_partition(&mut self, partition: HomePartitionConfig) {
851 self.partitions.push(partition);
852 }
853
854 pub fn set_home_to_remote(&mut self, home_to_remote: bool) {
856 self.source = home_to_remote;
857 self.partitions.iter_mut().for_each(|partition| {
858 partition.source = home_to_remote;
859 });
860 }
861}
862
863#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
864#[cfg_attr(
865 feature = "use_serde",
866 derive(serde::Serialize, serde::Deserialize),
867 serde(rename_all = "camelCase")
868)]
869pub struct HomeMirrorPartition {
870 pub remote_clusters: Vec<String>,
871}
872
873#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
874#[cfg_attr(
875 feature = "use_serde",
876 derive(serde::Serialize, serde::Deserialize),
877 serde(rename_all = "camelCase")
878)]
879pub struct RemoteMirrorConfig {
880 pub home_cluster: String,
882 pub home_spus: Vec<SpuMirrorConfig>,
883 #[cfg_attr(feature = "use_serde", serde(skip_serializing_if = "crate::is_false"))]
884 #[fluvio(min_version = 18)]
885 pub target: bool,
886}
887
888#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
889#[cfg_attr(
890 feature = "use_serde",
891 derive(serde::Serialize, serde::Deserialize),
892 serde(rename_all = "camelCase")
893)]
894pub struct SpuMirrorConfig {
895 pub id: SpuId,
896 pub key: String,
897 pub endpoint: String,
898}
899
900impl RemoteMirrorConfig {
901 pub fn partition_count(&self) -> PartitionCount {
902 self.home_spus.len() as PartitionCount
903 }
904
905 pub fn replication_factor(&self) -> Option<ReplicationFactor> {
906 None
907 }
908
909 pub fn spus(&self) -> &Vec<SpuMirrorConfig> {
910 &self.home_spus
911 }
912
913 pub fn as_partition_maps(&self) -> PartitionMaps {
914 let mut maps = vec![];
915 for (partition_id, home_spu) in self.home_spus.iter().enumerate() {
916 maps.push(PartitionMap {
917 id: partition_id as u32,
918 mirror: Some(PartitionMirrorConfig::Remote(RemotePartitionConfig {
919 home_spu_key: home_spu.key.clone(),
920 home_spu_id: home_spu.id,
921 home_cluster: self.home_cluster.clone(),
922 home_spu_endpoint: home_spu.endpoint.clone(),
923 target: self.target,
924 })),
925 ..Default::default()
926 });
927 }
928 maps.into()
929 }
930
931 pub fn validate(&self) -> anyhow::Result<()> {
933 Ok(())
934 }
935}
936
937#[derive(Decoder, Encoder, Debug, Clone, Eq, PartialEq)]
938#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
939pub enum CleanupPolicy {
940 #[cfg_attr(feature = "use_serde", serde(rename = "segment"))]
941 #[fluvio(tag = 0)]
942 Segment(SegmentBasedPolicy),
943}
944
945impl Default for CleanupPolicy {
946 fn default() -> Self {
947 CleanupPolicy::Segment(SegmentBasedPolicy::default())
948 }
949}
950
951impl CleanupPolicy {
952 pub fn retention_secs(&self) -> u32 {
953 match self {
954 CleanupPolicy::Segment(policy) => policy.retention_secs(),
955 }
956 }
957}
958
959#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
960#[cfg_attr(
961 feature = "use_serde",
962 derive(serde::Serialize, serde::Deserialize),
963 serde(rename_all = "camelCase")
964)]
965pub struct SegmentBasedPolicy {
966 pub time_in_seconds: u32,
967}
968
969impl SegmentBasedPolicy {
970 pub fn retention_secs(&self) -> u32 {
971 self.time_in_seconds
972 }
973}
974
975#[derive(Decoder, Encoder, Default, Debug, Clone, Eq, PartialEq)]
976#[cfg_attr(
977 feature = "use_serde",
978 derive(serde::Serialize, serde::Deserialize),
979 serde(rename_all = "camelCase")
980)]
981pub struct TopicStorageConfig {
982 pub segment_size: Option<u32>, pub max_partition_size: Option<u64>, }
985
986#[derive(Decoder, Default, Encoder, Debug, Clone, Eq, PartialEq)]
987#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
988pub enum CompressionAlgorithm {
989 #[fluvio(tag = 0)]
990 None,
991 #[fluvio(tag = 1)]
992 Gzip,
993 #[fluvio(tag = 2)]
994 Snappy,
995 #[fluvio(tag = 3)]
996 Lz4,
997 #[default]
998 #[fluvio(tag = 4)]
999 Any,
1000 #[fluvio(tag = 5)]
1001 Zstd,
1002}
1003
1004#[derive(Debug, thiserror::Error)]
1005#[error("Invalid compression type in topic")]
1006pub struct InvalidCompressionAlgorithm;
1007
1008impl std::str::FromStr for CompressionAlgorithm {
1009 type Err = InvalidCompressionAlgorithm;
1010
1011 fn from_str(s: &str) -> Result<Self, Self::Err> {
1012 match s.to_lowercase().as_str() {
1013 "none" => Ok(CompressionAlgorithm::None),
1014 "gzip" => Ok(CompressionAlgorithm::Gzip),
1015 "snappy" => Ok(CompressionAlgorithm::Snappy),
1016 "lz4" => Ok(CompressionAlgorithm::Lz4),
1017 "any" => Ok(CompressionAlgorithm::Any),
1018 "zstd" => Ok(CompressionAlgorithm::Zstd),
1019 _ => Err(InvalidCompressionAlgorithm),
1020 }
1021 }
1022}
1023impl std::fmt::Display for CompressionAlgorithm {
1024 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
1025 match self {
1026 Self::None => write!(f, "none"),
1027 Self::Gzip => write!(f, "gzip"),
1028 Self::Snappy => write!(f, "snappy"),
1029 Self::Lz4 => write!(f, "lz4"),
1030 Self::Any => write!(f, "any"),
1031 Self::Zstd => write!(f, "zstd"),
1032 }
1033 }
1034}
1035
1036#[cfg(test)]
1037mod test {
1038
1039 use std::io::Cursor;
1040
1041 use crate::topic::{Bounds, Filter, Transform};
1042
1043 use super::*;
1044
1045 #[test]
1046 fn test_is_computed_topic() {
1047 let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
1048 let t1 = ReplicaSpec::new_assigned(p1);
1049 assert!(!t1.is_computed());
1050
1051 let t2 = ReplicaSpec::new_computed(0, 0, None);
1052 assert!(t2.is_computed());
1053 }
1054
1055 #[test]
1056 fn test_valid_computed_replica_params() {
1057 let t2_result = ReplicaSpec::valid_partition(&0);
1059 assert!(t2_result.is_err());
1060 assert_eq!(
1061 format!("{}", t2_result.unwrap_err()),
1062 "partition must be greater than 0"
1063 );
1064
1065 let t3_result = ReplicaSpec::valid_partition(&1);
1066 assert!(t3_result.is_ok());
1067
1068 let t5_result = ReplicaSpec::valid_replication_factor(&0);
1070 assert!(t5_result.is_err());
1071 assert_eq!(
1072 format!("{}", t5_result.unwrap_err()),
1073 "replication factor must be greater than 0"
1074 );
1075
1076 let t6_result = ReplicaSpec::valid_replication_factor(&1);
1078 assert!(t6_result.is_ok());
1079 }
1080
1081 #[test]
1085 fn test_replica_map_ids() {
1086 let p1: PartitionMaps = vec![(1, vec![0]), (2, vec![2])].into();
1088 let p1_result = p1.validate();
1089 assert!(p1_result.is_err());
1090 assert_eq!(
1091 format!("{}", p1_result.unwrap_err()),
1092 "assigned partitions must start with id 0"
1093 );
1094
1095 let p2: PartitionMaps = vec![(0, vec![0]), (2, vec![2])].into();
1097 let p2_result = p2.validate();
1098 assert!(p2_result.is_err());
1099 assert_eq!(
1100 format!("{}", p2_result.unwrap_err()),
1101 "assigned partition ids must be in sequence and without gaps"
1102 );
1103
1104 let p3: PartitionMaps = vec![(0, vec![0]), (2, vec![2]), (1, vec![1])].into();
1106 let p3_result = p3.validate();
1107 assert!(p3_result.is_err());
1108 assert_eq!(
1109 format!("{}", p3_result.unwrap_err()),
1110 "assigned partition ids must be in sequence and without gaps"
1111 );
1112
1113 let p4: PartitionMaps = vec![(0, vec![0]), (1, vec![1]), (1, vec![1])].into();
1115 let p4_result = p4.validate();
1116 assert!(p4_result.is_err());
1117 assert_eq!(
1118 format!("{}", p4_result.unwrap_err()),
1119 "assigned partition ids must be in sequence and without gaps"
1120 );
1121
1122 let p5: PartitionMaps = vec![(0, vec![1]), (1, vec![1]), (2, vec![2])].into();
1124 let p5_result = p5.validate();
1125 assert!(p5_result.is_ok());
1126 }
1127
1128 #[test]
1134 fn test_replica_map_spu_ids() {
1135 let p1: PartitionMaps = vec![(0, vec![]), (1, vec![1])].into();
1137 let p1_result = p1.validate();
1138 assert!(p1_result.is_err());
1139 assert_eq!(
1140 format!("{}", p1_result.unwrap_err()),
1141 "assigned replicas must have at least one spu id"
1142 );
1143
1144 let p2: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1])].into();
1146 let p2_result = p2.validate();
1147 assert!(p2_result.is_err());
1148 assert_eq!(
1149 format!("{}", p2_result.unwrap_err()),
1150 "all assigned replicas must have the same number of spu ids: 2"
1151 );
1152
1153 let p3: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, 1])].into();
1155 let p3_result = p3.validate();
1156 assert!(p3_result.is_err());
1157 assert_eq!(
1158 format!("{}", p3_result.unwrap_err()),
1159 "duplicate spu ids found in assigned partition with id: 1"
1160 );
1161
1162 let p4: PartitionMaps = vec![(0, vec![3, 1, 2, 3])].into();
1164 let p4_result = p4.validate();
1165 assert!(p4_result.is_err());
1166 assert_eq!(
1167 format!("{}", p4_result.unwrap_err()),
1168 "duplicate spu ids found in assigned partition with id: 0"
1169 );
1170
1171 let p5: PartitionMaps = vec![(0, vec![1, 2]), (1, vec![1, -2])].into();
1173 let p5_result = p5.validate();
1174 assert!(p5_result.is_err());
1175 assert_eq!(
1176 format!("{}", p5_result.unwrap_err()),
1177 "invalid spu id: -2 in assigned partition with id: 1"
1178 );
1179 }
1180
1181 #[test]
1184 fn test_unique_spus_in_partition_map() {
1185 let p1: PartitionMaps =
1187 vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
1188
1189 let p1_result = p1.unique_spus_in_partition_map();
1190 let expected_p1_result: Vec<SpuId> = vec![0, 1, 3, 2, 4];
1191 assert_eq!(p1_result, expected_p1_result);
1192 }
1193
1194 #[test]
1195 fn test_encode_decode_computed_topic_spec() {
1196 let topic_spec = ReplicaSpec::Computed((2, 3, true).into());
1197 let mut dest = vec![];
1198
1199 let result = topic_spec.encode(&mut dest, 0);
1201 assert!(result.is_ok());
1202
1203 let expected_dest = [
1204 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x01, ];
1209 assert_eq!(dest, expected_dest);
1210
1211 let mut topic_spec_decoded = ReplicaSpec::default();
1213 let result = topic_spec_decoded.decode(&mut Cursor::new(&expected_dest), 0);
1214 assert!(result.is_ok());
1215
1216 match topic_spec_decoded {
1217 ReplicaSpec::Computed(param) => {
1218 assert_eq!(param.partitions, 2);
1219 assert_eq!(param.replication_factor, 3);
1220 assert!(param.ignore_rack_assignment);
1221 }
1222 _ => panic!("expect computed topic spec, found {topic_spec_decoded:?}"),
1223 }
1224 }
1225
1226 #[test]
1227 fn test_topic_with_dedup_prev_version_compatibility() {
1228 let prev_version = 11;
1230 let mut topic_spec: TopicSpec = ReplicaSpec::Computed((2, 3, true).into()).into();
1231 topic_spec.set_deduplication(Some(Deduplication {
1232 bounds: Bounds {
1233 count: 1,
1234 age: None,
1235 },
1236 filter: Filter {
1237 transform: Transform {
1238 uses: "filter".to_string(),
1239 ..Default::default()
1240 },
1241 },
1242 }));
1243
1244 let mut dest = vec![];
1246 topic_spec.encode(&mut dest, prev_version).expect("encoded");
1247 let mut topic_spec_decoded = TopicSpec::default();
1248 topic_spec_decoded
1249 .decode(&mut Cursor::new(&dest), prev_version)
1250 .expect("decoded");
1251
1252 assert!(topic_spec_decoded.deduplication.is_none());
1254 }
1255
1256 #[test]
1257 fn test_partition_map_str() {
1258 let p1: PartitionMaps =
1260 vec![(0, vec![0, 1, 3]), (1, vec![0, 2, 3]), (2, vec![1, 3, 4])].into();
1261 let spec = ReplicaSpec::new_assigned(p1);
1262 assert_eq!(
1263 spec.partition_map_str(),
1264 Some("0:[0, 1, 3], 1:[0, 2, 3], 2:[1, 3, 4]".to_string())
1265 );
1266
1267 let p2 = PartitionMaps::default();
1269 let spec2 = ReplicaSpec::new_assigned(p2);
1270 assert_eq!(spec2.partition_map_str(), Some("".to_string()));
1271 }
1272
1273 #[test]
1274 fn test_deserialize_home_mirror_config() {
1275 let data = r#"{"partitions":[{"remoteCluster":"boat1","remoteReplica":"boats-0","source":false},{"remoteCluster":"boat2","remoteReplica":"boats-0","source":false}]}"#;
1276 let home_mirror: HomeMirrorConfig = serde_json::from_str(data).expect("deserialize");
1277 assert_eq!(home_mirror.partitions().len(), 2);
1278 }
1279}
1280
1281#[cfg(test)]
1282mod mirror_test {
1283 use crate::{
1284 topic::{PartitionMap, HomeMirrorConfig},
1285 partition::{PartitionMirrorConfig, HomePartitionConfig},
1286 };
1287
1288 #[test]
1290 fn test_home_mirror_conversion() {
1291 let mirror =
1292 HomeMirrorConfig::from_simple("boats", vec!["boat1".to_owned(), "boat2".to_owned()]);
1293 assert_eq!(
1294 mirror.as_partition_maps(),
1295 vec![
1296 PartitionMap {
1297 id: 0,
1298 mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
1299 remote_replica: "boats-0".to_string(),
1300 remote_cluster: "boat1".to_owned(),
1301 ..Default::default()
1302 })),
1303 ..Default::default()
1304 },
1305 PartitionMap {
1306 id: 1,
1307 mirror: Some(PartitionMirrorConfig::Home(HomePartitionConfig {
1308 remote_replica: "boats-0".to_string(),
1309 remote_cluster: "boat2".to_string(),
1310 ..Default::default()
1311 })),
1312 replicas: vec![],
1313 },
1314 ]
1315 .into()
1316 );
1317 }
1318}