nntp_proxy/pool/
buffer.rs1use crate::types::BufferSize;
2use crossbeam::queue::SegQueue;
3use std::ops::Deref;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use tracing::info;
7
8pub struct PooledBuffer {
22 buffer: Vec<u8>,
23 initialized: usize,
24 pool: Arc<SegQueue<Vec<u8>>>,
25 pool_size: Arc<AtomicUsize>,
26 max_pool_size: usize,
27}
28
29impl PooledBuffer {
30 #[inline]
32 pub fn capacity(&self) -> usize {
33 self.buffer.len()
34 }
35
36 #[inline]
38 pub fn initialized(&self) -> usize {
39 self.initialized
40 }
41
42 pub async fn read_from<R>(&mut self, reader: &mut R) -> std::io::Result<usize>
44 where
45 R: tokio::io::AsyncReadExt + Unpin,
46 {
47 let n = reader.read(&mut self.buffer[..]).await?;
48 self.initialized = n;
49 Ok(n)
50 }
51
52 #[inline]
57 pub fn copy_from_slice(&mut self, data: &[u8]) {
58 assert!(
59 data.len() <= self.buffer.len(),
60 "data exceeds buffer capacity"
61 );
62 self.buffer[..data.len()].copy_from_slice(data);
63 self.initialized = data.len();
64 }
65}
66
67impl Deref for PooledBuffer {
68 type Target = [u8];
69
70 #[inline]
71 fn deref(&self) -> &Self::Target {
72 &self.buffer[..self.initialized]
74 }
75}
76
77impl AsRef<[u8]> for PooledBuffer {
80 #[inline]
81 fn as_ref(&self) -> &[u8] {
82 &self.buffer[..self.initialized]
84 }
85}
86
87impl Drop for PooledBuffer {
88 fn drop(&mut self) {
89 let mut current_size = self.pool_size.load(Ordering::Relaxed);
91 while current_size < self.max_pool_size {
92 match self.pool_size.compare_exchange_weak(
93 current_size,
94 current_size + 1,
95 Ordering::Relaxed,
96 Ordering::Relaxed,
97 ) {
98 Ok(_) => {
99 let buffer = std::mem::take(&mut self.buffer);
100 self.pool.push(buffer);
101 return;
102 }
103 Err(new_size) => {
104 current_size = new_size;
105 }
106 }
107 }
108 }
110}
111
112#[derive(Debug, Clone)]
115pub struct BufferPool {
116 pool: Arc<SegQueue<Vec<u8>>>,
117 buffer_size: BufferSize,
118 max_pool_size: usize,
119 pool_size: Arc<AtomicUsize>,
120}
121
122impl BufferPool {
123 #[allow(clippy::uninit_vec)]
142 fn create_aligned_buffer(size: usize) -> Vec<u8> {
143 let page_size = 4096;
145 let aligned_size = size.div_ceil(page_size) * page_size;
146
147 let mut buffer = Vec::with_capacity(aligned_size);
149 unsafe {
156 buffer.set_len(size);
157 }
158 buffer
159 }
160
161 #[must_use]
169 pub fn new(buffer_size: BufferSize, max_pool_size: usize) -> Self {
170 let pool = Arc::new(SegQueue::new());
171 let pool_size = Arc::new(AtomicUsize::new(0));
172
173 info!(
175 "Pre-allocating {} buffers of {}KB each ({}MB total)",
176 max_pool_size,
177 buffer_size.get() / 1024,
178 (max_pool_size * buffer_size.get()) / (1024 * 1024)
179 );
180
181 for _ in 0..max_pool_size {
182 let buffer = Self::create_aligned_buffer(buffer_size.get());
183 pool.push(buffer);
184 pool_size.fetch_add(1, Ordering::Relaxed);
185 }
186
187 info!("Buffer pool pre-allocation complete");
188
189 Self {
190 pool,
191 buffer_size,
192 max_pool_size,
193 pool_size,
194 }
195 }
196
197 pub async fn get_buffer(&self) -> PooledBuffer {
205 let buffer = if let Some(buffer) = self.pool.pop() {
206 self.pool_size.fetch_sub(1, Ordering::Relaxed);
207 debug_assert_eq!(buffer.len(), self.buffer_size.get());
209 buffer
210 } else {
211 Self::create_aligned_buffer(self.buffer_size.get())
213 };
214
215 PooledBuffer {
216 buffer,
217 initialized: 0, pool: Arc::clone(&self.pool),
219 pool_size: Arc::clone(&self.pool_size),
220 max_pool_size: self.max_pool_size,
221 }
222 }
223
224 #[allow(dead_code)]
228 pub async fn return_buffer(&self, buffer: Vec<u8>) {
229 if buffer.len() == self.buffer_size.get() {
230 let current_size = self.pool_size.load(Ordering::Relaxed);
231 if current_size < self.max_pool_size {
232 self.pool.push(buffer);
233 self.pool_size.fetch_add(1, Ordering::Relaxed);
234 }
235 }
237 }
238}
239
240#[cfg(test)]
241mod tests {
242 use super::*;
243
244 #[tokio::test]
245 async fn test_buffer_pool_creation() {
246 let pool = BufferPool::new(BufferSize::new(8192).unwrap(), 10);
247
248 let buffer1 = pool.get_buffer().await;
250 assert_eq!(buffer1.capacity(), 8192);
251 assert_eq!(buffer1.initialized(), 0); }
254
255 #[tokio::test]
256 async fn test_buffer_pool_get_and_return() {
257 let pool = BufferPool::new(BufferSize::new(4096).unwrap(), 5);
258
259 let buffer = pool.get_buffer().await;
261 assert_eq!(buffer.capacity(), 4096);
262 assert_eq!(buffer.initialized(), 0);
263
264 drop(buffer);
269
270 let buffer2 = pool.get_buffer().await;
272 assert_eq!(buffer2.capacity(), 4096);
273 }
274
275 #[tokio::test]
276 async fn test_buffer_pool_exhaustion() {
277 let pool = BufferPool::new(BufferSize::new(1024).unwrap(), 2);
278
279 let buf1 = pool.get_buffer().await;
281 let buf2 = pool.get_buffer().await;
282
283 let buf3 = pool.get_buffer().await;
285 assert_eq!(buf3.capacity(), 1024);
286
287 drop(buf1);
289 drop(buf2);
290 drop(buf3);
291 }
292
293 #[tokio::test]
294 async fn test_buffer_pool_concurrent_access() {
295 let pool = BufferPool::new(BufferSize::new(2048).unwrap(), 10);
296
297 let mut handles = vec![];
299
300 for _ in 0..20 {
301 let pool_clone = pool.clone();
302 let handle = tokio::spawn(async move {
303 let buffer = pool_clone.get_buffer().await;
304 assert_eq!(buffer.capacity(), 2048);
305 tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
307 });
308 handles.push(handle);
309 }
310
311 for handle in handles {
313 handle.await.unwrap();
314 }
315 }
316
317 #[tokio::test]
318 async fn test_buffer_alignment() {
319 let pool = BufferPool::new(BufferSize::new(8192).unwrap(), 1);
320 let buffer = pool.get_buffer().await;
321
322 assert!(buffer.capacity() >= 8192);
324 assert_eq!(buffer.capacity() % 4096, 0);
326 }
327
328 #[tokio::test]
329 async fn test_buffer_clear_and_resize() {
330 let pool = BufferPool::new(BufferSize::new(1024).unwrap(), 2);
331
332 let mut buffer = pool.get_buffer().await;
333
334 let data = vec![42u8; 101];
336 buffer.copy_from_slice(&data);
337 assert_eq!(buffer.initialized(), 101);
338
339 drop(buffer);
341
342 let buffer2 = pool.get_buffer().await;
344 assert_eq!(buffer2.capacity(), 1024);
345 }
347
348 #[tokio::test]
349 async fn test_buffer_pool_max_size_enforcement() {
350 let pool = BufferPool::new(BufferSize::new(512).unwrap(), 3);
351
352 let buf1 = pool.get_buffer().await;
354 let buf2 = pool.get_buffer().await;
355 let buf3 = pool.get_buffer().await;
356
357 let buf4 = pool.get_buffer().await;
359
360 drop(buf1);
362 drop(buf2);
363 drop(buf3);
364 drop(buf4);
365
366 }
369
370 #[tokio::test]
371 async fn test_buffer_wrong_size_not_returned() {
372 let pool = BufferPool::new(BufferSize::new(1024).unwrap(), 2);
373
374 let buffer = pool.get_buffer().await;
375 assert_eq!(buffer.capacity(), 1024);
376
377 drop(buffer);
379 }
380
381 #[tokio::test]
382 async fn test_buffer_pool_multiple_get_return_cycles() {
383 let pool = BufferPool::new(BufferSize::new(4096).unwrap(), 5);
384
385 for i in 0..20 {
387 let mut buffer = pool.get_buffer().await;
388 assert_eq!(buffer.capacity(), 4096);
389
390 let data = vec![i as u8; 1];
392 buffer.copy_from_slice(&data);
393 assert_eq!(buffer.initialized(), 1);
394 }
395 }
396
397 #[test]
398 fn test_buffer_pool_clone() {
399 let pool1 = BufferPool::new(BufferSize::new(1024).unwrap(), 5);
400 let _pool2 = pool1.clone();
401
402 }
405
406 #[tokio::test]
407 async fn test_different_buffer_sizes() {
408 let small_pool = BufferPool::new(BufferSize::new(1024).unwrap(), 5);
409 let medium_pool = BufferPool::new(BufferSize::new(8192).unwrap(), 5);
410 let large_pool = BufferPool::new(BufferSize::new(65536).unwrap(), 5);
411
412 let small_buf = small_pool.get_buffer().await;
413 let medium_buf = medium_pool.get_buffer().await;
414 let large_buf = large_pool.get_buffer().await;
415
416 assert_eq!(small_buf.capacity(), 1024);
417 assert_eq!(medium_buf.capacity(), 8192);
418 assert_eq!(large_buf.capacity(), 65536);
419
420 }
422
423 #[tokio::test]
424 async fn test_buffer_pool_stress() {
425 let pool = BufferPool::new(BufferSize::new(4096).unwrap(), 10);
426
427 let mut handles = vec![];
429
430 for _ in 0..100 {
431 let pool_clone = pool.clone();
432 let handle = tokio::spawn(async move {
433 for _ in 0..10 {
434 let buffer = pool_clone.get_buffer().await;
435 assert_eq!(buffer.capacity(), 4096);
436 }
437 });
438 handles.push(handle);
439 }
440
441 for handle in handles {
442 handle.await.unwrap();
443 }
444 }
445}