use anyhow::{anyhow, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use futures::stream::{BoxStream, StreamExt as _};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::io::Read as _;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_stream::Stream;
use crate::{CompressionType, EventMetadata, StreamEvent};
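/// Wire formats supported by [`EventSerializer`]. Each format is framed by a
/// four-byte magic prefix (see [`SerializationFormat::magic_bytes`]) so that
/// payloads can be identified without out-of-band metadata.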
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializationFormat {
Json,
Protobuf,
Avro,
Binary,
MessagePack,
Cbor,
}
impl SerializationFormat {
pub fn magic_bytes(&self) -> &[u8] {
match self {
SerializationFormat::Json => b"JSON",
SerializationFormat::Protobuf => b"PB03",
SerializationFormat::Avro => b"Obj\x01",
SerializationFormat::Binary => b"BIN1",
SerializationFormat::MessagePack => b"MSGP",
SerializationFormat::Cbor => b"CBOR",
}
}
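    /// Best-effort format detection from a payload's leading bytes. Falls
    /// back to JSON when no magic prefix matches but the payload starts with
    /// `{` or `[`. A minimal sketch of the fallback behavior:
    ///
    /// ```ignore
    /// assert_eq!(
    ///     SerializationFormat::detect(b"{\"k\":1}"),
    ///     Some(SerializationFormat::Json)
    /// );
    /// ```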
pub fn detect(data: &[u8]) -> Option<Self> {
if data.len() < 4 {
return None;
}
let magic = &data[0..4];
match magic {
b"JSON" => Some(SerializationFormat::Json),
b"PB03" => Some(SerializationFormat::Protobuf),
b"Obj\x01" => Some(SerializationFormat::Avro),
b"BIN1" => Some(SerializationFormat::Binary),
b"MSGP" => Some(SerializationFormat::MessagePack),
b"CBOR" => Some(SerializationFormat::Cbor),
_ => {
if data.starts_with(b"{") || data.starts_with(b"[") {
Some(SerializationFormat::Json)
} else {
None
}
}
}
}
}
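/// Serializes [`StreamEvent`]s into one of the supported wire formats, with
/// optional compression and schema-registry integration.
///
/// A minimal usage sketch, mirroring the tests below (assumes an async
/// context and an `event: StreamEvent` in scope):
///
/// ```ignore
/// let serializer = EventSerializer::new(SerializationFormat::Json)
///     .with_compression(CompressionType::Gzip);
/// let bytes = serializer.serialize(&event).await?;
/// let roundtripped = serializer.deserialize(&bytes).await?;
/// ```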
#[derive(Clone)]
pub struct EventSerializer {
format: SerializationFormat,
compression: Option<CompressionType>,
schema_registry: Option<Arc<SchemaRegistry>>,
options: SerializerOptions,
}
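/// Knobs controlling the envelope written around each serialized event:
/// whether to emit magic bytes and a schema id, JSON formatting, schema
/// validation, and an optional size cap on the final payload.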
#[derive(Debug, Clone)]
pub struct SerializerOptions {
pub include_schema_id: bool,
pub include_magic_bytes: bool,
pub pretty_json: bool,
pub validate_schema: bool,
pub max_size: Option<usize>,
}
impl Default for SerializerOptions {
fn default() -> Self {
Self {
include_schema_id: true,
include_magic_bytes: true,
pretty_json: false,
validate_schema: true,
            max_size: Some(1024 * 1024), // 1 MiB
        }
}
}
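/// In-memory schema store with pluggable evolution rules. The compatibility
/// checks are currently stubs (see `check_backward_compatibility` below); the
/// registry mainly supplies schema ids and Avro schemas to the serializer.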
pub struct SchemaRegistry {
schemas: Arc<RwLock<HashMap<String, Schema>>>,
evolution_rules: EvolutionRules,
}
#[derive(Debug, Clone)]
pub struct Schema {
pub id: String,
pub version: u32,
pub format: SerializationFormat,
pub definition: SchemaDefinition,
pub compatibility: CompatibilityMode,
}
#[derive(Debug, Clone)]
pub enum SchemaDefinition {
JsonSchema(serde_json::Value),
ProtobufDescriptor(Vec<u8>),
AvroSchema(String),
Custom(HashMap<String, serde_json::Value>),
}
#[derive(Debug, Clone, Copy)]
pub enum CompatibilityMode {
None,
Backward,
Forward,
Full,
}
#[derive(Debug, Clone)]
pub struct EvolutionRules {
pub allow_field_addition: bool,
pub allow_field_removal: bool,
pub allow_type_promotion: bool,
pub required_fields: Vec<String>,
}
impl Default for EvolutionRules {
fn default() -> Self {
Self {
allow_field_addition: true,
allow_field_removal: false,
allow_type_promotion: true,
required_fields: vec!["event_id".to_string(), "timestamp".to_string()],
}
}
}
impl EventSerializer {
pub fn new(format: SerializationFormat) -> Self {
Self {
format,
compression: None,
schema_registry: None,
options: SerializerOptions::default(),
}
}
pub fn with_compression(mut self, compression: CompressionType) -> Self {
self.compression = Some(compression);
self
}
pub fn with_schema_registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
self.schema_registry = Some(registry);
self
}
pub fn with_options(mut self, options: SerializerOptions) -> Self {
self.options = options;
self
}
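    /// Serializes an event into the configured format. The output layout is:
    ///
    /// ```text
    /// [magic: 4 bytes]?  [schema id: u32 BE]?  [payload, optionally compressed]
    /// ```
    ///
    /// Both prefixes are controlled by [`SerializerOptions`]; the schema id
    /// is only written when a registry is attached. The payload is serialized
    /// first, then compressed, then checked against `max_size`.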
pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
let mut buffer = BytesMut::new();
if self.options.include_magic_bytes {
buffer.put(self.format.magic_bytes());
}
if self.options.include_schema_id {
if let Some(registry) = &self.schema_registry {
let schema_id = registry.get_schema_id_for_event(event).await?;
                // Non-numeric ids (e.g. "default-v1") silently fall back to 0.
                buffer.put_u32(schema_id.parse::<u32>().unwrap_or(0));
}
}
let serialized = match self.format {
SerializationFormat::Json => self.serialize_json(event)?,
SerializationFormat::Binary => self.serialize_binary(event)?,
SerializationFormat::MessagePack => self.serialize_messagepack(event)?,
SerializationFormat::Cbor => self.serialize_cbor(event)?,
SerializationFormat::Protobuf => self.serialize_protobuf(event)?,
SerializationFormat::Avro => self.serialize_avro(event).await?,
};
let data = if let Some(compression) = &self.compression {
self.compress(&serialized, compression)?
} else {
serialized
};
if let Some(max_size) = self.options.max_size {
if data.len() > max_size {
return Err(anyhow!(
"Serialized data exceeds maximum size: {} > {max_size}",
data.len()
));
}
}
buffer.put(&data[..]);
Ok(buffer.freeze())
}
pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
        let mut offset = 0;
        // Skip the magic bytes when present and matching the configured format.
        if self.options.include_magic_bytes
            && data.len() >= 4
            && &data[0..4] == self.format.magic_bytes()
        {
            offset += 4;
        }
        // Skip the schema id written by `serialize`; it is not validated here.
        if self.options.include_schema_id
            && self.schema_registry.is_some()
            && data.len() >= offset + 4
        {
            offset += 4;
        }
let event_data = &data[offset..];
let decompressed = if let Some(compression) = &self.compression {
self.decompress(event_data, compression)?
} else {
event_data.to_vec()
};
match self.format {
SerializationFormat::Json => self.deserialize_json(&decompressed),
SerializationFormat::Binary => self.deserialize_binary(&decompressed),
SerializationFormat::MessagePack => self.deserialize_messagepack(&decompressed),
SerializationFormat::Cbor => self.deserialize_cbor(&decompressed),
SerializationFormat::Protobuf => self.deserialize_protobuf(&decompressed),
SerializationFormat::Avro => self.deserialize_avro(&decompressed).await,
}
}
fn serialize_json(&self, event: &StreamEvent) -> Result<Vec<u8>> {
if self.options.pretty_json {
serde_json::to_vec_pretty(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
} else {
serde_json::to_vec(event).map_err(|e| anyhow!("JSON serialization failed: {e}"))
}
}
fn deserialize_json(&self, data: &[u8]) -> Result<StreamEvent> {
serde_json::from_slice(data).map_err(|e| anyhow!("JSON deserialization failed: {e}"))
}
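    /// Hand-rolled binary encoding: a version byte, a one-byte event tag,
    /// then length-prefixed (u32 LE) fields. Only `TripleAdded` is
    /// implemented so far; other variants return an error.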
fn serialize_binary(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        let mut buffer = Vec::new();
        // Format version byte; bump if the layout below changes.
        buffer.push(1);
        // Wire tags are persisted; never renumber existing variants.
        let event_type = match event {
StreamEvent::TripleAdded { .. } => 1,
StreamEvent::TripleRemoved { .. } => 2,
StreamEvent::QuadAdded { .. } => 3,
StreamEvent::QuadRemoved { .. } => 4,
StreamEvent::GraphCreated { .. } => 5,
StreamEvent::GraphCleared { .. } => 6,
StreamEvent::GraphDeleted { .. } => 7,
StreamEvent::GraphMetadataUpdated { .. } => 17,
StreamEvent::GraphPermissionsChanged { .. } => 18,
StreamEvent::GraphStatisticsUpdated { .. } => 19,
StreamEvent::GraphRenamed { .. } => 20,
StreamEvent::GraphMerged { .. } => 21,
StreamEvent::GraphSplit { .. } => 22,
StreamEvent::SparqlUpdate { .. } => 8,
StreamEvent::TransactionBegin { .. } => 9,
StreamEvent::TransactionCommit { .. } => 10,
StreamEvent::TransactionAbort { .. } => 11,
StreamEvent::SchemaChanged { .. } => 12,
StreamEvent::SchemaDefinitionAdded { .. } => 23,
StreamEvent::SchemaDefinitionRemoved { .. } => 24,
StreamEvent::SchemaDefinitionModified { .. } => 25,
StreamEvent::OntologyImported { .. } => 26,
StreamEvent::OntologyRemoved { .. } => 27,
StreamEvent::ConstraintAdded { .. } => 28,
StreamEvent::ConstraintRemoved { .. } => 29,
StreamEvent::ConstraintViolated { .. } => 30,
StreamEvent::IndexCreated { .. } => 31,
StreamEvent::IndexDropped { .. } => 32,
StreamEvent::IndexRebuilt { .. } => 33,
StreamEvent::ShapeAdded { .. } => 34,
StreamEvent::ShapeRemoved { .. } => 35,
StreamEvent::ShapeModified { .. } => 36,
StreamEvent::ShapeValidationStarted { .. } => 37,
StreamEvent::ShapeValidationCompleted { .. } => 38,
StreamEvent::ShapeViolationDetected { .. } => 39,
StreamEvent::QueryResultAdded { .. } => 14,
StreamEvent::QueryResultRemoved { .. } => 15,
StreamEvent::QueryCompleted { .. } => 16,
StreamEvent::SchemaUpdated { .. } => 40,
StreamEvent::ShapeUpdated { .. } => 41,
StreamEvent::Heartbeat { .. } => 13,
StreamEvent::ErrorOccurred { .. } => 42,
};
buffer.push(event_type);
match event {
StreamEvent::TripleAdded {
subject,
predicate,
object,
graph,
metadata,
} => {
self.write_string(&mut buffer, subject);
self.write_string(&mut buffer, predicate);
self.write_string(&mut buffer, object);
self.write_optional_string(&mut buffer, graph.as_deref());
self.write_metadata(&mut buffer, metadata)?;
}
_ => {
return Err(anyhow!(
"Binary serialization not implemented for this event type"
))
}
}
Ok(buffer)
}
fn write_string(&self, buffer: &mut Vec<u8>, s: &str) {
let bytes = s.as_bytes();
buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
buffer.extend_from_slice(bytes);
}
fn write_optional_string(&self, buffer: &mut Vec<u8>, s: Option<&str>) {
        match s {
            Some(s) => {
                buffer.push(1); // presence flag
                self.write_string(buffer, s);
            }
            None => {
                buffer.push(0); // absence flag
            }
        }
}
fn write_metadata(&self, buffer: &mut Vec<u8>, metadata: &EventMetadata) -> Result<()> {
let metadata_json = serde_json::to_vec(metadata)?;
buffer.extend_from_slice(&(metadata_json.len() as u32).to_le_bytes());
buffer.extend_from_slice(&metadata_json);
Ok(())
}
fn deserialize_binary(&self, data: &[u8]) -> Result<StreamEvent> {
if data.len() < 2 {
return Err(anyhow!("Binary data too short"));
}
let version = data[0];
if version != 1 {
return Err(anyhow!("Unsupported binary format version: {version}"));
}
let event_type = data[1];
let mut cursor = std::io::Cursor::new(&data[2..]);
match event_type {
1 => {
let subject = self.read_string(&mut cursor)?;
let predicate = self.read_string(&mut cursor)?;
let object = self.read_string(&mut cursor)?;
let graph = self.read_optional_string(&mut cursor)?;
let metadata = self.read_metadata(&mut cursor)?;
Ok(StreamEvent::TripleAdded {
subject,
predicate,
object,
graph,
metadata,
})
}
_ => Err(anyhow!("Unknown event type: {event_type}")),
}
}
fn read_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<String> {
use std::io::Read;
let mut len_bytes = [0u8; 4];
cursor.read_exact(&mut len_bytes)?;
let len = u32::from_le_bytes(len_bytes) as usize;
let mut bytes = vec![0u8; len];
cursor.read_exact(&mut bytes)?;
String::from_utf8(bytes).map_err(|e| anyhow!("Invalid UTF-8: {e}"))
}
fn read_optional_string(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<Option<String>> {
use std::io::Read;
let mut present = [0u8; 1];
cursor.read_exact(&mut present)?;
if present[0] == 1 {
Ok(Some(self.read_string(cursor)?))
} else {
Ok(None)
}
}
fn read_metadata(&self, cursor: &mut std::io::Cursor<&[u8]>) -> Result<EventMetadata> {
use std::io::Read;
let mut len_bytes = [0u8; 4];
cursor.read_exact(&mut len_bytes)?;
let len = u32::from_le_bytes(len_bytes) as usize;
let mut json_bytes = vec![0u8; len];
cursor.read_exact(&mut json_bytes)?;
serde_json::from_slice(&json_bytes).map_err(|e| anyhow!("Failed to parse metadata: {e}"))
}
fn serialize_messagepack(&self, event: &StreamEvent) -> Result<Vec<u8>> {
rmp_serde::to_vec(event).map_err(|e| anyhow!("MessagePack serialization failed: {e}"))
}
fn deserialize_messagepack(&self, data: &[u8]) -> Result<StreamEvent> {
rmp_serde::from_slice(data).map_err(|e| anyhow!("MessagePack deserialization failed: {e}"))
}
fn serialize_cbor(&self, event: &StreamEvent) -> Result<Vec<u8>> {
let mut buf = Vec::new();
ciborium::ser::into_writer(event, &mut buf)
.map_err(|e| anyhow!("CBOR serialization failed: {e}"))?;
Ok(buf)
}
fn deserialize_cbor(&self, data: &[u8]) -> Result<StreamEvent> {
ciborium::de::from_reader(data).map_err(|e| anyhow!("CBOR deserialization failed: {e}"))
}
    fn serialize_protobuf(&self, event: &StreamEvent) -> Result<Vec<u8>> {
        // Round-trips through JSON; see `ProtobufStreamEvent` for why this
        // is a passthrough shim rather than a generated message.
        let json_data = serde_json::to_value(event)?;
let proto_event = ProtobufStreamEvent::from_json(&json_data)?;
let mut buf = Vec::new();
prost::Message::encode(&proto_event, &mut buf)?;
Ok(buf)
}
fn deserialize_protobuf(&self, data: &[u8]) -> Result<StreamEvent> {
let proto_event = ProtobufStreamEvent::decode(data)?;
let json_value = proto_event.to_json()?;
let event: StreamEvent = serde_json::from_value(json_value)?;
Ok(event)
}
async fn serialize_avro(&self, event: &StreamEvent) -> Result<Vec<u8>> {
let schema = if let Some(registry) = &self.schema_registry {
registry.get_avro_schema_for_event(event).await?
} else {
get_default_avro_schema()
};
let avro_value = to_avro_value(event, &schema)?;
        // Let the writer own its buffer; `into_inner` flushes pending data
        // and returns it, avoiding the extra copy through `&mut Vec<u8>`.
        let mut writer = apache_avro::Writer::new(&schema, Vec::new());
        writer.append(avro_value)?;
        let result = writer.into_inner()?;
        Ok(result)
}
async fn deserialize_avro(&self, data: &[u8]) -> Result<StreamEvent> {
let reader = apache_avro::Reader::new(data)?;
let schema = reader.writer_schema().clone();
if let Some(record) = reader.into_iter().next() {
let avro_value = record?;
let event = from_avro_value(&avro_value, &schema)?;
Ok(event)
} else {
Err(anyhow!("No Avro record found in data"))
}
}
fn compress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
use flate2::write::GzEncoder;
use std::io::Write;
match compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Gzip => {
let mut encoder = GzEncoder::new(Vec::new(), flate2::Compression::default());
encoder.write_all(data)?;
encoder
.finish()
.map_err(|e| anyhow!("Gzip compression failed: {e}"))
}
            // Level 3 is zstd's default speed/ratio trade-off.
            CompressionType::Zstd => oxiarc_zstd::encode_all(data, 3)
                .map_err(|e| anyhow!("Zstd compression failed: {e}")),
_ => Err(anyhow!("Compression type {compression:?} not implemented")),
}
}
fn decompress(&self, data: &[u8], compression: &CompressionType) -> Result<Vec<u8>> {
use flate2::read::GzDecoder;
use std::io::Read;
match compression {
CompressionType::None => Ok(data.to_vec()),
CompressionType::Gzip => {
let mut decoder = GzDecoder::new(data);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
}
CompressionType::Zstd => {
oxiarc_zstd::decode_all(data).map_err(|e| anyhow!("Zstd decompression failed: {e}"))
}
_ => Err(anyhow!(
"Decompression type {compression:?} not implemented"
)),
}
}
}
impl SchemaRegistry {
pub fn new(evolution_rules: EvolutionRules) -> Self {
Self {
schemas: Arc::new(RwLock::new(HashMap::new())),
evolution_rules,
}
}
pub async fn register_schema(&self, schema: Schema) -> Result<String> {
let schema_id = schema.id.clone();
self.schemas.write().await.insert(schema_id.clone(), schema);
Ok(schema_id)
}
pub async fn get_schema(&self, id: &str) -> Result<Schema> {
self.schemas
.read()
.await
.get(id)
.cloned()
.ok_or_else(|| anyhow!("Schema {id} not found"))
}
pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
Ok("default-v1".to_string())
}
pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
match old_schema.compatibility {
CompatibilityMode::None => Ok(()),
CompatibilityMode::Backward => {
self.check_backward_compatibility(old_schema, new_schema)
}
CompatibilityMode::Forward => {
self.check_forward_compatibility(old_schema, new_schema)
}
CompatibilityMode::Full => {
self.check_backward_compatibility(old_schema, new_schema)?;
self.check_forward_compatibility(old_schema, new_schema)
}
}
}
fn check_backward_compatibility(
&self,
_old_schema: &Schema,
_new_schema: &Schema,
) -> Result<()> {
Ok(())
}
fn check_forward_compatibility(
&self,
_old_schema: &Schema,
_new_schema: &Schema,
) -> Result<()> {
Ok(())
}
}
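/// Re-encodes a serialized event from one format to another by deserializing
/// with the source format and serializing with the target.
///
/// A sketch mirroring `test_format_conversion` below:
///
/// ```ignore
/// let converter = FormatConverter::new(
///     SerializationFormat::Json,
///     SerializationFormat::MessagePack,
/// );
/// let msgpack_bytes = converter.convert(&json_bytes).await?;
/// ```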
pub struct FormatConverter {
source_format: SerializationFormat,
target_format: SerializationFormat,
schema_registry: Option<Arc<SchemaRegistry>>,
}
impl FormatConverter {
pub fn new(source: SerializationFormat, target: SerializationFormat) -> Self {
Self {
source_format: source,
target_format: target,
schema_registry: None,
}
}
pub async fn convert(&self, data: &[u8]) -> Result<Bytes> {
let source_serializer = EventSerializer::new(self.source_format);
let event = source_serializer.deserialize(data).await?;
let target_serializer = EventSerializer::new(self.target_format);
target_serializer.serialize(&event).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::StreamEvent;
#[tokio::test]
async fn test_json_serialization() {
let event = StreamEvent::Heartbeat {
timestamp: chrono::Utc::now(),
source: "test".to_string(),
metadata: crate::event::EventMetadata::default(),
};
let serializer = EventSerializer::new(SerializationFormat::Json);
let serialized = serializer.serialize(&event).await.unwrap();
let deserialized = serializer.deserialize(&serialized).await.unwrap();
match deserialized {
StreamEvent::Heartbeat { source, .. } => {
assert_eq!(source, "test");
}
_ => panic!("Wrong event type"),
}
}
#[tokio::test]
async fn test_format_detection() {
let json_data = b"{\"test\": \"data\"}";
assert_eq!(
SerializationFormat::detect(json_data),
Some(SerializationFormat::Json)
);
let magic_data = b"PB03some_data";
assert_eq!(
SerializationFormat::detect(magic_data),
Some(SerializationFormat::Protobuf)
);
}
#[tokio::test]
async fn test_compression() {
let event = StreamEvent::Heartbeat {
timestamp: chrono::Utc::now(),
source: "test".to_string(),
metadata: crate::event::EventMetadata::default(),
};
let serializer =
EventSerializer::new(SerializationFormat::Json).with_compression(CompressionType::Gzip);
let serialized = serializer.serialize(&event).await.unwrap();
let deserialized = serializer.deserialize(&serialized).await.unwrap();
match deserialized {
StreamEvent::Heartbeat { source, .. } => {
assert_eq!(source, "test");
}
_ => panic!("Wrong event type"),
}
}
#[tokio::test]
async fn test_messagepack_serialization() {
let metadata = EventMetadata::default();
let event = StreamEvent::TripleAdded {
subject: "http://example.org/subject".to_string(),
predicate: "http://example.org/predicate".to_string(),
object: "http://example.org/object".to_string(),
graph: None,
metadata,
};
let serializer = EventSerializer::new(SerializationFormat::MessagePack);
let serialized = serializer.serialize(&event).await.unwrap();
let deserialized = serializer.deserialize(&serialized).await.unwrap();
match deserialized {
StreamEvent::TripleAdded {
subject,
predicate,
object,
..
} => {
assert_eq!(subject, "http://example.org/subject");
assert_eq!(predicate, "http://example.org/predicate");
assert_eq!(object, "http://example.org/object");
}
_ => panic!("Wrong event type"),
}
}
#[tokio::test]
async fn test_format_conversion() {
let event = StreamEvent::Heartbeat {
timestamp: chrono::Utc::now(),
source: "test".to_string(),
metadata: crate::event::EventMetadata::default(),
};
let json_serializer = EventSerializer::new(SerializationFormat::Json);
let json_data = json_serializer.serialize(&event).await.unwrap();
let converter =
FormatConverter::new(SerializationFormat::Json, SerializationFormat::MessagePack);
let msgpack_data = converter.convert(&json_data).await.unwrap();
let msgpack_serializer = EventSerializer::new(SerializationFormat::MessagePack);
let deserialized = msgpack_serializer.deserialize(&msgpack_data).await.unwrap();
match deserialized {
StreamEvent::Heartbeat { source, .. } => {
assert_eq!(source, "test");
}
_ => panic!("Wrong event type"),
}
}
}
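/// Stand-in for a generated protobuf message: the event is stored as raw
/// JSON bytes rather than real protobuf fields, and the `prost::Message`
/// impl below simply copies those bytes through. Swap in a `prost-build`
/// generated type for genuine protobuf interoperability.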
#[derive(Debug, Clone)]
pub struct ProtobufStreamEvent {
pub event_type: String,
pub data: Vec<u8>,
pub metadata: Vec<u8>,
}
impl ProtobufStreamEvent {
pub fn from_json(json: &serde_json::Value) -> Result<Self> {
let event_type = "StreamEvent".to_string();
let data = serde_json::to_vec(json)?;
let metadata = Vec::new();
Ok(Self {
event_type,
data,
metadata,
})
}
pub fn to_json(&self) -> Result<serde_json::Value> {
serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
}
pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
buf.extend_from_slice(&self.data);
Ok(())
}
pub fn decode(data: &[u8]) -> Result<Self> {
Ok(Self {
event_type: "StreamEvent".to_string(),
data: data.to_vec(),
metadata: Vec::new(),
})
}
}
impl prost::Message for ProtobufStreamEvent {
fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
buf.put_slice(&self.data);
}
fn merge_field(
&mut self,
_tag: u32,
_wire_type: prost::encoding::WireType,
_buf: &mut impl prost::bytes::Buf,
_ctx: prost::encoding::DecodeContext,
) -> Result<(), prost::DecodeError> {
Ok(())
}
fn encoded_len(&self) -> usize {
self.data.len()
}
fn clear(&mut self) {
self.data.clear();
self.metadata.clear();
}
}
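/// Default Avro envelope: a three-field record carrying the event type, the
/// JSON-encoded event as bytes, and optional metadata. The schema string is
/// a compile-time constant, so parsing it is expected to succeed.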
pub fn get_default_avro_schema() -> apache_avro::Schema {
let schema_str = r#"
{
"type": "record",
"name": "StreamEvent",
"fields": [
{"name": "event_type", "type": "string"},
{"name": "data", "type": "bytes"},
{"name": "metadata", "type": ["null", "bytes"], "default": null}
]
}
"#;
apache_avro::Schema::parse_str(schema_str).expect("Failed to parse default Avro schema")
}
pub fn to_avro_value(
event: &StreamEvent,
_schema: &apache_avro::Schema,
) -> Result<apache_avro::types::Value> {
let json_data = serde_json::to_vec(event)?;
let fields = vec![
(
"event_type".to_string(),
apache_avro::types::Value::String("StreamEvent".to_string()),
),
(
"data".to_string(),
apache_avro::types::Value::Bytes(json_data),
),
(
"metadata".to_string(),
apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)),
),
];
Ok(apache_avro::types::Value::Record(fields))
}
pub fn from_avro_value(
value: &apache_avro::types::Value,
_schema: &apache_avro::Schema,
) -> Result<StreamEvent> {
match value {
apache_avro::types::Value::Record(fields) => {
for (name, field_value) in fields {
if name == "data" {
if let apache_avro::types::Value::Bytes(bytes) = field_value {
let event: StreamEvent = serde_json::from_slice(bytes)?;
return Ok(event);
}
}
}
Err(anyhow!("No data field found in Avro record"))
}
_ => Err(anyhow!("Expected Avro record, got {:?}", value)),
}
}
impl SchemaRegistry {
pub async fn get_avro_schema_for_event(
&self,
_event: &StreamEvent,
) -> Result<apache_avro::Schema> {
Ok(get_default_avro_schema())
}
}
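/// Delta-encodes successive revisions of an event keyed by `event_id`,
/// keeping a bounded map of previous states. The first occurrence of an id
/// is stored as [`EventDelta::Full`]; later occurrences are encoded against
/// the previous state using the configured strategy.
///
/// A minimal sketch (`event_v1`, `event_v2`, and `"evt-42"` are
/// illustrative names, not part of this API):
///
/// ```ignore
/// let compressor = DeltaCompressor::new(DeltaCompressionType::Xor, 1024);
/// let first = compressor.compress_delta(&event_v1, "evt-42").await?; // Full
/// let delta = compressor.compress_delta(&event_v2, "evt-42").await?; // Xor, or Full if sizes differ
/// ```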
pub struct DeltaCompressor {
previous_states: Arc<RwLock<HashMap<String, StreamEvent>>>,
compression_type: DeltaCompressionType,
max_states: usize,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DeltaCompressionType {
Xor,
Prefix,
Dictionary,
Lz4Delta,
}
impl DeltaCompressor {
pub fn new(compression_type: DeltaCompressionType, max_states: usize) -> Self {
Self {
previous_states: Arc::new(RwLock::new(HashMap::new())),
compression_type,
max_states,
}
}
pub async fn compress_delta(
&self,
event: &StreamEvent,
event_id: &str,
) -> Result<DeltaCompressedEvent> {
        let mut states = self.previous_states.write().await;
        if states.len() >= self.max_states {
            // HashMap iteration order is arbitrary, so this evicts arbitrary
            // entries rather than the oldest; it only bounds memory use.
            let keys_to_remove: Vec<String> = states
.keys()
.take(states.len() - self.max_states + 1)
.cloned()
.collect();
for key in keys_to_remove {
states.remove(&key);
}
}
let delta = if let Some(previous) = states.get(event_id) {
self.calculate_delta(previous, event)?
} else {
EventDelta::Full(Box::new(event.clone()))
};
states.insert(event_id.to_string(), event.clone());
Ok(DeltaCompressedEvent {
event_id: event_id.to_string(),
delta,
compression_type: self.compression_type,
timestamp: chrono::Utc::now(),
})
}
fn calculate_delta(&self, previous: &StreamEvent, current: &StreamEvent) -> Result<EventDelta> {
match self.compression_type {
DeltaCompressionType::Xor => self.calculate_xor_delta(previous, current),
DeltaCompressionType::Prefix => self.calculate_prefix_delta(previous, current),
DeltaCompressionType::Dictionary => self.calculate_dictionary_delta(previous, current),
DeltaCompressionType::Lz4Delta => self.calculate_lz4_delta(previous, current),
}
}
fn calculate_xor_delta(
&self,
previous: &StreamEvent,
current: &StreamEvent,
) -> Result<EventDelta> {
let prev_bytes = serde_json::to_vec(previous)?;
let curr_bytes = serde_json::to_vec(current)?;
if prev_bytes.len() != curr_bytes.len() {
return Ok(EventDelta::Full(Box::new(current.clone())));
}
let xor_bytes: Vec<u8> = prev_bytes
.iter()
.zip(curr_bytes.iter())
.map(|(a, b)| a ^ b)
.collect();
Ok(EventDelta::Xor(xor_bytes))
}
fn calculate_prefix_delta(
&self,
previous: &StreamEvent,
current: &StreamEvent,
) -> Result<EventDelta> {
let prev_json = serde_json::to_value(previous)?;
let curr_json = serde_json::to_value(current)?;
let diff = self.calculate_json_prefix_diff(&prev_json, &curr_json)?;
Ok(EventDelta::Prefix(diff))
}
fn calculate_dictionary_delta(
&self,
previous: &StreamEvent,
current: &StreamEvent,
) -> Result<EventDelta> {
let prev_strings = self.extract_strings_from_event(previous);
let curr_strings = self.extract_strings_from_event(current);
let mut dictionary = HashMap::new();
let mut dict_id = 0u16;
for string in &prev_strings {
if curr_strings.contains(string) && !dictionary.contains_key(string) {
dictionary.insert(string.clone(), dict_id);
dict_id += 1;
}
}
let compressed_event = self.replace_strings_with_ids(current, &dictionary)?;
Ok(EventDelta::Dictionary {
dictionary,
compressed_event,
})
}
fn calculate_lz4_delta(
&self,
previous: &StreamEvent,
current: &StreamEvent,
) -> Result<EventDelta> {
let prev_bytes = serde_json::to_vec(previous)?;
let curr_bytes = serde_json::to_vec(current)?;
let diff_bytes = self.calculate_byte_diff(&prev_bytes, &curr_bytes);
let compressed = oxiarc_lz4::compress(&diff_bytes)
.map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;
Ok(EventDelta::Lz4(compressed))
}
fn calculate_json_prefix_diff(
&self,
prev: &serde_json::Value,
curr: &serde_json::Value,
) -> Result<serde_json::Value> {
match (prev, curr) {
(serde_json::Value::Object(prev_obj), serde_json::Value::Object(curr_obj)) => {
let mut diff = serde_json::Map::new();
for (key, curr_val) in curr_obj {
if let Some(prev_val) = prev_obj.get(key) {
if prev_val != curr_val {
diff.insert(key.clone(), curr_val.clone());
}
} else {
diff.insert(key.clone(), curr_val.clone());
}
}
Ok(serde_json::Value::Object(diff))
}
_ => Ok(curr.clone()),
}
}
fn extract_strings_from_event(&self, event: &StreamEvent) -> Vec<String> {
let mut strings = Vec::new();
if let Ok(json) = serde_json::to_value(event) {
Self::extract_strings_from_json(&json, &mut strings);
}
strings
}
fn extract_strings_from_json(value: &serde_json::Value, strings: &mut Vec<String>) {
match value {
serde_json::Value::String(s) => strings.push(s.clone()),
serde_json::Value::Array(arr) => {
for item in arr {
Self::extract_strings_from_json(item, strings);
}
}
serde_json::Value::Object(obj) => {
for (_, val) in obj {
Self::extract_strings_from_json(val, strings);
}
}
_ => {}
}
}
fn replace_strings_with_ids(
&self,
event: &StreamEvent,
dictionary: &HashMap<String, u16>,
) -> Result<serde_json::Value> {
let mut json = serde_json::to_value(event)?;
Self::replace_strings_in_json(&mut json, dictionary);
Ok(json)
}
    fn replace_strings_in_json(value: &mut serde_json::Value, dictionary: &HashMap<String, u16>) {
        match value {
            serde_json::Value::String(s) => {
                // Caveat: substituting a bare number is ambiguous when the
                // event already contains small integers; a tagged encoding
                // would be needed for fully lossless round-trips.
                if let Some(&id) = dictionary.get(s) {
                    *value = serde_json::Value::Number(serde_json::Number::from(id));
                }
            }
serde_json::Value::Array(arr) => {
for item in arr {
Self::replace_strings_in_json(item, dictionary);
}
}
serde_json::Value::Object(obj) => {
for val in obj.values_mut() {
Self::replace_strings_in_json(val, dictionary);
}
}
_ => {}
}
}
    fn calculate_byte_diff(&self, prev: &[u8], curr: &[u8]) -> Vec<u8> {
        // Placeholder "diff": records both lengths followed by the full
        // current bytes, relying on the LZ4 pass for any size reduction.
        let mut diff = Vec::new();
diff.extend_from_slice(&(curr.len() as u32).to_le_bytes());
diff.extend_from_slice(&(prev.len() as u32).to_le_bytes());
diff.extend_from_slice(curr);
diff
}
pub async fn decompress_delta(
&self,
compressed: &DeltaCompressedEvent,
previous_event: Option<&StreamEvent>,
) -> Result<StreamEvent> {
match &compressed.delta {
EventDelta::Full(event) => Ok((**event).clone()),
EventDelta::Xor(xor_bytes) => {
if let Some(prev) = previous_event {
let prev_bytes = serde_json::to_vec(prev)?;
if prev_bytes.len() == xor_bytes.len() {
let restored_bytes: Vec<u8> = prev_bytes
.iter()
.zip(xor_bytes.iter())
.map(|(a, b)| a ^ b)
.collect();
let event = serde_json::from_slice(&restored_bytes)?;
Ok(event)
} else {
Err(anyhow!("XOR delta length mismatch"))
}
} else {
Err(anyhow!("Previous event required for XOR decompression"))
}
}
EventDelta::Prefix(diff) => {
if let Some(prev) = previous_event {
let mut prev_json = serde_json::to_value(prev)?;
self.apply_json_diff(&mut prev_json, diff)?;
let event = serde_json::from_value(prev_json)?;
Ok(event)
} else {
Err(anyhow!("Previous event required for prefix decompression"))
}
}
EventDelta::Dictionary {
dictionary,
compressed_event,
} => {
let mut restored_json = compressed_event.clone();
let reverse_dict: HashMap<u16, String> =
dictionary.iter().map(|(k, &v)| (v, k.clone())).collect();
Self::restore_strings_from_ids(&mut restored_json, &reverse_dict);
let event = serde_json::from_value(restored_json)?;
Ok(event)
}
            EventDelta::Lz4(compressed_bytes) => {
                let decompressed = oxiarc_lz4::decompress(compressed_bytes, 100 * 1024 * 1024)
                    .map_err(|e| anyhow!("LZ4 decompression failed: {e}"))?;
                // `calculate_byte_diff` prefixes the payload with two u32 LE
                // lengths (current, previous); skip them before parsing.
                if decompressed.len() < 8 {
                    return Err(anyhow!("LZ4 delta payload truncated"));
                }
                let event = serde_json::from_slice(&decompressed[8..])?;
                Ok(event)
            }
}
}
fn apply_json_diff(
&self,
base: &mut serde_json::Value,
diff: &serde_json::Value,
) -> Result<()> {
if let (Some(base_obj), Some(diff_obj)) = (base.as_object_mut(), diff.as_object()) {
for (key, diff_val) in diff_obj {
base_obj.insert(key.clone(), diff_val.clone());
}
} else {
*base = diff.clone();
}
Ok(())
}
fn restore_strings_from_ids(
value: &mut serde_json::Value,
reverse_dict: &HashMap<u16, String>,
) {
match value {
serde_json::Value::Number(n) => {
if let Some(id) = n.as_u64() {
if let Some(string) = reverse_dict.get(&(id as u16)) {
*value = serde_json::Value::String(string.clone());
}
}
}
serde_json::Value::Array(arr) => {
for item in arr {
Self::restore_strings_from_ids(item, reverse_dict);
}
}
serde_json::Value::Object(obj) => {
for val in obj.values_mut() {
Self::restore_strings_from_ids(val, reverse_dict);
}
}
_ => {}
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaCompressedEvent {
pub event_id: String,
pub delta: EventDelta,
pub compression_type: DeltaCompressionType,
pub timestamp: DateTime<Utc>,
}
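/// The encoded difference between two event revisions. `Full` carries the
/// whole event (used when no previous state exists or a strategy cannot
/// apply). `Xor` and `Prefix` require the previous event to decode;
/// `Dictionary` and `Lz4` are self-contained.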
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventDelta {
Full(Box<StreamEvent>),
Xor(Vec<u8>),
Prefix(serde_json::Value),
Dictionary {
dictionary: HashMap<String, u16>,
compressed_event: serde_json::Value,
},
Lz4(Vec<u8>),
}
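/// Batches events and serializes them into a length-prefixed frame:
///
/// ```text
/// [count: u32 BE] [timestamp ms: u64 BE] ([event len: u32 BE] [event bytes])*
/// ```
///
/// Events accumulate in `add_event` until `batch_size` is reached, at which
/// point the batch is flushed automatically.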
pub struct StreamingSerializer {
serializer: EventSerializer,
delta_compressor: Option<DeltaCompressor>,
batch_size: usize,
current_batch: Vec<StreamEvent>,
}
impl StreamingSerializer {
pub fn new(serializer: EventSerializer, batch_size: usize) -> Self {
Self {
serializer,
delta_compressor: None,
batch_size,
current_batch: Vec::new(),
}
}
    pub fn with_delta_compression(
        mut self,
        compression_type: DeltaCompressionType,
        max_states: usize,
    ) -> Self {
        // Note: the compressor is stored but not yet wired into the batch
        // path; `add_event` currently serializes events without deltas.
        self.delta_compressor = Some(DeltaCompressor::new(compression_type, max_states));
        self
    }
pub async fn add_event(&mut self, event: StreamEvent) -> Result<Option<Bytes>> {
self.current_batch.push(event);
if self.current_batch.len() >= self.batch_size {
self.flush_batch().await
} else {
Ok(None)
}
}
pub async fn flush_batch(&mut self) -> Result<Option<Bytes>> {
if self.current_batch.is_empty() {
return Ok(None);
}
let batch = std::mem::take(&mut self.current_batch);
let serialized = self.serialize_batch(&batch).await?;
Ok(Some(serialized))
}
async fn serialize_batch(&self, batch: &[StreamEvent]) -> Result<Bytes> {
let mut buffer = BytesMut::new();
buffer.put_u32(batch.len() as u32);
buffer.put_u64(chrono::Utc::now().timestamp_millis() as u64);
for event in batch {
let event_data = self.serializer.serialize(event).await?;
buffer.put_u32(event_data.len() as u32);
buffer.put(event_data);
}
Ok(buffer.freeze())
}
    pub async fn deserialize_batch(&self, data: &[u8]) -> Result<Vec<StreamEvent>> {
        if data.len() < 12 {
            return Err(anyhow!("Batch header truncated"));
        }
        let mut cursor = std::io::Cursor::new(data);
        let mut events = Vec::new();
        let batch_size = cursor.get_u32();
        let _timestamp = cursor.get_u64();
        for _ in 0..batch_size {
            if data.len().saturating_sub(cursor.position() as usize) < 4 {
                return Err(anyhow!("Batch truncated before event length"));
            }
            let event_size = cursor.get_u32() as usize;
            let start = cursor.position() as usize;
            let end = start
                .checked_add(event_size)
                .filter(|&end| end <= data.len())
                .ok_or_else(|| anyhow!("Batch event length out of bounds"))?;
            let event_data = &data[start..end];
            cursor.advance(event_size);
let event = self.serializer.deserialize(event_data).await?;
events.push(event);
}
Ok(events)
}
pub fn create_batch_stream(
&self,
events: impl Stream<Item = StreamEvent> + Send + 'static,
) -> BoxStream<'static, Result<Bytes>> {
let serializer = self.serializer.clone();
let batch_size = self.batch_size;
Box::pin(events.chunks(batch_size).then(move |chunk| {
let serializer = serializer.clone();
async move {
let streaming_serializer = StreamingSerializer::new(serializer, batch_size);
streaming_serializer.serialize_batch(&chunk).await
}
}))
}
}
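/// Version-2 binary framing with optional LZ4 compression and CRC32
/// checksums:
///
/// ```text
/// [b"BIN2"] [version: u8] [flags: u8] [crc32: u32 BE]? [len: u32 BE] [payload]
/// ```
///
/// Flag bit 0 marks compression, bit 1 marks a checksum over the (possibly
/// compressed) payload. Oversized frames can be split into `CHNK`-prefixed
/// chunks via `serialize_streaming`.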
pub struct EnhancedBinaryFormat {
version: u8,
enable_compression: bool,
enable_checksums: bool,
chunk_size: usize,
}
impl EnhancedBinaryFormat {
pub fn new() -> Self {
        Self {
            version: 2,
            enable_compression: true,
            enable_checksums: true,
            chunk_size: 8192, // 8 KiB streaming chunks
        }
}
pub fn with_compression(mut self, enable: bool) -> Self {
self.enable_compression = enable;
self
}
pub fn with_checksums(mut self, enable: bool) -> Self {
self.enable_checksums = enable;
self
}
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size;
self
}
pub async fn serialize(&self, event: &StreamEvent) -> Result<Bytes> {
let mut buffer = BytesMut::new();
buffer.put(&b"BIN2"[..]); buffer.put_u8(self.version);
buffer.put_u8(self.get_flags());
let event_json = serde_json::to_vec(event)?;
let data = if self.enable_compression {
oxiarc_lz4::compress(&event_json)
.map_err(|e| anyhow!("LZ4 compression failed: {}", e))?
} else {
event_json
};
if self.enable_checksums {
let checksum = crc32fast::hash(&data);
buffer.put_u32(checksum);
}
buffer.put_u32(data.len() as u32);
buffer.put(&data[..]);
Ok(buffer.freeze())
}
pub async fn deserialize(&self, data: &[u8]) -> Result<StreamEvent> {
let mut cursor = std::io::Cursor::new(data);
let mut magic = [0u8; 4];
cursor.read_exact(&mut magic)?;
if &magic != b"BIN2" {
return Err(anyhow!("Invalid magic bytes for enhanced binary format"));
}
let version = cursor.get_u8();
if version != self.version {
return Err(anyhow!(
"Unsupported enhanced binary format version: {}",
version
));
}
let flags = cursor.get_u8();
let has_compression = (flags & 0x01) != 0;
let has_checksum = (flags & 0x02) != 0;
let expected_checksum = if has_checksum {
Some(cursor.get_u32())
} else {
None
};
let data_len = cursor.get_u32() as usize;
let mut event_data = vec![0u8; data_len];
cursor.read_exact(&mut event_data)?;
if let Some(expected) = expected_checksum {
let actual = crc32fast::hash(&event_data);
if actual != expected {
return Err(anyhow!(
"Checksum mismatch: expected {}, got {}",
expected,
actual
));
}
}
let decompressed = if has_compression {
oxiarc_lz4::decompress(&event_data, 100 * 1024 * 1024)
.map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?
} else {
event_data
};
let event = serde_json::from_slice(&decompressed)?;
Ok(event)
}
pub async fn serialize_streaming(&self, event: &StreamEvent) -> Result<Vec<Bytes>> {
let serialized = self.serialize(event).await?;
let mut chunks = Vec::new();
if serialized.len() <= self.chunk_size {
chunks.push(serialized);
} else {
            let chunk_count = serialized.len().div_ceil(self.chunk_size);
for i in 0..chunk_count {
let start = i * self.chunk_size;
let end = std::cmp::min(start + self.chunk_size, serialized.len());
let mut chunk_buffer = BytesMut::new();
chunk_buffer.put(&b"CHNK"[..]); chunk_buffer.put_u32(i as u32); chunk_buffer.put_u32(chunk_count as u32); chunk_buffer.put_u32((end - start) as u32); chunk_buffer.put(&serialized[start..end]);
chunks.push(chunk_buffer.freeze());
}
}
Ok(chunks)
}
pub async fn deserialize_streaming(&self, chunks: Vec<Bytes>) -> Result<StreamEvent> {
        if chunks.is_empty() {
            return Err(anyhow!("No chunks provided"));
        }
        if chunks.len() == 1 && !chunks[0].starts_with(b"CHNK") {
return self.deserialize(&chunks[0]).await;
}
let mut chunk_data: BTreeMap<u32, Vec<u8>> = BTreeMap::new();
let mut total_chunks = 0;
for chunk in chunks {
if !chunk.starts_with(b"CHNK") {
return Err(anyhow!("Invalid chunk format"));
}
let mut cursor = std::io::Cursor::new(&chunk[4..]);
let chunk_index = cursor.get_u32();
let chunk_count = cursor.get_u32();
let chunk_size = cursor.get_u32() as usize;
total_chunks = chunk_count;
let data = chunk[16..16 + chunk_size].to_vec();
chunk_data.insert(chunk_index, data);
}
if chunk_data.len() != total_chunks as usize {
return Err(anyhow!(
"Missing chunks: got {}, expected {}",
chunk_data.len(),
total_chunks
));
}
let mut reassembled = Vec::new();
for (_index, data) in chunk_data {
reassembled.extend(data);
}
self.deserialize(&reassembled).await
}
fn get_flags(&self) -> u8 {
let mut flags = 0u8;
if self.enable_compression {
flags |= 0x01;
}
if self.enable_checksums {
flags |= 0x02;
}
flags
}
}
impl Default for EnhancedBinaryFormat {
fn default() -> Self {
Self::new()
}
}