zarrs_object_store/
lib.rs1#![doc = include_str!("../doc/version_compatibility_matrix.md")]
21pub use object_store;
39
40use futures::{stream, StreamExt, TryStreamExt};
41use object_store::path::Path;
42
43use zarrs_storage::{
44 async_store_set_partial_many, byte_range::ByteRangeIterator, AsyncListableStorageTraits,
45 AsyncMaybeBytesIterator, AsyncReadableStorageTraits, AsyncWritableStorageTraits, Bytes,
46 MaybeBytes, OffsetBytesIterator, StorageError, StoreKey, StoreKeys, StoreKeysPrefixes,
47 StorePrefix,
48};
49
50fn key_to_path(key: &StoreKey) -> object_store::path::Path {
52 object_store::path::Path::from(key.as_str())
53}
54
55fn handle_result_notfound<T>(
57 result: Result<T, object_store::Error>,
58) -> Result<Option<T>, StorageError> {
59 match result {
60 Ok(result) => Ok(Some(result)),
61 Err(err) => {
62 if matches!(err, object_store::Error::NotFound { .. }) {
63 Ok(None)
64 } else {
65 Err(StorageError::Other(err.to_string()))
66 }
67 }
68 }
69}
70
71fn handle_result<T>(result: Result<T, object_store::Error>) -> Result<T, StorageError> {
72 result.map_err(|err| StorageError::Other(err.to_string()))
73}
74
75pub struct AsyncObjectStore<T> {
77 object_store: T,
78 }
80
81impl<T: object_store::ObjectStore> AsyncObjectStore<T> {
82 #[must_use]
84 pub fn new(object_store: T) -> Self {
85 Self { object_store }
86 }
87}
88
89#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
90#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
91impl<T: object_store::ObjectStore> AsyncReadableStorageTraits for AsyncObjectStore<T> {
92 async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
93 let get = handle_result_notfound(self.object_store.get(&key_to_path(key)).await)?;
94 if let Some(get) = get {
95 let bytes = handle_result(get.bytes().await)?;
96 Ok(Some(bytes))
97 } else {
98 Ok(None)
99 }
100 }
101
102 async fn get_partial_many<'a>(
103 &'a self,
104 key: &StoreKey,
105 byte_ranges: ByteRangeIterator<'a>,
106 ) -> Result<AsyncMaybeBytesIterator<'a>, StorageError> {
107 let Some(size) = self.size_key(key).await? else {
108 return Ok(None);
109 };
110 let ranges = byte_ranges
111 .map(|byte_range| byte_range.to_range(size))
112 .collect::<Vec<_>>();
113 let get_ranges = handle_result_notfound(
114 self.object_store
115 .get_ranges(&key_to_path(key), &ranges)
116 .await,
117 )?;
118 if let Some(get_ranges) = get_ranges {
119 let result = std::iter::zip(ranges, get_ranges).map(|(range, bytes)| {
120 let range_len = range.end.saturating_sub(range.start);
121 if range_len == bytes.len() as u64 {
122 Ok(bytes)
123 } else {
124 Err(StorageError::Other(format!(
125 "Unexpected length of bytes returned, expected {}, got {}",
126 range_len,
127 bytes.len()
128 )))
129 }
130 });
131 Ok(Some(stream::iter(result).boxed()))
132 } else {
133 Ok(None)
134 }
135 }
136
137 async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
138 Ok(
139 handle_result_notfound(self.object_store.head(&key_to_path(key)).await)?
140 .map(|meta| meta.size),
141 )
142 }
143
144 fn supports_get_partial(&self) -> bool {
145 true
146 }
147}
148
149#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
150#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
151impl<T: object_store::ObjectStore> AsyncWritableStorageTraits for AsyncObjectStore<T> {
152 async fn set(&self, key: &StoreKey, value: Bytes) -> Result<(), StorageError> {
153 handle_result(self.object_store.put(&key_to_path(key), value.into()).await)?;
154 Ok(())
155 }
156
157 async fn set_partial_many<'a>(
158 &'a self,
159 key: &StoreKey,
160 offset_values: OffsetBytesIterator<'a>,
161 ) -> Result<(), StorageError> {
162 async_store_set_partial_many(self, key, offset_values).await
163 }
164
165 async fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
166 handle_result_notfound(self.object_store.delete(&key_to_path(key)).await)?;
167 Ok(())
168 }
169
170 async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<(), StorageError> {
171 let prefix: object_store::path::Path = prefix.as_str().into();
172 let locations = self
173 .object_store
174 .list(Some(&prefix))
175 .map_ok(|m| m.location)
176 .boxed();
177 handle_result(
178 self.object_store
179 .delete_stream(locations)
180 .try_collect::<Vec<Path>>()
181 .await,
182 )?;
183 Ok(())
184 }
185
186 fn supports_set_partial(&self) -> bool {
187 false
188 }
189}
190
191#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
192#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
193impl<T: object_store::ObjectStore> AsyncListableStorageTraits for AsyncObjectStore<T> {
194 async fn list(&self) -> Result<StoreKeys, StorageError> {
195 let mut list = handle_result(
196 self.object_store
197 .list(None)
198 .collect::<Vec<_>>()
199 .await
200 .into_iter()
201 .map(|object_meta| {
202 object_meta.map(|object_meta| {
203 let path: &str = object_meta.location.as_ref();
204 StoreKey::try_from(path).unwrap() })
206 })
207 .collect::<Result<Vec<_>, _>>(),
208 )?;
209 list.sort();
210 Ok(list)
211 }
212
213 async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
214 let path: object_store::path::Path = prefix.as_str().into();
216 let mut list = handle_result(
217 self.object_store
218 .list(Some(&path))
219 .collect::<Vec<_>>()
220 .await
221 .into_iter()
222 .map(|object_meta| {
223 object_meta.map(|object_meta| {
224 let path: &str = object_meta.location.as_ref();
225 StoreKey::try_from(path).unwrap() })
227 })
228 .collect::<Result<Vec<_>, _>>(),
229 )?;
230 list.sort();
231 Ok(list)
232 }
233
234 async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
235 let path: object_store::path::Path = prefix.as_str().into();
236 let list_result = handle_result(self.object_store.list_with_delimiter(Some(&path)).await)?;
237 let mut prefixes = list_result
238 .common_prefixes
239 .iter()
240 .map(|path| {
241 let path: &str = path.as_ref();
242 StorePrefix::new(path.to_string() + "/")
243 })
244 .collect::<Result<Vec<_>, _>>()?;
245 let mut keys = list_result
246 .objects
247 .iter()
248 .map(|object_meta| {
249 let path: &str = object_meta.location.as_ref();
250 StoreKey::try_from(path)
251 })
252 .collect::<Result<Vec<_>, _>>()?;
253 keys.sort();
254 prefixes.sort();
255 Ok(StoreKeysPrefixes::new(keys, prefixes))
256 }
257
258 async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
259 let prefix: object_store::path::Path = prefix.as_str().into();
260 let mut locations = self.object_store.list(Some(&prefix));
261 let mut size = 0;
262 while let Some(item) = locations.next().await {
263 let meta = handle_result(item)?;
264 size += meta.size;
265 }
266 Ok(size)
267 }
268
269 async fn size(&self) -> Result<u64, StorageError> {
270 let mut locations = self.object_store.list(None);
271 let mut size = 0;
272 while let Some(item) = locations.next().await {
273 let meta = handle_result(item)?;
274 size += meta.size;
275 }
276 Ok(size)
277 }
278}