use smallvec::SmallVec;
use crate::{Record, MAX_ARITY};
use super::error::Result;
pub trait GenericProcessor<Rf>: Send + Clone {
fn process_record_batch(&mut self, records: impl Iterator<Item = Rf>) -> Result<()>;
fn on_batch_complete(&mut self) -> Result<()> {
Ok(())
}
fn on_thread_complete(&mut self) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
fn set_thread_id(&mut self, thread_id: usize) {
}
}
pub trait ParallelProcessor<Rf: Record>: Send + Clone {
fn process_record_batch(&mut self, records: impl Iterator<Item = Rf>) -> Result<()> {
for record in records {
self.process_record(record)?;
}
Ok(())
}
fn process_record(&mut self, _record: Rf) -> Result<()> {
unimplemented!("Either ParallelProcessor::process_record or ParallelProcessor::process_record_batch must be implemented!");
}
fn on_batch_complete(&mut self) -> Result<()> {
Ok(())
}
fn on_thread_complete(&mut self) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
fn set_thread_id(&mut self, thread_id: usize) {
}
fn get_thread_id(&self) -> usize {
unimplemented!("Must be implemented by the processor to be used")
}
}
impl<Rf: Record, P: ParallelProcessor<Rf>> GenericProcessor<Rf> for P {
fn process_record_batch(&mut self, records: impl Iterator<Item = Rf>) -> Result<()> {
self.process_record_batch(records)
}
fn on_batch_complete(&mut self) -> Result<()> {
self.on_batch_complete()
}
fn on_thread_complete(&mut self) -> Result<()> {
self.on_thread_complete()
}
fn set_thread_id(&mut self, thread_id: usize) {
self.set_thread_id(thread_id);
}
}
pub trait PairedParallelProcessor<Rf: Record>: Send + Clone {
fn process_record_pair_batch(
&mut self,
record_pairs: impl Iterator<Item = (Rf, Rf)>,
) -> Result<()> {
for record_pair in record_pairs {
self.process_record_pair(record_pair.0, record_pair.1)?;
}
Ok(())
}
fn process_record_pair(&mut self, _record1: Rf, _record2: Rf) -> Result<()> {
unimplemented!("Either PairedParallelProcessor::process_record_pair or PairedParallelProcessor::process_record_pair_batch must be implemented!");
}
fn on_batch_complete(&mut self) -> Result<()> {
Ok(())
}
fn on_thread_complete(&mut self) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
fn set_thread_id(&mut self, thread_id: usize) {
}
fn get_thread_id(&self) -> usize {
unimplemented!("Must be implemented by the processor to be used")
}
}
impl<Rf: Record, P: PairedParallelProcessor<Rf>> GenericProcessor<(Rf, Rf)> for P {
fn process_record_batch(&mut self, records: impl Iterator<Item = (Rf, Rf)>) -> Result<()> {
self.process_record_pair_batch(records)
}
fn on_batch_complete(&mut self) -> Result<()> {
self.on_batch_complete()
}
fn on_thread_complete(&mut self) -> Result<()> {
self.on_thread_complete()
}
fn set_thread_id(&mut self, thread_id: usize) {
self.set_thread_id(thread_id);
}
}
pub trait MultiParallelProcessor<Rf: Record>: Send + Clone {
fn process_multi_record_batch(
&mut self,
multi_records: impl Iterator<Item = SmallVec<[Rf; MAX_ARITY]>>,
) -> Result<()> {
for multi_record in multi_records {
self.process_multi_record(&multi_record)?;
}
Ok(())
}
fn process_multi_record(&mut self, _records: &[Rf]) -> Result<()> {
unimplemented!("Either MultiParallelProcessor::process_multi_record or MultiParallelProcessor::process_multi_record_batch must be implemented!");
}
fn on_batch_complete(&mut self) -> Result<()> {
Ok(())
}
fn on_thread_complete(&mut self) -> Result<()> {
Ok(())
}
#[allow(unused_variables)]
fn set_thread_id(&mut self, thread_id: usize) {
}
fn get_thread_id(&self) -> usize {
unimplemented!("Must be implemented by the processor to be used")
}
}
impl<Rf: Record, P: MultiParallelProcessor<Rf>> GenericProcessor<SmallVec<[Rf; MAX_ARITY]>> for P {
fn process_record_batch(
&mut self,
multi_records: impl Iterator<Item = SmallVec<[Rf; MAX_ARITY]>>,
) -> Result<()> {
self.process_multi_record_batch(multi_records)
}
fn on_batch_complete(&mut self) -> Result<()> {
self.on_batch_complete()
}
fn on_thread_complete(&mut self) -> Result<()> {
self.on_thread_complete()
}
fn set_thread_id(&mut self, thread_id: usize) {
self.set_thread_id(thread_id);
}
}
impl<Rf: Record, F> ParallelProcessor<Rf> for F
where
F: for<'a> FnMut(&'a mut dyn Iterator<Item = Rf>) -> Result<()> + Send + Clone,
{
fn process_record_batch(&mut self, mut records: impl Iterator<Item = Rf>) -> Result<()> {
self(&mut records)
}
}
impl<Rf: Record, F> PairedParallelProcessor<Rf> for F
where
F: FnMut(&mut dyn Iterator<Item = (Rf, Rf)>) -> Result<()> + Send + Clone,
{
fn process_record_pair_batch(
&mut self,
mut record_pairs: impl Iterator<Item = (Rf, Rf)>,
) -> Result<()> {
self(&mut record_pairs)
}
}
impl<Rf: Record, F> MultiParallelProcessor<Rf> for F
where
F: FnMut(&mut dyn Iterator<Item = SmallVec<[Rf; MAX_ARITY]>>) -> Result<()> + Send + Clone,
{
fn process_multi_record_batch(
&mut self,
mut multi_records: impl Iterator<Item = SmallVec<[Rf; MAX_ARITY]>>,
) -> Result<()> {
self(&mut multi_records)
}
}