scry_protocol/
serializer.rs1use crate::event::QueryEvent;
2use flexbuffers::FlexbufferSerializer as FlexSerializer;
3use serde::Serialize;
4use std::time::SystemTime;
5
/// Stateless entry point for encoding batches of query events.
///
/// Despite the name, this file encodes through the `flexbuffers` serde
/// serializer (imported as `FlexSerializer`). The type carries no data; all
/// functionality lives in its associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct FlatBuffersSerializer;
13
/// Top-level value handed to the serializer for every batch.
///
/// Holds only borrowed data plus the sequence number; it is built by
/// `FlatBuffersSerializer::serialize_batch` immediately before encoding.
#[derive(Serialize)]
struct QueryEventBatch<'a> {
    /// Events of the batch, already converted to their serializable form.
    events: &'a [SerializableEvent<'a>],
    /// Proxy identifier supplied by the caller of `serialize_batch`.
    proxy_id: &'a str,
    /// Batch sequence number supplied by the caller of `serialize_batch`.
    batch_seq: u64,
}
20
/// Serializable view of a single `QueryEvent`.
///
/// String-like fields borrow from the source event; only `params` is owned,
/// because each parameter is re-encoded as a JSON string during conversion.
/// Fields carrying a `skip_serializing_if` attribute are left out of the
/// output entirely when the stated condition holds.
#[derive(Serialize)]
struct SerializableEvent<'a> {
    /// Borrowed from `QueryEvent::event_id`.
    event_id: &'a str,
    /// Event timestamp as microseconds since the Unix epoch.
    timestamp_us: u64,
    /// Borrowed from `QueryEvent::query`.
    query: &'a str,
    /// JSON-encoded query parameters; omitted when there are none.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    params: Vec<String>,
    /// Copied from `QueryEvent::params_incomplete`; omitted when `false`
    /// (`Not::not` returns `true` for `false`, which triggers the skip).
    #[serde(skip_serializing_if = "std::ops::Not::not")]
    params_incomplete: bool,
    /// Borrowed from `QueryEvent::normalized_query`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    normalized_query: Option<&'a str>,
    /// Borrowed from `QueryEvent::value_fingerprints`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    value_fingerprints: Option<&'a [String]>,
    /// Event duration in microseconds.
    duration_us: u64,
    /// Copied from `QueryEvent::rows`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    rows: Option<u64>,
    /// Copied from `QueryEvent::success`.
    success: bool,
    /// Borrowed from `QueryEvent::error`; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<&'a str>,
    /// Borrowed from `QueryEvent::database`.
    database: &'a str,
    /// Borrowed from `QueryEvent::connection_id`.
    connection_id: &'a str,
}
43
44impl FlatBuffersSerializer {
45 pub fn serialize_batch(events: &[QueryEvent], proxy_id: &str, batch_seq: u64) -> Vec<u8> {
49 let serializable_events: Vec<SerializableEvent> = events
51 .iter()
52 .map(|event| Self::to_serializable(event))
53 .collect();
54
55 let batch = QueryEventBatch {
56 events: &serializable_events,
57 proxy_id,
58 batch_seq,
59 };
60
61 let mut serializer = FlexSerializer::new();
63 batch.serialize(&mut serializer).expect("FlexBuffers serialization should not fail");
64 serializer.view().to_vec()
65 }
66
67 fn to_serializable(event: &QueryEvent) -> SerializableEvent<'_> {
68 let timestamp_us = event
70 .timestamp
71 .duration_since(SystemTime::UNIX_EPOCH)
72 .unwrap_or_default()
73 .as_micros() as u64;
74
75 let duration_us = event.duration.as_micros() as u64;
77
78 let params: Vec<String> = event
80 .params
81 .iter()
82 .map(|p| serde_json::to_string(p).unwrap_or_default())
83 .collect();
84
85 SerializableEvent {
86 event_id: &event.event_id,
87 timestamp_us,
88 query: &event.query,
89 params,
90 params_incomplete: event.params_incomplete,
91 normalized_query: event.normalized_query.as_deref(),
92 value_fingerprints: event.value_fingerprints.as_deref(),
93 duration_us,
94 rows: event.rows,
95 success: event.success,
96 error: event.error.as_deref(),
97 database: &event.database,
98 connection_id: &event.connection_id,
99 }
100 }
101}
102
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::QueryEventBuilder;
    use std::time::Duration;

    /// Proxy identifier used for every batch encoded in these tests.
    const PROXY_ID: &str = "proxy-1";

    /// Encoding one event yields a buffer of more than four bytes.
    #[test]
    fn test_serialize_single_event() {
        let query_event = QueryEventBuilder::new("SELECT 1")
            .connection_id("conn-123")
            .database("testdb")
            .duration(Duration::from_millis(5))
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[query_event], PROXY_ID, 42);

        assert!(!encoded.is_empty());

        assert!(encoded.len() > 4);
    }

    /// Encoding two events yields a buffer of more than one hundred bytes.
    #[test]
    fn test_serialize_batch() {
        let first = QueryEventBuilder::new("SELECT 1")
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(5))
            .build();
        let second = QueryEventBuilder::new("SELECT 2")
            .connection_id("conn-2")
            .database("db2")
            .duration(Duration::from_millis(10))
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[first, second], PROXY_ID, 1);

        assert!(!encoded.is_empty());
        assert!(encoded.len() > 100);
    }

    /// An event carrying a normalized query and value fingerprints encodes
    /// to a non-empty buffer.
    #[test]
    fn test_serialize_with_anonymization() {
        let anonymized = QueryEventBuilder::new("SELECT * FROM users WHERE id = ?")
            .normalized_query("SELECT * FROM users WHERE id = ?")
            .value_fingerprints(vec!["abc123hash".to_string()])
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(5))
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[anonymized], PROXY_ID, 1);

        assert!(!encoded.is_empty());
    }

    /// A failed event with an error message encodes to a non-empty buffer.
    #[test]
    fn test_serialize_with_error() {
        let failed = QueryEventBuilder::new("INVALID SQL")
            .connection_id("conn-1")
            .database("db1")
            .duration(Duration::from_millis(1))
            .success(false)
            .error("syntax error")
            .build();

        let encoded = FlatBuffersSerializer::serialize_batch(&[failed], PROXY_ID, 1);

        assert!(!encoded.is_empty());
    }
}