aurora_db/reactive/
watcher.rs1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7pub struct QueryWatcher {
9 receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11 collection: String,
13}
14
15impl QueryWatcher {
16 pub fn new(
24 collection: impl Into<String>,
25 mut listener: ChangeListener,
26 state: Arc<ReactiveQueryState>,
27 initial_results: Vec<Document>,
28 ) -> Self {
29 let collection = collection.into();
30 let (sender, receiver) = mpsc::unbounded_channel();
31
32 let init_state = Arc::clone(&state);
34 let init_sender = sender.clone();
35 tokio::spawn(async move {
36 for doc in initial_results {
37 if let Some(update) = init_state.add_if_matches(doc).await {
38 let _ = init_sender.send(update);
39 }
40 }
41 });
42
43 tokio::spawn(async move {
45 while let Ok(event) = listener.recv().await {
46 let update = match event.change_type {
47 crate::pubsub::ChangeType::Insert => {
48 if let Some(doc) = event.document {
49 state.add_if_matches(doc).await
50 } else {
51 None
52 }
53 }
54 crate::pubsub::ChangeType::Update => {
55 if let Some(new_doc) = event.document {
56 state.update(&event.id, new_doc).await
57 } else {
58 None
59 }
60 }
61 crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62 };
63
64 if let Some(u) = update
65 && sender.send(u).is_err()
66 {
67 break;
69 }
70 }
71 });
72
73 Self {
74 receiver,
75 collection,
76 }
77 }
78
79 pub async fn next(&mut self) -> Option<QueryUpdate> {
82 self.receiver.recv().await
83 }
84
85 pub fn collection(&self) -> &str {
87 &self.collection
88 }
89
90 pub fn try_next(&mut self) -> Option<QueryUpdate> {
92 self.receiver.try_recv().ok()
93 }
94
95 pub fn throttled(self, interval: std::time::Duration) -> ThrottledQueryWatcher {
100 ThrottledQueryWatcher::new(self.receiver, self.collection, interval)
101 }
102}
103
104pub struct ThrottledQueryWatcher {
110 receiver: mpsc::UnboundedReceiver<QueryUpdate>,
111 collection: String,
112}
113
114impl ThrottledQueryWatcher {
115 pub fn new(
117 mut raw_receiver: mpsc::UnboundedReceiver<QueryUpdate>,
118 collection: impl Into<String>,
119 interval: std::time::Duration,
120 ) -> Self {
121 let collection = collection.into();
122 let (tx, rx) = mpsc::unbounded_channel();
123
124 tokio::spawn(async move {
125 use std::collections::HashMap;
126 use tokio::time::interval as tokio_interval;
127
128 let mut tick = tokio_interval(interval);
129 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
130
131 let mut pending: HashMap<String, QueryUpdate> = HashMap::new();
133
134 loop {
135 tokio::select! {
136 biased;
137
138 maybe_update = raw_receiver.recv() => {
140 match maybe_update {
141 Some(update) => {
142 pending.insert(update.id().to_string(), update);
144 }
145 None => break, }
147 }
148
149 _ = tick.tick() => {
151 if !pending.is_empty() {
152 for (_, update) in pending.drain() {
153 if tx.send(update).is_err() {
154 return; }
156 }
157 }
158 }
159 }
160 }
161 });
162
163 Self {
164 receiver: rx,
165 collection,
166 }
167 }
168
169 pub async fn next(&mut self) -> Option<QueryUpdate> {
171 self.receiver.recv().await
172 }
173
174 pub fn collection(&self) -> &str {
176 &self.collection
177 }
178
179 pub fn try_next(&mut self) -> Option<QueryUpdate> {
181 self.receiver.try_recv().ok()
182 }
183}
184
185#[cfg(test)]
186mod tests {
187 use super::*;
188 use crate::pubsub::{ChangeEvent, PubSubSystem};
189 use crate::types::Value;
190 use std::collections::HashMap;
191
192 #[tokio::test]
193 async fn test_query_watcher_insert() {
194 let pubsub = PubSubSystem::new(100);
195 let listener = pubsub.listen("users");
196
197 let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
198 doc.data.get("active") == Some(&Value::Bool(true))
199 }));
200
201 let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
202
203 let mut data = HashMap::new();
205 data.insert("active".to_string(), Value::Bool(true));
206 data.insert("name".to_string(), Value::String("Alice".into()));
207
208 let doc = Document {
209 id: "1".to_string(),
210 data,
211 };
212
213 pubsub
214 .publish(ChangeEvent::insert("users", "1", doc))
215 .unwrap();
216
217 let update = watcher.next().await.unwrap();
219 assert!(matches!(update, QueryUpdate::Added(_)));
220 assert_eq!(update.id(), "1");
221 }
222
223 #[tokio::test]
224 async fn test_query_watcher_filter() {
225 let pubsub = PubSubSystem::new(100);
226 let listener = pubsub.listen("users");
227
228 let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
229 doc.data.get("active") == Some(&Value::Bool(true))
230 }));
231
232 let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
233
234 let mut inactive_data = HashMap::new();
236 inactive_data.insert("active".to_string(), Value::Bool(false));
237
238 pubsub
239 .publish(ChangeEvent::insert(
240 "users",
241 "1",
242 Document {
243 id: "1".to_string(),
244 data: inactive_data,
245 },
246 ))
247 .unwrap();
248
249 let mut active_data = HashMap::new();
251 active_data.insert("active".to_string(), Value::Bool(true));
252
253 pubsub
254 .publish(ChangeEvent::insert(
255 "users",
256 "2",
257 Document {
258 id: "2".to_string(),
259 data: active_data,
260 },
261 ))
262 .unwrap();
263
264 let update = watcher.next().await.unwrap();
266 assert_eq!(update.id(), "2");
267 }
268
269 #[tokio::test]
270 async fn test_debounced_watcher() {
271 use std::time::Duration;
272 use tokio::sync::mpsc;
273
274 let (tx, rx) = mpsc::unbounded_channel();
276
277 let mut throttled = ThrottledQueryWatcher::new(rx, "test", Duration::from_millis(100));
279
280 let mut data1 = HashMap::new();
282 data1.insert("value".to_string(), Value::Int(1));
283 tx.send(QueryUpdate::Added(Document {
284 id: "doc1".to_string(),
285 data: data1,
286 }))
287 .unwrap();
288
289 let mut data2 = HashMap::new();
290 data2.insert("value".to_string(), Value::Int(2));
291 tx.send(QueryUpdate::Modified {
292 old: Document {
293 id: "doc1".to_string(),
294 data: HashMap::new(),
295 },
296 new: Document {
297 id: "doc1".to_string(),
298 data: data2,
299 },
300 })
301 .unwrap();
302
303 let mut data3 = HashMap::new();
304 data3.insert("value".to_string(), Value::Int(3));
305 tx.send(QueryUpdate::Modified {
306 old: Document {
307 id: "doc1".to_string(),
308 data: HashMap::new(),
309 },
310 new: Document {
311 id: "doc1".to_string(),
312 data: data3.clone(),
313 },
314 })
315 .unwrap();
316
317 tokio::time::sleep(Duration::from_millis(150)).await;
319
320 let update = throttled.try_next();
322 assert!(update.is_some());
323 assert_eq!(update.unwrap().id(), "doc1");
325 }
326
327 #[tokio::test]
328 async fn test_throttled_watcher_multiple_docs() {
329 use std::time::Duration;
330 use tokio::sync::mpsc;
331
332 let (tx, rx) = mpsc::unbounded_channel();
333 let mut throttled = ThrottledQueryWatcher::new(rx, "test", Duration::from_millis(100));
334
335 for i in 1..=3 {
337 let mut data = HashMap::new();
338 data.insert("value".to_string(), Value::Int(i));
339 tx.send(QueryUpdate::Added(Document {
340 id: format!("doc{}", i),
341 data,
342 }))
343 .unwrap();
344 }
345
346 tokio::time::sleep(Duration::from_millis(150)).await;
348
349 let mut received = Vec::new();
351 while let Some(update) = throttled.try_next() {
352 received.push(update.id().to_string());
353 }
354
355 assert_eq!(received.len(), 3);
356 assert!(received.contains(&"doc1".to_string()));
357 assert!(received.contains(&"doc2".to_string()));
358 assert!(received.contains(&"doc3".to_string()));
359 }
360}