// safa_buffer_pool/context/multi_thread.rs
multi_thread.rs1use std::{ mem::ManuallyDrop, ops::{ Deref, DerefMut, Index, IndexMut }, sync::{ Arc, Weak } };
2
3use crate::builder::BufferPoolBuilder;
4
5use super::common::BufferPool as RawBufferPool;
6
7#[cfg(feature = "async")]
8use tokio::{ sync::Mutex, time::sleep };
9#[cfg(not(feature = "async"))]
10use std::{ sync::Mutex, thread::sleep };
11
/// Thread-safe handle to a shared buffer pool.
///
/// Cloning a `BufferPool` is cheap: it only clones the `Arc`, so all clones
/// operate on the same underlying `RawBufferPool`.
#[derive(Clone)]
pub struct BufferPool {
    // Shared, mutex-guarded raw pool. `tokio::sync::Mutex` with the "async"
    // feature, `std::sync::Mutex` otherwise (see the cfg'd imports above).
    inner_arc: Arc<Mutex<RawBufferPool>>,
}
17
impl BufferPool {
    /// Checks out a buffer from the pool.
    ///
    /// Returns `None` when the underlying `RawBufferPool::get` yields no
    /// buffer. The returned [`BufferGuard`] gives the buffer back to this
    /// pool when it is dropped.
    #[cfg(feature = "async")]
    pub async fn get(&self) -> Option<BufferGuard> {
        let mut pool = self.inner_arc.lock().await;

        pool.get().map(|buffer| BufferGuard {
            // The guard keeps its own clone of the pool handle so that
            // `Drop` can return the buffer later.
            pool: self.clone(),
            buffer,
        })
    }
    /// Blocking variant of `get`, compiled when the "async" feature is off.
    #[cfg(not(feature = "async"))]
    pub fn get(&self) -> Option<BufferGuard> {
        let mut pool = self.inner_arc.lock().expect("mutex lock error");

        pool.get().map(|buffer| BufferGuard {
            pool: self.clone(),
            buffer,
        })
    }

    /// Runs one cleanup pass on the pool behind `pool_weak`.
    ///
    /// Returns `true` when the pool has already been dropped (the `Weak`
    /// no longer upgrades), which tells the background loop spawned in
    /// [`BufferPool::from_builder`] to terminate; returns `false` after a
    /// successful cleanup pass.
    #[cfg(feature = "async")]
    async fn clean_excess_buffer(pool_weak: &Weak<Mutex<RawBufferPool>>) -> bool {
        if let Some(pool_arc) = pool_weak.upgrade() {
            let mut pool = pool_arc.lock().await;

            pool.clean_excess_buffer();

            return false;
        }
        true
    }
    /// Blocking variant of `clean_excess_buffer` (same termination protocol).
    #[cfg(not(feature = "async"))]
    fn clean_excess_buffer(pool_weak: &Weak<Mutex<RawBufferPool>>) -> bool {
        if let Some(pool_arc) = pool_weak.upgrade() {
            let mut pool = pool_arc.lock().expect("mutex lock error");

            pool.clean_excess_buffer();

            return false;
        }
        true
    }

    /// Builds a pool from `builder` and, if a buffer lifetime is configured,
    /// starts a background task/thread that periodically evicts excess
    /// buffers.
    ///
    /// The background loop only holds a `Weak` reference, so it does not keep
    /// the pool alive; it exits on the first tick after the last `BufferPool`
    /// handle is dropped (and, in the worst case, only that long after).
    ///
    /// NOTE(review): with the "async" feature, `tokio::spawn` is called here,
    /// which requires an active tokio runtime — confirm that `from_builder`
    /// is only invoked from within one.
    pub fn from_builder(builder: &BufferPoolBuilder) -> Self {
        let pool = Self {
            inner_arc: Arc::new(Mutex::new(RawBufferPool::from_builder(builder))),
        };

        if let Some(over_buffer_lifetime) = builder.over_buffer_lifetime_opt {
            // Weak handle: the janitor must not keep the pool alive.
            let pool_weak = Arc::downgrade(&pool.inner_arc);

            #[cfg(feature = "async")]
            tokio::spawn(async move {
                loop {
                    sleep(over_buffer_lifetime).await;

                    // `true` means the pool is gone — stop the loop.
                    if BufferPool::clean_excess_buffer(&pool_weak).await {
                        return;
                    }
                }
            });

            #[cfg(not(feature = "async"))]
            std::thread::spawn(move || {
                loop {
                    sleep(over_buffer_lifetime);

                    if BufferPool::clean_excess_buffer(&pool_weak) {
                        return;
                    }
                }
            });
        }

        pool
    }
}
102
/// RAII guard for a buffer checked out of a [`BufferPool`].
///
/// Dereferences to `[u8]`; on drop the buffer is returned to the pool.
pub struct BufferGuard {
    // Handle back to the owning pool, used by `Drop` to return the buffer.
    pool: BufferPool,
    // `ManuallyDrop` so `Drop` can move the `Box` out (via
    // `ManuallyDrop::take`) and hand ownership back to the pool instead of
    // deallocating it here.
    buffer: ManuallyDrop<Box<[u8]>>,
}
108
109impl Drop for BufferGuard {
110 fn drop(&mut self) {
111 let pool_mtx = self.pool.clone();
112 let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) }; #[cfg(feature = "async")]
115 tokio::spawn(async move {
116 let mut pool = pool_mtx.inner_arc.lock().await;
117 pool.free(buffer);
118 });
119
120 #[cfg(not(feature = "async"))]
121 std::thread::spawn(move || {
122 let mut pool = pool_mtx.inner_arc.lock().expect("mutex lock error");
123 pool.free(buffer);
124 });
125 }
126}
127
128impl Deref for BufferGuard {
129 type Target = [u8];
130 fn deref(&self) -> &Self::Target {
131 &self.buffer
132 }
133}
134
135impl DerefMut for BufferGuard {
136 fn deref_mut(&mut self) -> &mut Self::Target {
137 &mut self.buffer
138 }
139}
140
141impl Index<usize> for BufferGuard {
142 type Output = u8;
143 fn index(&self, idx: usize) -> &Self::Output {
144 &self.buffer[idx]
145 }
146}
147impl IndexMut<usize> for BufferGuard {
148 fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
149 &mut self.buffer[idx]
150 }
151}