danube_connect_core/message/
source_record.rs

1//! SourceRecord - messages from external systems to Danube
2
3use crate::{ConnectorError, ConnectorResult};
4use serde::Serialize;
5use serde_json::{json, Value};
6use std::collections::HashMap;
7use tracing::warn;
8
9/// Record passed from source connectors (External System → Danube)
10///
11/// Source connectors emit typed data as `serde_json::Value`. The runtime handles
12/// schema-based serialization before sending to Danube.
13#[derive(Debug, Clone, Serialize)]
14pub struct SourceRecord {
15    /// The topic to publish to
16    pub topic: String,
17    /// The message payload (typed data, not bytes)
18    pub payload: Value,
19    /// Optional message attributes/headers
20    pub attributes: HashMap<String, String>,
21    /// Optional routing key for partitioned topics (will be used when Danube supports it)
22    pub key: Option<String>,
23}
24
25impl SourceRecord {
26    /// Create a new SourceRecord with typed payload
27    pub fn new(topic: impl Into<String>, payload: Value) -> Self {
28        Self {
29            topic: topic.into(),
30            payload,
31            attributes: HashMap::new(),
32            key: None,
33        }
34    }
35
36    /// Create a SourceRecord from a string payload
37    ///
38    /// Use this for text-based data like log messages, plain text, or string values.
39    ///
40    /// # Example
41    /// ```ignore
42    /// let record = SourceRecord::from_string("/logs/application", "Server started successfully");
43    /// let record = SourceRecord::from_string("/events/notifications", format!("User {} logged in", user_id));
44    /// ```
45    pub fn from_string(topic: impl Into<String>, payload: impl Into<String>) -> Self {
46        Self::new(topic, json!(payload.into()))
47    }
48
49    /// Create a SourceRecord from any JSON-serializable object
50    ///
51    /// Use this for structured data types that implement `Serialize`.
52    /// The data will be converted to `serde_json::Value`.
53    ///
54    /// # Example
55    /// ```ignore
56    /// #[derive(Serialize)]
57    /// struct OrderEvent {
58    ///     order_id: String,
59    ///     amount: f64,
60    ///     currency: String,
61    /// }
62    ///
63    /// let order = OrderEvent {
64    ///     order_id: "ORD-12345".to_string(),
65    ///     amount: 99.99,
66    ///     currency: "USD".to_string(),
67    /// };
68    ///
69    /// let record = SourceRecord::from_json("/orders/created", &order)?;
70    /// ```
71    pub fn from_json<T: Serialize>(topic: impl Into<String>, data: T) -> ConnectorResult<Self> {
72        let value =
73            serde_json::to_value(data).map_err(|e| ConnectorError::Serialization(e.to_string()))?;
74        Ok(Self::new(topic, value))
75    }
76
77    /// Create a SourceRecord from a numeric value
78    ///
79    /// Supports integers and floats. The value will be stored as a JSON number.
80    ///
81    /// # Example
82    /// ```ignore
83    /// let record = SourceRecord::from_number("/metrics/counter", 42);
84    /// let record = SourceRecord::from_number("/metrics/temperature", 23.5);
85    /// ```
86    pub fn from_number<T: Serialize>(topic: impl Into<String>, number: T) -> ConnectorResult<Self> {
87        let value = serde_json::to_value(number)
88            .map_err(|e| ConnectorError::Serialization(e.to_string()))?;
89        
90        // Ensure it's actually a number
91        if !value.is_number() {
92            return Err(ConnectorError::Serialization(
93                "Value is not a number".to_string(),
94            ));
95        }
96        
97        Ok(Self::new(topic, value))
98    }
99
100    /// Create a SourceRecord from an Avro-compatible struct
101    ///
102    /// In Danube, Avro schemas use JSON serialization with schema validation.
103    /// This is an alias for `from_json()` for clarity when working with Avro schemas.
104    ///
105    /// # Example
106    /// ```ignore
107    /// #[derive(Serialize)]
108    /// struct UserEvent {
109    ///     user_id: String,
110    ///     action: String,
111    ///     timestamp: i64,
112    /// }
113    ///
114    /// let event = UserEvent { ... };
115    /// let record = SourceRecord::from_avro("/events/users", &event)?;
116    /// ```
117    pub fn from_avro<T: Serialize>(topic: impl Into<String>, data: T) -> ConnectorResult<Self> {
118        // Avro in Danube uses JSON serialization
119        Self::from_json(topic, data)
120    }
121
122    /// Create a SourceRecord from binary data (base64-encoded)
123    ///
124    /// The bytes will be base64-encoded and stored as a JSON object.
125    ///
126    /// # Example
127    /// ```ignore
128    /// let binary_data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello"
129    /// let record = SourceRecord::from_bytes("/binary/data", binary_data);
130    /// ```
131    pub fn from_bytes(topic: impl Into<String>, data: Vec<u8>) -> Self {
132        let base64_data = base64::Engine::encode(&base64::engine::general_purpose::STANDARD, &data);
133        Self::new(
134            topic,
135            json!({
136                "data": base64_data,
137                "size": data.len()
138            }),
139        )
140    }
141
142    /// Add an attribute
143    pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
144        self.attributes.insert(key.into(), value.into());
145        self
146    }
147
148    /// Add multiple attributes
149    pub fn with_attributes(mut self, attrs: HashMap<String, String>) -> Self {
150        self.attributes.extend(attrs);
151        self
152    }
153
154    /// Set the routing key for partitioned topics
155    pub fn with_key(mut self, key: impl Into<String>) -> Self {
156        self.key = Some(key.into());
157        self
158    }
159
160    /// Get the payload as a reference
161    pub fn payload(&self) -> &Value {
162        &self.payload
163    }
164
165    /// Serialize payload to bytes based on schema type
166    ///
167    /// Converts serde_json::Value to bytes according to the schema type.
168    /// The actual schema validation happens in the broker.
169    pub(crate) fn serialize_with_schema(&self, schema_type: &str) -> ConnectorResult<Vec<u8>> {
170        match schema_type.to_lowercase().as_str() {
171            "json_schema" | "json" => {
172                // JSON Schema - serialize as JSON
173                serde_json::to_vec(&self.payload).map_err(|e| {
174                    ConnectorError::Serialization(format!("JSON serialization failed: {}", e))
175                })
176            }
177            "string" => {
178                // String type - convert to UTF-8 bytes
179                if let Some(s) = self.payload.as_str() {
180                    Ok(s.as_bytes().to_vec())
181                } else {
182                    // If not a string, serialize as JSON string
183                    Ok(self.payload.to_string().into_bytes())
184                }
185            }
186            "number" => {
187                // Number - serialize as JSON number
188                serde_json::to_vec(&self.payload).map_err(|e| {
189                    ConnectorError::Serialization(format!("Number serialization failed: {}", e))
190                })
191            }
192            "bytes" => {
193                // Bytes - try to extract from base64 string or object
194                if let Some(s) = self.payload.as_str() {
195                    base64::Engine::decode(&base64::engine::general_purpose::STANDARD, s).map_err(
196                        |e| ConnectorError::Serialization(format!("Invalid base64: {}", e)),
197                    )
198                } else if let Some(obj) = self.payload.as_object() {
199                    if let Some(data) = obj.get("data").and_then(|v| v.as_str()) {
200                        base64::Engine::decode(&base64::engine::general_purpose::STANDARD, data)
201                            .map_err(|e| {
202                                ConnectorError::Serialization(format!("Invalid base64: {}", e))
203                            })
204                    } else {
205                        Err(ConnectorError::Serialization(
206                            "Expected 'data' field with base64 string".to_string(),
207                        ))
208                    }
209                } else {
210                    Err(ConnectorError::Serialization(
211                        "Cannot convert to bytes".to_string(),
212                    ))
213                }
214            }
215            "avro" => {
216                // Avro in Danube uses JSON serialization with schema validation
217                serde_json::to_vec(&self.payload).map_err(|e| {
218                    ConnectorError::Serialization(format!("Avro (JSON) serialization failed: {}", e))
219                })
220            }
221            "protobuf" => {
222                // TODO: Implement Protobuf serialization
223                Err(ConnectorError::config(
224                    "Protobuf serialization not yet implemented",
225                ))
226            }
227            _ => {
228                // Unknown type - default to JSON
229                warn!(
230                    "Unknown schema type '{}', defaulting to JSON serialization",
231                    schema_type
232                );
233                serde_json::to_vec(&self.payload).map_err(|e| {
234                    ConnectorError::Serialization(format!("JSON serialization failed: {}", e))
235                })
236            }
237        }
238    }
239}
240
#[cfg(test)]
mod tests {
    use super::*;
    use serde::Serialize;

    #[test]
    fn test_source_record_basic() {
        let record = SourceRecord::new("/default/events", json!("test"));

        assert_eq!(record.topic, "/default/events");
        assert_eq!(record.payload, json!("test"));
        assert!(record.attributes.is_empty());
        assert!(record.key.is_none());
    }

    #[test]
    fn test_source_record_from_string() {
        let record = SourceRecord::from_string("/default/events", "test message");

        assert_eq!(record.payload, json!("test message"));
        assert_eq!(record.payload.as_str().unwrap(), "test message");
    }

    #[test]
    fn test_source_record_from_json() {
        #[derive(Serialize)]
        struct TestData {
            name: String,
            value: i32,
        }

        let data = TestData {
            name: "test".to_string(),
            value: 42,
        };

        let record = SourceRecord::from_json("/default/events", data).unwrap();

        assert_eq!(record.payload["name"], "test");
        assert_eq!(record.payload["value"], 42);
    }

    #[test]
    fn test_source_record_builder() {
        let record = SourceRecord::new("/default/events", json!("test"))
            .with_attribute("source", "test-connector")
            .with_attribute("version", "1.0")
            .with_key("user-123");

        assert_eq!(
            record.attributes.get("source"),
            Some(&"test-connector".to_string())
        );
        assert_eq!(record.attributes.get("version"), Some(&"1.0".to_string()));
        assert_eq!(record.key, Some("user-123".to_string()));
    }

    #[test]
    fn test_source_record_with_attributes_overrides() {
        let mut extra = HashMap::new();
        extra.insert("a".to_string(), "2".to_string());
        extra.insert("b".to_string(), "3".to_string());

        let record = SourceRecord::from_string("/default/events", "x")
            .with_attribute("a", "1")
            .with_attributes(extra);

        assert_eq!(record.attributes.len(), 2);
        assert_eq!(record.attributes["a"], "2");
        assert_eq!(record.attributes["b"], "3");
    }

    #[test]
    fn test_source_record_from_number() {
        // Integer
        let record = SourceRecord::from_number("/metrics/counter", 42).unwrap();
        assert_eq!(record.payload, json!(42));
        assert_eq!(record.payload.as_i64().unwrap(), 42);

        // Float
        let record = SourceRecord::from_number("/metrics/temperature", 23.5).unwrap();
        assert_eq!(record.payload.as_f64().unwrap(), 23.5);

        // Negative number
        let record = SourceRecord::from_number("/metrics/balance", -100).unwrap();
        assert_eq!(record.payload.as_i64().unwrap(), -100);
    }

    #[test]
    fn test_source_record_from_number_rejects_non_numbers() {
        assert!(matches!(
            SourceRecord::from_number("/metrics/counter", "42"),
            Err(ConnectorError::Serialization(_))
        ));
    }

    #[test]
    fn test_source_record_from_avro() {
        #[derive(Serialize)]
        struct UserEvent {
            user_id: String,
            action: String,
            timestamp: i64,
        }

        let event = UserEvent {
            user_id: "user-123".to_string(),
            action: "login".to_string(),
            timestamp: 1234567890,
        };

        let record = SourceRecord::from_avro("/events/users", &event).unwrap();
        assert_eq!(record.payload["user_id"], "user-123");
        assert_eq!(record.payload["action"], "login");
        assert_eq!(record.payload["timestamp"], 1234567890);
    }

    #[test]
    fn test_source_record_from_bytes() {
        let data = vec![0x48, 0x65, 0x6c, 0x6c, 0x6f]; // "Hello"
        let record = SourceRecord::from_bytes("/binary/data", data.clone());

        // Check the structure
        assert!(record.payload.is_object());
        assert!(record.payload["data"].is_string());
        assert_eq!(record.payload["size"], 5);

        // Verify base64 encoding
        let base64_data = record.payload["data"].as_str().unwrap();
        let decoded = base64::Engine::decode(
            &base64::engine::general_purpose::STANDARD,
            base64_data,
        )
        .unwrap();
        assert_eq!(decoded, data);
    }

    #[test]
    fn test_serialize_with_schema_json_is_case_insensitive() {
        let record = SourceRecord::new("/default/events", json!({"a": 1}));

        let bytes = record.serialize_with_schema("json").unwrap();
        let back: Value = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(back, json!({"a": 1}));

        assert_eq!(record.serialize_with_schema("JSON_SCHEMA").unwrap(), bytes);
    }

    #[test]
    fn test_serialize_with_schema_string() {
        let record = SourceRecord::from_string("/logs/application", "héllo");
        assert_eq!(
            record.serialize_with_schema("string").unwrap(),
            "héllo".as_bytes()
        );

        // Non-string payloads are emitted as their JSON text.
        let record = SourceRecord::new("/logs/application", json!(42));
        assert_eq!(record.serialize_with_schema("string").unwrap(), b"42");
    }

    #[test]
    fn test_serialize_with_schema_number() {
        let record = SourceRecord::from_number("/metrics/temperature", 23.5).unwrap();
        assert_eq!(record.serialize_with_schema("number").unwrap(), b"23.5");
    }

    #[test]
    fn test_serialize_with_schema_avro_matches_json_encoding() {
        let record = SourceRecord::new("/events/users", json!({"user_id": "u-1", "timestamp": 7}));
        assert_eq!(
            record.serialize_with_schema("avro").unwrap(),
            serde_json::to_vec(&record.payload).unwrap()
        );
    }

    #[test]
    fn test_serialize_with_schema_bytes_roundtrip() {
        let data = vec![0u8, 1, 2, 254, 255];
        let record = SourceRecord::from_bytes("/binary/data", data.clone());
        assert_eq!(record.serialize_with_schema("bytes").unwrap(), data);

        // A bare base64 string payload is accepted too.
        let record = SourceRecord::new("/binary/data", json!("SGVsbG8="));
        assert_eq!(record.serialize_with_schema("Bytes").unwrap(), b"Hello");
    }

    #[test]
    fn test_serialize_with_schema_bytes_errors() {
        let missing_data = SourceRecord::new("/binary/data", json!({"size": 3}));
        assert!(matches!(
            missing_data.serialize_with_schema("bytes"),
            Err(ConnectorError::Serialization(_))
        ));

        let invalid_base64 = SourceRecord::new("/binary/data", json!("not base64!"));
        assert!(matches!(
            invalid_base64.serialize_with_schema("bytes"),
            Err(ConnectorError::Serialization(_))
        ));

        let wrong_type = SourceRecord::new("/binary/data", json!(42));
        assert!(matches!(
            wrong_type.serialize_with_schema("bytes"),
            Err(ConnectorError::Serialization(_))
        ));
    }

    #[test]
    fn test_serialize_with_schema_protobuf_is_unsupported() {
        let record = SourceRecord::new("/default/events", json!({}));
        assert!(record.serialize_with_schema("protobuf").is_err());
    }

    #[test]
    fn test_serialize_with_schema_unknown_defaults_to_json() {
        let record = SourceRecord::new("/default/events", json!([1, 2]));
        assert_eq!(record.serialize_with_schema("mystery").unwrap(), b"[1,2]");
    }
}