use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
use std::fmt;
use std::iter::{self, FromIterator};
use std::marker::PhantomData;
use std::sync::Arc;
use async_trait::async_trait;
use destream::de;
use futures::future::{self, join_all, try_join_all, TryFutureExt};
use futures::stream::TryStreamExt;
use log::debug;
use tc_btree::{BTreeFile, BTreeInstance, BTreeWrite, Node, NodeId};
use tc_error::*;
use tc_transact::fs::{CopyFrom, Dir, DirCreateFile, DirReadFile, File, Persist, Restore};
use tc_transact::lock::{TxnLock, TxnLockCommit};
use tc_transact::{Transact, Transaction, TxnId};
use tc_value::Value;
use tcgeneric::{label, Id, Instance, Label, TCBoxTryStream, Tuple};
use super::view::{Limited, MergeSource, Merged, Selection, TableSlice as Slice};
use super::{
Bounds, Column, ColumnBound, IndexSchema, IndexSlice, Key, Row, Table, TableInstance,
TableOrder, TableRead, TableSchema, TableSlice, TableStream, TableType, TableWrite, Values,
};
/// The reserved name of the primary index of a [`TableIndex`].
const PRIMARY_INDEX: Label = label("primary");
/// A single index of a table: a [`BTreeFile`] whose keys are arranged
/// according to an [`IndexSchema`].
#[derive(Clone)]
pub struct Index<F, D, Txn> {
    btree: BTreeFile<F, D, Txn>,
    schema: IndexSchema,
}
impl<F, D, Txn> Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    /// Borrow the underlying [`BTreeFile`] which stores this `Index`'s rows.
    pub fn btree(&'_ self) -> &'_ BTreeFile<F, D, Txn> {
        &self.btree
    }

    /// Return `true` if this `Index` contains no rows at the given transaction.
    pub async fn is_empty(&self, txn: &Txn) -> TCResult<bool> {
        self.btree.is_empty(*txn.id()).await
    }

    /// Consume this `Index` and return a slice restricted to the given `bounds`.
    ///
    /// The `bounds` are validated against this `Index`'s schema first.
    pub fn index_slice(self, bounds: Bounds) -> TCResult<IndexSlice<F, D, Txn>> {
        debug!("Index::index_slice");
        let bounds = bounds.validate(&self.schema.columns())?;
        IndexSlice::new(self.btree, self.schema, bounds)
    }

    /// Borrow the [`IndexSchema`] of this `Index`.
    pub fn schema(&'_ self) -> &'_ IndexSchema {
        &self.schema
    }

    /// Check that `inner` lies within `outer` with respect to this `Index`'s
    /// collation, i.e. that a slice by `outer` can be narrowed by `inner`.
    pub fn validate_slice_bounds(&self, outer: Bounds, inner: Bounds) -> TCResult<()> {
        let columns = &self.schema.columns();
        // convert both sets of bounds to B-tree ranges for containment testing
        let outer = outer.validate(columns)?.into_btree_range(columns)?;
        let inner = inner.validate(columns)?.into_btree_range(columns)?;
        if outer.contains(&inner, self.btree.collator()) {
            Ok(())
        } else {
            Err(TCError::unsupported(
                "slice does not contain requested bounds",
            ))
        }
    }

    /// Stream the rows of this `Index` which fall within `bounds`,
    /// optionally in reverse order.
    pub async fn slice_rows<'a>(
        self,
        txn_id: TxnId,
        bounds: Bounds,
        reverse: bool,
    ) -> TCResult<TCBoxTryStream<'a, Vec<Value>>> {
        self.validate_bounds(&bounds)?;
        let range = bounds.into_btree_range(&self.schema.columns())?;
        self.btree.slice(range, reverse)?.keys(txn_id).await
    }

    /// Delete all rows whose B-tree key begins with the given `key` prefix.
    async fn delete_inner(&self, txn_id: TxnId, key: Key) -> TCResult<()> {
        debug!("Index::delete {:?}", key);
        let range = tc_btree::Range::with_prefix(key.to_vec());
        self.btree.delete(txn_id, range).await
    }

    /// Delete the row described by `row`, which must provide a value for
    /// every column of this `Index`'s key.
    async fn delete(&self, txn_id: TxnId, mut row: Row) -> TCResult<()> {
        // extract the key columns from the row, erroring if any is missing
        let key = self
            .schema
            .key()
            .iter()
            .map(|col| {
                row.remove(&col.name)
                    .ok_or_else(|| TCError::bad_request("missing value for column", &col.name))
            })
            .collect::<TCResult<Key>>()?;

        self.delete_inner(txn_id, key).await
    }

    /// Replace the row identified by `row` with the `update`d values;
    /// columns absent from `update` keep their values from `row`.
    ///
    /// NOTE(review): the delete and insert below are separate awaits, so this
    /// relies on the transaction layer for atomicity — confirm.
    async fn replace(&self, txn_id: TxnId, mut row: Row, mut update: Row) -> TCResult<()> {
        debug!("Index::replace {} with updated values {}", row, update);

        // the old key locates the row to delete
        let old_key = self
            .schema
            .key()
            .iter()
            .map(|col| {
                row.get(&col.name)
                    .cloned()
                    .ok_or_else(|| TCError::bad_request("missing value for column", &col.name))
            })
            .collect::<TCResult<Key>>()?;

        // the new key takes each column from `update` if present, else `row`
        let new_key = BTreeInstance::schema(&self.btree)
            .iter()
            .map(|col| {
                if let Some(value) = update.remove(&col.name) {
                    Ok(value)
                } else if let Some(value) = row.remove(&col.name) {
                    Ok(value)
                } else {
                    Err(TCError::bad_request("missing value for column", &col.name))
                }
            })
            .collect::<TCResult<Key>>()?;

        self.delete_inner(txn_id, old_key).await?;
        self.btree.insert(txn_id, new_key).await
    }
}
/// Every `Index` reports the same [`TableType`].
impl<F, D, Txn> Instance for Index<F, D, Txn>
where
    Self: Send + Sync,
{
    type Class = TableType;

    /// The class of this instance is always [`TableType::Index`].
    fn class(&self) -> TableType {
        TableType::Index
    }
}
impl<F, D, Txn> TableInstance for Index<F, D, Txn>
where
F: File<Key = NodeId, Block = Node>,
D: Dir,
Txn: Transaction<D>,
{
fn key(&self) -> &[Column] {
self.schema.key()
}
fn values(&self) -> &[Column] {
self.schema.values()
}
fn schema(&self) -> TableSchema {
self.schema.clone().into()
}
}
impl<F, D, Txn> TableOrder for Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    type OrderBy = IndexSlice<F, D, Txn>;
    type Reverse = IndexSlice<F, D, Txn>;

    /// Return a view of this `Index` ordered by the given columns.
    ///
    /// An `Index` is already sorted by its schema, so the requested `order`
    /// must be a prefix of the schema's columns.
    fn order_by(self, order: Vec<Id>, reverse: bool) -> TCResult<Self::OrderBy> {
        if self.schema.starts_with(&order) {
            Ok(IndexSlice::all(self.btree, self.schema, reverse))
        } else {
            Err(TCError::bad_request(
                &format!("Index with schema {} does not support order", self.schema),
                Value::from_iter(order),
            ))
        }
    }

    /// Return a view of this `Index` in reverse sort order.
    fn reverse(self) -> TCResult<Self::Reverse> {
        // IndexSlice::all already has type Self::Reverse; the original's
        // trailing .into() was a redundant identity conversion
        Ok(IndexSlice::all(self.btree, self.schema, true))
    }

    /// Check whether ordering by the given columns is supported, i.e.
    /// whether `order` is a prefix of this `Index`'s schema.
    fn validate_order(&self, order: &[Id]) -> TCResult<()> {
        if self.schema.starts_with(order) {
            Ok(())
        } else {
            let order: Vec<String> = order.iter().map(|c| c.to_string()).collect();
            Err(TCError::bad_request(
                &format!("cannot order index with schema {} by", self.schema),
                order.join(", "),
            ))
        }
    }
}
#[async_trait]
impl<F, D, Txn> TableStream for Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    type Limit = Limited<F, D, Txn>;
    type Selection = Selection<F, D, Txn, Self>;

    /// Count the rows in this `Index` by delegating to the underlying B-tree.
    async fn count(self, txn_id: TxnId) -> TCResult<u64> {
        self.btree.count(txn_id).await
    }

    /// Limit the number of rows returned from this `Index`.
    fn limit(self, limit: u64) -> Self::Limit {
        Limited::new(self, limit)
    }

    /// Project this `Index` onto the given `columns`.
    fn select(self, columns: Vec<Id>) -> TCResult<Self::Selection> {
        Selection::new(self, columns)
    }

    /// Stream every row of this `Index` in key order.
    async fn rows<'a>(self, txn_id: TxnId) -> TCResult<TCBoxTryStream<'a, Vec<Value>>> {
        debug!("Index::rows");
        self.btree.keys(txn_id).await
    }
}
impl<F, D, Txn> TableSlice for Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    type Slice = IndexSlice<F, D, Txn>;

    /// Restrict this `Index` to the given [`Bounds`].
    fn slice(self, bounds: Bounds) -> TCResult<IndexSlice<F, D, Txn>> {
        self.index_slice(bounds).map(|is| is.into())
    }

    /// Check whether this `Index` can serve a slice with the given [`Bounds`]:
    /// the bounded columns must form a prefix of the schema, and only the
    /// last bounded column may be a range.
    fn validate_bounds(&self, bounds: &Bounds) -> TCResult<()> {
        if bounds.is_empty() {
            return Ok(());
        }

        // collect the requested bounds in schema column order
        let columns = self.schema.columns();
        let mut bounds = bounds.clone();
        let mut ordered_bounds = Vec::with_capacity(columns.len());
        for column in columns {
            let bound = bounds.remove(column.name()).unwrap_or_default();
            ordered_bounds.push(bound);

            if bounds.is_empty() {
                break;
            }
        }

        // any bounds left over refer to columns this Index does not have;
        // the original message contained a stray "{}" placeholder which was
        // emitted literally, since bad_request does no formatting here
        if !bounds.is_empty() {
            return Err(TCError::bad_request(
                "Index has no such columns",
                Value::from_iter(bounds.keys().cloned()),
            ));
        }

        debug!(
            "ordered bounds: {}",
            Tuple::<&ColumnBound>::from_iter(&ordered_bounds)
        );

        // a range bound anywhere but the last position cannot be expressed
        // as a single contiguous B-tree range
        if ordered_bounds[..ordered_bounds.len() - 1]
            .iter()
            .any(ColumnBound::is_range)
        {
            return Err(TCError::unsupported(
                "Index bounds must include a maximum of one range, only on the rightmost column",
            ));
        }

        Ok(())
    }
}
#[async_trait]
impl<F, D, Txn> Transact for Index<F, D, Txn>
where
F: File<Key = NodeId, Block = Node> + Transact,
D: Dir,
Txn: Transaction<D>,
{
type Commit = <tc_btree::BTreeFile<F, D, Txn> as Transact>::Commit;
async fn commit(&self, txn_id: TxnId) -> Self::Commit {
self.btree.commit(txn_id).await
}
async fn rollback(&self, txn_id: &TxnId) {
self.btree.rollback(txn_id).await
}
async fn finalize(&self, txn_id: &TxnId) {
self.btree.finalize(txn_id).await
}
}
impl<F, D, Txn> Persist<D> for Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Store: From<F>,
{
    type Txn = Txn;
    type Schema = IndexSchema;

    /// Create a new `Index` with the given `schema` in `store`.
    fn create(txn_id: TxnId, schema: Self::Schema, store: D::Store) -> TCResult<Self> {
        let btree = BTreeFile::create(txn_id, schema.clone().into(), store)?;
        Ok(Self { schema, btree })
    }

    /// Load an existing `Index` with the given `schema` from `store`.
    fn load(txn_id: TxnId, schema: Self::Schema, store: D::Store) -> TCResult<Self> {
        let btree = BTreeFile::load(txn_id, schema.clone().into(), store)?;
        Ok(Self { schema, btree })
    }

    /// The filesystem handle backing this `Index`'s B-tree.
    fn dir(&self) -> F::Inner {
        BTreeFile::dir(&self.btree)
    }
}
#[async_trait]
impl<F, D, Txn> Restore<D> for Index<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Store: From<F>,
{
    /// Overwrite this `Index` with the contents of `backup`.
    async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()> {
        let source = &backup.btree;
        self.btree.restore(txn_id, source).await
    }
}
impl<F, D, Txn> From<Index<F, D, Txn>> for Table<F, D, Txn> {
fn from(index: Index<F, D, Txn>) -> Self {
Table::Index(index)
}
}
/// The shared state of a [`TableIndex`]: its schema, its primary index, any
/// auxiliary indices, and the directory which contains their files.
struct Inner<F, D, Txn> {
    schema: TableSchema,
    primary: Index<F, D, Txn>,
    // (name, index) pairs in a Vec rather than a map, preserving their order
    auxiliary: Vec<(Id, Index<F, D, Txn>)>,
    dir: D,
}
/// The base type of a relational table: a primary [`Index`] plus any number
/// of auxiliary indices. Cloning is cheap (an [`Arc`] reference bump).
#[derive(Clone)]
pub struct TableIndex<F, D, Txn> {
    inner: Arc<Inner<F, D, Txn>>,
}
impl<F, D, Txn> TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    /// Create an auxiliary [`Index`] over the given `key` columns in `file`,
    /// deriving its schema from the `primary` schema.
    fn create_index(
        txn_id: TxnId,
        primary: &IndexSchema,
        file: F,
        key: Vec<Id>,
    ) -> TCResult<Index<F, D, Txn>> {
        let index_schema = primary.auxiliary(&key)?;
        Index::create(txn_id, index_schema, file.into())
    }
}
impl<F, D, Txn> TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    /// Return `true` if this table contains no rows at the given transaction.
    pub async fn is_empty(&self, txn: &Txn) -> TCResult<bool> {
        self.inner.primary.is_empty(txn).await
    }

    /// Merge a list of [`Bounds`] into one, using the primary index's collator.
    pub fn merge_bounds(&self, all_bounds: Vec<Bounds>) -> TCResult<Bounds> {
        let collator = self.inner.primary.btree().collator();

        all_bounds
            .into_iter()
            .try_fold(Bounds::default(), |mut merged, bounds| {
                merged.merge(bounds, collator)?;
                Ok(merged)
            })
    }

    /// Borrow the primary index of this table.
    pub fn primary(&self) -> &Index<F, D, Txn> {
        &self.inner.primary
    }

    /// Return a clone of the first index able to serve the given [`Bounds`],
    /// preferring the primary index over the auxiliary ones.
    pub fn supporting_index(&self, bounds: &Bounds) -> TCResult<Index<F, D, Txn>> {
        if self.inner.primary.validate_bounds(bounds).is_ok() {
            return Ok(self.inner.primary.clone());
        }

        let supported = self
            .inner
            .auxiliary
            .iter()
            .find(|(_, index)| index.validate_bounds(bounds).is_ok());

        match supported {
            Some((_, index)) => Ok(index.clone()),
            None => Err(TCError::bad_request(
                "this table has no index which supports bounds",
                bounds,
            )),
        }
    }

    /// Stream the primary-index rows within `bounds`, optionally reversed.
    pub async fn slice_rows<'a>(
        self,
        txn_id: TxnId,
        bounds: Bounds,
        reverse: bool,
    ) -> TCResult<TCBoxTryStream<'a, Vec<Value>>> {
        let primary = self.inner.primary.clone();
        primary.slice_rows(txn_id, bounds, reverse).await
    }
}
/// Every `TableIndex` reports the same [`TableType`].
impl<F, D, Txn> Instance for TableIndex<F, D, Txn>
where
    Self: Send + Sync,
{
    type Class = TableType;

    /// The class of this instance is always [`TableType::Table`].
    fn class(&self) -> TableType {
        TableType::Table
    }
}
#[async_trait]
impl<F, D, Txn> TableInstance for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    /// The key columns of the primary index.
    fn key(&self) -> &[Column] {
        self.inner.primary.key()
    }

    /// The value columns of the primary index.
    fn values(&self) -> &[Column] {
        self.inner.primary.values()
    }

    /// The full [`TableSchema`], including all auxiliary indices.
    fn schema(&self) -> TableSchema {
        self.inner.schema.clone()
    }
}
impl<F, D, Txn> TableOrder for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    type OrderBy = Merged<F, D, Txn>;
    type Reverse = Merged<F, D, Txn>;

    /// Return a view of this table ordered by the given `columns`.
    ///
    /// The entire ordering must be served by one index (primary preferred);
    /// orderings accepted by `validate_order` but requiring multiple indices
    /// still produce an error here, as in the original implementation.
    fn order_by(self, columns: Vec<Id>, reverse: bool) -> TCResult<Self::OrderBy> {
        self.validate_order(&columns)?;

        // wrap this table so an ordered index slice can be merged with it
        let selection = Slice::new(self.clone(), Bounds::default())?;
        let merge_source = MergeSource::Table(selection);

        if self.primary().validate_order(&columns).is_ok() {
            debug!("primary key can order by {}", Tuple::from(columns.clone()));

            let index_slice = self.primary().clone().index_slice(Bounds::default())?;
            let merged = Merged::new(merge_source, index_slice)?;

            return if reverse {
                merged.reverse()
            } else {
                Ok(merged.into())
            };
        } else {
            // otherwise, use the first auxiliary index supporting the order
            for (name, index) in &self.inner.auxiliary {
                if index.validate_order(&columns).is_ok() {
                    debug!(
                        "index {} can order by {}",
                        name,
                        Tuple::from(columns.clone())
                    );

                    let index_slice = index.clone().index_slice(Bounds::default())?;
                    let merged = Merged::new(merge_source, index_slice)?;

                    return if reverse {
                        merged.reverse()
                    } else {
                        Ok(merged.into())
                    };
                }
            }
        }

        Err(TCError::bad_request(
            "table has no index to order by",
            Tuple::<Id>::from_iter(columns),
        ))
    }

    /// Reversing a whole table is unsupported; reverse a slice instead.
    fn reverse(self) -> TCResult<Self::Reverse> {
        Err(TCError::unsupported(
            "cannot reverse a Table itself, consider reversing a slice of the table instead",
        ))
    }

    /// Check whether this table can support the given `order` by greedily
    /// stripping the longest prefix of the remaining columns which some
    /// single index (primary or auxiliary) can order by.
    ///
    /// This rewrite fixes a panic in the original: after an auxiliary index
    /// matched a prefix, only the inner `for` loop was broken, so the loop
    /// counter was decremented once more and then used to slice the already-
    /// shortened `order`, indexing past its end for some inputs.
    fn validate_order(&self, mut order: &[Id]) -> TCResult<()> {
        // true if any single index can order by this subset of columns
        let supported = |subset: &[Id]| {
            self.inner.primary.validate_order(subset).is_ok()
                || self
                    .inner
                    .auxiliary
                    .iter()
                    .any(|(_, index)| index.validate_order(subset).is_ok())
        };

        while !order.is_empty() {
            // find the longest supported prefix of the remaining columns
            let mut i = order.len();
            while i > 0 && !supported(&order[..i]) {
                i -= 1;
            }

            if i == 0 {
                // no progress is possible: no index supports even the first
                // remaining column
                let order: Vec<String> = order.iter().map(|id| id.to_string()).collect();
                return Err(TCError::bad_request(
                    "This table has no index to support the order",
                    order.join(", "),
                ));
            }

            order = &order[i..];
        }

        Ok(())
    }
}
#[async_trait]
impl<F, D, Txn> TableRead for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    /// Look up a single row by its `key` in the primary index, returning
    /// `None` if no such row exists.
    async fn read(&self, txn_id: &TxnId, key: &Key) -> TCResult<Option<Vec<Value>>> {
        let range = tc_btree::Range::with_prefix(key.to_vec());
        let slice = self.inner.primary.btree.clone().slice(range, false)?;
        let mut keys = slice.keys(*txn_id).await?;
        keys.try_next().await
    }
}
#[async_trait]
impl<F, D, Txn> TableStream for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    type Limit = Limited<F, D, Txn>;
    type Selection = Selection<F, D, Txn, Self>;

    /// Count the rows in this table via its primary index.
    async fn count(self, txn_id: TxnId) -> TCResult<u64> {
        let primary = self.inner.primary.clone();
        primary.count(txn_id).await
    }

    /// Limit the number of rows returned from this table.
    fn limit(self, limit: u64) -> Self::Limit {
        Limited::new(self, limit)
    }

    /// Project this table onto the given `columns`.
    fn select(self, columns: Vec<Id>) -> TCResult<Self::Selection> {
        Selection::new(self, columns)
    }

    /// Stream every row of this table from its primary index.
    async fn rows<'a>(self, txn_id: TxnId) -> TCResult<TCBoxTryStream<'a, Vec<Value>>> {
        let primary = self.inner.primary.clone();
        primary.rows(txn_id).await
    }
}
impl<F, D, Txn> TableSlice for TableIndex<F, D, Txn>
where
F: File<Key = NodeId, Block = Node>,
D: Dir,
Txn: Transaction<D>,
{
type Slice = Merged<F, D, Txn>;
fn slice(self, bounds: Bounds) -> TCResult<Merged<F, D, Txn>> {
debug!("TableIndex::slice {}", bounds);
let primary = &self.inner.primary;
let auxiliary = &self.inner.auxiliary;
let columns: Vec<Id> = primary
.schema()
.columns()
.iter()
.map(|c| c.name())
.cloned()
.collect();
let bounds: Vec<(Id, ColumnBound)> = columns
.into_iter()
.filter_map(|name| bounds.get(&name).map(|bound| (name, bound.clone())))
.collect();
let selection = Slice::new(self.clone(), Bounds::default())?;
let mut merge_source = MergeSource::Table(selection);
let mut bounds = &bounds[..];
loop {
let initial = bounds.len();
let mut i = bounds.len();
while i > 0 {
let subset: HashMap<Id, ColumnBound> = bounds[..i].to_vec().into_iter().collect();
let subset = Bounds::from(subset);
if primary.validate_bounds(&subset).is_ok() {
debug!("primary key can slice {}", subset);
let index_slice = primary.clone().index_slice(subset)?;
let merged = Merged::new(merge_source, index_slice)?;
bounds = &bounds[i..];
if bounds.is_empty() {
return Ok(merged);
}
merge_source = MergeSource::Merge(Box::new(merged));
break;
} else {
let mut supported = false;
for (name, index) in auxiliary {
debug!("checking index {} with schema {}", name, index.schema());
match index.validate_bounds(&subset) {
Ok(()) => {
debug!("index {} can slice {}", name, subset);
supported = true;
let index_slice = index.clone().index_slice(subset)?;
let merged = Merged::new(merge_source, index_slice)?;
bounds = &bounds[i..];
if bounds.is_empty() {
return Ok(merged);
}
merge_source = MergeSource::Merge(Box::new(merged));
break;
}
Err(cause) => {
debug!("index {} cannot slice {}: {}", name, subset, cause);
}
}
}
if supported {
break;
}
};
i = i - 1;
}
if bounds.len() == initial {
return Err(TCError::unsupported(
"this Table has no Index to support the requested selection bounds",
));
}
}
}
fn validate_bounds(&self, bounds: &Bounds) -> TCResult<()> {
let primary = &self.inner.primary;
let auxiliary = &self.inner.auxiliary;
if primary.validate_bounds(bounds).is_ok() {
return Ok(());
}
let bounds: Vec<(Id, ColumnBound)> = primary
.schema()
.columns()
.iter()
.filter_map(|c| {
bounds
.get(c.name())
.map(|bound| (c.name().clone(), bound.clone()))
})
.collect();
let mut bounds = &bounds[..];
while !bounds.is_empty() {
let initial = bounds.len();
let mut i = bounds.len();
loop {
let subset: HashMap<Id, ColumnBound> = bounds[..i].iter().cloned().collect();
let subset = Bounds::from(subset);
if primary.validate_bounds(&subset).is_ok() {
bounds = &bounds[i..];
break;
}
for (_, index) in auxiliary {
if index.validate_bounds(&subset).is_ok() {
bounds = &bounds[i..];
break;
}
}
if bounds.is_empty() {
break;
} else {
i = i - 1;
}
}
if bounds.len() == initial {
let bounds = Tuple::<String>::from_iter(
bounds
.into_iter()
.map(|(id, bound)| format!("{}: {}", id, bound)),
);
return Err(TCError::unsupported(format!("this table has no index to support selection bounds on {}--available indices are {}", bounds, Tuple::<&Id>::from_iter(auxiliary.iter().map(|(id, _)| id)))));
}
}
Ok(())
}
}
#[async_trait]
impl<F, D, Txn> TableWrite for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>,
    D: Dir,
    Txn: Transaction<D>,
{
    /// Delete the row with the given `key` from the primary index and every
    /// auxiliary index. Deleting an absent row is a no-op.
    async fn delete(&self, txn_id: TxnId, key: Key) -> TCResult<()> {
        let primary = &self.inner.primary;
        let aux = &self.inner.auxiliary;

        let key = primary.schema.validate_key(key)?;

        // read the full row first, since auxiliary indices are keyed on
        // columns other than the primary key
        let row = match self.read(&txn_id, &key).await? {
            Some(row) => row,
            None => return Ok(()),
        };

        let row = primary.schema.row_from_values(row)?;

        // delete from every index concurrently
        let mut deletes = Vec::with_capacity(aux.len() + 1);
        for (_, index) in aux {
            deletes.push(index.delete(txn_id, row.clone()));
        }

        deletes.push(primary.delete(txn_id, row));
        try_join_all(deletes).await?;

        Ok(())
    }

    /// Update the row with the given `key`, overwriting only the columns
    /// present in `values`. Updating an absent row is a no-op.
    async fn update(&self, txn_id: TxnId, key: Key, values: Row) -> TCResult<()> {
        let columns_updated: HashSet<Id> = values.keys().cloned().collect();

        let primary = &self.inner.primary;
        let aux = &self.inner.auxiliary;

        let key = primary.schema.validate_key(key)?;

        let row = match self.read(&txn_id, &key).await? {
            Some(values) => primary.schema.row_from_values(values)?,
            None => return Ok(()),
        };

        let mut updates = Vec::with_capacity(aux.len() + 1);
        for (_, index) in aux {
            // skip auxiliary indices which involve none of the updated columns
            if !index
                .schema
                .column_names()
                .any(|name| columns_updated.contains(name))
            {
                continue;
            }

            updates.push(index.replace(txn_id, row.clone(), values.clone()));
        }

        updates.push(primary.replace(txn_id, row, values));
        try_join_all(updates).await?;

        Ok(())
    }

    /// Insert or overwrite the row with the given `key` and `values` in the
    /// primary index and every auxiliary index.
    async fn upsert(&self, txn_id: TxnId, key: Key, values: Values) -> TCResult<()> {
        let primary = &self.inner.primary;
        let aux = &self.inner.auxiliary;

        let key = primary.schema.validate_key(key)?;
        let values = primary.schema.validate_values(values)?;

        // only value columns may be overwritten; key columns identify the row
        let columns: HashSet<Id> = primary
            .schema
            .values()
            .iter()
            .map(|col| &col.name)
            .cloned()
            .collect();

        let row = primary.schema.row_from_key_values(key, values)?;

        let update: Row = row
            .clone()
            .into_iter()
            .filter(|(id, _)| columns.contains(id))
            .collect();

        // apply the replacement to every index concurrently
        let mut upserts = Vec::with_capacity(aux.len() + 1);
        for (_name, index) in aux {
            upserts.push(index.replace(txn_id, row.clone(), update.clone()));
        }

        upserts.push(primary.replace(txn_id, row, update));
        try_join_all(upserts).await?;

        Ok(())
    }
}
#[async_trait]
impl<F, D, Txn> Transact for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node>
        + Transact<Commit = Option<TxnLockCommit<BTreeMap<NodeId, TxnLock<TxnId>>>>>,
    D: Dir,
    Txn: Transaction<D>,
{
    type Commit = <BTreeFile<F, D, Txn> as Transact>::Commit;

    /// Commit the primary index first, then all auxiliary indices concurrently.
    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
        let guard = self.inner.primary.commit(txn_id).await?;

        let aux_commits = self
            .inner
            .auxiliary
            .iter()
            .map(|(_, index)| index.commit(txn_id));

        join_all(aux_commits).await;

        Some(guard)
    }

    /// Roll back the primary and auxiliary indices concurrently.
    async fn rollback(&self, txn_id: &TxnId) {
        let primary = self.inner.primary.rollback(txn_id);

        let aux = self
            .inner
            .auxiliary
            .iter()
            .map(|(_, index)| index.rollback(txn_id));

        join_all(iter::once(primary).chain(aux)).await;
    }

    /// Finalize the primary and auxiliary indices concurrently.
    async fn finalize(&self, txn_id: &TxnId) {
        let primary = self.inner.primary.finalize(txn_id);

        let aux = self
            .inner
            .auxiliary
            .iter()
            .map(|(_, index)| index.finalize(txn_id));

        join_all(iter::once(primary).chain(aux)).await;
    }
}
impl<F, D, Txn> Persist<D> for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Read: DirReadFile<F>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    type Txn = Txn;
    type Schema = TableSchema;

    /// Create a new `TableIndex` in `store`: one file named "primary" for the
    /// primary index, plus one file per auxiliary index.
    fn create(txn_id: TxnId, schema: Self::Schema, store: D::Store) -> TCResult<Self> {
        let dir = D::try_from(store)?;
        let mut dir_lock = dir.try_write(txn_id)?;

        let primary_file = dir_lock.create_file(PRIMARY_INDEX.into())?;
        let primary = Index::create(txn_id, schema.primary().clone(), primary_file.into())?;

        let primary_schema = schema.primary();
        let mut auxiliary = Vec::with_capacity(schema.indices().len());
        for (name, column_names) in schema.indices() {
            // "primary" is reserved for the primary index's file
            if name == &PRIMARY_INDEX {
                return Err(TCError::bad_request(
                    "cannot create an auxiliary index with reserved name",
                    PRIMARY_INDEX,
                ));
            }

            let file = dir_lock.create_file(name.clone())?;

            let index = Self::create_index(txn_id, primary_schema, file, column_names.to_vec())
                .map(move |index| (name.clone(), index))?;

            auxiliary.push(index);
        }

        Ok(Self {
            inner: Arc::new(Inner {
                schema,
                primary,
                auxiliary,
                dir,
            }),
        })
    }

    /// Load an existing `TableIndex` with the given `schema` from `store`,
    /// erroring out if any expected index file is missing.
    fn load(txn_id: TxnId, schema: Self::Schema, store: D::Store) -> TCResult<Self> {
        let dir = D::try_from(store)?;
        let dir_lock = dir.try_read(txn_id)?;

        let file = dir_lock
            .get_file(&PRIMARY_INDEX.into())?
            .ok_or_else(|| TCError::internal("cannot load Table: primary index is missing"))?;

        let primary = Index::load(txn_id, schema.primary().clone(), file.into())?;

        let mut auxiliary = Vec::with_capacity(schema.indices().len());
        for (name, columns) in schema.indices() {
            let file = dir_lock.get_file(name)?.ok_or_else(|| {
                TCError::internal(format!("cannot load Table: missing index {}", name))
            })?;

            // each auxiliary schema is re-derived from the primary schema
            let index_schema = schema.primary().auxiliary(columns)?;

            let index = Index::load(txn_id, index_schema, file.into())?;
            auxiliary.push((name.clone(), index));
        }

        Ok(Self {
            inner: Arc::new(Inner {
                schema,
                primary,
                auxiliary,
                dir,
            }),
        })
    }

    /// The inner filesystem directory which contains this table's files.
    fn dir(&self) -> D::Inner {
        self.inner.dir.clone().into_inner()
    }
}
#[async_trait]
impl<F, D, Txn> Restore<D> for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Read: DirReadFile<F>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    /// Overwrite this table with the contents of `backup`, which must have
    /// exactly the same [`TableSchema`].
    async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()> {
        if self.inner.schema != backup.inner.schema {
            return Err(TCError::unsupported(
                "cannot restore a Table using a backup with a different schema",
            ));
        }

        let mut restores = Vec::with_capacity(self.inner.auxiliary.len() + 1);
        restores.push(self.inner.primary.restore(txn_id, &backup.inner.primary));

        let mut backup_indices = BTreeMap::from_iter(
            backup
                .inner
                .auxiliary
                .iter()
                .map(|(name, index)| (name, index)),
        );

        for (name, index) in &self.inner.auxiliary {
            // the schema equality check above implies the backup has the
            // same indices, but return an internal error instead of
            // panicking (the original called .unwrap() here) if not
            let backup_index = backup_indices.remove(name).ok_or_else(|| {
                TCError::internal(format!("backup table is missing index {}", name))
            })?;

            restores.push(index.restore(txn_id, backup_index));
        }

        try_join_all(restores).await?;

        Ok(())
    }
}
#[async_trait]
impl<F, D, Txn, I> CopyFrom<D, I> for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    I: TableStream + 'static,
    D::Read: DirReadFile<F>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    /// Create a new `TableIndex` in `store` with the same schema as `source`
    /// and copy every row of `source` into it.
    async fn copy_from(txn: &Txn, store: D::Store, source: I) -> TCResult<Self> {
        let txn_id = *txn.id();
        let schema = source.schema();
        // the first key_len values of each streamed row form its key
        let key_len = schema.primary().key().len();
        let table = Self::create(txn_id, schema, store)?;

        let rows = source.rows(txn_id).await?;

        // split each row into (key, values) and upsert them concurrently,
        // buffering up to one in-flight upsert per CPU
        rows.map_ok(|mut row| (row.drain(..key_len).collect(), row))
            .map_ok(|(key, values)| table.upsert(txn_id, key, values))
            .try_buffer_unordered(num_cpus::get())
            .try_fold((), |(), ()| future::ready(Ok(())))
            .await?;

        Ok(table)
    }
}
/// A [`de::Visitor`] used to decode a new [`TableIndex`] from a stream.
struct TableVisitor<F: File<Key = NodeId, Block = Node>, D: Dir, Txn: Transaction<D>> {
    // the transaction within which the decoded table is created
    txn: Txn,
    phantom_file: PhantomData<F>,
    phantom_dir: PhantomData<D>,
}
#[async_trait]
impl<F, D, Txn> de::Visitor for TableVisitor<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Read: DirReadFile<F>,
    D::Write: DirCreateFile<F>,
    D::Store: From<D> + From<F>,
{
    type Value = TableIndex<F, D, Txn>;

    fn expecting() -> &'static str {
        "a Table"
    }

    /// Decode a table as a sequence of (schema, rows), where the row
    /// sequence element is optional.
    async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
        let txn_id = *self.txn.id();
        let schema = seq
            .next_element(())
            .await?
            .ok_or_else(|| de::Error::invalid_length(0, "a Table schema"))?;

        // create the (empty) table within this transaction's context
        let table = TableIndex::create(txn_id, schema, self.txn.context().clone().into())
            .map_err(de::Error::custom)?;

        // the RowVisitor upserts each decoded row into `table` as it decodes
        if let Some(visitor) = seq
            .next_element::<RowVisitor<F, D, Txn>>((txn_id, table.clone()))
            .await?
        {
            Ok(visitor.table)
        } else {
            Ok(table)
        }
    }
}
/// A [`de::Visitor`] which upserts a decoded sequence of rows into an
/// existing [`TableIndex`].
struct RowVisitor<F: File<Key = NodeId, Block = Node>, D: Dir, Txn: Transaction<D>> {
    table: TableIndex<F, D, Txn>,
    txn_id: TxnId,
}
#[async_trait]
impl<F, D, Txn> de::Visitor for RowVisitor<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    type Value = Self;

    fn expecting() -> &'static str {
        "a sequence of table rows"
    }

    /// Decode each row in the sequence and upsert it into `self.table`.
    async fn visit_seq<A: de::SeqAccess>(self, mut seq: A) -> Result<Self::Value, A::Error> {
        let schema = self.table.primary().schema();

        while let Some(row) = seq.next_element(()).await? {
            // convert the decoded values into a (key, values) pair per schema
            let row = schema.row_from_values(row).map_err(de::Error::custom)?;
            let (key, values) = schema
                .key_values_from_row(row, true)
                .map_err(de::Error::custom)?;

            self.table
                .upsert(self.txn_id, key, values)
                .map_err(de::Error::custom)
                .await?;
        }

        Ok(self)
    }
}
#[async_trait]
impl<F, D, Txn> de::FromStream for RowVisitor<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Write: DirCreateFile<F>,
    D::Store: From<F>,
{
    type Context = (TxnId, TableIndex<F, D, Txn>);

    /// Decode a sequence of rows into the table provided in the context.
    async fn from_stream<De: de::Decoder>(
        cxt: Self::Context,
        decoder: &mut De,
    ) -> Result<Self, De::Error> {
        let (txn_id, table) = cxt;
        let visitor = Self { txn_id, table };
        decoder.decode_seq(visitor).await
    }
}
#[async_trait]
impl<F, D, Txn> de::FromStream for TableIndex<F, D, Txn>
where
    F: File<Key = NodeId, Block = Node, Inner = D::Inner> + TryFrom<D::Store, Error = TCError>,
    D: Dir + TryFrom<D::Store, Error = TCError>,
    Txn: Transaction<D>,
    D::Read: DirReadFile<F>,
    D::Write: DirCreateFile<F>,
    D::Store: From<D> + From<F>,
{
    type Context = Txn;

    /// Decode a new `TableIndex` (its schema, optionally followed by its
    /// rows) within the given transaction.
    async fn from_stream<De: de::Decoder>(txn: Txn, decoder: &mut De) -> Result<Self, De::Error> {
        let visitor = TableVisitor {
            txn,
            phantom_file: PhantomData,
            phantom_dir: PhantomData,
        };

        decoder.decode_seq(visitor).await
    }
}
impl<F, D, Txn> From<TableIndex<F, D, Txn>> for Table<F, D, Txn>
where
F: File<Key = NodeId, Block = Node>,
D: Dir,
Txn: Transaction<D>,
{
fn from(table: TableIndex<F, D, Txn>) -> Self {
Self::Table(table)
}
}
impl<F: File<Key = NodeId, Block = Node>, D: Dir, Txn: Transaction<D>> fmt::Display
    for TableIndex<F, D, Txn>
{
    /// A generic description of this table (the schema is not included).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "a Table")
    }
}