quill_sql/execution/physical_plan/
limit.rs1use std::sync::atomic::AtomicUsize;
2use std::sync::Arc;
3
4use crate::catalog::SchemaRef;
5use crate::{
6 error::QuillSQLResult,
7 execution::{ExecutionContext, VolcanoExecutor},
8 storage::tuple::Tuple,
9};
10
11use super::PhysicalPlan;
12
/// Physical operator that applies `LIMIT` / `OFFSET` semantics to the tuples
/// produced by a child plan.
#[derive(Debug)]
pub struct PhysicalLimit {
    /// Maximum number of tuples to emit once the offset has been skipped;
    /// `None` means the output is not capped.
    pub limit: Option<usize>,
    /// Number of leading input tuples that are discarded before any tuple is emitted.
    pub offset: usize,
    /// Child plan that the tuples are pulled from.
    pub input: Arc<PhysicalPlan>,

    /// Count of tuples pulled from `input` since the last `init`. Stored in an
    /// atomic so it can be updated through the `&self` receiver used by the
    /// executor methods.
    cursor: AtomicUsize,
}
21impl PhysicalLimit {
22 pub fn new(limit: Option<usize>, offset: usize, input: Arc<PhysicalPlan>) -> Self {
23 PhysicalLimit {
24 limit,
25 offset,
26 input,
27 cursor: AtomicUsize::new(0),
28 }
29 }
30}
31impl VolcanoExecutor for PhysicalLimit {
32 fn init(&self, context: &mut ExecutionContext) -> QuillSQLResult<()> {
33 self.input.init(context)?;
34 self.cursor.store(0, std::sync::atomic::Ordering::SeqCst);
35 Ok(())
36 }
37 fn next(&self, context: &mut ExecutionContext) -> QuillSQLResult<Option<Tuple>> {
38 loop {
39 let next_tuple = self.input.next(context)?;
40 if next_tuple.is_none() {
41 return Ok(None);
42 }
43 let cursor = self
44 .cursor
45 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
46 if cursor < self.offset {
47 continue;
48 }
49 return if let Some(limit) = self.limit {
50 if cursor < self.offset + limit {
51 Ok(next_tuple)
52 } else {
53 Ok(None)
54 }
55 } else {
56 Ok(next_tuple)
57 };
58 }
59 }
60
61 fn output_schema(&self) -> SchemaRef {
62 self.input.output_schema()
63 }
64}
65
66impl std::fmt::Display for PhysicalLimit {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "Limit")
69 }
70}