Skip to main content

faucet_common_kafka/
format.rs

1//! Kafka value/key serialization formats and policy enums shared by
2//! `faucet-source-kafka` and `faucet-sink-kafka`.
3
4use schemars::JsonSchema;
5use serde::{Deserialize, Serialize};
6
7/// How a Kafka message value (or key) is encoded on the wire.
8#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
9#[serde(tag = "type", rename_all = "snake_case")]
10pub enum KafkaValueFormat {
11    /// Parse value bytes as a JSON document.
12    #[default]
13    Json,
14    /// Treat value bytes as a UTF-8 string. Invalid UTF-8 fails per `OnDecodeError`.
15    RawString,
16    /// Pass value bytes through as a base64-encoded string inside the JSON record.
17    /// On the sink side, expects a base64 string in the source record.
18    Bytes,
19
20    /// Confluent-wire-format Avro: magic byte + schema_id + Avro binary.
21    #[cfg(feature = "schema-registry")]
22    ConfluentAvro {
23        schema_registry: crate::SchemaRegistryConfig,
24    },
25
26    /// Confluent-wire-format Protobuf: magic byte + schema_id + protobuf binary.
27    #[cfg(feature = "schema-registry")]
28    ConfluentProtobuf {
29        schema_registry: crate::SchemaRegistryConfig,
30    },
31
32    /// Confluent-wire-format JSON Schema: magic byte + schema_id + JSON payload.
33    /// When `validate` is true, decoded JSON is validated against the registered schema.
34    #[cfg(feature = "schema-registry")]
35    ConfluentJsonSchema {
36        schema_registry: crate::SchemaRegistryConfig,
37        #[serde(default)]
38        validate: bool,
39    },
40}
41
42impl KafkaValueFormat {
43    /// True for Confluent Schema Registry wire formats (`ConfluentAvro`,
44    /// `ConfluentProtobuf`, `ConfluentJsonSchema`), which require a schema to
45    /// encode on the sink side. Always `false` when the `schema-registry`
46    /// feature is disabled (those variants don't exist).
47    pub fn is_schema_registry(&self) -> bool {
48        #[cfg(feature = "schema-registry")]
49        {
50            matches!(
51                self,
52                KafkaValueFormat::ConfluentAvro { .. }
53                    | KafkaValueFormat::ConfluentProtobuf { .. }
54                    | KafkaValueFormat::ConfluentJsonSchema { .. }
55            )
56        }
57        #[cfg(not(feature = "schema-registry"))]
58        {
59            false
60        }
61    }
62}
63
64/// Producer-side compression for outbound batches.
65#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
66#[serde(rename_all = "lowercase")]
67pub enum CompressionType {
68    #[default]
69    None,
70    Gzip,
71    Snappy,
72    Lz4,
73    Zstd,
74}
75
76impl CompressionType {
77    pub fn as_str(&self) -> &'static str {
78        match self {
79            CompressionType::None => "none",
80            CompressionType::Gzip => "gzip",
81            CompressionType::Snappy => "snappy",
82            CompressionType::Lz4 => "lz4",
83            CompressionType::Zstd => "zstd",
84        }
85    }
86}
87
88/// What the source does when a single message fails to decode.
89#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
90#[serde(rename_all = "snake_case")]
91pub enum OnDecodeError {
92    /// Drop the message and continue (logs a warning).
93    Skip,
94    /// Surface `FaucetError::Source` and abort the batch.
95    #[default]
96    Fail,
97}
98
99/// What the sink does when key/partition extraction fails for a record.
100#[derive(Debug, Clone, Copy, Serialize, Deserialize, JsonSchema, PartialEq, Eq, Default)]
101#[serde(rename_all = "snake_case")]
102pub enum OnKeyError {
103    /// Drop the record (logs a warning).
104    Skip,
105    /// Surface `FaucetError::Sink` and abort the batch.
106    #[default]
107    Fail,
108    /// Send the record with no key (librdkafka picks the partition).
109    RoundRobin,
110}
111
#[cfg(test)]
mod tests {
    //! Serde / schema contract tests for the Kafka format and policy enums.

    use super::*;

    #[test]
    fn value_format_json_serializes_as_object_with_type() {
        let v = serde_json::to_value(KafkaValueFormat::Json).unwrap();
        assert_eq!(v["type"], "json");
    }

    #[test]
    fn value_format_default_is_json() {
        assert!(matches!(KafkaValueFormat::default(), KafkaValueFormat::Json));
    }

    #[test]
    fn value_format_deserializes_raw_string() {
        let parsed: KafkaValueFormat =
            serde_json::from_value(serde_json::json!({"type": "raw_string"})).unwrap();
        assert!(matches!(parsed, KafkaValueFormat::RawString));
    }

    #[test]
    fn value_format_bytes_round_trip() {
        // Serialize first so this is a real round trip, not just a parse.
        let s = serde_json::to_value(KafkaValueFormat::Bytes).unwrap();
        assert_eq!(s, serde_json::json!({"type": "bytes"}));
        let parsed: KafkaValueFormat = serde_json::from_value(s).unwrap();
        assert!(matches!(parsed, KafkaValueFormat::Bytes));
    }

    #[test]
    fn value_format_rejects_unknown_type() {
        let parsed =
            serde_json::from_value::<KafkaValueFormat>(serde_json::json!({"type": "xml"}));
        assert!(parsed.is_err());
    }

    #[test]
    fn builtin_formats_are_not_schema_registry() {
        for format in [
            KafkaValueFormat::Json,
            KafkaValueFormat::RawString,
            KafkaValueFormat::Bytes,
        ] {
            assert!(!format.is_schema_registry(), "{:?}", format);
        }
    }

    #[test]
    fn compression_round_trip() {
        for v in [
            CompressionType::None,
            CompressionType::Gzip,
            CompressionType::Snappy,
            CompressionType::Lz4,
            CompressionType::Zstd,
        ] {
            let s = serde_json::to_value(v).unwrap();
            let back: CompressionType = serde_json::from_value(s.clone()).unwrap();
            assert_eq!(v, back);
            assert_eq!(s.as_str().unwrap(), v.as_str());
        }
    }

    #[test]
    fn policy_enums_use_snake_case_wire_names() {
        assert_eq!(serde_json::to_value(OnDecodeError::Skip).unwrap(), "skip");
        assert_eq!(serde_json::to_value(OnDecodeError::Fail).unwrap(), "fail");
        assert_eq!(serde_json::to_value(OnKeyError::Skip).unwrap(), "skip");
        assert_eq!(serde_json::to_value(OnKeyError::Fail).unwrap(), "fail");
        assert_eq!(
            serde_json::to_value(OnKeyError::RoundRobin).unwrap(),
            "round_robin"
        );
        let parsed: OnKeyError =
            serde_json::from_value(serde_json::json!("round_robin")).unwrap();
        assert_eq!(parsed, OnKeyError::RoundRobin);
    }

    #[test]
    fn on_decode_error_default_is_fail() {
        assert_eq!(OnDecodeError::default(), OnDecodeError::Fail);
    }

    #[test]
    fn on_key_error_default_is_fail() {
        assert_eq!(OnKeyError::default(), OnKeyError::Fail);
    }

    #[test]
    fn compression_default_is_none() {
        assert_eq!(CompressionType::default(), CompressionType::None);
    }

    #[test]
    fn schema_for_format_types_compile() {
        let _ = schemars::schema_for!(KafkaValueFormat);
        let _ = schemars::schema_for!(CompressionType);
        let _ = schemars::schema_for!(OnDecodeError);
        let _ = schemars::schema_for!(OnKeyError);
    }

    #[cfg(feature = "schema-registry")]
    #[test]
    fn confluent_avro_round_trips_through_serde() {
        let cfg = crate::SchemaRegistryConfig::new("http://localhost:8081");
        let format = KafkaValueFormat::ConfluentAvro {
            schema_registry: cfg,
        };
        let s = serde_json::to_value(&format).unwrap();
        assert_eq!(s["type"], "confluent_avro");
        let parsed: KafkaValueFormat = serde_json::from_value(s).unwrap();
        assert!(matches!(parsed, KafkaValueFormat::ConfluentAvro { .. }));
    }

    #[cfg(feature = "schema-registry")]
    #[test]
    fn confluent_formats_report_schema_registry() {
        let cfg = crate::SchemaRegistryConfig::new("http://localhost:8081");
        let formats = [
            KafkaValueFormat::ConfluentAvro {
                schema_registry: cfg.clone(),
            },
            KafkaValueFormat::ConfluentProtobuf {
                schema_registry: cfg.clone(),
            },
            KafkaValueFormat::ConfluentJsonSchema {
                schema_registry: cfg,
                validate: true,
            },
        ];
        for format in formats {
            assert!(format.is_schema_registry(), "{:?}", format);
        }
    }

    #[cfg(feature = "schema-registry")]
    #[test]
    fn confluent_json_schema_validate_defaults_to_false() {
        let registry =
            serde_json::to_value(crate::SchemaRegistryConfig::new("http://localhost:8081"))
                .unwrap();
        // "validate" is omitted on purpose: `#[serde(default)]` must yield false.
        let parsed: KafkaValueFormat = serde_json::from_value(serde_json::json!({
            "type": "confluent_json_schema",
            "schema_registry": registry,
        }))
        .unwrap();
        match parsed {
            KafkaValueFormat::ConfluentJsonSchema { validate, .. } => assert!(!validate),
            other => panic!("expected ConfluentJsonSchema, got {:?}", other),
        }
    }
}