use anyhow::{anyhow, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt as _};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::io::Read as _;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_stream::Stream;

use crate::{CompressionType, EventMetadata, StreamEvent};

/// Wire formats supported for serialized stream events.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializationFormat {
    Json,
    Protobuf,
    Avro,
    Binary,
    MessagePack,
    Cbor,
}

impl SerializationFormat {
    /// Returns the four-byte magic prefix used to tag this format on the wire.
    pub fn magic_bytes(&self) -> &[u8] {
        match self {
            SerializationFormat::Json => b"JSON",
            SerializationFormat::Protobuf => b"PB03",
            SerializationFormat::Avro => b"Obj\x01",
            SerializationFormat::Binary => b"BIN1",
            SerializationFormat::MessagePack => b"MSGP",
            SerializationFormat::Cbor => b"CBOR",
        }
    }

    /// Detects the format from a payload's magic bytes, falling back to a
    /// heuristic for unprefixed JSON documents.
    pub fn detect(data: &[u8]) -> Option<Self> {
        if data.len() < 4 {
            return None;
        }

        match &data[0..4] {
            b"JSON" => Some(SerializationFormat::Json),
            b"PB03" => Some(SerializationFormat::Protobuf),
            b"Obj\x01" => Some(SerializationFormat::Avro),
            b"BIN1" => Some(SerializationFormat::Binary),
            b"MSGP" => Some(SerializationFormat::MessagePack),
            b"CBOR" => Some(SerializationFormat::Cbor),
            _ => {
                // Unprefixed JSON payloads start with an object or array.
                if data.starts_with(b"{") || data.starts_with(b"[") {
                    Some(SerializationFormat::Json)
                } else {
                    None
                }
            }
        }
    }
}

/// Serializes and deserializes [`StreamEvent`]s in a configurable wire format,
/// with optional compression and schema-registry integration.
#[derive(Clone)]
pub struct EventSerializer {
    format: SerializationFormat,
    compression: Option<CompressionType>,
    schema_registry: Option<Arc<SchemaRegistry>>,
    options: SerializerOptions,
}

#[derive(Debug, Clone)]
pub struct SerializerOptions {
    pub include_schema_id: bool,
    pub include_magic_bytes: bool,
    pub pretty_json: bool,
    pub validate_schema: bool,
    pub max_size: Option<usize>,
}

impl Default for SerializerOptions {
    fn default() -> Self {
        Self {
            include_schema_id: true,
            include_magic_bytes: true,
            pretty_json: false,
            validate_schema: true,
            max_size: Some(1024 * 1024), // 1 MiB cap on serialized events
        }
    }
}

/// In-memory registry of event schemas and the rules governing their evolution.
pub struct SchemaRegistry {
    schemas: Arc<RwLock<HashMap<String, Schema>>>,
    evolution_rules: EvolutionRules,
}

#[derive(Debug, Clone)]
pub struct Schema {
    pub id: String,
    pub version: u32,
    pub format: SerializationFormat,
    pub definition: SchemaDefinition,
    pub compatibility: CompatibilityMode,
}

#[derive(Debug, Clone)]
pub enum SchemaDefinition {
    JsonSchema(serde_json::Value),
    ProtobufDescriptor(Vec<u8>),
    AvroSchema(String),
    Custom(HashMap<String, serde_json::Value>),
}

#[derive(Debug, Clone, Copy)]
pub enum CompatibilityMode {
    None,
    Backward,
    Forward,
    Full,
}

#[derive(Debug, Clone)]
pub struct EvolutionRules {
    pub allow_field_addition: bool,
    pub allow_field_removal: bool,
    pub allow_type_promotion: bool,
    pub required_fields: Vec<String>,
}

impl Default for EvolutionRules {
    fn default() -> Self {
        Self {
            allow_field_addition: true,
            allow_field_removal: false,
            allow_type_promotion: true,
            required_fields: vec!["event_id".to_string(), "timestamp".to_string()],
        }
    }
}

impl EventSerializer {
    pub fn new(format: SerializationFormat) -> Self {
        Self {
            format,
            compression: None,
            schema_registry: None,
            options: SerializerOptions::default(),
        }
    }

    pub fn with_compression(mut self, compression: CompressionType) -> Self {
        self.compression = Some(compression);
        self
    }

    pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
        self.schema_registry = Some(registry);
        self
    }

    pub fn with_options(mut self, options: SerializerOptions) -> Self {
        self.options = options;
        self
    }

    /// Serializes an event as `[magic bytes][schema id][payload]`; the first
    /// two sections are optional and the payload may be compressed.
    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
        let mut buffer = BytesMut::new();

        if self.options.include_magic_bytes {
            buffer.put(self.format.magic_bytes());
        }

        if self.options.include_schema_id {
            if let Some(registry) = &self.schema_registry {
                let schema_id = registry.get_schema_id_for_event(event).await?;
                buffer.put_u32(schema_id.parse::<u32>().unwrap_or(0));
            }
        }

        let serialized = match self.format {
            SerializationFormat::Json => self.serialize_json(event)?,
            SerializationFormat::Binary => self.serialize_binary(event)?,
            SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
            SerializationFormat::Cbor => self.serialize_cbor(event)?,
            SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
            SerializationFormat::Avro => self.serialize_avro(event).await?,
        };

        let data = if let Some(compression) = &self.compression {
            self.compress(&serialized, compression)?
        } else {
            serialized
        };

        if let Some(max_size) = self.options.max_size {
            if data.len() > max_size {
                return Err(anyhow!(
                    "Serialized data exceeds maximum size: {} > {max_size}",
                    data.len()
                ));
            }
        }

        buffer.put(&data[..]);
        Ok(buffer.freeze())
    }

    /// Deserializes an event, skipping the magic-byte and schema-id prefixes
    /// that `serialize` may have written.
    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
        let mut offset = 0;

        if self.options.include_magic_bytes
            && data.len() >= 4
            && &data[0..4] == self.format.magic_bytes()
        {
            offset += 4;
        }

        if self.options.include_schema_id
            && self.schema_registry.is_some()
            && data.len() >= offset + 4
        {
            offset += 4;
        }

        let event_data = &data[offset..];

        let decompressed = if let Some(compression) = &self.compression {
            self.decompress(event_data, compression)?
        } else {
            event_data.to_vec()
        };

        match self.format {
            SerializationFormat::Json => self.deserialize_json(&decompressed),
            SerializationFormat::Binary => self.deserialize_binary(&decompressed),
            SerializationFormat::MessagePack => self.deserialize_messagepack(&decompressed),
            SerializationFormat::Cbor => self.deserialize_cbor(&decompressed),
            SerializationFormat::Protobuf => self.deserialize_protobuf(&decompressed),
            SerializationFormat::Avro => self.deserialize_avro(&decompressed).await,
        }
    }

    fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        if self.options.pretty_json {
            serde_json::to_vec_pretty(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
        } else {
            serde_json::to_vec(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
        }
    }

    fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
        serde_json::from_slice(data).map_err(|e| anyhow!("JSON deserialization failed: {e}"))
    }

    /// Hand-rolled binary encoding: a one-byte format version, a one-byte
    /// event-type tag, then length-prefixed fields.
    fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        let mut buffer = Vec::new();

        buffer.push(1); // format version

        let event_type = match event {
            StreamEvent::TripleAdded { .. } => 1,
            StreamEvent::TripleRemoved { .. } => 2,
            StreamEvent::QuadAdded { .. } => 3,
            StreamEvent::QuadRemoved { .. } => 4,
            StreamEvent::GraphCreated { .. } => 5,
            StreamEvent::GraphCleared { .. } => 6,
            StreamEvent::GraphDeleted { .. } => 7,
            StreamEvent::GraphMetadataUpdated { .. } => 17,
            StreamEvent::GraphPermissionsChanged { .. } => 18,
            StreamEvent::GraphStatisticsUpdated { .. } => 19,
            StreamEvent::GraphRenamed { .. } => 20,
            StreamEvent::GraphMerged { .. } => 21,
            StreamEvent::GraphSplit { .. } => 22,
            StreamEvent::SparqlUpdate { .. } => 8,
            StreamEvent::TransactionBegin { .. } => 9,
            StreamEvent::TransactionCommit { .. } => 10,
            StreamEvent::TransactionAbort { .. } => 11,
            StreamEvent::SchemaChanged { .. } => 12,
            StreamEvent::SchemaDefinitionAdded { .. } => 23,
            StreamEvent::SchemaDefinitionRemoved { .. } => 24,
            StreamEvent::SchemaDefinitionModified { .. } => 25,
            StreamEvent::OntologyImported { .. } => 26,
            StreamEvent::OntologyRemoved { .. } => 27,
            StreamEvent::ConstraintAdded { .. } => 28,
            StreamEvent::ConstraintRemoved { .. } => 29,
            StreamEvent::ConstraintViolated { .. } => 30,
            StreamEvent::IndexCreated { .. } => 31,
            StreamEvent::IndexDropped { .. } => 32,
            StreamEvent::IndexRebuilt { .. } => 33,
            StreamEvent::ShapeAdded { .. } => 34,
            StreamEvent::ShapeRemoved { .. } => 35,
            StreamEvent::ShapeModified { .. } => 36,
            StreamEvent::ShapeValidationStarted { .. } => 37,
            StreamEvent::ShapeValidationCompleted { .. } => 38,
            StreamEvent::ShapeViolationDetected { .. } => 39,
            StreamEvent::QueryResultAdded { .. } => 14,
            StreamEvent::QueryResultRemoved { .. } => 15,
            StreamEvent::QueryCompleted { .. } => 16,
            StreamEvent::SchemaUpdated { .. } => 40,
            StreamEvent::ShapeUpdated { .. } => 41,
            StreamEvent::Heartbeat { .. } => 13,
            StreamEvent::ErrorOccurred { .. } => 42,
        };
        buffer.push(event_type);

        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => {
                self.write_string(&mut buffer, subject);
                self.write_string(&mut buffer, predicate);
                self.write_string(&mut buffer, object);
                self.write_optional_string(&mut buffer, graph.as_deref());
                self.write_metadata(&mut buffer, metadata)?;
            }
            _ => {
                return Err(anyhow!(
                    "Binary serialization not implemented for this event type"
                ))
            }
        }

        Ok(buffer)
    }

    fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
        let bytes = s.as_bytes();
        buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
        buffer.extend_from_slice(bytes);
    }

    fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
        match s {
            Some(s) => {
                buffer.push(1); // presence flag
                self.write_string(buffer, s);
            }
            None => {
                buffer.push(0); // absent
            }
        }
    }

    fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
        let metadata_json = serde_json::to_vec(metadata)?;
        buffer.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
        buffer.extend_from_slice(&metadata_json);
        Ok(())
    }

    fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
        if data.len() < 2 {
            return Err(anyhow!("Binary data too short"));
        }

        let version = data[0];
        if version != 1 {
            return Err(anyhow!("Unsupported binary format version: {version}"));
        }

        let event_type = data[1];
        let mut cursor = std::io::Cursor::new(&data[2..]);

        match event_type {
            1 => {
                let subject = self.read_string(&mut cursor)?;
                let predicate = self.read_string(&mut cursor)?;
                let object = self.read_string(&mut cursor)?;
                let graph = self.read_optional_string(&mut cursor)?;
                let metadata = self.read_metadata(&mut cursor)?;

                Ok(StreamEvent::TripleAdded {
                    subject,
                    predicate,
                    object,
                    graph,
                    metadata,
                })
            }
            _ => Err(anyhow!("Unknown event type: {event_type}")),
        }
    }

    fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
        let mut len_bytes = [0u8; 4];
        cursor.read_exact(&mut len_bytes)?;
        let len = u32::from_le_bytes(len_bytes) as usize;

        let mut bytes = vec![0u8; len];
        cursor.read_exact(&mut bytes)?;

        String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
    }

    fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
        let mut present = [0u8; 1];
        cursor.read_exact(&mut present)?;

        if present[0] == 1 {
            Ok(Some(self.read_string(cursor)?))
        } else {
            Ok(None)
        }
    }

    fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
        let mut len_bytes = [0u8; 4];
        cursor.read_exact(&mut len_bytes)?;
        let len = u32::from_le_bytes(len_bytes) as usize;

        let mut json_bytes = vec![0u8; len];
        cursor.read_exact(&mut json_bytes)?;

        serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
    }

    fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        rmp_serde::to_vec(event).map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
    }

    fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
        rmp_serde::from_slice(data).map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
    }

    fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        let mut buf = Vec::new();
        ciborium::ser::into_writer(event, &mut buf)
            .map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
        Ok(buf)
    }

    fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
        ciborium::de::from_reader(data).map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
    }

    /// Protobuf support rides on a simplified envelope message that carries
    /// the event as JSON bytes; see [`ProtobufStreamEvent`].
    fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        let json_data = serde_json::to_value(event)?;
        let proto_event = ProtobufStreamEvent::from_json(&json_data)?;

        let mut buf = Vec::new();
        prost::Message::encode(&proto_event, &mut buf)?;
        Ok(buf)
    }

    fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
        let proto_event = ProtobufStreamEvent::decode(data)?;
        let json_value = proto_event.to_json()?;
        let event: StreamEvent = serde_json::from_value(json_value)?;
        Ok(event)
    }

    async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        let schema = if let Some(registry) = &self.schema_registry {
            registry.get_avro_schema_for_event(event).await?
        } else {
            get_default_avro_schema()
        };

        let avro_value = to_avro_value(event, &schema)?;

        let mut writer = Vec::new();
        let mut encoder = apache_avro::Writer::new(&schema, &mut writer);
        encoder.append(avro_value)?;
        encoder.flush()?;

        let result = encoder.into_inner()?.to_vec();
        Ok(result)
    }

    async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
        let reader = apache_avro::Reader::new(data)?;
        let schema = reader.writer_schema().clone();

        if let Some(record) = reader.into_iter().next() {
            let avro_value = record?;
            let event = from_avro_value(&avro_value, &schema)?;
            Ok(event)
        } else {
            Err(anyhow!("No Avro record found in data"))
        }
    }

    fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
        use flate2::write::GzEncoder;
        use std::io::Write;

        match compression {
            CompressionType::None => Ok(data.to_vec()),
            CompressionType::Gzip => {
                let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
                encoder.write_all(data)?;
                encoder
                    .finish()
                    .map_err(|e| anyhow!("Gzip compression failed: {e}"))
            }
            CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
                .map_err(|e| anyhow!("Zstd compression failed: {e}")),
            _ => Err(anyhow!("Compression type {compression:?} not implemented")),
        }
    }

    fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
        use flate2::read::GzDecoder;

        match compression {
            CompressionType::None => Ok(data.to_vec()),
            CompressionType::Gzip => {
                let mut decoder = GzDecoder::new(data);
                let mut decompressed = Vec::new();
                decoder.read_to_end(&mut decompressed)?;
                Ok(decompressed)
            }
            CompressionType::Zstd => {
                oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
            }
            _ => Err(anyhow!(
                "Decompression type {compression:?} not implemented"
            )),
        }
    }
}

impl SchemaRegistry {
    pub fn new(evolution_rules: EvolutionRules) -> Self {
        Self {
            schemas: Arc::new(RwLock::new(HashMap::new())),
            evolution_rules,
        }
    }

    pub async fn register_schema(&self, schema: Schema) -> Result<String> {
        let schema_id = schema.id.clone();
        self.schemas.write().await.insert(schema_id.clone(), schema);
        Ok(schema_id)
    }

    pub async fn get_schema(&self, id: &str) -> Result<Schema> {
        self.schemas
            .read()
            .await
            .get(id)
            .cloned()
            .ok_or_else(|| anyhow!("Schema {id} not found"))
    }

    /// Placeholder lookup: a full implementation would derive the schema id
    /// from the event type and the registered schemas.
    pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
        Ok("default-v1".to_string())
    }

    pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
        match old_schema.compatibility {
            CompatibilityMode::None => Ok(()),
            CompatibilityMode::Backward => {
                self.check_backward_compatibility(old_schema, new_schema)
            }
            CompatibilityMode::Forward => {
                self.check_forward_compatibility(old_schema, new_schema)
            }
            CompatibilityMode::Full => {
                self.check_backward_compatibility(old_schema, new_schema)?;
                self.check_forward_compatibility(old_schema, new_schema)
            }
        }
    }

    fn check_backward_compatibility(
        &self,
        _old_schema: &Schema,
        _new_schema: &Schema,
    ) -> Result<()> {
        // Not yet implemented: all evolutions are currently accepted.
        Ok(())
    }

    fn check_forward_compatibility(
        &self,
        _old_schema: &Schema,
        _new_schema: &Schema,
    ) -> Result<()> {
        // Not yet implemented: all evolutions are currently accepted.
        Ok(())
    }
}

/// Converts serialized events between wire formats by deserializing with the
/// source format and re-serializing with the target format.
pub struct FormatConverter {
    source_format: SerializationFormat,
    target_format: SerializationFormat,
    schema_registry: Option<Arc<SchemaRegistry>>,
}

impl FormatConverter {
    pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
        Self {
            source_format: source,
            target_format: target,
            schema_registry: None,
        }
    }

    pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
        let source_serializer = EventSerializer::new(self.source_format);
        let event = source_serializer.deserialize(data).await?;

        let target_serializer = EventSerializer::new(self.target_format);
        target_serializer.serialize(&event).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamEvent;

    #[tokio::test]
    async fn test_json_serialization() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer = EventSerializer::new(SerializationFormat::Json);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }

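    // Companion round-trip for the CBOR path, mirroring the JSON test above;
    // a minimal sketch that assumes the `ciborium` wiring in
    // `serialize_cbor`/`deserialize_cbor` is symmetric.
    #[tokio::test]
    async fn test_cbor_serialization() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer = EventSerializer::new(SerializationFormat::Cbor);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }
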
    #[tokio::test]
    async fn test_format_detection() {
        let json_data = b"{\"test\": \"data\"}";
        assert_eq!(
            SerializationFormat::detect(json_data),
            Some(SerializationFormat::Json)
        );

        let magic_data = b"PB03some_data";
        assert_eq!(
            SerializationFormat::detect(magic_data),
            Some(SerializationFormat::Protobuf)
        );
    }

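    // Sketches the negative cases of `detect`: inputs shorter than the
    // four-byte magic window are always rejected (even a lone "{"), and
    // unknown prefixes only fall back to JSON when the payload looks like it.
    #[test]
    fn test_format_detection_rejects_short_and_unknown_input() {
        assert_eq!(SerializationFormat::detect(b"{"), None);
        assert_eq!(SerializationFormat::detect(b"\x00\x01\x02\x03"), None);
        assert_eq!(
            SerializationFormat::detect(b"[1, 2, 3]"),
            Some(SerializationFormat::Json)
        );
    }
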
    #[tokio::test]
    async fn test_compression() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer =
            EventSerializer::new(SerializationFormat::Json).with_compression(CompressionType::Gzip);

        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }

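    // Same round trip as the gzip test above but through the Zstd branch; a
    // sketch that assumes the `oxiarc_zstd` encode/decode pair used in
    // `compress`/`decompress` is symmetric.
    #[tokio::test]
    async fn test_zstd_compression() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let serializer =
            EventSerializer::new(SerializationFormat::Json).with_compression(CompressionType::Zstd);

        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }
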
    #[tokio::test]
    async fn test_messagepack_serialization() {
        let metadata = EventMetadata::default();
        let event = StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata,
        };

        let serializer = EventSerializer::new(SerializationFormat::MessagePack);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                ..
            } => {
                assert_eq!(subject, "http://example.org/subject");
                assert_eq!(predicate, "http://example.org/predicate");
                assert_eq!(object, "http://example.org/object");
            }
            _ => panic!("Wrong event type"),
        }
    }

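    // Exercises the hand-rolled length-prefixed binary codec end to end for
    // the one event type it currently supports (`TripleAdded`), including the
    // optional graph field.
    #[tokio::test]
    async fn test_binary_serialization() {
        let event = StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: Some("http://example.org/graph".to_string()),
            metadata: EventMetadata::default(),
        };

        let serializer = EventSerializer::new(SerializationFormat::Binary);
        let serialized = serializer.serialize(&event).await.unwrap();
        let deserialized = serializer.deserialize(&serialized).await.unwrap();

        match deserialized {
            StreamEvent::TripleAdded { subject, graph, .. } => {
                assert_eq!(subject, "http://example.org/subject");
                assert_eq!(graph.as_deref(), Some("http://example.org/graph"));
            }
            _ => panic!("Wrong event type"),
        }
    }
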
    #[tokio::test]
    async fn test_format_conversion() {
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "test".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let json_serializer = EventSerializer::new(SerializationFormat::Json);
        let json_data = json_serializer.serialize(&event).await.unwrap();

        let converter =
            FormatConverter::new(SerializationFormat::Json, SerializationFormat::MessagePack);
        let msgpack_data = converter.convert(&json_data).await.unwrap();

        let msgpack_serializer = EventSerializer::new(SerializationFormat::MessagePack);
        let deserialized = msgpack_serializer.deserialize(&msgpack_data).await.unwrap();

        match deserialized {
            StreamEvent::Heartbeat { source, .. } => {
                assert_eq!(source, "test");
            }
            _ => panic!("Wrong event type"),
        }
    }
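
    // Sketch of the delta path: the first event for an id is stored as a full
    // snapshot; a same-length successor should take the XOR branch, and
    // `decompress_delta` restores it against the explicit previous event. (If
    // the JSON lengths ever differ, the compressor falls back to a full
    // snapshot, which this test also handles correctly.)
    #[tokio::test]
    async fn test_xor_delta_roundtrip() {
        let compressor = DeltaCompressor::new(DeltaCompressionType::Xor, 16);
        let timestamp = chrono::Utc::now();

        let first = StreamEvent::Heartbeat {
            timestamp,
            source: "aaaa".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };
        let second = StreamEvent::Heartbeat {
            timestamp,
            source: "bbbb".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let snapshot = compressor.compress_delta(&first, "hb").await.unwrap();
        assert!(matches!(snapshot.delta, EventDelta::Full(_)));

        let delta = compressor.compress_delta(&second, "hb").await.unwrap();
        let restored = compressor
            .decompress_delta(&delta, Some(&first))
            .await
            .unwrap();

        match restored {
            StreamEvent::Heartbeat { source, .. } => assert_eq!(source, "bbbb"),
            _ => panic!("Wrong event type"),
        }
    }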
}

/// Simplified protobuf envelope: instead of a generated message, the event is
/// carried as JSON bytes inside a hand-implemented `prost::Message`.
#[derive(Debug, Clone)]
pub struct ProtobufStreamEvent {
    pub event_type: String,
    pub data: Vec<u8>,
    pub metadata: Vec<u8>,
}

impl ProtobufStreamEvent {
    pub fn from_json(json: &serde_json::Value) -> Result<Self> {
        let event_type = "StreamEvent".to_string();
        let data = serde_json::to_vec(json)?;
        let metadata = Vec::new();

        Ok(Self {
            event_type,
            data,
            metadata,
        })
    }

    pub fn to_json(&self) -> Result<serde_json::Value> {
        serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
    }

    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
        buf.extend_from_slice(&self.data);
        Ok(())
    }

    pub fn decode(data: &[u8]) -> Result<Self> {
        Ok(Self {
            event_type: "StreamEvent".to_string(),
            data: data.to_vec(),
            metadata: Vec::new(),
        })
    }
}

impl prost::Message for ProtobufStreamEvent {
    fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
        buf.put_slice(&self.data);
    }

    fn merge_field(
        &mut self,
        _tag: u32,
        _wire_type: prost::encoding::WireType,
        _buf: &mut impl prost::bytes::Buf,
        _ctx: prost::encoding::DecodeContext,
    ) -> Result<(), prost::DecodeError> {
        Ok(())
    }

    fn encoded_len(&self) -> usize {
        self.data.len()
    }

    fn clear(&mut self) {
        self.data.clear();
        self.metadata.clear();
    }
}

/// Fallback Avro schema: a generic record carrying the event as JSON bytes.
pub fn get_default_avro_schema() -> apache_avro::Schema {
    let schema_str = r#"
    {
        "type": "record",
        "name": "StreamEvent",
        "fields": [
            {"name": "event_type", "type": "string"},
            {"name": "data", "type": "bytes"},
            {"name": "metadata", "type": ["null", "bytes"], "default": null}
        ]
    }
    "#;

    apache_avro::Schema::parse_str(schema_str).expect("Failed to parse default Avro schema")
}

pub fn to_avro_value(
    event: &StreamEvent,
    _schema: &apache_avro::Schema,
) -> Result<apache_avro::types::Value> {
    let json_data = serde_json::to_vec(event)?;

    let fields = vec![
        (
            "event_type".to_string(),
            apache_avro::types::Value::String("StreamEvent".to_string()),
        ),
        (
            "data".to_string(),
            apache_avro::types::Value::Bytes(json_data),
        ),
        (
            "metadata".to_string(),
            apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)),
        ),
    ];

    Ok(apache_avro::types::Value::Record(fields))
}

pub fn from_avro_value(
    value: &apache_avro::types::Value,
    _schema: &apache_avro::Schema,
) -> Result<StreamEvent> {
    match value {
        apache_avro::types::Value::Record(fields) => {
            for (name, field_value) in fields {
                if name == "data" {
                    if let apache_avro::types::Value::Bytes(bytes) = field_value {
                        let event: StreamEvent = serde_json::from_slice(bytes)?;
                        return Ok(event);
                    }
                }
            }
            Err(anyhow!("No data field found in Avro record"))
        }
        _ => Err(anyhow!("Expected Avro record, got {:?}", value)),
    }
}

impl SchemaRegistry {
    /// Placeholder: always returns the default JSON-carrying record schema.
    pub async fn get_avro_schema_for_event(
        &self,
        _event: &StreamEvent,
    ) -> Result<apache_avro::Schema> {
        Ok(get_default_avro_schema())
    }
}

/// Delta-encodes successive events that share an event id, keeping a bounded
/// cache of previous states.
pub struct DeltaCompressor {
    previous_states: Arc<RwLock<HashMap<String, StreamEvent>>>,
    compression_type: DeltaCompressionType,
    max_states: usize,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DeltaCompressionType {
    Xor,
    Prefix,
    Dictionary,
    Lz4Delta,
}

impl DeltaCompressor {
    pub fn new(compression_type: DeltaCompressionType, max_states: usize) -> Self {
        Self {
            previous_states: Arc::new(RwLock::new(HashMap::new())),
            compression_type,
            max_states,
        }
    }

    pub async fn compress_delta(
        &self,
        event: &StreamEvent,
        event_id: &str,
    ) -> Result<DeltaCompressedEvent> {
        let mut states = self.previous_states.write().await;

        // Evict entries when over capacity. `HashMap` iteration order is
        // unspecified, so this drops an arbitrary subset of cached states.
        if states.len() >= self.max_states {
            let keys_to_remove: Vec<String> = states
                .keys()
                .take(states.len() - self.max_states + 1)
                .cloned()
                .collect();
            for key in keys_to_remove {
                states.remove(&key);
            }
        }

        // The first sighting of an id is stored as a full snapshot; later
        // events are encoded as a delta against the cached previous state.
        let delta = if let Some(previous) = states.get(event_id) {
            self.calculate_delta(previous, event)?
        } else {
            EventDelta::Full(Box::new(event.clone()))
        };

        states.insert(event_id.to_string(), event.clone());

        Ok(DeltaCompressedEvent {
            event_id: event_id.to_string(),
            delta,
            compression_type: self.compression_type,
            timestamp: chrono::Utc::now(),
        })
    }

    fn calculate_delta(&self, previous: &StreamEvent, current: &StreamEvent) -> Result<EventDelta> {
        match self.compression_type {
            DeltaCompressionType::Xor => self.calculate_xor_delta(previous, current),
            DeltaCompressionType::Prefix => self.calculate_prefix_delta(previous, current),
            DeltaCompressionType::Dictionary => self.calculate_dictionary_delta(previous, current),
            DeltaCompressionType::Lz4Delta => self.calculate_lz4_delta(previous, current),
        }
    }

    fn calculate_xor_delta(
        &self,
        previous: &StreamEvent,
        current: &StreamEvent,
    ) -> Result<EventDelta> {
        let prev_bytes = serde_json::to_vec(previous)?;
        let curr_bytes = serde_json::to_vec(current)?;

        // XOR only works on equal-length encodings; otherwise fall back to a
        // full snapshot.
        if prev_bytes.len() != curr_bytes.len() {
            return Ok(EventDelta::Full(Box::new(current.clone())));
        }

        let xor_bytes: Vec<u8> = prev_bytes
            .iter()
            .zip(curr_bytes.iter())
            .map(|(a, b)| a ^ b)
            .collect();

        Ok(EventDelta::Xor(xor_bytes))
    }

    fn calculate_prefix_delta(
        &self,
        previous: &StreamEvent,
        current: &StreamEvent,
    ) -> Result<EventDelta> {
        let prev_json = serde_json::to_value(previous)?;
        let curr_json = serde_json::to_value(current)?;

        let diff = self.calculate_json_prefix_diff(&prev_json, &curr_json)?;
        Ok(EventDelta::Prefix(diff))
    }

    fn calculate_dictionary_delta(
        &self,
        previous: &StreamEvent,
        current: &StreamEvent,
    ) -> Result<EventDelta> {
        let prev_strings = self.extract_strings_from_event(previous);
        let curr_strings = self.extract_strings_from_event(current);

        // Build a dictionary from the strings shared by both events.
        let mut dictionary = HashMap::new();
        let mut dict_id = 0u16;

        for string in &prev_strings {
            if curr_strings.contains(string) && !dictionary.contains_key(string) {
                dictionary.insert(string.clone(), dict_id);
                dict_id += 1;
            }
        }

        let compressed_event = self.replace_strings_with_ids(current, &dictionary)?;

        Ok(EventDelta::Dictionary {
            dictionary,
            compressed_event,
        })
    }

    fn calculate_lz4_delta(
        &self,
        previous: &StreamEvent,
        current: &StreamEvent,
    ) -> Result<EventDelta> {
        let prev_bytes = serde_json::to_vec(previous)?;
        let curr_bytes = serde_json::to_vec(current)?;

        let diff_bytes = self.calculate_byte_diff(&prev_bytes, &curr_bytes);
        let compressed = oxiarc_lz4::compress(&diff_bytes)
            .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;

        Ok(EventDelta::Lz4(compressed))
    }

    fn calculate_json_prefix_diff(
        &self,
        prev: &serde_json::Value,
        curr: &serde_json::Value,
    ) -> Result<serde_json::Value> {
        match (prev, curr) {
            (serde_json::Value::Object(prev_obj), serde_json::Value::Object(curr_obj)) => {
                // Keep only the keys that were added or whose values changed.
                let mut diff = serde_json::Map::new();
                for (key, curr_val) in curr_obj {
                    if let Some(prev_val) = prev_obj.get(key) {
                        if prev_val != curr_val {
                            diff.insert(key.clone(), curr_val.clone());
                        }
                    } else {
                        diff.insert(key.clone(), curr_val.clone());
                    }
                }
                Ok(serde_json::Value::Object(diff))
            }
            _ => Ok(curr.clone()),
        }
    }

    fn extract_strings_from_event(&self, event: &StreamEvent) -> Vec<String> {
        let mut strings = Vec::new();
        if let Ok(json) = serde_json::to_value(event) {
            Self::extract_strings_from_json(&json, &mut strings);
        }
        strings
    }

    fn extract_strings_from_json(value: &serde_json::Value, strings: &mut Vec<String>) {
        match value {
            serde_json::Value::String(s) => strings.push(s.clone()),
            serde_json::Value::Array(arr) => {
                for item in arr {
                    Self::extract_strings_from_json(item, strings);
                }
            }
            serde_json::Value::Object(obj) => {
                for (_, val) in obj {
                    Self::extract_strings_from_json(val, strings);
                }
            }
            _ => {}
        }
    }

    fn replace_strings_with_ids(
        &self,
        event: &StreamEvent,
        dictionary: &HashMap<String, u16>,
    ) -> Result<serde_json::Value> {
        let mut json = serde_json::to_value(event)?;
        Self::replace_strings_in_json(&mut json, dictionary);
        Ok(json)
    }

    fn replace_strings_in_json(value: &mut serde_json::Value, dictionary: &HashMap<String, u16>) {
        match value {
            serde_json::Value::String(s) => {
                if let Some(&id) = dictionary.get(s) {
                    *value = serde_json::Value::Number(serde_json::Number::from(id));
                }
            }
            serde_json::Value::Array(arr) => {
                for item in arr {
                    Self::replace_strings_in_json(item, dictionary);
                }
            }
            serde_json::Value::Object(obj) => {
                for val in obj.values_mut() {
                    Self::replace_strings_in_json(val, dictionary);
                }
            }
            _ => {}
        }
    }

    /// Simplified "diff": records both lengths, then stores the current bytes
    /// verbatim as the payload (no real differencing yet).
    fn calculate_byte_diff(&self, prev: &[u8], curr: &[u8]) -> Vec<u8> {
        let mut diff = Vec::new();

        diff.extend_from_slice(&(curr.len() as u32).to_le_bytes());
        diff.extend_from_slice(&(prev.len() as u32).to_le_bytes());
        diff.extend_from_slice(curr);

        diff
    }

    pub async fn decompress_delta(
        &self,
        compressed: &DeltaCompressedEvent,
        previous_event: Option<&StreamEvent>,
    ) -> Result<StreamEvent> {
        match &compressed.delta {
            EventDelta::Full(event) => Ok((**event).clone()),
            EventDelta::Xor(xor_bytes) => {
                if let Some(prev) = previous_event {
                    let prev_bytes = serde_json::to_vec(prev)?;
                    if prev_bytes.len() == xor_bytes.len() {
                        let restored_bytes: Vec<u8> = prev_bytes
                            .iter()
                            .zip(xor_bytes.iter())
                            .map(|(a, b)| a ^ b)
                            .collect();
                        let event = serde_json::from_slice(&restored_bytes)?;
                        Ok(event)
                    } else {
                        Err(anyhow!("XOR delta length mismatch"))
                    }
                } else {
                    Err(anyhow!("Previous event required for XOR decompression"))
                }
            }
            EventDelta::Prefix(diff) => {
                if let Some(prev) = previous_event {
                    let mut prev_json = serde_json::to_value(prev)?;
                    self.apply_json_diff(&mut prev_json, diff)?;
                    let event = serde_json::from_value(prev_json)?;
                    Ok(event)
                } else {
                    Err(anyhow!("Previous event required for prefix decompression"))
                }
            }
            EventDelta::Dictionary {
                dictionary,
                compressed_event,
            } => {
                let mut restored_json = compressed_event.clone();
                let reverse_dict: HashMap<u16, String> =
                    dictionary.iter().map(|(k, &v)| (v, k.clone())).collect();
                Self::restore_strings_from_ids(&mut restored_json, &reverse_dict);
                let event = serde_json::from_value(restored_json)?;
                Ok(event)
            }
            EventDelta::Lz4(compressed_bytes) => {
                let decompressed = oxiarc_lz4::decompress(compressed_bytes, 100 * 1024 * 1024)
                    .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?;
                // `calculate_byte_diff` prefixes the payload with two
                // little-endian u32 lengths; skip them before parsing.
                if decompressed.len() < 8 {
                    return Err(anyhow!("LZ4 delta payload too short"));
                }
                let event = serde_json::from_slice(&decompressed[8..])?;
                Ok(event)
            }
        }
    }

    fn apply_json_diff(
        &self,
        base: &mut serde_json::Value,
        diff: &serde_json::Value,
    ) -> Result<()> {
        if let (Some(base_obj), Some(diff_obj)) = (base.as_object_mut(), diff.as_object()) {
            for (key, diff_val) in diff_obj {
                base_obj.insert(key.clone(), diff_val.clone());
            }
        } else {
            *base = diff.clone();
        }
        Ok(())
    }

    fn restore_strings_from_ids(
        value: &mut serde_json::Value,
        reverse_dict: &HashMap<u16, String>,
    ) {
        match value {
            serde_json::Value::Number(n) => {
                // Only u16-range numbers can be dictionary ids; guard the cast
                // so larger values are never truncated into a false match.
                if let Some(id) = n.as_u64() {
                    if id <= u64::from(u16::MAX) {
                        if let Some(string) = reverse_dict.get(&(id as u16)) {
                            *value = serde_json::Value::String(string.clone());
                        }
                    }
                }
            }
            serde_json::Value::Array(arr) => {
                for item in arr {
                    Self::restore_strings_from_ids(item, reverse_dict);
                }
            }
            serde_json::Value::Object(obj) => {
                for val in obj.values_mut() {
                    Self::restore_strings_from_ids(val, reverse_dict);
                }
            }
            _ => {}
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaCompressedEvent {
    pub event_id: String,
    pub delta: EventDelta,
    pub compression_type: DeltaCompressionType,
    pub timestamp: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventDelta {
    /// Full snapshot, used when no previous state exists or a delta does not
    /// apply.
    Full(Box<StreamEvent>),
    /// Byte-wise XOR against the previous event's JSON encoding.
    Xor(Vec<u8>),
    /// JSON object holding only added or changed top-level fields.
    Prefix(serde_json::Value),
    /// Shared strings replaced by u16 dictionary ids.
    Dictionary {
        dictionary: HashMap<String, u16>,
        compressed_event: serde_json::Value,
    },
    /// LZ4-compressed byte diff.
    Lz4(Vec<u8>),
}

/// Batches events and serializes them into length-prefixed frames.
pub struct StreamingSerializer {
    serializer: EventSerializer,
    delta_compressor: Option<DeltaCompressor>,
    batch_size: usize,
    current_batch: Vec<StreamEvent>,
}

impl StreamingSerializer {
    pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
        Self {
            serializer,
            delta_compressor: None,
            batch_size,
            current_batch: Vec::new(),
        }
    }

    pub fn with_delta_compression(
        mut self,
        compression_type: DeltaCompressionType,
        max_states: usize,
    ) -> Self {
        self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
        self
    }

    /// Buffers an event, returning a serialized frame once the batch is full.
    pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
        self.current_batch.push(event);

        if self.current_batch.len() >= self.batch_size {
            self.flush_batch().await
        } else {
            Ok(None)
        }
    }

    pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
        if self.current_batch.is_empty() {
            return Ok(None);
        }

        let batch = std::mem::take(&mut self.current_batch);
        let serialized = self.serialize_batch(&batch).await?;
        Ok(Some(serialized))
    }

    /// Frame layout: `[event count: u32][timestamp ms: u64]` followed by a
    /// `[length: u32][payload]` pair per event.
    async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
        let mut buffer = BytesMut::new();

        buffer.put_u32(batch.len() as u32);
        buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);

        for event in batch {
            let event_data = self.serializer.serialize(event).await?;
            buffer.put_u32(event_data.len() as u32);
            buffer.put(event_data);
        }

        Ok(buffer.freeze())
    }

    pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
        let mut cursor = std::io::Cursor::new(data);
        let mut events = Vec::new();

        let batch_size = cursor.get_u32();
        let _timestamp = cursor.get_u64();

        for _ in 0..batch_size {
            let event_size = cursor.get_u32() as usize;
            let event_data =
                &data[cursor.position() as usize..(cursor.position() as usize + event_size)];
            cursor.advance(event_size);

            let event = self.serializer.deserialize(event_data).await?;
            events.push(event);
        }

        Ok(events)
    }

    pub fn create_batch_stream(
        &self,
        events: impl Stream<Item = StreamEvent> + Send + 'static,
    ) -> BoxStream<'static, Result<Bytes>> {
        let serializer = self.serializer.clone();
        let batch_size = self.batch_size;

        Box::pin(events.chunks(batch_size).then(move |chunk| {
            let serializer = serializer.clone();
            async move {
                let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
                streaming_serializer.serialize_batch(&chunk).await
            }
        }))
    }
}

/// Versioned binary framing with optional LZ4 compression, CRC32 checksums,
/// and chunked streaming for large payloads.
pub struct EnhancedBinaryFormat {
    version: u8,
    enable_compression: bool,
    enable_checksums: bool,
    chunk_size: usize,
}

impl EnhancedBinaryFormat {
    pub fn new() -> Self {
        Self {
            version: 2, // current "BIN2" format version
            enable_compression: true,
            enable_checksums: true,
            chunk_size: 8192, // default streaming chunk size in bytes
        }
    }

    pub fn with_compression(mut self, enable: bool) -> Self {
        self.enable_compression = enable;
        self
    }

    pub fn with_checksums(mut self, enable: bool) -> Self {
        self.enable_checksums = enable;
        self
    }

    pub fn with_chunk_size(mut self, size: usize) -> Self {
        self.chunk_size = size;
        self
    }

    /// Frame layout: `"BIN2"`, version, flags, optional CRC32, payload length,
    /// then the (optionally compressed) JSON payload.
    pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
        let mut buffer = BytesMut::new();

        buffer.put(&b"BIN2"[..]); // magic bytes
        buffer.put_u8(self.version);
        buffer.put_u8(self.get_flags());

        let event_json = serde_json::to_vec(event)?;

        let data = if self.enable_compression {
            oxiarc_lz4::compress(&event_json)
                .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
        } else {
            event_json
        };

        if self.enable_checksums {
            let checksum = crc32fast::hash(&data);
            buffer.put_u32(checksum);
        }

        buffer.put_u32(data.len() as u32);
        buffer.put(&data[..]);

        Ok(buffer.freeze())
    }

    pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
        let mut cursor = std::io::Cursor::new(data);

        let mut magic = [0u8; 4];
        cursor.read_exact(&mut magic)?;
        if &magic != b"BIN2" {
            return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
        }

        let version = cursor.get_u8();
        if version != self.version {
            return Err(anyhow!(
                "Unsupported enhanced binary format version: {}",
                version
            ));
        }

        // The flags byte recorded at serialization time is authoritative, so
        // a frame decodes correctly regardless of this instance's settings.
        let flags = cursor.get_u8();
        let has_compression = (flags & 0x01) != 0;
        let has_checksum = (flags & 0x02) != 0;

        let expected_checksum = if has_checksum {
            Some(cursor.get_u32())
        } else {
            None
        };

        let data_len = cursor.get_u32() as usize;
        let mut event_data = vec![0u8; data_len];
        cursor.read_exact(&mut event_data)?;

        if let Some(expected) = expected_checksum {
            let actual = crc32fast::hash(&event_data);
            if actual != expected {
                return Err(anyhow!(
                    "Checksum mismatch: expected {}, got {}",
                    expected,
                    actual
                ));
            }
        }

        let decompressed = if has_compression {
            oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
                .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
        } else {
            event_data
        };

        let event = serde_json::from_slice(&decompressed)?;
        Ok(event)
    }

    /// Splits large frames into `"CHNK"`-prefixed chunks of at most
    /// `chunk_size` payload bytes each.
    pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
        let serialized = self.serialize(event).await?;
        let mut chunks = Vec::new();

        if serialized.len() <= self.chunk_size {
            chunks.push(serialized);
        } else {
            let chunk_count = (serialized.len() + self.chunk_size - 1) / self.chunk_size;

            for i in 0..chunk_count {
                let start = i * self.chunk_size;
                let end = std::cmp::min(start + self.chunk_size, serialized.len());

                let mut chunk_buffer = BytesMut::new();
                chunk_buffer.put(&b"CHNK"[..]); // chunk magic
                chunk_buffer.put_u32(i as u32); // chunk index
                chunk_buffer.put_u32(chunk_count as u32); // total chunk count
                chunk_buffer.put_u32((end - start) as u32); // payload length
                chunk_buffer.put(&serialized[start..end]);

                chunks.push(chunk_buffer.freeze());
            }
        }

        Ok(chunks)
    }

    /// Reassembles `"CHNK"` chunks (accepted in any order) and decodes the
    /// frame; a single unchunked frame is decoded directly.
    pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
        if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
            return self.deserialize(&chunks[0]).await;
        }

        let mut chunk_data: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
        let mut total_chunks = 0;

        for chunk in chunks {
            if !chunk.starts_with(b"CHNK") {
                return Err(anyhow!("Invalid chunk format"));
            }

            let mut cursor = std::io::Cursor::new(&chunk[4..]);
            let chunk_index = cursor.get_u32();
            let chunk_count = cursor.get_u32();
            let chunk_size = cursor.get_u32() as usize;

            total_chunks = chunk_count;

            // The header is 16 bytes: 4 magic bytes plus three u32 fields.
            let data = chunk[16..16 + chunk_size].to_vec();
            chunk_data.insert(chunk_index, data);
        }

        if chunk_data.len() != total_chunks as usize {
            return Err(anyhow!(
                "Missing chunks: got {}, expected {}",
                chunk_data.len(),
                total_chunks
            ));
        }

        // BTreeMap iteration restores index order during reassembly.
        let mut reassembled = Vec::new();
        for (_index, data) in chunk_data {
            reassembled.extend(data);
        }

        self.deserialize(&reassembled).await
    }

    fn get_flags(&self) -> u8 {
        let mut flags = 0u8;
        if self.enable_compression {
            flags |= 0x01;
        }
        if self.enable_checksums {
            flags |= 0x02;
        }
        flags
    }
}

impl Default for EnhancedBinaryFormat {
    fn default() -> Self {
        Self::new()
    }
}
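
#[cfg(test)]
mod streaming_format_tests {
    use super::*;

    // Sketch of the batch framing: with a batch size of two, the first event
    // is buffered and the second flushes a frame that `deserialize_batch`
    // can read back.
    #[tokio::test]
    async fn test_streaming_batch_roundtrip() {
        let mut streaming =
            StreamingSerializer::new(EventSerializer::new(SerializationFormat::Json), 2);

        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "batch".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        assert!(streaming.add_event(event.clone()).await.unwrap().is_none());
        let frame = streaming
            .add_event(event)
            .await
            .unwrap()
            .expect("second event should flush the batch");

        let events = streaming.deserialize_batch(&frame).await.unwrap();
        assert_eq!(events.len(), 2);
    }

    // Forces a tiny chunk size so the "BIN2" frame splits into "CHNK" chunks
    // and is reassembled; assumes the default LZ4-plus-checksum configuration
    // of `EnhancedBinaryFormat::new` round-trips.
    #[tokio::test]
    async fn test_enhanced_binary_chunked_roundtrip() {
        let format = EnhancedBinaryFormat::new().with_chunk_size(32);
        let event = StreamEvent::Heartbeat {
            timestamp: chrono::Utc::now(),
            source: "chunked".to_string(),
            metadata: crate::event::EventMetadata::default(),
        };

        let chunks = format.serialize_streaming(&event).await.unwrap();
        assert!(chunks.len() > 1, "payload should span multiple chunks");

        let restored = format.deserialize_streaming(chunks).await.unwrap();
        match restored {
            StreamEvent::Heartbeat { source, .. } => assert_eq!(source, "chunked"),
            _ => panic!("Wrong event type"),
        }
    }
}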