pub mod updates;
pub mod watcher;
pub use updates::{QueryUpdate, UpdateType};
pub use watcher::{QueryWatcher, ThrottledQueryWatcher};
use crate::types::Document;
use crate::query::Filter;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
pub struct ReactiveQueryState {
results: Arc<RwLock<HashMap<String, Document>>>,
filters: Vec<Filter>,
}
impl ReactiveQueryState {
pub fn new(filters: Vec<Filter>) -> Self {
Self {
results: Arc::new(RwLock::new(HashMap::new())),
filters,
}
}
pub fn matches(&self, doc: &Document) -> bool {
self.filters.iter().all(|f| f.matches(doc))
}
pub async fn add_if_matches(&self, doc: Document) -> Option<QueryUpdate> {
if self.matches(&doc) {
let mut results = self.results.write().await;
let id = doc._sid.clone();
match results.entry(id) {
std::collections::hash_map::Entry::Occupied(mut e) => {
let old = e.insert(doc.clone());
Some(QueryUpdate::Modified { old, new: doc })
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(doc.clone());
Some(QueryUpdate::Added(doc))
}
}
} else {
None
}
}
pub async fn remove(&self, id: &str) -> Option<QueryUpdate> {
let mut results = self.results.write().await;
results.remove(id).map(QueryUpdate::Removed)
}
pub async fn update(&self, id: &str, new_doc: Document) -> Option<QueryUpdate> {
let should_be_in_results = self.matches(&new_doc);
let mut results = self.results.write().await;
let was_in_results = results.contains_key(id);
match (was_in_results, should_be_in_results) {
(true, true) => {
let old = results.insert(id.to_string(), new_doc.clone());
Some(QueryUpdate::Modified {
old: old.unwrap(),
new: new_doc,
})
}
(true, false) => results.remove(id).map(QueryUpdate::Removed),
(false, true) => {
results.insert(id.to_string(), new_doc.clone());
Some(QueryUpdate::Added(new_doc))
}
(false, false) => None,
}
}
pub async fn get_results(&self) -> Vec<Document> {
self.results.read().await.values().cloned().collect()
}
pub async fn count(&self) -> usize {
self.results.read().await.len()
}
pub async fn sync_state<I>(&self, new_docs: I) -> Vec<QueryUpdate>
where
I: IntoIterator<Item = Document>,
{
let mut updates = Vec::new();
let mut results = self.results.write().await;
let mut next_results = HashMap::new();
for doc in new_docs {
if self.matches(&doc) {
next_results.insert(doc._sid.clone(), doc);
}
}
let current_ids: Vec<String> = results.keys().cloned().collect();
for id in current_ids {
if !next_results.contains_key(&id) {
if let Some(old_doc) = results.remove(&id) {
updates.push(QueryUpdate::Removed(old_doc));
}
}
}
for (id, new_doc) in next_results {
match results.entry(id) {
std::collections::hash_map::Entry::Occupied(mut e) => {
let old_doc = e.get();
if old_doc.data != new_doc.data {
let old = e.insert(new_doc.clone());
updates.push(QueryUpdate::Modified { old, new: new_doc });
}
}
std::collections::hash_map::Entry::Vacant(e) => {
e.insert(new_doc.clone());
updates.push(QueryUpdate::Added(new_doc));
}
}
}
updates
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::Value;
#[tokio::test]
async fn test_reactive_state_add_if_matches() {
let state = ReactiveQueryState::new(vec![
Filter::Eq("active".to_string(), Value::Bool(true))
]);
let mut data = HashMap::new();
data.insert("active".to_string(), Value::Bool(true));
let doc = Document {
_sid: "1".to_string(),
data,
};
let update = state.add_if_matches(doc.clone()).await;
assert!(matches!(update, Some(QueryUpdate::Added(_))));
assert_eq!(state.count().await, 1);
}
#[tokio::test]
async fn test_reactive_state_filter() {
let state = ReactiveQueryState::new(vec![
Filter::Eq("active".to_string(), Value::Bool(true))
]);
let mut active_data = HashMap::new();
active_data.insert("active".to_string(), Value::Bool(true));
let active_doc = Document {
_sid: "1".to_string(),
data: active_data,
};
let mut inactive_data = HashMap::new();
inactive_data.insert("active".to_string(), Value::Bool(false));
let inactive_doc = Document {
_sid: "2".to_string(),
data: inactive_data,
};
assert!(state.add_if_matches(active_doc).await.is_some());
assert_eq!(state.count().await, 1);
assert!(state.add_if_matches(inactive_doc).await.is_none());
assert_eq!(state.count().await, 1);
}
#[tokio::test]
async fn test_reactive_state_update_transitions() {
let state = ReactiveQueryState::new(vec![
Filter::Eq("active".to_string(), Value::Bool(true))
]);
let mut data = HashMap::new();
data.insert("active".to_string(), Value::Bool(true));
let doc = Document {
_sid: "1".to_string(),
data,
};
state.add_if_matches(doc).await;
let mut inactive_data = HashMap::new();
inactive_data.insert("active".to_string(), Value::Bool(false));
let inactive_doc = Document {
_sid: "1".to_string(),
data: inactive_data,
};
let update = state.update("1", inactive_doc).await;
assert!(matches!(update, Some(QueryUpdate::Removed(_))));
assert_eq!(state.count().await, 0);
}
}