scopedb_client/
lib.rs

1// Copyright 2024 ScopeDB, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow::array::RecordBatch;
16use reqwest::Client;
17
18use crate::{
19    api::{IngestData, IngestFormat, do_ingest, do_submit_statement},
20    config::Config,
21    error::Error,
22};
23
24mod api;
25mod codec;
26mod config;
27mod error;
28
/// A connection to a ScopeDB instance.
///
/// Pairs the instance configuration (its endpoint) with the `reqwest` HTTP
/// client that this connection's methods pass to the API calls they make.
pub struct Connection {
    /// Settings for the target instance; `connect` fills in only the endpoint.
    config: Config,
    /// HTTP client handed to the API calls issued by this connection.
    client: Client,
}
34
35impl Connection {
36    /// Connect to a ScopeDB instance. The endpoint is the base URL of the instance.
37    pub fn connect(endpoint: &str) -> Self {
38        Self {
39            config: Config {
40                endpoint: endpoint.to_string(),
41            },
42            client: Client::new(),
43        }
44    }
45
46    /// Submit query and return the result as Arrow record batches.
47    ///
48    /// # Example
49    /// ```ignore
50    /// let conn = Connection::connect("http://localhost:6543");
51    /// let result = conn.query("select 1").await.unwrap();
52    /// ```
53    pub async fn query(&self, statement: &str) -> Result<Vec<RecordBatch>, Error> {
54        // TODO: support asynchronous queries
55        let resp = do_submit_statement(
56            &self.client,
57            &self.config.endpoint,
58            statement,
59            api::ResultFormat::ArrowJson,
60        )
61        .await?;
62
63        if resp.status != api::StatementStatus::Finished {
64            return Err(Error::Internal("statement not finished".to_string()));
65        }
66
67        let result = if let Some(rs) = resp.result_set {
68            codec::decode_arrow(&rs.rows)?
69        } else {
70            return Err(Error::Internal("no result set".to_string()));
71        };
72
73        Ok(result)
74    }
75
76    /// Insert record batches into a table.
77    ///
78    /// # Example
79    /// ```ignore
80    /// let conn = Connection::connect("http://localhost:6543");
81    /// conn.insert("database", "schema", "table", &[record_batch]).await.unwrap();
82    /// ```
83    pub async fn insert(
84        &self,
85        database: &str,
86        schema: &str,
87        table: &str,
88        data: &[RecordBatch],
89    ) -> Result<(), Error> {
90        let data = codec::encode_arrow(data)?;
91        let ingest_data = IngestData {
92            format: IngestFormat::Arrow,
93            rows: data,
94        };
95        let statement = format!("insert into {database}.{schema}.{table}");
96        do_ingest(&self.client, &self.config.endpoint, ingest_data, &statement).await?;
97
98        Ok(())
99    }
100
101    /// Insert record batches into a table with custom transforms.
102    ///
103    /// # Example
104    /// ```ignore
105    /// let conn = Connection::connect("http://localhost:6543");
106    /// conn.insert_with_transform(
107    ///     "database",
108    ///     "schema",
109    ///     "table",
110    ///     &[record_batch],
111    ///     "select $1 as foo where foo is not null",
112    /// ).await.unwrap();
113    /// ```
114    pub async fn insert_with_transform(
115        &self,
116        database: &str,
117        schema: &str,
118        table: &str,
119        data: &[RecordBatch],
120        transform: &str,
121    ) -> Result<(), Error> {
122        let data = codec::encode_arrow(data)?;
123        let ingest_data = IngestData {
124            format: IngestFormat::Arrow,
125            rows: data,
126        };
127        let statement = format!("{transform} insert into {database}.{schema}.{table}");
128        do_ingest(&self.client, &self.config.endpoint, ingest_data, &statement).await?;
129
130        Ok(())
131    }
132}