use crate::{
db::{
cursor::CursorPlanError,
direction::Direction,
query::plan::{
OrderDirection, OrderSpec,
expr::{parse_supported_order_expr, supported_order_expr_field},
},
schema::{FieldType, SchemaInfo, literal_matches_type},
},
model::entity::EntityModel,
traits::FieldValue,
value::{StorageKey, Value},
};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) enum CursorBoundarySlot {
Missing,
Present(Value),
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct CursorBoundary {
pub(crate) slots: Vec<CursorBoundarySlot>,
}
#[must_use]
pub(in crate::db) const fn apply_order_direction(
ordering: Ordering,
direction: OrderDirection,
) -> Ordering {
match direction {
OrderDirection::Asc => ordering,
OrderDirection::Desc => ordering.reverse(),
}
}
pub(in crate::db) fn validate_cursor_direction(
expected_direction: Direction,
actual_direction: Direction,
) -> Result<(), CursorPlanError> {
if actual_direction != expected_direction {
return Err(CursorPlanError::continuation_cursor_direction_mismatch());
}
Ok(())
}
pub(in crate::db) const fn validate_cursor_window_offset(
expected_initial_offset: u32,
actual_initial_offset: u32,
) -> Result<(), CursorPlanError> {
if actual_initial_offset != expected_initial_offset {
return Err(CursorPlanError::continuation_cursor_window_mismatch(
expected_initial_offset,
actual_initial_offset,
));
}
Ok(())
}
pub(in crate::db) const fn validate_cursor_boundary_arity(
boundary: &CursorBoundary,
expected_arity: usize,
) -> Result<(), CursorPlanError> {
if boundary.slots.len() != expected_arity {
return Err(
CursorPlanError::continuation_cursor_boundary_arity_mismatch(
expected_arity,
boundary.slots.len(),
),
);
}
Ok(())
}
pub(in crate::db) fn validate_cursor_boundary_for_order<K: FieldValue>(
model: &EntityModel,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<K, CursorPlanError> {
validate_cursor_boundary_arity(boundary, order.fields.len())?;
validate_cursor_boundary_types(model, order, boundary)?;
decode_typed_primary_key_cursor_slot::<K>(model, order, boundary)
}
fn boundary_schema(model: &EntityModel) -> &'static SchemaInfo {
SchemaInfo::cached_for_entity_model(model)
}
fn boundary_order_field_type<'a>(
schema: &'a SchemaInfo,
field: &str,
) -> Result<&'a FieldType, CursorPlanError> {
if let Some(expression) = parse_supported_order_expr(field) {
return schema
.field(
supported_order_expr_field(&expression)
.expect("supported order expression parsing must preserve one field argument")
.as_str(),
)
.ok_or_else(|| CursorPlanError::continuation_cursor_unknown_order_field(field));
}
schema
.field(field)
.ok_or_else(|| CursorPlanError::continuation_cursor_unknown_order_field(field))
}
fn primary_key_boundary_index(order: &OrderSpec, pk_field: &str) -> Result<usize, CursorPlanError> {
order
.fields
.iter()
.position(|(field, _)| field == pk_field)
.ok_or_else(|| {
CursorPlanError::continuation_cursor_primary_key_tie_break_required(pk_field)
})
}
pub(in crate::db) fn validate_cursor_boundary_types(
model: &EntityModel,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<(), CursorPlanError> {
let schema = boundary_schema(model);
for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
let field_type = boundary_order_field_type(schema, field)?;
match slot {
CursorBoundarySlot::Missing => {
if field == model.primary_key.name {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
None,
),
);
}
}
CursorBoundarySlot::Present(value) => {
if !literal_matches_type(value, field_type) {
if field == model.primary_key.name {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
Some(value.clone()),
),
);
}
return Err(CursorPlanError::continuation_cursor_boundary_type_mismatch(
field.clone(),
field_type.to_string(),
value.clone(),
));
}
if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
Some(value.clone()),
),
);
}
}
}
}
Ok(())
}
pub(in crate::db) fn decode_typed_primary_key_cursor_slot<K: FieldValue>(
model: &EntityModel,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<K, CursorPlanError> {
let pk_field = model.primary_key.name;
let pk_index = primary_key_boundary_index(order, pk_field)?;
let schema = boundary_schema(model);
let expected = boundary_order_field_type(schema, pk_field)?.to_string();
let pk_slot = &boundary.slots[pk_index];
match pk_slot {
CursorBoundarySlot::Missing => Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
None,
),
),
CursorBoundarySlot::Present(value) => K::from_value(value).ok_or_else(|| {
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
Some(value.clone()),
)
}),
}
}
pub(in crate::db) fn decode_structural_primary_key_cursor_slot_from_name(
pk_field: &str,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<StorageKey, CursorPlanError> {
let pk_index = primary_key_boundary_index(order, pk_field)?;
let expected = "storage key".to_string();
let pk_slot = &boundary.slots[pk_index];
match pk_slot {
CursorBoundarySlot::Missing => Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
None,
),
),
CursorBoundarySlot::Present(value) => value.as_storage_key().ok_or_else(|| {
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
Some(value.clone()),
)
}),
}
}
pub(in crate::db) fn decode_pk_cursor_boundary_storage_key_for_name(
boundary: Option<&CursorBoundary>,
primary_key_name: &str,
) -> Result<Option<StorageKey>, CursorPlanError> {
let Some(boundary) = boundary else {
return Ok(None);
};
debug_assert_eq!(
boundary.slots.len(),
1,
"pk-ordered continuation boundaries are validated by the cursor spine",
);
let order = OrderSpec {
fields: vec![(primary_key_name.to_string(), OrderDirection::Asc)],
};
decode_structural_primary_key_cursor_slot_from_name(primary_key_name, &order, boundary)
.map(Some)
}