ez_ffmpeg/flv/
flv_buffer.rs

1//! A high-performance ring buffer implementation designed for FLV (Flash Video) data processing.
2//!
3//! This buffer is optimized for:
4//! - Minimizing memory fragmentation by using a pre-allocated ring buffer
5//! - Reducing memory allocation/deallocation overhead
6//! - High-performance memory operations using unsafe `copy_nonoverlapping`
7//!
8//! The buffer size is always a power of two to enable efficient modulo operations
9//! using bitwise AND operations.
10//!
11//! # Memory Management
12//! - Uses a ring buffer to cache data and prevent frequent memory allocations
13//! - Only resizes when absolutely necessary to maintain performance
14//! - All memory operations are performed using `copy_nonoverlapping` for maximum efficiency
15//!
16//! # Safety
17//! While the implementation uses unsafe code for performance optimization,
18//! all unsafe operations are carefully bounded and checked to maintain memory safety.
19//!
20use crate::flv::flv_header::FlvHeader;
21use crate::flv::flv_tag::FlvTag;
22use crate::flv::flv_tag_header::FlvTagHeader;
23use crate::flv::{FLV_HEADER_LENGTH, FLV_TAG_HEADER_LENGTH, PREVIOUS_TAG_SIZE_LENGTH};
24use byteorder::{BigEndian, ReadBytesExt};
25use log::{debug, warn};
26use std::io;
27use std::io::Cursor;
28
29#[derive(Debug)]
30pub struct FlvBuffer {
31    buffer: Vec<u8>,               // Internal buffer
32    head: usize,                   // Index of the first valid byte in the buffer
33    tail: usize,                   // Index where new data will be written
34    flv_header: Option<FlvHeader>, // Store the parsed FLV header (if available)
35    header_parsed: bool,           // Flag to track if FLV file header has been parsed
36    initial_capacity: usize,       // Initial buffer capacity
37}
38
39impl FlvBuffer {
40    /// Creates a new FlvBuffer with default capacity (1M).
41    pub fn new() -> Self {
42        Self::with_capacity(1024 * 1024)
43    }
44
45    /// Creates a new FlvBuffer with the specified capacity.
46    /// The actual capacity will be rounded up to the next power of two.
47    pub fn with_capacity(capacity: usize) -> Self {
48        let capacity = if capacity.is_power_of_two() {
49            capacity
50        } else {
51            capacity.checked_next_power_of_two().unwrap_or(usize::MAX)
52        };
53
54        FlvBuffer {
55            buffer: vec![0; capacity],
56            head: 0,
57            tail: 0,
58            flv_header: None,
59            header_parsed: false, // Initially, the header is not parsed
60            initial_capacity: capacity,
61        }
62    }
63
64    /// Calculates the current length of valid data in the buffer.
65    /// Uses efficient bitwise operations for modulo calculation.
66    #[inline]
67    fn len(&self) -> usize {
68        /*if self.tail >= self.head {
69            self.tail - self.head
70        } else {
71            self.buffer.len() - self.head + self.tail
72        }*/
73        (self.tail.wrapping_sub(self.head)) & (self.buffer.len() - 1)
74    }
75
76    /// Calculates the available space in the buffer.
77    /// Uses efficient bitwise operations for modulo calculation.
78    #[inline]
79    fn available_space(&self) -> usize {
80        // self.buffer.len() - self.len() - 1
81        (self.head.wrapping_sub(self.tail).wrapping_sub(1)) & (self.buffer.len() - 1)
82    }
83
84    /// Writes data into the ring buffer.
85    ///
86    /// This method handles:
87    /// - Buffer resizing if needed
88    /// - Wrapping around the buffer end
89    /// - High-performance memory copying using `copy_nonoverlapping`
90    ///
91    /// # Performance
92    /// Uses unsafe `copy_nonoverlapping` for optimal memory copying performance,
93    /// avoiding bounds checks and overlapping memory verification.
94    pub fn write_data(&mut self, data: &[u8]) {
95        let data_len = data.len();
96        if data_len == 0 {
97            return;
98        }
99
100        // Resize the buffer if needed to accommodate the new data.
101        if data_len > self.available_space() {
102            self.resize_buffer(self.len() + data_len + 1);
103        }
104
105        // Write data to the buffer, handling the wrap-around if necessary.
106        if self.tail >= self.head {
107            let available_at_end = self.buffer.len() - self.tail;
108
109            if data_len <= available_at_end {
110                // Can write the entire chunk in one go
111                // self.buffer[self.tail..self.tail + data_len].copy_from_slice(data);
112                unsafe {
113                    std::ptr::copy_nonoverlapping(
114                        data.as_ptr(),
115                        self.buffer.as_mut_ptr().add(self.tail),
116                        data_len,
117                    );
118                }
119
120                self.tail += data_len;
121            } else {
122                // Need to wrap around.
123                if available_at_end > 0 {
124                    // self.buffer[self.tail..].copy_from_slice(&data[..available_at_end]);
125                    unsafe {
126                        std::ptr::copy_nonoverlapping(
127                            data.as_ptr(),
128                            self.buffer.as_mut_ptr().add(self.tail),
129                            available_at_end,
130                        );
131                    }
132                }
133                // self.buffer[..data_len - available_at_end].copy_from_slice(&data[available_at_end..]);
134                unsafe {
135                    std::ptr::copy_nonoverlapping(
136                        data.as_ptr().add(available_at_end),
137                        self.buffer.as_mut_ptr(),
138                        data_len - available_at_end,
139                    );
140                }
141
142                self.tail = data_len - available_at_end; //Tail is now at start of buffer.
143            }
144        } else {
145            // Head is after tail - just write to the end.
146            // self.buffer[self.tail..self.tail + data_len].copy_from_slice(data);
147            unsafe {
148                std::ptr::copy_nonoverlapping(
149                    data.as_ptr(),
150                    self.buffer.as_mut_ptr().add(self.tail),
151                    data_len,
152                );
153            }
154
155            self.tail += data_len;
156        }
157
158        // Wrap around the tail if it reaches the end of the buffer.
159        if self.tail == self.buffer.len() {
160            self.tail = 0;
161        }
162    }
163
164    /// Resizes the buffer to accommodate more data.
165    ///
166    /// # Notes
167    /// - New capacity is always a power of two
168    /// - Maintains data continuity during resize
169    /// - Uses high-performance memory copying
170    fn resize_buffer(&mut self, new_capacity: usize) {
171        let new_capacity = new_capacity
172            .checked_next_power_of_two()
173            .unwrap_or(usize::MAX)
174            .max(self.initial_capacity);
175
176        let mut new_buffer = vec![0; new_capacity];
177
178        // Calculate the current data length BEFORE replacing the buffer
179        let current_len = self.len();
180
181        // Copy existing data directly to new buffer
182        if self.tail > self.head {
183            new_buffer[..current_len].copy_from_slice(&self.buffer[self.head..self.tail]);
184            unsafe {
185                std::ptr::copy_nonoverlapping(
186                    self.buffer.as_ptr().add(self.head),
187                    new_buffer.as_mut_ptr(),
188                    current_len,
189                );
190            }
191
192        } else if current_len > 0 {
193            let first_part = self.buffer.len() - self.head;
194            // new_buffer[..first_part].copy_from_slice(&self.buffer[self.head..]);
195            unsafe {
196                std::ptr::copy_nonoverlapping(
197                    self.buffer.as_ptr().add(self.head),
198                    new_buffer.as_mut_ptr(),
199                    first_part,
200                );
201            }
202
203            // new_buffer[first_part..current_len].copy_from_slice(&self.buffer[..self.tail]);
204            unsafe {
205                std::ptr::copy_nonoverlapping(
206                    self.buffer.as_ptr(),
207                    new_buffer.as_mut_ptr().add(first_part),
208                    current_len - first_part,
209                );
210            }
211        }
212
213        self.buffer = new_buffer;
214        self.head = 0;
215        self.tail = current_len;
216    }
217
218    /// Skips the PreviousTagSize field (4 bytes) after reading a tag.
219    #[inline]
220    fn skip_previous_tag_size(&mut self) {
221        self.head += PREVIOUS_TAG_SIZE_LENGTH;
222        if self.head >= self.buffer.len() {
223            self.head -= self.buffer.len(); // Handle wrap-around.
224        }
225    }
226
227    /// Attempts to parse the FLV file header from the buffer.
228    ///
229    /// # Returns
230    /// - `Ok(())` if parsing succeeds or needs to be deferred
231    /// - `Err` if an IO error occurs during parsing
232    fn parse_flv_header(&mut self) -> io::Result<()> {
233        if self.header_parsed {
234            return Ok(()); // Header already parsed
235        }
236
237        // Check if there is enough data in buffer for an FLV header.
238        if self.len() < FLV_HEADER_LENGTH {
239            return Ok(()); // Not enough data, skip parsing.
240        }
241
242        let mut temp_buffer = [0u8; FLV_HEADER_LENGTH];
243        self.read_data(self.head, &mut temp_buffer);
244
245        let mut reader = Cursor::new(&temp_buffer);
246
247        // Check if the file starts with "FLV"
248        let flv_signature = reader.read_u24::<BigEndian>()?;
249        debug!("FLV Signature: {:#X}", flv_signature);
250        if flv_signature != 0x464C56 {
251            // "FLV" in ASCII
252            self.skip_previous_tag_size();
253            self.header_parsed = true;
254            return Ok(()); // Skip files that don't start with "FLV"
255        }
256
257        // Read the version (should be 1)
258        let version = reader.read_u8()?;
259        debug!("FLV Version: {}", version);
260        if version != 1 {
261            self.skip_previous_tag_size();
262            self.header_parsed = true;
263            return Ok(()); // Skip if the version is not 1
264        }
265
266        // Read the flags (should be 0x05 for audio and video presence)
267        let flags = reader.read_u8()?;
268        debug!("FLV Flags: {:#X}", flags);
269        match flags {
270            0x01 => debug!("Audio: No, Video: Yes"),
271            0x04 => debug!("Audio: Yes, Video: No"),
272            0x05 => debug!("Audio: Yes, Video: Yes"),
273            _ => {
274                self.skip_previous_tag_size();
275                self.header_parsed = true;
276                return Ok(());
277            } // Skip invalid flags
278        }
279
280        // Read the data offset (indicates where the data starts)
281        let data_offset = reader.read_u32::<BigEndian>()?;
282        if data_offset != 9 {
283            self.skip_previous_tag_size();
284            self.header_parsed = true;
285            return Ok(());
286        }
287        debug!("FLV Data Offset: {}", data_offset);
288
289        // Store the header information
290        self.flv_header = Some(FlvHeader { flags });
291
292        // Mark the header as parsed
293        self.header_parsed = true;
294
295        // Advance cursor past the FLV header and the subsequent PreviousTagSize
296        self.head += FLV_HEADER_LENGTH;
297        self.skip_previous_tag_size();
298
299        Ok(())
300    }
301
302    /// Returns a reference to the parsed FLV header, if available.
303    pub fn get_flv_header(&self) -> Option<&FlvHeader> {
304        self.flv_header.as_ref()
305    }
306
307    /// Attempts to parse and return a complete FLV tag from the buffer.
308    ///
309    /// # Returns
310    /// - `Some(FlvTag)` if a complete tag is available
311    /// - `None` if there isn't enough data for a complete tag
312    pub fn get_flv_tag(&mut self) -> Option<FlvTag> {
313        // Check if there's enough data to read a complete FLV Tag
314        if self.len() < FLV_TAG_HEADER_LENGTH {
315            return None; // Not enough data to read a tag header
316        }
317
318        // Ensure the FLV file header is parsed or skip if not valid
319        if let Err(e) = self.parse_flv_header() {
320            warn!("Failed parsing FLV header: {}", e);
321            return None; // Return None if header parsing fails
322        }
323
324        // Create a reader that can handle buffer wrap-around
325        let mut header_reader = CursorRing::new(&self.buffer, self.head, self.buffer.len());
326
327        // Parse the FLV Tag Header
328        let tag_type = header_reader.read_u8().ok()?;
329        let data_size = header_reader.read_u24::<BigEndian>().ok()?;
330        let timestamp = header_reader.read_u24::<BigEndian>().ok()?;
331        let timestamp_ext = header_reader.read_u8().ok()?;
332        let _stream_id = header_reader.read_u24::<BigEndian>().ok()?;
333
334        // Calculate the total size of the tag, including the header and PreviousTagSize
335        let total_tag_size = FLV_TAG_HEADER_LENGTH + data_size as usize + PREVIOUS_TAG_SIZE_LENGTH;
336
337        // Check if there's enough data to read the full tag data
338        if self.len() < total_tag_size {
339            return None; // Not enough data to read the tag data and PreviousTagSize
340        }
341
342        // Read the tag data into a temporary buffer
343        let mut data = vec![0u8; data_size as usize];
344        let data_start = self.head + FLV_TAG_HEADER_LENGTH;
345        self.read_data(data_start, &mut data);
346
347        // Create the FLV Tag
348        let flv_tag = FlvTag {
349            header: FlvTagHeader {
350                tag_type,
351                data_size,
352                timestamp,
353                timestamp_ext,
354                stream_id: 0, // Always 0
355            },
356            data: bytes::Bytes::from(data),
357            previous_tag_size: (FLV_TAG_HEADER_LENGTH + data_size as usize) as u32, // Store PreviousTagSize
358        };
359
360        // Advance the head past the entire tag (header + data + PreviousTagSize)
361        self.head += total_tag_size;
362
363        // Handle wrap-around for head.
364        while self.head >= self.buffer.len() {
365            self.head -= self.buffer.len();
366        }
367
368        Some(flv_tag)
369    }
370
371    /// Reads data from the ring buffer, safely handling wrap-around.
372    ///
373    /// # Performance
374    /// Uses unsafe `copy_nonoverlapping` for optimal memory copying,
375    /// with careful bounds checking to ensure safety.
376    fn read_data(&self, start: usize, buffer: &mut [u8]) {
377        let buffer_size = self.buffer.len();
378        if buffer_size == 0 || buffer.is_empty() {
379            return;
380        }
381
382        let normalized_start = start % buffer_size;
383        let request_len = buffer.len();
384        let safe_len = request_len.min(buffer_size);
385
386        let (first_len, second_len) = {
387            let virtual_end = normalized_start + safe_len;
388            if virtual_end <= buffer_size {
389                (safe_len, 0)
390            } else {
391                (
392                    buffer_size - normalized_start,
393                    safe_len - (buffer_size - normalized_start),
394                )
395            }
396        };
397
398        // buffer[..first_len].copy_from_slice(&self.buffer[normalized_start..normalized_start  + first_len]);
399        unsafe {
400            std::ptr::copy_nonoverlapping(
401                self.buffer.as_ptr().add(normalized_start),
402                buffer.as_mut_ptr(),
403                first_len,
404            );
405        }
406
407        if second_len > 0 {
408            // buffer[first_len..first_len + second_len].copy_from_slice(&self.buffer[..second_len]);
409            unsafe {
410                std::ptr::copy_nonoverlapping(
411                    self.buffer.as_ptr(),
412                    buffer.as_mut_ptr().add(first_len),
413                    second_len
414                );
415            }
416        }
417    }
418}
419
420
421/// A cursor implementation that handles ring buffer wrap-around.
422/// Used for reading FLV tag headers efficiently.
423struct CursorRing<'a> {
424    buffer: &'a [u8],
425    position: usize,
426    buffer_size: usize,
427}
428
429impl<'a> CursorRing<'a> {
430    fn new(buffer: &'a [u8], start: usize, buffer_size: usize) -> Self {
431        Self {
432            buffer,
433            position: start,
434            buffer_size,
435        }
436    }
437}
438
439impl<'a> io::Read for CursorRing<'a> {
440    #[inline(always)]
441    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
442        let mut bytes_read = 0;
443        let buf_len = buf.len();
444        let buffer_end = self.buffer_size;
445
446        let wrap_around = self.position + buf_len > buffer_end;
447
448        if wrap_around {
449            let first_part_len = buffer_end - self.position;
450            // buf[..first_part_len].copy_from_slice(&self.buffer[self.position..]);
451            unsafe {
452                std::ptr::copy_nonoverlapping(
453                    self.buffer.as_ptr().add(self.position),
454                    buf.as_mut_ptr(),
455                    first_part_len,
456                );
457            }
458            self.position = 0;
459            bytes_read += first_part_len;
460        }
461
462        let remaining_len = buf_len - bytes_read;
463        if remaining_len > 0 {
464            // buf[bytes_read..].copy_from_slice(&self.buffer[self.position..self.position + remaining_len]);
465            unsafe {
466                std::ptr::copy_nonoverlapping(
467                    self.buffer.as_ptr().add(self.position),
468                    buf.as_mut_ptr().add(bytes_read),
469                    remaining_len,
470                );
471            }
472            self.position += remaining_len;
473            bytes_read += remaining_len;
474        }
475
476        if self.position == buffer_end {
477            self.position = 0;
478        }
479
480        Ok(bytes_read)
481    }
482}
483
484#[cfg(test)]
485mod tests {
486
487    #[test]
488    fn test_len() {
489        assert_eq!(base_len(0, 10, 16), len(0, 10, 16));
490        assert_eq!(base_len(1, 3, 16), len(1, 3, 16));
491        assert_eq!(base_len(3, 5, 16), len(3, 5, 16));
492        assert_eq!(base_len(4, 4, 16), len(4, 4, 16));
493        assert_eq!(base_len(10, 2, 16), len(10, 2, 16));
494        assert_eq!(base_len(8, 3, 16), len(8, 3, 16));
495        assert_eq!(base_len(9, 0, 16), len(9, 0, 16));
496    }
497
498    fn len(head: usize, tail: usize, buffer_len: usize) -> usize {
499        (tail.wrapping_sub(head)) & (buffer_len - 1)
500    }
501
502    fn base_len(head: usize, tail: usize, buffer_len: usize) -> usize {
503        if tail >= head {
504            tail - head
505        } else {
506            buffer_len - head + tail
507        }
508    }
509
510    #[test]
511    fn test_available_space() {
512        assert_eq!(base_available_space(0, 10, 16), available_space(0, 10, 16));
513        assert_eq!(base_available_space(1, 3, 16), available_space(1, 3, 16));
514        assert_eq!(base_available_space(3, 5, 16), available_space(3, 5, 16));
515        assert_eq!(base_available_space(4, 5, 16), available_space(4, 5, 16));
516        assert_eq!(base_available_space(10, 2, 16), available_space(10, 2, 16));
517        assert_eq!(base_available_space(8, 3, 16), available_space(8, 3, 16));
518        assert_eq!(base_available_space(9, 0, 16), available_space(9, 0, 16));
519    }
520
521    fn base_available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
522        buffer_len - len(head, tail, buffer_len) - 1
523    }
524
525    fn available_space(head: usize, tail: usize, buffer_len: usize) -> usize {
526        (head.wrapping_sub(tail).wrapping_sub(1)) & (buffer_len - 1)
527    }
528}