Skip to main content

oxirs_stream/
serialization_decoder.rs

//! Delta compression / decompression for event streams.
//!
//! This module provides [`DeltaCompressor`], which calculates and applies
//! deltas between successive `StreamEvent` values using one of several
//! algorithms: XOR, prefix (changed top-level JSON fields), dictionary, or
//! LZ4 (an LZ4-compressed copy of the current event).
6
7use anyhow::{anyhow, Result};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12use crate::serialization_types::{DeltaCompressedEvent, DeltaCompressionType, EventDelta};
13use crate::StreamEvent;
14
15/// Delta compression support for event streams
16pub struct DeltaCompressor {
17    /// Previous event states for delta calculation
18    previous_states: Arc<RwLock<HashMap<String, StreamEvent>>>,
19    /// Compression algorithm to use
20    compression_type: DeltaCompressionType,
21    /// Maximum states to keep in memory
22    max_states: usize,
23}
24
25impl DeltaCompressor {
26    /// Create a new delta compressor
27    pub fn new(compression_type: DeltaCompressionType, max_states: usize) -> Self {
28        Self {
29            previous_states: Arc::new(RwLock::new(HashMap::new())),
30            compression_type,
31            max_states,
32        }
33    }
34
35    /// Compress event using delta compression
36    pub async fn compress_delta(
37        &self,
38        event: &StreamEvent,
39        event_id: &str,
40    ) -> Result<DeltaCompressedEvent> {
41        let mut states = self.previous_states.write().await;
42
43        // Clean up old states if we exceed the limit
44        if states.len() >= self.max_states {
45            let keys_to_remove: Vec<String> = states
46                .keys()
47                .take(states.len() - self.max_states + 1)
48                .cloned()
49                .collect();
50            for key in keys_to_remove {
51                states.remove(&key);
52            }
53        }
54
55        let delta = if let Some(previous) = states.get(event_id) {
56            self.calculate_delta(previous, event)?
57        } else {
58            // First event, store as full event
59            EventDelta::Full(Box::new(event.clone()))
60        };
61
62        // Update state
63        states.insert(event_id.to_string(), event.clone());
64
65        Ok(DeltaCompressedEvent {
66            event_id: event_id.to_string(),
67            delta,
68            compression_type: self.compression_type,
69            timestamp: chrono::Utc::now(),
70        })
71    }
72
73    /// Calculate delta between two events
74    fn calculate_delta(&self, previous: &StreamEvent, current: &StreamEvent) -> Result<EventDelta> {
75        match self.compression_type {
76            DeltaCompressionType::Xor => self.calculate_xor_delta(previous, current),
77            DeltaCompressionType::Prefix => self.calculate_prefix_delta(previous, current),
78            DeltaCompressionType::Dictionary => self.calculate_dictionary_delta(previous, current),
79            DeltaCompressionType::Lz4Delta => self.calculate_lz4_delta(previous, current),
80        }
81    }
82
83    /// XOR-based delta compression
84    fn calculate_xor_delta(
85        &self,
86        previous: &StreamEvent,
87        current: &StreamEvent,
88    ) -> Result<EventDelta> {
89        let prev_bytes = serde_json::to_vec(previous)?;
90        let curr_bytes = serde_json::to_vec(current)?;
91
92        if prev_bytes.len() != curr_bytes.len() {
93            // If sizes differ, store as full event
94            return Ok(EventDelta::Full(Box::new(current.clone())));
95        }
96
97        let xor_bytes: Vec<u8> = prev_bytes
98            .iter()
99            .zip(curr_bytes.iter())
100            .map(|(a, b)| a ^ b)
101            .collect();
102
103        Ok(EventDelta::Xor(xor_bytes))
104    }
105
106    /// Prefix compression for string fields
107    fn calculate_prefix_delta(
108        &self,
109        previous: &StreamEvent,
110        current: &StreamEvent,
111    ) -> Result<EventDelta> {
112        let prev_json = serde_json::to_value(previous)?;
113        let curr_json = serde_json::to_value(current)?;
114
115        let diff = self.calculate_json_prefix_diff(&prev_json, &curr_json)?;
116        Ok(EventDelta::Prefix(diff))
117    }
118
119    /// Dictionary-based compression
120    fn calculate_dictionary_delta(
121        &self,
122        previous: &StreamEvent,
123        current: &StreamEvent,
124    ) -> Result<EventDelta> {
125        let prev_strings = self.extract_strings_from_event(previous);
126        let curr_strings = self.extract_strings_from_event(current);
127
128        let mut dictionary = HashMap::new();
129        let mut dict_id = 0u16;
130
131        // Build dictionary from common strings
132        for string in &prev_strings {
133            if curr_strings.contains(string) && !dictionary.contains_key(string) {
134                dictionary.insert(string.clone(), dict_id);
135                dict_id += 1;
136            }
137        }
138
139        // Replace strings with dictionary IDs
140        let compressed_event = self.replace_strings_with_ids(current, &dictionary)?;
141
142        Ok(EventDelta::Dictionary {
143            dictionary,
144            compressed_event,
145        })
146    }
147
148    /// LZ4-based delta compression
149    fn calculate_lz4_delta(
150        &self,
151        previous: &StreamEvent,
152        current: &StreamEvent,
153    ) -> Result<EventDelta> {
154        let prev_bytes = serde_json::to_vec(previous)?;
155        let curr_bytes = serde_json::to_vec(current)?;
156
157        // Simple delta: store additions and removals
158        let diff_bytes = self.calculate_byte_diff(&prev_bytes, &curr_bytes);
159        let compressed = oxiarc_lz4::compress(&diff_bytes)
160            .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;
161
162        Ok(EventDelta::Lz4(compressed))
163    }
164
165    /// Calculate JSON prefix differences
166    fn calculate_json_prefix_diff(
167        &self,
168        prev: &serde_json::Value,
169        curr: &serde_json::Value,
170    ) -> Result<serde_json::Value> {
171        match (prev, curr) {
172            (serde_json::Value::Object(prev_obj), serde_json::Value::Object(curr_obj)) => {
173                let mut diff = serde_json::Map::new();
174                for (key, curr_val) in curr_obj {
175                    if let Some(prev_val) = prev_obj.get(key) {
176                        if prev_val != curr_val {
177                            diff.insert(key.clone(), curr_val.clone());
178                        }
179                    } else {
180                        diff.insert(key.clone(), curr_val.clone());
181                    }
182                }
183                Ok(serde_json::Value::Object(diff))
184            }
185            _ => Ok(curr.clone()),
186        }
187    }
188
189    /// Extract all strings from an event
190    fn extract_strings_from_event(&self, event: &StreamEvent) -> Vec<String> {
191        let mut strings = Vec::new();
192        if let Ok(json) = serde_json::to_value(event) {
193            Self::extract_strings_from_json(&json, &mut strings);
194        }
195        strings
196    }
197
198    /// Recursively extract strings from JSON value
199    fn extract_strings_from_json(value: &serde_json::Value, strings: &mut Vec<String>) {
200        match value {
201            serde_json::Value::String(s) => strings.push(s.clone()),
202            serde_json::Value::Array(arr) => {
203                for item in arr {
204                    Self::extract_strings_from_json(item, strings);
205                }
206            }
207            serde_json::Value::Object(obj) => {
208                for (_, val) in obj {
209                    Self::extract_strings_from_json(val, strings);
210                }
211            }
212            _ => {}
213        }
214    }
215
216    /// Replace strings with dictionary IDs
217    fn replace_strings_with_ids(
218        &self,
219        event: &StreamEvent,
220        dictionary: &HashMap<String, u16>,
221    ) -> Result<serde_json::Value> {
222        let mut json = serde_json::to_value(event)?;
223        Self::replace_strings_in_json(&mut json, dictionary);
224        Ok(json)
225    }
226
227    /// Recursively replace strings in JSON
228    fn replace_strings_in_json(value: &mut serde_json::Value, dictionary: &HashMap<String, u16>) {
229        match value {
230            serde_json::Value::String(s) => {
231                if let Some(&id) = dictionary.get(s) {
232                    *value = serde_json::Value::Number(serde_json::Number::from(id));
233                }
234            }
235            serde_json::Value::Array(arr) => {
236                for item in arr {
237                    Self::replace_strings_in_json(item, dictionary);
238                }
239            }
240            serde_json::Value::Object(obj) => {
241                for val in obj.values_mut() {
242                    Self::replace_strings_in_json(val, dictionary);
243                }
244            }
245            _ => {}
246        }
247    }
248
249    /// Calculate byte-level differences
250    fn calculate_byte_diff(&self, prev: &[u8], curr: &[u8]) -> Vec<u8> {
251        // Simple implementation - could be enhanced with more sophisticated diff algorithms
252        let mut diff = Vec::new();
253
254        // Store length difference
255        diff.extend_from_slice(&(curr.len() as u32).to_le_bytes());
256        diff.extend_from_slice(&(prev.len() as u32).to_le_bytes());
257
258        // Store the current bytes (simplified)
259        diff.extend_from_slice(curr);
260
261        diff
262    }
263
264    /// Decompress delta-compressed event
265    pub async fn decompress_delta(
266        &self,
267        compressed: &DeltaCompressedEvent,
268        previous_event: Option<&StreamEvent>,
269    ) -> Result<StreamEvent> {
270        match &compressed.delta {
271            EventDelta::Full(event) => Ok((**event).clone()),
272            EventDelta::Xor(xor_bytes) => {
273                if let Some(prev) = previous_event {
274                    let prev_bytes = serde_json::to_vec(prev)?;
275                    if prev_bytes.len() == xor_bytes.len() {
276                        let restored_bytes: Vec<u8> = prev_bytes
277                            .iter()
278                            .zip(xor_bytes.iter())
279                            .map(|(a, b)| a ^ b)
280                            .collect();
281                        let event = serde_json::from_slice(&restored_bytes)?;
282                        Ok(event)
283                    } else {
284                        Err(anyhow!("XOR delta length mismatch"))
285                    }
286                } else {
287                    Err(anyhow!("Previous event required for XOR decompression"))
288                }
289            }
290            EventDelta::Prefix(diff) => {
291                if let Some(prev) = previous_event {
292                    let mut prev_json = serde_json::to_value(prev)?;
293                    self.apply_json_diff(&mut prev_json, diff)?;
294                    let event = serde_json::from_value(prev_json)?;
295                    Ok(event)
296                } else {
297                    Err(anyhow!("Previous event required for prefix decompression"))
298                }
299            }
300            EventDelta::Dictionary {
301                dictionary,
302                compressed_event,
303            } => {
304                let mut restored_json = compressed_event.clone();
305                let reverse_dict: HashMap<u16, String> =
306                    dictionary.iter().map(|(k, &v)| (v, k.clone())).collect();
307                Self::restore_strings_from_ids(&mut restored_json, &reverse_dict);
308                let event = serde_json::from_value(restored_json)?;
309                Ok(event)
310            }
311            EventDelta::Lz4(compressed_bytes) => {
312                let decompressed = oxiarc_lz4::decompress(compressed_bytes, 100 * 1024 * 1024)
313                    .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?;
314                // Restore from diff (simplified - would need more sophisticated restoration)
315                let event = serde_json::from_slice(&decompressed)?;
316                Ok(event)
317            }
318        }
319    }
320
321    /// Apply JSON diff to base JSON
322    fn apply_json_diff(
323        &self,
324        base: &mut serde_json::Value,
325        diff: &serde_json::Value,
326    ) -> Result<()> {
327        if let (Some(base_obj), Some(diff_obj)) = (base.as_object_mut(), diff.as_object()) {
328            for (key, diff_val) in diff_obj {
329                base_obj.insert(key.clone(), diff_val.clone());
330            }
331        } else {
332            *base = diff.clone();
333        }
334        Ok(())
335    }
336
337    /// Restore strings from dictionary IDs
338    fn restore_strings_from_ids(
339        value: &mut serde_json::Value,
340        reverse_dict: &HashMap<u16, String>,
341    ) {
342        match value {
343            serde_json::Value::Number(n) => {
344                if let Some(id) = n.as_u64() {
345                    if let Some(string) = reverse_dict.get(&(id as u16)) {
346                        *value = serde_json::Value::String(string.clone());
347                    }
348                }
349            }
350            serde_json::Value::Array(arr) => {
351                for item in arr {
352                    Self::restore_strings_from_ids(item, reverse_dict);
353                }
354            }
355            serde_json::Value::Object(obj) => {
356                for val in obj.values_mut() {
357                    Self::restore_strings_from_ids(val, reverse_dict);
358                }
359            }
360            _ => {}
361        }
362    }
363}