use std::fmt::Debug;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use lance::Dataset;
use lance::dataset::scanner::{ColumnOrdering, DatasetRecordBatchStream};
use lance::dataset::{WhenMatched, WhenNotMatched};
use crate::db::{Snapshot, SubTableEntry};
use crate::error::Result;
use crate::table_store::{DeleteState, StagedWrite, TableState, TableStore};
// Sealed-trait pattern: `TableStorage` (below) requires `sealed::Sealed` as a
// supertrait, and `Sealed` is only implemented for `TableStore` here, so no
// type outside this crate can implement `TableStorage`.
pub(crate) mod sealed {
    pub trait Sealed {}
    impl Sealed for crate::table_store::TableStore {}
}
/// Cheaply cloneable handle to an opened Lance [`Dataset`] snapshot.
///
/// Cloning the handle only bumps the `Arc` refcount; the dataset itself is
/// shared.
#[derive(Debug, Clone)]
pub struct SnapshotHandle {
    pub(crate) inner: Arc<Dataset>,
}

impl SnapshotHandle {
    /// Wraps a freshly opened dataset in a reference-counted handle.
    pub(crate) fn new(ds: Dataset) -> Self {
        let inner = Arc::new(ds);
        Self { inner }
    }

    /// Borrows the underlying dataset.
    pub(crate) fn dataset(&self) -> &Dataset {
        self.inner.as_ref()
    }

    /// Consumes the handle, yielding the shared dataset pointer.
    pub(crate) fn into_arc(self) -> Arc<Dataset> {
        self.inner
    }

    /// Version number of the dataset snapshot this handle points at.
    pub fn version(&self) -> u64 {
        self.inner.version().version
    }

    /// Whether the dataset manifest uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.inner.manifest.uses_stable_row_ids()
    }
}
/// Handle to a staged (not yet committed) write against a dataset snapshot.
#[derive(Debug, Clone)]
pub struct StagedHandle {
    pub(crate) inner: StagedWrite,
}

impl StagedHandle {
    /// Wraps a staged write produced by the table store.
    pub(crate) fn new(staged: StagedWrite) -> Self {
        StagedHandle { inner: staged }
    }

    /// Consumes the handle, returning the staged write it wraps.
    pub(crate) fn into_staged(self) -> StagedWrite {
        self.inner
    }
}
/// Clones the `StagedWrite` out of each handle, preserving order.
pub(crate) fn staged_handles_as_writes(handles: &[StagedHandle]) -> Vec<StagedWrite> {
    let mut writes = Vec::with_capacity(handles.len());
    for handle in handles {
        writes.push(handle.inner.clone());
    }
    writes
}
/// Storage abstraction over [`TableStore`]; every method in the impl below
/// delegates to an inherent `TableStore` method of the same (or similar)
/// name. Sealed: only `TableStore` can implement this trait.
#[async_trait]
pub trait TableStorage: sealed::Sealed + Send + Sync + Debug {
    /// Opens a snapshot for the given sub-table entry.
    async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle>;
    /// Opens the table identified by `table_key` within `snapshot`.
    async fn open_snapshot_at_table(
        &self,
        snapshot: &Snapshot,
        table_key: &str,
    ) -> Result<SnapshotHandle>;
    /// Opens the head (latest) version of a dataset, optionally on a branch.
    async fn open_dataset_head(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;
    /// Opens the head of a dataset for a subsequent write.
    async fn open_dataset_head_for_write(
        &self,
        table_key: &str,
        dataset_uri: &str,
        branch: Option<&str>,
    ) -> Result<SnapshotHandle>;
    /// Opens a dataset pinned at a specific `version`.
    async fn open_dataset_at_state(
        &self,
        table_path: &str,
        branch: Option<&str>,
        version: u64,
    ) -> Result<SnapshotHandle>;
    /// Creates `target_branch` from `source_version` on `source_branch`.
    async fn fork_branch_from_state(
        &self,
        dataset_uri: &str,
        source_branch: Option<&str>,
        table_key: &str,
        source_version: u64,
        target_branch: &str,
    ) -> Result<SnapshotHandle>;
    /// Deletes `branch` from the dataset at `dataset_uri`.
    async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()>;
    /// Reopens a dataset for mutation, checking it is at `expected_version`.
    async fn reopen_for_mutation(
        &self,
        dataset_uri: &str,
        branch: Option<&str>,
        table_key: &str,
        expected_version: u64,
    ) -> Result<SnapshotHandle>;
    /// Verifies that `snapshot` is at `expected_version` (synchronous check).
    fn ensure_expected_version(
        &self,
        snapshot: &SnapshotHandle,
        table_key: &str,
        expected_version: u64,
    ) -> Result<()>;
    /// Scans `snapshot` with optional projection, filter and ordering.
    async fn scan(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
    ) -> Result<Vec<RecordBatch>>;
    /// Like [`TableStorage::scan`], optionally including the row-id column.
    async fn scan_with_row_id(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<Vec<RecordBatch>>;
    /// Reads all batches of `snapshot` without projection or filter.
    async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>>;
    /// Reads all batches in a form suitable for rewriting the table.
    async fn scan_batches_for_rewrite(
        &self,
        snapshot: &SnapshotHandle,
    ) -> Result<Vec<RecordBatch>>;
    /// Counts rows in `snapshot`, optionally restricted by `filter`.
    async fn count_rows(
        &self,
        snapshot: &SnapshotHandle,
        filter: Option<String>,
    ) -> Result<usize>;
    /// Counts rows including the effect of the given staged writes.
    async fn count_rows_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        filter: Option<String>,
    ) -> Result<usize>;
    /// Scans `snapshot` with the given staged writes applied on top.
    async fn scan_with_staged(
        &self,
        snapshot: &SnapshotHandle,
        staged: &[StagedHandle],
        projection: Option<&[&str]>,
        filter: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;
    /// Scans `snapshot` merged with in-memory `pending` batches;
    /// `key_column`, when present, presumably deduplicates by key
    /// (semantics live in `TableStore::scan_with_pending`).
    async fn scan_with_pending(
        &self,
        snapshot: &SnapshotHandle,
        pending: &[RecordBatch],
        pending_schema: Option<SchemaRef>,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        key_column: Option<&str>,
    ) -> Result<Vec<RecordBatch>>;
    /// Returns the first row id matching `filter`, or `None` if no match.
    async fn first_row_id_for_filter(
        &self,
        snapshot: &SnapshotHandle,
        filter: &str,
    ) -> Result<Option<u64>>;
    /// Summarizes the state of the table backing `snapshot`.
    async fn table_state(
        &self,
        dataset_uri: &str,
        snapshot: &SnapshotHandle,
    ) -> Result<TableState>;
    /// Stages an append of `batch` on top of `prior_stages`.
    async fn stage_append(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
        prior_stages: &[StagedHandle],
    ) -> Result<StagedHandle>;
    /// Stages a merge-insert (upsert) of `batch` keyed by `key_columns`.
    /// Consumes the snapshot handle.
    async fn stage_merge_insert(
        &self,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<StagedHandle>;
    /// Commits a previously staged write, returning the new snapshot.
    async fn commit_staged(
        &self,
        snapshot: SnapshotHandle,
        staged: StagedHandle,
    ) -> Result<SnapshotHandle>;
    /// Stages a full overwrite of the table with `batch`.
    async fn stage_overwrite(
        &self,
        snapshot: &SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<StagedHandle>;
    /// Stages creation of a BTree index over `columns`.
    async fn stage_create_btree_index(
        &self,
        snapshot: &SnapshotHandle,
        columns: &[&str],
    ) -> Result<StagedHandle>;
    /// Stages creation of an inverted (full-text) index on `column`.
    async fn stage_create_inverted_index(
        &self,
        snapshot: &SnapshotHandle,
        column: &str,
    ) -> Result<StagedHandle>;
    /// Appends `batch` directly, returning the new snapshot and table state.
    async fn append_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;
    /// Merge-inserts `batches` directly (no staging), returning table state.
    async fn merge_insert_batches(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batches: Vec<RecordBatch>,
        key_columns: Vec<String>,
        when_matched: WhenMatched,
        when_not_matched: WhenNotMatched,
    ) -> Result<TableState>;
    /// Overwrites the table with `batch`, returning snapshot and state.
    async fn overwrite_batch(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        batch: RecordBatch,
    ) -> Result<(SnapshotHandle, TableState)>;
    /// Deletes rows matching `filter`, returning snapshot and delete state.
    async fn delete_where(
        &self,
        dataset_uri: &str,
        snapshot: SnapshotHandle,
        filter: &str,
    ) -> Result<(SnapshotHandle, DeleteState)>;
    /// Whether a BTree index exists on `column`.
    async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether a full-text-search index exists on `column`.
    async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Whether a vector index exists on `column`.
    async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool>;
    /// Creates a BTree index over `columns`, returning the updated snapshot.
    async fn create_btree_index(
        &self,
        snapshot: SnapshotHandle,
        columns: &[&str],
    ) -> Result<SnapshotHandle>;
    /// Creates an inverted index on `column`, returning the updated snapshot.
    async fn create_inverted_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;
    /// Creates a vector index on `column`, returning the updated snapshot.
    async fn create_vector_index(
        &self,
        snapshot: SnapshotHandle,
        column: &str,
    ) -> Result<SnapshotHandle>;
    /// Root URI under which this store's datasets live.
    fn root_uri(&self) -> &str;
    /// Resolves a table path to a full dataset URI under the root.
    fn dataset_uri(&self, table_path: &str) -> String;
    /// Streaming variant of [`TableStorage::scan_with_row_id`].
    async fn scan_stream(
        &self,
        snapshot: &SnapshotHandle,
        projection: Option<&[&str]>,
        filter: Option<&str>,
        order_by: Option<Vec<ColumnOrdering>>,
        with_row_id: bool,
    ) -> Result<DatasetRecordBatchStream>;
}
#[async_trait]
impl TableStorage for TableStore {
async fn open_snapshot_at_entry(&self, entry: &SubTableEntry) -> Result<SnapshotHandle> {
self.open_at_entry(entry).await.map(SnapshotHandle::new)
}
async fn open_snapshot_at_table(
&self,
snapshot: &Snapshot,
table_key: &str,
) -> Result<SnapshotHandle> {
self.open_snapshot_table(snapshot, table_key)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_head(
&self,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_head(self, dataset_uri, branch)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_head_for_write(
&self,
table_key: &str,
dataset_uri: &str,
branch: Option<&str>,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_head_for_write(self, table_key, dataset_uri, branch)
.await
.map(SnapshotHandle::new)
}
async fn open_dataset_at_state(
&self,
table_path: &str,
branch: Option<&str>,
version: u64,
) -> Result<SnapshotHandle> {
TableStore::open_dataset_at_state(self, table_path, branch, version)
.await
.map(SnapshotHandle::new)
}
async fn fork_branch_from_state(
&self,
dataset_uri: &str,
source_branch: Option<&str>,
table_key: &str,
source_version: u64,
target_branch: &str,
) -> Result<SnapshotHandle> {
TableStore::fork_branch_from_state(
self,
dataset_uri,
source_branch,
table_key,
source_version,
target_branch,
)
.await
.map(SnapshotHandle::new)
}
async fn delete_branch(&self, dataset_uri: &str, branch: &str) -> Result<()> {
TableStore::delete_branch(self, dataset_uri, branch).await
}
async fn reopen_for_mutation(
&self,
dataset_uri: &str,
branch: Option<&str>,
table_key: &str,
expected_version: u64,
) -> Result<SnapshotHandle> {
TableStore::reopen_for_mutation(self, dataset_uri, branch, table_key, expected_version)
.await
.map(SnapshotHandle::new)
}
fn ensure_expected_version(
&self,
snapshot: &SnapshotHandle,
table_key: &str,
expected_version: u64,
) -> Result<()> {
TableStore::ensure_expected_version(self, snapshot.dataset(), table_key, expected_version)
}
async fn scan(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
) -> Result<Vec<RecordBatch>> {
TableStore::scan(self, snapshot.dataset(), projection, filter, order_by).await
}
async fn scan_with_row_id(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_with(
self,
snapshot.dataset(),
projection,
filter,
order_by,
with_row_id,
|_| Ok(()),
)
.await
}
async fn scan_batches(&self, snapshot: &SnapshotHandle) -> Result<Vec<RecordBatch>> {
TableStore::scan_batches(self, snapshot.dataset()).await
}
async fn scan_batches_for_rewrite(
&self,
snapshot: &SnapshotHandle,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_batches_for_rewrite(self, snapshot.dataset()).await
}
async fn count_rows(
&self,
snapshot: &SnapshotHandle,
filter: Option<String>,
) -> Result<usize> {
TableStore::count_rows(self, snapshot.dataset(), filter).await
}
async fn count_rows_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
filter: Option<String>,
) -> Result<usize> {
let staged_writes = staged_handles_as_writes(staged);
TableStore::count_rows_with_staged(self, snapshot.dataset(), &staged_writes, filter).await
}
async fn scan_with_staged(
&self,
snapshot: &SnapshotHandle,
staged: &[StagedHandle],
projection: Option<&[&str]>,
filter: Option<&str>,
) -> Result<Vec<RecordBatch>> {
let staged_writes = staged_handles_as_writes(staged);
TableStore::scan_with_staged(
self,
snapshot.dataset(),
&staged_writes,
projection,
filter,
)
.await
}
async fn scan_with_pending(
&self,
snapshot: &SnapshotHandle,
pending: &[RecordBatch],
pending_schema: Option<SchemaRef>,
projection: Option<&[&str]>,
filter: Option<&str>,
key_column: Option<&str>,
) -> Result<Vec<RecordBatch>> {
TableStore::scan_with_pending(
self,
snapshot.dataset(),
pending,
pending_schema,
projection,
filter,
key_column,
)
.await
}
async fn first_row_id_for_filter(
&self,
snapshot: &SnapshotHandle,
filter: &str,
) -> Result<Option<u64>> {
TableStore::first_row_id_for_filter(self, snapshot.dataset(), filter).await
}
async fn table_state(
&self,
dataset_uri: &str,
snapshot: &SnapshotHandle,
) -> Result<TableState> {
TableStore::table_state(self, dataset_uri, snapshot.dataset()).await
}
async fn stage_append(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
prior_stages: &[StagedHandle],
) -> Result<StagedHandle> {
let staged_writes = staged_handles_as_writes(prior_stages);
TableStore::stage_append(self, snapshot.dataset(), batch, &staged_writes)
.await
.map(StagedHandle::new)
}
async fn stage_merge_insert(
&self,
snapshot: SnapshotHandle,
batch: RecordBatch,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<StagedHandle> {
let ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::stage_merge_insert(
self,
ds,
batch,
key_columns,
when_matched,
when_not_matched,
)
.await
.map(StagedHandle::new)
}
async fn commit_staged(
&self,
snapshot: SnapshotHandle,
staged: StagedHandle,
) -> Result<SnapshotHandle> {
let ds_arc = snapshot.into_arc();
let transaction = staged.into_staged().transaction;
TableStore::commit_staged(self, ds_arc, transaction)
.await
.map(SnapshotHandle::new)
}
async fn stage_overwrite(
&self,
snapshot: &SnapshotHandle,
batch: RecordBatch,
) -> Result<StagedHandle> {
TableStore::stage_overwrite(self, snapshot.dataset(), batch)
.await
.map(StagedHandle::new)
}
async fn stage_create_btree_index(
&self,
snapshot: &SnapshotHandle,
columns: &[&str],
) -> Result<StagedHandle> {
TableStore::stage_create_btree_index(self, snapshot.dataset(), columns)
.await
.map(StagedHandle::new)
}
async fn stage_create_inverted_index(
&self,
snapshot: &SnapshotHandle,
column: &str,
) -> Result<StagedHandle> {
TableStore::stage_create_inverted_index(self, snapshot.dataset(), column)
.await
.map(StagedHandle::new)
}
async fn append_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::append_batch(self, dataset_uri, &mut ds, batch).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn merge_insert_batches(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batches: Vec<RecordBatch>,
key_columns: Vec<String>,
when_matched: WhenMatched,
when_not_matched: WhenNotMatched,
) -> Result<TableState> {
let ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::merge_insert_batches(
self,
dataset_uri,
ds,
batches,
key_columns,
when_matched,
when_not_matched,
)
.await
}
async fn overwrite_batch(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
batch: RecordBatch,
) -> Result<(SnapshotHandle, TableState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::overwrite_batch(self, dataset_uri, &mut ds, batch).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn delete_where(
&self,
dataset_uri: &str,
snapshot: SnapshotHandle,
filter: &str,
) -> Result<(SnapshotHandle, DeleteState)> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
let state = TableStore::delete_where(self, dataset_uri, &mut ds, filter).await?;
Ok((SnapshotHandle::new(ds), state))
}
async fn has_btree_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_btree_index(self, snapshot.dataset(), column).await
}
async fn has_fts_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_fts_index(self, snapshot.dataset(), column).await
}
async fn has_vector_index(&self, snapshot: &SnapshotHandle, column: &str) -> Result<bool> {
TableStore::has_vector_index(self, snapshot.dataset(), column).await
}
async fn create_btree_index(
&self,
snapshot: SnapshotHandle,
columns: &[&str],
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_btree_index(self, &mut ds, columns).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_inverted_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_inverted_index(self, &mut ds, column).await?;
Ok(SnapshotHandle::new(ds))
}
async fn create_vector_index(
&self,
snapshot: SnapshotHandle,
column: &str,
) -> Result<SnapshotHandle> {
let mut ds = Arc::try_unwrap(snapshot.into_arc())
.unwrap_or_else(|arc| (*arc).clone());
TableStore::create_vector_index(self, &mut ds, column).await?;
Ok(SnapshotHandle::new(ds))
}
fn root_uri(&self) -> &str {
TableStore::root_uri(self)
}
fn dataset_uri(&self, table_path: &str) -> String {
TableStore::dataset_uri(self, table_path)
}
async fn scan_stream(
&self,
snapshot: &SnapshotHandle,
projection: Option<&[&str]>,
filter: Option<&str>,
order_by: Option<Vec<ColumnOrdering>>,
with_row_id: bool,
) -> Result<DatasetRecordBatchStream> {
TableStore::scan_stream(snapshot.dataset(), projection, filter, order_by, with_row_id).await
}
}