Skip to main content

opentalk_roomserver_module_timer/
lib.rs

1// SPDX-License-Identifier: EUPL-1.2
2//
3// SPDX-FileCopyrightText: OpenTalk Team <mail@opentalk.eu>
4
5use std::{
6    collections::{BTreeSet, HashMap},
7    time::Duration,
8};
9
10use anyhow::Context;
11use opentalk_roomserver_signaling::{
12    module_context::{ChannelDroppedError, ModuleContext},
13    signaling_module::{
14        ModuleJoinData, ModuleSwitchData, NoOp, PeerDataMap, SignalingModule,
15        SignalingModuleDescription, SignalingModuleFeatureDescription, SignalingModuleInitData,
16    },
17};
18use opentalk_roomserver_types::{
19    breakout::BreakoutRoom, connection_id::ConnectionId, room_kind::RoomKind,
20    signaling::module_error::SignalingModuleError,
21};
22use opentalk_roomserver_types_timer::{
23    Kind, StopKind, TIMER_MODULE_ID, TimerCommand, TimerConfig, TimerError, command,
24    event::{Stopped, TimerEvent},
25    peer_state::TimerPeerState,
26    state::TimerState,
27};
28use opentalk_types_common::{modules::ModuleId, time::Timestamp};
29use opentalk_types_signaling::ParticipantId;
30
31use crate::timer::Timer;
32
33mod timer;
34
35#[derive(Debug)]
36pub enum TimerLoopback {
37    Stopped(Stopped),
38    ChannelDropped,
39}
40
41impl From<ChannelDroppedError> for TimerLoopback {
42    fn from(_: ChannelDroppedError) -> Self {
43        Self::ChannelDropped
44    }
45}
46
47#[derive(Debug)]
48pub struct TimerModule {
49    timers: HashMap<RoomKind, Option<Timer>>,
50    ready_participants: HashMap<RoomKind, BTreeSet<ParticipantId>>,
51}
52
53impl SignalingModuleDescription for TimerModule {
54    const MODULE_ID: ModuleId = TIMER_MODULE_ID;
55    const DESCRIPTION: &'static str =
56        "Handles timer functionality including the coffee-break timer.";
57    const FEATURES: &[SignalingModuleFeatureDescription] = &[];
58}
59
60impl SignalingModule for TimerModule {
61    const NAMESPACE: ModuleId = TIMER_MODULE_ID;
62
63    type Incoming = TimerCommand;
64
65    type Outgoing = TimerEvent;
66
67    type Internal = NoOp;
68
69    type Loopback = TimerLoopback;
70
71    type JoinInfo = TimerState;
72
73    type PeerJoinInfo = TimerPeerState;
74
75    type Error = TimerError;
76
77    fn init(_init_data: SignalingModuleInitData) -> Option<Self> {
78        Some(Self {
79            timers: HashMap::from([(RoomKind::Main, None)]),
80            ready_participants: HashMap::from([(RoomKind::Main, BTreeSet::new())]),
81        })
82    }
83
84    fn on_participant_joined(
85        &mut self,
86        ctx: &mut ModuleContext<'_, Self>,
87        participant_id: ParticipantId,
88        _connection_id: ConnectionId,
89        _is_first_connection: bool,
90    ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
91        let timer = self
92            .timers
93            .get(&ctx.room)
94            .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
95
96        // Do not add JoinSuccess or PeerJoinInfo when there is no running timer
97        let Some(timer) = timer else {
98            return Ok(ModuleJoinData {
99                join_success: None,
100                peer_events: PeerDataMap::default(),
101                peer_data: PeerDataMap::default(),
102            });
103        };
104
105        if timer.config.ready_check_enabled {
106            // Joining participants might already be ready when reconnecting
107            let ready_status = self
108                .ready_participants
109                .get(&ctx.room)
110                .with_context(|| {
111                    format!(
112                        "Room '{:?}' does not exist in participant ready state",
113                        ctx.room
114                    )
115                })?
116                .contains(&participant_id);
117
118            // Append ready state when the running timer has ready check enabled
119            let mut peer = PeerDataMap::default();
120            peer.insert_for_all(ctx, TimerPeerState { ready_status })?;
121
122            // Collect ready state of all other participants for the joined participant
123            let mut participant_states = PeerDataMap::default();
124            for p in ctx.participants.connected().ids() {
125                let ready_status = self
126                    .ready_participants
127                    .get(&ctx.room)
128                    .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?
129                    .contains(&p);
130                participant_states.insert(p, TimerPeerState { ready_status })?;
131            }
132            Ok(ModuleJoinData {
133                join_success: Some(TimerState {
134                    config: timer.config.clone(),
135                    ready_status: Some(ready_status),
136                }),
137                peer_data: participant_states,
138                peer_events: peer,
139            })
140        } else {
141            Ok(ModuleJoinData {
142                join_success: Some(TimerState {
143                    config: timer.config.clone(),
144                    ready_status: None,
145                }),
146                peer_data: PeerDataMap::default(),
147                peer_events: PeerDataMap::default(),
148            })
149        }
150    }
151
152    #[allow(unused_variables)]
153    fn on_participant_disconnected(
154        &mut self,
155        ctx: &mut ModuleContext<'_, Self>,
156        participant_id: ParticipantId,
157        connection_id: ConnectionId,
158    ) -> Result<(), SignalingModuleError<Self::Error>> {
159        Ok(())
160    }
161
162    fn on_breakout_start(
163        &mut self,
164        _ctx: &mut ModuleContext<'_, Self>,
165        rooms: &[BreakoutRoom],
166        _duration: Option<Duration>,
167    ) -> Result<(), SignalingModuleError<Self::Error>> {
168        for room in rooms {
169            let room = RoomKind::Breakout(room.id);
170            self.timers.insert(room, None);
171            self.ready_participants.insert(room, BTreeSet::new());
172        }
173        Ok(())
174    }
175
176    fn on_breakout_switch(
177        &mut self,
178        ctx: &mut ModuleContext<'_, Self>,
179        participant_id: ParticipantId,
180        _old_room: RoomKind,
181        new_room: RoomKind,
182    ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
183        let timer = self
184            .timers
185            .get(&new_room)
186            .with_context(|| format!("Room '{new_room:?}' does not exist in timers"))?;
187
188        // Timer is disabled, send empty JoinInfo
189        let Some(timer) = timer else {
190            return Ok(ModuleSwitchData::<Self>::new());
191        };
192
193        let ready_status = if timer.config.ready_check_enabled {
194            // Joining participants might already be ready when they were in the
195            // room before
196            Some(
197                self.ready_participants
198                    .get(&new_room)
199                    .with_context(|| {
200                        format!("Room '{new_room:?}' does not exist in participant ready state")
201                    })?
202                    .contains(&participant_id),
203            )
204        } else {
205            None
206        };
207        let timer_state = Some(TimerState {
208            config: timer.config.clone(),
209            ready_status,
210        });
211
212        let connections = ctx
213            .participant_state(participant_id)
214            .with_context(|| format!("Participant '{participant_id}' does not have state"))?
215            .connections();
216        let switch_success = connections.map(|con| (con, timer_state.clone())).collect();
217
218        Ok(ModuleSwitchData {
219            switch_success,
220            ..Default::default()
221        })
222    }
223
224    fn on_breakout_closed(
225        &mut self,
226        _ctx: &mut ModuleContext<'_, Self>,
227    ) -> Result<(), SignalingModuleError<Self::Error>> {
228        self.ready_participants
229            .retain(|room, _| *room == RoomKind::Main);
230        self.timers.retain(|room, _| *room == RoomKind::Main);
231        Ok(())
232    }
233
234    fn on_websocket_message(
235        &mut self,
236        ctx: &mut ModuleContext<'_, Self>,
237        sender: ParticipantId,
238        _connection_id: ConnectionId,
239        payload: Self::Incoming,
240    ) -> Result<(), SignalingModuleError<Self::Error>> {
241        match payload {
242            TimerCommand::Start {
243                kind,
244                style,
245                title,
246                enable_ready_check,
247            } => self.start_timer(ctx, sender, kind, style, title, enable_ready_check)?,
248            TimerCommand::Stop { reason } => self.stop_timer(ctx, sender, reason)?,
249            TimerCommand::UpdateReadyStatus { status } => {
250                self.update_ready_status(ctx, sender, status)?;
251            }
252        }
253        Ok(())
254    }
255
256    fn on_loopback_event(
257        &mut self,
258        ctx: &mut ModuleContext<'_, Self>,
259        event: Self::Loopback,
260    ) -> Result<(), SignalingModuleError<Self::Error>> {
261        self.remove_timer(ctx.room)?;
262
263        match event {
264            TimerLoopback::Stopped(stopped) => {
265                ctx.send_ws_message(
266                    ctx.participants.in_room(ctx.room).ids(),
267                    TimerEvent::Stopped(stopped),
268                )?;
269            }
270            TimerLoopback::ChannelDropped => {
271                ctx.send_ws_message(
272                    ctx.participants.in_room(ctx.room).ids(),
273                    TimerEvent::Error(TimerError::Internal),
274                )?;
275            }
276        }
277        Ok(())
278    }
279}
280
281impl TimerModule {
282    fn start_timer(
283        &mut self,
284        ctx: &mut ModuleContext<'_, Self>,
285        sender: ParticipantId,
286        kind: command::Kind,
287        style: Option<String>,
288        title: Option<String>,
289        ready_check_enabled: bool,
290    ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
291        if !ctx.is_moderator(sender) {
292            return Err(TimerError::InsufficientPermissions.into());
293        }
294
295        let timer = self
296            .timers
297            .get_mut(&ctx.room)
298            .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
299        if timer.is_some() {
300            return Err(TimerError::TimerAlreadyRunning.into());
301        }
302
303        let started_at = ctx.timestamp;
304        let mut tx_cancel = None;
305        let kind = match kind {
306            command::Kind::Stopwatch => Kind::Stopwatch,
307            command::Kind::Countdown { duration } => {
308                let ends_at = started_at
309                    .checked_add_signed(chrono::Duration::seconds(duration.into()))
310                    .ok_or(TimerError::InvalidDuration)?;
311
312                let tx = ctx.loopback_after(Duration::from_secs(duration as u64), || {
313                    TimerLoopback::Stopped(Stopped {
314                        kind: StopKind::Expired,
315                        reason: None,
316                    })
317                });
318                tx_cancel = Some(tx);
319
320                Kind::Countdown {
321                    ends_at: Timestamp::from(ends_at),
322                }
323            }
324        };
325
326        *timer = Some(Timer {
327            config: TimerConfig {
328                started_at,
329                kind,
330                style: style.clone(),
331                title: title.clone(),
332                ready_check_enabled,
333            },
334            tx_cancel,
335        });
336
337        ctx.send_ws_message(
338            ctx.participants.filter().room(ctx.room).ids(),
339            TimerEvent::Started {
340                config: TimerConfig {
341                    started_at,
342                    kind,
343                    style,
344                    title,
345                    ready_check_enabled,
346                },
347            },
348        )?;
349        Ok(())
350    }
351
352    fn stop_timer(
353        &mut self,
354        ctx: &mut ModuleContext<'_, Self>,
355        sender: ParticipantId,
356        reason: Option<String>,
357    ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
358        if !ctx.is_moderator(sender) {
359            return Err(TimerError::InsufficientPermissions.into());
360        }
361
362        if let Some(mut timer) = self.remove_timer(ctx.room)? {
363            let stopped = Stopped {
364                kind: StopKind::ByModerator(sender),
365                reason,
366            };
367            if let Some(tx) = timer.tx_cancel.take() {
368                if tx.send(TimerLoopback::Stopped(stopped)).is_err() {
369                    tracing::debug!("Timer cancel sender has been dropped");
370                }
371            } else {
372                // If there is no cancel sender, this means the timer does not use a
373                // loopback task (e.g. stopwatch). In this case we can simply notify
374                // the participants that the timer was cancelled.
375                ctx.send_ws_message(
376                    ctx.participants.filter().room(ctx.room).ids(),
377                    TimerEvent::Stopped(stopped),
378                )?;
379            }
380        }
381
382        Ok(())
383    }
384
385    /// Removes the timer and the associated ready state
386    fn remove_timer(
387        &mut self,
388        room: RoomKind,
389    ) -> Result<Option<Timer>, SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
390        let timer = self
391            .timers
392            .get_mut(&room)
393            .with_context(|| format!("Room '{room:?}' does not exist in timers"))?;
394        let ready_states = self
395            .ready_participants
396            .get_mut(&room)
397            .with_context(|| format!("Room '{room:?}' does not exist in ready states",))?;
398        if let Some(timer) = timer.take() {
399            ready_states.clear();
400            return Ok(Some(timer));
401        }
402        Ok(None)
403    }
404
405    fn update_ready_status(
406        &mut self,
407        ctx: &mut ModuleContext<'_, Self>,
408        sender: ParticipantId,
409        ready: bool,
410    ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
411        let timer = self
412            .timers
413            .get_mut(&ctx.room)
414            .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
415
416        let Some(timer) = timer else {
417            return Err(TimerError::TimerNotRunning)?;
418        };
419
420        if !timer.config.ready_check_enabled {
421            return Err(TimerError::ReadyCheckNotEnabled)?;
422        }
423
424        let ready_participants = self
425            .ready_participants
426            .get_mut(&ctx.room)
427            .with_context(|| {
428                format!(
429                    "Room '{:?}' does not exist in participant ready state",
430                    ctx.room
431                )
432            })?;
433
434        let changed = if ready {
435            ready_participants.insert(sender)
436        } else {
437            ready_participants.remove(&sender)
438        };
439        if changed {
440            ctx.send_ws_message(
441                ctx.participants.filter().room(ctx.room).ids(),
442                TimerEvent::UpdatedReadyStatus {
443                    participant_id: sender,
444                    status: ready,
445                },
446            )?;
447        }
448
449        Ok(())
450    }
451}