remoc_obs/
vec.rs

1//! Observable vector.
2//!
3//! This provides a locally and remotely observable vector.
4//! The observable vector sends a change event each time a change is performed on it.
5//! The [resulting event stream](VecSubscription) can either be processed event-wise
6//! or used to build a [mirrored vector](MirroredVec).
7//!
8//! Changes are sent using a [remote broadcast channel](remoc::rch::broadcast), thus
9//! subscribers cannot block the observed vector and are shed when their event buffer
10//! exceeds a configurable size.
11//!
12//! # Alternatives
13//!
14//! The [observable append-only list](crate::list) is more memory-efficient as it does not
15//! require a send buffer for each subscriber.
16//! You should prefer it, if you are only appending to the vector.
17//!
18//! # Basic use
19//!
20//! Create a [ObservableVec] and obtain a [subscription](VecSubscription) to it using
21//! [ObservableVec::subscribe].
22//! Send this subscription to a remote endpoint via a [remote channel](remoc::rch) and call
23//! [VecSubscription::mirror] on the remote endpoint to obtain a live mirror of the observed
24//! vector or process each change event individually using [VecSubscription::recv].
25//!
26
27use remoc::prelude::*;
28use serde::{Deserialize, Serialize};
29use std::{
30    collections::HashSet,
31    fmt,
32    iter::{Enumerate, FusedIterator},
33    mem::take,
34    ops::{Deref, DerefMut},
35    sync::Arc,
36};
37use tokio::sync::{oneshot, watch, RwLock, RwLockReadGuard};
38
39use crate::{default_on_err, send_event, ChangeNotifier, ChangeSender, RecvError, SendError};
40
41/// A vector change event.
42#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
43pub enum VecEvent<T> {
44    /// The incremental subscription has reached the value of the observed
45    /// vector at the time it was subscribed.
46    #[serde(skip)]
47    InitialComplete,
48    /// An item was added at the end.
49    Push(T),
50    /// The last item was removed.
51    Pop,
52    /// An item was inserted at the given index.
53    Insert(usize, T),
54    /// The specified item was modified.
55    Set(usize, T),
56    /// The specified item was removed.
57    Remove(usize),
58    /// The specified element was removed and replaced by the last element.
59    SwapRemove(usize),
60    /// All vector elements have been set to the specified value.
61    Fill(T),
62    /// The vector has been resized to the specified length.
63    Resize(usize, T),
64    /// The vector has been truncated to the specified length.
65    Truncate(usize),
66    /// Retain the specified elements.
67    Retain(HashSet<usize>),
68    /// Retain the inverse of the specified elements.
69    RetainNot(HashSet<usize>),
70    /// All items were removed.
71    Clear,
72    /// Shrink capacity to fit.
73    ShrinkToFit,
74    /// The vector has reached its final state and
75    /// no further events will occur.
76    Done,
77}
78
79/// A vector that emits an event for each change.
80///
81/// Use [subscribe](Self::subscribe) to obtain an event stream
82/// that can be used for building a mirror of this vector.
83///
84/// Due to current limitations of Rust you must use [get_mut](Self::get_mut)
85/// to obtain a mutable reference to an element instead of using
86/// the indexing notation.
87/// Also, mutable slicing is not supported and updates must be done
88/// element-wise.
89pub struct ObservableVec<T, Codec = remoc::codec::Default> {
90    v: Vec<T>,
91    tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
92    change: ChangeSender,
93    on_err: Arc<dyn Fn(SendError) + Send + Sync>,
94    done: bool,
95}
96
97impl<T, Codec> fmt::Debug for ObservableVec<T, Codec>
98where
99    T: fmt::Debug,
100{
101    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102        self.v.fmt(f)
103    }
104}
105
106impl<T, Codec> From<Vec<T>> for ObservableVec<T, Codec>
107where
108    T: Clone + RemoteSend,
109    Codec: remoc::codec::Codec,
110{
111    fn from(hs: Vec<T>) -> Self {
112        let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
113        Self { v: hs, tx, change: ChangeSender::new(), on_err: Arc::new(default_on_err), done: false }
114    }
115}
116
117impl<T, Codec> From<ObservableVec<T, Codec>> for Vec<T> {
118    fn from(ohs: ObservableVec<T, Codec>) -> Self {
119        ohs.v
120    }
121}
122
123impl<T, Codec> Default for ObservableVec<T, Codec>
124where
125    T: Clone + RemoteSend,
126    Codec: remoc::codec::Codec,
127{
128    fn default() -> Self {
129        Self::from(Vec::new())
130    }
131}
132
133impl<T, Codec> ObservableVec<T, Codec>
134where
135    T: Clone + RemoteSend,
136    Codec: remoc::codec::Codec,
137{
138    /// Creates an empty observable vector.
139    pub fn new() -> Self {
140        Self::default()
141    }
142
143    /// Sets the error handler function that is called when sending an
144    /// event fails.
145    pub fn set_error_handler<E>(&mut self, on_err: E)
146    where
147        E: Fn(SendError) + Send + Sync + 'static,
148    {
149        self.on_err = Arc::new(on_err);
150    }
151
152    /// Subscribes to change events from this observable vector.
153    ///
154    /// The current contents of the vector is included with the subscription.
155    ///
156    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
157    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
158    pub fn subscribe(&self, buffer: usize) -> VecSubscription<T, Codec> {
159        VecSubscription::new(
160            VecInitialValue::new_value(self.v.clone()),
161            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
162        )
163    }
164
165    /// Subscribes to change events from this observable vector with incremental sending
166    /// of the current contents.
167    ///
168    /// The current contents of the vector are sent incrementally.
169    ///
170    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
171    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
172    pub fn subscribe_incremental(&self, buffer: usize) -> VecSubscription<T, Codec> {
173        VecSubscription::new(
174            VecInitialValue::new_incremental(self.v.clone(), self.on_err.clone()),
175            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
176        )
177    }
178
179    /// Current number of subscribers.
180    pub fn subscriber_count(&self) -> usize {
181        self.tx.receiver_count()
182    }
183
184    /// Returns a [change notifier](ChangeNotifier) that can be used *locally* to be
185    /// notified of changes to this collection.
186    pub fn notifier(&self) -> ChangeNotifier {
187        self.change.subscribe()
188    }
189
190    /// Appends an element at the end.
191    ///
192    /// A [VecEvent::Push] change event is sent.
193    ///
194    /// # Panics
195    /// Panics when [done](Self::done) has been called before.
196    pub fn push(&mut self, value: T) {
197        self.assert_not_done();
198        self.change.notify();
199
200        send_event(&self.tx, &*self.on_err, VecEvent::Push(value.clone()));
201        self.v.push(value);
202    }
203
204    /// Removes the last element and returns it, or `None` if the vector is empty.
205    ///
206    /// A [VecEvent::Pop] change event is sent.
207    ///
208    /// # Panics
209    /// Panics when [done](Self::done) has been called before.
210    pub fn pop(&mut self) -> Option<T> {
211        self.assert_not_done();
212
213        match self.v.pop() {
214            Some(value) => {
215                self.change.notify();
216                send_event(&self.tx, &*self.on_err, VecEvent::Pop);
217                Some(value)
218            }
219            None => None,
220        }
221    }
222
223    /// Gets a mutable reference to the value at the specified index.
224    ///
225    /// A [VecEvent::Set] change event is sent if the reference is accessed mutably.
226    ///
227    /// # Panics
228    /// Panics when [done](Self::done) has been called before.
229    pub fn get_mut(&mut self, index: usize) -> Option<RefMut<T, Codec>> {
230        self.assert_not_done();
231
232        match self.v.get_mut(index) {
233            Some(value) => Some(RefMut {
234                index,
235                value,
236                changed: false,
237                tx: &self.tx,
238                change: &self.change,
239                on_err: &*self.on_err,
240            }),
241            None => None,
242        }
243    }
244
245    /// Mutably iterates over items.
246    ///
247    /// A [VecEvent::Set] change event is sent for each value that is accessed mutably.
248    ///
249    /// # Panics
250    /// Panics when [done](Self::done) has been called before.    
251    pub fn iter_mut(&mut self) -> IterMut<T, Codec> {
252        self.assert_not_done();
253
254        IterMut {
255            inner: self.v.iter_mut().enumerate(),
256            tx: &self.tx,
257            change: &self.change,
258            on_err: &*self.on_err,
259        }
260    }
261
262    /// Inserts an element at the specified position, shift all elements after it to the right.
263    ///
264    /// A [VecEvent::Insert] change event is sent.
265    ///
266    /// # Panics
267    /// Panics when [done](Self::done) has been called before or `index > len`.
268    pub fn insert(&mut self, index: usize, value: T) {
269        self.assert_not_done();
270
271        let value_event = value.clone();
272        self.v.insert(index, value);
273        self.change.notify();
274        send_event(&self.tx, &*self.on_err, VecEvent::Insert(index, value_event));
275    }
276
277    /// Removes the element at the specified position, shifting all elements after it to the left.
278    ///
279    /// A [VecEvent::Remove] change event is sent.
280    ///
281    /// Returns the removed value.
282    ///
283    /// # Panics
284    /// Panics when [done](Self::done) has been called before or the index is out of bounds.
285    pub fn remove(&mut self, index: usize) -> T {
286        self.assert_not_done();
287
288        let value = self.v.remove(index);
289        self.change.notify();
290        send_event(&self.tx, &*self.on_err, VecEvent::Remove(index));
291        value
292    }
293
294    /// Removes the element at the specified position, replacing it with the last element.
295    ///
296    /// A [VecEvent::SwapRemove] change event is sent.
297    ///
298    /// Returns the removed value.
299    ///
300    /// # Panics
301    /// Panics when [done](Self::done) has been called before or the index is out of bounds.
302    pub fn swap_remove(&mut self, index: usize) -> T {
303        self.assert_not_done();
304
305        let value = self.v.swap_remove(index);
306        self.change.notify();
307        send_event(&self.tx, &*self.on_err, VecEvent::SwapRemove(index));
308        value
309    }
310
311    /// Fills the vector with the specified value.
312    ///
313    /// A [VecEvent::Fill] change event is sent.
314    ///
315    /// # Panics
316    /// Panics when [done](Self::done) has been called before.
317    pub fn fill(&mut self, value: T) {
318        self.assert_not_done();
319
320        self.change.notify();
321        send_event(&self.tx, &*self.on_err, VecEvent::Fill(value.clone()));
322        self.v.fill(value);
323    }
324
325    /// Resizes the vector to the specified length, filling each additional slot with the
326    /// specified value.
327    ///
328    /// A [VecEvent::Resize] change event is sent.
329    ///
330    /// # Panics
331    /// Panics when [done](Self::done) has been called before.
332    pub fn resize(&mut self, new_len: usize, value: T) {
333        self.assert_not_done();
334
335        if new_len != self.v.len() {
336            self.change.notify();
337            send_event(&self.tx, &*self.on_err, VecEvent::Resize(new_len, value.clone()));
338            self.v.resize(new_len, value);
339        }
340    }
341
342    /// Truncates the vector to the specified length.
343    ///
344    /// If `new_len` is greater than the current length, this has no effect.
345    ///
346    /// A [VecEvent::Truncate] change event is sent.
347    ///
348    /// # Panics
349    /// Panics when [done](Self::done) has been called before.
350    pub fn truncate(&mut self, new_len: usize) {
351        self.assert_not_done();
352
353        if new_len < self.len() {
354            self.change.notify();
355            send_event(&self.tx, &*self.on_err, VecEvent::Truncate(new_len));
356            self.v.truncate(new_len);
357        }
358    }
359
360    /// Removes all items.
361    ///
362    /// A [VecEvent::Clear] change event is sent.
363    ///
364    /// # Panics
365    /// Panics when [done](Self::done) has been called before.
366    pub fn clear(&mut self) {
367        self.assert_not_done();
368
369        if !self.v.is_empty() {
370            self.v.clear();
371            self.change.notify();
372            send_event(&self.tx, &*self.on_err, VecEvent::Clear);
373        }
374    }
375
376    /// Retains only the elements specified by the predicate.
377    ///
378    /// A [VecEvent::Retain] or [VecEvent::RetainNot] change event is sent.
379    ///
380    /// # Panics
381    /// Panics when [done](Self::done) has been called before.
382    pub fn retain<F>(&mut self, mut f: F)
383    where
384        F: FnMut(&T) -> bool,
385    {
386        self.assert_not_done();
387
388        let mut keep = HashSet::new();
389        let mut remove = HashSet::new();
390        let mut pos = 0;
391
392        self.v.retain(|v| {
393            let keep_this = f(v);
394
395            if keep_this {
396                keep.insert(pos);
397            } else {
398                remove.insert(pos);
399            }
400            pos += 1;
401
402            keep_this
403        });
404
405        if !remove.is_empty() {
406            self.change.notify();
407            if keep.len() < remove.len() {
408                send_event(&self.tx, &*self.on_err, VecEvent::Retain(keep));
409            } else {
410                send_event(&self.tx, &*self.on_err, VecEvent::RetainNot(remove));
411            }
412        }
413    }
414
415    /// Shrinks the capacity of the vector as much as possible.
416    ///
417    /// A [VecEvent::ShrinkToFit] change event is sent.
418    ///
419    /// # Panics
420    /// Panics when [done](Self::done) has been called before.
421    pub fn shrink_to_fit(&mut self) {
422        self.assert_not_done();
423        send_event(&self.tx, &*self.on_err, VecEvent::ShrinkToFit);
424        self.v.shrink_to_fit()
425    }
426
427    /// Panics when `done` has been called.
428    fn assert_not_done(&self) {
429        if self.done {
430            panic!("observable vector cannot be changed after done has been called");
431        }
432    }
433
434    /// Prevents further changes of this vector and notifies
435    /// are subscribers that no further events will occur.
436    ///
437    /// Methods that modify the vector will panic after this has been called.
438    /// It is still possible to subscribe to this observable vector.
439    pub fn done(&mut self) {
440        if !self.done {
441            send_event(&self.tx, &*self.on_err, VecEvent::Done);
442            self.done = true;
443        }
444    }
445
446    /// Returns `true` if [done](Self::done) has been called and further
447    /// changes are prohibited.
448    ///
449    /// Methods that modify the vector will panic in this case.
450    pub fn is_done(&self) -> bool {
451        self.done
452    }
453
454    /// Extracts the underlying vector.
455    ///
456    /// If [done](Self::done) has not been called before this method,
457    /// subscribers will receive an error.
458    pub fn into_inner(self) -> Vec<T> {
459        self.into()
460    }
461}
462
463impl<T, Codec> Deref for ObservableVec<T, Codec> {
464    type Target = Vec<T>;
465
466    fn deref(&self) -> &Self::Target {
467        &self.v
468    }
469}
470
471impl<T, Codec> Extend<T> for ObservableVec<T, Codec>
472where
473    T: RemoteSend + Clone,
474    Codec: remoc::codec::Codec,
475{
476    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
477        for value in iter {
478            self.push(value);
479        }
480    }
481}
482
483/// A mutable reference to a value inside an [observable vector](ObservableVec).
484///
485/// A [VecEvent::Set] change event is sent when this reference is dropped and the
486/// value has been accessed mutably.
487pub struct RefMut<'a, T, Codec>
488where
489    T: Clone + RemoteSend,
490    Codec: remoc::codec::Codec,
491{
492    index: usize,
493    value: &'a mut T,
494    changed: bool,
495    tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
496    change: &'a ChangeSender,
497    on_err: &'a dyn Fn(SendError),
498}
499
500impl<'a, T, Codec> Deref for RefMut<'a, T, Codec>
501where
502    T: Clone + RemoteSend,
503    Codec: remoc::codec::Codec,
504{
505    type Target = T;
506
507    fn deref(&self) -> &Self::Target {
508        self.value
509    }
510}
511
512impl<'a, T, Codec> DerefMut for RefMut<'a, T, Codec>
513where
514    T: Clone + RemoteSend,
515    Codec: remoc::codec::Codec,
516{
517    fn deref_mut(&mut self) -> &mut Self::Target {
518        self.changed = true;
519        self.value
520    }
521}
522
523impl<'a, T, Codec> Drop for RefMut<'a, T, Codec>
524where
525    T: Clone + RemoteSend,
526    Codec: remoc::codec::Codec,
527{
528    fn drop(&mut self) {
529        if self.changed {
530            self.change.notify();
531            send_event(self.tx, self.on_err, VecEvent::Set(self.index, self.value.clone()));
532        }
533    }
534}
535
536/// A mutable iterator over the items in an [observable vector](ObservableVec).
537///
538/// A [VecEvent::Set] change event is sent for each value that is accessed mutably.
539pub struct IterMut<'a, T, Codec> {
540    inner: Enumerate<std::slice::IterMut<'a, T>>,
541    tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
542    change: &'a ChangeSender,
543    on_err: &'a dyn Fn(SendError),
544}
545
546impl<'a, T, Codec> Iterator for IterMut<'a, T, Codec>
547where
548    T: Clone + RemoteSend,
549    Codec: remoc::codec::Codec,
550{
551    type Item = RefMut<'a, T, Codec>;
552
553    fn next(&mut self) -> Option<Self::Item> {
554        match self.inner.next() {
555            Some((index, value)) => Some(RefMut {
556                index,
557                value,
558                changed: false,
559                tx: self.tx,
560                change: self.change,
561                on_err: self.on_err,
562            }),
563            None => None,
564        }
565    }
566
567    fn size_hint(&self) -> (usize, Option<usize>) {
568        self.inner.size_hint()
569    }
570}
571
572impl<'a, T, Codec> ExactSizeIterator for IterMut<'a, T, Codec>
573where
574    T: Clone + RemoteSend,
575    Codec: remoc::codec::Codec,
576{
577    fn len(&self) -> usize {
578        self.inner.len()
579    }
580}
581
582impl<'a, T, Codec> DoubleEndedIterator for IterMut<'a, T, Codec>
583where
584    T: Clone + RemoteSend,
585    Codec: remoc::codec::Codec,
586{
587    fn next_back(&mut self) -> Option<Self::Item> {
588        match self.inner.next_back() {
589            Some((index, value)) => Some(RefMut {
590                index,
591                value,
592                changed: false,
593                tx: self.tx,
594                change: self.change,
595                on_err: self.on_err,
596            }),
597            None => None,
598        }
599    }
600}
601
602impl<'a, T, Codec> FusedIterator for IterMut<'a, T, Codec>
603where
604    T: Clone + RemoteSend,
605    Codec: remoc::codec::Codec,
606{
607}
608
609struct MirroredVecInner<T> {
610    v: Vec<T>,
611    complete: bool,
612    done: bool,
613    error: Option<RecvError>,
614    max_size: usize,
615}
616
617impl<T> MirroredVecInner<T>
618where
619    T: Clone,
620{
621    fn handle_event(&mut self, event: VecEvent<T>) -> Result<(), RecvError> {
622        match event {
623            VecEvent::InitialComplete => {
624                self.complete = true;
625            }
626            VecEvent::Push(v) => {
627                self.v.push(v);
628                if self.v.len() > self.max_size {
629                    return Err(RecvError::MaxSizeExceeded(self.max_size));
630                }
631            }
632            VecEvent::Pop => {
633                self.v.pop();
634            }
635            VecEvent::Insert(i, v) => {
636                if i > self.v.len() {
637                    return Err(RecvError::InvalidIndex(i));
638                }
639                self.v.insert(i, v);
640            }
641            VecEvent::Set(i, v) => {
642                if i >= self.v.len() {
643                    return Err(RecvError::InvalidIndex(i));
644                }
645                self.v[i] = v;
646            }
647            VecEvent::Remove(i) => {
648                if i >= self.v.len() {
649                    return Err(RecvError::InvalidIndex(i));
650                }
651                self.v.remove(i);
652            }
653            VecEvent::SwapRemove(i) => {
654                if i >= self.v.len() {
655                    return Err(RecvError::InvalidIndex(i));
656                }
657                self.v.swap_remove(i);
658            }
659            VecEvent::Fill(v) => {
660                self.v.fill(v);
661            }
662            VecEvent::Resize(l, v) => {
663                self.v.resize(l, v);
664            }
665            VecEvent::Truncate(l) => {
666                self.v.truncate(l);
667            }
668            VecEvent::Retain(r) => {
669                let mut pos = 0;
670                self.v.retain(|_| {
671                    let keep_this = r.contains(&pos);
672                    pos += 1;
673                    keep_this
674                });
675            }
676            VecEvent::RetainNot(nr) => {
677                let mut pos = 0;
678                self.v.retain(|_| {
679                    let keep_this = !nr.contains(&pos);
680                    pos += 1;
681                    keep_this
682                });
683            }
684            VecEvent::Clear => {
685                self.v.clear();
686            }
687            VecEvent::ShrinkToFit => {
688                self.v.shrink_to_fit();
689            }
690            VecEvent::Done => {
691                self.done = true;
692            }
693        }
694        Ok(())
695    }
696}
697
698/// Initial value of an observable vector subscription.
699#[derive(Debug, Serialize, Deserialize)]
700#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
701#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
702enum VecInitialValue<T, Codec = remoc::codec::Default> {
703    /// Initial value is present.
704    Value(Vec<T>),
705    /// Initial value is received incrementally.
706    Incremental {
707        /// Number of elements.
708        len: usize,
709        /// Receiver.
710        rx: rch::mpsc::Receiver<T, Codec>,
711    },
712}
713
714impl<T, Codec> VecInitialValue<T, Codec>
715where
716    T: RemoteSend + Clone,
717    Codec: remoc::codec::Codec,
718{
719    /// Transmits the initial value as a whole.
720    fn new_value(hs: Vec<T>) -> Self {
721        Self::Value(hs)
722    }
723
724    /// Transmits the initial value incrementally.
725    fn new_incremental(hs: Vec<T>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
726        let (tx, rx) = rch::mpsc::channel(128);
727        let len = hs.len();
728
729        tokio::spawn(async move {
730            for v in hs.into_iter() {
731                match tx.send(v).await {
732                    Ok(()) => (),
733                    Err(err) if err.is_disconnected() => break,
734                    Err(err) => match err.try_into() {
735                        Ok(err) => (on_err)(err),
736                        Err(_) => unreachable!(),
737                    },
738                }
739            }
740        });
741
742        Self::Incremental { len, rx }
743    }
744}
745
746/// Observable vector subscription.
747///
748/// This can be sent to a remote endpoint via a [remote channel](remoc::rch).
749/// Then, on the remote endpoint, [mirror](Self::mirror) can be used to build
750/// and keep up-to-date a mirror of the observed vector.
751///
752/// The event stream can also be processed event-wise using [recv](Self::recv).
753/// If the subscription is not incremental [take_initial](Self::take_initial) must
754/// be called before the first call to [recv](Self::recv).
755#[derive(Debug, Serialize, Deserialize)]
756#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
757#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
758pub struct VecSubscription<T, Codec = remoc::codec::Default> {
759    /// Value of vector at time of subscription.
760    initial: VecInitialValue<T, Codec>,
761    /// Initial value received completely.
762    #[serde(skip, default)]
763    complete: bool,
764    /// Change events receiver.
765    ///
766    /// `None` if [ObservableVec::done] has been called before subscribing.
767    events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
768    /// Event stream ended.
769    #[serde(skip, default)]
770    done: bool,
771}
772
773impl<T, Codec> VecSubscription<T, Codec>
774where
775    T: RemoteSend + Clone,
776    Codec: remoc::codec::Codec,
777{
778    fn new(
779        initial: VecInitialValue<T, Codec>, events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
780    ) -> Self {
781        Self { initial, complete: false, events, done: false }
782    }
783
784    /// Returns whether the subscription is incremental.
785    pub fn is_incremental(&self) -> bool {
786        matches!(self.initial, VecInitialValue::Incremental { .. })
787    }
788
789    /// Returns whether the initial value event or
790    /// stream of events that build up the initial value
791    /// has completed or [take_initial](Self::take_initial) has been called.
792    pub fn is_complete(&self) -> bool {
793        self.complete
794    }
795
796    /// Returns whether the observed vector has indicated that no further
797    /// change events will occur.
798    pub fn is_done(&self) -> bool {
799        self.events.is_none() || self.done
800    }
801
802    /// Take the initial value.
803    ///
804    /// This is only possible if the subscription is not incremental
805    /// and the initial value has not already been taken.
806    /// Otherwise `None` is returned.
807    ///
808    /// If the subscription is not incremental this must be called before the
809    /// first call to [recv](Self::recv).
810    pub fn take_initial(&mut self) -> Option<Vec<T>> {
811        match &mut self.initial {
812            VecInitialValue::Value(value) if !self.complete => {
813                self.complete = true;
814                Some(take(value))
815            }
816            _ => None,
817        }
818    }
819
820    /// Receives the next change event.
821    ///
822    /// # Panics
823    /// Panics when the subscription is not incremental and [take_initial](Self::take_initial)
824    /// has not been called.
825    pub async fn recv(&mut self) -> Result<Option<VecEvent<T>>, RecvError> {
826        // Provide initial value events.
827        if !self.complete {
828            match &mut self.initial {
829                VecInitialValue::Incremental { len, rx } => {
830                    if *len > 0 {
831                        match rx.recv().await? {
832                            Some(v) => {
833                                // Provide incremental initial value event.
834                                *len -= 1;
835                                return Ok(Some(VecEvent::Push(v)));
836                            }
837                            None => return Err(RecvError::Closed),
838                        }
839                    } else {
840                        // Provide incremental initial value complete event.
841                        self.complete = true;
842                        return Ok(Some(VecEvent::InitialComplete));
843                    }
844                }
845                VecInitialValue::Value(_) => {
846                    panic!("take_initial must be called before recv for non-incremental subscription");
847                }
848            }
849        }
850
851        // Provide change event.
852        if let Some(rx) = &mut self.events {
853            match rx.recv().await? {
854                VecEvent::Done => self.events = None,
855                evt => return Ok(Some(evt)),
856            }
857        }
858
859        // Provide done event.
860        if self.done {
861            Ok(None)
862        } else {
863            self.done = true;
864            Ok(Some(VecEvent::Done))
865        }
866    }
867}
868
869impl<T, Codec> VecSubscription<T, Codec>
870where
871    T: RemoteSend + Clone + Sync,
872    Codec: remoc::codec::Codec,
873{
874    /// Mirror the vector that this subscription is observing.
875    ///
876    /// `max_size` specifies the maximum allowed size of the mirrored collection.
877    /// If this size is reached, processing of events is stopped and
878    /// [RecvError::MaxSizeExceeded] is returned.
879    pub fn mirror(mut self, max_size: usize) -> MirroredVec<T, Codec> {
880        let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
881        let (changed_tx, changed_rx) = watch::channel(());
882        let (dropped_tx, mut dropped_rx) = oneshot::channel();
883
884        // Build initial state.
885        let inner = Arc::new(RwLock::new(Some(MirroredVecInner {
886            v: self.take_initial().unwrap_or_default(),
887            complete: self.is_complete(),
888            done: self.is_done(),
889            error: None,
890            max_size,
891        })));
892        let inner_task = inner.clone();
893
894        // Process change events.
895        let tx_send = tx.clone();
896        tokio::spawn(async move {
897            loop {
898                let event = tokio::select! {
899                    event = self.recv() => event,
900                    _ = &mut dropped_rx => return,
901                };
902
903                let mut inner = inner_task.write().await;
904                let mut inner = match inner.as_mut() {
905                    Some(inner) => inner,
906                    None => return,
907                };
908
909                changed_tx.send_replace(());
910
911                match event {
912                    Ok(Some(event)) => {
913                        if tx_send.receiver_count() > 0 {
914                            let _ = tx_send.send(event.clone());
915                        }
916
917                        if let Err(err) = inner.handle_event(event) {
918                            inner.error = Some(err);
919                            return;
920                        }
921
922                        if inner.done {
923                            break;
924                        }
925                    }
926                    Ok(None) => break,
927                    Err(err) => {
928                        inner.error = Some(err);
929                        return;
930                    }
931                }
932            }
933        });
934
935        MirroredVec { inner, tx, changed_rx, _dropped_tx: dropped_tx }
936    }
937}
938
939/// A vector that is mirroring an observable vector.
940pub struct MirroredVec<T, Codec = remoc::codec::Default> {
941    inner: Arc<RwLock<Option<MirroredVecInner<T>>>>,
942    tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
943    changed_rx: watch::Receiver<()>,
944    _dropped_tx: oneshot::Sender<()>,
945}
946
947impl<T, Codec> fmt::Debug for MirroredVec<T, Codec> {
948    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
949        f.debug_struct("MirroredVec").finish()
950    }
951}
952
953impl<T, Codec> MirroredVec<T, Codec>
954where
955    T: RemoteSend + Clone,
956    Codec: remoc::codec::Codec,
957{
958    /// Returns a reference to the current value of the vector.
959    ///
960    /// Updates are paused while the read lock is held.
961    ///
962    /// This method returns an error if the observed vector has been dropped
963    /// or the connection to it failed before it was marked as done by calling
964    /// [ObservableVec::done].
965    /// In this case the mirrored contents at the point of loss of connection
966    /// can be obtained using [detach](Self::detach).
967    pub async fn borrow(&self) -> Result<MirroredVecRef<'_, T>, RecvError> {
968        let inner = self.inner.read().await;
969        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
970        match &inner.error {
971            None => Ok(MirroredVecRef(inner)),
972            Some(err) => Err(err.clone()),
973        }
974    }
975
976    /// Returns a reference to the current value of the vector and marks it as seen.
977    ///
978    /// Thus [changed](Self::changed) will not return immediately until the value changes
979    /// after this method returns.
980    ///
981    /// Updates are paused while the read lock is held.
982    ///
983    /// This method returns an error if the observed vector has been dropped
984    /// or the connection to it failed before it was marked as done by calling
985    /// [ObservableVec::done].
986    /// In this case the mirrored contents at the point of loss of connection
987    /// can be obtained using [detach](Self::detach).
988    pub async fn borrow_and_update(&mut self) -> Result<MirroredVecRef<'_, T>, RecvError> {
989        let inner = self.inner.read().await;
990        self.changed_rx.borrow_and_update();
991        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
992        match &inner.error {
993            None => Ok(MirroredVecRef(inner)),
994            Some(err) => Err(err.clone()),
995        }
996    }
997
998    /// Stops updating the vector and returns its current contents.
999    pub async fn detach(self) -> Vec<T> {
1000        let mut inner = self.inner.write().await;
1001        inner.take().unwrap().v
1002    }
1003
1004    /// Waits for a change and marks the newest value as seen.
1005    ///
1006    /// This also returns when connection to the observed vector has been lost
1007    /// or the vector has been marked as done.
1008    pub async fn changed(&mut self) {
1009        let _ = self.changed_rx.changed().await;
1010    }
1011
1012    /// Subscribes to change events from this mirrored vector.
1013    ///
1014    /// The current contents of the vector is included with the subscription.
1015    ///
1016    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1017    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
1018    pub async fn subscribe(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
1019        let view = self.borrow().await?;
1020        let initial = view.clone();
1021        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1022
1023        Ok(VecSubscription::new(VecInitialValue::new_value(initial), events))
1024    }
1025
1026    /// Subscribes to change events from this mirrored vector with incremental sending
1027    /// of the current contents.
1028    ///
1029    /// The current contents of the vector are sent incrementally.
1030    ///
1031    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1032    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
1033    pub async fn subscribe_incremental(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
1034        let view = self.borrow().await?;
1035        let initial = view.clone();
1036        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1037
1038        Ok(VecSubscription::new(VecInitialValue::new_incremental(initial, Arc::new(default_on_err)), events))
1039    }
1040}
1041
1042impl<T, Codec> Drop for MirroredVec<T, Codec> {
1043    fn drop(&mut self) {
1044        // empty
1045    }
1046}
1047
1048/// A snapshot view of an observable vector.
1049pub struct MirroredVecRef<'a, T>(RwLockReadGuard<'a, MirroredVecInner<T>>);
1050
1051impl<'a, T> MirroredVecRef<'a, T> {
1052    /// Returns `true` if the initial state of an incremental subscription has
1053    /// been reached.
1054    pub fn is_complete(&self) -> bool {
1055        self.0.complete
1056    }
1057
1058    /// Returns `true` if the observed vector has been marked as done by calling
1059    /// [ObservableVec::done] and thus no further changes can occur.
1060    pub fn is_done(&self) -> bool {
1061        self.0.done
1062    }
1063}
1064
1065impl<'a, T> fmt::Debug for MirroredVecRef<'a, T>
1066where
1067    T: fmt::Debug,
1068{
1069    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1070        self.0.v.fmt(f)
1071    }
1072}
1073
1074impl<'a, T> Deref for MirroredVecRef<'a, T> {
1075    type Target = Vec<T>;
1076
1077    fn deref(&self) -> &Self::Target {
1078        &self.0.v
1079    }
1080}
1081
1082impl<'a, T> Drop for MirroredVecRef<'a, T> {
1083    fn drop(&mut self) {
1084        // required for drop order
1085    }
1086}