opentalk_roomserver_module_livekit/loopback/
mod.rs1use std::collections::BTreeSet;
6
7use futures::{StreamExt as _, stream};
8use livekit_api::services::room::{RoomClient, UpdateParticipantOptions};
9use livekit_protocol::{ParticipantInfo, ParticipantPermission, TrackSource};
10use opentalk_roomserver_types::connection_id::ConnectionId;
11use opentalk_types_signaling::ParticipantId;
12
13pub use crate::loopback::{
14 create_room::create_room, microphone_restriction::update_restricted_microphones,
15 mute::mute_participants, revoke_token::revoke_token,
16 screen_share_permissions::set_screenshare_permissions,
17};
18use crate::{LIVEKIT_MEDIA_SOURCES, PARALLEL_UPDATES};
19
20mod create_room;
21mod microphone_restriction;
22mod mute;
23mod revoke_token;
24mod screen_share_permissions;
25
26pub enum LiveKitLoopback {
27 RoomCreated,
28 RoomRemoved,
29 ProxySocketClosed,
30
31 NoteRevokedTokens {
33 participant_id: ParticipantId,
34 connection_id: ConnectionId,
35 token_identities: BTreeSet<String>,
36 },
37
38 ScreenSharePermissionsUpdated {
39 sender: ParticipantId,
40 participants: BTreeSet<ParticipantId>,
41 grant: bool,
42 },
43}
44
45async fn update_participants_permission(
49 livekit_client: &RoomClient,
50 participants: Vec<ParticipantInfo>,
51 source_numbers: &[i32],
52 grant: bool,
53 room: &str,
54) {
55 stream::iter(participants)
56 .map(|participant| {
57 update_single_participant_permission(
58 livekit_client,
59 participant,
60 source_numbers,
61 grant,
62 room,
63 )
64 })
65 .buffer_unordered(PARALLEL_UPDATES)
66 .collect::<Vec<_>>()
67 .await;
68}
69
70#[tracing::instrument(skip(livekit_client, participant), level = "debug", fields(livekit_participant_id=participant.identity))]
71async fn update_single_participant_permission(
72 livekit_client: &RoomClient,
73 participant: ParticipantInfo,
74 source_numbers: &[i32],
75 grant: bool,
76 room: &str,
77) {
78 let mut can_publish_sources = participant.permission.map_or_else(
79 || {
80 LIVEKIT_MEDIA_SOURCES
81 .map(|s: TrackSource| s as i32)
82 .to_vec()
83 },
84 |p| p.can_publish_sources,
85 );
86
87 for source_number in source_numbers {
88 update_publish_sources(&mut can_publish_sources, *source_number, grant);
89 }
90
91 if let Err(e) = livekit_client
92 .update_participant(
93 room,
94 &participant.identity,
95 UpdateParticipantOptions {
96 permission: Some(ParticipantPermission {
97 can_subscribe: true,
98 can_publish: true,
99 can_publish_data: false,
100 can_publish_sources,
101 hidden: false,
102 can_update_metadata: false,
103 ..Default::default()
104 }),
105 ..Default::default()
106 },
107 )
108 .await
109 {
110 tracing::error!(
111 livekit.participant = participant.identity,
112 room = room,
113 "Failed to update participant, {e}",
114 );
115 }
116 tracing::trace!("participant permissions updated");
117}
118
/// Adds `source` to, or removes it from, the list of publishable sources.
///
/// * `grant == true`: `source` is appended unless the list already contains it, so granting
///   never introduces duplicates.
/// * `grant == false`: every occurrence of `source` is removed; the relative order of the
///   remaining entries is kept.
fn update_publish_sources(can_publish_sources: &mut Vec<i32>, source: i32, grant: bool) {
    let already_listed = can_publish_sources.contains(&source);

    match (grant, already_listed) {
        // Granting a source that is not listed yet.
        (true, false) => can_publish_sources.push(source),
        // Revoking a source that is listed, possibly more than once.
        (false, true) => can_publish_sources.retain(|&entry| entry != source),
        // Granting a listed source or revoking an unlisted one changes nothing.
        _ => {}
    }
}