aurora_db/reactive/
watcher.rs

1use super::{QueryUpdate, ReactiveQueryState};
2use crate::pubsub::ChangeListener;
3use crate::types::Document;
4use std::sync::Arc;
5use tokio::sync::mpsc;
6
7/// Watches a query and emits updates when results change
8pub struct QueryWatcher {
9    /// Receiver for query updates
10    receiver: mpsc::UnboundedReceiver<QueryUpdate>,
11    /// Collection being watched
12    collection: String,
13}
14
15impl QueryWatcher {
16    /// Create a new query watcher
17    ///
18    /// # Arguments
19    /// * `collection` - Collection to watch
20    /// * `listener` - Change listener for the collection
21    /// * `state` - Reactive query state
22    /// * `initial_results` - Initial query results to populate the state
23    pub fn new(
24        collection: impl Into<String>,
25        mut listener: ChangeListener,
26        state: Arc<ReactiveQueryState>,
27        initial_results: Vec<Document>,
28    ) -> Self {
29        let collection = collection.into();
30        let (sender, receiver) = mpsc::unbounded_channel();
31
32        // Populate initial results
33        let init_state = Arc::clone(&state);
34        let init_sender = sender.clone();
35        tokio::spawn(async move {
36            for doc in initial_results {
37                if let Some(update) = init_state.add_if_matches(doc).await {
38                    let _ = init_sender.send(update);
39                }
40            }
41        });
42
43        // Spawn background task to listen for changes
44        tokio::spawn(async move {
45            while let Ok(event) = listener.recv().await {
46                let update = match event.change_type {
47                    crate::pubsub::ChangeType::Insert => {
48                        if let Some(doc) = event.document {
49                            state.add_if_matches(doc).await
50                        } else {
51                            None
52                        }
53                    }
54                    crate::pubsub::ChangeType::Update => {
55                        if let Some(new_doc) = event.document {
56                            state.update(&event.id, new_doc).await
57                        } else {
58                            None
59                        }
60                    }
61                    crate::pubsub::ChangeType::Delete => state.remove(&event.id).await,
62                };
63
64                if let Some(u) = update {
65                    if sender.send(u).is_err() {
66                        // Receiver dropped, stop watching
67                        break;
68                    }
69                }
70            }
71        });
72
73        Self {
74            receiver,
75            collection,
76        }
77    }
78
79    /// Get the next query update
80    /// Returns None when the watcher is closed
81    pub async fn next(&mut self) -> Option<QueryUpdate> {
82        self.receiver.recv().await
83    }
84
85    /// Get the collection name being watched
86    pub fn collection(&self) -> &str {
87        &self.collection
88    }
89
90    /// Try to receive an update without blocking
91    pub fn try_next(&mut self) -> Option<QueryUpdate> {
92        self.receiver.try_recv().ok()
93    }
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99    use crate::pubsub::{ChangeEvent, PubSubSystem};
100    use crate::types::Value;
101    use std::collections::HashMap;
102
103    #[tokio::test]
104    async fn test_query_watcher_insert() {
105        let pubsub = PubSubSystem::new(100);
106        let listener = pubsub.listen("users");
107
108        let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
109            doc.data.get("active") == Some(&Value::Bool(true))
110        }));
111
112        let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
113
114        // Publish an insert event for an active user
115        let mut data = HashMap::new();
116        data.insert("active".to_string(), Value::Bool(true));
117        data.insert("name".to_string(), Value::String("Alice".into()));
118
119        let doc = Document {
120            id: "1".to_string(),
121            data,
122        };
123
124        pubsub
125            .publish(ChangeEvent::insert("users", "1", doc))
126            .unwrap();
127
128        // Should receive an Added update
129        let update = watcher.next().await.unwrap();
130        assert!(matches!(update, QueryUpdate::Added(_)));
131        assert_eq!(update.id(), "1");
132    }
133
134    #[tokio::test]
135    async fn test_query_watcher_filter() {
136        let pubsub = PubSubSystem::new(100);
137        let listener = pubsub.listen("users");
138
139        let state = Arc::new(ReactiveQueryState::new(|doc: &Document| {
140            doc.data.get("active") == Some(&Value::Bool(true))
141        }));
142
143        let mut watcher = QueryWatcher::new("users", listener, state, vec![]);
144
145        // Publish an inactive user (should be filtered)
146        let mut inactive_data = HashMap::new();
147        inactive_data.insert("active".to_string(), Value::Bool(false));
148
149        pubsub
150            .publish(ChangeEvent::insert(
151                "users",
152                "1",
153                Document {
154                    id: "1".to_string(),
155                    data: inactive_data,
156                },
157            ))
158            .unwrap();
159
160        // Publish an active user (should pass filter)
161        let mut active_data = HashMap::new();
162        active_data.insert("active".to_string(), Value::Bool(true));
163
164        pubsub
165            .publish(ChangeEvent::insert(
166                "users",
167                "2",
168                Document {
169                    id: "2".to_string(),
170                    data: active_data,
171                },
172            ))
173            .unwrap();
174
175        // Should only receive the active user
176        let update = watcher.next().await.unwrap();
177        assert_eq!(update.id(), "2");
178    }
179}