aurora_db/reactive/
watcher.rs

1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7/// Watches a query and emits updates when results change
8pub struct QueryWatcher {
9    /// Receiver for query updates
10    receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11    /// Collection being watched
12    collection: String,
13}
14
15impl QueryWatcher {
16    /// Create a new query watcher
17    ///
18    /// # Arguments
19    /// * `collection` - Collection to watch
20    /// * `listener` - Change listener for the collection
21    /// * `state` - Reactive query state
22    /// * `initial_results` - Initial query results to populate the state
23    pub fn new(
24        collection: impl Into<String>,
25        mut listener: ChangeListener,
26        state: Arc<ReactiveQueryState>,
27        initial_results: Vec<Document>,
28    ) -> Self {
29        let collection = collection.into();
30        let (sender, receiver) = mpsc::unbounded_channel();
31
32        // Populate initial results
33        let init_state = Arc::clone(&state);
34        let init_sender = sender.clone();
35        tokio::spawn(async move {
36            for doc in initial_results {
37                if let Some(update) = init_state.add_if_matches(doc).await {
38                    let _ = init_sender.send(update);
39                }
40            }
41        });
42
43        // Spawn background task to listen for changes
44        tokio::spawn(async move {
45            while let Ok(event) = listener.recv().await {
46                let update = match event.change_type {
47                    crate::pubsub::ChangeType::Insert => {
48                        if let Some(doc) = event.document {
49                            state.add_if_matches(doc).await
50                        } else {
51                            None
52                        }
53                    }
54                    crate::pubsub::ChangeType::Update => {
55                        if let Some(new_doc) = event.document {
56                            state.update(&event.id, new_doc).await
57                        } else {
58                            None
59                        }
60                    }
61                    crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62                };
63
64                if let Some(u) = update
65                    && sender.send(u).is_err() {
66                        // Receiver dropped, stop watching
67                        break;
68                    }
69            }
70        });
71
72        Self {
73            receiver,
74            collection,
75        }
76    }
77
78    /// Get the next query update
79    /// Returns None when the watcher is closed
80    pub async fn next(&mut self) -> Option<QueryUpdate> {
81        self.receiver.recv().await
82    }
83
84    /// Get the collection name being watched
85    pub fn collection(&self) -> &str {
86        &self.collection
87    }
88
89    /// Try to receive an update without blocking
90    pub fn try_next(&mut self) -> Option<QueryUpdate> {
91        self.receiver.try_recv().ok()
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use super::*;
98    use crate::pubsub::{ChangeEvent, PubSubSystem};
99    use crate::types::Value;
100    use std::collections::HashMap;
101
102    #[tokio::test]
103    async fn test_query_watcher_insert() {
104        let pubsub = PubSubSystem::new(100);
105        let listener = pubsub.listen("users");
106
107        let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
108            doc.data.get("active") == Some(&Value::Bool(true))
109        }));
110
111        let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
112
113        // Publish an insert event for an active user
114        let mut data = HashMap::new();
115        data.insert("active".to_string(), Value::Bool(true));
116        data.insert("name".to_string(), Value::String("Alice".into()));
117
118        let doc = Document {
119            id: "1".to_string(),
120            data,
121        };
122
123        pubsub
124            .publish(ChangeEvent::insert("users", "1", doc))
125            .unwrap();
126
127        // Should receive an Added update
128        let update = watcher.next().await.unwrap();
129        assert!(matches!(update, QueryUpdate::Added(_)));
130        assert_eq!(update.id(), "1");
131    }
132
133    #[tokio::test]
134    async fn test_query_watcher_filter() {
135        let pubsub = PubSubSystem::new(100);
136        let listener = pubsub.listen("users");
137
138        let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
139            doc.data.get("active") == Some(&Value::Bool(true))
140        }));
141
142        let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
143
144        // Publish an inactive user (should be filtered)
145        let mut inactive_data = HashMap::new();
146        inactive_data.insert("active".to_string(), Value::Bool(false));
147
148        pubsub
149            .publish(ChangeEvent::insert(
150                "users",
151                "1",
152                Document {
153                    id: "1".to_string(),
154                    data: inactive_data,
155                },
156            ))
157            .unwrap();
158
159        // Publish an active user (should pass filter)
160        let mut active_data = HashMap::new();
161        active_data.insert("active".to_string(), Value::Bool(true));
162
163        pubsub
164            .publish(ChangeEvent::insert(
165                "users",
166                "2",
167                Document {
168                    id: "2".to_string(),
169                    data: active_data,
170                },
171            ))
172            .unwrap();
173
174        // Should only receive the active user
175        let update = watcher.next().await.unwrap();
176        assert_eq!(update.id(), "2");
177    }
178}