rxr/observable/multicast.rs
1//! Module for handling observables with multicast capabilities.
2//!
3//! This module provides functionality for creating and working with observables that
4//! can multicast emissions to multiple subscribers. It includes the `Connectable`
5//! observable type, which allows subscribers to connect to an underlying source and
6//! receive emissions through a shared subscription.
7//!
8//! The `Connectable` type enables efficient and controlled multicasting of
9//! observable emissions, allowing multiple subscribers to receive the same emissions
10//! without duplicating the source work or emissions.
11
12use std::sync::{Arc, Mutex};
13
14use crate::{
15 subjects::{SubjectEmitter, SubjectReceiver},
16 subscribe::{Fuse, Subscription, SubscriptionHandle, UnsubscribeLogic},
17 ObservableExt, Subject, Subscribeable, Unsubscribeable,
18};
19
20/// Multicasting observable with a `connect()` method for creating subscriptions to
21/// an underlying source, enabling multiple consumers to connect and receive emitted
22/// values concurrently.
23///
24/// `Connectable` observable is a type of observable that does not emit values until
25/// the `connect()` method is called. It allows for multiple subscribers to be
26/// registered before any values are emitted, enabling subscribers to receive the
27/// same set of values concurrently. Once connected, a `Connectable` observable
28/// behaves like a regular observable, emitting values to all registered subscribers.
29#[derive(Clone)]
30pub struct Connectable<T> {
31 source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>,
32 state_subject: (SubjectEmitter<T>, SubjectReceiver<T>),
33 fused: bool,
34 defused: bool,
35 subject: bool,
36 connected: Arc<Mutex<bool>>,
37 connected_subscription: Arc<Mutex<Option<Subscription>>>,
38}
39
40impl<T: Clone + 'static> Connectable<T> {
41 /// Creates a new instance of a `Connectable` observable.
42 ///
43 /// This method constructs a new instance of a `Connectable` observable.
44 /// Typically, you will not directly use this method to create a `Connectable`
45 /// observable instance. Instead, the [`connectable()`] operator is used to create
46 /// instances of `Connectable` observables more conveniently.
47 ///
48 /// [`connectable()`]: ../trait.ObservableExt.html#method.connectable
49 pub fn new(source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>) -> Self {
50 Connectable {
51 source,
52 state_subject: Subject::emitter_receiver(),
53 fused: false,
54 defused: false,
55 subject: false,
56 connected: Arc::new(Mutex::new(false)),
57 connected_subscription: Arc::new(Mutex::new(None)),
58 }
59 }
60
61 /// Connects the `Connectable` observable, allowing it to emit values to
62 /// its subscribers.
63 ///
64 /// This method triggers the `Connectable` observable to start emitting values to
65 /// its subscribers. Until this method is called, the `Connectable` observable
66 /// will not emit any values, even if subscribers are present. After calling
67 /// `connect()`, the `Connectable` observable will start emitting values to its
68 /// subscribers, allowing them to receive and process the emitted data. It
69 /// returns a subscription handle which can be used to unsubscribe from the
70 /// `Connectable` observable when no longer needed or to await for the connected
71 /// source to finish emitting.
72 #[must_use]
73 pub fn connect(self) -> Subscription {
74 // Mark `Connectable` as connected.
75 *self.connected.lock().unwrap() = true;
76
77 // Turn `SubjectEmitter` into `Subscriber` and subscribe to it. This will
78 // subscribe all sinked `Subscriber`'s in `Subject` to the source observable.
79 let mut subscription = self
80 .source
81 .lock()
82 .unwrap()
83 .subscribe(self.state_subject.0.into());
84
85 // Extract `subscription_future` so it can be returnrd within new `Subscription`.
86 let subscription_future = subscription.subscription_future;
87 subscription.subscription_future = SubscriptionHandle::Nil;
88
89 // Store subscription into `connected_subscription` so connected source can
90 // be unsubscribed.
91 *self.connected_subscription.lock().unwrap() = Some(subscription);
92
93 // Get connected subscription memory location.
94 let cs = Arc::clone(&self.connected_subscription);
95
96 // Return `Subscription` that can be used to await connected source to finish
97 // emitting or can be used to disconnect the source from connector subject
98 // and stopping notifications to all subscribers.
99 Subscription::new(
100 UnsubscribeLogic::Logic(Box::new(move || {
101 let connectable_subscription = cs.lock().unwrap().take();
102 if let Some(connectable_subscription) = connectable_subscription {
103 connectable_subscription.unsubscribe();
104 }
105 })),
106 subscription_future,
107 )
108 }
109}
110
111impl<T> Fuse for Connectable<T> {
112 fn set_fused(&mut self, fused: bool, defused: bool) {
113 self.fused = fused;
114 self.defused = defused;
115 }
116
117 fn get_fused(&self) -> (bool, bool) {
118 (self.fused, self.defused)
119 }
120}
121
122impl<T: 'static> Subscribeable for Connectable<T> {
123 type ObsType = T;
124
125 fn subscribe(
126 &mut self,
127 mut s: crate::subscribe::Subscriber<Self::ObsType>,
128 ) -> crate::subscribe::Subscription {
129 let (fused, defused) = s.get_fused();
130
131 if defused || (fused && !self.fused) {
132 self.defused = s.defused;
133 self.fused = s.fused;
134 } else {
135 s.set_fused(self.fused, self.defused);
136 }
137 // Sink all `Subscriber`'s into `SubjectReceiver`.
138 let subject_subscription = self.state_subject.1.subscribe(s);
139
140 // Get connected subscription memory location.
141 let cs = Arc::clone(&self.connected_subscription);
142
143 // Get connected indicator memory location.
144 let connected = Arc::clone(&self.connected);
145 Subscription::new(
146 UnsubscribeLogic::Logic(Box::new(move || {
147 // If `unsubscribe()` is called individually on one or more subscriptions
148 // before connecting `Connectable` with `connect()`, this flag will be
149 // `false` and each subscription can be unsubscribed individually so
150 // that it does not emit when `Connectable` is connected.
151 if !*connected.lock().unwrap() {
152 subject_subscription.unsubscribe();
153 }
154
155 let connectable_subscription = cs.lock().unwrap().take();
156
157 // The unsubscribe logic may be invoked multiple times if the
158 // `Connectable` observable has more than one subscriber. Upon the
159 // initial invocation, it will contain a valid `Subscription` that
160 // can be unsubscribed from. Subsequent invocations will result in
161 // `None`. Unsubscribing the `Connectable` observable once is
162 // sufficient, regardless of the number of subscribers it has.
163 if let Some(connectable_subscription) = connectable_subscription {
164 connectable_subscription.unsubscribe();
165 }
166 })),
167 SubscriptionHandle::Nil,
168 )
169 }
170
171 fn is_subject(&self) -> bool {
172 self.subject
173 }
174
175 fn set_subject_indicator(&mut self, s: bool) {
176 self.subject = s;
177 }
178}