medea_jason/utils/
component.rs

1//! Implementation of the [`Component`].
2
3use std::{borrow::Borrow, convert::Infallible, fmt::Display, rc::Rc};
4
5use derive_more::with_trait::Deref;
6use futures::{FutureExt as _, Stream, StreamExt as _, future};
7use medea_reactive::AllProcessed;
8use sealed::sealed;
9
10use crate::{media::LocalTracksConstraints, platform, utils::TaskHandle};
11
12/// Abstraction over a state which can be transformed to the states from the
13/// [`medea_client_api_proto::state`].
14pub trait AsProtoState {
15    /// [`medea_client_api_proto::state`] into which this state can be
16    /// transformed.
17    type Output;
18
19    /// Converts this state to the [`medea_client_api_proto::state`]
20    /// representation.
21    fn as_proto(&self) -> Self::Output;
22}
23
24/// Abstraction of state which can be updated or created by the
25/// [`medea_client_api_proto::state`].
26pub trait SynchronizableState {
27    /// [`medea_client_api_proto::state`] by which this state can be updated.
28    type Input;
29
30    /// Creates a new state from the [`medea_client_api_proto::state`]
31    /// representation.
32    fn from_proto(
33        input: Self::Input,
34        send_constraints: &LocalTracksConstraints,
35    ) -> Self;
36
37    /// Updates this state with a provided [`medea_client_api_proto::state`].
38    fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints);
39}
40
41/// Abstraction over a state which can be updated by a client side.
42pub trait Updatable {
43    /// Returns [`Future`] resolving once this [`Updatable`] state resolves its
44    /// intentions.
45    fn when_stabilized(&self) -> AllProcessed<'static>;
46
47    /// Returns [`Future`] resolving once all the client updates are performed
48    /// on this state.
49    fn when_updated(&self) -> AllProcessed<'static>;
50
51    /// Notifies about a RPC connection loss.
52    fn connection_lost(&self);
53
54    /// Notifies about a RPC connection recovering.
55    fn connection_recovered(&self);
56}
57
58/// Component is a base that helps managing reactive components.
59///
60/// It consists of two parts: state and object. Object is listening to its state
61/// changes and updates accordingly, so all mutations are meant to be applied to
62/// the state.
63#[derive(Debug, Deref)]
64pub struct Component<S, O> {
65    /// Object holding the state.
66    #[deref]
67    obj: Rc<O>,
68
69    /// State being reactively listened.
70    state: Rc<S>,
71
72    /// All the spawned watchers of the state.
73    _spawned_watchers: Vec<TaskHandle>,
74}
75
76impl<S, O> Component<S, O> {
77    /// Returns [`Rc`] to the object managed by this [`Component`].
78    #[must_use]
79    pub fn obj(&self) -> Rc<O> {
80        Rc::clone(&self.obj)
81    }
82
83    /// Returns reference to the state of this [`Component`].
84    #[must_use]
85    pub fn state(&self) -> Rc<S> {
86        Rc::clone(&self.state)
87    }
88}
89
90impl<S: ComponentState<O> + 'static, O: 'static> Component<S, O> {
91    /// Returns new [`Component`] with a provided object and state.
92    ///
93    /// Spawns all watchers of this [`Component`].
94    pub fn new(obj: Rc<O>, state: Rc<S>) -> Self {
95        let mut watchers_spawner =
96            WatchersSpawner::new(Rc::clone(&state), Rc::clone(&obj));
97        state.spawn_watchers(&mut watchers_spawner);
98
99        Self { state, obj, _spawned_watchers: watchers_spawner.finish() }
100    }
101}
102
103/// Spawner for the [`Component`]'s watchers.
104#[derive(Debug)]
105pub struct WatchersSpawner<S, O> {
106    /// State being watched.
107    state: Rc<S>,
108
109    /// Object holding the state.
110    obj: Rc<O>,
111
112    /// All the spawned watchers of the state.
113    spawned_watchers: Vec<TaskHandle>,
114}
115
116impl<S: 'static, O: 'static> WatchersSpawner<S, O> {
117    /// Spawns watchers for the provided [`Stream`].
118    ///
119    /// If watcher returns an error then this error will be printed to the error
120    /// log.
121    ///
122    /// You can stop all listeners tasks spawned by this function by
123    /// [`Drop`]ping the [`Component`].
124    pub fn spawn<R, V, F, H>(&mut self, mut rx: R, handle: F)
125    where
126        R: Stream<Item = V> + Unpin + 'static,
127        F: Fn(Rc<O>, Rc<S>, V) -> H + 'static,
128        H: Future + 'static,
129        <H as Future>::Output: IntoResult,
130    {
131        let obj = Rc::clone(&self.obj);
132        let state = Rc::clone(&self.state);
133        let (fut, handle) = future::abortable(async move {
134            while let Some(value) = rx.next().await {
135                if let Err(e) =
136                    (handle)(Rc::clone(&obj), Rc::clone(&state), value)
137                        .await
138                        .into_result()
139                {
140                    log::error!("{e}");
141                }
142            }
143        });
144        platform::spawn(fut.map(drop));
145
146        self.spawned_watchers.push(handle.into());
147    }
148
149    /// Spawns synchronous watchers for the provided [`Stream`].
150    ///
151    /// If watcher returns an error then this error will be printed to the error
152    /// log.
153    ///
154    /// You can stop all listeners tasks spawned by this function by
155    /// [`Drop`]ping the [`Component`].
156    pub fn spawn_sync<R, V, F, Obj, St, H>(&mut self, rx: R, handle: F)
157    where
158        R: Stream<Item = V> + Unpin + 'static,
159        F: Fn(&Obj, &St, V) -> H + 'static,
160        Rc<O>: Borrow<Obj>,
161        Rc<S>: Borrow<St>,
162        H: IntoResult + 'static,
163    {
164        self.spawn(rx, move |o, s, v| {
165            future::ready(handle(o.borrow(), s.borrow(), v))
166        });
167    }
168
169    /// Creates new [`WatchersSpawner`] for the provided object and state.
170    const fn new(state: Rc<S>, obj: Rc<O>) -> Self {
171        Self { state, obj, spawned_watchers: Vec::new() }
172    }
173
174    /// Returns [`TaskHandle`]s for the watchers spawned by this
175    /// [`WatchersSpawner`].
176    fn finish(self) -> Vec<TaskHandle> {
177        self.spawned_watchers
178    }
179}
180
181/// Abstraction describing state of the [`Component`].
182pub trait ComponentState<C>: Sized {
183    /// Spawns all watchers required for this [`ComponentState`].
184    fn spawn_watchers(&self, spawner: &mut WatchersSpawner<Self, C>);
185}
186
187/// Helper trait for naming types of the [`Component`]'s state and object for
188/// the [`ComponentState`] implementation generated by
189/// [`medea_macro::watchers`].
190pub trait ComponentTypes {
191    /// Type of [`Component`]'s state.
192    type State;
193
194    /// Type of object managed by [`Component`].
195    type Obj;
196}
197
198impl<S, O> ComponentTypes for Component<S, O> {
199    type Obj = O;
200    type State = S;
201}
202
203/// [`Result`] coercion for a `handle` accepted by the
204/// [`WatchersSpawner::spawn()`] method.
205#[sealed]
206pub trait IntoResult {
207    /// Type of the coerced [`Result::Err`].
208    type Error: Display;
209
210    /// Coerces into the [`Result`].
211    ///
212    /// # Errors
213    ///
214    /// If coerces into a [`Result::Err`].
215    fn into_result(self) -> Result<(), Self::Error>;
216}
217
218#[sealed]
219impl IntoResult for () {
220    type Error = Infallible;
221
222    fn into_result(self) -> Result<Self, Self::Error> {
223        Ok(self)
224    }
225}
226
227#[sealed]
228impl<E: Display> IntoResult for Result<(), E> {
229    type Error = E;
230
231    fn into_result(self) -> Result<(), Self::Error> {
232        self
233    }
234}