eyeball_im/vector/
subscriber.rs1use std::{
2 fmt,
3 hint::unreachable_unchecked,
4 mem,
5 pin::Pin,
6 task::{ready, Context, Poll},
7 vec,
8};
9
10use crate::reusable_box::ReusableBoxFuture;
11use futures_core::Stream;
12use imbl::Vector;
13use tokio::sync::broadcast::{
14 self,
15 error::{RecvError, TryRecvError},
16 Receiver,
17};
18#[cfg(feature = "tracing")]
19use tracing::info;
20
21use super::{BroadcastMessage, OneOrManyDiffs, VectorDiff};
22
/// Subscriber handle that pairs a `Vector<T>` of values with the broadcast
/// receiver for `BroadcastMessage<T>`s.
///
/// It can be turned into a stream of single diffs or of diff batches via the
/// `into_*` methods.
23#[derive(Debug)]
25pub struct VectorSubscriber<T> {
    /// Values passed to `new`; handed out by `values()` and the
    /// `into_values_and_*` methods.
26 values: Vector<T>,
    /// Broadcast receiver that the streams are built from.
27 rx: Receiver<BroadcastMessage<T>>,
28}
29
30impl<T: Clone + 'static> VectorSubscriber<T> {
31 pub(super) fn new(items: Vector<T>, rx: Receiver<BroadcastMessage<T>>) -> Self {
32 Self { values: items, rx }
33 }
34
35 pub fn values(&self) -> Vector<T> {
38 self.values.clone()
39 }
40
41 pub fn into_stream(self) -> VectorSubscriberStream<T> {
43 VectorSubscriberStream::new(ReusableBoxRecvFuture::new(self.rx))
44 }
45
46 pub fn into_batched_stream(self) -> VectorSubscriberBatchedStream<T> {
48 VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(self.rx))
49 }
50
51 pub fn into_values_and_stream(self) -> (Vector<T>, VectorSubscriberStream<T>) {
57 let Self { values, rx } = self;
58 (values, VectorSubscriberStream::new(ReusableBoxRecvFuture::new(rx)))
59 }
60
61 pub fn into_values_and_batched_stream(self) -> (Vector<T>, VectorSubscriberBatchedStream<T>) {
68 let Self { values, rx } = self;
69 (values, VectorSubscriberBatchedStream::new(ReusableBoxRecvFuture::new(rx)))
70 }
71}
72
/// Stream of individual `VectorDiff<T>` items, built from a broadcast
/// receiver.
73#[derive(Debug)]
79pub struct VectorSubscriberStream<T> {
    /// Reusable boxed future that receives the next broadcast message.
80 inner: ReusableBoxRecvFuture<T>,
    /// Whether the next poll receives a new message or continues handing out
    /// diffs from a batch that was already received.
81 state: VectorSubscriberStreamState<T>,
82}
83
84impl<T> VectorSubscriberStream<T> {
85 fn new(inner: ReusableBoxRecvFuture<T>) -> Self {
86 Self { inner, state: VectorSubscriberStreamState::Recv }
87 }
88}
89
/// Internal state machine of `VectorSubscriberStream`.
90#[derive(Debug)]
91enum VectorSubscriberStreamState<T> {
    /// The next poll drives the receive future to obtain a new message.
92 Recv,
    /// A message with several diffs was received and its remaining diffs are
    /// handed out one per poll. The receiver is parked in `rx` until the
    /// iterator is drained, at which point it is given back to the receive
    /// future.
94 YieldBatch { iter: vec::IntoIter<VectorDiff<T>>, rx: Receiver<BroadcastMessage<T>> },
97}
98
// `poll_next` reaches `self.state` mutably through `Pin<&mut Self>`, which
// needs the stream (and therefore this state) to be `Unpin`.
// NOTE(review): presumably the explicit impl exists because the auto-derived
// `Unpin` would otherwise depend on `T` — confirm.
99impl<T> Unpin for VectorSubscriberStreamState<T> {}
101
102impl<T: Clone + 'static> Stream for VectorSubscriberStream<T> {
103 type Item = VectorDiff<T>;
104
105 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
106 match &mut self.state {
107 VectorSubscriberStreamState::Recv => {
108 let (result, mut rx) = ready!(self.inner.poll(cx));
109
110 let poll = match result {
111 Ok(msg) => match msg.diffs {
112 OneOrManyDiffs::One(diff) => Poll::Ready(Some(diff)),
113 OneOrManyDiffs::Many(diffs) if diffs.is_empty() => {
114 unreachable!("ObservableVectorTransaction never sends empty diffs")
115 }
116 OneOrManyDiffs::Many(mut diffs) if diffs.len() == 1 => {
117 Poll::Ready(Some(diffs.pop().unwrap()))
118 }
119 OneOrManyDiffs::Many(diffs) => {
120 let mut iter = diffs.into_iter();
121 let fst = iter.next().unwrap();
122 self.state = VectorSubscriberStreamState::YieldBatch { iter, rx };
123 return Poll::Ready(Some(fst));
124 }
125 },
126 Err(RecvError::Closed) => Poll::Ready(None),
127 Err(RecvError::Lagged(_)) => {
128 Poll::Ready(handle_lag(&mut rx).map(|values| VectorDiff::Reset { values }))
129 }
130 };
131
132 self.inner.set(rx);
133 poll
134 }
135 VectorSubscriberStreamState::YieldBatch { iter, .. } => {
136 let diff =
137 iter.next().expect("YieldBatch is never left empty when exiting poll_next");
138
139 if iter.len() == 0 {
140 let old_state =
141 mem::replace(&mut self.state, VectorSubscriberStreamState::Recv);
142 let rx = match old_state {
143 VectorSubscriberStreamState::YieldBatch { rx, .. } => rx,
144 _ => unsafe { unreachable_unchecked() },
146 };
147
148 self.inner.set(rx);
149 }
150
151 Poll::Ready(Some(diff))
152 }
153 }
154 }
155}
156
/// Stream of `Vec<VectorDiff<T>>` batches, built from a broadcast receiver.
157#[derive(Debug)]
163pub struct VectorSubscriberBatchedStream<T> {
    /// Reusable boxed future that receives the next broadcast message.
164 inner: ReusableBoxRecvFuture<T>,
165}
166
167impl<T> VectorSubscriberBatchedStream<T> {
168 fn new(inner: ReusableBoxRecvFuture<T>) -> Self {
169 Self { inner }
170 }
171}
172
173impl<T: Clone + 'static> Stream for VectorSubscriberBatchedStream<T> {
174 type Item = Vec<VectorDiff<T>>;
175
176 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
177 fn append<T>(target: &mut Vec<VectorDiff<T>>, source: OneOrManyDiffs<T>) {
178 match source {
179 OneOrManyDiffs::One(diff) => target.push(diff),
180 OneOrManyDiffs::Many(mut diffs) => target.append(&mut diffs),
181 }
182 }
183
184 let (result, mut rx) = ready!(self.inner.poll(cx));
185
186 let poll = match result {
187 Ok(msg) => {
188 let mut batch = msg.diffs.into_vec();
189 loop {
190 match rx.try_recv() {
191 Ok(msg) => append(&mut batch, msg.diffs),
192 Err(TryRecvError::Empty | TryRecvError::Closed) => {
193 break Poll::Ready(Some(batch));
194 }
195 Err(TryRecvError::Lagged(_)) => {
196 break Poll::Ready(
197 handle_lag(&mut rx)
198 .map(|values| vec![VectorDiff::Reset { values }]),
199 );
200 }
201 }
202 }
203 }
204 Err(RecvError::Closed) => Poll::Ready(None),
205 Err(RecvError::Lagged(_)) => {
206 Poll::Ready(handle_lag(&mut rx).map(|values| vec![VectorDiff::Reset { values }]))
207 }
208 };
209
210 self.inner.set(rx);
211 poll
212 }
213}
214
215fn handle_lag<T: Clone + 'static>(rx: &mut Receiver<BroadcastMessage<T>>) -> Option<Vector<T>> {
216 let mut msg = None;
217 loop {
218 match rx.try_recv() {
219 Ok(m) => {
221 msg = Some(m);
222 }
223 Err(TryRecvError::Closed) => {
226 #[cfg(feature = "tracing")]
227 info!("Channel closed after lag, can't return last state");
228 return None;
229 }
230 Err(TryRecvError::Lagged(_)) => {}
233 Err(TryRecvError::Empty) => match msg {
234 Some(msg) => return Some(msg.state),
237 None => unreachable!("got no new message via try_recv after lag"),
240 },
241 }
242 }
243}
244
/// Output of the receive future: the result of `recv` together with the
/// receiver itself, which is handed back so it can be used again.
245type SubscriberFutureReturn<T> = (Result<T, RecvError>, Receiver<T>);
246
/// Wrapper around a `ReusableBoxFuture` that holds the future created by
/// `make_recv_future` for a `BroadcastMessage<T>` receiver.
247struct ReusableBoxRecvFuture<T> {
    /// The boxed receive future; replaced through `set` and driven through
    /// `poll`.
248 inner: ReusableBoxFuture<'static, SubscriberFutureReturn<BroadcastMessage<T>>>,
249}
250
251async fn make_recv_future<T: Clone>(mut rx: Receiver<T>) -> SubscriberFutureReturn<T> {
252 let result = rx.recv().await;
253 (result, rx)
254}
255
256impl<T> ReusableBoxRecvFuture<T>
257where
258 T: Clone + 'static,
259{
260 fn set(&mut self, rx: Receiver<BroadcastMessage<T>>) {
261 self.inner.set(make_recv_future(rx));
262 }
263
264 fn poll(&mut self, cx: &mut Context<'_>) -> Poll<SubscriberFutureReturn<BroadcastMessage<T>>> {
265 self.inner.poll(cx)
266 }
267}
268
269impl<T> ReusableBoxRecvFuture<T>
270where
271 T: Clone + 'static,
272{
273 fn new(rx: Receiver<BroadcastMessage<T>>) -> Self {
274 Self { inner: ReusableBoxFuture::new(make_recv_future(rx)) }
275 }
276}
277
/// Compile-time helper: only accepts values whose type is `Send`. Does
/// nothing at runtime.
fn assert_send<T>(_val: T)
where
    T: Send,
{
}
279#[allow(unused)]
280fn assert_make_future_send() {
281 #[derive(Clone)]
282 struct IsSend(*mut ());
283 unsafe impl Send for IsSend {}
284
285 let (_sender, receiver): (_, Receiver<IsSend>) = broadcast::channel(1);
286
287 assert_send(make_recv_future(receiver));
288}
// SAFETY: NOTE(review): relies on the future stored in `inner` (created by
// `make_recv_future`) being `Send` whenever the message type is, which
// `assert_make_future_send` checks at compile time for one concrete type.
// Assumes `ReusableBoxFuture` itself holds no other non-`Send` state — confirm.
289unsafe impl<T: Send> Send for ReusableBoxRecvFuture<T> {}
291
292impl<T> fmt::Debug for ReusableBoxRecvFuture<T> {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("ReusableBoxRecvFuture").finish()
295 }
296}