rxr/observable/
multicast.rs

1//! Module for handling observables with multicast capabilities.
2//!
3//! This module provides functionality for creating and working with observables that
4//! can multicast emissions to multiple subscribers. It includes the `Connectable`
5//! observable type, which allows subscribers to connect to an underlying source and
6//! receive emissions through a shared subscription.
7//!
8//! The `Connectable` type enables efficient and controlled multicasting of
9//! observable emissions, allowing multiple subscribers to receive the same emissions
10//! without duplicating the source work or emissions.
11
12use std::sync::{Arc, Mutex};
13
14use crate::{
15    subjects::{SubjectEmitter, SubjectReceiver},
16    subscribe::{Fuse, Subscription, SubscriptionHandle, UnsubscribeLogic},
17    ObservableExt, Subject, Subscribeable, Unsubscribeable,
18};
19
20/// Multicasting observable with a `connect()` method for creating subscriptions to
21/// an underlying source, enabling multiple consumers to connect and receive emitted
22/// values concurrently.
23///
24/// `Connectable` observable is a type of observable that does not emit values until
25/// the `connect()` method is called. It allows for multiple subscribers to be
26/// registered before any values are emitted, enabling subscribers to receive the
27/// same set of values concurrently. Once connected, a `Connectable` observable
28/// behaves like a regular observable, emitting values to all registered subscribers.
29#[derive(Clone)]
30pub struct Connectable<T> {
31    source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>,
32    state_subject: (SubjectEmitter<T>, SubjectReceiver<T>),
33    fused: bool,
34    defused: bool,
35    subject: bool,
36    connected: Arc<Mutex<bool>>,
37    connected_subscription: Arc<Mutex<Option<Subscription>>>,
38}
39
40impl<T: Clone + 'static> Connectable<T> {
41    /// Creates a new instance of a `Connectable` observable.
42    ///
43    /// This method constructs a new instance of a `Connectable` observable.
44    /// Typically, you will not directly use this method to create a `Connectable`
45    /// observable instance. Instead, the [`connectable()`] operator is used to create
46    /// instances of `Connectable` observables more conveniently.
47    ///
48    /// [`connectable()`]: ../trait.ObservableExt.html#method.connectable
49    pub fn new(source: Arc<Mutex<dyn ObservableExt<T> + Send + Sync>>) -> Self {
50        Connectable {
51            source,
52            state_subject: Subject::emitter_receiver(),
53            fused: false,
54            defused: false,
55            subject: false,
56            connected: Arc::new(Mutex::new(false)),
57            connected_subscription: Arc::new(Mutex::new(None)),
58        }
59    }
60
61    /// Connects the `Connectable` observable, allowing it to emit values to
62    /// its subscribers.
63    ///
64    /// This method triggers the `Connectable` observable to start emitting values to
65    /// its subscribers. Until this method is called, the `Connectable` observable
66    /// will not emit any values, even if subscribers are present. After calling
67    /// `connect()`, the `Connectable` observable will start emitting values to its
68    /// subscribers, allowing them to receive and process the emitted data. It
69    /// returns a subscription handle which can be used to unsubscribe from the
70    /// `Connectable` observable when no longer needed or to await for the connected
71    /// source to finish emitting.
72    #[must_use]
73    pub fn connect(self) -> Subscription {
74        // Mark `Connectable` as connected.
75        *self.connected.lock().unwrap() = true;
76
77        // Turn `SubjectEmitter` into `Subscriber` and subscribe to it. This will
78        // subscribe all sinked `Subscriber`'s in `Subject` to the source observable.
79        let mut subscription = self
80            .source
81            .lock()
82            .unwrap()
83            .subscribe(self.state_subject.0.into());
84
85        // Extract `subscription_future` so it can be returnrd within new `Subscription`.
86        let subscription_future = subscription.subscription_future;
87        subscription.subscription_future = SubscriptionHandle::Nil;
88
89        // Store subscription into `connected_subscription` so connected source can
90        // be unsubscribed.
91        *self.connected_subscription.lock().unwrap() = Some(subscription);
92
93        // Get connected subscription memory location.
94        let cs = Arc::clone(&self.connected_subscription);
95
96        // Return `Subscription` that can be used to await connected source to finish
97        // emitting or can be used to disconnect the source from connector subject
98        // and stopping notifications to all subscribers.
99        Subscription::new(
100            UnsubscribeLogic::Logic(Box::new(move || {
101                let connectable_subscription = cs.lock().unwrap().take();
102                if let Some(connectable_subscription) = connectable_subscription {
103                    connectable_subscription.unsubscribe();
104                }
105            })),
106            subscription_future,
107        )
108    }
109}
110
111impl<T> Fuse for Connectable<T> {
112    fn set_fused(&mut self, fused: bool, defused: bool) {
113        self.fused = fused;
114        self.defused = defused;
115    }
116
117    fn get_fused(&self) -> (bool, bool) {
118        (self.fused, self.defused)
119    }
120}
121
122impl<T: 'static> Subscribeable for Connectable<T> {
123    type ObsType = T;
124
125    fn subscribe(
126        &mut self,
127        mut s: crate::subscribe::Subscriber<Self::ObsType>,
128    ) -> crate::subscribe::Subscription {
129        let (fused, defused) = s.get_fused();
130
131        if defused || (fused && !self.fused) {
132            self.defused = s.defused;
133            self.fused = s.fused;
134        } else {
135            s.set_fused(self.fused, self.defused);
136        }
137        // Sink all `Subscriber`'s into `SubjectReceiver`.
138        let subject_subscription = self.state_subject.1.subscribe(s);
139
140        // Get connected subscription memory location.
141        let cs = Arc::clone(&self.connected_subscription);
142
143        // Get connected indicator memory location.
144        let connected = Arc::clone(&self.connected);
145        Subscription::new(
146            UnsubscribeLogic::Logic(Box::new(move || {
147                // If `unsubscribe()` is called individually on one or more subscriptions
148                // before connecting `Connectable` with `connect()`, this flag will be
149                // `false` and each subscription can be unsubscribed individually so
150                // that it does not emit when `Connectable` is connected.
151                if !*connected.lock().unwrap() {
152                    subject_subscription.unsubscribe();
153                }
154
155                let connectable_subscription = cs.lock().unwrap().take();
156
157                // The unsubscribe logic may be invoked multiple times if the
158                // `Connectable` observable has more than one subscriber. Upon the
159                // initial invocation, it will contain a valid `Subscription` that
160                // can be unsubscribed from. Subsequent invocations will result in
161                // `None`. Unsubscribing the `Connectable` observable once is
162                // sufficient, regardless of the number of subscribers it has.
163                if let Some(connectable_subscription) = connectable_subscription {
164                    connectable_subscription.unsubscribe();
165                }
166            })),
167            SubscriptionHandle::Nil,
168        )
169    }
170
171    fn is_subject(&self) -> bool {
172        self.subject
173    }
174
175    fn set_subject_indicator(&mut self, s: bool) {
176        self.subject = s;
177    }
178}