1mod ice_candidates;
4mod local_sdp;
5mod tracks_repository;
6mod watchers;
7
8use std::{cell::Cell, collections::HashSet, rc::Rc};
9
10use futures::{StreamExt as _, TryFutureExt as _, future::LocalBoxFuture};
11pub use local_sdp::DESCRIPTION_APPROVE_TIMEOUT;
12use medea_client_api_proto::{
13 self as proto, IceCandidate, IceServer, NegotiationRole, PeerId as Id,
14 TrackId,
15};
16use medea_reactive::{AllProcessed, ObservableCell, ProgressableCell};
17use proto::{ConnectionMode, MemberId};
18use tracerr::Traced;
19
20use self::{
21 ice_candidates::IceCandidates, local_sdp::LocalSdp,
22 tracks_repository::TracksRepository,
23};
24use crate::{
25 media::LocalTracksConstraints,
26 peer::{
27 LocalStreamUpdateCriteria, PeerConnection, UpdateLocalStreamError,
28 media::{receiver, sender},
29 },
30 utils::{AsProtoState, SynchronizableState, Updatable, component},
31};
32
33#[derive(Clone, Copy, Debug, Eq, PartialEq)]
35pub enum SyncPhase {
36 Desynced,
38
39 Syncing,
41
42 Synced,
44}
45
46#[derive(Clone, Copy, Debug, Eq, PartialEq)]
80enum NegotiationPhase {
81 Stable,
83
84 WaitLocalSdp,
86
87 WaitLocalSdpApprove,
89
90 WaitRemoteSdp,
92}
93
94#[derive(Debug)]
96pub struct State {
97 id: Id,
99
100 connection_mode: ConnectionMode,
105
106 senders: TracksRepository<sender::State>,
108
109 receivers: TracksRepository<receiver::State>,
111
112 force_relay: bool,
115
116 ice_servers: Vec<IceServer>,
118
119 negotiation_role: ProgressableCell<Option<NegotiationRole>>,
121
122 negotiation_phase: ObservableCell<NegotiationPhase>,
124
125 local_sdp: LocalSdp,
127
128 remote_sdp: ProgressableCell<Option<String>>,
130
131 restart_ice: Cell<bool>,
133
134 ice_candidates: IceCandidates,
136
137 maybe_update_local_stream: ObservableCell<bool>,
140
141 maybe_update_connections:
146 ObservableCell<Option<(TrackId, HashSet<MemberId>)>>,
147
148 sync_phase: ObservableCell<SyncPhase>,
150}
151
152impl State {
153 #[must_use]
155 pub fn new(
156 id: Id,
157 ice_servers: Vec<IceServer>,
158 force_relay: bool,
159 negotiation_role: Option<NegotiationRole>,
160 connection_mode: ConnectionMode,
161 ) -> Self {
162 Self {
163 id,
164 connection_mode,
165 senders: TracksRepository::new(),
166 receivers: TracksRepository::new(),
167 ice_servers,
168 force_relay,
169 remote_sdp: ProgressableCell::new(None),
170 local_sdp: LocalSdp::new(),
171 negotiation_role: ProgressableCell::new(negotiation_role),
172 negotiation_phase: ObservableCell::new(NegotiationPhase::Stable),
173 restart_ice: Cell::new(false),
174 ice_candidates: IceCandidates::new(),
175 maybe_update_local_stream: ObservableCell::new(false),
176 maybe_update_connections: ObservableCell::new(None),
177 sync_phase: ObservableCell::new(SyncPhase::Synced),
178 }
179 }
180
181 #[must_use]
183 pub const fn connection_mode(&self) -> ConnectionMode {
184 self.connection_mode
185 }
186
187 #[must_use]
189 pub const fn id(&self) -> Id {
190 self.id
191 }
192
193 #[must_use]
195 pub const fn ice_servers(&self) -> &Vec<IceServer> {
196 &self.ice_servers
197 }
198
199 #[must_use]
201 pub const fn force_relay(&self) -> bool {
202 self.force_relay
203 }
204
205 pub fn insert_sender(&self, track_id: TrackId, sender: Rc<sender::State>) {
207 self.senders.insert(track_id, sender);
208 }
209
210 pub fn insert_receiver(
212 &self,
213 track_id: TrackId,
214 receiver: Rc<receiver::State>,
215 ) {
216 self.receivers.insert(track_id, receiver);
217 }
218
219 #[must_use]
221 pub fn get_sender(&self, track_id: TrackId) -> Option<Rc<sender::State>> {
222 self.senders.get(track_id)
223 }
224
225 #[must_use]
227 pub fn get_receiver(
228 &self,
229 track_id: TrackId,
230 ) -> Option<Rc<receiver::State>> {
231 self.receivers.get(track_id)
232 }
233
234 pub fn get_send_tracks(&self) -> Vec<TrackId> {
236 self.senders.ids()
237 }
238
239 pub fn get_recv_tracks(&self) -> Vec<TrackId> {
241 self.receivers.ids()
242 }
243
244 pub fn get_tracks(&self) -> Vec<TrackId> {
246 self.get_send_tracks()
247 .into_iter()
248 .chain(self.get_recv_tracks())
249 .collect()
250 }
251
252 pub async fn set_negotiation_role(
254 &self,
255 negotiation_role: NegotiationRole,
256 ) {
257 _ = self
258 .negotiation_role
259 .subscribe()
260 .any(async |val| val.is_none())
261 .await;
262 self.negotiation_role.set(Some(negotiation_role));
263 }
264
265 pub fn restart_ice(&self) {
267 self.restart_ice.set(true);
268 }
269
270 pub fn remove_track(&self, track_id: TrackId) {
273 if !self.receivers.remove(track_id) {
274 _ = self.senders.remove(track_id);
275 }
276 }
277
278 pub fn set_remote_sdp(&self, sdp: String) {
280 self.remote_sdp.set(Some(sdp));
281 }
282
283 pub fn add_ice_candidate(&self, ice_candidate: IceCandidate) {
285 self.ice_candidates.add(ice_candidate);
286 }
287
288 pub fn apply_local_sdp(&self, sdp: String) {
290 self.local_sdp.approved_set(sdp);
291 }
292
293 pub fn stop_timeouts(&self) {
297 self.local_sdp.stop_timeout();
298 }
299
300 pub fn resume_timeouts(&self) {
304 self.local_sdp.resume_timeout();
305 }
306
307 pub fn local_stream_update_result(
321 &self,
322 tracks_ids: HashSet<TrackId>,
323 ) -> LocalBoxFuture<'static, Result<(), Traced<UpdateLocalStreamError>>>
324 {
325 Box::pin(
326 self.senders
327 .local_stream_update_result(tracks_ids)
328 .map_err(tracerr::map_from_and_wrap!()),
329 )
330 }
331
332 pub fn when_all_updated(&self) -> AllProcessed<'static> {
335 medea_reactive::when_all_processed(vec![
336 self.senders.when_updated().into(),
337 self.receivers.when_updated().into(),
338 ])
339 }
340
341 async fn update_local_stream(
346 &self,
347 peer: &Rc<PeerConnection>,
348 ) -> Result<(), Traced<UpdateLocalStreamError>> {
349 let mut criteria = LocalStreamUpdateCriteria::empty();
350 let senders = self.senders.get_outdated();
351 for s in &senders {
352 criteria.add(s.media_kind(), s.media_source());
353 }
354 let res = peer
355 .update_local_stream(criteria)
356 .await
357 .map_err(tracerr::map_from_and_wrap!())
358 .map(drop);
359 for s in senders {
360 if let Err(err) = res.clone() {
361 s.failed_local_stream_update(err);
362 } else {
363 s.local_stream_updated();
364 }
365 }
366 res
367 }
368
369 pub fn insert_track(
371 &self,
372 track: &proto::Track,
373 send_constraints: LocalTracksConstraints,
374 ) {
375 match &track.direction {
376 proto::Direction::Send { receivers, mid } => {
377 self.senders.insert(
378 track.id,
379 Rc::new(sender::State::new(
380 track.id,
381 mid.clone(),
382 track.media_type.clone(),
383 track.media_direction,
384 track.muted,
385 receivers.clone(),
386 send_constraints,
387 self.connection_mode,
388 )),
389 );
390 }
391 proto::Direction::Recv { sender, mid } => {
392 self.receivers.insert(
393 track.id,
394 Rc::new(receiver::State::new(
395 track.id,
396 mid.clone(),
397 track.media_type.clone(),
398 track.media_direction,
399 track.muted,
400 sender.clone(),
401 self.connection_mode,
402 )),
403 );
404 }
405 }
406 }
407
408 pub fn when_all_senders_processed(&self) -> AllProcessed<'static> {
411 self.senders.when_all_processed()
412 }
413
414 fn when_all_receivers_processed(&self) -> AllProcessed<'static> {
417 self.receivers.when_all_processed()
418 }
419
420 pub async fn patch_track(&self, patch: proto::TrackPatchEvent) {
425 if let Some(receivers) = &patch.receivers {
426 _ = self.maybe_update_connections.when_eq(None).await;
427 self.maybe_update_connections
428 .set(Some((patch.id, receivers.clone().into_iter().collect())));
429 }
430
431 if let Some(sender) = self.get_sender(patch.id) {
432 sender.update(patch);
433 _ = self.maybe_update_local_stream.when_eq(false).await;
434 self.maybe_update_local_stream.set(true);
435 } else if let Some(receiver) = self.get_receiver(patch.id) {
436 receiver.update(&patch);
437 } else {
438 log::warn!("Cannot apply patch to `Track`: {}", patch.id.0);
439 }
440 }
441
442 #[must_use]
444 pub fn current_sdp_offer(&self) -> Option<String> {
445 self.local_sdp.current()
446 }
447}
448
449pub type Component = component::Component<State, PeerConnection>;
451
452impl AsProtoState for State {
453 type Output = proto::state::Peer;
454
455 fn as_proto(&self) -> Self::Output {
456 Self::Output {
457 id: self.id,
458 connection_mode: self.connection_mode,
459 senders: self.senders.as_proto(),
460 receivers: self.receivers.as_proto(),
461 ice_candidates: self.ice_candidates.as_proto(),
462 force_relay: self.force_relay,
463 ice_servers: self.ice_servers.clone(),
464 negotiation_role: self.negotiation_role.get(),
465 local_sdp: self.local_sdp.current(),
466 remote_sdp: self.remote_sdp.get(),
467 restart_ice: self.restart_ice.get(),
468 }
469 }
470}
471
472impl SynchronizableState for State {
473 type Input = proto::state::Peer;
474
475 fn from_proto(
476 input: Self::Input,
477 send_constraints: &LocalTracksConstraints,
478 ) -> Self {
479 let state = Self::new(
480 input.id,
481 input.ice_servers,
482 input.force_relay,
483 input.negotiation_role,
484 input.connection_mode,
485 );
486
487 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
488 for (id, sender) in input.senders {
489 if !sender.receivers.is_empty() {
490 state.senders.insert(
491 id,
492 Rc::new(sender::State::from_proto(
493 sender,
494 send_constraints,
495 )),
496 );
497 }
498 }
499 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
500 for (id, receiver) in input.receivers {
501 state.receivers.insert(
502 id,
503 Rc::new(receiver::State::from_proto(
504 receiver,
505 send_constraints,
506 )),
507 );
508 }
509 #[expect(clippy::iter_over_hash_type, reason = "order doesn't matter")]
510 for ice_candidate in input.ice_candidates {
511 state.ice_candidates.add(ice_candidate);
512 }
513
514 state
515 }
516
517 fn apply(&self, input: Self::Input, send_cons: &LocalTracksConstraints) {
518 if input.negotiation_role.is_some() {
519 self.negotiation_role.set(input.negotiation_role);
520 }
521 if input.restart_ice {
522 self.restart_ice.set(true);
523 }
524 if let Some(sdp_offer) = input.local_sdp {
525 self.local_sdp.approved_set(sdp_offer);
526 } else {
527 self.negotiation_phase.set(NegotiationPhase::WaitLocalSdp);
528 }
529 self.remote_sdp.set(input.remote_sdp);
530 self.ice_candidates.apply(input.ice_candidates, send_cons);
531 self.senders.apply(input.senders, send_cons);
532 self.receivers.apply(input.receivers, send_cons);
533
534 self.sync_phase.set(SyncPhase::Synced);
535 }
536}
537
538impl Updatable for State {
539 fn when_stabilized(&self) -> AllProcessed<'static> {
540 medea_reactive::when_all_processed(vec![
541 self.senders.when_stabilized().into(),
542 self.receivers.when_stabilized().into(),
543 ])
544 }
545
546 fn when_updated(&self) -> AllProcessed<'static> {
547 medea_reactive::when_all_processed(vec![
548 self.receivers.when_updated().into(),
549 self.senders.when_updated().into(),
550 ])
551 }
552
553 fn connection_lost(&self) {
554 self.sync_phase.set(SyncPhase::Desynced);
555 self.senders.connection_lost();
556 self.receivers.connection_lost();
557 }
558
559 fn connection_recovered(&self) {
560 self.sync_phase.set(SyncPhase::Syncing);
561 self.senders.connection_recovered();
562 self.receivers.connection_recovered();
563 }
564}
565
566#[cfg(feature = "mockable")]
567#[expect(clippy::allow_attributes, reason = "`#[expect]` is not considered")]
569#[allow(clippy::multiple_inherent_impl, reason = "feature gated")]
570impl State {
571 pub async fn when_remote_sdp_processed(&self) {
573 self.remote_sdp.when_all_processed().await;
574 }
575
576 pub fn reset_negotiation_role(&self) {
578 self.negotiation_phase.set(NegotiationPhase::Stable);
579 self.negotiation_role.set(None);
580 }
581
582 #[must_use]
584 pub fn negotiation_role(&self) -> Option<NegotiationRole> {
585 self.negotiation_role.get()
586 }
587
588 pub fn when_local_sdp_approve_needed(
590 &self,
591 ) -> impl Future<Output = ()> + use<> {
592 use futures::FutureExt as _;
593
594 self.negotiation_phase
595 .when_eq(NegotiationPhase::WaitLocalSdpApprove)
596 .map(drop)
597 }
598
599 pub fn stabilize_all(&self) {
601 self.receivers.stabilize_all();
602 }
603
604 pub async fn when_local_sdp_updated(&self) -> Option<String> {
607 use futures::StreamExt as _;
608
609 self.local_sdp.subscribe().skip(1).next().await.flatten()
610 }
611
612 pub async fn when_all_tracks_created(&self) {
615 medea_reactive::when_all_processed(vec![
616 self.senders.when_insert_processed().into(),
617 self.receivers.when_insert_processed().into(),
618 ])
619 .await;
620 }
621
622 pub fn synced(&self) {
624 self.senders.synced();
625 self.receivers.synced();
626 self.sync_phase.set(SyncPhase::Synced);
627 }
628}