Skip to main content

oxigdal_websocket/updates/
change_stream.rs

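//! Change stream processing for real-time updates.
//!
//! Change events are numbered, kept in a bounded in-memory buffer, and
//! broadcast to subscribers.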
2
3use crate::error::Result;
4use crate::protocol::message::{ChangePayload, ChangeType, Message, MessageType, Payload};
5use parking_lot::RwLock;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use tokio::sync::broadcast;
10
/// Tunable settings for a change stream.
///
/// The [`Default`] implementation yields a 10 000-event buffer, deduplication
/// switched on, and a broadcast channel capacity of 1000.
#[derive(Debug, Clone)]
pub struct ChangeStreamConfig {
    /// Maximum size of the event buffer; once it is reached, the oldest
    /// buffered event is evicted to make room for a new one.
    pub max_buffer_size: usize,
    /// Whether repeated events are filtered out before being buffered.
    pub enable_deduplication: bool,
    /// Capacity handed to the broadcast channel that feeds subscribers.
    pub broadcast_capacity: usize,
}

impl Default for ChangeStreamConfig {
    /// Returns the stock configuration: deduplication enabled, room for
    /// 10 000 buffered events, and a broadcast capacity of 1000.
    fn default() -> Self {
        let max_buffer_size = 10_000;
        let broadcast_capacity = 1_000;

        Self {
            max_buffer_size,
            enable_deduplication: true,
            broadcast_capacity,
        }
    }
}
31
32/// Change event
33#[derive(Debug, Clone)]
34pub struct ChangeEvent {
35    /// Change ID (monotonically increasing)
36    pub change_id: u64,
37    /// Collection/layer name
38    pub collection: String,
39    /// Change type
40    pub change_type: ChangeType,
41    /// Document/feature ID
42    pub document_id: String,
43    /// Change data
44    pub data: Option<serde_json::Value>,
45    /// Timestamp
46    pub timestamp: i64,
47}
48
49impl ChangeEvent {
50    /// Create a new change event
51    pub fn new(
52        change_id: u64,
53        collection: String,
54        change_type: ChangeType,
55        document_id: String,
56        data: Option<serde_json::Value>,
57    ) -> Self {
58        Self {
59            change_id,
60            collection,
61            change_type,
62            document_id,
63            data,
64            timestamp: chrono::Utc::now().timestamp_millis(),
65        }
66    }
67
68    /// Convert to message
69    pub fn to_message(&self) -> Message {
70        let payload = Payload::ChangeEvent(ChangePayload {
71            change_id: self.change_id,
72            collection: self.collection.clone(),
73            change_type: self.change_type,
74            document_id: self.document_id.clone(),
75            data: self.data.clone(),
76        });
77
78        Message::new(MessageType::ChangeStream, payload)
79    }
80}
81
82/// Change stream
83pub struct ChangeStream {
84    name: String,
85    config: ChangeStreamConfig,
86    buffer: Arc<RwLock<VecDeque<ChangeEvent>>>,
87    next_change_id: Arc<AtomicU64>,
88    tx: broadcast::Sender<ChangeEvent>,
89    stats: Arc<ChangeStreamStats>,
90}
91
/// Live, atomically updated counters for a change stream.
struct ChangeStreamStats {
    /// Number of events submitted to the stream.
    total_events: AtomicU64,
    /// Number of submitted events whose type was `Created`.
    created_events: AtomicU64,
    /// Number of submitted events whose type was `Updated`.
    updated_events: AtomicU64,
    /// Number of submitted events whose type was `Deleted`.
    deleted_events: AtomicU64,
    /// Number of events evicted from the buffer to make room for newer ones.
    dropped_events: AtomicU64,
    /// Number of events discarded as duplicates.
    deduplicated_events: AtomicU64,
}
101
102impl ChangeStream {
103    /// Create a new change stream
104    pub fn new(name: String, config: ChangeStreamConfig) -> Self {
105        let (tx, _) = broadcast::channel(config.broadcast_capacity);
106
107        Self {
108            name,
109            config,
110            buffer: Arc::new(RwLock::new(VecDeque::new())),
111            next_change_id: Arc::new(AtomicU64::new(1)),
112            tx,
113            stats: Arc::new(ChangeStreamStats {
114                total_events: AtomicU64::new(0),
115                created_events: AtomicU64::new(0),
116                updated_events: AtomicU64::new(0),
117                deleted_events: AtomicU64::new(0),
118                dropped_events: AtomicU64::new(0),
119                deduplicated_events: AtomicU64::new(0),
120            }),
121        }
122    }
123
124    /// Get stream name
125    pub fn name(&self) -> &str {
126        &self.name
127    }
128
129    /// Add a change event
130    pub fn add_event(
131        &self,
132        collection: String,
133        change_type: ChangeType,
134        document_id: String,
135        data: Option<serde_json::Value>,
136    ) -> Result<u64> {
137        let change_id = self.next_change_id.fetch_add(1, Ordering::Relaxed);
138
139        let event = ChangeEvent::new(change_id, collection, change_type, document_id, data);
140
141        // Update statistics
142        self.stats.total_events.fetch_add(1, Ordering::Relaxed);
143        match change_type {
144            ChangeType::Created => {
145                self.stats.created_events.fetch_add(1, Ordering::Relaxed);
146            }
147            ChangeType::Updated => {
148                self.stats.updated_events.fetch_add(1, Ordering::Relaxed);
149            }
150            ChangeType::Deleted => {
151                self.stats.deleted_events.fetch_add(1, Ordering::Relaxed);
152            }
153        }
154
155        // Check for deduplication
156        if self.config.enable_deduplication && self.is_duplicate(&event) {
157            self.stats
158                .deduplicated_events
159                .fetch_add(1, Ordering::Relaxed);
160            return Ok(change_id);
161        }
162
163        // Add to buffer
164        let mut buffer = self.buffer.write();
165        if buffer.len() >= self.config.max_buffer_size {
166            buffer.pop_front();
167            self.stats.dropped_events.fetch_add(1, Ordering::Relaxed);
168        }
169        buffer.push_back(event.clone());
170        drop(buffer);
171
172        // Broadcast to subscribers
173        let _ = self.tx.send(event);
174
175        Ok(change_id)
176    }
177
178    /// Check if an event is a duplicate
179    fn is_duplicate(&self, event: &ChangeEvent) -> bool {
180        let buffer = self.buffer.read();
181
182        // Check last few events for duplicates (same collection and document)
183        buffer.iter().rev().take(10).any(|e| {
184            e.collection == event.collection
185                && e.document_id == event.document_id
186                && e.change_type == event.change_type
187                && e.timestamp.abs_diff(event.timestamp) < 1000 // Within 1 second
188        })
189    }
190
191    /// Subscribe to change events
192    pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
193        self.tx.subscribe()
194    }
195
196    /// Get buffered events
197    pub fn get_events(&self, since_change_id: Option<u64>) -> Vec<ChangeEvent> {
198        let buffer = self.buffer.read();
199
200        if let Some(since_id) = since_change_id {
201            buffer
202                .iter()
203                .filter(|e| e.change_id > since_id)
204                .cloned()
205                .collect()
206        } else {
207            buffer.iter().cloned().collect()
208        }
209    }
210
211    /// Get buffer size
212    pub fn buffer_size(&self) -> usize {
213        self.buffer.read().len()
214    }
215
216    /// Clear buffer
217    pub fn clear(&self) {
218        self.buffer.write().clear();
219    }
220
221    /// Get statistics
222    pub fn stats(&self) -> ChangeStreamStatsSnapshot {
223        ChangeStreamStatsSnapshot {
224            name: self.name.clone(),
225            total_events: self.stats.total_events.load(Ordering::Relaxed),
226            created_events: self.stats.created_events.load(Ordering::Relaxed),
227            updated_events: self.stats.updated_events.load(Ordering::Relaxed),
228            deleted_events: self.stats.deleted_events.load(Ordering::Relaxed),
229            dropped_events: self.stats.dropped_events.load(Ordering::Relaxed),
230            deduplicated_events: self.stats.deduplicated_events.load(Ordering::Relaxed),
231            buffer_size: self.buffer_size(),
232        }
233    }
234}
235
/// Plain-data copy of a change stream's counters, as returned by
/// `ChangeStream::stats`.
#[derive(Debug, Clone)]
pub struct ChangeStreamStatsSnapshot {
    /// Name of the stream the snapshot was taken from.
    pub name: String,
    /// Count of all events.
    pub total_events: u64,
    /// Count of `Created` events.
    pub created_events: u64,
    /// Count of `Updated` events.
    pub updated_events: u64,
    /// Count of `Deleted` events.
    pub deleted_events: u64,
    /// Count of events dropped from the buffer.
    pub dropped_events: u64,
    /// Count of events discarded as duplicates.
    pub deduplicated_events: u64,
    /// Number of events in the buffer when the snapshot was taken.
    pub buffer_size: usize,
}
256
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stream named "test" with the given configuration.
    fn stream_with(config: ChangeStreamConfig) -> ChangeStream {
        ChangeStream::new("test".to_string(), config)
    }

    /// Builds a stream named "test" with the default configuration.
    fn default_stream() -> ChangeStream {
        stream_with(ChangeStreamConfig::default())
    }

    /// Adds a payload-less event to `stream`.
    fn push(
        stream: &ChangeStream,
        collection: &str,
        kind: ChangeType,
        document_id: &str,
    ) -> Result<u64> {
        stream.add_event(collection.to_string(), kind, document_id.to_string(), None)
    }

    #[test]
    fn test_change_event() {
        let collection = "collection".to_string();
        let document_id = "doc1".to_string();

        let event = ChangeEvent::new(1, collection, ChangeType::Created, document_id, None);

        assert_eq!(event.change_id, 1);
        assert_eq!(event.collection, "collection");
        assert_eq!(event.change_type, ChangeType::Created);
    }

    #[test]
    fn test_change_stream() {
        let stream = default_stream();

        assert_eq!(stream.name(), "test");
        assert_eq!(stream.buffer_size(), 0);
    }

    #[test]
    fn test_change_stream_add_event() -> Result<()> {
        let stream = default_stream();

        let change_id = push(&stream, "collection", ChangeType::Created, "doc1")?;

        assert_eq!(change_id, 1);
        assert_eq!(stream.buffer_size(), 1);
        Ok(())
    }

    #[test]
    fn test_change_stream_get_events() -> Result<()> {
        let stream = default_stream();

        push(&stream, "collection", ChangeType::Created, "doc1")?;
        push(&stream, "collection", ChangeType::Updated, "doc1")?;

        let everything = stream.get_events(None);
        assert_eq!(everything.len(), 2);

        let after_first = stream.get_events(Some(1));
        assert_eq!(after_first.len(), 1);
        Ok(())
    }

    #[test]
    fn test_change_stream_stats() -> Result<()> {
        let stream = default_stream();

        // One event of each change type, each on its own document.
        push(&stream, "collection", ChangeType::Created, "doc1")?;
        push(&stream, "collection", ChangeType::Updated, "doc2")?;
        push(&stream, "collection", ChangeType::Deleted, "doc3")?;

        let snapshot = stream.stats();
        assert_eq!(snapshot.total_events, 3);
        assert_eq!(snapshot.created_events, 1);
        assert_eq!(snapshot.updated_events, 1);
        assert_eq!(snapshot.deleted_events, 1);
        Ok(())
    }

    #[test]
    fn test_change_stream_max_buffer() -> Result<()> {
        let stream = stream_with(ChangeStreamConfig {
            max_buffer_size: 2,
            ..Default::default()
        });

        for document_id in ["d1", "d2", "d3"] {
            push(&stream, "c", ChangeType::Created, document_id)?;
        }

        // Buffer should only contain 2 events
        assert_eq!(stream.buffer_size(), 2);

        let snapshot = stream.stats();
        assert_eq!(snapshot.dropped_events, 1);
        Ok(())
    }
}