Skip to main content

object_store/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![cfg_attr(docsrs, feature(doc_cfg))]
19#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
20#![warn(
21    missing_copy_implementations,
22    missing_debug_implementations,
23    missing_docs,
24    clippy::explicit_iter_loop,
25    clippy::future_not_send,
26    clippy::use_self,
27    clippy::clone_on_ref_ptr,
28    unreachable_pub
29)]
30
31//! # object_store
32//!
33//! This crate provides a uniform API for interacting with object
34//! storage services and local files via the [`ObjectStore`]
35//! trait.
36//!
37//! Using this crate, the same binary and code can run in multiple
38//! clouds and local test environments, via a simple runtime
39//! configuration change.
40//!
41//! # Highlights
42//!
43//! 1. A high-performance async API focused on providing a consistent interface
44//!    mirroring that of object stores such as [S3]
45//!
46//! 2. Production quality, leading this crate to be used in large
47//!    scale production systems, such as [crates.io] and [InfluxDB IOx](https://github.com/influxdata/influxdb_iox/)
48//!
49//! 3. Support for advanced functionality, including atomic, conditional reads
50//!    and writes, vectored IO, bulk deletion, and more...
51//!
52//! 4. Stable and predictable governance via the [Apache Arrow] project
53//!
54//! 5. Small dependency footprint, depending on only a small number of common crates
55//!
56//! Originally developed by [InfluxData] and subsequently donated
57//! to [Apache Arrow].
58//!
59//! [Apache Arrow]: https://arrow.apache.org/
60//! [InfluxData]: https://www.influxdata.com/
61//! [crates.io]: https://github.com/rust-lang/crates.io
62//! [ACID]: https://en.wikipedia.org/wiki/ACID
63//! [S3]: https://aws.amazon.com/s3/
64//!
65//! # APIs
66//!
67//! * [`ObjectStore`]: Core object store API
68//! * [`ObjectStoreExt`]: (*New in 0.13.0*) Extension trait with additional convenience methods
69//!
70//! # Available [`ObjectStore`] Implementations
71//!
72//! By default, this crate provides the following implementations:
73//!
74//! * Memory: [`InMemory`](memory::InMemory)
75//!
76//! Feature flags are used to enable support for other implementations:
77//!
78#![cfg_attr(
79    feature = "fs",
80    doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
81)]
82#![cfg_attr(
83    feature = "gcp",
84    doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
85)]
86#![cfg_attr(
87    feature = "aws",
88    doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
89)]
90#![cfg_attr(
91    feature = "azure",
92    doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
93)]
94#![cfg_attr(
95    feature = "http",
96    doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
97)]
98//!
99//! # Why not a Filesystem Interface?
100//!
101//! The [`ObjectStore`] interface is designed to mirror the APIs
102//! of object stores and *not* filesystems, and thus has stateless APIs instead
103//! of cursor based interfaces such as [`Read`](std::io::Read) or [`Seek`](std::io::Seek) available in filesystems.
104//!
105//! This design provides the following advantages:
106//!
107//! * All operations are atomic, and readers cannot observe partial and/or failed writes
108//! * Methods map directly to object store APIs, providing both efficiency and predictability
109//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
110//! * Allows for functionality not native to filesystems, such as operation preconditions
111//!   and atomic multipart uploads
112//!
113//! This crate does provide [`BufReader`] and [`BufWriter`] adapters
114//! which provide a more filesystem-like API for working with the
115//! [`ObjectStore`] trait; however, they should be used with care.
116//!
117//! [`BufReader`]: buffered::BufReader
118//! [`BufWriter`]: buffered::BufWriter
119//!
120//! # Adapters
121//!
122//! [`ObjectStore`] instances can be composed with various adapters
123//! which add additional functionality:
124//!
125//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
126//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
127//!
128//! # Configuration System
129//!
130//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
131//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing the creation of a [`DynObjectStore`]
132//! from a URL and an optional list of key value pairs. This provides a flexible interface
133//! to support a wide variety of user-defined store configurations, with minimal additional
134//! application complexity.
135//!
136//! ```no_run,ignore-wasm32
137//! # #[cfg(feature = "aws")] {
138//! # use url::Url;
139//! # use object_store::{parse_url, parse_url_opts};
140//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
141//! #
142//! #
143//! // Can manually create a specific store variant using the appropriate builder
144//! let store: AmazonS3 = AmazonS3Builder::from_env()
145//!     .with_bucket_name("my-bucket").build().unwrap();
146//!
147//! // Alternatively can create an ObjectStore from an S3 URL
148//! let url = Url::parse("s3://bucket/path").unwrap();
149//! let (store, path) = parse_url(&url).unwrap();
150//! assert_eq!(path.as_ref(), "path");
151//!
152//! // Potentially with additional options
153//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
154//!
155//! // Or with URLs that encode the bucket name in the URL path
156//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
157//! let (store, path) = parse_url(&url).unwrap();
158//! assert_eq!(path.as_ref(), "path");
159//! # }
160//! ```
161//!
162//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
163//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
164//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
165//!
166//! # List objects
167//!
168//! Use the [`ObjectStore::list`] method to iterate over objects in
169//! remote storage or files in the local filesystem:
170//!
171//! ```ignore-wasm32
172//! # use object_store::local::LocalFileSystem;
173//! # use std::sync::Arc;
174//! # use object_store::{path::Path, ObjectStore};
175//! # use futures::stream::StreamExt;
176//! # // use LocalFileSystem for example
177//! # fn get_object_store() -> Arc<dyn ObjectStore> {
178//! #   Arc::new(LocalFileSystem::new())
179//! # }
180//! #
181//! # async fn example() {
182//! #
183//! // create an ObjectStore
184//! let object_store: Arc<dyn ObjectStore> = get_object_store();
185//!
186//! // Recursively list all files below the 'data' path.
187//! // 1. On AWS S3 this would be the 'data/' prefix
188//! // 2. On a local filesystem, this would be the 'data' directory
189//! let prefix = Path::from("data");
190//!
191//! // Get an `async` stream of Metadata objects:
192//! let mut list_stream = object_store.list(Some(&prefix));
193//!
194//! // Print a line about each object
195//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
196//!     println!("Name: {}, size: {}", meta.location, meta.size);
197//! }
198//! # }
199//! ```
200//!
201//! Which will print out something like the following:
202//!
203//! ```text
204//! Name: data/file01.parquet, size: 112832
205//! Name: data/file02.parquet, size: 143119
206//! Name: data/child/file03.parquet, size: 100
207//! ...
208//! ```
209//!
210//! # Fetch objects
211//!
212//! Use the [`ObjectStoreExt::get`] / [`ObjectStore::get_opts`] method to fetch the data bytes
213//! from remote storage or files in the local filesystem as a stream.
214//!
215//! ```ignore-wasm32
216//! # use futures::TryStreamExt;
217//! # use object_store::local::LocalFileSystem;
218//! # use std::sync::Arc;
219//! #  use bytes::Bytes;
220//! # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetResult};
221//! # fn get_object_store() -> Arc<dyn ObjectStore> {
222//! #   Arc::new(LocalFileSystem::new())
223//! # }
224//! #
225//! # async fn example() {
226//! #
227//! // Create an ObjectStore
228//! let object_store: Arc<dyn ObjectStore> = get_object_store();
229//!
230//! // Retrieve a specific file
231//! let path = Path::from("data/file01.parquet");
232//!
233//! // Fetch just the file metadata
234//! let meta = object_store.head(&path).await.unwrap();
235//! println!("{meta:?}");
236//!
237//! // Fetch the object including metadata
238//! let result: GetResult = object_store.get(&path).await.unwrap();
239//! assert_eq!(result.meta, meta);
240//!
241//! // Buffer the entire object in memory
242//! let object: Bytes = result.bytes().await.unwrap();
243//! assert_eq!(object.len() as u64, meta.size);
244//!
245//! // Alternatively stream the bytes from object storage
246//! let stream = object_store.get(&path).await.unwrap().into_stream();
247//!
248//! // Count the '0's using `try_fold` from `TryStreamExt` trait
249//! let num_zeros = stream
250//!     .try_fold(0, |acc, bytes| async move {
251//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
252//!     }).await.unwrap();
253//!
254//! println!("Num zeros in {} is {}", path, num_zeros);
255//! # }
256//! ```
257//!
258//! # Put Object
259//!
260//! Use the [`ObjectStoreExt::put`] method to atomically write data.
261//!
262//! ```ignore-wasm32
263//! # use object_store::local::LocalFileSystem;
264//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
265//! # use std::sync::Arc;
266//! # use object_store::path::Path;
267//! # fn get_object_store() -> Arc<dyn ObjectStore> {
268//! #   Arc::new(LocalFileSystem::new())
269//! # }
270//! # async fn put() {
271//! #
272//! let object_store: Arc<dyn ObjectStore> = get_object_store();
273//! let path = Path::from("data/file1");
274//! let payload = PutPayload::from_static(b"hello");
275//! object_store.put(&path, payload).await.unwrap();
276//! # }
277//! ```
278//!
279//! # Multipart Upload
280//!
281//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] method to atomically write a large
282//! amount of data
283//!
284//! ```ignore-wasm32
285//! # use object_store::local::LocalFileSystem;
286//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
287//! # use std::sync::Arc;
288//! # use bytes::Bytes;
289//! # use tokio::io::AsyncWriteExt;
290//! # use object_store::path::Path;
291//! # fn get_object_store() -> Arc<dyn ObjectStore> {
292//! #   Arc::new(LocalFileSystem::new())
293//! # }
294//! # async fn multi_upload() {
295//! #
296//! let object_store: Arc<dyn ObjectStore> = get_object_store();
297//! let path = Path::from("data/large_file");
298//! let upload =  object_store.put_multipart(&path).await.unwrap();
299//! let mut write = WriteMultipart::new(upload);
300//! write.write(b"hello");
301//! write.finish().await.unwrap();
302//! # }
303//! ```
304//!
305//! # Vectored Read
306//!
307//! A common pattern, especially when reading structured datasets, is to need to fetch
308//! multiple, potentially non-contiguous, ranges of a particular object.
309//!
310//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
311//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
312//!
313//! ```ignore-wasm32
314//! # use object_store::local::LocalFileSystem;
315//! # use object_store::ObjectStore;
316//! # use std::sync::Arc;
317//! # use bytes::Bytes;
318//! # use tokio::io::AsyncWriteExt;
319//! # use object_store::path::Path;
320//! # fn get_object_store() -> Arc<dyn ObjectStore> {
321//! #   Arc::new(LocalFileSystem::new())
322//! # }
323//! # async fn multi_upload() {
324//! #
325//! let object_store: Arc<dyn ObjectStore> = get_object_store();
326//! let path = Path::from("data/large_file");
327//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
328//! assert_eq!(ranges.len(), 3);
329//! assert_eq!(ranges[0].len(), 10);
330//! # }
331//! ```
332//!
333//! To retrieve ranges from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
334//!
335//! ```ignore-wasm32
336//! # use object_store::local::LocalFileSystem;
337//! # use object_store::ObjectStore;
338//! # use object_store::GetOptions;
339//! # use std::sync::Arc;
340//! # use bytes::Bytes;
341//! # use tokio::io::AsyncWriteExt;
342//! # use object_store::path::Path;
343//! # fn get_object_store() -> Arc<dyn ObjectStore> {
344//! #   Arc::new(LocalFileSystem::new())
345//! # }
346//! # async fn get_range_with_options() {
347//! #
348//! let object_store: Arc<dyn ObjectStore> = get_object_store();
349//! let path = Path::from("data/large_file");
350//! let ranges = vec![90..100, 400..600, 0..10];
351//! for range in ranges {
352//!     let opts = GetOptions::default().with_range(Some(range));
353//!     let data = object_store.get_opts(&path, opts).await.unwrap();
354//!     // Do something with the data
355//! }
356//! # }
357//! ```
358//!
359//! # Vectored Write
360//!
361//! When writing data it is often the case that the size of the output is not known ahead of time.
362//!
363//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
364//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
365//! this is suboptimal as reallocating memory will often involve copying it to a new location.
366//!
367//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
368//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
369//! encapsulates this approach
370//!
371//! ```ignore-wasm32
372//! # use object_store::local::LocalFileSystem;
373//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
374//! # use std::sync::Arc;
375//! # use bytes::Bytes;
376//! # use tokio::io::AsyncWriteExt;
377//! # use object_store::path::Path;
378//! # fn get_object_store() -> Arc<dyn ObjectStore> {
379//! #   Arc::new(LocalFileSystem::new())
380//! # }
381//! # async fn multi_upload() {
382//! #
383//! let object_store: Arc<dyn ObjectStore> = get_object_store();
384//! let path = Path::from("data/large_file");
385//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
386//! for _ in 0..22 {
387//!     buffer.extend_from_slice(&[0; 1024]);
388//! }
389//! let payload = buffer.freeze();
390//!
391//! // Payload consists of 3 separate 8KB allocations
392//! assert_eq!(payload.as_ref().len(), 3);
393//! assert_eq!(payload.as_ref()[0].len(), 8192);
394//! assert_eq!(payload.as_ref()[1].len(), 8192);
395//! assert_eq!(payload.as_ref()[2].len(), 6144);
396//!
397//! object_store.put(&path, payload).await.unwrap();
398//! # }
399//! ```
400//!
401//! # Conditional Fetch
402//!
403//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
404//!
405//! For example, efficiently refreshing a cache without re-fetching the entire object
406//! data if the object hasn't been modified.
407//!
408//! ```
409//! # use std::collections::btree_map::Entry;
410//! # use std::collections::HashMap;
411//! # use object_store::{GetOptions, GetResult, ObjectStore, ObjectStoreExt, Result, Error};
412//! # use std::sync::Arc;
413//! # use std::time::{Duration, Instant};
414//! # use bytes::Bytes;
415//! # use tokio::io::AsyncWriteExt;
416//! # use object_store::path::Path;
417//! struct CacheEntry {
418//!     /// Data returned by last request
419//!     data: Bytes,
420//!     /// ETag identifying the object returned by the server
421//!     e_tag: String,
422//!     /// Instant of last refresh
423//!     refreshed_at: Instant,
424//! }
425//!
426//! /// Example cache that checks entries after 10 seconds for a new version
427//! struct Cache {
428//!     entries: HashMap<Path, CacheEntry>,
429//!     store: Arc<dyn ObjectStore>,
430//! }
431//!
432//! impl Cache {
433//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
434//!         Ok(match self.entries.get_mut(path) {
435//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
436//!                 true => e.data.clone(), // Return cached data
437//!                 false => { // Check if remote version has changed
438//!                     let opts = GetOptions::new().with_if_none_match(Some(e.e_tag.clone()));
439//!                     match self.store.get_opts(&path, opts).await {
440//!                         Ok(d) => e.data = d.bytes().await?,
441//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
442//!                         Err(e) => return Err(e),
443//!                     };
444//!                     e.refreshed_at = Instant::now();
445//!                     e.data.clone()
446//!                 }
447//!             },
448//!             None => { // Not cached, fetch data
449//!                 let get = self.store.get(&path).await?;
450//!                 let e_tag = get.meta.e_tag.clone();
451//!                 let data = get.bytes().await?;
452//!                 if let Some(e_tag) = e_tag {
453//!                     let entry = CacheEntry {
454//!                         e_tag,
455//!                         data: data.clone(),
456//!                         refreshed_at: Instant::now(),
457//!                     };
458//!                     self.entries.insert(path.clone(), entry);
459//!                 }
460//!                 data
461//!             }
462//!         })
463//!     }
464//! }
465//! ```
466//!
467//! # Conditional Put
468//!
469//! The default behaviour when writing data is to upsert any existing object at the given path,
470//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
471//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
472//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
473//! storage, without relying on a separate DBMS.
474//!
475//! ```
476//! # use object_store::{Error, ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
477//! # use std::sync::Arc;
478//! # use bytes::Bytes;
479//! # use tokio::io::AsyncWriteExt;
480//! # use object_store::memory::InMemory;
481//! # use object_store::path::Path;
482//! # fn get_object_store() -> Arc<dyn ObjectStore> {
483//! #   Arc::new(InMemory::new())
484//! # }
485//! # fn do_update(b: Bytes) -> Bytes {b}
486//! # async fn conditional_put() {
487//! let store = get_object_store();
488//! let path = Path::from("test");
489//!
490//! // Perform a conditional update on path
491//! loop {
492//!     // Perform get request
493//!     let r = store.get(&path).await.unwrap();
494//!
495//!     // Save version information fetched
496//!     let version = UpdateVersion {
497//!         e_tag: r.meta.e_tag.clone(),
498//!         version: r.meta.version.clone(),
499//!     };
500//!
501//!     // Compute new version of object contents
502//!     let new = do_update(r.bytes().await.unwrap());
503//!
504//!     // Attempt to commit transaction
505//!     match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
506//!         Ok(_) => break, // Successfully committed
507//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
508//!         Err(e) => panic!("{e}")
509//!     }
510//! }
511//! # }
512//! ```
513//!
514//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
515//! [Apache Iceberg]: https://iceberg.apache.org/
516//! [Delta Lake]: https://delta.io/
517//!
518//! # TLS Certificates
519//!
520//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA]
521//! certificates. By default the system-bundled certificates are used (see
522//! [`rustls-native-certs`]). The `tls-webpki-roots` feature switch can be used to also bundle Mozilla's
523//! root certificates with the library/application (see [`webpki-roots`]).
524//!
525//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
526//! [`rustls-native-certs`]: https://crates.io/crates/rustls-native-certs/
527//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
528//!
529//! # Customizing HTTP Clients
530//!
531//! Many [`ObjectStore`] implementations permit customization of the HTTP client via
532//! the [`HttpConnector`] trait and utilities in the [`client`] module.
533//! Examples include injecting custom HTTP headers or using an alternate
534//! tokio Runtime for I/O requests.
535//!
536//! [`HttpConnector`]: client::HttpConnector
537
538#[cfg(feature = "aws")]
539pub mod aws;
540#[cfg(feature = "azure")]
541pub mod azure;
542pub mod buffered;
543#[cfg(not(target_arch = "wasm32"))]
544pub mod chunked;
545pub mod delimited;
546#[cfg(feature = "gcp")]
547pub mod gcp;
548#[cfg(feature = "http")]
549pub mod http;
550pub mod limit;
551#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
552pub mod local;
553pub mod memory;
554pub mod path;
555pub mod prefix;
556pub mod registry;
557#[cfg(feature = "cloud")]
558pub mod signer;
559pub mod throttle;
560
561#[cfg(feature = "cloud")]
562pub mod client;
563
564#[cfg(feature = "cloud")]
565pub use client::{
566    ClientConfigKey, ClientOptions, CredentialProvider, StaticCredentialProvider,
567    backoff::BackoffConfig, retry::RetryConfig,
568};
569
570#[cfg(all(feature = "cloud", not(target_arch = "wasm32")))]
571pub use client::Certificate;
572
573#[cfg(feature = "cloud")]
574mod config;
575
576mod tags;
577
578pub use tags::TagSet;
579
580pub mod list;
581pub mod multipart;
582mod parse;
583mod payload;
584mod upload;
585mod util;
586
587mod attributes;
588
589#[cfg(any(feature = "integration", test))]
590pub mod integration;
591
592pub use attributes::*;
593
594pub use parse::{ObjectStoreScheme, parse_url, parse_url_opts};
595pub use payload::*;
596pub use upload::*;
597pub use util::{GetRange, OBJECT_STORE_COALESCE_DEFAULT, coalesce_ranges, collect_bytes};
598
599// Re-export HTTP types used in public API
600pub use ::http::{Extensions, HeaderMap, HeaderValue};
601
602use crate::path::Path;
603#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
604use crate::util::maybe_spawn_blocking;
605use async_trait::async_trait;
606use bytes::Bytes;
607use chrono::{DateTime, Utc};
608use futures::{StreamExt, TryStreamExt, stream::BoxStream};
609use std::fmt::{Debug, Formatter};
610#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
611use std::io::{Read, Seek, SeekFrom};
612use std::ops::Range;
613use std::sync::Arc;
614
615/// An alias for a dynamically dispatched [`ObjectStore`] implementation, i.e. `dyn ObjectStore`.
616pub type DynObjectStore = dyn ObjectStore;
617
618/// Id type for multipart uploads; an alias for [`String`].
619pub type MultipartId = String;
620
621/// Universal API for object store services.
622///
623/// See the [module-level documentation](crate) for a high level overview and
624/// examples. See [`ObjectStoreExt`] for additional convenience methods.
625///
626/// # Contract
627/// This trait is a contract between object store _implementations_
628/// (e.g. providers, wrappers) and the `object_store` crate itself. It is
629/// intended to be the minimum API required for an object store.
630///
631/// The [`ObjectStoreExt`] acts as an API/contract between `object_store` and
632/// the store _users_ and provides additional methods that may be simpler to use
633/// but overlap in functionality with [`ObjectStore`].
634///
635/// # Minimal Default Implementations
636/// There are only a few default implementations for methods in this trait by
637/// design. This differs from versions prior to `0.13.0`, which had many
638/// more default implementations. Default implementations are convenient for
639/// users, but error-prone for implementors as they require keeping the
640/// convenience APIs correctly in sync.
641///
642/// As of version 0.13.0, most methods on [`ObjectStore`] must be implemented, and
643/// the convenience methods have been moved to the [`ObjectStoreExt`] trait as
644/// described above. See [#385] for more details.
645///
646/// [#385]: https://github.com/apache/arrow-rs-object-store/issues/385
647///
648/// # Wrappers
649/// If you wrap an [`ObjectStore`] -- e.g. to add observability -- you SHOULD
650/// implement all trait methods. This ensures that default implementations
651/// that are overridden by the wrapped store are also used by the wrapper.
652/// For example:
653///
654/// ```ignore
655/// struct MyStore {
656///     ...
657/// }
658///
659/// #[async_trait]
660/// impl ObjectStore for MyStore {
661///     // implement custom ranges handling
662///     async fn get_ranges(
663///         &self,
664///         location: &Path,
665///         ranges: &[Range<u64>],
666///     ) -> Result<Vec<Bytes>> {
667///         ...
668///     }
669///
670///     ...
671/// }
672///
673/// struct Wrapper {
674///     inner: Arc<dyn ObjectStore>,
675/// }
676///
677/// #[async_trait]
678/// #[deny(clippy::missing_trait_methods)]
679/// impl ObjectStore for Wrapper {
680///     // If we would not implement this method,
681///     // we would get the trait default and not
682///     // use the actual implementation of `inner`.
683///     async fn get_ranges(
684///         &self,
685///         location: &Path,
686///         ranges: &[Range<u64>],
687///     ) -> Result<Vec<Bytes>> {
688///         ...
689///     }
690///
691///     ...
692/// }
693/// ```
694///
695/// To automatically detect this issue, use
696/// [`#[deny(clippy::missing_trait_methods)]`](https://rust-lang.github.io/rust-clippy/master/index.html#missing_trait_methods).
697///
698/// # Upgrade Guide for 0.13.0
699///
700/// Upgrading to object_store 0.13.0 from an earlier version typically involves:
701///
702/// 1. Add a `use` for [`ObjectStoreExt`] to solve the error
703///
704/// ```text
705/// error[E0599]: no method named `put` found for reference `&dyn object_store::ObjectStore` in the current scope
706///    --> datafusion/datasource/src/url.rs:993:14
707/// ```
708///
709/// 2. Remove any (now) redundant implementations (such as `ObjectStore::put`) from any
710///    `ObjectStore` implementations to resolve the error
711///
712/// ```text
713/// error[E0407]: method `put` is not a member of trait `ObjectStore`
714///     --> datafusion/datasource/src/url.rs:1103:9
715///      |
716/// ```
717///
718/// 3. Convert `ObjectStore::delete` to [`ObjectStore::delete_stream`] (see documentation
719///    on that method for details and examples)
720///
721/// 4. Combine `ObjectStore::copy` and `ObjectStore::copy_if_not_exists` implementations into
722///    [`ObjectStore::copy_opts`] (see documentation on that method for details and examples)
723///
724/// 5. Update `object_store::Error::NotImplemented` to include the name of the missing method
725///
726/// For example, change instances of
727/// ```text
728/// object_store::Error::NotImplemented
729/// ```
730/// to
731/// ```
732/// object_store::Error::NotImplemented {
733///    operation: "put".to_string(),
734///    implementer: "RequestCountingObjectStore".to_string(),
735///  };
736/// ```
737///
738#[async_trait]
739pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
740    /// Save the provided `payload` to `location` with the given options
741    ///
742    /// The operation is guaranteed to be atomic, it will either successfully
743    /// write the entirety of `payload` to `location`, or fail. No clients
744    /// should be able to observe a partially written object
745    async fn put_opts(
746        &self,
747        location: &Path,
748        payload: PutPayload,
749        opts: PutOptions,
750    ) -> Result<PutResult>;
751
752    /// Perform a multipart upload with options
753    ///
754    /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
755    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
756    ///
757    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
758    async fn put_multipart_opts(
759        &self,
760        location: &Path,
761        opts: PutMultipartOptions,
762    ) -> Result<Box<dyn MultipartUpload>>;
763
764    /// Perform a get request with options
765    ///
766    /// ## Example
767    ///
768    /// This example uses a basic local filesystem object store to get an object with a specific etag.
769    /// On the local filesystem, supplying an invalid etag will error.
770    /// Versioned object stores will return the specified object version, if it exists.
771    ///
772    /// ```ignore-wasm32
773    /// # use object_store::local::LocalFileSystem;
774    /// # use tempfile::tempdir;
775    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
776    /// async fn get_opts_example() {
777    ///     let tmp = tempdir().unwrap();
778    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
779    ///     let location = Path::from("example.txt");
780    ///     let content = b"Hello, Object Store!";
781    ///
782    ///     // Put the object into the store
783    ///     store
784    ///         .put(&location, content.as_ref().into())
785    ///         .await
786    ///         .expect("Failed to put object");
787    ///
788    ///     // Get the object from the store to figure out the right etag
789    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
790    ///
791    ///     let etag = result.meta.e_tag.expect("ETag should be present");
792    ///
793    ///     // Get the object from the store with range and etag
794    ///     let bytes = store
795    ///         .get_opts(
796    ///             &location,
797    ///             GetOptions::new()
798    ///                 .with_if_match(Some(etag.clone())),
799    ///         )
800    ///         .await
801    ///         .expect("Failed to get object with range and etag")
802    ///         .bytes()
803    ///         .await
804    ///         .expect("Failed to read bytes");
805    ///
806    ///     println!(
807    ///         "Retrieved with ETag {}: {}",
808    ///         etag,
809    ///         String::from_utf8_lossy(&bytes)
810    ///     );
811    ///
812    ///     // Show that if the etag does not match, we get an error
813    ///     let wrong_etag = "wrong-etag".to_string();
814    ///     match store
815    ///         .get_opts(
816    ///             &location,
817    ///             GetOptions::new().with_if_match(Some(wrong_etag))
818    ///         )
819    ///         .await
820    ///     {
821    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
822    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
823    ///     }
824    /// }
825    /// ```
826    ///
827    /// To retrieve a range of bytes from a versioned object, specify the range in the [`GetOptions`] supplied to this method.
828    ///
829    /// ```ignore-wasm32
830    /// # use object_store::local::LocalFileSystem;
831    /// # use tempfile::tempdir;
832    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
833    /// async fn get_opts_range_example() {
834    ///     let tmp = tempdir().unwrap();
835    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
836    ///     let location = Path::from("example.txt");
837    ///     let content = b"Hello, Object Store!";
838    ///
839    ///     // Put the object into the store
840    ///     store
841    ///         .put(&location, content.as_ref().into())
842    ///         .await
843    ///         .expect("Failed to put object");
844    ///
845    ///     // Get the object from the store to figure out the right etag
846    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
847    ///
848    ///     let etag = result.meta.e_tag.expect("ETag should be present");
849    ///
850    ///     // Get the object from the store with range and etag
851    ///     let bytes = store
852    ///         .get_opts(
853    ///             &location,
854    ///             GetOptions::new()
855    ///                 .with_range(Some(0..5))
856    ///                 .with_if_match(Some(etag.clone())),
857    ///         )
858    ///         .await
859    ///         .expect("Failed to get object with range and etag")
860    ///         .bytes()
861    ///         .await
862    ///         .expect("Failed to read bytes");
863    ///
864    ///     println!(
865    ///         "Retrieved range [0-5] with ETag {}: {}",
866    ///         etag,
867    ///         String::from_utf8_lossy(&bytes)
868    ///     );
869    ///
870    ///     // Show that if the etag does not match, we get an error
871    ///     let wrong_etag = "wrong-etag".to_string();
872    ///     match store
873    ///         .get_opts(
874    ///             &location,
875    ///             GetOptions::new().with_range(Some(0..5)).with_if_match(Some(wrong_etag))
876    ///         )
877    ///         .await
878    ///     {
879    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
880    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
881    ///     }
882    /// }
883    /// ```
884    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
885
886    /// Return the bytes that are stored at the specified location
887    /// in the given byte ranges
888    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
889        coalesce_ranges(
890            ranges,
891            |range| self.get_range(location, range),
892            OBJECT_STORE_COALESCE_DEFAULT,
893        )
894        .await
895    }
896
897    /// Delete all the objects at the specified locations
898    ///
899    /// When supported, this method will use bulk operations that delete more
900    /// than one object per request. Otherwise, the implementation may call
901    /// the single object delete method for each location.
902    ///
903    /// # Bulk Delete Support
904    ///
905    /// The following backends support native bulk delete operations:
906    ///
907    /// - **AWS (S3)**: Uses the native [DeleteObjects] API with batches of up to 1000 objects
908    /// - **Azure**: Uses the native [Blob Batch] API with batches of up to 256 objects
909    ///
910    /// The following backends use concurrent individual delete operations:
911    ///
912    /// - **GCP**: Performs individual delete requests with up to 10 concurrent operations
913    /// - **HTTP**: Performs individual delete requests with up to 10 concurrent operations
914    /// - **Local**: Performs individual file deletions with up to 10 concurrent operations
915    /// - **Memory**: Performs individual in-memory deletions sequentially
916    ///
917    /// [DeleteObjects]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
918    /// [Blob Batch]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
919    ///
920    /// The returned stream yields the results of the delete operations in the
921    /// same order as the input locations. However, some errors will be from
922    /// an overall call to a bulk delete operation, and not from a specific
923    /// location.
924    ///
925    /// If the object did not exist, the result may be an error or a success,
926    /// depending on the behavior of the underlying store. For example, local
927    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
928    /// return Ok. If it is an error, it will be [`Error::NotFound`].
929    ///
930    /// ```ignore-wasm32
931    /// # use futures::{StreamExt, TryStreamExt};
932    /// # use object_store::local::LocalFileSystem;
933    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
934    /// # let root = tempfile::TempDir::new().unwrap();
935    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
936    /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
937    /// # use object_store::path::Path;
938    /// # use futures::{StreamExt, TryStreamExt};
939    /// #
940    /// // Create two objects
941    /// store.put(&Path::from("foo"), "foo".into()).await?;
942    /// store.put(&Path::from("bar"), "bar".into()).await?;
943    ///
944    /// // List objects
945    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
946    ///
947    /// // Delete them
948    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
949    /// # Ok(())
950    /// # }
951    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
952    /// # rt.block_on(example()).unwrap();
953    /// ```
954    ///
955    /// Note: Before version 0.13, `delete_stream` had a default implementation
956    /// that deletes each object with up to 10 concurrent requests. This default
957    /// behavior has been removed, and each implementation must now provide its
958    /// own `delete_stream` implementation explicitly. The following example
959    /// shows how to implement `delete_stream` to get the previous default
960    /// behavior.
961    ///
962    /// ```
963    /// # use async_trait::async_trait;
964    /// # use futures::stream::{BoxStream, StreamExt};
965    /// # use object_store::path::Path;
966    /// # use object_store::{
967    /// #     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
968    /// #     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
969    /// # };
970    /// # use std::fmt;
971    /// # use std::fmt::Debug;
972    /// # use std::sync::Arc;
973    /// #
974    /// # struct ExampleClient;
975    /// #
976    /// # impl ExampleClient {
977    /// #     async fn delete(&self, _path: &Path) -> Result<()> {
978    /// #         Ok(())
979    /// #     }
980    /// # }
981    /// #
982    /// # struct ExampleStore {
983    /// #     client: Arc<ExampleClient>,
984    /// # }
985    /// #
986    /// # impl Debug for ExampleStore {
987    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
988    /// #         write!(f, "ExampleStore")
989    /// #     }
990    /// # }
991    /// #
992    /// # impl fmt::Display for ExampleStore {
993    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
994    /// #         write!(f, "ExampleStore")
995    /// #     }
996    /// # }
997    /// #
998    /// # #[async_trait]
999    /// # impl ObjectStore for ExampleStore {
1000    /// #     async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
1001    /// #         todo!()
1002    /// #     }
1003    /// #
1004    /// #     async fn put_multipart_opts(
1005    /// #         &self,
1006    /// #         _: &Path,
1007    /// #         _: PutMultipartOptions,
1008    /// #     ) -> Result<Box<dyn MultipartUpload>> {
1009    /// #         todo!()
1010    /// #     }
1011    /// #
1012    /// #     async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
1013    /// #         todo!()
1014    /// #     }
1015    /// #
1016    /// fn delete_stream(
1017    ///     &self,
1018    ///     locations: BoxStream<'static, Result<Path>>,
1019    /// ) -> BoxStream<'static, Result<Path>> {
1020    ///     let client = Arc::clone(&self.client);
1021    ///     locations
1022    ///         .map(move |location| {
1023    ///             let client = Arc::clone(&client);
1024    ///             async move {
1025    ///                 let location = location?;
1026    ///                 client.delete(&location).await?;
1027    ///                 Ok(location)
1028    ///             }
1029    ///         })
1030    ///         .buffered(10)
1031    ///         .boxed()
1032    /// }
1033    /// #
1034    /// #     fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1035    /// #         todo!()
1036    /// #     }
1037    /// #
1038    /// #     async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
1039    /// #         todo!()
1040    /// #     }
1041    /// #
1042    /// #     async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> Result<()> {
1043    /// #         todo!()
1044    /// #     }
1045    /// # }
1046    /// #
1047    /// # async fn example() {
1048    /// #     let store = ExampleStore { client: Arc::new(ExampleClient) };
1049    /// #     let paths = futures::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
1050    /// #     let results = store.delete_stream(paths).collect::<Vec<_>>().await;
1051    /// #     assert_eq!(results.len(), 2);
1052    /// #     assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
1053    /// #     assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
1054    /// # }
1055    /// #
1056    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
1057    /// # rt.block_on(example());
1058    /// ```
1059    fn delete_stream(
1060        &self,
1061        locations: BoxStream<'static, Result<Path>>,
1062    ) -> BoxStream<'static, Result<Path>>;
1063
1064    /// List all the objects with the given prefix.
1065    ///
1066    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1067    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
1068    ///
    /// Each item of the returned stream is a `Result<ObjectMeta>`. The default
    /// [`ObjectStore::list_with_offset`] is implemented by filtering the output
    /// of this method.
    ///
1069    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1070    ///
1071    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1072    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
1073
1074    /// List all the objects with the given prefix and a location greater than `offset`
1075    ///
1076    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
1077    /// the number of network requests required
1078    ///
1079    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1080    ///
1081    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1082    fn list_with_offset(
1083        &self,
1084        prefix: Option<&Path>,
1085        offset: &Path,
1086    ) -> BoxStream<'static, Result<ObjectMeta>> {
1087        let offset = offset.clone();
1088        self.list(prefix)
1089            .try_filter(move |f| futures::future::ready(f.location > offset))
1090            .boxed()
1091    }
1092
1093    /// List objects with the given prefix and an implementation specific
1094    /// delimiter. Returns common prefixes (directories) in addition to object
1095    /// metadata.
1096    ///
    /// The returned [`ListResult`] carries the common prefixes in
    /// `common_prefixes` and the object metadata in `objects`.
    ///
1097    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
1098    /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
1099    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
1100
1101    /// Copy an object from one path to another in the same object store.
    ///
    /// `options` selects the [`CopyMode`] (`Overwrite` or `Create`) and carries
    /// the implementation-specific `extensions`. The default
    /// [`ObjectStore::rename_opts`] calls this method and then deletes `from`.
1102    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()>;
1103
1104    /// Move an object from one path to another in the same object store.
1105    ///
1106    /// By default, this is implemented as a copy and then delete source. It may not
1107    /// check when deleting source that it was the same object that was originally copied.
1108    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
1109        let RenameOptions {
1110            target_mode,
1111            extensions,
1112        } = options;
1113        let copy_mode = match target_mode {
1114            RenameTargetMode::Overwrite => CopyMode::Overwrite,
1115            RenameTargetMode::Create => CopyMode::Create,
1116        };
1117        let copy_options = CopyOptions {
1118            mode: copy_mode,
1119            extensions,
1120        };
1121        self.copy_opts(from, to, copy_options).await?;
1122        self.delete(from).await?;
1123        Ok(())
1124    }
1125}
1126
1127macro_rules! as_ref_impl {
1128    ($type:ty) => {
1129        #[async_trait]
1130        #[deny(clippy::missing_trait_methods)]
1131        impl<T: ObjectStore + ?Sized> ObjectStore for $type {
1132            async fn put_opts(
1133                &self,
1134                location: &Path,
1135                payload: PutPayload,
1136                opts: PutOptions,
1137            ) -> Result<PutResult> {
1138                self.as_ref().put_opts(location, payload, opts).await
1139            }
1140
1141            async fn put_multipart_opts(
1142                &self,
1143                location: &Path,
1144                opts: PutMultipartOptions,
1145            ) -> Result<Box<dyn MultipartUpload>> {
1146                self.as_ref().put_multipart_opts(location, opts).await
1147            }
1148
1149            async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
1150                self.as_ref().get_opts(location, options).await
1151            }
1152
1153            async fn get_ranges(
1154                &self,
1155                location: &Path,
1156                ranges: &[Range<u64>],
1157            ) -> Result<Vec<Bytes>> {
1158                self.as_ref().get_ranges(location, ranges).await
1159            }
1160
1161            fn delete_stream(
1162                &self,
1163                locations: BoxStream<'static, Result<Path>>,
1164            ) -> BoxStream<'static, Result<Path>> {
1165                self.as_ref().delete_stream(locations)
1166            }
1167
1168            fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
1169                self.as_ref().list(prefix)
1170            }
1171
1172            fn list_with_offset(
1173                &self,
1174                prefix: Option<&Path>,
1175                offset: &Path,
1176            ) -> BoxStream<'static, Result<ObjectMeta>> {
1177                self.as_ref().list_with_offset(prefix, offset)
1178            }
1179
1180            async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
1181                self.as_ref().list_with_delimiter(prefix).await
1182            }
1183
1184            async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
1185                self.as_ref().copy_opts(from, to, options).await
1186            }
1187
1188            async fn rename_opts(
1189                &self,
1190                from: &Path,
1191                to: &Path,
1192                options: RenameOptions,
1193            ) -> Result<()> {
1194                self.as_ref().rename_opts(from, to, options).await
1195            }
1196        }
1197    };
1198}
1199
1200as_ref_impl!(Arc<T>);
1201as_ref_impl!(Box<T>);
1202
1203/// Extension trait for [`ObjectStore`] with convenience functions.
1204///
1205/// See the [module-level documentation](crate) for a high-level overview and
1206/// examples. See "contract" section within the [`ObjectStore`] documentation
1207/// for more reasoning.
1208///
1209/// # Implementation
1210/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
1211pub trait ObjectStoreExt: ObjectStore {
1212    /// Save the provided bytes to the specified location
1213    ///
1214    /// The operation is guaranteed to be atomic, it will either successfully
1215    /// write the entirety of `payload` to `location`, or fail. No clients
1216    /// should be able to observe a partially written object
1217    fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;
1218
1219    /// Perform a multipart upload
1220    ///
1221    /// Clients should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
1222    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
1223    ///
1224    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
1225    fn put_multipart(
1226        &self,
1227        location: &Path,
1228    ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;
1229
1230    /// Return the bytes that are stored at the specified location.
1231    ///
1232    /// ## Example
1233    ///
1234    /// This example uses a basic local filesystem object store to get an object.
1235    ///
1236    /// ```ignore-wasm32
1237    /// # use object_store::local::LocalFileSystem;
1238    /// # use tempfile::tempdir;
1239    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1240    /// async fn get_example() {
1241    ///     let tmp = tempdir().unwrap();
1242    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1243    ///     let location = Path::from("example.txt");
1244    ///     let content = b"Hello, Object Store!";
1245    ///
1246    ///     // Put the object into the store
1247    ///     store
1248    ///         .put(&location, content.as_ref().into())
1249    ///         .await
1250    ///         .expect("Failed to put object");
1251    ///
1252    ///     // Get the object from the store
1253    ///     let get_result = store.get(&location).await.expect("Failed to get object");
1254    ///     let bytes = get_result.bytes().await.expect("Failed to read bytes");
1255    ///     println!("Retrieved content: {}", String::from_utf8_lossy(&bytes));
1256    /// }
1257    /// ```
1258    fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;
1259
1260    /// Return the bytes that are stored at the specified location
1261    /// in the given byte range.
1262    ///
1263    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
1264    ///
1265    /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
1266    ///
1267    /// ## Examples
1268    ///
1269    /// This example uses a basic local filesystem object store to get a byte range from an object.
1270    ///
1271    /// ```ignore-wasm32
1272    /// # use object_store::local::LocalFileSystem;
1273    /// # use tempfile::tempdir;
1274    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
1275    /// async fn get_range_example() {
1276    ///     let tmp = tempdir().unwrap();
1277    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
1278    ///     let location = Path::from("example.txt");
1279    ///     let content = b"Hello, Object Store!";
1280    ///
1281    ///     // Put the object into the store
1282    ///     store
1283    ///         .put(&location, content.as_ref().into())
1284    ///         .await
1285    ///         .expect("Failed to put object");
1286    ///
1287    ///     // Get the object from the store
1288    ///     let bytes = store
1289    ///         .get_range(&location, 0..5)
1290    ///         .await
1291    ///         .expect("Failed to get object");
1292    ///     println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
1293    /// }
1294    /// ```
1295    fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;
1296
1297    /// Return the metadata for the specified location
1298    fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;
1299
1300    /// Delete the object at the specified location.
1301    fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;
1302
1303    /// Copy an object from one path to another in the same object store.
1304    ///
1305    /// If there exists an object at the destination, it will be overwritten.
1306    fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1307
1308    /// Copy an object from one path to another, only if destination is empty.
1309    ///
1310    /// Will return an error if the destination already has an object.
1311    ///
1312    /// Performs an atomic operation if the underlying object storage supports it.
1313    /// If atomic operations are not supported by the underlying object storage (like S3)
1314    /// it will return an error.
1315    fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1316
1317    /// Move an object from one path to another in the same object store.
1318    ///
1319    /// By default, this is implemented as a copy and then delete source. It may not
1320    /// check when deleting source that it was the same object that was originally copied.
1321    ///
1322    /// If there exists an object at the destination, it will be overwritten.
1323    fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1324
1325    /// Move an object from one path to another in the same object store.
1326    ///
1327    /// Will return an error if the destination already has an object.
1328    fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
1329}
1330
1331impl<T> ObjectStoreExt for T
1332where
1333    T: ObjectStore + ?Sized,
1334{
1335    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1336        self.put_opts(location, payload, PutOptions::default())
1337            .await
1338    }
1339
1340    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
1341        self.put_multipart_opts(location, PutMultipartOptions::default())
1342            .await
1343    }
1344
1345    async fn get(&self, location: &Path) -> Result<GetResult> {
1346        self.get_opts(location, GetOptions::default()).await
1347    }
1348
1349    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
1350        let options = GetOptions::new().with_range(Some(range));
1351        self.get_opts(location, options).await?.bytes().await
1352    }
1353
1354    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
1355        let options = GetOptions::new().with_head(true);
1356        Ok(self.get_opts(location, options).await?.meta)
1357    }
1358
1359    async fn delete(&self, location: &Path) -> Result<()> {
1360        let location = location.clone();
1361        let mut stream =
1362            self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed());
1363        let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
1364            store: "ext",
1365            source: "`delete_stream` with one location should yield once but didn't".into(),
1366        })?;
1367        if stream.next().await.is_some() {
1368            Err(Error::Generic {
1369                store: "ext",
1370                source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
1371            })
1372        } else {
1373            Ok(())
1374        }
1375    }
1376
1377    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
1378        let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
1379        self.copy_opts(from, to, options).await
1380    }
1381
1382    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1383        let options = CopyOptions::new().with_mode(CopyMode::Create);
1384        self.copy_opts(from, to, options).await
1385    }
1386
1387    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1388        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
1389        self.rename_opts(from, to, options).await
1390    }
1391
1392    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1393        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
1394        self.rename_opts(from, to, options).await
1395    }
1396}
1397
1398/// Result of a list call that includes objects and prefixes (directories).
1399/// Individual result sets may be limited to 1,000 objects based on the
1400/// underlying object storage's limitations.
// NOTE(review): the previous wording also promised "a token for the next set of
// results", but this struct has no such field — confirm nothing still relies
// on that description.
1401#[derive(Debug)]
1402pub struct ListResult {
1403    /// Prefixes that are common (like directories)
1404    pub common_prefixes: Vec<Path>,
1405    /// Object metadata for the listing
1406    pub objects: Vec<ObjectMeta>,
1407}
1408
1409/// The metadata that describes an object.
1410#[derive(Debug, Clone, PartialEq, Eq)]
1411pub struct ObjectMeta {
1412    /// The full path to the object
1413    pub location: Path,
1414    /// The last modified time
1415    pub last_modified: DateTime<Utc>,
1416    /// The size in bytes of the object.
1417    ///
1418    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
1419    pub size: u64,
1420    /// The unique identifier for the object
1421    ///
    /// When this is `None`, [`GetOptions::check_preconditions`] substitutes the
    /// invalid ETag `*`, so the object never equals a concrete tag listed in
    /// `if_match` or `if_none_match`.
    ///
1422    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
1423    pub e_tag: Option<String>,
1424    /// A version indicator for this object
1425    pub version: Option<String>,
1426}
1427
1428/// Options for a get request, such as range
1429#[derive(Debug, Default, Clone)]
1430pub struct GetOptions {
1431    /// Request will succeed if the `ObjectMeta::e_tag` matches
1432    /// otherwise returning [`Error::Precondition`]
1433    ///
1434    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
1435    ///
1436    /// Examples:
1437    ///
1438    /// ```text
1439    /// If-Match: "xyzzy"
1440    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
1441    /// If-Match: *
1442    /// ```
1443    pub if_match: Option<String>,
1444    /// Request will succeed if the `ObjectMeta::e_tag` does not match
1445    /// otherwise returning [`Error::NotModified`]
1446    ///
1447    /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
1448    ///
1449    /// Examples:
1450    ///
1451    /// ```text
1452    /// If-None-Match: "xyzzy"
1453    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
1454    /// If-None-Match: *
1455    /// ```
1456    pub if_none_match: Option<String>,
1457    /// Request will succeed if the object has been modified since
    /// otherwise returning [`Error::NotModified`]
    ///
    /// [`GetOptions::check_preconditions`] only evaluates this condition when
    /// `if_none_match` is `None`.
1458    ///
1459    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
1460    pub if_modified_since: Option<DateTime<Utc>>,
1461    /// Request will succeed if the object has not been modified since
1462    /// otherwise returning [`Error::Precondition`]
1463    ///
    /// [`GetOptions::check_preconditions`] only evaluates this condition when
    /// `if_match` is `None`.
    ///
1464    /// Some stores, such as S3, will only return `NotModified` for exact
1465    /// timestamp matches, instead of for any timestamp greater than or equal.
1466    ///
1467    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
1468    pub if_unmodified_since: Option<DateTime<Utc>>,
1469    /// Request transfer of only the specified range of bytes
1470    ///
    /// See [`GetRange`] for the supported kinds of range.
1471    ///
1472    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
1473    pub range: Option<GetRange>,
1474    /// Request a particular object version
1475    pub version: Option<String>,
1476    /// Request transfer of no content
1477    ///
1478    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
1479    pub head: bool,
1480    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1481    /// that need to pass context-specific information (like tracing spans) via trait methods.
1482    ///
1483    /// These extensions are ignored entirely by backends offered through this crate.
1484    pub extensions: Extensions,
1485}
1486
1487impl GetOptions {
1488    /// Returns an error if the modification conditions on this request are not satisfied
1489    ///
1490    /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
1491    pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
1492        // The use of the invalid etag "*" means no ETag is equivalent to never matching
1493        let etag = meta.e_tag.as_deref().unwrap_or("*");
1494        let last_modified = meta.last_modified;
1495
1496        if let Some(m) = &self.if_match {
1497            if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
1498                return Err(Error::Precondition {
1499                    path: meta.location.to_string(),
1500                    source: format!("{etag} does not match {m}").into(),
1501                });
1502            }
1503        } else if let Some(date) = self.if_unmodified_since {
1504            if last_modified > date {
1505                return Err(Error::Precondition {
1506                    path: meta.location.to_string(),
1507                    source: format!("{date} < {last_modified}").into(),
1508                });
1509            }
1510        }
1511
1512        if let Some(m) = &self.if_none_match {
1513            if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
1514                return Err(Error::NotModified {
1515                    path: meta.location.to_string(),
1516                    source: format!("{etag} matches {m}").into(),
1517                });
1518            }
1519        } else if let Some(date) = self.if_modified_since {
1520            if last_modified <= date {
1521                return Err(Error::NotModified {
1522                    path: meta.location.to_string(),
1523                    source: format!("{date} >= {last_modified}").into(),
1524                });
1525            }
1526        }
1527        Ok(())
1528    }
1529
1530    /// Create a new [`GetOptions`]
1531    pub fn new() -> Self {
1532        Self::default()
1533    }
1534
1535    /// Sets the `if_match` condition.
1536    ///
1537    /// See [`GetOptions::if_match`]
1538    #[must_use]
1539    pub fn with_if_match(mut self, etag: Option<impl Into<String>>) -> Self {
1540        self.if_match = etag.map(Into::into);
1541        self
1542    }
1543
1544    /// Sets the `if_none_match` condition.
1545    ///
1546    /// See [`GetOptions::if_none_match`]
1547    #[must_use]
1548    pub fn with_if_none_match(mut self, etag: Option<impl Into<String>>) -> Self {
1549        self.if_none_match = etag.map(Into::into);
1550        self
1551    }
1552
1553    /// Sets the `if_modified_since` condition.
1554    ///
1555    /// See [`GetOptions::if_modified_since`]
1556    #[must_use]
1557    pub fn with_if_modified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1558        self.if_modified_since = dt.map(Into::into);
1559        self
1560    }
1561
1562    /// Sets the `if_unmodified_since` condition.
1563    ///
1564    /// See [`GetOptions::if_unmodified_since`]
1565    #[must_use]
1566    pub fn with_if_unmodified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1567        self.if_unmodified_since = dt.map(Into::into);
1568        self
1569    }
1570
1571    /// Sets the `range` condition.
1572    ///
1573    /// See [`GetOptions::range`]
1574    #[must_use]
1575    pub fn with_range(mut self, range: Option<impl Into<GetRange>>) -> Self {
1576        self.range = range.map(Into::into);
1577        self
1578    }
1579
1580    /// Sets the `version` condition.
1581    ///
1582    /// See [`GetOptions::version`]
1583    #[must_use]
1584    pub fn with_version(mut self, version: Option<impl Into<String>>) -> Self {
1585        self.version = version.map(Into::into);
1586        self
1587    }
1588
1589    /// Sets the `head` condition.
1590    ///
1591    /// See [`GetOptions::head`]
1592    #[must_use]
1593    pub fn with_head(mut self, head: impl Into<bool>) -> Self {
1594        self.head = head.into();
1595        self
1596    }
1597
1598    /// Sets the `extensions` condition.
1599    ///
1600    /// See [`GetOptions::extensions`]
1601    #[must_use]
1602    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1603        self.extensions = extensions;
1604        self
1605    }
1606}
1607
/// Result for a get request
///
/// Bundles the object's payload with its metadata, the byte range that the
/// payload covers and any additional attributes.
#[derive(Debug)]
pub struct GetResult {
    /// The [`GetResultPayload`]
    pub payload: GetResultPayload,
    /// The [`ObjectMeta`] for this object
    pub meta: ObjectMeta,
    /// The range of bytes returned by this request
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub range: Range<u64>,
    /// Additional object attributes
    pub attributes: Attributes,
}
1622
/// The kind of a [`GetResult`]
///
/// Local files get a dedicated variant, as some systems may be able to
/// optimise the case of a file already present on local disk
pub enum GetResultPayload {
    /// A local [`std::fs::File`] together with its path
    ///
    /// Only available with the `fs` feature on non-`wasm32` targets.
    #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
    File(std::fs::File, std::path::PathBuf),
    /// An opaque stream of bytes
    Stream(BoxStream<'static, Result<Bytes>>),
}
1634
1635impl Debug for GetResultPayload {
1636    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1637        match self {
1638            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1639            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1640            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1641        }
1642    }
1643}
1644
1645impl GetResult {
1646    /// Collects the data into a [`Bytes`]
1647    pub async fn bytes(self) -> Result<Bytes> {
1648        let len = self.range.end - self.range.start;
1649        match self.payload {
1650            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1651            GetResultPayload::File(mut file, path) => {
1652                maybe_spawn_blocking(move || {
1653                    file.seek(SeekFrom::Start(self.range.start as _))
1654                        .map_err(|source| local::Error::Seek {
1655                            source,
1656                            path: path.clone(),
1657                        })?;
1658
1659                    let mut buffer = if let Ok(len) = len.try_into() {
1660                        Vec::with_capacity(len)
1661                    } else {
1662                        Vec::new()
1663                    };
1664                    file.take(len as _)
1665                        .read_to_end(&mut buffer)
1666                        .map_err(|source| local::Error::UnableToReadBytes { source, path })?;
1667
1668                    Ok(buffer.into())
1669                })
1670                .await
1671            }
1672            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1673        }
1674    }
1675
1676    /// Converts this into a byte stream
1677    ///
1678    /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1679    /// otherwise will return the [`GetResultPayload::Stream`].
1680    ///
1681    /// # Tokio Compatibility
1682    ///
1683    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1684    /// no major operating systems have stable async file APIs. Therefore if called from
1685    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1686    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1687    ///
1688    /// If not called from a tokio context, this will perform IO on the current thread with
1689    /// no additional complexity or overheads
1690    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1691        match self.payload {
1692            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1693            GetResultPayload::File(file, path) => {
1694                const CHUNK_SIZE: usize = 8 * 1024;
1695                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1696            }
1697            GetResultPayload::Stream(s) => s,
1698        }
1699    }
1700}
1701
/// Configure preconditions for the put operation
///
/// Defaults to [`PutMode::Overwrite`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
    /// Perform an atomic write operation if the current version of the object matches the
    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
    Update(UpdateVersion),
}
1715
/// Uniquely identifies a version of an object to update
///
/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
/// updates, and it is therefore recommended applications preserve both
///
/// Can be obtained from a [`PutResult`] via [`From`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
    /// The unique identifier (ETag) of the object version to update
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the object version to update
    pub version: Option<String>,
}
1729
1730impl From<PutResult> for UpdateVersion {
1731    fn from(value: PutResult) -> Self {
1732        Self {
1733            e_tag: value.e_tag,
1734            version: value.version,
1735        }
1736    }
1737}
1738
/// Options for a put request
///
/// Can be created from a [`PutMode`], [`TagSet`] or [`Attributes`] via [`From`],
/// in which case the remaining fields take their default values.
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
    /// Configure the [`PutMode`] for this operation
    pub mode: PutMode,
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1760
1761impl PartialEq<Self> for PutOptions {
1762    fn eq(&self, other: &Self) -> bool {
1763        let Self {
1764            mode,
1765            tags,
1766            attributes,
1767            extensions: _,
1768        } = self;
1769        let Self {
1770            mode: other_mode,
1771            tags: other_tags,
1772            attributes: other_attributes,
1773            extensions: _,
1774        } = other;
1775        (mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
1776    }
1777}
1778
1779impl Eq for PutOptions {}
1780
1781impl From<PutMode> for PutOptions {
1782    fn from(mode: PutMode) -> Self {
1783        Self {
1784            mode,
1785            ..Default::default()
1786        }
1787    }
1788}
1789
1790impl From<TagSet> for PutOptions {
1791    fn from(tags: TagSet) -> Self {
1792        Self {
1793            tags,
1794            ..Default::default()
1795        }
1796    }
1797}
1798
1799impl From<Attributes> for PutOptions {
1800    fn from(attributes: Attributes) -> Self {
1801        Self {
1802            attributes,
1803            ..Default::default()
1804        }
1805    }
1806}
1807
// Deprecated alias for the renamed `PutMultipartOptions`, hidden from the docs.
// See <https://github.com/apache/arrow-rs-object-store/issues/339>.
#[doc(hidden)]
#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
pub type PutMultipartOpts = PutMultipartOptions;
1812
/// Options for [`ObjectStore::put_multipart_opts`]
///
/// Can be created from a [`TagSet`] or [`Attributes`] via [`From`], in which case
/// the remaining fields take their default values.
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOptions {
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1832
1833impl PartialEq<Self> for PutMultipartOptions {
1834    fn eq(&self, other: &Self) -> bool {
1835        let Self {
1836            tags,
1837            attributes,
1838            extensions: _,
1839        } = self;
1840        let Self {
1841            tags: other_tags,
1842            attributes: other_attributes,
1843            extensions: _,
1844        } = other;
1845        (tags == other_tags) && (attributes == other_attributes)
1846    }
1847}
1848
1849impl Eq for PutMultipartOptions {}
1850
1851impl From<TagSet> for PutMultipartOptions {
1852    fn from(tags: TagSet) -> Self {
1853        Self {
1854            tags,
1855            ..Default::default()
1856        }
1857    }
1858}
1859
1860impl From<Attributes> for PutMultipartOptions {
1861    fn from(attributes: Attributes) -> Self {
1862        Self {
1863            attributes,
1864            ..Default::default()
1865        }
1866    }
1867}
1868
/// Result for a put request
///
/// Can be converted into an [`UpdateVersion`] via [`From`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutResult {
    /// The unique identifier for the newly created object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
}
1879
/// Configure preconditions for the copy operation
///
/// Defaults to [`CopyMode::Overwrite`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
}
1890
/// Options for a copy request
///
/// Build with [`CopyOptions::new`] followed by the `with_*` methods.
#[derive(Debug, Clone, Default)]
pub struct CopyOptions {
    /// Configure the [`CopyMode`] for this operation
    pub mode: CopyMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1904
1905impl CopyOptions {
1906    /// Create a new [`CopyOptions`]
1907    pub fn new() -> Self {
1908        Self::default()
1909    }
1910
1911    /// Sets the `mode.
1912    ///
1913    /// See [`CopyOptions::mode`].
1914    #[must_use]
1915    pub fn with_mode(mut self, mode: CopyMode) -> Self {
1916        self.mode = mode;
1917        self
1918    }
1919
1920    /// Sets the `extensions`.
1921    ///
1922    /// See [`CopyOptions::extensions`].
1923    #[must_use]
1924    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1925        self.extensions = extensions;
1926        self
1927    }
1928}
1929
1930impl PartialEq<Self> for CopyOptions {
1931    fn eq(&self, other: &Self) -> bool {
1932        let Self {
1933            mode,
1934            extensions: _,
1935        } = self;
1936        let Self {
1937            mode: mode_other,
1938            extensions: _,
1939        } = other;
1940
1941        mode == mode_other
1942    }
1943}
1944
1945impl Eq for CopyOptions {}
1946
/// Configure preconditions for the target of a rename operation.
///
/// Note that the source location may or may not be deleted atomically together with the creation of the
/// target. There is currently NO flag to control the atomicity of "delete source at the same time as creating
/// the target".
///
/// Defaults to [`RenameTargetMode::Overwrite`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
    /// Perform a write operation on the target, overwriting any object present at the provided path.
    #[default]
    Overwrite,
    /// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path.
    Create,
}
1960
/// Options for a rename request
///
/// Build with [`RenameOptions::new`] followed by the `with_*` methods.
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
    /// Configure the [`RenameTargetMode`] for this operation
    pub target_mode: RenameTargetMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1974
1975impl RenameOptions {
1976    /// Create a new [`RenameOptions`]
1977    pub fn new() -> Self {
1978        Self::default()
1979    }
1980
1981    /// Sets the `target_mode=.
1982    ///
1983    /// See [`RenameOptions::target_mode`].
1984    #[must_use]
1985    pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
1986        self.target_mode = target_mode;
1987        self
1988    }
1989
1990    /// Sets the `extensions`.
1991    ///
1992    /// See [`RenameOptions::extensions`].
1993    #[must_use]
1994    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1995        self.extensions = extensions;
1996        self
1997    }
1998}
1999
2000impl PartialEq<Self> for RenameOptions {
2001    fn eq(&self, other: &Self) -> bool {
2002        let Self {
2003            target_mode,
2004            extensions: _,
2005        } = self;
2006        let Self {
2007            target_mode: target_mode_other,
2008            extensions: _,
2009        } = other;
2010
2011        target_mode == target_mode_other
2012    }
2013}
2014
2015impl Eq for RenameOptions {}
2016
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to [`Error`] but can be overridden.
pub type Result<T, E = Error> = std::result::Result<T, E>;
2019
/// A specialized `Error` for object store-related errors
///
/// The enum is `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// A fallback error type when no variant matches
    #[error("Generic {} error: {}", store, source)]
    Generic {
        /// The store this error originated from
        store: &'static str,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object is not found at given location
    #[error("Object at location {} not found: {}", path, source)]
    NotFound {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error for invalid path
    #[error("Encountered object with invalid path: {}", source)]
    InvalidPath {
        /// The wrapped error
        #[from]
        source: path::Error,
    },

    /// Error when `tokio::spawn` failed
    #[error("Error joining spawned task: {}", source)]
    JoinError {
        /// The wrapped error
        #[from]
        source: tokio::task::JoinError,
    },

    /// Error when the attempted operation is not supported
    #[error("Operation not supported: {}", source)]
    NotSupported {
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object already exists
    #[error("Object at location {} already exists: {}", path, source)]
    AlreadyExists {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the required conditions failed for the operation
    #[error("Request precondition failure for path {}: {}", path, source)]
    Precondition {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object at the location isn't modified
    #[error("Object at location {} not modified: {}", path, source)]
    NotModified {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when an operation is not implemented
    #[error("Operation {operation} not yet implemented by {implementer}.")]
    NotImplemented {
        /// What isn't implemented. Should include at least the method
        /// name that was called; could also include other relevant
        /// subcontexts.
        operation: String,

        /// Which driver this is that hasn't implemented this operation,
        /// to aid debugging in contexts that may be using multiple implementations.
        implementer: String,
    },

    /// Error when the used credentials don't have enough permission
    /// to perform the requested operation
    #[error(
        "The operation lacked the necessary privileges to complete for path {}: {}",
        path,
        source
    )]
    PermissionDenied {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the used credentials lack valid authentication
    #[error(
        "The operation lacked valid authentication credentials for path {}: {}",
        path,
        source
    )]
    Unauthenticated {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when a configuration key is invalid for the store used
    #[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
    UnknownConfigurationKey {
        /// The object store used
        store: &'static str,
        /// The configuration key used
        key: String,
    },
}
2141
2142impl From<Error> for std::io::Error {
2143    fn from(e: Error) -> Self {
2144        let kind = match &e {
2145            Error::NotFound { .. } => std::io::ErrorKind::NotFound,
2146            _ => std::io::ErrorKind::Other,
2147        };
2148        Self::new(kind, e)
2149    }
2150}
2151
2152#[cfg(test)]
2153mod tests {
2154    use super::*;
2155
2156    use chrono::TimeZone;
2157
    // Returns early from the enclosing test, after printing a notice to stderr,
    // unless the `TEST_INTEGRATION` environment variable is set.
    macro_rules! maybe_skip_integration {
        () => {
            if std::env::var("TEST_INTEGRATION").is_err() {
                eprintln!("Skipping integration test - set TEST_INTEGRATION");
                return;
            }
        };
    }
    // Re-exported so the macro can be used elsewhere within the crate.
    pub(crate) use maybe_skip_integration;
2167
2168    /// Test that the returned stream does not borrow the lifetime of Path
2169    fn list_store<'a>(
2170        store: &'a dyn ObjectStore,
2171        path_str: &str,
2172    ) -> BoxStream<'a, Result<ObjectMeta>> {
2173        let path = Path::from(path_str);
2174        store.list(Some(&path))
2175    }
2176
2177    #[cfg(any(feature = "azure", feature = "aws"))]
2178    pub(crate) async fn signing<T>(integration: &T)
2179    where
2180        T: ObjectStore + signer::Signer,
2181    {
2182        use reqwest::Method;
2183        use std::time::Duration;
2184
2185        let data = Bytes::from("hello world");
2186        let path = Path::from("file.txt");
2187        integration.put(&path, data.clone().into()).await.unwrap();
2188
2189        let signed = integration
2190            .signed_url(Method::GET, &path, Duration::from_secs(60))
2191            .await
2192            .unwrap();
2193
2194        let resp = reqwest::get(signed).await.unwrap();
2195        let loaded = resp.bytes().await.unwrap();
2196
2197        assert_eq!(data, loaded);
2198    }
2199
    /// Writes three objects carrying the same tag set: one via `put_opts`, one via
    /// `put_multipart_opts` and one via `BufWriter`. When `validate` is true, the tags of
    /// each object are then fetched with `get_tags` and compared against what was written.
    ///
    /// `get_tags` must return a response whose body is XML that deserializes into the
    /// local `Tagging` structure below.
    #[cfg(any(feature = "aws", feature = "azure"))]
    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
    where
        F: Fn(Path) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
    {
        use bytes::Buf;
        use serde::Deserialize;
        use tokio::io::AsyncWriteExt;

        use crate::buffered::BufWriter;

        // Mirror of the XML document returned by `get_tags`
        #[derive(Deserialize)]
        struct Tagging {
            #[serde(rename = "TagSet")]
            list: TagList,
        }

        #[derive(Debug, Deserialize)]
        struct TagList {
            #[serde(rename = "Tag")]
            tags: Vec<Tag>,
        }

        #[derive(Debug, Deserialize, Eq, PartialEq)]
        #[serde(rename_all = "PascalCase")]
        struct Tag {
            key: String,
            value: String,
        }

        // Already ordered by key, which matches the sort applied to the response below
        let tags = vec![
            Tag {
                key: "foo.com=bar/s".to_string(),
                value: "bananas/foo.com-_".to_string(),
            },
            Tag {
                key: "namespace/key.foo".to_string(),
                value: "value with a space".to_string(),
            },
        ];
        let mut tag_set = TagSet::default();
        for t in &tags {
            tag_set.push(&t.key, &t.value)
        }

        // Single-request put
        let path = Path::from("tag_test");
        storage
            .put_opts(&path, "test".into(), tag_set.clone().into())
            .await
            .unwrap();

        // Multipart upload
        let multi_path = Path::from("tag_test_multi");
        let mut write = storage
            .put_multipart_opts(&multi_path, tag_set.clone().into())
            .await
            .unwrap();

        write.put_part("foo".into()).await.unwrap();
        write.complete().await.unwrap();

        // Buffered writer
        let buf_path = Path::from("tag_test_buf");
        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
        buf.write_all(b"foo").await.unwrap();
        buf.shutdown().await.unwrap();

        // Write should always succeed, but certain configurations may simply ignore tags
        if !validate {
            return;
        }

        for path in [path, multi_path, buf_path] {
            let resp = get_tags(path.clone()).await.unwrap();
            let body = resp.into_body().bytes().await.unwrap();

            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
            // Sort by key so the comparison does not depend on the order of the response
            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
            assert_eq!(resp.list.tags, tags);
        }
    }
2280
2281    #[tokio::test]
2282    async fn test_list_lifetimes() {
2283        let store = memory::InMemory::new();
2284        let mut stream = list_store(&store, "path");
2285        assert!(stream.next().await.is_none());
2286    }
2287
2288    #[test]
2289    fn test_preconditions() {
2290        let mut meta = ObjectMeta {
2291            location: Path::from("test"),
2292            last_modified: Utc.timestamp_nanos(100),
2293            size: 100,
2294            e_tag: Some("123".to_string()),
2295            version: None,
2296        };
2297
2298        let mut options = GetOptions::default();
2299        options.check_preconditions(&meta).unwrap();
2300
2301        options.if_modified_since = Some(Utc.timestamp_nanos(50));
2302        options.check_preconditions(&meta).unwrap();
2303
2304        options.if_modified_since = Some(Utc.timestamp_nanos(100));
2305        options.check_preconditions(&meta).unwrap_err();
2306
2307        options.if_modified_since = Some(Utc.timestamp_nanos(101));
2308        options.check_preconditions(&meta).unwrap_err();
2309
2310        options = GetOptions::default();
2311
2312        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
2313        options.check_preconditions(&meta).unwrap_err();
2314
2315        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
2316        options.check_preconditions(&meta).unwrap();
2317
2318        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
2319        options.check_preconditions(&meta).unwrap();
2320
2321        options = GetOptions::default();
2322
2323        options.if_match = Some("123".to_string());
2324        options.check_preconditions(&meta).unwrap();
2325
2326        options.if_match = Some("123,354".to_string());
2327        options.check_preconditions(&meta).unwrap();
2328
2329        options.if_match = Some("354, 123,".to_string());
2330        options.check_preconditions(&meta).unwrap();
2331
2332        options.if_match = Some("354".to_string());
2333        options.check_preconditions(&meta).unwrap_err();
2334
2335        options.if_match = Some("*".to_string());
2336        options.check_preconditions(&meta).unwrap();
2337
2338        // If-Match takes precedence
2339        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
2340        options.check_preconditions(&meta).unwrap();
2341
2342        options = GetOptions::default();
2343
2344        options.if_none_match = Some("123".to_string());
2345        options.check_preconditions(&meta).unwrap_err();
2346
2347        options.if_none_match = Some("*".to_string());
2348        options.check_preconditions(&meta).unwrap_err();
2349
2350        options.if_none_match = Some("1232".to_string());
2351        options.check_preconditions(&meta).unwrap();
2352
2353        options.if_none_match = Some("23, 123".to_string());
2354        options.check_preconditions(&meta).unwrap_err();
2355
2356        // If-None-Match takes precedence
2357        options.if_modified_since = Some(Utc.timestamp_nanos(10));
2358        options.check_preconditions(&meta).unwrap_err();
2359
2360        // Check missing ETag
2361        meta.e_tag = None;
2362        options = GetOptions::default();
2363
2364        options.if_none_match = Some("*".to_string()); // Fails if any file exists
2365        options.check_preconditions(&meta).unwrap_err();
2366
2367        options = GetOptions::default();
2368        options.if_match = Some("*".to_string()); // Passes if file exists
2369        options.check_preconditions(&meta).unwrap();
2370    }
2371
2372    #[test]
2373    #[cfg(feature = "http")]
2374    fn test_reexported_types() {
2375        // Test HeaderMap
2376        let mut headers = HeaderMap::new();
2377        headers.insert("content-type", HeaderValue::from_static("text/plain"));
2378        assert_eq!(headers.len(), 1);
2379
2380        // Test HeaderValue
2381        let value = HeaderValue::from_static("test-value");
2382        assert_eq!(value.as_bytes(), b"test-value");
2383
2384        // Test Extensions
2385        let mut extensions = Extensions::new();
2386        extensions.insert("test-key");
2387        assert!(extensions.get::<&str>().is_some());
2388    }
2389
2390    #[test]
2391    fn test_get_options_builder() {
2392        let dt = Utc::now();
2393        let extensions = Extensions::new();
2394
2395        let options = GetOptions::new();
2396
2397        // assert defaults
2398        assert_eq!(options.if_match, None);
2399        assert_eq!(options.if_none_match, None);
2400        assert_eq!(options.if_modified_since, None);
2401        assert_eq!(options.if_unmodified_since, None);
2402        assert_eq!(options.range, None);
2403        assert_eq!(options.version, None);
2404        assert!(!options.head);
2405        assert!(options.extensions.get::<&str>().is_none());
2406
2407        let options = options
2408            .with_if_match(Some("etag-match"))
2409            .with_if_none_match(Some("etag-none-match"))
2410            .with_if_modified_since(Some(dt))
2411            .with_if_unmodified_since(Some(dt))
2412            .with_range(Some(0..100))
2413            .with_version(Some("version-1"))
2414            .with_head(true)
2415            .with_extensions(extensions.clone());
2416
2417        assert_eq!(options.if_match, Some("etag-match".to_string()));
2418        assert_eq!(options.if_none_match, Some("etag-none-match".to_string()));
2419        assert_eq!(options.if_modified_since, Some(dt));
2420        assert_eq!(options.if_unmodified_since, Some(dt));
2421        assert_eq!(options.range, Some(GetRange::Bounded(0..100)));
2422        assert_eq!(options.version, Some("version-1".to_string()));
2423        assert!(options.head);
2424        assert_eq!(options.extensions.get::<&str>(), extensions.get::<&str>());
2425    }
2426
2427    fn takes_generic_object_store<T: ObjectStore>(store: T) {
2428        // This function is just to ensure that the trait bounds are satisfied
2429        let _ = store;
2430    }
2431    #[test]
2432    fn test_dyn_impl() {
2433        let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
2434        takes_generic_object_store(store);
2435        let store: Box<dyn ObjectStore> = Box::new(memory::InMemory::new());
2436        takes_generic_object_store(store);
2437    }
2438    #[test]
2439    fn test_generic_impl() {
2440        let store = Arc::new(memory::InMemory::new());
2441        takes_generic_object_store(store);
2442        let store = Box::new(memory::InMemory::new());
2443        takes_generic_object_store(store);
2444    }
2445}