use std::cmp::Ordering;
use std::collections::HashMap;
use std::hash::Hash;
use std::ops::{Bound, Deref};
use std::{fmt, io, iter};
use b_tree::collate::*;
use crate::plan::QueryPlan;
pub use b_tree::Schema as BTreeSchema;
#[derive(Copy, Clone, Eq, PartialEq, Hash)]
pub enum IndexId<'a> {
Primary,
Auxiliary(&'a str),
}
impl<'a> Default for IndexId<'a> {
fn default() -> Self {
Self::Primary
}
}
impl<'a> From<&'a str> for IndexId<'a> {
fn from(id: &'a str) -> Self {
Self::Auxiliary(id)
}
}
impl<'a> From<&'a String> for IndexId<'a> {
fn from(id: &'a String) -> Self {
Self::Auxiliary(id)
}
}
impl<'a> fmt::Debug for IndexId<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Primary => f.write_str("primary"),
Self::Auxiliary(id) => id.fmt(f),
}
}
}
pub trait IndexSchema: BTreeSchema + Clone + Send + Sync + 'static {
type Id: Hash + Eq + Clone + fmt::Debug + fmt::Display + 'static;
fn columns(&self) -> &[Self::Id];
fn supports(&self, range: &Range<Self::Id, Self::Value>) -> bool {
range_is_supported(&range.columns, self.columns())
}
}
pub trait Schema: Eq + Send + Sync + Sized + fmt::Debug {
type Id: Hash + Eq + Clone + Send + Sync + fmt::Debug + fmt::Display + 'static;
type Error: std::error::Error + From<io::Error>;
type Value: Clone + Eq + Send + Sync + fmt::Debug + 'static;
type Index: IndexSchema<Error = Self::Error, Id = Self::Id, Value = Self::Value>
+ Send
+ Sync
+ 'static;
fn key(&self) -> &[Self::Id];
fn values(&self) -> &[Self::Id];
fn primary(&self) -> &Self::Index;
fn auxiliary(&self) -> &[(String, Self::Index)];
fn validate_key(&self, key: Vec<Self::Value>) -> Result<Vec<Self::Value>, Self::Error>;
fn validate_values(&self, values: Vec<Self::Value>) -> Result<Vec<Self::Value>, Self::Error>;
}
#[derive(Copy, Clone, Eq, PartialEq)]
pub enum ColumnRange<V> {
Eq(V),
In((Bound<V>, Bound<V>)),
}
impl<V> ColumnRange<V> {
pub fn is_range(&self) -> bool {
match self {
Self::Eq(_) => false,
Self::In(_) => true,
}
}
}
impl<C> OverlapsRange<Self, C> for ColumnRange<C::Value>
where
C: Collate,
C::Value: fmt::Debug,
std::ops::Range<C::Value>: OverlapsRange<std::ops::Range<C::Value>, C>,
{
fn overlaps(&self, other: &Self, collator: &C) -> Overlap {
match (self, other) {
(Self::Eq(this), Self::Eq(that)) => match collator.cmp(this, that) {
Ordering::Less => Overlap::Less,
Ordering::Equal => Overlap::Equal,
Ordering::Greater => Overlap::Greater,
},
(Self::In(this), Self::Eq(that)) => this.overlaps_value(that, collator),
(Self::Eq(this), Self::In(that)) => match that.overlaps_value(this, collator) {
Overlap::Equal => Overlap::Equal,
Overlap::Less => Overlap::Greater,
Overlap::WideLess => Overlap::WideGreater,
Overlap::Wide => Overlap::Narrow,
Overlap::WideGreater => Overlap::WideLess,
Overlap::Greater => Overlap::Less,
Overlap::Narrow => unreachable!("{:?} is narrower than {:?}", that, this),
},
(Self::In(this), Self::In(that)) => this.overlaps(that, collator),
}
}
}
impl<V> From<V> for ColumnRange<V> {
fn from(value: V) -> Self {
Self::Eq(value)
}
}
impl<V> From<std::ops::Range<V>> for ColumnRange<V> {
fn from(range: std::ops::Range<V>) -> Self {
Self::In((Bound::Included(range.start), Bound::Excluded(range.end)))
}
}
impl<V: fmt::Debug> fmt::Debug for ColumnRange<V> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Self::Eq(value) => write!(f, "{:?}", value),
Self::In((start, end)) => {
match start {
Bound::Unbounded => f.write_str("[."),
Bound::Included(start) => write!(f, "[{start:?}."),
Bound::Excluded(start) => write!(f, "({start:?}."),
}?;
match end {
Bound::Unbounded => f.write_str(".]"),
Bound::Included(end) => write!(f, ".{end:?}]"),
Bound::Excluded(end) => write!(f, ".{end:?})"),
}
}
}
}
}
#[derive(Clone)]
pub struct Range<K, V> {
columns: HashMap<K, ColumnRange<V>>,
}
impl<K, V> Default for Range<K, V> {
fn default() -> Self {
Self {
columns: HashMap::with_capacity(0),
}
}
}
impl<K, V> Range<K, V> {
pub fn inner(&self) -> &HashMap<K, ColumnRange<V>> {
&self.columns
}
pub fn into_inner(self) -> HashMap<K, ColumnRange<V>> {
self.columns
}
pub fn is_default(&self) -> bool {
self.columns.is_empty()
}
pub fn len(&self) -> usize {
self.columns.len()
}
}
impl<K: Eq + Hash, V> Range<K, V> {
pub fn get(&self, column: &K) -> Option<&ColumnRange<V>> {
self.columns.get(column)
}
}
impl<C, K> OverlapsRange<Self, C> for Range<K, C::Value>
where
K: Eq + Hash,
C: Collate,
C::Value: fmt::Debug,
{
fn overlaps(&self, other: &Self, collator: &C) -> Overlap {
let mut overlap: Option<Overlap> = None;
for name in other.columns.keys() {
if !self.columns.contains_key(name) {
return Overlap::Wide;
}
}
for (name, this) in &self.columns {
let column_overlap = other
.columns
.get(name)
.map(|that| this.overlaps(that, collator))
.unwrap_or(Overlap::Narrow);
if let Some(overlap) = overlap.as_mut() {
*overlap = overlap.then(column_overlap);
} else {
overlap = Some(column_overlap);
}
}
overlap.unwrap_or(Overlap::Equal)
}
}
impl<K, V> From<HashMap<K, ColumnRange<V>>> for Range<K, V> {
fn from(columns: HashMap<K, ColumnRange<V>>) -> Self {
Self { columns }
}
}
impl<K: Hash + Eq, V> FromIterator<(K, V)> for Range<K, V> {
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
Self {
columns: iter
.into_iter()
.map(|(name, bound)| (name, ColumnRange::Eq(bound)))
.collect(),
}
}
}
impl<K: Hash + Eq, V> FromIterator<(K, (Bound<V>, Bound<V>))> for Range<K, V> {
fn from_iter<I: IntoIterator<Item = (K, (Bound<V>, Bound<V>))>>(iter: I) -> Self {
Self {
columns: iter
.into_iter()
.map(|(name, bounds)| (name, ColumnRange::In(bounds)))
.collect(),
}
}
}
impl<K: Hash + Eq, V> FromIterator<(K, ColumnRange<V>)> for Range<K, V> {
fn from_iter<I: IntoIterator<Item = (K, ColumnRange<V>)>>(iter: I) -> Self {
Self {
columns: iter.into_iter().collect(),
}
}
}
impl<K, V> fmt::Debug for Range<K, V>
where
K: fmt::Display,
ColumnRange<V>: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("{")?;
for (i, (column, bound)) in self.columns.iter().enumerate() {
write!(f, "{column}: {bound:?}")?;
if i < self.len() - 1 {
f.write_str(", ")?;
}
}
f.write_str("}")
}
}
#[derive(Eq, PartialEq)]
pub(crate) struct TableSchema<S> {
inner: S,
}
impl<S> TableSchema<S> {
pub fn inner(&self) -> &S {
&self.inner
}
}
impl<S: Schema> TableSchema<S> {
#[inline]
pub fn get_index<'a>(&'a self, index_id: IndexId<'a>) -> Option<&'a S::Index> {
match index_id {
IndexId::Primary => Some(self.primary()),
IndexId::Auxiliary(index_id) => self
.auxiliary()
.iter()
.filter_map(|(name, index)| if name == index_id { Some(index) } else { None })
.next(),
}
}
pub fn index_ids(&self) -> impl Iterator<Item = IndexId> {
let aux = self.inner.auxiliary().iter().map(|(name, _)| name.into());
iter::once(IndexId::Primary).chain(aux)
}
pub fn plan_query<'a>(
&'a self,
range: &HashMap<S::Id, ColumnRange<S::Value>>,
order: &'a [S::Id],
select: &'a [S::Id],
) -> Result<QueryPlan<'a, S::Id>, io::Error> {
QueryPlan::new(self, &range, order, select).ok_or_else(|| {
io::Error::new(
io::ErrorKind::Unsupported,
format!(
"{:?} has no index to support range {range:?} and order {order:?}",
self.inner
),
)
})
}
}
impl<S> Deref for TableSchema<S> {
type Target = S;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<S: Schema> From<S> for TableSchema<S> {
fn from(inner: S) -> Self {
Self { inner }
}
}
#[inline]
pub(crate) fn range_is_supported<K, V>(range: &HashMap<K, ColumnRange<V>>, columns: &[K]) -> bool
where
K: Eq + PartialEq + Hash,
{
let mut i = 0;
while i < columns.len() {
match range.get(&columns[i]) {
None => break,
Some(ColumnRange::Eq(_)) => i += 1,
Some(ColumnRange::In(_)) => {
i += 1;
break;
}
}
}
i == range.len()
}