wfb-rs 0.1.23

Rust WFB-style command-line tools built on openipc-rs
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, UdpSocket};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    if let Err(err) = run() {
        eprintln!("error: {err}");
        std::process::exit(1);
    }
}

fn run() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::parse(std::env::args().skip(1))?;
    let target = Arc::new(Mutex::new(None::<SocketAddr>));
    spawn_rtp_forwarder(config.rtp_port, target.clone())?;

    let listener = TcpListener::bind(("0.0.0.0", config.rtsp_port))?;
    eprintln!(
        "H{} stream ready at rtsp://127.0.0.1:{}{}; RTP input UDP port {}",
        config.codec.h_number(),
        config.rtsp_port,
        config.uri,
        config.rtp_port
    );

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let session = RtspSession {
                    config: config.clone(),
                    target: target.clone(),
                };
                thread::spawn(move || {
                    if let Err(err) = session.handle(stream) {
                        eprintln!("RTSP client failed: {err}");
                    }
                });
            }
            Err(err) => eprintln!("RTSP accept failed: {err}"),
        }
    }
    Ok(())
}

#[derive(Debug, Clone)]
struct Config {
    mtu: usize,
    uri: String,
    rtsp_port: u16,
    rtp_port: u16,
    latency_ms: u64,
    codec: Codec,
}

impl Config {
    fn parse(args: impl Iterator<Item = String>) -> Result<Self, Box<dyn std::error::Error>> {
        let mut config = Self {
            mtu: 1400,
            uri: "/wfb".to_owned(),
            rtsp_port: 8554,
            rtp_port: 5600,
            latency_ms: 0,
            codec: Codec::H264,
        };
        let mut codec = None;
        let mut args = args.peekable();
        while let Some(arg) = args.next() {
            match arg.as_str() {
                "-h" | "--help" => {
                    print_help();
                    std::process::exit(0);
                }
                "-m" => config.mtu = next_arg(&mut args, "-m")?.parse()?,
                "-u" => {
                    let uri = next_arg(&mut args, "-u")?;
                    if !uri.starts_with('/') {
                        return Err("RTSP URI must start with '/'".into());
                    }
                    config.uri = uri;
                }
                "-p" => config.rtsp_port = next_arg(&mut args, "-p")?.parse()?,
                "-P" => config.rtp_port = next_arg(&mut args, "-P")?.parse()?,
                "-l" => config.latency_ms = next_arg(&mut args, "-l")?.parse()?,
                "h264" => codec = Some(Codec::H264),
                "h265" => codec = Some(Codec::H265),
                _ => return Err(format!("unknown option or codec: {arg}").into()),
            }
        }
        config.codec = codec.ok_or("missing codec: expected h264 or h265")?;
        Ok(config)
    }
}

#[derive(Debug, Clone, Copy)]
enum Codec {
    H264,
    H265,
}

impl Codec {
    const fn h_number(self) -> u16 {
        match self {
            Self::H264 => 264,
            Self::H265 => 265,
        }
    }

    const fn rtpmap(self) -> &'static str {
        match self {
            Self::H264 => "H264/90000",
            Self::H265 => "H265/90000",
        }
    }
}

struct RtspSession {
    config: Config,
    target: Arc<Mutex<Option<SocketAddr>>>,
}

impl RtspSession {
    fn handle(self, mut stream: TcpStream) -> Result<(), Box<dyn std::error::Error>> {
        stream.set_read_timeout(Some(Duration::from_secs(60)))?;
        let peer = stream.peer_addr()?;
        let mut read_buf = Vec::<u8>::with_capacity(4096);
        let mut scratch = [0u8; 1024];

        loop {
            let amount = stream.read(&mut scratch)?;
            if amount == 0 {
                clear_target(&self.target, peer);
                return Ok(());
            }
            read_buf.extend_from_slice(&scratch[..amount]);

            while let Some(end) = header_end(&read_buf) {
                let request = String::from_utf8_lossy(&read_buf[..end]).into_owned();
                read_buf.drain(..end + 4);
                self.respond(&mut stream, peer, &request)?;
            }
        }
    }

    fn respond(
        &self,
        stream: &mut TcpStream,
        peer: SocketAddr,
        request: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let first = request.lines().next().unwrap_or_default();
        let method = first.split_whitespace().next().unwrap_or_default();
        let cseq = header_value(request, "CSeq").unwrap_or("1");

        match method {
            "OPTIONS" => write_response(
                stream,
                cseq,
                "200 OK",
                &[("Public", "OPTIONS, DESCRIBE, SETUP, PLAY, TEARDOWN")],
                "",
            )?,
            "DESCRIBE" => {
                let sdp = self.sdp();
                write_response(
                    stream,
                    cseq,
                    "200 OK",
                    &[("Content-Type", "application/sdp")],
                    &sdp,
                )?;
            }
            "SETUP" => {
                let transport = header_value(request, "Transport").unwrap_or_default();
                let client_rtp_port = parse_client_rtp_port(transport)
                    .ok_or("SETUP Transport missing client_port")?;
                let target = SocketAddr::new(peer.ip(), client_rtp_port);
                *self.target.lock().expect("target mutex poisoned") = Some(target);
                let transport_response = format!(
                    "RTP/AVP;unicast;client_port={}-{};server_port={}-{};ssrc=4f504950",
                    client_rtp_port,
                    client_rtp_port + 1,
                    self.config.rtp_port,
                    self.config.rtp_port + 1
                );
                write_response(
                    stream,
                    cseq,
                    "200 OK",
                    &[
                        ("Transport", transport_response.as_str()),
                        ("Session", "openipc-rs"),
                    ],
                    "",
                )?;
            }
            "PLAY" => write_response(
                stream,
                cseq,
                "200 OK",
                &[("Session", "openipc-rs"), ("RTP-Info", "url=trackID=0")],
                "",
            )?,
            "TEARDOWN" => {
                clear_target(&self.target, peer);
                write_response(stream, cseq, "200 OK", &[("Session", "openipc-rs")], "")?;
            }
            _ => write_response(stream, cseq, "405 Method Not Allowed", &[], "")?,
        }
        Ok(())
    }

    fn sdp(&self) -> String {
        format!(
            "v=0\r\n\
             o=- 0 0 IN IP4 127.0.0.1\r\n\
             s=OpenIPC WFB H{}\r\n\
             c=IN IP4 0.0.0.0\r\n\
             t=0 0\r\n\
             a=control:*\r\n\
             m=video 0 RTP/AVP 96\r\n\
             a=rtpmap:96 {}\r\n\
             a=control:trackID=0\r\n",
            self.config.codec.h_number(),
            self.config.codec.rtpmap()
        )
    }
}

fn spawn_rtp_forwarder(
    input_port: u16,
    target: Arc<Mutex<Option<SocketAddr>>>,
) -> std::io::Result<()> {
    let socket = UdpSocket::bind(("0.0.0.0", input_port))?;
    socket.set_read_timeout(Some(Duration::from_millis(500)))?;
    thread::spawn(move || {
        let mut buf = vec![0u8; 2048];
        loop {
            let amount = match socket.recv(&mut buf) {
                Ok(amount) => amount,
                Err(err)
                    if matches!(
                        err.kind(),
                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
                    ) =>
                {
                    continue;
                }
                Err(err) => {
                    eprintln!("RTP input socket failed: {err}");
                    return;
                }
            };
            let Some(dest) = *target.lock().expect("target mutex poisoned") else {
                continue;
            };
            let _ = socket.send_to(&buf[..amount], dest);
        }
    });
    Ok(())
}

fn write_response(
    stream: &mut TcpStream,
    cseq: &str,
    status: &str,
    headers: &[(&str, &str)],
    body: &str,
) -> std::io::Result<()> {
    write!(stream, "RTSP/1.0 {status}\r\nCSeq: {cseq}\r\n")?;
    for (name, value) in headers {
        write!(stream, "{name}: {value}\r\n")?;
    }
    if body.is_empty() {
        write!(stream, "Content-Length: 0\r\n\r\n")?;
    } else {
        write!(stream, "Content-Length: {}\r\n\r\n{}", body.len(), body)?;
    }
    stream.flush()
}

fn header_end(buf: &[u8]) -> Option<usize> {
    buf.windows(4).position(|window| window == b"\r\n\r\n")
}

fn header_value<'a>(request: &'a str, name: &str) -> Option<&'a str> {
    request.lines().find_map(|line| {
        let (key, value) = line.split_once(':')?;
        if key.eq_ignore_ascii_case(name) {
            Some(value.trim())
        } else {
            None
        }
    })
}

fn parse_client_rtp_port(transport: &str) -> Option<u16> {
    for token in transport.split(';') {
        let (key, value) = token.split_once('=')?;
        if key.trim().eq_ignore_ascii_case("client_port") {
            let first = value.split('-').next()?;
            return first.trim().parse().ok();
        }
    }
    None
}

fn clear_target(target: &Arc<Mutex<Option<SocketAddr>>>, peer: SocketAddr) {
    let mut target = target.lock().expect("target mutex poisoned");
    if target.map(|addr| addr.ip() == peer.ip()).unwrap_or(false) {
        *target = None;
    }
}

fn next_arg(
    args: &mut std::iter::Peekable<impl Iterator<Item = String>>,
    option: &str,
) -> Result<String, Box<dyn std::error::Error>> {
    args.next()
        .ok_or_else(|| format!("{option} requires a value").into())
}

fn print_help() {
    println!(
        r#"wfb_rtsp

Minimal Rust RTSP/RTP UDP proxy for WFB video.

Usage:
  wfb_rtsp [-m mtu] [-u uri] [-p rtsp_port] [-P rtp_port] [-l latency] {{ h264 | h265 }}

Defaults:
  mtu=1400 uri=/wfb rtsp_port=8554 rtp_port=5600 latency=0

This receives RTP packets on UDP rtp_port and forwards them to the RTSP
client-selected RTP port after SETUP/PLAY. It does not depayload, jitter-buffer,
or repacketize like the upstream GStreamer helper."#
    );
}