rx_rust/operators/conditional_boolean/
amb.rs

1use crate::disposable::Disposable;
2use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4    disposable::subscription::Subscription,
5    observable::Observable,
6    observer::{Observer, Termination},
7};
8use crate::{safe_lock, safe_lock_option, safe_lock_slot_map};
9use educe::Educe;
10use slotmap::{DefaultKey, SlotMap};
11
12/// Given two or more source Observables, emit all of the items from only the first of these Observables to emit an item or notification.
13/// See <https://reactivex.io/documentation/operators/amb.html>
14///
15/// # Examples
16/// ```rust
17/// use rx_rust::{
18///     observable::observable_ext::ObservableExt,
19///     observer::Termination,
20///     operators::{
21///         conditional_boolean::amb::Amb,
22///         creating::from_iter::FromIter,
23///     },
24/// };
25///
26/// let mut values = Vec::new();
27/// let mut terminations = Vec::new();
28///
29/// let observable = Amb::new([
30///     FromIter::new(vec![1, 2]),
31///     FromIter::new(vec![3, 4]),
32/// ]);
33/// observable.subscribe_with_callback(
34///     |value| values.push(value),
35///     |termination| terminations.push(termination),
36/// );
37///
38/// assert_eq!(values, vec![1, 2]);
39/// assert_eq!(terminations, vec![Termination::Completed]);
40/// ```
41#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct Amb<I> {
44    sources: I,
45}
46
47impl<I> Amb<I> {
48    pub fn new(sources: I) -> Self {
49        Self { sources }
50    }
51}
52
53impl<'or, 'sub, T, E, OE, I> Observable<'or, 'sub, T, E> for Amb<I>
54where
55    'sub: 'or,
56    I: IntoIterator<Item = OE>,
57    OE: Observable<'or, 'sub, T, E>,
58{
59    fn subscribe(self, observer: impl Observer<T, E> + NecessarySendSync + 'or) -> Subscription<'sub> {
60        let observer = Shared::new(Mutable::new(Some(observer)));
61
62        let mut slop_map = SlotMap::new();
63        let sources_and_key: Vec<_> = self
64            .sources
65            .into_iter()
66            .map(|e| {
67                let key = slop_map.insert(Subscription::default()); // Insert placeholder
68                (e, key)
69            })
70            .collect();
71        let context = Shared::new(Mutable::new(AmbContext {
72            subscriptions: slop_map,
73        }));
74
75        for (source, key) in sources_and_key {
76            if safe_lock_option!(is_none: observer) {
77                // determined
78                break;
79            }
80            let amb_observer = AmbObserver {
81                observer: observer.clone(),
82                context: context.clone(),
83                key,
84                determined_observer: None,
85            };
86            let sub = source.subscribe(amb_observer);
87            safe_lock_slot_map!(replace: context, subscriptions, key, sub);
88        }
89
90        Subscription::new_with_disposal(context)
91    }
92}
93
94struct AmbContext<'sub> {
95    subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
96}
97
98impl Disposable for Shared<Mutable<AmbContext<'_>>> {
99    fn dispose(self) {
100        safe_lock!(mem_take: self, subscriptions);
101    }
102}
103
104struct AmbObserver<'sub, OR> {
105    observer: Shared<Mutable<Option<OR>>>,
106    context: Shared<Mutable<AmbContext<'sub>>>,
107    key: DefaultKey,
108    determined_observer: Option<OR>, // Some means this AmbObserver is the first. None means this AmbObserver is not the first or not determined yet.
109}
110
111impl<'sub, T, E, OR> Observer<T, E> for AmbObserver<'sub, OR>
112where
113    OR: Observer<T, E>,
114{
115    fn on_next(&mut self, value: T) {
116        if let Some(observer) = self.determined_observer.as_mut() {
117            observer.on_next(value);
118            return;
119        }
120        if let Some(observer) = safe_lock_option!(take: self.observer) {
121            self.determined_observer = Some(observer);
122            drop_none_matched_subscriptions(&self.context, self.key);
123            self.on_next(value);
124        }
125    }
126
127    fn on_termination(self, termination: Termination<E>) {
128        if let Some(observer) = self.determined_observer {
129            observer.on_termination(termination);
130            return;
131        }
132        if let Some(observer) = safe_lock_option!(take: self.observer) {
133            drop_none_matched_subscriptions(&self.context, self.key);
134            observer.on_termination(termination);
135        }
136    }
137}
138
139fn drop_none_matched_subscriptions<'sub>(context: &Mutable<AmbContext<'sub>>, key: DefaultKey) {
140    let drop_subscriptions = context.lock_mut(|mut lock| {
141        let keep = lock.subscriptions.remove(key).unwrap();
142        let mut keep_slot_map = SlotMap::new();
143        keep_slot_map.insert(keep);
144        std::mem::replace(&mut lock.subscriptions, keep_slot_map)
145    });
146    drop(drop_subscriptions);
147}