aurora_db/reactive/
mod.rs1pub mod updates;
7pub mod watcher;
8
9pub use updates::{QueryUpdate, UpdateType};
10pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
11
12use crate::types::Document;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
17pub struct ReactiveQueryState {
19 results: Arc<RwLock<HashMap<String, Document>>>,
21 filter: Arc<dyn Fn(&Document) -> bool + Send + Sync>,
23}
24
25impl ReactiveQueryState {
26 pub fn new<F>(filter: F) -> Self
27 where
28 F: Fn(&Document) -> bool + Send + Sync + 'static,
29 {
30 Self {
31 results: Arc::new(RwLock::new(HashMap::new())),
32 filter: Arc::new(filter),
33 }
34 }
35
36 pub fn matches(&self, doc: &Document) -> bool {
38 (self.filter)(doc)
39 }
40
41 pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
43 if self.matches(&doc) {
44 let mut results = self.results.write().await;
45 let id = doc.id.clone();
46
47 match results.entry(id) {
48 std::collections::hash_map::Entry::Occupied(mut e) => {
49 let old = e.insert(doc.clone());
51 Some(QueryUpdate::Modified { old, new: doc })
52 }
53 std::collections::hash_map::Entry::Vacant(e) => {
54 e.insert(doc.clone());
56 Some(QueryUpdate::Added(doc))
57 }
58 }
59 } else {
60 None
61 }
62 }
63
64 pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
66 let mut results = self.results.write().await;
67 results.remove(id).map(QueryUpdate::Removed)
68 }
69
70 pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
72 let should_be_in_results = self.matches(&new_doc);
73 let mut results = self.results.write().await;
74 let was_in_results = results.contains_key(id);
75
76 match (was_in_results, should_be_in_results) {
77 (true, true) => {
78 let old = results.insert(id.to_string(), new_doc.clone());
79 Some(QueryUpdate::Modified {
80 old: old.unwrap(),
81 new: new_doc,
82 })
83 }
84 (true, false) => results.remove(id).map(QueryUpdate::Removed),
85 (false, true) => {
86 results.insert(id.to_string(), new_doc.clone());
87 Some(QueryUpdate::Added(new_doc))
88 }
89 (false, false) => None,
90 }
91 }
92
93 pub async fn get_results(&self) -> Vec<Document> {
95 self.results.read().await.values().cloned().collect()
96 }
97
98 pub async fn count(&self) -> usize {
100 self.results.read().await.len()
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107 use crate::types::Value;
108
109 #[tokio::test]
110 async fn test_reactive_state_add_if_matches() {
111 let state = ReactiveQueryState::new(|doc: &Document| {
112 doc.data.get("active") == Some(&Value::Bool(true))
113 });
114
115 let mut data = HashMap::new();
116 data.insert("active".to_string(), Value::Bool(true));
117 let doc = Document {
118 id: "1".to_string(),
119 data,
120 };
121
122 let update = state.add_if_matches(doc.clone()).await;
123 assert!(matches!(update, Some(QueryUpdate::Added(_))));
124 assert_eq!(state.count().await, 1);
125 }
126
127 #[tokio::test]
128 async fn test_reactive_state_filter() {
129 let state = ReactiveQueryState::new(|doc: &Document| {
130 doc.data.get("active") == Some(&Value::Bool(true))
131 });
132
133 let mut active_data = HashMap::new();
134 active_data.insert("active".to_string(), Value::Bool(true));
135 let active_doc = Document {
136 id: "1".to_string(),
137 data: active_data,
138 };
139
140 let mut inactive_data = HashMap::new();
141 inactive_data.insert("active".to_string(), Value::Bool(false));
142 let inactive_doc = Document {
143 id: "2".to_string(),
144 data: inactive_data,
145 };
146
147 assert!(state.add_if_matches(active_doc).await.is_some());
149 assert_eq!(state.count().await, 1);
150
151 assert!(state.add_if_matches(inactive_doc).await.is_none());
153 assert_eq!(state.count().await, 1);
154 }
155
156 #[tokio::test]
157 async fn test_reactive_state_update_transitions() {
158 let state = ReactiveQueryState::new(|doc: &Document| {
159 doc.data.get("active") == Some(&Value::Bool(true))
160 });
161
162 let mut data = HashMap::new();
164 data.insert("active".to_string(), Value::Bool(true));
165 let doc = Document {
166 id: "1".to_string(),
167 data,
168 };
169 state.add_if_matches(doc).await;
170
171 let mut inactive_data = HashMap::new();
173 inactive_data.insert("active".to_string(), Value::Bool(false));
174 let inactive_doc = Document {
175 id: "1".to_string(),
176 data: inactive_data,
177 };
178
179 let update = state.update("1", inactive_doc).await;
180 assert!(matches!(update, Some(QueryUpdate::Removed(_))));
181 assert_eq!(state.count().await, 0);
182 }
183}