remoc_obs/
list.rs

1//! Observable append-only list.
2//!
3//! This provides a locally and remotely observable append-only list.
4//! The observable list sends a change event each time a change is performed on it.
5//! The [resulting event stream](ListSubscription) can either be processed event-wise
6//! or used to build a [mirrored list](MirroredList).
7//!
8//! # Alternatives
9//!
10//! The [observable vector](crate::vec) provides most operations of a [vector](Vec),
11//! but allocates a separate event buffer per subscriber and thus uses more memory.
12//!
13//! # Basic use
14//!
15//! Create a [ObservableList] and obtain a [subscription](ListSubscription) to it using
16//! [ObservableList::subscribe].
17//! Send this subscription to a remote endpoint via a [remote channel](remoc::rch) and call
18//! [ListSubscription::mirror] on the remote endpoint to obtain a live mirror of the observed
19//! vector or process each change event individually using [ListSubscription::recv].
20//!
21
22use futures::{future, Future, FutureExt};
23use remoc::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::{
26    fmt,
27    marker::PhantomData,
28    ops::Deref,
29    sync::{
30        atomic::{AtomicUsize, Ordering},
31        Arc,
32    },
33};
34use tokio::sync::{mpsc, oneshot, watch, Mutex, OwnedMutexGuard, RwLock, RwLockReadGuard};
35
36use crate::{default_on_err, ChangeNotifier, ChangeSender, RecvError, SendError};
37
38/// A list change event.
39#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub enum ListEvent<T> {
41    /// The subscription has reached the value of the observed
42    /// list at the time it was subscribed.
43    #[serde(skip)]
44    InitialComplete,
45    /// An item was pushed at the end of the list.
46    Push(T),
47    /// The list has reached its final state and
48    /// no further events will occur.
49    Done,
50}
51
52/// Observable list task request.
53enum Req<T> {
54    /// Push item.
55    Push(T),
56    /// List is complete, i.e. no more items will be sent.
57    Done,
58    /// Borrow the current list.
59    Borrow(oneshot::Sender<OwnedMutexGuard<Vec<T>>>),
60    /// Set the error handler.
61    SetErrorHandler(Box<dyn Fn(SendError) + Send + Sync + 'static>),
62}
63
64/// Oberservable list subscribing related task request.
65enum DistReq<T, Codec> {
66    /// Subscribe receiver.
67    Subscribe(rch::mpsc::Sender<ListEvent<T>, Codec>),
68    /// Notify when no subscribers are left.
69    NotifyNoSubscribers(oneshot::Sender<()>),
70}
71
72/// An observable list distributor allows subscribing to an observable list.
73///
74/// This is clonable and can be sent to other tasks.
75#[derive(Clone)]
76pub struct ObservableListDistributor<T, Codec = remoc::codec::Default> {
77    tx: mpsc::UnboundedSender<DistReq<T, Codec>>,
78    len: Arc<AtomicUsize>,
79    subscriber_count: Arc<AtomicUsize>,
80}
81
82impl<T, Codec> fmt::Debug for ObservableListDistributor<T, Codec> {
83    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84        f.debug_struct("ObservableListDistributor")
85            .field("len", &self.len.load(Ordering::Relaxed))
86            .field("subscriber_count", &self.subscriber_count.load(Ordering::Relaxed))
87            .finish()
88    }
89}
90
91impl<T, Codec> ObservableListDistributor<T, Codec>
92where
93    T: RemoteSend + Clone,
94    Codec: remoc::codec::Codec,
95{
96    /// Sends a request to the task.
97    fn req(&self, req: DistReq<T, Codec>) {
98        if self.tx.send(req).is_err() {
99            panic!("observable list task was terminated");
100        }
101    }
102
103    /// Subscribes to the observable list with incremental sending of the current contents.
104    pub fn subscribe(&self) -> ListSubscription<T, Codec> {
105        let (tx, rx) = rch::mpsc::channel(1);
106        let _ = self.tx.send(DistReq::Subscribe(tx));
107        ListSubscription::new(self.len.load(Ordering::Relaxed), rx)
108    }
109
110    /// Current number of subscribers.
111    pub fn subscriber_count(&self) -> usize {
112        self.subscriber_count.load(Ordering::Relaxed)
113    }
114
115    /// Returns when all subscribers have quit.
116    ///
117    /// If no subscribers are currently present, this return immediately.
118    /// This also returns when [done](ObservableList::done) has been called and
119    /// all subscribers have received all elements of the list.
120    pub fn closed(&self) -> impl Future<Output = ()> {
121        let (tx, rx) = oneshot::channel();
122        self.req(DistReq::NotifyNoSubscribers(tx));
123        async move {
124            let _ = rx.await;
125        }
126    }
127
128    /// Returns `true` if there are currently no subscribers.
129    pub fn is_closed(&self) -> bool {
130        self.subscriber_count() == 0
131    }
132}
133
134/// An append-only list that emits an event for each change.
135///
136/// Use [subscribe](Self::subscribe) to obtain an event stream
137/// that can be used for building a mirror of this list.
138///
139/// The [distributor method](Self::distributor) can be used to obtain a clonable object
140/// that can be used to make subscriptions from other tasks.
141pub struct ObservableList<T, Codec = remoc::codec::Default> {
142    tx: mpsc::UnboundedSender<Req<T>>,
143    change: ChangeSender,
144    len: Arc<AtomicUsize>,
145    done: bool,
146    dist: ObservableListDistributor<T, Codec>,
147}
148
149impl<T, Codec> fmt::Debug for ObservableList<T, Codec> {
150    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
151        f.debug_struct("ObservableList")
152            .field("len", &self.len.load(Ordering::Relaxed))
153            .field("done", &self.done)
154            .field("subscriber_count", &self.dist.subscriber_count.load(Ordering::Relaxed))
155            .finish()
156    }
157}
158
159impl<T, Codec> Default for ObservableList<T, Codec>
160where
161    T: RemoteSend + Clone,
162    Codec: remoc::codec::Codec,
163{
164    fn default() -> Self {
165        Self::from(Vec::new())
166    }
167}
168
169impl<T, Codec> From<Vec<T>> for ObservableList<T, Codec>
170where
171    T: RemoteSend + Clone,
172    Codec: remoc::codec::Codec,
173{
174    fn from(initial: Vec<T>) -> Self {
175        let (tx, rx) = mpsc::unbounded_channel();
176        let (sub_tx, sub_rx) = mpsc::unbounded_channel();
177        let len = Arc::new(AtomicUsize::new(initial.len()));
178        let subscriber_count = Arc::new(AtomicUsize::new(0));
179        tokio::spawn(Self::task(initial, rx, sub_rx, subscriber_count.clone()));
180        Self {
181            tx,
182            change: ChangeSender::new(),
183            len: len.clone(),
184            done: false,
185            dist: ObservableListDistributor { tx: sub_tx, len, subscriber_count },
186        }
187    }
188}
189
190impl<T, Codec> ObservableList<T, Codec>
191where
192    T: RemoteSend + Clone,
193    Codec: remoc::codec::Codec,
194{
195    /// Creates a new, empty observable list.
196    pub fn new() -> Self {
197        Self::default()
198    }
199
200    /// Sends a request to the task.
201    fn req(&self, req: Req<T>) {
202        if self.tx.send(req).is_err() {
203            panic!("observable list task was terminated");
204        }
205    }
206
207    /// Panics when `done` has been called.
208    fn assert_not_done(&self) {
209        if self.done {
210            panic!("observable list cannot be changed after done has been called");
211        }
212    }
213
214    /// Sets the error handler function that is called when sending an
215    /// event fails.
216    pub fn set_error_handler<E>(&mut self, on_err: E)
217    where
218        E: Fn(SendError) + Send + Sync + 'static,
219    {
220        self.req(Req::SetErrorHandler(Box::new(on_err)));
221    }
222
223    /// Returns an observable list distributor that can be used to make subscriptions
224    /// to this observable list.
225    ///
226    /// It is clonable and can be sent to other tasks.
227    pub fn distributor(&self) -> ObservableListDistributor<T, Codec> {
228        self.dist.clone()
229    }
230
231    /// Subscribes to the observable list with incremental sending of the current contents.
232    pub fn subscribe(&self) -> ListSubscription<T, Codec> {
233        self.dist.subscribe()
234    }
235
236    /// Current number of subscribers.
237    pub fn subscriber_count(&self) -> usize {
238        self.dist.subscriber_count()
239    }
240
241    /// Returns a [change notifier](ChangeNotifier) that can be used *locally* to be
242    /// notified of changes to this collection.
243    pub fn notifier(&self) -> ChangeNotifier {
244        self.change.subscribe()
245    }
246
247    /// Returns when all subscribers have quit.
248    ///
249    /// If no subscribers are currently present, this return immediately.
250    /// This also returns when [done](Self::done) has been called and
251    /// all subscribers have received all elements of the list.
252    pub fn closed(&self) -> impl Future<Output = ()> {
253        self.dist.closed()
254    }
255
256    /// Returns `true` if there are currently no subscribers.
257    pub fn is_closed(&self) -> bool {
258        self.dist.is_closed()
259    }
260
261    /// Appends an element at the end.
262    ///
263    /// A [ListEvent::Push] change event is sent.
264    ///
265    /// # Panics
266    /// Panics when [done](Self::done) has been called before.
267    pub fn push(&mut self, value: T) {
268        self.assert_not_done();
269        self.req(Req::Push(value));
270        self.len.fetch_add(1, Ordering::Relaxed);
271        self.change.notify();
272    }
273
274    /// The current number of elements in the observable list.
275    pub fn len(&self) -> usize {
276        self.len.load(Ordering::Relaxed)
277    }
278
279    /// Returns whether this observable list is empty.
280    pub fn is_empty(&self) -> bool {
281        self.len() == 0
282    }
283
284    /// Prevents further changes of this list and notifies
285    /// are subscribers that no further events will occur.
286    ///
287    /// Methods that modify the list will panic after this has been called.
288    /// It is still possible to subscribe to this observable list.
289    pub fn done(&mut self) {
290        if !self.done {
291            self.req(Req::Done);
292            self.done = true;
293        }
294    }
295
296    /// Returns `true` if [done](Self::done) has been called and further
297    /// changes are prohibited.
298    ///
299    /// Methods that modify the list will panic in this case.
300    pub fn is_done(&self) -> bool {
301        self.done
302    }
303
304    /// Borrows the current value of the observable list.
305    ///
306    /// While the borrow is held sending of events to subscribers is paused.
307    #[allow(clippy::needless_lifetimes)]
308    pub async fn borrow<'a>(&'a self) -> ObservableListRef<'a, T> {
309        let (tx, rx) = oneshot::channel();
310        self.req(Req::Borrow(tx));
311        ObservableListRef { buffer: rx.await.unwrap(), _phantom: PhantomData }
312    }
313
314    /// Event dispatch task.
315    async fn task(
316        buffer: Vec<T>, rx: mpsc::UnboundedReceiver<Req<T>>, dist_rx: mpsc::UnboundedReceiver<DistReq<T, Codec>>,
317        subscriber_count: Arc<AtomicUsize>,
318    ) {
319        /// State of a subscription.
320        struct SubState<T, Codec> {
321            pos: usize,
322            done: bool,
323            tx: rch::mpsc::Sender<ListEvent<T>, Codec>,
324        }
325
326        let buffer_shared = Arc::new(Mutex::new(buffer));
327        let mut buffer_guard_opt = None;
328        let mut rx_opt = Some(rx);
329        let mut dist_rx_opt = Some(dist_rx);
330        let mut subs: Vec<SubState<T, Codec>> = Vec::new();
331        let mut done = false;
332        let mut on_err: Box<dyn Fn(SendError) + Send + Sync + 'static> = Box::new(default_on_err);
333        let mut no_sub_notify: Vec<oneshot::Sender<()>> = Vec::new();
334
335        // Event and transmit loop.
336        loop {
337            // Obtain buffer mutex.
338            let buffer = match &mut buffer_guard_opt {
339                Some(br) => br,
340                None => {
341                    buffer_guard_opt = Some(buffer_shared.clone().lock_owned().await);
342                    buffer_guard_opt.as_mut().unwrap()
343                }
344            };
345
346            // If no more items can arrive, keep only subscription that have
347            // not yet received all items.
348            if rx_opt.is_none() {
349                if done {
350                    subs.retain(|sub| !sub.done);
351                } else {
352                    subs.retain(|sub| sub.pos < buffer.len());
353                }
354            }
355
356            // Update subscriber counts.
357            subscriber_count.store(subs.len(), Ordering::Relaxed);
358            if subs.is_empty() {
359                for tx in no_sub_notify.drain(..) {
360                    let _ = tx.send(());
361                }
362            }
363
364            // Create tasks that obtain send permits.
365            let mut permit_tasks = Vec::new();
366            for (i, sub) in subs.iter().enumerate() {
367                if sub.pos < buffer.len() || (done && !sub.done) {
368                    permit_tasks.push(async move { (i, sub.tx.reserve().await) }.boxed());
369                }
370            }
371
372            // Check for termination.
373            if rx_opt.is_none() && dist_rx_opt.is_none() && permit_tasks.is_empty() {
374                break;
375            }
376
377            tokio::select! {
378                biased;
379
380                // Process request.
381                req = async {
382                    match &mut rx_opt {
383                        Some(rx) => rx.recv().await,
384                        None => future::pending().await,
385                    }
386                } => match req {
387                    Some(Req::Push(v)) => buffer.push(v),
388                    Some(Req::Done) => done = true,
389                    Some(Req::Borrow(tx)) => {
390                        let _ = tx.send(buffer_guard_opt.take().unwrap());
391                    }
392                    Some(Req::SetErrorHandler(handler)) => on_err = handler,
393                    None => rx_opt = None,
394                },
395
396                // Process distribution request.
397                req = async {
398                    match &mut dist_rx_opt {
399                        Some(rx) => rx.recv().await,
400                        None => future::pending().await,
401                    }
402                } => match req {
403                    Some(DistReq::Subscribe(tx)) => subs.push(SubState { pos: 0, done: false, tx }),
404                    Some(DistReq::NotifyNoSubscribers(tx)) => no_sub_notify.push(tx),
405                    None => dist_rx_opt = None,
406                },
407
408                // Process send permit ready.
409                (i, res) = async move {
410                    if permit_tasks.is_empty() {
411                        future::pending().await
412                    } else {
413                        future::select_all(permit_tasks).await.0
414                    }
415                } => match res {
416                    Ok(permit) => {
417                        let sub = &mut subs[i];
418                        if sub.pos < buffer.len() {
419                            permit.send(ListEvent::Push(buffer[sub.pos].clone()));
420                            sub.pos += 1;
421                        } else if done && !sub.done {
422                            permit.send(ListEvent::Done);
423                            sub.done = true;
424                        } else {
425                            unreachable!()
426                        }
427                    }
428                    Err(err) => {
429                        subs.swap_remove(i);
430                        if let Ok(err) = SendError::try_from(err) {
431                            on_err(err);
432                        }
433                    }
434                },
435            }
436        }
437    }
438}
439
440impl<T, Codec> Drop for ObservableList<T, Codec> {
441    fn drop(&mut self) {
442        // empty
443    }
444}
445
446impl<T, Codec> Extend<T> for ObservableList<T, Codec>
447where
448    T: RemoteSend + Clone,
449    Codec: remoc::codec::Codec,
450{
451    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
452        for value in iter {
453            self.push(value);
454        }
455    }
456}
457
458/// A reference to an observable list.
459///
460/// While this is held sending of events to subscribers is paused.
461pub struct ObservableListRef<'a, T> {
462    buffer: OwnedMutexGuard<Vec<T>>,
463    _phantom: PhantomData<&'a ()>,
464}
465
466impl<'a, T> fmt::Debug for ObservableListRef<'a, T>
467where
468    T: fmt::Debug,
469{
470    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471        self.buffer.fmt(f)
472    }
473}
474
475impl<'a, T> Deref for ObservableListRef<'a, T> {
476    type Target = Vec<T>;
477
478    fn deref(&self) -> &Self::Target {
479        &*self.buffer
480    }
481}
482
483struct MirroredListInner<T> {
484    v: Vec<T>,
485    complete: bool,
486    done: bool,
487    error: Option<RecvError>,
488    max_size: usize,
489}
490
491impl<T> MirroredListInner<T> {
492    fn handle_event(&mut self, event: ListEvent<T>) -> Result<(), RecvError> {
493        match event {
494            ListEvent::InitialComplete => {
495                self.complete = true;
496            }
497            ListEvent::Push(v) => {
498                self.v.push(v);
499                if self.v.len() > self.max_size {
500                    return Err(RecvError::MaxSizeExceeded(self.max_size));
501                }
502            }
503            ListEvent::Done => {
504                self.done = true;
505            }
506        }
507        Ok(())
508    }
509}
510
511/// Observable list subscription.
512///
513/// This can be sent to a remote endpoint via a [remote channel](remoc::rch).
514/// Then, on the remote endpoint, [mirror](Self::mirror) can be used to build
515/// and keep up-to-date a mirror of the observed list.
516///
517/// The event stream can also be processed event-wise using [recv](Self::recv).
518#[derive(Debug, Serialize, Deserialize)]
519#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
520#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
521pub struct ListSubscription<T, Codec = remoc::codec::Default> {
522    /// Length of list at time of subscription.
523    initial_len: usize,
524    /// Initial value received completely.
525    #[serde(skip, default)]
526    complete: bool,
527    /// Change events receiver.
528    events: Option<rch::mpsc::Receiver<ListEvent<T>, Codec>>,
529    /// Number of elements received so far.
530    len: usize,
531}
532
533impl<T, Codec> ListSubscription<T, Codec>
534where
535    T: RemoteSend + Clone,
536    Codec: remoc::codec::Codec,
537{
538    fn new(initial_len: usize, events: rch::mpsc::Receiver<ListEvent<T>, Codec>) -> Self {
539        Self { initial_len, complete: false, events: Some(events), len: 0 }
540    }
541
542    /// Returns whether the initial value event or
543    /// stream of events that build up the initial value
544    /// has completed.
545    pub fn is_complete(&self) -> bool {
546        self.complete
547    }
548
549    /// Returns whether the observed list has indicated that no further
550    /// change events will occur.
551    pub fn is_done(&self) -> bool {
552        self.events.is_none()
553    }
554
555    /// Receives the next change event.
556    pub async fn recv(&mut self) -> Result<Option<ListEvent<T>>, RecvError> {
557        // Provide initial value complete event.
558        if self.len == self.initial_len && !self.complete {
559            self.complete = true;
560            return Ok(Some(ListEvent::InitialComplete));
561        }
562
563        // Provide change event.
564        match &mut self.events {
565            Some(rx) => match rx.recv().await? {
566                Some(ListEvent::InitialComplete) => unreachable!(),
567                Some(evt @ ListEvent::Push(_)) => {
568                    self.len += 1;
569                    Ok(Some(evt))
570                }
571                Some(ListEvent::Done) => {
572                    self.events = None;
573                    Ok(Some(ListEvent::Done))
574                }
575                None => Err(RecvError::Closed),
576            },
577            None => Ok(None),
578        }
579    }
580
581    /// Receives the next item.
582    ///
583    /// `Ok(None)` is returned when all items have been received and the observed
584    /// list has been marked as done.
585    pub async fn recv_item(&mut self) -> Result<Option<T>, RecvError> {
586        loop {
587            match self.recv().await? {
588                Some(ListEvent::InitialComplete) => (),
589                Some(ListEvent::Push(item)) => return Ok(Some(item)),
590                Some(ListEvent::Done) => (),
591                None => return Ok(None),
592            }
593        }
594    }
595}
596
597impl<T, Codec> ListSubscription<T, Codec>
598where
599    T: RemoteSend + Clone + Sync,
600    Codec: remoc::codec::Codec,
601{
602    /// Mirror the list that this subscription is observing.
603    ///
604    /// `max_size` specifies the maximum allowed size of the mirrored collection.
605    /// If this size is reached, processing of events is stopped and
606    /// [RecvError::MaxSizeExceeded] is returned.
607    pub fn mirror(mut self, max_size: usize) -> MirroredList<T> {
608        let (changed_tx, changed_rx) = watch::channel(());
609        let (dropped_tx, mut dropped_rx) = mpsc::channel(1);
610
611        // Build initial state.
612        let inner = Arc::new(RwLock::new(MirroredListInner {
613            v: Vec::new(),
614            complete: false,
615            done: false,
616            error: None,
617            max_size,
618        }));
619        let inner_task = Arc::downgrade(&inner);
620
621        // Process change events.
622        tokio::spawn(async move {
623            loop {
624                let event = tokio::select! {
625                    event = self.recv() => event,
626                    _ = dropped_rx.recv() => return,
627                };
628
629                let inner = match inner_task.upgrade() {
630                    Some(inner) => inner,
631                    None => return,
632                };
633                let mut inner = inner.write().await;
634
635                changed_tx.send_replace(());
636
637                match event {
638                    Ok(Some(event)) => {
639                        if let Err(err) = inner.handle_event(event) {
640                            inner.error = Some(err);
641                            return;
642                        }
643
644                        if inner.done {
645                            break;
646                        }
647                    }
648                    Ok(None) => break,
649                    Err(err) => {
650                        inner.error = Some(err);
651                        return;
652                    }
653                }
654            }
655        });
656
657        MirroredList { inner: Some(inner), changed_rx, _dropped_tx: dropped_tx }
658    }
659}
660
661/// An append-only list that is mirroring an observable append-only list.
662///
663/// Clones of this are cheap and share the same underlying mirrored list.
664#[derive(Clone)]
665pub struct MirroredList<T> {
666    inner: Option<Arc<RwLock<MirroredListInner<T>>>>,
667    changed_rx: watch::Receiver<()>,
668    _dropped_tx: mpsc::Sender<()>,
669}
670
671impl<T> fmt::Debug for MirroredList<T> {
672    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
673        f.debug_struct("MirroredList").finish()
674    }
675}
676
677impl<T> MirroredList<T>
678where
679    T: RemoteSend + Clone,
680{
681    /// Returns a reference to the current value of the list.
682    ///
683    /// Updates are paused while the read lock is held.
684    ///
685    /// This method returns an error if the observed list has been dropped
686    /// or the connection to it failed before it was marked as done by calling
687    /// [ObservableList::done].
688    /// In this case the mirrored contents at the point of loss of connection
689    /// can be obtained using [detach](Self::detach).
690    pub async fn borrow(&self) -> Result<MirroredListRef<'_, T>, RecvError> {
691        let inner = self.inner.as_ref().unwrap().read().await;
692        match &inner.error {
693            None => Ok(MirroredListRef(inner)),
694            Some(err) => Err(err.clone()),
695        }
696    }
697
698    /// Returns a reference to the current value of the list and marks it as seen.
699    ///
700    /// Thus [changed](Self::changed) will not return immediately until the value changes
701    /// after this method returns.
702    ///
703    /// Updates are paused while the read lock is held.
704    ///
705    /// This method returns an error if the observed list has been dropped
706    /// or the connection to it failed before it was marked as done by calling
707    /// [ObservableList::done].
708    /// In this case the mirrored contents at the point of loss of connection
709    /// can be obtained using [detach](Self::detach).
710    pub async fn borrow_and_update(&mut self) -> Result<MirroredListRef<'_, T>, RecvError> {
711        let inner = self.inner.as_ref().unwrap().read().await;
712        self.changed_rx.borrow_and_update();
713        match &inner.error {
714            None => Ok(MirroredListRef(inner)),
715            Some(err) => Err(err.clone()),
716        }
717    }
718
719    /// Stops updating the list and returns its current contents.
720    ///
721    /// If clones of this mirrored list exist, the cloned contents are returned.
722    pub async fn detach(mut self) -> Vec<T> {
723        match Arc::try_unwrap(self.inner.take().unwrap()) {
724            Ok(inner) => inner.into_inner().v,
725            Err(inner) => inner.read().await.v.clone(),
726        }
727    }
728
729    /// Waits for a change (append of one or more elements) and marks the newest value as seen.
730    ///
731    /// This also returns when connection to the observed list has been lost
732    /// or the list has been marked as done.
733    pub async fn changed(&mut self) {
734        let _ = self.changed_rx.changed().await;
735    }
736
737    /// Waits for the observed list to be marked as done and for all elements to
738    /// be received by this mirror of it.
739    ///
740    /// This marks changes as seen, even when aborted.
741    pub async fn done(&mut self) -> Result<(), RecvError> {
742        while !self.borrow_and_update().await?.is_done() {
743            self.changed().await;
744        }
745        Ok(())
746    }
747}
748
749impl<T> Drop for MirroredList<T> {
750    fn drop(&mut self) {
751        // empty
752    }
753}
754
755/// A snapshot view of an observable append-only list.
756pub struct MirroredListRef<'a, T>(RwLockReadGuard<'a, MirroredListInner<T>>);
757
758impl<'a, T> MirroredListRef<'a, T> {
759    /// Returns `true` if the mirror list has reached the length of the observed
760    /// list at the time of subscription.
761    pub fn is_complete(&self) -> bool {
762        self.0.complete
763    }
764
765    /// Returns `true` if the observed list has been marked as done by calling
766    /// [ObservableList::done] and thus no further changes can occur.
767    pub fn is_done(&self) -> bool {
768        self.0.done
769    }
770}
771
772impl<'a, T> fmt::Debug for MirroredListRef<'a, T>
773where
774    T: fmt::Debug,
775{
776    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
777        self.0.v.fmt(f)
778    }
779}
780
781impl<'a, T> Deref for MirroredListRef<'a, T> {
782    type Target = Vec<T>;
783
784    fn deref(&self) -> &Self::Target {
785        &self.0.v
786    }
787}
788
789impl<'a, T> Drop for MirroredListRef<'a, T> {
790    fn drop(&mut self) {
791        // required for drop order
792    }
793}