Skip to main content

unifly_core/stream/
mod.rs

// ── Reactive entity streams ──
//
// Subscription types for consuming entity changes from the DataStore.
4
5mod filter;
6
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use futures_core::Stream;
12use tokio::sync::watch;
13use tokio_stream::wrappers::WatchStream;
14
15pub use filter::{ClientFilter, DeviceFilter};
16
17/// A subscription to a collection of entities.
18///
19/// Provides both point-in-time snapshot access and reactive change
20/// notification via the `changed()` method or by converting to a `Stream`.
21pub struct EntityStream<T: Clone + Send + Sync + 'static> {
22    current: Arc<Vec<Arc<T>>>,
23    receiver: watch::Receiver<Arc<Vec<Arc<T>>>>,
24}
25
26impl<T: Clone + Send + Sync + 'static> EntityStream<T> {
27    pub(crate) fn new(receiver: watch::Receiver<Arc<Vec<Arc<T>>>>) -> Self {
28        let current = receiver.borrow().clone();
29        Self { current, receiver }
30    }
31
32    /// Get the snapshot captured at creation time.
33    pub fn current(&self) -> &Arc<Vec<Arc<T>>> {
34        &self.current
35    }
36
37    /// Get the latest snapshot (may have changed since creation).
38    pub fn latest(&self) -> Arc<Vec<Arc<T>>> {
39        self.receiver.borrow().clone()
40    }
41
42    /// Wait for the next change, returning the new snapshot.
43    /// Returns `None` if the sender (DataStore) has been dropped.
44    pub async fn changed(&mut self) -> Option<Arc<Vec<Arc<T>>>> {
45        self.receiver.changed().await.ok()?;
46        let snap = self.receiver.borrow_and_update().clone();
47        self.current = snap.clone();
48        Some(snap)
49    }
50
51    /// Convert into a `Stream` for use with `StreamExt` combinators.
52    pub fn into_stream(self) -> EntityWatchStream<T> {
53        EntityWatchStream {
54            inner: WatchStream::new(self.receiver),
55        }
56    }
57}
58
59/// `Stream` adapter backed by a `watch::Receiver`.
60///
61/// Yields a new `Arc<Vec<Arc<T>>>` snapshot each time the underlying
62/// collection is mutated.
63pub struct EntityWatchStream<T: Clone + Send + Sync + 'static> {
64    inner: WatchStream<Arc<Vec<Arc<T>>>>,
65}
66
67impl<T: Clone + Send + Sync + 'static> Stream for EntityWatchStream<T> {
68    type Item = Arc<Vec<Arc<T>>>;
69
70    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
71        // WatchStream is Unpin when the inner type is Unpin.
72        // Arc<Vec<Arc<T>>> is always Unpin, so this is safe.
73        Pin::new(&mut self.inner).poll_next(cx)
74    }
75}