1use remoc::prelude::*;
22use serde::{Deserialize, Serialize};
23use std::{
24 collections::HashMap,
25 fmt,
26 hash::Hash,
27 iter::FusedIterator,
28 mem::take,
29 ops::{Deref, DerefMut},
30 sync::Arc,
31};
32use tokio::sync::{oneshot, watch, RwLock, RwLockReadGuard};
33
34use crate::{default_on_err, send_event, ChangeNotifier, ChangeSender, RecvError, SendError};
35
/// A change event published by an observable hash map and consumed by its subscribers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum HashMapEvent<K, V> {
    /// All initial entries of an incremental subscription have been delivered.
    ///
    /// Skipped by serde: the subscription's `recv` produces this event locally
    /// once the announced number of initial entries has been received.
    #[serde(skip)]
    InitialComplete,
    /// The value for the given key was inserted or replaced.
    Set(K, V),
    /// The entry with the given key was removed.
    Remove(K),
    /// All entries were removed.
    Clear,
    /// The map was asked to shrink its capacity to fit its contents.
    ShrinkToFit,
    /// The observed map was marked as done.
    Done,
}
55
/// A hash map that publishes its changes as [`HashMapEvent`]s over a broadcast channel.
///
/// `Codec` is the remoc codec used by the event channel.
pub struct ObservableHashMap<K, V, Codec = remoc::codec::Default> {
    /// The wrapped hash map holding the current contents.
    hm: HashMap<K, V>,
    /// Broadcast sender over which change events are published.
    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender used to signal changes to `ChangeNotifier`s handed out by `notifier`.
    change: ChangeSender,
    /// Handler passed to `send_event`; it receives `SendError`s.
    on_err: Arc<dyn Fn(SendError) + Send + Sync>,
    /// Set once `done` has been called; mutating methods panic afterwards.
    done: bool,
}
67
68impl<K, V, Codec> fmt::Debug for ObservableHashMap<K, V, Codec>
69where
70 K: fmt::Debug,
71 V: fmt::Debug,
72{
73 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74 self.hm.fmt(f)
75 }
76}
77
78impl<K, V, Codec> From<HashMap<K, V>> for ObservableHashMap<K, V, Codec>
79where
80 K: Clone + RemoteSend,
81 V: Clone + RemoteSend,
82 Codec: remoc::codec::Codec,
83{
84 fn from(hm: HashMap<K, V>) -> Self {
85 let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
86 Self { hm, tx, on_err: Arc::new(default_on_err), change: ChangeSender::new(), done: false }
87 }
88}
89
90impl<K, V, Codec> From<ObservableHashMap<K, V, Codec>> for HashMap<K, V> {
91 fn from(ohm: ObservableHashMap<K, V, Codec>) -> Self {
92 ohm.hm
93 }
94}
95
96impl<K, V, Codec> Default for ObservableHashMap<K, V, Codec>
97where
98 K: Clone + RemoteSend,
99 V: Clone + RemoteSend,
100 Codec: remoc::codec::Codec,
101{
102 fn default() -> Self {
103 Self::from(HashMap::new())
104 }
105}
106
107impl<K, V, Codec> ObservableHashMap<K, V, Codec>
108where
109 K: Eq + Hash + Clone + RemoteSend,
110 V: Clone + RemoteSend,
111 Codec: remoc::codec::Codec,
112{
113 pub fn new() -> Self {
115 Self::default()
116 }
117
118 pub fn set_error_handler<E>(&mut self, on_err: E)
121 where
122 E: Fn(SendError) + Send + Sync + 'static,
123 {
124 self.on_err = Arc::new(on_err);
125 }
126
127 pub fn subscribe(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
134 HashMapSubscription::new(
135 HashMapInitialValue::new_value(self.hm.clone()),
136 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
137 )
138 }
139
140 pub fn subscribe_incremental(&self, buffer: usize) -> HashMapSubscription<K, V, Codec> {
148 HashMapSubscription::new(
149 HashMapInitialValue::new_incremental(self.hm.clone(), self.on_err.clone()),
150 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
151 )
152 }
153
154 pub fn subscriber_count(&self) -> usize {
156 self.tx.receiver_count()
157 }
158
159 pub fn notifier(&self) -> ChangeNotifier {
162 self.change.subscribe()
163 }
164
165 pub fn insert(&mut self, k: K, v: V) -> Option<V> {
174 self.assert_not_done();
175 self.change.notify();
176
177 send_event(&self.tx, &*self.on_err, HashMapEvent::Set(k.clone(), v.clone()));
178 self.hm.insert(k, v)
179 }
180
181 pub fn remove<Q>(&mut self, k: &Q) -> Option<V>
190 where
191 K: std::borrow::Borrow<Q>,
192 Q: Hash + Eq,
193 {
194 self.assert_not_done();
195
196 match self.hm.remove_entry(k) {
197 Some((k, v)) => {
198 self.change.notify();
199 send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k));
200 Some(v)
201 }
202 None => None,
203 }
204 }
205
206 pub fn clear(&mut self) {
213 self.assert_not_done();
214
215 if !self.hm.is_empty() {
216 self.hm.clear();
217 self.change.notify();
218 send_event(&self.tx, &*self.on_err, HashMapEvent::Clear);
219 }
220 }
221
222 pub fn retain<F>(&mut self, mut f: F)
229 where
230 F: FnMut(&K, &mut V) -> bool,
231 {
232 self.assert_not_done();
233
234 self.hm.retain(|k, v| {
235 if f(k, v) {
236 true
237 } else {
238 self.change.notify();
239 send_event(&self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
240 false
241 }
242 });
243 }
244
245 pub fn entry(&mut self, key: K) -> Entry<'_, K, V, Codec> {
250 self.assert_not_done();
251
252 match self.hm.entry(key) {
253 std::collections::hash_map::Entry::Occupied(inner) => Entry::Occupied(OccupiedEntry {
254 inner,
255 tx: &self.tx,
256 change: &self.change,
257 on_err: &*self.on_err,
258 }),
259 std::collections::hash_map::Entry::Vacant(inner) => {
260 Entry::Vacant(VacantEntry { inner, tx: &self.tx, change: &self.change, on_err: &*self.on_err })
261 }
262 }
263 }
264
265 pub fn get_mut<Q>(&mut self, k: &Q) -> Option<RefMut<'_, K, V, Codec>>
272 where
273 K: std::borrow::Borrow<Q>,
274 Q: Hash + Eq,
275 {
276 self.assert_not_done();
277
278 match self.hm.get_key_value(k) {
279 Some((key, _)) => {
280 let key = key.clone();
281 let value = self.hm.get_mut(k).unwrap();
282 Some(RefMut {
283 key,
284 value,
285 changed: false,
286 tx: &self.tx,
287 change: &self.change,
288 on_err: &*self.on_err,
289 })
290 }
291 None => None,
292 }
293 }
294
295 pub fn iter_mut(&mut self) -> IterMut<'_, K, V, Codec> {
302 self.assert_not_done();
303
304 IterMut { inner: self.hm.iter_mut(), tx: &self.tx, change: &self.change, on_err: &*self.on_err }
305 }
306
307 pub fn shrink_to_fit(&mut self) {
314 self.assert_not_done();
315 send_event(&self.tx, &*self.on_err, HashMapEvent::ShrinkToFit);
316 self.hm.shrink_to_fit()
317 }
318
319 fn assert_not_done(&self) {
321 if self.done {
322 panic!("observable hash map cannot be changed after done has been called");
323 }
324 }
325
326 pub fn done(&mut self) {
332 if !self.done {
333 send_event(&self.tx, &*self.on_err, HashMapEvent::Done);
334 self.done = true;
335 }
336 }
337
338 pub fn is_done(&self) -> bool {
343 self.done
344 }
345
346 pub fn into_inner(self) -> HashMap<K, V> {
351 self.into()
352 }
353}
354
355impl<K, V, Codec> Deref for ObservableHashMap<K, V, Codec> {
356 type Target = HashMap<K, V>;
357
358 fn deref(&self) -> &Self::Target {
359 &self.hm
360 }
361}
362
363impl<K, V, Codec> Extend<(K, V)> for ObservableHashMap<K, V, Codec>
364where
365 K: Eq + Hash + Clone + RemoteSend,
366 V: Clone + RemoteSend,
367 Codec: remoc::codec::Codec,
368{
369 fn extend<I: IntoIterator<Item = (K, V)>>(&mut self, iter: I) {
370 for (k, v) in iter {
371 self.insert(k, v);
372 }
373 }
374}
375
/// A mutable handle to a value stored in an observable hash map.
///
/// When dropped after having been dereferenced mutably, it sends a
/// [`HashMapEvent::Set`] event carrying the key and the current value.
pub struct RefMut<'a, K, V, Codec>
where
    K: Clone + RemoteSend,
    V: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
    /// Owned copy of the key, used for the change event.
    key: K,
    /// The referenced value inside the map.
    value: &'a mut V,
    /// Whether the value has been dereferenced mutably.
    changed: bool,
    /// Broadcast sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender for local change notifications.
    change: &'a ChangeSender,
    /// Handler passed to `send_event` when the change event is sent.
    on_err: &'a dyn Fn(SendError),
}
393
394impl<'a, K, V, Codec> Deref for RefMut<'a, K, V, Codec>
395where
396 K: Clone + RemoteSend,
397 V: Clone + RemoteSend,
398 Codec: remoc::codec::Codec,
399{
400 type Target = V;
401
402 fn deref(&self) -> &Self::Target {
403 self.value
404 }
405}
406
407impl<'a, K, V, Codec> DerefMut for RefMut<'a, K, V, Codec>
408where
409 K: Clone + RemoteSend,
410 V: Clone + RemoteSend,
411 Codec: remoc::codec::Codec,
412{
413 fn deref_mut(&mut self) -> &mut Self::Target {
414 self.changed = true;
415 self.value
416 }
417}
418
419impl<'a, K, V, Codec> Drop for RefMut<'a, K, V, Codec>
420where
421 K: Clone + RemoteSend,
422 V: Clone + RemoteSend,
423 Codec: remoc::codec::Codec,
424{
425 fn drop(&mut self) {
426 if self.changed {
427 self.change.notify();
428 send_event(self.tx, self.on_err, HashMapEvent::Set(self.key.clone(), self.value.clone()));
429 }
430 }
431}
432
/// A mutable iterator over the entries of an observable hash map.
///
/// Each item is a [`RefMut`] rather than a plain mutable reference.
pub struct IterMut<'a, K, V, Codec = remoc::codec::Default> {
    /// The underlying standard library iterator.
    inner: std::collections::hash_map::IterMut<'a, K, V>,
    /// Broadcast sender for change events, handed to every yielded `RefMut`.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender for local change notifications, handed to every yielded `RefMut`.
    change: &'a ChangeSender,
    /// Send-error handler, handed to every yielded `RefMut`.
    on_err: &'a dyn Fn(SendError),
}
442
443impl<'a, K, V, Codec> Iterator for IterMut<'a, K, V, Codec>
444where
445 K: Clone + RemoteSend,
446 V: Clone + RemoteSend,
447 Codec: remoc::codec::Codec,
448{
449 type Item = RefMut<'a, K, V, Codec>;
450
451 fn next(&mut self) -> Option<Self::Item> {
452 match self.inner.next() {
453 Some((key, value)) => Some(RefMut {
454 key: key.clone(),
455 value,
456 changed: false,
457 tx: self.tx,
458 change: self.change,
459 on_err: self.on_err,
460 }),
461 None => None,
462 }
463 }
464
465 fn size_hint(&self) -> (usize, Option<usize>) {
466 self.inner.size_hint()
467 }
468}
469
470impl<'a, K, V, Codec> ExactSizeIterator for IterMut<'a, K, V, Codec>
471where
472 K: Clone + RemoteSend,
473 V: Clone + RemoteSend,
474 Codec: remoc::codec::Codec,
475{
476 fn len(&self) -> usize {
477 self.inner.len()
478 }
479}
480
/// Marker impl: `next` delegates directly to the underlying standard library
/// iterator, so `None` keeps being returned once it has been exhausted.
impl<'a, K, V, Codec> FusedIterator for IterMut<'a, K, V, Codec>
where
    K: Clone + RemoteSend,
    V: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
}
488
/// A view into a single entry of an observable hash map, either occupied or vacant.
///
/// Returned by `ObservableHashMap::entry`.
#[derive(Debug)]
pub enum Entry<'a, K, V, Codec = remoc::codec::Default> {
    /// The key is present in the map.
    Occupied(OccupiedEntry<'a, K, V, Codec>),
    /// The key is not present in the map.
    Vacant(VacantEntry<'a, K, V, Codec>),
}
500
501impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
502where
503 K: Clone + RemoteSend,
504 V: Clone + RemoteSend,
505 Codec: remoc::codec::Codec,
506{
507 pub fn or_insert(self, default: V) -> RefMut<'a, K, V, Codec> {
510 match self {
511 Self::Occupied(ocu) => ocu.into_mut(),
512 Self::Vacant(vac) => vac.insert(default),
513 }
514 }
515
516 pub fn or_insert_with<F: FnOnce() -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
519 match self {
520 Self::Occupied(ocu) => ocu.into_mut(),
521 Self::Vacant(vac) => vac.insert(default()),
522 }
523 }
524
525 pub fn or_insert_with_key<F: FnOnce(&K) -> V>(self, default: F) -> RefMut<'a, K, V, Codec> {
528 match self {
529 Self::Occupied(ocu) => ocu.into_mut(),
530 Self::Vacant(vac) => {
531 let value = default(vac.key());
532 vac.insert(value)
533 }
534 }
535 }
536
537 pub fn key(&self) -> &K {
539 match self {
540 Self::Occupied(ocu) => ocu.key(),
541 Self::Vacant(vac) => vac.key(),
542 }
543 }
544
545 pub fn and_modify<F: FnOnce(&mut V)>(mut self, f: F) -> Self {
547 if let Self::Occupied(ocu) = &mut self {
548 let mut value = ocu.get_mut();
549 f(&mut *value);
550 }
551 self
552 }
553}
554
555impl<'a, K, V, Codec> Entry<'a, K, V, Codec>
556where
557 K: Clone + RemoteSend,
558 V: Clone + RemoteSend + Default,
559 Codec: remoc::codec::Codec,
560{
561 pub fn or_default(self) -> RefMut<'a, K, V, Codec> {
564 self.or_insert_with(V::default)
565 }
566}
567
/// A view into an occupied entry of an observable hash map.
pub struct OccupiedEntry<'a, K, V, Codec = remoc::codec::Default> {
    /// The underlying standard library entry.
    inner: std::collections::hash_map::OccupiedEntry<'a, K, V>,
    /// Broadcast sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender for local change notifications.
    change: &'a ChangeSender,
    /// Handler passed to `send_event` when an event is sent.
    on_err: &'a dyn Fn(SendError),
}
575
576impl<'a, K, V, Codec> fmt::Debug for OccupiedEntry<'a, K, V, Codec>
577where
578 K: fmt::Debug,
579 V: fmt::Debug,
580{
581 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
582 self.inner.fmt(f)
583 }
584}
585
586impl<'a, K, V, Codec> OccupiedEntry<'a, K, V, Codec>
587where
588 K: Clone + RemoteSend,
589 V: Clone + RemoteSend,
590 Codec: remoc::codec::Codec,
591{
592 pub fn key(&self) -> &K {
594 self.inner.key()
595 }
596
597 pub fn remove_entry(self) -> (K, V) {
601 let (k, v) = self.inner.remove_entry();
602 self.change.notify();
603 send_event(self.tx, &*self.on_err, HashMapEvent::Remove(k.clone()));
604 (k, v)
605 }
606
607 pub fn get(&self) -> &V {
609 self.inner.get()
610 }
611
612 pub fn get_mut(&mut self) -> RefMut<'_, K, V, Codec> {
614 RefMut {
615 key: self.inner.key().clone(),
616 value: self.inner.get_mut(),
617 changed: false,
618 tx: self.tx,
619 change: self.change,
620 on_err: &*self.on_err,
621 }
622 }
623
624 pub fn into_mut(self) -> RefMut<'a, K, V, Codec> {
627 let key = self.inner.key().clone();
628 RefMut {
629 key,
630 value: self.inner.into_mut(),
631 changed: false,
632 tx: self.tx,
633 change: self.change,
634 on_err: &*self.on_err,
635 }
636 }
637
638 pub fn insert(&mut self, value: V) -> V {
642 self.change.notify();
643 send_event(self.tx, &*self.on_err, HashMapEvent::Set(self.inner.key().clone(), value.clone()));
644 self.inner.insert(value)
645 }
646
647 pub fn remove(self) -> V {
651 let (k, v) = self.inner.remove_entry();
652 self.change.notify();
653 send_event(self.tx, &*self.on_err, HashMapEvent::Remove(k));
654 v
655 }
656}
657
/// A view into a vacant entry of an observable hash map.
pub struct VacantEntry<'a, K, V, Codec = remoc::codec::Default> {
    /// The underlying standard library entry.
    inner: std::collections::hash_map::VacantEntry<'a, K, V>,
    /// Broadcast sender for change events.
    tx: &'a rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Sender for local change notifications.
    change: &'a ChangeSender,
    /// Handler passed to `send_event` when an event is sent.
    on_err: &'a dyn Fn(SendError),
}
665
666impl<'a, K, V, Codec> fmt::Debug for VacantEntry<'a, K, V, Codec>
667where
668 K: fmt::Debug,
669{
670 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
671 self.inner.fmt(f)
672 }
673}
674
675impl<'a, K, V, Codec> VacantEntry<'a, K, V, Codec>
676where
677 K: Clone + RemoteSend,
678 V: Clone + RemoteSend,
679 Codec: remoc::codec::Codec,
680{
681 pub fn key(&self) -> &K {
683 self.inner.key()
684 }
685
686 pub fn into_key(self) -> K {
688 self.inner.into_key()
689 }
690
691 pub fn insert(self, value: V) -> RefMut<'a, K, V, Codec> {
695 let key = self.inner.key().clone();
696 self.change.notify();
697 send_event(self.tx, &*self.on_err, HashMapEvent::Set(key.clone(), value.clone()));
698 let value = self.inner.insert(value);
699 RefMut { key, value, changed: false, tx: self.tx, change: self.change, on_err: &*self.on_err }
700 }
701}
702
/// State of a mirrored hash map, shared between the mirror handle and its background task.
struct MirroredHashMapInner<K, V> {
    /// The local copy of the observed map.
    hm: HashMap<K, V>,
    /// Whether the initial contents have been completely received.
    complete: bool,
    /// Whether a `Done` event has been received.
    done: bool,
    /// The error that stopped mirroring, if any.
    error: Option<RecvError>,
    /// Maximum number of entries permitted in `hm` before an error is raised.
    max_size: usize,
}
710
711impl<K, V> MirroredHashMapInner<K, V>
712where
713 K: Eq + Hash,
714{
715 fn handle_event(&mut self, event: HashMapEvent<K, V>) -> Result<(), RecvError> {
716 match event {
717 HashMapEvent::InitialComplete => {
718 self.complete = true;
719 }
720 HashMapEvent::Set(k, v) => {
721 self.hm.insert(k, v);
722 if self.hm.len() > self.max_size {
723 return Err(RecvError::MaxSizeExceeded(self.max_size));
724 }
725 }
726 HashMapEvent::Remove(k) => {
727 self.hm.remove(&k);
728 }
729 HashMapEvent::Clear => {
730 self.hm.clear();
731 }
732 HashMapEvent::ShrinkToFit => {
733 self.hm.shrink_to_fit();
734 }
735 HashMapEvent::Done => {
736 self.done = true;
737 }
738 }
739 Ok(())
740 }
741}
742
/// Initial contents handed to a subscriber, either in one piece or streamed entry by entry.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
enum HashMapInitialValue<K, V, Codec = remoc::codec::Default> {
    /// The complete map, provided as a single value.
    Value(HashMap<K, V>),
    /// The map is delivered incrementally over a channel.
    Incremental {
        /// Number of entries still expected from `rx`.
        len: usize,
        /// Channel over which the initial entries arrive.
        rx: rch::mpsc::Receiver<(K, V), Codec>,
    },
}
758
759impl<K, V, Codec> HashMapInitialValue<K, V, Codec>
760where
761 K: RemoteSend + Eq + Hash + Clone,
762 V: RemoteSend + Clone,
763 Codec: remoc::codec::Codec,
764{
765 fn new_value(hm: HashMap<K, V>) -> Self {
767 Self::Value(hm)
768 }
769
770 fn new_incremental(hm: HashMap<K, V>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
772 let (tx, rx) = rch::mpsc::channel(128);
773 let len = hm.len();
774
775 tokio::spawn(async move {
776 for (k, v) in hm.into_iter() {
777 match tx.send((k, v)).await {
778 Ok(()) => (),
779 Err(err) if err.is_disconnected() => break,
780 Err(err) => match err.try_into() {
781 Ok(err) => (on_err)(err),
782 Err(_) => unreachable!(),
783 },
784 }
785 }
786 });
787
788 Self::Incremental { len, rx }
789 }
790}
791
/// A subscription to the contents and change events of an observable hash map.
///
/// It is serializable; the `complete` and `done` flags are skipped by serde
/// and take their default values on deserialization.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "K: RemoteSend + Eq + Hash, V: RemoteSend, Codec: remoc::codec::Codec"))]
pub struct HashMapSubscription<K, V, Codec = remoc::codec::Default> {
    /// Initial contents of the observed map.
    initial: HashMapInitialValue<K, V, Codec>,
    /// Whether the initial contents have been completely consumed.
    #[serde(skip, default)]
    complete: bool,
    /// Receiver for change events; `None` when the observed map is done.
    events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
    /// Whether the `Done` event has already been handed out by `recv`.
    #[serde(skip, default)]
    done: bool,
}
818
819impl<K, V, Codec> HashMapSubscription<K, V, Codec>
820where
821 K: RemoteSend + Eq + Hash + Clone,
822 V: RemoteSend + Clone,
823 Codec: remoc::codec::Codec,
824{
825 fn new(
826 initial: HashMapInitialValue<K, V, Codec>,
827 events: Option<rch::broadcast::Receiver<HashMapEvent<K, V>, Codec>>,
828 ) -> Self {
829 Self { initial, complete: false, events, done: false }
830 }
831
832 pub fn is_incremental(&self) -> bool {
834 matches!(self.initial, HashMapInitialValue::Incremental { .. })
835 }
836
837 pub fn is_complete(&self) -> bool {
841 self.complete
842 }
843
844 pub fn is_done(&self) -> bool {
847 self.events.is_none() || self.done
848 }
849
850 pub fn take_initial(&mut self) -> Option<HashMap<K, V>> {
859 match &mut self.initial {
860 HashMapInitialValue::Value(value) if !self.complete => {
861 self.complete = true;
862 Some(take(value))
863 }
864 _ => None,
865 }
866 }
867
868 pub async fn recv(&mut self) -> Result<Option<HashMapEvent<K, V>>, RecvError> {
874 if !self.complete {
876 match &mut self.initial {
877 HashMapInitialValue::Incremental { len, rx } => {
878 if *len > 0 {
879 match rx.recv().await? {
880 Some((k, v)) => {
881 *len -= 1;
883 return Ok(Some(HashMapEvent::Set(k, v)));
884 }
885 None => return Err(RecvError::Closed),
886 }
887 } else {
888 self.complete = true;
890 return Ok(Some(HashMapEvent::InitialComplete));
891 }
892 }
893 HashMapInitialValue::Value(_) => {
894 panic!("take_initial must be called before recv for non-incremental subscription");
895 }
896 }
897 }
898
899 if let Some(rx) = &mut self.events {
901 match rx.recv().await? {
902 HashMapEvent::Done => self.events = None,
903 evt => return Ok(Some(evt)),
904 }
905 }
906
907 if self.done {
909 Ok(None)
910 } else {
911 self.done = true;
912 Ok(Some(HashMapEvent::Done))
913 }
914 }
915}
916
917impl<K, V, Codec> HashMapSubscription<K, V, Codec>
918where
919 K: RemoteSend + Eq + Hash + Clone + Sync,
920 V: RemoteSend + Clone + Sync,
921 Codec: remoc::codec::Codec,
922{
923 pub fn mirror(mut self, max_size: usize) -> MirroredHashMap<K, V, Codec> {
929 let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
930 let (changed_tx, changed_rx) = watch::channel(());
931 let (dropped_tx, mut dropped_rx) = oneshot::channel();
932
933 let inner = Arc::new(RwLock::new(Some(MirroredHashMapInner {
935 hm: self.take_initial().unwrap_or_default(),
936 complete: self.is_complete(),
937 done: self.is_done(),
938 error: None,
939 max_size,
940 })));
941 let inner_task = inner.clone();
942
943 let tx_send = tx.clone();
945 tokio::spawn(async move {
946 loop {
947 let event = tokio::select! {
948 event = self.recv() => event,
949 _ = &mut dropped_rx => return,
950 };
951
952 let mut inner = inner_task.write().await;
953 let mut inner = match inner.as_mut() {
954 Some(inner) => inner,
955 None => return,
956 };
957
958 changed_tx.send_replace(());
959
960 match event {
961 Ok(Some(event)) => {
962 if tx_send.receiver_count() > 0 {
963 let _ = tx_send.send(event.clone());
964 }
965
966 if let Err(err) = inner.handle_event(event) {
967 inner.error = Some(err);
968 return;
969 }
970
971 if inner.done {
972 break;
973 }
974 }
975 Ok(None) => break,
976 Err(err) => {
977 inner.error = Some(err);
978 return;
979 }
980 }
981 }
982 });
983
984 MirroredHashMap { inner, tx, changed_rx, _dropped_tx: dropped_tx }
985 }
986}
987
/// A hash map that is kept in sync with an observed hash map by a background task.
pub struct MirroredHashMap<K, V, Codec = remoc::codec::Default> {
    /// State shared with the background task; `None` once `detach` has taken it.
    inner: Arc<RwLock<Option<MirroredHashMapInner<K, V>>>>,
    /// Broadcast sender over which the background task re-publishes received events.
    tx: rch::broadcast::Sender<HashMapEvent<K, V>, Codec>,
    /// Watch receiver that is signalled whenever the background task has received something.
    changed_rx: watch::Receiver<()>,
    /// Dropping this sender makes the background task stop.
    _dropped_tx: oneshot::Sender<()>,
}
995
996impl<K, V, Codec> fmt::Debug for MirroredHashMap<K, V, Codec> {
997 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
998 f.debug_struct("MirroredHashMap").finish()
999 }
1000}
1001
1002impl<K, V, Codec> MirroredHashMap<K, V, Codec>
1003where
1004 K: RemoteSend + Eq + Hash + Clone,
1005 V: RemoteSend + Clone,
1006 Codec: remoc::codec::Codec,
1007{
1008 pub async fn borrow(&self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1018 let inner = self.inner.read().await;
1019 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1020 match &inner.error {
1021 None => Ok(MirroredHashMapRef(inner)),
1022 Some(err) => Err(err.clone()),
1023 }
1024 }
1025
1026 pub async fn borrow_and_update(&mut self) -> Result<MirroredHashMapRef<'_, K, V>, RecvError> {
1039 let inner = self.inner.read().await;
1040 self.changed_rx.borrow_and_update();
1041 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
1042 match &inner.error {
1043 None => Ok(MirroredHashMapRef(inner)),
1044 Some(err) => Err(err.clone()),
1045 }
1046 }
1047
1048 pub async fn detach(self) -> HashMap<K, V> {
1050 let mut inner = self.inner.write().await;
1051 inner.take().unwrap().hm
1052 }
1053
1054 pub async fn changed(&mut self) {
1059 let _ = self.changed_rx.changed().await;
1060 }
1061
1062 pub async fn subscribe(&self, buffer: usize) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1069 let view = self.borrow().await?;
1070 let initial = view.clone();
1071 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1072
1073 Ok(HashMapSubscription::new(HashMapInitialValue::new_value(initial), events))
1074 }
1075
1076 pub async fn subscribe_incremental(
1084 &self, buffer: usize,
1085 ) -> Result<HashMapSubscription<K, V, Codec>, RecvError> {
1086 let view = self.borrow().await?;
1087 let initial = view.clone();
1088 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1089
1090 Ok(HashMapSubscription::new(
1091 HashMapInitialValue::new_incremental(initial, Arc::new(default_on_err)),
1092 events,
1093 ))
1094 }
1095}
1096
/// Intentionally empty `Drop` implementation.
impl<K, V, Codec> Drop for MirroredHashMap<K, V, Codec> {
    fn drop(&mut self) {
        // Nothing to do here: the fields, including `_dropped_tx`, are dropped
        // automatically afterwards.
        // NOTE(review): presumably this impl exists so that the struct cannot be
        // destructured by moving fields out of it; confirm the intent.
    }
}
1102
/// A read-only view of a mirrored hash map, backed by a mapped read lock guard on its state.
pub struct MirroredHashMapRef<'a, K, V>(RwLockReadGuard<'a, MirroredHashMapInner<K, V>>);
1105
1106impl<'a, K, V> MirroredHashMapRef<'a, K, V> {
1107 pub fn is_complete(&self) -> bool {
1110 self.0.complete
1111 }
1112
1113 pub fn is_done(&self) -> bool {
1116 self.0.done
1117 }
1118}
1119
1120impl<'a, K, V> fmt::Debug for MirroredHashMapRef<'a, K, V>
1121where
1122 K: fmt::Debug,
1123 V: fmt::Debug,
1124{
1125 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1126 self.0.hm.fmt(f)
1127 }
1128}
1129
1130impl<'a, K, V> Deref for MirroredHashMapRef<'a, K, V> {
1131 type Target = HashMap<K, V>;
1132
1133 fn deref(&self) -> &Self::Target {
1134 &self.0.hm
1135 }
1136}
1137
/// Intentionally empty `Drop` implementation.
impl<'a, K, V> Drop for MirroredHashMapRef<'a, K, V> {
    fn drop(&mut self) {
        // Nothing to do here: the wrapped read guard is released automatically
        // when the field is dropped afterwards.
        // NOTE(review): presumably this impl exists so that the guard cannot be
        // moved out of the wrapper; confirm the intent.
    }
}