rx_rust/operators/conditional_boolean/
amb.rs

1use crate::disposable::Disposable;
2use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use crate::{safe_lock, safe_lock_option, safe_lock_slot_map};
9use educe::Educe;
10use slotmap::{DefaultKey, SlotMap};
11
/// Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification.
/// See <https://reactivex.io/documentation/operators/amb.html>
///
/// `sources` may be any `IntoIterator` whose items are observables sharing the same item and
/// error types. Sources are subscribed in iteration order, and subscribing stops early as soon
/// as one of the already-subscribed sources has claimed the downstream observer.
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::Termination,
///     operators::{
///         conditional_boolean::amb::Amb,
///         creating::from_iter::FromIter,
///     },
/// };
///
/// let mut values = Vec::new();
/// let mut terminations = Vec::new();
///
/// let observable = Amb::new([
///     FromIter::new(vec![1, 2]),
///     FromIter::new(vec![3, 4]),
/// ]);
/// observable.subscribe_with_callback(
///     |value| values.push(value),
///     |termination| terminations.push(termination),
/// );
///
/// assert_eq!(values, vec![1, 2]);
/// assert_eq!(terminations, vec![Termination::Completed]);
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct Amb<I> {
    /// Candidate observables; consumed when the operator is subscribed.
    sources: I,
}
46
47impl<I> Amb<I> {
48    pub fn new(sources: I) -> Self {
49        Self { sources }
50    }
51}
52
53impl<'or, 'sub, T, E, OE, I> Observable<'or, 'sub, T, E> for Amb<I>
54where
55    'sub: 'or,
56    I: IntoIterator<Item = OE>,
57    OE: Observable<'or, 'sub, T, E>,
58{
59    fn subscribe(
60        self,
61        observer: impl Observer<T, E> + NecessarySendSync + 'or,
62    ) -> Subscription<'sub> {
63        let observer = Shared::new(Mutable::new(Some(observer)));
64
65        let mut slop_map = SlotMap::new();
66        let sources_and_key: Vec<_> = self
67            .sources
68            .into_iter()
69            .map(|e| {
70                let key = slop_map.insert(Subscription::default()); // Insert placeholder
71                (e, key)
72            })
73            .collect();
74        let context = Shared::new(Mutable::new(AmbContext {
75            subscriptions: slop_map,
76        }));
77
78        for (source, key) in sources_and_key {
79            if safe_lock_option!(is_none: observer) {
80                // determined
81                break;
82            }
83            let amb_observer = AmbObserver {
84                observer: observer.clone(),
85                context: context.clone(),
86                key,
87                determined_observer: None,
88            };
89            let sub = source.subscribe(amb_observer);
90            safe_lock_slot_map!(replace: context, subscriptions, key, sub);
91        }
92
93        Subscription::new_with_disposal(context)
94    }
95}
96
/// State shared between the subscription returned by `Amb::subscribe` and every `AmbObserver`.
struct AmbContext<'sub> {
    /// Subscriptions to the sources. Initially holds one entry per source (placeholders that
    /// are replaced as sources get subscribed); once a source wins, only its entry is kept.
    subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
}
100
/// Disposal hook for the shared `Amb` context.
impl Disposable for Shared<Mutable<AmbContext<'_>>> {
    fn dispose(self) {
        // NOTE(review): presumably moves the `subscriptions` map out of the locked context so
        // that its entries are dropped here — confirm against the `safe_lock!` macro definition.
        safe_lock!(mem_take: self, subscriptions);
    }
}
106
/// Observer attached to a single source of an `Amb` operator.
struct AmbObserver<'sub, OR> {
    /// Slot holding the downstream observer, shared by all sources; the first source to react takes it.
    observer: Shared<Mutable<Option<OR>>>,
    /// Shared bookkeeping of the subscriptions to all sources.
    context: Shared<Mutable<AmbContext<'sub>>>,
    /// Key that was reserved in `context.subscriptions` for this observer's source.
    key: DefaultKey,
    /// `Some` means this `AmbObserver` is the first (it won and owns the downstream observer).
    /// `None` means this `AmbObserver` is not the first, or the winner is not determined yet.
    determined_observer: Option<OR>,
}
113
114impl<'sub, T, E, OR> Observer<T, E> for AmbObserver<'sub, OR>
115where
116    OR: Observer<T, E>,
117{
118    fn on_next(&mut self, value: T) {
119        if let Some(observer) = self.determined_observer.as_mut() {
120            observer.on_next(value);
121            return;
122        }
123        if let Some(observer) = safe_lock_option!(take: self.observer) {
124            self.determined_observer = Some(observer);
125            drop_none_matched_subscriptions(&self.context, self.key);
126            self.on_next(value);
127        }
128    }
129
130    fn on_termination(self, termination: Termination<E>) {
131        if let Some(observer) = self.determined_observer {
132            observer.on_termination(termination);
133            return;
134        }
135        if let Some(observer) = safe_lock_option!(take: self.observer) {
136            drop_none_matched_subscriptions(&self.context, self.key);
137            observer.on_termination(termination);
138        }
139    }
140}
141
142fn drop_none_matched_subscriptions<'sub>(context: &Mutable<AmbContext<'sub>>, key: DefaultKey) {
143    let drop_subscriptions = context.lock_mut(|mut lock| {
144        let keep = lock.subscriptions.remove(key).unwrap();
145        let mut keep_slot_map = SlotMap::new();
146        keep_slot_map.insert(keep);
147        std::mem::replace(&mut lock.subscriptions, keep_slot_map)
148    });
149    drop(drop_subscriptions);
150}