use super::{QueryUpdate, ReactiveQueryState};
use crate::Aurora;
use crate::pubsub::ChangeListener;
use crate::types::Document;
use std::sync::Arc;
use tokio::sync::mpsc;
/// Handle for consuming the stream of [`QueryUpdate`]s produced for one
/// collection's reactive query.
///
/// Updates are produced by background tasks spawned in `QueryWatcher::new`
/// and are read through `next` / `try_next`.
pub struct QueryWatcher {
// Receiving end of the update channel; this is what `next`/`try_next` read.
receiver: mpsc::UnboundedReceiver<QueryUpdate>,
// Name of the watched collection, exposed via `collection()`.
collection: String,
// Not read by any method visible in this file, hence the lint allowance.
// NOTE(review): presumably stored so the handle keeps the database alive — confirm.
#[allow(dead_code)]
db: Arc<Aurora>,
}
impl QueryWatcher {
pub fn new(
db: Arc<Aurora>,
collection: impl Into<String>,
mut listener: ChangeListener,
state: Arc<ReactiveQueryState>,
initial_results: Vec<Document>,
debounce_duration: Option<std::time::Duration>,
) -> Self {
let collection = collection.into();
let (sender, receiver) = mpsc::unbounded_channel();
let init_state = Arc::clone(&state);
let init_sender = sender.clone();
tokio::spawn(async move {
for doc in initial_results {
if let Some(update) = init_state.add_if_matches(doc).await {
let _ = init_sender.send(update);
}
}
});
let db_clone = Arc::clone(&db);
let coll_clone = collection.clone();
let state_clone = Arc::clone(&state);
let sender_clone = sender.clone();
tokio::spawn(async move {
let mut backoff_ms = 100;
loop {
match listener.recv().await {
Ok(event) => {
if backoff_ms > 100 { backoff_ms -= 50; }
let update = match event.change_type {
crate::pubsub::ChangeType::Insert => {
if let Some(doc) = event.document {
state_clone.add_if_matches(doc).await
} else {
None
}
}
crate::pubsub::ChangeType::Update => {
if let Some(new_doc) = event.document {
state_clone.update(&event._sid, new_doc).await
} else {
None
}
}
crate::pubsub::ChangeType::Delete => state_clone.remove(&event._sid).await,
};
if let Some(u) = update && sender_clone.send(u).is_err() {
break;
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(skipped)) => {
eprintln!(
"WARNING: Watcher for '{}' lagged by {} events. Applying {}ms backoff...",
coll_clone, skipped, backoff_ms
);
loop {
match listener.try_recv() {
Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => continue,
_ => break,
}
}
tokio::time::sleep(tokio::time::Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(2000);
let docs_snapshot: Vec<Document> = if let Ok(iter) = db_clone.stream_collection(&coll_clone) {
iter.collect()
} else {
Vec::new()
};
let updates = state_clone.sync_state(docs_snapshot).await;
for u in updates {
if sender_clone.send(u).is_err() {
return;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
}
}
});
let final_receiver = if let Some(duration) = debounce_duration {
let (tx_throttled, rx_throttled) = mpsc::unbounded_channel();
let mut raw_rx = receiver;
tokio::spawn(async move {
use std::collections::HashMap;
let mut tick = tokio::time::interval(duration);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut pending: HashMap<String, QueryUpdate> = HashMap::new();
loop {
tokio::select! {
biased;
maybe_update = raw_rx.recv() => {
match maybe_update {
Some(update) => { pending.insert(update.id().to_string(), update); }
None => break,
}
}
_ = tick.tick() => {
if !pending.is_empty() {
for (_, update) in pending.drain() {
if tx_throttled.send(update).is_err() { return; }
}
}
}
}
}
});
rx_throttled
} else {
receiver
};
Self {
receiver: final_receiver,
collection,
db,
}
}
pub async fn next(&mut self) -> Option<QueryUpdate> { self.receiver.recv().await }
pub fn collection(&self) -> &str { &self.collection }
pub fn try_next(&mut self) -> Option<QueryUpdate> { self.receiver.try_recv().ok() }
pub fn throttled(self, interval: std::time::Duration) -> ThrottledQueryWatcher {
ThrottledQueryWatcher::new(self.receiver, self.collection, interval)
}
}
/// Handle for consuming [`QueryUpdate`]s that have been batched per time
/// interval by the task spawned in `ThrottledQueryWatcher::new`.
pub struct ThrottledQueryWatcher {
// Receiving end of the throttled channel; this is what `next`/`try_next` read.
receiver: mpsc::UnboundedReceiver<QueryUpdate>,
// Name of the watched collection, exposed via `collection()`.
collection: String,
}
impl ThrottledQueryWatcher {
pub fn new(
mut raw_receiver: mpsc::UnboundedReceiver<QueryUpdate>,
collection: impl Into<String>,
interval: std::time::Duration,
) -> Self {
let collection = collection.into();
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
use std::collections::HashMap;
let mut tick = tokio::time::interval(interval);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
let mut pending: HashMap<String, QueryUpdate> = HashMap::new();
loop {
tokio::select! {
biased;
maybe_update = raw_receiver.recv() => {
match maybe_update {
Some(update) => { pending.insert(update.id().to_string(), update); }
None => break,
}
}
_ = tick.tick() => {
if !pending.is_empty() {
for (_, update) in pending.drain() {
if tx.send(update).is_err() { return; }
}
}
}
}
}
});
Self { receiver: rx, collection }
}
pub async fn next(&mut self) -> Option<QueryUpdate> { self.receiver.recv().await }
pub fn collection(&self) -> &str { &self.collection }
pub fn try_next(&mut self) -> Option<QueryUpdate> { self.receiver.try_recv().ok() }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pubsub::ChangeEvent;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Publishing an insert that satisfies the query filter must surface as a
    /// `QueryUpdate::Added` carrying the inserted document's id.
    #[tokio::test]
    async fn test_query_watcher_insert() {
        let temp_dir = tempfile::tempdir().unwrap();
        let db = Arc::new(Aurora::open(temp_dir.path().join("test.db")).await.unwrap());

        // Subscribe before publishing so the event cannot be missed.
        let listener = db.pubsub.listen("users");
        let state = Arc::new(ReactiveQueryState::new(vec![
            crate::query::Filter::Eq("active".to_string(), Value::Bool(true)),
        ]));
        let mut watcher = QueryWatcher::new(db.clone(), "users", listener, state, vec![], None);

        let mut data = HashMap::new();
        data.insert("active".to_string(), Value::Bool(true));
        data.insert("name".to_string(), Value::String("Alice".into()));
        let doc = Document {
            _sid: "1".to_string(),
            data,
        };
        db.pubsub
            .publish(ChangeEvent::insert("users", "1", doc))
            .unwrap();

        // Bound the wait so a regression fails the test instead of hanging it.
        let update = tokio::time::timeout(std::time::Duration::from_secs(5), watcher.next())
            .await
            .expect("timed out waiting for the insert update")
            .expect("watcher channel closed before delivering the insert update");

        assert!(matches!(update, QueryUpdate::Added(_)));
        assert_eq!(update.id(), "1");
    }
}