icechunk-python 0.1.0-alpha.1

Transactional storage engine for Zarr designed for use on cloud object storage
Documentation
import asyncio
import math
import random
import string
import time
from typing import Literal

import numpy as np
import zarr
from icechunk import IcechunkStore, S3Credentials, StorageConfig, StoreConfig
from zarr.abc.store import Store
from zarr.storage import LocalStore, MemoryStore, RemoteStore


def rdms(n):
    """Return a random string of ``n`` uppercase ASCII letters and digits."""
    alphabet = string.ascii_uppercase + string.digits
    picks = [random.choice(alphabet) for _ in range(n)]
    return "".join(picks)


def generate_array_chunks(size: int, dtype=np.int32):
    """Build a 4-D test array together with a chunk shape for it.

    The array has shape ``(nx, ny, 64, 128)`` where ``nx == ny`` is the
    integer part of ``sqrt(size / 64 / 128)``. It is filled with consecutive
    integers starting at 0, created with the requested ``dtype``.

    Returns:
        An ``(array, chunk_shape)`` tuple. ``chunk_shape`` follows the same
        axis order as the array's shape.
    """
    depth, steps = 64, 128  # fixed extents of the last two axes
    side = int(math.sqrt(size / depth / steps))  # shared extent of the first two

    shape = (side, side, depth, steps)
    # First axis: a third of its extent per chunk; second axis: half of it.
    # Both are clamped to at least 1. The last two axes use fixed lengths.
    chunk_shape = (max(side // 3, 1), max(side // 2, 1), 8, 2)

    values = np.arange(math.prod(shape), dtype=dtype)
    return values.reshape(shape), chunk_shape


def create_array(*, group, name, size, dtype, fill_value) -> np.ndarray:
    """Generate test data and register it in ``group`` under ``name``.

    The data and its chunk shape come from
    ``generate_array_chunks(size=size, dtype=dtype)``. They are handed to
    ``group.require_array`` along with ``fill_value``, the dimension names
    ``("x", "y", "z", "t")`` and a fixed description attribute.

    Returns:
        The in-memory NumPy array that was passed to ``require_array``.
    """
    data, chunks = generate_array_chunks(size=size, dtype=dtype)

    group.require_array(
        name=name,
        shape=data.shape,
        dtype=data.dtype,
        fill_value=fill_value,
        chunk_shape=chunks,
        dimension_names=("x", "y", "z", "t"),
        attributes={"description": "icechunk test data"},
        data=data,
        exists_ok=True,
    )
    return data


async def run(store: Store) -> None:
    """Write, commit and read back test data through ``store``.

    The same sequence runs for every store; the commit / checkout / reset
    steps are only executed when ``store`` is an ``IcechunkStore``:

    1. Create the root group (overwriting), set an attribute, commit.
    2. Add the ``root-foo`` array, update one of its attributes, commit.
    3. Check out the first commit, change a root attribute and verify that
       committing raises ``ValueError``; then reset and go back to ``main``.
    4. Update a ``root-foo`` attribute, commit, and verify that committing
       again without further changes raises ``ValueError``.
    5. Add arrays under ``group1/`` and ``group2/`` and commit.
    6. Read every array back and compare it with the data that was written.

    The durations of the write and read phases are printed.

    Raises:
        ValueError: If a commit that is expected to fail succeeds.
        AssertionError: If attribute / member counts or array contents do
            not match what was written.
    """
    # perf_counter is monotonic, so the reported durations are not skewed by
    # wall-clock adjustments.
    write_start = time.perf_counter()
    group = zarr.group(store=store, overwrite=True)
    group.attrs["foo"] = "foo"
    print(group)

    first_commit = None
    if isinstance(store, IcechunkStore):
        first_commit = await store.commit("initial commit")

    # Maps the array's path below the root group to the data written there.
    expected = {}
    expected["root-foo"] = create_array(
        group=group, name="root-foo", size=1 * 1024 * 256, dtype=np.int32, fill_value=-1
    )

    print(f"Root group members are: {group.members()}")
    print(f"Root group attrs: {dict(group['root-foo'].attrs)}")

    group["root-foo"].attrs["update"] = "new attr"

    if isinstance(store, IcechunkStore):
        _second_commit = await store.commit("added array, updated attr")

    # "description" (set by create_array) plus "update".
    assert len(group["root-foo"].attrs) == 2
    assert len(group.members()) == 1

    if isinstance(store, IcechunkStore) and first_commit is not None:
        await store.checkout(first_commit)
    group.attrs["update"] = "new attr 2"

    if isinstance(store, IcechunkStore):
        # The store is checked out at the first commit while a later commit
        # already exists, so this commit is expected to be rejected.
        try:
            await store.commit("new attr 2")
        except ValueError:
            pass
        else:
            raise ValueError("should have conflicted")

        # NOTE(review): presumably this drops the uncommitted attribute change
        # from the rejected commit -- confirm.
        await store.reset()  # FIXME: WHY
        await store.checkout(branch="main")

    group["root-foo"].attrs["update"] = "new attr 2"
    if isinstance(store, IcechunkStore):
        _third_commit = await store.commit("new attr 2")

        # Nothing was modified since the previous commit, so a second commit
        # is expected to be rejected.
        try:
            await store.commit("rewrote array")
        except ValueError:
            pass
        else:
            raise ValueError("should have failed, committing without changes.")

    newgroup = zarr.group(store=store, path="group1/")
    expected["group1/foo1"] = create_array(
        group=newgroup,
        name="foo1",
        dtype=np.float32,
        size=1 * 1024 * 128,
        fill_value=-1234,
    )
    expected["group1/foo2"] = create_array(
        group=newgroup,
        name="foo2",
        dtype=np.float16,
        size=1 * 1024 * 64,
        fill_value=-1234,
    )
    newgroup = zarr.group(store=store, path="group2/")
    expected["group2/foo3"] = create_array(
        group=newgroup,
        name="foo3",
        dtype=np.int64,
        size=1 * 1024 * 32,
        fill_value=-1234,
    )
    if isinstance(store, IcechunkStore):
        _fourth_commit = await store.commit("added groups and arrays")

    print(f"Write done in {time.perf_counter() - write_start} secs")

    read_start = time.perf_counter()

    root_group = zarr.group(store=store)
    for key, value in expected.items():
        print(key)
        array = root_group[key]
        assert isinstance(array, zarr.Array)

        # Ceiling division: an axis whose extent is not a multiple of the
        # chunk length still needs one extra, partially filled chunk.
        num_chunks = math.prod(
            -(-extent // chunk)
            for extent, chunk in zip(array.shape, array.chunks, strict=False)
        )
        print(f"numchunks: {num_chunks}")
        np.testing.assert_array_equal(array[:], value)

    print(f"Read done in {time.perf_counter() - read_start} secs")


async def create_icechunk_store(*, storage: StorageConfig) -> IcechunkStore:
    """Open an ``IcechunkStore`` on ``storage`` in ``"r+"`` mode.

    The store is opened with ``inline_chunk_threshold_bytes=1``.
    """
    config = StoreConfig(inline_chunk_threshold_bytes=1)
    opened = await IcechunkStore.open(storage=storage, mode="r+", config=config)
    return opened


async def create_zarr_store(*, store: Literal["memory", "local", "s3"]) -> Store:
    """Create a plain zarr store of the requested kind, opened in ``"w"`` mode.

    Args:
        store: Which backend to create:
            ``"local"`` opens a ``LocalStore`` at a randomly named path
            under ``/tmp``; ``"memory"`` opens a ``MemoryStore``; ``"s3"``
            builds a ``RemoteStore`` for ``s3://testbucket/root-zarr`` using
            the endpoint and credentials hard-coded below.

    Returns:
        The newly created store.

    Raises:
        ValueError: If ``store`` is not one of the supported kinds.
    """
    if store == "local":
        return await LocalStore.open(f"/tmp/{rdms(6)}", mode="w")
    if store == "memory":
        return await MemoryStore.open(mode="w")
    if store == "s3":
        return RemoteStore.from_url(
            "s3://testbucket/root-zarr",
            mode="w",
            storage_options={
                "anon": False,
                "key": "minio123",
                "secret": "minio123",
                "region": "us-east-1",
                "endpoint_url": "http://localhost:9000",
            },
        )
    # Fail loudly instead of silently returning None for an unknown kind.
    raise ValueError(
        f"unknown store kind {store!r}; expected 'memory', 'local' or 's3'"
    )


if __name__ == "__main__":
    # In-memory storage config; not referenced by the run below.
    MEMORY = StorageConfig.memory("new")
    # S3-compatible storage config pointing at an endpoint on localhost:9000.
    MINIO = StorageConfig.s3_from_config(
        bucket="testbucket",
        prefix="root-icechunk",
        credentials=S3Credentials(
            access_key_id="minio123",
            secret_access_key="minio123",
            session_token=None,
        ),
        region="us-east-1",
        allow_http=True,
        endpoint_url="http://localhost:9000",
    )

    async def main() -> None:
        """Run the scenario against an Icechunk store, then a zarr store."""
        print("Icechunk store")
        store = await create_icechunk_store(storage=MINIO)
        await run(store)

        print("Zarr store")
        zarr_store = await create_zarr_store(store="local")
        await run(zarr_store)

    # A single asyncio.run call keeps store creation and use on the same
    # event loop; each asyncio.run call creates and closes its own loop.
    asyncio.run(main())