danube_connect_core/message/
source_record.rs1use crate::{ConnectorError, ConnectorResult};
4use serde::Serialize;
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use tracing::warn;
8
9#[derive(Debug, Clone, Serialize)]
14pub struct SourceRecord {
15 pub topic: String,
17 pub payload: Value,
19 pub attributes: HashMap<String, String>,
21 pub key: Option<String>,
23}
24
25impl SourceRecord {
26 pub fn new(topic: impl Into<String>, payload: Value) -> Self {
28 Self {
29 topic: topic.into(),
30 payload,
31 attributes: HashMap::new(),
32 key: None,
33 }
34 }
35
36 pub fn from_string(topic: impl Into<String>, payload: impl Into<String>) -> Self {
46 Self::new(topic, json!(payload.into()))
47 }
48
49 pub fn from_json<T: Serialize>(topic: impl Into<String>, data: T) -> ConnectorResult<Self> {
72 let value =
73 serde_json::to_value(data).map_err(|e| ConnectorError::Serialization(e.to_string()))?;
74 Ok(Self::new(topic, value))
75 }
76
77 pub fn from_number<T: Serialize>(topic: impl Into<String>, number: T) -> ConnectorResult<Self> {
87 let value = serde_json::to_value(number)
88 .map_err(|e| ConnectorError::Serialization(e.to_string()))?;
89
90 if !value.is_number() {
92 return Err(ConnectorError::Serialization(
93 "Value is not a number".to_string(),
94 ));
95 }
96
97 Ok(Self::new(topic, value))
98 }
99
100 pub fn from_avro<T: Serialize>(topic: impl Into<String>, data: T) -> ConnectorResult<Self> {
118 Self::from_json(topic, data)
120 }
121
122 pub fn from_bytes(topic: impl Into<String>, data: Vec<u8>) -> Self {
132 let base64_data = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &data);
133 Self::new(
134 topic,
135 json!({
136 "data": base64_data,
137 "size": data.len()
138 }),
139 )
140 }
141
142 pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
144 self.attributes.insert(key.into(), value.into());
145 self
146 }
147
148 pub fn with_attributes(mut self, attrs: HashMap<String, String>) -> Self {
150 self.attributes.extend(attrs);
151 self
152 }
153
154 pub fn with_key(mut self, key: impl Into<String>) -> Self {
156 self.key = Some(key.into());
157 self
158 }
159
160 pub fn payload(&self) -> &Value {
162 &self.payload
163 }
164
165 pub(crate) fn serialize_with_schema(&self, schema_type: &str) -> ConnectorResult<Vec<u8>> {
170 match schema_type.to_lowercase().as_str() {
171 "json_schema" | "json" => {
172 serde_json::to_vec(&self.payload).map_err(|e| {
174 ConnectorError::Serialization(format!("JSON serialization failed: {}", e))
175 })
176 }
177 "string" => {
178 if let Some(s) = self.payload.as_str() {
180 Ok(s.as_bytes().to_vec())
181 } else {
182 Ok(self.payload.to_string().into_bytes())
184 }
185 }
186 "number" => {
187 serde_json::to_vec(&self.payload).map_err(|e| {
189 ConnectorError::Serialization(format!("Number serialization failed: {}", e))
190 })
191 }
192 "bytes" => {
193 if let Some(s) = self.payload.as_str() {
195 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s).map_err(
196 |e| ConnectorError::Serialization(format!("Invalid base64: {}", e)),
197 )
198 } else if let Some(obj) = self.payload.as_object() {
199 if let Some(data) = obj.get("data").and_then(|v| v.as_str()) {
200 base64::Engine::decode(&base64::engine::general_purpose::STANDARD, data)
201 .map_err(|e| {
202 ConnectorError::Serialization(format!("Invalid base64: {}", e))
203 })
204 } else {
205 Err(ConnectorError::Serialization(
206 "Expected 'data' field with base64 string".to_string(),
207 ))
208 }
209 } else {
210 Err(ConnectorError::Serialization(
211 "Cannot convert to bytes".to_string(),
212 ))
213 }
214 }
215 "avro" => {
216 serde_json::to_vec(&self.payload).map_err(|e| {
218 ConnectorError::Serialization(format!("Avro (JSON) serialization failed: {}", e))
219 })
220 }
221 "protobuf" => {
222 Err(ConnectorError::config(
224 "Protobuf serialization not yet implemented",
225 ))
226 }
227 _ => {
228 warn!(
230 "Unknown schema type '{}', defaulting to JSON serialization",
231 schema_type
232 );
233 serde_json::to_vec(&self.payload).map_err(|e| {
234 ConnectorError::Serialization(format!("JSON serialization failed: {}", e))
235 })
236 }
237 }
238 }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Serialize;

    /// `new` stores topic and payload and leaves attributes/key empty.
    #[test]
    fn test_source_record_basic() {
        let payload = json!("test");
        let record = SourceRecord::new("/default/events", payload.clone());

        assert_eq!(record.topic, "/default/events");
        assert_eq!(record.payload, payload);
        assert!(record.attributes.is_empty());
        assert!(record.key.is_none());
    }

    /// `from_string` produces a JSON string payload.
    #[test]
    fn test_source_record_from_string() {
        let record = SourceRecord::from_string("/default/events", "test message");

        assert_eq!(record.payload, json!("test message"));
        assert_eq!(record.payload.as_str(), Some("test message"));
    }

    /// `from_json` serializes a struct into an object payload.
    #[test]
    fn test_source_record_from_json() {
        #[derive(Serialize)]
        struct TestData {
            name: String,
            value: i32,
        }

        let record = SourceRecord::from_json(
            "/default/events",
            TestData {
                name: "test".to_string(),
                value: 42,
            },
        )
        .unwrap();

        assert_eq!(record.payload["name"], "test");
        assert_eq!(record.payload["value"], 42);
    }

    /// The `with_*` builder methods accumulate attributes and set the key.
    #[test]
    fn test_source_record_builder() {
        let record = SourceRecord::new("/default/events", json!("test"))
            .with_attribute("source", "test-connector")
            .with_attribute("version", "1.0")
            .with_key("user-123");

        let attribute = |name: &str| record.attributes.get(name).map(String::as_str);

        assert_eq!(attribute("source"), Some("test-connector"));
        assert_eq!(attribute("version"), Some("1.0"));
        assert_eq!(record.key.as_deref(), Some("user-123"));
    }

    /// `from_number` accepts positive integers, floats and negative integers.
    #[test]
    fn test_source_record_from_number() {
        let counter = SourceRecord::from_number("/metrics/counter", 42).unwrap();
        assert_eq!(counter.payload, json!(42));
        assert_eq!(counter.payload.as_i64(), Some(42));

        let temperature = SourceRecord::from_number("/metrics/temperature", 23.5).unwrap();
        assert_eq!(temperature.payload.as_f64(), Some(23.5));

        let balance = SourceRecord::from_number("/metrics/balance", -100).unwrap();
        assert_eq!(balance.payload.as_i64(), Some(-100));
    }

    /// `from_avro` accepts a borrowed struct and yields an object payload.
    #[test]
    fn test_source_record_from_avro() {
        #[derive(Serialize)]
        struct UserEvent {
            user_id: String,
            action: String,
            timestamp: i64,
        }

        let event = UserEvent {
            user_id: "user-123".to_string(),
            action: "login".to_string(),
            timestamp: 1234567890,
        };

        let record = SourceRecord::from_avro("/events/users", &event).unwrap();

        assert_eq!(record.payload["user_id"], "user-123");
        assert_eq!(record.payload["action"], "login");
        assert_eq!(record.payload["timestamp"], 1234567890);
    }

    /// `from_bytes` wraps the input as `{ "data": <base64>, "size": <len> }`
    /// and the base64 text decodes back to the original bytes.
    #[test]
    fn test_source_record_from_bytes() {
        // ASCII "Hello".
        let data = b"Hello".to_vec();
        let record = SourceRecord::from_bytes("/binary/data", data.clone());

        assert!(record.payload.is_object());
        assert!(record.payload["data"].is_string());
        assert_eq!(record.payload["size"], 5);

        let encoded = record.payload["data"].as_str().unwrap();
        let round_tripped =
            base64::Engine::decode(&base64::engine::general_purpose::STANDARD, encoded).unwrap();
        assert_eq!(round_tripped, data);
    }
}