object_store/lib.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![cfg_attr(docsrs, feature(doc_cfg))]
19#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
20#![warn(
21 missing_copy_implementations,
22 missing_debug_implementations,
23 missing_docs,
24 clippy::explicit_iter_loop,
25 clippy::future_not_send,
26 clippy::use_self,
27 clippy::clone_on_ref_ptr,
28 unreachable_pub
29)]
30
31//! # object_store
32//!
33//! This crate provides a uniform API for interacting with object
34//! storage services and local files via the [`ObjectStore`]
35//! trait.
36//!
37//! Using this crate, the same binary and code can run in multiple
38//! clouds and local test environments, via a simple runtime
39//! configuration change.
40//!
41//! # Highlights
42//!
43//! 1. A high-performance async API focused on providing a consistent interface
44//! mirroring that of object stores such as [S3]
45//!
46//! 2. Production quality, leading this crate to be used in large
47//! scale production systems, such as [crates.io] and [InfluxDB IOx]
48//!
49//! 3. Support for advanced functionality, including atomic, conditional reads
50//! and writes, vectored IO, bulk deletion, and more...
51//!
52//! 4. Stable and predictable governance via the [Apache Arrow] project
53//!
54//! 5. Small dependency footprint, depending on only a small number of common crates
55//!
56//! Originally developed by [InfluxData] and subsequently donated
57//! to [Apache Arrow].
58//!
59//! [Apache Arrow]: https://arrow.apache.org/
60//! [InfluxData]: https://www.influxdata.com/
//! [crates.io]: https://github.com/rust-lang/crates.io
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox
62//! [ACID]: https://en.wikipedia.org/wiki/ACID
63//! [S3]: https://aws.amazon.com/s3/
64//!
65//! # APIs
66//!
67//! * [`ObjectStore`]: Core object store API
68//! * [`ObjectStoreExt`]: (*New in 0.13.0*) Extension trait with additional convenience methods
69//!
70//! # Available [`ObjectStore`] Implementations
71//!
72//! By default, this crate provides the following implementations:
73//!
74//! * Memory: [`InMemory`](memory::InMemory)
75//!
76//! Feature flags are used to enable support for other implementations:
77//!
#![cfg_attr(
    feature = "fs",
    doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
)]
82#![cfg_attr(
83 feature = "gcp-base",
84 doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
85)]
86#![cfg_attr(
87 feature = "aws-base",
88 doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
89)]
90#![cfg_attr(
91 feature = "azure-base",
92 doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
93)]
94#![cfg_attr(
95 feature = "http-base",
96 doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
97)]
98//!
99//! See [Feature Flags](#feature-flags) for the full set of flags.
100//!
101//! # Why not a Filesystem Interface?
102//!
103//! The [`ObjectStore`] interface is designed to mirror the APIs
104//! of object stores and *not* filesystems, and thus has stateless APIs instead
105//! of cursor based interfaces such as [`Read`] or [`Seek`] available in filesystems.
106//!
107//! This design provides the following advantages:
108//!
109//! * All operations are atomic, and readers cannot observe partial and/or failed writes
110//! * Methods map directly to object store APIs, providing both efficiency and predictability
111//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
112//! * Allows for functionality not native to filesystems, such as operation preconditions
113//! and atomic multipart uploads
114//!
//! This crate does provide [`BufReader`] and [`BufWriter`] adapters,
//! which offer a more filesystem-like API for working with the
//! [`ObjectStore`] trait; however, they should be used with care.
118//!
119//! [`BufReader`]: buffered::BufReader
120//! [`BufWriter`]: buffered::BufWriter
121//! [`Read`]: std::io::Read
122//! [`Seek`]: std::io::Seek
123//!
124//! # Adapters
125//!
126//! [`ObjectStore`] instances can be composed with various adapters
127//! which add additional functionality:
128//!
129//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
130//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
131//!
132//! # Configuration System
133//!
134//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing a [`DynObjectStore`] to be created
136//! from a URL and an optional list of key value pairs. This provides a flexible interface
137//! to support a wide variety of user-defined store configurations, with minimal additional
138//! application complexity.
139//!
140//! ```no_run,ignore-wasm32
141//! # #[cfg(feature = "aws")] {
142//! # use url::Url;
143//! # use object_store::{parse_url, parse_url_opts};
144//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
145//! #
146//! #
147//! // Can manually create a specific store variant using the appropriate builder
148//! let store: AmazonS3 = AmazonS3Builder::from_env()
149//! .with_bucket_name("my-bucket").build().unwrap();
150//!
151//! // Alternatively can create an ObjectStore from an S3 URL
152//! let url = Url::parse("s3://bucket/path").unwrap();
153//! let (store, path) = parse_url(&url).unwrap();
154//! assert_eq!(path.as_ref(), "path");
155//!
156//! // Potentially with additional options
157//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
158//!
159//! // Or with URLs that encode the bucket name in the URL path
160//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
161//! let (store, path) = parse_url(&url).unwrap();
162//! assert_eq!(path.as_ref(), "path");
163//! # }
164//! ```
165//!
166//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
167//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
168//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
169//!
170//! # List objects
171//!
172//! Use the [`ObjectStore::list`] method to iterate over objects in
173//! remote storage or files in the local filesystem:
174//!
175//! ```ignore-wasm32
176//! # use object_store::local::LocalFileSystem;
177//! # use std::sync::Arc;
178//! # use object_store::{path::Path, ObjectStore};
179//! # use futures_util::stream::StreamExt;
180//! # // use LocalFileSystem for example
181//! # fn get_object_store() -> Arc<dyn ObjectStore> {
182//! # Arc::new(LocalFileSystem::new())
183//! # }
184//! #
185//! # async fn example() {
186//! #
187//! // create an ObjectStore
188//! let object_store: Arc<dyn ObjectStore> = get_object_store();
189//!
190//! // Recursively list all files below the 'data' path.
191//! // 1. On AWS S3 this would be the 'data/' prefix
192//! // 2. On a local filesystem, this would be the 'data' directory
193//! let prefix = Path::from("data");
194//!
195//! // Get an `async` stream of Metadata objects:
196//! let mut list_stream = object_store.list(Some(&prefix));
197//!
198//! // Print a line about each object
199//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
200//! println!("Name: {}, size: {}", meta.location, meta.size);
201//! }
202//! # }
203//! ```
204//!
205//! Which will print out something like the following:
206//!
207//! ```text
208//! Name: data/file01.parquet, size: 112832
209//! Name: data/file02.parquet, size: 143119
210//! Name: data/child/file03.parquet, size: 100
211//! ...
212//! ```
213//!
214//! # Fetch objects
215//!
216//! Use the [`ObjectStoreExt::get`] / [`ObjectStore::get_opts`] method to fetch the data bytes
217//! from remote storage or files in the local filesystem as a stream.
218//!
219//! ```ignore-wasm32
220//! # use futures_util::TryStreamExt;
221//! # use object_store::local::LocalFileSystem;
222//! # use std::sync::Arc;
223//! # use bytes::Bytes;
224//! # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetResult};
225//! # fn get_object_store() -> Arc<dyn ObjectStore> {
226//! # Arc::new(LocalFileSystem::new())
227//! # }
228//! #
229//! # async fn example() {
230//! #
231//! // Create an ObjectStore
232//! let object_store: Arc<dyn ObjectStore> = get_object_store();
233//!
234//! // Retrieve a specific file
235//! let path = Path::from("data/file01.parquet");
236//!
237//! // Fetch just the file metadata
238//! let meta = object_store.head(&path).await.unwrap();
239//! println!("{meta:?}");
240//!
241//! // Fetch the object including metadata
242//! let result: GetResult = object_store.get(&path).await.unwrap();
243//! assert_eq!(result.meta, meta);
244//!
245//! // Buffer the entire object in memory
246//! let object: Bytes = result.bytes().await.unwrap();
247//! assert_eq!(object.len() as u64, meta.size);
248//!
249//! // Alternatively stream the bytes from object storage
250//! let stream = object_store.get(&path).await.unwrap().into_stream();
251//!
252//! // Count the '0's using `try_fold` from `TryStreamExt` trait
253//! let num_zeros = stream
254//! .try_fold(0, |acc, bytes| async move {
255//! Ok(acc + bytes.iter().filter(|b| **b == 0).count())
256//! }).await.unwrap();
257//!
258//! println!("Num zeros in {} is {}", path, num_zeros);
259//! # }
260//! ```
261//!
262//! # Put Object
263//!
264//! Use the [`ObjectStoreExt::put`] method to atomically write data.
265//!
266//! ```ignore-wasm32
267//! # use object_store::local::LocalFileSystem;
268//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
269//! # use std::sync::Arc;
270//! # use object_store::path::Path;
271//! # fn get_object_store() -> Arc<dyn ObjectStore> {
272//! # Arc::new(LocalFileSystem::new())
273//! # }
274//! # async fn put() {
275//! #
276//! let object_store: Arc<dyn ObjectStore> = get_object_store();
277//! let path = Path::from("data/file1");
278//! let payload = PutPayload::from_static(b"hello");
279//! object_store.put(&path, payload).await.unwrap();
280//! # }
281//! ```
282//!
283//! # Multipart Upload
284//!
285//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] method to atomically write a large
//! amount of data.
287//!
288//! ```ignore-wasm32
289//! # use object_store::local::LocalFileSystem;
290//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
291//! # use std::sync::Arc;
292//! # use bytes::Bytes;
293//! # use tokio::io::AsyncWriteExt;
294//! # use object_store::path::Path;
295//! # fn get_object_store() -> Arc<dyn ObjectStore> {
296//! # Arc::new(LocalFileSystem::new())
297//! # }
298//! # async fn multi_upload() {
299//! #
300//! let object_store: Arc<dyn ObjectStore> = get_object_store();
301//! let path = Path::from("data/large_file");
302//! let upload = object_store.put_multipart(&path).await.unwrap();
303//! let mut write = WriteMultipart::new(upload);
304//! write.write(b"hello");
305//! write.finish().await.unwrap();
306//! # }
307//! ```
308//!
309//! # Vectored Read
310//!
311//! A common pattern, especially when reading structured datasets, is to need to fetch
312//! multiple, potentially non-contiguous, ranges of a particular object.
313//!
314//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
315//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
316//!
317//! ```ignore-wasm32
318//! # use object_store::local::LocalFileSystem;
319//! # use object_store::ObjectStore;
320//! # use std::sync::Arc;
321//! # use bytes::Bytes;
322//! # use tokio::io::AsyncWriteExt;
323//! # use object_store::path::Path;
324//! # fn get_object_store() -> Arc<dyn ObjectStore> {
325//! # Arc::new(LocalFileSystem::new())
326//! # }
327//! # async fn multi_upload() {
328//! #
329//! let object_store: Arc<dyn ObjectStore> = get_object_store();
330//! let path = Path::from("data/large_file");
331//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
332//! assert_eq!(ranges.len(), 3);
333//! assert_eq!(ranges[0].len(), 10);
334//! # }
335//! ```
336//!
337//! To retrieve ranges from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
338//!
339//! ```ignore-wasm32
340//! # use object_store::local::LocalFileSystem;
341//! # use object_store::ObjectStore;
342//! # use object_store::GetOptions;
343//! # use std::sync::Arc;
344//! # use bytes::Bytes;
345//! # use tokio::io::AsyncWriteExt;
346//! # use object_store::path::Path;
347//! # fn get_object_store() -> Arc<dyn ObjectStore> {
348//! # Arc::new(LocalFileSystem::new())
349//! # }
350//! # async fn get_range_with_options() {
351//! #
352//! let object_store: Arc<dyn ObjectStore> = get_object_store();
353//! let path = Path::from("data/large_file");
354//! let ranges = vec![90..100, 400..600, 0..10];
355//! for range in ranges {
356//! let opts = GetOptions::default().with_range(Some(range));
357//! let data = object_store.get_opts(&path, opts).await.unwrap();
358//! // Do something with the data
359//! }
360//! # }
//! ```
362//!
363//! # Vectored Write
364//!
365//! When writing data it is often the case that the size of the output is not known ahead of time.
366//!
367//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
368//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
369//! this is suboptimal as reallocating memory will often involve copying it to a new location.
370//!
371//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
372//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
//! encapsulates this approach.
374//!
375//! ```ignore-wasm32
376//! # use object_store::local::LocalFileSystem;
377//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
378//! # use std::sync::Arc;
379//! # use bytes::Bytes;
380//! # use tokio::io::AsyncWriteExt;
381//! # use object_store::path::Path;
382//! # fn get_object_store() -> Arc<dyn ObjectStore> {
383//! # Arc::new(LocalFileSystem::new())
384//! # }
385//! # async fn multi_upload() {
386//! #
387//! let object_store: Arc<dyn ObjectStore> = get_object_store();
388//! let path = Path::from("data/large_file");
389//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
390//! for _ in 0..22 {
391//! buffer.extend_from_slice(&[0; 1024]);
392//! }
393//! let payload = buffer.freeze();
394//!
395//! // Payload consists of 3 separate 8KB allocations
396//! assert_eq!(payload.as_ref().len(), 3);
397//! assert_eq!(payload.as_ref()[0].len(), 8192);
398//! assert_eq!(payload.as_ref()[1].len(), 8192);
399//! assert_eq!(payload.as_ref()[2].len(), 6144);
400//!
401//! object_store.put(&path, payload).await.unwrap();
402//! # }
403//! ```
404//!
405//! # Conditional Fetch
406//!
407//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
408//!
//! For example, a cache can be refreshed efficiently, without re-fetching the entire
//! object data if the object hasn't been modified:
411//!
412//! ```
413//! # use std::collections::btree_map::Entry;
414//! # use std::collections::HashMap;
415//! # use object_store::{GetOptions, GetResult, ObjectStore, ObjectStoreExt, Result, Error};
416//! # use std::sync::Arc;
417//! # use std::time::{Duration, Instant};
418//! # use bytes::Bytes;
419//! # use tokio::io::AsyncWriteExt;
420//! # use object_store::path::Path;
421//! struct CacheEntry {
422//! /// Data returned by last request
423//! data: Bytes,
424//! /// ETag identifying the object returned by the server
425//! e_tag: String,
426//! /// Instant of last refresh
427//! refreshed_at: Instant,
428//! }
429//!
430//! /// Example cache that checks entries after 10 seconds for a new version
431//! struct Cache {
432//! entries: HashMap<Path, CacheEntry>,
433//! store: Arc<dyn ObjectStore>,
434//! }
435//!
436//! impl Cache {
437//! pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
438//! Ok(match self.entries.get_mut(path) {
439//! Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
440//! true => e.data.clone(), // Return cached data
441//! false => { // Check if remote version has changed
442//! let opts = GetOptions::new().with_if_none_match(Some(e.e_tag.clone()));
443//! match self.store.get_opts(&path, opts).await {
444//! Ok(d) => e.data = d.bytes().await?,
445//! Err(Error::NotModified { .. }) => {} // Data has not changed
446//! Err(e) => return Err(e),
447//! };
448//! e.refreshed_at = Instant::now();
449//! e.data.clone()
450//! }
451//! },
452//! None => { // Not cached, fetch data
453//! let get = self.store.get(&path).await?;
454//! let e_tag = get.meta.e_tag.clone();
455//! let data = get.bytes().await?;
456//! if let Some(e_tag) = e_tag {
457//! let entry = CacheEntry {
458//! e_tag,
459//! data: data.clone(),
460//! refreshed_at: Instant::now(),
461//! };
462//! self.entries.insert(path.clone(), entry);
463//! }
464//! data
465//! }
466//! })
467//! }
468//! }
469//! ```
470//!
471//! # Conditional Put
472//!
473//! The default behaviour when writing data is to upsert any existing object at the given path,
474//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
475//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
476//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
477//! storage, without relying on a separate DBMS.
478//!
479//! ```
480//! # use object_store::{Error, ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
481//! # use std::sync::Arc;
482//! # use bytes::Bytes;
483//! # use tokio::io::AsyncWriteExt;
484//! # use object_store::memory::InMemory;
485//! # use object_store::path::Path;
486//! # fn get_object_store() -> Arc<dyn ObjectStore> {
487//! # Arc::new(InMemory::new())
488//! # }
489//! # fn do_update(b: Bytes) -> Bytes {b}
490//! # async fn conditional_put() {
491//! let store = get_object_store();
492//! let path = Path::from("test");
493//!
494//! // Perform a conditional update on path
495//! loop {
496//! // Perform get request
497//! let r = store.get(&path).await.unwrap();
498//!
499//! // Save version information fetched
500//! let version = UpdateVersion {
501//! e_tag: r.meta.e_tag.clone(),
502//! version: r.meta.version.clone(),
503//! };
504//!
505//! // Compute new version of object contents
506//! let new = do_update(r.bytes().await.unwrap());
507//!
508//! // Attempt to commit transaction
509//! match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
510//! Ok(_) => break, // Successfully committed
511//! Err(Error::Precondition { .. }) => continue, // Object has changed, try again
512//! Err(e) => panic!("{e}")
513//! }
514//! }
515//! # }
516//! ```
517//!
518//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
519//! [Apache Iceberg]: https://iceberg.apache.org/
520//! [Delta Lake]: https://delta.io/
521//!
522//! # Feature Flags
523//!
524//! The feature set is layered so that you can pick an object store
525//! implementation, its HTTP transport, and its cryptography provider
526//! independently:
527//!
//! * `cloud-base` provides the shared cloud implementation (XML/JSON parsing,
//! credentials, retry, etc.) and intentionally does *not* depend on
//! `reqwest` or a cryptography provider.
531//! * `reqwest` enables the built-in [`reqwest`]-based [`HttpConnector`].
532//! * `aws-lc-rs` and `ring` each provide a bundled [`client::CryptoProvider`].
533//! * `<provider>-base` (`aws-base`, `azure-base`, `gcp-base`, `http-base`)
534//! adds the implementation specific logic on top of `cloud-base` without pulling in
535//! `reqwest` or a cryptography provider.
536//! * `<provider>` (`aws`, `azure`, `gcp`, `http`) is the batteries-included
537//! feature for `<provider>-base` + `reqwest` (with `rustls`) + the default
538//! `aws-lc-rs` cryptography provider, and is the typical choice.
539//!
540//! ## Implementation specific features
541//!
542//! | Feature | Enables | Notes |
543//! | --- | --- | --- |
544//! | `aws` | `aws-base` + `reqwest` + `aws-lc-rs` | Amazon S3 with the built-in HTTP transport. |
545//! | `azure` | `azure-base` + `reqwest` + `aws-lc-rs` | Azure Blob Storage with the built-in HTTP transport. |
546//! | `gcp` | `gcp-base` + `reqwest` + `aws-lc-rs` | Google Cloud Storage with the built-in HTTP transport. |
547//! | `http` | `http-base` + `reqwest` + `aws-lc-rs` | HTTP/WebDAV with the built-in HTTP transport. |
548//! | `aws-base` | | S3 without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
549//! | `azure-base` | | Azure without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
550//! | `gcp-base` | | GCS without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
551//! | `http-base` | | HTTP/WebDAV without `reqwest`; supply your own [`HttpConnector`]. |
552//!
553//! ## Transport and crypto features
554//!
555//! | Feature | Description |
556//! | --- | --- |
557//! | `reqwest` | Enables the [`reqwest`]-based [`HttpConnector`]. Enabled automatically by `aws`, `azure`, `gcp`, and `http`. |
558//! | `aws-lc-rs` | Bundled [`aws-lc-rs`]-based [`client::CryptoProvider`]. The default for the batteries-included provider features. |
559//! | `ring` | Bundled [`ring`]-based [`client::CryptoProvider`], e.g. for WASM targets. |
560//! | `cloud-base` | Shared cloud-provider implementation. Enabled automatically by `*-base` features; usually not enabled directly. |
561//!
562//! ## Other features
563//!
564//! | Feature | Description |
565//! | --- | --- |
566//! | `fs` *(default)* | Local filesystem store via [`LocalFileSystem`](local::LocalFileSystem). |
567//! | `tokio` | Enables Tokio-based utilities such as [`BufReader`](buffered::BufReader) and [`BufWriter`](buffered::BufWriter). Pulled in automatically by `fs` and the `*-base` features. |
568//! | `integration` | Exposes the [`integration`] module, a reusable test suite for verifying custom [`ObjectStore`] implementations. Not API-stable. |
569//!
570//! ## Selecting a `reqwest` TLS backend
571//!
572//! `reqwest` needs a TLS backend to compile, so whenever you enable the `reqwest` feature directly
573//! you must also enable one of `reqwest`'s TLS features:
574//!
575//! | reqwest feature | TLS stack | Notes |
576//! | --- | --- | --- |
577//! | `reqwest/rustls` | [rustls] with [`aws-lc-rs`] | enables `aws-lc-rs`. This is what `aws`/`azure`/`gcp`/`http` enable. |
578//! | `reqwest/native-tls` | the platform's native TLS (OpenSSL / SChannel / Secure Transport) | enables neither `rustls` nor `aws-lc-rs`. |
579//! | `reqwest/rustls-no-provider` | [rustls] with no bundled provider | enables neither provider; you must install one at runtime, e.g. `rustls::crypto::ring::default_provider().install_default()`. |
580//!
581//! ## Feature examples
582//!
583//! S3 implementation only; user provides the HTTP connector and crypto provider:
584//! ```toml
585//! object_store = { default-features = false, features = ["aws-base"] }
586//! ```
587//!
588//! S3 implementation + `reqwest` + `aws-lc-rs` signing (equivalent to the `aws` feature):
589//! ```toml
590//! object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/rustls", "aws-lc-rs"] }
591//! ```
592//!
593//! S3 implementation + `reqwest` with native TLS + `ring` signing (no `aws-lc-rs` in the dependency tree):
594//! ```toml
595//! object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/native-tls", "ring"] }
596//! ```
597//!
598//! [rustls]: https://crates.io/crates/rustls/
599//!
600//! # Cryptography
601//!
602//! Request signing (e.g. AWS SigV4 or GCP service-account signing) requires a
603//! [`client::CryptoProvider`]. The `aws`, `gcp`, and `azure` features
604//! use [`aws-lc-rs`], matching `reqwest`'s default so that applications do not end up with
605//! two crypto stacks.
606//!
607//! If you wish to use [`ring`] (e.g. to support WASM targets), use the
608//! `*-base` feature flags, e.g. `aws-base`, and then enable the `ring` feature.
609//!
610//! When enabling the `aws-lc-rs` feature without the built-in `reqwest`
611//! transport (e.g. `aws-base` + `aws-lc-rs` with a custom [`HttpConnector`]),
612//! you must also select an [`aws-lc-rs`] backend, as `object_store` does not
613//! pick one for you.
//! For example, enable `aws-lc-rs/aws-lc-sys` (or `fips` / `non-fips`).
615//!
616//! If both `ring` and `aws-lc-rs` are enabled, `aws-lc-rs` is used by default.
617//!
618//! You can also implement a custom [`client::CryptoProvider`] to use your own cryptographic library.
619//!
620//! This signing provider is independent of the TLS crypto provider used by the
621//! built-in `reqwest` transport — see
622//! [Selecting a `reqwest` TLS backend](#selecting-a-reqwest-tls-backend). The
623//! only combination that needs the provider registered manually (e.g.
624//! `rustls::crypto::ring::default_provider().install_default()` in your `main`)
625//! is `reqwest/rustls-no-provider`; `reqwest/rustls` and `reqwest/native-tls`
626//! configure their TLS stack automatically.
627//!
628//! [`aws-lc-rs`]: https://crates.io/crates/aws-lc-rs/
629//! [`ring`]: https://crates.io/crates/ring/
630//!
631//! # TLS Certificates
632//!
633//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose how certificates are validated.
634//!
635//! By default [`rustls-platform-verifier`] is used to verify certificates using the system's certificate
636//! facilities. Alternatively, this functionality can be disabled using
637//! [`ClientOptions::with_no_system_certificates`] and certificates manually registered using
638//! [`ClientOptions::with_root_certificate`].
639//!
//! These could be a custom [CA] chain, or an alternative trust store such as
//! [`webpki-root-certs`](https://crates.io/crates/webpki-root-certs), as in the example below:
641//!
642//! ```ignore-wasm32
643//! # #[cfg(feature = "aws")] {
644//! use object_store::{ClientOptions, Certificate};
645//!
646//! let mut options = ClientOptions::default().with_no_system_certificates(true);
647//! for root_cert in webpki_root_certs::TLS_SERVER_ROOT_CERTS {
648//! options = options.with_root_certificate(Certificate::from_der(root_cert.as_ref()).unwrap());
649//! }
650//! # }
651//! ```
652//!
653//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
654//! [`rustls-platform-verifier`]: https://crates.io/crates/rustls-platform-verifier/
655//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
656//!
657//! # Customizing HTTP Clients
658//!
659//! Many [`ObjectStore`] implementations permit customization of the HTTP client via
660//! the [`HttpConnector`] trait and utilities in the [`client`] module.
661//! Examples include injecting custom HTTP headers or using an alternate
662//! tokio Runtime for I/O requests. To replace `reqwest` entirely (rather than
663//! tweak the bundled transport) see [Disabling `reqwest`](#disabling-reqwest).
664//!
665//! [`HttpConnector`]: client::HttpConnector
666//!
667//! # Disabling `reqwest`
668//!
669//! The `aws`, `azure`, `gcp`, and `http` features each bundle a
670//! [`reqwest`]-based HTTP transport, which is the right choice for most
671//! applications. If you would rather supply your own HTTP client — for example
672//! to share an existing client, to target a platform where `reqwest` does not
673//! compile (such as `wasm32-wasip1`), or to keep `reqwest` out of your
674//! dependency tree — use the matching `*-base` feature and provide an
675//! [`HttpConnector`](client::HttpConnector) at builder time.
676//!
677//! Remember to disable the default features so that `fs` (and its transitive
678//! dependencies) is not pulled in:
679//!
680//! ```toml
681//! [dependencies]
682//! object_store = { version = "0.13", default-features = false, features = ["aws-base"] }
683//! ```
684//!
685//! ```ignore
686//! use object_store::aws::AmazonS3Builder;
687//!
688//! let store = AmazonS3Builder::from_env()
689//! // `my_connector` is your own `impl HttpConnector`
690//! .with_http_connector(my_connector)
691//! .build()?;
692//! ```
693//!
694//! See [Feature Flags](#feature-flags) above for the full set of flags.
695//!
696//! [`reqwest`]: https://crates.io/crates/reqwest
697
698#[cfg(feature = "aws-base")]
699pub mod aws;
700#[cfg(feature = "azure-base")]
701pub mod azure;
702#[cfg(feature = "tokio")]
703pub mod buffered;
704#[cfg(not(target_arch = "wasm32"))]
705pub mod chunked;
706pub mod delimited;
707#[cfg(feature = "gcp-base")]
708pub mod gcp;
709#[cfg(feature = "http-base")]
710pub mod http;
711#[cfg(feature = "tokio")]
712pub mod limit;
713#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
714pub mod local;
715pub mod memory;
716pub mod path;
717pub mod prefix;
718pub mod registry;
719#[cfg(feature = "cloud-base")]
720pub mod signer;
721#[cfg(feature = "tokio")]
722pub mod throttle;
723
724#[cfg(feature = "cloud-base")]
725pub mod client;
726
727#[cfg(feature = "cloud-base")]
728pub use client::{
729 ClientConfigKey, ClientOptions, CredentialProvider, StaticCredentialProvider,
730 backoff::BackoffConfig, retry::RetryConfig,
731};
732
733#[cfg(all(
734 feature = "cloud-base",
735 feature = "reqwest",
736 not(target_arch = "wasm32")
737))]
738pub use client::Certificate;
739
740#[cfg(feature = "cloud-base")]
741mod config;
742
743mod tags;
744
745pub use tags::TagSet;
746
747pub mod list;
748pub mod multipart;
749mod parse;
750mod payload;
751mod upload;
752mod util;
753
754mod attributes;
755
756#[cfg(any(feature = "integration", test))]
757pub mod integration;
758
759pub use attributes::*;
760
761pub use parse::{ObjectStoreScheme, parse_url, parse_url_opts};
762pub use payload::*;
763pub use upload::*;
764pub use util::{GetRange, OBJECT_STORE_COALESCE_DEFAULT, coalesce_ranges, collect_bytes};
765
766// Re-export HTTP types used in public API
767pub use ::http::{Extensions, HeaderMap, HeaderValue};
768
769use crate::path::Path;
770#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
771use crate::util::maybe_spawn_blocking;
772use async_trait::async_trait;
773use bytes::Bytes;
774use chrono::{DateTime, Utc};
775use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
776use std::fmt::{Debug, Formatter};
777use std::ops::Range;
778use std::sync::Arc;
779
780/// An alias for a dynamically dispatched object store implementation.
781pub type DynObjectStore = dyn ObjectStore;
782
/// Identifier type used to refer to a multipart upload.
pub type MultipartId = std::string::String;
785
786/// Universal API for object store services.
787///
788/// See the [module-level documentation](crate) for a high level overview and
789/// examples. See [`ObjectStoreExt`] for additional convenience methods.
790///
791/// # Contract
792/// This trait is a contract between object store _implementations_
793/// (e.g. providers, wrappers) and the `object_store` crate itself. It is
794/// intended to be the minimum API required for an object store.
795///
796/// The [`ObjectStoreExt`] acts as an API/contract between `object_store` and
797/// the store _users_ and provides additional methods that may be simpler to use
798/// but overlap in functionality with [`ObjectStore`].
799///
800/// # Clone
801/// If a store implements [`Clone`], that will only clone the handle to the underlying data. It will NOT clone/fork the
802/// actual key-value data. Hence, the cloned instance and the original instance share the same state.
803///
804/// # Minimal Default Implementations
805/// There are only a few default implementations for methods in this trait by
806/// design. This was different from versions prior to `0.13.0`, which had many
807/// more default implementations. Default implementations are convenient for
808/// users, but error-prone for implementors as they require keeping the
809/// convenience APIs correctly in sync.
810///
811/// As of version 0.13.0, most methods on [`ObjectStore`] must be implemented, and
812/// the convenience methods have been moved to the [`ObjectStoreExt`] trait as
813/// described above. See [#385] for more details.
814///
815/// [#385]: https://github.com/apache/arrow-rs-object-store/issues/385
816///
817/// # Wrappers
818/// If you wrap an [`ObjectStore`] -- e.g. to add observability -- you SHOULD
/// implement all trait methods. This ensures that default implementations
/// that are overridden by the wrapped store are also used by the wrapper.
821/// For example:
822///
823/// ```ignore
824/// struct MyStore {
825/// ...
826/// }
827///
828/// #[async_trait]
829/// impl ObjectStore for MyStore {
830/// // implement custom ranges handling
831/// async fn get_ranges(
832/// &self,
833/// location: &Path,
834/// ranges: &[Range<u64>],
835/// ) -> Result<Vec<Bytes>> {
836/// ...
837/// }
838///
839/// ...
840/// }
841///
842/// struct Wrapper {
843/// inner: Arc<dyn ObjectStore>,
844/// }
845///
846/// #[async_trait]
847/// #[deny(clippy::missing_trait_methods)]
848/// impl ObjectStore for Wrapper {
849/// // If we would not implement this method,
850/// // we would get the trait default and not
851/// // use the actual implementation of `inner`.
852/// async fn get_ranges(
853/// &self,
854/// location: &Path,
855/// ranges: &[Range<u64>],
856/// ) -> Result<Vec<Bytes>> {
857/// ...
858/// }
859///
860/// ...
861/// }
862/// ```
863///
864/// To automatically detect this issue, use
865/// [`#[deny(clippy::missing_trait_methods)]`](https://rust-lang.github.io/rust-clippy/master/index.html#missing_trait_methods).
866///
867/// # Upgrade Guide for 0.13.0
868///
869/// Upgrading to object_store 0.13.0 from an earlier version typically involves:
870///
871/// 1. Add a `use` for [`ObjectStoreExt`] to solve the error
872///
873/// ```text
874/// error[E0599]: no method named `put` found for reference `&dyn object_store::ObjectStore` in the current scope
875/// --> datafusion/datasource/src/url.rs:993:14
876/// ```
877///
878/// 2. Remove any (now) redundant implementations (such as `ObjectStore::put`) from any
879/// `ObjectStore` implementations to resolve the error
880///
881/// ```text
882/// error[E0407]: method `put` is not a member of trait `ObjectStore`
883/// --> datafusion/datasource/src/url.rs:1103:9
884/// |
885/// ```
886///
887/// 3. Convert `ObjectStore::delete` to [`ObjectStore::delete_stream`] (see documentation
888/// on that method for details and examples)
889///
890/// 4. Combine `ObjectStore::copy` and `ObjectStore::copy_if_not_exists` implementations into
891/// [`ObjectStore::copy_opts`] (see documentation on that method for details and examples)
892///
/// 5. Update `object_store::Error::NotImplemented` to include the name of the unimplemented operation and of the implementing store
894///
895/// For example, change instances of
896/// ```text
897/// object_store::Error::NotImplemented
898/// ```
899/// to
900/// ```
901/// object_store::Error::NotImplemented {
902/// operation: "put".to_string(),
903/// implementer: "RequestCountingObjectStore".to_string(),
904/// };
905/// ```
906///
907#[async_trait]
908pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
909 /// Save the provided `payload` to `location` with the given options
910 ///
911 /// The operation is guaranteed to be atomic, it will either successfully
912 /// write the entirety of `payload` to `location`, or fail. No clients
913 /// should be able to observe a partially written object
914 async fn put_opts(
915 &self,
916 location: &Path,
917 payload: PutPayload,
918 opts: PutOptions,
919 ) -> Result<PutResult>;
920
921 /// Perform a multipart upload with options
922 ///
923 /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
924 /// typically require multiple separate requests. See [`MultipartUpload`] for more information
925 ///
926 /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
927 async fn put_multipart_opts(
928 &self,
929 location: &Path,
930 opts: PutMultipartOptions,
931 ) -> Result<Box<dyn MultipartUpload>>;
932
933 /// Perform a get request with options
934 ///
935 /// ## Example
936 ///
937 /// This example uses a basic local filesystem object store to get an object with a specific etag.
938 /// On the local filesystem, supplying an invalid etag will error.
939 /// Versioned object stores will return the specified object version, if it exists.
940 ///
941 /// ```ignore-wasm32
942 /// # use object_store::local::LocalFileSystem;
943 /// # use tempfile::tempdir;
944 /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
945 /// async fn get_opts_example() {
946 /// let tmp = tempdir().unwrap();
947 /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
948 /// let location = Path::from("example.txt");
949 /// let content = b"Hello, Object Store!";
950 ///
951 /// // Put the object into the store
952 /// store
953 /// .put(&location, content.as_ref().into())
954 /// .await
955 /// .expect("Failed to put object");
956 ///
957 /// // Get the object from the store to figure out the right etag
958 /// let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
959 ///
960 /// let etag = result.meta.e_tag.expect("ETag should be present");
961 ///
962 /// // Get the object from the store with range and etag
963 /// let bytes = store
964 /// .get_opts(
965 /// &location,
966 /// GetOptions::new()
967 /// .with_if_match(Some(etag.clone())),
968 /// )
969 /// .await
970 /// .expect("Failed to get object with range and etag")
971 /// .bytes()
972 /// .await
973 /// .expect("Failed to read bytes");
974 ///
975 /// println!(
976 /// "Retrieved with ETag {}: {}",
977 /// etag,
978 /// String::from_utf8_lossy(&bytes)
979 /// );
980 ///
981 /// // Show that if the etag does not match, we get an error
982 /// let wrong_etag = "wrong-etag".to_string();
983 /// match store
984 /// .get_opts(
985 /// &location,
986 /// GetOptions::new().with_if_match(Some(wrong_etag))
987 /// )
988 /// .await
989 /// {
990 /// Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
991 /// Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
992 /// }
993 /// }
994 /// ```
995 ///
996 /// To retrieve a range of bytes from a versioned object, specify the range in the [`GetOptions`] supplied to this method.
997 ///
998 /// ```ignore-wasm32
999 /// # use object_store::local::LocalFileSystem;
1000 /// # use tempfile::tempdir;
1001 /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
1002 /// async fn get_opts_range_example() {
1003 /// let tmp = tempdir().unwrap();
1004 /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1005 /// let location = Path::from("example.txt");
1006 /// let content = b"Hello, Object Store!";
1007 ///
1008 /// // Put the object into the store
1009 /// store
1010 /// .put(&location, content.as_ref().into())
1011 /// .await
1012 /// .expect("Failed to put object");
1013 ///
1014 /// // Get the object from the store to figure out the right etag
1015 /// let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
1016 ///
1017 /// let etag = result.meta.e_tag.expect("ETag should be present");
1018 ///
1019 /// // Get the object from the store with range and etag
1020 /// let bytes = store
1021 /// .get_opts(
1022 /// &location,
1023 /// GetOptions::new()
1024 /// .with_range(Some(0..5))
1025 /// .with_if_match(Some(etag.clone())),
1026 /// )
1027 /// .await
1028 /// .expect("Failed to get object with range and etag")
1029 /// .bytes()
1030 /// .await
1031 /// .expect("Failed to read bytes");
1032 ///
1033 /// println!(
1034 /// "Retrieved range [0-5] with ETag {}: {}",
1035 /// etag,
1036 /// String::from_utf8_lossy(&bytes)
1037 /// );
1038 ///
1039 /// // Show that if the etag does not match, we get an error
1040 /// let wrong_etag = "wrong-etag".to_string();
1041 /// match store
1042 /// .get_opts(
1043 /// &location,
1044 /// GetOptions::new().with_range(Some(0..5)).with_if_match(Some(wrong_etag))
1045 /// )
1046 /// .await
1047 /// {
1048 /// Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
1049 /// Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
1050 /// }
1051 /// }
1052 /// ```
1053 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
1054
1055 /// Return the bytes that are stored at the specified location
1056 /// in the given byte ranges
1057 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
1058 coalesce_ranges(
1059 ranges,
1060 |range| self.get_range(location, range),
1061 OBJECT_STORE_COALESCE_DEFAULT,
1062 )
1063 .await
1064 }
1065
1066 /// Delete all the objects at the specified locations
1067 ///
1068 /// When supported, this method will use bulk operations that delete more
1069 /// than one object per a request. Otherwise, the implementation may call
1070 /// the single object delete method for each location.
1071 ///
1072 /// # Bulk Delete Support
1073 ///
1074 /// The following backends support native bulk delete operations:
1075 ///
1076 /// - **AWS (S3)**: Uses the native [DeleteObjects] API with batches of up to 1000 objects
1077 /// - **Azure**: Uses the native [Blob Batch] API with batches of up to 256 objects
1078 ///
1079 /// The following backends use concurrent individual delete operations:
1080 ///
1081 /// - **GCP**: Performs individual delete requests with up to 10 concurrent operations
1082 /// - **HTTP**: Performs individual delete requests with up to 10 concurrent operations
1083 /// - **Local**: Performs individual file deletions with up to 10 concurrent operations
1084 /// - **Memory**: Performs individual in-memory deletions sequentially
1085 ///
1086 /// [DeleteObjects]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
1087 /// [Blob Batch]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
1088 ///
1089 /// The returned stream yields the results of the delete operations in the
1090 /// same order as the input locations. However, some errors will be from
1091 /// an overall call to a bulk delete operation, and not from a specific
1092 /// location.
1093 ///
1094 /// If the object did not exist, the result may be an error or a success,
1095 /// depending on the behavior of the underlying store. For example, local
1096 /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
1097 /// return Ok. If it is an error, it will be [`Error::NotFound`].
1098 ///
1099 /// ```ignore-wasm32
1100 /// # use futures_util::{StreamExt, TryStreamExt};
1101 /// # use object_store::local::LocalFileSystem;
1102 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1103 /// # let root = tempfile::TempDir::new().unwrap();
1104 /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1105 /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
1106 /// # use object_store::path::Path;
1107 /// # use futures_util::{StreamExt, TryStreamExt};
1108 /// #
1109 /// // Create two objects
1110 /// store.put(&Path::from("foo"), "foo".into()).await?;
1111 /// store.put(&Path::from("bar"), "bar".into()).await?;
1112 ///
1113 /// // List object
1114 /// let locations = store.list(None).map_ok(|m| m.location).boxed();
1115 ///
1116 /// // Delete them
1117 /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
1118 /// # Ok(())
1119 /// # }
1120 /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
1121 /// # rt.block_on(example()).unwrap();
1122 /// ```
1123 ///
1124 /// Note: Before version 0.13, `delete_stream` has a default implementation
1125 /// that deletes each object with up to 10 concurrent requests. This default
1126 /// behavior has been removed, and each implementation must now provide its
1127 /// own `delete_stream` implementation explicitly. The following example
1128 /// shows how to implement `delete_stream` to get the previous default
1129 /// behavior.
1130 ///
1131 /// ```
1132 /// # use async_trait::async_trait;
1133 /// # use futures_util::stream::{BoxStream, StreamExt};
1134 /// # use object_store::path::Path;
1135 /// # use object_store::{
1136 /// # CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
1137 /// # PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
1138 /// # };
1139 /// # use std::fmt;
1140 /// # use std::fmt::Debug;
1141 /// # use std::sync::Arc;
1142 /// #
1143 /// # struct ExampleClient;
1144 /// #
1145 /// # impl ExampleClient {
1146 /// # async fn delete(&self, _path: &Path) -> Result<()> {
1147 /// # Ok(())
1148 /// # }
1149 /// # }
1150 /// #
1151 /// # struct ExampleStore {
1152 /// # client: Arc<ExampleClient>,
1153 /// # }
1154 /// #
1155 /// # impl Debug for ExampleStore {
1156 /// # fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1157 /// # write!(f, "ExampleStore")
1158 /// # }
1159 /// # }
1160 /// #
1161 /// # impl fmt::Display for ExampleStore {
1162 /// # fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1163 /// # write!(f, "ExampleStore")
1164 /// # }
1165 /// # }
1166 /// #
1167 /// # #[async_trait]
1168 /// # impl ObjectStore for ExampleStore {
1169 /// # async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
1170 /// # todo!()
1171 /// # }
1172 /// #
1173 /// # async fn put_multipart_opts(
1174 /// # &self,
1175 /// # _: &Path,
1176 /// # _: PutMultipartOptions,
1177 /// # ) -> Result<Box<dyn MultipartUpload>> {
1178 /// # todo!()
1179 /// # }
1180 /// #
1181 /// # async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
1182 /// # todo!()
1183 /// # }
1184 /// #
1185 /// fn delete_stream(
1186 /// &self,
1187 /// locations: BoxStream<'static, Result<Path>>,
1188 /// ) -> BoxStream<'static, Result<Path>> {
1189 /// let client = Arc::clone(&self.client);
1190 /// locations
1191 /// .map(move |location| {
1192 /// let client = Arc::clone(&client);
1193 /// async move {
1194 /// let location = location?;
1195 /// client.delete(&location).await?;
1196 /// Ok(location)
1197 /// }
1198 /// })
1199 /// .buffered(10)
1200 /// .boxed()
1201 /// }
1202 /// #
1203 /// # fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1204 /// # todo!()
1205 /// # }
1206 /// #
1207 /// # async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
1208 /// # todo!()
1209 /// # }
1210 /// #
1211 /// # async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> Result<()> {
1212 /// # todo!()
1213 /// # }
1214 /// # }
1215 /// #
1216 /// # async fn example() {
1217 /// # let store = ExampleStore { client: Arc::new(ExampleClient) };
1218 /// # let paths = futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
1219 /// # let results = store.delete_stream(paths).collect::<Vec<_>>().await;
1220 /// # assert_eq!(results.len(), 2);
1221 /// # assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
1222 /// # assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
1223 /// # }
1224 /// #
1225 /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
1226 /// # rt.block_on(example());
1227 /// ```
1228 fn delete_stream(
1229 &self,
1230 locations: BoxStream<'static, Result<Path>>,
1231 ) -> BoxStream<'static, Result<Path>>;
1232
1233 /// List all the objects with the given prefix.
1234 ///
1235 /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1236 /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
1237 ///
1238 /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1239 ///
1240 /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1241 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
1242
1243 /// List all the objects with the given prefix and a location greater than `offset`
1244 ///
1245 /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
1246 /// the number of network requests required.
1247 ///
1248 /// This returns an exclusive offset, i.e. objects at exactly `offset` will not be included.
1249 ///
1250 /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1251 ///
1252 /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1253 fn list_with_offset(
1254 &self,
1255 prefix: Option<&Path>,
1256 offset: &Path,
1257 ) -> BoxStream<'static, Result<ObjectMeta>> {
1258 let offset = offset.clone();
1259 self.list(prefix)
1260 .try_filter(move |f| futures_util::future::ready(f.location > offset))
1261 .boxed()
1262 }
1263
1264 /// List objects with the given prefix and an implementation specific
1265 /// delimiter. Returns common prefixes (directories) in addition to object
1266 /// metadata.
1267 ///
1268 /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1269 /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
1270 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
1271
1272 /// Copy an object from one path to another in the same object store.
1273 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()>;
1274
1275 /// Move an object from one path to another in the same object store.
1276 ///
1277 /// By default, this is implemented as a copy and then delete source. It may not
1278 /// check when deleting source that it was the same object that was originally copied.
1279 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
1280 let RenameOptions {
1281 target_mode,
1282 extensions,
1283 } = options;
1284 let copy_mode = match target_mode {
1285 RenameTargetMode::Overwrite => CopyMode::Overwrite,
1286 RenameTargetMode::Create => CopyMode::Create,
1287 };
1288 let copy_options = CopyOptions {
1289 mode: copy_mode,
1290 extensions,
1291 };
1292 self.copy_opts(from, to, copy_options).await?;
1293 self.delete(from).await?;
1294 Ok(())
1295 }
1296}
1297
1298macro_rules! as_ref_impl {
1299 ($type:ty) => {
1300 #[async_trait]
1301 #[deny(clippy::missing_trait_methods)]
1302 impl<T: ObjectStore + ?Sized> ObjectStore for $type {
1303 async fn put_opts(
1304 &self,
1305 location: &Path,
1306 payload: PutPayload,
1307 opts: PutOptions,
1308 ) -> Result<PutResult> {
1309 self.as_ref().put_opts(location, payload, opts).await
1310 }
1311
1312 async fn put_multipart_opts(
1313 &self,
1314 location: &Path,
1315 opts: PutMultipartOptions,
1316 ) -> Result<Box<dyn MultipartUpload>> {
1317 self.as_ref().put_multipart_opts(location, opts).await
1318 }
1319
1320 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
1321 self.as_ref().get_opts(location, options).await
1322 }
1323
1324 async fn get_ranges(
1325 &self,
1326 location: &Path,
1327 ranges: &[Range<u64>],
1328 ) -> Result<Vec<Bytes>> {
1329 self.as_ref().get_ranges(location, ranges).await
1330 }
1331
1332 fn delete_stream(
1333 &self,
1334 locations: BoxStream<'static, Result<Path>>,
1335 ) -> BoxStream<'static, Result<Path>> {
1336 self.as_ref().delete_stream(locations)
1337 }
1338
1339 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1340 self.as_ref().list(prefix)
1341 }
1342
1343 fn list_with_offset(
1344 &self,
1345 prefix: Option<&Path>,
1346 offset: &Path,
1347 ) -> BoxStream<'static, Result<ObjectMeta>> {
1348 self.as_ref().list_with_offset(prefix, offset)
1349 }
1350
1351 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
1352 self.as_ref().list_with_delimiter(prefix).await
1353 }
1354
1355 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
1356 self.as_ref().copy_opts(from, to, options).await
1357 }
1358
1359 async fn rename_opts(
1360 &self,
1361 from: &Path,
1362 to: &Path,
1363 options: RenameOptions,
1364 ) -> Result<()> {
1365 self.as_ref().rename_opts(from, to, options).await
1366 }
1367 }
1368 };
1369}
1370
1371as_ref_impl!(Arc<T>);
1372as_ref_impl!(Box<T>);
1373
1374/// Extension trait for [`ObjectStore`] with convenience functions.
1375///
1376/// See the [module-level documentation](crate) for a high level overview and
1377/// examples. See "contract" section within the [`ObjectStore`] documentation
1378/// for more reasoning.
1379///
1380/// # Implementation
1381/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
1382pub trait ObjectStoreExt: ObjectStore {
1383 /// Save the provided bytes to the specified location
1384 ///
1385 /// The operation is guaranteed to be atomic, it will either successfully
1386 /// write the entirety of `payload` to `location`, or fail. No clients
1387 /// should be able to observe a partially written object
1388 fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
1389
1390 /// Perform a multipart upload
1391 ///
1392 /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
1393 /// typically require multiple separate requests. See [`MultipartUpload`] for more information
1394 ///
1395 /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
1396 fn put_multipart(
1397 &self,
1398 location: &Path,
1399 ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
1400
1401 /// Return the bytes that are stored at the specified location.
1402 ///
1403 /// ## Example
1404 ///
1405 /// This example uses a basic local filesystem object store to get an object.
1406 ///
1407 /// ```ignore-wasm32
1408 /// # use object_store::local::LocalFileSystem;
1409 /// # use tempfile::tempdir;
1410 /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1411 /// async fn get_example() {
1412 /// let tmp = tempdir().unwrap();
1413 /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1414 /// let location = Path::from("example.txt");
1415 /// let content = b"Hello, Object Store!";
1416 ///
1417 /// // Put the object into the store
1418 /// store
1419 /// .put(&location, content.as_ref().into())
1420 /// .await
1421 /// .expect("Failed to put object");
1422 ///
1423 /// // Get the object from the store
1424 /// let get_result = store.get(&location).await.expect("Failed to get object");
1425 /// let bytes = get_result.bytes().await.expect("Failed to read bytes");
1426 /// println!("Retrieved content: {}", String::from_utf8_lossy(&bytes));
1427 /// }
1428 /// ```
1429 fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;
1430
1431 /// Return the bytes that are stored at the specified location
1432 /// in the given byte range.
1433 ///
1434 /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
1435 ///
1436 /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
1437 ///
1438 /// ## Examples
1439 ///
1440 /// This example uses a basic local filesystem object store to get a byte range from an object.
1441 ///
1442 /// ```ignore-wasm32
1443 /// # use object_store::local::LocalFileSystem;
1444 /// # use tempfile::tempdir;
1445 /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1446 /// async fn get_range_example() {
1447 /// let tmp = tempdir().unwrap();
1448 /// let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1449 /// let location = Path::from("example.txt");
1450 /// let content = b"Hello, Object Store!";
1451 ///
1452 /// // Put the object into the store
1453 /// store
1454 /// .put(&location, content.as_ref().into())
1455 /// .await
1456 /// .expect("Failed to put object");
1457 ///
1458 /// // Get the object from the store
1459 /// let bytes = store
1460 /// .get_range(&location, 0..5)
1461 /// .await
1462 /// .expect("Failed to get object");
1463 /// println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
1464 /// }
1465 /// ```
1466 fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
1467
1468 /// Return the metadata for the specified location
1469 fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
1470
1471 /// Delete the object at the specified location.
1472 fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
1473
1474 /// Copy an object from one path to another in the same object store.
1475 ///
1476 /// If there exists an object at the destination, it will be overwritten.
1477 fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1478
1479 /// Copy an object from one path to another, only if destination is empty.
1480 ///
1481 /// Will return an error if the destination already has an object.
1482 ///
1483 /// Performs an atomic operation if the underlying object storage supports it.
1484 /// If atomic operations are not supported by the underlying object storage (like S3)
1485 /// it will return an error.
1486 fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1487
1488 /// Move an object from one path to another in the same object store.
1489 ///
1490 /// By default, this is implemented as a copy and then delete source. It may not
1491 /// check when deleting source that it was the same object that was originally copied.
1492 ///
1493 /// If there exists an object at the destination, it will be overwritten.
1494 fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1495
1496 /// Move an object from one path to another in the same object store.
1497 ///
1498 /// Will return an error if the destination already has an object.
1499 fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1500}
1501
1502impl<T> ObjectStoreExt for T
1503where
1504 T: ObjectStore + ?Sized,
1505{
1506 async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1507 self.put_opts(location, payload, PutOptions::default())
1508 .await
1509 }
1510
1511 async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
1512 self.put_multipart_opts(location, PutMultipartOptions::default())
1513 .await
1514 }
1515
1516 async fn get(&self, location: &Path) -> Result<GetResult> {
1517 self.get_opts(location, GetOptions::default()).await
1518 }
1519
1520 async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
1521 let options = GetOptions::new().with_range(Some(range));
1522 self.get_opts(location, options).await?.bytes().await
1523 }
1524
1525 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
1526 let options = GetOptions::new().with_head(true);
1527 Ok(self.get_opts(location, options).await?.meta)
1528 }
1529
1530 async fn delete(&self, location: &Path) -> Result<()> {
1531 let location = location.clone();
1532 let mut stream =
1533 self.delete_stream(futures_util::stream::once(async move { Ok(location) }).boxed());
1534 let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
1535 store: "ext",
1536 source: "`delete_stream` with one location should yield once but didn't".into(),
1537 })?;
1538 if stream.next().await.is_some() {
1539 Err(Error::Generic {
1540 store: "ext",
1541 source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
1542 })
1543 } else {
1544 Ok(())
1545 }
1546 }
1547
1548 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
1549 let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
1550 self.copy_opts(from, to, options).await
1551 }
1552
1553 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1554 let options = CopyOptions::new().with_mode(CopyMode::Create);
1555 self.copy_opts(from, to, options).await
1556 }
1557
1558 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1559 let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
1560 self.rename_opts(from, to, options).await
1561 }
1562
1563 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1564 let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
1565 self.rename_opts(from, to, options).await
1566 }
1567}
1568
1569/// Result of a list call that includes objects, prefixes (directories) and a
1570/// token for the next set of results. Individual result sets may be limited to
1571/// 1,000 objects based on the underlying object storage's limitations.
1572#[derive(Debug)]
1573pub struct ListResult {
1574 /// Prefixes that are common (like directories)
1575 pub common_prefixes: Vec<Path>,
1576 /// Object metadata for the listing
1577 pub objects: Vec<ObjectMeta>,
1578 /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1579 /// that need to return context-specific information (like cache status) from trait methods.
1580 ///
1581 /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
1582 /// response, allowing custom HTTP middleware to propagate information to callers.
1583 /// Where a result is assembled from multiple paginated requests, the extensions of
1584 /// each response are merged, with those of later responses taking precedence.
1585 pub extensions: Extensions,
1586}
1587
/// The metadata that describes an object.
///
/// Produced, for example, by [`ObjectStore::list`] and [`ObjectStoreExt::head`],
/// and carried as the `meta` field of a [`GetResult`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
    /// The full path to the object
    pub location: Path,
    /// The last modified time
    pub last_modified: DateTime<Utc>,
    /// The size in bytes of the object.
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub size: u64,
    /// The unique identifier for the object, if the store reported one
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for this object, if the store reported one
    pub version: Option<String>,
}
1606
1607/// Options for a get request, such as range
1608#[derive(Debug, Default, Clone)]
1609pub struct GetOptions {
1610 /// Request will succeed if the `ObjectMeta::e_tag` matches
1611 /// otherwise returning [`Error::Precondition`]
1612 ///
1613 /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
1614 ///
1615 /// Examples:
1616 ///
1617 /// ```text
1618 /// If-Match: "xyzzy"
1619 /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
1620 /// If-Match: *
1621 /// ```
1622 pub if_match: Option<String>,
1623 /// Request will succeed if the `ObjectMeta::e_tag` does not match
1624 /// otherwise returning [`Error::NotModified`]
1625 ///
1626 /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
1627 ///
1628 /// Examples:
1629 ///
1630 /// ```text
1631 /// If-None-Match: "xyzzy"
1632 /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
1633 /// If-None-Match: *
1634 /// ```
1635 pub if_none_match: Option<String>,
1636 /// Request will succeed if the object has been modified since
1637 ///
1638 /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
1639 pub if_modified_since: Option<DateTime<Utc>>,
1640 /// Request will succeed if the object has not been modified since
1641 /// otherwise returning [`Error::Precondition`]
1642 ///
1643 /// Some stores, such as S3, will only return `NotModified` for exact
1644 /// timestamp matches, instead of for any timestamp greater than or equal.
1645 ///
1646 /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
1647 pub if_unmodified_since: Option<DateTime<Utc>>,
1648 /// Request transfer of only the specified range of bytes
1649 /// otherwise returning [`Error::NotModified`]
1650 ///
1651 /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
1652 pub range: Option<GetRange>,
1653 /// Request a particular object version
1654 pub version: Option<String>,
1655 /// Request transfer of no content
1656 ///
1657 /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
1658 pub head: bool,
1659 /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1660 /// that need to pass context-specific information (like tracing spans) via trait methods.
1661 ///
1662 /// These extensions are ignored entirely by backends offered through this crate.
1663 pub extensions: Extensions,
1664}
1665
1666impl GetOptions {
1667 /// Returns an error if the modification conditions on this request are not satisfied
1668 ///
1669 /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
1670 pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
1671 // The use of the invalid etag "*" means no ETag is equivalent to never matching
1672 let etag = meta.e_tag.as_deref().unwrap_or("*");
1673 let last_modified = meta.last_modified;
1674
1675 if let Some(m) = &self.if_match {
1676 if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
1677 return Err(Error::Precondition {
1678 path: meta.location.to_string(),
1679 source: format!("{etag} does not match {m}").into(),
1680 });
1681 }
1682 } else if let Some(date) = self.if_unmodified_since {
1683 if last_modified > date {
1684 return Err(Error::Precondition {
1685 path: meta.location.to_string(),
1686 source: format!("{date} < {last_modified}").into(),
1687 });
1688 }
1689 }
1690
1691 if let Some(m) = &self.if_none_match {
1692 if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
1693 return Err(Error::NotModified {
1694 path: meta.location.to_string(),
1695 source: format!("{etag} matches {m}").into(),
1696 });
1697 }
1698 } else if let Some(date) = self.if_modified_since {
1699 if last_modified <= date {
1700 return Err(Error::NotModified {
1701 path: meta.location.to_string(),
1702 source: format!("{date} >= {last_modified}").into(),
1703 });
1704 }
1705 }
1706 Ok(())
1707 }
1708
1709 /// Create a new [`GetOptions`]
1710 pub fn new() -> Self {
1711 Self::default()
1712 }
1713
1714 /// Sets the `if_match` condition.
1715 ///
1716 /// See [`GetOptions::if_match`]
1717 #[must_use]
1718 pub fn with_if_match(mut self, etag: Option<impl Into<String>>) -> Self {
1719 self.if_match = etag.map(Into::into);
1720 self
1721 }
1722
1723 /// Sets the `if_none_match` condition.
1724 ///
1725 /// See [`GetOptions::if_none_match`]
1726 #[must_use]
1727 pub fn with_if_none_match(mut self, etag: Option<impl Into<String>>) -> Self {
1728 self.if_none_match = etag.map(Into::into);
1729 self
1730 }
1731
1732 /// Sets the `if_modified_since` condition.
1733 ///
1734 /// See [`GetOptions::if_modified_since`]
1735 #[must_use]
1736 pub fn with_if_modified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1737 self.if_modified_since = dt.map(Into::into);
1738 self
1739 }
1740
1741 /// Sets the `if_unmodified_since` condition.
1742 ///
1743 /// See [`GetOptions::if_unmodified_since`]
1744 #[must_use]
1745 pub fn with_if_unmodified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1746 self.if_unmodified_since = dt.map(Into::into);
1747 self
1748 }
1749
1750 /// Sets the `range` condition.
1751 ///
1752 /// See [`GetOptions::range`]
1753 #[must_use]
1754 pub fn with_range(mut self, range: Option<impl Into<GetRange>>) -> Self {
1755 self.range = range.map(Into::into);
1756 self
1757 }
1758
1759 /// Sets the `version` condition.
1760 ///
1761 /// See [`GetOptions::version`]
1762 #[must_use]
1763 pub fn with_version(mut self, version: Option<impl Into<String>>) -> Self {
1764 self.version = version.map(Into::into);
1765 self
1766 }
1767
1768 /// Sets the `head` condition.
1769 ///
1770 /// See [`GetOptions::head`]
1771 #[must_use]
1772 pub fn with_head(mut self, head: impl Into<bool>) -> Self {
1773 self.head = head.into();
1774 self
1775 }
1776
1777 /// Sets the `extensions` condition.
1778 ///
1779 /// See [`GetOptions::extensions`]
1780 #[must_use]
1781 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1782 self.extensions = extensions;
1783 self
1784 }
1785}
1786
/// Result for a get request
#[derive(Debug)]
pub struct GetResult {
    /// The [`GetResultPayload`] holding the returned data
    pub payload: GetResultPayload,
    /// The [`ObjectMeta`] for this object
    pub meta: ObjectMeta,
    /// The range of bytes returned by this request (start inclusive, end exclusive)
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub range: Range<u64>,
    /// Additional object attributes
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to return context-specific information (like cache status) from trait methods.
    ///
    /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
    /// response, allowing custom HTTP middleware to propagate information to callers.
    pub extensions: Extensions,
}
1807
/// The kind of a [`GetResult`]
///
/// A local file is represented separately from an opaque stream, as some systems
/// may be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
    /// A local file handle, together with the path it was opened from
    #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
    File(std::fs::File, std::path::PathBuf),
    /// An opaque stream of bytes
    Stream(BoxStream<'static, Result<Bytes>>),
}
1819
1820impl Debug for GetResultPayload {
1821 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1822 match self {
1823 #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1824 Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1825 Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1826 }
1827 }
1828}
1829
1830impl GetResult {
1831 /// Collects the data into a [`Bytes`]
1832 pub async fn bytes(self) -> Result<Bytes> {
1833 let len = self.range.end - self.range.start;
1834 match self.payload {
1835 #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1836 GetResultPayload::File(mut file, path) => {
1837 maybe_spawn_blocking(move || {
1838 use crate::local::read_range;
1839
1840 let buffer = read_range(&mut file, &path, self.range)?;
1841
1842 Ok(buffer)
1843 })
1844 .await
1845 }
1846 GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1847 }
1848 }
1849
1850 /// Converts this into a byte stream
1851 ///
1852 /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1853 /// otherwise will return the [`GetResultPayload::Stream`].
1854 ///
1855 /// # Tokio Compatibility
1856 ///
1857 /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1858 /// no major operating systems have stable async file APIs. Therefore if called from
1859 /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1860 /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1861 ///
1862 /// If not called from a tokio context, this will perform IO on the current thread with
1863 /// no additional complexity or overheads
1864 pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1865 match self.payload {
1866 #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1867 GetResultPayload::File(file, path) => {
1868 const CHUNK_SIZE: usize = 8 * 1024;
1869 local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1870 }
1871 GetResultPayload::Stream(s) => s,
1872 }
1873 }
1874}
1875
/// Configure preconditions for the put operation
///
/// Defaults to [`PutMode::Overwrite`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
    /// Perform an atomic write operation if the current version of the object matches the
    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
    Update(UpdateVersion),
}
1889
/// Uniquely identifies a version of an object to update
///
/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
/// updates, and it is therefore recommended applications preserve both
///
/// A [`PutResult`] can be converted into an [`UpdateVersion`] via [`From`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
    /// The expected unique identifier of the object version being updated
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// The expected version indicator of the object version being updated
    pub version: Option<String>,
}
1903
1904impl From<PutResult> for UpdateVersion {
1905 fn from(value: PutResult) -> Self {
1906 Self {
1907 e_tag: value.e_tag,
1908 version: value.version,
1909 }
1910 }
1911}
1912
/// Options for a put request
///
/// Equality ignores [`PutOptions::extensions`].
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
    /// Configure the [`PutMode`] for this operation
    pub mode: PutMode,
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1934
1935impl PartialEq<Self> for PutOptions {
1936 fn eq(&self, other: &Self) -> bool {
1937 let Self {
1938 mode,
1939 tags,
1940 attributes,
1941 extensions: _,
1942 } = self;
1943 let Self {
1944 mode: other_mode,
1945 tags: other_tags,
1946 attributes: other_attributes,
1947 extensions: _,
1948 } = other;
1949 (mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
1950 }
1951}
1952
1953impl Eq for PutOptions {}
1954
1955impl From<PutMode> for PutOptions {
1956 fn from(mode: PutMode) -> Self {
1957 Self {
1958 mode,
1959 ..Default::default()
1960 }
1961 }
1962}
1963
1964impl From<TagSet> for PutOptions {
1965 fn from(tags: TagSet) -> Self {
1966 Self {
1967 tags,
1968 ..Default::default()
1969 }
1970 }
1971}
1972
1973impl From<Attributes> for PutOptions {
1974 fn from(attributes: Attributes) -> Self {
1975 Self {
1976 attributes,
1977 ..Default::default()
1978 }
1979 }
1980}
1981
// See <https://github.com/apache/arrow-rs-object-store/issues/339>.
// Deprecated alias kept (and hidden from rustdoc) so code using the old name
// still compiles; new code should name `PutMultipartOptions` directly.
#[doc(hidden)]
#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
pub type PutMultipartOpts = PutMultipartOptions;
1986
/// Options for [`ObjectStore::put_multipart_opts`]
///
/// Equality ignores [`PutMultipartOptions::extensions`].
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOptions {
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2006
2007impl PartialEq<Self> for PutMultipartOptions {
2008 fn eq(&self, other: &Self) -> bool {
2009 let Self {
2010 tags,
2011 attributes,
2012 extensions: _,
2013 } = self;
2014 let Self {
2015 tags: other_tags,
2016 attributes: other_attributes,
2017 extensions: _,
2018 } = other;
2019 (tags == other_tags) && (attributes == other_attributes)
2020 }
2021}
2022
2023impl Eq for PutMultipartOptions {}
2024
2025impl From<TagSet> for PutMultipartOptions {
2026 fn from(tags: TagSet) -> Self {
2027 Self {
2028 tags,
2029 ..Default::default()
2030 }
2031 }
2032}
2033
2034impl From<Attributes> for PutMultipartOptions {
2035 fn from(attributes: Attributes) -> Self {
2036 Self {
2037 attributes,
2038 ..Default::default()
2039 }
2040 }
2041}
2042
/// Result for a put request
///
/// Can be converted into an [`UpdateVersion`] for a subsequent conditional update.
#[derive(Debug, Clone)]
pub struct PutResult {
    /// The unique identifier for the newly created object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to return context-specific information (like cache status) from trait methods.
    ///
    /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
    /// response, allowing custom HTTP middleware to propagate information to callers.
    ///
    /// These extensions are excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2061
2062impl PartialEq<Self> for PutResult {
2063 fn eq(&self, other: &Self) -> bool {
2064 let Self {
2065 e_tag,
2066 version,
2067 extensions: _,
2068 } = self;
2069 let Self {
2070 e_tag: other_e_tag,
2071 version: other_version,
2072 extensions: _,
2073 } = other;
2074 (e_tag == other_e_tag) && (version == other_version)
2075 }
2076}
2077
2078impl Eq for PutResult {}
2079
/// Configure preconditions for the copy operation
///
/// Defaults to [`CopyMode::Overwrite`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
}
2090
/// Options for a copy request
///
/// Equality ignores [`CopyOptions::extensions`].
#[derive(Debug, Clone, Default)]
pub struct CopyOptions {
    /// Configure the [`CopyMode`] for this operation
    pub mode: CopyMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2104
2105impl CopyOptions {
2106 /// Create a new [`CopyOptions`]
2107 pub fn new() -> Self {
2108 Self::default()
2109 }
2110
2111 /// Sets the `mode.
2112 ///
2113 /// See [`CopyOptions::mode`].
2114 #[must_use]
2115 pub fn with_mode(mut self, mode: CopyMode) -> Self {
2116 self.mode = mode;
2117 self
2118 }
2119
2120 /// Sets the `extensions`.
2121 ///
2122 /// See [`CopyOptions::extensions`].
2123 #[must_use]
2124 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2125 self.extensions = extensions;
2126 self
2127 }
2128}
2129
2130impl PartialEq<Self> for CopyOptions {
2131 fn eq(&self, other: &Self) -> bool {
2132 let Self {
2133 mode,
2134 extensions: _,
2135 } = self;
2136 let Self {
2137 mode: mode_other,
2138 extensions: _,
2139 } = other;
2140
2141 mode == mode_other
2142 }
2143}
2144
2145impl Eq for CopyOptions {}
2146
/// Configure preconditions for the target of a rename operation.
///
/// Note though that the source location may or may not be deleted at the same time in an
/// atomic operation. There is currently NO flag to control the atomicity of "delete source
/// at the same time as creating the target".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
    /// Perform a write operation on the target, overwriting any object present at the provided path.
    #[default]
    Overwrite,
    /// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path.
    Create,
}
2160
/// Options for a rename request
///
/// Equality ignores [`RenameOptions::extensions`].
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
    /// Configure the [`RenameTargetMode`] for this operation
    pub target_mode: RenameTargetMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2174
2175impl RenameOptions {
2176 /// Create a new [`RenameOptions`]
2177 pub fn new() -> Self {
2178 Self::default()
2179 }
2180
2181 /// Sets the `target_mode=.
2182 ///
2183 /// See [`RenameOptions::target_mode`].
2184 #[must_use]
2185 pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
2186 self.target_mode = target_mode;
2187 self
2188 }
2189
2190 /// Sets the `extensions`.
2191 ///
2192 /// See [`RenameOptions::extensions`].
2193 #[must_use]
2194 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2195 self.extensions = extensions;
2196 self
2197 }
2198}
2199
2200impl PartialEq<Self> for RenameOptions {
2201 fn eq(&self, other: &Self) -> bool {
2202 let Self {
2203 target_mode,
2204 extensions: _,
2205 } = self;
2206 let Self {
2207 target_mode: target_mode_other,
2208 extensions: _,
2209 } = other;
2210
2211 target_mode == target_mode_other
2212 }
2213}
2214
2215impl Eq for RenameOptions {}
2216
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to [`Error`] but may be overridden via the second parameter.
pub type Result<T, E = Error> = std::result::Result<T, E>;
2219
2220/// A specialized `Error` for object store-related errors
2221#[derive(Debug, thiserror::Error)]
2222#[non_exhaustive]
2223pub enum Error {
2224 /// A fallback error type when no variant matches
2225 #[error("Generic {} error: {}", store, source)]
2226 Generic {
2227 /// The store this error originated from
2228 store: &'static str,
2229 /// The wrapped error
2230 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2231 },
2232
2233 /// Error when the object is not found at given location
2234 #[error("Object at location {} not found: {}", path, source)]
2235 NotFound {
2236 /// The path to file
2237 path: String,
2238 /// The wrapped error
2239 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2240 },
2241
2242 /// Error for invalid path
2243 #[error("Encountered object with invalid path: {}", source)]
2244 InvalidPath {
2245 /// The wrapped error
2246 #[from]
2247 source: path::Error,
2248 },
2249
2250 /// Error when `tokio::spawn` failed
2251 #[cfg(feature = "tokio")]
2252 #[error("Error joining spawned task: {}", source)]
2253 JoinError {
2254 /// The wrapped error
2255 #[from]
2256 source: tokio::task::JoinError,
2257 },
2258
2259 /// Error when the attempted operation is not supported
2260 #[error("Operation not supported: {}", source)]
2261 NotSupported {
2262 /// The wrapped error
2263 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2264 },
2265
2266 /// Error when the object already exists
2267 #[error("Object at location {} already exists: {}", path, source)]
2268 AlreadyExists {
2269 /// The path to the
2270 path: String,
2271 /// The wrapped error
2272 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2273 },
2274
2275 /// Error when the required conditions failed for the operation
2276 #[error("Request precondition failure for path {}: {}", path, source)]
2277 Precondition {
2278 /// The path to the file
2279 path: String,
2280 /// The wrapped error
2281 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2282 },
2283
2284 /// Error when the object at the location isn't modified
2285 #[error("Object at location {} not modified: {}", path, source)]
2286 NotModified {
2287 /// The path to the file
2288 path: String,
2289 /// The wrapped error
2290 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2291 },
2292
2293 /// Error when an operation is not implemented
2294 #[error("Operation {operation} not yet implemented by {implementer}.")]
2295 NotImplemented {
2296 /// What isn't implemented. Should include at least the method
2297 /// name that was called; could also include other relevant
2298 /// subcontexts.
2299 operation: String,
2300
2301 /// Which driver this is that hasn't implemented this operation,
2302 /// to aid debugging in contexts that may be using multiple implementations.
2303 implementer: String,
2304 },
2305
2306 /// Error when the used credentials don't have enough permission
2307 /// to perform the requested operation
2308 #[error(
2309 "The operation lacked the necessary privileges to complete for path {}: {}",
2310 path,
2311 source
2312 )]
2313 PermissionDenied {
2314 /// The path to the file
2315 path: String,
2316 /// The wrapped error
2317 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2318 },
2319
2320 /// Error when the used credentials lack valid authentication
2321 #[error(
2322 "The operation lacked valid authentication credentials for path {}: {}",
2323 path,
2324 source
2325 )]
2326 Unauthenticated {
2327 /// The path to the file
2328 path: String,
2329 /// The wrapped error
2330 source: Box<dyn std::error::Error + Send + Sync + 'static>,
2331 },
2332
2333 /// Error when a configuration key is invalid for the store used
2334 #[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
2335 UnknownConfigurationKey {
2336 /// The object store used
2337 store: &'static str,
2338 /// The configuration key used
2339 key: String,
2340 },
2341}
2342
2343impl From<Error> for std::io::Error {
2344 fn from(e: Error) -> Self {
2345 let kind = match &e {
2346 Error::NotFound { .. } => std::io::ErrorKind::NotFound,
2347 _ => std::io::ErrorKind::Other,
2348 };
2349 Self::new(kind, e)
2350 }
2351}
2352
#[cfg(test)]
mod tests {
    use super::*;

    use chrono::TimeZone;

    // Returns early from the calling test unless the `TEST_INTEGRATION`
    // environment variable is set
    macro_rules! maybe_skip_integration {
        () => {
            if std::env::var("TEST_INTEGRATION").is_err() {
                eprintln!("Skipping integration test - set TEST_INTEGRATION");
                return;
            }
        };
    }
    pub(crate) use maybe_skip_integration;

    /// Test that the returned stream does not borrow the lifetime of Path
    fn list_store<'a>(
        store: &'a dyn ObjectStore,
        path_str: &str,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let path = Path::from(path_str);
        store.list(Some(&path))
    }

    /// Writes an object, signs a GET URL for it and checks the URL serves the same data
    #[cfg(any(feature = "azure-base", feature = "aws-base"))]
    pub(crate) async fn signing<T>(integration: &T)
    where
        T: ObjectStore + signer::Signer,
    {
        use ::http::Method;
        use std::time::Duration;

        let data = Bytes::from("hello world");
        let path = Path::from("file.txt");
        integration.put(&path, data.clone().into()).await.unwrap();

        let signed = integration
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        let resp = reqwest::get(signed).await.unwrap();
        let loaded = resp.bytes().await.unwrap();

        assert_eq!(data, loaded);
    }

    /// Writes tagged objects via `put_opts`, `put_multipart_opts` and `BufWriter`, then,
    /// if `validate` is set, checks the tags returned by `get_tags` for each of them
    #[cfg(any(feature = "aws-base", feature = "azure-base"))]
    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
    where
        F: Fn(Path) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
    {
        use bytes::Buf;
        use serde::Deserialize;
        use tokio::io::AsyncWriteExt;

        use crate::buffered::BufWriter;

        #[derive(Deserialize)]
        struct Tagging {
            #[serde(rename = "TagSet")]
            list: TagList,
        }

        #[derive(Debug, Deserialize)]
        struct TagList {
            #[serde(rename = "Tag")]
            tags: Vec<Tag>,
        }

        #[derive(Debug, Deserialize, Eq, PartialEq)]
        #[serde(rename_all = "PascalCase")]
        struct Tag {
            key: String,
            value: String,
        }

        let tags = vec![
            Tag {
                key: "foo.com=bar/s".to_string(),
                value: "bananas/foo.com-_".to_string(),
            },
            Tag {
                key: "namespace/key.foo".to_string(),
                value: "value with a space".to_string(),
            },
        ];
        let mut tag_set = TagSet::default();
        for t in &tags {
            tag_set.push(&t.key, &t.value)
        }

        let path = Path::from("tag_test");
        storage
            .put_opts(&path, "test".into(), tag_set.clone().into())
            .await
            .unwrap();

        let multi_path = Path::from("tag_test_multi");
        let mut write = storage
            .put_multipart_opts(&multi_path, tag_set.clone().into())
            .await
            .unwrap();

        write.put_part("foo".into()).await.unwrap();
        write.complete().await.unwrap();

        let buf_path = Path::from("tag_test_buf");
        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
        buf.write_all(b"foo").await.unwrap();
        buf.shutdown().await.unwrap();

        // Write should always succeed, but certain configurations may simply ignore tags
        if !validate {
            return;
        }

        for path in [path, multi_path, buf_path] {
            let resp = get_tags(path.clone()).await.unwrap();
            let body = resp.into_body().bytes().await.unwrap();

            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
            assert_eq!(resp.list.tags, tags);
        }
    }

    #[tokio::test]
    async fn test_list_lifetimes() {
        let store = memory::InMemory::new();
        let mut stream = list_store(&store, "path");
        assert!(stream.next().await.is_none());
    }

    #[test]
    fn test_preconditions() {
        let mut meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: Utc.timestamp_nanos(100),
            size: 100,
            e_tag: Some("123".to_string()),
            version: None,
        };

        let mut options = GetOptions::default();
        options.check_preconditions(&meta).unwrap();

        options.if_modified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap();

        options.if_modified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap_err();

        options.if_modified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap_err();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        options.if_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("123,354".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354, 123,".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354".to_string());
        let err = options.check_preconditions(&meta).unwrap_err();
        assert!(matches!(err, Error::Precondition { .. }), "{err}");

        options.if_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap();

        // If-Match takes precedence: on its own this If-Unmodified-Since would fail
        // (last_modified 100 > 50), but it must be ignored while If-Match is set
        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap();

        // ... and a failing If-Match is not rescued by a passing If-Unmodified-Since
        options.if_match = Some("354".to_string());
        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
        let err = options.check_preconditions(&meta).unwrap_err();
        assert!(matches!(err, Error::Precondition { .. }), "{err}");

        options = GetOptions::default();

        options.if_none_match = Some("123".to_string());
        let err = options.check_preconditions(&meta).unwrap_err();
        assert!(matches!(err, Error::NotModified { .. }), "{err}");

        options.if_none_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("1232".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_none_match = Some("23, 123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        // A matching If-None-Match fails even though If-Modified-Since would pass
        options.if_modified_since = Some(Utc.timestamp_nanos(10));
        options.check_preconditions(&meta).unwrap_err();

        // If-None-Match takes precedence: on its own this If-Modified-Since would fail
        // (last_modified 100 <= 200), but it must be ignored while If-None-Match is set
        options.if_none_match = Some("1232".to_string());
        options.if_modified_since = Some(Utc.timestamp_nanos(200));
        options.check_preconditions(&meta).unwrap();

        // Check missing ETag
        meta.e_tag = None;
        options = GetOptions::default();

        options.if_none_match = Some("*".to_string()); // Fails if any file exists
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();
        options.if_match = Some("*".to_string()); // Passes if file exists
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("123".to_string()); // No ETag never matches an explicit tag
        let err = options.check_preconditions(&meta).unwrap_err();
        assert!(matches!(err, Error::Precondition { .. }), "{err}");
    }

    #[test]
    #[cfg(feature = "http-base")]
    fn test_reexported_types() {
        // Test HeaderMap
        let mut headers = HeaderMap::new();
        headers.insert("content-type", HeaderValue::from_static("text/plain"));
        assert_eq!(headers.len(), 1);

        // Test HeaderValue
        let value = HeaderValue::from_static("test-value");
        assert_eq!(value.as_bytes(), b"test-value");

        // Test Extensions
        let mut extensions = Extensions::new();
        extensions.insert("test-key");
        assert!(extensions.get::<&str>().is_some());
    }

    #[test]
    fn test_get_options_builder() {
        let dt = Utc::now();
        // Non-empty so the assertion below proves `with_extensions` stored it
        let mut extensions = Extensions::new();
        extensions.insert("test-extension");

        let options = GetOptions::new();

        // assert defaults
        assert_eq!(options.if_match, None);
        assert_eq!(options.if_none_match, None);
        assert_eq!(options.if_modified_since, None);
        assert_eq!(options.if_unmodified_since, None);
        assert_eq!(options.range, None);
        assert_eq!(options.version, None);
        assert!(!options.head);
        assert!(options.extensions.get::<&str>().is_none());

        let options = options
            .with_if_match(Some("etag-match"))
            .with_if_none_match(Some("etag-none-match"))
            .with_if_modified_since(Some(dt))
            .with_if_unmodified_since(Some(dt))
            .with_range(Some(0..100))
            .with_version(Some("version-1"))
            .with_head(true)
            .with_extensions(extensions);

        assert_eq!(options.if_match, Some("etag-match".to_string()));
        assert_eq!(options.if_none_match, Some("etag-none-match".to_string()));
        assert_eq!(options.if_modified_since, Some(dt));
        assert_eq!(options.if_unmodified_since, Some(dt));
        assert_eq!(options.range, Some(GetRange::Bounded(0..100)));
        assert_eq!(options.version, Some("version-1".to_string()));
        assert!(options.head);
        assert_eq!(options.extensions.get::<&str>(), Some(&"test-extension"));
    }

    fn takes_generic_object_store<T: ObjectStore>(store: T) {
        // This function is just to ensure that the trait bounds are satisfied
        let _ = store;
    }
    #[test]
    fn test_dyn_impl() {
        let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
        takes_generic_object_store(store);
        let store: Box<dyn ObjectStore> = Box::new(memory::InMemory::new());
        takes_generic_object_store(store);
    }
    #[test]
    fn test_generic_impl() {
        let store = Arc::new(memory::InMemory::new());
        takes_generic_object_store(store);
        let store = Box::new(memory::InMemory::new());
        takes_generic_object_store(store);
    }
}