use std::io::{Read, Write};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::error::{Error, Result};
/// Version byte exchanged as the single-byte C0 / S0 message; both handshake
/// directions in this module send it and reject any other value from the peer.
pub const RTMP_VERSION: u8 = 3;

/// Length in bytes of each fixed-size handshake message (C1, S1, C2, S2).
///
/// Outgoing payloads built by this module are laid out as a 4-byte timestamp,
/// 4 zero bytes and 1528 bytes of filler.
pub const HANDSHAKE_PAYLOAD_LEN: usize = 4 + 4 + 1528;
/// Runs the client side of the plain RTMP handshake over `stream`.
///
/// The exchange is strictly sequential:
///
/// 1. C0 (the [`RTMP_VERSION`] byte) and C1 ([`HANDSHAKE_PAYLOAD_LEN`] bytes)
///    are sent together.
/// 2. S0 is read and must equal [`RTMP_VERSION`].
/// 3. S1 is read and sent back unchanged as C2.
/// 4. S2 is read to complete the exchange; its contents are not inspected.
///
/// The stream is flushed after every outgoing message. Without that, a
/// buffering writer could hold C0+C1 back while this function blocks waiting
/// for S0, and neither side would make progress.
///
/// # Errors
///
/// Returns [`Error::UnsupportedHandshakeVersion`] carrying the received byte
/// when S0 is not [`RTMP_VERSION`]. Any I/O error from `stream` (including the
/// stream ending before a full message has arrived) is propagated via `?`.
pub fn client_handshake<S: Read + Write>(stream: &mut S) -> Result<()> {
    // C0 and C1 share one buffer so they leave in a single write.
    let mut c0c1 = [0u8; 1 + HANDSHAKE_PAYLOAD_LEN];
    c0c1[0] = RTMP_VERSION;
    fill_outgoing_payload(&mut c0c1[1..]);
    stream.write_all(&c0c1)?;
    // The peer cannot answer until it has actually received C0+C1.
    stream.flush()?;

    let mut s0 = [0u8; 1];
    stream.read_exact(&mut s0)?;
    if s0[0] != RTMP_VERSION {
        return Err(Error::UnsupportedHandshakeVersion(s0[0]));
    }

    // C2 is a byte-for-byte echo of S1.
    let mut s1 = [0u8; HANDSHAKE_PAYLOAD_LEN];
    stream.read_exact(&mut s1)?;
    stream.write_all(&s1)?;
    // Make sure C2 is on the wire before we block on S2 or return.
    stream.flush()?;

    // S2 only needs to be drained from the stream.
    let mut s2 = [0u8; HANDSHAKE_PAYLOAD_LEN];
    stream.read_exact(&mut s2)?;
    Ok(())
}
/// Runs the server side of the plain RTMP handshake over `stream`.
///
/// The exchange is strictly sequential:
///
/// 1. C0 is read and must equal [`RTMP_VERSION`].
/// 2. C1 ([`HANDSHAKE_PAYLOAD_LEN`] bytes) is read.
/// 3. S0, S1 and S2 are sent together; S2 is a byte-for-byte echo of C1.
/// 4. C2 is read to complete the exchange; its contents are not inspected.
///
/// The stream is flushed after the reply is written. Without that, a
/// buffering writer could hold S0+S1+S2 back while this function blocks
/// waiting for C2, and neither side would make progress.
///
/// # Errors
///
/// Returns [`Error::UnsupportedHandshakeVersion`] carrying the received byte
/// when C0 is not [`RTMP_VERSION`]; nothing has been written to `stream` at
/// that point. Any I/O error from `stream` (including the stream ending
/// before a full message has arrived) is propagated via `?`.
pub fn server_handshake<S: Read + Write>(stream: &mut S) -> Result<()> {
    let mut c0 = [0u8; 1];
    stream.read_exact(&mut c0)?;
    if c0[0] != RTMP_VERSION {
        return Err(Error::UnsupportedHandshakeVersion(c0[0]));
    }

    let mut c1 = [0u8; HANDSHAKE_PAYLOAD_LEN];
    stream.read_exact(&mut c1)?;

    // Layout: [S0: 1 byte][S1: payload][S2: payload], sent as a single write.
    let mut s0s1s2 = [0u8; 1 + HANDSHAKE_PAYLOAD_LEN * 2];
    s0s1s2[0] = RTMP_VERSION;
    fill_outgoing_payload(&mut s0s1s2[1..1 + HANDSHAKE_PAYLOAD_LEN]);
    s0s1s2[1 + HANDSHAKE_PAYLOAD_LEN..].copy_from_slice(&c1);
    stream.write_all(&s0s1s2)?;
    // The client sends C2 only after it has received S1, so the reply must
    // actually leave any write buffer before we block on the next read.
    stream.flush()?;

    // C2 only needs to be drained from the stream.
    let mut c2 = [0u8; HANDSHAKE_PAYLOAD_LEN];
    stream.read_exact(&mut c2)?;
    Ok(())
}
/// Fills `buf` with an outgoing handshake payload (the body of C1 or S1).
///
/// Layout written into `buf`:
///
/// * bytes `0..4`: current Unix time in whole seconds, truncated to `u32`,
///   big-endian (0 if the system clock reads earlier than the Unix epoch);
/// * bytes `4..8`: zero;
/// * bytes `8..`: output of a small xorshift-style generator seeded from the
///   timestamp alone, so two calls within the same second yield identical
///   bytes. This is filler, not a source of secure randomness.
///
/// # Panics
///
/// Panics if `buf.len()` is not exactly [`HANDSHAKE_PAYLOAD_LEN`].
fn fill_outgoing_payload(buf: &mut [u8]) {
    assert_eq!(buf.len(), HANDSHAKE_PAYLOAD_LEN);

    // Truncation to 32 bits is deliberate: the wire field is four bytes wide.
    let timestamp: u32 = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as u32,
        Err(_) => 0,
    };

    let (header, filler) = buf.split_at_mut(8);
    let (time_field, zero_field) = header.split_at_mut(4);
    time_field.copy_from_slice(&timestamp.to_be_bytes());
    zero_field.fill(0);

    // OR-ing in the constant guarantees a non-zero starting state, which the
    // shift/xor steps below need in order not to get stuck at zero.
    let seed = u64::from(timestamp);
    let mut rng: u64 = seed | (seed << 32) | 0x9E37_79B9_7F4A_7C15;
    for slot in filler.chunks_mut(8) {
        rng ^= rng << 13;
        rng ^= rng >> 7;
        rng ^= rng << 17;
        let word = rng.wrapping_mul(0x2545_F491_4F6C_DD1D).to_be_bytes();
        // `zip` stops at the shorter side, so a trailing partial slot would
        // receive only the leading bytes of `word`.
        for (dst, src) in slot.iter_mut().zip(word.iter()) {
            *dst = *src;
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// In-memory stand-in for a socket: reads are served from a prepared
    /// byte script and everything written is captured for inspection.
    struct DuplexBuf {
        read: Cursor<Vec<u8>>,
        write: Vec<u8>,
    }

    impl DuplexBuf {
        /// Builds a stream whose peer will "send" exactly `incoming`.
        fn with_incoming(incoming: Vec<u8>) -> Self {
            DuplexBuf {
                read: Cursor::new(incoming),
                write: Vec::new(),
            }
        }

        /// Number of scripted bytes the code under test has consumed so far.
        fn consumed(&self) -> usize {
            self.read.position() as usize
        }
    }

    impl std::io::Read for DuplexBuf {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            self.read.read(buf)
        }
    }

    impl std::io::Write for DuplexBuf {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            self.write.write(buf)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            Ok(())
        }
    }

    /// A recognisable, non-constant payload so that an echo can be told apart
    /// from zeroed or freshly generated bytes.
    fn patterned_payload(seed: u8) -> Vec<u8> {
        (0..HANDSHAKE_PAYLOAD_LEN)
            // Wrapping to a byte is intentional; only the pattern matters.
            .map(|i| (i as u8).wrapping_mul(31).wrapping_add(seed))
            .collect()
    }

    /// Builds the byte script `[version][first][second]` a peer would send.
    fn peer_script(version: u8, first: &[u8], second: &[u8]) -> Vec<u8> {
        let mut script = Vec::with_capacity(1 + first.len() + second.len());
        script.push(version);
        script.extend_from_slice(first);
        script.extend_from_slice(second);
        script
    }

    #[test]
    fn client_handshake_completes_against_trivial_server() {
        let s1 = patterned_payload(7);
        let s2 = vec![0u8; HANDSHAKE_PAYLOAD_LEN];
        let script = peer_script(RTMP_VERSION, &s1, &s2);
        let script_len = script.len();
        let mut duplex = DuplexBuf::with_incoming(script);

        client_handshake(&mut duplex).expect("handshake");

        // The client sent C0 + C1 + C2 and nothing else.
        assert_eq!(
            duplex.write.len(),
            1 + HANDSHAKE_PAYLOAD_LEN + HANDSHAKE_PAYLOAD_LEN
        );
        assert_eq!(duplex.write[0], RTMP_VERSION);
        // C2 must be a byte-for-byte echo of S1.
        assert_eq!(&duplex.write[1 + HANDSHAKE_PAYLOAD_LEN..], &s1[..]);
        // S0, S1 and S2 were all drained from the stream.
        assert_eq!(duplex.consumed(), script_len);
    }

    #[test]
    fn client_handshake_rejects_unknown_version() {
        let mut duplex = DuplexBuf::with_incoming(vec![0x06]);

        match client_handshake(&mut duplex) {
            Err(Error::UnsupportedHandshakeVersion(version)) => assert_eq!(version, 0x06),
            other => panic!("expected version rejection, got {:?}", other),
        }
        // Only C0 + C1 went out before the bad S0 was seen.
        assert_eq!(duplex.write.len(), 1 + HANDSHAKE_PAYLOAD_LEN);
    }

    #[test]
    fn client_handshake_fails_on_truncated_reply() {
        let half_s1 = vec![0u8; HANDSHAKE_PAYLOAD_LEN / 2];
        let script = peer_script(RTMP_VERSION, &half_s1, &[]);
        let mut duplex = DuplexBuf::with_incoming(script);

        assert!(client_handshake(&mut duplex).is_err());
    }

    #[test]
    fn server_handshake_echoes_c1_as_s2() {
        let c1 = patterned_payload(42);
        let c2 = vec![0u8; HANDSHAKE_PAYLOAD_LEN];
        let script = peer_script(RTMP_VERSION, &c1, &c2);
        let script_len = script.len();
        let mut duplex = DuplexBuf::with_incoming(script);

        server_handshake(&mut duplex).expect("handshake");

        // The server sent S0 + S1 + S2 and nothing else.
        assert_eq!(duplex.write.len(), 1 + HANDSHAKE_PAYLOAD_LEN * 2);
        assert_eq!(duplex.write[0], RTMP_VERSION);
        // S2 must be a byte-for-byte echo of C1.
        assert_eq!(&duplex.write[1 + HANDSHAKE_PAYLOAD_LEN..], &c1[..]);
        // C0, C1 and C2 were all drained from the stream.
        assert_eq!(duplex.consumed(), script_len);
    }

    #[test]
    fn server_handshake_rejects_unknown_version() {
        let mut duplex = DuplexBuf::with_incoming(vec![0x06]);

        match server_handshake(&mut duplex) {
            Err(Error::UnsupportedHandshakeVersion(version)) => assert_eq!(version, 0x06),
            other => panic!("expected version rejection, got {:?}", other),
        }
        // The server rejects before replying.
        assert!(duplex.write.is_empty());
    }
}