use std::{borrow::Borrow, convert::Infallible, fmt::Display, rc::Rc};
use derive_more::Deref;
use futures::{future, Future, FutureExt as _, Stream, StreamExt as _};
use medea_reactive::AllProcessed;
use sealed::sealed;
use crate::{media::LocalTracksConstraints, platform, utils::TaskHandle};
pub trait AsProtoState {
type Output;
fn as_proto(&self) -> Self::Output;
}
pub trait SynchronizableState {
type Input;
fn from_proto(
input: Self::Input,
send_constraints: &LocalTracksConstraints,
) -> Self;
fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints);
}
pub trait Updatable {
fn when_stabilized(&self) -> AllProcessed<'static>;
fn when_updated(&self) -> AllProcessed<'static>;
fn connection_lost(&self);
fn connection_recovered(&self);
}
#[derive(Debug, Deref)]
pub struct Component<S, O> {
#[deref]
obj: Rc<O>,
state: Rc<S>,
_spawned_watchers: Vec<TaskHandle>,
}
impl<S, O> Component<S, O> {
#[must_use]
pub fn obj(&self) -> Rc<O> {
Rc::clone(&self.obj)
}
#[must_use]
pub fn state(&self) -> Rc<S> {
Rc::clone(&self.state)
}
}
impl<S: ComponentState<O> + 'static, O: 'static> Component<S, O> {
pub fn new(obj: Rc<O>, state: Rc<S>) -> Self {
let mut watchers_spawner =
WatchersSpawner::new(Rc::clone(&state), Rc::clone(&obj));
state.spawn_watchers(&mut watchers_spawner);
Self {
state,
obj,
_spawned_watchers: watchers_spawner.finish(),
}
}
}
#[derive(Debug)]
pub struct WatchersSpawner<S, O> {
state: Rc<S>,
obj: Rc<O>,
spawned_watchers: Vec<TaskHandle>,
}
impl<S: 'static, O: 'static> WatchersSpawner<S, O> {
pub fn spawn<R, V, F, H>(&mut self, mut rx: R, handle: F)
where
R: Stream<Item = V> + Unpin + 'static,
F: Fn(Rc<O>, Rc<S>, V) -> H + 'static,
H: Future + 'static,
<H as Future>::Output: IntoResult,
{
let obj = Rc::clone(&self.obj);
let state = Rc::clone(&self.state);
let (fut, handle) = future::abortable(async move {
while let Some(value) = rx.next().await {
if let Err(e) =
(handle)(Rc::clone(&obj), Rc::clone(&state), value)
.await
.into_result()
{
log::error!("{e}");
}
}
});
platform::spawn(fut.map(drop));
self.spawned_watchers.push(handle.into());
}
pub fn spawn_sync<R, V, F, Obj, St, H>(&mut self, rx: R, handle: F)
where
R: Stream<Item = V> + Unpin + 'static,
F: Fn(&Obj, &St, V) -> H + 'static,
Rc<O>: Borrow<Obj>,
Rc<S>: Borrow<St>,
H: IntoResult + 'static,
{
self.spawn(rx, move |o, s, v| {
future::ready(handle(o.borrow(), s.borrow(), v))
});
}
const fn new(state: Rc<S>, obj: Rc<O>) -> Self {
Self {
state,
obj,
spawned_watchers: Vec::new(),
}
}
fn finish(self) -> Vec<TaskHandle> {
self.spawned_watchers
}
}
pub trait ComponentState<C>: Sized {
fn spawn_watchers(&self, spawner: &mut WatchersSpawner<Self, C>);
}
pub trait ComponentTypes {
type State;
type Obj;
}
impl<S, O> ComponentTypes for Component<S, O> {
type Obj = O;
type State = S;
}
#[sealed]
pub trait IntoResult {
type Error: Display;
fn into_result(self) -> Result<(), Self::Error>;
}
#[sealed]
impl IntoResult for () {
type Error = Infallible;
fn into_result(self) -> Result<Self, Self::Error> {
Ok(self)
}
}
#[sealed]
impl<E: Display> IntoResult for Result<(), E> {
type Error = E;
fn into_result(self) -> Result<(), Self::Error> {
self
}
}