net_utils/net/
poolmgr.rs

1//! Connection Pool.
2
3use std::collections::VecDeque;
4use std::io::{Result, Error, ErrorKind};
5use std::sync::Mutex;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::default::Default;
8
9
10use net::conn;
11use net::config;
12
13
14/// ConnectionPool which provide pooling capability for Connection objects
15/// It has support for max number of connections with temporary allowable connections
16pub struct ConnectionPool {
17    idle_conns: Mutex<VecDeque<conn::Connection>>,
18    min_conns: usize,
19    max_conns: usize,
20    tmp_conn_allowed: bool,
21    config: config::Config,
22    conns_inuse: AtomicUsize,
23}
24
25/// Default implementation for  ConnectionPool
26impl Default for ConnectionPool {
27    fn default() -> ConnectionPool {
28
29        ConnectionPool {
30            idle_conns: Mutex::new(VecDeque::new()),
31            // idle_conns: VecDeque::new(),
32            min_conns: 0,
33            max_conns: 10,
34            tmp_conn_allowed: true,
35            config: Default::default(),
36            conns_inuse: AtomicUsize::new(0),
37        }
38    }
39}
40
41/// Connection pool implementation
42impl ConnectionPool {
43    /// New instance
44    pub fn new(
45        pool_min_size: usize,
46        pool_max_size: usize,
47        tmp_allowed: bool,
48        conn_config: &config::Config,
49    ) -> ConnectionPool {
50        ConnectionPool {
51            idle_conns: Mutex::new(VecDeque::new()),
52            min_conns: pool_min_size,
53            max_conns: pool_max_size,
54            tmp_conn_allowed: tmp_allowed,
55            config: conn_config.clone(),
56            conns_inuse: AtomicUsize::new(0),
57        }
58    }
59    #[cfg(test)]
60    pub fn idle_conns_count(&self) -> usize {
61        self.idle_conns.lock().unwrap().len()
62
63    }
64    /// Initial the connection pool
65    pub fn init(&self) -> bool {
66        self.idle_conns.lock().unwrap().reserve(self.max_conns);
67        for i in 0..self.min_conns {
68            info!("*****Init:Creating connection {}", i);
69            let conn = conn::Connection::connect(&self.config);
70
71            let host: &str = &self.config.server.clone();
72            let port = &self.config.port;
73
74            match conn {
75                Ok(c) => {
76                    let id = c.id().clone();
77                    self.idle_conns.lock().unwrap().push_back(c);
78                    info!(
79                        "Connection id:{}, Connecting to server {}:{}",
80                        id,
81                        host,
82                        port
83                    );
84                }
85                Err(e) => {
86
87                    error!(
88                        "Failed to create a connection to {}:{}. Error: {}",
89                        host,
90                        port,
91                        e
92                    );
93                    return false;
94                }
95            }
96        }
97        return true;
98    }
99
100    /// Release all :  Remove all connections  from th pool
101    pub fn release_all(&self) {
102        info!("release_all called");
103        info!("It should trigger drop connection");
104        self.idle_conns.lock().unwrap().clear();
105        self.conns_inuse.store(0, Ordering::Relaxed);
106        let total_count = self.idle_conns.lock().unwrap().len() +
107            self.conns_inuse.load(Ordering::Relaxed);
108        info!("release_all called: Total_count: {}", total_count);
109    }
110
111
112
113    ///Releae connection
114    #[allow(dead_code)]
115    pub fn release(&self, conn: conn::Connection) {
116        let a = self.idle_conns.lock();
117        let conn_inuse = self.conns_inuse.load(Ordering::Relaxed);
118        let id = conn.id().clone();
119        let is_valid = conn.is_valid();
120
121        let idle_count = a.unwrap().len();
122        let total = idle_count + conn_inuse;
123
124        info!(
125            "release(): conn id:{}, min_conn:{}, idle connection: {}, connection in use:{},  total: {}",
126            id,
127            self.min_conns,
128            idle_count,
129            conn_inuse,
130            total
131        );
132
133        if total < self.min_conns && is_valid {
134            info!("Pushing back to ideal_conns");
135            self.idle_conns.lock().unwrap().push_back(conn);
136            self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
137            return;
138        }
139        if !is_valid {
140            self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
141            info!("Connection not valid. It should trigger drop connection");
142        } else {
143            //drop(conn);
144            self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
145            info!(
146                "conn id:{}:It should trigger drop connection from inuse",
147                id
148            );
149        }
150        info!(
151            "release() end: Total_count: {}",
152            self.idle_conns.lock().unwrap().len() + self.conns_inuse.load(Ordering::Relaxed)
153        );
154
155        let _ = a;
156    }
157
158    /// Drop connection.  Use only if disconect.
159    #[allow(unused_variables)]
160    pub fn drop(&self, conn: conn::Connection) {
161        self.conns_inuse.fetch_sub(1, Ordering::Relaxed);
162        warn!(
163            "drop() end: Total_count: {}",
164            self.idle_conns.lock().unwrap().len() + self.conns_inuse.load(Ordering::Relaxed)
165        );
166
167    }
168
169
170    /// Aquire Connection
171    pub fn acquire(&self) -> Result<conn::Connection> {
172
173        let mut conns = self.idle_conns.lock().unwrap();
174        {
175            if !conns.is_empty() {
176                let result = conns.pop_front();
177                if result.is_some() {
178                    let conn = result.unwrap();
179                    // self.inuse_conns.push_back(conn);
180                    self.conns_inuse.fetch_add(1, Ordering::Relaxed);
181                    return Ok(conn);
182                }
183            }
184            info!("Allocating new connection");
185            let total_count = conns.len() + self.conns_inuse.load(Ordering::Relaxed);
186            if total_count >= self.max_conns && self.tmp_conn_allowed == false {
187                return Err(Error::new(
188                    ErrorKind::Other,
189                    // desc: "No connection available",
190                    "Max pool size has reached and temporary connections are \
191                                       not allowed."
192                        .to_string(),
193                ));
194
195            }
196        }
197        info!("*****Init:Creating connection..");
198        let conn = conn::Connection::connect(&self.config);
199        match conn {
200            Ok(c) => {
201                info!("New connection id:{}", c.id().clone());
202                self.conns_inuse.fetch_add(1, Ordering::Relaxed);
203                return Ok(c);
204            }
205            Err(e) => {
206                error!("Failed to create a connection : {}", e);
207                return Err(e);
208            }
209        }
210
211    }
212}
213
214#[cfg(test)]
215pub mod tests {
216    use std::io::prelude::*;
217    use std::net::{TcpListener, TcpStream};
218    // use std::default::Default;
219    use std::sync::Arc;
220    use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
221    use std::thread;
222    use net::config;
223    use std::str;
224    // use std::io::{Read, Write};
225    // use std::old_io;
226    // use std::test;
227    extern crate env_logger;
228    use std::thread::sleep;
229    use std::time::Duration;
230    //use std::path::Path;
231
232
233
234    #[cfg(test)]
235    #[allow(unused_variables)]
236    pub fn listen_ip4_localhost(port: u16, rx: Receiver<isize>) {
237        let uri = format!("127.0.0.1:{}", port);
238        let acceptor = TcpListener::bind(&*uri).unwrap();
239
240        // acceptor.set_timeout(Some(1000));
241        for stream in acceptor.incoming() {
242
243            match stream {
244                Err(e) => debug!("Accept err {}", e),
245                Ok(stream) => {
246                    info!("Got new connection on port {}", port);
247                    thread::spawn(move || {
248                        let result = handle_client(stream);
249                        // info!("{}", handle_client(stream).unwrap());
250                    });
251                }
252            }
253            match rx.try_recv() {
254                Ok(_) => {
255                    info!("******Terminating thread with port {}.", port);
256                    break;
257                }
258                Err(TryRecvError::Disconnected) => {
259                    info!("******Terminating thread with port {}.", port);
260                    break;
261                }
262                Err(TryRecvError::Empty) => {}
263            }
264        }
265        drop(acceptor);
266    }
267    #[cfg(test)]
268    #[allow(unused_variables)]
269    fn handle_client(mut stream: TcpStream) -> () {
270
271        let mut buf = [0];
272        loop {
273            let got = stream.read(&mut buf).unwrap();
274            if got == 0 {
275                warn!("handle_client: Received: 0");
276                let result = stream.write("Fail to read\r\n".as_bytes());
277                // Ok(result)
278                // stream.flush().unwrap();
279                // return result;
280                break;
281            } else {
282                debug!(
283                    "handle_client: Received: {}. Sending it back",
284                    str::from_utf8(&buf).unwrap()
285                );
286                let result = stream.write(&buf[0..got]);
287                // return result;
288                //    Ok(result)
289                // break;
290            }
291        }
292        // Ok(())
293    }
294    #[cfg(test)]
295    fn next_test_port() -> u16 {
296        use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
297        static NEXT_OFFSET: AtomicUsize = ATOMIC_USIZE_INIT;
298        const BASE_PORT: u16 = 9600;
299        BASE_PORT + NEXT_OFFSET.fetch_add(1, Ordering::Relaxed) as u16
300    }
301
302
303    #[test]
304    fn test_new() {
305        info!("test_new started---------");
306
307        let cfg: config::Config = Default::default();
308        let pool = super::ConnectionPool::new(0, 5, false, &cfg);
309        assert_eq!(pool.idle_conns_count(), 0);
310        sleep(Duration::from_millis(1000));
311        pool.release_all();
312        assert_eq!(pool.idle_conns_count(), 0);
313        info!("test_new ended---------");
314    }
315
316
317
318    ///test google
319    #[test]
320    fn test_google() {
321
322        info!("test_lib started---------");
323        let mut cfg: config::Config = Default::default();
324        cfg.port = 80; //Some(old_io::test::next_test_port());
325        cfg.server = "google.com".to_string(); //"127.0.0.1".to_string();
326
327        let pool = super::ConnectionPool::new(2, 20, true, &cfg);
328        assert_eq!(pool.init(), true);
329        assert_eq!(pool.idle_conns_count(), 2);
330        let mut conn = pool.acquire().unwrap();
331        assert_eq!(conn.is_valid(), true);
332        assert_eq!(pool.idle_conns_count(), 1);
333        conn.writer.write("GET google.com\r\n".as_bytes()).unwrap();
334        conn.writer.flush().unwrap();
335        let mut buffer = String::new();
336        let r = conn.reader.read_line(&mut buffer);
337        if r.unwrap() > 0 {
338            println!("Received {}", buffer);
339        }
340        pool.release(conn);
341        assert_eq!(pool.idle_conns_count(), 1);
342        info!("test_lib ended---------");
343
344    }
345
346    #[test]
347    fn test_init() {
348
349        info!("test_init started---------");
350
351        let mut cfg: config::Config = Default::default();
352        cfg.port = next_test_port();
353        cfg.server = "127.0.0.1".to_string();
354        let listen_port = cfg.port;
355
356
357
358        {
359            info!("test_init starting channel---------");
360            let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
361            info!("test_init spawning thread---------");
362            thread::spawn(move || {
363                info!(
364                    "test_init calling listen_Ip4_localhost with port {}",
365                    listen_port
366                );
367                listen_ip4_localhost(listen_port, rx);
368            });
369            sleep(Duration::from_millis(500));
370            info!("test_init starting connection pool");
371            let pool = super::ConnectionPool::new(1, 5, false, &cfg);
372            assert_eq!(pool.init(), true);
373            assert_eq!(pool.idle_conns_count(), 1);
374            let mut c1 = pool.acquire().unwrap();
375            info!("test_init acquire connection");
376            assert_eq!(c1.is_valid(), true);
377            info!("test_init send data");
378            c1.writer.write("GET google.com\r\n".as_bytes()).unwrap();
379            c1.writer.flush().unwrap();
380            info!("reading line");
381            let mut buffer = String::new();
382            let r = c1.reader.read_line(&mut buffer);
383            if r.unwrap() > 0 {
384                println!("Received {}", buffer);
385            }
386            info!("sending 0 to channel");
387            tx.send(0);
388            info!("releasing all");
389            pool.release_all();
390            assert_eq!(pool.idle_conns_count(), 0);
391
392        }
393
394        info!("test_init ended---------");
395
396    }
397
398    #[test]
399    fn test_example() {
400
401        info!("test_example started---------");
402        //   env_logger::init().unwrap();
403        let mut cfg: config::Config = Default::default();
404        cfg.port = next_test_port();
405        cfg.server = "127.0.0.1".to_string();
406        let listen_port = cfg.port;
407        let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
408        thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
409
410        let pool = super::ConnectionPool::new(2, 5, true, &cfg);
411        let pool_shared = Arc::new(pool);
412        let mut ts = Vec::new();
413
414        for _ in 0u32..6 {
415            let pool = pool_shared.clone();
416            let t = thread::spawn(move || {
417                warn!("test_example error---------");
418                let mut conn = pool.acquire().unwrap();
419                warn!("test_example error---------");
420                conn.writer.write("GET google.com\r\n".as_bytes());
421                conn.writer.flush();
422                let mut buffer = String::new();
423                let r = conn.reader.read_line(&mut buffer);
424                match r {
425                    Ok(v) => {
426                        if v > 0 {
427                            println!("Received {}", buffer);
428                        }
429                    }
430                    Err(e) => println!("error : {:?}", e),
431
432                }
433                pool.release(conn);
434            });
435            ts.push(t);
436        }
437        let l = ts.len();
438        for _ in 0..l {
439            let t = ts.pop();
440            t.unwrap().join();
441        }
442        sleep(Duration::from_millis(500));
443        assert_eq!(pool_shared.idle_conns_count(), 1);
444        pool_shared.release_all();
445        assert_eq!(pool_shared.idle_conns_count(), 0);
446        tx.send(0);
447        info!("test_example ended---------");
448    }
449
450    #[test]
451    fn test_acquire_release_1() {
452        let _ = env_logger::init();
453        // log::set_logger(Box::new( custlogger::CustLogger { handle: stderr() }) );
454        info!("test_acquire_release started---------");
455        let mut cfg: config::Config = Default::default();
456
457        cfg.port = next_test_port();
458        cfg.server = "127.0.0.1".to_string();
459        let listen_port = cfg.port;
460        let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
461        thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
462        sleep(Duration::from_millis(1000));
463        {
464            let pool = super::ConnectionPool::new(2, 2, true, &cfg);
465            assert_eq!(pool.init(), true);
466            assert_eq!(pool.idle_conns_count(), 2);
467
468            let c1 = pool.acquire().unwrap();
469            info!("c1: {}", pool.idle_conns_count());
470            assert_eq!(pool.idle_conns_count(), 1);
471            let c2 = pool.acquire().unwrap();
472            info!("c2: {}", pool.idle_conns_count());
473            assert_eq!(pool.idle_conns_count(), 0);
474            let c3 = pool.acquire().unwrap();
475            pool.release(c1);
476            assert_eq!(pool.idle_conns_count(), 0);
477            pool.release(c2);
478            assert_eq!(pool.idle_conns_count(), 0);
479            pool.release(c3);
480            assert_eq!(pool.idle_conns_count(), 1);
481
482            pool.release_all();
483            assert_eq!(pool.idle_conns_count(), 0);
484            tx.send(0);
485        }
486
487        info!("test_acquire_release ended---------");
488    }
489
490    #[test]
491    fn test_acquire_release_multithread() {
492        //sleep(Duration::from_millis(2000));
493        info!("test_acquire_release_multithread started---------");
494        let mut cfg: config::Config = Default::default();
495        cfg.port = next_test_port() + 10;
496        cfg.server = "127.0.0.1".to_string();
497        let listen_port = cfg.port;
498        let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
499        {
500            thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
501
502            sleep(Duration::from_millis(1000));
503
504            let pool = super::ConnectionPool::new(2, 10, true, &cfg);
505            assert_eq!(pool.init(), true);
506            let pool_shared = Arc::new(pool);
507            for _ in 0u32..10 {
508                let p1 = pool_shared.clone();
509                thread::spawn(move || {
510                    let c1 = p1.acquire().unwrap();
511                    let c2 = p1.acquire().unwrap();
512                    let c3 = p1.acquire().unwrap();
513                    p1.release(c1);
514                    p1.release(c2);
515                    p1.release(c3);
516                });
517            }
518            sleep(Duration::from_millis(2000));
519            assert_eq!(pool_shared.idle_conns_count(), 1);
520            pool_shared.release_all();
521            assert_eq!(pool_shared.idle_conns_count(), 0);
522            tx.send(0);
523        }
524
525        // assert_eq!(pool_shared.idle_conns_count(), 10);
526        info!("test_acquire_release_multithread ended---------");
527    }
528
529    #[test]
530    fn test_acquire_release_multithread_2() {
531
532        info!("test_acquire_release_multithread_2 started---------");
533        let mut cfg: config::Config = Default::default();
534        cfg.port = next_test_port();
535        cfg.server = "127.0.0.1".to_string();
536        let listen_port = cfg.port;
537        let (tx, rx): (Sender<isize>, Receiver<isize>) = channel();
538        thread::spawn(move || { listen_ip4_localhost(listen_port, rx); });
539        sleep(Duration::from_millis(1000));
540        let pool = super::ConnectionPool::new(2, 3, true, &cfg);
541        assert_eq!(pool.init(), true);
542        let pool_shared = Arc::new(pool);
543        for _ in 0u32..2 {
544            let p1 = pool_shared.clone();
545            thread::spawn(move || {
546                info!("test_acquire_release_multithread_2 acquired connection in thread");
547                let c1 = p1.acquire().unwrap();
548                info!("test_acquire_release_multithread_2 release connection in thread");
549                p1.release(c1);
550
551            });
552        }
553        sleep(Duration::from_millis(500));
554        info!(
555            "test_acquire_release_multithread_2 out of for loop :{}",
556            pool_shared.idle_conns_count()
557        );
558        assert_eq!(pool_shared.idle_conns_count(), 1);
559        pool_shared.release_all();
560        assert_eq!(pool_shared.idle_conns_count(), 0);
561        tx.send(0);
562
563        info!("test_acquire_release_multithread_2 ended---------");
564    }
565
566    #[test]
567    #[cfg(feature = "ssl")]
568    fn test_init_ssl() {
569        info!("test_init_ssl started---------");
570        let mut cfg: config::Config = Default::default();
571        cfg.port = 443;
572        cfg.server = "google.com".to_string();
573        cfg.use_ssl = Some(true);
574        cfg.verify = Some(false);
575        cfg.read_timeout = Some(5_000);
576
577        // cfg.server = "google.com".to_string();
578        let pool = super::ConnectionPool::new(2, 5, false, &cfg);
579        assert_eq!(pool.init(), true);
580        let mut conn = pool.acquire().unwrap();
581        assert_eq!(conn.is_valid(), true);
582        warn!("test_example error---------");
583        println!("test_init_ssl: sending GET request");
584        conn.writer.write("GET /index.html\r\n".as_bytes());
585        println!("test_init_ssl: flushing buffer");
586        conn.writer.flush();
587        let mut buffer = String::new();
588        //let mut buffer_byte = [0; 10];
589
590        println!("test_init_ssl: reading a line");
591        let r = conn.reader.read_to_string(&mut buffer);
592        println!("test_init_ssl: printing read buffer");
593        match r {
594            Ok(v) => {
595                if v > 0 {
596                    println!("test_init_ssl:Received {}", buffer);
597                } else {
598                    println!("test_init_ssl:Received {}", v);
599                }
600            }
601            Err(e) => println!("test_init_ssl:error : {:?}", e),
602
603        }
604        println!("test_init_ssl: releasing connection");
605        pool.release(conn);
606
607        pool.release_all();
608        assert_eq!(pool.idle_conns_count(), 0);
609        info!("test_init_ssl ended---------");
610    }
611}