use libwebrtc::{self as rtc, prelude::*};
use livekit_protocol as proto;
use tokio::sync::mpsc;
use super::peer_transport::PeerTransport;
use crate::{
rtc_engine::{
peer_transport::OnOfferCreated,
rtc_session::{LOSSY_DC_LABEL, RELIABLE_DC_LABEL},
},
DataPacketKind,
};
pub type RtcEmitter = mpsc::UnboundedSender<RtcEvent>;
pub type RtcEvents = mpsc::UnboundedReceiver<RtcEvent>;
#[derive(Debug)]
pub enum RtcEvent {
IceCandidate {
ice_candidate: IceCandidate,
target: proto::SignalTarget,
},
ConnectionChange {
state: PeerConnectionState,
target: proto::SignalTarget,
},
DataChannel {
data_channel: DataChannel,
target: proto::SignalTarget,
},
Offer {
offer: SessionDescription,
target: proto::SignalTarget,
},
Track {
streams: Vec<MediaStream>,
track: MediaStreamTrack,
transceiver: RtpTransceiver,
target: proto::SignalTarget,
},
Data {
data: Vec<u8>,
binary: bool,
kind: DataPacketKind,
},
DataChannelBufferedAmountChange {
sent: u64,
amount: u64,
kind: DataPacketKind,
},
}
fn on_connection_state_change(
target: proto::SignalTarget,
emitter: RtcEmitter,
) -> rtc::peer_connection::OnConnectionChange {
Box::new(move |state| {
let _ = emitter.send(RtcEvent::ConnectionChange { state, target });
})
}
fn on_ice_candidate(
target: proto::SignalTarget,
emitter: RtcEmitter,
) -> rtc::peer_connection::OnIceCandidate {
Box::new(move |ice_candidate| {
let _ = emitter.send(RtcEvent::IceCandidate { ice_candidate, target });
})
}
fn on_offer(target: proto::SignalTarget, emitter: RtcEmitter) -> OnOfferCreated {
Box::new(move |offer| {
let _ = emitter.send(RtcEvent::Offer { offer, target });
})
}
fn on_data_channel(
target: proto::SignalTarget,
emitter: RtcEmitter,
) -> rtc::peer_connection::OnDataChannel {
Box::new(move |data_channel| {
match data_channel.label().as_str() {
RELIABLE_DC_LABEL => {
data_channel.on_message(Some(on_message(emitter.clone(), DataPacketKind::Reliable)))
}
LOSSY_DC_LABEL => {
data_channel.on_message(Some(on_message(emitter.clone(), DataPacketKind::Lossy)))
}
_ => {}
}
let _ = emitter.send(RtcEvent::DataChannel { data_channel, target });
})
}
fn on_track(target: proto::SignalTarget, emitter: RtcEmitter) -> rtc::peer_connection::OnTrack {
Box::new(move |event| {
let _ = emitter.send(RtcEvent::Track {
streams: event.streams,
track: event.track,
transceiver: event.transceiver,
target,
});
})
}
fn on_ice_candidate_error(
_target: proto::SignalTarget,
_emitter: RtcEmitter,
) -> rtc::peer_connection::OnIceCandidateError {
Box::new(move |ice_error| {
log::debug!("{:?}", ice_error);
})
}
pub fn forward_pc_events(transport: &mut PeerTransport, rtc_emitter: RtcEmitter) {
let signal_target = transport.signal_target();
transport
.peer_connection()
.on_ice_candidate(Some(on_ice_candidate(signal_target, rtc_emitter.clone())));
transport
.peer_connection()
.on_data_channel(Some(on_data_channel(signal_target, rtc_emitter.clone())));
transport.peer_connection().on_track(Some(on_track(signal_target, rtc_emitter.clone())));
transport.peer_connection().on_connection_state_change(Some(on_connection_state_change(
signal_target,
rtc_emitter.clone(),
)));
transport
.peer_connection()
.on_ice_candidate_error(Some(on_ice_candidate_error(signal_target, rtc_emitter.clone())));
transport.on_offer(Some(on_offer(signal_target, rtc_emitter)));
}
fn on_message(emitter: RtcEmitter, kind: DataPacketKind) -> rtc::data_channel::OnMessage {
Box::new(move |buffer| {
let _ = emitter.send(RtcEvent::Data {
data: buffer.data.to_vec(),
binary: buffer.binary,
kind,
});
})
}
fn on_buffered_amount_change(
emitter: RtcEmitter,
dc: DataChannel,
kind: DataPacketKind,
) -> rtc::data_channel::OnBufferedAmountChange {
Box::new(move |sent| {
let amount = dc.buffered_amount();
let _ = emitter.send(RtcEvent::DataChannelBufferedAmountChange { sent, amount, kind });
})
}
pub fn forward_dc_events(dc: &mut DataChannel, kind: DataPacketKind, rtc_emitter: RtcEmitter) {
dc.on_message(Some(on_message(rtc_emitter.clone(), kind)));
dc.on_buffered_amount_change(Some(on_buffered_amount_change(rtc_emitter, dc.clone(), kind)));
}