use crate::{
db::{
cursor::{
ContinuationSignature, GroupedContinuationToken, GroupedPlannedCursor, PlannedCursor,
},
direction::Direction,
executor::{
PreparedLoadPlan,
continuation::scalar::{ResolvedScalarContinuationContext, ScalarContinuationContext},
},
query::plan::ExecutionOrdering,
},
error::InternalError,
value::Value,
};
/// Stateless namespace type for executor continuation helpers.
///
/// Carries no data; its associated functions resolve load cursor input into
/// prepared cursor contexts and build grouped continuation tokens.
pub(in crate::db::executor) struct ContinuationEngine;
impl ContinuationEngine {
pub(in crate::db::executor) fn resolve_load_cursor_context(
plan: &PreparedLoadPlan,
cursor: LoadCursorInput,
requested_shape: RequestedLoadExecutionShape,
) -> Result<ResolvedLoadCursorContext, InternalError> {
let ordering = plan.execution_ordering()?;
match (requested_shape, &ordering) {
(
RequestedLoadExecutionShape::Scalar,
ExecutionOrdering::PrimaryKey | ExecutionOrdering::Explicit(_),
)
| (RequestedLoadExecutionShape::Grouped, ExecutionOrdering::Grouped(_)) => {}
(RequestedLoadExecutionShape::Scalar, ExecutionOrdering::Grouped(_)) => {
return Err(InternalError::query_executor_invariant(
"grouped plans require grouped load execution mode",
));
}
(
RequestedLoadExecutionShape::Grouped,
ExecutionOrdering::PrimaryKey | ExecutionOrdering::Explicit(_),
) => {
return Err(InternalError::query_executor_invariant(
"grouped load execution mode requires grouped logical plans",
));
}
}
let cursor = match (requested_shape, cursor) {
(RequestedLoadExecutionShape::Scalar, LoadCursorInput::Scalar(cursor)) => {
let cursor = plan.revalidate_cursor(*cursor)?;
let continuation_signature = plan.continuation_signature_for_runtime()?;
let resolved = Self::resolve_scalar_context(cursor, continuation_signature);
PreparedLoadCursor::Scalar(Box::new(resolved))
}
(RequestedLoadExecutionShape::Grouped, LoadCursorInput::Grouped(cursor)) => {
PreparedLoadCursor::Grouped(plan.revalidate_grouped_cursor(cursor)?)
}
(RequestedLoadExecutionShape::Scalar, LoadCursorInput::Grouped(_)) => {
return Err(InternalError::query_executor_invariant(
"scalar load execution mode requires scalar cursor input",
));
}
(RequestedLoadExecutionShape::Grouped, LoadCursorInput::Scalar(_)) => {
return Err(InternalError::query_executor_invariant(
"grouped load execution mode requires grouped cursor input",
));
}
};
Ok(ResolvedLoadCursorContext::new(cursor))
}
#[must_use]
pub(in crate::db::executor) fn resolve_scalar_context(
cursor: PlannedCursor,
continuation_signature: ContinuationSignature,
) -> ResolvedScalarContinuationContext {
ResolvedScalarContinuationContext::new(
ScalarContinuationContext::new(cursor),
continuation_signature,
)
}
#[must_use]
pub(in crate::db::executor) const fn grouped_next_cursor_token(
continuation_signature: ContinuationSignature,
last_group_key: Vec<Value>,
resume_initial_offset: u32,
) -> GroupedContinuationToken {
GroupedContinuationToken::new_with_direction(
continuation_signature,
last_group_key,
Direction::Asc,
resume_initial_offset,
)
}
}
/// Cursor input accepted by `ContinuationEngine::resolve_load_cursor_context`,
/// tagged by the kind of cursor it carries.
pub(in crate::db::executor) enum LoadCursorInput {
/// Planned cursor for scalar load execution, stored behind a `Box`.
// NOTE(review): boxing presumably keeps the enum small — confirm.
Scalar(Box<PlannedCursor>),
/// Planned cursor for grouped load execution.
Grouped(GroupedPlannedCursor),
}
/// Execution shape the caller asks for when resolving a load cursor.
///
/// `ContinuationEngine::resolve_load_cursor_context` checks this against both
/// the plan's execution ordering and the supplied cursor input variant.
#[derive(Clone, Copy)]
pub(in crate::db::executor) enum RequestedLoadExecutionShape {
/// Scalar execution; accepted with primary-key or explicit plan ordering.
Scalar,
/// Grouped execution; accepted with grouped plan ordering.
Grouped,
}
impl LoadCursorInput {
    /// Builds scalar cursor input from anything convertible into a
    /// [`PlannedCursor`]; the converted cursor is boxed.
    #[must_use]
    pub(in crate::db::executor) fn scalar(cursor: impl Into<PlannedCursor>) -> Self {
        let planned: PlannedCursor = cursor.into();
        Self::Scalar(Box::new(planned))
    }

    /// Builds grouped cursor input from anything convertible into a
    /// [`GroupedPlannedCursor`].
    #[must_use]
    pub(in crate::db::executor) fn grouped(cursor: impl Into<GroupedPlannedCursor>) -> Self {
        let planned: GroupedPlannedCursor = cursor.into();
        Self::Grouped(planned)
    }
}
/// Cursor state produced by `ContinuationEngine::resolve_load_cursor_context`
/// after the input cursor has been revalidated against the plan.
pub(in crate::db::executor) enum PreparedLoadCursor {
/// Resolved scalar continuation context (revalidated cursor plus
/// continuation signature), stored behind a `Box`.
Scalar(Box<ResolvedScalarContinuationContext>),
/// Revalidated grouped cursor.
Grouped(GroupedPlannedCursor),
}
/// Result wrapper returned by
/// `ContinuationEngine::resolve_load_cursor_context`.
///
/// The field is private and the constructor is module-private; executor code
/// obtains the prepared cursor through `into_cursor`.
pub(in crate::db::executor) struct ResolvedLoadCursorContext {
/// Prepared cursor (scalar or grouped) held by this context.
cursor: PreparedLoadCursor,
}
impl ResolvedLoadCursorContext {
    /// Wraps an already prepared cursor. Private to this module.
    #[must_use]
    const fn new(cursor: PreparedLoadCursor) -> Self {
        Self { cursor: cursor }
    }

    /// Consumes the context and hands back the prepared cursor it holds.
    #[must_use]
    pub(in crate::db::executor) fn into_cursor(self) -> PreparedLoadCursor {
        let Self { cursor: prepared } = self;
        prepared
    }
}