1use remoc::prelude::*;
28use serde::{Deserialize, Serialize};
29use std::{
30 collections::HashSet,
31 fmt,
32 iter::{Enumerate, FusedIterator},
33 mem::take,
34 ops::{Deref, DerefMut},
35 sync::Arc,
36};
37use tokio::sync::{oneshot, watch, RwLock, RwLockReadGuard};
38
39use crate::{default_on_err, send_event, ChangeNotifier, ChangeSender, RecvError, SendError};
40
/// A change event for an observable vector.
///
/// Each variant describes one modification; applying the events in order to a
/// copy of the vector reproduces the state of the observed vector.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum VecEvent<T> {
    /// The incremental initial value has been delivered completely.
    ///
    /// This variant is skipped by serde; it is produced locally by
    /// `VecSubscription::recv`.
    #[serde(skip)]
    InitialComplete,
    /// An element was appended to the end.
    Push(T),
    /// The last element was removed.
    Pop,
    /// An element was inserted at the given index.
    Insert(usize, T),
    /// The element at the given index was replaced.
    Set(usize, T),
    /// The element at the given index was removed, shifting later elements.
    Remove(usize),
    /// The element at the given index was removed by swapping in the last element.
    SwapRemove(usize),
    /// All elements were overwritten with the given value.
    Fill(T),
    /// The vector was resized to the given length, using the value for new elements.
    Resize(usize, T),
    /// The vector was truncated to the given length.
    Truncate(usize),
    /// Only the elements at the given positions were retained.
    Retain(HashSet<usize>),
    /// The elements at the given positions were removed.
    RetainNot(HashSet<usize>),
    /// All elements were removed.
    Clear,
    /// The capacity of the vector was shrunk as much as possible.
    ShrinkToFit,
    /// The observed vector has been marked as done; no further events follow.
    Done,
}
78
/// A vector that broadcasts a [`VecEvent`] for every modification made through it.
pub struct ObservableVec<T, Codec = remoc::codec::Default> {
    /// The wrapped vector holding the current contents.
    v: Vec<T>,
    /// Broadcast sender over which change events are published to subscribers.
    tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
    /// Local change notification sender; `notify` is called on modifications.
    change: ChangeSender,
    /// Handler passed to `send_event` alongside `tx`
    /// (presumably invoked when sending an event fails — confirm in `send_event`).
    on_err: Arc<dyn Fn(SendError) + Send + Sync>,
    /// Set once `done` has been called; mutating methods panic afterwards.
    done: bool,
}
96
97impl<T, Codec> fmt::Debug for ObservableVec<T, Codec>
98where
99 T: fmt::Debug,
100{
101 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
102 self.v.fmt(f)
103 }
104}
105
106impl<T, Codec> From<Vec<T>> for ObservableVec<T, Codec>
107where
108 T: Clone + RemoteSend,
109 Codec: remoc::codec::Codec,
110{
111 fn from(hs: Vec<T>) -> Self {
112 let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
113 Self { v: hs, tx, change: ChangeSender::new(), on_err: Arc::new(default_on_err), done: false }
114 }
115}
116
117impl<T, Codec> From<ObservableVec<T, Codec>> for Vec<T> {
118 fn from(ohs: ObservableVec<T, Codec>) -> Self {
119 ohs.v
120 }
121}
122
123impl<T, Codec> Default for ObservableVec<T, Codec>
124where
125 T: Clone + RemoteSend,
126 Codec: remoc::codec::Codec,
127{
128 fn default() -> Self {
129 Self::from(Vec::new())
130 }
131}
132
133impl<T, Codec> ObservableVec<T, Codec>
134where
135 T: Clone + RemoteSend,
136 Codec: remoc::codec::Codec,
137{
138 pub fn new() -> Self {
140 Self::default()
141 }
142
143 pub fn set_error_handler<E>(&mut self, on_err: E)
146 where
147 E: Fn(SendError) + Send + Sync + 'static,
148 {
149 self.on_err = Arc::new(on_err);
150 }
151
152 pub fn subscribe(&self, buffer: usize) -> VecSubscription<T, Codec> {
159 VecSubscription::new(
160 VecInitialValue::new_value(self.v.clone()),
161 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
162 )
163 }
164
165 pub fn subscribe_incremental(&self, buffer: usize) -> VecSubscription<T, Codec> {
173 VecSubscription::new(
174 VecInitialValue::new_incremental(self.v.clone(), self.on_err.clone()),
175 if self.done { None } else { Some(self.tx.subscribe(buffer)) },
176 )
177 }
178
179 pub fn subscriber_count(&self) -> usize {
181 self.tx.receiver_count()
182 }
183
184 pub fn notifier(&self) -> ChangeNotifier {
187 self.change.subscribe()
188 }
189
190 pub fn push(&mut self, value: T) {
197 self.assert_not_done();
198 self.change.notify();
199
200 send_event(&self.tx, &*self.on_err, VecEvent::Push(value.clone()));
201 self.v.push(value);
202 }
203
204 pub fn pop(&mut self) -> Option<T> {
211 self.assert_not_done();
212
213 match self.v.pop() {
214 Some(value) => {
215 self.change.notify();
216 send_event(&self.tx, &*self.on_err, VecEvent::Pop);
217 Some(value)
218 }
219 None => None,
220 }
221 }
222
223 pub fn get_mut(&mut self, index: usize) -> Option<RefMut<T, Codec>> {
230 self.assert_not_done();
231
232 match self.v.get_mut(index) {
233 Some(value) => Some(RefMut {
234 index,
235 value,
236 changed: false,
237 tx: &self.tx,
238 change: &self.change,
239 on_err: &*self.on_err,
240 }),
241 None => None,
242 }
243 }
244
245 pub fn iter_mut(&mut self) -> IterMut<T, Codec> {
252 self.assert_not_done();
253
254 IterMut {
255 inner: self.v.iter_mut().enumerate(),
256 tx: &self.tx,
257 change: &self.change,
258 on_err: &*self.on_err,
259 }
260 }
261
262 pub fn insert(&mut self, index: usize, value: T) {
269 self.assert_not_done();
270
271 let value_event = value.clone();
272 self.v.insert(index, value);
273 self.change.notify();
274 send_event(&self.tx, &*self.on_err, VecEvent::Insert(index, value_event));
275 }
276
277 pub fn remove(&mut self, index: usize) -> T {
286 self.assert_not_done();
287
288 let value = self.v.remove(index);
289 self.change.notify();
290 send_event(&self.tx, &*self.on_err, VecEvent::Remove(index));
291 value
292 }
293
294 pub fn swap_remove(&mut self, index: usize) -> T {
303 self.assert_not_done();
304
305 let value = self.v.swap_remove(index);
306 self.change.notify();
307 send_event(&self.tx, &*self.on_err, VecEvent::SwapRemove(index));
308 value
309 }
310
311 pub fn fill(&mut self, value: T) {
318 self.assert_not_done();
319
320 self.change.notify();
321 send_event(&self.tx, &*self.on_err, VecEvent::Fill(value.clone()));
322 self.v.fill(value);
323 }
324
325 pub fn resize(&mut self, new_len: usize, value: T) {
333 self.assert_not_done();
334
335 if new_len != self.v.len() {
336 self.change.notify();
337 send_event(&self.tx, &*self.on_err, VecEvent::Resize(new_len, value.clone()));
338 self.v.resize(new_len, value);
339 }
340 }
341
342 pub fn truncate(&mut self, new_len: usize) {
351 self.assert_not_done();
352
353 if new_len < self.len() {
354 self.change.notify();
355 send_event(&self.tx, &*self.on_err, VecEvent::Truncate(new_len));
356 self.v.truncate(new_len);
357 }
358 }
359
360 pub fn clear(&mut self) {
367 self.assert_not_done();
368
369 if !self.v.is_empty() {
370 self.v.clear();
371 self.change.notify();
372 send_event(&self.tx, &*self.on_err, VecEvent::Clear);
373 }
374 }
375
376 pub fn retain<F>(&mut self, mut f: F)
383 where
384 F: FnMut(&T) -> bool,
385 {
386 self.assert_not_done();
387
388 let mut keep = HashSet::new();
389 let mut remove = HashSet::new();
390 let mut pos = 0;
391
392 self.v.retain(|v| {
393 let keep_this = f(v);
394
395 if keep_this {
396 keep.insert(pos);
397 } else {
398 remove.insert(pos);
399 }
400 pos += 1;
401
402 keep_this
403 });
404
405 if !remove.is_empty() {
406 self.change.notify();
407 if keep.len() < remove.len() {
408 send_event(&self.tx, &*self.on_err, VecEvent::Retain(keep));
409 } else {
410 send_event(&self.tx, &*self.on_err, VecEvent::RetainNot(remove));
411 }
412 }
413 }
414
415 pub fn shrink_to_fit(&mut self) {
422 self.assert_not_done();
423 send_event(&self.tx, &*self.on_err, VecEvent::ShrinkToFit);
424 self.v.shrink_to_fit()
425 }
426
427 fn assert_not_done(&self) {
429 if self.done {
430 panic!("observable vector cannot be changed after done has been called");
431 }
432 }
433
434 pub fn done(&mut self) {
440 if !self.done {
441 send_event(&self.tx, &*self.on_err, VecEvent::Done);
442 self.done = true;
443 }
444 }
445
446 pub fn is_done(&self) -> bool {
451 self.done
452 }
453
454 pub fn into_inner(self) -> Vec<T> {
459 self.into()
460 }
461}
462
463impl<T, Codec> Deref for ObservableVec<T, Codec> {
464 type Target = Vec<T>;
465
466 fn deref(&self) -> &Self::Target {
467 &self.v
468 }
469}
470
471impl<T, Codec> Extend<T> for ObservableVec<T, Codec>
472where
473 T: RemoteSend + Clone,
474 Codec: remoc::codec::Codec,
475{
476 fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
477 for value in iter {
478 self.push(value);
479 }
480 }
481}
482
/// A tracked mutable reference to an element of an observable vector.
///
/// When dropped after having been mutably dereferenced, a [`VecEvent::Set`]
/// with the current value is sent.
pub struct RefMut<'a, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
    /// Index of the referenced element within the vector.
    index: usize,
    /// The referenced element.
    value: &'a mut T,
    /// Whether the element has been mutably dereferenced.
    changed: bool,
    /// Broadcast sender used to publish the `Set` event on drop.
    tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
    /// Change sender that is notified on drop if the element was changed.
    change: &'a ChangeSender,
    /// Handler passed to `send_event` on drop.
    on_err: &'a dyn Fn(SendError),
}
499
500impl<'a, T, Codec> Deref for RefMut<'a, T, Codec>
501where
502 T: Clone + RemoteSend,
503 Codec: remoc::codec::Codec,
504{
505 type Target = T;
506
507 fn deref(&self) -> &Self::Target {
508 self.value
509 }
510}
511
512impl<'a, T, Codec> DerefMut for RefMut<'a, T, Codec>
513where
514 T: Clone + RemoteSend,
515 Codec: remoc::codec::Codec,
516{
517 fn deref_mut(&mut self) -> &mut Self::Target {
518 self.changed = true;
519 self.value
520 }
521}
522
523impl<'a, T, Codec> Drop for RefMut<'a, T, Codec>
524where
525 T: Clone + RemoteSend,
526 Codec: remoc::codec::Codec,
527{
528 fn drop(&mut self) {
529 if self.changed {
530 self.change.notify();
531 send_event(self.tx, self.on_err, VecEvent::Set(self.index, self.value.clone()));
532 }
533 }
534}
535
/// A mutable iterator over the elements of an observable vector.
///
/// Yields [`RefMut`] guards, so changes made through them are reported.
pub struct IterMut<'a, T, Codec> {
    /// Underlying slice iterator paired with each element's index.
    inner: Enumerate<std::slice::IterMut<'a, T>>,
    /// Broadcast sender handed to every yielded `RefMut`.
    tx: &'a rch::broadcast::Sender<VecEvent<T>, Codec>,
    /// Change sender handed to every yielded `RefMut`.
    change: &'a ChangeSender,
    /// Handler handed to every yielded `RefMut`.
    on_err: &'a dyn Fn(SendError),
}
545
546impl<'a, T, Codec> Iterator for IterMut<'a, T, Codec>
547where
548 T: Clone + RemoteSend,
549 Codec: remoc::codec::Codec,
550{
551 type Item = RefMut<'a, T, Codec>;
552
553 fn next(&mut self) -> Option<Self::Item> {
554 match self.inner.next() {
555 Some((index, value)) => Some(RefMut {
556 index,
557 value,
558 changed: false,
559 tx: self.tx,
560 change: self.change,
561 on_err: self.on_err,
562 }),
563 None => None,
564 }
565 }
566
567 fn size_hint(&self) -> (usize, Option<usize>) {
568 self.inner.size_hint()
569 }
570}
571
572impl<'a, T, Codec> ExactSizeIterator for IterMut<'a, T, Codec>
573where
574 T: Clone + RemoteSend,
575 Codec: remoc::codec::Codec,
576{
577 fn len(&self) -> usize {
578 self.inner.len()
579 }
580}
581
582impl<'a, T, Codec> DoubleEndedIterator for IterMut<'a, T, Codec>
583where
584 T: Clone + RemoteSend,
585 Codec: remoc::codec::Codec,
586{
587 fn next_back(&mut self) -> Option<Self::Item> {
588 match self.inner.next_back() {
589 Some((index, value)) => Some(RefMut {
590 index,
591 value,
592 changed: false,
593 tx: self.tx,
594 change: self.change,
595 on_err: self.on_err,
596 }),
597 None => None,
598 }
599 }
600}
601
// Marker impl: `next` delegates directly to the wrapped
// `Enumerate<std::slice::IterMut>` and adds no state of its own.
impl<'a, T, Codec> FusedIterator for IterMut<'a, T, Codec>
where
    T: Clone + RemoteSend,
    Codec: remoc::codec::Codec,
{
}
608
/// State shared between a `MirroredVec` and its background receive task.
struct MirroredVecInner<T> {
    /// Current mirrored contents.
    v: Vec<T>,
    /// Whether the initial value has been received completely.
    complete: bool,
    /// Whether a `Done` event has been applied.
    done: bool,
    /// Error that stopped the mirroring task, if any.
    error: Option<RecvError>,
    /// Maximum allowed number of elements in `v`.
    max_size: usize,
}
616
617impl<T> MirroredVecInner<T>
618where
619 T: Clone,
620{
621 fn handle_event(&mut self, event: VecEvent<T>) -> Result<(), RecvError> {
622 match event {
623 VecEvent::InitialComplete => {
624 self.complete = true;
625 }
626 VecEvent::Push(v) => {
627 self.v.push(v);
628 if self.v.len() > self.max_size {
629 return Err(RecvError::MaxSizeExceeded(self.max_size));
630 }
631 }
632 VecEvent::Pop => {
633 self.v.pop();
634 }
635 VecEvent::Insert(i, v) => {
636 if i > self.v.len() {
637 return Err(RecvError::InvalidIndex(i));
638 }
639 self.v.insert(i, v);
640 }
641 VecEvent::Set(i, v) => {
642 if i >= self.v.len() {
643 return Err(RecvError::InvalidIndex(i));
644 }
645 self.v[i] = v;
646 }
647 VecEvent::Remove(i) => {
648 if i >= self.v.len() {
649 return Err(RecvError::InvalidIndex(i));
650 }
651 self.v.remove(i);
652 }
653 VecEvent::SwapRemove(i) => {
654 if i >= self.v.len() {
655 return Err(RecvError::InvalidIndex(i));
656 }
657 self.v.swap_remove(i);
658 }
659 VecEvent::Fill(v) => {
660 self.v.fill(v);
661 }
662 VecEvent::Resize(l, v) => {
663 self.v.resize(l, v);
664 }
665 VecEvent::Truncate(l) => {
666 self.v.truncate(l);
667 }
668 VecEvent::Retain(r) => {
669 let mut pos = 0;
670 self.v.retain(|_| {
671 let keep_this = r.contains(&pos);
672 pos += 1;
673 keep_this
674 });
675 }
676 VecEvent::RetainNot(nr) => {
677 let mut pos = 0;
678 self.v.retain(|_| {
679 let keep_this = !nr.contains(&pos);
680 pos += 1;
681 keep_this
682 });
683 }
684 VecEvent::Clear => {
685 self.v.clear();
686 }
687 VecEvent::ShrinkToFit => {
688 self.v.shrink_to_fit();
689 }
690 VecEvent::Done => {
691 self.done = true;
692 }
693 }
694 Ok(())
695 }
696}
697
/// The initial contents handed to a subscriber of an observable vector.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
enum VecInitialValue<T, Codec = remoc::codec::Default> {
    /// The complete initial contents as a single vector.
    Value(Vec<T>),
    /// The initial contents delivered element by element over a channel.
    Incremental {
        /// Number of elements that still have to be received.
        len: usize,
        /// Channel over which the elements arrive.
        rx: rch::mpsc::Receiver<T, Codec>,
    },
}
713
714impl<T, Codec> VecInitialValue<T, Codec>
715where
716 T: RemoteSend + Clone,
717 Codec: remoc::codec::Codec,
718{
719 fn new_value(hs: Vec<T>) -> Self {
721 Self::Value(hs)
722 }
723
724 fn new_incremental(hs: Vec<T>, on_err: Arc<dyn Fn(SendError) + Send + Sync>) -> Self {
726 let (tx, rx) = rch::mpsc::channel(128);
727 let len = hs.len();
728
729 tokio::spawn(async move {
730 for v in hs.into_iter() {
731 match tx.send(v).await {
732 Ok(()) => (),
733 Err(err) if err.is_disconnected() => break,
734 Err(err) => match err.try_into() {
735 Ok(err) => (on_err)(err),
736 Err(_) => unreachable!(),
737 },
738 }
739 }
740 });
741
742 Self::Incremental { len, rx }
743 }
744}
745
/// A subscription to an observable vector: its initial contents plus the
/// stream of subsequent change events.
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
pub struct VecSubscription<T, Codec = remoc::codec::Default> {
    /// Initial contents of the observed vector.
    initial: VecInitialValue<T, Codec>,
    /// Whether the initial value has been taken or received completely.
    /// Not serialized; defaults to `false` on deserialization.
    #[serde(skip, default)]
    complete: bool,
    /// Receiver for change events; `None` when no further events will be received.
    events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
    /// Whether the final `Done` event has been returned by `recv`.
    /// Not serialized; defaults to `false` on deserialization.
    #[serde(skip, default)]
    done: bool,
}
772
773impl<T, Codec> VecSubscription<T, Codec>
774where
775 T: RemoteSend + Clone,
776 Codec: remoc::codec::Codec,
777{
778 fn new(
779 initial: VecInitialValue<T, Codec>, events: Option<rch::broadcast::Receiver<VecEvent<T>, Codec>>,
780 ) -> Self {
781 Self { initial, complete: false, events, done: false }
782 }
783
784 pub fn is_incremental(&self) -> bool {
786 matches!(self.initial, VecInitialValue::Incremental { .. })
787 }
788
789 pub fn is_complete(&self) -> bool {
793 self.complete
794 }
795
796 pub fn is_done(&self) -> bool {
799 self.events.is_none() || self.done
800 }
801
802 pub fn take_initial(&mut self) -> Option<Vec<T>> {
811 match &mut self.initial {
812 VecInitialValue::Value(value) if !self.complete => {
813 self.complete = true;
814 Some(take(value))
815 }
816 _ => None,
817 }
818 }
819
820 pub async fn recv(&mut self) -> Result<Option<VecEvent<T>>, RecvError> {
826 if !self.complete {
828 match &mut self.initial {
829 VecInitialValue::Incremental { len, rx } => {
830 if *len > 0 {
831 match rx.recv().await? {
832 Some(v) => {
833 *len -= 1;
835 return Ok(Some(VecEvent::Push(v)));
836 }
837 None => return Err(RecvError::Closed),
838 }
839 } else {
840 self.complete = true;
842 return Ok(Some(VecEvent::InitialComplete));
843 }
844 }
845 VecInitialValue::Value(_) => {
846 panic!("take_initial must be called before recv for non-incremental subscription");
847 }
848 }
849 }
850
851 if let Some(rx) = &mut self.events {
853 match rx.recv().await? {
854 VecEvent::Done => self.events = None,
855 evt => return Ok(Some(evt)),
856 }
857 }
858
859 if self.done {
861 Ok(None)
862 } else {
863 self.done = true;
864 Ok(Some(VecEvent::Done))
865 }
866 }
867}
868
impl<T, Codec> VecSubscription<T, Codec>
where
    T: RemoteSend + Clone + Sync,
    Codec: remoc::codec::Codec,
{
    /// Consumes the subscription and mirrors the observed vector locally.
    ///
    /// A background task is spawned that receives events from this subscription
    /// and applies them to the shared state. `max_size` is stored in that state
    /// and checked by `handle_event`.
    ///
    /// The task ends when the returned [`MirroredVec`] is dropped, when the
    /// observed vector is done, when the shared state has been taken, or on the
    /// first error (which is stored for `borrow` to report).
    ///
    /// NOTE(review): the initial value taken via `take_initial` is not checked
    /// against `max_size` here — confirm whether that is intended.
    pub fn mirror(mut self, max_size: usize) -> MirroredVec<T, Codec> {
        let (tx, _rx) = rch::broadcast::channel::<_, _, rch::buffer::Default>(1);
        let (changed_tx, changed_rx) = watch::channel(());
        let (dropped_tx, mut dropped_rx) = oneshot::channel();

        let inner = Arc::new(RwLock::new(Some(MirroredVecInner {
            // Evaluated first: `take_initial` updates the completion state read below.
            v: self.take_initial().unwrap_or_default(),
            complete: self.is_complete(),
            done: self.is_done(),
            error: None,
            max_size,
        })));
        let inner_task = inner.clone();

        let tx_send = tx.clone();
        // Background task applying received events to the shared state.
        tokio::spawn(async move {
            loop {
                // Wait for the next event, or stop once the `MirroredVec` is dropped
                // (its `_dropped_tx` going away resolves `dropped_rx`).
                let event = tokio::select! {
                    event = self.recv() => event,
                    _ = &mut dropped_rx => return,
                };

                let mut inner = inner_task.write().await;
                let mut inner = match inner.as_mut() {
                    Some(inner) => inner,
                    // State was taken by `detach`; nothing left to update.
                    None => return,
                };

                // Signalled while the write lock is still held.
                changed_tx.send_replace(());

                match event {
                    Ok(Some(event)) => {
                        // Forward the event to subscribers of the mirror, if any.
                        if tx_send.receiver_count() > 0 {
                            let _ = tx_send.send(event.clone());
                        }

                        if let Err(err) = inner.handle_event(event) {
                            inner.error = Some(err);
                            return;
                        }

                        if inner.done {
                            break;
                        }
                    }
                    Ok(None) => break,
                    Err(err) => {
                        inner.error = Some(err);
                        return;
                    }
                }
            }
        });

        MirroredVec { inner, tx, changed_rx, _dropped_tx: dropped_tx }
    }
}
938
/// A local mirror of an observable vector, kept up to date by a background task.
pub struct MirroredVec<T, Codec = remoc::codec::Default> {
    /// Shared mirrored state; `None` after it has been taken by `detach`.
    inner: Arc<RwLock<Option<MirroredVecInner<T>>>>,
    /// Broadcast sender used to forward received events to subscribers of the mirror.
    tx: rch::broadcast::Sender<VecEvent<T>, Codec>,
    /// Watch receiver signalled by the background task whenever it processes an event.
    changed_rx: watch::Receiver<()>,
    /// Dropping this sender tells the background task to stop.
    _dropped_tx: oneshot::Sender<()>,
}
946
947impl<T, Codec> fmt::Debug for MirroredVec<T, Codec> {
948 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
949 f.debug_struct("MirroredVec").finish()
950 }
951}
952
953impl<T, Codec> MirroredVec<T, Codec>
954where
955 T: RemoteSend + Clone,
956 Codec: remoc::codec::Codec,
957{
958 pub async fn borrow(&self) -> Result<MirroredVecRef<'_, T>, RecvError> {
968 let inner = self.inner.read().await;
969 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
970 match &inner.error {
971 None => Ok(MirroredVecRef(inner)),
972 Some(err) => Err(err.clone()),
973 }
974 }
975
976 pub async fn borrow_and_update(&mut self) -> Result<MirroredVecRef<'_, T>, RecvError> {
989 let inner = self.inner.read().await;
990 self.changed_rx.borrow_and_update();
991 let inner = RwLockReadGuard::map(inner, |inner| inner.as_ref().unwrap());
992 match &inner.error {
993 None => Ok(MirroredVecRef(inner)),
994 Some(err) => Err(err.clone()),
995 }
996 }
997
998 pub async fn detach(self) -> Vec<T> {
1000 let mut inner = self.inner.write().await;
1001 inner.take().unwrap().v
1002 }
1003
1004 pub async fn changed(&mut self) {
1009 let _ = self.changed_rx.changed().await;
1010 }
1011
1012 pub async fn subscribe(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
1019 let view = self.borrow().await?;
1020 let initial = view.clone();
1021 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1022
1023 Ok(VecSubscription::new(VecInitialValue::new_value(initial), events))
1024 }
1025
1026 pub async fn subscribe_incremental(&self, buffer: usize) -> Result<VecSubscription<T, Codec>, RecvError> {
1034 let view = self.borrow().await?;
1035 let initial = view.clone();
1036 let events = if view.is_done() { None } else { Some(self.tx.subscribe(buffer)) };
1037
1038 Ok(VecSubscription::new(VecInitialValue::new_incremental(initial, Arc::new(default_on_err)), events))
1039 }
1040}
1041
impl<T, Codec> Drop for MirroredVec<T, Codec> {
    fn drop(&mut self) {
        // Intentionally empty.
        // NOTE(review): presumably present so that fields cannot be moved out of
        // the struct, keeping `_dropped_tx` alive until the whole value is
        // dropped — confirm.
    }
}
1047
/// A read-only borrow of a mirrored vector, holding a read lock on its state.
pub struct MirroredVecRef<'a, T>(RwLockReadGuard<'a, MirroredVecInner<T>>);
1050
1051impl<'a, T> MirroredVecRef<'a, T> {
1052 pub fn is_complete(&self) -> bool {
1055 self.0.complete
1056 }
1057
1058 pub fn is_done(&self) -> bool {
1061 self.0.done
1062 }
1063}
1064
1065impl<'a, T> fmt::Debug for MirroredVecRef<'a, T>
1066where
1067 T: fmt::Debug,
1068{
1069 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1070 self.0.v.fmt(f)
1071 }
1072}
1073
1074impl<'a, T> Deref for MirroredVecRef<'a, T> {
1075 type Target = Vec<T>;
1076
1077 fn deref(&self) -> &Self::Target {
1078 &self.0.v
1079 }
1080}
1081
impl<'a, T> Drop for MirroredVecRef<'a, T> {
    fn drop(&mut self) {
        // Intentionally empty.
        // NOTE(review): presumably present so that the wrapped read guard cannot
        // be moved out of the wrapper — confirm.
    }
}