#![doc = include_str!("../doc/version_compatibility_matrix.md")]

use std::sync::Arc;

use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt};
pub use icechunk;

use tokio::sync::RwLock;
use zarrs_storage::{
    byte_range::{ByteRange, ByteRangeIterator},
    AsyncListableStorageTraits, AsyncMaybeBytesIterator, AsyncReadableStorageTraits,
    AsyncWritableStorageTraits, Bytes, MaybeBytes, OffsetBytesIterator, StorageError, StoreKey,
    StoreKeys, StoreKeysPrefixes, StorePrefix,
};

/// Convert an [`icechunk::store::StoreError`] into a [`StorageError`].
fn handle_err(err: icechunk::store::StoreError) -> StorageError {
    StorageError::Other(err.to_string())
}

/// Convert an icechunk result into `Ok(None)` on a "not found" error, otherwise pass the
/// value through as `Ok(Some(_))` or map the error to a [`StorageError`].
fn handle_result_notfound<T>(
    result: Result<T, icechunk::store::StoreError>,
) -> Result<Option<T>, StorageError> {
    match result {
        Ok(result) => Ok(Some(result)),
        Err(err) => {
            if matches!(
                err.kind(),
                &icechunk::store::StoreErrorKind::NotFound { .. }
            ) {
                Ok(None)
            } else {
                Err(StorageError::Other(err.to_string()))
            }
        }
    }
}

/// Map an icechunk result's error to a [`StorageError`].
fn handle_result<T>(result: Result<T, icechunk::store::StoreError>) -> Result<T, StorageError> {
    result.map_err(handle_err)
}

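/// An asynchronous [`icechunk`] store.
///
/// Wraps an [`icechunk::session::Session`] and implements the `zarrs_storage` asynchronous
/// readable, writable, and listable storage traits.
///
/// The example below is an illustrative sketch mirroring the tests in this crate; it
/// assumes an in-memory icechunk repository and is not compiled as a doctest.
///
/// ```ignore
/// use std::collections::HashMap;
/// use icechunk::{Repository, RepositoryConfig};
/// use zarrs_storage::{AsyncWritableStorageTraits, StoreKey};
///
/// // Create an in-memory repository and a writable session on the "main" branch.
/// let storage = icechunk::new_in_memory_storage().await?;
/// let repo = Repository::create(Some(RepositoryConfig::default()), storage, HashMap::new()).await?;
/// let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
///
/// // Use the store through the `zarrs_storage` async traits, then commit the session.
/// store
///     .set(
///         &StoreKey::new("zarr.json")?,
///         r#"{"zarr_format":3,"node_type":"group"}"#.into(),
///     )
///     .await?;
/// store.session().write().await.commit("initial commit", None).await?;
/// ```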
pub struct AsyncIcechunkStore {
    icechunk_session: Arc<RwLock<icechunk::session::Session>>,
}

impl From<Arc<RwLock<icechunk::session::Session>>> for AsyncIcechunkStore {
    fn from(icechunk_session: Arc<RwLock<icechunk::session::Session>>) -> Self {
        Self { icechunk_session }
    }
}

impl AsyncIcechunkStore {
    async fn store(&self) -> icechunk::Store {
        icechunk::Store::from_session(self.icechunk_session.clone()).await
    }

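    /// Create a new [`AsyncIcechunkStore`] from an [`icechunk::session::Session`].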
    #[must_use]
    pub fn new(icechunk_session: icechunk::session::Session) -> Self {
        Self {
            icechunk_session: Arc::new(RwLock::new(icechunk_session)),
        }
    }

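    /// Return the underlying [`icechunk::session::Session`].
    ///
    /// This can be used to commit changes made through the store, e.g.
    /// `store.session().write().await.commit("message", None).await`.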
    #[must_use]
    pub fn session(&self) -> Arc<RwLock<icechunk::session::Session>> {
        self.icechunk_session.clone()
    }
}

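/// `get` and `get_partial_many` return `Ok(None)` if the key does not exist.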
#[async_trait::async_trait]
impl AsyncReadableStorageTraits for AsyncIcechunkStore {
    async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
        handle_result_notfound(
            self.store()
                .await
                .get(key.as_str(), &icechunk::format::ByteRange::ALL)
                .await,
        )
    }

    async fn get_partial_many<'a>(
        &'a self,
        key: &StoreKey,
        byte_ranges: ByteRangeIterator<'a>,
    ) -> Result<AsyncMaybeBytesIterator<'a>, StorageError> {
        let byte_ranges: Vec<_> = byte_ranges
            .map(|byte_range| {
                let key = key.to_string();
                let byte_range = match byte_range {
                    ByteRange::FromStart(offset, None) => {
                        icechunk::format::ByteRange::from_offset(offset)
                    }
                    ByteRange::FromStart(offset, Some(length)) => {
                        icechunk::format::ByteRange::from_offset_with_length(offset, length)
                    }
                    ByteRange::Suffix(length) => icechunk::format::ByteRange::Last(length),
                };
                (key, byte_range)
            })
            .collect();
        let result =
            handle_result_notfound(self.store().await.get_partial_values(byte_ranges).await)?;
        if let Some(result) = result {
            Ok(Some(
                futures::stream::iter(result.into_iter().map(handle_result)).boxed(),
            ))
        } else {
            Ok(None)
        }
    }

    async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
        let key = key.to_string();
        handle_result(self.store().await.getsize(&key).await).map(Some)
    }

    fn supports_get_partial(&self) -> bool {
        true
    }
}

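/// `set` is supported. Partial writes are not (`supports_set_partial` is `false`).
/// `erase` and `erase_prefix` return [`StorageError::Unsupported`] if the underlying
/// icechunk store does not support deletes.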
#[async_trait::async_trait]
impl AsyncWritableStorageTraits for AsyncIcechunkStore {
    async fn set(&self, key: &StoreKey, value: Bytes) -> Result<(), StorageError> {
        handle_result(self.store().await.set(key.as_str(), value).await)?;
        Ok(())
    }

    async fn set_partial_many<'a>(
        &'a self,
        _key: &StoreKey,
        _offset_values: OffsetBytesIterator<'a>,
    ) -> Result<(), StorageError> {
        if self
            .store()
            .await
            .supports_partial_writes()
            .map_err(handle_err)?
        {
            // The underlying icechunk store reports partial write support, but partial
            // writes are not implemented by this adapter.
            Err(StorageError::Unsupported(
                "partial writes are not supported by this store".to_string(),
            ))
        } else {
            Err(StorageError::Unsupported(
                "the store does not support partial writes".to_string(),
            ))
        }
    }

    async fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
        if self.store().await.supports_deletes().map_err(handle_err)? {
            handle_result_notfound(self.store().await.delete(key.as_str()).await)?;
            Ok(())
        } else {
            Err(StorageError::Unsupported(
                "the store does not support deletion".to_string(),
            ))
        }
    }

    async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<(), StorageError> {
        if self.store().await.supports_deletes().map_err(handle_err)? {
            let keys = self
                .store()
                .await
                .list_prefix(prefix.as_str())
                .await
                .map_err(handle_err)?
                .try_collect::<Vec<_>>()
                .await
                .map_err(handle_err)?;
            for key in keys {
                self.store().await.delete(&key).await.map_err(handle_err)?;
            }
            Ok(())
        } else {
            Err(StorageError::Unsupported(
                "the store does not support deletion".to_string(),
            ))
        }
    }

    fn supports_set_partial(&self) -> bool {
        false
    }
}

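/// Listing delegates to the underlying icechunk store; `list_dir` prepends the queried
/// prefix to the keys and prefixes returned by icechunk's `list_dir_items`.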
#[async_trait::async_trait]
impl AsyncListableStorageTraits for AsyncIcechunkStore {
    async fn list(&self) -> Result<StoreKeys, StorageError> {
        let keys = self.store().await.list().await.map_err(handle_err)?;
        keys.map(|key| match key {
            Ok(key) => Ok(StoreKey::new(&key)?),
            Err(err) => Err(StorageError::Other(err.to_string())),
        })
        .try_collect::<Vec<_>>()
        .await
    }

    async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
        let keys = self
            .store()
            .await
            .list_prefix(prefix.as_str())
            .await
            .map_err(handle_err)?;
        keys.map(|key| match key {
            Ok(key) => Ok(StoreKey::new(&key)?),
            Err(err) => Err(StorageError::Other(err.to_string())),
        })
        .try_collect::<Vec<_>>()
        .await
    }

    async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
        let keys_prefixes = self
            .store()
            .await
            .list_dir_items(prefix.as_str())
            .await
            .map_err(handle_err)?;
        let mut keys = vec![];
        let mut prefixes = vec![];
        keys_prefixes
            .map_err(handle_err)
            .map(|item| {
                match item? {
                    icechunk::store::ListDirItem::Key(key) => {
                        keys.push(StoreKey::new(format!("{}{}", prefix.as_str(), &key))?);
                    }
                    icechunk::store::ListDirItem::Prefix(prefix_inner) => {
                        prefixes.push(StorePrefix::new(format!(
                            "{}{}/",
                            prefix.as_str(),
                            &prefix_inner
                        ))?);
                    }
                }
                Ok::<_, StorageError>(())
            })
            .try_for_each(|_| future::ready(Ok(())))
            .await?;

        Ok(StoreKeysPrefixes::new(keys, prefixes))
    }

    async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
        let keys = self.list_prefix(prefix).await?;
        let mut futures: FuturesUnordered<_> = keys
            .into_iter()
            .map(|key| async move {
                let key = key.to_string();
                handle_result(self.store().await.getsize(&key).await)
            })
            .collect();
        let mut sum = 0;
        while let Some(result) = futures.next().await {
            sum += result?;
        }
        Ok(sum)
    }

    async fn size(&self) -> Result<u64, StorageError> {
        self.size_prefix(&StorePrefix::root()).await
    }
}

#[cfg(test)]
mod tests {
    use icechunk::{repository::VersionInfo, Repository, RepositoryConfig};

    use super::*;
    use std::{collections::HashMap, error::Error};

    fn remove_whitespace(s: &str) -> String {
        s.chars().filter(|c| !c.is_whitespace()).collect()
    }

    #[tokio::test]
    #[ignore]
    async fn icechunk() -> Result<(), Box<dyn Error>> {
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;
        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);

        zarrs_storage::store_test::async_store_write(&store).await?;
        zarrs_storage::store_test::async_store_read(&store).await?;
        zarrs_storage::store_test::async_store_list(&store).await?;

        Ok(())
    }

    #[tokio::test]
    async fn icechunk_time_travel() -> Result<(), Box<dyn Error>> {
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;

        let json = r#"{
            "zarr_format": 3,
            "node_type": "group"
        }"#;
        let json: String = remove_whitespace(json);

        let json_updated = r#"{
            "zarr_format": 3,
            "node_type": "group",
            "attributes": {
                "icechunk": "x zarrs"
            }
        }"#;
        let json_updated: String = remove_whitespace(json_updated);

        let root_json = StoreKey::new("zarr.json").unwrap();

        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
        assert_eq!(store.get(&root_json).await?, None);
        store.set(&root_json, json.clone().into()).await?;
        assert_eq!(store.get(&root_json).await?, Some(json.clone().into()));
        let snapshot0 = store
            .session()
            .write()
            .await
            .commit("initial commit", None)
            .await?;

        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
        store.set(&root_json, json_updated.clone().into()).await?;
        let _snapshot1 = store
            .session()
            .write()
            .await
            .commit("write attributes", None)
            .await?;
        assert_eq!(store.get(&root_json).await?, Some(json_updated.into()));

        let session = repo
            .readonly_session(&VersionInfo::SnapshotId(snapshot0))
            .await?;
        let store = AsyncIcechunkStore::new(session);
        assert_eq!(store.get(&root_json).await?, Some(json.clone().into()));

        Ok(())
    }

    #[tokio::test]
    async fn list_dir_and_list_prefix_nested() -> Result<(), Box<dyn Error>> {
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;
        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);

        let group_json = r#"{"zarr_format":3,"node_type":"group"}"#;
        let array_json = r#"{"zarr_format":3,"node_type":"array","shape":[10, 10],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[5, 5]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"bytes","configuration":{"endian":"little"}}]}"#;

        store
            .set(&StoreKey::new("zarr.json")?, group_json.into())
            .await?;
        store
            .set(&StoreKey::new("group0/zarr.json")?, group_json.into())
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/zarr.json")?,
                group_json.into(),
            )
            .await?;

        store
            .set(
                &StoreKey::new("group0/group1/array0/zarr.json")?,
                array_json.into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array0/c/0/0")?,
                vec![0u8; 20].into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array0/c/0/1")?,
                vec![1u8; 20].into(),
            )
            .await?;

        store
            .set(
                &StoreKey::new("group0/group1/array1/zarr.json")?,
                array_json.into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array1/c/0/0")?,
                vec![2u8; 20].into(),
            )
            .await?;

        store
            .session()
            .write()
            .await
            .commit("Create nested hierarchy", None)
            .await?;

        let root_items = store.list_dir(&StorePrefix::root()).await?;
        assert_eq!(root_items.keys().len(), 1);
        assert_eq!(root_items.prefixes().len(), 1);
        assert!(root_items.keys().contains(&StoreKey::new("zarr.json")?));
        assert!(root_items
            .prefixes()
            .contains(&StorePrefix::new("group0/")?));

        let group0_items = store.list_dir(&StorePrefix::new("group0/")?).await?;
        assert_eq!(group0_items.keys().len(), 1);
        assert_eq!(group0_items.prefixes().len(), 1);
        assert!(group0_items
            .keys()
            .contains(&StoreKey::new("group0/zarr.json")?));
        assert!(group0_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/")?));

        let group1_items = store.list_dir(&StorePrefix::new("group0/group1/")?).await?;
        assert_eq!(group1_items.keys().len(), 1);
        assert_eq!(group1_items.prefixes().len(), 2);
        assert!(group1_items
            .keys()
            .contains(&StoreKey::new("group0/group1/zarr.json")?));
        assert!(group1_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/")?));
        assert!(group1_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array1/")?));

        let array0_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/")?)
            .await?;
        assert_eq!(array0_items.keys().len(), 1);
        assert_eq!(array0_items.prefixes().len(), 1);
        assert!(array0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/zarr.json")?));
        assert!(array0_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/c/")?));

        let array0_c_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/c/")?)
            .await?;
        assert_eq!(array0_c_items.keys().len(), 0);
        assert_eq!(array0_c_items.prefixes().len(), 1);
        assert!(array0_c_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/c/0/")?));

        let array0_c0_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/c/0/")?)
            .await?;
        assert_eq!(array0_c0_items.keys().len(), 2);
        assert_eq!(array0_c0_items.prefixes().len(), 0);
        assert!(array0_c0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/c/0/0")?));
        assert!(array0_c0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/c/0/1")?));

        let all_keys = store.list_prefix(&StorePrefix::root()).await?;
        assert_eq!(all_keys.len(), 8);

        let group0_keys = store.list_prefix(&StorePrefix::new("group0/")?).await?;
        assert_eq!(group0_keys.len(), 7);

        let group1_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/")?)
            .await?;
        assert_eq!(group1_keys.len(), 6);

        let array0_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/array0/")?)
            .await?;
        assert_eq!(array0_keys.len(), 3);
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/zarr.json")?));
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/c/0/0")?));
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/c/0/1")?));

        let array1_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/array1/")?)
            .await?;
        assert_eq!(array1_keys.len(), 2);
        assert!(array1_keys.contains(&StoreKey::new("group0/group1/array1/zarr.json")?));
        assert!(array1_keys.contains(&StoreKey::new("group0/group1/array1/c/0/0")?));

        Ok(())
    }
}