mod config;
mod errors;
mod events;
mod outstanding_transaction;
mod publish_request_type;
mod result;
mod state;
#[cfg(test)]
mod tests;
pub use self::config::ClientSessionConfig;
pub use self::errors::ClientSessionError;
pub use self::events::ClientSessionEvent;
pub use self::publish_request_type::PublishRequestType;
pub use self::result::ClientSessionResult;
pub use self::state::ClientState;
use self::outstanding_transaction::{OutstandingTransaction, TransactionPurpose};
use bytes::Bytes;
use chunk_io::{ChunkDeserializer, ChunkSerializer, Packet};
use messages::{RtmpMessage, UserControlEventType};
use rml_amf0::Amf0Value;
use sessions::StreamMetadata;
use std::collections::HashMap;
use std::mem;
use std::time::SystemTime;
use time::RtmpTimestamp;
type ClientResult = Result<Vec<ClientSessionResult>, ClientSessionError>;
pub struct ClientSession {
start_time: SystemTime,
serializer: ChunkSerializer,
deserializer: ChunkDeserializer,
config: ClientSessionConfig,
next_transaction_id: u32,
outstanding_transactions: HashMap<u32, OutstandingTransaction>,
current_state: ClientState,
connected_app_name: Option<String>,
active_stream_id: Option<u32>,
peer_window_ack_size: Option<u32>,
bytes_received: u64,
bytes_received_since_last_ack: u32,
}
impl ClientSession {
pub fn new(
config: ClientSessionConfig,
) -> Result<(ClientSession, Vec<ClientSessionResult>), ClientSessionError> {
let session = ClientSession {
start_time: SystemTime::now(),
serializer: ChunkSerializer::new(),
deserializer: ChunkDeserializer::new(),
next_transaction_id: 1,
outstanding_transactions: HashMap::new(),
current_state: ClientState::Disconnected,
active_stream_id: None,
connected_app_name: None,
peer_window_ack_size: None,
bytes_received: 0,
bytes_received_since_last_ack: 0,
config,
};
let results = Vec::new();
Ok((session, results))
}
pub fn handle_input(&mut self, bytes: &[u8]) -> ClientResult {
let mut results = Vec::new();
self.bytes_received += bytes.len() as u64;
if let Some(peer_ack_size) = self.peer_window_ack_size {
self.bytes_received_since_last_ack += bytes.len() as u32;
if self.bytes_received_since_last_ack >= peer_ack_size {
let ack_message = RtmpMessage::Acknowledgement {
sequence_number: self.bytes_received_since_last_ack,
};
let ack_payload = ack_message.into_message_payload(self.get_epoch(), 0)?;
let ack_packet = self.serializer.serialize(&ack_payload, false, false)?;
self.bytes_received_since_last_ack = 0;
results.push(ClientSessionResult::OutboundResponse(ack_packet));
}
}
let mut bytes_to_process = bytes;
loop {
match self.deserializer.get_next_message(bytes_to_process)? {
None => break, Some(payload) => {
let message = payload.to_rtmp_message()?;
let mut message_results = match message {
RtmpMessage::Acknowledgement { sequence_number } => {
self.handle_acknowledgement(sequence_number)?
}
RtmpMessage::Amf0Command {
command_name,
transaction_id,
command_object,
additional_arguments,
} => self.handle_amf0_command(
command_name,
transaction_id,
command_object,
additional_arguments,
)?,
RtmpMessage::Amf0Data { values } => {
self.handle_amf0_data(values, payload.message_stream_id)?
}
RtmpMessage::AudioData { data } => self.handle_audio_data(
payload.message_stream_id,
data,
payload.timestamp,
)?,
RtmpMessage::VideoData { data } => self.handle_video_data(
payload.message_stream_id,
data,
payload.timestamp,
)?,
RtmpMessage::UserControl {
event_type,
timestamp,
stream_id,
buffer_length,
} => self.handle_user_control(
event_type,
timestamp,
stream_id,
buffer_length,
)?,
RtmpMessage::WindowAcknowledgement { size } => {
self.handle_window_ack_size(size)?
}
RtmpMessage::SetChunkSize { size } => self.handle_set_chunk_size(size)?,
_ => vec![ClientSessionResult::UnhandleableMessageReceived(payload)],
};
results.append(&mut message_results);
bytes_to_process = &[];
}
}
}
Ok(results)
}
pub fn request_connection(
&mut self,
app_name: String,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Disconnected => (),
_ => {
return Err(ClientSessionError::CantConnectWhileAlreadyConnected);
}
}
let transaction_id = self.get_next_transaction_id();
let transaction = OutstandingTransaction::ConnectionRequested {
app_name: app_name.clone(),
};
self.outstanding_transactions
.insert(transaction_id, transaction);
let mut properties = HashMap::new();
properties.insert("app".to_string(), Amf0Value::Utf8String(app_name));
properties.insert(
"flashVer".to_string(),
Amf0Value::Utf8String(self.config.flash_version.clone()),
);
properties.insert("objectEncoding".to_string(), Amf0Value::Number(0.0));
match &self.config.tc_url {
Some(tc_url) => {
properties.insert("tcUrl".to_string(), Amf0Value::Utf8String(tc_url.clone()));
}
None => (),
};
let message = RtmpMessage::Amf0Command {
command_name: "connect".to_string(),
command_object: Amf0Value::Object(properties),
additional_arguments: vec![],
transaction_id: transaction_id as f64,
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
pub fn request_playback(
&mut self,
stream_key: String,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Connected => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
let transaction_id = self.get_next_transaction_id();
let transaction = OutstandingTransaction::CreateStream {
purpose: TransactionPurpose::PlayRequest { stream_key },
};
self.outstanding_transactions
.insert(transaction_id, transaction);
let message = RtmpMessage::Amf0Command {
command_name: "createStream".to_string(),
transaction_id: transaction_id as f64,
command_object: Amf0Value::Null,
additional_arguments: Vec::new(),
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
pub fn request_publishing(
&mut self,
stream_key: String,
publish_type: PublishRequestType,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Connected => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
let transaction_id = self.get_next_transaction_id();
let transaction = OutstandingTransaction::CreateStream {
purpose: TransactionPurpose::PublishRequest {
stream_key,
request_type: publish_type,
},
};
self.outstanding_transactions
.insert(transaction_id, transaction);
let message = RtmpMessage::Amf0Command {
command_name: "createStream".to_string(),
transaction_id: transaction_id as f64,
command_object: Amf0Value::Null,
additional_arguments: Vec::new(),
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
pub fn stop_playback(&mut self) -> ClientResult {
match self.current_state {
ClientState::Playing { .. } => (),
ClientState::PlayRequested { .. } => (),
_ => return Ok(Vec::new()), }
self.current_state = ClientState::Connected;
match mem::replace(&mut self.active_stream_id, None) {
None => Ok(Vec::new()), Some(stream_id) => {
let message = RtmpMessage::Amf0Command {
command_name: "deleteStream".to_string(),
transaction_id: 0.0, command_object: Amf0Value::Null,
additional_arguments: vec![Amf0Value::Number(stream_id as f64)],
};
let payload = message.into_message_payload(self.get_epoch(), stream_id)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(vec![ClientSessionResult::OutboundResponse(packet)])
}
}
}
pub fn stop_publishing(&mut self) -> ClientResult {
match self.current_state {
ClientState::Publishing { .. } => (),
ClientState::PublishRequested { .. } => (),
_ => return Ok(Vec::new()), }
self.current_state = ClientState::Connected;
match mem::replace(&mut self.active_stream_id, None) {
None => Ok(Vec::new()), Some(stream_id) => {
let message = RtmpMessage::Amf0Command {
command_name: "deleteStream".to_string(),
transaction_id: 0.0, command_object: Amf0Value::Null,
additional_arguments: vec![Amf0Value::Number(stream_id as f64)],
};
let payload = message.into_message_payload(self.get_epoch(), stream_id)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(vec![ClientSessionResult::OutboundResponse(packet)])
}
}
}
pub fn send_ping_request(&mut self) -> Result<(Packet, RtmpTimestamp), ClientSessionError> {
let current_epoch = self.get_epoch();
let message = RtmpMessage::UserControl {
event_type: UserControlEventType::PingRequest,
buffer_length: None,
stream_id: None,
timestamp: Some(current_epoch.clone()),
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok((packet, current_epoch))
}
pub fn publish_metadata(
&mut self,
metadata: &StreamMetadata,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Publishing => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
let active_stream_id = match self.active_stream_id {
Some(x) => x,
None => {
return Err(ClientSessionError::NoKnownActiveStreamIdWhenRequired);
}
};
let mut properties = HashMap::new();
if let Some(x) = metadata.video_width {
properties.insert("width".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.video_height {
properties.insert("height".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.video_codec_id {
properties.insert("videocodecid".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.video_frame_rate {
properties.insert("framerate".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.video_bitrate_kbps {
properties.insert("videodatarate".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.audio_codec_id {
properties.insert("audiocodecid".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.audio_bitrate_kbps {
properties.insert("audiodatarate".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.audio_sample_rate {
properties.insert("audiosamplerate".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.audio_channels {
properties.insert("audiochannels".to_string(), Amf0Value::Number(x as f64));
}
if let Some(x) = metadata.audio_is_stereo {
properties.insert("stereo".to_string(), Amf0Value::Boolean(x));
}
if let Some(ref x) = metadata.encoder {
properties.insert("encoder".to_string(), Amf0Value::Utf8String(x.clone()));
}
let message = RtmpMessage::Amf0Data {
values: vec![
Amf0Value::Utf8String("@setDataFrame".to_string()),
Amf0Value::Utf8String("onMetaData".to_string()),
Amf0Value::Object(properties),
],
};
let payload = message.into_message_payload(self.get_epoch(), active_stream_id)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
pub fn publish_video_data(
&mut self,
data: Bytes,
timestamp: RtmpTimestamp,
can_be_dropped: bool,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Publishing => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
let active_stream_id = match self.active_stream_id {
Some(x) => x,
None => {
return Err(ClientSessionError::NoKnownActiveStreamIdWhenRequired);
}
};
let message = RtmpMessage::VideoData { data };
let payload = message.into_message_payload(timestamp, active_stream_id)?;
let packet = self.serializer.serialize(&payload, false, can_be_dropped)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
pub fn publish_audio_data(
&mut self,
data: Bytes,
timestamp: RtmpTimestamp,
can_be_dropped: bool,
) -> Result<ClientSessionResult, ClientSessionError> {
match self.current_state {
ClientState::Publishing => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
let active_stream_id = match self.active_stream_id {
Some(x) => x,
None => {
return Err(ClientSessionError::NoKnownActiveStreamIdWhenRequired);
}
};
let message = RtmpMessage::AudioData { data };
let payload = message.into_message_payload(timestamp, active_stream_id)?;
let packet = self.serializer.serialize(&payload, false, can_be_dropped)?;
Ok(ClientSessionResult::OutboundResponse(packet))
}
fn handle_video_data(
&self,
stream_id: u32,
data: Bytes,
timestamp: RtmpTimestamp,
) -> ClientResult {
match self.current_state {
ClientState::PlayRequested { .. } => (),
ClientState::Playing { .. } => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
match self.active_stream_id {
None => return Ok(Vec::new()), Some(active_stream_id) if active_stream_id != stream_id => return Ok(Vec::new()), Some(_) => (),
}
let event = ClientSessionEvent::VideoDataReceived { data, timestamp };
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_audio_data(
&self,
stream_id: u32,
data: Bytes,
timestamp: RtmpTimestamp,
) -> ClientResult {
match self.current_state {
ClientState::PlayRequested { .. } => (),
ClientState::Playing { .. } => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
}
match self.active_stream_id {
None => return Ok(Vec::new()), Some(active_stream_id) if active_stream_id != stream_id => return Ok(Vec::new()), Some(_) => (),
}
let event = ClientSessionEvent::AudioDataReceived { data, timestamp };
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_amf0_data(&mut self, mut data: Vec<Amf0Value>, stream_id: u32) -> ClientResult {
if data.len() == 0 {
return Ok(Vec::new());
}
match self.active_stream_id {
None => return Ok(Vec::new()), Some(active_stream_id) if active_stream_id != stream_id => return Ok(Vec::new()), Some(_) => (),
}
let first_element = data.remove(0);
match first_element {
Amf0Value::Utf8String(ref value) if value == "onMetaData" => {
self.handle_amf0_data_on_meta_data(data)
}
_ => Ok(Vec::new()),
}
}
fn handle_amf0_command(
&mut self,
name: String,
transaction_id: f64,
command_object: Amf0Value,
additional_args: Vec<Amf0Value>,
) -> ClientResult {
match name.as_str() {
"_result" => self.handle_amf0_command_success_result(
transaction_id,
command_object,
additional_args,
),
"_error" => self.handle_amf0_command_failed_result(
transaction_id,
command_object,
additional_args,
),
"onStatus" => self.handle_on_status_command(additional_args),
_ => {
let event = ClientSessionEvent::UnhandleableAmf0Command {
command_name: name,
additional_values: additional_args,
command_object,
transaction_id,
};
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
}
}
fn handle_amf0_command_failed_result(
&mut self,
transaction_id: f64,
command_object: Amf0Value,
mut additional_args: Vec<Amf0Value>,
) -> ClientResult {
let outstanding_transaction = match self
.outstanding_transactions
.remove(&(transaction_id as u32))
{
Some(transaction) => transaction,
None => {
let event = ClientSessionEvent::UnknownTransactionResultReceived {
additional_values: additional_args,
command_object,
transaction_id,
};
return Ok(vec![ClientSessionResult::RaisedEvent(event)]);
}
};
match outstanding_transaction {
OutstandingTransaction::ConnectionRequested { app_name: _ } => {
let description = if additional_args.len() > 0 {
if let Amf0Value::Object(mut properties) = additional_args.remove(0) {
if let Some(Amf0Value::Utf8String(value)) = properties.remove("description")
{
value
} else {
"".to_string()
}
} else {
"".to_string()
}
} else {
"".to_string()
};
let event = ClientSessionEvent::ConnectionRequestRejected { description };
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
OutstandingTransaction::CreateStream { purpose: _ } => {
return Err(ClientSessionError::CreateStreamFailed);
}
}
}
fn handle_amf0_command_success_result(
&mut self,
transaction_id: f64,
command_object: Amf0Value,
additional_args: Vec<Amf0Value>,
) -> ClientResult {
let outstanding_transaction = match self
.outstanding_transactions
.remove(&(transaction_id as u32))
{
Some(transaction) => transaction,
None => {
let event = ClientSessionEvent::UnknownTransactionResultReceived {
additional_values: additional_args,
command_object,
transaction_id,
};
return Ok(vec![ClientSessionResult::RaisedEvent(event)]);
}
};
match outstanding_transaction {
OutstandingTransaction::ConnectionRequested { app_name } => {
self.current_state = ClientState::Connected;
self.connected_app_name = Some(app_name);
let message = RtmpMessage::WindowAcknowledgement {
size: self.config.window_ack_size,
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
let event = ClientSessionEvent::ConnectionRequestAccepted;
let chunk_size_packet = self
.serializer
.set_max_chunk_size(self.config.chunk_size, RtmpTimestamp::new(0))?;
Ok(vec![
ClientSessionResult::OutboundResponse(packet),
ClientSessionResult::RaisedEvent(event),
ClientSessionResult::OutboundResponse(chunk_size_packet),
])
}
OutstandingTransaction::CreateStream { purpose } => {
if additional_args.len() == 0 {
return Err(ClientSessionError::CreateStreamResponseHadNoStreamNumber);
}
let stream_id = match additional_args[0] {
Amf0Value::Number(number) => number as u32,
_ => {
return Err(ClientSessionError::CreateStreamResponseHadNoStreamNumber);
}
};
self.active_stream_id = Some(stream_id);
match purpose {
TransactionPurpose::PlayRequest { stream_key } => {
self.current_state = ClientState::PlayRequested;
let buffer_message = RtmpMessage::UserControl {
event_type: UserControlEventType::SetBufferLength,
buffer_length: Some(self.config.playback_buffer_length_ms),
stream_id: Some(stream_id),
timestamp: None,
};
let buffer_payload =
buffer_message.into_message_payload(self.get_epoch(), 0)?;
let buffer_packet =
self.serializer.serialize(&buffer_payload, false, false)?;
let play_message = RtmpMessage::Amf0Command {
command_name: "play".to_string(),
transaction_id: 0.0,
command_object: Amf0Value::Null,
additional_arguments: vec![Amf0Value::Utf8String(stream_key)],
};
let play_payload =
play_message.into_message_payload(self.get_epoch(), stream_id)?;
let play_packet = self.serializer.serialize(&play_payload, false, false)?;
Ok(vec![
ClientSessionResult::OutboundResponse(buffer_packet),
ClientSessionResult::OutboundResponse(play_packet),
])
}
TransactionPurpose::PublishRequest {
stream_key,
request_type,
} => {
self.current_state = ClientState::PublishRequested;
let publish_type_string = match request_type {
PublishRequestType::Live => "live".to_string(),
PublishRequestType::Record => "record".to_string(),
PublishRequestType::Append => "append".to_string(),
};
let publish_message = RtmpMessage::Amf0Command {
command_name: "publish".to_string(),
transaction_id: 0.0,
command_object: Amf0Value::Null,
additional_arguments: vec![
Amf0Value::Utf8String(stream_key),
Amf0Value::Utf8String(publish_type_string),
],
};
let publish_payload =
publish_message.into_message_payload(self.get_epoch(), stream_id)?;
let publish_packet =
self.serializer.serialize(&publish_payload, false, false)?;
Ok(vec![ClientSessionResult::OutboundResponse(publish_packet)])
}
}
}
}
}
fn handle_on_status_command(&mut self, mut arguments: Vec<Amf0Value>) -> ClientResult {
if arguments.len() < 1 {
return Err(ClientSessionError::InvalidOnStatusArguments);
}
let mut properties = match arguments.remove(0) {
Amf0Value::Object(properties) => properties,
_ => {
return Err(ClientSessionError::InvalidOnStatusArguments);
}
};
let code = match properties.remove("code") {
Some(Amf0Value::Utf8String(code)) => code,
_ => {
return Err(ClientSessionError::InvalidOnStatusArguments);
}
};
match code.as_ref() {
"NetStream.Play.Start" => self.handle_play_start(),
"NetStream.Publish.Start" => self.handle_publish_start(),
x => {
let event = ClientSessionEvent::UnhandleableOnStatusCode {
code: x.to_string(),
};
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
}
}
fn handle_play_start(&mut self) -> ClientResult {
match self.current_state {
ClientState::PlayRequested => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
};
self.current_state = ClientState::Playing;
let event = ClientSessionEvent::PlaybackRequestAccepted;
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_publish_start(&mut self) -> ClientResult {
match self.current_state {
ClientState::PublishRequested => (),
_ => {
return Err(ClientSessionError::SessionInInvalidState {
current_state: self.current_state.clone(),
});
}
};
self.current_state = ClientState::Publishing;
let event = ClientSessionEvent::PublishRequestAccepted;
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_amf0_data_on_meta_data(&mut self, mut data: Vec<Amf0Value>) -> ClientResult {
if data.len() < 1 {
return Ok(Vec::new());
}
let properties = match data.remove(0) {
Amf0Value::Object(properties) => properties,
_ => return Ok(Vec::new()), };
let mut metadata = StreamMetadata::new();
metadata.apply_metadata_values(properties);
let event = ClientSessionEvent::StreamMetadataReceived { metadata };
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_acknowledgement(&mut self, sequence_number: u32) -> ClientResult {
let event = ClientSessionEvent::AcknowledgementReceived {
bytes_received: sequence_number,
};
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_window_ack_size(&mut self, size: u32) -> ClientResult {
self.peer_window_ack_size = Some(size);
Ok(Vec::new())
}
fn handle_user_control(
&mut self,
event_type: UserControlEventType,
timestamp: Option<RtmpTimestamp>,
_stream_id: Option<u32>,
_buffer_length: Option<u32>,
) -> ClientResult {
match event_type {
UserControlEventType::PingRequest => self.handle_ping_request(timestamp),
UserControlEventType::PingResponse => self.handle_ping_response(timestamp),
_ => Ok(Vec::new()),
}
}
fn handle_ping_request(&mut self, timestamp: Option<RtmpTimestamp>) -> ClientResult {
let message = RtmpMessage::UserControl {
event_type: UserControlEventType::PingResponse,
buffer_length: None,
stream_id: None,
timestamp,
};
let payload = message.into_message_payload(self.get_epoch(), 0)?;
let packet = self.serializer.serialize(&payload, false, false)?;
Ok(vec![ClientSessionResult::OutboundResponse(packet)])
}
fn handle_ping_response(&mut self, timestamp: Option<RtmpTimestamp>) -> ClientResult {
let timestamp = timestamp.unwrap_or(RtmpTimestamp::new(0));
let event = ClientSessionEvent::PingResponseReceived { timestamp };
Ok(vec![ClientSessionResult::RaisedEvent(event)])
}
fn handle_set_chunk_size(&mut self, size: u32) -> ClientResult {
self.deserializer.set_max_chunk_size(size as usize)?;
Ok(Vec::new())
}
fn get_epoch(&self) -> RtmpTimestamp {
match self.start_time.elapsed() {
Ok(duration) => {
let milliseconds =
(duration.as_secs() * 1000) + (duration.subsec_nanos() as u64 / 1_000_000);
RtmpTimestamp::new(milliseconds as u32)
}
Err(_) => RtmpTimestamp::new(0), }
}
fn get_next_transaction_id(&mut self) -> u32 {
let transaction_id = self.next_transaction_id;
self.next_transaction_id += 1;
transaction_id
}
}