Skip to main content

quill_sql/execution/physical_plan/
limit.rs

1use std::rc::Rc;
2use std::sync::atomic::AtomicUsize;
3
4use crate::catalog::SchemaRef;
5use crate::{
6    error::QuillSQLResult,
7    execution::{ExecutionContext, VolcanoExecutor},
8    storage::tuple::Tuple,
9};
10
11use super::PhysicalPlan;
12
13#[derive(Debug)]
14pub struct PhysicalLimit {
15    pub limit: Option<usize>,
16    pub offset: usize,
17    pub input: Rc<PhysicalPlan>,
18
19    cursor: AtomicUsize,
20}
21impl PhysicalLimit {
22    pub fn new(limit: Option<usize>, offset: usize, input: Rc<PhysicalPlan>) -> Self {
23        PhysicalLimit {
24            limit,
25            offset,
26            input,
27            cursor: AtomicUsize::new(0),
28        }
29    }
30}
31impl VolcanoExecutor for PhysicalLimit {
32    fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
33        self.input.init(context)?;
34        self.cursor.store(0, std::sync::atomic::Ordering::SeqCst);
35        Ok(())
36    }
37    fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
38        loop {
39            let next_tuple = self.input.next(context)?;
40            if next_tuple.is_none() {
41                return Ok(None);
42            }
43            let cursor = self
44                .cursor
45                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
46            if cursor < self.offset {
47                continue;
48            }
49            return if let Some(limit) = self.limit {
50                if cursor < self.offset + limit {
51                    Ok(next_tuple)
52                } else {
53                    Ok(None)
54                }
55            } else {
56                Ok(next_tuple)
57            };
58        }
59    }
60
61    fn output_schema(&self) -> SchemaRef {
62        self.input.output_schema()
63    }
64}
65
66impl std::fmt::Display for PhysicalLimit {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        write!(f, "Limit")
69    }
70}