Skip to main content

opentalk_roomserver_module_livekit/loopback/
mod.rs

1// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5use std::collections::BTreeSet;
6
7use futures::{StreamExt as _, stream};
8use livekit_api::services::room::{RoomClient, UpdateParticipantOptions};
9use livekit_protocol::{ParticipantInfo, ParticipantPermission, TrackSource};
10use opentalk_roomserver_types::connection_id::ConnectionId;
11use opentalk_types_signaling::ParticipantId;
12
13pub use crate::loopback::{
14    create_room::create_room, microphone_restriction::update_restricted_microphones,
15    mute::mute_participants, revoke_token::revoke_token,
16    screen_share_permissions::set_screenshare_permissions,
17};
18use crate::{LIVEKIT_MEDIA_SOURCES, PARALLEL_UPDATES};
19
20mod create_room;
21mod microphone_restriction;
22mod mute;
23mod revoke_token;
24mod screen_share_permissions;
25
/// Messages carried on the LiveKit module's loopback channel.
///
/// NOTE(review): presumably produced by the helpers re-exported from this
/// module (`create_room`, `revoke_token`, `set_screenshare_permissions`, ...)
/// and fed back into the module once their work has finished — confirm
/// against the module's event handling.
pub enum LiveKitLoopback {
    /// NOTE(review): presumably signals that the LiveKit room has been created — confirm.
    RoomCreated,
    /// NOTE(review): presumably signals that the LiveKit room has been removed — confirm.
    RoomRemoved,
    /// NOTE(review): presumably signals that a proxied socket connection was closed — confirm.
    ProxySocketClosed,

    /// Note that the token identities were removed.
    NoteRevokedTokens {
        /// Participant the revoked token identities are associated with.
        participant_id: ParticipantId,
        /// Connection the revoked token identities are associated with.
        connection_id: ConnectionId,
        /// The token identities that were removed.
        token_identities: BTreeSet<String>,
    },

    /// Reports a change of screen share permissions.
    ///
    /// NOTE(review): presumably emitted after `set_screenshare_permissions`
    /// has run — confirm.
    ScreenSharePermissionsUpdated {
        /// NOTE(review): presumably the participant that requested the change — confirm.
        sender: ParticipantId,
        /// Participants whose screen share permission was changed.
        participants: BTreeSet<ParticipantId>,
        /// NOTE(review): presumably `true` when the permission was granted and
        /// `false` when it was revoked — confirm.
        grant: bool,
    },
}
44
45/// Update all provided participants.
46///
47/// If `grant` is `true`, the provided `source_numbers` will be allowed to be published.
48async fn update_participants_permission(
49    livekit_client: &RoomClient,
50    participants: Vec<ParticipantInfo>,
51    source_numbers: &[i32],
52    grant: bool,
53    room: &str,
54) {
55    stream::iter(participants)
56        .map(|participant| {
57            update_single_participant_permission(
58                livekit_client,
59                participant,
60                source_numbers,
61                grant,
62                room,
63            )
64        })
65        .buffer_unordered(PARALLEL_UPDATES)
66        .collect::<Vec<_>>()
67        .await;
68}
69
70#[tracing::instrument(skip(livekit_client, participant), level = "debug", fields(livekit_participant_id=participant.identity))]
71async fn update_single_participant_permission(
72    livekit_client: &RoomClient,
73    participant: ParticipantInfo,
74    source_numbers: &[i32],
75    grant: bool,
76    room: &str,
77) {
78    let mut can_publish_sources = participant.permission.map_or_else(
79        || {
80            LIVEKIT_MEDIA_SOURCES
81                .map(|s: TrackSource| s as i32)
82                .to_vec()
83        },
84        |p| p.can_publish_sources,
85    );
86
87    for source_number in source_numbers {
88        update_publish_sources(&mut can_publish_sources, *source_number, grant);
89    }
90
91    if let Err(e) = livekit_client
92        .update_participant(
93            room,
94            &participant.identity,
95            UpdateParticipantOptions {
96                permission: Some(ParticipantPermission {
97                    can_subscribe: true,
98                    can_publish: true,
99                    can_publish_data: false,
100                    can_publish_sources,
101                    hidden: false,
102                    can_update_metadata: false,
103                    ..Default::default()
104                }),
105                ..Default::default()
106            },
107        )
108        .await
109    {
110        tracing::error!(
111            livekit.participant = participant.identity,
112            room = room,
113            "Failed to update participant, {e}",
114        );
115    }
116    tracing::trace!("participant permissions updated");
117}
118
/// Grant or revoke a single track source in a list of publishable sources.
///
/// With `grant == true`, `source` is appended unless it is already present,
/// so granting never introduces a duplicate. With `grant == false`, every
/// occurrence of `source` is removed. The order of the remaining entries is
/// preserved in both cases.
fn update_publish_sources(can_publish_sources: &mut Vec<i32>, source: i32, grant: bool) {
    if !grant {
        // Revoke: drop every occurrence of the source.
        can_publish_sources.retain(|&existing| existing != source);
        return;
    }

    // Grant: only append when the source is not listed yet.
    let already_allowed = can_publish_sources
        .iter()
        .any(|&existing| existing == source);
    if !already_allowed {
        can_publish_sources.push(source);
    }
}