1use std::{
59 collections::{
60 HashMap,
61 hash_map::Entry::{Occupied, Vacant},
62 },
63 time::Duration,
64};
65
66use anyhow::Context as _;
67use opentalk_roomserver_module_livekit::LiveKitModule;
68use opentalk_roomserver_signaling::{
69 module_context::{ChannelDroppedError, ModuleContext},
70 signaling_module::{
71 ModuleJoinData, ModuleSwitchData, NoOp, SignalingModule, SignalingModuleDescription,
72 SignalingModuleFeatureDescription, SignalingModuleInitData,
73 },
74};
75use opentalk_roomserver_types::{
76 connection_id::ConnectionId,
77 room_kind::RoomKind,
78 signaling::module_error::{FatalError, SignalingModuleError},
79};
80use opentalk_roomserver_types_automod::{
81 AUTOMOD_MODULE_ID,
82 command::{AutomodCommand, Select},
83 config::{FrontendConfig, Parameter, SelectionStrategy},
84 event::{AutomodError, AutomodEvent, StoppedReason},
85 state::AutomodState,
86};
87use opentalk_roomserver_types_livekit::{LiveKitInternal, ParticipantsMuted};
88use opentalk_types_common::modules::ModuleId;
89use opentalk_types_signaling::ParticipantId;
90use tokio::sync::oneshot;
91
92use crate::{
93 session::Session,
94 speaker_selection::{SpeakerSelectionOutput, SpeakerUpdate},
95};
96
97pub(crate) mod history_entry;
98mod session;
99mod speaker_selection;
100
/// Payload describing an elapsed speaking-time limit.
///
/// NOTE(review): this struct is not referenced in the visible part of this file; the module
/// itself uses the identically named `AutomodLoopback::SpeakerTimeLimitReached` variant.
/// Confirm whether external users still need it.
pub struct SpeakerTimeLimitReached {
    /// The speaker the time limit was started for (`None` if there was no speaker).
    pub speaker: Option<ParticipantId>,
}
106
/// Signaling module implementing auto-moderation.
pub struct AutomodModule {
    /// Running automod sessions keyed by the room they belong to.
    ///
    /// `start` refuses to create a session for a room that already has an entry, so there is
    /// at most one session per room.
    sessions: HashMap<RoomKind, Session>,
}
111
/// Events the automod module sends back to itself.
pub enum AutomodLoopback {
    /// Scheduled with `loopback_after` when a speaker update is handled and the session has a
    /// time limit; carries the speaker that was active when the timer was started.
    SpeakerTimeLimitReached { speaker: Option<ParticipantId> },
    /// Answer received on the return channel of a `LiveKitInternal::Mute` command.
    ParticipantsMuted(ParticipantsMuted),
    /// Produced from a `ChannelDroppedError` (see the `From` impl on this type).
    ChannelDropped,
}
117
118impl From<ChannelDroppedError> for AutomodLoopback {
119 fn from(_: ChannelDroppedError) -> Self {
120 AutomodLoopback::ChannelDropped
121 }
122}
123
/// Static description of the automod module.
impl SignalingModuleDescription for AutomodModule {
    /// Identifier of the module; the same constant is used as the signaling namespace.
    const MODULE_ID: ModuleId = AUTOMOD_MODULE_ID;
    /// Human-readable summary of what the module does.
    const DESCRIPTION: &'static str =
        "Handles auto-moderation functionality such as the talking stick";
    /// The module declares no features.
    const FEATURES: &[SignalingModuleFeatureDescription] = &[];
}
130
131impl SignalingModule for AutomodModule {
132 const NAMESPACE: ModuleId = AUTOMOD_MODULE_ID;
133
134 type Incoming = AutomodCommand;
135
136 type Outgoing = AutomodEvent;
137
138 type Internal = NoOp;
139
140 type Loopback = AutomodLoopback;
141
142 type JoinInfo = AutomodState;
143
144 type PeerJoinInfo = ();
145
146 type Error = AutomodError;
147
148 fn init(_init_data: SignalingModuleInitData) -> Option<Self> {
149 Some(Self {
150 sessions: HashMap::new(),
151 })
152 }
153
154 fn on_participant_joined(
155 &mut self,
156 ctx: &mut ModuleContext<'_, Self>,
157 participant_id: ParticipantId,
158 _connection_id: ConnectionId,
159 _is_first_connection: bool,
160 ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
161 let state = self.join_room(ctx, ctx.room, participant_id)?;
162
163 Ok(ModuleJoinData {
164 join_success: state,
165 ..Default::default()
166 })
167 }
168
169 fn on_participant_disconnected(
170 &mut self,
171 ctx: &mut ModuleContext<'_, Self>,
172 participant_id: ParticipantId,
173 _connection_id: ConnectionId,
174 ) -> Result<(), SignalingModuleError<Self::Error>> {
175 self.remove_participant(ctx, ctx.room, participant_id)
176 }
177
178 fn on_websocket_message(
179 &mut self,
180 ctx: &mut ModuleContext<'_, Self>,
181 sender: ParticipantId,
182 _connection_id: ConnectionId,
183 content: Self::Incoming,
184 ) -> Result<(), SignalingModuleError<Self::Error>> {
185 match content {
186 AutomodCommand::Start {
187 parameter,
188 allow_list,
189 playlist,
190 } => self.start(ctx, sender, parameter, allow_list, playlist),
191 AutomodCommand::Edit {
192 allow_list,
193 playlist,
194 } => self.edit(ctx, sender, allow_list, playlist),
195 AutomodCommand::Stop => self.stop(ctx, sender),
196 AutomodCommand::Select(select) => self.select(ctx, sender, select),
197 AutomodCommand::Yield { next } => self.yield_next(ctx, sender, next),
198 }
199 }
200
201 fn on_loopback_event(
202 &mut self,
203 ctx: &mut ModuleContext<'_, Self>,
204 event: Self::Loopback,
205 ) -> Result<(), SignalingModuleError<Self::Error>> {
206 match event {
207 AutomodLoopback::SpeakerTimeLimitReached { speaker } => {
208 self.on_speaker_time_limit_reached(ctx, speaker)?;
209 }
210 AutomodLoopback::ParticipantsMuted(ParticipantsMuted { participants, .. }) => {
211 tracing::debug!(
212 "Following participants were muted by the {} module: {participants:?}",
213 Self::NAMESPACE
214 );
215 }
216 AutomodLoopback::ChannelDropped => {
217 ctx.send_ws_message(
218 ctx.participants.in_room(ctx.room).connected().ids(),
219 AutomodEvent::Error(AutomodError::Internal),
220 )?;
221 }
222 }
223
224 Ok(())
225 }
226
227 fn on_breakout_switch(
228 &mut self,
229 ctx: &mut ModuleContext<'_, Self>,
230 participant_id: ParticipantId,
231 old_room: RoomKind,
232 new_room: RoomKind,
233 ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
234 self.remove_participant(ctx, old_room, participant_id)?;
236
237 let Some(state) = self.join_room(ctx, new_room, participant_id)? else {
238 return Ok(ModuleSwitchData::<Self>::default());
240 };
241
242 let switch_success = ctx
243 .participant_state(participant_id)
244 .with_context(|| format!("Missing state for participant {participant_id}"))?
245 .connections()
246 .map(|con| (con, Some(state.clone())))
247 .collect();
248
249 Ok(ModuleSwitchData {
250 switch_success,
251 ..Default::default()
252 })
253 }
254
255 fn on_breakout_closed(
256 &mut self,
257 _ctx: &mut ModuleContext<'_, Self>,
258 ) -> Result<(), SignalingModuleError<Self::Error>> {
259 self.sessions.retain(|room, _| *room == RoomKind::Main);
261 Ok(())
262 }
263}
264
265impl AutomodModule {
266 #[tracing::instrument(skip(self, ctx), level = "debug")]
267 fn start(
268 &mut self,
269 ctx: &mut ModuleContext<'_, Self>,
270 sender: ParticipantId,
271 parameter: Parameter,
272 allow_list: Option<Vec<ParticipantId>>,
273 playlist: Option<Vec<ParticipantId>>,
274 ) -> Result<(), SignalingModuleError<AutomodError>> {
275 if !ctx.is_moderator(sender) {
276 return Err(AutomodError::InsufficientPermissions.into());
277 }
278
279 let remaining = Self::resolve_valid_speaker_list(
280 ctx,
281 parameter.selection_strategy,
282 allow_list,
283 playlist,
284 )
285 .ok_or(SignalingModuleError::Module(AutomodError::InvalidSelection))?;
286
287 match self.sessions.entry(ctx.room) {
288 Occupied(..) => return Err(AutomodError::SessionAlreadyRunning.into()),
289 Vacant(vacant) => {
290 tracing::debug!("Starting automod session in room: {:?}", ctx.room);
291 vacant.insert(Session::new(sender, parameter.clone(), remaining.clone()))
292 }
293 };
294
295 ctx.send_ws_message(
296 ctx.participants.in_room(ctx.room).connected().ids(),
297 AutomodEvent::Started(
298 FrontendConfig {
299 parameter,
300 history: Vec::new(),
301 remaining,
302 issued_by: sender,
303 }
304 .into_public(),
305 ),
306 )?;
307
308 let (tx, rx) = oneshot::channel();
309 ctx.send_internal_command::<LiveKitModule>(LiveKitInternal::Mute {
310 sender: None,
311 participants: ctx.participants.in_room(ctx.room).ids().collect(),
312 return_channel: tx,
313 });
314
315 ctx.recv_loopback(rx, AutomodLoopback::ParticipantsMuted);
316
317 Ok(())
318 }
319
320 #[tracing::instrument(skip(self, ctx), level = "debug")]
321 fn edit(
322 &mut self,
323 ctx: &mut ModuleContext<'_, Self>,
324 sender: ParticipantId,
325 allow_list: Option<Vec<ParticipantId>>,
326 playlist: Option<Vec<ParticipantId>>,
327 ) -> Result<(), SignalingModuleError<AutomodError>> {
328 if !ctx.is_moderator(sender) {
329 return Err(AutomodError::InsufficientPermissions.into());
330 }
331
332 let Some(session) = self.sessions.get_mut(&ctx.room) else {
334 return Err(AutomodError::SessionNotRunning.into());
335 };
336
337 let remaining = Self::resolve_valid_speaker_list(
338 ctx,
339 session.parameter.selection_strategy,
340 allow_list,
341 playlist,
342 )
343 .ok_or(SignalingModuleError::Module(AutomodError::InvalidEdit))?;
344 session.remaining.clone_from(&remaining);
345
346 ctx.send_ws_message(
347 ctx.participants.in_room(ctx.room).connected().ids(),
348 AutomodEvent::RemainingUpdated { remaining },
349 )?;
350
351 Ok(())
352 }
353
354 #[tracing::instrument(skip(self, ctx), level = "debug")]
355 fn stop(
356 &mut self,
357 ctx: &mut ModuleContext<'_, Self>,
358 sender: ParticipantId,
359 ) -> Result<(), SignalingModuleError<AutomodError>> {
360 if !ctx.is_moderator(sender) {
361 return Err(AutomodError::InsufficientPermissions.into());
362 }
363
364 self.stop_session(ctx, StoppedReason::StoppedByModerator { issued_by: sender })?;
365
366 Ok(())
367 }
368
369 #[tracing::instrument(skip(self, ctx), level = "debug")]
370 fn stop_session(
371 &mut self,
372 ctx: &mut ModuleContext<'_, Self>,
373 reason: StoppedReason,
374 ) -> Result<(), FatalError> {
375 self.sessions.remove(&ctx.room);
376
377 ctx.send_ws_message(
378 ctx.participants.in_room(ctx.room).connected().ids(),
379 AutomodEvent::Stopped(reason),
380 )
381 }
382
383 #[tracing::instrument(skip(self, ctx), level = "debug")]
384 fn select(
385 &mut self,
386 ctx: &mut ModuleContext<'_, Self>,
387 sender: ParticipantId,
388 select: Select,
389 ) -> Result<(), SignalingModuleError<AutomodError>> {
390 if !ctx.is_moderator(sender) {
391 return Err(AutomodError::InsufficientPermissions.into());
392 }
393
394 match select {
395 Select::None => self.select_none(ctx),
396 Select::Random => self.select_random(ctx),
397 Select::Next => self.select_next(ctx),
398 Select::Specific {
399 participant,
400 keep_in_remaining,
401 } => self.select_specific(ctx, participant, keep_in_remaining),
402 }
403 }
404
405 #[tracing::instrument(skip(self, ctx), level = "debug")]
406 fn select_none(
407 &mut self,
408 ctx: &mut ModuleContext<'_, Self>,
409 ) -> Result<(), SignalingModuleError<AutomodError>> {
410 let session = self
411 .sessions
412 .get_mut(&ctx.room)
413 .ok_or(AutomodError::SessionNotRunning)?;
414 let previous_speaker = session.speaker;
415
416 let update = speaker_selection::select_unchecked(session, None);
417 let time_limit = session.parameter.time_limit;
418 self.handle_speaker_update(
419 ctx,
420 SpeakerSelectionOutput::ContinueWith { update },
421 time_limit,
422 previous_speaker,
423 )?;
424
425 Ok(())
426 }
427
428 #[tracing::instrument(skip(self, ctx), level = "debug")]
429 fn select_random(
430 &mut self,
431 ctx: &mut ModuleContext<'_, Self>,
432 ) -> Result<(), SignalingModuleError<AutomodError>> {
433 let session = self
434 .sessions
435 .get_mut(&ctx.room)
436 .ok_or(AutomodError::SessionNotRunning)?;
437 let previous_speaker = session.speaker;
438
439 let update = speaker_selection::select_random(session, &mut rand::rng());
440 let time_limit = session.parameter.time_limit;
441 self.handle_speaker_update(ctx, update, time_limit, previous_speaker)?;
442
443 Ok(())
444 }
445
446 #[tracing::instrument(skip(self, ctx), level = "debug")]
447 fn select_next(
448 &mut self,
449 ctx: &mut ModuleContext<'_, Self>,
450 ) -> Result<(), SignalingModuleError<AutomodError>> {
451 let session = self
452 .sessions
453 .get_mut(&ctx.room)
454 .ok_or(AutomodError::SessionNotRunning)?;
455
456 let valid = match session.parameter.selection_strategy {
457 SelectionStrategy::None | SelectionStrategy::Nomination => false,
458 SelectionStrategy::Playlist | SelectionStrategy::Random => true,
459 };
460
461 if !valid {
462 return Err(AutomodError::InvalidSelection.into());
463 }
464
465 let previous_speaker = session.speaker;
466
467 let update = speaker_selection::select_next(session, None, &mut rand::rng())?;
468 let time_limit = session.parameter.time_limit;
469 self.handle_speaker_update(ctx, update, time_limit, previous_speaker)?;
470
471 Ok(())
472 }
473
474 #[tracing::instrument(skip(self, ctx), level = "debug")]
475 fn select_specific(
476 &mut self,
477 ctx: &mut ModuleContext<'_, Self>,
478 participant: ParticipantId,
479 keep_in_remaining: bool,
480 ) -> Result<(), SignalingModuleError<AutomodError>> {
481 let session = self
482 .sessions
483 .get_mut(&ctx.room)
484 .ok_or(AutomodError::SessionNotRunning)?;
485
486 if !ctx
487 .participants
488 .in_room(ctx.room)
489 .connected()
490 .contains(&participant)
491 {
492 return Err(AutomodError::InvalidSelection.into());
493 }
494
495 let previous_speaker = session.speaker;
496 let output =
497 speaker_selection::select_specific(session, Some(participant), keep_in_remaining)?;
498
499 let time_limit = session.parameter.time_limit;
500 self.handle_speaker_update(ctx, output, time_limit, previous_speaker)?;
501
502 Ok(())
503 }
504
505 #[tracing::instrument(skip(self, ctx), level = "debug")]
506 fn handle_speaker_update(
507 &mut self,
508 ctx: &mut ModuleContext<'_, Self>,
509 output: SpeakerSelectionOutput,
510 time_limit: Option<Duration>,
511 previous_speaker: Option<ParticipantId>,
512 ) -> Result<(), FatalError> {
513 let (tx, rx) = oneshot::channel();
515 ctx.send_internal_command::<LiveKitModule>(LiveKitInternal::Mute {
516 sender: None,
517 participants: ctx
518 .participants
519 .in_room(ctx.room)
520 .connected()
521 .ids()
522 .collect(),
523 return_channel: tx,
524 });
525 ctx.recv_loopback(rx, AutomodLoopback::ParticipantsMuted);
526
527 let update = match output {
528 SpeakerSelectionOutput::ContinueWith { update } => update,
529 SpeakerSelectionOutput::End => {
530 return self.stop_session(ctx, StoppedReason::SessionFinished);
531 }
532 };
533
534 if let Some(SpeakerUpdate {
535 speaker,
536 history,
537 remaining,
538 }) = update
539 {
540 if let Some(time_limit) = time_limit {
541 ctx.loopback_after(time_limit, move || {
542 AutomodLoopback::SpeakerTimeLimitReached { speaker }
543 });
544 }
545
546 ctx.send_ws_message(
547 ctx.participants.in_room(ctx.room).connected().ids(),
548 AutomodEvent::SpeakerUpdated {
549 speaker,
550 history,
551 remaining,
552 },
553 )?;
554 } else {
555 let session = self.sessions.get(&ctx.room).with_context(|| {
556 format!(
557 "Trying to handle speaker update in room '{:?}' without a running automod session",
558 ctx.room
559 )
560 }).map_err(FatalError)?;
561
562 ctx.send_ws_message(
563 ctx.participants.in_room(ctx.room).connected().ids(),
564 AutomodEvent::SpeakerUpdated {
565 speaker: session.speaker,
566 history: Some(session.participant_history().collect()),
567 remaining: Some(session.remaining.clone()),
568 },
569 )?;
570 }
571
572 Ok(())
573 }
574
575 #[tracing::instrument(skip(self, ctx), level = "debug")]
576 fn yield_next(
577 &mut self,
578 ctx: &mut ModuleContext<'_, Self>,
579 sender: ParticipantId,
580 next: Option<ParticipantId>,
581 ) -> Result<(), SignalingModuleError<AutomodError>> {
582 let session = self
583 .sessions
584 .get_mut(&ctx.room)
585 .ok_or(AutomodError::SessionNotRunning)?;
586
587 if session.speaker != Some(sender) {
588 return Err(AutomodError::InsufficientPermissions.into());
589 }
590
591 let valid = match session.parameter.selection_strategy {
592 SelectionStrategy::None => false,
593 SelectionStrategy::Playlist | SelectionStrategy::Random => next.is_none(),
594 SelectionStrategy::Nomination => next.is_some(),
595 };
596
597 if !valid {
598 return Err(AutomodError::InvalidSelection.into());
599 }
600
601 let previous_speaker = session.speaker;
602
603 let output = speaker_selection::select_next(session, next, &mut rand::rng())?;
604 let time_limit = session.parameter.time_limit;
605 self.handle_speaker_update(ctx, output, time_limit, previous_speaker)?;
606
607 Ok(())
608 }
609
610 #[tracing::instrument(skip(self, ctx), level = "debug")]
611 fn join_room(
612 &mut self,
613 ctx: &mut ModuleContext<'_, Self>,
614 room: RoomKind,
615 participant_id: ParticipantId,
616 ) -> Result<Option<AutomodState>, FatalError> {
617 let session = self.sessions.get_mut(&room);
618 let Some(session) = session else {
619 return Ok(None);
621 };
622
623 let history: Vec<ParticipantId> = session.participant_history().collect();
624
625 if session.parameter.auto_append_on_join && !history.contains(&participant_id) {
626 session.remaining.push(participant_id);
628 ctx.send_ws_message(
629 ctx.participants
630 .in_room(room)
631 .connected()
632 .iter()
633 .filter_map(
634 |(&id, _)| {
635 if id == participant_id { None } else { Some(id) }
636 },
637 ),
638 AutomodEvent::RemainingUpdated {
639 remaining: session.remaining.clone(),
640 },
641 )?;
642 }
643
644 Ok(Some(AutomodState {
645 config: FrontendConfig {
646 parameter: session.parameter.clone(),
647 history,
648 remaining: session.remaining.clone(),
649 issued_by: session.issued_by,
650 }
651 .into_public(),
652 speaker: session.speaker,
653 }))
654 }
655
656 #[tracing::instrument(skip(self, ctx), level = "debug")]
658 fn remove_participant(
659 &mut self,
660 ctx: &mut ModuleContext<'_, Self>,
661 room: RoomKind,
662 participant_id: ParticipantId,
663 ) -> Result<(), SignalingModuleError<AutomodError>> {
664 let session = self.sessions.get_mut(&room);
665 let Some(session) = session else {
666 return Ok(());
668 };
669
670 let index = session
671 .remaining
672 .iter()
673 .position(|id| *id == participant_id);
674 if let Some(index) = index {
675 session.remaining.remove(index);
676 ctx.send_ws_message(
677 ctx.participants.in_room(room).connected().ids(),
678 AutomodEvent::RemainingUpdated {
679 remaining: session.remaining.clone(),
680 },
681 )?;
682 }
683
684 if session.speaker == Some(participant_id) {
685 self.select_next(ctx)?;
686 }
687
688 Ok(())
689 }
690
691 #[tracing::instrument(skip(self, ctx), level = "debug")]
692 fn on_speaker_time_limit_reached(
693 &mut self,
694 ctx: &mut ModuleContext<'_, Self>,
695 speaker: Option<ParticipantId>,
696 ) -> Result<(), SignalingModuleError<AutomodError>> {
697 let Some(session) = self.sessions.get(&ctx.room) else {
698 return Ok(());
700 };
701 if session.speaker != speaker {
702 return Ok(());
704 }
705
706 match session.parameter.selection_strategy {
707 SelectionStrategy::None | SelectionStrategy::Nomination => self.select_none(ctx)?,
710 _ => self.select_next(ctx)?,
711 }
712
713 Ok(())
714 }
715
716 fn resolve_valid_speaker_list(
719 ctx: &mut ModuleContext<'_, Self>,
720 selection_strategy: SelectionStrategy,
721 allow_list: Option<Vec<ParticipantId>>,
722 playlist: Option<Vec<ParticipantId>>,
723 ) -> Option<Vec<ParticipantId>> {
724 let list = if selection_strategy.uses_allow_list() {
725 allow_list
726 } else {
727 playlist
728 };
729
730 let list = match list {
731 Some(list) if list.is_empty() => return None,
732 Some(list) => list,
733 None => return None,
734 };
735
736 let connected_participants: Vec<ParticipantId> = ctx
737 .participants
738 .in_room(ctx.room)
739 .connected()
740 .ids()
741 .collect();
742 if !list
743 .iter()
744 .all(|participant| connected_participants.contains(participant))
745 {
746 return None;
747 }
748
749 Some(list)
750 }
751}