aurora_db/reactive/
mod.rs1pub mod updates;
7pub mod watcher;
8
9pub use updates::{QueryUpdate, UpdateType};
10pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
11
12use crate::query::Filter;
13use crate::types::Document;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18pub struct ReactiveQueryState {
20 results: Arc<RwLock<HashMap<String, Document>>>,
22 filters: Vec<Filter>,
24}
25
26impl ReactiveQueryState {
27 pub fn new(filters: Vec<Filter>) -> Self {
28 Self {
29 results: Arc::new(RwLock::new(HashMap::new())),
30 filters,
31 }
32 }
33
34 pub fn matches(&self, doc: &Document) -> bool {
36 self.filters.iter().all(|f| f.matches(doc))
37 }
38
39 pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
41 if self.matches(&doc) {
42 let mut results = self.results.write().await;
43 let id = doc._sid.clone();
44
45 match results.entry(id) {
46 std::collections::hash_map::Entry::Occupied(mut e) => {
47 let old = e.insert(doc.clone());
49 Some(QueryUpdate::Modified { old, new: doc })
50 }
51 std::collections::hash_map::Entry::Vacant(e) => {
52 e.insert(doc.clone());
54 Some(QueryUpdate::Added(doc))
55 }
56 }
57 } else {
58 None
59 }
60 }
61
62 pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
64 let mut results = self.results.write().await;
65 results.remove(id).map(QueryUpdate::Removed)
66 }
67
68 pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
70 let should_be_in_results = self.matches(&new_doc);
71 let mut results = self.results.write().await;
72 let was_in_results = results.contains_key(id);
73
74 match (was_in_results, should_be_in_results) {
75 (true, true) => {
76 let old = results.insert(id.to_string(), new_doc.clone());
77 Some(QueryUpdate::Modified {
78 old: old.unwrap(),
79 new: new_doc,
80 })
81 }
82 (true, false) => results.remove(id).map(QueryUpdate::Removed),
83 (false, true) => {
84 results.insert(id.to_string(), new_doc.clone());
85 Some(QueryUpdate::Added(new_doc))
86 }
87 (false, false) => None,
88 }
89 }
90
91 pub async fn get_results(&self) -> Vec<Document> {
93 self.results.read().await.values().cloned().collect()
94 }
95
96 pub async fn count(&self) -> usize {
98 self.results.read().await.len()
99 }
100
101 pub async fn sync_state<I>(&self, new_docs: I) -> Vec<QueryUpdate>
104 where
105 I: IntoIterator<Item = Document>,
106 {
107 let mut updates = Vec::new();
108 let mut results = self.results.write().await;
109
110 let mut next_results = HashMap::new();
112 for doc in new_docs {
113 if self.matches(&doc) {
114 next_results.insert(doc._sid.clone(), doc);
115 }
116 }
117
118 let current_ids: Vec<String> = results.keys().cloned().collect();
120 for id in current_ids {
121 if !next_results.contains_key(&id) {
122 if let Some(old_doc) = results.remove(&id) {
123 updates.push(QueryUpdate::Removed(old_doc));
124 }
125 }
126 }
127
128 for (id, new_doc) in next_results {
130 match results.entry(id) {
131 std::collections::hash_map::Entry::Occupied(mut e) => {
132 let old_doc = e.get();
133 if old_doc.data != new_doc.data {
134 let old = e.insert(new_doc.clone());
136 updates.push(QueryUpdate::Modified { old, new: new_doc });
137 }
138 }
139 std::collections::hash_map::Entry::Vacant(e) => {
140 e.insert(new_doc.clone());
142 updates.push(QueryUpdate::Added(new_doc));
143 }
144 }
145 }
146
147 updates
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use crate::types::Value;
155
156 #[tokio::test]
157 async fn test_reactive_state_add_if_matches() {
158 let state =
159 ReactiveQueryState::new(vec![Filter::Eq("active".to_string(), Value::Bool(true))]);
160
161 let mut data = HashMap::new();
162 data.insert("active".to_string(), Value::Bool(true));
163 let doc = Document {
164 _sid: "1".to_string(),
165 data,
166 };
167
168 let update = state.add_if_matches(doc.clone()).await;
169 assert!(matches!(update, Some(QueryUpdate::Added(_))));
170 assert_eq!(state.count().await, 1);
171 }
172
173 #[tokio::test]
174 async fn test_reactive_state_filter() {
175 let state =
176 ReactiveQueryState::new(vec![Filter::Eq("active".to_string(), Value::Bool(true))]);
177
178 let mut active_data = HashMap::new();
179 active_data.insert("active".to_string(), Value::Bool(true));
180 let active_doc = Document {
181 _sid: "1".to_string(),
182 data: active_data,
183 };
184
185 let mut inactive_data = HashMap::new();
186 inactive_data.insert("active".to_string(), Value::Bool(false));
187 let inactive_doc = Document {
188 _sid: "2".to_string(),
189 data: inactive_data,
190 };
191
192 assert!(state.add_if_matches(active_doc).await.is_some());
194 assert_eq!(state.count().await, 1);
195
196 assert!(state.add_if_matches(inactive_doc).await.is_none());
198 assert_eq!(state.count().await, 1);
199 }
200
201 #[tokio::test]
202 async fn test_reactive_state_update_transitions() {
203 let state =
204 ReactiveQueryState::new(vec![Filter::Eq("active".to_string(), Value::Bool(true))]);
205
206 let mut data = HashMap::new();
208 data.insert("active".to_string(), Value::Bool(true));
209 let doc = Document {
210 _sid: "1".to_string(),
211 data,
212 };
213 state.add_if_matches(doc).await;
214
215 let mut inactive_data = HashMap::new();
217 inactive_data.insert("active".to_string(), Value::Bool(false));
218 let inactive_doc = Document {
219 _sid: "1".to_string(),
220 data: inactive_data,
221 };
222
223 let update = state.update("1", inactive_doc).await;
224 assert!(matches!(update, Some(QueryUpdate::Removed(_))));
225 assert_eq!(state.count().await, 0);
226 }
227}