Skip to main content

oxirs_stream/
serialization_encoder.rs

1//! Event serializer/deserializer implementations and supporting encoders.
2//!
3//! This module provides:
4//! - [`EventSerializer`] - format-aware (de)serializer
5//! - [`FormatConverter`] - cross-format converter
6//! - [`StreamingSerializer`] - batched/streamed serialization
7//! - [`EnhancedBinaryFormat`] - chunked binary format with checksums
8
9use anyhow::{anyhow, Result};
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use futures::stream::{BoxStream, StreamExt as _};
12use std::collections::BTreeMap;
13use std::io::Read as _;
14use std::sync::Arc;
15
16use crate::serialization_decoder::DeltaCompressor;
17use crate::serialization_types::{
18    from_avro_value, get_default_avro_schema, to_avro_value, DeltaCompressionType,
19    ProtobufStreamEvent, SchemaRegistry, SerializationFormat, SerializerOptions,
20};
21use crate::{CompressionType, EventMetadata, StreamEvent};
22use tokio_stream::Stream;
23
24/// Event serializer with format support
25#[derive(Clone)]
26pub struct EventSerializer {
27    pub(crate) format: SerializationFormat,
28    pub(crate) compression: Option<CompressionType>,
29    pub(crate) schema_registry: Option<Arc<SchemaRegistry>>,
30    pub(crate) options: SerializerOptions,
31}
32
33impl EventSerializer {
34    /// Create a new event serializer
35    pub fn new(format: SerializationFormat) -> Self {
36        Self {
37            format,
38            compression: None,
39            schema_registry: None,
40            options: SerializerOptions::default(),
41        }
42    }
43
44    /// Set compression type
45    pub fn with_compression(mut self, compression: CompressionType) -> Self {
46        self.compression = Some(compression);
47        self
48    }
49
50    /// Set schema registry
51    pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
52        self.schema_registry = Some(registry);
53        self
54    }
55
56    /// Set serializer options
57    pub fn with_options(mut self, options: SerializerOptions) -> Self {
58        self.options = options;
59        self
60    }
61
62    /// Serialize a stream event
63    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
64        let mut buffer = BytesMut::new();
65
66        // Add magic bytes if enabled
67        if self.options.include_magic_bytes {
68            buffer.put(self.format.magic_bytes());
69        }
70
71        // Add schema ID if enabled and registry is available
72        if self.options.include_schema_id {
73            if let Some(registry) = &self.schema_registry {
74                let schema_id = registry.get_schema_id_for_event(event).await?;
75                buffer.put_u32(schema_id.parse::<u32>().unwrap_or(0));
76            }
77        }
78
79        // Serialize based on format
80        let serialized = match self.format {
81            SerializationFormat::Json => self.serialize_json(event)?,
82            SerializationFormat::Binary => self.serialize_binary(event)?,
83            SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
84            SerializationFormat::Cbor => self.serialize_cbor(event)?,
85            SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
86            SerializationFormat::Avro => self.serialize_avro(event).await?,
87        };
88
89        // Apply compression if enabled
90        let data = if let Some(compression) = &self.compression {
91            self.compress(&serialized, compression)?
92        } else {
93            serialized
94        };
95
96        // Check size limit
97        if let Some(max_size) = self.options.max_size {
98            if data.len() > max_size {
99                return Err(anyhow!(
100                    "Serialized data exceeds maximum size: {} > {max_size}",
101                    data.len()
102                ));
103            }
104        }
105
106        buffer.put(&data[..]);
107        Ok(buffer.freeze())
108    }
109
110    /// Deserialize a stream event
111    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
112        let mut cursor = std::io::Cursor::new(data);
113        let mut offset = 0;
114
115        // Skip magic bytes if present
116        if self.options.include_magic_bytes && data.len() >= 4 {
117            let magic = &data[0..4];
118            if magic == self.format.magic_bytes() {
119                offset += 4;
120                cursor.set_position(4);
121            }
122        }
123
124        // Skip schema ID if present
125        if self.options.include_schema_id
126            && self.schema_registry.is_some()
127            && data.len() >= offset + 4
128        {
129            offset += 4;
130            cursor.set_position(offset as u64);
131        }
132
133        // Get remaining data
134        let event_data = &data[offset..];
135
136        // Decompress if needed
137        let decompressed = if let Some(compression) = &self.compression {
138            self.decompress(event_data, compression)?
139        } else {
140            event_data.to_vec()
141        };
142
143        // Deserialize based on format
144        match self.format {
145            SerializationFormat::Json => self.deserialize_json(&decompressed),
146            SerializationFormat::Binary => self.deserialize_binary(&decompressed),
147            SerializationFormat::MessagePack => self.deserialize_messagepack(&decompressed),
148            SerializationFormat::Cbor => self.deserialize_cbor(&decompressed),
149            SerializationFormat::Protobuf => self.deserialize_protobuf(&decompressed),
150            SerializationFormat::Avro => self.deserialize_avro(&decompressed).await,
151        }
152    }
153
154    /// Serialize to JSON
155    fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
156        if self.options.pretty_json {
157            serde_json::to_vec_pretty(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
158        } else {
159            serde_json::to_vec(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
160        }
161    }
162
163    /// Deserialize from JSON
164    fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
165        serde_json::from_slice(data).map_err(|e| anyhow!("JSON deserialization failed: {e}"))
166    }
167
168    /// Serialize to binary format
169    fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
170        // Custom binary format implementation
171        let mut buffer = Vec::new();
172
173        // Write version
174        buffer.push(1); // Version 1
175
176        // Write event type
177        let event_type = match event {
178            StreamEvent::TripleAdded { .. } => 1,
179            StreamEvent::TripleRemoved { .. } => 2,
180            StreamEvent::QuadAdded { .. } => 3,
181            StreamEvent::QuadRemoved { .. } => 4,
182            StreamEvent::GraphCreated { .. } => 5,
183            StreamEvent::GraphCleared { .. } => 6,
184            StreamEvent::GraphDeleted { .. } => 7,
185            StreamEvent::GraphMetadataUpdated { .. } => 17,
186            StreamEvent::GraphPermissionsChanged { .. } => 18,
187            StreamEvent::GraphStatisticsUpdated { .. } => 19,
188            StreamEvent::GraphRenamed { .. } => 20,
189            StreamEvent::GraphMerged { .. } => 21,
190            StreamEvent::GraphSplit { .. } => 22,
191            StreamEvent::SparqlUpdate { .. } => 8,
192            StreamEvent::TransactionBegin { .. } => 9,
193            StreamEvent::TransactionCommit { .. } => 10,
194            StreamEvent::TransactionAbort { .. } => 11,
195            StreamEvent::SchemaChanged { .. } => 12,
196            StreamEvent::SchemaDefinitionAdded { .. } => 23,
197            StreamEvent::SchemaDefinitionRemoved { .. } => 24,
198            StreamEvent::SchemaDefinitionModified { .. } => 25,
199            StreamEvent::OntologyImported { .. } => 26,
200            StreamEvent::OntologyRemoved { .. } => 27,
201            StreamEvent::ConstraintAdded { .. } => 28,
202            StreamEvent::ConstraintRemoved { .. } => 29,
203            StreamEvent::ConstraintViolated { .. } => 30,
204            StreamEvent::IndexCreated { .. } => 31,
205            StreamEvent::IndexDropped { .. } => 32,
206            StreamEvent::IndexRebuilt { .. } => 33,
207            StreamEvent::ShapeAdded { .. } => 34,
208            StreamEvent::ShapeRemoved { .. } => 35,
209            StreamEvent::ShapeModified { .. } => 36,
210            StreamEvent::ShapeValidationStarted { .. } => 37,
211            StreamEvent::ShapeValidationCompleted { .. } => 38,
212            StreamEvent::ShapeViolationDetected { .. } => 39,
213            StreamEvent::QueryResultAdded { .. } => 14,
214            StreamEvent::QueryResultRemoved { .. } => 15,
215            StreamEvent::QueryCompleted { .. } => 16,
216            StreamEvent::SchemaUpdated { .. } => 40,
217            StreamEvent::ShapeUpdated { .. } => 41,
218            StreamEvent::Heartbeat { .. } => 13,
219            StreamEvent::ErrorOccurred { .. } => 42,
220        };
221        buffer.push(event_type);
222
223        // Serialize fields based on event type
224        match event {
225            StreamEvent::TripleAdded {
226                subject,
227                predicate,
228                object,
229                graph,
230                metadata,
231            } => {
232                self.write_string(&mut buffer, subject);
233                self.write_string(&mut buffer, predicate);
234                self.write_string(&mut buffer, object);
235                self.write_optional_string(&mut buffer, graph.as_deref());
236                self.write_metadata(&mut buffer, metadata)?;
237            }
238            // ... implement other event types similarly
239            _ => {
240                return Err(anyhow!(
241                    "Binary serialization not implemented for this event type"
242                ))
243            }
244        }
245
246        Ok(buffer)
247    }
248
249    /// Helper to write string to binary buffer
250    fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
251        let bytes = s.as_bytes();
252        buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
253        buffer.extend_from_slice(bytes);
254    }
255
256    /// Helper to write optional string
257    fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
258        match s {
259            Some(s) => {
260                buffer.push(1); // Present
261                self.write_string(buffer, s);
262            }
263            None => {
264                buffer.push(0); // Not present
265            }
266        }
267    }
268
269    /// Helper to write metadata
270    fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
271        // Serialize metadata as JSON for simplicity
272        let metadata_json = serde_json::to_vec(metadata)?;
273        buffer.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
274        buffer.extend_from_slice(&metadata_json);
275        Ok(())
276    }
277
278    /// Deserialize from binary format
279    fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
280        if data.len() < 2 {
281            return Err(anyhow!("Binary data too short"));
282        }
283
284        let version = data[0];
285        if version != 1 {
286            return Err(anyhow!("Unsupported binary format version: {version}"));
287        }
288
289        let event_type = data[1];
290        let mut cursor = std::io::Cursor::new(&data[2..]);
291
292        match event_type {
293            1 => {
294                // TripleAdded
295                let subject = self.read_string(&mut cursor)?;
296                let predicate = self.read_string(&mut cursor)?;
297                let object = self.read_string(&mut cursor)?;
298                let graph = self.read_optional_string(&mut cursor)?;
299                let metadata = self.read_metadata(&mut cursor)?;
300
301                Ok(StreamEvent::TripleAdded {
302                    subject,
303                    predicate,
304                    object,
305                    graph,
306                    metadata,
307                })
308            }
309            // ... implement other event types
310            _ => Err(anyhow!("Unknown event type: {event_type}")),
311        }
312    }
313
314    /// Helper to read string from cursor
315    fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
316        use std::io::Read;
317
318        let mut len_bytes = [0u8; 4];
319        cursor.read_exact(&mut len_bytes)?;
320        let len = u32::from_le_bytes(len_bytes) as usize;
321
322        let mut bytes = vec![0u8; len];
323        cursor.read_exact(&mut bytes)?;
324
325        String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
326    }
327
328    /// Helper to read optional string
329    fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
330        use std::io::Read;
331
332        let mut present = [0u8; 1];
333        cursor.read_exact(&mut present)?;
334
335        if present[0] == 1 {
336            Ok(Some(self.read_string(cursor)?))
337        } else {
338            Ok(None)
339        }
340    }
341
342    /// Helper to read metadata
343    fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
344        use std::io::Read;
345
346        let mut len_bytes = [0u8; 4];
347        cursor.read_exact(&mut len_bytes)?;
348        let len = u32::from_le_bytes(len_bytes) as usize;
349
350        let mut json_bytes = vec![0u8; len];
351        cursor.read_exact(&mut json_bytes)?;
352
353        serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
354    }
355
356    /// Serialize to MessagePack
357    fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
358        rmp_serde::to_vec(event).map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
359    }
360
361    /// Deserialize from MessagePack
362    fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
363        rmp_serde::from_slice(data).map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
364    }
365
366    /// Serialize to CBOR
367    fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
368        let mut buf = Vec::new();
369        ciborium::ser::into_writer(event, &mut buf)
370            .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
371        Ok(buf)
372    }
373
374    /// Deserialize from CBOR
375    fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
376        ciborium::de::from_reader(data).map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
377    }
378
379    /// Serialize to Protocol Buffers
380    fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
381        // Use prost for Protocol Buffers serialization
382        // For now, we'll use a JSON-based approach until proper proto definitions are created
383        let json_data = serde_json::to_value(event)?;
384        let proto_event = ProtobufStreamEvent::from_json(&json_data)?;
385
386        let mut buf = Vec::new();
387        prost::Message::encode(&proto_event, &mut buf)?;
388        Ok(buf)
389    }
390
391    /// Deserialize from Protocol Buffers
392    fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
393        let proto_event = ProtobufStreamEvent::decode(data)?;
394        let json_value = proto_event.to_json()?;
395        let event: StreamEvent = serde_json::from_value(json_value)?;
396        Ok(event)
397    }
398
399    /// Serialize to Apache Avro
400    async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
401        // Get schema from registry if available
402        let schema = if let Some(registry) = &self.schema_registry {
403            registry.get_avro_schema_for_event(event).await?
404        } else {
405            // Use default schema
406            get_default_avro_schema()
407        };
408
409        // Convert event to Avro value
410        let avro_value = to_avro_value(event, &schema)?;
411
412        // Serialize with schema
413        let mut writer = Vec::new();
414        let mut encoder = apache_avro::Writer::new(&schema, &mut writer);
415        encoder.append(avro_value)?;
416        encoder.flush()?;
417
418        // apache-avro 0.21: extract the writer before encoder is dropped
419        let result = encoder.into_inner()?.to_vec();
420        Ok(result)
421    }
422
423    /// Deserialize from Apache Avro
424    async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
425        // Extract schema from data header
426        let reader = apache_avro::Reader::new(data)?;
427        let schema = reader.writer_schema().clone();
428
429        // Read the first (and only) record
430        if let Some(record) = reader.into_iter().next() {
431            let avro_value = record?;
432            let event = from_avro_value(&avro_value, &schema)?;
433            Ok(event)
434        } else {
435            Err(anyhow!("No Avro record found in data"))
436        }
437    }
438
439    /// Compress data
440    fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
441        match compression {
442            CompressionType::None => Ok(data.to_vec()),
443            CompressionType::Gzip => oxiarc_deflate::gzip_compress(data, 6)
444                .map_err(|e| anyhow!("Gzip compression failed: {e}")),
445            CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
446                .map_err(|e| anyhow!("Zstd compression failed: {e}")),
447            CompressionType::Lz4 => {
448                oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))
449            }
450            CompressionType::Snappy => Ok(oxiarc_snappy::compress(data)),
451        }
452    }
453
454    /// Decompress data
455    fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
456        match compression {
457            CompressionType::None => Ok(data.to_vec()),
458            CompressionType::Gzip => oxiarc_deflate::gzip_decompress(data)
459                .map_err(|e| anyhow!("Gzip decompression failed: {e}")),
460            CompressionType::Zstd => {
461                oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
462            }
463            CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
464                .map_err(|e| anyhow!("LZ4 decompression failed: {e}")),
465            CompressionType::Snappy => oxiarc_snappy::decompress(data)
466                .map_err(|e| anyhow!("Snappy decompression failed: {e}")),
467        }
468    }
469}
470
471/// Format converter for converting between serialization formats
472pub struct FormatConverter {
473    source_format: SerializationFormat,
474    target_format: SerializationFormat,
475    #[allow(dead_code)]
476    schema_registry: Option<Arc<SchemaRegistry>>,
477}
478
479impl FormatConverter {
480    /// Create a new format converter
481    pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
482        Self {
483            source_format: source,
484            target_format: target,
485            schema_registry: None,
486        }
487    }
488
489    /// Convert data between formats
490    pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
491        // Deserialize from source format
492        let source_serializer = EventSerializer::new(self.source_format);
493        let event = source_serializer.deserialize(data).await?;
494
495        // Serialize to target format
496        let target_serializer = EventSerializer::new(self.target_format);
497        target_serializer.serialize(&event).await
498    }
499}
500
501/// Streaming serializer for batch processing
502pub struct StreamingSerializer {
503    serializer: EventSerializer,
504    delta_compressor: Option<DeltaCompressor>,
505    batch_size: usize,
506    current_batch: Vec<StreamEvent>,
507}
508
509impl StreamingSerializer {
510    /// Create a new streaming serializer
511    pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
512        Self {
513            serializer,
514            delta_compressor: None,
515            batch_size,
516            current_batch: Vec::new(),
517        }
518    }
519
520    /// Enable delta compression
521    pub fn with_delta_compression(
522        mut self,
523        compression_type: DeltaCompressionType,
524        max_states: usize,
525    ) -> Self {
526        self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
527        self
528    }
529
530    /// Add event to batch
531    pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
532        self.current_batch.push(event);
533
534        if self.current_batch.len() >= self.batch_size {
535            self.flush_batch().await
536        } else {
537            Ok(None)
538        }
539    }
540
541    /// Flush current batch
542    pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
543        if self.current_batch.is_empty() {
544            return Ok(None);
545        }
546
547        let batch = std::mem::take(&mut self.current_batch);
548        let serialized = self.serialize_batch(&batch).await?;
549        Ok(Some(serialized))
550    }
551
552    /// Serialize a batch of events
553    async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
554        let mut buffer = BytesMut::new();
555
556        // Write batch header
557        buffer.put_u32(batch.len() as u32);
558        buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);
559
560        // Serialize each event
561        for event in batch {
562            let event_data = self.serializer.serialize(event).await?;
563            buffer.put_u32(event_data.len() as u32);
564            buffer.put(event_data);
565        }
566
567        Ok(buffer.freeze())
568    }
569
570    /// Deserialize a batch of events
571    pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
572        let mut cursor = std::io::Cursor::new(data);
573        let mut events = Vec::new();
574
575        // Read batch header
576        let batch_size = cursor.get_u32();
577        let _timestamp = cursor.get_u64();
578
579        // Read each event
580        for _ in 0..batch_size {
581            let event_size = cursor.get_u32() as usize;
582            let event_data =
583                &data[cursor.position() as usize..(cursor.position() as usize + event_size)];
584            cursor.advance(event_size);
585
586            let event = self.serializer.deserialize(event_data).await?;
587            events.push(event);
588        }
589
590        Ok(events)
591    }
592
593    /// Create a stream of serialized batches
594    pub fn create_batch_stream(
595        &self,
596        events: impl Stream<Item = StreamEvent> + Send + 'static,
597    ) -> BoxStream<'static, Result<Bytes>> {
598        let serializer = self.serializer.clone();
599        let batch_size = self.batch_size;
600
601        Box::pin(events.chunks(batch_size).then(move |chunk| {
602            let serializer = serializer.clone();
603            async move {
604                let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
605                streaming_serializer.serialize_batch(&chunk).await
606            }
607        }))
608    }
609}
610
611/// Enhanced binary format with streaming support
612pub struct EnhancedBinaryFormat {
613    version: u8,
614    enable_compression: bool,
615    enable_checksums: bool,
616    chunk_size: usize,
617}
618
619impl EnhancedBinaryFormat {
620    /// Create a new enhanced binary format
621    pub fn new() -> Self {
622        Self {
623            version: 2, // Enhanced version
624            enable_compression: true,
625            enable_checksums: true,
626            chunk_size: 8192, // 8KB chunks
627        }
628    }
629
630    /// Configure compression
631    pub fn with_compression(mut self, enable: bool) -> Self {
632        self.enable_compression = enable;
633        self
634    }
635
636    /// Configure checksums
637    pub fn with_checksums(mut self, enable: bool) -> Self {
638        self.enable_checksums = enable;
639        self
640    }
641
642    /// Set chunk size for streaming
643    pub fn with_chunk_size(mut self, size: usize) -> Self {
644        self.chunk_size = size;
645        self
646    }
647
648    /// Serialize event in enhanced binary format
649    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
650        let mut buffer = BytesMut::new();
651
652        // Header
653        buffer.put(&b"BIN2"[..]); // Magic bytes for v2
654        buffer.put_u8(self.version);
655        buffer.put_u8(self.get_flags());
656
657        // Serialize event data
658        let event_json = serde_json::to_vec(event)?;
659
660        // Apply compression if enabled
661        let data = if self.enable_compression {
662            oxiarc_lz4::compress(&event_json)
663                .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
664        } else {
665            event_json
666        };
667
668        // Add checksum if enabled
669        if self.enable_checksums {
670            let checksum = crc32fast::hash(&data);
671            buffer.put_u32(checksum);
672        }
673
674        // Add data length and data
675        buffer.put_u32(data.len() as u32);
676        buffer.put(&data[..]);
677
678        Ok(buffer.freeze())
679    }
680
681    /// Deserialize event from enhanced binary format
682    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
683        let mut cursor = std::io::Cursor::new(data);
684
685        // Check magic bytes
686        let mut magic = [0u8; 4];
687        cursor.read_exact(&mut magic)?;
688        if &magic != b"BIN2" {
689            return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
690        }
691
692        // Read version and flags
693        let version = cursor.get_u8();
694        if version != self.version {
695            return Err(anyhow!(
696                "Unsupported enhanced binary format version: {}",
697                version
698            ));
699        }
700
701        let flags = cursor.get_u8();
702        let has_compression = (flags & 0x01) != 0;
703        let has_checksum = (flags & 0x02) != 0;
704
705        // Read checksum if present
706        let expected_checksum = if has_checksum {
707            Some(cursor.get_u32())
708        } else {
709            None
710        };
711
712        // Read data
713        let data_len = cursor.get_u32() as usize;
714        let mut event_data = vec![0u8; data_len];
715        cursor.read_exact(&mut event_data)?;
716
717        // Verify checksum
718        if let Some(expected) = expected_checksum {
719            let actual = crc32fast::hash(&event_data);
720            if actual != expected {
721                return Err(anyhow!(
722                    "Checksum mismatch: expected {}, got {}",
723                    expected,
724                    actual
725                ));
726            }
727        }
728
729        // Decompress if needed
730        let decompressed = if has_compression {
731            oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
732                .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
733        } else {
734            event_data
735        };
736
737        // Deserialize event
738        let event = serde_json::from_slice(&decompressed)?;
739        Ok(event)
740    }
741
742    /// Create streaming chunks for large events
743    pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
744        let serialized = self.serialize(event).await?;
745        let mut chunks = Vec::new();
746
747        if serialized.len() <= self.chunk_size {
748            chunks.push(serialized);
749        } else {
750            // Split into chunks
751            let chunk_count = (serialized.len() + self.chunk_size - 1) / self.chunk_size;
752
753            for i in 0..chunk_count {
754                let start = i * self.chunk_size;
755                let end = std::cmp::min(start + self.chunk_size, serialized.len());
756
757                let mut chunk_buffer = BytesMut::new();
758                chunk_buffer.put(&b"CHNK"[..]); // Chunk magic
759                chunk_buffer.put_u32(i as u32); // Chunk index
760                chunk_buffer.put_u32(chunk_count as u32); // Total chunks
761                chunk_buffer.put_u32((end - start) as u32); // Chunk size
762                chunk_buffer.put(&serialized[start..end]);
763
764                chunks.push(chunk_buffer.freeze());
765            }
766        }
767
768        Ok(chunks)
769    }
770
771    /// Reassemble streaming chunks
772    pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
773        if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
774            // Single chunk, deserialize directly
775            return self.deserialize(&chunks[0]).await;
776        }
777
778        // Reassemble chunks
779        let mut chunk_data: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
780        let mut total_chunks = 0;
781
782        for chunk in chunks {
783            if !chunk.starts_with(b"CHNK") {
784                return Err(anyhow!("Invalid chunk format"));
785            }
786
787            let mut cursor = std::io::Cursor::new(&chunk[4..]);
788            let chunk_index = cursor.get_u32();
789            let chunk_count = cursor.get_u32();
790            let chunk_size = cursor.get_u32() as usize;
791
792            total_chunks = chunk_count;
793
794            let data = chunk[16..16 + chunk_size].to_vec();
795            chunk_data.insert(chunk_index, data);
796        }
797
798        if chunk_data.len() != total_chunks as usize {
799            return Err(anyhow!(
800                "Missing chunks: got {}, expected {}",
801                chunk_data.len(),
802                total_chunks
803            ));
804        }
805
806        // Reassemble data
807        let mut reassembled = Vec::new();
808        for (_index, data) in chunk_data {
809            reassembled.extend(data);
810        }
811
812        // Deserialize reassembled data
813        self.deserialize(&reassembled).await
814    }
815
816    /// Get format flags
817    fn get_flags(&self) -> u8 {
818        let mut flags = 0u8;
819        if self.enable_compression {
820            flags |= 0x01;
821        }
822        if self.enable_checksums {
823            flags |= 0x02;
824        }
825        flags
826    }
827}
828
829impl Default for EnhancedBinaryFormat {
830    fn default() -> Self {
831        Self::new()
832    }
833}
834
#[cfg(test)]
mod compression_tests {
    use super::*;

    fn serializer() -> EventSerializer {
        EventSerializer::new(SerializationFormat::Json)
    }

    /// Raw-Snappy round-trip via oxiarc_snappy::compress / decompress.
    #[test]
    fn test_raw_snappy_round_trip() {
        let ser = serializer();
        let data = b"serialization encoder raw snappy payload ".repeat(48);
        let compressed = ser
            .compress(&data, &CompressionType::Snappy)
            .expect("snappy compress");
        let restored = ser
            .decompress(&compressed, &CompressionType::Snappy)
            .expect("snappy decompress");
        assert_eq!(restored, data, "raw snappy round-trip mismatch");
    }

    /// Raw-Snappy must round-trip an incompressible/random buffer.
    #[test]
    fn test_raw_snappy_round_trip_random() {
        use scirs2_core::random::Random;
        use scirs2_core::RngExt;
        let mut rng = Random::default();
        let data: Vec<u8> = (0..2048).map(|_| rng.random()).collect();

        let ser = serializer();
        let compressed = ser
            .compress(&data, &CompressionType::Snappy)
            .expect("snappy compress random");
        let restored = ser
            .decompress(&compressed, &CompressionType::Snappy)
            .expect("snappy decompress random");
        assert_eq!(restored, data, "raw snappy random round-trip mismatch");
    }

    /// Gzip round-trip via oxiarc_deflate::gzip_compress / gzip_decompress.
    #[test]
    fn test_gzip_round_trip() {
        let ser = serializer();
        let data = b"serialization encoder gzip payload ".repeat(40);
        let compressed = ser
            .compress(&data, &CompressionType::Gzip)
            .expect("gzip compress");
        let restored = ser
            .decompress(&compressed, &CompressionType::Gzip)
            .expect("gzip decompress");
        assert_eq!(restored, data, "gzip round-trip mismatch");
    }

    /// Zstd round-trip via oxiarc_zstd::encode_all / decode_all.
    #[test]
    fn test_zstd_round_trip() {
        let ser = serializer();
        let data = b"serialization encoder zstd payload ".repeat(40);
        let compressed = ser
            .compress(&data, &CompressionType::Zstd)
            .expect("zstd compress");
        let restored = ser
            .decompress(&compressed, &CompressionType::Zstd)
            .expect("zstd decompress");
        assert_eq!(restored, data, "zstd round-trip mismatch");
    }

    /// LZ4 round-trip via oxiarc_lz4::compress / decompress.
    #[test]
    fn test_lz4_round_trip() {
        let ser = serializer();
        let data = b"serialization encoder lz4 payload ".repeat(40);
        let compressed = ser
            .compress(&data, &CompressionType::Lz4)
            .expect("lz4 compress");
        let restored = ser
            .decompress(&compressed, &CompressionType::Lz4)
            .expect("lz4 decompress");
        assert_eq!(restored, data, "lz4 round-trip mismatch");
    }

    /// `CompressionType::None` must pass data through unchanged in both directions.
    #[test]
    fn test_none_is_passthrough() {
        let ser = serializer();
        let data = b"serialization encoder passthrough payload".to_vec();
        let compressed = ser
            .compress(&data, &CompressionType::None)
            .expect("none compress");
        assert_eq!(compressed, data, "CompressionType::None must not alter data");
        let restored = ser
            .decompress(&compressed, &CompressionType::None)
            .expect("none decompress");
        assert_eq!(restored, data, "none round-trip mismatch");
    }
}