tx2_link/
serialization.rs1use crate::error::Result;
2use crate::protocol::*;
3use crate::debug;
4use serde::{Deserialize, Serialize};
5use bytes::{Bytes, BytesMut, BufMut};
6use std::time::Instant;
7
8pub use crate::protocol::{SerializedComponent, SerializedEntity};
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct WorldSnapshot {
12 pub entities: Vec<SerializedEntity>,
13 pub timestamp: f64,
14 pub version: String,
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct Delta {
19 pub changes: Vec<DeltaChange>,
20 pub timestamp: f64,
21 pub base_timestamp: f64,
22}
23
/// The wire encodings that `BinarySerializer` can produce and consume.
///
/// The enum is `Copy`, so it is passed around by value.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BinaryFormat {
    /// Encode with `serde_json`.
    Json,
    /// Encode with `rmp_serde` (MessagePack).
    MessagePack,
    /// Encode with `bincode`.
    Bincode,
}
30
31pub struct BinarySerializer {
32 format: BinaryFormat,
33}
34
35impl BinarySerializer {
36 pub fn new(format: BinaryFormat) -> Self {
37 Self { format }
38 }
39
40 pub fn json() -> Self {
41 Self::new(BinaryFormat::Json)
42 }
43
44 pub fn messagepack() -> Self {
45 Self::new(BinaryFormat::MessagePack)
46 }
47
48 pub fn bincode() -> Self {
49 Self::new(BinaryFormat::Bincode)
50 }
51
52 pub fn serialize_message(&self, message: &Message) -> Result<Bytes> {
53 let start = Instant::now();
54
55 let result = match self.format {
56 BinaryFormat::Json => {
57 let json = serde_json::to_vec(message)?;
58 Ok(Bytes::from(json))
59 }
60 BinaryFormat::MessagePack => {
61 let msgpack = rmp_serde::to_vec(message)?;
62 Ok(Bytes::from(msgpack))
63 }
64 BinaryFormat::Bincode => {
65 let bincode_data = bincode::serialize(message)?;
66 Ok(Bytes::from(bincode_data))
67 }
68 };
69
70 if let Ok(ref bytes) = result {
71 if debug::is_debug_enabled() {
72 debug::log_message("Serialized", message);
73 }
74
75 if debug::is_trace_enabled() {
76 let format_name = match self.format {
77 BinaryFormat::Json => "JSON",
78 BinaryFormat::MessagePack => "MessagePack",
79 BinaryFormat::Bincode => "Bincode",
80 };
81 debug::trace_serialization(format_name, bytes.len(), start.elapsed().as_micros());
82 }
83 }
84
85 result
86 }
87
88 pub fn deserialize_message(&self, data: &[u8]) -> Result<Message> {
89 let start = Instant::now();
90
91 let result = match self.format {
92 BinaryFormat::Json => {
93 let message = serde_json::from_slice(data)?;
94 Ok(message)
95 }
96 BinaryFormat::MessagePack => {
97 let message = rmp_serde::from_slice(data)?;
98 Ok(message)
99 }
100 BinaryFormat::Bincode => {
101 let message = bincode::deserialize(data)?;
102 Ok(message)
103 }
104 };
105
106 if let Ok(ref message) = result {
107 if debug::is_debug_enabled() {
108 debug::log_message("Deserialized", message);
109 }
110
111 if debug::is_trace_enabled() {
112 let format_name = match self.format {
113 BinaryFormat::Json => "JSON",
114 BinaryFormat::MessagePack => "MessagePack",
115 BinaryFormat::Bincode => "Bincode",
116 };
117 debug::trace_deserialization(format_name, data.len(), start.elapsed().as_micros());
118 }
119 }
120
121 result
122 }
123
124 pub fn serialize_snapshot(&self, snapshot: &WorldSnapshot) -> Result<Bytes> {
125 match self.format {
126 BinaryFormat::Json => {
127 let json = serde_json::to_vec(snapshot)?;
128 Ok(Bytes::from(json))
129 }
130 BinaryFormat::MessagePack => {
131 let msgpack = rmp_serde::to_vec(snapshot)?;
132 Ok(Bytes::from(msgpack))
133 }
134 BinaryFormat::Bincode => {
135 let bincode_data = bincode::serialize(snapshot)?;
136 Ok(Bytes::from(bincode_data))
137 }
138 }
139 }
140
141 pub fn deserialize_snapshot(&self, data: &[u8]) -> Result<WorldSnapshot> {
142 match self.format {
143 BinaryFormat::Json => {
144 let snapshot = serde_json::from_slice(data)?;
145 Ok(snapshot)
146 }
147 BinaryFormat::MessagePack => {
148 let snapshot = rmp_serde::from_slice(data)?;
149 Ok(snapshot)
150 }
151 BinaryFormat::Bincode => {
152 let snapshot = bincode::deserialize(data)?;
153 Ok(snapshot)
154 }
155 }
156 }
157
158 pub fn serialize_delta(&self, delta: &Delta) -> Result<Bytes> {
159 match self.format {
160 BinaryFormat::Json => {
161 let json = serde_json::to_vec(delta)?;
162 Ok(Bytes::from(json))
163 }
164 BinaryFormat::MessagePack => {
165 let msgpack = rmp_serde::to_vec(delta)?;
166 Ok(Bytes::from(msgpack))
167 }
168 BinaryFormat::Bincode => {
169 let bincode_data = bincode::serialize(delta)?;
170 Ok(Bytes::from(bincode_data))
171 }
172 }
173 }
174
175 pub fn deserialize_delta(&self, data: &[u8]) -> Result<Delta> {
176 match self.format {
177 BinaryFormat::Json => {
178 let delta = serde_json::from_slice(data)?;
179 Ok(delta)
180 }
181 BinaryFormat::MessagePack => {
182 let delta = rmp_serde::from_slice(data)?;
183 Ok(delta)
184 }
185 BinaryFormat::Bincode => {
186 let delta = bincode::deserialize(data)?;
187 Ok(delta)
188 }
189 }
190 }
191
192 pub fn serialize_component(&self, component: &SerializedComponent) -> Result<Bytes> {
193 match self.format {
194 BinaryFormat::Json => {
195 let json = serde_json::to_vec(component)?;
196 Ok(Bytes::from(json))
197 }
198 BinaryFormat::MessagePack => {
199 let msgpack = rmp_serde::to_vec(component)?;
200 Ok(Bytes::from(msgpack))
201 }
202 BinaryFormat::Bincode => {
203 let bincode_data = bincode::serialize(component)?;
204 Ok(Bytes::from(bincode_data))
205 }
206 }
207 }
208
209 pub fn deserialize_component(&self, data: &[u8]) -> Result<SerializedComponent> {
210 match self.format {
211 BinaryFormat::Json => {
212 let component = serde_json::from_slice(data)?;
213 Ok(component)
214 }
215 BinaryFormat::MessagePack => {
216 let component = rmp_serde::from_slice(data)?;
217 Ok(component)
218 }
219 BinaryFormat::Bincode => {
220 let component = bincode::deserialize(data)?;
221 Ok(component)
222 }
223 }
224 }
225
226 pub fn get_format(&self) -> BinaryFormat {
227 self.format
228 }
229}
230
231pub struct StreamingSerializer {
232 format: BinaryFormat,
233 buffer: BytesMut,
234}
235
236impl StreamingSerializer {
237 pub fn new(format: BinaryFormat) -> Self {
238 Self {
239 format,
240 buffer: BytesMut::with_capacity(8192),
241 }
242 }
243
244 pub fn write_message(&mut self, message: &Message) -> Result<()> {
245 let serializer = BinarySerializer::new(self.format);
246 let data = serializer.serialize_message(message)?;
247
248 let len = data.len() as u32;
249 self.buffer.put_u32_le(len);
250 self.buffer.put(data);
251
252 Ok(())
253 }
254
255 pub fn flush(&mut self) -> Bytes {
256 self.buffer.split().freeze()
257 }
258
259 pub fn clear(&mut self) {
260 self.buffer.clear();
261 }
262}
263
264pub struct StreamingDeserializer {
265 format: BinaryFormat,
266 buffer: BytesMut,
267}
268
269impl StreamingDeserializer {
270 pub fn new(format: BinaryFormat) -> Self {
271 Self {
272 format,
273 buffer: BytesMut::with_capacity(8192),
274 }
275 }
276
277 pub fn feed(&mut self, data: &[u8]) {
278 self.buffer.extend_from_slice(data);
279 }
280
281 pub fn try_read_message(&mut self) -> Result<Option<Message>> {
282 if self.buffer.len() < 4 {
283 return Ok(None);
284 }
285
286 let len = u32::from_le_bytes([
287 self.buffer[0],
288 self.buffer[1],
289 self.buffer[2],
290 self.buffer[3],
291 ]) as usize;
292
293 if self.buffer.len() < 4 + len {
294 return Ok(None);
295 }
296
297 self.buffer.advance(4);
298
299 let message_data = self.buffer.split_to(len);
300
301 let serializer = BinarySerializer::new(self.format);
302 let message = serializer.deserialize_message(&message_data)?;
303
304 Ok(Some(message))
305 }
306
307 pub fn clear(&mut self) {
308 self.buffer.clear();
309 }
310}
311
/// Module-private helper trait for discarding bytes from the front of a
/// buffer.
trait Advance {
    /// Discards the first `cnt` bytes of `self`.
    fn advance(&mut self, cnt: usize);
}
315
316impl Advance for BytesMut {
317 fn advance(&mut self, cnt: usize) {
318 let _ = self.split_to(cnt);
319 }
320}
321
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `message` with `serializer` and decodes the result again.
    fn round_trip_message(serializer: &BinarySerializer, message: &Message) -> Message {
        let encoded = serializer.serialize_message(message).unwrap();
        serializer.deserialize_message(&encoded).unwrap()
    }

    /// Encodes `snapshot` with `serializer` and decodes the result again.
    fn round_trip_snapshot(
        serializer: &BinarySerializer,
        snapshot: &WorldSnapshot,
    ) -> WorldSnapshot {
        let encoded = serializer.serialize_snapshot(snapshot).unwrap();
        serializer.deserialize_snapshot(&encoded).unwrap()
    }

    // A ping survives a JSON round trip with its message type intact.
    #[test]
    fn test_json_serialization() {
        let original = Message::ping(1);
        let decoded = round_trip_message(&BinarySerializer::json(), &original);

        assert_eq!(original.header.msg_type, decoded.header.msg_type);
    }

    // A ping survives a MessagePack round trip with its message type intact.
    #[test]
    fn test_messagepack_serialization() {
        let original = Message::ping(1);
        let decoded = round_trip_message(&BinarySerializer::messagepack(), &original);

        assert_eq!(original.header.msg_type, decoded.header.msg_type);
    }

    // An empty snapshot survives a Bincode round trip.
    #[test]
    fn test_bincode_serialization() {
        let original = WorldSnapshot {
            entities: vec![],
            timestamp: 100.0,
            version: "1.0.0".to_string(),
        };

        let decoded = round_trip_snapshot(&BinarySerializer::bincode(), &original);

        assert_eq!(original.timestamp, decoded.timestamp);
    }

    // Two frames written back to back are read out in the same order.
    #[test]
    fn test_streaming_serialization() {
        let format = BinaryFormat::MessagePack;
        let mut writer = StreamingSerializer::new(format);
        let mut reader = StreamingDeserializer::new(format);

        let sent = [Message::ping(1), Message::pong(1)];
        for message in &sent {
            writer.write_message(message).unwrap();
        }

        reader.feed(&writer.flush());

        for expected in &sent {
            let decoded = reader.try_read_message().unwrap().unwrap();
            assert_eq!(expected.header.msg_type, decoded.header.msg_type);
        }
    }

    // A snapshot holding one entity with one component survives a
    // MessagePack round trip.
    #[test]
    fn test_snapshot_serialization() {
        let position = SerializedComponent {
            id: "Position".to_string(),
            data: ComponentData::from_json_value(serde_json::json!({"x": 10.0, "y": 20.0})),
        };
        let entity = SerializedEntity {
            id: 1,
            components: vec![position],
        };
        let original = WorldSnapshot {
            entities: vec![entity],
            timestamp: 123.456,
            version: "1.0.0".to_string(),
        };

        let decoded = round_trip_snapshot(&BinarySerializer::messagepack(), &original);

        assert_eq!(original.entities.len(), decoded.entities.len());
        assert_eq!(original.timestamp, decoded.timestamp);
        assert_eq!(original.version, decoded.version);
    }
}