voltdb_client_rust/
async_pool.rs1#![cfg(feature = "tokio")]
2use std::fmt;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::SystemTime;
6
7use crate::async_node::{AsyncNode, async_block_for_result};
8use crate::node::{NodeOpt, Opts};
9use crate::{Value, VoltError, VoltTable};
10
11struct AsyncInnerPool {
12 opts: Opts,
13 pool: Vec<Arc<AsyncNode>>,
14}
15
16impl AsyncInnerPool {
17 pub fn node_sizes(&self) -> usize {
18 self.opts.0.ip_ports.len()
19 }
20
21 fn to_node_opt(&self, i: usize) -> NodeOpt {
22 NodeOpt {
23 ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
24 pass: self.opts.0.pass.clone(),
25 user: self.opts.0.user.clone(),
26 }
27 }
28
29 fn get_node(&self, idx: usize) -> Arc<AsyncNode> {
30 Arc::clone(self.pool.get(idx).unwrap())
31 }
32
33 async fn new(size: usize, opts: Opts) -> Result<AsyncInnerPool, VoltError> {
34 let mut pool = AsyncInnerPool {
35 opts,
36 pool: Vec::with_capacity(size),
37 };
38 let total = pool.node_sizes();
39 for i in 0..size {
40 let z = i % total;
41 pool.new_conn(z).await?;
42 }
43 Ok(pool)
44 }
45
46 async fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
47 let node = AsyncNode::new(self.to_node_opt(idx)).await?;
48 self.pool.push(Arc::new(node));
49 Ok(())
50 }
51}
52
53pub struct AsyncPool {
55 size: usize,
56 total: Arc<AtomicUsize>,
57 inner_pool: AsyncInnerPool,
58}
59
60impl fmt::Debug for AsyncPool {
61 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62 write!(
63 f,
64 "AsyncPool total: {}, size: {}",
65 self.total.load(Ordering::Relaxed),
66 self.size
67 )
68 }
69}
70
71impl AsyncPool {
72 pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
74 AsyncPool::new_manual(10, opts).await
75 }
76
77 pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
79 let pool = AsyncInnerPool::new(size, opts.into()).await?;
80 Ok(AsyncPool {
81 inner_pool: pool,
82 size,
83 total: Arc::new(AtomicUsize::from(0)),
84 })
85 }
86
87 pub fn get_conn(&self) -> AsyncPooledConn {
89 let total = self.total.fetch_add(1, Ordering::Relaxed);
90 let idx = total % self.size;
91 AsyncPooledConn {
92 created: SystemTime::now(),
93 conn: self.inner_pool.get_node(idx),
94 }
95 }
96}
97
98pub struct AsyncPooledConn {
100 #[allow(dead_code)]
101 created: SystemTime,
102 conn: Arc<AsyncNode>,
103}
104
105impl fmt::Debug for AsyncPooledConn {
106 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107 write!(f, "AsyncPooledConn created: {:?}", self.created)
108 }
109}
110
111impl AsyncPooledConn {
112 pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
114 let mut rx = self.conn.query(sql).await?;
115 async_block_for_result(&mut rx).await
116 }
117
118 pub async fn list_procedures(&self) -> Result<VoltTable, VoltError> {
120 let mut rx = self.conn.list_procedures().await?;
121 async_block_for_result(&mut rx).await
122 }
123
124 pub async fn call_sp(
126 &self,
127 query: &str,
128 param: Vec<&dyn Value>,
129 ) -> Result<VoltTable, VoltError> {
130 let mut rx = self.conn.call_sp(query, param).await?;
131 async_block_for_result(&mut rx).await
132 }
133
134 pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
136 let mut rx = self.conn.upload_jar(bs).await?;
137 async_block_for_result(&mut rx).await
138 }
139}