rx_rust/operators/conditional_boolean/
all.rs1use crate::utils::types::{MarkerType, NecessarySendSync};
2use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9use std::marker::PhantomData;
10
11#[derive(Educe)]
38#[educe(Debug, Clone)]
39pub struct All<T, OE, F> {
40 source: OE,
41 callback: F,
42 _marker: MarkerType<T>,
43}
44
45impl<T, OE, F> All<T, OE, F> {
46 pub fn new<'or, 'sub, E>(source: OE, callback: F) -> Self
47 where
48 OE: Observable<'or, 'sub, T, E>,
49 F: FnMut(T) -> bool,
50 {
51 Self {
52 source,
53 callback,
54 _marker: PhantomData,
55 }
56 }
57}
58
59impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, bool, E> for All<T, OE, F>
60where
61 OE: Observable<'or, 'sub, T, E>,
62 F: FnMut(T) -> bool + NecessarySendSync + 'or,
63 'sub: 'or,
64{
65 fn subscribe(
66 self,
67 observer: impl Observer<bool, E> + NecessarySendSync + 'or,
68 ) -> Subscription<'sub> {
69 subscribe_unsub_after_termination(observer, |observer| {
70 let observer = AllObserver {
71 observer: Some(observer),
72 callback: self.callback,
73 };
74 self.source.subscribe(observer)
75 })
76 }
77}
78
79struct AllObserver<OR, F> {
80 observer: Option<OR>,
81 callback: F,
82}
83
84impl<T, E, OR, F> Observer<T, E> for AllObserver<OR, F>
85where
86 OR: Observer<bool, E>,
87 F: FnMut(T) -> bool,
88{
89 fn on_next(&mut self, value: T) {
90 if !(self.callback)(value)
91 && let Some(mut observer) = self.observer.take()
92 {
93 observer.on_next(false);
94 observer.on_termination(Termination::Completed);
95 }
96 }
97
98 fn on_termination(mut self, termination: Termination<E>) {
99 if let Some(mut observer) = self.observer.take() {
100 match termination {
101 Termination::Completed => observer.on_next(true),
102 Termination::Error(_) => {}
103 }
104 observer.on_termination(termination);
105 }
106 }
107}