another_rxrust/operators/
to_vec.rs

1use crate::prelude::*;
2use std::{
3  future::Future,
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6  task::{self, Waker},
7};
8
9#[derive(Clone)]
10pub struct ToVec<'a, Item>
11where
12  Item: Clone + Send + Sync,
13{
14  buffer: Arc<RwLock<Vec<Item>>>,
15  done: Arc<RwLock<bool>>,
16  err: Arc<RwLock<Option<RxError>>>,
17  waker: Arc<RwLock<Option<Waker>>>,
18  _lifetime: PhantomData<&'a ()>,
19}
20
21impl<'a, Item> ToVec<'a, Item>
22where
23  Item: Clone + Send + Sync,
24{
25  fn start(&self, source: Observable<'a, Item>) {
26    let buff_next = Arc::clone(&self.buffer);
27    let done_error = Arc::clone(&self.done);
28    let done_complete = Arc::clone(&self.done);
29    let err_error = Arc::clone(&self.err);
30    let waker_error = Arc::clone(&self.waker);
31    let waker_complete = Arc::clone(&self.waker);
32    source.subscribe(
33      move |x| buff_next.write().unwrap().push(x),
34      move |e| {
35        *err_error.write().unwrap() = Some(e);
36        *done_error.write().unwrap() = true;
37        if let Some(w) = waker_error.read().unwrap().clone() {
38          w.wake();
39        }
40      },
41      move || {
42        *done_complete.write().unwrap() = true;
43        if let Some(w) = waker_complete.read().unwrap().clone() {
44          w.wake();
45        }
46      },
47    );
48  }
49}
50
51impl<'a, Item> Future for ToVec<'a, Item>
52where
53  Item: Clone + Send + Sync,
54{
55  type Output = Result<Arc<RwLock<Vec<Item>>>, RxError>;
56
57  fn poll(
58    self: std::pin::Pin<&mut Self>,
59    cx: &mut std::task::Context<'_>,
60  ) -> std::task::Poll<Self::Output> {
61    let mut waker = self.waker.write().unwrap();
62
63    if *self.done.read().unwrap() {
64      if let Some(err) = &*self.err.read().unwrap() {
65        task::Poll::Ready(Err(err.clone()))
66      } else {
67        task::Poll::Ready(Ok(Arc::clone(&self.buffer)))
68      }
69    } else {
70      *waker = Some(cx.waker().clone());
71      task::Poll::Pending
72    }
73  }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78  Item: Clone + Send + Sync,
79{
80  pub fn to_vec(&self) -> ToVec<Item> {
81    let inst = ToVec {
82      buffer: Arc::new(RwLock::new(Vec::new())),
83      done: Arc::new(RwLock::new(false)),
84      err: Arc::new(RwLock::new(None)),
85      waker: Arc::new(RwLock::new(None)),
86      _lifetime: PhantomData,
87    };
88    inst.start(self.clone());
89    inst
90  }
91}
92
#[cfg(all(test, not(feature = "web")))]
mod test {
  use crate::prelude::*;
  use std::time;

  /// A synchronous single-item source resolves to exactly that item.
  #[tokio::test]
  async fn basic() {
    match observables::just(1).to_vec().await {
      Ok(v) => {
        println!("Ok -> {:?}", v.read().unwrap());
        assert_eq!(*v.read().unwrap(), vec![1]);
      }
      Err(e) => panic!("Err -> {:?}", e.downcast_ref::<&str>()),
    }
  }

  /// Items emitted from another thread are all collected before the
  /// future resolves.
  #[tokio::test]
  async fn thread() {
    match observables::interval(
      time::Duration::from_millis(100),
      schedulers::new_thread_scheduler(),
    )
    .take(5)
    .to_vec()
    .await
    {
      Ok(v) => {
        println!("Ok -> {:?}", v.read().unwrap());
        // `take(5)` bounds the stream, so exactly five items are expected.
        assert_eq!(v.read().unwrap().len(), 5);
      }
      Err(e) => panic!("Err -> {:?}", e.downcast_ref::<&str>()),
    }
  }

  /// An error raised mid-stream surfaces as `Err` from the future.
  #[tokio::test]
  async fn error() {
    match observables::interval(
      time::Duration::from_millis(100),
      schedulers::new_thread_scheduler(),
    )
    .take(5)
    .flat_map(|x| {
      if x == 3 {
        observables::error(RxError::from_error("ERR!"))
      } else {
        observables::just(x)
      }
    })
    .to_vec()
    .await
    {
      Ok(v) => panic!("Ok -> {:?}", v.read().unwrap()),
      Err(e) => {
        println!("Err -> {:?}", e.downcast_ref::<&str>());
        assert_eq!(e.downcast_ref::<&str>(), Some(&"ERR!"));
      }
    }
  }
}