Skip to main content

livekit_datatrack/local/
proto.rs

1// Copyright 2025 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::events::*;
16use crate::{
17    api::{DataTrackInfo, DataTrackSid, InternalError, PublishError},
18    packet::Handle,
19};
20use anyhow::{anyhow, Context};
21use livekit_protocol as proto;
22use std::{borrow::Borrow, sync::RwLock};
23
24// MARK: - Output event -> protocol
25
26impl From<SfuPublishRequest> for proto::PublishDataTrackRequest {
27    fn from(event: SfuPublishRequest) -> Self {
28        use proto::encryption::Type;
29        let encryption = if event.uses_e2ee { Type::Gcm } else { Type::None }.into();
30        Self { pub_handle: event.handle.into(), name: event.name, encryption }
31    }
32}
33
34impl From<SfuUnpublishRequest> for proto::UnpublishDataTrackRequest {
35    fn from(event: SfuUnpublishRequest) -> Self {
36        Self { pub_handle: event.handle.into() }
37    }
38}
39
40// MARK: - Protocol -> input event
41
42impl TryFrom<proto::PublishDataTrackResponse> for SfuPublishResponse {
43    type Error = InternalError;
44
45    fn try_from(msg: proto::PublishDataTrackResponse) -> Result<Self, Self::Error> {
46        let info: DataTrackInfo = msg.info.context("Missing info")?.try_into()?;
47        Ok(Self { handle: info.pub_handle, result: Ok(info) })
48    }
49}
50
51impl TryFrom<proto::UnpublishDataTrackResponse> for SfuUnpublishResponse {
52    type Error = InternalError;
53
54    fn try_from(msg: proto::UnpublishDataTrackResponse) -> Result<Self, Self::Error> {
55        let handle: Handle =
56            msg.info.context("Missing info")?.pub_handle.try_into().map_err(anyhow::Error::from)?;
57        Ok(Self { handle })
58    }
59}
60
61impl TryFrom<proto::DataTrackInfo> for DataTrackInfo {
62    type Error = InternalError;
63
64    fn try_from(msg: proto::DataTrackInfo) -> Result<Self, Self::Error> {
65        let handle: Handle = msg.pub_handle.try_into().map_err(anyhow::Error::from)?;
66        let uses_e2ee = match msg.encryption() {
67            proto::encryption::Type::None => false,
68            proto::encryption::Type::Gcm => true,
69            other => Err(anyhow!("Unsupported E2EE type: {:?}", other))?,
70        };
71        let sid: DataTrackSid = msg.sid.try_into().map_err(anyhow::Error::from)?;
72        Ok(Self { pub_handle: handle, sid: RwLock::new(sid).into(), name: msg.name, uses_e2ee })
73    }
74}
75
76pub fn publish_result_from_request_response(
77    msg: &proto::RequestResponse,
78) -> Option<SfuPublishResponse> {
79    use proto::request_response::{Reason, Request};
80    let Some(request) = &msg.request else { return None };
81    let Request::PublishDataTrack(request) = request else { return None };
82    let Ok(handle) = TryInto::<Handle>::try_into(request.pub_handle) else { return None };
83    let error = match msg.reason() {
84        // If new error reasons are introduced in the future, consider adding them
85        // to the public error enum if they are useful to the user.
86        Reason::NotAllowed => PublishError::NotAllowed,
87        Reason::DuplicateName => PublishError::DuplicateName,
88        Reason::InvalidName => PublishError::InvalidName,
89        _ => PublishError::Internal(anyhow!("SFU rejected: {}", msg.message).into()),
90    };
91    let event = SfuPublishResponse { handle, result: Err(error) };
92    Some(event)
93}
94
95// MARK: - Sync state support
96
97impl From<DataTrackInfo> for proto::DataTrackInfo {
98    fn from(info: DataTrackInfo) -> Self {
99        let encryption = if info.uses_e2ee() {
100            proto::encryption::Type::Gcm
101        } else {
102            proto::encryption::Type::None
103        };
104        Self {
105            pub_handle: info.pub_handle.into(),
106            sid: info.sid().to_string(),
107            name: info.name,
108            encryption: encryption as i32,
109        }
110    }
111}
112
113/// Form publish responses for each publish data track to support sync state.
114pub fn publish_responses_for_sync_state(
115    published_tracks: impl IntoIterator<Item = impl Borrow<DataTrackInfo>>,
116) -> Vec<proto::PublishDataTrackResponse> {
117    published_tracks
118        .into_iter()
119        .map(|info| proto::PublishDataTrackResponse { info: Some(info.borrow().clone().into()) })
120        .collect()
121}
122
123#[cfg(test)]
124mod tests {
125    use super::*;
126    use fake::{Fake, Faker};
127
128    #[test]
129    fn test_from_publish_request_event() {
130        let event = SfuPublishRequest {
131            handle: 1u32.try_into().unwrap(),
132            name: "track".into(),
133            uses_e2ee: true,
134        };
135        let request: proto::PublishDataTrackRequest = event.into();
136        assert_eq!(request.pub_handle, 1);
137        assert_eq!(request.name, "track");
138        assert_eq!(request.encryption(), proto::encryption::Type::Gcm);
139    }
140
141    #[test]
142    fn test_from_unpublish_request_event() {
143        let event = SfuUnpublishRequest { handle: 1u32.try_into().unwrap() };
144        let request: proto::UnpublishDataTrackRequest = event.into();
145        assert_eq!(request.pub_handle, 1);
146    }
147
148    #[test]
149    fn test_from_publish_response() {
150        let response = proto::PublishDataTrackResponse {
151            info: proto::DataTrackInfo {
152                pub_handle: 1,
153                sid: "DTR_1234".into(),
154                name: "track".into(),
155                encryption: proto::encryption::Type::Gcm.into(),
156            }
157            .into(),
158        };
159        let event: SfuPublishResponse = response.try_into().unwrap();
160        assert_eq!(event.handle, 1u32.try_into().unwrap());
161
162        let info = event.result.expect("Expected ok result");
163        assert_eq!(info.pub_handle, 1u32.try_into().unwrap());
164        assert_eq!(*info.sid.read().unwrap(), "DTR_1234".to_string().try_into().unwrap());
165        assert_eq!(info.name, "track");
166        assert!(info.uses_e2ee);
167    }
168
169    #[test]
170    fn test_from_request_response() {
171        use proto::request_response::{Reason, Request};
172        let response = proto::RequestResponse {
173            request: Request::PublishDataTrack(proto::PublishDataTrackRequest {
174                pub_handle: 1,
175                ..Default::default()
176            })
177            .into(),
178            reason: Reason::NotAllowed.into(),
179            ..Default::default()
180        };
181
182        let event = publish_result_from_request_response(&response).expect("Expected event");
183        assert_eq!(event.handle, 1u32.try_into().unwrap());
184        assert!(matches!(event.result, Err(PublishError::NotAllowed)));
185    }
186
187    #[test]
188    fn test_publish_responses_for_sync_state() {
189        let mut first: DataTrackInfo = Faker.fake();
190        first.uses_e2ee = true;
191
192        let mut second: DataTrackInfo = Faker.fake();
193        second.uses_e2ee = false;
194
195        let publish_responses = publish_responses_for_sync_state(vec![first, second]);
196        assert_eq!(
197            publish_responses[0].info.as_ref().unwrap().encryption(),
198            proto::encryption::Type::Gcm
199        );
200        assert_eq!(
201            publish_responses[1].info.as_ref().unwrap().encryption(),
202            proto::encryption::Type::None
203        );
204    }
205}