mod filter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures_core::Stream;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
pub use filter::{ClientFilter, DeviceFilter};
pub struct EntityStream<T: Clone + Send + Sync + 'static> {
current: Arc<Vec<Arc<T>>>,
receiver: watch::Receiver<Arc<Vec<Arc<T>>>>,
}
impl<T: Clone + Send + Sync + 'static> EntityStream<T> {
pub(crate) fn new(receiver: watch::Receiver<Arc<Vec<Arc<T>>>>) -> Self {
let current = receiver.borrow().clone();
Self { current, receiver }
}
pub fn current(&self) -> &Arc<Vec<Arc<T>>> {
&self.current
}
pub fn latest(&self) -> Arc<Vec<Arc<T>>> {
self.receiver.borrow().clone()
}
pub async fn changed(&mut self) -> Option<Arc<Vec<Arc<T>>>> {
self.receiver.changed().await.ok()?;
let snap = self.receiver.borrow_and_update().clone();
self.current = snap.clone();
Some(snap)
}
pub fn into_stream(self) -> EntityWatchStream<T> {
EntityWatchStream {
inner: WatchStream::new(self.receiver),
}
}
}
pub struct EntityWatchStream<T: Clone + Send + Sync + 'static> {
inner: WatchStream<Arc<Vec<Arc<T>>>>,
}
impl<T: Clone + Send + Sync + 'static> Stream for EntityWatchStream<T> {
type Item = Arc<Vec<Arc<T>>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.inner).poll_next(cx)
}
}