remoc/robs/
hash_map.rs

1//! Observable hash map.
2//!
3//! This provides a locally and remotely observable hash map.
4//! The observable hash map sends a change event each time a change is performed on it.
5//! The [resulting event stream](HashMapSubscription) can either be processed event-wise
6//! or used to build a [mirrored hash map](MirroredHashMap).
7//!
8//! Changes are sent using a [remote broadcast channel](crate::rch::broadcast), thus
9//! subscribers cannot block the observed hash map and are shed when their event buffer
10//! exceeds a configurable size.
11//!
12//! # Basic use
13//!
14//! Create a [ObservableHashMap] and obtain a [subscription](HashMapSubscription) to it using
15//! [ObservableHashMap::subscribe].
16//! Send this subscription to a remote endpoint via a [remote channel](crate::rch) and call
17//! [HashMapSubscription::mirror] on the remote endpoint to obtain a live mirror of the observed
18//! hash map or process each change event individually using [HashMapSubscription::recv].
19//!
20
21use serde::{Deserialize, Serialize};
22use std::{
23    collections::HashMap,
24    fmt,
25    hash::Hash,
26    iter::FusedIterator,
27    mem::take,
28    ops::{Deref, DerefMut},
29    sync::Arc,
30};
31use tokio::sync::{RwLock, RwLockReadGuard, oneshot, watch};
32use tracing::Instrument;
33
34use super::{ChangeNotifier, ChangeSender, RecvError, SendError, default_on_err, send_event};
35use crate::{exec, prelude::*};
36
37/// A hash map change event.
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub enum HashMapEvent<K, V> {
40    /// An item was inserted or modified.
41    Set(K, V),
42    /// An item was removed.
43    Remove(K),
44    /// All items were removed.
45    Clear,
46    /// Shrink capacity to fit.
47    ShrinkToFit,
48    /// The hash map has reached its final state and
49    /// no further events will occur.
50    Done,
51
52    // NOTE: All variants with #[serde(skip)] must be at the end of the enum
53    //       to workaround serde issue serde-rs/serde#2224.
54    //
55    /// The incremental subscription has reached the value of the observed
56    /// hash map at the time it was subscribed.
57    #[serde(skip)]
58    InitialComplete,
59}
60
61/// A hash map that emits an event for each change.
62///
63/// Use [subscribe](Self::subscribe) to obtain an event stream
64/// that can be used for building a mirror of this hash map.
65pub struct ObservableHashMap<K, V, Codec = crate::codec::Default> {
66    hm: HashMap<K, V>,
67    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
68    change: ChangeSender,
69    on_err: Arc<dyn Fn(SendError) + Send + Sync>,
70    done: bool,
71}
72
73impl<K, V, Codec> fmt::Debug for ObservableHashMap<K, V, Codec>
74where
75    K: fmt::Debug,
76    V: fmt::Debug,
77{
78    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79        self.hm.fmt(f)
80    }
81}
82
83impl<K, V, Codec> From<HashMap<K, V>> for ObservableHashMap<K, V, Codec>
84where
85    K: Clone + RemoteSend,
86    V: Clone + RemoteSend,
87    Codec: crate::codec::Codec,
88{
89    fn from(hm: HashMap<K, V>) -> Self {
90        let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
91        Self { hm, tx, on_err: Arc::new(default_on_err), change: ChangeSender::new(), done: false }
92    }
93}
94
95impl<K, V, Codec> From<ObservableHashMap<K, V, Codec>> for HashMap<K, V> {
96    fn from(ohm: ObservableHashMap<K, V, Codec>) -> Self {
97        ohm.hm
98    }
99}
100
101impl<K, V, Codec> Default for ObservableHashMap<K, V, Codec>
102where
103    K: Clone + RemoteSend,
104    V: Clone + RemoteSend,
105    Codec: crate::codec::Codec,
106{
107    fn default() -> Self {
108        Self::from(HashMap::new())
109    }
110}
111
112impl<K, V, Codec> ObservableHashMap<K, V, Codec>
113where
114    K: Eq + Hash + Clone + RemoteSend,
115    V: Clone + RemoteSend,
116    Codec: crate::codec::Codec,
117{
118    /// Creates an empty observable hash map.
119    pub fn new() -> Self {
120        Self::default()
121    }
122
123    /// Sets the error handler function that is called when sending an
124    /// event fails.
125    pub fn set_error_handler<E>(&mut self, on_err: E)
126    where
127        E: Fn(SendError) + Send + Sync + 'static,
128    {
129        self.on_err = Arc::new(on_err);
130    }
131
132    /// Subscribes to change events from this observable hash map.
133    ///
134    /// The current contents of the hash map is included with the subscription.
135    ///
136    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
137    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
138    pub fn subscribe(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
139        HashMapSubscription::new(
140            HashMapInitialValue::new_value(self.hm.clone()),
141            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
142        )
143    }
144
145    /// Subscribes to change events from this observable hash map with incremental sending
146    /// of the current contents.
147    ///
148    /// The current contents of the hash map are sent incrementally.
149    ///
150    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
151    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
152    pub fn subscribe_incremental(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
153        HashMapSubscription::new(
154            HashMapInitialValue::new_incremental(self.hm.clone(), self.on_err.clone()),
155            if self.done { None } else { Some(self.tx.subscribe(buffer)) },
156        )
157    }
158
159    /// Current number of subscribers.
160    pub fn subscriber_count(&self) -> usize {
161        self.tx.receiver_count()
162    }
163
164    /// Returns a [change notifier](ChangeNotifier) that can be used *locally* to be
165    /// notified of changes to this collection.
166    pub fn notifier(&self) -> ChangeNotifier {
167        self.change.subscribe()
168    }
169
170    /// Inserts a value under a key.
171    ///
172    /// A [HashMapEvent::Set] change event is sent.
173    ///
174    /// Returns the value previously stored under the key, if any.
175    ///
176    /// # Panics
177    /// Panics when [done](Self::done) has been called before.
178    pub fn insert(&mut self, k: K, v: V) -> Option<V> {
179        self.assert_not_done();
180        self.change.notify();
181
182        send_event(&self.tx, &*self.on_err, HashMapEvent::Set(k.clone(), v.clone()));
183        self.hm.insert(k, v)
184    }
185
186    /// Removes the value under the specified key.
187    ///
188    /// A [HashMapEvent::Remove] change event is sent.
189    ///
190    /// The value is returned.
191    ///
192    /// # Panics
193    /// Panics when [done](Self::done) has been called before.
194    pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
195    where
196        K: std::borrow::Borrow<Q>,
197        Q: Hash + Eq,
198    {
199        self.assert_not_done();
200
201        match self.hm.remove_entry(k) {
202            Some((k, v)) => {
203                self.change.notify();
204                send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k));
205                Some(v)
206            }
207            None => None,
208        }
209    }
210
211    /// Removes all items.
212    ///
213    /// A [HashMapEvent::Clear] change event is sent.
214    ///
215    /// # Panics
216    /// Panics when [done](Self::done) has been called before.
217    pub fn clear(&mut self) {
218        self.assert_not_done();
219
220        if !self.hm.is_empty() {
221            self.hm.clear();
222            self.change.notify();
223            send_event(&self.tx, &*self.on_err, HashMapEvent::Clear);
224        }
225    }
226
227    /// Retains only the elements specified by the predicate.
228    ///
229    /// A [HashMapEvent::Remove] change event is sent for every element that is removed.
230    ///
231    /// # Panics
232    /// Panics when [done](Self::done) has been called before.
233    pub fn retain<F>(&mut self, mut f: F)
234    where
235        F: FnMut(&K, &mut V) -> bool,
236    {
237        self.assert_not_done();
238
239        self.hm.retain(|k, v| {
240            if f(k, v) {
241                true
242            } else {
243                self.change.notify();
244                send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
245                false
246            }
247        });
248    }
249
250    /// Gets the given key’s corresponding entry in the map for in-place manipulation.
251    ///
252    /// # Panics
253    /// Panics when [done](Self::done) has been called before.
254    pub fn entry(&mut self, key: K) -> Entry<'_, K, V, Codec> {
255        self.assert_not_done();
256
257        match self.hm.entry(key) {
258            std::collections::hash_map::Entry::Occupied(inner) => Entry::Occupied(OccupiedEntry {
259                inner,
260                tx: &self.tx,
261                change: &self.change,
262                on_err: &*self.on_err,
263            }),
264            std::collections::hash_map::Entry::Vacant(inner) => {
265                Entry::Vacant(VacantEntry { inner, tx: &self.tx, change: &self.change, on_err: &*self.on_err })
266            }
267        }
268    }
269
270    /// Gets a mutable reference to the value under the specified key.
271    ///
272    /// A [HashMapEvent::Set] change event is sent if the reference is accessed mutably.
273    ///
274    /// # Panics
275    /// Panics when [done](Self::done) has been called before.
276    pub fn get_mut<Q>(&mut self, k: &Q) -> Option<RefMut<'_, K, V, Codec>>
277    where
278        K: std::borrow::Borrow<Q>,
279        Q: Hash + Eq,
280    {
281        self.assert_not_done();
282
283        match self.hm.get_key_value(k) {
284            Some((key, _)) => {
285                let key = key.clone();
286                let value = self.hm.get_mut(k).unwrap();
287                Some(RefMut {
288                    key,
289                    value,
290                    changed: false,
291                    tx: &self.tx,
292                    change: &self.change,
293                    on_err: &*self.on_err,
294                })
295            }
296            None => None,
297        }
298    }
299
300    /// Mutably iterates over the key-value pairs.
301    ///
302    /// A [HashMapEvent::Set] change event is sent for each value that is accessed mutably.
303    ///
304    /// # Panics
305    /// Panics when [done](Self::done) has been called before.
306    pub fn iter_mut(&mut self) -> IterMut<'_, K, V, Codec> {
307        self.assert_not_done();
308
309        IterMut { inner: self.hm.iter_mut(), tx: &self.tx, change: &self.change, on_err: &*self.on_err }
310    }
311
312    /// Shrinks the capacity of the hash map as much as possible.
313    ///
314    /// A [HashMapEvent::ShrinkToFit] change event is sent.
315    ///
316    /// # Panics
317    /// Panics when [done](Self::done) has been called before.
318    pub fn shrink_to_fit(&mut self) {
319        self.assert_not_done();
320        send_event(&self.tx, &*self.on_err, HashMapEvent::ShrinkToFit);
321        self.hm.shrink_to_fit()
322    }
323
324    /// Panics when `done` has been called.
325    fn assert_not_done(&self) {
326        if self.done {
327            panic!("observable hash map cannot be changed after done has been called");
328        }
329    }
330
331    /// Prevents further changes of this hash map and notifies
332    /// are subscribers that no further events will occur.
333    ///
334    /// Methods that modify the hash map will panic after this has been called.
335    /// It is still possible to subscribe to this observable hash map.
336    pub fn done(&mut self) {
337        if !self.done {
338            send_event(&self.tx, &*self.on_err, HashMapEvent::Done);
339            self.done = true;
340        }
341    }
342
343    /// Returns `true` if [done](Self::done) has been called and further
344    /// changes are prohibited.
345    ///
346    /// Methods that modify the hash map will panic in this case.
347    pub fn is_done(&self) -> bool {
348        self.done
349    }
350
351    /// Extracts the underlying hash map.
352    ///
353    /// If [done](Self::done) has not been called before this method,
354    /// subscribers will receive an error.
355    pub fn into_inner(self) -> HashMap<K, V> {
356        self.into()
357    }
358}
359
360impl<K, V, Codec> Deref for ObservableHashMap<K, V, Codec> {
361    type Target = HashMap<K, V>;
362
363    fn deref(&self) -> &Self::Target {
364        &self.hm
365    }
366}
367
368impl<K, V, Codec> Extend<(K, V)> for ObservableHashMap<K, V, Codec>
369where
370    K: Eq + Hash + Clone + RemoteSend,
371    V: Clone + RemoteSend,
372    Codec: crate::codec::Codec,
373{
374    fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
375        for (k, v) in iter {
376            self.insert(k, v);
377        }
378    }
379}
380
381/// A mutable reference to a value inside an [observable hash map](ObservableHashMap).
382///
383/// A [HashMapEvent::Set] change event is sent when this reference is dropped and the
384/// value has been accessed mutably.
385pub struct RefMut<'a, K, V, Codec>
386where
387    K: Clone + RemoteSend,
388    V: Clone + RemoteSend,
389    Codec: crate::codec::Codec,
390{
391    key: K,
392    value: &'a mut V,
393    changed: bool,
394    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
395    change: &'a ChangeSender,
396    on_err: &'a dyn Fn(SendError),
397}
398
399impl<K, V, Codec> Deref for RefMut<'_, K, V, Codec>
400where
401    K: Clone + RemoteSend,
402    V: Clone + RemoteSend,
403    Codec: crate::codec::Codec,
404{
405    type Target = V;
406
407    fn deref(&self) -> &Self::Target {
408        self.value
409    }
410}
411
412impl<K, V, Codec> DerefMut for RefMut<'_, K, V, Codec>
413where
414    K: Clone + RemoteSend,
415    V: Clone + RemoteSend,
416    Codec: crate::codec::Codec,
417{
418    fn deref_mut(&mut self) -> &mut Self::Target {
419        self.changed = true;
420        self.value
421    }
422}
423
424impl<K, V, Codec> Drop for RefMut<'_, K, V, Codec>
425where
426    K: Clone + RemoteSend,
427    V: Clone + RemoteSend,
428    Codec: crate::codec::Codec,
429{
430    fn drop(&mut self) {
431        if self.changed {
432            self.change.notify();
433            send_event(self.tx, self.on_err, HashMapEvent::Set(self.key.clone(), self.value.clone()));
434        }
435    }
436}
437
438/// A mutable iterator over the key-value pairs in an [observable hash map](ObservableHashMap).
439///
440/// A [HashMapEvent::Set] change event is sent for each value that is accessed mutably.
441pub struct IterMut<'a, K, V, Codec = crate::codec::Default> {
442    inner: std::collections::hash_map::IterMut<'a, K, V>,
443    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
444    change: &'a ChangeSender,
445    on_err: &'a dyn Fn(SendError),
446}
447
448impl<'a, K, V, Codec> Iterator for IterMut<'a, K, V, Codec>
449where
450    K: Clone + RemoteSend,
451    V: Clone + RemoteSend,
452    Codec: crate::codec::Codec,
453{
454    type Item = RefMut<'a, K, V, Codec>;
455
456    fn next(&mut self) -> Option<Self::Item> {
457        match self.inner.next() {
458            Some((key, value)) => Some(RefMut {
459                key: key.clone(),
460                value,
461                changed: false,
462                tx: self.tx,
463                change: self.change,
464                on_err: self.on_err,
465            }),
466            None => None,
467        }
468    }
469
470    fn size_hint(&self) -> (usize, Option<usize>) {
471        self.inner.size_hint()
472    }
473}
474
475impl<K, V, Codec> ExactSizeIterator for IterMut<'_, K, V, Codec>
476where
477    K: Clone + RemoteSend,
478    V: Clone + RemoteSend,
479    Codec: crate::codec::Codec,
480{
481    fn len(&self) -> usize {
482        self.inner.len()
483    }
484}
485
486impl<K, V, Codec> FusedIterator for IterMut<'_, K, V, Codec>
487where
488    K: Clone + RemoteSend,
489    V: Clone + RemoteSend,
490    Codec: crate::codec::Codec,
491{
492}
493
494/// A view into a single entry in an observable hash map, which may either be
495/// vacant or occupied.
496///
497/// This is returned by [ObservableHashMap::entry].
498#[derive(Debug)]
499pub enum Entry<'a, K, V, Codec = crate::codec::Default> {
500    /// An occupied entry.
501    Occupied(OccupiedEntry<'a, K, V, Codec>),
502    /// A vacant entry.
503    Vacant(VacantEntry<'a, K, V, Codec>),
504}
505
506impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
507where
508    K: Clone + RemoteSend,
509    V: Clone + RemoteSend,
510    Codec: crate::codec::Codec,
511{
512    /// Ensures a value is in the entry by inserting the default if empty,
513    /// and returns a mutable reference to the value in the entry.
514    pub fn or_insert(self, default: V) -> RefMut<'a, K, V, Codec> {
515        match self {
516            Self::Occupied(ocu) => ocu.into_mut(),
517            Self::Vacant(vac) => vac.insert(default),
518        }
519    }
520
521    /// Ensures a value is in the entry by inserting the result of the default
522    /// function if empty, and returns a mutable reference to the value in the entry.
523    pub fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
524        match self {
525            Self::Occupied(ocu) => ocu.into_mut(),
526            Self::Vacant(vac) => vac.insert(default()),
527        }
528    }
529
530    /// Ensures a value is in the entry by inserting the result of the default
531    /// function with key as argument if empty, and returns a mutable reference to the value in the entry.
532    pub fn or_insert_with_key<F: FnOnce(&K) -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
533        match self {
534            Self::Occupied(ocu) => ocu.into_mut(),
535            Self::Vacant(vac) => {
536                let value = default(vac.key());
537                vac.insert(value)
538            }
539        }
540    }
541
542    /// Returns a reference to this entry’s key.
543    pub fn key(&self) -> &K {
544        match self {
545            Self::Occupied(ocu) => ocu.key(),
546            Self::Vacant(vac) => vac.key(),
547        }
548    }
549
550    /// Provides in-place mutable access to an occupied entry before any potential inserts into the map.
551    pub fn and_modify<F: FnOnce(&mut V)>(mut self, f: F) -> Self {
552        if let Self::Occupied(ocu) = &mut self {
553            let mut value = ocu.get_mut();
554            f(&mut *value);
555        }
556        self
557    }
558}
559
560impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
561where
562    K: Clone + RemoteSend,
563    V: Clone + RemoteSend + Default,
564    Codec: crate::codec::Codec,
565{
566    /// Ensures a value is in the entry by inserting the default value if empty,
567    /// and returns a mutable reference to the value in the entry.
568    pub fn or_default(self) -> RefMut<'a, K, V, Codec> {
569        #[allow(clippy::unwrap_or_default)]
570        self.or_insert_with(V::default)
571    }
572}
573
574/// A view into an occupied entry in an observable hash map.
575pub struct OccupiedEntry<'a, K, V, Codec = crate::codec::Default> {
576    inner: std::collections::hash_map::OccupiedEntry<'a, K, V>,
577    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
578    change: &'a ChangeSender,
579    on_err: &'a dyn Fn(SendError),
580}
581
582impl<K, V, Codec> fmt::Debug for OccupiedEntry<'_, K, V, Codec>
583where
584    K: fmt::Debug,
585    V: fmt::Debug,
586{
587    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
588        self.inner.fmt(f)
589    }
590}
591
592impl<'a, K, V, Codec> OccupiedEntry<'a, K, V, Codec>
593where
594    K: Clone + RemoteSend,
595    V: Clone + RemoteSend,
596    Codec: crate::codec::Codec,
597{
598    /// Gets a reference to the key in the entry.
599    pub fn key(&self) -> &K {
600        self.inner.key()
601    }
602
603    /// Take the ownership of the key and value from the map.
604    ///
605    /// A [HashMapEvent::Remove] change event is sent.
606    pub fn remove_entry(self) -> (K, V) {
607        let (k, v) = self.inner.remove_entry();
608        self.change.notify();
609        send_event(self.tx, self.on_err, HashMapEvent::Remove(k.clone()));
610        (k, v)
611    }
612
613    /// Gets a reference to the value in the entry.
614    pub fn get(&self) -> &V {
615        self.inner.get()
616    }
617
618    /// Gets a mutable reference to the value in the entry.
619    pub fn get_mut(&mut self) -> RefMut<'_, K, V, Codec> {
620        RefMut {
621            key: self.inner.key().clone(),
622            value: self.inner.get_mut(),
623            changed: false,
624            tx: self.tx,
625            change: self.change,
626            on_err: self.on_err,
627        }
628    }
629
630    /// Converts this into a mutable reference to the value in
631    /// the entry with a lifetime bound to the map itself.
632    pub fn into_mut(self) -> RefMut<'a, K, V, Codec> {
633        let key = self.inner.key().clone();
634        RefMut {
635            key,
636            value: self.inner.into_mut(),
637            changed: false,
638            tx: self.tx,
639            change: self.change,
640            on_err: self.on_err,
641        }
642    }
643
644    /// Sets the value of the entry, and returns the entry’s old value.
645    ///
646    /// A [HashMapEvent::Set] change event is sent.
647    pub fn insert(&mut self, value: V) -> V {
648        self.change.notify();
649        send_event(self.tx, self.on_err, HashMapEvent::Set(self.inner.key().clone(), value.clone()));
650        self.inner.insert(value)
651    }
652
653    /// Takes the value out of the entry, and returns it.
654    ///
655    /// A [HashMapEvent::Remove] change event is sent.
656    pub fn remove(self) -> V {
657        let (k, v) = self.inner.remove_entry();
658        self.change.notify();
659        send_event(self.tx, self.on_err, HashMapEvent::Remove(k));
660        v
661    }
662}
663
664/// A view into a vacant entry in an observable hash map.
665pub struct VacantEntry<'a, K, V, Codec = crate::codec::Default> {
666    inner: std::collections::hash_map::VacantEntry<'a, K, V>,
667    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
668    change: &'a ChangeSender,
669    on_err: &'a dyn Fn(SendError),
670}
671
672impl<K, V, Codec> fmt::Debug for VacantEntry<'_, K, V, Codec>
673where
674    K: fmt::Debug,
675{
676    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
677        self.inner.fmt(f)
678    }
679}
680
681impl<'a, K, V, Codec> VacantEntry<'a, K, V, Codec>
682where
683    K: Clone + RemoteSend,
684    V: Clone + RemoteSend,
685    Codec: crate::codec::Codec,
686{
687    /// Gets a reference to the key.
688    pub fn key(&self) -> &K {
689        self.inner.key()
690    }
691
692    /// Take ownership of the key.
693    pub fn into_key(self) -> K {
694        self.inner.into_key()
695    }
696
697    /// Sets the value of the entry, and returns a mutable reference to it.
698    ///
699    /// A [HashMapEvent::Set] change event is sent.
700    pub fn insert(self, value: V) -> RefMut<'a, K, V, Codec> {
701        let key = self.inner.key().clone();
702        self.change.notify();
703        send_event(self.tx, self.on_err, HashMapEvent::Set(key.clone(), value.clone()));
704        let value = self.inner.insert(value);
705        RefMut { key, value, changed: false, tx: self.tx, change: self.change, on_err: self.on_err }
706    }
707}
708
709struct MirroredHashMapInner<K, V> {
710    hm: HashMap<K, V>,
711    complete: bool,
712    done: bool,
713    error: Option<RecvError>,
714    max_size: usize,
715}
716
717impl<K, V> MirroredHashMapInner<K, V>
718where
719    K: Eq + Hash,
720{
721    fn handle_event(&mut self, event: HashMapEvent<K, V>) -> Result<(), RecvError> {
722        match event {
723            HashMapEvent::InitialComplete => {
724                self.complete = true;
725            }
726            HashMapEvent::Set(k, v) => {
727                self.hm.insert(k, v);
728                if self.hm.len() > self.max_size {
729                    return Err(RecvError::MaxSizeExceeded(self.max_size));
730                }
731            }
732            HashMapEvent::Remove(k) => {
733                self.hm.remove(&k);
734            }
735            HashMapEvent::Clear => {
736                self.hm.clear();
737            }
738            HashMapEvent::ShrinkToFit => {
739                self.hm.shrink_to_fit();
740            }
741            HashMapEvent::Done => {
742                self.done = true;
743            }
744        }
745        Ok(())
746    }
747}
748
749/// Initial value of an observable hash map subscription.
750#[derive(Debug, Serialize, Deserialize)]
751#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
752#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
753enum HashMapInitialValue<K, V, Codec = crate::codec::Default> {
754    /// Initial value is present.
755    Value(HashMap<K, V>),
756    /// Initial value is received incrementally.
757    Incremental {
758        /// Number of elements.
759        len: usize,
760        /// Receiver.
761        rx: rch::mpsc::Receiver<(K, V), Codec>,
762    },
763}
764
765impl<K, V, Codec> HashMapInitialValue<K, V, Codec>
766where
767    K: RemoteSend + Eq + Hash + Clone,
768    V: RemoteSend + Clone,
769    Codec: crate::codec::Codec,
770{
771    /// Transmits the initial value as a whole.
772    fn new_value(hm: HashMap<K, V>) -> Self {
773        Self::Value(hm)
774    }
775
776    /// Transmits the initial value incrementally.
777    fn new_incremental(hm: HashMap<K, V>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
778        let (tx, rx) = rch::mpsc::channel(128);
779        let len = hm.len();
780
781        exec::spawn(
782            async move {
783                for (k, v) in hm.into_iter() {
784                    match tx.send((k, v)).await {
785                        Ok(_) => (),
786                        Err(err) if err.is_disconnected() => break,
787                        Err(err) => match err.try_into() {
788                            Ok(err) => (on_err)(err),
789                            Err(_) => unreachable!(),
790                        },
791                    }
792                }
793            }
794            .in_current_span(),
795        );
796
797        Self::Incremental { len, rx }
798    }
799}
800
801/// Observable hash map subscription.
802///
803/// This can be sent to a remote endpoint via a [remote channel](crate::rch).
804/// Then, on the remote endpoint, [mirror](Self::mirror) can be used to build
805/// and keep up-to-date a mirror of the observed hash map.
806///
807/// The event stream can also be processed event-wise using [recv](Self::recv).
808/// If the subscription is not incremental [take_initial](Self::take_initial) must
809/// be called before the first call to [recv](Self::recv).
810#[derive(Debug, Serialize, Deserialize)]
811#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
812#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
813pub struct HashMapSubscription<K, V, Codec = crate::codec::Default> {
814    /// Value of hash map at time of subscription.
815    initial: HashMapInitialValue<K, V, Codec>,
816    /// Initial value received completely.
817    #[serde(skip, default)]
818    complete: bool,
819    /// Change events receiver.
820    ///
821    /// `None` if [ObservableHashMap::done] has been called before subscribing.
822    events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
823    /// Event stream ended.
824    #[serde(skip, default)]
825    done: bool,
826}
827
828impl<K, V, Codec> HashMapSubscription<K, V, Codec>
829where
830    K: RemoteSend + Eq + Hash + Clone,
831    V: RemoteSend + Clone,
832    Codec: crate::codec::Codec,
833{
834    fn new(
835        initial: HashMapInitialValue<K, V, Codec>,
836        events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
837    ) -> Self {
838        Self { initial, complete: false, events, done: false }
839    }
840
841    /// Returns whether the subscription is incremental.
842    pub fn is_incremental(&self) -> bool {
843        matches!(self.initial, HashMapInitialValue::Incremental { .. })
844    }
845
846    /// Returns whether the initial value event or
847    /// stream of events that build up the initial value
848    /// has completed or [take_initial](Self::take_initial) has been called.
849    pub fn is_complete(&self) -> bool {
850        self.complete
851    }
852
853    /// Returns whether the observed hash map has indicated that no further
854    /// change events will occur.
855    pub fn is_done(&self) -> bool {
856        self.events.is_none() || self.done
857    }
858
859    /// Take the initial value.
860    ///
861    /// This is only possible if the subscription is not incremental
862    /// and the initial value has not already been taken.
863    /// Otherwise `None` is returned.
864    ///
865    /// If the subscription is not incremental this must be called before the
866    /// first call to [recv](Self::recv).
867    pub fn take_initial(&mut self) -> Option<HashMap<K, V>> {
868        match &mut self.initial {
869            HashMapInitialValue::Value(value) if !self.complete => {
870                self.complete = true;
871                Some(take(value))
872            }
873            _ => None,
874        }
875    }
876
877    /// Receives the next change event.
878    ///
879    /// # Panics
880    /// Panics when the subscription is not incremental and [take_initial](Self::take_initial)
881    /// has not been called.
882    pub async fn recv(&mut self) -> Result<Option<HashMapEvent<K, V>>, RecvError> {
883        // Provide initial value events.
884        if !self.complete {
885            match &mut self.initial {
886                HashMapInitialValue::Incremental { len, rx } => {
887                    if *len > 0 {
888                        match rx.recv().await? {
889                            Some((k, v)) => {
890                                // Provide incremental initial value event.
891                                *len -= 1;
892                                return Ok(Some(HashMapEvent::Set(k, v)));
893                            }
894                            None => return Err(RecvError::Closed),
895                        }
896                    } else {
897                        // Provide incremental initial value complete event.
898                        self.complete = true;
899                        return Ok(Some(HashMapEvent::InitialComplete));
900                    }
901                }
902                HashMapInitialValue::Value(_) => {
903                    panic!("take_initial must be called before recv for non-incremental subscription");
904                }
905            }
906        }
907
908        // Provide change event.
909        if let Some(rx) = &mut self.events {
910            match rx.recv().await? {
911                HashMapEvent::Done => self.events = None,
912                evt => return Ok(Some(evt)),
913            }
914        }
915
916        // Provide done event.
917        if self.done {
918            Ok(None)
919        } else {
920            self.done = true;
921            Ok(Some(HashMapEvent::Done))
922        }
923    }
924}
925
926impl<K, V, Codec> HashMapSubscription<K, V, Codec>
927where
928    K: RemoteSend + Eq + Hash + Clone + Sync,
929    V: RemoteSend + Clone + Sync,
930    Codec: crate::codec::Codec,
931{
932    /// Mirror the hash map that this subscription is observing.
933    ///
934    /// `max_size` specifies the maximum allowed size of the mirrored collection.
935    /// If this size is reached, processing of events is stopped and
936    /// [RecvError::MaxSizeExceeded] is returned.
937    pub fn mirror(mut self, max_size: usize) -> MirroredHashMap<K, V, Codec> {
938        let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
939        let (changed_tx, changed_rx) = watch::channel(());
940        let (dropped_tx, mut dropped_rx) = oneshot::channel();
941
942        // Build initial state.
943        let inner = Arc::new(RwLock::new(Some(MirroredHashMapInner {
944            hm: self.take_initial().unwrap_or_default(),
945            complete: self.is_complete(),
946            done: self.is_done(),
947            error: None,
948            max_size,
949        })));
950        let inner_task = inner.clone();
951
952        // Process change events.
953        let tx_send = tx.clone();
954        exec::spawn(
955            async move {
956                loop {
957                    let event = tokio::select! {
958                        event = self.recv() => event,
959                        _ = &mut dropped_rx => return,
960                    };
961
962                    let mut inner = inner_task.write().await;
963                    let inner = match inner.as_mut() {
964                        Some(inner) => inner,
965                        None => return,
966                    };
967
968                    changed_tx.send_replace(());
969
970                    match event {
971                        Ok(Some(event)) => {
972                            if tx_send.receiver_count() > 0 {
973                                let _ = tx_send.send(event.clone());
974                            }
975
976                            if let Err(err) = inner.handle_event(event) {
977                                inner.error = Some(err);
978                                return;
979                            }
980
981                            if inner.done {
982                                break;
983                            }
984                        }
985                        Ok(None) => break,
986                        Err(err) => {
987                            inner.error = Some(err);
988                            return;
989                        }
990                    }
991                }
992            }
993            .in_current_span(),
994        );
995
996        MirroredHashMap { inner, tx, changed_rx, _dropped_tx: dropped_tx }
997    }
998}
999
1000/// A hash map that is mirroring an observable hash map.
1001pub struct MirroredHashMap<K, V, Codec = crate::codec::Default> {
1002    inner: Arc<RwLock<Option<MirroredHashMapInner<K, V>>>>,
1003    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
1004    changed_rx: watch::Receiver<()>,
1005    _dropped_tx: oneshot::Sender<()>,
1006}
1007
1008impl<K, V, Codec> fmt::Debug for MirroredHashMap<K, V, Codec> {
1009    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010        f.debug_struct("MirroredHashMap").finish()
1011    }
1012}
1013
1014impl<K, V, Codec> MirroredHashMap<K, V, Codec>
1015where
1016    K: RemoteSend + Eq + Hash + Clone,
1017    V: RemoteSend + Clone,
1018    Codec: crate::codec::Codec,
1019{
1020    /// Returns a reference to the current value of the hash map.
1021    ///
1022    /// Updates are paused while the read lock is held.
1023    ///
1024    /// This method returns an error if the observed hash map has been dropped
1025    /// or the connection to it failed before it was marked as done by calling
1026    /// [ObservableHashMap::done].
1027    /// In this case the mirrored contents at the point of loss of connection
1028    /// can be obtained using [detach](Self::detach).
1029    pub async fn borrow(&self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1030        let inner = self.inner.read().await;
1031        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1032        match &inner.error {
1033            None => Ok(MirroredHashMapRef(inner)),
1034            Some(err) => Err(err.clone()),
1035        }
1036    }
1037
1038    /// Returns a reference to the current value of the hash map and marks it as seen.
1039    ///
1040    /// Thus [changed](Self::changed) will not return immediately until the value changes
1041    /// after this method returns.
1042    ///
1043    /// Updates are paused while the read lock is held.
1044    ///
1045    /// This method returns an error if the observed hash map has been dropped
1046    /// or the connection to it failed before it was marked as done by calling
1047    /// [ObservableHashMap::done].
1048    /// In this case the mirrored contents at the point of loss of connection
1049    /// can be obtained using [detach](Self::detach).
1050    pub async fn borrow_and_update(&mut self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1051        let inner = self.inner.read().await;
1052        self.changed_rx.borrow_and_update();
1053        let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1054        match &inner.error {
1055            None => Ok(MirroredHashMapRef(inner)),
1056            Some(err) => Err(err.clone()),
1057        }
1058    }
1059
1060    /// Stops updating the hash map and returns its current contents.
1061    pub async fn detach(self) -> HashMap<K, V> {
1062        let mut inner = self.inner.write().await;
1063        inner.take().unwrap().hm
1064    }
1065
1066    /// Waits for a change and marks the newest value as seen.
1067    ///
1068    /// This also returns when connection to the observed hash map has been lost
1069    /// or the hash map has been marked as done.
1070    pub async fn changed(&mut self) {
1071        let _ = self.changed_rx.changed().await;
1072    }
1073
1074    /// Subscribes to change events from this mirrored hash map.
1075    ///
1076    /// The current contents of the hash map is included with the subscription.
1077    ///
1078    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1079    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
1080    pub async fn subscribe(&self, buffer: usize) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1081        let view = self.borrow().await?;
1082        let initial = view.clone();
1083        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1084
1085        Ok(HashMapSubscription::new(HashMapInitialValue::new_value(initial), events))
1086    }
1087
1088    /// Subscribes to change events from this mirrored hash map with incremental sending
1089    /// of the current contents.
1090    ///
1091    /// The current contents of the hash map are sent incrementally.
1092    ///
1093    /// `buffer` specifies the maximum size of the event buffer for this subscription in number of events.
1094    /// If it is exceeded the subscription is shed and the receiver gets a [RecvError::Lagged].
1095    pub async fn subscribe_incremental(
1096        &self, buffer: usize,
1097    ) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1098        let view = self.borrow().await?;
1099        let initial = view.clone();
1100        let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1101
1102        Ok(HashMapSubscription::new(
1103            HashMapInitialValue::new_incremental(initial, Arc::new(default_on_err)),
1104            events,
1105        ))
1106    }
1107}
1108
1109impl<K, V, Codec> Drop for MirroredHashMap<K, V, Codec> {
1110    fn drop(&mut self) {
1111        // empty
1112    }
1113}
1114
1115/// A snapshot view of an observable hash map.
1116pub struct MirroredHashMapRef<'a, K, V>(RwLockReadGuard<'a, MirroredHashMapInner<K, V>>);
1117
1118impl<K, V> MirroredHashMapRef<'_, K, V> {
1119    /// Returns `true` if the initial state of an incremental subscription has
1120    /// been reached.
1121    pub fn is_complete(&self) -> bool {
1122        self.0.complete
1123    }
1124
1125    /// Returns `true` if the observed hash map has been marked as done by calling
1126    /// [ObservableHashMap::done] and thus no further changes can occur.
1127    pub fn is_done(&self) -> bool {
1128        self.0.done
1129    }
1130}
1131
1132impl<K, V> fmt::Debug for MirroredHashMapRef<'_, K, V>
1133where
1134    K: fmt::Debug,
1135    V: fmt::Debug,
1136{
1137    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1138        self.0.hm.fmt(f)
1139    }
1140}
1141
1142impl<K, V> Deref for MirroredHashMapRef<'_, K, V> {
1143    type Target = HashMap<K, V>;
1144
1145    fn deref(&self) -> &Self::Target {
1146        &self.0.hm
1147    }
1148}
1149
1150impl<K, V> Drop for MirroredHashMapRef<'_, K, V> {
1151    fn drop(&mut self) {
1152        // required for drop order
1153    }
1154}