unifly_core/stream/
mod.rs1mod filter;
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use futures_core::Stream;
12use tokio::sync::watch;
13use tokio_stream::wrappers::WatchStream;
14
15pub use filter::{ClientFilter, DeviceFilter};
16
17pub struct EntityStream<T: Clone + Send + Sync + 'static> {
22 current: Arc<Vec<Arc<T>>>,
23 receiver: watch::Receiver<Arc<Vec<Arc<T>>>>,
24}
25
26impl<T: Clone + Send + Sync + 'static> EntityStream<T> {
27 pub(crate) fn new(receiver: watch::Receiver<Arc<Vec<Arc<T>>>>) -> Self {
28 let current = receiver.borrow().clone();
29 Self { current, receiver }
30 }
31
32 pub fn current(&self) -> &Arc<Vec<Arc<T>>> {
34 &self.current
35 }
36
37 pub fn latest(&self) -> Arc<Vec<Arc<T>>> {
39 self.receiver.borrow().clone()
40 }
41
42 pub async fn changed(&mut self) -> Option<Arc<Vec<Arc<T>>>> {
45 self.receiver.changed().await.ok()?;
46 let snap = self.receiver.borrow_and_update().clone();
47 self.current = snap.clone();
48 Some(snap)
49 }
50
51 pub fn into_stream(self) -> EntityWatchStream<T> {
53 EntityWatchStream {
54 inner: WatchStream::new(self.receiver),
55 }
56 }
57}
58
59pub struct EntityWatchStream<T: Clone + Send + Sync + 'static> {
64 inner: WatchStream<Arc<Vec<Arc<T>>>>,
65}
66
67impl<T: Clone + Send + Sync + 'static> Stream for EntityWatchStream<T> {
68 type Item = Arc<Vec<Arc<T>>>;
69
70 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71 Pin::new(&mut self.inner).poll_next(cx)
74 }
75}