rxrust/ops/
buffer.rs

1use crate::prelude::*;
2use crate::{error_proxy_impl, is_stopped_proxy_impl};
3use std::cell::RefCell;
4use std::rc::Rc;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
/// Operator that groups the items of `source` into `Vec`s and emits a buffer
/// each time `count` items have been collected.
#[derive(Clone)]
pub struct BufferWithCountOp<S> {
  /// Upstream observable whose items are buffered.
  pub(crate) source: S,
  /// Number of buffered items that triggers an emission.
  pub(crate) count: usize,
}
13
// Implements `Observable` for a buffering operator `$ty` whose first type
// parameter `$host` is the source observable. The operator emits
// `Vec<$host::Item>` and reuses the source's error type. An optional lifetime
// and any further type parameters are passed through to the impl header.
#[doc(hidden)]
macro_rules! buffer_op_observable_impl {
  ($ty: ident, $host: ident$(, $lf: lifetime)?$(, $generics: ident) *) => {
    impl<$($lf, )? $host, $($generics ,)*> Observable
    for $ty<$($lf, )? $host, $($generics ,)*>
    where
      $host: Observable
    {
      // Items are delivered downstream in batches.
      type Item = Vec<$host::Item>;
      type Err = $host::Err;
    }
  }
}

// `BufferWithCountOp<S>` emits `Vec<S::Item>`.
buffer_op_observable_impl!(BufferWithCountOp, S);
29
30impl<'a, S> LocalObservable<'a> for BufferWithCountOp<S>
31where
32  S: LocalObservable<'a>,
33  S::Item: 'a,
34{
35  type Unsub = S::Unsub;
36
37  fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
38    self,
39    subscriber: Subscriber<O, LocalSubscription>,
40  ) -> Self::Unsub {
41    self.source.actual_subscribe(Subscriber {
42      observer: BufferWithCountObserver::new(subscriber.observer, self.count),
43      subscription: subscriber.subscription,
44    })
45  }
46}
47
48impl<S> SharedObservable for BufferWithCountOp<S>
49where
50  S: SharedObservable,
51  S::Item: Send + Sync + 'static,
52{
53  type Unsub = S::Unsub;
54
55  fn actual_subscribe<
56    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
57  >(
58    self,
59    subscriber: Subscriber<O, SharedSubscription>,
60  ) -> Self::Unsub {
61    self.source.actual_subscribe(Subscriber {
62      observer: BufferWithCountObserver::new(subscriber.observer, self.count),
63      subscription: subscriber.subscription,
64    })
65  }
66}
67
/// Observer that accumulates incoming items and forwards them to `observer`
/// as a `Vec` once `count` items are buffered.
#[derive(Clone)]
pub struct BufferWithCountObserver<O, Item> {
  /// Downstream observer receiving the buffered batches.
  observer: O,
  /// Items collected since the last emission.
  buffer: Vec<Item>,
  /// Buffer length at which a batch is emitted.
  count: usize,
}
74
75impl<O, Item> BufferWithCountObserver<O, Item> {
76  fn new(observer: O, count: usize) -> BufferWithCountObserver<O, Item> {
77    BufferWithCountObserver {
78      observer,
79      buffer: vec![],
80      count,
81    }
82  }
83}
84
85impl<O, Item, Err> Observer for BufferWithCountObserver<O, Item>
86where
87  O: Observer<Item = Vec<Item>, Err = Err>,
88{
89  type Item = Item;
90  type Err = Err;
91
92  fn next(&mut self, value: Self::Item) {
93    self.buffer.push(value);
94
95    if self.buffer.len() >= self.count {
96      let buffer = std::mem::take(&mut self.buffer);
97      self.observer.next(buffer);
98    }
99  }
100
101  fn complete(&mut self) {
102    if !self.buffer.is_empty() {
103      let buffer = std::mem::take(&mut self.buffer);
104      self.observer.next(buffer);
105    }
106
107    self.observer.complete();
108  }
109
110  error_proxy_impl!(Err, observer);
111
112  is_stopped_proxy_impl!(observer);
113}
114
/// Operator that collects the items of `source` and emits them as a `Vec`
/// from a repeating task scheduled on `scheduler` with period `time`.
#[derive(Clone)]
pub struct BufferWithTimeOp<Source, Scheduler> {
  /// Upstream observable whose items are buffered.
  pub(crate) source: Source,
  /// Period passed to the scheduler's repeating task.
  pub(crate) time: Duration,
  /// Scheduler used to run the periodic flush.
  pub(crate) scheduler: Scheduler,
}

// `BufferWithTimeOp<S, Scheduler>` emits `Vec<S::Item>`.
buffer_op_observable_impl!(BufferWithTimeOp, S, Scheduler);
123
124impl<Source, Scheduler> LocalObservable<'static>
125  for BufferWithTimeOp<Source, Scheduler>
126where
127  Source: LocalObservable<'static>,
128  Source::Item: 'static,
129  Scheduler: LocalScheduler + 'static,
130{
131  type Unsub = Source::Unsub;
132
133  fn actual_subscribe<
134    O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
135  >(
136    self,
137    subscriber: Subscriber<O, LocalSubscription>,
138  ) -> Self::Unsub {
139    self.source.actual_subscribe(Subscriber {
140      observer: BufferWithTimeObserver::new(
141        subscriber.observer,
142        self.time,
143        self.scheduler,
144      ),
145      subscription: subscriber.subscription,
146    })
147  }
148}
149
/// Single-threaded observer that buffers incoming items; a repeating
/// scheduled task emits the buffered items to `observer`.
#[derive(Clone)]
pub struct BufferWithTimeObserver<O, Item> {
  /// Downstream observer, shared with the scheduled flush task.
  observer: Rc<RefCell<O>>,
  /// Items collected since the last flush, shared with the scheduled task.
  buffer: Rc<RefCell<Vec<Item>>>,
  /// Handle of the scheduled task; unsubscribed on error and on completion.
  handle: SpawnHandle,
}
156
157impl<O, Item> BufferWithTimeObserver<O, Item>
158where
159  O: Observer<Item = Vec<Item>> + 'static,
160  Item: 'static,
161{
162  fn new<S>(
163    observer: O,
164    time: Duration,
165    scheduler: S,
166  ) -> BufferWithTimeObserver<O, Item>
167  where
168    S: LocalScheduler + 'static,
169  {
170    let observer = Rc::new(RefCell::new(observer));
171    let mut observer_c = observer.clone();
172
173    let buffer = Rc::new(RefCell::new(vec![]));
174    let buffer_c = buffer.clone();
175
176    let handle = scheduler.schedule_repeating(
177      move |_| {
178        if !buffer_c.borrow().is_empty() {
179          observer_c.next(buffer_c.take());
180        }
181      },
182      time,
183      None,
184    );
185
186    BufferWithTimeObserver {
187      observer,
188      buffer,
189      handle,
190    }
191  }
192}
193
// Expands to the `complete` method of the single-threaded time-based buffer
// observers. The arguments name the fields holding the shared buffer, the
// downstream observer and the handle of the scheduled task.
#[doc(hidden)]
macro_rules! complete_time_impl_local {
  ($buffer:tt, $observer:tt, $handle:tt) => {
    fn complete(&mut self) {
      // Flush whatever is still buffered before shutting down.
      let remaining = self.$buffer.take();
      if !remaining.is_empty() {
        self.$observer.next(remaining);
      }

      // Cancel the scheduled task, then signal completion downstream.
      self.$handle.unsubscribe();
      self.$observer.complete();
    }
  };
}
208
209impl<O, Item, Err> Observer for BufferWithTimeObserver<O, Item>
210where
211  O: Observer<Item = Vec<Item>, Err = Err>,
212{
213  type Item = Item;
214  type Err = Err;
215
216  fn next(&mut self, value: Self::Item) {
217    self.buffer.borrow_mut().push(value);
218  }
219
220  fn error(&mut self, err: Self::Err) {
221    self.handle.unsubscribe();
222    self.observer.error(err);
223  }
224
225  complete_time_impl_local!(buffer, observer, handle);
226
227  is_stopped_proxy_impl!(observer);
228}
229
230impl<Source, Scheduler> SharedObservable for BufferWithTimeOp<Source, Scheduler>
231where
232  Source: SharedObservable,
233  <Source as Observable>::Item: Send + Sync + 'static,
234  Scheduler: SharedScheduler,
235{
236  type Unsub = Source::Unsub;
237
238  fn actual_subscribe<O>(
239    self,
240    subscriber: Subscriber<O, SharedSubscription>,
241  ) -> Self::Unsub
242  where
243    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
244  {
245    self.source.actual_subscribe(Subscriber {
246      observer: BufferWithTimeObserverShared::new(
247        subscriber.observer,
248        self.time,
249        self.scheduler,
250      ),
251      subscription: subscriber.subscription,
252    })
253  }
254}
255
/// Observer for the shared variant of the time-based buffer: incoming items
/// are collected behind a mutex and a repeating scheduled task emits them to
/// `observer`.
#[derive(Clone)]
pub struct BufferWithTimeObserverShared<O, Item> {
  /// Downstream observer, shared with the scheduled flush task.
  observer: Arc<Mutex<O>>,
  /// Items collected since the last flush, shared with the scheduled task.
  buffer: Arc<Mutex<Vec<Item>>>,
  /// Handle of the scheduled task; unsubscribed on error and on completion.
  handle: SpawnHandle,
}
262
263impl<O, Item> BufferWithTimeObserverShared<O, Item>
264where
265  O: Observer<Item = Vec<Item>> + Send + Sync + 'static,
266  Item: Send + Sync + 'static,
267{
268  fn new<S>(
269    observer: O,
270    time: Duration,
271    scheduler: S,
272  ) -> BufferWithTimeObserverShared<O, Item>
273  where
274    S: SharedScheduler,
275  {
276    let observer = Arc::new(Mutex::new(observer));
277    let mut observer_c = observer.clone();
278
279    let buffer = Arc::new(Mutex::new(vec![]));
280    let buffer_c = buffer.clone();
281
282    let handle = scheduler.schedule_repeating(
283      move |_| {
284        let mut buffer = buffer_c.lock().unwrap();
285        let buffer = std::mem::take(&mut *buffer);
286        if !buffer.is_empty() {
287          observer_c.next(buffer);
288        }
289      },
290      time,
291      None,
292    );
293
294    BufferWithTimeObserverShared {
295      observer,
296      buffer,
297      handle,
298    }
299  }
300}
301
// Expands to the `complete` method of the mutex-based time buffer observers.
// The arguments name the fields holding the shared buffer, the downstream
// observer and the handle of the scheduled task.
#[doc(hidden)]
macro_rules! complete_time_impl_shared {
  ($buffer:tt, $observer:tt, $handle:tt) => {
    fn complete(&mut self) {
      // The guard is kept until the end of the method, so the buffer remains
      // locked while the final batch and the completion are delivered.
      let mut guard = self.$buffer.lock().unwrap();
      let remaining = std::mem::take(&mut *guard);

      if !remaining.is_empty() {
        self.$observer.next(remaining);
      }

      // Cancel the scheduled task, then signal completion downstream.
      self.$handle.unsubscribe();
      self.$observer.complete();
    }
  };
}
318
319impl<O, Item, Err> Observer for BufferWithTimeObserverShared<O, Item>
320where
321  O: Observer<Item = Vec<Item>, Err = Err>,
322{
323  type Item = Item;
324  type Err = Err;
325
326  fn next(&mut self, value: Self::Item) {
327    let mut buffer = self.buffer.lock().unwrap();
328    (*buffer).push(value);
329  }
330
331  fn error(&mut self, err: Self::Err) {
332    self.handle.unsubscribe();
333    self.observer.error(err);
334  }
335
336  complete_time_impl_shared!(buffer, observer, handle);
337
338  is_stopped_proxy_impl!(observer);
339}
340
/// Operator that collects the items of `source` and emits them as a `Vec`
/// either when `count` items are buffered or from a repeating task scheduled
/// on `scheduler` with period `time`.
#[derive(Clone)]
pub struct BufferWithCountOrTimerOp<Source, Scheduler> {
  /// Upstream observable whose items are buffered.
  pub(crate) source: Source,
  /// Buffer length at which a batch is emitted immediately.
  pub(crate) count: usize,
  /// Period passed to the scheduler's repeating task.
  pub(crate) time: Duration,
  /// Scheduler used to run the periodic flush.
  pub(crate) scheduler: Scheduler,
}

// `BufferWithCountOrTimerOp<S, Scheduler>` emits `Vec<S::Item>`.
buffer_op_observable_impl!(BufferWithCountOrTimerOp, S, Scheduler);
350
351impl<Source, Scheduler> LocalObservable<'static>
352  for BufferWithCountOrTimerOp<Source, Scheduler>
353where
354  Source: LocalObservable<'static>,
355  Source::Item: 'static,
356  Scheduler: LocalScheduler + 'static,
357{
358  type Unsub = Source::Unsub;
359
360  fn actual_subscribe<
361    O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
362  >(
363    self,
364    subscriber: Subscriber<O, LocalSubscription>,
365  ) -> Self::Unsub {
366    self.source.actual_subscribe(Subscriber {
367      observer: BufferWithCountOrTimerObserver::new(
368        subscriber.observer,
369        self.count,
370        self.time,
371        self.scheduler,
372      ),
373      subscription: subscriber.subscription,
374    })
375  }
376}
377
/// Single-threaded observer that buffers incoming items and emits them to
/// `observer` when `count` items are buffered; a repeating scheduled task
/// shares the same buffer and observer.
#[derive(Clone)]
pub struct BufferWithCountOrTimerObserver<O, Item> {
  /// Downstream observer, shared with the scheduled flush task.
  observer: Rc<RefCell<O>>,
  /// Items collected since the last emission, shared with the scheduled task.
  buffer: Rc<RefCell<Vec<Item>>>,
  /// Buffer length at which a batch is emitted immediately.
  count: usize,
  /// Handle of the scheduled task; unsubscribed on error and on completion.
  handle: SpawnHandle,
}
385
386impl<O, Item> BufferWithCountOrTimerObserver<O, Item> {
387  fn new<S>(observer: O, count: usize, time: Duration, scheduler: S) -> Self
388  where
389    O: Observer<Item = Vec<Item>> + 'static,
390    Item: 'static,
391    S: LocalScheduler + 'static,
392  {
393    let observer = Rc::new(RefCell::new(observer));
394    let mut observer_c = observer.clone();
395
396    let buffer = Rc::new(RefCell::new(vec![]));
397    let buffer_c = buffer.clone();
398
399    let handle = scheduler.schedule_repeating(
400      move |_| {
401        if buffer_c.borrow().is_empty() {
402          observer_c.next(buffer_c.take());
403        }
404      },
405      time,
406      None,
407    );
408
409    BufferWithCountOrTimerObserver {
410      observer,
411      buffer,
412      count,
413      handle,
414    }
415  }
416}
417
418impl<O, Item, Err> Observer for BufferWithCountOrTimerObserver<O, Item>
419where
420  O: Observer<Item = Vec<Item>, Err = Err>,
421{
422  type Item = Item;
423  type Err = Err;
424
425  fn next(&mut self, value: Self::Item) {
426    self.buffer.borrow_mut().push(value);
427
428    if self.buffer.borrow().len() >= self.count {
429      let buffer = self.buffer.take();
430      self.observer.borrow_mut().next(buffer);
431    }
432  }
433
434  fn error(&mut self, err: Self::Err) {
435    self.handle.unsubscribe();
436    self.observer.error(err);
437  }
438
439  complete_time_impl_local!(buffer, observer, handle);
440
441  is_stopped_proxy_impl!(observer);
442}
443
444impl<Source, Scheduler> SharedObservable
445  for BufferWithCountOrTimerOp<Source, Scheduler>
446where
447  Source: SharedObservable,
448  Source::Item: Send + Sync + 'static,
449  Scheduler: SharedScheduler,
450{
451  type Unsub = Source::Unsub;
452
453  fn actual_subscribe<
454    O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
455  >(
456    self,
457    subscriber: Subscriber<O, SharedSubscription>,
458  ) -> Self::Unsub {
459    self.source.actual_subscribe(Subscriber {
460      observer: BufferWithCountOrTimerObserverShared::new(
461        subscriber.observer,
462        self.count,
463        self.time,
464        self.scheduler,
465      ),
466      subscription: subscriber.subscription,
467    })
468  }
469}
470
/// Observer for the shared variant of the count-or-time buffer: incoming
/// items are collected behind a mutex and emitted to `observer` when `count`
/// items are buffered or when the repeating scheduled task finds pending
/// items.
#[derive(Clone)]
pub struct BufferWithCountOrTimerObserverShared<O, Item> {
  /// Downstream observer, shared with the scheduled flush task.
  observer: Arc<Mutex<O>>,
  /// Items collected since the last emission, shared with the scheduled task.
  buffer: Arc<Mutex<Vec<Item>>>,
  /// Buffer length at which a batch is emitted immediately.
  count: usize,
  /// Handle of the scheduled task; unsubscribed on error and on completion.
  handle: SpawnHandle,
}
478
479impl<O, Item> BufferWithCountOrTimerObserverShared<O, Item> {
480  fn new<S>(observer: O, count: usize, time: Duration, scheduler: S) -> Self
481  where
482    O: Observer<Item = Vec<Item>> + Send + Sync + 'static,
483    Item: Send + Sync + 'static,
484    S: SharedScheduler,
485  {
486    let observer = Arc::new(Mutex::new(observer));
487    let mut observer_c = observer.clone();
488
489    let buffer = Arc::new(Mutex::new(vec![]));
490    let buffer_c = buffer.clone();
491
492    let handle = scheduler.schedule_repeating(
493      move |_| {
494        let mut buffer = buffer_c.lock().unwrap();
495        if !buffer.is_empty() {
496          let buffer = std::mem::take(&mut *buffer);
497          observer_c.next(buffer);
498        }
499      },
500      time,
501      None,
502    );
503
504    BufferWithCountOrTimerObserverShared {
505      observer,
506      buffer,
507      count,
508      handle,
509    }
510  }
511}
512
513impl<O, Item, Err> Observer for BufferWithCountOrTimerObserverShared<O, Item>
514where
515  O: Observer<Item = Vec<Item>, Err = Err>,
516{
517  type Item = Item;
518  type Err = Err;
519
520  fn next(&mut self, value: Self::Item) {
521    let mut buffer = self.buffer.lock().unwrap();
522    (*buffer).push(value);
523
524    if buffer.len() >= self.count {
525      let buffer = std::mem::take(&mut *buffer);
526      self.observer.next(buffer);
527    }
528  }
529
530  fn error(&mut self, err: Self::Err) {
531    self.handle.unsubscribe();
532    self.observer.error(err);
533  }
534
535  complete_time_impl_shared!(buffer, observer, handle);
536
537  is_stopped_proxy_impl!(observer);
538}
539
#[cfg(test)]
mod tests {
  use crate::prelude::*;
  use futures::executor::{LocalPool, ThreadPool};
  use std::cell::RefCell;
  use std::rc::Rc;
  use std::sync::atomic::{AtomicBool, Ordering};
  use std::sync::{Arc, Mutex};
  use std::time::Duration;

  /// Ten items buffered by two arrive as five full pairs.
  #[test]
  fn it_shall_buffer_with_count() {
    let mut received = Vec::new();

    observable::from_iter(0..10)
      .buffer_with_count(2)
      .subscribe(|chunk| received.push(chunk));

    let wanted =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    assert_eq!(wanted, received);
  }

  /// Same as above, but subscribed through the shared implementation.
  #[test]
  fn it_shall_buffer_with_count_shared() {
    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);

    observable::from_iter(0..10)
      .buffer_with_count(2)
      .into_shared()
      .subscribe(move |chunk| sink.lock().unwrap().push(chunk));

    let wanted =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    assert_eq!(wanted, *received.lock().unwrap());
  }

  /// A partially filled buffer is emitted when the source completes.
  #[test]
  fn it_shall_emit_buffer_on_completed() {
    let completed = Rc::new(AtomicBool::new(false));
    let completed_flag = Rc::clone(&completed);
    let mut received = Vec::new();

    observable::create(|mut subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.complete();
    })
    .buffer_with_count(2)
    .subscribe_complete(
      |chunk| received.push(chunk),
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    let wanted = vec![vec![0, 1], vec![2, 3], vec![4]];
    assert_eq!(wanted, received);
    assert!(completed.load(Ordering::Relaxed));
  }

  /// A partially filled buffer is dropped when the source errors.
  #[test]
  fn it_shall_discard_buffer_on_error() {
    let mut received = Vec::new();
    let mut saw_error = false;

    observable::create(|mut subscriber| {
      for value in 0..5 {
        subscriber.next(value);
      }
      subscriber.error(());
    })
    .buffer_with_count(2)
    .subscribe_err(|chunk| received.push(chunk), |_| saw_error = true);

    let wanted = vec![vec![0, 1], vec![2, 3]];
    assert_eq!(wanted, received);
    assert!(saw_error);
  }

  /// With a local scheduler every item ends up in one buffer.
  #[test]
  fn it_shall_buffer_with_time_local() {
    let mut pool = LocalPool::new();
    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    observable::from_iter(0..10)
      .buffer_with_time(Duration::from_millis(500), pool.spawner())
      .subscribe(move |chunk| sink.borrow_mut().push(chunk));

    pool.run();

    // this can't be really tested as local scheduler runs on a single thread
    let wanted = vec![vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]];
    assert_eq!(wanted, *received.borrow());
  }

  /// After an error the local pool must be able to run to completion.
  #[test]
  fn it_shall_not_block_with_error_on_time_local() {
    let mut pool = LocalPool::new();

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      subscriber.error(());
    })
    .buffer_with_time(Duration::from_millis(500), pool.spawner())
    .subscribe(|_| {});

    // if this call blocks execution, the observer's handle has not been
    // unsubscribed
    pool.run();
  }

  /// Items emitted before and after a pause land in separate buffers.
  #[test]
  fn it_shall_buffer_with_time_shared() {
    let pool = ThreadPool::new().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);

    let completed = Arc::new(AtomicBool::new(false));
    let completed_flag = Arc::clone(&completed);

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      std::thread::sleep(Duration::from_millis(100));
      for value in 3..7 {
        subscriber.next(value);
      }
      subscriber.complete();
    })
    .buffer_with_time(Duration::from_millis(50), pool)
    .into_shared()
    .subscribe_blocking_all(
      move |chunk| sink.lock().unwrap().push(chunk),
      |()| {},
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    let wanted = vec![vec![0, 1, 2], vec![3, 4, 5, 6]];
    assert_eq!(wanted, *received.lock().unwrap());
    assert!(completed.load(Ordering::Relaxed));
  }

  /// Items still buffered when the source errors are not emitted.
  #[test]
  fn it_shall_not_emit_buffer_with_time_on_error() {
    let pool = ThreadPool::new().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);

    let errored = Arc::new(AtomicBool::new(false));
    let errored_flag = Arc::clone(&errored);

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      std::thread::sleep(Duration::from_millis(100));
      subscriber.next(3);
      subscriber.next(4);
      subscriber.error(());
    })
    .buffer_with_time(Duration::from_millis(50), pool)
    .into_shared()
    .subscribe_blocking_all(
      move |chunk| sink.lock().unwrap().push(chunk),
      move |_| errored_flag.store(true, Ordering::Relaxed),
      || {},
    );

    let wanted = vec![vec![0, 1, 2]];
    assert_eq!(wanted, *received.lock().unwrap());
    assert!(errored.load(Ordering::Relaxed));
  }

  /// With a local scheduler only the count threshold produces buffers.
  #[test]
  fn it_shall_buffer_with_count_and_time() {
    let mut pool = LocalPool::new();
    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    observable::from_iter(0..10)
      .buffer_with_count_and_time(2, Duration::from_millis(500), pool.spawner())
      .subscribe(move |chunk| sink.borrow_mut().push(chunk));

    pool.run();

    // this can't be really tested as local scheduler runs on a single thread
    let wanted =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    assert_eq!(wanted, *received.borrow());
  }

  /// Only the buffer filled before the error is expected downstream.
  #[test]
  fn it_shall_buffer_with_count_and_time_on_error() {
    let mut pool = LocalPool::new();

    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    let errored = Rc::new(AtomicBool::new(false));
    let errored_flag = Rc::clone(&errored);

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      subscriber.error(());
      // Sent after the error; the expected output contains neither value.
      subscriber.next(3);
      subscriber.next(4);
    })
    .buffer_with_count_and_time(2, Duration::from_millis(500), pool.spawner())
    .subscribe_err(
      move |chunk| sink.borrow_mut().push(chunk),
      move |_| errored_flag.store(true, Ordering::Relaxed),
    );

    pool.run();

    let wanted = vec![vec![0, 1]];
    assert_eq!(wanted, *received.borrow());
    assert!(errored.load(Ordering::Relaxed));
  }

  /// The count threshold, the timer and completion each emit one buffer.
  #[test]
  fn it_shall_buffer_with_count_or_time_shared() {
    let pool = ThreadPool::new().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);

    let completed = Arc::new(AtomicBool::new(false));
    let completed_flag = Arc::clone(&completed);

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      std::thread::sleep(Duration::from_millis(100));
      subscriber.next(3);
      subscriber.next(4);
      subscriber.complete();
    })
    .buffer_with_count_and_time(2, Duration::from_millis(50), pool)
    .into_shared()
    .subscribe_blocking_all(
      move |chunk| sink.lock().unwrap().push(chunk),
      |()| {},
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    let wanted = vec![vec![0, 1], vec![2], vec![3, 4]];
    assert_eq!(wanted, *received.lock().unwrap());
    assert!(completed.load(Ordering::Relaxed));
  }

  /// The item buffered right before the error is not emitted.
  #[test]
  fn it_shall_buffer_with_count_or_time_shared_on_error() {
    let pool = ThreadPool::new().unwrap();

    let received = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&received);

    let errored = Arc::new(AtomicBool::new(false));
    let errored_flag = Arc::clone(&errored);

    observable::create(|mut subscriber| {
      for value in 0..3 {
        subscriber.next(value);
      }
      std::thread::sleep(Duration::from_millis(100));
      subscriber.next(3);
      subscriber.error(());
      // Sent after the error; the expected output does not contain it.
      subscriber.next(4);
    })
    .buffer_with_count_and_time(2, Duration::from_millis(50), pool)
    .into_shared()
    .subscribe_blocking_all(
      move |chunk| sink.lock().unwrap().push(chunk),
      move |_| errored_flag.store(true, Ordering::Relaxed),
      || {},
    );

    let wanted = vec![vec![0, 1], vec![2]];
    assert_eq!(wanted, *received.lock().unwrap());
    assert!(errored.load(Ordering::Relaxed));
  }
}