Skip to main content

aurora_db/reactive/
mod.rs

1// Reactive Queries - Live query results that auto-update
2//
3// This module provides reactive queries that automatically update
4// when underlying data changes, similar to Firebase's real-time queries.
5
6pub mod updates;
7pub mod watcher;
8
9pub use updates::{QueryUpdate, UpdateType};
10pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
11
12use crate::types::Document;
13use crate::query::Filter;
14use std::collections::HashMap;
15use std::sync::Arc;
16use tokio::sync::RwLock;
17
18/// Tracks the current state of a reactive query
19pub struct ReactiveQueryState {
20    /// Current results (keyed by document ID)
21    results: Arc<RwLock<HashMap<String, Document>>>,
22    /// Filter expression to check if a document matches
23    filters: Vec<Filter>,
24}
25
26impl ReactiveQueryState {
27    pub fn new(filters: Vec<Filter>) -> Self {
28        Self {
29            results: Arc::new(RwLock::new(HashMap::new())),
30            filters,
31        }
32    }
33
34    /// Check if a document matches the query filter
35    pub fn matches(&self, doc: &Document) -> bool {
36        self.filters.iter().all(|f| f.matches(doc))
37    }
38
39    /// Add a document to results if it matches
40    pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
41        if self.matches(&doc) {
42            let mut results = self.results.write().await;
43            let id = doc.id.clone();
44
45            match results.entry(id) {
46                std::collections::hash_map::Entry::Occupied(mut e) => {
47                    // Document already in results, this is a modification
48                    let old = e.insert(doc.clone());
49                    Some(QueryUpdate::Modified { old, new: doc })
50                }
51                std::collections::hash_map::Entry::Vacant(e) => {
52                    // New document added to results
53                    e.insert(doc.clone());
54                    Some(QueryUpdate::Added(doc))
55                }
56            }
57        } else {
58            None
59        }
60    }
61
62    /// Remove a document from results
63    pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
64        let mut results = self.results.write().await;
65        results.remove(id).map(QueryUpdate::Removed)
66    }
67
68    /// Update a document, checking if it should be added/removed/modified
69    pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
70        let should_be_in_results = self.matches(&new_doc);
71        let mut results = self.results.write().await;
72        let was_in_results = results.contains_key(id);
73
74        match (was_in_results, should_be_in_results) {
75            (true, true) => {
76                let old = results.insert(id.to_string(), new_doc.clone());
77                Some(QueryUpdate::Modified {
78                    old: old.unwrap(),
79                    new: new_doc,
80                })
81            }
82            (true, false) => results.remove(id).map(QueryUpdate::Removed),
83            (false, true) => {
84                results.insert(id.to_string(), new_doc.clone());
85                Some(QueryUpdate::Added(new_doc))
86            }
87            (false, false) => None,
88        }
89    }
90
91    /// Get current results as a Vec
92    pub async fn get_results(&self) -> Vec<Document> {
93        self.results.read().await.values().cloned().collect()
94    }
95
96    /// Get count of current results
97    pub async fn count(&self) -> usize {
98        self.results.read().await.len()
99    }
100
101    /// Synchronize state with a full snapshot of documents
102    /// Returns a list of updates needed to reach the new state
103    pub async fn sync_state<I>(&self, new_docs: I) -> Vec<QueryUpdate>
104    where
105        I: IntoIterator<Item = Document>,
106    {
107        let mut updates = Vec::new();
108        let mut results = self.results.write().await;
109
110        // 1. Identify which of the new documents should be in results
111        let mut next_results = HashMap::new();
112        for doc in new_docs {
113            if self.matches(&doc) {
114                next_results.insert(doc.id.clone(), doc);
115            }
116        }
117
118        // 2. Find Removed items (IDs present in current results but not in next_results)
119        let current_ids: Vec<String> = results.keys().cloned().collect();
120        for id in current_ids {
121            if !next_results.contains_key(&id) {
122                if let Some(old_doc) = results.remove(&id) {
123                    updates.push(QueryUpdate::Removed(old_doc));
124                }
125            }
126        }
127
128        // 3. Find Added or Modified items
129        for (id, new_doc) in next_results {
130            match results.entry(id) {
131                std::collections::hash_map::Entry::Occupied(mut e) => {
132                    let old_doc = e.get();
133                    if old_doc.data != new_doc.data {
134                        // Document modified
135                        let old = e.insert(new_doc.clone());
136                        updates.push(QueryUpdate::Modified { old, new: new_doc });
137                    }
138                }
139                std::collections::hash_map::Entry::Vacant(e) => {
140                    // Document added
141                    e.insert(new_doc.clone());
142                    updates.push(QueryUpdate::Added(new_doc));
143                }
144            }
145        }
146
147        updates
148    }
149}
150
151#[cfg(test)]
152mod tests {
153    use super::*;
154    use crate::types::Value;
155
156    #[tokio::test]
157    async fn test_reactive_state_add_if_matches() {
158        let state = ReactiveQueryState::new(vec![
159            Filter::Eq("active".to_string(), Value::Bool(true))
160        ]);
161
162        let mut data = HashMap::new();
163        data.insert("active".to_string(), Value::Bool(true));
164        let doc = Document {
165            id: "1".to_string(),
166            data,
167        };
168
169        let update = state.add_if_matches(doc.clone()).await;
170        assert!(matches!(update, Some(QueryUpdate::Added(_))));
171        assert_eq!(state.count().await, 1);
172    }
173
174    #[tokio::test]
175    async fn test_reactive_state_filter() {
176        let state = ReactiveQueryState::new(vec![
177            Filter::Eq("active".to_string(), Value::Bool(true))
178        ]);
179
180        let mut active_data = HashMap::new();
181        active_data.insert("active".to_string(), Value::Bool(true));
182        let active_doc = Document {
183            id: "1".to_string(),
184            data: active_data,
185        };
186
187        let mut inactive_data = HashMap::new();
188        inactive_data.insert("active".to_string(), Value::Bool(false));
189        let inactive_doc = Document {
190            id: "2".to_string(),
191            data: inactive_data,
192        };
193
194        // Active doc should be added
195        assert!(state.add_if_matches(active_doc).await.is_some());
196        assert_eq!(state.count().await, 1);
197
198        // Inactive doc should NOT be added
199        assert!(state.add_if_matches(inactive_doc).await.is_none());
200        assert_eq!(state.count().await, 1);
201    }
202
203    #[tokio::test]
204    async fn test_reactive_state_update_transitions() {
205        let state = ReactiveQueryState::new(vec![
206            Filter::Eq("active".to_string(), Value::Bool(true))
207        ]);
208
209        // Add initial active document
210        let mut data = HashMap::new();
211        data.insert("active".to_string(), Value::Bool(true));
212        let doc = Document {
213            id: "1".to_string(),
214            data,
215        };
216        state.add_if_matches(doc).await;
217
218        // Update to inactive (should be removed)
219        let mut inactive_data = HashMap::new();
220        inactive_data.insert("active".to_string(), Value::Bool(false));
221        let inactive_doc = Document {
222            id: "1".to_string(),
223            data: inactive_data,
224        };
225
226        let update = state.update("1", inactive_doc).await;
227        assert!(matches!(update, Some(QueryUpdate::Removed(_))));
228        assert_eq!(state.count().await, 0);
229    }
230}