rx_rust/operators/conditional_boolean/
amb.rs1use crate::disposable::Disposable;
2use crate::utils::types::{Mutable, MutableHelper, NecessarySend, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use crate::{safe_lock, safe_lock_option, safe_lock_slot_map};
9use educe::Educe;
10use slotmap::{DefaultKey, SlotMap};
11
12#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct Amb<I> {
44 sources: I,
45}
46
47impl<I> Amb<I> {
48 pub fn new(sources: I) -> Self {
49 Self { sources }
50 }
51}
52
53impl<'or, 'sub, T, E, OE, I> Observable<'or, 'sub, T, E> for Amb<I>
54where
55 'sub: 'or,
56 I: IntoIterator<Item = OE>,
57 OE: Observable<'or, 'sub, T, E>,
58{
59 fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
60 let observer = Shared::new(Mutable::new(Some(observer)));
61
62 let mut slop_map = SlotMap::new();
63 let sources_and_key: Vec<_> = self
64 .sources
65 .into_iter()
66 .map(|e| {
67 let key = slop_map.insert(Subscription::default()); (e, key)
69 })
70 .collect();
71 let context = Shared::new(Mutable::new(AmbContext {
72 subscriptions: slop_map,
73 }));
74
75 for (source, key) in sources_and_key {
76 if safe_lock_option!(is_none: observer) {
77 break;
79 }
80 let amb_observer = AmbObserver {
81 observer: observer.clone(),
82 context: context.clone(),
83 key,
84 determined_observer: None,
85 };
86 let sub = source.subscribe(amb_observer);
87 safe_lock_slot_map!(replace: context, subscriptions, key, sub);
88 }
89
90 Subscription::new_with_disposal(context)
91 }
92}
93
94struct AmbContext<'sub> {
95 subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
96}
97
98impl Disposable for Shared<Mutable<AmbContext<'_>>> {
99 fn dispose(self) {
100 safe_lock!(mem_take: self, subscriptions);
101 }
102}
103
104struct AmbObserver<'sub, OR> {
105 observer: Shared<Mutable<Option<OR>>>,
106 context: Shared<Mutable<AmbContext<'sub>>>,
107 key: DefaultKey,
108 determined_observer: Option<OR>, }
110
111impl<'sub, T, E, OR> Observer<T, E> for AmbObserver<'sub, OR>
112where
113 OR: Observer<T, E>,
114{
115 fn on_next(&mut self, value: T) {
116 if let Some(observer) = self.determined_observer.as_mut() {
117 observer.on_next(value);
118 return;
119 }
120 if let Some(observer) = safe_lock_option!(take: self.observer) {
121 self.determined_observer = Some(observer);
122 drop_none_matched_subscriptions(&self.context, self.key);
123 self.on_next(value);
124 }
125 }
126
127 fn on_termination(self, termination: Termination<E>) {
128 if let Some(observer) = self.determined_observer {
129 observer.on_termination(termination);
130 return;
131 }
132 if let Some(observer) = safe_lock_option!(take: self.observer) {
133 drop_none_matched_subscriptions(&self.context, self.key);
134 observer.on_termination(termination);
135 }
136 }
137}
138
139fn drop_none_matched_subscriptions<'sub>(context: &Mutable<AmbContext<'sub>>, key: DefaultKey) {
140 let drop_subscriptions = context.lock_mut(|mut lock| {
141 let keep = lock.subscriptions.remove(key).unwrap();
142 let mut keep_slot_map = SlotMap::new();
143 keep_slot_map.insert(keep);
144 std::mem::replace(&mut lock.subscriptions, keep_slot_map)
145 });
146 drop(drop_subscriptions);
147}