1use crate::error::{Error, Result};
4use crate::protocol::message::{Message, MessageType, Payload, TilePayload};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::fmt;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10
/// Kind of change carried by a [`TileUpdate`].
///
/// `TileUpdateManager::add_update` uses this value to decide which per-kind
/// counter to increment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TileUpdateType {
    /// Set by `TileUpdate::full`, which stores the supplied data and no delta.
    Full,
    /// Set by `TileUpdate::delta`, which stores the supplied data together
    /// with a delta buffer.
    Delta,
    /// Set by `TileUpdate::invalidate`, which carries empty data, an empty
    /// format string and no delta.
    Invalidate,
}
21
/// Address of a single tile, rendered as `z/x/y` in its text form.
///
/// The type is `Copy` and implements `Eq + Hash`, so it can be used directly
/// as a hash-map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TileCoord {
    /// First component of the `z/x/y` form — presumably the zoom level
    /// (NOTE(review): confirm against the tiling scheme in use).
    pub z: u8,
    /// Second component of the `z/x/y` form — presumably the tile column.
    pub x: u32,
    /// Third component of the `z/x/y` form — presumably the tile row.
    pub y: u32,
}
32
33impl TileCoord {
34 pub fn new(z: u8, x: u32, y: u32) -> Self {
36 Self { z, x, y }
37 }
38
39 pub fn from_string(s: &str) -> Result<Self> {
41 let parts: Vec<&str> = s.split('/').collect();
42 if parts.len() != 3 {
43 return Err(Error::Protocol(format!("Invalid tile coord format: {}", s)));
44 }
45
46 let z = parts[0]
47 .parse()
48 .map_err(|_| Error::Protocol("Invalid z coordinate".to_string()))?;
49 let x = parts[1]
50 .parse()
51 .map_err(|_| Error::Protocol("Invalid x coordinate".to_string()))?;
52 let y = parts[2]
53 .parse()
54 .map_err(|_| Error::Protocol("Invalid y coordinate".to_string()))?;
55
56 Ok(Self { z, x, y })
57 }
58}
59
60impl fmt::Display for TileCoord {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(f, "{}/{}/{}", self.z, self.x, self.y)
63 }
64}
65
66pub struct TileUpdate {
68 pub coord: TileCoord,
70 pub update_type: TileUpdateType,
72 pub data: Vec<u8>,
74 pub format: String,
76 pub delta: Option<Vec<u8>>,
78 pub timestamp: i64,
80}
81
82impl TileUpdate {
83 pub fn full(coord: TileCoord, data: Vec<u8>, format: String) -> Self {
85 Self {
86 coord,
87 update_type: TileUpdateType::Full,
88 data,
89 format,
90 delta: None,
91 timestamp: chrono::Utc::now().timestamp_millis(),
92 }
93 }
94
95 pub fn delta(coord: TileCoord, data: Vec<u8>, delta: Vec<u8>, format: String) -> Self {
97 Self {
98 coord,
99 update_type: TileUpdateType::Delta,
100 data,
101 format,
102 delta: Some(delta),
103 timestamp: chrono::Utc::now().timestamp_millis(),
104 }
105 }
106
107 pub fn invalidate(coord: TileCoord) -> Self {
109 Self {
110 coord,
111 update_type: TileUpdateType::Invalidate,
112 data: Vec::new(),
113 format: String::new(),
114 delta: None,
115 timestamp: chrono::Utc::now().timestamp_millis(),
116 }
117 }
118
119 pub fn to_message(&self) -> Message {
121 let payload = Payload::TileData(TilePayload {
122 z: self.coord.z,
123 x: self.coord.x,
124 y: self.coord.y,
125 data: self.data.clone(),
126 format: self.format.clone(),
127 delta: self.delta.clone(),
128 });
129
130 Message::new(MessageType::TileUpdate, payload)
131 }
132}
133
/// Per-tile queues of pending [`TileUpdate`]s together with running counters.
pub struct TileUpdateManager {
    /// Pending updates keyed by tile, oldest first within each queue
    /// (`add_update` pushes at the back and evicts from the front).
    updates: Arc<RwLock<HashMap<TileCoord, VecDeque<TileUpdate>>>>,
    /// Queue length at which `add_update` discards a tile's oldest pending
    /// update before pushing the new one.
    max_queue_size: usize,
    /// Atomic counters that back the snapshot returned by `stats`.
    stats: Arc<TileUpdateStats>,
}
143
/// Internal counters maintained by `TileUpdateManager`.
///
/// Each field is an independent atomic that is updated and read with
/// `Ordering::Relaxed`, so a snapshot of several fields is not taken atomically.
struct TileUpdateStats {
    /// Number of calls to `add_update`.
    total_updates: AtomicU64,
    /// Number of added updates whose type was `TileUpdateType::Full`.
    full_updates: AtomicU64,
    /// Number of added updates whose type was `TileUpdateType::Delta`.
    delta_updates: AtomicU64,
    /// Number of added updates whose type was `TileUpdateType::Invalidate`.
    invalidations: AtomicU64,
    /// Incremented by `add_update` when it takes the queue-full eviction path.
    dropped_updates: AtomicU64,
}
152
153impl TileUpdateManager {
154 pub fn new(max_queue_size: usize) -> Self {
156 Self {
157 updates: Arc::new(RwLock::new(HashMap::new())),
158 max_queue_size,
159 stats: Arc::new(TileUpdateStats {
160 total_updates: AtomicU64::new(0),
161 full_updates: AtomicU64::new(0),
162 delta_updates: AtomicU64::new(0),
163 invalidations: AtomicU64::new(0),
164 dropped_updates: AtomicU64::new(0),
165 }),
166 }
167 }
168
169 pub fn add_update(&self, update: TileUpdate) -> Result<()> {
171 self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
172
173 match update.update_type {
174 TileUpdateType::Full => {
175 self.stats.full_updates.fetch_add(1, Ordering::Relaxed);
176 }
177 TileUpdateType::Delta => {
178 self.stats.delta_updates.fetch_add(1, Ordering::Relaxed);
179 }
180 TileUpdateType::Invalidate => {
181 self.stats.invalidations.fetch_add(1, Ordering::Relaxed);
182 }
183 }
184
185 let mut updates = self.updates.write();
186 let queue = updates.entry(update.coord).or_default();
187
188 if queue.len() >= self.max_queue_size {
189 queue.pop_front();
191 self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
192 }
193
194 queue.push_back(update);
195 Ok(())
196 }
197
198 pub fn get_updates(&self, coord: &TileCoord) -> Vec<TileUpdate> {
200 let mut updates = self.updates.write();
201
202 if let Some(queue) = updates.get_mut(coord) {
203 queue.drain(..).collect()
204 } else {
205 Vec::new()
206 }
207 }
208
209 pub fn get_all_updates(&self) -> HashMap<TileCoord, Vec<TileUpdate>> {
211 let mut updates = self.updates.write();
212 let mut result = HashMap::new();
213
214 for (coord, queue) in updates.iter_mut() {
215 result.insert(*coord, queue.drain(..).collect());
216 }
217
218 result
219 }
220
221 pub fn clear_tile(&self, coord: &TileCoord) {
223 let mut updates = self.updates.write();
224 updates.remove(coord);
225 }
226
227 pub fn clear_all(&self) {
229 let mut updates = self.updates.write();
230 updates.clear();
231 }
232
233 pub fn pending_count(&self) -> usize {
235 let updates = self.updates.read();
236 updates.values().map(|q| q.len()).sum()
237 }
238
239 pub async fn stats(&self) -> TileUpdateManagerStats {
241 TileUpdateManagerStats {
242 total_updates: self.stats.total_updates.load(Ordering::Relaxed),
243 full_updates: self.stats.full_updates.load(Ordering::Relaxed),
244 delta_updates: self.stats.delta_updates.load(Ordering::Relaxed),
245 invalidations: self.stats.invalidations.load(Ordering::Relaxed),
246 dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
247 pending_updates: self.pending_count(),
248 }
249 }
250}
251
/// Point-in-time snapshot of a `TileUpdateManager`'s counters, as returned by
/// `TileUpdateManager::stats`.
#[derive(Debug, Clone)]
pub struct TileUpdateManagerStats {
    /// Number of updates passed to `add_update`.
    pub total_updates: u64,
    /// Number of added updates of type `TileUpdateType::Full`.
    pub full_updates: u64,
    /// Number of added updates of type `TileUpdateType::Delta`.
    pub delta_updates: u64,
    /// Number of added updates of type `TileUpdateType::Invalidate`.
    pub invalidations: u64,
    /// Value of the manager's dropped-update counter (queue-full evictions).
    pub dropped_updates: u64,
    /// Number of updates pending across all tiles when the snapshot was taken.
    pub pending_updates: usize,
}
268
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` stores the components unchanged.
    #[test]
    fn test_tile_coord() {
        let coord = TileCoord::new(10, 512, 384);
        assert_eq!(coord.z, 10);
        assert_eq!(coord.x, 512);
        assert_eq!(coord.y, 384);
    }

    /// `Display` and `from_string` round-trip through the `z/x/y` form.
    #[test]
    fn test_tile_coord_string() -> Result<()> {
        let coord = TileCoord::new(10, 512, 384);
        let s = coord.to_string();
        assert_eq!(s, "10/512/384");

        let parsed = TileCoord::from_string(&s)?;
        assert_eq!(parsed, coord);
        Ok(())
    }

    /// Wrong field counts, non-numeric fields and out-of-range values are rejected.
    #[test]
    fn test_tile_coord_from_string_rejects_malformed_input() {
        assert!(TileCoord::from_string("").is_err());
        assert!(TileCoord::from_string("10/512").is_err());
        assert!(TileCoord::from_string("10/512/384/1").is_err());
        assert!(TileCoord::from_string("z/512/384").is_err());
        assert!(TileCoord::from_string("10/x/384").is_err());
        assert!(TileCoord::from_string("10/512/y").is_err());
        // `z` is a `u8`, so 256 does not fit.
        assert!(TileCoord::from_string("256/512/384").is_err());
        // `x` and `y` are unsigned.
        assert!(TileCoord::from_string("10/-1/384").is_err());
    }

    #[test]
    fn test_tile_update_full() {
        let coord = TileCoord::new(10, 512, 384);
        let data = vec![1, 2, 3, 4];
        let update = TileUpdate::full(coord, data.clone(), "png".to_string());

        assert_eq!(update.coord, coord);
        assert_eq!(update.update_type, TileUpdateType::Full);
        assert_eq!(update.data, data);
        assert_eq!(update.format, "png");
    }

    #[test]
    fn test_tile_update_delta() {
        let coord = TileCoord::new(10, 512, 384);
        let data = vec![1, 2, 3, 4];
        let delta = vec![5, 6, 7, 8];
        let update = TileUpdate::delta(coord, data.clone(), delta.clone(), "png".to_string());

        assert_eq!(update.update_type, TileUpdateType::Delta);
        assert_eq!(update.delta, Some(delta));
    }

    /// An invalidation carries no payload at all.
    #[test]
    fn test_tile_update_invalidate() {
        let coord = TileCoord::new(10, 512, 384);
        let update = TileUpdate::invalidate(coord);

        assert_eq!(update.coord, coord);
        assert_eq!(update.update_type, TileUpdateType::Invalidate);
        assert!(update.data.is_empty());
        assert!(update.format.is_empty());
        assert!(update.delta.is_none());
    }

    #[test]
    fn test_tile_update_manager() -> Result<()> {
        let manager = TileUpdateManager::new(10);
        let coord = TileCoord::new(10, 512, 384);
        let update = TileUpdate::full(coord, vec![1, 2, 3, 4], "png".to_string());

        manager.add_update(update)?;
        assert_eq!(manager.pending_count(), 1);

        let updates = manager.get_updates(&coord);
        assert_eq!(updates.len(), 1);
        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    #[tokio::test]
    async fn test_tile_update_stats() -> Result<()> {
        let manager = TileUpdateManager::new(10);
        let coord = TileCoord::new(10, 512, 384);

        let full = TileUpdate::full(coord, vec![1, 2, 3], "png".to_string());
        let delta = TileUpdate::delta(coord, vec![1, 2], vec![3, 4], "png".to_string());
        let inv = TileUpdate::invalidate(coord);

        manager.add_update(full)?;
        manager.add_update(delta)?;
        manager.add_update(inv)?;

        let stats = manager.stats().await;
        assert_eq!(stats.total_updates, 3);
        assert_eq!(stats.full_updates, 1);
        assert_eq!(stats.delta_updates, 1);
        assert_eq!(stats.invalidations, 1);
        Ok(())
    }

    /// A full queue evicts its oldest entry and records exactly one drop.
    #[tokio::test]
    async fn test_queue_overflow_drops_oldest() -> Result<()> {
        let manager = TileUpdateManager::new(2);
        let coord = TileCoord::new(3, 1, 2);

        for byte in 0u8..3 {
            manager.add_update(TileUpdate::full(coord, vec![byte], "png".to_string()))?;
        }

        assert_eq!(manager.pending_count(), 2);
        assert_eq!(manager.stats().await.dropped_updates, 1);

        let payloads: Vec<Vec<u8>> = manager
            .get_updates(&coord)
            .into_iter()
            .map(|update| update.data)
            .collect();
        assert_eq!(payloads, vec![vec![1u8], vec![2u8]]);
        Ok(())
    }
}