voltdb_client_rust/
pool.rs1use std::{
2 fmt,
3 sync::{
4 Arc,
5 atomic::{AtomicUsize, Ordering},
6 },
7};
8use std::time::SystemTime;
9
10use crate::{block_for_result, Node, node, NodeOpt, Opts, Value, VoltError, VoltTable};
11
12#[derive(Debug)]
13struct InnerPool {
14 opts: Opts,
15 pool: Vec<Node>,
16}
17
18
19impl InnerPool {
20 pub fn node_sizes(&self) -> usize {
21 return self.opts.0.ip_ports.len();
22 }
23
24 fn to_node_opt(&self, i: usize) -> NodeOpt {
25 return NodeOpt {
26 ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
27 pass: self.opts.0.pass.clone(),
28 user: self.opts.0.user.clone(),
29 };
30 }
31 fn get_node(&mut self, idx: usize) -> &mut Node {
32 return self.pool.get_mut(idx).unwrap();
33 }
34 fn new(size: usize, opts: Opts) -> Result<InnerPool, VoltError> {
35 let mut pool = InnerPool {
36 opts,
37 pool: Vec::with_capacity(size),
38 };
39 let total = pool.node_sizes();
40 for i in 0..size {
41 let z = i % total;
42 pool.new_conn(z)?;
43 }
44 Ok(pool)
45 }
46 fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
47 match node::Node::new(self.to_node_opt(idx)) {
48 Ok(conn) => {
49 self.pool.push(conn);
50 Ok(())
51 }
52 Err(err) => Err(err),
53 }
54 }
55}
56
57
58pub struct Pool {
59 size: usize,
60 total: Arc<AtomicUsize>,
61 inner_pool: InnerPool,
62}
63
64impl fmt::Debug for Pool {
65 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
66 write!(
67 f,
68 "Pool total: {}, size: {}",
69 self.total.load(Ordering::Relaxed),
70 self.size
71 )
72 }
73}
74
75impl Pool {
76 fn _get_conn(&mut self) -> Result<PooledConn, VoltError> {
77 let total = self.total.fetch_add(1, Ordering::Relaxed);
78 let idx = total % self.size;
79 Ok(PooledConn {
80 created: SystemTime::now(),
81 conn: self.inner_pool.get_node(idx),
82 })
83 }
84
85 pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
86 Pool::new_manual(10, opts)
87 }
88
89 pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
90 let pool = InnerPool::new(size, opts.into())?;
91 Ok(Pool {
92 inner_pool: pool,
93 size,
94 total: Arc::new(AtomicUsize::from(0 as usize)),
95
96 })
97 }
98
99 pub fn get_conn(&mut self) -> Result<PooledConn, VoltError> {
100 self._get_conn()
101 }
102}
103
104#[derive(Debug)]
105pub struct PooledConn<'a> {
106 created: SystemTime,
107 conn: &'a mut Node,
108}
109
110impl<'a> Drop for PooledConn<'a> {
111 fn drop(&mut self) {
112}
116}
117
118impl<'a> PooledConn<'a> {
119 pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
120 return block_for_result(&self.conn.query(sql)?);
121 }
122 pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
123 return block_for_result(&self.conn.list_procedures()?);
124 }
125 pub fn call_sp(&mut self, query: &str, param: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
126 return block_for_result(&self.conn.call_sp(query, param)?);
127 }
128 pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
129 return block_for_result(&self.conn.upload_jar(bs)?);
130 }
131}