1use crate::event::QueryEvent;
2use crate::ParamValue;
3use serde::Deserialize;
4use std::time::{Duration, UNIX_EPOCH};
5use thiserror::Error;
6
7#[derive(Error, Debug)]
9pub enum DeserializationError {
10 #[error("FlexBuffers deserialization failed: {0}")]
11 FlexBuffersError(String),
12
13 #[error("Invalid timestamp: {0}")]
14 InvalidTimestamp(String),
15
16 #[error("Missing required field: {0}")]
17 MissingField(&'static str),
18}
19
20#[derive(Debug)]
22pub struct DeserializedBatch {
23 pub events: Vec<QueryEvent>,
25
26 pub proxy_id: String,
28
29 pub batch_seq: u64,
31}
32
33pub struct FlexBuffersDeserializer;
35
36#[derive(Deserialize)]
37struct QueryEventBatch {
38 events: Vec<DeserializableEvent>,
39 proxy_id: String,
40 batch_seq: u64,
41}
42
43#[derive(Deserialize)]
44struct DeserializableEvent {
45 event_id: String,
46 timestamp_us: u64,
47 query: String,
48 #[serde(default)]
49 params: Vec<String>,
50 #[serde(default)]
51 params_incomplete: bool,
52 normalized_query: Option<String>,
53 value_fingerprints: Option<Vec<String>>,
54 duration_us: u64,
55 rows: Option<u64>,
56 success: bool,
57 error: Option<String>,
58 database: String,
59 connection_id: String,
60}
61
62impl FlexBuffersDeserializer {
63 pub fn deserialize_batch(bytes: &[u8]) -> Result<DeserializedBatch, DeserializationError> {
67 let batch: QueryEventBatch = flexbuffers::from_slice(bytes)
69 .map_err(|e| DeserializationError::FlexBuffersError(e.to_string()))?;
70
71 let events: Result<Vec<QueryEvent>, DeserializationError> = batch
73 .events
74 .into_iter()
75 .map(Self::to_query_event)
76 .collect();
77
78 Ok(DeserializedBatch {
79 events: events?,
80 proxy_id: batch.proxy_id,
81 batch_seq: batch.batch_seq,
82 })
83 }
84
85 fn to_query_event(event: DeserializableEvent) -> Result<QueryEvent, DeserializationError> {
86 let timestamp = UNIX_EPOCH
88 + Duration::from_micros(event.timestamp_us);
89
90 let duration = Duration::from_micros(event.duration_us);
92
93 let params: Vec<ParamValue> = event
95 .params
96 .iter()
97 .filter_map(|s| serde_json::from_str(s).ok())
98 .collect();
99
100 Ok(QueryEvent {
101 event_id: event.event_id,
102 timestamp,
103 query: event.query,
104 params,
105 params_incomplete: event.params_incomplete,
106 normalized_query: event.normalized_query,
107 value_fingerprints: event.value_fingerprints,
108 duration,
109 rows: event.rows,
110 success: event.success,
111 error: event.error,
112 database: event.database,
113 connection_id: event.connection_id,
114 })
115 }
116}
117
118#[cfg(test)]
119mod tests {
120 use super::*;
121 use crate::event::QueryEventBuilder;
122 use crate::serializer::FlatBuffersSerializer;
123
124 #[test]
125 fn test_deserialize_single_event() {
126 let event = QueryEventBuilder::new("SELECT 1")
128 .connection_id("conn-123")
129 .database("testdb")
130 .duration(Duration::from_millis(5))
131 .build();
132
133 let bytes = FlatBuffersSerializer::serialize_batch(&[event.clone()], "proxy-1", 42);
135
136 let batch = FlexBuffersDeserializer::deserialize_batch(&bytes)
138 .expect("Deserialization should succeed");
139
140 assert_eq!(batch.proxy_id, "proxy-1");
141 assert_eq!(batch.batch_seq, 42);
142 assert_eq!(batch.events.len(), 1);
143
144 let deserialized = &batch.events[0];
145 assert_eq!(deserialized.query, "SELECT 1");
146 assert_eq!(deserialized.connection_id, "conn-123");
147 assert_eq!(deserialized.database, "testdb");
148 assert_eq!(deserialized.duration, Duration::from_millis(5));
149 assert!(deserialized.success);
150 assert!(deserialized.error.is_none());
151 }
152
153 #[test]
154 fn test_deserialize_batch() {
155 let events = vec![
156 QueryEventBuilder::new("SELECT 1")
157 .connection_id("conn-1")
158 .database("db1")
159 .duration(Duration::from_millis(5))
160 .build(),
161 QueryEventBuilder::new("SELECT 2")
162 .connection_id("conn-2")
163 .database("db2")
164 .duration(Duration::from_millis(10))
165 .rows(42)
166 .build(),
167 ];
168
169 let bytes = FlatBuffersSerializer::serialize_batch(&events, "proxy-1", 1);
170 let batch = FlexBuffersDeserializer::deserialize_batch(&bytes)
171 .expect("Deserialization should succeed");
172
173 assert_eq!(batch.events.len(), 2);
174 assert_eq!(batch.events[0].query, "SELECT 1");
175 assert_eq!(batch.events[1].query, "SELECT 2");
176 assert_eq!(batch.events[1].rows, Some(42));
177 }
178
179 #[test]
180 fn test_deserialize_with_anonymization() {
181 let event = QueryEventBuilder::new("SELECT * FROM users WHERE id = ?")
182 .normalized_query("SELECT * FROM users WHERE id = ?")
183 .value_fingerprints(vec!["abc123hash".to_string()])
184 .connection_id("conn-1")
185 .database("db1")
186 .duration(Duration::from_millis(5))
187 .build();
188
189 let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 1);
190 let batch = FlexBuffersDeserializer::deserialize_batch(&bytes)
191 .expect("Deserialization should succeed");
192
193 assert_eq!(batch.events.len(), 1);
194 assert_eq!(
195 batch.events[0].normalized_query,
196 Some("SELECT * FROM users WHERE id = ?".to_string())
197 );
198 assert_eq!(
199 batch.events[0].value_fingerprints,
200 Some(vec!["abc123hash".to_string()])
201 );
202 }
203
204 #[test]
205 fn test_deserialize_with_error() {
206 let event = QueryEventBuilder::new("INVALID SQL")
207 .connection_id("conn-1")
208 .database("db1")
209 .duration(Duration::from_millis(1))
210 .success(false)
211 .error("syntax error")
212 .build();
213
214 let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 1);
215 let batch = FlexBuffersDeserializer::deserialize_batch(&bytes)
216 .expect("Deserialization should succeed");
217
218 assert_eq!(batch.events.len(), 1);
219 assert!(!batch.events[0].success);
220 assert_eq!(batch.events[0].error, Some("syntax error".to_string()));
221 }
222
223 #[test]
224 fn test_roundtrip_timestamp_precision() {
225 let event = QueryEventBuilder::new("SELECT 1")
226 .connection_id("conn-1")
227 .database("db1")
228 .duration(Duration::from_micros(12345))
229 .build();
230
231 let original_timestamp = event.timestamp;
232 let original_duration = event.duration;
233
234 let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 0);
235 let batch = FlexBuffersDeserializer::deserialize_batch(&bytes)
236 .expect("Deserialization should succeed");
237
238 let deserialized_timestamp = batch.events[0].timestamp;
240 let diff = if deserialized_timestamp > original_timestamp {
241 deserialized_timestamp
242 .duration_since(original_timestamp)
243 .unwrap()
244 } else {
245 original_timestamp
246 .duration_since(deserialized_timestamp)
247 .unwrap()
248 };
249
250 assert!(diff < Duration::from_micros(1));
252
253 assert_eq!(batch.events[0].duration, original_duration);
255 }
256
257 #[test]
258 fn test_deserialize_invalid_data() {
259 let invalid_bytes = vec![0, 1, 2, 3, 4];
260 let result = FlexBuffersDeserializer::deserialize_batch(&invalid_bytes);
261 assert!(result.is_err());
262 }
263
264 #[test]
265 fn test_params_serialization_roundtrip() {
266 use crate::ParamValue;
267
268 let event = QueryEventBuilder::new("SELECT * FROM users WHERE id = $1")
269 .params(vec![ParamValue::Int32(42), ParamValue::Text("alice".into())])
270 .duration(Duration::from_millis(5))
271 .connection_id("conn-1")
272 .database("testdb")
273 .build();
274
275 let bytes = FlatBuffersSerializer::serialize_batch(&[event], "test-proxy", 1);
276 let result = FlexBuffersDeserializer::deserialize_batch(&bytes).unwrap();
277
278 assert_eq!(result.events.len(), 1);
279 assert_eq!(result.events[0].params.len(), 2);
280 assert_eq!(result.events[0].params[0], ParamValue::Int32(42));
281 assert_eq!(result.events[0].params[1], ParamValue::Text("alice".into()));
282 }
283
284 #[test]
285 fn test_params_incomplete_roundtrip() {
286 use crate::ParamValue;
287
288 let event = QueryEventBuilder::new("SELECT * FROM users WHERE id = $1")
289 .params(vec![ParamValue::Unknown { oid: 0, data: vec![0x01] }])
290 .params_incomplete(true)
291 .duration(Duration::from_millis(5))
292 .connection_id("conn-1")
293 .database("testdb")
294 .build();
295
296 let bytes = FlatBuffersSerializer::serialize_batch(&[event], "test-proxy", 1);
297 let result = FlexBuffersDeserializer::deserialize_batch(&bytes).unwrap();
298
299 assert_eq!(result.events.len(), 1);
300 assert!(result.events[0].params_incomplete);
301 }
302}