Skip to main content

oxigdal_websocket/updates/
incremental.rs

//! Incremental update delivery and delta encoding.
2
3use crate::error::Result;
4use bytes::Bytes;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9/// Update delta representation
10#[derive(Debug, Clone)]
11pub struct UpdateDelta {
12    /// Base version
13    pub base_version: u64,
14    /// Target version
15    pub target_version: u64,
16    /// Delta data
17    pub delta: Bytes,
18    /// Delta encoding type
19    pub encoding: DeltaEncoding,
20}
21
/// Identifies how the payload of an update delta is encoded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeltaEncoding {
    /// Binary diff.
    BinaryDiff,
    /// JSON Patch document (RFC 6902).
    JsonPatch,
    /// Custom encoding.
    Custom,
}
32
33impl UpdateDelta {
34    /// Create a new update delta
35    pub fn new(
36        base_version: u64,
37        target_version: u64,
38        delta: Bytes,
39        encoding: DeltaEncoding,
40    ) -> Self {
41        Self {
42            base_version,
43            target_version,
44            delta,
45            encoding,
46        }
47    }
48
49    /// Get delta size
50    pub fn size(&self) -> usize {
51        self.delta.len()
52    }
53}
54
55/// Incremental update
56pub struct IncrementalUpdate {
57    /// Entity ID
58    pub entity_id: String,
59    /// Current version
60    pub version: u64,
61    /// Full data
62    pub full_data: Option<Bytes>,
63    /// Available deltas (indexed by target version)
64    pub deltas: HashMap<u64, UpdateDelta>,
65}
66
67impl IncrementalUpdate {
68    /// Create a new incremental update
69    pub fn new(entity_id: String, version: u64, full_data: Option<Bytes>) -> Self {
70        Self {
71            entity_id,
72            version,
73            full_data,
74            deltas: HashMap::new(),
75        }
76    }
77
78    /// Add a delta
79    pub fn add_delta(&mut self, delta: UpdateDelta) {
80        self.deltas.insert(delta.target_version, delta);
81    }
82
83    /// Get delta to specific version
84    pub fn get_delta(&self, target_version: u64) -> Option<&UpdateDelta> {
85        self.deltas.get(&target_version)
86    }
87
88    /// Get delta chain from base to target version
89    pub fn get_delta_chain(&self, from_version: u64, to_version: u64) -> Option<Vec<&UpdateDelta>> {
90        let mut chain = Vec::new();
91        let mut current_version = from_version;
92
93        while current_version < to_version {
94            let next_version = current_version + 1;
95            if let Some(delta) = self.deltas.get(&next_version) {
96                if delta.base_version == current_version {
97                    chain.push(delta);
98                    current_version = next_version;
99                } else {
100                    return None; // Chain broken
101                }
102            } else {
103                return None; // Missing delta
104            }
105        }
106
107        if current_version == to_version {
108            Some(chain)
109        } else {
110            None
111        }
112    }
113
114    /// Check if full data is available
115    pub fn has_full_data(&self) -> bool {
116        self.full_data.is_some()
117    }
118
119    /// Get full data size
120    pub fn full_data_size(&self) -> usize {
121        self.full_data.as_ref().map_or(0, |d| d.len())
122    }
123
124    /// Get total delta size
125    pub fn total_delta_size(&self) -> usize {
126        self.deltas.values().map(|d| d.size()).sum()
127    }
128}
129
130/// Incremental update manager
131pub struct IncrementalUpdateManager {
132    updates: Arc<RwLock<HashMap<String, IncrementalUpdate>>>,
133}
134
135impl IncrementalUpdateManager {
136    /// Create a new incremental update manager
137    pub fn new() -> Self {
138        Self {
139            updates: Arc::new(RwLock::new(HashMap::new())),
140        }
141    }
142
143    /// Register an entity for incremental updates
144    pub fn register(
145        &self,
146        entity_id: String,
147        version: u64,
148        full_data: Option<Bytes>,
149    ) -> Result<()> {
150        let mut updates = self.updates.write();
151        updates.insert(
152            entity_id.clone(),
153            IncrementalUpdate::new(entity_id, version, full_data),
154        );
155        Ok(())
156    }
157
158    /// Add a delta for an entity
159    pub fn add_delta(&self, entity_id: &str, delta: UpdateDelta) -> Result<()> {
160        let mut updates = self.updates.write();
161
162        if let Some(update) = updates.get_mut(entity_id) {
163            let target_version = delta.target_version;
164            update.add_delta(delta);
165            update.version = target_version;
166            Ok(())
167        } else {
168            Err(crate::error::Error::InvalidState(format!(
169                "Entity {} not registered",
170                entity_id
171            )))
172        }
173    }
174
175    /// Get full data or delta for an entity
176    pub fn get_update(&self, entity_id: &str, client_version: u64) -> Option<UpdateResponse> {
177        let updates = self.updates.read();
178
179        if let Some(update) = updates.get(entity_id) {
180            // If client is up to date, no update needed
181            if client_version >= update.version {
182                return Some(UpdateResponse::NoUpdate);
183            }
184
185            // Try to get delta chain
186            if let Some(chain) = update.get_delta_chain(client_version, update.version) {
187                let total_delta_size: usize = chain.iter().map(|d| d.size()).sum();
188
189                // If delta chain is available and smaller than full data, use it
190                if let Some(full_data) = &update.full_data {
191                    if total_delta_size < full_data.len() {
192                        return Some(UpdateResponse::DeltaChain(
193                            chain.into_iter().cloned().collect(),
194                        ));
195                    }
196                } else {
197                    return Some(UpdateResponse::DeltaChain(
198                        chain.into_iter().cloned().collect(),
199                    ));
200                }
201            }
202
203            // Fall back to full data if available
204            if let Some(full_data) = &update.full_data {
205                Some(UpdateResponse::FullData(full_data.clone(), update.version))
206            } else {
207                Some(UpdateResponse::NoData)
208            }
209        } else {
210            None
211        }
212    }
213
214    /// Remove an entity
215    pub fn remove(&self, entity_id: &str) -> Option<IncrementalUpdate> {
216        let mut updates = self.updates.write();
217        updates.remove(entity_id)
218    }
219
220    /// Get entity count
221    pub fn entity_count(&self) -> usize {
222        self.updates.read().len()
223    }
224
225    /// Get total delta count
226    pub fn total_delta_count(&self) -> usize {
227        let updates = self.updates.read();
228        updates.values().map(|u| u.deltas.len()).sum()
229    }
230
231    /// Get statistics
232    pub fn stats(&self) -> IncrementalUpdateStats {
233        let updates = self.updates.read();
234        let mut total_full_size = 0;
235        let mut total_delta_size = 0;
236        let mut total_deltas = 0;
237
238        for update in updates.values() {
239            total_full_size += update.full_data_size();
240            total_delta_size += update.total_delta_size();
241            total_deltas += update.deltas.len();
242        }
243
244        IncrementalUpdateStats {
245            entity_count: updates.len(),
246            total_deltas,
247            total_full_size,
248            total_delta_size,
249        }
250    }
251}
252
253impl Default for IncrementalUpdateManager {
254    fn default() -> Self {
255        Self::new()
256    }
257}
258
259/// Update response
260#[derive(Debug, Clone)]
261pub enum UpdateResponse {
262    /// No update needed (client is up to date)
263    NoUpdate,
264    /// Full data response
265    FullData(Bytes, u64),
266    /// Delta chain response
267    DeltaChain(Vec<UpdateDelta>),
268    /// No data available
269    NoData,
270}
271
/// Aggregate figures describing the entities and deltas being tracked.
#[derive(Clone, Debug)]
pub struct IncrementalUpdateStats {
    /// Number of entities.
    pub entity_count: usize,
    /// Number of deltas across all entities.
    pub total_deltas: usize,
    /// Combined size of all full payloads.
    pub total_full_size: usize,
    /// Combined size of all delta payloads.
    pub total_delta_size: usize,
}
284
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a binary-diff delta with the given payload.
    fn binary_delta(base_version: u64, target_version: u64, payload: Vec<u8>) -> UpdateDelta {
        UpdateDelta::new(
            base_version,
            target_version,
            Bytes::from(payload),
            DeltaEncoding::BinaryDiff,
        )
    }

    #[test]
    fn test_update_delta() {
        let delta = binary_delta(1, 2, vec![1, 2, 3]);

        assert_eq!(delta.base_version, 1);
        assert_eq!(delta.target_version, 2);
        assert_eq!(delta.size(), 3);
    }

    #[test]
    fn test_incremental_update() {
        let update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        assert_eq!(update.entity_id, "entity1");
        assert_eq!(update.version, 1);
        assert!(!update.has_full_data());
    }

    #[test]
    fn test_incremental_update_add_delta() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        update.add_delta(binary_delta(1, 2, vec![1, 2, 3]));

        assert_eq!(update.deltas.len(), 1);
        assert!(update.get_delta(2).is_some());
    }

    #[test]
    fn test_incremental_update_delta_chain() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        update.add_delta(binary_delta(1, 2, vec![1]));
        update.add_delta(binary_delta(2, 3, vec![2]));
        update.add_delta(binary_delta(3, 4, vec![3]));

        let chain = update.get_delta_chain(1, 4);
        assert!(chain.is_some());
        assert_eq!(chain.as_ref().map(|c| c.len()), Some(3));
    }

    #[test]
    fn test_incremental_update_delta_chain_edge_cases() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        update.add_delta(binary_delta(1, 2, vec![1]));
        update.add_delta(binary_delta(3, 4, vec![3]));

        // Identical versions need no deltas at all.
        assert_eq!(update.get_delta_chain(2, 2).map(|c| c.len()), Some(0));
        // The delta leading to version 3 is missing, so the chain is broken.
        assert!(update.get_delta_chain(1, 4).is_none());
        // A chain cannot run backwards.
        assert!(update.get_delta_chain(4, 1).is_none());
    }

    #[test]
    fn test_incremental_update_manager() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 1, Some(Bytes::from(vec![1, 2, 3])))?;

        assert_eq!(manager.entity_count(), 1);
        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_delta() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 1, Some(Bytes::from(vec![1, 2, 3])))?;
        manager.add_delta("entity1", binary_delta(1, 2, vec![4, 5]))?;

        assert_eq!(manager.total_delta_count(), 1);
        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_get_update() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 2, Some(Bytes::from(vec![1, 2, 3])))?;
        manager.add_delta("entity1", binary_delta(1, 2, vec![4, 5]))?;

        // Client at version 1 should get delta
        let response = manager.get_update("entity1", 1);
        assert!(matches!(response, Some(UpdateResponse::DeltaChain(_))));

        // Client at version 2 should get no update
        let response = manager.get_update("entity1", 2);
        assert!(matches!(response, Some(UpdateResponse::NoUpdate)));

        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_unknown_entity() {
        let manager = IncrementalUpdateManager::new();

        assert!(manager
            .add_delta("missing", binary_delta(1, 2, vec![1]))
            .is_err());
        assert!(manager.get_update("missing", 1).is_none());
        assert!(manager.remove("missing").is_none());
    }

    #[test]
    fn test_incremental_update_manager_fallbacks() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        // The delta chain (3 bytes) is not smaller than the full data (1 byte).
        manager.register("small".to_string(), 1, Some(Bytes::from(vec![1])))?;
        manager.add_delta("small", binary_delta(1, 2, vec![1, 2, 3]))?;
        let response = manager.get_update("small", 1);
        assert!(matches!(response, Some(UpdateResponse::FullData(_, 2))));

        // Neither a delta chain nor full data is available.
        manager.register("bare".to_string(), 3, None)?;
        let response = manager.get_update("bare", 1);
        assert!(matches!(response, Some(UpdateResponse::NoData)));

        Ok(())
    }
}
373        assert!(matches!(response, Some(UpdateResponse::NoUpdate)));
374
375        Ok(())
376    }
377}