//! [`icechunk`] store support for the [`zarrs`](https://docs.rs/zarrs/latest/zarrs/index.html) crate.
//!
//! Icechunk is a transactional store that enables `git`-like version control of Zarr hierarchies.
//!
//! `zarrs_icechunk` can read data in a range of archival formats (e.g., [`netCDF4`](https://www.unidata.ucar.edu/software/netcdf/) and [`HDF5`](https://www.hdfgroup.org/solutions/hdf5/)) that have been converted to `icechunk`-backed "virtual Zarr datacubes" via [`VirtualiZarr`](https://github.com/zarr-developers/VirtualiZarr) (example below).
//!
//! ## Version Compatibility Matrix
//!
#![doc = include_str!("../doc/version_compatibility_matrix.md")]
//!
//! ## Examples
//! ### Basic Usage and Version Control
//! ```
//! # use std::sync::Arc;
//! # use zarrs_storage::{AsyncWritableStorageTraits, StoreKey};
//! # use tokio::sync::RwLock;
//! # use std::collections::HashMap;
//! use icechunk::{Repository, RepositoryConfig, repository::VersionInfo};
//! use zarrs_icechunk::AsyncIcechunkStore;
//! # tokio_test::block_on(async {
//! // Create an icechunk repository
//! let storage = icechunk::new_in_memory_storage().await?;
//! let config = RepositoryConfig::default();
//! let repo = Repository::create(Some(config), storage, HashMap::new()).await?;
//!
//! // Do some array/metadata manipulation with zarrs, then commit a snapshot
//! let session = repo.writable_session("main").await?;
//! let store = Arc::new(AsyncIcechunkStore::new(session));
//! # let root_json = StoreKey::new("zarr.json").unwrap();
//! # store.set(&root_json, r#"{"zarr_format":3,"node_type":"group"}"#.into()).await?;
//! let snapshot0 = store.session().write().await.commit("Initial commit", None).await?;
//!
//! // Do some more array/metadata manipulation, then commit another snapshot
//! let session = repo.writable_session("main").await?;
//! let store = Arc::new(AsyncIcechunkStore::new(session));
//! # store.set(&root_json, r#"{"zarr_format":3,"node_type":"group","attributes":{"a":"b"}}"#.into()).await?;
//! let snapshot1 = store.session().write().await.commit("Update data", None).await?;
//!
//! // Checkout the first snapshot
//! let session = repo.readonly_session(&VersionInfo::SnapshotId(snapshot0)).await?;
//! let store = Arc::new(AsyncIcechunkStore::new(session));
//! # Ok::<_, Box<dyn std::error::Error>>(())
//! # }).unwrap();
//! ```
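//!
//! Data at a checked-out snapshot can then be read back via the `zarrs_storage` traits (a
//! minimal sketch continuing from the example above; requires
//! `zarrs_storage::AsyncReadableStorageTraits` to be in scope):
//! ```ignore
//! let bytes = store.get(&root_json).await?;
//! assert!(bytes.is_some());
//! ```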
//!
//! ### Virtualise NetCDF as Zarr (via [`VirtualiZarr`](https://github.com/zarr-developers/VirtualiZarr))
//! Decode a virtual Zarr array [`/examples/data/test.icechunk.zarr`]:
//! ```bash
//! cargo run --example virtualizarr_netcdf
//! ```
//! This example references `/examples/data/test[0,1].nc`, which are hosted in this repository and accessed over HTTP.
//! [`/examples/data/test.icechunk.zarr`] was created with [`/examples/virtualizarr_netcdf.py`](https://github.com/zarrs/zarrs_icechunk/blob/main/examples/virtualizarr_netcdf.py).
//!
//! ## Licence
//! `zarrs_icechunk` is licensed under either of
//! - the Apache License, Version 2.0 [LICENSE-APACHE](https://docs.rs/crate/zarrs_icechunk/latest/source/LICENCE-APACHE) or <http://www.apache.org/licenses/LICENSE-2.0> or
//! - the MIT license [LICENSE-MIT](https://docs.rs/crate/zarrs_icechunk/latest/source/LICENCE-MIT) or <http://opensource.org/licenses/MIT>, at your option.
//!
//! [`/examples/data/test.icechunk.zarr`]: https://github.com/zarrs/zarrs_icechunk/tree/main/examples/data/test.icechunk.zarr

use std::sync::Arc;

use futures::{future, stream::FuturesUnordered, StreamExt, TryStreamExt};
pub use icechunk;

use tokio::sync::RwLock;
use zarrs_storage::{
    byte_range::{ByteRange, ByteRangeIterator},
    AsyncListableStorageTraits, AsyncMaybeBytesIterator, AsyncReadableStorageTraits,
    AsyncWritableStorageTraits, Bytes, MaybeBytes, OffsetBytesIterator, StorageError, StoreKey,
    StoreKeys, StoreKeysPrefixes, StorePrefix,
};

fn handle_err(err: icechunk::store::StoreError) -> StorageError {
    StorageError::Other(err.to_string())
}

/// Map an [`icechunk::store::StoreErrorKind::NotFound`] error to `None`; pass through other errors.
fn handle_result_notfound<T>(
    result: Result<T, icechunk::store::StoreError>,
) -> Result<Option<T>, StorageError> {
    match result {
        Ok(result) => Ok(Some(result)),
        Err(err) => {
            if matches!(
                err.kind(),
                &icechunk::store::StoreErrorKind::NotFound { .. }
            ) {
                Ok(None)
            } else {
                Err(StorageError::Other(err.to_string()))
            }
        }
    }
}

fn handle_result<T>(result: Result<T, icechunk::store::StoreError>) -> Result<T, StorageError> {
    result.map_err(handle_err)
}

/// An asynchronous store backed by an [`icechunk::session::Session`].
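///
/// The inner session is held behind an `Arc<RwLock<_>>`, so the store can be shared
/// across tasks and the session can later be retrieved with
/// [`AsyncIcechunkStore::session`] (e.g., to commit a snapshot).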
pub struct AsyncIcechunkStore {
    icechunk_session: Arc<RwLock<icechunk::session::Session>>,
}

impl From<Arc<RwLock<icechunk::session::Session>>> for AsyncIcechunkStore {
    fn from(icechunk_session: Arc<RwLock<icechunk::session::Session>>) -> Self {
        Self { icechunk_session }
    }
}

impl AsyncIcechunkStore {
    async fn store(&self) -> icechunk::Store {
        icechunk::Store::from_session(self.icechunk_session.clone()).await
    }

    /// Create a new [`AsyncIcechunkStore`].
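    ///
    /// A minimal sketch, assuming an existing [`icechunk::Repository`] `repo` with a
    /// `main` branch:
    /// ```ignore
    /// let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
    /// ```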
    #[must_use]
    pub fn new(icechunk_session: icechunk::session::Session) -> Self {
        Self {
            icechunk_session: Arc::new(RwLock::new(icechunk_session)),
        }
    }

    /// Return the inner [`icechunk::session::Session`].
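    ///
    /// The returned session can be used to commit pending changes (a minimal sketch,
    /// mirroring the crate-level example):
    /// ```ignore
    /// let snapshot = store.session().write().await.commit("Initial commit", None).await?;
    /// ```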
    #[must_use]
    pub fn session(&self) -> Arc<RwLock<icechunk::session::Session>> {
        self.icechunk_session.clone()
    }

    // TODO: Wait for async closures
    // /// Run a method on the underlying session.
    // pub async fn with_session<F, T>(&self, f: F) -> icechunk::session::SessionResult<T>
    // where
    //     F: async FnOnce(&icechunk::session::Session) -> icechunk::session::SessionResult<T>,
    // {
    //     let session = self.icechunk_session.read().await;
    //     f(&session).await
    // }

    // /// Run a mutable method on the underlying session.
    // pub async fn with_session_mut<F, T>(&self, f: F) -> icechunk::session::SessionResult<T>
    // where
    //     F: async FnOnce(&mut icechunk::session::Session) -> icechunk::session::SessionResult<T>,
    // {
    //     let mut session = self.icechunk_session.write().await;
    //     f(&mut session).await
    // }
}

#[async_trait::async_trait]
impl AsyncReadableStorageTraits for AsyncIcechunkStore {
    async fn get(&self, key: &StoreKey) -> Result<MaybeBytes, StorageError> {
        handle_result_notfound(
            self.store()
                .await
                .get(key.as_str(), &icechunk::format::ByteRange::ALL)
                .await,
        )
    }

    async fn get_partial_many<'a>(
        &'a self,
        key: &StoreKey,
        byte_ranges: ByteRangeIterator<'a>,
    ) -> Result<AsyncMaybeBytesIterator<'a>, StorageError> {
        // Translate each zarrs byte range into the equivalent icechunk byte range
        let byte_ranges: Vec<_> = byte_ranges
            .map(|byte_range| {
                let key = key.to_string();
                let byte_range = match byte_range {
                    ByteRange::FromStart(offset, None) => {
                        icechunk::format::ByteRange::from_offset(offset)
                    }
                    ByteRange::FromStart(offset, Some(length)) => {
                        icechunk::format::ByteRange::from_offset_with_length(offset, length)
                    }
                    ByteRange::Suffix(length) => icechunk::format::ByteRange::Last(length),
                };
                (key, byte_range)
            })
            .collect();
        let result =
            handle_result_notfound(self.store().await.get_partial_values(byte_ranges).await)?;
        if let Some(result) = result {
            Ok(Some(
                futures::stream::iter(result.into_iter().map(handle_result)).boxed(),
            ))
        } else {
            Ok(None)
        }
    }

    // NOTE: this does not differentiate between not found and empty
    async fn size_key(&self, key: &StoreKey) -> Result<Option<u64>, StorageError> {
        let key = key.to_string();
        handle_result(self.store().await.getsize(&key).await).map(Some)
    }

    fn supports_get_partial(&self) -> bool {
        true
    }
}

#[async_trait::async_trait]
impl AsyncWritableStorageTraits for AsyncIcechunkStore {
    async fn set(&self, key: &StoreKey, value: Bytes) -> Result<(), StorageError> {
        handle_result(self.store().await.set(key.as_str(), value).await)?;
        Ok(())
    }

    async fn set_partial_many<'a>(
        &'a self,
        _key: &StoreKey,
        _offset_values: OffsetBytesIterator<'a>,
    ) -> Result<(), StorageError> {
        // FIXME: Upstream: icechunk::Store does not support partial writes, so this
        // always fails; the capability check is still consulted so its errors propagate.
        self.store()
            .await
            .supports_partial_writes()
            .map_err(handle_err)?;
        Err(StorageError::Unsupported(
            "the store does not support partial writes".to_string(),
        ))
    }

    async fn erase(&self, key: &StoreKey) -> Result<(), StorageError> {
        if self.store().await.supports_deletes().map_err(handle_err)? {
            handle_result_notfound(self.store().await.delete(key.as_str()).await)?;
            Ok(())
        } else {
            Err(StorageError::Unsupported(
                "the store does not support deletion".to_string(),
            ))
        }
    }

    async fn erase_prefix(&self, prefix: &StorePrefix) -> Result<(), StorageError> {
        if self.store().await.supports_deletes().map_err(handle_err)? {
            let keys = self
                .store()
                .await
                .list_prefix(prefix.as_str())
                .await
                .map_err(handle_err)?
                .try_collect::<Vec<_>>() // TODO: do not collect, use try_for_each
                .await
                .map_err(handle_err)?;
            for key in keys {
                self.store().await.delete(&key).await.map_err(handle_err)?;
            }
            Ok(())
        } else {
            Err(StorageError::Unsupported(
                "the store does not support deletion".to_string(),
            ))
        }
    }

    fn supports_set_partial(&self) -> bool {
        false
    }
}

#[async_trait::async_trait]
impl AsyncListableStorageTraits for AsyncIcechunkStore {
    async fn list(&self) -> Result<StoreKeys, StorageError> {
        let keys = self.store().await.list().await.map_err(handle_err)?;
        keys.map(|key| match key {
            Ok(key) => Ok(StoreKey::new(&key)?),
            Err(err) => Err(StorageError::Other(err.to_string())),
        })
        .try_collect::<Vec<_>>()
        .await
    }

    async fn list_prefix(&self, prefix: &StorePrefix) -> Result<StoreKeys, StorageError> {
        let keys = self
            .store()
            .await
            .list_prefix(prefix.as_str())
            .await
            .map_err(handle_err)?;
        keys.map(|key| match key {
            Ok(key) => Ok(StoreKey::new(&key)?),
            Err(err) => Err(StorageError::Other(err.to_string())),
        })
        .try_collect::<Vec<_>>()
        .await
    }

    async fn list_dir(&self, prefix: &StorePrefix) -> Result<StoreKeysPrefixes, StorageError> {
        let keys_prefixes = self
            .store()
            .await
            .list_dir_items(prefix.as_str())
            .await
            .map_err(handle_err)?;
        let mut keys = vec![];
        let mut prefixes = vec![];
        keys_prefixes
            .map_err(handle_err)
            .map(|item| {
                // icechunk returns keys/prefixes relative to `prefix`; re-qualify them
                match item? {
                    icechunk::store::ListDirItem::Key(key) => {
                        keys.push(StoreKey::new(format!("{}{}", prefix.as_str(), &key))?);
                    }
                    icechunk::store::ListDirItem::Prefix(prefix_inner) => {
                        prefixes.push(StorePrefix::new(format!(
                            "{}{}/",
                            prefix.as_str(),
                            &prefix_inner
                        ))?);
                    }
                }
                Ok::<_, StorageError>(())
            })
            .try_for_each(|_| future::ready(Ok(())))
            .await?;

        Ok(StoreKeysPrefixes::new(keys, prefixes))
    }

    async fn size_prefix(&self, prefix: &StorePrefix) -> Result<u64, StorageError> {
        let keys = self.list_prefix(prefix).await?;
        let mut futures: FuturesUnordered<_> = keys
            .into_iter()
            .map(|key| async move {
                let key = key.to_string();
                handle_result(self.store().await.getsize(&key).await)
            })
            .collect();
        let mut sum = 0;
        while let Some(result) = futures.next().await {
            sum += result?;
        }
        Ok(sum)
    }

    async fn size(&self) -> Result<u64, StorageError> {
        self.size_prefix(&StorePrefix::root()).await
    }
}

#[cfg(test)]
mod tests {
    use icechunk::{repository::VersionInfo, Repository, RepositoryConfig};

    use super::*;
    use std::{collections::HashMap, error::Error};

    fn remove_whitespace(s: &str) -> String {
        s.chars().filter(|c| !c.is_whitespace()).collect()
    }

    // NOTE: The icechunk store is not a run-of-the-mill Zarr store that knows nothing about Zarr.
    // It imposes additional requirements on keys/data (e.g., it expects known Zarr metadata and the `c/` chunk prefix).
    // Thus it does not support the current zarrs async store test suite.
    // The test suite could be changed to only create structures that are actually Zarr-specific
    // (standard keys, valid group/array JSON, the `c/` prefix, etc.).
    #[tokio::test]
    #[ignore]
    async fn icechunk() -> Result<(), Box<dyn Error>> {
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;
        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);

        zarrs_storage::store_test::async_store_write(&store).await?;
        zarrs_storage::store_test::async_store_read(&store).await?;
        zarrs_storage::store_test::async_store_list(&store).await?;

        Ok(())
    }

    #[tokio::test]
    async fn icechunk_time_travel() -> Result<(), Box<dyn Error>> {
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;

        let json = r#"{
            "zarr_format": 3,
            "node_type": "group"
        }"#;
        let json: String = remove_whitespace(json);

        let json_updated = r#"{
            "zarr_format": 3,
            "node_type": "group",
            "attributes": {
                "icechunk": "x zarrs"
            }
        }"#;
        let json_updated: String = remove_whitespace(json_updated);

        let root_json = StoreKey::new("zarr.json").unwrap();

        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
        assert_eq!(store.get(&root_json).await?, None);
        store.set(&root_json, json.clone().into()).await?;
        assert_eq!(store.get(&root_json).await?, Some(json.clone().into()));
        let snapshot0 = store
            .session()
            .write()
            .await
            .commit("initial commit", None)
            .await?;

        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);
        store.set(&root_json, json_updated.clone().into()).await?;
        let _snapshot1 = store
            .session()
            .write()
            .await
            .commit("write attributes", None)
            .await?;
        assert_eq!(store.get(&root_json).await?, Some(json_updated.into()));

        let session = repo
            .readonly_session(&VersionInfo::SnapshotId(snapshot0))
            .await?;
        let store = AsyncIcechunkStore::new(session);
        assert_eq!(store.get(&root_json).await?, Some(json.clone().into()));

        Ok(())
    }

    #[tokio::test]
    async fn list_dir_and_list_prefix_nested() -> Result<(), Box<dyn Error>> {
        // Create an icechunk repository with a deeply nested zarr hierarchy
        let storage = icechunk::new_in_memory_storage().await?;
        let config = RepositoryConfig::default();
        let repo = Repository::create(Some(config), storage, HashMap::new()).await?;
        let store = AsyncIcechunkStore::new(repo.writable_session("main").await?);

        let group_json = r#"{"zarr_format":3,"node_type":"group"}"#;
        let array_json = r#"{"zarr_format":3,"node_type":"array","shape":[10, 10],"data_type":"int32","chunk_grid":{"name":"regular","configuration":{"chunk_shape":[5, 5]}},"chunk_key_encoding":{"name":"default","configuration":{"separator":"/"}},"fill_value":0,"codecs":[{"name":"bytes","configuration":{"endian":"little"}}]}"#;

        // Create a deeply nested hierarchy:
        // root/
        //   zarr.json
        //   group0/
        //     zarr.json
        //     group1/
        //       zarr.json
        //       array0/
        //         zarr.json
        //         c/0/0
        //         c/0/1
        //       array1/
        //         zarr.json
        //         c/0/0

        // Create groups
        store
            .set(&StoreKey::new("zarr.json")?, group_json.into())
            .await?;
        store
            .set(&StoreKey::new("group0/zarr.json")?, group_json.into())
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/zarr.json")?,
                group_json.into(),
            )
            .await?;

        // Create arrays
        store
            .set(
                &StoreKey::new("group0/group1/array0/zarr.json")?,
                array_json.into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array0/c/0/0")?,
                vec![0u8; 20].into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array0/c/0/1")?,
                vec![1u8; 20].into(),
            )
            .await?;

        store
            .set(
                &StoreKey::new("group0/group1/array1/zarr.json")?,
                array_json.into(),
            )
            .await?;
        store
            .set(
                &StoreKey::new("group0/group1/array1/c/0/0")?,
                vec![2u8; 20].into(),
            )
            .await?;

        // Commit the data
        store
            .session()
            .write()
            .await
            .commit("Create nested hierarchy", None)
            .await?;

        // Test list_dir at root
        let root_items = store.list_dir(&StorePrefix::root()).await?;
        assert_eq!(root_items.keys().len(), 1); // zarr.json
        assert_eq!(root_items.prefixes().len(), 1); // group0/
        assert!(root_items.keys().contains(&StoreKey::new("zarr.json")?));
        assert!(root_items
            .prefixes()
            .contains(&StorePrefix::new("group0/")?));

        // Test list_dir at group0
        let group0_items = store.list_dir(&StorePrefix::new("group0/")?).await?;
        assert_eq!(group0_items.keys().len(), 1); // zarr.json
        assert_eq!(group0_items.prefixes().len(), 1); // group1/
        assert!(group0_items
            .keys()
            .contains(&StoreKey::new("group0/zarr.json")?));
        assert!(group0_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/")?));

        // Test list_dir at group1
        let group1_items = store.list_dir(&StorePrefix::new("group0/group1/")?).await?;
        assert_eq!(group1_items.keys().len(), 1); // zarr.json
        assert_eq!(group1_items.prefixes().len(), 2); // array0/, array1/
        assert!(group1_items
            .keys()
            .contains(&StoreKey::new("group0/group1/zarr.json")?));
        assert!(group1_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/")?));
        assert!(group1_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array1/")?));

        // Test list_dir inside array0
        let array0_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/")?)
            .await?;
        assert_eq!(array0_items.keys().len(), 1); // zarr.json
        assert_eq!(array0_items.prefixes().len(), 1); // c/
        assert!(array0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/zarr.json")?));
        assert!(array0_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/c/")?));

        // Test list_dir inside array0/c/ directory
        let array0_c_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/c/")?)
            .await?;
        assert_eq!(array0_c_items.keys().len(), 0); // no direct keys
        assert_eq!(array0_c_items.prefixes().len(), 1); // 0/
        assert!(array0_c_items
            .prefixes()
            .contains(&StorePrefix::new("group0/group1/array0/c/0/")?));

        // Test list_dir inside array0/c/0/ directory
        let array0_c0_items = store
            .list_dir(&StorePrefix::new("group0/group1/array0/c/0/")?)
            .await?;
        assert_eq!(array0_c0_items.keys().len(), 2); // 0, 1
        assert_eq!(array0_c0_items.prefixes().len(), 0); // no subdirectories
        assert!(array0_c0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/c/0/0")?));
        assert!(array0_c0_items
            .keys()
            .contains(&StoreKey::new("group0/group1/array0/c/0/1")?));

        // Test list_prefix at root (should get all keys recursively)
        let all_keys = store.list_prefix(&StorePrefix::root()).await?;
        assert_eq!(all_keys.len(), 8); // Total of 8 keys in the hierarchy

        // Test list_prefix at group0
        let group0_keys = store.list_prefix(&StorePrefix::new("group0/")?).await?;
        assert_eq!(group0_keys.len(), 7); // All keys under group0/

        // Test list_prefix at group1
        let group1_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/")?)
            .await?;
        assert_eq!(group1_keys.len(), 6); // zarr.json + 2 arrays with their chunks

        // Test list_prefix for array0 (should get all chunks recursively)
        let array0_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/array0/")?)
            .await?;
        assert_eq!(array0_keys.len(), 3); // zarr.json + 2 chunks
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/zarr.json")?));
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/c/0/0")?));
        assert!(array0_keys.contains(&StoreKey::new("group0/group1/array0/c/0/1")?));

        // Test list_prefix for array1 (should get all chunks recursively)
        let array1_keys = store
            .list_prefix(&StorePrefix::new("group0/group1/array1/")?)
            .await?;
        assert_eq!(array1_keys.len(), 2); // zarr.json + 1 chunk
        assert!(array1_keys.contains(&StoreKey::new("group0/group1/array1/zarr.json")?));
        assert!(array1_keys.contains(&StoreKey::new("group0/group1/array1/c/0/0")?));

        Ok(())
    }
}