Skip to main content

object_store/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![cfg_attr(docsrs, feature(doc_cfg))]
19#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
20#![warn(
21    missing_copy_implementations,
22    missing_debug_implementations,
23    missing_docs,
24    clippy::explicit_iter_loop,
25    clippy::future_not_send,
26    clippy::use_self,
27    clippy::clone_on_ref_ptr,
28    unreachable_pub
29)]
30
31//! # object_store
32//!
33//! This crate provides a uniform API for interacting with object
34//! storage services and local files via the [`ObjectStore`]
35//! trait.
36//!
37//! Using this crate, the same binary and code can run in multiple
38//! clouds and local test environments, via a simple runtime
39//! configuration change.
40//!
41//! # Highlights
42//!
43//! 1. A high-performance async API focused on providing a consistent interface
44//!    mirroring that of object stores such as [S3]
45//!
46//! 2. Production quality, leading this crate to be used in large
47//!    scale production systems, such as [crates.io] and [InfluxDB IOx]
48//!
49//! 3. Support for advanced functionality, including atomic, conditional reads
50//!    and writes, vectored IO, bulk deletion, and more...
51//!
52//! 4. Stable and predictable governance via the [Apache Arrow] project
53//!
54//! 5. Small dependency footprint, depending on only a small number of common crates
55//!
56//! Originally developed by [InfluxData] and subsequently donated
57//! to [Apache Arrow].
58//!
59//! [Apache Arrow]: https://arrow.apache.org/
60//! [InfluxData]: https://www.influxdata.com/
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb
61//! [crates.io]: https://github.com/rust-lang/crates.io
62//! [ACID]: https://en.wikipedia.org/wiki/ACID
63//! [S3]: https://aws.amazon.com/s3/
64//!
65//! # APIs
66//!
67//! * [`ObjectStore`]: Core object store API
68//! * [`ObjectStoreExt`]: (*New in 0.13.0*) Extension trait with additional convenience methods
69//!
70//! # Available [`ObjectStore`] Implementations
71//!
72//! By default, this crate provides the following implementations:
73//!
74//! * Memory: [`InMemory`](memory::InMemory)
75//!
76//! Feature flags are used to enable support for other implementations:
77//!
78#![cfg_attr(
79    feature = "fs",
80    doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
81)]
82#![cfg_attr(
83    feature = "gcp-base",
84    doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
85)]
86#![cfg_attr(
87    feature = "aws-base",
88    doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
89)]
90#![cfg_attr(
91    feature = "azure-base",
92    doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
93)]
94#![cfg_attr(
95    feature = "http-base",
96    doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
97)]
98//!
99//! See [Feature Flags](#feature-flags) for the full set of flags.
100//!
101//! # Why not a Filesystem Interface?
102//!
103//! The [`ObjectStore`] interface is designed to mirror the APIs
104//! of object stores and *not* filesystems, and thus has stateless APIs instead
105//! of cursor-based interfaces such as [`Read`] or [`Seek`] available in filesystems.
106//!
107//! This design provides the following advantages:
108//!
109//! * All operations are atomic, and readers cannot observe partial and/or failed writes
110//! * Methods map directly to object store APIs, providing both efficiency and predictability
111//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
112//! * Allows for functionality not native to filesystems, such as operation preconditions
113//!   and atomic multipart uploads
114//!
115//! This crate does provide [`BufReader`] and [`BufWriter`] adapters
116//! which offer a more filesystem-like API for working with the
117//! [`ObjectStore`] trait; however, they should be used with care.
118//!
119//! [`BufReader`]: buffered::BufReader
120//! [`BufWriter`]: buffered::BufWriter
121//! [`Read`]: std::io::Read
122//! [`Seek`]: std::io::Seek
123//!
124//! # Adapters
125//!
126//! [`ObjectStore`] instances can be composed with various adapters
127//! which add additional functionality:
128//!
129//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
130//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
131//!
132//! # Configuration System
133//!
134//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
135//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing a [`DynObjectStore`] to be created
136//! from a URL and an optional list of key-value pairs. This provides a flexible interface
137//! to support a wide variety of user-defined store configurations, with minimal additional
138//! application complexity.
139//!
140//! ```no_run,ignore-wasm32
141//! # #[cfg(feature = "aws")] {
142//! # use url::Url;
143//! # use object_store::{parse_url, parse_url_opts};
144//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
145//! #
146//! #
147//! // Can manually create a specific store variant using the appropriate builder
148//! let store: AmazonS3 = AmazonS3Builder::from_env()
149//!     .with_bucket_name("my-bucket").build().unwrap();
150//!
151//! // Alternatively can create an ObjectStore from an S3 URL
152//! let url = Url::parse("s3://bucket/path").unwrap();
153//! let (store, path) = parse_url(&url).unwrap();
154//! assert_eq!(path.as_ref(), "path");
155//!
156//! // Potentially with additional options
157//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
158//!
159//! // Or with URLs that encode the bucket name in the URL path
160//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
161//! let (store, path) = parse_url(&url).unwrap();
162//! assert_eq!(path.as_ref(), "path");
163//! # }
164//! ```
165//!
166//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
167//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
168//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
169//!
170//! # List objects
171//!
172//! Use the [`ObjectStore::list`] method to iterate over objects in
173//! remote storage or files in the local filesystem:
174//!
175//! ```ignore-wasm32
176//! # use object_store::local::LocalFileSystem;
177//! # use std::sync::Arc;
178//! # use object_store::{path::Path, ObjectStore};
179//! # use futures_util::stream::StreamExt;
180//! # // use LocalFileSystem for example
181//! # fn get_object_store() -> Arc<dyn ObjectStore> {
182//! #   Arc::new(LocalFileSystem::new())
183//! # }
184//! #
185//! # async fn example() {
186//! #
187//! // create an ObjectStore
188//! let object_store: Arc<dyn ObjectStore> = get_object_store();
189//!
190//! // Recursively list all files below the 'data' path.
191//! // 1. On AWS S3 this would be the 'data/' prefix
192//! // 2. On a local filesystem, this would be the 'data' directory
193//! let prefix = Path::from("data");
194//!
195//! // Get an `async` stream of Metadata objects:
196//! let mut list_stream = object_store.list(Some(&prefix));
197//!
198//! // Print a line about each object
199//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
200//!     println!("Name: {}, size: {}", meta.location, meta.size);
201//! }
202//! # }
203//! ```
204//!
205//! This will print something like the following:
206//!
207//! ```text
208//! Name: data/file01.parquet, size: 112832
209//! Name: data/file02.parquet, size: 143119
210//! Name: data/child/file03.parquet, size: 100
211//! ...
212//! ```
213//!
214//! # Fetch objects
215//!
216//! Use the [`ObjectStoreExt::get`] / [`ObjectStore::get_opts`] methods to fetch the data bytes
217//! from remote storage or files in the local filesystem as a stream.
218//!
219//! ```ignore-wasm32
220//! # use futures_util::TryStreamExt;
221//! # use object_store::local::LocalFileSystem;
222//! # use std::sync::Arc;
223//! #  use bytes::Bytes;
224//! # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetResult};
225//! # fn get_object_store() -> Arc<dyn ObjectStore> {
226//! #   Arc::new(LocalFileSystem::new())
227//! # }
228//! #
229//! # async fn example() {
230//! #
231//! // Create an ObjectStore
232//! let object_store: Arc<dyn ObjectStore> = get_object_store();
233//!
234//! // Retrieve a specific file
235//! let path = Path::from("data/file01.parquet");
236//!
237//! // Fetch just the file metadata
238//! let meta = object_store.head(&path).await.unwrap();
239//! println!("{meta:?}");
240//!
241//! // Fetch the object including metadata
242//! let result: GetResult = object_store.get(&path).await.unwrap();
243//! assert_eq!(result.meta, meta);
244//!
245//! // Buffer the entire object in memory
246//! let object: Bytes = result.bytes().await.unwrap();
247//! assert_eq!(object.len() as u64, meta.size);
248//!
249//! // Alternatively stream the bytes from object storage
250//! let stream = object_store.get(&path).await.unwrap().into_stream();
251//!
252//! // Count the '0's using `try_fold` from `TryStreamExt` trait
253//! let num_zeros = stream
254//!     .try_fold(0, |acc, bytes| async move {
255//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
256//!     }).await.unwrap();
257//!
258//! println!("Num zeros in {} is {}", path, num_zeros);
259//! # }
260//! ```
261//!
262//! # Put Object
263//!
264//! Use the [`ObjectStoreExt::put`] method to atomically write data.
265//!
266//! ```ignore-wasm32
267//! # use object_store::local::LocalFileSystem;
268//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
269//! # use std::sync::Arc;
270//! # use object_store::path::Path;
271//! # fn get_object_store() -> Arc<dyn ObjectStore> {
272//! #   Arc::new(LocalFileSystem::new())
273//! # }
274//! # async fn put() {
275//! #
276//! let object_store: Arc<dyn ObjectStore> = get_object_store();
277//! let path = Path::from("data/file1");
278//! let payload = PutPayload::from_static(b"hello");
279//! object_store.put(&path, payload).await.unwrap();
280//! # }
281//! ```
282//!
283//! # Multipart Upload
284//!
285//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] methods to atomically write a large
286//! amount of data.
287//!
288//! ```ignore-wasm32
289//! # use object_store::local::LocalFileSystem;
290//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
291//! # use std::sync::Arc;
292//! # use bytes::Bytes;
293//! # use tokio::io::AsyncWriteExt;
294//! # use object_store::path::Path;
295//! # fn get_object_store() -> Arc<dyn ObjectStore> {
296//! #   Arc::new(LocalFileSystem::new())
297//! # }
298//! # async fn multi_upload() {
299//! #
300//! let object_store: Arc<dyn ObjectStore> = get_object_store();
301//! let path = Path::from("data/large_file");
302//! let upload =  object_store.put_multipart(&path).await.unwrap();
303//! let mut write = WriteMultipart::new(upload);
304//! write.write(b"hello");
305//! write.finish().await.unwrap();
306//! # }
307//! ```
308//!
309//! # Vectored Read
310//!
311//! A common pattern, especially when reading structured datasets, is to need to fetch
312//! multiple, potentially non-contiguous, ranges of a particular object.
313//!
314//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
315//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
316//!
317//! ```ignore-wasm32
318//! # use object_store::local::LocalFileSystem;
319//! # use object_store::ObjectStore;
320//! # use std::sync::Arc;
321//! # use bytes::Bytes;
322//! # use tokio::io::AsyncWriteExt;
323//! # use object_store::path::Path;
324//! # fn get_object_store() -> Arc<dyn ObjectStore> {
325//! #   Arc::new(LocalFileSystem::new())
326//! # }
327//! # async fn multi_upload() {
328//! #
329//! let object_store: Arc<dyn ObjectStore> = get_object_store();
330//! let path = Path::from("data/large_file");
331//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
332//! assert_eq!(ranges.len(), 3);
333//! assert_eq!(ranges[0].len(), 10);
334//! # }
335//! ```
336//!
337//! To retrieve ranges from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
338//!
339//! ```ignore-wasm32
340//! # use object_store::local::LocalFileSystem;
341//! # use object_store::ObjectStore;
342//! # use object_store::GetOptions;
343//! # use std::sync::Arc;
344//! # use bytes::Bytes;
345//! # use tokio::io::AsyncWriteExt;
346//! # use object_store::path::Path;
347//! # fn get_object_store() -> Arc<dyn ObjectStore> {
348//! #   Arc::new(LocalFileSystem::new())
349//! # }
350//! # async fn get_range_with_options() {
351//! #
352//! let object_store: Arc<dyn ObjectStore> = get_object_store();
353//! let path = Path::from("data/large_file");
354//! let ranges = vec![90..100, 400..600, 0..10];
355//! for range in ranges {
356//!     let opts = GetOptions::default().with_range(Some(range));
357//!     let data = object_store.get_opts(&path, opts).await.unwrap();
358//!     // Do something with the data
359//! }
360//! # }
361//! ```
362//!
363//! # Vectored Write
364//!
365//! When writing data it is often the case that the size of the output is not known ahead of time.
366//!
367//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
368//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
369//! this is suboptimal as reallocating memory will often involve copying it to a new location.
370//!
371//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
372//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
373//! encapsulates this approach.
374//!
375//! ```ignore-wasm32
376//! # use object_store::local::LocalFileSystem;
377//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
378//! # use std::sync::Arc;
379//! # use bytes::Bytes;
380//! # use tokio::io::AsyncWriteExt;
381//! # use object_store::path::Path;
382//! # fn get_object_store() -> Arc<dyn ObjectStore> {
383//! #   Arc::new(LocalFileSystem::new())
384//! # }
385//! # async fn multi_upload() {
386//! #
387//! let object_store: Arc<dyn ObjectStore> = get_object_store();
388//! let path = Path::from("data/large_file");
389//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
390//! for _ in 0..22 {
391//!     buffer.extend_from_slice(&[0; 1024]);
392//! }
393//! let payload = buffer.freeze();
394//!
395//! // Payload consists of 3 separate 8KB allocations (the last holding only the remaining 6KB)
396//! assert_eq!(payload.as_ref().len(), 3);
397//! assert_eq!(payload.as_ref()[0].len(), 8192);
398//! assert_eq!(payload.as_ref()[1].len(), 8192);
399//! assert_eq!(payload.as_ref()[2].len(), 6144);
400//!
401//! object_store.put(&path, payload).await.unwrap();
402//! # }
403//! ```
404//!
405//! # Conditional Fetch
406//!
407//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
408//!
409//! For example, it can be used to efficiently refresh a cache without re-fetching the
410//! entire object data if the object hasn't been modified.
411//!
412//! ```
413//! # use std::collections::btree_map::Entry;
414//! # use std::collections::HashMap;
415//! # use object_store::{GetOptions, GetResult, ObjectStore, ObjectStoreExt, Result, Error};
416//! # use std::sync::Arc;
417//! # use std::time::{Duration, Instant};
418//! # use bytes::Bytes;
419//! # use tokio::io::AsyncWriteExt;
420//! # use object_store::path::Path;
421//! struct CacheEntry {
422//!     /// Data returned by last request
423//!     data: Bytes,
424//!     /// ETag identifying the object returned by the server
425//!     e_tag: String,
426//!     /// Instant of last refresh
427//!     refreshed_at: Instant,
428//! }
429//!
430//! /// Example cache that checks entries after 10 seconds for a new version
431//! struct Cache {
432//!     entries: HashMap<Path, CacheEntry>,
433//!     store: Arc<dyn ObjectStore>,
434//! }
435//!
436//! impl Cache {
437//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
438//!         Ok(match self.entries.get_mut(path) {
439//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
440//!                 true => e.data.clone(), // Return cached data
441//!                 false => { // Check if remote version has changed
442//!                     let opts = GetOptions::new().with_if_none_match(Some(e.e_tag.clone()));
443//!                     match self.store.get_opts(&path, opts).await {
444//!                         Ok(d) => e.data = d.bytes().await?,
445//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
446//!                         Err(e) => return Err(e),
447//!                     };
448//!                     e.refreshed_at = Instant::now();
449//!                     e.data.clone()
450//!                 }
451//!             },
452//!             None => { // Not cached, fetch data
453//!                 let get = self.store.get(&path).await?;
454//!                 let e_tag = get.meta.e_tag.clone();
455//!                 let data = get.bytes().await?;
456//!                 if let Some(e_tag) = e_tag {
457//!                     let entry = CacheEntry {
458//!                         e_tag,
459//!                         data: data.clone(),
460//!                         refreshed_at: Instant::now(),
461//!                     };
462//!                     self.entries.insert(path.clone(), entry);
463//!                 }
464//!                 data
465//!             }
466//!         })
467//!     }
468//! }
469//! ```
470//!
471//! # Conditional Put
472//!
473//! The default behaviour when writing data is to upsert any existing object at the given path,
474//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
475//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
476//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
477//! storage, without relying on a separate DBMS.
478//!
479//! ```
480//! # use object_store::{Error, ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
481//! # use std::sync::Arc;
482//! # use bytes::Bytes;
483//! # use tokio::io::AsyncWriteExt;
484//! # use object_store::memory::InMemory;
485//! # use object_store::path::Path;
486//! # fn get_object_store() -> Arc<dyn ObjectStore> {
487//! #   Arc::new(InMemory::new())
488//! # }
489//! # fn do_update(b: Bytes) -> Bytes {b}
490//! # async fn conditional_put() {
491//! let store = get_object_store();
492//! let path = Path::from("test");
493//!
494//! // Perform a conditional update on path
495//! loop {
496//!     // Perform get request
497//!     let r = store.get(&path).await.unwrap();
498//!
499//!     // Save version information fetched
500//!     let version = UpdateVersion {
501//!         e_tag: r.meta.e_tag.clone(),
502//!         version: r.meta.version.clone(),
503//!     };
504//!
505//!     // Compute new version of object contents
506//!     let new = do_update(r.bytes().await.unwrap());
507//!
508//!     // Attempt to commit transaction
509//!     match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
510//!         Ok(_) => break, // Successfully committed
511//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
512//!         Err(e) => panic!("{e}")
513//!     }
514//! }
515//! # }
516//! ```
517//!
518//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
519//! [Apache Iceberg]: https://iceberg.apache.org/
520//! [Delta Lake]: https://delta.io/
521//!
522//! # Feature Flags
523//!
524//! The feature set is layered so that you can pick an object store
525//! implementation, its HTTP transport, and its cryptography provider
526//! independently:
527//!
528//! * `cloud-base` provides the shared cloud implementation (XML/JSON parsing,
529//!   credentials, retry, etc.) and intentionally does *not* depend on
530//!   `reqwest` or a cryptography provider.
531//! * `reqwest` enables the built-in [`reqwest`]-based [`HttpConnector`].
532//! * `aws-lc-rs` and `ring` each provide a bundled [`client::CryptoProvider`].
533//! * `<provider>-base` (`aws-base`, `azure-base`, `gcp-base`, `http-base`)
534//!   adds the implementation-specific logic on top of `cloud-base` without pulling in
535//!   `reqwest` or a cryptography provider.
536//! * `<provider>` (`aws`, `azure`, `gcp`, `http`) is the batteries-included
537//!   feature for `<provider>-base` + `reqwest` (with `rustls`) + the default
538//!   `aws-lc-rs` cryptography provider, and is the typical choice.
539//!
540//! ## Implementation specific features
541//!
542//! | Feature | Enables | Notes |
543//! | --- | --- | --- |
544//! | `aws` | `aws-base` + `reqwest` + `aws-lc-rs` | Amazon S3 with the built-in HTTP transport. |
545//! | `azure` | `azure-base` + `reqwest` + `aws-lc-rs` | Azure Blob Storage with the built-in HTTP transport. |
546//! | `gcp` | `gcp-base` + `reqwest` + `aws-lc-rs` | Google Cloud Storage with the built-in HTTP transport. |
547//! | `http` | `http-base` + `reqwest` + `aws-lc-rs` | HTTP/WebDAV with the built-in HTTP transport. |
548//! | `aws-base` | `cloud-base` | S3 without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
549//! | `azure-base` | `cloud-base` | Azure without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
550//! | `gcp-base` | `cloud-base` | GCS without `reqwest` or crypto; supply your own [`HttpConnector`] and [`client::CryptoProvider`]. |
551//! | `http-base` | `cloud-base` | HTTP/WebDAV without `reqwest`; supply your own [`HttpConnector`]. |
552//!
553//! ## Transport and crypto features
554//!
555//! | Feature | Description |
556//! | --- | --- |
557//! | `reqwest` | Enables the [`reqwest`]-based [`HttpConnector`]. Enabled automatically by `aws`, `azure`, `gcp`, and `http`. |
558//! | `aws-lc-rs` | Bundled [`aws-lc-rs`]-based [`client::CryptoProvider`]. The default for the batteries-included provider features. |
559//! | `ring` | Bundled [`ring`]-based [`client::CryptoProvider`], e.g. for WASM targets. |
560//! | `cloud-base` | Shared cloud-provider implementation. Enabled automatically by `*-base` features; usually not enabled directly. |
561//!
562//! ## Other features
563//!
564//! | Feature | Description |
565//! | --- | --- |
566//! | `fs` *(default)* | Local filesystem store via [`LocalFileSystem`](local::LocalFileSystem). |
567//! | `tokio` | Enables Tokio-based utilities such as [`BufReader`](buffered::BufReader) and [`BufWriter`](buffered::BufWriter). Pulled in automatically by `fs` and the `*-base` features. |
568//! | `integration` | Exposes the [`integration`] module, a reusable test suite for verifying custom [`ObjectStore`] implementations. Not API-stable. |
569//!
570//! ## Selecting a `reqwest` TLS backend
571//!
572//! `reqwest` needs a TLS backend to compile, so whenever you enable the `reqwest` feature directly
573//! you must also enable one of `reqwest`'s TLS features:
574//!
575//! | reqwest feature | TLS stack | Notes |
576//! | --- | --- | --- |
577//! | `reqwest/rustls` | [rustls] with [`aws-lc-rs`] | enables `aws-lc-rs`. This is what `aws`/`azure`/`gcp`/`http` enable. |
578//! | `reqwest/native-tls` | the platform's native TLS (OpenSSL / SChannel / Secure Transport) | enables neither `rustls` nor `aws-lc-rs`. |
579//! | `reqwest/rustls-no-provider` | [rustls] with no bundled provider | enables neither provider; you must install one at runtime, e.g. `rustls::crypto::ring::default_provider().install_default()`. |
580//!
581//! ## Feature examples
582//!
583//! S3 implementation only; user provides the HTTP connector and crypto provider:
584//! ```toml
585//! object_store = { default-features = false, features = ["aws-base"] }
586//! ```
587//!
588//! S3 implementation + `reqwest` + `aws-lc-rs` signing (equivalent to the `aws` feature):
589//! ```toml
590//! object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/rustls", "aws-lc-rs"] }
591//! ```
592//!
593//! S3 implementation + `reqwest` with native TLS + `ring` signing (no `aws-lc-rs` in the dependency tree):
594//! ```toml
595//! object_store = { default-features = false, features = ["aws-base", "reqwest", "reqwest/native-tls", "ring"] }
596//! ```
597//!
598//! [rustls]: https://crates.io/crates/rustls/
599//!
600//! # Cryptography
601//!
602//! Request signing (e.g. AWS SigV4 or GCP service-account signing) requires a
603//! [`client::CryptoProvider`]. The `aws`, `gcp`, and `azure` features
604//! use [`aws-lc-rs`], matching `reqwest`'s default so that applications do not end up with
605//! two crypto stacks.
606//!
607//! If you wish to use [`ring`] (e.g. to support WASM targets), use the
608//! `*-base` feature flags, e.g. `aws-base`, and then enable the `ring` feature.
609//!
610//! When enabling the `aws-lc-rs` feature without the built-in `reqwest`
611//! transport (e.g. `aws-base` + `aws-lc-rs` with a custom [`HttpConnector`]),
612//! you must also select an [`aws-lc-rs`] backend, as `object_store` does not
613//! pick one for you.
614//! For example, enable `aws-lc-rs/aws-lc-sys` (or `fips` / `non-fips`).
615//!
616//! If both `ring` and `aws-lc-rs` are enabled, `aws-lc-rs` is used by default.
617//!
618//! You can also implement a custom [`client::CryptoProvider`] to use your own cryptographic library.
619//!
620//! This signing provider is independent of the TLS crypto provider used by the
621//! built-in `reqwest` transport — see
622//! [Selecting a `reqwest` TLS backend](#selecting-a-reqwest-tls-backend). The
623//! only combination that needs the provider registered manually (e.g.
624//! `rustls::crypto::ring::default_provider().install_default()` in your `main`)
625//! is `reqwest/rustls-no-provider`; `reqwest/rustls` and `reqwest/native-tls`
626//! configure their TLS stack automatically.
627//!
628//! [`aws-lc-rs`]: https://crates.io/crates/aws-lc-rs/
629//! [`ring`]: https://crates.io/crates/ring/
630//!
631//! # TLS Certificates
632//!
633//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose how certificates are validated.
634//!
635//! By default [`rustls-platform-verifier`] is used to verify certificates using the system's certificate
636//! facilities. Alternatively, this functionality can be disabled using
637//! [`ClientOptions::with_no_system_certificates`] and certificates manually registered using
638//! [`ClientOptions::with_root_certificate`].
639//!
640//! These could be a custom [CA] chain, or an alternative trust store, e.g. [`webpki-root-certs`](https://crates.io/crates/webpki-root-certs) as used below.
641//!
642//! ```ignore-wasm32
643//! # #[cfg(feature = "aws")] {
644//! use object_store::{ClientOptions, Certificate};
645//!
646//! let mut options = ClientOptions::default().with_no_system_certificates(true);
647//! for root_cert in webpki_root_certs::TLS_SERVER_ROOT_CERTS {
648//!     options = options.with_root_certificate(Certificate::from_der(root_cert.as_ref()).unwrap());
649//! }
650//! # }
651//! ```
652//!
653//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
654//! [`rustls-platform-verifier`]: https://crates.io/crates/rustls-platform-verifier/
655//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
656//!
657//! # Customizing HTTP Clients
658//!
659//! Many [`ObjectStore`] implementations permit customization of the HTTP client via
660//! the [`HttpConnector`] trait and utilities in the [`client`] module.
661//! Examples include injecting custom HTTP headers or using an alternate
662//! tokio Runtime for I/O requests. To replace `reqwest` entirely (rather than
663//! tweak the bundled transport) see [Disabling `reqwest`](#disabling-reqwest).
664//!
665//! [`HttpConnector`]: client::HttpConnector
666//!
667//! # Disabling `reqwest`
668//!
669//! The `aws`, `azure`, `gcp`, and `http` features each bundle a
670//! [`reqwest`]-based HTTP transport, which is the right choice for most
671//! applications. If you would rather supply your own HTTP client — for example
672//! to share an existing client, to target a platform where `reqwest` does not
673//! compile (such as `wasm32-wasip1`), or to keep `reqwest` out of your
674//! dependency tree — use the matching `*-base` feature and provide an
675//! [`HttpConnector`](client::HttpConnector) at builder time.
676//!
677//! Remember to disable the default features so that `fs` (and its transitive
678//! dependencies) is not pulled in:
679//!
680//! ```toml
681//! [dependencies]
682//! object_store = { version = "0.13", default-features = false, features = ["aws-base"] }
683//! ```
684//!
685//! ```ignore
686//! use object_store::aws::AmazonS3Builder;
687//!
688//! let store = AmazonS3Builder::from_env()
689//!     // `my_connector` is your own `impl HttpConnector`
690//!     .with_http_connector(my_connector)
691//!     .build()?;
692//! ```
693//!
694//! See [Feature Flags](#feature-flags) above for the full set of flags.
695//!
696//! [`reqwest`]: https://crates.io/crates/reqwest
697
698#[cfg(feature = "aws-base")]
699pub mod aws;
700#[cfg(feature = "azure-base")]
701pub mod azure;
702#[cfg(feature = "tokio")]
703pub mod buffered;
704#[cfg(not(target_arch = "wasm32"))]
705pub mod chunked;
706pub mod delimited;
707#[cfg(feature = "gcp-base")]
708pub mod gcp;
709#[cfg(feature = "http-base")]
710pub mod http;
711#[cfg(feature = "tokio")]
712pub mod limit;
713#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
714pub mod local;
715pub mod memory;
716pub mod path;
717pub mod prefix;
718pub mod registry;
719#[cfg(feature = "cloud-base")]
720pub mod signer;
721#[cfg(feature = "tokio")]
722pub mod throttle;
723
724#[cfg(feature = "cloud-base")]
725pub mod client;
726
727#[cfg(feature = "cloud-base")]
728pub use client::{
729    ClientConfigKey, ClientOptions, CredentialProvider, StaticCredentialProvider,
730    backoff::BackoffConfig, retry::RetryConfig,
731};
732
733#[cfg(all(
734    feature = "cloud-base",
735    feature = "reqwest",
736    not(target_arch = "wasm32")
737))]
738pub use client::Certificate;
739
740#[cfg(feature = "cloud-base")]
741mod config;
742
743mod tags;
744
745pub use tags::TagSet;
746
747pub mod list;
748pub mod multipart;
749mod parse;
750mod payload;
751mod upload;
752mod util;
753
754mod attributes;
755
756#[cfg(any(feature = "integration", test))]
757pub mod integration;
758
759pub use attributes::*;
760
761pub use parse::{ObjectStoreScheme, parse_url, parse_url_opts};
762pub use payload::*;
763pub use upload::*;
764pub use util::{GetRange, OBJECT_STORE_COALESCE_DEFAULT, coalesce_ranges, collect_bytes};
765
766// Re-export HTTP types used in public API
767pub use ::http::{Extensions, HeaderMap, HeaderValue};
768
769use crate::path::Path;
770#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
771use crate::util::maybe_spawn_blocking;
772use async_trait::async_trait;
773use bytes::Bytes;
774use chrono::{DateTime, Utc};
775use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
776use std::fmt::{Debug, Formatter};
777use std::ops::Range;
778use std::sync::Arc;
779
/// An alias for a dynamically dispatched object store implementation.
///
/// This is the trait-object type `dyn ObjectStore`. It can be held behind a
/// pointer such as `Arc<DynObjectStore>` or `Box<DynObjectStore>`; both of those
/// pointer types themselves implement [`ObjectStore`] by forwarding every method
/// to the inner store.
pub type DynObjectStore = dyn ObjectStore;

/// Id type for multipart uploads.
///
/// This is a plain alias for an owned [`String`].
pub type MultipartId = String;
785
786/// Universal API for object store services.
787///
788/// See the [module-level documentation](crate) for a high level overview and
789/// examples. See [`ObjectStoreExt`] for additional convenience methods.
790///
791/// # Contract
792/// This trait is a contract between object store _implementations_
793/// (e.g. providers, wrappers) and the `object_store` crate itself. It is
794/// intended to be the minimum API required for an object store.
795///
796/// The [`ObjectStoreExt`] acts as an API/contract between `object_store` and
797/// the store _users_ and provides additional methods that may be simpler to use
798/// but overlap in functionality with [`ObjectStore`].
799///
800/// # Clone
801/// If a store implements [`Clone`], that will only clone the handle to the underlying data. It will NOT clone/fork the
802/// actual key-value data. Hence, the cloned instance and the original instance share the same state.
803///
804/// # Minimal Default Implementations
805/// There are only a few default implementations for methods in this trait by
806/// design. This was different from versions prior to `0.13.0`, which had many
807/// more default implementations. Default implementations are convenient for
808/// users, but error-prone for implementors as they require keeping the
809/// convenience APIs correctly in sync.
810///
811/// As of version 0.13.0, most methods on [`ObjectStore`] must be implemented, and
812/// the convenience methods have been moved to the [`ObjectStoreExt`] trait as
813/// described above. See [#385] for more details.
814///
815/// [#385]: https://github.com/apache/arrow-rs-object-store/issues/385
816///
817/// # Wrappers
818/// If you wrap an [`ObjectStore`] -- e.g. to add observability -- you SHOULD
819/// implement all trait methods. This ensures that defaults implementations
820/// that are overwritten by the wrapped store are also used by the wrapper.
821/// For example:
822///
823/// ```ignore
824/// struct MyStore {
825///     ...
826/// }
827///
828/// #[async_trait]
829/// impl ObjectStore for MyStore {
830///     // implement custom ranges handling
831///     async fn get_ranges(
832///         &self,
833///         location: &Path,
834///         ranges: &[Range<u64>],
835///     ) -> Result<Vec<Bytes>> {
836///         ...
837///     }
838///
839///     ...
840/// }
841///
842/// struct Wrapper {
843///     inner: Arc<dyn ObjectStore>,
844/// }
845///
846/// #[async_trait]
847/// #[deny(clippy::missing_trait_methods)]
848/// impl ObjectStore for Wrapper {
849///     // If we would not implement this method,
850///     // we would get the trait default and not
851///     // use the actual implementation of `inner`.
852///     async fn get_ranges(
853///         &self,
854///         location: &Path,
855///         ranges: &[Range<u64>],
856///     ) -> Result<Vec<Bytes>> {
857///         ...
858///     }
859///
860///     ...
861/// }
862/// ```
863///
864/// To automatically detect this issue, use
865/// [`#[deny(clippy::missing_trait_methods)]`](https://rust-lang.github.io/rust-clippy/master/index.html#missing_trait_methods).
866///
867/// # Upgrade Guide for 0.13.0
868///
869/// Upgrading to object_store 0.13.0 from an earlier version typically involves:
870///
871/// 1. Add a `use` for [`ObjectStoreExt`] to solve the error
872///
873/// ```text
874/// error[E0599]: no method named `put` found for reference `&dyn object_store::ObjectStore` in the current scope
875///    --> datafusion/datasource/src/url.rs:993:14
876/// ```
877///
878/// 2. Remove any (now) redundant implementations (such as `ObjectStore::put`) from any
879///   `ObjectStore` implementations to resolve the error
880///
881/// ```text
882/// error[E0407]: method `put` is not a member of trait `ObjectStore`
883///     --> datafusion/datasource/src/url.rs:1103:9
884///      |
885/// ```
886///
887/// 3. Convert `ObjectStore::delete` to [`ObjectStore::delete_stream`] (see documentation
888///    on that method for details and examples)
889///
890/// 4. Combine `ObjectStore::copy` and `ObjectStore::copy_if_not_exists` implementations into
891///    [`ObjectStore::copy_opts`] (see documentation on that method for details and examples)
892///
893/// 5. Update `object_store::Error::NotImplemented` to include the name of the missing method
894///
895/// For example, change instances of
896/// ```text
897/// object_store::Error::NotImplemented
898/// ```
899/// to
900/// ```
901/// object_store::Error::NotImplemented {
902///    operation: "put".to_string(),
903///    implementer: "RequestCountingObjectStore".to_string(),
904///  };
905/// ```
906///
907#[async_trait]
908pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
909    /// Save the provided `payload` to `location` with the given options
910    ///
911    /// The operation is guaranteed to be atomic, it will either successfully
912    /// write the entirety of `payload` to `location`, or fail. No clients
913    /// should be able to observe a partially written object
914    async fn put_opts(
915        &self,
916        location: &Path,
917        payload: PutPayload,
918        opts: PutOptions,
919    ) -> Result<PutResult>;
920
921    /// Perform a multipart upload with options
922    ///
923    /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
924    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
925    ///
926    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
927    async fn put_multipart_opts(
928        &self,
929        location: &Path,
930        opts: PutMultipartOptions,
931    ) -> Result<Box<dyn MultipartUpload>>;
932
933    /// Perform a get request with options
934    ///
935    /// ## Example
936    ///
937    /// This example uses a basic local filesystem object store to get an object with a specific etag.
938    /// On the local filesystem, supplying an invalid etag will error.
939    /// Versioned object stores will return the specified object version, if it exists.
940    ///
941    /// ```ignore-wasm32
942    /// # use object_store::local::LocalFileSystem;
943    /// # use tempfile::tempdir;
944    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
945    /// async fn get_opts_example() {
946    ///     let tmp = tempdir().unwrap();
947    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
948    ///     let location = Path::from("example.txt");
949    ///     let content = b"Hello, Object Store!";
950    ///
951    ///     // Put the object into the store
952    ///     store
953    ///         .put(&location, content.as_ref().into())
954    ///         .await
955    ///         .expect("Failed to put object");
956    ///
957    ///     // Get the object from the store to figure out the right etag
958    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
959    ///
960    ///     let etag = result.meta.e_tag.expect("ETag should be present");
961    ///
962    ///     // Get the object from the store with range and etag
963    ///     let bytes = store
964    ///         .get_opts(
965    ///             &location,
966    ///             GetOptions::new()
967    ///                 .with_if_match(Some(etag.clone())),
968    ///         )
969    ///         .await
970    ///         .expect("Failed to get object with range and etag")
971    ///         .bytes()
972    ///         .await
973    ///         .expect("Failed to read bytes");
974    ///
975    ///     println!(
976    ///         "Retrieved with ETag {}: {}",
977    ///         etag,
978    ///         String::from_utf8_lossy(&bytes)
979    ///     );
980    ///
981    ///     // Show that if the etag does not match, we get an error
982    ///     let wrong_etag = "wrong-etag".to_string();
983    ///     match store
984    ///         .get_opts(
985    ///             &location,
986    ///             GetOptions::new().with_if_match(Some(wrong_etag))
987    ///         )
988    ///         .await
989    ///     {
990    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
991    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
992    ///     }
993    /// }
994    /// ```
995    ///
996    /// To retrieve a range of bytes from a versioned object, specify the range in the [`GetOptions`] supplied to this method.
997    ///
998    /// ```ignore-wasm32
999    /// # use object_store::local::LocalFileSystem;
1000    /// # use tempfile::tempdir;
1001    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
1002    /// async fn get_opts_range_example() {
1003    ///     let tmp = tempdir().unwrap();
1004    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1005    ///     let location = Path::from("example.txt");
1006    ///     let content = b"Hello, Object Store!";
1007    ///
1008    ///     // Put the object into the store
1009    ///     store
1010    ///         .put(&location, content.as_ref().into())
1011    ///         .await
1012    ///         .expect("Failed to put object");
1013    ///
1014    ///     // Get the object from the store to figure out the right etag
1015    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
1016    ///
1017    ///     let etag = result.meta.e_tag.expect("ETag should be present");
1018    ///
1019    ///     // Get the object from the store with range and etag
1020    ///     let bytes = store
1021    ///         .get_opts(
1022    ///             &location,
1023    ///             GetOptions::new()
1024    ///                 .with_range(Some(0..5))
1025    ///                 .with_if_match(Some(etag.clone())),
1026    ///         )
1027    ///         .await
1028    ///         .expect("Failed to get object with range and etag")
1029    ///         .bytes()
1030    ///         .await
1031    ///         .expect("Failed to read bytes");
1032    ///
1033    ///     println!(
1034    ///         "Retrieved range [0-5] with ETag {}: {}",
1035    ///         etag,
1036    ///         String::from_utf8_lossy(&bytes)
1037    ///     );
1038    ///
1039    ///     // Show that if the etag does not match, we get an error
1040    ///     let wrong_etag = "wrong-etag".to_string();
1041    ///     match store
1042    ///         .get_opts(
1043    ///             &location,
1044    ///             GetOptions::new().with_range(Some(0..5)).with_if_match(Some(wrong_etag))
1045    ///         )
1046    ///         .await
1047    ///     {
1048    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
1049    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
1050    ///     }
1051    /// }
1052    /// ```
1053    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
1054
1055    /// Return the bytes that are stored at the specified location
1056    /// in the given byte ranges
1057    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
1058        coalesce_ranges(
1059            ranges,
1060            |range| self.get_range(location, range),
1061            OBJECT_STORE_COALESCE_DEFAULT,
1062        )
1063        .await
1064    }
1065
1066    /// Delete all the objects at the specified locations
1067    ///
1068    /// When supported, this method will use bulk operations that delete more
1069    /// than one object per a request. Otherwise, the implementation may call
1070    /// the single object delete method for each location.
1071    ///
1072    /// # Bulk Delete Support
1073    ///
1074    /// The following backends support native bulk delete operations:
1075    ///
1076    /// - **AWS (S3)**: Uses the native [DeleteObjects] API with batches of up to 1000 objects
1077    /// - **Azure**: Uses the native [Blob Batch] API with batches of up to 256 objects
1078    ///
1079    /// The following backends use concurrent individual delete operations:
1080    ///
1081    /// - **GCP**: Performs individual delete requests with up to 10 concurrent operations
1082    /// - **HTTP**: Performs individual delete requests with up to 10 concurrent operations
1083    /// - **Local**: Performs individual file deletions with up to 10 concurrent operations
1084    /// - **Memory**: Performs individual in-memory deletions sequentially
1085    ///
1086    /// [DeleteObjects]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
1087    /// [Blob Batch]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
1088    ///
1089    /// The returned stream yields the results of the delete operations in the
1090    /// same order as the input locations. However, some errors will be from
1091    /// an overall call to a bulk delete operation, and not from a specific
1092    /// location.
1093    ///
1094    /// If the object did not exist, the result may be an error or a success,
1095    /// depending on the behavior of the underlying store. For example, local
1096    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
1097    /// return Ok. If it is an error, it will be [`Error::NotFound`].
1098    ///
1099    /// ```ignore-wasm32
1100    /// # use futures_util::{StreamExt, TryStreamExt};
1101    /// # use object_store::local::LocalFileSystem;
1102    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1103    /// # let root = tempfile::TempDir::new().unwrap();
1104    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1105    /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
1106    /// # use object_store::path::Path;
1107    /// # use futures_util::{StreamExt, TryStreamExt};
1108    /// #
1109    /// // Create two objects
1110    /// store.put(&Path::from("foo"), "foo".into()).await?;
1111    /// store.put(&Path::from("bar"), "bar".into()).await?;
1112    ///
1113    /// // List object
1114    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
1115    ///
1116    /// // Delete them
1117    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
1118    /// # Ok(())
1119    /// # }
1120    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
1121    /// # rt.block_on(example()).unwrap();
1122    /// ```
1123    ///
1124    /// Note: Before version 0.13, `delete_stream` has a default implementation
1125    /// that deletes each object with up to 10 concurrent requests. This default
1126    /// behavior has been removed, and each implementation must now provide its
1127    /// own `delete_stream` implementation explicitly. The following example
1128    /// shows how to implement `delete_stream` to get the previous default
1129    /// behavior.
1130    ///
1131    /// ```
1132    /// # use async_trait::async_trait;
1133    /// # use futures_util::stream::{BoxStream, StreamExt};
1134    /// # use object_store::path::Path;
1135    /// # use object_store::{
1136    /// #     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
1137    /// #     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
1138    /// # };
1139    /// # use std::fmt;
1140    /// # use std::fmt::Debug;
1141    /// # use std::sync::Arc;
1142    /// #
1143    /// # struct ExampleClient;
1144    /// #
1145    /// # impl ExampleClient {
1146    /// #     async fn delete(&self, _path: &Path) -> Result<()> {
1147    /// #         Ok(())
1148    /// #     }
1149    /// # }
1150    /// #
1151    /// # struct ExampleStore {
1152    /// #     client: Arc<ExampleClient>,
1153    /// # }
1154    /// #
1155    /// # impl Debug for ExampleStore {
1156    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1157    /// #         write!(f, "ExampleStore")
1158    /// #     }
1159    /// # }
1160    /// #
1161    /// # impl fmt::Display for ExampleStore {
1162    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1163    /// #         write!(f, "ExampleStore")
1164    /// #     }
1165    /// # }
1166    /// #
1167    /// # #[async_trait]
1168    /// # impl ObjectStore for ExampleStore {
1169    /// #     async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
1170    /// #         todo!()
1171    /// #     }
1172    /// #
1173    /// #     async fn put_multipart_opts(
1174    /// #         &self,
1175    /// #         _: &Path,
1176    /// #         _: PutMultipartOptions,
1177    /// #     ) -> Result<Box<dyn MultipartUpload>> {
1178    /// #         todo!()
1179    /// #     }
1180    /// #
1181    /// #     async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
1182    /// #         todo!()
1183    /// #     }
1184    /// #
1185    /// fn delete_stream(
1186    ///     &self,
1187    ///     locations: BoxStream<'static, Result<Path>>,
1188    /// ) -> BoxStream<'static, Result<Path>> {
1189    ///     let client = Arc::clone(&self.client);
1190    ///     locations
1191    ///         .map(move |location| {
1192    ///             let client = Arc::clone(&client);
1193    ///             async move {
1194    ///                 let location = location?;
1195    ///                 client.delete(&location).await?;
1196    ///                 Ok(location)
1197    ///             }
1198    ///         })
1199    ///         .buffered(10)
1200    ///         .boxed()
1201    /// }
1202    /// #
1203    /// #     fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1204    /// #         todo!()
1205    /// #     }
1206    /// #
1207    /// #     async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
1208    /// #         todo!()
1209    /// #     }
1210    /// #
1211    /// #     async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> Result<()> {
1212    /// #         todo!()
1213    /// #     }
1214    /// # }
1215    /// #
1216    /// # async fn example() {
1217    /// #     let store = ExampleStore { client: Arc::new(ExampleClient) };
1218    /// #     let paths = futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
1219    /// #     let results = store.delete_stream(paths).collect::<Vec<_>>().await;
1220    /// #     assert_eq!(results.len(), 2);
1221    /// #     assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
1222    /// #     assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
1223    /// # }
1224    /// #
1225    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
1226    /// # rt.block_on(example());
1227    /// ```
1228    fn delete_stream(
1229        &self,
1230        locations: BoxStream<'static, Result<Path>>,
1231    ) -> BoxStream<'static, Result<Path>>;
1232
1233    /// List all the objects with the given prefix.
1234    ///
1235    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1236    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
1237    ///
1238    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1239    ///
1240    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1241    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
1242
1243    /// List all the objects with the given prefix and a location greater than `offset`
1244    ///
1245    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
1246    /// the number of network requests required.
1247    ///
1248    /// This returns an exclusive offset, i.e. objects at exactly `offset` will not be included.
1249    ///
1250    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1251    ///
1252    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1253    fn list_with_offset(
1254        &self,
1255        prefix: Option<&Path>,
1256        offset: &Path,
1257    ) -> BoxStream<'static, Result<ObjectMeta>> {
1258        let offset = offset.clone();
1259        self.list(prefix)
1260            .try_filter(move |f| futures_util::future::ready(f.location > offset))
1261            .boxed()
1262    }
1263
1264    /// List objects with the given prefix and an implementation specific
1265    /// delimiter. Returns common prefixes (directories) in addition to object
1266    /// metadata.
1267    ///
1268    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1269    /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
1270    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
1271
1272    /// Copy an object from one path to another in the same object store.
1273    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()>;
1274
1275    /// Move an object from one path to another in the same object store.
1276    ///
1277    /// By default, this is implemented as a copy and then delete source. It may not
1278    /// check when deleting source that it was the same object that was originally copied.
1279    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
1280        let RenameOptions {
1281            target_mode,
1282            extensions,
1283        } = options;
1284        let copy_mode = match target_mode {
1285            RenameTargetMode::Overwrite => CopyMode::Overwrite,
1286            RenameTargetMode::Create => CopyMode::Create,
1287        };
1288        let copy_options = CopyOptions {
1289            mode: copy_mode,
1290            extensions,
1291        };
1292        self.copy_opts(from, to, copy_options).await?;
1293        self.delete(from).await?;
1294        Ok(())
1295    }
1296}
1297
1298macro_rules! as_ref_impl {
1299    ($type:ty) => {
1300        #[async_trait]
1301        #[deny(clippy::missing_trait_methods)]
1302        impl<T: ObjectStore + ?Sized> ObjectStore for $type {
1303            async fn put_opts(
1304                &self,
1305                location: &Path,
1306                payload: PutPayload,
1307                opts: PutOptions,
1308            ) -> Result<PutResult> {
1309                self.as_ref().put_opts(location, payload, opts).await
1310            }
1311
1312            async fn put_multipart_opts(
1313                &self,
1314                location: &Path,
1315                opts: PutMultipartOptions,
1316            ) -> Result<Box<dyn MultipartUpload>> {
1317                self.as_ref().put_multipart_opts(location, opts).await
1318            }
1319
1320            async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
1321                self.as_ref().get_opts(location, options).await
1322            }
1323
1324            async fn get_ranges(
1325                &self,
1326                location: &Path,
1327                ranges: &[Range<u64>],
1328            ) -> Result<Vec<Bytes>> {
1329                self.as_ref().get_ranges(location, ranges).await
1330            }
1331
1332            fn delete_stream(
1333                &self,
1334                locations: BoxStream<'static, Result<Path>>,
1335            ) -> BoxStream<'static, Result<Path>> {
1336                self.as_ref().delete_stream(locations)
1337            }
1338
1339            fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1340                self.as_ref().list(prefix)
1341            }
1342
1343            fn list_with_offset(
1344                &self,
1345                prefix: Option<&Path>,
1346                offset: &Path,
1347            ) -> BoxStream<'static, Result<ObjectMeta>> {
1348                self.as_ref().list_with_offset(prefix, offset)
1349            }
1350
1351            async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
1352                self.as_ref().list_with_delimiter(prefix).await
1353            }
1354
1355            async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
1356                self.as_ref().copy_opts(from, to, options).await
1357            }
1358
1359            async fn rename_opts(
1360                &self,
1361                from: &Path,
1362                to: &Path,
1363                options: RenameOptions,
1364            ) -> Result<()> {
1365                self.as_ref().rename_opts(from, to, options).await
1366            }
1367        }
1368    };
1369}
1370
1371as_ref_impl!(Arc<T>);
1372as_ref_impl!(Box<T>);
1373
1374/// Extension trait for [`ObjectStore`] with convenience functions.
1375///
1376/// See the [module-level documentation](crate) for a high level overview and
1377/// examples. See "contract" section within the [`ObjectStore`] documentation
1378/// for more reasoning.
1379///
1380/// # Implementation
1381/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
1382pub trait ObjectStoreExt: ObjectStore {
1383    /// Save the provided bytes to the specified location
1384    ///
1385    /// The operation is guaranteed to be atomic, it will either successfully
1386    /// write the entirety of `payload` to `location`, or fail. No clients
1387    /// should be able to observe a partially written object
1388    fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
1389
1390    /// Perform a multipart upload
1391    ///
1392    /// Client should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
1393    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
1394    ///
1395    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
1396    fn put_multipart(
1397        &self,
1398        location: &Path,
1399    ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
1400
1401    /// Return the bytes that are stored at the specified location.
1402    ///
1403    /// ## Example
1404    ///
1405    /// This example uses a basic local filesystem object store to get an object.
1406    ///
1407    /// ```ignore-wasm32
1408    /// # use object_store::local::LocalFileSystem;
1409    /// # use tempfile::tempdir;
1410    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1411    /// async fn get_example() {
1412    ///     let tmp = tempdir().unwrap();
1413    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1414    ///     let location = Path::from("example.txt");
1415    ///     let content = b"Hello, Object Store!";
1416    ///
1417    ///     // Put the object into the store
1418    ///     store
1419    ///         .put(&location, content.as_ref().into())
1420    ///         .await
1421    ///         .expect("Failed to put object");
1422    ///
1423    ///     // Get the object from the store
1424    ///     let get_result = store.get(&location).await.expect("Failed to get object");
1425    ///     let bytes = get_result.bytes().await.expect("Failed to read bytes");
1426    ///     println!("Retrieved content: {}", String::from_utf8_lossy(&bytes));
1427    /// }
1428    /// ```
1429    fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;
1430
1431    /// Return the bytes that are stored at the specified location
1432    /// in the given byte range.
1433    ///
1434    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
1435    ///
1436    /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
1437    ///
1438    /// ## Examples
1439    ///
1440    /// This example uses a basic local filesystem object store to get a byte range from an object.
1441    ///
1442    /// ```ignore-wasm32
1443    /// # use object_store::local::LocalFileSystem;
1444    /// # use tempfile::tempdir;
1445    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1446    /// async fn get_range_example() {
1447    ///     let tmp = tempdir().unwrap();
1448    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1449    ///     let location = Path::from("example.txt");
1450    ///     let content = b"Hello, Object Store!";
1451    ///
1452    ///     // Put the object into the store
1453    ///     store
1454    ///         .put(&location, content.as_ref().into())
1455    ///         .await
1456    ///         .expect("Failed to put object");
1457    ///
1458    ///     // Get the object from the store
1459    ///     let bytes = store
1460    ///         .get_range(&location, 0..5)
1461    ///         .await
1462    ///         .expect("Failed to get object");
1463    ///     println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
1464    /// }
1465    /// ```
1466    fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
1467
1468    /// Return the metadata for the specified location
1469    fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
1470
1471    /// Delete the object at the specified location.
1472    fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
1473
1474    /// Copy an object from one path to another in the same object store.
1475    ///
1476    /// If there exists an object at the destination, it will be overwritten.
1477    fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1478
1479    /// Copy an object from one path to another, only if destination is empty.
1480    ///
1481    /// Will return an error if the destination already has an object.
1482    ///
1483    /// Performs an atomic operation if the underlying object storage supports it.
1484    /// If atomic operations are not supported by the underlying object storage (like S3)
1485    /// it will return an error.
1486    fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1487
1488    /// Move an object from one path to another in the same object store.
1489    ///
1490    /// By default, this is implemented as a copy and then delete source. It may not
1491    /// check when deleting source that it was the same object that was originally copied.
1492    ///
1493    /// If there exists an object at the destination, it will be overwritten.
1494    fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1495
1496    /// Move an object from one path to another in the same object store.
1497    ///
1498    /// Will return an error if the destination already has an object.
1499    fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1500}
1501
1502impl<T> ObjectStoreExt for T
1503where
1504    T: ObjectStore + ?Sized,
1505{
1506    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1507        self.put_opts(location, payload, PutOptions::default())
1508            .await
1509    }
1510
1511    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
1512        self.put_multipart_opts(location, PutMultipartOptions::default())
1513            .await
1514    }
1515
1516    async fn get(&self, location: &Path) -> Result<GetResult> {
1517        self.get_opts(location, GetOptions::default()).await
1518    }
1519
1520    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
1521        let options = GetOptions::new().with_range(Some(range));
1522        self.get_opts(location, options).await?.bytes().await
1523    }
1524
1525    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
1526        let options = GetOptions::new().with_head(true);
1527        Ok(self.get_opts(location, options).await?.meta)
1528    }
1529
1530    async fn delete(&self, location: &Path) -> Result<()> {
1531        let location = location.clone();
1532        let mut stream =
1533            self.delete_stream(futures_util::stream::once(async move { Ok(location) }).boxed());
1534        let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
1535            store: "ext",
1536            source: "`delete_stream` with one location should yield once but didn't".into(),
1537        })?;
1538        if stream.next().await.is_some() {
1539            Err(Error::Generic {
1540                store: "ext",
1541                source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
1542            })
1543        } else {
1544            Ok(())
1545        }
1546    }
1547
1548    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
1549        let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
1550        self.copy_opts(from, to, options).await
1551    }
1552
1553    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1554        let options = CopyOptions::new().with_mode(CopyMode::Create);
1555        self.copy_opts(from, to, options).await
1556    }
1557
1558    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1559        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
1560        self.rename_opts(from, to, options).await
1561    }
1562
1563    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1564        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
1565        self.rename_opts(from, to, options).await
1566    }
1567}
1568
/// Result of a list call, containing the objects and the common prefixes
/// (directories) that were found.
///
/// Individual requests to the underlying storage may be limited in size (for
/// example 1,000 objects per page), in which case a single `ListResult` may be
/// assembled from several paginated requests (see `extensions`).
#[derive(Debug)]
pub struct ListResult {
    /// Prefixes that are common (like directories)
    pub common_prefixes: Vec<Path>,
    /// Object metadata for the listing
    pub objects: Vec<ObjectMeta>,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to return context-specific information (like cache status) from trait methods.
    ///
    /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
    /// response, allowing custom HTTP middleware to propagate information to callers.
    /// Where a result is assembled from multiple paginated requests, the extensions of
    /// each response are merged, with those of later responses taking precedence.
    pub extensions: Extensions,
}
1587
/// The metadata that describes an object.
///
/// Returned by listing operations (see `ListResult::objects`) and as part of
/// get / head requests (see `GetResult::meta`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
    /// The full path to the object
    pub location: Path,
    /// The last modified time, in UTC
    pub last_modified: DateTime<Utc>,
    /// The size in bytes of the object.
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub size: u64,
    /// The entity tag uniquely identifying this object's contents, if the store provides one
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for this object, if the store provides one
    pub version: Option<String>,
}
1606
/// Options for a get request, such as range, conditional headers, or version
///
/// Construct with [`GetOptions::new`] (or `Default`) and the `with_*` builder methods.
#[derive(Debug, Default, Clone)]
pub struct GetOptions {
    /// Request will succeed if the `ObjectMeta::e_tag` matches,
    /// otherwise returning [`Error::Precondition`]
    ///
    /// When set, [`GetOptions::check_preconditions`] evaluates this instead of
    /// [`GetOptions::if_unmodified_since`].
    ///
    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
    ///
    /// Examples:
    ///
    /// ```text
    /// If-Match: "xyzzy"
    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
    /// If-Match: *
    /// ```
    pub if_match: Option<String>,
    /// Request will succeed if the `ObjectMeta::e_tag` does not match,
    /// otherwise returning [`Error::NotModified`]
    ///
    /// When set, [`GetOptions::check_preconditions`] evaluates this instead of
    /// [`GetOptions::if_modified_since`].
    ///
    /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
    ///
    /// Examples:
    ///
    /// ```text
    /// If-None-Match: "xyzzy"
    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
    /// If-None-Match: *
    /// ```
    pub if_none_match: Option<String>,
    /// Request will succeed if the object has been modified since the given time,
    /// otherwise returning [`Error::NotModified`]
    ///
    /// Some stores, such as S3, will only return `NotModified` for exact
    /// timestamp matches, instead of for any timestamp greater than or equal.
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
    pub if_modified_since: Option<DateTime<Utc>>,
    /// Request will succeed if the object has not been modified since the given time,
    /// otherwise returning [`Error::Precondition`]
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
    pub if_unmodified_since: Option<DateTime<Utc>>,
    /// Request transfer of only the specified range of bytes
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
    pub range: Option<GetRange>,
    /// Request a particular object version
    pub version: Option<String>,
    /// Request transfer of no content
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
    pub head: bool,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    pub extensions: Extensions,
}
1665
1666impl GetOptions {
1667    /// Returns an error if the modification conditions on this request are not satisfied
1668    ///
1669    /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
1670    pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
1671        // The use of the invalid etag "*" means no ETag is equivalent to never matching
1672        let etag = meta.e_tag.as_deref().unwrap_or("*");
1673        let last_modified = meta.last_modified;
1674
1675        if let Some(m) = &self.if_match {
1676            if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
1677                return Err(Error::Precondition {
1678                    path: meta.location.to_string(),
1679                    source: format!("{etag} does not match {m}").into(),
1680                });
1681            }
1682        } else if let Some(date) = self.if_unmodified_since {
1683            if last_modified > date {
1684                return Err(Error::Precondition {
1685                    path: meta.location.to_string(),
1686                    source: format!("{date} < {last_modified}").into(),
1687                });
1688            }
1689        }
1690
1691        if let Some(m) = &self.if_none_match {
1692            if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
1693                return Err(Error::NotModified {
1694                    path: meta.location.to_string(),
1695                    source: format!("{etag} matches {m}").into(),
1696                });
1697            }
1698        } else if let Some(date) = self.if_modified_since {
1699            if last_modified <= date {
1700                return Err(Error::NotModified {
1701                    path: meta.location.to_string(),
1702                    source: format!("{date} >= {last_modified}").into(),
1703                });
1704            }
1705        }
1706        Ok(())
1707    }
1708
1709    /// Create a new [`GetOptions`]
1710    pub fn new() -> Self {
1711        Self::default()
1712    }
1713
1714    /// Sets the `if_match` condition.
1715    ///
1716    /// See [`GetOptions::if_match`]
1717    #[must_use]
1718    pub fn with_if_match(mut self, etag: Option<impl Into<String>>) -> Self {
1719        self.if_match = etag.map(Into::into);
1720        self
1721    }
1722
1723    /// Sets the `if_none_match` condition.
1724    ///
1725    /// See [`GetOptions::if_none_match`]
1726    #[must_use]
1727    pub fn with_if_none_match(mut self, etag: Option<impl Into<String>>) -> Self {
1728        self.if_none_match = etag.map(Into::into);
1729        self
1730    }
1731
1732    /// Sets the `if_modified_since` condition.
1733    ///
1734    /// See [`GetOptions::if_modified_since`]
1735    #[must_use]
1736    pub fn with_if_modified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1737        self.if_modified_since = dt.map(Into::into);
1738        self
1739    }
1740
1741    /// Sets the `if_unmodified_since` condition.
1742    ///
1743    /// See [`GetOptions::if_unmodified_since`]
1744    #[must_use]
1745    pub fn with_if_unmodified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1746        self.if_unmodified_since = dt.map(Into::into);
1747        self
1748    }
1749
1750    /// Sets the `range` condition.
1751    ///
1752    /// See [`GetOptions::range`]
1753    #[must_use]
1754    pub fn with_range(mut self, range: Option<impl Into<GetRange>>) -> Self {
1755        self.range = range.map(Into::into);
1756        self
1757    }
1758
1759    /// Sets the `version` condition.
1760    ///
1761    /// See [`GetOptions::version`]
1762    #[must_use]
1763    pub fn with_version(mut self, version: Option<impl Into<String>>) -> Self {
1764        self.version = version.map(Into::into);
1765        self
1766    }
1767
1768    /// Sets the `head` condition.
1769    ///
1770    /// See [`GetOptions::head`]
1771    #[must_use]
1772    pub fn with_head(mut self, head: impl Into<bool>) -> Self {
1773        self.head = head.into();
1774        self
1775    }
1776
1777    /// Sets the `extensions` condition.
1778    ///
1779    /// See [`GetOptions::extensions`]
1780    #[must_use]
1781    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1782        self.extensions = extensions;
1783        self
1784    }
1785}
1786
/// Result for a get request
///
/// Pairs the object's [`ObjectMeta`] with its payload; use [`GetResult::bytes`] or
/// [`GetResult::into_stream`] to consume the data.
#[derive(Debug)]
pub struct GetResult {
    /// The [`GetResultPayload`]
    pub payload: GetResultPayload,
    /// The [`ObjectMeta`] for this object
    pub meta: ObjectMeta,
    /// The range of bytes returned by this request (start inclusive, end exclusive)
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub range: Range<u64>,
    /// Additional object attributes
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to return context-specific information (like cache status) from trait methods.
    ///
    /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
    /// response, allowing custom HTTP middleware to propagate information to callers.
    pub extensions: Extensions,
}
1807
/// The kind of a [`GetResult`]
///
/// A local file gets its own variant, distinct from an opaque byte stream, because
/// some systems may be able to optimise reading a file already present on local disk.
pub enum GetResultPayload {
    /// An open local file, together with its path
    #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
    File(std::fs::File, std::path::PathBuf),
    /// An opaque stream of bytes
    Stream(BoxStream<'static, Result<Bytes>>),
}
1819
1820impl Debug for GetResultPayload {
1821    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1822        match self {
1823            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1824            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1825            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1826        }
1827    }
1828}
1829
1830impl GetResult {
1831    /// Collects the data into a [`Bytes`]
1832    pub async fn bytes(self) -> Result<Bytes> {
1833        let len = self.range.end - self.range.start;
1834        match self.payload {
1835            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1836            GetResultPayload::File(mut file, path) => {
1837                maybe_spawn_blocking(move || {
1838                    use crate::local::read_range;
1839
1840                    let buffer = read_range(&mut file, &path, self.range)?;
1841
1842                    Ok(buffer)
1843                })
1844                .await
1845            }
1846            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1847        }
1848    }
1849
1850    /// Converts this into a byte stream
1851    ///
1852    /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1853    /// otherwise will return the [`GetResultPayload::Stream`].
1854    ///
1855    /// # Tokio Compatibility
1856    ///
1857    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1858    /// no major operating systems have stable async file APIs. Therefore if called from
1859    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1860    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1861    ///
1862    /// If not called from a tokio context, this will perform IO on the current thread with
1863    /// no additional complexity or overheads
1864    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1865        match self.payload {
1866            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1867            GetResultPayload::File(file, path) => {
1868                const CHUNK_SIZE: usize = 8 * 1024;
1869                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1870            }
1871            GetResultPayload::Stream(s) => s,
1872        }
1873    }
1874}
1875
/// Configure preconditions for the put operation
///
/// Defaults to [`PutMode::Overwrite`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
    /// Perform an atomic write operation if the current version of the object matches the
    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
    Update(UpdateVersion),
}
1889
/// Uniquely identifies a version of an object to update
///
/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
/// updates, and it is therefore recommended applications preserve both
///
/// Can be obtained from the [`PutResult`] of a previous write via `From<PutResult>`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
    /// The expected entity tag of the object being updated
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// The expected version indicator of the object being updated
    pub version: Option<String>,
}
1903
1904impl From<PutResult> for UpdateVersion {
1905    fn from(value: PutResult) -> Self {
1906        Self {
1907            e_tag: value.e_tag,
1908            version: value.version,
1909        }
1910    }
1911}
1912
/// Options for a put request
///
/// Can be built from a [`PutMode`], [`TagSet`] or [`Attributes`] via `From`, which
/// leaves every other field at its default.
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
    /// Configure the [`PutMode`] for this operation
    pub mode: PutMode,
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1934
1935impl PartialEq<Self> for PutOptions {
1936    fn eq(&self, other: &Self) -> bool {
1937        let Self {
1938            mode,
1939            tags,
1940            attributes,
1941            extensions: _,
1942        } = self;
1943        let Self {
1944            mode: other_mode,
1945            tags: other_tags,
1946            attributes: other_attributes,
1947            extensions: _,
1948        } = other;
1949        (mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
1950    }
1951}
1952
1953impl Eq for PutOptions {}
1954
1955impl From<PutMode> for PutOptions {
1956    fn from(mode: PutMode) -> Self {
1957        Self {
1958            mode,
1959            ..Default::default()
1960        }
1961    }
1962}
1963
1964impl From<TagSet> for PutOptions {
1965    fn from(tags: TagSet) -> Self {
1966        Self {
1967            tags,
1968            ..Default::default()
1969        }
1970    }
1971}
1972
1973impl From<Attributes> for PutOptions {
1974    fn from(attributes: Attributes) -> Self {
1975        Self {
1976            attributes,
1977            ..Default::default()
1978        }
1979    }
1980}
1981
// Deprecated, hidden alias retained for backwards compatibility now that the type
// is named `PutMultipartOptions`.
// See <https://github.com/apache/arrow-rs-object-store/issues/339>.
#[doc(hidden)]
#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
pub type PutMultipartOpts = PutMultipartOptions;
1986
/// Options for [`ObjectStore::put_multipart_opts`]
///
/// Can be built from a [`TagSet`] or [`Attributes`] via `From`, which leaves every
/// other field at its default.
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOptions {
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2006
2007impl PartialEq<Self> for PutMultipartOptions {
2008    fn eq(&self, other: &Self) -> bool {
2009        let Self {
2010            tags,
2011            attributes,
2012            extensions: _,
2013        } = self;
2014        let Self {
2015            tags: other_tags,
2016            attributes: other_attributes,
2017            extensions: _,
2018        } = other;
2019        (tags == other_tags) && (attributes == other_attributes)
2020    }
2021}
2022
2023impl Eq for PutMultipartOptions {}
2024
2025impl From<TagSet> for PutMultipartOptions {
2026    fn from(tags: TagSet) -> Self {
2027        Self {
2028            tags,
2029            ..Default::default()
2030        }
2031    }
2032}
2033
2034impl From<Attributes> for PutMultipartOptions {
2035    fn from(attributes: Attributes) -> Self {
2036        Self {
2037            attributes,
2038            ..Default::default()
2039        }
2040    }
2041}
2042
/// Result for a put request
///
/// Can be converted into an [`UpdateVersion`] for use with a later conditional
/// write via [`PutMode::Update`].
#[derive(Debug, Clone)]
pub struct PutResult {
    /// The unique identifier for the newly created object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to return context-specific information (like cache status) from trait methods.
    ///
    /// HTTP-backed stores in this crate populate this with the extensions of the HTTP
    /// response, allowing custom HTTP middleware to propagate information to callers.
    ///
    /// These extensions are excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2061
2062impl PartialEq<Self> for PutResult {
2063    fn eq(&self, other: &Self) -> bool {
2064        let Self {
2065            e_tag,
2066            version,
2067            extensions: _,
2068        } = self;
2069        let Self {
2070            e_tag: other_e_tag,
2071            version: other_version,
2072            extensions: _,
2073        } = other;
2074        (e_tag == other_e_tag) && (version == other_version)
2075    }
2076}
2077
2078impl Eq for PutResult {}
2079
/// Configure preconditions for the copy operation
///
/// Defaults to [`CopyMode::Overwrite`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
}
2090
/// Options for a copy request
///
/// Build with [`CopyOptions::new`] and the `with_*` builder methods.
#[derive(Debug, Clone, Default)]
pub struct CopyOptions {
    /// Configure the [`CopyMode`] for this operation
    pub mode: CopyMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2104
2105impl CopyOptions {
2106    /// Create a new [`CopyOptions`]
2107    pub fn new() -> Self {
2108        Self::default()
2109    }
2110
2111    /// Sets the `mode.
2112    ///
2113    /// See [`CopyOptions::mode`].
2114    #[must_use]
2115    pub fn with_mode(mut self, mode: CopyMode) -> Self {
2116        self.mode = mode;
2117        self
2118    }
2119
2120    /// Sets the `extensions`.
2121    ///
2122    /// See [`CopyOptions::extensions`].
2123    #[must_use]
2124    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2125        self.extensions = extensions;
2126        self
2127    }
2128}
2129
2130impl PartialEq<Self> for CopyOptions {
2131    fn eq(&self, other: &Self) -> bool {
2132        let Self {
2133            mode,
2134            extensions: _,
2135        } = self;
2136        let Self {
2137            mode: mode_other,
2138            extensions: _,
2139        } = other;
2140
2141        mode == mode_other
2142    }
2143}
2144
2145impl Eq for CopyOptions {}
2146
/// Configure preconditions for the target of a rename operation.
///
/// Note that the source location may or may not be deleted atomically with the creation
/// of the target. There is currently NO flag to control the atomicity of "delete source
/// at the same time as creating the target".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
    /// Perform a write operation on the target, overwriting any object present at the provided path.
    #[default]
    Overwrite,
    /// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path.
    Create,
}
2160
/// Options for a rename request
///
/// Build with [`RenameOptions::new`] and the `with_*` builder methods.
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
    /// Configure the [`RenameTargetMode`] for this operation
    pub target_mode: RenameTargetMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
2174
2175impl RenameOptions {
2176    /// Create a new [`RenameOptions`]
2177    pub fn new() -> Self {
2178        Self::default()
2179    }
2180
2181    /// Sets the `target_mode=.
2182    ///
2183    /// See [`RenameOptions::target_mode`].
2184    #[must_use]
2185    pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
2186        self.target_mode = target_mode;
2187        self
2188    }
2189
2190    /// Sets the `extensions`.
2191    ///
2192    /// See [`RenameOptions::extensions`].
2193    #[must_use]
2194    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
2195        self.extensions = extensions;
2196        self
2197    }
2198}
2199
2200impl PartialEq<Self> for RenameOptions {
2201    fn eq(&self, other: &Self) -> bool {
2202        let Self {
2203            target_mode,
2204            extensions: _,
2205        } = self;
2206        let Self {
2207            target_mode: target_mode_other,
2208            extensions: _,
2209        } = other;
2210
2211        target_mode == target_mode_other
2212    }
2213}
2214
2215impl Eq for RenameOptions {}
2216
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to this crate's [`Error`].
pub type Result<T, E = Error> = std::result::Result<T, E>;
2219
/// A specialized `Error` for object store-related errors
///
/// Converts into [`std::io::Error`], with [`Error::NotFound`] mapped to
/// [`std::io::ErrorKind::NotFound`] and every other variant to
/// [`std::io::ErrorKind::Other`].
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// A fallback error type when no variant matches
    #[error("Generic {} error: {}", store, source)]
    Generic {
        /// The store this error originated from
        store: &'static str,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object is not found at the given location
    #[error("Object at location {} not found: {}", path, source)]
    NotFound {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error for invalid path
    #[error("Encountered object with invalid path: {}", source)]
    InvalidPath {
        /// The wrapped error
        #[from]
        source: path::Error,
    },

    /// Error when `tokio::spawn` failed
    #[cfg(feature = "tokio")]
    #[error("Error joining spawned task: {}", source)]
    JoinError {
        /// The wrapped error
        #[from]
        source: tokio::task::JoinError,
    },

    /// Error when the attempted operation is not supported
    #[error("Operation not supported: {}", source)]
    NotSupported {
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object already exists
    #[error("Object at location {} already exists: {}", path, source)]
    AlreadyExists {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the required conditions failed for the operation
    #[error("Request precondition failure for path {}: {}", path, source)]
    Precondition {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object at the location isn't modified
    #[error("Object at location {} not modified: {}", path, source)]
    NotModified {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when an operation is not implemented
    #[error("Operation {operation} not yet implemented by {implementer}.")]
    NotImplemented {
        /// What isn't implemented. Should include at least the method
        /// name that was called; could also include other relevant
        /// subcontexts.
        operation: String,

        /// Which driver this is that hasn't implemented this operation,
        /// to aid debugging in contexts that may be using multiple implementations.
        implementer: String,
    },

    /// Error when the used credentials don't have enough permission
    /// to perform the requested operation
    #[error(
        "The operation lacked the necessary privileges to complete for path {}: {}",
        path,
        source
    )]
    PermissionDenied {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the used credentials lack valid authentication
    #[error(
        "The operation lacked valid authentication credentials for path {}: {}",
        path,
        source
    )]
    Unauthenticated {
        /// The path to the object
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when a configuration key is invalid for the store used
    #[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
    UnknownConfigurationKey {
        /// The object store used
        store: &'static str,
        /// The configuration key used
        key: String,
    },
}
2342
2343impl From<Error> for std::io::Error {
2344    fn from(e: Error) -> Self {
2345        let kind = match &e {
2346            Error::NotFound { .. } => std::io::ErrorKind::NotFound,
2347            _ => std::io::ErrorKind::Other,
2348        };
2349        Self::new(kind, e)
2350    }
2351}
2352
2353#[cfg(test)]
2354mod tests {
2355    use super::*;
2356
2357    use chrono::TimeZone;
2358
2359    macro_rules! maybe_skip_integration {
2360        () => {
2361            if std::env::var("TEST_INTEGRATION").is_err() {
2362                eprintln!("Skipping integration test - set TEST_INTEGRATION");
2363                return;
2364            }
2365        };
2366    }
2367    pub(crate) use maybe_skip_integration;
2368
2369    /// Test that the returned stream does not borrow the lifetime of Path
2370    fn list_store<'a>(
2371        store: &'a dyn ObjectStore,
2372        path_str: &str,
2373    ) -> BoxStream<'a, Result<ObjectMeta>> {
2374        let path = Path::from(path_str);
2375        store.list(Some(&path))
2376    }
2377
2378    #[cfg(any(feature = "azure-base", feature = "aws-base"))]
2379    pub(crate) async fn signing<T>(integration: &T)
2380    where
2381        T: ObjectStore + signer::Signer,
2382    {
2383        use ::http::Method;
2384        use std::time::Duration;
2385
2386        let data = Bytes::from("hello world");
2387        let path = Path::from("file.txt");
2388        integration.put(&path, data.clone().into()).await.unwrap();
2389
2390        let signed = integration
2391            .signed_url(Method::GET, &path, Duration::from_secs(60))
2392            .await
2393            .unwrap();
2394
2395        let resp = reqwest::get(signed).await.unwrap();
2396        let loaded = resp.bytes().await.unwrap();
2397
2398        assert_eq!(data, loaded);
2399    }
2400
2401    #[cfg(any(feature = "aws-base", feature = "azure-base"))]
2402    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
2403    where
2404        F: Fn(Path) -> Fut + Send + Sync,
2405        Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
2406    {
2407        use bytes::Buf;
2408        use serde::Deserialize;
2409        use tokio::io::AsyncWriteExt;
2410
2411        use crate::buffered::BufWriter;
2412
2413        #[derive(Deserialize)]
2414        struct Tagging {
2415            #[serde(rename = "TagSet")]
2416            list: TagList,
2417        }
2418
2419        #[derive(Debug, Deserialize)]
2420        struct TagList {
2421            #[serde(rename = "Tag")]
2422            tags: Vec<Tag>,
2423        }
2424
2425        #[derive(Debug, Deserialize, Eq, PartialEq)]
2426        #[serde(rename_all = "PascalCase")]
2427        struct Tag {
2428            key: String,
2429            value: String,
2430        }
2431
2432        let tags = vec![
2433            Tag {
2434                key: "foo.com=bar/s".to_string(),
2435                value: "bananas/foo.com-_".to_string(),
2436            },
2437            Tag {
2438                key: "namespace/key.foo".to_string(),
2439                value: "value with a space".to_string(),
2440            },
2441        ];
2442        let mut tag_set = TagSet::default();
2443        for t in &tags {
2444            tag_set.push(&t.key, &t.value)
2445        }
2446
2447        let path = Path::from("tag_test");
2448        storage
2449            .put_opts(&path, "test".into(), tag_set.clone().into())
2450            .await
2451            .unwrap();
2452
2453        let multi_path = Path::from("tag_test_multi");
2454        let mut write = storage
2455            .put_multipart_opts(&multi_path, tag_set.clone().into())
2456            .await
2457            .unwrap();
2458
2459        write.put_part("foo".into()).await.unwrap();
2460        write.complete().await.unwrap();
2461
2462        let buf_path = Path::from("tag_test_buf");
2463        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
2464        buf.write_all(b"foo").await.unwrap();
2465        buf.shutdown().await.unwrap();
2466
2467        // Write should always succeed, but certain configurations may simply ignore tags
2468        if !validate {
2469            return;
2470        }
2471
2472        for path in [path, multi_path, buf_path] {
2473            let resp = get_tags(path.clone()).await.unwrap();
2474            let body = resp.into_body().bytes().await.unwrap();
2475
2476            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
2477            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
2478            assert_eq!(resp.list.tags, tags);
2479        }
2480    }
2481
2482    #[tokio::test]
2483    async fn test_list_lifetimes() {
2484        let store = memory::InMemory::new();
2485        let mut stream = list_store(&store, "path");
2486        assert!(stream.next().await.is_none());
2487    }
2488
2489    #[test]
2490    fn test_preconditions() {
2491        let mut meta = ObjectMeta {
2492            location: Path::from("test"),
2493            last_modified: Utc.timestamp_nanos(100),
2494            size: 100,
2495            e_tag: Some("123".to_string()),
2496            version: None,
2497        };
2498
2499        let mut options = GetOptions::default();
2500        options.check_preconditions(&meta).unwrap();
2501
2502        options.if_modified_since = Some(Utc.timestamp_nanos(50));
2503        options.check_preconditions(&meta).unwrap();
2504
2505        options.if_modified_since = Some(Utc.timestamp_nanos(100));
2506        options.check_preconditions(&meta).unwrap_err();
2507
2508        options.if_modified_since = Some(Utc.timestamp_nanos(101));
2509        options.check_preconditions(&meta).unwrap_err();
2510
2511        options = GetOptions::default();
2512
2513        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
2514        options.check_preconditions(&meta).unwrap_err();
2515
2516        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
2517        options.check_preconditions(&meta).unwrap();
2518
2519        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
2520        options.check_preconditions(&meta).unwrap();
2521
2522        options = GetOptions::default();
2523
2524        options.if_match = Some("123".to_string());
2525        options.check_preconditions(&meta).unwrap();
2526
2527        options.if_match = Some("123,354".to_string());
2528        options.check_preconditions(&meta).unwrap();
2529
2530        options.if_match = Some("354, 123,".to_string());
2531        options.check_preconditions(&meta).unwrap();
2532
2533        options.if_match = Some("354".to_string());
2534        options.check_preconditions(&meta).unwrap_err();
2535
2536        options.if_match = Some("*".to_string());
2537        options.check_preconditions(&meta).unwrap();
2538
2539        // If-Match takes precedence
2540        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
2541        options.check_preconditions(&meta).unwrap();
2542
2543        options = GetOptions::default();
2544
2545        options.if_none_match = Some("123".to_string());
2546        options.check_preconditions(&meta).unwrap_err();
2547
2548        options.if_none_match = Some("*".to_string());
2549        options.check_preconditions(&meta).unwrap_err();
2550
2551        options.if_none_match = Some("1232".to_string());
2552        options.check_preconditions(&meta).unwrap();
2553
2554        options.if_none_match = Some("23, 123".to_string());
2555        options.check_preconditions(&meta).unwrap_err();
2556
2557        // If-None-Match takes precedence
2558        options.if_modified_since = Some(Utc.timestamp_nanos(10));
2559        options.check_preconditions(&meta).unwrap_err();
2560
2561        // Check missing ETag
2562        meta.e_tag = None;
2563        options = GetOptions::default();
2564
2565        options.if_none_match = Some("*".to_string()); // Fails if any file exists
2566        options.check_preconditions(&meta).unwrap_err();
2567
2568        options = GetOptions::default();
2569        options.if_match = Some("*".to_string()); // Passes if file exists
2570        options.check_preconditions(&meta).unwrap();
2571    }
2572
2573    #[test]
2574    #[cfg(feature = "http-base")]
2575    fn test_reexported_types() {
2576        // Test HeaderMap
2577        let mut headers = HeaderMap::new();
2578        headers.insert("content-type", HeaderValue::from_static("text/plain"));
2579        assert_eq!(headers.len(), 1);
2580
2581        // Test HeaderValue
2582        let value = HeaderValue::from_static("test-value");
2583        assert_eq!(value.as_bytes(), b"test-value");
2584
2585        // Test Extensions
2586        let mut extensions = Extensions::new();
2587        extensions.insert("test-key");
2588        assert!(extensions.get::<&str>().is_some());
2589    }
2590
2591    #[test]
2592    fn test_get_options_builder() {
2593        let dt = Utc::now();
2594        let extensions = Extensions::new();
2595
2596        let options = GetOptions::new();
2597
2598        // assert defaults
2599        assert_eq!(options.if_match, None);
2600        assert_eq!(options.if_none_match, None);
2601        assert_eq!(options.if_modified_since, None);
2602        assert_eq!(options.if_unmodified_since, None);
2603        assert_eq!(options.range, None);
2604        assert_eq!(options.version, None);
2605        assert!(!options.head);
2606        assert!(options.extensions.get::<&str>().is_none());
2607
2608        let options = options
2609            .with_if_match(Some("etag-match"))
2610            .with_if_none_match(Some("etag-none-match"))
2611            .with_if_modified_since(Some(dt))
2612            .with_if_unmodified_since(Some(dt))
2613            .with_range(Some(0..100))
2614            .with_version(Some("version-1"))
2615            .with_head(true)
2616            .with_extensions(extensions.clone());
2617
2618        assert_eq!(options.if_match, Some("etag-match".to_string()));
2619        assert_eq!(options.if_none_match, Some("etag-none-match".to_string()));
2620        assert_eq!(options.if_modified_since, Some(dt));
2621        assert_eq!(options.if_unmodified_since, Some(dt));
2622        assert_eq!(options.range, Some(GetRange::Bounded(0..100)));
2623        assert_eq!(options.version, Some("version-1".to_string()));
2624        assert!(options.head);
2625        assert_eq!(options.extensions.get::<&str>(), extensions.get::<&str>());
2626    }
2627
2628    fn takes_generic_object_store<T: ObjectStore>(store: T) {
2629        // This function is just to ensure that the trait bounds are satisfied
2630        let _ = store;
2631    }
2632    #[test]
2633    fn test_dyn_impl() {
2634        let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
2635        takes_generic_object_store(store);
2636        let store: Box<dyn ObjectStore> = Box::new(memory::InMemory::new());
2637        takes_generic_object_store(store);
2638    }
2639    #[test]
2640    fn test_generic_impl() {
2641        let store = Arc::new(memory::InMemory::new());
2642        takes_generic_object_store(store);
2643        let store = Box::new(memory::InMemory::new());
2644        takes_generic_object_store(store);
2645    }
2646}