use anyhow::{anyhow, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::stream::{BoxStream, StreamExt as _};
use std::collections::BTreeMap;
use std::io::Read as _;
use std::sync::Arc;
use crate::serialization_decoder::DeltaCompressor;
use crate::serialization_types::{
from_avro_value, get_default_avro_schema, to_avro_value, DeltaCompressionType,
ProtobufStreamEvent, SchemaRegistry, SerializationFormat, SerializerOptions,
};
use crate::{CompressionType, EventMetadata, StreamEvent};
use tokio_stream::Stream;
/// Serializes and deserializes [`StreamEvent`]s in a configurable wire format,
/// with optional compression and an optional schema registry.
#[derive(Clone)]
pub struct EventSerializer {
/// Wire format used to encode and decode the event payload.
pub(crate) format: SerializationFormat,
/// Compression applied to the encoded payload; `None` leaves it uncompressed.
pub(crate) compression: Option<CompressionType>,
/// Registry consulted for schema ids and Avro schemas when present.
pub(crate) schema_registry: Option<Arc<SchemaRegistry>>,
/// Framing and formatting switches (magic bytes, schema id, pretty JSON, size limit).
pub(crate) options: SerializerOptions,
}
impl EventSerializer {
/// Creates a serializer for `format` with no compression, no schema registry
/// and default [`SerializerOptions`].
pub fn new(format: SerializationFormat) -> Self {
    let options = SerializerOptions::default();
    Self {
        format,
        options,
        compression: None,
        schema_registry: None,
    }
}
pub fn with_compression(mut self, compression: CompressionType) -> Self {
self.compression = Some(compression);
self
}
pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
self.schema_registry = Some(registry);
self
}
pub fn with_options(mut self, options: SerializerOptions) -> Self {
self.options = options;
self
}
/// Encodes `event` into a framed byte buffer.
///
/// The output is, in order: the format's magic bytes (when
/// `include_magic_bytes` is set), a big-endian `u32` schema id (when
/// `include_schema_id` is set and a registry is attached), and the payload
/// encoded in the configured format and compressed if compression is
/// configured.
///
/// # Errors
///
/// Fails when the registry lookup, the format encoder or the compressor
/// fails, or when the payload is larger than `options.max_size`. The limit
/// is checked against the payload only; header bytes are not counted.
pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
    let mut out = BytesMut::new();

    if self.options.include_magic_bytes {
        out.put(self.format.magic_bytes());
    }

    match &self.schema_registry {
        Some(registry) if self.options.include_schema_id => {
            let schema_id = registry.get_schema_id_for_event(event).await?;
            // A schema id that does not parse as u32 is written as 0.
            out.put_u32(schema_id.parse::<u32>().unwrap_or(0));
        }
        _ => {}
    }

    let encoded = match self.format {
        SerializationFormat::Json => self.serialize_json(event)?,
        SerializationFormat::Binary => self.serialize_binary(event)?,
        SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
        SerializationFormat::Cbor => self.serialize_cbor(event)?,
        SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
        SerializationFormat::Avro => self.serialize_avro(event).await?,
    };

    let data = match &self.compression {
        Some(compression) => self.compress(&encoded, compression)?,
        None => encoded,
    };

    match self.options.max_size {
        Some(max_size) if data.len() > max_size => Err(anyhow!(
            "Serialized data exceeds maximum size: {} > {max_size}",
            data.len()
        )),
        _ => {
            out.put(&data[..]);
            Ok(out.freeze())
        }
    }
}
/// Decodes a single event from `data`, reversing [`EventSerializer::serialize`].
///
/// Header handling is lenient: the 4 leading bytes are skipped only when
/// `include_magic_bytes` is set and they equal the format's magic bytes, and
/// a 4-byte schema id is skipped only when `include_schema_id` is set, a
/// registry is attached and enough bytes remain. The schema id value itself
/// is not inspected. The remainder is decompressed (if compression is
/// configured) and handed to the decoder for the configured format.
///
/// # Errors
///
/// Fails when decompression or the format decoder fails.
pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
    let mut offset = 0;

    if self.options.include_magic_bytes && data.len() >= 4 {
        let magic = &data[0..4];
        // On a mismatch the offset stays put and the decoder sees `data` as-is.
        if magic == self.format.magic_bytes() {
            offset += 4;
        }
    }

    if self.options.include_schema_id
        && self.schema_registry.is_some()
        && data.len() >= offset + 4
    {
        offset += 4;
    }

    let event_data = &data[offset..];

    // Borrow the input when nothing has to be decompressed instead of copying it.
    let payload: std::borrow::Cow<'_, [u8]> = match &self.compression {
        Some(compression) => {
            std::borrow::Cow::Owned(self.decompress(event_data, compression)?)
        }
        None => std::borrow::Cow::Borrowed(event_data),
    };

    match self.format {
        SerializationFormat::Json => self.deserialize_json(&payload),
        SerializationFormat::Binary => self.deserialize_binary(&payload),
        SerializationFormat::MessagePack => self.deserialize_messagepack(&payload),
        SerializationFormat::Cbor => self.deserialize_cbor(&payload),
        SerializationFormat::Protobuf => self.deserialize_protobuf(&payload),
        SerializationFormat::Avro => self.deserialize_avro(&payload).await,
    }
}
/// Encodes `event` as JSON, pretty-printed when `options.pretty_json` is set.
fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
    let encoded = if self.options.pretty_json {
        serde_json::to_vec_pretty(event)
    } else {
        serde_json::to_vec(event)
    };
    encoded.map_err(|e| anyhow!("JSON serialization failed: {e}"))
}
/// Decodes an event from JSON bytes.
fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
    let decoded: std::result::Result<StreamEvent, _> = serde_json::from_slice(data);
    decoded.map_err(|e| anyhow!("JSON deserialization failed: {e}"))
}
fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
let mut buffer = Vec::new();
buffer.push(1);
let event_type = match event {
StreamEvent::TripleAdded { .. } => 1,
StreamEvent::TripleRemoved { .. } => 2,
StreamEvent::QuadAdded { .. } => 3,
StreamEvent::QuadRemoved { .. } => 4,
StreamEvent::GraphCreated { .. } => 5,
StreamEvent::GraphCleared { .. } => 6,
StreamEvent::GraphDeleted { .. } => 7,
StreamEvent::GraphMetadataUpdated { .. } => 17,
StreamEvent::GraphPermissionsChanged { .. } => 18,
StreamEvent::GraphStatisticsUpdated { .. } => 19,
StreamEvent::GraphRenamed { .. } => 20,
StreamEvent::GraphMerged { .. } => 21,
StreamEvent::GraphSplit { .. } => 22,
StreamEvent::SparqlUpdate { .. } => 8,
StreamEvent::TransactionBegin { .. } => 9,
StreamEvent::TransactionCommit { .. } => 10,
StreamEvent::TransactionAbort { .. } => 11,
StreamEvent::SchemaChanged { .. } => 12,
StreamEvent::SchemaDefinitionAdded { .. } => 23,
StreamEvent::SchemaDefinitionRemoved { .. } => 24,
StreamEvent::SchemaDefinitionModified { .. } => 25,
StreamEvent::OntologyImported { .. } => 26,
StreamEvent::OntologyRemoved { .. } => 27,
StreamEvent::ConstraintAdded { .. } => 28,
StreamEvent::ConstraintRemoved { .. } => 29,
StreamEvent::ConstraintViolated { .. } => 30,
StreamEvent::IndexCreated { .. } => 31,
StreamEvent::IndexDropped { .. } => 32,
StreamEvent::IndexRebuilt { .. } => 33,
StreamEvent::ShapeAdded { .. } => 34,
StreamEvent::ShapeRemoved { .. } => 35,
StreamEvent::ShapeModified { .. } => 36,
StreamEvent::ShapeValidationStarted { .. } => 37,
StreamEvent::ShapeValidationCompleted { .. } => 38,
StreamEvent::ShapeViolationDetected { .. } => 39,
StreamEvent::QueryResultAdded { .. } => 14,
StreamEvent::QueryResultRemoved { .. } => 15,
StreamEvent::QueryCompleted { .. } => 16,
StreamEvent::SchemaUpdated { .. } => 40,
StreamEvent::ShapeUpdated { .. } => 41,
StreamEvent::Heartbeat { .. } => 13,
StreamEvent::ErrorOccurred { .. } => 42,
};
buffer.push(event_type);
match event {
StreamEvent::TripleAdded {
subject,
predicate,
object,
graph,
metadata,
} => {
self.write_string(&mut buffer, subject);
self.write_string(&mut buffer, predicate);
self.write_string(&mut buffer, object);
self.write_optional_string(&mut buffer, graph.as_deref());
self.write_metadata(&mut buffer, metadata)?;
}
_ => {
return Err(anyhow!(
"Binary serialization not implemented for this event type"
))
}
}
Ok(buffer)
}
/// Appends `s` to `buffer` as a little-endian `u32` byte length followed by
/// the UTF-8 bytes. The length is cast with `as u32`.
fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    buffer.extend_from_slice(&len_prefix);
    buffer.extend_from_slice(s.as_bytes());
}
/// Appends an optional string: a presence byte (`1` or `0`), followed by the
/// length-prefixed string when present.
fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
    if let Some(value) = s {
        buffer.push(1);
        self.write_string(buffer, value);
    } else {
        buffer.push(0);
    }
}
/// Appends `metadata` as JSON bytes preceded by their little-endian `u32` length.
///
/// # Errors
///
/// Fails when `metadata` cannot be encoded as JSON.
fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
    let encoded = serde_json::to_vec(metadata)?;
    let len_prefix = (encoded.len() as u32).to_le_bytes();
    buffer.extend_from_slice(&len_prefix);
    buffer.extend_from_slice(&encoded);
    Ok(())
}
/// Decodes an event from the compact binary layout.
///
/// Expects a version byte equal to `1` and an event-type tag, followed by
/// the variant body. Only tag `1` (`TripleAdded`) is decoded.
///
/// # Errors
///
/// Fails on input shorter than two bytes, an unsupported version, an
/// unknown tag, or a malformed body.
fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
    let (version, event_type) = match data {
        [version, event_type, ..] => (*version, *event_type),
        _ => return Err(anyhow!("Binary data too short")),
    };
    if version != 1 {
        return Err(anyhow!("Unsupported binary format version: {version}"));
    }
    if event_type != 1 {
        return Err(anyhow!("Unknown event type: {event_type}"));
    }

    // Fields are read in the order `serialize_binary` wrote them.
    let mut body = std::io::Cursor::new(&data[2..]);
    let subject = self.read_string(&mut body)?;
    let predicate = self.read_string(&mut body)?;
    let object = self.read_string(&mut body)?;
    let graph = self.read_optional_string(&mut body)?;
    let metadata = self.read_metadata(&mut body)?;

    Ok(StreamEvent::TripleAdded {
        subject,
        predicate,
        object,
        graph,
        metadata,
    })
}
/// Reads a string written as a little-endian `u32` byte length followed by
/// UTF-8 bytes.
///
/// # Errors
///
/// Fails when the length prefix cannot be read, when the declared length is
/// larger than the bytes left in `cursor`, or when the bytes are not valid
/// UTF-8.
fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
    use std::io::Read;

    let mut len_bytes = [0u8; 4];
    cursor.read_exact(&mut len_bytes)?;
    let len = u32::from_le_bytes(len_bytes) as usize;

    // The length comes from the input; check it before allocating so a
    // corrupt prefix cannot request a multi-gigabyte buffer.
    let remaining = cursor
        .get_ref()
        .len()
        .saturating_sub(cursor.position() as usize);
    if len > remaining {
        return Err(anyhow!(
            "String length {len} exceeds remaining input of {remaining} bytes"
        ));
    }

    let mut bytes = vec![0u8; len];
    cursor.read_exact(&mut bytes)?;
    String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
}
/// Reads an optional string: a presence byte, then a length-prefixed string
/// when that byte is `1`. Any other byte value yields `None`.
fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
    use std::io::Read;

    let mut flag = [0u8; 1];
    cursor.read_exact(&mut flag)?;
    match flag[0] {
        1 => {
            let value = self.read_string(cursor)?;
            Ok(Some(value))
        }
        _ => Ok(None),
    }
}
/// Reads event metadata written as a little-endian `u32` byte length
/// followed by JSON bytes.
///
/// # Errors
///
/// Fails when the length prefix cannot be read, when the declared length is
/// larger than the bytes left in `cursor`, or when the JSON cannot be parsed.
fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
    use std::io::Read;

    let mut len_bytes = [0u8; 4];
    cursor.read_exact(&mut len_bytes)?;
    let len = u32::from_le_bytes(len_bytes) as usize;

    // The length comes from the input; check it before allocating so a
    // corrupt prefix cannot request a multi-gigabyte buffer.
    let remaining = cursor
        .get_ref()
        .len()
        .saturating_sub(cursor.position() as usize);
    if len > remaining {
        return Err(anyhow!(
            "Metadata length {len} exceeds remaining input of {remaining} bytes"
        ));
    }

    let mut json_bytes = vec![0u8; len];
    cursor.read_exact(&mut json_bytes)?;
    serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
}
/// Encodes `event` as MessagePack via `rmp_serde`.
fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
    let encoded = rmp_serde::to_vec(event);
    encoded.map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
}
/// Decodes an event from MessagePack bytes via `rmp_serde`.
fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
    let decoded: std::result::Result<StreamEvent, _> = rmp_serde::from_slice(data);
    decoded.map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
}
/// Encodes `event` as CBOR via `ciborium`.
fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
    let mut encoded = Vec::new();
    match ciborium::ser::into_writer(event, &mut encoded) {
        Ok(()) => Ok(encoded),
        Err(e) => Err(anyhow!("CBOR serialization failed: {e}")),
    }
}
/// Decodes an event from CBOR bytes via `ciborium`.
fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
    let decoded: std::result::Result<StreamEvent, _> = ciborium::de::from_reader(data);
    decoded.map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
}
/// Encodes `event` as Protobuf.
///
/// The event is first converted to a JSON value, then to a
/// [`ProtobufStreamEvent`], which is encoded with `prost`.
fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
    let as_json = serde_json::to_value(event)?;
    let message = ProtobufStreamEvent::from_json(&as_json)?;

    let mut encoded = Vec::new();
    prost::Message::encode(&message, &mut encoded)?;
    Ok(encoded)
}
/// Decodes an event from Protobuf bytes.
///
/// The bytes are decoded into a [`ProtobufStreamEvent`], converted to a JSON
/// value, and that value is deserialized into a [`StreamEvent`].
fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
    let message = ProtobufStreamEvent::decode(data)?;
    let as_json = message.to_json()?;
    Ok(serde_json::from_value(as_json)?)
}
/// Encodes `event` with an `apache_avro::Writer`.
///
/// The schema comes from the registry when one is attached, otherwise from
/// `get_default_avro_schema`.
async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
    let schema = match &self.schema_registry {
        Some(registry) => registry.get_avro_schema_for_event(event).await?,
        None => get_default_avro_schema(),
    };

    let record = to_avro_value(event, &schema)?;

    let mut sink = Vec::new();
    let mut avro_writer = apache_avro::Writer::new(&schema, &mut sink);
    avro_writer.append(record)?;
    avro_writer.flush()?;

    let encoded = avro_writer.into_inner()?.to_vec();
    Ok(encoded)
}
/// Decodes the first Avro record in `data` into an event, using the writer
/// schema reported by the `apache_avro::Reader`.
///
/// # Errors
///
/// Fails when the reader cannot be created, when `data` holds no record, or
/// when the record cannot be read or converted.
async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
    let avro_reader = apache_avro::Reader::new(data)?;
    let schema = avro_reader.writer_schema().clone();

    match avro_reader.into_iter().next() {
        Some(record) => {
            let value = record?;
            let event = from_avro_value(&value, &schema)?;
            Ok(event)
        }
        None => Err(anyhow!("No Avro record found in data")),
    }
}
/// Compresses `data` with the given algorithm; `CompressionType::None`
/// returns a plain copy.
///
/// Gzip is called with `6` and Zstd with `3` as their second argument
/// (presumably the compression level).
fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
    let compressed = match compression {
        CompressionType::None => data.to_vec(),
        CompressionType::Gzip => oxiarc_deflate::gzip_compress(data, 6)
            .map_err(|e| anyhow!("Gzip compression failed: {e}"))?,
        CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
            .map_err(|e| anyhow!("Zstd compression failed: {e}"))?,
        CompressionType::Lz4 => {
            oxiarc_lz4::compress(data).map_err(|e| anyhow!("LZ4 compression failed: {e}"))?
        }
        CompressionType::Snappy => oxiarc_snappy::compress(data),
    };
    Ok(compressed)
}
/// Decompresses `data` with the given algorithm; `CompressionType::None`
/// returns a plain copy.
///
/// LZ4 is called with `100 * 1024 * 1024` as its second argument
/// (presumably a cap on the decompressed size).
fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
    let restored = match compression {
        CompressionType::None => data.to_vec(),
        CompressionType::Gzip => oxiarc_deflate::gzip_decompress(data)
            .map_err(|e| anyhow!("Gzip decompression failed: {e}"))?,
        CompressionType::Zstd => {
            oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))?
        }
        CompressionType::Lz4 => oxiarc_lz4::decompress(data, 100 * 1024 * 1024)
            .map_err(|e| anyhow!("LZ4 decompression failed: {e}"))?,
        CompressionType::Snappy => oxiarc_snappy::decompress(data)
            .map_err(|e| anyhow!("Snappy decompression failed: {e}"))?,
    };
    Ok(restored)
}
}
/// Converts serialized events from one [`SerializationFormat`] to another.
pub struct FormatConverter {
/// Format the input bytes are decoded from.
source_format: SerializationFormat,
/// Format the decoded event is re-encoded into.
target_format: SerializationFormat,
/// Optional schema registry. The methods visible here never read it and
/// `new` always sets it to `None`, hence the `dead_code` allowance.
#[allow(dead_code)]
schema_registry: Option<Arc<SchemaRegistry>>,
}
impl FormatConverter {
    /// Creates a converter that decodes `source` and re-encodes as `target`.
    pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
        Self {
            schema_registry: None,
            source_format: source,
            target_format: target,
        }
    }

    /// Decodes `data` in the source format and re-encodes it in the target format.
    ///
    /// Both steps use a fresh [`EventSerializer`] built with
    /// `EventSerializer::new`, so no compression or schema registry is involved.
    ///
    /// # Errors
    ///
    /// Fails when decoding or re-encoding fails.
    pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
        let decoder = EventSerializer::new(self.source_format);
        let encoder = EventSerializer::new(self.target_format);

        let event = decoder.deserialize(data).await?;
        encoder.serialize(&event).await
    }
}
/// Buffers events and serializes them in length-prefixed batches.
pub struct StreamingSerializer {
/// Serializer used for each individual event in a batch.
serializer: EventSerializer,
/// Optional delta compressor, set by `with_delta_compression`; the batch
/// methods visible here do not read it.
delta_compressor: Option<DeltaCompressor>,
/// Number of buffered events at which `add_event` flushes a batch.
batch_size: usize,
/// Events accumulated since the last flush.
current_batch: Vec<StreamEvent>,
}
impl StreamingSerializer {
/// Creates a batching serializer with an empty buffer and no delta compressor.
pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
    Self {
        current_batch: Vec::new(),
        delta_compressor: None,
        batch_size,
        serializer,
    }
}
pub fn with_delta_compression(
mut self,
compression_type: DeltaCompressionType,
max_states: usize,
) -> Self {
self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
self
}
/// Buffers `event` and flushes once the buffer holds at least `batch_size` events.
///
/// Returns `Ok(Some(bytes))` with the serialized batch when a flush
/// happened, `Ok(None)` otherwise.
pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
    self.current_batch.push(event);

    if self.current_batch.len() < self.batch_size {
        return Ok(None);
    }
    self.flush_batch().await
}
/// Serializes and clears the buffered events.
///
/// Returns `Ok(None)` when the buffer is empty. The buffer is emptied before
/// serialization runs, so the events are dropped if serialization fails.
pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
    if self.current_batch.is_empty() {
        return Ok(None);
    }

    let pending = std::mem::take(&mut self.current_batch);
    match self.serialize_batch(&pending).await {
        Ok(bytes) => Ok(Some(bytes)),
        Err(err) => Err(err),
    }
}
async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
let mut buffer = BytesMut::new();
buffer.put_u32(batch.len() as u32);
buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);
for event in batch {
let event_data = self.serializer.serialize(event).await?;
buffer.put_u32(event_data.len() as u32);
buffer.put(event_data);
}
Ok(buffer.freeze())
}
pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
let mut cursor = std::io::Cursor::new(data);
let mut events = Vec::new();
let batch_size = cursor.get_u32();
let _timestamp = cursor.get_u64();
for _ in 0..batch_size {
let event_size = cursor.get_u32() as usize;
let event_data =
&data[cursor.position() as usize..(cursor.position() as usize + event_size)];
cursor.advance(event_size);
let event = self.serializer.deserialize(event_data).await?;
events.push(event);
}
Ok(events)
}
/// Turns a stream of events into a stream of serialized batches.
///
/// Events are grouped into chunks of up to `batch_size` and each chunk is
/// serialized with the same frame layout as `serialize_batch`. The returned
/// stream uses a clone of the inner serializer and does not touch this
/// instance's buffer.
///
/// A `batch_size` of zero is treated as one, which matches `add_event`
/// (it flushes after every event in that case).
pub fn create_batch_stream(
    &self,
    events: impl Stream<Item = StreamEvent> + Send + 'static,
) -> BoxStream<'static, Result<Bytes>> {
    let serializer = self.serializer.clone();
    // `StreamExt::chunks` panics on a zero capacity.
    let batch_size = self.batch_size.max(1);

    Box::pin(events.chunks(batch_size).then(move |chunk| {
        let serializer = serializer.clone();
        async move {
            let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
            streaming_serializer.serialize_batch(&chunk).await
        }
    }))
}
}
/// Self-describing binary envelope ("BIN2") around a JSON-encoded event,
/// with optional LZ4 compression, CRC32 checksum and chunked transfer.
pub struct EnhancedBinaryFormat {
/// Version byte written after the magic bytes and required on decode.
version: u8,
/// Whether the JSON payload is LZ4-compressed when serializing.
enable_compression: bool,
/// Whether a CRC32 of the payload is written when serializing.
enable_checksums: bool,
/// Maximum number of serialized bytes carried by one streaming chunk.
chunk_size: usize,
}
impl EnhancedBinaryFormat {
/// Creates the format with version 2, compression and checksums enabled,
/// and a chunk size of 8192 bytes.
pub fn new() -> Self {
    const FORMAT_VERSION: u8 = 2;
    const DEFAULT_CHUNK_SIZE: usize = 8192;

    Self {
        version: FORMAT_VERSION,
        chunk_size: DEFAULT_CHUNK_SIZE,
        enable_compression: true,
        enable_checksums: true,
    }
}
pub fn with_compression(mut self, enable: bool) -> Self {
self.enable_compression = enable;
self
}
pub fn with_checksums(mut self, enable: bool) -> Self {
self.enable_checksums = enable;
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size;
self
}
/// Encodes `event` into a single "BIN2" envelope.
///
/// Layout (integers big-endian): magic `BIN2`, version byte, flags byte,
/// CRC32 of the payload (only when checksums are enabled), `u32` payload
/// length, payload. The payload is the event as JSON, LZ4-compressed when
/// compression is enabled.
///
/// # Errors
///
/// Fails when JSON encoding or LZ4 compression fails.
pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
    let json = serde_json::to_vec(event)?;
    let payload = if self.enable_compression {
        oxiarc_lz4::compress(&json).map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
    } else {
        json
    };

    let mut out = BytesMut::new();
    out.put(&b"BIN2"[..]);
    out.put_u8(self.version);
    out.put_u8(self.get_flags());
    if self.enable_checksums {
        out.put_u32(crc32fast::hash(&payload));
    }
    out.put_u32(payload.len() as u32);
    out.put(&payload[..]);

    Ok(out.freeze())
}
pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
let mut cursor = std::io::Cursor::new(data);
let mut magic = [0u8; 4];
cursor.read_exact(&mut magic)?;
if &magic != b"BIN2" {
return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
}
let version = cursor.get_u8();
if version != self.version {
return Err(anyhow!(
"Unsupported enhanced binary format version: {}",
version
));
}
let flags = cursor.get_u8();
let has_compression = (flags & 0x01) != 0;
let has_checksum = (flags & 0x02) != 0;
let expected_checksum = if has_checksum {
Some(cursor.get_u32())
} else {
None
};
let data_len = cursor.get_u32() as usize;
let mut event_data = vec![0u8; data_len];
cursor.read_exact(&mut event_data)?;
if let Some(expected) = expected_checksum {
let actual = crc32fast::hash(&event_data);
if actual != expected {
return Err(anyhow!(
"Checksum mismatch: expected {}, got {}",
expected,
actual
));
}
}
let decompressed = if has_compression {
oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
.map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
} else {
event_data
};
let event = serde_json::from_slice(&decompressed)?;
Ok(event)
}
pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
let serialized = self.serialize(event).await?;
let mut chunks = Vec::new();
if serialized.len() <= self.chunk_size {
chunks.push(serialized);
} else {
let chunk_count = (serialized.len() + self.chunk_size - 1) / self.chunk_size;
for i in 0..chunk_count {
let start = i * self.chunk_size;
let end = std::cmp::min(start + self.chunk_size, serialized.len());
let mut chunk_buffer = BytesMut::new();
chunk_buffer.put(&b"CHNK"[..]); chunk_buffer.put_u32(i as u32); chunk_buffer.put_u32(chunk_count as u32); chunk_buffer.put_u32((end - start) as u32); chunk_buffer.put(&serialized[start..end]);
chunks.push(chunk_buffer.freeze());
}
}
Ok(chunks)
}
/// Reassembles chunks produced by `serialize_streaming` and decodes the event.
///
/// A single element that does not start with `CHNK` is decoded directly as
/// an unchunked envelope. Otherwise every element must be a `CHNK` frame;
/// frames may arrive in any order and are reassembled by chunk index.
///
/// # Errors
///
/// Fails when no chunks are given, when a frame is malformed or truncated,
/// when frames disagree on the chunk count, when an index is out of range
/// or duplicated, when chunks are missing, or when the reassembled envelope
/// cannot be decoded. Malformed frames are reported as errors rather than
/// causing a panic.
pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
    // "CHNK" magic + index + count + payload length, 4 bytes each.
    const CHUNK_HEADER_LEN: usize = 16;

    if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
        return self.deserialize(&chunks[0]).await;
    }
    if chunks.is_empty() {
        return Err(anyhow!("No chunks provided"));
    }

    let mut chunk_data: BTreeMap<u32, Bytes> = BTreeMap::new();
    let mut total_chunks: Option<u32> = None;

    for chunk in chunks {
        if !chunk.starts_with(b"CHNK") {
            return Err(anyhow!("Invalid chunk format"));
        }
        if chunk.len() < CHUNK_HEADER_LEN {
            return Err(anyhow!(
                "Chunk too short: {} bytes, header needs {}",
                chunk.len(),
                CHUNK_HEADER_LEN
            ));
        }

        let mut header: &[u8] = &chunk[4..CHUNK_HEADER_LEN];
        let chunk_index = header.get_u32();
        let chunk_count = header.get_u32();
        let chunk_size = header.get_u32() as usize;

        // Every frame must agree on how many chunks make up the event.
        match total_chunks {
            None => total_chunks = Some(chunk_count),
            Some(expected) if expected != chunk_count => {
                return Err(anyhow!(
                    "Inconsistent chunk count: expected {}, got {}",
                    expected,
                    chunk_count
                ));
            }
            Some(_) => {}
        }

        if chunk_index >= chunk_count {
            return Err(anyhow!(
                "Chunk index {} out of range for {} chunks",
                chunk_index,
                chunk_count
            ));
        }

        let available = chunk.len() - CHUNK_HEADER_LEN;
        if chunk_size > available {
            return Err(anyhow!(
                "Chunk {} declares {} bytes but only {} are present",
                chunk_index,
                chunk_size,
                available
            ));
        }

        let payload = chunk.slice(CHUNK_HEADER_LEN..CHUNK_HEADER_LEN + chunk_size);
        if chunk_data.insert(chunk_index, payload).is_some() {
            return Err(anyhow!("Duplicate chunk index {}", chunk_index));
        }
    }

    let total_chunks = total_chunks.unwrap_or(0);
    if chunk_data.len() != total_chunks as usize {
        return Err(anyhow!(
            "Missing chunks: got {}, expected {}",
            chunk_data.len(),
            total_chunks
        ));
    }

    // BTreeMap iterates in ascending index order, which restores the byte order.
    let total_len = chunk_data.values().map(|part| part.len()).sum::<usize>();
    let mut reassembled = Vec::with_capacity(total_len);
    for part in chunk_data.values() {
        reassembled.extend_from_slice(part);
    }

    self.deserialize(&reassembled).await
}
/// Builds the envelope flags byte: bit `0x01` for compression, bit `0x02`
/// for checksums.
fn get_flags(&self) -> u8 {
    let compression_bit: u8 = if self.enable_compression { 0x01 } else { 0 };
    let checksum_bit: u8 = if self.enable_checksums { 0x02 } else { 0 };
    compression_bit | checksum_bit
}
}
impl Default for EnhancedBinaryFormat {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod compression_tests {
    use super::*;

    /// Serializer used only as a receiver for `compress` / `decompress`.
    fn json_serializer() -> EventSerializer {
        EventSerializer::new(SerializationFormat::Json)
    }

    /// Compresses and decompresses `data` with `compression` and asserts the
    /// result equals the input. The three messages are used for the two
    /// `expect` calls and the final equality assertion.
    fn assert_round_trip(
        data: &[u8],
        compression: &CompressionType,
        compress_msg: &str,
        decompress_msg: &str,
        mismatch_msg: &str,
    ) {
        let ser = json_serializer();
        let compressed = ser.compress(data, compression).expect(compress_msg);
        let restored = ser
            .decompress(&compressed, compression)
            .expect(decompress_msg);
        assert_eq!(restored.as_slice(), data, "{}", mismatch_msg);
    }

    #[test]
    fn test_raw_snappy_round_trip() {
        let data = b"serialization encoder raw snappy payload ".repeat(48);
        assert_round_trip(
            &data,
            &CompressionType::Snappy,
            "snappy compress",
            "snappy decompress",
            "raw snappy round-trip mismatch",
        );
    }

    #[test]
    fn test_raw_snappy_round_trip_random() {
        use scirs2_core::random::Random;
        use scirs2_core::RngExt;

        let mut rng = Random::default();
        let data: Vec<u8> = (0..2048).map(|_| rng.random()).collect();
        assert_round_trip(
            &data,
            &CompressionType::Snappy,
            "snappy compress random",
            "snappy decompress random",
            "raw snappy random round-trip mismatch",
        );
    }

    #[test]
    fn test_gzip_round_trip() {
        let data = b"serialization encoder gzip payload ".repeat(40);
        assert_round_trip(
            &data,
            &CompressionType::Gzip,
            "gzip compress",
            "gzip decompress",
            "gzip round-trip mismatch",
        );
    }
}