1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::Server;
use crate::{Error, ErrorKind};
use lazy_static::lazy_static;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut, Drop};
use std::sync::{Arc, Mutex};
use tokio::sync::{Semaphore, SemaphorePermit};
const DEFAULT_POOL_SIZE: usize = 100;
lazy_static! {
pub(crate) static ref SERVER_POOL: ServerPool = ServerPool::new(DEFAULT_POOL_SIZE);
}
pub struct ServerGuard {
server: Option<Server>,
_permit: SemaphorePermit<'static>,
}
impl ServerGuard {
fn new(server: Server, _permit: SemaphorePermit<'static>) -> ServerGuard {
ServerGuard {
server: Some(server),
_permit,
}
}
}
impl Deref for ServerGuard {
type Target = Server;
fn deref(&self) -> &Self::Target {
self.server.as_ref().unwrap()
}
}
impl DerefMut for ServerGuard {
fn deref_mut(&mut self) -> &mut Self::Target {
self.server.as_mut().unwrap()
}
}
impl Drop for ServerGuard {
fn drop(&mut self) {
if let Some(server) = self.server.take() {
SERVER_POOL.recycle(server);
}
}
}
pub(crate) struct ServerPool {
max_size: usize,
created: usize,
semaphore: Semaphore,
state: Arc<Mutex<VecDeque<Server>>>,
}
impl ServerPool {
fn new(max_size: usize) -> ServerPool {
let created = 0;
let semaphore = Semaphore::new(max_size);
let state = Arc::new(Mutex::new(VecDeque::new()));
ServerPool {
max_size,
created,
semaphore,
state,
}
}
pub(crate) async fn get_async(&'static self) -> Result<ServerGuard, Error> {
let permit = self
.semaphore
.acquire()
.await
.map_err(|err| Error::new_with_context(ErrorKind::Deadlock, err))?;
let server = if self.created < self.max_size {
Some(Server::try_new_with_port_async(0).await?)
} else {
None
};
let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();
if let Some(server) = server {
state.push_back(server);
}
if let Some(server) = state.pop_front() {
Ok(ServerGuard::new(server, permit))
} else {
Err(Error::new(ErrorKind::ServerBusy))
}
}
fn recycle(&self, mut server: Server) {
server.reset();
let state_mutex = self.state.clone();
let mut state = state_mutex.lock().unwrap();
state.push_back(server);
}
}