oxigdal_websocket/updates/
mod.rs1pub mod change_stream;
10pub mod feature_updates;
11pub mod incremental;
12pub mod tile_updates;
13
14pub use change_stream::{ChangeEvent, ChangeStream, ChangeStreamConfig};
15pub use feature_updates::{FeatureUpdate, FeatureUpdateManager, FeatureUpdateType};
16pub use incremental::{IncrementalUpdate, IncrementalUpdateManager, UpdateDelta};
17pub use tile_updates::{TileUpdate, TileUpdateManager, TileUpdateType};
18
19use std::sync::Arc;
20use tokio::sync::RwLock;
21
22#[derive(Debug, Clone)]
24pub struct UpdateConfig {
25 pub enable_tile_updates: bool,
27 pub enable_feature_updates: bool,
29 pub enable_change_streams: bool,
31 pub max_queue_size: usize,
33 pub batch_size: usize,
35 pub update_interval_ms: u64,
37}
38
39impl Default for UpdateConfig {
40 fn default() -> Self {
41 Self {
42 enable_tile_updates: true,
43 enable_feature_updates: true,
44 enable_change_streams: true,
45 max_queue_size: 10_000,
46 batch_size: 100,
47 update_interval_ms: 100,
48 }
49 }
50}
51
52pub struct UpdateSystem {
54 #[allow(dead_code)]
55 config: UpdateConfig,
56 tile_manager: Arc<TileUpdateManager>,
57 feature_manager: Arc<FeatureUpdateManager>,
58 incremental_manager: Arc<IncrementalUpdateManager>,
59 change_streams: Arc<RwLock<std::collections::HashMap<String, Arc<ChangeStream>>>>,
60}
61
62impl UpdateSystem {
63 pub fn new(config: UpdateConfig) -> Self {
65 Self {
66 config: config.clone(),
67 tile_manager: Arc::new(TileUpdateManager::new(config.max_queue_size)),
68 feature_manager: Arc::new(FeatureUpdateManager::new(config.max_queue_size)),
69 incremental_manager: Arc::new(IncrementalUpdateManager::new()),
70 change_streams: Arc::new(RwLock::new(std::collections::HashMap::new())),
71 }
72 }
73
74 pub fn tile_manager(&self) -> &Arc<TileUpdateManager> {
76 &self.tile_manager
77 }
78
79 pub fn feature_manager(&self) -> &Arc<FeatureUpdateManager> {
81 &self.feature_manager
82 }
83
84 pub fn incremental_manager(&self) -> &Arc<IncrementalUpdateManager> {
86 &self.incremental_manager
87 }
88
89 pub async fn get_or_create_stream(&self, name: &str) -> Arc<ChangeStream> {
91 let mut streams = self.change_streams.write().await;
92
93 streams
94 .entry(name.to_string())
95 .or_insert_with(|| {
96 Arc::new(ChangeStream::new(
97 name.to_string(),
98 ChangeStreamConfig::default(),
99 ))
100 })
101 .clone()
102 }
103
104 pub async fn remove_stream(&self, name: &str) -> Option<Arc<ChangeStream>> {
106 let mut streams = self.change_streams.write().await;
107 streams.remove(name)
108 }
109
110 pub async fn stats(&self) -> UpdateStats {
112 let tile_stats = self.tile_manager.stats().await;
113 let feature_stats = self.feature_manager.stats().await;
114
115 UpdateStats {
116 tile_updates: tile_stats.total_updates,
117 feature_updates: feature_stats.total_updates,
118 change_streams: self.change_streams.read().await.len(),
119 }
120 }
121}
122
123#[derive(Debug, Clone)]
125pub struct UpdateStats {
126 pub tile_updates: u64,
128 pub feature_updates: u64,
130 pub change_streams: usize,
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137
138 #[test]
139 fn test_update_config_default() {
140 let config = UpdateConfig::default();
141 assert!(config.enable_tile_updates);
142 assert!(config.enable_feature_updates);
143 assert!(config.enable_change_streams);
144 }
145
146 #[tokio::test]
147 async fn test_update_system() {
148 let config = UpdateConfig::default();
149 let system = UpdateSystem::new(config);
150
151 let stats = system.stats().await;
152 assert_eq!(stats.tile_updates, 0);
153 assert_eq!(stats.feature_updates, 0);
154 assert_eq!(stats.change_streams, 0);
155 }
156}