Skip to main content

hyperdb_api/
arrow_reader.rs

// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Arrow IPC stream reader for query results.
//!
//! This module provides the [`ArrowReader`] struct for reading query results
//! in Arrow IPC stream format.
//!
//! Unlike regular query execution, which returns results row by row, `ArrowReader`
//! returns complete Arrow IPC streams that can be consumed directly by Arrow
//! libraries such as the `arrow` crate.
//!
//! # Example
//!
//! ```no_run
//! # use hyperdb_api::{ArrowReader, Connection, Result};
//! # fn example(conn: &Connection) -> Result<()> {
//! use hyperdb_api::{ArrowReader, Connection};
//!
//! let reader = ArrowReader::new(&conn);
//!
//! // Query results as Arrow IPC stream
//! let arrow_data = reader.query_to_arrow("SELECT * FROM my_table")?;
//!
//! // Or export an entire table
//! let arrow_data = reader.table_to_arrow("my_table")?;
//! # Ok(())
//! # }
//! ```

31use crate::connection::Connection;
32use crate::error::Result;
33
34/// Reads query results in Arrow IPC stream format.
35///
36/// `ArrowReader` provides methods to execute queries and receive results as
37/// Arrow IPC stream data. This is useful for integration with Arrow-based
38/// data processing pipelines.
39///
40/// # How It Works
41///
42/// Internally, `ArrowReader` uses `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
43/// to retrieve query results in Arrow format. The returned bytes are a valid
44/// Arrow IPC stream containing:
45/// 1. A schema message
46/// 2. One or more record batch messages
47///
48/// # Example
49///
50/// ```no_run
51/// use hyperdb_api::{ArrowReader, Connection, CreateMode, Result};
52///
53/// fn main() -> Result<()> {
54///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
55///
56///     // Create and populate a table
57///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
58///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5), (3, 3.5)")?;
59///
60///     // Read the table as Arrow
61///     let reader = ArrowReader::new(&conn);
62///     let arrow_data = reader.table_to_arrow("data")?;
63///
64///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
65///     Ok(())
66/// }
67/// ```
68#[derive(Debug)]
69pub struct ArrowReader<'conn> {
70    connection: &'conn Connection,
71}
72
73impl<'conn> ArrowReader<'conn> {
74    /// Creates a new Arrow reader for the given connection.
75    pub fn new(connection: &'conn Connection) -> Self {
76        ArrowReader { connection }
77    }
78
79    /// Executes a SELECT query and returns results as Arrow IPC stream.
80    ///
81    /// The query should be a SELECT statement. It will be wrapped in a
82    /// `COPY (...) TO STDOUT WITH (format arrowstream)` to retrieve the
83    /// results in Arrow format.
84    ///
85    /// # Arguments
86    ///
87    /// * `select_query` - A SELECT query (without COPY wrapper)
88    ///
89    /// # Returns
90    ///
91    /// Raw Arrow IPC stream bytes that can be parsed by Arrow libraries.
92    ///
93    /// # Example
94    ///
95    /// ```no_run
96    /// # use hyperdb_api::{ArrowReader, Connection, Result};
97    /// # fn example(conn: &Connection) -> Result<()> {
98    /// let reader = ArrowReader::new(&conn);
99    /// let arrow_data = reader.query_to_arrow("SELECT id, name FROM users WHERE active = true")?;
100    /// # Ok(())
101    /// # }
102    /// ```
103    ///
104    /// # Errors
105    ///
106    /// - Returns [`crate::Error::Other`] if the connection is using gRPC
107    ///   transport (ArrowReader wraps `COPY TO STDOUT`, which is TCP-only).
108    /// - Returns [`crate::Error::Client`] if the server rejects the
109    ///   `COPY (<query>) TO STDOUT WITH (format arrowstream)` statement.
110    /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
111    pub fn query_to_arrow(&self, select_query: &str) -> Result<Vec<u8>> {
112        let copy_query = format!("COPY ({select_query}) TO STDOUT WITH (format arrowstream)");
113        let client = self.connection.tcp_client().ok_or_else(|| {
114            crate::Error::new("ArrowReader requires a TCP connection. Use Connection::execute_query_to_arrow() for gRPC.")
115        })?;
116        Ok(client.copy_out(&copy_query)?)
117    }
118
119    /// Exports an entire table to Arrow IPC stream format.
120    ///
121    /// This is equivalent to `query_to_arrow("SELECT * FROM table_name")`.
122    ///
123    /// # Arguments
124    ///
125    /// * `table_name` - The table name (should be properly escaped if needed)
126    ///
127    /// # Returns
128    ///
129    /// Raw Arrow IPC stream bytes containing all rows from the table.
130    ///
131    /// # Example
132    ///
133    /// ```no_run
134    /// # use hyperdb_api::{ArrowReader, Connection, Result};
135    /// # fn example(conn: &Connection) -> Result<()> {
136    /// let reader = ArrowReader::new(&conn);
137    /// let arrow_data = reader.table_to_arrow("my_table")?;
138    /// # Ok(())
139    /// # }
140    /// ```
141    ///
142    /// # Errors
143    ///
144    /// See [`query_to_arrow`](Self::query_to_arrow).
145    pub fn table_to_arrow(&self, table_name: &str) -> Result<Vec<u8>> {
146        self.query_to_arrow(&format!("SELECT * FROM {table_name}"))
147    }
148
149    /// Exports specific columns from a table to Arrow IPC stream format.
150    ///
151    /// # Arguments
152    ///
153    /// * `table_name` - The table name
154    /// * `columns` - Column names to export
155    ///
156    /// # Returns
157    ///
158    /// Raw Arrow IPC stream bytes containing the specified columns.
159    ///
160    /// # Example
161    ///
162    /// ```no_run
163    /// # use hyperdb_api::{ArrowReader, Connection, Result};
164    /// # fn example(conn: &Connection) -> Result<()> {
165    /// let reader = ArrowReader::new(&conn);
166    /// let arrow_data = reader.table_columns_to_arrow("users", &["id", "name", "email"])?;
167    /// # Ok(())
168    /// # }
169    /// ```
170    ///
171    /// # Errors
172    ///
173    /// See [`query_to_arrow`](Self::query_to_arrow).
174    pub fn table_columns_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<Vec<u8>> {
175        let column_list = columns.join(", ");
176        self.query_to_arrow(&format!("SELECT {column_list} FROM {table_name}"))
177    }
178
179    /// Exports a table with a WHERE clause to Arrow IPC stream format.
180    ///
181    /// # Arguments
182    ///
183    /// * `table_name` - The table name
184    /// * `where_clause` - The WHERE clause (without the "WHERE" keyword)
185    ///
186    /// # Returns
187    ///
188    /// Raw Arrow IPC stream bytes containing filtered rows.
189    ///
190    /// # Example
191    ///
192    /// ```no_run
193    /// # use hyperdb_api::{ArrowReader, Connection, Result};
194    /// # fn example(conn: &Connection) -> Result<()> {
195    /// let reader = ArrowReader::new(&conn);
196    /// let arrow_data = reader.table_filtered_to_arrow("users", "active = true")?;
197    /// # Ok(())
198    /// # }
199    /// ```
200    ///
201    /// # Errors
202    ///
203    /// See [`query_to_arrow`](Self::query_to_arrow).
204    pub fn table_filtered_to_arrow(&self, table_name: &str, where_clause: &str) -> Result<Vec<u8>> {
205        self.query_to_arrow(&format!("SELECT * FROM {table_name} WHERE {where_clause}"))
206    }
207}
208
#[cfg(test)]
mod tests {
    // Exercising ArrowReader's query methods needs a running Hyper instance,
    // so no unit tests live here. The integration tests are in
    // hyperdb-api/tests/arrow_reader_tests.rs.
}