1use futures::{future, Future, FutureExt};
23use remoc::prelude::*;
24use serde::{Deserialize, Serialize};
25use std::{
26 fmt,
27 marker::PhantomData,
28 ops::Deref,
29 sync::{
30 atomic::{AtomicUsize, Ordering},
31 Arc,
32 },
33};
34use tokio::sync::{mpsc, oneshot, watch, Mutex, OwnedMutexGuard, RwLock, RwLockReadGuard};
35
36use crate::{default_on_err, ChangeNotifier, ChangeSender, RecvError, SendError};
37
38#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40pub enum ListEvent<T> {
41 #[serde(skip)]
44 InitialComplete,
45 Push(T),
47 Done,
50}
51
52enum Req<T> {
54 Push(T),
56 Done,
58 Borrow(oneshot::Sender<OwnedMutexGuard<Vec<T>>>),
60 SetErrorHandler(Box<dyn Fn(SendError) + Send + Sync + 'static>),
62}
63
64enum DistReq<T, Codec> {
66 Subscribe(rch::mpsc::Sender<ListEvent<T>, Codec>),
68 NotifyNoSubscribers(oneshot::Sender<()>),
70}
71
72#[derive(Clone)]
76pub struct ObservableListDistributor<T, Codec = remoc::codec::Default> {
77 tx: mpsc::UnboundedSender<DistReq<T, Codec>>,
78 len: Arc<AtomicUsize>,
79 subscriber_count: Arc<AtomicUsize>,
80}
81
82impl<T, Codec> fmt::Debug for ObservableListDistributor<T, Codec> {
83 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
84 f.debug_struct("ObservableListDistributor")
85 .field("len", &self.len.load(Ordering::Relaxed))
86 .field("subscriber_count", &self.subscriber_count.load(Ordering::Relaxed))
87 .finish()
88 }
89}
90
91impl<T, Codec> ObservableListDistributor<T, Codec>
92where
93 T: RemoteSend + Clone,
94 Codec: remoc::codec::Codec,
95{
96 fn req(&self, req: DistReq<T, Codec>) {
98 if self.tx.send(req).is_err() {
99 panic!("observable list task was terminated");
100 }
101 }
102
103 pub fn subscribe(&self) -> ListSubscription<T, Codec> {
105 let (tx, rx) = rch::mpsc::channel(1);
106 let _ = self.tx.send(DistReq::Subscribe(tx));
107 ListSubscription::new(self.len.load(Ordering::Relaxed), rx)
108 }
109
110 pub fn subscriber_count(&self) -> usize {
112 self.subscriber_count.load(Ordering::Relaxed)
113 }
114
115 pub fn closed(&self) -> impl Future<Output = ()> {
121 let (tx, rx) = oneshot::channel();
122 self.req(DistReq::NotifyNoSubscribers(tx));
123 async move {
124 let _ = rx.await;
125 }
126 }
127
128 pub fn is_closed(&self) -> bool {
130 self.subscriber_count() == 0
131 }
132}
133
134pub struct ObservableList<T, Codec = remoc::codec::Default> {
142 tx: mpsc::UnboundedSender<Req<T>>,
143 change: ChangeSender,
144 len: Arc<AtomicUsize>,
145 done: bool,
146 dist: ObservableListDistributor<T, Codec>,
147}
148
149impl<T, Codec> fmt::Debug for ObservableList<T, Codec> {
150 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
151 f.debug_struct("ObservableList")
152 .field("len", &self.len.load(Ordering::Relaxed))
153 .field("done", &self.done)
154 .field("subscriber_count", &self.dist.subscriber_count.load(Ordering::Relaxed))
155 .finish()
156 }
157}
158
159impl<T, Codec> Default for ObservableList<T, Codec>
160where
161 T: RemoteSend + Clone,
162 Codec: remoc::codec::Codec,
163{
164 fn default() -> Self {
165 Self::from(Vec::new())
166 }
167}
168
169impl<T, Codec> From<Vec<T>> for ObservableList<T, Codec>
170where
171 T: RemoteSend + Clone,
172 Codec: remoc::codec::Codec,
173{
174 fn from(initial: Vec<T>) -> Self {
175 let (tx, rx) = mpsc::unbounded_channel();
176 let (sub_tx, sub_rx) = mpsc::unbounded_channel();
177 let len = Arc::new(AtomicUsize::new(initial.len()));
178 let subscriber_count = Arc::new(AtomicUsize::new(0));
179 tokio::spawn(Self::task(initial, rx, sub_rx, subscriber_count.clone()));
180 Self {
181 tx,
182 change: ChangeSender::new(),
183 len: len.clone(),
184 done: false,
185 dist: ObservableListDistributor { tx: sub_tx, len, subscriber_count },
186 }
187 }
188}
189
190impl<T, Codec> ObservableList<T, Codec>
191where
192 T: RemoteSend + Clone,
193 Codec: remoc::codec::Codec,
194{
195 pub fn new() -> Self {
197 Self::default()
198 }
199
200 fn req(&self, req: Req<T>) {
202 if self.tx.send(req).is_err() {
203 panic!("observable list task was terminated");
204 }
205 }
206
207 fn assert_not_done(&self) {
209 if self.done {
210 panic!("observable list cannot be changed after done has been called");
211 }
212 }
213
214 pub fn set_error_handler<E>(&mut self, on_err: E)
217 where
218 E: Fn(SendError) + Send + Sync + 'static,
219 {
220 self.req(Req::SetErrorHandler(Box::new(on_err)));
221 }
222
223 pub fn distributor(&self) -> ObservableListDistributor<T, Codec> {
228 self.dist.clone()
229 }
230
231 pub fn subscribe(&self) -> ListSubscription<T, Codec> {
233 self.dist.subscribe()
234 }
235
236 pub fn subscriber_count(&self) -> usize {
238 self.dist.subscriber_count()
239 }
240
241 pub fn notifier(&self) -> ChangeNotifier {
244 self.change.subscribe()
245 }
246
247 pub fn closed(&self) -> impl Future<Output = ()> {
253 self.dist.closed()
254 }
255
256 pub fn is_closed(&self) -> bool {
258 self.dist.is_closed()
259 }
260
261 pub fn push(&mut self, value: T) {
268 self.assert_not_done();
269 self.req(Req::Push(value));
270 self.len.fetch_add(1, Ordering::Relaxed);
271 self.change.notify();
272 }
273
274 pub fn len(&self) -> usize {
276 self.len.load(Ordering::Relaxed)
277 }
278
279 pub fn is_empty(&self) -> bool {
281 self.len() == 0
282 }
283
284 pub fn done(&mut self) {
290 if !self.done {
291 self.req(Req::Done);
292 self.done = true;
293 }
294 }
295
296 pub fn is_done(&self) -> bool {
301 self.done
302 }
303
304 #[allow(clippy::needless_lifetimes)]
308 pub async fn borrow<'a>(&'a self) -> ObservableListRef<'a, T> {
309 let (tx, rx) = oneshot::channel();
310 self.req(Req::Borrow(tx));
311 ObservableListRef { buffer: rx.await.unwrap(), _phantom: PhantomData }
312 }
313
314 async fn task(
316 buffer: Vec<T>, rx: mpsc::UnboundedReceiver<Req<T>>, dist_rx: mpsc::UnboundedReceiver<DistReq<T, Codec>>,
317 subscriber_count: Arc<AtomicUsize>,
318 ) {
319 struct SubState<T, Codec> {
321 pos: usize,
322 done: bool,
323 tx: rch::mpsc::Sender<ListEvent<T>, Codec>,
324 }
325
326 let buffer_shared = Arc::new(Mutex::new(buffer));
327 let mut buffer_guard_opt = None;
328 let mut rx_opt = Some(rx);
329 let mut dist_rx_opt = Some(dist_rx);
330 let mut subs: Vec<SubState<T, Codec>> = Vec::new();
331 let mut done = false;
332 let mut on_err: Box<dyn Fn(SendError) + Send + Sync + 'static> = Box::new(default_on_err);
333 let mut no_sub_notify: Vec<oneshot::Sender<()>> = Vec::new();
334
335 loop {
337 let buffer = match &mut buffer_guard_opt {
339 Some(br) => br,
340 None => {
341 buffer_guard_opt = Some(buffer_shared.clone().lock_owned().await);
342 buffer_guard_opt.as_mut().unwrap()
343 }
344 };
345
346 if rx_opt.is_none() {
349 if done {
350 subs.retain(|sub| !sub.done);
351 } else {
352 subs.retain(|sub| sub.pos < buffer.len());
353 }
354 }
355
356 subscriber_count.store(subs.len(), Ordering::Relaxed);
358 if subs.is_empty() {
359 for tx in no_sub_notify.drain(..) {
360 let _ = tx.send(());
361 }
362 }
363
364 let mut permit_tasks = Vec::new();
366 for (i, sub) in subs.iter().enumerate() {
367 if sub.pos < buffer.len() || (done && !sub.done) {
368 permit_tasks.push(async move { (i, sub.tx.reserve().await) }.boxed());
369 }
370 }
371
372 if rx_opt.is_none() && dist_rx_opt.is_none() && permit_tasks.is_empty() {
374 break;
375 }
376
377 tokio::select! {
378 biased;
379
380 req = async {
382 match &mut rx_opt {
383 Some(rx) => rx.recv().await,
384 None => future::pending().await,
385 }
386 } => match req {
387 Some(Req::Push(v)) => buffer.push(v),
388 Some(Req::Done) => done = true,
389 Some(Req::Borrow(tx)) => {
390 let _ = tx.send(buffer_guard_opt.take().unwrap());
391 }
392 Some(Req::SetErrorHandler(handler)) => on_err = handler,
393 None => rx_opt = None,
394 },
395
396 req = async {
398 match &mut dist_rx_opt {
399 Some(rx) => rx.recv().await,
400 None => future::pending().await,
401 }
402 } => match req {
403 Some(DistReq::Subscribe(tx)) => subs.push(SubState { pos: 0, done: false, tx }),
404 Some(DistReq::NotifyNoSubscribers(tx)) => no_sub_notify.push(tx),
405 None => dist_rx_opt = None,
406 },
407
408 (i, res) = async move {
410 if permit_tasks.is_empty() {
411 future::pending().await
412 } else {
413 future::select_all(permit_tasks).await.0
414 }
415 } => match res {
416 Ok(permit) => {
417 let sub = &mut subs[i];
418 if sub.pos < buffer.len() {
419 permit.send(ListEvent::Push(buffer[sub.pos].clone()));
420 sub.pos += 1;
421 } else if done && !sub.done {
422 permit.send(ListEvent::Done);
423 sub.done = true;
424 } else {
425 unreachable!()
426 }
427 }
428 Err(err) => {
429 subs.swap_remove(i);
430 if let Ok(err) = SendError::try_from(err) {
431 on_err(err);
432 }
433 }
434 },
435 }
436 }
437 }
438}
439
440impl<T, Codec> Drop for ObservableList<T, Codec> {
441 fn drop(&mut self) {
442 }
444}
445
446impl<T, Codec> Extend<T> for ObservableList<T, Codec>
447where
448 T: RemoteSend + Clone,
449 Codec: remoc::codec::Codec,
450{
451 fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
452 for value in iter {
453 self.push(value);
454 }
455 }
456}
457
458pub struct ObservableListRef<'a, T> {
462 buffer: OwnedMutexGuard<Vec<T>>,
463 _phantom: PhantomData<&'a ()>,
464}
465
466impl<'a, T> fmt::Debug for ObservableListRef<'a, T>
467where
468 T: fmt::Debug,
469{
470 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
471 self.buffer.fmt(f)
472 }
473}
474
475impl<'a, T> Deref for ObservableListRef<'a, T> {
476 type Target = Vec<T>;
477
478 fn deref(&self) -> &Self::Target {
479 &*self.buffer
480 }
481}
482
483struct MirroredListInner<T> {
484 v: Vec<T>,
485 complete: bool,
486 done: bool,
487 error: Option<RecvError>,
488 max_size: usize,
489}
490
491impl<T> MirroredListInner<T> {
492 fn handle_event(&mut self, event: ListEvent<T>) -> Result<(), RecvError> {
493 match event {
494 ListEvent::InitialComplete => {
495 self.complete = true;
496 }
497 ListEvent::Push(v) => {
498 self.v.push(v);
499 if self.v.len() > self.max_size {
500 return Err(RecvError::MaxSizeExceeded(self.max_size));
501 }
502 }
503 ListEvent::Done => {
504 self.done = true;
505 }
506 }
507 Ok(())
508 }
509}
510
511#[derive(Debug, Serialize, Deserialize)]
519#[serde(bound(serialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
520#[serde(bound(deserialize = "T: RemoteSend, Codec: remoc::codec::Codec"))]
521pub struct ListSubscription<T, Codec = remoc::codec::Default> {
522 initial_len: usize,
524 #[serde(skip, default)]
526 complete: bool,
527 events: Option<rch::mpsc::Receiver<ListEvent<T>, Codec>>,
529 len: usize,
531}
532
533impl<T, Codec> ListSubscription<T, Codec>
534where
535 T: RemoteSend + Clone,
536 Codec: remoc::codec::Codec,
537{
538 fn new(initial_len: usize, events: rch::mpsc::Receiver<ListEvent<T>, Codec>) -> Self {
539 Self { initial_len, complete: false, events: Some(events), len: 0 }
540 }
541
542 pub fn is_complete(&self) -> bool {
546 self.complete
547 }
548
549 pub fn is_done(&self) -> bool {
552 self.events.is_none()
553 }
554
555 pub async fn recv(&mut self) -> Result<Option<ListEvent<T>>, RecvError> {
557 if self.len == self.initial_len && !self.complete {
559 self.complete = true;
560 return Ok(Some(ListEvent::InitialComplete));
561 }
562
563 match &mut self.events {
565 Some(rx) => match rx.recv().await? {
566 Some(ListEvent::InitialComplete) => unreachable!(),
567 Some(evt @ ListEvent::Push(_)) => {
568 self.len += 1;
569 Ok(Some(evt))
570 }
571 Some(ListEvent::Done) => {
572 self.events = None;
573 Ok(Some(ListEvent::Done))
574 }
575 None => Err(RecvError::Closed),
576 },
577 None => Ok(None),
578 }
579 }
580
581 pub async fn recv_item(&mut self) -> Result<Option<T>, RecvError> {
586 loop {
587 match self.recv().await? {
588 Some(ListEvent::InitialComplete) => (),
589 Some(ListEvent::Push(item)) => return Ok(Some(item)),
590 Some(ListEvent::Done) => (),
591 None => return Ok(None),
592 }
593 }
594 }
595}
596
597impl<T, Codec> ListSubscription<T, Codec>
598where
599 T: RemoteSend + Clone + Sync,
600 Codec: remoc::codec::Codec,
601{
602 pub fn mirror(mut self, max_size: usize) -> MirroredList<T> {
608 let (changed_tx, changed_rx) = watch::channel(());
609 let (dropped_tx, mut dropped_rx) = mpsc::channel(1);
610
611 let inner = Arc::new(RwLock::new(MirroredListInner {
613 v: Vec::new(),
614 complete: false,
615 done: false,
616 error: None,
617 max_size,
618 }));
619 let inner_task = Arc::downgrade(&inner);
620
621 tokio::spawn(async move {
623 loop {
624 let event = tokio::select! {
625 event = self.recv() => event,
626 _ = dropped_rx.recv() => return,
627 };
628
629 let inner = match inner_task.upgrade() {
630 Some(inner) => inner,
631 None => return,
632 };
633 let mut inner = inner.write().await;
634
635 changed_tx.send_replace(());
636
637 match event {
638 Ok(Some(event)) => {
639 if let Err(err) = inner.handle_event(event) {
640 inner.error = Some(err);
641 return;
642 }
643
644 if inner.done {
645 break;
646 }
647 }
648 Ok(None) => break,
649 Err(err) => {
650 inner.error = Some(err);
651 return;
652 }
653 }
654 }
655 });
656
657 MirroredList { inner: Some(inner), changed_rx, _dropped_tx: dropped_tx }
658 }
659}
660
661#[derive(Clone)]
665pub struct MirroredList<T> {
666 inner: Option<Arc<RwLock<MirroredListInner<T>>>>,
667 changed_rx: watch::Receiver<()>,
668 _dropped_tx: mpsc::Sender<()>,
669}
670
671impl<T> fmt::Debug for MirroredList<T> {
672 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
673 f.debug_struct("MirroredList").finish()
674 }
675}
676
677impl<T> MirroredList<T>
678where
679 T: RemoteSend + Clone,
680{
681 pub async fn borrow(&self) -> Result<MirroredListRef<'_, T>, RecvError> {
691 let inner = self.inner.as_ref().unwrap().read().await;
692 match &inner.error {
693 None => Ok(MirroredListRef(inner)),
694 Some(err) => Err(err.clone()),
695 }
696 }
697
698 pub async fn borrow_and_update(&mut self) -> Result<MirroredListRef<'_, T>, RecvError> {
711 let inner = self.inner.as_ref().unwrap().read().await;
712 self.changed_rx.borrow_and_update();
713 match &inner.error {
714 None => Ok(MirroredListRef(inner)),
715 Some(err) => Err(err.clone()),
716 }
717 }
718
719 pub async fn detach(mut self) -> Vec<T> {
723 match Arc::try_unwrap(self.inner.take().unwrap()) {
724 Ok(inner) => inner.into_inner().v,
725 Err(inner) => inner.read().await.v.clone(),
726 }
727 }
728
729 pub async fn changed(&mut self) {
734 let _ = self.changed_rx.changed().await;
735 }
736
737 pub async fn done(&mut self) -> Result<(), RecvError> {
742 while !self.borrow_and_update().await?.is_done() {
743 self.changed().await;
744 }
745 Ok(())
746 }
747}
748
749impl<T> Drop for MirroredList<T> {
750 fn drop(&mut self) {
751 }
753}
754
755pub struct MirroredListRef<'a, T>(RwLockReadGuard<'a, MirroredListInner<T>>);
757
758impl<'a, T> MirroredListRef<'a, T> {
759 pub fn is_complete(&self) -> bool {
762 self.0.complete
763 }
764
765 pub fn is_done(&self) -> bool {
768 self.0.done
769 }
770}
771
772impl<'a, T> fmt::Debug for MirroredListRef<'a, T>
773where
774 T: fmt::Debug,
775{
776 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
777 self.0.v.fmt(f)
778 }
779}
780
781impl<'a, T> Deref for MirroredListRef<'a, T> {
782 type Target = Vec<T>;
783
784 fn deref(&self) -> &Self::Target {
785 &self.0.v
786 }
787}
788
789impl<'a, T> Drop for MirroredListRef<'a, T> {
790 fn drop(&mut self) {
791 }
793}