Skip to main content

hyperdb_api/
copy.rs

1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! CSV/text export and import via COPY protocol.
5//!
6//! This module provides ergonomic APIs for:
7//! - **Exporting** query results or tables as CSV/TSV to files, writers, or strings
8//! - **Importing** CSV/TSV data from files, readers, or strings into tables
9//! - **Streaming export** for large datasets without buffering all data in memory
10//!
11//! # CSV Export
12//!
13//! ```no_run
14//! use hyperdb_api::{Connection, CreateMode, Result};
15//! use hyperdb_api::copy::CopyOptions;
16//!
17//! fn main() -> Result<()> {
18//!     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
19//!
20//!     // Export to a file
21//!     conn.export_csv("SELECT * FROM users", &mut std::fs::File::create("users.csv")?)?;
22//!
23//!     // Export with custom options
24//!     let opts = CopyOptions::csv().with_header(true).with_delimiter(b'\t');
25//!     conn.export_text("SELECT * FROM users", &opts, &mut std::io::stdout())?;
26//!
27//!     Ok(())
28//! }
29//! ```
30//!
31//! # CSV Import
32//!
33//! ```no_run
34//! use hyperdb_api::{Connection, CreateMode, Result};
35//!
36//! fn main() -> Result<()> {
37//!     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
38//!
//!     // Import from an in-memory byte slice (any `std::io::Read` source works)
40//!     let csv_data = b"1,Alice\n2,Bob\n";
41//!     let rows = conn.import_csv("my_table", &csv_data[..])?;
42//!     println!("Imported {} rows", rows);
43//!
44//!     Ok(())
45//! }
46//! ```
47
48use crate::connection::Connection;
49use crate::error::{Error, Result};
50
/// Read-buffer size in bytes (1 MiB) that `import_text` falls back to when
/// the caller has not chosen one with `CopyOptions::with_chunk_size`.
const DEFAULT_IMPORT_CHUNK_SIZE: usize = 1 << 20;
53
54/// Options for COPY text format operations (CSV, TSV, etc.).
55///
56/// # Example
57///
58/// ```
59/// use hyperdb_api::copy::CopyOptions;
60///
61/// // CSV with header
62/// let opts = CopyOptions::csv().with_header(true);
63///
64/// // TSV (tab-separated)
65/// let opts = CopyOptions::tsv();
66///
67/// // Custom delimiter
68/// let opts = CopyOptions::csv().with_delimiter(b'|');
69/// ```
70#[derive(Debug, Clone)]
71pub struct CopyOptions {
72    format: CopyFormat,
73    header: bool,
74    delimiter: Option<u8>,
75    null_string: Option<String>,
76    quote: Option<u8>,
77    escape: Option<u8>,
78    /// Import chunk size in bytes. `None` means use the default (1 MB).
79    chunk_size: Option<usize>,
80}
81
/// Wire format named in the `FORMAT` entry of a COPY statement.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum CopyFormat {
    /// Rendered as `FORMAT csv`.
    Csv,
    /// Rendered as `FORMAT text`.
    Text,
}
88
89impl CopyOptions {
90    /// Creates CSV format options (comma-separated, no header by default).
91    #[must_use]
92    pub fn csv() -> Self {
93        CopyOptions {
94            format: CopyFormat::Csv,
95            header: false,
96            delimiter: None,
97            null_string: None,
98            quote: None,
99            escape: None,
100            chunk_size: None,
101        }
102    }
103
104    /// Creates TSV format options (tab-separated).
105    #[must_use]
106    pub fn tsv() -> Self {
107        CopyOptions {
108            format: CopyFormat::Text,
109            header: false,
110            delimiter: Some(b'\t'),
111            null_string: None,
112            quote: None,
113            escape: None,
114            chunk_size: None,
115        }
116    }
117
118    /// Creates plain text format options (tab-separated, Hyper default).
119    #[must_use]
120    pub fn text() -> Self {
121        CopyOptions {
122            format: CopyFormat::Text,
123            header: false,
124            delimiter: None,
125            null_string: None,
126            quote: None,
127            escape: None,
128            chunk_size: None,
129        }
130    }
131
132    /// Enables or disables the header row.
133    #[must_use]
134    pub fn with_header(mut self, header: bool) -> Self {
135        self.header = header;
136        self
137    }
138
139    /// Sets the column delimiter character.
140    #[must_use]
141    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
142        self.delimiter = Some(delimiter);
143        self
144    }
145
146    #[must_use]
147    /// Sets the string used to represent NULL values.
148    pub fn with_null(mut self, null_string: impl Into<String>) -> Self {
149        self.null_string = Some(null_string.into());
150        self
151    }
152
153    /// Sets the quote character for CSV format.
154    #[must_use]
155    pub fn with_quote(mut self, quote: u8) -> Self {
156        self.quote = Some(quote);
157        self
158    }
159
160    /// Sets the escape character for CSV format.
161    #[must_use]
162    pub fn with_escape(mut self, escape: u8) -> Self {
163        self.escape = Some(escape);
164        self
165    }
166
167    /// Sets the import chunk size in bytes.
168    ///
169    /// Controls the buffer size used when streaming data from a reader
170    /// during [`import_text()`](Connection::import_text). Larger values
171    /// reduce syscall overhead but use more memory.
172    ///
173    /// Default is 1 MB. Typical range: 64 KB to 16 MB.
174    ///
175    /// # Panics
176    ///
177    /// Panics if `size` is 0.
178    #[must_use]
179    pub fn with_chunk_size(mut self, size: usize) -> Self {
180        assert!(size > 0, "chunk size must be > 0");
181        self.chunk_size = Some(size);
182        self
183    }
184
185    /// Validates that the option combination is legal.
186    ///
187    /// Catches invalid combinations early (e.g., CSV-only options on TEXT
188    /// format) instead of letting them surface as opaque server errors.
189    fn validate(&self) -> Result<()> {
190        if self.format == CopyFormat::Text {
191            if self.quote.is_some() {
192                return Err(Error::new(
193                    "QUOTE option is only supported with CSV format. \
194                     Use CopyOptions::csv() instead of CopyOptions::text().",
195                ));
196            }
197            if self.escape.is_some() {
198                return Err(Error::new(
199                    "ESCAPE option is only supported with CSV format. \
200                     Use CopyOptions::csv() instead of CopyOptions::text().",
201                ));
202            }
203        }
204        Ok(())
205    }
206
207    /// Builds the WITH clause for COPY TO STDOUT.
208    fn to_copy_out_options(&self) -> String {
209        let mut parts = Vec::new();
210        match self.format {
211            CopyFormat::Csv => parts.push("FORMAT csv".to_string()),
212            CopyFormat::Text => parts.push("FORMAT text".to_string()),
213        }
214        if self.header {
215            parts.push("HEADER true".to_string());
216        }
217        if let Some(d) = self.delimiter {
218            parts.push(format!("DELIMITER E'\\x{d:02x}'"));
219        }
220        if let Some(ref n) = self.null_string {
221            parts.push(format!("NULL '{}'", n.replace('\'', "''")));
222        }
223        if let Some(q) = self.quote {
224            parts.push(format!("QUOTE E'\\x{q:02x}'"));
225        }
226        if let Some(e) = self.escape {
227            parts.push(format!("ESCAPE E'\\x{e:02x}'"));
228        }
229        format!("WITH ({})", parts.join(", "))
230    }
231
232    /// Builds the WITH clause for COPY FROM STDIN.
233    fn to_copy_in_options(&self) -> String {
234        self.to_copy_out_options()
235    }
236}
237
238impl Connection {
239    /// Exports query results as CSV to a writer.
240    ///
241    /// Uses default CSV options (comma-separated, with header row).
242    /// For custom options, use [`export_text`](Self::export_text).
243    ///
244    /// This method streams data directly to the writer without buffering
245    /// the entire result set in memory, making it safe for large exports.
246    ///
247    /// Returns the number of bytes written.
248    ///
249    /// # Example
250    ///
251    /// ```no_run
252    /// use hyperdb_api::{Connection, CreateMode, Result};
253    ///
254    /// fn main() -> Result<()> {
255    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
256    ///
257    ///     // Export to file
258    ///     let mut file = std::fs::File::create("output.csv")?;
259    ///     conn.export_csv("SELECT * FROM users", &mut file)?;
260    ///
261    ///     // Export to string
262    ///     let mut buf = Vec::new();
263    ///     conn.export_csv("SELECT * FROM users", &mut buf)?;
264    ///     let csv_string = String::from_utf8(buf).unwrap();
265    ///     Ok(())
266    /// }
267    /// ```
268    ///
269    /// # Errors
270    ///
271    /// - Returns [`Error::Other`] if the connection is using gRPC transport
272    ///   (COPY is TCP-only).
273    /// - Returns [`Error::Client`] if the server rejects the
274    ///   `COPY (<select_query>) TO STDOUT` statement.
275    /// - Returns [`Error::Io`] if writing to `writer` fails.
276    pub fn export_csv(&self, select_query: &str, writer: &mut dyn std::io::Write) -> Result<u64> {
277        let opts = CopyOptions::csv().with_header(true);
278        self.export_text(select_query, &opts, writer)
279    }
280
281    /// Exports query results as text (CSV/TSV/custom) to a writer.
282    ///
283    /// This method streams data directly to the writer without buffering
284    /// the entire result set in memory.
285    ///
286    /// Returns the number of bytes written.
287    ///
288    /// # Example
289    ///
290    /// ```no_run
291    /// use hyperdb_api::{Connection, CreateMode, Result};
292    /// use hyperdb_api::copy::CopyOptions;
293    ///
294    /// fn main() -> Result<()> {
295    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
296    ///
297    ///     // TSV export
298    ///     let opts = CopyOptions::tsv().with_header(true);
299    ///     let mut buf = Vec::new();
300    ///     conn.export_text("SELECT * FROM users", &opts, &mut buf)?;
301    ///
302    ///     // Pipe-separated with custom NULL
303    ///     let opts = CopyOptions::csv()
304    ///         .with_delimiter(b'|')
305    ///         .with_null("\\N".to_string())
306    ///         .with_header(true);
307    ///     conn.export_text("SELECT * FROM users", &opts, &mut std::io::stdout())?;
308    ///     Ok(())
309    /// }
310    /// ```
311    ///
312    /// # Errors
313    ///
314    /// - Returns [`Error::Other`] if `options` fail validation (e.g. an
315    ///   illegal delimiter/quote combination) or the connection is on
316    ///   gRPC.
317    /// - Returns [`Error::Client`] if the server rejects the
318    ///   `COPY TO STDOUT` statement.
319    /// - Returns [`Error::Io`] if writing to `writer` fails.
320    pub fn export_text(
321        &self,
322        select_query: &str,
323        options: &CopyOptions,
324        writer: &mut dyn std::io::Write,
325    ) -> Result<u64> {
326        options.validate()?;
327        let copy_query = format!(
328            "COPY ({}) TO STDOUT {}",
329            select_query,
330            options.to_copy_out_options()
331        );
332        let client = self.tcp_client().ok_or_else(|| {
333            Error::new(
334                "CSV export requires a TCP connection. gRPC does not support COPY operations.",
335            )
336        })?;
337        Ok(client.copy_out_to_writer(&copy_query, writer)?)
338    }
339
340    /// Exports query results as CSV to a String.
341    ///
342    /// Convenience method that collects the CSV output into a String.
343    /// For large datasets, prefer [`export_csv`](Self::export_csv) with a file writer.
344    ///
345    /// # Example
346    ///
347    /// ```no_run
348    /// use hyperdb_api::{Connection, CreateMode, Result};
349    ///
350    /// fn main() -> Result<()> {
351    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
352    ///     let csv = conn.export_csv_string("SELECT id, name FROM users")?;
353    ///     println!("{}", csv);
354    ///     Ok(())
355    /// }
356    /// ```
357    ///
358    /// # Errors
359    ///
360    /// - Returns whatever [`export_csv`](Self::export_csv) returns.
361    /// - Returns [`Error::Other`] with message
362    ///   `"CSV output is not valid UTF-8"` if the server emitted bytes that
363    ///   are not valid UTF-8 (a Hyper server only emits UTF-8, so this
364    ///   indicates a non-UTF-8 `CLIENT_ENCODING` setting).
365    pub fn export_csv_string(&self, select_query: &str) -> Result<String> {
366        let mut buf = Vec::new();
367        self.export_csv(select_query, &mut buf)?;
368        String::from_utf8(buf)
369            .map_err(|e| Error::new(format!("CSV output is not valid UTF-8: {e}")))
370    }
371
372    /// Imports CSV data from a reader into a table.
373    ///
374    /// Uses default CSV options (comma-separated, no header).
375    /// For custom options, use [`import_text`](Self::import_text).
376    ///
377    /// Returns the number of rows imported.
378    ///
379    /// # Example
380    ///
381    /// ```no_run
382    /// use hyperdb_api::{Connection, CreateMode, Result};
383    ///
384    /// fn main() -> Result<()> {
385    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
386    ///
387    ///     // Import from a string
388    ///     let csv = "1,Alice\n2,Bob\n";
389    ///     let rows = conn.import_csv("users", csv.as_bytes())?;
390    ///
391    ///     // Import from a file
392    ///     let file = std::fs::File::open("data.csv")?;
393    ///     let rows = conn.import_csv("users", file)?;
394    ///     Ok(())
395    /// }
396    /// ```
397    ///
398    /// # Errors
399    ///
400    /// See [`import_text`](Self::import_text).
401    pub fn import_csv(&self, table_name: &str, reader: impl std::io::Read) -> Result<u64> {
402        let opts = CopyOptions::csv();
403        self.import_text(table_name, &opts, reader)
404    }
405
406    /// Imports CSV data with header from a reader into a table.
407    ///
408    /// The first line is treated as a header row and skipped.
409    ///
410    /// Returns the number of rows imported.
411    ///
412    /// # Errors
413    ///
414    /// See [`import_text`](Self::import_text).
415    pub fn import_csv_with_header(
416        &self,
417        table_name: &str,
418        reader: impl std::io::Read,
419    ) -> Result<u64> {
420        let opts = CopyOptions::csv().with_header(true);
421        self.import_text(table_name, &opts, reader)
422    }
423
424    /// Imports text-format data (CSV/TSV/custom) from a reader into a table.
425    ///
426    /// Streams data in chunks to the server via COPY FROM STDIN, keeping
427    /// memory usage constant regardless of input size.
428    ///
429    /// Returns the number of rows imported.
430    ///
431    /// # Example
432    ///
433    /// ```no_run
434    /// use hyperdb_api::{Connection, CreateMode, Result};
435    /// use hyperdb_api::copy::CopyOptions;
436    ///
437    /// fn main() -> Result<()> {
438    ///     let conn = Connection::connect("localhost:7483", "test.hyper", CreateMode::DoNotCreate)?;
439    ///
440    ///     // Import TSV data
441    ///     let opts = CopyOptions::tsv();
442    ///     let tsv = "1\tAlice\n2\tBob\n";
443    ///     let rows = conn.import_text("users", &opts, tsv.as_bytes())?;
444    ///
445    ///     // Import pipe-delimited with header
446    ///     let opts = CopyOptions::csv().with_delimiter(b'|').with_header(true);
447    ///     let data = "id|name\n1|Alice\n2|Bob\n";
448    ///     let rows = conn.import_text("users", &opts, data.as_bytes())?;
449    ///     Ok(())
450    /// }
451    /// ```
452    ///
453    /// # Errors
454    ///
455    /// - Returns [`Error::Other`] if `options` fail validation or the
456    ///   connection is on gRPC.
457    /// - Returns [`Error::Client`] if the server rejects the
458    ///   `COPY <table> FROM STDIN` statement or a row during import.
459    /// - Returns [`Error::Io`] if reading from `reader` fails.
460    pub fn import_text(
461        &self,
462        table_name: &str,
463        options: &CopyOptions,
464        mut reader: impl std::io::Read,
465    ) -> Result<u64> {
466        options.validate()?;
467        let escaped_table = table_name.replace('"', "\"\"");
468        let copy_query = format!(
469            "COPY \"{}\" FROM STDIN {}",
470            escaped_table,
471            options.to_copy_in_options()
472        );
473
474        let client = self.tcp_client().ok_or_else(|| {
475            Error::new(
476                "CSV import requires a TCP connection. gRPC does not support COPY operations.",
477            )
478        })?;
479
480        let mut writer = client.copy_in_raw(&copy_query)?;
481
482        // Stream data in chunks (default 1 MB, configurable via with_chunk_size)
483        let chunk_size = options.chunk_size.unwrap_or(DEFAULT_IMPORT_CHUNK_SIZE);
484        let mut buf = vec![0u8; chunk_size];
485        loop {
486            let n = reader
487                .read(&mut buf)
488                .map_err(|e| Error::with_cause("Failed to read import data", e))?;
489            if n == 0 {
490                break;
491            }
492            writer.send(&buf[..n])?;
493        }
494
495        Ok(writer.finish()?)
496    }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Validates `opts`, which must be rejected, and returns the error text.
    fn rejection_message(opts: &CopyOptions) -> String {
        opts.validate().unwrap_err().to_string()
    }

    // CSV accepts both QUOTE and ESCAPE.
    #[test]
    fn test_csv_options_valid() {
        let csv = CopyOptions::csv().with_quote(b'"').with_escape(b'\\');
        assert!(csv.validate().is_ok());
    }

    // QUOTE is CSV-only, so the text format must refuse it.
    #[test]
    fn test_text_quote_rejected() {
        let message = rejection_message(&CopyOptions::text().with_quote(b'"'));
        assert!(message.contains("QUOTE"));
        assert!(message.contains("CSV format"));
    }

    // ESCAPE is CSV-only, so the text format must refuse it.
    #[test]
    fn test_text_escape_rejected() {
        let message = rejection_message(&CopyOptions::text().with_escape(b'\\'));
        assert!(message.contains("ESCAPE"));
        assert!(message.contains("CSV format"));
    }

    // TSV is built on the text format and inherits the same restriction.
    #[test]
    fn test_tsv_quote_rejected() {
        let message = rejection_message(&CopyOptions::tsv().with_quote(b'"'));
        assert!(message.contains("QUOTE"));
    }

    // HEADER and DELIMITER are allowed on the text format.
    #[test]
    fn test_text_without_csv_options_valid() {
        let text = CopyOptions::text().with_header(true).with_delimiter(b'|');
        assert!(text.validate().is_ok());
    }

    // An explicit chunk size is stored as given.
    #[test]
    fn test_chunk_size_custom() {
        let four_mib = 4 * 1024 * 1024;
        let opts = CopyOptions::csv().with_chunk_size(four_mib);
        assert_eq!(opts.chunk_size, Some(four_mib));
    }

    // Without an explicit size the 1 MB default is used.
    #[test]
    fn test_chunk_size_default() {
        let effective = CopyOptions::csv()
            .chunk_size
            .unwrap_or(DEFAULT_IMPORT_CHUNK_SIZE);
        assert_eq!(effective, 1024 * 1024);
    }

    // A zero-sized buffer could never make progress, so it is a programmer error.
    #[test]
    #[should_panic(expected = "chunk size must be > 0")]
    fn test_chunk_size_zero_panics() {
        let _ = CopyOptions::csv().with_chunk_size(0);
    }

    // The import clause carries the format, header and delimiter entries.
    #[test]
    fn test_copy_in_options_csv() {
        let clause = CopyOptions::csv()
            .with_header(true)
            .with_delimiter(b'|')
            .to_copy_in_options();
        for expected in ["FORMAT csv", "HEADER true", "DELIMITER"] {
            assert!(clause.contains(expected));
        }
    }

    // Plain text options announce the text format.
    #[test]
    fn test_copy_in_options_text() {
        let clause = CopyOptions::text().to_copy_in_options();
        assert!(clause.contains("FORMAT text"));
    }
}