safa_buffer_pool/context/
multi_thread.rs

1use std::{ mem::ManuallyDrop, ops::{ Deref, DerefMut, Index, IndexMut }, sync::{ Arc, Weak } };
2
3use crate::builder::BufferPoolBuilder;
4
5use super::common::BufferPool as RawBufferPool;
6
7#[cfg(feature = "async")]
8use tokio::{ sync::Mutex, time::sleep };
9#[cfg(not(feature = "async"))]
10use std::{ sync::Mutex, thread::sleep };
11
12///BufferPool for Multi-thread context
13#[derive(Clone)]
14pub struct BufferPool {
15    inner_arc: Arc<Mutex<RawBufferPool>>,
16}
17
18impl BufferPool {
19    ///Get a new buffer from the pool
20    ///
21    ///Return None if none buffer available
22    #[cfg(feature = "async")]
23    pub async fn get(&self) -> Option<BufferGuard> {
24        let mut pool = self.inner_arc.lock().await;
25
26        pool.get().map(|buffer| BufferGuard {
27            pool: self.clone(),
28            buffer,
29        })
30    }
31    #[cfg(not(feature = "async"))]
32    pub fn get(&self) -> Option<BufferGuard> {
33        let mut pool = self.inner_arc.lock().expect("mutex lock error");
34
35        pool.get().map(|buffer| BufferGuard {
36            pool: self.clone(),
37            buffer,
38        })
39    }
40
41    ///Optimize the number of buffer by deleted excess buffer of the pool
42    ///
43    /// Return if the pool was droped
44    #[cfg(feature = "async")]
45    async fn clean_excess_buffer(pool_weak: &Weak<Mutex<RawBufferPool>>) -> bool {
46        if let Some(pool_arc) = pool_weak.upgrade() {
47            let mut pool = pool_arc.lock().await;
48
49            pool.clean_excess_buffer();
50
51            return false;
52        }
53        true
54    }
55    #[cfg(not(feature = "async"))]
56    fn clean_excess_buffer(pool_weak: &Weak<Mutex<RawBufferPool>>) -> bool {
57        if let Some(pool_arc) = pool_weak.upgrade() {
58            let mut pool = pool_arc.lock().expect("mutex lock error");
59
60            pool.clean_excess_buffer();
61
62            return false;
63        }
64        true
65    }
66
67    ///Like BufferPoolBuilder.build_multi_thread()
68    pub fn from_builder(builder: &BufferPoolBuilder) -> Self {
69        let pool = Self {
70            inner_arc: Arc::new(Mutex::new(RawBufferPool::from_builder(builder))),
71        };
72
73        if let Some(over_buffer_lifetime) = builder.over_buffer_lifetime_opt {
74            let pool_weak = Arc::downgrade(&pool.inner_arc);
75
76            #[cfg(feature = "async")]
77            tokio::spawn(async move {
78                loop {
79                    sleep(over_buffer_lifetime).await;
80
81                    if BufferPool::clean_excess_buffer(&pool_weak).await {
82                        return;
83                    }
84                }
85            });
86
87            #[cfg(not(feature = "async"))]
88            std::thread::spawn(move || {
89                loop {
90                    sleep(over_buffer_lifetime);
91
92                    if BufferPool::clean_excess_buffer(&pool_weak) {
93                        return;
94                    }
95                }
96            });
97        }
98
99        pool
100    }
101}
102
103///A buffer guard for auto-drop of buffer, useable like a buffer
104pub struct BufferGuard {
105    pool: BufferPool,
106    buffer: ManuallyDrop<Box<[u8]>>,
107}
108
109impl Drop for BufferGuard {
110    fn drop(&mut self) {
111        let pool_mtx = self.pool.clone();
112        let buffer = unsafe { ManuallyDrop::take(&mut self.buffer) }; //unsafe without risk
113
114        #[cfg(feature = "async")]
115        tokio::spawn(async move {
116            let mut pool = pool_mtx.inner_arc.lock().await;
117            pool.free(buffer);
118        });
119
120        #[cfg(not(feature = "async"))]
121        std::thread::spawn(move || {
122            let mut pool = pool_mtx.inner_arc.lock().expect("mutex lock error");
123            pool.free(buffer);
124        });
125    }
126}
127
128impl Deref for BufferGuard {
129    type Target = [u8];
130    fn deref(&self) -> &Self::Target {
131        &self.buffer
132    }
133}
134
135impl DerefMut for BufferGuard {
136    fn deref_mut(&mut self) -> &mut Self::Target {
137        &mut self.buffer
138    }
139}
140
141impl Index<usize> for BufferGuard {
142    type Output = u8;
143    fn index(&self, idx: usize) -> &Self::Output {
144        &self.buffer[idx]
145    }
146}
147impl IndexMut<usize> for BufferGuard {
148    fn index_mut(&mut self, idx: usize) -> &mut Self::Output {
149        &mut self.buffer[idx]
150    }
151}