mod egress;
mod handshake;
mod packet;
pub use egress::SrtCaller;
pub use handshake::{HandshakeType, SrtHandshake};
pub use packet::{ControlType, SrtPacket};
pub use crate::protocol::tsdemux::{TsDemuxer, TsPayload, TsTrackKind};
use crate::inbound::{InboundProtocol, IngestContext};
use crate::{CodecId, MediaFrame, Result, StreamKey};
use async_trait::async_trait;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
/// Inbound SRT endpoint: a UDP listener that answers SRT handshakes and
/// publishes the media it receives under a single, fixed stream key.
#[derive(Debug, Clone)]
pub struct SrtHandler {
/// Local address the UDP socket is bound to when the handler is served.
bind: SocketAddr,
/// Stream key used to open the publish session for incoming media.
key: StreamKey,
}
impl SrtHandler {
pub fn new(bind: SocketAddr, key: StreamKey) -> Self {
Self { bind, key }
}
}
#[async_trait]
impl InboundProtocol for SrtHandler {
fn name(&self) -> &'static str {
"srt"
}
async fn serve(&self, ctx: IngestContext, shutdown: CancellationToken) -> Result<()> {
use tokio::net::UdpSocket;
let socket = UdpSocket::bind(self.bind).await?;
info!(bind = %self.bind, "srt listener bound");
let mut buf = vec![0u8; 1500];
let mut peer: Option<SocketAddr> = None;
let mut session: Option<crate::inbound::PublishSession> = None;
let mut demux = TsDemuxer::new();
loop {
let (n, from) = tokio::select! {
_ = shutdown.cancelled() => break,
r = socket.recv_from(&mut buf) => match r {
Ok(v) => v,
Err(e) => { warn!(error = %e, "srt recv failed"); continue; }
}
};
let datagram = &buf[..n];
let Some(pkt) = SrtPacket::parse(datagram) else {
continue;
};
match pkt {
SrtPacket::Control { control_type, .. } => {
if control_type == ControlType::Handshake {
if let Some(reply) = handshake::respond(datagram) {
let _ = socket.send_to(&reply, from).await;
peer = Some(from);
debug!(%from, "srt handshake answered");
}
}
}
SrtPacket::Data { payload_offset, .. } => {
if peer != Some(from) {
continue; }
if session.is_none() {
session = Some(ctx.open_publish(self.key.clone()).await?);
}
let sess = session.as_ref().unwrap();
for au in demux.push(&datagram[payload_offset..]) {
if au.codec == CodecId::Unknown {
continue;
}
let pts = au.pts_ms;
let mut frame = match au.kind {
TsTrackKind::Video => {
MediaFrame::new_video(pts, pts, au.data, au.codec, au.keyframe)
}
TsTrackKind::Audio => MediaFrame::new_audio(pts, au.data, au.codec),
};
if au.is_config {
frame.flags |= crate::FrameFlags::CONFIG;
}
let _ = sess.publish_frame(frame)?;
}
}
}
}
if let Some(sess) = session {
sess.finish().await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built handler reports the `"srt"` protocol name and keeps
    /// the stream key it was constructed with.
    #[test]
    fn handler_reports_name_and_key() {
        let bind: SocketAddr = "127.0.0.1:9000".parse().unwrap();
        let handler = SrtHandler::new(bind, StreamKey::new("live", "feed"));

        assert_eq!(handler.name(), "srt");
        assert_eq!(handler.key.stream_id.as_str(), "feed");
    }
}