1use anyhow::{anyhow, Result};
8use std::collections::HashMap;
9use std::sync::Arc;
10use tokio::sync::RwLock;
11
12use crate::serialization_types::{DeltaCompressedEvent, DeltaCompressionType, EventDelta};
13use crate::StreamEvent;
14
/// Stateful delta compressor for [`StreamEvent`]s.
///
/// Keeps the most recent event seen for each event id so that later events
/// with the same id can be encoded relative to it.
pub struct DeltaCompressor {
    /// Last event stored per event id; guarded by an async `RwLock` and
    /// written by `compress_delta`.
    previous_states: Arc<RwLock<HashMap<String, StreamEvent>>>,
    /// Strategy used to encode an event against its stored predecessor.
    compression_type: DeltaCompressionType,
    /// Threshold on the number of stored states; `compress_delta` evicts
    /// entries once the map reaches this size.
    max_states: usize,
}
24
25impl DeltaCompressor {
26 pub fn new(compression_type: DeltaCompressionType, max_states: usize) -> Self {
28 Self {
29 previous_states: Arc::new(RwLock::new(HashMap::new())),
30 compression_type,
31 max_states,
32 }
33 }
34
35 pub async fn compress_delta(
37 &self,
38 event: &StreamEvent,
39 event_id: &str,
40 ) -> Result<DeltaCompressedEvent> {
41 let mut states = self.previous_states.write().await;
42
43 if states.len() >= self.max_states {
45 let keys_to_remove: Vec<String> = states
46 .keys()
47 .take(states.len() - self.max_states + 1)
48 .cloned()
49 .collect();
50 for key in keys_to_remove {
51 states.remove(&key);
52 }
53 }
54
55 let delta = if let Some(previous) = states.get(event_id) {
56 self.calculate_delta(previous, event)?
57 } else {
58 EventDelta::Full(Box::new(event.clone()))
60 };
61
62 states.insert(event_id.to_string(), event.clone());
64
65 Ok(DeltaCompressedEvent {
66 event_id: event_id.to_string(),
67 delta,
68 compression_type: self.compression_type,
69 timestamp: chrono::Utc::now(),
70 })
71 }
72
73 fn calculate_delta(&self, previous: &StreamEvent, current: &StreamEvent) -> Result<EventDelta> {
75 match self.compression_type {
76 DeltaCompressionType::Xor => self.calculate_xor_delta(previous, current),
77 DeltaCompressionType::Prefix => self.calculate_prefix_delta(previous, current),
78 DeltaCompressionType::Dictionary => self.calculate_dictionary_delta(previous, current),
79 DeltaCompressionType::Lz4Delta => self.calculate_lz4_delta(previous, current),
80 }
81 }
82
83 fn calculate_xor_delta(
85 &self,
86 previous: &StreamEvent,
87 current: &StreamEvent,
88 ) -> Result<EventDelta> {
89 let prev_bytes = serde_json::to_vec(previous)?;
90 let curr_bytes = serde_json::to_vec(current)?;
91
92 if prev_bytes.len() != curr_bytes.len() {
93 return Ok(EventDelta::Full(Box::new(current.clone())));
95 }
96
97 let xor_bytes: Vec<u8> = prev_bytes
98 .iter()
99 .zip(curr_bytes.iter())
100 .map(|(a, b)| a ^ b)
101 .collect();
102
103 Ok(EventDelta::Xor(xor_bytes))
104 }
105
106 fn calculate_prefix_delta(
108 &self,
109 previous: &StreamEvent,
110 current: &StreamEvent,
111 ) -> Result<EventDelta> {
112 let prev_json = serde_json::to_value(previous)?;
113 let curr_json = serde_json::to_value(current)?;
114
115 let diff = self.calculate_json_prefix_diff(&prev_json, &curr_json)?;
116 Ok(EventDelta::Prefix(diff))
117 }
118
119 fn calculate_dictionary_delta(
121 &self,
122 previous: &StreamEvent,
123 current: &StreamEvent,
124 ) -> Result<EventDelta> {
125 let prev_strings = self.extract_strings_from_event(previous);
126 let curr_strings = self.extract_strings_from_event(current);
127
128 let mut dictionary = HashMap::new();
129 let mut dict_id = 0u16;
130
131 for string in &prev_strings {
133 if curr_strings.contains(string) && !dictionary.contains_key(string) {
134 dictionary.insert(string.clone(), dict_id);
135 dict_id += 1;
136 }
137 }
138
139 let compressed_event = self.replace_strings_with_ids(current, &dictionary)?;
141
142 Ok(EventDelta::Dictionary {
143 dictionary,
144 compressed_event,
145 })
146 }
147
148 fn calculate_lz4_delta(
150 &self,
151 previous: &StreamEvent,
152 current: &StreamEvent,
153 ) -> Result<EventDelta> {
154 let prev_bytes = serde_json::to_vec(previous)?;
155 let curr_bytes = serde_json::to_vec(current)?;
156
157 let diff_bytes = self.calculate_byte_diff(&prev_bytes, &curr_bytes);
159 let compressed = oxiarc_lz4::compress(&diff_bytes)
160 .map_err(|e| anyhow!("LZ4 compression failed: {}", e))?;
161
162 Ok(EventDelta::Lz4(compressed))
163 }
164
165 fn calculate_json_prefix_diff(
167 &self,
168 prev: &serde_json::Value,
169 curr: &serde_json::Value,
170 ) -> Result<serde_json::Value> {
171 match (prev, curr) {
172 (serde_json::Value::Object(prev_obj), serde_json::Value::Object(curr_obj)) => {
173 let mut diff = serde_json::Map::new();
174 for (key, curr_val) in curr_obj {
175 if let Some(prev_val) = prev_obj.get(key) {
176 if prev_val != curr_val {
177 diff.insert(key.clone(), curr_val.clone());
178 }
179 } else {
180 diff.insert(key.clone(), curr_val.clone());
181 }
182 }
183 Ok(serde_json::Value::Object(diff))
184 }
185 _ => Ok(curr.clone()),
186 }
187 }
188
189 fn extract_strings_from_event(&self, event: &StreamEvent) -> Vec<String> {
191 let mut strings = Vec::new();
192 if let Ok(json) = serde_json::to_value(event) {
193 Self::extract_strings_from_json(&json, &mut strings);
194 }
195 strings
196 }
197
198 fn extract_strings_from_json(value: &serde_json::Value, strings: &mut Vec<String>) {
200 match value {
201 serde_json::Value::String(s) => strings.push(s.clone()),
202 serde_json::Value::Array(arr) => {
203 for item in arr {
204 Self::extract_strings_from_json(item, strings);
205 }
206 }
207 serde_json::Value::Object(obj) => {
208 for (_, val) in obj {
209 Self::extract_strings_from_json(val, strings);
210 }
211 }
212 _ => {}
213 }
214 }
215
216 fn replace_strings_with_ids(
218 &self,
219 event: &StreamEvent,
220 dictionary: &HashMap<String, u16>,
221 ) -> Result<serde_json::Value> {
222 let mut json = serde_json::to_value(event)?;
223 Self::replace_strings_in_json(&mut json, dictionary);
224 Ok(json)
225 }
226
227 fn replace_strings_in_json(value: &mut serde_json::Value, dictionary: &HashMap<String, u16>) {
229 match value {
230 serde_json::Value::String(s) => {
231 if let Some(&id) = dictionary.get(s) {
232 *value = serde_json::Value::Number(serde_json::Number::from(id));
233 }
234 }
235 serde_json::Value::Array(arr) => {
236 for item in arr {
237 Self::replace_strings_in_json(item, dictionary);
238 }
239 }
240 serde_json::Value::Object(obj) => {
241 for val in obj.values_mut() {
242 Self::replace_strings_in_json(val, dictionary);
243 }
244 }
245 _ => {}
246 }
247 }
248
249 fn calculate_byte_diff(&self, prev: &[u8], curr: &[u8]) -> Vec<u8> {
251 let mut diff = Vec::new();
253
254 diff.extend_from_slice(&(curr.len() as u32).to_le_bytes());
256 diff.extend_from_slice(&(prev.len() as u32).to_le_bytes());
257
258 diff.extend_from_slice(curr);
260
261 diff
262 }
263
264 pub async fn decompress_delta(
266 &self,
267 compressed: &DeltaCompressedEvent,
268 previous_event: Option<&StreamEvent>,
269 ) -> Result<StreamEvent> {
270 match &compressed.delta {
271 EventDelta::Full(event) => Ok((**event).clone()),
272 EventDelta::Xor(xor_bytes) => {
273 if let Some(prev) = previous_event {
274 let prev_bytes = serde_json::to_vec(prev)?;
275 if prev_bytes.len() == xor_bytes.len() {
276 let restored_bytes: Vec<u8> = prev_bytes
277 .iter()
278 .zip(xor_bytes.iter())
279 .map(|(a, b)| a ^ b)
280 .collect();
281 let event = serde_json::from_slice(&restored_bytes)?;
282 Ok(event)
283 } else {
284 Err(anyhow!("XOR delta length mismatch"))
285 }
286 } else {
287 Err(anyhow!("Previous event required for XOR decompression"))
288 }
289 }
290 EventDelta::Prefix(diff) => {
291 if let Some(prev) = previous_event {
292 let mut prev_json = serde_json::to_value(prev)?;
293 self.apply_json_diff(&mut prev_json, diff)?;
294 let event = serde_json::from_value(prev_json)?;
295 Ok(event)
296 } else {
297 Err(anyhow!("Previous event required for prefix decompression"))
298 }
299 }
300 EventDelta::Dictionary {
301 dictionary,
302 compressed_event,
303 } => {
304 let mut restored_json = compressed_event.clone();
305 let reverse_dict: HashMap<u16, String> =
306 dictionary.iter().map(|(k, &v)| (v, k.clone())).collect();
307 Self::restore_strings_from_ids(&mut restored_json, &reverse_dict);
308 let event = serde_json::from_value(restored_json)?;
309 Ok(event)
310 }
311 EventDelta::Lz4(compressed_bytes) => {
312 let decompressed = oxiarc_lz4::decompress(compressed_bytes, 100 * 1024 * 1024)
313 .map_err(|e| anyhow!("LZ4 decompression failed: {}", e))?;
314 let event = serde_json::from_slice(&decompressed)?;
316 Ok(event)
317 }
318 }
319 }
320
321 fn apply_json_diff(
323 &self,
324 base: &mut serde_json::Value,
325 diff: &serde_json::Value,
326 ) -> Result<()> {
327 if let (Some(base_obj), Some(diff_obj)) = (base.as_object_mut(), diff.as_object()) {
328 for (key, diff_val) in diff_obj {
329 base_obj.insert(key.clone(), diff_val.clone());
330 }
331 } else {
332 *base = diff.clone();
333 }
334 Ok(())
335 }
336
337 fn restore_strings_from_ids(
339 value: &mut serde_json::Value,
340 reverse_dict: &HashMap<u16, String>,
341 ) {
342 match value {
343 serde_json::Value::Number(n) => {
344 if let Some(id) = n.as_u64() {
345 if let Some(string) = reverse_dict.get(&(id as u16)) {
346 *value = serde_json::Value::String(string.clone());
347 }
348 }
349 }
350 serde_json::Value::Array(arr) => {
351 for item in arr {
352 Self::restore_strings_from_ids(item, reverse_dict);
353 }
354 }
355 serde_json::Value::Object(obj) => {
356 for val in obj.values_mut() {
357 Self::restore_strings_from_ids(val, reverse_dict);
358 }
359 }
360 _ => {}
361 }
362 }
363}