1use anyhow::{anyhow, Result};
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18use crate::StreamEvent;
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22pub enum SerializationFormat {
23 Json,
25 Protobuf,
27 Avro,
29 Binary,
31 MessagePack,
33 Cbor,
35}
36
37impl SerializationFormat {
38 pub fn magic_bytes(&self) -> &[u8] {
40 match self {
41 SerializationFormat::Json => b"JSON",
42 SerializationFormat::Protobuf => b"PB03",
43 SerializationFormat::Avro => b"Obj\x01",
44 SerializationFormat::Binary => b"BIN1",
45 SerializationFormat::MessagePack => b"MSGP",
46 SerializationFormat::Cbor => b"CBOR",
47 }
48 }
49
50 pub fn detect(data: &[u8]) -> Option<Self> {
52 if data.len() < 4 {
53 return None;
54 }
55
56 let magic = &data[0..4];
57 match magic {
58 b"JSON" => Some(SerializationFormat::Json),
59 b"PB03" => Some(SerializationFormat::Protobuf),
60 b"Obj\x01" => Some(SerializationFormat::Avro),
61 b"BIN1" => Some(SerializationFormat::Binary),
62 b"MSGP" => Some(SerializationFormat::MessagePack),
63 b"CBOR" => Some(SerializationFormat::Cbor),
64 _ => {
65 if data.starts_with(b"{") || data.starts_with(b"[") {
67 Some(SerializationFormat::Json)
68 } else {
69 None
70 }
71 }
72 }
73 }
74}
75
/// Knobs for event serialization.
///
/// NOTE(review): the serializer that reads these flags is outside this chunk; the
/// per-field descriptions below follow the field names and should be confirmed there.
#[derive(Clone, Debug)]
pub struct SerializerOptions {
    /// Embed the schema id in the serialized output.
    pub include_schema_id: bool,
    /// Prefix output with the format's 4-byte tag (see `SerializationFormat::magic_bytes`).
    pub include_magic_bytes: bool,
    /// Pretty-print JSON output.
    pub pretty_json: bool,
    /// Validate events against their schema.
    pub validate_schema: bool,
    /// Upper bound on serialized size (presumably bytes); `None` means unbounded.
    pub max_size: Option<usize>,
}

impl Default for SerializerOptions {
    /// Defaults: schema id and magic bytes on, compact JSON, validation on, 1 MiB cap.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        Self {
            pretty_json: false,
            include_schema_id: true,
            include_magic_bytes: true,
            validate_schema: true,
            max_size: Some(ONE_MIB),
        }
    }
}
102
103pub struct SchemaRegistry {
105 pub(crate) schemas: Arc<RwLock<HashMap<String, Schema>>>,
106 pub(crate) evolution_rules: EvolutionRules,
108}
109
110#[derive(Debug, Clone)]
112pub struct Schema {
113 pub id: String,
114 pub version: u32,
115 pub format: SerializationFormat,
116 pub definition: SchemaDefinition,
117 pub compatibility: CompatibilityMode,
118}
119
120#[derive(Debug, Clone)]
122pub enum SchemaDefinition {
123 JsonSchema(serde_json::Value),
125 ProtobufDescriptor(Vec<u8>),
127 AvroSchema(String),
129 Custom(HashMap<String, serde_json::Value>),
131}
132
/// Compatibility policy a schema imposes on its replacements.
///
/// `SchemaRegistry::validate_evolution` reads this from the *old* schema to decide
/// which compatibility checks to run.
#[derive(Clone, Copy, Debug)]
pub enum CompatibilityMode {
    /// Run no checks; any replacement is accepted.
    None,
    /// Run only the backward-compatibility check
    /// (conventionally: readers using the new schema can read data written with the old one).
    Backward,
    /// Run only the forward-compatibility check
    /// (conventionally: readers using the old schema can read data written with the new one).
    Forward,
    /// Run both checks, backward first.
    Full,
}
145
/// Policy knobs for schema evolution.
///
/// NOTE(review): `SchemaRegistry`'s compatibility checks do not read these rules yet.
#[derive(Clone, Debug)]
pub struct EvolutionRules {
    /// Whether a new schema version may add fields.
    pub allow_field_addition: bool,
    /// Whether a new schema version may remove fields.
    pub allow_field_removal: bool,
    /// Whether a field's type may be promoted to a wider one.
    pub allow_type_promotion: bool,
    /// Names of fields every schema version is expected to keep.
    pub required_fields: Vec<String>,
}

impl Default for EvolutionRules {
    /// Defaults: additions and type promotions allowed, removals forbidden,
    /// and `event_id` / `timestamp` required.
    fn default() -> Self {
        let required_fields = ["event_id", "timestamp"]
            .iter()
            .map(|name| String::from(*name))
            .collect();

        Self {
            allow_field_addition: true,
            allow_field_removal: false,
            allow_type_promotion: true,
            required_fields,
        }
    }
}
169
170impl SchemaRegistry {
171 pub fn new(evolution_rules: EvolutionRules) -> Self {
173 Self {
174 schemas: Arc::new(RwLock::new(HashMap::new())),
175 evolution_rules,
176 }
177 }
178
179 pub async fn register_schema(&self, schema: Schema) -> Result<String> {
181 let schema_id = schema.id.clone();
182 self.schemas.write().await.insert(schema_id.clone(), schema);
183 Ok(schema_id)
184 }
185
186 pub async fn get_schema(&self, id: &str) -> Result<Schema> {
188 self.schemas
189 .read()
190 .await
191 .get(id)
192 .cloned()
193 .ok_or_else(|| anyhow!("Schema {id} not found"))
194 }
195
196 pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
198 Ok("default-v1".to_string())
200 }
201
202 pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
204 match old_schema.compatibility {
205 CompatibilityMode::None => Ok(()),
206 CompatibilityMode::Backward => {
207 self.check_backward_compatibility(old_schema, new_schema)
209 }
210 CompatibilityMode::Forward => {
211 self.check_forward_compatibility(old_schema, new_schema)
213 }
214 CompatibilityMode::Full => {
215 self.check_backward_compatibility(old_schema, new_schema)?;
217 self.check_forward_compatibility(old_schema, new_schema)
218 }
219 }
220 }
221
222 fn check_backward_compatibility(
224 &self,
225 _old_schema: &Schema,
226 _new_schema: &Schema,
227 ) -> Result<()> {
228 Ok(())
230 }
231
232 fn check_forward_compatibility(
234 &self,
235 _old_schema: &Schema,
236 _new_schema: &Schema,
237 ) -> Result<()> {
238 Ok(())
240 }
241
242 pub async fn get_avro_schema_for_event(
244 &self,
245 _event: &StreamEvent,
246 ) -> Result<apache_avro::Schema> {
247 Ok(get_default_avro_schema())
249 }
250}
251
252#[derive(Debug, Clone)]
256pub struct ProtobufStreamEvent {
257 pub event_type: String,
258 pub data: Vec<u8>,
259 pub metadata: Vec<u8>,
260}
261
262impl ProtobufStreamEvent {
263 pub fn from_json(json: &serde_json::Value) -> Result<Self> {
265 let event_type = "StreamEvent".to_string(); let data = serde_json::to_vec(json)?;
270
271 let metadata = Vec::new();
273
274 Ok(Self {
275 event_type,
276 data,
277 metadata,
278 })
279 }
280
281 pub fn to_json(&self) -> Result<serde_json::Value> {
283 serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
284 }
285
286 pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
288 buf.extend_from_slice(&self.data);
290 Ok(())
291 }
292
293 pub fn decode(data: &[u8]) -> Result<Self> {
295 Ok(Self {
297 event_type: "StreamEvent".to_string(),
298 data: data.to_vec(),
299 metadata: Vec::new(),
300 })
301 }
302}
303
304impl prost::Message for ProtobufStreamEvent {
305 fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
306 buf.put_slice(&self.data);
308 }
309
310 fn merge_field(
311 &mut self,
312 _tag: u32,
313 _wire_type: prost::encoding::WireType,
314 _buf: &mut impl prost::bytes::Buf,
315 _ctx: prost::encoding::DecodeContext,
316 ) -> Result<(), prost::DecodeError> {
317 Ok(())
318 }
319
320 fn encoded_len(&self) -> usize {
321 self.data.len()
322 }
323
324 fn clear(&mut self) {
325 self.data.clear();
326 self.metadata.clear();
327 }
328}
329
330pub fn get_default_avro_schema() -> apache_avro::Schema {
332 let schema_str = r#"
333 {
334 "type": "record",
335 "name": "StreamEvent",
336 "fields": [
337 {"name": "event_type", "type": "string"},
338 {"name": "data", "type": "bytes"},
339 {"name": "metadata", "type": ["null", "bytes"], "default": null}
340 ]
341 }
342 "#;
343
344 apache_avro::Schema::parse_str(schema_str).expect("Failed to parse default Avro schema")
345}
346
347pub fn to_avro_value(
349 event: &StreamEvent,
350 _schema: &apache_avro::Schema,
351) -> Result<apache_avro::types::Value> {
352 let json_data = serde_json::to_vec(event)?;
354
355 let fields = vec![
356 (
357 "event_type".to_string(),
358 apache_avro::types::Value::String("StreamEvent".to_string()),
359 ),
360 (
361 "data".to_string(),
362 apache_avro::types::Value::Bytes(json_data),
363 ),
364 (
365 "metadata".to_string(),
366 apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)),
367 ),
368 ];
369
370 Ok(apache_avro::types::Value::Record(fields))
371}
372
373pub fn from_avro_value(
375 value: &apache_avro::types::Value,
376 _schema: &apache_avro::Schema,
377) -> Result<StreamEvent> {
378 match value {
379 apache_avro::types::Value::Record(fields) => {
380 for (name, field_value) in fields {
382 if name == "data" {
383 if let apache_avro::types::Value::Bytes(bytes) = field_value {
384 let event: StreamEvent = serde_json::from_slice(bytes)?;
385 return Ok(event);
386 }
387 }
388 }
389 Err(anyhow!("No data field found in Avro record"))
390 }
391 _ => Err(anyhow!("Expected Avro record, got {:?}", value)),
392 }
393}
394
395#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
397pub enum DeltaCompressionType {
398 Xor,
400 Prefix,
402 Dictionary,
404 Lz4Delta,
406}
407
408#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct DeltaCompressedEvent {
411 pub event_id: String,
412 pub delta: EventDelta,
413 pub compression_type: DeltaCompressionType,
414 pub timestamp: DateTime<Utc>,
415}
416
417#[derive(Debug, Clone, Serialize, Deserialize)]
419pub enum EventDelta {
420 Full(Box<StreamEvent>),
422 Xor(Vec<u8>),
424 Prefix(serde_json::Value),
426 Dictionary {
428 dictionary: HashMap<String, u16>,
429 compressed_event: serde_json::Value,
430 },
431 Lz4(Vec<u8>),
433}