1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use async_trait::async_trait;

use tc_error::{TCError, TCResult};
use tc_transact::fs::{Dir, File};
use tc_transact::{Transaction, TxnId};
use tc_value::ValueCollator;
use tcgeneric::{Instance, TCTryStream};

use super::{
    validate_range, BTree, BTreeFile, BTreeInstance, BTreeType, Key, Node, Range, RowSchema,
};
use futures::{TryFutureExt, TryStreamExt};

/// A slice of a [`BTree`]
///
/// Pairs a [`BTreeFile`] with a [`Range`] and a direction flag; the range and flag
/// are handed to the file whenever the slice's rows are read.
#[derive(Clone)]
pub struct BTreeSlice<F, D, T> {
    // the file-backed tree which this slice reads from, deletes from, and inserts into
    source: BTreeFile<F, D, T>,
    // the range passed to `rows_in_range` and `delete_range` on `source`
    range: Range,
    // forwarded as the `reverse` argument of `rows_in_range`
    // NOTE(review): presumably selects descending key order -- confirm against `BTreeFile`
    reverse: bool,
}

impl<F: File<Node>, D: Dir, T: Transaction<D>> BTreeSlice<F, D, T> {
    pub fn new(
        source: BTree<F, D, T>,
        range: Range,
        reverse: bool,
    ) -> TCResult<BTreeSlice<F, D, T>> {
        let range = validate_range(range, source.schema())?;

        match source {
            BTree::File(tree) => Ok(Self {
                source: tree,
                range,
                reverse,
            }),

            BTree::Slice(view) => {
                let source = view.source.clone();
                let reverse = view.reverse ^ reverse;

                if range == Range::default() {
                    Ok(Self {
                        source,
                        range: view.range,
                        reverse,
                    })
                } else if view.range.contains(&range, view.source.collator()) {
                    Ok(Self {
                        source,
                        range,
                        reverse,
                    })
                } else {
                    Err(TCError::unsupported(
                        "BTreeSlice does not contain requested range",
                    ))
                }
            }
        }
    }
}

impl<F, D, T> Instance for BTreeSlice<F, D, T>
where
    BTreeFile<F, D, T>: Send + Sync,
{
    type Class = BTreeType;

    /// The class of a `BTreeSlice` is always [`BTreeType::Slice`].
    fn class(&self) -> BTreeType {
        BTreeType::Slice
    }
}

#[async_trait]
impl<F: File<Node>, D: Dir, T: Transaction<D>> BTreeInstance for BTreeSlice<F, D, T>
where
    BTreeFile<F, D, T>: Clone + 'static,
{
    type Slice = Self;

    fn collator(&'_ self) -> &'_ ValueCollator {
        self.source.collator()
    }

    fn schema(&'_ self) -> &'_ RowSchema {
        self.source.schema()
    }

    fn slice(self, range: Range, reverse: bool) -> TCResult<Self::Slice> {
        let range = validate_range(range, self.schema())?;

        if self.range.contains(&range, self.collator()) {
            Self::new(BTree::Slice(self), range, reverse)
        } else {
            Err(TCError::unsupported(
                "BTreeSlice does not contain the requested range",
            ))
        }
    }

    async fn is_empty(&self, txn_id: TxnId) -> TCResult<bool> {
        let mut rows = self
            .source
            .clone()
            .rows_in_range(txn_id, self.range.clone(), self.reverse)
            .await?;

        rows.try_next().map_ok(|row| row.is_none()).await
    }

    async fn delete(&self, txn_id: TxnId) -> TCResult<()> {
        self.source.delete_range(txn_id, &self.range).await
    }

    async fn insert(&self, txn_id: TxnId, key: Key) -> TCResult<()> {
        self.source.insert(txn_id, key).await
    }

    async fn keys<'a>(self, txn_id: TxnId) -> TCResult<TCTryStream<'a, Key>> {
        self.source
            .rows_in_range(txn_id, self.range, self.reverse)
            .await
    }
}