databend_driver/
client.rs

1// Copyright 2021 Datafuse Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::path::Path;
17
18use once_cell::sync::Lazy;
19use url::Url;
20
21use crate::conn::IConnection;
22#[cfg(feature = "flight-sql")]
23use crate::flight_sql::FlightSQLConnection;
24use crate::ConnectionInfo;
25use crate::Params;
26
27use databend_client::PresignedResponse;
28use databend_driver_core::error::{Error, Result};
29use databend_driver_core::raw_rows::{RawRow, RawRowIterator};
30use databend_driver_core::rows::{Row, RowIterator, RowStatsIterator, ServerStats};
31
32use crate::rest_api::RestAPIConnection;
33
34static VERSION: Lazy<String> = Lazy::new(|| {
35    let version = option_env!("CARGO_PKG_VERSION").unwrap_or("unknown");
36    version.to_string()
37});
38
/// Factory for [`Connection`]s, configured by a DSN and a client name.
///
/// Cloning copies both strings; no connection state is stored here.
#[derive(Clone)]
pub struct Client {
    /// Connection string. It is parsed as a URL by `get_conn`, and its scheme
    /// selects which connection implementation is created.
    dsn: String,
    /// Client name handed to the connection constructor. Defaults to
    /// `databend-driver-rust/<crate version>` and can be replaced with
    /// `with_name`.
    name: String,
}
44
45use crate::conn::Reader;
46
47pub struct Connection {
48    inner: Box<dyn IConnection>,
49}
50
51impl Client {
52    pub fn new(dsn: String) -> Self {
53        let name = format!("databend-driver-rust/{}", VERSION.as_str());
54        Self { dsn, name }
55    }
56
57    pub fn with_name(mut self, name: String) -> Self {
58        self.name = name;
59        self
60    }
61
62    pub async fn get_conn(&self) -> Result<Connection> {
63        let u = Url::parse(&self.dsn)?;
64        match u.scheme() {
65            "databend" | "databend+http" | "databend+https" => {
66                let conn = RestAPIConnection::try_create(&self.dsn, self.name.clone()).await?;
67                Ok(Connection {
68                    inner: Box::new(conn),
69                })
70            }
71            #[cfg(feature = "flight-sql")]
72            "databend+flight" | "databend+grpc" => {
73                let conn = FlightSQLConnection::try_create(&self.dsn, self.name.clone()).await?;
74                Ok(Connection {
75                    inner: Box::new(conn),
76                })
77            }
78            _ => Err(Error::Parsing(format!(
79                "Unsupported scheme: {}",
80                u.scheme()
81            ))),
82        }
83    }
84}
85
86impl Connection {
87    pub fn inner(&self) -> &dyn IConnection {
88        self.inner.as_ref()
89    }
90
91    pub async fn info(&self) -> ConnectionInfo {
92        self.inner.info().await
93    }
94    pub async fn close(&self) -> Result<()> {
95        self.inner.close().await
96    }
97
98    pub fn last_query_id(&self) -> Option<String> {
99        self.inner.last_query_id()
100    }
101
102    pub async fn version(&self) -> Result<String> {
103        self.inner.version().await
104    }
105
106    pub fn format_sql<P: Into<Params> + Send>(&self, sql: &str, params: P) -> String {
107        let params = params.into();
108        params.replace(sql)
109    }
110
111    pub async fn kill_query(&self, query_id: &str) -> Result<()> {
112        self.inner.kill_query(query_id).await
113    }
114
115    pub async fn exec<P: Into<Params> + Send>(&self, sql: &str, params: P) -> Result<i64> {
116        let params = params.into();
117        self.inner.exec(&params.replace(sql)).await
118    }
119    pub async fn query_iter<P: Into<Params> + Send>(
120        &self,
121        sql: &str,
122        params: P,
123    ) -> Result<RowIterator> {
124        let params = params.into();
125        self.inner.query_iter(&params.replace(sql)).await
126    }
127
128    pub async fn query_iter_ext<P: Into<Params> + Send>(
129        &self,
130        sql: &str,
131        params: P,
132    ) -> Result<RowStatsIterator> {
133        let params = params.into();
134        self.inner.query_iter_ext(&params.replace(sql)).await
135    }
136
137    pub async fn query_row<P: Into<Params> + Send>(
138        &self,
139        sql: &str,
140        params: P,
141    ) -> Result<Option<Row>> {
142        let params = params.into();
143        self.inner.query_row(&params.replace(sql)).await
144    }
145
146    pub async fn query_all<P: Into<Params> + Send>(
147        &self,
148        sql: &str,
149        params: P,
150    ) -> Result<Vec<Row>> {
151        let params = params.into();
152        self.inner.query_all(&params.replace(sql)).await
153    }
154
155    // raw data response query, only for test
156    pub async fn query_raw_iter(&self, sql: &str) -> Result<RawRowIterator> {
157        self.inner.query_raw_iter(sql).await
158    }
159
160    // raw data response query, only for test
161    pub async fn query_raw_all(&self, sql: &str) -> Result<Vec<RawRow>> {
162        self.inner.query_raw_all(sql).await
163    }
164
165    /// Get presigned url for a given operation and stage location.
166    /// The operation can be "UPLOAD" or "DOWNLOAD".
167    pub async fn get_presigned_url(
168        &self,
169        operation: &str,
170        stage: &str,
171    ) -> Result<PresignedResponse> {
172        self.inner.get_presigned_url(operation, stage).await
173    }
174
175    pub async fn upload_to_stage(&self, stage: &str, data: Reader, size: u64) -> Result<()> {
176        self.inner.upload_to_stage(stage, data, size).await
177    }
178
179    pub async fn load_data(
180        &self,
181        sql: &str,
182        data: Reader,
183        size: u64,
184        file_format_options: Option<BTreeMap<&str, &str>>,
185        copy_options: Option<BTreeMap<&str, &str>>,
186    ) -> Result<ServerStats> {
187        self.inner
188            .load_data(sql, data, size, file_format_options, copy_options)
189            .await
190    }
191
192    pub async fn load_file(
193        &self,
194        sql: &str,
195        fp: &Path,
196        format_options: Option<BTreeMap<&str, &str>>,
197        copy_options: Option<BTreeMap<&str, &str>>,
198    ) -> Result<ServerStats> {
199        self.inner
200            .load_file(sql, fp, format_options, copy_options)
201            .await
202    }
203
204    pub async fn stream_load(&self, sql: &str, data: Vec<Vec<&str>>) -> Result<ServerStats> {
205        self.inner.stream_load(sql, data).await
206    }
207
208    // PUT file://<path_to_file>/<filename> internalStage|externalStage
209    pub async fn put_files(&self, local_file: &str, stage: &str) -> Result<RowStatsIterator> {
210        self.inner.put_files(local_file, stage).await
211    }
212
213    pub async fn get_files(&self, stage: &str, local_file: &str) -> Result<RowStatsIterator> {
214        self.inner.get_files(stage, local_file).await
215    }
216}