fluvio_controlplane_metadata/topic/
deduplication.rs

1use std::{time::Duration, collections::BTreeMap};
2
3use derive_builder::Builder;
4use fluvio_protocol::{Encoder, Decoder};
5
6#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
7#[cfg_attr(
8    feature = "use_serde",
9    derive(serde::Serialize, serde::Deserialize),
10    derive(schemars::JsonSchema),
11    serde(rename_all = "kebab-case")
12)]
13pub struct Deduplication {
14    pub bounds: Bounds,
15    pub filter: Filter,
16}
17
18#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
19#[cfg_attr(
20    feature = "use_serde",
21    derive(serde::Serialize, serde::Deserialize),
22    derive(schemars::JsonSchema),
23    serde(rename_all = "kebab-case")
24)]
25pub struct Bounds {
26    #[cfg_attr(feature = "use_serde", serde(deserialize_with = "non_zero_count"))]
27    pub count: u64,
28    #[cfg_attr(
29        feature = "use_serde",
30        serde(
31            default,
32            skip_serializing_if = "Option::is_none",
33            with = "humantime_serde"
34        ),
35        schemars(with = "String")
36    )]
37    pub age: Option<Duration>,
38}
39
40#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
41#[cfg_attr(
42    feature = "use_serde",
43    derive(serde::Serialize, serde::Deserialize),
44    derive(schemars::JsonSchema),
45    serde(rename_all = "kebab-case")
46)]
47pub struct Filter {
48    pub transform: Transform,
49}
50
51#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
52#[cfg_attr(
53    feature = "use_serde",
54    derive(serde::Serialize, serde::Deserialize),
55    derive(schemars::JsonSchema),
56    serde(rename_all = "kebab-case")
57)]
58pub struct Transform {
59    pub uses: String,
60    #[cfg_attr(
61        feature = "use_serde",
62        serde(default, skip_serializing_if = "BTreeMap::is_empty")
63    )]
64    pub with: BTreeMap<String, String>,
65}
66
/// Serde `deserialize_with` helper used for the `count` field of `Bounds`.
///
/// Reads a `u64` from `deserializer` and hands it back unless it is `0`, in
/// which case a custom deserialization error with the message
/// "count must be non-zero" is returned. A failure to read the `u64` itself
/// is propagated unchanged.
#[cfg(feature = "use_serde")]
fn non_zero_count<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    match <u64 as serde::Deserialize>::deserialize(deserializer)? {
        0 => Err(<D::Error as serde::de::Error>::custom(
            "count must be non-zero",
        )),
        parsed => Ok(parsed),
    }
}
80
// Every test below goes through serde/serde_yaml, so the whole module is
// gated on `use_serde` once instead of per item. This also avoids an empty
// module with an unused `use super::*` when the feature is disabled.
#[cfg(all(test, feature = "use_serde"))]
mod tests {
    use super::*;

    // A document that omits `count` must fail: the field has no serde default.
    #[test]
    fn test_deserialize_bounds_missing_count() {
        let yaml = r#"
            age: 30s
        "#;
        let bounds: Result<Bounds, _> = serde_yaml::from_str(yaml);
        assert_eq!(
            bounds.unwrap_err().to_string(),
            "missing field `count` at line 2 column 13"
        );
    }

    // `count: 0` must be rejected by the `non_zero_count` helper.
    #[test]
    fn test_deserialize_bounds_zero() {
        let yaml = r#"
            count: 0
            age: 30s
        "#;
        let bounds: Result<Bounds, _> = serde_yaml::from_str(yaml);
        assert_eq!(
            bounds.unwrap_err().to_string(),
            "count must be non-zero at line 2 column 13"
        );
    }

    // A non-zero count with a humantime age parses, and both values round
    // into the expected fields (not merely "some Ok value").
    #[test]
    fn test_deserialize_bounds_non_zero() {
        let yaml = r#"
            count: 10
            age: 30s
        "#;
        let bounds: Bounds =
            serde_yaml::from_str(yaml).expect("non-zero count with valid age should deserialize");
        assert_eq!(bounds.count, 10);
        assert_eq!(bounds.age, Some(std::time::Duration::from_secs(30)));
    }
}