1use serde::{Deserialize, Serialize};
22use std::{
23 collections::HashMap,
24 fmt,
25 hash::Hash,
26 iter::FusedIterator,
27 mem::take,
28 ops::{Deref, DerefMut},
29 sync::Arc,
30};
31use tokio::sync::{RwLock, RwLockReadGuard, oneshot, watch};
32use tracing::Instrument;
33
34use super::{ChangeNotifier, ChangeSender, RecvError, SendError, default_on_err, send_event};
35use crate::{exec, prelude::*};
36
37#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub enum HashMapEvent<K, V> {
40 Set(K, V),
42 Remove(K),
44 Clear,
46 ShrinkToFit,
48 Done,
51
52 #[serde(skip)]
58 InitialComplete,
59}
60
61pub struct ObservableHashMap<K, V, Codec = crate::codec::Default> {
66 hm: HashMap<K, V>,
67 tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
68 change: ChangeSender,
69 on_err: Arc<dyn Fn(SendError) + Send + Sync>,
70 done: bool,
71}
72
73impl<K, V, Codec> fmt::Debug for ObservableHashMap<K, V, Codec>
74where
75 K: fmt::Debug,
76 V: fmt::Debug,
77{
78 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79 self.hm.fmt(f)
80 }
81}
82
83impl<K, V, Codec> From<HashMap<K, V>> for ObservableHashMap<K, V, Codec>
84where
85 K: Clone + RemoteSend,
86 V: Clone + RemoteSend,
87 Codec: crate::codec::Codec,
88{
89 fn from(hm: HashMap<K, V>) -> Self {
90 let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
91 Self { hm, tx, on_err: Arc::new(default_on_err), change: ChangeSender::new(), done: false }
92 }
93}
94
95impl<K, V, Codec> From<ObservableHashMap<K, V, Codec>> for HashMap<K, V> {
96 fn from(ohm: ObservableHashMap<K, V, Codec>) -> Self {
97 ohm.hm
98 }
99}
100
101impl<K, V, Codec> Default for ObservableHashMap<K, V, Codec>
102where
103 K: Clone + RemoteSend,
104 V: Clone + RemoteSend,
105 Codec: crate::codec::Codec,
106{
107 fn default() -> Self {
108 Self::from(HashMap::new())
109 }
110}
111
112impl<K, V, Codec> ObservableHashMap<K, V, Codec>
113where
114 K: Eq + Hash + Clone + RemoteSend,
115 V: Clone + RemoteSend,
116 Codec: crate::codec::Codec,
117{
118 pub fn new() -> Self {
120 Self::default()
121 }
122
123 pub fn set_error_handler<E>(&mut self, on_err: E)
126 where
127 E: Fn(SendError) + Send + Sync + 'static,
128 {
129 self.on_err = Arc::new(on_err);
130 }
131
132 pub fn subscribe(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
139 HashMapSubscription::new(
140 HashMapInitialValue::new_value(self.hm.clone()),
141 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
142 )
143 }
144
145 pub fn subscribe_incremental(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
153 HashMapSubscription::new(
154 HashMapInitialValue::new_incremental(self.hm.clone(), self.on_err.clone()),
155 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
156 )
157 }
158
159 pub fn subscriber_count(&self) -> usize {
161 self.tx.receiver_count()
162 }
163
164 pub fn notifier(&self) -> ChangeNotifier {
167 self.change.subscribe()
168 }
169
170 pub fn insert(&mut self, k: K, v: V) -> Option<V> {
179 self.assert_not_done();
180 self.change.notify();
181
182 send_event(&self.tx, &*self.on_err, HashMapEvent::Set(k.clone(), v.clone()));
183 self.hm.insert(k, v)
184 }
185
186 pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
195 where
196 K: std::borrow::Borrow<Q>,
197 Q: Hash + Eq,
198 {
199 self.assert_not_done();
200
201 match self.hm.remove_entry(k) {
202 Some((k, v)) => {
203 self.change.notify();
204 send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k));
205 Some(v)
206 }
207 None => None,
208 }
209 }
210
211 pub fn clear(&mut self) {
218 self.assert_not_done();
219
220 if !self.hm.is_empty() {
221 self.hm.clear();
222 self.change.notify();
223 send_event(&self.tx, &*self.on_err, HashMapEvent::Clear);
224 }
225 }
226
227 pub fn retain<F>(&mut self, mut f: F)
234 where
235 F: FnMut(&K, &mut V) -> bool,
236 {
237 self.assert_not_done();
238
239 self.hm.retain(|k, v| {
240 if f(k, v) {
241 true
242 } else {
243 self.change.notify();
244 send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
245 false
246 }
247 });
248 }
249
250 pub fn entry(&mut self, key: K) -> Entry<'_, K, V, Codec> {
255 self.assert_not_done();
256
257 match self.hm.entry(key) {
258 std::collections::hash_map::Entry::Occupied(inner) => Entry::Occupied(OccupiedEntry {
259 inner,
260 tx: &self.tx,
261 change: &self.change,
262 on_err: &*self.on_err,
263 }),
264 std::collections::hash_map::Entry::Vacant(inner) => {
265 Entry::Vacant(VacantEntry { inner, tx: &self.tx, change: &self.change, on_err: &*self.on_err })
266 }
267 }
268 }
269
270 pub fn get_mut<Q>(&mut self, k: &Q) -> Option<RefMut<'_, K, V, Codec>>
277 where
278 K: std::borrow::Borrow<Q>,
279 Q: Hash + Eq,
280 {
281 self.assert_not_done();
282
283 match self.hm.get_key_value(k) {
284 Some((key, _)) => {
285 let key = key.clone();
286 let value = self.hm.get_mut(k).unwrap();
287 Some(RefMut {
288 key,
289 value,
290 changed: false,
291 tx: &self.tx,
292 change: &self.change,
293 on_err: &*self.on_err,
294 })
295 }
296 None => None,
297 }
298 }
299
300 pub fn iter_mut(&mut self) -> IterMut<'_, K, V, Codec> {
307 self.assert_not_done();
308
309 IterMut { inner: self.hm.iter_mut(), tx: &self.tx, change: &self.change, on_err: &*self.on_err }
310 }
311
312 pub fn shrink_to_fit(&mut self) {
319 self.assert_not_done();
320 send_event(&self.tx, &*self.on_err, HashMapEvent::ShrinkToFit);
321 self.hm.shrink_to_fit()
322 }
323
324 fn assert_not_done(&self) {
326 if self.done {
327 panic!("observable hash map cannot be changed after done has been called");
328 }
329 }
330
331 pub fn done(&mut self) {
337 if !self.done {
338 send_event(&self.tx, &*self.on_err, HashMapEvent::Done);
339 self.done = true;
340 }
341 }
342
343 pub fn is_done(&self) -> bool {
348 self.done
349 }
350
351 pub fn into_inner(self) -> HashMap<K, V> {
356 self.into()
357 }
358}
359
360impl<K, V, Codec> Deref for ObservableHashMap<K, V, Codec> {
361 type Target = HashMap<K, V>;
362
363 fn deref(&self) -> &Self::Target {
364 &self.hm
365 }
366}
367
368impl<K, V, Codec> Extend<(K, V)> for ObservableHashMap<K, V, Codec>
369where
370 K: Eq + Hash + Clone + RemoteSend,
371 V: Clone + RemoteSend,
372 Codec: crate::codec::Codec,
373{
374 fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
375 for (k, v) in iter {
376 self.insert(k, v);
377 }
378 }
379}
380
381pub struct RefMut<'a, K, V, Codec>
386where
387 K: Clone + RemoteSend,
388 V: Clone + RemoteSend,
389 Codec: crate::codec::Codec,
390{
391 key: K,
392 value: &'a mut V,
393 changed: bool,
394 tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
395 change: &'a ChangeSender,
396 on_err: &'a dyn Fn(SendError),
397}
398
399impl<K, V, Codec> Deref for RefMut<'_, K, V, Codec>
400where
401 K: Clone + RemoteSend,
402 V: Clone + RemoteSend,
403 Codec: crate::codec::Codec,
404{
405 type Target = V;
406
407 fn deref(&self) -> &Self::Target {
408 self.value
409 }
410}
411
412impl<K, V, Codec> DerefMut for RefMut<'_, K, V, Codec>
413where
414 K: Clone + RemoteSend,
415 V: Clone + RemoteSend,
416 Codec: crate::codec::Codec,
417{
418 fn deref_mut(&mut self) -> &mut Self::Target {
419 self.changed = true;
420 self.value
421 }
422}
423
424impl<K, V, Codec> Drop for RefMut<'_, K, V, Codec>
425where
426 K: Clone + RemoteSend,
427 V: Clone + RemoteSend,
428 Codec: crate::codec::Codec,
429{
430 fn drop(&mut self) {
431 if self.changed {
432 self.change.notify();
433 send_event(self.tx, self.on_err, HashMapEvent::Set(self.key.clone(), self.value.clone()));
434 }
435 }
436}
437
438pub struct IterMut<'a, K, V, Codec = crate::codec::Default> {
442 inner: std::collections::hash_map::IterMut<'a, K, V>,
443 tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
444 change: &'a ChangeSender,
445 on_err: &'a dyn Fn(SendError),
446}
447
448impl<'a, K, V, Codec> Iterator for IterMut<'a, K, V, Codec>
449where
450 K: Clone + RemoteSend,
451 V: Clone + RemoteSend,
452 Codec: crate::codec::Codec,
453{
454 type Item = RefMut<'a, K, V, Codec>;
455
456 fn next(&mut self) -> Option<Self::Item> {
457 match self.inner.next() {
458 Some((key, value)) => Some(RefMut {
459 key: key.clone(),
460 value,
461 changed: false,
462 tx: self.tx,
463 change: self.change,
464 on_err: self.on_err,
465 }),
466 None => None,
467 }
468 }
469
470 fn size_hint(&self) -> (usize, Option<usize>) {
471 self.inner.size_hint()
472 }
473}
474
475impl<K, V, Codec> ExactSizeIterator for IterMut<'_, K, V, Codec>
476where
477 K: Clone + RemoteSend,
478 V: Clone + RemoteSend,
479 Codec: crate::codec::Codec,
480{
481 fn len(&self) -> usize {
482 self.inner.len()
483 }
484}
485
486impl<K, V, Codec> FusedIterator for IterMut<'_, K, V, Codec>
487where
488 K: Clone + RemoteSend,
489 V: Clone + RemoteSend,
490 Codec: crate::codec::Codec,
491{
492}
493
494#[derive(Debug)]
499pub enum Entry<'a, K, V, Codec = crate::codec::Default> {
500 Occupied(OccupiedEntry<'a, K, V, Codec>),
502 Vacant(VacantEntry<'a, K, V, Codec>),
504}
505
506impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
507where
508 K: Clone + RemoteSend,
509 V: Clone + RemoteSend,
510 Codec: crate::codec::Codec,
511{
512 pub fn or_insert(self, default: V) -> RefMut<'a, K, V, Codec> {
515 match self {
516 Self::Occupied(ocu) => ocu.into_mut(),
517 Self::Vacant(vac) => vac.insert(default),
518 }
519 }
520
521 pub fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
524 match self {
525 Self::Occupied(ocu) => ocu.into_mut(),
526 Self::Vacant(vac) => vac.insert(default()),
527 }
528 }
529
530 pub fn or_insert_with_key<F: FnOnce(&K) -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
533 match self {
534 Self::Occupied(ocu) => ocu.into_mut(),
535 Self::Vacant(vac) => {
536 let value = default(vac.key());
537 vac.insert(value)
538 }
539 }
540 }
541
542 pub fn key(&self) -> &K {
544 match self {
545 Self::Occupied(ocu) => ocu.key(),
546 Self::Vacant(vac) => vac.key(),
547 }
548 }
549
550 pub fn and_modify<F: FnOnce(&mut V)>(mut self, f: F) -> Self {
552 if let Self::Occupied(ocu) = &mut self {
553 let mut value = ocu.get_mut();
554 f(&mut *value);
555 }
556 self
557 }
558}
559
560impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
561where
562 K: Clone + RemoteSend,
563 V: Clone + RemoteSend + Default,
564 Codec: crate::codec::Codec,
565{
566 pub fn or_default(self) -> RefMut<'a, K, V, Codec> {
569 #[allow(clippy::unwrap_or_default)]
570 self.or_insert_with(V::default)
571 }
572}
573
574pub struct OccupiedEntry<'a, K, V, Codec = crate::codec::Default> {
576 inner: std::collections::hash_map::OccupiedEntry<'a, K, V>,
577 tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
578 change: &'a ChangeSender,
579 on_err: &'a dyn Fn(SendError),
580}
581
582impl<K, V, Codec> fmt::Debug for OccupiedEntry<'_, K, V, Codec>
583where
584 K: fmt::Debug,
585 V: fmt::Debug,
586{
587 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
588 self.inner.fmt(f)
589 }
590}
591
592impl<'a, K, V, Codec> OccupiedEntry<'a, K, V, Codec>
593where
594 K: Clone + RemoteSend,
595 V: Clone + RemoteSend,
596 Codec: crate::codec::Codec,
597{
598 pub fn key(&self) -> &K {
600 self.inner.key()
601 }
602
603 pub fn remove_entry(self) -> (K, V) {
607 let (k, v) = self.inner.remove_entry();
608 self.change.notify();
609 send_event(self.tx, self.on_err, HashMapEvent::Remove(k.clone()));
610 (k, v)
611 }
612
613 pub fn get(&self) -> &V {
615 self.inner.get()
616 }
617
618 pub fn get_mut(&mut self) -> RefMut<'_, K, V, Codec> {
620 RefMut {
621 key: self.inner.key().clone(),
622 value: self.inner.get_mut(),
623 changed: false,
624 tx: self.tx,
625 change: self.change,
626 on_err: self.on_err,
627 }
628 }
629
630 pub fn into_mut(self) -> RefMut<'a, K, V, Codec> {
633 let key = self.inner.key().clone();
634 RefMut {
635 key,
636 value: self.inner.into_mut(),
637 changed: false,
638 tx: self.tx,
639 change: self.change,
640 on_err: self.on_err,
641 }
642 }
643
644 pub fn insert(&mut self, value: V) -> V {
648 self.change.notify();
649 send_event(self.tx, self.on_err, HashMapEvent::Set(self.inner.key().clone(), value.clone()));
650 self.inner.insert(value)
651 }
652
653 pub fn remove(self) -> V {
657 let (k, v) = self.inner.remove_entry();
658 self.change.notify();
659 send_event(self.tx, self.on_err, HashMapEvent::Remove(k));
660 v
661 }
662}
663
664pub struct VacantEntry<'a, K, V, Codec = crate::codec::Default> {
666 inner: std::collections::hash_map::VacantEntry<'a, K, V>,
667 tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
668 change: &'a ChangeSender,
669 on_err: &'a dyn Fn(SendError),
670}
671
672impl<K, V, Codec> fmt::Debug for VacantEntry<'_, K, V, Codec>
673where
674 K: fmt::Debug,
675{
676 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
677 self.inner.fmt(f)
678 }
679}
680
681impl<'a, K, V, Codec> VacantEntry<'a, K, V, Codec>
682where
683 K: Clone + RemoteSend,
684 V: Clone + RemoteSend,
685 Codec: crate::codec::Codec,
686{
687 pub fn key(&self) -> &K {
689 self.inner.key()
690 }
691
692 pub fn into_key(self) -> K {
694 self.inner.into_key()
695 }
696
697 pub fn insert(self, value: V) -> RefMut<'a, K, V, Codec> {
701 let key = self.inner.key().clone();
702 self.change.notify();
703 send_event(self.tx, self.on_err, HashMapEvent::Set(key.clone(), value.clone()));
704 let value = self.inner.insert(value);
705 RefMut { key, value, changed: false, tx: self.tx, change: self.change, on_err: self.on_err }
706 }
707}
708
709struct MirroredHashMapInner<K, V> {
710 hm: HashMap<K, V>,
711 complete: bool,
712 done: bool,
713 error: Option<RecvError>,
714 max_size: usize,
715}
716
717impl<K, V> MirroredHashMapInner<K, V>
718where
719 K: Eq + Hash,
720{
721 fn handle_event(&mut self, event: HashMapEvent<K, V>) -> Result<(), RecvError> {
722 match event {
723 HashMapEvent::InitialComplete => {
724 self.complete = true;
725 }
726 HashMapEvent::Set(k, v) => {
727 self.hm.insert(k, v);
728 if self.hm.len() > self.max_size {
729 return Err(RecvError::MaxSizeExceeded(self.max_size));
730 }
731 }
732 HashMapEvent::Remove(k) => {
733 self.hm.remove(&k);
734 }
735 HashMapEvent::Clear => {
736 self.hm.clear();
737 }
738 HashMapEvent::ShrinkToFit => {
739 self.hm.shrink_to_fit();
740 }
741 HashMapEvent::Done => {
742 self.done = true;
743 }
744 }
745 Ok(())
746 }
747}
748
749#[derive(Debug, Serialize, Deserialize)]
751#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
752#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
753enum HashMapInitialValue<K, V, Codec = crate::codec::Default> {
754 Value(HashMap<K, V>),
756 Incremental {
758 len: usize,
760 rx: rch::mpsc::Receiver<(K, V), Codec>,
762 },
763}
764
765impl<K, V, Codec> HashMapInitialValue<K, V, Codec>
766where
767 K: RemoteSend + Eq + Hash + Clone,
768 V: RemoteSend + Clone,
769 Codec: crate::codec::Codec,
770{
771 fn new_value(hm: HashMap<K, V>) -> Self {
773 Self::Value(hm)
774 }
775
776 fn new_incremental(hm: HashMap<K, V>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
778 let (tx, rx) = rch::mpsc::channel(128);
779 let len = hm.len();
780
781 exec::spawn(
782 async move {
783 for (k, v) in hm.into_iter() {
784 match tx.send((k, v)).await {
785 Ok(_) => (),
786 Err(err) if err.is_disconnected() => break,
787 Err(err) => match err.try_into() {
788 Ok(err) => (on_err)(err),
789 Err(_) => unreachable!(),
790 },
791 }
792 }
793 }
794 .in_current_span(),
795 );
796
797 Self::Incremental { len, rx }
798 }
799}
800
801#[derive(Debug, Serialize, Deserialize)]
811#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
812#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: crate::codec::Codec"))]
813pub struct HashMapSubscription<K, V, Codec = crate::codec::Default> {
814 initial: HashMapInitialValue<K, V, Codec>,
816 #[serde(skip, default)]
818 complete: bool,
819 events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
823 #[serde(skip, default)]
825 done: bool,
826}
827
828impl<K, V, Codec> HashMapSubscription<K, V, Codec>
829where
830 K: RemoteSend + Eq + Hash + Clone,
831 V: RemoteSend + Clone,
832 Codec: crate::codec::Codec,
833{
834 fn new(
835 initial: HashMapInitialValue<K, V, Codec>,
836 events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
837 ) -> Self {
838 Self { initial, complete: false, events, done: false }
839 }
840
841 pub fn is_incremental(&self) -> bool {
843 matches!(self.initial, HashMapInitialValue::Incremental { .. })
844 }
845
846 pub fn is_complete(&self) -> bool {
850 self.complete
851 }
852
853 pub fn is_done(&self) -> bool {
856 self.events.is_none() || self.done
857 }
858
859 pub fn take_initial(&mut self) -> Option<HashMap<K, V>> {
868 match &mut self.initial {
869 HashMapInitialValue::Value(value) if !self.complete => {
870 self.complete = true;
871 Some(take(value))
872 }
873 _ => None,
874 }
875 }
876
877 pub async fn recv(&mut self) -> Result<Option<HashMapEvent<K, V>>, RecvError> {
883 if !self.complete {
885 match &mut self.initial {
886 HashMapInitialValue::Incremental { len, rx } => {
887 if *len > 0 {
888 match rx.recv().await? {
889 Some((k, v)) => {
890 *len -= 1;
892 return Ok(Some(HashMapEvent::Set(k, v)));
893 }
894 None => return Err(RecvError::Closed),
895 }
896 } else {
897 self.complete = true;
899 return Ok(Some(HashMapEvent::InitialComplete));
900 }
901 }
902 HashMapInitialValue::Value(_) => {
903 panic!("take_initial must be called before recv for non-incremental subscription");
904 }
905 }
906 }
907
908 if let Some(rx) = &mut self.events {
910 match rx.recv().await? {
911 HashMapEvent::Done => self.events = None,
912 evt => return Ok(Some(evt)),
913 }
914 }
915
916 if self.done {
918 Ok(None)
919 } else {
920 self.done = true;
921 Ok(Some(HashMapEvent::Done))
922 }
923 }
924}
925
926impl<K, V, Codec> HashMapSubscription<K, V, Codec>
927where
928 K: RemoteSend + Eq + Hash + Clone + Sync,
929 V: RemoteSend + Clone + Sync,
930 Codec: crate::codec::Codec,
931{
932 pub fn mirror(mut self, max_size: usize) -> MirroredHashMap<K, V, Codec> {
938 let (tx, _rx) = rch::broadcast::channel::<_, _, { rch::DEFAULT_BUFFER }>(1);
939 let (changed_tx, changed_rx) = watch::channel(());
940 let (dropped_tx, mut dropped_rx) = oneshot::channel();
941
942 let inner = Arc::new(RwLock::new(Some(MirroredHashMapInner {
944 hm: self.take_initial().unwrap_or_default(),
945 complete: self.is_complete(),
946 done: self.is_done(),
947 error: None,
948 max_size,
949 })));
950 let inner_task = inner.clone();
951
952 let tx_send = tx.clone();
954 exec::spawn(
955 async move {
956 loop {
957 let event = tokio::select! {
958 event = self.recv() => event,
959 _ = &mut dropped_rx => return,
960 };
961
962 let mut inner = inner_task.write().await;
963 let inner = match inner.as_mut() {
964 Some(inner) => inner,
965 None => return,
966 };
967
968 changed_tx.send_replace(());
969
970 match event {
971 Ok(Some(event)) => {
972 if tx_send.receiver_count() > 0 {
973 let _ = tx_send.send(event.clone());
974 }
975
976 if let Err(err) = inner.handle_event(event) {
977 inner.error = Some(err);
978 return;
979 }
980
981 if inner.done {
982 break;
983 }
984 }
985 Ok(None) => break,
986 Err(err) => {
987 inner.error = Some(err);
988 return;
989 }
990 }
991 }
992 }
993 .in_current_span(),
994 );
995
996 MirroredHashMap { inner, tx, changed_rx, _dropped_tx: dropped_tx }
997 }
998}
999
1000pub struct MirroredHashMap<K, V, Codec = crate::codec::Default> {
1002 inner: Arc<RwLock<Option<MirroredHashMapInner<K, V>>>>,
1003 tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
1004 changed_rx: watch::Receiver<()>,
1005 _dropped_tx: oneshot::Sender<()>,
1006}
1007
1008impl<K, V, Codec> fmt::Debug for MirroredHashMap<K, V, Codec> {
1009 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1010 f.debug_struct("MirroredHashMap").finish()
1011 }
1012}
1013
1014impl<K, V, Codec> MirroredHashMap<K, V, Codec>
1015where
1016 K: RemoteSend + Eq + Hash + Clone,
1017 V: RemoteSend + Clone,
1018 Codec: crate::codec::Codec,
1019{
1020 pub async fn borrow(&self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1030 let inner = self.inner.read().await;
1031 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1032 match &inner.error {
1033 None => Ok(MirroredHashMapRef(inner)),
1034 Some(err) => Err(err.clone()),
1035 }
1036 }
1037
1038 pub async fn borrow_and_update(&mut self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1051 let inner = self.inner.read().await;
1052 self.changed_rx.borrow_and_update();
1053 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1054 match &inner.error {
1055 None => Ok(MirroredHashMapRef(inner)),
1056 Some(err) => Err(err.clone()),
1057 }
1058 }
1059
1060 pub async fn detach(self) -> HashMap<K, V> {
1062 let mut inner = self.inner.write().await;
1063 inner.take().unwrap().hm
1064 }
1065
1066 pub async fn changed(&mut self) {
1071 let _ = self.changed_rx.changed().await;
1072 }
1073
1074 pub async fn subscribe(&self, buffer: usize) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1081 let view = self.borrow().await?;
1082 let initial = view.clone();
1083 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1084
1085 Ok(HashMapSubscription::new(HashMapInitialValue::new_value(initial), events))
1086 }
1087
1088 pub async fn subscribe_incremental(
1096 &self, buffer: usize,
1097 ) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1098 let view = self.borrow().await?;
1099 let initial = view.clone();
1100 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1101
1102 Ok(HashMapSubscription::new(
1103 HashMapInitialValue::new_incremental(initial, Arc::new(default_on_err)),
1104 events,
1105 ))
1106 }
1107}
1108
1109impl<K, V, Codec> Drop for MirroredHashMap<K, V, Codec> {
1110 fn drop(&mut self) {
1111 }
1113}
1114
1115pub struct MirroredHashMapRef<'a, K, V>(RwLockReadGuard<'a, MirroredHashMapInner<K, V>>);
1117
1118impl<K, V> MirroredHashMapRef<'_, K, V> {
1119 pub fn is_complete(&self) -> bool {
1122 self.0.complete
1123 }
1124
1125 pub fn is_done(&self) -> bool {
1128 self.0.done
1129 }
1130}
1131
1132impl<K, V> fmt::Debug for MirroredHashMapRef<'_, K, V>
1133where
1134 K: fmt::Debug,
1135 V: fmt::Debug,
1136{
1137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1138 self.0.hm.fmt(f)
1139 }
1140}
1141
1142impl<K, V> Deref for MirroredHashMapRef<'_, K, V> {
1143 type Target = HashMap<K, V>;
1144
1145 fn deref(&self) -> &Self::Target {
1146 &self.0.hm
1147 }
1148}
1149
1150impl<K, V> Drop for MirroredHashMapRef<'_, K, V> {
1151 fn drop(&mut self) {
1152 }
1154}