1use crate::flv::flv_header::FlvHeader;
21use crate::flv::flv_tag::FlvTag;
22use crate::flv::flv_tag_header::FlvTagHeader;
23use crate::flv::{FLV_HEADER_LENGTH, FLV_TAG_HEADER_LENGTH, PREVIOUS_TAG_SIZE_LENGTH};
24use byteorder::{BigEndian, ReadBytesExt};
25use log::{debug, warn};
26use std::io;
27use std::io::Cursor;
28
29#[derive(Debug)]
30pub struct FlvBuffer {
31 buffer: Vec<u8>, head: usize, tail: usize, flv_header: Option<FlvHeader>, header_parsed: bool, initial_capacity: usize, }
38
39impl FlvBuffer {
40 pub fn new() -> Self {
42 Self::with_capacity(1024 * 1024)
43 }
44
45 pub fn with_capacity(capacity: usize) -> Self {
48 let capacity = if capacity.is_power_of_two() {
49 capacity
50 } else {
51 capacity.checked_next_power_of_two().unwrap_or(usize::MAX)
52 };
53
54 FlvBuffer {
55 buffer: vec![0; capacity],
56 head: 0,
57 tail: 0,
58 flv_header: None,
59 header_parsed: false, initial_capacity: capacity,
61 }
62 }
63
64 #[inline]
67 fn len(&self) -> usize {
68 (self.tail.wrapping_sub(self.head)) & (self.buffer.len() - 1)
74 }
75
76 #[inline]
79 fn available_space(&self) -> usize {
80 (self.head.wrapping_sub(self.tail).wrapping_sub(1)) & (self.buffer.len() - 1)
82 }
83
84 pub fn write_data(&mut self, data: &[u8]) {
95 let data_len = data.len();
96 if data_len == 0 {
97 return;
98 }
99
100 if data_len > self.available_space() {
102 self.resize_buffer(self.len() + data_len + 1);
103 }
104
105 if self.tail >= self.head {
107 let available_at_end = self.buffer.len() - self.tail;
108
109 if data_len <= available_at_end {
110 unsafe {
113 std::ptr::copy_nonoverlapping(
114 data.as_ptr(),
115 self.buffer.as_mut_ptr().add(self.tail),
116 data_len,
117 );
118 }
119
120 self.tail += data_len;
121 } else {
122 if available_at_end > 0 {
124 unsafe {
126 std::ptr::copy_nonoverlapping(
127 data.as_ptr(),
128 self.buffer.as_mut_ptr().add(self.tail),
129 available_at_end,
130 );
131 }
132 }
133 unsafe {
135 std::ptr::copy_nonoverlapping(
136 data.as_ptr().add(available_at_end),
137 self.buffer.as_mut_ptr(),
138 data_len - available_at_end,
139 );
140 }
141
142 self.tail = data_len - available_at_end; }
144 } else {
145 unsafe {
148 std::ptr::copy_nonoverlapping(
149 data.as_ptr(),
150 self.buffer.as_mut_ptr().add(self.tail),
151 data_len,
152 );
153 }
154
155 self.tail += data_len;
156 }
157
158 if self.tail == self.buffer.len() {
160 self.tail = 0;
161 }
162 }
163
164 fn resize_buffer(&mut self, new_capacity: usize) {
171 let new_capacity = new_capacity
172 .checked_next_power_of_two()
173 .unwrap_or(usize::MAX)
174 .max(self.initial_capacity);
175
176 let mut new_buffer = vec![0; new_capacity];
177
178 let current_len = self.len();
180
181 if self.tail > self.head {
183 new_buffer[..current_len].copy_from_slice(&self.buffer[self.head..self.tail]);
184 unsafe {
185 std::ptr::copy_nonoverlapping(
186 self.buffer.as_ptr().add(self.head),
187 new_buffer.as_mut_ptr(),
188 current_len,
189 );
190 }
191
192 } else if current_len > 0 {
193 let first_part = self.buffer.len() - self.head;
194 unsafe {
196 std::ptr::copy_nonoverlapping(
197 self.buffer.as_ptr().add(self.head),
198 new_buffer.as_mut_ptr(),
199 first_part,
200 );
201 }
202
203 unsafe {
205 std::ptr::copy_nonoverlapping(
206 self.buffer.as_ptr(),
207 new_buffer.as_mut_ptr().add(first_part),
208 current_len - first_part,
209 );
210 }
211 }
212
213 self.buffer = new_buffer;
214 self.head = 0;
215 self.tail = current_len;
216 }
217
218 #[inline]
220 fn skip_previous_tag_size(&mut self) {
221 self.head += PREVIOUS_TAG_SIZE_LENGTH;
222 if self.head >= self.buffer.len() {
223 self.head -= self.buffer.len(); }
225 }
226
227 fn parse_flv_header(&mut self) -> io::Result<()> {
233 if self.header_parsed {
234 return Ok(()); }
236
237 if self.len() < FLV_HEADER_LENGTH {
239 return Ok(()); }
241
242 let mut temp_buffer = [0u8; FLV_HEADER_LENGTH];
243 self.read_data(self.head, &mut temp_buffer);
244
245 let mut reader = Cursor::new(&temp_buffer);
246
247 let flv_signature = reader.read_u24::<BigEndian>()?;
249 debug!("FLV Signature: {:#X}", flv_signature);
250 if flv_signature != 0x464C56 {
251 self.skip_previous_tag_size();
253 self.header_parsed = true;
254 return Ok(()); }
256
257 let version = reader.read_u8()?;
259 debug!("FLV Version: {}", version);
260 if version != 1 {
261 self.skip_previous_tag_size();
262 self.header_parsed = true;
263 return Ok(()); }
265
266 let flags = reader.read_u8()?;
268 debug!("FLV Flags: {:#X}", flags);
269 match flags {
270 0x01 => debug!("Audio: No, Video: Yes"),
271 0x04 => debug!("Audio: Yes, Video: No"),
272 0x05 => debug!("Audio: Yes, Video: Yes"),
273 _ => {
274 self.skip_previous_tag_size();
275 self.header_parsed = true;
276 return Ok(());
277 } }
279
280 let data_offset = reader.read_u32::<BigEndian>()?;
282 if data_offset != 9 {
283 self.skip_previous_tag_size();
284 self.header_parsed = true;
285 return Ok(());
286 }
287 debug!("FLV Data Offset: {}", data_offset);
288
289 self.flv_header = Some(FlvHeader { flags });
291
292 self.header_parsed = true;
294
295 self.head += FLV_HEADER_LENGTH;
297 self.skip_previous_tag_size();
298
299 Ok(())
300 }
301
302 pub fn get_flv_header(&self) -> Option<&FlvHeader> {
304 self.flv_header.as_ref()
305 }
306
307 pub fn get_flv_tag(&mut self) -> Option<FlvTag> {
313 if self.len() < FLV_TAG_HEADER_LENGTH {
315 return None; }
317
318 if let Err(e) = self.parse_flv_header() {
320 warn!("Failed parsing FLV header: {}", e);
321 return None; }
323
324 let mut header_reader = CursorRing::new(&self.buffer, self.head, self.buffer.len());
326
327 let tag_type = header_reader.read_u8().ok()?;
329 let data_size = header_reader.read_u24::<BigEndian>().ok()?;
330 let timestamp = header_reader.read_u24::<BigEndian>().ok()?;
331 let timestamp_ext = header_reader.read_u8().ok()?;
332 let _stream_id = header_reader.read_u24::<BigEndian>().ok()?;
333
334 let total_tag_size = FLV_TAG_HEADER_LENGTH + data_size as usize + PREVIOUS_TAG_SIZE_LENGTH;
336
337 if self.len() < total_tag_size {
339 return None; }
341
342 let mut data = vec![0u8; data_size as usize];
344 let data_start = self.head + FLV_TAG_HEADER_LENGTH;
345 self.read_data(data_start, &mut data);
346
347 let flv_tag = FlvTag {
349 header: FlvTagHeader {
350 tag_type,
351 data_size,
352 timestamp,
353 timestamp_ext,
354 stream_id: 0, },
356 data: bytes::Bytes::from(data),
357 previous_tag_size: (FLV_TAG_HEADER_LENGTH + data_size as usize) as u32, };
359
360 self.head += total_tag_size;
362
363 while self.head >= self.buffer.len() {
365 self.head -= self.buffer.len();
366 }
367
368 Some(flv_tag)
369 }
370
371 fn read_data(&self, start: usize, buffer: &mut [u8]) {
377 let buffer_size = self.buffer.len();
378 if buffer_size == 0 || buffer.is_empty() {
379 return;
380 }
381
382 let normalized_start = start % buffer_size;
383 let request_len = buffer.len();
384 let safe_len = request_len.min(buffer_size);
385
386 let (first_len, second_len) = {
387 let virtual_end = normalized_start + safe_len;
388 if virtual_end <= buffer_size {
389 (safe_len, 0)
390 } else {
391 (
392 buffer_size - normalized_start,
393 safe_len - (buffer_size - normalized_start),
394 )
395 }
396 };
397
398 unsafe {
400 std::ptr::copy_nonoverlapping(
401 self.buffer.as_ptr().add(normalized_start),
402 buffer.as_mut_ptr(),
403 first_len,
404 );
405 }
406
407 if second_len > 0 {
408 unsafe {
410 std::ptr::copy_nonoverlapping(
411 self.buffer.as_ptr(),
412 buffer.as_mut_ptr().add(first_len),
413 second_len
414 );
415 }
416 }
417 }
418}
419
420
421struct CursorRing<'a> {
424 buffer: &'a [u8],
425 position: usize,
426 buffer_size: usize,
427}
428
429impl<'a> CursorRing<'a> {
430 fn new(buffer: &'a [u8], start: usize, buffer_size: usize) -> Self {
431 Self {
432 buffer,
433 position: start,
434 buffer_size,
435 }
436 }
437}
438
439impl<'a> io::Read for CursorRing<'a> {
440 #[inline(always)]
441 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
442 let mut bytes_read = 0;
443 let buf_len = buf.len();
444 let buffer_end = self.buffer_size;
445
446 let wrap_around = self.position + buf_len > buffer_end;
447
448 if wrap_around {
449 let first_part_len = buffer_end - self.position;
450 unsafe {
452 std::ptr::copy_nonoverlapping(
453 self.buffer.as_ptr().add(self.position),
454 buf.as_mut_ptr(),
455 first_part_len,
456 );
457 }
458 self.position = 0;
459 bytes_read += first_part_len;
460 }
461
462 let remaining_len = buf_len - bytes_read;
463 if remaining_len > 0 {
464 unsafe {
466 std::ptr::copy_nonoverlapping(
467 self.buffer.as_ptr().add(self.position),
468 buf.as_mut_ptr().add(bytes_read),
469 remaining_len,
470 );
471 }
472 self.position += remaining_len;
473 bytes_read += remaining_len;
474 }
475
476 if self.position == buffer_end {
477 self.position = 0;
478 }
479
480 Ok(bytes_read)
481 }
482}
483
484#[cfg(test)]
485mod tests {
486
487 #[test]
488 fn test_len() {
489 assert_eq!(base_len(0, 10, 16), len(0, 10, 16));
490 assert_eq!(base_len(1, 3, 16), len(1, 3, 16));
491 assert_eq!(base_len(3, 5, 16), len(3, 5, 16));
492 assert_eq!(base_len(4, 4, 16), len(4, 4, 16));
493 assert_eq!(base_len(10, 2, 16), len(10, 2, 16));
494 assert_eq!(base_len(8, 3, 16), len(8, 3, 16));
495 assert_eq!(base_len(9, 0, 16), len(9, 0, 16));
496 }
497
498 fn len(head: usize, tail: usize, buffer_len: usize) -> usize {
499 (tail.wrapping_sub(head)) & (buffer_len - 1)
500 }
501
502 fn base_len(head: usize, tail: usize, buffer_len: usize) -> usize {
503 if tail >= head {
504 tail - head
505 } else {
506 buffer_len - head + tail
507 }
508 }
509
510 #[test]
511 fn test_available_space() {
512 assert_eq!(base_available_space(0, 10, 16), available_space(0, 10, 16));
513 assert_eq!(base_available_space(1, 3, 16), available_space(1, 3, 16));
514 assert_eq!(base_available_space(3, 5, 16), available_space(3, 5, 16));
515 assert_eq!(base_available_space(4, 5, 16), available_space(4, 5, 16));
516 assert_eq!(base_available_space(10, 2, 16), available_space(10, 2, 16));
517 assert_eq!(base_available_space(8, 3, 16), available_space(8, 3, 16));
518 assert_eq!(base_available_space(9, 0, 16), available_space(9, 0, 16));
519 }
520
521 fn base_available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
522 buffer_len - len(head, tail, buffer_len) - 1
523 }
524
525 fn available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
526 (head.wrapping_sub(tail).wrapping_sub(1)) & (buffer_len - 1)
527 }
528}