use std::{
cell::{Cell, RefCell},
rc::Rc,
time::Duration,
};
use futures::{
StreamExt as _, future,
future::{Either, LocalBoxFuture},
stream::LocalBoxStream,
};
use medea_reactive::ObservableCell;
use crate::{
platform,
utils::{ResettableDelayHandle, resettable_delay_for},
};
pub const DESCRIPTION_APPROVE_TIMEOUT: Duration = {
#[cfg(not(feature = "mockable"))]
{
Duration::from_secs(10)
}
#[cfg(feature = "mockable")]
{
Duration::from_millis(500)
}
};
#[derive(Clone, Debug, Default)]
pub struct LocalSdp(Rc<Inner>);
impl LocalSdp {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub fn subscribe(&self) -> LocalBoxStream<'static, Option<String>> {
self.0.current_sdp.subscribe()
}
pub fn when_approved(&self) -> LocalBoxFuture<'static, ()> {
let approved = Rc::clone(&self.0.approved);
Box::pin(async move {
_ = approved.when_eq(true).await;
})
}
pub fn on_approve(&self) -> LocalBoxStream<'static, ()> {
Box::pin(
self.0
.approved
.subscribe()
.filter_map(|approved| future::ready(approved.then_some(()))),
)
}
pub fn rollback(&self) {
self.0.current_sdp.set(self.0.prev_sdp.borrow().clone());
self.0.approved.set(true);
}
pub fn unapproved_set(&self, sdp: String) {
let prev_sdp = self.0.current_sdp.replace(Some(sdp));
drop(self.0.prev_sdp.replace(prev_sdp));
self.0.approved.set(false);
self.spawn_rollback_task();
}
pub fn approved_set(&self, sdp: String) {
let is_current_approved =
self.0.current_sdp.borrow().as_ref() == Some(&sdp);
if !is_current_approved {
let is_restart_needed = self
.0
.prev_sdp
.borrow()
.as_ref()
.is_some_and(|prev| prev == &sdp);
if is_restart_needed {
self.0.restart_needed.set(true);
}
drop(self.0.current_sdp.replace(Some(sdp)));
}
self.0.approved.set(true);
}
#[must_use]
pub fn current(&self) -> Option<String> {
self.0.current_sdp.get()
}
#[must_use]
pub fn is_rollback(&self) -> bool {
self.0.current_sdp.borrow().as_ref().is_some_and(|current| {
self.0
.prev_sdp
.borrow()
.as_ref()
.is_some_and(|prev| prev == current)
})
}
pub fn stop_timeout(&self) {
self.0.is_rollback_timeout_stopped.set(true);
if let Some(handle) = self.0.rollback_task_handle.borrow().as_ref() {
handle.stop();
}
}
pub fn resume_timeout(&self) {
self.0.is_rollback_timeout_stopped.set(false);
if let Some(handle) = self.0.rollback_task_handle.borrow().as_ref() {
handle.reset();
}
}
fn spawn_rollback_task(&self) {
let (timeout, rollback_task) = resettable_delay_for(
DESCRIPTION_APPROVE_TIMEOUT,
self.0.is_rollback_timeout_stopped.get(),
);
platform::spawn({
let this = self.clone();
async move {
if let Either::Right(_) =
future::select(this.when_approved(), Box::pin(timeout))
.await
{
this.rollback();
}
}
});
drop(self.0.rollback_task_handle.replace(Some(rollback_task)));
}
#[must_use]
pub fn is_restart_needed(&self) -> bool {
self.0.restart_needed.get()
}
}
#[derive(Debug)]
struct Inner {
current_sdp: ObservableCell<Option<String>>,
prev_sdp: RefCell<Option<String>>,
approved: Rc<ObservableCell<bool>>,
rollback_task_handle: RefCell<Option<ResettableDelayHandle>>,
is_rollback_timeout_stopped: Cell<bool>,
restart_needed: Cell<bool>,
}
impl Default for Inner {
fn default() -> Self {
Self {
prev_sdp: RefCell::new(None),
current_sdp: ObservableCell::new(None),
approved: Rc::new(ObservableCell::new(true)),
rollback_task_handle: RefCell::new(None),
is_rollback_timeout_stopped: Cell::new(false),
restart_needed: Cell::new(false),
}
}
}