icechunk 2.0.4

Transactional storage engine for Zarr designed for use on cloud object storage
Documentation
#![allow(clippy::panic, clippy::unwrap_used, clippy::expect_used)]
use std::{collections::HashMap, sync::Arc};

use bytes::Bytes;
use icechunk::{
    Repository, Storage,
    format::{ChunkIndices, Path, manifest::ChunkPayload, snapshot::ArrayShape},
    repository::VersionInfo,
    session::{Session, SessionError},
    storage::new_in_memory_storage,
};
use itertools::Itertools as _;

/// Walks through the basic Icechunk repository workflow against in-memory
/// storage and prints a Markdown transcript of each step.
///
/// Each step runs the real API call and prints a fenced snippet of that same
/// call. The snippets are plain text, so they must be kept in sync with the
/// executed code by hand.
///
/// # Errors
///
/// Returns the first storage, repository, session, or commit error encountered.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("# The `Repository` abstraction in Icechunk 2");
    println!("## First create the `Repository`");
    println!(
        r#"
```
let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
let repo = Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
    .await?;
let mut ds = repo.writable_session("main").await?;
```
"#,
    );

    let storage: Arc<dyn Storage + Send + Sync> = new_in_memory_storage().await?;
    let repo = Repository::create(None, Arc::clone(&storage), HashMap::new(), None, true)
        .await?;
    let mut ds = repo.writable_session("main").await?;

    println!();
    println!();
    println!(
        r#"## Adding 3 groups

```
let user_data = Bytes::new();
ds.add_group(Path::root(), user_data.clone()).await?;
ds.add_group("/group1".try_into().unwrap(), user_data.clone()).await?;
ds.add_group("/group2".try_into().unwrap(), user_data.clone()).await?;
```
"#,
    );

    let user_data = Bytes::new();
    ds.add_group(Path::root(), user_data.clone()).await?;
    ds.add_group("/group1".try_into().unwrap(), user_data.clone()).await?;
    ds.add_group("/group2".try_into().unwrap(), user_data.clone()).await?;

    println!();
    print_nodes(&ds).await?;

    println!(
        r#"
```
let nodes = ds.list_nodes(&Path::root()).await?;
```
"#,
    );

    println!();
    println!("## Adding an array");
    println!(
        r#"
```
let shape = ArrayShape::new(vec![(3, 1)]).unwrap();
let dimension_names = Some(vec!["x".into()]);
let array1_path: Path = "/group1/array1".try_into().unwrap();
ds.add_array(array1_path.clone(), shape, dimension_names, user_data.clone()).await?;
```
"#,
    );

    let shape = ArrayShape::new(vec![(3, 1)]).unwrap();
    let dimension_names = Some(vec!["x".into()]);
    let array1_path: Path = "/group1/array1".try_into().unwrap();
    ds.add_array(array1_path.clone(), shape, dimension_names, user_data.clone()).await?;
    println!();
    print_nodes(&ds).await?;

    println!("## Committing");
    let v1_id = ds.commit("some message").max_concurrent_nodes(8).execute().await?;
    println!(
        r#"
```
ds.commit("some message").max_concurrent_nodes(8).execute().await?;
=> {v1_id:?}
```
"#
    );

    println!("\nNow we continue to use the same repository instance");
    println!();
    println!();
    println!("## Adding an inline chunk");
    let mut ds = repo.writable_session("main").await?;
    ds.set_chunk_ref(
        array1_path.clone(),
        ChunkIndices(vec![0]),
        Some(ChunkPayload::Inline("hello".into())),
    )
    .await?;
    println!(
        r#"
```
let mut ds = repo.writable_session("main").await?;
ds.set_chunk_ref(
    array1_path.clone(),
    ChunkIndices(vec![0]),
    Some(ChunkPayload::Inline("hello".into())),
)
.await?;
```
"#,
    );

    let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![0])).await?;
    println!("## Reading the chunk");
    println!(
        r#"
```
let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![0])).await?;
=> {chunk:?}
```
"#
    );

    println!();
    println!("## Committing");
    let v2_id = ds.commit("a message").max_concurrent_nodes(8).execute().await?;
    println!(
        r#"
```
ds.commit("a message").max_concurrent_nodes(8).execute().await?;
=> {v2_id:?}
```
"#
    );

    println!("## Creating a new writable session @ latest version");
    let mut ds = repo.writable_session("main").await?;
    println!(
        r#"
```
let mut ds = repo.writable_session("main").await?;
```
"#
    );

    print_nodes(&ds).await?;

    println!("## Adding a new inline chunk");
    ds.set_chunk_ref(
        array1_path.clone(),
        ChunkIndices(vec![1]),
        Some(ChunkPayload::Inline("bye".into())),
    )
    .await?;
    println!(
        r#"
```
ds.set_chunk_ref(
    array1_path.clone(),
    ChunkIndices(vec![1]),
    Some(ChunkPayload::Inline("bye".into())),
)
.await?;
```
"#,
    );

    let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![1])).await?;
    println!("Reading the new chunk => `{chunk:?}`");

    let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![0])).await?;
    println!("Reading the old chunk => `{chunk:?}`");

    println!();
    println!("## Committing");
    let v3_id = ds.commit("commit").max_concurrent_nodes(8).execute().await?;
    println!(
        r#"
```
ds.commit("commit").max_concurrent_nodes(8).execute().await?;
=> {v3_id:?}
```
"#
    );

    println!("## Creating a read-only session on the previous version");
    let ds = repo.readonly_session(&VersionInfo::SnapshotId(v2_id.clone())).await?;
    println!(
        r#"
```
// v2_id = {v2_id:?}
let ds = repo.readonly_session(&VersionInfo::SnapshotId(v2_id.clone())).await?;
```
"#
    );

    print_nodes(&ds).await?;

    let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![0])).await?;
    println!("Reading the old chunk: {chunk:?}");

    // Chunk 1 was only written in the commit after v2, so this session should
    // not see it. Print the raw result instead of propagating with `?` so the
    // outcome is shown whatever form it takes.
    let chunk = ds.get_chunk_ref(&array1_path, &ChunkIndices(vec![1])).await;
    println!("Reading the new chunk: {chunk:?}");

    Ok(())
}

/// Prints every node reachable from the root group of `ds` as a pipe-separated
/// table of `(node type, path)` rows, sorted by path.
///
/// # Errors
///
/// Returns an error if listing the nodes fails or if reading any individual
/// node fails. Per-node failures are propagated rather than panicking.
async fn print_nodes(ds: &Session) -> Result<(), SessionError> {
    println!("### List of nodes");
    let mut nodes = ds
        .list_nodes(&Path::root())
        .await?
        .collect::<Result<Vec<_>, _>>()?;
    // Stable sort by path, same ordering `sorted_by_key` produced, without
    // cloning every path just to use it as a key.
    nodes.sort_by(|a, b| a.path.cmp(&b.path));

    let rows = nodes
        .iter()
        .map(|node| {
            // `Debug` impls are not required to honour width flags (derived
            // ones ignore them), so render to a `String` first and pad that.
            let node_type = format!("{:?}", node.node_type());
            format!("|{:10}|{:15}\n", node_type, node.path.to_string())
        })
        .format("");

    println!("{rows}");
    Ok(())
}