rx_rust/operators/conditional_boolean/
contains.rs1use crate::utils::types::NecessarySendSync;
2use crate::utils::unsub_after_termination::subscribe_unsub_after_termination;
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use educe::Educe;
9
10#[derive(Educe)]
37#[educe(Debug, Clone)]
38pub struct Contains<T, OE> {
39 source: OE,
40 item: T,
41}
42
43impl<T, OE> Contains<T, OE> {
44 pub fn new<'or, 'sub, E>(source: OE, item: T) -> Self
45 where
46 OE: Observable<'or, 'sub, T, E>,
47 {
48 Self { source, item }
49 }
50}
51
52impl<'or, 'sub, T, E, OE> Observable<'or, 'sub, bool, E> for Contains<T, OE>
53where
54 OE: Observable<'or, 'sub, T, E>,
55 T: PartialEq + NecessarySendSync + 'or,
56 'sub: 'or,
57{
58 fn subscribe(
59 self,
60 observer: impl Observer<bool, E> + NecessarySendSync + 'or,
61 ) -> Subscription<'sub> {
62 subscribe_unsub_after_termination(observer, |observer| {
63 let observer = ContainsObserver {
64 observer: Some(observer),
65 item: self.item,
66 };
67 self.source.subscribe(observer)
68 })
69 }
70}
71
72struct ContainsObserver<T, OR> {
73 observer: Option<OR>,
74 item: T,
75}
76
77impl<T, E, OR> Observer<T, E> for ContainsObserver<T, OR>
78where
79 OR: Observer<bool, E>,
80 T: PartialEq,
81{
82 fn on_next(&mut self, value: T) {
83 if self.item == value
84 && let Some(mut observer) = self.observer.take()
85 {
86 observer.on_next(true);
87 observer.on_termination(Termination::Completed);
88 }
89 }
90
91 fn on_termination(mut self, termination: Termination<E>) {
92 if let Some(mut observer) = self.observer.take() {
93 match termination {
94 Termination::Completed => observer.on_next(false),
95 Termination::Error(_) => {}
96 }
97 observer.on_termination(termination);
98 }
99 }
100}