1use std::collections::VecDeque;
4use std::io::{Result, Error, ErrorKind};
5use std::sync::Mutex;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::default::Default;
8
9
10use net::conn;
11use net::config;
12
13
14pub struct ConnectionPool {
17 idle_conns: Mutex<VecDeque<conn::Connection>>,
18 min_conns: usize,
19 max_conns: usize,
20 tmp_conn_allowed: bool,
21 config: config::Config,
22 conns_inuse: AtomicUsize,
23}
24
25impl Default for ConnectionPool {
27 fn default() -> ConnectionPool {
28
29 ConnectionPool {
30 idle_conns: Mutex::new(VecDeque::new()),
31 min_conns: 0,
33 max_conns: 10,
34 tmp_conn_allowed: true,
35 config: Default::default(),
36 conns_inuse: AtomicUsize::new(0),
37 }
38 }
39}
40
41impl ConnectionPool {
43 pub fn new(
45 pool_min_size: usize,
46 pool_max_size: usize,
47 tmp_allowed: bool,
48 conn_config: &config::Config,
49 ) -> ConnectionPool {
50 ConnectionPool {
51 idle_conns: Mutex::new(VecDeque::new()),
52 min_conns: pool_min_size,
53 max_conns: pool_max_size,
54 tmp_conn_allowed: tmp_allowed,
55 config: conn_config.clone(),
56 conns_inuse: AtomicUsize::new(0),
57 }
58 }
59 #[cfg(test)]
60 pub fn idle_conns_count(&self) -> usize {
61 self.idle_conns.lock().unwrap().len()
62
63 }
64 pub fn init(&self) -> bool {
66 self.idle_conns.lock().unwrap().reserve(self.max_conns);
67 for i in 0..self.min_conns {
68 info!("*****Init:Creating connection {}", i);
69 let conn = conn::Connection::connect(&self.config);
70
71 let host: &str = &self.config.server.clone();
72 let port = &self.config.port;
73
74 match conn {
75 Ok(c) => {
76 let id = c.id().clone();
77 self.idle_conns.lock().unwrap().push_back(c);
78 info!(
79 "Connection id:{}, Connecting to server {}:{}",
80 id,
81 host,
82 port
83 );
84 }
85 Err(e) => {
86
87 error!(
88 "Failed to create a connection to {}:{}. Error: {}",
89 host,
90 port,
91 e
92 );
93 return false;
94 }
95 }
96 }
97 return true;
98 }
99
100 pub fn release_all(&self) {
102 info!("release_all called");
103 info!("It should trigger drop connection");
104 self.idle_conns.lock().unwrap().clear();
105 self.conns_inuse.store(0, Ordering::Relaxed);
106 let total_count = self.idle_conns.lock().unwrap().len() +
107 self.conns_inuse.load(Ordering::Relaxed);
108 info!("release_all called: Total_count: {}", total_count);
109 }
110
111
112
113 #[allow(dead_code)]
115 pub fn release(&self, conn: conn::Connection) {
116 let a = self.idle_conns.lock();
117 let conn_inuse = self.conns_inuse.load(Ordering::Relaxed);
118 let id = conn.id().clone();
119 let is_valid = conn.is_valid();
120
121 let idle_count = a.unwrap().len();
122 let total = idle_count + conn_inuse;
123
124 info!(
125 "release(): conn id:{}, min_conn:{}, idle connection: {}, connection in use:{}, total: {}",
126 id,
127 self.min_conns,
128 idle_count,
129 conn_inuse,
130 total
131 );
132
133 if total < self.min_conns && is_valid {
134 info!("Pushing back to ideal_conns");
135 self.idle_conns.lock().unwrap().push_back(conn);
136 self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
137 return;
138 }
139 if !is_valid {
140 self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
141 info!("Connection not valid. It should trigger drop connection");
142 } else {
143 self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
145 info!(
146 "conn id:{}:It should trigger drop connection from inuse",
147 id
148 );
149 }
150 info!(
151 "release() end: Total_count: {}",
152 self.idle_conns.lock().unwrap().len() + self.conns_inuse.load(Ordering::Relaxed)
153 );
154
155 let _ = a;
156 }
157
158 #[allow(unused_variables)]
160 pub fn drop(&self, conn: conn::Connection) {
161 self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
162 warn!(
163 "drop() end: Total_count: {}",
164 self.idle_conns.lock().unwrap().len() + self.conns_inuse.load(Ordering::Relaxed)
165 );
166
167 }
168
169
170 pub fn acquire(&self) -> Result<conn::Connection> {
172
173 let mut conns = self.idle_conns.lock().unwrap();
174 {
175 if !conns.is_empty() {
176 let result = conns.pop_front();
177 if result.is_some() {
178 let conn = result.unwrap();
179 self.conns_inuse.fetch_add(1, Ordering::Relaxed);
181 return Ok(conn);
182 }
183 }
184 info!("Allocating new connection");
185 let total_count = conns.len() + self.conns_inuse.load(Ordering::Relaxed);
186 if total_count >= self.max_conns && self.tmp_conn_allowed == false {
187 return Err(Error::new(
188 ErrorKind::Other,
189 "Max pool size has reached and temporary connections are \
191 not allowed."
192 .to_string(),
193 ));
194
195 }
196 }
197 info!("*****Init:Creating connection..");
198 let conn = conn::Connection::connect(&self.config);
199 match conn {
200 Ok(c) => {
201 info!("New connection id:{}", c.id().clone());
202 self.conns_inuse.fetch_add(1, Ordering::Relaxed);
203 return Ok(c);
204 }
205 Err(e) => {
206 error!("Failed to create a connection : {}", e);
207 return Err(e);
208 }
209 }
210
211 }
212}
213
214#[cfg(test)]
215pub mod tests {
216 use std::io::prelude::*;
217 use std::net::{TcpListener, TcpStream};
218 use std::sync::Arc;
220 use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
221 use std::thread;
222 use net::config;
223 use std::str;
224 extern crate env_logger;
228 use std::thread::sleep;
229 use std::time::Duration;
230 #[cfg(test)]
235 #[allow(unused_variables)]
236 pub fn listen_ip4_localhost(port: u16, rx: Receiver<isize>) {
237 let uri = format!("127.0.0.1:{}", port);
238 let acceptor = TcpListener::bind(&*uri).unwrap();
239
240 for stream in acceptor.incoming() {
242
243 match stream {
244 Err(e) => debug!("Accept err {}", e),
245 Ok(stream) => {
246 info!("Got new connection on port {}", port);
247 thread::spawn(move || {
248 let result = handle_client(stream);
249 });
251 }
252 }
253 match rx.try_recv() {
254 Ok(_) => {
255 info!("******Terminating thread with port {}.", port);
256 break;
257 }
258 Err(TryRecvError::Disconnected) => {
259 info!("******Terminating thread with port {}.", port);
260 break;
261 }
262 Err(TryRecvError::Empty) => {}
263 }
264 }
265 drop(acceptor);
266 }
267 #[cfg(test)]
268 #[allow(unused_variables)]
269 fn handle_client(mut stream: TcpStream) -> () {
270
271 let mut buf = [0];
272 loop {
273 let got = stream.read(&mut buf).unwrap();
274 if got == 0 {
275 warn!("handle_client: Received: 0");
276 let result = stream.write("Fail to read\r\n".as_bytes());
277 break;
281 } else {
282 debug!(
283 "handle_client: Received: {}. Sending it back",
284 str::from_utf8(&buf).unwrap()
285 );
286 let result = stream.write(&buf[0..got]);
287 }
291 }
292 }
294 #[cfg(test)]
295 fn next_test_port() -> u16 {
296 use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
297 static NEXT_OFFSET: AtomicUsize = ATOMIC_USIZE_INIT;
298 const BASE_PORT: u16 = 9600;
299 BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16
300 }
301
302
303 #[test]
304 fn test_new() {
305 info!("test_new started---------");
306
307 let cfg: config::Config = Default::default();
308 let pool = super::ConnectionPool::new(0, 5, false, &cfg);
309 assert_eq!(pool.idle_conns_count(), 0);
310 sleep(Duration::from_millis(1000));
311 pool.release_all();
312 assert_eq!(pool.idle_conns_count(), 0);
313 info!("test_new ended---------");
314 }
315
316
317
318 #[test]
320 fn test_google() {
321
322 info!("test_lib started---------");
323 let mut cfg: config::Config = Default::default();
324 cfg.port = 80; cfg.server = "google.com".to_string(); let pool = super::ConnectionPool::new(2, 20, true, &cfg);
328 assert_eq!(pool.init(), true);
329 assert_eq!(pool.idle_conns_count(), 2);
330 let mut conn = pool.acquire().unwrap();
331 assert_eq!(conn.is_valid(), true);
332 assert_eq!(pool.idle_conns_count(), 1);
333 conn.writer.write("GET google.com\r\n".as_bytes()).unwrap();
334 conn.writer.flush().unwrap();
335 let mut buffer = String::new();
336 let r = conn.reader.read_line(&mut buffer);
337 if r.unwrap() > 0 {
338 println!("Received {}", buffer);
339 }
340 pool.release(conn);
341 assert_eq!(pool.idle_conns_count(), 1);
342 info!("test_lib ended---------");
343
344 }
345
346 #[test]
347 fn test_init() {
348
349 info!("test_init started---------");
350
351 let mut cfg: config::Config = Default::default();
352 cfg.port = next_test_port();
353 cfg.server = "127.0.0.1".to_string();
354 let listen_port = cfg.port;
355
356
357
358 {
359 info!("test_init starting channel---------");
360 let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
361 info!("test_init spawning thread---------");
362 thread::spawn(move || {
363 info!(
364 "test_init calling listen_Ip4_localhost with port {}",
365 listen_port
366 );
367 listen_ip4_localhost(listen_port, rx);
368 });
369 sleep(Duration::from_millis(500));
370 info!("test_init starting connection pool");
371 let pool = super::ConnectionPool::new(1, 5, false, &cfg);
372 assert_eq!(pool.init(), true);
373 assert_eq!(pool.idle_conns_count(), 1);
374 let mut c1 = pool.acquire().unwrap();
375 info!("test_init acquire connection");
376 assert_eq!(c1.is_valid(), true);
377 info!("test_init send data");
378 c1.writer.write("GET google.com\r\n".as_bytes()).unwrap();
379 c1.writer.flush().unwrap();
380 info!("reading line");
381 let mut buffer = String::new();
382 let r = c1.reader.read_line(&mut buffer);
383 if r.unwrap() > 0 {
384 println!("Received {}", buffer);
385 }
386 info!("sending 0 to channel");
387 tx.send(0);
388 info!("releasing all");
389 pool.release_all();
390 assert_eq!(pool.idle_conns_count(), 0);
391
392 }
393
394 info!("test_init ended---------");
395
396 }
397
398 #[test]
399 fn test_example() {
400
401 info!("test_example started---------");
402 let mut cfg: config::Config = Default::default();
404 cfg.port = next_test_port();
405 cfg.server = "127.0.0.1".to_string();
406 let listen_port = cfg.port;
407 let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
408 thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
409
410 let pool = super::ConnectionPool::new(2, 5, true, &cfg);
411 let pool_shared = Arc::new(pool);
412 let mut ts = Vec::new();
413
414 for _ in 0u32..6 {
415 let pool = pool_shared.clone();
416 let t = thread::spawn(move || {
417 warn!("test_example error---------");
418 let mut conn = pool.acquire().unwrap();
419 warn!("test_example error---------");
420 conn.writer.write("GET google.com\r\n".as_bytes());
421 conn.writer.flush();
422 let mut buffer = String::new();
423 let r = conn.reader.read_line(&mut buffer);
424 match r {
425 Ok(v) => {
426 if v > 0 {
427 println!("Received {}", buffer);
428 }
429 }
430 Err(e) => println!("error : {:?}", e),
431
432 }
433 pool.release(conn);
434 });
435 ts.push(t);
436 }
437 let l = ts.len();
438 for _ in 0..l {
439 let t = ts.pop();
440 t.unwrap().join();
441 }
442 sleep(Duration::from_millis(500));
443 assert_eq!(pool_shared.idle_conns_count(), 1);
444 pool_shared.release_all();
445 assert_eq!(pool_shared.idle_conns_count(), 0);
446 tx.send(0);
447 info!("test_example ended---------");
448 }
449
450 #[test]
451 fn test_acquire_release_1() {
452 let _ = env_logger::init();
453 info!("test_acquire_release started---------");
455 let mut cfg: config::Config = Default::default();
456
457 cfg.port = next_test_port();
458 cfg.server = "127.0.0.1".to_string();
459 let listen_port = cfg.port;
460 let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
461 thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
462 sleep(Duration::from_millis(1000));
463 {
464 let pool = super::ConnectionPool::new(2, 2, true, &cfg);
465 assert_eq!(pool.init(), true);
466 assert_eq!(pool.idle_conns_count(), 2);
467
468 let c1 = pool.acquire().unwrap();
469 info!("c1: {}", pool.idle_conns_count());
470 assert_eq!(pool.idle_conns_count(), 1);
471 let c2 = pool.acquire().unwrap();
472 info!("c2: {}", pool.idle_conns_count());
473 assert_eq!(pool.idle_conns_count(), 0);
474 let c3 = pool.acquire().unwrap();
475 pool.release(c1);
476 assert_eq!(pool.idle_conns_count(), 0);
477 pool.release(c2);
478 assert_eq!(pool.idle_conns_count(), 0);
479 pool.release(c3);
480 assert_eq!(pool.idle_conns_count(), 1);
481
482 pool.release_all();
483 assert_eq!(pool.idle_conns_count(), 0);
484 tx.send(0);
485 }
486
487 info!("test_acquire_release ended---------");
488 }
489
490 #[test]
491 fn test_acquire_release_multithread() {
492 info!("test_acquire_release_multithread started---------");
494 let mut cfg: config::Config = Default::default();
495 cfg.port = next_test_port() + 10;
496 cfg.server = "127.0.0.1".to_string();
497 let listen_port = cfg.port;
498 let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
499 {
500 thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
501
502 sleep(Duration::from_millis(1000));
503
504 let pool = super::ConnectionPool::new(2, 10, true, &cfg);
505 assert_eq!(pool.init(), true);
506 let pool_shared = Arc::new(pool);
507 for _ in 0u32..10 {
508 let p1 = pool_shared.clone();
509 thread::spawn(move || {
510 let c1 = p1.acquire().unwrap();
511 let c2 = p1.acquire().unwrap();
512 let c3 = p1.acquire().unwrap();
513 p1.release(c1);
514 p1.release(c2);
515 p1.release(c3);
516 });
517 }
518 sleep(Duration::from_millis(2000));
519 assert_eq!(pool_shared.idle_conns_count(), 1);
520 pool_shared.release_all();
521 assert_eq!(pool_shared.idle_conns_count(), 0);
522 tx.send(0);
523 }
524
525 info!("test_acquire_release_multithread ended---------");
527 }
528
529 #[test]
530 fn test_acquire_release_multithread_2() {
531
532 info!("test_acquire_release_multithread_2 started---------");
533 let mut cfg: config::Config = Default::default();
534 cfg.port = next_test_port();
535 cfg.server = "127.0.0.1".to_string();
536 let listen_port = cfg.port;
537 let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
538 thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
539 sleep(Duration::from_millis(1000));
540 let pool = super::ConnectionPool::new(2, 3, true, &cfg);
541 assert_eq!(pool.init(), true);
542 let pool_shared = Arc::new(pool);
543 for _ in 0u32..2 {
544 let p1 = pool_shared.clone();
545 thread::spawn(move || {
546 info!("test_acquire_release_multithread_2 acquired connection in thread");
547 let c1 = p1.acquire().unwrap();
548 info!("test_acquire_release_multithread_2 release connection in thread");
549 p1.release(c1);
550
551 });
552 }
553 sleep(Duration::from_millis(500));
554 info!(
555 "test_acquire_release_multithread_2 out of for loop :{}",
556 pool_shared.idle_conns_count()
557 );
558 assert_eq!(pool_shared.idle_conns_count(), 1);
559 pool_shared.release_all();
560 assert_eq!(pool_shared.idle_conns_count(), 0);
561 tx.send(0);
562
563 info!("test_acquire_release_multithread_2 ended---------");
564 }
565
566 #[test]
567 #[cfg(feature = "ssl")]
568 fn test_init_ssl() {
569 info!("test_init_ssl started---------");
570 let mut cfg: config::Config = Default::default();
571 cfg.port = 443;
572 cfg.server = "google.com".to_string();
573 cfg.use_ssl = Some(true);
574 cfg.verify = Some(false);
575 cfg.read_timeout = Some(5_000);
576
577 let pool = super::ConnectionPool::new(2, 5, false, &cfg);
579 assert_eq!(pool.init(), true);
580 let mut conn = pool.acquire().unwrap();
581 assert_eq!(conn.is_valid(), true);
582 warn!("test_example error---------");
583 println!("test_init_ssl: sending GET request");
584 conn.writer.write("GET /index.html\r\n".as_bytes());
585 println!("test_init_ssl: flushing buffer");
586 conn.writer.flush();
587 let mut buffer = String::new();
588 println!("test_init_ssl: reading a line");
591 let r = conn.reader.read_to_string(&mut buffer);
592 println!("test_init_ssl: printing read buffer");
593 match r {
594 Ok(v) => {
595 if v > 0 {
596 println!("test_init_ssl:Received {}", buffer);
597 } else {
598 println!("test_init_ssl:Received {}", v);
599 }
600 }
601 Err(e) => println!("test_init_ssl:error : {:?}", e),
602
603 }
604 println!("test_init_ssl: releasing connection");
605 pool.release(conn);
606
607 pool.release_all();
608 assert_eq!(pool.idle_conns_count(), 0);
609 info!("test_init_ssl ended---------");
610 }
611}