eyeball_im/vector/
subscriber.rs

1use std::{
2    fmt,
3    hint::unreachable_unchecked,
4    mem,
5    pin::Pin,
6    task::{ready, Context, Poll},
7    vec,
8};
9
10use crate::reusable_box::ReusableBoxFuture;
11use futures_core::Stream;
12use imbl::Vector;
13use tokio::sync::broadcast::{
14    self,
15    error::{RecvError, TryRecvError},
16    Receiver,
17};
18#[cfg(feature = "tracing")]
19use tracing::info;
20
21use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff};
22
23/// A subscriber for updates of a [`Vector`].
24#[derive(Debug)]
25pub struct VectorSubscriber<T> {
26    values: Vector<T>,
27    rx: Receiver<BroadcastMessage<T>>,
28}
29
30impl<T: Clone + 'static> VectorSubscriber<T> {
31    pub(super) fn new(items: Vector<T>, rx: Receiver<BroadcastMessage<T>>) -> Self {
32        Self { values: items, rx }
33    }
34
35    /// Get the items the [`ObservableVector`][super::ObservableVector]
36    /// contained when this subscriber was created.
37    pub fn values(&self) -> Vector<T> {
38        self.values.clone()
39    }
40
41    /// Turn this `VectorSubcriber` into a stream of `VectorDiff`s.
42    pub fn into_stream(self) -> VectorSubscriberStream<T> {
43        VectorSubscriberStream::new(ReusableBoxRecvFuture::new(self.rx))
44    }
45
46    /// Turn this `VectorSubcriber` into a stream of `Vec<VectorDiff>`s.
47    pub fn into_batched_stream(self) -> VectorSubscriberBatchedStream<T> {
48        VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(self.rx))
49    }
50
51    /// Destructure this `VectorSubscriber` into the initial values and a stream
52    /// of `VectorDiff`s.
53    ///
54    /// Semantically equivalent to calling `.values()` and `.into_stream()`
55    /// separately, but guarantees that the values are not unnecessarily cloned.
56    pub fn into_values_and_stream(self) -> (Vector<T>, VectorSubscriberStream<T>) {
57        let Self { values, rx } = self;
58        (values, VectorSubscriberStream::new(ReusableBoxRecvFuture::new(rx)))
59    }
60
61    /// Destructure this `VectorSubscriber` into the initial values and a stream
62    /// of `Vec<VectorDiff>`s.
63    ///
64    /// Semantically equivalent to calling `.values()` and
65    /// `.into_batched_stream()` separately, but guarantees that the values
66    /// are not unnecessarily cloned.
67    pub fn into_values_and_batched_stream(self) -> (Vector<T>, VectorSubscriberBatchedStream<T>) {
68        let Self { values, rx } = self;
69        (values, VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(rx)))
70    }
71}
72
73/// A stream of `VectorDiff`s created from a [`VectorSubscriber`].
74///
75/// Use its [`Stream`] implementation to interact with it (futures-util and
76/// other futures-related crates have extension traits with convenience
77/// methods).
78#[derive(Debug)]
79pub struct VectorSubscriberStream<T> {
80    inner: ReusableBoxRecvFuture<T>,
81    state: VectorSubscriberStreamState<T>,
82}
83
84impl<T> VectorSubscriberStream<T> {
85    fn new(inner: ReusableBoxRecvFuture<T>) -> Self {
86        Self { inner, state: VectorSubscriberStreamState::Recv }
87    }
88}
89
90#[derive(Debug)]
91enum VectorSubscriberStreamState<T> {
92    // Stream is waiting on a new message from the inner broadcast receiver.
93    Recv,
94    // Stream is yielding remaining items from a previous message with multiple
95    // diffs.
96    YieldBatch { iter: vec::IntoIter<VectorDiff<T>>, rx: Receiver<BroadcastMessage<T>> },
97}
98
99// Not clear why this explicit impl is needed, but it's not unsafe so it is fine
100impl<T> Unpin for VectorSubscriberStreamState<T> {}
101
102impl<T: Clone + 'static> Stream for VectorSubscriberStream<T> {
103    type Item = VectorDiff<T>;
104
105    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106        match &mut self.state {
107            VectorSubscriberStreamState::Recv => {
108                let (result, mut rx) = ready!(self.inner.poll(cx));
109
110                let poll = match result {
111                    Ok(msg) => match msg.diffs {
112                        OneOrManyDiffs::One(diff) => Poll::Ready(Some(diff)),
113                        OneOrManyDiffs::Many(diffs) if diffs.is_empty() => {
114                            unreachable!("ObservableVectorTransaction never sends empty diffs")
115                        }
116                        OneOrManyDiffs::Many(mut diffs) if diffs.len() == 1 => {
117                            Poll::Ready(Some(diffs.pop().unwrap()))
118                        }
119                        OneOrManyDiffs::Many(diffs) => {
120                            let mut iter = diffs.into_iter();
121                            let fst = iter.next().unwrap();
122                            self.state = VectorSubscriberStreamState::YieldBatch { iter, rx };
123                            return Poll::Ready(Some(fst));
124                        }
125                    },
126                    Err(RecvError::Closed) => Poll::Ready(None),
127                    Err(RecvError::Lagged(_)) => {
128                        Poll::Ready(handle_lag(&mut rx).map(|values| VectorDiff::Reset { values }))
129                    }
130                };
131
132                self.inner.set(rx);
133                poll
134            }
135            VectorSubscriberStreamState::YieldBatch { iter, .. } => {
136                let diff =
137                    iter.next().expect("YieldBatch is never left empty when exiting poll_next");
138
139                if iter.len() == 0 {
140                    let old_state =
141                        mem::replace(&mut self.state, VectorSubscriberStreamState::Recv);
142                    let rx = match old_state {
143                        VectorSubscriberStreamState::YieldBatch { rx, .. } => rx,
144                        // Safety: We would not be in the outer branch otherwise
145                        _ => unsafe { unreachable_unchecked() },
146                    };
147
148                    self.inner.set(rx);
149                }
150
151                Poll::Ready(Some(diff))
152            }
153        }
154    }
155}
156
157/// A batched stream of `VectorDiff`s created from a [`VectorSubscriber`].
158///
159/// Use its [`Stream`] implementation to interact with it (futures-util and
160/// other futures-related crates have extension traits with convenience
161/// methods).
162#[derive(Debug)]
163pub struct VectorSubscriberBatchedStream<T> {
164    inner: ReusableBoxRecvFuture<T>,
165}
166
167impl<T> VectorSubscriberBatchedStream<T> {
168    fn new(inner: ReusableBoxRecvFuture<T>) -> Self {
169        Self { inner }
170    }
171}
172
173impl<T: Clone + 'static> Stream for VectorSubscriberBatchedStream<T> {
174    type Item = Vec<VectorDiff<T>>;
175
176    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177        fn append<T>(target: &mut Vec<VectorDiff<T>>, source: OneOrManyDiffs<T>) {
178            match source {
179                OneOrManyDiffs::One(diff) => target.push(diff),
180                OneOrManyDiffs::Many(mut diffs) => target.append(&mut diffs),
181            }
182        }
183
184        let (result, mut rx) = ready!(self.inner.poll(cx));
185
186        let poll = match result {
187            Ok(msg) => {
188                let mut batch = msg.diffs.into_vec();
189                loop {
190                    match rx.try_recv() {
191                        Ok(msg) => append(&mut batch, msg.diffs),
192                        Err(TryRecvError::Empty | TryRecvError::Closed) => {
193                            break Poll::Ready(Some(batch));
194                        }
195                        Err(TryRecvError::Lagged(_)) => {
196                            break Poll::Ready(
197                                handle_lag(&mut rx)
198                                    .map(|values| vec![VectorDiff::Reset { values }]),
199                            );
200                        }
201                    }
202                }
203            }
204            Err(RecvError::Closed) => Poll::Ready(None),
205            Err(RecvError::Lagged(_)) => {
206                Poll::Ready(handle_lag(&mut rx).map(|values| vec![VectorDiff::Reset { values }]))
207            }
208        };
209
210        self.inner.set(rx);
211        poll
212    }
213}
214
215fn handle_lag<T: Clone + 'static>(rx: &mut Receiver<BroadcastMessage<T>>) -> Option<Vector<T>> {
216    let mut msg = None;
217    loop {
218        match rx.try_recv() {
219            // There's a newer message in the receiver's buffer, use that for reset.
220            Ok(m) => {
221                msg = Some(m);
222            }
223            // Ideally we'd return a `VecDiff::Reset` with the last state before the
224            // channel was closed here, but we have no way of obtaining the last state.
225            Err(TryRecvError::Closed) => {
226                #[cfg(feature = "tracing")]
227                info!("Channel closed after lag, can't return last state");
228                return None;
229            }
230            // Lagged twice in a row, is this possible? If it is, it's fine to just
231            // loop again and look at the next try_recv result.
232            Err(TryRecvError::Lagged(_)) => {}
233            Err(TryRecvError::Empty) => match msg {
234                // We exhausted the internal buffer using try_recv, msg contains the
235                // last message from it, which we use for the reset.
236                Some(msg) => return Some(msg.state),
237                // We exhausted the internal buffer using try_recv but there was no
238                // message in it, even though we got TryRecvError::Lagged(_) before.
239                None => unreachable!("got no new message via try_recv after lag"),
240            },
241        }
242    }
243}
244
245type SubscriberFutureReturn<T> = (Result<T, RecvError>, Receiver<T>);
246
247struct ReusableBoxRecvFuture<T> {
248    inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
249}
250
251async fn make_recv_future<T: Clone>(mut rx: Receiver<T>) -> SubscriberFutureReturn<T> {
252    let result = rx.recv().await;
253    (result, rx)
254}
255
256impl<T> ReusableBoxRecvFuture<T>
257where
258    T: Clone + 'static,
259{
260    fn set(&mut self, rx: Receiver<BroadcastMessage<T>>) {
261        self.inner.set(make_recv_future(rx));
262    }
263
264    fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SubscriberFutureReturn<BroadcastMessage<T>>> {
265        self.inner.poll(cx)
266    }
267}
268
269impl<T> ReusableBoxRecvFuture<T>
270where
271    T: Clone + 'static,
272{
273    fn new(rx: Receiver<BroadcastMessage<T>>) -> Self {
274        Self { inner: ReusableBoxFuture::new(make_recv_future(rx)) }
275    }
276}
277
278fn assert_send<T: Send>(_val: T) {}
279#[allow(unused)]
280fn assert_make_future_send() {
281    #[derive(Clone)]
282    struct IsSend(*mut ());
283    unsafe impl Send for IsSend {}
284
285    let (_sender, receiver): (_, Receiver<IsSend>) = broadcast::channel(1);
286
287    assert_send(make_recv_future(receiver));
288}
289// SAFETY: make_future is Send if T is, as proven by assert_make_future_send.
290unsafe impl<T: Send> Send for ReusableBoxRecvFuture<T> {}
291
292impl<T> fmt::Debug for ReusableBoxRecvFuture<T> {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("ReusableBoxRecvFuture").finish()
295    }
296}