1use crate::protocol::*;
2use crate::serialization::{WorldSnapshot, Delta};
3use crate::debug;
4use ahash::AHashMap;
5use std::time::Instant;
6
/// Produces [`Delta`]s between consecutive [`WorldSnapshot`]s.
///
/// The compressor keeps the last snapshot handed to `create_delta` and diffs
/// every new snapshot against it.
pub struct DeltaCompressor {
    /// Snapshot the next delta is computed against; `None` until the first
    /// delta has been created, and again after `reset`.
    previous_snapshot: Option<WorldSnapshot>,
    /// Computes field-level deltas for components whose data changed.
    field_compressor: FieldCompressor,
}
11
12impl DeltaCompressor {
13 pub fn new() -> Self {
14 Self {
15 previous_snapshot: None,
16 field_compressor: FieldCompressor::new(),
17 }
18 }
19
20 pub fn with_field_compression(enable: bool) -> Self {
21 Self {
22 previous_snapshot: None,
23 field_compressor: FieldCompressor::with_enabled(enable),
24 }
25 }
26
27 pub fn create_delta(&mut self, current_snapshot: WorldSnapshot) -> Delta {
28 let start = Instant::now();
29
30 let timestamp = current_snapshot.timestamp;
31 let base_timestamp = self.previous_snapshot.as_ref()
32 .map(|s| s.timestamp)
33 .unwrap_or(0.0);
34
35 let changes = if let Some(prev) = &self.previous_snapshot {
36 self.compute_changes(prev, ¤t_snapshot)
37 } else {
38 self.create_initial_delta(¤t_snapshot)
39 };
40
41 let delta = Delta {
42 changes,
43 timestamp,
44 base_timestamp,
45 };
46
47 if debug::is_debug_enabled() {
49 debug::log_delta("Created", &delta);
50 }
51
52 if debug::is_trace_enabled() {
53 debug::trace_delta(&delta);
54 let duration = start.elapsed().as_micros();
55
56 let original_size = bincode::serialize(¤t_snapshot).unwrap_or_default().len();
58 let delta_size = bincode::serialize(&delta).unwrap_or_default().len();
59 debug::trace_compression(original_size, delta_size, duration);
60 }
61
62 self.previous_snapshot = Some(current_snapshot);
63
64 delta
65 }
66
67 fn create_initial_delta(&self, snapshot: &WorldSnapshot) -> Vec<DeltaChange> {
68 let mut changes = Vec::new();
69
70 for entity in &snapshot.entities {
71 changes.push(DeltaChange::EntityAdded {
72 entity_id: entity.id,
73 });
74
75 for component in &entity.components {
76 changes.push(DeltaChange::ComponentAdded {
77 entity_id: entity.id,
78 component_id: component.id.clone(),
79 data: component.data.clone(),
80 });
81 }
82 }
83
84 changes
85 }
86
87 fn compute_changes(&self, prev: &WorldSnapshot, curr: &WorldSnapshot) -> Vec<DeltaChange> {
88 let mut changes = Vec::new();
89
90 let prev_entities: AHashMap<EntityId, &SerializedEntity> = prev.entities.iter()
91 .map(|e| (e.id, e))
92 .collect();
93 let curr_entities: AHashMap<EntityId, &SerializedEntity> = curr.entities.iter()
94 .map(|e| (e.id, e))
95 .collect();
96
97 for (entity_id, curr_entity) in &curr_entities {
98 if let Some(prev_entity) = prev_entities.get(entity_id) {
99 self.compute_component_changes(*entity_id, prev_entity, curr_entity, &mut changes);
100 } else {
101 changes.push(DeltaChange::EntityAdded {
102 entity_id: *entity_id,
103 });
104
105 for component in &curr_entity.components {
106 changes.push(DeltaChange::ComponentAdded {
107 entity_id: *entity_id,
108 component_id: component.id.clone(),
109 data: component.data.clone(),
110 });
111 }
112 }
113 }
114
115 for entity_id in prev_entities.keys() {
116 if !curr_entities.contains_key(entity_id) {
117 changes.push(DeltaChange::EntityRemoved {
118 entity_id: *entity_id,
119 });
120 }
121 }
122
123 changes
124 }
125
126 fn compute_component_changes(
127 &self,
128 entity_id: EntityId,
129 prev_entity: &SerializedEntity,
130 curr_entity: &SerializedEntity,
131 changes: &mut Vec<DeltaChange>,
132 ) {
133 let prev_components: AHashMap<&str, &SerializedComponent> = prev_entity.components.iter()
134 .map(|c| (c.id.as_str(), c))
135 .collect();
136 let curr_components: AHashMap<&str, &SerializedComponent> = curr_entity.components.iter()
137 .map(|c| (c.id.as_str(), c))
138 .collect();
139
140 for (component_id, curr_component) in &curr_components {
141 if let Some(prev_component) = prev_components.get(component_id) {
142 if !self.components_equal(prev_component, curr_component) {
143 if self.field_compressor.is_enabled() {
144 if let Some(field_deltas) = self.field_compressor.compute_field_deltas(
145 prev_component,
146 curr_component,
147 ) {
148 if !field_deltas.is_empty() {
149 changes.push(DeltaChange::FieldsUpdated {
150 entity_id,
151 component_id: component_id.to_string(),
152 fields: field_deltas,
153 });
154 continue;
155 }
156 }
157 }
158
159 changes.push(DeltaChange::ComponentUpdated {
160 entity_id,
161 component_id: component_id.to_string(),
162 data: curr_component.data.clone(),
163 });
164 }
165 } else {
166 changes.push(DeltaChange::ComponentAdded {
167 entity_id,
168 component_id: component_id.to_string(),
169 data: curr_component.data.clone(),
170 });
171 }
172 }
173
174 for component_id in prev_components.keys() {
175 if !curr_components.contains_key(component_id) {
176 changes.push(DeltaChange::ComponentRemoved {
177 entity_id,
178 component_id: component_id.to_string(),
179 });
180 }
181 }
182 }
183
184 fn components_equal(&self, a: &SerializedComponent, b: &SerializedComponent) -> bool {
185 if a.id != b.id {
186 return false;
187 }
188
189 match (&a.data, &b.data) {
190 (ComponentData::Binary(a_data), ComponentData::Binary(b_data)) => a_data == b_data,
191 (ComponentData::Json(a_json), ComponentData::Json(b_json)) => a_json == b_json,
192 (ComponentData::Structured(a_map), ComponentData::Structured(b_map)) => a_map == b_map,
193 _ => false,
194 }
195 }
196
197 pub fn reset(&mut self) {
198 self.previous_snapshot = None;
199 }
200
201 pub fn get_previous_snapshot(&self) -> Option<&WorldSnapshot> {
202 self.previous_snapshot.as_ref()
203 }
204}
205
206impl Default for DeltaCompressor {
207 fn default() -> Self {
208 Self::new()
209 }
210}
211
/// Computes per-field differences between two versions of a component.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FieldCompressor {
    /// Whether field-level diffing is active.
    enabled: bool,
}
215
216impl FieldCompressor {
217 pub fn new() -> Self {
218 Self { enabled: true }
219 }
220
221 pub fn with_enabled(enabled: bool) -> Self {
222 Self { enabled }
223 }
224
225 pub fn is_enabled(&self) -> bool {
226 self.enabled
227 }
228
229 pub fn set_enabled(&mut self, enabled: bool) {
230 self.enabled = enabled;
231 }
232
233 pub fn compute_field_deltas(
234 &self,
235 prev: &SerializedComponent,
236 curr: &SerializedComponent,
237 ) -> Option<Vec<FieldDelta>> {
238 if !self.enabled {
239 return None;
240 }
241
242 match (&prev.data, &curr.data) {
243 (ComponentData::Structured(prev_fields), ComponentData::Structured(curr_fields)) => {
244 let mut deltas = Vec::new();
245
246 for (field_id, curr_value) in curr_fields {
247 if let Some(prev_value) = prev_fields.get(field_id) {
248 if prev_value != curr_value {
249 deltas.push(FieldDelta {
250 field_id: field_id.clone(),
251 old_value: Some(prev_value.clone()),
252 new_value: curr_value.clone(),
253 });
254 }
255 } else {
256 deltas.push(FieldDelta {
257 field_id: field_id.clone(),
258 old_value: None,
259 new_value: curr_value.clone(),
260 });
261 }
262 }
263
264 for field_id in prev_fields.keys() {
265 if !curr_fields.contains_key(field_id) {
266 deltas.push(FieldDelta {
267 field_id: field_id.clone(),
268 old_value: prev_fields.get(field_id).cloned(),
269 new_value: FieldValue::Null,
270 });
271 }
272 }
273
274 Some(deltas)
275 }
276 (ComponentData::Json(prev_json_str), ComponentData::Json(curr_json_str)) => {
277 if let (Ok(prev_json), Ok(curr_json)) = (
278 serde_json::from_str::<serde_json::Value>(prev_json_str),
279 serde_json::from_str::<serde_json::Value>(curr_json_str)
280 ) {
281 if let (Some(prev_obj), Some(curr_obj)) = (prev_json.as_object(), curr_json.as_object()) {
282 let mut deltas = Vec::new();
283
284 for (key, curr_value) in curr_obj {
285 if let Some(prev_value) = prev_obj.get(key) {
286 if prev_value != curr_value {
287 deltas.push(FieldDelta {
288 field_id: key.clone(),
289 old_value: Some(json_to_field_value(prev_value)),
290 new_value: json_to_field_value(curr_value),
291 });
292 }
293 } else {
294 deltas.push(FieldDelta {
295 field_id: key.clone(),
296 old_value: None,
297 new_value: json_to_field_value(curr_value),
298 });
299 }
300 }
301
302 for key in prev_obj.keys() {
303 if !curr_obj.contains_key(key) {
304 deltas.push(FieldDelta {
305 field_id: key.clone(),
306 old_value: prev_obj.get(key).map(json_to_field_value),
307 new_value: FieldValue::Null,
308 });
309 }
310 }
311
312 Some(deltas)
313 } else {
314 None
315 }
316 } else {
317 None
318 }
319 }
320 _ => None,
321 }
322 }
323}
324
325impl Default for FieldCompressor {
326 fn default() -> Self {
327 Self::new()
328 }
329}
330
331fn json_to_field_value(value: &serde_json::Value) -> FieldValue {
332 match value {
333 serde_json::Value::Null => FieldValue::Null,
334 serde_json::Value::Bool(b) => FieldValue::Bool(*b),
335 serde_json::Value::Number(n) => {
336 if let Some(i) = n.as_i64() {
337 FieldValue::I64(i)
338 } else if let Some(u) = n.as_u64() {
339 FieldValue::U64(u)
340 } else if let Some(f) = n.as_f64() {
341 FieldValue::F64(f)
342 } else {
343 FieldValue::Null
344 }
345 }
346 serde_json::Value::String(s) => FieldValue::String(s.clone()),
347 serde_json::Value::Array(arr) => {
348 FieldValue::Array(arr.iter().map(json_to_field_value).collect())
349 }
350 serde_json::Value::Object(obj) => {
351 let map = obj.iter()
352 .map(|(k, v)| (k.clone(), json_to_field_value(v)))
353 .collect();
354 FieldValue::Map(map)
355 }
356 }
357}
358
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Entity list holding a single entity (id 1) with one JSON-backed
    /// `Position` component at the given coordinates.
    fn position_entities(x: f64, y: f64) -> Vec<SerializedEntity> {
        let position = SerializedComponent {
            id: "Position".to_string(),
            data: ComponentData::from_json_value(serde_json::json!({"x": x, "y": y})),
        };

        vec![SerializedEntity {
            id: 1,
            components: vec![position],
        }]
    }

    /// `Position` component backed by structured `F64` fields `x` and `y`.
    fn structured_position(x: f64, y: f64) -> SerializedComponent {
        let mut fields = HashMap::new();
        fields.insert("x".to_string(), FieldValue::F64(x));
        fields.insert("y".to_string(), FieldValue::F64(y));

        SerializedComponent {
            id: "Position".to_string(),
            data: ComponentData::Structured(fields),
        }
    }

    /// The first delta reports the entity and its component as added, in that order.
    #[test]
    fn test_delta_compression_initial() {
        let mut compressor = DeltaCompressor::new();

        let delta = compressor.create_delta(WorldSnapshot {
            entities: position_entities(10.0, 20.0),
            timestamp: 100.0,
            version: "1.0.0".to_string(),
        });

        assert!(matches!(
            delta.changes.as_slice(),
            [
                DeltaChange::EntityAdded { .. },
                DeltaChange::ComponentAdded { .. }
            ]
        ));
    }

    /// Changing a component between snapshots yields some kind of update change.
    #[test]
    fn test_delta_compression_update() {
        let mut compressor = DeltaCompressor::new();

        compressor.create_delta(WorldSnapshot {
            entities: position_entities(10.0, 20.0),
            timestamp: 100.0,
            version: "1.0.0".to_string(),
        });

        let delta = compressor.create_delta(WorldSnapshot {
            entities: position_entities(15.0, 20.0),
            timestamp: 200.0,
            version: "1.0.0".to_string(),
        });

        let has_update = delta.changes.iter().any(|change| {
            matches!(
                change,
                DeltaChange::ComponentUpdated { .. } | DeltaChange::FieldsUpdated { .. }
            )
        });
        assert!(has_update);
    }

    /// Only the field whose value changed shows up in the field deltas.
    #[test]
    fn test_field_level_delta() {
        let compressor = FieldCompressor::new();

        let before = structured_position(10.0, 20.0);
        let after = structured_position(15.0, 20.0);

        let deltas = compressor.compute_field_deltas(&before, &after).unwrap();

        assert_eq!(deltas.len(), 1);
        assert_eq!(deltas[0].field_id, "x");
    }
}