use crate::execution::chunk::DataChunk;
use crate::execution::operators::OperatorError;
use crate::execution::pipeline::{ChunkSizeHint, PushOperator, Sink};
use crate::execution::selection::SelectionVector;
/// Push-based operator that forwards at most `limit` rows to its sink.
///
/// A running count of forwarded rows is kept so that the chunk crossing the
/// limit can be truncated and any later input refused.
#[derive(Debug)]
pub struct LimitPushOperator {
    /// Maximum number of rows to forward.
    limit: usize,
    /// Number of rows counted against the limit so far.
    passed: usize,
}
impl LimitPushOperator {
    /// Creates a limit operator that lets at most `limit` rows through.
    ///
    /// The forwarded-row counter starts at zero.
    pub fn new(limit: usize) -> Self {
        let passed = 0;
        Self { limit, passed }
    }

    /// Returns the configured row limit.
    pub fn limit(&self) -> usize {
        self.limit
    }

    /// Returns how many rows have been counted against the limit so far.
    pub fn passed(&self) -> usize {
        self.passed
    }

    /// Returns `true` once the counted rows have reached the limit.
    ///
    /// A limit of zero is exhausted from the start.
    pub fn is_exhausted(&self) -> bool {
        self.limit <= self.passed
    }
}
impl PushOperator for LimitPushOperator {
    /// Forwards `chunk` to `sink`, cutting it short if it would exceed the limit.
    ///
    /// Returns `Ok(false)` without touching the sink when the limit was already
    /// reached, and also after forwarding the chunk that reaches or crosses
    /// the limit. In every other case the sink's own answer is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `sink.consume`.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        if self.is_exhausted() {
            return Ok(false);
        }

        let rows = chunk.len();
        let budget = self.limit - self.passed;

        if rows > budget {
            // Only part of this chunk fits: count the rows that fit, forward
            // them, and report that no further input is needed. The sink's
            // boolean answer is irrelevant here because we stop regardless.
            self.passed += budget;
            // NOTE(review): `new_all(budget)` presumably selects the first
            // `budget` rows of the chunk; confirm against `SelectionVector`.
            let head = SelectionVector::new_all(budget);
            sink.consume(chunk.filter(&head))?;
            return Ok(false);
        }

        // The whole chunk fits. The counter is bumped before handing the
        // chunk over, exactly as callers observing `passed()` expect today.
        self.passed += rows;
        let sink_wants_more = sink.consume(chunk)?;
        Ok(sink_wants_more && !self.is_exhausted())
    }

    /// Nothing is buffered by this operator, so finalization is a no-op.
    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Suggests a chunk size based on the configured limit.
    ///
    /// Limits below 256 ask for chunks of at most `limit` rows, limits below
    /// 1000 ask for small chunks, and anything larger keeps the default.
    fn preferred_chunk_size(&self) -> ChunkSizeHint {
        match self.limit {
            tiny if tiny < 256 => ChunkSizeHint::AtMost(tiny),
            modest if modest < 1000 => ChunkSizeHint::Small,
            _ => ChunkSizeHint::Default,
        }
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "LimitPush"
    }
}
/// Push-based operator that drops the first `skip` rows it receives and
/// forwards everything after them to its sink.
#[derive(Debug)]
pub struct SkipPushOperator {
    /// Total number of leading rows to drop.
    skip: usize,
    /// Number of rows dropped so far.
    skipped: usize,
}
impl SkipPushOperator {
    /// Creates a skip operator that drops the first `skip` rows.
    ///
    /// The dropped-row counter starts at zero.
    pub fn new(skip: usize) -> Self {
        let skipped = 0;
        Self { skip, skipped }
    }

    /// Returns the configured number of rows to drop.
    pub fn skip(&self) -> usize {
        self.skip
    }

    /// Returns `true` once the required number of rows has been dropped.
    ///
    /// A skip count of zero is complete from the start.
    pub fn skip_complete(&self) -> bool {
        self.skip <= self.skipped
    }
}
impl PushOperator for SkipPushOperator {
    /// Drops rows until `skip` rows have been discarded, then forwards the rest.
    ///
    /// Returns `Ok(true)` when the whole chunk was discarded. Whenever rows
    /// are forwarded, the sink's own answer is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `sink.consume`.
    fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
        // Skipping already finished: act as a pass-through.
        if self.skip_complete() {
            return sink.consume(chunk);
        }

        let rows = chunk.len();
        let outstanding = self.skip - self.skipped;

        if rows > outstanding {
            // The chunk straddles the boundary: discard its first
            // `outstanding` rows and forward the remainder.
            self.skipped = self.skip;
            let tail = SelectionVector::from_predicate(rows, |row| row >= outstanding);
            return sink.consume(chunk.filter(&tail));
        }

        // The entire chunk falls inside the skipped prefix.
        self.skipped += rows;
        Ok(true)
    }

    /// Nothing is buffered by this operator, so finalization is a no-op.
    fn finalize(&mut self, _sink: &mut dyn Sink) -> Result<(), OperatorError> {
        Ok(())
    }

    /// Returns the operator's display name.
    fn name(&self) -> &'static str {
        "SkipPush"
    }
}
/// Push-based operator that first drops `skip` rows and then forwards at most
/// `limit` of the rows that follow.
pub struct SkipLimitPushOperator {
    /// Tracks how many leading rows still have to be dropped.
    skip: SkipPushOperator,
    /// Tracks how many rows have been forwarded after the skipped prefix.
    limit: LimitPushOperator,
}

// Written by hand rather than derived so that it does not require the two
// inner operator types to implement `Debug` themselves; it prints the four
// counters of both stages as one flat struct.
impl std::fmt::Debug for SkipLimitPushOperator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SkipLimitPushOperator")
            .field("skip", &self.skip.skip)
            .field("skipped", &self.skip.skipped)
            .field("limit", &self.limit.limit)
            .field("passed", &self.limit.passed)
            .finish()
    }
}
impl SkipLimitPushOperator {
    /// Creates an operator that drops the first `skip` rows and then forwards
    /// at most `limit` rows.
    pub fn new(skip: usize, limit: usize) -> Self {
        let skip = SkipPushOperator::new(skip);
        let limit = LimitPushOperator::new(limit);
        Self { skip, limit }
    }
}
impl PushOperator for SkipLimitPushOperator {
fn push(&mut self, chunk: DataChunk, sink: &mut dyn Sink) -> Result<bool, OperatorError> {
if self.limit.is_exhausted() {
return Ok(false);
}
if !self.skip.skip_complete() {
let chunk_len = chunk.len();
let remaining_to_skip = self.skip.skip - self.skip.skipped;
if chunk_len <= remaining_to_skip {
self.skip.skipped += chunk_len;
return Ok(true);
}
self.skip.skipped = self.skip.skip;
let start = remaining_to_skip;
let selection = SelectionVector::from_predicate(chunk_len, |i| i >= start);
let passed = chunk.filter(&selection);
return self.limit.push(passed, sink);
}
self.limit.push(chunk, sink)
}
fn finalize(&mut self, sink: &mut dyn Sink) -> Result<(), OperatorError> {
self.skip.finalize(sink)?;
self.limit.finalize(sink)
}
fn preferred_chunk_size(&self) -> ChunkSizeHint {
self.limit.preferred_chunk_size()
}
fn name(&self) -> &'static str {
"SkipLimitPush"
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::sink::CollectorSink;
    use crate::execution::vector::ValueVector;
    use grafeo_common::types::Value;

    /// Builds a single-column chunk holding `values` as `Int64` cells.
    fn create_test_chunk(values: &[i64]) -> DataChunk {
        let cells: Vec<Value> = values.iter().copied().map(Value::Int64).collect();
        DataChunk::new(vec![ValueVector::from_values(&cells)])
    }

    // Fewer rows than the limit: everything passes and the limit stays open.
    #[test]
    fn test_limit_under_limit() {
        let mut op = LimitPushOperator::new(10);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3]);
        op.push(input, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 3);
        assert!(!op.is_exhausted());
    }

    // Exactly `limit` rows: all pass and the operator asks to stop.
    #[test]
    fn test_limit_exact_limit() {
        let mut op = LimitPushOperator::new(5);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3, 4, 5]);
        let keep_going = op.push(input, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 5);
        assert!(!keep_going);
        assert!(op.is_exhausted());
    }

    // More rows than the limit: the chunk is truncated.
    #[test]
    fn test_limit_over_limit() {
        let mut op = LimitPushOperator::new(3);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3, 4, 5]);
        let keep_going = op.push(input, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 3);
        assert!(!keep_going);
    }

    // The limit is reached in the middle of the third chunk.
    #[test]
    fn test_limit_multiple_chunks() {
        let mut op = LimitPushOperator::new(5);
        let mut collector = CollectorSink::new();

        op.push(create_test_chunk(&[1, 2]), &mut collector).unwrap();
        op.push(create_test_chunk(&[3, 4]), &mut collector).unwrap();
        let last = create_test_chunk(&[5, 6, 7]);
        let keep_going = op.push(last, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 5);
        assert!(!keep_going);
    }

    // Fewer rows than the skip count: nothing reaches the sink.
    #[test]
    fn test_skip_under_skip() {
        let mut op = SkipPushOperator::new(10);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3]);
        op.push(input, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 0);
        assert!(!op.skip_complete());
    }

    // The first chunk is skipped entirely; the second passes untouched.
    #[test]
    fn test_skip_exact_skip() {
        let mut op = SkipPushOperator::new(3);
        let mut collector = CollectorSink::new();

        let first = create_test_chunk(&[1, 2, 3]);
        let second = create_test_chunk(&[4, 5, 6]);
        op.push(first, &mut collector).unwrap();
        op.push(second, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 3);
        assert!(op.skip_complete());
    }

    // The skip boundary falls inside the chunk: only the tail passes.
    #[test]
    fn test_skip_partial() {
        let mut op = SkipPushOperator::new(2);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3, 4, 5]);
        op.push(input, &mut collector).unwrap();
        op.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 3);
    }

    // Skip two rows, then keep at most three of the remaining five.
    #[test]
    fn test_skip_limit_combined() {
        let mut combined = SkipLimitPushOperator::new(2, 3);
        let mut collector = CollectorSink::new();

        let input = create_test_chunk(&[1, 2, 3, 4, 5, 6, 7]);
        combined.push(input, &mut collector).unwrap();
        combined.finalize(&mut collector).unwrap();

        assert_eq!(collector.row_count(), 3);
    }
}