use std::sync::Arc;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use parking_lot::RwLock;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use super::query::{OrderBy, TasksFilterSpec, TitleNeedle};
use super::state::TasksState;
use super::types::{Task, TaskId, TaskStatus};
/// Builder for a filtered, live view over the task store.
///
/// Filter criteria accumulate in `spec` through the chained builder methods;
/// `stream` consumes the watcher and turns it into a stream of task snapshots.
pub struct TasksWatcher {
/// Shared task state; read-locked each time the filter spec is executed.
state: Arc<RwLock<TasksState>>,
/// Change notifications. Each item triggers a re-evaluation of `spec`; the
/// `u64` payload itself is not inspected (presumably a sequence number —
/// confirm with the producer).
changes: BoxStream<'static, u64>,
/// Filter, ordering and limit criteria collected so far.
spec: TasksFilterSpec,
}
impl TasksWatcher {
    /// Creates a watcher over `state` that re-evaluates its query whenever
    /// `changes` yields an item. The filter spec starts out as
    /// `TasksFilterSpec::default()`.
    pub(super) fn new(state: Arc<RwLock<TasksState>>, changes: BoxStream<'static, u64>) -> Self {
        Self {
            state,
            changes,
            spec: TasksFilterSpec::default(),
        }
    }

    /// Sets the status criterion, replacing any previously set status.
    #[must_use]
    pub fn where_status(mut self, status: TaskStatus) -> Self {
        self.spec.status = Some(status);
        self
    }

    /// Sets the id criterion to the given ids, replacing any previously set ids.
    #[must_use]
    pub fn where_id_in(mut self, ids: impl IntoIterator<Item = TaskId>) -> Self {
        self.spec.id_in = Some(ids.into_iter().collect());
        self
    }

    /// Sets the `created_after_ns` bound, replacing any previous value.
    ///
    /// The `_ns` suffix suggests a nanosecond timestamp; whether the bound is
    /// inclusive is decided by `TasksFilterSpec::execute`, not here.
    #[must_use]
    pub fn created_after(mut self, ns: u64) -> Self {
        self.spec.created_after_ns = Some(ns);
        self
    }

    /// Sets the `created_before_ns` bound, replacing any previous value.
    ///
    /// See [`Self::created_after`] for the caveats on units and inclusivity.
    #[must_use]
    pub fn created_before(mut self, ns: u64) -> Self {
        self.spec.created_before_ns = Some(ns);
        self
    }

    /// Sets the `updated_after_ns` bound, replacing any previous value.
    ///
    /// See [`Self::created_after`] for the caveats on units and inclusivity.
    #[must_use]
    pub fn updated_after(mut self, ns: u64) -> Self {
        self.spec.updated_after_ns = Some(ns);
        self
    }

    /// Sets the `updated_before_ns` bound, replacing any previous value.
    ///
    /// See [`Self::created_after`] for the caveats on units and inclusivity.
    #[must_use]
    pub fn updated_before(mut self, ns: u64) -> Self {
        self.spec.updated_before_ns = Some(ns);
        self
    }

    /// Sets the title criterion, replacing any previous needle.
    ///
    /// The needle is wrapped with `TitleNeedle::new`; matching rules (for
    /// example case sensitivity) are defined by that type.
    #[must_use]
    pub fn title_contains(mut self, needle: impl Into<String>) -> Self {
        self.spec.title_contains = Some(TitleNeedle::new(needle));
        self
    }

    /// Sets an explicit ordering. Without one, snapshots use `OrderBy::IdAsc`.
    #[must_use]
    pub fn order_by(mut self, order: OrderBy) -> Self {
        self.spec.order_by = Some(order);
        self
    }

    /// Sets the `limit` criterion, replacing any previous value.
    #[must_use]
    pub fn limit(mut self, n: usize) -> Self {
        self.spec.limit = Some(n);
        self
    }

    /// Fills in the default ordering (`OrderBy::IdAsc`) when none was chosen.
    ///
    /// Single source of truth for the default, shared by
    /// [`Self::spec_for_snapshot`] and [`Self::stream`].
    fn with_default_order(mut spec: TasksFilterSpec) -> TasksFilterSpec {
        if spec.order_by.is_none() {
            spec.order_by = Some(OrderBy::IdAsc);
        }
        spec
    }

    /// Returns a copy of the current spec with the default ordering applied,
    /// i.e. the same spec that [`Self::stream`] would execute.
    pub(super) fn spec_for_snapshot(&self) -> TasksFilterSpec {
        Self::with_default_order(self.spec.clone())
    }

    /// Consumes the watcher and returns a stream of query results.
    ///
    /// The query is executed once synchronously, and that result is the first
    /// item of the stream. A background task then re-executes the query for
    /// every item of the change stream and publishes the result only when it
    /// differs from the one currently published.
    ///
    /// Results travel through a `tokio::sync::watch` channel, which retains
    /// only the latest value: a slow consumer may skip intermediate snapshots
    /// but always observes the most recent one.
    ///
    /// The background task exits when the returned stream is dropped or when
    /// the change stream ends; in the latter case the returned stream ends too.
    ///
    /// # Panics
    ///
    /// Panics if called outside a Tokio runtime context, because the
    /// background task is started with `tokio::spawn`.
    pub fn stream(self) -> impl Stream<Item = Vec<Task>> + Send + 'static {
        let TasksWatcher {
            state,
            mut changes,
            spec,
        } = self;
        let spec = Self::with_default_order(spec);

        // Scope the read guard so the lock is released before anything else runs.
        let initial = {
            let guard = state.read();
            spec.execute(&guard)
        };
        let (tx, rx) = watch::channel(initial);

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    // Every receiver is gone: nobody is listening any more.
                    _ = tx.closed() => return,
                    maybe_seq = changes.next() => {
                        // The change source finished; no further updates can arrive.
                        let Some(_seq) = maybe_seq else { return };
                        // The guard never lives across an `.await`.
                        let current = {
                            let guard = state.read();
                            spec.execute(&guard)
                        };
                        // The channel already stores the last published snapshot,
                        // so compare against it and move the new value in. This
                        // avoids keeping a second copy and cloning per update.
                        // A dropped receiver is detected by the `closed()` arm.
                        tx.send_if_modified(|published| {
                            if *published == current {
                                false
                            } else {
                                *published = current;
                                true
                            }
                        });
                    }
                }
            }
        });

        WatchStream::new(rx)
    }
}