Skip to main content

dremio_rs/
lib.rs

//! `dremio-rs` is a Rust client for interacting with Dremio's Flight SQL service.
//!
//! This crate provides a convenient way to connect to a Dremio coordinator,
//! execute SQL queries, and retrieve data as Apache Arrow `RecordBatch`es.
//! It also includes functionality to write query results directly to Parquet files.
//!
//! The client is built on top of the `arrow-flight` and `tonic` crates,
//! offering an asynchronous API for efficient data retrieval.
//!
//! # Features
//!
//! - Connect to Dremio Flight SQL endpoint.
//! - Authenticate with username and password.
//! - Execute SQL queries.
//! - Retrieve query results as `Vec<RecordBatch>`.
//! - Write query results to Parquet files.
//!
//! # Example
//!
//! ```no_run
//! use dremio_rs::Client;
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//!     let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await?;
//!
//!     // Execute a query and get record batches
//!     let batches = client.get_record_batches("SELECT * FROM sys.options").await?;
//!     for batch in batches {
//!         println!("RecordBatch: {:?}", batch);
//!     }
//!
//!     // Write query results to a Parquet file
//!     client.write_parquet("SELECT * FROM sys.options", "sys_options.parquet").await?;
//!
//!     Ok(())
//! }
//! ```
39
40use arrow::array::RecordBatch;
41use arrow::error::ArrowError;
42use arrow_flight::error::FlightError;
43use arrow_flight::sql::client::FlightSqlServiceClient;
44use futures::stream::StreamExt;
45use parquet::arrow::ArrowWriter;
46use parquet::errors::ParquetError;
47use std::io::Error as IoError;
48use thiserror::Error;
49use tonic::transport::{Channel, Endpoint, Error as TonicError};
50
51/// Represents the possible errors that can occur when using the Dremio client.
52#[derive(Error, Debug)]
53pub enum DremioClientError {
54    /// An error originating from the `tonic` gRPC framework.
55    #[error("Tonic Error: {0}")]
56    TonicError(#[from] TonicError),
57    /// An error originating from the `arrow` data processing library.
58    #[error("Arrow Error: {0}")]
59    ArrowError(#[from] ArrowError),
60    /// An error originating from the `arrow-flight` Flight SQL client.
61    #[error("Flight Error: {0}")]
62    FlightError(#[from] FlightError),
63    /// An error originating from standard I/O operations.
64    #[error("IO Error: {0}")]
65    IoError(#[from] IoError),
66    /// An error originating from the `parquet` file format library.
67    #[error("Parquet Error: {0}")]
68    ParquetError(#[from] ParquetError),
69}
70
71/// A client for interacting with Dremio's Flight SQL service.
72///
73/// This client wraps the `FlightSqlServiceClient` and provides a simplified
74/// interface for common operations such as executing SQL queries and
75/// retrieving data as Arrow `RecordBatch`es, or writing them to Parquet files.
76pub struct Client {
77    flight_sql_service_client: FlightSqlServiceClient<Channel>,
78}
79
80impl Client {
81    /// Creates a new `Client` instance and establishes a connection to the Dremio coordinator.
82    ///
83    /// # Arguments
84    ///
85    /// * `url` - The URL of the Dremio coordinator (e.g., "http://localhost:32010").
86    /// * `user` - The username for authentication.
87    /// * `pass` - The password for authentication.
88    ///
89    /// # Returns
90    ///
91    /// A `Result` which is:
92    /// - `Ok(Self)` if the connection is successful and authentication succeeds.
93    /// - `Err(DremioClientError)` if an error occurs during connection or authentication.
94    ///
95    /// # Example
96    ///
97    /// ```no_run
98    /// use dremio_rs::Client;
99    ///
100    /// #[tokio::main]
101    /// async fn main() {
102    ///    let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
103    /// }
104    /// ```
105    pub async fn new(url: &str, user: &str, pass: &str) -> Result<Self, DremioClientError> {
106        let mut client =
107            FlightSqlServiceClient::new(Endpoint::from_shared(url.to_string())?.connect().await?);
108        client.handshake(user, pass).await?;
109        Ok(Self {
110            flight_sql_service_client: client,
111        })
112    }
113
114    /// Executes a SQL query against Dremio and retrieves the results as a vector of `RecordBatch`es.
115    ///
116    /// # Arguments
117    ///
118    /// * `query` - The SQL query string to execute.
119    ///
120    /// # Returns
121    ///
122    /// A `Result` which is:
123    /// - `Ok(Vec<RecordBatch>)` containing the query results if successful.
124    /// - `Err(DremioClientError)` if an error occurs during query execution or data retrieval.
125    ///
126    /// # Example
127    ///
128    /// ```no_run
129    /// use dremio_rs::Client;
130    ///
131    /// #[tokio::main]
132    /// async fn main() {
133    ///   let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
134    ///   let batches = client.get_record_batches("SELECT * FROM sys.options").await.unwrap();
135    ///   for batch in batches {
136    ///     println!("{:?}", batch);
137    ///   }
138    /// }
139    /// ```
140    pub async fn get_record_batches(
141        &mut self,
142        query: &str,
143    ) -> Result<Vec<RecordBatch>, DremioClientError> {
144        let flight_info = self
145            .flight_sql_service_client
146            .execute(query.to_string(), None)
147            .await?;
148        let ticket = flight_info.endpoint[0]
149            .ticket
150            .clone()
151            .expect("Missing ticket");
152        let mut stream = self.flight_sql_service_client.do_get(ticket).await?;
153        let mut batches = Vec::new();
154
155        while let Some(batch) = stream.next().await {
156            batches.push(batch?);
157        }
158        Ok(batches)
159    }
160
161    /// Executes a SQL query and writes the results directly to a Parquet file.
162    ///
163    /// # Arguments
164    ///
165    /// * `query` - The SQL query string to execute.
166    /// * `path` - The file path where the Parquet file will be written.
167    ///
168    /// # Returns
169    ///
170    /// A `Result` which is:
171    /// - `Ok(())` if the Parquet file is successfully written.
172    /// - `Err(DremioClientError)` if an error occurs during query execution,
173    ///   data retrieval, or file writing.
174    ///
175    /// # Example
176    ///
177    /// ```no_run
178    /// use dremio_rs::Client;
179    ///
180    /// #[tokio::main]
181    /// async fn main() {
182    ///  let mut client = Client::new("http://localhost:32010", "dremio", "dremio123").await.unwrap();
183    ///  client.write_parquet("SELECT * FROM sys.options", "my_table.parquet").await.unwrap();
184    /// }
185    /// ```
186    pub async fn write_parquet(
187        &mut self,
188        query: &str,
189        path: &str,
190    ) -> Result<(), DremioClientError> {
191        let batches = self.get_record_batches(query).await?;
192        let file = std::fs::File::create(path)?;
193        let mut writer = ArrowWriter::try_new(file, batches[0].schema(), None)?;
194        for batch in batches {
195            writer.write(&batch)?;
196        }
197        writer.close()?;
198        Ok(())
199    }
200
201    /// Returns a shared reference to the underlying `FlightSqlServiceClient`.
202    ///
203    /// This can be used to access more advanced Flight SQL operations not directly
204    /// exposed by the `Client` interface.
205    ///
206    /// # Returns
207    ///
208    /// A reference to the `FlightSqlServiceClient<Channel>`.
209    pub fn inner(&self) -> &FlightSqlServiceClient<Channel> {
210        &self.flight_sql_service_client
211    }
212}