use crate::error::{CudfError, Result};
use crate::table::{Table, TableWithMetadata};
/// Compression codec identifiers used when writing Parquet files.
///
/// The enum is `#[repr(i32)]`, and the writers in this file hand a variant to
/// the FFI layer with `as i32`, so the explicit discriminants below are the
/// exact integers the native side receives.
///
/// NOTE(review): the numbering presumably mirrors the native library's own
/// compression enum — confirm before adding, removing or renumbering variants.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(i32)]
pub enum Compression {
None = 0,
Auto = 1,
Snappy = 2,
Gzip = 3,
Bzip2 = 4,
Brotli = 5,
Zip = 6,
Xz = 7,
Zlib = 8,
Lz4 = 9,
Lzo = 10,
Zstd = 11,
}
/// Builder holding the options for reading a Parquet file.
///
/// The fields are forwarded unchanged to the `cudf_cxx` `read_parquet` and
/// `read_parquet_with_metadata` FFI calls by `read` and `read_with_metadata`.
pub struct ParquetReader {
// Path handed to the FFI reader.
path: String,
// Column names handed to the FFI reader; empty by default.
columns: Vec<String>,
// Number of rows to skip, stored as the `i64` the FFI call takes.
// `new` initialises it to `-1`.
// NOTE(review): `-1` presumably means "not set" on the native side — confirm.
skip_rows: i64,
// Number of rows to read, stored as the `i64` the FFI call takes.
// `new` initialises it to `-1` (same caveat as `skip_rows`).
num_rows: i64,
}
impl ParquetReader {
    /// Creates a reader for the Parquet file at `path`.
    ///
    /// The column list starts empty and both `skip_rows` and `num_rows` start
    /// at `-1`.
    ///
    /// NOTE(review): `-1` is presumably the "not set" sentinel understood by
    /// the FFI layer — confirm against `cudf_cxx`.
    pub fn new(path: impl Into<String>) -> Self {
        Self {
            path: path.into(),
            columns: vec![],
            skip_rows: -1,
            num_rows: -1,
        }
    }

    /// Converts a row count to the `i64` passed over FFI, saturating at
    /// `i64::MAX`.
    ///
    /// A plain `n as i64` wraps values above `i64::MAX` to negative numbers
    /// (`usize::MAX as i64 == -1`), which would be indistinguishable from the
    /// `-1` default set by `new`.
    fn saturating_row_count(n: usize) -> i64 {
        // On 32-bit targets `i64::MAX as usize` truncates to `usize::MAX`, so
        // the `min` is a no-op there and the final cast is always lossless.
        n.min(i64::MAX as usize) as i64
    }

    /// Sets the column names forwarded to the FFI reader and returns the
    /// updated builder.
    pub fn columns(mut self, cols: Vec<String>) -> Self {
        self.columns = cols;
        self
    }

    /// Sets the number of rows to skip and returns the updated builder.
    ///
    /// Values larger than `i64::MAX` are clamped to `i64::MAX`.
    pub fn skip_rows(mut self, n: usize) -> Self {
        self.skip_rows = Self::saturating_row_count(n);
        self
    }

    /// Sets the number of rows to read and returns the updated builder.
    ///
    /// Values larger than `i64::MAX` are clamped to `i64::MAX`.
    pub fn num_rows(mut self, n: usize) -> Self {
        self.num_rows = Self::saturating_row_count(n);
        self
    }

    /// Consumes the builder and reads the file into a [`Table`].
    ///
    /// # Errors
    ///
    /// Returns the FFI error converted through `CudfError::from_cxx` when the
    /// native `read_parquet` call fails.
    pub fn read(self) -> Result<Table> {
        let raw = cudf_cxx::io::parquet::ffi::read_parquet(
            &self.path,
            &self.columns,
            self.skip_rows,
            self.num_rows,
        )
        .map_err(CudfError::from_cxx)?;
        Ok(Table { inner: raw })
    }

    /// Consumes the builder and reads the file together with its metadata.
    ///
    /// # Errors
    ///
    /// Returns the FFI error converted through `CudfError::from_cxx` when the
    /// native call fails, or whatever error `TableWithMetadata::from_raw`
    /// reports for the returned handle.
    pub fn read_with_metadata(self) -> Result<TableWithMetadata> {
        let raw = cudf_cxx::io::parquet::ffi::read_parquet_with_metadata(
            &self.path,
            &self.columns,
            self.skip_rows,
            self.num_rows,
        )
        .map_err(CudfError::from_cxx)?;
        TableWithMetadata::from_raw(raw)
    }
}
/// Builder holding the options for writing a [`Table`] to a Parquet file.
///
/// The table is only borrowed for `'a`; the builder does not take ownership.
pub struct ParquetWriter<'a> {
// Table whose `inner` handle is passed to the FFI writer.
table: &'a Table,
// Destination path handed to the FFI writer.
path: String,
// Codec sent to the FFI writer as `i32`; `new` defaults it to `Snappy`.
compression: Compression,
}
impl<'a> ParquetWriter<'a> {
    /// Builds a writer that will store `table` at `path`.
    ///
    /// The compression codec starts out as [`Compression::Snappy`].
    pub fn new(table: &'a Table, path: impl Into<String>) -> Self {
        let path = path.into();
        let compression = Compression::Snappy;
        Self {
            table,
            path,
            compression,
        }
    }

    /// Replaces the compression codec and hands the builder back.
    pub fn compression(self, c: Compression) -> Self {
        Self {
            compression: c,
            ..self
        }
    }

    /// Consumes the builder and performs the write through the FFI layer.
    ///
    /// The codec crosses the FFI boundary as its `i32` discriminant.
    ///
    /// # Errors
    ///
    /// Any failure reported by the native `write_parquet` call is converted
    /// with `CudfError::from_cxx`.
    pub fn write(self) -> Result<()> {
        let Self {
            table,
            path,
            compression,
        } = self;
        let codec = compression as i32;
        let outcome = cudf_cxx::io::parquet::ffi::write_parquet(&table.inner, &path, codec);
        outcome.map_err(CudfError::from_cxx)
    }
}
/// Convenience wrapper: reads `path` with a [`ParquetReader`] left at the
/// settings chosen by `ParquetReader::new`.
///
/// # Errors
///
/// Propagates whatever error `ParquetReader::read` returns.
pub fn read_parquet(path: impl Into<String>) -> Result<Table> {
    let reader = ParquetReader::new(path);
    reader.read()
}
/// Convenience wrapper: writes `table` to `path` with a [`ParquetWriter`] left
/// at the settings chosen by `ParquetWriter::new` (Snappy compression).
///
/// # Errors
///
/// Propagates whatever error `ParquetWriter::write` returns.
pub fn write_parquet(table: &Table, path: impl Into<String>) -> Result<()> {
    let writer = ParquetWriter::new(table, path);
    writer.write()
}
/// Handle to a native Parquet metadata object.
///
/// Instances are produced by `read_parquet_metadata`; the accessor methods
/// query the wrapped object through the `cudf_cxx` FFI functions.
pub struct ParquetMetadata {
// Owning pointer to the native metadata object returned by the FFI layer.
inner: cxx::UniquePtr<cudf_cxx::io::parquet::ffi::OwnedParquetMetadata>,
}
impl ParquetMetadata {
    /// Row count as reported by the native metadata object.
    pub fn num_rows(&self) -> i64 {
        let handle = &self.inner;
        cudf_cxx::io::parquet::ffi::get_num_rows(handle)
    }

    /// Row-group count as reported by the native metadata object.
    pub fn num_row_groups(&self) -> i32 {
        let handle = &self.inner;
        cudf_cxx::io::parquet::ffi::get_num_row_groups(handle)
    }

    /// Column count as reported by the native metadata object.
    pub fn num_columns(&self) -> i32 {
        let handle = &self.inner;
        cudf_cxx::io::parquet::ffi::get_num_columns(handle)
    }

    /// Collects the name of every column, asking the FFI layer for each index
    /// in `0..num_columns()` in ascending order.
    pub fn column_names(&self) -> Vec<String> {
        let mut names = Vec::new();
        for index in 0..self.num_columns() {
            let name = cudf_cxx::io::parquet::ffi::get_column_name(&self.inner, index);
            names.push(name);
        }
        names
    }
}
/// Loads the metadata of the Parquet file at `path` through the FFI layer and
/// wraps the returned handle in a [`ParquetMetadata`].
///
/// # Errors
///
/// A failure of the native `read_parquet_metadata` call is converted with
/// `CudfError::from_cxx`.
pub fn read_parquet_metadata(path: impl Into<String>) -> Result<ParquetMetadata> {
    let location: String = path.into();
    cudf_cxx::io::parquet::ffi::read_parquet_metadata(&location)
        .map(|inner| ParquetMetadata { inner })
        .map_err(CudfError::from_cxx)
}
/// Handle to a native chunked Parquet reader.
///
/// Chunks are pulled one at a time with `read_chunk`; `has_next` asks the
/// native reader whether another chunk is available.
pub struct ChunkedParquetReader {
// Owning pointer to the native reader created by `chunked_parquet_reader_create`.
inner: cxx::UniquePtr<cudf_cxx::io::parquet::ffi::OwnedChunkedParquetReader>,
}
// SAFETY: NOTE(review) — nothing in this file proves the wrapped C++ object may
// be moved to another thread; this impl asserts it. Confirm against the native
// reader's threading requirements.
unsafe impl Send for ChunkedParquetReader {}
impl ChunkedParquetReader {
    /// Opens a chunked reader for the Parquet file at `path`.
    ///
    /// `chunk_read_limit` is forwarded to the FFI layer as an `i64`; values
    /// above `i64::MAX` are clamped to `i64::MAX` instead of wrapping to a
    /// negative number.
    ///
    /// NOTE(review): the limit is presumably a per-chunk size bound
    /// interpreted by the native reader — confirm its unit and the meaning of
    /// `0` against `cudf_cxx`.
    ///
    /// # Errors
    ///
    /// A failure of the native `chunked_parquet_reader_create` call is
    /// converted with `CudfError::from_cxx`.
    pub fn new(path: impl Into<String>, chunk_read_limit: usize) -> Result<Self> {
        let path = path.into();
        // `usize as i64` wraps for values above `i64::MAX` (e.g. `usize::MAX`
        // becomes `-1`); saturate first so the FFI never sees a negative limit.
        // On 32-bit targets the `min` is a no-op and the cast is lossless.
        let limit = chunk_read_limit.min(i64::MAX as usize) as i64;
        let inner = cudf_cxx::io::parquet::ffi::chunked_parquet_reader_create(&path, limit)
            .map_err(CudfError::from_cxx)?;
        Ok(Self { inner })
    }

    /// Asks the native reader whether another chunk is available.
    ///
    /// # Errors
    ///
    /// A failure of the native call is converted with `CudfError::from_cxx`.
    pub fn has_next(&self) -> Result<bool> {
        cudf_cxx::io::parquet::ffi::chunked_parquet_reader_has_next(&self.inner)
            .map_err(CudfError::from_cxx)
    }

    /// Reads the next chunk from the native reader and wraps it in a [`Table`].
    ///
    /// # Errors
    ///
    /// A failure of the native call is converted with `CudfError::from_cxx`.
    pub fn read_chunk(&self) -> Result<Table> {
        let raw = cudf_cxx::io::parquet::ffi::chunked_parquet_reader_read_chunk(&self.inner)
            .map_err(CudfError::from_cxx)?;
        Ok(Table { inner: raw })
    }
}
/// Handle to a native chunked Parquet writer.
///
/// Tables are appended with `write`; `close` finishes the file. If the writer
/// is dropped without a successful `close`, the `Drop` impl attempts the close
/// and discards its result.
pub struct ChunkedParquetWriter {
// Owning pointer to the native writer created by `chunked_parquet_writer_create`.
inner: cxx::UniquePtr<cudf_cxx::io::parquet::ffi::OwnedChunkedParquetWriter>,
// Set to `true` only after the native close call has succeeded; checked by
// `write`, `close` and `Drop`.
closed: bool,
}
// SAFETY: NOTE(review) — nothing in this file proves the wrapped C++ object may
// be moved to another thread; this impl asserts it. Confirm against the native
// writer's threading requirements.
unsafe impl Send for ChunkedParquetWriter {}
impl ChunkedParquetWriter {
    /// Opens a chunked writer at `path` using [`Compression::Snappy`].
    ///
    /// # Errors
    ///
    /// Same as [`ChunkedParquetWriter::with_compression`].
    pub fn new(path: impl Into<String>) -> Result<Self> {
        let default_codec = Compression::Snappy;
        Self::with_compression(path, default_codec)
    }

    /// Opens a chunked writer at `path` with the given codec, which crosses
    /// the FFI boundary as its `i32` discriminant.
    ///
    /// # Errors
    ///
    /// A failure of the native `chunked_parquet_writer_create` call is
    /// converted with `CudfError::from_cxx`.
    pub fn with_compression(path: impl Into<String>, compression: Compression) -> Result<Self> {
        let destination: String = path.into();
        let codec = compression as i32;
        cudf_cxx::io::parquet::ffi::chunked_parquet_writer_create(&destination, codec)
            .map(|inner| Self {
                inner,
                closed: false,
            })
            .map_err(CudfError::from_cxx)
    }

    /// Appends `table` to the file.
    ///
    /// # Errors
    ///
    /// Returns `CudfError::InvalidArgument` when the writer has already been
    /// closed; otherwise a failure of the native write call is converted with
    /// `CudfError::from_cxx`.
    pub fn write(&mut self, table: &Table) -> Result<()> {
        if !self.closed {
            let writer = self.inner.pin_mut();
            return cudf_cxx::io::parquet::ffi::chunked_parquet_writer_write(writer, &table.inner)
                .map_err(CudfError::from_cxx);
        }
        Err(CudfError::InvalidArgument(
            "writer is already closed".into(),
        ))
    }

    /// Closes the native writer; calling it again after a successful close is
    /// a no-op that returns `Ok(())`.
    ///
    /// The `closed` flag is only set once the native call has succeeded, so a
    /// failed close can be retried.
    ///
    /// # Errors
    ///
    /// A failure of the native close call is converted with
    /// `CudfError::from_cxx`.
    pub fn close(&mut self) -> Result<()> {
        if !self.closed {
            let writer = self.inner.pin_mut();
            cudf_cxx::io::parquet::ffi::chunked_parquet_writer_close(writer)
                .map_err(CudfError::from_cxx)?;
            self.closed = true;
        }
        Ok(())
    }
}
impl Drop for ChunkedParquetWriter {
    /// Best-effort close on drop: if the writer was never successfully closed,
    /// the native close is attempted and its result is deliberately discarded,
    /// because `drop` has no way to report an error.
    fn drop(&mut self) {
        if self.closed {
            return;
        }
        let writer = self.inner.pin_mut();
        let _ = cudf_cxx::io::parquet::ffi::chunked_parquet_writer_close(writer);
    }
}