Skip to main content

oxirs_stream/
serialization_types.rs

1//! Serialization type definitions and schema management.
2//!
3//! This module contains the core types used across the serialization subsystem:
4//! - [`SerializationFormat`] enum and format detection
5//! - [`SerializerOptions`] and configuration
6//! - [`SchemaRegistry`], [`Schema`], [`SchemaDefinition`], [`CompatibilityMode`]
7//! - [`EvolutionRules`]
8//! - [`DeltaCompressionType`], [`DeltaCompressedEvent`], [`EventDelta`]
9//! - [`ProtobufStreamEvent`] and Avro helper functions
10
11use anyhow::{anyhow, Result};
12use chrono::{DateTime, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18use crate::StreamEvent;
19
20/// Serialization format types
21#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22pub enum SerializationFormat {
23    /// JSON format (human-readable)
24    Json,
25    /// Protocol Buffers (efficient binary)
26    Protobuf,
27    /// Apache Avro (schema-based)
28    Avro,
29    /// Custom binary format
30    Binary,
31    /// MessagePack format
32    MessagePack,
33    /// CBOR (Concise Binary Object Representation)
34    Cbor,
35}
36
37impl SerializationFormat {
38    /// Get format identifier bytes
39    pub fn magic_bytes(&self) -> &[u8] {
40        match self {
41            SerializationFormat::Json => b"JSON",
42            SerializationFormat::Protobuf => b"PB03",
43            SerializationFormat::Avro => b"Obj\x01",
44            SerializationFormat::Binary => b"BIN1",
45            SerializationFormat::MessagePack => b"MSGP",
46            SerializationFormat::Cbor => b"CBOR",
47        }
48    }
49
50    /// Detect format from magic bytes
51    pub fn detect(data: &[u8]) -> Option<Self> {
52        if data.len() < 4 {
53            return None;
54        }
55
56        let magic = &data[0..4];
57        match magic {
58            b"JSON" => Some(SerializationFormat::Json),
59            b"PB03" => Some(SerializationFormat::Protobuf),
60            b"Obj\x01" => Some(SerializationFormat::Avro),
61            b"BIN1" => Some(SerializationFormat::Binary),
62            b"MSGP" => Some(SerializationFormat::MessagePack),
63            b"CBOR" => Some(SerializationFormat::Cbor),
64            _ => {
65                // Try to detect JSON by checking for common patterns
66                if data.starts_with(b"{") || data.starts_with(b"[") {
67                    Some(SerializationFormat::Json)
68                } else {
69                    None
70                }
71            }
72        }
73    }
74}
75
/// Knobs controlling how events are serialized.
#[derive(Clone, Debug)]
pub struct SerializerOptions {
    /// Embed the schema ID in the serialized output.
    pub include_schema_id: bool,
    /// Prefix the output with the format's magic bytes.
    pub include_magic_bytes: bool,
    /// Emit indented, human-readable JSON.
    pub pretty_json: bool,
    /// Check events against their schema.
    pub validate_schema: bool,
    /// Maximum serialized size in bytes; `None` presumably disables the
    /// limit (confirm in the serializer).
    pub max_size: Option<usize>,
}

impl Default for SerializerOptions {
    /// Schema ID and magic bytes included, compact JSON, validation enabled,
    /// and a 1 MiB size cap.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        Self {
            pretty_json: false,
            include_schema_id: true,
            include_magic_bytes: true,
            validate_schema: true,
            max_size: Some(ONE_MIB),
        }
    }
}
102
103/// Schema registry for managing schemas
104pub struct SchemaRegistry {
105    pub(crate) schemas: Arc<RwLock<HashMap<String, Schema>>>,
106    /// Schema evolution rules
107    pub(crate) evolution_rules: EvolutionRules,
108}
109
110/// Schema definition
111#[derive(Debug, Clone)]
112pub struct Schema {
113    pub id: String,
114    pub version: u32,
115    pub format: SerializationFormat,
116    pub definition: SchemaDefinition,
117    pub compatibility: CompatibilityMode,
118}
119
120/// Schema definition types
121#[derive(Debug, Clone)]
122pub enum SchemaDefinition {
123    /// JSON Schema
124    JsonSchema(serde_json::Value),
125    /// Protobuf descriptor
126    ProtobufDescriptor(Vec<u8>),
127    /// Avro schema
128    AvroSchema(String),
129    /// Custom schema
130    Custom(HashMap<String, serde_json::Value>),
131}
132
/// How strictly [`SchemaRegistry::validate_evolution`] checks a schema change.
///
/// The mode is taken from the *old* schema, i.e. the version being replaced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompatibilityMode {
    /// No compatibility checking.
    None,
    /// The new schema must be able to read data written with the previous version.
    Backward,
    /// The previous schema must be able to read data written with the new version.
    Forward,
    /// Both [`CompatibilityMode::Backward`] and [`CompatibilityMode::Forward`] must hold.
    Full,
}
145
/// Constraints on how a schema may change from one version to the next.
#[derive(Clone, Debug)]
pub struct EvolutionRules {
    /// Whether a new version may introduce fields.
    pub allow_field_addition: bool,
    /// Whether a new version may drop existing fields.
    pub allow_field_removal: bool,
    /// Whether a field's type may be promoted.
    pub allow_type_promotion: bool,
    /// Names of fields treated as required.
    pub required_fields: Vec<String>,
}

impl Default for EvolutionRules {
    /// Field additions and type promotion allowed, field removal forbidden, and
    /// `event_id` plus `timestamp` marked as required.
    fn default() -> Self {
        let required_fields = ["event_id", "timestamp"]
            .iter()
            .map(|&name| String::from(name))
            .collect();
        Self {
            allow_field_removal: false,
            allow_field_addition: true,
            allow_type_promotion: true,
            required_fields,
        }
    }
}
169
170impl SchemaRegistry {
171    /// Create a new schema registry
172    pub fn new(evolution_rules: EvolutionRules) -> Self {
173        Self {
174            schemas: Arc::new(RwLock::new(HashMap::new())),
175            evolution_rules,
176        }
177    }
178
179    /// Register a schema
180    pub async fn register_schema(&self, schema: Schema) -> Result<String> {
181        let schema_id = schema.id.clone();
182        self.schemas.write().await.insert(schema_id.clone(), schema);
183        Ok(schema_id)
184    }
185
186    /// Get schema by ID
187    pub async fn get_schema(&self, id: &str) -> Result<Schema> {
188        self.schemas
189            .read()
190            .await
191            .get(id)
192            .cloned()
193            .ok_or_else(|| anyhow!("Schema {id} not found"))
194    }
195
196    /// Get schema ID for an event
197    pub async fn get_schema_id_for_event(&self, _event: &StreamEvent) -> Result<String> {
198        // In a real implementation, this would determine the appropriate schema
199        Ok("default-v1".to_string())
200    }
201
202    /// Validate schema evolution
203    pub async fn validate_evolution(&self, old_schema: &Schema, new_schema: &Schema) -> Result<()> {
204        match old_schema.compatibility {
205            CompatibilityMode::None => Ok(()),
206            CompatibilityMode::Backward => {
207                // Check if new schema can read old data
208                self.check_backward_compatibility(old_schema, new_schema)
209            }
210            CompatibilityMode::Forward => {
211                // Check if old schema can read new data
212                self.check_forward_compatibility(old_schema, new_schema)
213            }
214            CompatibilityMode::Full => {
215                // Check both directions
216                self.check_backward_compatibility(old_schema, new_schema)?;
217                self.check_forward_compatibility(old_schema, new_schema)
218            }
219        }
220    }
221
222    /// Check backward compatibility
223    fn check_backward_compatibility(
224        &self,
225        _old_schema: &Schema,
226        _new_schema: &Schema,
227    ) -> Result<()> {
228        // Implementation would check if new schema can read old data
229        Ok(())
230    }
231
232    /// Check forward compatibility
233    fn check_forward_compatibility(
234        &self,
235        _old_schema: &Schema,
236        _new_schema: &Schema,
237    ) -> Result<()> {
238        // Implementation would check if old schema can read new data
239        Ok(())
240    }
241
242    /// Get Avro schema for event
243    pub async fn get_avro_schema_for_event(
244        &self,
245        _event: &StreamEvent,
246    ) -> Result<apache_avro::Schema> {
247        // In practice, this would look up the appropriate schema
248        Ok(get_default_avro_schema())
249    }
250}
251
/// Simplified stand-in for a Protobuf message carrying a [`StreamEvent`].
///
/// Not generated from a `.proto` file; a production implementation would use
/// proper `.proto` definitions.
#[derive(Clone, Debug)]
pub struct ProtobufStreamEvent {
    /// Name of the event type.
    pub event_type: String,
    /// Serialized event payload.
    pub data: Vec<u8>,
    /// Auxiliary metadata bytes.
    pub metadata: Vec<u8>,
}
261
262impl ProtobufStreamEvent {
263    /// Convert from JSON value
264    pub fn from_json(json: &serde_json::Value) -> Result<Self> {
265        // Extract event type
266        let event_type = "StreamEvent".to_string(); // Simplified
267
268        // Serialize the entire JSON as data
269        let data = serde_json::to_vec(json)?;
270
271        // Empty metadata for now
272        let metadata = Vec::new();
273
274        Ok(Self {
275            event_type,
276            data,
277            metadata,
278        })
279    }
280
281    /// Convert to JSON value
282    pub fn to_json(&self) -> Result<serde_json::Value> {
283        serde_json::from_slice(&self.data).map_err(|e| anyhow!("Failed to parse JSON: {}", e))
284    }
285
286    /// Encode using prost
287    pub fn encode(&self, buf: &mut Vec<u8>) -> Result<()> {
288        // Simplified encoding - in practice use proper prost::Message
289        buf.extend_from_slice(&self.data);
290        Ok(())
291    }
292
293    /// Decode using prost
294    pub fn decode(data: &[u8]) -> Result<Self> {
295        // Simplified decoding - in practice use proper prost::Message
296        Ok(Self {
297            event_type: "StreamEvent".to_string(),
298            data: data.to_vec(),
299            metadata: Vec::new(),
300        })
301    }
302}
303
304impl prost::Message for ProtobufStreamEvent {
305    fn encode_raw(&self, buf: &mut impl prost::bytes::BufMut) {
306        // Simplified implementation
307        buf.put_slice(&self.data);
308    }
309
310    fn merge_field(
311        &mut self,
312        _tag: u32,
313        _wire_type: prost::encoding::WireType,
314        _buf: &mut impl prost::bytes::Buf,
315        _ctx: prost::encoding::DecodeContext,
316    ) -> Result<(), prost::DecodeError> {
317        Ok(())
318    }
319
320    fn encoded_len(&self) -> usize {
321        self.data.len()
322    }
323
324    fn clear(&mut self) {
325        self.data.clear();
326        self.metadata.clear();
327    }
328}
329
330/// Get default Avro schema for StreamEvent
331pub fn get_default_avro_schema() -> apache_avro::Schema {
332    let schema_str = r#"
333    {
334        "type": "record",
335        "name": "StreamEvent",
336        "fields": [
337            {"name": "event_type", "type": "string"},
338            {"name": "data", "type": "bytes"},
339            {"name": "metadata", "type": ["null", "bytes"], "default": null}
340        ]
341    }
342    "#;
343
344    apache_avro::Schema::parse_str(schema_str).expect("Failed to parse default Avro schema")
345}
346
347/// Convert StreamEvent to Avro value
348pub fn to_avro_value(
349    event: &StreamEvent,
350    _schema: &apache_avro::Schema,
351) -> Result<apache_avro::types::Value> {
352    // Simplified conversion - serialize to JSON then to bytes
353    let json_data = serde_json::to_vec(event)?;
354
355    let fields = vec![
356        (
357            "event_type".to_string(),
358            apache_avro::types::Value::String("StreamEvent".to_string()),
359        ),
360        (
361            "data".to_string(),
362            apache_avro::types::Value::Bytes(json_data),
363        ),
364        (
365            "metadata".to_string(),
366            apache_avro::types::Value::Union(0, Box::new(apache_avro::types::Value::Null)),
367        ),
368    ];
369
370    Ok(apache_avro::types::Value::Record(fields))
371}
372
373/// Convert Avro value to StreamEvent
374pub fn from_avro_value(
375    value: &apache_avro::types::Value,
376    _schema: &apache_avro::Schema,
377) -> Result<StreamEvent> {
378    match value {
379        apache_avro::types::Value::Record(fields) => {
380            // Extract data field
381            for (name, field_value) in fields {
382                if name == "data" {
383                    if let apache_avro::types::Value::Bytes(bytes) = field_value {
384                        let event: StreamEvent = serde_json::from_slice(bytes)?;
385                        return Ok(event);
386                    }
387                }
388            }
389            Err(anyhow!("No data field found in Avro record"))
390        }
391        _ => Err(anyhow!("Expected Avro record, got {:?}", value)),
392    }
393}
394
395/// Delta compression algorithms
396#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
397pub enum DeltaCompressionType {
398    /// XOR-based delta compression
399    Xor,
400    /// Prefix compression for strings
401    Prefix,
402    /// Dictionary-based compression
403    Dictionary,
404    /// LZ4-based delta compression
405    Lz4Delta,
406}
407
408/// Delta-compressed event representation
409#[derive(Debug, Clone, Serialize, Deserialize)]
410pub struct DeltaCompressedEvent {
411    pub event_id: String,
412    pub delta: EventDelta,
413    pub compression_type: DeltaCompressionType,
414    pub timestamp: DateTime<Utc>,
415}
416
417/// Event delta representations
418#[derive(Debug, Clone, Serialize, Deserialize)]
419pub enum EventDelta {
420    /// Full event (no compression possible)
421    Full(Box<StreamEvent>),
422    /// XOR-based delta
423    Xor(Vec<u8>),
424    /// Prefix-based delta
425    Prefix(serde_json::Value),
426    /// Dictionary-based compression
427    Dictionary {
428        dictionary: HashMap<String, u16>,
429        compressed_event: serde_json::Value,
430    },
431    /// LZ4 compressed delta
432    Lz4(Vec<u8>),
433}