databend_driver/
client.rs1use std::collections::BTreeMap;
16use std::path::Path;
17
18use once_cell::sync::Lazy;
19use url::Url;
20
21use crate::conn::IConnection;
22#[cfg(feature = "flight-sql")]
23use crate::flight_sql::FlightSQLConnection;
24use crate::ConnectionInfo;
25use crate::Params;
26
27use databend_client::PresignedResponse;
28use databend_driver_core::error::{Error, Result};
29use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
30use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, ServerStats};
31
32use crate::rest_api::RestAPIConnection;
33
34static VERSION: Lazy<String> = Lazy::new(|| {
35 let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
36 version.to_string()
37});
38
/// Factory for [`Connection`]s, configured from a DSN string.
///
/// Cloning copies both the DSN and the client name.
#[derive(Clone)]
pub struct Client {
    /// Connection string; its URL scheme selects the backend in `get_conn`.
    dsn: String,
    /// Client name passed to the backend when a connection is created.
    name: String,
}
44
45use crate::conn::Reader;
46
47pub struct Connection {
48 inner: Box<dyn IConnection>,
49}
50
51impl Client {
52 pub fn new(dsn: String) -> Self {
53 let name = format!("databend-driver-rust/{}", VERSION.as_str());
54 Self { dsn, name }
55 }
56
57 pub fn with_name(mut self, name: String) -> Self {
58 self.name = name;
59 self
60 }
61
62 pub async fn get_conn(&self) -> Result<Connection> {
63 let u = Url::parse(&self.dsn)?;
64 match u.scheme() {
65 "databend" | "databend+http" | "databend+https" => {
66 let conn = RestAPIConnection::try_create(&self.dsn, self.name.clone()).await?;
67 Ok(Connection {
68 inner: Box::new(conn),
69 })
70 }
71 #[cfg(feature = "flight-sql")]
72 "databend+flight" | "databend+grpc" => {
73 let conn = FlightSQLConnection::try_create(&self.dsn, self.name.clone()).await?;
74 Ok(Connection {
75 inner: Box::new(conn),
76 })
77 }
78 _ => Err(Error::Parsing(format!(
79 "Unsupported scheme: {}",
80 u.scheme()
81 ))),
82 }
83 }
84}
85
86impl Connection {
87 pub fn inner(&self) -> &dyn IConnection {
88 self.inner.as_ref()
89 }
90
91 pub async fn info(&self) -> ConnectionInfo {
92 self.inner.info().await
93 }
94 pub async fn close(&self) -> Result<()> {
95 self.inner.close().await
96 }
97
98 pub fn last_query_id(&self) -> Option<String> {
99 self.inner.last_query_id()
100 }
101
102 pub async fn version(&self) -> Result<String> {
103 self.inner.version().await
104 }
105
106 pub fn format_sql<P: Into<Params> + Send>(&self, sql: &str, params: P) -> String {
107 let params = params.into();
108 params.replace(sql)
109 }
110
111 pub async fn kill_query(&self, query_id: &str) -> Result<()> {
112 self.inner.kill_query(query_id).await
113 }
114
115 pub async fn exec<P: Into<Params> + Send>(&self, sql: &str, params: P) -> Result<i64> {
116 let params = params.into();
117 self.inner.exec(¶ms.replace(sql)).await
118 }
119 pub async fn query_iter<P: Into<Params> + Send>(
120 &self,
121 sql: &str,
122 params: P,
123 ) -> Result<RowIterator> {
124 let params = params.into();
125 self.inner.query_iter(¶ms.replace(sql)).await
126 }
127
128 pub async fn query_iter_ext<P: Into<Params> + Send>(
129 &self,
130 sql: &str,
131 params: P,
132 ) -> Result<RowStatsIterator> {
133 let params = params.into();
134 self.inner.query_iter_ext(¶ms.replace(sql)).await
135 }
136
137 pub async fn query_row<P: Into<Params> + Send>(
138 &self,
139 sql: &str,
140 params: P,
141 ) -> Result<Option<Row>> {
142 let params = params.into();
143 self.inner.query_row(¶ms.replace(sql)).await
144 }
145
146 pub async fn query_all<P: Into<Params> + Send>(
147 &self,
148 sql: &str,
149 params: P,
150 ) -> Result<Vec<Row>> {
151 let params = params.into();
152 self.inner.query_all(¶ms.replace(sql)).await
153 }
154
155 pub async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
157 self.inner.query_raw_iter(sql).await
158 }
159
160 pub async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
162 self.inner.query_raw_all(sql).await
163 }
164
165 pub async fn get_presigned_url(
168 &self,
169 operation: &str,
170 stage: &str,
171 ) -> Result<PresignedResponse> {
172 self.inner.get_presigned_url(operation, stage).await
173 }
174
175 pub async fn upload_to_stage(&self, stage: &str, data: Reader, size: u64) -> Result<()> {
176 self.inner.upload_to_stage(stage, data, size).await
177 }
178
179 pub async fn load_data(
180 &self,
181 sql: &str,
182 data: Reader,
183 size: u64,
184 file_format_options: Option<BTreeMap<&str, &str>>,
185 copy_options: Option<BTreeMap<&str, &str>>,
186 ) -> Result<ServerStats> {
187 self.inner
188 .load_data(sql, data, size, file_format_options, copy_options)
189 .await
190 }
191
192 pub async fn load_file(
193 &self,
194 sql: &str,
195 fp: &Path,
196 format_options: Option<BTreeMap<&str, &str>>,
197 copy_options: Option<BTreeMap<&str, &str>>,
198 ) -> Result<ServerStats> {
199 self.inner
200 .load_file(sql, fp, format_options, copy_options)
201 .await
202 }
203
204 pub async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats> {
205 self.inner.stream_load(sql, data).await
206 }
207
208 pub async fn put_files(&self, local_file: &str, stage: &str) -> Result<RowStatsIterator> {
210 self.inner.put_files(local_file, stage).await
211 }
212
213 pub async fn get_files(&self, stage: &str, local_file: &str) -> Result<RowStatsIterator> {
214 self.inner.get_files(stage, local_file).await
215 }
216}