Skip to main content

scry_protocol/
deserializer.rs

1use crate::event::QueryEvent;
2use crate::ParamValue;
3use serde::Deserialize;
4use std::time::{Duration, UNIX_EPOCH};
5use thiserror::Error;
6
/// Errors that can occur while turning a FlexBuffers-encoded batch back into
/// query events.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute, with `{0}` replaced by the variant's payload.
#[derive(Error, Debug)]
pub enum DeserializationError {
    /// The FlexBuffers decoder rejected the input. The payload is the
    /// decoder's error rendered as a string.
    #[error("FlexBuffers deserialization failed: {0}")]
    FlexBuffersError(String),

    /// A timestamp was not usable. The payload is a human-readable
    /// description of the problem.
    #[error("Invalid timestamp: {0}")]
    InvalidTimestamp(String),

    /// A required field was absent. The payload is the name of that field.
    // NOTE(review): nothing in this file constructs this variant — confirm
    // whether other modules rely on it.
    #[error("Missing required field: {0}")]
    MissingField(&'static str),
}
19
/// A deserialized batch of query events.
///
/// This is the public result of
/// `FlexBuffersDeserializer::deserialize_batch`: the decoded events plus the
/// batch-level metadata that accompanied them on the wire.
#[derive(Debug)]
pub struct DeserializedBatch {
    /// The events in this batch, in the order they appeared in the encoded
    /// payload.
    pub events: Vec<QueryEvent>,

    /// Proxy instance identifier, copied verbatim from the encoded batch.
    pub proxy_id: String,

    /// Batch sequence number, copied verbatim from the encoded batch.
    pub batch_seq: u64,
}
32
/// Deserializes batches of QueryEvents from FlexBuffers format.
///
/// This is a field-less marker type: all functionality is exposed through
/// associated functions, so no instance needs to be constructed.
pub struct FlexBuffersDeserializer;
35
/// Wire-level shape of an encoded batch, decoded directly by serde.
///
/// Private to this module; it is converted into the public
/// `DeserializedBatch` once every contained event has been converted.
#[derive(Deserialize)]
struct QueryEventBatch {
    /// Events in their wire representation (not yet converted to `QueryEvent`).
    events: Vec<DeserializableEvent>,
    /// Proxy instance identifier attached to the batch.
    proxy_id: String,
    /// Batch sequence number attached to the batch.
    batch_seq: u64,
}
42
/// Wire-level shape of a single event, decoded directly by serde.
///
/// Times are carried as integer microseconds and parameters as strings; the
/// conversion into the richer `QueryEvent` type happens in
/// `FlexBuffersDeserializer::to_query_event`.
#[derive(Deserialize)]
struct DeserializableEvent {
    /// Identifier of the event.
    event_id: String,
    /// Event time as microseconds since the Unix epoch.
    timestamp_us: u64,
    /// The query text.
    query: String,
    /// Query parameters, one JSON-encoded string per parameter.
    /// Defaults to an empty list when the field is absent from the payload.
    #[serde(default)]
    params: Vec<String>,
    /// Flag indicating the parameter list is incomplete.
    /// Defaults to `false` when the field is absent from the payload.
    #[serde(default)]
    params_incomplete: bool,
    /// Optional normalized form of the query.
    normalized_query: Option<String>,
    /// Optional list of value fingerprints.
    value_fingerprints: Option<Vec<String>>,
    /// Query duration in microseconds.
    duration_us: u64,
    /// Optional row count.
    rows: Option<u64>,
    /// Whether the query succeeded.
    success: bool,
    /// Optional error message.
    error: Option<String>,
    /// Database name.
    database: String,
    /// Connection identifier.
    connection_id: String,
}
61
62impl FlexBuffersDeserializer {
63    /// Deserialize a batch of events from FlexBuffers binary format
64    ///
65    /// Returns the deserialized events along with batch metadata
66    pub fn deserialize_batch(bytes: &[u8]) -> Result<DeserializedBatch, DeserializationError> {
67        // Deserialize from FlexBuffers
68        let batch: QueryEventBatch = flexbuffers::from_slice(bytes)
69            .map_err(|e| DeserializationError::FlexBuffersError(e.to_string()))?;
70
71        // Convert DeserializableEvent to QueryEvent
72        let events: Result<Vec<QueryEvent>, DeserializationError> = batch
73            .events
74            .into_iter()
75            .map(Self::to_query_event)
76            .collect();
77
78        Ok(DeserializedBatch {
79            events: events?,
80            proxy_id: batch.proxy_id,
81            batch_seq: batch.batch_seq,
82        })
83    }
84
85    fn to_query_event(event: DeserializableEvent) -> Result<QueryEvent, DeserializationError> {
86        // Convert timestamp from microseconds to SystemTime
87        let timestamp = UNIX_EPOCH
88            + Duration::from_micros(event.timestamp_us);
89
90        // Convert duration from microseconds
91        let duration = Duration::from_micros(event.duration_us);
92
93        // Parse params from JSON strings
94        let params: Vec<ParamValue> = event
95            .params
96            .iter()
97            .filter_map(|s| serde_json::from_str(s).ok())
98            .collect();
99
100        Ok(QueryEvent {
101            event_id: event.event_id,
102            timestamp,
103            query: event.query,
104            params,
105            params_incomplete: event.params_incomplete,
106            normalized_query: event.normalized_query,
107            value_fingerprints: event.value_fingerprints,
108            duration,
109            rows: event.rows,
110            success: event.success,
111            error: event.error,
112            database: event.database,
113            connection_id: event.connection_id,
114        })
115    }
116}
117
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::QueryEventBuilder;
    use crate::serializer::FlatBuffersSerializer;

    /// One event plus the batch metadata survive a serialize/deserialize round trip.
    #[test]
    fn test_deserialize_single_event() {
        let original = QueryEventBuilder::new("SELECT 1")
            .connection_id("conn-123")
            .database("testdb")
            .duration(Duration::from_millis(5))
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[original.clone()], "proxy-1", 42);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded)
            .expect("Deserialization should succeed");

        // Batch-level metadata.
        assert_eq!(decoded.proxy_id, "proxy-1");
        assert_eq!(decoded.batch_seq, 42);
        assert_eq!(decoded.events.len(), 1);

        // Event-level fields.
        let first = &decoded.events[0];
        assert_eq!(first.query, "SELECT 1");
        assert_eq!(first.connection_id, "conn-123");
        assert_eq!(first.database, "testdb");
        assert_eq!(first.duration, Duration::from_millis(5));
        assert!(first.success);
        assert!(first.error.is_none());
    }

    /// Multiple events keep their order and their per-event fields.
    #[test]
    fn test_deserialize_batch() {
        let first = QueryEventBuilder::new("SELECT 1")
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(5))
            .build();
        let second = QueryEventBuilder::new("SELECT 2")
            .connection_id("conn-2")
            .database("db2")
            .duration(Duration::from_millis(10))
            .rows(42)
            .build();
        let originals = vec![first, second];

        let encoded = FlatBuffersSerializer::serialize_batch(&originals, "proxy-1", 1);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded)
            .expect("Deserialization should succeed");

        assert_eq!(decoded.events.len(), 2);
        assert_eq!(decoded.events[0].query, "SELECT 1");
        assert_eq!(decoded.events[1].query, "SELECT 2");
        assert_eq!(decoded.events[1].rows, Some(42));
    }

    /// The normalized query and value fingerprints are preserved.
    #[test]
    fn test_deserialize_with_anonymization() {
        let original = QueryEventBuilder::new("SELECT * FROM users WHERE id = ?")
            .normalized_query("SELECT * FROM users WHERE id = ?")
            .value_fingerprints(vec!["abc123hash".to_string()])
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(5))
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[original], "proxy-1", 1);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded)
            .expect("Deserialization should succeed");

        assert_eq!(decoded.events.len(), 1);

        let only = &decoded.events[0];
        assert_eq!(
            only.normalized_query,
            Some("SELECT * FROM users WHERE id = ?".to_string())
        );
        assert_eq!(
            only.value_fingerprints,
            Some(vec!["abc123hash".to_string()])
        );
    }

    /// A failed query keeps its `success = false` flag and error message.
    #[test]
    fn test_deserialize_with_error() {
        let original = QueryEventBuilder::new("INVALID SQL")
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(1))
            .success(false)
            .error("syntax error")
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[original], "proxy-1", 1);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded)
            .expect("Deserialization should succeed");

        assert_eq!(decoded.events.len(), 1);

        let only = &decoded.events[0];
        assert!(!only.success);
        assert_eq!(only.error, Some("syntax error".to_string()));
    }

    /// Timestamps round-trip to within a microsecond; durations round-trip exactly.
    #[test]
    fn test_roundtrip_timestamp_precision() {
        let original = QueryEventBuilder::new("SELECT 1")
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_micros(12345))
            .build();

        // Capture the values before the event is moved into the serializer call.
        let expected_timestamp = original.timestamp;
        let expected_duration = original.duration;

        let encoded = FlatBuffersSerializer::serialize_batch(&[original], "proxy-1", 0);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded)
            .expect("Deserialization should succeed");

        // Absolute difference between the two timestamps, whichever is later:
        // `duration_since` yields `Err` when the argument is the later instant,
        // and that error carries the (positive) distance.
        let actual_timestamp = decoded.events[0].timestamp;
        let drift = match actual_timestamp.duration_since(expected_timestamp) {
            Ok(ahead) => ahead,
            Err(behind) => behind.duration(),
        };

        // Should be within 1 microsecond
        assert!(drift < Duration::from_micros(1));

        // Duration must match exactly.
        assert_eq!(decoded.events[0].duration, expected_duration);
    }

    /// Bytes that are not a valid batch produce an error.
    #[test]
    fn test_deserialize_invalid_data() {
        let garbage = vec![0, 1, 2, 3, 4];
        let outcome = FlexBuffersDeserializer::deserialize_batch(&garbage);
        assert!(outcome.is_err());
    }

    /// Typed parameters come back with the same values in the same order.
    #[test]
    fn test_params_serialization_roundtrip() {
        use crate::ParamValue;

        let original = QueryEventBuilder::new("SELECT * FROM users WHERE id = $1")
            .params(vec![ParamValue::Int32(42), ParamValue::Text("alice".into())])
            .duration(Duration::from_millis(5))
            .connection_id("conn-1")
            .database("testdb")
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[original], "test-proxy", 1);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded).unwrap();

        assert_eq!(decoded.events.len(), 1);

        let params = &decoded.events[0].params;
        assert_eq!(params.len(), 2);
        assert_eq!(params[0], ParamValue::Int32(42));
        assert_eq!(params[1], ParamValue::Text("alice".into()));
    }

    /// The `params_incomplete` flag is preserved across the round trip.
    #[test]
    fn test_params_incomplete_roundtrip() {
        use crate::ParamValue;

        let original = QueryEventBuilder::new("SELECT * FROM users WHERE id = $1")
            .params(vec![ParamValue::Unknown { oid: 0, data: vec![0x01] }])
            .params_incomplete(true)
            .duration(Duration::from_millis(5))
            .connection_id("conn-1")
            .database("testdb")
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[original], "test-proxy", 1);
        let decoded = FlexBuffersDeserializer::deserialize_batch(&encoded).unwrap();

        assert_eq!(decoded.events.len(), 1);
        assert!(decoded.events[0].params_incomplete);
    }
}