Skip to main content

scry_protocol/
serializer.rs

1use crate::event::QueryEvent;
2use flexbuffers::FlexbufferSerializer as FlexSerializer;
3use serde::Serialize;
4use std::time::SystemTime;
5
6/// Serializes a batch of QueryEvents to FlexBuffers format
7///
8/// FlexBuffers is a schema-less binary format from the FlatBuffers project
9/// that provides efficient serialization without requiring code generation.
10///
11/// It's ideal for our use case: high performance, compact, and works with serde.
12pub struct FlatBuffersSerializer;
13
14#[derive(Serialize)]
15struct QueryEventBatch<'a> {
16    events: &'a [SerializableEvent<'a>],
17    proxy_id: &'a str,
18    batch_seq: u64,
19}
20
21#[derive(Serialize)]
22struct SerializableEvent<'a> {
23    event_id: &'a str,
24    timestamp_us: u64,
25    query: &'a str,
26    #[serde(skip_serializing_if = "Vec::is_empty")]
27    params: Vec<String>,
28    #[serde(skip_serializing_if = "std::ops::Not::not")]
29    params_incomplete: bool,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    normalized_query: Option<&'a str>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    value_fingerprints: Option<&'a [String]>,
34    duration_us: u64,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    rows: Option<u64>,
37    success: bool,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    error: Option<&'a str>,
40    database: &'a str,
41    connection_id: &'a str,
42}
43
44impl FlatBuffersSerializer {
45    /// Serialize a batch of events to FlexBuffers binary format
46    ///
47    /// Returns the serialized bytes ready to send over the wire
48    pub fn serialize_batch(events: &[QueryEvent], proxy_id: &str, batch_seq: u64) -> Vec<u8> {
49        // Convert QueryEvent to SerializableEvent
50        let serializable_events: Vec<SerializableEvent> = events
51            .iter()
52            .map(|event| Self::to_serializable(event))
53            .collect();
54
55        let batch = QueryEventBatch {
56            events: &serializable_events,
57            proxy_id,
58            batch_seq,
59        };
60
61        // Serialize to FlexBuffers
62        let mut serializer = FlexSerializer::new();
63        batch.serialize(&mut serializer).expect("FlexBuffers serialization should not fail");
64        serializer.view().to_vec()
65    }
66
67    fn to_serializable(event: &QueryEvent) -> SerializableEvent<'_> {
68        // Convert timestamp to microseconds
69        let timestamp_us = event
70            .timestamp
71            .duration_since(SystemTime::UNIX_EPOCH)
72            .unwrap_or_default()
73            .as_micros() as u64;
74
75        // Convert duration to microseconds
76        let duration_us = event.duration.as_micros() as u64;
77
78        // Serialize params to JSON strings
79        let params: Vec<String> = event
80            .params
81            .iter()
82            .map(|p| serde_json::to_string(p).unwrap_or_default())
83            .collect();
84
85        SerializableEvent {
86            event_id: &event.event_id,
87            timestamp_us,
88            query: &event.query,
89            params,
90            params_incomplete: event.params_incomplete,
91            normalized_query: event.normalized_query.as_deref(),
92            value_fingerprints: event.value_fingerprints.as_deref(),
93            duration_us,
94            rows: event.rows,
95            success: event.success,
96            error: event.error.as_deref(),
97            database: &event.database,
98            connection_id: &event.connection_id,
99        }
100    }
101}
102
103#[cfg(test)]
104mod tests {
105    use super::*;
106    use crate::event::QueryEventBuilder;
107    use std::time::Duration;
108
109    #[test]
110    fn test_serialize_single_event() {
111        let event = QueryEventBuilder::new("SELECT 1")
112            .connection_id("conn-123")
113            .database("testdb")
114            .duration(Duration::from_millis(5))
115            .build();
116
117        let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 42);
118
119        // Verify we got some bytes
120        assert!(!bytes.is_empty());
121
122        // Basic sanity check - FlatBuffers has a file identifier at the start
123        assert!(bytes.len() > 4);
124    }
125
126    #[test]
127    fn test_serialize_batch() {
128        let events = vec![
129            QueryEventBuilder::new("SELECT 1")
130                .connection_id("conn-1")
131                .database("db1")
132                .duration(Duration::from_millis(5))
133                .build(),
134            QueryEventBuilder::new("SELECT 2")
135                .connection_id("conn-2")
136                .database("db2")
137                .duration(Duration::from_millis(10))
138                .build(),
139        ];
140
141        let bytes = FlatBuffersSerializer::serialize_batch(&events, "proxy-1", 1);
142
143        assert!(!bytes.is_empty());
144        // Batch should be larger than single event
145        assert!(bytes.len() > 100);
146    }
147
148    #[test]
149    fn test_serialize_with_anonymization() {
150        let event = QueryEventBuilder::new("SELECT * FROM users WHERE id = ?")
151            .normalized_query("SELECT * FROM users WHERE id = ?")
152            .value_fingerprints(vec!["abc123hash".to_string()])
153            .connection_id("conn-1")
154            .database("db1")
155            .duration(Duration::from_millis(5))
156            .build();
157
158        let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 1);
159
160        assert!(!bytes.is_empty());
161    }
162
163    #[test]
164    fn test_serialize_with_error() {
165        let event = QueryEventBuilder::new("INVALID SQL")
166            .connection_id("conn-1")
167            .database("db1")
168            .duration(Duration::from_millis(1))
169            .success(false)
170            .error("syntax error")
171            .build();
172
173        let bytes = FlatBuffersSerializer::serialize_batch(&[event], "proxy-1", 1);
174
175        assert!(!bytes.is_empty());
176    }
177}