voltdb_client_rust/
pool.rs

1use std::{
2    fmt,
3    sync::{
4        Arc,
5        atomic::{AtomicUsize, Ordering},
6    },
7};
8use std::time::SystemTime;
9
10use crate::{block_for_result, Node, node, NodeOpt, Opts, Value, VoltError, VoltTable};
11
12#[derive(Debug)]
13struct InnerPool {
14    opts: Opts,
15    pool: Vec<Node>,
16}
17
18
19impl InnerPool {
20    pub fn node_sizes(&self) -> usize {
21        return self.opts.0.ip_ports.len();
22    }
23
24    fn to_node_opt(&self, i: usize) -> NodeOpt {
25        return NodeOpt {
26            ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
27            pass: self.opts.0.pass.clone(),
28            user: self.opts.0.user.clone(),
29        };
30    }
31    fn get_node(&mut self, idx: usize) -> &mut Node {
32        return self.pool.get_mut(idx).unwrap();
33    }
34    fn new(size: usize, opts: Opts) -> Result<InnerPool, VoltError> {
35        let mut pool = InnerPool {
36            opts,
37            pool: Vec::with_capacity(size),
38        };
39        let total = pool.node_sizes();
40        for i in 0..size {
41            let z = i % total;
42            pool.new_conn(z)?;
43        }
44        Ok(pool)
45    }
46    fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
47        match node::Node::new(self.to_node_opt(idx)) {
48            Ok(conn) => {
49                self.pool.push(conn);
50                Ok(())
51            }
52            Err(err) => Err(err),
53        }
54    }
55}
56
57
58pub struct Pool {
59    size: usize,
60    total: Arc<AtomicUsize>,
61    inner_pool: InnerPool,
62}
63
64impl fmt::Debug for Pool {
65    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66        write!(
67            f,
68            "Pool total: {}, size: {}",
69            self.total.load(Ordering::Relaxed),
70            self.size
71        )
72    }
73}
74
75impl Pool {
76    fn _get_conn(&mut self) -> Result<PooledConn, VoltError> {
77        let total = self.total.fetch_add(1, Ordering::Relaxed);
78        let idx = total % self.size;
79        Ok(PooledConn {
80            created: SystemTime::now(),
81            conn: self.inner_pool.get_node(idx),
82        })
83    }
84
85    pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
86        Pool::new_manual(10, opts)
87    }
88
89    pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
90        let pool = InnerPool::new(size, opts.into())?;
91        Ok(Pool {
92            inner_pool: pool,
93            size,
94            total: Arc::new(AtomicUsize::from(0 as usize)),
95
96        })
97    }
98
99    pub fn get_conn(&mut self) -> Result<PooledConn, VoltError> {
100        self._get_conn()
101    }
102}
103
104#[derive(Debug)]
105pub struct PooledConn<'a> {
106    created: SystemTime,
107    conn: &'a mut Node,
108}
109
110impl<'a> Drop for PooledConn<'a> {
111    fn drop(&mut self) {
112//        let since = SystemTime::now().duration_since(self.created);
113        // TODO record error ,
114        //   println!("used {:?} ", since)
115    }
116}
117
118impl<'a> PooledConn<'a> {
119    pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
120        return block_for_result(&self.conn.query(sql)?);
121    }
122    pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
123        return block_for_result(&self.conn.list_procedures()?);
124    }
125    pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
126        return block_for_result(&self.conn.call_sp(query, param)?);
127    }
128    pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
129        return block_for_result(&self.conn.upload_jar(bs)?);
130    }
131}