faucet_common_kafka/
format.rs1use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
9#[serde(tag = "type", rename_all = "snake_case")]
10pub enum KafkaValueFormat {
11 #[default]
13 Json,
14 RawString,
16 Bytes,
19
20 #[cfg(feature = "schema-registry")]
22 ConfluentAvro {
23 schema_registry: crate::SchemaRegistryConfig,
24 },
25
26 #[cfg(feature = "schema-registry")]
28 ConfluentProtobuf {
29 schema_registry: crate::SchemaRegistryConfig,
30 },
31
32 #[cfg(feature = "schema-registry")]
35 ConfluentJsonSchema {
36 schema_registry: crate::SchemaRegistryConfig,
37 #[serde(default)]
38 validate: bool,
39 },
40}
41
42impl KafkaValueFormat {
43 pub fn is_schema_registry(&self) -> bool {
48 #[cfg(feature = "schema-registry")]
49 {
50 matches!(
51 self,
52 KafkaValueFormat::ConfluentAvro { .. }
53 | KafkaValueFormat::ConfluentProtobuf { .. }
54 | KafkaValueFormat::ConfluentJsonSchema { .. }
55 )
56 }
57 #[cfg(not(feature = "schema-registry"))]
58 {
59 false
60 }
61 }
62}
63
64#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
66#[serde(rename_all = "lowercase")]
67pub enum CompressionType {
68 #[default]
69 None,
70 Gzip,
71 Snappy,
72 Lz4,
73 Zstd,
74}
75
76impl CompressionType {
77 pub fn as_str(&self) -> &'static str {
78 match self {
79 CompressionType::None => "none",
80 CompressionType::Gzip => "gzip",
81 CompressionType::Snappy => "snappy",
82 CompressionType::Lz4 => "lz4",
83 CompressionType::Zstd => "zstd",
84 }
85 }
86}
87
88#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
90#[serde(rename_all = "snake_case")]
91pub enum OnDecodeError {
92 Skip,
94 #[default]
96 Fail,
97}
98
99#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
101#[serde(rename_all = "snake_case")]
102pub enum OnKeyError {
103 Skip,
105 #[default]
107 Fail,
108 RoundRobin,
110}
111
112#[cfg(test)]
113mod tests {
114 use super::*;
115
116 #[test]
117 fn value_format_json_serializes_as_object_with_type() {
118 let v = serde_json::to_value(KafkaValueFormat::Json).unwrap();
119 assert_eq!(v["type"], "json");
120 }
121
122 #[test]
123 fn value_format_deserializes_raw_string() {
124 let parsed: KafkaValueFormat =
125 serde_json::from_value(serde_json::json!({"type": "raw_string"})).unwrap();
126 assert!(matches!(parsed, KafkaValueFormat::RawString));
127 }
128
129 #[test]
130 fn value_format_bytes_round_trip() {
131 let parsed: KafkaValueFormat =
132 serde_json::from_value(serde_json::json!({"type": "bytes"})).unwrap();
133 assert!(matches!(parsed, KafkaValueFormat::Bytes));
134 }
135
136 #[test]
137 fn compression_round_trip() {
138 for v in [
139 CompressionType::None,
140 CompressionType::Gzip,
141 CompressionType::Snappy,
142 CompressionType::Lz4,
143 CompressionType::Zstd,
144 ] {
145 let s = serde_json::to_value(v).unwrap();
146 let back: CompressionType = serde_json::from_value(s.clone()).unwrap();
147 assert_eq!(v, back);
148 assert_eq!(s.as_str().unwrap(), v.as_str());
149 }
150 }
151
152 #[test]
153 fn on_decode_error_default_is_fail() {
154 assert_eq!(OnDecodeError::default(), OnDecodeError::Fail);
155 }
156
157 #[test]
158 fn on_key_error_default_is_fail() {
159 assert_eq!(OnKeyError::default(), OnKeyError::Fail);
160 }
161
162 #[test]
163 fn compression_default_is_none() {
164 assert_eq!(CompressionType::default(), CompressionType::None);
165 }
166
167 #[test]
168 fn schema_for_format_types_compile() {
169 let _ = schemars::schema_for!(KafkaValueFormat);
170 let _ = schemars::schema_for!(CompressionType);
171 let _ = schemars::schema_for!(OnDecodeError);
172 let _ = schemars::schema_for!(OnKeyError);
173 }
174
175 #[cfg(feature = "schema-registry")]
176 #[test]
177 fn confluent_avro_round_trips_through_serde() {
178 let cfg = crate::SchemaRegistryConfig::new("http://localhost:8081");
179 let format = KafkaValueFormat::ConfluentAvro {
180 schema_registry: cfg,
181 };
182 let s = serde_json::to_value(&format).unwrap();
183 assert_eq!(s["type"], "confluent_avro");
184 let parsed: KafkaValueFormat = serde_json::from_value(s).unwrap();
185 assert!(matches!(parsed, KafkaValueFormat::ConfluentAvro { .. }));
186 }
187}