Skip to main content

oxigdal_websocket/updates/
mod.rs

1//! Live updates system for real-time data changes
2//!
3//! This module provides:
4//! - Tile update notifications
5//! - Feature change tracking
6//! - Change stream processing
7//! - Incremental update delivery
8
9pub mod change_stream;
10pub mod feature_updates;
11pub mod incremental;
12pub mod tile_updates;
13
14pub use change_stream::{ChangeEvent, ChangeStream, ChangeStreamConfig};
15pub use feature_updates::{FeatureUpdate, FeatureUpdateManager, FeatureUpdateType};
16pub use incremental::{IncrementalUpdate, IncrementalUpdateManager, UpdateDelta};
17pub use tile_updates::{TileUpdate, TileUpdateManager, TileUpdateType};
18
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22/// Update configuration
23#[derive(Debug, Clone)]
24pub struct UpdateConfig {
25    /// Enable tile updates
26    pub enable_tile_updates: bool,
27    /// Enable feature updates
28    pub enable_feature_updates: bool,
29    /// Enable change streams
30    pub enable_change_streams: bool,
31    /// Maximum update queue size
32    pub max_queue_size: usize,
33    /// Update batch size
34    pub batch_size: usize,
35    /// Update interval in milliseconds
36    pub update_interval_ms: u64,
37}
38
39impl Default for UpdateConfig {
40    fn default() -> Self {
41        Self {
42            enable_tile_updates: true,
43            enable_feature_updates: true,
44            enable_change_streams: true,
45            max_queue_size: 10_000,
46            batch_size: 100,
47            update_interval_ms: 100,
48        }
49    }
50}
51
52/// Update system manager
53pub struct UpdateSystem {
54    #[allow(dead_code)]
55    config: UpdateConfig,
56    tile_manager: Arc<TileUpdateManager>,
57    feature_manager: Arc<FeatureUpdateManager>,
58    incremental_manager: Arc<IncrementalUpdateManager>,
59    change_streams: Arc<RwLock<std::collections::HashMap<String, Arc<ChangeStream>>>>,
60}
61
62impl UpdateSystem {
63    /// Create a new update system
64    pub fn new(config: UpdateConfig) -> Self {
65        Self {
66            config: config.clone(),
67            tile_manager: Arc::new(TileUpdateManager::new(config.max_queue_size)),
68            feature_manager: Arc::new(FeatureUpdateManager::new(config.max_queue_size)),
69            incremental_manager: Arc::new(IncrementalUpdateManager::new()),
70            change_streams: Arc::new(RwLock::new(std::collections::HashMap::new())),
71        }
72    }
73
74    /// Get tile update manager
75    pub fn tile_manager(&self) -> &Arc<TileUpdateManager> {
76        &self.tile_manager
77    }
78
79    /// Get feature update manager
80    pub fn feature_manager(&self) -> &Arc<FeatureUpdateManager> {
81        &self.feature_manager
82    }
83
84    /// Get incremental update manager
85    pub fn incremental_manager(&self) -> &Arc<IncrementalUpdateManager> {
86        &self.incremental_manager
87    }
88
89    /// Create or get a change stream
90    pub async fn get_or_create_stream(&self, name: &str) -> Arc<ChangeStream> {
91        let mut streams = self.change_streams.write().await;
92
93        streams
94            .entry(name.to_string())
95            .or_insert_with(|| {
96                Arc::new(ChangeStream::new(
97                    name.to_string(),
98                    ChangeStreamConfig::default(),
99                ))
100            })
101            .clone()
102    }
103
104    /// Remove a change stream
105    pub async fn remove_stream(&self, name: &str) -> Option<Arc<ChangeStream>> {
106        let mut streams = self.change_streams.write().await;
107        streams.remove(name)
108    }
109
110    /// Get update statistics
111    pub async fn stats(&self) -> UpdateStats {
112        let tile_stats = self.tile_manager.stats().await;
113        let feature_stats = self.feature_manager.stats().await;
114
115        UpdateStats {
116            tile_updates: tile_stats.total_updates,
117            feature_updates: feature_stats.total_updates,
118            change_streams: self.change_streams.read().await.len(),
119        }
120    }
121}
122
123/// Update statistics
124#[derive(Debug, Clone)]
125pub struct UpdateStats {
126    /// Total tile updates
127    pub tile_updates: u64,
128    /// Total feature updates
129    pub feature_updates: u64,
130    /// Number of change streams
131    pub change_streams: usize,
132}
133
134#[cfg(test)]
135mod tests {
136    use super::*;
137
138    #[test]
139    fn test_update_config_default() {
140        let config = UpdateConfig::default();
141        assert!(config.enable_tile_updates);
142        assert!(config.enable_feature_updates);
143        assert!(config.enable_change_streams);
144    }
145
146    #[tokio::test]
147    async fn test_update_system() {
148        let config = UpdateConfig::default();
149        let system = UpdateSystem::new(config);
150
151        let stats = system.stats().await;
152        assert_eq!(stats.tile_updates, 0);
153        assert_eq!(stats.feature_updates, 0);
154        assert_eq!(stats.change_streams, 0);
155    }
156}