use crate::{
db::{
contracts::canonical_value_compare,
cursor::CursorPlanError,
direction::Direction,
query::plan::{OrderDirection, OrderSpec},
schema::{FieldType, SchemaInfo, literal_matches_type},
},
model::entity::{EntityModel, resolve_field_slot},
traits::FieldValue,
value::{StorageKey, Value},
};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
/// One position of a cursor boundary, corresponding to one field of an
/// order specification.
///
/// `compare_boundary_slots` orders `Missing` before any `Present` value.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) enum CursorBoundarySlot {
/// No value was available for the field (the field slot could not be
/// resolved, or the slot reader returned `None`).
Missing,
/// The value captured for the field.
Present(Value),
}
/// Ordered list of boundary slots, one per field of the order specification
/// the boundary was built from.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub(crate) struct CursorBoundary {
/// Slots in the same sequence as the order fields they were read for.
pub(crate) slots: Vec<CursorBoundarySlot>,
}
impl CursorBoundary {
    /// Builds a boundary holding one slot per field of `order`.
    ///
    /// Slot values are obtained through `read_slot`, which is handed the
    /// slot index resolved for each order field; see
    /// `boundary_slots_from_slot_reader` for the per-field rules.
    #[must_use]
    pub(in crate::db) fn from_slot_reader<F>(
        model: &EntityModel,
        order: &OrderSpec,
        read_slot: &mut F,
    ) -> Self
    where
        F: FnMut(usize) -> Option<Value>,
    {
        let slots = boundary_slots_from_slot_reader(model, order, read_slot);
        Self { slots }
    }
}
/// Produces one `CursorBoundarySlot` per field of `order`, in order.
///
/// For each field the slot index is resolved with `resolve_field_slot` and
/// then passed to `read_slot`. If either step yields `None` the slot is
/// recorded as `Missing`; otherwise the value is wrapped in `Present`.
/// The order direction of each field is ignored here.
#[must_use]
pub(in crate::db) fn boundary_slots_from_slot_reader<F>(
    model: &EntityModel,
    order: &OrderSpec,
    read_slot: &mut F,
) -> Vec<CursorBoundarySlot>
where
    F: FnMut(usize) -> Option<Value>,
{
    let mut slots = Vec::with_capacity(order.fields.len());
    for (field, _) in &order.fields {
        let slot = resolve_field_slot(model, field)
            .and_then(&mut *read_slot)
            .map_or(CursorBoundarySlot::Missing, CursorBoundarySlot::Present);
        slots.push(slot);
    }
    slots
}
/// Compares two boundary slots.
///
/// A `Missing` slot equals another `Missing` slot and sorts before every
/// `Present` slot. Two `Present` slots are compared with
/// `canonical_value_compare` on their values.
#[must_use]
pub(in crate::db) fn compare_boundary_slots(
    left: &CursorBoundarySlot,
    right: &CursorBoundarySlot,
) -> Ordering {
    match left {
        CursorBoundarySlot::Missing => match right {
            CursorBoundarySlot::Missing => Ordering::Equal,
            CursorBoundarySlot::Present(_) => Ordering::Less,
        },
        CursorBoundarySlot::Present(lhs) => match right {
            CursorBoundarySlot::Missing => Ordering::Greater,
            CursorBoundarySlot::Present(rhs) => canonical_value_compare(lhs, rhs),
        },
    }
}
/// Adjusts `ordering` for the requested sort direction.
///
/// Returns `ordering` unchanged for `Asc` and its reverse for `Desc`.
#[must_use]
pub(in crate::db) const fn apply_order_direction(
    ordering: Ordering,
    direction: OrderDirection,
) -> Ordering {
    match direction {
        OrderDirection::Desc => ordering.reverse(),
        OrderDirection::Asc => ordering,
    }
}
/// Checks that a cursor's direction matches the expected one.
///
/// # Errors
///
/// Returns `CursorPlanError::continuation_cursor_direction_mismatch` when
/// `actual_direction` differs from `expected_direction`.
pub(in crate::db) fn validate_cursor_direction(
    expected_direction: Direction,
    actual_direction: Direction,
) -> Result<(), CursorPlanError> {
    if actual_direction == expected_direction {
        Ok(())
    } else {
        Err(CursorPlanError::continuation_cursor_direction_mismatch())
    }
}
/// Checks that a cursor's initial window offset matches the expected one.
///
/// # Errors
///
/// Returns `CursorPlanError::continuation_cursor_window_mismatch`, carrying
/// both offsets, when the two values differ.
pub(in crate::db) const fn validate_cursor_window_offset(
    expected_initial_offset: u32,
    actual_initial_offset: u32,
) -> Result<(), CursorPlanError> {
    if actual_initial_offset == expected_initial_offset {
        Ok(())
    } else {
        Err(CursorPlanError::continuation_cursor_window_mismatch(
            expected_initial_offset,
            actual_initial_offset,
        ))
    }
}
/// Checks that `boundary` holds exactly `expected_arity` slots.
///
/// # Errors
///
/// Returns `CursorPlanError::continuation_cursor_boundary_arity_mismatch`,
/// carrying the expected and actual slot counts, when they differ.
pub(in crate::db) const fn validate_cursor_boundary_arity(
    boundary: &CursorBoundary,
    expected_arity: usize,
) -> Result<(), CursorPlanError> {
    let actual_arity = boundary.slots.len();
    if actual_arity == expected_arity {
        Ok(())
    } else {
        Err(
            CursorPlanError::continuation_cursor_boundary_arity_mismatch(
                expected_arity,
                actual_arity,
            ),
        )
    }
}
/// Validates `boundary` against `order` and decodes its primary-key slot
/// into the typed key `K`.
///
/// The checks run in sequence and stop at the first failure:
/// 1. the slot count must equal the number of order fields,
/// 2. every slot must pass `validate_cursor_boundary_types`,
/// 3. the primary-key slot must decode via
///    `decode_typed_primary_key_cursor_slot`.
///
/// # Errors
///
/// Propagates the `CursorPlanError` of whichever step fails first.
pub(in crate::db) fn validate_cursor_boundary_for_order<K: FieldValue>(
    model: &EntityModel,
    order: &OrderSpec,
    boundary: &CursorBoundary,
) -> Result<K, CursorPlanError> {
    validate_cursor_boundary_arity(boundary, order.fields.len())
        .and_then(|()| validate_cursor_boundary_types(model, order, boundary))
        .and_then(|()| decode_typed_primary_key_cursor_slot::<K>(model, order, boundary))
}
/// Builds the `SchemaInfo` for `model`, translating a construction failure
/// into `CursorPlanError::invalid_continuation_cursor_schema`.
fn boundary_schema(model: &EntityModel) -> Result<SchemaInfo, CursorPlanError> {
    match SchemaInfo::from_entity_model(model) {
        Ok(schema) => Ok(schema),
        Err(err) => Err(CursorPlanError::invalid_continuation_cursor_schema(err)),
    }
}
/// Looks up the schema type of the order field named `field`.
///
/// # Errors
///
/// Returns `CursorPlanError::continuation_cursor_unknown_order_field` when
/// `schema` has no entry for `field`.
fn boundary_order_field_type<'a>(
    schema: &'a SchemaInfo,
    field: &str,
) -> Result<&'a FieldType, CursorPlanError> {
    let Some(field_type) = schema.field(field) else {
        return Err(CursorPlanError::continuation_cursor_unknown_order_field(field));
    };
    Ok(field_type)
}
/// Finds the position of the primary-key field within `order`.
///
/// Returns the index of the first order field whose name equals `pk_field`.
///
/// # Errors
///
/// Returns
/// `CursorPlanError::continuation_cursor_primary_key_tie_break_required`
/// when `order` does not contain `pk_field`.
fn primary_key_boundary_index(order: &OrderSpec, pk_field: &str) -> Result<usize, CursorPlanError> {
    for (index, (field, _)) in order.fields.iter().enumerate() {
        if field == pk_field {
            return Ok(index);
        }
    }
    Err(CursorPlanError::continuation_cursor_primary_key_tie_break_required(pk_field))
}
pub(in crate::db) fn validate_cursor_boundary_types(
model: &EntityModel,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<(), CursorPlanError> {
let schema = boundary_schema(model)?;
for ((field, _), slot) in order.fields.iter().zip(boundary.slots.iter()) {
let field_type = boundary_order_field_type(&schema, field)?;
match slot {
CursorBoundarySlot::Missing => {
if field == model.primary_key.name {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
None,
),
);
}
}
CursorBoundarySlot::Present(value) => {
if !literal_matches_type(value, field_type) {
if field == model.primary_key.name {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
Some(value.clone()),
),
);
}
return Err(CursorPlanError::continuation_cursor_boundary_type_mismatch(
field.clone(),
field_type.to_string(),
value.clone(),
));
}
if field == model.primary_key.name && Value::as_storage_key(value).is_none() {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
field.clone(),
field_type.to_string(),
Some(value.clone()),
),
);
}
}
}
}
Ok(())
}
/// Decodes the primary-key slot of `boundary` into the typed key `K`.
///
/// The primary-key slot is the one at the position where the model's
/// primary-key field appears in `order`.
///
/// # Errors
///
/// - `continuation_cursor_primary_key_tie_break_required` when `order` does
///   not contain the primary-key field;
/// - schema construction or unknown-order-field errors from the helpers;
/// - `continuation_cursor_boundary_arity_mismatch` when `boundary` has no
///   slot at the primary-key position (previously this case panicked on an
///   out-of-bounds index);
/// - `continuation_cursor_primary_key_type_mismatch` when the slot is
///   `Missing` or `K::from_value` rejects the value.
pub(in crate::db) fn decode_typed_primary_key_cursor_slot<K: FieldValue>(
    model: &EntityModel,
    order: &OrderSpec,
    boundary: &CursorBoundary,
) -> Result<K, CursorPlanError> {
    let pk_field = model.primary_key.name;
    let pk_index = primary_key_boundary_index(order, pk_field)?;
    let schema = boundary_schema(model)?;
    let expected = boundary_order_field_type(&schema, pk_field)?.to_string();

    // Checked lookup: this function is callable without a prior arity check,
    // so a boundary shorter than the order must yield an error, not a panic.
    let Some(pk_slot) = boundary.slots.get(pk_index) else {
        return Err(
            CursorPlanError::continuation_cursor_boundary_arity_mismatch(
                order.fields.len(),
                boundary.slots.len(),
            ),
        );
    };

    match pk_slot {
        CursorBoundarySlot::Missing => Err(
            CursorPlanError::continuation_cursor_primary_key_type_mismatch(
                pk_field.to_string(),
                expected,
                None,
            ),
        ),
        CursorBoundarySlot::Present(value) => K::from_value(value).ok_or_else(|| {
            CursorPlanError::continuation_cursor_primary_key_type_mismatch(
                pk_field.to_string(),
                expected,
                Some(value.clone()),
            )
        }),
    }
}
/// Decodes the primary-key slot of `boundary` into a `StorageKey`.
///
/// The primary-key slot is the one at the position where the model's
/// primary-key field appears in `order`.
///
/// # Errors
///
/// - `continuation_cursor_primary_key_tie_break_required` when `order` does
///   not contain the primary-key field;
/// - schema construction or unknown-order-field errors from the helpers;
/// - `continuation_cursor_boundary_arity_mismatch` when `boundary` has no
///   slot at the primary-key position (previously this case panicked on an
///   out-of-bounds index);
/// - `continuation_cursor_primary_key_type_mismatch` when the slot is
///   `Missing` or the value has no storage-key representation.
pub(in crate::db) fn decode_structural_primary_key_cursor_slot(
    model: &EntityModel,
    order: &OrderSpec,
    boundary: &CursorBoundary,
) -> Result<StorageKey, CursorPlanError> {
    let pk_field = model.primary_key.name;
    let pk_index = primary_key_boundary_index(order, pk_field)?;
    let schema = boundary_schema(model)?;
    let expected = boundary_order_field_type(&schema, pk_field)?.to_string();

    // Checked lookup: callers such as `decode_pk_cursor_boundary_storage_key`
    // only guard the slot count with a debug assertion, so a short boundary
    // must yield an error here rather than a panic.
    let Some(pk_slot) = boundary.slots.get(pk_index) else {
        return Err(
            CursorPlanError::continuation_cursor_boundary_arity_mismatch(
                order.fields.len(),
                boundary.slots.len(),
            ),
        );
    };

    match pk_slot {
        CursorBoundarySlot::Missing => Err(
            CursorPlanError::continuation_cursor_primary_key_type_mismatch(
                pk_field.to_string(),
                expected,
                None,
            ),
        ),
        CursorBoundarySlot::Present(value) => value.as_storage_key().ok_or_else(|| {
            CursorPlanError::continuation_cursor_primary_key_type_mismatch(
                pk_field.to_string(),
                expected,
                Some(value.clone()),
            )
        }),
    }
}
/// Decodes the storage key of an optional primary-key-ordered boundary.
///
/// Returns `Ok(None)` when no boundary is supplied. Otherwise the boundary
/// is decoded against a synthetic single-field order (primary key,
/// ascending) with `decode_structural_primary_key_cursor_slot`.
///
/// The boundary is expected to hold exactly one slot; this is only checked
/// by a debug assertion.
///
/// # Errors
///
/// Propagates any `CursorPlanError` from the structural decode.
pub(in crate::db) fn decode_pk_cursor_boundary_storage_key(
    boundary: Option<&CursorBoundary>,
    model: &EntityModel,
) -> Result<Option<StorageKey>, CursorPlanError> {
    boundary
        .map(|boundary| {
            debug_assert_eq!(
                boundary.slots.len(),
                1,
                "pk-ordered continuation boundaries are validated by the cursor spine",
            );

            let pk_only_order = OrderSpec {
                fields: vec![(model.primary_key.name.to_string(), OrderDirection::Asc)],
            };
            decode_structural_primary_key_cursor_slot(model, &pk_only_order, boundary)
        })
        .transpose()
}