b_table/
table.rs

1use std::collections::{BTreeMap, HashMap};
2use std::hash::Hash;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::{fmt, io, mem};
6
7use b_tree::collate::Collate;
8use b_tree::{BTree, BTreeLock, Key};
9use freqfs::{DirDeref, DirLock, DirReadGuardOwned, DirWriteGuardOwned, FileLoad};
10use futures::future::{try_join_all, TryFutureExt};
11use futures::stream::{Stream, TryStreamExt};
12use safecast::AsType;
13use smallvec::{smallvec, SmallVec};
14
15use super::plan::{IndexQuery, QueryPlan};
16use super::schema::*;
17use super::{IndexStack, Node};
18
// the name of the subdirectory holding a table's primary index (see `create`/`load`)
const PRIMARY: &str = "primary";

/// The maximum number of values in a stack-allocated [`Row`]
pub const ROW_STACK_SIZE: usize = 32;

/// A read guard acquired on a [`TableLock`]
pub type TableReadGuard<S, IS, C, FE> = Table<S, IS, C, Arc<DirReadGuardOwned<FE>>>;

/// A write guard acquired on a [`TableLock`]
pub type TableWriteGuard<S, IS, C, FE> = Table<S, IS, C, DirWriteGuardOwned<FE>>;

/// The type of row returned in a [`Stream`] of [`Rows`]
// stack-allocated up to ROW_STACK_SIZE values, spilling to the heap past that
pub type Row<V> = SmallVec<[V; ROW_STACK_SIZE]>;

/// A stream of table rows
pub type Rows<V> = Pin<Box<dyn Stream<Item = Result<Row<V>, io::Error>> + Send>>;
35
/// A futures-aware read-write lock on a [`Table`]
pub struct TableLock<S, IS, C, FE> {
    // the schema of the table as a whole
    schema: Arc<TableSchema<S>>,
    // the filesystem directory containing one subdirectory per index
    dir: DirLock<FE>,
    // the primary index
    primary: BTreeLock<IS, C, FE>,
    // use a BTreeMap to make sure index locks are always acquired in-order
    auxiliary: BTreeMap<Arc<str>, BTreeLock<IS, C, FE>>,
}
44
45impl<S, IS, C, FE> Clone for TableLock<S, IS, C, FE>
46where
47    C: Clone,
48{
49    fn clone(&self) -> Self {
50        Self {
51            schema: self.schema.clone(),
52            dir: self.dir.clone(),
53            primary: self.primary.clone(),
54            auxiliary: self.auxiliary.clone(),
55        }
56    }
57}
58
impl<S, IS, C, FE> TableLock<S, IS, C, FE> {
    /// Borrow the [`Schema`] of this [`Table`].
    pub fn schema(&self) -> &S {
        self.schema.inner()
    }

    /// Borrow the collator for this [`Table`].
    pub fn collator(&self) -> &b_tree::Collator<C> {
        // all indices share the same collator, so the primary index's is canonical
        self.primary.collator()
    }
}
70
impl<S, C, FE> TableLock<S, S::Index, C, FE>
where
    S: Schema,
    C: Clone,
    FE: AsType<Node<S::Value>> + Send + Sync,
    Node<S::Value>: FileLoad,
{
    /// Create a new [`Table`]
    ///
    /// Validates the `schema`, then creates one subdirectory of `dir` for the
    /// primary index and one for each auxiliary index named in the schema.
    ///
    /// Returns an error if the schema is invalid, if `dir` cannot be
    /// write-locked synchronously, or if any index directory cannot be created
    /// (e.g. because it already exists — TODO confirm freqfs `create_dir` semantics).
    pub fn create(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
        valid_schema(&schema)?;

        let mut dir_contents = dir.try_write()?;

        let primary = {
            let dir = dir_contents.create_dir(PRIMARY.to_string())?;
            BTreeLock::create(schema.primary().clone(), collator.clone(), dir)
        }?;

        let mut auxiliary = BTreeMap::new();
        for (name, schema) in schema.auxiliary() {
            let index = {
                let dir = dir_contents.create_dir(name.to_string())?;
                BTreeLock::create(schema.clone(), collator.clone(), dir)
            }?;

            auxiliary.insert(name.clone().into(), index);
        }

        // release the write lock on the directory before moving `dir` into the table
        std::mem::drop(dir_contents);

        Ok(Self {
            schema: Arc::new(schema.into()),
            primary,
            auxiliary,
            dir,
        })
    }

    /// Load an existing [`Table`] with the given `schema` from the given `dir`
    ///
    /// Missing index directories are created on demand (`get_or_create_dir`),
    /// so this also tolerates a partially-initialized table directory.
    pub fn load(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
        valid_schema(&schema)?;

        let mut dir_contents = dir.try_write()?;

        let primary = {
            let dir = dir_contents.get_or_create_dir(PRIMARY.to_string())?;
            BTreeLock::load(schema.primary().clone(), collator.clone(), dir.clone())
        }?;

        let mut auxiliary = BTreeMap::new();
        for (name, schema) in schema.auxiliary() {
            let index = {
                let dir = dir_contents.get_or_create_dir(name.clone())?;
                BTreeLock::load(schema.clone(), collator.clone(), dir.clone())
            }?;

            auxiliary.insert(name.clone().into(), index);
        }

        // release the write lock on the directory before moving `dir` into the table
        std::mem::drop(dir_contents);

        Ok(Self {
            schema: Arc::new(schema.into()),
            primary,
            auxiliary,
            dir,
        })
    }

    /// Sync the contents of this [`Table`]'s directory to the filesystem.
    pub async fn sync(&self) -> Result<(), io::Error>
    where
        FE: for<'a> freqfs::FileSave<'a>,
    {
        self.dir.sync().await
    }
}
147
148impl<S, C, FE> TableLock<S, S::Index, C, FE>
149where
150    S: Schema,
151    C: Clone,
152    FE: Send + Sync,
153    Node<S::Value>: FileLoad,
154{
155    /// Lock this [`Table`] for reading.
156    pub async fn read(&self) -> TableReadGuard<S, S::Index, C, FE> {
157        #[cfg(feature = "logging")]
158        log::debug!("locking table for reading...");
159
160        let schema = self.schema.clone();
161
162        // lock the primary key first, separately from the indices, to avoid a deadlock
163        let primary = self.primary.read().await;
164
165        #[cfg(feature = "logging")]
166        log::trace!("locked primary index for reading");
167
168        // then lock each index in-order
169        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
170        for (name, index) in &self.auxiliary {
171            let index = index.read().await;
172            auxiliary.insert(name.clone(), index);
173
174            #[cfg(feature = "logging")]
175            log::trace!("locked index {name} for reading");
176        }
177
178        Table {
179            schema,
180            state: TableState { auxiliary, primary },
181        }
182    }
183
184    /// Lock this [`Table`] for reading, without borrowing.
185    pub async fn into_read(self) -> TableReadGuard<S, S::Index, C, FE> {
186        #[cfg(feature = "logging")]
187        log::debug!("locking table for reading...");
188
189        let schema = self.schema.clone();
190
191        // lock the primary key first, separately from the indices, to avoid a deadlock
192        let primary = self.primary.into_read().await;
193
194        #[cfg(feature = "logging")]
195        log::trace!("locked primary index for reading");
196
197        // then lock each index in-order
198        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
199        for (name, index) in self.auxiliary {
200            let index = index.into_read().await;
201
202            #[cfg(feature = "logging")]
203            log::trace!("locked index {name} for reading");
204
205            auxiliary.insert(name, index);
206        }
207
208        Table {
209            schema,
210            state: TableState { auxiliary, primary },
211        }
212    }
213
214    /// Lock this [`Table`] for reading synchronously, if possible.
215    pub fn try_read(&self) -> Result<TableReadGuard<S, S::Index, C, FE>, io::Error> {
216        #[cfg(feature = "logging")]
217        log::debug!("locking table for reading...");
218
219        let schema = self.schema.clone();
220
221        // lock the primary key first, separately from the indices, to avoid a deadlock
222        let primary = self.primary.try_read()?;
223
224        #[cfg(feature = "logging")]
225        log::trace!("locked primary index for reading");
226
227        // then lock each index in-order
228        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
229        for (name, index) in self.auxiliary.iter() {
230            let index = index.try_read()?;
231            auxiliary.insert(name.clone(), index);
232
233            #[cfg(feature = "logging")]
234            log::trace!("locked index {name} for reading");
235        }
236
237        Ok(Table {
238            schema,
239            state: TableState { auxiliary, primary },
240        })
241    }
242
243    /// Lock this [`Table`] for writing.
244    pub async fn write(&self) -> TableWriteGuard<S, S::Index, C, FE> {
245        #[cfg(feature = "logging")]
246        log::debug!("locking table for writing...");
247
248        let schema = self.schema.clone();
249
250        // lock the primary key first, separately from the indices, to avoid a deadlock
251        let primary = self.primary.write().await;
252
253        #[cfg(feature = "logging")]
254        log::trace!("locked primary index for writing");
255
256        // then lock each index in-order
257        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
258        for (name, index) in self.auxiliary.iter() {
259            let index = index.write().await;
260            auxiliary.insert(name.clone(), index);
261
262            #[cfg(feature = "logging")]
263            log::trace!("locked index {name} for writing");
264        }
265
266        Table {
267            schema,
268            state: TableState { auxiliary, primary },
269        }
270    }
271
272    /// Lock this [`Table`] for writing, without borrowing.
273    pub async fn into_write(self) -> TableWriteGuard<S, S::Index, C, FE> {
274        #[cfg(feature = "logging")]
275        log::debug!("locking table for reading...");
276
277        let schema = self.schema.clone();
278
279        // lock the primary key first, separately from the indices, to avoid a deadlock
280        let primary = self.primary.into_write().await;
281
282        #[cfg(feature = "logging")]
283        log::trace!("locked primary index for writing");
284
285        // then lock each index in-order
286        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
287        for (name, index) in self.auxiliary.into_iter() {
288            let index = index.into_write().await;
289
290            #[cfg(feature = "logging")]
291            log::trace!("locked index {name} for writing");
292
293            auxiliary.insert(name, index);
294        }
295
296        Table {
297            schema,
298            state: TableState { auxiliary, primary },
299        }
300    }
301
302    /// Lock this [`Table`] for writing synchronously, if possible.
303    pub fn try_write(&self) -> Result<TableWriteGuard<S, S::Index, C, FE>, io::Error> {
304        #[cfg(feature = "logging")]
305        log::debug!("locking table for writing...");
306
307        let schema = self.schema.clone();
308
309        // lock the primary key first, separately from the indices, to avoid a deadlock
310        let primary = self.primary.try_write()?;
311
312        #[cfg(feature = "logging")]
313        log::trace!("locked primary index for writing");
314
315        // then lock each index in-order
316        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
317        for (name, index) in self.auxiliary.iter() {
318            let index = index.try_write()?;
319            auxiliary.insert(name.clone(), index);
320
321            #[cfg(feature = "logging")]
322            log::trace!("locked index {name} for writing");
323        }
324
325        Ok(Table {
326            schema,
327            state: TableState { auxiliary, primary },
328        })
329    }
330}
331
// The set of per-index [`BTree`] guards backing a single [`Table`] guard.
struct TableState<IS, C, G> {
    // IMPORTANT! the auxiliary field must go before primary so that it will be dropped first
    auxiliary: HashMap<Arc<str>, BTree<IS, C, G>>,
    primary: BTree<IS, C, G>,
}
337
338impl<IS, C: Clone, G: Clone> Clone for TableState<IS, C, G> {
339    fn clone(&self) -> Self {
340        Self {
341            primary: self.primary.clone(),
342            auxiliary: self.auxiliary.clone(),
343        }
344    }
345}
346
impl<IS, C, G> TableState<IS, C, G> {
    /// Look up an index by its [`IndexId`].
    ///
    /// Returns `None` only for an auxiliary index name not present in this table.
    #[inline]
    fn get_index<'a, Id>(&'a self, index_id: Id) -> Option<&'a BTree<IS, C, G>>
    where
        IndexId<'a>: From<Id>,
    {
        match index_id.into() {
            IndexId::Primary => Some(&self.primary),
            IndexId::Auxiliary(index_id) => self.auxiliary.get(index_id),
        }
    }
}
impl<IS, C, FE, G> TableState<IS, C, G>
where
    IS: IndexSchema,
    C: Collate<Value = IS::Value> + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + 'static,
    Node<IS::Value>: FileLoad,
    Range<IS::Id, IS::Value>: fmt::Debug,
{
    /// Return `true` if the primary index contains a key beginning with `prefix`.
    async fn contains(&self, prefix: &[IS::Value]) -> Result<bool, io::Error> {
        self.primary.contains(prefix).await
    }

    /// Look up the first (and, for a full primary key, only) row in the primary
    /// index whose key begins with `key`.
    async fn get_row(&self, key: &[IS::Value]) -> Result<Option<Row<IS::Value>>, io::Error> {
        self.primary.first(b_tree::Range::from_prefix(key)).await
    }

    /// Return the `select`-ed columns of the first row in `range`, following
    /// the index sequence in `plan`, or `None` if the range is empty.
    ///
    /// `key_columns` names the primary key columns, used to look up the full
    /// row when the final index does not cover every `select`-ed column.
    async fn first<'a>(
        &self,
        plan: &QueryPlan<'a, IS::Id>,
        range: &HashMap<IS::Id, ColumnRange<IS::Value>>,
        select: &[IS::Id],
        key_columns: &[IS::Id],
    ) -> Result<Option<Row<IS::Value>>, io::Error> {
        let mut plan = plan.indices.iter();

        // seed (first, columns) from the first index in the plan, if any
        let (mut first, mut columns) = if let Some((index_id, _query)) = plan.next() {
            let index = self.get_index(*index_id).expect("index");
            let columns = index.schema().columns();
            let index_range = index_range_borrow(columns, range);

            if let Some(first) = index.first(index_range).await? {
                (first, columns)
            } else {
                return Ok(None);
            }
        } else {
            // an empty plan means the range applies directly to the primary index
            let index_range = index_range_borrow(self.primary.schema().columns(), range);

            return self
                .primary
                .first(index_range)
                .map_ok(|first| {
                    first.map(|first| {
                        extract_columns(first, self.primary.schema().columns(), select)
                    })
                })
                .await;
        };

        // refine the first-key estimate through each remaining index in the plan
        for (index_id, _query) in plan {
            let index = self.get_index(*index_id).expect("index");

            columns = index.schema().columns();

            let index_range = index_range_borrow(&columns, range);

            first = if let Some(key) = index.first(index_range).await? {
                key
            } else {
                return Ok(None);
            }
        }

        // if the last index doesn't cover all selected columns,
        // fetch the full row from the primary index by its key
        if !select.iter().all(|col_name| columns.contains(col_name)) {
            let pk = extract_columns(first, columns, key_columns);

            first = self
                .get_row(&pk)
                .map_ok(|maybe_row| maybe_row.expect("row"))
                .await?;

            columns = self.primary.schema().columns();
        }

        Ok(Some(extract_columns(first, columns, select)))
    }
}
437
impl<IS, C, FE, G> TableState<IS, C, G>
where
    IS: IndexSchema,
    C: Collate<Value = IS::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<IS::Value>: FileLoad,
    Range<IS::Id, IS::Value>: fmt::Debug,
{
    /// Count the rows within `range` by streaming and tallying their keys.
    async fn count<'a>(
        &'a self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &'a [IS::Id],
    ) -> Result<u64, io::Error> {
        // TODO: optimize
        let mut rows = self
            .rows(plan, range, false, key_columns, key_columns)
            .await?;

        let mut count = 0;
        while let Some(_row) = rows.try_next().await? {
            count += 1;
        }

        Ok(count)
    }

    /// Return `true` if no row lies within `range`.
    async fn is_empty<'a>(
        &'a self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &'a [IS::Id],
    ) -> Result<bool, io::Error> {
        self.first(&plan, &range, key_columns, key_columns)
            .map_ok(|maybe_row| maybe_row.is_none())
            .await
    }

    /// Construct a [`Rows`] stream of the `select`-ed columns of every row in
    /// `range`, following the index sequence in `plan`, optionally in `reverse`
    /// order.
    ///
    /// Each plan step produces a stream of unique key prefixes; the next step
    /// expands each prefix via its own index. After the final step, rows are
    /// completed from the primary index if the selected columns aren't covered.
    // note: it would be clearer to implement this recursively but it would require removing the lifetime parameter
    async fn rows<'a>(
        &'a self,
        mut plan: QueryPlan<'a, IS::Id>,
        mut range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        reverse: bool,
        select: &'a [IS::Id],
        key_columns: &'a [IS::Id],
    ) -> Result<Rows<IS::Value>, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("construct row stream with plan {plan:?}");

        // the running stream of key prefixes, plus the columns each prefix contains
        let mut keys: Option<(b_tree::Keys<IS::Value>, &'a [IS::Id])> = None;

        // the final plan step is handled separately below (it yields full keys, not groups)
        let last_query = plan.indices.pop();

        if let Some((index_id, query)) = plan.indices.first() {
            assert_eq!(query.prefix_len(), 0);
            let index = self.get_index(*index_id).expect("index");

            let columns = &index.schema().columns()[..query.selected()];
            assert!(query.range().iter().zip(columns).all(|(r, c)| *r == c));

            let index_range = index_range_for(&columns[..query.range().len()], &mut range);
            let index_prefixes = index
                .clone()
                .groups(index_range, columns.len(), reverse)
                .await?;

            keys = Some((index_prefixes, columns));
        }

        // for each index before the last
        for (index_id, query) in plan.indices.into_iter().skip(1) {
            // merge all unique prefixes beginning with each prefix

            let index = self.get_index(index_id).expect("index");

            let (prefixes, columns_in) = keys.take().expect("prefixes");

            let columns_out = &index.schema().columns()[..query.selected()];

            assert_eq!(query.prefix_len(), columns_in.len());
            assert!(columns_out.len() > columns_in.len());
            assert!(query.range().iter().zip(columns_out).all(|(r, c)| *r == c));

            debug_assert!(columns_out
                .iter()
                .take(columns_in.len())
                .all(|c| columns_in.contains(c)));

            let extract_prefix = prefix_extractor(columns_in, &columns_out[..columns_in.len()]);

            let inner_range = inner_range_for(&query, &mut range);

            let n = columns_out.len();
            let index = index.clone();

            // expand each incoming prefix into the unique n-column groups of this index,
            // fetching up to num_cpus groups streams concurrently
            let index_prefixes = prefixes
                .map_ok(extract_prefix)
                .map_ok(move |prefix| inner_range.clone().prepend(prefix))
                .map_ok(move |index_range| {
                    let index = index.clone();
                    async move { index.groups(index_range, n, reverse).await }
                })
                .try_buffered(num_cpus::get())
                .try_flatten();

            keys = Some((Box::pin(index_prefixes), columns_out))
        }

        if let Some((index_id, query)) = last_query {
            if let Some((prefixes, columns_in)) = keys.take() {
                // merge streams of all keys in the last index beginning with each prefix

                assert_eq!(query.prefix_len(), columns_in.len());

                let index = self.get_index(index_id).expect("index");

                let columns_out = &index.schema().columns();

                debug_assert!(
                    columns_out.len() > columns_in.len(),
                    "cannot select {columns_out:?} with prefix {columns_in:?}"
                );

                debug_assert!(columns_out
                    .iter()
                    .take(columns_in.len())
                    .all(|c| columns_in.contains(c)));

                let extract_prefix = prefix_extractor(columns_in, &columns_out[..columns_in.len()]);

                let inner_range = inner_range_for(&query, &mut range);

                let index = index.clone();

                let index_keys = prefixes
                    .map_ok(extract_prefix)
                    .map_ok(move |prefix| inner_range.clone().prepend(prefix))
                    .map_ok(move |index_range| {
                        let index = index.clone();
                        async move { index.keys(index_range, reverse).await }
                    })
                    .try_buffered(num_cpus::get())
                    .try_flatten();

                keys = Some((Box::pin(index_keys), columns_out))
            } else {
                // a one-step plan: stream keys straight out of the one index
                let index = self.get_index(index_id).expect("index");
                let columns = index.schema().columns();

                let index_range = index_range_for(columns, &mut range);
                // the plan must have consumed the entire range
                assert!(range.is_empty());

                let index_keys = index.clone().keys(index_range, reverse).await?;
                keys = Some((Box::pin(index_keys), columns));
            }
        }

        let (keys, columns) = if let Some((keys, columns)) = keys {
            if select.iter().all(|c| columns.contains(c)) {
                // if all columns to select are already present, return the stream
                (keys, columns)
            } else {
                // otherwise, construct a stream of rows by extracting & selecting each primary key

                let index = self.primary.clone();
                let extract_prefix = prefix_extractor(columns, key_columns);

                let rows = keys
                    .map_ok(extract_prefix)
                    .map_ok(move |primary_key| {
                        let index = index.clone();
                        async move { index.first(b_tree::Range::from(primary_key)).await }
                    })
                    .try_buffered(num_cpus::get())
                    .map_ok(|maybe_row| maybe_row.expect("row"));

                let rows: Rows<IS::Value> = Box::pin(rows);
                (rows, self.primary.schema().columns())
            }
        } else {
            // empty plan: stream directly from the primary index
            let columns = self.primary.schema().columns();
            let index_range = index_range_for(columns, &mut range);
            assert!(range.is_empty());
            let keys = self.primary.clone().keys(index_range, reverse).await?;
            (keys, columns)
        };

        // finally, narrow each key down to exactly the selected columns if needed
        if columns == select {
            Ok(keys)
        } else {
            let extract_prefix = prefix_extractor(columns, select);
            let rows = keys.map_ok(extract_prefix);
            Ok(Box::pin(rows))
        }
    }
}
636
637#[inline]
638fn index_range_borrow<'a, K: Eq + Hash, V>(
639    columns: &[K],
640    range: &'a HashMap<K, ColumnRange<V>>,
641) -> b_tree::Range<&'a V> {
642    let mut prefix = Key::with_capacity(range.len());
643
644    for col_name in columns {
645        if let Some(col_range) = range.get(col_name) {
646            match col_range {
647                ColumnRange::Eq(value) => {
648                    prefix.push(value);
649                }
650                ColumnRange::In((start, end)) => {
651                    return b_tree::Range::with_bounds(prefix, (start.as_ref(), end.as_ref()));
652                }
653            }
654        } else {
655            break;
656        }
657    }
658
659    b_tree::Range::from_prefix(prefix)
660}
661
662#[inline]
663fn index_range_for<'a, K: Eq + Hash, V>(
664    columns: &[K],
665    range: &mut HashMap<K, ColumnRange<V>>,
666) -> b_tree::Range<V> {
667    let mut prefix = Key::with_capacity(range.len());
668
669    for col_name in columns {
670        if let Some(col_range) = range.remove(col_name) {
671            match col_range {
672                ColumnRange::Eq(value) => {
673                    prefix.push(value);
674                }
675                ColumnRange::In(bounds) => {
676                    return b_tree::Range::with_bounds(prefix, bounds);
677                }
678            }
679        } else {
680            break;
681        }
682    }
683
684    b_tree::Range::from_prefix(prefix)
685}
686
/// Construct an owned [`b_tree::Range`] for the columns named by `query.range()`,
/// cloning each [`ColumnRange`] out of `range` without consuming it.
///
/// Panics if any column named by `query.range()` is missing from `range`, or
/// (via the trailing assert) if a bounded `In` range is not the final column.
#[inline]
fn inner_range_for<'a, K, V>(
    query: &IndexQuery<'a, K>,
    range: &HashMap<K, ColumnRange<V>>,
) -> b_tree::Range<V>
where
    K: Eq + Hash + fmt::Debug,
    V: Clone,
{
    let mut inner_range = Key::with_capacity(query.range().len());
    let mut range_columns = query.range().into_iter();

    let inner_range = loop {
        if let Some(col_name) = range_columns.next() {
            match range.get(col_name).cloned().expect("column range") {
                ColumnRange::Eq(value) => inner_range.push(value),
                ColumnRange::In(bounds) => break b_tree::Range::with_bounds(inner_range, bounds),
            }
        } else {
            break b_tree::Range::from(inner_range);
        }
    };

    // a bounded range may only appear as the last constrained column
    assert!(range_columns.next().is_none());

    inner_range
}
714
715fn prefix_extractor<K, V>(columns_in: &[K], columns_out: &[K]) -> impl Fn(Key<V>) -> Key<V> + Send
716where
717    K: PartialEq + fmt::Debug,
718    V: Default + Clone,
719{
720    debug_assert!(columns_out.len() <= columns_in.len());
721    debug_assert!(!columns_out.is_empty());
722    debug_assert!(
723        columns_out.iter().all(|id| columns_in.contains(&id)),
724        "{columns_out:?} is not a subset of {columns_in:?}"
725    );
726
727    #[cfg(feature = "logging")]
728    log::trace!("extract columns {columns_out:?} from {columns_in:?}");
729
730    let indices = columns_out
731        .iter()
732        .map(|name_out| {
733            columns_in
734                .iter()
735                .position(|name_in| name_in == name_out)
736                .expect("column index")
737        })
738        .collect::<IndexStack<_>>();
739
740    move |mut key| {
741        let mut prefix = smallvec![V::default(); indices.len()];
742
743        for (i_to, i_from) in indices.iter().copied().enumerate() {
744            mem::swap(&mut key[i_from], &mut prefix[i_to]);
745        }
746
747        prefix
748    }
749}
750
impl<IS, C, FE> TableState<IS, C, DirWriteGuardOwned<FE>>
where
    IS: IndexSchema + Send + Sync,
    C: Collate<Value = IS::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    DirWriteGuardOwned<FE>: DirDeref<Entry = FE>,
    Node<IS::Value>: FileLoad,
{
    /// Delete the row with the given `key` from the primary index and every
    /// auxiliary index.
    ///
    /// Returns `false` (without modifying anything) if no such row exists.
    /// Panics if any auxiliary index was missing the row, i.e. out of sync.
    async fn delete_row(&mut self, key: &[IS::Value]) -> Result<bool, io::Error> {
        let row = if let Some(row) = self.get_row(key).await? {
            row
        } else {
            return Ok(false);
        };

        let mut deletes = IndexStack::with_capacity(self.auxiliary.len() + 1);

        // delete the row's projection from each auxiliary index concurrently
        for (_name, index) in self.auxiliary.iter_mut() {
            let index_key = borrow_columns(
                &row,
                self.primary.schema().columns(),
                index.schema().columns(),
            );

            deletes.push(async move { index.delete(&index_key).await })
        }

        self.primary.delete(&row).await?;

        for present in try_join_all(deletes).await? {
            assert!(present, "table index is out of sync");
        }

        Ok(true)
    }

    /// Delete every row matching `range`, returning how many were deleted.
    // deletes one row at a time by repeatedly finding the first remaining match
    async fn delete_range<'a>(
        &mut self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &[IS::Id],
    ) -> Result<usize, io::Error> {
        let mut deleted = 0;

        while let Some(pk) = self.first(&plan, &range, key_columns, key_columns).await? {
            self.delete_row(&pk).await?;
            deleted += 1;
        }

        Ok(deleted)
    }

    /// Delete all rows present in `other` from this table, index by index.
    ///
    /// Panics if `other` lacks an auxiliary index that this table has.
    async fn delete_all<OG>(&mut self, mut other: TableState<IS, C, OG>) -> Result<(), io::Error>
    where
        OG: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    {
        let mut deletes = IndexStack::with_capacity(self.auxiliary.len() + 1);

        deletes.push(self.primary.delete_all(other.primary));

        for (name, this) in self.auxiliary.iter_mut() {
            let that = other.auxiliary.remove(name).expect("other index");
            deletes.push(this.delete_all(that));
        }

        try_join_all(deletes).await?;

        Ok(())
    }

    /// Merge all rows from `other` into this table, index by index.
    ///
    /// Panics if `other` lacks an auxiliary index that this table has.
    async fn merge<OG>(&mut self, mut other: TableState<IS, C, OG>) -> Result<(), io::Error>
    where
        OG: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    {
        let mut merges = IndexStack::with_capacity(self.auxiliary.len() + 1);

        merges.push(self.primary.merge(other.primary));

        for (name, this) in self.auxiliary.iter_mut() {
            let that = other.auxiliary.remove(name).expect("other index");
            merges.push(this.merge(that));
        }

        try_join_all(merges).await?;

        Ok(())
    }

    /// Insert or update `row`, projecting it into every auxiliary index.
    ///
    /// Returns `true` if the row was newly inserted. Panics if the primary and
    /// auxiliary indices disagree on whether the row was new.
    async fn upsert(&mut self, row: Vec<IS::Value>) -> Result<bool, io::Error> {
        let mut inserts = IndexStack::with_capacity(self.auxiliary.len() + 1);

        for (_name, index) in self.auxiliary.iter_mut() {
            let index_key = clone_columns(
                &row,
                self.primary.schema().columns(),
                index.schema().columns(),
            );

            inserts.push(index.insert(index_key));
        }

        // pushed last so that `inserts.pop()` below yields the primary index's result
        inserts.push(self.primary.insert(row));

        let mut inserts = try_join_all(inserts).await?;
        let new = inserts.pop().expect("insert");
        while let Some(index_new) = inserts.pop() {
            assert_eq!(new, index_new, "index out of sync");
        }

        Ok(new)
    }

    /// Delete every row from the primary index and all auxiliary indices.
    async fn truncate(&mut self) -> Result<(), io::Error> {
        let mut truncates = IndexStack::with_capacity(self.auxiliary.len() + 1);
        truncates.push(self.primary.truncate());

        for index in self.auxiliary.values_mut() {
            truncates.push(index.truncate());
        }

        try_join_all(truncates).await?;

        Ok(())
    }
}
876
877impl<IS, C, FE> TableState<IS, C, DirWriteGuardOwned<FE>> {
878    fn downgrade(self) -> TableState<IS, C, Arc<DirReadGuardOwned<FE>>> {
879        TableState {
880            primary: self.primary.downgrade(),
881            auxiliary: self
882                .auxiliary
883                .into_iter()
884                .map(|(name, index)| (name, index.downgrade()))
885                .collect(),
886        }
887    }
888}
889
/// A database table with support for multiple indices
pub struct Table<S, IS, C, G> {
    // the schema of the table as a whole
    schema: Arc<TableSchema<S>>,
    // the read- or write-locked index guards (`G` determines which)
    state: TableState<IS, C, G>,
}
895
896impl<S, IS, C, G> Clone for Table<S, IS, C, G>
897where
898    C: Clone,
899    G: Clone,
900{
901    fn clone(&self) -> Self {
902        Self {
903            schema: self.schema.clone(),
904            state: self.state.clone(),
905        }
906    }
907}
908
909impl<S, C, FE, G> Table<S, S::Index, C, G>
910where
911    S: Schema,
912    C: Collate<Value = S::Value> + 'static,
913    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
914    G: DirDeref<Entry = FE> + 'static,
915    Node<S::Value>: FileLoad,
916    Range<S::Id, S::Value>: fmt::Debug,
917{
918    /// Return `true` if the given `key` is present in this [`Table`].
919    pub async fn contains(&self, key: &[S::Value]) -> Result<bool, io::Error> {
920        let key_len = self.schema.key().len();
921
922        if key.len() == key_len {
923            self.state.contains(key).await
924        } else {
925            Err(bad_key(key, key_len))
926        }
927    }
928
929    /// Return the first row in the given `range` using the given `order`.
930    pub async fn first(
931        &self,
932        range: Range<S::Id, S::Value>,
933        order: &[S::Id],
934        select: Option<&[S::Id]>,
935    ) -> Result<Option<Row<S::Value>>, io::Error> {
936        let range = range.into_inner();
937        let select = select.unwrap_or(self.schema.key());
938        let plan = self.schema.plan_query(&range, order, self.schema.key())?;
939
940        self.state
941            .first(&plan, &range, select, self.schema.key())
942            .await
943    }
944
945    /// Look up a row by its `key`.
946    pub async fn get_row(&self, key: &[S::Value]) -> Result<Option<Row<S::Value>>, io::Error> {
947        let key_len = self.schema.key().len();
948
949        if key.len() == key_len {
950            self.state.get_row(&key).await
951        } else {
952            Err(bad_key(&key, key_len))
953        }
954    }
955
956    /// Look up a value by its `key`.
957    pub async fn get_value(&self, key: &[S::Value]) -> Result<Option<Row<S::Value>>, io::Error> {
958        let key_len = self.schema.key().len();
959
960        self.get_row(key)
961            .map_ok(move |maybe_row| maybe_row.map(move |mut row| row.drain(key_len..).collect()))
962            .await
963    }
964}
965
966impl<S, C, FE, G> Table<S, S::Index, C, G>
967where
968    S: Schema,
969    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
970    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
971    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
972    Node<S::Value>: FileLoad,
973    Range<S::Id, S::Value>: fmt::Debug,
974{
975    /// Count how many rows in this [`Table`] lie within the given `range`.
976    pub async fn count(&self, range: Range<S::Id, S::Value>) -> Result<u64, io::Error> {
977        let range = range.into_inner();
978        let plan = self.schema.plan_query(&range, &[], self.schema.key())?;
979        self.state.count(plan, range, self.schema.key()).await
980    }
981
982    /// Return `true` if the given [`Range`] of this [`Table`] does not contain any rows.
983    pub async fn is_empty(&self, range: Range<S::Id, S::Value>) -> Result<bool, io::Error> {
984        let range = range.into_inner();
985        let plan = self.schema.plan_query(&range, &[], Default::default())?;
986        self.state.is_empty(plan, range, self.schema.key()).await
987    }
988
989    /// Construct a [`Stream`] of the `select`ed columns of the [`Rows`] within the given `range`.
990    pub async fn rows<'a>(
991        &'a self,
992        range: Range<S::Id, S::Value>,
993        order: &'a [S::Id],
994        reverse: bool,
995        select: Option<&'a [S::Id]>,
996    ) -> Result<Rows<S::Value>, io::Error> {
997        #[cfg(feature = "logging")]
998        log::debug!("Table::rows with order {order:?}");
999
1000        let range = range.into_inner();
1001        let select = select.unwrap_or(self.schema.primary().columns());
1002        let plan = self.schema.plan_query(&range, order, self.schema.key())?;
1003
1004        self.state
1005            .rows(plan, range, reverse, select, self.schema.key())
1006            .await
1007    }
1008
1009    /// Consume this [`TableReadGuard`] to construct a [`Stream`] of all the rows in the [`Table`].
1010    pub async fn into_rows(self) -> Result<Rows<S::Value>, io::Error> {
1011        let rows = self.rows(Range::default(), &[], false, None).await?;
1012        Ok(Box::pin(rows))
1013    }
1014}
1015
1016impl<S: fmt::Debug, IS, C, G> fmt::Debug for Table<S, IS, C, G> {
1017    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1018        write!(f, "table with schema {:?}", self.schema.inner())
1019    }
1020}
1021
1022impl<S, IS, C, FE> Table<S, IS, C, DirWriteGuardOwned<FE>> {
1023    /// Downgrade this write lock to a read lock.
1024    pub fn downgrade(self) -> Table<S, IS, C, Arc<DirReadGuardOwned<FE>>> {
1025        Table {
1026            schema: self.schema,
1027            state: self.state.downgrade(),
1028        }
1029    }
1030}
1031
1032impl<S, C, FE> Table<S, S::Index, C, DirWriteGuardOwned<FE>>
1033where
1034    S: Schema + Send + Sync,
1035    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
1036    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
1037    <S as Schema>::Index: Send + Sync,
1038    Node<S::Value>: FileLoad,
1039{
1040    /// Delete a row from this [`Table`] by its `key`.
1041    /// Returns `true` if the given `key` was present.
1042    pub async fn delete_row(&mut self, key: &[S::Value]) -> Result<bool, io::Error> {
1043        let key_len = self.schema.key().len();
1044
1045        if key.len() == key_len {
1046            self.state.delete_row(key).await
1047        } else {
1048            Err(bad_key(&key, key_len))
1049        }
1050    }
1051
1052    /// Delete all rows in the given `range` from this [`Table`].
1053    pub async fn delete_range(
1054        &mut self,
1055        range: Range<S::Id, S::Value>,
1056    ) -> Result<usize, io::Error> {
1057        #[cfg(feature = "logging")]
1058        log::debug!("Table::delete_range {range:?}");
1059
1060        let range = range.into_inner();
1061        let plan = self.schema.plan_query(&range, &[], self.schema.key())?;
1062
1063        self.state
1064            .delete_range(plan, range, self.schema.key())
1065            .await
1066    }
1067
1068    /// Delete all rows from the `other` table from this one.
1069    /// The `other` table **must** have an identical schema and collation.
1070    pub async fn delete_all(
1071        &mut self,
1072        other: TableReadGuard<S, S::Index, C, FE>,
1073    ) -> Result<(), io::Error> {
1074        // no need to check the collator for equality, that will be done in the index operations
1075
1076        // but do check that the indices to merge are the same
1077        if self.schema != other.schema {
1078            return Err(io::Error::new(
1079                io::ErrorKind::InvalidInput,
1080                format!(
1081                    "cannot delete the contents of a table with schema {:?} from one with schema {:?}",
1082                    other.schema.inner(), self.schema.inner()
1083                ),
1084            )
1085            .into());
1086        }
1087
1088        self.state.delete_all(other.state).await
1089    }
1090
1091    /// Insert all rows from the `other` table into this one.
1092    /// The `other` table **must** have an identical schema and collation.
1093    pub async fn merge(
1094        &mut self,
1095        other: TableReadGuard<S, S::Index, C, FE>,
1096    ) -> Result<(), io::Error> {
1097        // no need to check the collator for equality, that will be done in the merge operations
1098
1099        // but do check that the indices to merge are the same
1100        if self.schema != other.schema {
1101            return Err(io::Error::new(
1102                io::ErrorKind::InvalidInput,
1103                format!(
1104                    "cannot merge a table with schema {:?} into one with schema {:?}",
1105                    other.schema.inner(),
1106                    self.schema.inner()
1107                ),
1108            )
1109            .into());
1110        }
1111
1112        self.state.merge(other.state).await
1113    }
1114
1115    /// Insert or update a row in this [`Table`].
1116    /// Returns `true` if a new row was inserted.
1117    pub async fn upsert(
1118        &mut self,
1119        key: Vec<S::Value>,
1120        values: Vec<S::Value>,
1121    ) -> Result<bool, S::Error> {
1122        let key = self.schema.validate_key(key)?;
1123        let values = self.schema.validate_values(values)?;
1124
1125        let mut row = Vec::with_capacity(key.len() + values.len());
1126        row.extend(key);
1127        row.extend(values);
1128
1129        self.state.upsert(row).map_err(S::Error::from).await
1130    }
1131
1132    /// Delete all rows from this [`Table`].
1133    pub async fn truncate(&mut self) -> Result<(), io::Error> {
1134        #[cfg(feature = "logging")]
1135        log::debug!("Table::truncate");
1136
1137        self.state.truncate().await
1138    }
1139}
1140
1141#[inline]
1142fn borrow_columns<'a, K, V>(row: &'a [V], columns_in: &[K], columns_out: &[K]) -> Key<&'a V>
1143where
1144    K: Eq,
1145{
1146    assert_eq!(row.len(), columns_in.len());
1147
1148    debug_assert!(columns_out
1149        .iter()
1150        .all(|col_name| columns_in.contains(col_name)));
1151
1152    columns_out
1153        .iter()
1154        .filter_map(|col_name| columns_in.iter().position(|c| c == col_name))
1155        .map(|i| &row[i])
1156        .collect()
1157}
1158
/// Clone the values of `columns_out` from `row`, whose columns are `columns_in`.
///
/// # Panics
/// Panics if `row` does not have exactly one value per column in `columns_in`,
/// or if any column in `columns_out` is missing from `columns_in`
/// (previously a missing column was silently skipped in release builds,
/// producing a short row; failing fast matches `extract_columns`).
#[inline]
fn clone_columns<K, V>(row: &[V], columns_in: &[K], columns_out: &[K]) -> Vec<V>
where
    K: Eq,
    V: Clone,
{
    assert_eq!(row.len(), columns_in.len());

    columns_out
        .iter()
        .map(|col_name| {
            let i = columns_in
                .iter()
                .position(|c| c == col_name)
                .expect("column index");

            row[i].clone()
        })
        .collect()
}
1177
1178#[inline]
1179fn extract_columns<K, V>(mut row: Key<V>, columns_in: &[K], columns_out: &[K]) -> Key<V>
1180where
1181    K: Eq + fmt::Debug,
1182    V: Default + Clone + fmt::Debug,
1183{
1184    assert_eq!(
1185        row.len(),
1186        columns_in.len(),
1187        "row {row:?} does not match column schema {columns_in:?}"
1188    );
1189
1190    debug_assert!(
1191        columns_out
1192            .iter()
1193            .all(|col_name| columns_in.contains(col_name)),
1194        "input columns {columns_in:?} are missing some output columns {columns_out:?}"
1195    );
1196
1197    let mut selection = smallvec![V::default(); columns_out.len()];
1198
1199    for (i_to, name_out) in columns_out.iter().enumerate() {
1200        let i_from = columns_in
1201            .iter()
1202            .position(|name_in| name_in == name_out)
1203            .expect("column index");
1204
1205        mem::swap(&mut row[i_from], &mut selection[i_to]);
1206    }
1207
1208    selection
1209}
1210
/// Construct an [`io::ErrorKind::InvalidInput`] error describing an invalid key.
#[inline]
fn bad_key<V: fmt::Debug>(key: &[V], key_len: usize) -> io::Error {
    let message = format!("invalid key: {key:?}, expected exactly {key_len} column(s)");
    io::Error::new(io::ErrorKind::InvalidInput, message)
}
1218
1219#[inline]
1220fn valid_schema<S: Schema>(schema: &S) -> Result<(), io::Error> {
1221    if schema.primary().columns().is_empty() {
1222        return Err(io::Error::new(
1223            io::ErrorKind::InvalidInput,
1224            format!("{schema:?} contains no columns"),
1225        ));
1226    }
1227
1228    for (index_name, index) in schema.auxiliary() {
1229        if index.columns().is_empty() {
1230            return Err(io::Error::new(
1231                io::ErrorKind::InvalidInput,
1232                format!("index {index_name} is empty"),
1233            ));
1234        }
1235
1236        for col_name in index.columns() {
1237            if !schema.primary().columns().contains(col_name) {
1238                return Err(io::Error::new(
1239                    io::ErrorKind::InvalidInput,
1240                    format!("index {index_name} refers to unknown column {col_name}"),
1241                ));
1242            }
1243        }
1244
1245        // note: it's inefficient to remove this requirement
1246        // because it would break the assumption
1247        // that every key constructed by merging two indices exists in the primary index
1248        for col_name in schema.key() {
1249            if !index.columns().contains(col_name) {
1250                return Err(io::Error::new(
1251                    io::ErrorKind::InvalidInput,
1252                    format!("index {index_name} is missing primary key column {col_name}"),
1253                ));
1254            }
1255        }
1256    }
1257
1258    Ok(())
1259}