remoc_obs/
hash_map.rs

1//! Observable hash map.
2//!
3//! This provides a locally and remotely observable hash map.
4//! The observable hash map sends a change event each time a change is performed on it.
5//! The [resulting event stream](HashMapSubscription) can either be processed event-wise
6//! or used to build a [mirrored hash map](MirroredHashMap).
7//!
8//! Changes are sent using a [remote broadcast channel](remoc::rch::broadcast), thus
9//! subscribers cannot block the observed hash map and are shed when their event buffer
10//! exceeds a configurable size.
11//!
12//! # Basic use
13//!
//! Create an [ObservableHashMap] and obtain a [subscription](HashMapSubscription) to it using
15//! [ObservableHashMap::subscribe].
16//! Send this subscription to a remote endpoint via a [remote channel](remoc::rch) and call
17//! [HashMapSubscription::mirror] on the remote endpoint to obtain a live mirror of the observed
18//! hash map or process each change event individually using [HashMapSubscription::recv].
19//!
20
21use remoc::prelude::*;
22use serde::{Deserialize, Serialize};
23use std::{
24    collections::HashMap,
25    fmt,
26    hash::Hash,
27    iter::FusedIterator,
28    mem::take,
29    ops::{Deref, DerefMut},
30    sync::Arc,
31};
32use tokio::sync::{oneshot, watch, RwLock, RwLockReadGuard};
33
34use crate::{default_on_err, send_event, ChangeNotifier, ChangeSender, RecvError, SendError};
35
/// A hash map change event.
///
/// Events are sent by an [ObservableHashMap] for each change and are
/// delivered to subscribers through [HashMapSubscription::recv].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HashMapEvent<K, V> {
    /// The incremental subscription has reached the value of the observed
    /// hash map at the time it was subscribed.
    // Marked `serde(skip)`: this variant is excluded from (de)serialization.
    // It is produced by `HashMapSubscription::recv` on the receiving side.
    #[serde(skip)]
    InitialComplete,
    /// An item was inserted or modified.
    ///
    /// Carries the key and the new value stored under it.
    Set(K, V),
    /// An item was removed.
    ///
    /// Carries the key of the removed item.
    Remove(K),
    /// All items were removed.
    Clear,
    /// Shrink capacity to fit.
    ShrinkToFit,
    /// The hash map has reached its final state and
    /// no further events will occur.
    Done,
}
55
/// A hash map that emits an event for each change.
///
/// Use [subscribe](Self::subscribe) to obtain an event stream
/// that can be used for building a mirror of this hash map.
pub struct ObservableHashMap<K, V, Codec = remoc::codec::Default> {
    /// The underlying hash map holding the current contents.
    hm: HashMap<K, V>,
    /// Broadcast sender over which change events are sent to subscribers.
    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender backing the local change notifiers handed out by `notifier`.
    change: ChangeSender,
    /// Error handler passed to `send_event`; replaceable via `set_error_handler`.
    on_err: Arc<dyn Fn(SendError) + Send + Sync>,
    /// Set once `done` has been called; mutating methods panic afterwards.
    done: bool,
}
67
68impl<K, V, Codec> fmt::Debug for ObservableHashMap<K, V, Codec>
69where
70    K: fmt::Debug,
71    V: fmt::Debug,
72{
73    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74        self.hm.fmt(f)
75    }
76}
77
78impl<K, V, Codec> From<HashMap<K, V>> for ObservableHashMap<K, V, Codec>
79where
80    K: Clone + RemoteSend,
81    V: Clone + RemoteSend,
82    Codec: remoc::codec::Codec,
83{
84    fn from(hm: HashMap<K, V>) -> Self {
85        let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
86        Self { hm, tx, on_err: Arc::new(default_on_err), change: ChangeSender::new(), done: false }
87    }
88}
89
90impl<K, V, Codec> From<ObservableHashMap<K, V, Codec>> for HashMap<K, V> {
91    fn from(ohm: ObservableHashMap<K, V, Codec>) -> Self {
92        ohm.hm
93    }
94}
95
96impl<K, V, Codec> Default for ObservableHashMap<K, V, Codec>
97where
98    K: Clone + RemoteSend,
99    V: Clone + RemoteSend,
100    Codec: remoc::codec::Codec,
101{
102    fn default() -> Self {
103        Self::from(HashMap::new())
104    }
105}
106
107impl<K, V, Codec> ObservableHashMap<K, V, Codec>
108where
109    K: Eq + Hash + Clone + RemoteSend,
110    V: Clone + RemoteSend,
111    Codec: remoc::codec::Codec,
112{
113    /// Creates an empty observable hash map.
114    pub fn new() -> Self {
115        Self::default()
116    }
117
118    /// Sets the error handler function that is called when sending an
119    /// event fails.
120    pub fn set_error_handler<E>(&mut self, on_err: E)
121    where
122        E: Fn(SendError) + Send + Sync + 'static,
123    {
124        self.on_err = Arc::new(on_err);
125    }
126
127    /// Subscribes to change events from this observable hash map.
128    ///
129    /// The current contents of the hash map is included with the subscription.
130    ///
131    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
132    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
133    pub fn subscribe(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
134        HashMapSubscription::new(
135            HashMapInitialValue::new_value(self.hm.clone()),
136            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
137        )
138    }
139
140    /// Subscribes to change events from this observable hash map with incremental sending
141    /// of the current contents.
142    ///
143    /// The current contents of the hash map are sent incrementally.
144    ///
145    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
146    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
147    pub fn subscribe_incremental(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
148        HashMapSubscription::new(
149            HashMapInitialValue::new_incremental(self.hm.clone(), self.on_err.clone()),
150            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
151        )
152    }
153
154    /// Current number of subscribers.
155    pub fn subscriber_count(&self) -> usize {
156        self.tx.receiver_count()
157    }
158
159    /// Returns a [change notifier](ChangeNotifier) that can be used *locally* to be
160    /// notified of changes to this collection.
161    pub fn notifier(&self) -> ChangeNotifier {
162        self.change.subscribe()
163    }
164
165    /// Inserts a value under a key.
166    ///
167    /// A [HashMapEvent::Set] change event is sent.
168    ///
169    /// Returns the value previously stored under the key, if any.
170    ///
171    /// # Panics
172    /// Panics when [done](Self::done) has been called before.
173    pub fn insert(&mut self, k: K, v: V) -> Option<V> {
174        self.assert_not_done();
175        self.change.notify();
176
177        send_event(&self.tx, &*self.on_err, HashMapEvent::Set(k.clone(), v.clone()));
178        self.hm.insert(k, v)
179    }
180
181    /// Removes the value under the specified key.
182    ///
183    /// A [HashMapEvent::Remove] change event is sent.
184    ///
185    /// The value is returned.
186    ///
187    /// # Panics
188    /// Panics when [done](Self::done) has been called before.
189    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
190    where
191        K: std::borrow::Borrow<Q>,
192        Q: Hash + Eq,
193    {
194        self.assert_not_done();
195
196        match self.hm.remove_entry(k) {
197            Some((k, v)) => {
198                self.change.notify();
199                send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k));
200                Some(v)
201            }
202            None => None,
203        }
204    }
205
206    /// Removes all items.
207    ///
208    /// A [HashMapEvent::Clear] change event is sent.
209    ///
210    /// # Panics
211    /// Panics when [done](Self::done) has been called before.
212    pub fn clear(&mut self) {
213        self.assert_not_done();
214
215        if !self.hm.is_empty() {
216            self.hm.clear();
217            self.change.notify();
218            send_event(&self.tx, &*self.on_err, HashMapEvent::Clear);
219        }
220    }
221
222    /// Retains only the elements specified by the predicate.
223    ///
224    /// A [HashMapEvent::Remove] change event is sent for every element that is removed.
225    ///
226    /// # Panics
227    /// Panics when [done](Self::done) has been called before.
228    pub fn retain<F>(&mut self, mut f: F)
229    where
230        F: FnMut(&K, &mut V) -> bool,
231    {
232        self.assert_not_done();
233
234        self.hm.retain(|k, v| {
235            if f(k, v) {
236                true
237            } else {
238                self.change.notify();
239                send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
240                false
241            }
242        });
243    }
244
245    /// Gets the given key’s corresponding entry in the map for in-place manipulation.
246    ///
247    /// # Panics
248    /// Panics when [done](Self::done) has been called before.
249    pub fn entry(&mut self, key: K) -> Entry<'_, K, V, Codec> {
250        self.assert_not_done();
251
252        match self.hm.entry(key) {
253            std::collections::hash_map::Entry::Occupied(inner) => Entry::Occupied(OccupiedEntry {
254                inner,
255                tx: &self.tx,
256                change: &self.change,
257                on_err: &*self.on_err,
258            }),
259            std::collections::hash_map::Entry::Vacant(inner) => {
260                Entry::Vacant(VacantEntry { inner, tx: &self.tx, change: &self.change, on_err: &*self.on_err })
261            }
262        }
263    }
264
265    /// Gets a mutable reference to the value under the specified key.
266    ///
267    /// A [HashMapEvent::Set] change event is sent if the reference is accessed mutably.
268    ///
269    /// # Panics
270    /// Panics when [done](Self::done) has been called before.
271    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<RefMut<'_, K, V, Codec>>
272    where
273        K: std::borrow::Borrow<Q>,
274        Q: Hash + Eq,
275    {
276        self.assert_not_done();
277
278        match self.hm.get_key_value(k) {
279            Some((key, _)) => {
280                let key = key.clone();
281                let value = self.hm.get_mut(k).unwrap();
282                Some(RefMut {
283                    key,
284                    value,
285                    changed: false,
286                    tx: &self.tx,
287                    change: &self.change,
288                    on_err: &*self.on_err,
289                })
290            }
291            None => None,
292        }
293    }
294
295    /// Mutably iterates over the key-value pairs.
296    ///
297    /// A [HashMapEvent::Set] change event is sent for each value that is accessed mutably.
298    ///
299    /// # Panics
300    /// Panics when [done](Self::done) has been called before.
301    pub fn iter_mut(&mut self) -> IterMut<'_, K, V, Codec> {
302        self.assert_not_done();
303
304        IterMut { inner: self.hm.iter_mut(), tx: &self.tx, change: &self.change, on_err: &*self.on_err }
305    }
306
307    /// Shrinks the capacity of the hash map as much as possible.
308    ///
309    /// A [HashMapEvent::ShrinkToFit] change event is sent.
310    ///
311    /// # Panics
312    /// Panics when [done](Self::done) has been called before.
313    pub fn shrink_to_fit(&mut self) {
314        self.assert_not_done();
315        send_event(&self.tx, &*self.on_err, HashMapEvent::ShrinkToFit);
316        self.hm.shrink_to_fit()
317    }
318
319    /// Panics when `done` has been called.
320    fn assert_not_done(&self) {
321        if self.done {
322            panic!("observable hash map cannot be changed after done has been called");
323        }
324    }
325
326    /// Prevents further changes of this hash map and notifies
327    /// are subscribers that no further events will occur.
328    ///
329    /// Methods that modify the hash map will panic after this has been called.
330    /// It is still possible to subscribe to this observable hash map.
331    pub fn done(&mut self) {
332        if !self.done {
333            send_event(&self.tx, &*self.on_err, HashMapEvent::Done);
334            self.done = true;
335        }
336    }
337
338    /// Returns `true` if [done](Self::done) has been called and further
339    /// changes are prohibited.
340    ///
341    /// Methods that modify the hash map will panic in this case.
342    pub fn is_done(&self) -> bool {
343        self.done
344    }
345
346    /// Extracts the underlying hash map.
347    ///
348    /// If [done](Self::done) has not been called before this method,
349    /// subscribers will receive an error.
350    pub fn into_inner(self) -> HashMap<K, V> {
351        self.into()
352    }
353}
354
355impl<K, V, Codec> Deref for ObservableHashMap<K, V, Codec> {
356    type Target = HashMap<K, V>;
357
358    fn deref(&self) -> &Self::Target {
359        &self.hm
360    }
361}
362
363impl<K, V, Codec> Extend<(K, V)> for ObservableHashMap<K, V, Codec>
364where
365    K: Eq + Hash + Clone + RemoteSend,
366    V: Clone + RemoteSend,
367    Codec: remoc::codec::Codec,
368{
369    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
370        for (k, v) in iter {
371            self.insert(k, v);
372        }
373    }
374}
375
/// A mutable reference to a value inside an [observable hash map](ObservableHashMap).
///
/// A [HashMapEvent::Set] change event is sent when this reference is dropped and the
/// value has been accessed mutably.
pub struct RefMut<'a, K, V, Codec>
where
    K: Clone + RemoteSend,
    V: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
    /// Owned copy of the key, used for the event sent on drop.
    key: K,
    /// The referenced value inside the hash map.
    value: &'a mut V,
    /// Set by `deref_mut`; an event is only sent on drop when this is `true`.
    changed: bool,
    /// Sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Local change notification sender.
    change: &'a ChangeSender,
    /// Error handler passed to `send_event`.
    on_err: &'a dyn Fn(SendError),
}
393
394impl<'a, K, V, Codec> Deref for RefMut<'a, K, V, Codec>
395where
396    K: Clone + RemoteSend,
397    V: Clone + RemoteSend,
398    Codec: remoc::codec::Codec,
399{
400    type Target = V;
401
402    fn deref(&self) -> &Self::Target {
403        self.value
404    }
405}
406
407impl<'a, K, V, Codec> DerefMut for RefMut<'a, K, V, Codec>
408where
409    K: Clone + RemoteSend,
410    V: Clone + RemoteSend,
411    Codec: remoc::codec::Codec,
412{
413    fn deref_mut(&mut self) -> &mut Self::Target {
414        self.changed = true;
415        self.value
416    }
417}
418
419impl<'a, K, V, Codec> Drop for RefMut<'a, K, V, Codec>
420where
421    K: Clone + RemoteSend,
422    V: Clone + RemoteSend,
423    Codec: remoc::codec::Codec,
424{
425    fn drop(&mut self) {
426        if self.changed {
427            self.change.notify();
428            send_event(self.tx, self.on_err, HashMapEvent::Set(self.key.clone(), self.value.clone()));
429        }
430    }
431}
432
/// A mutable iterator over the key-value pairs in an [observable hash map](ObservableHashMap).
///
/// A [HashMapEvent::Set] change event is sent for each value that is accessed mutably.
pub struct IterMut<'a, K, V, Codec = remoc::codec::Default> {
    /// Mutable iterator over the underlying hash map.
    inner: std::collections::hash_map::IterMut<'a, K, V>,
    /// Sender for change events, handed on to each yielded [RefMut].
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Local change notification sender, handed on to each yielded [RefMut].
    change: &'a ChangeSender,
    /// Error handler, handed on to each yielded [RefMut].
    on_err: &'a dyn Fn(SendError),
}
442
443impl<'a, K, V, Codec> Iterator for IterMut<'a, K, V, Codec>
444where
445    K: Clone + RemoteSend,
446    V: Clone + RemoteSend,
447    Codec: remoc::codec::Codec,
448{
449    type Item = RefMut<'a, K, V, Codec>;
450
451    fn next(&mut self) -> Option<Self::Item> {
452        match self.inner.next() {
453            Some((key, value)) => Some(RefMut {
454                key: key.clone(),
455                value,
456                changed: false,
457                tx: self.tx,
458                change: self.change,
459                on_err: self.on_err,
460            }),
461            None => None,
462        }
463    }
464
465    fn size_hint(&self) -> (usize, Option<usize>) {
466        self.inner.size_hint()
467    }
468}
469
470impl<'a, K, V, Codec> ExactSizeIterator for IterMut<'a, K, V, Codec>
471where
472    K: Clone + RemoteSend,
473    V: Clone + RemoteSend,
474    Codec: remoc::codec::Codec,
475{
476    fn len(&self) -> usize {
477        self.inner.len()
478    }
479}
480
// Marker implementation without methods. `next` forwards directly to the
// wrapped standard library iterator.
impl<'a, K, V, Codec> FusedIterator for IterMut<'a, K, V, Codec>
where
    K: Clone + RemoteSend,
    V: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
}
488
/// A view into a single entry in an observable hash map, which may either be
/// vacant or occupied.
///
/// This is returned by [ObservableHashMap::entry].
#[derive(Debug)]
pub enum Entry<'a, K, V, Codec = remoc::codec::Default> {
    /// An occupied entry: the key is present in the hash map.
    Occupied(OccupiedEntry<'a, K, V, Codec>),
    /// A vacant entry: the key is not present in the hash map.
    Vacant(VacantEntry<'a, K, V, Codec>),
}
500
501impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
502where
503    K: Clone + RemoteSend,
504    V: Clone + RemoteSend,
505    Codec: remoc::codec::Codec,
506{
507    /// Ensures a value is in the entry by inserting the default if empty,
508    /// and returns a mutable reference to the value in the entry.    
509    pub fn or_insert(self, default: V) -> RefMut<'a, K, V, Codec> {
510        match self {
511            Self::Occupied(ocu) => ocu.into_mut(),
512            Self::Vacant(vac) => vac.insert(default),
513        }
514    }
515
516    /// Ensures a value is in the entry by inserting the result of the default
517    /// function if empty, and returns a mutable reference to the value in the entry.    
518    pub fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
519        match self {
520            Self::Occupied(ocu) => ocu.into_mut(),
521            Self::Vacant(vac) => vac.insert(default()),
522        }
523    }
524
525    /// Ensures a value is in the entry by inserting the result of the default
526    /// function with key as argument if empty, and returns a mutable reference to the value in the entry.    
527    pub fn or_insert_with_key<F: FnOnce(&K) -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
528        match self {
529            Self::Occupied(ocu) => ocu.into_mut(),
530            Self::Vacant(vac) => {
531                let value = default(vac.key());
532                vac.insert(value)
533            }
534        }
535    }
536
537    /// Returns a reference to this entry’s key.
538    pub fn key(&self) -> &K {
539        match self {
540            Self::Occupied(ocu) => ocu.key(),
541            Self::Vacant(vac) => vac.key(),
542        }
543    }
544
545    /// Provides in-place mutable access to an occupied entry before any potential inserts into the map.
546    pub fn and_modify<F: FnOnce(&mut V)>(mut self, f: F) -> Self {
547        if let Self::Occupied(ocu) = &mut self {
548            let mut value = ocu.get_mut();
549            f(&mut *value);
550        }
551        self
552    }
553}
554
555impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
556where
557    K: Clone + RemoteSend,
558    V: Clone + RemoteSend + Default,
559    Codec: remoc::codec::Codec,
560{
561    /// Ensures a value is in the entry by inserting the default value if empty,
562    /// and returns a mutable reference to the value in the entry.    
563    pub fn or_default(self) -> RefMut<'a, K, V, Codec> {
564        self.or_insert_with(V::default)
565    }
566}
567
/// A view into an occupied entry in an observable hash map.
pub struct OccupiedEntry<'a, K, V, Codec = remoc::codec::Default> {
    /// Occupied entry of the underlying hash map.
    inner: std::collections::hash_map::OccupiedEntry<'a, K, V>,
    /// Sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Local change notification sender.
    change: &'a ChangeSender,
    /// Error handler passed to `send_event`.
    on_err: &'a dyn Fn(SendError),
}
575
576impl<'a, K, V, Codec> fmt::Debug for OccupiedEntry<'a, K, V, Codec>
577where
578    K: fmt::Debug,
579    V: fmt::Debug,
580{
581    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
582        self.inner.fmt(f)
583    }
584}
585
586impl<'a, K, V, Codec> OccupiedEntry<'a, K, V, Codec>
587where
588    K: Clone + RemoteSend,
589    V: Clone + RemoteSend,
590    Codec: remoc::codec::Codec,
591{
592    /// Gets a reference to the key in the entry.
593    pub fn key(&self) -> &K {
594        self.inner.key()
595    }
596
597    /// Take the ownership of the key and value from the map.
598    ///
599    /// A [HashMapEvent::Remove] change event is sent.
600    pub fn remove_entry(self) -> (K, V) {
601        let (k, v) = self.inner.remove_entry();
602        self.change.notify();
603        send_event(self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
604        (k, v)
605    }
606
607    /// Gets a reference to the value in the entry.
608    pub fn get(&self) -> &V {
609        self.inner.get()
610    }
611
612    /// Gets a mutable reference to the value in the entry.
613    pub fn get_mut(&mut self) -> RefMut<'_, K, V, Codec> {
614        RefMut {
615            key: self.inner.key().clone(),
616            value: self.inner.get_mut(),
617            changed: false,
618            tx: self.tx,
619            change: self.change,
620            on_err: &*self.on_err,
621        }
622    }
623
624    /// Converts this into a mutable reference to the value in
625    /// the entry with a lifetime bound to the map itself.
626    pub fn into_mut(self) -> RefMut<'a, K, V, Codec> {
627        let key = self.inner.key().clone();
628        RefMut {
629            key,
630            value: self.inner.into_mut(),
631            changed: false,
632            tx: self.tx,
633            change: self.change,
634            on_err: &*self.on_err,
635        }
636    }
637
638    /// Sets the value of the entry, and returns the entry’s old value.
639    ///
640    /// A [HashMapEvent::Set] change event is sent.
641    pub fn insert(&mut self, value: V) -> V {
642        self.change.notify();
643        send_event(self.tx, &*self.on_err, HashMapEvent::Set(self.inner.key().clone(), value.clone()));
644        self.inner.insert(value)
645    }
646
647    /// Takes the value out of the entry, and returns it.
648    ///
649    /// A [HashMapEvent::Remove] change event is sent.
650    pub fn remove(self) -> V {
651        let (k, v) = self.inner.remove_entry();
652        self.change.notify();
653        send_event(self.tx, &*self.on_err, HashMapEvent::Remove(k));
654        v
655    }
656}
657
/// A view into a vacant entry in an observable hash map.
pub struct VacantEntry<'a, K, V, Codec = remoc::codec::Default> {
    /// Vacant entry of the underlying hash map.
    inner: std::collections::hash_map::VacantEntry<'a, K, V>,
    /// Sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Local change notification sender.
    change: &'a ChangeSender,
    /// Error handler passed to `send_event`.
    on_err: &'a dyn Fn(SendError),
}
665
666impl<'a, K, V, Codec> fmt::Debug for VacantEntry<'a, K, V, Codec>
667where
668    K: fmt::Debug,
669{
670    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671        self.inner.fmt(f)
672    }
673}
674
675impl<'a, K, V, Codec> VacantEntry<'a, K, V, Codec>
676where
677    K: Clone + RemoteSend,
678    V: Clone + RemoteSend,
679    Codec: remoc::codec::Codec,
680{
681    /// Gets a reference to the key.
682    pub fn key(&self) -> &K {
683        self.inner.key()
684    }
685
686    /// Take ownership of the key.
687    pub fn into_key(self) -> K {
688        self.inner.into_key()
689    }
690
691    /// Sets the value of the entry, and returns a mutable reference to it.
692    ///
693    /// A [HashMapEvent::Set] change event is sent.
694    pub fn insert(self, value: V) -> RefMut<'a, K, V, Codec> {
695        let key = self.inner.key().clone();
696        self.change.notify();
697        send_event(self.tx, &*self.on_err, HashMapEvent::Set(key.clone(), value.clone()));
698        let value = self.inner.insert(value);
699        RefMut { key, value, changed: false, tx: self.tx, change: self.change, on_err: &*self.on_err }
700    }
701}
702
/// State of a mirrored hash map, updated by applying received events.
struct MirroredHashMapInner<K, V> {
    /// Mirrored contents.
    hm: HashMap<K, V>,
    /// Set when a [HashMapEvent::InitialComplete] event has been handled.
    complete: bool,
    /// Set when a [HashMapEvent::Done] event has been handled.
    done: bool,
    /// Receive error recorded by the mirroring task, if any.
    error: Option<RecvError>,
    /// Maximum number of items; exceeding it while handling a
    /// [HashMapEvent::Set] event yields [RecvError::MaxSizeExceeded].
    max_size: usize,
}
710
711impl<K, V> MirroredHashMapInner<K, V>
712where
713    K: Eq + Hash,
714{
715    fn handle_event(&mut self, event: HashMapEvent<K, V>) -> Result<(), RecvError> {
716        match event {
717            HashMapEvent::InitialComplete => {
718                self.complete = true;
719            }
720            HashMapEvent::Set(k, v) => {
721                self.hm.insert(k, v);
722                if self.hm.len() > self.max_size {
723                    return Err(RecvError::MaxSizeExceeded(self.max_size));
724                }
725            }
726            HashMapEvent::Remove(k) => {
727                self.hm.remove(&k);
728            }
729            HashMapEvent::Clear => {
730                self.hm.clear();
731            }
732            HashMapEvent::ShrinkToFit => {
733                self.hm.shrink_to_fit();
734            }
735            HashMapEvent::Done => {
736                self.done = true;
737            }
738        }
739        Ok(())
740    }
741}
742
/// Initial value of an observable hash map subscription.
///
/// Either the complete hash map at the time of subscription, or a channel
/// over which its items are received one by one.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
enum HashMapInitialValue<K, V, Codec = remoc::codec::Default> {
    /// Initial value is present.
    Value(HashMap<K, V>),
    /// Initial value is received incrementally.
    Incremental {
        /// Number of elements.
        ///
        /// Counted down by `HashMapSubscription::recv` as items are received.
        len: usize,
        /// Receiver of the key-value pairs making up the initial value.
        rx: rch::mpsc::Receiver<(K, V), Codec>,
    },
}
758
759impl<K, V, Codec> HashMapInitialValue<K, V, Codec>
760where
761    K: RemoteSend + Eq + Hash + Clone,
762    V: RemoteSend + Clone,
763    Codec: remoc::codec::Codec,
764{
765    /// Transmits the initial value as a whole.
766    fn new_value(hm: HashMap<K, V>) -> Self {
767        Self::Value(hm)
768    }
769
770    /// Transmits the initial value incrementally.
771    fn new_incremental(hm: HashMap<K, V>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
772        let (tx, rx) = rch::mpsc::channel(128);
773        let len = hm.len();
774
775        tokio::spawn(async move {
776            for (k, v) in hm.into_iter() {
777                match tx.send((k, v)).await {
778                    Ok(()) => (),
779                    Err(err) if err.is_disconnected() => break,
780                    Err(err) => match err.try_into() {
781                        Ok(err) => (on_err)(err),
782                        Err(_) => unreachable!(),
783                    },
784                }
785            }
786        });
787
788        Self::Incremental { len, rx }
789    }
790}
791
/// Observable hash map subscription.
///
/// This can be sent to a remote endpoint via a [remote channel](remoc::rch).
/// Then, on the remote endpoint, [mirror](Self::mirror) can be used to build
/// and keep up-to-date a mirror of the observed hash map.
///
/// The event stream can also be processed event-wise using [recv](Self::recv).
/// If the subscription is not incremental [take_initial](Self::take_initial) must
/// be called before the first call to [recv](Self::recv).
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
pub struct HashMapSubscription<K, V, Codec = remoc::codec::Default> {
    /// Value of hash map at time of subscription.
    initial: HashMapInitialValue<K, V, Codec>,
    /// Initial value received completely.
    ///
    /// Not serialized; takes its default value when deserialized.
    #[serde(skip, default)]
    complete: bool,
    /// Change events receiver.
    ///
    /// `None` if [ObservableHashMap::done] has been called before subscribing.
    /// Also reset to `None` by [recv](Self::recv) once the done event has been received.
    events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
    /// Event stream ended.
    ///
    /// Not serialized; takes its default value when deserialized.
    #[serde(skip, default)]
    done: bool,
}
818
819impl<K, V, Codec> HashMapSubscription<K, V, Codec>
820where
821    K: RemoteSend + Eq + Hash + Clone,
822    V: RemoteSend + Clone,
823    Codec: remoc::codec::Codec,
824{
825    fn new(
826        initial: HashMapInitialValue<K, V, Codec>,
827        events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
828    ) -> Self {
829        Self { initial, complete: false, events, done: false }
830    }
831
832    /// Returns whether the subscription is incremental.
833    pub fn is_incremental(&self) -> bool {
834        matches!(self.initial, HashMapInitialValue::Incremental { .. })
835    }
836
837    /// Returns whether the initial value event or
838    /// stream of events that build up the initial value
839    /// has completed or [take_initial](Self::take_initial) has been called.
840    pub fn is_complete(&self) -> bool {
841        self.complete
842    }
843
844    /// Returns whether the observed hash map has indicated that no further
845    /// change events will occur.
846    pub fn is_done(&self) -> bool {
847        self.events.is_none() || self.done
848    }
849
850    /// Take the initial value.
851    ///
852    /// This is only possible if the subscription is not incremental
853    /// and the initial value has not already been taken.
854    /// Otherwise `None` is returned.
855    ///
856    /// If the subscription is not incremental this must be called before the
857    /// first call to [recv](Self::recv).
858    pub fn take_initial(&mut self) -> Option<HashMap<K, V>> {
859        match &mut self.initial {
860            HashMapInitialValue::Value(value) if !self.complete => {
861                self.complete = true;
862                Some(take(value))
863            }
864            _ => None,
865        }
866    }
867
868    /// Receives the next change event.
869    ///
870    /// # Panics
871    /// Panics when the subscription is not incremental and [take_initial](Self::take_initial)
872    /// has not been called.
873    pub async fn recv(&mut self) -> Result<Option<HashMapEvent<K, V>>, RecvError> {
874        // Provide initial value events.
875        if !self.complete {
876            match &mut self.initial {
877                HashMapInitialValue::Incremental { len, rx } => {
878                    if *len > 0 {
879                        match rx.recv().await? {
880                            Some((k, v)) => {
881                                // Provide incremental initial value event.
882                                *len -= 1;
883                                return Ok(Some(HashMapEvent::Set(k, v)));
884                            }
885                            None => return Err(RecvError::Closed),
886                        }
887                    } else {
888                        // Provide incremental initial value complete event.
889                        self.complete = true;
890                        return Ok(Some(HashMapEvent::InitialComplete));
891                    }
892                }
893                HashMapInitialValue::Value(_) => {
894                    panic!("take_initial must be called before recv for non-incremental subscription");
895                }
896            }
897        }
898
899        // Provide change event.
900        if let Some(rx) = &mut self.events {
901            match rx.recv().await? {
902                HashMapEvent::Done => self.events = None,
903                evt => return Ok(Some(evt)),
904            }
905        }
906
907        // Provide done event.
908        if self.done {
909            Ok(None)
910        } else {
911            self.done = true;
912            Ok(Some(HashMapEvent::Done))
913        }
914    }
915}
916
917impl<K, V, Codec> HashMapSubscription<K, V, Codec>
918where
919    K: RemoteSend + Eq + Hash + Clone + Sync,
920    V: RemoteSend + Clone + Sync,
921    Codec: remoc::codec::Codec,
922{
923    /// Mirror the hash map that this subscription is observing.
924    ///
925    /// `max_size` specifies the maximum allowed size of the mirrored collection.
926    /// If this size is reached, processing of events is stopped and
927    /// [RecvError::MaxSizeExceeded] is returned.
928    pub fn mirror(mut self, max_size: usize) -> MirroredHashMap<K, V, Codec> {
929        let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
930        let (changed_tx, changed_rx) = watch::channel(());
931        let (dropped_tx, mut dropped_rx) = oneshot::channel();
932
933        // Build initial state.
934        let inner = Arc::new(RwLock::new(Some(MirroredHashMapInner {
935            hm: self.take_initial().unwrap_or_default(),
936            complete: self.is_complete(),
937            done: self.is_done(),
938            error: None,
939            max_size,
940        })));
941        let inner_task = inner.clone();
942
943        // Process change events.
944        let tx_send = tx.clone();
945        tokio::spawn(async move {
946            loop {
947                let event = tokio::select! {
948                    event = self.recv() => event,
949                    _ = &mut dropped_rx => return,
950                };
951
952                let mut inner = inner_task.write().await;
953                let mut inner = match inner.as_mut() {
954                    Some(inner) => inner,
955                    None => return,
956                };
957
958                changed_tx.send_replace(());
959
960                match event {
961                    Ok(Some(event)) => {
962                        if tx_send.receiver_count() > 0 {
963                            let _ = tx_send.send(event.clone());
964                        }
965
966                        if let Err(err) = inner.handle_event(event) {
967                            inner.error = Some(err);
968                            return;
969                        }
970
971                        if inner.done {
972                            break;
973                        }
974                    }
975                    Ok(None) => break,
976                    Err(err) => {
977                        inner.error = Some(err);
978                        return;
979                    }
980                }
981            }
982        });
983
984        MirroredHashMap { inner, tx, changed_rx, _dropped_tx: dropped_tx }
985    }
986}
987
988/// A hash map that is mirroring an observable hash map.
989pub struct MirroredHashMap<K, V, Codec = remoc::codec::Default> {
990    inner: Arc<RwLock<Option<MirroredHashMapInner<K, V>>>>,
991    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
992    changed_rx: watch::Receiver<()>,
993    _dropped_tx: oneshot::Sender<()>,
994}
995
996impl<K, V, Codec> fmt::Debug for MirroredHashMap<K, V, Codec> {
997    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
998        f.debug_struct("MirroredHashMap").finish()
999    }
1000}
1001
1002impl<K, V, Codec> MirroredHashMap<K, V, Codec>
1003where
1004    K: RemoteSend + Eq + Hash + Clone,
1005    V: RemoteSend + Clone,
1006    Codec: remoc::codec::Codec,
1007{
1008    /// Returns a reference to the current value of the hash map.
1009    ///
1010    /// Updates are paused while the read lock is held.
1011    ///
1012    /// This method returns an error if the observed hash map has been dropped
1013    /// or the connection to it failed before it was marked as done by calling
1014    /// [ObservableHashMap::done].
1015    /// In this case the mirrored contents at the point of loss of connection
1016    /// can be obtained using [detach](Self::detach).
1017    pub async fn borrow(&self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1018        let inner = self.inner.read().await;
1019        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1020        match &inner.error {
1021            None => Ok(MirroredHashMapRef(inner)),
1022            Some(err) => Err(err.clone()),
1023        }
1024    }
1025
1026    /// Returns a reference to the current value of the hash map and marks it as seen.
1027    ///
1028    /// Thus [changed](Self::changed) will not return immediately until the value changes
1029    /// after this method returns.
1030    ///
1031    /// Updates are paused while the read lock is held.
1032    ///
1033    /// This method returns an error if the observed hash map has been dropped
1034    /// or the connection to it failed before it was marked as done by calling
1035    /// [ObservableHashMap::done].
1036    /// In this case the mirrored contents at the point of loss of connection
1037    /// can be obtained using [detach](Self::detach).
1038    pub async fn borrow_and_update(&mut self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1039        let inner = self.inner.read().await;
1040        self.changed_rx.borrow_and_update();
1041        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1042        match &inner.error {
1043            None => Ok(MirroredHashMapRef(inner)),
1044            Some(err) => Err(err.clone()),
1045        }
1046    }
1047
1048    /// Stops updating the hash map and returns its current contents.
1049    pub async fn detach(self) -> HashMap<K, V> {
1050        let mut inner = self.inner.write().await;
1051        inner.take().unwrap().hm
1052    }
1053
1054    /// Waits for a change and marks the newest value as seen.
1055    ///
1056    /// This also returns when connection to the observed hash map has been lost
1057    /// or the hash map has been marked as done.
1058    pub async fn changed(&mut self) {
1059        let _ = self.changed_rx.changed().await;
1060    }
1061
1062    /// Subscribes to change events from this mirrored hash map.
1063    ///
1064    /// The current contents of the hash map is included with the subscription.
1065    ///
1066    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1067    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
1068    pub async fn subscribe(&self, buffer: usize) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1069        let view = self.borrow().await?;
1070        let initial = view.clone();
1071        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1072
1073        Ok(HashMapSubscription::new(HashMapInitialValue::new_value(initial), events))
1074    }
1075
1076    /// Subscribes to change events from this mirrored hash map with incremental sending
1077    /// of the current contents.
1078    ///
1079    /// The current contents of the hash map are sent incrementally.
1080    ///
1081    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1082    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].    
1083    pub async fn subscribe_incremental(
1084        &self, buffer: usize,
1085    ) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1086        let view = self.borrow().await?;
1087        let initial = view.clone();
1088        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1089
1090        Ok(HashMapSubscription::new(
1091            HashMapInitialValue::new_incremental(initial, Arc::new(default_on_err)),
1092            events,
1093        ))
1094    }
1095}
1096
1097impl<K, V, Codec> Drop for MirroredHashMap<K, V, Codec> {
1098    fn drop(&mut self) {
1099        // empty
1100    }
1101}
1102
1103/// A snapshot view of an observable hash map.
1104pub struct MirroredHashMapRef<'a, K, V>(RwLockReadGuard<'a, MirroredHashMapInner<K, V>>);
1105
1106impl<'a, K, V> MirroredHashMapRef<'a, K, V> {
1107    /// Returns `true` if the initial state of an incremental subscription has
1108    /// been reached.
1109    pub fn is_complete(&self) -> bool {
1110        self.0.complete
1111    }
1112
1113    /// Returns `true` if the observed hash map has been marked as done by calling
1114    /// [ObservableHashMap::done] and thus no further changes can occur.
1115    pub fn is_done(&self) -> bool {
1116        self.0.done
1117    }
1118}
1119
1120impl<'a, K, V> fmt::Debug for MirroredHashMapRef<'a, K, V>
1121where
1122    K: fmt::Debug,
1123    V: fmt::Debug,
1124{
1125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1126        self.0.hm.fmt(f)
1127    }
1128}
1129
1130impl<'a, K, V> Deref for MirroredHashMapRef<'a, K, V> {
1131    type Target = HashMap<K, V>;
1132
1133    fn deref(&self) -> &Self::Target {
1134        &self.0.hm
1135    }
1136}
1137
1138impl<'a, K, V> Drop for MirroredHashMapRef<'a, K, V> {
1139    fn drop(&mut self) {
1140        // required for drop order
1141    }
1142}