1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
//! Implementation of the [`Component`].
use std::{borrow::Borrow, convert::Infallible, fmt::Display, rc::Rc};
use derive_more::Deref;
use futures::{future, Future, FutureExt as _, Stream, StreamExt};
use medea_reactive::AllProcessed;
use sealed::sealed;
use crate::{media::LocalTracksConstraints, platform, utils::TaskHandle};
/// Abstraction over a state which can be transformed to the states from the
/// [`medea_client_api_proto::state`].
pub trait AsProtoState {
/// [`medea_client_api_proto::state`] into which this state can be
/// transformed.
type Output;
/// Converts this state to the [`medea_client_api_proto::state`]
/// representation.
fn as_proto(&self) -> Self::Output;
}
/// Abstraction of state which can be updated or created by the
/// [`medea_client_api_proto::state`].
pub trait SynchronizableState {
/// [`medea_client_api_proto::state`] by which this state can be updated.
type Input;
/// Creates a new state from the [`medea_client_api_proto::state`]
/// representation.
fn from_proto(
input: Self::Input,
send_cons: &LocalTracksConstraints,
) -> Self;
/// Updates this state with a provided [`medea_client_api_proto::state`].
fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints);
}
/// Abstraction over a state which can be updated by a client side.
pub trait Updatable {
/// Returns [`Future`] resolving once this [`Updatable`] state resolves its
/// intentions.
fn when_stabilized(&self) -> AllProcessed<'static>;
/// Returns [`Future`] resolving once all the client updates are performed
/// on this state.
fn when_updated(&self) -> AllProcessed<'static>;
/// Notifies about a RPC connection loss.
fn connection_lost(&self);
/// Notifies about a RPC connection recovering.
fn connection_recovered(&self);
}
/// Component is a base that helps managing reactive components.
///
/// It consists of two parts: state and object. Object is listening to its state
/// changes and updates accordingly, so all mutations are meant to be applied to
/// the state.
#[derive(Debug, Deref)]
pub struct Component<S, O> {
/// Object holding the state.
#[deref]
obj: Rc<O>,
/// State being reactively listened.
state: Rc<S>,
/// All the spawned watchers of the state.
_spawned_watchers: Vec<TaskHandle>,
}
impl<S, O> Component<S, O> {
/// Returns [`Rc`] to the object managed by this [`Component`].
#[must_use]
pub fn obj(&self) -> Rc<O> {
Rc::clone(&self.obj)
}
/// Returns reference to the state of this [`Component`].
#[must_use]
pub fn state(&self) -> Rc<S> {
Rc::clone(&self.state)
}
}
impl<S: ComponentState<O> + 'static, O: 'static> Component<S, O> {
/// Returns new [`Component`] with a provided object and state.
///
/// Spawns all watchers of this [`Component`].
pub fn new(obj: Rc<O>, state: Rc<S>) -> Self {
let mut watchers_spawner =
WatchersSpawner::new(Rc::clone(&state), Rc::clone(&obj));
state.spawn_watchers(&mut watchers_spawner);
Self {
state,
obj,
_spawned_watchers: watchers_spawner.finish(),
}
}
}
/// Spawner for the [`Component`]'s watchers.
#[derive(Debug)]
pub struct WatchersSpawner<S, O> {
/// State being watched.
state: Rc<S>,
/// Object holding the state.
obj: Rc<O>,
/// All the spawned watchers of the state.
spawned_watchers: Vec<TaskHandle>,
}
impl<S: 'static, O: 'static> WatchersSpawner<S, O> {
/// Spawns watchers for the provided [`Stream`].
///
/// If watcher returns an error then this error will be printed to the error
/// log.
///
/// You can stop all listeners tasks spawned by this function by
/// [`Drop`]ping the [`Component`].
pub fn spawn<R, V, F, H>(&mut self, mut rx: R, handle: F)
where
R: Stream<Item = V> + Unpin + 'static,
F: Fn(Rc<O>, Rc<S>, V) -> H + 'static,
H: Future + 'static,
<H as Future>::Output: IntoResult,
{
let obj = Rc::clone(&self.obj);
let state = Rc::clone(&self.state);
let (fut, handle) = future::abortable(async move {
while let Some(value) = rx.next().await {
if let Err(e) =
(handle)(Rc::clone(&obj), Rc::clone(&state), value)
.await
.into_result()
{
log::error!("{e}");
}
}
});
platform::spawn(fut.map(drop));
self.spawned_watchers.push(handle.into());
}
/// Spawns synchronous watchers for the provided [`Stream`].
///
/// If watcher returns an error then this error will be printed to the error
/// log.
///
/// You can stop all listeners tasks spawned by this function by
/// [`Drop`]ping the [`Component`].
pub fn spawn_sync<R, V, F, Obj, St, H>(&mut self, rx: R, handle: F)
where
R: Stream<Item = V> + Unpin + 'static,
F: Fn(&Obj, &St, V) -> H + 'static,
Rc<O>: Borrow<Obj>,
Rc<S>: Borrow<St>,
H: IntoResult + 'static,
{
self.spawn(rx, move |o, s, v| {
future::ready(handle(o.borrow(), s.borrow(), v))
});
}
/// Creates new [`WatchersSpawner`] for the provided object and state.
fn new(state: Rc<S>, obj: Rc<O>) -> Self {
Self {
state,
obj,
spawned_watchers: Vec::new(),
}
}
/// Returns [`TaskHandle`]s for the watchers spawned by this
/// [`WatchersSpawner`].
// false positive: destructors cannot be evaluated at compile-time
#[allow(clippy::missing_const_for_fn)]
fn finish(self) -> Vec<TaskHandle> {
self.spawned_watchers
}
}
/// Abstraction describing state of the [`Component`].
pub trait ComponentState<C>: Sized {
/// Spawns all watchers required for this [`ComponentState`].
fn spawn_watchers(&self, spawner: &mut WatchersSpawner<Self, C>);
}
/// Helper trait for naming types of the [`Component`]'s state and object for
/// the [`ComponentState`] implementation generated by
/// [`medea_macro::watchers`].
pub trait ComponentTypes {
/// Type of [`Component`]'s state.
type State;
/// Type of object managed by [`Component`].
type Obj;
}
impl<S, O> ComponentTypes for Component<S, O> {
type Obj = O;
type State = S;
}
/// [`Result`] coercion for a `handle` accepted by the
/// [`WatchersSpawner::spawn()`] method.
#[sealed]
pub trait IntoResult {
/// Type of the coerced [`Result::Err`].
type Error: Display;
/// Coerces into the [`Result`].
fn into_result(self) -> Result<(), Self::Error>;
}
#[sealed]
impl IntoResult for () {
type Error = Infallible;
fn into_result(self) -> Result<Self, Self::Error> {
Ok(self)
}
}
#[sealed]
impl<E: Display> IntoResult for Result<(), E> {
type Error = E;
fn into_result(self) -> Result<(), Self::Error> {
self
}
}