1use once_cell::sync::Lazy;
16use std::collections::BTreeMap;
17use std::path::Path;
18use std::str::FromStr;
19use url::Url;
20
21use crate::conn::IConnection;
22#[cfg(feature = "flight-sql")]
23use crate::flight_sql::FlightSQLConnection;
24use crate::ConnectionInfo;
25use crate::Params;
26
27use databend_client::PresignedResponse;
28use databend_driver_core::error::{Error, Result};
29use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
30use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, ServerStats};
31
32use crate::rest_api::RestAPIConnection;
33
34static VERSION: Lazy<String> = Lazy::new(|| {
35 let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
36 version.to_string()
37});
38
39#[derive(Clone, Copy, Debug, PartialEq)]
40pub enum LoadMethod {
41 Stage,
42 Streaming,
43}
44
45impl FromStr for LoadMethod {
46 type Err = Error;
47
48 fn from_str(s: &str) -> Result<Self, Self::Err> {
49 match s.to_lowercase().as_str() {
50 "stage" => Ok(LoadMethod::Stage),
51 "streaming" => Ok(LoadMethod::Streaming),
52 _ => Err(Error::BadArgument(format!("invalid load method: {s}"))),
53 }
54 }
55}
56
57#[derive(Clone)]
58pub struct Client {
59 dsn: String,
60 name: String,
61}
62
63use crate::conn::Reader;
64
65pub struct Connection {
66 inner: Box<dyn IConnection>,
67}
68
69impl Client {
70 pub fn new(dsn: String) -> Self {
71 let name = format!("databend-driver-rust/{}", VERSION.as_str());
72 Self { dsn, name }
73 }
74
75 pub fn with_name(mut self, name: String) -> Self {
76 self.name = name;
77 self
78 }
79
80 pub async fn get_conn(&self) -> Result<Connection> {
81 let u = Url::parse(&self.dsn)?;
82 match u.scheme() {
83 "databend" | "databend+http" | "databend+https" => {
84 let conn = RestAPIConnection::try_create(&self.dsn, self.name.clone()).await?;
85 Ok(Connection {
86 inner: Box::new(conn),
87 })
88 }
89 #[cfg(feature = "flight-sql")]
90 "databend+flight" | "databend+grpc" => {
91 let conn = FlightSQLConnection::try_create(&self.dsn, self.name.clone()).await?;
92 Ok(Connection {
93 inner: Box::new(conn),
94 })
95 }
96 _ => Err(Error::Parsing(format!(
97 "Unsupported scheme: {}",
98 u.scheme()
99 ))),
100 }
101 }
102}
103
104impl Connection {
105 pub fn inner(&self) -> &dyn IConnection {
106 self.inner.as_ref()
107 }
108
109 pub async fn info(&self) -> ConnectionInfo {
110 self.inner.info().await
111 }
112 pub async fn close(&self) -> Result<()> {
113 self.inner.close().await
114 }
115
116 pub fn last_query_id(&self) -> Option<String> {
117 self.inner.last_query_id()
118 }
119
120 pub async fn version(&self) -> Result<String> {
121 self.inner.version().await
122 }
123
124 pub fn format_sql<P: Into<Params> + Send>(&self, sql: &str, params: P) -> String {
125 let params = params.into();
126 params.replace(sql)
127 }
128
129 pub async fn kill_query(&self, query_id: &str) -> Result<()> {
130 self.inner.kill_query(query_id).await
131 }
132
133 pub async fn exec<P: Into<Params> + Send>(&self, sql: &str, params: P) -> Result<i64> {
134 let params = params.into();
135 self.inner.exec(¶ms.replace(sql)).await
136 }
137 pub async fn query_iter<P: Into<Params> + Send>(
138 &self,
139 sql: &str,
140 params: P,
141 ) -> Result<RowIterator> {
142 let params = params.into();
143 self.inner.query_iter(¶ms.replace(sql)).await
144 }
145
146 pub async fn query_iter_ext<P: Into<Params> + Send>(
147 &self,
148 sql: &str,
149 params: P,
150 ) -> Result<RowStatsIterator> {
151 let params = params.into();
152 self.inner.query_iter_ext(¶ms.replace(sql)).await
153 }
154
155 pub async fn query_row<P: Into<Params> + Send>(
156 &self,
157 sql: &str,
158 params: P,
159 ) -> Result<Option<Row>> {
160 let params = params.into();
161 self.inner.query_row(¶ms.replace(sql)).await
162 }
163
164 pub async fn query_all<P: Into<Params> + Send>(
165 &self,
166 sql: &str,
167 params: P,
168 ) -> Result<Vec<Row>> {
169 let params = params.into();
170 self.inner.query_all(¶ms.replace(sql)).await
171 }
172
173 pub async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
175 self.inner.query_raw_iter(sql).await
176 }
177
178 pub async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
180 self.inner.query_raw_all(sql).await
181 }
182
183 pub async fn get_presigned_url(
186 &self,
187 operation: &str,
188 stage: &str,
189 ) -> Result<PresignedResponse> {
190 self.inner.get_presigned_url(operation, stage).await
191 }
192
193 pub async fn upload_to_stage(&self, stage: &str, data: Reader, size: u64) -> Result<()> {
194 self.inner.upload_to_stage(stage, data, size).await
195 }
196
197 pub async fn load_data(
198 &self,
199 sql: &str,
200 data: Reader,
201 size: u64,
202 method: LoadMethod,
203 ) -> Result<ServerStats> {
204 self.inner.load_data(sql, data, size, method).await
205 }
206
207 pub async fn load_file(&self, sql: &str, fp: &Path, method: LoadMethod) -> Result<ServerStats> {
208 self.inner.load_file(sql, fp, method).await
209 }
210
211 pub async fn load_file_with_options(
212 &self,
213 sql: &str,
214 fp: &Path,
215 file_format_options: Option<BTreeMap<&str, &str>>,
216 copy_options: Option<BTreeMap<&str, &str>>,
217 ) -> Result<ServerStats> {
218 self.inner
219 .load_file_with_options(sql, fp, file_format_options, copy_options)
220 .await
221 }
222
223 pub async fn stream_load(
224 &self,
225 sql: &str,
226 data: Vec<Vec<&str>>,
227 method: LoadMethod,
228 ) -> Result<ServerStats> {
229 self.inner.stream_load(sql, data, method).await
230 }
231
232 pub async fn put_files(&self, local_file: &str, stage: &str) -> Result<RowStatsIterator> {
234 self.inner.put_files(local_file, stage).await
235 }
236
237 pub async fn get_files(&self, stage: &str, local_file: &str) -> Result<RowStatsIterator> {
238 self.inner.get_files(stage, local_file).await
239 }
240}