use anyhow::{anyhow, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::StreamEvent;
/// Payload encodings this module knows how to tag and recognise.
///
/// Each variant maps to a 4-byte tag returned by `SerializationFormat::magic_bytes`,
/// and `SerializationFormat::detect` maps a tagged buffer back to its variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SerializationFormat {
    /// JSON; tagged `JSON`, and also recognised untagged when the buffer starts with `{` or `[`.
    Json,
    /// Protocol Buffers; tagged `PB03`.
    Protobuf,
    /// Apache Avro; tagged `Obj\x01`.
    Avro,
    /// Generic binary encoding; tagged `BIN1`.
    Binary,
    /// MessagePack; tagged `MSGP`.
    MessagePack,
    /// CBOR; tagged `CBOR`.
    Cbor,
}
impl SerializationFormat {
pub fn magic_bytes(&self) -> &[u8] {
match self {
SerializationFormat::Json => b"JSON",
SerializationFormat::Protobuf => b"PB03",
SerializationFormat::Avro => b"Obj\x01",
SerializationFormat::Binary => b"BIN1",
SerializationFormat::MessagePack => b"MSGP",
SerializationFormat::Cbor => b"CBOR",
}
}
pub fn detect(data: &[u8]) -> Option<Self> {
if data.len() < 4 {
return None;
}
let magic = &data[0..4];
match magic {
b"JSON" => Some(SerializationFormat::Json),
b"PB03" => Some(SerializationFormat::Protobuf),
b"Obj\x01" => Some(SerializationFormat::Avro),
b"BIN1" => Some(SerializationFormat::Binary),
b"MSGP" => Some(SerializationFormat::MessagePack),
b"CBOR" => Some(SerializationFormat::Cbor),
_ => {
if data.starts_with(b"{") || data.starts_with(b"[") {
Some(SerializationFormat::Json)
} else {
None
}
}
}
}
}
/// Knobs controlling how events are serialized.
#[derive(Debug, Clone)]
pub struct SerializerOptions {
    /// Whether to embed the schema id alongside the payload.
    pub include_schema_id: bool,
    /// Whether to prefix the payload with the format's magic bytes
    /// (see `SerializationFormat::magic_bytes`).
    pub include_magic_bytes: bool,
    /// Whether JSON output should be pretty-printed.
    pub pretty_json: bool,
    /// Whether payloads should be validated against their schema.
    pub validate_schema: bool,
    /// Optional size limit; `None` disables the limit.
    /// NOTE(review): presumably measured in bytes of serialized output — confirm with the serializer.
    pub max_size: Option<usize>,
}
impl Default for SerializerOptions {
fn default() -> Self {
Self {
include_schema_id: true,
include_magic_bytes: true,
pretty_json: false,
validate_schema: true,
max_size: Some(1024 * 1024), }
}
}
/// In-memory store of schemas, keyed by `Schema::id`.
pub struct SchemaRegistry {
    /// Registered schemas by id, behind an async `RwLock` wrapped in an `Arc`
    /// (presumably so the map can be shared across tasks — confirm with callers).
    pub(crate) schemas: Arc<RwLock<HashMap<String, Schema>>>,
    /// Evolution policy supplied at construction.
    /// NOTE(review): not read by the compatibility checks in this file's `impl SchemaRegistry`.
    pub(crate) evolution_rules: EvolutionRules,
}
/// A registered schema together with its version and compatibility policy.
#[derive(Debug, Clone)]
pub struct Schema {
    /// Registry key for this schema.
    pub id: String,
    /// Schema version number.
    pub version: u32,
    /// Encoding this schema describes.
    pub format: SerializationFormat,
    /// The schema body itself.
    pub definition: SchemaDefinition,
    /// Compatibility mode enforced when this schema is the *old* side of an evolution check.
    pub compatibility: CompatibilityMode,
}
/// The body of a schema, in one of several representations.
#[derive(Debug, Clone)]
pub enum SchemaDefinition {
    /// A JSON value, presumably a JSON Schema document.
    JsonSchema(serde_json::Value),
    /// Raw bytes of a protobuf descriptor.
    /// NOTE(review): likely a serialized descriptor set — confirm the exact message type.
    ProtobufDescriptor(Vec<u8>),
    /// Avro schema as text (presumably its JSON form).
    AvroSchema(String),
    /// Free-form key/value definition for formats without a dedicated variant.
    Custom(HashMap<String, serde_json::Value>),
}
/// Which compatibility checks `SchemaRegistry::validate_evolution` runs.
#[derive(Debug, Clone, Copy)]
pub enum CompatibilityMode {
    /// No checks; any evolution is accepted.
    None,
    /// Run the backward-compatibility check.
    Backward,
    /// Run the forward-compatibility check.
    Forward,
    /// Run the backward check, then the forward check.
    Full,
}
/// Policy knobs for schema evolution held by a `SchemaRegistry`.
///
/// NOTE(review): the compatibility checks in this file do not consult these
/// fields yet; the meanings below follow the field names and should be confirmed.
#[derive(Debug, Clone)]
pub struct EvolutionRules {
    /// Whether a new schema version may add fields.
    pub allow_field_addition: bool,
    /// Whether a new schema version may drop fields.
    pub allow_field_removal: bool,
    /// Whether a field's type may be widened between versions.
    pub allow_type_promotion: bool,
    /// Field names that every schema version must keep.
    pub required_fields: Vec<String>,
}
impl Default for EvolutionRules {
fn default() -> Self {
Self {
allow_field_addition: true,
allow_field_removal: false,
allow_type_promotion: true,
required_fields: vec!["event_id".to_string(), "timestamp".to_string()],
}
}
}
impl SchemaRegistry {
pub fn new(evolution_rules: EvolutionRules) -> Self {
Self {
schemas: Arc::new(RwLock::new(HashMap::new())),
evolution_rules,
}
}
pub async fn register_schema(&self, schema: Schema) -> Result<String> {
let schema_id = schema.id.clone();
self.schemas.write().await.insert(schema_id.clone(), schema);
Ok(schema_id)
}
pub async fn get_schema(&self, id: &str) -> Result<Schema> {
self.schemas
.read()
.await
.get(id)
.cloned()
.ok_or_else(|| anyhow!("Schema {id} not found"))
}
pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
Ok("default-v1".to_string())
}
pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
match old_schema.compatibility {
CompatibilityMode::None => Ok(()),
CompatibilityMode::Backward => {
self.check_backward_compatibility(old_schema, new_schema)
}
CompatibilityMode::Forward => {
self.check_forward_compatibility(old_schema, new_schema)
}
CompatibilityMode::Full => {
self.check_backward_compatibility(old_schema, new_schema)?;
self.check_forward_compatibility(old_schema, new_schema)
}
}
}
fn check_backward_compatibility(
&self,
_old_schema: &Schema,
_new_schema: &Schema,
) -> Result<()> {
Ok(())
}
fn check_forward_compatibility(
&self,
_old_schema: &Schema,
_new_schema: &Schema,
) -> Result<()> {
Ok(())
}
pub async fn get_avro_schema_for_event(
&self,
_event: &StreamEvent,
) -> Result<apache_avro::Schema> {
Ok(get_default_avro_schema())
}
}
/// Envelope used to carry a stream event through the protobuf code path.
#[derive(Debug, Clone)]
pub struct ProtobufStreamEvent {
    /// Event type name; the constructors in this file always set `"StreamEvent"`.
    pub event_type: String,
    /// Event payload; `from_json` fills it with the JSON-serialized event.
    pub data: Vec<u8>,
    /// Extra metadata bytes; left empty by the constructors in this file.
    pub metadata: Vec<u8>,
}
impl ProtobufStreamEvent {
pub fn from_json(json: &serde_json::Value) -> Result<Self> {
let event_type = "StreamEvent".to_string();
let data = serde_json::to_vec(json)?;
let metadata = Vec::new();
Ok(Self {
event_type,
data,
metadata,
})
}
pub fn to_json(&self) -> Result<serde_json::Value> {
serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
}
pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
buf.extend_from_slice(&self.data);
Ok(())
}
pub fn decode(data: &[u8]) -> Result<Self> {
Ok(Self {
event_type: "StreamEvent".to_string(),
data: data.to_vec(),
metadata: Vec::new(),
})
}
}
/// Real protobuf wire encoding for the envelope, equivalent to the proto3 message
///
/// ```text
/// message ProtobufStreamEvent {
///   string event_type = 1;
///   bytes  data       = 2;
///   bytes  metadata   = 3;
/// }
/// ```
///
/// The previous implementation wrote `data` unframed and ignored every field on
/// decode. As a result, `Message::encode`/`Message::merge` could not round-trip,
/// and payload bytes were misread as field keys. The inherent
/// `ProtobufStreamEvent::encode`/`decode` methods are unaffected; inherent
/// methods take precedence over trait methods at call sites.
impl prost::Message for ProtobufStreamEvent {
    fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
        // proto3: default (empty) scalar fields are omitted from the wire.
        if !self.event_type.is_empty() {
            prost::encoding::string::encode(1, &self.event_type, buf);
        }
        if !self.data.is_empty() {
            prost::encoding::bytes::encode(2, &self.data, buf);
        }
        if !self.metadata.is_empty() {
            prost::encoding::bytes::encode(3, &self.metadata, buf);
        }
    }

    fn merge_field(
        &mut self,
        tag: u32,
        wire_type: prost::encoding::WireType,
        buf: &mut impl prost::bytes::Buf,
        ctx: prost::encoding::DecodeContext,
    ) -> Result<(), prost::DecodeError> {
        match tag {
            1 => prost::encoding::string::merge(wire_type, &mut self.event_type, buf, ctx),
            2 => prost::encoding::bytes::merge(wire_type, &mut self.data, buf, ctx),
            3 => prost::encoding::bytes::merge(wire_type, &mut self.metadata, buf, ctx),
            // Unknown fields must be consumed so decoding stays aligned.
            _ => prost::encoding::skip_field(wire_type, tag, buf, ctx),
        }
    }

    fn encoded_len(&self) -> usize {
        let event_type_len = if self.event_type.is_empty() {
            0
        } else {
            prost::encoding::string::encoded_len(1, &self.event_type)
        };
        let data_len = if self.data.is_empty() {
            0
        } else {
            prost::encoding::bytes::encoded_len(2, &self.data)
        };
        let metadata_len = if self.metadata.is_empty() {
            0
        } else {
            prost::encoding::bytes::encoded_len(3, &self.metadata)
        };
        event_type_len + data_len + metadata_len
    }

    /// Resets every field to its protobuf default (empty).
    fn clear(&mut self) {
        self.event_type.clear();
        self.data.clear();
        self.metadata.clear();
    }
}
/// Returns the Avro record schema used for stream events.
///
/// The schema has three fields: `event_type` (string), `data` (bytes), and
/// `metadata` (nullable bytes, default `null`).
///
/// The JSON definition is parsed once and cached for the life of the process;
/// each call returns a clone of the cached schema. Previously it was re-parsed
/// on every call, including once per event via
/// `SchemaRegistry::get_avro_schema_for_event`.
///
/// # Panics
/// Only if the embedded schema literal is invalid. That is a programming error,
/// not a runtime condition.
pub fn get_default_avro_schema() -> apache_avro::Schema {
    const SCHEMA_JSON: &str = r#"
{
  "type": "record",
  "name": "StreamEvent",
  "fields": [
    {"name": "event_type", "type": "string"},
    {"name": "data", "type": "bytes"},
    {"name": "metadata", "type": ["null", "bytes"], "default": null}
  ]
}
"#;
    static DEFAULT_SCHEMA: std::sync::OnceLock<apache_avro::Schema> = std::sync::OnceLock::new();

    DEFAULT_SCHEMA
        .get_or_init(|| {
            apache_avro::Schema::parse_str(SCHEMA_JSON)
                .expect("Failed to parse default Avro schema")
        })
        .clone()
}
/// Converts `event` into an Avro record value matching the default event schema.
///
/// The whole event is serialized to JSON and stored in the `data` bytes field.
/// `event_type` is always `"StreamEvent"`, and `metadata` is the `null` branch
/// (index 0) of its `["null", "bytes"]` union. `_schema` is currently ignored.
///
/// # Errors
/// Propagates any `serde_json` serialization error.
pub fn to_avro_value(
    event: &StreamEvent,
    _schema: &apache_avro::Schema,
) -> Result<apache_avro::types::Value> {
    use apache_avro::types::Value;

    let payload = serde_json::to_vec(event)?;

    let mut record = Vec::with_capacity(3);
    record.push((
        "event_type".to_string(),
        Value::String("StreamEvent".to_string()),
    ));
    record.push(("data".to_string(), Value::Bytes(payload)));
    record.push((
        "metadata".to_string(),
        Value::Union(0, Box::new(Value::Null)),
    ));

    Ok(Value::Record(record))
}
/// Reconstructs a `StreamEvent` from an Avro record produced by `to_avro_value`.
///
/// The event is read from the record's `data` field, which must hold JSON
/// bytes. All other fields are ignored, and `_schema` is currently unused.
///
/// # Errors
/// - `value` is not a record.
/// - The record has no `data` field.
/// - `data` is present but not `bytes`. This is now reported as such instead of
///   being misreported as a missing field.
/// - The bytes are not a valid JSON `StreamEvent`.
pub fn from_avro_value(
    value: &apache_avro::types::Value,
    _schema: &apache_avro::Schema,
) -> Result<StreamEvent> {
    use apache_avro::types::Value;

    let fields = match value {
        Value::Record(fields) => fields,
        other => return Err(anyhow!("Expected Avro record, got {:?}", other)),
    };

    let data = fields
        .iter()
        .find(|(name, _)| name == "data")
        .map(|(_, field)| field)
        .ok_or_else(|| anyhow!("No data field found in Avro record"))?;

    match data {
        Value::Bytes(bytes) => Ok(serde_json::from_slice(bytes)?),
        other => Err(anyhow!(
            "Avro field `data` must be bytes, got {:?}",
            other
        )),
    }
}
/// Strategy used to produce a `DeltaCompressedEvent`.
///
/// NOTE(review): the encoders are not in this file; the per-variant notes
/// follow the names and the matching `EventDelta` payloads — confirm.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum DeltaCompressionType {
    /// Byte-wise XOR delta (payload: `EventDelta::Xor`).
    Xor,
    /// Prefix/shared-structure delta (payload: `EventDelta::Prefix`).
    Prefix,
    /// Dictionary substitution (payload: `EventDelta::Dictionary`).
    Dictionary,
    /// LZ4-compressed delta (payload: `EventDelta::Lz4`).
    Lz4Delta,
}
/// An event stored as a delta together with the method used to compute it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaCompressedEvent {
    /// Identifier of the event this delta represents.
    pub event_id: String,
    /// The delta payload (or the full event).
    pub delta: EventDelta,
    /// Which compression strategy produced `delta`.
    pub compression_type: DeltaCompressionType,
    /// Timestamp attached to the compressed record.
    /// NOTE(review): unclear whether this is the event time or the compression time — confirm.
    pub timestamp: DateTime<Utc>,
}
/// Payload of a `DeltaCompressedEvent`.
///
/// NOTE(review): the producers and consumers of these variants are not in this
/// file; the descriptions follow the variant names and types — confirm.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EventDelta {
    /// The complete event, uncompressed (boxed to keep the enum small).
    Full(Box<StreamEvent>),
    /// XOR-delta bytes, presumably relative to a previous event.
    Xor(Vec<u8>),
    /// JSON fragment for a prefix-style delta.
    Prefix(serde_json::Value),
    /// Dictionary-encoded event.
    Dictionary {
        /// Mapping from strings to their 16-bit codes.
        dictionary: HashMap<String, u16>,
        /// The event with dictionary codes substituted in.
        compressed_event: serde_json::Value,
    },
    /// LZ4-compressed bytes.
    Lz4(Vec<u8>),
}