use crate::ast::FieldRef;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PathKeyDirection {
Ascending,
Descending,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PathKeyNulls {
First,
Last,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PathKey {
pub field: FieldRef,
pub direction: PathKeyDirection,
pub nulls: PathKeyNulls,
}
impl PathKey {
pub fn asc(field: FieldRef) -> Self {
Self {
field,
direction: PathKeyDirection::Ascending,
nulls: PathKeyNulls::Last,
}
}
pub fn desc(field: FieldRef) -> Self {
Self {
field,
direction: PathKeyDirection::Descending,
nulls: PathKeyNulls::First,
}
}
pub fn same_column_and_direction(&self, other: &PathKey) -> bool {
self.field == other.field && self.direction == other.direction
}
}
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PathKeys {
pub keys: Vec<PathKey>,
}
impl PathKeys {
pub fn unordered() -> Self {
Self { keys: Vec::new() }
}
pub fn asc(field: FieldRef) -> Self {
Self {
keys: vec![PathKey::asc(field)],
}
}
pub fn desc(field: FieldRef) -> Self {
Self {
keys: vec![PathKey::desc(field)],
}
}
pub fn is_unordered(&self) -> bool {
self.keys.is_empty()
}
pub fn prefix_match(&self, required: &PathKeys) -> usize {
let mut matched = 0;
for (mine, req) in self.keys.iter().zip(required.keys.iter()) {
if mine.same_column_and_direction(req) {
matched += 1;
} else {
break;
}
}
matched
}
pub fn satisfies(&self, required: &PathKeys) -> bool {
if required.keys.len() > self.keys.len() {
return false;
}
self.prefix_match(required) == required.keys.len()
}
pub fn appended(&self, key: PathKey) -> Self {
let mut keys = self.keys.clone();
keys.push(key);
Self { keys }
}
pub fn truncated(&self, n: usize) -> Self {
Self {
keys: self.keys.iter().take(n).cloned().collect(),
}
}
pub fn len(&self) -> usize {
self.keys.len()
}
pub fn is_empty(&self) -> bool {
self.keys.is_empty()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SortStrategy {
None,
Incremental { prefix_len: usize },
Full,
}
pub fn plan_sort(input_order: &PathKeys, required_order: &PathKeys) -> SortStrategy {
if required_order.is_unordered() {
return SortStrategy::None;
}
if input_order.satisfies(required_order) {
return SortStrategy::None;
}
let prefix = input_order.prefix_match(required_order);
if prefix == 0 {
return SortStrategy::Full;
}
SortStrategy::Incremental { prefix_len: prefix }
}
#[cfg(test)]
mod tests {
use super::*;
fn col(name: &str) -> FieldRef {
FieldRef::column("", name)
}
#[test]
fn constructors_apply_sql_default_null_placement() {
let asc = PathKey::asc(col("created_at"));
assert_eq!(asc.direction, PathKeyDirection::Ascending);
assert_eq!(asc.nulls, PathKeyNulls::Last);
let desc = PathKey::desc(col("score"));
assert_eq!(desc.direction, PathKeyDirection::Descending);
assert_eq!(desc.nulls, PathKeyNulls::First);
}
#[test]
fn prefix_matching_ignores_null_placement_but_not_direction_or_column() {
let input = PathKeys {
keys: vec![
PathKey {
field: col("tenant_id"),
direction: PathKeyDirection::Ascending,
nulls: PathKeyNulls::First,
},
PathKey::desc(col("created_at")),
],
};
let required = PathKeys {
keys: vec![
PathKey::asc(col("tenant_id")),
PathKey::desc(col("created_at")),
PathKey::asc(col("id")),
],
};
assert_eq!(input.prefix_match(&required), 2);
let wrong_direction = PathKeys::desc(col("tenant_id"));
assert_eq!(input.prefix_match(&wrong_direction), 0);
let wrong_column = PathKeys::asc(col("account_id"));
assert_eq!(input.prefix_match(&wrong_column), 0);
}
#[test]
fn satisfies_requires_all_required_keys_in_order() {
let input = PathKeys::asc(col("a"))
.appended(PathKey::asc(col("b")))
.appended(PathKey::desc(col("c")));
assert!(input.satisfies(&PathKeys::asc(col("a"))));
assert!(input.satisfies(&PathKeys::asc(col("a")).appended(PathKey::asc(col("b")))));
assert!(!PathKeys::asc(col("a")).satisfies(&input));
assert!(!input.satisfies(&PathKeys::asc(col("b"))));
}
#[test]
fn appended_truncated_and_empty_report_stable_lengths() {
let unordered = PathKeys::unordered();
assert!(unordered.is_unordered());
assert!(unordered.is_empty());
assert_eq!(unordered.len(), 0);
let keys = unordered
.appended(PathKey::asc(col("a")))
.appended(PathKey::desc(col("b")));
assert_eq!(keys.len(), 2);
assert_eq!(keys.truncated(1), PathKeys::asc(col("a")));
assert_eq!(keys.truncated(99), keys);
}
#[test]
fn plan_sort_distinguishes_none_incremental_and_full() {
let input = PathKeys::asc(col("tenant_id")).appended(PathKey::desc(col("created_at")));
let exact = input.clone();
let extension = exact.clone().appended(PathKey::asc(col("id")));
let unrelated = PathKeys::asc(col("status"));
assert_eq!(
plan_sort(&input, &PathKeys::unordered()),
SortStrategy::None
);
assert_eq!(plan_sort(&input, &exact), SortStrategy::None);
assert_eq!(
plan_sort(&input, &extension),
SortStrategy::Incremental { prefix_len: 2 }
);
assert_eq!(plan_sort(&input, &unrelated), SortStrategy::Full);
}
}