use super::enhanced::{EnhancedRtmpCapabilities, FourCC};
use crate::error::{NetError, NetResult};
use std::collections::HashMap;
#[derive(Debug, Clone)]
pub struct NegotiationCapabilities {
pub video_codecs: Vec<FourCC>,
pub audio_codecs: Vec<FourCC>,
pub enhanced_rtmp: bool,
pub app: String,
pub rtmp_version: String,
pub supports_push: bool,
pub supports_reconnect: bool,
pub custom: HashMap<String, String>,
}
impl Default for NegotiationCapabilities {
fn default() -> Self {
Self {
video_codecs: vec![FourCC::AV1, FourCC::VP9, FourCC::AVC],
audio_codecs: vec![FourCC::OPUS, FourCC::FLAC, FourCC::AAC],
enhanced_rtmp: true,
app: "live".to_owned(),
rtmp_version: "oximedia/0.1.2".to_owned(),
supports_push: false,
supports_reconnect: true,
custom: HashMap::new(),
}
}
}
impl NegotiationCapabilities {
#[must_use]
pub fn patent_free() -> Self {
Self {
video_codecs: vec![FourCC::AV1, FourCC::VP9],
audio_codecs: vec![FourCC::OPUS, FourCC::FLAC],
..Self::default()
}
}
#[must_use]
pub fn fourcc_list(&self) -> String {
let mut list: Vec<String> = self
.video_codecs
.iter()
.chain(&self.audio_codecs)
.filter_map(|cc| cc.as_str().map(|s| s.to_owned()))
.collect();
list.sort();
list.dedup();
list.join(",")
}
#[must_use]
pub fn to_enhanced_caps(&self) -> EnhancedRtmpCapabilities {
EnhancedRtmpCapabilities {
video_codecs: self.video_codecs.clone(),
audio_codecs: self.audio_codecs.clone(),
enhanced_supported: self.enhanced_rtmp,
version: self.rtmp_version.clone(),
}
}
pub fn add_custom(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.custom.insert(key.into(), value.into());
}
#[must_use]
pub fn supports_video(&self, codec: &FourCC) -> bool {
self.video_codecs.contains(codec)
}
#[must_use]
pub fn supports_audio(&self, codec: &FourCC) -> bool {
self.audio_codecs.contains(codec)
}
}
#[derive(Debug, Clone)]
pub struct ConnectCommand {
pub app: String,
pub flash_ver: String,
pub swf_url: Option<String>,
pub tc_url: String,
pub fpad: bool,
pub audio_codecs: u16,
pub video_codecs: u16,
pub video_function: u32,
pub object_encoding: u8,
pub fourcc_list: Option<String>,
}
impl ConnectCommand {
#[must_use]
pub fn new(app: impl Into<String>, tc_url: impl Into<String>) -> Self {
Self {
app: app.into(),
flash_ver: "LNX 9,0,124,2".to_owned(),
swf_url: None,
tc_url: tc_url.into(),
fpad: false,
audio_codecs: 0x0FFF,
video_codecs: 0x0080,
video_function: 1,
object_encoding: 0,
fourcc_list: None,
}
}
#[must_use]
pub fn enhanced(
app: impl Into<String>,
tc_url: impl Into<String>,
caps: &NegotiationCapabilities,
) -> Self {
Self {
fourcc_list: Some(caps.fourcc_list()),
..Self::new(app, tc_url)
}
}
#[must_use]
pub fn is_enhanced(&self) -> bool {
self.fourcc_list.is_some()
}
#[must_use]
pub fn parsed_fourccs(&self) -> Vec<FourCC> {
self.fourcc_list
.as_deref()
.map(|list| {
list.split(',')
.filter_map(|code| FourCC::from_str_code(code.trim()))
.collect()
})
.unwrap_or_default()
}
}
#[derive(Debug, Clone)]
pub struct NegotiatedSession {
pub video_codecs: Vec<FourCC>,
pub audio_codecs: Vec<FourCC>,
pub enhanced_mode: bool,
pub app: String,
pub peer_connect: ConnectCommand,
}
impl NegotiatedSession {
#[must_use]
pub fn can_use_video(&self, codec: &FourCC) -> bool {
self.video_codecs.contains(codec)
}
#[must_use]
pub fn can_use_audio(&self, codec: &FourCC) -> bool {
self.audio_codecs.contains(codec)
}
#[must_use]
pub fn summary(&self) -> String {
let v: Vec<&str> = self.video_codecs.iter().map(|c| c.codec_name()).collect();
let a: Vec<&str> = self.audio_codecs.iter().map(|c| c.codec_name()).collect();
format!(
"enhanced={}, video=[{}], audio=[{}]",
self.enhanced_mode,
v.join(", "),
a.join(", ")
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NegotiatorState {
Idle,
WaitingResult,
Connected,
Failed,
}
pub struct EnhancedRtmpNegotiator {
local_caps: NegotiationCapabilities,
state: NegotiatorState,
peer_caps: Option<NegotiationCapabilities>,
session: Option<NegotiatedSession>,
}
impl EnhancedRtmpNegotiator {
#[must_use]
pub fn new(local_caps: NegotiationCapabilities) -> Self {
Self {
local_caps,
state: NegotiatorState::Idle,
peer_caps: None,
session: None,
}
}
#[must_use]
pub fn patent_free() -> Self {
Self::new(NegotiationCapabilities::patent_free())
}
#[must_use]
pub fn state(&self) -> NegotiatorState {
self.state
}
#[must_use]
pub fn is_connected(&self) -> bool {
self.state == NegotiatorState::Connected
}
#[must_use]
pub fn session(&self) -> Option<&NegotiatedSession> {
self.session.as_ref()
}
pub fn build_connect_command(
&mut self,
app: impl Into<String>,
tc_url: impl Into<String>,
) -> NetResult<ConnectCommand> {
if self.state != NegotiatorState::Idle {
return Err(NetError::invalid_state(
"build_connect_command called in non-Idle state",
));
}
let cmd = ConnectCommand::enhanced(app, tc_url, &self.local_caps);
self.state = NegotiatorState::WaitingResult;
Ok(cmd)
}
pub fn process_server_result(
&mut self,
server_connect: &ConnectCommand,
) -> NetResult<&NegotiatedSession> {
if self.state != NegotiatorState::WaitingResult {
return Err(NetError::invalid_state(
"process_server_result called before build_connect_command",
));
}
let session = self.negotiate_session(server_connect);
self.state = NegotiatorState::Connected;
self.session = Some(session);
self.session
.as_ref()
.ok_or_else(|| NetError::invalid_state("session failed to initialise"))
}
pub fn process_client_connect(
&mut self,
client_connect: &ConnectCommand,
) -> NetResult<ConnectCommand> {
if self.state != NegotiatorState::Idle {
return Err(NetError::invalid_state(
"process_client_connect called in non-Idle state",
));
}
let session = self.negotiate_session(client_connect);
self.state = NegotiatorState::Connected;
self.session = Some(session);
let result_cmd = ConnectCommand::enhanced(
&client_connect.app,
&client_connect.tc_url,
&self.local_caps,
);
Ok(result_cmd)
}
fn negotiate_session(&mut self, peer_cmd: &ConnectCommand) -> NegotiatedSession {
let peer_fourccs = peer_cmd.parsed_fourccs();
let peer_video: Vec<FourCC> = peer_fourccs
.iter()
.filter(|cc| matches!(**cc, FourCC::AV1 | FourCC::VP9 | FourCC::HEVC | FourCC::AVC))
.copied()
.collect();
let peer_audio: Vec<FourCC> = peer_fourccs
.iter()
.filter(|cc| matches!(**cc, FourCC::OPUS | FourCC::FLAC | FourCC::AAC))
.copied()
.collect();
let video_negotiated: Vec<FourCC> = self
.local_caps
.video_codecs
.iter()
.filter(|cc| peer_video.contains(cc))
.copied()
.collect();
let audio_negotiated: Vec<FourCC> = self
.local_caps
.audio_codecs
.iter()
.filter(|cc| peer_audio.contains(cc))
.copied()
.collect();
let enhanced_mode = peer_cmd.is_enhanced() && self.local_caps.enhanced_rtmp;
self.peer_caps = Some(NegotiationCapabilities {
video_codecs: peer_video,
audio_codecs: peer_audio,
enhanced_rtmp: enhanced_mode,
app: peer_cmd.app.clone(),
rtmp_version: peer_cmd.flash_ver.clone(),
supports_push: false,
supports_reconnect: false,
custom: HashMap::new(),
});
NegotiatedSession {
video_codecs: video_negotiated,
audio_codecs: audio_negotiated,
enhanced_mode,
app: peer_cmd.app.clone(),
peer_connect: peer_cmd.clone(),
}
}
}
impl std::fmt::Debug for EnhancedRtmpNegotiator {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EnhancedRtmpNegotiator")
.field("state", &self.state)
.field("enhanced", &self.local_caps.enhanced_rtmp)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn caps() -> NegotiationCapabilities {
NegotiationCapabilities::default()
}
fn patent_free_caps() -> NegotiationCapabilities {
NegotiationCapabilities::patent_free()
}
#[test]
fn test_default_caps() {
let c = caps();
assert!(c.supports_video(&FourCC::AV1));
assert!(c.supports_audio(&FourCC::OPUS));
assert!(c.enhanced_rtmp);
}
#[test]
fn test_patent_free_caps() {
let c = patent_free_caps();
assert!(!c.supports_video(&FourCC::AVC));
assert!(!c.supports_audio(&FourCC::AAC));
}
#[test]
fn test_fourcc_list() {
let c = NegotiationCapabilities::patent_free();
let list = c.fourcc_list();
assert!(list.contains("av01"));
assert!(list.contains("vp09"));
assert!(list.contains("Opus"));
}
#[test]
fn test_add_custom() {
let mut c = caps();
c.add_custom("latency", "30ms");
assert_eq!(c.custom.get("latency").map(String::as_str), Some("30ms"));
}
#[test]
fn test_connect_command_new() {
let cmd = ConnectCommand::new("live", "rtmp://server.example.com/live");
assert_eq!(cmd.app, "live");
assert!(!cmd.is_enhanced());
assert!(cmd.parsed_fourccs().is_empty());
}
#[test]
fn test_connect_command_enhanced() {
let c = NegotiationCapabilities::patent_free();
let cmd = ConnectCommand::enhanced("live", "rtmp://s.example.com/live", &c);
assert!(cmd.is_enhanced());
let fourccs = cmd.parsed_fourccs();
assert!(!fourccs.is_empty());
}
#[test]
fn test_parsed_fourccs_round_trip() {
let c = NegotiationCapabilities::patent_free();
let cmd = ConnectCommand::enhanced("live", "rtmp://s.example.com/live", &c);
let fourccs = cmd.parsed_fourccs();
assert!(fourccs.contains(&FourCC::AV1));
assert!(fourccs.contains(&FourCC::VP9));
}
#[test]
fn test_negotiator_initial_state() {
let n = EnhancedRtmpNegotiator::new(caps());
assert_eq!(n.state(), NegotiatorState::Idle);
assert!(!n.is_connected());
}
#[test]
fn test_negotiator_patent_free() {
let n = EnhancedRtmpNegotiator::patent_free();
assert_eq!(n.state(), NegotiatorState::Idle);
}
#[test]
fn test_build_connect_command() {
let mut n = EnhancedRtmpNegotiator::new(caps());
let cmd = n
.build_connect_command("live", "rtmp://s.example.com/live")
.expect("should succeed");
assert_eq!(n.state(), NegotiatorState::WaitingResult);
assert!(cmd.is_enhanced());
}
#[test]
fn test_build_connect_command_non_idle() {
let mut n = EnhancedRtmpNegotiator::new(caps());
n.build_connect_command("live", "rtmp://s.example.com/live")
.expect("first call ok");
let result = n.build_connect_command("live", "rtmp://s.example.com/live");
assert!(result.is_err());
}
#[test]
fn test_client_side_full_flow() {
let mut client = EnhancedRtmpNegotiator::new(caps());
let connect_cmd = client
.build_connect_command("live", "rtmp://server.example.com/live")
.expect("build ok");
let server_caps = NegotiationCapabilities::patent_free();
let server_result =
ConnectCommand::enhanced(&connect_cmd.app, &connect_cmd.tc_url, &server_caps);
{
let session = client
.process_server_result(&server_result)
.expect("negotiation ok");
assert!(session.can_use_video(&FourCC::AV1));
assert!(session.can_use_video(&FourCC::VP9));
assert!(!session.can_use_video(&FourCC::AVC)); }
assert!(client.is_connected());
}
#[test]
fn test_server_side_full_flow() {
let mut server = EnhancedRtmpNegotiator::new(NegotiationCapabilities::patent_free());
let client_caps = caps();
let client_cmd =
ConnectCommand::enhanced("live", "rtmp://server.example.com/live", &client_caps);
let result_cmd = server.process_client_connect(&client_cmd).expect("ok");
assert!(server.is_connected());
assert!(result_cmd.is_enhanced());
let session = server.session().expect("should have session");
assert!(session.enhanced_mode);
}
#[test]
fn test_negotiated_session_summary() {
let mut server = EnhancedRtmpNegotiator::new(NegotiationCapabilities::patent_free());
let client_cmd = ConnectCommand::enhanced("live", "rtmp://s/live", &caps());
server.process_client_connect(&client_cmd).expect("ok");
let summary = server.session().expect("session").summary();
assert!(summary.contains("enhanced=true"));
}
#[test]
fn test_no_common_codecs() {
let local = NegotiationCapabilities {
video_codecs: vec![FourCC::AV1],
audio_codecs: vec![FourCC::OPUS],
..NegotiationCapabilities::default()
};
let mut n = EnhancedRtmpNegotiator::new(local);
let peer_cmd = ConnectCommand {
fourcc_list: Some("hvc1,mp4a".to_owned()),
..ConnectCommand::new("live", "rtmp://s/live")
};
let _ = n
.build_connect_command("live", "rtmp://s/live")
.expect("ok");
let session = n.process_server_result(&peer_cmd).expect("ok");
assert!(session.video_codecs.is_empty());
assert!(session.audio_codecs.is_empty());
}
#[test]
fn test_process_result_before_connect() {
let mut n = EnhancedRtmpNegotiator::new(caps());
let cmd = ConnectCommand::new("live", "rtmp://s/live");
let result = n.process_server_result(&cmd);
assert!(result.is_err());
}
#[test]
fn test_process_client_connect_twice() {
let mut n = EnhancedRtmpNegotiator::new(caps());
let cmd = ConnectCommand::enhanced("live", "rtmp://s/live", &caps());
n.process_client_connect(&cmd).expect("first ok");
let result = n.process_client_connect(&cmd);
assert!(result.is_err());
}
#[test]
fn test_swf_url_optional() {
let mut cmd = ConnectCommand::new("live", "rtmp://s/live");
cmd.swf_url = Some("http://player.example.com/player.swf".to_owned());
assert!(cmd.swf_url.is_some());
}
#[test]
fn test_to_enhanced_caps() {
let c = caps();
let enhanced = c.to_enhanced_caps();
assert!(enhanced.enhanced_supported);
assert!(enhanced.supports_video_codec(&FourCC::AV1));
}
#[test]
fn test_negotiator_debug() {
let n = EnhancedRtmpNegotiator::new(caps());
let debug = format!("{n:?}");
assert!(debug.contains("Idle") || debug.contains("state"));
}
#[test]
fn test_object_encoding() {
let cmd = ConnectCommand::new("live", "rtmp://s/live");
assert_eq!(cmd.object_encoding, 0);
}
#[test]
fn test_fpad_default() {
let cmd = ConnectCommand::new("live", "rtmp://s/live");
assert!(!cmd.fpad);
}
#[test]
fn test_negotiated_app() {
let mut n = EnhancedRtmpNegotiator::new(caps());
let cmd = ConnectCommand::enhanced("myapp", "rtmp://s/myapp", &caps());
n.process_client_connect(&cmd).expect("ok");
assert_eq!(n.session().expect("session").app, "myapp");
}
}