use crate::{
db::{
access::AccessPlan,
cursor::{ContinuationSignature, CursorPlanError, GroupedPlannedCursor, PlannedCursor},
query::plan::{
AccessPlannedQuery, ExecutionOrderContract, ExecutionShapeSignature,
GroupedCursorPolicyViolation, grouped_cursor_policy_violation,
},
},
value::Value,
};
/// Planner-projected contract that drives cursor preparation, cursor
/// revalidation, and grouped paging-window projection for one query plan.
///
/// `AccessPlannedQuery::planned_continuation_contract` builds it, and only
/// for load-mode plans.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(in crate::db) struct PlannedContinuationContract {
    /// Execution shape signature; the continuation signature is read from it.
    pub(in crate::db) shape_signature: ExecutionShapeSignature,
    /// Number of group fields for grouped plans, otherwise the number of
    /// order fields (zero when the plan carries no ordering).
    pub(in crate::db) boundary_arity: usize,
    /// Page offset of the plan as `usize`; zero when the plan has no page.
    pub(in crate::db) window_size: usize,
    /// Ordering contract; also answers whether the plan is grouped.
    pub(in crate::db) order_contract: ExecutionOrderContract,
    /// Page limit of the plan as `usize`; `None` when no limit is set.
    page_limit: Option<usize>,
    /// Clone of the access plan selected for the query.
    access: AccessPlan<Value>,
    /// Grouped cursor policy violation detected at planning time, if any.
    /// It is only surfaced as an error once a cursor is actually applied.
    grouped_cursor_policy_violation: Option<GroupedCursorPolicyViolation>,
}
/// Grouped paging window projected from a continuation contract and a
/// grouped cursor.
///
/// Produced by `PlannedContinuationContract::project_grouped_paging_window`
/// and taken apart with `into_parts`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(in crate::db) struct GroupedContinuationWindow {
    /// Page limit copied from the contract (`None` when the plan has no limit).
    limit: Option<usize>,
    /// Contract window size on the initial page; zero once a cursor is applied.
    initial_offset_for_page: usize,
    /// `limit + initial_offset_for_page + 1`; `None` when there is no limit
    /// or the sum overflows `usize`.
    selection_bound: Option<usize>,
    /// Contract initial offset on the initial page, otherwise the offset
    /// recorded on the cursor.
    resume_initial_offset: u32,
    /// Last group key of the cursor wrapped in `Value::List`, when present.
    resume_boundary: Option<Value>,
}

impl GroupedContinuationWindow {
    /// Bundle already-computed window values; performs no validation.
    const fn new(
        limit: Option<usize>,
        initial_offset_for_page: usize,
        selection_bound: Option<usize>,
        resume_initial_offset: u32,
        resume_boundary: Option<Value>,
    ) -> Self {
        Self {
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        }
    }

    /// Consume the window and return its fields in declaration order:
    /// `(limit, initial_offset_for_page, selection_bound,
    /// resume_initial_offset, resume_boundary)`.
    #[must_use]
    pub(in crate::db) fn into_parts(
        self,
    ) -> (Option<usize>, usize, Option<usize>, u32, Option<Value>) {
        let Self {
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        } = self;

        (
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        )
    }
}
/// Module-private staging value that computes grouped window fields from a
/// contract and a cursor before they are handed to
/// `GroupedContinuationWindow`.
struct GroupedWindowProjection {
    limit: Option<usize>,
    initial_offset_for_page: usize,
    selection_bound: Option<usize>,
    resume_initial_offset: u32,
    resume_boundary: Option<Value>,
}

impl GroupedWindowProjection {
    /// Derive every window field from `contract` and `cursor`.
    ///
    /// An empty cursor denotes the initial page: the contract's offset still
    /// has to be applied. A non-empty cursor resumes with a zero page offset
    /// and the initial offset recorded on the cursor itself.
    fn from_contract_and_cursor(
        contract: &PlannedContinuationContract,
        cursor: &GroupedPlannedCursor,
    ) -> Self {
        let limit = contract.page_limit();

        let (resume_initial_offset, initial_offset_for_page) = if cursor.is_empty() {
            (contract.effective_offset(false), contract.window_size())
        } else {
            (cursor.initial_offset(), 0)
        };

        // limit + offset + 1; an overflowing sum leaves the bound unset.
        let selection_bound = match limit {
            Some(page_limit) => page_limit
                .checked_add(initial_offset_for_page)
                .and_then(|with_offset| with_offset.checked_add(1)),
            None => None,
        };

        let resume_boundary = cursor
            .last_group_key()
            .map(|group_key| Value::List(group_key.to_vec()));

        Self {
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        }
    }

    /// Move the staged fields into a `GroupedContinuationWindow`.
    fn into_window(self) -> GroupedContinuationWindow {
        let Self {
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        } = self;

        GroupedContinuationWindow::new(
            limit,
            initial_offset_for_page,
            selection_bound,
            resume_initial_offset,
            resume_boundary,
        )
    }
}
/// Grouped cursor operation being validated; it selects the invariant
/// message reported when the plan turns out not to be grouped.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GroupedCursorAction {
    /// Preparing a grouped cursor from bytes or from a token.
    Prepare,
    /// Revalidating an already planned grouped cursor.
    Revalidate,
    /// Projecting the grouped paging window.
    PagingWindow,
}

impl GroupedCursorAction {
    /// Message used when this action is attempted on a non-grouped plan.
    const fn grouped_plan_required_message(self) -> &'static str {
        const PREPARE: &str = "grouped cursor preparation requires grouped logical plans";
        const REVALIDATE: &str = "grouped cursor revalidation requires grouped logical plans";
        const PAGING_WINDOW: &str = "grouped paging window requires grouped logical plans";

        match self {
            Self::Prepare => PREPARE,
            Self::Revalidate => REVALIDATE,
            Self::PagingWindow => PAGING_WINDOW,
        }
    }
}
/// Offset to apply for the current page of a cursor-paged window.
///
/// Returns `0` when a cursor is present and `window_size` otherwise.
#[must_use]
pub(in crate::db) const fn effective_offset_for_cursor_window(
    window_size: u32,
    cursor_present: bool,
) -> u32 {
    if cursor_present {
        return 0;
    }

    window_size
}
/// Offset/limit window for scalar access, with the offset already adjusted
/// for cursor presence by the caller.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(in crate::db) struct ScalarAccessWindowPlan {
    /// Rows to skip before the page starts.
    effective_offset: u32,
    /// Maximum rows in the page; `None` means unbounded.
    limit: Option<u32>,
}

impl ScalarAccessWindowPlan {
    /// Build a window from an effective offset and an optional limit.
    #[must_use]
    pub(in crate::db) const fn new(effective_offset: u32, limit: Option<u32>) -> Self {
        Self {
            effective_offset,
            limit,
        }
    }

    /// The configured limit, if any.
    #[must_use]
    pub(in crate::db) const fn limit(self) -> Option<u32> {
        self.limit
    }

    /// `offset + limit`, saturating at `usize::MAX`; `None` when unbounded.
    #[must_use]
    pub(in crate::db) fn keep_count(self) -> Option<usize> {
        self.limit.map(|limit| {
            let page_rows = usize::try_from(limit).unwrap_or(usize::MAX);
            self.lower_bound().saturating_add(page_rows)
        })
    }

    /// The effective offset as `usize`, clamped to `usize::MAX` if it does
    /// not fit.
    #[must_use]
    pub(in crate::db) fn lower_bound(self) -> usize {
        usize::try_from(self.effective_offset).unwrap_or(usize::MAX)
    }

    /// Rows to fetch: `None` when unbounded, `Some(0)` for a zero limit, and
    /// otherwise `keep_count + 1` (one extra lookahead row), saturating.
    #[must_use]
    pub(in crate::db) fn fetch_count(self) -> Option<usize> {
        match self.limit {
            None => None,
            Some(0) => Some(0),
            Some(_) => self.keep_count().map(|keep| keep.saturating_add(1)),
        }
    }
}
impl PlannedContinuationContract {
    /// Assemble a contract from already-projected planner values.
    #[must_use]
    pub(in crate::db) const fn new(
        shape_signature: ExecutionShapeSignature,
        boundary_arity: usize,
        window_size: usize,
        order_contract: ExecutionOrderContract,
        page_limit: Option<usize>,
        access: AccessPlan<Value>,
        grouped_cursor_policy_violation: Option<GroupedCursorPolicyViolation>,
    ) -> Self {
        Self {
            shape_signature,
            boundary_arity,
            window_size,
            order_contract,
            page_limit,
            access,
            grouped_cursor_policy_violation,
        }
    }

    /// Whether the order contract describes a grouped plan.
    #[must_use]
    pub(in crate::db) const fn is_grouped(&self) -> bool {
        self.order_contract.is_grouped()
    }

    /// Borrow the ordering contract.
    #[must_use]
    pub(in crate::db) const fn order_contract(&self) -> &ExecutionOrderContract {
        &self.order_contract
    }

    /// Page limit, if the plan has one.
    #[must_use]
    pub(in crate::db) const fn page_limit(&self) -> Option<usize> {
        self.page_limit
    }

    /// Window size (the plan's page offset) as `usize`.
    #[must_use]
    pub(in crate::db) const fn window_size(&self) -> usize {
        self.window_size
    }

    /// Borrow the access plan.
    #[must_use]
    pub(in crate::db) const fn access_plan(&self) -> &AccessPlan<Value> {
        &self.access
    }

    /// Grouped cursor policy violation recorded at planning time, if any.
    #[must_use]
    pub(in crate::db) const fn grouped_cursor_policy_violation(
        &self,
    ) -> Option<GroupedCursorPolicyViolation> {
        self.grouped_cursor_policy_violation
    }

    /// Continuation signature taken from the shape signature.
    #[must_use]
    pub(in crate::db) const fn continuation_signature(&self) -> ContinuationSignature {
        self.shape_signature.continuation_signature()
    }

    /// Boundary arity recorded for this plan.
    #[must_use]
    pub(in crate::db) const fn boundary_arity(&self) -> usize {
        self.boundary_arity
    }

    /// Window size narrowed to `u32`, clamped to `u32::MAX` if it does not fit.
    #[must_use]
    pub(in crate::db) fn expected_initial_offset(&self) -> u32 {
        match u32::try_from(self.window_size) {
            Ok(offset) => offset,
            Err(_) => u32::MAX,
        }
    }

    /// Offset for the current page: zero when a cursor is present, otherwise
    /// the expected initial offset.
    #[must_use]
    pub(in crate::db) fn effective_offset(&self, cursor_present: bool) -> u32 {
        let initial_offset = self.expected_initial_offset();
        effective_offset_for_cursor_window(initial_offset, cursor_present)
    }

    /// Prepare a scalar cursor from optional cursor bytes.
    ///
    /// Delegates to `crate::db::cursor::prepare_cursor` with this contract's
    /// access path, order spec, direction, continuation signature, and
    /// expected initial offset.
    ///
    /// # Errors
    ///
    /// Returns a continuation-cursor invariant error when the contract is
    /// grouped; otherwise forwards whatever the delegate returns.
    pub(in crate::db) fn prepare_scalar_cursor(
        &self,
        entity_path: &'static str,
        entity_tag: crate::types::EntityTag,
        entity_model: &crate::model::entity::EntityModel,
        bytes: Option<&[u8]>,
    ) -> Result<PlannedCursor, CursorPlanError> {
        if self.is_grouped() {
            Err(CursorPlanError::continuation_cursor_invariant(
                "grouped plans require grouped cursor preparation",
            ))
        } else {
            crate::db::cursor::prepare_cursor(
                self.access_plan().resolve_strategy().as_path().cloned(),
                entity_path,
                entity_tag,
                entity_model,
                self.order_contract.order_spec(),
                self.order_contract.direction(),
                self.continuation_signature(),
                self.expected_initial_offset(),
                bytes,
            )
        }
    }

    /// Test-only: prepare a grouped cursor from optional cursor bytes.
    ///
    /// The grouped cursor policy is enforced only when `bytes` is `Some`.
    ///
    /// # Errors
    ///
    /// Fails when the contract is not grouped, when a cursor is supplied and
    /// a grouped cursor policy violation is recorded, or when the delegate
    /// `crate::db::cursor::prepare_grouped_cursor` fails.
    #[cfg(test)]
    pub(in crate::db) fn prepare_grouped_cursor(
        &self,
        entity_path: &'static str,
        bytes: Option<&[u8]>,
    ) -> Result<GroupedPlannedCursor, CursorPlanError> {
        let cursor_applied = bytes.is_some();
        self.validate_grouped_cursor_contract(GroupedCursorAction::Prepare, cursor_applied)?;

        crate::db::cursor::prepare_grouped_cursor(
            entity_path,
            self.order_contract.order_spec(),
            self.continuation_signature(),
            self.expected_initial_offset(),
            bytes,
        )
    }

    /// Prepare a grouped cursor from an optional continuation token.
    ///
    /// The grouped cursor policy is enforced only when `cursor` is `Some`.
    ///
    /// # Errors
    ///
    /// Fails when the contract is not grouped, when a token is supplied and
    /// a grouped cursor policy violation is recorded, or when the delegate
    /// `crate::db::cursor::prepare_grouped_cursor_token` fails.
    pub(in crate::db) fn prepare_grouped_cursor_token(
        &self,
        entity_path: &'static str,
        cursor: Option<crate::db::cursor::GroupedContinuationToken>,
    ) -> Result<GroupedPlannedCursor, CursorPlanError> {
        let cursor_applied = cursor.is_some();
        self.validate_grouped_cursor_contract(GroupedCursorAction::Prepare, cursor_applied)?;

        crate::db::cursor::prepare_grouped_cursor_token(
            entity_path,
            self.order_contract.order_spec(),
            self.continuation_signature(),
            self.expected_initial_offset(),
            cursor,
        )
    }

    /// Revalidate an already planned scalar cursor against this contract.
    ///
    /// # Errors
    ///
    /// Returns a continuation-cursor invariant error when the contract is
    /// grouped; otherwise forwards the result of
    /// `crate::db::cursor::revalidate_cursor`.
    pub(in crate::db) fn revalidate_scalar_cursor(
        &self,
        entity_tag: crate::types::EntityTag,
        entity_model: &crate::model::entity::EntityModel,
        cursor: PlannedCursor,
    ) -> Result<PlannedCursor, CursorPlanError> {
        if self.is_grouped() {
            Err(CursorPlanError::continuation_cursor_invariant(
                "grouped plans require grouped cursor revalidation",
            ))
        } else {
            crate::db::cursor::revalidate_cursor(
                self.access_plan().resolve_strategy().as_path().cloned(),
                entity_tag,
                entity_model,
                self.order_contract.order_spec(),
                self.order_contract.direction(),
                self.expected_initial_offset(),
                cursor,
            )
        }
    }

    /// Revalidate an already planned grouped cursor against this contract.
    ///
    /// The grouped cursor policy is enforced only for a non-empty cursor.
    ///
    /// # Errors
    ///
    /// Fails when the contract is not grouped, when the cursor is non-empty
    /// and a grouped cursor policy violation is recorded, or when
    /// `crate::db::cursor::revalidate_grouped_cursor` fails.
    pub(in crate::db) fn revalidate_grouped_cursor(
        &self,
        cursor: GroupedPlannedCursor,
    ) -> Result<GroupedPlannedCursor, CursorPlanError> {
        let cursor_applied = !cursor.is_empty();
        self.validate_grouped_cursor_contract(GroupedCursorAction::Revalidate, cursor_applied)?;

        crate::db::cursor::revalidate_grouped_cursor(self.expected_initial_offset(), cursor)
    }

    /// Project the grouped paging window for `cursor`.
    ///
    /// # Errors
    ///
    /// Fails when the contract is not grouped, or when the cursor is
    /// non-empty and a grouped cursor policy violation is recorded.
    pub(in crate::db) fn project_grouped_paging_window(
        &self,
        cursor: &GroupedPlannedCursor,
    ) -> Result<GroupedContinuationWindow, CursorPlanError> {
        let cursor_applied = !cursor.is_empty();
        self.validate_grouped_cursor_contract(GroupedCursorAction::PagingWindow, cursor_applied)?;

        let projection = GroupedWindowProjection::from_contract_and_cursor(self, cursor);
        Ok(projection.into_window())
    }

    /// Shared gate for grouped operations: the plan must be grouped, and the
    /// grouped cursor policy must hold whenever a cursor is applied.
    fn validate_grouped_cursor_contract(
        &self,
        action: GroupedCursorAction,
        cursor_applied: bool,
    ) -> Result<(), CursorPlanError> {
        if self.is_grouped() {
            self.validate_grouped_cursor_policy_if_applied(cursor_applied)
        } else {
            Err(CursorPlanError::continuation_cursor_invariant(
                action.grouped_plan_required_message(),
            ))
        }
    }

    /// Enforce the grouped cursor policy only when a cursor is applied; the
    /// initial page (no cursor) always passes.
    fn validate_grouped_cursor_policy_if_applied(
        &self,
        cursor_applied: bool,
    ) -> Result<(), CursorPlanError> {
        if cursor_applied {
            self.validate_grouped_cursor_policy()
        } else {
            Ok(())
        }
    }

    /// Convert a recorded grouped cursor policy violation into an error.
    fn validate_grouped_cursor_policy(&self) -> Result<(), CursorPlanError> {
        match self.grouped_cursor_policy_violation() {
            Some(violation) => Err(violation.into_cursor_plan_error()),
            None => Ok(()),
        }
    }
}
impl AccessPlannedQuery {
    /// Build the scalar access window for this plan.
    ///
    /// Offset and limit come from the scalar plan's page (offset `0` and no
    /// limit when there is no page). The offset is then replaced by `0` when
    /// `cursor_present` is true.
    #[must_use]
    pub(in crate::db) fn scalar_access_window_plan(
        &self,
        cursor_present: bool,
    ) -> ScalarAccessWindowPlan {
        let (offset, limit) = match self.scalar_plan().page.as_ref() {
            Some(page) => (page.offset, page.limit),
            None => (0, None),
        };

        ScalarAccessWindowPlan::new(
            effective_offset_for_cursor_window(offset, cursor_present),
            limit,
        )
    }

    /// Project the continuation contract for this plan.
    ///
    /// Returns `None` unless the scalar plan's mode is a load.
    #[must_use]
    pub(in crate::db) fn planned_continuation_contract(
        &self,
        entity_path: &'static str,
    ) -> Option<PlannedContinuationContract> {
        let scalar = self.scalar_plan();
        if !scalar.mode.is_load() {
            return None;
        }

        let shape_signature = self.execution_shape_signature(entity_path);

        // Grouped plans bound on their group fields; scalar plans on their
        // order fields (none when the plan is unordered).
        let boundary_arity = match self.grouped_plan() {
            Some(grouped) => grouped.group.group_fields.len(),
            None => scalar
                .order
                .as_ref()
                .map_or(0, |order| order.fields.len()),
        };

        let page = scalar.page.as_ref();
        let window_size =
            page.map_or(0, |page| usize::try_from(page.offset).unwrap_or(usize::MAX));
        let page_limit = page
            .and_then(|page| page.limit)
            .map(|limit| usize::try_from(limit).unwrap_or(usize::MAX));

        let is_grouped = self.grouped_plan().is_some();
        let order_contract = ExecutionOrderContract::from_plan(is_grouped, scalar.order.as_ref());
        let access = self.access.clone();
        let grouped_cursor_policy_violation = self
            .grouped_plan()
            .and_then(|grouped| grouped_cursor_policy_violation(grouped, true));

        Some(PlannedContinuationContract::new(
            shape_signature,
            boundary_arity,
            window_size,
            order_contract,
            page_limit,
            access,
            grouped_cursor_policy_violation,
        ))
    }
}
#[cfg(test)]
mod tests {
    use super::{PlannedContinuationContract, ScalarAccessWindowPlan};
    use crate::{
        db::{
            access::{AccessPath, AccessPlan},
            cursor::{
                ContinuationSignature, CursorPlanError, GroupedContinuationToken,
                GroupedPlannedCursor,
            },
            direction::Direction,
            query::plan::{
                ExecutionOrderContract, ExecutionShapeSignature, GroupedCursorPolicyViolation,
            },
        },
        value::Value,
    };

    /// Fixed signature shared by the contract fixture and continuation tokens.
    fn continuation_signature_fixture() -> ContinuationSignature {
        ContinuationSignature::from_bytes([0x11; 32])
    }

    /// Grouped contract fixture: boundary arity 1, offset 4, limit 2, full
    /// scan access, and the supplied policy violation.
    fn grouped_contract(
        violation: Option<GroupedCursorPolicyViolation>,
    ) -> PlannedContinuationContract {
        let shape_signature = ExecutionShapeSignature::new(continuation_signature_fixture());
        let order_contract = ExecutionOrderContract::from_plan(true, None);
        let access = AccessPlan::path(AccessPath::FullScan);

        PlannedContinuationContract::new(
            shape_signature,
            1,
            4,
            order_contract,
            Some(2),
            access,
            violation,
        )
    }

    /// Non-empty grouped cursor compatible with `contract`'s initial offset.
    fn applied_grouped_cursor(contract: &PlannedContinuationContract) -> GroupedPlannedCursor {
        let initial_offset = contract.expected_initial_offset();
        GroupedPlannedCursor::new(vec![Value::Uint(7)], initial_offset)
    }

    #[test]
    fn scalar_access_window_fetch_count_unbounded_remains_unbounded() {
        let unbounded = ScalarAccessWindowPlan::new(3, None);

        assert_eq!(unbounded.fetch_count(), None);
    }

    #[test]
    fn scalar_access_window_fetch_count_bounded_adds_lookahead_row() {
        let bounded = ScalarAccessWindowPlan::new(3, Some(2));

        assert_eq!(bounded.keep_count(), Some(5));
        assert_eq!(bounded.fetch_count(), Some(6));
    }

    #[test]
    fn scalar_access_window_fetch_count_limit_zero_projects_zero_lookahead() {
        let zero_limit = ScalarAccessWindowPlan::new(4, Some(0));

        assert_eq!(zero_limit.keep_count(), Some(4));
        assert_eq!(zero_limit.fetch_count(), Some(0));
    }

    #[test]
    fn grouped_cursor_contract_shares_policy_gate_for_token_and_window_paths() {
        let contract = grouped_contract(Some(
            GroupedCursorPolicyViolation::ContinuationRequiresLimit,
        ));
        let token = GroupedContinuationToken::new_with_direction(
            continuation_signature_fixture(),
            vec![Value::Uint(7)],
            Direction::Asc,
            contract.expected_initial_offset(),
        );
        let applied_cursor = applied_grouped_cursor(&contract);

        let token_err = contract
            .prepare_grouped_cursor_token("PlanEntity", Some(token))
            .expect_err("grouped cursor token reuse should honor grouped cursor policy");
        let window_err = contract
            .project_grouped_paging_window(&applied_cursor)
            .expect_err("grouped paging window should honor grouped cursor policy");

        assert!(matches!(
            &token_err,
            CursorPlanError::ContinuationCursorInvariantViolation { reason }
                if reason == "grouped continuation cursors require an explicit LIMIT"
        ));
        assert_eq!(
            token_err.to_string(),
            window_err.to_string(),
            "grouped token preparation and grouped paging window must project the same grouped cursor policy error",
        );
    }

    #[test]
    fn grouped_cursor_contract_skips_policy_gate_for_initial_grouped_page() {
        let contract = grouped_contract(Some(
            GroupedCursorPolicyViolation::ContinuationRequiresLimit,
        ));
        let no_cursor = GroupedPlannedCursor::none();

        let prepared = contract
            .prepare_grouped_cursor_token("PlanEntity", None)
            .expect("initial grouped page should not be blocked by continuation-only policy");
        let window = contract.project_grouped_paging_window(&no_cursor).expect(
            "initial grouped page window should not be blocked by continuation-only policy",
        );
        let (limit, page_offset, selection_bound, resume_offset, resume_boundary) =
            window.into_parts();

        assert!(prepared.is_empty());
        assert_eq!(limit, Some(2));
        assert_eq!(page_offset, 4);
        assert_eq!(selection_bound, Some(7));
        assert_eq!(resume_offset, 4);
        assert_eq!(resume_boundary, None);
    }
}