medea_jason/peer/media/transitable_state/
controller.rs

1//! Component managing [`TransitableState`].
2
3use std::{
4    cell::{Cell, RefCell},
5    rc::Rc,
6    time::Duration,
7};
8
9use futures::{
10    FutureExt as _, StreamExt as _, future, future::Either,
11    stream::LocalBoxStream,
12};
13use medea_reactive::{Processed, ProgressableCell};
14
15use super::TransitableState;
16use crate::{
17    peer::media::transitable_state::{
18        InStable, InTransition, media_exchange_state, mute_state,
19    },
20    platform,
21    utils::{ResettableDelayHandle, resettable_delay_for},
22};
23
/// [`TransitableStateController`] for the [`mute_state`].
///
/// Uses [`mute_state::Stable`] as the stable state type and
/// [`mute_state::Transition`] as the transition state type.
pub type MuteStateController =
    TransitableStateController<mute_state::Stable, mute_state::Transition>;
27
/// [`TransitableStateController`] for the [`media_exchange_state`].
///
/// Uses [`media_exchange_state::Stable`] as the stable state type and
/// [`media_exchange_state::Transition`] as the transition state type.
pub type MediaExchangeStateController = TransitableStateController<
    media_exchange_state::Stable,
    media_exchange_state::Transition,
>;
33
/// Component managing all kinds of [`TransitableState`].
///
/// `S` is the stable state type and `T` is the transition state type of the
/// managed [`TransitableState`].
#[derive(Debug)]
pub struct TransitableStateController<S, T> {
    /// Actual [`TransitableState`].
    ///
    /// Kept in a [`ProgressableCell`], so its changes can be subscribed to.
    state: ProgressableCell<TransitableState<S, T>>,

    /// Timeout of the [`TransitableStateController::state`] transition.
    ///
    /// [`None`] on creation; replaced with a fresh handle each time a new
    /// transition timeout is armed.
    timeout_handle: RefCell<Option<ResettableDelayHandle>>,

    /// Indicator whether [`TransitableStateController::timeout_handle`]'s
    /// timeout is stopped.
    ///
    /// Stored separately from the handle, because it's also passed to
    /// [`resettable_delay_for()`] whenever a new timeout is armed.
    is_transition_timeout_stopped: Cell<bool>,
}
47
48impl<S, T> TransitableStateController<S, T>
49where
50    S: InStable<Transition = T> + Into<TransitableState<S, T>> + 'static,
51    T: InTransition<Stable = S> + Into<TransitableState<S, T>> + 'static,
52{
53    /// Timeout for the current state to transit into the desired one.
54    const TRANSITION_TIMEOUT: Duration = {
55        #[cfg(not(feature = "mockable"))]
56        {
57            Duration::from_secs(10)
58        }
59        #[cfg(feature = "mockable")]
60        {
61            Duration::from_millis(500)
62        }
63    };
64
65    /// Returns new [`TransitableStateController`] with the provided
66    /// stable state.
67    #[must_use]
68    pub fn new(state: S) -> Rc<Self> {
69        let this = Rc::new(Self {
70            state: ProgressableCell::new(state.into()),
71            timeout_handle: RefCell::new(None),
72            is_transition_timeout_stopped: Cell::new(false),
73        });
74        Rc::clone(&this).spawn();
75        this
76    }
77
78    /// Spawns all the required [`Stream`] listeners for this
79    /// [`TransitableStateController`].
80    ///
81    /// [`Stream`]: futures::Stream
82    fn spawn(self: Rc<Self>) {
83        // We don't care about initial state, be cause transceiver is inactive
84        // at that moment.
85        let mut state_changes = self.state.subscribe().skip(1);
86        let weak_self = Rc::downgrade(&self);
87        platform::spawn(async move {
88            while let Some(state) = state_changes.next().await {
89                let (state, _guard) = state.into_parts();
90                if let Some(this) = weak_self.upgrade() {
91                    if let TransitableState::Transition(_) = state {
92                        let weak_this = Rc::downgrade(&this);
93                        platform::spawn(async move {
94                            let mut states = this.state.subscribe().skip(1);
95                            let (timeout, timeout_handle) =
96                                resettable_delay_for(
97                                    Self::TRANSITION_TIMEOUT,
98                                    this.is_transition_timeout_stopped.get(),
99                                );
100                            drop(
101                                this.timeout_handle
102                                    .borrow_mut()
103                                    .replace(timeout_handle),
104                            );
105                            match future::select(
106                                states.next(),
107                                Box::pin(timeout),
108                            )
109                            .await
110                            {
111                                Either::Left(_) => (),
112                                Either::Right(_) => {
113                                    #[expect( // false positive
114                                        clippy::shadow_unrelated,
115                                        reason = "actually related"
116                                    )]
117                                    if let Some(this) = weak_this.upgrade() {
118                                        let stable = this
119                                            .state
120                                            .get()
121                                            .cancel_transition();
122                                        this.state.set(stable);
123                                    }
124                                }
125                            }
126                        });
127                    }
128                } else {
129                    break;
130                }
131            }
132        });
133    }
134
135    /// Returns [`Stream`] into which the [`TransitableState::Stable`] updates
136    /// will be emitted.
137    ///
138    /// [`Stream`]: futures::Stream
139    pub fn subscribe_stable(&self) -> LocalBoxStream<'static, S> {
140        self.state
141            .subscribe()
142            .filter_map(async |s| {
143                let (s, _guard) = s.into_parts();
144                if let TransitableState::Stable(stable) = s {
145                    Some(stable)
146                } else {
147                    None
148                }
149            })
150            .boxed_local()
151    }
152
153    /// Returns [`Stream`] into which the [`TransitableState::Transition`]
154    /// updates will be emitted.
155    ///
156    /// [`Stream`]: futures::Stream
157    pub fn subscribe_transition(&self) -> LocalBoxStream<'static, T> {
158        self.state
159            .subscribe()
160            .filter_map(async |s| {
161                let (s, _guard) = s.into_parts();
162                if let TransitableState::Transition(transition) = s {
163                    Some(transition)
164                } else {
165                    None
166                }
167            })
168            .boxed_local()
169    }
170
171    /// Stops disable/enable timeout of this [`TransitableStateController`].
172    pub fn stop_transition_timeout(&self) {
173        self.is_transition_timeout_stopped.set(true);
174        if let Some(timer) = &*self.timeout_handle.borrow() {
175            timer.stop();
176        }
177    }
178
179    /// Resets disable/enable timeout of this [`TransitableStateController`].
180    pub fn reset_transition_timeout(&self) {
181        self.is_transition_timeout_stopped.set(false);
182        if let Some(timer) = &*self.timeout_handle.borrow() {
183            timer.reset();
184        }
185    }
186
    /// Returns current [`TransitableStateController::state`].
    ///
    /// The value is returned by value, so it's a snapshot taken at the moment
    /// of the call rather than a live view.
    #[must_use]
    pub fn state(&self) -> TransitableState<S, T> {
        self.state.get()
    }
192
193    /// Starts transition of the [`TransitableStateController::state`] to the
194    /// provided one.
195    pub fn transition_to(&self, desired_state: S) {
196        let current_state = self.state.get();
197        self.state.set(current_state.transition_to(desired_state));
198    }
199
200    /// Returns [`Future`] which will be resolved when state of this
201    /// [`TransitableStateController`] will be [`TransitableState::Stable`] or
202    /// the [`TransitableStateController`] is dropped.
203    ///
204    /// Succeeds if [`TransitableStateController`]'s state transits into the
205    /// `desired_state` or the [`TransitableStateController`] is dropped.
206    ///
207    /// # Errors
208    ///
209    /// With an approved stable [`MediaState`] if transition to the
210    /// `desired_state` cannot be made.
211    ///
212    /// [`MediaState`]: super::MediaState
213    pub fn when_media_state_stable(
214        &self,
215        desired_state: S,
216    ) -> future::LocalBoxFuture<'static, Result<(), S>> {
217        let mut states = self.state.subscribe();
218        async move {
219            while let Some(state) = states.next().await {
220                let (state, _guard) = state.into_parts();
221                match state {
222                    TransitableState::Transition(_) => {}
223                    TransitableState::Stable(s) => {
224                        return if s == desired_state {
225                            Ok(())
226                        } else {
227                            Err(s)
228                        };
229                    }
230                }
231            }
232            Ok(())
233        }
234        .boxed_local()
235    }
236
    /// Returns [`Processed`] that will be resolved once all the underlying data
    /// updates are processed by all subscribers.
    ///
    /// Delegates to the [`ProgressableCell`] holding the
    /// [`TransitableStateController::state`].
    pub fn when_processed(&self) -> Processed<'static> {
        self.state.when_all_processed()
    }
242
243    /// Returns [`Future`] which will be resolved once [`TransitableState`] is
244    /// transited to the [`TransitableState::Stable`].
245    pub fn when_stabilized(self: Rc<Self>) -> Processed<'static, ()> {
246        Processed::new(Box::new(move || {
247            let stable = self.subscribe_stable();
248            Box::pin(async move {
249                stable.fuse().select_next_some().map(drop).await;
250            })
251        }))
252    }
253
254    /// Updates [`TransitableStateController::state`].
255    pub(in super::super) fn update(&self, new_state: S) {
256        let current_state = self.state.get();
257
258        let state_update = match current_state {
259            TransitableState::Stable(_) => new_state.into(),
260            TransitableState::Transition(t) => {
261                if t.intended() == new_state {
262                    new_state.into()
263                } else {
264                    t.set_inner(new_state).into()
265                }
266            }
267        };
268
269        self.state.set(state_update);
270    }
271}
272
273impl MuteStateController {
274    /// Indicates whether [`TransitableStateController`]'s mute state is in
275    /// [`mute_state::Stable::Muted`].
276    #[must_use]
277    pub fn muted(&self) -> bool {
278        self.state.get() == mute_state::Stable::Muted.into()
279    }
280
281    /// Indicates whether [`TransitableStateController`]'s mute state is in
282    /// [`mute_state::Stable::Unmuted`].
283    #[must_use]
284    pub fn unmuted(&self) -> bool {
285        self.state.get() == mute_state::Stable::Unmuted.into()
286    }
287}
288
289impl MediaExchangeStateController {
290    /// Indicates whether [`TransitableStateController`]'s media exchange state
291    /// is in [`media_exchange_state::Stable::Disabled`].
292    #[must_use]
293    pub fn disabled(&self) -> bool {
294        self.state.get() == media_exchange_state::Stable::Disabled.into()
295    }
296
297    /// Indicates whether [`TransitableStateController`]'s media exchange state
298    /// is in [`media_exchange_state::Stable::Enabled`].
299    #[must_use]
300    pub fn enabled(&self) -> bool {
301        self.state.get() == media_exchange_state::Stable::Enabled.into()
302    }
303}