Skip to main content

aurora_db/reactive/
watcher.rs

1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7/// Watches a query and emits updates when results change
8pub struct QueryWatcher {
9    /// Receiver for query updates
10    receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11    /// Collection being watched
12    collection: String,
13}
14
15impl QueryWatcher {
16    /// Create a new query watcher
17    ///
18    /// # Arguments
19    /// * `collection` - Collection to watch
20    /// * `listener` - Change listener for the collection
21    /// * `state` - Reactive query state
22    /// * `initial_results` - Initial query results to populate the state
23    pub fn new(
24        collection: impl Into<String>,
25        mut listener: ChangeListener,
26        state: Arc<ReactiveQueryState>,
27        initial_results: Vec<Document>,
28    ) -> Self {
29        let collection = collection.into();
30        let (sender, receiver) = mpsc::unbounded_channel();
31
32        // Populate initial results
33        let init_state = Arc::clone(&state);
34        let init_sender = sender.clone();
35        tokio::spawn(async move {
36            for doc in initial_results {
37                if let Some(update) = init_state.add_if_matches(doc).await {
38                    let _ = init_sender.send(update);
39                }
40            }
41        });
42
43        // Spawn background task to listen for changes
44        tokio::spawn(async move {
45            while let Ok(event) = listener.recv().await {
46                let update = match event.change_type {
47                    crate::pubsub::ChangeType::Insert => {
48                        if let Some(doc) = event.document {
49                            state.add_if_matches(doc).await
50                        } else {
51                            None
52                        }
53                    }
54                    crate::pubsub::ChangeType::Update => {
55                        if let Some(new_doc) = event.document {
56                            state.update(&event.id, new_doc).await
57                        } else {
58                            None
59                        }
60                    }
61                    crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62                };
63
64                if let Some(u) = update
65                    && sender.send(u).is_err()
66                {
67                    // Receiver dropped, stop watching
68                    break;
69                }
70            }
71        });
72
73        Self {
74            receiver,
75            collection,
76        }
77    }
78
79    /// Get the next query update
80    /// Returns None when the watcher is closed
81    pub async fn next(&mut self) -> Option<QueryUpdate> {
82        self.receiver.recv().await
83    }
84
85    /// Get the collection name being watched
86    pub fn collection(&self) -> &str {
87        &self.collection
88    }
89
90    /// Try to receive an update without blocking
91    pub fn try_next(&mut self) -> Option<QueryUpdate> {
92        self.receiver.try_recv().ok()
93    }
94
95    /// Convert to a throttled watcher for rate-limiting updates
96    ///
97    /// Events are buffered and emitted at most once per interval.
98    /// Deduplicates by document ID, keeping only the latest state.
99    pub fn throttled(self, interval: std::time::Duration) -> ThrottledQueryWatcher {
100        ThrottledQueryWatcher::new(self.receiver, self.collection, interval)
101    }
102}
103
104/// A throttled/debounced query watcher for rate-limiting reactive updates
105///
106/// Buffers incoming events and emits them at a fixed interval.
107/// Deduplicates by document ID, keeping only the latest state per ID.
108/// This prevents overwhelming the UI with high-frequency updates.
109pub struct ThrottledQueryWatcher {
110    receiver: mpsc::UnboundedReceiver<QueryUpdate>,
111    collection: String,
112}
113
114impl ThrottledQueryWatcher {
115    /// Create a new throttled watcher
116    pub fn new(
117        mut raw_receiver: mpsc::UnboundedReceiver<QueryUpdate>,
118        collection: impl Into<String>,
119        interval: std::time::Duration,
120    ) -> Self {
121        let collection = collection.into();
122        let (tx, rx) = mpsc::unbounded_channel();
123
124        tokio::spawn(async move {
125            use std::collections::HashMap;
126            use tokio::time::interval as tokio_interval;
127
128            let mut tick = tokio_interval(interval);
129            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
130
131            // Buffer: doc_id -> latest update for that doc
132            let mut pending: HashMap<String, QueryUpdate> = HashMap::new();
133
134            loop {
135                tokio::select! {
136                    biased;
137
138                    // Collect events as fast as they come
139                    maybe_update = raw_receiver.recv() => {
140                        match maybe_update {
141                            Some(update) => {
142                                // Dedupe by doc ID - keep latest state
143                                pending.insert(update.id().to_string(), update);
144                            }
145                            None => break, // Raw receiver closed
146                        }
147                    }
148
149                    // Every tick, flush the buffer
150                    _ = tick.tick() => {
151                        if !pending.is_empty() {
152                            for (_, update) in pending.drain() {
153                                if tx.send(update).is_err() {
154                                    return; // Receiver dropped
155                                }
156                            }
157                        }
158                    }
159                }
160            }
161        });
162
163        Self {
164            receiver: rx,
165            collection,
166        }
167    }
168
169    /// Get the next throttled update
170    pub async fn next(&mut self) -> Option<QueryUpdate> {
171        self.receiver.recv().await
172    }
173
174    /// Get the collection name
175    pub fn collection(&self) -> &str {
176        &self.collection
177    }
178
179    /// Try to receive without blocking
180    pub fn try_next(&mut self) -> Option<QueryUpdate> {
181        self.receiver.try_recv().ok()
182    }
183}
184
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pubsub::{ChangeEvent, PubSubSystem};
    use crate::types::Value;
    use std::time::Duration;
    use tokio::sync::mpsc;

    /// Build a `Document` with the given ID and fields.
    fn make_doc(id: &str, fields: Vec<(&str, Value)>) -> Document {
        Document {
            id: id.to_string(),
            data: fields
                .into_iter()
                .map(|(key, value)| (key.to_string(), value))
                .collect(),
        }
    }

    /// Query state that matches documents whose `active` field is `true`.
    fn active_only_state() -> Arc<ReactiveQueryState> {
        Arc::new(ReactiveQueryState::new(|doc: &Document| {
            doc.data.get("active") == Some(&Value::Bool(true))
        }))
    }

    #[tokio::test]
    async fn test_query_watcher_insert() {
        let pubsub = PubSubSystem::new(100);
        let listener = pubsub.listen("users");
        let mut watcher = QueryWatcher::new("users", listener, active_only_state(), vec![]);

        // Publish an insert event for an active user
        let alice = make_doc(
            "1",
            vec![
                ("active", Value::Bool(true)),
                ("name", Value::String("Alice".into())),
            ],
        );
        pubsub
            .publish(ChangeEvent::insert("users", "1", alice))
            .unwrap();

        // Should receive an Added update
        let update = watcher.next().await.unwrap();
        assert!(matches!(update, QueryUpdate::Added(_)));
        assert_eq!(update.id(), "1");
    }

    #[tokio::test]
    async fn test_query_watcher_filter() {
        let pubsub = PubSubSystem::new(100);
        let listener = pubsub.listen("users");
        let mut watcher = QueryWatcher::new("users", listener, active_only_state(), vec![]);

        // Publish an inactive user (should be filtered)
        let inactive = make_doc("1", vec![("active", Value::Bool(false))]);
        pubsub
            .publish(ChangeEvent::insert("users", "1", inactive))
            .unwrap();

        // Publish an active user (should pass filter)
        let active = make_doc("2", vec![("active", Value::Bool(true))]);
        pubsub
            .publish(ChangeEvent::insert("users", "2", active))
            .unwrap();

        // Should only receive the active user
        let update = watcher.next().await.unwrap();
        assert_eq!(update.id(), "2");
    }

    #[tokio::test]
    async fn test_debounced_watcher() {
        // Create a channel that simulates raw query updates
        let (tx, rx) = mpsc::unbounded_channel();

        // Create throttled watcher with 100ms interval
        let mut throttled = ThrottledQueryWatcher::new(rx, "test", Duration::from_millis(100));

        // Send multiple updates for the same document rapidly
        tx.send(QueryUpdate::Added(make_doc(
            "doc1",
            vec![("value", Value::Int(1))],
        )))
        .unwrap();

        for value in [2, 3] {
            tx.send(QueryUpdate::Modified {
                old: make_doc("doc1", vec![]),
                new: make_doc("doc1", vec![("value", Value::Int(value))]),
            })
            .unwrap();
        }

        // Wait for throttle interval to pass
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Should receive only the latest update (deduped by doc ID)
        let update = throttled
            .try_next()
            .expect("one coalesced update should be available after the interval");
        assert_eq!(update.id(), "doc1");

        // The last one wins due to deduplication: verify the payload, not just the ID
        match update {
            QueryUpdate::Modified { new, .. } => {
                assert!(new.data.get("value") == Some(&Value::Int(3)));
            }
            _ => panic!("expected the most recent Modified update to be delivered"),
        }

        // The earlier updates for the same document must have been dropped
        assert!(throttled.try_next().is_none());
    }

    #[tokio::test]
    async fn test_throttled_watcher_multiple_docs() {
        let (tx, rx) = mpsc::unbounded_channel();
        let mut throttled = ThrottledQueryWatcher::new(rx, "test", Duration::from_millis(100));

        // Send updates for different documents
        for i in 1..=3 {
            tx.send(QueryUpdate::Added(make_doc(
                &format!("doc{}", i),
                vec![("value", Value::Int(i))],
            )))
            .unwrap();
        }

        // Wait for throttle
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Should receive all 3 (different IDs, no deduplication)
        let mut received = Vec::new();
        while let Some(update) = throttled.try_next() {
            received.push(update.id().to_string());
        }

        assert_eq!(received.len(), 3);
        assert!(received.contains(&"doc1".to_string()));
        assert!(received.contains(&"doc2".to_string()));
        assert!(received.contains(&"doc3".to_string()));
    }
}