medea_jason/utils/component.rs
1//! Implementation of the [`Component`].
2
3use std::{borrow::Borrow, convert::Infallible, fmt::Display, rc::Rc};
4
5use derive_more::with_trait::Deref;
6use futures::{FutureExt as _, Stream, StreamExt as _, future};
7use medea_reactive::AllProcessed;
8use sealed::sealed;
9
10use crate::{media::LocalTracksConstraints, platform, utils::TaskHandle};
11
12/// Abstraction over a state which can be transformed to the states from the
13/// [`medea_client_api_proto::state`].
14pub trait AsProtoState {
15 /// [`medea_client_api_proto::state`] into which this state can be
16 /// transformed.
17 type Output;
18
19 /// Converts this state to the [`medea_client_api_proto::state`]
20 /// representation.
21 fn as_proto(&self) -> Self::Output;
22}
23
24/// Abstraction of state which can be updated or created by the
25/// [`medea_client_api_proto::state`].
26pub trait SynchronizableState {
27 /// [`medea_client_api_proto::state`] by which this state can be updated.
28 type Input;
29
30 /// Creates a new state from the [`medea_client_api_proto::state`]
31 /// representation.
32 fn from_proto(
33 input: Self::Input,
34 send_constraints: &LocalTracksConstraints,
35 ) -> Self;
36
37 /// Updates this state with a provided [`medea_client_api_proto::state`].
38 fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints);
39}
40
41/// Abstraction over a state which can be updated by a client side.
42pub trait Updatable {
43 /// Returns [`Future`] resolving once this [`Updatable`] state resolves its
44 /// intentions.
45 fn when_stabilized(&self) -> AllProcessed<'static>;
46
47 /// Returns [`Future`] resolving once all the client updates are performed
48 /// on this state.
49 fn when_updated(&self) -> AllProcessed<'static>;
50
51 /// Notifies about a RPC connection loss.
52 fn connection_lost(&self);
53
54 /// Notifies about a RPC connection recovering.
55 fn connection_recovered(&self);
56}
57
58/// Component is a base that helps managing reactive components.
59///
60/// It consists of two parts: state and object. Object is listening to its state
61/// changes and updates accordingly, so all mutations are meant to be applied to
62/// the state.
63#[derive(Debug, Deref)]
64pub struct Component<S, O> {
65 /// Object holding the state.
66 #[deref]
67 obj: Rc<O>,
68
69 /// State being reactively listened.
70 state: Rc<S>,
71
72 /// All the spawned watchers of the state.
73 _spawned_watchers: Vec<TaskHandle>,
74}
75
76impl<S, O> Component<S, O> {
77 /// Returns [`Rc`] to the object managed by this [`Component`].
78 #[must_use]
79 pub fn obj(&self) -> Rc<O> {
80 Rc::clone(&self.obj)
81 }
82
83 /// Returns reference to the state of this [`Component`].
84 #[must_use]
85 pub fn state(&self) -> Rc<S> {
86 Rc::clone(&self.state)
87 }
88}
89
90impl<S: ComponentState<O> + 'static, O: 'static> Component<S, O> {
91 /// Returns new [`Component`] with a provided object and state.
92 ///
93 /// Spawns all watchers of this [`Component`].
94 pub fn new(obj: Rc<O>, state: Rc<S>) -> Self {
95 let mut watchers_spawner =
96 WatchersSpawner::new(Rc::clone(&state), Rc::clone(&obj));
97 state.spawn_watchers(&mut watchers_spawner);
98
99 Self { state, obj, _spawned_watchers: watchers_spawner.finish() }
100 }
101}
102
103/// Spawner for the [`Component`]'s watchers.
104#[derive(Debug)]
105pub struct WatchersSpawner<S, O> {
106 /// State being watched.
107 state: Rc<S>,
108
109 /// Object holding the state.
110 obj: Rc<O>,
111
112 /// All the spawned watchers of the state.
113 spawned_watchers: Vec<TaskHandle>,
114}
115
116impl<S: 'static, O: 'static> WatchersSpawner<S, O> {
117 /// Spawns watchers for the provided [`Stream`].
118 ///
119 /// If watcher returns an error then this error will be printed to the error
120 /// log.
121 ///
122 /// You can stop all listeners tasks spawned by this function by
123 /// [`Drop`]ping the [`Component`].
124 pub fn spawn<R, V, F, H>(&mut self, mut rx: R, handle: F)
125 where
126 R: Stream<Item = V> + Unpin + 'static,
127 F: Fn(Rc<O>, Rc<S>, V) -> H + 'static,
128 H: Future + 'static,
129 <H as Future>::Output: IntoResult,
130 {
131 let obj = Rc::clone(&self.obj);
132 let state = Rc::clone(&self.state);
133 let (fut, handle) = future::abortable(async move {
134 while let Some(value) = rx.next().await {
135 if let Err(e) =
136 (handle)(Rc::clone(&obj), Rc::clone(&state), value)
137 .await
138 .into_result()
139 {
140 log::error!("{e}");
141 }
142 }
143 });
144 platform::spawn(fut.map(drop));
145
146 self.spawned_watchers.push(handle.into());
147 }
148
149 /// Spawns synchronous watchers for the provided [`Stream`].
150 ///
151 /// If watcher returns an error then this error will be printed to the error
152 /// log.
153 ///
154 /// You can stop all listeners tasks spawned by this function by
155 /// [`Drop`]ping the [`Component`].
156 pub fn spawn_sync<R, V, F, Obj, St, H>(&mut self, rx: R, handle: F)
157 where
158 R: Stream<Item = V> + Unpin + 'static,
159 F: Fn(&Obj, &St, V) -> H + 'static,
160 Rc<O>: Borrow<Obj>,
161 Rc<S>: Borrow<St>,
162 H: IntoResult + 'static,
163 {
164 self.spawn(rx, move |o, s, v| {
165 future::ready(handle(o.borrow(), s.borrow(), v))
166 });
167 }
168
169 /// Creates new [`WatchersSpawner`] for the provided object and state.
170 const fn new(state: Rc<S>, obj: Rc<O>) -> Self {
171 Self { state, obj, spawned_watchers: Vec::new() }
172 }
173
174 /// Returns [`TaskHandle`]s for the watchers spawned by this
175 /// [`WatchersSpawner`].
176 fn finish(self) -> Vec<TaskHandle> {
177 self.spawned_watchers
178 }
179}
180
181/// Abstraction describing state of the [`Component`].
182pub trait ComponentState<C>: Sized {
183 /// Spawns all watchers required for this [`ComponentState`].
184 fn spawn_watchers(&self, spawner: &mut WatchersSpawner<Self, C>);
185}
186
187/// Helper trait for naming types of the [`Component`]'s state and object for
188/// the [`ComponentState`] implementation generated by
189/// [`medea_macro::watchers`].
190pub trait ComponentTypes {
191 /// Type of [`Component`]'s state.
192 type State;
193
194 /// Type of object managed by [`Component`].
195 type Obj;
196}
197
198impl<S, O> ComponentTypes for Component<S, O> {
199 type Obj = O;
200 type State = S;
201}
202
203/// [`Result`] coercion for a `handle` accepted by the
204/// [`WatchersSpawner::spawn()`] method.
205#[sealed]
206pub trait IntoResult {
207 /// Type of the coerced [`Result::Err`].
208 type Error: Display;
209
210 /// Coerces into the [`Result`].
211 ///
212 /// # Errors
213 ///
214 /// If coerces into a [`Result::Err`].
215 fn into_result(self) -> Result<(), Self::Error>;
216}
217
218#[sealed]
219impl IntoResult for () {
220 type Error = Infallible;
221
222 fn into_result(self) -> Result<Self, Self::Error> {
223 Ok(self)
224 }
225}
226
227#[sealed]
228impl<E: Display> IntoResult for Result<(), E> {
229 type Error = E;
230
231 fn into_result(self) -> Result<(), Self::Error> {
232 self
233 }
234}