Skip to main content

object_store/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![cfg_attr(docsrs, feature(doc_cfg))]
19#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
20#![warn(
21    missing_copy_implementations,
22    missing_debug_implementations,
23    missing_docs,
24    clippy::explicit_iter_loop,
25    clippy::future_not_send,
26    clippy::use_self,
27    clippy::clone_on_ref_ptr,
28    unreachable_pub
29)]
30
31//! # object_store
32//!
33//! This crate provides a uniform API for interacting with object
34//! storage services and local files via the [`ObjectStore`]
35//! trait.
36//!
37//! Using this crate, the same binary and code can run in multiple
38//! clouds and local test environments, via a simple runtime
39//! configuration change.
40//!
41//! # Highlights
42//!
43//! 1. A high-performance async API focused on providing a consistent interface
44//!    mirroring that of object stores such as [S3]
45//!
46//! 2. Production quality, leading this crate to be used in large
47//!    scale production systems, such as [crates.io] and [InfluxDB IOx]
48//!
49//! 3. Support for advanced functionality, including atomic, conditional reads
50//!    and writes, vectored IO, bulk deletion, and more...
51//!
52//! 4. Stable and predictable governance via the [Apache Arrow] project
53//!
54//! 5. Small dependency footprint, depending on only a small number of common crates
55//!
56//! Originally developed by [InfluxData] and subsequently donated
57//! to [Apache Arrow].
58//!
59//! [Apache Arrow]: https://arrow.apache.org/
60//! [InfluxData]: https://www.influxdata.com/
61//! [crates.io]: https://github.com/rust-lang/crates.io
62//! [ACID]: https://en.wikipedia.org/wiki/ACID
63//! [S3]: https://aws.amazon.com/s3/
64//!
65//! # APIs
66//!
67//! * [`ObjectStore`]: Core object store API
68//! * [`ObjectStoreExt`]: (*New in 0.13.0*) Extension trait with additional convenience methods
69//!
70//! # Available [`ObjectStore`] Implementations
71//!
72//! By default, this crate provides the following implementations:
73//!
74//! * Memory: [`InMemory`](memory::InMemory)
75//!
76//! Feature flags are used to enable support for other implementations:
77//!
78#![cfg_attr(
79    feature = "fs",
80    doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
81)]
82#![cfg_attr(
83    feature = "gcp",
84    doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
85)]
86#![cfg_attr(
87    feature = "aws",
88    doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
89)]
90#![cfg_attr(
91    feature = "azure",
92    doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
93)]
94#![cfg_attr(
95    feature = "http",
96    doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
97)]
98//!
99//! # Why not a Filesystem Interface?
100//!
101//! The [`ObjectStore`] interface is designed to mirror the APIs
102//! of object stores and *not* filesystems, and thus has stateless APIs instead
103//! of cursor based interfaces such as [`Read`] or [`Seek`] available in filesystems.
104//!
105//! This design provides the following advantages:
106//!
107//! * All operations are atomic, and readers cannot observe partial and/or failed writes
108//! * Methods map directly to object store APIs, providing both efficiency and predictability
109//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
110//! * Allows for functionality not native to filesystems, such as operation preconditions
111//!   and atomic multipart uploads
112//!
113//! This crate does provide [`BufReader`] and [`BufWriter`] adapters
114//! which provide a more filesystem-like API for working with the
115//! [`ObjectStore`] trait, however, they should be used with care
116//!
117//! [`BufReader`]: buffered::BufReader
118//! [`BufWriter`]: buffered::BufWriter
119//! [`Read`]: std::io::Read
120//! [`Seek`]: std::io::Seek
121//!
122//! # Adapters
123//!
124//! [`ObjectStore`] instances can be composed with various adapters
125//! which add additional functionality:
126//!
127//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
128//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
129//!
130//! # Configuration System
131//!
132//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
133//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
134//! from a URL and an optional list of key value pairs. This provides a flexible interface
135//! to support a wide variety of user-defined store configurations, with minimal additional
136//! application complexity.
137//!
138//! ```no_run,ignore-wasm32
139//! # #[cfg(feature = "aws")] {
140//! # use url::Url;
141//! # use object_store::{parse_url, parse_url_opts};
142//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
143//! #
144//! #
145//! // Can manually create a specific store variant using the appropriate builder
146//! let store: AmazonS3 = AmazonS3Builder::from_env()
147//!     .with_bucket_name("my-bucket").build().unwrap();
148//!
149//! // Alternatively can create an ObjectStore from an S3 URL
150//! let url = Url::parse("s3://bucket/path").unwrap();
151//! let (store, path) = parse_url(&url).unwrap();
152//! assert_eq!(path.as_ref(), "path");
153//!
154//! // Potentially with additional options
155//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
156//!
157//! // Or with URLs that encode the bucket name in the URL path
158//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
159//! let (store, path) = parse_url(&url).unwrap();
160//! assert_eq!(path.as_ref(), "path");
161//! # }
162//! ```
163//!
164//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
165//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
166//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
167//!
168//! # List objects
169//!
170//! Use the [`ObjectStore::list`] method to iterate over objects in
171//! remote storage or files in the local filesystem:
172//!
173//! ```ignore-wasm32
174//! # use object_store::local::LocalFileSystem;
175//! # use std::sync::Arc;
176//! # use object_store::{path::Path, ObjectStore};
177//! # use futures_util::stream::StreamExt;
178//! # // use LocalFileSystem for example
179//! # fn get_object_store() -> Arc<dyn ObjectStore> {
180//! #   Arc::new(LocalFileSystem::new())
181//! # }
182//! #
183//! # async fn example() {
184//! #
185//! // create an ObjectStore
186//! let object_store: Arc<dyn ObjectStore> = get_object_store();
187//!
188//! // Recursively list all files below the 'data' path.
189//! // 1. On AWS S3 this would be the 'data/' prefix
190//! // 2. On a local filesystem, this would be the 'data' directory
191//! let prefix = Path::from("data");
192//!
193//! // Get an `async` stream of Metadata objects:
194//! let mut list_stream = object_store.list(Some(&prefix));
195//!
196//! // Print a line about each object
197//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
198//!     println!("Name: {}, size: {}", meta.location, meta.size);
199//! }
200//! # }
201//! ```
202//!
203//! Which will print out something like the following:
204//!
205//! ```text
206//! Name: data/file01.parquet, size: 112832
207//! Name: data/file02.parquet, size: 143119
208//! Name: data/child/file03.parquet, size: 100
209//! ...
210//! ```
211//!
212//! # Fetch objects
213//!
214//! Use the [`ObjectStoreExt::get`] / [`ObjectStore::get_opts`] method to fetch the data bytes
215//! from remote storage or files in the local filesystem as a stream.
216//!
217//! ```ignore-wasm32
218//! # use futures_util::TryStreamExt;
219//! # use object_store::local::LocalFileSystem;
220//! # use std::sync::Arc;
221//! #  use bytes::Bytes;
222//! # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetResult};
223//! # fn get_object_store() -> Arc<dyn ObjectStore> {
224//! #   Arc::new(LocalFileSystem::new())
225//! # }
226//! #
227//! # async fn example() {
228//! #
229//! // Create an ObjectStore
230//! let object_store: Arc<dyn ObjectStore> = get_object_store();
231//!
232//! // Retrieve a specific file
233//! let path = Path::from("data/file01.parquet");
234//!
235//! // Fetch just the file metadata
236//! let meta = object_store.head(&path).await.unwrap();
237//! println!("{meta:?}");
238//!
239//! // Fetch the object including metadata
240//! let result: GetResult = object_store.get(&path).await.unwrap();
241//! assert_eq!(result.meta, meta);
242//!
243//! // Buffer the entire object in memory
244//! let object: Bytes = result.bytes().await.unwrap();
245//! assert_eq!(object.len() as u64, meta.size);
246//!
247//! // Alternatively stream the bytes from object storage
248//! let stream = object_store.get(&path).await.unwrap().into_stream();
249//!
250//! // Count the '0's using `try_fold` from `TryStreamExt` trait
251//! let num_zeros = stream
252//!     .try_fold(0, |acc, bytes| async move {
253//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
254//!     }).await.unwrap();
255//!
256//! println!("Num zeros in {} is {}", path, num_zeros);
257//! # }
258//! ```
259//!
260//! # Put Object
261//!
262//! Use the [`ObjectStoreExt::put`] method to atomically write data.
263//!
264//! ```ignore-wasm32
265//! # use object_store::local::LocalFileSystem;
266//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayload};
267//! # use std::sync::Arc;
268//! # use object_store::path::Path;
269//! # fn get_object_store() -> Arc<dyn ObjectStore> {
270//! #   Arc::new(LocalFileSystem::new())
271//! # }
272//! # async fn put() {
273//! #
274//! let object_store: Arc<dyn ObjectStore> = get_object_store();
275//! let path = Path::from("data/file1");
276//! let payload = PutPayload::from_static(b"hello");
277//! object_store.put(&path, payload).await.unwrap();
278//! # }
279//! ```
280//!
281//! # Multipart Upload
282//!
283//! Use the [`ObjectStoreExt::put_multipart`] / [`ObjectStore::put_multipart_opts`] method to atomically write a large
284//! amount of data
285//!
286//! ```ignore-wasm32
287//! # use object_store::local::LocalFileSystem;
288//! # use object_store::{ObjectStore, ObjectStoreExt, WriteMultipart};
289//! # use std::sync::Arc;
290//! # use bytes::Bytes;
291//! # use tokio::io::AsyncWriteExt;
292//! # use object_store::path::Path;
293//! # fn get_object_store() -> Arc<dyn ObjectStore> {
294//! #   Arc::new(LocalFileSystem::new())
295//! # }
296//! # async fn multi_upload() {
297//! #
298//! let object_store: Arc<dyn ObjectStore> = get_object_store();
299//! let path = Path::from("data/large_file");
300//! let upload =  object_store.put_multipart(&path).await.unwrap();
301//! let mut write = WriteMultipart::new(upload);
302//! write.write(b"hello");
303//! write.finish().await.unwrap();
304//! # }
305//! ```
306//!
307//! # Vectored Read
308//!
309//! A common pattern, especially when reading structured datasets, is to need to fetch
310//! multiple, potentially non-contiguous, ranges of a particular object.
311//!
312//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
313//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
314//!
315//! ```ignore-wasm32
316//! # use object_store::local::LocalFileSystem;
317//! # use object_store::ObjectStore;
318//! # use std::sync::Arc;
319//! # use bytes::Bytes;
320//! # use tokio::io::AsyncWriteExt;
321//! # use object_store::path::Path;
322//! # fn get_object_store() -> Arc<dyn ObjectStore> {
323//! #   Arc::new(LocalFileSystem::new())
324//! # }
325//! # async fn multi_upload() {
326//! #
327//! let object_store: Arc<dyn ObjectStore> = get_object_store();
328//! let path = Path::from("data/large_file");
329//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
330//! assert_eq!(ranges.len(), 3);
331//! assert_eq!(ranges[0].len(), 10);
332//! # }
333//! ```
334//!
335//! To retrieve ranges from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
336//!
337//! ```ignore-wasm32
338//! # use object_store::local::LocalFileSystem;
339//! # use object_store::ObjectStore;
340//! # use object_store::GetOptions;
341//! # use std::sync::Arc;
342//! # use bytes::Bytes;
343//! # use tokio::io::AsyncWriteExt;
344//! # use object_store::path::Path;
345//! # fn get_object_store() -> Arc<dyn ObjectStore> {
346//! #   Arc::new(LocalFileSystem::new())
347//! # }
348//! # async fn get_range_with_options() {
349//! #
350//! let object_store: Arc<dyn ObjectStore> = get_object_store();
351//! let path = Path::from("data/large_file");
352//! let ranges = vec![90..100, 400..600, 0..10];
353//! for range in ranges {
354//!     let opts = GetOptions::default().with_range(Some(range));
355//!     let data = object_store.get_opts(&path, opts).await.unwrap();
356//!     // Do something with the data
357//! }
358//! # }
359//! ```
360//!
361//! # Vectored Write
362//!
363//! When writing data it is often the case that the size of the output is not known ahead of time.
364//!
365//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
366//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
367//! this is suboptimal as reallocating memory will often involve copying it to a new location.
368//!
369//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
370//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
371//! encapsulates this approach
372//!
373//! ```ignore-wasm32
374//! # use object_store::local::LocalFileSystem;
375//! # use object_store::{ObjectStore, ObjectStoreExt, PutPayloadMut};
376//! # use std::sync::Arc;
377//! # use bytes::Bytes;
378//! # use tokio::io::AsyncWriteExt;
379//! # use object_store::path::Path;
380//! # fn get_object_store() -> Arc<dyn ObjectStore> {
381//! #   Arc::new(LocalFileSystem::new())
382//! # }
383//! # async fn multi_upload() {
384//! #
385//! let object_store: Arc<dyn ObjectStore> = get_object_store();
386//! let path = Path::from("data/large_file");
387//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
388//! for _ in 0..22 {
389//!     buffer.extend_from_slice(&[0; 1024]);
390//! }
391//! let payload = buffer.freeze();
392//!
393//! // Payload consists of 3 separate 8KB allocations
394//! assert_eq!(payload.as_ref().len(), 3);
395//! assert_eq!(payload.as_ref()[0].len(), 8192);
396//! assert_eq!(payload.as_ref()[1].len(), 8192);
397//! assert_eq!(payload.as_ref()[2].len(), 6144);
398//!
399//! object_store.put(&path, payload).await.unwrap();
400//! # }
401//! ```
402//!
403//! # Conditional Fetch
404//!
405//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
406//!
407//! For example, efficiently refreshing a cache without re-fetching the entire object
408//! data if the object hasn't been modified.
409//!
410//! ```
411//! # use std::collections::btree_map::Entry;
412//! # use std::collections::HashMap;
413//! # use object_store::{GetOptions, GetResult, ObjectStore, ObjectStoreExt, Result, Error};
414//! # use std::sync::Arc;
415//! # use std::time::{Duration, Instant};
416//! # use bytes::Bytes;
417//! # use tokio::io::AsyncWriteExt;
418//! # use object_store::path::Path;
419//! struct CacheEntry {
420//!     /// Data returned by last request
421//!     data: Bytes,
422//!     /// ETag identifying the object returned by the server
423//!     e_tag: String,
424//!     /// Instant of last refresh
425//!     refreshed_at: Instant,
426//! }
427//!
428//! /// Example cache that checks entries after 10 seconds for a new version
429//! struct Cache {
430//!     entries: HashMap<Path, CacheEntry>,
431//!     store: Arc<dyn ObjectStore>,
432//! }
433//!
434//! impl Cache {
435//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
436//!         Ok(match self.entries.get_mut(path) {
437//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
438//!                 true => e.data.clone(), // Return cached data
439//!                 false => { // Check if remote version has changed
440//!                     let opts = GetOptions::new().with_if_none_match(Some(e.e_tag.clone()));
441//!                     match self.store.get_opts(&path, opts).await {
442//!                         Ok(d) => e.data = d.bytes().await?,
443//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
444//!                         Err(e) => return Err(e),
445//!                     };
446//!                     e.refreshed_at = Instant::now();
447//!                     e.data.clone()
448//!                 }
449//!             },
450//!             None => { // Not cached, fetch data
451//!                 let get = self.store.get(&path).await?;
452//!                 let e_tag = get.meta.e_tag.clone();
453//!                 let data = get.bytes().await?;
454//!                 if let Some(e_tag) = e_tag {
455//!                     let entry = CacheEntry {
456//!                         e_tag,
457//!                         data: data.clone(),
458//!                         refreshed_at: Instant::now(),
459//!                     };
460//!                     self.entries.insert(path.clone(), entry);
461//!                 }
462//!                 data
463//!             }
464//!         })
465//!     }
466//! }
467//! ```
468//!
469//! # Conditional Put
470//!
471//! The default behaviour when writing data is to upsert any existing object at the given path,
472//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], and
473//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
474//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
475//! storage, without relying on a separate DBMS.
476//!
477//! ```
478//! # use object_store::{Error, ObjectStore, ObjectStoreExt, PutMode, UpdateVersion};
479//! # use std::sync::Arc;
480//! # use bytes::Bytes;
481//! # use tokio::io::AsyncWriteExt;
482//! # use object_store::memory::InMemory;
483//! # use object_store::path::Path;
484//! # fn get_object_store() -> Arc<dyn ObjectStore> {
485//! #   Arc::new(InMemory::new())
486//! # }
487//! # fn do_update(b: Bytes) -> Bytes {b}
488//! # async fn conditional_put() {
489//! let store = get_object_store();
490//! let path = Path::from("test");
491//!
492//! // Perform a conditional update on path
493//! loop {
494//!     // Perform get request
495//!     let r = store.get(&path).await.unwrap();
496//!
497//!     // Save version information fetched
498//!     let version = UpdateVersion {
499//!         e_tag: r.meta.e_tag.clone(),
500//!         version: r.meta.version.clone(),
501//!     };
502//!
503//!     // Compute new version of object contents
504//!     let new = do_update(r.bytes().await.unwrap());
505//!
506//!     // Attempt to commit transaction
507//!     match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
508//!         Ok(_) => break, // Successfully committed
509//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
510//!         Err(e) => panic!("{e}")
511//!     }
512//! }
513//! # }
514//! ```
515//!
516//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
517//! [Apache Iceberg]: https://iceberg.apache.org/
518//! [Delta Lake]: https://delta.io/
519//!
520//! # TLS Certificates
521//!
522//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA]
523//! certificates. By default the system-bundled certificates are used (see
524//! [`rustls-native-certs`]). The `tls-webpki-roots` feature switch can be used to also bundle Mozilla's
525//! root certificates with the library/application (see [`webpki-roots`]).
526//!
527//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
528//! [`rustls-native-certs`]: https://crates.io/crates/rustls-native-certs/
529//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
530//!
531//! # Customizing HTTP Clients
532//!
533//! Many [`ObjectStore`] implementations permit customization of the HTTP client via
534//! the [`HttpConnector`] trait and utilities in the [`client`] module.
535//! Examples include injecting custom HTTP headers or using an alternate
536//! tokio Runtime for I/O requests.
537//!
538//! [`HttpConnector`]: client::HttpConnector
539
540#[cfg(feature = "aws")]
541pub mod aws;
542#[cfg(feature = "azure")]
543pub mod azure;
544#[cfg(feature = "tokio")]
545pub mod buffered;
546#[cfg(not(target_arch = "wasm32"))]
547pub mod chunked;
548pub mod delimited;
549#[cfg(feature = "gcp")]
550pub mod gcp;
551#[cfg(feature = "http")]
552pub mod http;
553#[cfg(feature = "tokio")]
554pub mod limit;
555#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
556pub mod local;
557pub mod memory;
558pub mod path;
559pub mod prefix;
560pub mod registry;
561#[cfg(feature = "cloud")]
562pub mod signer;
563#[cfg(feature = "tokio")]
564pub mod throttle;
565
566#[cfg(feature = "cloud")]
567pub mod client;
568
569#[cfg(feature = "cloud")]
570pub use client::{
571    ClientConfigKey, ClientOptions, CredentialProvider, StaticCredentialProvider,
572    backoff::BackoffConfig, retry::RetryConfig,
573};
574
575#[cfg(all(feature = "cloud", not(target_arch = "wasm32")))]
576pub use client::Certificate;
577
578#[cfg(feature = "cloud")]
579mod config;
580
581mod tags;
582
583pub use tags::TagSet;
584
585pub mod list;
586pub mod multipart;
587mod parse;
588mod payload;
589mod upload;
590mod util;
591
592mod attributes;
593
594#[cfg(any(feature = "integration", test))]
595pub mod integration;
596
597pub use attributes::*;
598
599pub use parse::{ObjectStoreScheme, parse_url, parse_url_opts};
600pub use payload::*;
601pub use upload::*;
602pub use util::{GetRange, OBJECT_STORE_COALESCE_DEFAULT, coalesce_ranges, collect_bytes};
603
604// Re-export HTTP types used in public API
605pub use ::http::{Extensions, HeaderMap, HeaderValue};
606
607use crate::path::Path;
608#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
609use crate::util::maybe_spawn_blocking;
610use async_trait::async_trait;
611use bytes::Bytes;
612use chrono::{DateTime, Utc};
613use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
614use std::fmt::{Debug, Formatter};
615use std::ops::Range;
616use std::sync::Arc;
617
/// An alias for a dynamically dispatched object store implementation.
///
/// `dyn ObjectStore` is unsized, so this alias is typically used behind a
/// pointer, e.g. `Arc<DynObjectStore>` or `Box<DynObjectStore>` (as returned
/// by `parse_url` / `parse_url_opts`).
pub type DynObjectStore = dyn ObjectStore;
620
/// Id type for multipart uploads.
///
/// Identifies an in-progress multipart upload; represented as a plain
/// `String` since stores return opaque, store-specific upload identifiers.
pub type MultipartId = String;
623
624/// Universal API for object store services.
625///
626/// See the [module-level documentation](crate) for a high level overview and
627/// examples. See [`ObjectStoreExt`] for additional convenience methods.
628///
629/// # Contract
630/// This trait is a contract between object store _implementations_
631/// (e.g. providers, wrappers) and the `object_store` crate itself. It is
632/// intended to be the minimum API required for an object store.
633///
634/// The [`ObjectStoreExt`] acts as an API/contract between `object_store` and
635/// the store _users_ and provides additional methods that may be simpler to use
636/// but overlap in functionality with [`ObjectStore`].
637///
638/// # Clone
639/// If a store implements [`Clone`], that will only clone the handle to the underlying data. It will NOT clone/fork the
640/// actual key-value data. Hence, the cloned instance and the original instance share the same state.
641///
642/// # Minimal Default Implementations
643/// There are only a few default implementations for methods in this trait by
644/// design. This was different from versions prior to `0.13.0`, which had many
645/// more default implementations. Default implementations are convenient for
646/// users, but error-prone for implementors as they require keeping the
647/// convenience APIs correctly in sync.
648///
649/// As of version 0.13.0, most methods on [`ObjectStore`] must be implemented, and
650/// the convenience methods have been moved to the [`ObjectStoreExt`] trait as
651/// described above. See [#385] for more details.
652///
653/// [#385]: https://github.com/apache/arrow-rs-object-store/issues/385
654///
655/// # Wrappers
656/// If you wrap an [`ObjectStore`] -- e.g. to add observability -- you SHOULD
657/// implement all trait methods. This ensures that default implementations
658/// that are overwritten by the wrapped store are also used by the wrapper.
659/// For example:
660///
661/// ```ignore
662/// struct MyStore {
663///     ...
664/// }
665///
666/// #[async_trait]
667/// impl ObjectStore for MyStore {
668///     // implement custom ranges handling
669///     async fn get_ranges(
670///         &self,
671///         location: &Path,
672///         ranges: &[Range<u64>],
673///     ) -> Result<Vec<Bytes>> {
674///         ...
675///     }
676///
677///     ...
678/// }
679///
680/// struct Wrapper {
681///     inner: Arc<dyn ObjectStore>,
682/// }
683///
684/// #[async_trait]
685/// #[deny(clippy::missing_trait_methods)]
686/// impl ObjectStore for Wrapper {
687///     // If we would not implement this method,
688///     // we would get the trait default and not
689///     // use the actual implementation of `inner`.
690///     async fn get_ranges(
691///         &self,
692///         location: &Path,
693///         ranges: &[Range<u64>],
694///     ) -> Result<Vec<Bytes>> {
695///         ...
696///     }
697///
698///     ...
699/// }
700/// ```
701///
702/// To automatically detect this issue, use
703/// [`#[deny(clippy::missing_trait_methods)]`](https://rust-lang.github.io/rust-clippy/master/index.html#missing_trait_methods).
704///
705/// # Upgrade Guide for 0.13.0
706///
707/// Upgrading to object_store 0.13.0 from an earlier version typically involves:
708///
709/// 1. Add a `use` for [`ObjectStoreExt`] to solve the error
710///
711/// ```text
712/// error[E0599]: no method named `put` found for reference `&dyn object_store::ObjectStore` in the current scope
713///    --> datafusion/datasource/src/url.rs:993:14
714/// ```
715///
716/// 2. Remove any (now) redundant implementations (such as `ObjectStore::put`) from any
717///   `ObjectStore` implementations to resolve the error
718///
719/// ```text
720/// error[E0407]: method `put` is not a member of trait `ObjectStore`
721///     --> datafusion/datasource/src/url.rs:1103:9
722///      |
723/// ```
724///
725/// 3. Convert `ObjectStore::delete` to [`ObjectStore::delete_stream`] (see documentation
726///    on that method for details and examples)
727///
728/// 4. Combine `ObjectStore::copy` and `ObjectStore::copy_if_not_exists` implementations into
729///    [`ObjectStore::copy_opts`] (see documentation on that method for details and examples)
730///
731/// 5. Update `object_store::Error::NotImplemented` to include the name of the missing method
732///
733/// For example, change instances of
734/// ```text
735/// object_store::Error::NotImplemented
736/// ```
737/// to
738/// ```
739/// object_store::Error::NotImplemented {
740///    operation: "put".to_string(),
741///    implementer: "RequestCountingObjectStore".to_string(),
742///  };
743/// ```
744///
745#[async_trait]
746pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
747    /// Save the provided `payload` to `location` with the given options
748    ///
749    /// The operation is guaranteed to be atomic, it will either successfully
750    /// write the entirety of `payload` to `location`, or fail. No clients
751    /// should be able to observe a partially written object
752    async fn put_opts(
753        &self,
754        location: &Path,
755        payload: PutPayload,
756        opts: PutOptions,
757    ) -> Result<PutResult>;
758
759    /// Perform a multipart upload with options
760    ///
761    /// Client should prefer [`ObjectStore::put_opts`] for small payloads, as streaming uploads
762    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
763    ///
764    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
765    async fn put_multipart_opts(
766        &self,
767        location: &Path,
768        opts: PutMultipartOptions,
769    ) -> Result<Box<dyn MultipartUpload>>;
770
771    /// Perform a get request with options
772    ///
773    /// ## Example
774    ///
    /// This example uses a basic local filesystem object store to get an object with a specific etag.
    /// On the local filesystem, supplying an invalid etag will error.
    /// Versioned object stores will return the specified object version, if it exists.
    ///
    /// ```ignore-wasm32
    /// # use object_store::local::LocalFileSystem;
    /// # use tempfile::tempdir;
    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
    /// async fn get_opts_example() {
    ///     let tmp = tempdir().unwrap();
    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    ///     let location = Path::from("example.txt");
    ///     let content = b"Hello, Object Store!";
    ///
    ///     // Put the object into the store
    ///     store
    ///         .put(&location, content.as_ref().into())
    ///         .await
    ///         .expect("Failed to put object");
    ///
    ///     // Get the object from the store to figure out the right etag
    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
    ///
    ///     let etag = result.meta.e_tag.expect("ETag should be present");
    ///
    ///     // Get the object from the store with etag
    ///     let bytes = store
    ///         .get_opts(
    ///             &location,
    ///             GetOptions::new()
    ///                 .with_if_match(Some(etag.clone())),
    ///         )
    ///         .await
    ///         .expect("Failed to get object with etag")
    ///         .bytes()
    ///         .await
    ///         .expect("Failed to read bytes");
    ///
    ///     println!(
    ///         "Retrieved with ETag {}: {}",
    ///         etag,
    ///         String::from_utf8_lossy(&bytes)
    ///     );
    ///
    ///     // Show that if the etag does not match, we get an error
    ///     let wrong_etag = "wrong-etag".to_string();
    ///     match store
    ///         .get_opts(
    ///             &location,
    ///             GetOptions::new().with_if_match(Some(wrong_etag))
    ///         )
    ///         .await
    ///     {
    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
    ///     }
    /// }
    /// ```
    ///
    /// To retrieve a range of bytes from a versioned object, specify the range in the [`GetOptions`] supplied to this method.
    ///
    /// ```ignore-wasm32
    /// # use object_store::local::LocalFileSystem;
    /// # use tempfile::tempdir;
    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt, GetOptions};
    /// async fn get_opts_range_example() {
    ///     let tmp = tempdir().unwrap();
    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    ///     let location = Path::from("example.txt");
    ///     let content = b"Hello, Object Store!";
    ///
    ///     // Put the object into the store
    ///     store
    ///         .put(&location, content.as_ref().into())
    ///         .await
    ///         .expect("Failed to put object");
    ///
    ///     // Get the object from the store to figure out the right etag
    ///     let result: object_store::GetResult = store.get(&location).await.expect("Failed to get object");
    ///
    ///     let etag = result.meta.e_tag.expect("ETag should be present");
    ///
    ///     // Get the object from the store with range and etag
    ///     let bytes = store
    ///         .get_opts(
    ///             &location,
    ///             GetOptions::new()
    ///                 .with_range(Some(0..5))
    ///                 .with_if_match(Some(etag.clone())),
    ///         )
    ///         .await
    ///         .expect("Failed to get object with range and etag")
    ///         .bytes()
    ///         .await
    ///         .expect("Failed to read bytes");
    ///
    ///     println!(
    ///         "Retrieved range [0-5] with ETag {}: {}",
    ///         etag,
    ///         String::from_utf8_lossy(&bytes)
    ///     );
    ///
    ///     // Show that if the etag does not match, we get an error
    ///     let wrong_etag = "wrong-etag".to_string();
    ///     match store
    ///         .get_opts(
    ///             &location,
    ///             GetOptions::new().with_range(Some(0..5)).with_if_match(Some(wrong_etag))
    ///         )
    ///         .await
    ///     {
    ///         Ok(_) => println!("Unexpectedly succeeded with wrong ETag"),
    ///         Err(e) => println!("On a non-versioned object store, getting an invalid ETag ('wrong-etag') results in an error as expected: {}", e),
    ///     }
    /// }
    /// ```
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
892
893    /// Return the bytes that are stored at the specified location
894    /// in the given byte ranges
895    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
896        coalesce_ranges(
897            ranges,
898            |range| self.get_range(location, range),
899            OBJECT_STORE_COALESCE_DEFAULT,
900        )
901        .await
902    }
903
    /// Delete all the objects at the specified locations
    ///
    /// When supported, this method will use bulk operations that delete more
    /// than one object per request. Otherwise, the implementation may call
    /// the single object delete method for each location.
    ///
    /// # Bulk Delete Support
    ///
    /// The following backends support native bulk delete operations:
    ///
    /// - **AWS (S3)**: Uses the native [DeleteObjects] API with batches of up to 1000 objects
    /// - **Azure**: Uses the native [Blob Batch] API with batches of up to 256 objects
    ///
    /// The following backends use concurrent individual delete operations:
    ///
    /// - **GCP**: Performs individual delete requests with up to 10 concurrent operations
    /// - **HTTP**: Performs individual delete requests with up to 10 concurrent operations
    /// - **Local**: Performs individual file deletions with up to 10 concurrent operations
    /// - **Memory**: Performs individual in-memory deletions sequentially
    ///
    /// [DeleteObjects]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
    /// [Blob Batch]: https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch
    ///
    /// The returned stream yields the results of the delete operations in the
    /// same order as the input locations. However, some errors will be from
    /// an overall call to a bulk delete operation, and not from a specific
    /// location.
    ///
    /// If the object did not exist, the result may be an error or a success,
    /// depending on the behavior of the underlying store. For example, local
    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
    /// return Ok. If it is an error, it will be [`Error::NotFound`].
    ///
    /// ```ignore-wasm32
    /// # use futures_util::{StreamExt, TryStreamExt};
    /// # use object_store::local::LocalFileSystem;
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let root = tempfile::TempDir::new().unwrap();
    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    /// # use object_store::{ObjectStore, ObjectStoreExt, ObjectMeta};
    /// # use object_store::path::Path;
    /// # use futures_util::{StreamExt, TryStreamExt};
    /// #
    /// // Create two objects
    /// store.put(&Path::from("foo"), "foo".into()).await?;
    /// store.put(&Path::from("bar"), "bar".into()).await?;
    ///
    /// // List object
    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
    ///
    /// // Delete them
    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
    /// # Ok(())
    /// # }
    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// # rt.block_on(example()).unwrap();
    /// ```
    ///
    /// Note: Before version 0.13, `delete_stream` had a default implementation
    /// that deleted each object with up to 10 concurrent requests. This default
    /// behavior has been removed, and each implementation must now provide its
    /// own `delete_stream` implementation explicitly. The following example
    /// shows how to implement `delete_stream` to get the previous default
    /// behavior.
    ///
    /// ```
    /// # use async_trait::async_trait;
    /// # use futures_util::stream::{BoxStream, StreamExt};
    /// # use object_store::path::Path;
    /// # use object_store::{
    /// #     CopyOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
    /// #     PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
    /// # };
    /// # use std::fmt;
    /// # use std::fmt::Debug;
    /// # use std::sync::Arc;
    /// #
    /// # struct ExampleClient;
    /// #
    /// # impl ExampleClient {
    /// #     async fn delete(&self, _path: &Path) -> Result<()> {
    /// #         Ok(())
    /// #     }
    /// # }
    /// #
    /// # struct ExampleStore {
    /// #     client: Arc<ExampleClient>,
    /// # }
    /// #
    /// # impl Debug for ExampleStore {
    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// #         write!(f, "ExampleStore")
    /// #     }
    /// # }
    /// #
    /// # impl fmt::Display for ExampleStore {
    /// #     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    /// #         write!(f, "ExampleStore")
    /// #     }
    /// # }
    /// #
    /// # #[async_trait]
    /// # impl ObjectStore for ExampleStore {
    /// #     async fn put_opts(&self, _: &Path, _: PutPayload, _: PutOptions) -> Result<PutResult> {
    /// #         todo!()
    /// #     }
    /// #
    /// #     async fn put_multipart_opts(
    /// #         &self,
    /// #         _: &Path,
    /// #         _: PutMultipartOptions,
    /// #     ) -> Result<Box<dyn MultipartUpload>> {
    /// #         todo!()
    /// #     }
    /// #
    /// #     async fn get_opts(&self, _: &Path, _: GetOptions) -> Result<GetResult> {
    /// #         todo!()
    /// #     }
    /// #
    /// fn delete_stream(
    ///     &self,
    ///     locations: BoxStream<'static, Result<Path>>,
    /// ) -> BoxStream<'static, Result<Path>> {
    ///     let client = Arc::clone(&self.client);
    ///     locations
    ///         .map(move |location| {
    ///             let client = Arc::clone(&client);
    ///             async move {
    ///                 let location = location?;
    ///                 client.delete(&location).await?;
    ///                 Ok(location)
    ///             }
    ///         })
    ///         .buffered(10)
    ///         .boxed()
    /// }
    /// #
    /// #     fn list(&self, _: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
    /// #         todo!()
    /// #     }
    /// #
    /// #     async fn list_with_delimiter(&self, _: Option<&Path>) -> Result<ListResult> {
    /// #         todo!()
    /// #     }
    /// #
    /// #     async fn copy_opts(&self, _: &Path, _: &Path, _: CopyOptions) -> Result<()> {
    /// #         todo!()
    /// #     }
    /// # }
    /// #
    /// # async fn example() {
    /// #     let store = ExampleStore { client: Arc::new(ExampleClient) };
    /// #     let paths = futures_util::stream::iter(vec![Ok(Path::from("foo")), Ok(Path::from("bar"))]).boxed();
    /// #     let results = store.delete_stream(paths).collect::<Vec<_>>().await;
    /// #     assert_eq!(results.len(), 2);
    /// #     assert_eq!(results[0].as_ref().unwrap(), &Path::from("foo"));
    /// #     assert_eq!(results[1].as_ref().unwrap(), &Path::from("bar"));
    /// # }
    /// #
    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
    /// # rt.block_on(example());
    /// ```
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>>;
1070
    /// List all the objects with the given prefix.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
    ///
    /// For a non-recursive, single-level listing see [`ObjectStore::list_with_delimiter`]
    ///
    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
    ///
    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
1080
1081    /// List all the objects with the given prefix and a location greater than `offset`
1082    ///
1083    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
1084    /// the number of network requests required.
1085    ///
1086    /// This returns an exclusive offset, i.e. objects at exactly `offset` will not be included.
1087    ///
1088    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
1089    ///
1090    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
1091    fn list_with_offset(
1092        &self,
1093        prefix: Option<&Path>,
1094        offset: &Path,
1095    ) -> BoxStream<'static, Result<ObjectMeta>> {
1096        let offset = offset.clone();
1097        self.list(prefix)
1098            .try_filter(move |f| futures_util::future::ready(f.location > offset))
1099            .boxed()
1100    }
1101
    /// List objects with the given prefix and an implementation specific
    /// delimiter. Returns common prefixes (directories) in addition to object
    /// metadata.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
    ///
    /// For a recursive listing see [`ObjectStore::list`]
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
1109
    /// Copy an object from one path to another in the same object store.
    ///
    /// The behavior when an object already exists at `to` is controlled by the
    /// `mode` of the supplied [`CopyOptions`] (e.g. `CopyMode::Overwrite` vs
    /// `CopyMode::Create`).
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()>;
1112
1113    /// Move an object from one path to another in the same object store.
1114    ///
1115    /// By default, this is implemented as a copy and then delete source. It may not
1116    /// check when deleting source that it was the same object that was originally copied.
1117    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
1118        let RenameOptions {
1119            target_mode,
1120            extensions,
1121        } = options;
1122        let copy_mode = match target_mode {
1123            RenameTargetMode::Overwrite => CopyMode::Overwrite,
1124            RenameTargetMode::Create => CopyMode::Create,
1125        };
1126        let copy_options = CopyOptions {
1127            mode: copy_mode,
1128            extensions,
1129        };
1130        self.copy_opts(from, to, copy_options).await?;
1131        self.delete(from).await?;
1132        Ok(())
1133    }
1134}
1135
/// Implements [`ObjectStore`] for a pointer-like wrapper type (`Arc<T>`,
/// `Box<T>`) by delegating every trait method to the inner store via
/// `as_ref()`.
///
/// `#[deny(clippy::missing_trait_methods)]` keeps the forwarding exhaustive:
/// adding a new trait method without a matching delegation here becomes a
/// compile error, so a wrapper can never silently fall back to a default
/// implementation that the wrapped store may have overridden.
macro_rules! as_ref_impl {
    ($type:ty) => {
        #[async_trait]
        #[deny(clippy::missing_trait_methods)]
        impl<T: ObjectStore + ?Sized> ObjectStore for $type {
            async fn put_opts(
                &self,
                location: &Path,
                payload: PutPayload,
                opts: PutOptions,
            ) -> Result<PutResult> {
                self.as_ref().put_opts(location, payload, opts).await
            }

            async fn put_multipart_opts(
                &self,
                location: &Path,
                opts: PutMultipartOptions,
            ) -> Result<Box<dyn MultipartUpload>> {
                self.as_ref().put_multipart_opts(location, opts).await
            }

            async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
                self.as_ref().get_opts(location, options).await
            }

            async fn get_ranges(
                &self,
                location: &Path,
                ranges: &[Range<u64>],
            ) -> Result<Vec<Bytes>> {
                self.as_ref().get_ranges(location, ranges).await
            }

            fn delete_stream(
                &self,
                locations: BoxStream<'static, Result<Path>>,
            ) -> BoxStream<'static, Result<Path>> {
                self.as_ref().delete_stream(locations)
            }

            fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
                self.as_ref().list(prefix)
            }

            fn list_with_offset(
                &self,
                prefix: Option<&Path>,
                offset: &Path,
            ) -> BoxStream<'static, Result<ObjectMeta>> {
                self.as_ref().list_with_offset(prefix, offset)
            }

            async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
                self.as_ref().list_with_delimiter(prefix).await
            }

            async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
                self.as_ref().copy_opts(from, to, options).await
            }

            async fn rename_opts(
                &self,
                from: &Path,
                to: &Path,
                options: RenameOptions,
            ) -> Result<()> {
                self.as_ref().rename_opts(from, to, options).await
            }
        }
    };
}
1208
// Delegate the full `ObjectStore` API through common smart pointers, so an
// `Arc<dyn ObjectStore>` or `Box<dyn ObjectStore>` can be used wherever an
// `ObjectStore` is expected.
as_ref_impl!(Arc<T>);
as_ref_impl!(Box<T>);
1211
/// Extension trait for [`ObjectStore`] with convenience functions.
///
/// See the [module-level documentation](crate) for a high level overview and
/// examples. See "contract" section within the [`ObjectStore`] documentation
/// for more reasoning.
///
/// # Implementation
/// You MUST NOT implement this trait yourself. It is automatically implemented for all [`ObjectStore`] implementations.
pub trait ObjectStoreExt: ObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// The operation is guaranteed to be atomic, it will either successfully
    /// write the entirety of `payload` to `location`, or fail. No clients
    /// should be able to observe a partially written object
    fn put(&self, location: &Path, payload: PutPayload) -> impl Future<Output = Result<PutResult>>;

    /// Perform a multipart upload
    ///
    /// Clients should prefer [`ObjectStoreExt::put`] for small payloads, as streaming uploads
    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
    ///
    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
    fn put_multipart(
        &self,
        location: &Path,
    ) -> impl Future<Output = Result<Box<dyn MultipartUpload>>>;

    /// Return the bytes that are stored at the specified location.
    ///
    /// ## Example
    ///
    /// This example uses a basic local filesystem object store to get an object.
    ///
    /// ```ignore-wasm32
    /// # use object_store::local::LocalFileSystem;
    /// # use tempfile::tempdir;
    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
    /// async fn get_example() {
    ///     let tmp = tempdir().unwrap();
    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    ///     let location = Path::from("example.txt");
    ///     let content = b"Hello, Object Store!";
    ///
    ///     // Put the object into the store
    ///     store
    ///         .put(&location, content.as_ref().into())
    ///         .await
    ///         .expect("Failed to put object");
    ///
    ///     // Get the object from the store
    ///     let get_result = store.get(&location).await.expect("Failed to get object");
    ///     let bytes = get_result.bytes().await.expect("Failed to read bytes");
    ///     println!("Retrieved content: {}", String::from_utf8_lossy(&bytes));
    /// }
    /// ```
    fn get(&self, location: &Path) -> impl Future<Output = Result<GetResult>>;

    /// Return the bytes that are stored at the specified location
    /// in the given byte range.
    ///
    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted.
    ///
    /// To retrieve a range of bytes from a versioned object, use [`ObjectStore::get_opts`] by specifying the range in the [`GetOptions`].
    ///
    /// ## Examples
    ///
    /// This example uses a basic local filesystem object store to get a byte range from an object.
    ///
    /// ```ignore-wasm32
    /// # use object_store::local::LocalFileSystem;
    /// # use tempfile::tempdir;
    /// # use object_store::{path::Path, ObjectStore, ObjectStoreExt};
    /// async fn get_range_example() {
    ///     let tmp = tempdir().unwrap();
    ///     let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    ///     let location = Path::from("example.txt");
    ///     let content = b"Hello, Object Store!";
    ///
    ///     // Put the object into the store
    ///     store
    ///         .put(&location, content.as_ref().into())
    ///         .await
    ///         .expect("Failed to put object");
    ///
    ///     // Get the object from the store
    ///     let bytes = store
    ///         .get_range(&location, 0..5)
    ///         .await
    ///         .expect("Failed to get object");
    ///     println!("Retrieved range [0-5]: {}", String::from_utf8_lossy(&bytes));
    /// }
    /// ```
    fn get_range(&self, location: &Path, range: Range<u64>) -> impl Future<Output = Result<Bytes>>;

    /// Return the metadata for the specified location
    fn head(&self, location: &Path) -> impl Future<Output = Result<ObjectMeta>>;

    /// Delete the object at the specified location.
    fn delete(&self, location: &Path) -> impl Future<Output = Result<()>>;

    /// Copy an object from one path to another in the same object store.
    ///
    /// If there exists an object at the destination, it will be overwritten.
    fn copy(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;

    /// Copy an object from one path to another, only if destination is empty.
    ///
    /// Will return an error if the destination already has an object.
    ///
    /// Performs an atomic operation if the underlying object storage supports it.
    /// If atomic operations are not supported by the underlying object storage (like S3)
    /// it will return an error.
    fn copy_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;

    /// Move an object from one path to another in the same object store.
    ///
    /// By default, this is implemented as a copy and then delete source. It may not
    /// check when deleting source that it was the same object that was originally copied.
    ///
    /// If there exists an object at the destination, it will be overwritten.
    fn rename(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;

    /// Move an object from one path to another in the same object store.
    ///
    /// Will return an error if the destination already has an object.
    fn rename_if_not_exists(&self, from: &Path, to: &Path) -> impl Future<Output = Result<()>>;
}
1339
1340impl<T> ObjectStoreExt for T
1341where
1342    T: ObjectStore + ?Sized,
1343{
1344    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
1345        self.put_opts(location, payload, PutOptions::default())
1346            .await
1347    }
1348
1349    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
1350        self.put_multipart_opts(location, PutMultipartOptions::default())
1351            .await
1352    }
1353
1354    async fn get(&self, location: &Path) -> Result<GetResult> {
1355        self.get_opts(location, GetOptions::default()).await
1356    }
1357
1358    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
1359        let options = GetOptions::new().with_range(Some(range));
1360        self.get_opts(location, options).await?.bytes().await
1361    }
1362
1363    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
1364        let options = GetOptions::new().with_head(true);
1365        Ok(self.get_opts(location, options).await?.meta)
1366    }
1367
1368    async fn delete(&self, location: &Path) -> Result<()> {
1369        let location = location.clone();
1370        let mut stream =
1371            self.delete_stream(futures_util::stream::once(async move { Ok(location) }).boxed());
1372        let _path = stream.try_next().await?.ok_or_else(|| Error::Generic {
1373            store: "ext",
1374            source: "`delete_stream` with one location should yield once but didn't".into(),
1375        })?;
1376        if stream.next().await.is_some() {
1377            Err(Error::Generic {
1378                store: "ext",
1379                source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(),
1380            })
1381        } else {
1382            Ok(())
1383        }
1384    }
1385
1386    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
1387        let options = CopyOptions::new().with_mode(CopyMode::Overwrite);
1388        self.copy_opts(from, to, options).await
1389    }
1390
1391    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1392        let options = CopyOptions::new().with_mode(CopyMode::Create);
1393        self.copy_opts(from, to, options).await
1394    }
1395
1396    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
1397        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Overwrite);
1398        self.rename_opts(from, to, options).await
1399    }
1400
1401    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
1402        let options = RenameOptions::new().with_target_mode(RenameTargetMode::Create);
1403        self.rename_opts(from, to, options).await
1404    }
1405}
1406
/// Result of a list call that includes objects and common prefixes
/// (directories). Individual result sets may be limited to
/// 1,000 objects based on the underlying object storage's limitations.
#[derive(Debug)]
pub struct ListResult {
    /// Prefixes that are common (like directories)
    pub common_prefixes: Vec<Path>,
    /// Object metadata for the listing
    pub objects: Vec<ObjectMeta>,
}
1417
/// The metadata that describes an object.
///
/// Returned by [`ObjectStoreExt::head`] and by the listing methods on
/// [`ObjectStore`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ObjectMeta {
    /// The full path to the object
    pub location: Path,
    /// The last modified time
    pub last_modified: DateTime<Utc>,
    /// The size in bytes of the object.
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub size: u64,
    /// The unique identifier for the object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for this object
    pub version: Option<String>,
}
1436
/// Options for a get request, such as range
#[derive(Debug, Default, Clone)]
pub struct GetOptions {
    /// Request will succeed if the `ObjectMeta::e_tag` matches
    /// otherwise returning [`Error::Precondition`]
    ///
    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
    ///
    /// Examples:
    ///
    /// ```text
    /// If-Match: "xyzzy"
    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
    /// If-Match: *
    /// ```
    pub if_match: Option<String>,
    /// Request will succeed if the `ObjectMeta::e_tag` does not match
    /// otherwise returning [`Error::NotModified`]
    ///
    /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
    ///
    /// Examples:
    ///
    /// ```text
    /// If-None-Match: "xyzzy"
    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
    /// If-None-Match: *
    /// ```
    pub if_none_match: Option<String>,
    /// Request will succeed if the object has been modified since
    /// otherwise returning [`Error::NotModified`]
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
    pub if_modified_since: Option<DateTime<Utc>>,
    /// Request will succeed if the object has not been modified since
    /// otherwise returning [`Error::Precondition`]
    ///
    /// Some stores, such as S3, will only return `NotModified` for exact
    /// timestamp matches, instead of for any timestamp greater than or equal.
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
    pub if_unmodified_since: Option<DateTime<Utc>>,
    /// Request transfer of only the specified range of bytes
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
    pub range: Option<GetRange>,
    /// Request a particular object version
    pub version: Option<String>,
    /// Request transfer of no content
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
    pub head: bool,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    pub extensions: Extensions,
}
1495
1496impl GetOptions {
1497    /// Returns an error if the modification conditions on this request are not satisfied
1498    ///
1499    /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
1500    pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
1501        // The use of the invalid etag "*" means no ETag is equivalent to never matching
1502        let etag = meta.e_tag.as_deref().unwrap_or("*");
1503        let last_modified = meta.last_modified;
1504
1505        if let Some(m) = &self.if_match {
1506            if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
1507                return Err(Error::Precondition {
1508                    path: meta.location.to_string(),
1509                    source: format!("{etag} does not match {m}").into(),
1510                });
1511            }
1512        } else if let Some(date) = self.if_unmodified_since {
1513            if last_modified > date {
1514                return Err(Error::Precondition {
1515                    path: meta.location.to_string(),
1516                    source: format!("{date} < {last_modified}").into(),
1517                });
1518            }
1519        }
1520
1521        if let Some(m) = &self.if_none_match {
1522            if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
1523                return Err(Error::NotModified {
1524                    path: meta.location.to_string(),
1525                    source: format!("{etag} matches {m}").into(),
1526                });
1527            }
1528        } else if let Some(date) = self.if_modified_since {
1529            if last_modified <= date {
1530                return Err(Error::NotModified {
1531                    path: meta.location.to_string(),
1532                    source: format!("{date} >= {last_modified}").into(),
1533                });
1534            }
1535        }
1536        Ok(())
1537    }
1538
1539    /// Create a new [`GetOptions`]
1540    pub fn new() -> Self {
1541        Self::default()
1542    }
1543
1544    /// Sets the `if_match` condition.
1545    ///
1546    /// See [`GetOptions::if_match`]
1547    #[must_use]
1548    pub fn with_if_match(mut self, etag: Option<impl Into<String>>) -> Self {
1549        self.if_match = etag.map(Into::into);
1550        self
1551    }
1552
1553    /// Sets the `if_none_match` condition.
1554    ///
1555    /// See [`GetOptions::if_none_match`]
1556    #[must_use]
1557    pub fn with_if_none_match(mut self, etag: Option<impl Into<String>>) -> Self {
1558        self.if_none_match = etag.map(Into::into);
1559        self
1560    }
1561
1562    /// Sets the `if_modified_since` condition.
1563    ///
1564    /// See [`GetOptions::if_modified_since`]
1565    #[must_use]
1566    pub fn with_if_modified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1567        self.if_modified_since = dt.map(Into::into);
1568        self
1569    }
1570
1571    /// Sets the `if_unmodified_since` condition.
1572    ///
1573    /// See [`GetOptions::if_unmodified_since`]
1574    #[must_use]
1575    pub fn with_if_unmodified_since(mut self, dt: Option<impl Into<DateTime<Utc>>>) -> Self {
1576        self.if_unmodified_since = dt.map(Into::into);
1577        self
1578    }
1579
1580    /// Sets the `range` condition.
1581    ///
1582    /// See [`GetOptions::range`]
1583    #[must_use]
1584    pub fn with_range(mut self, range: Option<impl Into<GetRange>>) -> Self {
1585        self.range = range.map(Into::into);
1586        self
1587    }
1588
1589    /// Sets the `version` condition.
1590    ///
1591    /// See [`GetOptions::version`]
1592    #[must_use]
1593    pub fn with_version(mut self, version: Option<impl Into<String>>) -> Self {
1594        self.version = version.map(Into::into);
1595        self
1596    }
1597
1598    /// Sets the `head` condition.
1599    ///
1600    /// See [`GetOptions::head`]
1601    #[must_use]
1602    pub fn with_head(mut self, head: impl Into<bool>) -> Self {
1603        self.head = head.into();
1604        self
1605    }
1606
1607    /// Sets the `extensions` condition.
1608    ///
1609    /// See [`GetOptions::extensions`]
1610    #[must_use]
1611    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1612        self.extensions = extensions;
1613        self
1614    }
1615}
1616
/// Result for a get request
///
/// Use [`GetResult::bytes`] to buffer the payload in memory, or
/// [`GetResult::into_stream`] to obtain a stream of bytes
#[derive(Debug)]
pub struct GetResult {
    /// The [`GetResultPayload`]
    pub payload: GetResultPayload,
    /// The [`ObjectMeta`] for this object
    pub meta: ObjectMeta,
    /// The range of bytes returned by this request
    ///
    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
    pub range: Range<u64>,
    /// Additional object attributes
    pub attributes: Attributes,
}
1631
/// The kind of a [`GetResult`]
///
/// This special cases the case of a local file, as some systems may
/// be able to optimise the case of a file already present on local disk
pub enum GetResultPayload {
    /// The file, path
    ///
    /// Only present when the `fs` feature is enabled on non-WASM targets
    #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
    File(std::fs::File, std::path::PathBuf),
    /// An opaque stream of bytes
    Stream(BoxStream<'static, Result<Bytes>>),
}
1643
1644impl Debug for GetResultPayload {
1645    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1646        match self {
1647            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1648            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1649            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1650        }
1651    }
1652}
1653
1654impl GetResult {
1655    /// Collects the data into a [`Bytes`]
1656    pub async fn bytes(self) -> Result<Bytes> {
1657        let len = self.range.end - self.range.start;
1658        match self.payload {
1659            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1660            GetResultPayload::File(mut file, path) => {
1661                maybe_spawn_blocking(move || {
1662                    use crate::local::read_range;
1663
1664                    let buffer = read_range(&mut file, &path, self.range)?;
1665
1666                    Ok(buffer)
1667                })
1668                .await
1669            }
1670            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1671        }
1672    }
1673
1674    /// Converts this into a byte stream
1675    ///
1676    /// If the `self.kind` is [`GetResultPayload::File`] will perform chunked reads of the file,
1677    /// otherwise will return the [`GetResultPayload::Stream`].
1678    ///
1679    /// # Tokio Compatibility
1680    ///
1681    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1682    /// no major operating systems have stable async file APIs. Therefore if called from
1683    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1684    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1685    ///
1686    /// If not called from a tokio context, this will perform IO on the current thread with
1687    /// no additional complexity or overheads
1688    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1689        match self.payload {
1690            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1691            GetResultPayload::File(file, path) => {
1692                const CHUNK_SIZE: usize = 8 * 1024;
1693                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1694            }
1695            GetResultPayload::Stream(s) => s,
1696        }
1697    }
1698}
1699
/// Configure preconditions for the put operation
///
/// Defaults to [`PutMode::Overwrite`]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum PutMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
    /// Perform an atomic write operation if the current version of the object matches the
    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
    Update(UpdateVersion),
}
1713
/// Uniquely identifies a version of an object to update
///
/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
/// updates, and it is therefore recommended applications preserve both
///
/// Can be obtained from a [`PutResult`] via `From`
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UpdateVersion {
    /// The unique identifier for the newly created object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
}
1727
1728impl From<PutResult> for UpdateVersion {
1729    fn from(value: PutResult) -> Self {
1730        Self {
1731            e_tag: value.e_tag,
1732            version: value.version,
1733        }
1734    }
1735}
1736
/// Options for a put request
///
/// Can be constructed from a [`PutMode`], [`TagSet`] or [`Attributes`] via `From`
#[derive(Debug, Clone, Default)]
pub struct PutOptions {
    /// Configure the [`PutMode`] for this operation
    pub mode: PutMode,
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1758
1759impl PartialEq<Self> for PutOptions {
1760    fn eq(&self, other: &Self) -> bool {
1761        let Self {
1762            mode,
1763            tags,
1764            attributes,
1765            extensions: _,
1766        } = self;
1767        let Self {
1768            mode: other_mode,
1769            tags: other_tags,
1770            attributes: other_attributes,
1771            extensions: _,
1772        } = other;
1773        (mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
1774    }
1775}
1776
1777impl Eq for PutOptions {}
1778
1779impl From<PutMode> for PutOptions {
1780    fn from(mode: PutMode) -> Self {
1781        Self {
1782            mode,
1783            ..Default::default()
1784        }
1785    }
1786}
1787
1788impl From<TagSet> for PutOptions {
1789    fn from(tags: TagSet) -> Self {
1790        Self {
1791            tags,
1792            ..Default::default()
1793        }
1794    }
1795}
1796
1797impl From<Attributes> for PutOptions {
1798    fn from(attributes: Attributes) -> Self {
1799        Self {
1800            attributes,
1801            ..Default::default()
1802        }
1803    }
1804}
1805
// Backwards-compatible alias retained so existing callers continue to compile.
// See <https://github.com/apache/arrow-rs-object-store/issues/339>.
#[doc(hidden)]
#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
pub type PutMultipartOpts = PutMultipartOptions;
1810
/// Options for [`ObjectStore::put_multipart_opts`]
///
/// Can be constructed from a [`TagSet`] or [`Attributes`] via `From`
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOptions {
    /// Provide a [`TagSet`] for this object
    ///
    /// Implementations that don't support object tagging should ignore this
    pub tags: TagSet,
    /// Provide a set of [`Attributes`]
    ///
    /// Implementations that don't support an attribute should return an error
    pub attributes: Attributes,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1830
1831impl PartialEq<Self> for PutMultipartOptions {
1832    fn eq(&self, other: &Self) -> bool {
1833        let Self {
1834            tags,
1835            attributes,
1836            extensions: _,
1837        } = self;
1838        let Self {
1839            tags: other_tags,
1840            attributes: other_attributes,
1841            extensions: _,
1842        } = other;
1843        (tags == other_tags) && (attributes == other_attributes)
1844    }
1845}
1846
1847impl Eq for PutMultipartOptions {}
1848
1849impl From<TagSet> for PutMultipartOptions {
1850    fn from(tags: TagSet) -> Self {
1851        Self {
1852            tags,
1853            ..Default::default()
1854        }
1855    }
1856}
1857
1858impl From<Attributes> for PutMultipartOptions {
1859    fn from(attributes: Attributes) -> Self {
1860        Self {
1861            attributes,
1862            ..Default::default()
1863        }
1864    }
1865}
1866
/// Result for a put request
///
/// Can be converted into an [`UpdateVersion`] for use with [`PutMode::Update`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PutResult {
    /// The unique identifier for the newly created object
    ///
    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
    pub e_tag: Option<String>,
    /// A version indicator for the newly created object
    pub version: Option<String>,
}
1877
/// Configure preconditions for the copy operation
///
/// Defaults to [`CopyMode::Overwrite`]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CopyMode {
    /// Perform an atomic write operation, overwriting any object present at the provided path
    #[default]
    Overwrite,
    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path
    Create,
}
1888
/// Options for a copy request
///
/// See [`CopyOptions::with_mode`] and [`CopyOptions::with_extensions`] for a builder-style API
#[derive(Debug, Clone, Default)]
pub struct CopyOptions {
    /// Configure the [`CopyMode`] for this operation
    pub mode: CopyMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1902
impl CopyOptions {
    /// Create a new [`CopyOptions`]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `mode`.
    ///
    /// See [`CopyOptions::mode`].
    #[must_use]
    pub fn with_mode(mut self, mode: CopyMode) -> Self {
        self.mode = mode;
        self
    }

    /// Sets the `extensions`.
    ///
    /// See [`CopyOptions::extensions`].
    #[must_use]
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }
}
1927
1928impl PartialEq<Self> for CopyOptions {
1929    fn eq(&self, other: &Self) -> bool {
1930        let Self {
1931            mode,
1932            extensions: _,
1933        } = self;
1934        let Self {
1935            mode: mode_other,
1936            extensions: _,
1937        } = other;
1938
1939        mode == mode_other
1940    }
1941}
1942
1943impl Eq for CopyOptions {}
1944
/// Configure preconditions for the target of rename operation.
///
/// Note though that the source location may or may not be deleted at the same time in an atomic
/// operation. There is currently NO flag to control the atomicity of "delete source at the same
/// time as creating the target".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RenameTargetMode {
    /// Perform a write operation on the target, overwriting any object present at the provided path.
    #[default]
    Overwrite,
    /// Perform an atomic write operation of the target, returning [`Error::AlreadyExists`] if an
    /// object already exists at the provided path.
    Create,
}
1958
/// Options for a rename request
///
/// See [`RenameOptions::with_target_mode`] and [`RenameOptions::with_extensions`] for a
/// builder-style API
#[derive(Debug, Clone, Default)]
pub struct RenameOptions {
    /// Configure the [`RenameTargetMode`] for this operation
    pub target_mode: RenameTargetMode,
    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
    /// that need to pass context-specific information (like tracing spans) via trait methods.
    ///
    /// These extensions are ignored entirely by backends offered through this crate.
    ///
    /// They are also excluded from [`PartialEq`] and [`Eq`].
    pub extensions: Extensions,
}
1972
impl RenameOptions {
    /// Create a new [`RenameOptions`]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the `target_mode`.
    ///
    /// See [`RenameOptions::target_mode`].
    #[must_use]
    pub fn with_target_mode(mut self, target_mode: RenameTargetMode) -> Self {
        self.target_mode = target_mode;
        self
    }

    /// Sets the `extensions`.
    ///
    /// See [`RenameOptions::extensions`].
    #[must_use]
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }
}
1997
1998impl PartialEq<Self> for RenameOptions {
1999    fn eq(&self, other: &Self) -> bool {
2000        let Self {
2001            target_mode,
2002            extensions: _,
2003        } = self;
2004        let Self {
2005            target_mode: target_mode_other,
2006            extensions: _,
2007        } = other;
2008
2009        target_mode == target_mode_other
2010    }
2011}
2012
2013impl Eq for RenameOptions {}
2014
/// A specialized `Result` for object store-related errors
///
/// The error type defaults to this crate's [`Error`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
2017
/// A specialized `Error` for object store-related errors
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum Error {
    /// A fallback error type when no variant matches
    #[error("Generic {} error: {}", store, source)]
    Generic {
        /// The store this error originated from
        store: &'static str,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object is not found at given location
    #[error("Object at location {} not found: {}", path, source)]
    NotFound {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error for invalid path
    #[error("Encountered object with invalid path: {}", source)]
    InvalidPath {
        /// The wrapped error
        #[from]
        source: path::Error,
    },

    /// Error when `tokio::spawn` failed
    #[cfg(feature = "tokio")]
    #[error("Error joining spawned task: {}", source)]
    JoinError {
        /// The wrapped error
        #[from]
        source: tokio::task::JoinError,
    },

    /// Error when the attempted operation is not supported
    #[error("Operation not supported: {}", source)]
    NotSupported {
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object already exists
    #[error("Object at location {} already exists: {}", path, source)]
    AlreadyExists {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the required conditions failed for the operation
    #[error("Request precondition failure for path {}: {}", path, source)]
    Precondition {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the object at the location isn't modified
    #[error("Object at location {} not modified: {}", path, source)]
    NotModified {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when an operation is not implemented
    #[error("Operation {operation} not yet implemented by {implementer}.")]
    NotImplemented {
        /// What isn't implemented. Should include at least the method
        /// name that was called; could also include other relevant
        /// subcontexts.
        operation: String,

        /// Which driver this is that hasn't implemented this operation,
        /// to aid debugging in contexts that may be using multiple implementations.
        implementer: String,
    },

    /// Error when the used credentials don't have enough permission
    /// to perform the requested operation
    #[error(
        "The operation lacked the necessary privileges to complete for path {}: {}",
        path,
        source
    )]
    PermissionDenied {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when the used credentials lack valid authentication
    #[error(
        "The operation lacked valid authentication credentials for path {}: {}",
        path,
        source
    )]
    Unauthenticated {
        /// The path to the file
        path: String,
        /// The wrapped error
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },

    /// Error when a configuration key is invalid for the store used
    #[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
    UnknownConfigurationKey {
        /// The object store used
        store: &'static str,
        /// The configuration key used
        key: String,
    },
}
2140
2141impl From<Error> for std::io::Error {
2142    fn from(e: Error) -> Self {
2143        let kind = match &e {
2144            Error::NotFound { .. } => std::io::ErrorKind::NotFound,
2145            _ => std::io::ErrorKind::Other,
2146        };
2147        Self::new(kind, e)
2148    }
2149}
2150
#[cfg(test)]
mod tests {
    use super::*;

    use chrono::TimeZone;

    /// Returns early from the enclosing test unless the `TEST_INTEGRATION`
    /// environment variable is set
    macro_rules! maybe_skip_integration {
        () => {
            if std::env::var("TEST_INTEGRATION").is_err() {
                eprintln!("Skipping integration test - set TEST_INTEGRATION");
                return;
            }
        };
    }
    pub(crate) use maybe_skip_integration;

    /// Test that the returned stream does not borrow the lifetime of Path
    fn list_store<'a>(
        store: &'a dyn ObjectStore,
        path_str: &str,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let path = Path::from(path_str);
        store.list(Some(&path))
    }

    /// Uploads an object, generates a signed GET URL for it, and verifies the
    /// URL serves back the original bytes
    #[cfg(any(feature = "azure", feature = "aws"))]
    pub(crate) async fn signing<T>(integration: &T)
    where
        T: ObjectStore + signer::Signer,
    {
        use reqwest::Method;
        use std::time::Duration;

        let data = Bytes::from("hello world");
        let path = Path::from("file.txt");
        integration.put(&path, data.clone().into()).await.unwrap();

        let signed = integration
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        let resp = reqwest::get(signed).await.unwrap();
        let loaded = resp.bytes().await.unwrap();

        assert_eq!(data, loaded);
    }

    /// Writes tagged objects via `put_opts`, `put_multipart_opts` and
    /// [`BufWriter`](crate::buffered::BufWriter), then (when `validate` is set)
    /// fetches the stored tags via `get_tags` and checks they round-tripped
    #[cfg(any(feature = "aws", feature = "azure"))]
    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
    where
        F: Fn(Path) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
    {
        use bytes::Buf;
        use serde::Deserialize;
        use tokio::io::AsyncWriteExt;

        use crate::buffered::BufWriter;

        #[derive(Deserialize)]
        struct Tagging {
            #[serde(rename = "TagSet")]
            list: TagList,
        }

        #[derive(Debug, Deserialize)]
        struct TagList {
            #[serde(rename = "Tag")]
            tags: Vec<Tag>,
        }

        #[derive(Debug, Deserialize, Eq, PartialEq)]
        #[serde(rename_all = "PascalCase")]
        struct Tag {
            key: String,
            value: String,
        }

        let tags = vec![
            Tag {
                key: "foo.com=bar/s".to_string(),
                value: "bananas/foo.com-_".to_string(),
            },
            Tag {
                key: "namespace/key.foo".to_string(),
                value: "value with a space".to_string(),
            },
        ];
        let mut tag_set = TagSet::default();
        for t in &tags {
            tag_set.push(&t.key, &t.value)
        }

        let path = Path::from("tag_test");
        storage
            .put_opts(&path, "test".into(), tag_set.clone().into())
            .await
            .unwrap();

        let multi_path = Path::from("tag_test_multi");
        let mut write = storage
            .put_multipart_opts(&multi_path, tag_set.clone().into())
            .await
            .unwrap();

        write.put_part("foo".into()).await.unwrap();
        write.complete().await.unwrap();

        let buf_path = Path::from("tag_test_buf");
        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
        buf.write_all(b"foo").await.unwrap();
        buf.shutdown().await.unwrap();

        // Write should always succeed, but certain configurations may simply ignore tags
        if !validate {
            return;
        }

        for path in [path, multi_path, buf_path] {
            let resp = get_tags(path.clone()).await.unwrap();
            let body = resp.into_body().bytes().await.unwrap();

            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
            assert_eq!(resp.list.tags, tags);
        }
    }

    #[tokio::test]
    async fn test_list_lifetimes() {
        let store = memory::InMemory::new();
        let mut stream = list_store(&store, "path");
        assert!(stream.next().await.is_none());
    }

    /// Exercises `GetOptions::check_preconditions` across the ETag and
    /// modification-date matchers, including precedence and missing-ETag cases
    #[test]
    fn test_preconditions() {
        let mut meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: Utc.timestamp_nanos(100),
            size: 100,
            e_tag: Some("123".to_string()),
            version: None,
        };

        let mut options = GetOptions::default();
        options.check_preconditions(&meta).unwrap();

        // If-Modified-Since: succeeds only when modified strictly after the date
        options.if_modified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap();

        options.if_modified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap_err();

        options.if_modified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();

        // If-Unmodified-Since: fails when modified after the given date
        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap_err();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-Match: comma-separated ETag list, with "*" matching anything
        options.if_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("123,354".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354, 123,".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap();

        // If-Match takes precedence
        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-None-Match: inverse of If-Match
        options.if_none_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("1232".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_none_match = Some("23, 123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        // If-None-Match takes precedence
        options.if_modified_since = Some(Utc.timestamp_nanos(10));
        options.check_preconditions(&meta).unwrap_err();

        // Check missing ETag
        meta.e_tag = None;
        options = GetOptions::default();

        options.if_none_match = Some("*".to_string()); // Fails if any file exists
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();
        options.if_match = Some("*".to_string()); // Passes if file exists
        options.check_preconditions(&meta).unwrap();
    }

    /// Smoke test that the http types re-exported by this crate are usable
    #[test]
    #[cfg(feature = "http")]
    fn test_reexported_types() {
        // Test HeaderMap
        let mut headers = HeaderMap::new();
        headers.insert("content-type", HeaderValue::from_static("text/plain"));
        assert_eq!(headers.len(), 1);

        // Test HeaderValue
        let value = HeaderValue::from_static("test-value");
        assert_eq!(value.as_bytes(), b"test-value");

        // Test Extensions
        let mut extensions = Extensions::new();
        extensions.insert("test-key");
        assert!(extensions.get::<&str>().is_some());
    }

    /// Verifies `GetOptions::new` defaults and that every `with_*` builder
    /// method sets its corresponding field
    #[test]
    fn test_get_options_builder() {
        let dt = Utc::now();
        let extensions = Extensions::new();

        let options = GetOptions::new();

        // assert defaults
        assert_eq!(options.if_match, None);
        assert_eq!(options.if_none_match, None);
        assert_eq!(options.if_modified_since, None);
        assert_eq!(options.if_unmodified_since, None);
        assert_eq!(options.range, None);
        assert_eq!(options.version, None);
        assert!(!options.head);
        assert!(options.extensions.get::<&str>().is_none());

        let options = options
            .with_if_match(Some("etag-match"))
            .with_if_none_match(Some("etag-none-match"))
            .with_if_modified_since(Some(dt))
            .with_if_unmodified_since(Some(dt))
            .with_range(Some(0..100))
            .with_version(Some("version-1"))
            .with_head(true)
            .with_extensions(extensions.clone());

        assert_eq!(options.if_match, Some("etag-match".to_string()));
        assert_eq!(options.if_none_match, Some("etag-none-match".to_string()));
        assert_eq!(options.if_modified_since, Some(dt));
        assert_eq!(options.if_unmodified_since, Some(dt));
        assert_eq!(options.range, Some(GetRange::Bounded(0..100)));
        assert_eq!(options.version, Some("version-1".to_string()));
        assert!(options.head);
        assert_eq!(options.extensions.get::<&str>(), extensions.get::<&str>());
    }

    fn takes_generic_object_store<T: ObjectStore>(store: T) {
        // This function is just to ensure that the trait bounds are satisfied
        let _ = store;
    }
    #[test]
    fn test_dyn_impl() {
        let store: Arc<dyn ObjectStore> = Arc::new(memory::InMemory::new());
        takes_generic_object_store(store);
        let store: Box<dyn ObjectStore> = Box::new(memory::InMemory::new());
        takes_generic_object_store(store);
    }
    #[test]
    fn test_generic_impl() {
        let store = Arc::new(memory::InMemory::new());
        takes_generic_object_store(store);
        let store = Box::new(memory::InMemory::new());
        takes_generic_object_store(store);
    }
}