rx_rust/operators/conditional_boolean/
amb.rs1use crate::disposable::Disposable;
2use crate::utils::types::{Mutable, MutableHelper, NecessarySendSync, Shared};
3use crate::{
4 disposable::subscription::Subscription,
5 observable::Observable,
6 observer::{Observer, Termination},
7};
8use crate::{safe_lock, safe_lock_option, safe_lock_slot_map};
9use educe::Educe;
10use slotmap::{DefaultKey, SlotMap};
11
12#[derive(Educe)]
42#[educe(Debug, Clone)]
43pub struct Amb<I> {
44 sources: I,
45}
46
47impl<I> Amb<I> {
48 pub fn new(sources: I) -> Self {
49 Self { sources }
50 }
51}
52
53impl<'or, 'sub, T, E, OE, I> Observable<'or, 'sub, T, E> for Amb<I>
54where
55 'sub: 'or,
56 I: IntoIterator<Item = OE>,
57 OE: Observable<'or, 'sub, T, E>,
58{
59 fn subscribe(
60 self,
61 observer: impl Observer<T, E> + NecessarySendSync + 'or,
62 ) -> Subscription<'sub> {
63 let observer = Shared::new(Mutable::new(Some(observer)));
64
65 let mut slop_map = SlotMap::new();
66 let sources_and_key: Vec<_> = self
67 .sources
68 .into_iter()
69 .map(|e| {
70 let key = slop_map.insert(Subscription::default()); (e, key)
72 })
73 .collect();
74 let context = Shared::new(Mutable::new(AmbContext {
75 subscriptions: slop_map,
76 }));
77
78 for (source, key) in sources_and_key {
79 if safe_lock_option!(is_none: observer) {
80 break;
82 }
83 let amb_observer = AmbObserver {
84 observer: observer.clone(),
85 context: context.clone(),
86 key,
87 determined_observer: None,
88 };
89 let sub = source.subscribe(amb_observer);
90 safe_lock_slot_map!(replace: context, subscriptions, key, sub);
91 }
92
93 Subscription::new_with_disposal(context)
94 }
95}
96
97struct AmbContext<'sub> {
98 subscriptions: SlotMap<DefaultKey, Subscription<'sub>>,
99}
100
impl Disposable for Shared<Mutable<AmbContext<'_>>> {
    /// Disposes the race by taking the `subscriptions` map out of the shared
    /// context.
    fn dispose(self) {
        // NOTE(review): `mem_take` presumably swaps the field for its default
        // value and lets the old map drop here; confirm against the
        // `safe_lock!` definition.
        safe_lock!(mem_take: self, subscriptions);
    }
}
106
107struct AmbObserver<'sub, OR> {
108 observer: Shared<Mutable<Option<OR>>>,
109 context: Shared<Mutable<AmbContext<'sub>>>,
110 key: DefaultKey,
111 determined_observer: Option<OR>, }
113
114impl<'sub, T, E, OR> Observer<T, E> for AmbObserver<'sub, OR>
115where
116 OR: Observer<T, E>,
117{
118 fn on_next(&mut self, value: T) {
119 if let Some(observer) = self.determined_observer.as_mut() {
120 observer.on_next(value);
121 return;
122 }
123 if let Some(observer) = safe_lock_option!(take: self.observer) {
124 self.determined_observer = Some(observer);
125 drop_none_matched_subscriptions(&self.context, self.key);
126 self.on_next(value);
127 }
128 }
129
130 fn on_termination(self, termination: Termination<E>) {
131 if let Some(observer) = self.determined_observer {
132 observer.on_termination(termination);
133 return;
134 }
135 if let Some(observer) = safe_lock_option!(take: self.observer) {
136 drop_none_matched_subscriptions(&self.context, self.key);
137 observer.on_termination(termination);
138 }
139 }
140}
141
142fn drop_none_matched_subscriptions<'sub>(context: &Mutable<AmbContext<'sub>>, key: DefaultKey) {
143 let drop_subscriptions = context.lock_mut(|mut lock| {
144 let keep = lock.subscriptions.remove(key).unwrap();
145 let mut keep_slot_map = SlotMap::new();
146 keep_slot_map.insert(keep);
147 std::mem::replace(&mut lock.subscriptions, keep_slot_map)
148 });
149 drop(drop_subscriptions);
150}