aurora_db/reactive/
mod.rs1pub mod updates;
7pub mod watcher;
8
9pub use updates::{QueryUpdate, UpdateType};
10pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
11
12use crate::types::Document;
13use crate::query::Filter;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18pub struct ReactiveQueryState {
20 results: Arc<RwLock<HashMap<String, Document>>>,
22 filters: Vec<Filter>,
24}
25
26impl ReactiveQueryState {
27 pub fn new(filters: Vec<Filter>) -> Self {
28 Self {
29 results: Arc::new(RwLock::new(HashMap::new())),
30 filters,
31 }
32 }
33
34 pub fn matches(&self, doc: &Document) -> bool {
36 self.filters.iter().all(|f| f.matches(doc))
37 }
38
39 pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
41 if self.matches(&doc) {
42 let mut results = self.results.write().await;
43 let id = doc._sid.clone();
44
45 match results.entry(id) {
46 std::collections::hash_map::Entry::Occupied(mut e) => {
47 let old = e.insert(doc.clone());
49 Some(QueryUpdate::Modified { old, new: doc })
50 }
51 std::collections::hash_map::Entry::Vacant(e) => {
52 e.insert(doc.clone());
54 Some(QueryUpdate::Added(doc))
55 }
56 }
57 } else {
58 None
59 }
60 }
61
62 pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
64 let mut results = self.results.write().await;
65 results.remove(id).map(QueryUpdate::Removed)
66 }
67
68 pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
70 let should_be_in_results = self.matches(&new_doc);
71 let mut results = self.results.write().await;
72 let was_in_results = results.contains_key(id);
73
74 match (was_in_results, should_be_in_results) {
75 (true, true) => {
76 let old = results.insert(id.to_string(), new_doc.clone());
77 Some(QueryUpdate::Modified {
78 old: old.unwrap(),
79 new: new_doc,
80 })
81 }
82 (true, false) => results.remove(id).map(QueryUpdate::Removed),
83 (false, true) => {
84 results.insert(id.to_string(), new_doc.clone());
85 Some(QueryUpdate::Added(new_doc))
86 }
87 (false, false) => None,
88 }
89 }
90
91 pub async fn get_results(&self) -> Vec<Document> {
93 self.results.read().await.values().cloned().collect()
94 }
95
96 pub async fn count(&self) -> usize {
98 self.results.read().await.len()
99 }
100
101 pub async fn sync_state<I>(&self, new_docs: I) -> Vec<QueryUpdate>
104 where
105 I: IntoIterator<Item = Document>,
106 {
107 let mut updates = Vec::new();
108 let mut results = self.results.write().await;
109
110 let mut next_results = HashMap::new();
112 for doc in new_docs {
113 if self.matches(&doc) {
114 next_results.insert(doc._sid.clone(), doc);
115 }
116 }
117
118 let current_ids: Vec<String> = results.keys().cloned().collect();
120 for id in current_ids {
121 if !next_results.contains_key(&id) {
122 if let Some(old_doc) = results.remove(&id) {
123 updates.push(QueryUpdate::Removed(old_doc));
124 }
125 }
126 }
127
128 for (id, new_doc) in next_results {
130 match results.entry(id) {
131 std::collections::hash_map::Entry::Occupied(mut e) => {
132 let old_doc = e.get();
133 if old_doc.data != new_doc.data {
134 let old = e.insert(new_doc.clone());
136 updates.push(QueryUpdate::Modified { old, new: new_doc });
137 }
138 }
139 std::collections::hash_map::Entry::Vacant(e) => {
140 e.insert(new_doc.clone());
142 updates.push(QueryUpdate::Added(new_doc));
143 }
144 }
145 }
146
147 updates
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;

    /// Builds a state whose single filter requires `active == true`.
    fn active_only_state() -> ReactiveQueryState {
        let filter = Filter::Eq("active".to_string(), Value::Bool(true));
        ReactiveQueryState::new(vec![filter])
    }

    /// Builds a document with the given id and one boolean `active` field.
    fn doc_with_active(sid: &str, active: bool) -> Document {
        let mut fields = HashMap::new();
        fields.insert("active".to_string(), Value::Bool(active));
        Document {
            _sid: sid.to_string(),
            data: fields,
        }
    }

    /// A matching document is reported as `Added` and becomes part of the results.
    #[tokio::test]
    async fn test_reactive_state_add_if_matches() {
        let state = active_only_state();

        let outcome = state.add_if_matches(doc_with_active("1", true)).await;

        assert!(matches!(outcome, Some(QueryUpdate::Added(_))));
        assert_eq!(state.count().await, 1);
    }

    /// Only documents satisfying the filter are admitted to the result set.
    #[tokio::test]
    async fn test_reactive_state_filter() {
        let state = active_only_state();
        let matching = doc_with_active("1", true);
        let rejected = doc_with_active("2", false);

        assert!(state.add_if_matches(matching).await.is_some());
        assert_eq!(state.count().await, 1);

        assert!(state.add_if_matches(rejected).await.is_none());
        assert_eq!(state.count().await, 1);
    }

    /// Updating a tracked document so it no longer matches removes it.
    #[tokio::test]
    async fn test_reactive_state_update_transitions() {
        let state = active_only_state();
        state.add_if_matches(doc_with_active("1", true)).await;

        let outcome = state.update("1", doc_with_active("1", false)).await;

        assert!(matches!(outcome, Some(QueryUpdate::Removed(_))));
        assert_eq!(state.count().await, 0);
    }
}