use crate::{
db::{
cursor::{CursorBoundary, IndexScanContinuationInput},
direction::Direction,
index::RawIndexKey,
query::plan::{AccessPlannedQuery, effective_offset_for_cursor_window},
},
error::InternalError,
};
use std::ops::Bound;
/// Paging window state: rows still to skip and rows that may still be accepted.
///
/// Built from a planned query by `from_plan` or as an unrestricted window by
/// `unbounded`, and advanced one row at a time by `accept_existing_row`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct WindowCursorContract {
/// Rows that must still be skipped before any row is accepted.
offset_remaining: usize,
/// Rows that may still be accepted; `None` means no limit is applied.
limit_remaining: Option<usize>,
}
impl WindowCursorContract {
    /// Derives the window state for `plan`, optionally resuming from a cursor.
    ///
    /// The offset to skip is the plan's page offset (zero when the plan has no
    /// page) passed through `effective_offset_for_cursor_window` together with
    /// whether `cursor_boundary` is present. The limit is the page limit when
    /// one is set. Both values fall back to `usize::MAX` when they cannot be
    /// converted to `usize`.
    #[must_use]
    pub(in crate::db) fn from_plan(
        plan: &AccessPlannedQuery,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Self {
        let scalar = plan.scalar_plan();
        let page = scalar.page.as_ref();

        let page_offset = page.map_or(0, |spec| spec.offset);
        let resuming = cursor_boundary.is_some();
        let effective_offset = effective_offset_for_cursor_window(page_offset, resuming);

        Self {
            offset_remaining: Self::saturating_usize(effective_offset),
            limit_remaining: page
                .and_then(|spec| spec.limit)
                .map(Self::saturating_usize),
        }
    }

    /// Returns a window with nothing to skip and no limit.
    #[must_use]
    pub(in crate::db) const fn unbounded() -> Self {
        Self {
            offset_remaining: 0,
            limit_remaining: None,
        }
    }

    /// Returns `true` when a limit is set and it has been fully consumed.
    ///
    /// A window without a limit is never exhausted.
    #[must_use]
    pub(in crate::db) const fn exhausted(&self) -> bool {
        match self.limit_remaining {
            Some(remaining) => remaining == 0,
            None => false,
        }
    }

    /// Advances the window by one row and reports whether that row is accepted.
    ///
    /// While offset rows remain, one is consumed and the row is rejected.
    /// Afterwards the row is accepted unless the limit has already reached
    /// zero; accepting a row consumes one unit of the limit when one is set.
    pub(in crate::db) const fn accept_existing_row(&mut self) -> bool {
        // Offset rows are consumed first and never count against the limit.
        if self.offset_remaining > 0 {
            self.offset_remaining = self.offset_remaining.saturating_sub(1);
            return false;
        }

        match self.limit_remaining {
            Some(0) => false,
            Some(left) => {
                self.limit_remaining = Some(left.saturating_sub(1));
                true
            }
            None => true,
        }
    }

    /// Converts `value` to `usize`, falling back to `usize::MAX` when the
    /// conversion fails.
    fn saturating_usize<T>(value: T) -> usize
    where
        usize: TryFrom<T>,
    {
        usize::try_from(value).unwrap_or(usize::MAX)
    }
}
/// Builds the window contract for `plan`.
///
/// Thin free-function wrapper that forwards both arguments unchanged to
/// `WindowCursorContract::from_plan`.
#[must_use]
pub(in crate::db) fn window_cursor_contract_for_plan(
plan: &AccessPlannedQuery,
cursor_boundary: Option<&CursorBoundary>,
) -> WindowCursorContract {
WindowCursorContract::from_plan(plan, cursor_boundary)
}
/// Decision produced by `ContinuationRuntime` for one step of its caller's loop.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) enum LoopAction {
/// Returned by `accept_row` when the window rejects the row (for example
/// while offset rows are still being consumed).
Skip,
/// Returned when the key or row is accepted, and by `pre_fetch` when the
/// window is not exhausted.
Emit,
/// Returned by `pre_fetch` and `accept_row` once the window limit is exhausted.
Stop,
}
/// Borrowed handle to a raw index key, as consumed by
/// `ContinuationRuntime::accept_key`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) struct ContinuationKeyRef<'a> {
/// The borrowed key; it is not copied or owned by this wrapper.
raw_key: &'a RawIndexKey,
}
impl<'a> ContinuationKeyRef<'a> {
/// Wraps a borrowed raw index key; the key itself is not copied.
#[must_use]
pub(crate) const fn scan(raw_key: &'a RawIndexKey) -> Self {
Self { raw_key }
}
}
/// Pairs index-scan continuation input with paging window state and exposes
/// the per-step decisions (`LoopAction`) derived from them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct ContinuationRuntime<'a> {
/// Scan continuation input; supplies resume bounds, key validation and the
/// scan direction.
scan: IndexScanContinuationInput<'a>,
/// Offset/limit window advanced by `accept_row`.
window: WindowCursorContract,
}
impl<'a> ContinuationRuntime<'a> {
#[must_use]
pub(in crate::db) const fn new(
scan: IndexScanContinuationInput<'a>,
window: WindowCursorContract,
) -> Self {
Self { scan, window }
}
#[must_use]
pub(in crate::db) const fn from_window(window: WindowCursorContract) -> Self {
Self::new(
IndexScanContinuationInput::new(None, Direction::Asc),
window,
)
}
#[must_use]
pub(in crate::db) const fn pre_fetch(&self) -> LoopAction {
if self.window.exhausted() {
return LoopAction::Stop;
}
LoopAction::Emit
}
pub(in crate::db) fn scan_bounds(
&self,
bounds: (&Bound<RawIndexKey>, &Bound<RawIndexKey>),
) -> Result<(Bound<RawIndexKey>, Bound<RawIndexKey>), InternalError> {
self.scan.resume_bounds(bounds)
}
pub(in crate::db) fn accept_key(
&self,
key: ContinuationKeyRef<'_>,
) -> Result<LoopAction, InternalError> {
self.scan.validate_candidate_advancement(key.raw_key)?;
Ok(LoopAction::Emit)
}
pub(in crate::db) const fn accept_row(&mut self) -> LoopAction {
if self.window.exhausted() {
return LoopAction::Stop;
}
if !self.window.accept_existing_row() {
return LoopAction::Skip;
}
LoopAction::Emit
}
#[must_use]
pub(in crate::db) const fn direction(&self) -> Direction {
self.scan.direction()
}
}