fluvio_controlplane_metadata/topic/
deduplication.rs

1use std::{time::Duration, collections::BTreeMap};
2
3use derive_builder::Builder;
4use fluvio_protocol::{Encoder, Decoder};
5
6#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
7#[cfg_attr(
8    feature = "use_serde",
9    derive(serde::Serialize, serde::Deserialize),
10    serde(rename_all = "kebab-case")
11)]
12pub struct Deduplication {
13    pub bounds: Bounds,
14    pub filter: Filter,
15}
16
17#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
18#[cfg_attr(
19    feature = "use_serde",
20    derive(serde::Serialize, serde::Deserialize),
21    serde(rename_all = "kebab-case")
22)]
23pub struct Bounds {
24    #[cfg_attr(feature = "use_serde", serde(deserialize_with = "non_zero_count"))]
25    pub count: u64,
26    #[cfg_attr(
27        feature = "use_serde",
28        serde(
29            default,
30            skip_serializing_if = "Option::is_none",
31            with = "humantime_serde"
32        )
33    )]
34    pub age: Option<Duration>,
35}
36
37#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
38#[cfg_attr(
39    feature = "use_serde",
40    derive(serde::Serialize, serde::Deserialize),
41    serde(rename_all = "kebab-case")
42)]
43pub struct Filter {
44    pub transform: Transform,
45}
46
47#[derive(Debug, Default, Builder, Clone, PartialEq, Eq, Encoder, Decoder)]
48#[cfg_attr(
49    feature = "use_serde",
50    derive(serde::Serialize, serde::Deserialize),
51    serde(rename_all = "kebab-case")
52)]
53pub struct Transform {
54    pub uses: String,
55    #[cfg_attr(
56        feature = "use_serde",
57        serde(default, skip_serializing_if = "BTreeMap::is_empty")
58    )]
59    pub with: BTreeMap<String, String>,
60}
61
/// Serde `deserialize_with` helper for `Bounds::count`.
///
/// Reads a `u64` from `deserializer` and hands it back unchanged unless it is
/// zero.
///
/// # Errors
///
/// Propagates any error raised while reading the `u64`, and produces a custom
/// `count must be non-zero` error when the value is `0`.
#[cfg(feature = "use_serde")]
fn non_zero_count<'de, D>(deserializer: D) -> Result<u64, D::Error>
where
    D: serde::Deserializer<'de>,
{
    match <u64 as serde::Deserialize>::deserialize(deserializer)? {
        0 => Err(<D::Error as serde::de::Error>::custom(
            "count must be non-zero",
        )),
        value => Ok(value),
    }
}
75
// Every test here needs serde + serde_yaml, so the whole module is gated on
// the `use_serde` feature once instead of per item. This also avoids an
// unused `use super::*` warning when the feature is disabled.
#[cfg(all(test, feature = "use_serde"))]
mod tests {
    use super::*;

    /// Parses a YAML document into [`Bounds`].
    fn parse_bounds(yaml: &str) -> Result<Bounds, serde_yaml::Error> {
        serde_yaml::from_str(yaml)
    }

    /// `count` has no serde default, so omitting it must be an error.
    #[test]
    fn test_deserialize_bounds_missing_count() {
        let yaml = r#"
            age: 30s
        "#;
        let bounds = parse_bounds(yaml);
        assert_eq!(
            bounds.unwrap_err().to_string(),
            "missing field `count` at line 2 column 13"
        );
    }

    /// A zero `count` is rejected by `non_zero_count`.
    #[test]
    fn test_deserialize_bounds_zero() {
        let yaml = r#"
            count: 0
            age: 30s
        "#;
        let bounds = parse_bounds(yaml);
        assert_eq!(
            bounds.unwrap_err().to_string(),
            "count must be non-zero at line 2 column 13"
        );
    }

    /// A non-zero `count` parses, and the parsed values are the ones given.
    #[test]
    fn test_deserialize_bounds_non_zero() {
        let yaml = r#"
            count: 10
            age: 30s
        "#;
        let bounds = parse_bounds(yaml).expect("non-zero count must deserialize");
        assert_eq!(
            bounds,
            Bounds {
                count: 10,
                age: Some(std::time::Duration::from_secs(30)),
            }
        );
    }

    /// `age` is optional: leaving it out yields `None`.
    #[test]
    fn test_deserialize_bounds_without_age() {
        let yaml = r#"
            count: 10
        "#;
        let bounds = parse_bounds(yaml).expect("age is optional");
        assert_eq!(
            bounds,
            Bounds {
                count: 10,
                age: None,
            }
        );
    }
}