scopedb_client/lib.rs
// Copyright 2024 ScopeDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use arrow::array::RecordBatch;
16use reqwest::Client;
17
18use crate::{
19 api::{IngestData, IngestFormat, do_ingest, do_submit_statement},
20 config::Config,
21 error::Error,
22};
23
24mod api;
25mod codec;
26mod config;
27mod error;
28
29/// A connection to a ScopeDB instance.
30pub struct Connection {
31 config: Config,
32 client: Client,
33}
34
35impl Connection {
36 /// Connect to a ScopeDB instance. The endpoint is the base URL of the instance.
37 pub fn connect(endpoint: &str) -> Self {
38 Self {
39 config: Config {
40 endpoint: endpoint.to_string(),
41 },
42 client: Client::new(),
43 }
44 }
45
46 /// Submit query and return the result as Arrow record batches.
47 ///
48 /// # Example
49 /// ```ignore
50 /// let conn = Connection::connect("http://localhost:6543");
51 /// let result = conn.query("select 1").await.unwrap();
52 /// ```
53 pub async fn query(&self, statement: &str) -> Result<Vec<RecordBatch>, Error> {
54 // TODO: support asynchronous queries
55 let resp = do_submit_statement(
56 &self.client,
57 &self.config.endpoint,
58 statement,
59 api::ResultFormat::ArrowJson,
60 )
61 .await?;
62
63 if resp.status != api::StatementStatus::Finished {
64 return Err(Error::Internal("statement not finished".to_string()));
65 }
66
67 let result = if let Some(rs) = resp.result_set {
68 codec::decode_arrow(&rs.rows)?
69 } else {
70 return Err(Error::Internal("no result set".to_string()));
71 };
72
73 Ok(result)
74 }
75
76 /// Insert record batches into a table.
77 ///
78 /// # Example
79 /// ```ignore
80 /// let conn = Connection::connect("http://localhost:6543");
81 /// conn.insert("database", "schema", "table", &[record_batch]).await.unwrap();
82 /// ```
83 pub async fn insert(
84 &self,
85 database: &str,
86 schema: &str,
87 table: &str,
88 data: &[RecordBatch],
89 ) -> Result<(), Error> {
90 let data = codec::encode_arrow(data)?;
91 let ingest_data = IngestData {
92 format: IngestFormat::Arrow,
93 rows: data,
94 };
95 let statement = format!("insert into {database}.{schema}.{table}");
96 do_ingest(&self.client, &self.config.endpoint, ingest_data, &statement).await?;
97
98 Ok(())
99 }
100
101 /// Insert record batches into a table with custom transforms.
102 ///
103 /// # Example
104 /// ```ignore
105 /// let conn = Connection::connect("http://localhost:6543");
106 /// conn.insert_with_transform(
107 /// "database",
108 /// "schema",
109 /// "table",
110 /// &[record_batch],
111 /// "select $1 as foo where foo is not null",
112 /// ).await.unwrap();
113 /// ```
114 pub async fn insert_with_transform(
115 &self,
116 database: &str,
117 schema: &str,
118 table: &str,
119 data: &[RecordBatch],
120 transform: &str,
121 ) -> Result<(), Error> {
122 let data = codec::encode_arrow(data)?;
123 let ingest_data = IngestData {
124 format: IngestFormat::Arrow,
125 rows: data,
126 };
127 let statement = format!("{transform} insert into {database}.{schema}.{table}");
128 do_ingest(&self.client, &self.config.endpoint, ingest_data, &statement).await?;
129
130 Ok(())
131 }
132}