voltdb_client_rust/
pool.rs1use std::time::SystemTime;
2use std::{
3 fmt,
4 sync::{
5 Arc,
6 atomic::{AtomicUsize, Ordering},
7 },
8};
9
10use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
11
12#[derive(Debug)]
13struct InnerPool {
14 opts: Opts,
15 pool: Vec<Node>,
16}
17
18impl InnerPool {
19 pub fn node_sizes(&self) -> usize {
20 self.opts.0.ip_ports.len()
21 }
22
23 fn to_node_opt(&self, i: usize) -> NodeOpt {
24 NodeOpt {
25 ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
26 pass: self.opts.0.pass.clone(),
27 user: self.opts.0.user.clone(),
28 }
29 }
30 fn get_node(&mut self, idx: usize) -> &mut Node {
31 self.pool.get_mut(idx).unwrap()
32 }
33 fn new(size: usize, opts: Opts) -> Result<InnerPool, VoltError> {
34 let mut pool = InnerPool {
35 opts,
36 pool: Vec::with_capacity(size),
37 };
38 let total = pool.node_sizes();
39 for i in 0..size {
40 let z = i % total;
41 pool.new_conn(z)?;
42 }
43 Ok(pool)
44 }
45 fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
46 match node::Node::new(self.to_node_opt(idx)) {
47 Ok(conn) => {
48 self.pool.push(conn);
49 Ok(())
50 }
51 Err(err) => Err(err),
52 }
53 }
54}
55
56pub struct Pool {
57 size: usize,
58 total: Arc<AtomicUsize>,
59 inner_pool: InnerPool,
60}
61
62impl fmt::Debug for Pool {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 write!(
65 f,
66 "Pool total: {}, size: {}",
67 self.total.load(Ordering::Relaxed),
68 self.size
69 )
70 }
71}
72
73impl Pool {
74 fn _get_conn(&mut self) -> Result<PooledConn<'_>, VoltError> {
75 let total = self.total.fetch_add(1, Ordering::Relaxed);
76 let idx = total % self.size;
77 Ok(PooledConn {
78 created: SystemTime::now(),
79 conn: self.inner_pool.get_node(idx),
80 })
81 }
82
83 pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
84 Pool::new_manual(10, opts)
85 }
86
87 pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
88 let pool = InnerPool::new(size, opts.into())?;
89 Ok(Pool {
90 inner_pool: pool,
91 size,
92 total: Arc::new(AtomicUsize::from(0_usize)),
93 })
94 }
95
96 pub fn get_conn(&mut self) -> Result<PooledConn<'_>, VoltError> {
97 self._get_conn()
98 }
99}
100
101#[derive(Debug)]
102#[allow(dead_code)]
103pub struct PooledConn<'a> {
104 created: SystemTime,
105 conn: &'a mut Node,
106}
107
108impl<'a> Drop for PooledConn<'a> {
109 fn drop(&mut self) {
110 }
114}
115
116impl<'a> PooledConn<'a> {
117 pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
118 block_for_result(&self.conn.query(sql)?)
119 }
120 pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
121 block_for_result(&self.conn.list_procedures()?)
122 }
123 pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
124 block_for_result(&self.conn.call_sp(query, param)?)
125 }
126 pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
127 block_for_result(&self.conn.upload_jar(bs)?)
128 }
129}