vortex_io/compat/
obj_store.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::ops::Range;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::CopyOptions;
12use object_store::GetOptions;
13use object_store::GetResult;
14use object_store::ListResult;
15use object_store::MultipartUpload;
16use object_store::ObjectMeta;
17use object_store::ObjectStore;
18use object_store::PutMultipartOptions;
19use object_store::PutOptions;
20use object_store::PutPayload;
21use object_store::PutResult;
22use object_store::RenameOptions;
23use object_store::Result;
24use object_store::UploadPart;
25use object_store::path::Path;
26use smol::future::FutureExt;
27use smol::stream::StreamExt;
28
29use crate::compat::Compat;
30
31impl<T: ObjectStore> Display for Compat<T> {
32 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
33 write!(f, "Compat<{}>", self.inner())
34 }
35}
36
37#[async_trait]
38impl<T: ObjectStore> ObjectStore for Compat<T> {
39 async fn put_opts(
40 &self,
41 location: &Path,
42 payload: PutPayload,
43 opts: PutOptions,
44 ) -> Result<PutResult> {
45 Compat::new(self.inner().put_opts(location, payload, opts)).await
46 }
47
48 async fn put_multipart_opts(
49 &self,
50 location: &Path,
51 opts: PutMultipartOptions,
52 ) -> Result<Box<dyn MultipartUpload>> {
53 Ok(Box::new(Compat::new(
54 Compat::new(self.inner().put_multipart_opts(location, opts)).await?,
55 )))
56 }
57
58 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
59 Compat::new(self.inner().get_opts(location, options)).await
60 }
61
62 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
63 Compat::new(self.inner().get_ranges(location, ranges)).await
64 }
65
66 fn delete_stream(
67 &self,
68 locations: BoxStream<'static, Result<Path>>,
69 ) -> BoxStream<'static, Result<Path>> {
70 Compat::new(self.inner().delete_stream(locations)).boxed()
71 }
72
73 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
74 Compat::new(self.inner().list(prefix)).boxed()
75 }
76
77 fn list_with_offset(
78 &self,
79 prefix: Option<&Path>,
80 offset: &Path,
81 ) -> BoxStream<'static, Result<ObjectMeta>> {
82 Compat::new(self.inner().list_with_offset(prefix, offset)).boxed()
83 }
84
85 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
86 Compat::new(self.inner().list_with_delimiter(prefix)).await
87 }
88
89 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
90 Compat::new(self.inner().copy_opts(from, to, options)).await
91 }
92
93 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
94 Compat::new(self.inner().rename_opts(from, to, options)).await
95 }
96}
97
98#[async_trait]
99impl<T: MultipartUpload> MultipartUpload for Compat<T> {
100 fn put_part(&mut self, data: PutPayload) -> UploadPart {
101 Compat::new(self.inner_mut().put_part(data)).boxed()
102 }
103
104 async fn complete(&mut self) -> Result<PutResult> {
105 Compat::new(self.inner_mut().complete()).await
106 }
107
108 async fn abort(&mut self) -> Result<()> {
109 Compat::new(self.inner_mut().abort()).await
110 }
111}