oxigdal_websocket/updates/
feature_updates.rs1use crate::error::Result;
4use crate::protocol::message::{ChangeType, FeaturePayload, Message, MessageType, Payload};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
12pub enum FeatureUpdateType {
13 Created,
15 Updated,
17 Deleted,
19}
20
21impl From<FeatureUpdateType> for ChangeType {
22 fn from(update_type: FeatureUpdateType) -> Self {
23 match update_type {
24 FeatureUpdateType::Created => ChangeType::Created,
25 FeatureUpdateType::Updated => ChangeType::Updated,
26 FeatureUpdateType::Deleted => ChangeType::Deleted,
27 }
28 }
29}
30
31pub struct FeatureUpdate {
33 pub id: String,
35 pub layer: String,
37 pub update_type: FeatureUpdateType,
39 pub feature: serde_json::Value,
41 pub timestamp: i64,
43}
44
45impl FeatureUpdate {
46 pub fn new(
48 id: String,
49 layer: String,
50 update_type: FeatureUpdateType,
51 feature: serde_json::Value,
52 ) -> Self {
53 Self {
54 id,
55 layer,
56 update_type,
57 feature,
58 timestamp: chrono::Utc::now().timestamp_millis(),
59 }
60 }
61
62 pub fn created(id: String, layer: String, feature: serde_json::Value) -> Self {
64 Self::new(id, layer, FeatureUpdateType::Created, feature)
65 }
66
67 pub fn updated(id: String, layer: String, feature: serde_json::Value) -> Self {
69 Self::new(id, layer, FeatureUpdateType::Updated, feature)
70 }
71
72 pub fn deleted(id: String, layer: String) -> Self {
74 Self::new(
75 id,
76 layer,
77 FeatureUpdateType::Deleted,
78 serde_json::Value::Null,
79 )
80 }
81
82 pub fn to_message(&self) -> Message {
84 let payload = Payload::FeatureData(FeaturePayload {
85 id: self.id.clone(),
86 layer: self.layer.clone(),
87 feature: self.feature.clone(),
88 change_type: self.update_type.into(),
89 });
90
91 Message::new(MessageType::FeatureUpdate, payload)
92 }
93}
94
95pub struct FeatureUpdateManager {
97 updates: Arc<RwLock<HashMap<String, VecDeque<FeatureUpdate>>>>,
99 max_queue_size: usize,
101 stats: Arc<FeatureUpdateStats>,
103}
104
105struct FeatureUpdateStats {
107 total_updates: AtomicU64,
108 created: AtomicU64,
109 updated: AtomicU64,
110 deleted: AtomicU64,
111 dropped_updates: AtomicU64,
112}
113
114impl FeatureUpdateManager {
115 pub fn new(max_queue_size: usize) -> Self {
117 Self {
118 updates: Arc::new(RwLock::new(HashMap::new())),
119 max_queue_size,
120 stats: Arc::new(FeatureUpdateStats {
121 total_updates: AtomicU64::new(0),
122 created: AtomicU64::new(0),
123 updated: AtomicU64::new(0),
124 deleted: AtomicU64::new(0),
125 dropped_updates: AtomicU64::new(0),
126 }),
127 }
128 }
129
130 pub fn add_update(&self, update: FeatureUpdate) -> Result<()> {
132 self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
133
134 match update.update_type {
135 FeatureUpdateType::Created => {
136 self.stats.created.fetch_add(1, Ordering::Relaxed);
137 }
138 FeatureUpdateType::Updated => {
139 self.stats.updated.fetch_add(1, Ordering::Relaxed);
140 }
141 FeatureUpdateType::Deleted => {
142 self.stats.deleted.fetch_add(1, Ordering::Relaxed);
143 }
144 }
145
146 let mut updates = self.updates.write();
147 let queue = updates.entry(update.layer.clone()).or_default();
148
149 if queue.len() >= self.max_queue_size {
150 queue.pop_front();
152 self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
153 }
154
155 queue.push_back(update);
156 Ok(())
157 }
158
159 pub fn get_updates(&self, layer: &str) -> Vec<FeatureUpdate> {
161 let mut updates = self.updates.write();
162
163 if let Some(queue) = updates.get_mut(layer) {
164 queue.drain(..).collect()
165 } else {
166 Vec::new()
167 }
168 }
169
170 pub fn get_all_updates(&self) -> HashMap<String, Vec<FeatureUpdate>> {
172 let mut updates = self.updates.write();
173 let mut result = HashMap::new();
174
175 for (layer, queue) in updates.iter_mut() {
176 result.insert(layer.clone(), queue.drain(..).collect());
177 }
178
179 result
180 }
181
182 pub fn clear_layer(&self, layer: &str) {
184 let mut updates = self.updates.write();
185 updates.remove(layer);
186 }
187
188 pub fn clear_all(&self) {
190 let mut updates = self.updates.write();
191 updates.clear();
192 }
193
194 pub fn pending_count(&self) -> usize {
196 let updates = self.updates.read();
197 updates.values().map(|q| q.len()).sum()
198 }
199
200 pub fn layers_with_updates(&self) -> Vec<String> {
202 let updates = self.updates.read();
203 updates
204 .iter()
205 .filter(|(_, q)| !q.is_empty())
206 .map(|(k, _)| k.clone())
207 .collect()
208 }
209
210 pub async fn stats(&self) -> FeatureUpdateManagerStats {
212 FeatureUpdateManagerStats {
213 total_updates: self.stats.total_updates.load(Ordering::Relaxed),
214 created: self.stats.created.load(Ordering::Relaxed),
215 updated: self.stats.updated.load(Ordering::Relaxed),
216 deleted: self.stats.deleted.load(Ordering::Relaxed),
217 dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
218 pending_updates: self.pending_count(),
219 }
220 }
221}
222
223#[derive(Debug, Clone)]
225pub struct FeatureUpdateManagerStats {
226 pub total_updates: u64,
228 pub created: u64,
230 pub updated: u64,
232 pub deleted: u64,
234 pub dropped_updates: u64,
236 pub pending_updates: usize,
238}
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[test]
245 fn test_feature_update() {
246 let feature = serde_json::json!({
247 "type": "Feature",
248 "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
249 "properties": {}
250 });
251
252 let update = FeatureUpdate::created("f1".to_string(), "layer1".to_string(), feature);
253
254 assert_eq!(update.id, "f1");
255 assert_eq!(update.layer, "layer1");
256 assert_eq!(update.update_type, FeatureUpdateType::Created);
257 }
258
259 #[test]
260 fn test_feature_update_deleted() {
261 let update = FeatureUpdate::deleted("f1".to_string(), "layer1".to_string());
262
263 assert_eq!(update.update_type, FeatureUpdateType::Deleted);
264 assert_eq!(update.feature, serde_json::Value::Null);
265 }
266
267 #[test]
268 fn test_feature_update_manager() -> Result<()> {
269 let manager = FeatureUpdateManager::new(10);
270 let feature = serde_json::json!({"type": "Feature"});
271
272 let update = FeatureUpdate::created("f1".to_string(), "layer1".to_string(), feature);
273
274 manager.add_update(update)?;
275 assert_eq!(manager.pending_count(), 1);
276
277 let updates = manager.get_updates("layer1");
278 assert_eq!(updates.len(), 1);
279 assert_eq!(manager.pending_count(), 0);
280 Ok(())
281 }
282
283 #[test]
284 fn test_feature_update_layers() -> Result<()> {
285 let manager = FeatureUpdateManager::new(10);
286 let feature = serde_json::json!({"type": "Feature"});
287
288 let update1 =
289 FeatureUpdate::created("f1".to_string(), "layer1".to_string(), feature.clone());
290 let update2 = FeatureUpdate::created("f2".to_string(), "layer2".to_string(), feature);
291
292 manager.add_update(update1)?;
293 manager.add_update(update2)?;
294
295 let layers = manager.layers_with_updates();
296 assert_eq!(layers.len(), 2);
297 assert!(layers.contains(&"layer1".to_string()));
298 assert!(layers.contains(&"layer2".to_string()));
299 Ok(())
300 }
301
302 #[tokio::test]
303 async fn test_feature_update_stats() -> Result<()> {
304 let manager = FeatureUpdateManager::new(10);
305 let feature = serde_json::json!({"type": "Feature"});
306
307 let created =
308 FeatureUpdate::created("f1".to_string(), "layer1".to_string(), feature.clone());
309 let updated = FeatureUpdate::updated("f2".to_string(), "layer1".to_string(), feature);
310 let deleted = FeatureUpdate::deleted("f3".to_string(), "layer1".to_string());
311
312 manager.add_update(created)?;
313 manager.add_update(updated)?;
314 manager.add_update(deleted)?;
315
316 let stats = manager.stats().await;
317 assert_eq!(stats.total_updates, 3);
318 assert_eq!(stats.created, 1);
319 assert_eq!(stats.updated, 1);
320 assert_eq!(stats.deleted, 1);
321 Ok(())
322 }
323}