livekit_datatrack/remote/
proto.rs1use super::events::*;
16use crate::{
17 api::{DataTrackInfo, DataTrackSid, InternalError},
18 packet::Handle,
19};
20use livekit_protocol as proto;
21use std::{collections::HashMap, mem};
22
23impl TryFrom<proto::DataTrackSubscriberHandles> for SfuSubscriberHandles {
26 type Error = InternalError;
27
28 fn try_from(msg: proto::DataTrackSubscriberHandles) -> Result<Self, Self::Error> {
29 let mapping = msg
30 .sub_handles
31 .into_iter()
32 .map(|(handle, info)| -> Result<_, InternalError> {
33 let handle: Handle = handle.try_into().map_err(anyhow::Error::from)?;
34 let sid: DataTrackSid = info.track_sid.try_into().map_err(anyhow::Error::from)?;
35 Ok((handle, sid))
36 })
37 .collect::<Result<HashMap<Handle, DataTrackSid>, _>>()?;
38 Ok(SfuSubscriberHandles { mapping })
39 }
40}
41
42pub fn event_from_join(
48 msg: &mut proto::JoinResponse,
49) -> Result<SfuPublicationUpdates, InternalError> {
50 event_from_participant_info(&mut msg.other_participants, None)
51}
52
53pub fn event_from_participant_update(
59 msg: &mut proto::ParticipantUpdate,
60 local_participant_identity: &str,
61) -> Result<SfuPublicationUpdates, InternalError> {
62 event_from_participant_info(&mut msg.participants, local_participant_identity.into())
64}
65
66fn event_from_participant_info(
67 msg: &mut [proto::ParticipantInfo],
68 local_participant_identity: Option<&str>,
69) -> Result<SfuPublicationUpdates, InternalError> {
70 let updates = msg
71 .iter_mut()
72 .filter(|participant| {
73 local_participant_identity.is_none_or(|identity| participant.identity != identity)
74 })
75 .map(|participant| -> Result<_, InternalError> {
76 Ok((participant.identity.clone(), extract_track_info(participant)?))
77 })
78 .collect::<Result<HashMap<String, Vec<DataTrackInfo>>, _>>()?;
79 Ok(SfuPublicationUpdates { updates })
80}
81
82fn extract_track_info(
83 msg: &mut proto::ParticipantInfo,
84) -> Result<Vec<DataTrackInfo>, InternalError> {
85 mem::take(&mut msg.data_tracks)
86 .into_iter()
87 .map(TryInto::<DataTrackInfo>::try_into)
88 .collect::<Result<Vec<_>, InternalError>>()
89}
90
91impl From<SfuUpdateSubscription> for proto::UpdateDataSubscription {
94 fn from(event: SfuUpdateSubscription) -> Self {
95 let update = proto::update_data_subscription::Update {
96 track_sid: event.sid.into(),
97 subscribe: event.subscribe,
98 options: Default::default(),
99 };
100 Self { updates: vec![update] }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107
108 #[test]
109 fn test_from_subscriber_handles() {
110 let sub_handles = [
111 (
112 1,
113 proto::data_track_subscriber_handles::PublishedDataTrack {
114 track_sid: "DTR_1234".into(),
115 ..Default::default()
116 },
117 ),
118 (
119 2,
120 proto::data_track_subscriber_handles::PublishedDataTrack {
121 track_sid: "DTR_4567".into(),
122 ..Default::default()
123 },
124 ),
125 ];
126 let subscriber_handles =
127 proto::DataTrackSubscriberHandles { sub_handles: HashMap::from(sub_handles) };
128
129 let event: SfuSubscriberHandles = subscriber_handles.try_into().unwrap();
130 assert_eq!(
131 event.mapping.get(&1u32.try_into().unwrap()).unwrap(),
132 &"DTR_1234".to_string().try_into().unwrap()
133 );
134 assert_eq!(
135 event.mapping.get(&2u32.try_into().unwrap()).unwrap(),
136 &"DTR_4567".to_string().try_into().unwrap()
137 );
138 }
139
140 #[test]
141 fn test_extract_track_info() {
142 let data_tracks = vec![proto::DataTrackInfo {
143 pub_handle: 1,
144 sid: "DTR_1234".into(),
145 name: "track1".into(),
146 encryption: proto::encryption::Type::Gcm.into(),
147 }];
148 let mut participant_info = proto::ParticipantInfo { data_tracks, ..Default::default() };
149
150 let track_info = extract_track_info(&mut participant_info).unwrap();
151 assert!(participant_info.data_tracks.is_empty(), "Expected original vec taken");
152 assert_eq!(track_info.len(), 1);
153
154 let first = track_info.first().unwrap();
155 assert_eq!(first.pub_handle, 1u32.try_into().unwrap());
156 assert_eq!(first.name, "track1");
157 assert_eq!(*first.sid.read().unwrap(), "DTR_1234".to_string().try_into().unwrap());
158 }
159}