use arrow::datatypes::SchemaRef as ArrowSchemaRef;
use crossbeam::channel;
use crossbeam::thread::scope;
use crossbeam::thread::ScopedJoinHandle;
use parquet::file::properties::WriterProperties as ArrowWriterProperties;
use evolution_builder::builder::ParquetBuilder;
use evolution_common::error::{ExecutionError, Result, SetupError};
use evolution_common::thread::estimate_best_thread_channel_capacity;
use evolution_common::NUM_BYTES_FOR_NEWLINE;
use evolution_schema::schema::FixedSchema;
use evolution_slicer::slicer::{FileSlicer, Slicer};
use evolution_writer::parquet::ParquetWriter;
#[cfg(debug_assertions)]
use log::debug;
use log::info;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread;
/// Marker trait for types that can convert a fixed-length file to another format.
pub trait Converter {}
/// Owned trait object over any [`Converter`] implementation.
pub type ConverterRef = Box<dyn Converter>;
/// Converts a fixed-length file (flf) to the parquet format.
///
/// Construct via [`ParquetConverter::builder`].
pub struct ParquetConverter {
/// Reads the input file in sliding-window chunks and tracks byte progress.
slicer: FileSlicer,
/// Writes finished builder output to the target parquet file.
writer: ParquetWriter,
/// Builds parquet column data from raw line bytes (used on the single-threaded path).
builder: ParquetBuilder,
/// The fixed-length schema; cloned into one builder per worker thread.
schema: FixedSchema,
/// Size in bytes of the sliding-window read buffer.
read_buffer_size: usize,
/// Total number of threads; when > 1, one writer thread plus `n_threads - 1` workers.
n_threads: usize,
/// Capacity of the bounded channel between worker threads and the writer.
thread_channel_capacity: usize,
}
impl ParquetConverter {
pub fn builder() -> ParquetConverterBuilder {
ParquetConverterBuilder {
..Default::default()
}
}
pub fn try_convert(&mut self) -> Result<()> {
if self.n_threads > 1 {
self.try_convert_multithreaded()?;
} else {
self.try_convert_single_threaded()?;
}
Ok(())
}
pub fn try_convert_multithreaded(&mut self) -> Result<()> {
let mut buffer_capacity = self.read_buffer_size;
let n_worker_threads: usize = self.n_threads - 1;
info!("Converting flf to parquet in multithreaded mode.");
info!(
"The file to convert is {} bytes in total.",
self.slicer.bytes_to_read(),
);
let mut line_break_indices: Vec<usize> = Vec::with_capacity(buffer_capacity);
let mut thread_workloads: Vec<(usize, usize)> = Vec::with_capacity(n_worker_threads);
loop {
if self.slicer.is_done() {
break;
}
let mut remaining_bytes: usize = self.slicer.remaining_bytes();
let mut bytes_processed: usize = self.slicer.bytes_processed();
let mut bytes_overlapped: usize = self.slicer.bytes_overlapped();
if remaining_bytes < buffer_capacity {
buffer_capacity = remaining_bytes;
}
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
self.slicer.try_read_to_buffer(&mut buffer)?;
self.slicer
.try_find_line_breaks(&buffer, &mut line_break_indices, true)?;
let byte_idx_last_line_break: usize = self.slicer.try_find_last_line_break(&buffer)?;
let n_bytes_left_after_last_line_break: usize =
buffer_capacity - byte_idx_last_line_break - NUM_BYTES_FOR_NEWLINE;
self.distribute_worker_thread_workloads(&line_break_indices, &mut thread_workloads);
self.spawn_converter_threads(&buffer, &thread_workloads)?;
line_break_indices.clear();
thread_workloads.clear();
self.slicer
.try_seek_relative(-(n_bytes_left_after_last_line_break as i64))?;
bytes_processed += buffer_capacity - n_bytes_left_after_last_line_break;
bytes_overlapped += n_bytes_left_after_last_line_break;
remaining_bytes -= buffer_capacity - n_bytes_left_after_last_line_break;
self.slicer.set_remaining_bytes(remaining_bytes);
self.slicer.set_bytes_processed(bytes_processed);
self.slicer.set_bytes_overlapped(bytes_overlapped);
}
#[cfg(debug_assertions)]
debug!("Finishing and closing writer.");
self.writer.try_finish()?;
info!("Done converting flf to parquet in multithreaded mode!");
if self.slicer.bytes_overlapped() > 0 {
info!(
"We read {} bytes two times (due to sliding window overlap).",
self.slicer.bytes_overlapped(),
);
}
Ok(())
}
pub fn try_convert_single_threaded(&mut self) -> Result<()> {
let mut buffer_capacity: usize = self.read_buffer_size;
info!("Converting flf to parquet in single-threaded mode.");
info!(
"The file to convert is {} bytes in total.",
self.slicer.bytes_to_read(),
);
loop {
if self.slicer.is_done() {
break;
}
let mut remaining_bytes: usize = self.slicer.remaining_bytes();
let mut bytes_processed: usize = self.slicer.bytes_processed();
let mut bytes_overlapped: usize = self.slicer.bytes_overlapped();
if remaining_bytes < buffer_capacity {
buffer_capacity = remaining_bytes;
}
let mut buffer: Vec<u8> = vec![0u8; buffer_capacity];
self.slicer.try_read_to_buffer(&mut buffer)?;
let byte_idx_last_line_break: usize = self.slicer.try_find_last_line_break(&buffer)?;
let n_bytes_left_after_last_line_break: usize =
buffer_capacity - byte_idx_last_line_break - NUM_BYTES_FOR_NEWLINE;
self.builder.try_build_from_slice(&buffer)?;
self.writer.try_write_from_builder(&mut self.builder)?;
self.slicer
.try_seek_relative(-(n_bytes_left_after_last_line_break as i64))?;
bytes_processed += buffer_capacity - n_bytes_left_after_last_line_break;
bytes_overlapped += n_bytes_left_after_last_line_break;
remaining_bytes -= buffer_capacity - n_bytes_left_after_last_line_break;
self.slicer.set_remaining_bytes(remaining_bytes);
self.slicer.set_bytes_processed(bytes_processed);
self.slicer.set_bytes_overlapped(bytes_overlapped);
}
self.writer.try_finish()?;
info!("Done converting flf to parquet in single-threaded mode!");
if self.slicer.bytes_overlapped() > 0 {
info!(
"We read {} bytes two times (due to sliding window overlap).",
self.slicer.bytes_overlapped(),
);
}
Ok(())
}
fn distribute_worker_thread_workloads(
&self,
line_break_indices: &[usize],
thread_workloads: &mut Vec<(usize, usize)>,
) {
let n_line_break_indices: usize = line_break_indices.len();
let n_rows_per_thread: usize = n_line_break_indices / thread_workloads.capacity();
let mut prev_line_break_byte_idx: usize = 0;
for worker_idx in 1..(thread_workloads.capacity()) {
let next_line_break_byte_idx: usize =
line_break_indices[n_rows_per_thread * worker_idx];
thread_workloads.push((prev_line_break_byte_idx, next_line_break_byte_idx));
prev_line_break_byte_idx = next_line_break_byte_idx + NUM_BYTES_FOR_NEWLINE;
}
thread_workloads.push((
prev_line_break_byte_idx,
line_break_indices[n_line_break_indices - 1],
));
}
fn spawn_converter_threads(
&mut self,
buffer: &Vec<u8>,
thread_workloads: &[(usize, usize)],
) -> Result<()> {
let (sender, receiver) = channel::bounded(self.thread_channel_capacity);
let arc_buffer: Arc<&Vec<u8>> = Arc::new(buffer);
let thread_result: thread::Result<()> = scope(|s| {
#[cfg(debug_assertions)]
debug!(
"Starting {} worker threads for conversion.",
thread_workloads.len()
);
let threads = thread_workloads
.iter()
.enumerate()
.map(|(t_idx, (from, to))| {
let t_sender: channel::Sender<ParquetBuilder> = sender.clone();
let mut t_builder: ParquetBuilder = self.schema.clone().into_builder();
let t_buffer: Arc<&Vec<u8>> = arc_buffer.clone();
let t_buffer_slice: &[u8] = &t_buffer[*from..*to];
s.spawn(move |_| {
#[cfg(debug_assertions)]
debug!("Thread {} working...", t_idx + 1);
t_builder.try_build_from_slice(t_buffer_slice).unwrap();
t_sender.send(t_builder).unwrap();
drop(t_sender);
#[cfg(debug_assertions)]
debug!("Thread {} done!", t_idx + 1);
})
})
.collect::<Vec<ScopedJoinHandle<()>>>();
drop(sender);
#[cfg(debug_assertions)]
debug!("Thread 0 waiting for batches to write to buffer...");
for mut builder in receiver {
self.writer.try_write_from_builder(&mut builder).unwrap();
drop(builder);
}
#[cfg(debug_assertions)]
debug!("Writer thread done!");
for handle in threads {
handle.join().expect("Could not join worker thread handle!");
}
});
if thread_result.is_err() {
return Err(Box::new(ExecutionError::new(
format!(
"One of the scoped threads returned an error: {:?}",
thread_result
)
.as_str(),
)));
}
#[cfg(debug_assertions)]
debug!("Buffer chunk done!");
Ok(())
}
}
impl Converter for ParquetConverter {}
/// Builder for [`ParquetConverter`].
///
/// `in_path`, `schema_path`, `out_path`, `n_threads`, and `read_buffer_size`
/// must be set (directly or via `with_default_values`) before `try_build`;
/// `thread_channel_capacity` and `write_properties` are optional.
#[derive(Default)]
pub struct ParquetConverterBuilder {
/// Path of the fixed-length file to convert (required).
in_path: Option<PathBuf>,
/// Path of the fixed-length schema definition (required).
schema_path: Option<PathBuf>,
/// Path of the parquet file to produce (required).
out_path: Option<PathBuf>,
/// Total number of threads to use (required; 1 disables worker threads).
n_threads: Option<usize>,
/// Sliding-window read buffer size in bytes (required).
read_buffer_size: Option<usize>,
/// Worker→writer channel capacity; estimated from `n_threads` when unset.
thread_channel_capacity: Option<usize>,
/// Optional parquet writer properties passed through to the writer.
write_properties: Option<ArrowWriterProperties>,
}
impl ParquetConverterBuilder {
    /// Set the path of the fixed-length file to convert (required).
    pub fn with_in_file(mut self, in_path: PathBuf) -> Self {
        self.in_path = Some(in_path);
        self
    }

    /// Set the path of the fixed-length schema definition (required).
    pub fn with_schema(mut self, schema_path: PathBuf) -> Self {
        self.schema_path = Some(schema_path);
        self
    }

    /// Set the path of the parquet file to produce (required).
    pub fn with_out_file(mut self, out_path: PathBuf) -> Self {
        self.out_path = Some(out_path);
        self
    }

    /// Set the total number of threads to use; 1 selects the
    /// single-threaded conversion strategy.
    pub fn with_num_threads(mut self, n_threads: usize) -> Self {
        self.n_threads = Some(n_threads);
        self
    }

    /// Set the sliding-window read buffer size in bytes.
    pub fn with_read_buffer_size(mut self, buffer_size: usize) -> Self {
        self.read_buffer_size = Some(buffer_size);
        self
    }

    /// Set the parquet writer properties passed through to the writer.
    pub fn with_write_properties(mut self, properties: ArrowWriterProperties) -> Self {
        self.write_properties = Some(properties);
        self
    }

    /// Set the worker→writer channel capacity. Passing `None` defers to an
    /// estimate derived from `n_threads` at build time.
    pub fn with_thread_channel_capacity(mut self, capacity: Option<usize>) -> Self {
        self.thread_channel_capacity = capacity;
        self
    }

    /// Fill `n_threads`, `read_buffer_size`, and `thread_channel_capacity`
    /// with default values. Call the corresponding `with_*` method
    /// afterwards to override any of them.
    pub fn with_default_values(mut self) -> Self {
        // Default sliding-window read buffer size: 500 MiB.
        const DEFAULT_READ_BUFFER_SIZE: usize = 500 * 1024 * 1024;
        let n_threads: usize = num_cpus::get();
        self.n_threads = Some(n_threads);
        self.read_buffer_size = Some(DEFAULT_READ_BUFFER_SIZE);
        self.thread_channel_capacity = Some(n_threads);
        self
    }

    /// Consume the builder and try to construct a [`ParquetConverter`].
    ///
    /// # Errors
    /// Returns a [`SetupError`] if any required field is missing, and
    /// propagates failures from opening the input file, parsing the schema,
    /// or constructing the parquet writer.
    pub fn try_build(self) -> Result<ParquetConverter> {
        let in_file: PathBuf = self.in_path.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'in_path' was not provided, exiting...",
            ))
        })?;
        let schema_path: PathBuf = self.schema_path.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'schema_path' was not provided, exiting...",
            ))
        })?;
        let out_path: PathBuf = self.out_path.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'out_path' was not provided, exiting...",
            ))
        })?;
        let n_threads: usize = self.n_threads.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'n_threads' was not provided, exiting...",
            ))
        })?;
        let read_buffer_size: usize = self.read_buffer_size.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'read_buffer_size' was not provided, exiting...",
            ))
        })?;
        // unwrap_or_else: only run the estimator when no capacity was given
        // (unwrap_or would evaluate it eagerly in every case).
        let thread_channel_capacity: usize = self
            .thread_channel_capacity
            .unwrap_or_else(|| estimate_best_thread_channel_capacity(n_threads));

        let slicer: FileSlicer = FileSlicer::try_from_path(in_file)?;
        let schema: FixedSchema = FixedSchema::from_path(schema_path)?;
        // The schema is cloned twice: once into the builder and once into the
        // arrow schema; the original is stored on the converter itself.
        let builder: ParquetBuilder = schema.clone().into_builder::<ParquetBuilder>();
        let arrow_schema: ArrowSchemaRef = Arc::new(schema.clone().into_arrow_schema());
        let writer: ParquetWriter = ParquetWriter::builder()
            .with_out_path(out_path)
            .with_arrow_schema(arrow_schema)
            .with_properties(self.write_properties)
            .try_build()?;

        Ok(ParquetConverter {
            slicer,
            writer,
            builder,
            schema,
            read_buffer_size,
            n_threads,
            thread_channel_capacity,
        })
    }
}