hyperdb_api/arrow_reader.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Arrow IPC stream reader for query results.
5//!
6//! This module provides the [`ArrowReader`] struct for reading query results
7//! in Arrow IPC stream format.
8//!
9//! Unlike regular query execution which returns row-by-row results, `ArrowReader`
10//! returns complete Arrow IPC streams, which can be directly consumed by Arrow
11//! libraries like the `arrow` crate.
12//!
13//! # Example
14//!
15//! ```no_run
16//! # use hyperdb_api::{ArrowReader, Connection, Result};
17//! # fn example(conn: &Connection) -> Result<()> {
18//! use hyperdb_api::{ArrowReader, Connection};
19//!
20//! let reader = ArrowReader::new(&conn);
21//!
22//! // Query results as Arrow IPC stream
23//! let arrow_data = reader.query_to_arrow("SELECT * FROM my_table")?;
24//!
25//! // Or export an entire table
26//! let arrow_data = reader.table_to_arrow("my_table")?;
27//! # Ok(())
28//! # }
29//! ```
30
31use crate::connection::Connection;
32use crate::error::Result;
33
34/// Reads query results in Arrow IPC stream format.
35///
36/// `ArrowReader` provides methods to execute queries and receive results as
37/// Arrow IPC stream data. This is useful for integration with Arrow-based
38/// data processing pipelines.
39///
40/// # How It Works
41///
42/// Internally, `ArrowReader` uses `COPY (SELECT ...) TO STDOUT WITH (format arrowstream)`
43/// to retrieve query results in Arrow format. The returned bytes are a valid
44/// Arrow IPC stream containing:
45/// 1. A schema message
46/// 2. One or more record batch messages
47///
48/// # Example
49///
50/// ```no_run
51/// use hyperdb_api::{ArrowReader, Connection, CreateMode, Result};
52///
53/// fn main() -> Result<()> {
54/// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::CreateIfNotExists)?;
55///
56/// // Create and populate a table
57/// conn.execute_command("CREATE TABLE data (id INT, value DOUBLE PRECISION)")?;
58/// conn.execute_command("INSERT INTO data VALUES (1, 1.5), (2, 2.5), (3, 3.5)")?;
59///
60/// // Read the table as Arrow
61/// let reader = ArrowReader::new(&conn);
62/// let arrow_data = reader.table_to_arrow("data")?;
63///
64/// println!("Got {} bytes of Arrow IPC data", arrow_data.len());
65/// Ok(())
66/// }
67/// ```
68#[derive(Debug)]
69pub struct ArrowReader<'conn> {
70 connection: &'conn Connection,
71}
72
73impl<'conn> ArrowReader<'conn> {
74 /// Creates a new Arrow reader for the given connection.
75 pub fn new(connection: &'conn Connection) -> Self {
76 ArrowReader { connection }
77 }
78
79 /// Executes a SELECT query and returns results as Arrow IPC stream.
80 ///
81 /// The query should be a SELECT statement. It will be wrapped in a
82 /// `COPY (...) TO STDOUT WITH (format arrowstream)` to retrieve the
83 /// results in Arrow format.
84 ///
85 /// # Arguments
86 ///
87 /// * `select_query` - A SELECT query (without COPY wrapper)
88 ///
89 /// # Returns
90 ///
91 /// Raw Arrow IPC stream bytes that can be parsed by Arrow libraries.
92 ///
93 /// # Example
94 ///
95 /// ```no_run
96 /// # use hyperdb_api::{ArrowReader, Connection, Result};
97 /// # fn example(conn: &Connection) -> Result<()> {
98 /// let reader = ArrowReader::new(&conn);
99 /// let arrow_data = reader.query_to_arrow("SELECT id, name FROM users WHERE active = true")?;
100 /// # Ok(())
101 /// # }
102 /// ```
103 ///
104 /// # Errors
105 ///
106 /// - Returns [`crate::Error::Other`] if the connection is using gRPC
107 /// transport (ArrowReader wraps `COPY TO STDOUT`, which is TCP-only).
108 /// - Returns [`crate::Error::Client`] if the server rejects the
109 /// `COPY (<query>) TO STDOUT WITH (format arrowstream)` statement.
110 /// - Returns [`crate::Error::Io`] on transport-level I/O failures.
111 pub fn query_to_arrow(&self, select_query: &str) -> Result<Vec<u8>> {
112 let copy_query = format!("COPY ({select_query}) TO STDOUT WITH (format arrowstream)");
113 let client = self.connection.tcp_client().ok_or_else(|| {
114 crate::Error::new("ArrowReader requires a TCP connection. Use Connection::execute_query_to_arrow() for gRPC.")
115 })?;
116 Ok(client.copy_out(©_query)?)
117 }
118
119 /// Exports an entire table to Arrow IPC stream format.
120 ///
121 /// This is equivalent to `query_to_arrow("SELECT * FROM table_name")`.
122 ///
123 /// # Arguments
124 ///
125 /// * `table_name` - The table name (should be properly escaped if needed)
126 ///
127 /// # Returns
128 ///
129 /// Raw Arrow IPC stream bytes containing all rows from the table.
130 ///
131 /// # Example
132 ///
133 /// ```no_run
134 /// # use hyperdb_api::{ArrowReader, Connection, Result};
135 /// # fn example(conn: &Connection) -> Result<()> {
136 /// let reader = ArrowReader::new(&conn);
137 /// let arrow_data = reader.table_to_arrow("my_table")?;
138 /// # Ok(())
139 /// # }
140 /// ```
141 ///
142 /// # Errors
143 ///
144 /// See [`query_to_arrow`](Self::query_to_arrow).
145 pub fn table_to_arrow(&self, table_name: &str) -> Result<Vec<u8>> {
146 self.query_to_arrow(&format!("SELECT * FROM {table_name}"))
147 }
148
149 /// Exports specific columns from a table to Arrow IPC stream format.
150 ///
151 /// # Arguments
152 ///
153 /// * `table_name` - The table name
154 /// * `columns` - Column names to export
155 ///
156 /// # Returns
157 ///
158 /// Raw Arrow IPC stream bytes containing the specified columns.
159 ///
160 /// # Example
161 ///
162 /// ```no_run
163 /// # use hyperdb_api::{ArrowReader, Connection, Result};
164 /// # fn example(conn: &Connection) -> Result<()> {
165 /// let reader = ArrowReader::new(&conn);
166 /// let arrow_data = reader.table_columns_to_arrow("users", &["id", "name", "email"])?;
167 /// # Ok(())
168 /// # }
169 /// ```
170 ///
171 /// # Errors
172 ///
173 /// See [`query_to_arrow`](Self::query_to_arrow).
174 pub fn table_columns_to_arrow(&self, table_name: &str, columns: &[&str]) -> Result<Vec<u8>> {
175 let column_list = columns.join(", ");
176 self.query_to_arrow(&format!("SELECT {column_list} FROM {table_name}"))
177 }
178
179 /// Exports a table with a WHERE clause to Arrow IPC stream format.
180 ///
181 /// # Arguments
182 ///
183 /// * `table_name` - The table name
184 /// * `where_clause` - The WHERE clause (without the "WHERE" keyword)
185 ///
186 /// # Returns
187 ///
188 /// Raw Arrow IPC stream bytes containing filtered rows.
189 ///
190 /// # Example
191 ///
192 /// ```no_run
193 /// # use hyperdb_api::{ArrowReader, Connection, Result};
194 /// # fn example(conn: &Connection) -> Result<()> {
195 /// let reader = ArrowReader::new(&conn);
196 /// let arrow_data = reader.table_filtered_to_arrow("users", "active = true")?;
197 /// # Ok(())
198 /// # }
199 /// ```
200 ///
201 /// # Errors
202 ///
203 /// See [`query_to_arrow`](Self::query_to_arrow).
204 pub fn table_filtered_to_arrow(&self, table_name: &str, where_clause: &str) -> Result<Vec<u8>> {
205 self.query_to_arrow(&format!("SELECT * FROM {table_name} WHERE {where_clause}"))
206 }
207}
208
#[cfg(test)]
mod tests {
    // Left empty on purpose: exercising `ArrowReader` requires a running
    // Hyper instance, so its coverage lives in the integration tests.
    // See hyperdb-api/tests/arrow_reader_tests.rs
}