// voltdb_client_rust/pool.rs

1use std::time::SystemTime;
2use std::{
3    fmt,
4    sync::{
5        Arc,
6        atomic::{AtomicUsize, Ordering},
7    },
8};
9
10use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
11
/// Connection storage behind `Pool`: the options used to open connections
/// together with every node connection opened so far.
#[derive(Debug)]
struct InnerPool {
    /// Connection options; the source of the `ip_ports` list and of the
    /// `user` / `pass` values copied into each `NodeOpt`.
    opts: Opts,
    /// Open node connections, in the order they were created.
    pool: Vec<Node>,
}
17
18impl InnerPool {
19    pub fn node_sizes(&self) -> usize {
20        self.opts.0.ip_ports.len()
21    }
22
23    fn to_node_opt(&self, i: usize) -> NodeOpt {
24        NodeOpt {
25            ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
26            pass: self.opts.0.pass.clone(),
27            user: self.opts.0.user.clone(),
28        }
29    }
30    fn get_node(&mut self, idx: usize) -> &mut Node {
31        self.pool.get_mut(idx).unwrap()
32    }
33    fn new(size: usize, opts: Opts) -> Result<InnerPool, VoltError> {
34        let mut pool = InnerPool {
35            opts,
36            pool: Vec::with_capacity(size),
37        };
38        let total = pool.node_sizes();
39        for i in 0..size {
40            let z = i % total;
41            pool.new_conn(z)?;
42        }
43        Ok(pool)
44    }
45    fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
46        match node::Node::new(self.to_node_opt(idx)) {
47            Ok(conn) => {
48                self.pool.push(conn);
49                Ok(())
50            }
51            Err(err) => Err(err),
52        }
53    }
54}
55
/// A fixed-size set of node connections handed out in round-robin order.
pub struct Pool {
    /// Number of connections the pool was built with; used as the modulus
    /// when choosing which connection to hand out.
    size: usize,
    /// Running count of connections handed out so far; incremented on every
    /// request and reduced modulo `size` to pick a slot.
    total: Arc<AtomicUsize>,
    /// Owns the connection options and the open connections.
    inner_pool: InnerPool,
}
61
62impl fmt::Debug for Pool {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        write!(
65            f,
66            "Pool total: {}, size: {}",
67            self.total.load(Ordering::Relaxed),
68            self.size
69        )
70    }
71}
72
73impl Pool {
74    fn _get_conn(&mut self) -> Result<PooledConn<'_>, VoltError> {
75        let total = self.total.fetch_add(1, Ordering::Relaxed);
76        let idx = total % self.size;
77        Ok(PooledConn {
78            created: SystemTime::now(),
79            conn: self.inner_pool.get_node(idx),
80        })
81    }
82
83    pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
84        Pool::new_manual(10, opts)
85    }
86
87    pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
88        let pool = InnerPool::new(size, opts.into())?;
89        Ok(Pool {
90            inner_pool: pool,
91            size,
92            total: Arc::new(AtomicUsize::from(0_usize)),
93        })
94    }
95
96    pub fn get_conn(&mut self) -> Result<PooledConn<'_>, VoltError> {
97        self._get_conn()
98    }
99}
100
/// A node connection borrowed from a `Pool` for the lifetime `'a`.
#[derive(Debug)]
// `created` is written when the handle is built, but nothing reads it yet.
#[allow(dead_code)]
pub struct PooledConn<'a> {
    /// Wall-clock time at which this handle was created.
    created: SystemTime,
    /// The pooled node that every call on this handle is forwarded to.
    conn: &'a mut Node,
}
107
108impl<'a> Drop for PooledConn<'a> {
109    fn drop(&mut self) {
110        //        let since = SystemTime::now().duration_since(self.created);
111        // TODO record error ,
112        //   println!("used {:?} ", since)
113    }
114}
115
116impl<'a> PooledConn<'a> {
117    pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
118        block_for_result(&self.conn.query(sql)?)
119    }
120    pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
121        block_for_result(&self.conn.list_procedures()?)
122    }
123    pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
124        block_for_result(&self.conn.call_sp(query, param)?)
125    }
126    pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
127        block_for_result(&self.conn.upload_jar(bs)?)
128    }
129}