1use crate::prelude::*;
2use crate::{error_proxy_impl, is_stopped_proxy_impl};
3use std::cell::RefCell;
4use std::rc::Rc;
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7
/// Operator that groups the items emitted by `source` into `Vec`s.
///
/// The observer it installs emits a batch as soon as `count` items have been
/// collected.
#[derive(Clone)]
pub struct BufferWithCountOp<S> {
  /// The upstream observable whose items are buffered.
  pub(crate) source: S,
  /// Number of buffered items that triggers an emission.
  pub(crate) count: usize,
}
13
/// Generates the `Observable` impl shared by the buffer operators.
///
/// Arguments, in order: the operator type, the name of its source type
/// parameter, an optional lifetime, and any further type parameters. The
/// generated impl emits `Vec`s of the source's items and reuses the source's
/// error type.
#[doc(hidden)]
macro_rules! buffer_op_observable_impl {
  ($op:ident, $src:ident $(, $lt:lifetime)? $(, $extra:ident)*) => {
    impl<$($lt,)? $src, $($extra,)*> Observable
      for $op<$($lt,)? $src, $($extra,)*>
    where
      $src: Observable,
    {
      type Item = Vec<$src::Item>;
      type Err = $src::Err;
    }
  };
}
27
// Implements `Observable` for `BufferWithCountOp<S>` with `Item = Vec<S::Item>`
// and `Err = S::Err`.
buffer_op_observable_impl!(BufferWithCountOp, S);
29
30impl<'a, S> LocalObservable<'a> for BufferWithCountOp<S>
31where
32 S: LocalObservable<'a>,
33 S::Item: 'a,
34{
35 type Unsub = S::Unsub;
36
37 fn actual_subscribe<O: Observer<Item = Self::Item, Err = Self::Err> + 'a>(
38 self,
39 subscriber: Subscriber<O, LocalSubscription>,
40 ) -> Self::Unsub {
41 self.source.actual_subscribe(Subscriber {
42 observer: BufferWithCountObserver::new(subscriber.observer, self.count),
43 subscription: subscriber.subscription,
44 })
45 }
46}
47
48impl<S> SharedObservable for BufferWithCountOp<S>
49where
50 S: SharedObservable,
51 S::Item: Send + Sync + 'static,
52{
53 type Unsub = S::Unsub;
54
55 fn actual_subscribe<
56 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
57 >(
58 self,
59 subscriber: Subscriber<O, SharedSubscription>,
60 ) -> Self::Unsub {
61 self.source.actual_subscribe(Subscriber {
62 observer: BufferWithCountObserver::new(subscriber.observer, self.count),
63 subscription: subscriber.subscription,
64 })
65 }
66}
67
/// Observer that accumulates incoming items and forwards them downstream in
/// `Vec` batches once `count` items have been gathered.
#[derive(Clone)]
pub struct BufferWithCountObserver<O, Item> {
  /// Downstream observer that receives the batches.
  observer: O,
  /// Items collected since the last emission.
  buffer: Vec<Item>,
  /// Buffer length at which a batch is emitted.
  count: usize,
}
74
75impl<O, Item> BufferWithCountObserver<O, Item> {
76 fn new(observer: O, count: usize) -> BufferWithCountObserver<O, Item> {
77 BufferWithCountObserver {
78 observer,
79 buffer: vec![],
80 count,
81 }
82 }
83}
84
85impl<O, Item, Err> Observer for BufferWithCountObserver<O, Item>
86where
87 O: Observer<Item = Vec<Item>, Err = Err>,
88{
89 type Item = Item;
90 type Err = Err;
91
92 fn next(&mut self, value: Self::Item) {
93 self.buffer.push(value);
94
95 if self.buffer.len() >= self.count {
96 let buffer = std::mem::take(&mut self.buffer);
97 self.observer.next(buffer);
98 }
99 }
100
101 fn complete(&mut self) {
102 if !self.buffer.is_empty() {
103 let buffer = std::mem::take(&mut self.buffer);
104 self.observer.next(buffer);
105 }
106
107 self.observer.complete();
108 }
109
110 error_proxy_impl!(Err, observer);
111
112 is_stopped_proxy_impl!(observer);
113}
114
/// Operator that groups the items emitted by `source` into `Vec`s which are
/// flushed by a task repeating every `time` on `scheduler`.
#[derive(Clone)]
pub struct BufferWithTimeOp<Source, Scheduler> {
  /// The upstream observable whose items are buffered.
  pub(crate) source: Source,
  /// Interval handed to the scheduler for the repeating flush task.
  pub(crate) time: Duration,
  /// Scheduler on which the repeating flush task is registered.
  pub(crate) scheduler: Scheduler,
}
121
// Implements `Observable` for `BufferWithTimeOp<S, Scheduler>` with
// `Item = Vec<S::Item>` and `Err = S::Err`.
buffer_op_observable_impl!(BufferWithTimeOp, S, Scheduler);
123
124impl<Source, Scheduler> LocalObservable<'static>
125 for BufferWithTimeOp<Source, Scheduler>
126where
127 Source: LocalObservable<'static>,
128 Source::Item: 'static,
129 Scheduler: LocalScheduler + 'static,
130{
131 type Unsub = Source::Unsub;
132
133 fn actual_subscribe<
134 O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
135 >(
136 self,
137 subscriber: Subscriber<O, LocalSubscription>,
138 ) -> Self::Unsub {
139 self.source.actual_subscribe(Subscriber {
140 observer: BufferWithTimeObserver::new(
141 subscriber.observer,
142 self.time,
143 self.scheduler,
144 ),
145 subscription: subscriber.subscription,
146 })
147 }
148}
149
150#[derive(Clone)]
151pub struct BufferWithTimeObserver<O, Item> {
152 observer: Rc<RefCell<O>>,
153 buffer: Rc<RefCell<Vec<Item>>>,
154 handle: SpawnHandle,
155}
156
157impl<O, Item> BufferWithTimeObserver<O, Item>
158where
159 O: Observer<Item = Vec<Item>> + 'static,
160 Item: 'static,
161{
162 fn new<S>(
163 observer: O,
164 time: Duration,
165 scheduler: S,
166 ) -> BufferWithTimeObserver<O, Item>
167 where
168 S: LocalScheduler + 'static,
169 {
170 let observer = Rc::new(RefCell::new(observer));
171 let mut observer_c = observer.clone();
172
173 let buffer = Rc::new(RefCell::new(vec![]));
174 let buffer_c = buffer.clone();
175
176 let handle = scheduler.schedule_repeating(
177 move |_| {
178 if !buffer_c.borrow().is_empty() {
179 observer_c.next(buffer_c.take());
180 }
181 },
182 time,
183 None,
184 );
185
186 BufferWithTimeObserver {
187 observer,
188 buffer,
189 handle,
190 }
191 }
192}
193
/// Generates `complete` for the `Rc<RefCell<_>>`-based timed observers.
///
/// Takes the names of the buffer, observer and handle fields. The generated
/// method forwards any buffered items, unsubscribes the handle and then
/// completes the downstream observer, in that order.
#[doc(hidden)]
macro_rules! complete_time_impl_local {
  ($buf:tt, $downstream:tt, $timer:tt) => {
    fn complete(&mut self) {
      // Move the pending items out; the buffer is left empty.
      let leftover = self.$buf.take();
      if !leftover.is_empty() {
        self.$downstream.next(leftover);
      }

      self.$timer.unsubscribe();
      self.$downstream.complete();
    }
  };
}
208
209impl<O, Item, Err> Observer for BufferWithTimeObserver<O, Item>
210where
211 O: Observer<Item = Vec<Item>, Err = Err>,
212{
213 type Item = Item;
214 type Err = Err;
215
216 fn next(&mut self, value: Self::Item) {
217 self.buffer.borrow_mut().push(value);
218 }
219
220 fn error(&mut self, err: Self::Err) {
221 self.handle.unsubscribe();
222 self.observer.error(err);
223 }
224
225 complete_time_impl_local!(buffer, observer, handle);
226
227 is_stopped_proxy_impl!(observer);
228}
229
230impl<Source, Scheduler> SharedObservable for BufferWithTimeOp<Source, Scheduler>
231where
232 Source: SharedObservable,
233 <Source as Observable>::Item: Send + Sync + 'static,
234 Scheduler: SharedScheduler,
235{
236 type Unsub = Source::Unsub;
237
238 fn actual_subscribe<O>(
239 self,
240 subscriber: Subscriber<O, SharedSubscription>,
241 ) -> Self::Unsub
242 where
243 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
244 {
245 self.source.actual_subscribe(Subscriber {
246 observer: BufferWithTimeObserverShared::new(
247 subscriber.observer,
248 self.time,
249 self.scheduler,
250 ),
251 subscription: subscriber.subscription,
252 })
253 }
254}
255
256#[derive(Clone)]
257pub struct BufferWithTimeObserverShared<O, Item> {
258 observer: Arc<Mutex<O>>,
259 buffer: Arc<Mutex<Vec<Item>>>,
260 handle: SpawnHandle,
261}
262
263impl<O, Item> BufferWithTimeObserverShared<O, Item>
264where
265 O: Observer<Item = Vec<Item>> + Send + Sync + 'static,
266 Item: Send + Sync + 'static,
267{
268 fn new<S>(
269 observer: O,
270 time: Duration,
271 scheduler: S,
272 ) -> BufferWithTimeObserverShared<O, Item>
273 where
274 S: SharedScheduler,
275 {
276 let observer = Arc::new(Mutex::new(observer));
277 let mut observer_c = observer.clone();
278
279 let buffer = Arc::new(Mutex::new(vec![]));
280 let buffer_c = buffer.clone();
281
282 let handle = scheduler.schedule_repeating(
283 move |_| {
284 let mut buffer = buffer_c.lock().unwrap();
285 let buffer = std::mem::take(&mut *buffer);
286 if !buffer.is_empty() {
287 observer_c.next(buffer);
288 }
289 },
290 time,
291 None,
292 );
293
294 BufferWithTimeObserverShared {
295 observer,
296 buffer,
297 handle,
298 }
299 }
300}
301
/// Generates `complete` for the `Arc<Mutex<_>>`-based timed observers.
///
/// Takes the names of the buffer, observer and handle fields. The generated
/// method forwards any buffered items, unsubscribes the handle and then
/// completes the downstream observer, in that order.
#[doc(hidden)]
macro_rules! complete_time_impl_shared {
  ($buf:tt, $downstream:tt, $timer:tt) => {
    fn complete(&mut self) {
      // The guard lives until the end of the method, so the buffer stays
      // locked while the calls below run.
      let mut guard = self.$buf.lock().unwrap();
      let leftover = std::mem::take(&mut *guard);

      if !leftover.is_empty() {
        self.$downstream.next(leftover);
      }

      self.$timer.unsubscribe();
      self.$downstream.complete();
    }
  };
}
318
319impl<O, Item, Err> Observer for BufferWithTimeObserverShared<O, Item>
320where
321 O: Observer<Item = Vec<Item>, Err = Err>,
322{
323 type Item = Item;
324 type Err = Err;
325
326 fn next(&mut self, value: Self::Item) {
327 let mut buffer = self.buffer.lock().unwrap();
328 (*buffer).push(value);
329 }
330
331 fn error(&mut self, err: Self::Err) {
332 self.handle.unsubscribe();
333 self.observer.error(err);
334 }
335
336 complete_time_impl_shared!(buffer, observer, handle);
337
338 is_stopped_proxy_impl!(observer);
339}
340
/// Operator that groups the items emitted by `source` into `Vec`s which are
/// emitted either when `count` items have been collected or when the task
/// repeating every `time` on `scheduler` fires.
#[derive(Clone)]
pub struct BufferWithCountOrTimerOp<Source, Scheduler> {
  /// The upstream observable whose items are buffered.
  pub(crate) source: Source,
  /// Number of buffered items that triggers an emission.
  pub(crate) count: usize,
  /// Interval handed to the scheduler for the repeating flush task.
  pub(crate) time: Duration,
  /// Scheduler on which the repeating flush task is registered.
  pub(crate) scheduler: Scheduler,
}
348
// Implements `Observable` for `BufferWithCountOrTimerOp<S, Scheduler>` with
// `Item = Vec<S::Item>` and `Err = S::Err`.
buffer_op_observable_impl!(BufferWithCountOrTimerOp, S, Scheduler);
350
351impl<Source, Scheduler> LocalObservable<'static>
352 for BufferWithCountOrTimerOp<Source, Scheduler>
353where
354 Source: LocalObservable<'static>,
355 Source::Item: 'static,
356 Scheduler: LocalScheduler + 'static,
357{
358 type Unsub = Source::Unsub;
359
360 fn actual_subscribe<
361 O: Observer<Item = Self::Item, Err = Self::Err> + 'static,
362 >(
363 self,
364 subscriber: Subscriber<O, LocalSubscription>,
365 ) -> Self::Unsub {
366 self.source.actual_subscribe(Subscriber {
367 observer: BufferWithCountOrTimerObserver::new(
368 subscriber.observer,
369 self.count,
370 self.time,
371 self.scheduler,
372 ),
373 subscription: subscriber.subscription,
374 })
375 }
376}
377
378#[derive(Clone)]
379pub struct BufferWithCountOrTimerObserver<O, Item> {
380 observer: Rc<RefCell<O>>,
381 buffer: Rc<RefCell<Vec<Item>>>,
382 count: usize,
383 handle: SpawnHandle,
384}
385
386impl<O, Item> BufferWithCountOrTimerObserver<O, Item> {
387 fn new<S>(observer: O, count: usize, time: Duration, scheduler: S) -> Self
388 where
389 O: Observer<Item = Vec<Item>> + 'static,
390 Item: 'static,
391 S: LocalScheduler + 'static,
392 {
393 let observer = Rc::new(RefCell::new(observer));
394 let mut observer_c = observer.clone();
395
396 let buffer = Rc::new(RefCell::new(vec![]));
397 let buffer_c = buffer.clone();
398
399 let handle = scheduler.schedule_repeating(
400 move |_| {
401 if buffer_c.borrow().is_empty() {
402 observer_c.next(buffer_c.take());
403 }
404 },
405 time,
406 None,
407 );
408
409 BufferWithCountOrTimerObserver {
410 observer,
411 buffer,
412 count,
413 handle,
414 }
415 }
416}
417
418impl<O, Item, Err> Observer for BufferWithCountOrTimerObserver<O, Item>
419where
420 O: Observer<Item = Vec<Item>, Err = Err>,
421{
422 type Item = Item;
423 type Err = Err;
424
425 fn next(&mut self, value: Self::Item) {
426 self.buffer.borrow_mut().push(value);
427
428 if self.buffer.borrow().len() >= self.count {
429 let buffer = self.buffer.take();
430 self.observer.borrow_mut().next(buffer);
431 }
432 }
433
434 fn error(&mut self, err: Self::Err) {
435 self.handle.unsubscribe();
436 self.observer.error(err);
437 }
438
439 complete_time_impl_local!(buffer, observer, handle);
440
441 is_stopped_proxy_impl!(observer);
442}
443
444impl<Source, Scheduler> SharedObservable
445 for BufferWithCountOrTimerOp<Source, Scheduler>
446where
447 Source: SharedObservable,
448 Source::Item: Send + Sync + 'static,
449 Scheduler: SharedScheduler,
450{
451 type Unsub = Source::Unsub;
452
453 fn actual_subscribe<
454 O: Observer<Item = Self::Item, Err = Self::Err> + Sync + Send + 'static,
455 >(
456 self,
457 subscriber: Subscriber<O, SharedSubscription>,
458 ) -> Self::Unsub {
459 self.source.actual_subscribe(Subscriber {
460 observer: BufferWithCountOrTimerObserverShared::new(
461 subscriber.observer,
462 self.count,
463 self.time,
464 self.scheduler,
465 ),
466 subscription: subscriber.subscription,
467 })
468 }
469}
470
471#[derive(Clone)]
472pub struct BufferWithCountOrTimerObserverShared<O, Item> {
473 observer: Arc<Mutex<O>>,
474 buffer: Arc<Mutex<Vec<Item>>>,
475 count: usize,
476 handle: SpawnHandle,
477}
478
479impl<O, Item> BufferWithCountOrTimerObserverShared<O, Item> {
480 fn new<S>(observer: O, count: usize, time: Duration, scheduler: S) -> Self
481 where
482 O: Observer<Item = Vec<Item>> + Send + Sync + 'static,
483 Item: Send + Sync + 'static,
484 S: SharedScheduler,
485 {
486 let observer = Arc::new(Mutex::new(observer));
487 let mut observer_c = observer.clone();
488
489 let buffer = Arc::new(Mutex::new(vec![]));
490 let buffer_c = buffer.clone();
491
492 let handle = scheduler.schedule_repeating(
493 move |_| {
494 let mut buffer = buffer_c.lock().unwrap();
495 if !buffer.is_empty() {
496 let buffer = std::mem::take(&mut *buffer);
497 observer_c.next(buffer);
498 }
499 },
500 time,
501 None,
502 );
503
504 BufferWithCountOrTimerObserverShared {
505 observer,
506 buffer,
507 count,
508 handle,
509 }
510 }
511}
512
513impl<O, Item, Err> Observer for BufferWithCountOrTimerObserverShared<O, Item>
514where
515 O: Observer<Item = Vec<Item>, Err = Err>,
516{
517 type Item = Item;
518 type Err = Err;
519
520 fn next(&mut self, value: Self::Item) {
521 let mut buffer = self.buffer.lock().unwrap();
522 (*buffer).push(value);
523
524 if buffer.len() >= self.count {
525 let buffer = std::mem::take(&mut *buffer);
526 self.observer.next(buffer);
527 }
528 }
529
530 fn error(&mut self, err: Self::Err) {
531 self.handle.unsubscribe();
532 self.observer.error(err);
533 }
534
535 complete_time_impl_shared!(buffer, observer, handle);
536
537 is_stopped_proxy_impl!(observer);
538}
539
#[cfg(test)]
mod tests {
  use crate::prelude::*;
  use futures::executor::{LocalPool, ThreadPool};
  use std::cell::RefCell;
  use std::rc::Rc;
  use std::sync::atomic::{AtomicBool, Ordering};
  use std::sync::{Arc, Mutex};
  use std::time::Duration;

  /// Ten items buffered by two produce five full pairs.
  #[test]
  fn it_shall_buffer_with_count() {
    let want =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    let mut got = Vec::new();
    observable::from_iter(0..10)
      .buffer_with_count(2)
      .subscribe(|batch| got.push(batch));

    assert_eq!(want, got);
  }

  /// Same as above, but subscribed through the shared path.
  #[test]
  fn it_shall_buffer_with_count_shared() {
    let want =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    let got = Arc::new(Mutex::new(Vec::new()));
    let got_writer = got.clone();
    observable::from_iter(0..10)
      .buffer_with_count(2)
      .into_shared()
      .subscribe(move |batch| got_writer.lock().unwrap().push(batch));

    assert_eq!(want, *got.lock().unwrap());
  }

  /// A partially filled buffer is emitted when the source completes.
  #[test]
  fn it_shall_emit_buffer_on_completed() {
    let want = vec![vec![0, 1], vec![2, 3], vec![4]];
    let mut got = Vec::new();

    let completed = Rc::new(AtomicBool::new(false));
    let completed_flag = completed.clone();

    observable::create(|mut emitter| {
      for value in 0..5 {
        emitter.next(value);
      }
      emitter.complete();
    })
    .buffer_with_count(2)
    .subscribe_complete(
      |batch| got.push(batch),
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    assert_eq!(want, got);
    assert!(completed.load(Ordering::Relaxed));
  }

  /// A partially filled buffer is dropped when the source errors.
  #[test]
  fn it_shall_discard_buffer_on_error() {
    let want = vec![vec![0, 1], vec![2, 3]];
    let mut got = Vec::new();
    let mut saw_error = false;

    observable::create(|mut emitter| {
      for value in 0..5 {
        emitter.next(value);
      }
      emitter.error(());
    })
    .buffer_with_count(2)
    .subscribe_err(|batch| got.push(batch), |_| saw_error = true);

    assert_eq!(want, got);
    assert!(saw_error);
  }

  /// With a local pool every item ends up in a single batch.
  #[test]
  fn it_shall_buffer_with_time_local() {
    let mut pool = LocalPool::new();

    let want = vec![vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]];
    let got = Rc::new(RefCell::new(Vec::new()));
    let got_writer = got.clone();

    observable::from_iter(0..10)
      .buffer_with_time(Duration::from_millis(500), pool.spawner())
      .subscribe(move |batch| got_writer.borrow_mut().push(batch));

    pool.run();

    assert_eq!(want, *got.borrow());
  }

  /// The test passes as soon as `run` returns after the source errored.
  /// NOTE(review): presumably `run` would hang if the repeating task were not
  /// cancelled on error; confirm against the scheduler implementation.
  #[test]
  fn it_shall_not_block_with_error_on_time_local() {
    let mut pool = LocalPool::new();

    observable::create(|mut emitter| {
      for value in 0..3 {
        emitter.next(value);
      }
      emitter.error(());
    })
    .buffer_with_time(Duration::from_millis(500), pool.spawner())
    .subscribe(|_| {});

    pool.run();
  }

  /// Items emitted before and after a pause longer than the interval land in
  /// separate batches.
  #[test]
  fn it_shall_buffer_with_time_shared() {
    let scheduler = ThreadPool::new().unwrap();

    let want = vec![vec![0, 1, 2], vec![3, 4, 5, 6]];
    let got = Arc::new(Mutex::new(Vec::new()));
    let got_writer = got.clone();

    let completed = Arc::new(AtomicBool::new(false));
    let completed_flag = completed.clone();

    observable::create(|mut emitter| {
      let pause = Duration::from_millis(100);
      for value in 0..3 {
        emitter.next(value);
      }
      std::thread::sleep(pause);
      for value in 3..7 {
        emitter.next(value);
      }
      emitter.complete();
    })
    .buffer_with_time(Duration::from_millis(50), scheduler)
    .into_shared()
    .subscribe_blocking_all(
      move |batch| {
        let mut sink = got_writer.lock().unwrap();
        sink.push(batch);
      },
      |()| {},
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    assert_eq!(want, *got.lock().unwrap());
    assert!(completed.load(Ordering::Relaxed));
  }

  /// Items still buffered when the source errors are not emitted.
  #[test]
  fn it_shall_not_emit_buffer_with_time_on_error() {
    let scheduler = ThreadPool::new().unwrap();

    let want = vec![vec![0, 1, 2]];
    let got = Arc::new(Mutex::new(Vec::new()));
    let got_writer = got.clone();

    let saw_error = Arc::new(AtomicBool::new(false));
    let saw_error_flag = saw_error.clone();

    observable::create(|mut emitter| {
      let pause = Duration::from_millis(100);
      for value in 0..3 {
        emitter.next(value);
      }
      std::thread::sleep(pause);
      emitter.next(3);
      emitter.next(4);
      emitter.error(());
    })
    .buffer_with_time(Duration::from_millis(50), scheduler)
    .into_shared()
    .subscribe_blocking_all(
      move |batch| {
        let mut sink = got_writer.lock().unwrap();
        sink.push(batch);
      },
      move |_| saw_error_flag.store(true, Ordering::Relaxed),
      || {},
    );

    assert_eq!(want, *got.lock().unwrap());
    assert!(saw_error.load(Ordering::Relaxed));
  }

  /// With a local pool the count threshold alone decides the batches.
  #[test]
  fn it_shall_buffer_with_count_and_time() {
    let mut pool = LocalPool::new();

    let want =
      vec![vec![0, 1], vec![2, 3], vec![4, 5], vec![6, 7], vec![8, 9]];
    let got = Rc::new(RefCell::new(Vec::new()));
    let got_writer = got.clone();

    observable::from_iter(0..10)
      .buffer_with_count_and_time(
        2,
        Duration::from_millis(500),
        pool.spawner(),
      )
      .subscribe(move |batch| got_writer.borrow_mut().push(batch));

    pool.run();

    assert_eq!(want, *got.borrow());
  }

  /// Only the batch completed before the error is observed.
  #[test]
  fn it_shall_buffer_with_count_and_time_on_error() {
    let mut pool = LocalPool::new();

    let want = vec![vec![0, 1]];
    let got = Rc::new(RefCell::new(Vec::new()));
    let got_writer = got.clone();

    let saw_error = Rc::new(AtomicBool::new(false));
    let saw_error_flag = saw_error.clone();

    observable::create(|mut emitter| {
      emitter.next(0);
      emitter.next(1);
      emitter.next(2);
      emitter.error(());
      // Values sent after the error must not show up in the output.
      emitter.next(3);
      emitter.next(4);
    })
    .buffer_with_count_and_time(2, Duration::from_millis(500), pool.spawner())
    .subscribe_err(
      move |batch| got_writer.borrow_mut().push(batch),
      move |_| saw_error_flag.store(true, Ordering::Relaxed),
    );

    pool.run();

    assert_eq!(want, *got.borrow());
    assert!(saw_error.load(Ordering::Relaxed));
  }

  /// The count threshold emits `[0, 1]`, the timer emits `[2]` during the
  /// pause, and the count threshold emits `[3, 4]`.
  #[test]
  fn it_shall_buffer_with_count_or_time_shared() {
    let scheduler = ThreadPool::new().unwrap();

    let want = vec![vec![0, 1], vec![2], vec![3, 4]];
    let got = Arc::new(Mutex::new(Vec::new()));
    let got_writer = got.clone();

    let completed = Arc::new(AtomicBool::new(false));
    let completed_flag = completed.clone();

    observable::create(|mut emitter| {
      let pause = Duration::from_millis(100);
      for value in 0..3 {
        emitter.next(value);
      }
      std::thread::sleep(pause);
      emitter.next(3);
      emitter.next(4);
      emitter.complete();
    })
    .buffer_with_count_and_time(2, Duration::from_millis(50), scheduler)
    .into_shared()
    .subscribe_blocking_all(
      move |batch| {
        let mut sink = got_writer.lock().unwrap();
        sink.push(batch);
      },
      |()| {},
      move || completed_flag.store(true, Ordering::Relaxed),
    );

    assert_eq!(want, *got.lock().unwrap());
    assert!(completed.load(Ordering::Relaxed));
  }

  /// The value buffered right before the error is dropped.
  #[test]
  fn it_shall_buffer_with_count_or_time_shared_on_error() {
    let scheduler = ThreadPool::new().unwrap();

    let want = vec![vec![0, 1], vec![2]];
    let got = Arc::new(Mutex::new(Vec::new()));
    let got_writer = got.clone();

    let saw_error = Arc::new(AtomicBool::new(false));
    let saw_error_flag = saw_error.clone();

    observable::create(|mut emitter| {
      let pause = Duration::from_millis(100);
      for value in 0..3 {
        emitter.next(value);
      }
      std::thread::sleep(pause);
      emitter.next(3);
      emitter.error(());
      // Sent after the error; must not show up in the output.
      emitter.next(4);
    })
    .buffer_with_count_and_time(2, Duration::from_millis(50), scheduler)
    .into_shared()
    .subscribe_blocking_all(
      move |batch| {
        let mut sink = got_writer.lock().unwrap();
        sink.push(batch);
      },
      move |_| saw_error_flag.store(true, Ordering::Relaxed),
      || {},
    );

    assert_eq!(want, *got.lock().unwrap());
    assert!(saw_error.load(Ordering::Relaxed));
  }
}