1use std::{
6 collections::{BTreeSet, HashMap},
7 time::Duration,
8};
9
10use anyhow::Context;
11use opentalk_roomserver_signaling::{
12 module_context::{ChannelDroppedError, ModuleContext},
13 signaling_module::{
14 ModuleJoinData, ModuleSwitchData, NoOp, PeerDataMap, SignalingModule,
15 SignalingModuleDescription, SignalingModuleFeatureDescription, SignalingModuleInitData,
16 },
17};
18use opentalk_roomserver_types::{
19 breakout::BreakoutRoom, connection_id::ConnectionId, room_kind::RoomKind,
20 signaling::module_error::SignalingModuleError,
21};
22use opentalk_roomserver_types_timer::{
23 Kind, StopKind, TIMER_MODULE_ID, TimerCommand, TimerConfig, TimerError, command,
24 event::{Stopped, TimerEvent},
25 peer_state::TimerPeerState,
26 state::TimerState,
27};
28use opentalk_types_common::{modules::ModuleId, time::Timestamp};
29use opentalk_types_signaling::ParticipantId;
30
31use crate::timer::Timer;
32
33mod timer;
34
35#[derive(Debug)]
36pub enum TimerLoopback {
37 Stopped(Stopped),
38 ChannelDropped,
39}
40
41impl From<ChannelDroppedError> for TimerLoopback {
42 fn from(_: ChannelDroppedError) -> Self {
43 Self::ChannelDropped
44 }
45}
46
47#[derive(Debug)]
48pub struct TimerModule {
49 timers: HashMap<RoomKind, Option<Timer>>,
50 ready_participants: HashMap<RoomKind, BTreeSet<ParticipantId>>,
51}
52
53impl SignalingModuleDescription for TimerModule {
54 const MODULE_ID: ModuleId = TIMER_MODULE_ID;
55 const DESCRIPTION: &'static str =
56 "Handles timer functionality including the coffee-break timer.";
57 const FEATURES: &[SignalingModuleFeatureDescription] = &[];
58}
59
60impl SignalingModule for TimerModule {
61 const NAMESPACE: ModuleId = TIMER_MODULE_ID;
62
63 type Incoming = TimerCommand;
64
65 type Outgoing = TimerEvent;
66
67 type Internal = NoOp;
68
69 type Loopback = TimerLoopback;
70
71 type JoinInfo = TimerState;
72
73 type PeerJoinInfo = TimerPeerState;
74
75 type Error = TimerError;
76
77 fn init(_init_data: SignalingModuleInitData) -> Option<Self> {
78 Some(Self {
79 timers: HashMap::from([(RoomKind::Main, None)]),
80 ready_participants: HashMap::from([(RoomKind::Main, BTreeSet::new())]),
81 })
82 }
83
84 fn on_participant_joined(
85 &mut self,
86 ctx: &mut ModuleContext<'_, Self>,
87 participant_id: ParticipantId,
88 _connection_id: ConnectionId,
89 _is_first_connection: bool,
90 ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
91 let timer = self
92 .timers
93 .get(&ctx.room)
94 .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
95
96 let Some(timer) = timer else {
98 return Ok(ModuleJoinData {
99 join_success: None,
100 peer_events: PeerDataMap::default(),
101 peer_data: PeerDataMap::default(),
102 });
103 };
104
105 if timer.config.ready_check_enabled {
106 let ready_status = self
108 .ready_participants
109 .get(&ctx.room)
110 .with_context(|| {
111 format!(
112 "Room '{:?}' does not exist in participant ready state",
113 ctx.room
114 )
115 })?
116 .contains(&participant_id);
117
118 let mut peer = PeerDataMap::default();
120 peer.insert_for_all(ctx, TimerPeerState { ready_status })?;
121
122 let mut participant_states = PeerDataMap::default();
124 for p in ctx.participants.connected().ids() {
125 let ready_status = self
126 .ready_participants
127 .get(&ctx.room)
128 .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?
129 .contains(&p);
130 participant_states.insert(p, TimerPeerState { ready_status })?;
131 }
132 Ok(ModuleJoinData {
133 join_success: Some(TimerState {
134 config: timer.config.clone(),
135 ready_status: Some(ready_status),
136 }),
137 peer_data: participant_states,
138 peer_events: peer,
139 })
140 } else {
141 Ok(ModuleJoinData {
142 join_success: Some(TimerState {
143 config: timer.config.clone(),
144 ready_status: None,
145 }),
146 peer_data: PeerDataMap::default(),
147 peer_events: PeerDataMap::default(),
148 })
149 }
150 }
151
152 #[allow(unused_variables)]
153 fn on_participant_disconnected(
154 &mut self,
155 ctx: &mut ModuleContext<'_, Self>,
156 participant_id: ParticipantId,
157 connection_id: ConnectionId,
158 ) -> Result<(), SignalingModuleError<Self::Error>> {
159 Ok(())
160 }
161
162 fn on_breakout_start(
163 &mut self,
164 _ctx: &mut ModuleContext<'_, Self>,
165 rooms: &[BreakoutRoom],
166 _duration: Option<Duration>,
167 ) -> Result<(), SignalingModuleError<Self::Error>> {
168 for room in rooms {
169 let room = RoomKind::Breakout(room.id);
170 self.timers.insert(room, None);
171 self.ready_participants.insert(room, BTreeSet::new());
172 }
173 Ok(())
174 }
175
176 fn on_breakout_switch(
177 &mut self,
178 ctx: &mut ModuleContext<'_, Self>,
179 participant_id: ParticipantId,
180 _old_room: RoomKind,
181 new_room: RoomKind,
182 ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
183 let timer = self
184 .timers
185 .get(&new_room)
186 .with_context(|| format!("Room '{new_room:?}' does not exist in timers"))?;
187
188 let Some(timer) = timer else {
190 return Ok(ModuleSwitchData::<Self>::new());
191 };
192
193 let ready_status = if timer.config.ready_check_enabled {
194 Some(
197 self.ready_participants
198 .get(&new_room)
199 .with_context(|| {
200 format!("Room '{new_room:?}' does not exist in participant ready state")
201 })?
202 .contains(&participant_id),
203 )
204 } else {
205 None
206 };
207 let timer_state = Some(TimerState {
208 config: timer.config.clone(),
209 ready_status,
210 });
211
212 let connections = ctx
213 .participant_state(participant_id)
214 .with_context(|| format!("Participant '{participant_id}' does not have state"))?
215 .connections();
216 let switch_success = connections.map(|con| (con, timer_state.clone())).collect();
217
218 Ok(ModuleSwitchData {
219 switch_success,
220 ..Default::default()
221 })
222 }
223
224 fn on_breakout_closed(
225 &mut self,
226 _ctx: &mut ModuleContext<'_, Self>,
227 ) -> Result<(), SignalingModuleError<Self::Error>> {
228 self.ready_participants
229 .retain(|room, _| *room == RoomKind::Main);
230 self.timers.retain(|room, _| *room == RoomKind::Main);
231 Ok(())
232 }
233
234 fn on_websocket_message(
235 &mut self,
236 ctx: &mut ModuleContext<'_, Self>,
237 sender: ParticipantId,
238 _connection_id: ConnectionId,
239 payload: Self::Incoming,
240 ) -> Result<(), SignalingModuleError<Self::Error>> {
241 match payload {
242 TimerCommand::Start {
243 kind,
244 style,
245 title,
246 enable_ready_check,
247 } => self.start_timer(ctx, sender, kind, style, title, enable_ready_check)?,
248 TimerCommand::Stop { reason } => self.stop_timer(ctx, sender, reason)?,
249 TimerCommand::UpdateReadyStatus { status } => {
250 self.update_ready_status(ctx, sender, status)?;
251 }
252 }
253 Ok(())
254 }
255
256 fn on_loopback_event(
257 &mut self,
258 ctx: &mut ModuleContext<'_, Self>,
259 event: Self::Loopback,
260 ) -> Result<(), SignalingModuleError<Self::Error>> {
261 self.remove_timer(ctx.room)?;
262
263 match event {
264 TimerLoopback::Stopped(stopped) => {
265 ctx.send_ws_message(
266 ctx.participants.in_room(ctx.room).ids(),
267 TimerEvent::Stopped(stopped),
268 )?;
269 }
270 TimerLoopback::ChannelDropped => {
271 ctx.send_ws_message(
272 ctx.participants.in_room(ctx.room).ids(),
273 TimerEvent::Error(TimerError::Internal),
274 )?;
275 }
276 }
277 Ok(())
278 }
279}
280
281impl TimerModule {
282 fn start_timer(
283 &mut self,
284 ctx: &mut ModuleContext<'_, Self>,
285 sender: ParticipantId,
286 kind: command::Kind,
287 style: Option<String>,
288 title: Option<String>,
289 ready_check_enabled: bool,
290 ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
291 if !ctx.is_moderator(sender) {
292 return Err(TimerError::InsufficientPermissions.into());
293 }
294
295 let timer = self
296 .timers
297 .get_mut(&ctx.room)
298 .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
299 if timer.is_some() {
300 return Err(TimerError::TimerAlreadyRunning.into());
301 }
302
303 let started_at = ctx.timestamp;
304 let mut tx_cancel = None;
305 let kind = match kind {
306 command::Kind::Stopwatch => Kind::Stopwatch,
307 command::Kind::Countdown { duration } => {
308 let ends_at = started_at
309 .checked_add_signed(chrono::Duration::seconds(duration.into()))
310 .ok_or(TimerError::InvalidDuration)?;
311
312 let tx = ctx.loopback_after(Duration::from_secs(duration as u64), || {
313 TimerLoopback::Stopped(Stopped {
314 kind: StopKind::Expired,
315 reason: None,
316 })
317 });
318 tx_cancel = Some(tx);
319
320 Kind::Countdown {
321 ends_at: Timestamp::from(ends_at),
322 }
323 }
324 };
325
326 *timer = Some(Timer {
327 config: TimerConfig {
328 started_at,
329 kind,
330 style: style.clone(),
331 title: title.clone(),
332 ready_check_enabled,
333 },
334 tx_cancel,
335 });
336
337 ctx.send_ws_message(
338 ctx.participants.filter().room(ctx.room).ids(),
339 TimerEvent::Started {
340 config: TimerConfig {
341 started_at,
342 kind,
343 style,
344 title,
345 ready_check_enabled,
346 },
347 },
348 )?;
349 Ok(())
350 }
351
352 fn stop_timer(
353 &mut self,
354 ctx: &mut ModuleContext<'_, Self>,
355 sender: ParticipantId,
356 reason: Option<String>,
357 ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
358 if !ctx.is_moderator(sender) {
359 return Err(TimerError::InsufficientPermissions.into());
360 }
361
362 if let Some(mut timer) = self.remove_timer(ctx.room)? {
363 let stopped = Stopped {
364 kind: StopKind::ByModerator(sender),
365 reason,
366 };
367 if let Some(tx) = timer.tx_cancel.take() {
368 if tx.send(TimerLoopback::Stopped(stopped)).is_err() {
369 tracing::debug!("Timer cancel sender has been dropped");
370 }
371 } else {
372 ctx.send_ws_message(
376 ctx.participants.filter().room(ctx.room).ids(),
377 TimerEvent::Stopped(stopped),
378 )?;
379 }
380 }
381
382 Ok(())
383 }
384
385 fn remove_timer(
387 &mut self,
388 room: RoomKind,
389 ) -> Result<Option<Timer>, SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
390 let timer = self
391 .timers
392 .get_mut(&room)
393 .with_context(|| format!("Room '{room:?}' does not exist in timers"))?;
394 let ready_states = self
395 .ready_participants
396 .get_mut(&room)
397 .with_context(|| format!("Room '{room:?}' does not exist in ready states",))?;
398 if let Some(timer) = timer.take() {
399 ready_states.clear();
400 return Ok(Some(timer));
401 }
402 Ok(None)
403 }
404
405 fn update_ready_status(
406 &mut self,
407 ctx: &mut ModuleContext<'_, Self>,
408 sender: ParticipantId,
409 ready: bool,
410 ) -> Result<(), SignalingModuleError<<TimerModule as SignalingModule>::Error>> {
411 let timer = self
412 .timers
413 .get_mut(&ctx.room)
414 .with_context(|| format!("Room '{:?}' does not exist in timers", ctx.room))?;
415
416 let Some(timer) = timer else {
417 return Err(TimerError::TimerNotRunning)?;
418 };
419
420 if !timer.config.ready_check_enabled {
421 return Err(TimerError::ReadyCheckNotEnabled)?;
422 }
423
424 let ready_participants = self
425 .ready_participants
426 .get_mut(&ctx.room)
427 .with_context(|| {
428 format!(
429 "Room '{:?}' does not exist in participant ready state",
430 ctx.room
431 )
432 })?;
433
434 let changed = if ready {
435 ready_participants.insert(sender)
436 } else {
437 ready_participants.remove(&sender)
438 };
439 if changed {
440 ctx.send_ws_message(
441 ctx.participants.filter().room(ctx.room).ids(),
442 TimerEvent::UpdatedReadyStatus {
443 participant_id: sender,
444 status: ready,
445 },
446 )?;
447 }
448
449 Ok(())
450 }
451}