oxigdal_websocket/updates/
change_stream.rs1use crate::error::Result;
4use crate::protocol::message::{ChangePayload, ChangeType, Message, MessageType, Payload};
5use parking_lot::RwLock;
6use std::collections::VecDeque;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU64, Ordering};
9use tokio::sync::broadcast;
10
11#[derive(Debug, Clone)]
13pub struct ChangeStreamConfig {
14 pub max_buffer_size: usize,
16 pub enable_deduplication: bool,
18 pub broadcast_capacity: usize,
20}
21
22impl Default for ChangeStreamConfig {
23 fn default() -> Self {
24 Self {
25 max_buffer_size: 10_000,
26 enable_deduplication: true,
27 broadcast_capacity: 1000,
28 }
29 }
30}
31
32#[derive(Debug, Clone)]
34pub struct ChangeEvent {
35 pub change_id: u64,
37 pub collection: String,
39 pub change_type: ChangeType,
41 pub document_id: String,
43 pub data: Option<serde_json::Value>,
45 pub timestamp: i64,
47}
48
49impl ChangeEvent {
50 pub fn new(
52 change_id: u64,
53 collection: String,
54 change_type: ChangeType,
55 document_id: String,
56 data: Option<serde_json::Value>,
57 ) -> Self {
58 Self {
59 change_id,
60 collection,
61 change_type,
62 document_id,
63 data,
64 timestamp: chrono::Utc::now().timestamp_millis(),
65 }
66 }
67
68 pub fn to_message(&self) -> Message {
70 let payload = Payload::ChangeEvent(ChangePayload {
71 change_id: self.change_id,
72 collection: self.collection.clone(),
73 change_type: self.change_type,
74 document_id: self.document_id.clone(),
75 data: self.data.clone(),
76 });
77
78 Message::new(MessageType::ChangeStream, payload)
79 }
80}
81
82pub struct ChangeStream {
84 name: String,
85 config: ChangeStreamConfig,
86 buffer: Arc<RwLock<VecDeque<ChangeEvent>>>,
87 next_change_id: Arc<AtomicU64>,
88 tx: broadcast::Sender<ChangeEvent>,
89 stats: Arc<ChangeStreamStats>,
90}
91
92struct ChangeStreamStats {
94 total_events: AtomicU64,
95 created_events: AtomicU64,
96 updated_events: AtomicU64,
97 deleted_events: AtomicU64,
98 dropped_events: AtomicU64,
99 deduplicated_events: AtomicU64,
100}
101
102impl ChangeStream {
103 pub fn new(name: String, config: ChangeStreamConfig) -> Self {
105 let (tx, _) = broadcast::channel(config.broadcast_capacity);
106
107 Self {
108 name,
109 config,
110 buffer: Arc::new(RwLock::new(VecDeque::new())),
111 next_change_id: Arc::new(AtomicU64::new(1)),
112 tx,
113 stats: Arc::new(ChangeStreamStats {
114 total_events: AtomicU64::new(0),
115 created_events: AtomicU64::new(0),
116 updated_events: AtomicU64::new(0),
117 deleted_events: AtomicU64::new(0),
118 dropped_events: AtomicU64::new(0),
119 deduplicated_events: AtomicU64::new(0),
120 }),
121 }
122 }
123
124 pub fn name(&self) -> &str {
126 &self.name
127 }
128
129 pub fn add_event(
131 &self,
132 collection: String,
133 change_type: ChangeType,
134 document_id: String,
135 data: Option<serde_json::Value>,
136 ) -> Result<u64> {
137 let change_id = self.next_change_id.fetch_add(1, Ordering::Relaxed);
138
139 let event = ChangeEvent::new(change_id, collection, change_type, document_id, data);
140
141 self.stats.total_events.fetch_add(1, Ordering::Relaxed);
143 match change_type {
144 ChangeType::Created => {
145 self.stats.created_events.fetch_add(1, Ordering::Relaxed);
146 }
147 ChangeType::Updated => {
148 self.stats.updated_events.fetch_add(1, Ordering::Relaxed);
149 }
150 ChangeType::Deleted => {
151 self.stats.deleted_events.fetch_add(1, Ordering::Relaxed);
152 }
153 }
154
155 if self.config.enable_deduplication && self.is_duplicate(&event) {
157 self.stats
158 .deduplicated_events
159 .fetch_add(1, Ordering::Relaxed);
160 return Ok(change_id);
161 }
162
163 let mut buffer = self.buffer.write();
165 if buffer.len() >= self.config.max_buffer_size {
166 buffer.pop_front();
167 self.stats.dropped_events.fetch_add(1, Ordering::Relaxed);
168 }
169 buffer.push_back(event.clone());
170 drop(buffer);
171
172 let _ = self.tx.send(event);
174
175 Ok(change_id)
176 }
177
178 fn is_duplicate(&self, event: &ChangeEvent) -> bool {
180 let buffer = self.buffer.read();
181
182 buffer.iter().rev().take(10).any(|e| {
184 e.collection == event.collection
185 && e.document_id == event.document_id
186 && e.change_type == event.change_type
187 && e.timestamp.abs_diff(event.timestamp) < 1000 })
189 }
190
191 pub fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
193 self.tx.subscribe()
194 }
195
196 pub fn get_events(&self, since_change_id: Option<u64>) -> Vec<ChangeEvent> {
198 let buffer = self.buffer.read();
199
200 if let Some(since_id) = since_change_id {
201 buffer
202 .iter()
203 .filter(|e| e.change_id > since_id)
204 .cloned()
205 .collect()
206 } else {
207 buffer.iter().cloned().collect()
208 }
209 }
210
211 pub fn buffer_size(&self) -> usize {
213 self.buffer.read().len()
214 }
215
216 pub fn clear(&self) {
218 self.buffer.write().clear();
219 }
220
221 pub fn stats(&self) -> ChangeStreamStatsSnapshot {
223 ChangeStreamStatsSnapshot {
224 name: self.name.clone(),
225 total_events: self.stats.total_events.load(Ordering::Relaxed),
226 created_events: self.stats.created_events.load(Ordering::Relaxed),
227 updated_events: self.stats.updated_events.load(Ordering::Relaxed),
228 deleted_events: self.stats.deleted_events.load(Ordering::Relaxed),
229 dropped_events: self.stats.dropped_events.load(Ordering::Relaxed),
230 deduplicated_events: self.stats.deduplicated_events.load(Ordering::Relaxed),
231 buffer_size: self.buffer_size(),
232 }
233 }
234}
235
236#[derive(Debug, Clone)]
238pub struct ChangeStreamStatsSnapshot {
239 pub name: String,
241 pub total_events: u64,
243 pub created_events: u64,
245 pub updated_events: u64,
247 pub deleted_events: u64,
249 pub dropped_events: u64,
251 pub deduplicated_events: u64,
253 pub buffer_size: usize,
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260
261 #[test]
262 fn test_change_event() {
263 let event = ChangeEvent::new(
264 1,
265 "collection".to_string(),
266 ChangeType::Created,
267 "doc1".to_string(),
268 None,
269 );
270
271 assert_eq!(event.change_id, 1);
272 assert_eq!(event.collection, "collection");
273 assert_eq!(event.change_type, ChangeType::Created);
274 }
275
276 #[test]
277 fn test_change_stream() {
278 let config = ChangeStreamConfig::default();
279 let stream = ChangeStream::new("test".to_string(), config);
280
281 assert_eq!(stream.name(), "test");
282 assert_eq!(stream.buffer_size(), 0);
283 }
284
285 #[test]
286 fn test_change_stream_add_event() -> Result<()> {
287 let config = ChangeStreamConfig::default();
288 let stream = ChangeStream::new("test".to_string(), config);
289
290 let change_id = stream.add_event(
291 "collection".to_string(),
292 ChangeType::Created,
293 "doc1".to_string(),
294 None,
295 )?;
296
297 assert_eq!(change_id, 1);
298 assert_eq!(stream.buffer_size(), 1);
299 Ok(())
300 }
301
302 #[test]
303 fn test_change_stream_get_events() -> Result<()> {
304 let config = ChangeStreamConfig::default();
305 let stream = ChangeStream::new("test".to_string(), config);
306
307 stream.add_event(
308 "collection".to_string(),
309 ChangeType::Created,
310 "doc1".to_string(),
311 None,
312 )?;
313
314 stream.add_event(
315 "collection".to_string(),
316 ChangeType::Updated,
317 "doc1".to_string(),
318 None,
319 )?;
320
321 let events = stream.get_events(None);
322 assert_eq!(events.len(), 2);
323
324 let events_since = stream.get_events(Some(1));
325 assert_eq!(events_since.len(), 1);
326 Ok(())
327 }
328
329 #[test]
330 fn test_change_stream_stats() -> Result<()> {
331 let config = ChangeStreamConfig::default();
332 let stream = ChangeStream::new("test".to_string(), config);
333
334 stream.add_event(
335 "collection".to_string(),
336 ChangeType::Created,
337 "doc1".to_string(),
338 None,
339 )?;
340
341 stream.add_event(
342 "collection".to_string(),
343 ChangeType::Updated,
344 "doc2".to_string(),
345 None,
346 )?;
347
348 stream.add_event(
349 "collection".to_string(),
350 ChangeType::Deleted,
351 "doc3".to_string(),
352 None,
353 )?;
354
355 let stats = stream.stats();
356 assert_eq!(stats.total_events, 3);
357 assert_eq!(stats.created_events, 1);
358 assert_eq!(stats.updated_events, 1);
359 assert_eq!(stats.deleted_events, 1);
360 Ok(())
361 }
362
363 #[test]
364 fn test_change_stream_max_buffer() -> Result<()> {
365 let config = ChangeStreamConfig {
366 max_buffer_size: 2,
367 ..Default::default()
368 };
369 let stream = ChangeStream::new("test".to_string(), config);
370
371 stream.add_event("c".to_string(), ChangeType::Created, "d1".to_string(), None)?;
372 stream.add_event("c".to_string(), ChangeType::Created, "d2".to_string(), None)?;
373 stream.add_event("c".to_string(), ChangeType::Created, "d3".to_string(), None)?;
374
375 assert_eq!(stream.buffer_size(), 2);
377
378 let stats = stream.stats();
379 assert_eq!(stats.dropped_events, 1);
380 Ok(())
381 }
382}