Skip to main content

hyperdb_api/
arrow_reader.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream reader for query results.
5//!
6//! This module provides the [`ArrowReader`] struct for reading query results
7//! in Arrow IPC stream format.
8//!
//! Unlike regular query execution, which returns results row by row, `ArrowReader`
//! returns complete Arrow IPC streams that can be consumed directly by Arrow
//! libraries such as the `arrow` crate. Because it wraps queries in
//! `COPY ... TO STDOUT`, it requires a TCP connection.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowReader, Connection, Result};
17//! # fn example(conn: &Connection) -> Result<()> {
18//! use hyperdb_api::{ArrowReader, Connection};
19//!
20//! let reader = ArrowReader::new(&conn);
21//!
22//! // Query results as Arrow IPC stream
23//! let arrow_data = reader.query_to_arrow("SELECT * FROM my_table")?;
24//!
25//! // Or export an entire table
26//! let arrow_data = reader.table_to_arrow("my_table")?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::connection::Connection;
32use crate::error::Result;
33
34/// Reads query results in Arrow IPC stream format.
35///
36/// `ArrowReader` provides methods to execute queries and receive results as
37/// Arrow IPC stream data. This is useful for integration with Arrow-based
38/// data processing pipelines.
39///
40/// # How It Works
41///
42/// Internally, `ArrowReader` uses `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
43/// to retrieve query results in Arrow format. The returned bytes are a valid
44/// Arrow IPC stream containing:
45/// 1. A schema message
46/// 2. One or more record batch messages
47///
48/// # Example
49///
50/// ```no_run
51/// use hyperdb_api::{ArrowReader, Connection, CreateMode, Result};
52///
53/// fn main() -> Result<()> {
54///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
55///
56///     // Create and populate a table
57///     conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
58///     conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5), (3, 3.5)")?;
59///
60///     // Read the table as Arrow
61///     let reader = ArrowReader::new(&conn);
62///     let arrow_data = reader.table_to_arrow("data")?;
63///
64///     println!("Got {} bytes of Arrow IPC data", arrow_data.len());
65///     Ok(())
66/// }
67/// ```
68#[derive(Debug)]
69pub struct ArrowReader<'conn> {
70    connection: &'conn Connection,
71}
72
73impl<'conn> ArrowReader<'conn> {
74    /// Creates a new Arrow reader for the given connection.
75    pub fn new(connection: &'conn Connection) -> Self {
76        ArrowReader { connection }
77    }
78
79    /// Executes a SELECT query and returns results as Arrow IPC stream.
80    ///
81    /// The query should be a SELECT statement. It will be wrapped in a
82    /// `COPY (...) TO STDOUT WITH (format arrowstream)` to retrieve the
83    /// results in Arrow format.
84    ///
85    /// # Arguments
86    ///
87    /// * `select_query` - A SELECT query (without COPY wrapper)
88    ///
89    /// # Returns
90    ///
91    /// Raw Arrow IPC stream bytes that can be parsed by Arrow libraries.
92    ///
93    /// # Example
94    ///
95    /// ```no_run
96    /// # use hyperdb_api::{ArrowReader, Connection, Result};
97    /// # fn example(conn: &Connection) -> Result<()> {
98    /// let reader = ArrowReader::new(&conn);
99    /// let arrow_data = reader.query_to_arrow("SELECT id, name FROM users WHERE active = true")?;
100    /// # Ok(())
101    /// # }
102    /// ```
103    ///
104    /// # Errors
105    ///
106    /// - Returns [`crate::Error::FeatureNotSupported`] if the connection is using gRPC
107    ///   transport (ArrowReader wraps `COPY TO STDOUT`, which is TCP-only).
108    /// - Returns [`crate::Error::Server`] if the server rejects the
109    ///   `COPY (<query>) TO STDOUT WITH (format arrowstream)` statement.
110    /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
111    pub fn query_to_arrow(&self, select_query: &str) -> Result<Vec<u8>> {
112        let copy_query = format!("COPY ({select_query}) TO STDOUT WITH (format arrowstream)");
113        let client = self.connection.tcp_client().ok_or_else(|| {
114            crate::Error::feature_not_supported(
115                "ArrowReader requires a TCP connection. Use Connection::execute_query_to_arrow() for gRPC."
116            )
117        })?;
118        Ok(client.copy_out(&copy_query)?)
119    }
120
121    /// Exports an entire table to Arrow IPC stream format.
122    ///
123    /// This is equivalent to `query_to_arrow("SELECT * FROM table_name")`.
124    ///
125    /// # Arguments
126    ///
127    /// * `table_name` - The table name (should be properly escaped if needed)
128    ///
129    /// # Returns
130    ///
131    /// Raw Arrow IPC stream bytes containing all rows from the table.
132    ///
133    /// # Example
134    ///
135    /// ```no_run
136    /// # use hyperdb_api::{ArrowReader, Connection, Result};
137    /// # fn example(conn: &Connection) -> Result<()> {
138    /// let reader = ArrowReader::new(&conn);
139    /// let arrow_data = reader.table_to_arrow("my_table")?;
140    /// # Ok(())
141    /// # }
142    /// ```
143    ///
144    /// # Errors
145    ///
146    /// See [`query_to_arrow`](Self::query_to_arrow).
147    pub fn table_to_arrow(&self, table_name: &str) -> Result<Vec<u8>> {
148        self.query_to_arrow(&format!("SELECT * FROM {table_name}"))
149    }
150
151    /// Exports specific columns from a table to Arrow IPC stream format.
152    ///
153    /// # Arguments
154    ///
155    /// * `table_name` - The table name
156    /// * `columns` - Column names to export
157    ///
158    /// # Returns
159    ///
160    /// Raw Arrow IPC stream bytes containing the specified columns.
161    ///
162    /// # Example
163    ///
164    /// ```no_run
165    /// # use hyperdb_api::{ArrowReader, Connection, Result};
166    /// # fn example(conn: &Connection) -> Result<()> {
167    /// let reader = ArrowReader::new(&conn);
168    /// let arrow_data = reader.table_columns_to_arrow("users", &["id", "name", "email"])?;
169    /// # Ok(())
170    /// # }
171    /// ```
172    ///
173    /// # Errors
174    ///
175    /// See [`query_to_arrow`](Self::query_to_arrow).
176    pub fn table_columns_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<Vec<u8>> {
177        let column_list = columns.join(", ");
178        self.query_to_arrow(&format!("SELECT {column_list} FROM {table_name}"))
179    }
180
181    /// Exports a table with a WHERE clause to Arrow IPC stream format.
182    ///
183    /// # Arguments
184    ///
185    /// * `table_name` - The table name
186    /// * `where_clause` - The WHERE clause (without the "WHERE" keyword)
187    ///
188    /// # Returns
189    ///
190    /// Raw Arrow IPC stream bytes containing filtered rows.
191    ///
192    /// # Example
193    ///
194    /// ```no_run
195    /// # use hyperdb_api::{ArrowReader, Connection, Result};
196    /// # fn example(conn: &Connection) -> Result<()> {
197    /// let reader = ArrowReader::new(&conn);
198    /// let arrow_data = reader.table_filtered_to_arrow("users", "active = true")?;
199    /// # Ok(())
200    /// # }
201    /// ```
202    ///
203    /// # Errors
204    ///
205    /// See [`query_to_arrow`](Self::query_to_arrow).
206    pub fn table_filtered_to_arrow(&self, table_name: &str, where_clause: &str) -> Result<Vec<u8>> {
207        self.query_to_arrow(&format!("SELECT * FROM {table_name} WHERE {where_clause}"))
208    }
209}
210
#[cfg(test)]
mod tests {
    // Intentionally empty: exercising ArrowReader needs a running Hyper
    // instance, so its integration tests live outside this module.
    // See hyperdb-api/tests/arrow_reader_tests.rs
}