Skip to main content

oxigdal_websocket/updates/
feature_updates.rs

1//! Feature update management and notifications
2
3use crate::error::Result;
4use crate::protocol::message::{ChangeType, FeaturePayload, Message, MessageType, Payload};
5use parking_lot::RwLock;
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9
/// Kind of change a [`FeatureUpdate`] describes.
///
/// The type is `Copy`, so it is passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeatureUpdateType {
    /// A feature was created.
    Created,
    /// A feature was updated.
    Updated,
    /// A feature was deleted.
    Deleted,
}
20
21impl From<FeatureUpdateType> for ChangeType {
22    fn from(update_type: FeatureUpdateType) -> Self {
23        match update_type {
24            FeatureUpdateType::Created => ChangeType::Created,
25            FeatureUpdateType::Updated => ChangeType::Updated,
26            FeatureUpdateType::Deleted => ChangeType::Deleted,
27        }
28    }
29}
30
31/// Feature update
32pub struct FeatureUpdate {
33    /// Feature ID
34    pub id: String,
35    /// Layer name
36    pub layer: String,
37    /// Update type
38    pub update_type: FeatureUpdateType,
39    /// GeoJSON feature
40    pub feature: serde_json::Value,
41    /// Timestamp
42    pub timestamp: i64,
43}
44
45impl FeatureUpdate {
46    /// Create a new feature update
47    pub fn new(
48        id: String,
49        layer: String,
50        update_type: FeatureUpdateType,
51        feature: serde_json::Value,
52    ) -> Self {
53        Self {
54            id,
55            layer,
56            update_type,
57            feature,
58            timestamp: chrono::Utc::now().timestamp_millis(),
59        }
60    }
61
62    /// Create a created update
63    pub fn created(id: String, layer: String, feature: serde_json::Value) -> Self {
64        Self::new(id, layer, FeatureUpdateType::Created, feature)
65    }
66
67    /// Create an updated update
68    pub fn updated(id: String, layer: String, feature: serde_json::Value) -> Self {
69        Self::new(id, layer, FeatureUpdateType::Updated, feature)
70    }
71
72    /// Create a deleted update
73    pub fn deleted(id: String, layer: String) -> Self {
74        Self::new(
75            id,
76            layer,
77            FeatureUpdateType::Deleted,
78            serde_json::Value::Null,
79        )
80    }
81
82    /// Convert to message
83    pub fn to_message(&self) -> Message {
84        let payload = Payload::FeatureData(FeaturePayload {
85            id: self.id.clone(),
86            layer: self.layer.clone(),
87            feature: self.feature.clone(),
88            change_type: self.update_type.into(),
89        });
90
91        Message::new(MessageType::FeatureUpdate, payload)
92    }
93}
94
95/// Feature update manager
96pub struct FeatureUpdateManager {
97    /// Pending updates by layer
98    updates: Arc<RwLock<HashMap<String, VecDeque<FeatureUpdate>>>>,
99    /// Maximum queue size per layer
100    max_queue_size: usize,
101    /// Statistics
102    stats: Arc<FeatureUpdateStats>,
103}
104
/// Internal atomic counters behind [`FeatureUpdateManager`].
struct FeatureUpdateStats {
    /// Every update passed to `add_update`
    total_updates: AtomicU64,
    /// Updates of kind `Created`
    created: AtomicU64,
    /// Updates of kind `Updated`
    updated: AtomicU64,
    /// Updates of kind `Deleted`
    deleted: AtomicU64,
    /// Updates evicted from a full queue
    dropped_updates: AtomicU64,
}
113
114impl FeatureUpdateManager {
115    /// Create a new feature update manager
116    pub fn new(max_queue_size: usize) -> Self {
117        Self {
118            updates: Arc::new(RwLock::new(HashMap::new())),
119            max_queue_size,
120            stats: Arc::new(FeatureUpdateStats {
121                total_updates: AtomicU64::new(0),
122                created: AtomicU64::new(0),
123                updated: AtomicU64::new(0),
124                deleted: AtomicU64::new(0),
125                dropped_updates: AtomicU64::new(0),
126            }),
127        }
128    }
129
130    /// Add a feature update
131    pub fn add_update(&self, update: FeatureUpdate) -> Result<()> {
132        self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
133
134        match update.update_type {
135            FeatureUpdateType::Created => {
136                self.stats.created.fetch_add(1, Ordering::Relaxed);
137            }
138            FeatureUpdateType::Updated => {
139                self.stats.updated.fetch_add(1, Ordering::Relaxed);
140            }
141            FeatureUpdateType::Deleted => {
142                self.stats.deleted.fetch_add(1, Ordering::Relaxed);
143            }
144        }
145
146        let mut updates = self.updates.write();
147        let queue = updates.entry(update.layer.clone()).or_default();
148
149        if queue.len() >= self.max_queue_size {
150            // Drop oldest update
151            queue.pop_front();
152            self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
153        }
154
155        queue.push_back(update);
156        Ok(())
157    }
158
159    /// Get pending updates for a layer
160    pub fn get_updates(&self, layer: &str) -> Vec<FeatureUpdate> {
161        let mut updates = self.updates.write();
162
163        if let Some(queue) = updates.get_mut(layer) {
164            queue.drain(..).collect()
165        } else {
166            Vec::new()
167        }
168    }
169
170    /// Get all pending updates
171    pub fn get_all_updates(&self) -> HashMap<String, Vec<FeatureUpdate>> {
172        let mut updates = self.updates.write();
173        let mut result = HashMap::new();
174
175        for (layer, queue) in updates.iter_mut() {
176            result.insert(layer.clone(), queue.drain(..).collect());
177        }
178
179        result
180    }
181
182    /// Clear updates for a layer
183    pub fn clear_layer(&self, layer: &str) {
184        let mut updates = self.updates.write();
185        updates.remove(layer);
186    }
187
188    /// Clear all updates
189    pub fn clear_all(&self) {
190        let mut updates = self.updates.write();
191        updates.clear();
192    }
193
194    /// Get pending update count
195    pub fn pending_count(&self) -> usize {
196        let updates = self.updates.read();
197        updates.values().map(|q| q.len()).sum()
198    }
199
200    /// Get layers with pending updates
201    pub fn layers_with_updates(&self) -> Vec<String> {
202        let updates = self.updates.read();
203        updates
204            .iter()
205            .filter(|(_, q)| !q.is_empty())
206            .map(|(k, _)| k.clone())
207            .collect()
208    }
209
210    /// Get statistics
211    pub async fn stats(&self) -> FeatureUpdateManagerStats {
212        FeatureUpdateManagerStats {
213            total_updates: self.stats.total_updates.load(Ordering::Relaxed),
214            created: self.stats.created.load(Ordering::Relaxed),
215            updated: self.stats.updated.load(Ordering::Relaxed),
216            deleted: self.stats.deleted.load(Ordering::Relaxed),
217            dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
218            pending_updates: self.pending_count(),
219        }
220    }
221}
222
/// Point-in-time copy of the counters kept by [`FeatureUpdateManager`].
#[derive(Debug, Clone)]
pub struct FeatureUpdateManagerStats {
    /// Number of updates passed to `add_update`
    pub total_updates: u64,
    /// Number of `Created` updates
    pub created: u64,
    /// Number of `Updated` updates
    pub updated: u64,
    /// Number of `Deleted` updates
    pub deleted: u64,
    /// Number of updates evicted from a full queue
    pub dropped_updates: u64,
    /// Number of updates still queued when the snapshot was taken
    pub pending_updates: usize,
}
239
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal feature body for tests that do not inspect the feature content.
    fn bare_feature() -> serde_json::Value {
        serde_json::json!({"type": "Feature"})
    }

    /// `created` keeps the id and layer and tags the update as `Created`.
    #[test]
    fn test_feature_update() {
        let point = serde_json::json!({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
            "properties": {}
        });

        let update = FeatureUpdate::created("f1".to_string(), "layer1".to_string(), point);

        assert_eq!(update.id, "f1");
        assert_eq!(update.layer, "layer1");
        assert_eq!(update.update_type, FeatureUpdateType::Created);
    }

    /// `deleted` tags the update as `Deleted` and carries a JSON `null` feature.
    #[test]
    fn test_feature_update_deleted() {
        let update = FeatureUpdate::deleted("f1".to_string(), "layer1".to_string());

        assert_eq!(update.update_type, FeatureUpdateType::Deleted);
        assert_eq!(update.feature, serde_json::Value::Null);
    }

    /// A queued update is counted as pending until `get_updates` drains it.
    #[test]
    fn test_feature_update_manager() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);

        manager.add_update(FeatureUpdate::created(
            "f1".to_string(),
            "layer1".to_string(),
            bare_feature(),
        ))?;
        assert_eq!(manager.pending_count(), 1);

        let drained = manager.get_updates("layer1");
        assert_eq!(drained.len(), 1);
        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    /// Every layer that received an update is reported by `layers_with_updates`.
    #[test]
    fn test_feature_update_layers() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);

        let on_layer1 =
            FeatureUpdate::created("f1".to_string(), "layer1".to_string(), bare_feature());
        let on_layer2 =
            FeatureUpdate::created("f2".to_string(), "layer2".to_string(), bare_feature());

        manager.add_update(on_layer1)?;
        manager.add_update(on_layer2)?;

        let layers = manager.layers_with_updates();
        assert_eq!(layers.len(), 2);
        for expected in ["layer1", "layer2"] {
            assert!(layers.iter().any(|name| name == expected));
        }
        Ok(())
    }

    /// One update of each kind bumps the matching counter and the total.
    #[tokio::test]
    async fn test_feature_update_stats() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);

        let batch = [
            FeatureUpdate::created("f1".to_string(), "layer1".to_string(), bare_feature()),
            FeatureUpdate::updated("f2".to_string(), "layer1".to_string(), bare_feature()),
            FeatureUpdate::deleted("f3".to_string(), "layer1".to_string()),
        ];
        for update in batch {
            manager.add_update(update)?;
        }

        let stats = manager.stats().await;
        assert_eq!(stats.total_updates, 3);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.deleted, 1);
        Ok(())
    }
}