// File: opentalk_roomserver_module_livekit/lib.rs

1// SPDX-License-Identifier: EUPL-1.2
2// SPDX-FileCopyrightText: OpenTalk Team <mail@opentalk.eu>
3
4use std::{
5    collections::{BTreeMap, BTreeSet, HashMap},
6    sync::Arc,
7    time::Duration,
8};
9
10use anyhow::anyhow;
11use futures::{StreamExt as _, stream};
12use livekit_api::services::room::RoomClient;
13use livekit_protocol::TrackSource;
14use opentalk_roomserver_livekit_proxy::{ShutdownSender, build_livekit_rtc_url, proxy_websocket};
15use opentalk_roomserver_signaling::{
16    module_context::{ChannelDroppedError, ModuleContext},
17    signaling_module::{
18        ModuleJoinData, ModuleSwitchData, SignalingModule, SignalingModuleDescription,
19        SignalingModuleFeatureDescription, SignalingModuleInitData,
20    },
21};
22use opentalk_roomserver_types::{
23    breakout::BreakoutRoom, connection_id::ConnectionId, room_kind::RoomKind,
24    signaling::module_error::SignalingModuleError,
25};
26use opentalk_roomserver_types_livekit::{
27    LiveKitCommand, LiveKitError, LiveKitEvent, LiveKitInternal, LiveKitSettings, LiveKitState,
28    MicrophoneRestrictionError, MicrophoneRestrictionState, ParticipantsMuted,
29};
30use opentalk_roomserver_web_api::livekit_proxy::{
31    LiveKitProxyTarget, WebsocketRequest, WebsocketResponse,
32};
33use opentalk_types_common::modules::{ModuleId, module_id};
34use opentalk_types_signaling::ParticipantId;
35use tokio::sync::oneshot;
36
37use crate::{
38    loopback::LiveKitLoopback,
39    room::{LiveKitConnection, LiveKitSubroom},
40};
41
42pub mod loopback;
43mod room;
44
45const LIVEKIT_MODULE_ID: ModuleId = module_id!("livekit");
46
47const PARALLEL_UPDATES: usize = 25;
48const ACCESS_TOKEN_TTL: Duration = Duration::from_secs(32);
49const LIVEKIT_MEDIA_SOURCES: [TrackSource; 4] = [
50    TrackSource::Camera,
51    TrackSource::Microphone,
52    TrackSource::ScreenShare,
53    TrackSource::ScreenShareAudio,
54];
55
56pub struct LiveKitModule {
57    settings: Arc<LiveKitSettings>,
58
59    /// The default screenshare permission. If the moderator didn't explicitly set a policy,
60    /// this will be used to grant or deny screensharing privileges.
61    ///
62    /// `True` - all participants are allowed to screenshare
63    /// `False` - only moderators are allowed to screenshare
64    default_screenshare_permission: bool,
65
66    /// LiveKit API client used to communicate with the LiveKit server
67    livekit_client: Arc<RoomClient>,
68
69    rooms: HashMap<RoomKind, LiveKitSubroom>,
70    proxy_shutdown: HashMap<(ParticipantId, ConnectionId), ShutdownSender>,
71}
72
73impl SignalingModuleDescription for LiveKitModule {
74    const MODULE_ID: ModuleId = LIVEKIT_MODULE_ID;
75    const DESCRIPTION: &'static str = "Handles Livekit media streams coordination and integration";
76    const FEATURES: &[SignalingModuleFeatureDescription] = &[];
77}
78
79impl SignalingModule for LiveKitModule {
80    const NAMESPACE: ModuleId = LIVEKIT_MODULE_ID;
81
82    type Incoming = LiveKitCommand;
83
84    type Outgoing = LiveKitEvent;
85
86    type Internal = LiveKitInternal;
87
88    type Loopback = Result<LiveKitLoopback, LiveKitError>;
89
90    type JoinInfo = LiveKitState;
91
92    type PeerJoinInfo = ();
93
94    type Error = LiveKitError;
95
96    fn init(init_data: SignalingModuleInitData) -> Option<Self> {
97        let livekit_settings = (init_data
98            .room_parameters
99            .module_settings
100            .get::<LiveKitSettings>()
101            .ok()?)?;
102
103        let default_screenshare_permission = init_data
104            .settings
105            .defaults
106            .as_ref()
107            .is_some_and(|d| !d.screen_share_requires_permission);
108
109        let livekit_client = RoomClient::with_api_key(
110            &livekit_settings.service_url,
111            &livekit_settings.api_key,
112            &livekit_settings.api_secret,
113        );
114
115        Some(Self {
116            settings: Arc::new(livekit_settings.clone()),
117            default_screenshare_permission,
118
119            livekit_client: Arc::new(livekit_client),
120
121            rooms: HashMap::new(),
122            proxy_shutdown: HashMap::new(),
123        })
124    }
125
126    fn on_participant_joined(
127        &mut self,
128        ctx: &mut ModuleContext<'_, Self>,
129        participant_id: ParticipantId,
130        connection_id: ConnectionId,
131        _is_first_connection: bool,
132    ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
133        let room = self.rooms.entry(ctx.room).or_insert_with(|| {
134            LiveKitSubroom::new(
135                ctx,
136                self.default_screenshare_permission,
137                Arc::clone(&self.settings),
138                Arc::clone(&self.livekit_client),
139                ctx.room_id,
140                ctx.room,
141            )
142        });
143
144        room.join_info(ctx, participant_id, connection_id)
145    }
146
147    fn on_participant_disconnected(
148        &mut self,
149        ctx: &mut ModuleContext<'_, Self>,
150        participant_id: ParticipantId,
151        connection_id: ConnectionId,
152    ) -> Result<(), SignalingModuleError<Self::Error>> {
153        self.cancel_proxies_for_connection(participant_id, connection_id);
154
155        let Some(room) = self.rooms.get_mut(&ctx.room) else {
156            return Err(anyhow::anyhow!("Unknown room").into());
157        };
158        room.start_revoke_participant_access(ctx, participant_id, connection_id);
159        Ok(())
160    }
161
162    fn on_websocket_message(
163        &mut self,
164        ctx: &mut ModuleContext<'_, Self>,
165        sender: ParticipantId,
166        connection_id: ConnectionId,
167        payload: Self::Incoming,
168    ) -> Result<(), SignalingModuleError<Self::Error>> {
169        match payload {
170            LiveKitCommand::CreateNewAccessToken => {
171                self.issue_access_token(ctx, sender, connection_id)
172            }
173            LiveKitCommand::GrantScreenSharePermission { participants } => {
174                self.set_screenshare_permissions(ctx, sender, participants, true)
175            }
176            LiveKitCommand::RevokeScreenSharePermission { participants } => {
177                self.set_screenshare_permissions(ctx, sender, participants, false)
178            }
179            LiveKitCommand::RequestPopoutStreamAccessToken => {
180                self.issue_popout_stream_access_token(ctx, sender, connection_id)
181            }
182        }
183    }
184
185    fn on_loopback_event(
186        &mut self,
187        ctx: &mut ModuleContext<'_, Self>,
188        event: Self::Loopback,
189    ) -> Result<(), SignalingModuleError<Self::Error>> {
190        match event? {
191            LiveKitLoopback::RoomCreated
192            | LiveKitLoopback::RoomRemoved
193            | LiveKitLoopback::ProxySocketClosed => Ok(()),
194            LiveKitLoopback::NoteRevokedTokens {
195                token_identities,
196                participant_id,
197                connection_id,
198            } => self.note_revoked_tokens(ctx, token_identities, participant_id, connection_id),
199            LiveKitLoopback::ScreenSharePermissionsUpdated {
200                sender,
201                participants,
202                grant,
203            } => Self::notify_screen_share_permission_update(ctx, sender, participants, grant),
204        }
205    }
206
207    fn on_internal_command(
208        &mut self,
209        ctx: &mut ModuleContext<'_, Self>,
210        command: Self::Internal,
211    ) -> Result<(), SignalingModuleError<Self::Error>> {
212        match command {
213            LiveKitInternal::Mute {
214                sender,
215                participants,
216                return_channel,
217            } => self.mute(ctx, sender, participants, return_channel),
218            LiveKitInternal::UpdateMicrophoneRestrictions {
219                sender,
220                new_state,
221                return_channel,
222            } => self.update_microphone_restrictions(ctx, sender, new_state, return_channel)?,
223            LiveKitInternal::ProxyLivekitSocket {
224                websocket_request,
225                return_channel,
226            } => self.proxy_livekit_socket(ctx, *websocket_request, return_channel),
227            LiveKitInternal::GetLivekitServiceUrl { return_channel } => {
228                self.get_livekit_service_url(return_channel)
229            }
230        }
231        Ok(())
232    }
233
234    fn on_closing(&mut self, ctx: &mut ModuleContext<'_, Self>) -> Result<(), anyhow::Error> {
235        self.proxy_shutdown.clear();
236
237        let rooms = self.rooms.drain().collect();
238        Self::cleanup_rooms(ctx, rooms);
239
240        Ok(())
241    }
242
243    fn on_breakout_start(
244        &mut self,
245        ctx: &mut ModuleContext<'_, Self>,
246        rooms: &[BreakoutRoom],
247        _duration: Option<Duration>,
248    ) -> Result<(), SignalingModuleError<Self::Error>> {
249        for room in rooms {
250            self.rooms
251                .entry(RoomKind::Breakout(room.id))
252                .or_insert_with(|| {
253                    let room_kind = RoomKind::Breakout(room.id);
254                    tracing::debug!("create room: {:?}", room_kind);
255                    LiveKitSubroom::new(
256                        ctx,
257                        self.default_screenshare_permission,
258                        self.settings.clone(),
259                        Arc::clone(&self.livekit_client),
260                        ctx.room_id,
261                        room_kind,
262                    )
263                });
264        }
265        Ok(())
266    }
267
268    fn on_breakout_switch(
269        &mut self,
270        ctx: &mut ModuleContext<'_, Self>,
271        participant_id: ParticipantId,
272        old_room: RoomKind,
273        new_room: RoomKind,
274    ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
275        let connections = ctx.participants.connections();
276        let connections = connections.get(&participant_id).ok_or_else(|| {
277            anyhow::anyhow!("Unknown participant can't switch breakout rooms {participant_id}")
278        })?;
279
280        for connection_id in connections {
281            self.cancel_proxies_for_connection(participant_id, *connection_id);
282        }
283
284        let Some(room) = self.rooms.get_mut(&old_room) else {
285            return Err(anyhow::anyhow!(
286                "Source room not found while switching breakout rooms ({old_room:?})"
287            )
288            .into());
289        };
290        for connection_id in connections {
291            room.start_revoke_participant_access(ctx, participant_id, *connection_id);
292        }
293
294        let Some(room) = self.rooms.get_mut(&new_room) else {
295            return Err(anyhow::anyhow!(
296                "Destination room not found while switching breakout rooms ({new_room:?})"
297            )
298            .into());
299        };
300        let mut switch_success = BTreeMap::new();
301        for &connection_id in connections {
302            let join_info = room
303                .join_info(ctx, participant_id, connection_id)?
304                .join_success;
305            switch_success.insert(connection_id, join_info);
306        }
307        Ok(ModuleSwitchData {
308            switch_success,
309            ..Default::default()
310        })
311    }
312
313    fn on_breakout_closed(
314        &mut self,
315        ctx: &mut ModuleContext<'_, Self>,
316    ) -> Result<(), SignalingModuleError<Self::Error>> {
317        let breakout_rooms = self
318            .rooms
319            .extract_if(|kind, _| *kind != RoomKind::Main)
320            .collect();
321        Self::cleanup_rooms(ctx, breakout_rooms);
322
323        Ok(())
324    }
325}
326
327impl LiveKitModule {
328    fn proxy_livekit_socket(
329        &mut self,
330        ctx: &mut ModuleContext<'_, Self>,
331        websocket_request: WebsocketRequest,
332        response_sender: oneshot::Sender<WebsocketResponse>,
333    ) {
334        if !Self::is_proxy_request_authorized(ctx, &websocket_request) {
335            let _ = response_sender.send(WebsocketResponse::unauthorized());
336            return;
337        }
338
339        let participant_id = websocket_request.participant_id;
340        let connection_id = websocket_request.connection_id;
341        let LiveKitProxyTarget::LiveKit { room_kind } = websocket_request.proxy_target else {
342            let _ = response_sender.send(WebsocketResponse::unauthorized());
343            return;
344        };
345        let access_token = websocket_request.access_token.clone();
346        let Ok(livekit_rtc_url) = build_livekit_rtc_url(&self.settings.service_url) else {
347            tracing::warn!(?self.settings.service_url, "invalid livekit service URL");
348            let _ = response_sender.send(WebsocketResponse::internal_error());
349            return;
350        };
351
352        // Channel used to shutdown the proxy task. This happens when the participant leaves the
353        // meeting
354        let (shutdown_tx, shutdown_rx) = ShutdownSender::new();
355        let shutdown_key = (participant_id, connection_id);
356        self.proxy_shutdown.insert(shutdown_key, shutdown_tx);
357
358        // Channel to pass the socket from the upgrade callback to the loopback task
359        let (socket_tx, socket_rx) = oneshot::channel();
360        let response = websocket_request.ws_upgrade(move |socket| async move {
361            let _ = socket_tx.send(socket);
362        });
363
364        if response_sender.send(response).is_err() {
365            self.proxy_shutdown.remove(&shutdown_key);
366            return;
367        }
368
369        ctx.spawn(async move {
370            let Ok(downstream_socket) = socket_rx.await else {
371                return Ok(LiveKitLoopback::ProxySocketClosed);
372            };
373
374            if let Err(err) = proxy_websocket(
375                livekit_rtc_url,
376                access_token,
377                downstream_socket,
378                shutdown_rx,
379            )
380            .await
381            {
382                tracing::warn!(
383                    ?participant_id,
384                    ?connection_id,
385                    ?room_kind,
386                    "livekit websocket proxy stopped with error: {err:?}"
387                );
388            }
389
390            Ok(LiveKitLoopback::ProxySocketClosed)
391        });
392    }
393
394    fn is_proxy_request_authorized(
395        ctx: &ModuleContext<'_, Self>,
396        websocket_request: &WebsocketRequest,
397    ) -> bool {
398        let LiveKitProxyTarget::LiveKit { room_kind } = &websocket_request.proxy_target else {
399            return false;
400        };
401
402        ctx.participant_state(websocket_request.participant_id)
403            .is_some_and(|participant| {
404                !participant.in_waiting_room
405                    && participant.room == *room_kind
406                    && participant
407                        .connections
408                        .contains_key(&websocket_request.connection_id)
409            })
410    }
411
412    fn cancel_proxies_for_connection(
413        &mut self,
414        participant_id: ParticipantId,
415        connection_id: ConnectionId,
416    ) {
417        self.proxy_shutdown.remove(&(participant_id, connection_id));
418    }
419
420    fn get_livekit_service_url(&self, return_channel: oneshot::Sender<String>) {
421        let _ = return_channel.send(self.settings.service_url.clone());
422    }
423    /// creates a new access token and sends it to the participant
424    fn issue_access_token(
425        &mut self,
426        ctx: &mut ModuleContext<'_, LiveKitModule>,
427        participant: ParticipantId,
428        connection: ConnectionId,
429    ) -> Result<(), SignalingModuleError<LiveKitError>> {
430        let Some(room) = self.rooms.get_mut(&ctx.room) else {
431            return Err(anyhow::anyhow!("Unknown room").into());
432        };
433        tracing::debug!("Issue access token to {participant:?}");
434        let credentials = room.create_new_access_token(ctx, participant, connection)?;
435        ctx.send_ws_message([participant], LiveKitEvent::Credentials(credentials))?;
436        Ok(())
437    }
438
439    #[tracing::instrument(level = "debug", skip(self, ctx, return_channel))]
440    pub fn mute(
441        &self,
442        ctx: &mut ModuleContext<'_, LiveKitModule>,
443        sender: Option<ParticipantId>,
444        participants: BTreeSet<ParticipantId>,
445        return_channel: oneshot::Sender<ParticipantsMuted>,
446    ) {
447        let connections = ctx
448            .participants
449            .all_unfiltered
450            .iter()
451            .filter(|(participant_id, _)| participants.contains(participant_id))
452            .flat_map(|(participant_id, state)| {
453                state.connections().map(|connection_id| {
454                    LiveKitConnection::new(*participant_id, connection_id, ctx.room_id, state.room)
455                })
456            })
457            .collect();
458
459        tracing::debug!("spawn background task to mute participants");
460        let livekit_client = Arc::clone(&self.livekit_client);
461        ctx.spawn_optional(async move {
462            let muted = loopback::mute_participants(livekit_client, sender, connections).await;
463            if return_channel.send(muted).is_err() {
464                tracing::error!("Channel dropped when muting participants");
465            }
466            None
467        });
468    }
469
470    fn note_revoked_tokens(
471        &mut self,
472        ctx: &mut ModuleContext<'_, LiveKitModule>,
473        revoked_token_identities: BTreeSet<String>,
474        participant_id: ParticipantId,
475        connection_id: ConnectionId,
476    ) -> Result<(), SignalingModuleError<LiveKitError>> {
477        let Some(room) = self.rooms.get_mut(&ctx.room) else {
478            return Err(anyhow::anyhow!("Unknown room").into());
479        };
480        room.note_revoked_tokens(revoked_token_identities, participant_id, connection_id)
481    }
482
483    fn set_screenshare_permissions(
484        &mut self,
485        ctx: &mut ModuleContext<'_, LiveKitModule>,
486        sender: ParticipantId,
487        participants: BTreeSet<ParticipantId>,
488        grant: bool,
489    ) -> Result<(), SignalingModuleError<LiveKitError>> {
490        let Some(room) = self.rooms.get_mut(&ctx.room) else {
491            return Err(anyhow::anyhow!("Unknown room").into());
492        };
493        room.set_screenshare_permissions(ctx, sender, participants, grant)
494    }
495
496    fn notify_screen_share_permission_update(
497        ctx: &mut ModuleContext<'_, LiveKitModule>,
498        sender: ParticipantId,
499        participants: BTreeSet<ParticipantId>,
500        grant: bool,
501    ) -> Result<(), SignalingModuleError<LiveKitError>> {
502        ctx.send_ws_message(
503            [sender],
504            LiveKitEvent::ScreenSharePermissionsUpdated {
505                grant,
506                participants,
507            },
508        )?;
509        Ok(())
510    }
511
512    fn issue_popout_stream_access_token(
513        &mut self,
514        ctx: &mut ModuleContext<'_, LiveKitModule>,
515        participant_id: ParticipantId,
516        connection_id: ConnectionId,
517    ) -> Result<(), SignalingModuleError<LiveKitError>> {
518        let Some(room) = self.rooms.get_mut(&ctx.room) else {
519            return Err(anyhow::anyhow!("Unknown room").into());
520        };
521        room.issue_popout_stream_access_token(ctx, participant_id, connection_id)
522    }
523
524    fn update_microphone_restrictions(
525        &mut self,
526        ctx: &mut ModuleContext<'_, Self>,
527        sender: ParticipantId,
528        new_state: MicrophoneRestrictionState,
529        return_channel: oneshot::Sender<
530            Result<MicrophoneRestrictionState, MicrophoneRestrictionError>,
531        >,
532    ) -> Result<(), SignalingModuleError<LiveKitError>> {
533        let Some(room) = self.rooms.get_mut(&ctx.room) else {
534            return Err(anyhow::anyhow!("Unknown room").into());
535        };
536        room.update_microphone_restrictions(ctx, sender, new_state, return_channel)
537            .map_err(|ChannelDroppedError| {
538                SignalingModuleError::Internal(anyhow!(
539                    "Channel dropped when restricting microphone permissions"
540                ))
541            })
542    }
543
544    fn cleanup_rooms(ctx: &mut ModuleContext<'_, Self>, rooms: HashMap<RoomKind, LiveKitSubroom>) {
545        ctx.spawn(async {
546            stream::iter(rooms.into_iter().map(|(id, r)| async move {
547                r.cleanup_room().await;
548                tracing::debug!("LiveKitRoom removed: {id:?}");
549            }))
550            .buffer_unordered(PARALLEL_UPDATES)
551            .collect::<Vec<()>>()
552            .await;
553
554            Ok(LiveKitLoopback::RoomRemoved)
555        });
556    }
557}
558
559fn build_livekit_participant_id(participant: ParticipantId, connection: ConnectionId) -> String {
560    format!("{participant}:{connection}")
561}