safa_buffer_pool/context/
multi_thread.rs1use std::{
2 cmp::min,
3 mem::ManuallyDrop,
4 ops::{Deref, DerefMut, Index, IndexMut},
5 sync::Arc,
6};
7
8use tokio::{sync::Mutex, time::sleep};
9
10use crate::{BufferPool as RawBufferPool, BufferPoolBuilder};
11
12#[derive(Clone)]
14pub struct BufferPool {
15 inner_arc: Arc<Mutex<RawBufferPool>>,
16}
17
18impl BufferPool {
19 pub async fn get(&self) -> Option<BufferGuard> {
23 let mut pool = self.inner_arc.lock().await;
24
25 if let Some(new_buffer) = pool.all_available_buffer.pop() {
26 let nb_available_buffer = pool.all_available_buffer.len();
27 if nb_available_buffer < pool.min_available_nb_buffer {
28 pool.min_available_nb_buffer = nb_available_buffer;
29 }
30
31 return Some(BufferGuard {
32 pool: self.clone(),
33 buffer: ManuallyDrop::new(new_buffer),
34 });
35 }
36
37 if pool.total_nb_buffer == pool.max_nb_buffer {
38 return None;
39 }
40 pool.total_nb_buffer += 1;
41
42 let buffer_size = pool.buffer_size;
43
44 drop(pool); Some(BufferGuard {
47 pool: self.clone(),
48 buffer: ManuallyDrop::new(vec![0u8; buffer_size].into_boxed_slice()),
49 })
50 }
51
52 pub fn from_builder(builder: &BufferPoolBuilder) -> BufferPool {
54 let mut all_buffer = Vec::with_capacity(builder.max_nb_buffer);
55
56 for _ in 0..builder.min_nb_buffer {
57 all_buffer.push(vec![0u8; builder.buffer_size].into_boxed_slice());
58 }
59
60 let pool = BufferPool {
61 inner_arc: Arc::new(Mutex::new(RawBufferPool {
62 total_nb_buffer: builder.min_nb_buffer,
63
64 max_nb_buffer: builder.max_nb_buffer,
65 min_nb_buffer: builder.min_nb_buffer,
66 buffer_size: builder.buffer_size,
67
68 min_available_nb_buffer: builder.min_nb_buffer,
69
70 all_available_buffer: all_buffer,
71 })),
72 };
73
74 if let Some(over_buffer_lifetime) = builder.over_buffer_lifetime_opt {
75 let pool_weak = Arc::downgrade(&pool.inner_arc);
76 tokio::spawn(async move {
77 let mut pool_destroyed = false;
78 while !pool_destroyed {
79 sleep(over_buffer_lifetime).await;
80
81 if let Some(pool_arc) = pool_weak.upgrade() {
82 let mut pool = pool_arc.lock().await;
83
84 let total_droppable_buffer = pool.total_nb_buffer - pool.min_nb_buffer;
85 let nb_drop_buffer =
86 min(total_droppable_buffer, pool.min_available_nb_buffer);
87
88 let nb_buffer_to_keep = pool.all_available_buffer.len() - nb_drop_buffer;
89 pool.total_nb_buffer -= nb_drop_buffer;
90 pool.all_available_buffer.truncate(nb_buffer_to_keep);
91
92 pool.min_available_nb_buffer = pool.all_available_buffer.len();
93 } else {
94 pool_destroyed = true;
95 }
96 }
97 });
98 }
99
100 pool
101 }
102}
103
104pub struct BufferGuard {
106 pool: BufferPool,
107 buffer: ManuallyDrop<Box<[u8]>>,
108}
109
110impl Drop for BufferGuard {
111 fn drop(&mut self) {
112 let pool_mtx = self.pool.clone();
113 let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) }; tokio::spawn(async move {
116 let mut pool = pool_mtx.inner_arc.lock().await;
117 pool.all_available_buffer.push(buffer);
118 });
119 }
120}
121
122impl Deref for BufferGuard {
123 type Target = [u8];
124 fn deref(&self) -> &Self::Target {
125 &self.buffer
126 }
127}
128
129impl DerefMut for BufferGuard {
130 fn deref_mut(&mut self) -> &mut Self::Target {
131 &mut self.buffer
132 }
133}
134
135impl Index<usize> for BufferGuard {
136 type Output = u8;
137 fn index(&self, idx: usize) -> &Self::Output {
138 &self.buffer[idx]
139 }
140}
141impl IndexMut<usize> for BufferGuard {
142 fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
143 &mut self.buffer[idx]
144 }
145}