1use std::cmp::Ordering;
2use std::collections::HashMap;
3use std::hash::Hash;
4use std::ops::{Bound, Deref};
5use std::{fmt, io, iter};
6
7use b_tree::collate::*;
8
9use crate::plan::QueryPlan;
10pub use b_tree::Schema as BTreeSchema;
11
12#[derive(Copy, Clone, Eq, PartialEq, Hash)]
14pub enum IndexId<'a> {
15 Primary,
16 Auxiliary(&'a str),
17}
18
19impl<'a> Default for IndexId<'a> {
20 fn default() -> Self {
21 Self::Primary
22 }
23}
24
25impl<'a> From<&'a str> for IndexId<'a> {
26 fn from(id: &'a str) -> Self {
27 Self::Auxiliary(id)
28 }
29}
30
31impl<'a> From<&'a String> for IndexId<'a> {
32 fn from(id: &'a String) -> Self {
33 Self::Auxiliary(id)
34 }
35}
36
37impl<'a> fmt::Debug for IndexId<'a> {
38 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
39 match self {
40 Self::Primary => f.write_str("primary"),
41 Self::Auxiliary(id) => id.fmt(f),
42 }
43 }
44}
45
46pub trait IndexSchema: BTreeSchema + Clone + Send + Sync + 'static {
48 type Id: Hash + Eq + Clone + fmt::Debug + fmt::Display + 'static;
49
50 fn columns(&self) -> &[Self::Id];
52
53 fn supports(&self, range: &Range<Self::Id, Self::Value>) -> bool {
55 range_is_supported(&range.columns, self.columns())
56 }
57}
58
59pub trait Schema: Eq + Send + Sync + Sized + fmt::Debug {
61 type Id: Hash + Eq + Clone + Send + Sync + fmt::Debug + fmt::Display + 'static;
63
64 type Error: std::error::Error + From<io::Error>;
66
67 type Value: Clone + Eq + Send + Sync + fmt::Debug + 'static;
69
70 type Index: IndexSchema<Error = Self::Error, Id = Self::Id, Value = Self::Value>
72 + Send
73 + Sync
74 + 'static;
75
76 fn key(&self) -> &[Self::Id];
78
79 fn values(&self) -> &[Self::Id];
81
82 fn primary(&self) -> &Self::Index;
84
85 fn auxiliary(&self) -> &[(String, Self::Index)];
88
89 fn validate_key(&self, key: Vec<Self::Value>) -> Result<Vec<Self::Value>, Self::Error>;
91
92 fn validate_values(&self, values: Vec<Self::Value>) -> Result<Vec<Self::Value>, Self::Error>;
94}
95
96#[derive(Copy, Clone, Eq, PartialEq)]
98pub enum ColumnRange<V> {
99 Eq(V),
100 In((Bound<V>, Bound<V>)),
101}
102
103impl<V> ColumnRange<V> {
104 pub fn is_range(&self) -> bool {
106 match self {
107 Self::Eq(_) => false,
108 Self::In(_) => true,
109 }
110 }
111}
112
113impl<C> OverlapsRange<Self, C> for ColumnRange<C::Value>
114where
115 C: Collate,
116 C::Value: fmt::Debug,
117 std::ops::Range<C::Value>: OverlapsRange<std::ops::Range<C::Value>, C>,
118{
119 fn overlaps(&self, other: &Self, collator: &C) -> Overlap {
120 match (self, other) {
121 (Self::Eq(this), Self::Eq(that)) => match collator.cmp(this, that) {
122 Ordering::Less => Overlap::Less,
123 Ordering::Equal => Overlap::Equal,
124 Ordering::Greater => Overlap::Greater,
125 },
126 (Self::In(this), Self::Eq(that)) => this.overlaps_value(that, collator),
127 (Self::Eq(this), Self::In(that)) => match that.overlaps_value(this, collator) {
128 Overlap::Equal => Overlap::Equal,
129
130 Overlap::Less => Overlap::Greater,
131 Overlap::WideLess => Overlap::WideGreater,
132 Overlap::Wide => Overlap::Narrow,
133 Overlap::WideGreater => Overlap::WideLess,
134 Overlap::Greater => Overlap::Less,
135
136 Overlap::Narrow => unreachable!("{:?} is narrower than {:?}", that, this),
137 },
138 (Self::In(this), Self::In(that)) => this.overlaps(that, collator),
139 }
140 }
141}
142
143impl<V> From<V> for ColumnRange<V> {
144 fn from(value: V) -> Self {
145 Self::Eq(value)
146 }
147}
148
149impl<V> From<std::ops::Range<V>> for ColumnRange<V> {
150 fn from(range: std::ops::Range<V>) -> Self {
151 Self::In((Bound::Included(range.start), Bound::Excluded(range.end)))
152 }
153}
154
155impl<V: fmt::Debug> fmt::Debug for ColumnRange<V> {
156 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
157 match self {
158 Self::Eq(value) => write!(f, "{:?}", value),
159 Self::In((start, end)) => {
160 match start {
161 Bound::Unbounded => f.write_str("[."),
162 Bound::Included(start) => write!(f, "[{start:?}."),
163 Bound::Excluded(start) => write!(f, "({start:?}."),
164 }?;
165
166 match end {
167 Bound::Unbounded => f.write_str(".]"),
168 Bound::Included(end) => write!(f, ".{end:?}]"),
169 Bound::Excluded(end) => write!(f, ".{end:?})"),
170 }
171 }
172 }
173 }
174}
175
176#[derive(Clone)]
178pub struct Range<K, V> {
179 columns: HashMap<K, ColumnRange<V>>,
180}
181
182impl<K, V> Default for Range<K, V> {
183 fn default() -> Self {
184 Self {
185 columns: HashMap::with_capacity(0),
186 }
187 }
188}
189
190impl<K, V> Range<K, V> {
191 pub fn inner(&self) -> &HashMap<K, ColumnRange<V>> {
193 &self.columns
194 }
195
196 pub fn into_inner(self) -> HashMap<K, ColumnRange<V>> {
198 self.columns
199 }
200
201 pub fn is_default(&self) -> bool {
203 self.columns.is_empty()
204 }
205
206 pub fn len(&self) -> usize {
208 self.columns.len()
209 }
210}
211
212impl<K: Eq + Hash, V> Range<K, V> {
213 pub fn get(&self, column: &K) -> Option<&ColumnRange<V>> {
215 self.columns.get(column)
216 }
217}
218
219impl<C, K> OverlapsRange<Self, C> for Range<K, C::Value>
220where
221 K: Eq + Hash,
222 C: Collate,
223 C::Value: fmt::Debug,
224{
225 fn overlaps(&self, other: &Self, collator: &C) -> Overlap {
226 let mut overlap: Option<Overlap> = None;
227
228 for name in other.columns.keys() {
230 if !self.columns.contains_key(name) {
231 return Overlap::Wide;
232 }
233 }
234
235 for (name, this) in &self.columns {
236 let column_overlap = other
237 .columns
238 .get(name)
239 .map(|that| this.overlaps(that, collator))
240 .unwrap_or(Overlap::Narrow);
242
243 if let Some(overlap) = overlap.as_mut() {
244 *overlap = overlap.then(column_overlap);
245 } else {
246 overlap = Some(column_overlap);
247 }
248 }
249
250 overlap.unwrap_or(Overlap::Equal)
252 }
253}
254
255impl<K, V> From<HashMap<K, ColumnRange<V>>> for Range<K, V> {
256 fn from(columns: HashMap<K, ColumnRange<V>>) -> Self {
257 Self { columns }
258 }
259}
260
261impl<K: Hash + Eq, V> FromIterator<(K, V)> for Range<K, V> {
262 fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
263 Self {
264 columns: iter
265 .into_iter()
266 .map(|(name, bound)| (name, ColumnRange::Eq(bound)))
267 .collect(),
268 }
269 }
270}
271
272impl<K: Hash + Eq, V> FromIterator<(K, (Bound<V>, Bound<V>))> for Range<K, V> {
273 fn from_iter<I: IntoIterator<Item = (K, (Bound<V>, Bound<V>))>>(iter: I) -> Self {
274 Self {
275 columns: iter
276 .into_iter()
277 .map(|(name, bounds)| (name, ColumnRange::In(bounds)))
278 .collect(),
279 }
280 }
281}
282
283impl<K: Hash + Eq, V> FromIterator<(K, ColumnRange<V>)> for Range<K, V> {
284 fn from_iter<I: IntoIterator<Item = (K, ColumnRange<V>)>>(iter: I) -> Self {
285 Self {
286 columns: iter.into_iter().collect(),
287 }
288 }
289}
290
291impl<K, V> fmt::Debug for Range<K, V>
292where
293 K: fmt::Display,
294 ColumnRange<V>: fmt::Debug,
295{
296 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
297 f.write_str("{")?;
298
299 for (i, (column, bound)) in self.columns.iter().enumerate() {
300 write!(f, "{column}: {bound:?}")?;
301
302 if i < self.len() - 1 {
303 f.write_str(", ")?;
304 }
305 }
306
307 f.write_str("}")
308 }
309}
310
311#[derive(Eq, PartialEq)]
312pub(crate) struct TableSchema<S> {
313 inner: S,
314}
315
316impl<S> TableSchema<S> {
317 pub fn inner(&self) -> &S {
318 &self.inner
319 }
320}
321
322impl<S: Schema> TableSchema<S> {
323 #[inline]
324 pub fn get_index<'a>(&'a self, index_id: IndexId<'a>) -> Option<&'a S::Index> {
325 match index_id {
326 IndexId::Primary => Some(self.primary()),
327 IndexId::Auxiliary(index_id) => self
328 .auxiliary()
329 .iter()
330 .filter_map(|(name, index)| if name == index_id { Some(index) } else { None })
331 .next(),
332 }
333 }
334
335 pub fn index_ids(&self) -> impl Iterator<Item = IndexId> {
336 let aux = self.inner.auxiliary().iter().map(|(name, _)| name.into());
337
338 iter::once(IndexId::Primary).chain(aux)
339 }
340
341 pub fn plan_query<'a>(
342 &'a self,
343 range: &HashMap<S::Id, ColumnRange<S::Value>>,
344 order: &'a [S::Id],
345 select: &'a [S::Id],
346 ) -> Result<QueryPlan<'a, S::Id>, io::Error> {
347 QueryPlan::new(self, &range, order, select).ok_or_else(|| {
348 io::Error::new(
349 io::ErrorKind::Unsupported,
350 format!(
351 "{:?} has no index to support range {range:?} and order {order:?}",
352 self.inner
353 ),
354 )
355 })
356 }
357}
358
359impl<S> Deref for TableSchema<S> {
360 type Target = S;
361
362 fn deref(&self) -> &Self::Target {
363 &self.inner
364 }
365}
366
367impl<S: Schema> From<S> for TableSchema<S> {
368 fn from(inner: S) -> Self {
369 Self { inner }
370 }
371}
372
373#[inline]
374pub(crate) fn range_is_supported<K, V>(range: &HashMap<K, ColumnRange<V>>, columns: &[K]) -> bool
375where
376 K: Eq + PartialEq + Hash,
377{
378 let mut i = 0;
379
380 while i < columns.len() {
381 match range.get(&columns[i]) {
382 None => break,
383 Some(ColumnRange::Eq(_)) => i += 1,
384 Some(ColumnRange::In(_)) => {
385 i += 1;
386 break;
387 }
388 }
389 }
390
391 i == range.len()
392}