use std::{collections::HashMap, sync::Arc, time::Duration};
use lance_io::object_store::{ObjectStore, ObjectStoreParams};
use lance_table::io::commit::{commit_handler_from_url, CommitHandler};
use object_store::{aws::AwsCredentialProvider, DynObjectStore};
use snafu::{location, Location};
use tracing::instrument;
use url::Url;
use super::{ReadParams, DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
use crate::{
error::{Error, Result},
session::Session,
Dataset,
};
/// Builder for opening a [`Dataset`].
///
/// Created with [`DatasetBuilder::from_uri`], configured through the `with_*` methods,
/// and consumed by `load` (or by `build_object_store` when only the store and commit
/// handler are needed).
#[derive(Debug, Clone)]
pub struct DatasetBuilder {
/// Index cache size passed to `Session::new` when `load` has to create its own session.
index_cache_size: usize,
/// Metadata cache size passed to `Session::new` when `load` has to create its own session.
metadata_cache_size: usize,
/// Session to reuse; when `None`, `load` creates a new one from the cache sizes above.
session: Option<Arc<Session>>,
/// Explicit commit handler; when `None`, one is derived from `table_uri` and `options`.
commit_handler: Option<Arc<dyn CommitHandler>>,
/// Parameters used to construct (or wrap) the object store.
options: ObjectStoreParams,
/// Version to load; when `None`, the latest version is resolved.
version: Option<u64>,
/// URI the dataset is opened from.
table_uri: String,
}
impl DatasetBuilder {
    /// Starts a builder for the dataset located at `table_uri`.
    ///
    /// Every other setting starts at its default: the default index and metadata
    /// cache sizes, default object store parameters, no explicit session or commit
    /// handler, and no pinned version (so the latest version is loaded).
    pub fn from_uri<T: AsRef<str>>(table_uri: T) -> Self {
        let uri: &str = table_uri.as_ref();
        Self {
            table_uri: String::from(uri),
            options: ObjectStoreParams::default(),
            index_cache_size: DEFAULT_INDEX_CACHE_SIZE,
            metadata_cache_size: DEFAULT_METADATA_CACHE_SIZE,
            session: None,
            commit_handler: None,
            version: None,
        }
    }
}
impl DatasetBuilder {
    /// Sets the index cache size used when [`Self::load`] creates a new [`Session`].
    ///
    /// Has no effect if a session is supplied (via [`Self::with_session`] or
    /// [`Self::with_read_params`]), because a supplied session is used as-is.
    pub fn with_index_cache_size(mut self, cache_size: usize) -> Self {
        self.index_cache_size = cache_size;
        self
    }

    /// Sets the metadata cache size used when [`Self::load`] creates a new [`Session`].
    ///
    /// Has no effect if a session is supplied (via [`Self::with_session`] or
    /// [`Self::with_read_params`]), because a supplied session is used as-is.
    pub fn with_metadata_cache_size(mut self, cache_size: usize) -> Self {
        self.metadata_cache_size = cache_size;
        self
    }

    /// Sets the block size in the object store parameters used by
    /// [`Self::build_object_store`].
    pub fn with_block_size(mut self, block_size: usize) -> Self {
        self.options.block_size = Some(block_size);
        self
    }

    /// Loads `version` instead of the latest version of the dataset.
    pub fn with_version(mut self, version: u64) -> Self {
        self.version = Some(version);
        self
    }

    /// Uses `commit_handler` instead of deriving one from the table URI.
    pub fn with_commit_handler(mut self, commit_handler: Arc<dyn CommitHandler>) -> Self {
        self.commit_handler = Some(commit_handler);
        self
    }

    /// Sets `s3_credentials_refresh_offset` in the object store parameters.
    ///
    /// NOTE(review): presumably how long before expiry S3 credentials are refreshed;
    /// confirm against `ObjectStoreParams`.
    pub fn with_s3_credentials_refresh_offset(mut self, offset: Duration) -> Self {
        self.options.s3_credentials_refresh_offset = offset;
        self
    }

    /// Uses `credentials` as the AWS credentials provider for the object store.
    pub fn with_aws_credentials_provider(mut self, credentials: AwsCredentialProvider) -> Self {
        self.options.aws_credentials = Some(credentials);
        self
    }

    /// Uses an already-constructed `object_store` rooted at `location`, together with
    /// the `commit_handler` that goes with it.
    ///
    /// When set, [`Self::build_object_store`] wraps this store instead of constructing
    /// one from the table URI. A later [`Self::with_read_params`] carrying
    /// `store_options` replaces the object store parameters and therefore this store.
    pub fn with_object_store(
        mut self,
        object_store: Arc<DynObjectStore>,
        location: Url,
        commit_handler: Arc<dyn CommitHandler>,
    ) -> Self {
        self.options.object_store = Some((object_store, location));
        self.commit_handler = Some(commit_handler);
        self
    }

    /// Replaces all storage options with `storage_options`.
    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
        self.options.storage_options = Some(storage_options);
        self
    }

    /// Adds (or overwrites) a single storage option, keeping previously set ones.
    pub fn with_storage_option(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Self {
        // Mutate the map in place instead of moving it out and reassigning it.
        self.options
            .storage_options
            .get_or_insert_with(HashMap::new)
            .insert(key.as_ref().to_string(), value.as_ref().to_string());
        self
    }

    /// Applies the settings carried by `read_params`.
    ///
    /// The cache sizes are always copied. When `store_options` is present it replaces
    /// the entire object store configuration. That discards anything set earlier via
    /// `with_block_size`, `with_storage_option(s)`, `with_object_store`, and similar
    /// methods. `session` and `commit_handler` are only applied when present.
    pub fn with_read_params(mut self, read_params: ReadParams) -> Self {
        self = self
            .with_index_cache_size(read_params.index_cache_size)
            .with_metadata_cache_size(read_params.metadata_cache_size);
        if let Some(options) = read_params.store_options {
            self.options = options;
        }
        if let Some(session) = read_params.session {
            self.session = Some(session);
        }
        if let Some(commit_handler) = read_params.commit_handler {
            self.commit_handler = Some(commit_handler);
        }
        self
    }

    /// Reuses `session` instead of creating a new one in [`Self::load`].
    pub fn with_session(mut self, session: Arc<Session>) -> Self {
        self.session = Some(session);
        self
    }

    /// Resolves the object store and commit handler without reading any manifest.
    ///
    /// The commit handler is the explicitly configured one if present. Otherwise it is
    /// derived from the table URI and the object store parameters. The object store
    /// wraps the store given to [`Self::with_object_store`] if there is one. Otherwise
    /// it is built from the table URI and the object store parameters.
    ///
    /// # Errors
    ///
    /// Propagates errors from commit handler resolution and object store construction.
    pub async fn build_object_store(self) -> Result<(ObjectStore, Arc<dyn CommitHandler>)> {
        let commit_handler = match self.commit_handler {
            Some(commit_handler) => commit_handler,
            None => commit_handler_from_url(&self.table_uri, &Some(self.options.clone())).await?,
        };

        let object_store = match &self.options.object_store {
            Some((store, location)) => ObjectStore::new(
                store.clone(),
                location.clone(),
                self.options.block_size,
                self.options.object_store_wrapper,
            ),
            None => {
                let (store, _path) =
                    ObjectStore::from_uri_and_params(&self.table_uri, &self.options).await?;
                store
            }
        };

        Ok((object_store, commit_handler))
    }

    /// Opens the dataset.
    ///
    /// Uses the configured session, or creates a new one from the configured cache
    /// sizes. Then resolves either the requested version or the latest version, and
    /// checks out its manifest.
    ///
    /// # Errors
    ///
    /// Returns [`Error::DatasetNotFound`] if the latest version cannot be resolved.
    /// Errors from building the object store, resolving an explicitly requested
    /// version, or checking out the manifest are propagated unchanged.
    #[instrument(skip_all)]
    pub async fn load(mut self) -> Result<Dataset> {
        let session = self.session.take().unwrap_or_else(|| {
            Arc::new(Session::new(
                self.index_cache_size,
                self.metadata_cache_size,
            ))
        });
        let version = self.version;
        let (object_store, commit_handler) = self.build_object_store().await?;

        // Own the base path so `object_store` can be moved into the `Arc` below
        // rather than cloned.
        let base_path = object_store.base_path().clone();
        let manifest = match version {
            Some(version) => {
                commit_handler
                    .resolve_version(&base_path, version, &object_store.inner)
                    .await?
            }
            None => commit_handler
                .resolve_latest_version(&base_path, &object_store.inner)
                .await
                .map_err(|e| Error::DatasetNotFound {
                    path: base_path.to_string(),
                    source: Box::new(e),
                    location: location!(),
                })?,
        };

        Dataset::checkout_manifest(
            Arc::new(object_store),
            base_path,
            &manifest,
            session,
            commit_handler,
        )
        .await
    }
}