use crate::flv::flv_header::FlvHeader;
use crate::flv::flv_tag::FlvTag;
use crate::flv::flv_tag_header::FlvTagHeader;
use crate::flv::{FLV_HEADER_LENGTH, FLV_TAG_HEADER_LENGTH, PREVIOUS_TAG_SIZE_LENGTH};
use byteorder::{BigEndian, ReadBytesExt};
use log::{debug, warn};
use std::io;
use std::io::Cursor;
#[derive(Debug)]
pub struct FlvBuffer {
buffer: Vec<u8>, head: usize, tail: usize, flv_header: Option<FlvHeader>, header_parsed: bool, initial_capacity: usize, }
impl FlvBuffer {
pub fn new() -> Self {
Self::with_capacity(1024 * 1024)
}
pub fn with_capacity(capacity: usize) -> Self {
let capacity = if capacity.is_power_of_two() {
capacity
} else {
capacity.checked_next_power_of_two().unwrap_or(usize::MAX)
};
FlvBuffer {
buffer: vec![0; capacity],
head: 0,
tail: 0,
flv_header: None,
header_parsed: false, initial_capacity: capacity,
}
}
#[inline]
fn len(&self) -> usize {
(self.tail.wrapping_sub(self.head)) & (self.buffer.len() - 1)
}
#[inline]
fn available_space(&self) -> usize {
(self.head.wrapping_sub(self.tail).wrapping_sub(1)) & (self.buffer.len() - 1)
}
pub fn write_data(&mut self, data: &[u8]) {
let data_len = data.len();
if data_len == 0 {
return;
}
if data_len > self.available_space() {
self.resize_buffer(self.len() + data_len + 1);
}
if self.tail >= self.head {
let available_at_end = self.buffer.len() - self.tail;
if data_len <= available_at_end {
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr(),
self.buffer.as_mut_ptr().add(self.tail),
data_len,
);
}
self.tail += data_len;
} else {
if available_at_end > 0 {
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr(),
self.buffer.as_mut_ptr().add(self.tail),
available_at_end,
);
}
}
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr().add(available_at_end),
self.buffer.as_mut_ptr(),
data_len - available_at_end,
);
}
self.tail = data_len - available_at_end; }
} else {
unsafe {
std::ptr::copy_nonoverlapping(
data.as_ptr(),
self.buffer.as_mut_ptr().add(self.tail),
data_len,
);
}
self.tail += data_len;
}
if self.tail == self.buffer.len() {
self.tail = 0;
}
}
fn resize_buffer(&mut self, new_capacity: usize) {
let new_capacity = new_capacity
.checked_next_power_of_two()
.unwrap_or(usize::MAX)
.max(self.initial_capacity);
let mut new_buffer = vec![0; new_capacity];
let current_len = self.len();
if self.tail > self.head {
new_buffer[..current_len].copy_from_slice(&self.buffer[self.head..self.tail]);
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr().add(self.head),
new_buffer.as_mut_ptr(),
current_len,
);
}
} else if current_len > 0 {
let first_part = self.buffer.len() - self.head;
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr().add(self.head),
new_buffer.as_mut_ptr(),
first_part,
);
}
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr(),
new_buffer.as_mut_ptr().add(first_part),
current_len - first_part,
);
}
}
self.buffer = new_buffer;
self.head = 0;
self.tail = current_len;
}
#[inline]
fn skip_previous_tag_size(&mut self) {
self.head += PREVIOUS_TAG_SIZE_LENGTH;
if self.head >= self.buffer.len() {
self.head -= self.buffer.len(); }
}
fn parse_flv_header(&mut self) -> io::Result<()> {
if self.header_parsed {
return Ok(()); }
if self.len() < FLV_HEADER_LENGTH {
return Ok(()); }
let mut temp_buffer = [0u8; FLV_HEADER_LENGTH];
self.read_data(self.head, &mut temp_buffer);
let mut reader = Cursor::new(&temp_buffer);
let flv_signature = reader.read_u24::<BigEndian>()?;
debug!("FLV Signature: {:#X}", flv_signature);
if flv_signature != 0x464C56 {
self.skip_previous_tag_size();
self.header_parsed = true;
return Ok(()); }
let version = reader.read_u8()?;
debug!("FLV Version: {}", version);
if version != 1 {
self.skip_previous_tag_size();
self.header_parsed = true;
return Ok(()); }
let flags = reader.read_u8()?;
debug!("FLV Flags: {:#X}", flags);
match flags {
0x01 => debug!("Audio: No, Video: Yes"),
0x04 => debug!("Audio: Yes, Video: No"),
0x05 => debug!("Audio: Yes, Video: Yes"),
_ => {
self.skip_previous_tag_size();
self.header_parsed = true;
return Ok(());
} }
let data_offset = reader.read_u32::<BigEndian>()?;
if data_offset != 9 {
self.skip_previous_tag_size();
self.header_parsed = true;
return Ok(());
}
debug!("FLV Data Offset: {}", data_offset);
self.flv_header = Some(FlvHeader { flags });
self.header_parsed = true;
self.head += FLV_HEADER_LENGTH;
self.skip_previous_tag_size();
Ok(())
}
pub fn get_flv_header(&self) -> Option<&FlvHeader> {
self.flv_header.as_ref()
}
pub fn get_flv_tag(&mut self) -> Option<FlvTag> {
if self.len() < FLV_TAG_HEADER_LENGTH {
return None; }
if let Err(e) = self.parse_flv_header() {
warn!("Failed parsing FLV header: {}", e);
return None; }
let mut header_reader = CursorRing::new(&self.buffer, self.head, self.buffer.len());
let tag_type = header_reader.read_u8().ok()?;
let data_size = header_reader.read_u24::<BigEndian>().ok()?;
let timestamp = header_reader.read_u24::<BigEndian>().ok()?;
let timestamp_ext = header_reader.read_u8().ok()?;
let _stream_id = header_reader.read_u24::<BigEndian>().ok()?;
let total_tag_size = FLV_TAG_HEADER_LENGTH + data_size as usize + PREVIOUS_TAG_SIZE_LENGTH;
if self.len() < total_tag_size {
return None; }
let mut data = vec![0u8; data_size as usize];
let data_start = self.head + FLV_TAG_HEADER_LENGTH;
self.read_data(data_start, &mut data);
let flv_tag = FlvTag {
header: FlvTagHeader {
tag_type,
data_size,
timestamp,
timestamp_ext,
stream_id: 0, },
data: bytes::Bytes::from(data),
previous_tag_size: (FLV_TAG_HEADER_LENGTH + data_size as usize) as u32, };
self.head += total_tag_size;
while self.head >= self.buffer.len() {
self.head -= self.buffer.len();
}
Some(flv_tag)
}
fn read_data(&self, start: usize, buffer: &mut [u8]) {
let buffer_size = self.buffer.len();
if buffer_size == 0 || buffer.is_empty() {
return;
}
let normalized_start = start % buffer_size;
let request_len = buffer.len();
let safe_len = request_len.min(buffer_size);
let (first_len, second_len) = {
let virtual_end = normalized_start + safe_len;
if virtual_end <= buffer_size {
(safe_len, 0)
} else {
(
buffer_size - normalized_start,
safe_len - (buffer_size - normalized_start),
)
}
};
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr().add(normalized_start),
buffer.as_mut_ptr(),
first_len,
);
}
if second_len > 0 {
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr(),
buffer.as_mut_ptr().add(first_len),
second_len
);
}
}
}
}
struct CursorRing<'a> {
buffer: &'a [u8],
position: usize,
buffer_size: usize,
}
impl<'a> CursorRing<'a> {
fn new(buffer: &'a [u8], start: usize, buffer_size: usize) -> Self {
Self {
buffer,
position: start,
buffer_size,
}
}
}
impl<'a> io::Read for CursorRing<'a> {
#[inline(always)]
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let mut bytes_read = 0;
let buf_len = buf.len();
let buffer_end = self.buffer_size;
let wrap_around = self.position + buf_len > buffer_end;
if wrap_around {
let first_part_len = buffer_end - self.position;
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr().add(self.position),
buf.as_mut_ptr(),
first_part_len,
);
}
self.position = 0;
bytes_read += first_part_len;
}
let remaining_len = buf_len - bytes_read;
if remaining_len > 0 {
unsafe {
std::ptr::copy_nonoverlapping(
self.buffer.as_ptr().add(self.position),
buf.as_mut_ptr().add(bytes_read),
remaining_len,
);
}
self.position += remaining_len;
bytes_read += remaining_len;
}
if self.position == buffer_end {
self.position = 0;
}
Ok(bytes_read)
}
}
#[cfg(test)]
mod tests {
#[test]
fn test_len() {
assert_eq!(base_len(0, 10, 16), len(0, 10, 16));
assert_eq!(base_len(1, 3, 16), len(1, 3, 16));
assert_eq!(base_len(3, 5, 16), len(3, 5, 16));
assert_eq!(base_len(4, 4, 16), len(4, 4, 16));
assert_eq!(base_len(10, 2, 16), len(10, 2, 16));
assert_eq!(base_len(8, 3, 16), len(8, 3, 16));
assert_eq!(base_len(9, 0, 16), len(9, 0, 16));
}
fn len(head: usize, tail: usize, buffer_len: usize) -> usize {
(tail.wrapping_sub(head)) & (buffer_len - 1)
}
fn base_len(head: usize, tail: usize, buffer_len: usize) -> usize {
if tail >= head {
tail - head
} else {
buffer_len - head + tail
}
}
#[test]
fn test_available_space() {
assert_eq!(base_available_space(0, 10, 16), available_space(0, 10, 16));
assert_eq!(base_available_space(1, 3, 16), available_space(1, 3, 16));
assert_eq!(base_available_space(3, 5, 16), available_space(3, 5, 16));
assert_eq!(base_available_space(4, 5, 16), available_space(4, 5, 16));
assert_eq!(base_available_space(10, 2, 16), available_space(10, 2, 16));
assert_eq!(base_available_space(8, 3, 16), available_space(8, 3, 16));
assert_eq!(base_available_space(9, 0, 16), available_space(9, 0, 16));
}
fn base_available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
buffer_len - len(head, tail, buffer_len) - 1
}
fn available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
(head.wrapping_sub(tail).wrapping_sub(1)) & (buffer_len - 1)
}
}