1#[cfg(feature = "native")]
2use apache_avro::from_avro_datum;
3use serde::Deserialize;
4use serde::Serialize;
5#[cfg(feature = "native")]
6use serde_json::Error;
7use std::collections::BTreeMap;
8
/// A single record consumed from Kafka, carrying the deserialized key and
/// value together with the schemas they were resolved against (when a schema
/// registry was available).
#[derive(Clone, Debug, Deserialize, Serialize, Hash, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub struct KafkaRecord {
    /// Topic the record was read from.
    pub topic: String,
    /// Record timestamp in milliseconds since the Unix epoch; `None` when
    /// the broker reported no timestamp.
    pub timestamp: Option<i64>,
    /// Partition the record was read from.
    pub partition: i32,
    /// Offset of the record within its partition.
    pub offset: i64,
    /// Kafka message headers, keyed by header name. Values are UTF-8 decoded;
    /// non-decodable values become a placeholder and absent values an empty string.
    pub headers: BTreeMap<String, String>,
    /// Schema of the key, when one was resolved; omitted from JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key_schema: Option<Schema>,
    /// Schema of the value, when one was resolved; omitted from JSON output when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub value_schema: Option<Schema>,
    /// Size of the record in bytes: key length plus payload length
    /// (headers are not counted).
    #[serde(default)]
    pub size: usize,
    /// Deserialized key of the record.
    pub key: DataType,
    /// String rendering of the key; never serialized.
    #[serde(skip_serializing, default)]
    pub key_as_string: String,
    /// Deserialized value of the record.
    pub value: DataType,
    /// String rendering of the value; never serialized.
    #[serde(skip_serializing, default)]
    pub value_as_string: String,
}
36
37#[cfg(feature = "native")]
38use chrono::{DateTime, Local, Utc};
39#[cfg(feature = "native")]
40use rdkafka::message::{Headers, Message, OwnedMessage};
41
42#[cfg(feature = "native")]
43use super::SchemaRegistryClient;
44#[cfg(feature = "native")]
45use super::avro::avro_to_json;
46use super::data_type::DataType;
47use super::schema::Schema;
48#[cfg(feature = "native")]
49use super::schema::SchemaId;
50#[cfg(feature = "native")]
51use super::schema::SchemaType;
52#[cfg(feature = "native")]
53use super::schema_registry_client::SchemaResponse;
54
#[cfg(feature = "native")]
impl KafkaRecord {
    /// Returns the record timestamp as a UTC datetime.
    ///
    /// Returns `None` when the record carries no timestamp, or when the
    /// millisecond value is outside chrono's representable range.
    pub fn timestamp_as_utc_date_time(&self) -> Option<DateTime<Utc>> {
        // Fix: the previous `self.timestamp.unwrap_or(0)` silently reported a
        // missing timestamp as the Unix epoch (1970-01-01). Propagate the
        // absence instead, which is what the `Option` return type promises.
        self.timestamp.and_then(DateTime::from_timestamp_millis)
    }

    /// Returns the record timestamp converted to the local timezone,
    /// or `None` when no timestamp is available.
    pub fn timestamp_as_local_date_time(&self) -> Option<DateTime<Local>> {
        self.timestamp_as_utc_date_time()
            .map(DateTime::<Local>::from)
    }

    /// `true` when either the key or the value has an associated schema.
    pub fn has_schemas(&self) -> bool {
        self.key_schema.is_some() || self.value_schema.is_some()
    }
}
70
#[cfg(feature = "native")]
impl KafkaRecord {
    /// Builds a [`KafkaRecord`] from a consumed `OwnedMessage`, resolving the
    /// key and value schemas through the schema registry when one is configured.
    pub async fn parse(
        owned_message: OwnedMessage,
        schema_registry: &mut Option<SchemaRegistryClient>,
    ) -> Self {
        // Copy the Kafka headers into an ordered map. Non-UTF-8 values are
        // replaced with a placeholder; absent values become empty strings.
        let mut headers: BTreeMap<String, String> = BTreeMap::new();
        if let Some(old_headers) = owned_message.headers() {
            for header in old_headers.iter() {
                headers.insert(
                    header.key.to_string(),
                    header
                        .value
                        .map(|e| {
                            String::from_utf8(e.to_vec()).unwrap_or("<unable to parse>".to_string())
                        })
                        .unwrap_or_default(),
                );
            }
        }

        // Record size = payload bytes + key bytes (headers are not counted).
        let size = owned_message.payload().map(|e| e.len()).unwrap_or(0)
            + owned_message.key().map(|e| e.len()).unwrap_or(0);

        let (key, key_schema) =
            Self::extract_data_and_schema(owned_message.key(), schema_registry).await;
        let (value, value_schema) =
            Self::extract_data_and_schema(owned_message.payload(), schema_registry).await;

        Self {
            value_as_string: value.to_string(),
            value,
            key_as_string: key.to_string(),
            key,
            topic: owned_message.topic().to_string(),
            timestamp: owned_message.timestamp().to_millis(),
            partition: owned_message.partition(),
            offset: owned_message.offset(),
            headers,
            key_schema,
            value_schema,
            size,
        }
    }

    /// Deserializes `payload` according to the schema type reported by the
    /// registry. Falls back to JSON when there is no schema, or when the
    /// schema's type is unknown.
    fn payload_to_data_type(payload: Option<&[u8]>, schema: &Option<SchemaResponse>) -> DataType {
        // Idiomatic match instead of the previous `is_none()` check followed
        // by `as_ref().unwrap()`.
        match schema {
            None => Self::deserialize_json(payload),
            Some(schema) => match schema.schema_type {
                Some(SchemaType::Avro) => Self::deserialize_avro(payload, &schema.schema),
                Some(SchemaType::Protobuf) => Self::deserialize_protobuf(payload, &schema.schema),
                // JSON schema and unknown schema type are both parsed as JSON.
                Some(SchemaType::Json) | None => Self::deserialize_json(payload),
            },
        }
    }

    /// Tries to parse the payload as JSON. An absent payload is treated as an
    /// empty byte slice (which fails to parse and yields an `Err`).
    fn try_deserialize_json(payload: Option<&[u8]>) -> Result<DataType, Error> {
        // `.map` instead of the previous match that re-wrapped both arms.
        serde_json::from_slice(payload.unwrap_or_default()).map(DataType::Json)
    }

    /// Parses the payload as JSON; when it is not valid JSON, falls back to a
    /// UTF-8 string (empty when the bytes are not valid UTF-8 either).
    fn deserialize_json(payload: Option<&[u8]>) -> DataType {
        Self::try_deserialize_json(payload).unwrap_or_else(|_| {
            DataType::String(
                String::from_utf8(payload.unwrap_or_default().to_vec()).unwrap_or_default(),
            )
        })
    }

    /// Decodes a single Avro datum using the given schema source (JSON text).
    /// Any failure is reported inline as a human-readable `DataType::String`.
    fn deserialize_avro(payload: Option<&[u8]>, schema: &str) -> DataType {
        let original_payload = payload.unwrap_or_default();
        // `from_avro_datum` reads through `&mut &[u8]` and advances the slice,
        // so keep `original_payload` to report the *full* payload in error
        // messages (the previous code printed the partially consumed reader).
        let mut payload = original_payload;
        let parsed_schema = match apache_avro::Schema::parse_str(schema) {
            Ok(parsed) => parsed,
            Err(e) => {
                return DataType::String(format!(
                    " Yozefu Error: The avro schema could not be parsed. Please check the schema in the schema registry.\n Error: {}\n Payload: {:?}\n String: {}",
                    e,
                    original_payload,
                    String::from_utf8(original_payload.to_vec()).unwrap_or_default()
                ));
            }
        };
        match from_avro_datum(&parsed_schema, &mut payload, None) {
            Ok(value) => DataType::Json(avro_to_json(value)),
            Err(e) => DataType::String(format!(
                " Yozefu Error: According to the schema registry, the record is serialized as avro but there was an issue deserializing the payload: {:?}\n Payload: {:?}\n String: {}",
                e,
                original_payload,
                String::from_utf8(original_payload.to_vec()).unwrap_or_default()
            )),
        }
    }

    /// Protobuf is not supported yet: returns an explanatory message carrying
    /// the raw payload and the schema so the user can inspect them.
    fn deserialize_protobuf(payload: Option<&[u8]>, schema: &str) -> DataType {
        let payload = payload.unwrap_or_default();
        DataType::String(format!(
            " Error: Protobuf deserialization is not supported yet in Yozefu. Any contribution is welcome!\n Github: https://github.com/MAIF/yozefu\nPayload: {:?}\n String: {}\n Schema:\n{}",
            payload,
            String::from_utf8(payload.to_vec())
                .unwrap_or_default()
                .trim(),
            schema,
        ))
    }

    /// Returns the bytes following the 5-byte Confluent wire-format header
    /// (magic byte + 4-byte schema id), or `None` when there is no body
    /// (payloads of 5 bytes or fewer).
    fn extract_data_from_payload_with_schema_header(payload: &[u8]) -> Option<&[u8]> {
        if payload.len() <= 5 {
            return None;
        }
        Some(&payload[5..])
    }

    /// Detects an optional schema-id header on `payload`, resolves the schema
    /// through the registry when one is configured, and deserializes the
    /// remaining bytes accordingly. Returns the data plus the schema (if any).
    async fn extract_data_and_schema(
        payload: Option<&[u8]>,
        schema_registry: &mut Option<SchemaRegistryClient>,
    ) -> (DataType, Option<Schema>) {
        let schema_id = SchemaId::parse(payload);
        match (schema_id, schema_registry.as_mut()) {
            // No schema header: best-effort JSON parse of the raw bytes.
            (None, _) => (Self::payload_to_data_type(payload, &None), None),
            // Schema header present but no registry configured: try the raw
            // bytes as JSON, then the bytes after the header, then give up
            // with an explanatory message.
            (Some(id), None) => {
                let payload = payload.unwrap_or_default();
                match serde_json::from_slice(payload) {
                    Ok(e) => (DataType::Json(e), None),
                    Err(_) => {
                        match Self::try_deserialize_json(
                            Self::extract_data_from_payload_with_schema_header(payload),
                        ) {
                            Ok(e) => (e, Some(Schema::new(id, None))),
                            Err(_) => (
                                DataType::String(format!(
                                    "Yozefu was not able to retrieve the schema {} because there is no schema registry configured. Please visit https://github.com/MAIF/yozefu/blob/main/docs/schema-registry/README.md for more details.\nPayload: {:?}\n String: {}",
                                    id,
                                    payload,
                                    String::from_utf8(payload.to_vec()).unwrap_or_default()
                                )),
                                Some(Schema::new(id, None)),
                            ),
                        }
                    }
                }
            }
            // Schema header present and a registry is available.
            (Some(s), Some(schema_registry)) => {
                let p = payload.unwrap_or_default();
                let (schema_response, schema) = match schema_registry.schema(s.0).await {
                    Ok(Some(d)) => (Some(d.clone()), Some(Schema::new(s, d.schema_type))),
                    Ok(None) => (None, Some(Schema::new(s, None))),
                    Err(e) => {
                        // Registry lookup failed: surface the error inline.
                        // (`p` is reused instead of re-unwrapping `payload`.)
                        return (
                            DataType::String(format!(
                                "{}.\nYozefu was not able to retrieve the schema {}.\nPlease make sure the schema registry is correctly configured.\nPayload: {:?}\n String: {}",
                                e,
                                s.0,
                                p,
                                String::from_utf8(p.to_vec()).unwrap_or_default()
                            )),
                            Some(Schema::new(s, None)),
                        );
                    }
                };
                // A payload of 5 bytes or fewer has no body after the header;
                // otherwise skip the 5-byte header before deserializing.
                // (Plain `if` instead of the previous `match` on a bool.)
                if p.len() <= 5 {
                    (Self::payload_to_data_type(payload, &schema_response), schema)
                } else {
                    (
                        Self::payload_to_data_type(payload.map(|e| &e[5..]), &schema_response),
                        schema,
                    )
                }
            }
        }
    }
}