vertica_rs/
client.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio::sync::Mutex;
4
5use crate::conn::async_conn::AsyncConnection;
6use crate::conn::{ConnectionConfig, ConnectionState};
7use crate::error::Result;
8use crate::query::{QueryResult, Statement, Transaction};
9use crate::types::Value;
10
11/// A client for interacting with Vertica database
12#[derive(Debug, Clone)]
13pub struct Client {
14    /// 用于与 Vertica 数据库进行异步连接的连接对象,使用 Arc 和 Mutex 实现多线程安全
15    pub connection: Arc<Mutex<AsyncConnection>>,
16}
17
18impl Client {
19    /// Create a new client from connection configuration
20    pub async fn new(config: ConnectionConfig) -> Result<Self> {
21        let connection = AsyncConnection::new(config).await?;
22        Ok(Self {
23            connection: Arc::new(Mutex::new(connection)),
24        })
25    }
26
27    /// Create a new client from URL
28    pub async fn from_url(url: &str) -> Result<Self> {
29        let config = ConnectionConfig::from_url(url)?;
30        Self::new(config).await
31    }
32
33    /// Query the database
34    pub async fn query(&self, sql: &str) -> Result<QueryResult> {
35        let mut conn = self.connection.lock().await;
36        let result = conn.simple_query(sql).await?;
37        Ok(result)
38    }
39
40    /// Execute a SQL statement (INSERT, UPDATE, DELETE, etc.)
41    pub async fn execute(&self, sql: &str) -> Result<u64> {
42        let mut conn = self.connection.lock().await;
43        let result = conn.simple_query(sql).await?;
44        Ok(result.affected_rows())
45    }
46
47    /// Prepare a SQL statement
48    pub async fn prepare(&self, query: &str) -> Result<Statement> {
49        let stmt_id = (0..16)
50            .map(|_| fastrand::alphanumeric())
51            .collect::<String>();
52        
53        let mut conn = self.connection.lock().await;
54        conn.prepare(&stmt_id, query).await?;
55        
56        Ok(Statement::new(
57            self.clone(),
58            stmt_id,
59            query.to_string(),
60        ))
61}
62
63    /// Execute a query with parameters
64    pub async fn execute_with_params(&self, query: &str, params: &[Value]) -> Result<u64> {
65        let stmt = self.prepare(query).await?;
66        stmt.execute(params).await
67    }
68
69    /// Execute a prepared statement with parameters
70    pub async fn execute_prepared(&self, stmt: &Statement, params: &[Value]) -> Result<QueryResult> {
71        let mut conn = self.connection.lock().await;
72        conn.execute_prepared(&stmt.sql(), params).await
73    }
74
75    /// Begin a transaction
76    pub async fn transaction(&self) -> Result<Transaction> {
77        let mut conn = self.connection.lock().await;
78        conn.simple_query("BEGIN").await?;
79        Ok(Transaction::new(self.clone()))
80    }
81
82    /// Get connection state
83    pub async fn connection_state(&self) -> ConnectionState {
84        let conn = self.connection.lock().await;
85        conn.state()
86    }
87
88    /// Check if connection is ready
89    pub async fn is_ready(&self) -> bool {
90        let conn = self.connection.lock().await;
91        conn.is_ready()
92    }
93
94    /// Get server parameters
95    pub async fn server_parameters(&self) -> HashMap<String, String> {
96        let conn = self.connection.lock().await;
97        conn.parameters().clone()
98    }
99
100    /// Ping the server to check connectivity
101    pub async fn ping(&self) -> Result<()> {
102        self.query("SELECT 1").await?;
103        Ok(())
104    }
105
106    /// Execute a batch of queries
107    pub async fn execute_batch(&self, queries: &[&str]) -> Result<Vec<QueryResult>> {
108        let mut results = Vec::new();
109        for query in queries {
110            results.push(self.query(query).await?);
111        }
112        Ok(results)
113    }
114
115    /// Close the connection
116    pub async fn close(&self) -> Result<()> {
117        let mut conn = self.connection.lock().await;
118        conn.close().await
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    /// Connection URL shared by every test in this module.
    const TEST_URL: &str = "vertica://user:pass@localhost:5433/db";

    /// A client can be built from an already-parsed configuration.
    #[tokio::test]
    async fn test_client_creation() {
        let parsed = ConnectionConfig::from_url(TEST_URL).unwrap();
        assert!(Client::new(parsed).await.is_ok());
    }

    /// A client can be built straight from a URL.
    #[tokio::test]
    async fn test_client_from_url() {
        assert!(Client::from_url(TEST_URL).await.is_ok());
    }

    /// A freshly created client does not report itself as ready.
    #[tokio::test]
    async fn test_client_operations() {
        let created = Client::from_url(TEST_URL).await;
        assert!(created.is_ok());

        let ready = created.unwrap().is_ready().await;
        assert!(!ready);
    }
}