use crate::error::DbxResult;
use crate::sql::executor::operators::PhysicalOperator;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
/// Physical operator that skips the first `offset` rows produced by its input
/// and then yields at most `count` rows, working batch by batch.
pub struct LimitOperator {
/// Upstream operator the rows are pulled from.
input: Box<dyn PhysicalOperator>,
/// Maximum number of rows this operator will emit in total.
count: usize,
/// Number of leading input rows to discard before emitting anything.
offset: usize,
/// Rows emitted so far; cleared by `reset`.
emitted: usize,
/// Rows discarded so far toward `offset`; cleared by `reset`.
skipped: usize,
}
impl LimitOperator {
pub fn new(input: Box<dyn PhysicalOperator>, count: usize, offset: usize) -> Self {
Self {
input,
count,
offset,
emitted: 0,
skipped: 0,
}
}
}
impl PhysicalOperator for LimitOperator {
    /// Returns the schema reported by the wrapped input operator.
    fn schema(&self) -> &Schema {
        self.input.schema()
    }

    /// Produces the next batch of the limited window.
    ///
    /// Returns `Ok(None)` once `count` rows have been emitted or the input is
    /// exhausted. Batches that fall entirely inside the skipped prefix are
    /// consumed silently; a batch that straddles the offset boundary or
    /// exceeds the remaining row budget is sliced down to the part that
    /// belongs to the window. Errors from the input are propagated unchanged.
    fn next(&mut self) -> DbxResult<Option<RecordBatch>> {
        // Rows we are still allowed to hand out. This stays constant for the
        // whole call because `emitted` only changes right before returning.
        let budget = self.count.saturating_sub(self.emitted);
        if budget == 0 {
            return Ok(None);
        }

        while let Some(batch) = self.input.next()? {
            let rows = batch.num_rows();
            // Rows of the offset that have not been discarded yet.
            let pending_skip = self.offset.saturating_sub(self.skipped);

            // Still in the offset phase and this batch lies completely inside
            // the prefix to discard: account for it and pull the next one.
            if pending_skip > 0 && rows <= pending_skip {
                self.skipped += rows;
                continue;
            }

            let output = if pending_skip > 0 {
                // The offset boundary falls inside this batch: drop its head
                // and emit as much of the tail as the budget allows.
                self.skipped = self.offset;
                let take = (rows - pending_skip).min(budget);
                self.emitted += take;
                batch.slice(pending_skip, take)
            } else if rows <= budget {
                // Offset already satisfied and the whole batch fits.
                self.emitted += rows;
                batch
            } else {
                // Offset already satisfied but the batch overshoots the limit.
                self.emitted += budget;
                batch.slice(0, budget)
            };
            return Ok(Some(output));
        }

        // Input ran dry before the limit was reached.
        Ok(None)
    }

    /// Clears the skip/emit counters and then resets the input operator,
    /// returning whatever result the input's `reset` yields.
    fn reset(&mut self) -> DbxResult<()> {
        self.skipped = 0;
        self.emitted = 0;
        self.input.reset()
    }
}