use crate::{
db::{
cursor::{
ContinuationSignature, ContinuationToken, CursorBoundary, CursorPlanError,
GroupedContinuationToken, GroupedPlannedCursor, IndexRangeCursorAnchor, PlannedCursor,
ValidatedInEnvelopeIndexRangeCursorAnchor,
anchor::{
validate_index_range_anchor, validate_index_range_boundary_anchor_consistency,
},
validate_cursor_boundary_for_order, validate_cursor_direction,
validate_cursor_window_offset,
},
direction::Direction,
executor::ExecutableAccessPath,
query::plan::OrderSpec,
},
model::entity::EntityModel,
traits::FieldValue,
types::EntityTag,
};
trait CursorPlanSurface<K: FieldValue> {
fn entity_model(&self) -> &EntityModel;
fn order_spec(&self) -> &OrderSpec;
fn direction(&self) -> Direction;
fn access(&self) -> Option<&ExecutableAccessPath<'_, K>>;
fn initial_offset(&self) -> u32;
}
struct CursorPlanSurfaceAdapter<'a, K> {
access: Option<ExecutableAccessPath<'a, K>>,
model: &'a EntityModel,
order: &'a OrderSpec,
direction: Direction,
initial_offset: u32,
}
impl<K: FieldValue> CursorPlanSurface<K> for CursorPlanSurfaceAdapter<'_, K> {
fn entity_model(&self) -> &EntityModel {
self.model
}
fn order_spec(&self) -> &OrderSpec {
self.order
}
fn direction(&self) -> Direction {
self.direction
}
fn access(&self) -> Option<&ExecutableAccessPath<'_, K>> {
self.access.as_ref()
}
fn initial_offset(&self) -> u32 {
self.initial_offset
}
}
#[expect(clippy::too_many_arguments)]
pub(in crate::db) fn validate_planned_cursor<K>(
cursor: Option<&[u8]>,
access: Option<ExecutableAccessPath<'_, K>>,
entity_path: &'static str,
entity_tag: EntityTag,
model: &EntityModel,
order: &OrderSpec,
expected_signature: ContinuationSignature,
direction: Direction,
expected_initial_offset: u32,
) -> Result<PlannedCursor, CursorPlanError>
where
K: FieldValue,
{
let Some(cursor) = cursor else {
return Ok(PlannedCursor::none());
};
let surface = CursorPlanSurfaceAdapter {
access,
model,
order,
direction,
initial_offset: expected_initial_offset,
};
let token = decode_validated_cursor(cursor, entity_path, expected_signature)?;
validate_structured_cursor(
token.boundary().clone(),
token.index_range_anchor().cloned(),
token.direction(),
token.initial_offset(),
entity_tag,
&surface,
true,
)
}
pub(in crate::db) fn validate_planned_cursor_state<K>(
cursor: PlannedCursor,
access: Option<ExecutableAccessPath<'_, K>>,
entity_tag: EntityTag,
model: &EntityModel,
order: &OrderSpec,
direction: Direction,
expected_initial_offset: u32,
) -> Result<PlannedCursor, CursorPlanError>
where
K: FieldValue,
{
if cursor.is_empty() {
return Ok(PlannedCursor::none());
}
let surface = CursorPlanSurfaceAdapter {
access,
model,
order,
direction,
initial_offset: expected_initial_offset,
};
let boundary = cursor.boundary().cloned().ok_or_else(|| {
CursorPlanError::continuation_cursor_invariant("continuation cursor boundary is missing")
})?;
let index_range_anchor = cursor
.index_range_anchor()
.map(ValidatedInEnvelopeIndexRangeCursorAnchor::as_unvalidated_anchor);
validate_structured_cursor(
boundary,
index_range_anchor,
direction,
cursor.initial_offset(),
entity_tag,
&surface,
false,
)
}
fn decode_validated_cursor(
cursor: &[u8],
entity_path: &'static str,
expected_signature: ContinuationSignature,
) -> Result<ContinuationToken, CursorPlanError> {
let token =
ContinuationToken::decode(cursor).map_err(CursorPlanError::from_token_wire_error)?;
validate_cursor_signature(entity_path, &expected_signature, &token.signature())?;
Ok(token)
}
fn validate_cursor_signature(
entity_path: &'static str,
expected_signature: &ContinuationSignature,
actual_signature: &ContinuationSignature,
) -> Result<(), CursorPlanError> {
if actual_signature != expected_signature {
return Err(CursorPlanError::continuation_cursor_signature_mismatch(
entity_path,
expected_signature,
actual_signature,
));
}
Ok(())
}
fn validate_structured_cursor<K: FieldValue, S: CursorPlanSurface<K>>(
boundary: CursorBoundary,
index_range_anchor: Option<IndexRangeCursorAnchor>,
actual_direction: Direction,
actual_initial_offset: u32,
entity_tag: EntityTag,
surface: &S,
require_index_range_anchor: bool,
) -> Result<PlannedCursor, CursorPlanError> {
let validated_index_range_anchor = validate_cursor_boundary_anchor_invariants(
&boundary,
index_range_anchor.as_ref(),
actual_direction,
actual_initial_offset,
entity_tag,
surface,
require_index_range_anchor,
)?;
Ok(PlannedCursor::new(
boundary,
validated_index_range_anchor,
actual_initial_offset,
))
}
fn validate_cursor_boundary_anchor_invariants<K: FieldValue, S: CursorPlanSurface<K>>(
boundary: &CursorBoundary,
index_range_anchor: Option<&IndexRangeCursorAnchor>,
actual_direction: Direction,
actual_initial_offset: u32,
entity_tag: EntityTag,
surface: &S,
require_index_range_anchor: bool,
) -> Result<Option<ValidatedInEnvelopeIndexRangeCursorAnchor>, CursorPlanError> {
let expected_direction = surface.direction();
validate_cursor_direction(expected_direction, actual_direction)?;
let expected_initial_offset = surface.initial_offset();
validate_cursor_window_offset(expected_initial_offset, actual_initial_offset)?;
let validated_index_range_anchor = validate_index_range_anchor(
index_range_anchor,
surface.access(),
entity_tag,
actual_direction,
require_index_range_anchor,
)?;
let pk_key = validate_cursor_boundary_for_order::<K>(
surface.entity_model(),
surface.order_spec(),
boundary,
)?;
validate_index_range_boundary_anchor_consistency(
validated_index_range_anchor.as_ref(),
surface.access(),
pk_key,
)?;
Ok(validated_index_range_anchor)
}
#[cfg(test)]
pub(in crate::db) fn validate_grouped_cursor(
cursor: Option<&[u8]>,
entity_path: &'static str,
continuation_signature: ContinuationSignature,
expected_direction: Direction,
expected_initial_offset: u32,
) -> Result<GroupedPlannedCursor, CursorPlanError> {
let Some(cursor) = cursor else {
return Ok(GroupedPlannedCursor::none());
};
let token =
GroupedContinuationToken::decode(cursor).map_err(CursorPlanError::from_token_wire_error)?;
validate_cursor_signature(entity_path, &continuation_signature, &token.signature())?;
validate_grouped_cursor_direction(expected_direction, token.direction())?;
validate_cursor_window_offset(expected_initial_offset, token.initial_offset())?;
Ok(GroupedPlannedCursor::new(
token.last_group_key().to_vec(),
token.initial_offset(),
))
}
pub(in crate::db) fn validate_grouped_cursor_token(
cursor: Option<GroupedContinuationToken>,
entity_path: &'static str,
continuation_signature: ContinuationSignature,
expected_direction: Direction,
expected_initial_offset: u32,
) -> Result<GroupedPlannedCursor, CursorPlanError> {
let Some(token) = cursor else {
return Ok(GroupedPlannedCursor::none());
};
let (signature, last_group_key, direction, initial_offset) = token.into_parts();
validate_cursor_signature(entity_path, &continuation_signature, &signature)?;
validate_grouped_cursor_direction(expected_direction, direction)?;
validate_cursor_window_offset(expected_initial_offset, initial_offset)?;
Ok(GroupedPlannedCursor::new(last_group_key, initial_offset))
}
pub(in crate::db) fn validate_grouped_cursor_state(
expected_initial_offset: u32,
cursor: GroupedPlannedCursor,
) -> Result<GroupedPlannedCursor, CursorPlanError> {
if cursor.is_empty() {
return Ok(GroupedPlannedCursor::none());
}
validate_cursor_window_offset(expected_initial_offset, cursor.initial_offset())?;
Ok(cursor)
}
fn validate_grouped_cursor_direction(
expected_direction: Direction,
actual_direction: Direction,
) -> Result<(), CursorPlanError> {
if actual_direction != expected_direction {
return Err(CursorPlanError::grouped_continuation_cursor_direction_mismatch());
}
Ok(())
}