1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
use std::{collections::HashMap, sync::Arc, time::Duration};
use lance_core::io::{
commit::CommitHandler,
object_store::{ObjectStore, ObjectStoreParams},
};
use object_store::{aws::AwsCredentialProvider, DynObjectStore};
use snafu::{location, Location};
use url::Url;
use super::{ReadParams, DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
use crate::{
error::{Error, Result},
session::Session,
Dataset,
};
/// Builder that collects the configuration needed to open a [`Dataset`].
#[derive(Debug, Clone)]
pub struct DatasetBuilder {
    /// Size of the index cache. A value of zero disables the index cache.
    index_cache_size: usize,
    /// Size of the fragment metadata cache. A value of zero disables the
    /// metadata cache.
    metadata_cache_size: usize,
    /// Session to reuse when loading; when `None`, a new session is created
    /// from the two cache sizes above.
    session: Option<Arc<Session>>,
    /// Parameters used to construct the underlying object store.
    options: ObjectStoreParams,
    /// Dataset version to load; when `None`, the latest version is resolved.
    version: Option<u64>,
    /// URI of the dataset, as given to `from_uri`.
    table_uri: String,
}
impl DatasetBuilder {
    /// Start a builder for the dataset located at `table_uri`.
    ///
    /// The builder begins with the default index and metadata cache sizes,
    /// default object store parameters, no session to reuse, and no version
    /// selected.
    pub fn from_uri<T: AsRef<str>>(table_uri: T) -> Self {
        let uri = String::from(table_uri.as_ref());
        Self {
            index_cache_size: DEFAULT_INDEX_CACHE_SIZE,
            metadata_cache_size: DEFAULT_METADATA_CACHE_SIZE,
            table_uri: uri,
            options: ObjectStoreParams::default(),
            session: None,
            version: None,
        }
    }
}
// Much of this builder is directly inspired by the delta-rs table builder implementation:
// https://github.com/delta-io/delta-rs/blob/main/crates/deltalake-core/src/table/builder.rs
impl DatasetBuilder {
    /// Set the cache size for indices. Set to zero to disable the cache.
    ///
    /// This value is only used by [`Self::load`] when no session has been
    /// supplied to the builder.
    pub fn with_index_cache_size(mut self, cache_size: usize) -> Self {
        self.index_cache_size = cache_size;
        self
    }

    /// Set the cache size for the file metadata. Set to zero to disable this cache.
    ///
    /// This value is only used by [`Self::load`] when no session has been
    /// supplied to the builder.
    pub fn with_metadata_cache_size(mut self, cache_size: usize) -> Self {
        self.metadata_cache_size = cache_size;
        self
    }

    /// The block size passed to the underlying Object Store reader.
    ///
    /// This is used to control the minimal request size.
    /// Defaults to 4KB for local files and 64KB for others
    pub fn with_block_size(mut self, block_size: usize) -> Self {
        self.options.block_size = Some(block_size);
        self
    }

    /// Select the dataset version to load.
    ///
    /// When no version is set, [`Self::load`] resolves the latest version.
    pub fn with_version(mut self, version: u64) -> Self {
        self.version = Some(version);
        self
    }

    /// Set the commit handler stored in the object store parameters.
    pub fn with_commit_handler(mut self, commit_handler: Arc<dyn CommitHandler>) -> Self {
        self.options.commit_handler = Some(commit_handler);
        self
    }

    /// Sets the s3 credentials refresh offset.
    /// This only applies to s3 storage.
    pub fn with_s3_credentials_refresh_offset(mut self, offset: Duration) -> Self {
        self.options.s3_credentials_refresh_offset = offset;
        self
    }

    /// Sets the aws credentials provider.
    /// This only applies to aws object store.
    pub fn with_aws_credentials_provider(mut self, credentials: AwsCredentialProvider) -> Self {
        self.options.aws_credentials = Some(credentials);
        self
    }

    /// Directly set the object store to use, together with its location.
    ///
    /// When set, [`Self::build_object_store`] wraps this store instead of
    /// creating one from the builder's URI.
    pub fn with_object_store(mut self, object_store: Arc<DynObjectStore>, location: Url) -> Self {
        self.options.object_store = Some((object_store, location));
        self
    }

    /// Set options used to initialize storage backend
    ///
    /// Options may be passed in the HashMap or set as environment variables. See documentation of
    /// underlying object store implementation for details.
    ///
    /// Any storage options set earlier on this builder are replaced.
    ///
    /// - [Azure options](https://docs.rs/object_store/latest/object_store/azure/enum.AzureConfigKey.html#variants)
    /// - [S3 options](https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants)
    /// - [Google options](https://docs.rs/object_store/latest/object_store/gcp/enum.GoogleConfigKey.html#variants)
    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
        self.options.storage_options = Some(storage_options);
        self
    }

    /// Set a single option used to initialize storage backend
    ///
    /// The option is added to the storage options already set on the builder
    /// (an empty map is created if there are none); an existing entry with the
    /// same key is overwritten.
    ///
    /// For example, to set the region for S3, you can use:
    ///
    /// ```ignore
    /// let builder = DatasetBuilder::from_uri("s3://bucket/path")
    ///     .with_storage_option("region", "us-east-1");
    /// ```
    pub fn with_storage_option(mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> Self {
        // Insert in place rather than moving the map out of the option and back in.
        self.options
            .storage_options
            .get_or_insert_with(HashMap::new)
            .insert(key.as_ref().to_string(), value.as_ref().to_string());
        self
    }

    /// Set options based on [ReadParams].
    ///
    /// The index and metadata cache sizes are always taken from `read_params`.
    /// The store options and the session are only applied when present.
    ///
    /// Note that present store options replace *all* object store parameters
    /// configured so far on this builder (block size, commit handler,
    /// credentials, storage options, explicit object store).
    pub fn with_read_params(mut self, read_params: ReadParams) -> Self {
        self = self
            .with_index_cache_size(read_params.index_cache_size)
            .with_metadata_cache_size(read_params.metadata_cache_size);

        if let Some(options) = read_params.store_options {
            self.options = options;
        }

        if let Some(session) = read_params.session {
            self.session = Some(session);
        }

        self
    }

    /// Re-use an existing session.
    ///
    /// The session holds caches for index and metadata.
    ///
    /// If this is set, then `with_index_cache_size` and `with_metadata_cache_size` are ignored.
    pub fn with_session(mut self, session: Arc<Session>) -> Self {
        self.session = Some(session);
        self
    }

    /// Build a lance object store for the given config
    ///
    /// If an object store was supplied through `with_object_store`, it is
    /// wrapped together with the configured block size, commit handler and
    /// object store wrapper. Otherwise the store is created from the builder's
    /// URI and object store parameters.
    ///
    /// # Errors
    ///
    /// Returns the error produced while creating the store from the URI.
    pub async fn build_object_store(self) -> Result<ObjectStore> {
        match &self.options.object_store {
            Some(store) => Ok(ObjectStore::new(
                store.0.clone(),
                store.1.clone(),
                self.options.block_size,
                self.options.commit_handler,
                self.options.object_store_wrapper,
            )),
            None => {
                let (store, _path) =
                    ObjectStore::from_uri_and_params(&self.table_uri, &self.options).await?;
                Ok(store)
            }
        }
    }

    /// Load the [`Dataset`] described by this builder.
    ///
    /// Uses the supplied session if there is one, otherwise creates a new
    /// session from the configured cache sizes. The manifest is resolved for
    /// the requested version, or for the latest version when none was set, and
    /// then checked out.
    ///
    /// # Errors
    ///
    /// Returns [`Error::DatasetNotFound`] when the latest version cannot be
    /// resolved. Errors from building the object store, resolving an explicit
    /// version, or checking out the manifest are propagated unchanged.
    pub async fn load(mut self) -> Result<Dataset> {
        let session = self.session.take().unwrap_or_else(|| {
            Arc::new(Session::new(
                self.index_cache_size,
                self.metadata_cache_size,
            ))
        });

        // `build_object_store` consumes the builder, so read the version first.
        let version = self.version;
        let object_store = self.build_object_store().await?;

        // Take an owned base path up front so the store itself can be moved
        // into the `Arc` below instead of being cloned.
        let base_path = object_store.base_path().clone();

        let manifest = match version {
            Some(version) => {
                object_store
                    .commit_handler
                    .resolve_version(&base_path, version, &object_store.inner)
                    .await?
            }
            None => object_store
                .commit_handler
                .resolve_latest_version(&base_path, &object_store.inner)
                .await
                .map_err(|e| Error::DatasetNotFound {
                    path: base_path.to_string(),
                    source: Box::new(e),
                    location: location!(),
                })?,
        };

        Dataset::checkout_manifest(Arc::new(object_store), base_path, &manifest, session).await
    }
}