Skip to main content

oxigdal_websocket/updates/
tile_updates.rs

1//! Tile update management and notifications
2
3use crate::error::{Error, Result};
4use crate::protocol::message::{Message, MessageType, Payload, TilePayload};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::fmt;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
/// Kind of change carried by a tile update.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TileUpdateType {
    /// The tile is replaced in full.
    Full,
    /// An incremental delta update.
    Delta,
    /// The tile is invalidated and needs a refresh.
    Invalidate,
}
21
/// Coordinates identifying a single tile as a zoom level plus an x/y position.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TileCoord {
    /// Zoom level of the tile
    pub z: u8,
    /// X coordinate of the tile
    pub x: u32,
    /// Y coordinate of the tile
    pub y: u32,
}
32
33impl TileCoord {
34    /// Create new tile coordinates
35    pub fn new(z: u8, x: u32, y: u32) -> Self {
36        Self { z, x, y }
37    }
38
39    /// Parse from string
40    pub fn from_string(s: &str) -> Result<Self> {
41        let parts: Vec<&str> = s.split('/').collect();
42        if parts.len() != 3 {
43            return Err(Error::Protocol(format!("Invalid tile coord format: {}", s)));
44        }
45
46        let z = parts[0]
47            .parse()
48            .map_err(|_| Error::Protocol("Invalid z coordinate".to_string()))?;
49        let x = parts[1]
50            .parse()
51            .map_err(|_| Error::Protocol("Invalid x coordinate".to_string()))?;
52        let y = parts[2]
53            .parse()
54            .map_err(|_| Error::Protocol("Invalid y coordinate".to_string()))?;
55
56        Ok(Self { z, x, y })
57    }
58}
59
60impl fmt::Display for TileCoord {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(f, "{}/{}/{}", self.z, self.x, self.y)
63    }
64}
65
66/// Tile update
67pub struct TileUpdate {
68    /// Tile coordinates
69    pub coord: TileCoord,
70    /// Update type
71    pub update_type: TileUpdateType,
72    /// Tile data
73    pub data: Vec<u8>,
74    /// Tile format (e.g., "png", "webp", "mvt")
75    pub format: String,
76    /// Optional delta data
77    pub delta: Option<Vec<u8>>,
78    /// Timestamp
79    pub timestamp: i64,
80}
81
82impl TileUpdate {
83    /// Create a new full tile update
84    pub fn full(coord: TileCoord, data: Vec<u8>, format: String) -> Self {
85        Self {
86            coord,
87            update_type: TileUpdateType::Full,
88            data,
89            format,
90            delta: None,
91            timestamp: chrono::Utc::now().timestamp_millis(),
92        }
93    }
94
95    /// Create a new delta tile update
96    pub fn delta(coord: TileCoord, data: Vec<u8>, delta: Vec<u8>, format: String) -> Self {
97        Self {
98            coord,
99            update_type: TileUpdateType::Delta,
100            data,
101            format,
102            delta: Some(delta),
103            timestamp: chrono::Utc::now().timestamp_millis(),
104        }
105    }
106
107    /// Create an invalidation update
108    pub fn invalidate(coord: TileCoord) -> Self {
109        Self {
110            coord,
111            update_type: TileUpdateType::Invalidate,
112            data: Vec::new(),
113            format: String::new(),
114            delta: None,
115            timestamp: chrono::Utc::now().timestamp_millis(),
116        }
117    }
118
119    /// Convert to message
120    pub fn to_message(&self) -> Message {
121        let payload = Payload::TileData(TilePayload {
122            z: self.coord.z,
123            x: self.coord.x,
124            y: self.coord.y,
125            data: self.data.clone(),
126            format: self.format.clone(),
127            delta: self.delta.clone(),
128        });
129
130        Message::new(MessageType::TileUpdate, payload)
131    }
132}
133
134/// Tile update manager
135pub struct TileUpdateManager {
136    /// Pending updates by tile coordinate
137    updates: Arc<RwLock<HashMap<TileCoord, VecDeque<TileUpdate>>>>,
138    /// Maximum queue size per tile
139    max_queue_size: usize,
140    /// Statistics
141    stats: Arc<TileUpdateStats>,
142}
143
/// Atomic counters describing the updates a `TileUpdateManager` has processed.
struct TileUpdateStats {
    /// Number of updates submitted, regardless of type
    total_updates: AtomicU64,
    /// Number of full tile updates submitted
    full_updates: AtomicU64,
    /// Number of delta updates submitted
    delta_updates: AtomicU64,
    /// Number of invalidations submitted
    invalidations: AtomicU64,
    /// Number of queued updates discarded to make room for newer ones
    dropped_updates: AtomicU64,
}
152
153impl TileUpdateManager {
154    /// Create a new tile update manager
155    pub fn new(max_queue_size: usize) -> Self {
156        Self {
157            updates: Arc::new(RwLock::new(HashMap::new())),
158            max_queue_size,
159            stats: Arc::new(TileUpdateStats {
160                total_updates: AtomicU64::new(0),
161                full_updates: AtomicU64::new(0),
162                delta_updates: AtomicU64::new(0),
163                invalidations: AtomicU64::new(0),
164                dropped_updates: AtomicU64::new(0),
165            }),
166        }
167    }
168
169    /// Add a tile update
170    pub fn add_update(&self, update: TileUpdate) -> Result<()> {
171        self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
172
173        match update.update_type {
174            TileUpdateType::Full => {
175                self.stats.full_updates.fetch_add(1, Ordering::Relaxed);
176            }
177            TileUpdateType::Delta => {
178                self.stats.delta_updates.fetch_add(1, Ordering::Relaxed);
179            }
180            TileUpdateType::Invalidate => {
181                self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
182            }
183        }
184
185        let mut updates = self.updates.write();
186        let queue = updates.entry(update.coord).or_default();
187
188        if queue.len() >= self.max_queue_size {
189            // Drop oldest update
190            queue.pop_front();
191            self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
192        }
193
194        queue.push_back(update);
195        Ok(())
196    }
197
198    /// Get pending updates for a tile
199    pub fn get_updates(&self, coord: &TileCoord) -> Vec<TileUpdate> {
200        let mut updates = self.updates.write();
201
202        if let Some(queue) = updates.get_mut(coord) {
203            queue.drain(..).collect()
204        } else {
205            Vec::new()
206        }
207    }
208
209    /// Get all pending updates
210    pub fn get_all_updates(&self) -> HashMap<TileCoord, Vec<TileUpdate>> {
211        let mut updates = self.updates.write();
212        let mut result = HashMap::new();
213
214        for (coord, queue) in updates.iter_mut() {
215            result.insert(*coord, queue.drain(..).collect());
216        }
217
218        result
219    }
220
221    /// Clear updates for a tile
222    pub fn clear_tile(&self, coord: &TileCoord) {
223        let mut updates = self.updates.write();
224        updates.remove(coord);
225    }
226
227    /// Clear all updates
228    pub fn clear_all(&self) {
229        let mut updates = self.updates.write();
230        updates.clear();
231    }
232
233    /// Get pending update count
234    pub fn pending_count(&self) -> usize {
235        let updates = self.updates.read();
236        updates.values().map(|q| q.len()).sum()
237    }
238
239    /// Get statistics
240    pub async fn stats(&self) -> TileUpdateManagerStats {
241        TileUpdateManagerStats {
242            total_updates: self.stats.total_updates.load(Ordering::Relaxed),
243            full_updates: self.stats.full_updates.load(Ordering::Relaxed),
244            delta_updates: self.stats.delta_updates.load(Ordering::Relaxed),
245            invalidations: self.stats.invalidations.load(Ordering::Relaxed),
246            dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
247            pending_updates: self.pending_count(),
248        }
249    }
250}
251
/// Point-in-time copy of a tile update manager's counters.
#[derive(Clone, Debug)]
pub struct TileUpdateManagerStats {
    /// Number of updates submitted so far
    pub total_updates: u64,
    /// Number of full tile updates submitted
    pub full_updates: u64,
    /// Number of delta updates submitted
    pub delta_updates: u64,
    /// Number of invalidations submitted
    pub invalidations: u64,
    /// Number of updates that were dropped
    pub dropped_updates: u64,
    /// Number of updates still pending when the snapshot was taken
    pub pending_updates: usize,
}
268
#[cfg(test)]
mod tests {
    use super::*;

    // `new` stores each component in the matching field.
    #[test]
    fn test_tile_coord() {
        let TileCoord { z, x, y } = TileCoord::new(10, 512, 384);
        assert_eq!(z, 10);
        assert_eq!(x, 512);
        assert_eq!(y, 384);
    }

    // Formatting and parsing round-trip through the `z/x/y` text form.
    #[test]
    fn test_tile_coord_string() -> Result<()> {
        let original = TileCoord::new(10, 512, 384);
        let text = original.to_string();
        assert_eq!(text, "10/512/384");

        assert_eq!(TileCoord::from_string(&text)?, original);
        Ok(())
    }

    // A full update keeps the coordinate, payload and format it was given.
    #[test]
    fn test_tile_update_full() {
        let tile = TileCoord::new(10, 512, 384);
        let payload = vec![1, 2, 3, 4];
        let update = TileUpdate::full(tile, payload.clone(), "png".to_string());

        assert_eq!(update.coord, tile);
        assert_eq!(update.update_type, TileUpdateType::Full);
        assert_eq!(update.data, payload);
        assert_eq!(update.format, "png");
    }

    // A delta update is tagged as such and carries the delta bytes.
    #[test]
    fn test_tile_update_delta() {
        let tile = TileCoord::new(10, 512, 384);
        let payload = vec![1, 2, 3, 4];
        let patch = vec![5, 6, 7, 8];
        let update = TileUpdate::delta(tile, payload, patch.clone(), "png".to_string());

        assert_eq!(update.update_type, TileUpdateType::Delta);
        assert_eq!(update.delta, Some(patch));
    }

    // Queued updates are counted as pending until they are fetched.
    #[test]
    fn test_tile_update_manager() -> Result<()> {
        let manager = TileUpdateManager::new(10);
        let tile = TileCoord::new(10, 512, 384);

        manager.add_update(TileUpdate::full(tile, vec![1, 2, 3, 4], "png".to_string()))?;
        assert_eq!(manager.pending_count(), 1);

        let fetched = manager.get_updates(&tile);
        assert_eq!(fetched.len(), 1);
        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    // Each update type is reflected in its own counter and in the total.
    #[tokio::test]
    async fn test_tile_update_stats() -> Result<()> {
        let manager = TileUpdateManager::new(10);
        let tile = TileCoord::new(10, 512, 384);

        manager.add_update(TileUpdate::full(tile, vec![1, 2, 3], "png".to_string()))?;
        manager.add_update(TileUpdate::delta(
            tile,
            vec![1, 2],
            vec![3, 4],
            "png".to_string(),
        ))?;
        manager.add_update(TileUpdate::invalidate(tile))?;

        let snapshot = manager.stats().await;
        assert_eq!(snapshot.total_updates, 3);
        assert_eq!(snapshot.full_updates, 1);
        assert_eq!(snapshot.delta_updates, 1);
        assert_eq!(snapshot.invalidations, 1);
        Ok(())
    }
}