Skip to main content

vortex_io/compat/
obj_store.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::ops::Range;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::CopyOptions;
12use object_store::GetOptions;
13use object_store::GetResult;
14use object_store::GetResultPayload;
15use object_store::ListResult;
16use object_store::MultipartUpload;
17use object_store::ObjectMeta;
18use object_store::ObjectStore;
19use object_store::PutMultipartOptions;
20use object_store::PutOptions;
21use object_store::PutPayload;
22use object_store::PutResult;
23use object_store::RenameOptions;
24use object_store::Result;
25use object_store::UploadPart;
26use object_store::path::Path;
27use smol::future::FutureExt;
28use smol::stream::StreamExt;
29
30use crate::compat::Compat;
31
32impl<T: ObjectStore> Display for Compat<T> {
33    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34        write!(f, "Compat<{}>", self.inner())
35    }
36}
37
38#[async_trait]
39impl<T: ObjectStore> ObjectStore for Compat<T> {
40    async fn put_opts(
41        &self,
42        location: &Path,
43        payload: PutPayload,
44        opts: PutOptions,
45    ) -> Result<PutResult> {
46        Compat::new(self.inner().put_opts(location, payload, opts)).await
47    }
48
49    async fn put_multipart_opts(
50        &self,
51        location: &Path,
52        opts: PutMultipartOptions,
53    ) -> Result<Box<dyn MultipartUpload>> {
54        Ok(Box::new(Compat::new(
55            Compat::new(self.inner().put_multipart_opts(location, opts)).await?,
56        )))
57    }
58
59    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
60        let mut result = Compat::new(self.inner().get_opts(location, options)).await?;
61        result.payload = match result.payload {
62            GetResultPayload::Stream(stream) => {
63                GetResultPayload::Stream(Compat::new(stream).boxed())
64            }
65            #[cfg(not(target_arch = "wasm32"))]
66            GetResultPayload::File(file, path) => GetResultPayload::File(file, path),
67        };
68        Ok(result)
69    }
70
71    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
72        Compat::new(self.inner().get_ranges(location, ranges)).await
73    }
74
75    fn delete_stream(
76        &self,
77        locations: BoxStream<'static, Result<Path>>,
78    ) -> BoxStream<'static, Result<Path>> {
79        Compat::new(self.inner().delete_stream(locations)).boxed()
80    }
81
82    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
83        Compat::new(self.inner().list(prefix)).boxed()
84    }
85
86    fn list_with_offset(
87        &self,
88        prefix: Option<&Path>,
89        offset: &Path,
90    ) -> BoxStream<'static, Result<ObjectMeta>> {
91        Compat::new(self.inner().list_with_offset(prefix, offset)).boxed()
92    }
93
94    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
95        Compat::new(self.inner().list_with_delimiter(prefix)).await
96    }
97
98    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
99        Compat::new(self.inner().copy_opts(from, to, options)).await
100    }
101
102    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
103        Compat::new(self.inner().rename_opts(from, to, options)).await
104    }
105}
106
107#[async_trait]
108impl<T: MultipartUpload> MultipartUpload for Compat<T> {
109    fn put_part(&mut self, data: PutPayload) -> UploadPart {
110        Compat::new(self.inner_mut().put_part(data)).boxed()
111    }
112
113    async fn complete(&mut self) -> Result<PutResult> {
114        Compat::new(self.inner_mut().complete()).await
115    }
116
117    async fn abort(&mut self) -> Result<()> {
118        Compat::new(self.inner_mut().abort()).await
119    }
120}