use super::events::*;
use crate::{
api::{DataTrackInfo, DataTrackSid, InternalError},
packet::Handle,
};
use livekit_protocol as proto;
use std::{collections::HashMap, mem};
impl TryFrom<proto::DataTrackSubscriberHandles> for SfuSubscriberHandles {
type Error = InternalError;
fn try_from(msg: proto::DataTrackSubscriberHandles) -> Result<Self, Self::Error> {
let mapping = msg
.sub_handles
.into_iter()
.map(|(handle, info)| -> Result<_, InternalError> {
let handle: Handle = handle.try_into().map_err(anyhow::Error::from)?;
let sid: DataTrackSid = info.track_sid.try_into().map_err(anyhow::Error::from)?;
Ok((handle, sid))
})
.collect::<Result<HashMap<Handle, DataTrackSid>, _>>()?;
Ok(SfuSubscriberHandles { mapping })
}
}
/// Builds publication updates from the `other_participants` of a join response.
///
/// No local identity is supplied (`None`), so no participant is filtered out.
/// Data tracks are moved out of each participant in `msg` while they are
/// converted.
///
/// # Errors
///
/// Returns an [`InternalError`] if any participant's data track info fails to
/// convert.
pub fn event_from_join(
    msg: &mut proto::JoinResponse,
) -> Result<SfuPublicationUpdates, InternalError> {
    let participants = &mut msg.other_participants;
    event_from_participant_info(participants, None)
}
/// Builds publication updates from a participant update message.
///
/// Any participant whose identity equals `local_participant_identity` is
/// skipped. Data tracks are moved out of each remaining participant in `msg`
/// while they are converted.
///
/// # Errors
///
/// Returns an [`InternalError`] if any participant's data track info fails to
/// convert.
pub fn event_from_participant_update(
    msg: &mut proto::ParticipantUpdate,
    local_participant_identity: &str,
) -> Result<SfuPublicationUpdates, InternalError> {
    let participants = &mut msg.participants;
    event_from_participant_info(participants, Some(local_participant_identity))
}
/// Collects the data track info of each participant, keyed by identity.
///
/// When `local_participant_identity` is `Some`, the participant with that
/// identity is skipped and left untouched. For every other participant the
/// data tracks are moved out of the message and converted.
///
/// # Errors
///
/// Stops at the first participant whose track info fails to convert and
/// returns that [`InternalError`].
fn event_from_participant_info(
    msg: &mut [proto::ParticipantInfo],
    local_participant_identity: Option<&str>,
) -> Result<SfuPublicationUpdates, InternalError> {
    let mut updates: HashMap<String, Vec<DataTrackInfo>> = HashMap::new();
    for participant in msg.iter_mut() {
        // Skip the local participant, if one was named.
        if local_participant_identity == Some(participant.identity.as_str()) {
            continue;
        }
        let identity = participant.identity.clone();
        let tracks = extract_track_info(participant)?;
        updates.insert(identity, tracks);
    }
    Ok(SfuPublicationUpdates { updates })
}
/// Moves the data tracks out of `msg` and converts each into a [`DataTrackInfo`].
///
/// `msg.data_tracks` is left empty afterwards, whether or not conversion
/// succeeds.
///
/// # Errors
///
/// Returns the [`InternalError`] of the first track that fails to convert.
fn extract_track_info(
    msg: &mut proto::ParticipantInfo,
) -> Result<Vec<DataTrackInfo>, InternalError> {
    // Take ownership of the tracks so they can be converted without cloning.
    let raw_tracks = mem::take(&mut msg.data_tracks);
    let mut converted = Vec::with_capacity(raw_tracks.len());
    for raw in raw_tracks {
        let info: DataTrackInfo = raw.try_into()?;
        converted.push(info);
    }
    Ok(converted)
}
impl From<SfuUpdateSubscription> for proto::UpdateDataSubscription {
fn from(event: SfuUpdateSubscription) -> Self {
let update = proto::update_data_subscription::Update {
track_sid: event.sid.into(),
subscribe: event.subscribe,
options: Default::default(),
};
Self { updates: vec![update] }
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates a published-track entry that only sets the given track SID.
    fn published(track_sid: &str) -> proto::data_track_subscriber_handles::PublishedDataTrack {
        proto::data_track_subscriber_handles::PublishedDataTrack {
            track_sid: track_sid.into(),
            ..Default::default()
        }
    }

    /// Each raw handle in the message must map to the SID it was sent with.
    #[test]
    fn test_from_subscriber_handles() {
        let subscriber_handles = proto::DataTrackSubscriberHandles {
            sub_handles: HashMap::from([(1, published("DTR_1234")), (2, published("DTR_4567"))]),
        };
        let event: SfuSubscriberHandles = subscriber_handles.try_into().unwrap();

        for (raw_handle, raw_sid) in [(1u32, "DTR_1234"), (2u32, "DTR_4567")] {
            assert_eq!(
                event.mapping.get(&raw_handle.try_into().unwrap()).unwrap(),
                &raw_sid.to_string().try_into().unwrap()
            );
        }
    }

    /// Extraction must empty the participant's track list and convert each field.
    #[test]
    fn test_extract_track_info() {
        let mut participant_info = proto::ParticipantInfo {
            data_tracks: vec![proto::DataTrackInfo {
                pub_handle: 1,
                sid: "DTR_1234".into(),
                name: "track1".into(),
                encryption: proto::encryption::Type::Gcm.into(),
            }],
            ..Default::default()
        };

        let track_info = extract_track_info(&mut participant_info).unwrap();
        assert!(participant_info.data_tracks.is_empty(), "Expected original vec taken");
        assert_eq!(track_info.len(), 1);

        let first = &track_info[0];
        assert_eq!(first.pub_handle, 1u32.try_into().unwrap());
        assert_eq!(first.name, "track1");
        assert_eq!(*first.sid.read().unwrap(), "DTR_1234".to_string().try_into().unwrap());
    }
}