#[cfg(feature = "dep_arrow")]
use arrow::{
array::ArrayRef, datatypes::Schema, ipc::writer::FileWriter, record_batch::RecordBatch,
};
use std::path::PathBuf;
/// Writes a single record batch to a freshly created Arrow IPC file.
///
/// The file at `path` is created with [`std::fs::File::create`], an IPC
/// `FileWriter` is opened on it using `schema`, `chunk` is written, and the
/// writer is finished before returning.
///
/// # Errors
///
/// Returns the boxed error from whichever step fails first: creating the
/// file, constructing the writer, writing `chunk`, or finishing the writer.
#[cfg(feature = "dep_arrow")]
pub fn write_batches(
    path: PathBuf,
    schema: Schema,
    chunk: RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut ipc_writer = FileWriter::try_new(std::fs::File::create(path)?, &schema)?;
    ipc_writer.write(&chunk)?;
    ipc_writer.finish()?;
    Ok(())
}
/// Creates the file at `output` and returns an Arrow IPC `FileWriter` for it,
/// wrapped in a [`std::sync::Mutex`].
///
/// The writer is constructed with `schema`. Nothing in this function finishes
/// the writer; that is left to the caller.
///
/// # Errors
///
/// Returns the boxed error if the file cannot be created or the writer cannot
/// be constructed.
#[cfg(feature = "dep_arrow")]
pub fn open_arrow_mutex_writer(
    output: PathBuf,
    schema: Schema,
) -> Result<std::sync::Mutex<FileWriter<std::fs::File>>, Box<dyn std::error::Error>> {
    let sink = std::fs::File::create(output)?;
    Ok(std::sync::Mutex::new(FileWriter::try_new(sink, &schema)?))
}
/// Appends one struct-typed array to the mutex-guarded Arrow IPC writer as a
/// record batch.
///
/// `res` is expected to be an `arrow::array::StructArray`; it is converted
/// into a `RecordBatch` and written through `writer`.
///
/// Returns `Some(batch_length)` when the batch was written. Returns `None`
/// when:
/// - `batch_length` is zero (nothing is written or logged),
/// - `res` is not a `StructArray` (logged),
/// - the mutex is poisoned (logged),
/// - the underlying write fails (logged).
#[cfg(feature = "dep_arrow")]
pub fn write_to_arrow_mutex_writer(
    writer: &std::sync::Mutex<FileWriter<std::fs::File>>,
    res: ArrayRef,
    batch_length: usize,
) -> Option<usize> {
    if batch_length == 0 {
        return None;
    }

    // Convert before taking the lock: the mutex is held only for the actual
    // write, and a failure here cannot poison the shared writer.
    let chunk: RecordBatch = match res.as_any().downcast_ref::<arrow::array::StructArray>() {
        Some(struct_array) => struct_array.into(),
        None => {
            // Previously an `unwrap()` panicked here; report it like every
            // other failure in this function instead.
            tracing::error!(
                "Error converting chunk: expected a StructArray, got {:?}",
                res.data_type()
            );
            return None;
        }
    };

    let mut file_lock = match writer.lock() {
        Ok(lock) => lock,
        Err(err) => {
            tracing::error!("Error locking file: {:?}", err);
            return None;
        }
    };

    match file_lock.write(&chunk) {
        Ok(_) => Some(batch_length),
        Err(err) => {
            tracing::error!("Error writing chunk: {:?}", err);
            None
        }
    }
}
/// Finishes the Arrow IPC writer held inside `writer`, consuming the mutex.
///
/// # Errors
///
/// Returns a `"Lock error"` boxed error (after logging) if the mutex is
/// poisoned, or the boxed error from `finish` if finishing the writer fails.
#[cfg(feature = "dep_arrow")]
pub fn close_arrow_mutex_writer(
    writer: std::sync::Mutex<FileWriter<std::fs::File>>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut guard = writer
        .lock()
        .map_err(|poisoned| -> Box<dyn std::error::Error> {
            tracing::error!("Error locking file: {:?}", poisoned);
            "Lock error".into()
        })?;
    guard.finish()?;
    Ok(())
}