1use std::{
5 collections::{BTreeMap, BTreeSet, HashMap},
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::anyhow;
11use futures::{StreamExt as _, stream};
12use livekit_api::services::room::RoomClient;
13use livekit_protocol::TrackSource;
14use opentalk_roomserver_livekit_proxy::{ShutdownSender, build_livekit_rtc_url, proxy_websocket};
15use opentalk_roomserver_signaling::{
16 module_context::{ChannelDroppedError, ModuleContext},
17 signaling_module::{
18 ModuleJoinData, ModuleSwitchData, SignalingModule, SignalingModuleDescription,
19 SignalingModuleFeatureDescription, SignalingModuleInitData,
20 },
21};
22use opentalk_roomserver_types::{
23 breakout::BreakoutRoom, connection_id::ConnectionId, room_kind::RoomKind,
24 signaling::module_error::SignalingModuleError,
25};
26use opentalk_roomserver_types_livekit::{
27 LiveKitCommand, LiveKitError, LiveKitEvent, LiveKitInternal, LiveKitSettings, LiveKitState,
28 MicrophoneRestrictionError, MicrophoneRestrictionState, ParticipantsMuted,
29};
30use opentalk_roomserver_web_api::livekit_proxy::{
31 LiveKitProxyTarget, WebsocketRequest, WebsocketResponse,
32};
33use opentalk_types_common::modules::{ModuleId, module_id};
34use opentalk_types_signaling::ParticipantId;
35use tokio::sync::oneshot;
36
37use crate::{
38 loopback::LiveKitLoopback,
39 room::{LiveKitConnection, LiveKitSubroom},
40};
41
42pub mod loopback;
43mod room;
44
/// Identifier of this signaling module; used for both `MODULE_ID` and `NAMESPACE` below.
const LIVEKIT_MODULE_ID: ModuleId = module_id!("livekit");

/// Maximum number of room cleanup futures driven concurrently in `cleanup_rooms`.
const PARALLEL_UPDATES: usize = 25;
/// Time-to-live for access tokens (32 seconds).
///
/// Not referenced in this file; presumably consumed by the `room`/`loopback` submodules —
/// TODO confirm.
const ACCESS_TOKEN_TTL: Duration = Duration::from_secs(32);
/// The four LiveKit track sources (camera, microphone, screen share and its audio).
///
/// Not referenced in this file; presumably consumed by the `room`/`loopback` submodules —
/// TODO confirm.
const LIVEKIT_MEDIA_SOURCES: [TrackSource; 4] = [
    TrackSource::Camera,
    TrackSource::Microphone,
    TrackSource::ScreenShare,
    TrackSource::ScreenShareAudio,
];
55
/// State of the LiveKit signaling module.
pub struct LiveKitModule {
    /// LiveKit settings read from the room's module settings in `init`.
    settings: Arc<LiveKitSettings>,

    /// Screen share permission handed to every newly created `LiveKitSubroom`.
    ///
    /// `init` sets this to `true` only when the configured defaults are present and do not
    /// require a permission for screen sharing.
    default_screenshare_permission: bool,

    /// LiveKit room service client, built in `init` from the service URL and API credentials.
    livekit_client: Arc<RoomClient>,

    /// LiveKit state per room kind (main room and breakout rooms).
    rooms: HashMap<RoomKind, LiveKitSubroom>,
    /// Shutdown handles of websocket proxies, keyed by participant and connection.
    ///
    /// Entries are removed (and thereby dropped) to cancel a proxy.
    proxy_shutdown: HashMap<(ParticipantId, ConnectionId), ShutdownSender>,
}
72
/// Static description of the LiveKit module: its id, a human readable summary and its
/// (currently empty) feature list.
impl SignalingModuleDescription for LiveKitModule {
    const MODULE_ID: ModuleId = LIVEKIT_MODULE_ID;
    const DESCRIPTION: &'static str = "Handles Livekit media streams coordination and integration";
    // The module declares no optional features.
    const FEATURES: &[SignalingModuleFeatureDescription] = &[];
}
78
79impl SignalingModule for LiveKitModule {
80 const NAMESPACE: ModuleId = LIVEKIT_MODULE_ID;
81
82 type Incoming = LiveKitCommand;
83
84 type Outgoing = LiveKitEvent;
85
86 type Internal = LiveKitInternal;
87
88 type Loopback = Result<LiveKitLoopback, LiveKitError>;
89
90 type JoinInfo = LiveKitState;
91
92 type PeerJoinInfo = ();
93
94 type Error = LiveKitError;
95
96 fn init(init_data: SignalingModuleInitData) -> Option<Self> {
97 let livekit_settings = (init_data
98 .room_parameters
99 .module_settings
100 .get::<LiveKitSettings>()
101 .ok()?)?;
102
103 let default_screenshare_permission = init_data
104 .settings
105 .defaults
106 .as_ref()
107 .is_some_and(|d| !d.screen_share_requires_permission);
108
109 let livekit_client = RoomClient::with_api_key(
110 &livekit_settings.service_url,
111 &livekit_settings.api_key,
112 &livekit_settings.api_secret,
113 );
114
115 Some(Self {
116 settings: Arc::new(livekit_settings.clone()),
117 default_screenshare_permission,
118
119 livekit_client: Arc::new(livekit_client),
120
121 rooms: HashMap::new(),
122 proxy_shutdown: HashMap::new(),
123 })
124 }
125
126 fn on_participant_joined(
127 &mut self,
128 ctx: &mut ModuleContext<'_, Self>,
129 participant_id: ParticipantId,
130 connection_id: ConnectionId,
131 _is_first_connection: bool,
132 ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
133 let room = self.rooms.entry(ctx.room).or_insert_with(|| {
134 LiveKitSubroom::new(
135 ctx,
136 self.default_screenshare_permission,
137 Arc::clone(&self.settings),
138 Arc::clone(&self.livekit_client),
139 ctx.room_id,
140 ctx.room,
141 )
142 });
143
144 room.join_info(ctx, participant_id, connection_id)
145 }
146
147 fn on_participant_disconnected(
148 &mut self,
149 ctx: &mut ModuleContext<'_, Self>,
150 participant_id: ParticipantId,
151 connection_id: ConnectionId,
152 ) -> Result<(), SignalingModuleError<Self::Error>> {
153 self.cancel_proxies_for_connection(participant_id, connection_id);
154
155 let Some(room) = self.rooms.get_mut(&ctx.room) else {
156 return Err(anyhow::anyhow!("Unknown room").into());
157 };
158 room.start_revoke_participant_access(ctx, participant_id, connection_id);
159 Ok(())
160 }
161
162 fn on_websocket_message(
163 &mut self,
164 ctx: &mut ModuleContext<'_, Self>,
165 sender: ParticipantId,
166 connection_id: ConnectionId,
167 payload: Self::Incoming,
168 ) -> Result<(), SignalingModuleError<Self::Error>> {
169 match payload {
170 LiveKitCommand::CreateNewAccessToken => {
171 self.issue_access_token(ctx, sender, connection_id)
172 }
173 LiveKitCommand::GrantScreenSharePermission { participants } => {
174 self.set_screenshare_permissions(ctx, sender, participants, true)
175 }
176 LiveKitCommand::RevokeScreenSharePermission { participants } => {
177 self.set_screenshare_permissions(ctx, sender, participants, false)
178 }
179 LiveKitCommand::RequestPopoutStreamAccessToken => {
180 self.issue_popout_stream_access_token(ctx, sender, connection_id)
181 }
182 }
183 }
184
185 fn on_loopback_event(
186 &mut self,
187 ctx: &mut ModuleContext<'_, Self>,
188 event: Self::Loopback,
189 ) -> Result<(), SignalingModuleError<Self::Error>> {
190 match event? {
191 LiveKitLoopback::RoomCreated
192 | LiveKitLoopback::RoomRemoved
193 | LiveKitLoopback::ProxySocketClosed => Ok(()),
194 LiveKitLoopback::NoteRevokedTokens {
195 token_identities,
196 participant_id,
197 connection_id,
198 } => self.note_revoked_tokens(ctx, token_identities, participant_id, connection_id),
199 LiveKitLoopback::ScreenSharePermissionsUpdated {
200 sender,
201 participants,
202 grant,
203 } => Self::notify_screen_share_permission_update(ctx, sender, participants, grant),
204 }
205 }
206
207 fn on_internal_command(
208 &mut self,
209 ctx: &mut ModuleContext<'_, Self>,
210 command: Self::Internal,
211 ) -> Result<(), SignalingModuleError<Self::Error>> {
212 match command {
213 LiveKitInternal::Mute {
214 sender,
215 participants,
216 return_channel,
217 } => self.mute(ctx, sender, participants, return_channel),
218 LiveKitInternal::UpdateMicrophoneRestrictions {
219 sender,
220 new_state,
221 return_channel,
222 } => self.update_microphone_restrictions(ctx, sender, new_state, return_channel)?,
223 LiveKitInternal::ProxyLivekitSocket {
224 websocket_request,
225 return_channel,
226 } => self.proxy_livekit_socket(ctx, *websocket_request, return_channel),
227 LiveKitInternal::GetLivekitServiceUrl { return_channel } => {
228 self.get_livekit_service_url(return_channel)
229 }
230 }
231 Ok(())
232 }
233
234 fn on_closing(&mut self, ctx: &mut ModuleContext<'_, Self>) -> Result<(), anyhow::Error> {
235 self.proxy_shutdown.clear();
236
237 let rooms = self.rooms.drain().collect();
238 Self::cleanup_rooms(ctx, rooms);
239
240 Ok(())
241 }
242
243 fn on_breakout_start(
244 &mut self,
245 ctx: &mut ModuleContext<'_, Self>,
246 rooms: &[BreakoutRoom],
247 _duration: Option<Duration>,
248 ) -> Result<(), SignalingModuleError<Self::Error>> {
249 for room in rooms {
250 self.rooms
251 .entry(RoomKind::Breakout(room.id))
252 .or_insert_with(|| {
253 let room_kind = RoomKind::Breakout(room.id);
254 tracing::debug!("create room: {:?}", room_kind);
255 LiveKitSubroom::new(
256 ctx,
257 self.default_screenshare_permission,
258 self.settings.clone(),
259 Arc::clone(&self.livekit_client),
260 ctx.room_id,
261 room_kind,
262 )
263 });
264 }
265 Ok(())
266 }
267
268 fn on_breakout_switch(
269 &mut self,
270 ctx: &mut ModuleContext<'_, Self>,
271 participant_id: ParticipantId,
272 old_room: RoomKind,
273 new_room: RoomKind,
274 ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
275 let connections = ctx.participants.connections();
276 let connections = connections.get(&participant_id).ok_or_else(|| {
277 anyhow::anyhow!("Unknown participant can't switch breakout rooms {participant_id}")
278 })?;
279
280 for connection_id in connections {
281 self.cancel_proxies_for_connection(participant_id, *connection_id);
282 }
283
284 let Some(room) = self.rooms.get_mut(&old_room) else {
285 return Err(anyhow::anyhow!(
286 "Source room not found while switching breakout rooms ({old_room:?})"
287 )
288 .into());
289 };
290 for connection_id in connections {
291 room.start_revoke_participant_access(ctx, participant_id, *connection_id);
292 }
293
294 let Some(room) = self.rooms.get_mut(&new_room) else {
295 return Err(anyhow::anyhow!(
296 "Destination room not found while switching breakout rooms ({new_room:?})"
297 )
298 .into());
299 };
300 let mut switch_success = BTreeMap::new();
301 for &connection_id in connections {
302 let join_info = room
303 .join_info(ctx, participant_id, connection_id)?
304 .join_success;
305 switch_success.insert(connection_id, join_info);
306 }
307 Ok(ModuleSwitchData {
308 switch_success,
309 ..Default::default()
310 })
311 }
312
313 fn on_breakout_closed(
314 &mut self,
315 ctx: &mut ModuleContext<'_, Self>,
316 ) -> Result<(), SignalingModuleError<Self::Error>> {
317 let breakout_rooms = self
318 .rooms
319 .extract_if(|kind, _| *kind != RoomKind::Main)
320 .collect();
321 Self::cleanup_rooms(ctx, breakout_rooms);
322
323 Ok(())
324 }
325}
326
327impl LiveKitModule {
    /// Answers a websocket proxy request and, once the upgrade succeeded, relays the client
    /// socket to the LiveKit RTC endpoint in a background task.
    ///
    /// The response sent through `response_sender` is
    /// - `unauthorized` when the request fails `is_proxy_request_authorized` or does not
    ///   target LiveKit,
    /// - `internal_error` when no RTC URL can be built from the configured service URL,
    /// - the websocket upgrade response otherwise.
    ///
    /// The spawned task always finishes with `LiveKitLoopback::ProxySocketClosed`; proxy
    /// errors are only logged.
    fn proxy_livekit_socket(
        &mut self,
        ctx: &mut ModuleContext<'_, Self>,
        websocket_request: WebsocketRequest,
        response_sender: oneshot::Sender<WebsocketResponse>,
    ) {
        if !Self::is_proxy_request_authorized(ctx, &websocket_request) {
            // A dropped receiver means nobody waits for the answer anymore; nothing to do.
            let _ = response_sender.send(WebsocketResponse::unauthorized());
            return;
        }

        // Copy everything needed later out of the request, `ws_upgrade` consumes it below.
        let participant_id = websocket_request.participant_id;
        let connection_id = websocket_request.connection_id;
        let LiveKitProxyTarget::LiveKit { room_kind } = websocket_request.proxy_target else {
            let _ = response_sender.send(WebsocketResponse::unauthorized());
            return;
        };
        let access_token = websocket_request.access_token.clone();
        let Ok(livekit_rtc_url) = build_livekit_rtc_url(&self.settings.service_url) else {
            tracing::warn!(?self.settings.service_url, "invalid livekit service URL");
            let _ = response_sender.send(WebsocketResponse::internal_error());
            return;
        };

        // Register the shutdown handle for this connection. A handle already stored under the
        // same key is replaced and dropped — presumably stopping the older proxy; TODO confirm
        // against `ShutdownSender`.
        let (shutdown_tx, shutdown_rx) = ShutdownSender::new();
        let shutdown_key = (participant_id, connection_id);
        self.proxy_shutdown.insert(shutdown_key, shutdown_tx);

        // The upgrade callback hands the upgraded socket over to the task spawned below.
        let (socket_tx, socket_rx) = oneshot::channel();
        let response = websocket_request.ws_upgrade(move |socket| async move {
            let _ = socket_tx.send(socket);
        });

        if response_sender.send(response).is_err() {
            // The requester is gone, so no socket will ever arrive: unregister the handle again.
            self.proxy_shutdown.remove(&shutdown_key);
            return;
        }

        ctx.spawn(async move {
            // `socket_tx` dropped without sending: there is no socket to proxy.
            let Ok(downstream_socket) = socket_rx.await else {
                return Ok(LiveKitLoopback::ProxySocketClosed);
            };

            if let Err(err) = proxy_websocket(
                livekit_rtc_url,
                access_token,
                downstream_socket,
                shutdown_rx,
            )
            .await
            {
                tracing::warn!(
                    ?participant_id,
                    ?connection_id,
                    ?room_kind,
                    "livekit websocket proxy stopped with error: {err:?}"
                );
            }

            Ok(LiveKitLoopback::ProxySocketClosed)
        });
    }
393
394 fn is_proxy_request_authorized(
395 ctx: &ModuleContext<'_, Self>,
396 websocket_request: &WebsocketRequest,
397 ) -> bool {
398 let LiveKitProxyTarget::LiveKit { room_kind } = &websocket_request.proxy_target else {
399 return false;
400 };
401
402 ctx.participant_state(websocket_request.participant_id)
403 .is_some_and(|participant| {
404 !participant.in_waiting_room
405 && participant.room == *room_kind
406 && participant
407 .connections
408 .contains_key(&websocket_request.connection_id)
409 })
410 }
411
412 fn cancel_proxies_for_connection(
413 &mut self,
414 participant_id: ParticipantId,
415 connection_id: ConnectionId,
416 ) {
417 self.proxy_shutdown.remove(&(participant_id, connection_id));
418 }
419
420 fn get_livekit_service_url(&self, return_channel: oneshot::Sender<String>) {
421 let _ = return_channel.send(self.settings.service_url.clone());
422 }
423 fn issue_access_token(
425 &mut self,
426 ctx: &mut ModuleContext<'_, LiveKitModule>,
427 participant: ParticipantId,
428 connection: ConnectionId,
429 ) -> Result<(), SignalingModuleError<LiveKitError>> {
430 let Some(room) = self.rooms.get_mut(&ctx.room) else {
431 return Err(anyhow::anyhow!("Unknown room").into());
432 };
433 tracing::debug!("Issue access token to {participant:?}");
434 let credentials = room.create_new_access_token(ctx, participant, connection)?;
435 ctx.send_ws_message([participant], LiveKitEvent::Credentials(credentials))?;
436 Ok(())
437 }
438
439 #[tracing::instrument(level = "debug", skip(self, ctx, return_channel))]
440 pub fn mute(
441 &self,
442 ctx: &mut ModuleContext<'_, LiveKitModule>,
443 sender: Option<ParticipantId>,
444 participants: BTreeSet<ParticipantId>,
445 return_channel: oneshot::Sender<ParticipantsMuted>,
446 ) {
447 let connections = ctx
448 .participants
449 .all_unfiltered
450 .iter()
451 .filter(|(participant_id, _)| participants.contains(participant_id))
452 .flat_map(|(participant_id, state)| {
453 state.connections().map(|connection_id| {
454 LiveKitConnection::new(*participant_id, connection_id, ctx.room_id, state.room)
455 })
456 })
457 .collect();
458
459 tracing::debug!("spawn background task to mute participants");
460 let livekit_client = Arc::clone(&self.livekit_client);
461 ctx.spawn_optional(async move {
462 let muted = loopback::mute_participants(livekit_client, sender, connections).await;
463 if return_channel.send(muted).is_err() {
464 tracing::error!("Channel dropped when muting participants");
465 }
466 None
467 });
468 }
469
470 fn note_revoked_tokens(
471 &mut self,
472 ctx: &mut ModuleContext<'_, LiveKitModule>,
473 revoked_token_identities: BTreeSet<String>,
474 participant_id: ParticipantId,
475 connection_id: ConnectionId,
476 ) -> Result<(), SignalingModuleError<LiveKitError>> {
477 let Some(room) = self.rooms.get_mut(&ctx.room) else {
478 return Err(anyhow::anyhow!("Unknown room").into());
479 };
480 room.note_revoked_tokens(revoked_token_identities, participant_id, connection_id)
481 }
482
483 fn set_screenshare_permissions(
484 &mut self,
485 ctx: &mut ModuleContext<'_, LiveKitModule>,
486 sender: ParticipantId,
487 participants: BTreeSet<ParticipantId>,
488 grant: bool,
489 ) -> Result<(), SignalingModuleError<LiveKitError>> {
490 let Some(room) = self.rooms.get_mut(&ctx.room) else {
491 return Err(anyhow::anyhow!("Unknown room").into());
492 };
493 room.set_screenshare_permissions(ctx, sender, participants, grant)
494 }
495
496 fn notify_screen_share_permission_update(
497 ctx: &mut ModuleContext<'_, LiveKitModule>,
498 sender: ParticipantId,
499 participants: BTreeSet<ParticipantId>,
500 grant: bool,
501 ) -> Result<(), SignalingModuleError<LiveKitError>> {
502 ctx.send_ws_message(
503 [sender],
504 LiveKitEvent::ScreenSharePermissionsUpdated {
505 grant,
506 participants,
507 },
508 )?;
509 Ok(())
510 }
511
512 fn issue_popout_stream_access_token(
513 &mut self,
514 ctx: &mut ModuleContext<'_, LiveKitModule>,
515 participant_id: ParticipantId,
516 connection_id: ConnectionId,
517 ) -> Result<(), SignalingModuleError<LiveKitError>> {
518 let Some(room) = self.rooms.get_mut(&ctx.room) else {
519 return Err(anyhow::anyhow!("Unknown room").into());
520 };
521 room.issue_popout_stream_access_token(ctx, participant_id, connection_id)
522 }
523
524 fn update_microphone_restrictions(
525 &mut self,
526 ctx: &mut ModuleContext<'_, Self>,
527 sender: ParticipantId,
528 new_state: MicrophoneRestrictionState,
529 return_channel: oneshot::Sender<
530 Result<MicrophoneRestrictionState, MicrophoneRestrictionError>,
531 >,
532 ) -> Result<(), SignalingModuleError<LiveKitError>> {
533 let Some(room) = self.rooms.get_mut(&ctx.room) else {
534 return Err(anyhow::anyhow!("Unknown room").into());
535 };
536 room.update_microphone_restrictions(ctx, sender, new_state, return_channel)
537 .map_err(|ChannelDroppedError| {
538 SignalingModuleError::Internal(anyhow!(
539 "Channel dropped when restricting microphone permissions"
540 ))
541 })
542 }
543
544 fn cleanup_rooms(ctx: &mut ModuleContext<'_, Self>, rooms: HashMap<RoomKind, LiveKitSubroom>) {
545 ctx.spawn(async {
546 stream::iter(rooms.into_iter().map(|(id, r)| async move {
547 r.cleanup_room().await;
548 tracing::debug!("LiveKitRoom removed: {id:?}");
549 }))
550 .buffer_unordered(PARALLEL_UPDATES)
551 .collect::<Vec<()>>()
552 .await;
553
554 Ok(LiveKitLoopback::RoomRemoved)
555 });
556 }
557}
558
559fn build_livekit_participant_id(participant: ParticipantId, connection: ConnectionId) -> String {
560 format!("{participant}:{connection}")
561}