use super::chunk::DataChunk;
use super::operators::OperatorError;
/// Preference an operator can express about the chunk size a pipeline
/// should request from its source.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ChunkSizeHint {
    /// No particular preference.
    Default,
    /// Prefers small chunks.
    Small,
    /// Prefers large chunks.
    Large,
    /// Asks for a chunk size of exactly the given value.
    Exact(usize),
    /// Asks for a chunk size no larger than the given value.
    AtMost(usize),
}
impl Default for ChunkSizeHint {
fn default() -> Self {
Self::Default
}
}
/// Chunk size a pipeline starts from when no operator asks for anything else.
pub const DEFAULT_CHUNK_SIZE: usize = 2_048;
/// Chunk size associated with [`ChunkSizeHint::Small`].
pub const SMALL_CHUNK_SIZE: usize = 512;
/// Chunk size associated with [`ChunkSizeHint::Large`].
pub const LARGE_CHUNK_SIZE: usize = 4_096;
/// Producer of [`DataChunk`]s that sits at the head of a [`Pipeline`].
pub trait Source: Send + Sync {
    /// Static name identifying this source implementation.
    fn name(&self) -> &'static str;

    /// Produces the next chunk, given the chunk size requested by the caller.
    ///
    /// `Ok(None)` signals that the source is exhausted; [`Pipeline::execute`]
    /// stops pulling as soon as it sees it.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the next chunk cannot be produced.
    fn next_chunk(&mut self, chunk_size: usize) -> Result<Option<DataChunk>, OperatorError>;

    /// Resets the source.
    // NOTE(review): presumably rewinds the source so it can be read again;
    // nothing in this file calls it, so confirm against the implementors.
    fn reset(&mut self);
}
/// Consumer of [`DataChunk`]s: the end of a [`Pipeline`], or the target an
/// operator writes its output to.
pub trait Sink: Send + Sync {
    /// Static name identifying this sink implementation.
    fn name(&self) -> &'static str;

    /// Takes ownership of one chunk.
    ///
    /// The returned flag tells the caller whether to keep sending data:
    /// [`Pipeline::execute`] stops pulling from its source once `Ok(false)`
    /// comes back.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the chunk cannot be consumed.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError>;

    /// Called by the pipeline after the last chunk has been delivered.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when finalization fails.
    fn finalize(&mut self) -> Result<(), OperatorError>;
}
/// Pipeline stage that receives chunks and writes its output into a [`Sink`].
pub trait PushOperator: Send + Sync {
    /// Static name identifying this operator implementation.
    fn name(&self) -> &'static str;

    /// Chunk size this operator would like the pipeline to use.
    ///
    /// The provided implementation expresses no preference.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        ChunkSizeHint::Default
    }

    /// Handles one input chunk, handing any output to `sink`.
    ///
    /// The returned flag tells the pipeline whether to keep feeding data;
    /// `Ok(false)` makes [`Pipeline::execute`] stop pulling from its source.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when the chunk cannot be processed.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError>;

    /// Called by the pipeline once the input has ended; anything the operator
    /// still wants to output goes to `sink`.
    ///
    /// # Errors
    ///
    /// Returns an [`OperatorError`] when finalization fails.
    fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError>;
}
/// A push-based execution pipeline: one [`Source`], zero or more
/// [`PushOperator`]s applied in order, and one final [`Sink`].
pub struct Pipeline {
    /// Where chunks are pulled from.
    source: Box<dyn Source>,
    /// Stages applied to every chunk, first to last.
    operators: Vec<Box<dyn PushOperator>>,
    /// Final destination of the data.
    sink: Box<dyn Sink>,
}
impl Pipeline {
    /// Creates a pipeline from a source, an ordered list of operators and a sink.
    ///
    /// Chunks pulled from `source` are pushed through `operators` in order and
    /// the output of the last operator is written to `sink`.
    pub fn new(
        source: Box<dyn Source>,
        operators: Vec<Box<dyn PushOperator>>,
        sink: Box<dyn Sink>,
    ) -> Self {
        Self {
            source,
            operators,
            sink,
        }
    }

    /// Creates a pipeline without operators: every chunk goes straight from
    /// `source` to `sink`.
    pub fn simple(source: Box<dyn Source>, sink: Box<dyn Sink>) -> Self {
        Self::new(source, Vec::new(), sink)
    }

    /// Appends `op` as the new last operator and returns the pipeline, so
    /// calls can be chained.
    #[must_use]
    pub fn with_operator(mut self, op: Box<dyn PushOperator>) -> Self {
        self.operators.push(op);
        self
    }

    /// Runs the pipeline to completion.
    ///
    /// Pulls chunks from the source until it is exhausted or a stage asks to
    /// stop, then finalizes every operator (in order) and finally the sink.
    ///
    /// # Errors
    ///
    /// Returns the first [`OperatorError`] reported by the source, an operator
    /// or the sink. Processing stops at that point; if the error occurs while
    /// chunks are still being pulled, finalization is not run.
    pub fn execute(&mut self) -> Result<(), OperatorError> {
        let chunk_size = self.compute_chunk_size();
        while let Some(chunk) = self.source.next_chunk(chunk_size)? {
            if !self.push_through(chunk)? {
                break;
            }
        }
        self.finalize_all()
    }

    /// Derives the chunk size to request from the source out of the
    /// operators' hints.
    ///
    /// * The first `Exact` hint wins outright.
    /// * `Small` and `AtMost` are upper bounds and always hold, no matter
    ///   where in the operator list a `Large` hint appears.
    /// * `Large` raises the starting point from [`DEFAULT_CHUNK_SIZE`] to
    ///   [`LARGE_CHUNK_SIZE`].
    ///
    /// The result is never zero.
    fn compute_chunk_size(&self) -> usize {
        let mut wants_large = false;
        let mut upper_bound = usize::MAX;
        for op in &self.operators {
            match op.preferred_chunk_size() {
                ChunkSizeHint::Default => {}
                ChunkSizeHint::Small => upper_bound = upper_bound.min(SMALL_CHUNK_SIZE),
                ChunkSizeHint::Large => wants_large = true,
                // An exact request cannot be reconciled with other hints.
                ChunkSizeHint::Exact(s) => return s.max(1),
                ChunkSizeHint::AtMost(s) => upper_bound = upper_bound.min(s),
            }
        }
        let preferred = if wants_large {
            LARGE_CHUNK_SIZE
        } else {
            DEFAULT_CHUNK_SIZE
        };
        // Bounds are applied last so the result does not depend on operator
        // order; the floor of 1 avoids asking the source for empty chunks.
        preferred.min(upper_bound).max(1)
    }

    /// Pushes one chunk through every operator and into the sink.
    ///
    /// Returns `Ok(false)` when some stage asked to stop.
    fn push_through(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        self.push_through_from(chunk, 0)
    }

    /// Finalizes every operator in order, then the sink.
    ///
    /// Whatever a non-final operator emits while finalizing is forwarded
    /// through the operators after it, before those are finalized themselves.
    fn finalize_all(&mut self) -> Result<(), OperatorError> {
        let count = self.operators.len();
        for idx in 0..count {
            if idx + 1 == count {
                self.operators[idx].finalize(&mut *self.sink)?;
                continue;
            }
            let mut collector = ChunkCollector::new();
            self.operators[idx].finalize(&mut collector)?;
            for chunk in collector.into_chunks() {
                // Downstream asked to stop: do not feed it the remaining
                // flushed chunks. Later stages are still finalized below.
                if !self.push_through_from(chunk, idx + 1)? {
                    break;
                }
            }
        }
        self.sink.finalize()
    }

    /// Pushes `chunk` through the operators starting at index `start` and
    /// into the sink. With no operator at or after `start`, the chunk goes
    /// straight to the sink.
    ///
    /// Returns `Ok(false)` when any stage on the way asked to stop. Output an
    /// operator produced together with its stop request is still forwarded
    /// downstream, so an operator behaves the same whether or not it is the
    /// last one in the pipeline.
    fn push_through_from(&mut self, chunk: DataChunk, start: usize) -> Result<bool, OperatorError> {
        let (last_op, upstream) = match self
            .operators
            .get_mut(start..)
            .and_then(|ops| ops.split_last_mut())
        {
            Some(parts) => parts,
            None => return self.sink.consume(chunk),
        };

        let mut current = chunk;
        let mut keep_going = true;
        for op in upstream {
            let mut collector = ChunkCollector::new();
            keep_going &= op.push(current, &mut collector)?;
            if collector.is_empty() {
                // Nothing to hand downstream for this chunk.
                return Ok(keep_going);
            }
            current = collector.into_single_chunk();
        }
        let downstream = last_op.push(current, &mut *self.sink)?;
        Ok(keep_going && downstream)
    }
}
/// In-memory [`Sink`] that buffers the chunks it is handed so they can be
/// retrieved afterwards.
pub struct ChunkCollector {
    /// Buffered chunks, in arrival order.
    chunks: Vec<DataChunk>,
}
impl ChunkCollector {
pub fn new() -> Self {
Self { chunks: Vec::new() }
}
pub fn is_empty(&self) -> bool {
self.chunks.is_empty()
}
pub fn row_count(&self) -> usize {
self.chunks.iter().map(DataChunk::len).sum()
}
pub fn into_chunks(self) -> Vec<DataChunk> {
self.chunks
}
pub fn into_single_chunk(self) -> DataChunk {
if self.chunks.is_empty() {
return DataChunk::empty();
}
if self.chunks.len() == 1 {
return self
.chunks
.into_iter()
.next()
.expect("chunks has exactly one element: checked on previous line");
}
DataChunk::concat(&self.chunks)
}
}
impl Default for ChunkCollector {
fn default() -> Self {
Self::new()
}
}
impl Sink for ChunkCollector {
    /// Identifies this sink as `"ChunkCollector"`.
    fn name(&self) -> &'static str {
        "ChunkCollector"
    }

    /// Stores `chunk` unless it is empty; always asks for more data.
    fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
        if chunk.is_empty() {
            // Empty chunks carry no data and are not buffered.
            return Ok(true);
        }
        self.chunks.push(chunk);
        Ok(true)
    }

    /// Nothing to do: the collected chunks are retrieved by consuming the
    /// collector.
    fn finalize(&mut self) -> Result<(), OperatorError> {
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;
    use std::sync::{Arc, Mutex};

    /// Builds a single-column chunk holding the integers `0..rows`.
    fn int_chunk(rows: usize) -> DataChunk {
        let values: Vec<Value> = (0..rows).map(|i| Value::Int64(i as i64)).collect();
        DataChunk::new(vec![ValueVector::from_values(&values)])
    }

    /// Source that yields a fixed number of identically sized chunks.
    struct TestSource {
        remaining: usize,
        values_per_chunk: usize,
    }

    impl TestSource {
        fn new(num_chunks: usize, values_per_chunk: usize) -> Self {
            Self {
                remaining: num_chunks,
                values_per_chunk,
            }
        }
    }

    impl Source for TestSource {
        fn next_chunk(&mut self, _chunk_size: usize) -> Result<Option<DataChunk>, OperatorError> {
            if self.remaining == 0 {
                return Ok(None);
            }
            self.remaining -= 1;
            Ok(Some(int_chunk(self.values_per_chunk)))
        }

        fn reset(&mut self) {}

        fn name(&self) -> &'static str {
            "TestSource"
        }
    }

    /// What a [`TestSink`] has observed so far.
    #[derive(Default)]
    struct SinkLog {
        chunk_lens: Vec<usize>,
        finalized: bool,
    }

    /// Sink that records into a shared log, so a test can still inspect the
    /// outcome after the sink has been moved into the pipeline.
    struct TestSink {
        log: Arc<Mutex<SinkLog>>,
    }

    impl TestSink {
        /// Returns the sink together with a handle to its log.
        fn new() -> (Self, Arc<Mutex<SinkLog>>) {
            let log = Arc::new(Mutex::new(SinkLog::default()));
            (
                Self {
                    log: Arc::clone(&log),
                },
                log,
            )
        }
    }

    impl Sink for TestSink {
        fn consume(&mut self, chunk: DataChunk) -> Result<bool, OperatorError> {
            self.log.lock().unwrap().chunk_lens.push(chunk.len());
            Ok(true)
        }

        fn finalize(&mut self) -> Result<(), OperatorError> {
            self.log.lock().unwrap().finalized = true;
            Ok(())
        }

        fn name(&self) -> &'static str {
            "TestSink"
        }
    }

    /// Operator that forwards every chunk unchanged.
    struct PassThroughOperator;

    impl PushOperator for PassThroughOperator {
        fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
            sink.consume(chunk)
        }

        fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
            Ok(())
        }

        fn name(&self) -> &'static str {
            "PassThrough"
        }
    }

    #[test]
    fn test_simple_pipeline() {
        let (sink, log) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(3, 10)), Box::new(sink));
        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_lens, vec![10, 10, 10]);
        assert!(log.finalized);
    }

    #[test]
    fn test_pipeline_with_operator() {
        let (sink, log) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_lens, vec![5, 5]);
        assert!(log.finalized);
    }

    #[test]
    fn test_pipeline_with_chained_operators() {
        // Two operators force the intermediate-collector path.
        let (sink, log) = TestSink::new();
        let mut pipeline = Pipeline::simple(Box::new(TestSource::new(2, 5)), Box::new(sink))
            .with_operator(Box::new(PassThroughOperator))
            .with_operator(Box::new(PassThroughOperator));
        pipeline.execute().unwrap();

        let log = log.lock().unwrap();
        assert_eq!(log.chunk_lens, vec![5, 5]);
        assert!(log.finalized);
    }

    #[test]
    fn test_chunk_collector() {
        let mut collector = ChunkCollector::new();
        assert!(collector.is_empty());
        assert_eq!(collector.row_count(), 0);

        assert!(collector.consume(int_chunk(2)).unwrap());
        assert!(!collector.is_empty());
        assert_eq!(collector.row_count(), 2);

        let merged = collector.into_single_chunk();
        assert_eq!(merged.len(), 2);
    }

    #[test]
    fn test_chunk_collector_keeps_chunks_separate() {
        let mut collector = ChunkCollector::default();
        collector.consume(int_chunk(2)).unwrap();
        collector.consume(int_chunk(3)).unwrap();
        assert_eq!(collector.row_count(), 5);

        let lens: Vec<usize> = collector.into_chunks().iter().map(DataChunk::len).collect();
        assert_eq!(lens, vec![2, 3]);
    }

    #[test]
    fn test_chunk_size_hints() {
        /// Pass-through operator that asks for small chunks.
        struct SmallHintOp;

        impl PushOperator for SmallHintOp {
            fn push(
                &mut self,
                chunk: DataChunk,
                sink: &mut dyn Sink,
            ) -> Result<bool, OperatorError> {
                sink.consume(chunk)
            }

            fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
                Ok(())
            }

            fn preferred_chunk_size(&self) -> ChunkSizeHint {
                ChunkSizeHint::Small
            }

            fn name(&self) -> &'static str {
                "SmallHint"
            }
        }

        assert_eq!(ChunkSizeHint::default(), ChunkSizeHint::Default);

        let (sink, _log) = TestSink::new();
        let without_hints = Pipeline::simple(Box::new(TestSource::new(1, 10)), Box::new(sink));
        assert_eq!(without_hints.compute_chunk_size(), DEFAULT_CHUNK_SIZE);

        let (sink, _log) = TestSink::new();
        let pipeline = Pipeline::simple(Box::new(TestSource::new(1, 10)), Box::new(sink))
            .with_operator(Box::new(SmallHintOp));
        let computed_size = pipeline.compute_chunk_size();
        assert!(computed_size <= SMALL_CHUNK_SIZE);
    }
}