safa_buffer_pool/context/
multi_thread.rs

1use std::{
2    cmp::min,
3    mem::ManuallyDrop,
4    ops::{Deref, DerefMut, Index, IndexMut},
5    sync::Arc,
6};
7
8use tokio::{sync::Mutex, time::sleep};
9
10use crate::{BufferPool as RawBufferPool, BufferPoolBuilder};
11
12///BufferPool for Multi-thread context
13#[derive(Clone)]
14pub struct BufferPool {
15    inner_arc: Arc<Mutex<RawBufferPool>>,
16}
17
18impl BufferPool {
19    ///Get a new buffer from the pool
20    /// 
21    ///Return None if none buffer available
22    pub async fn get(&self) -> Option<BufferGuard> {
23        let mut pool = self.inner_arc.lock().await;
24
25        if let Some(new_buffer) = pool.all_available_buffer.pop() {
26            let nb_available_buffer = pool.all_available_buffer.len();
27            if nb_available_buffer < pool.min_available_nb_buffer {
28                pool.min_available_nb_buffer = nb_available_buffer;
29            }
30
31            return Some(BufferGuard {
32                pool: self.clone(),
33                buffer: ManuallyDrop::new(new_buffer),
34            });
35        }
36
37        if pool.total_nb_buffer == pool.max_nb_buffer {
38            return None;
39        }
40        pool.total_nb_buffer += 1;
41
42        let buffer_size = pool.buffer_size;
43
44        drop(pool); //Free lock
45
46        Some(BufferGuard {
47            pool: self.clone(),
48            buffer: ManuallyDrop::new(vec![0u8; buffer_size].into_boxed_slice()),
49        })
50    }
51
52    ///Like BufferPoolBuilder.build_multi_thread()
53    pub fn from_builder(builder: &BufferPoolBuilder) -> BufferPool {
54        let mut all_buffer = Vec::with_capacity(builder.max_nb_buffer);
55
56        for _ in 0..builder.min_nb_buffer {
57            all_buffer.push(vec![0u8; builder.buffer_size].into_boxed_slice());
58        }
59
60        let pool = BufferPool {
61            inner_arc: Arc::new(Mutex::new(RawBufferPool {
62                total_nb_buffer: builder.min_nb_buffer,
63
64                max_nb_buffer: builder.max_nb_buffer,
65                min_nb_buffer: builder.min_nb_buffer,
66                buffer_size: builder.buffer_size,
67
68                min_available_nb_buffer: builder.min_nb_buffer,
69
70                all_available_buffer: all_buffer,
71            })),
72        };
73
74        if let Some(over_buffer_lifetime) = builder.over_buffer_lifetime_opt {
75            let pool_weak = Arc::downgrade(&pool.inner_arc);
76            tokio::spawn(async move {
77                let mut pool_destroyed = false;
78                while !pool_destroyed {
79                    sleep(over_buffer_lifetime).await;
80
81                    if let Some(pool_arc) = pool_weak.upgrade() {
82                        let mut pool = pool_arc.lock().await;
83
84                        let total_droppable_buffer = pool.total_nb_buffer - pool.min_nb_buffer;
85                        let nb_drop_buffer =
86                            min(total_droppable_buffer, pool.min_available_nb_buffer);
87
88                        let nb_buffer_to_keep = pool.all_available_buffer.len() - nb_drop_buffer;
89                        pool.total_nb_buffer -= nb_drop_buffer;
90                        pool.all_available_buffer.truncate(nb_buffer_to_keep);
91
92                        pool.min_available_nb_buffer = pool.all_available_buffer.len();
93                    } else {
94                        pool_destroyed = true;
95                    }
96                }
97            });
98        }
99
100        pool
101    }
102}
103
104///A buffer guard for auto-drop of buffer, useable like a buffer
105pub struct BufferGuard {
106    pool: BufferPool,
107    buffer: ManuallyDrop<Box<[u8]>>,
108}
109
110impl Drop for BufferGuard {
111    fn drop(&mut self) {
112        let pool_mtx = self.pool.clone();
113        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) }; //unsafe without risk
114
115        tokio::spawn(async move {
116            let mut pool = pool_mtx.inner_arc.lock().await;
117            pool.all_available_buffer.push(buffer);
118        });
119    }
120}
121
122impl Deref for BufferGuard {
123    type Target = [u8];
124    fn deref(&self) -> &Self::Target {
125        &self.buffer
126    }
127}
128
129impl DerefMut for BufferGuard {
130    fn deref_mut(&mut self) -> &mut Self::Target {
131        &mut self.buffer
132    }
133}
134
135impl Index<usize> for BufferGuard {
136    type Output = u8;
137    fn index(&self, idx: usize) -> &Self::Output {
138        &self.buffer[idx]
139    }
140}
141impl IndexMut<usize> for BufferGuard {
142    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
143        &mut self.buffer[idx]
144    }
145}