medea_jason/peer/media/transitable_state/
controller.rs1use std::{
4 cell::{Cell, RefCell},
5 rc::Rc,
6 time::Duration,
7};
8
9use futures::{
10 FutureExt as _, StreamExt as _, future, future::Either,
11 stream::LocalBoxStream,
12};
13use medea_reactive::{Processed, ProgressableCell};
14
15use super::TransitableState;
16use crate::{
17 peer::media::transitable_state::{
18 InStable, InTransition, media_exchange_state, mute_state,
19 },
20 platform,
21 utils::{ResettableDelayHandle, resettable_delay_for},
22};
23
24pub type MuteStateController =
26 TransitableStateController<mute_state::Stable, mute_state::Transition>;
27
28pub type MediaExchangeStateController = TransitableStateController<
30 media_exchange_state::Stable,
31 media_exchange_state::Transition,
32>;
33
34#[derive(Debug)]
36pub struct TransitableStateController<S, T> {
37 state: ProgressableCell<TransitableState<S, T>>,
39
40 timeout_handle: RefCell<Option<ResettableDelayHandle>>,
42
43 is_transition_timeout_stopped: Cell<bool>,
46}
47
48impl<S, T> TransitableStateController<S, T>
49where
50 S: InStable<Transition = T> + Into<TransitableState<S, T>> + 'static,
51 T: InTransition<Stable = S> + Into<TransitableState<S, T>> + 'static,
52{
/// Delay used for the timeout armed on every transition.
///
/// Equals 10 seconds normally and 500 milliseconds when the `mockable`
/// Cargo feature is enabled.
const TRANSITION_TIMEOUT: Duration = if cfg!(feature = "mockable") {
    Duration::from_millis(500)
} else {
    Duration::from_secs(10)
};
64
65 #[must_use]
68 pub fn new(state: S) -> Rc<Self> {
69 let this = Rc::new(Self {
70 state: ProgressableCell::new(state.into()),
71 timeout_handle: RefCell::new(None),
72 is_transition_timeout_stopped: Cell::new(false),
73 });
74 Rc::clone(&this).spawn();
75 this
76 }
77
78 fn spawn(self: Rc<Self>) {
83 let mut state_changes = self.state.subscribe().skip(1);
86 let weak_self = Rc::downgrade(&self);
87 platform::spawn(async move {
88 while let Some(state) = state_changes.next().await {
89 let (state, _guard) = state.into_parts();
90 if let Some(this) = weak_self.upgrade() {
91 if let TransitableState::Transition(_) = state {
92 let weak_this = Rc::downgrade(&this);
93 platform::spawn(async move {
94 let mut states = this.state.subscribe().skip(1);
95 let (timeout, timeout_handle) =
96 resettable_delay_for(
97 Self::TRANSITION_TIMEOUT,
98 this.is_transition_timeout_stopped.get(),
99 );
100 drop(
101 this.timeout_handle
102 .borrow_mut()
103 .replace(timeout_handle),
104 );
105 match future::select(
106 states.next(),
107 Box::pin(timeout),
108 )
109 .await
110 {
111 Either::Left(_) => (),
112 Either::Right(_) => {
113 #[expect( clippy::shadow_unrelated,
115 reason = "actually related"
116 )]
117 if let Some(this) = weak_this.upgrade() {
118 let stable = this
119 .state
120 .get()
121 .cancel_transition();
122 this.state.set(stable);
123 }
124 }
125 }
126 });
127 }
128 } else {
129 break;
130 }
131 }
132 });
133 }
134
135 pub fn subscribe_stable(&self) -> LocalBoxStream<'static, S> {
140 self.state
141 .subscribe()
142 .filter_map(async |s| {
143 let (s, _guard) = s.into_parts();
144 if let TransitableState::Stable(stable) = s {
145 Some(stable)
146 } else {
147 None
148 }
149 })
150 .boxed_local()
151 }
152
153 pub fn subscribe_transition(&self) -> LocalBoxStream<'static, T> {
158 self.state
159 .subscribe()
160 .filter_map(async |s| {
161 let (s, _guard) = s.into_parts();
162 if let TransitableState::Transition(transition) = s {
163 Some(transition)
164 } else {
165 None
166 }
167 })
168 .boxed_local()
169 }
170
171 pub fn stop_transition_timeout(&self) {
173 self.is_transition_timeout_stopped.set(true);
174 if let Some(timer) = &*self.timeout_handle.borrow() {
175 timer.stop();
176 }
177 }
178
179 pub fn reset_transition_timeout(&self) {
181 self.is_transition_timeout_stopped.set(false);
182 if let Some(timer) = &*self.timeout_handle.borrow() {
183 timer.reset();
184 }
185 }
186
187 #[must_use]
189 pub fn state(&self) -> TransitableState<S, T> {
190 self.state.get()
191 }
192
193 pub fn transition_to(&self, desired_state: S) {
196 let current_state = self.state.get();
197 self.state.set(current_state.transition_to(desired_state));
198 }
199
200 pub fn when_media_state_stable(
214 &self,
215 desired_state: S,
216 ) -> future::LocalBoxFuture<'static, Result<(), S>> {
217 let mut states = self.state.subscribe();
218 async move {
219 while let Some(state) = states.next().await {
220 let (state, _guard) = state.into_parts();
221 match state {
222 TransitableState::Transition(_) => {}
223 TransitableState::Stable(s) => {
224 return if s == desired_state {
225 Ok(())
226 } else {
227 Err(s)
228 };
229 }
230 }
231 }
232 Ok(())
233 }
234 .boxed_local()
235 }
236
237 pub fn when_processed(&self) -> Processed<'static> {
240 self.state.when_all_processed()
241 }
242
243 pub fn when_stabilized(self: Rc<Self>) -> Processed<'static, ()> {
246 Processed::new(Box::new(move || {
247 let stable = self.subscribe_stable();
248 Box::pin(async move {
249 stable.fuse().select_next_some().map(drop).await;
250 })
251 }))
252 }
253
254 pub(in super::super) fn update(&self, new_state: S) {
256 let current_state = self.state.get();
257
258 let state_update = match current_state {
259 TransitableState::Stable(_) => new_state.into(),
260 TransitableState::Transition(t) => {
261 if t.intended() == new_state {
262 new_state.into()
263 } else {
264 t.set_inner(new_state).into()
265 }
266 }
267 };
268
269 self.state.set(state_update);
270 }
271}
272
273impl MuteStateController {
274 #[must_use]
277 pub fn muted(&self) -> bool {
278 self.state.get() == mute_state::Stable::Muted.into()
279 }
280
281 #[must_use]
284 pub fn unmuted(&self) -> bool {
285 self.state.get() == mute_state::Stable::Unmuted.into()
286 }
287}
288
289impl MediaExchangeStateController {
290 #[must_use]
293 pub fn disabled(&self) -> bool {
294 self.state.get() == media_exchange_state::Stable::Disabled.into()
295 }
296
297 #[must_use]
300 pub fn enabled(&self) -> bool {
301 self.state.get() == media_exchange_state::Stable::Enabled.into()
302 }
303}