hyperdb_api/arrow_reader.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream reader for query results.
5//!
6//! This module provides the [`ArrowReader`] struct for reading query results
7//! in Arrow IPC stream format.
8//!
9//! Unlike regular query execution which returns row-by-row results, `ArrowReader`
10//! returns complete Arrow IPC streams, which can be directly consumed by Arrow
11//! libraries like the `arrow` crate.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowReader, Connection, Result};
17//! # fn example(conn: &Connection) -> Result<()> {
18//! use hyperdb_api::{ArrowReader, Connection};
19//!
20//! let reader = ArrowReader::new(&conn);
21//!
22//! // Query results as Arrow IPC stream
23//! let arrow_data = reader.query_to_arrow("SELECT * FROM my_table")?;
24//!
25//! // Or export an entire table
26//! let arrow_data = reader.table_to_arrow("my_table")?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::connection::Connection;
32use crate::error::Result;
33
34/// Reads query results in Arrow IPC stream format.
35///
36/// `ArrowReader` provides methods to execute queries and receive results as
37/// Arrow IPC stream data. This is useful for integration with Arrow-based
38/// data processing pipelines.
39///
40/// # How It Works
41///
42/// Internally, `ArrowReader` uses `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
43/// to retrieve query results in Arrow format. The returned bytes are a valid
44/// Arrow IPC stream containing:
45/// 1. A schema message
46/// 2. One or more record batch messages
47///
48/// # Example
49///
50/// ```no_run
51/// use hyperdb_api::{ArrowReader, Connection, CreateMode, Result};
52///
53/// fn main() -> Result<()> {
54/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
55///
56/// // Create and populate a table
57/// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
58/// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5), (3, 3.5)")?;
59///
60/// // Read the table as Arrow
61/// let reader = ArrowReader::new(&conn);
62/// let arrow_data = reader.table_to_arrow("data")?;
63///
64/// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
65/// Ok(())
66/// }
67/// ```
68#[derive(Debug)]
69pub struct ArrowReader<'conn> {
70 connection: &'conn Connection,
71}
72
73impl<'conn> ArrowReader<'conn> {
74 /// Creates a new Arrow reader for the given connection.
75 pub fn new(connection: &'conn Connection) -> Self {
76 ArrowReader { connection }
77 }
78
79 /// Executes a SELECT query and returns results as Arrow IPC stream.
80 ///
81 /// The query should be a SELECT statement. It will be wrapped in a
82 /// `COPY (...) TO STDOUT WITH (format arrowstream)` to retrieve the
83 /// results in Arrow format.
84 ///
85 /// # Arguments
86 ///
87 /// * `select_query` - A SELECT query (without COPY wrapper)
88 ///
89 /// # Returns
90 ///
91 /// Raw Arrow IPC stream bytes that can be parsed by Arrow libraries.
92 ///
93 /// # Example
94 ///
95 /// ```no_run
96 /// # use hyperdb_api::{ArrowReader, Connection, Result};
97 /// # fn example(conn: &Connection) -> Result<()> {
98 /// let reader = ArrowReader::new(&conn);
99 /// let arrow_data = reader.query_to_arrow("SELECT id, name FROM users WHERE active = true")?;
100 /// # Ok(())
101 /// # }
102 /// ```
103 ///
104 /// # Errors
105 ///
106 /// - Returns [`crate::Error::FeatureNotSupported`] if the connection is using gRPC
107 /// transport (ArrowReader wraps `COPY TO STDOUT`, which is TCP-only).
108 /// - Returns [`crate::Error::Server`] if the server rejects the
109 /// `COPY (<query>) TO STDOUT WITH (format arrowstream)` statement.
110 /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
111 pub fn query_to_arrow(&self, select_query: &str) -> Result<Vec<u8>> {
112 let copy_query = format!("COPY ({select_query}) TO STDOUT WITH (format arrowstream)");
113 let client = self.connection.tcp_client().ok_or_else(|| {
114 crate::Error::feature_not_supported(
115 "ArrowReader requires a TCP connection. Use Connection::execute_query_to_arrow() for gRPC."
116 )
117 })?;
118 Ok(client.copy_out(©_query)?)
119 }
120
121 /// Exports an entire table to Arrow IPC stream format.
122 ///
123 /// This is equivalent to `query_to_arrow("SELECT * FROM table_name")`.
124 ///
125 /// # Arguments
126 ///
127 /// * `table_name` - The table name (should be properly escaped if needed)
128 ///
129 /// # Returns
130 ///
131 /// Raw Arrow IPC stream bytes containing all rows from the table.
132 ///
133 /// # Example
134 ///
135 /// ```no_run
136 /// # use hyperdb_api::{ArrowReader, Connection, Result};
137 /// # fn example(conn: &Connection) -> Result<()> {
138 /// let reader = ArrowReader::new(&conn);
139 /// let arrow_data = reader.table_to_arrow("my_table")?;
140 /// # Ok(())
141 /// # }
142 /// ```
143 ///
144 /// # Errors
145 ///
146 /// See [`query_to_arrow`](Self::query_to_arrow).
147 pub fn table_to_arrow(&self, table_name: &str) -> Result<Vec<u8>> {
148 self.query_to_arrow(&format!("SELECT * FROM {table_name}"))
149 }
150
151 /// Exports specific columns from a table to Arrow IPC stream format.
152 ///
153 /// # Arguments
154 ///
155 /// * `table_name` - The table name
156 /// * `columns` - Column names to export
157 ///
158 /// # Returns
159 ///
160 /// Raw Arrow IPC stream bytes containing the specified columns.
161 ///
162 /// # Example
163 ///
164 /// ```no_run
165 /// # use hyperdb_api::{ArrowReader, Connection, Result};
166 /// # fn example(conn: &Connection) -> Result<()> {
167 /// let reader = ArrowReader::new(&conn);
168 /// let arrow_data = reader.table_columns_to_arrow("users", &["id", "name", "email"])?;
169 /// # Ok(())
170 /// # }
171 /// ```
172 ///
173 /// # Errors
174 ///
175 /// See [`query_to_arrow`](Self::query_to_arrow).
176 pub fn table_columns_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<Vec<u8>> {
177 let column_list = columns.join(", ");
178 self.query_to_arrow(&format!("SELECT {column_list} FROM {table_name}"))
179 }
180
181 /// Exports a table with a WHERE clause to Arrow IPC stream format.
182 ///
183 /// # Arguments
184 ///
185 /// * `table_name` - The table name
186 /// * `where_clause` - The WHERE clause (without the "WHERE" keyword)
187 ///
188 /// # Returns
189 ///
190 /// Raw Arrow IPC stream bytes containing filtered rows.
191 ///
192 /// # Example
193 ///
194 /// ```no_run
195 /// # use hyperdb_api::{ArrowReader, Connection, Result};
196 /// # fn example(conn: &Connection) -> Result<()> {
197 /// let reader = ArrowReader::new(&conn);
198 /// let arrow_data = reader.table_filtered_to_arrow("users", "active = true")?;
199 /// # Ok(())
200 /// # }
201 /// ```
202 ///
203 /// # Errors
204 ///
205 /// See [`query_to_arrow`](Self::query_to_arrow).
206 pub fn table_filtered_to_arrow(&self, table_name: &str, where_clause: &str) -> Result<Vec<u8>> {
207 self.query_to_arrow(&format!("SELECT * FROM {table_name} WHERE {where_clause}"))
208 }
209}
210
#[cfg(test)]
mod tests {
    // Intentionally empty: exercising `ArrowReader` requires a running Hyper
    // instance, so its tests live in the integration suite.
    // See hyperdb-api/tests/arrow_reader_tests.rs
}
215}