another_rxrust/operators/
to_vec.rs1use crate::prelude::*;
2use std::{
3 future::Future,
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6 task::{self, Waker},
7};
8
9#[derive(Clone)]
10pub struct ToVec<'a, Item>
11where
12 Item: Clone + Send + Sync,
13{
14 buffer: Arc<RwLock<Vec<Item>>>,
15 done: Arc<RwLock<bool>>,
16 err: Arc<RwLock<Option<RxError>>>,
17 waker: Arc<RwLock<Option<Waker>>>,
18 _lifetime: PhantomData<&'a ()>,
19}
20
21impl<'a, Item> ToVec<'a, Item>
22where
23 Item: Clone + Send + Sync,
24{
25 fn start(&self, source: Observable<'a, Item>) {
26 let buff_next = Arc::clone(&self.buffer);
27 let done_error = Arc::clone(&self.done);
28 let done_complete = Arc::clone(&self.done);
29 let err_error = Arc::clone(&self.err);
30 let waker_error = Arc::clone(&self.waker);
31 let waker_complete = Arc::clone(&self.waker);
32 source.subscribe(
33 move |x| buff_next.write().unwrap().push(x),
34 move |e| {
35 *err_error.write().unwrap() = Some(e);
36 *done_error.write().unwrap() = true;
37 if let Some(w) = waker_error.read().unwrap().clone() {
38 w.wake();
39 }
40 },
41 move || {
42 *done_complete.write().unwrap() = true;
43 if let Some(w) = waker_complete.read().unwrap().clone() {
44 w.wake();
45 }
46 },
47 );
48 }
49}
50
51impl<'a, Item> Future for ToVec<'a, Item>
52where
53 Item: Clone + Send + Sync,
54{
55 type Output = Result<Arc<RwLock<Vec<Item>>>, RxError>;
56
57 fn poll(
58 self: std::pin::Pin<&mut Self>,
59 cx: &mut std::task::Context<'_>,
60 ) -> std::task::Poll<Self::Output> {
61 let mut waker = self.waker.write().unwrap();
62
63 if *self.done.read().unwrap() {
64 if let Some(err) = &*self.err.read().unwrap() {
65 task::Poll::Ready(Err(err.clone()))
66 } else {
67 task::Poll::Ready(Ok(Arc::clone(&self.buffer)))
68 }
69 } else {
70 *waker = Some(cx.waker().clone());
71 task::Poll::Pending
72 }
73 }
74}
75
76impl<'a, Item> Observable<'a, Item>
77where
78 Item: Clone + Send + Sync,
79{
80 pub fn to_vec(&self) -> ToVec<Item> {
81 let inst = ToVec {
82 buffer: Arc::new(RwLock::new(Vec::new())),
83 done: Arc::new(RwLock::new(false)),
84 err: Arc::new(RwLock::new(None)),
85 waker: Arc::new(RwLock::new(None)),
86 _lifetime: PhantomData,
87 };
88 inst.start(self.clone());
89 inst
90 }
91}
92
#[cfg(all(test, not(feature = "web")))]
mod test {
    use crate::prelude::*;
    use std::time;

    /// A synchronous single-item source resolves to a one-element vector.
    #[tokio::test]
    async fn basic() {
        match observables::just(1).to_vec().await {
            Ok(v) => {
                let items = v.read().unwrap();
                println!("Ok -> {:?}", items);
                assert_eq!(*items, vec![1]);
            }
            Err(e) => panic!("unexpected Err -> {:?}", e.downcast_ref::<&str>()),
        }
    }

    /// Items emitted from another thread are all collected before the future resolves.
    #[tokio::test]
    async fn thread() {
        match observables::interval(
            time::Duration::from_millis(100),
            schedulers::new_thread_scheduler(),
        )
        .take(5)
        .to_vec()
        .await
        {
            Ok(v) => {
                let items = v.read().unwrap();
                println!("Ok -> {:?}", items);
                // `take(5)` limits the stream to five items.
                assert_eq!(items.len(), 5);
            }
            Err(e) => panic!("unexpected Err -> {:?}", e.downcast_ref::<&str>()),
        }
    }

    /// An error raised mid-stream resolves the future with `Err`.
    #[tokio::test]
    async fn error() {
        match observables::interval(
            time::Duration::from_millis(100),
            schedulers::new_thread_scheduler(),
        )
        .take(5)
        .flat_map(|x| {
            if x == 3 {
                observables::error(RxError::from_error("ERR!"))
            } else {
                observables::just(x)
            }
        })
        .to_vec()
        .await
        {
            Ok(v) => panic!("expected Err, got Ok -> {:?}", v.read().unwrap()),
            Err(e) => println!("Err -> {:?}", e.downcast_ref::<&str>()),
        }
    }
}