use parquet::{
file::{properties::WriterProperties, writer::SerializedFileWriter},
record::RecordWriter,
};
use std::{fs::File, io, path::Path, sync::Arc};
use thiserror::Error;
/// Initial value of `ParquetLogger::maximum_buffer_size`: the number of buffered
/// records at which `ParquetLogger::log` writes the buffer out (131 072 records).
const DEFAULT_MAXIMUM_BUFFER_SIZE: usize = 1 << 17;
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum Error {
#[error("I/O error")]
IO(#[from] io::Error),
#[error("Parquet error")]
Parquet(#[from] parquet::errors::ParquetError),
}
/// Creates a brand-new file named `path` with a numeric extension appended
/// (`path.0`, `path.1`, ...), using the lowest index whose file does not exist.
///
/// `File::create_new` fails with `AlreadyExists` instead of opening an existing
/// file, so an existing file is never reused or truncated.
///
/// # Errors
///
/// - `InvalidInput` if `path` has no file name (e.g. `/` or `..`). Appending an
///   extension would leave such a path unchanged, so every attempt would hit
///   the same existing entry.
/// - Any error from `File::create_new` other than `AlreadyExists`, returned as-is.
/// - `AlreadyExists` if every index from `0` to `u32::MAX` is already taken.
fn create_unique_file<P: AsRef<Path>>(path: P) -> io::Result<File> {
    let path = path.as_ref();
    if path.file_name().is_none() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "path has no file name to number",
        ));
    }
    // A bounded inclusive range: no overflow panic or wrap-around retry loop.
    for index in 0..=u32::MAX {
        match File::create_new(path.with_added_extension(index.to_string())) {
            Ok(file) => return Ok(file),
            // This number is taken; try the next one.
            Err(error) if error.kind() == io::ErrorKind::AlreadyExists => {}
            Err(error) => return Err(error),
        }
    }
    Err(io::Error::new(
        io::ErrorKind::AlreadyExists,
        "every numbered file name is already taken",
    ))
}
/// Buffers records of type `T` in memory and writes them to a Parquet file as
/// row groups.
///
/// A row group is written when the buffer reaches `maximum_buffer_size`
/// records (via `log`), when `sync` is called explicitly, and when the logger
/// is dropped. Dropping the logger also finalises the file.
pub struct ParquetLogger<T>
// The bound sits on the struct, not only on the impls, because the `Drop`
// impl needs it and a `Drop` impl may not add bounds the struct lacks.
where
for<'a> &'a [T]: RecordWriter<T>,
{
// Parquet writer over the output file.
writer: SerializedFileWriter<File>,
// Records accepted by `log` that have not yet been written as a row group.
buffer: Vec<T>,
// Buffer length at which `log` triggers a `sync`.
maximum_buffer_size: usize,
}
impl<T> ParquetLogger<T>
where
for<'a> &'a [T]: RecordWriter<T>,
{
#[inline]
pub fn create<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let buffer = Vec::<T>::new();
let schema = buffer.as_slice().schema()?;
let props = Arc::new(WriterProperties::builder().build());
let log_file = File::create(path)?;
let writer = SerializedFileWriter::new(log_file, schema, props)?;
Ok(Self {
writer,
buffer,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
})
}
#[inline]
pub fn create_unique<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
let buffer = Vec::<T>::new();
let schema = buffer.as_slice().schema()?;
let props = Arc::new(WriterProperties::builder().build());
let log_file = create_unique_file(path)?;
let writer = SerializedFileWriter::new(log_file, schema, props)?;
Ok(Self {
writer,
buffer,
maximum_buffer_size: DEFAULT_MAXIMUM_BUFFER_SIZE,
})
}
#[inline]
pub fn log(&mut self, record: T) -> Result<(), Error> {
self.buffer.push(record);
if self.buffer.len() >= self.maximum_buffer_size {
self.sync()?;
}
Ok(())
}
#[inline]
#[must_use]
pub fn maximum_buffer_size(&self) -> usize {
self.maximum_buffer_size
}
#[inline]
pub fn maximum_buffer_size_mut(&mut self) -> &mut usize {
&mut self.maximum_buffer_size
}
#[inline]
pub fn sync(&mut self) -> Result<(), Error> {
if !self.buffer.is_empty() {
let mut row_group = self.writer.next_row_group()?;
self.buffer.as_slice().write_to_row_group(&mut row_group)?;
row_group.close()?;
self.buffer.clear();
self.writer.flush()?;
}
Ok(())
}
}
impl<T> Drop for ParquetLogger<T>
where
for<'a> &'a [T]: RecordWriter<T>,
{
#[inline]
fn drop(&mut self) {
let _ = self.sync();
let _ = self.writer.finish();
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::check;
    use parquet_derive::ParquetRecordWriter;
    use std::fs;
    use tempfile::tempdir;

    /// Minimal record type used to instantiate `ParquetLogger` in tests.
    #[derive(ParquetRecordWriter)]
    struct LogRecord {
        a: f64,
    }

    /// Each `create_unique` call must claim the next free numbered file
    /// (`.0`, then `.1`, then `.2`) without creating any higher-numbered file.
    #[test]
    fn test_create_unique() -> anyhow::Result<()> {
        let tmp_dir = tempdir()?;
        let base = tmp_dir.path().join("test.parquet");
        let numbered = |index: usize| base.with_added_extension(index.to_string());
        for created in 1..=3 {
            // The logger is dropped immediately, which finalises its file.
            let _ = ParquetLogger::<LogRecord>::create_unique(&base)?;
            for index in 0..3 {
                check!(fs::exists(numbered(index))? == (index < created));
            }
        }
        Ok(())
    }
}