use crossbeam::channel;
use evolution_common::error::{Result, SetupError};
use evolution_common::{newline, NUM_BYTES_FOR_NEWLINE};
use evolution_schema::schema::FixedSchema;
use evolution_writer::writer::{FixedLengthFileWriter, FixedLengthFileWriterProperties, Writer};
use log::{info, warn};
use padder::pad_and_push_to_buffer;
use rand::rngs::ThreadRng;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};
use crate::mock_column;
/// Smallest row count for which the builder keeps a multithreaded
/// configuration; for fewer rows it falls back to a single thread.
pub static MIN_NUM_ROWS_FOR_MULTITHREADING: usize = 100 * 1_000;
/// Common marker interface for data mockers; it declares no required items.
pub trait Mocker {}

/// An owned, dynamically dispatched [`Mocker`].
pub type MockerRef = Box<dyn Mocker>;
/// Generates rows of random data described by a [`FixedSchema`] and writes
/// them through a [`FixedLengthFileWriter`].
pub struct FixedLengthFileMocker {
    /// Number of rows to generate.
    n_rows: usize,
    /// Total number of threads; values above one select the multithreaded
    /// path, where `n_threads - 1` workers generate rows and the calling
    /// thread writes them.
    n_threads: usize,
    /// Number of rows collected into one buffer before it is written (or,
    /// in multithreaded mode, sent to the writing thread).
    write_buffer_size: usize,
    /// Capacity of the bounded channel carrying buffers from the workers.
    thread_channel_capacity: usize,
    /// Column layout of every mocked row.
    schema: FixedSchema,
    /// Destination of the generated rows.
    writer: FixedLengthFileWriter,
}
impl FixedLengthFileMocker {
pub fn builder() -> FixedLengthFileMockerBuilder {
FixedLengthFileMockerBuilder {
..Default::default()
}
}
pub fn try_mock(&mut self) -> Result<()> {
if self.n_threads > 1 {
self.try_mock_multithreaded()?;
} else {
self.try_mock_single_threaded()?;
}
Ok(())
}
fn try_mock_multithreaded(&mut self) -> Result<()> {
let thread_workloads: Vec<usize> = self.distribute_worker_thread_workloads();
let (sender, receiver) = channel::bounded(self.thread_channel_capacity);
info!(
"Starting {} worker threads to generate {} mocked rows.",
self.n_threads - 1,
self.n_rows
);
let arc_schema = Arc::new(self.schema.clone());
let t_n_rows_buffer_size: usize = self.write_buffer_size;
let t_buffer_size: usize = 2 * self.write_buffer_size * arc_schema.row_length()
+ NUM_BYTES_FOR_NEWLINE * self.write_buffer_size;
let threads = thread_workloads
.into_iter()
.enumerate()
.map(|(t_idx, t_workload)| {
let t_schema = Arc::clone(&arc_schema);
let t_sender = sender.clone();
spawn(move || {
let mut rng: ThreadRng = rand::thread_rng();
let mut buffer: Vec<u8> = Vec::with_capacity(t_buffer_size);
for row_idx in 0..t_workload {
if (row_idx % t_n_rows_buffer_size == 0) && (row_idx != 0) {
t_sender.send(buffer).unwrap_or_else(|_| {
panic!(
"Thread {} could not send buffer to master thread!",
t_idx + 1
)
});
buffer = Vec::with_capacity(t_buffer_size);
}
for column in t_schema.iter() {
pad_and_push_to_buffer(
mock_column(column, &mut rng).as_bytes(),
column.length(),
column.alignment(),
column.pad_symbol(),
&mut buffer,
);
}
buffer.extend_from_slice(newline().as_bytes());
}
t_sender.send(buffer).unwrap_or_else(|_| {
panic!(
"Thread {} could not send buffer to master thread!",
t_idx + 1
)
});
info!("Thread {} done!", t_idx + 1);
drop(t_sender);
})
})
.collect::<Vec<JoinHandle<()>>>();
drop(sender);
for buffer in receiver {
self.writer.try_write(&buffer)?;
drop(buffer);
}
for (t_idx, handle) in threads.into_iter().enumerate() {
handle
.join()
.unwrap_or_else(|_| panic!("Thread {} could not join the master thread!", t_idx));
}
Ok(())
}
fn try_mock_single_threaded(&mut self) -> Result<()> {
let n_runes_in_row = self.schema.row_length();
let writer_buffer_size: usize = 4 * self.write_buffer_size * NUM_BYTES_FOR_NEWLINE
+ self.write_buffer_size * n_runes_in_row;
let mut buffer: Vec<u8> = Vec::with_capacity(writer_buffer_size);
let mut rng: ThreadRng = rand::thread_rng();
info!(
"Generating {} mocked rows in single-threaded mode.",
self.n_rows
);
for ridx in 0..self.n_rows {
if (ridx % self.write_buffer_size == 0) && (ridx != 0) {
self.writer.try_write(&buffer)?;
buffer.clear();
}
for column in self.schema.iter() {
pad_and_push_to_buffer(
mock_column(column, &mut rng).as_bytes(),
column.length(),
column.alignment(),
column.pad_symbol(),
&mut buffer,
);
}
buffer.extend_from_slice(newline().as_bytes());
}
info!("Done mocking, flushing any remaining buffers.");
self.writer.try_write(&buffer)?;
self.writer.try_finish()?;
Ok(())
}
fn distribute_worker_thread_workloads(&self) -> Vec<usize> {
let n_worker_threads: usize = self.n_threads - 1;
let n_rows_per_thread: usize = self.n_rows / n_worker_threads;
let n_rows_remaining: usize = self.n_rows - (n_rows_per_thread * n_worker_threads);
let mut thread_workloads: Vec<usize> = (0..(n_worker_threads - 1))
.map(|_| n_rows_per_thread)
.collect();
thread_workloads.push(n_rows_per_thread + n_rows_remaining);
thread_workloads
}
}
// `Mocker` declares no required items, so this marker impl has an empty body.
impl Mocker for FixedLengthFileMocker {}
/// Step-by-step configuration for a [`FixedLengthFileMocker`]; every field
/// starts out unset (`None`).
#[derive(Default)]
pub struct FixedLengthFileMockerBuilder {
    /// Number of rows to generate.
    n_rows: Option<usize>,
    /// Total number of threads requested.
    n_threads: Option<usize>,
    /// Rows per write buffer.
    write_buffer_size: Option<usize>,
    /// Capacity of the worker-to-writer channel.
    thread_channel_capacity: Option<usize>,
    /// Path of the schema file.
    schema_path: Option<PathBuf>,
    /// Path of the output file.
    out_path: Option<PathBuf>,
    /// Forwarded to the writer properties.
    force_create_new: Option<bool>,
    /// Forwarded to the writer properties.
    truncate_existing: Option<bool>,
}
impl FixedLengthFileMockerBuilder {
    /// Sets the path of the schema file, loaded with `FixedSchema::from_path` in `try_build`.
    pub fn with_schema(mut self, schema_path: PathBuf) -> Self {
        self.schema_path = Some(schema_path);
        self
    }

    /// Sets the path of the file the mocked rows are written to.
    pub fn with_out_file(mut self, out_path: PathBuf) -> Self {
        self.out_path = Some(out_path);
        self
    }

    /// Sets the number of rows to generate.
    pub fn with_num_rows(mut self, n_rows: usize) -> Self {
        self.n_rows = Some(n_rows);
        self
    }

    /// Sets the total number of threads.
    ///
    /// A value above one is only kept when at least
    /// [`MIN_NUM_ROWS_FOR_MULTITHREADING`] rows are requested; otherwise
    /// `try_build` falls back to a single thread.
    pub fn with_num_threads(mut self, n_thread: usize) -> Self {
        self.n_threads = Some(n_thread);
        self
    }

    /// Sets how many rows are collected per buffer before it is written.
    /// Must be greater than zero.
    pub fn with_write_buffer_size(mut self, buffer_size: usize) -> Self {
        self.write_buffer_size = Some(buffer_size);
        self
    }

    /// Sets the writer's `force_create_new` property.
    pub fn with_force_create_new(mut self, force_create_new: bool) -> Self {
        self.force_create_new = Some(force_create_new);
        self
    }

    /// Sets the writer's `truncate_existing` property.
    pub fn with_truncate_existing(mut self, truncate_existing: bool) -> Self {
        self.truncate_existing = Some(truncate_existing);
        self
    }

    /// Sets the capacity of the bounded worker-to-writer channel.
    ///
    /// `None` makes `try_build` default it to the requested thread count.
    pub fn with_thread_channel_capacity(mut self, capacity: Option<usize>) -> Self {
        self.thread_channel_capacity = capacity;
        self
    }

    /// Fills in default values: one thread, 1000 rows per buffer,
    /// `force_create_new = true`, `truncate_existing = true` and a channel
    /// capacity of 1. The schema path, output path and row count still need
    /// to be provided.
    pub fn with_default_values(mut self) -> Self {
        self.n_threads = Some(1);
        self.write_buffer_size = Some(1000);
        self.force_create_new = Some(true);
        self.truncate_existing = Some(true);
        self.thread_channel_capacity = Some(1);
        self
    }

    /// Validates the configuration, loads the schema, creates the writer and
    /// returns the configured [`FixedLengthFileMocker`].
    ///
    /// # Errors
    ///
    /// Returns a `SetupError` if any required field is missing or if
    /// `write_buffer_size` is zero. Propagates errors from loading the schema
    /// and from building the writer properties or the writer.
    pub fn try_build(self) -> Result<FixedLengthFileMocker> {
        let schema: FixedSchema = match self.schema_path {
            Some(p) => FixedSchema::from_path(p)?,
            None => {
                return Err(Box::new(SetupError::new(
                    "Required field 'schema_path' was not provided, exiting...",
                )))
            }
        };
        let out_path: PathBuf = match self.out_path {
            Some(p) => p,
            None => {
                return Err(Box::new(SetupError::new(
                    "Required field 'out_path' was not provided, exiting...",
                )))
            }
        };
        let n_rows: usize = self.n_rows.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'n_rows' was not provided, exiting...",
            ))
        })?;
        let mut n_threads: usize = self.n_threads.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'n_threads' was not provided, exiting...",
            ))
        })?;
        let write_buffer_size: usize = self.write_buffer_size.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'write_buffer_size' was not provided, exiting...",
            ))
        })?;
        // The mocker uses `write_buffer_size` as a modulus when batching rows,
        // so zero would panic with a division by zero. Reject it here, before
        // the writer below opens (and possibly truncates) the output file.
        if write_buffer_size == 0 {
            return Err(Box::new(SetupError::new(
                "Field 'write_buffer_size' must be greater than zero, exiting...",
            )));
        }
        let force_create_new: bool = self.force_create_new.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'force_create_new' was not provided, exiting...",
            ))
        })?;
        let truncate_existing: bool = self.truncate_existing.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'truncate_existing' was not provided, exiting...",
            ))
        })?;
        let thread_channel_capacity: usize = self.thread_channel_capacity.unwrap_or(n_threads);

        let writer_properties: FixedLengthFileWriterProperties =
            FixedLengthFileWriterProperties::builder()
                .with_force_create_new(force_create_new)
                .with_create_or_open(true)
                .with_truncate_existing(truncate_existing)
                .try_build()?;
        let writer: FixedLengthFileWriter = FixedLengthFileWriter::builder()
            .with_out_path(out_path)
            .with_properties(writer_properties)
            .try_build()?;

        let multithreading: bool = (n_rows >= MIN_NUM_ROWS_FOR_MULTITHREADING) && (n_threads > 1);
        if !multithreading && n_threads > 1 {
            warn!(
                "You specified to use {} threads but only want to mock {} rows.",
                n_threads, n_rows
            );
            warn!(
                "This is done much more efficiently single-threaded, ignoring any multithreading!"
            );
            n_threads = 1;
        }

        Ok(FixedLengthFileMocker {
            schema,
            writer,
            n_rows,
            n_threads,
            write_buffer_size,
            thread_channel_capacity,
        })
    }

    /// Like [`Self::try_build`], but unwraps the result.
    ///
    /// # Panics
    ///
    /// Panics if `try_build` returns an error.
    pub fn build(self) -> FixedLengthFileMocker {
        self.try_build().unwrap()
    }
}