use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::pin::Pin;
use std::sync::Arc;
use std::{fmt, io, mem};

use b_tree::collate::Collate;
use b_tree::{BTree, BTreeLock, Key};
use freqfs::{DirDeref, DirLock, DirReadGuardOwned, DirWriteGuardOwned, FileLoad};
use futures::future::{try_join_all, TryFutureExt};
use futures::stream::{Stream, TryStreamExt};
use safecast::AsType;
use smallvec::{smallvec, SmallVec};

use super::plan::{IndexQuery, QueryPlan};
use super::schema::*;
use super::{IndexStack, Node};

const PRIMARY: &str = "primary";

pub const ROW_STACK_SIZE: usize = 32;

pub type TableReadGuard<S, IS, C, FE> = Table<S, IS, C, Arc<DirReadGuardOwned<FE>>>;

pub type TableWriteGuard<S, IS, C, FE> = Table<S, IS, C, DirWriteGuardOwned<FE>>;

pub type Row<V> = SmallVec<[V; ROW_STACK_SIZE]>;

pub type Rows<V> = Pin<Box<dyn Stream<Item = Result<Row<V>, io::Error>> + Send>>;

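/// A lock which synchronizes access to a [`Table`], comprising one primary index
/// and any number of auxiliary indices.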
pub struct TableLock<S, IS, C, FE> {
    schema: Arc<TableSchema<S>>,
    dir: DirLock<FE>,
    primary: BTreeLock<IS, C, FE>,
    auxiliary: BTreeMap<Arc<str>, BTreeLock<IS, C, FE>>,
}

impl<S, IS, C, FE> Clone for TableLock<S, IS, C, FE>
where
    C: Clone,
{
    fn clone(&self) -> Self {
        Self {
            schema: self.schema.clone(),
            dir: self.dir.clone(),
            primary: self.primary.clone(),
            auxiliary: self.auxiliary.clone(),
        }
    }
}

impl<S, IS, C, FE> TableLock<S, IS, C, FE> {
    pub fn schema(&self) -> &S {
        self.schema.inner()
    }

    pub fn collator(&self) -> &b_tree::Collator<C> {
        self.primary.collator()
    }
}

impl<S, C, FE> TableLock<S, S::Index, C, FE>
where
    S: Schema,
    C: Clone,
    FE: AsType<Node<S::Value>> + Send + Sync,
    Node<S::Value>: FileLoad,
{
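    /// Create a new [`TableLock`] with the given `schema` and `collator` in the given `dir`.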
    pub fn create(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
        valid_schema(&schema)?;

        let mut dir_contents = dir.try_write()?;

        let primary = {
            let dir = dir_contents.create_dir(PRIMARY.to_string())?;
            BTreeLock::create(schema.primary().clone(), collator.clone(), dir)
        }?;

        let mut auxiliary = BTreeMap::new();
        for (name, schema) in schema.auxiliary() {
            let index = {
                let dir = dir_contents.create_dir(name.to_string())?;
                BTreeLock::create(schema.clone(), collator.clone(), dir)
            }?;

            auxiliary.insert(name.clone().into(), index);
        }

        std::mem::drop(dir_contents);

        Ok(Self {
            schema: Arc::new(schema.into()),
            primary,
            auxiliary,
            dir,
        })
    }

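    /// Load a [`TableLock`] with the given `schema` and `collator` from the given `dir`,
    /// creating any missing index directories.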
    pub fn load(schema: S, collator: C, dir: DirLock<FE>) -> Result<Self, io::Error> {
        valid_schema(&schema)?;

        let mut dir_contents = dir.try_write()?;

        let primary = {
            let dir = dir_contents.get_or_create_dir(PRIMARY.to_string())?;
            BTreeLock::load(schema.primary().clone(), collator.clone(), dir.clone())
        }?;

        let mut auxiliary = BTreeMap::new();
        for (name, schema) in schema.auxiliary() {
            let index = {
                let dir = dir_contents.get_or_create_dir(name.clone())?;
                BTreeLock::load(schema.clone(), collator.clone(), dir.clone())
            }?;

            auxiliary.insert(name.clone().into(), index);
        }

        std::mem::drop(dir_contents);

        Ok(Self {
            schema: Arc::new(schema.into()),
            primary,
            auxiliary,
            dir,
        })
    }

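    /// Sync the contents of this [`TableLock`] to the filesystem.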
    pub async fn sync(&self) -> Result<(), io::Error>
    where
        FE: for<'a> freqfs::FileSave<'a>,
    {
        self.dir.sync().await
    }
}

impl<S, C, FE> TableLock<S, S::Index, C, FE>
where
    S: Schema,
    C: Clone,
    FE: Send + Sync,
    Node<S::Value>: FileLoad,
{
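    /// Lock this table (and each of its indices) for reading.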
    pub async fn read(&self) -> TableReadGuard<S, S::Index, C, FE> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for reading...");

        let schema = self.schema.clone();

        let primary = self.primary.read().await;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for reading");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in &self.auxiliary {
            let index = index.read().await;
            auxiliary.insert(name.clone(), index);

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for reading");
        }

        Table {
            schema,
            state: TableState { auxiliary, primary },
        }
    }

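    /// Lock this table for reading, consuming this [`TableLock`].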
    pub async fn into_read(self) -> TableReadGuard<S, S::Index, C, FE> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for reading...");

        let schema = self.schema.clone();

        let primary = self.primary.into_read().await;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for reading");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in self.auxiliary {
            let index = index.into_read().await;

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for reading");

            auxiliary.insert(name, index);
        }

        Table {
            schema,
            state: TableState { auxiliary, primary },
        }
    }

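    /// Lock this table for reading synchronously, if possible.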
    pub fn try_read(&self) -> Result<TableReadGuard<S, S::Index, C, FE>, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for reading...");

        let schema = self.schema.clone();

        let primary = self.primary.try_read()?;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for reading");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in self.auxiliary.iter() {
            let index = index.try_read()?;
            auxiliary.insert(name.clone(), index);

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for reading");
        }

        Ok(Table {
            schema,
            state: TableState { auxiliary, primary },
        })
    }

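    /// Lock this table (and each of its indices) for writing.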
    pub async fn write(&self) -> TableWriteGuard<S, S::Index, C, FE> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for writing...");

        let schema = self.schema.clone();

        let primary = self.primary.write().await;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for writing");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in self.auxiliary.iter() {
            let index = index.write().await;
            auxiliary.insert(name.clone(), index);

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for writing");
        }

        Table {
            schema,
            state: TableState { auxiliary, primary },
        }
    }

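    /// Lock this table for writing, consuming this [`TableLock`].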
    pub async fn into_write(self) -> TableWriteGuard<S, S::Index, C, FE> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for writing...");

        let schema = self.schema.clone();

        let primary = self.primary.into_write().await;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for writing");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in self.auxiliary.into_iter() {
            let index = index.into_write().await;

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for writing");

            auxiliary.insert(name, index);
        }

        Table {
            schema,
            state: TableState { auxiliary, primary },
        }
    }

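    /// Lock this table for writing synchronously, if possible.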
    pub fn try_write(&self) -> Result<TableWriteGuard<S, S::Index, C, FE>, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("locking table for writing...");

        let schema = self.schema.clone();

        let primary = self.primary.try_write()?;

        #[cfg(feature = "logging")]
        log::trace!("locked primary index for writing");

        let mut auxiliary = HashMap::with_capacity(self.auxiliary.len());
        for (name, index) in self.auxiliary.iter() {
            let index = index.try_write()?;
            auxiliary.insert(name.clone(), index);

            #[cfg(feature = "logging")]
            log::trace!("locked index {name} for writing");
        }

        Ok(Table {
            schema,
            state: TableState { auxiliary, primary },
        })
    }
}

struct TableState<IS, C, G> {
    auxiliary: HashMap<Arc<str>, BTree<IS, C, G>>,
    primary: BTree<IS, C, G>,
}

impl<IS, C: Clone, G: Clone> Clone for TableState<IS, C, G> {
    fn clone(&self) -> Self {
        Self {
            primary: self.primary.clone(),
            auxiliary: self.auxiliary.clone(),
        }
    }
}

impl<IS, C, G> TableState<IS, C, G> {
    #[inline]
    fn get_index<'a, Id>(&'a self, index_id: Id) -> Option<&'a BTree<IS, C, G>>
    where
        IndexId<'a>: From<Id>,
    {
        match index_id.into() {
            IndexId::Primary => Some(&self.primary),
            IndexId::Auxiliary(index_id) => self.auxiliary.get(index_id),
        }
    }
}

impl<IS, C, FE, G> TableState<IS, C, G>
where
    IS: IndexSchema,
    C: Collate<Value = IS::Value> + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + 'static,
    Node<IS::Value>: FileLoad,
    Range<IS::Id, IS::Value>: fmt::Debug,
{
    async fn contains(&self, prefix: &[IS::Value]) -> Result<bool, io::Error> {
        self.primary.contains(prefix).await
    }

    async fn get_row(&self, key: &[IS::Value]) -> Result<Option<Row<IS::Value>>, io::Error> {
        self.primary.first(b_tree::Range::from_prefix(key)).await
    }

    async fn first<'a>(
        &self,
        plan: &QueryPlan<'a, IS::Id>,
        range: &HashMap<IS::Id, ColumnRange<IS::Value>>,
        select: &[IS::Id],
        key_columns: &[IS::Id],
    ) -> Result<Option<Row<IS::Value>>, io::Error> {
        let mut plan = plan.indices.iter();

        let (mut first, mut columns) = if let Some((index_id, _query)) = plan.next() {
            let index = self.get_index(*index_id).expect("index");
            let columns = index.schema().columns();
            let index_range = index_range_borrow(columns, range);

            if let Some(first) = index.first(index_range).await? {
                (first, columns)
            } else {
                return Ok(None);
            }
        } else {
            let index_range = index_range_borrow(self.primary.schema().columns(), range);

            return self
                .primary
                .first(index_range)
                .map_ok(|first| {
                    first.map(|first| {
                        extract_columns(first, self.primary.schema().columns(), select)
                    })
                })
                .await;
        };

        for (index_id, _query) in plan {
            let index = self.get_index(*index_id).expect("index");

            columns = index.schema().columns();

            let index_range = index_range_borrow(&columns, range);

            first = if let Some(key) = index.first(index_range).await? {
                key
            } else {
                return Ok(None);
            }
        }

        if !select.iter().all(|col_name| columns.contains(col_name)) {
            let pk = extract_columns(first, columns, key_columns);

            first = self
                .get_row(&pk)
                .map_ok(|maybe_row| maybe_row.expect("row"))
                .await?;

            columns = self.primary.schema().columns();
        }

        Ok(Some(extract_columns(first, columns, select)))
    }
}

impl<IS, C, FE, G> TableState<IS, C, G>
where
    IS: IndexSchema,
    C: Collate<Value = IS::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<IS::Value>: FileLoad,
    Range<IS::Id, IS::Value>: fmt::Debug,
{
    async fn count<'a>(
        &'a self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &'a [IS::Id],
    ) -> Result<u64, io::Error> {
        let mut rows = self
            .rows(plan, range, false, key_columns, key_columns)
            .await?;

        let mut count = 0;
        while let Some(_row) = rows.try_next().await? {
            count += 1;
        }

        Ok(count)
    }

    async fn is_empty<'a>(
        &'a self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &'a [IS::Id],
    ) -> Result<bool, io::Error> {
        self.first(&plan, &range, key_columns, key_columns)
            .map_ok(|maybe_row| maybe_row.is_none())
            .await
    }

    async fn rows<'a>(
        &'a self,
        mut plan: QueryPlan<'a, IS::Id>,
        mut range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        reverse: bool,
        select: &'a [IS::Id],
        key_columns: &'a [IS::Id],
    ) -> Result<Rows<IS::Value>, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("construct row stream with plan {plan:?}");

        let mut keys: Option<(b_tree::Keys<IS::Value>, &'a [IS::Id])> = None;

        let last_query = plan.indices.pop();

        if let Some((index_id, query)) = plan.indices.first() {
            assert_eq!(query.prefix_len(), 0);
            let index = self.get_index(*index_id).expect("index");

            let columns = &index.schema().columns()[..query.selected()];
            assert!(query.range().iter().zip(columns).all(|(r, c)| *r == c));

            let index_range = index_range_for(&columns[..query.range().len()], &mut range);
            let index_prefixes = index
                .clone()
                .groups(index_range, columns.len(), reverse)
                .await?;

            keys = Some((index_prefixes, columns));
        }

        for (index_id, query) in plan.indices.into_iter().skip(1) {
            let index = self.get_index(index_id).expect("index");

            let (prefixes, columns_in) = keys.take().expect("prefixes");

            let columns_out = &index.schema().columns()[..query.selected()];

            assert_eq!(query.prefix_len(), columns_in.len());
            assert!(columns_out.len() > columns_in.len());
            assert!(query.range().iter().zip(columns_out).all(|(r, c)| *r == c));

            debug_assert!(columns_out
                .iter()
                .take(columns_in.len())
                .all(|c| columns_in.contains(c)));

            let extract_prefix = prefix_extractor(columns_in, &columns_out[..columns_in.len()]);

            let inner_range = inner_range_for(&query, &mut range);

            let n = columns_out.len();
            let index = index.clone();

            let index_prefixes = prefixes
                .map_ok(extract_prefix)
                .map_ok(move |prefix| inner_range.clone().prepend(prefix))
                .map_ok(move |index_range| {
                    let index = index.clone();
                    async move { index.groups(index_range, n, reverse).await }
                })
                .try_buffered(num_cpus::get())
                .try_flatten();

            keys = Some((Box::pin(index_prefixes), columns_out))
        }

        if let Some((index_id, query)) = last_query {
            if let Some((prefixes, columns_in)) = keys.take() {
                assert_eq!(query.prefix_len(), columns_in.len());

                let index = self.get_index(index_id).expect("index");

                let columns_out = &index.schema().columns();

                debug_assert!(
                    columns_out.len() > columns_in.len(),
                    "cannot select {columns_out:?} with prefix {columns_in:?}"
                );

                debug_assert!(columns_out
                    .iter()
                    .take(columns_in.len())
                    .all(|c| columns_in.contains(c)));

                let extract_prefix = prefix_extractor(columns_in, &columns_out[..columns_in.len()]);

                let inner_range = inner_range_for(&query, &mut range);

                let index = index.clone();

                let index_keys = prefixes
                    .map_ok(extract_prefix)
                    .map_ok(move |prefix| inner_range.clone().prepend(prefix))
                    .map_ok(move |index_range| {
                        let index = index.clone();
                        async move { index.keys(index_range, reverse).await }
                    })
                    .try_buffered(num_cpus::get())
                    .try_flatten();

                keys = Some((Box::pin(index_keys), columns_out))
            } else {
                let index = self.get_index(index_id).expect("index");
                let columns = index.schema().columns();

                let index_range = index_range_for(columns, &mut range);
                assert!(range.is_empty());

                let index_keys = index.clone().keys(index_range, reverse).await?;
                keys = Some((Box::pin(index_keys), columns));
            }
        }

        let (keys, columns) = if let Some((keys, columns)) = keys {
            if select.iter().all(|c| columns.contains(c)) {
                (keys, columns)
            } else {
                let index = self.primary.clone();
                let extract_prefix = prefix_extractor(columns, key_columns);

                let rows = keys
                    .map_ok(extract_prefix)
                    .map_ok(move |primary_key| {
                        let index = index.clone();
                        async move { index.first(b_tree::Range::from(primary_key)).await }
                    })
                    .try_buffered(num_cpus::get())
                    .map_ok(|maybe_row| maybe_row.expect("row"));

                let rows: Rows<IS::Value> = Box::pin(rows);
                (rows, self.primary.schema().columns())
            }
        } else {
            let columns = self.primary.schema().columns();
            let index_range = index_range_for(columns, &mut range);
            assert!(range.is_empty());
            let keys = self.primary.clone().keys(index_range, reverse).await?;
            (keys, columns)
        };

        if columns == select {
            Ok(keys)
        } else {
            let extract_prefix = prefix_extractor(columns, select);
            let rows = keys.map_ok(extract_prefix);
            Ok(Box::pin(rows))
        }
    }
}

#[inline]
fn index_range_borrow<'a, K: Eq + Hash, V>(
    columns: &[K],
    range: &'a HashMap<K, ColumnRange<V>>,
) -> b_tree::Range<&'a V> {
    let mut prefix = Key::with_capacity(range.len());

    for col_name in columns {
        if let Some(col_range) = range.get(col_name) {
            match col_range {
                ColumnRange::Eq(value) => {
                    prefix.push(value);
                }
                ColumnRange::In((start, end)) => {
                    return b_tree::Range::with_bounds(prefix, (start.as_ref(), end.as_ref()));
                }
            }
        } else {
            break;
        }
    }

    b_tree::Range::from_prefix(prefix)
}

#[inline]
fn index_range_for<K: Eq + Hash, V>(
    columns: &[K],
    range: &mut HashMap<K, ColumnRange<V>>,
) -> b_tree::Range<V> {
    let mut prefix = Key::with_capacity(range.len());

    for col_name in columns {
        if let Some(col_range) = range.remove(col_name) {
            match col_range {
                ColumnRange::Eq(value) => {
                    prefix.push(value);
                }
                ColumnRange::In(bounds) => {
                    return b_tree::Range::with_bounds(prefix, bounds);
                }
            }
        } else {
            break;
        }
    }

    b_tree::Range::from_prefix(prefix)
}

#[inline]
fn inner_range_for<'a, K, V>(
    query: &IndexQuery<'a, K>,
    range: &HashMap<K, ColumnRange<V>>,
) -> b_tree::Range<V>
where
    K: Eq + Hash + fmt::Debug,
    V: Clone,
{
    let mut inner_range = Key::with_capacity(query.range().len());
    let mut range_columns = query.range().into_iter();

    let inner_range = loop {
        if let Some(col_name) = range_columns.next() {
            match range.get(col_name).cloned().expect("column range") {
                ColumnRange::Eq(value) => inner_range.push(value),
                ColumnRange::In(bounds) => break b_tree::Range::with_bounds(inner_range, bounds),
            }
        } else {
            break b_tree::Range::from(inner_range);
        }
    };

    assert!(range_columns.next().is_none());

    inner_range
}

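/// Construct a closure which reorders a key with the columns `columns_in`
/// into a key containing only the `columns_out`, by column position.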
fn prefix_extractor<K, V>(columns_in: &[K], columns_out: &[K]) -> impl Fn(Key<V>) -> Key<V> + Send
where
    K: PartialEq + fmt::Debug,
    V: Default + Clone,
{
    debug_assert!(columns_out.len() <= columns_in.len());
    debug_assert!(!columns_out.is_empty());
    debug_assert!(
        columns_out.iter().all(|id| columns_in.contains(&id)),
        "{columns_out:?} is not a subset of {columns_in:?}"
    );

    #[cfg(feature = "logging")]
    log::trace!("extract columns {columns_out:?} from {columns_in:?}");

    let indices = columns_out
        .iter()
        .map(|name_out| {
            columns_in
                .iter()
                .position(|name_in| name_in == name_out)
                .expect("column index")
        })
        .collect::<IndexStack<_>>();

    move |mut key| {
        let mut prefix = smallvec![V::default(); indices.len()];

        for (i_to, i_from) in indices.iter().copied().enumerate() {
            mem::swap(&mut key[i_from], &mut prefix[i_to]);
        }

        prefix
    }
}

impl<IS, C, FE> TableState<IS, C, DirWriteGuardOwned<FE>>
where
    IS: IndexSchema + Send + Sync,
    C: Collate<Value = IS::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<IS::Value>> + Send + Sync + 'static,
    DirWriteGuardOwned<FE>: DirDeref<Entry = FE>,
    Node<IS::Value>: FileLoad,
{
    async fn delete_row(&mut self, key: &[IS::Value]) -> Result<bool, io::Error> {
        let row = if let Some(row) = self.get_row(key).await? {
            row
        } else {
            return Ok(false);
        };

        let mut deletes = IndexStack::with_capacity(self.auxiliary.len() + 1);

        for (_name, index) in self.auxiliary.iter_mut() {
            let index_key = borrow_columns(
                &row,
                self.primary.schema().columns(),
                index.schema().columns(),
            );

            deletes.push(async move { index.delete(&index_key).await })
        }

        self.primary.delete(&row).await?;

        for present in try_join_all(deletes).await? {
            assert!(present, "table index is out of sync");
        }

        Ok(true)
    }

    async fn delete_range<'a>(
        &mut self,
        plan: QueryPlan<'a, IS::Id>,
        range: HashMap<IS::Id, ColumnRange<IS::Value>>,
        key_columns: &[IS::Id],
    ) -> Result<usize, io::Error> {
        let mut deleted = 0;

        while let Some(pk) = self.first(&plan, &range, key_columns, key_columns).await? {
            self.delete_row(&pk).await?;
            deleted += 1;
        }

        Ok(deleted)
    }

    async fn delete_all<OG>(&mut self, mut other: TableState<IS, C, OG>) -> Result<(), io::Error>
    where
        OG: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    {
        let mut deletes = IndexStack::with_capacity(self.auxiliary.len() + 1);

        deletes.push(self.primary.delete_all(other.primary));

        for (name, this) in self.auxiliary.iter_mut() {
            let that = other.auxiliary.remove(name).expect("other index");
            deletes.push(this.delete_all(that));
        }

        try_join_all(deletes).await?;

        Ok(())
    }

    async fn merge<OG>(&mut self, mut other: TableState<IS, C, OG>) -> Result<(), io::Error>
    where
        OG: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    {
        let mut merges = IndexStack::with_capacity(self.auxiliary.len() + 1);

        merges.push(self.primary.merge(other.primary));

        for (name, this) in self.auxiliary.iter_mut() {
            let that = other.auxiliary.remove(name).expect("other index");
            merges.push(this.merge(that));
        }

        try_join_all(merges).await?;

        Ok(())
    }

    async fn upsert(&mut self, row: Vec<IS::Value>) -> Result<bool, io::Error> {
        let mut inserts = IndexStack::with_capacity(self.auxiliary.len() + 1);

        for (_name, index) in self.auxiliary.iter_mut() {
            let index_key = clone_columns(
                &row,
                self.primary.schema().columns(),
                index.schema().columns(),
            );

            inserts.push(index.insert(index_key));
        }

        inserts.push(self.primary.insert(row));

        let mut inserts = try_join_all(inserts).await?;
        let new = inserts.pop().expect("insert");
        while let Some(index_new) = inserts.pop() {
            assert_eq!(new, index_new, "index out of sync");
        }

        Ok(new)
    }

    async fn truncate(&mut self) -> Result<(), io::Error> {
        let mut truncates = IndexStack::with_capacity(self.auxiliary.len() + 1);
        truncates.push(self.primary.truncate());

        for index in self.auxiliary.values_mut() {
            truncates.push(index.truncate());
        }

        try_join_all(truncates).await?;

        Ok(())
    }
}

impl<IS, C, FE> TableState<IS, C, DirWriteGuardOwned<FE>> {
    fn downgrade(self) -> TableState<IS, C, Arc<DirReadGuardOwned<FE>>> {
        TableState {
            primary: self.primary.downgrade(),
            auxiliary: self
                .auxiliary
                .into_iter()
                .map(|(name, index)| (name, index.downgrade()))
                .collect(),
        }
    }
}

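/// A table with a primary index and any number of auxiliary indices.
/// [`TableReadGuard`] and [`TableWriteGuard`] are aliases of this type.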
pub struct Table<S, IS, C, G> {
    schema: Arc<TableSchema<S>>,
    state: TableState<IS, C, G>,
}

impl<S, IS, C, G> Clone for Table<S, IS, C, G>
where
    C: Clone,
    G: Clone,
{
    fn clone(&self) -> Self {
        Self {
            schema: self.schema.clone(),
            state: self.state.clone(),
        }
    }
}

impl<S, C, FE, G> Table<S, S::Index, C, G>
where
    S: Schema,
    C: Collate<Value = S::Value> + 'static,
    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + 'static,
    Node<S::Value>: FileLoad,
    Range<S::Id, S::Value>: fmt::Debug,
{
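    /// Return `true` if the given `key` is present in this table.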
    pub async fn contains(&self, key: &[S::Value]) -> Result<bool, io::Error> {
        let key_len = self.schema.key().len();

        if key.len() == key_len {
            self.state.contains(key).await
        } else {
            Err(bad_key(key, key_len))
        }
    }

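    /// Return the first row in the given `range` with the given `order`,
    /// selecting the given columns (or the primary key, if `select` is `None`).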
    pub async fn first(
        &self,
        range: Range<S::Id, S::Value>,
        order: &[S::Id],
        select: Option<&[S::Id]>,
    ) -> Result<Option<Row<S::Value>>, io::Error> {
        let range = range.into_inner();
        let select = select.unwrap_or(self.schema.key());
        let plan = self.schema.plan_query(&range, order, self.schema.key())?;

        self.state
            .first(&plan, &range, select, self.schema.key())
            .await
    }

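    /// Look up a single row by its primary `key`.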
    pub async fn get_row(&self, key: &[S::Value]) -> Result<Option<Row<S::Value>>, io::Error> {
        let key_len = self.schema.key().len();

        if key.len() == key_len {
            self.state.get_row(&key).await
        } else {
            Err(bad_key(&key, key_len))
        }
    }

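    /// Look up the value columns (i.e. the non-key columns) of a single row by its primary `key`.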
    pub async fn get_value(&self, key: &[S::Value]) -> Result<Option<Row<S::Value>>, io::Error> {
        let key_len = self.schema.key().len();

        self.get_row(key)
            .map_ok(move |maybe_row| maybe_row.map(move |mut row| row.drain(key_len..).collect()))
            .await
    }
}

impl<S, C, FE, G> Table<S, S::Index, C, G>
where
    S: Schema,
    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
    G: DirDeref<Entry = FE> + Clone + Send + Sync + 'static,
    Node<S::Value>: FileLoad,
    Range<S::Id, S::Value>: fmt::Debug,
{
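    /// Count the rows in this table which lie within the given `range`.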
    pub async fn count(&self, range: Range<S::Id, S::Value>) -> Result<u64, io::Error> {
        let range = range.into_inner();
        let plan = self.schema.plan_query(&range, &[], self.schema.key())?;
        self.state.count(plan, range, self.schema.key()).await
    }

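    /// Return `true` if the given `range` of this table contains no rows.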
    pub async fn is_empty(&self, range: Range<S::Id, S::Value>) -> Result<bool, io::Error> {
        let range = range.into_inner();
        let plan = self.schema.plan_query(&range, &[], Default::default())?;
        self.state.is_empty(plan, range, self.schema.key()).await
    }

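    /// Construct a [`Stream`] of the `select`ed columns of rows within the given `range`,
    /// in the given `order` (reversed if `reverse` is `true`).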
    pub async fn rows<'a>(
        &'a self,
        range: Range<S::Id, S::Value>,
        order: &'a [S::Id],
        reverse: bool,
        select: Option<&'a [S::Id]>,
    ) -> Result<Rows<S::Value>, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("Table::rows with order {order:?}");

        let range = range.into_inner();
        let select = select.unwrap_or(self.schema.primary().columns());
        let plan = self.schema.plan_query(&range, order, self.schema.key())?;

        self.state
            .rows(plan, range, reverse, select, self.schema.key())
            .await
    }

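    /// Consume this [`Table`] to construct a [`Stream`] of all of its rows.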
    pub async fn into_rows(self) -> Result<Rows<S::Value>, io::Error> {
        let rows = self.rows(Range::default(), &[], false, None).await?;
        Ok(Box::pin(rows))
    }
}

impl<S: fmt::Debug, IS, C, G> fmt::Debug for Table<S, IS, C, G> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "table with schema {:?}", self.schema.inner())
    }
}

impl<S, IS, C, FE> Table<S, IS, C, DirWriteGuardOwned<FE>> {
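    /// Downgrade this [`TableWriteGuard`] to a [`TableReadGuard`].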
    pub fn downgrade(self) -> Table<S, IS, C, Arc<DirReadGuardOwned<FE>>> {
        Table {
            schema: self.schema,
            state: self.state.downgrade(),
        }
    }
}

impl<S, C, FE> Table<S, S::Index, C, DirWriteGuardOwned<FE>>
where
    S: Schema + Send + Sync,
    C: Collate<Value = S::Value> + Clone + Send + Sync + 'static,
    FE: AsType<Node<S::Value>> + Send + Sync + 'static,
    <S as Schema>::Index: Send + Sync,
    Node<S::Value>: FileLoad,
{
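    /// Delete the row with the given `key` from this table, if present.
    /// Returns `true` if a row was deleted.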
    pub async fn delete_row(&mut self, key: &[S::Value]) -> Result<bool, io::Error> {
        let key_len = self.schema.key().len();

        if key.len() == key_len {
            self.state.delete_row(key).await
        } else {
            Err(bad_key(&key, key_len))
        }
    }

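    /// Delete all rows within the given `range` from this table,
    /// returning the number of rows deleted.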
    pub async fn delete_range(
        &mut self,
        range: Range<S::Id, S::Value>,
    ) -> Result<usize, io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("Table::delete_range {range:?}");

        let range = range.into_inner();
        let plan = self.schema.plan_query(&range, &[], self.schema.key())?;

        self.state
            .delete_range(plan, range, self.schema.key())
            .await
    }

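    /// Delete all rows present in the `other` table from this one.
    /// The two tables must have the same schema.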
    pub async fn delete_all(
        &mut self,
        other: TableReadGuard<S, S::Index, C, FE>,
    ) -> Result<(), io::Error> {
        if self.schema != other.schema {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "cannot delete the contents of a table with schema {:?} from one with schema {:?}",
                    other.schema.inner(), self.schema.inner()
                ),
            )
            .into());
        }

        self.state.delete_all(other.state).await
    }

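    /// Merge the rows of the `other` table into this one.
    /// The two tables must have the same schema.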
    pub async fn merge(
        &mut self,
        other: TableReadGuard<S, S::Index, C, FE>,
    ) -> Result<(), io::Error> {
        if self.schema != other.schema {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!(
                    "cannot merge a table with schema {:?} into one with schema {:?}",
                    other.schema.inner(),
                    self.schema.inner()
                ),
            )
            .into());
        }

        self.state.merge(other.state).await
    }

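    /// Insert or update a row with the given `key` and `values`.
    /// Returns `true` if a new row was inserted.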
    pub async fn upsert(
        &mut self,
        key: Vec<S::Value>,
        values: Vec<S::Value>,
    ) -> Result<bool, S::Error> {
        let key = self.schema.validate_key(key)?;
        let values = self.schema.validate_values(values)?;

        let mut row = Vec::with_capacity(key.len() + values.len());
        row.extend(key);
        row.extend(values);

        self.state.upsert(row).map_err(S::Error::from).await
    }

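    /// Delete all rows from this table.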
    pub async fn truncate(&mut self) -> Result<(), io::Error> {
        #[cfg(feature = "logging")]
        log::debug!("Table::truncate");

        self.state.truncate().await
    }
}

#[inline]
fn borrow_columns<'a, K, V>(row: &'a [V], columns_in: &[K], columns_out: &[K]) -> Key<&'a V>
where
    K: Eq,
{
    assert_eq!(row.len(), columns_in.len());

    debug_assert!(columns_out
        .iter()
        .all(|col_name| columns_in.contains(col_name)));

    columns_out
        .iter()
        .filter_map(|col_name| columns_in.iter().position(|c| c == col_name))
        .map(|i| &row[i])
        .collect()
}

#[inline]
fn clone_columns<K, V>(row: &[V], columns_in: &[K], columns_out: &[K]) -> Vec<V>
where
    K: Eq,
    V: Clone,
{
    assert_eq!(row.len(), columns_in.len());

    debug_assert!(columns_out
        .iter()
        .all(|col_name| columns_in.contains(col_name)));

    columns_out
        .iter()
        .filter_map(|col_name| columns_in.iter().position(|c| c == col_name))
        .map(|i| row[i].clone())
        .collect()
}

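/// Move the values of the `columns_out` out of `row` (whose columns are `columns_in`)
/// into a new key, in the order given by `columns_out`.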
#[inline]
fn extract_columns<K, V>(mut row: Key<V>, columns_in: &[K], columns_out: &[K]) -> Key<V>
where
    K: Eq + fmt::Debug,
    V: Default + Clone + fmt::Debug,
{
    assert_eq!(
        row.len(),
        columns_in.len(),
        "row {row:?} does not match column schema {columns_in:?}"
    );

    debug_assert!(
        columns_out
            .iter()
            .all(|col_name| columns_in.contains(col_name)),
        "input columns {columns_in:?} are missing some output columns {columns_out:?}"
    );

    let mut selection = smallvec![V::default(); columns_out.len()];

    for (i_to, name_out) in columns_out.iter().enumerate() {
        let i_from = columns_in
            .iter()
            .position(|name_in| name_in == name_out)
            .expect("column index");

        mem::swap(&mut row[i_from], &mut selection[i_to]);
    }

    selection
}

#[inline]
fn bad_key<V: fmt::Debug>(key: &[V], key_len: usize) -> io::Error {
    io::Error::new(
        io::ErrorKind::InvalidInput,
        format!("invalid key: {key:?}, expected exactly {key_len} column(s)"),
    )
}

#[inline]
fn valid_schema<S: Schema>(schema: &S) -> Result<(), io::Error> {
    if schema.primary().columns().is_empty() {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("{schema:?} contains no columns"),
        ));
    }

    for (index_name, index) in schema.auxiliary() {
        if index.columns().is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("index {index_name} is empty"),
            ));
        }

        for col_name in index.columns() {
            if !schema.primary().columns().contains(col_name) {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("index {index_name} refers to unknown column {col_name}"),
                ));
            }
        }

        for col_name in schema.key() {
            if !index.columns().contains(col_name) {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidInput,
                    format!("index {index_name} is missing primary key column {col_name}"),
                ));
            }
        }
    }

    Ok(())
}