databend_driver/
client.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use once_cell::sync::Lazy;
16use std::collections::BTreeMap;
17use std::path::Path;
18use std::str::FromStr;
19use url::Url;
20
21use crate::conn::IConnection;
22#[cfg(feature = "flight-sql")]
23use crate::flight_sql::FlightSQLConnection;
24use crate::ConnectionInfo;
25use crate::Params;
26
27use databend_client::PresignedResponse;
28use databend_driver_core::error::{Error, Result};
29use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
30use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, ServerStats};
31
32use crate::rest_api::RestAPIConnection;
33
34static VERSION: Lazy<String> = Lazy::new(|| {
35    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
36    version.to_string()
37});
38
/// Selects how data is handed to the server by the load APIs.
///
/// The value is passed through unchanged by `Connection::load_data`,
/// `Connection::load_file` and `Connection::stream_load`. It can be parsed
/// from the strings `"stage"` and `"streaming"` via its `FromStr` impl.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality
/// is total and the type can be used wherever a full equivalence is required.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum LoadMethod {
    /// Parsed from `"stage"`.
    // NOTE(review): presumably uploads the data to a stage before loading it — confirm
    // against the connection implementations.
    Stage,
    /// Parsed from `"streaming"`.
    // NOTE(review): presumably sends the data directly with the load request — confirm
    // against the connection implementations.
    Streaming,
}
44
45impl FromStr for LoadMethod {
46    type Err = Error;
47
48    fn from_str(s: &str) -> Result<Self, Self::Err> {
49        match s.to_lowercase().as_str() {
50            "stage" => Ok(LoadMethod::Stage),
51            "streaming" => Ok(LoadMethod::Streaming),
52            _ => Err(Error::BadArgument(format!("invalid load method: {s}"))),
53        }
54    }
55}
56
/// Factory for [`Connection`]s that share one DSN and one client name.
///
/// `Debug` is intentionally not derived here.
// NOTE(review): the DSN may embed credentials, so printing it should stay opt-in — confirm.
#[derive(Clone)]
pub struct Client {
    /// Connection string as supplied to `Client::new`; it is parsed as a URL
    /// only when a connection is requested.
    dsn: String,
    /// Client name handed to the connection constructors; defaults to
    /// `databend-driver-rust/<version>` and can be replaced via `with_name`.
    name: String,
}
62
63use crate::conn::Reader;
64
65pub struct Connection {
66    inner: Box<dyn IConnection>,
67}
68
69impl Client {
70    pub fn new(dsn: String) -> Self {
71        let name = format!("databend-driver-rust/{}", VERSION.as_str());
72        Self { dsn, name }
73    }
74
75    pub fn with_name(mut self, name: String) -> Self {
76        self.name = name;
77        self
78    }
79
80    pub async fn get_conn(&self) -> Result<Connection> {
81        let u = Url::parse(&self.dsn)?;
82        match u.scheme() {
83            "databend" | "databend+http" | "databend+https" => {
84                let conn = RestAPIConnection::try_create(&self.dsn, self.name.clone()).await?;
85                Ok(Connection {
86                    inner: Box::new(conn),
87                })
88            }
89            #[cfg(feature = "flight-sql")]
90            "databend+flight" | "databend+grpc" => {
91                let conn = FlightSQLConnection::try_create(&self.dsn, self.name.clone()).await?;
92                Ok(Connection {
93                    inner: Box::new(conn),
94                })
95            }
96            _ => Err(Error::Parsing(format!(
97                "Unsupported scheme: {}",
98                u.scheme()
99            ))),
100        }
101    }
102}
103
104impl Connection {
105    pub fn inner(&self) -> &dyn IConnection {
106        self.inner.as_ref()
107    }
108
109    pub async fn info(&self) -> ConnectionInfo {
110        self.inner.info().await
111    }
112    pub async fn close(&self) -> Result<()> {
113        self.inner.close().await
114    }
115
116    pub fn last_query_id(&self) -> Option<String> {
117        self.inner.last_query_id()
118    }
119
120    pub async fn version(&self) -> Result<String> {
121        self.inner.version().await
122    }
123
124    pub fn format_sql<P: Into<Params> + Send>(&self, sql: &str, params: P) -> String {
125        let params = params.into();
126        params.replace(sql)
127    }
128
129    pub async fn kill_query(&self, query_id: &str) -> Result<()> {
130        self.inner.kill_query(query_id).await
131    }
132
133    pub async fn exec<P: Into<Params> + Send>(&self, sql: &str, params: P) -> Result<i64> {
134        let params = params.into();
135        self.inner.exec(&params.replace(sql)).await
136    }
137    pub async fn query_iter<P: Into<Params> + Send>(
138        &self,
139        sql: &str,
140        params: P,
141    ) -> Result<RowIterator> {
142        let params = params.into();
143        self.inner.query_iter(&params.replace(sql)).await
144    }
145
146    pub async fn query_iter_ext<P: Into<Params> + Send>(
147        &self,
148        sql: &str,
149        params: P,
150    ) -> Result<RowStatsIterator> {
151        let params = params.into();
152        self.inner.query_iter_ext(&params.replace(sql)).await
153    }
154
155    pub async fn query_row<P: Into<Params> + Send>(
156        &self,
157        sql: &str,
158        params: P,
159    ) -> Result<Option<Row>> {
160        let params = params.into();
161        self.inner.query_row(&params.replace(sql)).await
162    }
163
164    pub async fn query_all<P: Into<Params> + Send>(
165        &self,
166        sql: &str,
167        params: P,
168    ) -> Result<Vec<Row>> {
169        let params = params.into();
170        self.inner.query_all(&params.replace(sql)).await
171    }
172
173    // raw data response query, only for test
174    pub async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
175        self.inner.query_raw_iter(sql).await
176    }
177
178    // raw data response query, only for test
179    pub async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
180        self.inner.query_raw_all(sql).await
181    }
182
183    /// Get presigned url for a given operation and stage location.
184    /// The operation can be "UPLOAD" or "DOWNLOAD".
185    pub async fn get_presigned_url(
186        &self,
187        operation: &str,
188        stage: &str,
189    ) -> Result<PresignedResponse> {
190        self.inner.get_presigned_url(operation, stage).await
191    }
192
193    pub async fn upload_to_stage(&self, stage: &str, data: Reader, size: u64) -> Result<()> {
194        self.inner.upload_to_stage(stage, data, size).await
195    }
196
197    pub async fn load_data(
198        &self,
199        sql: &str,
200        data: Reader,
201        size: u64,
202        method: LoadMethod,
203    ) -> Result<ServerStats> {
204        self.inner.load_data(sql, data, size, method).await
205    }
206
207    pub async fn load_file(&self, sql: &str, fp: &Path, method: LoadMethod) -> Result<ServerStats> {
208        self.inner.load_file(sql, fp, method).await
209    }
210
211    pub async fn load_file_with_options(
212        &self,
213        sql: &str,
214        fp: &Path,
215        file_format_options: Option<BTreeMap<&str, &str>>,
216        copy_options: Option<BTreeMap<&str, &str>>,
217    ) -> Result<ServerStats> {
218        self.inner
219            .load_file_with_options(sql, fp, file_format_options, copy_options)
220            .await
221    }
222
223    pub async fn stream_load(
224        &self,
225        sql: &str,
226        data: Vec<Vec<&str>>,
227        method: LoadMethod,
228    ) -> Result<ServerStats> {
229        self.inner.stream_load(sql, data, method).await
230    }
231
232    // PUT file://<path_to_file>/<filename> internalStage|externalStage
233    pub async fn put_files(&self, local_file: &str, stage: &str) -> Result<RowStatsIterator> {
234        self.inner.put_files(local_file, stage).await
235    }
236
237    pub async fn get_files(&self, stage: &str, local_file: &str) -> Result<RowStatsIterator> {
238        self.inner.get_files(stage, local_file).await
239    }
240}