medea_control_api_proto/control/mod.rs
1//! [`ControlApi`] definitions.
2//!
3//! [`ControlApi`]: Api
4
5pub mod endpoint;
6pub mod member;
7pub mod room;
8
9pub use std::collections::HashMap as Pipeline;
10use std::{collections::HashMap, str::FromStr};
11
12use async_trait::async_trait;
13use derive_more::with_trait::{Display, Error, From};
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16
17#[doc(inline)]
18pub use self::{endpoint::Endpoint, member::Member, room::Room};
19
20/// API allowing to control a media server dynamically, by creating, updating
21/// and destroying pipelines of media [`Element`]s on it.
22///
23/// Both API client and API server should implement this trait.
24#[async_trait]
25pub trait Api {
26    /// Error returned by this [`ControlApi`].
27    ///
28    /// [`ControlApi`]: Api
29    type Error;
30
31    /// Creates a new [`Element`] on the media server.
32    ///
33    /// # Non-idempotent
34    ///
35    /// Errors if an [`Element`] with such ID already exists.
36    ///
37    /// # Errors
38    ///
39    /// - If the [`Element`]'s parent [`Element`] (identified by a [`Fid`])
40    ///   doesn't exist.
41    /// - If an [`Element`] with such ID already exists.
42    /// - If the media server failed to perform this request.
43    async fn create(
44        &self,
45        request: Request,
46    ) -> Result<member::Sids, Self::Error>;
47
48    /// Applies changes to an existing [`Element`] on the media server, or
49    /// creates a new one in case there is no [`Element`] with such ID.
50    ///
51    /// # Idempotent
52    ///
53    /// If no [`Element`] with such ID exists, then it will be created,
54    /// otherwise it will be reconfigured. [`Element`]s that exist on the same
55    /// hierarchy level, but are not specified in the provided [`Request`], will
56    /// be removed.
57    ///
58    /// # Errors
59    ///
60    /// - If the [`Element`]'s parent [`Element`] (identified by a [`Fid`])
61    ///   doesn't exist.
62    /// - If the media server failed to perform this request.
63    async fn apply(
64        &self,
65        request: Request,
66    ) -> Result<member::Sids, Self::Error>;
67
68    /// Removes [`Element`]s from the media server.
69    ///
70    /// Allows referring multiple [`Element`]s on the last two levels of a
71    /// [`Fid`].
72    ///
73    /// # Idempotent
74    ///
75    /// If no [`Element`]s with such [`Fid`]s exist, then succeeds.
76    ///
77    /// # Errors
78    ///
79    /// - If no [`Fid`]s were specified.
80    /// - If any [`Fid`] contains multiple [`room::Id`]s.
81    /// - If the media server failed to perform this request.
82    async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error>;
83
84    /// Lookups [`Element`]s by their [`Fid`]s on the media server.
85    ///
86    /// If no [`Fid`]s are specified, then returns all the current [`Element`]s
87    /// on the media server.
88    ///
89    /// If no [`Element`] exists for some [`Fid`], then it won't be present in
90    /// the returned [`Elements`] collection.
91    ///
92    /// # Errors
93    ///
94    /// - If the media server failed to perform this request.
95    async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error>;
96
97    /// Checks healthiness of the media server.
98    ///
99    /// Caller should assert that the returned [`Pong`] has the same nonce as
100    /// the sent [`Ping`].
101    ///
102    /// # Errors
103    ///
104    /// - If the media server failed to perform this request.
105    async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error>;
106}
107
108/// Request for creating or applying an [`Element`] on a media server.
109#[derive(Clone, Debug)]
110pub enum Request {
111    /// [`Room`] to be created or to apply changes to.
112    Room {
113        /// ID of the created [`Room`].
114        id: room::Id,
115
116        /// Spec of the created [`Room`].
117        spec: room::Spec,
118    },
119
120    /// [`Member`] to be created or to apply changes to.
121    Member {
122        /// ID of the created [`Member`].
123        id: member::Id,
124
125        /// ID of the [`Room`] this [`Member`] participates in.
126        room_id: room::Id,
127
128        /// Spec of the created [`Member`].
129        spec: Box<member::Spec>,
130    },
131
132    /// [`Endpoint`] to be created or to apply changes to.
133    Endpoint {
134        /// ID of the created [`Endpoint`].
135        id: endpoint::Id,
136
137        /// ID of the [`Room`] this [`Endpoint`] belongs to.
138        room_id: room::Id,
139
140        /// ID of the [`Member`] this [`Endpoint`] belongs to.
141        member_id: member::Id,
142
143        /// Spec of the created [`Endpoint`].
144        spec: endpoint::Spec,
145    },
146}
147
148/// All possible media elements of [`ControlApi`].
149///
150/// [`ControlApi`]: Api
151#[derive(Clone, Debug, From)]
152pub enum Element {
153    /// [`Room`] media element.
154    Room(Room),
155
156    /// [`Member`] media element.
157    Member(Box<Member>),
158
159    /// [`Endpoint`] media element.
160    Endpoint(Endpoint),
161}
162
163/// Collection of uniquely identified [`Element`]s.
164pub type Elements = HashMap<Fid, Element>;
165
166/// Possible [`Element`]s allowed to act as a root of [`ControlApi`] static
167/// spec.
168///
169/// [`ControlApi`]: Api
170#[derive(Clone, Debug, Eq, PartialEq)]
171#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
172#[cfg_attr(feature = "serde", serde(tag = "kind"))]
173pub enum RootElement {
174    /// [`Room`] media [`Element`].
175    Room(Room),
176}
177
178/// FID (Full ID) is a composition of media [`Element`] IDs referring to some
179/// [`Element`] on a whole media server uniquely.
180#[derive(Clone, Debug, Display, Eq, Hash, Ord, PartialEq, PartialOrd)]
181pub enum Fid {
182    /// FID of a [`Room`].
183    #[display("{id}")]
184    Room {
185        /// Unique ID of the [`Room`].
186        id: room::Id,
187    },
188
189    /// FID of a [`Member`].
190    #[display("{room_id}/{id}")]
191    Member {
192        /// ID of the [`Member`] in the [`Room`].
193        id: member::Id,
194
195        /// Unique ID of the [`Room`].
196        room_id: room::Id,
197    },
198
199    /// FID of an [`Endpoint`].
200    #[display("{room_id}/{member_id}/{id}")]
201    Endpoint {
202        /// ID of the [`Endpoint`] of the [`Member`].
203        id: endpoint::Id,
204
205        /// Unique ID of the [`Room`].
206        room_id: room::Id,
207
208        /// ID of the [`Member`] in the [`Room`].
209        member_id: member::Id,
210    },
211}
212
213impl FromStr for Fid {
214    type Err = ParseFidError;
215
216    fn from_str(val: &str) -> Result<Self, Self::Err> {
217        if val.is_empty() {
218            return Err(ParseFidError::Empty);
219        }
220
221        let mut splitted = val.split('/');
222
223        let room_id = splitted.next().ok_or(ParseFidError::Empty)?;
224        if room_id.is_empty() {
225            return Err(ParseFidError::MissingPath(val.into()));
226        }
227
228        let Some(member_id) = splitted.next() else {
229            return Ok(Self::Room { id: room_id.into() });
230        };
231        if member_id.is_empty() {
232            return Err(ParseFidError::MissingPath(val.into()));
233        }
234
235        let Some(endpoint_id) = splitted.next() else {
236            return Ok(Self::Member {
237                id: member_id.into(),
238                room_id: room_id.into(),
239            });
240        };
241        if endpoint_id.is_empty() {
242            return Err(ParseFidError::MissingPath(val.into()));
243        }
244
245        if splitted.next().is_some() {
246            Err(ParseFidError::TooManyPaths(val.into()))
247        } else {
248            Ok(Self::Endpoint {
249                id: endpoint_id.into(),
250                room_id: room_id.into(),
251                member_id: member_id.into(),
252            })
253        }
254    }
255}
256
257/// Possible errors of parsing a [`Fid`].
258#[derive(Debug, Display, Error)]
259pub enum ParseFidError {
260    /// [`Fid`] is empty.
261    #[display("FID is empty")]
262    Empty,
263
264    /// [`Fid`] has too many paths.
265    #[display("FID has too many paths: {_0}")]
266    TooManyPaths(#[error(not(source))] Box<str>),
267
268    /// [`Fid`] has missing paths.
269    #[display("FID has missing paths: {_0}")]
270    MissingPath(#[error(not(source))] Box<str>),
271}
272
273/// [`Ping`] message received by a media server periodically for probing its
274/// healthiness.
275///
276/// Each new [`Ping`] should increment its nonce, starting with `0`.
277#[derive(
278    Clone, Copy, Debug, Display, Eq, From, Hash, Ord, PartialEq, PartialOrd,
279)]
280pub struct Ping(pub u32);
281
282/// [`Pong`] message sent by a media server in response to a received [`Ping`]
283/// message.
284///
285/// Contains nonce of the answered [`Ping`] message.
286#[derive(
287    Clone, Copy, Debug, Display, Eq, From, Hash, Ord, PartialEq, PartialOrd,
288)]
289pub struct Pong(pub u32);
290
#[cfg(all(feature = "serde", test))]
mod serialization {
    use super::{
        Room, RootElement,
        endpoint::{
            web_rtc_play::{self, LocalSrcUri},
            web_rtc_publish::{self, AudioSettings, P2pMode, VideoSettings},
        },
        member::{self, Credentials},
        room,
    };

    // language=YAML
    const SPEC: &str = r#"
kind: Room
id: test-call
spec:
  pipeline:
    caller:
      kind: Member
      spec:
        credentials:
          plain: test
        pipeline:
          publish:
            kind: WebRtcPublishEndpoint
            spec:
              p2p: Always
    some-member:
      kind: Member
      spec:
        credentials:
          plain: test
        pipeline:
          publish:
            kind: WebRtcPublishEndpoint
            spec:
              p2p: Always
    responder:
      kind: Member
      spec:
        credentials:
          plain: test
        pipeline:
          play:
            kind: WebRtcPlayEndpoint
            spec:
              src: "local://test-call/caller/publish"
          play2:
            kind: WebRtcPlayEndpoint
            spec:
              src: "local://test-call/some-member/publish"
    "#;

    /// Checks that the YAML [`SPEC`] deserializes into the expected
    /// [`RootElement`].
    #[test]
    fn spec() {
        // Publishing endpoint, identical for `caller` and `some-member`.
        let publish_spec = || web_rtc_publish::Spec {
            p2p: P2pMode::Always,
            force_relay: false,
            audio_settings: AudioSettings::default(),
            video_settings: VideoSettings::default(),
        };
        // Playing endpoint consuming the `publish` endpoint of the given
        // member of the `test-call` room.
        let play_spec = |member_id: &'static str| web_rtc_play::Spec {
            src: LocalSrcUri {
                room_id: "test-call".into(),
                member_id: member_id.into(),
                endpoint_id: "publish".into(),
            },
            force_relay: false,
        };
        // All the members in the spec differ in their pipelines only.
        let member_spec = |pipeline| member::Spec {
            pipeline,
            credentials: Some(Credentials::Plain("test".into())),
            on_join: None,
            on_leave: None,
            idle_timeout: None,
            reconnect_timeout: None,
            ping_interval: None,
        };

        let parsed = serde_yaml::from_str::<RootElement>(SPEC)
            .unwrap_or_else(|e| panic!("{e}"));

        let expected = RootElement::Room(Room {
            id: "test-call".into(),
            spec: room::Spec {
                pipeline: [
                    (
                        "caller".into(),
                        member_spec(
                            [("publish".into(), publish_spec().into())]
                                .into_iter()
                                .collect(),
                        )
                        .into(),
                    ),
                    (
                        "some-member".into(),
                        member_spec(
                            [("publish".into(), publish_spec().into())]
                                .into_iter()
                                .collect(),
                        )
                        .into(),
                    ),
                    (
                        "responder".into(),
                        member_spec(
                            [
                                ("play".into(), play_spec("caller").into()),
                                (
                                    "play2".into(),
                                    play_spec("some-member").into(),
                                ),
                            ]
                            .into_iter()
                            .collect(),
                        )
                        .into(),
                    ),
                ]
                .into_iter()
                .collect(),
            },
        });

        assert_eq!(parsed, expected);
    }
}
459}