medea_control_api_proto/control/mod.rs
1//! [`ControlApi`] definitions.
2//!
3//! [`ControlApi`]: Api
4
5pub mod endpoint;
6pub mod member;
7pub mod room;
8
9pub use std::collections::HashMap as Pipeline;
10use std::{collections::HashMap, str::FromStr};
11
12use async_trait::async_trait;
13use derive_more::with_trait::{Display, Error, From};
14#[cfg(feature = "serde")]
15use serde::{Deserialize, Serialize};
16
17#[doc(inline)]
18pub use self::{endpoint::Endpoint, member::Member, room::Room};
19
20/// API allowing to control a media server dynamically, by creating, updating
21/// and destroying pipelines of media [`Element`]s on it.
22///
23/// Both API client and API server should implement this trait.
24#[async_trait]
25pub trait Api {
26 /// Error returned by this [`ControlApi`].
27 ///
28 /// [`ControlApi`]: Api
29 type Error;
30
31 /// Creates a new [`Element`] on the media server.
32 ///
33 /// # Non-idempotent
34 ///
35 /// Errors if an [`Element`] with such ID already exists.
36 ///
37 /// # Errors
38 ///
39 /// - If the [`Element`]'s parent [`Element`] (identified by a [`Fid`])
40 /// doesn't exist.
41 /// - If an [`Element`] with such ID already exists.
42 /// - If the media server failed to perform this request.
43 async fn create(
44 &self,
45 request: Request,
46 ) -> Result<member::Sids, Self::Error>;
47
48 /// Applies changes to an existing [`Element`] on the media server, or
49 /// creates a new one in case there is no [`Element`] with such ID.
50 ///
51 /// # Idempotent
52 ///
53 /// If no [`Element`] with such ID exists, then it will be created,
54 /// otherwise it will be reconfigured. [`Element`]s that exist on the same
55 /// hierarchy level, but are not specified in the provided [`Request`], will
56 /// be removed.
57 ///
58 /// # Errors
59 ///
60 /// - If the [`Element`]'s parent [`Element`] (identified by a [`Fid`])
61 /// doesn't exist.
62 /// - If the media server failed to perform this request.
63 async fn apply(
64 &self,
65 request: Request,
66 ) -> Result<member::Sids, Self::Error>;
67
68 /// Removes [`Element`]s from the media server.
69 ///
70 /// Allows referring multiple [`Element`]s on the last two levels of a
71 /// [`Fid`].
72 ///
73 /// # Idempotent
74 ///
75 /// If no [`Element`]s with such [`Fid`]s exist, then succeeds.
76 ///
77 /// # Errors
78 ///
79 /// - If no [`Fid`]s were specified.
80 /// - If any [`Fid`] contains multiple [`room::Id`]s.
81 /// - If the media server failed to perform this request.
82 async fn delete(&self, fids: &[Fid]) -> Result<(), Self::Error>;
83
84 /// Lookups [`Element`]s by their [`Fid`]s on the media server.
85 ///
86 /// If no [`Fid`]s are specified, then returns all the current [`Element`]s
87 /// on the media server.
88 ///
89 /// If no [`Element`] exists for some [`Fid`], then it won't be present in
90 /// the returned [`Elements`] collection.
91 ///
92 /// # Errors
93 ///
94 /// - If the media server failed to perform this request.
95 async fn get(&self, fids: &[Fid]) -> Result<Elements, Self::Error>;
96
97 /// Checks healthiness of the media server.
98 ///
99 /// Caller should assert that the returned [`Pong`] has the same nonce as
100 /// the sent [`Ping`].
101 ///
102 /// # Errors
103 ///
104 /// - If the media server failed to perform this request.
105 async fn healthz(&self, ping: Ping) -> Result<Pong, Self::Error>;
106}
107
108/// Request for creating or applying an [`Element`] on a media server.
109#[derive(Clone, Debug)]
110pub enum Request {
111 /// [`Room`] to be created or to apply changes to.
112 Room {
113 /// ID of the created [`Room`].
114 id: room::Id,
115
116 /// Spec of the created [`Room`].
117 spec: room::Spec,
118 },
119
120 /// [`Member`] to be created or to apply changes to.
121 Member {
122 /// ID of the created [`Member`].
123 id: member::Id,
124
125 /// ID of the [`Room`] this [`Member`] participates in.
126 room_id: room::Id,
127
128 /// Spec of the created [`Member`].
129 spec: Box<member::Spec>,
130 },
131
132 /// [`Endpoint`] to be created or to apply changes to.
133 Endpoint {
134 /// ID of the created [`Endpoint`].
135 id: endpoint::Id,
136
137 /// ID of the [`Room`] this [`Endpoint`] belongs to.
138 room_id: room::Id,
139
140 /// ID of the [`Member`] this [`Endpoint`] belongs to.
141 member_id: member::Id,
142
143 /// Spec of the created [`Endpoint`].
144 spec: endpoint::Spec,
145 },
146}
147
148/// All possible media elements of [`ControlApi`].
149///
150/// [`ControlApi`]: Api
151#[derive(Clone, Debug, From)]
152pub enum Element {
153 /// [`Room`] media element.
154 Room(Room),
155
156 /// [`Member`] media element.
157 Member(Box<Member>),
158
159 /// [`Endpoint`] media element.
160 Endpoint(Endpoint),
161}
162
163/// Collection of uniquely identified [`Element`]s.
164pub type Elements = HashMap<Fid, Element>;
165
166/// Possible [`Element`]s allowed to act as a root of [`ControlApi`] static
167/// spec.
168///
169/// [`ControlApi`]: Api
170#[derive(Clone, Debug, Eq, PartialEq)]
171#[cfg_attr(feature = "serde", derive(Deserialize, Serialize))]
172#[cfg_attr(feature = "serde", serde(tag = "kind"))]
173pub enum RootElement {
174 /// [`Room`] media [`Element`].
175 Room(Room),
176}
177
178/// FID (Full ID) is a composition of media [`Element`] IDs referring to some
179/// [`Element`] on a whole media server uniquely.
180#[derive(Clone, Debug, Display, Eq, Hash, Ord, PartialEq, PartialOrd)]
181pub enum Fid {
182 /// FID of a [`Room`].
183 #[display("{id}")]
184 Room {
185 /// Unique ID of the [`Room`].
186 id: room::Id,
187 },
188
189 /// FID of a [`Member`].
190 #[display("{room_id}/{id}")]
191 Member {
192 /// ID of the [`Member`] in the [`Room`].
193 id: member::Id,
194
195 /// Unique ID of the [`Room`].
196 room_id: room::Id,
197 },
198
199 /// FID of an [`Endpoint`].
200 #[display("{room_id}/{member_id}/{id}")]
201 Endpoint {
202 /// ID of the [`Endpoint`] of the [`Member`].
203 id: endpoint::Id,
204
205 /// Unique ID of the [`Room`].
206 room_id: room::Id,
207
208 /// ID of the [`Member`] in the [`Room`].
209 member_id: member::Id,
210 },
211}
212
213impl FromStr for Fid {
214 type Err = ParseFidError;
215
216 fn from_str(val: &str) -> Result<Self, Self::Err> {
217 if val.is_empty() {
218 return Err(ParseFidError::Empty);
219 }
220
221 let mut splitted = val.split('/');
222
223 let room_id = splitted.next().ok_or(ParseFidError::Empty)?;
224 if room_id.is_empty() {
225 return Err(ParseFidError::MissingPath(val.into()));
226 }
227
228 let Some(member_id) = splitted.next() else {
229 return Ok(Self::Room { id: room_id.into() });
230 };
231 if member_id.is_empty() {
232 return Err(ParseFidError::MissingPath(val.into()));
233 }
234
235 let Some(endpoint_id) = splitted.next() else {
236 return Ok(Self::Member {
237 id: member_id.into(),
238 room_id: room_id.into(),
239 });
240 };
241 if endpoint_id.is_empty() {
242 return Err(ParseFidError::MissingPath(val.into()));
243 }
244
245 if splitted.next().is_some() {
246 Err(ParseFidError::TooManyPaths(val.into()))
247 } else {
248 Ok(Self::Endpoint {
249 id: endpoint_id.into(),
250 room_id: room_id.into(),
251 member_id: member_id.into(),
252 })
253 }
254 }
255}
256
257/// Possible errors of parsing a [`Fid`].
258#[derive(Debug, Display, Error)]
259pub enum ParseFidError {
260 /// [`Fid`] is empty.
261 #[display("FID is empty")]
262 Empty,
263
264 /// [`Fid`] has too many paths.
265 #[display("FID has too many paths: {_0}")]
266 TooManyPaths(#[error(not(source))] Box<str>),
267
268 /// [`Fid`] has missing paths.
269 #[display("FID has missing paths: {_0}")]
270 MissingPath(#[error(not(source))] Box<str>),
271}
272
273/// [`Ping`] message received by a media server periodically for probing its
274/// healthiness.
275///
276/// Each new [`Ping`] should increment its nonce, starting with `0`.
277#[derive(
278 Clone, Copy, Debug, Display, Eq, From, Hash, Ord, PartialEq, PartialOrd,
279)]
280pub struct Ping(pub u32);
281
282/// [`Pong`] message sent by a media server in response to a received [`Ping`]
283/// message.
284///
285/// Contains nonce of the answered [`Ping`] message.
286#[derive(
287 Clone, Copy, Debug, Display, Eq, From, Hash, Ord, PartialEq, PartialOrd,
288)]
289pub struct Pong(pub u32);
290
291#[cfg(all(feature = "serde", test))]
292mod serialization {
293 use super::{
294 Room, RootElement,
295 endpoint::{
296 web_rtc_play::{self, LocalSrcUri},
297 web_rtc_publish::{self, AudioSettings, P2pMode, VideoSettings},
298 },
299 member::{self, Credentials},
300 room,
301 };
302
303 // language=YAML
304 const SPEC: &str = r#"
305kind: Room
306id: test-call
307spec:
308 pipeline:
309 caller:
310 kind: Member
311 spec:
312 credentials:
313 plain: test
314 pipeline:
315 publish:
316 kind: WebRtcPublishEndpoint
317 spec:
318 p2p: Always
319 some-member:
320 kind: Member
321 spec:
322 credentials:
323 plain: test
324 pipeline:
325 publish:
326 kind: WebRtcPublishEndpoint
327 spec:
328 p2p: Always
329 responder:
330 kind: Member
331 spec:
332 credentials:
333 plain: test
334 pipeline:
335 play:
336 kind: WebRtcPlayEndpoint
337 spec:
338 src: "local://test-call/caller/publish"
339 play2:
340 kind: WebRtcPlayEndpoint
341 spec:
342 src: "local://test-call/some-member/publish"
343 "#;
344
345 #[test]
346 fn spec() {
347 assert_eq!(
348 serde_yaml::from_str::<RootElement>(SPEC)
349 .unwrap_or_else(|e| panic!("{e}")),
350 RootElement::Room(Room {
351 id: "test-call".into(),
352 spec: room::Spec {
353 pipeline: [
354 (
355 "caller".into(),
356 member::Spec {
357 pipeline: [(
358 "publish".into(),
359 web_rtc_publish::Spec {
360 p2p: P2pMode::Always,
361 force_relay: false,
362 audio_settings: AudioSettings::default(
363 ),
364 video_settings: VideoSettings::default(
365 ),
366 }
367 .into(),
368 )]
369 .into_iter()
370 .collect(),
371 credentials: Some(Credentials::Plain(
372 "test".into(),
373 )),
374 on_join: None,
375 on_leave: None,
376 idle_timeout: None,
377 reconnect_timeout: None,
378 ping_interval: None,
379 }
380 .into(),
381 ),
382 (
383 "some-member".into(),
384 member::Spec {
385 pipeline: [(
386 "publish".into(),
387 web_rtc_publish::Spec {
388 p2p: P2pMode::Always,
389 force_relay: false,
390 audio_settings: AudioSettings::default(
391 ),
392 video_settings: VideoSettings::default(
393 ),
394 }
395 .into(),
396 )]
397 .into_iter()
398 .collect(),
399 credentials: Some(Credentials::Plain(
400 "test".into()
401 )),
402 on_join: None,
403 on_leave: None,
404 idle_timeout: None,
405 reconnect_timeout: None,
406 ping_interval: None,
407 }
408 .into(),
409 ),
410 (
411 "responder".into(),
412 member::Spec {
413 pipeline: [
414 (
415 "play".into(),
416 web_rtc_play::Spec {
417 src: LocalSrcUri {
418 room_id: "test-call".into(),
419 member_id: "caller".into(),
420 endpoint_id: "publish".into(),
421 },
422 force_relay: false,
423 }
424 .into(),
425 ),
426 (
427 "play2".into(),
428 web_rtc_play::Spec {
429 src: LocalSrcUri {
430 room_id: "test-call".into(),
431 member_id: "some-member".into(),
432 endpoint_id: "publish".into(),
433 },
434 force_relay: false,
435 }
436 .into(),
437 )
438 ]
439 .into_iter()
440 .collect(),
441 credentials: Some(Credentials::Plain(
442 "test".into(),
443 )),
444 on_join: None,
445 on_leave: None,
446 idle_timeout: None,
447 reconnect_timeout: None,
448 ping_interval: None,
449 }
450 .into(),
451 ),
452 ]
453 .into_iter()
454 .collect(),
455 }
456 })
457 );
458 }
459}