aurora_db/reactive/
watcher.rs1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7pub struct QueryWatcher {
9 receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11 collection: String,
13}
14
15impl QueryWatcher {
16 pub fn new(
24 collection: impl Into<String>,
25 mut listener: ChangeListener,
26 state: Arc<ReactiveQueryState>,
27 initial_results: Vec<Document>,
28 ) -> Self {
29 let collection = collection.into();
30 let (sender, receiver) = mpsc::unbounded_channel();
31
32 let init_state = Arc::clone(&state);
34 let init_sender = sender.clone();
35 tokio::spawn(async move {
36 for doc in initial_results {
37 if let Some(update) = init_state.add_if_matches(doc).await {
38 let _ = init_sender.send(update);
39 }
40 }
41 });
42
43 tokio::spawn(async move {
45 while let Ok(event) = listener.recv().await {
46 let update = match event.change_type {
47 crate::pubsub::ChangeType::Insert => {
48 if let Some(doc) = event.document {
49 state.add_if_matches(doc).await
50 } else {
51 None
52 }
53 }
54 crate::pubsub::ChangeType::Update => {
55 if let Some(new_doc) = event.document {
56 state.update(&event.id, new_doc).await
57 } else {
58 None
59 }
60 }
61 crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62 };
63
64 if let Some(u) = update
65 && sender.send(u).is_err() {
66 break;
68 }
69 }
70 });
71
72 Self {
73 receiver,
74 collection,
75 }
76 }
77
78 pub async fn next(&mut self) -> Option<QueryUpdate> {
81 self.receiver.recv().await
82 }
83
84 pub fn collection(&self) -> &str {
86 &self.collection
87 }
88
89 pub fn try_next(&mut self) -> Option<QueryUpdate> {
91 self.receiver.try_recv().ok()
92 }
93}
94
95#[cfg(test)]
96mod tests {
97 use super::*;
98 use crate::pubsub::{ChangeEvent, PubSubSystem};
99 use crate::types::Value;
100 use std::collections::HashMap;
101
102 #[tokio::test]
103 async fn test_query_watcher_insert() {
104 let pubsub = PubSubSystem::new(100);
105 let listener = pubsub.listen("users");
106
107 let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
108 doc.data.get("active") == Some(&Value::Bool(true))
109 }));
110
111 let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
112
113 let mut data = HashMap::new();
115 data.insert("active".to_string(), Value::Bool(true));
116 data.insert("name".to_string(), Value::String("Alice".into()));
117
118 let doc = Document {
119 id: "1".to_string(),
120 data,
121 };
122
123 pubsub
124 .publish(ChangeEvent::insert("users", "1", doc))
125 .unwrap();
126
127 let update = watcher.next().await.unwrap();
129 assert!(matches!(update, QueryUpdate::Added(_)));
130 assert_eq!(update.id(), "1");
131 }
132
133 #[tokio::test]
134 async fn test_query_watcher_filter() {
135 let pubsub = PubSubSystem::new(100);
136 let listener = pubsub.listen("users");
137
138 let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
139 doc.data.get("active") == Some(&Value::Bool(true))
140 }));
141
142 let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
143
144 let mut inactive_data = HashMap::new();
146 inactive_data.insert("active".to_string(), Value::Bool(false));
147
148 pubsub
149 .publish(ChangeEvent::insert(
150 "users",
151 "1",
152 Document {
153 id: "1".to_string(),
154 data: inactive_data,
155 },
156 ))
157 .unwrap();
158
159 let mut active_data = HashMap::new();
161 active_data.insert("active".to_string(), Value::Bool(true));
162
163 pubsub
164 .publish(ChangeEvent::insert(
165 "users",
166 "2",
167 Document {
168 id: "2".to_string(),
169 data: active_data,
170 },
171 ))
172 .unwrap();
173
174 let update = watcher.next().await.unwrap();
176 assert_eq!(update.id(), "2");
177 }
178}