use crate::KeyRange;
use std::ops::{Bound, RangeBounds};
pub trait Ranged {
fn key_range(&self) -> &KeyRange;
}
pub struct Indexed<T: Ranged> {
inner: T,
}
impl<T: Ranged> Ranged for Indexed<T> {
fn key_range(&self) -> &KeyRange {
self.inner.key_range()
}
}
impl<T: Ranged> std::ops::Deref for Indexed<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Run<T: Ranged>(Vec<T>);
impl<T: Ranged> std::ops::Deref for Run<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: Ranged> Run<T> {
pub fn new(items: Vec<T>) -> Option<Self> {
if items.is_empty() {
None
} else {
Some(Self(items))
}
}
pub fn inner_mut(&mut self) -> &mut Vec<T> {
&mut self.0
}
pub fn push(&mut self, item: T) {
self.0.push(item);
self.0
.sort_by(|a, b| a.key_range().min().cmp(b.key_range().min()));
}
pub fn extend(&mut self, items: Vec<T>) {
self.0.extend(items);
}
pub fn retain<F>(&mut self, f: F)
where
F: FnMut(&T) -> bool,
{
self.0.retain(f);
}
pub fn remove(&mut self, idx: usize) -> T {
self.0.remove(idx)
}
pub fn get_for_key(&self, key: &[u8]) -> Option<&T> {
let idx = self.partition_point(|x| x.key_range().max() < &key);
self.0.get(idx).filter(|x| x.key_range().min() <= &key)
}
pub fn aggregate_key_range(&self) -> KeyRange {
#[expect(clippy::expect_used, reason = "by definition, runs are never empty")]
let lo = self.first().expect("run should never be empty");
#[expect(clippy::expect_used, reason = "by definition, runs are never empty")]
let hi = self.last().expect("run should never be empty");
KeyRange::new((lo.key_range().min().clone(), hi.key_range().max().clone()))
}
pub fn get_overlapping<'a>(&'a self, key_range: &'a KeyRange) -> &'a [T] {
let range = key_range.min()..=key_range.max();
let Some((lo, hi)) = self.range_overlap_indexes::<crate::Slice, _>(&range) else {
return &[];
};
self.get(lo..=hi).unwrap_or_default()
}
pub fn get_contained<'a>(&'a self, key_range: &KeyRange) -> &'a [T] {
fn trim_slice<T, F>(s: &[T], pred: F) -> &[T]
where
F: Fn(&T) -> bool,
{
let start = s.iter().position(&pred).unwrap_or(s.len());
let end = s.iter().rposition(&pred).map_or(start, |i| i + 1);
s.get(start..end).expect("should be in range")
}
let range = key_range.min()..=key_range.max();
let Some((lo, hi)) = self.range_overlap_indexes::<crate::Slice, _>(&range) else {
return &[];
};
self.get(lo..=hi)
.map(|slice| trim_slice(slice, |x| key_range.contains_range(x.key_range())))
.unwrap_or_default()
}
pub fn range_overlap_indexes<K: AsRef<[u8]>, R: RangeBounds<K>>(
&self,
key_range: &R,
) -> Option<(usize, usize)> {
let level = &self.0;
let lo = match key_range.start_bound() {
Bound::Unbounded => 0,
Bound::Included(start_key) => {
level.partition_point(|x| x.key_range().max() < start_key)
}
Bound::Excluded(start_key) => {
level.partition_point(|x| x.key_range().max() <= start_key)
}
};
if lo >= level.len() {
return None;
}
#[expect(clippy::indexing_slicing)]
let truncated_level = &level[lo..];
let hi = match key_range.end_bound() {
Bound::Unbounded => level.len() - 1,
Bound::Included(end_key) => {
let idx = lo + truncated_level.partition_point(|x| x.key_range().min() <= end_key);
if idx == 0 {
return None;
}
idx.saturating_sub(1) }
Bound::Excluded(end_key) => {
let idx = lo + truncated_level.partition_point(|x| x.key_range().min() < end_key);
if idx == 0 {
return None;
}
idx.saturating_sub(1) }
};
if lo > hi {
return None;
}
Some((lo, hi))
}
}
#[cfg(test)]
#[expect(clippy::unwrap_used)]
mod tests {
use super::*;
use test_log::test;
#[derive(Clone)]
struct FakeTable {
id: u64,
key_range: KeyRange,
}
impl Ranged for FakeTable {
fn key_range(&self) -> &KeyRange {
&self.key_range
}
}
fn s(id: u64, min: &str, max: &str) -> FakeTable {
FakeTable {
id,
key_range: KeyRange::new((min.as_bytes().into(), max.as_bytes().into())),
}
}
#[test]
fn run_aggregate_key_range() {
let items = vec![
s(0, "a", "d"),
s(1, "e", "j"),
s(2, "k", "o"),
s(3, "p", "z"),
];
let run = Run(items);
assert_eq!(
KeyRange::new((b"a".into(), b"z".into())),
run.aggregate_key_range(),
);
}
#[test]
fn run_point_lookup() {
let items = vec![
s(0, "a", "d"),
s(1, "e", "j"),
s(2, "k", "o"),
s(3, "p", "z"),
];
let run = Run(items);
assert_eq!(0, run.get_for_key(b"a").unwrap().id);
assert_eq!(0, run.get_for_key(b"aaa").unwrap().id);
assert_eq!(0, run.get_for_key(b"b").unwrap().id);
assert_eq!(0, run.get_for_key(b"c").unwrap().id);
assert_eq!(0, run.get_for_key(b"d").unwrap().id);
assert_eq!(1, run.get_for_key(b"e").unwrap().id);
assert_eq!(1, run.get_for_key(b"j").unwrap().id);
assert_eq!(2, run.get_for_key(b"k").unwrap().id);
assert_eq!(2, run.get_for_key(b"o").unwrap().id);
assert_eq!(3, run.get_for_key(b"p").unwrap().id);
assert_eq!(3, run.get_for_key(b"z").unwrap().id);
assert!(run.get_for_key(b"zzz").is_none());
}
#[test]
fn run_range_culling() {
let items = vec![
s(0, "a", "d"),
s(1, "e", "j"),
s(2, "k", "o"),
s(3, "p", "z"),
];
let run = Run(items);
assert_eq!(Some((0, 3)), run.range_overlap_indexes::<&[u8], _>(&..));
assert_eq!(
Some((0, 0)),
run.range_overlap_indexes(&(b"a" as &[u8]..=b"a"))
);
assert_eq!(
Some((0, 0)),
run.range_overlap_indexes(&(b"a" as &[u8]..=b"b"))
);
assert_eq!(
Some((0, 0)),
run.range_overlap_indexes(&(b"a" as &[u8]..=b"d"))
);
assert_eq!(
Some((0, 0)),
run.range_overlap_indexes(&(b"d" as &[u8]..=b"d"))
);
assert_eq!(
Some((0, 0)),
run.range_overlap_indexes(&(b"a" as &[u8]..b"d"))
);
assert_eq!(
Some((0, 1)),
run.range_overlap_indexes(&(b"a" as &[u8]..=b"g"))
);
assert_eq!(
Some((1, 1)),
run.range_overlap_indexes(&(b"j" as &[u8]..=b"j"))
);
assert_eq!(
Some((0, 3)),
run.range_overlap_indexes(&(b"a" as &[u8]..=b"z"))
);
assert_eq!(
Some((3, 3)),
run.range_overlap_indexes(&(b"z" as &[u8]..=b"zzz"))
);
assert_eq!(Some((3, 3)), run.range_overlap_indexes(&(b"z" as &[u8]..)));
assert!(run
.range_overlap_indexes(&(b"zzz" as &[u8]..=b"zzzzzzz"))
.is_none());
}
#[test]
fn run_range_contained() {
use crate::TableId;
let items = vec![
s(0, "a", "d"),
s(1, "e", "j"),
s(2, "k", "o"),
s(3, "p", "z"),
];
let run = Run(items);
assert_eq!(
&[] as &[TableId],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"a".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"d".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"j".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"k".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"l".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1, 2, 3],
&*run
.get_contained(&KeyRange::new((b"a".into(), b"z".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
}
#[test]
fn run_range_overlaps() {
let items = vec![
s(0, "a", "d"),
s(1, "e", "j"),
s(2, "k", "o"),
s(3, "p", "z"),
];
let run = Run(items);
assert_eq!(
&[0],
&*run
.get_overlapping(&KeyRange::new((b"a".into(), b"a".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0],
&*run
.get_overlapping(&KeyRange::new((b"d".into(), b"d".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0],
&*run
.get_overlapping(&KeyRange::new((b"a".into(), b"d".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1],
&*run
.get_overlapping(&KeyRange::new((b"a".into(), b"f".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[0, 1, 2, 3],
&*run
.get_overlapping(&KeyRange::new((b"a".into(), b"zzz".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
assert_eq!(
&[] as &[u64],
&*run
.get_overlapping(&KeyRange::new((b"zzz".into(), b"zzzz".into())))
.iter()
.map(|x| x.id)
.collect::<Vec<_>>(),
);
}
}