yozefu_lib/kafka/
data_type.rs

//! A Kafka record is just a sequence of bytes. The key and the value of a record can be of different types.
//! This module defines the data types that are supported.
//! More details about the byte format when using a schema: <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
4use std::fmt::Display;
5
6use serde::Deserialize;
7use serde::Serialize;
8
9use crate::search::compare::StringOperator;
10
11#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq)]
12#[cfg_attr(test, derive(schemars::JsonSchema))]
13#[serde(untagged)]
14pub enum DataType {
15    Json(serde_json::Value),
16    String(String),
17}
18
19impl From<DataType> for serde_json::Value {
20    fn from(val: DataType) -> Self {
21        match val {
22            DataType::Json(value) => value,
23            DataType::String(s) => serde_json::Value::String(s),
24        }
25    }
26}
27
28impl Default for DataType {
29    fn default() -> Self {
30        Self::String(String::new())
31    }
32}
33
34pub trait Comparable {
35    fn compare(
36        &self,
37        json_pointer: &Option<String>,
38        operator: &StringOperator,
39        right: &str,
40    ) -> bool;
41}
42
43impl Comparable for DataType {
44    fn compare(
45        &self,
46        json_pointer: &Option<String>,
47        operator: &StringOperator,
48        right: &str,
49    ) -> bool {
50        match &self {
51            DataType::Json(value) => {
52                Self::compare_json(value, json_pointer.as_deref(), operator, right)
53            }
54            DataType::String(value) => Self::compare_string(value, operator, right),
55        }
56    }
57}
58
59impl DataType {
60    fn compare_json(
61        value: &serde_json::Value,
62        json_pointer: Option<&str>,
63        operator: &StringOperator,
64        right: &str,
65    ) -> bool {
66        let v = match json_pointer {
67            Some(path) => {
68                let path = path.replace(['.', '['], "/").replace(']', "");
69                match value.pointer(&path) {
70                    Some(d) => match d {
71                        serde_json::Value::Null => "null".to_string(),
72                        serde_json::Value::Bool(v) => v.to_string(),
73                        serde_json::Value::Number(v) => v.to_string(),
74                        serde_json::Value::String(v) => v.to_string(),
75                        serde_json::Value::Array(_) => return false,
76                        serde_json::Value::Object(_) => return false,
77                    },
78                    None => {
79                        return false;
80                    }
81                }
82            }
83            None => serde_json::to_string(value).unwrap(),
84        };
85        match operator {
86            StringOperator::Contain => v.contains(right),
87            StringOperator::Equal => v == right,
88            StringOperator::StartWith => v.starts_with(right),
89            StringOperator::NotEqual => v != right,
90        }
91    }
92
93    fn compare_string(value: &str, operator: &StringOperator, right: &str) -> bool {
94        match operator {
95            StringOperator::Contain => value.contains(right),
96            StringOperator::Equal => value == right,
97            StringOperator::StartWith => value.starts_with(right),
98            StringOperator::NotEqual => value != right,
99        }
100    }
101
102    pub fn raw(&self) -> String {
103        match &self {
104            DataType::Json(value) => match value {
105                serde_json::Value::Null => "null".to_string(),
106                serde_json::Value::Bool(b) => b.to_string(),
107                serde_json::Value::Number(number) => number.to_string(),
108                serde_json::Value::String(s) => s.to_string(),
109                serde_json::Value::Array(vec) => serde_json::to_string(vec).unwrap_or_default(),
110                serde_json::Value::Object(map) => serde_json::to_string(map).unwrap_or_default(),
111            },
112            DataType::String(s) => s.clone(),
113        }
114    }
115
116    pub fn to_string_pretty(&self) -> String {
117        match &self {
118            DataType::Json(value) => serde_json::to_string_pretty(value).unwrap_or_default(),
119            DataType::String(s) => s.clone(),
120        }
121    }
122}
123
124impl Display for DataType {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        match &self {
127            DataType::Json(value) => {
128                write!(f, "{}", serde_json::to_string(value).unwrap_or_default())
129            }
130            DataType::String(s) => write!(f, "{s}"),
131        }
132    }
133}
134
/// Exercises each operator handled for plain-text payloads, both when it must
/// match and when it must not.
#[test]
fn test_compare_string() {
    let data_type = DataType::String("hello world".to_string());

    // Cases where the operator must match.
    assert!(data_type.compare(&None, &StringOperator::Contain, "world"));
    assert!(data_type.compare(&None, &StringOperator::Equal, "hello world"));
    assert!(data_type.compare(&None, &StringOperator::StartWith, "hello"));
    assert!(data_type.compare(&None, &StringOperator::NotEqual, "goodbye"));

    // Cases where the operator must not match.
    assert!(!data_type.compare(&None, &StringOperator::Contain, "mars"));
    assert!(!data_type.compare(&None, &StringOperator::Equal, "hello"));
    assert!(!data_type.compare(&None, &StringOperator::StartWith, "world"));
    assert!(!data_type.compare(&None, &StringOperator::NotEqual, "hello world"));
}
144
/// Compares a JSON payload both as a whole document and through a pointer.
#[test]
fn test_compare_json() {
    let record = DataType::Json(serde_json::json!({"hello": "world"}));
    let pointer = Some("/hello".to_string());

    // Without a pointer the serialized document is what gets compared.
    assert!(record.compare(&None, &StringOperator::Contain, "world"));
    assert!(!record.compare(&None, &StringOperator::Equal, "goodbye"));

    // With a pointer only the addressed scalar is compared.
    assert!(record.compare(&pointer, &StringOperator::Equal, "world"));
    assert!(record.compare(&pointer, &StringOperator::StartWith, "world"));
    assert!(!record.compare(&pointer, &StringOperator::NotEqual, "world"));
}
155
/// `to_string` (via `Display`) yields compact JSON for JSON payloads and the
/// text itself for plain-text payloads.
#[test]
fn test_data_type_to_string() {
    let json = DataType::Json(serde_json::json!({"key": "value"}));
    let text = DataType::String(String::from("hello"));

    assert_eq!(json.to_string(), r#"{"key":"value"}"#);
    assert_eq!(text.to_string(), "hello");
}