Skip to main content

livekit_datatrack/remote/
proto.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::events::*;
16use crate::{
17    api::{DataTrackInfo, DataTrackSid, InternalError},
18    packet::Handle,
19};
20use livekit_protocol as proto;
21use std::{collections::HashMap, mem};
22
23// MARK: - Protocol -> input event
24
25impl TryFrom<proto::DataTrackSubscriberHandles> for SfuSubscriberHandles {
26    type Error = InternalError;
27
28    fn try_from(msg: proto::DataTrackSubscriberHandles) -> Result<Self, Self::Error> {
29        let mapping = msg
30            .sub_handles
31            .into_iter()
32            .map(|(handle, info)| -> Result<_, InternalError> {
33                let handle: Handle = handle.try_into().map_err(anyhow::Error::from)?;
34                let sid: DataTrackSid = info.track_sid.try_into().map_err(anyhow::Error::from)?;
35                Ok((handle, sid))
36            })
37            .collect::<Result<HashMap<Handle, DataTrackSid>, _>>()?;
38        Ok(SfuSubscriberHandles { mapping })
39    }
40}
41
42/// Extracts an [`SfuPublicationUpdates`] event from a join response.
43///
44/// This takes ownership of the `data_tracks` vector for each participant
45/// (except for the local participant), leaving an empty vector in its place.
46///
47pub fn event_from_join(
48    msg: &mut proto::JoinResponse,
49) -> Result<SfuPublicationUpdates, InternalError> {
50    event_from_participant_info(&mut msg.other_participants, None)
51}
52
53/// Extracts an [`SfuPublicationUpdates`] event from a participant update.
54///
55/// This takes ownership of the `data_tracks` vector for each participant in
56/// the update, leaving an empty vector in its place.
57///
58pub fn event_from_participant_update(
59    msg: &mut proto::ParticipantUpdate,
60    local_participant_identity: &str,
61) -> Result<SfuPublicationUpdates, InternalError> {
62    // TODO: is there a better way to exclude the local participant?
63    event_from_participant_info(&mut msg.participants, local_participant_identity.into())
64}
65
66fn event_from_participant_info(
67    msg: &mut [proto::ParticipantInfo],
68    local_participant_identity: Option<&str>,
69) -> Result<SfuPublicationUpdates, InternalError> {
70    let updates = msg
71        .iter_mut()
72        .filter(|participant| {
73            local_participant_identity.is_none_or(|identity| participant.identity != identity)
74        })
75        .map(|participant| -> Result<_, InternalError> {
76            Ok((participant.identity.clone(), extract_track_info(participant)?))
77        })
78        .collect::<Result<HashMap<String, Vec<DataTrackInfo>>, _>>()?;
79    Ok(SfuPublicationUpdates { updates })
80}
81
82fn extract_track_info(
83    msg: &mut proto::ParticipantInfo,
84) -> Result<Vec<DataTrackInfo>, InternalError> {
85    mem::take(&mut msg.data_tracks)
86        .into_iter()
87        .map(TryInto::<DataTrackInfo>::try_into)
88        .collect::<Result<Vec<_>, InternalError>>()
89}
90
91// MARK: - Output event -> protocol
92
93impl From<SfuUpdateSubscription> for proto::UpdateDataSubscription {
94    fn from(event: SfuUpdateSubscription) -> Self {
95        let update = proto::update_data_subscription::Update {
96            track_sid: event.sid.into(),
97            subscribe: event.subscribe,
98            options: Default::default(),
99        };
100        Self { updates: vec![update] }
101    }
102}
103
#[cfg(test)]
mod tests {
    use super::*;

    /// Two handle -> SID entries in the protocol message must show up
    /// unchanged in the converted event's mapping.
    #[test]
    fn test_from_subscriber_handles() {
        // Builds a published-track entry carrying only the given SID.
        let published = |sid: &str| proto::data_track_subscriber_handles::PublishedDataTrack {
            track_sid: sid.into(),
            ..Default::default()
        };
        let msg = proto::DataTrackSubscriberHandles {
            sub_handles: HashMap::from([
                (1, published("DTR_1234")),
                (2, published("DTR_4567")),
            ]),
        };

        let event = SfuSubscriberHandles::try_from(msg).unwrap();

        for (raw_handle, raw_sid) in [(1u32, "DTR_1234"), (2u32, "DTR_4567")] {
            let key: Handle = raw_handle.try_into().unwrap();
            let expected_sid: DataTrackSid = raw_sid.to_string().try_into().unwrap();
            assert_eq!(event.mapping.get(&key).unwrap(), &expected_sid);
        }
    }

    /// Extraction must empty the participant's `data_tracks` and return the
    /// converted track with its handle, name, and SID intact.
    #[test]
    fn test_extract_track_info() {
        let mut participant = proto::ParticipantInfo::default();
        participant.data_tracks.push(proto::DataTrackInfo {
            pub_handle: 1,
            sid: "DTR_1234".into(),
            name: "track1".into(),
            encryption: proto::encryption::Type::Gcm.into(),
        });

        let extracted = extract_track_info(&mut participant).unwrap();
        assert!(participant.data_tracks.is_empty(), "Expected original vec taken");
        assert_eq!(extracted.len(), 1);

        let only = &extracted[0];
        assert_eq!(only.pub_handle, 1u32.try_into().unwrap());
        assert_eq!(only.name, "track1");

        let sid_guard = only.sid.read().unwrap();
        assert_eq!(*sid_guard, "DTR_1234".to_string().try_into().unwrap());
    }
}