Skip to main content

aurora_db/reactive/
mod.rs

1// Reactive Queries - Live query results that auto-update
2//
3// This module provides reactive queries that automatically update
4// when underlying data changes, similar to Firebase's real-time queries.
5
6pub mod updates;
7pub mod watcher;
8
9pub use updates::{QueryUpdate, UpdateType};
10pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
11
12use crate::types::Document;
13use std::collections::HashMap;
14use std::sync::Arc;
15use tokio::sync::RwLock;
16
17/// Tracks the current state of a reactive query
18pub struct ReactiveQueryState {
19    /// Current results (keyed by document ID)
20    results: Arc<RwLock<HashMap<String, Document>>>,
21    /// Filter function to check if a document matches
22    filter: Arc<dyn Fn(&Document) -> bool + Send + Sync>,
23}
24
25impl ReactiveQueryState {
26    pub fn new<F>(filter: F) -> Self
27    where
28        F: Fn(&Document) -> bool + Send + Sync + 'static,
29    {
30        Self {
31            results: Arc::new(RwLock::new(HashMap::new())),
32            filter: Arc::new(filter),
33        }
34    }
35
36    /// Check if a document matches the query filter
37    pub fn matches(&self, doc: &Document) -> bool {
38        (self.filter)(doc)
39    }
40
41    /// Add a document to results if it matches
42    pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
43        if self.matches(&doc) {
44            let mut results = self.results.write().await;
45            let id = doc.id.clone();
46
47            match results.entry(id) {
48                std::collections::hash_map::Entry::Occupied(mut e) => {
49                    // Document already in results, this is a modification
50                    let old = e.insert(doc.clone());
51                    Some(QueryUpdate::Modified { old, new: doc })
52                }
53                std::collections::hash_map::Entry::Vacant(e) => {
54                    // New document added to results
55                    e.insert(doc.clone());
56                    Some(QueryUpdate::Added(doc))
57                }
58            }
59        } else {
60            None
61        }
62    }
63
64    /// Remove a document from results
65    pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
66        let mut results = self.results.write().await;
67        results.remove(id).map(QueryUpdate::Removed)
68    }
69
70    /// Update a document, checking if it should be added/removed/modified
71    pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
72        let should_be_in_results = self.matches(&new_doc);
73        let mut results = self.results.write().await;
74        let was_in_results = results.contains_key(id);
75
76        match (was_in_results, should_be_in_results) {
77            (true, true) => {
78                let old = results.insert(id.to_string(), new_doc.clone());
79                Some(QueryUpdate::Modified {
80                    old: old.unwrap(),
81                    new: new_doc,
82                })
83            }
84            (true, false) => results.remove(id).map(QueryUpdate::Removed),
85            (false, true) => {
86                results.insert(id.to_string(), new_doc.clone());
87                Some(QueryUpdate::Added(new_doc))
88            }
89            (false, false) => None,
90        }
91    }
92
93    /// Get current results as a Vec
94    pub async fn get_results(&self) -> Vec<Document> {
95        self.results.read().await.values().cloned().collect()
96    }
97
98    /// Get count of current results
99    pub async fn count(&self) -> usize {
100        self.results.read().await.len()
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;

    /// Query filter shared by all tests: keeps documents with `active == true`.
    fn is_active(doc: &Document) -> bool {
        doc.data.get("active") == Some(&Value::Bool(true))
    }

    /// Builds a document with the given ID whose only field is `active`.
    fn doc_with_active(id: &str, active: bool) -> Document {
        let mut data = HashMap::new();
        data.insert("active".to_string(), Value::Bool(active));
        Document {
            id: id.to_string(),
            data,
        }
    }

    /// A matching document is reported as `Added` and counted.
    #[tokio::test]
    async fn test_reactive_state_add_if_matches() {
        let state = ReactiveQueryState::new(is_active);

        let update = state.add_if_matches(doc_with_active("1", true)).await;

        assert!(matches!(update, Some(QueryUpdate::Added(_))));
        assert_eq!(state.count().await, 1);
    }

    /// Only documents satisfying the filter end up in the result set.
    #[tokio::test]
    async fn test_reactive_state_filter() {
        let state = ReactiveQueryState::new(is_active);
        let active_doc = doc_with_active("1", true);
        let inactive_doc = doc_with_active("2", false);

        // Active doc should be added
        assert!(state.add_if_matches(active_doc).await.is_some());
        assert_eq!(state.count().await, 1);

        // Inactive doc should NOT be added
        assert!(state.add_if_matches(inactive_doc).await.is_none());
        assert_eq!(state.count().await, 1);
    }

    /// Updating a stored document so it no longer matches removes it.
    #[tokio::test]
    async fn test_reactive_state_update_transitions() {
        let state = ReactiveQueryState::new(is_active);

        // Add initial active document
        state.add_if_matches(doc_with_active("1", true)).await;

        // Update to inactive (should be removed)
        let update = state.update("1", doc_with_active("1", false)).await;

        assert!(matches!(update, Some(QueryUpdate::Removed(_))));
        assert_eq!(state.count().await, 0);
    }
}
183}