media_core/
buffer.rs

1use std::{
2    mem,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc, Weak,
6    },
7};
8
9use crossbeam_queue::SegQueue;
10
11pub struct Buffer {
12    data: Box<[u8]>,
13    available: usize,
14    pool: Weak<BufferPool>,
15}
16
17impl Buffer {
18    fn new(data: Box<[u8]>, pool: &Arc<BufferPool>) -> Self {
19        let available = data.len();
20
21        Self {
22            data,
23            available,
24            pool: Arc::downgrade(pool),
25        }
26    }
27
28    fn new_with_available(data: Box<[u8]>, pool: &Arc<BufferPool>, available: usize) -> Self {
29        let available = available.min(data.len());
30
31        Self {
32            data,
33            available,
34            pool: Arc::downgrade(pool),
35        }
36    }
37
38    pub fn capacity(&self) -> usize {
39        self.data.len()
40    }
41
42    pub fn len(&self) -> usize {
43        self.available
44    }
45
46    pub fn is_empty(&self) -> bool {
47        self.available == 0
48    }
49
50    pub fn data(&self) -> &[u8] {
51        &self.data[..self.available]
52    }
53
54    pub fn data_mut(&mut self) -> &mut [u8] {
55        &mut self.data[..self.available]
56    }
57
58    // Resize the buffer to the specified length, not exceeding its capacity
59    pub fn resize(&mut self, len: usize) {
60        self.available = len.min(self.capacity());
61    }
62}
63
64impl Drop for Buffer {
65    fn drop(&mut self) {
66        if let Some(pool) = self.pool.upgrade() {
67            pool.recycle_buffer(Arc::new(Buffer::new(mem::take(&mut self.data), &pool)));
68        }
69    }
70}
71
72pub struct BufferPool {
73    queue: SegQueue<Arc<Buffer>>,
74    buffer_capacity: AtomicUsize,
75}
76
77impl BufferPool {
78    pub fn new(buffer_capacity: usize) -> Arc<Self> {
79        Arc::new(Self {
80            queue: SegQueue::new(),
81            buffer_capacity: AtomicUsize::new(buffer_capacity),
82        })
83    }
84
85    pub fn available(&self) -> usize {
86        self.queue.len()
87    }
88
89    pub fn get_buffer(self: &Arc<Self>) -> Arc<Buffer> {
90        let buffer_capacity = self.buffer_capacity.load(Ordering::Relaxed);
91        if let Some(mut buffer) = self.queue.pop() {
92            if buffer_capacity == buffer.capacity() {
93                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
94                    buffer_mut.resize(buffer_capacity);
95                    buffer_mut.data_mut().fill(0);
96                    return buffer;
97                }
98            }
99        }
100
101        Arc::new(Buffer::new(vec![0u8; buffer_capacity].into_boxed_slice(), self))
102    }
103
104    pub fn get_buffer_with_length(self: &Arc<Self>, len: usize) -> Arc<Buffer> {
105        let buffer_capacity = self.buffer_capacity.load(Ordering::Relaxed);
106        let len = len.min(buffer_capacity);
107
108        if let Some(mut buffer) = self.queue.pop() {
109            if buffer_capacity == buffer.capacity() {
110                if let Some(buffer_mut) = Arc::get_mut(&mut buffer) {
111                    buffer_mut.resize(len);
112                    buffer_mut.data_mut().fill(0);
113                    return buffer;
114                }
115            }
116        }
117
118        Arc::new(Buffer::new_with_available(vec![0u8; buffer_capacity].into_boxed_slice(), self, len))
119    }
120
121    pub fn recycle_buffer(&self, buffer: Arc<Buffer>) {
122        if buffer.capacity() == self.buffer_capacity.load(Ordering::Relaxed) {
123            self.queue.push(buffer);
124        }
125    }
126
127    pub fn get_buffer_capacity(&self) -> usize {
128        self.buffer_capacity.load(Ordering::Relaxed)
129    }
130
131    pub fn set_buffer_capacity(&self, buffer_capacity: usize) {
132        self.buffer_capacity.store(buffer_capacity, Ordering::Relaxed);
133        while self.queue.pop().is_some() {}
134    }
135}