#[cfg(feature = "parallel")]
use rayon::prelude::*;
/// Applies a callback to a collection of items, either sequentially or — with the
/// `parallel` feature enabled — in parallel chunks of `batch_size` items.
///
/// `B` is the backing storage for the items. [`BatchProcessor::len`] and
/// [`BatchProcessor::is_empty`] work for any `B: AsRef<[T]>`; construction and
/// processing require the default `B = Vec<T>`.
pub struct BatchProcessor<T, B = Vec<T>> {
    /// Items to process.
    items: B,
    /// Items per parallel chunk. `new`, `with_capacity` and `with_batch_size` never
    /// store 0, because a zero chunk size makes the parallel path panic.
    batch_size: usize,
    /// Scratch buffer lent to the callback of `process_sequential`; it is cleared
    /// before every item so its allocation is reused across calls.
    buffer: Vec<T>,
}

impl<T> BatchProcessor<T, Vec<T>> {
    /// Default batch size, also used as the initial scratch-buffer capacity.
    const DEFAULT_BATCH_SIZE: usize = 64;

    /// Creates a processor over `items` with the default batch size (64).
    pub fn new(items: Vec<T>) -> Self {
        Self {
            items,
            batch_size: Self::DEFAULT_BATCH_SIZE,
            buffer: Vec::with_capacity(Self::DEFAULT_BATCH_SIZE),
        }
    }

    /// Creates an empty processor whose item storage is preallocated for `capacity`
    /// items, with the default batch size (64).
    ///
    /// NOTE(review): no method in this impl adds items after construction, so the
    /// processor returned here holds no items.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            items: Vec::with_capacity(capacity),
            batch_size: Self::DEFAULT_BATCH_SIZE,
            buffer: Vec::with_capacity(Self::DEFAULT_BATCH_SIZE),
        }
    }

    /// Applies `f` to every item in parallel, splitting the items into chunks of
    /// `batch_size`, and returns the results in the original item order.
    ///
    /// The scratch buffer is not used by this method.
    #[cfg(feature = "parallel")]
    pub fn process_par<F, R>(self, f: F) -> Vec<R>
    where
        T: Send,
        F: Fn(T) -> R + Send + Sync,
        R: Send,
    {
        let Self {
            items, batch_size, ..
        } = self;
        items
            .into_par_iter()
            .chunks(batch_size)
            // Borrow `f` (`&F: FnMut` because `F: Fn`) so `F` need not be `Copy`.
            .flat_map(|chunk| chunk.into_iter().map(&f).collect::<Vec<R>>())
            .collect()
    }

    /// Calls `f` on each item in order and returns the results in item order.
    ///
    /// `f` receives the item and a scratch buffer that is cleared before every call,
    /// so a single allocation is reused across items. The batch size is not used.
    pub fn process_sequential<F, R>(self, mut f: F) -> Vec<R>
    where
        F: FnMut(&T, &mut Vec<T>) -> R,
    {
        let Self {
            items, mut buffer, ..
        } = self;
        items
            .into_iter()
            .map(|item| {
                buffer.clear();
                f(&item, &mut buffer)
            })
            .collect()
    }
}

impl<T, B: AsRef<[T]>> BatchProcessor<T, B> {
    /// Sets the number of items per parallel chunk.
    ///
    /// Values below 1 are clamped to 1, because a zero chunk size would make the
    /// parallel path panic.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size.max(1);
        self
    }

    /// Returns the number of items held by the processor.
    pub fn len(&self) -> usize {
        self.items.as_ref().len()
    }

    /// Returns `true` if the processor holds no items.
    pub fn is_empty(&self) -> bool {
        self.items.as_ref().is_empty()
    }
}
/// A list of queries evaluated against one shared graph, either sequentially or —
/// with the `parallel` feature enabled — in parallel chunks of `batch_size` queries.
pub struct GraphBatchQuery<G, T> {
    /// The graph every query is evaluated against.
    graph: G,
    /// The queries; results are returned in this order.
    queries: Vec<T>,
    /// Queries per parallel chunk. `new` and `with_batch_size` never store 0, because
    /// a zero chunk size makes the parallel path panic.
    batch_size: usize,
}

impl<G, T> GraphBatchQuery<G, T> {
    /// Batch size used by [`GraphBatchQuery::new`].
    const DEFAULT_BATCH_SIZE: usize = 64;

    /// Creates a batch of `queries` against `graph` with the default batch size (64).
    pub fn new(graph: G, queries: Vec<T>) -> Self {
        Self {
            graph,
            queries,
            batch_size: Self::DEFAULT_BATCH_SIZE,
        }
    }

    /// Sets the number of queries per parallel chunk.
    ///
    /// Values below 1 are clamped to 1, because a zero chunk size would make the
    /// parallel path panic.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size.max(1);
        self
    }

    /// Returns the number of queries.
    pub fn len(&self) -> usize {
        self.queries.len()
    }

    /// Returns `true` if there are no queries.
    pub fn is_empty(&self) -> bool {
        self.queries.is_empty()
    }

    /// Evaluates `query_fn(&graph, &query)` for each query in order and returns the
    /// results in query order. The batch size is not used.
    pub fn execute_sequential<F, R>(self, mut query_fn: F) -> Vec<R>
    where
        F: FnMut(&G, &T) -> R,
    {
        let Self { graph, queries, .. } = self;
        queries
            .into_iter()
            .map(|query| query_fn(&graph, &query))
            .collect()
    }
}

#[cfg(feature = "parallel")]
impl<G, T> GraphBatchQuery<G, T>
where
    // The graph is only shared by reference across worker threads, so `Sync` suffices.
    G: Sync,
    // Queries are moved into worker threads.
    T: Send,
{
    /// Evaluates `query_fn(&graph, &query)` for every query in parallel, splitting the
    /// queries into chunks of `batch_size`, and returns the results in query order.
    pub fn execute_par<F, R>(self, query_fn: F) -> Vec<R>
    where
        F: Fn(&G, &T) -> R + Sync,
        R: Send,
    {
        let Self {
            graph,
            queries,
            batch_size,
        } = self;
        queries
            .into_par_iter()
            .chunks(batch_size)
            .flat_map(|chunk| {
                chunk
                    .into_iter()
                    .map(|query| query_fn(&graph, &query))
                    .collect::<Vec<R>>()
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built processor reports exactly the items it was given.
    #[test]
    fn test_batch_processor_creation() {
        let items = vec![1, 2, 3, 4, 5];
        let processor = BatchProcessor::new(items);
        assert_eq!(5, processor.len());
        assert!(!processor.is_empty());
    }

    /// A processor built from no items is empty.
    #[test]
    fn test_batch_processor_empty() {
        let processor: BatchProcessor<i32> = BatchProcessor::new(Vec::new());
        assert_eq!(0, processor.len());
        assert!(processor.is_empty());
    }

    /// The parallel path maps every item and keeps the input order.
    #[test]
    #[cfg(feature = "parallel")]
    fn test_batch_processor_parallel() {
        let input: Vec<i32> = (1..=8).collect();
        let expected: Vec<i32> = input.iter().map(|v| v * 2).collect();
        let doubled = BatchProcessor::new(input).process_par(|v| v * 2);
        assert_eq!(doubled, expected);
    }

    /// The sequential path maps every item in order while lending a scratch buffer.
    #[test]
    fn test_batch_processor_sequential() {
        let doubled: Vec<i32> = BatchProcessor::new((1..=5).collect::<Vec<i32>>())
            .process_sequential(|value, scratch| {
                scratch.push(*value);
                value * 2
            });
        assert_eq!(doubled, [2, 4, 6, 8, 10]);
    }
}