use gut::prelude::*;
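/// Helpers for inferring an Arrow schema from serde-serializable records
/// and preparing arrow2 parquet write options and encodings.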
mod pq {
    use anyhow::Result;
    use serde::Serialize;
    use arrow2::{
        array::Array,
        chunk::Chunk,
        datatypes::{Field, Schema},
        io::parquet::write::{Encoding, WriteOptions},
    };
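    /// Infer an Arrow schema from `items` by tracing their serde
    /// serialization, then convert the records into arrow2 arrays.
    /// Null-only fields are allowed and date-like strings are detected.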
    pub fn get_parquet_columns<T: Serialize + ?Sized>(items: &T) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
        use serde_arrow::schema::{SchemaLike, SerdeArrowSchema, TracingOptions};

        let opts = TracingOptions::default().allow_null_fields(true).guess_dates(true);
        let fields: Vec<Field> = SerdeArrowSchema::from_samples(items, opts)?.try_into()?;
        let arrays = serde_arrow::to_arrow2(&fields, items)?;
        Ok((Schema::from(fields), Chunk::new(arrays)))
    }
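    /// Default write options: Snappy compression, parquet format
    /// version 2, no column statistics, no data page size limit.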
    pub fn default_write_options() -> WriteOptions {
        use arrow2::io::parquet::write::{CompressionOptions, Version};

        WriteOptions {
            write_statistics: false,
            compression: CompressionOptions::Snappy,
            version: Version::V2,
            data_pagesize_limit: None,
        }
    }
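    /// Plain encoding for every leaf column of `schema`.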
    pub fn get_encodings(schema: &Schema) -> Vec<Vec<Encoding>> {
        use arrow2::io::parquet::write::transverse;

        schema
            .fields
            .iter()
            .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
            .collect()
    }
}
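/// A minimal row-group oriented parquet writer built on top of
/// `arrow2::io::parquet::write::FileWriter`.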
mod writer {
    use super::pq::*;
    use super::Result;
    use arrow2::io::parquet::write::FileWriter;
    use serde::Serialize;
    use std::fs::File;
    use std::path::{Path, PathBuf};
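    /// Writes serde-serializable records to a parquet file, one row
    /// group per `write_row_group` call. The file is created lazily on
    /// the first write.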
    pub struct SimpleParquetFileWriter {
        path: PathBuf,
        writer: Option<FileWriter<File>>,
    }
    impl SimpleParquetFileWriter {
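        /// Create a writer for `path`. No file is created until the
        /// first row group is written.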
        pub fn new(path: &Path) -> Self {
            Self {
                path: path.to_owned(),
                writer: None,
            }
        }
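        /// Write `records` as one new row group. The Arrow schema is
        /// re-inferred from each batch, so every batch written to the
        /// same file must serialize with a consistent schema.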
        pub fn write_row_group<T: Serialize + ?Sized>(&mut self, records: &T) -> Result<&mut Self> {
            use arrow2::io::parquet::write::RowGroupIterator;

            let (schema, columns) = get_parquet_columns(records)?;
            let options = default_write_options();
            // create the file lazily on the first write, propagating
            // any creation error to the caller
            if self.writer.is_none() {
                let file = File::create(&self.path)?;
                self.writer = Some(FileWriter::try_new(file, schema.clone(), options)?);
            }
            let writer = self.writer.as_mut().expect("parquet file writer");
            let encodings = get_encodings(&schema);
            let iter = vec![Ok(columns)];
            let row_groups = RowGroupIterator::try_new(iter.into_iter(), &schema, options, encodings)?;
            for group in row_groups {
                writer.write(group?)?;
            }
            Ok(self)
        }
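        /// Finish the file: write the parquet footer and close. If no
        /// row group was ever written, no file is created.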
        pub fn close(self) -> Result<()> {
            if let Some(mut writer) = self.writer {
                let _size = writer.end(None)?;
            }
            Ok(())
        }
    }
}
pub use writer::SimpleParquetFileWriter;
#[test]
fn test_parquet_writer() -> Result<()> {
    use serde::Serialize;
    use writer::SimpleParquetFileWriter;

    #[derive(Debug, Serialize)]
    struct Coord {
        x: f64,
        e: Option<f64>,
        i: [f64; 3],
        b: Vec<f64>,
    }
    let mut writer = SimpleParquetFileWriter::new("/tmp/b.pq".as_ref());
    let rows = vec![
        Coord {
            x: 1.0,
            e: None,
            i: [0.0; 3],
            b: vec![1.0],
        },
        Coord {
            x: 2.0,
            e: None,
            i: [0.0; 3],
            b: vec![0.2],
        },
    ];
    writer.write_row_group(rows.as_slice())?;
    let rows = vec![
        Coord {
            x: 1.0,
            e: None,
            i: [0.0; 3],
            b: vec![1.0],
        },
        Coord {
            x: 0.5,
            e: Some(2.0),
            i: [0.0; 3],
            b: vec![0.2],
        },
    ];
    writer.write_row_group(rows.as_slice())?;
    writer.close()?;
    Ok(())
}
#[test]
fn test_parquet_gchemol() -> anyhow::Result<()> {
    use gchemol::io::formats::ExtxyzFile;
    use serde::Serialize;
    use writer::SimpleParquetFileWriter;

    #[derive(Debug, Serialize)]
    struct Coords {
        frame_index: usize,
        atom_number: usize,
        position: [f64; 3],
        forces: [f64; 3],
    }

    let mut writer = SimpleParquetFileWriter::new("/tmp/cu.pq".as_ref());
    let f = "tests/files/cu.xyz";
    let mols = ExtxyzFile::read_molecules_from(f)?;
    for (i, mol) in mols.enumerate() {
        // collect one row per atom for the current frame only
        let mut rows = vec![];
        for (j, atom) in mol.atoms() {
            let forces: [f64; 3] = atom.properties.load("forces")?;
            rows.push(Coords {
                frame_index: i,
                atom_number: j,
                position: atom.position(),
                forces,
            });
        }
        // write each frame as its own row group
        writer.write_row_group(rows.as_slice())?;
    }
    writer.close()?;
    Ok(())
}