§object_store
This crate provides a uniform API for interacting with object
storage services and local files via the ObjectStore
trait.
Using this crate, the same binary and code can run in multiple clouds and local test environments, via a simple runtime configuration change.
§Highlights
- A high-performance async API focused on providing a consistent interface mirroring that of object stores such as S3
- Production quality, leading this crate to be used in large-scale production systems, such as crates.io and InfluxDB IOx
- Support for advanced functionality, including atomic, conditional reads and writes, vectored IO, bulk deletion, and more…
- Stable and predictable governance via the Apache Arrow project
- Small dependency footprint, depending on only a small number of common crates
Originally developed by InfluxData and subsequently donated to Apache Arrow.
§APIs
- ObjectStore: Core object store API
- ObjectStoreExt: (New in 0.13.0) Extension trait with additional convenience methods
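Splitting a small core trait from a blanket-implemented extension trait is a common Rust pattern. The synchronous, std-only sketch below illustrates the general shape; Store, StoreExt, Opts and Mem are hypothetical stand-ins, not the crate's real (async) ObjectStore, ObjectStoreExt, GetOptions and InMemory. Implementors write only get_opts, and get and get_range come for free, including on trait objects.

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Hypothetical options type (a stand-in for a richer options struct).
#[derive(Default, Debug, Clone)]
pub struct Opts {
    /// Optional byte range to return.
    pub range: Option<Range<usize>>,
}

/// Hypothetical core trait: implementors provide only the fully parameterised method.
pub trait Store {
    fn get_opts(&self, path: &str, opts: Opts) -> Option<Vec<u8>>;
}

/// Hypothetical extension trait: convenience methods built on the core method.
pub trait StoreExt: Store {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.get_opts(path, Opts::default())
    }

    fn get_range(&self, path: &str, range: Range<usize>) -> Option<Vec<u8>> {
        self.get_opts(path, Opts { range: Some(range) })
    }
}

// Blanket impl: every Store (including `dyn Store`) gets the extension methods.
impl<T: Store + ?Sized> StoreExt for T {}

/// Toy in-memory store that implements only the core method.
pub struct Mem(pub HashMap<String, Vec<u8>>);

impl Store for Mem {
    fn get_opts(&self, path: &str, opts: Opts) -> Option<Vec<u8>> {
        let data = self.0.get(path)?;
        match opts.range {
            Some(r) => data.get(r).map(|s| s.to_vec()),
            None => Some(data.clone()),
        }
    }
}
```

A side effect of this design is that new convenience methods can be added to the extension trait without requiring any change from existing implementors of the core trait.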
§Available ObjectStore Implementations
By default, this crate provides the following implementations:
- Memory: InMemory
Feature flags are used to enable support for other implementations:
- fs: Local filesystem. See LocalFileSystem
- gcp: Google Cloud Storage support. See GoogleCloudStorageBuilder
- aws: Amazon S3. See AmazonS3Builder
- azure: Azure Blob Storage. See MicrosoftAzureBuilder
- http: HTTP/WebDAV Storage. See HttpBuilder
See Feature Flags for the full set of flags.
§Why not a Filesystem Interface?
The ObjectStore interface is designed to mirror the APIs
of object stores and not filesystems, and thus has stateless APIs instead
of cursor-based interfaces such as Read or Seek available in filesystems.
This design provides the following advantages:
- All operations are atomic, and readers cannot observe partial and/or failed writes
- Methods map directly to object store APIs, providing both efficiency and predictability
- Abstracts away filesystem and operating system specific quirks, ensuring portability
- Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads
This crate does provide BufReader and BufWriter adapters,
which offer a more filesystem-like API for working with the
ObjectStore trait; however, they should be used with care.
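One cost worth keeping in mind can be shown with a toy model. The std-only sketch below wraps a hypothetical fetch closure (standing in for a ranged get against a stateless store) in std::io::Read. RangeReader and its fields are illustrative names, not the crate's BufReader. Every time its buffer runs dry it issues one new range request, so the number of requests grows with the object size divided by the buffer capacity.

```rust
use std::io::{self, Read};
use std::ops::Range;

/// Hypothetical buffered reader over a stateless "fetch this byte range" function.
pub struct RangeReader<F: FnMut(Range<u64>) -> Vec<u8>> {
    fetch: F,
    size: u64,     // total object size, e.g. obtained from a metadata request
    pos: u64,      // offset of the next byte to request from the store
    buf: Vec<u8>,  // bytes fetched but not yet handed to the caller
    buf_pos: usize,
    capacity: u64, // bytes requested per fetch
    pub requests: usize,
}

impl<F: FnMut(Range<u64>) -> Vec<u8>> RangeReader<F> {
    pub fn new(fetch: F, size: u64, capacity: u64) -> Self {
        Self { fetch, size, pos: 0, buf: Vec::new(), buf_pos: 0, capacity, requests: 0 }
    }
}

impl<F: FnMut(Range<u64>) -> Vec<u8>> Read for RangeReader<F> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        if self.buf_pos == self.buf.len() {
            if self.pos >= self.size {
                return Ok(0); // end of object
            }
            // Refill: one range request per buffer's worth of data.
            let end = (self.pos + self.capacity).min(self.size);
            self.buf = (self.fetch)(self.pos..end);
            self.buf_pos = 0;
            self.pos = end;
            self.requests += 1;
        }
        let n = out.len().min(self.buf.len() - self.buf_pos);
        out[..n].copy_from_slice(&self.buf[self.buf_pos..self.buf_pos + n]);
        self.buf_pos += n;
        Ok(n)
    }
}
```

With a 1000-byte object and a 256-byte capacity, reading to the end takes four separate requests; against a remote store each of those is a network round trip.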
§Adapters
ObjectStore instances can be composed with various adapters
which add additional functionality:
- Rate Throttling: ThrottleConfig
- Concurrent Request Limit: LimitStore
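Adapters of this kind follow the decorator pattern: each one wraps another store and implements the same trait, so they can be stacked. The std-only sketch below uses a hypothetical synchronous Store trait and two hypothetical wrappers. Throttled sleeps before each call and Counting tallies calls. The crate's real adapters are async and are configured differently.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

/// Hypothetical minimal store trait (not the crate's ObjectStore).
pub trait Store {
    fn get(&self, path: &str) -> Option<Vec<u8>>;
}

/// Base store backed by a HashMap.
pub struct Mem(pub HashMap<String, Vec<u8>>);

impl Store for Mem {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.0.get(path).cloned()
    }
}

/// Hypothetical adapter: waits a fixed time before delegating each call.
pub struct Throttled<S> {
    pub inner: S,
    pub wait_per_get: Duration,
}

impl<S: Store> Store for Throttled<S> {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        thread::sleep(self.wait_per_get);
        self.inner.get(path)
    }
}

/// Hypothetical adapter: counts calls before delegating.
pub struct Counting<S> {
    pub inner: S,
    pub calls: AtomicUsize,
}

impl<S: Store> Store for Counting<S> {
    fn get(&self, path: &str) -> Option<Vec<u8>> {
        self.calls.fetch_add(1, Ordering::Relaxed);
        self.inner.get(path)
    }
}
```

Because every wrapper is itself a Store, callers cannot tell how many layers sit between them and the underlying storage.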
§Configuration System
This crate provides a configuration system inspired by the APIs exposed by fsspec,
PyArrow FileSystem, and Hadoop FileSystem, allowing a DynObjectStore to be created
from a URL and an optional list of key-value pairs. This provides a flexible interface
to support a wide variety of user-defined store configurations, with minimal additional
application complexity.
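The first step of this process can be roughly illustrated in isolation. The std-only function below splits a URL string into a store kind, a bucket and an object path. parse_location and Kind are hypothetical names that handle only a few illustrative schemes. The crate's own parse_url uses the url crate and recognizes many more formats (see ObjectStoreScheme).

```rust
/// Hypothetical store kinds recognised by this sketch.
#[derive(Debug, PartialEq)]
pub enum Kind {
    S3,
    Gcs,
    Local,
    Memory,
}

/// Hypothetical URL classifier: returns (kind, bucket, path).
/// No percent-decoding or validation is performed.
pub fn parse_location(url: &str) -> Option<(Kind, String, String)> {
    let (scheme, rest) = url.split_once("://")?;
    let kind = match scheme {
        "s3" => Kind::S3,
        "gs" => Kind::Gcs,
        "file" => Kind::Local,
        "memory" => Kind::Memory,
        _ => return None,
    };
    // The first segment is the bucket (empty for file:/// and memory:///),
    // and the remainder is the object path.
    let (bucket, path) = rest.split_once('/').unwrap_or((rest, ""));
    Some((kind, bucket.to_string(), path.trim_matches('/').to_string()))
}
```

As in the parse_url example, "s3://bucket/path" yields the object path "path", with the bucket consumed by store construction rather than appearing in the path.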
use object_store::aws::{AmazonS3, AmazonS3Builder};
use object_store::{parse_url, parse_url_opts};
use url::Url;
// Can manually create a specific store variant using the appropriate builder
let store: AmazonS3 = AmazonS3Builder::from_env()
    .with_bucket_name("my-bucket").build().unwrap();
// Alternatively can create an ObjectStore from an S3 URL
let url = Url::parse("s3://bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
// Potentially with additional options
let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
// Or with URLs that encode the bucket name in the URL path
let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
let (store, path) = parse_url(&url).unwrap();
assert_eq!(path.as_ref(), "path");
§List objects
Use the ObjectStore::list method to iterate over objects in
remote storage or files in the local filesystem:
use futures::StreamExt;
use object_store::{path::Path, ObjectStore};
use std::sync::Arc;
// Create an ObjectStore (get_object_store() stands for however your application builds one)
let object_store: Arc<dyn ObjectStore> = get_object_store();
// Recursively list all files below the 'data' path.
// 1. On AWS S3 this would be the 'data/' prefix
// 2. On a local filesystem, this would be the 'data' directory
let prefix = Path::from("data");
// Get an `async` stream of Metadata objects:
let mut list_stream = object_store.list(Some(&prefix));
// Print a line about each object
while let Some(meta) = list_stream.next().await.transpose().unwrap() {
    println!("Name: {}, size: {}", meta.location, meta.size);
}
Which will print out something like the following:
Name: data/file01.parquet, size: 112832
Name: data/file02.parquet, size: 143119
Name: data/child/file03.parquet, size: 100
...
§Fetch objects
Use the ObjectStoreExt::get / ObjectStore::get_opts method to fetch the data bytes
from remote storage or files in the local filesystem as a stream.
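A fetch does not have to cover the whole object: GetOptions can carry a range. Conceptually, a partial read takes one of three shapes, which HTTP-based stores express with a Range header. ByteRange below is a hypothetical enum in the spirit of the crate's GetRange, not its actual definition. The header formatting follows the HTTP byte-range syntax, in which the end offset is inclusive.

```rust
use std::ops::Range;

/// Hypothetical partial-read request.
#[derive(Debug, Clone, PartialEq)]
pub enum ByteRange {
    Bounded(Range<u64>), // half-open start..end
    Offset(u64),         // everything from this offset onwards
    Suffix(u64),         // the last n bytes
}

impl ByteRange {
    /// Format as an HTTP Range header value. HTTP uses an inclusive end,
    /// so the half-open end is decremented; an empty range cannot be expressed.
    pub fn to_header(&self) -> Option<String> {
        match self {
            ByteRange::Bounded(r) if r.start < r.end => {
                Some(format!("bytes={}-{}", r.start, r.end - 1))
            }
            ByteRange::Bounded(_) => None,
            ByteRange::Offset(start) => Some(format!("bytes={start}-")),
            ByteRange::Suffix(n) => Some(format!("bytes=-{n}")),
        }
    }
}
```

The off-by-one between half-open Rust ranges and inclusive HTTP ranges is the main detail such a conversion has to get right.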
use bytes::Bytes;
use futures::TryStreamExt;
use object_store::{path::Path, GetResult, ObjectStore, ObjectStoreExt};
use std::sync::Arc;
// Create an ObjectStore
let object_store: Arc<dyn ObjectStore> = get_object_store();
// Retrieve a specific file
let path = Path::from("data/file01.parquet");
// Fetch just the file metadata
let meta = object_store.head(&path).await.unwrap();
println!("{meta:?}");
// Fetch the object including metadata
let result: GetResult = object_store.get(&path).await.unwrap();
assert_eq!(result.meta, meta);
// Buffer the entire object in memory
let object: Bytes = result.bytes().await.unwrap();
assert_eq!(object.len() as u64, meta.size);
// Alternatively stream the bytes from object storage
let stream = object_store.get(&path).await.unwrap().into_stream();
// Count the zero bytes (0x00) using `try_fold` from the `TryStreamExt` trait
let num_zeros = stream
    .try_fold(0, |acc, bytes| async move {
        Ok(acc + bytes.iter().filter(|b| **b == 0).count())
    })
    .await
    .unwrap();
println!("Num zeros in {} is {}", path, num_zeros);
§Put Object
Use the ObjectStoreExt::put method to atomically write data.
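On a local filesystem, a common way to obtain this all-or-nothing behaviour is to write to a temporary file and then rename it into place. The std-only helper below (atomic_write and the ".partial" suffix are hypothetical choices) shows the technique. It is not presented as LocalFileSystem's actual implementation.

```rust
use std::ffi::OsString;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical helper: readers see either the old file or the complete new one.
pub fn atomic_write(dest: &Path, data: &[u8]) -> io::Result<()> {
    // Temporary sibling in the same directory, so the rename stays on one filesystem.
    // A real implementation would use a unique name to tolerate concurrent writers.
    let mut tmp: OsString = dest.as_os_str().to_owned();
    tmp.push(".partial");
    let tmp = PathBuf::from(tmp);
    {
        let mut file = fs::File::create(&tmp)?;
        file.write_all(data)?;
        file.sync_all()?; // ensure the bytes are durable before publishing them
    }
    // On POSIX systems, a rename within one filesystem atomically replaces `dest`.
    fs::rename(&tmp, dest)
}
```

Object stores provide the same guarantee natively, so a put never exposes a half-written object to concurrent readers.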
use object_store::{path::Path, ObjectStore, ObjectStoreExt, PutPayload};
use std::sync::Arc;
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/file1");
let payload = PutPayload::from_static(b"hello");
object_store.put(&path, payload).await.unwrap();
§Multipart Upload
Use the ObjectStoreExt::put_multipart / ObjectStore::put_multipart_opts method to atomically write a large
amount of data.
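Conceptually, a multipart upload sends an object as independently uploaded parts and publishes it only once every part is in place. The std-only toy below illustrates that shape. ToyMultipart and split_parts are hypothetical names, not the crate's MultipartUpload trait. Parts may arrive in any order, and the object only exists once complete assembles them in index order. Real services add constraints; S3, for example, requires every part except the last to be at least 5 MiB.

```rust
use std::collections::BTreeMap;

/// Hypothetical in-memory multipart upload.
#[derive(Default)]
pub struct ToyMultipart {
    parts: BTreeMap<usize, Vec<u8>>, // part index -> bytes
}

impl ToyMultipart {
    /// Record one part; parts can be uploaded in any order (or in parallel).
    pub fn put_part(&mut self, index: usize, data: Vec<u8>) {
        self.parts.insert(index, data);
    }

    /// Concatenate the parts in index order; only now does the object exist.
    pub fn complete(self) -> Vec<u8> {
        self.parts.into_values().flatten().collect()
    }
}

/// Split a payload into fixed-size parts (the last part may be shorter).
/// Panics if `part_size` is zero.
pub fn split_parts(data: &[u8], part_size: usize) -> Vec<&[u8]> {
    data.chunks(part_size).collect()
}
```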
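use object_store::{path::Path, ObjectStore, ObjectStoreExt, WriteMultipart};
use std::sync::Arc;
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let upload = object_store.put_multipart(&path).await.unwrap();
let mut write = WriteMultipart::new(upload);
write.write(b"hello");
// Flush the remaining data, wait for in-flight parts and complete the upload
write.finish().await.unwrap();
§Vectored Read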
A common pattern, especially when reading structured datasets, is to need to fetch multiple, potentially non-contiguous, ranges of a particular object.
ObjectStore::get_ranges provides an efficient way to perform such vectored IO, and will
automatically coalesce adjacent ranges into an appropriate number of parallel requests.
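The coalescing step can be sketched independently of the crate. coalesce and extract are hypothetical helpers. The first sorts the requested ranges and merges any whose gap is at most max_gap bytes (the crate's own threshold constant is OBJECT_STORE_COALESCE_DEFAULT, used by coalesce_ranges). The second slices each originally requested range back out of the fetched buffers in the caller's order. Issuing the merged fetches in parallel is omitted here.

```rust
use std::ops::Range;

/// Hypothetical: merge ranges separated by at most `max_gap` bytes.
pub fn coalesce(ranges: &[Range<u64>], max_gap: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        if let Some(last) = merged.last_mut() {
            if r.start <= last.end.saturating_add(max_gap) {
                // Close enough: extend the previous fetch instead of adding a new one.
                last.end = last.end.max(r.end);
                continue;
            }
        }
        merged.push(r);
    }
    merged
}

/// Hypothetical: recover each requested range from the coalesced fetches.
pub fn extract(requested: &[Range<u64>], fetched: &[(Range<u64>, Vec<u8>)]) -> Vec<Vec<u8>> {
    requested
        .iter()
        .map(|r| {
            let (f, bytes) = fetched
                .iter()
                .find(|(f, _)| f.start <= r.start && r.end <= f.end)
                .expect("every requested range is covered by a fetch");
            bytes[(r.start - f.start) as usize..(r.end - f.start) as usize].to_vec()
        })
        .collect()
}
```

With a 100-byte gap threshold, the requests 90..100, 400..600 and 0..10 become two fetches, 0..100 and 400..600, while the caller still receives three buffers in its original order.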
use object_store::{path::Path, ObjectStore};
use std::sync::Arc;
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
// One buffer per requested range, in the order requested
assert_eq!(ranges.len(), 3);
assert_eq!(ranges[0].len(), 10);
To retrieve ranges from a versioned object, use ObjectStore::get_opts by specifying the range in the GetOptions.
use object_store::{path::Path, GetOptions, ObjectStore};
use std::sync::Arc;
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let ranges = vec![90..100, 400..600, 0..10];
for range in ranges {
    let opts = GetOptions::default().with_range(Some(range));
    let data = object_store.get_opts(&path, opts).await.unwrap();
    // Do something with the data
}
§Vectored Write
When writing data it is often the case that the size of the output is not known ahead of time.
A common approach to handling this is to bump-allocate a Vec, whereby the underlying
allocation is repeatedly reallocated, each time doubling the capacity. The performance of
this is suboptimal as reallocating memory will often involve copying it to a new location.
Fortunately, as PutPayload does not require memory regions to be contiguous, it is
possible to instead allocate memory in chunks and avoid bump allocating. PutPayloadMut
encapsulates this approach.
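The same idea can be sketched with the standard library alone. ChunkedBuf is a hypothetical type, not PutPayloadMut. It fills fixed-capacity blocks and starts a fresh block when the current one is full, so bytes already written are never copied into a larger allocation. With an 8192-byte block size and 22 writes of 1024 bytes, it produces the same 8192/8192/6144 split as the PutPayloadMut example.

```rust
/// Hypothetical chunked write buffer: a list of fixed-capacity blocks.
pub struct ChunkedBuf {
    block_size: usize,
    completed: Vec<Vec<u8>>,
    current: Vec<u8>,
}

impl ChunkedBuf {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { block_size, completed: Vec::new(), current: Vec::with_capacity(block_size) }
    }

    /// Append bytes, starting a new block whenever the current one fills up.
    /// `current` never grows past its initial capacity, so it never reallocates.
    pub fn extend_from_slice(&mut self, mut data: &[u8]) {
        while !data.is_empty() {
            let room = self.block_size - self.current.len();
            let take = room.min(data.len());
            self.current.extend_from_slice(&data[..take]);
            data = &data[take..];
            if self.current.len() == self.block_size {
                let full = std::mem::replace(&mut self.current, Vec::with_capacity(self.block_size));
                self.completed.push(full);
            }
        }
    }

    /// Finish writing and return the blocks in order.
    pub fn freeze(mut self) -> Vec<Vec<u8>> {
        if !self.current.is_empty() {
            self.completed.push(self.current);
        }
        self.completed
    }
}
```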
use object_store::{path::Path, ObjectStore, ObjectStoreExt, PutPayloadMut};
use std::sync::Arc;
let object_store: Arc<dyn ObjectStore> = get_object_store();
let path = Path::from("data/large_file");
let mut buffer = PutPayloadMut::new().with_block_size(8192);
for _ in 0..22 {
    buffer.extend_from_slice(&[0; 1024]);
}
let payload = buffer.freeze();
// 22 x 1 KiB = 22,528 bytes, held in 3 separate chunks:
// two full 8 KiB blocks followed by a final 6 KiB block
assert_eq!(payload.as_ref().len(), 3);
assert_eq!(payload.as_ref()[0].len(), 8192);
assert_eq!(payload.as_ref()[1].len(), 8192);
assert_eq!(payload.as_ref()[2].len(), 6144);
object_store.put(&path, payload).await.unwrap();
§Conditional Fetch
More complex object retrieval can be supported by ObjectStore::get_opts.
For example, efficiently refreshing a cache without re-fetching the entire object data if the object hasn’t been modified.
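The server side of that exchange can be modelled in a few lines of std-only Rust. Server, Fetch and get_if_none_match are hypothetical names. A per-object version counter stands in for the ETag. When the If-None-Match value still matches, the server answers NotModified instead of sending the body; this is the HTTP 304 behaviour that the cache example handles as Error::NotModified.

```rust
use std::collections::HashMap;

/// Hypothetical outcome of a conditional read.
#[derive(Debug, PartialEq)]
pub enum Fetch {
    Modified { data: Vec<u8>, e_tag: String },
    NotModified,
}

/// Hypothetical server: every write bumps a version that doubles as the ETag.
#[derive(Default)]
pub struct Server {
    objects: HashMap<String, (u64, Vec<u8>)>, // path -> (version, body)
}

impl Server {
    pub fn put(&mut self, path: &str, data: &[u8]) {
        let entry = self.objects.entry(path.to_string()).or_insert((0, Vec::new()));
        entry.0 += 1;
        entry.1 = data.to_vec();
    }

    /// Return the body only if the current ETag differs from `if_none_match`.
    /// Returns None if the object does not exist.
    pub fn get_if_none_match(&self, path: &str, if_none_match: Option<&str>) -> Option<Fetch> {
        let (version, data) = self.objects.get(path)?;
        let e_tag = format!("\"v{version}\"");
        if if_none_match == Some(e_tag.as_str()) {
            return Some(Fetch::NotModified);
        }
        Some(Fetch::Modified { data: data.clone(), e_tag })
    }
}
```

A client that remembers the ETag from its last successful read only pays for a body transfer when the object has actually changed.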
use bytes::Bytes;
use object_store::{path::Path, Error, GetOptions, ObjectStore, ObjectStoreExt, Result};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
struct CacheEntry {
    /// Data returned by last request
    data: Bytes,
    /// ETag identifying the object returned by the server
    e_tag: String,
    /// Instant of last refresh
    refreshed_at: Instant,
}
/// Example cache that checks entries after 10 seconds for a new version
struct Cache {
    entries: HashMap<Path, CacheEntry>,
    store: Arc<dyn ObjectStore>,
}
impl Cache {
    pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
        Ok(match self.entries.get_mut(path) {
            Some(e) => {
                if e.refreshed_at.elapsed() < Duration::from_secs(10) {
                    e.data.clone() // Return cached data
                } else {
                    // Check if remote version has changed
                    let opts = GetOptions::new().with_if_none_match(Some(e.e_tag.clone()));
                    match self.store.get_opts(path, opts).await {
                        Ok(d) => {
                            // Object changed: store its new ETag so later checks compare against it
                            if let Some(e_tag) = d.meta.e_tag.clone() {
                                e.e_tag = e_tag;
                            }
                            e.data = d.bytes().await?;
                        }
                        Err(Error::NotModified { .. }) => {} // Data has not changed
                        Err(err) => return Err(err),
                    };
                    e.refreshed_at = Instant::now();
                    e.data.clone()
                }
            }
            None => {
                // Not cached, fetch data
                let get = self.store.get(path).await?;
                let e_tag = get.meta.e_tag.clone();
                let data = get.bytes().await?;
                if let Some(e_tag) = e_tag {
                    let entry = CacheEntry {
                        e_tag,
                        data: data.clone(),
                        refreshed_at: Instant::now(),
                    };
                    self.entries.insert(path.clone(), entry);
                }
                data
            }
        })
    }
}
§Conditional Put
The default behaviour when writing data is to upsert any existing object at the given path,
overwriting any previous value. More complex behaviours can be achieved using PutMode, and
can be used to build Optimistic Concurrency Control based transactions. This facilitates
building metadata catalogs, such as Apache Iceberg or Delta Lake, directly on top of object
storage, without relying on a separate DBMS.
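The pattern behind PutMode::Update can be exercised without any object store. In the std-only toy below, Versioned, put_if, Precondition and increment are hypothetical names. put_if plays the role of a conditional put: it rejects the write if the version the caller read is no longer current, and the caller then re-reads and retries. Four threads performing 100 increments each still end at exactly 400, because no update is ever silently lost.

```rust
use std::sync::Mutex;

/// Hypothetical precondition failure (stands in for a "412 Precondition Failed").
#[derive(Debug, PartialEq)]
pub struct Precondition;

/// Hypothetical single versioned object holding an integer.
pub struct Versioned {
    inner: Mutex<(u64, i64)>, // (version, value)
}

impl Versioned {
    pub fn new(value: i64) -> Self {
        Self { inner: Mutex::new((0, value)) }
    }

    /// Read the current (version, value) pair.
    pub fn get(&self) -> (u64, i64) {
        *self.inner.lock().unwrap()
    }

    /// Write only if the stored version still equals `expected_version`.
    pub fn put_if(&self, expected_version: u64, value: i64) -> Result<u64, Precondition> {
        let mut guard = self.inner.lock().unwrap();
        if guard.0 != expected_version {
            return Err(Precondition);
        }
        *guard = (expected_version + 1, value);
        Ok(guard.0)
    }
}

/// Optimistic concurrency control: read, compute, conditionally write, retry on conflict.
/// Returns how many retries were needed.
pub fn increment(cell: &Versioned) -> u32 {
    let mut retries = 0;
    loop {
        let (version, value) = cell.get();
        match cell.put_if(version, value + 1) {
            Ok(_) => return retries,
            Err(Precondition) => retries += 1,
        }
    }
}
```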
use object_store::{path::Path, Error, ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
let store = get_object_store();
let path = Path::from("test");
// Perform a conditional update on path
loop {
    // Perform get request
    let r = store.get(&path).await.unwrap();
    // Save version information fetched
    let version = UpdateVersion {
        e_tag: r.meta.e_tag.clone(),
        version: r.meta.version.clone(),
    };
    // Compute new version of object contents (do_update is application-specific)
    let new = do_update(r.bytes().await.unwrap());
    // Attempt to commit transaction
    match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
        Ok(_) => break, // Successfully committed
        Err(Error::Precondition { .. }) => continue, // Object has changed, try again
        Err(e) => panic!("{e}"),
    }
}
§Feature Flags
The feature set is layered so that you can pick an object store implementation, its HTTP transport, and its cryptography provider independently:
- cloud-base: shared cloud implementation (XML/JSON parsing, credentials, retry, etc.) that intentionally does not depend on reqwest or a cryptography provider.
- reqwest: enables the built-in reqwest-based HttpConnector.
- aws-lc-rs and ring: each provide a bundled client::CryptoProvider.
- <provider>-base (aws-base, azure-base, gcp-base, http-base): adds the implementation-specific logic on top of cloud-base without pulling in reqwest or a cryptography provider.
- <provider> (aws, azure, gcp, http): the batteries-included feature, combining <provider>-base + reqwest (with rustls) + the default aws-lc-rs cryptography provider; this is the typical choice.
§Implementation specific features
| Feature | Enables | Notes |
|---|---|---|
| aws | aws-base + reqwest + aws-lc-rs | Amazon S3 with the built-in HTTP transport. |
| azure | azure-base + reqwest + aws-lc-rs | Azure Blob Storage with the built-in HTTP transport. |
| gcp | gcp-base + reqwest + aws-lc-rs | Google Cloud Storage with the built-in HTTP transport. |
| http | http-base + reqwest + aws-lc-rs | HTTP/WebDAV with the built-in HTTP transport. |
| aws-base | cloud-base, tokio | S3 without reqwest or crypto; supply your own HttpConnector and client::CryptoProvider. |
| azure-base | cloud-base, tokio | Azure without reqwest or crypto; supply your own HttpConnector and client::CryptoProvider. |
| gcp-base | cloud-base, tokio | GCS without reqwest or crypto; supply your own HttpConnector and client::CryptoProvider. |
| http-base | cloud-base, tokio | HTTP/WebDAV without reqwest; supply your own HttpConnector. |
§Transport and crypto features
| Feature | Description |
|---|---|
| reqwest | Enables the reqwest-based HttpConnector. Enabled automatically by aws, azure, gcp, and http. |
| aws-lc-rs | Bundled aws-lc-rs-based client::CryptoProvider. The default for the batteries-included provider features. |
| ring | Bundled ring-based client::CryptoProvider, e.g. for WASM targets. |
| cloud-base | Shared cloud-provider implementation. Enabled automatically by *-base features; usually not enabled directly. |
§Other features
| Feature | Description |
|---|---|
| fs (default) | Local filesystem store via LocalFileSystem. |
| tokio | Enables Tokio-based utilities such as BufReader and BufWriter. Pulled in automatically by fs and the *-base features. |
| integration | Exposes the integration module, a reusable test suite for verifying custom ObjectStore implementations. Not API-stable. |
§Selecting a reqwest TLS backend
reqwest needs a TLS backend to compile, so whenever you enable the reqwest feature directly
you must also enable one of reqwest’s TLS features:
| reqwest feature | TLS stack | Notes |
|---|---|---|
| reqwest/rustls | rustls with aws-lc-rs | Enables aws-lc-rs. This is what aws/azure/gcp/http enable. |
| reqwest/native-tls | The platform’s native TLS (OpenSSL / SChannel / Secure Transport) | Enables neither rustls nor aws-lc-rs. |
| reqwest/rustls-no-provider | rustls with no bundled provider | Enables neither provider; you must install one at runtime, e.g. rustls::crypto::ring::default_provider().install_default(). |
§Feature examples
S3 implementation only; user provides the HTTP connector and crypto provider:
object_store = { default-features = false, features = ["aws-base"] }
S3 implementation + reqwest + aws-lc-rs signing (equivalent to the aws feature):
object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/rustls", "aws-lc-rs"] }
S3 implementation + reqwest with native TLS + ring signing (no aws-lc-rs in the dependency tree):
object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/native-tls", "ring"] }
§Cryptography
Request signing (e.g. AWS SigV4 or GCP service-account signing) requires a
client::CryptoProvider. The aws, gcp, and azure features
use aws-lc-rs, matching reqwest’s default so that applications do not end up with
two crypto stacks.
If you wish to use ring (e.g. to support WASM targets), use the
*-base feature flags, e.g. aws-base, and then enable the ring feature.
When enabling the aws-lc-rs feature without the built-in reqwest
transport (e.g. aws-base + aws-lc-rs with a custom HttpConnector),
you must also select an aws-lc-rs backend, as object_store does not
pick one for you.
For example, enable aws-lc-rs/aws-lc-sys (or fips / non-fips).
If both ring and aws-lc-rs are enabled, aws-lc-rs is used by default.
You can also implement a custom client::CryptoProvider to use your own cryptographic library.
This signing provider is independent of the TLS crypto provider used by the
built-in reqwest transport (see Selecting a reqwest TLS backend). The
only combination that needs the provider registered manually (e.g.
rustls::crypto::ring::default_provider().install_default() in your main)
is reqwest/rustls-no-provider; reqwest/rustls and reqwest/native-tls
configure their TLS stack automatically.
§TLS Certificates
Stores that use HTTPS/TLS (this is true for most cloud stores) can choose how certificates are validated.
By default, rustls-platform-verifier is used to verify certificates using the system’s certificate
facilities. Alternatively, this functionality can be disabled using
ClientOptions::with_no_system_certificates and certificates manually registered using
ClientOptions::with_root_certificate.
These could be a custom CA chain or an alternative trust store, e.g. the root certificates published by the webpki-root-certs crate.
use object_store::{ClientOptions, Certificate};
// Ignore the system trust store and register each bundled root certificate instead
let mut options = ClientOptions::default().with_no_system_certificates(true);
for root_cert in webpki_root_certs::TLS_SERVER_ROOT_CERTS {
    options = options.with_root_certificate(Certificate::from_der(root_cert.as_ref()).unwrap());
}
§Customizing HTTP Clients
Many ObjectStore implementations permit customization of the HTTP client via
the HttpConnector trait and utilities in the client module.
Examples include injecting custom HTTP headers or using an alternate
tokio Runtime for I/O requests. To replace reqwest entirely (rather than
tweak the bundled transport) see Disabling reqwest.
§Disabling reqwest
The aws, azure, gcp, and http features each bundle a
reqwest-based HTTP transport, which is the right choice for most
applications. If you would rather supply your own HTTP client — for example
to share an existing client, to target a platform where reqwest does not
compile (such as wasm32-wasip1), or to keep reqwest out of your
dependency tree — use the matching *-base feature and provide an
HttpConnector at builder time.
Remember to disable the default features so that fs (and its transitive
dependencies) is not pulled in:
[dependencies]
object_store = { version = "0.13", default-features = false, features = ["aws-base"] }
use object_store::aws::AmazonS3Builder;
let store = AmazonS3Builder::from_env()
    // `my_connector` is your own `impl HttpConnector`
    .with_http_connector(my_connector)
    .build()?;
See Feature Flags above for the full set of flags.
Re-exports§
- pub use client::ClientConfigKey; (cloud-base)
- pub use client::ClientOptions; (cloud-base)
- pub use client::CredentialProvider; (cloud-base)
- pub use client::StaticCredentialProvider; (cloud-base)
- pub use client::Certificate; (cloud-base and reqwest and non-WebAssembly)
Modules§
- aws (aws-base): An object store implementation for S3
- azure (azure-base): An object store implementation for Azure blob storage
- buffered (tokio): Utilities for performing tokio-style buffered IO
- chunked (non-WebAssembly): A ChunkedStore that can be used to test streaming behaviour
- client (cloud-base): Generic utilities for network based ObjectStore implementations
- delimited: Utility for streaming newline delimited files from object storage
- gcp (gcp-base): An object store implementation for Google Cloud Storage
- http (http-base): An object store implementation for generic HTTP servers
- integration (integration): Integration tests for custom object store implementations
- limit (tokio): An object store that limits the maximum concurrency of the wrapped implementation
- list: Paginated Listing
- local (fs and non-WebAssembly): An object store implementation for a local filesystem
- memory: An in-memory object store implementation
- multipart: Cloud Multipart Upload
- path: Path abstraction for Object Storage
- prefix: An object store wrapper handling a constant path prefix
- registry: Map object URLs to ObjectStore
- signer (cloud-base): Abstraction of signed URL generation for those object store implementations that support it
- throttle (tokio): A throttling object store wrapper
Structs§
- AttributeValue: The value of an Attribute
- Attributes: Additional attributes of an object
- AttributesIter: Iterator over Attributes
- BackoffConfig (cloud-base): Exponential backoff with decorrelated jitter algorithm
- CopyOptions: Options for a copy request
- Extensions: A type map of protocol extensions.
- GetOptions: Options for a get request, such as range
- GetResult: Result for a get request
- HeaderMap: A specialized multimap for header names and values.
- HeaderValue: Represents an HTTP header field value.
- ListResult: Result of a list call that includes objects, prefixes (directories) and a token for the next set of results. Individual result sets may be limited to 1,000 objects based on the underlying object storage’s limitations.
- ObjectMeta: The metadata that describes an object.
- PutMultipartOptions: Options for ObjectStore::put_multipart_opts
- PutOptions: Options for a put request
- PutPayload: A cheaply cloneable, ordered collection of Bytes
- PutPayloadIntoIter: An owning iterator of PutPayload
- PutPayloadIter: An iterator over PutPayload
- PutPayloadMut: A builder for PutPayload that avoids reallocating memory
- PutResult: Result for a put request
- RenameOptions: Options for a rename request
- RetryConfig (cloud-base): The configuration for how to respond to request errors
- TagSet: A collection of key value pairs used to annotate objects
- UpdateVersion: Uniquely identifies a version of an object to update
- WriteMultipart (tokio): A synchronous write API for uploading data in parallel in fixed size chunks
Enums§
- Attribute: Additional object attribute types
- CopyMode: Configure preconditions for the copy operation
- Error: A specialized Error for object store-related errors
- GetRange: Request only a portion of an object’s bytes
- GetResultPayload: The kind of a GetResult
- ObjectStoreScheme: Recognizes various URL formats, identifying the relevant ObjectStore
- PutMode: Configure preconditions for the put operation
- RenameTargetMode: Configure preconditions for the target of rename operation.
Constants§
- OBJECT_STORE_COALESCE_DEFAULT: Range requests with a gap less than or equal to this will be coalesced into a single request by coalesce_ranges
Traits§
- MultipartUpload: A trait allowing writing an object in fixed size chunks
- ObjectStore: Universal API for object store services.
- ObjectStoreExt: Extension trait for ObjectStore with convenience functions.
Functions§
- coalesce_ranges: Takes a function fetch that can fetch a range of bytes and uses this to fetch the provided byte ranges
- collect_bytes: Collect a stream into Bytes avoiding copying in the event of a single chunk
- parse_url: Create an ObjectStore based on the provided url
- parse_url_opts: Create an ObjectStore based on the provided url and options
Type Aliases§
- DynObjectStore: An alias for a dynamically dispatched object store implementation.
- MultipartId: Id type for multipart uploads.
- Result: A specialized Result for object store-related errors
- UploadPart: An upload part request