1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, hash_map::Entry};
6
7use anyhow::Context;
8use opentalk_roomserver_signaling::{
9 module_context::ModuleContext,
10 participant_state::ParticipantState,
11 signaling_module::{
12 ModuleJoinData, ModuleSwitchData, NoOp, PeerDataMap, SignalingModule,
13 SignalingModuleDescription, SignalingModuleFeatureDescription, SignalingModuleInitData,
14 },
15};
16use opentalk_roomserver_types::{
17 breakout::BreakoutRoom,
18 client_parameters::ClientKind,
19 connection_id::ConnectionId,
20 room_kind::RoomKind,
21 signaling::module_error::{FatalError, SignalingModuleError},
22};
23use opentalk_roomserver_types_recording::{
24 RECORD_FEATURE_ID, RECORDING_MODULE_ID, RecordingSettings, RecordingStatus, STREAM_FEATURE_ID,
25 StreamStatus, StreamingTarget,
26 command::RecordingCommand,
27 event::{RecordingError, RecordingEvent},
28 peer_state::RecordingPeerState,
29 service::{
30 command::RecordingServiceCommand,
31 event::RecordingServiceEvent,
32 state::{RecordingServiceState, ServiceStreamingTarget},
33 },
34 state::RecordingState,
35};
36use opentalk_types_api_internal::recording::RecordingTarget;
37use opentalk_types_common::{
38 features::ModuleFeatureId,
39 modules::ModuleId,
40 streaming::{RoomStreamingTarget, StreamingTargetId},
41};
42use opentalk_types_signaling::ParticipantId;
43
/// Signaling module that tracks recording consent, recording status and
/// streaming status for a room and its breakout rooms.
pub struct RecordingModule {
    /// Recording settings read from the room's module settings in `init`.
    /// Provides the recorder base URL and the API key used by `request_recorder`.
    settings: RecordingSettings,
    /// HTTP client used to send the init request to the recorder service.
    http_client: reqwest::Client,

    /// `false` when the tariff lists the record feature as disabled.
    can_record: bool,
    /// `false` when the tariff lists the stream feature as disabled.
    can_stream: bool,

    /// Streaming targets of the room, cloned from the room parameters in `init`.
    streaming_targets: Vec<RoomStreamingTarget>,

    /// Participants that currently consent to being recorded.
    consenting_participants: HashSet<ParticipantId>,

    /// Recording status per room; a missing entry is reported as `Inactive`.
    recording_states: HashMap<RoomKind, RecordingStatus>,

    /// Per streaming target: the room that uses the target and the stream
    /// status in that room. Other rooms are shown the target as `InUse`.
    stream_states: HashMap<StreamingTargetId, (RoomKind, StreamStatus)>,
}
65
/// Events that background tasks spawned by the module feed back into it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopBackEvent {
    /// The init request to the recorder service could not be sent or was
    /// answered with a non-success status.
    RecorderRequestFailed,
}
69
/// Static description of the recording module and the features it offers.
impl SignalingModuleDescription for RecordingModule {
    const MODULE_ID: ModuleId = RECORDING_MODULE_ID;
    const DESCRIPTION: &'static str = "Handles recording functionality.";
    // One entry per feature id; `init` checks the same ids against the
    // tariff's disabled features to decide what the room may use.
    const FEATURES: &[SignalingModuleFeatureDescription] = &[
        SignalingModuleFeatureDescription {
            feature_id: RECORD_FEATURE_ID,
            description: "Allows creation of recordings for meetings",
        },
        SignalingModuleFeatureDescription {
            feature_id: STREAM_FEATURE_ID,
            description: "Allows streaming meetings to streaming services",
        },
    ];
}
84
85impl SignalingModule for RecordingModule {
    // The module uses the recording module id as its namespace.
    const NAMESPACE: ModuleId = RECORDING_MODULE_ID;

    // Commands received over the websocket (see `on_websocket_message`).
    type Incoming = RecordingCommand;
    // Events sent to participants and to the recorder.
    type Outgoing = RecordingEvent;
    type Internal = NoOp;
    // Results of background tasks (see `on_loopback_event`).
    type Loopback = LoopBackEvent;
    // State handed to a participant on join and on breakout switch.
    type JoinInfo = RecordingState;
    // Per-peer consent information exchanged on join.
    type PeerJoinInfo = RecordingPeerState;
    type Error = RecordingError;
95
96 fn init(init_data: SignalingModuleInitData) -> Option<Self> {
97 let settings = (init_data
98 .room_parameters
99 .module_settings
100 .get::<RecordingSettings>()
101 .ok()?)?;
102
103 let tariff = &init_data.room_parameters.tariff;
104 let can_record = !tariff.disabled_features.contains(&ModuleFeatureId {
105 module: RECORDING_MODULE_ID,
106 feature: RECORD_FEATURE_ID,
107 });
108 let can_stream = !tariff.disabled_features.contains(&ModuleFeatureId {
109 module: RECORDING_MODULE_ID,
110 feature: STREAM_FEATURE_ID,
111 });
112
113 Some(Self {
114 settings,
115 http_client: reqwest::Client::new(),
116 can_record,
117 can_stream,
118 streaming_targets: init_data.room_parameters.streaming_targets.clone(),
119 consenting_participants: HashSet::new(),
120 recording_states: HashMap::new(),
121 stream_states: HashMap::new(),
122 })
123 }
124
125 fn on_participant_joined(
126 &mut self,
127 ctx: &mut ModuleContext<'_, Self>,
128 p_joined: ParticipantId,
129 _connection_id: ConnectionId,
130 _is_first_connection: bool,
131 ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
132 let recording_state = self.build_module_state(ctx, p_joined)?;
133
134 let mut peer_data = PeerDataMap::default();
135
136 peer_data.insert_for_matching(
138 ctx,
139 RecordingPeerState {
140 consents_recording: true,
141 },
142 |participant_id, _| self.consenting_participants.contains(&participant_id),
143 )?;
144 peer_data.insert_for_matching(
145 ctx,
146 RecordingPeerState {
147 consents_recording: false,
148 },
149 |participant_id, _| !self.consenting_participants.contains(&participant_id),
150 )?;
151
152 let mut peer_events = PeerDataMap::default();
153 peer_events.insert_for_all(
154 ctx,
155 RecordingPeerState {
156 consents_recording: self.consenting_participants.contains(&p_joined),
157 },
158 )?;
159
160 Ok(ModuleJoinData {
161 join_success: Some(recording_state),
162 peer_events,
163 peer_data,
164 })
165 }
166
167 fn on_participant_disconnected(
168 &mut self,
169 ctx: &mut ModuleContext<'_, Self>,
170 participant_id: ParticipantId,
171 _connection_id: ConnectionId,
172 ) -> Result<(), SignalingModuleError<Self::Error>> {
173 let participant_state = ctx
174 .participant_state(participant_id)
175 .context("Failed to find disconnected participant state")
176 .map_err(FatalError)?;
177
178 if participant_state.kind == (ClientKind::Recorder { room: ctx.room }) {
179 self.reset_states_if_no_recorder_is_connected(ctx)
180 } else {
181 Ok(())
182 }
183 }
184
185 fn on_websocket_message(
186 &mut self,
187 ctx: &mut ModuleContext<'_, Self>,
188 participant_id: ParticipantId,
189 _connection_id: ConnectionId,
190 command: Self::Incoming,
191 ) -> Result<(), SignalingModuleError<Self::Error>> {
192 match command {
193 RecordingCommand::SetConsent { consent } => {
194 self.set_consent(ctx, participant_id, consent)
195 }
196 RecordingCommand::StartRecording => self.start_recording(ctx, participant_id),
197 RecordingCommand::PauseRecording => self.pause_recording(ctx, participant_id),
198 RecordingCommand::StopRecording => self.stop_recording(ctx, participant_id),
199 RecordingCommand::StartStream { target_ids } => {
200 self.start_stream(ctx, participant_id, target_ids)
201 }
202 RecordingCommand::PauseStream { target_ids } => {
203 self.pause_stream(ctx, participant_id, target_ids)
204 }
205 RecordingCommand::StopStream { target_ids } => {
206 self.stop_stream(ctx, participant_id, target_ids)
207 }
208 RecordingCommand::Service { command } => {
209 self.handle_service_message(ctx, participant_id, command)
210 }
211 }
212 }
213
214 fn on_breakout_start(
215 &mut self,
216 _ctx: &mut ModuleContext<'_, Self>,
217 _rooms: &[BreakoutRoom],
218 _duration: Option<std::time::Duration>,
219 ) -> Result<(), SignalingModuleError<Self::Error>> {
220 Ok(())
221 }
222
223 fn on_breakout_switch(
224 &mut self,
225 ctx: &mut ModuleContext<'_, Self>,
226 participant_id: ParticipantId,
227 _old_room: RoomKind,
228 _new_room: RoomKind,
229 ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
230 let recording_state = self.build_module_state(ctx, participant_id)?;
231
232 let connections = ctx
233 .participant_state(participant_id)
234 .with_context(|| format!("Participant '{participant_id}' does not have state"))?
235 .connections();
236
237 let switch_success = connections
238 .map(|connection_id| (connection_id, Some(recording_state.clone())))
239 .collect();
240
241 Ok(ModuleSwitchData {
242 switch_success,
243 ..Default::default()
244 })
245 }
246
247 fn on_loopback_event(
248 &mut self,
249 ctx: &mut ModuleContext<'_, Self>,
250 event: Self::Loopback,
251 ) -> Result<(), SignalingModuleError<Self::Error>> {
252 match event {
253 LoopBackEvent::RecorderRequestFailed => {
254 self.reset_states_if_no_recorder_is_connected(ctx)?;
256
257 Err(RecordingError::FailedToRequestRecordingService.into())
258 }
259 }
260 }
261
262 fn on_breakout_closed(
263 &mut self,
264 _ctx: &mut ModuleContext<'_, Self>,
265 ) -> Result<(), SignalingModuleError<Self::Error>> {
266 Ok(())
267 }
268}
269
270impl RecordingModule {
271 fn build_module_state(
272 &mut self,
273 ctx: &mut ModuleContext<'_, RecordingModule>,
274 participant_id: ParticipantId,
275 ) -> Result<RecordingState, SignalingModuleError<RecordingError>> {
276 let mut stream_states = BTreeMap::new();
277
278 for target in &self.streaming_targets {
279 let status = self
280 .stream_states
281 .get(&target.id)
282 .map(|(room, status)| {
283 if *room == ctx.room {
284 status.clone()
285 } else {
286 StreamStatus::InUse
287 }
288 })
289 .unwrap_or(StreamStatus::Inactive);
290
291 let public_url = match &target.streaming_target.kind {
292 opentalk_types_common::streaming::StreamingTargetKind::Custom {
293 public_url,
294 ..
295 } => public_url.clone(),
296 };
297
298 stream_states.insert(
299 target.id,
300 StreamingTarget {
301 name: target.streaming_target.name.clone(),
302 public_url,
303 status,
304 },
305 );
306 }
307
308 let mut recording_state = RecordingState {
309 consents_recording: self.consenting_participants.contains(&participant_id),
310 recording_state: self
311 .recording_states
312 .get(&ctx.room)
313 .cloned()
314 .unwrap_or(RecordingStatus::Inactive),
315 stream_states,
316 service: None,
317 };
318
319 let joined_participant_state = ctx
320 .participant_state(participant_id)
321 .context("Failed to find state for joined participant")
322 .map_err(FatalError)?;
323
324 if joined_participant_state.kind == (ClientKind::Recorder { room: ctx.room }) {
325 let mut streaming_targets = BTreeMap::new();
326
327 for target in &self.streaming_targets {
328 let Some(location) = target.streaming_target.kind.get_stream_target_location()
329 else {
330 tracing::warn!("Failed to build streaming url for a streaming-target");
331 continue;
332 };
333
334 streaming_targets.insert(target.id, ServiceStreamingTarget { location });
335 }
336
337 recording_state.service = Some(RecordingServiceState { streaming_targets });
338 }
339
340 Ok(recording_state)
341 }
342
343 fn set_consent(
344 &mut self,
345 ctx: &mut ModuleContext<'_, RecordingModule>,
346 participant_id: ParticipantId,
347 consent: bool,
348 ) -> Result<(), SignalingModuleError<RecordingError>> {
349 let updated = if consent {
350 self.consenting_participants.insert(participant_id)
351 } else {
352 self.consenting_participants.remove(&participant_id)
353 };
354
355 if updated {
356 ctx.send_ws_message(
357 ctx.participants.connected().ids(),
358 RecordingEvent::ConsentUpdated {
359 participant: participant_id,
360 consents: consent,
361 },
362 )?;
363 }
364
365 Ok(())
366 }
367
368 fn request_recorder(
369 &mut self,
370 ctx: &mut ModuleContext<'_, RecordingModule>,
371 ) -> Result<(), SignalingModuleError<RecordingError>> {
372 let jwt_bearer_token = self
373 .settings
374 .api_key
375 .generate_jwt()
376 .context("Failed to generate JWT from recorder api_key")
377 .map_err(FatalError)?;
378
379 let http_client = self.http_client.clone();
380
381 let body = RecordingTarget {
382 room_id: ctx.room_id,
383 breakout_room: match ctx.room {
384 RoomKind::Main => None,
385 RoomKind::Breakout(breakout_id) => Some(breakout_id.into()),
386 },
387 };
388
389 let url = match self.settings.url.join("v1/init") {
390 Ok(url) => url,
391 Err(err) => {
392 tracing::error!(
393 "Failed to build recorder init url from base_url: {}, {err}",
394 self.settings.url
395 );
396 self.reset_states_if_no_recorder_is_connected(ctx)?;
397
398 return Err(RecordingError::FailedToRequestRecordingService.into());
399 }
400 };
401
402 ctx.spawn_optional(async move {
403 tracing::debug!("Sending off recorder start request to {url} with body {body:?}");
404
405 let response = http_client
406 .post(url)
407 .bearer_auth(jwt_bearer_token)
408 .json(&body)
409 .send()
410 .await;
411
412 let response = match response {
413 Ok(response) => response,
414 Err(err) => {
415 tracing::error!("Failed to send init request to recorder, {err:?}");
416 return Some(LoopBackEvent::RecorderRequestFailed);
417 }
418 };
419
420 let response_status = response.status();
421 let response_body = response
422 .bytes()
423 .await
424 .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
425
426 if response_status.is_success() {
427 tracing::debug!(
428 "Got recorder start response status={} body={:?}",
429 response_status,
430 response_body,
431 );
432 } else {
433 tracing::debug!(
434 "Got non-success response to recorder start request status={} body={:?}",
435 response_status,
436 response_body,
437 );
438 return Some(LoopBackEvent::RecorderRequestFailed);
439 }
440
441 None
442 });
443
444 Ok(())
445 }
446
447 fn start_recording(
448 &mut self,
449 ctx: &mut ModuleContext<'_, RecordingModule>,
450 participant_id: ParticipantId,
451 ) -> Result<(), SignalingModuleError<RecordingError>> {
452 if !ctx.is_moderator(participant_id) {
453 return Err(RecordingError::InsufficientPermissions.into());
454 }
455
456 if !self.can_record {
457 return Err(RecordingError::RecordFeatureDisabled.into());
458 }
459
460 match self.recording_states.entry(ctx.room) {
461 Entry::Occupied(mut occupied_entry) => {
462 if occupied_entry.get().is_running() {
463 return Err(RecordingError::RecordingAlreadyActive.into());
464 }
465
466 occupied_entry.insert(RecordingStatus::Requested);
467 }
468 Entry::Vacant(vacant_entry) => {
469 vacant_entry.insert(RecordingStatus::Requested);
470 }
471 }
472
473 let request_result = if let Some((recorder, _)) = find_recorder(ctx) {
474 ctx.send_ws_message(
475 [*recorder],
476 RecordingEvent::Service {
477 event: RecordingServiceCommand::StartRecording,
478 },
479 )
480 .map_err(SignalingModuleError::from)
481 } else {
482 self.request_recorder(ctx)
483 };
484
485 if let Err(e) = request_result {
486 self.recording_states
488 .insert(ctx.room, RecordingStatus::Inactive);
489
490 return Err(e);
491 }
492
493 ctx.send_ws_message(
494 ctx.participants.connected().room(ctx.room).ids(),
495 RecordingEvent::RecordingUpdated(RecordingStatus::Requested),
496 )?;
497
498 Ok(())
499 }
500
501 fn pause_recording(
502 &mut self,
503 ctx: &mut ModuleContext<'_, RecordingModule>,
504 participant_id: ParticipantId,
505 ) -> Result<(), SignalingModuleError<RecordingError>> {
506 if !ctx.is_moderator(participant_id) {
507 return Err(RecordingError::InsufficientPermissions.into());
508 }
509
510 match self.recording_states.entry(ctx.room) {
511 Entry::Occupied(mut occupied_entry) => {
512 occupied_entry.insert(RecordingStatus::Paused);
513 }
514 Entry::Vacant(..) => return Err(RecordingError::RecordingNotActive.into()),
515 }
516
517 let (recorder, _) = find_recorder(ctx)
518 .context("Invalid state, set recording to paused but no recorder is connected")?;
519
520 ctx.send_ws_message(
521 [*recorder],
522 RecordingEvent::Service {
523 event: RecordingServiceCommand::PauseRecording,
524 },
525 )?;
526
527 Ok(())
528 }
529
530 fn stop_recording(
531 &mut self,
532 ctx: &mut ModuleContext<'_, RecordingModule>,
533 participant_id: ParticipantId,
534 ) -> Result<(), SignalingModuleError<RecordingError>> {
535 if !ctx.is_moderator(participant_id) {
536 return Err(RecordingError::InsufficientPermissions.into());
537 }
538
539 if self
540 .recording_states
541 .get(&ctx.room)
542 .is_none_or(|s| !s.is_running())
543 {
544 return Err(RecordingError::RecordingNotActive.into());
545 }
546
547 let (recorder, _) = find_recorder(ctx)
548 .context("Failed to find recorder in conference despite an active recording")?;
549
550 ctx.send_ws_message(
551 [*recorder],
552 RecordingEvent::Service {
553 event: RecordingServiceCommand::StopRecording,
554 },
555 )?;
556
557 Ok(())
558 }
559
560 fn start_stream(
561 &mut self,
562 ctx: &mut ModuleContext<'_, RecordingModule>,
563 participant_id: ParticipantId,
564 target_ids: BTreeSet<StreamingTargetId>,
565 ) -> Result<(), SignalingModuleError<RecordingError>> {
566 if !ctx.is_moderator(participant_id) {
567 return Err(RecordingError::InsufficientPermissions.into());
568 }
569
570 if !self.can_stream {
571 return Err(RecordingError::StreamFeatureDisabled.into());
572 }
573
574 for target_id in &target_ids {
576 if !self.streaming_targets.iter().any(|c| c.id == *target_id) {
577 return Err(RecordingError::InvalidStreamingId.into());
578 }
579
580 if self
581 .stream_states
582 .get(target_id)
583 .is_some_and(|(_, status)| !status.can_be_started())
584 {
585 return Err(RecordingError::StreamingTargetInUse.into());
586 }
587 }
588
589 for target_id in &target_ids {
591 self.stream_states
592 .insert(*target_id, (ctx.room, StreamStatus::Requested));
593 }
594
595 let request_result = if let Some((recorder, _)) = find_recorder(ctx) {
596 ctx.send_ws_message(
597 [*recorder],
598 RecordingEvent::Service {
599 event: RecordingServiceCommand::StartStreams {
600 target_ids: target_ids.clone(),
601 },
602 },
603 )
604 .map_err(SignalingModuleError::from)
605 } else {
606 self.request_recorder(ctx)
607 };
608
609 if let Err(e) = request_result {
610 for target_id in &target_ids {
612 self.stream_states.remove(target_id);
613 }
614
615 return Err(e);
616 }
617
618 for target_id in target_ids {
619 ctx.send_ws_message(
620 ctx.participants.in_room(ctx.room).connected().ids(),
621 RecordingEvent::StreamUpdated {
622 target_id,
623 status: StreamStatus::Requested,
624 },
625 )?;
626
627 ctx.send_ws_message(
628 ctx.participants
629 .connected()
630 .iter()
631 .filter(|(_, state)| state.room != ctx.room)
632 .map(|(id, _)| *id),
633 RecordingEvent::StreamUpdated {
634 target_id,
635 status: StreamStatus::InUse,
636 },
637 )?;
638 }
639
640 Ok(())
641 }
642
643 fn pause_stream(
644 &mut self,
645 ctx: &mut ModuleContext<'_, RecordingModule>,
646 participant_id: ParticipantId,
647 target_ids: BTreeSet<StreamingTargetId>,
648 ) -> Result<(), SignalingModuleError<RecordingError>> {
649 if !ctx.is_moderator(participant_id) {
650 return Err(RecordingError::InsufficientPermissions.into());
651 }
652
653 for target_id in &target_ids {
655 if !self
656 .stream_states
657 .get(target_id)
658 .is_some_and(|(room, status)| *room == ctx.room && status.is_running())
659 {
660 return Err(RecordingError::InvalidStreamingId.into());
661 }
662 }
663
664 let (recorder, _) = find_recorder(ctx)
665 .context("Failed to find recorder in conference despite having active streams")?;
666
667 ctx.send_ws_message(
668 [*recorder],
669 RecordingEvent::Service {
670 event: RecordingServiceCommand::PauseStreams { target_ids },
671 },
672 )?;
673
674 Ok(())
675 }
676
677 fn stop_stream(
678 &mut self,
679 ctx: &mut ModuleContext<'_, RecordingModule>,
680 participant_id: ParticipantId,
681 target_ids: BTreeSet<StreamingTargetId>,
682 ) -> Result<(), SignalingModuleError<RecordingError>> {
683 if !ctx.is_moderator(participant_id) {
684 return Err(RecordingError::InsufficientPermissions.into());
685 }
686
687 for target_id in &target_ids {
689 if !self
690 .stream_states
691 .get(target_id)
692 .is_some_and(|(room, status)| *room == ctx.room && status.is_running())
693 {
694 return Err(RecordingError::InvalidStreamingId.into());
695 }
696 }
697
698 let (recorder, _) = find_recorder(ctx)
699 .context("Failed to find recorder in conference despite having active streams")?;
700
701 ctx.send_ws_message(
702 [*recorder],
703 RecordingEvent::Service {
704 event: RecordingServiceCommand::StopStreams { target_ids },
705 },
706 )?;
707
708 Ok(())
709 }
710
711 fn handle_service_message(
712 &mut self,
713 ctx: &mut ModuleContext<'_, RecordingModule>,
714 participant_id: ParticipantId,
715 command: RecordingServiceEvent,
716 ) -> Result<(), SignalingModuleError<RecordingError>> {
717 if !ctx
718 .participant_state(participant_id)
719 .is_some_and(|state| state.kind == ClientKind::Recorder { room: ctx.room })
720 {
721 return Err(RecordingError::InsufficientPermissions.into());
722 }
723
724 match command {
725 RecordingServiceEvent::RecordingUpdated(status) => {
726 let Some(state) = self.recording_states.get_mut(&ctx.room) else {
727 return Err(RecordingError::InvalidStreamingId.into());
728 };
729
730 *state = status.clone();
731
732 ctx.send_ws_message(
733 ctx.participants.in_room(ctx.room).connected().ids(),
734 RecordingEvent::RecordingUpdated(status),
735 )?;
736
737 Ok(())
738 }
739 RecordingServiceEvent::StreamUpdated { target_id, status } => {
740 let Some((room, current_status)) = self.stream_states.get_mut(&target_id) else {
741 return Err(RecordingError::InvalidStreamingId.into());
742 };
743
744 if *room != ctx.room {
745 return Err(RecordingError::InvalidStreamingId.into());
746 }
747
748 ctx.send_ws_message(
749 ctx.participants.in_room(ctx.room).connected().ids(),
750 RecordingEvent::StreamUpdated {
751 target_id,
752 status: status.clone(),
753 },
754 )?;
755
756 if current_status.can_be_started() != status.can_be_started() {
758 let status = if status.can_be_started() {
759 StreamStatus::Inactive
760 } else {
761 StreamStatus::InUse
762 };
763
764 ctx.send_ws_message(
765 ctx.participants
766 .connected()
767 .iter()
768 .filter(|(_, state)| state.room != ctx.room)
769 .map(|(id, _)| *id),
770 RecordingEvent::StreamUpdated { target_id, status },
771 )?;
772 }
773
774 *current_status = status;
775
776 Ok(())
777 }
778 }
779 }
780
781 fn reset_states_if_no_recorder_is_connected(
784 &mut self,
785 ctx: &mut ModuleContext<'_, RecordingModule>,
786 ) -> Result<(), SignalingModuleError<RecordingError>> {
787 if find_recorder(ctx).is_some() {
788 return Ok(());
789 }
790
791 if self
792 .recording_states
793 .remove(&ctx.room)
794 .is_some_and(|state| state != RecordingStatus::Inactive)
795 {
796 ctx.send_ws_message(
797 ctx.participants.in_room(ctx.room).connected().ids(),
798 RecordingEvent::RecordingUpdated(RecordingStatus::Inactive),
799 )?;
800 }
801
802 let target_ids: Vec<_> = self
804 .stream_states
805 .iter()
806 .filter(|(_, (room, status))| *room == ctx.room && *status != StreamStatus::Inactive)
807 .map(|(target_id, _)| *target_id)
808 .collect();
809
810 for target_id in target_ids {
811 self.stream_states.remove(&target_id);
812
813 ctx.send_ws_message(
814 ctx.participants.in_room(ctx.room).connected().ids(),
815 RecordingEvent::StreamUpdated {
816 target_id,
817 status: StreamStatus::Inactive,
818 },
819 )?;
820 }
821
822 Ok(())
823 }
824}
825
826fn find_recorder<'a>(
827 ctx: &'a ModuleContext<'a, RecordingModule>,
828) -> Option<(&'a ParticipantId, &'a ParticipantState)> {
829 ctx.participants.in_room(ctx.room).iter().find(|(_, p)| {
830 p.is_connected() && matches!(p.kind, ClientKind::Recorder { room } if room == ctx.room)
831 })
832}