hyperdb_api/copy.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! CSV/text export and import via COPY protocol.
5//!
6//! This module provides ergonomic APIs for:
7//! - **Exporting** query results or tables as CSV/TSV to files, writers, or strings
8//! - **Importing** CSV/TSV data from files, readers, or strings into tables
9//! - **Streaming export** for large datasets without buffering all data in memory
10//!
11//! # CSV Export
12//!
13//! ```no_run
14//! use hyperdb_api::{Connection, CreateMode, Result};
15//! use hyperdb_api::copy::CopyOptions;
16//!
17//! fn main() -> Result<()> {
18//! let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
19//!
20//! // Export to a file
21//! conn.export_csv("SELECT * FROM users", &mut std::fs::File::create("users.csv")?)?;
22//!
23//! // Export with custom options
24//! let opts = CopyOptions::csv().with_header(true).with_delimiter(b'\t');
25//! conn.export_text("SELECT * FROM users", &opts, &mut std::io::stdout())?;
26//!
27//! Ok(())
28//! }
29//! ```
30//!
31//! # CSV Import
32//!
33//! ```no_run
34//! use hyperdb_api::{Connection, CreateMode, Result};
35//!
36//! fn main() -> Result<()> {
37//! let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
38//!
//!     // Import from an in-memory byte slice (any `std::io::Read` source works)
40//! let csv_data = b"1,Alice\n2,Bob\n";
41//! let rows = conn.import_csv("my_table", &csv_data[..])?;
42//! println!("Imported {} rows", rows);
43//!
44//! Ok(())
45//! }
46//! ```
47
48use crate::connection::Connection;
49use crate::error::{Error, Result};
50
/// Buffer size, in bytes, used for streaming imports when the options do not
/// specify a chunk size (1 MB).
const DEFAULT_IMPORT_CHUNK_SIZE: usize = 1 << 20;
53
/// Options for COPY text format operations (CSV, TSV, etc.).
///
/// Start from [`CopyOptions::csv`], [`CopyOptions::tsv`] or
/// [`CopyOptions::text`] and refine the value with the chained `with_*`
/// setters. Optional settings left as `None` are omitted from the generated
/// `WITH (...)` clause.
///
/// # Example
///
/// ```
/// use hyperdb_api::copy::CopyOptions;
///
/// // CSV with header
/// let opts = CopyOptions::csv().with_header(true);
///
/// // TSV (tab-separated)
/// let opts = CopyOptions::tsv();
///
/// // Custom delimiter
/// let opts = CopyOptions::csv().with_delimiter(b'|');
/// ```
#[derive(Debug, Clone)]
pub struct CopyOptions {
    /// Format keyword emitted in the `FORMAT` clause.
    format: CopyFormat,
    /// When `true`, `HEADER true` is added to the clause.
    header: bool,
    /// Column delimiter byte; emitted as a hex-escaped `DELIMITER` when set.
    delimiter: Option<u8>,
    /// String representing NULL; emitted as a single-quoted `NULL` literal when set.
    null_string: Option<String>,
    /// Quote byte; emitted as `QUOTE` when set. Rejected by validation unless the format is CSV.
    quote: Option<u8>,
    /// Escape byte; emitted as `ESCAPE` when set. Rejected by validation unless the format is CSV.
    escape: Option<u8>,
    /// Import chunk size in bytes. `None` means use the default (1 MB).
    chunk_size: Option<usize>,
}
81
/// COPY format type.
///
/// Selects the keyword written into the `FORMAT` clause of the generated
/// COPY options.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CopyFormat {
    /// Emitted as `FORMAT csv`; the only format for which QUOTE and ESCAPE
    /// options pass validation.
    Csv,
    /// Emitted as `FORMAT text`; used by both the `tsv()` and `text()`
    /// constructors.
    Text,
}
88
89impl CopyOptions {
90 /// Creates CSV format options (comma-separated, no header by default).
91 #[must_use]
92 pub fn csv() -> Self {
93 CopyOptions {
94 format: CopyFormat::Csv,
95 header: false,
96 delimiter: None,
97 null_string: None,
98 quote: None,
99 escape: None,
100 chunk_size: None,
101 }
102 }
103
104 /// Creates TSV format options (tab-separated).
105 #[must_use]
106 pub fn tsv() -> Self {
107 CopyOptions {
108 format: CopyFormat::Text,
109 header: false,
110 delimiter: Some(b'\t'),
111 null_string: None,
112 quote: None,
113 escape: None,
114 chunk_size: None,
115 }
116 }
117
118 /// Creates plain text format options (tab-separated, Hyper default).
119 #[must_use]
120 pub fn text() -> Self {
121 CopyOptions {
122 format: CopyFormat::Text,
123 header: false,
124 delimiter: None,
125 null_string: None,
126 quote: None,
127 escape: None,
128 chunk_size: None,
129 }
130 }
131
132 /// Enables or disables the header row.
133 #[must_use]
134 pub fn with_header(mut self, header: bool) -> Self {
135 self.header = header;
136 self
137 }
138
139 /// Sets the column delimiter character.
140 #[must_use]
141 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
142 self.delimiter = Some(delimiter);
143 self
144 }
145
146 #[must_use]
147 /// Sets the string used to represent NULL values.
148 pub fn with_null(mut self, null_string: impl Into<String>) -> Self {
149 self.null_string = Some(null_string.into());
150 self
151 }
152
153 /// Sets the quote character for CSV format.
154 #[must_use]
155 pub fn with_quote(mut self, quote: u8) -> Self {
156 self.quote = Some(quote);
157 self
158 }
159
160 /// Sets the escape character for CSV format.
161 #[must_use]
162 pub fn with_escape(mut self, escape: u8) -> Self {
163 self.escape = Some(escape);
164 self
165 }
166
167 /// Sets the import chunk size in bytes.
168 ///
169 /// Controls the buffer size used when streaming data from a reader
170 /// during [`import_text()`](Connection::import_text). Larger values
171 /// reduce syscall overhead but use more memory.
172 ///
173 /// Default is 1 MB. Typical range: 64 KB to 16 MB.
174 ///
175 /// # Panics
176 ///
177 /// Panics if `size` is 0.
178 #[must_use]
179 pub fn with_chunk_size(mut self, size: usize) -> Self {
180 assert!(size > 0, "chunk size must be > 0");
181 self.chunk_size = Some(size);
182 self
183 }
184
185 /// Validates that the option combination is legal.
186 ///
187 /// Catches invalid combinations early (e.g., CSV-only options on TEXT
188 /// format) instead of letting them surface as opaque server errors.
189 fn validate(&self) -> Result<()> {
190 if self.format == CopyFormat::Text {
191 if self.quote.is_some() {
192 return Err(Error::new(
193 "QUOTE option is only supported with CSV format. \
194 Use CopyOptions::csv() instead of CopyOptions::text().",
195 ));
196 }
197 if self.escape.is_some() {
198 return Err(Error::new(
199 "ESCAPE option is only supported with CSV format. \
200 Use CopyOptions::csv() instead of CopyOptions::text().",
201 ));
202 }
203 }
204 Ok(())
205 }
206
207 /// Builds the WITH clause for COPY TO STDOUT.
208 fn to_copy_out_options(&self) -> String {
209 let mut parts = Vec::new();
210 match self.format {
211 CopyFormat::Csv => parts.push("FORMAT csv".to_string()),
212 CopyFormat::Text => parts.push("FORMAT text".to_string()),
213 }
214 if self.header {
215 parts.push("HEADER true".to_string());
216 }
217 if let Some(d) = self.delimiter {
218 parts.push(format!("DELIMITER E'\\x{d:02x}'"));
219 }
220 if let Some(ref n) = self.null_string {
221 parts.push(format!("NULL '{}'", n.replace('\'', "''")));
222 }
223 if let Some(q) = self.quote {
224 parts.push(format!("QUOTE E'\\x{q:02x}'"));
225 }
226 if let Some(e) = self.escape {
227 parts.push(format!("ESCAPE E'\\x{e:02x}'"));
228 }
229 format!("WITH ({})", parts.join(", "))
230 }
231
232 /// Builds the WITH clause for COPY FROM STDIN.
233 fn to_copy_in_options(&self) -> String {
234 self.to_copy_out_options()
235 }
236}
237
238impl Connection {
239 /// Exports query results as CSV to a writer.
240 ///
241 /// Uses default CSV options (comma-separated, with header row).
242 /// For custom options, use [`export_text`](Self::export_text).
243 ///
244 /// This method streams data directly to the writer without buffering
245 /// the entire result set in memory, making it safe for large exports.
246 ///
247 /// Returns the number of bytes written.
248 ///
249 /// # Example
250 ///
251 /// ```no_run
252 /// use hyperdb_api::{Connection, CreateMode, Result};
253 ///
254 /// fn main() -> Result<()> {
255 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
256 ///
257 /// // Export to file
258 /// let mut file = std::fs::File::create("output.csv")?;
259 /// conn.export_csv("SELECT * FROM users", &mut file)?;
260 ///
261 /// // Export to string
262 /// let mut buf = Vec::new();
263 /// conn.export_csv("SELECT * FROM users", &mut buf)?;
264 /// let csv_string = String::from_utf8(buf).unwrap();
265 /// Ok(())
266 /// }
267 /// ```
268 ///
269 /// # Errors
270 ///
271 /// - Returns [`Error::Other`] if the connection is using gRPC transport
272 /// (COPY is TCP-only).
273 /// - Returns [`Error::Client`] if the server rejects the
274 /// `COPY (<select_query>) TO STDOUT` statement.
275 /// - Returns [`Error::Io`] if writing to `writer` fails.
276 pub fn export_csv(&self, select_query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
277 let opts = CopyOptions::csv().with_header(true);
278 self.export_text(select_query, &opts, writer)
279 }
280
281 /// Exports query results as text (CSV/TSV/custom) to a writer.
282 ///
283 /// This method streams data directly to the writer without buffering
284 /// the entire result set in memory.
285 ///
286 /// Returns the number of bytes written.
287 ///
288 /// # Example
289 ///
290 /// ```no_run
291 /// use hyperdb_api::{Connection, CreateMode, Result};
292 /// use hyperdb_api::copy::CopyOptions;
293 ///
294 /// fn main() -> Result<()> {
295 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
296 ///
297 /// // TSV export
298 /// let opts = CopyOptions::tsv().with_header(true);
299 /// let mut buf = Vec::new();
300 /// conn.export_text("SELECT * FROM users", &opts, &mut buf)?;
301 ///
302 /// // Pipe-separated with custom NULL
303 /// let opts = CopyOptions::csv()
304 /// .with_delimiter(b'|')
305 /// .with_null("\\N".to_string())
306 /// .with_header(true);
307 /// conn.export_text("SELECT * FROM users", &opts, &mut std::io::stdout())?;
308 /// Ok(())
309 /// }
310 /// ```
311 ///
312 /// # Errors
313 ///
314 /// - Returns [`Error::Other`] if `options` fail validation (e.g. an
315 /// illegal delimiter/quote combination) or the connection is on
316 /// gRPC.
317 /// - Returns [`Error::Client`] if the server rejects the
318 /// `COPY TO STDOUT` statement.
319 /// - Returns [`Error::Io`] if writing to `writer` fails.
320 pub fn export_text(
321 &self,
322 select_query: &str,
323 options: &CopyOptions,
324 writer: &mut dyn std::io::Write,
325 ) -> Result<u64> {
326 options.validate()?;
327 let copy_query = format!(
328 "COPY ({}) TO STDOUT {}",
329 select_query,
330 options.to_copy_out_options()
331 );
332 let client = self.tcp_client().ok_or_else(|| {
333 Error::new(
334 "CSV export requires a TCP connection. gRPC does not support COPY operations.",
335 )
336 })?;
337 Ok(client.copy_out_to_writer(©_query, writer)?)
338 }
339
340 /// Exports query results as CSV to a String.
341 ///
342 /// Convenience method that collects the CSV output into a String.
343 /// For large datasets, prefer [`export_csv`](Self::export_csv) with a file writer.
344 ///
345 /// # Example
346 ///
347 /// ```no_run
348 /// use hyperdb_api::{Connection, CreateMode, Result};
349 ///
350 /// fn main() -> Result<()> {
351 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
352 /// let csv = conn.export_csv_string("SELECT id, name FROM users")?;
353 /// println!("{}", csv);
354 /// Ok(())
355 /// }
356 /// ```
357 ///
358 /// # Errors
359 ///
360 /// - Returns whatever [`export_csv`](Self::export_csv) returns.
361 /// - Returns [`Error::Other`] with message
362 /// `"CSV output is not valid UTF-8"` if the server emitted bytes that
363 /// are not valid UTF-8 (a Hyper server only emits UTF-8, so this
364 /// indicates a non-UTF-8 `CLIENT_ENCODING` setting).
365 pub fn export_csv_string(&self, select_query: &str) -> Result<String> {
366 let mut buf = Vec::new();
367 self.export_csv(select_query, &mut buf)?;
368 String::from_utf8(buf)
369 .map_err(|e| Error::new(format!("CSV output is not valid UTF-8: {e}")))
370 }
371
372 /// Imports CSV data from a reader into a table.
373 ///
374 /// Uses default CSV options (comma-separated, no header).
375 /// For custom options, use [`import_text`](Self::import_text).
376 ///
377 /// Returns the number of rows imported.
378 ///
379 /// # Example
380 ///
381 /// ```no_run
382 /// use hyperdb_api::{Connection, CreateMode, Result};
383 ///
384 /// fn main() -> Result<()> {
385 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
386 ///
387 /// // Import from a string
388 /// let csv = "1,Alice\n2,Bob\n";
389 /// let rows = conn.import_csv("users", csv.as_bytes())?;
390 ///
391 /// // Import from a file
392 /// let file = std::fs::File::open("data.csv")?;
393 /// let rows = conn.import_csv("users", file)?;
394 /// Ok(())
395 /// }
396 /// ```
397 ///
398 /// # Errors
399 ///
400 /// See [`import_text`](Self::import_text).
401 pub fn import_csv(&self, table_name: &str, reader: impl std::io::Read) -> Result<u64> {
402 let opts = CopyOptions::csv();
403 self.import_text(table_name, &opts, reader)
404 }
405
406 /// Imports CSV data with header from a reader into a table.
407 ///
408 /// The first line is treated as a header row and skipped.
409 ///
410 /// Returns the number of rows imported.
411 ///
412 /// # Errors
413 ///
414 /// See [`import_text`](Self::import_text).
415 pub fn import_csv_with_header(
416 &self,
417 table_name: &str,
418 reader: impl std::io::Read,
419 ) -> Result<u64> {
420 let opts = CopyOptions::csv().with_header(true);
421 self.import_text(table_name, &opts, reader)
422 }
423
424 /// Imports text-format data (CSV/TSV/custom) from a reader into a table.
425 ///
426 /// Streams data in chunks to the server via COPY FROM STDIN, keeping
427 /// memory usage constant regardless of input size.
428 ///
429 /// Returns the number of rows imported.
430 ///
431 /// # Example
432 ///
433 /// ```no_run
434 /// use hyperdb_api::{Connection, CreateMode, Result};
435 /// use hyperdb_api::copy::CopyOptions;
436 ///
437 /// fn main() -> Result<()> {
438 /// let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
439 ///
440 /// // Import TSV data
441 /// let opts = CopyOptions::tsv();
442 /// let tsv = "1\tAlice\n2\tBob\n";
443 /// let rows = conn.import_text("users", &opts, tsv.as_bytes())?;
444 ///
445 /// // Import pipe-delimited with header
446 /// let opts = CopyOptions::csv().with_delimiter(b'|').with_header(true);
447 /// let data = "id|name\n1|Alice\n2|Bob\n";
448 /// let rows = conn.import_text("users", &opts, data.as_bytes())?;
449 /// Ok(())
450 /// }
451 /// ```
452 ///
453 /// # Errors
454 ///
455 /// - Returns [`Error::Other`] if `options` fail validation or the
456 /// connection is on gRPC.
457 /// - Returns [`Error::Client`] if the server rejects the
458 /// `COPY <table> FROM STDIN` statement or a row during import.
459 /// - Returns [`Error::Io`] if reading from `reader` fails.
460 pub fn import_text(
461 &self,
462 table_name: &str,
463 options: &CopyOptions,
464 mut reader: impl std::io::Read,
465 ) -> Result<u64> {
466 options.validate()?;
467 let escaped_table = table_name.replace('"', "\"\"");
468 let copy_query = format!(
469 "COPY \"{}\" FROM STDIN {}",
470 escaped_table,
471 options.to_copy_in_options()
472 );
473
474 let client = self.tcp_client().ok_or_else(|| {
475 Error::new(
476 "CSV import requires a TCP connection. gRPC does not support COPY operations.",
477 )
478 })?;
479
480 let mut writer = client.copy_in_raw(©_query)?;
481
482 // Stream data in chunks (default 1 MB, configurable via with_chunk_size)
483 let chunk_size = options.chunk_size.unwrap_or(DEFAULT_IMPORT_CHUNK_SIZE);
484 let mut buf = vec![0u8; chunk_size];
485 loop {
486 let n = reader
487 .read(&mut buf)
488 .map_err(|e| Error::with_cause("Failed to read import data", e))?;
489 if n == 0 {
490 break;
491 }
492 writer.send(&buf[..n])?;
493 }
494
495 Ok(writer.finish()?)
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// QUOTE and ESCAPE are both accepted on the CSV format.
    #[test]
    fn test_csv_options_valid() {
        let csv = CopyOptions::csv().with_quote(b'"').with_escape(b'\\');
        assert!(csv.validate().is_ok());
    }

    /// QUOTE on the plain text format is rejected with a message naming the
    /// option and the required format.
    #[test]
    fn test_text_quote_rejected() {
        let message = CopyOptions::text()
            .with_quote(b'"')
            .validate()
            .unwrap_err()
            .to_string();
        assert!(message.contains("QUOTE"));
        assert!(message.contains("CSV format"));
    }

    /// ESCAPE on the plain text format is rejected with a message naming the
    /// option and the required format.
    #[test]
    fn test_text_escape_rejected() {
        let message = CopyOptions::text()
            .with_escape(b'\\')
            .validate()
            .unwrap_err()
            .to_string();
        assert!(message.contains("ESCAPE"));
        assert!(message.contains("CSV format"));
    }

    /// TSV uses the text format, so QUOTE is rejected there as well.
    #[test]
    fn test_tsv_quote_rejected() {
        let message = CopyOptions::tsv()
            .with_quote(b'"')
            .validate()
            .unwrap_err()
            .to_string();
        assert!(message.contains("QUOTE"));
    }

    /// Header and delimiter carry no format restriction.
    #[test]
    fn test_text_without_csv_options_valid() {
        let plain = CopyOptions::text().with_header(true).with_delimiter(b'|');
        assert!(plain.validate().is_ok());
    }

    /// An explicit chunk size is stored as given.
    #[test]
    fn test_chunk_size_custom() {
        let four_mb = 4 * 1024 * 1024;
        let configured = CopyOptions::csv().with_chunk_size(four_mb);
        assert_eq!(configured.chunk_size, Some(four_mb));
    }

    /// With no explicit chunk size the 1 MB default applies.
    #[test]
    fn test_chunk_size_default() {
        let effective = CopyOptions::csv()
            .chunk_size
            .unwrap_or(DEFAULT_IMPORT_CHUNK_SIZE);
        assert_eq!(effective, 1024 * 1024);
    }

    /// A zero chunk size violates the setter's precondition.
    #[test]
    #[should_panic(expected = "chunk size must be > 0")]
    fn test_chunk_size_zero_panics() {
        let _ = CopyOptions::csv().with_chunk_size(0);
    }

    /// The import clause for CSV carries format, header and delimiter.
    #[test]
    fn test_copy_in_options_csv() {
        let clause = CopyOptions::csv()
            .with_header(true)
            .with_delimiter(b'|')
            .to_copy_in_options();
        for expected in ["FORMAT csv", "HEADER true", "DELIMITER"] {
            assert!(clause.contains(expected));
        }
    }

    /// The import clause for plain text names the text format.
    #[test]
    fn test_copy_in_options_text() {
        let clause = CopyOptions::text().to_copy_in_options();
        assert!(clause.contains("FORMAT text"));
    }
}