// oxirs_stream/serialization.rs
1//! # Event Serialization Module
2//!
3//! This module provides comprehensive serialization support for stream events with:
4//! - Multiple format support (JSON, Protobuf, Avro, Binary)
5//! - Schema evolution and versioning
6//! - Compression integration
7//! - Format auto-detection
8//! - Schema registry integration
9
10use anyhow::{anyhow, Result};
11use bytes::{Buf, BufMut, Bytes, BytesMut};
12use chrono::{DateTime, Utc};
13use crc32fast;
14use futures::stream::{BoxStream, StreamExt as _};
15use serde::{Deserialize, Serialize};
16use std::collections::{BTreeMap, HashMap};
17use std::io::Read as _;
18use std::sync::Arc;
19use tokio::sync::RwLock;
20use tokio_stream::Stream;
21
22use crate::{CompressionType, EventMetadata, StreamEvent};
23
/// Serialization format types
///
/// Each format has a fixed 4-byte magic prefix (see [`SerializationFormat::magic_bytes`])
/// used for wire identification and auto-detection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializationFormat {
    /// JSON format (human-readable)
    Json,
    /// Protocol Buffers (efficient binary)
    Protobuf,
    /// Apache Avro (schema-based)
    Avro,
    /// Custom binary format
    Binary,
    /// MessagePack format
    MessagePack,
    /// CBOR (Concise Binary Object Representation)
    Cbor,
}
40
41impl SerializationFormat {
42    /// Get format identifier bytes
43    pub fn magic_bytes(&self) -> &[u8] {
44        match self {
45            SerializationFormat::Json => b"JSON",
46            SerializationFormat::Protobuf => b"PB03",
47            SerializationFormat::Avro => b"Obj\x01",
48            SerializationFormat::Binary => b"BIN1",
49            SerializationFormat::MessagePack => b"MSGP",
50            SerializationFormat::Cbor => b"CBOR",
51        }
52    }
53
54    /// Detect format from magic bytes
55    pub fn detect(data: &[u8]) -> Option<Self> {
56        if data.len() < 4 {
57            return None;
58        }
59
60        let magic = &data[0..4];
61        match magic {
62            b"JSON" => Some(SerializationFormat::Json),
63            b"PB03" => Some(SerializationFormat::Protobuf),
64            b"Obj\x01" => Some(SerializationFormat::Avro),
65            b"BIN1" => Some(SerializationFormat::Binary),
66            b"MSGP" => Some(SerializationFormat::MessagePack),
67            b"CBOR" => Some(SerializationFormat::Cbor),
68            _ => {
69                // Try to detect JSON by checking for common patterns
70                if data.starts_with(b"{") || data.starts_with(b"[") {
71                    Some(SerializationFormat::Json)
72                } else {
73                    None
74                }
75            }
76        }
77    }
78}
79
/// Event serializer with format support
///
/// Combines a wire format, optional compression, an optional schema registry
/// (used to stamp a schema ID into the payload header), and tunable options.
#[derive(Clone)]
pub struct EventSerializer {
    // Wire format used by serialize/deserialize.
    format: SerializationFormat,
    // When set, the encoded payload is compressed/decompressed with this codec.
    compression: Option<CompressionType>,
    // When set (and options allow), a 4-byte schema ID is written after the magic bytes.
    schema_registry: Option<Arc<SchemaRegistry>>,
    options: SerializerOptions,
}
88
/// Serializer options
///
/// Controls the optional header fields and safety limits applied by
/// [`EventSerializer`]. The serializer and deserializer must agree on these
/// settings for the header-skipping logic to line up.
#[derive(Debug, Clone)]
pub struct SerializerOptions {
    /// Include schema ID in serialized data
    pub include_schema_id: bool,
    /// Include format magic bytes
    pub include_magic_bytes: bool,
    /// Pretty print JSON
    pub pretty_json: bool,
    /// Validate against schema
    // NOTE(review): not consulted by any code visible in this file — confirm intended use.
    pub validate_schema: bool,
    /// Maximum serialized size
    /// (applies to the encoded, possibly compressed payload; `None` = unlimited)
    pub max_size: Option<usize>,
}
103
104impl Default for SerializerOptions {
105    fn default() -> Self {
106        Self {
107            include_schema_id: true,
108            include_magic_bytes: true,
109            pretty_json: false,
110            validate_schema: true,
111            max_size: Some(1024 * 1024), // 1MB default
112        }
113    }
114}
115
/// Schema registry for managing schemas
///
/// Thread-safe in-memory store of [`Schema`]s keyed by schema ID.
pub struct SchemaRegistry {
    // Schema store; async RwLock allows concurrent readers.
    schemas: Arc<RwLock<HashMap<String, Schema>>>,
    /// Schema evolution rules
    // NOTE(review): stored but not consulted by the compatibility checks
    // visible in this file — confirm intended use.
    evolution_rules: EvolutionRules,
}
122
/// Schema definition
///
/// A versioned schema tied to one serialization format, with a declared
/// compatibility mode used by [`SchemaRegistry::validate_evolution`].
#[derive(Debug, Clone)]
pub struct Schema {
    pub id: String,
    pub version: u32,
    pub format: SerializationFormat,
    pub definition: SchemaDefinition,
    pub compatibility: CompatibilityMode,
}
132
/// Schema definition types
///
/// The concrete schema payload, one variant per supported schema language.
#[derive(Debug, Clone)]
pub enum SchemaDefinition {
    /// JSON Schema
    JsonSchema(serde_json::Value),
    /// Protobuf descriptor (serialized FileDescriptorSet bytes — TODO confirm)
    ProtobufDescriptor(Vec<u8>),
    /// Avro schema (JSON schema text)
    AvroSchema(String),
    /// Custom schema
    Custom(HashMap<String, serde_json::Value>),
}
145
/// Schema compatibility modes
///
/// Mirrors the usual schema-registry semantics: which direction(s) of
/// read-compatibility must hold when a schema evolves.
#[derive(Debug, Clone, Copy)]
pub enum CompatibilityMode {
    /// No compatibility checking
    None,
    /// Can read previous version
    Backward,
    /// Can read next version
    Forward,
    /// Can read both previous and next
    Full,
}
158
/// Schema evolution rules
///
/// Policy knobs for what kinds of schema changes are permitted.
#[derive(Debug, Clone)]
pub struct EvolutionRules {
    /// Allow field addition
    pub allow_field_addition: bool,
    /// Allow field removal
    pub allow_field_removal: bool,
    /// Allow type promotion (e.g. widening numeric types — TODO confirm exact semantics)
    pub allow_type_promotion: bool,
    /// Required fields (must be present in every schema version)
    pub required_fields: Vec<String>,
}
171
172impl Default for EvolutionRules {
173    fn default() -> Self {
174        Self {
175            allow_field_addition: true,
176            allow_field_removal: false,
177            allow_type_promotion: true,
178            required_fields: vec!["event_id".to_string(), "timestamp".to_string()],
179        }
180    }
181}
182
183impl EventSerializer {
184    /// Create a new event serializer
185    pub fn new(format: SerializationFormat) -> Self {
186        Self {
187            format,
188            compression: None,
189            schema_registry: None,
190            options: SerializerOptions::default(),
191        }
192    }
193
194    /// Set compression type
195    pub fn with_compression(mut self, compression: CompressionType) -> Self {
196        self.compression = Some(compression);
197        self
198    }
199
200    /// Set schema registry
201    pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
202        self.schema_registry = Some(registry);
203        self
204    }
205
206    /// Set serializer options
207    pub fn with_options(mut self, options: SerializerOptions) -> Self {
208        self.options = options;
209        self
210    }
211
    /// Serialize a stream event into a self-describing byte payload.
    ///
    /// Wire layout (each header part optional, per `options`):
    /// `[4-byte magic][4-byte schema ID][(compressed) format-encoded event]`.
    ///
    /// # Errors
    /// Fails if format encoding or compression fails, or if the encoded
    /// payload exceeds `options.max_size`.
    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
        let mut buffer = BytesMut::new();

        // Add magic bytes if enabled
        if self.options.include_magic_bytes {
            buffer.put(self.format.magic_bytes());
        }

        // Add schema ID if enabled and registry is available
        if self.options.include_schema_id {
            if let Some(registry) = &self.schema_registry {
                let schema_id = registry.get_schema_id_for_event(event).await?;
                // NOTE(review): non-numeric schema IDs (e.g. "default-v1", the
                // registry's current return value) silently serialize as 0
                // here — confirm this is intended.
                buffer.put_u32(schema_id.parse::<u32>().unwrap_or(0));
            }
        }

        // Serialize based on format
        let serialized = match self.format {
            SerializationFormat::Json => self.serialize_json(event)?,
            SerializationFormat::Binary => self.serialize_binary(event)?,
            SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
            SerializationFormat::Cbor => self.serialize_cbor(event)?,
            SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
            SerializationFormat::Avro => self.serialize_avro(event).await?,
        };

        // Apply compression if enabled
        let data = if let Some(compression) = &self.compression {
            self.compress(&serialized, compression)?
        } else {
            serialized
        };

        // Check size limit. The limit applies to the (possibly compressed)
        // event payload only, not the magic/schema-ID header.
        if let Some(max_size) = self.options.max_size {
            if data.len() > max_size {
                return Err(anyhow!(
                    "Serialized data exceeds maximum size: {} > {max_size}",
                    data.len()
                ));
            }
        }

        buffer.put(&data[..]);
        Ok(buffer.freeze())
    }
259
260    /// Deserialize a stream event
261    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
262        let mut cursor = std::io::Cursor::new(data);
263        let mut offset = 0;
264
265        // Skip magic bytes if present
266        if self.options.include_magic_bytes && data.len() >= 4 {
267            let magic = &data[0..4];
268            if magic == self.format.magic_bytes() {
269                offset += 4;
270                cursor.set_position(4);
271            }
272        }
273
274        // Skip schema ID if present
275        if self.options.include_schema_id
276            && self.schema_registry.is_some()
277            && data.len() >= offset + 4
278        {
279            offset += 4;
280            cursor.set_position(offset as u64);
281        }
282
283        // Get remaining data
284        let event_data = &data[offset..];
285
286        // Decompress if needed
287        let decompressed = if let Some(compression) = &self.compression {
288            self.decompress(event_data, compression)?
289        } else {
290            event_data.to_vec()
291        };
292
293        // Deserialize based on format
294        match self.format {
295            SerializationFormat::Json => self.deserialize_json(&decompressed),
296            SerializationFormat::Binary => self.deserialize_binary(&decompressed),
297            SerializationFormat::MessagePack => self.deserialize_messagepack(&decompressed),
298            SerializationFormat::Cbor => self.deserialize_cbor(&decompressed),
299            SerializationFormat::Protobuf => self.deserialize_protobuf(&decompressed),
300            SerializationFormat::Avro => self.deserialize_avro(&decompressed).await,
301        }
302    }
303
304    /// Serialize to JSON
305    fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
306        if self.options.pretty_json {
307            serde_json::to_vec_pretty(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
308        } else {
309            serde_json::to_vec(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
310        }
311    }
312
313    /// Deserialize from JSON
314    fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
315        serde_json::from_slice(data).map_err(|e| anyhow!("JSON deserialization failed: {e}"))
316    }
317
    /// Serialize to binary format
    ///
    /// Custom wire layout: `[version: u8][event-type tag: u8][fields...]`,
    /// where strings are u32-LE length-prefixed and metadata is a
    /// length-prefixed JSON blob (see the `write_*` helpers).
    ///
    /// Tag values are NOT contiguous (1-16 were assigned first, 17+ added
    /// later); the numbering must never be reshuffled or old payloads become
    /// unreadable. Only `TripleAdded` currently has a field encoding — every
    /// other variant returns an error after the tag is computed.
    fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        // Custom binary format implementation
        let mut buffer = Vec::new();

        // Write version
        buffer.push(1); // Version 1

        // Write event type (stable wire tags — do not renumber)
        let event_type = match event {
            StreamEvent::TripleAdded { .. } => 1,
            StreamEvent::TripleRemoved { .. } => 2,
            StreamEvent::QuadAdded { .. } => 3,
            StreamEvent::QuadRemoved { .. } => 4,
            StreamEvent::GraphCreated { .. } => 5,
            StreamEvent::GraphCleared { .. } => 6,
            StreamEvent::GraphDeleted { .. } => 7,
            StreamEvent::GraphMetadataUpdated { .. } => 17,
            StreamEvent::GraphPermissionsChanged { .. } => 18,
            StreamEvent::GraphStatisticsUpdated { .. } => 19,
            StreamEvent::GraphRenamed { .. } => 20,
            StreamEvent::GraphMerged { .. } => 21,
            StreamEvent::GraphSplit { .. } => 22,
            StreamEvent::SparqlUpdate { .. } => 8,
            StreamEvent::TransactionBegin { .. } => 9,
            StreamEvent::TransactionCommit { .. } => 10,
            StreamEvent::TransactionAbort { .. } => 11,
            StreamEvent::SchemaChanged { .. } => 12,
            StreamEvent::SchemaDefinitionAdded { .. } => 23,
            StreamEvent::SchemaDefinitionRemoved { .. } => 24,
            StreamEvent::SchemaDefinitionModified { .. } => 25,
            StreamEvent::OntologyImported { .. } => 26,
            StreamEvent::OntologyRemoved { .. } => 27,
            StreamEvent::ConstraintAdded { .. } => 28,
            StreamEvent::ConstraintRemoved { .. } => 29,
            StreamEvent::ConstraintViolated { .. } => 30,
            StreamEvent::IndexCreated { .. } => 31,
            StreamEvent::IndexDropped { .. } => 32,
            StreamEvent::IndexRebuilt { .. } => 33,
            StreamEvent::ShapeAdded { .. } => 34,
            StreamEvent::ShapeRemoved { .. } => 35,
            StreamEvent::ShapeModified { .. } => 36,
            StreamEvent::ShapeValidationStarted { .. } => 37,
            StreamEvent::ShapeValidationCompleted { .. } => 38,
            StreamEvent::ShapeViolationDetected { .. } => 39,
            StreamEvent::QueryResultAdded { .. } => 14,
            StreamEvent::QueryResultRemoved { .. } => 15,
            StreamEvent::QueryCompleted { .. } => 16,
            StreamEvent::SchemaUpdated { .. } => 40,
            StreamEvent::ShapeUpdated { .. } => 41,
            StreamEvent::Heartbeat { .. } => 13,
            StreamEvent::ErrorOccurred { .. } => 42,
        };
        buffer.push(event_type);

        // Serialize fields based on event type
        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => {
                self.write_string(&mut buffer, subject);
                self.write_string(&mut buffer, predicate);
                self.write_string(&mut buffer, object);
                self.write_optional_string(&mut buffer, graph.as_deref());
                self.write_metadata(&mut buffer, metadata)?;
            }
            // ... implement other event types similarly
            _ => {
                return Err(anyhow!(
                    "Binary serialization not implemented for this event type"
                ))
            }
        }

        Ok(buffer)
    }
398
399    /// Helper to write string to binary buffer
400    fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
401        let bytes = s.as_bytes();
402        buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
403        buffer.extend_from_slice(bytes);
404    }
405
406    /// Helper to write optional string
407    fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
408        match s {
409            Some(s) => {
410                buffer.push(1); // Present
411                self.write_string(buffer, s);
412            }
413            None => {
414                buffer.push(0); // Not present
415            }
416        }
417    }
418
419    /// Helper to write metadata
420    fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
421        // Serialize metadata as JSON for simplicity
422        let metadata_json = serde_json::to_vec(metadata)?;
423        buffer.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
424        buffer.extend_from_slice(&metadata_json);
425        Ok(())
426    }
427
    /// Deserialize from binary format
    ///
    /// Parses the `[version][tag][fields...]` layout written by
    /// [`serialize_binary`]. Only version 1 and the `TripleAdded` tag (1)
    /// are currently supported; everything else is an error.
    fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
        // Need at least the version and event-type bytes.
        if data.len() < 2 {
            return Err(anyhow!("Binary data too short"));
        }

        let version = data[0];
        if version != 1 {
            return Err(anyhow!("Unsupported binary format version: {version}"));
        }

        let event_type = data[1];
        // Cursor over the field payload (everything after version + tag).
        let mut cursor = std::io::Cursor::new(&data[2..]);

        match event_type {
            1 => {
                // TripleAdded: fields in the exact order serialize_binary wrote them.
                let subject = self.read_string(&mut cursor)?;
                let predicate = self.read_string(&mut cursor)?;
                let object = self.read_string(&mut cursor)?;
                let graph = self.read_optional_string(&mut cursor)?;
                let metadata = self.read_metadata(&mut cursor)?;

                Ok(StreamEvent::TripleAdded {
                    subject,
                    predicate,
                    object,
                    graph,
                    metadata,
                })
            }
            // ... implement other event types
            _ => Err(anyhow!("Unknown event type: {event_type}")),
        }
    }
463
464    /// Helper to read string from cursor
465    fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
466        use std::io::Read;
467
468        let mut len_bytes = [0u8; 4];
469        cursor.read_exact(&mut len_bytes)?;
470        let len = u32::from_le_bytes(len_bytes) as usize;
471
472        let mut bytes = vec![0u8; len];
473        cursor.read_exact(&mut bytes)?;
474
475        String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
476    }
477
478    /// Helper to read optional string
479    fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
480        use std::io::Read;
481
482        let mut present = [0u8; 1];
483        cursor.read_exact(&mut present)?;
484
485        if present[0] == 1 {
486            Ok(Some(self.read_string(cursor)?))
487        } else {
488            Ok(None)
489        }
490    }
491
492    /// Helper to read metadata
493    fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
494        use std::io::Read;
495
496        let mut len_bytes = [0u8; 4];
497        cursor.read_exact(&mut len_bytes)?;
498        let len = u32::from_le_bytes(len_bytes) as usize;
499
500        let mut json_bytes = vec![0u8; len];
501        cursor.read_exact(&mut json_bytes)?;
502
503        serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
504    }
505
506    /// Serialize to MessagePack
507    fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
508        rmp_serde::to_vec(event).map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
509    }
510
511    /// Deserialize from MessagePack
512    fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
513        rmp_serde::from_slice(data).map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
514    }
515
516    /// Serialize to CBOR
517    fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
518        let mut buf = Vec::new();
519        ciborium::ser::into_writer(event, &mut buf)
520            .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
521        Ok(buf)
522    }
523
524    /// Deserialize from CBOR
525    fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
526        ciborium::de::from_reader(data).map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
527    }
528
    /// Serialize to Protocol Buffers
    ///
    /// NOTE(review): this currently round-trips through serde_json and the
    /// simplified `ProtobufStreamEvent` wrapper rather than generated proto
    /// types, so the wire bytes are effectively JSON framed by prost — not a
    /// real protobuf schema. Confirm before relying on cross-language decoding.
    fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        // Use prost for Protocol Buffers serialization
        // For now, we'll use a JSON-based approach until proper proto definitions are created
        let json_data = serde_json::to_value(event)?;
        let proto_event = ProtobufStreamEvent::from_json(&json_data)?;

        let mut buf = Vec::new();
        prost::Message::encode(&proto_event, &mut buf)?;
        Ok(buf)
    }
540
541    /// Deserialize from Protocol Buffers
542    fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
543        let proto_event = ProtobufStreamEvent::decode(data)?;
544        let json_value = proto_event.to_json()?;
545        let event: StreamEvent = serde_json::from_value(json_value)?;
546        Ok(event)
547    }
548
    /// Serialize to Apache Avro
    ///
    /// Uses the registry-provided schema when available, otherwise the
    /// module's default schema. The event is appended as a single record in
    /// an Avro object-container stream (schema embedded in the header, so
    /// the deserializer can recover it).
    async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        // Get schema from registry if available
        let schema = if let Some(registry) = &self.schema_registry {
            registry.get_avro_schema_for_event(event).await?
        } else {
            // Use default schema
            get_default_avro_schema()
        };

        // Convert event to Avro value
        let avro_value = to_avro_value(event, &schema)?;

        // Serialize with schema
        let mut writer = Vec::new();
        let mut encoder = apache_avro::Writer::new(&schema, &mut writer);
        encoder.append(avro_value)?;
        encoder.flush()?;

        // apache-avro 0.21: extract the writer before encoder is dropped
        // (into_inner returns the underlying &mut Vec; to_vec copies it out
        // so the borrow on `writer` can end).
        let result = encoder.into_inner()?.to_vec();
        Ok(result)
    }
572
    /// Deserialize from Apache Avro
    ///
    /// Reads an Avro object-container stream: the writer schema is recovered
    /// from the container header, then the first record is decoded back into
    /// a `StreamEvent`. Exactly one record is expected per payload.
    async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
        // Extract schema from data header
        let reader = apache_avro::Reader::new(data)?;
        let schema = reader.writer_schema().clone();

        // Read the first (and only) record
        if let Some(record) = reader.into_iter().next() {
            let avro_value = record?;
            let event = from_avro_value(&avro_value, &schema)?;
            Ok(event)
        } else {
            Err(anyhow!("No Avro record found in data"))
        }
    }
588
589    /// Compress data
590    fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
591        use flate2::write::GzEncoder;
592        use std::io::Write;
593
594        match compression {
595            CompressionType::None => Ok(data.to_vec()),
596            CompressionType::Gzip => {
597                let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
598                encoder.write_all(data)?;
599                encoder
600                    .finish()
601                    .map_err(|e| anyhow!("Gzip compression failed: {e}"))
602            }
603            CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
604                .map_err(|e| anyhow!("Zstd compression failed: {e}")),
605            _ => Err(anyhow!("Compression type {compression:?} not implemented")),
606        }
607    }
608
609    /// Decompress data
610    fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
611        use flate2::read::GzDecoder;
612        use std::io::Read;
613
614        match compression {
615            CompressionType::None => Ok(data.to_vec()),
616            CompressionType::Gzip => {
617                let mut decoder = GzDecoder::new(data);
618                let mut decompressed = Vec::new();
619                decoder.read_to_end(&mut decompressed)?;
620                Ok(decompressed)
621            }
622            CompressionType::Zstd => {
623                oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
624            }
625            _ => Err(anyhow!(
626                "Decompression type {compression:?} not implemented"
627            )),
628        }
629    }
630}
631
632impl SchemaRegistry {
633    /// Create a new schema registry
634    pub fn new(evolution_rules: EvolutionRules) -> Self {
635        Self {
636            schemas: Arc::new(RwLock::new(HashMap::new())),
637            evolution_rules,
638        }
639    }
640
641    /// Register a schema
642    pub async fn register_schema(&self, schema: Schema) -> Result<String> {
643        let schema_id = schema.id.clone();
644        self.schemas.write().await.insert(schema_id.clone(), schema);
645        Ok(schema_id)
646    }
647
648    /// Get schema by ID
649    pub async fn get_schema(&self, id: &str) -> Result<Schema> {
650        self.schemas
651            .read()
652            .await
653            .get(id)
654            .cloned()
655            .ok_or_else(|| anyhow!("Schema {id} not found"))
656    }
657
    /// Get schema ID for an event
    ///
    /// Stub: always returns `"default-v1"` regardless of event type.
    /// NOTE(review): `EventSerializer::serialize` parses this ID as a u32 and
    /// falls back to 0 when it isn't numeric — confirm the intended ID scheme.
    pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
        // In a real implementation, this would determine the appropriate schema
        Ok("default-v1".to_string())
    }
663
664    /// Validate schema evolution
665    pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
666        match old_schema.compatibility {
667            CompatibilityMode::None => Ok(()),
668            CompatibilityMode::Backward => {
669                // Check if new schema can read old data
670                self.check_backward_compatibility(old_schema, new_schema)
671            }
672            CompatibilityMode::Forward => {
673                // Check if old schema can read new data
674                self.check_forward_compatibility(old_schema, new_schema)
675            }
676            CompatibilityMode::Full => {
677                // Check both directions
678                self.check_backward_compatibility(old_schema, new_schema)?;
679                self.check_forward_compatibility(old_schema, new_schema)
680            }
681        }
682    }
683
    /// Check backward compatibility
    ///
    /// Stub: always succeeds. A real implementation would verify the new
    /// schema can read data written with the old schema (e.g. using
    /// `self.evolution_rules`).
    fn check_backward_compatibility(
        &self,
        _old_schema: &Schema,
        _new_schema: &Schema,
    ) -> Result<()> {
        // Implementation would check if new schema can read old data
        Ok(())
    }
693
    /// Check forward compatibility
    ///
    /// Stub: always succeeds. A real implementation would verify the old
    /// schema can read data written with the new schema.
    fn check_forward_compatibility(
        &self,
        _old_schema: &Schema,
        _new_schema: &Schema,
    ) -> Result<()> {
        // Implementation would check if old schema can read new data
        Ok(())
    }
703}
704
/// Format converter for converting between serialization formats
///
/// Decodes a payload in the source format and re-encodes it in the target
/// format via the common `StreamEvent` representation.
pub struct FormatConverter {
    source_format: SerializationFormat,
    target_format: SerializationFormat,
    // NOTE(review): never set or read by `convert` — confirm intended use.
    schema_registry: Option<Arc<SchemaRegistry>>,
}
711
712impl FormatConverter {
713    /// Create a new format converter
714    pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
715        Self {
716            source_format: source,
717            target_format: target,
718            schema_registry: None,
719        }
720    }
721
722    /// Convert data between formats
723    pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
724        // Deserialize from source format
725        let source_serializer = EventSerializer::new(self.source_format);
726        let event = source_serializer.deserialize(data).await?;
727
728        // Serialize to target format
729        let target_serializer = EventSerializer::new(self.target_format);
730        target_serializer.serialize(&event).await
731    }
732}
733
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamEvent;

    // Round-trip a heartbeat event through JSON.
    #[tokio::test]
    async fn test_json_serialization() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer = EventSerializer::new(SerializationFormat::Json);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }

    // Both detection paths: JSON heuristic (leading '{') and magic bytes.
    #[tokio::test]
    async fn test_format_detection() {
        let json_data = b"{\"test\": \"data\"}";
        assert_eq!(
            SerializationFormat::detect(json_data),
            Some(SerializationFormat::Json)
        );

        let magic_data = b"PB03some_data";
        assert_eq!(
            SerializationFormat::detect(magic_data),
            Some(SerializationFormat::Protobuf)
        );
    }

    // Gzip round-trip: compress on serialize, decompress on deserialize.
    #[tokio::test]
    async fn test_compression() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer =
            EventSerializer::new(SerializationFormat::Json).with_compression(CompressionType::Gzip);

        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }

    // Round-trip a TripleAdded event through MessagePack.
    #[tokio::test]
    async fn test_messagepack_serialization() {
        let metadata = EventMetadata::default();
        let event = StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata,
        };

        let serializer = EventSerializer::new(SerializationFormat::MessagePack);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                ..
            } => {
                assert_eq!(subject, "http://example.org/subject");
                assert_eq!(predicate, "http://example.org/predicate");
                assert_eq!(object, "http://example.org/object");
            }
            _ => panic!("Wrong event type"),
        }
    }

    // JSON -> MessagePack conversion through FormatConverter.
    #[tokio::test]
    async fn test_format_conversion() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        // Serialize to JSON
        let json_serializer = EventSerializer::new(SerializationFormat::Json);
        let json_data = json_serializer.serialize(&event).await.unwrap();

        // Convert to MessagePack
        let converter =
            FormatConverter::new(SerializationFormat::Json, SerializationFormat::MessagePack);
        let msgpack_data = converter.convert(&json_data).await.unwrap();

        // Verify by deserializing
        let msgpack_serializer = EventSerializer::new(SerializationFormat::MessagePack);
        let deserialized = msgpack_serializer.deserialize(&msgpack_data).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }
}
855
856// Supporting types and functions for Protobuf and Avro serialization
857
/// Protobuf representation of StreamEvent
/// This is a simplified version - in practice you'd use proper .proto definitions
#[derive(Debug, Clone)]
pub struct ProtobufStreamEvent {
    // Always "StreamEvent" in the current simplified implementation.
    pub event_type: String,
    // JSON-encoded event payload (the actual wire content).
    pub data: Vec<u8>,
    // Currently always empty.
    pub metadata: Vec<u8>,
}
866
impl ProtobufStreamEvent {
    /// Convert from JSON value
    ///
    /// Wraps the whole JSON document as the `data` payload; `event_type` is a
    /// fixed placeholder and `metadata` is left empty.
    pub fn from_json(json: &serde_json::Value) -> Result<Self> {
        // Extract event type
        let event_type = "StreamEvent".to_string(); // Simplified

        // Serialize the entire JSON as data
        let data = serde_json::to_vec(json)?;

        // Empty metadata for now
        let metadata = Vec::new();

        Ok(Self {
            event_type,
            data,
            metadata,
        })
    }

    /// Convert to JSON value
    ///
    /// Parses `data` back into the JSON document stored by `from_json`.
    pub fn to_json(&self) -> Result<serde_json::Value> {
        serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
    }

    /// Encode using prost
    ///
    /// NOTE(review): writes raw `data` bytes with no protobuf field framing —
    /// not real protobuf wire format. Confirm before interop use.
    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
        // Simplified encoding - in practice use proper prost::Message
        buf.extend_from_slice(&self.data);
        Ok(())
    }

    /// Decode using prost
    ///
    /// NOTE(review): inherent method — it shadows `prost::Message::decode` at
    /// call sites like `ProtobufStreamEvent::decode(data)` and simply treats
    /// the whole input as the JSON payload.
    pub fn decode(data: &[u8]) -> Result<Self> {
        // Simplified decoding - in practice use proper prost::Message
        Ok(Self {
            event_type: "StreamEvent".to_string(),
            data: data.to_vec(),
            metadata: Vec::new(),
        })
    }
}
908
909impl prost::Message for ProtobufStreamEvent {
910    fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
911        // Simplified implementation
912        buf.put_slice(&self.data);
913    }
914
915    fn merge_field(
916        &mut self,
917        _tag: u32,
918        _wire_type: prost::encoding::WireType,
919        _buf: &mut impl prost::bytes::Buf,
920        _ctx: prost::encoding::DecodeContext,
921    ) -> Result<(), prost::DecodeError> {
922        Ok(())
923    }
924
925    fn encoded_len(&self) -> usize {
926        self.data.len()
927    }
928
929    fn clear(&mut self) {
930        self.data.clear();
931        self.metadata.clear();
932    }
933}
934
935/// Get default Avro schema for StreamEvent
936pub fn get_default_avro_schema() -> apache_avro::Schema {
937    let schema_str = r#"
938    {
939        "type": "record",
940        "name": "StreamEvent",
941        "fields": [
942            {"name": "event_type", "type": "string"},
943            {"name": "data", "type": "bytes"},
944            {"name": "metadata", "type": ["null", "bytes"], "default": null}
945        ]
946    }
947    "#;
948
949    apache_avro::Schema::parse_str(schema_str).expect("Failed to parse default Avro schema")
950}
951
952/// Convert StreamEvent to Avro value
953pub fn to_avro_value(
954    event: &StreamEvent,
955    _schema: &apache_avro::Schema,
956) -> Result<apache_avro::types::Value> {
957    // Simplified conversion - serialize to JSON then to bytes
958    let json_data = serde_json::to_vec(event)?;
959
960    let fields = vec![
961        (
962            "event_type".to_string(),
963            apache_avro::types::Value::String("StreamEvent".to_string()),
964        ),
965        (
966            "data".to_string(),
967            apache_avro::types::Value::Bytes(json_data),
968        ),
969        (
970            "metadata".to_string(),
971            apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)),
972        ),
973    ];
974
975    Ok(apache_avro::types::Value::Record(fields))
976}
977
978/// Convert Avro value to StreamEvent
979pub fn from_avro_value(
980    value: &apache_avro::types::Value,
981    _schema: &apache_avro::Schema,
982) -> Result<StreamEvent> {
983    match value {
984        apache_avro::types::Value::Record(fields) => {
985            // Extract data field
986            for (name, field_value) in fields {
987                if name == "data" {
988                    if let apache_avro::types::Value::Bytes(bytes) = field_value {
989                        let event: StreamEvent = serde_json::from_slice(bytes)?;
990                        return Ok(event);
991                    }
992                }
993            }
994            Err(anyhow!("No data field found in Avro record"))
995        }
996        _ => Err(anyhow!("Expected Avro record, got {:?}", value)),
997    }
998}
999
impl SchemaRegistry {
    /// Get Avro schema for event
    ///
    /// Currently ignores the event and always returns the default schema.
    pub async fn get_avro_schema_for_event(
        &self,
        _event: &StreamEvent,
    ) -> Result<apache_avro::Schema> {
        // In practice, this would look up the appropriate schema
        Ok(get_default_avro_schema())
    }
}
1010
/// Delta compression support for event streams
///
/// Keeps the last seen event per event id and encodes subsequent events as
/// compact deltas (XOR, prefix, dictionary, or LZ4-based) against that state.
pub struct DeltaCompressor {
    /// Previous event states for delta calculation, keyed by event id
    previous_states: Arc<RwLock<HashMap<String, StreamEvent>>>,
    /// Compression algorithm to use
    compression_type: DeltaCompressionType,
    /// Maximum states to keep in memory
    max_states: usize,
}
1020
/// Delta compression algorithms
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DeltaCompressionType {
    /// XOR-based delta compression (falls back to a full event when the two
    /// JSON encodings differ in length)
    Xor,
    /// Prefix compression for strings (field-level JSON diff)
    Prefix,
    /// Dictionary-based compression (strings shared between consecutive
    /// events are replaced by numeric ids)
    Dictionary,
    /// LZ4-based delta compression
    Lz4Delta,
}
1033
1034impl DeltaCompressor {
1035    /// Create a new delta compressor
1036    pub fn new(compression_type: DeltaCompressionType, max_states: usize) -> Self {
1037        Self {
1038            previous_states: Arc::new(RwLock::new(HashMap::new())),
1039            compression_type,
1040            max_states,
1041        }
1042    }
1043
1044    /// Compress event using delta compression
1045    pub async fn compress_delta(
1046        &self,
1047        event: &StreamEvent,
1048        event_id: &str,
1049    ) -> Result<DeltaCompressedEvent> {
1050        let mut states = self.previous_states.write().await;
1051
1052        // Clean up old states if we exceed the limit
1053        if states.len() >= self.max_states {
1054            let keys_to_remove: Vec<String> = states
1055                .keys()
1056                .take(states.len() - self.max_states + 1)
1057                .cloned()
1058                .collect();
1059            for key in keys_to_remove {
1060                states.remove(&key);
1061            }
1062        }
1063
1064        let delta = if let Some(previous) = states.get(event_id) {
1065            self.calculate_delta(previous, event)?
1066        } else {
1067            // First event, store as full event
1068            EventDelta::Full(Box::new(event.clone()))
1069        };
1070
1071        // Update state
1072        states.insert(event_id.to_string(), event.clone());
1073
1074        Ok(DeltaCompressedEvent {
1075            event_id: event_id.to_string(),
1076            delta,
1077            compression_type: self.compression_type,
1078            timestamp: chrono::Utc::now(),
1079        })
1080    }
1081
1082    /// Calculate delta between two events
1083    fn calculate_delta(&self, previous: &StreamEvent, current: &StreamEvent) -> Result<EventDelta> {
1084        match self.compression_type {
1085            DeltaCompressionType::Xor => self.calculate_xor_delta(previous, current),
1086            DeltaCompressionType::Prefix => self.calculate_prefix_delta(previous, current),
1087            DeltaCompressionType::Dictionary => self.calculate_dictionary_delta(previous, current),
1088            DeltaCompressionType::Lz4Delta => self.calculate_lz4_delta(previous, current),
1089        }
1090    }
1091
1092    /// XOR-based delta compression
1093    fn calculate_xor_delta(
1094        &self,
1095        previous: &StreamEvent,
1096        current: &StreamEvent,
1097    ) -> Result<EventDelta> {
1098        let prev_bytes = serde_json::to_vec(previous)?;
1099        let curr_bytes = serde_json::to_vec(current)?;
1100
1101        if prev_bytes.len() != curr_bytes.len() {
1102            // If sizes differ, store as full event
1103            return Ok(EventDelta::Full(Box::new(current.clone())));
1104        }
1105
1106        let xor_bytes: Vec<u8> = prev_bytes
1107            .iter()
1108            .zip(curr_bytes.iter())
1109            .map(|(a, b)| a ^ b)
1110            .collect();
1111
1112        Ok(EventDelta::Xor(xor_bytes))
1113    }
1114
1115    /// Prefix compression for string fields
1116    fn calculate_prefix_delta(
1117        &self,
1118        previous: &StreamEvent,
1119        current: &StreamEvent,
1120    ) -> Result<EventDelta> {
1121        let prev_json = serde_json::to_value(previous)?;
1122        let curr_json = serde_json::to_value(current)?;
1123
1124        let diff = self.calculate_json_prefix_diff(&prev_json, &curr_json)?;
1125        Ok(EventDelta::Prefix(diff))
1126    }
1127
1128    /// Dictionary-based compression
1129    fn calculate_dictionary_delta(
1130        &self,
1131        previous: &StreamEvent,
1132        current: &StreamEvent,
1133    ) -> Result<EventDelta> {
1134        let prev_strings = self.extract_strings_from_event(previous);
1135        let curr_strings = self.extract_strings_from_event(current);
1136
1137        let mut dictionary = HashMap::new();
1138        let mut dict_id = 0u16;
1139
1140        // Build dictionary from common strings
1141        for string in &prev_strings {
1142            if curr_strings.contains(string) && !dictionary.contains_key(string) {
1143                dictionary.insert(string.clone(), dict_id);
1144                dict_id += 1;
1145            }
1146        }
1147
1148        // Replace strings with dictionary IDs
1149        let compressed_event = self.replace_strings_with_ids(current, &dictionary)?;
1150
1151        Ok(EventDelta::Dictionary {
1152            dictionary,
1153            compressed_event,
1154        })
1155    }
1156
1157    /// LZ4-based delta compression
1158    fn calculate_lz4_delta(
1159        &self,
1160        previous: &StreamEvent,
1161        current: &StreamEvent,
1162    ) -> Result<EventDelta> {
1163        let prev_bytes = serde_json::to_vec(previous)?;
1164        let curr_bytes = serde_json::to_vec(current)?;
1165
1166        // Simple delta: store additions and removals
1167        let diff_bytes = self.calculate_byte_diff(&prev_bytes, &curr_bytes);
1168        let compressed = oxiarc_lz4::compress(&diff_bytes)
1169            .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;
1170
1171        Ok(EventDelta::Lz4(compressed))
1172    }
1173
1174    /// Calculate JSON prefix differences
1175    fn calculate_json_prefix_diff(
1176        &self,
1177        prev: &serde_json::Value,
1178        curr: &serde_json::Value,
1179    ) -> Result<serde_json::Value> {
1180        match (prev, curr) {
1181            (serde_json::Value::Object(prev_obj), serde_json::Value::Object(curr_obj)) => {
1182                let mut diff = serde_json::Map::new();
1183                for (key, curr_val) in curr_obj {
1184                    if let Some(prev_val) = prev_obj.get(key) {
1185                        if prev_val != curr_val {
1186                            diff.insert(key.clone(), curr_val.clone());
1187                        }
1188                    } else {
1189                        diff.insert(key.clone(), curr_val.clone());
1190                    }
1191                }
1192                Ok(serde_json::Value::Object(diff))
1193            }
1194            _ => Ok(curr.clone()),
1195        }
1196    }
1197
1198    /// Extract all strings from an event
1199    fn extract_strings_from_event(&self, event: &StreamEvent) -> Vec<String> {
1200        let mut strings = Vec::new();
1201        if let Ok(json) = serde_json::to_value(event) {
1202            Self::extract_strings_from_json(&json, &mut strings);
1203        }
1204        strings
1205    }
1206
1207    /// Recursively extract strings from JSON value
1208    fn extract_strings_from_json(value: &serde_json::Value, strings: &mut Vec<String>) {
1209        match value {
1210            serde_json::Value::String(s) => strings.push(s.clone()),
1211            serde_json::Value::Array(arr) => {
1212                for item in arr {
1213                    Self::extract_strings_from_json(item, strings);
1214                }
1215            }
1216            serde_json::Value::Object(obj) => {
1217                for (_, val) in obj {
1218                    Self::extract_strings_from_json(val, strings);
1219                }
1220            }
1221            _ => {}
1222        }
1223    }
1224
1225    /// Replace strings with dictionary IDs
1226    fn replace_strings_with_ids(
1227        &self,
1228        event: &StreamEvent,
1229        dictionary: &HashMap<String, u16>,
1230    ) -> Result<serde_json::Value> {
1231        let mut json = serde_json::to_value(event)?;
1232        Self::replace_strings_in_json(&mut json, dictionary);
1233        Ok(json)
1234    }
1235
1236    /// Recursively replace strings in JSON
1237    fn replace_strings_in_json(value: &mut serde_json::Value, dictionary: &HashMap<String, u16>) {
1238        match value {
1239            serde_json::Value::String(s) => {
1240                if let Some(&id) = dictionary.get(s) {
1241                    *value = serde_json::Value::Number(serde_json::Number::from(id));
1242                }
1243            }
1244            serde_json::Value::Array(arr) => {
1245                for item in arr {
1246                    Self::replace_strings_in_json(item, dictionary);
1247                }
1248            }
1249            serde_json::Value::Object(obj) => {
1250                for val in obj.values_mut() {
1251                    Self::replace_strings_in_json(val, dictionary);
1252                }
1253            }
1254            _ => {}
1255        }
1256    }
1257
1258    /// Calculate byte-level differences
1259    fn calculate_byte_diff(&self, prev: &[u8], curr: &[u8]) -> Vec<u8> {
1260        // Simple implementation - could be enhanced with more sophisticated diff algorithms
1261        let mut diff = Vec::new();
1262
1263        // Store length difference
1264        diff.extend_from_slice(&(curr.len() as u32).to_le_bytes());
1265        diff.extend_from_slice(&(prev.len() as u32).to_le_bytes());
1266
1267        // Store the current bytes (simplified)
1268        diff.extend_from_slice(curr);
1269
1270        diff
1271    }
1272
1273    /// Decompress delta-compressed event
1274    pub async fn decompress_delta(
1275        &self,
1276        compressed: &DeltaCompressedEvent,
1277        previous_event: Option<&StreamEvent>,
1278    ) -> Result<StreamEvent> {
1279        match &compressed.delta {
1280            EventDelta::Full(event) => Ok((**event).clone()),
1281            EventDelta::Xor(xor_bytes) => {
1282                if let Some(prev) = previous_event {
1283                    let prev_bytes = serde_json::to_vec(prev)?;
1284                    if prev_bytes.len() == xor_bytes.len() {
1285                        let restored_bytes: Vec<u8> = prev_bytes
1286                            .iter()
1287                            .zip(xor_bytes.iter())
1288                            .map(|(a, b)| a ^ b)
1289                            .collect();
1290                        let event = serde_json::from_slice(&restored_bytes)?;
1291                        Ok(event)
1292                    } else {
1293                        Err(anyhow!("XOR delta length mismatch"))
1294                    }
1295                } else {
1296                    Err(anyhow!("Previous event required for XOR decompression"))
1297                }
1298            }
1299            EventDelta::Prefix(diff) => {
1300                if let Some(prev) = previous_event {
1301                    let mut prev_json = serde_json::to_value(prev)?;
1302                    self.apply_json_diff(&mut prev_json, diff)?;
1303                    let event = serde_json::from_value(prev_json)?;
1304                    Ok(event)
1305                } else {
1306                    Err(anyhow!("Previous event required for prefix decompression"))
1307                }
1308            }
1309            EventDelta::Dictionary {
1310                dictionary,
1311                compressed_event,
1312            } => {
1313                let mut restored_json = compressed_event.clone();
1314                let reverse_dict: HashMap<u16, String> =
1315                    dictionary.iter().map(|(k, &v)| (v, k.clone())).collect();
1316                Self::restore_strings_from_ids(&mut restored_json, &reverse_dict);
1317                let event = serde_json::from_value(restored_json)?;
1318                Ok(event)
1319            }
1320            EventDelta::Lz4(compressed_bytes) => {
1321                let decompressed = oxiarc_lz4::decompress(compressed_bytes, 100 * 1024 * 1024)
1322                    .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?;
1323                // Restore from diff (simplified - would need more sophisticated restoration)
1324                let event = serde_json::from_slice(&decompressed)?;
1325                Ok(event)
1326            }
1327        }
1328    }
1329
1330    /// Apply JSON diff to base JSON
1331    fn apply_json_diff(
1332        &self,
1333        base: &mut serde_json::Value,
1334        diff: &serde_json::Value,
1335    ) -> Result<()> {
1336        if let (Some(base_obj), Some(diff_obj)) = (base.as_object_mut(), diff.as_object()) {
1337            for (key, diff_val) in diff_obj {
1338                base_obj.insert(key.clone(), diff_val.clone());
1339            }
1340        } else {
1341            *base = diff.clone();
1342        }
1343        Ok(())
1344    }
1345
1346    /// Restore strings from dictionary IDs
1347    fn restore_strings_from_ids(
1348        value: &mut serde_json::Value,
1349        reverse_dict: &HashMap<u16, String>,
1350    ) {
1351        match value {
1352            serde_json::Value::Number(n) => {
1353                if let Some(id) = n.as_u64() {
1354                    if let Some(string) = reverse_dict.get(&(id as u16)) {
1355                        *value = serde_json::Value::String(string.clone());
1356                    }
1357                }
1358            }
1359            serde_json::Value::Array(arr) => {
1360                for item in arr {
1361                    Self::restore_strings_from_ids(item, reverse_dict);
1362                }
1363            }
1364            serde_json::Value::Object(obj) => {
1365                for val in obj.values_mut() {
1366                    Self::restore_strings_from_ids(val, reverse_dict);
1367                }
1368            }
1369            _ => {}
1370        }
1371    }
1372}
1373
/// Delta-compressed event representation
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaCompressedEvent {
    /// Identifier tying this delta to its per-id state chain
    pub event_id: String,
    /// The delta payload (or a full event when no delta was possible)
    pub delta: EventDelta,
    /// Algorithm that produced `delta`
    pub compression_type: DeltaCompressionType,
    /// When this compressed event was produced
    pub timestamp: DateTime<Utc>,
}
1382
/// Event delta representations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventDelta {
    /// Full event (no compression possible)
    Full(Box<StreamEvent>),
    /// XOR-based delta (byte-wise XOR of two equal-length JSON encodings)
    Xor(Vec<u8>),
    /// Prefix-based delta (JSON object holding only changed/new fields)
    Prefix(serde_json::Value),
    /// Dictionary-based compression
    Dictionary {
        /// Maps original strings to their numeric substitution ids
        dictionary: HashMap<String, u16>,
        /// JSON form of the event with dictionary strings replaced by ids
        compressed_event: serde_json::Value,
    },
    /// LZ4 compressed delta
    Lz4(Vec<u8>),
}
1400
/// Streaming serializer for batch processing
pub struct StreamingSerializer {
    /// Underlying per-event serializer
    serializer: EventSerializer,
    /// Optional delta compressor
    /// NOTE(review): stored but not consumed by the visible batch
    /// serialization path — confirm intended wiring.
    delta_compressor: Option<DeltaCompressor>,
    /// Number of events per emitted batch
    batch_size: usize,
    /// Events buffered until the batch size is reached
    current_batch: Vec<StreamEvent>,
}
1408
1409impl StreamingSerializer {
1410    /// Create a new streaming serializer
1411    pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
1412        Self {
1413            serializer,
1414            delta_compressor: None,
1415            batch_size,
1416            current_batch: Vec::new(),
1417        }
1418    }
1419
1420    /// Enable delta compression
1421    pub fn with_delta_compression(
1422        mut self,
1423        compression_type: DeltaCompressionType,
1424        max_states: usize,
1425    ) -> Self {
1426        self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
1427        self
1428    }
1429
1430    /// Add event to batch
1431    pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
1432        self.current_batch.push(event);
1433
1434        if self.current_batch.len() >= self.batch_size {
1435            self.flush_batch().await
1436        } else {
1437            Ok(None)
1438        }
1439    }
1440
1441    /// Flush current batch
1442    pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
1443        if self.current_batch.is_empty() {
1444            return Ok(None);
1445        }
1446
1447        let batch = std::mem::take(&mut self.current_batch);
1448        let serialized = self.serialize_batch(&batch).await?;
1449        Ok(Some(serialized))
1450    }
1451
1452    /// Serialize a batch of events
1453    async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
1454        let mut buffer = BytesMut::new();
1455
1456        // Write batch header
1457        buffer.put_u32(batch.len() as u32);
1458        buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);
1459
1460        // Serialize each event
1461        for event in batch {
1462            let event_data = self.serializer.serialize(event).await?;
1463            buffer.put_u32(event_data.len() as u32);
1464            buffer.put(event_data);
1465        }
1466
1467        Ok(buffer.freeze())
1468    }
1469
1470    /// Deserialize a batch of events
1471    pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
1472        let mut cursor = std::io::Cursor::new(data);
1473        let mut events = Vec::new();
1474
1475        // Read batch header
1476        let batch_size = cursor.get_u32();
1477        let _timestamp = cursor.get_u64();
1478
1479        // Read each event
1480        for _ in 0..batch_size {
1481            let event_size = cursor.get_u32() as usize;
1482            let event_data =
1483                &data[cursor.position() as usize..(cursor.position() as usize + event_size)];
1484            cursor.advance(event_size);
1485
1486            let event = self.serializer.deserialize(event_data).await?;
1487            events.push(event);
1488        }
1489
1490        Ok(events)
1491    }
1492
1493    /// Create a stream of serialized batches
1494    pub fn create_batch_stream(
1495        &self,
1496        events: impl Stream<Item = StreamEvent> + Send + 'static,
1497    ) -> BoxStream<'static, Result<Bytes>> {
1498        let serializer = self.serializer.clone();
1499        let batch_size = self.batch_size;
1500
1501        Box::pin(events.chunks(batch_size).then(move |chunk| {
1502            let serializer = serializer.clone();
1503            async move {
1504                let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
1505                streaming_serializer.serialize_batch(&chunk).await
1506            }
1507        }))
1508    }
1509}
1510
/// Enhanced binary format with streaming support
pub struct EnhancedBinaryFormat {
    /// Format version byte written into the header (2 for this format)
    version: u8,
    /// Whether payloads are LZ4-compressed before framing
    enable_compression: bool,
    /// Whether a CRC32 checksum is written and verified
    enable_checksums: bool,
    /// Maximum chunk size (bytes) for streaming serialization
    chunk_size: usize,
}
1518
1519impl EnhancedBinaryFormat {
1520    /// Create a new enhanced binary format
1521    pub fn new() -> Self {
1522        Self {
1523            version: 2, // Enhanced version
1524            enable_compression: true,
1525            enable_checksums: true,
1526            chunk_size: 8192, // 8KB chunks
1527        }
1528    }
1529
1530    /// Configure compression
1531    pub fn with_compression(mut self, enable: bool) -> Self {
1532        self.enable_compression = enable;
1533        self
1534    }
1535
1536    /// Configure checksums
1537    pub fn with_checksums(mut self, enable: bool) -> Self {
1538        self.enable_checksums = enable;
1539        self
1540    }
1541
1542    /// Set chunk size for streaming
1543    pub fn with_chunk_size(mut self, size: usize) -> Self {
1544        self.chunk_size = size;
1545        self
1546    }
1547
1548    /// Serialize event in enhanced binary format
1549    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
1550        let mut buffer = BytesMut::new();
1551
1552        // Header
1553        buffer.put(&b"BIN2"[..]); // Magic bytes for v2
1554        buffer.put_u8(self.version);
1555        buffer.put_u8(self.get_flags());
1556
1557        // Serialize event data
1558        let event_json = serde_json::to_vec(event)?;
1559
1560        // Apply compression if enabled
1561        let data = if self.enable_compression {
1562            oxiarc_lz4::compress(&event_json)
1563                .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
1564        } else {
1565            event_json
1566        };
1567
1568        // Add checksum if enabled
1569        if self.enable_checksums {
1570            let checksum = crc32fast::hash(&data);
1571            buffer.put_u32(checksum);
1572        }
1573
1574        // Add data length and data
1575        buffer.put_u32(data.len() as u32);
1576        buffer.put(&data[..]);
1577
1578        Ok(buffer.freeze())
1579    }
1580
1581    /// Deserialize event from enhanced binary format
1582    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
1583        let mut cursor = std::io::Cursor::new(data);
1584
1585        // Check magic bytes
1586        let mut magic = [0u8; 4];
1587        cursor.read_exact(&mut magic)?;
1588        if &magic != b"BIN2" {
1589            return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
1590        }
1591
1592        // Read version and flags
1593        let version = cursor.get_u8();
1594        if version != self.version {
1595            return Err(anyhow!(
1596                "Unsupported enhanced binary format version: {}",
1597                version
1598            ));
1599        }
1600
1601        let flags = cursor.get_u8();
1602        let has_compression = (flags & 0x01) != 0;
1603        let has_checksum = (flags & 0x02) != 0;
1604
1605        // Read checksum if present
1606        let expected_checksum = if has_checksum {
1607            Some(cursor.get_u32())
1608        } else {
1609            None
1610        };
1611
1612        // Read data
1613        let data_len = cursor.get_u32() as usize;
1614        let mut event_data = vec![0u8; data_len];
1615        cursor.read_exact(&mut event_data)?;
1616
1617        // Verify checksum
1618        if let Some(expected) = expected_checksum {
1619            let actual = crc32fast::hash(&event_data);
1620            if actual != expected {
1621                return Err(anyhow!(
1622                    "Checksum mismatch: expected {}, got {}",
1623                    expected,
1624                    actual
1625                ));
1626            }
1627        }
1628
1629        // Decompress if needed
1630        let decompressed = if has_compression {
1631            oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
1632                .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
1633        } else {
1634            event_data
1635        };
1636
1637        // Deserialize event
1638        let event = serde_json::from_slice(&decompressed)?;
1639        Ok(event)
1640    }
1641
1642    /// Create streaming chunks for large events
1643    pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
1644        let serialized = self.serialize(event).await?;
1645        let mut chunks = Vec::new();
1646
1647        if serialized.len() <= self.chunk_size {
1648            chunks.push(serialized);
1649        } else {
1650            // Split into chunks
1651            let chunk_count = (serialized.len() + self.chunk_size - 1) / self.chunk_size;
1652
1653            for i in 0..chunk_count {
1654                let start = i * self.chunk_size;
1655                let end = std::cmp::min(start + self.chunk_size, serialized.len());
1656
1657                let mut chunk_buffer = BytesMut::new();
1658                chunk_buffer.put(&b"CHNK"[..]); // Chunk magic
1659                chunk_buffer.put_u32(i as u32); // Chunk index
1660                chunk_buffer.put_u32(chunk_count as u32); // Total chunks
1661                chunk_buffer.put_u32((end - start) as u32); // Chunk size
1662                chunk_buffer.put(&serialized[start..end]);
1663
1664                chunks.push(chunk_buffer.freeze());
1665            }
1666        }
1667
1668        Ok(chunks)
1669    }
1670
1671    /// Reassemble streaming chunks
1672    pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
1673        if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
1674            // Single chunk, deserialize directly
1675            return self.deserialize(&chunks[0]).await;
1676        }
1677
1678        // Reassemble chunks
1679        let mut chunk_data: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
1680        let mut total_chunks = 0;
1681
1682        for chunk in chunks {
1683            if !chunk.starts_with(b"CHNK") {
1684                return Err(anyhow!("Invalid chunk format"));
1685            }
1686
1687            let mut cursor = std::io::Cursor::new(&chunk[4..]);
1688            let chunk_index = cursor.get_u32();
1689            let chunk_count = cursor.get_u32();
1690            let chunk_size = cursor.get_u32() as usize;
1691
1692            total_chunks = chunk_count;
1693
1694            let data = chunk[16..16 + chunk_size].to_vec();
1695            chunk_data.insert(chunk_index, data);
1696        }
1697
1698        if chunk_data.len() != total_chunks as usize {
1699            return Err(anyhow!(
1700                "Missing chunks: got {}, expected {}",
1701                chunk_data.len(),
1702                total_chunks
1703            ));
1704        }
1705
1706        // Reassemble data
1707        let mut reassembled = Vec::new();
1708        for (_index, data) in chunk_data {
1709            reassembled.extend(data);
1710        }
1711
1712        // Deserialize reassembled data
1713        self.deserialize(&reassembled).await
1714    }
1715
1716    /// Get format flags
1717    fn get_flags(&self) -> u8 {
1718        let mut flags = 0u8;
1719        if self.enable_compression {
1720            flags |= 0x01;
1721        }
1722        if self.enable_checksums {
1723            flags |= 0x02;
1724        }
1725        flags
1726    }
1727}
1728
impl Default for EnhancedBinaryFormat {
    /// Equivalent to [`EnhancedBinaryFormat::new`].
    fn default() -> Self {
        Self::new()
    }
}
1734
1735// Required imports are now at the top of the file