Skip to main content

opentalk_roomserver_module_recording/
lib.rs

1// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet, hash_map::Entry};
6
7use anyhow::Context;
8use opentalk_roomserver_signaling::{
9    module_context::ModuleContext,
10    participant_state::ParticipantState,
11    signaling_module::{
12        ModuleJoinData, ModuleSwitchData, NoOp, PeerDataMap, SignalingModule,
13        SignalingModuleDescription, SignalingModuleFeatureDescription, SignalingModuleInitData,
14    },
15};
16use opentalk_roomserver_types::{
17    breakout::BreakoutRoom,
18    client_parameters::ClientKind,
19    connection_id::ConnectionId,
20    room_kind::RoomKind,
21    signaling::module_error::{FatalError, SignalingModuleError},
22};
23use opentalk_roomserver_types_recording::{
24    RECORD_FEATURE_ID, RECORDING_MODULE_ID, RecordingSettings, RecordingStatus, STREAM_FEATURE_ID,
25    StreamStatus, StreamingTarget,
26    command::RecordingCommand,
27    event::{RecordingError, RecordingEvent},
28    peer_state::RecordingPeerState,
29    service::{
30        command::RecordingServiceCommand,
31        event::RecordingServiceEvent,
32        state::{RecordingServiceState, ServiceStreamingTarget},
33    },
34    state::RecordingState,
35};
36use opentalk_types_api_internal::recording::RecordingTarget;
37use opentalk_types_common::{
38    features::ModuleFeatureId,
39    modules::ModuleId,
40    streaming::{RoomStreamingTarget, StreamingTargetId},
41};
42use opentalk_types_signaling::ParticipantId;
43
44pub struct RecordingModule {
45    settings: RecordingSettings,
46    http_client: reqwest::Client,
47
48    // Features
49    can_record: bool,
50    can_stream: bool,
51
52    /// Streaming targets configured for the room
53    streaming_targets: Vec<RoomStreamingTarget>,
54
55    /// Set of participants which consented to being recorded
56    consenting_participants: HashSet<ParticipantId>,
57
58    /// Recording state for each room. No entry equals `StreamStatus::Inactive`
59    recording_states: HashMap<RoomKind, RecordingStatus>,
60
61    /// Stream states and the room in which the configuration is used.
62    /// No entry means the streaming target is unused
63    stream_states: HashMap<StreamingTargetId, (RoomKind, StreamStatus)>,
64}
65
/// Events that spawned background tasks feed back into the module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopBackEvent {
    /// The init request to the recorder could not be sent or was answered with a non-success
    /// status
    RecorderRequestFailed,
}
69
70impl SignalingModuleDescription for RecordingModule {
71    const MODULE_ID: ModuleId = RECORDING_MODULE_ID;
72    const DESCRIPTION: &'static str = "Handles recording functionality.";
73    const FEATURES: &[SignalingModuleFeatureDescription] = &[
74        SignalingModuleFeatureDescription {
75            feature_id: RECORD_FEATURE_ID,
76            description: "Allows creation of recordings for meetings",
77        },
78        SignalingModuleFeatureDescription {
79            feature_id: STREAM_FEATURE_ID,
80            description: "Allows streaming meetings to streaming services",
81        },
82    ];
83}
84
85impl SignalingModule for RecordingModule {
86    const NAMESPACE: ModuleId = RECORDING_MODULE_ID;
87
88    type Incoming = RecordingCommand;
89    type Outgoing = RecordingEvent;
90    type Internal = NoOp;
91    type Loopback = LoopBackEvent;
92    type JoinInfo = RecordingState;
93    type PeerJoinInfo = RecordingPeerState;
94    type Error = RecordingError;
95
96    fn init(init_data: SignalingModuleInitData) -> Option<Self> {
97        let settings = (init_data
98            .room_parameters
99            .module_settings
100            .get::<RecordingSettings>()
101            .ok()?)?;
102
103        let tariff = &init_data.room_parameters.tariff;
104        let can_record = !tariff.disabled_features.contains(&ModuleFeatureId {
105            module: RECORDING_MODULE_ID,
106            feature: RECORD_FEATURE_ID,
107        });
108        let can_stream = !tariff.disabled_features.contains(&ModuleFeatureId {
109            module: RECORDING_MODULE_ID,
110            feature: STREAM_FEATURE_ID,
111        });
112
113        Some(Self {
114            settings,
115            http_client: reqwest::Client::new(),
116            can_record,
117            can_stream,
118            streaming_targets: init_data.room_parameters.streaming_targets.clone(),
119            consenting_participants: HashSet::new(),
120            recording_states: HashMap::new(),
121            stream_states: HashMap::new(),
122        })
123    }
124
125    fn on_participant_joined(
126        &mut self,
127        ctx: &mut ModuleContext<'_, Self>,
128        p_joined: ParticipantId,
129        _connection_id: ConnectionId,
130        _is_first_connection: bool,
131    ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
132        let recording_state = self.build_module_state(ctx, p_joined)?;
133
134        let mut peer_data = PeerDataMap::default();
135
136        // Use insert_for_matching here to avoid serializing the same message multiple times
137        peer_data.insert_for_matching(
138            ctx,
139            RecordingPeerState {
140                consents_recording: true,
141            },
142            |participant_id, _| self.consenting_participants.contains(&participant_id),
143        )?;
144        peer_data.insert_for_matching(
145            ctx,
146            RecordingPeerState {
147                consents_recording: false,
148            },
149            |participant_id, _| !self.consenting_participants.contains(&participant_id),
150        )?;
151
152        let mut peer_events = PeerDataMap::default();
153        peer_events.insert_for_all(
154            ctx,
155            RecordingPeerState {
156                consents_recording: self.consenting_participants.contains(&p_joined),
157            },
158        )?;
159
160        Ok(ModuleJoinData {
161            join_success: Some(recording_state),
162            peer_events,
163            peer_data,
164        })
165    }
166
167    fn on_participant_disconnected(
168        &mut self,
169        ctx: &mut ModuleContext<'_, Self>,
170        participant_id: ParticipantId,
171        _connection_id: ConnectionId,
172    ) -> Result<(), SignalingModuleError<Self::Error>> {
173        let participant_state = ctx
174            .participant_state(participant_id)
175            .context("Failed to find disconnected participant state")
176            .map_err(FatalError)?;
177
178        if participant_state.kind == (ClientKind::Recorder { room: ctx.room }) {
179            self.reset_states_if_no_recorder_is_connected(ctx)
180        } else {
181            Ok(())
182        }
183    }
184
185    fn on_websocket_message(
186        &mut self,
187        ctx: &mut ModuleContext<'_, Self>,
188        participant_id: ParticipantId,
189        _connection_id: ConnectionId,
190        command: Self::Incoming,
191    ) -> Result<(), SignalingModuleError<Self::Error>> {
192        match command {
193            RecordingCommand::SetConsent { consent } => {
194                self.set_consent(ctx, participant_id, consent)
195            }
196            RecordingCommand::StartRecording => self.start_recording(ctx, participant_id),
197            RecordingCommand::PauseRecording => self.pause_recording(ctx, participant_id),
198            RecordingCommand::StopRecording => self.stop_recording(ctx, participant_id),
199            RecordingCommand::StartStream { target_ids } => {
200                self.start_stream(ctx, participant_id, target_ids)
201            }
202            RecordingCommand::PauseStream { target_ids } => {
203                self.pause_stream(ctx, participant_id, target_ids)
204            }
205            RecordingCommand::StopStream { target_ids } => {
206                self.stop_stream(ctx, participant_id, target_ids)
207            }
208            RecordingCommand::Service { command } => {
209                self.handle_service_message(ctx, participant_id, command)
210            }
211        }
212    }
213
214    fn on_breakout_start(
215        &mut self,
216        _ctx: &mut ModuleContext<'_, Self>,
217        _rooms: &[BreakoutRoom],
218        _duration: Option<std::time::Duration>,
219    ) -> Result<(), SignalingModuleError<Self::Error>> {
220        Ok(())
221    }
222
223    fn on_breakout_switch(
224        &mut self,
225        ctx: &mut ModuleContext<'_, Self>,
226        participant_id: ParticipantId,
227        _old_room: RoomKind,
228        _new_room: RoomKind,
229    ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
230        let recording_state = self.build_module_state(ctx, participant_id)?;
231
232        let connections = ctx
233            .participant_state(participant_id)
234            .with_context(|| format!("Participant '{participant_id}' does not have state"))?
235            .connections();
236
237        let switch_success = connections
238            .map(|connection_id| (connection_id, Some(recording_state.clone())))
239            .collect();
240
241        Ok(ModuleSwitchData {
242            switch_success,
243            ..Default::default()
244        })
245    }
246
247    fn on_loopback_event(
248        &mut self,
249        ctx: &mut ModuleContext<'_, Self>,
250        event: Self::Loopback,
251    ) -> Result<(), SignalingModuleError<Self::Error>> {
252        match event {
253            LoopBackEvent::RecorderRequestFailed => {
254                // Roll back requested targets since recorder could not be found
255                self.reset_states_if_no_recorder_is_connected(ctx)?;
256
257                Err(RecordingError::FailedToRequestRecordingService.into())
258            }
259        }
260    }
261
262    fn on_breakout_closed(
263        &mut self,
264        _ctx: &mut ModuleContext<'_, Self>,
265    ) -> Result<(), SignalingModuleError<Self::Error>> {
266        Ok(())
267    }
268}
269
270impl RecordingModule {
271    fn build_module_state(
272        &mut self,
273        ctx: &mut ModuleContext<'_, RecordingModule>,
274        participant_id: ParticipantId,
275    ) -> Result<RecordingState, SignalingModuleError<RecordingError>> {
276        let mut stream_states = BTreeMap::new();
277
278        for target in &self.streaming_targets {
279            let status = self
280                .stream_states
281                .get(&target.id)
282                .map(|(room, status)| {
283                    if *room == ctx.room {
284                        status.clone()
285                    } else {
286                        StreamStatus::InUse
287                    }
288                })
289                .unwrap_or(StreamStatus::Inactive);
290
291            let public_url = match &target.streaming_target.kind {
292                opentalk_types_common::streaming::StreamingTargetKind::Custom {
293                    public_url,
294                    ..
295                } => public_url.clone(),
296            };
297
298            stream_states.insert(
299                target.id,
300                StreamingTarget {
301                    name: target.streaming_target.name.clone(),
302                    public_url,
303                    status,
304                },
305            );
306        }
307
308        let mut recording_state = RecordingState {
309            consents_recording: self.consenting_participants.contains(&participant_id),
310            recording_state: self
311                .recording_states
312                .get(&ctx.room)
313                .cloned()
314                .unwrap_or(RecordingStatus::Inactive),
315            stream_states,
316            service: None,
317        };
318
319        let joined_participant_state = ctx
320            .participant_state(participant_id)
321            .context("Failed to find state for joined participant")
322            .map_err(FatalError)?;
323
324        if joined_participant_state.kind == (ClientKind::Recorder { room: ctx.room }) {
325            let mut streaming_targets = BTreeMap::new();
326
327            for target in &self.streaming_targets {
328                let Some(location) = target.streaming_target.kind.get_stream_target_location()
329                else {
330                    tracing::warn!("Failed to build streaming url for a streaming-target");
331                    continue;
332                };
333
334                streaming_targets.insert(target.id, ServiceStreamingTarget { location });
335            }
336
337            recording_state.service = Some(RecordingServiceState { streaming_targets });
338        }
339
340        Ok(recording_state)
341    }
342
343    fn set_consent(
344        &mut self,
345        ctx: &mut ModuleContext<'_, RecordingModule>,
346        participant_id: ParticipantId,
347        consent: bool,
348    ) -> Result<(), SignalingModuleError<RecordingError>> {
349        let updated = if consent {
350            self.consenting_participants.insert(participant_id)
351        } else {
352            self.consenting_participants.remove(&participant_id)
353        };
354
355        if updated {
356            ctx.send_ws_message(
357                ctx.participants.connected().ids(),
358                RecordingEvent::ConsentUpdated {
359                    participant: participant_id,
360                    consents: consent,
361                },
362            )?;
363        }
364
365        Ok(())
366    }
367
368    fn request_recorder(
369        &mut self,
370        ctx: &mut ModuleContext<'_, RecordingModule>,
371    ) -> Result<(), SignalingModuleError<RecordingError>> {
372        let jwt_bearer_token = self
373            .settings
374            .api_key
375            .generate_jwt()
376            .context("Failed to generate JWT from recorder api_key")
377            .map_err(FatalError)?;
378
379        let http_client = self.http_client.clone();
380
381        let body = RecordingTarget {
382            room_id: ctx.room_id,
383            breakout_room: match ctx.room {
384                RoomKind::Main => None,
385                RoomKind::Breakout(breakout_id) => Some(breakout_id.into()),
386            },
387        };
388
389        let url = match self.settings.url.join("v1/init") {
390            Ok(url) => url,
391            Err(err) => {
392                tracing::error!(
393                    "Failed to build recorder init url from base_url: {}, {err}",
394                    self.settings.url
395                );
396                self.reset_states_if_no_recorder_is_connected(ctx)?;
397
398                return Err(RecordingError::FailedToRequestRecordingService.into());
399            }
400        };
401
402        ctx.spawn_optional(async move {
403            tracing::debug!("Sending off recorder start request to {url} with body {body:?}");
404
405            let response = http_client
406                .post(url)
407                .bearer_auth(jwt_bearer_token)
408                .json(&body)
409                .send()
410                .await;
411
412            let response = match response {
413                Ok(response) => response,
414                Err(err) => {
415                    tracing::error!("Failed to send init request to recorder, {err:?}");
416                    return Some(LoopBackEvent::RecorderRequestFailed);
417                }
418            };
419
420            let response_status = response.status();
421            let response_body = response
422                .bytes()
423                .await
424                .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
425
426            if response_status.is_success() {
427                tracing::debug!(
428                    "Got recorder start response status={} body={:?}",
429                    response_status,
430                    response_body,
431                );
432            } else {
433                tracing::debug!(
434                    "Got non-success response to recorder start request status={} body={:?}",
435                    response_status,
436                    response_body,
437                );
438                return Some(LoopBackEvent::RecorderRequestFailed);
439            }
440
441            None
442        });
443
444        Ok(())
445    }
446
447    fn start_recording(
448        &mut self,
449        ctx: &mut ModuleContext<'_, RecordingModule>,
450        participant_id: ParticipantId,
451    ) -> Result<(), SignalingModuleError<RecordingError>> {
452        if !ctx.is_moderator(participant_id) {
453            return Err(RecordingError::InsufficientPermissions.into());
454        }
455
456        if !self.can_record {
457            return Err(RecordingError::RecordFeatureDisabled.into());
458        }
459
460        match self.recording_states.entry(ctx.room) {
461            Entry::Occupied(mut occupied_entry) => {
462                if occupied_entry.get().is_running() {
463                    return Err(RecordingError::RecordingAlreadyActive.into());
464                }
465
466                occupied_entry.insert(RecordingStatus::Requested);
467            }
468            Entry::Vacant(vacant_entry) => {
469                vacant_entry.insert(RecordingStatus::Requested);
470            }
471        }
472
473        let request_result = if let Some((recorder, _)) = find_recorder(ctx) {
474            ctx.send_ws_message(
475                [*recorder],
476                RecordingEvent::Service {
477                    event: RecordingServiceCommand::StartRecording,
478                },
479            )
480            .map_err(SignalingModuleError::from)
481        } else {
482            self.request_recorder(ctx)
483        };
484
485        if let Err(e) = request_result {
486            // Reset status back to inactive
487            self.recording_states
488                .insert(ctx.room, RecordingStatus::Inactive);
489
490            return Err(e);
491        }
492
493        ctx.send_ws_message(
494            ctx.participants.connected().room(ctx.room).ids(),
495            RecordingEvent::RecordingUpdated(RecordingStatus::Requested),
496        )?;
497
498        Ok(())
499    }
500
501    fn pause_recording(
502        &mut self,
503        ctx: &mut ModuleContext<'_, RecordingModule>,
504        participant_id: ParticipantId,
505    ) -> Result<(), SignalingModuleError<RecordingError>> {
506        if !ctx.is_moderator(participant_id) {
507            return Err(RecordingError::InsufficientPermissions.into());
508        }
509
510        match self.recording_states.entry(ctx.room) {
511            Entry::Occupied(mut occupied_entry) => {
512                occupied_entry.insert(RecordingStatus::Paused);
513            }
514            Entry::Vacant(..) => return Err(RecordingError::RecordingNotActive.into()),
515        }
516
517        let (recorder, _) = find_recorder(ctx)
518            .context("Invalid state, set recording to paused but no recorder is connected")?;
519
520        ctx.send_ws_message(
521            [*recorder],
522            RecordingEvent::Service {
523                event: RecordingServiceCommand::PauseRecording,
524            },
525        )?;
526
527        Ok(())
528    }
529
530    fn stop_recording(
531        &mut self,
532        ctx: &mut ModuleContext<'_, RecordingModule>,
533        participant_id: ParticipantId,
534    ) -> Result<(), SignalingModuleError<RecordingError>> {
535        if !ctx.is_moderator(participant_id) {
536            return Err(RecordingError::InsufficientPermissions.into());
537        }
538
539        if self
540            .recording_states
541            .get(&ctx.room)
542            .is_none_or(|s| !s.is_running())
543        {
544            return Err(RecordingError::RecordingNotActive.into());
545        }
546
547        let (recorder, _) = find_recorder(ctx)
548            .context("Failed to find recorder in conference despite an active recording")?;
549
550        ctx.send_ws_message(
551            [*recorder],
552            RecordingEvent::Service {
553                event: RecordingServiceCommand::StopRecording,
554            },
555        )?;
556
557        Ok(())
558    }
559
560    fn start_stream(
561        &mut self,
562        ctx: &mut ModuleContext<'_, RecordingModule>,
563        participant_id: ParticipantId,
564        target_ids: BTreeSet<StreamingTargetId>,
565    ) -> Result<(), SignalingModuleError<RecordingError>> {
566        if !ctx.is_moderator(participant_id) {
567            return Err(RecordingError::InsufficientPermissions.into());
568        }
569
570        if !self.can_stream {
571            return Err(RecordingError::StreamFeatureDisabled.into());
572        }
573
574        // Validate target ids, must reference a streaming target & it may not be already active
575        for target_id in &target_ids {
576            if !self.streaming_targets.iter().any(|c| c.id == *target_id) {
577                return Err(RecordingError::InvalidStreamingId.into());
578            }
579
580            if self
581                .stream_states
582                .get(target_id)
583                .is_some_and(|(_, status)| !status.can_be_started())
584            {
585                return Err(RecordingError::StreamingTargetInUse.into());
586            }
587        }
588
589        // Set the stream config to be in use
590        for target_id in &target_ids {
591            self.stream_states
592                .insert(*target_id, (ctx.room, StreamStatus::Requested));
593        }
594
595        let request_result = if let Some((recorder, _)) = find_recorder(ctx) {
596            ctx.send_ws_message(
597                [*recorder],
598                RecordingEvent::Service {
599                    event: RecordingServiceCommand::StartStreams {
600                        target_ids: target_ids.clone(),
601                    },
602                },
603            )
604            .map_err(SignalingModuleError::from)
605        } else {
606            self.request_recorder(ctx)
607        };
608
609        if let Err(e) = request_result {
610            // Remove stream states since request was unsuccessful
611            for target_id in &target_ids {
612                self.stream_states.remove(target_id);
613            }
614
615            return Err(e);
616        }
617
618        for target_id in target_ids {
619            ctx.send_ws_message(
620                ctx.participants.in_room(ctx.room).connected().ids(),
621                RecordingEvent::StreamUpdated {
622                    target_id,
623                    status: StreamStatus::Requested,
624                },
625            )?;
626
627            ctx.send_ws_message(
628                ctx.participants
629                    .connected()
630                    .iter()
631                    .filter(|(_, state)| state.room != ctx.room)
632                    .map(|(id, _)| *id),
633                RecordingEvent::StreamUpdated {
634                    target_id,
635                    status: StreamStatus::InUse,
636                },
637            )?;
638        }
639
640        Ok(())
641    }
642
643    fn pause_stream(
644        &mut self,
645        ctx: &mut ModuleContext<'_, RecordingModule>,
646        participant_id: ParticipantId,
647        target_ids: BTreeSet<StreamingTargetId>,
648    ) -> Result<(), SignalingModuleError<RecordingError>> {
649        if !ctx.is_moderator(participant_id) {
650            return Err(RecordingError::InsufficientPermissions.into());
651        }
652
653        // Validate target ids it must be running in the current room
654        for target_id in &target_ids {
655            if !self
656                .stream_states
657                .get(target_id)
658                .is_some_and(|(room, status)| *room == ctx.room && status.is_running())
659            {
660                return Err(RecordingError::InvalidStreamingId.into());
661            }
662        }
663
664        let (recorder, _) = find_recorder(ctx)
665            .context("Failed to find recorder in conference despite having active streams")?;
666
667        ctx.send_ws_message(
668            [*recorder],
669            RecordingEvent::Service {
670                event: RecordingServiceCommand::PauseStreams { target_ids },
671            },
672        )?;
673
674        Ok(())
675    }
676
677    fn stop_stream(
678        &mut self,
679        ctx: &mut ModuleContext<'_, RecordingModule>,
680        participant_id: ParticipantId,
681        target_ids: BTreeSet<StreamingTargetId>,
682    ) -> Result<(), SignalingModuleError<RecordingError>> {
683        if !ctx.is_moderator(participant_id) {
684            return Err(RecordingError::InsufficientPermissions.into());
685        }
686
687        // Validate target ids, must reference a streaming target & it must be running
688        for target_id in &target_ids {
689            if !self
690                .stream_states
691                .get(target_id)
692                .is_some_and(|(room, status)| *room == ctx.room && status.is_running())
693            {
694                return Err(RecordingError::InvalidStreamingId.into());
695            }
696        }
697
698        let (recorder, _) = find_recorder(ctx)
699            .context("Failed to find recorder in conference despite having active streams")?;
700
701        ctx.send_ws_message(
702            [*recorder],
703            RecordingEvent::Service {
704                event: RecordingServiceCommand::StopStreams { target_ids },
705            },
706        )?;
707
708        Ok(())
709    }
710
711    fn handle_service_message(
712        &mut self,
713        ctx: &mut ModuleContext<'_, RecordingModule>,
714        participant_id: ParticipantId,
715        command: RecordingServiceEvent,
716    ) -> Result<(), SignalingModuleError<RecordingError>> {
717        if !ctx
718            .participant_state(participant_id)
719            .is_some_and(|state| state.kind == ClientKind::Recorder { room: ctx.room })
720        {
721            return Err(RecordingError::InsufficientPermissions.into());
722        }
723
724        match command {
725            RecordingServiceEvent::RecordingUpdated(status) => {
726                let Some(state) = self.recording_states.get_mut(&ctx.room) else {
727                    return Err(RecordingError::InvalidStreamingId.into());
728                };
729
730                *state = status.clone();
731
732                ctx.send_ws_message(
733                    ctx.participants.in_room(ctx.room).connected().ids(),
734                    RecordingEvent::RecordingUpdated(status),
735                )?;
736
737                Ok(())
738            }
739            RecordingServiceEvent::StreamUpdated { target_id, status } => {
740                let Some((room, current_status)) = self.stream_states.get_mut(&target_id) else {
741                    return Err(RecordingError::InvalidStreamingId.into());
742                };
743
744                if *room != ctx.room {
745                    return Err(RecordingError::InvalidStreamingId.into());
746                }
747
748                ctx.send_ws_message(
749                    ctx.participants.in_room(ctx.room).connected().ids(),
750                    RecordingEvent::StreamUpdated {
751                        target_id,
752                        status: status.clone(),
753                    },
754                )?;
755
756                // Participants in other rooms must be updated if the StreamingTarget is now in use
757                if current_status.can_be_started() != status.can_be_started() {
758                    let status = if status.can_be_started() {
759                        StreamStatus::Inactive
760                    } else {
761                        StreamStatus::InUse
762                    };
763
764                    ctx.send_ws_message(
765                        ctx.participants
766                            .connected()
767                            .iter()
768                            .filter(|(_, state)| state.room != ctx.room)
769                            .map(|(id, _)| *id),
770                        RecordingEvent::StreamUpdated { target_id, status },
771                    )?;
772                }
773
774                *current_status = status;
775
776                Ok(())
777            }
778        }
779    }
780
781    /// Resets the recording state & stream states back to inactive if no recorder is present in the
782    /// room
783    fn reset_states_if_no_recorder_is_connected(
784        &mut self,
785        ctx: &mut ModuleContext<'_, RecordingModule>,
786    ) -> Result<(), SignalingModuleError<RecordingError>> {
787        if find_recorder(ctx).is_some() {
788            return Ok(());
789        }
790
791        if self
792            .recording_states
793            .remove(&ctx.room)
794            .is_some_and(|state| state != RecordingStatus::Inactive)
795        {
796            ctx.send_ws_message(
797                ctx.participants.in_room(ctx.room).connected().ids(),
798                RecordingEvent::RecordingUpdated(RecordingStatus::Inactive),
799            )?;
800        }
801
802        // Find all stream targets to reset to inactive
803        let target_ids: Vec<_> = self
804            .stream_states
805            .iter()
806            .filter(|(_, (room, status))| *room == ctx.room && *status != StreamStatus::Inactive)
807            .map(|(target_id, _)| *target_id)
808            .collect();
809
810        for target_id in target_ids {
811            self.stream_states.remove(&target_id);
812
813            ctx.send_ws_message(
814                ctx.participants.in_room(ctx.room).connected().ids(),
815                RecordingEvent::StreamUpdated {
816                    target_id,
817                    status: StreamStatus::Inactive,
818                },
819            )?;
820        }
821
822        Ok(())
823    }
824}
825
826fn find_recorder<'a>(
827    ctx: &'a ModuleContext<'a, RecordingModule>,
828) -> Option<(&'a ParticipantId, &'a ParticipantState)> {
829    ctx.participants.in_room(ctx.room).iter().find(|(_, p)| {
830        p.is_connected() && matches!(p.kind, ClientKind::Recorder { room } if room == ctx.room)
831    })
832}