dremio_rs/lib.rs
1//! `dremio-rs` is a Rust client for interacting with Dremio's Flight SQL service.
2//!
3//! This crate provides a convenient way to connect to a Dremio coordinator,
4//! execute SQL queries, and retrieve data as Apache Arrow `RecordBatch`es.
5//! It also includes functionality to write query results directly to Parquet files.
6//!
7//! The client is built on top of the `arrow-flight` and `tonic` crates,
8//! offering an asynchronous API for efficient data retrieval.
9//!
10//! # Features
11//!
//! - Connect to a Dremio Flight SQL endpoint.
13//! - Authenticate with username and password.
14//! - Execute SQL queries.
15//! - Retrieve query results as `Vec<RecordBatch>`.
16//! - Write query results to Parquet files.
17//!
18//! # Example
19//!
20//! ```no_run
21//! use dremio_rs::Client;
22//!
23//! #[tokio::main]
24//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
25//! let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await?;
26//!
27//! // Execute a query and get record batches
28//! let batches = client.get_record_batches("SELECT * FROM sys.options").await?;
29//! for batch in batches {
30//! println!("RecordBatch: {:?}", batch);
31//! }
32//!
33//! // Write query results to a Parquet file
34//! client.write_parquet("SELECT * FROM sys.options", "sys_options.parquet").await?;
35//!
36//! Ok(())
37//! }
38//! ```
39
40use arrow::array::RecordBatch;
41use arrow::error::ArrowError;
42use arrow_flight::error::FlightError;
43use arrow_flight::sql::client::FlightSqlServiceClient;
44use futures::stream::StreamExt;
45use parquet::arrow::ArrowWriter;
46use parquet::errors::ParquetError;
47use std::io::Error as IoError;
48use thiserror::Error;
49use tonic::transport::{Channel, Endpoint, Error as TonicError};
50
51/// Represents the possible errors that can occur when using the Dremio client.
52#[derive(Error, Debug)]
53pub enum DremioClientError {
54 /// An error originating from the `tonic` gRPC framework.
55 #[error("Tonic Error: {0}")]
56 TonicError(#[from] TonicError),
57 /// An error originating from the `arrow` data processing library.
58 #[error("Arrow Error: {0}")]
59 ArrowError(#[from] ArrowError),
60 /// An error originating from the `arrow-flight` Flight SQL client.
61 #[error("Flight Error: {0}")]
62 FlightError(#[from] FlightError),
63 /// An error originating from standard I/O operations.
64 #[error("IO Error: {0}")]
65 IoError(#[from] IoError),
66 /// An error originating from the `parquet` file format library.
67 #[error("Parquet Error: {0}")]
68 ParquetError(#[from] ParquetError),
69}
70
71/// A client for interacting with Dremio's Flight SQL service.
72///
73/// This client wraps the `FlightSqlServiceClient` and provides a simplified
74/// interface for common operations such as executing SQL queries and
75/// retrieving data as Arrow `RecordBatch`es, or writing them to Parquet files.
76pub struct Client {
77 flight_sql_service_client: FlightSqlServiceClient<Channel>,
78}
79
80impl Client {
81 /// Creates a new `Client` instance and establishes a connection to the Dremio coordinator.
82 ///
83 /// # Arguments
84 ///
85 /// * `url` - The URL of the Dremio coordinator (e.g., "http://localhost:32010").
86 /// * `user` - The username for authentication.
87 /// * `pass` - The password for authentication.
88 ///
89 /// # Returns
90 ///
91 /// A `Result` which is:
92 /// - `Ok(Self)` if the connection is successful and authentication succeeds.
93 /// - `Err(DremioClientError)` if an error occurs during connection or authentication.
94 ///
95 /// # Example
96 ///
97 /// ```no_run
98 /// use dremio_rs::Client;
99 ///
100 /// #[tokio::main]
101 /// async fn main() {
102 /// let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
103 /// }
104 /// ```
105 pub async fn new(url: &str, user: &str, pass: &str) -> Result<Self, DremioClientError> {
106 let mut client =
107 FlightSqlServiceClient::new(Endpoint::from_shared(url.to_string())?.connect().await?);
108 client.handshake(user, pass).await?;
109 Ok(Self {
110 flight_sql_service_client: client,
111 })
112 }
113
114 /// Executes a SQL query against Dremio and retrieves the results as a vector of `RecordBatch`es.
115 ///
116 /// # Arguments
117 ///
118 /// * `query` - The SQL query string to execute.
119 ///
120 /// # Returns
121 ///
122 /// A `Result` which is:
123 /// - `Ok(Vec<RecordBatch>)` containing the query results if successful.
124 /// - `Err(DremioClientError)` if an error occurs during query execution or data retrieval.
125 ///
126 /// # Example
127 ///
128 /// ```no_run
129 /// use dremio_rs::Client;
130 ///
131 /// #[tokio::main]
132 /// async fn main() {
133 /// let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
134 /// let batches = client.get_record_batches("SELECT * FROM sys.options").await.unwrap();
135 /// for batch in batches {
136 /// println!("{:?}", batch);
137 /// }
138 /// }
139 /// ```
140 pub async fn get_record_batches(
141 &mut self,
142 query: &str,
143 ) -> Result<Vec<RecordBatch>, DremioClientError> {
144 let flight_info = self
145 .flight_sql_service_client
146 .execute(query.to_string(), None)
147 .await?;
148 let ticket = flight_info.endpoint[0]
149 .ticket
150 .clone()
151 .expect("Missing ticket");
152 let mut stream = self.flight_sql_service_client.do_get(ticket).await?;
153 let mut batches = Vec::new();
154
155 while let Some(batch) = stream.next().await {
156 batches.push(batch?);
157 }
158 Ok(batches)
159 }
160
161 /// Executes a SQL query and writes the results directly to a Parquet file.
162 ///
163 /// # Arguments
164 ///
165 /// * `query` - The SQL query string to execute.
166 /// * `path` - The file path where the Parquet file will be written.
167 ///
168 /// # Returns
169 ///
170 /// A `Result` which is:
171 /// - `Ok(())` if the Parquet file is successfully written.
172 /// - `Err(DremioClientError)` if an error occurs during query execution,
173 /// data retrieval, or file writing.
174 ///
175 /// # Example
176 ///
177 /// ```no_run
178 /// use dremio_rs::Client;
179 ///
180 /// #[tokio::main]
181 /// async fn main() {
182 /// let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
183 /// client.write_parquet("SELECT * FROM sys.options", "my_table.parquet").await.unwrap();
184 /// }
185 /// ```
186 pub async fn write_parquet(
187 &mut self,
188 query: &str,
189 path: &str,
190 ) -> Result<(), DremioClientError> {
191 let batches = self.get_record_batches(query).await?;
192 let file = std::fs::File::create(path)?;
193 let mut writer = ArrowWriter::try_new(file, batches[0].schema(), None)?;
194 for batch in batches {
195 writer.write(&batch)?;
196 }
197 writer.close()?;
198 Ok(())
199 }
200
201 /// Returns a shared reference to the underlying `FlightSqlServiceClient`.
202 ///
203 /// This can be used to access more advanced Flight SQL operations not directly
204 /// exposed by the `Client` interface.
205 ///
206 /// # Returns
207 ///
208 /// A reference to the `FlightSqlServiceClient<Channel>`.
209 pub fn inner(&self) -> &FlightSqlServiceClient<Channel> {
210 &self.flight_sql_service_client
211 }
212}