oxigdal_websocket/updates/
incremental.rs1use crate::error::Result;
4use bytes::Bytes;
5use parking_lot::RwLock;
6use std::collections::HashMap;
7use std::sync::Arc;
8
9#[derive(Debug, Clone)]
11pub struct UpdateDelta {
12 pub base_version: u64,
14 pub target_version: u64,
16 pub delta: Bytes,
18 pub encoding: DeltaEncoding,
20}
21
/// Tag identifying how a delta payload is encoded.
///
/// This module only stores and forwards the tag; it never interprets the
/// payload itself.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DeltaEncoding {
    /// Payload is a binary diff.
    BinaryDiff,
    /// Payload is a JSON patch (presumably RFC 6902 — confirm with the
    /// producer/consumer of the payload).
    JsonPatch,
    /// Payload uses an application-defined encoding.
    Custom,
}
32
33impl UpdateDelta {
34 pub fn new(
36 base_version: u64,
37 target_version: u64,
38 delta: Bytes,
39 encoding: DeltaEncoding,
40 ) -> Self {
41 Self {
42 base_version,
43 target_version,
44 delta,
45 encoding,
46 }
47 }
48
49 pub fn size(&self) -> usize {
51 self.delta.len()
52 }
53}
54
55pub struct IncrementalUpdate {
57 pub entity_id: String,
59 pub version: u64,
61 pub full_data: Option<Bytes>,
63 pub deltas: HashMap<u64, UpdateDelta>,
65}
66
67impl IncrementalUpdate {
68 pub fn new(entity_id: String, version: u64, full_data: Option<Bytes>) -> Self {
70 Self {
71 entity_id,
72 version,
73 full_data,
74 deltas: HashMap::new(),
75 }
76 }
77
78 pub fn add_delta(&mut self, delta: UpdateDelta) {
80 self.deltas.insert(delta.target_version, delta);
81 }
82
83 pub fn get_delta(&self, target_version: u64) -> Option<&UpdateDelta> {
85 self.deltas.get(&target_version)
86 }
87
88 pub fn get_delta_chain(&self, from_version: u64, to_version: u64) -> Option<Vec<&UpdateDelta>> {
90 let mut chain = Vec::new();
91 let mut current_version = from_version;
92
93 while current_version < to_version {
94 let next_version = current_version + 1;
95 if let Some(delta) = self.deltas.get(&next_version) {
96 if delta.base_version == current_version {
97 chain.push(delta);
98 current_version = next_version;
99 } else {
100 return None; }
102 } else {
103 return None; }
105 }
106
107 if current_version == to_version {
108 Some(chain)
109 } else {
110 None
111 }
112 }
113
114 pub fn has_full_data(&self) -> bool {
116 self.full_data.is_some()
117 }
118
119 pub fn full_data_size(&self) -> usize {
121 self.full_data.as_ref().map_or(0, |d| d.len())
122 }
123
124 pub fn total_delta_size(&self) -> usize {
126 self.deltas.values().map(|d| d.size()).sum()
127 }
128}
129
130pub struct IncrementalUpdateManager {
132 updates: Arc<RwLock<HashMap<String, IncrementalUpdate>>>,
133}
134
135impl IncrementalUpdateManager {
136 pub fn new() -> Self {
138 Self {
139 updates: Arc::new(RwLock::new(HashMap::new())),
140 }
141 }
142
143 pub fn register(
145 &self,
146 entity_id: String,
147 version: u64,
148 full_data: Option<Bytes>,
149 ) -> Result<()> {
150 let mut updates = self.updates.write();
151 updates.insert(
152 entity_id.clone(),
153 IncrementalUpdate::new(entity_id, version, full_data),
154 );
155 Ok(())
156 }
157
158 pub fn add_delta(&self, entity_id: &str, delta: UpdateDelta) -> Result<()> {
160 let mut updates = self.updates.write();
161
162 if let Some(update) = updates.get_mut(entity_id) {
163 let target_version = delta.target_version;
164 update.add_delta(delta);
165 update.version = target_version;
166 Ok(())
167 } else {
168 Err(crate::error::Error::InvalidState(format!(
169 "Entity {} not registered",
170 entity_id
171 )))
172 }
173 }
174
175 pub fn get_update(&self, entity_id: &str, client_version: u64) -> Option<UpdateResponse> {
177 let updates = self.updates.read();
178
179 if let Some(update) = updates.get(entity_id) {
180 if client_version >= update.version {
182 return Some(UpdateResponse::NoUpdate);
183 }
184
185 if let Some(chain) = update.get_delta_chain(client_version, update.version) {
187 let total_delta_size: usize = chain.iter().map(|d| d.size()).sum();
188
189 if let Some(full_data) = &update.full_data {
191 if total_delta_size < full_data.len() {
192 return Some(UpdateResponse::DeltaChain(
193 chain.into_iter().cloned().collect(),
194 ));
195 }
196 } else {
197 return Some(UpdateResponse::DeltaChain(
198 chain.into_iter().cloned().collect(),
199 ));
200 }
201 }
202
203 if let Some(full_data) = &update.full_data {
205 Some(UpdateResponse::FullData(full_data.clone(), update.version))
206 } else {
207 Some(UpdateResponse::NoData)
208 }
209 } else {
210 None
211 }
212 }
213
214 pub fn remove(&self, entity_id: &str) -> Option<IncrementalUpdate> {
216 let mut updates = self.updates.write();
217 updates.remove(entity_id)
218 }
219
220 pub fn entity_count(&self) -> usize {
222 self.updates.read().len()
223 }
224
225 pub fn total_delta_count(&self) -> usize {
227 let updates = self.updates.read();
228 updates.values().map(|u| u.deltas.len()).sum()
229 }
230
231 pub fn stats(&self) -> IncrementalUpdateStats {
233 let updates = self.updates.read();
234 let mut total_full_size = 0;
235 let mut total_delta_size = 0;
236 let mut total_deltas = 0;
237
238 for update in updates.values() {
239 total_full_size += update.full_data_size();
240 total_delta_size += update.total_delta_size();
241 total_deltas += update.deltas.len();
242 }
243
244 IncrementalUpdateStats {
245 entity_count: updates.len(),
246 total_deltas,
247 total_full_size,
248 total_delta_size,
249 }
250 }
251}
252
253impl Default for IncrementalUpdateManager {
254 fn default() -> Self {
255 Self::new()
256 }
257}
258
259#[derive(Debug, Clone)]
261pub enum UpdateResponse {
262 NoUpdate,
264 FullData(Bytes, u64),
266 DeltaChain(Vec<UpdateDelta>),
268 NoData,
270}
271
/// Aggregate counters describing the records held by an update manager.
///
/// `Default` yields all-zero counters and `PartialEq`/`Eq` let two snapshots
/// be compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct IncrementalUpdateStats {
    /// Number of registered entities.
    pub entity_count: usize,
    /// Number of deltas stored across all entities.
    pub total_deltas: usize,
    /// Combined size, in bytes, of all stored full snapshots.
    pub total_full_size: usize,
    /// Combined size, in bytes, of all stored delta payloads.
    pub total_delta_size: usize,
}
284
#[cfg(test)]
mod tests {
    //! Unit tests for delta storage, chain resolution and the manager's
    //! response selection, including the fallback and error paths.

    use super::*;

    #[test]
    fn test_update_delta() {
        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![1, 2, 3]), DeltaEncoding::BinaryDiff);

        assert_eq!(delta.base_version, 1);
        assert_eq!(delta.target_version, 2);
        assert_eq!(delta.size(), 3);
    }

    #[test]
    fn test_incremental_update() {
        let update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        assert_eq!(update.entity_id, "entity1");
        assert_eq!(update.version, 1);
        assert!(!update.has_full_data());
    }

    #[test]
    fn test_incremental_update_add_delta() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![1, 2, 3]), DeltaEncoding::BinaryDiff);
        update.add_delta(delta);

        assert_eq!(update.deltas.len(), 1);
        assert!(update.get_delta(2).is_some());
    }

    #[test]
    fn test_incremental_update_delta_chain() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        let delta1 = UpdateDelta::new(1, 2, Bytes::from(vec![1]), DeltaEncoding::BinaryDiff);
        let delta2 = UpdateDelta::new(2, 3, Bytes::from(vec![2]), DeltaEncoding::BinaryDiff);
        let delta3 = UpdateDelta::new(3, 4, Bytes::from(vec![3]), DeltaEncoding::BinaryDiff);

        update.add_delta(delta1);
        update.add_delta(delta2);
        update.add_delta(delta3);

        let chain = update.get_delta_chain(1, 4);
        assert!(chain.is_some());
        assert_eq!(chain.as_ref().map(|c| c.len()), Some(3));
    }

    #[test]
    fn test_incremental_update_delta_chain_edge_cases() {
        let mut update = IncrementalUpdate::new("entity1".to_string(), 1, None);

        // Deltas 1 -> 2 and 3 -> 4 leave a gap at 2 -> 3.
        update.add_delta(UpdateDelta::new(1, 2, Bytes::from(vec![1]), DeltaEncoding::BinaryDiff));
        update.add_delta(UpdateDelta::new(3, 4, Bytes::from(vec![3]), DeltaEncoding::BinaryDiff));

        // A missing step breaks the whole chain.
        assert!(update.get_delta_chain(1, 4).is_none());

        // Equal versions need no deltas at all.
        assert_eq!(update.get_delta_chain(2, 2).map(|c| c.len()), Some(0));

        // There is no chain going backwards.
        assert!(update.get_delta_chain(3, 1).is_none());
    }

    #[test]
    fn test_incremental_update_manager() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 1, Some(Bytes::from(vec![1, 2, 3])))?;

        assert_eq!(manager.entity_count(), 1);
        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_delta() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 1, Some(Bytes::from(vec![1, 2, 3])))?;

        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![4, 5]), DeltaEncoding::BinaryDiff);
        manager.add_delta("entity1", delta)?;

        assert_eq!(manager.total_delta_count(), 1);
        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_delta_unregistered() {
        let manager = IncrementalUpdateManager::new();

        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![4, 5]), DeltaEncoding::BinaryDiff);

        assert!(manager.add_delta("missing", delta).is_err());
        assert_eq!(manager.total_delta_count(), 0);
    }

    #[test]
    fn test_incremental_update_manager_get_update() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 2, Some(Bytes::from(vec![1, 2, 3])))?;

        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![4, 5]), DeltaEncoding::BinaryDiff);
        manager.add_delta("entity1", delta)?;

        let response = manager.get_update("entity1", 1);
        assert!(matches!(response, Some(UpdateResponse::DeltaChain(_))));

        let response = manager.get_update("entity1", 2);
        assert!(matches!(response, Some(UpdateResponse::NoUpdate)));

        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_get_update_fallbacks() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        // Unknown entities produce no response at all.
        assert!(manager.get_update("missing", 0).is_none());

        // No delta chain, but a snapshot exists: fall back to the snapshot.
        manager.register("with_full".to_string(), 3, Some(Bytes::from(vec![1, 2, 3])))?;
        let response = manager.get_update("with_full", 1);
        assert!(matches!(response, Some(UpdateResponse::FullData(_, 3))));

        // Neither a chain nor a snapshot.
        manager.register("empty".to_string(), 3, None)?;
        let response = manager.get_update("empty", 1);
        assert!(matches!(response, Some(UpdateResponse::NoData)));

        // A chain that is not smaller than the snapshot loses to the snapshot.
        manager.register("big_delta".to_string(), 1, Some(Bytes::from(vec![1])))?;
        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![1, 2, 3]), DeltaEncoding::BinaryDiff);
        manager.add_delta("big_delta", delta)?;
        let response = manager.get_update("big_delta", 1);
        assert!(matches!(response, Some(UpdateResponse::FullData(_, 2))));

        Ok(())
    }

    #[test]
    fn test_incremental_update_manager_stats_and_remove() -> Result<()> {
        let manager = IncrementalUpdateManager::new();

        manager.register("entity1".to_string(), 1, Some(Bytes::from(vec![1, 2, 3])))?;
        let delta = UpdateDelta::new(1, 2, Bytes::from(vec![4, 5]), DeltaEncoding::BinaryDiff);
        manager.add_delta("entity1", delta)?;

        let stats = manager.stats();
        assert_eq!(stats.entity_count, 1);
        assert_eq!(stats.total_deltas, 1);
        assert_eq!(stats.total_full_size, 3);
        assert_eq!(stats.total_delta_size, 2);

        assert!(manager.remove("entity1").is_some());
        assert!(manager.remove("entity1").is_none());
        assert_eq!(manager.entity_count(), 0);

        Ok(())
    }
}