1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use std::collections::HashMap;
use std::io::{Read, Result as IoResult};
use crate::stream::Stream;
use crate::unit::Unit;
use url::Url;
/// Hostname used for a pool key when the URL has no host of its own.
pub const DEFAULT_HOST: &str = "localhost";
/// Holder of recycled connections.
///
/// Streams live in a map keyed by `PoolKey` (hostname and port), so at most
/// one idle stream is kept per hostname:port pair.
///
/// *Internal API*
#[derive(Default, Debug)]
pub(crate) struct ConnectionPool {
// the actual pooled connection. however only one per hostname:port,
// because the map holds a single stream for each key.
recycle: HashMap<PoolKey, Stream>,
}
impl ConnectionPool {
    /// Creates a pool that holds no connections yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Takes the stream pooled for `url` out of the pool, if there is one.
    ///
    /// This is how `unit::connect` tries to get a pooled connection. The
    /// stream is removed from the pool, so a second call for the same URL
    /// yields `None` until a stream is put back.
    pub fn try_get_connection(&mut self, url: &Url) -> Option<Stream> {
        let key = PoolKey::new(url);
        self.recycle.remove(&key)
    }

    /// Number of streams currently pooled (test-only helper).
    #[cfg(test)]
    pub fn len(&self) -> usize {
        self.recycle.len()
    }

    /// Borrows the stream pooled for `hostname`/`port` without removing it
    /// (test-only helper).
    #[cfg(test)]
    pub fn get(&self, hostname: &str, port: u16) -> Option<&Stream> {
        self.recycle.get(&PoolKey {
            hostname: hostname.to_owned(),
            port,
        })
    }
}
// Key under which a stream is stored in the pool: a hostname plus a port.
// Derives `Eq` and `Hash` so it can serve as a `HashMap` key.
#[derive(Debug, PartialEq, Clone, Eq, Hash)]
struct PoolKey {
// host name taken from the URL, or `DEFAULT_HOST` when the URL has none.
hostname: String,
// port resolved from the URL when the key is built.
port: u16,
}
impl PoolKey {
    /// Builds the pool key for `url` from its host and port.
    ///
    /// The host falls back to `DEFAULT_HOST` when the URL has none. The port
    /// is whatever `Url::port_or_known_default` reports; in test builds the
    /// `test` scheme additionally resolves to port 42.
    ///
    /// # Panics
    ///
    /// Panics when no port can be determined for the URL.
    fn new(url: &Url) -> Self {
        let port = url.port_or_known_default().or_else(|| {
            // only test builds know about the made-up `test` scheme.
            if cfg!(test) && url.scheme() == "test" {
                Some(42)
            } else {
                None
            }
        });
        let hostname: String = url.host_str().unwrap_or(DEFAULT_HOST).into();
        PoolKey {
            hostname,
            port: port.expect("Failed to get port for pool key"),
        }
    }
}
/// Read wrapper that returns the stream to the pool once the
/// read is exhausted (reached a 0).
///
/// It keeps both a reader `R` and a raw pointer to the stream. Handing the
/// stream back is done by `return_connection`, which drops the reader
/// before it reclaims the pointer.
///
/// *Internal API*
pub(crate) struct PoolReturnRead<R: Read + Sized> {
// unit that contains the agent where we want to return the reader.
// taken (left as `None`) when the connection is returned, which guards
// against returning it twice; with `None` nothing is ever returned.
unit: Option<Unit>,
// pointer to underlying stream.
// this pointer forces the entire PoolReturnRead to be !Sync and !Send
// that's a good thing, because the pool return logic is certainly not
// thread safe.
// reclaimed with `Box::from_raw` and then set to null when the
// connection is returned — presumably it was created with
// `Box::into_raw` by the caller; confirm at the construction site.
stream: *mut Stream,
// wrapped reader around the same stream
// becomes `None` once the connection is returned; reads yield 0 after that.
reader: Option<R>,
}
impl<R: Read + Sized> PoolReturnRead<R> {
    /// Wraps `reader`, keeping the `unit` and the raw `stream` pointer that
    /// are needed to hand the stream back later.
    pub fn new(unit: Option<Unit>, stream: *mut Stream, reader: R) -> Self {
        let reader = Some(reader);
        PoolReturnRead {
            unit,
            stream,
            reader,
        }
    }

    /// Hands the stream back: into the agent's pool when the agent state is
    /// present and the stream is poolable, otherwise the stream is dropped.
    ///
    /// Does nothing when there is no unit, which also makes repeated calls
    /// harmless.
    ///
    /// # Panics
    ///
    /// Panics if the agent mutex is poisoned.
    fn return_connection(&mut self) {
        // taking the unit out is the run-once guard.
        let unit = match self.unit.take() {
            Some(unit) => unit,
            None => return,
        };

        // get rid of the wrapper type around the Stream first, so the
        // stream pointer can safely be brought back.
        drop(self.reader.take());

        if self.stream.is_null() {
            return;
        }

        let state = &mut unit.agent.lock().unwrap();

        // SAFETY: relies on `self.stream` having come from `Box::into_raw`
        // and on nothing else owning it now that the reader is gone
        // (assumption — confirm against the code constructing this type).
        // The pointer is nulled right after so it is reclaimed only once.
        let stream = unsafe { *Box::from_raw(self.stream) };
        self.stream = ::std::ptr::null_mut();

        if let Some(agent) = state.as_mut() {
            if stream.is_poolable() {
                // put it back into the pool under the key of the unit's url.
                let key = PoolKey::new(&unit.url);
                agent.pool().recycle.insert(key, stream);
            }
            // otherwise the stream just deallocates when it goes out of scope.
        }
    }

    /// Reads from the wrapped reader; yields 0 once the reader is gone.
    fn do_read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        if let Some(reader) = self.reader.as_mut() {
            reader.read(buf)
        } else {
            Ok(0)
        }
    }
}
impl<R: Read + Sized> Read for PoolReturnRead<R> {
    /// Reads from the wrapped reader and hands the connection back once
    /// that reader is exhausted.
    ///
    /// Returns the number of bytes read, or the error of the underlying
    /// read. A read into an empty `buf` never counts as exhaustion.
    fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
        let amount = self.do_read(buf)?;
        // only if the underlying reader is exhausted can we send a new
        // request to the same socket. hence, we only return it now.
        // `Read::read` also yields 0 when `buf` is empty, which says
        // nothing about exhaustion, so that case must not return the
        // connection while body bytes may still be pending.
        if amount == 0 && !buf.is_empty() {
            self.return_connection();
        }
        Ok(amount)
    }
}
// Hands the stream back when the wrapper goes away. `return_connection`
// does nothing if the connection was already returned by a read that
// reached 0, so this only acts for wrappers dropped before that point.
// NOTE(review): that means a wrapper dropped before its reader was read to
// the end still goes through the pool-return path — confirm this is intended.
impl<R: Read + Sized> Drop for PoolReturnRead<R> {
fn drop(&mut self) {
self.return_connection();
}
}