tx2_link/
compression.rs

1use crate::protocol::*;
2use crate::serialization::{WorldSnapshot, Delta};
3use crate::debug;
4use ahash::AHashMap;
5use std::time::Instant;
6
7pub struct DeltaCompressor {
8    previous_snapshot: Option<WorldSnapshot>,
9    field_compressor: FieldCompressor,
10}
11
12impl DeltaCompressor {
13    pub fn new() -> Self {
14        Self {
15            previous_snapshot: None,
16            field_compressor: FieldCompressor::new(),
17        }
18    }
19
20    pub fn with_field_compression(enable: bool) -> Self {
21        Self {
22            previous_snapshot: None,
23            field_compressor: FieldCompressor::with_enabled(enable),
24        }
25    }
26
27    pub fn create_delta(&mut self, current_snapshot: WorldSnapshot) -> Delta {
28        let start = Instant::now();
29
30        let timestamp = current_snapshot.timestamp;
31        let base_timestamp = self.previous_snapshot.as_ref()
32            .map(|s| s.timestamp)
33            .unwrap_or(0.0);
34
35        let changes = if let Some(prev) = &self.previous_snapshot {
36            self.compute_changes(prev, &current_snapshot)
37        } else {
38            self.create_initial_delta(&current_snapshot)
39        };
40
41        let delta = Delta {
42            changes,
43            timestamp,
44            base_timestamp,
45        };
46
47        // Debug logging
48        if debug::is_debug_enabled() {
49            debug::log_delta("Created", &delta);
50        }
51
52        if debug::is_trace_enabled() {
53            debug::trace_delta(&delta);
54            let duration = start.elapsed().as_micros();
55
56            // Estimate sizes for compression ratio
57            let original_size = bincode::serialize(&current_snapshot).unwrap_or_default().len();
58            let delta_size = bincode::serialize(&delta).unwrap_or_default().len();
59            debug::trace_compression(original_size, delta_size, duration);
60        }
61
62        self.previous_snapshot = Some(current_snapshot);
63
64        delta
65    }
66
67    fn create_initial_delta(&self, snapshot: &WorldSnapshot) -> Vec<DeltaChange> {
68        let mut changes = Vec::new();
69
70        for entity in &snapshot.entities {
71            changes.push(DeltaChange::EntityAdded {
72                entity_id: entity.id,
73            });
74
75            for component in &entity.components {
76                changes.push(DeltaChange::ComponentAdded {
77                    entity_id: entity.id,
78                    component_id: component.id.clone(),
79                    data: component.data.clone(),
80                });
81            }
82        }
83
84        changes
85    }
86
87    fn compute_changes(&self, prev: &WorldSnapshot, curr: &WorldSnapshot) -> Vec<DeltaChange> {
88        let mut changes = Vec::new();
89
90        let prev_entities: AHashMap<EntityId, &SerializedEntity> = prev.entities.iter()
91            .map(|e| (e.id, e))
92            .collect();
93        let curr_entities: AHashMap<EntityId, &SerializedEntity> = curr.entities.iter()
94            .map(|e| (e.id, e))
95            .collect();
96
97        for (entity_id, curr_entity) in &curr_entities {
98            if let Some(prev_entity) = prev_entities.get(entity_id) {
99                self.compute_component_changes(*entity_id, prev_entity, curr_entity, &mut changes);
100            } else {
101                changes.push(DeltaChange::EntityAdded {
102                    entity_id: *entity_id,
103                });
104
105                for component in &curr_entity.components {
106                    changes.push(DeltaChange::ComponentAdded {
107                        entity_id: *entity_id,
108                        component_id: component.id.clone(),
109                        data: component.data.clone(),
110                    });
111                }
112            }
113        }
114
115        for entity_id in prev_entities.keys() {
116            if !curr_entities.contains_key(entity_id) {
117                changes.push(DeltaChange::EntityRemoved {
118                    entity_id: *entity_id,
119                });
120            }
121        }
122
123        changes
124    }
125
126    fn compute_component_changes(
127        &self,
128        entity_id: EntityId,
129        prev_entity: &SerializedEntity,
130        curr_entity: &SerializedEntity,
131        changes: &mut Vec<DeltaChange>,
132    ) {
133        let prev_components: AHashMap<&str, &SerializedComponent> = prev_entity.components.iter()
134            .map(|c| (c.id.as_str(), c))
135            .collect();
136        let curr_components: AHashMap<&str, &SerializedComponent> = curr_entity.components.iter()
137            .map(|c| (c.id.as_str(), c))
138            .collect();
139
140        for (component_id, curr_component) in &curr_components {
141            if let Some(prev_component) = prev_components.get(component_id) {
142                if !self.components_equal(prev_component, curr_component) {
143                    if self.field_compressor.is_enabled() {
144                        if let Some(field_deltas) = self.field_compressor.compute_field_deltas(
145                            prev_component,
146                            curr_component,
147                        ) {
148                            if !field_deltas.is_empty() {
149                                changes.push(DeltaChange::FieldsUpdated {
150                                    entity_id,
151                                    component_id: component_id.to_string(),
152                                    fields: field_deltas,
153                                });
154                                continue;
155                            }
156                        }
157                    }
158
159                    changes.push(DeltaChange::ComponentUpdated {
160                        entity_id,
161                        component_id: component_id.to_string(),
162                        data: curr_component.data.clone(),
163                    });
164                }
165            } else {
166                changes.push(DeltaChange::ComponentAdded {
167                    entity_id,
168                    component_id: component_id.to_string(),
169                    data: curr_component.data.clone(),
170                });
171            }
172        }
173
174        for component_id in prev_components.keys() {
175            if !curr_components.contains_key(component_id) {
176                changes.push(DeltaChange::ComponentRemoved {
177                    entity_id,
178                    component_id: component_id.to_string(),
179                });
180            }
181        }
182    }
183
184    fn components_equal(&self, a: &SerializedComponent, b: &SerializedComponent) -> bool {
185        if a.id != b.id {
186            return false;
187        }
188
189        match (&a.data, &b.data) {
190            (ComponentData::Binary(a_data), ComponentData::Binary(b_data)) => a_data == b_data,
191            (ComponentData::Json(a_json), ComponentData::Json(b_json)) => a_json == b_json,
192            (ComponentData::Structured(a_map), ComponentData::Structured(b_map)) => a_map == b_map,
193            _ => false,
194        }
195    }
196
197    pub fn reset(&mut self) {
198        self.previous_snapshot = None;
199    }
200
201    pub fn get_previous_snapshot(&self) -> Option<&WorldSnapshot> {
202        self.previous_snapshot.as_ref()
203    }
204}
205
206impl Default for DeltaCompressor {
207    fn default() -> Self {
208        Self::new()
209    }
210}
211
/// Switchable producer of per-field deltas between two versions of a
/// component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldCompressor {
    /// When `false`, no field-level deltas are produced.
    enabled: bool,
}
215
216impl FieldCompressor {
217    pub fn new() -> Self {
218        Self { enabled: true }
219    }
220
221    pub fn with_enabled(enabled: bool) -> Self {
222        Self { enabled }
223    }
224
225    pub fn is_enabled(&self) -> bool {
226        self.enabled
227    }
228
229    pub fn set_enabled(&mut self, enabled: bool) {
230        self.enabled = enabled;
231    }
232
233    pub fn compute_field_deltas(
234        &self,
235        prev: &SerializedComponent,
236        curr: &SerializedComponent,
237    ) -> Option<Vec<FieldDelta>> {
238        if !self.enabled {
239            return None;
240        }
241
242        match (&prev.data, &curr.data) {
243            (ComponentData::Structured(prev_fields), ComponentData::Structured(curr_fields)) => {
244                let mut deltas = Vec::new();
245
246                for (field_id, curr_value) in curr_fields {
247                    if let Some(prev_value) = prev_fields.get(field_id) {
248                        if prev_value != curr_value {
249                            deltas.push(FieldDelta {
250                                field_id: field_id.clone(),
251                                old_value: Some(prev_value.clone()),
252                                new_value: curr_value.clone(),
253                            });
254                        }
255                    } else {
256                        deltas.push(FieldDelta {
257                            field_id: field_id.clone(),
258                            old_value: None,
259                            new_value: curr_value.clone(),
260                        });
261                    }
262                }
263
264                for field_id in prev_fields.keys() {
265                    if !curr_fields.contains_key(field_id) {
266                        deltas.push(FieldDelta {
267                            field_id: field_id.clone(),
268                            old_value: prev_fields.get(field_id).cloned(),
269                            new_value: FieldValue::Null,
270                        });
271                    }
272                }
273
274                Some(deltas)
275            }
276            (ComponentData::Json(prev_json_str), ComponentData::Json(curr_json_str)) => {
277                if let (Ok(prev_json), Ok(curr_json)) = (
278                    serde_json::from_str::<serde_json::Value>(prev_json_str),
279                    serde_json::from_str::<serde_json::Value>(curr_json_str)
280                ) {
281                    if let (Some(prev_obj), Some(curr_obj)) = (prev_json.as_object(), curr_json.as_object()) {
282                        let mut deltas = Vec::new();
283
284                        for (key, curr_value) in curr_obj {
285                            if let Some(prev_value) = prev_obj.get(key) {
286                                if prev_value != curr_value {
287                                    deltas.push(FieldDelta {
288                                        field_id: key.clone(),
289                                        old_value: Some(json_to_field_value(prev_value)),
290                                        new_value: json_to_field_value(curr_value),
291                                    });
292                                }
293                            } else {
294                                deltas.push(FieldDelta {
295                                    field_id: key.clone(),
296                                    old_value: None,
297                                    new_value: json_to_field_value(curr_value),
298                                });
299                            }
300                        }
301
302                        for key in prev_obj.keys() {
303                            if !curr_obj.contains_key(key) {
304                                deltas.push(FieldDelta {
305                                    field_id: key.clone(),
306                                    old_value: prev_obj.get(key).map(json_to_field_value),
307                                    new_value: FieldValue::Null,
308                                });
309                            }
310                        }
311
312                        Some(deltas)
313                    } else {
314                        None
315                    }
316                } else {
317                    None
318                }
319            }
320            _ => None,
321        }
322    }
323}
324
325impl Default for FieldCompressor {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
331fn json_to_field_value(value: &serde_json::Value) -> FieldValue {
332    match value {
333        serde_json::Value::Null => FieldValue::Null,
334        serde_json::Value::Bool(b) => FieldValue::Bool(*b),
335        serde_json::Value::Number(n) => {
336            if let Some(i) = n.as_i64() {
337                FieldValue::I64(i)
338            } else if let Some(u) = n.as_u64() {
339                FieldValue::U64(u)
340            } else if let Some(f) = n.as_f64() {
341                FieldValue::F64(f)
342            } else {
343                FieldValue::Null
344            }
345        }
346        serde_json::Value::String(s) => FieldValue::String(s.clone()),
347        serde_json::Value::Array(arr) => {
348            FieldValue::Array(arr.iter().map(json_to_field_value).collect())
349        }
350        serde_json::Value::Object(obj) => {
351            let map = obj.iter()
352                .map(|(k, v)| (k.clone(), json_to_field_value(v)))
353                .collect();
354            FieldValue::Map(map)
355        }
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Snapshot at timestamp `100.0` holding entity `1` with a single
    /// JSON-built `Position` component `{ "x": x, "y": 20.0 }`.
    fn position_snapshot(x: f64) -> WorldSnapshot {
        WorldSnapshot {
            entities: vec![SerializedEntity {
                id: 1,
                components: vec![SerializedComponent {
                    id: "Position".to_string(),
                    data: ComponentData::from_json_value(serde_json::json!({"x": x, "y": 20.0})),
                }],
            }],
            timestamp: 100.0,
            version: "1.0.0".to_string(),
        }
    }

    /// `Position` component backed by structured `F64` fields `x` and `y`.
    fn structured_position(x: f64, y: f64) -> SerializedComponent {
        let mut fields = HashMap::new();
        fields.insert("x".to_string(), FieldValue::F64(x));
        fields.insert("y".to_string(), FieldValue::F64(y));

        SerializedComponent {
            id: "Position".to_string(),
            data: ComponentData::Structured(fields),
        }
    }

    #[test]
    fn test_delta_compression_initial() {
        let mut compressor = DeltaCompressor::new();

        let delta = compressor.create_delta(position_snapshot(10.0));

        // With no baseline, the entity and its one component are both added.
        assert_eq!(delta.changes.len(), 2);
        assert!(matches!(delta.changes[0], DeltaChange::EntityAdded { .. }));
        assert!(matches!(delta.changes[1], DeltaChange::ComponentAdded { .. }));
    }

    #[test]
    fn test_delta_compression_update() {
        let mut compressor = DeltaCompressor::new();

        compressor.create_delta(position_snapshot(10.0));

        let moved = WorldSnapshot {
            timestamp: 200.0,
            ..position_snapshot(15.0)
        };
        let delta = compressor.create_delta(moved);

        // Either update representation is acceptable for the changed component.
        let has_update = delta.changes.iter().any(|change| {
            matches!(
                change,
                DeltaChange::ComponentUpdated { .. } | DeltaChange::FieldsUpdated { .. }
            )
        });
        assert!(has_update);
    }

    #[test]
    fn test_field_level_delta() {
        let compressor = FieldCompressor::new();

        let prev_component = structured_position(10.0, 20.0);
        let curr_component = structured_position(15.0, 20.0);

        let deltas = compressor
            .compute_field_deltas(&prev_component, &curr_component)
            .unwrap();

        // Only `x` changed between the two versions.
        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].field_id, "x");
    }
}