
media_core/buffer.rs

use std::{
    mem,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc, Weak,
    },
};

use crossbeam_queue::SegQueue;

/// Capacity used when a pool is created with `buffer_capacity == 0`.
const DEFAULT_BUFFER_CAPACITY: usize = 1024;

#[derive(Debug)]
pub struct Buffer {
    /// Backing storage; its length is the buffer's capacity.
    data: Box<[u8]>,
    /// Number of bytes currently considered valid (the buffer's length).
    available: usize,
    /// Weak handle back to the owning pool, used to recycle the storage on drop.
    pool: Weak<BufferPool>,
}

impl Buffer {
    fn new(data: Box<[u8]>, pool: &Arc<BufferPool>) -> Self {
        let available = data.len();

        Self {
            data,
            available,
            pool: Arc::downgrade(pool),
        }
    }

    fn new_with_available(data: Box<[u8]>, pool: &Arc<BufferPool>, available: usize) -> Self {
        let available = available.min(data.len());

        Self {
            data,
            available,
            pool: Arc::downgrade(pool),
        }
    }

    pub fn capacity(&self) -> usize {
        self.data.len()
    }

    pub fn len(&self) -> usize {
        self.available
    }

    pub fn is_empty(&self) -> bool {
        self.available == 0
    }

    pub fn data(&self) -> &[u8] {
        &self.data[..self.available]
    }

    pub fn data_mut(&mut self) -> &mut [u8] {
        &mut self.data[..self.available]
    }

    // Resize the buffer to the specified length, not exceeding its capacity.
    pub fn resize(&mut self, len: usize) {
        self.available = len.min(self.capacity());
    }
}

impl Drop for Buffer {
    fn drop(&mut self) {
        // Hand the backing storage back to the pool if it is still alive;
        // otherwise the storage is simply freed.
        if let Some(pool) = self.pool.upgrade() {
            pool.recycle_buffer(mem::take(&mut self.data));
        }
    }
}

#[derive(Debug)]
pub struct BufferPool {
    /// Recycled buffers waiting to be reused.
    queue: SegQueue<Arc<Buffer>>,
    /// Capacity of buffers handed out by this pool.
    buffer_capacity: AtomicUsize,
}

impl BufferPool {
    pub fn new(buffer_capacity: usize) -> Arc<Self> {
        let buffer_capacity = if buffer_capacity == 0 {
            DEFAULT_BUFFER_CAPACITY
        } else {
            buffer_capacity
        };

        Arc::new(Self {
            queue: SegQueue::new(),
            buffer_capacity: AtomicUsize::new(buffer_capacity),
        })
    }

    /// Number of recycled buffers currently held by the pool.
    pub fn available(&self) -> usize {
        self.queue.len()
    }

    pub fn get_buffer(self: &Arc<Self>) -> Arc<Buffer> {
        let buffer_capacity = self.buffer_capacity.load(Ordering::Acquire);

        // Fast path: reuse a recycled buffer if its capacity still matches.
        if let Some(mut buffer) = self.queue.pop() {
            if buffer_capacity == buffer.capacity() {
                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
                    buffer_mut.resize(buffer_capacity);
                    return buffer;
                }
            }
        }

        // Slow path: allocate a fresh, zeroed buffer.
        Arc::new(Buffer::new(vec![0u8; buffer_capacity].into_boxed_slice(), self))
    }

    pub fn get_buffer_with_length(self: &Arc<Self>, len: usize) -> Arc<Buffer> {
        let mut buffer_capacity = self.buffer_capacity.load(Ordering::Acquire);

        // Grow the pool's capacity if the requested length does not fit.
        if len > buffer_capacity {
            self.set_buffer_capacity(len);
            buffer_capacity = len;
        }

        if let Some(mut buffer) = self.queue.pop() {
            if buffer_capacity == buffer.capacity() {
                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
                    buffer_mut.resize(len);
                    return buffer;
                }
            }
        }

        Arc::new(Buffer::new_with_available(vec![0u8; buffer_capacity].into_boxed_slice(), self, len))
    }

    pub fn recycle_buffer(self: &Arc<Self>, buffer_data: Box<[u8]>) {
        // Only keep storage whose size still matches the pool's current capacity.
        if buffer_data.len() == self.buffer_capacity.load(Ordering::Acquire) {
            self.queue.push(Arc::new(Buffer::new(buffer_data, self)));
        }
    }

    pub fn get_buffer_capacity(&self) -> usize {
        self.buffer_capacity.load(Ordering::Relaxed)
    }

    pub fn set_buffer_capacity(&self, buffer_capacity: usize) {
        self.buffer_capacity.store(buffer_capacity, Ordering::Release);
        // Drop any recycled buffers of the old capacity.
        while self.queue.pop().is_some() {}
    }
}
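
// A minimal usage sketch of the pool API above (a test module added for
// illustration, not part of the original file): it walks through the round
// trip of allocating a buffer, dropping it so its storage is recycled,
// reusing the recycled storage, and growing the pool's capacity.
#[cfg(test)]
mod usage_sketch {
    use super::*;

    #[test]
    fn pool_recycles_and_grows() {
        let pool = BufferPool::new(16);

        // The pool starts empty, so the first request allocates a fresh buffer.
        let buffer = pool.get_buffer();
        assert_eq!(buffer.capacity(), 16);
        assert_eq!(buffer.len(), 16);
        assert_eq!(pool.available(), 0);

        // Dropping the last handle returns the backing storage to the pool.
        drop(buffer);
        assert_eq!(pool.available(), 1);

        // The next request reuses the recycled storage instead of allocating.
        let reused = pool.get_buffer();
        assert_eq!(reused.capacity(), 16);
        assert_eq!(pool.available(), 0);

        // Requesting a length beyond the current capacity grows the pool and
        // discards any cached buffers of the old size.
        let large = pool.get_buffer_with_length(64);
        assert_eq!(large.capacity(), 64);
        assert_eq!(large.len(), 64);
        assert_eq!(pool.get_buffer_capacity(), 64);
    }
}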