object_store/lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18#![cfg_attr(docsrs, feature(doc_cfg))]
19#![deny(rustdoc::broken_intra_doc_links, rustdoc::bare_urls, rust_2018_idioms)]
20#![warn(
21    missing_copy_implementations,
22    missing_debug_implementations,
23    missing_docs,
24    clippy::explicit_iter_loop,
25    clippy::future_not_send,
26    clippy::use_self,
27    clippy::clone_on_ref_ptr,
28    unreachable_pub
29)]
30
31//! # object_store
32//!
33//! This crate provides a uniform API for interacting with object
34//! storage services and local files via the [`ObjectStore`]
35//! trait.
36//!
37//! Using this crate, the same binary and code can run in multiple
38//! clouds and local test environments, via a simple runtime
39//! configuration change.
40//!
41//! # Highlights
42//!
43//! 1. A high-performance async API focused on providing a consistent interface
44//!    mirroring that of object stores such as [S3]
45//!
46//! 2. Production quality, used in large-scale production systems,
47//!    such as [crates.io] and [InfluxDB IOx]
48//!
49//! 3. Support for advanced functionality, including atomic, conditional reads
50//!    and writes, vectored IO, bulk deletion, and more...
51//!
52//! 4. Stable and predictable governance via the [Apache Arrow] project
53//!
54//! 5. Small dependency footprint, depending on only a small number of common crates
55//!
56//! Originally developed by [InfluxData] and subsequently donated
57//! to [Apache Arrow].
58//!
59//! [Apache Arrow]: https://arrow.apache.org/
60//! [InfluxData]: https://www.influxdata.com/
61//! [crates.io]: https://github.com/rust-lang/crates.io
//! [InfluxDB IOx]: https://github.com/influxdata/influxdb_iox
62//! [ACID]: https://en.wikipedia.org/wiki/ACID
63//! [S3]: https://aws.amazon.com/s3/
64//!
65//! # Available [`ObjectStore`] Implementations
66//!
67//! By default, this crate provides the following implementations:
68//!
69//! * Memory: [`InMemory`](memory::InMemory)
70//!
71//! Feature flags are used to enable support for other implementations:
72//!
73#![cfg_attr(
74    feature = "fs",
75    doc = "* Local filesystem: [`LocalFileSystem`](local::LocalFileSystem)"
76)]
77#![cfg_attr(
78    feature = "gcp",
79    doc = "* [`gcp`]: [Google Cloud Storage](https://cloud.google.com/storage/) support. See [`GoogleCloudStorageBuilder`](gcp::GoogleCloudStorageBuilder)"
80)]
81#![cfg_attr(
82    feature = "aws",
83    doc = "* [`aws`]: [Amazon S3](https://aws.amazon.com/s3/). See [`AmazonS3Builder`](aws::AmazonS3Builder)"
84)]
85#![cfg_attr(
86    feature = "azure",
87    doc = "* [`azure`]: [Azure Blob Storage](https://azure.microsoft.com/en-gb/services/storage/blobs/). See [`MicrosoftAzureBuilder`](azure::MicrosoftAzureBuilder)"
88)]
89#![cfg_attr(
90    feature = "http",
91    doc = "* [`http`]: [HTTP/WebDAV Storage](https://datatracker.ietf.org/doc/html/rfc2518). See [`HttpBuilder`](http::HttpBuilder)"
92)]
93//!
94//! # Why not a Filesystem Interface?
95//!
96//! The [`ObjectStore`] interface is designed to mirror the APIs
97//! of object stores and *not* filesystems, and thus has stateless APIs instead
98//! of cursor based interfaces such as [`Read`] or [`Seek`] available in filesystems.
99//!
100//! This design provides the following advantages:
101//!
102//! * All operations are atomic, and readers cannot observe partial and/or failed writes
103//! * Methods map directly to object store APIs, providing both efficiency and predictability
104//! * Abstracts away filesystem and operating system specific quirks, ensuring portability
105//! * Allows for functionality not native to filesystems, such as operation preconditions
106//!   and atomic multipart uploads
107//!
108//! This crate does provide [`BufReader`] and [`BufWriter`] adapters,
109//! which offer a more filesystem-like API for working with the
110//! [`ObjectStore`] trait; however, they should be used with care.
111//!
112//! [`BufReader`]: buffered::BufReader
113//! [`BufWriter`]: buffered::BufWriter
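//!
//! A minimal sketch of using these adapters, shown here against the in-memory
//! store (any other [`ObjectStore`] works the same way):
//!
//! ```
//! # use std::sync::Arc;
//! # use object_store::buffered::{BufReader, BufWriter};
//! # use object_store::memory::InMemory;
//! # use object_store::{path::Path, ObjectStore};
//! # use tokio::io::{AsyncReadExt, AsyncWriteExt};
//! # async fn buffered() {
//! let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
//! let path = Path::from("data/buffered");
//!
//! // Write through an `AsyncWrite` adapter, completing the upload on shutdown
//! let mut writer = BufWriter::new(Arc::clone(&store), path.clone());
//! writer.write_all(b"hello world").await.unwrap();
//! writer.shutdown().await.unwrap();
//!
//! // Read back through an `AsyncRead` adapter
//! let meta = store.head(&path).await.unwrap();
//! let mut reader = BufReader::new(Arc::clone(&store), &meta);
//! let mut contents = Vec::new();
//! reader.read_to_end(&mut contents).await.unwrap();
//! assert_eq!(contents, b"hello world".to_vec());
//! # }
//! ```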
114//!
115//! # Adapters
116//!
117//! [`ObjectStore`] instances can be composed with various adapters
118//! which add additional functionality:
119//!
120//! * Rate Throttling: [`ThrottleConfig`](throttle::ThrottleConfig)
121//! * Concurrent Request Limit: [`LimitStore`](limit::LimitStore)
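//!
//! For example, a minimal sketch wrapping the in-memory store (assuming the
//! default [`ThrottleConfig`](throttle::ThrottleConfig) is acceptable):
//!
//! ```
//! # use object_store::limit::LimitStore;
//! # use object_store::memory::InMemory;
//! # use object_store::throttle::{ThrottleConfig, ThrottledStore};
//! // Cap the number of concurrent requests made through the store
//! let limited = LimitStore::new(InMemory::new(), 16);
//!
//! // Add configurable latency to each request, e.g. for testing
//! let throttled = ThrottledStore::new(InMemory::new(), ThrottleConfig::default());
//! ```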
122//!
123//! # Configuration System
124//!
125//! This crate provides a configuration system inspired by the APIs exposed by [fsspec],
126//! [PyArrow FileSystem], and [Hadoop FileSystem], allowing creating a [`DynObjectStore`]
127//! from a URL and an optional list of key value pairs. This provides a flexible interface
128//! to support a wide variety of user-defined store configurations, with minimal additional
129//! application complexity.
130//!
131//! ```no_run,ignore-wasm32
132//! # #[cfg(feature = "aws")] {
133//! # use url::Url;
134//! # use object_store::{parse_url, parse_url_opts};
135//! # use object_store::aws::{AmazonS3, AmazonS3Builder};
136//! #
137//! #
138//! // Can manually create a specific store variant using the appropriate builder
139//! let store: AmazonS3 = AmazonS3Builder::from_env()
140//!     .with_bucket_name("my-bucket").build().unwrap();
141//!
142//! // Alternatively can create an ObjectStore from an S3 URL
143//! let url = Url::parse("s3://bucket/path").unwrap();
144//! let (store, path) = parse_url(&url).unwrap();
145//! assert_eq!(path.as_ref(), "path");
146//!
147//! // Potentially with additional options
148//! let (store, path) = parse_url_opts(&url, vec![("aws_access_key_id", "...")]).unwrap();
149//!
150//! // Or with URLs that encode the bucket name in the URL path
151//! let url = Url::parse("https://ACCOUNT_ID.r2.cloudflarestorage.com/bucket/path").unwrap();
152//! let (store, path) = parse_url(&url).unwrap();
153//! assert_eq!(path.as_ref(), "path");
154//! # }
155//! ```
156//!
157//! [PyArrow FileSystem]: https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.from_uri
158//! [fsspec]: https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem
159//! [Hadoop FileSystem]: https://hadoop.apache.org/docs/r3.0.0/api/org/apache/hadoop/fs/FileSystem.html#get-java.net.URI-org.apache.hadoop.conf.Configuration-
160//!
161//! # List objects
162//!
163//! Use the [`ObjectStore::list`] method to iterate over objects in
164//! remote storage or files in the local filesystem:
165//!
166//! ```ignore-wasm32
167//! # use object_store::local::LocalFileSystem;
168//! # use std::sync::Arc;
169//! # use object_store::{path::Path, ObjectStore};
170//! # use futures::stream::StreamExt;
171//! # // use LocalFileSystem for example
172//! # fn get_object_store() -> Arc<dyn ObjectStore> {
173//! #   Arc::new(LocalFileSystem::new())
174//! # }
175//! #
176//! # async fn example() {
177//! #
178//! // create an ObjectStore
179//! let object_store: Arc<dyn ObjectStore> = get_object_store();
180//!
181//! // Recursively list all files below the 'data' path.
182//! // 1. On AWS S3 this would be the 'data/' prefix
183//! // 2. On a local filesystem, this would be the 'data' directory
184//! let prefix = Path::from("data");
185//!
186//! // Get an `async` stream of Metadata objects:
187//! let mut list_stream = object_store.list(Some(&prefix));
188//!
189//! // Print a line about each object
190//! while let Some(meta) = list_stream.next().await.transpose().unwrap() {
191//!     println!("Name: {}, size: {}", meta.location, meta.size);
192//! }
193//! # }
194//! ```
195//!
196//! Which will print out something like the following:
197//!
198//! ```text
199//! Name: data/file01.parquet, size: 112832
200//! Name: data/file02.parquet, size: 143119
201//! Name: data/child/file03.parquet, size: 100
202//! ...
203//! ```
204//!
205//! # Fetch objects
206//!
207//! Use the [`ObjectStore::get`] method to fetch the data bytes
208//! from remote storage or files in the local filesystem as a stream.
209//!
210//! ```ignore-wasm32
211//! # use futures::TryStreamExt;
212//! # use object_store::local::LocalFileSystem;
213//! # use std::sync::Arc;
214//! #  use bytes::Bytes;
215//! # use object_store::{path::Path, ObjectStore, GetResult};
216//! # fn get_object_store() -> Arc<dyn ObjectStore> {
217//! #   Arc::new(LocalFileSystem::new())
218//! # }
219//! #
220//! # async fn example() {
221//! #
222//! // Create an ObjectStore
223//! let object_store: Arc<dyn ObjectStore> = get_object_store();
224//!
225//! // Retrieve a specific file
226//! let path = Path::from("data/file01.parquet");
227//!
228//! // Fetch just the file metadata
229//! let meta = object_store.head(&path).await.unwrap();
230//! println!("{meta:?}");
231//!
232//! // Fetch the object including metadata
233//! let result: GetResult = object_store.get(&path).await.unwrap();
234//! assert_eq!(result.meta, meta);
235//!
236//! // Buffer the entire object in memory
237//! let object: Bytes = result.bytes().await.unwrap();
238//! assert_eq!(object.len() as u64, meta.size);
239//!
240//! // Alternatively stream the bytes from object storage
241//! let stream = object_store.get(&path).await.unwrap().into_stream();
242//!
243//! // Count the '0's using `try_fold` from `TryStreamExt` trait
244//! let num_zeros = stream
245//!     .try_fold(0, |acc, bytes| async move {
246//!         Ok(acc + bytes.iter().filter(|b| **b == 0).count())
247//!     }).await.unwrap();
248//!
249//! println!("Num zeros in {} is {}", path, num_zeros);
250//! # }
251//! ```
252//!
253//! # Put Object
254//!
255//! Use the [`ObjectStore::put`] method to atomically write data.
256//!
257//! ```ignore-wasm32
258//! # use object_store::local::LocalFileSystem;
259//! # use object_store::{ObjectStore, PutPayload};
260//! # use std::sync::Arc;
261//! # use object_store::path::Path;
262//! # fn get_object_store() -> Arc<dyn ObjectStore> {
263//! #   Arc::new(LocalFileSystem::new())
264//! # }
265//! # async fn put() {
266//! #
267//! let object_store: Arc<dyn ObjectStore> = get_object_store();
268//! let path = Path::from("data/file1");
269//! let payload = PutPayload::from_static(b"hello");
270//! object_store.put(&path, payload).await.unwrap();
271//! # }
272//! ```
273//!
274//! # Multipart Upload
275//!
276//! Use the [`ObjectStore::put_multipart`] method to atomically write a large amount of data
277//!
278//! ```ignore-wasm32
279//! # use object_store::local::LocalFileSystem;
280//! # use object_store::{ObjectStore, WriteMultipart};
281//! # use std::sync::Arc;
282//! # use bytes::Bytes;
283//! # use tokio::io::AsyncWriteExt;
284//! # use object_store::path::Path;
285//! # fn get_object_store() -> Arc<dyn ObjectStore> {
286//! #   Arc::new(LocalFileSystem::new())
287//! # }
288//! # async fn multi_upload() {
289//! #
290//! let object_store: Arc<dyn ObjectStore> = get_object_store();
291//! let path = Path::from("data/large_file");
292//! let upload = object_store.put_multipart(&path).await.unwrap();
293//! let mut write = WriteMultipart::new(upload);
294//! write.write(b"hello");
295//! write.finish().await.unwrap();
296//! # }
297//! ```
298//!
299//! # Vectored Read
300//!
301//! A common pattern, especially when reading structured datasets, is to need to fetch
302//! multiple, potentially non-contiguous, ranges of a particular object.
303//!
304//! [`ObjectStore::get_ranges`] provides an efficient way to perform such vectored IO, and will
305//! automatically coalesce adjacent ranges into an appropriate number of parallel requests.
306//!
307//! ```ignore-wasm32
308//! # use object_store::local::LocalFileSystem;
309//! # use object_store::ObjectStore;
310//! # use std::sync::Arc;
311//! # use bytes::Bytes;
312//! # use tokio::io::AsyncWriteExt;
313//! # use object_store::path::Path;
314//! # fn get_object_store() -> Arc<dyn ObjectStore> {
315//! #   Arc::new(LocalFileSystem::new())
316//! # }
317//! # async fn multi_upload() {
318//! #
319//! let object_store: Arc<dyn ObjectStore> = get_object_store();
320//! let path = Path::from("data/large_file");
321//! let ranges = object_store.get_ranges(&path, &[90..100, 400..600, 0..10]).await.unwrap();
322//! assert_eq!(ranges.len(), 3);
323//! assert_eq!(ranges[0].len(), 10);
324//! # }
325//! ```
326//!
327//! # Vectored Write
328//!
329//! When writing data it is often the case that the size of the output is not known ahead of time.
330//!
331//! A common approach to handling this is to bump-allocate a `Vec`, whereby the underlying
332//! allocation is repeatedly reallocated, each time doubling the capacity. The performance of
333//! this is suboptimal as reallocating memory will often involve copying it to a new location.
334//!
335//! Fortunately, as [`PutPayload`] does not require memory regions to be contiguous, it is
336//! possible to instead allocate memory in chunks and avoid bump allocating. [`PutPayloadMut`]
337//! encapsulates this approach.
338//!
339//! ```ignore-wasm32
340//! # use object_store::local::LocalFileSystem;
341//! # use object_store::{ObjectStore, PutPayloadMut};
342//! # use std::sync::Arc;
343//! # use bytes::Bytes;
344//! # use tokio::io::AsyncWriteExt;
345//! # use object_store::path::Path;
346//! # fn get_object_store() -> Arc<dyn ObjectStore> {
347//! #   Arc::new(LocalFileSystem::new())
348//! # }
349//! # async fn multi_upload() {
350//! #
351//! let object_store: Arc<dyn ObjectStore> = get_object_store();
352//! let path = Path::from("data/large_file");
353//! let mut buffer = PutPayloadMut::new().with_block_size(8192);
354//! for _ in 0..22 {
355//!     buffer.extend_from_slice(&[0; 1024]);
356//! }
357//! let payload = buffer.freeze();
358//!
359//! // Payload consists of 3 separate 8KB allocations
360//! assert_eq!(payload.as_ref().len(), 3);
361//! assert_eq!(payload.as_ref()[0].len(), 8192);
362//! assert_eq!(payload.as_ref()[1].len(), 8192);
363//! assert_eq!(payload.as_ref()[2].len(), 6144);
364//!
365//! object_store.put(&path, payload).await.unwrap();
366//! # }
367//! ```
368//!
369//! # Conditional Fetch
370//!
371//! More complex object retrieval can be supported by [`ObjectStore::get_opts`].
372//!
373//! For example, efficiently refreshing a cache without re-fetching the entire object
374//! data if the object hasn't been modified.
375//!
376//! ```
377//! # use std::collections::btree_map::Entry;
378//! # use std::collections::HashMap;
379//! # use object_store::{GetOptions, GetResult, ObjectStore, Result, Error};
380//! # use std::sync::Arc;
381//! # use std::time::{Duration, Instant};
382//! # use bytes::Bytes;
383//! # use tokio::io::AsyncWriteExt;
384//! # use object_store::path::Path;
385//! struct CacheEntry {
386//!     /// Data returned by last request
387//!     data: Bytes,
388//!     /// ETag identifying the object returned by the server
389//!     e_tag: String,
390//!     /// Instant of last refresh
391//!     refreshed_at: Instant,
392//! }
393//!
394//! /// Example cache that checks entries after 10 seconds for a new version
395//! struct Cache {
396//!     entries: HashMap<Path, CacheEntry>,
397//!     store: Arc<dyn ObjectStore>,
398//! }
399//!
400//! impl Cache {
401//!     pub async fn get(&mut self, path: &Path) -> Result<Bytes> {
402//!         Ok(match self.entries.get_mut(path) {
403//!             Some(e) => match e.refreshed_at.elapsed() < Duration::from_secs(10) {
404//!                 true => e.data.clone(), // Return cached data
405//!                 false => { // Check if remote version has changed
406//!                     let opts = GetOptions {
407//!                         if_none_match: Some(e.e_tag.clone()),
408//!                         ..GetOptions::default()
409//!                     };
410//!                     match self.store.get_opts(&path, opts).await {
411//!                         Ok(d) => e.data = d.bytes().await?,
412//!                         Err(Error::NotModified { .. }) => {} // Data has not changed
413//!                         Err(e) => return Err(e),
414//!                     };
415//!                     e.refreshed_at = Instant::now();
416//!                     e.data.clone()
417//!                 }
418//!             },
419//!             None => { // Not cached, fetch data
420//!                 let get = self.store.get(&path).await?;
421//!                 let e_tag = get.meta.e_tag.clone();
422//!                 let data = get.bytes().await?;
423//!                 if let Some(e_tag) = e_tag {
424//!                     let entry = CacheEntry {
425//!                         e_tag,
426//!                         data: data.clone(),
427//!                         refreshed_at: Instant::now(),
428//!                     };
429//!                     self.entries.insert(path.clone(), entry);
430//!                 }
431//!                 data
432//!             }
433//!         })
434//!     }
435//! }
436//! ```
437//!
438//! # Conditional Put
439//!
440//! The default behaviour when writing data is to upsert any existing object at the given path,
441//! overwriting any previous value. More complex behaviours can be achieved using [`PutMode`], which
442//! can be used to build [Optimistic Concurrency Control] based transactions. This facilitates
443//! building metadata catalogs, such as [Apache Iceberg] or [Delta Lake], directly on top of object
444//! storage, without relying on a separate DBMS.
445//!
446//! ```
447//! # use object_store::{Error, ObjectStore, PutMode, UpdateVersion};
448//! # use std::sync::Arc;
449//! # use bytes::Bytes;
450//! # use tokio::io::AsyncWriteExt;
451//! # use object_store::memory::InMemory;
452//! # use object_store::path::Path;
453//! # fn get_object_store() -> Arc<dyn ObjectStore> {
454//! #   Arc::new(InMemory::new())
455//! # }
456//! # fn do_update(b: Bytes) -> Bytes {b}
457//! # async fn conditional_put() {
458//! let store = get_object_store();
459//! let path = Path::from("test");
460//!
461//! // Perform a conditional update on path
462//! loop {
463//!     // Perform get request
464//!     let r = store.get(&path).await.unwrap();
465//!
466//!     // Save version information fetched
467//!     let version = UpdateVersion {
468//!         e_tag: r.meta.e_tag.clone(),
469//!         version: r.meta.version.clone(),
470//!     };
471//!
472//!     // Compute new version of object contents
473//!     let new = do_update(r.bytes().await.unwrap());
474//!
475//!     // Attempt to commit transaction
476//!     match store.put_opts(&path, new.into(), PutMode::Update(version).into()).await {
477//!         Ok(_) => break, // Successfully committed
478//!         Err(Error::Precondition { .. }) => continue, // Object has changed, try again
479//!         Err(e) => panic!("{e}")
480//!     }
481//! }
482//! # }
483//! ```
484//!
485//! [Optimistic Concurrency Control]: https://en.wikipedia.org/wiki/Optimistic_concurrency_control
486//! [Apache Iceberg]: https://iceberg.apache.org/
487//! [Delta Lake]: https://delta.io/
488//!
489//! # TLS Certificates
490//!
491//! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA]
492//! certificates. By default the system-bundled certificates are used (see
493//! [`rustls-native-certs`]). The `tls-webpki-roots` feature switch can be used to also bundle Mozilla's
494//! root certificates with the library/application (see [`webpki-roots`]).
495//!
496//! [CA]: https://en.wikipedia.org/wiki/Certificate_authority
497//! [`rustls-native-certs`]: https://crates.io/crates/rustls-native-certs/
498//! [`webpki-roots`]: https://crates.io/crates/webpki-roots
499//!
500//! # Customizing HTTP Clients
501//!
502//! Many [`ObjectStore`] implementations permit customization of the HTTP client via
503//! the [`HttpConnector`] trait and utilities in the [`client`] module.
504//! Examples include injecting custom HTTP headers or using an alternate
505//! tokio runtime for I/O requests.
506//!
507//! [`HttpConnector`]: client::HttpConnector
508
509#[cfg(feature = "aws")]
510pub mod aws;
511#[cfg(feature = "azure")]
512pub mod azure;
513pub mod buffered;
514#[cfg(not(target_arch = "wasm32"))]
515pub mod chunked;
516pub mod delimited;
517#[cfg(feature = "gcp")]
518pub mod gcp;
519#[cfg(feature = "http")]
520pub mod http;
521pub mod limit;
522#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
523pub mod local;
524pub mod memory;
525pub mod path;
526pub mod prefix;
527pub mod registry;
528#[cfg(feature = "cloud")]
529pub mod signer;
530pub mod throttle;
531
532#[cfg(feature = "cloud")]
533pub mod client;
534
535#[cfg(feature = "cloud")]
536pub use client::{
537    backoff::BackoffConfig, retry::RetryConfig, ClientConfigKey, ClientOptions, CredentialProvider,
538    StaticCredentialProvider,
539};
540
541#[cfg(all(feature = "cloud", not(target_arch = "wasm32")))]
542pub use client::Certificate;
543
544#[cfg(feature = "cloud")]
545mod config;
546
547mod tags;
548
549pub use tags::TagSet;
550
551pub mod list;
552pub mod multipart;
553mod parse;
554mod payload;
555mod upload;
556mod util;
557
558mod attributes;
559
560#[cfg(any(feature = "integration", test))]
561pub mod integration;
562
563pub use attributes::*;
564
565pub use parse::{parse_url, parse_url_opts, ObjectStoreScheme};
566pub use payload::*;
567pub use upload::*;
568pub use util::{coalesce_ranges, collect_bytes, GetRange, OBJECT_STORE_COALESCE_DEFAULT};
569
570// Re-export HTTP types used in public API
571pub use ::http::{Extensions, HeaderMap, HeaderValue};
572
573use crate::path::Path;
574#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
575use crate::util::maybe_spawn_blocking;
576use async_trait::async_trait;
577use bytes::Bytes;
578use chrono::{DateTime, Utc};
579use futures::{stream::BoxStream, StreamExt, TryStreamExt};
580use std::fmt::{Debug, Formatter};
581#[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
582use std::io::{Read, Seek, SeekFrom};
583use std::ops::Range;
584use std::sync::Arc;
585
586/// An alias for a dynamically dispatched object store implementation.
587pub type DynObjectStore = dyn ObjectStore;
588
589/// Id type for multipart uploads.
590pub type MultipartId = String;
591
592/// Universal API to multiple object store services.
593#[async_trait]
594pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
595    /// Save the provided bytes to the specified location
596    ///
597    /// The operation is guaranteed to be atomic, it will either successfully
598    /// write the entirety of `payload` to `location`, or fail. No clients
599    /// should be able to observe a partially written object
600    async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
601        self.put_opts(location, payload, PutOptions::default())
602            .await
603    }
604
605    /// Save the provided `payload` to `location` with the given options
606    async fn put_opts(
607        &self,
608        location: &Path,
609        payload: PutPayload,
610        opts: PutOptions,
611    ) -> Result<PutResult>;
612
613    /// Perform a multipart upload
614    ///
615    /// Clients should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
616    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
617    ///
618    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
619    async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
620        self.put_multipart_opts(location, PutMultipartOptions::default())
621            .await
622    }
623
624    /// Perform a multipart upload with options
625    ///
626    /// Clients should prefer [`ObjectStore::put`] for small payloads, as streaming uploads
627    /// typically require multiple separate requests. See [`MultipartUpload`] for more information
628    ///
629    /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore)
630    async fn put_multipart_opts(
631        &self,
632        location: &Path,
633        opts: PutMultipartOptions,
634    ) -> Result<Box<dyn MultipartUpload>>;
635
636    /// Return the bytes that are stored at the specified location.
637    async fn get(&self, location: &Path) -> Result<GetResult> {
638        self.get_opts(location, GetOptions::default()).await
639    }
640
641    /// Perform a get request with options
642    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult>;
643
644    /// Return the bytes that are stored at the specified location
645    /// in the given byte range.
646    ///
647    /// See [`GetRange::Bounded`] for more details on how `range` gets interpreted
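    ///
    /// A minimal sketch, using the in-memory store:
    ///
    /// ```
    /// # use object_store::{memory::InMemory, path::Path, ObjectStore};
    /// # async fn example() {
    /// let store = InMemory::new();
    /// let path = Path::from("data");
    /// store.put(&path, "hello world".into()).await.unwrap();
    ///
    /// // Fetch bytes 0..5 of the object
    /// let bytes = store.get_range(&path, 0..5).await.unwrap();
    /// assert_eq!(bytes.as_ref(), b"hello");
    /// # }
    /// ```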
648    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
649        let options = GetOptions {
650            range: Some(range.into()),
651            ..Default::default()
652        };
653        self.get_opts(location, options).await?.bytes().await
654    }
655
656    /// Return the bytes that are stored at the specified location
657    /// in the given byte ranges
658    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
659        coalesce_ranges(
660            ranges,
661            |range| self.get_range(location, range),
662            OBJECT_STORE_COALESCE_DEFAULT,
663        )
664        .await
665    }
666
667    /// Return the metadata for the specified location
668    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
669        let options = GetOptions {
670            head: true,
671            ..Default::default()
672        };
673        Ok(self.get_opts(location, options).await?.meta)
674    }
675
676    /// Delete the object at the specified location.
677    async fn delete(&self, location: &Path) -> Result<()>;
678
679    /// Delete all the objects at the specified locations
680    ///
681    /// When supported, this method will use bulk operations that delete more
682    /// than one object per a request. The default implementation will call
683    /// the single object delete method for each location, but with up to 10
684    /// concurrent requests.
685    ///
686    /// The returned stream yields the results of the delete operations in the
687    /// same order as the input locations. However, some errors will be from
688    /// an overall call to a bulk delete operation, and not from a specific
689    /// location.
690    ///
691    /// If the object did not exist, the result may be an error or a success,
692    /// depending on the behavior of the underlying store. For example, local
693    /// filesystems, GCP, and Azure return an error, while S3 and in-memory will
694    /// return Ok. If it is an error, it will be [`Error::NotFound`].
695    ///
696    /// ```ignore-wasm32
697    /// # use futures::{StreamExt, TryStreamExt};
698    /// # use object_store::local::LocalFileSystem;
699    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
700    /// # let root = tempfile::TempDir::new().unwrap();
701    /// # let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
702    /// # use object_store::{ObjectStore, ObjectMeta};
703    /// # use object_store::path::Path;
704    /// # use futures::{StreamExt, TryStreamExt};
705    /// #
706    /// // Create two objects
707    /// store.put(&Path::from("foo"), "foo".into()).await?;
708    /// store.put(&Path::from("bar"), "bar".into()).await?;
709    ///
710    /// // List object
711    /// let locations = store.list(None).map_ok(|m| m.location).boxed();
712    ///
713    /// // Delete them
714    /// store.delete_stream(locations).try_collect::<Vec<Path>>().await?;
715    /// # Ok(())
716    /// # }
717    /// # let rt = tokio::runtime::Builder::new_current_thread().build().unwrap();
718    /// # rt.block_on(example()).unwrap();
719    /// ```
720    fn delete_stream<'a>(
721        &'a self,
722        locations: BoxStream<'a, Result<Path>>,
723    ) -> BoxStream<'a, Result<Path>> {
724        locations
725            .map(|location| async {
726                let location = location?;
727                self.delete(&location).await?;
728                Ok(location)
729            })
730            .buffered(10)
731            .boxed()
732    }
733
734    /// List all the objects with the given prefix.
735    ///
736    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
737    /// `foo/bar_baz/x`. List is recursive, i.e. `foo/bar/more/x` will be included.
738    ///
739    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
740    ///
741    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
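    ///
    /// A minimal sketch of the prefix semantics, using the in-memory store:
    ///
    /// ```
    /// # use futures::TryStreamExt;
    /// # use object_store::{memory::InMemory, path::Path, ObjectStore};
    /// # async fn example() {
    /// let store = InMemory::new();
    /// store.put(&Path::from("foo/bar/x"), "1".into()).await.unwrap();
    /// store.put(&Path::from("foo/bar_baz/x"), "2".into()).await.unwrap();
    ///
    /// // Only `foo/bar/x` is listed, as prefixes match whole path segments
    /// let listed: Vec<_> = store.list(Some(&Path::from("foo/bar"))).try_collect().await.unwrap();
    /// assert_eq!(listed.len(), 1);
    /// assert_eq!(listed[0].location.as_ref(), "foo/bar/x");
    /// # }
    /// ```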
742    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>>;
743
744    /// List all the objects with the given prefix and a location greater than `offset`
745    ///
746    /// Some stores, such as S3 and GCS, may be able to push `offset` down to reduce
747    /// the number of network requests required
748    ///
749    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
750    ///
751    /// For more advanced listing see [`PaginatedListStore`](list::PaginatedListStore)
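    ///
    /// A minimal sketch, using the in-memory store:
    ///
    /// ```
    /// # use futures::TryStreamExt;
    /// # use object_store::{memory::InMemory, path::Path, ObjectStore};
    /// # async fn example() {
    /// let store = InMemory::new();
    /// store.put(&Path::from("data/a"), "1".into()).await.unwrap();
    /// store.put(&Path::from("data/b"), "2".into()).await.unwrap();
    ///
    /// // Only locations strictly greater than the offset are returned
    /// let offset = Path::from("data/a");
    /// let listed: Vec<_> = store
    ///     .list_with_offset(Some(&Path::from("data")), &offset)
    ///     .try_collect()
    ///     .await
    ///     .unwrap();
    /// assert_eq!(listed.len(), 1);
    /// assert_eq!(listed[0].location.as_ref(), "data/b");
    /// # }
    /// ```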
752    fn list_with_offset(
753        &self,
754        prefix: Option<&Path>,
755        offset: &Path,
756    ) -> BoxStream<'static, Result<ObjectMeta>> {
757        let offset = offset.clone();
758        self.list(prefix)
759            .try_filter(move |f| futures::future::ready(f.location > offset))
760            .boxed()
761    }
762
763    /// List objects with the given prefix and an implementation specific
764    /// delimiter. Returns common prefixes (directories) in addition to object
765    /// metadata.
766    ///
767    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar` is a prefix of `foo/bar/x` but not of
768    /// `foo/bar_baz/x`. List is not recursive, i.e. `foo/bar/more/x` will not be included.
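    ///
    /// A minimal sketch, using the in-memory store:
    ///
    /// ```
    /// # use object_store::{memory::InMemory, path::Path, ObjectStore};
    /// # async fn example() {
    /// let store = InMemory::new();
    /// store.put(&Path::from("data/a.txt"), "1".into()).await.unwrap();
    /// store.put(&Path::from("data/nested/b.txt"), "2".into()).await.unwrap();
    ///
    /// let result = store.list_with_delimiter(Some(&Path::from("data"))).await.unwrap();
    /// // `data/a.txt` is returned as an object, `data/nested` as a common prefix
    /// assert_eq!(result.objects.len(), 1);
    /// assert_eq!(result.common_prefixes, vec![Path::from("data/nested")]);
    /// # }
    /// ```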
769    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult>;
770
771    /// Copy an object from one path to another in the same object store.
772    ///
773    /// If there exists an object at the destination, it will be overwritten.
774    async fn copy(&self, from: &Path, to: &Path) -> Result<()>;
775
776    /// Move an object from one path to another in the same object store.
777    ///
778    /// By default, this is implemented as a copy and then delete source. It may not
779    /// check when deleting source that it was the same object that was originally copied.
780    ///
781    /// If there exists an object at the destination, it will be overwritten.
782    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
783        self.copy(from, to).await?;
784        self.delete(from).await
785    }
786
787    /// Copy an object from one path to another, only if destination is empty.
788    ///
789    /// Will return an error if the destination already has an object.
790    ///
791    /// Performs an atomic operation if the underlying object storage supports it.
792    /// If atomic operations are not supported by the underlying object storage (like S3)
793    /// it will return an error.
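    ///
    /// A minimal sketch, using the in-memory store:
    ///
    /// ```
    /// # use object_store::{memory::InMemory, path::Path, Error, ObjectStore};
    /// # async fn example() {
    /// let store = InMemory::new();
    /// store.put(&Path::from("a"), "1".into()).await.unwrap();
    /// store.put(&Path::from("b"), "2".into()).await.unwrap();
    ///
    /// // The destination already exists, so the copy is refused
    /// let err = store.copy_if_not_exists(&Path::from("a"), &Path::from("b")).await.unwrap_err();
    /// assert!(matches!(err, Error::AlreadyExists { .. }));
    /// # }
    /// ```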
794    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()>;
795
796    /// Move an object from one path to another in the same object store.
797    ///
798    /// Will return an error if the destination already has an object.
799    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
800        self.copy_if_not_exists(from, to).await?;
801        self.delete(from).await
802    }
803}
804
805macro_rules! as_ref_impl {
806    ($type:ty) => {
807        #[async_trait]
808        impl ObjectStore for $type {
809            async fn put(&self, location: &Path, payload: PutPayload) -> Result<PutResult> {
810                self.as_ref().put(location, payload).await
811            }
812
813            async fn put_opts(
814                &self,
815                location: &Path,
816                payload: PutPayload,
817                opts: PutOptions,
818            ) -> Result<PutResult> {
819                self.as_ref().put_opts(location, payload, opts).await
820            }
821
822            async fn put_multipart(&self, location: &Path) -> Result<Box<dyn MultipartUpload>> {
823                self.as_ref().put_multipart(location).await
824            }
825
826            async fn put_multipart_opts(
827                &self,
828                location: &Path,
829                opts: PutMultipartOptions,
830            ) -> Result<Box<dyn MultipartUpload>> {
831                self.as_ref().put_multipart_opts(location, opts).await
832            }
833
834            async fn get(&self, location: &Path) -> Result<GetResult> {
835                self.as_ref().get(location).await
836            }
837
838            async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
839                self.as_ref().get_opts(location, options).await
840            }
841
842            async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
843                self.as_ref().get_range(location, range).await
844            }
845
846            async fn get_ranges(
847                &self,
848                location: &Path,
849                ranges: &[Range<u64>],
850            ) -> Result<Vec<Bytes>> {
851                self.as_ref().get_ranges(location, ranges).await
852            }
853
854            async fn head(&self, location: &Path) -> Result<ObjectMeta> {
855                self.as_ref().head(location).await
856            }
857
858            async fn delete(&self, location: &Path) -> Result<()> {
859                self.as_ref().delete(location).await
860            }
861
862            fn delete_stream<'a>(
863                &'a self,
864                locations: BoxStream<'a, Result<Path>>,
865            ) -> BoxStream<'a, Result<Path>> {
866                self.as_ref().delete_stream(locations)
867            }
868
869            fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
870                self.as_ref().list(prefix)
871            }
872
873            fn list_with_offset(
874                &self,
875                prefix: Option<&Path>,
876                offset: &Path,
877            ) -> BoxStream<'static, Result<ObjectMeta>> {
878                self.as_ref().list_with_offset(prefix, offset)
879            }
880
881            async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
882                self.as_ref().list_with_delimiter(prefix).await
883            }
884
885            async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
886                self.as_ref().copy(from, to).await
887            }
888
889            async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
890                self.as_ref().rename(from, to).await
891            }
892
893            async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
894                self.as_ref().copy_if_not_exists(from, to).await
895            }
896
897            async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
898                self.as_ref().rename_if_not_exists(from, to).await
899            }
900        }
901    };
902}
903
904as_ref_impl!(Arc<dyn ObjectStore>);
905as_ref_impl!(Box<dyn ObjectStore>);
906
907/// Result of a list call that includes objects and common prefixes
908/// (directories). Individual result sets may be limited to 1,000 objects
909/// based on the underlying object storage's limitations.
910#[derive(Debug)]
911pub struct ListResult {
912    /// Prefixes that are common (like directories)
913    pub common_prefixes: Vec<Path>,
914    /// Object metadata for the listing
915    pub objects: Vec<ObjectMeta>,
916}
917
918/// The metadata that describes an object.
919#[derive(Debug, Clone, PartialEq, Eq)]
920pub struct ObjectMeta {
921    /// The full path to the object
922    pub location: Path,
923    /// The last modified time
924    pub last_modified: DateTime<Utc>,
925    /// The size in bytes of the object.
926    ///
927    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
928    pub size: u64,
929    /// The unique identifier for the object
930    ///
931    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
932    pub e_tag: Option<String>,
933    /// A version indicator for this object
934    pub version: Option<String>,
935}
936
937/// Options for a get request, such as range
938#[derive(Debug, Default, Clone)]
939pub struct GetOptions {
940    /// Request will succeed if the `ObjectMeta::e_tag` matches
941    /// otherwise returning [`Error::Precondition`]
942    ///
943    /// See <https://datatracker.ietf.org/doc/html/rfc9110#name-if-match>
944    ///
945    /// Examples:
946    ///
947    /// ```text
948    /// If-Match: "xyzzy"
949    /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
950    /// If-Match: *
951    /// ```
952    pub if_match: Option<String>,
953    /// Request will succeed if the `ObjectMeta::e_tag` does not match
954    /// otherwise returning [`Error::NotModified`]
955    ///
956    /// See <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.2>
957    ///
958    /// Examples:
959    ///
960    /// ```text
961    /// If-None-Match: "xyzzy"
962    /// If-None-Match: "xyzzy", "r2d2xxxx", "c3piozzzz"
963    /// If-None-Match: *
964    /// ```
965    pub if_none_match: Option<String>,
966    /// Request will succeed if the object has been modified since the given date, otherwise returning [`Error::NotModified`]
967    ///
968    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.3>
969    pub if_modified_since: Option<DateTime<Utc>>,
970    /// Request will succeed if the object has not been modified since the given date,
971    /// otherwise returning [`Error::Precondition`]
972    ///
973    /// Some stores, such as S3, will only return `NotModified` for exact
974    /// timestamp matches, instead of for any timestamp greater than or equal.
975    ///
976    /// <https://datatracker.ietf.org/doc/html/rfc9110#section-13.1.4>
977    pub if_unmodified_since: Option<DateTime<Utc>>,
978    /// Request transfer of only the specified range of bytes
980    ///
981    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-range>
982    pub range: Option<GetRange>,
983    /// Request a particular object version
984    pub version: Option<String>,
985    /// Request transfer of no content
986    ///
987    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-head>
988    pub head: bool,
989    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
990    /// that need to pass context-specific information (like tracing spans) via trait methods.
991    ///
992    /// These extensions are ignored entirely by backends offered through this crate.
993    pub extensions: Extensions,
994}
995
996impl GetOptions {
997    /// Returns an error if the modification conditions on this request are not satisfied
998    ///
999    /// <https://datatracker.ietf.org/doc/html/rfc7232#section-6>
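    ///
    /// A minimal sketch of evaluating a conditional request:
    ///
    /// ```
    /// # use chrono::Utc;
    /// # use object_store::{path::Path, GetOptions, ObjectMeta};
    /// let meta = ObjectMeta {
    ///     location: Path::from("data"),
    ///     last_modified: Utc::now(),
    ///     size: 0,
    ///     e_tag: Some("123".to_string()),
    ///     version: None,
    /// };
    ///
    /// // A matching `If-None-Match` means the object has not been modified
    /// let options = GetOptions {
    ///     if_none_match: Some("123".to_string()),
    ///     ..Default::default()
    /// };
    /// assert!(options.check_preconditions(&meta).is_err());
    /// ```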
1000    pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> {
1001        // Using the invalid ETag "*" when the object has none means a missing ETag never matches
1002        let etag = meta.e_tag.as_deref().unwrap_or("*");
1003        let last_modified = meta.last_modified;
1004
1005        if let Some(m) = &self.if_match {
1006            if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) {
1007                return Err(Error::Precondition {
1008                    path: meta.location.to_string(),
1009                    source: format!("{etag} does not match {m}").into(),
1010                });
1011            }
1012        } else if let Some(date) = self.if_unmodified_since {
1013            if last_modified > date {
1014                return Err(Error::Precondition {
1015                    path: meta.location.to_string(),
1016                    source: format!("{date} < {last_modified}").into(),
1017                });
1018            }
1019        }
1020
1021        if let Some(m) = &self.if_none_match {
1022            if m == "*" || m.split(',').map(str::trim).any(|x| x == etag) {
1023                return Err(Error::NotModified {
1024                    path: meta.location.to_string(),
1025                    source: format!("{etag} matches {m}").into(),
1026                });
1027            }
1028        } else if let Some(date) = self.if_modified_since {
1029            if last_modified <= date {
1030                return Err(Error::NotModified {
1031                    path: meta.location.to_string(),
1032                    source: format!("{date} >= {last_modified}").into(),
1033                });
1034            }
1035        }
1036        Ok(())
1037    }
1038}
1039
1040/// Result for a get request
1041#[derive(Debug)]
1042pub struct GetResult {
1043    /// The [`GetResultPayload`]
1044    pub payload: GetResultPayload,
1045    /// The [`ObjectMeta`] for this object
1046    pub meta: ObjectMeta,
1047    /// The range of bytes returned by this request
1048    ///
1049    /// Note this is not `usize` as `object_store` supports 32-bit architectures such as WASM
1050    pub range: Range<u64>,
1051    /// Additional object attributes
1052    pub attributes: Attributes,
1053}
1054
1055/// The kind of a [`GetResult`]
1056///
1057/// This special-cases local files, as some systems may be able to
1058/// optimise handling of a file already present on local disk
1059pub enum GetResultPayload {
1060    /// The file, path
1061    #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1062    File(std::fs::File, std::path::PathBuf),
1063    /// An opaque stream of bytes
1064    Stream(BoxStream<'static, Result<Bytes>>),
1065}
1066
1067impl Debug for GetResultPayload {
1068    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1069        match self {
1070            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1071            Self::File(_, _) => write!(f, "GetResultPayload(File)"),
1072            Self::Stream(_) => write!(f, "GetResultPayload(Stream)"),
1073        }
1074    }
1075}
1076
1077impl GetResult {
1078    /// Collects the data into a [`Bytes`]
1079    pub async fn bytes(self) -> Result<Bytes> {
1080        let len = self.range.end - self.range.start;
1081        match self.payload {
1082            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1083            GetResultPayload::File(mut file, path) => {
1084                maybe_spawn_blocking(move || {
1085                    file.seek(SeekFrom::Start(self.range.start as _))
1086                        .map_err(|source| local::Error::Seek {
1087                            source,
1088                            path: path.clone(),
1089                        })?;
1090
1091                    let mut buffer = if let Ok(len) = len.try_into() {
1092                        Vec::with_capacity(len)
1093                    } else {
1094                        Vec::new()
1095                    };
1096                    file.take(len as _)
1097                        .read_to_end(&mut buffer)
1098                        .map_err(|source| local::Error::UnableToReadBytes { source, path })?;
1099
1100                    Ok(buffer.into())
1101                })
1102                .await
1103            }
1104            GetResultPayload::Stream(s) => collect_bytes(s, Some(len)).await,
1105        }
1106    }
1107
1108    /// Converts this into a byte stream
1109    ///
1110    /// If `self.payload` is [`GetResultPayload::File`], this will perform chunked reads of the file,
1111    /// otherwise will return the [`GetResultPayload::Stream`].
1112    ///
1113    /// # Tokio Compatibility
1114    ///
1115    /// Tokio discourages performing blocking IO on a tokio worker thread, however,
1116    /// no major operating systems have stable async file APIs. Therefore if called from
1117    /// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
1118    /// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
1119    ///
1120    /// If not called from a tokio context, this will perform IO on the current thread with
1121    /// no additional complexity or overheads
1122    pub fn into_stream(self) -> BoxStream<'static, Result<Bytes>> {
1123        match self.payload {
1124            #[cfg(all(feature = "fs", not(target_arch = "wasm32")))]
1125            GetResultPayload::File(file, path) => {
1126                const CHUNK_SIZE: usize = 8 * 1024;
1127                local::chunked_stream(file, path, self.range, CHUNK_SIZE)
1128            }
1129            GetResultPayload::Stream(s) => s,
1130        }
1131    }
1132}
1133
1134/// Configure preconditions for the put operation
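///
/// A minimal sketch of [`PutMode::Create`], using the in-memory store:
///
/// ```
/// # use object_store::{memory::InMemory, path::Path, Error, ObjectStore, PutMode};
/// # async fn example() {
/// let store = InMemory::new();
/// let path = Path::from("data");
///
/// // The first create succeeds, the second fails as the object now exists
/// store.put_opts(&path, "1".into(), PutMode::Create.into()).await.unwrap();
/// let err = store.put_opts(&path, "2".into(), PutMode::Create.into()).await.unwrap_err();
/// assert!(matches!(err, Error::AlreadyExists { .. }));
/// # }
/// ```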
1135#[derive(Debug, Clone, PartialEq, Eq, Default)]
1136pub enum PutMode {
1137    /// Perform an atomic write operation, overwriting any object present at the provided path
1138    #[default]
1139    Overwrite,
1140    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] if an
1141    /// object already exists at the provided path
1142    Create,
1143    /// Perform an atomic write operation if the current version of the object matches the
1144    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
1145    Update(UpdateVersion),
1146}
1147
1148/// Uniquely identifies a version of an object to update
1149///
1150/// Stores will use differing combinations of `e_tag` and `version` to provide conditional
1151/// updates, and it is therefore recommended that applications preserve both.
1152#[derive(Debug, Clone, PartialEq, Eq)]
1153pub struct UpdateVersion {
1154    /// The unique identifier for the newly created object
1155    ///
1156    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
1157    pub e_tag: Option<String>,
1158    /// A version indicator for the newly created object
1159    pub version: Option<String>,
1160}
1161
1162impl From<PutResult> for UpdateVersion {
1163    fn from(value: PutResult) -> Self {
1164        Self {
1165            e_tag: value.e_tag,
1166            version: value.version,
1167        }
1168    }
1169}
1170
1171/// Options for a put request
1172#[derive(Debug, Clone, Default)]
1173pub struct PutOptions {
1174    /// Configure the [`PutMode`] for this operation
1175    pub mode: PutMode,
1176    /// Provide a [`TagSet`] for this object
1177    ///
1178    /// Implementations that don't support object tagging should ignore this
1179    pub tags: TagSet,
1180    /// Provide a set of [`Attributes`]
1181    ///
1182    /// Implementations that don't support an attribute should return an error
1183    pub attributes: Attributes,
1184    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1185    /// that need to pass context-specific information (like tracing spans) via trait methods.
1186    ///
1187    /// These extensions are ignored entirely by backends offered through this crate.
1188    ///
1189    /// They are also excluded from [`PartialEq`] and [`Eq`].
1190    pub extensions: Extensions,
1191}
1192
1193impl PartialEq<Self> for PutOptions {
1194    fn eq(&self, other: &Self) -> bool {
1195        let Self {
1196            mode,
1197            tags,
1198            attributes,
1199            extensions: _,
1200        } = self;
1201        let Self {
1202            mode: other_mode,
1203            tags: other_tags,
1204            attributes: other_attributes,
1205            extensions: _,
1206        } = other;
1207        (mode == other_mode) && (tags == other_tags) && (attributes == other_attributes)
1208    }
1209}
1210
1211impl Eq for PutOptions {}
1212
1213impl From<PutMode> for PutOptions {
1214    fn from(mode: PutMode) -> Self {
1215        Self {
1216            mode,
1217            ..Default::default()
1218        }
1219    }
1220}
1221
1222impl From<TagSet> for PutOptions {
1223    fn from(tags: TagSet) -> Self {
1224        Self {
1225            tags,
1226            ..Default::default()
1227        }
1228    }
1229}
1230
1231impl From<Attributes> for PutOptions {
1232    fn from(attributes: Attributes) -> Self {
1233        Self {
1234            attributes,
1235            ..Default::default()
1236        }
1237    }
1238}
1239
1240// See <https://github.com/apache/arrow-rs-object-store/issues/339>.
1241#[doc(hidden)]
1242#[deprecated(note = "Use PutMultipartOptions", since = "0.12.3")]
1243pub type PutMultipartOpts = PutMultipartOptions;
1244
1245/// Options for [`ObjectStore::put_multipart_opts`]
1246#[derive(Debug, Clone, Default)]
1247pub struct PutMultipartOptions {
1248    /// Provide a [`TagSet`] for this object
1249    ///
1250    /// Implementations that don't support object tagging should ignore this
1251    pub tags: TagSet,
1252    /// Provide a set of [`Attributes`]
1253    ///
1254    /// Implementations that don't support an attribute should return an error
1255    pub attributes: Attributes,
1256    /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
1257    /// that need to pass context-specific information (like tracing spans) via trait methods.
1258    ///
1259    /// These extensions are ignored entirely by backends offered through this crate.
1260    ///
1261    /// They are also excluded from [`PartialEq`] and [`Eq`].
1262    pub extensions: Extensions,
1263}
1264
1265impl PartialEq<Self> for PutMultipartOptions {
1266    fn eq(&self, other: &Self) -> bool {
1267        let Self {
1268            tags,
1269            attributes,
1270            extensions: _,
1271        } = self;
1272        let Self {
1273            tags: other_tags,
1274            attributes: other_attributes,
1275            extensions: _,
1276        } = other;
1277        (tags == other_tags) && (attributes == other_attributes)
1278    }
1279}
1280
1281impl Eq for PutMultipartOptions {}
1282
1283impl From<TagSet> for PutMultipartOptions {
1284    fn from(tags: TagSet) -> Self {
1285        Self {
1286            tags,
1287            ..Default::default()
1288        }
1289    }
1290}
1291
1292impl From<Attributes> for PutMultipartOptions {
1293    fn from(attributes: Attributes) -> Self {
1294        Self {
1295            attributes,
1296            ..Default::default()
1297        }
1298    }
1299}
1300
1301/// Result for a put request
1302#[derive(Debug, Clone, PartialEq, Eq)]
1303pub struct PutResult {
1304    /// The unique identifier for the newly created object
1305    ///
1306    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
1307    pub e_tag: Option<String>,
1308    /// A version indicator for the newly created object
1309    pub version: Option<String>,
1310}
1311
1312/// A specialized `Result` for object store-related errors
1313pub type Result<T, E = Error> = std::result::Result<T, E>;
1314
1315/// A specialized `Error` for object store-related errors
1316#[derive(Debug, thiserror::Error)]
1317#[non_exhaustive]
1318pub enum Error {
1319    /// A fallback error type when no variant matches
1320    #[error("Generic {} error: {}", store, source)]
1321    Generic {
1322        /// The store this error originated from
1323        store: &'static str,
1324        /// The wrapped error
1325        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1326    },
1327
1328    /// Error when the object is not found at given location
1329    #[error("Object at location {} not found: {}", path, source)]
1330    NotFound {
1331        /// The path to the file
1332        path: String,
1333        /// The wrapped error
1334        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1335    },
1336
1337    /// Error for invalid path
1338    #[error("Encountered object with invalid path: {}", source)]
1339    InvalidPath {
1340        /// The wrapped error
1341        #[from]
1342        source: path::Error,
1343    },
1344
1345    /// Error when `tokio::spawn` failed
1346    #[error("Error joining spawned task: {}", source)]
1347    JoinError {
1348        /// The wrapped error
1349        #[from]
1350        source: tokio::task::JoinError,
1351    },
1352
1353    /// Error when the attempted operation is not supported
1354    #[error("Operation not supported: {}", source)]
1355    NotSupported {
1356        /// The wrapped error
1357        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1358    },
1359
1360    /// Error when the object already exists
1361    #[error("Object at location {} already exists: {}", path, source)]
1362    AlreadyExists {
1363        /// The path to the file
1364        path: String,
1365        /// The wrapped error
1366        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1367    },
1368
1369    /// Error when the required conditions failed for the operation
1370    #[error("Request precondition failure for path {}: {}", path, source)]
1371    Precondition {
1372        /// The path to the file
1373        path: String,
1374        /// The wrapped error
1375        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1376    },
1377
1378    /// Error when the object at the location isn't modified
1379    #[error("Object at location {} not modified: {}", path, source)]
1380    NotModified {
1381        /// The path to the file
1382        path: String,
1383        /// The wrapped error
1384        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1385    },
1386
1387    /// Error when an operation is not implemented
1388    #[error("Operation not yet implemented.")]
1389    NotImplemented,
1390
1391    /// Error when the used credentials don't have enough permission
1392    /// to perform the requested operation
1393    #[error(
1394        "The operation lacked the necessary privileges to complete for path {}: {}",
1395        path,
1396        source
1397    )]
1398    PermissionDenied {
1399        /// The path to the file
1400        path: String,
1401        /// The wrapped error
1402        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1403    },
1404
1405    /// Error when the used credentials lack valid authentication
1406    #[error(
1407        "The operation lacked valid authentication credentials for path {}: {}",
1408        path,
1409        source
1410    )]
1411    Unauthenticated {
1412        /// The path to the file
1413        path: String,
1414        /// The wrapped error
1415        source: Box<dyn std::error::Error + Send + Sync + 'static>,
1416    },
1417
1418    /// Error when a configuration key is invalid for the store used
1419    #[error("Configuration key: '{}' is not valid for store '{}'.", key, store)]
1420    UnknownConfigurationKey {
1421        /// The object store used
1422        store: &'static str,
1423        /// The configuration key used
1424        key: String,
1425    },
1426}
1427
1428impl From<Error> for std::io::Error {
1429    fn from(e: Error) -> Self {
1430        let kind = match &e {
1431            Error::NotFound { .. } => std::io::ErrorKind::NotFound,
1432            _ => std::io::ErrorKind::Other,
1433        };
1434        Self::new(kind, e)
1435    }
1436}
1437
1438#[cfg(test)]
1439mod tests {
1440    use super::*;
1441    use crate::buffered::BufWriter;
1442    use chrono::TimeZone;
1443    use tokio::io::AsyncWriteExt;
1444
1445    macro_rules! maybe_skip_integration {
1446        () => {
1447            if std::env::var("TEST_INTEGRATION").is_err() {
1448                eprintln!("Skipping integration test - set TEST_INTEGRATION");
1449                return;
1450            }
1451        };
1452    }
1453    pub(crate) use maybe_skip_integration;

    /// Test that the returned stream does not borrow the lifetime of Path
    fn list_store<'a>(
        store: &'a dyn ObjectStore,
        path_str: &str,
    ) -> BoxStream<'a, Result<ObjectMeta>> {
        let path = Path::from(path_str);
        store.list(Some(&path))
    }

    /// Shared integration test: verify that a presigned GET URL produced by the `Signer`
    /// trait can be fetched over plain HTTP and returns the data previously written
    #[cfg(any(feature = "azure", feature = "aws"))]
    pub(crate) async fn signing<T>(integration: &T)
    where
        T: ObjectStore + signer::Signer,
    {
        use reqwest::Method;
        use std::time::Duration;

        let data = Bytes::from("hello world");
        let path = Path::from("file.txt");
        integration.put(&path, data.clone().into()).await.unwrap();

        let signed = integration
            .signed_url(Method::GET, &path, Duration::from_secs(60))
            .await
            .unwrap();

        let resp = reqwest::get(signed).await.unwrap();
        let loaded = resp.bytes().await.unwrap();

        assert_eq!(data, loaded);
    }

    /// Shared integration test: write objects carrying a tag set via `put_opts`, a multipart
    /// upload, and `BufWriter`, then (if `validate` is set) read the tags back through
    /// `get_tags` and check that they round-trip
    #[cfg(any(feature = "aws", feature = "azure"))]
    pub(crate) async fn tagging<F, Fut>(storage: Arc<dyn ObjectStore>, validate: bool, get_tags: F)
    where
        F: Fn(Path) -> Fut + Send + Sync,
        Fut: std::future::Future<Output = Result<client::HttpResponse>> + Send,
    {
        use bytes::Buf;
        use serde::Deserialize;

        // Minimal structures for deserializing the store's XML tagging response
        #[derive(Deserialize)]
        struct Tagging {
            #[serde(rename = "TagSet")]
            list: TagList,
        }

        #[derive(Debug, Deserialize)]
        struct TagList {
            #[serde(rename = "Tag")]
            tags: Vec<Tag>,
        }

        #[derive(Debug, Deserialize, Eq, PartialEq)]
        #[serde(rename_all = "PascalCase")]
        struct Tag {
            key: String,
            value: String,
        }

        let tags = vec![
            Tag {
                key: "foo.com=bar/s".to_string(),
                value: "bananas/foo.com-_".to_string(),
            },
            Tag {
                key: "namespace/key.foo".to_string(),
                value: "value with a space".to_string(),
            },
        ];
        let mut tag_set = TagSet::default();
        for t in &tags {
            tag_set.push(&t.key, &t.value)
        }

        // Apply the same tag set along three write paths: put_opts, multipart, and BufWriter
        let path = Path::from("tag_test");
        storage
            .put_opts(&path, "test".into(), tag_set.clone().into())
            .await
            .unwrap();

        let multi_path = Path::from("tag_test_multi");
        let mut write = storage
            .put_multipart_opts(&multi_path, tag_set.clone().into())
            .await
            .unwrap();

        write.put_part("foo".into()).await.unwrap();
        write.complete().await.unwrap();

        let buf_path = Path::from("tag_test_buf");
        let mut buf = BufWriter::new(storage, buf_path.clone()).with_tags(tag_set);
        buf.write_all(b"foo").await.unwrap();
        buf.shutdown().await.unwrap();

        // Write should always succeed, but certain configurations may simply ignore tags
        if !validate {
            return;
        }

        for path in [path, multi_path, buf_path] {
            let resp = get_tags(path.clone()).await.unwrap();
            let body = resp.into_body().bytes().await.unwrap();

            let mut resp: Tagging = quick_xml::de::from_reader(body.reader()).unwrap();
            resp.list.tags.sort_by(|a, b| a.key.cmp(&b.key));
            assert_eq!(resp.list.tags, tags);
        }
    }

    #[tokio::test]
    async fn test_list_lifetimes() {
        let store = memory::InMemory::new();
        let mut stream = list_store(&store, "path");
        assert!(stream.next().await.is_none());
    }

    #[test]
    fn test_preconditions() {
        let mut meta = ObjectMeta {
            location: Path::from("test"),
            last_modified: Utc.timestamp_nanos(100),
            size: 100,
            e_tag: Some("123".to_string()),
            version: None,
        };

        // No preconditions: always passes
        let mut options = GetOptions::default();
        options.check_preconditions(&meta).unwrap();

        // If-Modified-Since: passes only if the object was modified after the given time
        options.if_modified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap();

        options.if_modified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap_err();

        options.if_modified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();

        // If-Unmodified-Since: passes only if the object has not been modified after the given time
        options.if_unmodified_since = Some(Utc.timestamp_nanos(50));
        options.check_preconditions(&meta).unwrap_err();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(100));
        options.check_preconditions(&meta).unwrap();

        options.if_unmodified_since = Some(Utc.timestamp_nanos(101));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-Match: passes when the ETag matches one of the listed values, or when it is "*"
        options.if_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("123,354".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354, 123,".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_match = Some("354".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap();

        // If-Match takes precedence
        options.if_unmodified_since = Some(Utc.timestamp_nanos(200));
        options.check_preconditions(&meta).unwrap();

        options = GetOptions::default();

        // If-None-Match: fails when the ETag matches one of the listed values, or when it is "*"
        options.if_none_match = Some("123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("*".to_string());
        options.check_preconditions(&meta).unwrap_err();

        options.if_none_match = Some("1232".to_string());
        options.check_preconditions(&meta).unwrap();

        options.if_none_match = Some("23, 123".to_string());
        options.check_preconditions(&meta).unwrap_err();

        // If-None-Match takes precedence
        options.if_modified_since = Some(Utc.timestamp_nanos(10));
        options.check_preconditions(&meta).unwrap_err();

        // Check missing ETag
        meta.e_tag = None;
        options = GetOptions::default();

        options.if_none_match = Some("*".to_string()); // Fails if any file exists
        options.check_preconditions(&meta).unwrap_err();

        options = GetOptions::default();
        options.if_match = Some("*".to_string()); // Passes if file exists
        options.check_preconditions(&meta).unwrap();
    }

    #[test]
    #[cfg(feature = "http")]
    fn test_reexported_types() {
        // Test HeaderMap
        let mut headers = HeaderMap::new();
        headers.insert("content-type", HeaderValue::from_static("text/plain"));
        assert_eq!(headers.len(), 1);

        // Test HeaderValue
        let value = HeaderValue::from_static("test-value");
        assert_eq!(value.as_bytes(), b"test-value");

        // Test Extensions
        let mut extensions = Extensions::new();
        extensions.insert("test-key");
        assert!(extensions.get::<&str>().is_some());
    }
}