Skip to main content

rusty_sockslib/
buffer_pool.rs

1use std::sync::Arc;
2use tokio::sync::{Mutex, MutexGuard};
3
/// A growable pool of equally sized, reusable byte buffers.
///
/// Buffers are created on demand and kept for the lifetime of the pool. A buffer
/// is considered leased while something other than the pool holds a clone of its
/// `Arc`.
pub struct BufferPool {
    /// Length, in bytes, given to every buffer the pool allocates.
    buffer_size: usize,
    /// Every buffer allocated so far, leased or not, in allocation order.
    buffers: Vec<Arc<Mutex<Vec<u8>>>>,
}
8
9impl BufferPool {
10    pub fn new(buffer_size: usize) -> Self {
11        BufferPool {
12            buffer_size,
13            buffers: Vec::<Arc<Mutex<Vec<u8>>>>::new(),
14        }
15    }
16
17    pub fn lease(&mut self) -> Buffer {
18        let mut free_buffer_index: Option<usize> = None;
19
20        // Find an unleased buffer.
21        for k in 0..self.buffers.len() {
22            let ref_count = Arc::strong_count(&self.buffers[k]);
23            if ref_count < 2 {
24                free_buffer_index = Some(k);
25                break;
26            }
27        }
28
29        // Or, create a new one.
30        if free_buffer_index.is_none() {
31            free_buffer_index = Some(self.add_buffer());
32        }
33
34        assert_ne!(None, free_buffer_index);
35
36        let index = free_buffer_index.unwrap();
37
38        Buffer::new(self.buffers[index].clone())
39    }
40
41    pub fn leased_count(&self) -> usize {
42        self.buffers.iter().filter(|b| Arc::strong_count(b) >= 2).count()
43    }
44
45    pub fn total_count(&self) -> usize {
46        self.buffers.len()
47    }
48
49    fn add_buffer(&mut self) -> usize {
50        self.buffers.push(Arc::new(Mutex::new(vec![0; self.buffer_size])));
51
52        self.buffers.len() - 1
53    }
54}
55
/// A lease on one buffer owned by a [`BufferPool`].
///
/// The handle keeps a clone of the pool's `Arc` for the buffer; while that
/// clone is alive the pool counts the buffer as leased.
pub struct Buffer {
    /// Shared handle to the pooled bytes, guarded by a mutex.
    buffer: Arc<Mutex<Vec<u8>>>,
}
59
60impl Buffer {
61    fn new(buffer: Arc<Mutex<Vec<u8>>>) -> Buffer {
62        Buffer { buffer }
63    }
64
65    pub async fn get(&mut self) -> MutexGuard<'_, Vec<u8>> {
66        self.buffer.lock().await
67    }
68}
69
#[cfg(test)]
mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Overlapping leases grow the pool; once leases are dropped, later leases
    /// reuse the existing buffers instead of allocating more.
    #[test]
    fn lease_reuses_freed_buffers() {
        let mut pool = BufferPool::new(1024);
        assert_eq!(pool.total_count(), 0);
        assert_eq!(pool.leased_count(), 0);

        let a = pool.lease();
        assert_eq!(pool.leased_count(), 1);
        assert_eq!(pool.total_count(), 1);

        // A second concurrent lease must allocate a new buffer.
        let b = pool.lease();
        assert_eq!(pool.leased_count(), 2);
        assert_eq!(pool.total_count(), 2);

        // Dropping frees them; a subsequent lease reuses rather than growing the pool.
        drop(a);
        drop(b);
        assert_eq!(pool.leased_count(), 0);

        let _c = pool.lease();
        assert_eq!(pool.leased_count(), 1);
        assert_eq!(pool.total_count(), 2);
    }

    /// A freshly leased buffer is exactly as long as the size the pool was
    /// created with.
    #[tokio::test]
    async fn leased_buffer_has_requested_size() {
        let mut pool = BufferPool::new(1500);
        let mut buffer = pool.lease();
        assert_eq!(buffer.get().await.len(), 1500);
    }
}
104}