1use anyhow::{anyhow, Result};
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use futures::stream::{BoxStream, StreamExt as _};
12use std::collections::BTreeMap;
13use std::io::Read as _;
14use std::sync::Arc;
15
16use crate::serialization_decoder::DeltaCompressor;
17use crate::serialization_types::{
18 from_avro_value, get_default_avro_schema, to_avro_value, DeltaCompressionType,
19 ProtobufStreamEvent, SchemaRegistry, SerializationFormat, SerializerOptions,
20};
21use crate::{CompressionType, EventMetadata, StreamEvent};
22use tokio_stream::Stream;
23
24#[derive(Clone)]
26pub struct EventSerializer {
27 pub(crate) format: SerializationFormat,
28 pub(crate) compression: Option<CompressionType>,
29 pub(crate) schema_registry: Option<Arc<SchemaRegistry>>,
30 pub(crate) options: SerializerOptions,
31}
32
33impl EventSerializer {
34 pub fn new(format: SerializationFormat) -> Self {
36 Self {
37 format,
38 compression: None,
39 schema_registry: None,
40 options: SerializerOptions::default(),
41 }
42 }
43
44 pub fn with_compression(mut self, compression: CompressionType) -> Self {
46 self.compression = Some(compression);
47 self
48 }
49
50 pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
52 self.schema_registry = Some(registry);
53 self
54 }
55
56 pub fn with_options(mut self, options: SerializerOptions) -> Self {
58 self.options = options;
59 self
60 }
61
62 pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
64 let mut buffer = BytesMut::new();
65
66 if self.options.include_magic_bytes {
68 buffer.put(self.format.magic_bytes());
69 }
70
71 if self.options.include_schema_id {
73 if let Some(registry) = &self.schema_registry {
74 let schema_id = registry.get_schema_id_for_event(event).await?;
75 buffer.put_u32(schema_id.parse::<u32>().unwrap_or(0));
76 }
77 }
78
79 let serialized = match self.format {
81 SerializationFormat::Json => self.serialize_json(event)?,
82 SerializationFormat::Binary => self.serialize_binary(event)?,
83 SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
84 SerializationFormat::Cbor => self.serialize_cbor(event)?,
85 SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
86 SerializationFormat::Avro => self.serialize_avro(event).await?,
87 };
88
89 let data = if let Some(compression) = &self.compression {
91 self.compress(&serialized, compression)?
92 } else {
93 serialized
94 };
95
96 if let Some(max_size) = self.options.max_size {
98 if data.len() > max_size {
99 return Err(anyhow!(
100 "Serialized data exceeds maximum size: {} > {max_size}",
101 data.len()
102 ));
103 }
104 }
105
106 buffer.put(&data[..]);
107 Ok(buffer.freeze())
108 }
109
110 pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
112 let mut cursor = std::io::Cursor::new(data);
113 let mut offset = 0;
114
115 if self.options.include_magic_bytes && data.len() >= 4 {
117 let magic = &data[0..4];
118 if magic == self.format.magic_bytes() {
119 offset += 4;
120 cursor.set_position(4);
121 }
122 }
123
124 if self.options.include_schema_id
126 && self.schema_registry.is_some()
127 && data.len() >= offset + 4
128 {
129 offset += 4;
130 cursor.set_position(offset as u64);
131 }
132
133 let event_data = &data[offset..];
135
136 let decompressed = if let Some(compression) = &self.compression {
138 self.decompress(event_data, compression)?
139 } else {
140 event_data.to_vec()
141 };
142
143 match self.format {
145 SerializationFormat::Json => self.deserialize_json(&decompressed),
146 SerializationFormat::Binary => self.deserialize_binary(&decompressed),
147 SerializationFormat::MessagePack => self.deserialize_messagepack(&decompressed),
148 SerializationFormat::Cbor => self.deserialize_cbor(&decompressed),
149 SerializationFormat::Protobuf => self.deserialize_protobuf(&decompressed),
150 SerializationFormat::Avro => self.deserialize_avro(&decompressed).await,
151 }
152 }
153
154 fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
156 if self.options.pretty_json {
157 serde_json::to_vec_pretty(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
158 } else {
159 serde_json::to_vec(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
160 }
161 }
162
163 fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
165 serde_json::from_slice(data).map_err(|e| anyhow!("JSON deserialization failed: {e}"))
166 }
167
168 fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
170 let mut buffer = Vec::new();
172
173 buffer.push(1); let event_type = match event {
178 StreamEvent::TripleAdded { .. } => 1,
179 StreamEvent::TripleRemoved { .. } => 2,
180 StreamEvent::QuadAdded { .. } => 3,
181 StreamEvent::QuadRemoved { .. } => 4,
182 StreamEvent::GraphCreated { .. } => 5,
183 StreamEvent::GraphCleared { .. } => 6,
184 StreamEvent::GraphDeleted { .. } => 7,
185 StreamEvent::GraphMetadataUpdated { .. } => 17,
186 StreamEvent::GraphPermissionsChanged { .. } => 18,
187 StreamEvent::GraphStatisticsUpdated { .. } => 19,
188 StreamEvent::GraphRenamed { .. } => 20,
189 StreamEvent::GraphMerged { .. } => 21,
190 StreamEvent::GraphSplit { .. } => 22,
191 StreamEvent::SparqlUpdate { .. } => 8,
192 StreamEvent::TransactionBegin { .. } => 9,
193 StreamEvent::TransactionCommit { .. } => 10,
194 StreamEvent::TransactionAbort { .. } => 11,
195 StreamEvent::SchemaChanged { .. } => 12,
196 StreamEvent::SchemaDefinitionAdded { .. } => 23,
197 StreamEvent::SchemaDefinitionRemoved { .. } => 24,
198 StreamEvent::SchemaDefinitionModified { .. } => 25,
199 StreamEvent::OntologyImported { .. } => 26,
200 StreamEvent::OntologyRemoved { .. } => 27,
201 StreamEvent::ConstraintAdded { .. } => 28,
202 StreamEvent::ConstraintRemoved { .. } => 29,
203 StreamEvent::ConstraintViolated { .. } => 30,
204 StreamEvent::IndexCreated { .. } => 31,
205 StreamEvent::IndexDropped { .. } => 32,
206 StreamEvent::IndexRebuilt { .. } => 33,
207 StreamEvent::ShapeAdded { .. } => 34,
208 StreamEvent::ShapeRemoved { .. } => 35,
209 StreamEvent::ShapeModified { .. } => 36,
210 StreamEvent::ShapeValidationStarted { .. } => 37,
211 StreamEvent::ShapeValidationCompleted { .. } => 38,
212 StreamEvent::ShapeViolationDetected { .. } => 39,
213 StreamEvent::QueryResultAdded { .. } => 14,
214 StreamEvent::QueryResultRemoved { .. } => 15,
215 StreamEvent::QueryCompleted { .. } => 16,
216 StreamEvent::SchemaUpdated { .. } => 40,
217 StreamEvent::ShapeUpdated { .. } => 41,
218 StreamEvent::Heartbeat { .. } => 13,
219 StreamEvent::ErrorOccurred { .. } => 42,
220 };
221 buffer.push(event_type);
222
223 match event {
225 StreamEvent::TripleAdded {
226 subject,
227 predicate,
228 object,
229 graph,
230 metadata,
231 } => {
232 self.write_string(&mut buffer, subject);
233 self.write_string(&mut buffer, predicate);
234 self.write_string(&mut buffer, object);
235 self.write_optional_string(&mut buffer, graph.as_deref());
236 self.write_metadata(&mut buffer, metadata)?;
237 }
238 _ => {
240 return Err(anyhow!(
241 "Binary serialization not implemented for this event type"
242 ))
243 }
244 }
245
246 Ok(buffer)
247 }
248
249 fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
251 let bytes = s.as_bytes();
252 buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
253 buffer.extend_from_slice(bytes);
254 }
255
256 fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
258 match s {
259 Some(s) => {
260 buffer.push(1); self.write_string(buffer, s);
262 }
263 None => {
264 buffer.push(0); }
266 }
267 }
268
269 fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
271 let metadata_json = serde_json::to_vec(metadata)?;
273 buffer.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
274 buffer.extend_from_slice(&metadata_json);
275 Ok(())
276 }
277
278 fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
280 if data.len() < 2 {
281 return Err(anyhow!("Binary data too short"));
282 }
283
284 let version = data[0];
285 if version != 1 {
286 return Err(anyhow!("Unsupported binary format version: {version}"));
287 }
288
289 let event_type = data[1];
290 let mut cursor = std::io::Cursor::new(&data[2..]);
291
292 match event_type {
293 1 => {
294 let subject = self.read_string(&mut cursor)?;
296 let predicate = self.read_string(&mut cursor)?;
297 let object = self.read_string(&mut cursor)?;
298 let graph = self.read_optional_string(&mut cursor)?;
299 let metadata = self.read_metadata(&mut cursor)?;
300
301 Ok(StreamEvent::TripleAdded {
302 subject,
303 predicate,
304 object,
305 graph,
306 metadata,
307 })
308 }
309 _ => Err(anyhow!("Unknown event type: {event_type}")),
311 }
312 }
313
314 fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
316 use std::io::Read;
317
318 let mut len_bytes = [0u8; 4];
319 cursor.read_exact(&mut len_bytes)?;
320 let len = u32::from_le_bytes(len_bytes) as usize;
321
322 let mut bytes = vec![0u8; len];
323 cursor.read_exact(&mut bytes)?;
324
325 String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
326 }
327
328 fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
330 use std::io::Read;
331
332 let mut present = [0u8; 1];
333 cursor.read_exact(&mut present)?;
334
335 if present[0] == 1 {
336 Ok(Some(self.read_string(cursor)?))
337 } else {
338 Ok(None)
339 }
340 }
341
342 fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
344 use std::io::Read;
345
346 let mut len_bytes = [0u8; 4];
347 cursor.read_exact(&mut len_bytes)?;
348 let len = u32::from_le_bytes(len_bytes) as usize;
349
350 let mut json_bytes = vec![0u8; len];
351 cursor.read_exact(&mut json_bytes)?;
352
353 serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
354 }
355
356 fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
358 rmp_serde::to_vec(event).map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
359 }
360
361 fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
363 rmp_serde::from_slice(data).map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
364 }
365
366 fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
368 let mut buf = Vec::new();
369 ciborium::ser::into_writer(event, &mut buf)
370 .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
371 Ok(buf)
372 }
373
374 fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
376 ciborium::de::from_reader(data).map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
377 }
378
379 fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
381 let json_data = serde_json::to_value(event)?;
384 let proto_event = ProtobufStreamEvent::from_json(&json_data)?;
385
386 let mut buf = Vec::new();
387 prost::Message::encode(&proto_event, &mut buf)?;
388 Ok(buf)
389 }
390
391 fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
393 let proto_event = ProtobufStreamEvent::decode(data)?;
394 let json_value = proto_event.to_json()?;
395 let event: StreamEvent = serde_json::from_value(json_value)?;
396 Ok(event)
397 }
398
399 async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
401 let schema = if let Some(registry) = &self.schema_registry {
403 registry.get_avro_schema_for_event(event).await?
404 } else {
405 get_default_avro_schema()
407 };
408
409 let avro_value = to_avro_value(event, &schema)?;
411
412 let mut writer = Vec::new();
414 let mut encoder = apache_avro::Writer::new(&schema, &mut writer);
415 encoder.append(avro_value)?;
416 encoder.flush()?;
417
418 let result = encoder.into_inner()?.to_vec();
420 Ok(result)
421 }
422
423 async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
425 let reader = apache_avro::Reader::new(data)?;
427 let schema = reader.writer_schema().clone();
428
429 if let Some(record) = reader.into_iter().next() {
431 let avro_value = record?;
432 let event = from_avro_value(&avro_value, &schema)?;
433 Ok(event)
434 } else {
435 Err(anyhow!("No Avro record found in data"))
436 }
437 }
438
439 fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
441 match compression {
442 CompressionType::None => Ok(data.to_vec()),
443 CompressionType::Gzip => oxiarc_deflate::gzip_compress(data, 6)
444 .map_err(|e| anyhow!("Gzip compression failed: {e}")),
445 CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
446 .map_err(|e| anyhow!("Zstd compression failed: {e}")),
447 CompressionType::Lz4 => {
448 oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))
449 }
450 CompressionType::Snappy => Ok(oxiarc_snappy::compress(data)),
451 }
452 }
453
454 fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
456 match compression {
457 CompressionType::None => Ok(data.to_vec()),
458 CompressionType::Gzip => oxiarc_deflate::gzip_decompress(data)
459 .map_err(|e| anyhow!("Gzip decompression failed: {e}")),
460 CompressionType::Zstd => {
461 oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
462 }
463 CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
464 .map_err(|e| anyhow!("LZ4 decompression failed: {e}")),
465 CompressionType::Snappy => oxiarc_snappy::decompress(data)
466 .map_err(|e| anyhow!("Snappy decompression failed: {e}")),
467 }
468 }
469}
470
471pub struct FormatConverter {
473 source_format: SerializationFormat,
474 target_format: SerializationFormat,
475 #[allow(dead_code)]
476 schema_registry: Option<Arc<SchemaRegistry>>,
477}
478
479impl FormatConverter {
480 pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
482 Self {
483 source_format: source,
484 target_format: target,
485 schema_registry: None,
486 }
487 }
488
489 pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
491 let source_serializer = EventSerializer::new(self.source_format);
493 let event = source_serializer.deserialize(data).await?;
494
495 let target_serializer = EventSerializer::new(self.target_format);
497 target_serializer.serialize(&event).await
498 }
499}
500
501pub struct StreamingSerializer {
503 serializer: EventSerializer,
504 delta_compressor: Option<DeltaCompressor>,
505 batch_size: usize,
506 current_batch: Vec<StreamEvent>,
507}
508
509impl StreamingSerializer {
510 pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
512 Self {
513 serializer,
514 delta_compressor: None,
515 batch_size,
516 current_batch: Vec::new(),
517 }
518 }
519
520 pub fn with_delta_compression(
522 mut self,
523 compression_type: DeltaCompressionType,
524 max_states: usize,
525 ) -> Self {
526 self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
527 self
528 }
529
530 pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
532 self.current_batch.push(event);
533
534 if self.current_batch.len() >= self.batch_size {
535 self.flush_batch().await
536 } else {
537 Ok(None)
538 }
539 }
540
541 pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
543 if self.current_batch.is_empty() {
544 return Ok(None);
545 }
546
547 let batch = std::mem::take(&mut self.current_batch);
548 let serialized = self.serialize_batch(&batch).await?;
549 Ok(Some(serialized))
550 }
551
552 async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
554 let mut buffer = BytesMut::new();
555
556 buffer.put_u32(batch.len() as u32);
558 buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);
559
560 for event in batch {
562 let event_data = self.serializer.serialize(event).await?;
563 buffer.put_u32(event_data.len() as u32);
564 buffer.put(event_data);
565 }
566
567 Ok(buffer.freeze())
568 }
569
570 pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
572 let mut cursor = std::io::Cursor::new(data);
573 let mut events = Vec::new();
574
575 let batch_size = cursor.get_u32();
577 let _timestamp = cursor.get_u64();
578
579 for _ in 0..batch_size {
581 let event_size = cursor.get_u32() as usize;
582 let event_data =
583 &data[cursor.position() as usize..(cursor.position() as usize + event_size)];
584 cursor.advance(event_size);
585
586 let event = self.serializer.deserialize(event_data).await?;
587 events.push(event);
588 }
589
590 Ok(events)
591 }
592
593 pub fn create_batch_stream(
595 &self,
596 events: impl Stream<Item = StreamEvent> + Send + 'static,
597 ) -> BoxStream<'static, Result<Bytes>> {
598 let serializer = self.serializer.clone();
599 let batch_size = self.batch_size;
600
601 Box::pin(events.chunks(batch_size).then(move |chunk| {
602 let serializer = serializer.clone();
603 async move {
604 let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
605 streaming_serializer.serialize_batch(&chunk).await
606 }
607 }))
608 }
609}
610
611pub struct EnhancedBinaryFormat {
613 version: u8,
614 enable_compression: bool,
615 enable_checksums: bool,
616 chunk_size: usize,
617}
618
619impl EnhancedBinaryFormat {
620 pub fn new() -> Self {
622 Self {
623 version: 2, enable_compression: true,
625 enable_checksums: true,
626 chunk_size: 8192, }
628 }
629
630 pub fn with_compression(mut self, enable: bool) -> Self {
632 self.enable_compression = enable;
633 self
634 }
635
636 pub fn with_checksums(mut self, enable: bool) -> Self {
638 self.enable_checksums = enable;
639 self
640 }
641
642 pub fn with_chunk_size(mut self, size: usize) -> Self {
644 self.chunk_size = size;
645 self
646 }
647
648 pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
650 let mut buffer = BytesMut::new();
651
652 buffer.put(&b"BIN2"[..]); buffer.put_u8(self.version);
655 buffer.put_u8(self.get_flags());
656
657 let event_json = serde_json::to_vec(event)?;
659
660 let data = if self.enable_compression {
662 oxiarc_lz4::compress(&event_json)
663 .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
664 } else {
665 event_json
666 };
667
668 if self.enable_checksums {
670 let checksum = crc32fast::hash(&data);
671 buffer.put_u32(checksum);
672 }
673
674 buffer.put_u32(data.len() as u32);
676 buffer.put(&data[..]);
677
678 Ok(buffer.freeze())
679 }
680
681 pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
683 let mut cursor = std::io::Cursor::new(data);
684
685 let mut magic = [0u8; 4];
687 cursor.read_exact(&mut magic)?;
688 if &magic != b"BIN2" {
689 return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
690 }
691
692 let version = cursor.get_u8();
694 if version != self.version {
695 return Err(anyhow!(
696 "Unsupported enhanced binary format version: {}",
697 version
698 ));
699 }
700
701 let flags = cursor.get_u8();
702 let has_compression = (flags & 0x01) != 0;
703 let has_checksum = (flags & 0x02) != 0;
704
705 let expected_checksum = if has_checksum {
707 Some(cursor.get_u32())
708 } else {
709 None
710 };
711
712 let data_len = cursor.get_u32() as usize;
714 let mut event_data = vec![0u8; data_len];
715 cursor.read_exact(&mut event_data)?;
716
717 if let Some(expected) = expected_checksum {
719 let actual = crc32fast::hash(&event_data);
720 if actual != expected {
721 return Err(anyhow!(
722 "Checksum mismatch: expected {}, got {}",
723 expected,
724 actual
725 ));
726 }
727 }
728
729 let decompressed = if has_compression {
731 oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
732 .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
733 } else {
734 event_data
735 };
736
737 let event = serde_json::from_slice(&decompressed)?;
739 Ok(event)
740 }
741
742 pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
744 let serialized = self.serialize(event).await?;
745 let mut chunks = Vec::new();
746
747 if serialized.len() <= self.chunk_size {
748 chunks.push(serialized);
749 } else {
750 let chunk_count = (serialized.len() + self.chunk_size - 1) / self.chunk_size;
752
753 for i in 0..chunk_count {
754 let start = i * self.chunk_size;
755 let end = std::cmp::min(start + self.chunk_size, serialized.len());
756
757 let mut chunk_buffer = BytesMut::new();
758 chunk_buffer.put(&b"CHNK"[..]); chunk_buffer.put_u32(i as u32); chunk_buffer.put_u32(chunk_count as u32); chunk_buffer.put_u32((end - start) as u32); chunk_buffer.put(&serialized[start..end]);
763
764 chunks.push(chunk_buffer.freeze());
765 }
766 }
767
768 Ok(chunks)
769 }
770
771 pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
773 if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
774 return self.deserialize(&chunks[0]).await;
776 }
777
778 let mut chunk_data: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
780 let mut total_chunks = 0;
781
782 for chunk in chunks {
783 if !chunk.starts_with(b"CHNK") {
784 return Err(anyhow!("Invalid chunk format"));
785 }
786
787 let mut cursor = std::io::Cursor::new(&chunk[4..]);
788 let chunk_index = cursor.get_u32();
789 let chunk_count = cursor.get_u32();
790 let chunk_size = cursor.get_u32() as usize;
791
792 total_chunks = chunk_count;
793
794 let data = chunk[16..16 + chunk_size].to_vec();
795 chunk_data.insert(chunk_index, data);
796 }
797
798 if chunk_data.len() != total_chunks as usize {
799 return Err(anyhow!(
800 "Missing chunks: got {}, expected {}",
801 chunk_data.len(),
802 total_chunks
803 ));
804 }
805
806 let mut reassembled = Vec::new();
808 for (_index, data) in chunk_data {
809 reassembled.extend(data);
810 }
811
812 self.deserialize(&reassembled).await
814 }
815
816 fn get_flags(&self) -> u8 {
818 let mut flags = 0u8;
819 if self.enable_compression {
820 flags |= 0x01;
821 }
822 if self.enable_checksums {
823 flags |= 0x02;
824 }
825 flags
826 }
827}
828
829impl Default for EnhancedBinaryFormat {
830 fn default() -> Self {
831 Self::new()
832 }
833}
834
#[cfg(test)]
mod compression_tests {
    use super::*;

    /// Serializer whose compression helpers are under test; the format is irrelevant.
    fn serializer() -> EventSerializer {
        EventSerializer::new(SerializationFormat::Json)
    }

    /// Compresses `data` with `kind`, decompresses it again and asserts the
    /// bytes survive unchanged. `label` names the case in failure messages.
    fn assert_round_trip(kind: &CompressionType, data: &[u8], label: &str) {
        let ser = serializer();
        let compressed = ser
            .compress(data, kind)
            .unwrap_or_else(|e| panic!("{label} compress: {e}"));
        let restored = ser
            .decompress(&compressed, kind)
            .unwrap_or_else(|e| panic!("{label} decompress: {e}"));
        assert_eq!(restored, data, "{label} round-trip mismatch");
    }

    #[test]
    fn test_raw_snappy_round_trip() {
        let data = b"serialization encoder raw snappy payload ".repeat(48);
        assert_round_trip(&CompressionType::Snappy, &data, "raw snappy");
    }

    #[test]
    fn test_raw_snappy_round_trip_random() {
        use scirs2_core::random::Random;
        use scirs2_core::RngExt;
        let mut rng = Random::default();
        // Random bytes are effectively incompressible and exercise the literal path.
        let data: Vec<u8> = (0..2048).map(|_| rng.random()).collect();
        assert_round_trip(&CompressionType::Snappy, &data, "raw snappy random");
    }

    #[test]
    fn test_gzip_round_trip() {
        let data = b"serialization encoder gzip payload ".repeat(40);
        assert_round_trip(&CompressionType::Gzip, &data, "gzip");
    }

    #[test]
    fn test_zstd_round_trip() {
        let data = b"serialization encoder zstd payload ".repeat(40);
        assert_round_trip(&CompressionType::Zstd, &data, "zstd");
    }

    #[test]
    fn test_lz4_round_trip() {
        let data = b"serialization encoder lz4 payload ".repeat(40);
        assert_round_trip(&CompressionType::Lz4, &data, "lz4");
    }

    #[test]
    fn test_none_round_trip() {
        let data = b"serialization encoder uncompressed payload ".repeat(8);
        assert_round_trip(&CompressionType::None, &data, "none");
    }
}