Skip to main content

vortex_io/compat/
obj_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::ops::Range;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::CopyOptions;
12use object_store::GetOptions;
13use object_store::GetResult;
14use object_store::ListResult;
15use object_store::MultipartUpload;
16use object_store::ObjectMeta;
17use object_store::ObjectStore;
18use object_store::PutMultipartOptions;
19use object_store::PutOptions;
20use object_store::PutPayload;
21use object_store::PutResult;
22use object_store::RenameOptions;
23use object_store::Result;
24use object_store::UploadPart;
25use object_store::path::Path;
26use smol::future::FutureExt;
27use smol::stream::StreamExt;
28
29use crate::compat::Compat;
30
31impl<T: ObjectStore> Display for Compat<T> {
32    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33        write!(f, "Compat<{}>", self.inner())
34    }
35}
36
37#[async_trait]
38impl<T: ObjectStore> ObjectStore for Compat<T> {
39    async fn put_opts(
40        &self,
41        location: &Path,
42        payload: PutPayload,
43        opts: PutOptions,
44    ) -> Result<PutResult> {
45        Compat::new(self.inner().put_opts(location, payload, opts)).await
46    }
47
48    async fn put_multipart_opts(
49        &self,
50        location: &Path,
51        opts: PutMultipartOptions,
52    ) -> Result<Box<dyn MultipartUpload>> {
53        Ok(Box::new(Compat::new(
54            Compat::new(self.inner().put_multipart_opts(location, opts)).await?,
55        )))
56    }
57
58    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
59        Compat::new(self.inner().get_opts(location, options)).await
60    }
61
62    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
63        Compat::new(self.inner().get_ranges(location, ranges)).await
64    }
65
66    fn delete_stream(
67        &self,
68        locations: BoxStream<'static, Result<Path>>,
69    ) -> BoxStream<'static, Result<Path>> {
70        Compat::new(self.inner().delete_stream(locations)).boxed()
71    }
72
73    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
74        Compat::new(self.inner().list(prefix)).boxed()
75    }
76
77    fn list_with_offset(
78        &self,
79        prefix: Option<&Path>,
80        offset: &Path,
81    ) -> BoxStream<'static, Result<ObjectMeta>> {
82        Compat::new(self.inner().list_with_offset(prefix, offset)).boxed()
83    }
84
85    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
86        Compat::new(self.inner().list_with_delimiter(prefix)).await
87    }
88
89    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
90        Compat::new(self.inner().copy_opts(from, to, options)).await
91    }
92
93    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
94        Compat::new(self.inner().rename_opts(from, to, options)).await
95    }
96}
97
98#[async_trait]
99impl<T: MultipartUpload> MultipartUpload for Compat<T> {
100    fn put_part(&mut self, data: PutPayload) -> UploadPart {
101        Compat::new(self.inner_mut().put_part(data)).boxed()
102    }
103
104    async fn complete(&mut self) -> Result<PutResult> {
105        Compat::new(self.inner_mut().complete()).await
106    }
107
108    async fn abort(&mut self) -> Result<()> {
109        Compat::new(self.inner_mut().abort()).await
110    }
111}