yozefu_lib/kafka/kafka_record.rs

1#[cfg(feature = "native")]
2use apache_avro::from_avro_datum;
3use serde::Deserialize;
4use serde::Serialize;
5#[cfg(feature = "native")]
6use serde_json::Error;
7use std::collections::BTreeMap;
8
9/// Inspired of the `[rdkafka::Message]` struct.
10/// Currently, we only support utf-8 string keys/values/headers.
11#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq, Default)]
12#[serde(rename_all = "lowercase")]
13pub struct KafkaRecord {
14    pub topic: String,
15    pub timestamp: Option<i64>,
16    pub partition: i32,
17    pub offset: i64,
18    pub headers: BTreeMap<String, String>,
19    #[serde(skip_serializing_if = "Option::is_none")]
20    pub key_schema: Option<Schema>,
21    #[serde(skip_serializing_if = "Option::is_none")]
22    pub value_schema: Option<Schema>,
23    /// Number of bytes in the key + the value
24    #[serde(default)]
25    pub size: usize,
26    /// A human readable representation of the key
27    pub key: DataType,
28    #[serde(skip_serializing, default)]
29    pub key_as_string: String,
30    /// A human readable representation of the value
31    pub value: DataType,
32    #[serde(skip_serializing, default)]
33    /// The value as a string. needed to be displayed in the TUI
34    pub value_as_string: String,
35}
36
37#[cfg(feature = "native")]
38use chrono::{DateTime, Local, Utc};
39#[cfg(feature = "native")]
40use rdkafka::message::{Headers, Message, OwnedMessage};
41
42#[cfg(feature = "native")]
43use super::SchemaRegistryClient;
44#[cfg(feature = "native")]
45use super::avro::avro_to_json;
46use super::data_type::DataType;
47use super::schema::Schema;
48#[cfg(feature = "native")]
49use super::schema::SchemaId;
50#[cfg(feature = "native")]
51use super::schema::SchemaType;
52#[cfg(feature = "native")]
53use super::schema_registry_client::SchemaResponse;
54
#[cfg(feature = "native")]
impl KafkaRecord {
    /// Returns the record timestamp as a UTC date-time.
    ///
    /// `timestamp` is read as milliseconds since the Unix epoch. A record without a
    /// timestamp is treated as `0`, i.e. the epoch itself, so `None` is only returned
    /// when [`DateTime::from_timestamp_millis`] rejects the value as out of range.
    pub fn timestamp_as_utc_date_time(&self) -> Option<DateTime<Utc>> {
        let millis = self.timestamp.unwrap_or_default();
        DateTime::from_timestamp_millis(millis)
    }

    /// Same instant as [`Self::timestamp_as_utc_date_time`], expressed in the local time zone.
    pub fn timestamp_as_local_date_time(&self) -> Option<DateTime<Local>> {
        let utc = self.timestamp_as_utc_date_time()?;
        Some(utc.with_timezone(&Local))
    }

    /// Returns `true` when the key schema, the value schema, or both are known.
    pub fn has_schemas(&self) -> bool {
        !(self.key_schema.is_none() && self.value_schema.is_none())
    }
}
70
#[cfg(feature = "native")]
impl KafkaRecord {
    /// Length of the schema header stripped from schema-registry-encoded payloads
    /// (presumably the Confluent wire format: 1 magic byte + 4-byte schema id — confirm
    /// against `SchemaId::parse`).
    const SCHEMA_HEADER_LEN: usize = 5;

    /// Builds a [`KafkaRecord`] from a message returned by rdkafka.
    ///
    /// Header values are decoded as UTF-8: an invalid value becomes `"<unable to parse>"`
    /// and a missing value becomes an empty string. When several headers share a key, the
    /// last one wins. The key and the payload are decoded by `extract_data_and_schema`,
    /// which may query `schema_registry`.
    pub async fn parse(
        owned_message: OwnedMessage,
        schema_registry: &mut Option<SchemaRegistryClient>,
    ) -> Self {
        let mut headers: BTreeMap<String, String> = BTreeMap::new();
        if let Some(old_headers) = owned_message.headers() {
            for header in old_headers.iter() {
                let value = header
                    .value
                    .map(|bytes| {
                        String::from_utf8(bytes.to_vec())
                            .unwrap_or_else(|_| "<unable to parse>".to_string())
                    })
                    .unwrap_or_default();
                headers.insert(header.key.to_string(), value);
            }
        }

        let size = owned_message.payload().map_or(0, <[u8]>::len)
            + owned_message.key().map_or(0, <[u8]>::len);

        let (key, key_schema) =
            Self::extract_data_and_schema(owned_message.key(), schema_registry).await;
        let (value, value_schema) =
            Self::extract_data_and_schema(owned_message.payload(), schema_registry).await;

        Self {
            value_as_string: value.to_string(),
            value,
            key_as_string: key.to_string(),
            key,
            topic: owned_message.topic().to_string(),
            timestamp: owned_message.timestamp().to_millis(),
            partition: owned_message.partition(),
            offset: owned_message.offset(),
            headers,
            key_schema,
            value_schema,
            size,
        }
    }

    /// Decodes `payload` according to the schema type reported by the schema registry.
    ///
    /// Without a schema, or with a JSON schema or no schema type, the payload is decoded
    /// as JSON with a string fallback.
    fn payload_to_data_type(payload: Option<&[u8]>, schema: &Option<SchemaResponse>) -> DataType {
        match schema {
            None => Self::deserialize_json(payload),
            Some(schema) => match schema.schema_type {
                Some(SchemaType::Avro) => Self::deserialize_avro(payload, &schema.schema),
                Some(SchemaType::Protobuf) => Self::deserialize_protobuf(payload, &schema.schema),
                Some(SchemaType::Json) | None => Self::deserialize_json(payload),
            },
        }
    }

    /// Parses `payload` as JSON. A missing payload is parsed as an empty slice, which is an error.
    fn try_deserialize_json(payload: Option<&[u8]>) -> Result<DataType, Error> {
        serde_json::from_slice(payload.unwrap_or_default()).map(DataType::Json)
    }

    /// Parses `payload` as JSON, falling back to a UTF-8 string if this is not JSON.
    /// Non-UTF-8 payloads become an empty string.
    fn deserialize_json(payload: Option<&[u8]>) -> DataType {
        Self::try_deserialize_json(payload).unwrap_or_else(|_| {
            DataType::String(
                String::from_utf8(payload.unwrap_or_default().to_vec()).unwrap_or_default(),
            )
        })
    }

    /// Decodes an avro datum with the writer schema `schema`.
    ///
    /// Schema parsing or decoding errors are reported as a [`DataType::String`] that
    /// includes the full payload.
    fn deserialize_avro(payload: Option<&[u8]>, schema: &str) -> DataType {
        let payload = payload.unwrap_or_default();
        let parsed_schema = match apache_avro::Schema::parse_str(schema) {
            Ok(parsed_schema) => parsed_schema,
            Err(e) => {
                return DataType::String(format!(
                    "  Yozefu Error: The avro schema could not be parsed. Please check the schema in the schema registry.\n       Error: {}\n       Payload: {:?}\n        String: {}",
                    e,
                    payload,
                    String::from_utf8(payload.to_vec()).unwrap_or_default()
                ));
            }
        };
        // Reading through `&mut &[u8]` advances the slice, so decode from a separate cursor
        // to keep `payload` intact for the error message below.
        let mut reader = payload;
        match from_avro_datum(&parsed_schema, &mut reader, None) {
            Ok(value) => DataType::Json(avro_to_json(value)),
            Err(e) => DataType::String(format!(
                "  Yozefu Error: According to the schema registry, the record is serialized as avro but there was an issue deserializing the payload: {:?}\n       Payload: {:?}\n        String: {}",
                e,
                payload,
                String::from_utf8(payload.to_vec()).unwrap_or_default()
            )),
        }
    }

    /// Protobuf is not supported yet: returns an explanatory message with the payload and schema.
    fn deserialize_protobuf(payload: Option<&[u8]>, schema: &str) -> DataType {
        let payload = payload.unwrap_or_default();
        DataType::String(format!(
            "  Error: Protobuf deserialization is not supported yet in Yozefu. Any contribution is welcome!\n Github: https://github.com/MAIF/yozefu\nPayload: {:?}\n String: {}\n Schema:\n{}",
            payload,
            String::from_utf8(payload.to_vec())
                .unwrap_or_default()
                .trim(),
            schema,
        ))
    }

    /// Extracts the data section from a payload prefixed with a schema header.
    ///
    /// Returns `None` when the payload is shorter than the header; a payload made only of
    /// the header yields an empty data section.
    fn extract_data_from_payload_with_schema_header(payload: &[u8]) -> Option<&[u8]> {
        payload.get(Self::SCHEMA_HEADER_LEN..)
    }

    /// Decodes `payload` and resolves its schema, if any.
    ///
    /// - No schema id: decoded as JSON (string fallback), no schema.
    /// - Schema id but no registry configured: decoded as JSON if possible, otherwise an
    ///   explanatory message; the schema is reported without a type.
    /// - Schema id and registry: the schema is fetched and the data after the schema
    ///   header is decoded accordingly; a registry error yields an explanatory message.
    async fn extract_data_and_schema(
        payload: Option<&[u8]>,
        schema_registry: &mut Option<SchemaRegistryClient>,
    ) -> (DataType, Option<Schema>) {
        let schema_id = SchemaId::parse(payload);
        match (schema_id, schema_registry.as_mut()) {
            (None, _) => (Self::payload_to_data_type(payload, &None), None),
            (Some(id), None) => {
                let payload = payload.unwrap_or_default();
                // The payload may be plain JSON that merely looks like it has a schema header.
                if let Ok(data) = Self::try_deserialize_json(Some(payload)) {
                    return (data, None);
                }
                match Self::try_deserialize_json(
                    Self::extract_data_from_payload_with_schema_header(payload),
                ) {
                    Ok(data) => (data, Some(Schema::new(id, None))),
                    Err(_) => (
                        DataType::String(format!(
                            "Yozefu was not able to retrieve the schema {} because there is no schema registry configured. Please visit https://github.com/MAIF/yozefu/blob/main/docs/schema-registry/README.md for more details.\nPayload: {:?}\n String: {}",
                            id,
                            payload,
                            String::from_utf8(payload.to_vec()).unwrap_or_default()
                        )),
                        Some(Schema::new(id, None)),
                    ),
                }
            }
            (Some(s), Some(schema_registry)) => {
                let (schema_response, schema) = match schema_registry.schema(s.0).await {
                    Ok(Some(d)) => (Some(d.clone()), Some(Schema::new(s, d.schema_type))),
                    Ok(None) => (None, Some(Schema::new(s, None))),
                    Err(e) => {
                        let payload = payload.unwrap_or_default();
                        return (
                            DataType::String(format!(
                                "{}.\nYozefu was not able to retrieve the schema {}.\nPlease make sure the schema registry is correctly configured.\nPayload: {:?}\n String: {}",
                                e,
                                s.0,
                                payload,
                                String::from_utf8(payload.to_vec()).unwrap_or_default()
                            )),
                            Some(Schema::new(s, None)),
                        );
                    }
                };
                // Strip the schema header. A payload shorter than the header is decoded
                // as-is; one made only of the header has empty data.
                let data = payload.map(|bytes| {
                    Self::extract_data_from_payload_with_schema_header(bytes).unwrap_or(bytes)
                });
                (Self::payload_to_data_type(data, &schema_response), schema)
            }
        }
    }
}