use grafeo_common::types::{LogicalType, Value};
use super::{Operator, OperatorResult};
use crate::execution::chunk::DataChunkBuilder;
/// Operator that forwards at most `limit` rows from its child and then
/// reports exhaustion.
pub struct LimitOperator {
    /// Upstream operator supplying the rows.
    child: Box<dyn Operator>,
    /// Maximum number of rows to emit.
    limit: usize,
    /// Rows emitted so far.
    returned: usize,
    /// Column types used when building a truncated copy of an oversized chunk.
    output_schema: Vec<LogicalType>,
}

impl LimitOperator {
    /// Creates a limit operator over `child` that emits at most `limit` rows.
    pub fn new(child: Box<dyn Operator>, limit: usize, output_schema: Vec<LogicalType>) -> Self {
        let returned = 0;
        Self {
            child,
            limit,
            returned,
            output_schema,
        }
    }

    /// Consumes the operator, handing back its child and the configured limit.
    ///
    /// The output schema and the emitted-row counter are discarded.
    pub fn into_parts(self) -> (Box<dyn Operator>, usize) {
        let Self { child, limit, .. } = self;
        (child, limit)
    }
}
impl Operator for LimitOperator {
    /// Emits the next chunk of at most the remaining row budget.
    ///
    /// Empty child chunks are skipped. A child chunk that fits in the budget is
    /// forwarded unchanged; a larger one is replaced by a copy of its first
    /// `budget` selected rows. Returns `Ok(None)` once `limit` rows have been
    /// emitted or the child is exhausted.
    fn next(&mut self) -> OperatorResult {
        let budget = self.limit.saturating_sub(self.returned);
        if budget == 0 {
            return Ok(None);
        }

        // Pull until the child produces a chunk that actually has rows.
        let (chunk, available) = loop {
            let Some(candidate) = self.child.next()? else {
                return Ok(None);
            };
            let rows = candidate.row_count();
            if rows > 0 {
                break (candidate, rows);
            }
        };

        if available <= budget {
            self.returned += available;
            return Ok(Some(chunk));
        }

        // The chunk overshoots the limit: copy only its first `budget` selected rows.
        let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, budget);
        let mut copied = 0;
        for row in chunk.selected_indices().take(budget) {
            for col_idx in 0..chunk.column_count() {
                // Columns missing from either side are left untouched.
                let (Some(src), Some(dst)) = (chunk.column(col_idx), builder.column_mut(col_idx))
                else {
                    continue;
                };
                match src.get_value(row) {
                    Some(value) => {
                        dst.push_value(value);
                    }
                    None => {
                        dst.push_value(Value::Null);
                    }
                }
            }
            builder.advance_row();
            copied += 1;
        }
        self.returned += copied;
        Ok(Some(builder.finish()))
    }

    /// Rewinds the child and clears the emitted-row counter.
    fn reset(&mut self) {
        self.child.reset();
        self.returned = 0;
    }

    fn name(&self) -> &'static str {
        "Limit"
    }

    /// Upcasts to `Any` so callers can downcast back to `LimitOperator`.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
/// Operator that discards the first `skip` rows produced by its child and
/// forwards everything after them.
pub struct SkipOperator {
    /// Upstream operator supplying the rows.
    child: Box<dyn Operator>,
    /// Number of leading rows to drop.
    skip: usize,
    /// Rows dropped so far; set to `skip` once the prefix has been consumed.
    skipped: usize,
    /// Column types used when building a trimmed copy of a partially skipped chunk.
    output_schema: Vec<LogicalType>,
}

impl SkipOperator {
    /// Creates a skip operator over `child` that drops `skip` leading rows.
    pub fn new(child: Box<dyn Operator>, skip: usize, output_schema: Vec<LogicalType>) -> Self {
        let skipped = 0;
        Self {
            child,
            skip,
            skipped,
            output_schema,
        }
    }
}
impl Operator for SkipOperator {
    /// Emits the next chunk after the first `skip` rows have been discarded.
    ///
    /// While the skip window is open, whole child chunks inside it are dropped.
    /// The chunk that straddles the boundary is replaced by a copy of its rows
    /// past the window. After that, child chunks are forwarded unchanged.
    /// Returns `Ok(None)` when the child is exhausted.
    fn next(&mut self) -> OperatorResult {
        while self.skipped < self.skip {
            let Some(chunk) = self.child.next()? else {
                return Ok(None);
            };
            let row_count = chunk.row_count();
            let to_skip = (self.skip - self.skipped).min(row_count);
            if to_skip >= row_count {
                // The whole chunk (possibly empty) lies inside the skip window.
                self.skipped += row_count;
                continue;
            }

            // This chunk straddles the boundary: drop `to_skip` rows, keep the rest.
            self.skipped = self.skip;
            let mut builder =
                DataChunkBuilder::with_capacity(&self.output_schema, row_count - to_skip);
            // Iterate the selection lazily; no need to materialize it into a Vec.
            for row in chunk.selected_indices().skip(to_skip) {
                for col_idx in 0..chunk.column_count() {
                    if let (Some(src_col), Some(dst_col)) =
                        (chunk.column(col_idx), builder.column_mut(col_idx))
                    {
                        if let Some(value) = src_col.get_value(row) {
                            dst_col.push_value(value);
                        } else {
                            dst_col.push_value(Value::Null);
                        }
                    }
                }
                builder.advance_row();
            }
            return Ok(Some(builder.finish()));
        }
        self.child.next()
    }

    /// Rewinds the child and reopens the skip window.
    fn reset(&mut self) {
        self.child.reset();
        self.skipped = 0;
    }

    fn name(&self) -> &'static str {
        "Skip"
    }

    /// Upcasts to `Any` so callers can downcast back to `SkipOperator`.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
/// Operator combining skip and limit: drops the first `skip` rows from its
/// child, then forwards at most `limit` of the rows that follow.
pub struct LimitSkipOperator {
    /// Upstream operator supplying the rows.
    child: Box<dyn Operator>,
    /// Number of leading rows to drop.
    skip: usize,
    /// Rows dropped so far; set to `skip` once the prefix has been consumed.
    skipped: usize,
    /// Maximum number of rows to emit after the skip window.
    limit: usize,
    /// Rows emitted so far.
    returned: usize,
    /// Column types used when building trimmed copies of child chunks.
    output_schema: Vec<LogicalType>,
}

impl LimitSkipOperator {
    /// Creates an operator over `child` that drops `skip` rows and then emits
    /// at most `limit` rows.
    pub fn new(
        child: Box<dyn Operator>,
        skip: usize,
        limit: usize,
        output_schema: Vec<LogicalType>,
    ) -> Self {
        let (skipped, returned) = (0, 0);
        Self {
            child,
            skip,
            skipped,
            limit,
            returned,
            output_schema,
        }
    }
}
impl Operator for LimitSkipOperator {
    /// Emits the next chunk of rows that lie past the skip window and within
    /// the limit.
    ///
    /// Empty child chunks and chunks entirely inside the skip window are
    /// consumed silently. A chunk that needs no trimming is forwarded as-is,
    /// as `LimitOperator` does. Otherwise a copy of just the selected slice is
    /// built. Returns `Ok(None)` once `limit` rows have been emitted or the
    /// child is exhausted.
    fn next(&mut self) -> OperatorResult {
        if self.returned >= self.limit {
            return Ok(None);
        }
        loop {
            let Some(chunk) = self.child.next()? else {
                return Ok(None);
            };
            let row_count = chunk.row_count();
            if row_count == 0 {
                continue;
            }

            // Consume whatever part of the skip window still overlaps this chunk.
            let mut start_idx = 0;
            if self.skipped < self.skip {
                let to_skip = (self.skip - self.skipped).min(row_count);
                if to_skip >= row_count {
                    self.skipped += row_count;
                    continue;
                }
                self.skipped = self.skip;
                start_idx = to_skip;
            }

            let remaining_in_chunk = row_count - start_idx;
            let remaining_to_return = self.limit - self.returned;
            let to_return = remaining_in_chunk.min(remaining_to_return);
            // start_idx < row_count and returned < limit here, so at least one row goes out.
            debug_assert!(to_return > 0);
            self.returned += to_return;

            if start_idx == 0 && to_return == row_count {
                // Nothing to trim: forward the child's chunk without copying values.
                return Ok(Some(chunk));
            }

            let mut builder = DataChunkBuilder::with_capacity(&self.output_schema, to_return);
            // Iterate the selection lazily; no need to materialize it into a Vec.
            for row in chunk.selected_indices().skip(start_idx).take(to_return) {
                for col_idx in 0..chunk.column_count() {
                    if let (Some(src_col), Some(dst_col)) =
                        (chunk.column(col_idx), builder.column_mut(col_idx))
                    {
                        if let Some(value) = src_col.get_value(row) {
                            dst_col.push_value(value);
                        } else {
                            dst_col.push_value(Value::Null);
                        }
                    }
                }
                builder.advance_row();
            }
            return Ok(Some(builder.finish()));
        }
    }

    /// Rewinds the child and clears both the skip and emitted-row counters.
    fn reset(&mut self) {
        self.child.reset();
        self.skipped = 0;
        self.returned = 0;
    }

    fn name(&self) -> &'static str {
        "LimitSkip"
    }

    /// Upcasts to `Any` so callers can downcast back to `LimitSkipOperator`.
    fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::DataChunk;
    use crate::execution::chunk::DataChunkBuilder;

    /// Test double that yields a fixed list of chunks, one per `next` call.
    struct MockOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl MockOperator {
        fn new(chunks: Vec<DataChunk>) -> Self {
            Self {
                chunks,
                position: 0,
            }
        }
    }

    impl Operator for MockOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                // Move the chunk out, leaving an empty placeholder in its slot.
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        // NOTE: only rewinds the cursor. Chunks already handed out were replaced
        // by `DataChunk::empty()` in `next`, so a reset mock replays empty chunks
        // rather than the original data.
        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "Mock"
        }

        fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
            self
        }
    }

    /// Builds a single-column Int64 chunk containing `values` in order.
    fn create_numbered_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &v in values {
            builder.column_mut(0).unwrap().push_int64(v);
            builder.advance_row();
        }
        builder.finish()
    }

    /// Drains `op` and returns the Int64 values of column 0 in output order.
    fn collect_values(op: &mut dyn Operator) -> Vec<i64> {
        let mut results = Vec::new();
        while let Some(chunk) = op.next().unwrap() {
            for row in chunk.selected_indices() {
                results.push(chunk.column(0).unwrap().get_int64(row).unwrap());
            }
        }
        results
    }

    #[test]
    fn test_limit() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut limit = LimitOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_larger_than_input() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(mock), 10, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut limit = LimitOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);
        assert!(collect_values(&mut limit).is_empty());
    }

    #[test]
    fn test_skip() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3, 4, 5])]);
        let mut skip = SkipOperator::new(Box::new(mock), 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![3, 4, 5]);
    }

    #[test]
    fn test_skip_zero() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(mock), 0, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![1, 2, 3]);
    }

    #[test]
    fn test_skip_all() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut skip = SkipOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);
        let result = skip.next().unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_limit_skip_combined() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
        ])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 3, 4, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut op), vec![4, 5, 6, 7]);
    }

    #[test]
    fn test_limit_skip_across_chunks() {
        // The skip window ends inside the second chunk; the limit ends inside the third.
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 3, 2, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut op), vec![4, 5]);
    }

    #[test]
    fn test_limit_skip_no_skip_passes_rows() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 0, 10, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut op), vec![1, 2, 3]);
    }

    #[test]
    fn test_limit_skip_skip_exceeds_input() {
        let mock = MockOperator::new(vec![create_numbered_chunk(&[1, 2, 3])]);
        let mut op = LimitSkipOperator::new(Box::new(mock), 5, 2, vec![LogicalType::Int64]);
        assert!(collect_values(&mut op).is_empty());
    }

    #[test]
    fn test_limit_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut limit = LimitOperator::new(Box::new(mock), 5, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut limit), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn test_skip_across_chunks() {
        let mock = MockOperator::new(vec![
            create_numbered_chunk(&[1, 2]),
            create_numbered_chunk(&[3, 4]),
            create_numbered_chunk(&[5, 6]),
        ]);
        let mut skip = SkipOperator::new(Box::new(mock), 3, vec![LogicalType::Int64]);
        assert_eq!(collect_values(&mut skip), vec![4, 5, 6]);
    }

    #[test]
    fn test_limit_into_parts() {
        let child = Box::new(MockOperator::new(vec![]));
        let limit = LimitOperator::new(child, 42, vec![LogicalType::Int64]);
        let (_, limit_value) = limit.into_parts();
        assert_eq!(limit_value, 42);
    }

    #[test]
    fn test_limit_into_any() {
        let child = Box::new(MockOperator::new(vec![]));
        let limit: Box<dyn Operator> = Box::new(LimitOperator::new(child, 10, vec![]));
        let any = limit.into_any();
        assert!(any.downcast::<LimitOperator>().is_ok());
    }
}