Skip to main content

opentalk_roomserver_module_automod/
lib.rs

1// SPDX-License-Identifier: EUPL-1.2
2// SPDX-FileCopyrightText: OpenTalk Team <mail@opentalk.eu>
3
4//! # Auto-Moderation Module
5//!
6//! ## Functionality
7//!
8//! On room startup the automod is disabled.
9//!
10//! Selecting the options for the automod is managed by the frontend and this module does not
11//! provide templates or anything else
12//!
13//! Following `selection_strategies` are defined:
14//!
15//! - `None`: No automatic reselection happens after the current speaker yields. The next one must
16//!   always be selected by the moderator. The moderator may choose a participant directly or let
//!   the roomserver choose one randomly. For that the roomserver holds an `allow_list` which is a
18//!   set of participants which are able to be randomly selected. Furthermore the roomserver will
19//!   hold a list of start/stop speaker events. That list can optionally be used to avoid double
20//!   selections when randomly choosing a participant.
21//!
22//! - `Playlist`: The playlist-strategy requires a playlist of participants. This list will be
23//!   stored ordered inside the roomserver. Whenever a speaker yields the roomserver will
24//!   automatically choose the next participant in the list to be the next speaker.
25//!
26//!   A moderator may choose to skip over a speaker. That can be done by selecting the next one or
27//!   let the roomserver choose someone random from the playlist.
28//!   The playlist can, while the automod is active, be edited.
29//!
30//! - `Random`: This strategy behaves like `None` but will always choose the next speaker randomly
31//!   from the `allow_list` as soon as the current speaker yields.
32//!
33//! - `Nomination`: This strategy behaves like `None` but requires the current speaker to nominate
34//!   the next participant to be speaker. The nominated participant MUST be inside the `allow_list`
35//!   and if double selection is not enabled the roomserver will check if the nominated participant
36//!   already was a speaker.
37//!
38//! ### Lifecycle
39//!
40//! As soon as a moderator starts the automod, the automod-module will set the config in memory and
41//! then send a start event to all participants.
42//!
//! The selection of the first speaker must be done by the frontend. After that, depending on the
//! `selection_strategy`, the automod will continue running until finished or stopped.
45//!
46//! Once the active speaker yields or their time runs out, the automod module is responsible to
47//! select the next speaker (if the `selection_strategy` requires it). This behavior MUST only
48//! be executed after ensuring that this participant is in fact still the speaker.
49//!
//! If the participant leaves while being speaker, the automod-module must execute the same
//! behavior as if the participant simply yielded without selecting the next one (selecting one
//! would be required for the `nominate` `selection_strategy`, so a moderator has to intervene in
//! this situation).
54//!
55//! Moderators will always be able to execute a re-selection of the current speaker regardless of
56//! the `selection_strategy`.
57
58use std::{
59    collections::{
60        HashMap,
61        hash_map::Entry::{Occupied, Vacant},
62    },
63    time::Duration,
64};
65
66use anyhow::Context as _;
67use opentalk_roomserver_module_livekit::LiveKitModule;
68use opentalk_roomserver_signaling::{
69    module_context::{ChannelDroppedError, ModuleContext},
70    signaling_module::{
71        ModuleJoinData, ModuleSwitchData, NoOp, SignalingModule, SignalingModuleDescription,
72        SignalingModuleFeatureDescription, SignalingModuleInitData,
73    },
74};
75use opentalk_roomserver_types::{
76    connection_id::ConnectionId,
77    room_kind::RoomKind,
78    signaling::module_error::{FatalError, SignalingModuleError},
79};
80use opentalk_roomserver_types_automod::{
81    AUTOMOD_MODULE_ID,
82    command::{AutomodCommand, Select},
83    config::{FrontendConfig, Parameter, SelectionStrategy},
84    event::{AutomodError, AutomodEvent, StoppedReason},
85    state::AutomodState,
86};
87use opentalk_roomserver_types_livekit::{LiveKitInternal, ParticipantsMuted};
88use opentalk_types_common::modules::ModuleId;
89use opentalk_types_signaling::ParticipantId;
90use tokio::sync::oneshot;
91
92use crate::{
93    session::Session,
94    speaker_selection::{SpeakerSelectionOutput, SpeakerUpdate},
95};
96
97pub(crate) mod history_entry;
98mod session;
99mod speaker_selection;
100
101/// Indicates that the time limit for a speaker has been reached
102pub struct SpeakerTimeLimitReached {
103    /// The participant that has reached the time limit
104    pub speaker: Option<ParticipantId>,
105}
106
/// Signaling module implementing the auto-moderation ("automod") feature.
///
/// Sessions are keyed by [`RoomKind`], so each room has at most one running session. A room
/// without an entry in `sessions` has no active automod.
pub struct AutomodModule {
    /// The currently active automod sessions, indexed by room
    sessions: HashMap<RoomKind, Session>,
}
111
/// Events the automod module delivers back to itself through the loopback mechanism.
pub enum AutomodLoopback {
    /// The session's time limit elapsed. `speaker` is the speaker that was announced when the
    /// timer was scheduled; the handler compares it with the current speaker before acting.
    SpeakerTimeLimitReached { speaker: Option<ParticipantId> },
    /// Answer received on the return channel of a `LiveKitInternal::Mute` command.
    ParticipantsMuted(ParticipantsMuted),
    /// Produced from a `ChannelDroppedError`; presumably the awaited return channel was dropped
    /// before an answer arrived.
    ChannelDropped,
}
117
118impl From<ChannelDroppedError> for AutomodLoopback {
119    fn from(_: ChannelDroppedError) -> Self {
120        AutomodLoopback::ChannelDropped
121    }
122}
123
/// Static description of the automod module: its id, a human readable description and the
/// (currently empty) list of features.
impl SignalingModuleDescription for AutomodModule {
    const MODULE_ID: ModuleId = AUTOMOD_MODULE_ID;
    const DESCRIPTION: &'static str =
        "Handles auto-moderation functionality such as the talking stick";
    // The automod module does not declare any features.
    const FEATURES: &[SignalingModuleFeatureDescription] = &[];
}
130
131impl SignalingModule for AutomodModule {
132    const NAMESPACE: ModuleId = AUTOMOD_MODULE_ID;
133
134    type Incoming = AutomodCommand;
135
136    type Outgoing = AutomodEvent;
137
138    type Internal = NoOp;
139
140    type Loopback = AutomodLoopback;
141
142    type JoinInfo = AutomodState;
143
144    type PeerJoinInfo = ();
145
146    type Error = AutomodError;
147
148    fn init(_init_data: SignalingModuleInitData) -> Option<Self> {
149        Some(Self {
150            sessions: HashMap::new(),
151        })
152    }
153
154    fn on_participant_joined(
155        &mut self,
156        ctx: &mut ModuleContext<'_, Self>,
157        participant_id: ParticipantId,
158        _connection_id: ConnectionId,
159        _is_first_connection: bool,
160    ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
161        let state = self.join_room(ctx, ctx.room, participant_id)?;
162
163        Ok(ModuleJoinData {
164            join_success: state,
165            ..Default::default()
166        })
167    }
168
169    fn on_participant_disconnected(
170        &mut self,
171        ctx: &mut ModuleContext<'_, Self>,
172        participant_id: ParticipantId,
173        _connection_id: ConnectionId,
174    ) -> Result<(), SignalingModuleError<Self::Error>> {
175        self.remove_participant(ctx, ctx.room, participant_id)
176    }
177
178    fn on_websocket_message(
179        &mut self,
180        ctx: &mut ModuleContext<'_, Self>,
181        sender: ParticipantId,
182        _connection_id: ConnectionId,
183        content: Self::Incoming,
184    ) -> Result<(), SignalingModuleError<Self::Error>> {
185        match content {
186            AutomodCommand::Start {
187                parameter,
188                allow_list,
189                playlist,
190            } => self.start(ctx, sender, parameter, allow_list, playlist),
191            AutomodCommand::Edit {
192                allow_list,
193                playlist,
194            } => self.edit(ctx, sender, allow_list, playlist),
195            AutomodCommand::Stop => self.stop(ctx, sender),
196            AutomodCommand::Select(select) => self.select(ctx, sender, select),
197            AutomodCommand::Yield { next } => self.yield_next(ctx, sender, next),
198        }
199    }
200
201    fn on_loopback_event(
202        &mut self,
203        ctx: &mut ModuleContext<'_, Self>,
204        event: Self::Loopback,
205    ) -> Result<(), SignalingModuleError<Self::Error>> {
206        match event {
207            AutomodLoopback::SpeakerTimeLimitReached { speaker } => {
208                self.on_speaker_time_limit_reached(ctx, speaker)?;
209            }
210            AutomodLoopback::ParticipantsMuted(ParticipantsMuted { participants, .. }) => {
211                tracing::debug!(
212                    "Following participants were muted by the {} module: {participants:?}",
213                    Self::NAMESPACE
214                );
215            }
216            AutomodLoopback::ChannelDropped => {
217                ctx.send_ws_message(
218                    ctx.participants.in_room(ctx.room).connected().ids(),
219                    AutomodEvent::Error(AutomodError::Internal),
220                )?;
221            }
222        }
223
224        Ok(())
225    }
226
227    fn on_breakout_switch(
228        &mut self,
229        ctx: &mut ModuleContext<'_, Self>,
230        participant_id: ParticipantId,
231        old_room: RoomKind,
232        new_room: RoomKind,
233    ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
234        // Remove the participant from the remaining list of the old room
235        self.remove_participant(ctx, old_room, participant_id)?;
236
237        let Some(state) = self.join_room(ctx, new_room, participant_id)? else {
238            // Automod not active in the new room, return empty `ModuleSwitchData`
239            return Ok(ModuleSwitchData::<Self>::default());
240        };
241
242        let switch_success = ctx
243            .participant_state(participant_id)
244            .with_context(|| format!("Missing state for participant {participant_id}"))?
245            .connections()
246            .map(|con| (con, Some(state.clone())))
247            .collect();
248
249        Ok(ModuleSwitchData {
250            switch_success,
251            ..Default::default()
252        })
253    }
254
255    fn on_breakout_closed(
256        &mut self,
257        _ctx: &mut ModuleContext<'_, Self>,
258    ) -> Result<(), SignalingModuleError<Self::Error>> {
259        // Remove the sessions for the breakout rooms
260        self.sessions.retain(|room, _| *room == RoomKind::Main);
261        Ok(())
262    }
263}
264
265impl AutomodModule {
266    #[tracing::instrument(skip(self, ctx), level = "debug")]
267    fn start(
268        &mut self,
269        ctx: &mut ModuleContext<'_, Self>,
270        sender: ParticipantId,
271        parameter: Parameter,
272        allow_list: Option<Vec<ParticipantId>>,
273        playlist: Option<Vec<ParticipantId>>,
274    ) -> Result<(), SignalingModuleError<AutomodError>> {
275        if !ctx.is_moderator(sender) {
276            return Err(AutomodError::InsufficientPermissions.into());
277        }
278
279        let remaining = Self::resolve_valid_speaker_list(
280            ctx,
281            parameter.selection_strategy,
282            allow_list,
283            playlist,
284        )
285        .ok_or(SignalingModuleError::Module(AutomodError::InvalidSelection))?;
286
287        match self.sessions.entry(ctx.room) {
288            Occupied(..) => return Err(AutomodError::SessionAlreadyRunning.into()),
289            Vacant(vacant) => {
290                tracing::debug!("Starting automod session in room: {:?}", ctx.room);
291                vacant.insert(Session::new(sender, parameter.clone(), remaining.clone()))
292            }
293        };
294
295        ctx.send_ws_message(
296            ctx.participants.in_room(ctx.room).connected().ids(),
297            AutomodEvent::Started(
298                FrontendConfig {
299                    parameter,
300                    history: Vec::new(),
301                    remaining,
302                    issued_by: sender,
303                }
304                .into_public(),
305            ),
306        )?;
307
308        let (tx, rx) = oneshot::channel();
309        ctx.send_internal_command::<LiveKitModule>(LiveKitInternal::Mute {
310            sender: None,
311            participants: ctx.participants.in_room(ctx.room).ids().collect(),
312            return_channel: tx,
313        });
314
315        ctx.recv_loopback(rx, AutomodLoopback::ParticipantsMuted);
316
317        Ok(())
318    }
319
320    #[tracing::instrument(skip(self, ctx), level = "debug")]
321    fn edit(
322        &mut self,
323        ctx: &mut ModuleContext<'_, Self>,
324        sender: ParticipantId,
325        allow_list: Option<Vec<ParticipantId>>,
326        playlist: Option<Vec<ParticipantId>>,
327    ) -> Result<(), SignalingModuleError<AutomodError>> {
328        if !ctx.is_moderator(sender) {
329            return Err(AutomodError::InsufficientPermissions.into());
330        }
331
332        // Only edit if automod is active in the current room
333        let Some(session) = self.sessions.get_mut(&ctx.room) else {
334            return Err(AutomodError::SessionNotRunning.into());
335        };
336
337        let remaining = Self::resolve_valid_speaker_list(
338            ctx,
339            session.parameter.selection_strategy,
340            allow_list,
341            playlist,
342        )
343        .ok_or(SignalingModuleError::Module(AutomodError::InvalidEdit))?;
344        session.remaining.clone_from(&remaining);
345
346        ctx.send_ws_message(
347            ctx.participants.in_room(ctx.room).connected().ids(),
348            AutomodEvent::RemainingUpdated { remaining },
349        )?;
350
351        Ok(())
352    }
353
354    #[tracing::instrument(skip(self, ctx), level = "debug")]
355    fn stop(
356        &mut self,
357        ctx: &mut ModuleContext<'_, Self>,
358        sender: ParticipantId,
359    ) -> Result<(), SignalingModuleError<AutomodError>> {
360        if !ctx.is_moderator(sender) {
361            return Err(AutomodError::InsufficientPermissions.into());
362        }
363
364        self.stop_session(ctx, StoppedReason::StoppedByModerator { issued_by: sender })?;
365
366        Ok(())
367    }
368
369    #[tracing::instrument(skip(self, ctx), level = "debug")]
370    fn stop_session(
371        &mut self,
372        ctx: &mut ModuleContext<'_, Self>,
373        reason: StoppedReason,
374    ) -> Result<(), FatalError> {
375        self.sessions.remove(&ctx.room);
376
377        ctx.send_ws_message(
378            ctx.participants.in_room(ctx.room).connected().ids(),
379            AutomodEvent::Stopped(reason),
380        )
381    }
382
383    #[tracing::instrument(skip(self, ctx), level = "debug")]
384    fn select(
385        &mut self,
386        ctx: &mut ModuleContext<'_, Self>,
387        sender: ParticipantId,
388        select: Select,
389    ) -> Result<(), SignalingModuleError<AutomodError>> {
390        if !ctx.is_moderator(sender) {
391            return Err(AutomodError::InsufficientPermissions.into());
392        }
393
394        match select {
395            Select::None => self.select_none(ctx),
396            Select::Random => self.select_random(ctx),
397            Select::Next => self.select_next(ctx),
398            Select::Specific {
399                participant,
400                keep_in_remaining,
401            } => self.select_specific(ctx, participant, keep_in_remaining),
402        }
403    }
404
405    #[tracing::instrument(skip(self, ctx), level = "debug")]
406    fn select_none(
407        &mut self,
408        ctx: &mut ModuleContext<'_, Self>,
409    ) -> Result<(), SignalingModuleError<AutomodError>> {
410        let session = self
411            .sessions
412            .get_mut(&ctx.room)
413            .ok_or(AutomodError::SessionNotRunning)?;
414        let previous_speaker = session.speaker;
415
416        let update = speaker_selection::select_unchecked(session, None);
417        let time_limit = session.parameter.time_limit;
418        self.handle_speaker_update(
419            ctx,
420            SpeakerSelectionOutput::ContinueWith { update },
421            time_limit,
422            previous_speaker,
423        )?;
424
425        Ok(())
426    }
427
428    #[tracing::instrument(skip(self, ctx), level = "debug")]
429    fn select_random(
430        &mut self,
431        ctx: &mut ModuleContext<'_, Self>,
432    ) -> Result<(), SignalingModuleError<AutomodError>> {
433        let session = self
434            .sessions
435            .get_mut(&ctx.room)
436            .ok_or(AutomodError::SessionNotRunning)?;
437        let previous_speaker = session.speaker;
438
439        let update = speaker_selection::select_random(session, &mut rand::rng());
440        let time_limit = session.parameter.time_limit;
441        self.handle_speaker_update(ctx, update, time_limit, previous_speaker)?;
442
443        Ok(())
444    }
445
446    #[tracing::instrument(skip(self, ctx), level = "debug")]
447    fn select_next(
448        &mut self,
449        ctx: &mut ModuleContext<'_, Self>,
450    ) -> Result<(), SignalingModuleError<AutomodError>> {
451        let session = self
452            .sessions
453            .get_mut(&ctx.room)
454            .ok_or(AutomodError::SessionNotRunning)?;
455
456        let valid = match session.parameter.selection_strategy {
457            SelectionStrategy::None | SelectionStrategy::Nomination => false,
458            SelectionStrategy::Playlist | SelectionStrategy::Random => true,
459        };
460
461        if !valid {
462            return Err(AutomodError::InvalidSelection.into());
463        }
464
465        let previous_speaker = session.speaker;
466
467        let update = speaker_selection::select_next(session, None, &mut rand::rng())?;
468        let time_limit = session.parameter.time_limit;
469        self.handle_speaker_update(ctx, update, time_limit, previous_speaker)?;
470
471        Ok(())
472    }
473
474    #[tracing::instrument(skip(self, ctx), level = "debug")]
475    fn select_specific(
476        &mut self,
477        ctx: &mut ModuleContext<'_, Self>,
478        participant: ParticipantId,
479        keep_in_remaining: bool,
480    ) -> Result<(), SignalingModuleError<AutomodError>> {
481        let session = self
482            .sessions
483            .get_mut(&ctx.room)
484            .ok_or(AutomodError::SessionNotRunning)?;
485
486        if !ctx
487            .participants
488            .in_room(ctx.room)
489            .connected()
490            .contains(&participant)
491        {
492            return Err(AutomodError::InvalidSelection.into());
493        }
494
495        let previous_speaker = session.speaker;
496        let output =
497            speaker_selection::select_specific(session, Some(participant), keep_in_remaining)?;
498
499        let time_limit = session.parameter.time_limit;
500        self.handle_speaker_update(ctx, output, time_limit, previous_speaker)?;
501
502        Ok(())
503    }
504
505    #[tracing::instrument(skip(self, ctx), level = "debug")]
506    fn handle_speaker_update(
507        &mut self,
508        ctx: &mut ModuleContext<'_, Self>,
509        output: SpeakerSelectionOutput,
510        time_limit: Option<Duration>,
511        previous_speaker: Option<ParticipantId>,
512    ) -> Result<(), FatalError> {
513        // Mute all participants when the speaker is changed
514        let (tx, rx) = oneshot::channel();
515        ctx.send_internal_command::<LiveKitModule>(LiveKitInternal::Mute {
516            sender: None,
517            participants: ctx
518                .participants
519                .in_room(ctx.room)
520                .connected()
521                .ids()
522                .collect(),
523            return_channel: tx,
524        });
525        ctx.recv_loopback(rx, AutomodLoopback::ParticipantsMuted);
526
527        let update = match output {
528            SpeakerSelectionOutput::ContinueWith { update } => update,
529            SpeakerSelectionOutput::End => {
530                return self.stop_session(ctx, StoppedReason::SessionFinished);
531            }
532        };
533
534        if let Some(SpeakerUpdate {
535            speaker,
536            history,
537            remaining,
538        }) = update
539        {
540            if let Some(time_limit) = time_limit {
541                ctx.loopback_after(time_limit, move || {
542                    AutomodLoopback::SpeakerTimeLimitReached { speaker }
543                });
544            }
545
546            ctx.send_ws_message(
547                ctx.participants.in_room(ctx.room).connected().ids(),
548                AutomodEvent::SpeakerUpdated {
549                    speaker,
550                    history,
551                    remaining,
552                },
553            )?;
554        } else {
555            let session = self.sessions.get(&ctx.room).with_context(|| {
556                format!(
557                    "Trying to handle speaker update in room '{:?}' without a running automod session",
558                    ctx.room
559                )
560            }).map_err(FatalError)?;
561
562            ctx.send_ws_message(
563                ctx.participants.in_room(ctx.room).connected().ids(),
564                AutomodEvent::SpeakerUpdated {
565                    speaker: session.speaker,
566                    history: Some(session.participant_history().collect()),
567                    remaining: Some(session.remaining.clone()),
568                },
569            )?;
570        }
571
572        Ok(())
573    }
574
575    #[tracing::instrument(skip(self, ctx), level = "debug")]
576    fn yield_next(
577        &mut self,
578        ctx: &mut ModuleContext<'_, Self>,
579        sender: ParticipantId,
580        next: Option<ParticipantId>,
581    ) -> Result<(), SignalingModuleError<AutomodError>> {
582        let session = self
583            .sessions
584            .get_mut(&ctx.room)
585            .ok_or(AutomodError::SessionNotRunning)?;
586
587        if session.speaker != Some(sender) {
588            return Err(AutomodError::InsufficientPermissions.into());
589        }
590
591        let valid = match session.parameter.selection_strategy {
592            SelectionStrategy::None => false,
593            SelectionStrategy::Playlist | SelectionStrategy::Random => next.is_none(),
594            SelectionStrategy::Nomination => next.is_some(),
595        };
596
597        if !valid {
598            return Err(AutomodError::InvalidSelection.into());
599        }
600
601        let previous_speaker = session.speaker;
602
603        let output = speaker_selection::select_next(session, next, &mut rand::rng())?;
604        let time_limit = session.parameter.time_limit;
605        self.handle_speaker_update(ctx, output, time_limit, previous_speaker)?;
606
607        Ok(())
608    }
609
610    #[tracing::instrument(skip(self, ctx), level = "debug")]
611    fn join_room(
612        &mut self,
613        ctx: &mut ModuleContext<'_, Self>,
614        room: RoomKind,
615        participant_id: ParticipantId,
616    ) -> Result<Option<AutomodState>, FatalError> {
617        let session = self.sessions.get_mut(&room);
618        let Some(session) = session else {
619            // Automod not active, return empty JoinInfo
620            return Ok(None);
621        };
622
623        let history: Vec<ParticipantId> = session.participant_history().collect();
624
625        if session.parameter.auto_append_on_join && !history.contains(&participant_id) {
626            // Append the joining participant to the history
627            session.remaining.push(participant_id);
628            ctx.send_ws_message(
629                ctx.participants
630                    .in_room(room)
631                    .connected()
632                    .iter()
633                    .filter_map(
634                        |(&id, _)| {
635                            if id == participant_id { None } else { Some(id) }
636                        },
637                    ),
638                AutomodEvent::RemainingUpdated {
639                    remaining: session.remaining.clone(),
640                },
641            )?;
642        }
643
644        Ok(Some(AutomodState {
645            config: FrontendConfig {
646                parameter: session.parameter.clone(),
647                history,
648                remaining: session.remaining.clone(),
649                issued_by: session.issued_by,
650            }
651            .into_public(),
652            speaker: session.speaker,
653        }))
654    }
655
656    /// Removes the participant from the automod session in `room`.
657    #[tracing::instrument(skip(self, ctx), level = "debug")]
658    fn remove_participant(
659        &mut self,
660        ctx: &mut ModuleContext<'_, Self>,
661        room: RoomKind,
662        participant_id: ParticipantId,
663    ) -> Result<(), SignalingModuleError<AutomodError>> {
664        let session = self.sessions.get_mut(&room);
665        let Some(session) = session else {
666            // Automod not active, nothing to do
667            return Ok(());
668        };
669
670        let index = session
671            .remaining
672            .iter()
673            .position(|id| *id == participant_id);
674        if let Some(index) = index {
675            session.remaining.remove(index);
676            ctx.send_ws_message(
677                ctx.participants.in_room(room).connected().ids(),
678                AutomodEvent::RemainingUpdated {
679                    remaining: session.remaining.clone(),
680                },
681            )?;
682        }
683
684        if session.speaker == Some(participant_id) {
685            self.select_next(ctx)?;
686        }
687
688        Ok(())
689    }
690
691    #[tracing::instrument(skip(self, ctx), level = "debug")]
692    fn on_speaker_time_limit_reached(
693        &mut self,
694        ctx: &mut ModuleContext<'_, Self>,
695        speaker: Option<ParticipantId>,
696    ) -> Result<(), SignalingModuleError<AutomodError>> {
697        let Some(session) = self.sessions.get(&ctx.room) else {
698            // The session has ended in the meantime
699            return Ok(());
700        };
701        if session.speaker != speaker {
702            // The speaker has changed in the meantime
703            return Ok(());
704        }
705
706        match session.parameter.selection_strategy {
707            // Selection strategies `None` and `Nomination` do not have a concept of
708            // a "next" speaker. Select no speaker.
709            SelectionStrategy::None | SelectionStrategy::Nomination => self.select_none(ctx)?,
710            _ => self.select_next(ctx)?,
711        }
712
713        Ok(())
714    }
715
716    /// Returns the list corresponding to the given `selection_strategy` or [`None`]
717    /// if no list matches the criteria.
718    fn resolve_valid_speaker_list(
719        ctx: &mut ModuleContext<'_, Self>,
720        selection_strategy: SelectionStrategy,
721        allow_list: Option<Vec<ParticipantId>>,
722        playlist: Option<Vec<ParticipantId>>,
723    ) -> Option<Vec<ParticipantId>> {
724        let list = if selection_strategy.uses_allow_list() {
725            allow_list
726        } else {
727            playlist
728        };
729
730        let list = match list {
731            Some(list) if list.is_empty() => return None,
732            Some(list) => list,
733            None => return None,
734        };
735
736        let connected_participants: Vec<ParticipantId> = ctx
737            .participants
738            .in_room(ctx.room)
739            .connected()
740            .ids()
741            .collect();
742        if !list
743            .iter()
744            .all(|participant| connected_participants.contains(participant))
745        {
746            return None;
747        }
748
749        Some(list)
750    }
751}