vortex_io/compat/
obj_store.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::ops::Range;
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use futures::stream::BoxStream;
11use object_store::CopyOptions;
12use object_store::GetOptions;
13use object_store::GetResult;
14use object_store::GetResultPayload;
15use object_store::ListResult;
16use object_store::MultipartUpload;
17use object_store::ObjectMeta;
18use object_store::ObjectStore;
19use object_store::PutMultipartOptions;
20use object_store::PutOptions;
21use object_store::PutPayload;
22use object_store::PutResult;
23use object_store::RenameOptions;
24use object_store::Result;
25use object_store::UploadPart;
26use object_store::path::Path;
27use smol::future::FutureExt;
28use smol::stream::StreamExt;
29
30use crate::compat::Compat;
31
32impl<T: ObjectStore> Display for Compat<T> {
33 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
34 write!(f, "Compat<{}>", self.inner())
35 }
36}
37
38#[async_trait]
39impl<T: ObjectStore> ObjectStore for Compat<T> {
40 async fn put_opts(
41 &self,
42 location: &Path,
43 payload: PutPayload,
44 opts: PutOptions,
45 ) -> Result<PutResult> {
46 Compat::new(self.inner().put_opts(location, payload, opts)).await
47 }
48
49 async fn put_multipart_opts(
50 &self,
51 location: &Path,
52 opts: PutMultipartOptions,
53 ) -> Result<Box<dyn MultipartUpload>> {
54 Ok(Box::new(Compat::new(
55 Compat::new(self.inner().put_multipart_opts(location, opts)).await?,
56 )))
57 }
58
59 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
60 let mut result = Compat::new(self.inner().get_opts(location, options)).await?;
61 result.payload = match result.payload {
62 GetResultPayload::Stream(stream) => {
63 GetResultPayload::Stream(Compat::new(stream).boxed())
64 }
65 #[cfg(not(target_arch = "wasm32"))]
66 GetResultPayload::File(file, path) => GetResultPayload::File(file, path),
67 };
68 Ok(result)
69 }
70
71 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
72 Compat::new(self.inner().get_ranges(location, ranges)).await
73 }
74
75 fn delete_stream(
76 &self,
77 locations: BoxStream<'static, Result<Path>>,
78 ) -> BoxStream<'static, Result<Path>> {
79 Compat::new(self.inner().delete_stream(locations)).boxed()
80 }
81
82 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
83 Compat::new(self.inner().list(prefix)).boxed()
84 }
85
86 fn list_with_offset(
87 &self,
88 prefix: Option<&Path>,
89 offset: &Path,
90 ) -> BoxStream<'static, Result<ObjectMeta>> {
91 Compat::new(self.inner().list_with_offset(prefix, offset)).boxed()
92 }
93
94 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
95 Compat::new(self.inner().list_with_delimiter(prefix)).await
96 }
97
98 async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
99 Compat::new(self.inner().copy_opts(from, to, options)).await
100 }
101
102 async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
103 Compat::new(self.inner().rename_opts(from, to, options)).await
104 }
105}
106
107#[async_trait]
108impl<T: MultipartUpload> MultipartUpload for Compat<T> {
109 fn put_part(&mut self, data: PutPayload) -> UploadPart {
110 Compat::new(self.inner_mut().put_part(data)).boxed()
111 }
112
113 async fn complete(&mut self) -> Result<PutResult> {
114 Compat::new(self.inner_mut().complete()).await
115 }
116
117 async fn abort(&mut self) -> Result<()> {
118 Compat::new(self.inner_mut().abort()).await
119 }
120}