livekit_datatrack/local/
proto.rs1use super::events::*;
16use crate::{
17 api::{DataTrackInfo, DataTrackSid, InternalError, PublishError},
18 packet::Handle,
19};
20use anyhow::{anyhow, Context};
21use livekit_protocol as proto;
22use std::{borrow::Borrow, sync::RwLock};
23
24impl From<SfuPublishRequest> for proto::PublishDataTrackRequest {
27 fn from(event: SfuPublishRequest) -> Self {
28 use proto::encryption::Type;
29 let encryption = if event.uses_e2ee { Type::Gcm } else { Type::None }.into();
30 Self { pub_handle: event.handle.into(), name: event.name, encryption }
31 }
32}
33
34impl From<SfuUnpublishRequest> for proto::UnpublishDataTrackRequest {
35 fn from(event: SfuUnpublishRequest) -> Self {
36 Self { pub_handle: event.handle.into() }
37 }
38}
39
40impl TryFrom<proto::PublishDataTrackResponse> for SfuPublishResponse {
43 type Error = InternalError;
44
45 fn try_from(msg: proto::PublishDataTrackResponse) -> Result<Self, Self::Error> {
46 let info: DataTrackInfo = msg.info.context("Missing info")?.try_into()?;
47 Ok(Self { handle: info.pub_handle, result: Ok(info) })
48 }
49}
50
51impl TryFrom<proto::UnpublishDataTrackResponse> for SfuUnpublishResponse {
52 type Error = InternalError;
53
54 fn try_from(msg: proto::UnpublishDataTrackResponse) -> Result<Self, Self::Error> {
55 let handle: Handle =
56 msg.info.context("Missing info")?.pub_handle.try_into().map_err(anyhow::Error::from)?;
57 Ok(Self { handle })
58 }
59}
60
61impl TryFrom<proto::DataTrackInfo> for DataTrackInfo {
62 type Error = InternalError;
63
64 fn try_from(msg: proto::DataTrackInfo) -> Result<Self, Self::Error> {
65 let handle: Handle = msg.pub_handle.try_into().map_err(anyhow::Error::from)?;
66 let uses_e2ee = match msg.encryption() {
67 proto::encryption::Type::None => false,
68 proto::encryption::Type::Gcm => true,
69 other => Err(anyhow!("Unsupported E2EE type: {:?}", other))?,
70 };
71 let sid: DataTrackSid = msg.sid.try_into().map_err(anyhow::Error::from)?;
72 Ok(Self { pub_handle: handle, sid: RwLock::new(sid).into(), name: msg.name, uses_e2ee })
73 }
74}
75
76pub fn publish_result_from_request_response(
77 msg: &proto::RequestResponse,
78) -> Option<SfuPublishResponse> {
79 use proto::request_response::{Reason, Request};
80 let Some(request) = &msg.request else { return None };
81 let Request::PublishDataTrack(request) = request else { return None };
82 let Ok(handle) = TryInto::<Handle>::try_into(request.pub_handle) else { return None };
83 let error = match msg.reason() {
84 Reason::NotAllowed => PublishError::NotAllowed,
87 Reason::DuplicateName => PublishError::DuplicateName,
88 Reason::InvalidName => PublishError::InvalidName,
89 _ => PublishError::Internal(anyhow!("SFU rejected: {}", msg.message).into()),
90 };
91 let event = SfuPublishResponse { handle, result: Err(error) };
92 Some(event)
93}
94
95impl From<DataTrackInfo> for proto::DataTrackInfo {
98 fn from(info: DataTrackInfo) -> Self {
99 let encryption = if info.uses_e2ee() {
100 proto::encryption::Type::Gcm
101 } else {
102 proto::encryption::Type::None
103 };
104 Self {
105 pub_handle: info.pub_handle.into(),
106 sid: info.sid().to_string(),
107 name: info.name,
108 encryption: encryption as i32,
109 }
110 }
111}
112
113pub fn publish_responses_for_sync_state(
115 published_tracks: impl IntoIterator<Item = impl Borrow<DataTrackInfo>>,
116) -> Vec<proto::PublishDataTrackResponse> {
117 published_tracks
118 .into_iter()
119 .map(|info| proto::PublishDataTrackResponse { info: Some(info.borrow().clone().into()) })
120 .collect()
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use fake::{Fake, Faker};

    /// A publish request event maps onto the protocol request field by field.
    #[test]
    fn test_from_publish_request_event() {
        let handle: Handle = 1u32.try_into().unwrap();
        let event = SfuPublishRequest { handle, name: "track".into(), uses_e2ee: true };

        let request = proto::PublishDataTrackRequest::from(event);

        assert_eq!(request.pub_handle, 1);
        assert_eq!(request.name, "track");
        assert_eq!(request.encryption(), proto::encryption::Type::Gcm);
    }

    /// An unpublish request event only carries the handle.
    #[test]
    fn test_from_unpublish_request_event() {
        let handle: Handle = 1u32.try_into().unwrap();

        let request = proto::UnpublishDataTrackRequest::from(SfuUnpublishRequest { handle });

        assert_eq!(request.pub_handle, 1);
    }

    /// A publish response with valid info becomes an `Ok` publish event.
    #[test]
    fn test_from_publish_response() {
        let raw_info = proto::DataTrackInfo {
            pub_handle: 1,
            sid: "DTR_1234".into(),
            name: "track".into(),
            encryption: proto::encryption::Type::Gcm.into(),
        };
        let response = proto::PublishDataTrackResponse { info: Some(raw_info) };

        let event = SfuPublishResponse::try_from(response).unwrap();
        let expected_handle: Handle = 1u32.try_into().unwrap();
        assert_eq!(event.handle, expected_handle);

        let info = event.result.expect("Expected ok result");
        let expected_sid: DataTrackSid = "DTR_1234".to_string().try_into().unwrap();
        assert_eq!(info.pub_handle, expected_handle);
        assert_eq!(*info.sid.read().unwrap(), expected_sid);
        assert_eq!(info.name, "track");
        assert!(info.uses_e2ee);
    }

    /// A rejected publish request is reported as an error for its handle.
    #[test]
    fn test_from_request_response() {
        use proto::request_response::{Reason, Request};

        let publish_request =
            proto::PublishDataTrackRequest { pub_handle: 1, ..Default::default() };
        let response = proto::RequestResponse {
            request: Some(Request::PublishDataTrack(publish_request)),
            reason: i32::from(Reason::NotAllowed),
            ..Default::default()
        };

        let event = publish_result_from_request_response(&response).expect("Expected event");
        let expected_handle: Handle = 1u32.try_into().unwrap();
        assert_eq!(event.handle, expected_handle);
        assert!(matches!(event.result, Err(PublishError::NotAllowed)));
    }

    /// Sync-state responses reflect each track's E2EE flag, in order.
    #[test]
    fn test_publish_responses_for_sync_state() {
        let mut encrypted = Faker.fake::<DataTrackInfo>();
        encrypted.uses_e2ee = true;

        let mut unencrypted = Faker.fake::<DataTrackInfo>();
        unencrypted.uses_e2ee = false;

        let responses = publish_responses_for_sync_state(vec![encrypted, unencrypted]);
        let encryption_at = |index: usize| responses[index].info.as_ref().unwrap().encryption();

        assert_eq!(encryption_at(0), proto::encryption::Type::Gcm);
        assert_eq!(encryption_at(1), proto::encryption::Type::None);
    }
}