use crate::peer_connection::event::RTCEventInternal;
use crate::peer_connection::message::internal::{
DTLSMessage, RTCMessageInternal, RTPMessage, STUNMessage, TaggedRTCMessageInternal,
};
use crate::statistics::accumulator::RTCStatsAccumulator;
use log::{debug, error};
use shared::error::Error;
use std::collections::VecDeque;
use std::time::Instant;
fn match_range(lower: u8, upper: u8, buf: &[u8]) -> bool {
if buf.is_empty() {
return false;
}
let b = buf[0];
b >= lower && b <= upper
}
fn match_dtls(b: &[u8]) -> bool {
match_range(20, 63, b)
}
fn match_srtp(b: &[u8]) -> bool {
match_range(128, 191, b)
}
#[derive(Default)]
pub(crate) struct DemuxerHandlerContext {
pub(crate) read_outs: VecDeque<TaggedRTCMessageInternal>,
pub(crate) write_outs: VecDeque<TaggedRTCMessageInternal>,
pub(crate) event_outs: VecDeque<RTCEventInternal>,
}
pub(crate) struct DemuxerHandler<'a> {
ctx: &'a mut DemuxerHandlerContext,
stats: &'a mut RTCStatsAccumulator,
}
impl<'a> DemuxerHandler<'a> {
pub(crate) fn new(
ctx: &'a mut DemuxerHandlerContext,
stats: &'a mut RTCStatsAccumulator,
) -> Self {
DemuxerHandler { ctx, stats }
}
pub(crate) fn name(&self) -> &'static str {
"DemuxerHandler"
}
}
impl<'a> sansio::Protocol<TaggedRTCMessageInternal, TaggedRTCMessageInternal, RTCEventInternal>
for DemuxerHandler<'a>
{
type Rout = TaggedRTCMessageInternal;
type Wout = TaggedRTCMessageInternal;
type Eout = RTCEventInternal;
type Error = Error;
type Time = Instant;
fn handle_read(&mut self, msg: TaggedRTCMessageInternal) -> Result<(), Self::Error> {
if let RTCMessageInternal::Raw(message) = msg.message {
self.stats.transport.on_packet_received(message.len());
if message.is_empty() {
error!("drop invalid packet due to zero length");
} else if match_dtls(&message) {
self.ctx.read_outs.push_back(TaggedRTCMessageInternal {
now: msg.now,
transport: msg.transport,
message: RTCMessageInternal::Dtls(DTLSMessage::Raw(message)),
});
} else if match_srtp(&message) {
self.ctx.read_outs.push_back(TaggedRTCMessageInternal {
now: msg.now,
transport: msg.transport,
message: RTCMessageInternal::Rtp(RTPMessage::Raw(message)),
});
} else {
self.ctx.read_outs.push_back(TaggedRTCMessageInternal {
now: msg.now,
transport: msg.transport,
message: RTCMessageInternal::Stun(STUNMessage::Raw(message)),
});
}
} else {
debug!("drop non-RAW packet {:?}", msg.message);
}
Ok(())
}
fn poll_read(&mut self) -> Option<Self::Rout> {
self.ctx.read_outs.pop_front()
}
fn handle_write(&mut self, msg: TaggedRTCMessageInternal) -> Result<(), Self::Error> {
match msg.message {
RTCMessageInternal::Raw(message)
| RTCMessageInternal::Stun(STUNMessage::Raw(message))
| RTCMessageInternal::Dtls(DTLSMessage::Raw(message))
| RTCMessageInternal::Rtp(RTPMessage::Raw(message)) => {
self.stats.transport.on_packet_sent(message.len());
self.ctx.write_outs.push_back(TaggedRTCMessageInternal {
now: msg.now,
transport: msg.transport,
message: RTCMessageInternal::Raw(message),
});
}
_ => {
debug!("drop non-RAW packet {:?}", msg.message);
}
}
Ok(())
}
fn poll_write(&mut self) -> Option<Self::Wout> {
self.ctx.write_outs.pop_front()
}
fn handle_event(&mut self, evt: RTCEventInternal) -> shared::error::Result<()> {
self.ctx.event_outs.push_back(evt);
Ok(())
}
fn poll_event(&mut self) -> Option<Self::Eout> {
self.ctx.event_outs.pop_front()
}
fn handle_timeout(&mut self, _now: Instant) -> Result<(), Self::Error> {
Ok(())
}
fn poll_timeout(&mut self) -> Option<Instant> {
None
}
fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}