fluvio_controlplane_metadata/topic/
deduplication.rs1use std::{time::Duration, collections::BTreeMap};
2
3use derive_builder::Builder;
4use fluvio_protocol::{Encoder, Decoder};
5
6#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
7#[cfg_attr(
8 feature = "use_serde",
9 derive(serde::Serialize, serde::Deserialize),
10 derive(schemars::JsonSchema),
11 serde(rename_all = "kebab-case")
12)]
13pub struct Deduplication {
14 pub bounds: Bounds,
15 pub filter: Filter,
16}
17
18#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
19#[cfg_attr(
20 feature = "use_serde",
21 derive(serde::Serialize, serde::Deserialize),
22 derive(schemars::JsonSchema),
23 serde(rename_all = "kebab-case")
24)]
25pub struct Bounds {
26 #[cfg_attr(feature = "use_serde", serde(deserialize_with = "non_zero_count"))]
27 pub count: u64,
28 #[cfg_attr(
29 feature = "use_serde",
30 serde(
31 default,
32 skip_serializing_if = "Option::is_none",
33 with = "humantime_serde"
34 ),
35 schemars(with = "String")
36 )]
37 pub age: Option<Duration>,
38}
39
40#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
41#[cfg_attr(
42 feature = "use_serde",
43 derive(serde::Serialize, serde::Deserialize),
44 derive(schemars::JsonSchema),
45 serde(rename_all = "kebab-case")
46)]
47pub struct Filter {
48 pub transform: Transform,
49}
50
51#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
52#[cfg_attr(
53 feature = "use_serde",
54 derive(serde::Serialize, serde::Deserialize),
55 derive(schemars::JsonSchema),
56 serde(rename_all = "kebab-case")
57)]
58pub struct Transform {
59 pub uses: String,
60 #[cfg_attr(
61 feature = "use_serde",
62 serde(default, skip_serializing_if = "BTreeMap::is_empty")
63 )]
64 pub with: BTreeMap<String, String>,
65}
66
67#[cfg(feature = "use_serde")]
68fn non_zero_count<'de, D>(deserializer: D) -> Result<u64, D::Error>
69where
70 D: serde::Deserializer<'de>,
71{
72 use serde::Deserialize;
73 let count = u64::deserialize(deserializer)?;
74 if count == 0 {
75 Err(serde::de::Error::custom("count must be non-zero"))
76 } else {
77 Ok(count)
78 }
79}
80
81#[cfg(test)]
82mod tests {
83 use super::*;
84 #[cfg(feature = "use_serde")]
85 use serde_yaml;
86
87 #[cfg(feature = "use_serde")]
88 #[test]
89 fn test_deserialize_bounds_missing_count() {
90 let yaml = r#"
91 age: 30s
92 "#;
93 let bounds: Result<Bounds, _> = serde_yaml::from_str(yaml);
94 assert_eq!(
95 bounds.unwrap_err().to_string(),
96 "missing field `count` at line 2 column 13"
97 );
98 }
99
100 #[cfg(feature = "use_serde")]
101 #[test]
102 fn test_deserialize_bounds_zero() {
103 let yaml = r#"
104 count: 0
105 age: 30s
106 "#;
107 let bounds: Result<Bounds, _> = serde_yaml::from_str(yaml);
108 assert_eq!(
109 bounds.unwrap_err().to_string(),
110 "count must be non-zero at line 2 column 13"
111 );
112 }
113
114 #[cfg(feature = "use_serde")]
115 #[test]
116 fn test_deserialize_bounds_non_zero() {
117 let yaml = r#"
118 count: 10
119 age: 30s
120 "#;
121 let bounds: Result<Bounds, _> = serde_yaml::from_str(yaml);
122 assert!(bounds.is_ok());
123 }
124}