voltdb_client_rust/
async_pool.rs

1#![cfg(feature = "tokio")]
2use std::fmt;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::time::SystemTime;
6
7use crate::async_node::{AsyncNode, async_block_for_result};
8use crate::node::{NodeOpt, Opts};
9use crate::{Value, VoltError, VoltTable};
10
11struct AsyncInnerPool {
12    opts: Opts,
13    pool: Vec<Arc<AsyncNode>>,
14}
15
16impl AsyncInnerPool {
17    pub fn node_sizes(&self) -> usize {
18        self.opts.0.ip_ports.len()
19    }
20
21    fn to_node_opt(&self, i: usize) -> NodeOpt {
22        NodeOpt {
23            ip_port: self.opts.0.ip_ports.get(i).cloned().unwrap(),
24            pass: self.opts.0.pass.clone(),
25            user: self.opts.0.user.clone(),
26        }
27    }
28
29    fn get_node(&self, idx: usize) -> Arc<AsyncNode> {
30        Arc::clone(self.pool.get(idx).unwrap())
31    }
32
33    async fn new(size: usize, opts: Opts) -> Result<AsyncInnerPool, VoltError> {
34        let mut pool = AsyncInnerPool {
35            opts,
36            pool: Vec::with_capacity(size),
37        };
38        let total = pool.node_sizes();
39        for i in 0..size {
40            let z = i % total;
41            pool.new_conn(z).await?;
42        }
43        Ok(pool)
44    }
45
46    async fn new_conn(&mut self, idx: usize) -> Result<(), VoltError> {
47        let node = AsyncNode::new(self.to_node_opt(idx)).await?;
48        self.pool.push(Arc::new(node));
49        Ok(())
50    }
51}
52
53/// Async connection pool for VoltDB
54pub struct AsyncPool {
55    size: usize,
56    total: Arc<AtomicUsize>,
57    inner_pool: AsyncInnerPool,
58}
59
60impl fmt::Debug for AsyncPool {
61    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
62        write!(
63            f,
64            "AsyncPool total: {}, size: {}",
65            self.total.load(Ordering::Relaxed),
66            self.size
67        )
68    }
69}
70
71impl AsyncPool {
72    /// Create a new async connection pool with default size (10 connections)
73    pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
74        AsyncPool::new_manual(10, opts).await
75    }
76
77    /// Create a new async connection pool with specified size
78    pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
79        let pool = AsyncInnerPool::new(size, opts.into()).await?;
80        Ok(AsyncPool {
81            inner_pool: pool,
82            size,
83            total: Arc::new(AtomicUsize::from(0)),
84        })
85    }
86
87    /// Get a connection from the pool (round-robin)
88    pub fn get_conn(&self) -> AsyncPooledConn {
89        let total = self.total.fetch_add(1, Ordering::Relaxed);
90        let idx = total % self.size;
91        AsyncPooledConn {
92            created: SystemTime::now(),
93            conn: self.inner_pool.get_node(idx),
94        }
95    }
96}
97
98/// A connection borrowed from the async pool
99pub struct AsyncPooledConn {
100    #[allow(dead_code)]
101    created: SystemTime,
102    conn: Arc<AsyncNode>,
103}
104
105impl fmt::Debug for AsyncPooledConn {
106    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
107        write!(f, "AsyncPooledConn created: {:?}", self.created)
108    }
109}
110
111impl AsyncPooledConn {
112    /// Execute an ad-hoc SQL query
113    pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
114        let mut rx = self.conn.query(sql).await?;
115        async_block_for_result(&mut rx).await
116    }
117
118    /// List all stored procedures
119    pub async fn list_procedures(&self) -> Result<VoltTable, VoltError> {
120        let mut rx = self.conn.list_procedures().await?;
121        async_block_for_result(&mut rx).await
122    }
123
124    /// Call a stored procedure with parameters
125    pub async fn call_sp(
126        &self,
127        query: &str,
128        param: Vec<&dyn Value>,
129    ) -> Result<VoltTable, VoltError> {
130        let mut rx = self.conn.call_sp(query, param).await?;
131        async_block_for_result(&mut rx).await
132    }
133
134    /// Upload a JAR file to VoltDB
135    pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
136        let mut rx = self.conn.upload_jar(bs).await?;
137        async_block_for_result(&mut rx).await
138    }
139}