use crate::db::{data::primary_key_value_from_structural_value, key_taxonomy::PrimaryKeyValue};
use crate::{
db::{
cursor::CursorPlanError,
direction::Direction,
query::plan::{
OrderDirection, OrderSpec,
expr::{Expr, ExprType, infer_expr_type},
},
schema::{FieldType, SchemaInfo, literal_matches_type},
},
value::Value,
};
use serde::Deserialize;
use std::cmp::Ordering;
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub(crate) enum CursorBoundarySlot {
Missing,
Present(Value),
}
#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
pub(crate) struct CursorBoundary {
pub(crate) slots: Vec<CursorBoundarySlot>,
}
#[must_use]
pub(in crate::db) const fn apply_order_direction(
ordering: Ordering,
direction: OrderDirection,
) -> Ordering {
match direction {
OrderDirection::Asc => ordering,
OrderDirection::Desc => ordering.reverse(),
}
}
pub(in crate::db) fn validate_cursor_direction(
expected_direction: Direction,
actual_direction: Direction,
) -> Result<(), CursorPlanError> {
if actual_direction != expected_direction {
return Err(CursorPlanError::continuation_cursor_direction_mismatch());
}
Ok(())
}
pub(in crate::db) const fn validate_cursor_window_offset(
expected_initial_offset: u32,
actual_initial_offset: u32,
) -> Result<(), CursorPlanError> {
if actual_initial_offset != expected_initial_offset {
return Err(CursorPlanError::continuation_cursor_window_mismatch(
expected_initial_offset,
actual_initial_offset,
));
}
Ok(())
}
pub(in crate::db) const fn validate_cursor_boundary_arity(
boundary: &CursorBoundary,
expected_arity: usize,
) -> Result<(), CursorPlanError> {
if boundary.slots.len() != expected_arity {
return Err(
CursorPlanError::continuation_cursor_boundary_arity_mismatch(
expected_arity,
boundary.slots.len(),
),
);
}
Ok(())
}
pub(in crate::db) fn validate_cursor_boundary_for_order(
schema: &SchemaInfo,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<PrimaryKeyValue, CursorPlanError> {
validate_cursor_boundary_arity(boundary, order.fields.len())?;
validate_cursor_boundary_types(schema, order, boundary)?;
decode_structural_primary_key_cursor_slots(schema, order, boundary)
}
fn boundary_order_field_type<'a>(
schema: &'a SchemaInfo,
field: &str,
) -> Result<&'a FieldType, CursorPlanError> {
schema
.field(field)
.ok_or_else(|| CursorPlanError::continuation_cursor_unknown_order_field(field))
}
fn primary_key_boundary_index(order: &OrderSpec, pk_field: &str) -> Result<usize, CursorPlanError> {
order
.fields
.iter()
.position(|term| term.direct_field() == Some(pk_field))
.ok_or_else(|| {
CursorPlanError::continuation_cursor_primary_key_tie_break_required(pk_field)
})
}
pub(in crate::db) fn validate_cursor_boundary_types(
schema: &SchemaInfo,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<(), CursorPlanError> {
for (term, slot) in order.fields.iter().zip(boundary.slots.iter()) {
let field = term.direct_field();
let expression = field.is_none().then(|| term.expr().clone());
let field_type = match field {
Some(field) => Some(boundary_order_field_type(schema, field)?),
None => None,
};
let rendered = expression.as_ref().map_or_else(
|| {
field
.expect("field-backed order term should have field")
.to_owned()
},
|_| term.rendered_label(),
);
match slot {
CursorBoundarySlot::Missing => {
if field.is_some_and(|field| is_primary_key_field(schema, field)) {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
rendered,
boundary_order_expected_type_name(
schema,
field_type,
expression.as_ref(),
),
None,
),
);
}
}
CursorBoundarySlot::Present(value) => {
let type_matches = match (field_type, expression.as_ref()) {
(Some(field_type), None) => literal_matches_type(value, field_type),
(None, Some(expression)) => {
boundary_order_expression_value_matches(schema, expression, value)?
}
_ => false,
};
if !type_matches {
let expected =
boundary_order_expected_type_name(schema, field_type, expression.as_ref());
if field.is_some_and(|field| is_primary_key_field(schema, field)) {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
rendered,
expected,
Some(value.clone()),
),
);
}
return Err(CursorPlanError::continuation_cursor_boundary_type_mismatch(
rendered,
expected,
value.clone(),
));
}
if field.is_some_and(|field| is_primary_key_field(schema, field))
&& Value::as_primary_key_value(value).is_none()
{
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
rendered,
boundary_order_expected_type_name(
schema,
field_type,
expression.as_ref(),
),
Some(value.clone()),
),
);
}
}
}
}
Ok(())
}
fn is_primary_key_field(schema: &SchemaInfo, field: &str) -> bool {
schema
.primary_key_names()
.iter()
.any(|primary_key_field| primary_key_field == field)
}
fn boundary_order_expected_type_name(
schema: &SchemaInfo,
field_type: Option<&FieldType>,
expression: Option<&Expr>,
) -> String {
if let Some(field_type) = field_type {
return field_type.to_string();
}
let Some(expression) = expression else {
return "unknown".to_string();
};
match infer_expr_type(expression, schema) {
Ok(ExprType::Bool) => "bool".to_string(),
Ok(ExprType::Blob) => "blob".to_string(),
Ok(ExprType::Text) => "text".to_string(),
Ok(ExprType::Numeric(_)) => "numeric".to_string(),
Ok(ExprType::Collection) => "collection".to_string(),
Ok(ExprType::Structured) => "structured".to_string(),
Ok(ExprType::Opaque) => "opaque".to_string(),
Ok(ExprType::Unknown) | Err(_) => "unknown".to_string(),
#[cfg(test)]
Ok(ExprType::Null) => "null".to_string(),
}
}
fn boundary_order_expression_value_matches(
schema: &SchemaInfo,
expression: &Expr,
value: &Value,
) -> Result<bool, CursorPlanError> {
let inferred = infer_expr_type(expression, schema)
.map_err(|_| CursorPlanError::continuation_cursor_unknown_order_field("expression"))?;
Ok(match inferred {
ExprType::Bool => matches!(value, Value::Bool(_)),
ExprType::Blob => matches!(value, Value::Blob(_)),
ExprType::Text => matches!(value, Value::Text(_) | Value::Enum(_)),
ExprType::Numeric(_) => matches!(
value,
Value::Int64(_)
| Value::Int128(_)
| Value::IntBig(_)
| Value::Nat64(_)
| Value::Nat128(_)
| Value::NatBig(_)
| Value::Decimal(_)
| Value::Float32(_)
| Value::Float64(_)
| Value::Date(_)
| Value::Duration(_)
| Value::Timestamp(_)
),
ExprType::Collection | ExprType::Structured | ExprType::Opaque | ExprType::Unknown => false,
#[cfg(test)]
ExprType::Null => matches!(value, Value::Null),
})
}
pub(in crate::db) fn decode_structural_primary_key_cursor_slots(
schema: &SchemaInfo,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<PrimaryKeyValue, CursorPlanError> {
let primary_key_fields: Vec<&str> = schema
.primary_key_names()
.iter()
.map(String::as_str)
.collect();
if let [primary_key_field] = primary_key_fields.as_slice() {
return decode_structural_primary_key_cursor_slot_from_name(
primary_key_field,
order,
boundary,
);
}
let mut values = Vec::with_capacity(primary_key_fields.len());
for primary_key_field in &primary_key_fields {
let index = primary_key_boundary_index(order, primary_key_field)?;
let slot = &boundary.slots[index];
match slot {
CursorBoundarySlot::Missing => {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
primary_key_field.to_string(),
"primary key value".to_string(),
None,
),
);
}
CursorBoundarySlot::Present(value) => values.push(value.clone()),
}
}
primary_key_value_from_structural_value(&Value::List(values)).map_err(|_| {
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
primary_key_fields.join(", "),
"primary key value".to_string(),
None,
)
})
}
pub(in crate::db) fn decode_structural_primary_key_cursor_slot_from_name(
pk_field: &str,
order: &OrderSpec,
boundary: &CursorBoundary,
) -> Result<PrimaryKeyValue, CursorPlanError> {
let pk_index = primary_key_boundary_index(order, pk_field)?;
let expected = "primary key value".to_string();
let pk_slot = &boundary.slots[pk_index];
match pk_slot {
CursorBoundarySlot::Missing => Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
None,
),
),
CursorBoundarySlot::Present(value) => primary_key_value_from_structural_value(value)
.map_err(|_| {
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
pk_field.to_string(),
expected,
Some(value.clone()),
)
}),
}
}
pub(in crate::db) fn decode_pk_cursor_boundary_primary_key_value_for_names(
boundary: Option<&CursorBoundary>,
primary_key_names: &[&str],
) -> Result<Option<PrimaryKeyValue>, CursorPlanError> {
let Some(boundary) = boundary else {
return Ok(None);
};
debug_assert_eq!(
boundary.slots.len(),
primary_key_names.len(),
"pk-ordered continuation boundaries are validated by the cursor spine",
);
let order = OrderSpec {
fields: primary_key_names
.iter()
.map(|field| crate::db::query::plan::OrderTerm::field(*field, OrderDirection::Asc))
.collect(),
};
if let [primary_key_name] = primary_key_names {
return decode_structural_primary_key_cursor_slot_from_name(
primary_key_name,
&order,
boundary,
)
.map(Some);
}
let mut values = Vec::with_capacity(primary_key_names.len());
for primary_key_name in primary_key_names {
let index = primary_key_boundary_index(&order, primary_key_name)?;
let slot = &boundary.slots[index];
match slot {
CursorBoundarySlot::Missing => {
return Err(
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
(*primary_key_name).to_string(),
"primary key value".to_string(),
None,
),
);
}
CursorBoundarySlot::Present(value) => values.push(value.clone()),
}
}
primary_key_value_from_structural_value(&Value::List(values))
.map(Some)
.map_err(|_| {
CursorPlanError::continuation_cursor_primary_key_type_mismatch(
primary_key_names.join(", "),
"primary key value".to_string(),
None,
)
})
}
#[cfg(test)]
mod tests {
use super::{
CursorBoundary, CursorBoundarySlot, decode_pk_cursor_boundary_primary_key_value_for_names,
};
use crate::{
db::key_taxonomy::{CompositePrimaryKeyValue, PrimaryKeyComponent, PrimaryKeyValue},
value::Value,
};
#[test]
fn pk_ordered_boundary_decode_accepts_ordered_composite_primary_key_fields() {
let boundary = CursorBoundary {
slots: vec![
CursorBoundarySlot::Present(Value::Nat64(7)),
CursorBoundarySlot::Present(Value::Int64(-3)),
],
};
let composite = CompositePrimaryKeyValue::try_from_components(&[
PrimaryKeyComponent::Nat64(7),
PrimaryKeyComponent::Int64(-3),
])
.expect("fixture components should form composite key");
let decoded = decode_pk_cursor_boundary_primary_key_value_for_names(
Some(&boundary),
&["tenant", "id"],
)
.expect("composite pk cursor boundary should decode");
assert_eq!(decoded, Some(PrimaryKeyValue::Composite(composite)));
}
}