use std::io::{Read, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream, ToSocketAddrs};
use std::thread;
use std::time::Duration;
use crate::amf::{self, Amf0Value};
use crate::chunk::{ChunkReader, ChunkWriter, Message};
use crate::error::{Error, Result};
use crate::flv::{parse_audio, parse_video, AudioTag, VideoTag};
use crate::message::*;
/// Chunk size announced to the peer at the end of the `connect` exchange and
/// then applied to the chunk writer for every later outgoing message.
const SERVER_CHUNK_SIZE: u32 = 4096;
/// Value sent to the peer in both the Window Acknowledgement Size message and
/// the Set Peer Bandwidth message while answering `connect`.
const WINDOW_ACK_SIZE: u32 = 5_000_000;
/// Limit-type byte passed to `build_set_peer_bandwidth`.
/// NOTE(review): 2 is the "dynamic" limit type in the RTMP specification —
/// confirm against the message builder.
const PEER_BW_LIMIT_DYNAMIC: u8 = 2;
/// Blocking RTMP server built on a bound [`TcpListener`].
///
/// Create one with `RtmpServer::bind`, then either pull publish requests one
/// at a time with `accept` or hand every request to a callback with `serve`.
#[derive(Debug)]
pub struct RtmpServer {
    /// Listening socket that incoming RTMP connections are accepted from.
    listener: TcpListener,
}
impl RtmpServer {
pub fn bind(addr: impl ToSocketAddrs) -> Result<Self> {
let listener = TcpListener::bind(addr)?;
Ok(Self { listener })
}
pub fn local_addr(&self) -> Result<SocketAddr> {
Ok(self.listener.local_addr()?)
}
pub fn accept(&self) -> Result<PublishRequest> {
loop {
let (stream, peer_addr) = self.listener.accept()?;
match drive_until_publish(stream, peer_addr) {
Ok(req) => return Ok(req),
Err(e) => {
eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
}
}
}
}
pub fn serve<F>(&self, handler: F) -> Result<()>
where
F: Fn(PublishRequest) + Send + Sync + 'static,
{
use std::sync::Arc;
let handler = Arc::new(handler);
for conn in self.listener.incoming() {
let stream = match conn {
Ok(s) => s,
Err(e) => {
eprintln!("oxideav-rtmp: accept failed: {e}");
continue;
}
};
let peer_addr = match stream.peer_addr() {
Ok(a) => a,
Err(_) => continue,
};
let h = handler.clone();
thread::Builder::new()
.name(format!("oxideav-rtmp-session-{peer_addr}"))
.spawn(move || match drive_until_publish(stream, peer_addr) {
Ok(req) => h(req),
Err(e) => {
eprintln!("oxideav-rtmp: dropped connection from {peer_addr}: {e}");
}
})
.map_err(|e| Error::Other(format!("spawn session thread: {e}")))?;
}
Ok(())
}
}
/// A client's request to publish a stream, produced once its `publish`
/// command has been received.
///
/// The request carries the half-open connection in `pending`; call `accept`
/// to start the session or `reject` to refuse it and close the socket.
pub struct PublishRequest {
/// `app` property of the `connect` command object; empty if it was missing
/// or not a string.
pub app: String,
/// Stream name taken from the `publish` command.
pub stream_name: String,
/// Publish type taken from the `publish` command; `"live"` when the client
/// did not send one.
pub publish_type: String,
/// Remote address of the publishing client.
pub peer_addr: SocketAddr,
/// `tcUrl` property of the `connect` command object; empty if it was missing
/// or not a string.
pub tc_url: String,
/// Connection state carried over from the pre-publish exchange.
pending: PendingSession,
}
/// Connection state held between receiving `publish` and the application's
/// accept/reject decision.
struct PendingSession {
/// Original socket handle; used to shut the connection down.
stream: TcpStream,
/// Chunk reader over a clone of `stream`.
reader: ChunkReader<TcpStream>,
/// Chunk writer over a clone of `stream`.
writer: ChunkWriter<TcpStream>,
/// Message stream id of the `publish` command, raised to at least 1.
stream_id: u32,
// Stored when the request is built but not read by any code in this file,
// hence the lint exemption.
#[allow(dead_code)]
/// Transaction id of the `publish` command (0.0 if the client sent none).
publish_tx_id: f64,
}
impl PublishRequest {
pub fn accept(self) -> Result<RtmpSession> {
let PublishRequest {
app,
stream_name,
publish_type,
peer_addr,
tc_url: _,
pending,
} = self;
let PendingSession {
stream,
reader,
mut writer,
stream_id,
publish_tx_id: _,
} = pending;
writer.write_message(
CSID_PROTOCOL_CONTROL,
&build_user_control_stream_begin(stream_id),
)?;
writer.write_message(
CSID_COMMAND,
&build_on_status(
stream_id,
"status",
"NetStream.Publish.Start",
&format!("Started publishing {stream_name}"),
),
)?;
writer.flush()?;
Ok(RtmpSession {
stream,
reader,
writer,
app,
stream_name,
publish_type,
peer_addr,
stream_id,
ended: false,
})
}
pub fn reject(self, reason: &str) -> Result<()> {
let PublishRequest { pending, .. } = self;
let PendingSession {
stream,
mut writer,
stream_id,
..
} = pending;
let _ = writer.write_message(
CSID_COMMAND,
&build_on_status(stream_id, "error", "NetStream.Publish.BadName", reason),
);
let _ = writer.flush();
let _ = stream.shutdown(Shutdown::Both);
Err(Error::Rejected(reason.to_string()))
}
}
/// An accepted publishing session from which media packets are read.
pub struct RtmpSession {
/// Socket handle used for read-timeout configuration and shutdown.
stream: TcpStream,
/// Chunk reader that yields the client's messages.
reader: ChunkReader<TcpStream>,
/// Chunk writer used for status messages sent back to the client.
writer: ChunkWriter<TcpStream>,
/// Application name from the client's `connect` command.
app: String,
/// Name of the stream being published.
stream_name: String,
/// Publish type requested by the client.
publish_type: String,
/// Remote address of the client.
peer_addr: SocketAddr,
/// Message stream id used in status messages for this session.
stream_id: u32,
/// Set once the client has asked to close, delete or unpublish the stream.
ended: bool,
}
/// One item read from a publishing session.
#[derive(Debug, Clone)]
pub enum StreamPacket {
/// An audio message whose payload was parsed by `parse_audio`.
Audio {
/// Timestamp carried by the RTMP message.
/// NOTE(review): presumably milliseconds, as in RTMP — confirm against the chunk reader.
timestamp: u32,
/// Parsed audio tag.
tag: AudioTag,
},
/// A video message whose payload was parsed by `parse_video`.
Video {
/// Timestamp carried by the RTMP message.
/// NOTE(review): presumably milliseconds, as in RTMP — confirm against the chunk reader.
timestamp: u32,
/// Parsed video tag.
tag: VideoTag,
},
/// The last object or ECMA-array value found in an AMF0 data message.
Metadata(Amf0Value),
}
impl RtmpSession {
pub fn app(&self) -> &str {
&self.app
}
pub fn stream_name(&self) -> &str {
&self.stream_name
}
pub fn publish_type(&self) -> &str {
&self.publish_type
}
pub fn peer_addr(&self) -> SocketAddr {
self.peer_addr
}
pub fn set_read_timeout(&self, d: Option<Duration>) -> Result<()> {
self.stream.set_read_timeout(d)?;
Ok(())
}
pub fn close(mut self) -> Result<()> {
let _ = self.writer.write_message(
CSID_COMMAND,
&build_on_status(
self.stream_id,
"status",
"NetStream.Unpublish.Success",
"Stream closed.",
),
);
let _ = self.writer.flush();
let _ = self.stream.shutdown(Shutdown::Both);
Ok(())
}
pub fn next_packet(&mut self) -> Result<Option<StreamPacket>> {
while !self.ended {
let msg = match self.reader.read_message() {
Ok(m) => m,
Err(Error::Io(e))
if matches!(
e.kind(),
std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionReset
) =>
{
return Ok(None);
}
Err(e) => return Err(e),
};
match msg.msg_type_id {
MSG_AUDIO => {
let tag = parse_audio(&msg.payload)?;
return Ok(Some(StreamPacket::Audio {
timestamp: msg.timestamp,
tag,
}));
}
MSG_VIDEO => {
let tag = parse_video(&msg.payload)?;
return Ok(Some(StreamPacket::Video {
timestamp: msg.timestamp,
tag,
}));
}
MSG_DATA_AMF0 => {
let values = amf::decode_all(&msg.payload)?;
let meta = values
.iter()
.rev()
.find(|v| matches!(v, Amf0Value::Object(_) | Amf0Value::EcmaArray(_)))
.cloned();
if let Some(m) = meta {
return Ok(Some(StreamPacket::Metadata(m)));
}
}
MSG_COMMAND_AMF0 => {
let values = amf::decode_all(&msg.payload)?;
if let Some(name) = values.first().and_then(Amf0Value::as_str) {
if matches!(name, "closeStream" | "deleteStream" | "FCUnpublish") {
self.ended = true;
return Ok(None);
}
}
}
MSG_SET_CHUNK_SIZE => {
let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
self.reader.set_chunk_size(size as usize);
}
MSG_ACK | MSG_USER_CONTROL | MSG_WINDOW_ACK_SIZE | MSG_SET_PEER_BANDWIDTH => {
}
_ => {
}
}
}
Ok(None)
}
}
fn drive_until_publish(stream: TcpStream, peer_addr: SocketAddr) -> Result<PublishRequest> {
let _ = stream.set_nodelay(true);
let mut hs_stream = stream.try_clone()?;
crate::handshake::server_handshake(&mut hs_stream)?;
let reader_stream = stream.try_clone()?;
let writer_stream = stream.try_clone()?;
let mut reader = ChunkReader::new(reader_stream);
let mut writer = ChunkWriter::new(writer_stream);
let tc_url;
let app;
loop {
let msg = reader.read_message()?;
match msg.msg_type_id {
MSG_SET_CHUNK_SIZE => {
let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
reader.set_chunk_size(size as usize);
}
MSG_COMMAND_AMF0 => {
let values = amf::decode_all(&msg.payload)?;
let name = values
.first()
.and_then(Amf0Value::as_str)
.ok_or_else(|| Error::InvalidCommand("missing command name".into()))?;
if name != "connect" {
return Err(Error::InvalidCommand(format!(
"expected `connect` first, got `{name}`"
)));
}
let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(1.0);
let cmd_obj = values.get(2).ok_or_else(|| {
Error::InvalidCommand("`connect` missing command object".into())
})?;
tc_url = cmd_obj
.get("tcUrl")
.and_then(Amf0Value::as_str)
.unwrap_or("")
.to_owned();
app = cmd_obj
.get("app")
.and_then(Amf0Value::as_str)
.unwrap_or("")
.to_owned();
writer.write_message(
CSID_PROTOCOL_CONTROL,
&build_window_ack_size(WINDOW_ACK_SIZE),
)?;
writer.write_message(
CSID_PROTOCOL_CONTROL,
&build_set_peer_bandwidth(WINDOW_ACK_SIZE, PEER_BW_LIMIT_DYNAMIC),
)?;
writer.write_message(CSID_PROTOCOL_CONTROL, &build_user_control_stream_begin(0))?;
writer.write_message(CSID_COMMAND, &build_connect_result(tx_id))?;
writer.write_message(
CSID_PROTOCOL_CONTROL,
&build_set_chunk_size(SERVER_CHUNK_SIZE),
)?;
writer.set_chunk_size(SERVER_CHUNK_SIZE as usize);
writer.flush()?;
break;
}
_ => {
}
}
}
let mut next_stream_id: u32 = 1;
loop {
let msg = reader.read_message()?;
match msg.msg_type_id {
MSG_SET_CHUNK_SIZE => {
let size = read_u32_be(&msg.payload)? & 0x7FFF_FFFF;
reader.set_chunk_size(size as usize);
continue;
}
MSG_COMMAND_AMF0 => {
let values = amf::decode_all(&msg.payload)?;
let name = values
.first()
.and_then(Amf0Value::as_str)
.ok_or_else(|| Error::InvalidCommand("missing command name".into()))?
.to_owned();
let tx_id = values.get(1).and_then(Amf0Value::as_f64).unwrap_or(0.0);
match name.as_str() {
"releaseStream" | "FCPublish" => {
let payload = amf::encode_command(
"_result",
tx_id,
Amf0Value::Null,
&[Amf0Value::Undefined],
);
let reply = Message {
msg_type_id: MSG_COMMAND_AMF0,
msg_stream_id: 0,
timestamp: 0,
payload,
};
writer.write_message(CSID_COMMAND, &reply)?;
writer.flush()?;
}
"createStream" => {
let sid = next_stream_id;
next_stream_id += 1;
writer.write_message(
CSID_COMMAND,
&build_create_stream_result(tx_id, sid as f64),
)?;
writer.flush()?;
}
"publish" => {
let stream_name = values
.get(3)
.and_then(Amf0Value::as_str)
.ok_or_else(|| {
Error::InvalidCommand("publish missing stream_name".into())
})?
.to_owned();
let publish_type = values
.get(4)
.and_then(Amf0Value::as_str)
.unwrap_or("live")
.to_owned();
return Ok(PublishRequest {
app,
stream_name,
publish_type,
peer_addr,
tc_url,
pending: PendingSession {
stream,
reader,
writer,
stream_id: msg.msg_stream_id.max(1),
publish_tx_id: tx_id,
},
});
}
_ => {
}
}
}
_ => {
}
}
}
}
fn read_u32_be(buf: &[u8]) -> Result<u32> {
if buf.len() < 4 {
return Err(Error::ProtocolViolation("need 4 bytes for u32be".into()));
}
Ok(u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]))
}
#[allow(dead_code)]
fn flush_writer<W: Write>(w: &mut W) -> Result<()> {
w.flush()?;
Ok(())
}
#[allow(dead_code)]
fn read_exact<R: Read>(r: &mut R, n: usize) -> Result<Vec<u8>> {
let mut buf = vec![0u8; n];
r.read_exact(&mut buf)?;
Ok(buf)
}