use crate::{
db::{
access::LoweredKey,
cursor::{
ContinuationSignature, CursorBoundary, PlannedCursor, RangeToken,
decode_pk_cursor_boundary_storage_key,
effective_keep_count_for_limit as continuation_keep_count_for_limit,
effective_page_offset_for_window as continuation_page_offset_for_window,
range_token_anchor_key, range_token_from_validated_cursor_anchor,
},
direction::Direction,
executor::{
AccessScanContinuationInput, ContinuationMode, RouteContinuationPlan,
continuation::capabilities::ContinuationCapabilities, route::LoadOrderRouteContract,
},
query::plan::{AccessPlannedQuery, ContinuationPolicy},
},
error::InternalError,
model::entity::EntityModel,
};
/// Continuation state consumed by scalar execution routing.
///
/// Holds an optional cursor boundary and an optional index-range token.
/// Both are `None` for the value produced by `initial()`; `new()` fills them
/// from a `PlannedCursor`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ScalarContinuationContext {
/// Boundary cloned from the planned cursor, when the cursor has one.
cursor_boundary: Option<CursorBoundary>,
/// Range token derived from the planned cursor's index-range anchor, when
/// the cursor has one.
index_range_token: Option<RangeToken>,
}
impl ScalarContinuationContext {
    /// Creates a context with neither a cursor boundary nor an index-range token.
    #[must_use]
    pub(in crate::db::executor) const fn initial() -> Self {
        Self {
            cursor_boundary: None,
            index_range_token: None,
        }
    }

    /// Creates a context from a planned cursor.
    ///
    /// The boundary is cloned out of `cursor`, and the cursor's index-range
    /// anchor (if any) is converted into a range token.
    #[must_use]
    pub(in crate::db::executor) fn new(cursor: PlannedCursor) -> Self {
        Self {
            cursor_boundary: cursor.boundary().cloned(),
            index_range_token: cursor
                .index_range_anchor()
                .map(range_token_from_validated_cursor_anchor),
        }
    }

    /// Borrows the cursor boundary, if one is present.
    #[must_use]
    pub(in crate::db::executor) const fn cursor_boundary(&self) -> Option<&CursorBoundary> {
        self.cursor_boundary.as_ref()
    }

    /// Reports whether a cursor boundary is present.
    #[must_use]
    pub(in crate::db::executor) const fn has_cursor_boundary(&self) -> bool {
        self.cursor_boundary().is_some()
    }

    /// Checks that the cursor boundary decodes as a primary-key storage key
    /// for `model`.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by the boundary decoder.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_for_model(
        &self,
        model: &EntityModel,
    ) -> Result<(), InternalError> {
        let boundary = self.cursor_boundary.as_ref();
        // Only the success/failure of decoding matters; the decoded key is discarded.
        let _ = decode_pk_cursor_boundary_storage_key(boundary, model)?;

        Ok(())
    }

    /// Borrows the index-range token, if one is present.
    #[must_use]
    pub(in crate::db::executor) const fn index_range_token(&self) -> Option<&RangeToken> {
        self.index_range_token.as_ref()
    }

    /// Reports whether an index-range token is present.
    #[must_use]
    pub(in crate::db::executor) const fn has_index_range_anchor(&self) -> bool {
        self.index_range_token().is_some()
    }

    /// Selects the continuation mode from the state held by this context.
    ///
    /// An index-range token takes precedence over a cursor boundary; with
    /// neither present the mode is `Initial`.
    #[must_use]
    pub(in crate::db::executor) const fn route_continuation_mode(&self) -> ContinuationMode {
        if self.index_range_token.is_some() {
            ContinuationMode::IndexRangeAnchor
        } else if self.cursor_boundary.is_some() {
            ContinuationMode::CursorBoundary
        } else {
            ContinuationMode::Initial
        }
    }

    /// Combines this context's continuation mode with `continuation_policy`
    /// into a `ContinuationCapabilities` value.
    #[must_use]
    pub(in crate::db::executor) const fn continuation_capabilities(
        &self,
        continuation_policy: ContinuationPolicy,
    ) -> ContinuationCapabilities {
        ContinuationCapabilities::new(
            self.route_continuation_mode(),
            continuation_policy,
        )
    }

    /// Builds the route continuation plan for `plan`.
    ///
    /// The plan's scalar access window is requested with a flag saying whether
    /// a cursor boundary is present, then paired with the capabilities derived
    /// from `continuation_policy`.
    #[must_use]
    pub(in crate::db::executor) fn route_continuation_plan(
        &self,
        plan: &AccessPlannedQuery,
        continuation_policy: ContinuationPolicy,
    ) -> RouteContinuationPlan {
        let capabilities = self.continuation_capabilities(continuation_policy);
        let boundary_present = self.has_cursor_boundary();
        let window_plan = plan.scalar_access_window_plan(boundary_present);

        RouteContinuationPlan::from_scalar_access_window_plan(capabilities, window_plan)
    }
}
/// Pairs a `ScalarContinuationContext` with a `ContinuationSignature`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ResolvedScalarContinuationContext {
/// Continuation state (cursor boundary and index-range token) exposed via
/// `route_context()`.
runtime: ScalarContinuationContext,
/// Signature copied into the bindings produced by `bindings()`.
continuation_signature: ContinuationSignature,
}
/// Values inspected by
/// `ResolvedScalarContinuationContext::debug_assert_route_continuation_invariants`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ScalarRouteContinuationInvariantProjection {
/// Asserted to be `true` by the route-invariant debug check.
strict_advance_required_when_applied: bool,
/// Compared by the route-invariant debug check against the page offset
/// computed from the plan.
effective_offset: u32,
}
impl ScalarRouteContinuationInvariantProjection {
    /// Creates a projection from its two component values.
    #[must_use]
    pub(in crate::db::executor) const fn new(
        strict_advance_required_when_applied: bool,
        effective_offset: u32,
    ) -> Self {
        Self {
            strict_advance_required_when_applied,
            effective_offset,
        }
    }

    /// Returns the stored strict-advance flag.
    #[must_use]
    pub(in crate::db::executor) const fn strict_advance_required_when_applied(self) -> bool {
        self.strict_advance_required_when_applied
    }

    /// Returns the stored effective offset.
    #[must_use]
    pub(in crate::db::executor) const fn effective_offset(self) -> u32 {
        self.effective_offset
    }
}
impl ResolvedScalarContinuationContext {
    /// Creates a resolved context from continuation state and a signature.
    #[must_use]
    pub(in crate::db::executor) const fn new(
        runtime: ScalarContinuationContext,
        continuation_signature: ContinuationSignature,
    ) -> Self {
        Self {
            runtime,
            continuation_signature,
        }
    }

    /// Borrows the wrapped `ScalarContinuationContext`.
    #[must_use]
    pub(in crate::db::executor) const fn route_context(&self) -> &ScalarContinuationContext {
        &self.runtime
    }

    /// Borrows the cursor boundary of the wrapped context, if one is present.
    #[must_use]
    pub(in crate::db::executor) const fn cursor_boundary(&self) -> Option<&CursorBoundary> {
        self.runtime.cursor_boundary.as_ref()
    }

    /// Builds borrowed continuation bindings for a scan in `direction`.
    ///
    /// The bindings carry the cursor boundary, the previous index-range anchor
    /// key, the direction, and this context's continuation signature.
    #[must_use]
    pub(in crate::db::executor) fn bindings(
        &self,
        direction: Direction,
    ) -> ScalarContinuationBindings<'_> {
        let boundary = self.cursor_boundary();
        let anchor = self.previous_index_range_anchor();

        ScalarContinuationBindings::new(boundary, anchor, direction, self.continuation_signature)
    }

    /// Builds the access-scan continuation input from the previous index-range
    /// anchor key and `direction`.
    #[must_use]
    pub(in crate::db::executor) fn access_scan_input(
        &self,
        direction: Direction,
    ) -> AccessScanContinuationInput<'_> {
        let anchor = self.previous_index_range_anchor();

        AccessScanContinuationInput::new(anchor, direction)
    }

    /// Debug-only check of route continuation invariants for `plan`.
    ///
    /// Asserts that the projection's strict-advance flag is set and that its
    /// effective offset equals the page offset computed from `plan` and the
    /// presence of a cursor boundary. Both assertions are `debug_assert`s, so
    /// nothing is evaluated when debug assertions are disabled.
    pub(in crate::db::executor) fn debug_assert_route_continuation_invariants(
        &self,
        plan: &AccessPlannedQuery,
        projection: ScalarRouteContinuationInvariantProjection,
    ) {
        debug_assert!(
            projection.strict_advance_required_when_applied(),
            "route invariant: continuation executions must enforce strict advancement policy",
        );

        // The expected offset is computed inside the macro so that it is
        // skipped entirely when debug assertions are off.
        debug_assert_eq!(
            projection.effective_offset(),
            continuation_page_offset_for_window(plan, self.runtime.has_cursor_boundary()),
            "route window effective offset must match logical plan offset semantics",
        );
    }

    /// Maps the index-range token, if present, to its anchor key.
    #[must_use]
    fn previous_index_range_anchor(&self) -> Option<&LoweredKey> {
        self.runtime
            .index_range_token
            .as_ref()
            .map(range_token_anchor_key)
    }
}
/// Borrowed, copyable view of continuation inputs: cursor boundary, previous
/// index-range anchor key, scan direction, and continuation signature.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::executor) struct ScalarContinuationBindings<'a> {
/// Borrowed cursor boundary; `continuation_applied()` reports whether it is
/// present.
cursor_boundary: Option<&'a CursorBoundary>,
/// Borrowed anchor key of the previous index-range token, if any.
previous_index_range_anchor: Option<&'a LoweredKey>,
/// Direction supplied when the bindings were created.
direction: Direction,
/// Continuation signature supplied when the bindings were created.
continuation_signature: ContinuationSignature,
}
impl<'a> ScalarContinuationBindings<'a> {
    /// Creates bindings from already-borrowed continuation inputs.
    #[must_use]
    pub(in crate::db::executor) const fn new(
        cursor_boundary: Option<&'a CursorBoundary>,
        previous_index_range_anchor: Option<&'a LoweredKey>,
        direction: Direction,
        continuation_signature: ContinuationSignature,
    ) -> Self {
        Self {
            cursor_boundary,
            previous_index_range_anchor,
            direction,
            continuation_signature,
        }
    }

    /// Returns the borrowed cursor boundary, if one is present.
    #[must_use]
    pub(in crate::db::executor) const fn post_access_cursor_boundary(
        &self,
    ) -> Option<&'a CursorBoundary> {
        self.cursor_boundary
    }

    /// Reports whether a cursor boundary is present.
    #[must_use]
    pub(in crate::db::executor) const fn continuation_applied(&self) -> bool {
        self.post_access_cursor_boundary().is_some()
    }

    /// Computes the keep count for a limit window of `limit` over `plan`,
    /// passing along whether a continuation is applied.
    #[must_use]
    pub(in crate::db::executor) fn keep_count_for_limit_window(
        &self,
        plan: &AccessPlannedQuery,
        limit: u32,
    ) -> usize {
        let applied = self.cursor_boundary.is_some();

        continuation_keep_count_for_limit(plan, applied, limit)
    }

    /// Returns the borrowed previous index-range anchor key, if one is present.
    #[must_use]
    pub(in crate::db::executor) const fn previous_index_range_anchor(
        &self,
    ) -> Option<&'a LoweredKey> {
        self.previous_index_range_anchor
    }

    /// Returns the stored scan direction.
    #[must_use]
    pub(in crate::db::executor) const fn direction(&self) -> Direction {
        self.direction
    }

    /// Returns the stored continuation signature.
    #[must_use]
    pub(in crate::db::executor) const fn continuation_signature(&self) -> ContinuationSignature {
        self.continuation_signature
    }

    /// Validates that a load-page scan budget hint is permitted here.
    ///
    /// With no hint (`None`) the check always succeeds.
    ///
    /// # Errors
    ///
    /// Returns a query-executor invariant error when a hint is supplied and
    /// either a continuation is applied or `load_order_route_contract` does
    /// not allow streaming load. The continuation check runs first.
    pub(in crate::db::executor) fn validate_load_scan_budget_hint(
        &self,
        scan_budget_hint: Option<usize>,
        load_order_route_contract: LoadOrderRouteContract,
    ) -> Result<(), InternalError> {
        // Nothing to validate when no hint was supplied.
        if scan_budget_hint.is_none() {
            return Ok(());
        }

        if self.cursor_boundary.is_some() {
            return Err(InternalError::query_executor_invariant(
                "load page scan budget hint requires non-continuation execution",
            ));
        }

        if load_order_route_contract.allows_streaming_load() {
            Ok(())
        } else {
            Err(InternalError::query_executor_invariant(
                "load page scan budget hint requires streaming-safe access shape",
            ))
        }
    }
}