aurora_db/reactive/
watcher.rs1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7pub struct QueryWatcher {
9 receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11 collection: String,
13}
14
15impl QueryWatcher {
16 pub fn new(
24 collection: impl Into<String>,
25 mut listener: ChangeListener,
26 state: Arc<ReactiveQueryState>,
27 initial_results: Vec<Document>,
28 ) -> Self {
29 let collection = collection.into();
30 let (sender, receiver) = mpsc::unbounded_channel();
31
32 let init_state = Arc::clone(&state);
34 let init_sender = sender.clone();
35 tokio::spawn(async move {
36 for doc in initial_results {
37 if let Some(update) = init_state.add_if_matches(doc).await {
38 let _ = init_sender.send(update);
39 }
40 }
41 });
42
43 tokio::spawn(async move {
45 while let Ok(event) = listener.recv().await {
46 let update = match event.change_type {
47 crate::pubsub::ChangeType::Insert => {
48 if let Some(doc) = event.document {
49 state.add_if_matches(doc).await
50 } else {
51 None
52 }
53 }
54 crate::pubsub::ChangeType::Update => {
55 if let Some(new_doc) = event.document {
56 state.update(&event.id, new_doc).await
57 } else {
58 None
59 }
60 }
61 crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62 };
63
64 if let Some(u) = update {
65 if sender.send(u).is_err() {
66 break;
68 }
69 }
70 }
71 });
72
73 Self {
74 receiver,
75 collection,
76 }
77 }
78
79 pub async fn next(&mut self) -> Option<QueryUpdate> {
82 self.receiver.recv().await
83 }
84
85 pub fn collection(&self) -> &str {
87 &self.collection
88 }
89
90 pub fn try_next(&mut self) -> Option<QueryUpdate> {
92 self.receiver.try_recv().ok()
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pubsub::{ChangeEvent, PubSubSystem};
    use crate::types::Value;
    use std::collections::HashMap;

    /// Query state whose predicate accepts only documents with `active == true`.
    fn active_only_state() -> Arc<ReactiveQueryState> {
        Arc::new(ReactiveQueryState::new(|doc: &Document| {
            doc.data.get("active") == Some(&Value::Bool(true))
        }))
    }

    /// Sets up a pub/sub system plus a watcher on the "users" collection with
    /// no initial results.
    fn watch_active_users() -> (PubSubSystem, QueryWatcher) {
        let pubsub = PubSubSystem::new(100);
        let listener = pubsub.listen("users");
        let watcher = QueryWatcher::new("users", listener, active_only_state(), vec![]);
        (pubsub, watcher)
    }

    /// Builds a document with the given id and fields.
    fn document(id: &str, fields: Vec<(&str, Value)>) -> Document {
        let data: HashMap<String, Value> = fields
            .into_iter()
            .map(|(key, value)| (key.to_string(), value))
            .collect();

        Document {
            id: id.to_string(),
            data,
        }
    }

    /// A matching insert is surfaced as an `Added` update carrying its id.
    #[tokio::test]
    async fn test_query_watcher_insert() {
        let (pubsub, mut watcher) = watch_active_users();

        let alice = document(
            "1",
            vec![
                ("active", Value::Bool(true)),
                ("name", Value::String("Alice".into())),
            ],
        );

        pubsub
            .publish(ChangeEvent::insert("users", "1", alice))
            .unwrap();

        let update = watcher.next().await.unwrap();
        assert!(matches!(update, QueryUpdate::Added(_)));
        assert_eq!(update.id(), "1");
    }

    /// A non-matching insert is skipped; the first update seen is the
    /// matching document published after it.
    #[tokio::test]
    async fn test_query_watcher_filter() {
        let (pubsub, mut watcher) = watch_active_users();

        let inactive = document("1", vec![("active", Value::Bool(false))]);
        pubsub
            .publish(ChangeEvent::insert("users", "1", inactive))
            .unwrap();

        let active = document("2", vec![("active", Value::Bool(true))]);
        pubsub
            .publish(ChangeEvent::insert("users", "2", active))
            .unwrap();

        let update = watcher.next().await.unwrap();
        assert_eq!(update.id(), "2");
    }
}